技术经验谈 技术经验谈
首页
  • 最佳实践

    • 抓包
    • 数据库操作
  • ui

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • 总纲
  • 整体开发框架
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

hss01248

一号线程序员
首页
  • 最佳实践

    • 抓包
    • 数据库操作
  • ui

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • 总纲
  • 整体开发框架
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 最佳实践

  • ui

  • 优化

  • aop

  • apm

  • 架构

  • webview

  • rxjava

    • Rxjava
      • 类:
        • 定义:
        • 链式调用之创建:
        • 变换/Map:
        • 订阅时切换线程:
        • 统一切换下游线程:
      • 使用:
        • 将中间注释打开,也就是多次切换线程:
        • 子线程
      • 工程化->如何保证使用Rxjava绝对不崩溃?
      • 链式调用流程中,Observable数量的变化方法:
    • Rxjava对日常callback形式代码的改造
  • activity-fragment-view的回调和日志
  • Android加密相关
  • Android命令行操作
  • app后台任务
  • kotlin
  • kotlin漫谈
  • kotlin语言导论
  • sentry上传mapping.txt文件
  • so放于远程动态加载方案
  • states
  • Xposed模块开发
  • 一个关于manifest合并的猥琐操作
  • 玩坏android存储
  • 获取本app的安装来源信息
  • Android
  • rxjava
hss01248
2022-04-02
目录

Rxjava

# Rxjava

# 根本原理

自己写代码实现rxjava,怎么做? 如何实现: 链式,线程切换,各操作符? 一个类+两个接口即可演示完核心原理:

image-20210111194924736

参考视频教程: https://www.bilibili.com/video/BV19h411172f?p=2

demo代码: https://github.com/hss01248/aop-android/tree/master/rxjavademo/src/main/java/com/hss01248/rxjavademo

public interface MyFunc<R,T> {
    R apply(T t);
}

public interface MyObserver<T> {

    void onNext(T t);

    void onError(Throwable throwable);

    void onComplete();
}
1
2
3
4
5
6
7
8
9
10
11
12

# 类:

# 定义:

public abstract class MyObservable<T> {

    public  abstract void subscrib(MyObserver<T> observer);
 }
1
2
3
4

# 链式调用之创建:

public static <T> MyObservable<T> create(MyObservable<T> observable){
    return observable;
}
1
2
3

# 变换/Map:

public <R> MyObservable<R>  map(MyFunc<R,T> func){
        return new MyObservable<R>() {
            @Override
            public void subscrib(MyObserver<R> observer) {
                MyObservable.this.subscrib(new MyObserver<T>() {
                    @Override
                    public void onNext(T t) {
                        //往下执行时,进行函数调用
                        R r = func.apply(t);
                        observer.onNext(r);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        observer.onError(throwable);
                    }

                    @Override
                    public void onComplete() {
                        observer.onComplete();
                    }
                });
            }
        };
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 订阅时切换线程:

/**
 * 切换上流线程示例
 * @return
 */
public MyObservable<T> subscribOnIO(){
  return  new MyObservable<T>(){
        @Override
        public void subscrib(MyObserver<T> observer) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //往上订阅时切换线程
                    MyObservable.this.subscrib(observer);
                }
            }).start();
        }
    };
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 统一切换下游线程:

public MyObservable<T> observerOnMainThread(){
       return new MyObservable<T>(){
            @Override
            public void subscrib(MyObserver<T> observer) {
                MyObservable.this.subscrib(new MyObserver<T>() {
                    @Override
                    public void onNext(T t) {
                        new Handler(Looper.getMainLooper()).post(new Runnable() {
                            @Override
                            public void run() {
                                //往下执行时切换线程
                                observer.onNext(t);
                            }
                        });
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        //同上
                    }

                    @Override
                    public void onComplete() {
                        //同上
                    }
                });
            }
        };
    }

