本文最后更新于 2025-04-01,文章超过7天没更新,应该是已完结了~

1. RxJava 是什么?

RxJava(Reactive Extensions for Java)是一个 响应式编程库,用于 处理异步数据流。它的核心思想是 事件驱动 + 链式调用,适合处理:

  • 网络请求(异步 & 链式调用)

  • 流式数据处理(如 WebSocket, SSE)

  • 数据库操作(Room, SQLite)

  • UI 事件流(点击、输入等)

👉 简单来说:

  • 传统回调(callback)会让代码变得难以维护(回调地狱)。

  • RxJava 让 异步代码变得像写同步代码一样流畅,并且能 优雅地管理线程

2. RxJava 核心组件

RxJava 的核心由 四个部分 组成:

组件

作用

例子

Observable

事件源(数据流)

生产数据

Observer

观察者(订阅者)

消费数据

Operators

变换数据

map(), flatMap()

Schedulers

线程控制

Schedulers.io()


3. Observable & Observer(被观察者和观察者)

RxJava 遵循 观察者模式,即:

  • Observable(被观察者) 负责 发射(emit)数据

  • Observer(观察者) 负责 接收(subscribe)数据

基本示例

Observable<String> observable = Observable.create(emitter -> {
    emitter.onNext("Hello, RxJava!");
    emitter.onComplete();
});

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("订阅成功");
    }
    
    @Override
    public void onNext(String s) {
        System.out.println("收到数据:" + s);
    }
    
    @Override
    public void onError(Throwable e) {
        System.out.println("发生错误:" + e.getMessage());
    }
    
    @Override
    public void onComplete() {
        System.out.println("数据接收完成");
    }
};

// 订阅(Observable -> Observer)
observable.subscribe(observer);

运行结果

订阅成功
收到数据:Hello, RxJava!
数据接收完成

👉 总结:

  • onNext():每次有数据都会触发

  • onComplete():数据流结束

  • onError():发生错误时调用

在 RxJava 中,不同的数据流处理方式对应不同的 Observable 类型,它们用于处理异步数据流。我们可以简单理解为:

  • Observable:处理多个数据(标准流)

  • Flowable:处理大量数据,支持背压(防止数据淹没)

  • Single:只返回 一个 数据(类似 API 请求)

  • Completable:不关心数据,只关心操作是否成功(如上传、删除)Observable:可以发射多个数据

Observable<T>RxJava 最常见的数据流,它可以连续不断地 发送多个数据(或者不发送)。

适用场景

  • 聊天消息流(一个个消息推送)

  • 股票价格更新(每秒更新一次)

  • 传感器数据(持续读取温度、湿度等)

Observable<String> observable = Observable.create(emitter -> {
    emitter.onNext("数据1");
    emitter.onNext("数据2");
    emitter.onNext("数据3");
    emitter.onComplete(); // 结束信号
});

observable.subscribe(
    item -> System.out.println("收到:" + item),
    error -> System.err.println("错误:" + error),
    () -> System.out.println("完成")
);
收到:数据1
收到:数据2
收到:数据3
完成

Flowable<T> 适用于 大数据流,支持背压机制(Backpressure),防止数据流量过大导致 OOM(内存溢出)。

适用场景

  • 日志流(可能有大量数据)

  • 大规模文件处理

  • 数据库查询返回大量数据

Flowable.range(1, 1000000)
    .observeOn(Schedulers.io()) // 异步执行
    .subscribe(item -> System.out.println("收到:" + item));

⚠️ 如果用 Observable 处理百万级数据,可能会崩溃,但 Flowable 可以处理!

Single<T> 只会发射 一个值(或者错误),适用于 一次性请求(比如 HTTP API 请求)。

适用场景

  • 网络请求

  • 数据库查询

  • 文件下载

  • 用户登录

Single<String> single = Single.create(emitter -> {
    emitter.onSuccess("请求成功,返回数据");
});

single.subscribe(
    data -> System.out.println("收到:" + data),
    error -> System.err.println("错误:" + error)
);
收到:请求成功,返回数据

Completable 不会发射任何数据,它只关心 操作是否成功完成(或者失败)。

