EventBus源码分析
1、EventBus是什么?
EventBus是适用于Android和Java的发布/订阅事件总线。发布/订阅也被称为观察者模式。
- 它简化了组件间的通信。
- 事件的发送者和接受者完全解耦。
- 避免了复杂的生命周期问题。
2、EventBus怎么用
分析源码之前,我们先熟悉一下EventBus的用法。 点我打开官网。 首先看到下图:
Publisher(发布者)把Event(事件)post(发送)给EventBus, EventBus把Event传递给Subscriber(订阅者)的 onEvent方法。我们先有一个大概印象,看完下面的源码分析部分,会有更深的理解。
继续往下看,EventBus的使用分为三步:
- 1.定义事件:
//step 1
public class MessageEvent {
}
@Override
public void onStart() {
super.onStart();
//step 2
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
super.onStop();
//step 2
EventBus.getDefault().unregister(this);
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {
Log.d(TAG, "onMessageEvent: ");
}
public void test(View view) {
//step 3
EventBus.getDefault().post(new MessageEvent());
}
3、EventBus源码分析
分析的源码版本是EventBus 3.2.0 版本
我们该从哪里开始分析源码呢?试着从我们最熟悉最常用的代码开始,从EventBus的使用步骤开始。回顾上文介绍的使用步骤:
第一步,定义Event(事件)很简单。
第二步,注册订阅者和订阅方法。这里,我们再细分为三小步分析,第一小步就是注册订阅者:
EventBus.getDefault().unregister(this);
首先进入getDefault()方法内部
//volatile修饰的变量对它的修改会立刻刷新到主存,比synchronized和Lock开销小
static volatile EventBus defaultInstance;
/**
* Convenience singleton for apps using a process-wide EventBus instance.
*/
public static EventBus getDefault() {
//单例模式
EventBus instance = defaultInstance;
if (instance == null) {
//synchronized 代码块保证线程安全
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
getDefault()方法采用单例模式,synchronized关键字保证了线程安全。另外 defaultInstance变量使用volatile关键字修饰。 volatile修饰的变量对它的修改会立刻刷新到主存,比synchronized和Lock开销小。感兴趣的小伙伴请查阅其他资料。
getDefault()返回EventBus实例,通过该实例调用register()
/**
* Registers the given subscriber to receive events.
* Subscribers must call {@link #unregister(Object)}
* once they are no longer interested in receiving events.
*
* Subscribers have event handling methods that must be
* annotated by {@link Subscribe}.
* The {@link Subscribe} annotation also allows configuration
* like {@linkThreadMode} and priority.
*/
public void register(Object subscriber) {
//Subscriber(订阅者)
Class<?> subscriberClass = subscriber.getClass();
//找出Subscriber(订阅者)内部所有的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder
.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//建立订阅关系,并用Map集合缓存数据
subscribe(subscriber, subscriberMethod);
}
}
}
register方法接收一个订阅者,常见情况就是一个Activity或者Fragment。 通过subscriberMethodFinder类的对象找到订阅者所有的订阅方法, 订阅方法就是指使用@Subscrib注解修饰的方法。
另外方法注释中提到:register()和Unregister()必须成对出现。 @Subscribe注解可配置 threadMode 和 priority 参数
register()内部遍历调用subscribe(),我们来分析一下这个方法:
/**
* key为事件类型。就是我们前面定义Event类。value是Subscription类的集合。
* Subscription类表示订阅,把每一个订阅方法和其所在的订阅者封装为对象。
*/
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
//key是订阅者,value是其订阅的(事件类型)集合
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 必须在同步块中调用
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//订阅方法的参数类型
Class<?> eventType = subscriberMethod.eventType;
//订阅方法和他所在的订阅者封装为一个Subscription类的对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//把订阅方法缓存到Map中
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass()
+ " already registered to event "+ eventType);
}
}
//遍历集合,并按优先级降序重新排列
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//key是订阅者,value是其订阅的(事件类型)集合
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//如果是粘性事件
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
该方法接受两个参数,第一个参数是订阅者,第二个参数是SubscriberMethod类的对象,SubscriberMethod表示订阅者内部的订阅方法。 并把两个参数封装为一个Subscription类的对象。通过成员变量subscriptionsByEventType和typesBySubscriber缓存数据。 关于源码中的粘性事件部分,下一篇文章在继续学习。
第一小步注册订阅者已经分析完了,下面我们来分析第二小步:订阅者取消注册。回想EventBus的使用步骤,取消注册只有一个核心方法。unregister(this):
/**
* Unregisters the given subscriber from all event classes.
*/
public synchronized void unregister(Object subscriber) {
//获取该订阅者订阅的所有事件类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
//调用unregister()前必须调用registered()方法
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//遍历调用。参数为订阅者和订阅类型
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: "
+ subscriber.getClass());
}
}
/**
* 更新且只更新subscriptionsByEventType
*/
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//获取该eventType的所有订阅者。
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
//缓存中的订阅者 等于 传入的订阅者,表示该缓存失效,需要清除。
if (subscription.subscriber == subscriber) {
//subscription.active标记为false,并从subscriptionse移除该对象
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
接下来第三小步:注册订阅方法:
@Subscribe(threadMode = ThreadMode.MAIN, priority = 0, sticky = true)
public void onMessageEvent(MessageEvent event) {
Log.d(TAG, "onMessageEvent: ");
}
注册订阅方法很简单,用@Subscribe注释方法,就完成了注册。 Subscribe有如下三个属性可以配置:
- threadMode:线程模型,收到事件时,该方法在那个线程执行。
- priority:优先级。
- sticky:true表示接受粘性事件
第二步注册已经讲完了,接下来进入第三步:发送事件。其核心方法也是只有一个post(new MessageEvent());
//Thread Local 修饰的变量线程不共享。变量在第一次调用get()时初始化,此后变量值不再改变
private final ThreadLocal currentPostingThreadState =
new ThreadLocal() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
/**
* 将给定事件发布到事件总线。
*/
public void post(Object event) {
//下面三行代码,把event加入队列,等待订阅者处理
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
//防止重复开启循环
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//传入当前需要处理的事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
处理事件的队列在PostingThreadState类中:
/**
* For ThreadLocal, much faster to set (and get multiple values).
*/
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();//事件队列
boolean isPosting;//是否在发送中
boolean isMainThread;//是否是主线程
Subscription subscription;//当前正在处理的订阅关系,执行订阅方法
Object event;//当前正在处理的事件
boolean canceled;//是否取消事件
}
回到Post方法中,其内部循环调用了postSingleEvent(),我们进入该方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//事件类型
Class<?> eventClass = event.getClass();
//false表示该事件没有订阅者
boolean subscriptionFound = false;
//默认为true
if (eventInheritance) {
//根据event对象获取它所有的父类和实现的接口
List<Class<?>>eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//发送事件
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//事件没有订阅者时
if (!subscriptionFound) {
if (logNoSubscriberMessages) {//日志开关,默认true
//打印日志
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState,
Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//所有订阅了eventClass的方法集合
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//遍历执行所有的订阅方法
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//订阅方法上设置的线程模型,大部分情况都是ThreadMode.MAIN。
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
//发射调用订阅方法
invokeSubscriber(subscription, event);
} else {
//把事件发送到主线程执行
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: "
+ subscription.subscriberMethod.threadMode);
}
}
EventBus的工作流程我们已经从源码的角度梳理了一遍。关于源码中的粘性事件和线程模型部分, 将在下一篇博客继续分析。