您现在的位置是:首页 > 正文

SpringBoot整合Canal1.1.6并同步数据到Redis(超详细和很多踩坑点)

2024-01-30 19:32:03阅读 0

一、使用背景

最近公司新接了一个医院项目,主要是对接医院内营养处方数据,院内his部署在内网环境,因此仅提供一个外网数据库给我们(他们自己应该做了类似主从的方式将我们需要的数据同步一份到外网数据库),又因为落地的数据我们需要立马进行处理并在公众号推送给病人进行付款等一系列其他操作,所以我们需要实时监听数据库的变化。以下是当时能想到的方案:

  • 方案一:轮询数据库
    第一个想到的也第一个被pass。原因也不必多说

  • 方案二:数据库触发器
    数据库触发器公司内部的小伙伴使用的都极其的少,确实是我们自己学艺不精,而且维护起来不方便,对后面来的开发小伙伴也不友好,最终也被pass

  • 方案三:消息队列
    这应该是在没有提供外网数据库的时候想出的方案,也是最符合业务场景的方案之一,但由于甲方不想增加开发成本、业务代码被入侵以及使用第三方插件增加系统风险和不稳定性等一系列原因,这个方案也被pass

  • 方案四:Canal
    讨论很久以后才决定使用canal,主要原因第一个是我们需要实时监听院内处方的数据变化,第二个是推过来的处方数据我们需要展示给用户,每个人只能看到自己的处方数据,并且处方有时效性,利用redis的过期时间也能完美代替数据库条件查询。

二、什么是 Canal?

这里简单介绍,具体就不从官网贴图了,啥都没有官网说的清楚,点击进入Canal官网
1、了解什么是Canal之前我们应该首先了解以下MySql的主从复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据。

    上述中涉及到两个重要的MySql事务日志 binlog和relaylog,简单介绍一下两种日志

BinLog

即binary log,二进制日志文件,也叫作变更日志(update log)。它记录了数据库所有执行的
DDL 和 DML 等数据库更新事件的语句,但是不包含没有修改任何数据的语句(如数据查询语句select、 show等)。
说白了就是MySql记录了除了查询语句之外的几乎任何增删改操作。MySql8以后默认开启。开启会损失约百分之一的性能。

RelayLog

中继日志只在主从服务器架构的从服务器上存在。从服务器为了与主服务器保持一致,要从主服务器读
取二进制日志(binary log)的内容,并且把读取到的信息写入 本地的日志文件 中,这个从服务器本地的日志文件就叫
中继日志 。然后,从服务器读取中继日志,并根据中继日志的内容对从服务器的数据进行更新,完成主
从服务器的 数据同步 。

2、了解了主从复制的原理以后,就把Canal当成一个Slave,原理也就清晰了

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

三、准备工作

1、准备MySql 8.x

1)查看数据库版本

SELECT VERSION(); -- 查看数据库版本

2)查看BinLog日志是否开启

SHOW VARIABLES LIKE 'log_bin%';
-- 如果log_bin的value为on则已开启,显示off则未开启。

3)如果未开启binlog

  • 找到服务器上my.cnf,增加如下配置
[mysqld]  
log-bin=mysql-bin #添加这一行就ok  
binlog-format=ROW #选择row模式  
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复  

4)为canal新建账号

CREATE USER canal IDENTIFIED BY 'canal';  
-- 授权部分操作
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 也可以使用下面的命令授权所有操作
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
-- 需要刷新权限生效
FLUSH PRIVILEGES;

5)踩坑点

------------注意踩坑点------------

注意点一: log_bin 为只读变量,不能通过命令set global log_bin = on;临时生效。
在这里插入图片描述
注意点二: 这是我刚开始学习Canal踩的坑,也是自己对binlog不够了解犯的错误。也是很多博主没有提到过的问题,所以自己照葫芦画瓢就容易出错。执行命令如下:

SHOW BINARY LOGS;

在这里插入图片描述
针对于已经运行的或者重启过很多次的MySql可以看到很多binlog日志文件(也有可能是几十几百个),这和binlog的配置有很大关系,同学们自己感兴趣的了解一下。因此canal在配置的时候要配置监听的文件名称,很多博主照搬照抄都设置为binlog.000001。导致后面监听不到数据便变化。所以后面设置的时候需要注意文件名称为当前MySql正在写入的binlog,如何确定具体是哪个呢?执行如下命令:

