利用go-migrate实现MySQL和ClickHouse的数据库迁移

news/2025/2/26 6:53:36

1. 背景

        在使用gorm时 , 尽管已经有了自动建表和钩子函数 . 但是在面临希望了解到数据库的变更 , 和插入一些系统字段时 , 以及最关键的数据库迁移的工作 . gorm显得稍微有点不便 . 

        在了解到migrate这项技术后 , 就使用go-migrate开发了一个可以迁移MySQL和ClickHouse数据库的工具.

2. 实现

2.1 简单介绍

go-migrate在启动后 , 会在数据库中自动生成一张 "schema_migrations"表 , 这张表在mysqlclickhouse中的结构有一定区别.但是主要的字段是相同的.

                                                                clickhouse

"version": 表示版本号
"dirty": 表示执行成功或失败 0:成功 1:失败

2.2 具体实现

先新建一个目录 , 结构可以自己去梳理:

mysql文件夹中存放的是 mysql数据库相关变迁的sql语句

clickhouse文件夹存放的是clickhouse数据库相关变迁的sql语句

migrate.txt只是为了开发人员更好了解到当前执行到什么版本了

所有的sql文件前需要一个版本号,保证是唯一的. 

.up: 表示的是需要执行的sql

如果希望自动回滚 , 可以在每一个版本的sql文件后 , 在新建一个sql文件. 且 .up 替换为 .down. 即可自动回滚.

# migrate.txt
1_waf_top_mysql_create_app_waf_table.up.sql
2_waf_top_mysql_create_server_waf_table.up.sql
3_waf_top_mysql_create_waf_allow_list_table.up.sql
4_waf_top_mysql_create_waf_buildin_rule_table.up.sql
5_waf_top_mysql_create_waf_rule_group_table.up.sql
6_waf_top_mysql_create_waf_server_allow_table.up.sql
7_waf_top_mysql_create_waf_servers_strategies_table.up.sql
8_waf_top_mysql_create_waf_strategy_table.up.sql
9_waf_top_mysql_create_waf_strategy_config_table.up.sql
10_waf_top_mysql_create_waf_user_rule_table.up.sql
11_waf_user_mysql_create_waf_user_info_table.up.sql
12_dash_borad_ck_create_sec_log_table.up.sql
13_waf_top_mysql_insert_buildin_rule_data.up.sql
14_waf_top_mysql_insert_waf_rule_group_data.up.sql
15_waf_top_mysql_alter_server_waf_desc.up.sql

前缀数字表示的就是版本号

package migrate

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"path/filepath"
	"time"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/go-redis/redis/v8"
	"github.com/golang-migrate/migrate/v4"
	"github.com/golang-migrate/migrate/v4/database"
	chMigrate "github.com/golang-migrate/migrate/v4/database/clickhouse"
	mysqlMigrate "github.com/golang-migrate/migrate/v4/database/mysql"
	_ "github.com/golang-migrate/migrate/v4/source/file"
	"github.com/google/uuid"
	"github.com/sirupsen/logrus"
	"wafconsole/utils/redislock"

	_ "github.com/go-sql-driver/mysql"
)

// Config 迁移配置
type Config struct {
	AppName       string
	MySqlDSN      string
	ClickHouseDSN string
	RedisAddr     string
	RedisPassword string
	RedisDB       int
	MigrationDir  string // 指向 migrations 父目录(包含 mysqlclickhouse 子目录)
	LockTimeout   time.Duration
	TargetVersion uint
}

// DatabaseMigrator 数据库迁移器
type DatabaseMigrator struct {
	mysqlDB      *sql.DB
	clickhouseDB *sql.DB
	redisClient  *redis.Client
	config       *Config
	lockID       string
}

