EventBus源码分析

1、EventBus是什么?

EventBus是适用于Android和Java的发布/订阅事件总线。发布/订阅也被称为观察者模式。

2、EventBus怎么用

分析源码之前,我们先熟悉一下EventBus的用法。 点我打开官网。 首先看到下图:

Publisher(发布者)把Event(事件)post(发送)给EventBus, EventBus把Event传递给Subscriber(订阅者)的 onEvent方法。我们先有一个大概印象,看完下面的源码分析部分,会有更深的理解。


继续往下看,EventBus的使用分为三步:

3、EventBus源码分析

分析的源码版本是EventBus 3.2.0 版本

我们该从哪里开始分析源码呢?试着从我们最熟悉最常用的代码开始,从EventBus的使用步骤开始。回顾上文介绍的使用步骤:


第一步,定义Event(事件)很简单。


第二步,注册订阅者和订阅方法。这里,我们再细分为三小步分析,第一小步就是注册订阅者:

EventBus.getDefault().unregister(this);

首先进入getDefault()方法内部

			
		//volatile修饰的变量对它的修改会立刻刷新到主存,比synchronized和Lock开销小
		static volatile EventBus defaultInstance;

		/**
		* Convenience singleton for apps using a process-wide EventBus instance.
		*/
		public static EventBus getDefault() {
			//单例模式
			EventBus instance = defaultInstance;
			if (instance == null) {
				//synchronized 代码块保证线程安全
				synchronized (EventBus.class) {
					instance = EventBus.defaultInstance;
					if (instance == null) {
						instance = EventBus.defaultInstance = new EventBus();
					}
				}
			}
			return instance;
		}
			
		

getDefault()方法采用单例模式,synchronized关键字保证了线程安全。另外 defaultInstance变量使用volatile关键字修饰。 volatile修饰的变量对它的修改会立刻刷新到主存,比synchronized和Lock开销小。感兴趣的小伙伴请查阅其他资料。


getDefault()返回EventBus实例,通过该实例调用register()

			
		/**
		* Registers the given subscriber to receive events. 
		* Subscribers must call {@link #unregister(Object)} 
		* once they are no longer interested in receiving events.
		* 
		* Subscribers have event handling methods that must be 
		* annotated by {@link Subscribe}.
		* The {@link Subscribe} annotation also allows configuration 
		* like {@linkThreadMode} and priority.
		*/
		public void register(Object subscriber) {
			//Subscriber(订阅者)
			Class<?> subscriberClass = subscriber.getClass();
			//找出Subscriber(订阅者)内部所有的订阅方法
			List<SubscriberMethod> subscriberMethods = subscriberMethodFinder
					.findSubscriberMethods(subscriberClass);
			synchronized (this) {
				for (SubscriberMethod subscriberMethod : subscriberMethods) {
					//建立订阅关系,并用Map集合缓存数据
					subscribe(subscriber, subscriberMethod);
				}
			}
		}
			
		

register方法接收一个订阅者,常见情况就是一个Activity或者Fragment。 通过subscriberMethodFinder类的对象找到订阅者所有的订阅方法, 订阅方法就是指使用@Subscrib注解修饰的方法。

另外方法注释中提到:register()和Unregister()必须成对出现。 @Subscribe注解可配置 threadMode 和 priority 参数


