Canal-Client如何工作
阿里数据同步中间件–canal的客户端如何消费mysql的binlog数据
对于一般项目来说,canal-server可以直接用原生集成启动,我们的主要改造就是在canal-client上(当然canal-client也有现成的类似canal-mq,改下配置就ok),这里主要是分析下canal是如何消费从mysql接收到的binlog消息的:
canal接收到myql的binlog日志后默认方式是存储在RingBuffer中(环形高性能缓冲队列,Disruptor也是基于此实现),canal-client主要是与RingBuffer交互,从缓冲队列获取信息,提交ACK。
我们来看看具体的代码:
1 | protected void process() { |
这个的这样的过程是这样的
- 连接,connector.connect()
- 订阅,connector.subscribe
- 获取数据,connector.getWithoutAck()
- 业务处理
- 提交确认,connector.ack()
- 回滚,connector.rollback()
- 断开连接,connector.disconnect()
我们具体来看下。
canal client与canal server之间是C/S模式的通信,客户端采用NIO,服务端采用Netty。
canal server启动后,如果没有canal client,那么canal server不会去mysql拉取binlog。
即canal客户端主动发起拉取请求,服务端才会模拟一个MySQL Slave节点去主节点拉取binlog。
通常canal客户端是一个死循环,这样客户端一直调用get方法,服务端也就会一直拉取binlog。
一、建立连接
CanalConnector主要有两个实现,一个是SimpleCanalConnector,一个是ClusterCanalConnector,我们主要看下ClusterCanalConnector,这也是我们要用的一个模式。
我们用的时候,通过一个工厂类生成我们需要的Connector,这里的工厂类是CanalConnectors,里面包含了生成ClusterCanalConnector的方法。
这里说明下这个ClusterCanalConnector,就是集群canal连接器,一般canal集群部署的话,需要通过zk来维护元数据信息,canal客户端需要跟zk交互。
1 | public static CanalConnector newClusterConnector(String zkServers, String destination, String username,String password) { |
用到的参数有zk的地址,canal的名称,数据库的账号密码。里面有个ClusterNodeAccessStrategy是用来选择client的策略,这个ClusterNodeAccessStrategy的构造方法里面有些东西需要我们关注下。
1.1 ClusterNodeAccessStrategy
1 | public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){ |
这边起了两个监听器,都是监听server端的活动服务器的。一个是获取所有的server列表,一个是获取活动的server服务器,都是从zk的对应节点上去取的。
1.2 连接connect
获取到CanalConnector之后,就是真正的连接了。在ClusterCanalConnector中,我们可以看到,其实他底层用的也是SimpleCanalConnector,只不过加了一个选择的策略。
1 | public void connect() throws CanalClientException { |
如果是集群模式的客户端,那么这边的runningMonitor不为空,因为他进行了初始化。我们主要看下runningMonitor.start()里面的操作。
1 | public void start() { |
这边监听的路径是:/otter/canal/destinations/{destination}/{clientId}/running。如果有任何的变化,或节点的删除,那么执行dataListener里面的操作。
1 | dataListener = new IZkDataListener() { |
这里的注释比较清楚,基本上如果数据发生了变化,那么进行节点释放后,将运行节点置为活动节点。如果发生了数据删除,那么直接触发退出,如果上一次的active状态是本机,那么触发一下active抢占,否则等待delayTime,默认5s后重试。下面我们主要看下initRunning。
1.3 initRunning
这块主要是创建运行节点的临时节点。节点路径是/otter/canal/destinations/{destination}/{clientId},节点内容是ClientRunningData的json序列化结果。连接的代码:
1 | public InetSocketAddress processActiveEnter() { |
这块有几段逻辑,我们慢慢看下。
1.3.1 doConnect
这里是client直接连上了server,通过socket连接,也就是server暴露的socket端口。
1 | private InetSocketAddress doConnect() throws CanalClientException { |
这边采用NIO编程,建立和server的socket连接后,发送了握手包和认证包,当收到ack包后,认为连接成功。认证包的服务端处理在ClientAuthenticationHandler类中,握手处理在HandshakeInitializationHandler类。
server接收到认证的消息后,会做如下的处理:
1 | public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception { |
主要的逻辑在subscribe里面。如果metaManager没有启动,那么需要进行启动。启动时,会从zk节点下面拉取一些数据,包括客户端的消费位点情况等等。然后就是订阅,订阅是新建一个zk节点,路径为/otter/canal/destinations/{destination}/{clientId}。然后还有一些过滤器,也需要写到zk中。之后就是获取一下本client的位点信息,如果原来zk中包含,那么直接从内存中获取,否则取eventStore的第一条数据。
1.3.2 subscribe
发送订阅消息给server,通过socket的方式。这边是判断,如果filter不为空,才发送订阅消息。服务端的处理过程是这样的:
1 | case SUBSCRIPTION: |
类似于connect的过程,不过这边带上了filter的参数。这边启动了server以及他的监听器。
1.3.3 rollback
这里的回滚是指回滚server端记录的本client的位点信息。
1 | public void rollback() throws CanalClientException { |
这里发送了rollback的指令。服务端是这么处理的:
1 | case CLIENTROLLBACK: |
这里的batchId传入的是0,也就是要回滚所有的批次。我们来看下这个回滚的动作:
1 |
|
这里回滚的,其实是eventStore中的指针,把get的指针设置为之前ack的指针。
二、订阅数据
当client连接server完成后,就需要进行binlog数据的订阅。
1 | public void subscribe() throws CanalClientException { |
订阅这块的内容不再赘述,在上面的connect过程中有提到。这边还有一个失败重试的机制,当异常不是中断异常的情况下,会重试重启client connector,直到达到了阈值retryTimes。
三、获取数据
在建立连接和进行数据订阅之后,就可以开始进行binlog数据的获取了。主要的方法是getWithOutAck这个方法,这种是需要client自己进行数据ack的,保证了只有数据真正的被消费,而且进行了业务逻辑处理之后,才会ack。当然,如果有了异常,也会进行一定次数的重试和重启。
1 | public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException { |
我们可以看到,其实是发送了一个GET命令给server端,然后传递了一个参数batchSize,还有超时时间,而且不是自动提交的。服务端的处理是这样的:
1 | embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize()); |
也是调用的这个方法:
1 |
|
最主要的逻辑在这里:
- 判断canalInstance是否已经启动:checkStart
- 判断订阅列表中是否包含当前的client:checkSubscribe
- 根据client信息从metaManager中获取最后消费的批次:getLastestBatch,这块在运行起来后,是从内存中取的,但是在instance启动时,是从zk中拉取的,是从/otter/canal/destinations/{destination}/{clientId}/mark下面获取的,后续也会定时(1s)刷新到这里面
- 如果能获取到消费的批次,直接从eventStore的队列中获取数据。
- 如果positionRanges为空,那么从metaManager中获取指针。如果指针也没有,说明原来没有ack过数据,需要从store中第一条开始获取。这个过程其实就是找start,也就是上一次ack的位置。
- 调用getEvent,获取数据。根据传入的参数不同,调用不同的方法去获取数据,但是最终都是调用的goGet方法。这个doGet方法不是很复杂,主要是根据参数从store队列中获取数据,然后把指针进行新的设置。
- 如果没有取到binlog数据,那么直接返回,批次号为-1。
- 如果取到了数据,记录一下流式数据后返回。
结果封装在Messages中,最终改为Message,包含批次号和binlog列表。四、业务处理
拿到message后,需要进行判断batchId,如果batchId=-1或者binlog大小为0,说明没有拿到数据。否则在message基础上进行逻辑处理。
Message的内容,后续我们再进行讨论。五、提交确认
1
connector.ack(batchId); // 提交确认
提交批次id,底层发送CLIENTACK命令到server。server调用CanalServerWithEmbedded的ack方法来进行提交。
1 | public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException { |
首先更新metaManager中的batch,然后更新ack指针,同时清理store中到ack指针位置的数据。
六、回滚
如果有失败的情况,需要进行回滚。发送CLIENTROLLBACK命令给server端,进行数据回滚。回滚单个批次时的处理逻辑是这样的:
1 |
|
这里的rollback到指定的batchId,其实是假的。他的rollback也是全量回滚到ack的指针位置。
七、断开连接
在发生异常情况时,client会断开与server的连接,也就是disconnect方法。
1 | public void disconnect() throws CanalClientException { |
判断是否在断开连接的时候回滚参数(默认false)和当前socket通道是否连接中,进行回滚。
否则调用runningMonitor.stop方法进行停止。主要的过程是这样的:
- 取消监听/otter/canal/destinations/{destination}/{clientId}/running/节点变化信息
- 删除上面这个节点
- 关闭socket的读通道
- 关闭socket的写通道
- 关闭socket channel