// NewDatabaseMigrator 创建新实例
func NewDatabaseMigrator(cfg *Config) (*DatabaseMigrator, error) {
	// 初始化MySQL连接
	mysqlDB, err := sql.Open("mysql", cfg.MySqlDSN)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to MySQL: %w", err)
	}

	// 验证MySQL连接
	if err = mysqlDB.Ping(); err != nil {
		return nil, fmt.Errorf("MySQL ping failed: %w", err)
	}

	// 初始化ClickHouse连接
	clickhouseDB, err := sql.Open("clickhouse", cfg.ClickHouseDSN)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)
	}

	// 验证ClickHouse连接
	if err = clickhouseDB.Ping(); err != nil {
		return nil, fmt.Errorf("ClickHouse ping failed: %w", err)
	}

	// 初始化Redis客户端
	rdb := redis.NewClient(&redis.Options{
		Addr:     cfg.RedisAddr,
		Password: cfg.RedisPassword,
		DB:       cfg.RedisDB,
	})

	// 验证Redis连接
	if err = rdb.Ping(context.Background()).Err(); err != nil {
		return nil, fmt.Errorf("redis connection failed: %w", err)
	}

	return &DatabaseMigrator{
		mysqlDB:      mysqlDB,
		clickhouseDB: clickhouseDB,
		redisClient:  rdb,
		config:       cfg,
		lockID:       uuid.New().String(),
	}, nil
}

// Run 执行全量迁移
func (m *DatabaseMigrator) Run(ctx context.Context) error {
	lockKey := "database_migration_lock"
	rdLock := redislock.NewRedisLock(m.redisClient, m.config.LockTimeout)

	if err := rdLock.AcquireLock(ctx, lockKey); err != nil {
		return fmt.Errorf("failed to acquire lock: %w", err)
	}
	defer func() {
		if err := rdLock.ReleaseLock(ctx, lockKey); err != nil {
			logrus.Errorf("Failed to release lock: %v", err)
		}
	}()

	if err := m.migrateMySQL(ctx); err != nil {
		return fmt.Errorf("MySQL migration failed: %w", err)
	}

	if err := m.migrateClickHouse(ctx); err != nil {
		return fmt.Errorf("ClickHouse migration failed: %w", err)
	}

	return nil
}

// MySQL 迁移
func (m *DatabaseMigrator) migrateMySQL(ctx context.Context) error {
	driver, err := mysqlMigrate.WithInstance(m.mysqlDB, &mysqlMigrate.Config{})
	if err != nil {
		return fmt.Errorf("failed to create MySQL driver: %w", err)
	}
	return m.runMigration(ctx, driver, "mysql")
}

// ClickHouse 迁移
func (m *DatabaseMigrator) migrateClickHouse(ctx context.Context) error {
	driver, err := chMigrate.WithInstance(m.clickhouseDB, &chMigrate.Config{})
	if err != nil {
		return fmt.Errorf("failed to create ClickHouse driver: %w", err)
	}
	return m.runMigration(ctx, driver, "clickhouse")
}

// 通用迁移逻辑
func (m *DatabaseMigrator) runMigration(
	ctx context.Context,
	driver database.Driver,
	dbType string,
) error {
	// 获取原始路径
	migratePath := filepath.Join(m.config.MigrationDir, dbType)

	// 强制转换为 URL 兼容的斜杠格式
	migratePath = filepath.ToSlash(migratePath)

	// 构建 URL
	sourceURL := fmt.Sprintf("file://%s", migratePath)

	// 初始化迁移实例
	migrator, err := migrate.NewWithDatabaseInstance(sourceURL, dbType, driver)
	if err != nil {
		return fmt.Errorf("failed to initialize migrator: %w", err)
	}
	defer migrator.Close()

	

	// 执行迁移
	var migrationErr error
	if m.config.TargetVersion > 0 {
		migrationErr = migrator.Migrate(m.config.TargetVersion)
	} else {
		migrationErr = migrator.Up()
	}

	// 处理迁移结果
	if migrationErr != nil && !errors.Is(migrationErr, migrate.ErrNoChange) {
		return fmt.Errorf("migration failed: %w", migrationErr)
	}

	logrus.Infof("%s migration completed successfully", dbType)
	return nil
}

// Close 关闭资源(保持不变)
func (m *DatabaseMigrator) Close() error {
	var errs []error

	if err := m.mysqlDB.Close(); err != nil {
		errs = append(errs, fmt.Errorf("MySQL close error: %w", err))
	}

	if err := m.clickhouseDB.Close(); err != nil {
		errs = append(errs, fmt.Errorf("ClickHouse close error: %w", err))
	}

	if err := m.redisClient.Close(); err != nil {
		errs = append(errs, fmt.Errorf("Redis close error: %w", err))
	}

	if len(errs) > 0 {
		return fmt.Errorf("errors occurred during shutdown: %v", errs)
	}
	return nil
}

2.3 优化方式

2.3.1 脏版本处理

