浅析Dubbo的事件驱动机制

本文介绍的是Dubbo 2.7.5中的事件驱动机制,简要分析了其原理,分析了引入事件驱动的原因。

前言

事件驱动程序设计(英语:Event-driven programming)是一种电脑程序设计模型。这种模型的程序运行流程是由用户的动作(如鼠标的按键,键盘的按键动作)或者是由其他程序的消息来决定的。事件驱动程序设计这种设计模型是在交互程序(Interactive program)的情况下孕育而生的。

从这段维基百科的引用可知,事件驱动一开始是由于交互程序中的行为而产生的。这种设计也符合现实中的行为,生活当中经常需要应对一件事情从而行为做出改变。比如听到了闹钟声就要起床,闹钟声就是一个事件,而起床则是人们对于这件事做出的动作。
我们可以将闹钟起床抽象拆分为两个部分:

  • 事件:具体的事情;
  • 事件触发时做出的反应;

Java对Event的支持

在JDK1.1版本中的java.util 包中有两个类:EventObject 和 EventListener,所对应的概念就是上面提到的两个部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package java.util;

/**
* <p>
* The root class from which all event state objects shall be derived.
* <p>
* All Events are constructed with a reference to the object, the "source",
* that is logically deemed to be the object upon which the Event in question
* initially occurred upon.
*
* @since JDK1.1
*/

public class EventObject implements java.io.Serializable {

private static final long serialVersionUID = 5516075349620653480L;

/**
* The object on which the Event initially occurred.
*/
protected transient Object source;

/**
* Constructs a prototypical Event.
*
* @param source The object on which the Event initially occurred.
* @exception IllegalArgumentException if source is null.
*/
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");

this.source = source;
}

/**
* The object on which the Event initially occurred.
*
* @return The object on which the Event initially occurred.
*/
public Object getSource() {
return source;
}

/**
* Returns a String representation of this EventObject.
*
* @return A a String representation of this EventObject.
*/
public String toString() {
return getClass().getName() + "[source=" + source + "]";
}
}

EventObject中持有一个source的Object对象,同时可以通过getSource() 方法获取到source,所以source就是真实的事件。

1
2
3
4
5
6
7
8
9
package java.util;

/**
* A tagging interface that all event listener interfaces must extend.
* @since JDK1.1
*/
public interface EventListener {
}

而EventListener 是一个没有方法的Interface,具体的动作行为我们可以通过继承的方式去定义。
如前言中说的,事件驱动是由交互应用所产生,以上两个类在Java GUI编程中随处可见,在java.awt.event中封装了一系列的Event和EventLister,用于描述诸如鼠标,输入框、窗口等GUI事件。
-w350

Dubbo中 事件发布的实现 Java Event:

在Dubbo 2.7.5的Release版本中,添加了事件驱动机制。在org.apache.dubbo.event中,主要看3个类:Event、EventListener、EventDispatcher,其中EventDispatcher可译作事件分发器,其用于分发和执行相应的Event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package org.apache.dubbo.event;

import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.SPI;

import java.util.concurrent.Executor;

/**
* {@link Event Dubbo Event} Dispatcher
*
* @see Event
* @see EventListener
* @see DirectEventDispatcher
* @since 2.7.5
*/
@SPI("direct")
public interface EventDispatcher extends Listenable<EventListener<?>> {

/**
* Direct {@link Executor} uses sequential execution model
*/
Executor DIRECT_EXECUTOR = Runnable::run;

/**
* Dispatch a Dubbo event to the registered {@link EventListener Dubbo event listeners}
*
* @param event a {@link Event Dubbo event}
*/
void dispatch(Event event);

/**
* The {@link Executor} to dispatch a {@link Event Dubbo event}
*
* @return default implementation directly invoke {@link Runnable#run()} method, rather than multiple-threaded
* {@link Executor}. If the return value is <code>null</code>, the behavior is same as default.
* @see #DIRECT_EXECUTOR
*/
default Executor getExecutor() {
return DIRECT_EXECUTOR;
}

/**
* The default extension of {@link EventDispatcher} is loaded by {@link ExtensionLoader}
*
* @return the default extension of {@link EventDispatcher}
*/
static EventDispatcher getDefaultExtension() {
return ExtensionLoader.getExtensionLoader(EventDispatcher.class).getDefaultExtension();
}
}

从接口上的注解@SPI(“direct”)可知,EventDispatcher是一个SPI接口,且默认实现对应为DirectEventDispatcher。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package org.apache.dubbo.event;

