POABOB

小小工程師的筆記分享

0%

如何實作 Goroutine Pool?

Goroutine Pool

前言

近期對於 golang 的 goroutine 非常感興趣,因為只要在 func 面前下一個關鍵字 go dosomething(),就能過用同步的方式來達到異步的效果。

然而,goroutine 是額外開啟一個輕量化的執行緒去執行我們的任務,如果需要在一個大量開啟 goroutines 情境下使用的話,系統很容易因為無節制的使用 goroutines 而大規模進行擴充與銷毀,這樣會造成 GC (Garbage Collection) 產生大量的壓力,從而影響我們的系統整體的效能。

並且 runtimeGC 都是 goroutine,如果 goroutine 的規模太過於龐大,記憶體無法有效進行調度, Go Scheduler 就會阻塞 goroutine,導致 Processor Local Queue 累積,造成記憶體溢出,最後有可能整個行程直接崩潰。

介紹

一般來說,如果我們要避免我們的 goroutine 無限制的持續增長 導致 process 崩潰,那麼就必需在使用前 提前限制使用的數量,如此一來,就能解決無限制增長的問題。

可是若是這樣單單的限制數量還不夠,因為 goroutine 是會被 擴充銷毀,是不是一旦建立後,先 保存起來重複使用等待一定時間再進行銷毀,就可以縮小 runtime 和 GC 運作的壓力。

假設現在有 100萬 個請求要併發執行,我們其實可以不用同等的規模建立100萬個 goroutine (硬體可能不允許),但我們可以建立 5萬個 再低一點甚至是 1萬個 也可以,只要平均把請求分配至這 1萬個 goroutine 同樣可以在 系統穩定的前提下 慢慢完成。

設計思路

  1. 初始化一個 Goroutine Pool,設定 最大 的 Worker 數量。
  2. 該 Pool 是一個以 Circular Queue 實現 FIFO 的資料結構,主要負責存放要處理 task 的 Worker。
  3. 檢查 Pool 中是否有可用的 Worker。
    1. 有,取出 Worker 執行 task。
    2. 沒有,判斷 Worker 數量是否超出 Pool 的最大限制。
      1. 是,判斷 Pool 是否為 Non-Blocking。
        1. 是,直接返回 nil
        2. 否,Blocking 等待 Worker 被放回 Pool。
      2. 否,創建 一個新的 goroutine 作為 Worker 來執行 task。
        1. 當 Worker 執行完任務,把它放回 Pool 中。
        2. 定時將 閒置的 Worker 清理。

流程圖如下所示:

Goroutine Pool 實作

定義 Pool struct

  • capacity 是我們 Pool 容量 worker 的最大數量
  • running正在執行 worker 數量
  • waiting任務阻塞等待的數量
  • workers 是使用陣列實作 circular queue 的資料結構
  • state 是當 Pool 的狀態,Release() 後會是關閉的,Reboot() 則是會重啟
  • lock 是為了避免操作 worker 時候產生 data race
  • cond 是要讓多出來的任務無法被 worker 馬上消化時,可以 阻塞 等待的一個類似 waitGroup 的功能
  • workerCache 是要讓 worker 初始化不執行時,可以重複使用的一個 cache
  • clearDonestopClear 是用來表示 定時清理 的狀態和 cancel function
  • options 是 Pool 裡的設定

pool.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type Pool struct {
// pool 容量
capacity int32

// 正在執行的goroutine
running int32

// Blocking 的 task 數
waiting int32

// 閒置的Workers
workers workerQueue

// 警告Pool要自己close
state int32

// 鎖
lock sync.Mutex

// worker 等待的鎖
cond *sync.Cond

// 回收使用過的Worker
workerCache sync.Pool

// Clear 是否完成
clearDone int32
stopClear context.CancelFunc

options *Options
}

定義 Options,客製化額外需求

此功能主要是能夠讓使用者自主設定需要的功能,若使用者不設定 Pool 也會有自己的預設模式。

