我们知道 golang 的 map 并发会有问题,所以 go 官方在 sync 包中加入了一个 sync.map 来作为一个官方的并发安全的 map 实现。

如果你了解过 java 中常用的一个并发安全的 map 叫做 ConcurrentHashMap 就会知道它有两个亮点设计:一是当链表长度过长的时候会转换为红黑树的实现,还有一个就是分段锁。得益于这两个设计也导致 java 中实现的代码非常复杂,偷笑。

那么 go 里面是如何设计的呢?今天我们就来看看它是怎么实现的。

PS: 本文 go 源码基于版本1.16.2,我觉得当有了泛型之后这个库十有八九是要改的….

数据结构定义

1
2
3
4
5
6
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
1
2
3
4
5
6
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
// 注意这个标志,这个翻译为修改过的,如果为 true 表示 dirty 和 read 是不一致的了
amended bool // true if the dirty map contains some key not in m.
}
1
2
3
4
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
p unsafe.Pointer // *interface{}
}

可以看到,它的数据结构非常简单

  • mu: dirty 操作所需要使用的锁
  • read: 里面存储的数据只读
  • dirty: 最新的数据,但是需要加锁访问
  • misses: 读取 miss 计数器

刚看到这个数据结构的时候,就可以猜测一下,通过这个命名,感觉像是这样的:读写分离,读取全部从 read 中读取;修改操作记录在 dirty 中,然后当 miss 达到一个指标后进行 dirty 和 read 的交换;当然这是猜测,让我们往下看看源码实现。

方法实现

Load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 首先从 read 中获取,因为 read 只读,所以不加锁
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]

// 如果从 read 没有获取到,则需要验证 amended 标志是否为true
// 如果 amended 为true,则证明 read 和 dirty 不一致,则需要从 dirty 中获取
if !ok && read.amended {
m.mu.Lock()
// double check 因为当加锁之后,可能 read 的数据已经修改,所以需要再次验证
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 从 dirty 中获取
e, ok = m.dirty[key]
// 注意获取之后需要增加 misses 标志数值,证明这次 read 没读到,是从 dirty 中获取的
m.missLocked()
}
m.mu.Unlock()
}
// 如果 dirty 中也没有那么就是没有这个键对应的值
if !ok {
return nil, false
}
// 如果一开始已经从 read 中获取到,那么直接 load 就可以了
return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}

func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// 当 misses 标志数值达到了 dirty 的长度,那么就需要将 dirty 赋值给 read,让 read 存储最新的数据
// 并且把 dirty 清空,misses 标志数值清零
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
  1. 优先读取 read
  2. read 没有就看 dirty 和 read 是否一致
  3. 不一致就从 dirty 中读取
  4. 每次 miss 增加 miss 数值,当 miss 数值到达 dirty 长度的时候就重新将 dirty 赋值给 read

这里利用到了一个常用的并发操作,double check,因为验证和加锁并不是一个原子操作,所以需要二次确认,加锁之后是否还满足原来的加锁条件

Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
func (m *Map) Store(key, value interface{}) {
// 先从 read 中获取,如果存在则尝试直接修改,注意这里的修改是一个原子操作,并且判断了这个 key 是否已经被删除
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
// double check
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
// 这里比较难理解一点:
// 如果加锁之后,read 能获取到,并且被标记为删除,则认为 dirty 肯定不为空,并且这个 key 不在 dirty 中,所以需要在 dirty 中添加这个 key
m.dirty[key] = e
}
// 然后将 value 值更新到 entry 中
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果 read 中还是不存在,但是 dirty 中存在,则直接更新
e.storeLocked(&value)
} else {
// 如果read,dirty都没有,则是一个新的 key
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
// 如果之前 read 没有被修改过,则需要初始化 dirty
m.dirtyLocked()
// 并且需要更行 read 为 已经被修改过
m.read.Store(readOnly{m: read.m, amended: true})
}
// 此时 dirty 已经被初始化过了,直接添加新的 key 就可以了
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
// 如果这个键已经被删除了,则直接返回
if p == expunged {
return false
}
// 如果还存在,则直接修改,利用 cas 原子操作
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 注意这里有一个细节,需要验证是否已经被删除,如果已经被删除无需要添加到 dirty 中
// 并且内部将 nil 的 entry 设置为 expunged
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}

存储的时候相对复杂一些:

  1. 如果 read 中存在则直接尝试更新
  2. 如果 read 中不存在则加锁
  3. double check
  4. 如何二次确认时已经 read 中存在则需要判断是否已经被删除,如果已经删除需要在 dirty 中加回来
  5. 如果二次确认 read 中不存在,则查看 dirty 中是否存在, 如果存在直接更新
  6. 如果 read 和 dirty 中都不存在,则是一个新的 key
  7. 如果 dirty 还没有被初始化,则先初始化
  8. 最后更新 dirty 即可

需要把握的细节是:

  • read 和 dirty 可能不一致,dirty 有就可以操作
  • double check 之后 key 也有可能被删除

Delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果 read 中没有,就尝试去 dirty 中找
if !ok && read.amended {
m.mu.Lock()
// 这里也是 double check
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 反正只要 dirty 中有就直接操作就好了
e, ok = m.dirty[key]
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
// 这里是细节需要增加 miss 的计数,因为这里本质也是一个 load 操作, LoadAndDelete 嘛
m.missLocked()
}
m.mu.Unlock()
}
// 如果 read 中有,就直接操作就好了
if ok {
return e.delete()
}
return nil, false
}

// 注意这里上面的所有删除操作都是直接 cas 交换为 nil,并不是很多人说的标记为 expunged,expunged 是在 read 重新赋值个 dirty 初始化的时候才进行的
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}

删除比较简单

  1. 如果 read 中存在则直接操作
  2. 如果 read 中不存在则需要去 dirty 中找
  3. 删除的时候只需要设置为 nil 就可以了

Range

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
// 如果有修改过,那么 dirty 中的数据才是最新的,所以需要重新赋值给 read 然后读取
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}

for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}

理解了前面几个方法,range 就很简单了,就只需要知道如果已经修改过的时候需要遍历的是 dirty 的,因为 dirty 才是最新的

设计原理

那么 sync.map 的设计原理是什么呢?其实非常简单:

  • 读写分离,读取大多数情况下会直接读 read 不需要加锁
  • 懒更新,删除标记一下不需要额外操作其他数据结构

所以整体设计真的非常简单易懂,没有复杂的数据结构,当然这样的设计其实性能上并不会满足所有情况的要求

总结

实际使用

其实我实际在很多代码中更多的时候看到的是,使用 sync.Mutex 或者 RWMutex 来实现的,如:https://blog.golang.org/maps#TOC_6. 一方面我 sync.Map 使用起来必须进行 interface 转换,代码写起来比较麻烦,还需要额外进行封装一层;还有就是当前性能还没有那么极致的追求,所以很多时候也够用。

适用场景

  1. 读多写少的场景
  2. 多个goroutine读/写/修改的key集合没有交集的场景
  3. 压测后 sync.Map 确实能带来性能提升的场景

其他场景其实个人也并不建议去使用

为什么 go 不采用类似 java 的实现

其实肯定有人已经尝试过了 https://github.com/orcaman/concurrent-map 但是不采纳的原因也很简单,go 官方一直秉承的就是 less is more 他们很多地方都认为简单一点是好事,一旦复杂了,就会变得原来越难以维护。其实个人认为还是等待泛型的出现,否则轮子要重新造一次,有点累。