【導讀】本文介紹了事件總線實現。
最近在學習開源項目Grafana
的代碼,發現作者實現了一個事件總線的機制,在項目里面大量應用,效果也非常好,代碼也比較簡單,介紹給大家看看。
源碼文件地址:grafana/bus.go at main · grafana/grafana · GitHub
1.注冊和調用
在這個項目里面隨處可見這種寫法:
funcValidateOrgAlert(c*models.ReqContext){
id:=c.ParamsInt64(":alertId")
query:=models.GetAlertByIdQuery{Id:id}
iferr:=bus.Dispatch(&query);err!=nil{
c.JsonApiErr(404,"Alertnotfound",nil)
return
}
ifc.OrgId!=query.Result.OrgId{
c.JsonApiErr(403,"Youarenotallowedtoedit/viewalert",nil)
return
}
}
關鍵是bus.Dispatch(&query)
這段代碼,它的參數是一個結構體GetAlertByIdQuery
,內容如下:
typeGetAlertByIdQuerystruct{
Idint64
Result*Alert
}
根據名字可以看出這個方法就是通過Id去查詢Alert,其中Alert
結構體就是結果對象,這里就不貼出來了。
通過查看源碼可以得知,Dispatch背后是調用了GetAlertById
這個方法,然后把結果賦值到query參數的Result中返回。
funcGetAlertById(query*models.GetAlertByIdQuery)error{
alert:=models.Alert{}
has,err:=x.ID(query.Id).Get(&alert)
if!has{
returnfmt.Errorf("couldnotfindalert")
}
iferr!=nil{
returnerr
}
query.Result=&alert
returnnil
}
問題來了,這是怎么實現的呢?Dispatch到底做了哪些操作?這樣做有什么好處?
下面我來一一解答:
首先,在Dispatch之前,你需要先注冊這個方法,也就是調用AddHandler
,在這個項目里面可以看到init函數里面有大量這樣的代碼:
funcinit(){
bus.AddHandler("sql",SaveAlerts)
bus.AddHandler("sql",HandleAlertsQuery)
bus.AddHandler("sql",GetAlertById)
...
}
其實這個方法的邏輯也很簡單,所謂注冊也就是把通過一個map把函數名和對應的函數做一個映射關系保存起來,當我們Dispatch的時候其實就是通過參數名查找之前注冊過的函數,然后通過反射調用該函數。
Bus結構體里面有幾個map成員,在這個項目里面作者定義了3種不同類型的handler,一種是普通的handler,也就是剛才展示的那種,第二種是帶上下文的handler,還有一種則是事件訂閱用到的handler,我們給一個事件注冊多個監聽者,當事件觸發的時候會依次調用多個監聽函數,其實就是一個觀察者模式。
//InProcBusdefinesthebusstructure
typeInProcBusstruct{
handlersmap[string]HandlerFunc
handlersWithCtxmap[string]HandlerFunc
listenersmap[string][]HandlerFunc
txMngTransactionManager
}
下面就看看具體的源碼,AddHandler
方法內容如下:
func(b*InProcBus)AddHandler(handlerHandlerFunc){
handlerType:=reflect.TypeOf(handler)
queryTypeName:=handlerType.In(0).Elem().Name()//獲取函數第一個參數的名稱,在上面例子里面就是GetAlertByIdQuery
b.handlers[queryTypeName]=handler
}
Dispatch方法的源碼如下:
func(b*InProcBus)Dispatch(msgMsg)error{
varmsgName=reflect.TypeOf(msg).Elem().Name()
withCtx:=true
handler:=b.handlersWithCtx[msgName]//根據參數名查找注冊過的函數,先查找帶Ctx的handler
ifhandler==nil{
withCtx=false
handler=b.handlers[msgName]
ifhandler==nil{
returnErrHandlerNotFound
}
}
varparams=[]reflect.Value{}
ifwithCtx{
//如果查找到的handler是帶Ctx的就給個默認的Background的Ctx
params=append(params,reflect.ValueOf(context.Background()))
}
params=append(params,reflect.ValueOf(msg))
ret:=reflect.ValueOf(handler).Call(params)//通過反射機制調用函數
err:=ret[0].Interface()
iferr==nil{
returnnil
}
returnerr.(error)
}
對于AddHandlerCtx
和DispatchCtx
這個2個方法基本上是一樣的,只不過多了一個上下文參數,可以拿來做超時控制或者其它用途。
2.訂閱和發布
除此之外,還有2個方法AddEventListener
和Publish
,即事件的訂閱和發布。
func(b*InProcBus)AddEventListener(handlerHandlerFunc){
handlerType:=reflect.TypeOf(handler)
eventName:=handlerType.In(0).Elem().Name()
_,exists:=b.listeners[eventName]
if!exists{
b.listeners[eventName]=make([]HandlerFunc,0)
}
b.listeners[eventName]=append(b.listeners[eventName],handler)
}
查看源碼可以得知,可以給一個事件注冊多個handler函數,而Publish的時候則是依次調用注冊的函數,邏輯也不復雜。
func(b*InProcBus)Publish(msgMsg)error{
varmsgName=reflect.TypeOf(msg).Elem().Name()
varlisteners=b.listeners[msgName]
varparams=make([]reflect.Value,1)
params[0]=reflect.ValueOf(msg)
for_,listenerHandler:=rangelisteners{
ret:=reflect.ValueOf(listenerHandler).Call(params)
e:=ret[0].Interface()
ife!=nil{
err,ok:=e.(error)
ifok{
returnerr
}
returnfmt.Errorf("expectedlistenertoreturnanerror,got'%T'",e)
}
}
returnnil
}
這里面有一點不好,所有訂閱函數的調用是順序的,并沒有使用協程,所以如果注冊了很多個函數,這樣效率也不高啊。
3.好處
可能有人會好奇,為什么明明可以直接調用函數就行,為啥非得繞個彎子,整這么復雜?
況且,每次調用都得使用反射機制,性能也不行。
我覺得主要有以下幾點:
1.這種寫法邏輯清晰,解耦
2.方便單元測試
3.性能不是最大考量,雖然說反射會降低性能
原文標題:Golang 事件系統 Event Bus
文章出處:【微信公眾號:Linux愛好者】歡迎添加關注!文章轉載請注明出處。
-
總線
+關注
關注
10文章
2878瀏覽量
88052 -
開源
+關注
關注
3文章
3309瀏覽量
42471 -
代碼
+關注
關注
30文章
4779瀏覽量
68525
原文標題:Golang 事件系統 Event Bus
文章出處:【微信號:LinuxHub,微信公眾號:Linux愛好者】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論