EventBus源码分析2
通过EventBus源码分析1的学习,我们已经知道了EventBus源码是如何实现发布订阅流程的。今天我们来继续学习EventBus 的粘性事件和线程模型
粘性事件
这里不介绍粘性事件的使用方法,不了解的同学,请自行学习。直接进入源码分析部分,EventBus使用postSticky()发送粘性事件:
/**
* 缓存粘性事件,Map集合的key 为事件类型,value 就是事件本身。
* 因为采用map的数据结构,所以每一个类型,只能缓存一个对象。
*/
private final Map<Class<?>, Object> stickyEvents;
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
//和普通事件一样的发送逻辑
post(event);
}
post()是如何工作,请参考EventBus源码分析1。可以看到在发送过程种,粘性事件比普通事件,只是多了一步缓存操作。 接下来复用了普通post()方法。
继续下一个问题,粘性事件如何接收?
- 第一种:postSticky()内部调用了post(),所以会走普通事件的分发流程。调用postSticky()前,如果注册了订阅方法。 该方法将会执行。
- 第二种:注册订阅者和订阅方法时,如果内存中有匹配的事件,订阅方法会被执行。
通过上一篇文章的分析,我们知道,注册订阅者时,会调用subscribe()。在subscribe() 方法内有如下代码块:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { ...... // subscriberMethod.sticky = true 表示订阅方法接收粘性事件 if (subscriberMethod.sticky) { // eventInheritance默认为true 表示会匹配Event的父类 if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.EntryClass<?>, Object> entry : entries) {//遍历所有的粘性事件 Class<?> candidateEventType = entry.getKey(); // 表达式为true,表示eventType与candidateEventType相同的类,或者是其父类 if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { //取出粘性事件,进行处理 Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null) {// 判断非空 postToSubscription(newSubscription, stickyEvent, isMainThread()); } }
isAssignableFrom()返回一个boolean值,true表示方法接受到的参数对象,可以强转为该方法的调用者。从这里我们可以知道:订阅方法订阅A类的事件, 源码部分默认我们订阅了A类的子类的事件。
subscribe()内部调用checkPostStickyEventToSubscription(),改方法内部简单判空,继续调用postToSubscription(),执行订阅方法。
最后一个问题,取消粘性事件。缓存粘性事件会消耗内存资源,那要如何取消粘性事件呢?EventBus提供了下面三个方法:
//根据事件类型,移除粘性事件
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (stickyEvents) {
//把粘性事件从Map中移除,并强转为EventType表示的类的对象
return eventType.cast(stickyEvents.remove(eventType));
}
}
//根据给定的事件,删除粘性事件。
//return true 如果事件匹配并且粘性事件已删除。
public boolean removeStickyEvent(Object event) {
//把粘性事件从Map中移除
synchronized (stickyEvents) {
Class<?> eventType = event.getClass();
Object existingEvent = stickyEvents.get(eventType);
if (event.equals(existingEvent)) {
stickyEvents.remove(eventType);
return true;
} else {
return false;
}
}
}
// 删除所有粘性事件。
public void removeAllStickyEvents() {
//清空map集合
synchronized (stickyEvents) {
stickyEvents.clear();
}
}
Tips:粘性事件使用完毕,要及时清除,释放内存资源。
到这里,EventBus源码中粘性事件的部分就讲解完了。下面我们继续分析EventBus的线程模型。
threadMode/线程模型
EventBus最常使用线程的地方就是订阅方法,我们就从这里开始:
@Subscribe(threadMode = ThreadMode.BACKGROUND, priority = 0, sticky = true)
public void onMessageEvent(Object event) {
Log.d(TAG, "onMessageEvent: ");
}
通过ThreadMode字段,我们可以指定线程。该字段是一个枚举类型。那我们就来看看这个枚举类,内部有多少个常量。
public enum ThreadMode {
/**
* 发送事件、和执行订阅方法在同一个线程。没有线程切换,开销小。
* 默认值。
* 如果在主线程发送事件,订阅方法内部不要执行耗时操作,避免阻塞主线程
*/
POSTING,
/**
* 订阅方法在主线程执行,非Android系统,等于POSTING
* 发送事件在主线程,不会涉及到线程切换。
* 如果在其他线程发布事件,事件会被发送到消息队列,等待处理
*/
MAIN,
/**
* 不管发送事件在那个线程,订阅方法一定在主线程执行。
* 使用消息队列处理事件。
*/
MAIN_ORDERED,
/**
* 在后台线程,执行订阅方法。
* 发送的线程不是主线程,那么发送事件,和执行订阅方法,在同一个线程
* 在主线程发送事件,EventBus会启用线程池来执行订阅方法。
*/
BACKGROUND,
/**
* 总是启用另一个线程(非发布线程和主线程),来执行订阅方法。
* 通过线程池管理线程
* 该模式适合用来处理耗时操作,比如数据库操作等。
*/
ASYNC
}
回顾EventBus的post流程我们发现,和ThreadMode相关的部分都在postToSubscription()中,我们就来分析一下这个方法:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//订阅方法上设置ThreadMode 属性
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
//反射调用订阅方法
invokeSubscriber(subscription, event);
} else {
//把事件发送到主线程执行 HandlerPoster
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not
// decoupled from subscriber
// 不正确的代码,不用管,上面英文注释,完全复制源码中的注释
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
//发布事件和执行订阅方法在同一个线程
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: "
+ subscription.subscriberMethod.threadMode);
}
}
仔细对照postToSubscription()源码,可以很好的验证我们对于线程模型的理解。该方法内部使用了以下字段,我们一个一个分析。
//主线程接口类,Android上就是UI线程
private final MainThreadSupport mainThreadSupport;
//继承Handler,负责向MainLooper发送消息
private final Poster mainThreadPoster;
//后台线程
private final BackgroundPoster backgroundPoster;
//异步线程
private final AsyncPoster asyncPoster;
//线程池,复用线程
private final ExecutorService executorService;
EventBus的构造方法中,对这些变量进行了初始化。
EventBus(EventBusBuilder builder) {
......
mainThreadSupport = builder.getMainThreadSupport();
//主线程Post
mainThreadPoster = mainThreadSupport != null ?
mainThreadSupport.createPoster(this) : null;
//后台线程Post
backgroundPoster = new BackgroundPoster(this);
//异步线程Post
asyncPoster = new AsyncPoster(this);
......
//初始化线程池
executorService = builder.executorService;
}
mainThreadSupport & mainThreadPoster
构造方法中,调用了getMainThreadSupport()获取mainThreadSupport对象。该方法内部获取并保存MainLooper。
MainThreadSupport getMainThreadSupport() {
if (mainThreadSupport != null) {
return mainThreadSupport;
//判断Android系统Log类是否可用
} else if (AndroidLogger.isAndroidLogAvailable()) {
// 获取主线程的Looper
Object looperOrNull = getAndroidMainLooperOrNull();
return looperOrNull == null ? null :
new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
} else {
return null;
}
}
/**
* “主”线程的接口,可以是您喜欢的任何线程。通常在Android上,使用Android的主线程。
*/
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
//主线程Looper
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
//获取当前线程的Looper 并判断是不是等于主线程的Looper
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
}
createPoster()方法负责创建mainThreadPoster对象。HandlerPoster继承Handler,和主线程的Looper绑定。提供了public的enqueue(),用来发送事件。 源码中细节比较多,这里无法面面俱到。还请参阅源码阅读。我们来看一下比较关键的handleMessage()方法。
@Override
public void handleMessage(Message msg) {
//临时标记变量,表示消息是否处理完毕
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//从队列中取出消息
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
//再次检查,这次同步
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//执行订阅流程
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException(
"Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
//return前,执行的最后一句代码
handlerActive = rescheduled;
}
}
handlerActive为true表示mainThreadPoster在工作中,事件入列,等待处理。 为false,事件入列后,还需要向MainLooper发送Message。启动handleMessage(),开始工作。
当我们在非主线程发送消息时,事件一步步传递到postToSubscription()内,如果的订阅方法的ThreadMode == MAIN 或者 == MAIN_ORDERED, 事件将通过mainThreadPoster.enqueue()入列,等待处理。mainThreadPoster本质上是一个和主线程Looper绑定的Handler。
BackgroundPoster
final class BackgroundPoster implements Runnable, Poster {
//消息队列
private final PendingPostQueue queue;
private final EventBus eventBus;
//当前处理事件的线程 是否在工作中
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
//用线程池 执行 这个Runnable
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
//return前,执行的最后一句代码
executorRunning = false;
}
}
}
BackgroundPoster继承Runnable和Poster,通过enqueue()将消息入列,通过线程池来执行Runnable。 通过executorRunning字段进行简单的判断。是否有正在的工作的线程。有的话直接复用。
asyncPoster
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
//打开另外一个线程,执行run()
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
//执行订阅方法
eventBus.invokeSubscriber(pendingPost);
}
}
AsyncPoster和BackgroundPoster很像,且更简单。每次有需要执行的订阅方法。直接从线程池取线程执行。
到这里,EventBus的粘性事件和线程模型我们就分析完毕了。加上 第一篇文章,EventBus核心功能的源码,我们已经分析完毕。