NSQ 磁盘持久队列 DiskQueue 设计思想全解析

2025/11/27 实践总结最佳

—— 为什么 NSQ 的 DiskQueue 不用锁也能做到高性能、强一致?

NSQ 的 diskqueue (opens new window) 是一个极其优雅的磁盘持久化 FIFO 队列实现。它的代码不多,却藏着许多工程上的巧思:

  • 单调度协程保证线程安全
  • 记录格式简洁高效(长度+数据)
  • 元数据持久化设计稳健
  • 文件滚动策略减少 IO 并保持性能

本文从设计思想切入,结合源码逐块解析 diskqueue 是如何实现一个可持久、线程安全、性能稳定的磁盘队列。


# 1. 单协程调度(ioLoop)保证线程安全:无需加锁

DiskQueue 的最巧妙设计之一,就是所有写入、读取、同步、文件滚动等操作都在 一个 ioLoop 协程中完成

即使 diskQueue 结构体上存在 RWMutex,但核心变量(read/write position、file num、depth)都只在 ioLoop 中修改,因此可以不用锁,天然线程安全

创建队列时就启动这个协程:

// diskqueue.go: New()
go d.ioLoop()
1
2

来看 ioLoop 的结构:

// diskqueue.go: ioLoop()
func (d *diskQueue) ioLoop() {
    for {
        select {
        case data := <-d.writeChan:
            err := d.writeOne(data)
            d.writeResponseChan <- err
        
        case <-d.emptyChan:
            err := d.deleteAllFiles()
            d.emptyResponseChan <- err

        case d.depthChan <- d.depth:
            // return depth

        // 其他分支:退出、同步等...
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# ✨ 设计亮点

  • 所有请求都通过 channel 串行进入 ioLoop 生产者调用 Put() → writeChan → ioLoop → writeOne()
  • ioLoop 不需要任何互斥锁 因为写入操作永远在单协程执行,不存在并发修改。
  • 读通道 readChan 也在循环内安全发送

这与 Redis 的单线程模型有异曲同工之妙:串行化换线程安全,避免锁开销


# 2. 内容数据按(长度+数据)格式写入磁盘

NSQ 为每条记录写入:

4字节长度(int32 大端序) + N 字节消息内容
1

# 写入逻辑在 writeOne()

// diskqueue.go: writeOne()
dataLen := int32(len(data))
totalBytes := int64(4 + dataLen)

// 写长度
binary.Write(&d.writeBuf, binary.BigEndian, dataLen)

// 写内容
d.writeBuf.Write(data)

// 一次性写入文件
_, err = d.writeFile.Write(d.writeBuf.Bytes())
1
2
3
4
5
6
7
8
9
10
11
12

# 为什么采用“长度 + 内容”?

  • 读取时无需扫描分隔符,直接按长度读取 → 高性能顺序读
  • 能快速跳过损坏数据(长度检测) → 增强鲁棒性
  • 二进制格式紧凑 → 磁盘占用小

# 文件满了会自动滚动

writePos + totalBytes > maxBytesPerFile

if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
    d.writeFileNum++
    d.writePos = 0
    d.sync()
    d.writeFile.Close()
}
1
2
3
4
5
6

即:当前文件满 → fsync → 开新文件继续写


# 3. 元数据持久化:周期性记录读写进度

为了保证崩溃后仍能从正确位置恢复,DiskQueue 会定期将读写指针和 depth 写入 metadata 文件。

元数据内容示例:

depth
readFileNum,readPos
writeFileNum,writePos
1
2
3

对应的写逻辑在:

// diskqueue.go: persistMetaData()
fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
    d.depth,
    d.readFileNum, d.readPos,
    d.writeFileNum, d.writePos)
1
2
3
4
5

采用安全的 tmp file + rename 原子替换,保证文件内部的数据是完整:

tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
os.Rename(tmpFileName, fileName)  // 原子提交
1
2

这样即使在持久化过程中崩溃也不会破坏原有 metadata。

# 什么时候会同步?

两种模式:

  1. 每写 X 条消息(通过 syncEvery 配置)
  2. 定时 syncTimeout 触发 fsync

触发逻辑在 ioLoop 中处理:

if d.needSync && (writes%syncEvery == 0 || time.Since(lastSync) > syncTimeout) {
    d.sync()
}
1
2
3

# 4. 读取记录(一次取一条,同样采用“长度+数据”格式)

读取逻辑在 readOne()

var msgSize int32
binary.Read(d.reader, binary.BigEndian, &msgSize)

readBuf := make([]byte, msgSize)
io.ReadFull(d.reader, readBuf)
1
2
3
4
5

并计算下一次读取的位置:

totalBytes := int64(4 + msgSize)
d.nextReadPos = d.readPos + totalBytes
1
2

# 文件滚动

读取到文件末尾时:

if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
    d.nextReadFileNum++
    d.nextReadPos = 0
}
1
2
3
4

DiskQueue 会自动切换到下一个文件。


# 总结:DiskQueue 以极小的代码实现极稳健的持久化队列

设计点 带来的价值
ioLoop 单协程串行调度 无锁、线程安全、高性能
长度+数据 格式 顺序读写最高效、格式安全
文件滚动策略 提高磁盘局部性,避免巨型文件
metadata 原子持久化 崩溃可恢复,不产生部分写入

NSQ 的 DiskQueue 是工程中小而美、高可靠、高性能磁盘队列实现的典范,非常值得学习。

如果你正在实现:

  • 本地消息队列
  • 任务执行器的持久化存储
  • 分布式系统的持久化 mailbox
  • WAL / 顺序写日志

DiskQueue 都是一个极好的参考。

nsq的diskqueue源码:https://github.com/nsqio/go-diskqueue/blob/master/diskqueue.go