一、漏桶算法
算法思想
與令牌桶是“反向”的算法,當有請求到來時先放到木桶中,worker以固定的速度從木桶中取出請求進行相應。如果木桶已經滿了,直接返回請求頻率超限的錯誤碼或者頁面
適用場景
流量最均勻的限流方式,一般用于流量“整形”,例如保護數據庫的限流。先把對數據庫的訪問加入到木桶中,worker再以db能夠承受的qps從木桶中取出請求,去訪問數據庫。不太適合電商搶購和微博出現熱點事件等場景的限流,一是應對突發流量不是很靈活,二是為每個user_id/ip維護一個隊列(木桶),workder從這些隊列中拉取任務,資源的消耗會比較大。
go語言實現
通常使用隊列來實現,在go語言中可以通過buffered channel來快速實現,任務加入channel,開啟一定數量的worker從channel中獲取任務執行。
package?main import?( ?"fmt" ?"sync" ?"time" ) //?每個請求來了,把需要執行的業務邏輯封裝成Task,放入木桶,等待worker取出執行 type?Task?struct?{ ?handler?func()?Result?//?worker從木桶中取出請求對象后要執行的業務邏輯函數 ?resChan?chan?Result???//?等待worker執行并返回結果的channel ?taskID??int } //?封裝業務邏輯的執行結果 type?Result?struct?{ } //?模擬業務邏輯的函數 func?handler()?Result?{ ?time.Sleep(300?*?time.Millisecond) ?return?Result{} } func?NewTask(id?int)?Task?{ ?return?Task{ ??handler:?handler, ??resChan:?make(chan?Result), ??taskID:??id, ?} } //?漏桶 type?LeakyBucket?struct?{ ?BucketSize?int???????//?木桶的大小 ?NumWorker??int???????//?同時從木桶中獲取任務執行的worker數量 ?bucket?????chan?Task?//?存方任務的木桶 } func?NewLeakyBucket(bucketSize?int,?numWorker?int)?*LeakyBucket?{ ?return?&LeakyBucket{ ??BucketSize:?bucketSize, ??NumWorker:??numWorker, ??bucket:?????make(chan?Task,?bucketSize), ?} } func?(b?*LeakyBucket)?validate(task?Task)?bool?{ ?//?如果木桶已經滿了,返回false ?select?{ ?case?b.bucket?<-?task: ?default: ??fmt.Printf("request[id=%d]?is?refused ",?task.taskID) ??return?false ?} ?//?等待worker執行 ?<-task.resChan ?fmt.Printf("request[id=%d]?is?run ",?task.taskID) ?return?true } func?(b?*LeakyBucket)?Start()?{ ?//?開啟worker從木桶拉取任務執行 ?go?func()?{ ??for?i?:=?0;?i?二、令牌桶算法
算法思想
想象有一個木桶,以固定的速度往木桶里加入令牌,木桶滿了則不再加入令牌。服務收到請求時嘗試從木桶中取出一個令牌,如果能夠得到令牌則繼續執行后續的業務邏輯;如果沒有得到令牌,直接返回反問頻率超限的錯誤碼或頁面等,不繼續執行后續的業務邏輯
特點:由于木桶內只要有令牌,請求就可以被處理,所以令牌桶算法可以支持突發流量。同時由于往木桶添加令牌的速度是固定的,且木桶的容量有上限,所以單位時間內處理的請求書也能夠得到控制,起到限流的目的。假設加入令牌的速度為 1token/10ms,桶的容量為500,在請求比較的少的時候(小于每10毫秒1個請求)時,木桶可以先"攢"一些令牌(最多500個)。當有突發流量時,一下把木桶內的令牌取空,也就是有500個在并發執行的業務邏輯,之后要等每10ms補充一個新的令牌才能接收一個新的請求。
參數設置:木桶的容量 - 考慮業務邏輯的資源消耗和機器能承載并發處理多少業務邏輯。生成令牌的速度 - 太慢的話起不到“攢”令牌應對突發流量的效果。
適用場景:適合電商搶購或者微博出現熱點事件這種場景,因為在限流的同時可以應對一定的突發流量。如果采用均勻速度處理請求的算法,在發生熱點時間的時候,會造成大量的用戶無法訪問,對用戶體驗的損害比較大。
go語言實現:假設每100ms生產一個令牌,按user_id/IP記錄訪問最近一次訪問的時間戳 t_last 和令牌數,每次請求時如果 now - last > 100ms, 增加 (now - last) / 100ms個令牌。然后,如果令牌數 > 0,令牌數 -1 繼續執行后續的業務邏輯,否則返回請求頻率超限的錯誤碼或頁面。
package?main import?( ?"fmt" ?"sync" ?"time" ) //?并發訪問同一個user_id/ip的記錄需要上鎖 var?recordMu?map[string]*sync.RWMutex func?init()?{ ?recordMu?=?make(map[string]*sync.RWMutex) } func?max(a,?b?int)?int?{ ?if?a?>?b?{ ??return?a ?} ?return?b } type?TokenBucket?struct?{ ?BucketSize int //?木桶內的容量:最多可以存放多少個令牌 ?TokenRate?time.Duration?//?多長時間生成一個令牌 ?records?map[string]*record?//?報錯user_id/ip的訪問記錄 } //?上次訪問時的時間戳和令牌數 type?record?struct?{ ?last?time.Time ?token?int } func?NewTokenBucket(bucketSize?int,?tokenRate?time.Duration)?*TokenBucket?{ ?return?&TokenBucket{ ??BucketSize:?bucketSize, ??TokenRate:??tokenRate, ??records:????make(map[string]*record), ?} } func?(t?*TokenBucket)?getUidOrIp()?string?{ ?//?獲取請求用戶的user_id或者ip地址 ?return?"127.0.0.1" } //?獲取這個user_id/ip上次訪問時的時間戳和令牌數 func?(t?*TokenBucket)?getRecord(uidOrIp?string)?*record?{ ?if?r,?ok?:=?t.records[uidOrIp];?ok?{ ??return?r ?} ?return?&record{} } //?保存user_id/ip最近一次請求時的時間戳和令牌數量 func?(t?*TokenBucket)?storeRecord(uidOrIp?string,?r?*record)?{ ?t.records[uidOrIp]?=?r } //?驗證是否能獲取一個令牌 func?(t?*TokenBucket)?validate(uidOrIp?string)?bool?{ ?//?并發修改同一個用戶的記錄上寫鎖 ?rl,?ok?:=?recordMu[uidOrIp] ?if?!ok?{ ??var?mu?sync.RWMutex ??rl?=?&mu ??recordMu[uidOrIp]?=?rl ?} ?rl.Lock() ?defer?rl.Unlock() ?r?:=?t.getRecord(uidOrIp) ?now?:=?time.Now() ?if?r.last.IsZero()?{ ??//?第一次訪問初始化為最大令牌數 ??r.last,?r.token?=?now,?t.BucketSize ?}?else?{ ??if?r.last.Add(t.TokenRate).Before(now)?{ ???//?如果與上次請求的間隔超過了token?rate ???//?則增加令牌,更新last ???r.token?+=?max(int(now.Sub(r.last)?/?t.TokenRate),?t.BucketSize) ???r.last?=?now ??} ?} ?var?result?bool ?if?r.token?>?0?{ ??//?如果令牌數大于1,取走一個令牌,validate結果為true ??r.token-- ??result?=?true ?} ?//?保存最新的record ?t.storeRecord(uidOrIp,?r) ?return?result } //?返回是否被限流 func?(t?*TokenBucket)?IsLimited()?bool?{ ?return?!t.validate(t.getUidOrIp()) } func?main()?{ ?tokenBucket?:=?NewTokenBucket(5,?100*time.Millisecond) ?for?i?:=?0;?i6;?i++?{ ??fmt.Println(tokenBucket.IsLimited()) ?} ?time.Sleep(100?*?time.Millisecond) ?fmt.Println(tokenBucket.IsLimited()) }三、滑動時間窗口算法
算法思想
滑動時間窗口算法,是從對普通時間窗口計數的優化。使用普通時間窗口時,我們會為每個user_id/ip維護一個KV: uidOrIp: timestamp_requestCount。假設限制1秒1000個請求,那么第100ms有一個請求,這個KV變成 uidOrIp: timestamp_1,遞200ms有1個請求,我們先比較距離記錄的timestamp有沒有超過1s,如果沒有只更新count,此時KV變成 uidOrIp: timestamp_2。當第1100ms來一個請求時,更新記錄中的timestamp并重置計數,KV變成 uidOrIp: newtimestamp_1 普通時間窗口有一個問題,假設有500個請求集中在前1s的后100ms,500個請求集中在后1s的前100ms,其實在這200ms沒就已經請求超限了,但是由于時間窗每經過1s就會重置計數,就無法識別到此時的請求超限。
對于滑動時間窗口,我們可以把1ms的時間窗口劃分成10個time slot, 每個time slot統計某個100ms的請求數量。每經過100ms,有一個新的time slot加入窗口,早于當前時間100ms的time slot出窗口。窗口內最多維護10個time slot,儲存空間的消耗同樣是比較低的。
適用場景
與令牌桶一樣,有應對突發流量的能力
go語言實現
主要就是實現sliding window算法。可以參考Bilibili開源的kratos框架里circuit breaker用循環列表保存time slot對象的實現,他們這個實現的好處是不用頻繁的創建和銷毀time slot對象。下面給出一個簡單的基本實現:
package?main import?( ?"fmt" ?"sync" ?"time" ) var?winMu?map[string]*sync.RWMutex func?init()?{ ?winMu?=?make(map[string]*sync.RWMutex) } type?timeSlot?struct?{ ?timestamp?time.Time?//?這個timeSlot的時間起點 ?count?????int???????//?落在這個timeSlot內的請求數 } func?countReq(win?[]*timeSlot)?int?{ ?var?count?int ?for?_,?ts?:=?range?win?{ ??count?+=?ts.count ?} ?return?count } type?SlidingWindowLimiter?struct?{ ?SlotDuration?time.Duration?//?time?slot的長度 ?WinDuration??time.Duration?//?sliding?window的長度 ?numSlots?????int???????????//?window內最多有多少個slot ?windows??????map[string][]*timeSlot ?maxReq???????int?//?win?duration內允許的最大請求數 } func?NewSliding(slotDuration?time.Duration,?winDuration?time.Duration,?maxReq?int)?*SlidingWindowLimiter?{ ?return?&SlidingWindowLimiter{ ??SlotDuration:?slotDuration, ??WinDuration:??winDuration, ??numSlots:?????int(winDuration?/?slotDuration), ??windows:??????make(map[string][]*timeSlot), ??maxReq:???????maxReq, ?} } //?獲取user_id/ip的時間窗口 func?(l?*SlidingWindowLimiter)?getWindow(uidOrIp?string)?[]*timeSlot?{ ?win,?ok?:=?l.windows[uidOrIp] ?if?!ok?{ ??win?=?make([]*timeSlot,?0,?l.numSlots) ?} ?return?win } func?(l?*SlidingWindowLimiter)?storeWindow(uidOrIp?string,?win?[]*timeSlot)?{ ?l.windows[uidOrIp]?=?win } func?(l?*SlidingWindowLimiter)?validate(uidOrIp?string)?bool?{ ?//?同一user_id/ip并發安全 ?mu,?ok?:=?winMu[uidOrIp] ?if?!ok?{ ??var?m?sync.RWMutex ??mu?=?&m ??winMu[uidOrIp]?=?mu ?} ?mu.Lock() ?defer?mu.Unlock() ?win?:=?l.getWindow(uidOrIp) ?now?:=?time.Now() ?//?已經過期的time?slot移出時間窗 ?timeoutOffset?:=?-1 ?for?i,?ts?:=?range?win?{ ??if?ts.timestamp.Add(l.WinDuration).After(now)?{ ???break ??} ??timeoutOffset?=?i ?} ?if?timeoutOffset?>?-1?{ ??win?=?win[timeoutOffset+1:] ?} ?//?判斷請求是否超限 ?var?result?bool ?if?countReq(win)??0?{ ??lastSlot?=?win[len(win)-1] ??if?lastSlot.timestamp.Add(l.SlotDuration).Before(now)?{ ???lastSlot?=?&timeSlot{timestamp:?now,?count:?1} ???win?=?append(win,?lastSlot) ??}?else?{ ???lastSlot.count++ ??} ?}?else?{ ??lastSlot?=?&timeSlot{timestamp:?now,?count:?1} ??win?=?append(win,?lastSlot) ?} ?l.storeWindow(uidOrIp,?win) ?return?result } func?(l?*SlidingWindowLimiter)?getUidOrIp()?string?{ ?return?"127.0.0.1" } func?(l?*SlidingWindowLimiter)?IsLimited()?bool?{ ?return?!l.validate(l.getUidOrIp()) } func?main()?{ ?limiter?:=?NewSliding(100*time.Millisecond,?time.Second,?10) ?for?i?:=?0;?i?5;?i++?{ ??fmt.Println(limiter.IsLimited()) ?} ?time.Sleep(100?*?time.Millisecond) ?for?i?:=?0;?i?5;?i++?{ ??fmt.Println(limiter.IsLimited()) ?} ?fmt.Println(limiter.IsLimited()) ?for?_,?v?:=?range?limiter.windows[limiter.getUidOrIp()]?{ ??fmt.Println(v.timestamp,?v.count) ?} ?fmt.Println("a?thousand?years?later...") ?time.Sleep(time.Second) ?for?i?:=?0;?i?7;?i++?{ ??fmt.Println(limiter.IsLimited()) ?} ?for?_,?v?:=?range?limiter.windows[limiter.getUidOrIp()]?{ ??fmt.Println(v.timestamp,?v.count) ?} }編輯:黃飛
評論
查看更多