Go脚本同步Es Redis

背景

全量同步ES索引数据,Redis数据。

项目目录结构:

go mod init 是 wgg-net/deep/starter

Es同步

  • es_export.go
package shell

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/olivere/elastic/v7"
)

type ESExporter struct {
	client *elastic.Client
	index  string
}

func NewESExporter(urls []string, index, user, paswd string) (*ESExporter, error) {
	client, err := elastic.NewClient(
		elastic.SetURL(urls...),
		elastic.SetSniff(false),
		elastic.SetBasicAuth(user, paswd),
	)
	if err != nil {
		return nil, err
	}
	return &ESExporter{
		client: client,
		index:  index,
	}, nil
}

func (e *ESExporter) ExportToFile(filename string, batchSize int) error {
	ctx := context.Background()

	// 验证连接
	info, err := e.client.NodesInfo().Do(ctx)
	if err != nil {
		return fmt.Errorf("ES连接验证失败: %v", err)
	}
	// 获取第一个节点的信息
	var nodeID string
	for id := range info.Nodes {
		nodeID = id
		break
	}
	fmt.Printf("成功连接到ES集群,版本: %s\n", info.Nodes[nodeID].Version)

	// 获取索引文档总数
	count, err := e.client.Count(e.index).Do(ctx)
	if err != nil {
		return fmt.Errorf("获取索引数据量失败: %v", err)
	}
	fmt.Printf("索引 %s 共有 %d 条数据\n", e.index, count)

	// 创建scroll服务
	scroll := e.client.Scroll(e.index).
		Size(batchSize).
		KeepAlive("5m") // 增加 scroll 保持时间

	// 创建输出文件
	file, err := os.Create(filename)
	if err != nil {
		return fmt.Errorf("创建文件失败: %v", err)
	}
	defer file.Close()

	// 创建带缓冲的writer
	bufferedWriter := bufio.NewWriterSize(file, 32*1024) // 32KB 缓冲区
	defer bufferedWriter.Flush()                         // 确保在函数结束时将缓冲区数据写入磁盘

	encoder := json.NewEncoder(bufferedWriter)

	startTime := time.Now()
	var total int64 = 0
	scrollID := ""

	for {
		var results *elastic.SearchResult
		if scrollID == "" {
			results, err = scroll.Do(ctx)
		} else {
			results, err = e.client.Scroll().
				ScrollId(scrollID).
				KeepAlive("5m").
				Do(ctx)
		}

		if err != nil {
			if err == io.EOF {
				break
			}
			return fmt.Errorf("scroll查询失败: %v", err)
		}

		scrollID = results.ScrollId
		hits := results.Hits.Hits

		if len(hits) == 0 {
			break
		}

		// 处理批次数据
		for _, hit := range hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				return fmt.Errorf("解析文档失败: %v", err)
			}

			if err := encoder.Encode(doc); err != nil {
				return fmt.Errorf("写入文件失败: %v", err)
			}

			total++
		}

		fmt.Printf("已导出 %d/%d 条数据\r\n", total, count)
	}

	// 清理scroll
	if scrollID != "" {
		e.client.ClearScroll().ScrollId(scrollID).Do(ctx)
	}

	duration := time.Since(startTime)
	fmt.Printf("\n导出完成!共导出 %d 条数据,耗时:%v\n", total, duration)
	return nil
}

// 添加导入相关的方法
func (e *ESExporter) ImportFromFile(filename string, batchSize int) error {
	file, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer file.Close()

	// 创建带缓冲的reader
	bufferedReader := bufio.NewReaderSize(file, 32*1024) // 32KB 缓冲区
	decoder := json.NewDecoder(bufferedReader)

	bulk := e.client.Bulk()
	count := 0
	total := 0
	startTime := time.Now()

	for {
		// 读取一条记录
		var doc map[string]interface{}
		if err := decoder.Decode(&doc); err != nil {
			if err.Error() == "EOF" {
				break
			}
			return err
		}

		// 创建索引请求
		req := elastic.NewBulkIndexRequest().
			Index(e.index).
			Doc(doc)

		bulk.Add(req)
		count++
		total++

		// 达到批次大小,执行批量导入
		if count >= batchSize {
			_, err := bulk.Do(context.Background())
			if err != nil {
				return err
			}
			bulk = e.client.Bulk()
			count = 0
			fmt.Printf("已导入 %d 条数据\n", total)
		}
	}

	// 处理剩余的数据
	if bulk.NumberOfActions() > 0 {
		_, err := bulk.Do(context.Background())
		if err != nil {
			return err
		}
	}

	duration := time.Since(startTime)
	fmt.Printf("导入完成!共导入 %d 条数据,耗时:%v\n", total, duration)
	return nil
}

