WebSocket数据同步源码分析
在ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当 前支持ZooKeeper、WebSocket、Http长轮询、Nacos 、etcd 和 Consul 进行数据同步。本文的主要内容是基于WebSocket的数据同步源码分析。
本文基于
shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理 。
1. 关于WebSocket通信
WebSocket协议诞生于2008年,在2011年成为国际标准。它可以双向通信,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息。WebSocket协议建立在 TCP 协议之上,属于应用层,性能开销小,通信高效,协议标识符是ws。
2. Admin数据同步
我们从一个实际案例进行源码追踪,比如在后台管理系统中,新增一条选择器数据:

2.1 接收数据
- SelectorController.createSelector()
进入SelectorController类中的createSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {
@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验
// 添加或更新数据
Integer createCount = selectorService.createOrUpdate(selectorDTO);
// 返回结果信息
return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}
// ......
}
2.2 处理数据
- SelectorServiceImpl.createOrUpdate()
在SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream。
@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {
// 负责事件发布的eventPublisher
private final ApplicationEventPublisher eventPublisher;
@Override
@Transactional(rollbackFor = Exception.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
// 构建数据 DTO --> DO
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
// 判断是添加还是更新
if (StringUtils.isEmpty(selectorDTO.getId())) {
// 插入选择器数据
selectorCount = selectorMapper.insertSelective(selectorDO);
// 插入选择器中的条件数据
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
// check selector add
// 权限检查
if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
dataPermissionDTO.setDataId(selectorDO.getId());
dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
}
} else {
// 更新数据,先删除再新增
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
// 发布事件
publishEvent(selectorDO, selectorConditionDTOs);
// 更新upstream
updateDivideUpstream(selectorDO);
return selectorCount;
}
// ......
}
在Service类完成数据的持久化操作,即保存数据到数据库,这个大家应该很熟悉了,就不展开。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会进行数据同步。
publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
// 找到选择器对应的插件
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
// 构建条件数据
List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// 发布变更数据
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。
关于
ApplicationEventPublisher:当有状态发生变化时,发布者调用
ApplicationEventPublisher的publishEvent方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的onApplicationEvent方法把事件对象传递给观察者。调用publishEvent方法有两种途径,一种是实现接口由容器注入ApplicationEventPublisher对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。
ApplicationEventPublisher:发布事件;ApplicationEvent:Spring事件,记录事件源、时间和数据;ApplicationListener:事件监听者,观察者。
在Spring的事件发布机制中,有三个对象,
一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher。
另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。
public class DataChangedEvent extends ApplicationEvent {
//......
}
最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
//......
}
2.3 分发数据
- DataChangedEventDispatcher.onApplicationEvent()
当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件信息
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则信息
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default: // 其他类型,抛出异常
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}
当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。
ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。
这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:
WebsocketDataChangedListener:基于websocket的数据同步;ZookeeperDataChangedListener:基于zookeeper的数据同步;ConsulDataChangedListener:基于consul的数据同步;EtcdDataDataChangedListener:基于etcd的数据同步;HttpLongPollingDataChangedListener:基于http长轮询的数据同步;NacosDataChangedListener:基于nacos的数据同步;
既然有这么多种实现策略,那么如何确定使用哪一种呢?
因为本文是基于websocket的数据同步源码分析,所以这里以WebsocketDataChangedListener为例,分析它是如何被加载并实现的。
通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。
/**
* 数据同步配置类
* 通过springboot条件装配实现
* The type Data sync configuration.
*/
@Configuration
public class DataSyncConfiguration {
/**
* websocket数据同步(默认策略)
* The WebsocketListener(default strategy).
*/
@Configuration
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {
/**
* Config event listener data changed listener.
* 配置websocket数据变更监听器
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
return new WebsocketDataChangedListener();
}
/**
* Websocket collector.
* Websocket处理类:建立连接,发送消息,关闭连接等操作
* @return the websocket collector
*/
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}
/**
* Server endpoint exporter
*
* @return the server endpoint exporter
*/
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
//......
}
这个配置类是通过SpringBoot条件装配类实现的。在WebsocketListener类上面有几个注解:
-
@Configuration:配置文件,应用上下文; -
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用websocket进行数据同步。不过,这里需要注意下matchIfMissing = true这个属性,它表示,如果你没有如下的配置,该配置类也会生效。基于websocket的数据同步时官方推荐的方式,也是默认采用的方式。shenyu:
sync:
websocket:
enabled: true -
@EnableConfigurationProperties:启用配置属性;
当我们主动配置,采用websocket进 行数据同步时,WebsocketDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是新增加了一条选择器数据,数据通过采用的是websocket,所以,代码会进入到WebsocketDataChangedListener进行选择器数据变更处理。
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)
for (DataChangedListener listener : listeners) {
// 哪种数据发生变更
switch (event.getGroupKey()) {
// 省略了其他逻辑
case SELECTOR: // 选择器信息
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); // WebsocketDataChangedListener进行选择器数据变更处理
break;
}
}
2.4 Websocket数据变更监听器
-
WebsocketDataChangedListener.onSelectorChanged()
在
onSelectorChanged()方法中,将数据进行了封装,转成WebsocketData,然后通过WebsocketCollector.send()发送数据。
// 选择器数据有更新
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
// 构造 WebsocketData 数据
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
// 通过websocket发送数据
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
2.5 Websocket发送数据
- WebsocketCollector.send()
在send()方法中,判断了一下同步的类型,根据不同的类型,进行处理。
@Slf4j
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
// 如果是MYSELF(第一 次的全量同步)
if (DataEventTypeEnum.MYSELF == type) {
// 从threadlocal中获取session
Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
if (session != null) {
// 向该session发送全量数据
sendMessageBySession(session, message);
}
} else {
// 后续的增量同步
// 向所有的session中同步变更数据
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
}
}
}
private static void sendMessageBySession(final Session session, final String message) {
try {
// 通过websocket的session把消息发送出去
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
我们给的案例是一个新增操作 ,是一个增量同步,所以会走
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
这个逻辑。
再通过
session.getBasicRemote().sendText(message);
将数据发送了出去。
至此,当admin端发生数据变更时,就将变更的数据以增量形式通过WebSocket发给了网关。
分析到这里,不知道大家有没有疑问呢?比如session是怎么来的?网关如何和admin建立连接的?
不要着急,我们接下来就进行网关端的同步分析。
不过,在继续源码分析前,我们用一张图将上面的分析过程串联起来。
