RxJava 认识
本文最后更新于 2025-04-01,文章超过7天没更新,应该是已完结了~
1. RxJava 是什么?
RxJava(Reactive Extensions for Java)是一个 响应式编程库,用于 处理异步数据流。它的核心思想是 事件驱动 + 链式调用,适合处理:
网络请求(异步 & 链式调用)
流式数据处理(如 WebSocket, SSE)
数据库操作(Room, SQLite)
UI 事件流(点击、输入等)
👉 简单来说:
传统回调(callback)会让代码变得难以维护(回调地狱)。
RxJava 让 异步代码变得像写同步代码一样流畅,并且能 优雅地管理线程。
2. RxJava 核心组件
RxJava 的核心由 四个部分 组成:
3. Observable & Observer(被观察者和观察者)
RxJava 遵循 观察者模式,即:
Observable
(被观察者) 负责 发射(emit)数据Observer
(观察者) 负责 接收(subscribe)数据
基本示例
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Hello, RxJava!");
emitter.onComplete();
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("订阅成功");
}
@Override
public void onNext(String s) {
System.out.println("收到数据:" + s);
}
@Override
public void onError(Throwable e) {
System.out.println("发生错误:" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("数据接收完成");
}
};
// 订阅(Observable -> Observer)
observable.subscribe(observer);
✅ 运行结果
订阅成功
收到数据:Hello, RxJava!
数据接收完成
👉 总结:
onNext()
:每次有数据都会触发onComplete()
:数据流结束onError()
:发生错误时调用
在 RxJava 中,不同的数据流处理方式对应不同的 Observable
类型,它们用于处理异步数据流。我们可以简单理解为:
Observable
:处理多个数据(标准流)Flowable
:处理大量数据,支持背压(防止数据淹没)Single
:只返回 一个 数据(类似 API 请求)Completable
:不关心数据,只关心操作是否成功(如上传、删除)Observable:可以发射多个数据
Observable<T>
是 RxJava 最常见的数据流,它可以连续不断地 发送多个数据(或者不发送)。
✅ 适用场景
聊天消息流(一个个消息推送)
股票价格更新(每秒更新一次)
传感器数据(持续读取温度、湿度等)
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("数据1");
emitter.onNext("数据2");
emitter.onNext("数据3");
emitter.onComplete(); // 结束信号
});
observable.subscribe(
item -> System.out.println("收到:" + item),
error -> System.err.println("错误:" + error),
() -> System.out.println("完成")
);
收到:数据1
收到:数据2
收到:数据3
完成
Flowable<T>
适用于 大数据流,支持背压机制(Backpressure),防止数据流量过大导致 OOM(内存溢出)。
✅ 适用场景
日志流(可能有大量数据)
大规模文件处理
数据库查询返回大量数据
Flowable.range(1, 1000000)
.observeOn(Schedulers.io()) // 异步执行
.subscribe(item -> System.out.println("收到:" + item));
⚠️ 如果用 Observable
处理百万级数据,可能会崩溃,但 Flowable
可以处理!
Single<T>
只会发射 一个值(或者错误),适用于 一次性请求(比如 HTTP API 请求)。
✅ 适用场景
网络请求
数据库查询
文件下载
用户登录
Single<String> single = Single.create(emitter -> {
emitter.onSuccess("请求成功,返回数据");
});
single.subscribe(
data -> System.out.println("收到:" + data),
error -> System.err.println("错误:" + error)
);
收到:请求成功,返回数据
Completable
不会发射任何数据,它只关心 操作是否成功完成(或者失败)。
✅ 适用场景
文件上传
删除数据库数据
发送网络请求但不关心响应
本地缓存数据
Completable completable = Completable.create(emitter -> {
// 模拟执行任务(比如文件上传)
Thread.sleep(2000);
emitter.onComplete(); // 任务完成
});
completable.subscribe(
() -> System.out.println("任务完成"),
error -> System.err.println("错误:" + error)
);
任务完成
4. 线程调度(Schedulers)
RxJava 提供了一套强大的 线程调度机制,可以轻松地 在不同线程切换任务。
常见的线程调度器
示例:子线程执行网络请求,主线程更新 UI
Observable.just("网络数据")
.subscribeOn(Schedulers.io()) // 在 IO 线程执行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程更新 UI
.subscribe(result -> System.out.println("更新 UI:" + result));
✅ 运行结果
更新 UI:网络数据
👉 总结:
subscribeOn()
指定 Observable 执行的线程observeOn()
指定 Observer 接收数据的线程
5. 操作符(Operators)
RxJava 的最大特点是 可以像操作数组一样操作数据流,常见的 数据转换操作符:
示例:使用 map()
进行数据转换
Observable.just(1, 2, 3)
.map(num -> "转换成字符串:" + num)
.subscribe(System.out::println);
✅ 运行结果
转换成字符串:1
转换成字符串:2
转换成字符串:3
👉 总结:
map()
对数据进行转换,类似 Java 的stream().map()
示例:使用 flatMap()
展开多个网络请求
flatMap()
可以 把一个数据变成多个数据流,用于处理 依赖关系(如请求嵌套)。
Observable.just("用户ID 123")
.flatMap(userId -> {
// 根据用户 ID 发起网络请求
return Observable.just("用户详情:" + userId);
})
.subscribe(System.out::println);
✅ 运行结果
用户详情:用户ID 123
👉 总结:
flatMap()
展开多个异步任务,适合级联请求(如:先获取用户 ID,再获取用户详情)
6. 实战案例
(1) 处理多个 API 请求
假设你要:
先请求 用户信息
再请求 用户订单
处理数据后 更新 UI
getUserInfo()
.flatMap(user -> getUserOrders(user.getId()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(orders -> System.out.println("订单数据:" + orders));
👉 流程
getUserInfo()
获取用户信息flatMap()
再用user.getId()
查询订单subscribeOn(Schedulers.io())
在 IO 线程请求observeOn(AndroidSchedulers.mainThread())
在 主线程更新 UI
(2) 处理错误(onErrorResumeNext()
)
Observable.just(10)
.map(num -> num / 0) // 触发异常
.onErrorResumeNext(Observable.just(1)) // 发生错误时返回默认值
.subscribe(System.out::println);
✅ 输出
1
👉 onErrorResumeNext()
可以在发生错误时提供备用方案
RxJava + Retrofit 结合示例
public interface ChatApi {
@GET("chat")
Observable<ChatResponse> getChatResponse();
}
// 订阅并处理数据
chatApi.getChatResponse()
.subscribeOn(Schedulers.io()) // 在 IO 线程执行请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程更新 UI
.subscribe(
response -> System.out.println("收到消息:" + response.getText()),
error -> System.err.println("请求失败:" + error.getMessage())
);
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
让 Retrofit 支持 RxJava让 API 直接返回
Observable<T>
,不再需要Call<T>
结合 RxJava 线程调度,异步处理更加优雅 🚀
这样,你的网络请求就可以完美支持响应式编程了! 🎯
- 感谢你赐予我前进的力量