注册

Android实现消息总线的几种方式,你都会吗?

Android中消息总线的几种实现方式

前言

消息总线又叫事件总线,为什么我们需要一个消息总线呢?是因为随着项目变大,页面变多,我们可能出现跨页面、跨组件、跨线程、跨进程传递消息与数据,为了更方便的直接通知到指定的页面实现具体的逻辑,我们需要消息总线来实现。

从最基本的 BroadcastReceiver 到 EventBus 再到RxBus ,后来官方出了AndroidX jetpack 我们开始使用LiveDataBus,最后到Kotlin的流行出来了FlowBus。我们看看他们是怎么一步一步演变的。

一、BroadcastReceiver 广播

我们再初入 Android 的时候都应该学过广播接收者,分为静态广播和动态注册广播,在高版本的 Android 中限制了我们一些静态广播的使用,不过我们还是能通过动态注册的方式获取一些系统的状态改变。像常用的电量变化、网络状态变化、短信发送接收的状态等等。

比如网络变化的监听:

    IntentFilter intentFilter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
application.getApplicationContext().registerReceiver(InstanceHolder.INSTANCE, intentFilter);

在消息中线中,我们可以使用本地广播来实现 LocalBroadcastManager 消息的通知。

    LocalBroadcastManager mLocalBroadcastManager = LocalBroadcastManager.getInstance(mContext);

BroadcastReceiver mLoginReceiver = new LoginSuccessReceiver();
mLocalBroadcastManager.registerReceiver(mLoginReceiver, new IntentFilter(Constants.ACTION_LOGIN_SUCCESS));

private class LoginSuccessReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
//刷新Home界面
refreshHomePage();

//刷新未读信息
requestUnreadNum();
}
}

//记得要解绑对应的接收器
mLocalBroadcastManager.unregisterReceiver(mLoginReceiver);

这样就可以实现一个消息通知了。相比 EventBus 它的性能和空间的消耗都是较大的,并且只能固定在主线程运行。

二、EventBus

EventBus最大的特点就是简洁、解耦,可以直接传递我们自定义的消息Message。EventBus简化了应用程序内各组件间、组件与后台线程间的通信。记得2015年左右是非常火爆的。

EventBus的调度灵活,不依赖于 Context,使用时无需像广播一样关注 Context 的注入与传递。可继承、优先级、粘滞,是 EventBus 比之于广播的优势。几乎可以满足我们全部的需求。

最初的EventBus其实就是一个方法的集合与查找,核心是通过register方法把带有@Subscrib注解的方法和参数之类的东西全部放入一个List集合,然后通过post方法去这个list循环查找到符合条件的方法去执行。

如何使用EventBus,一共分5步:

  @Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_event_bus);

EventBus.getDefault().register(MainActivity.this); //1.注册广播
}
  @Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(MainActivity.this); //2.解注册广播
}
/**
* 3.传递什么类型的。定义一个消息类
*/
public class MessageEvent {
public String name;

public MessageEvent(String name) {
this.name = name;
}
}
    @OnClick({R.id.bt_eventbus_send_main, R.id.bt_eventbus_send_sticky})
public void onClick(View view) {
switch (view.getId()) {
case R.id.bt_eventbus_send_main:
//4.发送消息
EventBus.getDefault().post(new MessageEvent("我是主页面发送过来的消息"));
finish();
break;
}
}
   /**
* 5.接受到消息。需要注解
*
* @param event
*/
@Subscribe(threadMode = ThreadMode.MAIN) //主线程执行
public void MessageEventBus(MessageEvent event) {
//5。显示接受到的消息
mTvEventbusResult.setText(event.name);
}

EventBus的性能开销其实不大,EventBus2.4.0 版是利用反射来实现的,后来改成 APT 实现之后会好很多。主要问题是需要定义很多的消息对象,消息太多之后就感觉管理起来很麻烦。当消息太多之后容器内部的查找会出现性能瓶颈。

就算如此 EventBus 也是值得大家使用的。

三、RxBus

RxBus是基于RxJava实现的,强大是强大,但是学习成本比较高,需要额外导入RxJava RxAndroid等库,这些库体积还是较大的。可以实现异步的消息等。

本身的实现是很简单的:

public class RxBus {
private volatile static RxBus mDefaultInstance;
private final Subject<Object> mBus;

private RxBus() {
mBus = PublishSubject.create().toSerialized();
}

public static RxBus getInstance() {
if (mDefaultInstance == null) {
synchronized (RxBus.class) {
if (mDefaultInstance == null) {
mDefaultInstance = new RxBus();
}
}
}
return mDefaultInstance;
}

/**
* 发送事件
*/
public void post(Object event) {
mBus.onNext(event);
}

/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
*/
public <T> Observable<T> toObservable(final Class<T> eventType) {
return mBus.ofType(eventType);
}

/**
* 判断是否有订阅者
*/
public boolean hasObservers() {
return mBus.hasObservers();
}

public void reset() {
mDefaultInstance = null;
}

}

定义消息对象:

public class MsgEvent {
private String msg;

public MsgEvent(String msg) {
this.msg = msg;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}

发送与接收:

RxBus.getInstance().toObservable(MsgEvent.class).subscribe(new Observer<MsgEvent>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(MsgEvent msgEvent) {
//处理事件
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});


RxBus.getInstance().post(new MsgEvent("Java"));

缺点是容易内存泄露,我们需要使用rxlifecycle 或者使用CompositeDisposable 自己对生命周期进行处理解绑。

四、LiveDataBus

官方出了AndroidX jetpack 内部包含LiveData,它可以感知并遵循Activity、Fragment或Service等组件的生命周期。

为什么要使用LiveDataBus,正是基于LiveData对组件生命周期可感知的特点,因此可以做到仅在组件处于生命周期的激活状态时才更新UI数据。

一个简单的LiveDataBus的实现:

public final class LiveDataBus {

private final Map<String, BusMutableLiveData<Object>> bus;

private LiveDataBus() {
bus = new HashMap<>();
}

private static class SingletonHolder {
private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
}

public static LiveDataBus get() {
return SingletonHolder.DEFAULT_BUS;
}

public <T> MutableLiveData<T> with(String key, Class<T> type) {
if (!bus.containsKey(key)) {
bus.put(key, new BusMutableLiveData<>());
}
return (MutableLiveData<T>) bus.get(key);
}

public MutableLiveData<Object> with(String key) {
return with(key, Object.class);
}

private static class ObserverWrapper<T> implements Observer<T> {

private Observer<T> observer;

public ObserverWrapper(Observer<T> observer) {
this.observer = observer;
}

@Override
public void onChanged(@Nullable T t) {
if (observer != null) {
if (isCallOnObserve()) {
return;
}
observer.onChanged(t);
}
}

private boolean isCallOnObserve() {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (stackTrace != null && stackTrace.length > 0) {
for (StackTraceElement element : stackTrace) {
if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
"observeForever".equals(element.getMethodName())) {
return true;
}
}
}
return false;
}
}

private static class BusMutableLiveData<T> extends MutableLiveData<T> {

private Map<Observer, Observer> observerMap = new HashMap<>();

@Override
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
super.observe(owner, observer);
try {
hook(observer);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void observeForever(@NonNull Observer<T> observer) {
if (!observerMap.containsKey(observer)) {
observerMap.put(observer, new ObserverWrapper(observer));
}
super.observeForever(observerMap.get(observer));
}

@Override
public void removeObserver(@NonNull Observer<T> observer) {
Observer realObserver = null;
if (observerMap.containsKey(observer)) {
realObserver = observerMap.remove(observer);
} else {
realObserver = observer;
}
super.removeObserver(realObserver);
}

private void hook(@NonNull Observer<T> observer) throws Exception {
//get wrapper's version
Class<LiveData> classLiveData = LiveData.class;
Field fieldObservers = classLiveData.getDeclaredField("mObservers");
fieldObservers.setAccessible(true);
Object objectObservers = fieldObservers.get(this);
Class<?> classObservers = objectObservers.getClass();
Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
methodGet.setAccessible(true);
Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
Object objectWrapper = null;
if (objectWrapperEntry instanceof Map.Entry) {
objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
}
if (objectWrapper == null) {
throw new NullPointerException("Wrapper can not be bull!");
}
Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
fieldLastVersion.setAccessible(true);
//get livedata's version
Field fieldVersion = classLiveData.getDeclaredField("mVersion");
fieldVersion.setAccessible(true);
Object objectVersion = fieldVersion.get(this);
//set wrapper's version
fieldLastVersion.set(objectWrapper, objectVersion);
}
}
}

注册与发送:

LiveDataBus.get()
.with("key_test", String.class)
.observe(this, new Observer<String>() {
@Override
public void onChanged(@Nullable String s) {
}
});

LiveDataBus.get().with("key_test").setValue(s);

LiveDataBus已经算是很好用的,自动注册解绑,根据Key传递泛型T对象,容易查找对应的接收者,也可以实现可见的触发和直接触发,可以实现跨进程,

LiveData有几点不足,只能在主线程更新数据,操作符无法转换数据,基于 Android Api 实现的,换一个平台无法适应,基于这几点又开发出了FlowBus。

五、FlowBus

很多人都说Flow 的出现导致 LiveData 没那么重要了,就是因为 LiveData 的场景 都可以使用 Flow 平替了,还能更为的强大和灵活。

StateFlow 可以 替代ViewModel中传递数据,SharedFlow 可以实现事件总线。(这两者的异同如果大家有兴趣,我可以单独开一篇讲下)。

SharedFlow 就是一种热流,可以实现一对多的关系,其构造方法支持天然支持普通的消息发送与粘性的消息发送。一般我们FlowBus都是基于 SharedFlow 来实现:

object FlowBus {
private val busMap = mutableMapOf<String, EventBus<*>>()
private val busStickMap = mutableMapOf<String, StickEventBus<*>>()

@Synchronized
fun <T> with(key: String): EventBus<T> {
var eventBus = busMap[key]
if (eventBus == null) {
eventBus = EventBus<T>(key)
busMap[key] = eventBus
}
return eventBus as EventBus<T>
}

@Synchronized
fun <T> withStick(key: String): StickEventBus<T> {
var eventBus = busStickMap[key]
if (eventBus == null) {
eventBus = StickEventBus<T>(key)
busStickMap[key] = eventBus
}
return eventBus as StickEventBus<T>
}

//真正实现类
open class EventBus<T>(private val key: String) : LifecycleObserver {

//私有对象用于发送消息
private val _events: MutableSharedFlow<T> by lazy {
obtainEvent()
}

//暴露的公有对象用于接收消息
val events = _events.asSharedFlow()

open fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST)

//主线程接收数据
fun register(lifecycleOwner: LifecycleOwner, action: (t: T) -> Unit) {
lifecycleOwner.lifecycle.addObserver(this)
lifecycleOwner.lifecycleScope.launch {
events.collect {
try {
action(it)
} catch (e: Exception) {
e.printStackTrace()
YYLogUtils.e("FlowBus - Error:$e")
}
}
}
}

//协程中发送数据
suspend fun post(event: T) {
_events.emit(event)
}

//主线程发送数据
fun post(scope: CoroutineScope, event: T) {
scope.launch {
_events.emit(event)
}
}

//自动销毁
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy() {
YYLogUtils.w("FlowBus - 自动onDestroy")
val subscriptCount = _events.subscriptionCount.value
if (subscriptCount <= 0)
busMap.remove(key)
}
}

class StickEventBus<T>(key: String) : EventBus<T>(key) {
override fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(1, 1, BufferOverflow.DROP_OLDEST)
}

}