register()内部遍历调用subscribe(),我们来分析一下这个方法:

			
	/**
	* key为事件类型。就是我们前面定义Event类。value是Subscription类的集合。
	* Subscription类表示订阅,把每一个订阅方法和其所在的订阅者封装为对象。
	*/
	private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
	//key是订阅者,value是其订阅的(事件类型)集合
	private final Map<Object, List<Class<?>>> typesBySubscriber;

	// 必须在同步块中调用
	private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
		//订阅方法的参数类型
		Class<?> eventType = subscriberMethod.eventType;
		//订阅方法和他所在的订阅者封装为一个Subscription类的对象
		Subscription newSubscription = new Subscription(subscriber, subscriberMethod);

		//把订阅方法缓存到Map中
		CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
		if (subscriptions == null) {
			subscriptions = new CopyOnWriteArrayList<>();
			subscriptionsByEventType.put(eventType, subscriptions);
		} else {
			if (subscriptions.contains(newSubscription)) {
				throw new EventBusException("Subscriber " + subscriber.getClass() 
				+ " already registered to event "+ eventType);
			}
		}

		//遍历集合,并按优先级降序重新排列
		int size = subscriptions.size();
		for (int i = 0; i <= size; i++) {
			if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
				subscriptions.add(i, newSubscription);
				break;
			}
		}

		//key是订阅者,value是其订阅的(事件类型)集合
		List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
		if (subscribedEvents == null) {
			subscribedEvents = new ArrayList<>();
			typesBySubscriber.put(subscriber, subscribedEvents);
		}
		subscribedEvents.add(eventType);

		//如果是粘性事件
		if (subscriberMethod.sticky) {
			if (eventInheritance) {
				// Existing sticky events of all subclasses of eventType have to be considered.
				// Note: Iterating over all events may be inefficient with lots of sticky events,
				// thus data structure should be changed to allow a more efficient lookup
				// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
				Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
				for (Map.Entry, Object> entry : entries) {
					Class<?> candidateEventType = entry.getKey();
					if (eventType.isAssignableFrom(candidateEventType)) {
						Object stickyEvent = entry.getValue();
						checkPostStickyEventToSubscription(newSubscription, stickyEvent);
					}
				}
			} else {
				Object stickyEvent = stickyEvents.get(eventType);
				checkPostStickyEventToSubscription(newSubscription, stickyEvent);
			}
		}
	}
			
		

该方法接受两个参数,第一个参数是订阅者,第二个参数是SubscriberMethod类的对象,SubscriberMethod表示订阅者内部的订阅方法。 并把两个参数封装为一个Subscription类的对象。通过成员变量subscriptionsByEventTypetypesBySubscriber缓存数据。 关于源码中的粘性事件部分,下一篇文章在继续学习。


第一小步注册订阅者已经分析完了,下面我们来分析第二小步:订阅者取消注册。回想EventBus的使用步骤,取消注册只有一个核心方法。unregister(this):

			
	/**
	* Unregisters the given subscriber from all event classes.
	*/
	public synchronized void unregister(Object subscriber) {
		//获取该订阅者订阅的所有事件类型
		List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
		//调用unregister()前必须调用registered()方法
		if (subscribedTypes != null) {
			for (Class<?> eventType : subscribedTypes) {
				//遍历调用。参数为订阅者和订阅类型
				unsubscribeByEventType(subscriber, eventType);
			}
			typesBySubscriber.remove(subscriber);
		} else {
			logger.log(Level.WARNING, "Subscriber to unregister was not registered before: "
					+ subscriber.getClass());
		}
	}

	/**
	* 更新且只更新subscriptionsByEventType
	*/
	private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
		//获取该eventType的所有订阅者。
		List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
		if (subscriptions != null) {
			int size = subscriptions.size();
			for (int i = 0; i < size; i++) {
				Subscription subscription = subscriptions.get(i);
				//缓存中的订阅者 等于 传入的订阅者,表示该缓存失效,需要清除。
				if (subscription.subscriber == subscriber) {
					//subscription.active标记为false,并从subscriptionse移除该对象
					subscription.active = false;
					subscriptions.remove(i);
					i--;
					size--;
				}
			}
		}
	}
			
		

接下来第三小步:注册订阅方法:

			
		@Subscribe(threadMode = ThreadMode.MAIN, priority = 0, sticky = true)
		public void onMessageEvent(MessageEvent event) {
			Log.d(TAG, "onMessageEvent: ");
		}
			
		

注册订阅方法很简单,用@Subscribe注释方法,就完成了注册。 Subscribe有如下三个属性可以配置:


第二步注册已经讲完了,接下来进入第三步:发送事件。其核心方法也是只有一个post(new MessageEvent());

			
		//Thread Local 修饰的变量线程不共享。变量在第一次调用get()时初始化,此后变量值不再改变
		private final ThreadLocal currentPostingThreadState =
				new ThreadLocal() {
					@Override
					protected PostingThreadState initialValue() {
						return new PostingThreadState();
					}
				};

		/**
		* 将给定事件发布到事件总线。
		*/
		public void post(Object event) {
			//下面三行代码,把event加入队列,等待订阅者处理
			PostingThreadState postingState = currentPostingThreadState.get();
			List<Object> eventQueue = postingState.eventQueue;
			eventQueue.add(event);
	
			//防止重复开启循环
			if (!postingState.isPosting) {
				postingState.isMainThread = isMainThread();
				postingState.isPosting = true;
				if (postingState.canceled) {
					throw new EventBusException("Internal error. Abort state was not reset");
				}
				try {
					while (!eventQueue.isEmpty()) {
						//传入当前需要处理的事件
						postSingleEvent(eventQueue.remove(0), postingState);
					}
				} finally {
					postingState.isPosting = false;
					postingState.isMainThread = false;
				}
			}
		}
				
			