func (e *ESExporter) ImportFromFilePool(filename string, batchSize int) error {
	file, err := os.Open(filename)
	if err != nil {
		return fmt.Errorf("打开文件失败: %v", err)
	}
	defer file.Close()

	// 创建带缓冲的reader,增加缓冲区大小
	bufferedReader := bufio.NewReaderSize(file, 128*1024) // 增加到128KB
	decoder := json.NewDecoder(bufferedReader)

	// 创建工作池
	workers := 3
	jobs := make(chan map[string]interface{}, batchSize)
	results := make(chan error, workers)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 启动工作协程
	for i := 0; i < workers; i++ {
		go func() {
			bulk := e.client.Bulk()
			count := 0
			for doc := range jobs {
				select {
				case <-ctx.Done():
					results <- nil
					return
				default:
					req := elastic.NewBulkIndexRequest().
						Index(e.index).
						Doc(doc)
					bulk.Add(req)
					count++

					if count >= batchSize/workers {
						// 执行批量导入
						for retries := 0; retries < 3; retries++ {
							_, err := bulk.Do(ctx)
							if err == nil {
								break
							}
							if retries == 2 {
								results <- fmt.Errorf("批量导入失败: %v", err)
								return
							}
							time.Sleep(time.Second * time.Duration(retries+1))
						}
						bulk = e.client.Bulk()
						count = 0
					}
				}
			}
			// 处理剩余的数据
			if bulk.NumberOfActions() > 0 {
				_, err := bulk.Do(ctx)
				results <- err
			} else {
				results <- nil
			}
		}()
	}

	total := 0
	startTime := time.Now()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// 读取数据并分发给工作协程
	go func() {
		defer close(jobs)
		for {
			var doc map[string]interface{}
			if err := decoder.Decode(&doc); err != nil {
				if err.Error() == "EOF" {
					return
				}
				cancel()
				return
			}
			select {
			case <-ctx.Done():
				return
			case jobs <- doc:
				total++
				if total%1000 == 0 {
					fmt.Printf("已读取 %d 条数据\n", total)
				}
			}
		}
	}()

	// 等待所有工作协程完成
	for i := 0; i < workers; i++ {
		if err := <-results; err != nil {
			cancel()
			return fmt.Errorf("导入过程出错: %v", err)
		}
	}

	duration := time.Since(startTime)
	fmt.Printf("导入完成!共导入 %d 条数据,耗时:%v\n", total, duration)
	return nil
}

redis同步

  • redis_export.go
package shell

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
)

type RedisExporter struct {
	client *redis.Client
}

type RedisData struct {
	Key   string        `json:"key"`
	Value interface{}   `json:"value"`
	Type  string        `json:"type"`
	TTL   time.Duration `json:"ttl"`
}

func NewRedisExporter(addr, password string, db int) (*RedisExporter, error) {
	client := redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: password,
		DB:       db,
	})

	// 测试连接
	ctx := context.Background()
	if err := client.Ping(ctx).Err(); err != nil {
		return nil, err
	}

	return &RedisExporter{
		client: client,
	}, nil
}

func (e *RedisExporter) ExportToFile(filename string) error {
	ctx := context.Background()

	// 创建输出文件
	file, err := os.Create(filename)
	if err != nil {
		return fmt.Errorf("创建文件失败: %v", err)
	}
	defer file.Close()

	// 创建带缓冲的writer
	bufferedWriter := bufio.NewWriterSize(file, 32*1024) // 32KB 缓冲区
	defer bufferedWriter.Flush()                         // 确保在函数结束时将缓冲区数据写入磁盘

	encoder := json.NewEncoder(bufferedWriter)

	// 使用SCAN命令遍历所有键
	var cursor uint64
	var total int64
	startTime := time.Now()

	for {
		var keys []string
		var err error
		keys, cursor, err = e.client.Scan(ctx, cursor, "*", 100).Result()
		if err != nil {
			return fmt.Errorf("扫描键失败: %v", err)
		}

		// 处理当前批次的键
		for _, key := range keys {
			// 获取键的类型
			keyType, err := e.client.Type(ctx, key).Result()
			if err != nil {
				return fmt.Errorf("获取键类型失败: %v", err)
			}

			// 获取TTL
			ttl := e.client.TTL(ctx, key).Val()

			// 根据类型获取值
			var value interface{}
			switch keyType {
			case "string":
				value, err = e.client.Get(ctx, key).Result()
			case "hash":
				value, err = e.client.HGetAll(ctx, key).Result()
			case "list":
				value, err = e.client.LRange(ctx, key, 0, -1).Result()
			case "set":
				value, err = e.client.SMembers(ctx, key).Result()
			case "zset":
				value, err = e.client.ZRangeWithScores(ctx, key, 0, -1).Result()
			}

			if err != nil {
				return fmt.Errorf("获取键值失败: %v", err)
			}

			// 写入文件
			data := RedisData{
				Key:   key,
				Value: value,
				Type:  keyType,
				TTL:   ttl,
			}
			if err := encoder.Encode(data); err != nil {
				return fmt.Errorf("写入文件失败: %v", err)
			}

			total++
		}

		if cursor == 0 {
			break
		}
	}

	duration := time.Since(startTime)
	fmt.Printf("导出完成!共导出 %d 个键,耗时:%v\n", total, duration)
	return nil
}