1. 在执行过程中 , 可能会出现一些因sql语句错误而执行失败  . migrate实现了清洗脏版本的功能.加在通用迁移逻辑 初始化迁移实列后即可.

    // 检查是否为脏版本
	version, dirty, err := migrator.Version()
	if err != nil && !errors.Is(err, migrate.ErrNilVersion) {
		return fmt.Errorf("failed to check version: %w", err)
	}
	if dirty {
		// 强制清除脏状态
		if err = migrator.Force(int(version)); err != nil {
			return fmt.Errorf("failed to force clean version: %w", err)
		}
	}

2. 或者手动修改 表中的版本号 , 修改到上一个版本(即sql文件最开始的数字) , 且状态改为0. 因为当migrate检测到为1执行失败后  , 就不在继续执行了.

2.3.2 分布式锁防止并发情况下 , 同时执行多个迁移操作

    lockKey := "database_migration_lock"
	rdLock := redislock.NewRedisLock(m.redisClient, m.config.LockTimeout)

	if err := rdLock.AcquireLock(ctx, lockKey); err != nil {
		return fmt.Errorf("failed to acquire lock: %w", err)
	}
	defer func() {
		if err := rdLock.ReleaseLock(ctx, lockKey); err != nil {
			logrus.Errorf("Failed to release lock: %v", err)
		}
	}()

上面具体实现中已经包含了这段代码 , 是我自己封装的一个redis分布式锁的实现. 这段代码 , 如果不需要可以删除 , 如有需要 , 可以自己实现一个简单的redis分布式锁即可.


http://www.niftyadmin.cn/n/5868256.html

相关文章

Sqlserver安全篇之_隐藏实例功能和禁用SQL Server Browser服务

总结: 1、隐藏实例功能和禁用SQL Server Browser服务的功能一样,对应非默认实例(且这个默认实例是1433端口)的情况下,都是需要在连接字符串中提供端口号才能连接到实例 2、隐藏实例功能后,就算开启了SQL Server Browser服务&#…

JVM生产环境问题定位与解决实战(三):揭秘Java飞行记录器(JFR)的强大功能

提到飞行记录器,或许你的脑海中并未立刻浮现出清晰的画面,但一说起“黑匣子”,想必大多数人都能恍然大悟,知晓其重要性及用途。在航空领域,黑匣子作为不可或缺的设备,默默记录着飞行过程中的每一项关键数据…

1.1部署es:9200

安装es:root用户: 1.布署java环境 - 所有节点 wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm yum localinstall jdk-8u341-linux-x64.rpm -y java -version 2.下载安装elasticsearch - 所有节点 wget ftp://10.3.148.254/Note/Elk/…

Java进阶学习笔记7——权限修饰符

什么是权限修饰符? 就是用来限制类中的成员(成员变量、成员方法、构造器、代码块…)能够被访问的范围。 protected使用的比较少,但是程序员还是要阅读代码,看官方文档是怎么写的,都会接触到protected修饰…

Linux 之 Centos 安装Consul

sudo yum install -y yum-utils sudo yum-config-manager --add-repo https://rpm.releases.hashicorp.com/RHEL/hashicorp.repo sudo yum -y install consul他有两种运行模式server和client。每个数据中心官方建议需要3或5个server节点以保证数据安全,同时保证serv…

Git -版本管理工具 -常用API整理

文章目录 前言常用API1. 设置本地的名称2. 创建仓库3. 克隆远程仓库4. 切换检索当前分支5. 拉取并合并主干代码6. 推送代码到指定分支7. 提交到本地仓库 commit8. 本地代码 commit 后不想推送到远程分支回滚 注意事项结束语 每个人都在主宰自己的命运,人有选择&…

电脑软件:推荐一款非常强大的视频音频转换剪辑工具FFmpeg Batch AV Converter V3.12

目录 一、软件介绍 二、软件功能 三、软件特点 四、使用技巧 在多媒体处理领域,FFmpeg Batch AV Converter以其强大的功能和便捷的操作方式,成为了许多用户的首选工具。这款软件以其高效的批量处理能力、直观的图形用户界面(GUI&#xff…

从零开始:在 MacOS 中通过 Docker 部署跨平台 Redis 服务(支持 Ubuntu 迁移)

前言 在开发和生产环境中,Docker 已成为部署服务的标准工具。但对于使用 MacOS(尤其是 Apple Silicon 架构)的开发者,将本地构建的镜像迁移到 x86-64 架构的 Ubuntu 服务器时,常会遇到平台兼容性问题。本文将提供一套…