EventBus源码分析2

通过EventBus源码分析1的学习,我们已经知道了EventBus源码是如何实现发布订阅流程的。今天我们来继续学习EventBus 的粘性事件线程模型

粘性事件

这里不介绍粘性事件的使用方法,不了解的同学,请自行学习。直接进入源码分析部分,EventBus使用postSticky()发送粘性事件:

			
		/**
		* 缓存粘性事件,Map集合的key 为事件类型,value 就是事件本身。
		* 因为采用map的数据结构,所以每一个类型,只能缓存一个对象。
		*/
		private final Map<Class<?>, Object> stickyEvents;

		public void postSticky(Object event) {
			synchronized (stickyEvents) {
				stickyEvents.put(event.getClass(), event);
			}
			//和普通事件一样的发送逻辑
			post(event);
		}
			
		

post()是如何工作,请参考EventBus源码分析1。可以看到在发送过程种,粘性事件比普通事件,只是多了一步缓存操作。 接下来复用了普通post()方法。


继续下一个问题,粘性事件如何接收?


最后一个问题,取消粘性事件。缓存粘性事件会消耗内存资源,那要如何取消粘性事件呢?EventBus提供了下面三个方法:

			
		//根据事件类型,移除粘性事件
		public <T> T removeStickyEvent(Class<T> eventType) {
			synchronized (stickyEvents) {
				//把粘性事件从Map中移除,并强转为EventType表示的类的对象
				return eventType.cast(stickyEvents.remove(eventType));
			}
		}

		//根据给定的事件,删除粘性事件。
		//return true 如果事件匹配并且粘性事件已删除。
		public boolean removeStickyEvent(Object event) {
			//把粘性事件从Map中移除
			synchronized (stickyEvents) {
				Class<?> eventType = event.getClass();
				Object existingEvent = stickyEvents.get(eventType);
				if (event.equals(existingEvent)) {
					stickyEvents.remove(eventType);
					return true;
				} else {
					return false;
				}
			}
		}

		// 删除所有粘性事件。
		public void removeAllStickyEvents() {
			//清空map集合
			synchronized (stickyEvents) {
				stickyEvents.clear();
			}
		}
			
		

Tips:粘性事件使用完毕,要及时清除,释放内存资源。

到这里,EventBus源码中粘性事件的部分就讲解完了。下面我们继续分析EventBus的线程模型。


threadMode/线程模型

EventBus最常使用线程的地方就是订阅方法,我们就从这里开始:

			
			@Subscribe(threadMode = ThreadMode.BACKGROUND, priority = 0, sticky = true)
			public void onMessageEvent(Object event) {
				Log.d(TAG, "onMessageEvent: ");
			}
			
		

通过ThreadMode字段,我们可以指定线程。该字段是一个枚举类型。那我们就来看看这个枚举类,内部有多少个常量。

			
		public enum ThreadMode {
			/**
			* 发送事件、和执行订阅方法在同一个线程。没有线程切换,开销小。
			* 默认值。
			* 如果在主线程发送事件,订阅方法内部不要执行耗时操作,避免阻塞主线程
			*/
			POSTING,
		
			/**
			* 订阅方法在主线程执行,非Android系统,等于POSTING
			* 发送事件在主线程,不会涉及到线程切换。
			* 如果在其他线程发布事件,事件会被发送到消息队列,等待处理
			*/
			MAIN,
		
			/**
			* 不管发送事件在那个线程,订阅方法一定在主线程执行。
			* 使用消息队列处理事件。
			*/
			MAIN_ORDERED,
		
			/**
			* 在后台线程,执行订阅方法。 
			* 发送的线程不是主线程,那么发送事件,和执行订阅方法,在同一个线程
			* 在主线程发送事件,EventBus会启用线程池来执行订阅方法。
			*/
			BACKGROUND,
		
			/**
			* 总是启用另一个线程(非发布线程和主线程),来执行订阅方法。
			* 通过线程池管理线程
			* 该模式适合用来处理耗时操作,比如数据库操作等。
			*/
			ASYNC
		}
			
		