适用场景

  • 文件上传

  • 删除数据库数据

  • 发送网络请求但不关心响应

  • 本地缓存数据

Completable completable = Completable.create(emitter -> {
    // 模拟执行任务(比如文件上传)
    Thread.sleep(2000);
    emitter.onComplete(); // 任务完成
});

completable.subscribe(
    () -> System.out.println("任务完成"),
    error -> System.err.println("错误:" + error)
);
任务完成

4. 线程调度(Schedulers)

RxJava 提供了一套强大的 线程调度机制,可以轻松地 在不同线程切换任务

常见的线程调度器

调度器

作用

Schedulers.io()

I/O 操作(如网络请求、文件读写)

Schedulers.computation()

计算任务(如数学计算、数据处理)

Schedulers.newThread()

每次创建一个新线程

AndroidSchedulers.mainThread()

Android 主线程(UI 操作)

示例:子线程执行网络请求,主线程更新 UI

Observable.just("网络数据")
    .subscribeOn(Schedulers.io())    // 在 IO 线程执行网络请求
    .observeOn(AndroidSchedulers.mainThread())  // 在主线程更新 UI
    .subscribe(result -> System.out.println("更新 UI:" + result));

运行结果

更新 UI:网络数据

👉 总结:

  • subscribeOn() 指定 Observable 执行的线程

  • observeOn() 指定 Observer 接收数据的线程


5. 操作符(Operators)

RxJava 的最大特点是 可以像操作数组一样操作数据流,常见的 数据转换操作符

操作符

作用

map()

转换数据

flatMap()

拆分 & 合并数据

filter()

过滤数据

concat()

串行执行多个 Observable

merge()

并行合并多个 Observable

示例:使用 map() 进行数据转换

Observable.just(1, 2, 3)
    .map(num -> "转换成字符串:" + num)
    .subscribe(System.out::println);

运行结果

转换成字符串:1
转换成字符串:2
转换成字符串:3

👉 总结:

  • map() 对数据进行转换,类似 Java 的 stream().map()


示例:使用 flatMap() 展开多个网络请求

flatMap() 可以 把一个数据变成多个数据流,用于处理 依赖关系(如请求嵌套)

Observable.just("用户ID 123")
    .flatMap(userId -> {
        // 根据用户 ID 发起网络请求
        return Observable.just("用户详情:" + userId);
    })
    .subscribe(System.out::println);

运行结果

用户详情:用户ID 123

👉 总结:

  • flatMap() 展开多个异步任务,适合级联请求(如:先获取用户 ID,再获取用户详情)

6. 实战案例

(1) 处理多个 API 请求

假设你要:

  1. 先请求 用户信息

  2. 再请求 用户订单

  3. 处理数据后 更新 UI

getUserInfo()
    .flatMap(user -> getUserOrders(user.getId()))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(orders -> System.out.println("订单数据:" + orders));

👉 流程

  • getUserInfo() 获取用户信息

  • flatMap() 再用 user.getId() 查询订单

  • subscribeOn(Schedulers.io())IO 线程请求

  • observeOn(AndroidSchedulers.mainThread())主线程更新 UI


(2) 处理错误(onErrorResumeNext()

Observable.just(10)
    .map(num -> num / 0)  // 触发异常
    .onErrorResumeNext(Observable.just(1))  // 发生错误时返回默认值
    .subscribe(System.out::println);

输出

1

👉 onErrorResumeNext() 可以在发生错误时提供备用方案

RxJava + Retrofit 结合示例

public interface ChatApi {
    @GET("chat")
    Observable<ChatResponse> getChatResponse();
}

// 订阅并处理数据
chatApi.getChatResponse()
    .subscribeOn(Schedulers.io()) // 在 IO 线程执行请求
    .observeOn(AndroidSchedulers.mainThread()) // 在主线程更新 UI
    .subscribe(
        response -> System.out.println("收到消息:" + response.getText()),
        error -> System.err.println("请求失败:" + error.getMessage())
    );
  • .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) 让 Retrofit 支持 RxJava

  • 让 API 直接返回 Observable<T>,不再需要 Call<T>

  • 结合 RxJava 线程调度,异步处理更加优雅 🚀

这样,你的网络请求就可以完美支持响应式编程了! 🎯