分布式消息队列kafka【六】—— kafka整合数据同步神器canal
2026/5/4 12:26:58 网站建设 项目流程

分布式消息队列kafka【六】—— kafka整合数据同步神器canal

文章目录

  • 分布式消息队列kafka【六】—— kafka整合数据同步神器canal
    • 数据同步-整体介绍
    • 数据同步-初识canal
      • mysql主从数据同步过程
      • canal数据同步过程
      • canal优点
      • canal缺点
      • canal整体架构
      • canal安装部署和组件
    • 数据同步-canal环境搭建
      • mysql binlog开启
      • canal环境搭建
    • 数据同步-Java操作canal详解
    • 数据同步-canal集成kafka配置详解
      • 前言
      • canal.properties
      • instance.properties
    • 数据同步-kafka消费canal数据最终测试详解

数据同步-整体介绍

  • 什么是数据同步?
  • 数据同步的应用场景有哪些?
  • 如何去选择数据同步技术?
  • 数据同步的数据源选择

适用场景:比如mysql中按照哈希的算法分库分表的存储了订单数据,未来可能按照某个维度(比如:城市)作查询,此时就需要遍历所有的数据库和表去查询,比较麻烦。此时就可以采用数据同步的方案,比如按照相同城市的数据存储一张表或者一个elasticsearch索引。

传统的数据同步:比如业务订单入库后,消息发MQ,消费者处理消息同步到elasticsearch中,这种方式还是存在弊端的,开始的订单入库可能失败,此时消息发MQ并同步到elasticsearch中会导致数据不一致的问题。

注意:数据同步要基于已经确定,100%入库的数据。

数据同步-初识canal

  • 数据同步框架:canal

    • canal是用java开发的基于数据库增量日志解析,提供增量数据订阅和消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)
  • git地址:https://github.com/alibaba/canal

  • 当前的canal支持源端MySQL版本包括5.1x,5.5x,5.6x,5.7x,8.0x

  • canal基于日志增量数据订阅和消费的业务包括:

    • 数据库镜像、数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务cache刷新、带业务逻辑的增量数据处理

mysql主从数据同步过程

mysql的slave节点将master的Binary log事件copy到中继日志Relay log,然后mysql的slave节点将中继日志Relay log的数据变更更新到自己的数据库上。

canal数据同步过程

canal会把自己伪装成一个slave节点,然后实时读取master的Binary log的变更,再在canal内部处理

canal优点

  • 实时性好
    • 因为采用了Binary log的机制,master节点上数据的更新、新增、删除都会实时的反馈到Binary log
  • 分布式
    • 采用zookeeper作分布式协调的框架,实现高可用的机制,即主备切换的概念
  • ACK机制
    • 可以批量读取Binary log,解析成功提交,失败回滚,下次还从上次提交的地方读取数据

canal缺点

  • 只支持增量同步,不支持全量同步
  • 支持MySQL->MySQL,MySQL->ES,MySQL->RDB
  • 数据同步过程中,日志如果不完整的话,没法收集运行过程中的指标,对于整个数据同步的监控手段还不是特别完善,需要业务方实时上报
  • 一个instance只能有一个消费端消费,无法并行,因为Binary log就是一个
  • 直支持主备模式,不支持两边分布式的扩容,单点压力过大

canal整体架构

一个server对应有1-n个Instance,EventParser:数据源接入,协议解析的组件;EventSink:拦截器,解析数据后处理、过滤、加工;EventStore:存储;MetaManager:增量订阅和消费信息的管理器

canal安装部署和组件

  • canal安装部署:https://github.com/alibaba/canal/releases
  • canal.adapter-1.1.4.tar.gz:同步的适配器(可以帮助作es等的适配,一般不会选择,因为真实的业务场景比这个要复杂)
  • canal.admin-1.1.4.tar.gz:admin控制台
  • canal.deployer-1.1.4.tar.gz:canal服务
  • canal.example-1.1.4.tar.gz:canal例子

标准的canal和kafka集成,并最终sink到es的处理流程

数据同步-canal环境搭建

mysql binlog开启

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
## 准备工作## mysql配置修改文件:vim/etc/my.cnf[mysqld]log-bin=mysql-bin# 开启 binlogbinlog-format=ROW# 选择 ROW 模式server_id=1# 配置 MySQL replaction 需要定义,mysql的master和slave节点不能重复,且不要和 canal 的 slaveId 重复## 重启服务## systemctl stop mariadb## systemctl start mariadbmysql -uroot -proot show variables like'%log_bin%';## sql_log_bin ON 表示binlog已开启## 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grantCREATEUSERroot IDENTIFIED BY'root';GRANT ALL PRIVILEGES ON *.* TO'root'@'%'IDENTIFIED BY'root'WITH GRANT OPTION;## 新建用户授权 canal-- CREATEUSERcanal IDENTIFIED BY'canal';-- GRANT ALL PRIVILEGES ON *.* TO'canal'@'%'IDENTIFIED BY'canal'WITH GRANT OPTION;-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO'canal'@'%';FLUSH PRIVILEGES;## mysql-bin的log日志存放目录cd/var/lib/mysql

