docker安装Alibaba Canal的步骤

docker安装Alibaba Canal的步骤

1.使用场景

Canal[kə’næl]是由Alibaba开发的数据同步中间件,译为水道/管道/沟渠,通过解析MySQL数据库增量日志,提供数据订阅和消费,主要使用场景如下:

  • 创建数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护
  • 按需刷新业务cache
  • 按业务逻辑需要处理增量数据
  • 同步构建其他数据源

相比MySQL本身的主从机制,有下面几点优势:

  • 1)让架构更灵活,多机房同步比较简单。
  • 2)异构表之间也可以同步,可以控制不同步DDL以免出现数据丢失和不一致。
  • 3)Canal可以实现一个表一线程,多个表多线程的同步,速度更快。

2.同步机制

image

  • 1)canal模拟MySQL slave的交互协议,伪装自己为MySQL salve。
  • 2)MySQL master收到dump请求,开始推送binary log给伪装的slave,也就是canal。
  • 3)canal解析binary log 对象(原始byte流),执行用户定义的业务逻辑。

3.安装Canal

演示安装的软件是MySQL 5.6、Centos 7(已安装docker)。

  • 1)配置数据库用户canal:

    CREATE USER canal IDENTIFIED BY 'canal';    
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'localhost';  
    FLUSH PRIVILEGES;

    删除可能存在的空用户:

    USE mysql;
    DELETE FROM USER WHERE USER=''; 
    FLUSH PRIVILEGES; 

    如果没有配置权限,canal启动会出现错误信息:

    Caused by: java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:274)
    at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82)
  • 2)检查数据库binlog日志是否开启:

    SHOW VARIABLES LIKE '%log_bin%'  

    修改配置文件,开启binlog日志:

    vim /etc/my.cnf 

    修改my.cnf,指定日志文件地址:

    log-bin=/var/lib/mysql/mysql-bin
    server-id=123454

    mysql根据log-bin配置开启日志,并且自动设置log_bin_index文件为指定的文件名,后缀为.index。server-id参数用于指定一个不能和其他集群中机器重名的字符串,如果只有一台机器,可以随便写。

如果binlog没有配置,canel启动会提示下面的错误:

com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
2022-04-09 23:50:30.543 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation

配置后重启MySQL:

service mysqld restart
  • 3)创建测试表:
    CREATE TABLE `canal_test` (
    `id` int(10) NOT NULL AUTO_INCREMENT,
    `update_time` datetime NOT NULL COMMENT '更新时间',
    `create_time` datetime NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '创建时间',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='CANAL测试表';
  • 4)docker安装canal:
    # 下载最新的镜像
    docker pull canal/canal-server:v1.1.5
    # 运行镜像
    docker run --name canal -d canal/canal-server
    # 创建外部配置目录
    mkdir /opt/canal/
    # 拷贝配置文件到外部目录
    docker cp canal:/home/admin/canal-server/conf/canal.properties /opt/canal/canal.properties
    # 拷贝配置文件到外部目录
    docker cp canal:/home/admin/canal-server/conf/example/instance.properties /opt/canal/instance.properties
    #停止容器
    docker stop canal
    #删除容器
    docker rm canal

修改instance.properties的配置项canal.instance.master.address为MySQL实例的访问地址。

# 指定外部配置启动容器
docker run --name canal -p 11111:11111 -d -v /opt/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /opt/canal/canal.properties:/home/admin/canal-server/conf/canal/canal.properties canal/canal-server

# 进入容器
docker exec -it canal /bin/bash

# 查看监听日志
tail -f /home/admin/canal-server/logs/example/example.log

4.测试连接

测试数据监听代码:

@Component
public class CanalStartInitializingBean implements InitializingBean {
    // 定义了canel_server的端口号和IP
    @Autowired
    private CanalConfig canalConfig;

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("start listener....");

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
                "example", "canal", "canal");
        //批量数据条数
        int batchSize = 1;
        try {
            connector.connect();
            connector.subscribe("test.canal_test");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entryList) {
        for (Entry entry : entryList) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChare ======>" + rowChange.toString());

            EventType eventType = rowChange.getEventType(); //事件類型,比如insert,update,delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名称
                    entry.getHeader().getLogfileOffset(), //偏移量
                    entry.getHeader().getSchemaName(),//库名
                    entry.getHeader().getTableName(), //表名
                    eventType));//事件名

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

打印出新增数据:

================> binlog[mysql-bin.000006:19333] , name[,] , eventType : QUERY
rowChare ======>eventType: INSERT
isDdl: false
sql: "insert into `canal_test`(update_time, create_time)\nvalues (now(), now())"
ddlSchemaName: "test"

完整Java工程演示代码点击下载

5.参考

https://blog.csdn.net/q3dxdx/article/details/50955533
https://blog.csdn.net/xiaosong2001/article/details/123326179
https://github.com/alibaba/canal/issues/146
https://www.cnblogs.com/clschao/articles/8192345.html

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注