Database Sharding and Data Migration in Practice


Introduction

As a business grows, a single database instance often cannot sustain the pressure of high concurrency and large data volumes. Sharding (splitting databases and tables) is the common answer, but it also introduces real complexity. This article walks through the main sharding strategies and best practices for data migration.

1. Sharding Strategies

1.1 Vertical Partitioning

Vertical partitioning splits tables or databases onto separate instances along business-domain boundaries:

```
┌─────────────────┐
│   Original DB   │
├─────────────────┤
│   users         │
│   orders        │
│   products      │
│   logs          │
└─────────────────┘
          ↓  vertical split
┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│   User DB   │   │   Order DB  │   │  Product DB │
├─────────────┤   ├─────────────┤   ├─────────────┤
│   users     │   │   orders    │   │   products  │
└─────────────┘   └─────────────┘   └─────────────┘
```
```go
package sharding

import (
	"database/sql"
	"fmt"
)

// VerticalShardingManager holds one connection per business-domain database.
type VerticalShardingManager struct {
	shards map[string]*sql.DB
}

func NewVerticalShardingManager() *VerticalShardingManager {
	return &VerticalShardingManager{
		shards: make(map[string]*sql.DB),
	}
}

func (vsm *VerticalShardingManager) RegisterShard(name string, db *sql.DB) {
	vsm.shards[name] = db
}

func (vsm *VerticalShardingManager) GetShard(name string) (*sql.DB, error) {
	db, ok := vsm.shards[name]
	if !ok {
		return nil, fmt.Errorf("shard not found: %s", name)
	}
	return db, nil
}

// Each repository is bound to the single database that owns its domain.
type UserRepository struct {
	db *sql.DB
}

func NewUserRepository(db *sql.DB) *UserRepository {
	return &UserRepository{db: db}
}

type OrderRepository struct {
	db *sql.DB
}

func NewOrderRepository(db *sql.DB) *OrderRepository {
	return &OrderRepository{db: db}
}
```
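To see how the pieces fit together, here is a minimal wiring sketch; the DSNs, the MySQL driver, and the module import paths are placeholders, not part of the code above:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // driver choice is illustrative

	"example.com/project/sharding" // placeholder: your module's sharding package
)

func main() {
	// DSNs are placeholders; one physical database per business domain.
	userDB, err := sql.Open("mysql", "app:secret@tcp(user-db:3306)/users")
	if err != nil {
		log.Fatal(err)
	}
	orderDB, err := sql.Open("mysql", "app:secret@tcp(order-db:3306)/orders")
	if err != nil {
		log.Fatal(err)
	}

	vsm := sharding.NewVerticalShardingManager()
	vsm.RegisterShard("users", userDB)
	vsm.RegisterShard("orders", orderDB)

	// Each repository only ever sees the database that owns its domain.
	db, err := vsm.GetShard("users")
	if err != nil {
		log.Fatal(err)
	}
	userRepo := sharding.NewUserRepository(db)
	_ = userRepo
}
```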

1.2 Horizontal Partitioning

Horizontal partitioning distributes rows of the same table across multiple databases or tables according to a data-distribution strategy:

```go
package sharding

import (
	"fmt"
	"hash/fnv"
)

// ShardingStrategy maps a record to a shard key, and a shard key to a shard index.
type ShardingStrategy interface {
	GetShardKey(record interface{}) string
	GetShardIndex(shardKey string, shardCount int) int
}

// HashSharding spreads keys evenly via FNV-1a hashing.
type HashSharding struct {
	shardCount int
}

func NewHashSharding(shardCount int) *HashSharding {
	return &HashSharding{shardCount: shardCount}
}

func (h *HashSharding) GetShardKey(record interface{}) string {
	return fmt.Sprintf("%v", record)
}

func (h *HashSharding) GetShardIndex(shardKey string, shardCount int) int {
	hsh := fnv.New32a()
	hsh.Write([]byte(shardKey))
	// Modulo in uint32 space so the result is never negative.
	return int(hsh.Sum32() % uint32(shardCount))
}

// RangeSharding assigns keys to shards by ordered upper bounds;
// ranges[i] is the exclusive upper bound of shard i.
type RangeSharding struct {
	ranges []int64
}

func NewRangeSharding(ranges []int64) *RangeSharding {
	return &RangeSharding{ranges: ranges}
}

func (r *RangeSharding) GetShardKey(record interface{}) string {
	return fmt.Sprintf("%v", record)
}

func (r *RangeSharding) GetShardIndex(shardKey string, shardCount int) int {
	var key int64
	fmt.Sscanf(shardKey, "%d", &key)
	for i, bound := range r.ranges {
		if key < bound {
			return i
		}
	}
	return len(r.ranges) // keys beyond the last bound go to the final shard
}

type ShardConfig struct {
	Name string
	DSN  string
}

// ShardRouter resolves a shard key to a concrete shard via the strategy.
type ShardRouter struct {
	shards   []*ShardConfig
	strategy ShardingStrategy
}

func NewShardRouter(shards []ShardConfig, strategy ShardingStrategy) (*ShardRouter, error) {
	router := &ShardRouter{
		shards:   make([]*ShardConfig, len(shards)),
		strategy: strategy,
	}
	for i := range shards {
		// Take the address of the slice element, not of the loop variable.
		router.shards[i] = &shards[i]
	}
	return router, nil
}

func (sr *ShardRouter) GetShard(shardKey string) *ShardConfig {
	index := sr.strategy.GetShardIndex(shardKey, len(sr.shards))
	return sr.shards[index]
}

func (sr *ShardRouter) GetAllShards() []*ShardConfig {
	return sr.shards
}

func (sr *ShardRouter) GetShardByName(name string) *ShardConfig {
	for _, shard := range sr.shards {
		if shard.Name == name {
			return shard
		}
	}
	return nil
}
```
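A short usage sketch (shard names, DSNs, and the module path are made up): with hash routing, the same user ID always lands on the same shard across calls.

```go
package main

import (
	"fmt"

	"example.com/project/sharding" // placeholder module path
)

func main() {
	shards := []sharding.ShardConfig{
		{Name: "shard0", DSN: "app@tcp(db0:3306)/orders"},
		{Name: "shard1", DSN: "app@tcp(db1:3306)/orders"},
		{Name: "shard2", DSN: "app@tcp(db2:3306)/orders"},
	}

	router, err := sharding.NewShardRouter(shards, sharding.NewHashSharding(len(shards)))
	if err != nil {
		panic(err)
	}

	// Deterministic routing: repeated lookups for one key hit one shard.
	for _, userID := range []string{"1001", "1002", "1003"} {
		fmt.Printf("user %s -> %s\n", userID, router.GetShard(userID).Name)
	}
}
```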

2. Distributed ID Generation

2.1 Snowflake Algorithm Implementation

```go
package idgen

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	ErrTimeBackwards = errors.New("time has gone backwards")
	ErrIDOverflow    = errors.New("ID overflow")
)

// Snowflake packs a 64-bit ID as:
// timestamp (ms since epoch) | node ID (10 bits) | sequence (12 bits).
type Snowflake struct {
	mu             sync.Mutex
	lastTimestamp  int64
	sequence       int64
	epoch          int64
	nodeID         int64
	nodeIDBits     int64
	sequenceBits   int64
	nodeIDShift    int64
	timestampShift int64
	sequenceMask   int64
	maxNodeID      int64
}

func NewSnowflake(nodeID int64, epoch int64) (*Snowflake, error) {
	nodeIDBits := int64(10)
	sequenceBits := int64(12)
	maxNodeID := int64(-1) ^ (int64(-1) << nodeIDBits)
	if nodeID < 0 || nodeID > maxNodeID {
		return nil, fmt.Errorf("nodeID must be between 0 and %d", maxNodeID)
	}
	sequenceMask := int64(-1) ^ (int64(-1) << sequenceBits)
	return &Snowflake{
		epoch:          epoch,
		nodeID:         nodeID,
		nodeIDBits:     nodeIDBits,
		sequenceBits:   sequenceBits,
		nodeIDShift:    sequenceBits,
		timestampShift: sequenceBits + nodeIDBits,
		sequenceMask:   sequenceMask,
		maxNodeID:      maxNodeID,
		lastTimestamp:  -1,
		sequence:       0,
	}, nil
}

func (s *Snowflake) Generate() (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	timestamp := time.Now().UnixMilli() - s.epoch
	if timestamp < s.lastTimestamp {
		return 0, ErrTimeBackwards
	}
	if timestamp == s.lastTimestamp {
		// Same millisecond: bump the sequence; on overflow, spin to the next ms.
		s.sequence = (s.sequence + 1) & s.sequenceMask
		if s.sequence == 0 {
			timestamp = s.waitNextMillis(timestamp)
		}
	} else {
		s.sequence = 0
	}
	s.lastTimestamp = timestamp

	id := (timestamp << s.timestampShift) | (s.nodeID << s.nodeIDShift) | s.sequence
	if id < 0 {
		return 0, ErrIDOverflow
	}
	return id, nil
}

func (s *Snowflake) waitNextMillis(currentTimestamp int64) int64 {
	for currentTimestamp == s.lastTimestamp {
		currentTimestamp = time.Now().UnixMilli() - s.epoch
	}
	return currentTimestamp
}

// Parse decomposes an ID back into its timestamp, node ID, and sequence fields.
func (s *Snowflake) Parse(id int64) (timestamp int64, nodeID int64, sequence int64) {
	timestamp = (id >> s.timestampShift) + s.epoch
	nodeID = (id >> s.nodeIDShift) & s.maxNodeID
	sequence = id & s.sequenceMask
	return
}
```
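A quick usage sketch, assuming an arbitrary fixed epoch (in practice, pick a date before your first deployment and never change it): generate one ID, then decompose it back into its fields.

```go
package main

import (
	"fmt"
	"time"

	"example.com/project/idgen" // placeholder module path
)

func main() {
	// Epoch is illustrative; it must stay constant for the life of the system.
	epoch := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()

	sf, err := idgen.NewSnowflake(1, epoch)
	if err != nil {
		panic(err)
	}

	id, err := sf.Generate()
	if err != nil {
		panic(err)
	}

	ts, nodeID, seq := sf.Parse(id)
	fmt.Printf("id=%d time=%s node=%d seq=%d\n",
		id, time.UnixMilli(ts).UTC(), nodeID, seq)
}
```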

2.2 Batch ID Generation

```go
package idgen

import (
	"context"
	"sync"
	"time"
)

// IDBatchGenerator pre-fills a buffered channel with IDs so callers can
// fetch them in batches without hitting the Snowflake mutex per ID.
type IDBatchGenerator struct {
	snowflake *Snowflake
	buffer    chan int64
	batchSize int
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
}

func NewIDBatchGenerator(nodeID int64, epoch int64, batchSize int) (*IDBatchGenerator, error) {
	sf, err := NewSnowflake(nodeID, epoch)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &IDBatchGenerator{
		snowflake: sf,
		buffer:    make(chan int64, batchSize*2),
		batchSize: batchSize,
		ctx:       ctx,
		cancel:    cancel,
	}, nil
}

func (g *IDBatchGenerator) Start() {
	g.wg.Add(1)
	go g.generateLoop()
}

// generateLoop periodically tops the buffer back up to batchSize.
func (g *IDBatchGenerator) generateLoop() {
	defer g.wg.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-g.ctx.Done():
			return
		case <-ticker.C:
			if len(g.buffer) < g.batchSize {
				for i := 0; i < g.batchSize; i++ {
					id, err := g.snowflake.Generate()
					if err != nil {
						continue
					}
					select {
					case g.buffer <- id:
					default: // buffer full, go back to waiting on the ticker
					}
				}
			}
		}
	}
}

// GenerateBatch drains the buffer first and falls back to direct generation.
func (g *IDBatchGenerator) GenerateBatch(ctx context.Context, count int) ([]int64, error) {
	ids := make([]int64, 0, count)
	for len(ids) < count {
		select {
		case <-ctx.Done():
			return ids, ctx.Err()
		case id := <-g.buffer:
			ids = append(ids, id)
		default:
			id, err := g.snowflake.Generate()
			if err != nil {
				continue
			}
			ids = append(ids, id)
		}
	}
	return ids, nil
}

func (g *IDBatchGenerator) Stop() {
	g.cancel()
	g.wg.Wait()
	close(g.buffer)
}
```
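Usage might look like the following sketch (node ID, epoch, and sizes are illustrative); the batch call drains the buffer first and only generates directly when the buffer runs dry.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"example.com/project/idgen" // placeholder module path
)

func main() {
	epoch := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()

	gen, err := idgen.NewIDBatchGenerator(1, epoch, 1024)
	if err != nil {
		panic(err)
	}
	gen.Start()
	defer gen.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Pre-fetch 100 IDs in one call, e.g. for a bulk insert.
	ids, err := gen.GenerateBatch(ctx, 100)
	if err != nil {
		fmt.Println("partial batch:", err)
	}
	if len(ids) > 0 {
		fmt.Printf("got %d ids, first: %d\n", len(ids), ids[0])
	}
}
```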

3. Data Migration Strategies

3.1 Dual-Write Migration

```go
package migration

import (
	"context"
	"fmt"
	"sync"
)

// WriteMode controls which database(s) receive writes during migration.
type WriteMode int

const (
	WriteModeSource WriteMode = iota // source only (pre-migration)
	WriteModeTarget                  // target only (post-cutover)
	WriteModeBoth                    // both (transition phase)
)

type DualWriter struct {
	sourceDB *DB
	targetDB *DB
	mode     WriteMode
	mu       sync.RWMutex
}

func NewDualWriter(source, target *DB, mode WriteMode) *DualWriter {
	return &DualWriter{
		sourceDB: source,
		targetDB: target,
		mode:     mode,
	}
}

func (dw *DualWriter) SetMode(mode WriteMode) {
	dw.mu.Lock()
	defer dw.mu.Unlock()
	dw.mode = mode
}

func (dw *DualWriter) GetMode() WriteMode {
	dw.mu.RLock()
	defer dw.mu.RUnlock()
	return dw.mode
}

func (dw *DualWriter) Insert(ctx context.Context, table string, data map[string]interface{}) error {
	dw.mu.RLock()
	mode := dw.mode
	dw.mu.RUnlock()

	switch mode {
	case WriteModeSource:
		return dw.sourceDB.Insert(ctx, table, data)
	case WriteModeTarget:
		return dw.targetDB.Insert(ctx, table, data)
	case WriteModeBoth:
		// The source stays the system of record: fail fast on it,
		// then mirror to the target.
		if err := dw.sourceDB.Insert(ctx, table, data); err != nil {
			return err
		}
		return dw.targetDB.Insert(ctx, table, data)
	}
	return nil
}

func (dw *DualWriter) Update(ctx context.Context, table string, id interface{}, data map[string]interface{}) error {
	dw.mu.RLock()
	mode := dw.mode
	dw.mu.RUnlock()

	switch mode {
	case WriteModeSource:
		return dw.sourceDB.Update(ctx, table, id, data)
	case WriteModeTarget:
		return dw.targetDB.Update(ctx, table, id, data)
	case WriteModeBoth:
		if err := dw.sourceDB.Update(ctx, table, id, data); err != nil {
			return err
		}
		return dw.targetDB.Update(ctx, table, id, data)
	}
	return nil
}

func (dw *DualWriter) Delete(ctx context.Context, table string, id interface{}) error {
	dw.mu.RLock()
	mode := dw.mode
	dw.mu.RUnlock()

	switch mode {
	case WriteModeSource:
		return dw.sourceDB.Delete(ctx, table, id)
	case WriteModeTarget:
		return dw.targetDB.Delete(ctx, table, id)
	case WriteModeBoth:
		if err := dw.sourceDB.Delete(ctx, table, id); err != nil {
			return err
		}
		return dw.targetDB.Delete(ctx, table, id)
	}
	return nil
}

// DB is a stand-in for a real database handle; its methods are stubs.
type DB struct {
	Name string
}

func (db *DB) Insert(ctx context.Context, table string, data map[string]interface{}) error {
	return fmt.Errorf("not implemented")
}

func (db *DB) Update(ctx context.Context, table string, id interface{}, data map[string]interface{}) error {
	return fmt.Errorf("not implemented")
}

func (db *DB) Delete(ctx context.Context, table string, id interface{}) error {
	return fmt.Errorf("not implemented")
}
```
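What makes dual writing useful is the staged cutover it enables. A sketch of a typical three-phase rollout using the modes above; when to flip each phase is a judgment call driven by backfill and verification progress, not something the code decides for you:

```go
package main

import "example.com/project/migration" // placeholder module path

func main() {
	source := &migration.DB{Name: "legacy"}
	target := &migration.DB{Name: "sharded"}

	// Phase 1: all writes go to the source while historical data is backfilled.
	dw := migration.NewDualWriter(source, target, migration.WriteModeSource)

	// Phase 2: dual writes keep the target in sync with new traffic
	// while the backfill catches up.
	dw.SetMode(migration.WriteModeBoth)

	// Phase 3: once consistency verification passes, cut over entirely.
	dw.SetMode(migration.WriteModeTarget)
}
```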

3.2 Data Synchronization Tool

```go
package migration

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type DataSync struct {
	sourceDB    *DB
	targetDB    *DB
	batchSize   int
	parallelism int
	progress    *SyncProgress
}

type SyncProgress struct {
	mu            sync.RWMutex
	TotalRecords  int64 // set by the caller before syncing so percent/ETA are meaningful
	SyncedRecords int64
	FailedRecords int64
	StartTime     time.Time
	Errors        []SyncError
}

type SyncError struct {
	RecordID interface{}
	Error    error
	Time     time.Time
}

func NewDataSync(source, target *DB, batchSize, parallelism int) *DataSync {
	return &DataSync{
		sourceDB:    source,
		targetDB:    target,
		batchSize:   batchSize,
		parallelism: parallelism,
		progress: &SyncProgress{
			StartTime: time.Now(),
			Errors:    make([]SyncError, 0),
		},
	}
}

// GetProgress returns the shared progress object; read it through its
// locking accessors (GetProgressPercent, GetETA) rather than its fields.
func (ds *DataSync) GetProgress() *SyncProgress {
	return ds.progress
}

// SyncTable copies a table in ID-ordered batches, retrying a failed batch
// row by row so a single bad record does not block the rest.
func (ds *DataSync) SyncTable(ctx context.Context, table string, whereClause string) error {
	var lastID interface{}
	batchNum := 0

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		records, err := ds.sourceDB.FetchBatch(ctx, table, whereClause, lastID, ds.batchSize)
		if err != nil {
			return fmt.Errorf("failed to fetch batch %d: %w", batchNum, err)
		}
		if len(records) == 0 {
			break
		}

		if err := ds.targetDB.InsertBatch(ctx, table, records); err != nil {
			for _, record := range records {
				if err := ds.targetDB.Insert(ctx, table, record); err != nil {
					ds.recordError(record["id"], err)
				}
			}
		}

		lastID = records[len(records)-1]["id"]

		ds.progress.mu.Lock()
		ds.progress.SyncedRecords += int64(len(records))
		ds.progress.mu.Unlock()

		batchNum++
		if len(records) < ds.batchSize {
			break
		}
	}
	return nil
}

func (ds *DataSync) recordError(recordID interface{}, err error) {
	ds.progress.mu.Lock()
	defer ds.progress.mu.Unlock()
	ds.progress.FailedRecords++
	ds.progress.Errors = append(ds.progress.Errors, SyncError{
		RecordID: recordID,
		Error:    err,
		Time:     time.Now(),
	})
	// Keep only the 100 most recent errors.
	if len(ds.progress.Errors) > 100 {
		ds.progress.Errors = ds.progress.Errors[1:]
	}
}

func (p *SyncProgress) GetProgressPercent() float64 {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.TotalRecords == 0 {
		return 0
	}
	return float64(p.SyncedRecords) / float64(p.TotalRecords) * 100
}

func (p *SyncProgress) GetETA() time.Duration {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.SyncedRecords == 0 {
		return 0
	}
	elapsed := time.Since(p.StartTime)
	rate := float64(p.SyncedRecords) / elapsed.Seconds()
	remaining := float64(p.TotalRecords - p.SyncedRecords)
	return time.Duration(remaining/rate) * time.Second
}

// Additional DB stubs used by the sync tool.
func (db *DB) FetchBatch(ctx context.Context, table, whereClause string, afterID interface{}, limit int) ([]map[string]interface{}, error) {
	return nil, fmt.Errorf("not implemented")
}

func (db *DB) InsertBatch(ctx context.Context, table string, records []map[string]interface{}) error {
	return fmt.Errorf("not implemented")
}
```
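A usage sketch, assuming the DB stubs above are backed by a real implementation; the caller seeds TotalRecords (e.g. from a COUNT(*) on the source) so percent and ETA mean something, and polls progress from a side goroutine:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"example.com/project/migration" // placeholder module path
)

func main() {
	source := &migration.DB{Name: "legacy"}
	target := &migration.DB{Name: "sharded"}

	// 1000-row batches, single worker; tune both for your workload.
	ds := migration.NewDataSync(source, target, 1000, 1)

	// Illustrative count; in practice fetch it from the source database.
	ds.GetProgress().TotalRecords = 1_000_000

	go func() {
		for range time.Tick(5 * time.Second) {
			p := ds.GetProgress()
			fmt.Printf("progress: %.1f%%, ETA %s\n", p.GetProgressPercent(), p.GetETA())
		}
	}()

	if err := ds.SyncTable(context.Background(), "orders", "created_at < '2026-01-01'"); err != nil {
		fmt.Println("sync failed:", err)
	}
}
```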

3.3 Consistency Verification

```go
package migration

import (
	"context"
	"fmt"
	"reflect"
)

type DataVerifier struct {
	sourceDB *DB
	targetDB *DB
}

func NewDataVerifier(source, target *DB) *DataVerifier {
	return &DataVerifier{
		sourceDB: source,
		targetDB: target,
	}
}

type VerificationResult struct {
	Table        string
	Status       VerificationStatus
	SourceCount  int64
	TargetCount  int64
	MatchedRows  int64
	MissingRows  int64
	MismatchRows int64
	Errors       []error
}

type VerificationStatus string

const (
	StatusPassed  VerificationStatus = "passed"
	StatusFailed  VerificationStatus = "failed"
	StatusPartial VerificationStatus = "partial"
)

// VerifyTable compares row counts first, then row contents.
func (v *DataVerifier) VerifyTable(ctx context.Context, table string, primaryKey string) (*VerificationResult, error) {
	result := &VerificationResult{
		Table:  table,
		Status: StatusPassed,
		Errors: make([]error, 0),
	}

	sourceCount, err := v.sourceDB.Count(ctx, table)
	if err != nil {
		return nil, fmt.Errorf("failed to count source: %w", err)
	}
	result.SourceCount = sourceCount

	targetCount, err := v.targetDB.Count(ctx, table)
	if err != nil {
		return nil, fmt.Errorf("failed to count target: %w", err)
	}
	result.TargetCount = targetCount

	if sourceCount != targetCount {
		result.Status = StatusFailed
		result.MissingRows = sourceCount - targetCount
	}

	matched, err := v.verifyDataMatch(ctx, table, primaryKey)
	if err != nil {
		result.Errors = append(result.Errors, err)
	}
	result.MatchedRows = matched
	if result.MatchedRows != sourceCount {
		result.Status = StatusFailed
		result.MismatchRows = sourceCount - result.MatchedRows
	}
	return result, nil
}

// verifyDataMatch walks every source row and looks it up in the target.
// Note: FetchAll loads the whole table; verify large tables in batches instead.
func (v *DataVerifier) verifyDataMatch(ctx context.Context, table, primaryKey string) (int64, error) {
	sourceRows, err := v.sourceDB.FetchAll(ctx, table)
	if err != nil {
		return 0, err
	}
	var matched int64
	for _, row := range sourceRows {
		targetRow, err := v.targetDB.FetchByID(ctx, table, row[primaryKey])
		if err != nil {
			continue
		}
		if v.rowsMatch(row, targetRow) {
			matched++
		}
	}
	return matched, nil
}

func (v *DataVerifier) rowsMatch(source, target map[string]interface{}) bool {
	if len(source) != len(target) {
		return false
	}
	for key, sourceVal := range source {
		targetVal, ok := target[key]
		if !ok {
			return false
		}
		// DeepEqual also handles non-comparable values (slices, maps)
		// that would make a plain != comparison panic.
		if !reflect.DeepEqual(sourceVal, targetVal) {
			return false
		}
	}
	return true
}

// QuickCheck samples rows from the source and confirms each exists in the target.
func (v *DataVerifier) QuickCheck(ctx context.Context, table string, sampleSize int) (bool, error) {
	sourceSample, err := v.sourceDB.FetchSample(ctx, table, sampleSize)
	if err != nil {
		return false, err
	}
	for _, row := range sourceSample {
		exists, err := v.targetDB.Exists(ctx, table, row)
		if err != nil {
			return false, err
		}
		if !exists {
			return false, nil
		}
	}
	return true, nil
}

// Additional DB stubs used by the verifier.
func (db *DB) Count(ctx context.Context, table string) (int64, error) {
	return 0, fmt.Errorf("not implemented")
}

func (db *DB) FetchAll(ctx context.Context, table string) ([]map[string]interface{}, error) {
	return nil, fmt.Errorf("not implemented")
}

func (db *DB) FetchByID(ctx context.Context, table string, id interface{}) (map[string]interface{}, error) {
	return nil, fmt.Errorf("not implemented")
}

func (db *DB) FetchSample(ctx context.Context, table string, n int) ([]map[string]interface{}, error) {
	return nil, fmt.Errorf("not implemented")
}

func (db *DB) Exists(ctx context.Context, table string, row map[string]interface{}) (bool, error) {
	return false, fmt.Errorf("not implemented")
}
```
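A usage sketch of the two-step flow, cheap sampling first and the full row-by-row pass before cutover (table name and sample size are illustrative):

```go
package main

import (
	"context"
	"fmt"

	"example.com/project/migration" // placeholder module path
)

func main() {
	source := &migration.DB{Name: "legacy"}
	target := &migration.DB{Name: "sharded"}
	v := migration.NewDataVerifier(source, target)
	ctx := context.Background()

	// Cheap spot check: sample rows and confirm they exist in the target.
	ok, err := v.QuickCheck(ctx, "orders", 500)
	if err != nil || !ok {
		fmt.Println("quick check failed, running full verification")
	}

	// Full verification before cutting traffic over.
	result, err := v.VerifyTable(ctx, "orders", "id")
	if err != nil {
		panic(err)
	}
	fmt.Printf("status=%s matched=%d missing=%d mismatched=%d\n",
		result.Status, result.MatchedRows, result.MissingRows, result.MismatchRows)
}
```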

4. Routing Middleware Implementation

4.1 Shard Routing Middleware

```go
package middleware

import (
	"context"
	"fmt"
	"strings"

	"example.com/project/sharding" // placeholder path: the sharding package from section 1.2
)

type ShardRouterMiddleware struct {
	router *sharding.ShardRouter
}

func NewShardRouterMiddleware(router *sharding.ShardRouter) *ShardRouterMiddleware {
	return &ShardRouterMiddleware{router: router}
}

// Use an unexported key type so other packages cannot collide with our
// context values.
type contextKey string

const shardContextKey contextKey = "shard"

// SelectShard resolves the shard for a table/key pair and stores it in the context.
func (m *ShardRouterMiddleware) SelectShard(ctx context.Context, table string, shardKey interface{}) (context.Context, error) {
	key := fmt.Sprintf("%s:%v", table, shardKey)
	shard := m.router.GetShard(key)
	if shard == nil {
		return nil, fmt.Errorf("no shard found for key: %s", key)
	}
	return context.WithValue(ctx, shardContextKey, shard), nil
}

func (m *ShardRouterMiddleware) ExtractShard(ctx context.Context) (*sharding.ShardConfig, error) {
	shard, ok := ctx.Value(shardContextKey).(*sharding.ShardConfig)
	if !ok {
		return nil, fmt.Errorf("shard not found in context")
	}
	return shard, nil
}

type QueryTemplate struct {
	SQL         string
	ShardKey    string
	TargetTable string
}

// BuildShardQuery rewrites the {{table}} / {{shard_table}} placeholders
// for the shard the key routes to.
func (m *ShardRouterMiddleware) BuildShardQuery(template *QueryTemplate, shardKey interface{}) (string, error) {
	shard := m.router.GetShard(fmt.Sprintf("%v", shardKey))
	if shard == nil {
		return "", fmt.Errorf("no shard found for key: %v", shardKey)
	}
	query := strings.Replace(template.SQL, "{{table}}", template.TargetTable, 1)
	query = strings.Replace(query, "{{shard_table}}", fmt.Sprintf("%s_%s", template.TargetTable, shard.Name), 1)
	return query, nil
}
```
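A usage sketch of the template rewriting (shard names, the query, and module paths are made up):

```go
package main

import (
	"fmt"

	"example.com/project/middleware" // placeholder module paths
	"example.com/project/sharding"
)

func main() {
	shards := []sharding.ShardConfig{{Name: "0"}, {Name: "1"}, {Name: "2"}}
	router, err := sharding.NewShardRouter(shards, sharding.NewHashSharding(len(shards)))
	if err != nil {
		panic(err)
	}

	m := middleware.NewShardRouterMiddleware(router)

	tpl := &middleware.QueryTemplate{
		SQL:         "SELECT * FROM {{shard_table}} WHERE user_id = ?",
		TargetTable: "orders",
	}

	// The key hashes to a shard and the table name is rewritten, producing
	// something like "SELECT * FROM orders_1 WHERE user_id = ?".
	query, err := m.BuildShardQuery(tpl, 1001)
	if err != nil {
		panic(err)
	}
	fmt.Println(query)
}
```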

5. Summary

Sharding is an effective way to cope with large data volumes and high concurrency:

  1. Choosing a sharding strategy

    • Hash sharding: even data distribution, well suited to random access
    • Range sharding: suits time-ordered access, but can create hot spots
    • Directory sharding: flexible, but adds an extra lookup (see the sketch after this list)
  2. ID generation schemes

    • Snowflake: high performance, roughly time-ordered
    • UUID: fully independent, but unordered
    • Database auto-increment: simple, but a bottleneck in distributed deployments
  3. Data migration workflow

    • Dual writes: smooth business cutover, but data consistency must be managed
    • Incremental sync: use CDC or log-based replication
    • Consistency verification: confirm the data is complete
  4. Caveats

    • Cross-shard queries are expensive; avoid them where possible
    • Transaction boundaries need to be redesigned
    • Index maintenance becomes more complex
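Directory sharding is listed above but not implemented in section 1.2; here is a minimal sketch that satisfies the same ShardingStrategy interface, assuming the key-to-shard map fits in memory (production systems usually keep it in a replicated metadata store, and this version is not goroutine-safe):

```go
package sharding

import "fmt"

// DirectorySharding resolves shards through an explicit key -> shard mapping.
// Keys can be moved individually (e.g. to drain a hot shard) at the cost of
// an extra lookup on every request.
type DirectorySharding struct {
	directory map[string]int // shard key -> shard index
	fallback  int            // shard for keys not yet in the directory
}

func NewDirectorySharding(fallback int) *DirectorySharding {
	return &DirectorySharding{directory: make(map[string]int), fallback: fallback}
}

// Assign pins a key to a shard, e.g. when rebalancing a hot tenant.
func (d *DirectorySharding) Assign(shardKey string, shardIndex int) {
	d.directory[shardKey] = shardIndex
}

func (d *DirectorySharding) GetShardKey(record interface{}) string {
	return fmt.Sprintf("%v", record)
}

func (d *DirectorySharding) GetShardIndex(shardKey string, shardCount int) int {
	if idx, ok := d.directory[shardKey]; ok && idx < shardCount {
		return idx
	}
	return d.fallback
}
```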

A well-designed sharding scheme can substantially increase a system's capacity and performance.