option.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package grpool

import "time"

// 參數設定
type Option func(opts *Options)

// 加載設定
func loadOptions(options ...Option) *Options {
opts := new(Options)
for _, option := range options {
option(opts)
}
return opts
}

// 該設定會被用來引入至 Pool 中
type Options struct {
// 過期時間: 用於定時清理過期的 Worker (只要太久沒被使用的 Worker 就會被清理),預設為 1 秒
ExpiryDuration time.Duration

// 是否提前申請空間,大量執行需求中使用
PreAlloc bool

// Nonblocking 用來阻塞任務
// 若設定為 true,就會返回 ErrPoolOverload 錯誤
Nonblocking bool

// 用來處理 worker panic 發生的事件
PanicHandler func(interface{})

// 若設定為 true,Worker 就不會被自動清除
DisableClear bool
}

// 直接傳入 Options
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}

// 設定過期時間
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}

// 設定是否要提前創建空間
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}

// 若為 true,代表沒有有用的 Worker 時,會直接 ErrPoolOverload
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}

// Panic 事件處理
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}

// 是否要關閉 Clear
func WithDisableClear(disable bool) Option {
return func(opts *Options) {
opts.DisableClear = disable
}
}

初始化 Pool,加載 Options 並開啟自動清理

pool.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 初始化
func NewPool(size int, options ...Option) (*Pool, error) {

// 加載設定
opts := loadOptions(options...)

if !opts.DisableClear {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}

// 如果 size 不是一個有效的 Size 就使用 DefaultPoolSize
if size <= 0 {
size = DefaultPoolSize
}

// init
p := &Pool{
capacity: int32(size),
options: opts,
}

// 提前申請 worker
p.workerCache.New = func() interface{} {
return &Worker{
pool: p,
task: make(chan func(), workerChanCap),
}
}

p.cond = sync.NewCond(&p.lock)

// 初始化 circular queue
p.workers = newWorkerCircularQueue(size, p.options.PreAlloc)

// 定期清理過期的worker,節省系統資源
p.goClear()

return p, nil
}

定時清理過期的 Worker

ctx, p.stopClear = context.WithCancel(context.Background())

這是一個可以讓 goroutine 優雅取消的方式,只要呼叫 p.stopClear()ctx.Done() 就會被傳入通知取消工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 開啟一個 goroutine 定時清理過期的 workers
func (p *Pool) goClear() {
// 若設定不開啟就返回
if p.options.DisableClear {
return
}

var ctx context.Context
// p.stopClear() 呼叫的時候,會觸發 ctx.Done() 讓 goroutine 停止
ctx, p.stopClear = context.WithCancel(context.Background())
go p.ClearStaleWorkers(ctx)
}

// 定時清理過期的 workers
func (p *Pool) ClearStaleWorkers(ctx context.Context) {
ticker := time.NewTicker(p.options.ExpiryDuration)

defer func() {
ticker.Stop()
atomic.StoreInt32(&p.clearDone, 1)
}()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}

if p.IsClosed() {
break
}

p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
p.lock.Unlock()

for i := range staleWorkers {
staleWorkers[i].finish()
staleWorkers[i] = nil
}
}
}

Pool 的基本操作,Pool 的狀態操作

這邊使用 "sync/atomic" 這個函式庫是為了方便以原子性的方式執行狀態操作,避免 data race 和一直開關鎖。

pool.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 獲取 Pool 容量
func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity))
}

// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
func (p *Pool) Free() int {
c := p.Cap()
if c < 0 {
return -1
}
return c - p.Running()
}

// Waiting returns the number of tasks which are waiting be executed.
func (p *Pool) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}

// 獲取正在執行的 Worker 數量
func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running))
}

func (p *Pool) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}

func (p *Pool) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
}

// 判斷是否被關閉
func (p *Pool) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED
}

Pool 執行任務