回顾EventBus的post流程我们发现,和ThreadMode相关的部分都在postToSubscription()中,我们就来分析一下这个方法:

			
	private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
		//订阅方法上设置ThreadMode 属性
		switch (subscription.subscriberMethod.threadMode) {
			case POSTING:
				invokeSubscriber(subscription, event);
				break;
			case MAIN:
				if (isMainThread) {
					//反射调用订阅方法
					invokeSubscriber(subscription, event);
				} else {
					//把事件发送到主线程执行  HandlerPoster
					mainThreadPoster.enqueue(subscription, event);
				}
				break;
			case MAIN_ORDERED:
				if (mainThreadPoster != null) {
					mainThreadPoster.enqueue(subscription, event);
				} else {
					// temporary: technically not correct as poster not
					// decoupled from subscriber
					// 不正确的代码,不用管,上面英文注释,完全复制源码中的注释
					invokeSubscriber(subscription, event);
				}
				break;
			case BACKGROUND:
				if (isMainThread) {
					backgroundPoster.enqueue(subscription, event);
				} else {
					//发布事件和执行订阅方法在同一个线程
					invokeSubscriber(subscription, event);
				}
				break;
			case ASYNC:
				asyncPoster.enqueue(subscription, event);
				break;
			default:
				throw new IllegalStateException("Unknown thread mode: " 
				+ subscription.subscriberMethod.threadMode);
		}
	}
			
		

仔细对照postToSubscription()源码,可以很好的验证我们对于线程模型的理解。该方法内部使用了以下字段,我们一个一个分析。

			
				//主线程接口类,Android上就是UI线程
				private final MainThreadSupport mainThreadSupport;
				//继承Handler,负责向MainLooper发送消息
				private final Poster mainThreadPoster;
				//后台线程
				private final BackgroundPoster backgroundPoster;
				//异步线程
				private final AsyncPoster asyncPoster;
				//线程池,复用线程
				private final ExecutorService executorService;
			
		

EventBus的构造方法中,对这些变量进行了初始化。

			
			EventBus(EventBusBuilder builder) {
				......

				mainThreadSupport = builder.getMainThreadSupport();
				//主线程Post
				mainThreadPoster = mainThreadSupport != null ? 
				mainThreadSupport.createPoster(this) : null;
				//后台线程Post
				backgroundPoster = new BackgroundPoster(this);
				//异步线程Post
				asyncPoster = new AsyncPoster(this);

				......

				//初始化线程池
				executorService = builder.executorService;
			}
			
		

mainThreadSupport & mainThreadPoster

构造方法中,调用了getMainThreadSupport()获取mainThreadSupport对象。该方法内部获取并保存MainLooper。

			
		MainThreadSupport getMainThreadSupport() {
			if (mainThreadSupport != null) {
				return mainThreadSupport;
				//判断Android系统Log类是否可用
			} else if (AndroidLogger.isAndroidLogAvailable()) {
				// 获取主线程的Looper
				Object looperOrNull = getAndroidMainLooperOrNull();
				return looperOrNull == null ? null :
						new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
			} else {
				return null;
			}
		}
			
		
			
		/**
		* “主”线程的接口,可以是您喜欢的任何线程。通常在Android上,使用Android的主线程。
		*/
		public interface MainThreadSupport {

			boolean isMainThread();
		
			Poster createPoster(EventBus eventBus);
		
			class AndroidHandlerMainThreadSupport implements MainThreadSupport {
		
				//主线程Looper
				private final Looper looper;
		
				public AndroidHandlerMainThreadSupport(Looper looper) {
					this.looper = looper;
				}
		
				@Override
				public boolean isMainThread() {
					//获取当前线程的Looper 并判断是不是等于主线程的Looper
					return looper == Looper.myLooper();
				}
		
				@Override
				public Poster createPoster(EventBus eventBus) {
					return new HandlerPoster(eventBus, looper, 10);
				}
			}
		
		}
			
		

