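The core interface of the server module is CanalServer, which has two implementations: CanalServerWithNetty and CanalServerWithEmbedded. The official documentation describes CanalServer as follows:
The figure below is the author's elaboration of the official documentation: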
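The left figure shows canal deployed as a standalone service. Different applications communicate with the canal server through the canal client. All canal client requests are received by CanalServerWithNetty, which then dispatches them to CanalServerWithEmbedded for the actual processing. CanalServerWithEmbedded maintains multiple canal instances internally, each of which poses as a slave of a different MySQL instance, and it uses the destination parameter carried in the client request to decide which canal instance serves that request.
The right figure shows CanalServerWithEmbedded embedded directly in the application, so canal does not need to be deployed separately. With one fewer network hop, synchronizing binlog data should be more efficient, but this approach demands more technical skill from the user. Inside the application, we can call CanalServerWithEmbedded.instance() to obtain the CanalServerWithEmbedded instance, which is a singleton.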
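The source directory structure of the server module is shown below:
The red box at the top is the embedded implementation, and the green box at the bottom is the Netty-based implementation.
The Netty-based implementation looks like it contains more code, but that impression is misleading: CanalServerWithNetty delegates every request to CanalServerWithEmbedded.
The embedded implementation consists of a single class, CanalServerWithEmbedded, because it in turn selects a specific CanalInstance by destination to handle the client request, and the CanalInstance implementation lives in the instance module, which we analyze later. From the canal server's point of view, CanalServerWithEmbedded is therefore the real core of the server module.
Both CanalServerWithNetty and CanalServerWithEmbedded are singletons, and each provides a static instance() method to obtain its instance. Recall from the previous section's analysis of the CanalController source that the code preparing the CanalServer in the CanalController constructor obtains the instances through these two static methods: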
    public CanalController(final Properties properties){
        ....
        ip = getProperty(properties, CanalConstants.CANAL_IP);
        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
        embededCanalServer = CanalServerWithEmbedded.instance();
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);
        canalServer = CanalServerWithNetty.instance();
        canalServer.setIp(ip);
        canalServer.setPort(port);
        ....
    }
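A static instance() accessor can be realized in several ways. As a minimal sketch, assuming the initialization-on-demand holder idiom (EmbeddedServerSketch is a hypothetical class used only for illustration, not canal's actual source), a lazily created, thread-safe singleton could look like this:

```java
// Hypothetical stand-in for a singleton server class; not canal's actual source.
final class EmbeddedServerSketch {

    // The nested holder class is initialized only on the first call to
    // instance(), and JVM class initialization is thread-safe.
    private static final class Holder {
        private static final EmbeddedServerSketch INSTANCE = new EmbeddedServerSketch();
    }

    private EmbeddedServerSketch() {
        // private constructor: callers must go through instance()
    }

    static EmbeddedServerSketch instance() {
        return Holder.INSTANCE;
    }
}
```

Because the constructor is private, every caller ends up sharing the single object returned by instance().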
The CanalServer interface
The CanalServer interface extends CanalLifeCycle, mainly to redeclare the start and stop methods so that they throw CanalServerException.
    public interface CanalServer extends CanalLifeCycle {
        void start() throws CanalServerException;
        void stop() throws CanalServerException;
    }
CanalServerWithNetty
CanalServerWithNetty mainly accepts client requests and delegates them to CanalServerWithEmbedded. The source below shows the fields and constructor defined in CanalServerWithNetty:
    public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {
        private CanalServerWithEmbedded embeddedServer;
        private String ip;
        private int port;
        private Channel serverChannel = null;
        private ServerBootstrap bootstrap = null;

        private CanalServerWithNetty(){
            this.embeddedServer = CanalServerWithEmbedded.instance();
        }
    }
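Field notes:
- embeddedServer: CanalServerWithNetty delegates requests to CanalServerWithEmbedded, so it holds a reference to the embeddedServer object.
- ip, port: the network ip and port that Netty listens on; clients communicate with the server through this ip and port.
- serverChannel, bootstrap: these are Netty API types. ServerBootstrap is used to start the server side; calling its bind method returns serverChannel, an object of type Channel that represents the server-side channel.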
The start method
The start method contains the core logic for starting Netty, as shown below:
com.alibaba.otter.canal.server.netty.CanalServerWithNetty#start
    public void start() {
        super.start();
        // start the embedded server first, since every request is delegated to it
        if (!embeddedServer.isStart()) {
            embeddedServer.start();
        }

        // Netty 3 server bootstrap with cached thread pools for the boss and worker threads
        this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));

        // each new connection gets a pipeline made of these four handlers, in this order
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipelines = Channels.pipeline();
                pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
                pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler());
                pipelines.addLast(ClientAuthenticationHandler.class.getName(), new ClientAuthenticationHandler(embeddedServer));
                SessionHandler sessionHandler = new SessionHandler(embeddedServer);
                pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
                return pipelines;
            }
        });
        // bind to the configured ip if one is set, otherwise to the port on the wildcard address
        if (StringUtils.isNotEmpty(ip)) {
            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
        } else {
            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
        }
    }
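The stop method simply performs shutdown operations; the code is straightforward and is not covered here.
SessionHandler
Clearly, the core logic with which canal handles client requests lives in the SessionHandler handler. Note that it receives the embeddedServer object when it is instantiated: as mentioned earlier, CanalServerWithNetty delegates requests to CanalServerWithEmbedded, so SessionHandler must also hold a reference to the embeddedServer instance.
Here we focus on SessionHandler's messageReceived method, which is invoked when a client request is received. We mainly look at how SessionHandler parses the client request and then delegates it to CanalServerWithEmbedded. To highlight the core request-forwarding logic, the code below omits large portions of the source:
SessionHandler#messageReceived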
    public class SessionHandler extends SimpleChannelHandler {
        ....

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            ....
            switch (packet.getType()) {
                case SUBSCRIPTION:
                    ...
                    embeddedServer.subscribe(clientIdentity);
                    ...
                    break;
                case UNSUBSCRIPTION:
                    ...
                    embeddedServer.unsubscribe(clientIdentity);
                    ...
                    break;
                case GET:
                    ....
                    if (get.getTimeout() == -1) {
                        message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
                    } else {
                        ...
                        message = embeddedServer.getWithoutAck(clientIdentity,
                            get.getFetchSize(),
                            get.getTimeout(),
                            unit);
                    }
                    ...
                    break;
                case CLIENTACK:
                    ...
                    embeddedServer.ack(clientIdentity, ack.getBatchId());
                    ...
                    break;
                case CLIENTROLLBACK:
                    ...
                    if (rollback.getBatchId() == 0L) {
                        embeddedServer.rollback(clientIdentity);
                    } else {
                        embeddedServer.rollback(clientIdentity, rollback.getBatchId());
                    }
                    ...
                    break;
                default:
                    NettyUtils.error(400, MessageFormatter.format("packet type={} is NOT supported!", packet.getType())
                        .getMessage(), ctx.getChannel(), null);
                    break;
            }
            ...
        }
        ...
    }
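As we can see, after parsing the client request, SessionHandler delegates it to the corresponding CanalServerWithEmbedded method according to the request type. The core logic therefore lives in CanalServerWithEmbedded.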
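The delegation pattern can also be looked at in isolation. The following is a simplified sketch rather than canal's code: PacketTypeSketch, RecordingServiceSketch, and DispatcherSketch are hypothetical names, and a request is reduced to a destination plus one numeric argument.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical packet types, named after the cases handled by SessionHandler.
enum PacketTypeSketch { SUBSCRIPTION, UNSUBSCRIPTION, GET, CLIENTACK, CLIENTROLLBACK }

// Hypothetical stand-in for the embedded server: it only records the calls it receives.
final class RecordingServiceSketch {
    final List<String> calls = new ArrayList<>();

    void subscribe(String destination) { calls.add("subscribe:" + destination); }
    void unsubscribe(String destination) { calls.add("unsubscribe:" + destination); }
    void getWithoutAck(String destination, long batchSize) { calls.add("getWithoutAck:" + destination + ":" + batchSize); }
    void ack(String destination, long batchId) { calls.add("ack:" + destination + ":" + batchId); }
    void rollback(String destination, long batchId) { calls.add("rollback:" + destination + ":" + batchId); }
}

// The handler owns no business logic: it only maps a packet type to a service call.
final class DispatcherSketch {
    private final RecordingServiceSketch service;

    DispatcherSketch(RecordingServiceSketch service) {
        this.service = service;
    }

    // arg is the batch size for GET and the batch id for CLIENTACK / CLIENTROLLBACK.
    void dispatch(PacketTypeSketch type, String destination, long arg) {
        switch (type) {
            case SUBSCRIPTION:   service.subscribe(destination); break;
            case UNSUBSCRIPTION: service.unsubscribe(destination); break;
            case GET:            service.getWithoutAck(destination, arg); break;
            case CLIENTACK:      service.ack(destination, arg); break;
            case CLIENTROLLBACK: service.rollback(destination, arg); break;
            default: throw new IllegalArgumentException("unsupported packet type: " + type);
        }
    }
}
```

Keeping the network-facing handler this thin is what allows the same service methods to be called directly when the server is embedded in an application.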
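CanalServerWithEmbedded
CanalServerWithEmbedded implements two interfaces, CanalServer and CanalService. Internally it maintains a Map whose key is the destination and whose value is the corresponding CanalInstance. Based on the destination parameter carried in the client request, it forwards the request to the corresponding CanalInstance for processing.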
    public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
        ...
        private Map<String, CanalInstance> canalInstances;
        ...
    }
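Routing by destination amounts to a map lookup followed by a delegated call. Here is a minimal sketch under that assumption; InstanceSketch and DestinationRouterSketch are hypothetical names, and the real CanalInstance interface is considerably richer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily reduced stand-in for CanalInstance.
interface InstanceSketch {
    String handle(String request);
}

final class DestinationRouterSketch {
    // key: destination, value: the instance serving that destination
    private final Map<String, InstanceSketch> instances = new ConcurrentHashMap<>();

    void register(String destination, InstanceSketch instance) {
        instances.put(destination, instance);
    }

    // Looks up the instance for the destination and delegates the request to it.
    String route(String destination, String request) {
        InstanceSketch instance = instances.get(destination);
        if (instance == null) {
            throw new IllegalStateException("no instance for destination: " + destination);
        }
        return instance.handle(request);
    }
}
```

For example, after `router.register("example", req -> "example handled " + req)`, a call to `router.route("example", "get")` is answered by the instance registered under "example", while an unknown destination is rejected.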
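The implementations of the start and stop methods defined in the CanalServer interface are fairly simple and are not covered here.
In the SessionHandler analysis above, we saw that the corresponding CanalServerWithEmbedded method is called according to the type of the request packet. These methods are all defined in the CanalService interface: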
    public interface CanalService {
        void subscribe(ClientIdentity clientIdentity) throws CanalServerException;

        void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;

        Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;

        Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;

        Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;

        Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;

        void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;

        void rollback(ClientIdentity clientIdentity) throws CanalServerException;

        void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
    }
Attentive readers will notice that every method takes a parameter of type ClientIdentity, which identifies the client.
    public class ClientIdentity implements Serializable {
        private String destination;
        private short clientId;
        private String filter;
        ...
    }
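CanalServerWithEmbedded uses the destination field of ClientIdentity to decide which CanalInstance should handle the request.
Below we analyze each method in turn:
The subscribe method:
subscribe handles a client's subscription request. At present a CanalInstance can be subscribed to by only one client, although that client may subscribe repeatedly. The main steps of a subscription are:
- Find the CanalInstance that corresponds to the destination the client wants to subscribe to.
- Record the subscription through this CanalInstance's CanalMetaManager component.
- Obtain the client's current subscription position (Position). The method first tries CanalMetaManager, which records the binlog position each client has consumed up to. On a first subscription no such position exists, so it falls back to the position of the first binlog in CanalEventStore. The reasoning is that a CanalInstance starts pulling binlogs as soon as it starts and stores them in CanalEventStore, so on a first subscription the position of the first binlog in CanalEventStore is where this client starts consuming.
- Notify the CanalInstance that the subscription has changed.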
    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        // the meta manager is started on first use
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }
        // record the subscription
        canalInstance.getMetaManager().subscribe(clientIdentity);
        // look up the client's cursor; on a first subscription fall back to the store's first position
        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            position = canalInstance.getEventStore().getFirstPosition();
            if (position != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, position);
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
        }
        // notify the instance that the subscription has changed
        canalInstance.subscribeChange(clientIdentity);
    }
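The cursor lookup in the middle of subscribe is a "stored value first, fallback second" pattern. A minimal sketch of just that step follows, assuming positions are plain Long offsets and clients are keyed by a String; CursorResolverSketch and both simplifications are hypothetical and chosen only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

final class CursorResolverSketch {
    // Stands in for the cursor bookkeeping of the meta manager: client key -> position.
    private final Map<String, Long> cursors = new HashMap<>();

    // Returns the position to consume from. firstStorePosition stands in for the
    // first position in the event store and may be null when the store is empty.
    Long resolve(String clientKey, Long firstStorePosition) {
        Long position = cursors.get(clientKey);
        if (position == null) {
            // first subscription: fall back to the store and remember the result
            position = firstStorePosition;
            if (position != null) {
                cursors.put(clientKey, position);
            }
        }
        return position;
    }
}
```

Once a cursor has been recorded, later calls return it regardless of what the store currently reports, which corresponds to the behavior on a repeated subscription.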
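The unsubscribe method:
unsubscribe cancels a subscription. As the code below shows, it simply finds the CanalMetaManager of the CanalInstance and calls its unsubscribe method to remove the subscription record. Note that unsubscribing does not stop the CanalInstance: after one client unsubscribes, another client may subscribe to the same CanalInstance later, so the instance must keep running.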
    @Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        canalInstance.getMetaManager().unsubscribe(clientIdentity);
        logger.info("unsubscribe successfully, {}", clientIdentity);
    }
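The listAllSubscribe method:
This is a management method that lists all clients subscribed to a given destination. It returns a List even though, as mentioned several times, a destination can currently be subscribed to by only one client. The reason is that canal originally planned to support multiple clients subscribing to the same destination, but that feature has never been implemented. In practice the List therefore contains only one ClientIdentity.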
    public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(destination);
        return canalInstance.getMetaManager().listAllSubscribeInfo(destination);
    }
The listBatchIds method:
Judging from the code, this method returns the ids of the batches currently recorded in CanalMetaManager for the client, sorted in ascending order.
    public List<Long> listBatchIds(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        Map<Long, PositionRange> batchs = canalInstance.getMetaManager().listAllBatchs(clientIdentity);
        List<Long> result = new ArrayList<Long>(batchs.keySet());
        Collections.sort(result);
        return result;
    }
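The getWithoutAck method:
getWithoutAck is used by the client to fetch binlog messages. Each call fetches one batch of binlogs, and canal generates a unique batchId for the batch. If the client consumes the batch successfully, it calls ack to confirm it; if consumption fails, it can call rollback to roll back. The client may call getWithoutAck several times in a row, and must then ack in the order in which the batches were fetched. If a later batch is acked, earlier batches that have not yet been acked are automatically acked as well.
getWithoutAck roughly works as follows:
- Use the destination to find the CanalInstance from which binlog messages should be fetched.
- Determine the position (Position) from which to continue consuming binlogs. Normally this information is stored in CanalMetaManager. On the very first fetch, however, CanalMetaManager holds no binlog position yet, and the position of the first binlog stored in CanalEventStore is where the client should start consuming.
- Fetch binlogs from CanalEventStore starting at that Position. For efficiency, a batch of binlogs is fetched at a time rather than a single one. The batch size (batchSize) is specified by the client, which can also specify a timeout. If batchSize binlogs become available within the timeout, the call returns immediately. If the timeout expires before batchSize binlogs are available, the call returns as well. In particular, if no timeout is set, the call returns immediately even when no binlog is available.
- Record this batch in CanalMetaManager. CanalMetaManager generates a unique, increasing batchId for the batch. If no binlog was fetched, the batchId is simply set to -1.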
    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        // no timeout given: delegate with timeout == null
        return getWithoutAck(clientIdentity, batchSize, null, null);
    }

    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                    throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            // position range of the latest batch recorded for this client, if any
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            Events<Event> events = null;
            if (positionRanges != null) {
                events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
            } else {
                // otherwise use the stored cursor, or the store's first position on a first fetch
                Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
                if (start == null) {
                    start = canalInstance.getEventStore().getFirstPosition();
                }
                events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            }
            if (CollectionUtils.isEmpty(events.getEvents())) {
                // nothing fetched: return an empty Message with batchId -1
                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                    new Object[] { clientIdentity.getClientId(), batchSize });
                return new Message(-1, new ArrayList<Entry>());
            } else {
                // record the batch and obtain its batchId
                Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
                List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
                    public Entry apply(Event input) {
                        return input.getEntry();
                    }
                });
                logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                    clientIdentity.getClientId(),
                    batchSize,
                    entrys.size(),
                    batchId,
                    events.getPositionRange());
                return new Message(batchId, entrys);
            }
        }
    }

    private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                    TimeUnit unit) {
        if (timeout == null) {
            // no timeout: non-blocking tryGet
            return eventStore.tryGet(start, batchSize);
        } else {
            try {
                if (timeout <= 0) {
                    // non-positive timeout: get without a time limit
                    return eventStore.get(start, batchSize);
                } else {
                    // positive timeout: get with the given time limit
                    return eventStore.get(start, batchSize, timeout, unit);
                }
            } catch (Exception e) {
                throw new CanalServerException(e);
            }
        }
    }
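The three branches of getEvents resemble the three ways of taking an element from a java.util.concurrent.BlockingQueue. The sketch below illustrates the branching with that analogy only. It assumes one element per fetch, and FetchModesSketch is a hypothetical name. Canal's event store is not a BlockingQueue, and the blocking behavior of its get methods is inferred from the names rather than shown in the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class FetchModesSketch {

    // Returns one element, or null when nothing is available under the chosen mode.
    static <T> T fetch(BlockingQueue<T> queue, Long timeout, TimeUnit unit) {
        try {
            if (timeout == null) {
                return queue.poll();               // no timeout given: never block
            } else if (timeout <= 0) {
                return queue.take();               // non-positive timeout: wait until data arrives
            } else {
                return queue.poll(timeout, unit);  // positive timeout: wait at most that long
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // preserve the interrupt status for callers
            return null;
        }
    }
}
```

Note how this lines up with SessionHandler: a client timeout of -1 is translated into the two-argument getWithoutAck call, which passes a null timeout and therefore takes the non-blocking branch.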
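The ack method:
The ack method is used by the client to confirm that a batch of binlogs has been consumed successfully, by confirming its batch id. After confirmation, every Message whose batchId is less than or equal to this batchId is confirmed. Note: acks must be sent in batchId order (the client is responsible for guaranteeing this).
ack needs to do the following:
- Remove the batch's information from CanalMetaManager. getWithoutAck recorded the batch in CanalMetaManager, and ack removes it.
- Record the binlog position that has been consumed successfully, so that the next fetch can start from it. This is recorded through CanalMetaManager.
- Remove the binlog contents of this batch from CanalEventStore. They have been consumed successfully, so keeping them serves no purpose and only wastes memory.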
    @Override
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        PositionRange<LogPosition> positionRanges = null;
        // remove the batch that getWithoutAck recorded
        positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
        if (positionRanges == null) {
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }
        // record the acknowledged position as the client's cursor
        if (positionRanges.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
            logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                clientIdentity.getClientId(),
                batchId,
                positionRanges);
        }
        // let the event store release everything up to the end of this batch
        canalInstance.getEventStore().ack(positionRanges.getEnd());
    }
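The bookkeeping behind addBatch and removeBatch can be pictured with a small in-memory tracker. This is a sketch under stated assumptions, not the behavior of any particular CanalMetaManager implementation: BatchTrackerSketch is a hypothetical name, positions are plain long offsets, and the tracker itself rejects out-of-order acks, whereas the text above leaves ordering to the client.

```java
import java.util.TreeMap;

final class BatchTrackerSketch {
    // Outstanding (fetched but unacknowledged) batches: batchId -> end position of the batch.
    private final TreeMap<Long, Long> outstanding = new TreeMap<>();
    private long nextBatchId = 1;
    private long cursor = 0; // last acknowledged position

    // Called when a batch is handed to the client; returns its increasing batch id.
    long addBatch(long endPosition) {
        long batchId = nextBatchId++;
        outstanding.put(batchId, endPosition);
        return batchId;
    }

    // Assumption for this sketch: only the oldest outstanding batch may be acknowledged.
    void ack(long batchId) {
        if (outstanding.isEmpty() || outstanding.firstKey() != batchId) {
            throw new IllegalStateException("batchId " + batchId + " is not the oldest outstanding batch");
        }
        cursor = outstanding.remove(batchId); // drop the batch and advance the cursor
    }

    long cursor() {
        return cursor;
    }

    int outstandingCount() {
        return outstanding.size();
    }
}
```

With two batches outstanding, acking the second one first is rejected, while acking them in fetch order advances the cursor to the end of each batch in turn.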
The rollback method:
    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }
        synchronized (canalInstance) {
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
        }
    }

    @Override
    public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }
        synchronized (canalInstance) {
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
                batchId);
            if (positionRanges == null) {
                throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                    clientIdentity.getClientId(),
                    batchId));
            }
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
                clientIdentity.getClientId(),
                batchId,
                positionRanges);
        }
    }
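Both overloads end by calling rollback on the event store. One way to picture the effect, offered here as an assumption rather than a description of any specific CanalEventStore implementation, is a store with two indexes: how far events have been handed out and how far they have been acknowledged. ReplayableStoreSketch below is a hypothetical, single-client illustration in which rollback moves the read index back to the acknowledged index so that unconfirmed events are delivered again.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplayableStoreSketch {
    private final List<String> events = new ArrayList<>();
    private int ackIndex = 0;  // everything before this index has been acknowledged
    private int readIndex = 0; // everything before this index has been handed out

    void put(String event) {
        events.add(event);
    }

    // Hands out up to batchSize events starting at the read index.
    List<String> fetch(int batchSize) {
        int end = Math.min(readIndex + batchSize, events.size());
        List<String> batch = new ArrayList<>(events.subList(readIndex, end));
        readIndex = end;
        return batch;
    }

    // Confirms everything handed out so far.
    void ack() {
        ackIndex = readIndex;
    }

    // Forgets unconfirmed deliveries: the next fetch starts again after the last ack.
    void rollback() {
        readIndex = ackIndex;
    }
}
```

In this model a client that fails while processing a batch calls rollback and then receives the same events on its next fetch, which is the at-least-once behavior the get/ack/rollback protocol is designed around.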
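The get method:
The main flow is the same as in getWithoutAck. The key difference is that get acks the batch itself before returning the data to the caller, regardless of whether the client consumes it successfully. As the code below also shows, get throws an exception if an earlier batch is still unacknowledged.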
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        return get(clientIdentity, batchSize, null, null);
    }

    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                            throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            if (positionRanges != null) {
                throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data",
                    clientIdentity.getClientId(),
                    positionRanges));
            }
            Events<Event> events = null;
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
                    new Object[] { clientIdentity.getClientId(), batchSize });
                return new Message(-1, new ArrayList<Entry>());
            } else {
                Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
                List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
                    public Entry apply(Event input) {
                        return input.getEntry();
                    }
                });
                logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                    clientIdentity.getClientId(),
                    batchSize,
                    entrys.size(),
                    batchId,
                    events.getPositionRange());
                ack(clientIdentity, batchId);
                return new Message(batchId, entrys);
            }
        }
    }