The description of canal's workflow mentioned two monitors: InstanceConfigMonitor and ServerRunningMonitor. This article analyzes these two monitors in detail.
InstanceConfigMonitor
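As the name suggests, InstanceConfigMonitor monitors canal instance configuration, and that is exactly what it does. It watches each instance's configuration file, instance.properties. When a configuration is added, modified, or deleted, it starts, reloads, or stops the corresponding instance. Let's walk through the code of InstanceConfigMonitor.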
Creating the InstanceConfigMonitor
InstanceConfigMonitor is created in the constructor of CanalController.
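The constructor first reads canal.auto.scan, which controls whether changes to instance configurations are watched. If it is false, no InstanceConfigMonitor is needed. It defaults to true, in which case an InstanceConfigMonitor is created.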
Before the InstanceConfigMonitor is created, a defaultAction of type InstanceAction is created. InstanceAction is an interface with three methods, which start, stop, and reload an instance respectively:
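```java
public interface InstanceAction {

    // start the instance for the given destination
    void start(String destination);

    // stop the instance for the given destination
    void stop(String destination);

    // reload (restart) the instance for the given destination
    void reload(String destination);
}
```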
We will look at what InstanceAction actually does later.
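Next, instanceConfigMonitors is created. It is a Map. When an InstanceConfigMonitor is requested and the Map has no entry for it, the apply method is called to create one. Judging from the call flow, the InstanceConfigMonitor is actually fetched just before it is started: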
```java
// body of apply(mode): creates the monitor for a given InstanceMode
int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));

if (mode.isSpring()) {
    SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
    monitor.setScanIntervalInSecond(scanInterval);
    monitor.setDefaultAction(defaultAction);
    // root directory to scan
    String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
    if (StringUtils.isEmpty(rootDir)) {
        rootDir = "../conf";
    }

    if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
        monitor.setRootConf(rootDir);
    } else {
        // otherwise fall back to the source tree's resources directory
        monitor.setRootConf("src/main/resources/");
    }
    return monitor;
} else if (mode.isManager()) {
    return new ManagerInstanceConfigMonitor();
} else {
    throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
}
```
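The creation process is as follows:
- Read canal.auto.scan.interval, the interval between configuration scans. It defaults to 5 seconds.
- Check whether mode is SPRING. The mode is set by canal.instance.global.mode and can be configured differently for each destination. The global mode defaults to spring. In SPRING mode:
  - Create a SpringInstanceConfigMonitor.
  - Set scanInterval.
  - Set defaultAction, which is the InstanceAction created just before.
  - Read canal.conf.dir, the root directory to monitor. It falls back to ../conf when empty. The code only uses this directory when the system property appName is otter-canal. Otherwise the root is src/main/resources/.
- In MANAGER mode, return a ManagerInstanceConfigMonitor. Any other mode throws UnsupportedOperationException.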
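The steps above mostly read a few properties with fallbacks. Below is a minimal sketch of that logic using java.util.Properties. It applies the defaults stated in the text: canal.auto.scan is true, the interval is 5 seconds, and the directory is ../conf. The class `MonitorSettings` is hypothetical and is not canal's CanalController code.

```java
import java.util.Properties;

// Hypothetical helper (not part of canal): reads the monitor settings described
// above and applies the defaults mentioned in the text.
public class MonitorSettings {
    final boolean autoScan;
    final int scanIntervalSeconds;
    final String confDir;

    MonitorSettings(Properties props) {
        // canal.auto.scan: whether instance configs are watched (default true)
        this.autoScan = Boolean.parseBoolean(props.getProperty("canal.auto.scan", "true").trim());
        // canal.auto.scan.interval: scan period in seconds (default 5)
        this.scanIntervalSeconds = Integer.parseInt(props.getProperty("canal.auto.scan.interval", "5").trim());
        // canal.conf.dir: root directory to scan; missing or empty falls back to ../conf
        String dir = props.getProperty("canal.conf.dir");
        this.confDir = (dir == null || dir.trim().isEmpty()) ? "../conf" : dir.trim();
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("canal.auto.scan.interval", "10");
        MonitorSettings s = new MonitorSettings(p);
        System.out.println(s.autoScan + " " + s.scanIntervalSeconds + " " + s.confDir); // true 10 ../conf
    }
}
```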
Starting the InstanceConfigMonitor
InstanceConfigMonitor is started in CanalController's start() method. Before it starts, CanalController iterates over the destinations and registers an InstanceAction for each one:
```java
// in CanalController.start(): register the action for each destination
if (autoScan) {
    instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
}

// in the monitor: fall back to defaultAction when no action is given
public void register(String destination, InstanceAction action) {
    if (action != null) {
        actions.put(destination, action);
    } else {
        actions.put(destination, defaultAction);
    }
}
```
When instanceConfigMonitors.get(config.getMode()) runs and the map has no InstanceConfigMonitor for that mode yet, the monitor is created first.
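This "create on first get" behavior is what a computing map provides. The sketch below reproduces it with the standard `Map.computeIfAbsent` as a stand-in for the apply-based map canal uses. The `Mode` enum and `Monitor` class are hypothetical simplifications of InstanceMode and InstanceConfigMonitor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one monitor per mode, created lazily on first lookup.
public class LazyMonitorMap {
    enum Mode { SPRING, MANAGER }

    // stand-in for InstanceConfigMonitor; only remembers its mode
    static class Monitor {
        final Mode mode;
        Monitor(Mode mode) { this.mode = mode; }
    }

    private final Map<Mode, Monitor> monitors = new ConcurrentHashMap<>();
    int created = 0; // number of times the factory ran

    // plays the role of apply(): invoked only when the map has no entry yet
    private Monitor create(Mode mode) {
        created++;
        return new Monitor(mode);
    }

    Monitor get(Mode mode) {
        return monitors.computeIfAbsent(mode, this::create);
    }

    public static void main(String[] args) {
        LazyMonitorMap m = new LazyMonitorMap();
        Monitor a = m.get(Mode.SPRING);
        Monitor b = m.get(Mode.SPRING);
        System.out.println((a == b) + " " + m.created); // true 1
    }
}
```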
Next, start() is called. For SpringInstanceConfigMonitor, start() launches a scheduled task that runs scan() every scanIntervalInSecond seconds (5 by default):
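A periodic scan like this is usually built on a `ScheduledExecutorService`. The sketch below is only an illustration: the text does not say whether canal uses fixed-delay or fixed-rate scheduling, so `scheduleWithFixedDelay` is an assumption. `scan()` here is a hypothetical stub that just counts runs. The try/catch matters because the executor suppresses all later runs of a periodic task once an execution throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a monitor that calls scan() on a fixed schedule.
public class PeriodicScanner {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger scans = new AtomicInteger();

    // stub standing in for SpringInstanceConfigMonitor.scan()
    private void scan() {
        scans.incrementAndGet();
    }

    void start(long interval, TimeUnit unit) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                scan();
            } catch (Throwable t) {
                // an exception escaping the task would cancel every later run
                t.printStackTrace();
            }
        }, 0, interval, unit);
    }

    void stop() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicScanner s = new PeriodicScanner();
        s.start(5, TimeUnit.SECONDS); // 5 seconds mirrors canal's default interval
        Thread.sleep(100);
        s.stop();
        System.out.println("scans so far: " + s.scans.get());
    }
}
```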
```java
private void scan() {
    File rootdir = new File(rootConf);
    if (!rootdir.exists()) {
        return;
    }

    // every subdirectory except "spring" is treated as an instance (destination)
    File[] instanceDirs = rootdir.listFiles(new FileFilter() {

        public boolean accept(File pathname) {
            String filename = pathname.getName();
            return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename);
        }
    });

    // destinations that still have a directory on disk
    Set<String> currentInstanceNames = new HashSet<String>();

    for (File instanceDir : instanceDirs) {
        String destination = instanceDir.getName();
        currentInstanceNames.add(destination);
        File[] instanceConfigs = instanceDir.listFiles(new FilenameFilter() {

            public boolean accept(File dir, String name) {
                return StringUtils.equalsIgnoreCase(name, "instance.properties");
            }

        });

        if (!actions.containsKey(destination) && instanceConfigs.length > 0) {
            // config appeared for an unregistered destination: start it
            notifyStart(instanceDir, destination, instanceConfigs);
        } else if (actions.containsKey(destination)) {
            if (instanceConfigs.length == 0) {
                // instance.properties was removed: stop it
                notifyStop(destination);
            } else {
                InstanceConfigFiles lastFile = lastFiles.get(destination);
                // a started instance is expected to have recorded file info
                if (!isFirst && CollectionUtils.isEmpty(lastFile.getInstanceFiles())) {
                    logger.error("[{}] is started, but not found instance file info.", destination);
                }

                boolean hasChanged = judgeFileChanged(instanceConfigs, lastFile.getInstanceFiles());
                if (hasChanged) {
                    notifyReload(destination);
                }

                // record file info after a change, or when none was recorded yet
                if (hasChanged || CollectionUtils.isEmpty(lastFile.getInstanceFiles())) {
                    List<FileInfo> newFileInfo = new ArrayList<FileInfo>();
                    for (File instanceConfig : instanceConfigs) {
                        newFileInfo.add(new FileInfo(instanceConfig.getName(), instanceConfig.lastModified()));
                    }

                    lastFile.setInstanceFiles(newFileInfo);
                }
            }
        }

    }

    // registered destinations whose directory has disappeared
    Set<String> deleteInstanceNames = new HashSet<String>();
    for (String destination : actions.keySet()) {
        if (!currentInstanceNames.contains(destination)) {
            deleteInstanceNames.add(destination);
        }
    }
    for (String deleteInstanceName : deleteInstanceNames) {
        notifyStop(deleteInstanceName);
    }
}
```
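scan() walks the instance configuration files. When a configuration is added, modified, or deleted, it calls notifyStart, notifyReload, or notifyStop respectively. A destination whose whole directory has disappeared is also stopped. The three operations are shown below: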
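The decision logic of scan() can be isolated as a pure function. The sketch below assumes a simplification: each scanned directory maps to the lastModified time of its instance.properties, or null if the file is missing, and change detection compares only that timestamp. canal's judgeFileChanged also matches file names. A known destination whose recorded value is null has no baseline yet. It is not reloaded, which mirrors how scan() first records file info for such destinations. `ScanDiff` and `Op` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the decisions scan() makes.
public class ScanDiff {
    enum Op { START, RELOAD, STOP }

    /**
     * @param scanned directory name -> lastModified of instance.properties, or null if absent
     * @param known   registered destination -> lastModified recorded earlier (null = not recorded yet)
     */
    static Map<String, Op> diff(Map<String, Long> scanned, Map<String, Long> known) {
        Map<String, Op> ops = new TreeMap<>();
        for (Map.Entry<String, Long> e : scanned.entrySet()) {
            String dest = e.getKey();
            Long modified = e.getValue();
            if (!known.containsKey(dest)) {
                if (modified != null) {
                    ops.put(dest, Op.START);   // new config -> notifyStart
                }
            } else if (modified == null) {
                ops.put(dest, Op.STOP);        // config file removed -> notifyStop
            } else {
                Long last = known.get(dest);
                if (last != null && !last.equals(modified)) {
                    ops.put(dest, Op.RELOAD);  // timestamp changed -> notifyReload
                }
            }
        }
        for (String dest : known.keySet()) {
            if (!scanned.containsKey(dest)) {
                ops.put(dest, Op.STOP);        // directory deleted -> notifyStop
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        Map<String, Long> scanned = new HashMap<>();
        scanned.put("example", 200L); // instance.properties modified
        scanned.put("newone", 100L);  // new directory with a config
        Map<String, Long> known = new HashMap<>();
        known.put("example", 150L);
        known.put("old", 120L);       // directory removed
        System.out.println(diff(scanned, known)); // {example=RELOAD, newone=START, old=STOP}
    }
}
```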
```java
private void notifyStart(File instanceDir, String destination, File[] instanceConfigs) {
    try {
        defaultAction.start(destination);
        actions.put(destination, defaultAction);

        // record instance.properties file info for later change detection
        InstanceConfigFiles lastFile = lastFiles.get(destination);
        List<FileInfo> newFileInfo = new ArrayList<FileInfo>();
        for (File instanceConfig : instanceConfigs) {
            newFileInfo.add(new FileInfo(instanceConfig.getName(), instanceConfig.lastModified()));
        }
        lastFile.setInstanceFiles(newFileInfo);

        logger.info("auto notify start {} successful.", destination);
    } catch (Throwable e) {
        logger.error(String.format("scan add found[%s] but start failed", destination), e);
    }
}

private void notifyReload(String destination) {
    InstanceAction action = actions.get(destination);
    if (action != null) {
        try {
            action.reload(destination);
            logger.info("auto notify reload {} successful.", destination);
        } catch (Throwable e) {
            logger.error(String.format("scan reload found[%s] but reload failed", destination), e);
        }
    }
}

private void notifyStop(String destination) {
    InstanceAction action = actions.remove(destination);
    try {
        action.stop(destination);
        lastFiles.remove(destination);
        logger.info("auto notify stop {} successful.", destination);
    } catch (Throwable e) {
        logger.error(String.format("scan delete found[%s] but stop failed", destination), e);
        // put the action back so the destination stays registered
        actions.put(destination, action);
    }
}
```
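notifyStart, notifyReload, and notifyStop mainly call the InstanceAction registered for the destination, which here is always defaultAction, to start, reload, or stop the instance. notifyStart also records the current instance.properties file info in lastFiles. If stopping fails, notifyStop puts the action back into actions.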
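The rollback in notifyStop is worth isolating. The action is removed first, and it is re-registered only if stopping fails. Because the destination then still counts as registered, the next scan sees the missing config or directory again and retries the stop. In this sketch, `StopWithRollback` and the `Consumer`-based action map are hypothetical stand-ins for the monitor's `actions` map of InstanceAction. Unlike the canal code, it also guards against a missing action.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of "remove, then restore on failure" as used by notifyStop.
public class StopWithRollback {
    // destination -> stop callback (stand-in for InstanceAction.stop)
    final Map<String, Consumer<String>> actions = new HashMap<>();

    boolean notifyStop(String destination) {
        Consumer<String> action = actions.remove(destination);
        if (action == null) {
            return false; // nothing registered for this destination
        }
        try {
            action.accept(destination);
            return true;
        } catch (RuntimeException e) {
            // restore the registration so the next scan retries the stop
            actions.put(destination, action);
            return false;
        }
    }

    public static void main(String[] args) {
        StopWithRollback s = new StopWithRollback();
        s.actions.put("example", d -> { throw new IllegalStateException("stop failed"); });
        System.out.println(s.notifyStop("example") + " " + s.actions.containsKey("example")); // false true
    }
}
```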
Now let's look at the operations defined in defaultAction:
```java
defaultAction = new InstanceAction() {

    public void start(String destination) {
        InstanceConfig config = instanceConfigs.get(destination);
        if (config == null) {
            // not cached yet: parse the config for this destination and cache it
            config = parseInstanceConfig(properties, destination);
            instanceConfigs.put(destination, config);
        }

        if (!embededCanalServer.isStart(destination)) {
            // starting is delegated to the destination's ServerRunningMonitor
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
            if (!config.getLazy() && !runningMonitor.isStart()) {
                runningMonitor.start();
                if (canalMQStarter != null) {
                    canalMQStarter.startDestination(destination);
                }
            }
        }
    }

    public void stop(String destination) {
        // drop the cached config so the next start() parses it again
        InstanceConfig config = instanceConfigs.remove(destination);
        if (config != null) {
            if (canalMQStarter != null) {
                canalMQStarter.stopDestination(destination);
            }
            embededCanalServer.stop(destination);
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
            if (runningMonitor.isStart()) {
                runningMonitor.stop();
            }
        }
    }

    public void reload(String destination) {
        // reload is simply stop followed by start
        stop(destination);
        start(destination);
    }
};
```
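defaultAction defines three operations:
- reload simply stops the instance and then starts it again.
- stop removes the cached InstanceConfig and stops the MQ destination if canalMQStarter is present. It then stops the instance in embededCanalServer and calls stop() on the instance's ServerRunningMonitor if that monitor is running.
- start reads the instance configuration, parsing and caching it if it is not cached yet. If the instance is not already running in embededCanalServer, is not lazy, and its ServerRunningMonitor has not started, start calls that monitor's start() method and then starts the MQ destination if applicable.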
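The sketch below shows why reload can be a plain stop followed by start. stop evicts the cached config, so the subsequent start has to parse it again. It is a hypothetical simplification: `SimpleInstanceAction` has no embedded server, MQ starter, lazy flag, or ServerRunningMonitor, and the `loader` function stands in for parseInstanceConfig.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Same shape as the InstanceAction interface shown earlier
interface InstanceAction {
    void start(String destination);
    void stop(String destination);
    void reload(String destination);
}

// Hypothetical, simplified defaultAction that tracks state in memory only.
public class SimpleInstanceAction implements InstanceAction {
    final Map<String, String> configCache = new HashMap<>(); // plays the role of instanceConfigs
    final Set<String> running = new HashSet<>();              // destinations considered started
    private final Function<String, String> loader;            // hypothetical config parser
    int loads = 0;                                            // how many times loader ran

    SimpleInstanceAction(Function<String, String> loader) {
        this.loader = loader;
    }

    @Override
    public void start(String destination) {
        // parse and cache the config only when it is not cached yet
        configCache.computeIfAbsent(destination, d -> {
            loads++;
            return loader.apply(d);
        });
        running.add(destination); // no-op if already running
    }

    @Override
    public void stop(String destination) {
        // evicting the cached config is what makes the next start() parse it again
        if (configCache.remove(destination) != null) {
            running.remove(destination);
        }
    }

    @Override
    public void reload(String destination) {
        stop(destination);
        start(destination);
    }

    public static void main(String[] args) {
        Map<String, String> files = new HashMap<>();
        files.put("example", "v1");
        SimpleInstanceAction action = new SimpleInstanceAction(files::get);
        action.start("example");
        files.put("example", "v2");
        action.reload("example");
        System.out.println(action.configCache.get("example") + " loads=" + action.loads); // v2 loads=2
    }
}
```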
ServerRunningMonitor
ServerRunningMonitor controls and monitors a canal instance.
ServerRunningMonitor objects are maintained by ServerRunningMonitors, whose runningMonitors variable holds the ServerRunningMonitor for each destination.
Starting and stopping canal instances
The first job of ServerRunningMonitor is to start and stop canal instances, which is handled by start() and stop():
```java
public synchronized void start() {
    super.start();
    try {
        processStart();
        if (zkClient != null) {
            // watch this destination's running node
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            zkClient.subscribeDataChanges(path, dataListener);

            initRunning();
        } else {
            // no ZooKeeper: this server is always the active one
            processActiveEnter();
        }
    } catch (Exception e) {
        logger.error("start failed", e);
        // undo a partial start
        stop();
    }
}

public synchronized void stop() {
    super.stop();

    if (zkClient != null) {
        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        zkClient.unsubscribeDataChanges(path, dataListener);

        releaseRunning();
    } else {
        processActiveExit();
    }
    processStop();
}
```
First, the four process* callbacks involved: processStart, processStop, processActiveEnter, and processActiveExit. They do very little. Each simply calls the corresponding method of ServerRunningListener, an interface that defines four callbacks invoked when an instance's state changes:
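```java
public interface ServerRunningListener {

    // called when the monitor starts
    public void processStart();

    // called when the monitor stops
    public void processStop();

    // called when this server becomes the active server for the destination
    public void processActiveEnter();

    // called when this server stops being the active server
    public void processActiveExit();
}
```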
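When canal creates each ServerRunningMonitor, it also creates and sets a ServerRunningListener implementation. The four methods are implemented as follows:
- processStart: if ZooKeeper is used, create this server's cluster node in ZooKeeper (path: /otter/canal/destinations/{destination}/cluster/{ip}:{port}) and watch ZooKeeper state changes.
- processStop: if ZooKeeper is used, delete the cluster node from ZooKeeper.
- processActiveEnter: call CanalServerWithEmbedded's start method to start the instance for the destination.
- processActiveExit: call CanalServerWithEmbedded's stop method to stop the instance for the destination.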
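Now back to ServerRunningMonitor's start and stop methods:
- start():
  - Call processStart, as described above.
  - If ZooKeeper is used, subscribe to the ZooKeeper node /otter/canal/destinations/{destination}/running, then call initRunning to try to become the active server and run the canal instance.
  - If ZooKeeper is not used, call processActiveEnter directly to start the instance.
  - If any step throws, call stop() to roll back.
- stop():
  - If ZooKeeper is used, unsubscribe from the /otter/canal/destinations/{destination}/running node and call releaseRunning to stop the canal instance.
  - If ZooKeeper is not used, call processActiveExit directly to stop the instance.
  - Finally, call processStop.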
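The sketch below simplifies the start/stop flow to show the order of the listener callbacks. The `Election` interface is a hypothetical abstraction of the ZooKeeper running node, with tryAcquire roughly corresponding to initRunning and release to releaseRunning; null means "no ZooKeeper". It does not model the data listener, retries, or canal's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ServerRunningMonitor's start()/stop() callback order.
public class RunningMonitorSketch {
    interface ServerRunningListener {
        void processStart();
        void processStop();
        void processActiveEnter();
        void processActiveExit();
    }

    // hypothetical stand-in for the ZooKeeper running node; null = no ZooKeeper
    interface Election {
        boolean tryAcquire(); // true if this server now owns the running node
        void release();
    }

    // listener that records callback names, useful for inspecting the order
    static class RecordingListener implements ServerRunningListener {
        final List<String> calls = new ArrayList<>();
        public void processStart() { calls.add("start"); }
        public void processStop() { calls.add("stop"); }
        public void processActiveEnter() { calls.add("activeEnter"); }
        public void processActiveExit() { calls.add("activeExit"); }
    }

    private final ServerRunningListener listener;
    private final Election election;
    private boolean active;

    RunningMonitorSketch(ServerRunningListener listener, Election election) {
        this.listener = listener;
        this.election = election;
    }

    synchronized void start() {
        listener.processStart();
        // without ZooKeeper this server is always active; with it, only the winner is
        if (election == null || election.tryAcquire()) {
            active = true;
            listener.processActiveEnter();
        }
    }

    synchronized void stop() {
        if (active) {
            if (election != null) {
                election.release(); // only the owner gives up the running node
            }
            active = false;
            listener.processActiveExit();
        }
        listener.processStop();
    }

    public static void main(String[] args) {
        RecordingListener l = new RecordingListener();
        RunningMonitorSketch m = new RunningMonitorSketch(l, null);
        m.start();
        m.stop();
        System.out.println(l.calls); // [start, activeEnter, activeExit, stop]
    }
}
```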
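initRunning writes the current canal server's information into the ZooKeeper node /otter/canal/destinations/{destination}/running, creating it as an ephemeral node. It then calls processActiveEnter to start the instance.
If the node already exists, initRunning reads the node's data and saves it in activeData as the canal server that is currently running. The method is shown below: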
```java
private void initRunning() {
    if (!isStart()) {
        return;
    }

    String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
    // serverData describes this canal server
    byte[] bytes = JsonUtils.marshalToByte(serverData);
    try {
        mutex.set(false);
        // ephemeral node: removed automatically when this server's session ends
        zkClient.create(path, bytes, CreateMode.EPHEMERAL);
        activeData = serverData;
        processActiveEnter(); // this server won: start the instance
        mutex.set(true);
    } catch (ZkNodeExistsException e) {
        bytes = zkClient.readData(path, true);
        if (bytes == null) {
            // the node vanished in the meantime: try again
            initRunning();
        } else {
            activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
        }
    } catch (ZkNoNodeException e) {
        // parent path missing: create it and retry
        zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true);
        initRunning();
    }
}
```
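releaseRunning is shown below. It deletes the ZooKeeper node /otter/canal/destinations/{destination}/running and calls processActiveExit to stop the instance. It does this only when check() returns true, that is, when the running node belongs to this server.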
```java
private boolean releaseRunning() {
    if (check()) {
        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        zkClient.delete(path);
        mutex.set(false);
        processActiveExit();
        return true;
    }

    return false;
}
```
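Taken together, initRunning and releaseRunning implement a "first to create the node wins" election. The sketch below models it with an in-memory `ConcurrentHashMap`. `putIfAbsent` plays the role of a create that fails with NodeExists, and `remove(key, value)` plays "delete only if mine". `FakeZk` is hypothetical and is not a ZooKeeper client. It does not model ephemeral-node expiry, watches, or the non-atomic check-then-delete that canal performs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of the running-node election.
public class RunningNodeElection {
    // stand-in for ZooKeeper: node path -> node data (server address)
    static final class FakeZk {
        final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();
    }

    private final FakeZk zk;
    private final String path;
    private final String me;   // this server's address, like serverData
    String activeData;         // who this server believes is running

    RunningNodeElection(FakeZk zk, String destination, String me) {
        this.zk = zk;
        this.path = "/otter/canal/destinations/" + destination + "/running";
        this.me = me;
    }

    // like initRunning(): create the node, or remember the current owner if it exists
    boolean initRunning() {
        String owner = zk.nodes.putIfAbsent(path, me);
        if (owner == null) {
            activeData = me;
            return true;  // processActiveEnter() would be called here
        }
        activeData = owner;
        return false;
    }

    // like releaseRunning(): only the owner removes the node
    boolean releaseRunning() {
        return zk.nodes.remove(path, me); // processActiveExit() would follow on success
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        RunningNodeElection a = new RunningNodeElection(zk, "example", "10.0.0.1:11111");
        RunningNodeElection b = new RunningNodeElection(zk, "example", "10.0.0.2:11111");
        System.out.println(a.initRunning() + " " + b.initRunning() + " " + b.activeData);
        // true false 10.0.0.1:11111
    }
}
```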
Watching ZooKeeper and reacting to data changes
The second job of ServerRunningMonitor is to watch ZooKeeper and react when the data there changes.
As shown earlier, start() subscribes to the ZooKeeper node /otter/canal/destinations/{destination}/running. The callbacks for that subscription are defined in dataListener:
```java
dataListener = new IZkDataListener() {

    public void handleDataChange(String dataPath, Object data) throws Exception {
        MDC.put("destination", destination);
        ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
        if (!isMine(runningData.getAddress())) {
            mutex.set(false);
        }

        // this server was marked inactive: release voluntarily
        if (!runningData.isActive() && isMine(runningData.getAddress())) {
            release = true;
            releaseRunning();
        }

        activeData = (ServerRunningData) runningData;
    }

    public void handleDataDeleted(String dataPath) throws Exception {
        MDC.put("destination", destination);
        mutex.set(false);
        if (!release && activeData != null && isMine(activeData.getAddress())) {
            // this server was the last active one: try to take over immediately
            initRunning();
        } else {
            // otherwise wait delayTime before competing for the node
            delayExector.schedule(new Runnable() {

                public void run() {
                    initRunning();
                }
            }, delayTime, TimeUnit.SECONDS);
        }
    }

};
```
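When the data of the /otter/canal/destinations/{destination}/running node changes, handleDataChange is called:
- Parse the node's new data.
- If the address of the running canal differs from this machine, a canal on another machine is running. In that case mutex is set to false.
- If the running canal is this machine but its status is not active, this machine has released the node voluntarily. In that case release is set to true and releaseRunning() is called, which deletes the ZooKeeper node /otter/canal/destinations/{destination}/running and calls processActiveExit to stop the instance.
- Save the running canal's information in activeData.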
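The branches of handleDataChange can be written as a small pure function. This hypothetical sketch only classifies the event. `RunningData` is a simplified stand-in for ServerRunningData, and `Reaction` names are invented for illustration. In canal, activeData is updated in every case.

```java
// Hypothetical classification of the handleDataChange branches.
public class DataChangeDecision {
    // simplified stand-in for ServerRunningData
    static final class RunningData {
        final String address;
        final boolean active;

        RunningData(String address, boolean active) {
            this.address = address;
            this.active = active;
        }
    }

    enum Reaction { OTHER_SERVER_ACTIVE, RELEASE_SELF, SELF_ACTIVE }

    static Reaction onDataChange(RunningData data, String me) {
        boolean mine = me.equals(data.address);
        if (!mine) {
            return Reaction.OTHER_SERVER_ACTIVE; // canal: mutex.set(false)
        }
        if (!data.active) {
            return Reaction.RELEASE_SELF;        // canal: release = true; releaseRunning()
        }
        return Reaction.SELF_ACTIVE;             // nothing beyond updating activeData
    }

    public static void main(String[] args) {
        String me = "10.0.0.1:11111";
        System.out.println(onDataChange(new RunningData(me, false), me)); // RELEASE_SELF
    }
}
```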
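When the /otter/canal/destinations/{destination}/running node is deleted, handleDataDeleted is called:
- If this machine was the last active canal and did not release the node voluntarily (release is false), call initRunning immediately to try to become active again.
- Otherwise, wait delayTime (5 seconds by default) before calling initRunning. The delay avoids frequent switching between servers caused by brief network or ZooKeeper glitches.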
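The immediate-versus-delayed takeover above can be sketched with a `ScheduledExecutorService`. `DeletedNodeHandler` is a hypothetical class, and the `Runnable` passed in stands in for initRunning(). The test uses a short delay in milliseconds instead of canal's delayTime in seconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the handleDataDeleted takeover policy.
public class DeletedNodeHandler {
    private final ScheduledExecutorService delayExecutor = Executors.newSingleThreadScheduledExecutor();
    private final String me;
    private final long delay;
    private final TimeUnit unit;
    private final Runnable initRunning; // stand-in for initRunning()

    DeletedNodeHandler(String me, long delay, TimeUnit unit, Runnable initRunning) {
        this.me = me;
        this.delay = delay;
        this.unit = unit;
        this.initRunning = initRunning;
    }

    /**
     * @param lastActive address recorded in activeData before the node disappeared (may be null)
     * @param released   true if this server gave up the node voluntarily
     * @return true if initRunning ran immediately, false if it was scheduled
     */
    boolean handleDataDeleted(String lastActive, boolean released) {
        if (!released && me.equals(lastActive)) {
            // this server was active and did not release: reclaim right away
            initRunning.run();
            return true;
        }
        // give the previous owner a head start and avoid flapping between servers
        delayExecutor.schedule(initRunning, delay, unit);
        return false;
    }

    void shutdown() {
        delayExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        DeletedNodeHandler h = new DeletedNodeHandler("10.0.0.1:11111", 5, TimeUnit.SECONDS,
                () -> System.out.println("initRunning"));
        System.out.println(h.handleDataDeleted("10.0.0.1:11111", false));
        h.shutdown();
    }
}
```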
Summary
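This article analyzed canal's two monitors, InstanceConfigMonitor and ServerRunningMonitor. With them, canal applies instance configuration changes at runtime and keeps instances highly available across machines.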