处理事件的队列在PostingThreadState类中:

			
		/**
		* For ThreadLocal, much faster to set (and get multiple values).
		*/
		final static class PostingThreadState {
			final List<Object> eventQueue = new ArrayList<>();//事件队列
			boolean isPosting;//是否在发送中
			boolean isMainThread;//是否是主线程
			Subscription subscription;//当前正在处理的订阅关系,执行订阅方法
			Object event;//当前正在处理的事件
			boolean canceled;//是否取消事件
		}
			
		

回到Post方法中,其内部循环调用了postSingleEvent(),我们进入该方法:

		
	private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
		//事件类型
		Class<?> eventClass = event.getClass();
		//false表示该事件没有订阅者
		boolean subscriptionFound = false;
		//默认为true
		if (eventInheritance) {
			//根据event对象获取它所有的父类和实现的接口
			List<Class<?>>eventTypes = lookupAllEventTypes(eventClass);
			int countTypes = eventTypes.size();
			for (int h = 0; h < countTypes; h++) {
				Class<?> clazz = eventTypes.get(h);
				//发送事件
				subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
			}
		} else {
			subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
		}

		//事件没有订阅者时
		if (!subscriptionFound) {
			if (logNoSubscriberMessages) {//日志开关,默认true
				//打印日志
				logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
			}
			if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
					eventClass != SubscriberExceptionEvent.class) {
				post(new NoSubscriberEvent(this, event));
			}
		}
	}

	private boolean postSingleEventForEventType(Object event, PostingThreadState postingState,
													Class<?> eventClass) {
		CopyOnWriteArrayList<Subscription> subscriptions;
		synchronized (this) {
			//所有订阅了eventClass的方法集合
			subscriptions = subscriptionsByEventType.get(eventClass);
		}

		if (subscriptions != null && !subscriptions.isEmpty()) {
			//遍历执行所有的订阅方法
			for (Subscription subscription : subscriptions) {
				postingState.event = event;
				postingState.subscription = subscription;
				boolean aborted = false;
				try {
					postToSubscription(subscription, event, postingState.isMainThread);
					aborted = postingState.canceled;
				} finally {
					postingState.event = null;
					postingState.subscription = null;
					postingState.canceled = false;
				}
				if (aborted) {
					break;
				}
			}
			return true;
		}
		return false;
	}

	private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
		//订阅方法上设置的线程模型,大部分情况都是ThreadMode.MAIN。
		switch (subscription.subscriberMethod.threadMode) {
			case POSTING:
				invokeSubscriber(subscription, event);
				break;
			case MAIN:
				if (isMainThread) {
					//发射调用订阅方法
					invokeSubscriber(subscription, event);
				} else {
					//把事件发送到主线程执行
					mainThreadPoster.enqueue(subscription, event);
				}
				break;
			case MAIN_ORDERED:
				if (mainThreadPoster != null) {
					mainThreadPoster.enqueue(subscription, event);
				} else {
					// temporary: technically not correct as poster not decoupled from subscriber
					invokeSubscriber(subscription, event);
				}
				break;
			case BACKGROUND:
				if (isMainThread) {
					backgroundPoster.enqueue(subscription, event);
				} else {
					invokeSubscriber(subscription, event);
				}
				break;
			case ASYNC:
				asyncPoster.enqueue(subscription, event);
				break;
			default:
				throw new IllegalStateException("Unknown thread mode: " 
				+ subscription.subscriberMethod.threadMode);
		}
	}	
		
	

EventBus的工作流程我们已经从源码的角度梳理了一遍。关于源码中的粘性事件线程模型部分, 将在下一篇博客继续分析。