RxJava 内存泄漏

1. RxJava 内存泄漏原因

  • Disposable 基本原理:

(1)这里仅看下 Observer 的执行,如在主线程执行 Observer,会走 Observable 的 observeOn 方法,然后会把 Observable 包装成 ObservableObserveOn。 当被订阅者如 PublishSubject 通过 onNext 发送事件时,会调用 ObservableObserveOn 中的订阅者 ObserveOnObserver 的 onNext 方法。之后会通过 Worker 来执行 ObserveOnObserver(实现了Runnable接口) 的 run() 方法。

(2)Worker 是每个动作的执行者,通过线程池执行,而 Worker 的创建和管理是通过 Schedulers 来完成的,在 subscribe 订阅时 Schedulers 负责创建该订阅者执行的 Worker。Schedulers 是我们在使用 RxJava 时指定的,如 Schedulers.io(),Schedulers.computation() 或 AndroidSchedulers.mainThread()。 这里以 Schedulers.computation() 创建的 Worker 为例:

ObserveOnObserver.oNext() –> EventLoopWorker.schedule() –> PoolWorker.scheduleActual() –> ScheduledExecutorService.submit()

(3)scheduleActual() 函数中通过线程池执行任务,传入的 Runnable 使用 ScheduledRunnable 包装了一层,ScheduledRunnable 实现了 Callable,通过线程池执行时返回 Future,通过 Future 可以获取任务执行状态以及可以取消任务。 scheduleActual() 返回结果是 ScheduledRunnable,ScheduledRunnable 同时实现了 Disposable 接口,在 dispose() 方法中通过 Future.cancel() 来取消任务执行。

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    // ScheduledRunnable 实现了 Disposable接口,函数返回 Disposable
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            // 通过线程池执行
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
  • 上下游的 Disposable 传递
 上游  Single.just(true)
  |        .delay(2500, TimeUnit.MILLISECONDS)
  |        .subscribeOn(Schedulers.io())
  |        .observeOn(AndroidSchedulers.mainThread())
 下游      .subscribe(new Consumer<Boolean>()

订阅时的基本流程:简单来说就是下游调用 subscribe(),向上游调用subscribe(),上游 subscribe() 中创建 Disposable,再往下游调用 Observer 的 onSubscribe(Disposable),下游会对上游传过来的 Disposable 进行包装,所以最终调用 dispose() 方法时,下游的 dispose() 方法中也会调用上游的 dispose() 方法。

例如:

(1)LambdaObserver 中 DisposableHelper.setOnce(this, d)

(2)ObserveOnObserver 中变量 Disposable upstream;

调用 dispose(),会调用自己的 dispose() 方法和上游的 dispose() 方法

rxjava_memory_dispose.png

2. 解决方法

内存泄漏示例一:

@Override
protected void onStart() {
    super.onStart();
    mDisposable = Single.just(true)
        .delay(2500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Boolean>() {
          @Override
          public void accept(Boolean aBoolean) throws Exception {
            // do something
          }
        }, Functions.emptyConsumer());
}

由于在 subscribe 方法中创建 Consumer 或者 Observer 时,属于匿名内部类,所以会持有外部类对象,若果外部类是 Activity 或者 Fragment 或者 View,当页面销毁或者 View 销毁时,RxJava 的线程还在执行,就会一直持有 Activity 、Fragment 或 View,导致内存泄漏。对于这种情况的解决,就是取消订阅,以及结束 RxJava 的线程执行,保证 RxJava 中的订阅者能够被回收。常见的处理方式有以下三种:

  • 解决方式一:
    @Override
    protected void onDestroy() {
      super.onDestroy();
      // 手动解除订阅
      if (mDisposable != null && !mDisposable.isDisposed()) {
        mDisposable.dispose();
        mDisposable = null;
      }
    }
    
  • 解决方式二:使用 CompositeDisposable

CompositeDisposable 是一个 disposable 的容器,可以容纳多个 disposable,添加和去除的复杂度为O(1)。

// 成员变量
  private CompositeDisposable mCompositeDisposable = new CompositeDisposable();
  protected void onStart() {
    super.onStart();
    mCompositeDisposable.add(dispose1);
    mCompositeDisposable.add(dispose2);
    mCompositeDisposable.add(dispose3);
    ...
 }
  @Override
  protected void onDestroy() {
    super.onDestroy();
    // 手动解除订阅
    if (mCompositeDisposable != null && !mCompositeDisposable.isDisposed()) {
      mCompositeDisposable.dispose();
    }
  }
  • 解决方式二:
@Override
protected void onStart() {
    super.onStart();
    mDisposable = Single.just(true)
        .delay(2500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .compose(((BaseActivity)getActivity()).bindUntilEvent(ActivityEvent.DESTROY)) // 根据生命周期自动解除订阅
        .subscribe(new Consumer<Boolean>() {
          @Override
          public void accept(Boolean aBoolean) throws Exception {
            // do something
          }
        }, Functions.emptyConsumer());
}

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