golang通过canal同步mysql数据到es,redis

程序员卷不动了 2023-08-25 AM 418℃ 0条
1 、环境准备

ubuntu版本

root@~:/var/log/mysql# cat /etc/issue
Ubuntu 22.04 LTS

mysql版本

root@~:/var/log/mysql# mysql --version
mysql  Ver 8.0.34-0ubuntu0.22.04.1 for Linux on x86_64 ((Ubuntu))
2、主从配置

开启binlog日志

server-id               = 100
log_bin                 = /var/log/mysql/mysql-bin.log
max_binlog_size   = 100M
# 设置binlog格式
binlog_format = ROW

查看

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000010 |      455 | shop         |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

创建新账号授权给从服务器

ALTER USER 'slave'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
grant select,replication slave,replication client,reload on *.* to 'slave'@'%' # 
FLUSH PRIVILEGES; # 刷新权限
3、使用go canal伪装成mysql的从服务器
增量同步
//  msyql8 数据 同步
import (
    "github.com/go-mysql-org/go-mysql/canal"
    "github.com/go-mysql-org/go-mysql/mysql"
    "github.com/siddontang/go-log/log"
    "go.uber.org/zap"
)

type RowHandler struct {
    canal.DummyEventHandler
}

func (r *RowHandler) OnRow(e *canal.RowsEvent) error {

    log.Infof("table = %s,event=%s data=%v\n", e.Table.String(), e.Action, e.Rows)
    return nil
}

func (r *RowHandler) String() string {
    return ""
}

func main() {
    log.Info("启动")
    var cfg = canal.NewDefaultConfig()
    cfg.Addr = "192.168.2.70:3306"
    cfg.User = "slave"
    cfg.Password = "123456"
    cfg.ServerID = 101

    cfg.Dump.SkipMasterData = true
    cfg.Dump.TableDB = "shop" // 监听的数据库
    cfg.Dump.Tables = []string{"order_seq_2"} // 监听的数据表
    c, err := canal.NewCanal(cfg)

    if err != nil {
        //
        zap.S().Fatal("配置失败", err.Error())
    }

    c.SetEventHandler(&RowHandler{})
    pos := mysql.Position{Name: "mysql-bin.000009", Pos: 1346} // 增量同步数据,需要通过show master status,找到binlog文件名和同步点
    if err = c.RunFrom(pos); err != nil {
        zap.S().Fatal("启动失败", err.Error())
    }
}

启动之后,会根据binlogpos点,去同步后面的数据

全量同步
func main() {
    log.Info("启动")
    var cfg = canal.NewDefaultConfig()
    cfg.Addr = "192.168.2.70:3306"
    cfg.User = "slave"
    cfg.Password = "123456"
    cfg.ServerID = 101

    cfg.Dump.SkipMasterData = true
    cfg.Dump.TableDB = "shop" // 监听的数据库
    cfg.Dump.Tables = []string{"order_seq_2"} // 监听的数据表
    c, err := canal.NewCanal(cfg)

    if err != nil {
        //
        zap.S().Fatal("配置失败", err.Error())
    }

    c.SetEventHandler(&RowHandler{})
    if err = c.Run(pos); err != nil {
        zap.S().Fatal("启动失败", err.Error())
    }
}

全量同步,会通过mysqldumpdump全部数据,去同步数据,所以需要在环境变量中配置mysqldump

标签: canal

非特殊说明,本博所有文章均为博主原创。

上一篇 nginx配置lvs负载均衡详解
下一篇 没有了

评论啦~