pool.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// 獲取 worker 執行任務
func (p *Pool) Schedule(task func()) error {
// 判斷Pool是否被關閉
if p.IsClosed() {
return ErrPoolClosed
}

if w := p.getWorker(); w != nil {
w.inputFunc(task)
return nil
}
return ErrPoolOverload
}

Pool 獲取/放回 Worker

一開始,可以查看看 workerCache 中有沒有 worker,可以節省 重複初始化 worker 的操作,再來就是向 worker 取用一個實例。

如果 worker 有 正常取到,那就直接返回使用。

如果沒有 worker,那就判斷目前 Pool 大小是否允許再創建一個 worker,可以的話就 直接創建

若都不行,就要 阻塞等待 其他 worker 處理完任務後,返回 Pool 時取用。

pool.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// 獲取 worker 執行任務
func (p *Pool) getWorker() (w worker) {
genWorker := func() {
if w = p.workerCache.Get().(*Worker); w == nil {
// 新開一個worker
w = &Worker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
w.run()
}

// 加鎖
p.lock.Lock()
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}

if cap := p.Cap(); cap > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
// 當前無可用worker,但是Pool沒有滿
genWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return
}

for {
p.addWaiting(1)
// 阻塞等待
p.cond.Wait()
p.addWaiting(-1)

if p.IsClosed() {
p.lock.Unlock()
return
}

if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
} else if w == nil && p.Free() > 0 {
p.lock.Unlock()
genWorker()
return
}
}
}

return
}

// 將 Worker 放回 Pool
func (p *Pool) putWorker(worker *Worker) bool {
// 避免 Worker 超出 Pool 容量,或是 Pool 已關閉
cap := p.Cap()
if (cap > 0 && p.Running() > cap) || p.IsClosed() {
p.cond.Broadcast()
return false
}

// 紀錄Woker最後一次運行時間
worker.lastUpdatedTime = time.Now()

p.lock.Lock()
defer p.lock.Unlock()
// 避免記憶體溢出
if p.IsClosed() {
return false
}

if err := p.workers.insert(worker); err != nil {
return false
}

// 把 Blocking 等待 worker 的 task 喚醒
p.cond.Signal()
return true
}

Pool 關閉與開啟

pool.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 清除 Pool 裡面的 Worker
func (p *Pool) Release() {
if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
return
}

if p.stopClear != nil {
p.stopClear()
p.stopClear = nil
}

p.lock.Lock()
p.workers.reset()
p.lock.Unlock()

// 通知阻塞的 goroutine 繼續執行
p.cond.Broadcast()
}

// 重啟一個可以使用的 Pool
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
// 重新啟用 Worker Queue
p.workers = newWorkerCircularQueue(int(p.capacity), p.options.PreAlloc)

atomic.StoreInt32(&p.clearDone, 0)
p.goClear()
}
}

Worker 的定義

  • run():執行並等待任務派發
  • finish():通知 worker 休息,並不用回收
  • getLastUpdatedTime():獲取 worker 最近執行時間
  • inputFunc():添加任務至正在執行的 worker

worker.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package grpool

import (
"fmt"
"runtime/debug"
"time"
)

type worker interface {
run()
finish()
getLastUpdatedTime() time.Time
inputFunc(func())
}

type Worker struct {
// 任務池
pool *Pool

// 任務 func() error
task chan func()

// 回收時間
lastUpdatedTime time.Time
}

func (w *Worker) run() {
w.pool.addRunning(1)
go func() {
// 回收 Pool 失敗或 worker 發生錯誤
defer func() {
w.pool.addRunning(-1)
// worker 放 cache 可以不用重新初始化
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
fmt.Printf("worker exited from panic: %v\n%s\n", p, debug.Stack())
}
}
// 喚醒 Blocking 的 task
w.pool.cond.Signal()
}()

// 監聽任務列表,有任務就拿出來執行
for f := range w.task {
// 被 finish()
if f == nil {
return
}

// 執行任務
f()

// 回收worker
if ok := w.pool.putWorker(w); !ok {
return
}
}
}()
}