SHOW MASTER STATUS;

在这里插入图片描述

正对于一些不重要的数据库可以直接使用下面命令删除所有binlog日志文件(慎用!!!)

RESET MASTER;

在这里插入图片描述
这样后面canal的配置文件名称就可以用binlog.000001,否则只能用上述的binlog.000054

2、准备Canal

下载地址:

1)Canal1.1.4下载

在这里插入图片描述

2)解压并配置Canal

解压:tar -zxvf canal.deployer-1.1.4.tar.gz
解压后如图四个文件夹
在这里插入图片描述
进入conf/example下
在这里插入图片描述
修改instance.properties,主要配置参考如下

#################################################
#需要配置的地方都进行了中文描述,其他都为默认信息
canal.instance.gtidon=false

# 这里是数据库地址信息,数据库账号密码在下面
canal.instance.master.address=124.111.11.111:3306
#这里就是上面强调的binlog日志文件名称即mysql主库链接时起始的binlog文件
canal.instance.master.journal.name=binlog.000001
#mysql主库链接时起始的binlog偏移量
canal.instance.master.position=157
#mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#这里为数据库账号
canal.instance.dbUsername=canal
#这里为数据库密码
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

3)启动Canal

进入canal1.1.4/bin可以看以下脚本。
在这里插入图片描述
直接命令启动:sh startup.sh;
因为canal-server本质上也是jar包在运行,直接通过jps命令查看运行状态
在这里插入图片描述
发现已经初步启动
接下来进入/canal1.1.4/logs/example/查看日志观察是否报错tail -f example.log查看日志信息

在这里插入图片描述
如此便算是完全启动成功。

4)踩坑点

------------注意踩坑点------------
注意点一:linux启动完成后,会在bin目录下生成canal.pid,stop.sh会读取canal.pid进行进程关闭。如果直接通过kill命令可能会导致canal不能完全关闭,这样下次执行./startup.sh命令可能会启动失败。

3、准备项目测试

1、构建SpringBoot项目,引入坐标

	<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.</groupId>
    <artifactId>springcloud-alibaba</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>



    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

    </dependencies>

</project>

2、新建监听类


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author zqf
 * @desc
 */

@Component
public class CanalListener {


    /**
     * 解析数据
     *
     * @param beforeColumns 修改、删除后的数据
     * @param afterColumns  新增、修改、删除前的数据
     * @param dbName        数据库名字
     * @param tableName     表大的名字
     * @param eventType     操作类型(INSERT,UPDATE,DELETE)
     * @param timestamp     消耗时间
     */
    private static void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns, String dbName, String tableName, CanalEntry.EventType eventType, long timestamp) {

        System.out.println("数据库:" + dbName);
        System.out.println("表名:" + tableName);
        System.out.println("操作类型:" + eventType);
        if (CanalEntry.EventType.INSERT.equals(eventType)) {
            System.out.println("这是一条新增的数据");
        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
            System.out.println("删除数据:" + afterColumns);
        } else {
            System.out.println("更新数据:更新前数据--" + afterColumns);
            System.out.println("更新数据:更新后数据--" + beforeColumns);

        }
        System.out.println("操作时间:" + timestamp);
    }

    @PostConstruct
    public void run() throws Exception {
        CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress("124.111.11.111", 11111), "example", null, null);
        while (true) {
            conn.connect();
            conn.subscribe(".*\\..*");
            // 回滚到未进行ack的地方
            conn.rollback();
            // 获取数据 每次获取一百条改变数据
            Message message = conn.getWithoutAck(100);
            //获取这条消息的id
            long id = message.getId();
            int size = message.getEntries().size();
            if (id != -1 && size > 0) {
                // 数据解析
                analysis(message.getEntries());
            } else {
                //暂停1秒防止重复链接数据库
                Thread.sleep(1000);
            }
            // 确认消费完成这条消息
            conn.ack(message.getId());
            // 关闭连接
            conn.disconnect();
        }
    }

    /**
     * 数据解析
     */
    private void analysis(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 解析binlog
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
            }
            if (rowChange != null) {
                // 获取操作类型
                CanalEntry.EventType eventType = rowChange.getEventType();
                // 获取当前操作所属的数据库
                String dbName = entry.getHeader().getSchemaName();
                // 获取当前操作所属的表
                String tableName = entry.getHeader().getTableName();
                // 事务提交时间
                long timestamp = entry.getHeader().getExecuteTime();
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);

                }
            }
        }
    }

}
3、新建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author zqf
 * @desc
 */
