canal有两种使用方式:1、独立部署 2、内嵌到应用中。 deployer模块主要用于独立部署canal server。关于这两种方式的区别,请参见server模块源码分析。deployer模块源码目录结构如下所示:
在独立部署canal时,需要首先对canal的源码进行打包
1 mvn clean install -Dmaven.test.skip -Denv=release
打包后会在target目录生成一个以下两个文件:
其中canal.deployer-1.0.24.tar.gz就是canal的独立部署包。解压缩后,目录如下所示。其中bin目录和conf目录(包括子目录spring)中的所有文件,都来自于deployer模块。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 canal ├── bin │ ├── startup.bat │ ├── startup.sh │ └── stop.sh ├── conf │ ├── canal.properties │ ├── example │ │ └── instance.properties │ ├── logback.xml │ └── spring │ ├── default-instance.xml │ ├── file-instance.xml │ ├── group-instance.xml │ ├── local -instance.xml │ └── memory-instance.xml ├── lib │ └── ....依赖的各种jar └── logs
deployer模块主要完成以下功能:
读取canal.properties配置文件
启动canal server,监听canal client的请求
启动canal instance,连接mysql数据库,伪装成slave,解析binlog
在canal的运行过程中,监听配置文件的变化
启动和停止脚本 bin目录中包含了canal的启动和停止脚本startup.sh和stop.sh,当我们要启动canal时,只需要输入以下命令即可
在windows环境下,可以直接双击startup.bat。
在startup.sh脚本内,会调用com.alibaba.otter.canal.deployer.CanalLauncher类来进行启动,这是分析Canal源码的入口类,如下图所示:
同时,startup.sh还会在bin目录下生成一个canal.pid
文件,用于存储canal的进程id。当停止canal的时候
CanalLauncher CanalLauncher
是整个源码分析的入口类,代码相当简单。步骤是:
读取canal.properties文件中的配置
利用读取的配置构造一个CanalController实例,将所有的启动操作都委派给CanalController进行处理。
最后注册一个钩子函数,在JVM停止时同时也停止canal server。
com.alibaba.otter.canal.deployer.CanalLauncher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class CanalLauncher { private static final String CLASSPATH_URL_PREFIX = "classpath:" ; private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class); public static void main (String[] args) throws Throwable { try { String conf = System.getProperty("canal.conf" , "classpath:canal.properties" ); Properties properties = new Properties(); if (conf.startsWith(CLASSPATH_URL_PREFIX)) { conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX); properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf)); } else { properties.load(new FileInputStream(conf)); } logger.info("## start the canal server." ); final CanalController controller = new CanalController(properties); controller.start(); logger.info("## the canal server is running now ......" ); Runtime.getRuntime().addShutdownHook(new Thread() { public void run () { try { logger.info("## stop the canal server" ); controller.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal Server:\n{}" , ExceptionUtils.getFullStackTrace(e)); } finally { logger.info("## canal server is down." ); } } }); } catch (Throwable e) { logger.error("## Something goes wrong when starting up the canal Server:\n{}" , ExceptionUtils.getFullStackTrace(e)); System.exit(0 ); } } }
可以看到,CanalLauncher实际上只是负责读取canal.properties配置文件,然后构造CanalController对象,并通过其start和stop方法来开启和停止canal。因此,如果说CanalLauncher是canal源码分析的入口类,那么CanalController就是canal源码分析的核心类。
CanalController 在CanalController的构造方法中,会对配置文件内容解析,初始化相关成员变量,做好canal server的启动前的准备工作,之后在CanalLauncher中调用CanalController.start方法来启动。
CanalController中定义的相关字段和构造方法,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public class CanalController { private static final Logger logger = LoggerFactory.getLogger(CanalController.class); private Long cid; private String ip; private int port; private Map<String, InstanceConfig> instanceConfigs; private InstanceConfig globalInstanceConfig; private Map<String, CanalConfigClient> managerClients; private boolean autoScan = true ; private InstanceAction defaultAction; private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors; private CanalServerWithEmbedded embededCanalServer; private CanalServerWithNetty canalServer; private CanalInstanceGenerator instanceGenerator; private ZkClientx zkclientx; public CanalController () { this (System.getProperties()); } public CanalController (final Properties properties) { managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() { public CanalConfigClient apply (String managerAddress) { return getManagerClient(managerAddress); } }); globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); initInstanceConfig(properties); cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator); canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); if (StringUtils.isEmpty(ip)) { ip = AddressUtils.getHostIp(); } final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true ); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true ); } final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors( autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { instanceConfigMonitors = } } .... }
为了读者能够尽量容易的看出CanalController的构造方法中都做了什么,上面代码片段中省略了部分代码。这样,我们可以很明显的看出来, ,在CanalController构造方法中的代码分划分为了固定的几个处理步骤,下面按照几个步骤的划分,逐一进行讲解,并详细的介绍CanalController中定义的各个字段的作用。
配置解析相关代码 1 2 3 4 5 globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); initInstanceConfig(properties);
globalInstanceConfig字段 表示canal instance的全局配置,类型为InstanceConfig,通过initGlobalConfig方法进行初始化。主要用于解析canal.properties以下几个配置项:
canal.instance.global.mode
:确定canal instance配置加载方式,取值有manager|spring两种方式
canal.instance.global.lazy
:确定canal instance是否延迟初始化
canal.instance.global.manager.address
:配置中心地址。如果canal.instance.global.mode=manager,需要提供此配置项
canal.instance.global.spring.xml
:spring配置文件路径。如果canal.instance.global.mode=spring,需要提供此配置项
initGlobalConfig源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private InstanceConfig initGlobalConfig (Properties properties) { InstanceConfig globalConfig = new InstanceConfig(); String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(modeStr)) { globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr))); } String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(lazyStr)) { globalConfig.setLazy(Boolean.valueOf(lazyStr)); } String managerAddress = getProperty(properties, CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(managerAddress)) { globalConfig.setManagerAddress(managerAddress); } String springXml = getProperty(properties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(springXml)) { globalConfig.setSpringXml(springXml); } instanceGenerator = return globalConfig; }
其中canal.instance.global.mode用于确定canal instance的全局配置加载方式,其取值范围有2个:spring、manager。我们知道一个canal server中可以启动多个canal instance,每个instance都有各自的配置。instance的配置也可以放在本地,也可以放在远程配置中心里。我们可以自定义每个canal instance配置文件存储的位置,如果所有canal instance的配置都在本地或者远程,此时我们就可以通过canal.instance.global.mode这个配置项,来统一的指定配置文件的位置,避免为每个canal instance单独指定。
其中:
spring方式:
表示所有的canal instance的配置文件位于本地。此时,我们必须提供配置项canal.instance.global.spring.xml指定spring配置文件的路径。canal提供了多个spring配置文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。这么多配置文件主要是为了支持canal instance不同的工作方式。我们在稍后将会讲解各个配置文件的区别。而在这些配置文件的开头,我们无一例外的可以看到以下配置:
1 2 3 4 5 6 7 8 9 10 <bean class ="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init ="false" > <property name ="ignoreResourceNotFound" value ="true" /> <property name ="systemPropertiesModeName" value ="SYSTEM_PROPERTIES_MODE_OVERRIDE" /> <property name ="locationNames" > <list > <value > classpath:canal.properties</value > <value > classpath:${canal.instance.destination:}/instance.properties</value > </list > </property > </bean >
这里我们可以看到,所谓通过spring方式加载canal instance配置,无非就是通过spring提供的PropertyPlaceholderConfigurer来加载canal instance的配置文件instance.properties。
这里instance.properties的文件完整路径是${canal.instance.destination:}/instance.properties,其中${canal.instance.destination}是一个变量。这是因为我们可以在一个canal server中配置多个canal instance,每个canal instance配置文件的名称都是instance.properties,因此我们需要通过目录进行区分。例如我们通过配置项canal.destinations指定多个canal instance的名字
1 canal.destinations= example1,example2
此时我们就要conf目录下,新建两个子目录example1和example2,每个目录下各自放置一个instance.properties。
canal在初始化时就会分别使用example1和example2来替换${canal.instance.destination:},从而分别根据example1/instance.properties和example2/instance.properties创建2个canal instance。
manager方式:
表示所有的canal instance的配置文件位于远程配置中心,此时我们必须提供配置项 canal.instance.global.manager.address来指定远程配置中心的地址。目前alibaba内部配置使用这种方式。开发者可以自己实现CanalConfigClient,连接各自的管理系统,完成接入。
instanceGenerator字段 类型为CanalInstanceGenerator
。在initGlobalConfig
方法中,除了创建了globalInstanceConfig
实例,同时还为字段instanceGenerator
字段进行了赋值。
顾名思义,这个字段用于创建CanalInstance
实例。这是instance模块中的类,其作用就是为canal.properties文件中canal.destinations配置项列出的每个destination,创建一个CanalInstance实例。CanalInstanceGenerator是一个接口,定义如下所示:
1 2 3 4 5 6 7 public interface CanalInstanceGenerator { CanalInstance generate (String destination) ; }
针对spring和manager两种instance配置的加载方式,CanalInstanceGenerator提供了两个对应的实现类,如下所示:
instanceGenerator字段通过一个匿名内部类进行初始化。其内部会判断配置的各个destination的配置加载方式,spring 或者manager。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 instanceGenerator = new CanalInstanceGenerator() { public CanalInstance generate (String destination) { InstanceConfig config = instanceConfigs.get(destination); if (config == null ) { throw new CanalServerException("can't find destination:{}" ); } if (config.getMode().isManager()) { ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator(); instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress())); return instanceGenerator.generate(destination); } else if (config.getMode().isSpring()) { SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator(); synchronized (this ) { try { System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination); instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml())); return instanceGenerator.generate(destination); } catch (Throwable e) { logger.error("generator instance failed." , e); throw new CanalException(e); } finally { System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "" ); } } } else { throw new UnsupportedOperationException("unknow mode :" + config.getMode()); } } };
上述代码中的第1步比较变态,从instanceConfigs中根据destination作为参数,获得对应的InstanceConfig。而instanceConfigs目前还没有被初始化,这个字段是在稍后将要讲解的initInstanceConfig方法初始化的,不过由于这是一个引用类型,当initInstanceConfig方法被执行后,instanceConfigs字段中也就有值了。目前,我们姑且认为, instanceConfigs这个Map类型的字段已经被初始化好了。
2、3两步用于确定是instance的配置加载方式是spring还是manager,如果是spring,就使用SpringCanalInstanceGenerator创建CanalInstance实例,如果是manager,就使用ManagerCanalInstanceGenerator创建CanalInstance实例。
由于目前manager方式的源码并未开源,因此,我们只分析SpringCanalInstanceGenerator相关代码。
上述代码中,首先创建了一个SpringCanalInstanceGenerator实例,然后往里面设置了一个BeanFactory。
1 instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
其中config.getSpringXml()返回的就是我们在canal.properties中通过canal.instance.global.spring.xml配置项指定了spring配置文件路径。getBeanFactory方法源码如下所示:
1 2 3 4 private BeanFactory getBeanFactory (String springXml) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml); return applicationContext; }
往SpringCanalInstanceGenerator
设置了BeanFactory之后,就可以通过其的generate方法获得CanalInstance实例。
SpringCanalInstanceGenerator的源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class SpringCanalInstanceGenerator implements CanalInstanceGenerator , BeanFactoryAware { private String defaultName = "instance" ; private BeanFactory beanFactory; public CanalInstance generate (String destination) { String beanName = destination; if (!beanFactory.containsBean(beanName)) { beanName = defaultName; } return (CanalInstance) beanFactory.getBean(beanName); } public void setBeanFactory (BeanFactory beanFactory) throws BeansException { this .beanFactory = beanFactory; } }
首先尝试以传入的参数destination来获取CanalInstance实例,如果没有,就以默认的bean的id值”instance”来获取CanalInstance实例。事实上,如果你没有修改spring配置文件,那么默认的名字就是instance。事实上,在canal提供的各个spring配置文件xxx-instance.xml中,都有类似以下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <bean id ="instance" class ="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring" > <property name ="destination" value ="${canal.instance.destination}" /> <property name ="eventParser" > <ref local ="eventParser" /> </property > <property name ="eventSink" > <ref local ="eventSink" /> </property > <property name ="eventStore" > <ref local ="eventStore" /> </property > <property name ="metaManager" > <ref local ="metaManager" /> </property > <property name ="alarmHandler" > <ref local ="alarmHandler" /> </property > </bean >
上面的代码片段中,我们看到的确有一个bean的名字是instance,其类型是CanalInstanceWithSpring
,这是CanalInstance
接口的实现类。类似的,我们可以想到在manager配置方式下,获取的CanalInstance实现类是CanalInstanceWithManager
。事实上,你想的没错,CanalInstance
的类图继承关系如下所示:
需要注意的是,到目前为止,我们只是创建好了CanalInstanceGenerator,而CanalInstance尚未创建。在CanalController的start方法被调用时,CanalInstance才会被真正的创建,相关源码将在稍后分析。
instanceConfigs字段 类型为Map<String, InstanceConfig>
。前面提到初始化instanceGenerator后,当其generate方法被调用时,会尝试从instanceConfigs根据一个destination获取对应的InstanceConfig,现在分析instanceConfigs的相关初始化代码。
我们知道globalInstanceConfig
定义全局的配置加载方式。如果需要把部分CanalInstance
配置放于本地,另外一部分CanalIntance
配置放于远程配置中心,则只通过全局方式配置,无法达到这个要求。虽然这种情况很少见,但是为了提供最大的灵活性,canal支持每个CanalIntance
自己来定义自己的加载方式,来覆盖默认的全局配置加载方式。而每个destination对应的InstanceConfig
配置就存放于instanceConfigs字段中。
举例来说:
1 2 3 4 5 6 7 8 9 10 11 12 canal.destinations=instance1,instance2 canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.spring.xml = classpath:spring/file-instance.xml canal.instance.instance1.mode = manager canal.instance.instance1.manager.address = 127.0 .0.1:1099 canal.instance.instance1.lazy = tue
这段配置中,设置了instance的全局加载方式为spring,instance1覆盖了全局配置,使用manager方式加载配置。而instance2没有覆盖配置,因此默认使用spring加载方式。
instanceConfigs字段通过initInstanceConfig方法进行初始化
1 2 3 instanceConfigs = new MapMaker().makeMap(); initInstanceConfig(properties);
initInstanceConfig方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void initInstanceConfig (Properties properties) { String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS); String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT); for (String destination : destinations) { InstanceConfig config = parseInstanceConfig(properties, destination); InstanceConfig oldConfig = instanceConfigs.put(destination, config); if (oldConfig != null ) { logger.warn("destination:{} old config:{} has replace by new config:{}" , new Object[] { destination, oldConfig, config }); } } }
上面代码片段中,首先解析canal.destinations配置项,可以理解一个destination就对应要初始化一个canal instance。针对每个destination会创建各自的InstanceConfig,最终都会放到instanceConfigs这个Map中。
各个destination对应的InstanceConfig都是通过parseInstanceConfig方法来解析
1 2 3 4 5 6 private InstanceConfig parseInstanceConfig (Properties properties, String destination) { InstanceConfig config = new InstanceConfig(globalInstanceConfig); return config; }
此时我们可以看一下InstanceConfig类的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class InstanceConfig { private InstanceConfig globalConfig; private InstanceMode mode; private Boolean lazy; private String managerAddress; private String springXml; public InstanceConfig () { } public InstanceConfig (InstanceConfig globalConfig) { this .globalConfig = globalConfig; } public static enum InstanceMode { SPRING, MANAGER; public boolean isSpring () { return this == InstanceMode.SPRING; } public boolean isManager () { return this == InstanceMode.MANAGER; } } public Boolean getLazy () { if (lazy == null && globalConfig != null ) { return globalConfig.getLazy(); } else { return lazy; } } public void setLazy (Boolean lazy) { this .lazy = lazy; } public InstanceMode getMode () { if (mode == null && globalConfig != null ) { return globalConfig.getMode(); } else { return mode; } } public void setMode (InstanceMode mode) { this .mode = mode; } public String getManagerAddress () { if (managerAddress == null && globalConfig != null ) { return globalConfig.getManagerAddress(); } else { return managerAddress; } } public void setManagerAddress (String managerAddress) { this .managerAddress = managerAddress; } public String getSpringXml () { if (springXml == null && globalConfig != null ) { return globalConfig.getSpringXml(); } else { return springXml; } } public void setSpringXml (String springXml) { this .springXml = springXml; } public String toString () { return ToStringBuilder.reflectionToString(this , CanalToStringStyle.DEFAULT_STYLE); } }
可以看到,InstanceConfig类中维护了一个globalConfig字段,其类型也是InstanceConfig。而其相关get方法在执行时,会按照以下逻辑进行判断:如果没有自身没有这个配置,则返回全局配置,如果有,则返回自身的配置。通过这种方式实现对全局配置的覆盖。
准备canal server相关代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator); canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); ``` 上述代码中,首先解析了cid、ip、port字段,其中: - **cid:Long**,对应canal.properties文件中的canal.id,目前无实际用途 - **ip:String**,对应canal.properties文件中的canal.ip,canal server监听的ip。 - **port:int **,对应canal.properties文件中的canal.port,canal server监听的端口 之后分别为以下两个字段赋值: - **embededCanalServer:**类型为CanalServerWithEmbedded - **canalServer:**类型为CanalServerWithNetty `CanalServerWithEmbedded` 和 `CanalServerWithNetty`都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。 关于这两种类型的实现,canal官方文档有以下描述: {%asset_img 6 .png%} 说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库。如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。 在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。 因此,在上述代码中,我们看到,用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,而ip和port被设置到CanalServerWithNetty中。 关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。 ## 初始化zk相关代码 ```java final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true ); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true ); }
canal支持利用了zk来完成HA机制、以及将当前消费到到的mysql的binlog位置记录到zk中。ZkClientx是canal对ZkClient进行了一层简单的封装。
显然,当我们没有配置canal.zkServers,那么zkclientx不会被初始化。
关于Canal如何利用ZK做HA,我们将在稍后的代码中进行分。而利用zk记录binlog的消费进度,将在之后的章节进行分析。
CanalInstance运行状态监控相关代码 由于这段代码比较长且恶心,这里笔者暂时对部分代码进行省略,以便读者看清楚整各脉络
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() { public ServerRunningMonitor apply (final String destination) { ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); runningMonitor.setDestination(destination); runningMonitor.setListener(new ServerRunningListener() {....}); if (zkclientx != null ) { runningMonitor.setZkClient(zkclientx); } runningMonitor.init(); return runningMonitor; } }));
上述代码中,ServerRunningMonitors
是ServerRunningMonitor对象的容器,而ServerRunningMonitor用于监控CanalInstance。
canal会为每一个destination创建一个CanalInstance,每个CanalInstance都会由一个ServerRunningMonitor来进行监控。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。
除了CanalInstance需要监控,CanalServer本身也需要监控。因此我们在代码一开始,就看到往ServerRunningMonitors设置了一个ServerRunningData对象,封装了canal server监听的ip和端口等信息。
ServerRunningMonitors源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ServerRunningMonitors { private static ServerRunningData serverData; private static Map runningMonitors; public static ServerRunningData getServerData () { return serverData; } public static Map<String, ServerRunningMonitor> getRunningMonitors () { return runningMonitors; } public static ServerRunningMonitor getRunningMonitor (String destination) { return (ServerRunningMonitor) runningMonitors.get(destination); } public static void setServerData (ServerRunningData serverData) { ServerRunningMonitors.serverData = serverData; } public static void setRunningMonitors (Map runningMonitors) { ServerRunningMonitors.runningMonitors = runningMonitors; } }
ServerRunningMonitors的setRunningMonitors方法接收的参数是一个Map,其中Map的key是destination,value是ServerRunningMonitor,也就是说针对每一个destination都有一个ServerRunningMonitor来监控。
上述代码中,在往ServerRunningMonitors设置Map时,是通过MigrateMap.makeComputingMap方法来创建的,其接受一个Function类型的参数,这是guava中定义的接口,其声明了apply抽象方法。其工作原理可以通过下面代码片段进行介绍:
1 2 3 4 5 6 7 8 9 Map<String, User> map = MigrateMap.makeComputingMap(new Function<String, User>() { @Override public User apply (String name) { return new User(name); } }); User user = map.get("tianshouzhi" ); assert user != null ;assert user == map.get("tianshouzhi" );
这段代码中,我们利用MigrateMap.makeComputingMap创建了一个Map,其中key为String类型,value为User类型。当我们调用map.get(“tianshouzhi”)方法,最开始这个Map中并没有任何key/value的,于是其就会回调Function的apply方法,利用参数”tianshouzhi”创建一个User对象并返回。之后当我们再以”tianshouzhi”为key从Map中获取User对象时,会直接将前面创建的对象返回。不会回调apply方法,也就是说,只有在第一次尝试获取时,才会回调apply方法。
而在上述代码中,实际上就利用了这个特性,只不过是根据destination获取ServerRunningMonitor对象,如果不存在就创建。
在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener对象,接着,判断如果zkClientx字段如果不为空,也设置到ServerRunningMonitor中,最后调用init方法进行初始化。
1 2 3 4 5 6 7 8 9 ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); runningMonitor.setDestination(destination); runningMonitor.setListener(new ServerRunningListener(){...}) if (zkclientx != null ) {runningMonitor.setZkClient(zkclientx); } runningMonitor.init(); return runningMonitor;
ServerRunningListener的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 new ServerRunningListener() { public void processActiveEnter () { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); embededCanalServer.start(destination); } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } public void processActiveExit () { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); embededCanalServer.stop(destination); } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } public void processStart () { try { if (zkclientx != null ) { final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port); initCid(path); zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged (KeeperState state) throws Exception { } public void handleNewSession () throws Exception { initCid(path); } }); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } public void processStop () { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); if (zkclientx != null ) { final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port); releaseCid(path); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } }
上述代码中,我们可以看到启动一个CanalInstance实际上是在ServerRunningListener的processActiveEnter方法中,通过调用embededCanalServer的start(destination)方法进行的,对于停止也是类似。
那么ServerRunningListener中的相关方法到底是在哪里回调的呢?我们可以在ServerRunningMonitor的start和stop方法中找到答案,这里只列出start方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ServerRunningMonitor extends AbstractCanalLifeCycle { ... public void start () { super .start(); processStart(); if (zkClient != null ) { String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter(); } } }
当ServerRunningMonitor的start方法被调用时,其首先会直接调用processStart方法,这个方法内部直接调了ServerRunningListener的processStart()方法,源码如下所示。通过前面的分析,我们已经知道在存在zkClient!=null的情况,会往zk中创建一个节点。
1 2 3 4 5 6 7 8 9 private void processStart () { if (listener != null ) { try { listener.processStart(); } catch (Exception e) { logger.error("processStart failed" , e); } } }
之后会判断是否存在zkClient,如果不存在,则以本地方式启动,如果存在,则以HA方式启动。我们知道,canal server可以部署成两种方式:集群方式或者独立部署。其中集群方式是利用zk来做HA,独立部署则可以直接进行启动。我们先来看比较简单的直接启动。
直接启动:
不存在zk的情况下,会进入else代码块,调用processActiveEnter方法,其内部调用了listener的processActiveEnter,启动相应destination对应的CanalInstance。
1 2 3 4 5 6 7 8 9 private void processActiveEnter () { if (listener != null ) { try { listener.processActiveEnter(); } catch (Exception e) { logger.error("processActiveEnter failed" , e); } } }
HA方式启动:
存在zk,说明canal server可能做了集群,因为canal就是利用zk来做HA的。首先根据destination构造一个zk的节点路径,然后进行监听。
1 2 3 4 5 6 7 8 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener);
上述只是监听代码,之后尝试调用initRunning方法通过HA的方式来启动CanalInstance。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void initRunning () { if (!isStart()) { return ; } String path = ZookeeperPathUtils.getDestinationServerRunning(destination); byte [] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false ); zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; processActiveEnter(); mutex.set(true ); } catch (ZkNodeExistsException e) { bytes = zkClient.readData(path, true ); if (bytes == null ) { initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true ); initRunning(); } }
可以看到,initRunning方法内部只有在尝试在zk中创建节点成功后,才会去调用listener的processActiveEnter方法来真正启动destination对应的canal instance,这是canal HA方式启动的核心。canal官方文档中介绍了CanalServer HA机制启动的流程,如下:
canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断(实现:创建EPHEMERAL节点,谁创建成功就运行谁启动)
创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就处于standby状态
一旦zookeeper发现canal server A 创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance
事实上,这个说明的前两步,都是在initRunning方法中实现的。从上面的代码中,我们可以看出,在HA机启动的情况下,initRunning方法不一定能走到processActiveEnter()方法,因为创建临时节点可能会出错。
此外,根据官方文档说明,如果出错,那么当前canal instance则进入standBy状态。也就是另外一个canal instance出现异常时,当前canal instance顶上去。那么相关源码在什么地方呢?在HA方式启动最开始的2行代码的监听逻辑中:
1 2 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener);
其中dataListener类型是IZkDataListener
,这是zkclient客户端提供的接口,定义如下:
1 2 3 4 public interface IZkDataListener { public void handleDataChange (String dataPath, Object data) throws Exception ; public void handleDataDeleted (String dataPath) throws Exception ; }
当zk节点中的数据发生变更时,会自动回调这两个方法,很明显,一个是用于处理节点数据发生变化,一个是用于处理节点数据被删除。
而dataListener是在ServerRunningMonitor的构造方法中初始化的,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public ServerRunningMonitor () { dataListener = new IZkDataListener() { public void handleDataChange (String dataPath, Object data) throws Exception { MDC.put("destination" , destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte []) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false ); } if (!runningData.isActive() && isMine(runningData.getAddress())) { release = true ; releaseRunning(); activeData = (ServerRunningData) runningData; } public void handleDataDeleted (String dataPath) throws Exception { MDC.put("destination" , destination); mutex.set(false ); if (!release && activeData != null && isMine(activeData.getAddress())) { initRunning(); } else { delayExector.schedule(new Runnable() { public void run () { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; }
那么现在问题来了?ServerRunningMonitor的start方法又是在哪里被调用的, 这个方法被调用了,才能真正的启动canal instance。这部分代码我们放到后面的CanalController中的start方法进行讲解。
下面分析最后一部分代码,autoScan机制相关代码。
autoScan机制相关代码 关于autoscan,官方文档有以下介绍:
结合autoscan机制的相关源码:
1 2 3 4 5 6 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { instanceConfigMonitors = }
可以看到,autoScan是否需要自动扫描的开关,只有当autoScan为true时,才会初始化defaultAction字段和instanceConfigMonitors字段。其中:
其中:
defaultAction 其作用是如果配置发生了变更,默认应该采取什么样的操作。其实现了InstanceAction接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动;当移除一个destination配置时,需要调用stop方法来停止;当某个destination配置发生变更时,需要调用reload方法来进行重启。
instanceConfigMonitors 类型为Map。defaultAction字段只是定义了配置发生变化默认应该采取的操作,那么总该有一个类来监听配置是否发生了变化,这就是InstanceConfigMonitor的作用。官方文档中,只提到了对canal.conf.dir配置项指定的目录的监听,这指的是通过spring方式加载配置。显然的,通过manager方式加载配置,配置中心的内容也是可能发生变化的,也需要进行监听。此时可以理解为什么instanceConfigMonitors的类型是一个Map,key为InstanceMode,就是为了对这两种方式的配置加载方式都进行监听。
defaultAction字段初始化源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 defaultAction = new InstanceAction() { public void start (String destination) { InstanceConfig config = instanceConfigs.get(destination); if (config == null ) { config = parseInstanceConfig(properties, destination); instanceConfigs.put(destination, config); } if (!embededCanalServer.isStart(destination)) { ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } } public void stop (String destination) { InstanceConfig config = instanceConfigs.remove(destination); if (config != null ) { embededCanalServer.stop(destination); ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (runningMonitor.isStart()) { runningMonitor.stop(); } } } public void reload (String destination) { stop(destination); start(destination); } };
instanceConfigMonitors字段初始化源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() { public InstanceConfigMonitor apply (InstanceMode mode) { int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL)); if (mode.isSpring()) { SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR); if (StringUtils.isEmpty(rootDir)) { rootDir = "../conf" ; } if (StringUtils.equals("otter-canal" , System.getProperty("appName" ))) { monitor.setRootConf(rootDir); } else { monitor.setRootConf("src/main/resources/" ); } return monitor; } else if (mode.isManager()) { return new ManagerInstanceConfigMonitor(); } else { throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor" ); } } });
可以看到instanceConfigMonitors也是根据mode属性,来采取不同的监控实现类SpringInstanceConfigMonitor 或者ManagerInstanceConfigMonitor,二者都实现了InstanceConfigMonitor接口。
1 2 3 4 public interface InstanceConfigMonitor extends CanalLifeCycle { void register (String destination, InstanceAction action) ; void unregister (String destination) ; }
当需要对一个destination进行监听时,调用register方法
当取消对一个destination监听时,调用unregister方法。
事实上,unregister方法在canal 内部并没有有任何地方被调用,也就是说,某个destination如果开启了autoScan=true,那么你是无法在运行时停止对其进行监控的。如果要停止,你可以选择将对应的目录删除。
InstanceConfigMonitor本身并不知道哪些canal instance需要进行监控,因为不同的canal instance,有的可能设置autoScan为true,另外一些可能设置为false。
在CanalConroller的start方法中,对于autoScan为true的destination,会调用InstanceConfigMonitor的register方法进行注册,此时InstanceConfigMonitor才会真正的对这个destination配置进行扫描监听。对于那些autoScan为false的destination,则不会进行监听。
目前SpringInstanceConfigMonitor对这两个方法都进行了实现,而ManagerInstanceConfigMonitor目前对这两个方法实现的都是空,需要开发者自己来实现。
在实现ManagerInstanceConfigMonitor时,可以参考SpringInstanceConfigMonitor。
此处不打算再继续进行分析SpringInstanceConfigMonitor的源码,因为逻辑很简单,感兴趣的读者可以自行查看SpringInstanceConfigMonitor 的scan方法,内部在什么情况下会回调defualtAction的start、stop、reload方法 。
CanalController的start方法 而ServerRunningMonitor的start方法,是在CanalController中的start方法中被调用的,CanalController中的start方法是在CanalLauncher中被调用的。
com.alibaba.otter.canal.deployer.CanalController#start
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public void start () throws Throwable { logger.info("## start the canal server[{}:{}]" , ip, port); final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port); initCid(path); if (zkclientx != null ) { this .zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged (KeeperState state) throws Exception { } public void handleNewSession () throws Exception { initCid(path); } }); } embededCanalServer.start(); for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); if (!embededCanalServer.isStart(destination)) { ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } if (autoScan) { instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); } } if (autoScan) { instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } } canalServer.start(); }
总结 deployer模块的主要作用:
读取canal.properties,确定canal instance的配置加载方式
确定canal instance的启动方式:独立启动或者集群方式启动
监听canal instance的配置的变化,动态停止、启动或新增
启动canal server,监听客户端请求