canal环境搭建

## 创建文件夹并 解压 canalmkdir/usr/local/canaltar-zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/## 配置文件vim/usr/local/canal/conf/canal.properties## java程序连接端口canal.port=11111vim/usr/local/canal/conf/example/instance.properties## 不能与已有的mysql节点server-id重复canal.instance.mysql.slaveId=1001## mysql master的地址canal.instance.master.address=192.168.11.31:3306## 修改内容如下:canal.instance.dbUsername=root#指定连接mysql的用户密码canal.instance.dbPassword=root canal.instance.connectionCharset=UTF-8#字符集## 启动canalcd/usr/local/canal/bin ./startup.sh## 验证服务cat/usr/local/canal/logs/canal/canal.log## 查看实例日志tail-f -n100/usr/local/canal/logs/example/example.log ```
  • canal properties配置:
# position info canal.instance.master.address=172.16.210.74:3306#指定要读取binlog的MySQL的IP地址和端口 canal.instance.master.journal.name=#从指定的binlog文件开始读取数据 canal.instance.master.position=#指定偏移量,做过主从复制的应该都理解这两个参数。 #tips:binlog和偏移量也可以不指定,则canal-server会从当前的位置开始读取。我建议不设置 canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlog canal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb info canal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #这几个参数是设置高可用配置的,可以配置mysql从库的信息 #canal.instance.standby.address=#canal.instance.standby.journal.name=#canal.instance.standby.position=#canal.instance.standby.timestamp=#canal.instance.standby.gtid=# username/password canal.instance.dbUsername=canal #指定连接mysql的用户密码 canal.instance.dbPassword=canal canal.instance.connectionCharset=UTF-8#字符集 # enable druid Decrypt database password canal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex #canal.instance.filter.regex=.*\\..*canal.instance.filter.regex=risk.canal,risk.cwx #这个是比较重要的参数,匹配库表白名单,比如我只要test库的user表的增量数据,则这样写 test.user # table black regex canal.instance.filter.black.regex=# table fieldfilter(format:schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field blackfilter(format:schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*canal.mq.partition=0# hash partition config #canal.mq.partitionsNum=3#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################

数据同步-Java操作canal详解

