需求:第三方的接口,限制接口請(qǐng)求的QPS,每秒5次
需要控制job「訪問接口」的次數(shù),每秒不能同時(shí)超過5次,包括 進(jìn)行中的任務(wù)、剛啟動(dòng)的任務(wù)
要確保單位時(shí)間內(nèi)(例如每秒)運(yùn)行的任務(wù)數(shù)量不超過特定的上限(如5個(gè)任務(wù)),并且在任務(wù)執(zhí)行完成得很快時(shí),考慮已完成的任務(wù)和正在執(zhí)行的任務(wù)作為正在運(yùn)行的任務(wù)總數(shù),可以使用限流器來控制任務(wù)的啟動(dòng)頻率,并結(jié)合使用信號(hào)量來管理同時(shí)運(yùn)行的任務(wù)數(shù)量。
具體來說,使用一個(gè)信號(hào)量來限制同時(shí)進(jìn)行的任務(wù)數(shù)量,并且在任務(wù)完成時(shí),僅在下一秒鐘允許新的任務(wù)開始,以確保即使某些任務(wù)快速完成,也不會(huì)在同一秒鐘內(nèi)啟動(dòng)超過限制數(shù)量的任務(wù)
package main import ( "context" "fmt" "math/rand" "sync" "sync/atomic" "time" "golang.org/x/time/rate" ) func RateLimit() { const maxJobsPerSecond = 5 const numJobs = 22 var wg sync.WaitGroup // 計(jì)數(shù)器 var runningJobs int32 // 當(dāng)前正在執(zhí)行的任務(wù)數(shù)量 var startedJobs int32 // 啟動(dòng)后的任務(wù)數(shù)量 var finishedJobs int32 // 剛完成的任務(wù)數(shù)量 limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(maxJobsPerSecond)), maxJobsPerSecond) semaphore := make(chan struct{}, maxJobsPerSecond) for i := 1; i <= numJobs; i++ { wg.Add(1) go func(jobID int) { defer wg.Done() limiter.Wait(context.Background()) // 等待限流器允許進(jìn)行下一個(gè)任務(wù) semaphore <- struct{}{} // 獲取信號(hào)量 atomic.AddInt32(&startedJobs, 1) atomic.AddInt32(&runningJobs, 1) executeJob(jobID) // 執(zhí)行任務(wù) atomic.AddInt32(&finishedJobs, 1) atomic.AddInt32(&runningJobs, -1) <-time.After(time.Second) // 等待一秒鐘后釋放信號(hào)量 <-semaphore // 打印當(dāng)前狀態(tài) printStatus(&runningJobs, &startedJobs, &finishedJobs) }(i) } wg.Wait() fmt.Println("所有工作完成") }
注意事項(xiàng)
限流器rate.NewLimiter用于控制任務(wù)啟動(dòng)的頻率,以確保每秒不超過maxJobsPerSecond個(gè)任務(wù)開始執(zhí)行。
使用信號(hào)量semaphore來控制同時(shí)進(jìn)行的任務(wù)數(shù)量。
為了確保在任何一秒內(nèi)同時(shí)進(jìn)行的任務(wù)數(shù)量不超過限制,在任務(wù)完成后等待一秒鐘,然后再釋放信號(hào)量。這樣做可以保證即使任務(wù)很快完成,也不會(huì)立即啟動(dòng)新的任務(wù)。
這種實(shí)現(xiàn)方式確保了即使任務(wù)執(zhí)行得很快,每秒鐘啟動(dòng)的新任務(wù)數(shù)量也不會(huì)超過限制,并且同時(shí)考慮了正在執(zhí)行和剛剛完成的任務(wù)。
動(dòng)態(tài)創(chuàng)建協(xié)程
協(xié)程的啟動(dòng)是動(dòng)態(tài)的。在代碼中,每個(gè)任務(wù)對(duì)應(yīng)于一個(gè)動(dòng)態(tài)創(chuàng)建的協(xié)程。這些協(xié)程是在循環(huán)中根據(jù)任務(wù)數(shù)量(numJobs)動(dòng)態(tài)生成的。
具體來說,每當(dāng)有一個(gè)新的任務(wù)需要執(zhí)行時(shí),都會(huì)創(chuàng)建一個(gè)新的協(xié)程來處理這個(gè)任務(wù)。這是通過在main函數(shù)的循環(huán)中調(diào)用go關(guān)鍵字實(shí)現(xiàn)的。這個(gè)過程在每次循環(huán)迭代中發(fā)生,從而為每個(gè)任務(wù)動(dòng)態(tài)創(chuàng)建一個(gè)新的協(xié)程
由于使用了限流器(rate.Limiter),這些協(xié)程不是一次性全部創(chuàng)建,而是根據(jù)限流器允許的速率逐個(gè)創(chuàng)建。每個(gè)協(xié)程在開始執(zhí)行任務(wù)之前會(huì)等待限流器的許可,以此確保每秒啟動(dòng)的任務(wù)數(shù)量不超過設(shè)定的最大值
func executeJob(jobID int) { startTime := time.Now() // 記錄任務(wù)開始時(shí)間 // 模擬任務(wù)執(zhí)行時(shí)間 fmt.Printf("%v Job %d started ",time.Now().Format("2006-01-02 1505.000"), jobID) // 初始化隨機(jī)數(shù)種子 rand.Seed(time.Now().UnixNano()) // 隨機(jī)生成一個(gè)時(shí)間間隔(例如,1到5000毫秒之間) min := 1 max := 5000 duration := time.Duration(rand.Intn(max-min+1)+min) * time.Millisecond time.Sleep(duration) durationCost := time.Since(startTime) // 計(jì)算任務(wù)耗時(shí) fmt.Printf("%v Job %d finished Cost:%v ", time.Now().Format("2006-01-02 1505.000"),jobID, durationCost) } func printStatus(runningJobs, startedJobs, finishedJobs *int32) { fmt.Printf("Current status - Running: %d, Started: %d, Finished: %d ", atomic.LoadInt32(runningJobs), atomic.LoadInt32(startedJobs), atomic.LoadInt32(finishedJobs)) }
可以在代碼中添加額外的邏輯來跟蹤和打印正在執(zhí)行、進(jìn)行中、剛啟動(dòng)和剛完成的任務(wù)數(shù)量。使用原子操作(來自sync/atomic包)來確保在并發(fā)環(huán)境下對(duì)這些計(jì)數(shù)器的操作是安全的。
在這個(gè)示例中:
使用sync/atomic包中的AddInt32和LoadInt32來安全地增加和讀取計(jì)數(shù)器的值。
在每個(gè)任務(wù)開始時(shí),增加startedJobs和runningJobs計(jì)數(shù)器。
在每個(gè)任務(wù)完成時(shí),增加finishedJobs計(jì)數(shù)器,并減少runningJobs計(jì)數(shù)器。
在任務(wù)完成后和釋放信號(hào)量前,打印當(dāng)前的任務(wù)狀態(tài)。
注意事項(xiàng)
這種方法可以幫助我們跟蹤不同狀態(tài)下的任務(wù)數(shù)量。
使用原子操作確保在并發(fā)環(huán)境中對(duì)計(jì)數(shù)器的讀寫是安全的。
printStatus函數(shù)在每個(gè)任務(wù)的結(jié)束時(shí)被調(diào)用,以打印當(dāng)前的任務(wù)狀態(tài)
鏈接:https://juejin.cn/post/7315314479204696079
審核編輯:劉清
-
限流器
+關(guān)注
關(guān)注
0文章
41瀏覽量
14481 -
QPS
+關(guān)注
關(guān)注
0文章
24瀏覽量
8800
原文標(biāo)題:Golang根據(jù)job數(shù)量動(dòng)態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法
文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論