发送与接收消息

    // 主线程-发送消息
FlowBus.with<String>("test-key-01").post(this@Demo11OneFragment2.lifecycleScope, "Test Flow Bus Message")
    // 接收消息
FlowBus.with<String>("test-key-01").register(this) {
LogUtils.w("收到FlowBus消息 - " + it)
}

发送粘性消息

 FlowBus.withStick<String>("test-key-02").post(lifecycleScope, "Test Stick Message")
   FlowBus.withStick<String>("test-key-02").register(this){
LogUtils.w("收到粘性消息:$it")
}

Log如下:

总结

其实这么多消息总线框架,目前比较常用的是EventBus LiveDataBus FlowBus这三种。

总的来说,我们尽量不依赖第三方的框架来实现,那么 FlowBus 是语言层级的,基于Kotlin的特性实现,比较推荐了。LiveDataBus 是基于Android SDK 中的类实现的(我本人是比较喜欢用),只适应于 Android 开发,但也几乎能满足日常使用了。EventBus 是基于 Java 的语言特性注解和APT,也是比较好用的。

如果大家有源码方面的需求可以看看这里,上面的源码也都贴出来了。

本文的代码也只是简单的实现,只是为了抛砖引玉的实现几种基本的代码,如果大家需要在实战汇总使用,更推荐大家根据不同的类型自行去 Github 上面找对应的实现封装,功能会更多,健壮性也更好。

好了,关于消息总线就说到这了,如果觉得不错还请点赞支持哦!

完结!


作者:newki
链接:https://juejin.cn/post/7108604765898014757
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册