func (e *RedisExporter) ImportFromFile(filename string) error {
	ctx := context.Background()

	file, err := os.Open(filename)
	if err != nil {
		return fmt.Errorf("打开文件失败: %v", err)
	}
	defer file.Close()

	bufferedReader := bufio.NewReaderSize(file, 32*1024)
	decoder := json.NewDecoder(bufferedReader)

	var total int64
	startTime := time.Now()

	// 创建管道用于批量操作
	pipeline := e.client.Pipeline()
	batchSize := 1000
	count := 0

	for {
		var data RedisData
		if err := decoder.Decode(&data); err != nil {
			if err.Error() == "EOF" {
				break
			}
			return fmt.Errorf("解析文件失败: %v", err)
		}

		// 添加到管道
		switch data.Type {
		case "string":
			pipeline.Set(ctx, data.Key, data.Value, data.TTL)
		case "hash":
			hashData := data.Value.(map[string]interface{})
			pipeline.HMSet(ctx, data.Key, hashData)
			// ... 其他类型处理 ...
		}

		if data.TTL > 0 {
			pipeline.Expire(ctx, data.Key, data.TTL)
		}

		count++
		total++

		// 批量执行
		if count >= batchSize {
			_, err := pipeline.Exec(ctx)
			if err != nil {
				return fmt.Errorf("批量导入失败: %v", err)
			}
			pipeline = e.client.Pipeline()
			count = 0
			fmt.Printf("已导入 %d 条数据\n", total)
		}
	}

	// 处理剩余的数据
	if count > 0 {
		_, err := pipeline.Exec(ctx)
		if err != nil {
			return fmt.Errorf("批量导入失败: %v", err)
		}
	}

	duration := time.Since(startTime)
	fmt.Printf("导入完成!共导入 %d 条数据,耗时:%v\n", total, duration)
	return nil
}

  • gen.go
package main

import (
	"log"

	"wgg-net/deep/starter/shell"
)

const (
	// 导出时的批次大小
	batchSize = 10


	RedisExportFile = "redis_backup.json"
	EsExportFile    = "es_backup.json"
)

func main() {
	// 创建导出器
	EsExportToFile()
	//EsImportFromFile()
	RedisExportToFile()
	//RedisImportFromFile()
}

// ===============================================
func EsExportToFile() {
	// 创建导出器
	eSExporter, err := shell.NewESExporter(
		[]string{"http://localhost:9200"},
		"mock_user_info", "root", "root",
	)
	if err != nil {
		panic(err)
	}

	// 执行导出,每批处理1000条数据
	err = eSExporter.ExportToFile(EsExportFile, batchSize)
	if err != nil {
		log.Fatalf("创建导出器失败: %v", err)
		panic(err)
	}

}

func EsImportFromFile() {

	// 2. 导入数据(可以导入到同一个或不同的索引)
	eSImporter, err := shell.NewESExporter(
		[]string{"http://localhost:9200"},
		"mock_user_info_test", "root", "root",
	)
	if err != nil {
		log.Fatal(err)
	}

	err = eSImporter.ImportFromFilePool(EsExportFile, batchSize)
	if err != nil {
		log.Fatal(err)
	}
}

//===============================================

func RedisExportToFile() {
	// 创建导出器
	redisHanddler, err := shell.NewRedisExporter(
		"localhost:6380",                 // Redis地址
		"KYUIAPErZz2Xb0f8XIQOKfuv7dYURu", // 密码
		10,                               // 数据库编号
	)
	if err != nil {
		log.Fatal(err)
	}

	// 导出数据
	err = redisHanddler.ExportToFile(RedisExportFile)
	if err != nil {
		log.Fatal(err)
	}

}

func RedisImportFromFile() {
	// 创建导出器
	redisHanddler, err := shell.NewRedisExporter(
		"localhost:6380",                 // Redis地址
		"KYUIAPErZz2Xb0f8XIQOKfuv7dYURu", // 密码
		18,
	)
	if err != nil {
		log.Fatal(err)
	}

	// 导入数据
	err = redisHanddler.ImportFromFile(RedisExportFile)
	if err != nil {
		log.Fatal(err)
	}
}

测试

 go run shell/gen/gen.go 
大数据

spark大数据平台搭建(spark大数据平台的基本构架)

2025-3-3 10:15:46

大数据

hive leg函数

2025-3-3 10:15:48

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