public MyObservable<T> observerOnBackThread(){
        return new MyObservable<T>(){
            @Override
            public void subscrib(MyObserver<T> observer) {
                MyObservable.this.subscrib(new MyObserver<T>() {
                    @Override
                    public void onNext(T t) {
                        new Thread(new Runnable() {
                            @Override
                            public void run() {
                                //往下执行时切换线程
                                observer.onNext(t);
                            }
                        }).start();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        //同上
                    }

                    @Override
                    public void onComplete() {
                        //同上
                    }
                });
            }
        };
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# subscrib和onNext的调用时机和线程

subscrib什么时候调用

onNext什么时候调用

如下图,先顺序执行,再逆序执行,再顺序执行

wecom-temp-cd9fbfbfcb98025a19f24b0dfe21ae26

比如,下方的调用,aop日志如下:

Xnip2022-04-02_11-29-48

# 使用:

 MyObservable.create(new MyObservable<String>() {
            @Override
            public void subscrib(MyObserver<String> observer) {
                Log.d("create:", "," + Thread.currentThread().getName());
                for (int i = 0; i < 10; i++) {
                    observer.onNext(i + "");
                }


            }
        })//.subscribOnIO() //(打开时为上方截图的调用链)
   .map(new MyFunc<Integer, String>() {
            @Override
            public Integer apply(String s) {
                Log.d("map1:", "integer:" + s + "," + Thread.currentThread().getName());
                return Integer.parseInt(s) * 2;
            }
        })
        .map(new MyFunc<String, Integer>() {
            @Override
            public String apply(Integer integer) {
                Log.d("map2:", "integer:" + integer + "," + Thread.currentThread().getName());
                return integer + "->String";
            }
        }).subscribOnIO()
        .observerOnMainThread()
        .subscrib(new MyObserver<String>() {
            @Override
            public void onNext(String s) {
                Log.d("onNext:", "finnaly:" + s + "," + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();

            }

            @Override
            public void onComplete() {
                Log.d("onComplete:", "finnaly: onComplete");
            }
        });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

Log:

image-20210111195655731

# 将中间注释打开,也就是多次切换线程:

.subscribOnIO() .observerOnMainThread()

image-20210111200008825

# 子线程

}).map(new MyFunc<Integer, String>() {
    @Override
    public Integer apply(String s) {
        Log.d("map1:", "integer:" + s + "," + Thread.currentThread().getName());
        return Integer.parseInt(s) * 2;
    }
}).subscribOnIO()//上面使用子线程
.observerOnMainThread()//后续使用主线程
.map(new MyFunc<String, Integer>() {
    @Override
    public String apply(Integer integer) {
        Log.d("map2:", "integer:" + integer + "," + Thread.currentThread().getName());
        return integer + "->String";
    }
})
.observerOnBackThread()//后续切到子线程
.subscrib(new MyObserver<String>() {
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

image-20210111200259162

# 一些问题

# 工程化->如何保证使用Rxjava绝对不崩溃?

  • 对MyObserver的onNext(),onComplete()进行try-catch, catch后调用onError()
  • 如果onError里也崩溃呢? 对onError也try-catch包裹,提供一个全局处理异常的回调,供框架使用者初始化时调用.

# 链式调用流程中,Observable数量的变化方法:

  • 一个Observable怎么样能在链式流中变成多个Observable?
  • 上面已经变成的多个Observable怎么能够在下游组合成一个Observable?以及怎么组合成一次回调?

# 一些经典函数方法的实现:

//todo

# 学习资料/博客

https://github.com/Carson-Ho/RxJavaLearningMaterial

# 代码示例

# 学习Demo app

https://github.com/KunMinX/RxJava2-Operators-Magician

https://github.com/leeowenowen/rxjava-examples

https://github.com/xinghongfei/Hello-RxJava

# 开发工具

https://github.com/akaita/RxJava2Debug exception中显示真正的原因

https://github.com/davidmoten/rxjava2-extras 各种transformers和工具

https://github.com/akarnokd/RxJavaExtensions

https://github.com/skyNet2017/frodo2 logging RxJava (opens new window) Observables (opens new window) and Subscribers (opens new window) outputs on the logcat

# Android开发套件

https://github.com/Piasy/RxScreenshotDetector

rxjava基于集合的并发

https://www.jianshu.com/p/1e4d194bb782

# 存储和缓存相关库

https://github.com/Gridstone/RxStore

https://github.com/VictorAlbertos/RxCache

# 视频

https://www.bilibili.com/video/BV1Wt411d7N2 Java编程方法论-响应式 之 Rxjava2源码设计实现解读 全集(已完结)

https://www.bilibili.com/video/BV1iT4y137PZ

image-20200615225433728

image-20200615225523090

#

编辑 (opens new window)
上次更新: 2022/08/25, 20:20:31
webview权限适配和getUserMedia适配
Rxjava对日常callback形式代码的改造

← webview权限适配和getUserMedia适配 Rxjava对日常callback形式代码的改造→

最近更新
01
截图后的自动压缩工具
12-27
02
图片视频文件根据exif批量重命名
12-27
03
chatgpt图片识别描述功能
02-20
更多文章>
Theme by Vdoing | Copyright © 2020-2025 | 粤ICP备20041795号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式