func (w *Worker) finish() {
w.task <- nil
}

func (w *Worker) getLastUpdatedTime() time.Time {
return w.lastUpdatedTime
}

func (w *Worker) inputFunc(fn func()) {
w.task <- fn
}

Worker Queue 的實現

  • len():獲取目前 worker 在 Queue 中的數量
  • isEmpty():判斷目前 Queue 中是否為空
  • insert():插入一個 worker 至 Queue 裡
  • detach():從 Queue 獲取一個 worker
  • refresh():清理 Queue 中 過期的 worker
  • reset():重置整個 Queue

worker_circular_queue.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package grpool

import (
"errors"
"time"
)

var (
// Queue 滿了
errQueueIsFull = errors.New("the circular queue is full")

// Queue 已經關閉了
errQueueIsReleased = errors.New("the circular queue length is zero")
)

type workerQueue interface {
len() int
isEmpty() bool
insert(worker) error
detach() worker
refresh(duration time.Duration) []worker
reset()
}

// workerQueue 的實現
type circularQueue struct {
items []worker
expiry []worker
head int
tail int
size int
isFull bool
isPreAlloc bool
}

// 初始化 WorkerCircularQueue
func newWorkerCircularQueue(size int, preAlloc bool) *circularQueue {
if preAlloc {
return &circularQueue{
items: make([]worker, size),
size: size,
isPreAlloc: preAlloc,
}
}
return &circularQueue{
items: make([]worker, 0, size),
size: size,
isPreAlloc: preAlloc,
}
}

// 獲取 Queue 長度,因為是 Loop Queue,所以會有 head 指標 > tail 指標的情況很正常
func (wq *circularQueue) len() int {
// size == 0 || 頭尾一樣卻沒滿
if wq.size == 0 || wq.isEmpty() {
return 0
}

// 頭尾一樣滿了
if wq.head == wq.tail && wq.isFull {
return wq.size
}

if wq.tail > wq.head {
return wq.tail - wq.head
}

return wq.size - wq.head + wq.tail
}

// 判斷 Queue 是否為空
func (wq *circularQueue) isEmpty() bool {
return wq.head == wq.tail && !wq.isFull
}

// 插入一個 worker 進入 Queue
func (wq *circularQueue) insert(w worker) error {
// Pool 已經被關閉
if wq.size == 0 {
return errQueueIsReleased
}

// Pool 已經滿了
if wq.isFull {
return errQueueIsFull
}

// 增加 Worker
// 如果
if !wq.isPreAlloc && cap(wq.items) < wq.size {
wq.items = append(wq.items, w)
} else {
wq.items[wq.tail] = w
}
// 如果 tail == size,讓 tail 變 0
wq.tail = (wq.tail + 1) % wq.size

// 如果 tail == head 代表滿了
if wq.tail == wq.head {
wq.isFull = true
}

return nil
}

// 從 Queue 獲取一個 worker
func (wq *circularQueue) detach() worker {
if wq.isEmpty() {
return nil
}

// 獲取一個 Worker
w := wq.items[wq.head]
wq.items[wq.head] = nil // 避免記憶體溢出
// 如果 head == size,讓 head 變 0
wq.head = (wq.head + 1) % wq.size

wq.isFull = false

return w
}

