背景
全量同步ES索引数据,Redis数据。
项目目录结构:
go mod init 是 wgg-net/deep/starter
Es同步
es_export.go
package shell
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"time"
"github.com/olivere/elastic/v7"
)
type ESExporter struct {
client *elastic.Client
index string
}
func NewESExporter(urls []string, index, user, paswd string) (*ESExporter, error) {
client, err := elastic.NewClient(
elastic.SetURL(urls...),
elastic.SetSniff(false),
elastic.SetBasicAuth(user, paswd),
)
if err != nil {
return nil, err
}
return &ESExporter{
client: client,
index: index,
}, nil
}
func (e *ESExporter) ExportToFile(filename string, batchSize int) error {
ctx := context.Background()
// 验证连接
info, err := e.client.NodesInfo().Do(ctx)
if err != nil {
return fmt.Errorf("ES连接验证失败: %v", err)
}
// 获取第一个节点的信息
var nodeID string
for id := range info.Nodes {
nodeID = id
break
}
fmt.Printf("成功连接到ES集群,版本: %s\n", info.Nodes[nodeID].Version)
// 获取索引文档总数
count, err := e.client.Count(e.index).Do(ctx)
if err != nil {
return fmt.Errorf("获取索引数据量失败: %v", err)
}
fmt.Printf("索引 %s 共有 %d 条数据\n", e.index, count)
// 创建scroll服务
scroll := e.client.Scroll(e.index).
Size(batchSize).
KeepAlive("5m") // 增加 scroll 保持时间
// 创建输出文件
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("创建文件失败: %v", err)
}
defer file.Close()
// 创建带缓冲的writer
bufferedWriter := bufio.NewWriterSize(file, 32*1024) // 32KB 缓冲区
defer bufferedWriter.Flush() // 确保在函数结束时将缓冲区数据写入磁盘
encoder := json.NewEncoder(bufferedWriter)
startTime := time.Now()
var total int64 = 0
scrollID := ""
for {
var results *elastic.SearchResult
if scrollID == "" {
results, err = scroll.Do(ctx)
} else {
results, err = e.client.Scroll().
ScrollId(scrollID).
KeepAlive("5m").
Do(ctx)
}
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("scroll查询失败: %v", err)
}
scrollID = results.ScrollId
hits := results.Hits.Hits
if len(hits) == 0 {
break
}
// 处理批次数据
for _, hit := range hits {
var doc map[string]interface{}
if err := json.Unmarshal(hit.Source, &doc); err != nil {
return fmt.Errorf("解析文档失败: %v", err)
}
if err := encoder.Encode(doc); err != nil {
return fmt.Errorf("写入文件失败: %v", err)
}
total++
}
fmt.Printf("已导出 %d/%d 条数据\r\n", total, count)
}
// 清理scroll
if scrollID != "" {
e.client.ClearScroll().ScrollId(scrollID).Do(ctx)
}
duration := time.Since(startTime)
fmt.Printf("\n导出完成!共导出 %d 条数据,耗时:%v\n", total, duration)
return nil
}
// 添加导入相关的方法
func (e *ESExporter) ImportFromFile(filename string, batchSize int) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
// 创建带缓冲的reader
bufferedReader := bufio.NewReaderSize(file, 32*1024) // 32KB 缓冲区
decoder := json.NewDecoder(bufferedReader)
bulk := e.client.Bulk()
count := 0
total := 0
startTime := time.Now()
for {
// 读取一条记录
var doc map[string]interface{}
if err := decoder.Decode(&doc); err != nil {
if err.Error() == "EOF" {
break
}
return err
}
// 创建索引请求
req := elastic.NewBulkIndexRequest().
Index(e.index).
Doc(doc)
bulk.Add(req)
count++
total++
// 达到批次大小,执行批量导入
if count >= batchSize {
_, err := bulk.Do(context.Background())
if err != nil {
return err
}
bulk = e.client.Bulk()
count = 0
fmt.Printf("已导入 %d 条数据\n", total)
}
}
// 处理剩余的数据
if bulk.NumberOfActions() > 0 {
_, err := bulk.Do(context.Background())
if err != nil {
return err
}
}
duration := time.Since(startTime)
fmt.Printf("导入完成!共导入 %d 条数据,耗时:%v\n", total, duration)
return nil
}
func (e *ESExporter) ImportFromFilePool(filename string, batchSize int) error {
file, err := os.Open(filename)
if err != nil {
return fmt.Errorf("打开文件失败: %v", err)
}
defer file.Close()
// 创建带缓冲的reader,增加缓冲区大小
bufferedReader := bufio.NewReaderSize(file, 128*1024) // 增加到128KB
decoder := json.NewDecoder(bufferedReader)
// 创建工作池
workers := 3
jobs := make(chan map[string]interface{}, batchSize)
results := make(chan error, workers)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 启动工作协程
for i := 0; i < workers; i++ {
go func() {
bulk := e.client.Bulk()
count := 0
for doc := range jobs {
select {
case <-ctx.Done():
results <- nil
return
default:
req := elastic.NewBulkIndexRequest().
Index(e.index).
Doc(doc)
bulk.Add(req)
count++
if count >= batchSize/workers {
// 执行批量导入
for retries := 0; retries < 3; retries++ {
_, err := bulk.Do(ctx)
if err == nil {
break
}
if retries == 2 {
results <- fmt.Errorf("批量导入失败: %v", err)
return
}
time.Sleep(time.Second * time.Duration(retries+1))
}
bulk = e.client.Bulk()
count = 0
}
}
}
// 处理剩余的数据
if bulk.NumberOfActions() > 0 {
_, err := bulk.Do(ctx)
results <- err
} else {
results <- nil
}
}()
}
total := 0
startTime := time.Now()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
// 读取数据并分发给工作协程
go func() {
defer close(jobs)
for {
var doc map[string]interface{}
if err := decoder.Decode(&doc); err != nil {
if err.Error() == "EOF" {
return
}
cancel()
return
}
select {
case <-ctx.Done():
return
case jobs <- doc:
total++
if total%1000 == 0 {
fmt.Printf("已读取 %d 条数据\n", total)
}
}
}
}()
// 等待所有工作协程完成
for i := 0; i < workers; i++ {
if err := <-results; err != nil {
cancel()
return fmt.Errorf("导入过程出错: %v", err)
}
}
duration := time.Since(startTime)
fmt.Printf("导入完成!共导入 %d 条数据,耗时:%v\n", total, duration)
return nil
}
redis同步
redis_export.go
package shell
import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
"time"
"github.com/go-redis/redis/v8"
)
type RedisExporter struct {
client *redis.Client
}
type RedisData struct {
Key string `json:"key"`
Value interface{} `json:"value"`
Type string `json:"type"`
TTL time.Duration `json:"ttl"`
}
func NewRedisExporter(addr, password string, db int) (*RedisExporter, error) {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})
// 测试连接
ctx := context.Background()
if err := client.Ping(ctx).Err(); err != nil {
return nil, err
}
return &RedisExporter{
client: client,
}, nil
}
func (e *RedisExporter) ExportToFile(filename string) error {
ctx := context.Background()
// 创建输出文件
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("创建文件失败: %v", err)
}
defer file.Close()
// 创建带缓冲的writer
bufferedWriter := bufio.NewWriterSize(file, 32*1024) // 32KB 缓冲区
defer bufferedWriter.Flush() // 确保在函数结束时将缓冲区数据写入磁盘
encoder := json.NewEncoder(bufferedWriter)
// 使用SCAN命令遍历所有键
var cursor uint64
var total int64
startTime := time.Now()
for {
var keys []string
var err error
keys, cursor, err = e.client.Scan(ctx, cursor, "*", 100).Result()
if err != nil {
return fmt.Errorf("扫描键失败: %v", err)
}
// 处理当前批次的键
for _, key := range keys {
// 获取键的类型
keyType, err := e.client.Type(ctx, key).Result()
if err != nil {
return fmt.Errorf("获取键类型失败: %v", err)
}
// 获取TTL
ttl := e.client.TTL(ctx, key).Val()
// 根据类型获取值
var value interface{}
switch keyType {
case "string":
value, err = e.client.Get(ctx, key).Result()
case "hash":
value, err = e.client.HGetAll(ctx, key).Result()
case "list":
value, err = e.client.LRange(ctx, key, 0, -1).Result()
case "set":
value, err = e.client.SMembers(ctx, key).Result()
case "zset":
value, err = e.client.ZRangeWithScores(ctx, key, 0, -1).Result()
}
if err != nil {
return fmt.Errorf("获取键值失败: %v", err)
}
// 写入文件
data := RedisData{
Key: key,
Value: value,
Type: keyType,
TTL: ttl,
}
if err := encoder.Encode(data); err != nil {
return fmt.Errorf("写入文件失败: %v", err)
}
total++
}
if cursor == 0 {
break
}
}
duration := time.Since(startTime)
fmt.Printf("导出完成!共导出 %d 个键,耗时:%v\n", total, duration)
return nil
}
func (e *RedisExporter) ImportFromFile(filename string) error {
ctx := context.Background()
file, err := os.Open(filename)
if err != nil {
return fmt.Errorf("打开文件失败: %v", err)
}
defer file.Close()
bufferedReader := bufio.NewReaderSize(file, 32*1024)
decoder := json.NewDecoder(bufferedReader)
var total int64
startTime := time.Now()
// 创建管道用于批量操作
pipeline := e.client.Pipeline()
batchSize := 1000
count := 0
for {
var data RedisData
if err := decoder.Decode(&data); err != nil {
if err.Error() == "EOF" {
break
}
return fmt.Errorf("解析文件失败: %v", err)
}
// 添加到管道
switch data.Type {
case "string":
pipeline.Set(ctx, data.Key, data.Value, data.TTL)
case "hash":
hashData := data.Value.(map[string]interface{})
pipeline.HMSet(ctx, data.Key, hashData)
// ... 其他类型处理 ...
}
if data.TTL > 0 {
pipeline.Expire(ctx, data.Key, data.TTL)
}
count++
total++
// 批量执行
if count >= batchSize {
_, err := pipeline.Exec(ctx)
if err != nil {
return fmt.Errorf("批量导入失败: %v", err)
}
pipeline = e.client.Pipeline()
count = 0
fmt.Printf("已导入 %d 条数据\n", total)
}
}
// 处理剩余的数据
if count > 0 {
_, err := pipeline.Exec(ctx)
if err != nil {
return fmt.Errorf("批量导入失败: %v", err)
}
}
duration := time.Since(startTime)
fmt.Printf("导入完成!共导入 %d 条数据,耗时:%v\n", total, duration)
return nil
}
gen.go
package main
import (
"log"
"wgg-net/deep/starter/shell"
)
const (
// 导出时的批次大小
batchSize = 10
RedisExportFile = "redis_backup.json"
EsExportFile = "es_backup.json"
)
func main() {
// 创建导出器
EsExportToFile()
//EsImportFromFile()
RedisExportToFile()
//RedisImportFromFile()
}
// ===============================================
func EsExportToFile() {
// 创建导出器
eSExporter, err := shell.NewESExporter(
[]string{"http://localhost:9200"},
"mock_user_info", "root", "root",
)
if err != nil {
panic(err)
}
// 执行导出,每批处理1000条数据
err = eSExporter.ExportToFile(EsExportFile, batchSize)
if err != nil {
log.Fatalf("创建导出器失败: %v", err)
panic(err)
}
}
func EsImportFromFile() {
// 2. 导入数据(可以导入到同一个或不同的索引)
eSImporter, err := shell.NewESExporter(
[]string{"http://localhost:9200"},
"mock_user_info_test", "root", "root",
)
if err != nil {
log.Fatal(err)
}
err = eSImporter.ImportFromFilePool(EsExportFile, batchSize)
if err != nil {
log.Fatal(err)
}
}
//===============================================
func RedisExportToFile() {
// 创建导出器
redisHanddler, err := shell.NewRedisExporter(
"localhost:6380", // Redis地址
"KYUIAPErZz2Xb0f8XIQOKfuv7dYURu", // 密码
10, // 数据库编号
)
if err != nil {
log.Fatal(err)
}
// 导出数据
err = redisHanddler.ExportToFile(RedisExportFile)
if err != nil {
log.Fatal(err)
}
}
func RedisImportFromFile() {
// 创建导出器
redisHanddler, err := shell.NewRedisExporter(
"localhost:6380", // Redis地址
"KYUIAPErZz2Xb0f8XIQOKfuv7dYURu", // 密码
18,
)
if err != nil {
log.Fatal(err)
}
// 导入数据
err = redisHanddler.ImportFromFile(RedisExportFile)
if err != nil {
log.Fatal(err)
}
}
测试
go run shell/gen/gen.go