raftexample的数据库是一个内存数据库,在启动时候进行数据恢复也就是写到内存中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
func (rc *raftNode) replayWAL() *wal.WAL { log.Printf("replaying WAL of member %d", rc.id) snapshot := rc.loadSnapshot() w := rc.openWAL(snapshot) _, st, ents, err := w.ReadAll() if err != nil { log.Fatalf("raftexample: failed to read WAL (%v)", err) } rc.raftStorage = raft.NewMemoryStorage() if snapshot != nil { rc.raftStorage.ApplySnapshot(*snapshot) } rc.raftStorage.SetHardState(st)
rc.raftStorage.Append(ents)
return w }
|
1 用snap快照恢复
在raft启动的过程中检测到有snap快照文件后,会尝试用快照文件恢复数据
1.1 首先检测有没有快照文件
很键盘,并不需要真的去找snap文件,间接看wal就行,没有wal目录就判定为没snap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { if wal.Exist(rc.waldir) { walSnaps, err := wal.ValidSnapshotEntries(rc.logger, rc.waldir) if err != nil { log.Fatalf("raftexample: error listing snapshots (%v)", err) } snapshot, err := rc.snapshotter.LoadNewestAvailable(walSnaps) if err != nil && !errors.Is(err, snap.ErrNoSnapshot) { log.Fatalf("raftexample: error loading snapshot (%v)", err) } return snapshot } return &raftpb.Snapshot{} }
|
1.2 定位用哪个快照
系统可能存在很多快照,理论上用最新的一个快照文件就行,这个地方做了比较有意思的防御性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) { return s.loadMatching(func(snapshot *raftpb.Snapshot) bool { m := snapshot.Metadata for i := len(walSnaps) - 1; i >= 0; i-- { if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index { return true } } return false }) }
|
1.3 用snap进行恢复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error { ms.Lock() defer ms.Unlock()
msIndex := ms.snapshot.Metadata.Index snapIndex := snap.Metadata.Index if msIndex >= snapIndex { return ErrSnapOutOfDate }
ms.snapshot = snap ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}} return nil }
|
2 wal文件
首先wal文件名命名是有设计的
1 2 3 4 5 6 7 8
|
func walName(seq, index uint64) string { return fmt.Sprintf("%016x-%016x.wal", seq, index) }
|
2.1 定位用哪些wal文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) { for i := len(names) - 1; i >= 0; i-- { name := names[i] _, curIndex, err := parseWALName(name) if err != nil { lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err)) } if index >= curIndex { return i, true } } return -1, false }
|
2.2 回放wal时候怎么丢弃重复记录
因为wal文件的设计已经保证了记录index的绝对有序,所以当发现内存中第一条记录的index比用来恢复的wal中第一条记录index大,就说明wal找多了,就直接丢弃,从头开始砍,砍到不在内存中的那条记录开始
1 2 3
| if first > entries[0].Index { entries = entries[first-entries[0].Index:] }
|