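I have reorganized the architecture of this log collection system, as shown in the figure below.
The overall logic of the code implemented this time is:
Complete code: https://github.com/pythonsite/logagent
Introduction to etcd
etcd is a highly available distributed key-value store that can be used for shared configuration and service discovery.
Similar projects: ZooKeeper and Consul
Implementation language: Go
Interface: a RESTful API that is simple to use
Algorithm: strong consistency based on the Raft algorithm, providing a highly available service storage directory
Typical use cases for etcd:
Service discovery and service registration
Configuration center (the log collection client we are building needs this)
Distributed locks
Master election
The official website gives a very concise introduction to etcd:
Setting up etcd:
Download: https://github.com/coreos/etcd/releases/
Download the release that matches your environment and start it.
Once it is running, you can verify it with the following commands: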
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#
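Introduction to context and how to use it
The name translates literally as "context management". So what is context for? It has two main uses:
Controlling goroutine timeouts
Carrying context data
The following simple example helps to understand it: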
package main
import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)
// Result bundles the response and the error returned by client.Do.
type Result struct {
	r   *http.Response
	err error
}
func process() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	tr := &http.Transport{}
	client := &http.Client{Transport: tr}
	c := make(chan Result, 1)
	req, err := http.NewRequest("GET", "http://www.google.com", nil)
	if err != nil {
		fmt.Println("http request failed,err:", err)
		return
	}
	// Bind the request to ctx so that it is aborted when the timeout fires
	// (this replaces the deprecated tr.CancelRequest call).
	req = req.WithContext(ctx)
	// The goroutine sends the outcome of the request into the channel.
	go func() {
		resp, err := client.Do(req)
		c <- Result{resp, err}
	}()
	select {
	case <-ctx.Done():
		fmt.Println("timeout!")
	case res := <-c:
		// Check the error first: res.r is nil when the request failed.
		if res.err != nil {
			fmt.Println("request failed,err:", res.err)
			return
		}
		defer res.r.Body.Close()
		out, _ := ioutil.ReadAll(res.r.Body)
		fmt.Printf("server response:%s", out)
	}
}
func main() {
	process()
}
Here is an example that uses context to carry context data:
package main
import (
	"context"
	"fmt"
)
func add(ctx context.Context, a, b int) int {
	traceId := ctx.Value("trace_id").(string)
	fmt.Printf("trace_id:%v\n", traceId)
	return a + b
}
func calc(ctx context.Context, a, b int) int {
	traceId := ctx.Value("trace_id").(string)
	fmt.Printf("trace_id:%v\n", traceId)
	// Pass ctx on to add
	return add(ctx, a, b)
}
func main() {
	// Pass ctx to calc
	ctx := context.WithValue(context.Background(), "trace_id", "123456")
	calc(ctx, 20, 30)
}
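Using etcd together with context
Here is a simple example of connecting to etcd from Go. (One thing to watch out for is how etcd is started: with the default settings the client may fail to connect, especially if etcd is installed in a virtual machine, so start it with the following command:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
)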
package main
import (
	"fmt"
	"time"
	etcd_client "github.com/coreos/etcd/clientv3"
)
func main() {
	cli, err := etcd_client.New(etcd_client.Config{
		Endpoints:   []string{"192.168.0.118:2371"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed,err:", err)
		return
	}
	fmt.Println("connect success")
	defer cli.Close()
}
The next example connects to etcd, stores a value and reads it back:
package main
import (
	"context"
	"fmt"
	"time"
	"github.com/coreos/etcd/clientv3"
)
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.0.118:2371"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed,err:", err)
		return
	}
	fmt.Println("connect succ")
	defer cli.Close()
	// Each request gets its own one-second timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "logagent/conf/", "sample_value")
	cancel()
	if err != nil {
		fmt.Println("put failed,err", err)
		return
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, "logagent/conf/")
	cancel()
	if err != nil {
		fmt.Println("get failed,err:", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
	}
}
The official context documentation also has a very useful example that shows how to make a started goroutine exit:
package main
import (
	"context"
	"fmt"
)
func main() {
	// gen generates integers in a separate goroutine and
	// sends them to the returned channel.
	// The callers of gen need to cancel the context once
	// they are done consuming generated integers not to leak
	// the internal goroutine started by gen.
	gen := func(ctx context.Context) <-chan int {
		dst := make(chan int)
		n := 1
		go func() {
			for {
				select {
				case <-ctx.Done():
					return // returning not to leak the goroutine
				case dst <- n:
					n++
				}
			}
		}()
		return dst
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancel when we are finished consuming integers
	for n := range gen(ctx) {
		fmt.Println(n)
		if n == 5 {
			break
		}
	}
}
The WithDeadline example from the official documentation:
package main
import (
	"context"
	"fmt"
	"time"
)
func main() {
	d := time.Now().Add(50 * time.Millisecond)
	ctx, cancel := context.WithDeadline(context.Background(), d)
	// Even though ctx will be expired, it is good practice to call its
	// cancelation function in any case. Failure to do so may keep the
	// context and its parent alive longer than necessary.
	defer cancel()
	select {
	case <-time.After(1 * time.Second):
		fmt.Println("overslept")
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	}
}
The code above covers basic usage. If we use etcd for configuration management, how do we notify the affected servers when the configuration changes? The following example demonstrates this:
package main
import (
	"context"
	"fmt"
	"time"
	"github.com/coreos/etcd/clientv3"
)
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.0.118:2371"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed,err:", err)
		return
	}
	defer cli.Close()
	// The range loop below blocks until change events arrive on the watch channel.
	rch := cli.Watch(context.Background(), "logagent/conf/")
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}
A simple example of a Kafka consumer:
package main
import (
	"fmt"
	"strings"
	"time"
	"github.com/Shopify/sarama"
)
func main() {
	consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
	if err != nil {
		fmt.Println("failed to start consumer:", err)
		return
	}
	partitionList, err := consumer.Partitions("nginx_log")
	if err != nil {
		fmt.Println("Failed to get the list of partitions:", err)
		return
	}
	fmt.Println(partitionList)
	// range yields (index, value); the value is the partition ID.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("nginx_log", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d:%s\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// Pass pc as an argument so each goroutine reads from its own consumer.
		go func(partitionConsumer sarama.PartitionConsumer) {
			for msg := range partitionConsumer.Messages() {
				fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	time.Sleep(time.Hour)
	consumer.Close()
}
The code above is not ideal, however, because it relies on time.Sleep to wait for the goroutines. We can change it to use sync.WaitGroup instead:
package main
import (
	"fmt"
	"strings"
	"sync"
	"github.com/Shopify/sarama"
)
var (
	wg sync.WaitGroup
)
func main() {
	consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
	if err != nil {
		fmt.Println("failed to start consumer:", err)
		return
	}
	partitionList, err := consumer.Partitions("nginx_log")
	if err != nil {
		fmt.Println("Failed to get the list of partitions:", err)
		return
	}
	fmt.Println(partitionList)
	// range yields (index, value); the value is the partition ID.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("nginx_log", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d:%s\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// Call Add before starting the goroutine; otherwise wg.Wait could
		// run before the goroutine has registered itself.
		wg.Add(1)
		go func(partitionConsumer sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range partitionConsumer.Messages() {
				fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	//time.Sleep(time.Hour)
	wg.Wait()
	consumer.Close()
}
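The WaitGroup pattern can be exercised without a Kafka broker. The sketch below assumes that plain Go channels stand in for the per-partition message streams; `consumeAll` is a hypothetical helper, not part of sarama.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll (hypothetical) drains every channel in its own goroutine and
// returns the total number of messages once all channels are closed.
func consumeAll(partitions []<-chan string) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for _, p := range partitions {
		wg.Add(1) // register before the goroutine starts
		go func(msgs <-chan string) {
			defer wg.Done()
			for range msgs {
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait() // returns only after every goroutine has called Done
	return total
}

func main() {
	var partitions []<-chan string
	for i := 0; i < 3; i++ {
		ch := make(chan string, 2)
		ch <- "line 1"
		ch <- "line 2"
		close(ch)
		partitions = append(partitions, ch)
	}
	fmt.Println(consumeAll(partitions)) // 6
}
```

Calling `wg.Add(1)` in the launching goroutine, before the `go` statement, guarantees that `wg.Wait()` cannot observe a zero counter while workers are still about to start.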
Storing the log information the client needs to collect in etcd
The code that handles etcd is as follows:
package main
import (
	"context"
	"fmt"
	"time"
	"github.com/astaxie/beego/logs"
	"github.com/coreos/etcd/clientv3"
)
var Client *clientv3.Client
var logConfChan chan string
// initEtcd initializes etcd: it connects, loads the current configuration
// and starts watching for changes.
// ipArrays and waitGroup are assumed to be package-level variables defined
// in other files of the project.
func initEtcd(addr []string, keyfmt string, timeout time.Duration) (err error) {
	var keys []string
	for _, ip := range ipArrays {
		// keyfmt = /logagent/%s/log_config
		keys = append(keys, fmt.Sprintf(keyfmt, ip))
	}
	logConfChan = make(chan string, 10)
	logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)
	Client, err = clientv3.New(clientv3.Config{
		Endpoints:   addr,
		DialTimeout: timeout,
	})
	if err != nil {
		logs.Error("connect failed,err:%v", err)
		return
	}
	logs.Debug("init etcd success")
	waitGroup.Add(1)
	for _, key := range keys {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		// Fetch from etcd the information about which logs to collect
		resp, err := Client.Get(ctx, key)
		cancel()
		if err != nil {
			logs.Warn("get key %s failed,err:%v", key, err)
			continue
		}
		for _, ev := range resp.Kvs {
			logs.Debug("%q : %q\n", ev.Key, ev.Value)
			logConfChan <- string(ev.Value)
		}
	}
	go WatchEtcd(keys)
	return
}
func WatchEtcd(keys []string) {
	// Deferred so that Done is reachable; a call placed after the endless
	// loop below would never run.
	defer waitGroup.Done()
	// Watch the keys so that changes to the logs to be collected are picked up promptly
	var watchChans []clientv3.WatchChan
	for _, key := range keys {
		rch := Client.Watch(context.Background(), key)
		watchChans = append(watchChans, rch)
	}
	for {
		for _, watchC := range watchChans {
			select {
			case wresp := <-watchC:
				for _, ev := range wresp.Events {
					logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
					logConfChan <- string(ev.Kv.Value)
				}
			default:
			}
		}
		time.Sleep(time.Second)
	}
}
func GetLogConf() chan string {
	return logConfChan
}
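Polling each watch channel with a `default` case and a one-second sleep delays change notifications by up to a second. One alternative, shown here only as a sketch and not as the project's implementation, is to give each channel its own goroutine and merge the results. Plain string channels stand in for the etcd watch channels, and `fanIn` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn (hypothetical) forwards every value from the input channels to a
// single output channel. The output is closed once all inputs are closed.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(c <-chan string) {
			defer wg.Done()
			for v := range c { // blocks until a value arrives; no polling
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "config change for key A"
	b <- "config change for key B"
	close(a)
	close(b)

	n := 0
	for v := range fanIn(a, b) {
		fmt.Println(v) // arrival order is not guaranteed
		n++
	}
	fmt.Println("received", n)
}
```

With this shape, a consumer reads from one channel and sees each change as soon as it is delivered, at the cost of one goroutine per watched key.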
Rate limiting is also added here, since a log collection program must not hurt the performance of the business services it runs alongside. limit.go was added to limit the rate:
package main
import (
	"sync/atomic"
	"time"
	"github.com/astaxie/beego/logs"
)
// SecondLimit counts how many items were handled in the current second.
type SecondLimit struct {
	unixSecond int64
	curCount   int32
	limit      int32
}
func NewSecondLimit(limit int32) *SecondLimit {
	secLimit := &SecondLimit{
		unixSecond: time.Now().Unix(),
		curCount:   0,
		limit:      limit,
	}
	return secLimit
}
// Add records count items for the current second and starts a new window
// when the second has changed.
func (s *SecondLimit) Add(count int) {
	sec := time.Now().Unix()
	if sec == atomic.LoadInt64(&s.unixSecond) {
		atomic.AddInt32(&s.curCount, int32(count))
		return
	}
	atomic.StoreInt64(&s.unixSecond, sec)
	atomic.StoreInt32(&s.curCount, int32(count))
}
// Wait blocks while the limit for the current second has been reached.
func (s *SecondLimit) Wait() bool {
	for {
		sec := time.Now().Unix()
		// Use >= rather than ==: Add may push the counter past the limit.
		if sec == atomic.LoadInt64(&s.unixSecond) && atomic.LoadInt32(&s.curCount) >= s.limit {
			time.Sleep(time.Microsecond)
			logs.Debug("limit is running,limit:%d s.curCount:%d", s.limit, atomic.LoadInt32(&s.curCount))
			continue
		}
		if sec != atomic.LoadInt64(&s.unixSecond) {
			atomic.StoreInt64(&s.unixSecond, sec)
			atomic.StoreInt32(&s.curCount, 0)
		}
		logs.Debug("limit is exited")
		return false
	}
}
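The idea behind the per-second limit is a fixed-window counter: the count resets whenever the clock moves to a new second. The sketch below shows that logic in isolation, assuming a single goroutine and a clock value passed in by the caller so that it can be tested deterministically; `windowLimiter` and `allow` are hypothetical names and not part of limit.go.

```go
package main

import "fmt"

// windowLimiter (hypothetical) is a single-goroutine fixed-window counter.
type windowLimiter struct {
	second int64 // Unix second the current count belongs to
	count  int   // items admitted during that second
	limit  int   // maximum items per second
}

// allow reports whether one more item may be sent during Unix second now.
func (w *windowLimiter) allow(now int64) bool {
	if now != w.second {
		// A new second has started: open a fresh window.
		w.second, w.count = now, 0
	}
	if w.count >= w.limit {
		return false
	}
	w.count++
	return true
}

func main() {
	lim := &windowLimiter{limit: 2}
	fmt.Println(lim.allow(100), lim.allow(100), lim.allow(100)) // true true false
	fmt.Println(lim.allow(101))                                 // true
}
```

A fixed window is simple but allows bursts at window boundaries: up to twice the limit can pass within a short interval that straddles two seconds.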
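Summary
This installment implements most of the first half of the log collection pipeline. Later installments will ship the logs to ES (Elasticsearch) and finally display them on a web page.
Source: IT大咖說
Editor: gt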
Original title: Go實現海量日志收集系統 (Implementing a Massive Log Collection System in Go)
Article source: WeChat ID magedu-Linux, WeChat official account 馬哥Linux運維. Please credit the source when reposting.