1. 首页 > 数据库技术

从零实现一个时序数据库

从零实现一个时序数据库

时序数据库(TSDB: Time Series Database)大多数时候都是为了满足监控场景的需求,这里先介绍两个概念:

  • 数据点(Point): 时序数据的数据点是一个包含 (Timestamp:int64, Value:float64) 的二元组。
  • 时间线(Series): 不同标签(Label)的组合称为不同的时间线,如
  1. series1: {"__name__""netspeed""host""localhost""iface""eth0"
  2. series2: {"__name__""netspeed""host""localhost""iface""eth1"

Prometheus, InfluxDB, M3, TimescaleDB 都是时下流行的 TSDB。时序数据的压缩算法很大程度上决定了 TSDB 的性能,以上几个项目的实现都参考了 Fackbook 2015 年发表的论文《Gorilla: A fast, scalable, in-memory time series database》(http://www.vldb.org/pvldb/vol8/p1816-teller.pdf) 中提到的差值算法,该算法平均可以将 16 字节的数据点压缩成 1.37 字节。

Who's mando?

  • Din Djarin, also known as "the Mandalorian" or simply "Mando," was a human male Mandalorian who worked as a famous bounty hunter during the New Republic Era.

从零实现一个时序数据库

What's mandodb?

mandodb(https://github.com/chenjiandongx/mandodb) 是我在学习过程中实现的一个最小化的 TSDB,从概念上来讲它还算不上是一个完整的 TSDB,因为它:

  • 没有实现自己的查询引擎(实现难度大)
  • 缺少磁盘归档文件 Compact 操作(有空的话会实现)
  • 没有 WAL 作为灾备保证高可用(心情好的话会实现)

mandodb 主要受到了两个项目的启发。本项目仅限于学习用途,未经生产环境测试验证!

  • nakabonne/tstorage
  • prometheus/prometheus

prometheus 的核心开发者 Fabian Reinartz 写了一篇文章 《Writing a Time Series Database from Scratch》(https://fabxc.org/tsdb/) 来介绍 prometheus TSDB 的演变过程,非常值得一读,强烈推荐。

 数据模型 & API 文档

 

数据模型定义

  1. // Point 表示一个数据点 (ts, value) 二元组 
  2. type Point struct { 
  3.  Ts    int64 // in seconds 
  4.  Value float64 
  5.  
  6. // Label 代表一个标签组合 
  7. type Label struct { 
  8.  Name  string 
  9.  Value string 
  10.  
  11. // Row 一行时序数据 包括数据点和标签组合 
  12. type Row struct { 
  13.  Metric string 
  14.  Labels LabelSet 
  15.  Point  Point 
  16.  
  17. // LabelSet 表示 Label 组合 
  18. type LabelSet []Label 
  19.  
  20. // LabelMatcher Label 匹配器 支持正则 
  21. type LabelMatcher struct { 
  22.  Name   string 
  23.  Value  string 
  24.  IsRegx bool 
  25.  
  26. // LabelMatcherSet 表示 LabelMatcher 组合 
  27. type LabelMatcherSet []LabelMatcher 

API

  1. // InsertRows 写数据 
  2. InsertRows(rows []*Row) error  
  3.  
  4. // QueryRange 查询时序数据点 
  5. QueryRange(metric string, lms LabelMatcherSet, start, end int64) ([]MetricRet, error) 
  6.  
  7. // QuerySeries 查询时序序列组合 
  8. QuerySeries(lms LabelMatcherSet, start, end int64) ([]map[string]string, error) 
  9.  
  10. // QueryLabelValues 查询标签值 
  11. QueryLabelValues(label string, start, end int64) []string 

 配置选项

 

配置项在初始化 TSDB 的时候设置。

  1. // WithMetaSerializerType 设置 Metadata 数据的序列化类型 
  2. // 目前只提供了 BinaryMetaSerializer 
  3. WithMetaSerializerType(t MetaSerializerType) Option  
  4.  
  5. // WithMetaBytesCompressorType 设置字节数据的压缩算法 
  6. // 目前提供了 
  7. // * 不压缩: NoopBytesCompressor(默认) 
  8. // * ZSTD: ZstdBytesCompressor 
  9. // * Snappy: SnappyBytesCompressor 
  10. WithMetaBytesCompressorType(t BytesCompressorType) Option 
  11.  
  12. // WithOnlyMemoryMode 设置是否默认只存储在内存中 
  13. // 默认为 false 
  14. WithOnlyMemoryMode(memoryMode bool) Option 
  15.  
  16. // WithEnabledOutdated 设置是否支持乱序写入 此特性会增加资源开销 但会提升数据完整性 
  17. // 默认为 true 
  18. WithEnabledOutdated(outdated bool) Option 
  19.  
  20. // WithMaxRowsPerSegment 设置单 Segment 最大允许存储的点数 
  21. // 默认为 19960412(夹杂私货 ) 
  22. WithMaxRowsPerSegment(n int64) Option 
  23.  
  24. // WithDataPath 设置 Segment 持久化存储文件夹 
  25. // 默认为 "." 
  26. WithDataPath(d string) Option 
  27.  
  28. // WithRetention 设置 Segment 持久化数据保存时长 
  29. // 默认为 7d 
  30. WithRetention(t time.Duration) Option 
  31.  
  32. // WithWriteTimeout 设置写入超时阈值 
  33. // 默认为 30s 
  34. WithWriteTimeout(t time.Duration) Option 
  35.  
  36. // WithLoggerConfig 设置日志配置项 
  37. // logger: github.com/chenjiandongx/logger 
  38. WithLoggerConfig(opt *logger.Options) Option 

 用法示例

 

  1. package main 
  2.  
  3. import ( 
  4.  "fmt" 
  5.  "time" 
  6.  
  7.  "github.com/chenjiandongx/mandodb" 
  8.  
  9. func main() { 
  10.  store := mandodb.OpenTSDB( 
  11.   mandodb.WithOnlyMemoryMode(true), 
  12.   mandodb.WithWriteTimeout(10*time.Second), 
  13.  ) 
  14.  defer store.Close() 
  15.  
  16.  // 插入数据 
  17.  _ = store.InsertRows([]*mandodb.Row{ 
  18.   { 
  19.    Metric: "cpu.busy"
  20.    Labels: []mandodb.Label{ 
  21.     {Name"node", Value: "vm1"}, 
  22.     {Name"dc", Value: "gz-idc"}, 
  23.    }, 
  24.    Point: mandodb.Point{Ts: 1600000001, Value: 0.1}, 
  25.   }, 
  26.   { 
  27.    Metric: "cpu.busy"
  28.    Labels: []mandodb.Label{ 
  29.     {Name"node", Value: "vm2"}, 
  30.     {Name"dc", Value: "sz-idc"}, 
  31.    }, 
  32.    Point: mandodb.Point{Ts: 1600000001, Value: 0.1}, 
  33.   }, 
  34.  }) 
  35.  
  36.  time.Sleep(time.Millisecond) 
  37.  
  38.  // 时序数据查询 
  39.  data, _ := store.QueryRange("cpu.busy", nil, 1600000000, 1600000002) 
  40.  fmt.Printf("data: %+v\n", data) 
  41.  // output
  42.  // data: [{Labels:{__name__="cpu.busy", dc="gz-idc", node="vm1"} Points:[{Ts:1600000001 Value:0.1}]}] 
  43.  
  44.  // 查询 Series 
  45.  // __name__ 是 metric 名称在 TSDB 中的 Label Key 
  46.  ser, _ := store.QuerySeries( 
  47.         mandodb.LabelMatcherSet{{Name"__name__", Value: "cpu.busy"}}, 1600000000, 1600000002) 
  48.  for _, d := range ser { 
  49.   fmt.Printf("data: %+v\n", d) 
  50.  } 
  51.  // output
  52.  // data: map[__name__:cpu.busy dc:gz-idc node:vm1] 
  53.  // data: map[__name__:cpu.busy dc:sz-idc node:vm2] 
  54.  
  55.  // 查询标签值 
  56.  lvs := store.QueryLabelValues("node", 1600000000, 1600000002) 
  57.  fmt.Printf("data: %+v\n", lvs) 
  58.  // output
  59.  // data: [vm1 vm2] 

下面是我对这段时间学习内容的整理,尝试完整介绍如何从零开始实现一个小型的 TSDB。

我本身并没有数据库开发的背景,某些描述可能并不那么准确,所以欢迎 实名 diss 指正。

 Gorilla 差值算法

 

Gorilla 论文 4.1 小节介绍了压缩算法,先整体看一下压缩方案,T/V 是紧挨存储的,'0'/'10'/'11' 表示控制位。

Figure: Gorilla 压缩算法

从零实现一个时序数据库

Timestamp DOD 压缩:

在时序的场景中,每个时序点都有一个对应的 Timestamp,一条时序序列中相邻数据点的间隔是有规律可循的。一般来讲,监控数据的采集都是会以固定的时间间隔进行的,所以就可以用差值来记录时间间隔,更进一步,我们可以用差值的差值来记录以此来减少存储空间。

  1. t1: 1627401800; t2: 1627401810; t3: 1627401820; t4: 1627401830 
  2. -------------------------------------------------------------- 
  3. // 差值:delta 
  4. t1: 1627401800; (t2-t1)d1: 10; (t3-t2)d2: 10; (t4-t3)d3: 10;  
  5. -------------------------------------------------------------- 
  6. // 差值的差值:delta of delta 
  7. t1: 1627401800; dod1: 0; dod2: 0; dod3: 0;  

实际环境中当然不可能每个间隔都这么均匀,由于网络延迟等其他原因,差值会有波动。

Value XOR 压缩:

Figure: IEEE 浮点数以及 XOR 计算结果

从零实现一个时序数据库

当两个数据点数值值比较接近的话,通过异或操作计算出来的结果是比较相似的,利用这点就可以通过记录前置零和后置零个数以及数值部分来达到压缩空间的目的。

下面通过算法实现来介绍,代码来自项目 dgryski/go-tsz。代码完全按照论文中给出的步骤来实现。

  1. // New 初始化 block 这里会将第一个原始时间戳写入到 block 中 
  2. func New(t0 uint32) *Series { 
  3.  s := Series{ 
  4.   T0:      t0, 
  5.   leading: ^uint8(0), 
  6.  } 
  7.  
  8.  s.bw.writeBits(uint64(t0), 32) 
  9.  return &s 
  10.  
  11. // Push 负责写入时序数据 
  12. func (s *Series) Push(t uint32, v float64) { 
  13.  // .... 
  14.  // 如果是第一个数据点的话写入原始数据后直接返回 
  15.  if s.t == 0 { 
  16.   s.t = t 
  17.   s.val = v 
  18.   s.tDelta = t - s.T0 // 实际上这里为 0 
  19.  
  20.   // The block header stores the starting time stamp, t-1(前一个时间戳), 
  21.   // which is aligned to a two hour window; the first time 
  22.   // stamp, t0, in the block is stored as a delta from t−1 in 14 bits. 
  23.          
  24.   // 用 14 个 bit 写入时间戳差值 
  25.   s.bw.writeBits(uint64(s.tDelta), 14) 
  26.   // 原始数据点完整写入 
  27.   s.bw.writeBits(math.Float64bits(v), 64) 
  28.   return 
  29.  } 
  30.  
  31.  tDelta := t - s.t 
  32.  dod := int32(tDelta - s.tDelta) // 计算差值的差值 Detla of Delta 
  33.  
  34.  // 下面开始就处理非第一个数据点的情况了 
  35.  switch { 
  36.   // If D is zero, then store a single ‘0’ bit 
  37.   // 如果是零的话 那直接用 '0' 一个字节就可以直接表示 
  38.  case dod == 0: 
  39.   s.bw.writeBit(zero) 
  40.  
  41.   //  If D is between [-63, 64], store ‘10’ followed by the value (7 bits) 
  42.  case -63 <= dod && dod <= 64: 
  43.   s.bw.writeBits(0x02, 2) // 控制位 '10' 
  44.   s.bw.writeBits(uint64(dod), 7) // 7bits 可以表示 [-63, 64] 的范围 
  45.  
  46.   // If D is between [-255, 256], store ‘110’ followed by the value (9 bits) 
  47.  case -255 <= dod && dod <= 256: 
  48.   s.bw.writeBits(0x06, 3) // 控制位 '110' 
  49.   s.bw.writeBits(uint64(dod), 9) 
  50.  
  51.   // if D is between [-2047, 2048], store ‘1110’ followed by the value (12 bits) 
  52.  case -2047 <= dod && dod <= 2048: 
  53.   s.bw.writeBits(0x0e, 4) // 控制位 '1110' 
  54.   s.bw.writeBits(uint64(dod), 12) 
  55.  
  56.   // Otherwise store ‘1111’ followed by D using 32 bits 
  57.  default
  58.   s.bw.writeBits(0x0f, 4) // 其余情况控制位均用 '1111' 
  59.   s.bw.writeBits(uint64(dod), 32) 
  60.  } 
  61.  
  62.  // 到这里 (T, V) 中的时间戳已经写入完毕了 接下来是写 V 部分 
  63.  
  64.  // 先计算两个值的异或结果 
  65.  vDelta := math.Float64bits(v) ^ math.Float64bits(s.val) 
  66.  
  67.  // If XOR with the previous is zero (same value), store single ‘0’ bit 
  68.  // 如果前后两个值相等的话 直接用 '0' 1 个 bit 就可以表示 
  69.  // 所以如果上报的时序数据是 1 或者 0 这种的话 占用的内存会非常少 
  70.  
  71.  // zero = '0'; one = '1' 
  72.  if vDelta == 0 { 
  73.   s.bw.writeBit(zero) 
  74.  } else {    // 非 0 情况那就要把控制位置为 1 
  75.   s.bw.writeBit(one) 
  76.  
  77.   // 计算前置 0 和后置 0 
  78.   leading := uint8(bits.LeadingZeros64(vDelta)) 
  79.   trailing := uint8(bits.TrailingZeros64(vDelta)) 
  80.  
  81.   // clamp number of leading zeros to avoid overflow when encoding 
  82.   if leading >= 32 { 
  83.    leading = 31 
  84.   } 
  85.  
  86.   // (Control bit ‘0’) If the block of meaningful bits 
  87.   // falls within the block of previous meaningful bits, 
  88.   // i.e., there are at least as many leading zeros and 
  89.   // as many trailing zeros as with the previous value, 
  90.   // use that information for the block position and 
  91.   // just store the meaningful XORed value. 
  92.  
  93.   // 如果前置 0 不小于上一个值计算的异或结果的前置 0 且后置 0 也不小于上一个值计算的异或结果的后置 0 
  94.   if s.leading != ^uint8(0) && leading >= s.leading && trailing >= s.trailing { // => 控制位 '10' 
  95.    s.bw.writeBit(zero) 
  96.    // 记录异或值非零部分 
  97.    s.bw.writeBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing)) 
  98.   } else { // => 控制位 '11' 
  99.  
  100.    // (Control bit ‘1’) Store the length of the number 
  101.    // of leading zeros in the next 5 bits, then store the 
  102.    // length of the meaningful XORed value in the next 
  103.    // 6 bits. Finally store the meaningful bits of the XORed value. 
  104.    s.leading, s.trailing = leading, trailing 
  105.  
  106.    // 其他情况控制位置为 1 并用接下来的 5bits 记录前置 0 个数 
  107.    s.bw.writeBit(one) 
  108.    s.bw.writeBits(uint64(leading), 5) 
  109.  
  110.    // 然后用接下来的 6bits 记录异或差值中的非零部分 
  111.    sigbits := 64 - leading - trailing 
  112.    s.bw.writeBits(uint64(sigbits), 6) 
  113.    s.bw.writeBits(vDelta>>trailing, int(sigbits)) 
  114.   } 
  115.  } 
  116.  
  117.  // 状态更新 至此(T, V)均已被压缩写入到内存中 
  118.  s.tDelta = tDelta 
  119.  s.t = t 
  120.  s.val = v 
  121.  
  122. // 每个 block 的结尾会使用特殊标记用于标识 
  123. func finish(w *bstream) { 
  124.  // write an end-of-stream record 
  125.  w.writeBits(0x0f, 4) 
  126.  w.writeBits(0xffffffff, 32) 
  127.  w.writeBit(zero) 

论文给出了不同 case 的 buckets 占比分布。

Figure: Timestamp buckets distribution

从零实现一个时序数据库

Figure: Value buckets distribution

从零实现一个时序数据库

Timestamp buckets 中,前后两个时间戳差值相同的比例高达 96.39%,而在 Value buckets 中只用一个控制位的占比也达到了 59.06%,可见其压缩比之高。

论文还给出了一个重要结论,数据压缩比随着时间的增长而增长,并在 120 个点的时候开始收敛到一个最佳值。

Figure: 压缩率曲线

从零实现一个时序数据库

Gorilla 差值算法也应用于我的另外一个项目 chenjiandongx/tszlist,一种时序数据线程安全链表。

 数据写入

 

时序数据具有「垂直写,水平查」的特性,即同一时刻有多条时间线的数据不断被追加。但查询的时候往往是查某条时间线持续一段时间内的数据点。

  1. series 
  2.   ^    
  3.   │   . . . . . . . . . . . . . . . . .   . . . . .   {__name__="request_total", method="GET"
  4.   │     . . . . . . . . . . . . . . . . . . . . . .   {__name__="request_total", method="POST"
  5.   │         . . . . . . . 
  6.   │       . . .     . . . . . . . . . . . . . . . .                  ...  
  7.   │     . . . . . . . . . . . . . . . . .   . . . .    
  8.   │     . . . . . . . . . .   . . . . . . . . . . .   {__name__="errors_total", method="POST"
  9.   │           . . .   . . . . . . . . .   . . . . .   {__name__="errors_total", method="GET"
  10.   │         . . . . . . . . .       . . . . . 
  11.   │       . . .     . . . . . . . . . . . . . . . .                  ...  
  12.   │     . . . . . . . . . . . . . . . .   . . . .  
  13.   v 
  14.     <-------------------- time ---------------------> 

时序数据跟时间是强相关的(不然还叫时序数据?),即大多数查询其实只会查询最近时刻的数据,这里的「最近」是个相对概念。所以没必要维护一条时间线的完整生命周期,特别是在 Kubernetes 这种云原生场景,Pod 随时有可能会被扩缩容,也就意味着一条时间线的生命周期可能会很短。如果我们一直记录着所有的时间线的索引信息,那么随着时间的推移,数据库里的时间线的数量会呈现一个线性增长的趋势 ,会极大地影响查询效率。

这里引入一个概念「序列分流」,这个概念描述的是一组时间序列变得不活跃,即不再接收数据点,取而代之的是有一组新的活跃的序列出现的场景。

  1. series 
  2.   ^ 
  3.   │   . . . . . . 
  4.   │   . . . . . . 
  5.   │   . . . . . . 
  6.   │               . . . . . . . 
  7.   │               . . . . . . . 
  8.   │               . . . . . . . 
  9.   │                             . . . . . . 
  10.   │                             . . . . . . 
  11.   │                                         . . . . . 
  12.   │                                         . . . . . 
  13.   │                                         . . . . . 
  14.   v 
  15.     <-------------------- time ---------------------> 

我们将多条时间线的数据按一定的时间跨度切割成多个小块,每个小块本质就是一个独立小型的数据库,这种做法另外一个优势是清除过期操作的时候非常方便,只要将整个块给删了就行 (梭哈是一种智慧)。内存中保留最近两个小时的热数据(Memory Segment),其余数据持久化到磁盘(Disk Segment)。

Figure: 序列分块

从零实现一个时序数据库

DiskSegment 使用的是 AVL Tree 实现的列表,可在插入时排序。为什么不用更加高大上的红黑树?因为不好实现...

从零实现一个时序数据库

当 Memory Segment 达到归档条件的时候,会创建一个新的内存块并异步将刚归档的块写入到磁盘,同时会使用 mmap 将磁盘文件句柄映射到内存中。代码实现如下。

  1. func (tsdb *TSDB) getHeadPartition() (Segment, error) { 
  2.  tsdb.mut.Lock() 
  3.  defer tsdb.mut.Unlock() 
  4.  
  5.  if tsdb.segs.head.Frozen() { 
  6.   head := tsdb.segs.head 
  7.  
  8.   go func() { 
  9.    tsdb.wg.Add(1) 
  10.    defer tsdb.wg.Done() 
  11.  
  12.    tsdb.segs.Add(head) 
  13.  
  14.    t0 := time.Now() 
  15.    dn := dirname(head.MinTs(), head.MaxTs()) 
  16.  
  17.    if err := writeToDisk(head.(*memorySegment)); err != nil { 
  18.     logger.Errorf("failed to flush data to disk, %v", err) 
  19.     return 
  20.    } 
  21.  
  22.    fname := path.Join(dn, "data"
  23.    mf, err := mmap.OpenMmapFile(fname) 
  24.    if err != nil { 
  25.     logger.Errorf("failed to make a mmap file %s, %v", fname, err) 
  26.     return 
  27.    } 
  28.  
  29.    tsdb.segs.Remove(head) 
  30.    tsdb.segs.Add(newDiskSegment(mf, dn, head.MinTs(), head.MaxTs())) 
  31.    logger.Infof("write file %s take: %v", fname, time.Since(t0)) 
  32.   }() 
  33.  
  34.   tsdb.segs.head = newMemorySegment() 
  35.  } 
  36.  
  37.  return tsdb.segs.head, nil 

Figure: Memory Segment 两部分数据

从零实现一个时序数据库

写入的时候支持数据时间回拨,也就是支持有限的乱序数据写入,实现方案是在内存中对还没归档的每条时间线维护一个链表(同样使用 AVL Tree 实现),当数据点的时间戳不是递增的时候存储到链表中,查询的时候会将两部分数据合并查询,持久化的时候也会将两者合并写入。

 Mmap 内存映射

 

mmap 是一种将磁盘文件映射到进程的虚拟地址空间来实现对文件读取和修改操作的技术。

从 Linux 角度来看,操作系统的内存空间被分为「内核空间」和「用户空间」两大部分,其中内核空间和用户空间的空间大小、操作权限以及核心功能都不相同。这里的内核空间是指操作系统本身使用的内存空间,而用户空间则是提供给各个进程使用的内存空间。由于用户进程不具有访问内核资源的权限,例如访问硬件资源,因此当一个用户进程需要使用内核资源的时候,就需要通过 系统调用 来完成。

从零实现一个时序数据库

虚拟内存细节可以阅读 《虚拟内存精粹》 这篇文章。

Figure: 常规文件操作和 mmap 操作的区别

从零实现一个时序数据库

常规文件操作

读文件: 用户进程首先执行 read(2) 系统调用,会进行系统上下文环境切换,从用户态切换到内核态,之后由 DMA 将文件数据从磁盘读取到内核缓冲区,再将内核空间缓冲区的数据复制到用户空间的缓冲区中,最后 read(2) 系统调用返回,进程从内核态切换到用户态,整个过程结束。

写文件: 用户进程发起 write(2) 系统调用,从用户态切换到内核态,将数据从用户空间缓冲区复制到内核空间缓冲区,接着 write(2) 系统调用返回,同时进程从内核态切换到用户态,数据从内核缓冲区写入到磁盘,整个过程结束。

mmap 操作

mmap 内存映射的实现过程,总的来说可以分为三个阶段:

进程启动映射过程,并在虚拟地址空间中为映射创建虚拟映射区域。

执行内核空间的系统调用函数 mmap,建立文件物理地址和进程虚拟地址的一一映射关系。

进程发起对这片映射空间的访问,引发缺页异常,实现文件内容到物理内存的拷贝。

 小结

常规文件操作为了提高读写效率和保护磁盘,使用了页缓存机制。这样造成读文件时需要先将文件页从磁盘拷贝到页缓存中,由于页缓存处在内核空间,不能被用户进程直接寻址,所以还需要将页缓存中数据页再次拷贝到内存对应的用户空间中。这样,通过了两次数据拷贝过程,才能完成进程对文件内容的获取任务。写操作也是一样,待写入的 buffer 在内核空间不能直接访问,必须要先拷贝至内核空间对应的主存,再写回磁盘中(延迟写回),也是需要两次数据拷贝。

而使用 mmap 操作文件,创建新的虚拟内存区域和建立文件磁盘地址和虚拟内存区域映射这两步,没有任何文件拷贝操作。而之后访问数据时发现内存中并无数据而发起的缺页异常过程,可以通过已经建立好的映射关系,只使用一次数据拷贝,就从磁盘中将数据传入内存的用户空间中,供进程使用。

从零实现一个时序数据库

总而言之,常规文件操作需要从磁盘到页缓存再到用户主存的两次数据拷贝。而 mmap 操控文件只需要从磁盘到用户主存的一次数据拷贝过程。mmap 的关键点是实现了「用户空间」和「内核空间」的数据直接交互而省去了不同空间数据复制的开销。

 索引设计

 

TSDB 的查询,是通过 Label 组合来锁定到具体的时间线进而确定分块偏移检索出数据。

Sid(MetricHash/-/LabelHash) 是一个 Series 的唯一标识。

Label(Name/-/Value) => vm="node1"; vm="node2"; iface="eth0"。

在传统的关系型数据库,索引设计可能是这样的。

从零实现一个时序数据库

时序数据是 NoSchema 的,没办法提前建表和定义数据模型 ,因为我们要支持用户上报任意 Label 组合的数据,这样的话就没办法进行动态的扩展了。或许你会灵光一现 ✨,既然这样,那把 Labels 放一个字段拼接起来不就可以无限扩展啦,比如下面这个样子。

从零实现一个时序数据库

哟嚯,乍一看没毛病,靓仔窃喜。

不对,有问题,要定位到其中的某条时间线,那我是不是得全表扫描一趟。而且这种设计还有另外一个弊病,就是会导致内存激增,Label 的 Name 和 Value 都可能是特别长的字符串。

那怎么办呢(靓仔沉默...),刹那间我的脑中闪过一个帅气的身影,没错,就是你,花泽类「只要倒立眼泪就不会流出来」。

我悟了!要学会逆向思维,把 Label 当做主键,Sid 当做其字段不就好了。这其实有点类似于 ElasticSearch 中的倒排索引,主键为 Keyword,字段为 DocumentID。索引设计如下。

从零实现一个时序数据库

Label 作为主键时会建立索引(Hashkey),查找的效率可视为 O(1),再根据锁定的 Label 来最终确定想要的 Sid。举个例子,我们想要查找 {vm="node1", iface="eth0"} 的时间线的话就可以快速定位到 Sids(忽略其他 ... sid)。

  1. sid1; sid2; sid3 
  2. sid2; sid3; sid5 

两者求一个交集,就可以得到最终要查询的 Sid 为 sid2 和 sid3。Nice!

假设我们的查询只支持相等匹配的话,格局明显就小了。查询条件是 {vm=~"node*", iface="eth0"} 肿么办?对 label1、label2、label3 和 label4 一起求一个并集吗?显然不是,因为这样算的话那结果就是 sid3。

厘清关系就不难看出,只要对相同的 Label Name 做并集然后再对不同的 Label Name 求交集就可以了。这样算的正确结果就是 sid3 和 sid5。实现的时候用到了 Roaring Bitmap,一种优化的位图算法。

Memory Segment 索引匹配

  1. func (mim *memoryIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []string { 
  2.  // ... 
  3.  sids := newMemorySidSet() 
  4.  var got bool 
  5.  for i := len(lms) - 1; i >= 0; i-- { 
  6.   tmp := newMemorySidSet() 
  7.   vs := lvs.Match(lms[i]) 
  8.   // 对相同的 Label Name 求并集 
  9.   for _, v := range vs { 
  10.    midx := mim.idx[joinSeparator(lms[i].Name, v)] 
  11.    if midx == nil || midx.Size() <= 0 { 
  12.     continue 
  13.    } 
  14.  
  15.    tmp.Union(midx.Copy()) 
  16.   } 
  17.  
  18.   if tmp == nil || tmp.Size() <= 0 { 
  19.    return nil 
  20.   } 
  21.  
  22.   if !got { 
  23.    sids = tmp 
  24.    got = true 
  25.    continue 
  26.   } 
  27.  
  28.   // 对不同的 Label Name 求交集 
  29.   sids.Intersection(tmp.Copy()) 
  30.  } 
  31.  
  32.  return sids.List() 

Disk Segment 索引匹配

  1. func (dim *diskIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []uint32 { 
  2.  // ... 
  3.  
  4.  lst := make([]*roaring.Bitmap, 0) 
  5.  for i := len(lms) - 1; i >= 0; i-- { 
  6.   tmp := make([]*roaring.Bitmap, 0) 
  7.   vs := lvs.Match(lms[i]) 
  8.  
  9.   // 对相同的 Label Name 求并集 
  10.   for _, v := range vs { 
  11.    didx := dim.label2sids[joinSeparator(lms[i].Name, v)] 
  12.    if didx == nil || didx.set.IsEmpty() { 
  13.     continue 
  14.    } 
  15.  
  16.    tmp = append(tmp, didx.set
  17.   } 
  18.  
  19.   union := roaring.ParOr(4, tmp...) 
  20.   if union.IsEmpty() { 
  21.    return nil 
  22.   } 
  23.  
  24.   lst = append(lst, union
  25.  } 
  26.  
  27.  // 对不同的 Label Name 求交集 
  28.  return roaring.ParAnd(4, lst...).ToArray() 

然而,确定相同的 LabelName 也是一个问题,因为 Label 本身就代表着 Name:Value,难不成我还要遍历所有 label 才能确定嘛,这不就又成了全表扫描???

没有什么问题是一个索引解决不了的,如果有,那就再增加一个索引。--- 鲁迅。

只要我们保存 Label 的 Name 对应的 Value 列表的映射关系即可高效解决这个问题。

从零实现一个时序数据库

还是上面的 {vm=~"node1|node2", iface="eth0"} 查询,第一步通过正则匹配确定匹配到 node1, node2,第二步匹配到 eth0,再将 LabelName 和 LabelValue 一拼装,Label 就出来了,完事!

桥豆麻袋!还有一个精彩的正则匹配优化算法没介绍。

fastRegexMatcher 是一种优化的正则匹配器,算法来自 Prometheus。

  1. // 思路就是尽量先执行前缀匹配和后缀匹配 能不用正则就不用正则 
  2. // 如 label 表达式为 {vm="node*"
  3. // 而我们此时内存中有 vm=node1, vm=node2, vm=foo, vm=bar,那这个时候只需要前缀匹配就能直接把 vm=foo,vm=bar 给过滤了 
  4. // 毕竟前缀匹配和后缀匹配的执行效率还是比正则高不少的 
  5. type fastRegexMatcher struct { 
  6.  re       *regexp.Regexp 
  7.  prefix   string 
  8.  suffix   string 
  9.  contains string 
  10.  
  11. func newFastRegexMatcher(v string) (*fastRegexMatcher, error) { 
  12.  re, err := regexp.Compile("^(?:" + v + ")$"
  13.  if err != nil { 
  14.   return nil, err 
  15.  } 
  16.  
  17.  parsed, err := syntax.Parse(v, syntax.Perl) 
  18.  if err != nil { 
  19.   return nil, err 
  20.  } 
  21.  
  22.  m := &fastRegexMatcher{ 
  23.   re: re, 
  24.  } 
  25.  
  26.  if parsed.Op == syntax.OpConcat { 
  27.   m.prefix, m.suffix, m.contains = optimizeConcatRegex(parsed) 
  28.  } 
  29.  
  30.  return m, nil 
  31.  
  32. // optimizeConcatRegex returns literal prefix/suffix text that can be safely 
  33. // checked against the label value before running the regexp matcher. 
  34. func optimizeConcatRegex(r *syntax.Regexp) (prefix, suffix, contains string) { 
  35.  sub := r.Sub 
  36.  
  37.  // We can safely remove begin and end text matchers respectively 
  38.  // at the beginning and end of the regexp. 
  39.  if len(sub) > 0 && sub[0].Op == syntax.OpBeginText { 
  40.   sub = sub[1:] 
  41.  } 
  42.  if len(sub) > 0 && sub[len(sub)-1].Op == syntax.OpEndText { 
  43.   sub = sub[:len(sub)-1] 
  44.  } 
  45.  
  46.  if len(sub) == 0 { 
  47.   return 
  48.  } 
  49.  
  50.  // Given Prometheus regex matchers are always anchored to the begin/end 
  51.  // of the text, if the first/last operations are literals, we can safely 
  52.  // treat them as prefix/suffix. 
  53.  if sub[0].Op == syntax.OpLiteral && (sub[0].Flags&syntax.FoldCase) == 0 { 
  54.   prefix = string(sub[0].Rune) 
  55.  } 
  56.  if last := len(sub) - 1; sub[last].Op == syntax.OpLiteral && (sub[last].Flags&syntax.FoldCase) == 0 { 
  57.   suffix = string(sub[last].Rune) 
  58.  } 
  59.  
  60.  // If contains any literal which is not a prefix/suffix, we keep the 
  61.  // 1st one. We do not keep the whole list of literals to simplify the 
  62.  // fast path. 
  63.  for i := 1; i < len(sub)-1; i++ { 
  64.   if sub[i].Op == syntax.OpLiteral && (sub[i].Flags&syntax.FoldCase) == 0 { 
  65.    contains = string(sub[i].Rune) 
  66.    break 
  67.   } 
  68.  } 
  69.  
  70.  return 
  71.  
  72. func (m *fastRegexMatcher) MatchString(s string) bool { 
  73.  if m.prefix != "" && !strings.HasPrefix(s, m.prefix) { 
  74.   return false 
  75.  } 
  76.  
  77.  if m.suffix != "" && !strings.HasSuffix(s, m.suffix) { 
  78.   return false 
  79.  } 
  80.  
  81.  if m.contains != "" && !strings.Contains(s, m.contains) { 
  82.   return false 
  83.  } 
  84.  return m.re.MatchString(s) 

存储布局

 

既然是数据库,那么自然少不了数据持久化的特性。了解完索引的设计,再看看落到磁盘的存储布局就很清晰了。先跑个示例程序写入一些数据热热身。

  1. package main 
  2.  
  3. import ( 
  4.  "fmt" 
  5.  "math/rand" 
  6.  "strconv" 
  7.  "time" 
  8.  
  9.  "github.com/chenjiandongx/mandodb" 
  10.  "github.com/satori/go.uuid" 
  11.  
  12. // 模拟一些监控指标 
  13. var metrics = []string{ 
  14.  "cpu.busy""cpu.load1""cpu.load5""cpu.load15""cpu.iowait"
  15.  "disk.write.ops""disk.read.ops""disk.used"
  16.  "net.in.bytes""net.out.bytes""net.in.packages""net.out.packages"
  17.  "mem.used""mem.idle""mem.used.bytes""mem.total.bytes"
  18.  
  19. // 增加 Label 数量 
  20. var uid1, uid2, uid3 []string 
  21.  
  22. func init() { 
  23.  for i := 0; i < len(metrics); i++ { 
  24.   uid1 = append(uid1, uuid.NewV4().String()) 
  25.   uid2 = append(uid2, uuid.NewV4().String()) 
  26.   uid3 = append(uid3, uuid.NewV4().String()) 
  27.  } 
  28.  
  29. func genPoints(ts int64, node, dc int) []*mandodb.Row { 
  30.  points := make([]*mandodb.Row, 0) 
  31.  for idx, metric := range metrics { 
  32.   points = append(points, &mandodb.Row{ 
  33.    Metric: metric, 
  34.    Labels: []mandodb.Label{ 
  35.     {Name"node", Value: "vm" + strconv.Itoa(node)}, 
  36.     {Name"dc", Value: strconv.Itoa(dc)}, 
  37.     {Name"foo", Value: uid1[idx]}, 
  38.     {Name"bar", Value: uid2[idx]}, 
  39.     {Name"zoo", Value: uid3[idx]}, 
  40.    }, 
  41.    Point: mandodb.Point{Ts: ts, Value: float64(rand.Int31n(60))}, 
  42.   }) 
  43.  } 
  44.  
  45.  return points 
  46.  
  47. func main() { 
  48.  store := mandodb.OpenTSDB() 
  49.  defer store.Close() 
  50.  
  51.  now := time.Now().Unix() - 36000 // 10h ago 
  52.  
  53.  for i := 0; i < 720; i++ { 
  54.   for n := 0; n < 5; n++ { 
  55.    for j := 0; j < 1024; j++ { 
  56.     _ = store.InsertRows(genPoints(now, n, j)) 
  57.    } 
  58.   } 
  59.  
  60.   now += 60 //1min 
  61.  } 
  62.  
  63.  fmt.Println("finished"
  64.  
  65.  select {} 

每个分块保存在名字为 seg-${mints}-${maxts} 文件夹里,每个文件夹含有 data 和 meta.json 两个文件。

  • data: 存储了一个 Segment 的所有数据,包括数据点和索引信息。
  • meta.json: 描述了分块的时间线数量,数据点数量以及该块的数据时间跨度。
  1. ❯  tree -h seg-* 
  2. seg-1627709713-1627716973 
  3. ├── [ 28M]  data 
  4. └── [ 110]  meta.json 
  5. seg-1627716973-1627724233 
  6. ├── [ 28M]  data 
  7. └── [ 110]  meta.json 
  8. seg-1627724233-1627731493 
  9. ├── [ 28M]  data 
  10. └── [ 110]  meta.json 
  11. seg-1627731493-1627738753 
  12. ├── [ 28M]  data 
  13. └── [ 110]  meta.json 
  14. seg-1627738753-1627746013 
  15. ├── [ 28M]  data 
  16. └── [ 110]  meta.json 
  17.  
  18. 0 directories, 10 files 
  19.  
  20. ❯  cat seg-1627709713-1627716973/meta.json -p 
  21.     "seriesCount": 81920, 
  22.     "dataPointsCount": 9912336, 
  23.     "maxTs": 1627716973, 
  24.     "minTs": 1627709713 

存储 8 万条时间线共接近 1 千万的数据点的数据块占用磁盘 28M。实际上在写入的时候,一条数据是这个样子的。

  1. {__name__="cpu.busy", node="vm0", dc="0", foo="bdac463d-8805-4cbe-bc9a-9bf495f87bab", bar="3689df1d-cbf3-4962-abea-6491861e62d2", zoo="9551010d-9726-4b3b-baf3-77e50655b950"} 1627710454 41 

这样一条数据按照 JSON 格式进行网络通信的话,大概是 200Byte,初略计算一下。

200 * 9912336 = 1982467200Byte = 1890M

可以选择 ZSTD 或者 Snappy 算法进行二次压缩(默认不开启)。还是上面的示例代码,不过在 TSDB 启动的时候指定了压缩算法。

ZstdBytesCompressor

  1. func main() { 
  2.  store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.ZstdBytesCompressor)) 
  3.  defer store.Close() 
  4.  // ... 
  5.  
  6. // 压缩效果 28M -> 25M 
  7. ❯  ll seg-1627711905-1627719165 
  8. Permissions Size User          Date Modified Name 
  9. .rwxr-xr-x   25M chenjiandongx  1 Aug 00:13  data 
  10. .rwxr-xr-x   110 chenjiandongx  1 Aug 00:13  meta.json 

SnappyBytesCompressor

  1. func main() { 
  2.  store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.SnappyBytesCompressor)) 
  3.  defer store.Close() 
  4.  // ... 
  5.  
  6. // 压缩效果 28M -> 26M 
  7. ❯  ll seg-1627763918-1627771178 
  8. Permissions Size User          Date Modified Name 
  9. .rwxr-xr-x   26M chenjiandongx  1 Aug 14:39  data 
  10. .rwxr-xr-x   110 chenjiandongx  1 Aug 14:39  meta.json 

多多少少还是有点效果的 ...

压缩是有成本的,压缩体积的同时会增大 CPU 开销(mbp 可以煎鸡蛋了),减缓写入速率。

敲黑板,接下来就要来好好讲讲 data 文件到底写了什么东西。 data 存储布局如下。

Figure: Segment Stroage

从零实现一个时序数据库

TOC 描述了 Data Block 和 Meta Block(Series Block + Labels Block)的体积,用于后面对 data 进行解析读取。Data Block 存储了每条时间线具体的数据点,时间线之间数据紧挨存储。DataContent 就是使用 Gorilla 差值算法压缩的 block。

Figure: Data Block

从零实现一个时序数据库

Labels Block 记录了具体的 Label 值以及对应 Label 与哪些 Series 相关联。

Figure: Labels Block

从零实现一个时序数据库

Series Block 记录了每条时间线的元数据,字段解释如下。

  • SidLength: Sid 的长度。
  • Sid: 时间线的唯一标识。
  • StartOffset: 时间线数据块在 Data Block 中的起始偏移。
  • EndOffset: 时间线数据块在 Data Block 中的终止偏移。
  • LabelCount: 时间线包含的 Label 数量。
  • Labels: 标签在 Labels Block 中的序号(仅记录序号,不记录具体值)。
  • Figure: Series Block

从零实现一个时序数据库

了解完设计,再看看 Meta Block 编码和解编码的代码实现,binaryMetaSerializer 实现了 MetaSerializer 接口。

  1. type MetaSerializer interface { 
  2.  Marshal(Metadata) ([]byte, error) 
  3.  Unmarshal([]byte, *Metadata) error 

编码 Metadata

  1. const ( 
  2.  endOfBlock uint16 = 0xffff 
  3.  uint16Size        = 2 
  4.  uint32Size        = 4 
  5.  uint64Size        = 8 
  6.  
  7.  magic = "https://github.com/chenjiandongx/mandodb" 
  8.  
  9. func (s *binaryMetaSerializer) Marshal(meta Metadata) ([]byte, error) { 
  10.  encf := newEncbuf() 
  11.  
  12.  // labels block 
  13.  labelOrdered := make(map[string]int
  14.  for idx, row := range meta.Labels { 
  15.   labelOrdered[row.Name] = idx 
  16.   encf.MarshalUint16(uint16(len(row.Name))) 
  17.   encf.MarshalString(row.Name
  18.   encf.MarshalUint32(uint32(len(row.Sids))) 
  19.   encf.MarshalUint32(row.Sids...) 
  20.  } 
  21.  encf.MarshalUint16(endOfBlock) 
  22.  
  23.  // series block 
  24.  for idx, series := range meta.Series { 
  25.   encf.MarshalUint16(uint16(len(series.Sid))) 
  26.   encf.MarshalString(series.Sid) 
  27.   encf.MarshalUint64(series.StartOffset, series.EndOffset) 
  28.  
  29.   rl := meta.sidRelatedLabels[idx] 
  30.   encf.MarshalUint32(uint32(rl.Len())) 
  31.  
  32.   lids := make([]uint32, 0, rl.Len()) 
  33.   for _, lb := range rl { 
  34.    lids = append(lids, uint32(labelOrdered[lb.MarshalName()])) 
  35.   } 
  36.  
  37.   sort.Slice(lids, func(i, j int) bool { 
  38.    return lids[i] < lids[j] 
  39.   }) 
  40.   encf.MarshalUint32(lids...) 
  41.  } 
  42.  encf.MarshalUint16(endOfBlock) 
  43.  
  44.  encf.MarshalUint64(uint64(meta.MinTs)) 
  45.  encf.MarshalUint64(uint64(meta.MaxTs)) 
  46.  encf.MarshalString(magic)   // <-- magic here 
  47.  
  48.  return ByteCompress(encf.Bytes()), nil 

解码 Metadata

  1. func (s *binaryMetaSerializer) Unmarshal(data []byte, meta *Metadata) error { 
  2.  data, err := ByteDecompress(data) 
  3.  if err != nil { 
  4.   return ErrInvalidSize 
  5.  } 
  6.  
  7.  if len(data) < len(magic) { 
  8.   return ErrInvalidSize 
  9.  } 
  10.  
  11.  decf := newDecbuf() 
  12.  // 检验数据完整性 
  13.  if decf.UnmarshalString(data[len(data)-len(magic):]) != magic { 
  14.   return ErrInvalidSize 
  15.  } 
  16.  
  17.  // labels block 
  18.  offset := 0 
  19.  labels := make([]seriesWithLabel, 0) 
  20.  for { 
  21.   var labelName string 
  22.   labelLen := decf.UnmarshalUint16(data[offset : offset+uint16Size]) 
  23.   offset += uint16Size 
  24.  
  25.   if labelLen == endOfBlock { 
  26.    break 
  27.   } 
  28.  
  29.   labelName = decf.UnmarshalString(data[offset : offset+int(labelLen)]) 
  30.   offset += int(labelLen) 
  31.   sidCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size]) 
  32.   offset += uint32Size 
  33.  
  34.   sidLst := make([]uint32, sidCnt) 
  35.   for i := 0; i < int(sidCnt); i++ { 
  36.    sidLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size]) 
  37.    offset += uint32Size 
  38.   } 
  39.   labels = append(labels, seriesWithLabel{Name: labelName, Sids: sidLst}) 
  40.  } 
  41.  meta.Labels = labels 
  42.  
  43.  // series block 
  44.  rows := make([]metaSeries, 0) 
  45.  for { 
  46.   series := metaSeries{} 
  47.   sidLen := decf.UnmarshalUint16(data[offset : offset+uint16Size]) 
  48.   offset += uint16Size 
  49.  
  50.   if sidLen == endOfBlock { 
  51.    break 
  52.   } 
  53.  
  54.   series.Sid = decf.UnmarshalString(data[offset : offset+int(sidLen)]) 
  55.   offset += int(sidLen) 
  56.   series.StartOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size]) 
  57.   offset += uint64Size 
  58.   series.EndOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size]) 
  59.   offset += uint64Size 
  60.   labelCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size]) 
  61.   offset += uint32Size 
  62.  
  63.   labelLst := make([]uint32, labelCnt) 
  64.   for i := 0; i < int(labelCnt); i++ { 
  65.    labelLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size]) 
  66.    offset += uint32Size 
  67.   } 
  68.   series.Labels = labelLst 
  69.   rows = append(rows, series) 
  70.  } 
  71.  meta.Series = rows 
  72.  
  73.  meta.MinTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size])) 
  74.  offset += uint64Size 
  75.  meta.MaxTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size])) 
  76.  offset += uint64Size 
  77.  
  78.  return decf.Err() 

至此,对 mandodb 的索引和存储整体设计是不是就了然于胸。

【编辑推荐】https://mp.weixin.qq.com/s/PqVHjGLLu5dXxjHubPbXYA

本文由主机测评网发布,不代表主机测评网立场,转载联系作者并注明出处:https://zhuji.jb51.net/shujuku/3120.html

联系我们

在线咨询:点击这里给我发消息

微信号:

工作日:8:30-17:30,节假日休息