createPoster()方法负责创建mainThreadPoster对象。HandlerPoster继承Handler,和主线程的Looper绑定。提供了public的enqueue(),用来发送事件。 源码中细节比较多,这里无法面面俱到。还请参阅源码阅读。我们来看一下比较关键的handleMessage()方法。

			
		@Override
		public void handleMessage(Message msg) {
			//临时标记变量,表示消息是否处理完毕
			boolean rescheduled = false;
			try {
				long started = SystemClock.uptimeMillis();
				while (true) {
					//从队列中取出消息
					PendingPost pendingPost = queue.poll();
					if (pendingPost == null) {
						synchronized (this) {
							//再次检查,这次同步
							pendingPost = queue.poll();
							if (pendingPost == null) {
								handlerActive = false;
								return;
							}
						}
					}
					//执行订阅流程
					eventBus.invokeSubscriber(pendingPost);
					long timeInMethod = SystemClock.uptimeMillis() - started;
					if (timeInMethod >= maxMillisInsideHandleMessage) {
						if (!sendMessage(obtainMessage())) {
							throw new EventBusException(
								"Could not send handler message");
						}
						rescheduled = true;
						return;
					}
				}
			} finally {
				//return前,执行的最后一句代码
				handlerActive = rescheduled;
			}
		}
			
		

handlerActive为true表示mainThreadPoster在工作中,事件入列,等待处理。 为false,事件入列后,还需要向MainLooper发送Message。启动handleMessage(),开始工作。


当我们在非主线程发送消息时,事件一步步传递到postToSubscription()内,如果的订阅方法的ThreadMode == MAIN 或者 == MAIN_ORDERED, 事件将通过mainThreadPoster.enqueue()入列,等待处理。mainThreadPoster本质上是一个和主线程Looper绑定的Handler。


BackgroundPoster

			
		final class BackgroundPoster implements Runnable, Poster {

			//消息队列
			private final PendingPostQueue queue;
			private final EventBus eventBus;
		
			//当前处理事件的线程 是否在工作中
			private volatile boolean executorRunning;
		
			BackgroundPoster(EventBus eventBus) {
				this.eventBus = eventBus;
				queue = new PendingPostQueue();
			}
		
			public void enqueue(Subscription subscription, Object event) {
				PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
				synchronized (this) {
					queue.enqueue(pendingPost);
					if (!executorRunning) {
						executorRunning = true;
						//用线程池 执行 这个Runnable
						eventBus.getExecutorService().execute(this);
					}
				}
			}
		
			@Override
			public void run() {
				try {
					try {
						while (true) {
							PendingPost pendingPost = queue.poll(1000);
							if (pendingPost == null) {
								synchronized (this) {
									// Check again, this time in synchronized
									pendingPost = queue.poll();
									if (pendingPost == null) {
										executorRunning = false;
										return;
									}
								}
							}
							eventBus.invokeSubscriber(pendingPost);
						}
					} catch (InterruptedException e) {
						eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
					}
				} finally {
					//return前,执行的最后一句代码
					executorRunning = false;
				}
			}
		
		}
			
		

BackgroundPoster继承Runnable和Poster,通过enqueue()将消息入列,通过线程池来执行Runnable。 通过executorRunning字段进行简单的判断。是否有正在的工作的线程。有的话直接复用。


asyncPoster

			
		class AsyncPoster implements Runnable, Poster {

			private final PendingPostQueue queue;
			private final EventBus eventBus;
		
			AsyncPoster(EventBus eventBus) {
				this.eventBus = eventBus;
				queue = new PendingPostQueue();
			}
		
			public void enqueue(Subscription subscription, Object event) {
				PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
				queue.enqueue(pendingPost);
				//打开另外一个线程,执行run()
				eventBus.getExecutorService().execute(this);
			}
		
			@Override
			public void run() {
				PendingPost pendingPost = queue.poll();
				if (pendingPost == null) {
					throw new IllegalStateException("No pending post available");
				}
				//执行订阅方法
				eventBus.invokeSubscriber(pendingPost);
			}
		
		}
			
		

AsyncPoster和BackgroundPoster很像,且更简单。每次有需要执行的订阅方法。直接从线程池取线程执行。


到这里,EventBus的粘性事件和线程模型我们就分析完毕了。加上 第一篇文章,EventBus核心功能的源码,我们已经分析完毕。