@SpringBootApplication
public class CanalApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}

如此即可,启动项目,并随意在监听的数据库增删改,查看控制台信息输出

数据库:linyi
表名:tb_answer
操作类型:UPDATE
更新数据:更新前数据--[index: 0
sqlType: -5
name: "answer_no"
isKey: true
updated: false
isNull: false
value: "1638010372783263744"
mysqlType: "bigint"
, index: 1
sqlType: 12
name: "questionnaire_name"
isKey: false
updated: true
isNull: false
value: "\351\227\256\345\215\267\350\260\203\346\237\245\350\241\2501"
mysqlType: "varchar(255)"
, index: 2
sqlType: -5
name: "questionnaire_no"
isKey: false
updated: false
isNull: false
value: "1637737551628750848"
mysqlType: "bigint"
, index: 3
sqlType: -5
name: "goods_no"
isKey: false
updated: false
isNull: false
value: "1"
mysqlType: "bigint"
, index: 4
sqlType: -5
name: "patient_no"
isKey: false
updated: false
isNull: false
value: "1635614014201823232"
mysqlType: "bigint"
, index: 5
sqlType: 12
name: "answer_url"
isKey: false
updated: false
isNull: false
value: "questionnaire_answer/2023-03-21/1638010372783263745.jpg"
mysqlType: "varchar(255)"
, index: 6
sqlType: 4
name: "assess_score"
isKey: false
updated: false
isNull: false
value: "2"
mysqlType: "int"
, index: 7
sqlType: 12
name: "assess_result"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(500)"
, index: 8
sqlType: 4
name: "status"
isKey: false
updated: false
isNull: false
value: "0"
mysqlType: "int"
, index: 9
sqlType: 93
name: "create_time"
isKey: false
updated: false
isNull: false
value: "2023-03-21 10:51:20"
mysqlType: "datetime"
, index: 10
sqlType: 93
name: "update_time"
isKey: false
updated: false
isNull: false
value: "2023-03-21 10:51:20"
mysqlType: "datetime"
, index: 11
sqlType: 4
name: "is_del"
isKey: false
updated: false
isNull: false
value: "0"
mysqlType: "int"
]
更新数据:更新后数据--[index: 0
sqlType: -5
name: "answer_no"
isKey: true
updated: false
isNull: false
value: "1638010372783263744"
mysqlType: "bigint"
, index: 1
sqlType: 12
name: "questionnaire_name"
isKey: false
updated: false
isNull: false
value: "\351\227\256\345\215\267\350\260\203\346\237\245\350\241\250"
mysqlType: "varchar(255)"
, index: 2
sqlType: -5
name: "questionnaire_no"
isKey: false
updated: false
isNull: false
value: "1637737551628750848"
mysqlType: "bigint"
, index: 3
sqlType: -5
name: "goods_no"
isKey: false
updated: false
isNull: false
value: "1"
mysqlType: "bigint"
, index: 4
sqlType: -5
name: "patient_no"
isKey: false
updated: false
isNull: false
value: "1635614014201823232"
mysqlType: "bigint"
, index: 5
sqlType: 12
name: "answer_url"
isKey: false
updated: false
isNull: false
value: "questionnaire_answer/2023-03-21/1638010372783263745.jpg"
mysqlType: "varchar(255)"
, index: 6
sqlType: 4
name: "assess_score"
isKey: false
updated: false
isNull: false
value: "2"
mysqlType: "int"
, index: 7
sqlType: 12
name: "assess_result"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(500)"
, index: 8
sqlType: 4
name: "status"
isKey: false
updated: false
isNull: false
value: "0"
mysqlType: "int"
, index: 9
sqlType: 93
name: "create_time"
isKey: false
updated: false
isNull: false
value: "2023-03-21 10:51:20"
mysqlType: "datetime"
, index: 10
sqlType: 93
name: "update_time"
isKey: false
updated: false
isNull: false
value: "2023-03-21 10:51:20"
mysqlType: "datetime"
, index: 11
sqlType: 4
name: "is_del"
isKey: false
updated: false
isNull: false
value: "0"
mysqlType: "int"
]
操作时间:1681747515000

