React编程入门示例:RxJava深度解析
文章目录
4.1 RxJava示例
RxJava是ReactiveX在Java虚拟机上的实现,它使用可观察序列来构建异步和基于事件的程序。RxJava提供了丰富的操作符来处理异步数据流,使开发者能够以声明式的方式组合异步操作。
4.1.1 创建Observable
Observable是RxJava中的基本构建块,代表一个可观察的数据源,能够发射0到N个数据项,然后可能以一个完成或错误通知终止。
基本创建方式
-
Observable.just():创建一个发射指定值的Observable
Observable<String> observable = Observable.just(\"Hello\", \"World\");
-
Observable.fromIterable():从集合创建Observable
List<String> list = Arrays.asList(\"Apple\", \"Banana\", \"Cherry\");Observable<String> observable = Observable.fromIterable(list);
-
Observable.range():创建一个发射特定整数序列的Observable
Observable<Integer> observable = Observable.range(1, 5); // 1, 2, 3, 4, 5
-
Observable.interval():创建一个按固定时间间隔发射整数序列的Observable
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS); // 0, 1, 2... 每秒
-
Observable.create():自定义Observable创建
Observable<String> observable = Observable.create(emitter -> { emitter.onNext(\"First\"); emitter.onNext(\"Second\"); emitter.onComplete();});
-
Observable.empty():创建一个不发射任何数据但正常终止的Observable
Observable<String> observable = Observable.empty();
-
Observable.error():创建一个不发射任何数据但以错误终止的Observable
Observable<String> observable = Observable.error(new RuntimeException(\"Error occurred\"));
-
Observable.never():创建一个不发射任何数据也不终止的Observable
Observable<String> observable = Observable.never();
高级创建方式
-
Observable.defer():延迟创建,直到有观察者订阅
Observable<Long> observable = Observable.defer(() -> Observable.just(System.currentTimeMillis()));
-
Observable.fromCallable():从Callable创建,适合可能有异常抛出的场景
Observable<String> observable = Observable.fromCallable(() -> { if (Math.random() > 0.5) { throw new IOException(\"Random error\"); } return \"Success\";});
-
Observable.fromFuture():从Future创建
Future<String> future = Executors.newSingleThreadExecutor() .submit(() -> \"Result from Future\");Observable<String> observable = Observable.fromFuture(future);
-
Observable.generate():同步生成复杂流
Observable<Integer> observable = Observable.generate( () -> 0, // 初始状态 (state, emitter) -> { emitter.onNext(state); if (state == 10) { emitter.onComplete(); } return state + 1; });
-
Observable.merge():合并多个Observable
Observable<String> first = Observable.just(\"A\", \"B\", \"C\");Observable<String> second = Observable.just(\"1\", \"2\", \"3\");Observable<String> merged = Observable.merge(first, second);
-
Observable.concat():顺序连接多个Observable
Observable<String> concatenated = Observable.concat(first, second);
-
Observable.zip():组合多个Observable
Observable<String> zipped = Observable.zip(first, second, (f, s) -> f + s);
特殊用途Observable
-
Subject:既是Observable又是Observer
-
PublishSubject:向所有订阅者广播所有后续事件
PublishSubject<String> subject = PublishSubject.create();subject.onNext(\"Hello\");subject.subscribe(System.out::println); // 不会收到\"Hello\"subject.onNext(\"World\"); // 会打印\"World\"
-
BehaviorSubject:向新订阅者发送最近的一个事件
BehaviorSubject<String> subject = BehaviorSubject.createDefault(\"Initial\");subject.onNext(\"First\");subject.subscribe(System.out::println); // 立即收到\"First\"
-
ReplaySubject:向所有订阅者重放所有事件
ReplaySubject<String> subject = ReplaySubject.create();subject.onNext(\"Hello\");subject.subscribe(System.out::println); // 会收到\"Hello\"
-
AsyncSubject:只在完成时发送最后一个事件
AsyncSubject<String> subject = AsyncSubject.create();subject.onNext(\"Hello\");subject.onNext(\"World\");subject.subscribe(System.out::println); // 不会立即收到subject.onComplete(); // 会打印\"World\"
-
-
ConnectableObservable:需要调用connect()才开始发射数据
ConnectableObservable<String> connectable = Observable.just(\"A\", \"B\", \"C\").publish();connectable.subscribe(System.out::println);connectable.connect(); // 此时才开始发射数据
4.1.2 订阅Observer
Observer是RxJava中的消费者,用于接收Observable发射的数据和通知。
基本订阅方式
-
简单订阅
Observable.just(\"Hello\").subscribe();
-
带Consumer的订阅
Observable.just(\"Hello\") .subscribe( value -> System.out.println(\"Received: \" + value), // onNext error -> System.err.println(\"Error: \" + error), // onError () -> System.out.println(\"Completed\") // onComplete );
-
使用Observer接口
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println(\"Subscribed\"); } @Override public void onNext(String s) { System.out.println(\"Received: \" + s); } @Override public void onError(Throwable e) { System.err.println(\"Error: \" + e); } @Override public void onComplete() { System.out.println(\"Completed\"); }};Observable.just(\"Hello\", \"World\").subscribe(observer);
-
使用Disposable控制订阅
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS) .subscribe(System.out::println);// 在需要时取消订阅disposable.dispose();
背压处理
RxJava 2.x引入了Flowable来处理背压(Backpressure),当数据生产速度大于消费速度时:
-
Flowable基本使用
Flowable.range(1, 1000) .onBackpressureBuffer() // 缓冲策略 .observeOn(Schedulers.io()) .subscribe(System.out::println);
-
背压策略
- BUFFER:缓冲所有数据(可能导致OOM)
- DROP:丢弃无法处理的数据
- LATEST:只保留最新数据
- ERROR:抛出MissingBackpressureException
- MISSING:不实现背压,由下游处理
Flowable.interval(1, TimeUnit.MILLISECONDS) .onBackpressureDrop(dropped -> System.out.println(\"Dropped: \" + dropped)) .observeOn(Schedulers.computation()) .subscribe(System.out::println);
调度器控制
RxJava使用Schedulers控制线程:
-
subscribeOn:指定Observable操作执行的线程
Observable.just(\"Hello\") .subscribeOn(Schedulers.io()) // 在IO线程执行 .subscribe(System.out::println);
-
observeOn:指定Observer接收数据的线程
Observable.range(1, 5) .observeOn(Schedulers.computation()) // 在计算线程接收 .subscribe(System.out::println);
-
常用调度器
- Schedulers.io():适合I/O操作(无界线程池)
- Schedulers.computation():适合计算操作(固定大小线程池)
- Schedulers.newThread():为每个任务创建新线程
- Schedulers.single():单一线程顺序执行
- Schedulers.trampoline():在当前线程排队执行
- Schedulers.from(Executor):自定义Executor
组合订阅
-
合并多个订阅
Observable<String> first = Observable.just(\"A\", \"B\", \"C\");Observable<String> second = Observable.just(\"1\", \"2\", \"3\");Observable.merge(first, second) .subscribe(System.out::println);
-
条件订阅
Observable<String> source = Observable.just(\"Hello\", \"World\", \"Error\");source.flatMap(s -> { if (\"Error\".equals(s)) { return Observable.error(new RuntimeException(\"Error encountered\")); } return Observable.just(s.toUpperCase());}).subscribe(System.out::println, System.err::println);
-
资源管理
Observable.using( () -> new FileInputStream(\"file.txt\"), // 创建资源 inputStream -> Observable.just(readFile(inputStream)), // 使用资源 inputStream -> inputStream.close() // 释放资源).subscribe(System.out::println);
4.1.3 使用操作符
RxJava提供了数百个操作符来处理Observable流,下面介绍最常用的几类操作符。
转换操作符
-
map:对每个元素应用函数
Observable.just(\"Hello\", \"World\") .map(String::toUpperCase) .subscribe(System.out::println);
-
flatMap:将每个元素转换为Observable并合并
Observable.just(\"Hello\", \"World\") .flatMap(s -> Observable.fromArray(s.split(\"\"))) .subscribe(System.out::println);
-
concatMap:类似flatMap但保持顺序
Observable.just(\"Hello\", \"World\") .concatMap(s -> Observable.fromArray(s.split(\"\"))) .subscribe(System.out::println);
-
switchMap:只保留最新的Observable
Observable.just(\"Hello\", \"World\") .switchMap(s -> Observable.interval(100, TimeUnit.MILLISECONDS) .map(i -> s + \" \" + i) .take(5)) .subscribe(System.out::println);
-
cast:强制类型转换
Observable<Object> objObs = Observable.just(\"Hello\");Observable<String> strObs = objObs.cast(String.class);
-
scan:累加器函数
Observable.range(1, 5) .scan((sum, item) -> sum + item) .subscribe(System.out::println); // 1, 3, 6, 10, 15
-
groupBy:按条件分组
Observable.just(\"Apple\", \"Banana\", \"Cherry\", \"Date\") .groupBy(s -> s.length()) .flatMapSingle(group -> group.toList() .map(list -> group.getKey() + \": \" + list)) .subscribe(System.out::println);
过滤操作符
-
filter:基于条件过滤
Observable.range(1, 10) .filter(i -> i % 2 == 0) .subscribe(System.out::println); // 2, 4, 6, 8, 10
-
take:取前N个元素
Observable.interval(1, TimeUnit.SECONDS) .take(5) .subscribe(System.out::println); // 0, 1, 2, 3, 4
-
skip:跳过前N个元素
Observable.range(1, 10) .skip(5) .subscribe(System.out::println); // 6, 7, 8, 9, 10
-
distinct:去重
Observable.just(1, 2, 2, 3, 1, 4) .distinct() .subscribe(System.out::println); // 1, 2, 3, 4
-
distinctUntilChanged:过滤连续重复
Observable.just(1, 1, 2, 2, 3, 1, 1, 4) .distinctUntilChanged() .subscribe(System.out::println); // 1, 2, 3, 1, 4
-
first/last:取第一个/最后一个元素
Observable.range(1, 10) .first(0) // 默认值 .subscribe(System.out::println); // 1
-
elementAt:取指定位置的元素
Observable.range(1, 10) .elementAt(5) // 索引从0开始 .subscribe(System.out::println); // 6
-
sample/throttleLast:定期采样
Observable.interval(100, TimeUnit.MILLISECONDS) .sample(1, TimeUnit.SECONDS) .subscribe(System.out::println); // 大约每秒一个数
-
debounce/throttleWithTimeout:防抖动
Observable.create(emitter -> { emitter.onNext(\"H\"); Thread.sleep(100); emitter.onNext(\"He\"); Thread.sleep(200); emitter.onNext(\"Hel\"); Thread.sleep(300); emitter.onNext(\"Hell\"); Thread.sleep(400); emitter.onNext(\"Hello\");}).debounce(350, TimeUnit.MILLISECONDS) .subscribe(System.out::println); // 只输出\"Hello\"
组合操作符
-
merge:合并多个Observable
Observable<String> first = Observable.interval(1, TimeUnit.SECONDS) .map(i -> \"First: \" + i);Observable<String> second = Observable.interval(750, TimeUnit.MILLISECONDS) .map(i -> \"Second: \" + i);Observable.merge(first, second) .subscribe(System.out::println);
-
concat:顺序连接多个Observable
Observable.concat( Observable.just(\"First\", \"Second\"), Observable.just(\"Third\", \"Fourth\")).subscribe(System.out::println); // First, Second, Third, Fourth
-
zip:组合多个Observable
Observable<String> letters = Observable.just(\"A\", \"B\", \"C\");Observable<Integer> numbers = Observable.just(1, 2, 3);Observable.zip(letters, numbers, (l, n) -> l + n) .subscribe(System.out::println); // A1, B2, C3
-
combineLatest:当任一Observable发射时组合最新值
Observable<String> letters = Observable.interval(1, TimeUnit.SECONDS) .map(i -> \"Letter\" + (char)(i + 65));Observable<Integer> numbers = Observable.interval(750, TimeUnit.MILLISECONDS) .map(i -> i + 1);Observable.combineLatest(letters, numbers, (l, n) -> l + n) .subscribe(System.out::println);
-
withLatestFrom:类似combineLatest但由主Observable触发
letters.withLatestFrom(numbers, (l, n) -> l + n) .subscribe(System.out::println);
-
startWith:在Observable开始前插入数据
Observable.just(\"World\") .startWith(\"Hello\") .subscribe(System.out::println); // Hello, World
-
switchOnNext:切换Observable流
Observable<Observable<String>> observables = Observable.just( Observable.interval(100, TimeUnit.MILLISECONDS).map(i -> \"A\" + i), Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> \"B\" + i));Observable.switchOnNext(observables.delay(500, TimeUnit.MILLISECONDS)) .subscribe(System.out::println);
错误处理操作符
-
onErrorReturn:出错时返回默认值
Observable.error(new RuntimeException(\"Error\")) .onErrorReturn(e -> \"Default\") .subscribe(System.out::println); // Default
-
onErrorResumeNext:出错时切换到另一个Observable
Observable.error(new RuntimeException(\"Error\")) .onErrorResumeNext(Observable.just(\"A\", \"B\", \"C\")) .subscribe(System.out::println); // A, B, C
-
retry:重试
Observable.create(emitter -> { System.out.println(\"Attempting\"); emitter.onError(new RuntimeException(\"Failed\"));}).retry(3) .subscribe(System.out::println, System.err::println);
-
retryWhen:条件重试
Observable.error(new RuntimeException(\"Error\")) .retryWhen(errors -> errors.zipWith(Observable.range(1, 3), (e, i) -> i) .flatMap(i -> { System.out.println(\"Retry #\" + i); return Observable.timer(i, TimeUnit.SECONDS); })) .subscribe(System.out::println, System.err::println);
条件操作符
-
all:是否所有元素满足条件
Observable.range(1, 5) .all(i -> i < 10) .subscribe(System.out::println); // true
-
contains:是否包含指定元素
Observable.just(\"A\", \"B\", \"C\") .contains(\"B\") .subscribe(System.out::println); // true
-
isEmpty:是否为空
Observable.empty() .isEmpty() .subscribe(System.out::println); // true
-
defaultIfEmpty:如果为空提供默认值
Observable.empty() .defaultIfEmpty(\"Default\") .subscribe(System.out::println); // Default
-
sequenceEqual:比较两个Observable序列
Observable.sequenceEqual( Observable.just(1, 2, 3), Observable.just(1, 2, 3)).subscribe(System.out::println); // true
数学和聚合操作符
-
count:计数
Observable.range(1, 10) .count() .subscribe(System.out::println); // 10
-
reduce:累积计算
Observable.range(1, 5) .reduce((sum, i) -> sum + i) .subscribe(System.out::println); // 15
-
collect:收集到容器
Observable.range(1, 5) .collect(ArrayList::new, List::add) .subscribe(System.out::println); // [1, 2, 3, 4, 5]
-
toList/toMap/toSet:转换为集合
Observable.just(\"A\", \"B\", \"A\") .toSet() .subscribe(System.out::println); // [A, B]
-
sum/average/max/min:数学运算
Observable.range(1, 5) .map(Integer::doubleValue) .average() .subscribe(System.out::println); // 3.0
实用操作符
-
doOnNext/doOnError/doOnComplete:副作用操作
Observable.just(\"Hello\") .doOnNext(s -> System.out.println(\"About to emit: \" + s)) .doOnComplete(() -> System.out.println(\"Completed\")) .subscribe();
-
materialize/dematerialize:将通知转换为对象
Observable.just(\"Hello\") .materialize() .subscribe(notification -> { if (notification.isOnNext()) { System.out.println(\"Value: \" + notification.getValue()); } else if (notification.isOnComplete()) { System.out.println(\"Completed\"); } });
-
timeInterval/timestamp:添加时间信息
Observable.interval(1, TimeUnit.SECONDS) .take(3) .timeInterval() .subscribe(ti -> System.out.println(\"Value: \" + ti.value() + \", Interval: \" + ti.time() + \"ms\"));
-
cache:缓存发射的数据
Observable<Long> cached = Observable.interval(1, TimeUnit.SECONDS) .take(5) .cache();cached.subscribe(System.out::println); // 开始发射Thread.sleep(3000);cached.subscribe(System.out::println); // 从缓存中获取
-
replay:重放给后续订阅者
ConnectableObservable<Long> replay = Observable.interval(1, TimeUnit.SECONDS) .take(5) .replay();replay.connect(); // 开始发射Thread.sleep(3000);replay.subscribe(System.out::println); // 从开始重放
实际应用示例
-
网络请求组合
Observable<Profile> profileObservable = userService.getProfile(userId);Observable<List<Friend>> friendsObservable = userService.getFriends(userId);Observable.zip(profileObservable, friendsObservable, (profile, friends) -> new UserData(profile, friends)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(userData -> updateUI(userData), error -> showError(error));
-
搜索建议
RxTextView.textChanges(searchInput) .debounce(300, TimeUnit.MILLISECONDS) .filter(text -> text.length() > 2) .distinctUntilChanged() .switchMap(query -> searchService.suggest(query.toString()) .onErrorResumeNext(Observable.empty())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(suggestions -> updateSuggestions(suggestions));
-
轮询检查
Observable.interval(5, TimeUnit.SECONDS) .flatMap(i -> checkStatus() .retryWhen(errors -> errors.delay(1, TimeUnit.SECONDS))) .distinctUntilChanged() .subscribe(status -> updateStatus(status));
-
批量处理
Observable.fromIterable(hugeList) .buffer(100) // 每100个一批 .flatMap(batch -> processBatch(batch).subscribeOn(Schedulers.io())) .subscribe(result -> aggregateResult(result));
-
事件总线
public class RxEventBus { private final PublishSubject<Object> subject = PublishSubject.create(); public void post(Object event) { subject.onNext(event); } public <T> Observable<T> observe(Class<T> eventClass) { return subject.ofType(eventClass); }}
RxJava的操作符非常丰富,掌握这些操作符能够帮助开发者高效处理各种异步数据流场景。实际开发中,应根据具体需求选择合适的操作符组合,同时注意线程调度和资源管理,以构建高效可靠的响应式应用。