docker安装Alibaba Canal的步骤
1.使用场景
Canal[kə’næl]是由Alibaba开发的数据同步中间件,译为水道/管道/沟渠,通过解析MySQL数据库增量日志,提供数据订阅和消费,主要使用场景如下:
- 创建数据库镜像
- 数据库实时备份
- 索引构建和实时维护
- 按需刷新业务cache
- 按业务逻辑需要处理增量数据
- 同步构建其他数据源
相比MySQL本身的主从机制,有下面几点优势:
- 1)让架构更灵活,多机房同步比较简单。
- 2)异构表之间也可以同步,可以控制不同步DDL以免出现数据丢失和不一致。
- 3)Canal可以实现一个表一线程,多个表多线程的同步,速度更快。
2.同步机制
- 1)canal模拟MySQL slave的交互协议,伪装自己为MySQL salve。
- 2)MySQL master收到dump请求,开始推送binary log给伪装的slave,也就是canal。
- 3)canal解析binary log 对象(原始byte流),执行用户定义的业务逻辑。
3.安装Canal
演示安装的软件是MySQL 5.6、Centos 7(已安装docker)。
-
1)配置数据库用户canal:
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'localhost'; FLUSH PRIVILEGES;
删除可能存在的空用户:
USE mysql; DELETE FROM USER WHERE USER=''; FLUSH PRIVILEGES;
如果没有配置权限,canal启动会出现错误信息:
Caused by: java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#] at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:274) at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82)
-
2)检查数据库binlog日志是否开启:
SHOW VARIABLES LIKE '%log_bin%'
修改配置文件,开启binlog日志:
vim /etc/my.cnf
修改my.cnf,指定日志文件地址:
log-bin=/var/lib/mysql/mysql-bin server-id=123454
mysql根据log-bin配置开启日志,并且自动设置log_bin_index文件为指定的文件名,后缀为.index。server-id参数用于指定一个不能和其他集群中机器重名的字符串,如果只有一台机器,可以随便写。
如果binlog没有配置,canel启动会提示下面的错误:
com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
2022-04-09 23:50:30.543 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
配置后重启MySQL:
service mysqld restart
- 3)创建测试表:
CREATE TABLE `canal_test` ( `id` int(10) NOT NULL AUTO_INCREMENT, `update_time` datetime NOT NULL COMMENT '更新时间', `create_time` datetime NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='CANAL测试表';
- 4)docker安装canal:
# 下载最新的镜像 docker pull canal/canal-server:v1.1.5 # 运行镜像 docker run --name canal -d canal/canal-server # 创建外部配置目录 mkdir /opt/canal/ # 拷贝配置文件到外部目录 docker cp canal:/home/admin/canal-server/conf/canal.properties /opt/canal/canal.properties # 拷贝配置文件到外部目录 docker cp canal:/home/admin/canal-server/conf/example/instance.properties /opt/canal/instance.properties #停止容器 docker stop canal #删除容器 docker rm canal
修改instance.properties的配置项canal.instance.master.address为MySQL实例的访问地址。
# 指定外部配置启动容器
docker run --name canal -p 11111:11111 -d -v /opt/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /opt/canal/canal.properties:/home/admin/canal-server/conf/canal/canal.properties canal/canal-server
# 进入容器
docker exec -it canal /bin/bash
# 查看监听日志
tail -f /home/admin/canal-server/logs/example/example.log
4.测试连接
测试数据监听代码:
@Component
public class CanalStartInitializingBean implements InitializingBean {
// 定义了canel_server的端口号和IP
@Autowired
private CanalConfig canalConfig;
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("start listener....");
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
"example", "canal", "canal");
//批量数据条数
int batchSize = 1;
try {
connector.connect();
connector.subscribe("test.canal_test");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId);
}
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entryList) {
for (Entry entry : entryList) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChange;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
System.out.println("rowChare ======>" + rowChange.toString());
EventType eventType = rowChange.getEventType(); //事件類型,比如insert,update,delete
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名称
entry.getHeader().getLogfileOffset(), //偏移量
entry.getHeader().getSchemaName(),//库名
entry.getHeader().getTableName(), //表名
eventType));//事件名
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
打印出新增数据:
================> binlog[mysql-bin.000006:19333] , name[,] , eventType : QUERY
rowChare ======>eventType: INSERT
isDdl: false
sql: "insert into `canal_test`(update_time, create_time)\nvalues (now(), now())"
ddlSchemaName: "test"
完整Java工程演示代码点击下载。
5.参考
https://blog.csdn.net/q3dxdx/article/details/50955533
https://blog.csdn.net/xiaosong2001/article/details/123326179
https://github.com/alibaba/canal/issues/146
https://www.cnblogs.com/clschao/articles/8192345.html
本文链接:https://www.codingbrick.com/archives/631.html
特别声明:除特别标注,本站文章均为原创,转载请注明作者和出处倾城架构,请勿用于任何商业用途。