// 重新整理 Queue,用於清理過期的 worker
func (wq *circularQueue) refresh(duration time.Duration) []worker {
expiryTime := time.Now().Add(-duration)
// 獲取過期 worker 的 index
index := wq.binarySearch(expiryTime)
if index == -1 {
return nil
}
wq.expiry = wq.expiry[:0]

if wq.head <= index {
// 找出從"頭"到 "index" 的 worker,因為 FIFO 的關係,越前面的沒有被使用代表有一端時間未使用了
wq.expiry = append(wq.expiry, wq.items[wq.head:index+1]...)
for i := wq.head; i < index+1; i++ {
wq.items[i] = nil
}
} else {
// E.g. size = 10, tail = 5, head = 8 的狀態
// 假設 index 是 3,那麼 8~9 和 0~3 的 worker 都要被清理,剩下 index 為 4, 5 的 worker
wq.expiry = append(wq.expiry, wq.items[0:index+1]...)
wq.expiry = append(wq.expiry, wq.items[wq.head:]...)
for i := 0; i < index+1; i++ {
wq.items[i] = nil
}
for i := wq.head; i < wq.size; i++ {
wq.items[i] = nil
}
}

// 清理過後,重新分配 head
head := (index + 1) % wq.size
wq.head = head
if len(wq.expiry) > 0 {
wq.isFull = false
}

// 返回這些過期 worker 要讓 pool 去手動 finish 它
return wq.expiry
}

// 二元搜尋,找出過期的 worker
func (wq *circularQueue) binarySearch(expiryTime time.Time) int {
var mid, nlen, basel, tmid int
nlen = len(wq.items)

// if no need to remove work, return -1
if wq.isEmpty() || expiryTime.Before(wq.items[wq.head].getLastUpdatedTime()) {
return -1
}

// example
// size = 8, head = 7, tail = 4
// [ 2, 3, 4, 5, nil, nil, nil, 1] true position
// 0 1 2 3 4 5 6 7
// tail head
//
// 1 2 3 4 nil nil nil 0 mapped position
// r l

// base algorithm is a copy from worker_stack
// map head and tail to effective left and right
r := (wq.tail - 1 - wq.head + nlen) % nlen
basel = wq.head
l := 0
for l <= r {
mid = l + ((r - l) >> 1)
// calculate true mid position from mapped mid position
tmid = (mid + basel + nlen) % nlen
if expiryTime.Before(wq.items[tmid].getLastUpdatedTime()) {
r = mid - 1
} else {
l = mid + 1
}
}
// return true position from mapped position
return (r + basel + nlen) % nlen
}

// 當 Pool 被 Release 後,就會觸發此方法,將所有 Worker queue 清理
func (wq *circularQueue) reset() {
if wq.isEmpty() {
return
}

// 逐一把 goroutin 解決
for {
if w := wq.detach(); w != nil {
w.finish()
continue
}
break
}

wq.items = wq.items[:0]
wq.expiry = wq.expiry[:0]
wq.size = 0
wq.head = 0
wq.tail = 0
wq.isFull = false
wq.isPreAlloc = false
}

Benchmark

10萬個任務,10萬個 worker

100萬個任務,10萬個 worker

1000萬個任務,10萬個 worker

結論

本篇文章是參考潘少 - GMP 并发调度器深度解析之手撸一个高性能 goroutine poolViney Tai-Li Shih - 那些年我們追的 Goroutine Pool 進而實現的 goroutine pool,通過 限制 goroutines 數量重複使用 goroutines 來達到效能的提升。

其中,我將一般的 stack 方式改成使用 circular queue 進而避免陣列中不斷對記憶體申請產生損耗,在大量任務要執行的時候有產生一定的作用。

在 benchmark 中,同等 worker 處理同等任務時可以獲得更快處理時間(20%)、更少的記憶體使用空間(1000%),但是當任務數量大於 worker 數量時,處理時間就會開始變慢(-30%),記憶體人仍然保持著一定的優勢。或許,變慢可能是我在設計的時候處理鎖處理的不適很恰當的原因。

本文範例程式碼在 GitHub 上的 POABOB/grpool

參考文獻

  • https://strikefreedom.top/archives/high-performance-implementation-of-goroutine-pool#toc-head-18
  • https://github.com/panjf2000/ants
  • https://cloud.tencent.com/developer/article/1918236
  • https://www.readfog.com/a/1631287186498359296
  • https://medium.com/17live-tech/%E9%82%A3%E4%BA%9B%E5%B9%B4%E6%88%91%E5%80%91%E8%BF%BD%E7%9A%84-goroutine-pool-e8d211757ee
------ 本文結束 ------