packagecom.bfxy.canal.base;importjava.net.InetSocketAddress;importjava.util.List;importcom.alibaba.otter.canal.client.CanalConnector;importcom.alibaba.otter.canal.client.CanalConnectors;importcom.alibaba.otter.canal.protocol.CanalEntry.Column;importcom.alibaba.otter.canal.protocol.CanalEntry.Entry;importcom.alibaba.otter.canal.protocol.CanalEntry.EntryType;importcom.alibaba.otter.canal.protocol.CanalEntry.EventType;importcom.alibaba.otter.canal.protocol.CanalEntry.RowChange;importcom.alibaba.otter.canal.protocol.CanalEntry.RowData;importcom.alibaba.otter.canal.protocol.Message;/** * CanalTest */publicclassCanalTest{/** * 1. 连接Canal服务器 * 2. 向Master请求dump协议 * 3. 把发送过来的binlog进行解析 * 4. 最后做实际的数据处理...发送到MQ Print... * @param args */publicstaticvoidmain(String[]args){CanalConnectorconnector=CanalConnectors.newSingleConnector(newInetSocketAddress("192.168.11.221",11111),"example","root","root");intbatchSize=1000;// 一次拉取数据量intemptyCount=0;// 计数器try{// 连接我们的canal服务器connector.connect();// 订阅什么内容?什么库表的内容??// .*\..*:代表订阅上面连接的这个canal下的全部库表的数据connector.subscribe(".*\\..*");// 出现问题直接进行回滚操作connector.rollback();inttotalEmptyCount=1200;// 总拉取数据量while(emptyCount<totalEmptyCount){Messagemessage=connector.getWithoutAck(batchSize);// 用于处理完数据后进行ACK提交动作longbatchId=message.getId();intsize=message.getEntries().size();// 没拉取到数据if(batchId==-1||size==0){emptyCount++;System.err.println("empty count: "+emptyCount);try{Thread.sleep(1000);}catch(InterruptedExceptione){// ignore..}}else{emptyCount=0;System.err.printf("message[batchId=%s, size=%s] \n",batchId,size);// 处理解析数据printEntry(message.getEntries());}// 确认提交处理后的数据connector.ack(batchId);}System.err.println("empty too many times, exit");}finally{// 关闭连接connector.disconnect();}}privatestaticvoidprintEntry(List<Entry>entries){// 可以将一个个entry理解为一个个事件/记录for(Entryentry:entries){// 如果EntryType 当前处于事务的过程中 那就不能处理if(entry.getEntryType()==EntryType.TRANSACTIONBEGIN||entry.getEntryType()==EntryType.TRANSACTIONEND){continue;}// RowChange里面包含很多信息:存储数据库、表、binlogRowChangerc=null;try{// entry.getStoreValue()是二进制的数据rc=RowChange.parseFrom(entry.getStoreValue());}catch(Exceptione){thrownewRuntimeException("parser error!");}EventTypeeventType=rc.getEventType();System.err.println(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",entry.getHeader().getLogfileName(),// binlog文件名称entry.getHeader().getLogfileOffset(),// binlog当前位置entry.getHeader().getSchemaName(),// 库名entry.getHeader().getTableName(),// 表名eventType));// 真正的对数据进行处理for(RowDatard:rc.getRowDatasList()){if(eventType==EventType.DELETE){// delete操作获取BeforeColumnsList得到删除之前的数据List<Column>deleteList=rd.getBeforeColumnsList();printColumn(deleteList);}elseif(eventType==EventType.INSERT){// insert操作获取AfterColumnsList得到新增之后的数据List<Column>insertList=rd.getAfterColumnsList();printColumn(insertList);}// updateelse{List<Column>updateBeforeList=rd.getBeforeColumnsList();printColumn(updateBeforeList);List<Column>updateAfterList=rd.getAfterColumnsList();printColumn(updateAfterList);}}}}privatestaticvoidprintColumn(List<Column>columns){for(Columncolumn:columns){System.err.println(column.getName()+" : "+column.getValue()+", update = "+column.getUpdated());}}}

数据同步-canal集成kafka配置详解

前言

直接用java解析canal的数据,性能不是特别好,因为这样是单线程的,无任何消息堆积能力,不适合高并发的场景。

一般来说,我们把mysql-bin.log直连到java去解析,会先把数据发到kafka,通过消费者解析数据。

canal.properties

################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 canal.zkServers = # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, RocketMQ canal.serverMode = tcp # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = ################################################# ######### destinations ############# ################################################# canal.destinations = example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## canal.serverMode = kafka canal.mq.servers = 192.168.11.51:9092 canal.mq.retries = 0 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local # aliyun mq namespace #canal.mq.namespace = ################################################## ######### Kafka Kerberos Info ############# ################################################## canal.mq.kafka.kerberos.enable = false canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

instance.properties

################################################# ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=1001 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=192.168.11.31:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=root canal.instance.dbPassword=root canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex # 数据过滤,.*\\..*代表不过滤 canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config # canal.mq.topic=example # dynamic topic route by schema or table regex # 动态生成topic canal.mq.dynamicTopic=.*\\..* #canal.mq.partition=0 # hash partition config # partitions数,需要和正常topic建立的默认partitions一致 canal.mq.partitionsNum=5 # 定义发到topic哪个分区的规则,表主键hash策略 canal.mq.partitionHash=.*\\..*:$pk$ #################################################

数据同步-kafka消费canal数据最终测试详解

packagecom.bfxy.canal.kafka;importjava.time.Duration;importjava.util.Collections;importjava.util.List;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.consumer.OffsetAndMetadata;importorg.apache.kafka.common.TopicPartition;/** * 直接用java解析canal的数据,性能不是特别好,因为这样是单线程的,无任何消息堆积能力,不适合高并发的场景。 * 一般来说,我们把mysql-bin.log直连到java去解析,会先把数据发到kafka,通过消费者解析数据。 */publicclassCollectKafkaConsumer{privatefinalKafkaConsumer<String,String>consumer;privatefinalStringtopic;publicCollectKafkaConsumer(Stringtopic){Propertiesprops=newProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.11.221:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG,"demo-group-id");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");consumer=newKafkaConsumer<>(props);this.topic=topic;consumer.subscribe(Collections.singletonList(topic));}privatevoidreceive(KafkaConsumer<String,String>consumer){while(true){// 拉取结果集ConsumerRecords<String,String>records=consumer.poll(Duration.ofSeconds(1));for(TopicPartitionpartition:records.partitions()){List<ConsumerRecord<String,String>>partitionRecords=records.records(partition);Stringtopic=partition.topic();intsize=partitionRecords.size();// 获取topic: test-db.demo, 分区位置: 2, 消息数为:1System.err.println("获取topic: "+topic+", 分区位置: "+partition.partition()+", 消息数为:"+size);for(inti=0;i<size;i++){/** * { * ----> "data":[{"id":"010","name":"z100","age":"35"}], * ----> "database":"test-db", * "es":1605269364000, * ----> "id":2, * "isDdl":false, * ----> "mysqlType":{"id":"varchar(32)","name":"varchar(40)","age":"int(8)"}, * ----> "old":[{"name":"z10","age":"32"}], * ----> "pkNames":["id"], * "sql":"", * "sqlType":{"id":12,"name":12,"age":4}, * ----> "table":"demo", * ----> "ts":1605269365135, * ----> "type":"UPDATE"} */System.err.println("-----> value: "+partitionRecords.get(i).value());longoffset=partitionRecords.get(i).offset()+1;// consumer.commitSync();consumer.commitSync(Collections.singletonMap(partition,newOffsetAndMetadata(offset)));System.err.println("同步成功, topic: "+topic+", 提交的 offset: "+offset);}//System.err.println("msgList: " + msgList);}}}publicstaticvoidmain(String[]args){Stringtopic="test-db.demo";CollectKafkaConsumercollectKafkaConsumer=newCollectKafkaConsumer(topic);collectKafkaConsumer.receive(collectKafkaConsumer.consumer);}}

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询