/**
* Direct {@link EventDispatcher} implementation uses current thread execution model
*
* @see EventDispatcher
* @since 2.7.5
*/
public final class DirectEventDispatcher extends AbstractEventDispatcher {

public DirectEventDispatcher() {
super(DIRECT_EXECUTOR);
}
}

大部分逻辑在AbstractEventDispatcher中实现,我们具体看AbstractEventDispatcher的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public abstract class AbstractEventDispatcher implements EventDispatcher {

private final Object mutex = new Object();

private final ConcurrentMap<Class<? extends Event>, List<EventListener>> listenersCache = new ConcurrentHashMap<>();

private final Executor executor;

...

@Override
public void addEventListener(EventListener<?> listener) throws NullPointerException, IllegalArgumentException {
Listenable.assertListener(listener);
doInListener(listener, listeners -> {
addIfAbsent(listeners, listener);
});
}

...

@Override
public void dispatch(Event event) {

Executor executor = getExecutor();

// execute in sequential or parallel execution model
executor.execute(() -> {
sortedListeners(entry -> entry.getKey().isAssignableFrom(event.getClass()))
.forEach(listener -> {
if (listener instanceof ConditionalEventListener) {
ConditionalEventListener predicateEventListener = (ConditionalEventListener) listener;
if (!predicateEventListener.accept(event)) { // No accept
return;
}
}
// Handle the event
listener.onEvent(event);
});
});
}

/**
* @return the non-null {@link Executor}
*/
@Override
public final Executor getExecutor() {
return executor;
}


}

从关键代码中可以看出,AbstractEventDispatcher中包含3个成员变量,其中listenersCache维护的是listeners列表,executor表示的是用户执行listener逻辑的线程执行器,默认使用当前线程。dispatch()中通过Event的类型从listenersCache找到对应listeners列表后,逐一执行listener.onEvent(event),onEvent()所定义的则是listener中的实际业务逻辑。

补充一下,Dubbo的Event和EventListener继承的是JDK的EventObject和EventListener,也就是说Dubbo的事件驱动基于JDK的事件驱动机制。

为什么Dubbo发布到2.7.5版本才添加了事件驱动机制

在2.7.5版本前,Dubbo支持事件通知特性。对于特定服务,在调用之前、调用之后、出现异常时,会触发 oninvoke、onreturn、onthrow 三个事件,可以配置当事件发生时,通知哪个类的哪个方法.与其叫做事件通知,更像是一种AOP机制,在服务调用生命周期中切入业务逻辑。

之所以Dubbo在2.7.5才添加了事件驱动机制,是因为在这个版本,Dubbo同时加入服务自省架构。关于服务自省架构可以阅读Dubbo 迈出云原生重要一步 - 应用级服务发现解析解释的非常详细。在这里简单解释,通俗来讲,在Dubbo里面我们所讲的服务指的RPC Service,粒度是接口级别的。倘若后面再接触Spring Cloud体系的微服务,则发现其中服务指的是一个应用,粒度是应用级别的(之前我就很好奇为什么Spring Cloud为什么不维护接口的信息呢)。

  • 对于Consumer而言,其更应该关注的是Provider实例的IP+PORT列表,而Provider所发布的接口信息并非是必选项。
  • 粒度为接口级别的Dubbo服务会带来更多的数据冗余

为此,Dubbo决定把自己的微服务发现模型也向Spring Cloud去拉齐,也就是说之前在注册中心维护的数据结构从RPC Service -> Instance 变为 Application -> Instance。
举个例子:

RPC Service -> Instance:

1
dubbo://10.74.139.59:20881/com.dubbo.spi.demo.api.IHelloService?anyhost=true&application=demo-provider&dubbo=2.6.0&generic=false&interface=com.dubbo.spi.demo.api.IHelloService&methods=hello&pid=40849&side=provider&timestamp=1594650881680

Application -> Instance

1
{"name":"spring-cloud-alibaba-dubbo-provider","id":"9c5c1bce-13de-4023-b2d8-fde037ae6041","address":"10.74.139.119","port":9090,"sslPort":null,"payload":{"@class":"org.springframework.cloud.zookeeper.discovery.ZookeeperInstance","id":"spring-cloud-alibaba-dubbo-provider-1","name":"spring-cloud-alibaba-dubbo-provider","metadata":{"dubbo.metadata-service.urls":"[ \"dubbo://127.0.0.1:20880/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=127.0.0.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs&pid=15432&qos.enable=false&release=2.7.6&revision=1.0.0&side=provider&timestamp=1594977679715&version=1.0.0\" ]","dubbo.protocols.dubbo.port":"20880","dubbo.protocols.rest.port":"9090"}},"registrationTimeUTC":1594977677964,"serviceType":"DYNAMIC","uriSpec":{"parts":[{"value":"scheme","variable":true},{"value":"://","variable":false},{"value":"address","variable":true},{"value":":","variable":false},{"value":"port","variable":true}]}}

