> 技术文档 > React编程入门示例:RxJava深度解析

React编程入门示例:RxJava深度解析

React编程入门示例:RxJava深度解析

文章目录

    • 4.1 RxJava示例
      • 4.1.1 创建Observable
        • 基本创建方式
        • 高级创建方式
        • 特殊用途Observable
      • 4.1.2 订阅Observer
        • 基本订阅方式
        • 背压处理
        • 调度器控制
        • 组合订阅
      • 4.1.3 使用操作
        • 转换操作符
        • 过滤操作符
        • 组合操作符
        • 错误处理操作符
        • 条件操作符
        • 数学和聚合操作符
        • 实用操作符![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/453b1e743b334afb961ea96a8edf996e.png)
        • 实际应用示例

React编程入门示例:RxJava深度解析

4.1 RxJava示例

RxJava是ReactiveX在Java虚拟机上的实现,它使用可观察序列来构建异步和基于事件的程序。RxJava提供了丰富的操作符来处理异步数据流,使开发者能够以声明式的方式组合异步操作。
React编程入门示例:RxJava深度解析

4.1.1 创建Observable

Observable是RxJava中的基本构建块,代表一个可观察的数据源,能够发射0到N个数据项,然后可能以一个完成或错误通知终止。

基本创建方式
  1. Observable.just():创建一个发射指定值的Observable

    Observable<String> observable = Observable.just(\"Hello\", \"World\");
  2. Observable.fromIterable():从集合创建Observable

    List<String> list = Arrays.asList(\"Apple\", \"Banana\", \"Cherry\");Observable<String> observable = Observable.fromIterable(list);
  3. Observable.range():创建一个发射特定整数序列的Observable

    Observable<Integer> observable = Observable.range(1, 5); // 1, 2, 3, 4, 5
  4. Observable.interval():创建一个按固定时间间隔发射整数序列的Observable

    Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS); // 0, 1, 2... 每秒
  5. Observable.create():自定义Observable创建

    Observable<String> observable = Observable.create(emitter -> { emitter.onNext(\"First\"); emitter.onNext(\"Second\"); emitter.onComplete();});
  6. Observable.empty():创建一个不发射任何数据但正常终止的Observable

    Observable<String> observable = Observable.empty();
  7. Observable.error():创建一个不发射任何数据但以错误终止的Observable

    Observable<String> observable = Observable.error(new RuntimeException(\"Error occurred\"));
  8. Observable.never():创建一个不发射任何数据也不终止的Observable

    Observable<String> observable = Observable.never();
高级创建方式

React编程入门示例:RxJava深度解析

  1. Observable.defer():延迟创建,直到有观察者订阅

    Observable<Long> observable = Observable.defer(() -> Observable.just(System.currentTimeMillis()));
  2. Observable.fromCallable():从Callable创建,适合可能有异常抛出的场景

    Observable<String> observable = Observable.fromCallable(() -> { if (Math.random() > 0.5) { throw new IOException(\"Random error\"); } return \"Success\";});
  3. Observable.fromFuture():从Future创建

    Future<String> future = Executors.newSingleThreadExecutor() .submit(() -> \"Result from Future\");Observable<String> observable = Observable.fromFuture(future);
  4. Observable.generate():同步生成复杂流

    Observable<Integer> observable = Observable.generate( () -> 0, // 初始状态 (state, emitter) -> { emitter.onNext(state); if (state == 10) { emitter.onComplete(); } return state + 1; });
  5. Observable.merge():合并多个Observable

    Observable<String> first = Observable.just(\"A\", \"B\", \"C\");Observable<String> second = Observable.just(\"1\", \"2\", \"3\");Observable<String> merged = Observable.merge(first, second);
  6. Observable.concat():顺序连接多个Observable

    Observable<String> concatenated = Observable.concat(first, second);
  7. Observable.zip():组合多个Observable

    Observable<String> zipped = Observable.zip(first, second, (f, s) -> f + s);
特殊用途Observable

React编程入门示例:RxJava深度解析

  1. Subject:既是Observable又是Observer

    • PublishSubject:向所有订阅者广播所有后续事件

      PublishSubject<String> subject = PublishSubject.create();subject.onNext(\"Hello\");subject.subscribe(System.out::println); // 不会收到\"Hello\"subject.onNext(\"World\"); // 会打印\"World\"
    • BehaviorSubject:向新订阅者发送最近的一个事件

      BehaviorSubject<String> subject = BehaviorSubject.createDefault(\"Initial\");subject.onNext(\"First\");subject.subscribe(System.out::println); // 立即收到\"First\"
    • ReplaySubject:向所有订阅者重放所有事件

      ReplaySubject<String> subject = ReplaySubject.create();subject.onNext(\"Hello\");subject.subscribe(System.out::println); // 会收到\"Hello\"
    • AsyncSubject:只在完成时发送最后一个事件

      AsyncSubject<String> subject = AsyncSubject.create();subject.onNext(\"Hello\");subject.onNext(\"World\");subject.subscribe(System.out::println); // 不会立即收到subject.onComplete(); // 会打印\"World\"
  2. ConnectableObservable:需要调用connect()才开始发射数据

    ConnectableObservable<String> connectable = Observable.just(\"A\", \"B\", \"C\").publish();connectable.subscribe(System.out::println);connectable.connect(); // 此时才开始发射数据

4.1.2 订阅Observer

React编程入门示例:RxJava深度解析
Observer是RxJava中的消费者,用于接收Observable发射的数据和通知。

基本订阅方式
  1. 简单订阅

    Observable.just(\"Hello\").subscribe();
  2. 带Consumer的订阅

    Observable.just(\"Hello\") .subscribe( value -> System.out.println(\"Received: \" + value), // onNext error -> System.err.println(\"Error: \" + error), // onError () -> System.out.println(\"Completed\") // onComplete );
  3. 使用Observer接口

    Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println(\"Subscribed\"); } @Override public void onNext(String s) { System.out.println(\"Received: \" + s); } @Override public void onError(Throwable e) { System.err.println(\"Error: \" + e); } @Override public void onComplete() { System.out.println(\"Completed\"); }};Observable.just(\"Hello\", \"World\").subscribe(observer);
  4. 使用Disposable控制订阅

    Disposable disposable = Observable.interval(1, TimeUnit.SECONDS) .subscribe(System.out::println);// 在需要时取消订阅disposable.dispose();
背压处理

React编程入门示例:RxJava深度解析
RxJava 2.x引入了Flowable来处理背压(Backpressure),当数据生产速度大于消费速度时:

  1. Flowable基本使用

    Flowable.range(1, 1000) .onBackpressureBuffer() // 缓冲策略 .observeOn(Schedulers.io()) .subscribe(System.out::println);
  2. 背压策略

    • BUFFER:缓冲所有数据(可能导致OOM)
    • DROP:丢弃无法处理的数据
    • LATEST:只保留最新数据
    • ERROR:抛出MissingBackpressureException
    • MISSING:不实现背压,由下游处理
    Flowable.interval(1, TimeUnit.MILLISECONDS) .onBackpressureDrop(dropped -> System.out.println(\"Dropped: \" + dropped)) .observeOn(Schedulers.computation()) .subscribe(System.out::println);
调度器控制

RxJava使用Schedulers控制线程:

  1. subscribeOn:指定Observable操作执行的线程

    Observable.just(\"Hello\") .subscribeOn(Schedulers.io()) // 在IO线程执行 .subscribe(System.out::println);
  2. observeOn:指定Observer接收数据的线程

    Observable.range(1, 5) .observeOn(Schedulers.computation()) // 在计算线程接收 .subscribe(System.out::println);
  3. 常用调度器

    • Schedulers.io():适合I/O操作(无界线程池)
    • Schedulers.computation():适合计算操作(固定大小线程池)
    • Schedulers.newThread():为每个任务创建新线程
    • Schedulers.single():单一线程顺序执行
    • Schedulers.trampoline():在当前线程排队执行
    • Schedulers.from(Executor):自定义Executor
组合订阅

React编程入门示例:RxJava深度解析

  1. 合并多个订阅

    Observable<String> first = Observable.just(\"A\", \"B\", \"C\");Observable<String> second = Observable.just(\"1\", \"2\", \"3\");Observable.merge(first, second) .subscribe(System.out::println);
  2. 条件订阅

    Observable<String> source = Observable.just(\"Hello\", \"World\", \"Error\");source.flatMap(s -> { if (\"Error\".equals(s)) { return Observable.error(new RuntimeException(\"Error encountered\")); } return Observable.just(s.toUpperCase());}).subscribe(System.out::println, System.err::println);
  3. 资源管理

    Observable.using( () -> new FileInputStream(\"file.txt\"), // 创建资源 inputStream -> Observable.just(readFile(inputStream)), // 使用资源 inputStream -> inputStream.close() // 释放资源).subscribe(System.out::println);

4.1.3 使用操作符

RxJava提供了数百个操作符来处理Observable流,下面介绍最常用的几类操作符。

转换操作符
  1. map:对每个元素应用函数

    Observable.just(\"Hello\", \"World\") .map(String::toUpperCase) .subscribe(System.out::println);
  2. flatMap:将每个元素转换为Observable并合并

    Observable.just(\"Hello\", \"World\") .flatMap(s -> Observable.fromArray(s.split(\"\"))) .subscribe(System.out::println);
  3. concatMap:类似flatMap但保持顺序

    Observable.just(\"Hello\", \"World\") .concatMap(s -> Observable.fromArray(s.split(\"\"))) .subscribe(System.out::println);
  4. switchMap:只保留最新的Observable

    Observable.just(\"Hello\", \"World\") .switchMap(s -> Observable.interval(100, TimeUnit.MILLISECONDS) .map(i -> s + \" \" + i) .take(5)) .subscribe(System.out::println);
  5. cast:强制类型转换

    Observable<Object> objObs = Observable.just(\"Hello\");Observable<String> strObs = objObs.cast(String.class);
  6. scan:累加器函数

    Observable.range(1, 5) .scan((sum, item) -> sum + item) .subscribe(System.out::println); // 1, 3, 6, 10, 15
  7. groupBy:按条件分组

    Observable.just(\"Apple\", \"Banana\", \"Cherry\", \"Date\") .groupBy(s -> s.length()) .flatMapSingle(group -> group.toList() .map(list -> group.getKey() + \": \" + list)) .subscribe(System.out::println);
过滤操作符

React编程入门示例:RxJava深度解析

  1. filter:基于条件过滤

    Observable.range(1, 10) .filter(i -> i % 2 == 0) .subscribe(System.out::println); // 2, 4, 6, 8, 10
  2. take:取前N个元素

    Observable.interval(1, TimeUnit.SECONDS) .take(5) .subscribe(System.out::println); // 0, 1, 2, 3, 4
  3. skip:跳过前N个元素

    Observable.range(1, 10) .skip(5) .subscribe(System.out::println); // 6, 7, 8, 9, 10
  4. distinct:去重

    Observable.just(1, 2, 2, 3, 1, 4) .distinct() .subscribe(System.out::println); // 1, 2, 3, 4
  5. distinctUntilChanged:过滤连续重复

    Observable.just(1, 1, 2, 2, 3, 1, 1, 4) .distinctUntilChanged() .subscribe(System.out::println); // 1, 2, 3, 1, 4
  6. first/last:取第一个/最后一个元素

    Observable.range(1, 10) .first(0) // 默认值 .subscribe(System.out::println); // 1
  7. elementAt:取指定位置的元素

    Observable.range(1, 10) .elementAt(5) // 索引从0开始 .subscribe(System.out::println); // 6
  8. sample/throttleLast:定期采样

    Observable.interval(100, TimeUnit.MILLISECONDS) .sample(1, TimeUnit.SECONDS) .subscribe(System.out::println); // 大约每秒一个数
  9. debounce/throttleWithTimeout:防抖动

    Observable.create(emitter -> { emitter.onNext(\"H\"); Thread.sleep(100); emitter.onNext(\"He\"); Thread.sleep(200); emitter.onNext(\"Hel\"); Thread.sleep(300); emitter.onNext(\"Hell\"); Thread.sleep(400); emitter.onNext(\"Hello\");}).debounce(350, TimeUnit.MILLISECONDS) .subscribe(System.out::println); // 只输出\"Hello\"
组合操作符

React编程入门示例:RxJava深度解析

  1. merge:合并多个Observable

    Observable<String> first = Observable.interval(1, TimeUnit.SECONDS) .map(i -> \"First: \" + i);Observable<String> second = Observable.interval(750, TimeUnit.MILLISECONDS) .map(i -> \"Second: \" + i);Observable.merge(first, second) .subscribe(System.out::println);
  2. concat:顺序连接多个Observable

    Observable.concat( Observable.just(\"First\", \"Second\"), Observable.just(\"Third\", \"Fourth\")).subscribe(System.out::println); // First, Second, Third, Fourth
  3. zip:组合多个Observable

    Observable<String> letters = Observable.just(\"A\", \"B\", \"C\");Observable<Integer> numbers = Observable.just(1, 2, 3);Observable.zip(letters, numbers, (l, n) -> l + n) .subscribe(System.out::println); // A1, B2, C3
  4. combineLatest:当任一Observable发射时组合最新值

    Observable<String> letters = Observable.interval(1, TimeUnit.SECONDS) .map(i -> \"Letter\" + (char)(i + 65));Observable<Integer> numbers = Observable.interval(750, TimeUnit.MILLISECONDS) .map(i -> i + 1);Observable.combineLatest(letters, numbers, (l, n) -> l + n) .subscribe(System.out::println);
  5. withLatestFrom:类似combineLatest但由主Observable触发

    letters.withLatestFrom(numbers, (l, n) -> l + n) .subscribe(System.out::println);
  6. startWith:在Observable开始前插入数据

    Observable.just(\"World\") .startWith(\"Hello\") .subscribe(System.out::println); // Hello, World
  7. switchOnNext:切换Observable流

    Observable<Observable<String>> observables = Observable.just( Observable.interval(100, TimeUnit.MILLISECONDS).map(i -> \"A\" + i), Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> \"B\" + i));Observable.switchOnNext(observables.delay(500, TimeUnit.MILLISECONDS)) .subscribe(System.out::println);
错误处理操作符

React编程入门示例:RxJava深度解析

  1. onErrorReturn:出错时返回默认值

    Observable.error(new RuntimeException(\"Error\")) .onErrorReturn(e -> \"Default\") .subscribe(System.out::println); // Default
  2. onErrorResumeNext:出错时切换到另一个Observable

    Observable.error(new RuntimeException(\"Error\")) .onErrorResumeNext(Observable.just(\"A\", \"B\", \"C\")) .subscribe(System.out::println); // A, B, C
  3. retry:重试

    Observable.create(emitter -> { System.out.println(\"Attempting\"); emitter.onError(new RuntimeException(\"Failed\"));}).retry(3) .subscribe(System.out::println, System.err::println);
  4. retryWhen:条件重试

    Observable.error(new RuntimeException(\"Error\")) .retryWhen(errors -> errors.zipWith(Observable.range(1, 3), (e, i) -> i) .flatMap(i -> { System.out.println(\"Retry #\" + i); return Observable.timer(i, TimeUnit.SECONDS); })) .subscribe(System.out::println, System.err::println);
条件操作符
  1. all:是否所有元素满足条件

    Observable.range(1, 5) .all(i -> i < 10) .subscribe(System.out::println); // true
  2. contains:是否包含指定元素

    Observable.just(\"A\", \"B\", \"C\") .contains(\"B\") .subscribe(System.out::println); // true
  3. isEmpty:是否为空

    Observable.empty() .isEmpty() .subscribe(System.out::println); // true
  4. defaultIfEmpty:如果为空提供默认值

    Observable.empty() .defaultIfEmpty(\"Default\") .subscribe(System.out::println); // Default
  5. sequenceEqual:比较两个Observable序列

    Observable.sequenceEqual( Observable.just(1, 2, 3), Observable.just(1, 2, 3)).subscribe(System.out::println); // true
数学和聚合操作符

React编程入门示例:RxJava深度解析

  1. count:计数

    Observable.range(1, 10) .count() .subscribe(System.out::println); // 10
  2. reduce:累积计算

    Observable.range(1, 5) .reduce((sum, i) -> sum + i) .subscribe(System.out::println); // 15
  3. collect:收集到容器

    Observable.range(1, 5) .collect(ArrayList::new, List::add) .subscribe(System.out::println); // [1, 2, 3, 4, 5]
  4. toList/toMap/toSet:转换为集合

    Observable.just(\"A\", \"B\", \"A\") .toSet() .subscribe(System.out::println); // [A, B]
  5. sum/average/max/min:数学运算

    Observable.range(1, 5) .map(Integer::doubleValue) .average() .subscribe(System.out::println); // 3.0
实用操作符React编程入门示例:RxJava深度解析
  1. doOnNext/doOnError/doOnComplete:副作用操作

    Observable.just(\"Hello\") .doOnNext(s -> System.out.println(\"About to emit: \" + s)) .doOnComplete(() -> System.out.println(\"Completed\")) .subscribe();
  2. materialize/dematerialize:将通知转换为对象

    Observable.just(\"Hello\") .materialize() .subscribe(notification -> { if (notification.isOnNext()) { System.out.println(\"Value: \" + notification.getValue()); } else if (notification.isOnComplete()) { System.out.println(\"Completed\"); } });
  3. timeInterval/timestamp:添加时间信息

    Observable.interval(1, TimeUnit.SECONDS) .take(3) .timeInterval() .subscribe(ti -> System.out.println(\"Value: \" + ti.value() + \", Interval: \" + ti.time() + \"ms\"));
  4. cache:缓存发射的数据

    Observable<Long> cached = Observable.interval(1, TimeUnit.SECONDS) .take(5) .cache();cached.subscribe(System.out::println); // 开始发射Thread.sleep(3000);cached.subscribe(System.out::println); // 从缓存中获取
  5. replay:重放给后续订阅者

    ConnectableObservable<Long> replay = Observable.interval(1, TimeUnit.SECONDS) .take(5) .replay();replay.connect(); // 开始发射Thread.sleep(3000);replay.subscribe(System.out::println); // 从开始重放
实际应用示例

React编程入门示例:RxJava深度解析

  1. 网络请求组合

    Observable<Profile> profileObservable = userService.getProfile(userId);Observable<List<Friend>> friendsObservable = userService.getFriends(userId);Observable.zip(profileObservable, friendsObservable, (profile, friends) -> new UserData(profile, friends)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(userData -> updateUI(userData), error -> showError(error));
  2. 搜索建议

    RxTextView.textChanges(searchInput) .debounce(300, TimeUnit.MILLISECONDS) .filter(text -> text.length() > 2) .distinctUntilChanged() .switchMap(query -> searchService.suggest(query.toString()) .onErrorResumeNext(Observable.empty())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(suggestions -> updateSuggestions(suggestions));
  3. 轮询检查

    Observable.interval(5, TimeUnit.SECONDS) .flatMap(i -> checkStatus() .retryWhen(errors -> errors.delay(1, TimeUnit.SECONDS))) .distinctUntilChanged() .subscribe(status -> updateStatus(status));
  4. 批量处理

    Observable.fromIterable(hugeList) .buffer(100) // 每100个一批 .flatMap(batch -> processBatch(batch).subscribeOn(Schedulers.io())) .subscribe(result -> aggregateResult(result));
  5. 事件总线

    public class RxEventBus { private final PublishSubject<Object> subject = PublishSubject.create(); public void post(Object event) { subject.onNext(event); } public <T> Observable<T> observe(Class<T> eventClass) { return subject.ofType(eventClass); }}

RxJava的操作符非常丰富,掌握这些操作符能够帮助开发者高效处理各种异步数据流场景。实际开发中,应根据具体需求选择合适的操作符组合,同时注意线程调度和资源管理,以构建高效可靠的响应式应用。