Goroutine Pool 前言
近期對於 golang 的 goroutine 非常感興趣,因為只要在 func 面前下一個關鍵字 go dosomething()
,就能過用同步的方式來達到異步的效果。
然而,goroutine 是額外開啟一個輕量化的執行緒去執行我們的任務,如果需要在一個大量開啟 goroutines 情境下使用的話,系統很容易因為無節制的使用 goroutines 而大規模進行擴充與銷毀,這樣會造成 GC (Garbage Collection) 產生大量的壓力,從而影響我們的系統整體的效能。
並且 runtime
和 GC
都是 goroutine,如果 goroutine 的規模太過於龐大,記憶體無法有效進行調度, Go Scheduler
就會阻塞 goroutine,導致 Processor
Local Queue 累積,造成記憶體溢出,最後有可能整個行程直接崩潰。
介紹 一般來說,如果我們要避免我們的 goroutine 無限制的持續增長
導致 process
崩潰,那麼就必需在使用前 提前限制使用的數量
,如此一來,就能解決無限制增長的問題。
可是若是這樣單單的限制數量還不夠,因為 goroutine 是會被 擴充
和 銷毀
,是不是一旦建立後,先 保存起來重複使用
,等待一定時間再進行銷毀
,就可以縮小 runtime 和 GC 運作的壓力。
假設現在有 100萬
個請求要併發執行,我們其實可以不用同等的規模建立100萬個 goroutine (硬體可能不允許
),但我們可以建立 5萬個
再低一點甚至是 1萬個
也可以,只要平均把請求分配至這 1萬個
goroutine 同樣可以在 系統穩定的前提下
慢慢完成。
設計思路
初始化一個 Goroutine Pool
,設定 最大
的 Worker 數量。
該 Pool 是一個以 Circular Queue
實現 FIFO
的資料結構,主要負責存放要處理 task 的 Worker。
檢查 Pool 中是否有可用的 Worker。
有,取出 Worker 執行 task。
沒有,判斷 Worker 數量是否超出 Pool 的最大限制。
是,判斷 Pool 是否為 Non-Blocking。
是,直接返回 nil
。
否,Blocking 等待 Worker 被放回 Pool。
否,創建
一個新的 goroutine 作為 Worker 來執行 task。
當 Worker 執行完任務,把它放回 Pool 中。
定時將 閒置的 Worker
清理。
流程圖如下所示:
Goroutine Pool 實作 定義 Pool struct
capacity
是我們 Pool 容量 worker 的最大數量
running
是 正在執行 worker 數量
waiting
是 任務阻塞等待的數量
workers
是使用陣列實作 circular queue
的資料結構
state
是當 Pool 的狀態,Release()
後會是關閉的,Reboot()
則是會重啟
lock
是為了避免操作 worker 時候產生 data race
cond
是要讓多出來的任務無法被 worker 馬上消化時,可以 阻塞
等待的一個類似 waitGroup
的功能
workerCache
是要讓 worker 初始化
或 不執行時
,可以重複使用的一個 cache
clearDone
、stopClear
是用來表示 定時清理
的狀態和 cancel function
options
是 Pool 裡的設定
pool.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 type Pool struct { capacity int32 running int32 waiting int32 workers workerQueue state int32 lock sync.Mutex cond *sync.Cond workerCache sync.Pool clearDone int32 stopClear context.CancelFunc options *Options }
定義 Options
,客製化額外需求 此功能主要是能夠讓使用者自主設定需要的功能
,若使用者不設定 Pool 也會有自己的預設模式。
option.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 package grpoolimport "time" type Option func (opts *Options) func loadOptions (options ...Option) *Options { opts := new (Options) for _, option := range options { option(opts) } return opts } type Options struct { ExpiryDuration time.Duration PreAlloc bool Nonblocking bool PanicHandler func (interface {}) DisableClear bool } func WithOptions (options Options) Option { return func (opts *Options) { *opts = options } } func WithExpiryDuration (expiryDuration time.Duration) Option { return func (opts *Options) { opts.ExpiryDuration = expiryDuration } } func WithPreAlloc (preAlloc bool ) Option { return func (opts *Options) { opts.PreAlloc = preAlloc } } func WithNonblocking (nonblocking bool ) Option { return func (opts *Options) { opts.Nonblocking = nonblocking } } func WithPanicHandler (panicHandler func (interface {}) ) Option { return func (opts *Options) { opts.PanicHandler = panicHandler } } func WithDisableClear (disable bool ) Option { return func (opts *Options) { opts.DisableClear = disable } }
初始化 Pool,加載 Options
並開啟自動清理 pool.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func NewPool (size int , options ...Option) (*Pool, error) { opts := loadOptions(options...) if !opts.DisableClear { if expiry := opts.ExpiryDuration; expiry < 0 { return nil , ErrInvalidPoolExpiry } else if expiry == 0 { opts.ExpiryDuration = DefaultCleanIntervalTime } } if size <= 0 { size = DefaultPoolSize } p := &Pool{ capacity: int32 (size), options: opts, } p.workerCache.New = func () interface {} { return &Worker{ pool: p, task: make (chan func () , workerChanCap ), } } p.cond = sync.NewCond(&p.lock) p.workers = newWorkerCircularQueue(size, p.options.PreAlloc) p.goClear() return p, nil }
定時清理過期的 Worker ctx, p.stopClear = context.WithCancel(context.Background())
這是一個可以讓 goroutine 優雅取消的方式,只要呼叫 p.stopClear()
,ctx.Done()
就會被傳入通知取消工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func (p *Pool) goClear () { if p.options.DisableClear { return } var ctx context.Context ctx, p.stopClear = context.WithCancel(context.Background()) go p.ClearStaleWorkers(ctx) } func (p *Pool) ClearStaleWorkers (ctx context.Context) { ticker := time.NewTicker(p.options.ExpiryDuration) defer func () { ticker.Stop() atomic.StoreInt32(&p.clearDone, 1 ) }() for { select { case <-ctx.Done(): return case <-ticker.C: } if p.IsClosed() { break } p.lock.Lock() staleWorkers := p.workers.refresh(p.options.ExpiryDuration) p.lock.Unlock() for i := range staleWorkers { staleWorkers[i].finish() staleWorkers[i] = nil } } }
Pool 的基本操作,Pool 的狀態操作 這邊使用 "sync/atomic"
這個函式庫是為了方便以原子性的方式執行狀態操作,避免 data race 和一直開關鎖。
pool.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (p *Pool) Cap () int { return int (atomic.LoadInt32(&p.capacity)) } func (p *Pool) Free () int { c := p.Cap() if c < 0 { return -1 } return c - p.Running() } func (p *Pool) Waiting () int { return int (atomic.LoadInt32(&p.waiting)) } func (p *Pool) Running () int { return int (atomic.LoadInt32(&p.running)) } func (p *Pool) addWaiting (delta int ) { atomic.AddInt32(&p.waiting, int32 (delta)) } func (p *Pool) addRunning (delta int ) { atomic.AddInt32(&p.running, int32 (delta)) } func (p *Pool) IsClosed () bool { return atomic.LoadInt32(&p.state) == CLOSED }
Pool 執行任務 pool.go
1 2 3 4 5 6 7 8 9 10 11 12 13 func (p *Pool) Schedule (task func () ) error { if p.IsClosed() { return ErrPoolClosed } if w := p.getWorker(); w != nil { w.inputFunc(task) return nil } return ErrPoolOverload }
Pool 獲取/放回 Worker 一開始,可以查看看 workerCache
中有沒有 worker,可以節省 重複初始化 worker 的操作
,再來就是向 worker 取用一個實例。
如果 worker 有 正常取到
,那就直接返回使用。
如果沒有 worker,那就判斷目前 Pool 大小是否允許再創建一個 worker,可以的話就 直接創建
。
若都不行,就要 阻塞等待
其他 worker 處理完任務後,返回 Pool 時取用。
pool.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 func (p *Pool) getWorker () (w worker) { genWorker := func () { if w = p.workerCache.Get().(*Worker); w == nil { w = &Worker{ pool: p, task: make (chan func () , workerChanCap ), } } w.run() } p.lock.Lock() if w = p.workers.detach(); w != nil { p.lock.Unlock() return } if cap := p.Cap(); cap > p.Running() { p.lock.Unlock() genWorker() } else { if p.options.Nonblocking { p.lock.Unlock() return } for { p.addWaiting(1 ) p.cond.Wait() p.addWaiting(-1 ) if p.IsClosed() { p.lock.Unlock() return } if w = p.workers.detach(); w != nil { p.lock.Unlock() return } else if w == nil && p.Free() > 0 { p.lock.Unlock() genWorker() return } } } return } func (p *Pool) putWorker (worker *Worker) bool { cap := p.Cap() if (cap > 0 && p.Running() > cap ) || p.IsClosed() { p.cond.Broadcast() return false } worker.lastUpdatedTime = time.Now() p.lock.Lock() defer p.lock.Unlock() if p.IsClosed() { return false } if err := p.workers.insert(worker); err != nil { return false } p.cond.Signal() return true }
Pool 關閉與開啟 pool.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (p *Pool) Release () { if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { return } if p.stopClear != nil { p.stopClear() p.stopClear = nil } p.lock.Lock() p.workers.reset() p.lock.Unlock() p.cond.Broadcast() } func (p *Pool) Reboot () { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { p.workers = newWorkerCircularQueue(int (p.capacity), p.options.PreAlloc) atomic.StoreInt32(&p.clearDone, 0 ) p.goClear() } }
Worker
的定義
run()
:執行並等待任務派發
finish()
:通知 worker 休息,並不用回收
getLastUpdatedTime()
:獲取 worker 最近執行時間
inputFunc()
:添加任務至正在執行的 worker
worker.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 package grpoolimport ( "fmt" "runtime/debug" "time" ) type worker interface { run() finish() getLastUpdatedTime() time.Time inputFunc(func () ) } type Worker struct { pool *Pool task chan func () lastUpdatedTime time.Time } func (w *Worker) run () { w.pool.addRunning(1 ) go func () { defer func () { w.pool.addRunning(-1 ) w.pool.workerCache.Put(w) if p := recover (); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { ph(p) } else { fmt.Printf("worker exited from panic: %v\n%s\n" , p, debug.Stack()) } } w.pool.cond.Signal() }() for f := range w.task { if f == nil { return } f() if ok := w.pool.putWorker(w); !ok { return } } }() } func (w *Worker) finish () { w.task <- nil } func (w *Worker) getLastUpdatedTime () time .Time { return w.lastUpdatedTime } func (w *Worker) inputFunc (fn func () ) { w.task <- fn }
Worker Queue
的實現
len()
:獲取目前 worker 在 Queue 中的數量
isEmpty()
:判斷目前 Queue 中是否為空
insert()
:插入一個 worker 至 Queue 裡
detach()
:從 Queue 獲取一個 worker
refresh()
:清理 Queue 中 過期的 worker
reset()
:重置整個 Queue
worker_circular_queue.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 package grpoolimport ( "errors" "time" ) var ( errQueueIsFull = errors.New("the circular queue is full" ) errQueueIsReleased = errors.New("the circular queue length is zero" ) ) type workerQueue interface { len () int isEmpty() bool insert(worker) error detach() worker refresh(duration time.Duration) []worker reset() } type circularQueue struct { items []worker expiry []worker head int tail int size int isFull bool isPreAlloc bool } func newWorkerCircularQueue (size int , preAlloc bool ) *circularQueue { if preAlloc { return &circularQueue{ items: make ([]worker, size), size: size, isPreAlloc: preAlloc, } } return &circularQueue{ items: make ([]worker, 0 , size), size: size, isPreAlloc: preAlloc, } } func (wq *circularQueue) len () int { if wq.size == 0 || wq.isEmpty() { return 0 } if wq.head == wq.tail && wq.isFull { return wq.size } if wq.tail > wq.head { return wq.tail - wq.head } return wq.size - wq.head + wq.tail } func (wq *circularQueue) isEmpty () bool { return wq.head == wq.tail && !wq.isFull } func (wq *circularQueue) insert (w worker) error { if wq.size == 0 { return errQueueIsReleased } if wq.isFull { return errQueueIsFull } if !wq.isPreAlloc && cap (wq.items) < wq.size { wq.items = append (wq.items, w) } else { wq.items[wq.tail] = w } wq.tail = (wq.tail + 1 ) % wq.size if wq.tail == wq.head { wq.isFull = true } return nil } func (wq *circularQueue) detach () worker { if wq.isEmpty() { return nil } w := wq.items[wq.head] wq.items[wq.head] = nil wq.head = (wq.head + 1 ) % wq.size wq.isFull = false return w } func (wq *circularQueue) refresh (duration time.Duration) []worker { expiryTime := time.Now().Add(-duration) index := wq.binarySearch(expiryTime) if index == -1 { return nil } wq.expiry = wq.expiry[:0 ] if wq.head <= index { wq.expiry = append (wq.expiry, wq.items[wq.head:index+1 ]...) for i := wq.head; i < index+1 ; i++ { wq.items[i] = nil } } else { wq.expiry = append (wq.expiry, wq.items[0 :index+1 ]...) wq.expiry = append (wq.expiry, wq.items[wq.head:]...) for i := 0 ; i < index+1 ; i++ { wq.items[i] = nil } for i := wq.head; i < wq.size; i++ { wq.items[i] = nil } } head := (index + 1 ) % wq.size wq.head = head if len (wq.expiry) > 0 { wq.isFull = false } return wq.expiry } func (wq *circularQueue) binarySearch (expiryTime time.Time) int { var mid, nlen, basel, tmid int nlen = len (wq.items) if wq.isEmpty() || expiryTime.Before(wq.items[wq.head].getLastUpdatedTime()) { return -1 } r := (wq.tail - 1 - wq.head + nlen) % nlen basel = wq.head l := 0 for l <= r { mid = l + ((r - l) >> 1 ) tmid = (mid + basel + nlen) % nlen if expiryTime.Before(wq.items[tmid].getLastUpdatedTime()) { r = mid - 1 } else { l = mid + 1 } } return (r + basel + nlen) % nlen } func (wq *circularQueue) reset () { if wq.isEmpty() { return } for { if w := wq.detach(); w != nil { w.finish() continue } break } wq.items = wq.items[:0 ] wq.expiry = wq.expiry[:0 ] wq.size = 0 wq.head = 0 wq.tail = 0 wq.isFull = false wq.isPreAlloc = false }
Benchmark 10萬個任務,10萬個 worker
100萬個任務,10萬個 worker
1000萬個任務,10萬個 worker
結論 本篇文章是參考潘少 - GMP 并发调度器深度解析之手撸一个高性能 goroutine pool 和 Viney Tai-Li Shih - 那些年我們追的 Goroutine Pool 進而實現的 goroutine pool,通過 限制 goroutines 數量
、 重複使用 goroutines
來達到效能的提升。
其中,我將一般的 stack
方式改成使用 circular queue
進而避免陣列中不斷對記憶體申請產生損耗,在大量任務要執行的時候有產生一定的作用。
在 benchmark 中,同等 worker 處理同等任務時可以獲得更快處理時間(20%
)、更少的記憶體使用空間(1000%
),但是當任務數量大於 worker 數量時,處理時間就會開始變慢(-30%
),記憶體人仍然保持著一定的優勢。或許,變慢可能是我在設計的時候處理鎖處理的不適很恰當的原因。
本文範例程式碼在 GitHub 上的 POABOB/grpool 。
參考文獻
https://strikefreedom.top/archives/high-performance-implementation-of-goroutine-pool#toc-head-18
https://github.com/panjf2000/ants
https://cloud.tencent.com/developer/article/1918236
https://www.readfog.com/a/1631287186498359296
https://medium.com/17live-tech/%E9%82%A3%E4%BA%9B%E5%B9%B4%E6%88%91%E5%80%91%E8%BF%BD%E7%9A%84-goroutine-pool-e8d211757ee