看起来数据结构更大了,但由于服务粒度不同,总体来说需要维护的数据变少了。
怎么兼容旧版本的设计?新的服务发现模型要实现对原有 Dubbo 消费端开发者的无感知迁移,即 Dubbo 继续面向 RPC 服务编程、面向 RPC 服务治理,做到对用户侧完全无感知。
为此,Dubbo新增了一个MetaService的服务,服务端实例会暴露一个预定义的 MetadataService RPC 服务,消费端通过调用 MetadataService 获取每个实例 RPC 方法相关的配置信息。

-w739
服务的提供者需要维护一个元数据服务(MetadataService).
服务自省架构流程更加复杂,执行动作之间的关联非常紧密。

相较于传统的 Dubbo 架构,服务自省架构的执行流程更为复杂,执行动作之间的关联非常紧密,如 Dubbo Service 服务实例注册前需要完成 Dubbo 服务 revision 的计算,并将其添加至服务实例的 metadata 中。又如当 Dubbo Service 服务实例出现变化时,Consumer 元数据需要重新计算。这些动作被 “事件”(Event)驱动,驱动者被定义为“事件分发器”( EventDispatcher ),而动作的处理则由“事件监听器”(EventListener)执行,三者均为 “Dubbo 事件“的核心组件

Dubbo 中的事件分类

Dubbo 提供的事件类型如下:

  • Dubbo进程级别:提供一些进程的勾子事件,如:DubboShutdownHookRegisteredEvent、DubboShutdownHookUnregisteredEvent、DubboServiceDestroyedEvent;
  • 和服务发现组件生命周期有关:ServiceDiscoveryInitializingEvent、ServiceDiscoveryInitializedEvent、ServiceDiscoveryExceptionEvent、ServiceDiscoveryDestroyingEvent、ServiceDiscoveryDestroyedEvent;
  • 和服务暴露和引用的生命周期有关:ServiceConfigExportedEvent、ServiceConfigUnexportedEvent、ReferenceConfigInitializedEvent、ReferenceConfigDestroyedEvent
  • 和服务实例注册和注销等有关:ServiceInstancePreRegisteredEvent、ServiceInstanceRegisteredEvent、ServiceInstancePreUnregisteredEvent、ServiceInstanceUnregisteredEvent、ServiceInstancesChangedEvent;

在Dubbo源码中有一个LoggingEventListenerTest测试类可以很好的演示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Test
public void testOnEvent() throws Exception {

URL connectionURL = URL.valueOf("file:///Users/Home");

ServiceDiscovery serviceDiscovery = new FileSystemServiceDiscovery();

serviceDiscovery.initialize(connectionURL);

// ServiceDiscoveryStartingEvent
listener.onEvent(new ServiceDiscoveryInitializingEvent(serviceDiscovery, serviceDiscovery));

// ServiceDiscoveryStartedEvent
listener.onEvent(new ServiceDiscoveryInitializedEvent(serviceDiscovery, serviceDiscovery));

// ServiceInstancePreRegisteredEvent
listener.onEvent(new ServiceInstancePreRegisteredEvent(serviceDiscovery, createInstance()));

// ServiceInstanceRegisteredEvent
listener.onEvent(new ServiceInstanceRegisteredEvent(serviceDiscovery, createInstance()));

// ServiceInstancesChangedEvent
listener.onEvent(new ServiceInstancesChangedEvent("test", singleton(createInstance())));

// ServiceInstancePreUnregisteredEvent
listener.onEvent(new ServiceInstancePreUnregisteredEvent(serviceDiscovery, createInstance()));

// ServiceInstanceUnregisteredEvent
listener.onEvent(new ServiceInstanceUnregisteredEvent(serviceDiscovery, createInstance()));

// ServiceDiscoveryStoppingEvent
listener.onEvent(new ServiceDiscoveryDestroyingEvent(serviceDiscovery, serviceDiscovery));

// ServiceDiscoveryStoppedEvent
listener.onEvent(new ServiceDiscoveryDestroyedEvent(serviceDiscovery, serviceDiscovery));
}

LoggingEventListenerTest模拟了服务发现组件从初始化到服务注册、注销、到组件注销的整个生命周期,测试结果如下:

