TC 集群Seata1.6高可用架构源码解析

这篇文章主要为大家介绍了TC 集群Seata1.6高可用架构源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

一、背景

TC 集群具有高可用架构,应用到集群是这样一个间接的关系:应用 -》事务分组 -》TC 集群,应用启动后所指定的事务分组不能变,可通过配置中心变更事务分组所属的 TC 集群,Seata 客户端监听到这个变更后,会切换到新的 TC 集群。

本篇从源码梳理这个高可用能力是如何实现的。

二、环境配置

客户端配置使用nacos配置中心和nacos注册中心,

seata: enabled: true # Seata 应用编号 application-id: seataclistock # Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应 tx-service-group: tx_group_stock # 关闭自动代理 enable-auto-data-source-proxy: false config: # support: nacos, consul, apollo, zk, etcd3 type: nacos nacos: serverAddr: namespace: seata # 需要与服务端添加的配置文件相同 group: SEATA_GROUP_ROCKTEST username: seata password: seata data-id: seataClient.tx_group_busin.properties registry: # support: nacos, eureka, redis, zk, consul, etcd3, sofa type: nacos nacos: application: seata-server serverAddr: namespace: seata  # 需要与服务端添加的配置文件相同 group: SEATA_GROUP_ROCKTEST   # 需要与服务端添加的配置文件相同 username: seata password: seata 

三、从配置中心获取TC集群

服务注册的能力要依赖配置中心,从nacos的配置中心获取配置NacosConfiguration#initSeataConfig

Data Id:seataClient.tx_group_stock.properties

Group:SEATA_GROUP_LWKTEST

其中的service.vgroupMapping.tx_group_stock的值是dev_cluster_1,接下来注册能力就要使用这个集群来工作。

private static void initSeataConfig() { try { String nacosDataId = getNacosDataId(); String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT); if (StringUtils.isNotBlank(config)) { seataConfig = ConfigProcessor.processConfig(config, getNacosDataType()); NacosListener nacosListener = new NacosListener(nacosDataId, null); configService.addListener(nacosDataId, getNacosGroup(), nacosListener); } } catch (NacosException | IOException e) { LOGGER.error("init config properties error", e); } } 

RegistryFactory#getInstance()这是个单例机制,所以源码梳理起来很简单,下边获取TC服务的时候会调用此单例方法做初始化。

  • 读取配置文件中registry.type,
  • 配置的值是nacos,所以读出的值是nacos
  • 通过SPI加载并实例化 NacosRegistryProvider
public class RegistryFactory { /** * Gets instance. * * @return the instance */ public static RegistryService getInstance() { return RegistryFactoryHolder.INSTANCE; } private static RegistryService buildRegistryService() { RegistryType registryType; //registryTypeName = "registry.type" String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig( ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR + ConfigurationKeys.FILE_ROOT_TYPE); try { // nacos registryType = RegistryType.getType(registryTypeName); } catch (Exception exx) { throw new NotSupportYetException("not support registry type: " + registryTypeName); } // 通过SPI 加载并实例化 NacosRegistryProvider return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide(); } private static class RegistryFactoryHolder { private static final RegistryService INSTANCE = buildRegistryService(); } } 

TM、RM 客户端需要与TC通信,所以在其初始化时必然会有获取TC集群的逻辑,对应在源码TmNettyRemotingClient#init 中的reconnect方法。

@Override public void init() { // registry processor registerProcessor(); if (initialized.compareAndSet(false, true)) { //父类中会开启定时任务来执行 getClientChannelManager().reconnect(transactionServiceGroup) super.init(); if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) { getClientChannelManager().reconnect(transactionServiceGroup); } } } 

reconnect中的.NettyClientChannelManager#getAvailServerList 是根据seata.tx-service-group的值来检索TC集群信息。直接提供出来调用堆栈,方便大家快速熟悉调用链路:

getServiceGroup:111, RegistryService (io.seata.discovery.registry) lookup:145, NacosRegistryServiceImpl (io.seata.discovery.registry.nacos) getAvailServerList:257, NettyClientChannelManager (io.seata.core.rpc.netty) reconnect:171, NettyClientChannelManager (io.seata.core.rpc.netty) init:198, TmNettyRemotingClient (io.seata.core.rpc.netty) init:47, TMClient (io.seata.tm) initClient:220, GlobalTransactionScanner (io.seata.spring.annotation) afterPropertiesSet:512, GlobalTransactionScanner (io.seata.spring.annotation) 

这里便是通过Seata客户端 seata.tx-service-group的值,找到最终TC集群的关键之处。在getServiceGroup中从nacos中获取service.vgroupMapping.tx_group_stock的值,即dev_cluster_1

default String getServiceGroup(String key) { //key = service.vgroupMapping.tx_group_stock key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; if (!SERVICE_GROUP_NAME.contains(key)) { ConfigurationCache.addConfigListener(key); SERVICE_GROUP_NAME.add(key); } return ConfigurationFactory.getInstance().getConfig(key); } 

然后NettyClientChannelManager#reconnect中获取 TC 集群中的所有 TC 服务节点,对每个TC 服务节点建连。

for (String serverAddress : availList) { try { acquireChannel(serverAddress); channelAddress.add(serverAddress); } catch (Exception e) { LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e); } } 

再梳理一下,梳理这么多的关键就是通过tx_group_stock 找到 TC 集群 dev_cluster_1

客户端:

seata: # 默认关闭,如需启用spring.datasource.dynami.seata需要同时开启 enabled: true # Seata 事务组编号,用于 TC 集群名。该配置需要与服务端提到的group相对应,也需要与下面的相对应 tx-service-group: tx_group_stock 

nacos:

Data ID: seataClient.tx_group_stock.properties Group: SEATA_GROUP_ROCKTEST 配置内容: ... service.vgroupMapping.tx_group_stock=dev_cluster_1 ... 

TC服务端:

registry: # support: nacos 、 eureka 、 redis 、 zk  、 consul 、 etcd3 、 sofa type: nacos preferred-networks: 30.240.* nacos: application: seata-server cluster: dev_cluster_1 

RegistryService#getServiceGroup中从nacos获取值的时候,有个细节需要注意:通过namespace + Group + Key(Data Id) 三维来唯一标示一个Key。

四、刷新TC集群

AbstractNettyRemotingClient#init中默认会每隔10s进行一次 TC 服务清单刷新与重连

timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { clientChannelManager.reconnect(getTransactionServiceGroup()); } }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

以上就是TC 集群Seata1.6高可用架构源码解析的详细内容,更多关于TC 集群Seata高可用架构的资料请关注0133技术站其它相关文章!

以上就是TC 集群Seata1.6高可用架构源码解析的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » Java