至此,Canal整合完成

网站文章

  • 计算机组成原理怎么考试,计算机组成原理考试及答案

    《计算机组成原理考试及答案》由会员分享,可在线阅读,更多相关《计算机组成原理考试及答案(2页珍藏版)》请在人人文库网上搜索。1、一、单选题1.目前我们所说的个人台式商用机属于(4.微型机)2.(4.移...

    2024-01-30 19:31:33
  • Eclipse 显示 Server 面板

    Eclipse 显示 Server 面板

    Eclipse 显示 Server 面板 2018年01月13日 08:09:27 mystonelxj 阅读数:1884 版权声明:未经博主允许,请勿转载原创,谢谢! https://blog.csdn.net/mystonelxj/article/details/79049259 在做web工程的时候,有时候server面板会不留意关闭了。需要手动将其打开,便于做代码的运行。 具体做...

    2024-01-30 19:31:26
  • Android面试送分题:这份354页笔记的Android进阶知识+大厂高频面试题,已拿offer

    Android面试送分题:这份354页笔记的Android进阶知识+大厂高频面试题,已拿offer

    程序员与别的专业有所不同,其他专业都是越老越香,而程序员却是一个例外,因为计算机技术更新太快,而且工作强度很大,因此大部分程序员只会写 3 年代码。3 年后要不晋升做项目经理,要么转行,个别研究所除外...

    2024-01-30 19:31:19
  • CSS的三大特性总结

    CSS的三大特性总结

    CSS的三大特性:继承性,层叠性,优先级

    2024-01-30 19:30:53
  • C++中的Split函数(字符串自动分割)

    函数原型:C/C++中的Split函数是strtok(),其函数原型如下: char * strtok (char * str, const char * delimiters); 函数说明 :strtok()用来将字符串分割成一个个片段。参数str指向欲分割的字符串,参数delimiters则为分割字符串,当strtok()在参数str的字符串中发现到参数delimiters的分割字符时则

    2024-01-30 19:30:46
  • postgresql子查询优化(提升子查询)

    问题背景 在开发项目过程中,客户要求使用gbase8s数据库(基于informix),简单的分页页面响应很慢。排查发现分页sql是先查询出数据在外面套一层后再取多少条,如果去掉嵌套的一层,直接获取则很快。日常使用中postgresql并没有这样的操作也很快,这是为什么呢? 说明 在数据库实现早期,查询优化器对子查询一般采用嵌套执行的方式,即父查询中的每一行,都要执行一次子查询,这样子查询会执...

    2024-01-30 19:30:39
  • 安装ArcGIS提示localhost是无效的主机名解决方法 热门推荐

    安装ArcGIS提示localhost是无效的主机名解决方法 热门推荐

    打开注册表编辑器(开始-&gt;运行或win+R输入regedit) 将HKEY_LOCAL_MACHINE\SOFTEARE\ESRI\License10.1中LINESE_SERVER的值改为@localhost

    2024-01-30 19:30:31
  • 用线性代数求解乘法逆元 手算逆元

    用线性代数求解乘法逆元 手算逆元

    2024-01-30 19:30:04
  • 泛化、实现、依赖和关联的区别,组合是强关系

    泛化、实现、依赖和关联的区别,组合是强关系

    泛化、实现、依赖和关联的区别传统应用程序设计中所说的依赖一般指“类之间的关系”,那先让我们复习一下类之间的关系:a、实现表示类对接口的实现。UML图中实现使用一条带有空心三角箭头的虚线指向接口,如下:b、泛化表示类与类之间的继承关系、接口与接口之间的继承关系。UML图中实现使用一条带有空心三角箭头的实线指向基类,如下:c、依赖表现为函数中的参数(us

    2024-01-30 19:29:56
  • 文件解析时,如何创建大量数据的文件解析进行测试,应该怎么测呢

    文件解析时,如何创建大量数据的文件解析进行测试,应该怎么测呢

    为了创建大量数据的文件解析进行测试,可以考虑以下几个步骤:

    2024-01-30 19:29:48