1
2
3
4
5
6
7
8
9
10
[28/07/20 00:13:08:661 CST]  INFO logger.LoggerFactory: using logger: org.apache.dubbo.common.logger.log4j.Log4jLoggerAdapter
[28/07/20 00:13:08:773 CST] INFO listener.LoggingEventListener: [DUBBO] org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18 is initializing..., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:773 CST] INFO listener.LoggingEventListener: [DUBBO] org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18 is initialized., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:776 CST] INFO listener.LoggingEventListener: [DUBBO] DefaultServiceInstance{id='45903280824015', serviceName='A', host='127.0.0.1', port=8080, enabled=true, healthy=true, metadata={dubbo.metadata-service.url-params={"dubbo":{"application":"dubbo-provider-demo","deprecated":"false","group":"dubbo-provider-demo","version":"1.0.0","timestamp":"1564845042651","dubbo":"2.0.2","provider.host":"192.168.0.102","provider.port":"20880"}}, dubbo.metadata-service.urls=[ "dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs&pid=17134&qos.enable=false&register=true&release=2.7.3&revision=1.0.0&side=provider&timestamp=1564826098503&version=1.0.0" ]}} is registering into org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18..., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:777 CST] INFO listener.LoggingEventListener: [DUBBO] DefaultServiceInstance{id='45903281361282', serviceName='A', host='127.0.0.1', port=8080, enabled=true, healthy=true, metadata={dubbo.metadata-service.url-params={"dubbo":{"application":"dubbo-provider-demo","deprecated":"false","group":"dubbo-provider-demo","version":"1.0.0","timestamp":"1564845042651","dubbo":"2.0.2","provider.host":"192.168.0.102","provider.port":"20880"}}, dubbo.metadata-service.urls=[ "dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs&pid=17134&qos.enable=false&register=true&release=2.7.3&revision=1.0.0&side=provider&timestamp=1564826098503&version=1.0.0" ]}} has been registered into org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:777 CST] INFO listener.LoggingEventListener: [DUBBO] The services'[name : test] instances[size : 1] has been changed., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:778 CST] INFO listener.LoggingEventListener: [DUBBO] DefaultServiceInstance{id='45903282570448', serviceName='A', host='127.0.0.1', port=8080, enabled=true, healthy=true, metadata={dubbo.metadata-service.url-params={"dubbo":{"application":"dubbo-provider-demo","deprecated":"false","group":"dubbo-provider-demo","version":"1.0.0","timestamp":"1564845042651","dubbo":"2.0.2","provider.host":"192.168.0.102","provider.port":"20880"}}, dubbo.metadata-service.urls=[ "dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs&pid=17134&qos.enable=false&register=true&release=2.7.3&revision=1.0.0&side=provider&timestamp=1564826098503&version=1.0.0" ]}} is registering from org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18..., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:778 CST] INFO listener.LoggingEventListener: [DUBBO] DefaultServiceInstance{id='45903282953719', serviceName='A', host='127.0.0.1', port=8080, enabled=true, healthy=true, metadata={dubbo.metadata-service.url-params={"dubbo":{"application":"dubbo-provider-demo","deprecated":"false","group":"dubbo-provider-demo","version":"1.0.0","timestamp":"1564845042651","dubbo":"2.0.2","provider.host":"192.168.0.102","provider.port":"20880"}}, dubbo.metadata-service.urls=[ "dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs&pid=17134&qos.enable=false&register=true&release=2.7.3&revision=1.0.0&side=provider&timestamp=1564826098503&version=1.0.0" ]}} has been unregistered from org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:778 CST] INFO listener.LoggingEventListener: [DUBBO] org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18 is stopping..., dubbo version: , current host: 192.168.1.110
[28/07/20 00:13:08:778 CST] INFO listener.LoggingEventListener: [DUBBO] org.apache.dubbo.registry.client.FileSystemServiceDiscovery@71d15f18 is stopped., dubbo version: , current host: 192.168.1.110

Dubbo Spring Cloud 的事件驱动机制

在Spring Cloud Alibaba中也有类似的事件驱动设计 区别于Dubbo的事件驱动基于语言级别的Event抽象,Spring Cloud Alibaba则是基于Spring ApplicationContext 框架级别的事件发布机制。
具体实现可自行阅读源码,在com.alibaba.cloud.dubbo.registry.event下有四个类:

  • ServiceInstancePreRegisteredEvent:服务预注册事件;
  • ServiceInstanceRegisteredEvent:服务注册完毕事件;
  • ServiceInstancesChangedEvent:服务实例变更事件;
  • SubscribedServicesChangedEvent:所订阅服务变更事件;

引用