Summary: the alarm module consumes the alarm events that judge writes to Redis. Depending on the priority, which decides whether alarms are merged, it sends them to the different alarm channels.
High-priority alarms, e.g. p0: judge generates an alarm event --> writes it to the Redis queue event:p0 --> alarm consumes it --> resolves the recipients and runs the callback (if any) --> generates per-channel alarms (IM, SMS, mail, phone) according to the policy --> writes them to each channel's Redis send queue (/im, /sms, /mail, /phone) --> the sender workers pop the alarms and send them.
Low-priority alarms, e.g. p4: judge generates an alarm event --> writes it to the Redis queue event:p4 --> alarm consumes it --> resolves the recipients and runs the callback (if any) --> generates to-be-merged events for each channel (IM, SMS, mail, phone) according to the policy and writes them to the intermediate merge queues (queue names come from the configuration file, e.g. /queue/user/im) --> each channel's merge function processes them: it calls the dashboard API, which stores the full content in the falcon_portal.alert_link table so users can view the original alarms later, and gets back a link --> the result is written to each channel's Redis send queue (/im, /sms, /mail, /phone) --> the sender workers pop the alarms and send them.
Now let's look at the code.
1. The core of the main function is the following set of goroutines:
// Consume alarm events
go cron.ReadHighEvent()
go cron.ReadLowEvent()
// Merge low-priority alarms
go cron.CombineSms()
go cron.CombineMail()
go cron.CombineIM()
// Actually send the alarms
go cron.ConsumeIM()
go cron.ConsumeSms()
go cron.ConsumeMail()
go cron.ConsumePhone()
go cron.CleanExpiredEvent()
2. ReadHighEvent and ReadLowEvent differ only in the queues they read and in the isHigh flag they pass to consume, which is where high and low priority are told apart. ReadHighEvent is shown here:
func ReadHighEvent() {
	queues := g.Config().Redis.HighQueues
	if len(queues) == 0 {
		return
	}
	for {
		/* popEvent uses BRPOP to return one event from several queues.
		1. It is passed a list of high-priority queues such as [p0, p1, p2];
		   BRPOP always pops from event:p0 first, then p1, then p2
		   (I have verified this).
		2. popEvent itself is very fast, but each iteration used to call
		   consume synchronously, so a slow consume directly throttled the
		   overall pop rate. Before the goroutine was added I observed about
		   5 events/s; when there were many alarms they piled up, with delays
		   of about 4 hours.
		*/
		event, err := popEvent(queues)
		if err != nil {
			time.Sleep(time.Second)
			continue
		}
		// consume does not depend on popEvent, so run it asynchronously.
		// Caveat: this may spawn too many goroutines.
		go consume(event, true)
	}
}
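The priority ordering in point 1 comes from how BRPOP treats several keys: it checks them in the order given and pops from the first non-empty one. The following is a minimal in-memory sketch of that rule, not the Redis client code; the function name popFirstNonEmpty and the slice-based queues are assumptions for illustration. Each slice holds the list head first, so the element LPUSHed earliest sits at the tail, which is what RPOP takes.

```go
package main

import "fmt"

// popFirstNonEmpty (hypothetical helper) mimics BRPOP's key-order rule
// without blocking: queues are checked in the given order, and the tail
// element of the first non-empty queue is removed and returned.
func popFirstNonEmpty(queues map[string][]string, order []string) (queue, event string, ok bool) {
	for _, name := range order {
		q := queues[name] // a missing key yields a nil slice
		if len(q) == 0 {
			continue
		}
		event = q[len(q)-1]
		queues[name] = q[:len(q)-1]
		return name, event, true
	}
	return "", "", false
}

func main() {
	// Index 0 is the head (LPUSH side); the oldest event is at the tail.
	queues := map[string][]string{
		"event:p0": {"p0-new", "p0-old"},
		"event:p1": {"p1-only"},
	}
	order := []string{"event:p0", "event:p1", "event:p2"}
	for {
		q, e, ok := popFirstNonEmpty(queues, order)
		if !ok {
			break
		}
		fmt.Println(q, e)
	}
}
```

Both p0 events are drained before the p1 event is touched, which is why a flood of p0 alarms can delay p1 and p2 in the real consumer.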
3. The function that consumes an alarm event:
func consume(event *cmodel.Event, isHigh bool) {
	actionId := event.ActionId()
	if actionId <= 0 {
		return
	}
	/* Look up the action by the actionId carried in the event.
	   The action holds the name of the alarm (receiver) group,
	   whether a callback is configured, and so on. */
	action := api.GetAction(actionId)
	if action == nil {
		return
	}
	// If a callback is configured, call it with HTTP GET,
	// passing the alarm information as parameters.
	if action.Callback == 1 {
		HandleCallback(event, action)
	}
	if isHigh {
		consumeHighEvents(event, action)
	} else {
		consumeLowEvents(event, action)
	}
}
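HandleCallback itself is not shown. As a rough sketch of "call the callback with HTTP GET and pass the alarm as parameters", the code below builds the URL with net/url and fires a GET with a client timeout. The alarmEvent type, the parameter names (endpoint, metric, status, priority) and fireCallback are assumptions for illustration, not the real HandleCallback signature or parameter set.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"time"
)

// alarmEvent is a hypothetical, trimmed-down event; the real cmodel.Event has more fields.
type alarmEvent struct {
	Endpoint string
	Metric   string
	Status   string
	Priority int
}

// buildCallbackURL adds the alarm fields as query parameters to the
// configured callback URL, keeping any query string it already has.
// The parameter names are assumptions.
func buildCallbackURL(base string, e alarmEvent) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("endpoint", e.Endpoint)
	q.Set("metric", e.Metric)
	q.Set("status", e.Status)
	q.Set("priority", strconv.Itoa(e.Priority))
	u.RawQuery = q.Encode() // Encode sorts the parameters by key
	return u.String(), nil
}

// fireCallback performs the GET; a client timeout keeps a slow callback
// from holding the consume goroutine forever.
func fireCallback(client *http.Client, rawURL string) error {
	resp, err := client.Get(rawURL)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("callback returned %s", resp.Status)
	}
	return nil
}

func main() {
	u, err := buildCallbackURL("http://cb.example.com/hook", alarmEvent{"host-a", "cpu.idle", "PROBLEM", 0})
	if err != nil {
		panic(err)
	}
	fmt.Println(u)
	client := &http.Client{Timeout: 3 * time.Second}
	_ = client // in real use: fireCallback(client, u)
}
```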
4. Next, the high- and low-priority consumer functions:
// High-priority alarms are not merged
func consumeHighEvents(event *cmodel.Event, action *api.Action) {
	// If the alarm has no receiver group, return immediately
	if action.Uic == "" {
		return
	}
	phones, mails, ims := api.ParseTeams(action.Uic)
	log.Infof("api.ParseTeams: phones=%v, mails=%v, ims=%v, action.Uic=%s", phones, mails, ims, action.Uic)
	// Generate the alarm content; each channel's content can be customized
	smsContent := GenerateSmsContent(event)
	mailContent := GenerateMailContent(event)
	//imContent := GenerateIMContent(event)
	phoneContent := GeneratePhoneContent(event)
	/* The channels can be customized per alarm level, e.g. SMS only for
	   priority < 3 (p0-p2) and a phone alarm for p9.
	   redi.WriteSms and the other redi.Write* methods LPUSH the alarm
	   content onto the send queue of the corresponding channel. */
	if event.Priority() < 3 {
		redi.WriteSms(phones, smsContent)
	}
	// p9: phone alarm (plus SMS)
	if event.Priority() == 9 {
		redi.WriteSms(phones, smsContent)
		redi.WritePhone(phones, phoneContent)
	}
	// IM is addressed by email here and reuses the mail content
	redi.WriteIM(mails, mailContent)
	redi.WriteMail(mails, smsContent, mailContent)
}
// Low-priority alarms are merged
func consumeLowEvents(event *cmodel.Event, action *api.Action) {
	if action.Uic == "" {
		return
	}
	// The ParseUser* functions turn the event into to-be-merged
	// messages and write them to the intermediate queues.
	// Send SMS only for priority < 3 (p0-p2)
	if event.Priority() < 3 {
		ParseUserSms(event, action)
	}
	ParseUserIm(event, action)
	ParseUserMail(event, action)
}
Take ParseUserMail as an example:
func ParseUserMail(event *cmodel.Event, action *api.Action) {
	// Ask the API for the members of the alarm group
	userMap := api.GetUsers(action.Uic)
	metric := event.Metric()
	subject := GenerateSmsContent(event)
	content := GenerateMailContent(event)
	status := event.Status
	priority := event.Priority()
	queue := g.Config().Redis.UserMailQueue
	rc := g.RedisConnPool.Get()
	defer rc.Close()
	// For each user, build an intermediate (to-be-merged) message
	// and LPUSH it onto the intermediate queue
	for _, user := range userMap {
		dto := MailDto{
			Priority: priority,
			Metric:   metric,
			Subject:  subject,
			Content:  content,
			Email:    user.Email,
			Status:   status,
		}
		bs, err := json.Marshal(dto)
		if err != nil {
			log.Error("json marshal MailDto fail:", err)
			continue
		}
		_, err = rc.Do("LPUSH", queue, string(bs))
		if err != nil {
			log.Error("LPUSH redis", queue, "fail:", err, "dto:", string(bs))
		}
	}
}
At this point the low-priority alarms sit in the intermediate Redis queue whose name comes from the configuration file, /queue/user/mail in the case of mail.
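Each entry in that intermediate queue is one self-contained JSON message per recipient, which the merge goroutine later pops and decodes. The sketch below shows that fan-out and the reverse decode step without Redis. The mailDto type copies the field names from MailDto above, but the original struct's JSON tags are not shown, so Go's default (field names as keys) is an assumption; fanOut and decode are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mailDto mirrors the MailDto fields used above; JSON tags are assumed absent.
type mailDto struct {
	Priority int
	Metric   string
	Subject  string
	Content  string
	Email    string
	Status   string
}

// fanOut builds one JSON message per recipient, the payloads that
// ParseUserMail LPUSHes onto the intermediate queue.
func fanOut(emails []string, tmpl mailDto) ([]string, error) {
	msgs := make([]string, 0, len(emails))
	for _, e := range emails {
		d := tmpl // copy the shared alarm fields
		d.Email = e
		bs, err := json.Marshal(d)
		if err != nil {
			return nil, err
		}
		msgs = append(msgs, string(bs))
	}
	return msgs, nil
}

// decode is the reverse step a merge goroutine performs after popping a message.
func decode(msg string) (mailDto, error) {
	var d mailDto
	err := json.Unmarshal([]byte(msg), &d)
	return d, err
}

func main() {
	msgs, err := fanOut([]string{"a@example.com", "b@example.com"},
		mailDto{Priority: 4, Metric: "cpu.idle", Status: "PROBLEM"})
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(m)
	}
}
```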
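5. The alarm merge functions. Each Combine* goroutine wakes up once a minute and runs its merge function; CombineSms is shown below, followed by the IM merge function combineIM: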
func CombineSms() {
	for {
		// Process once a minute
		time.Sleep(time.Minute)
		combineSms()
	}
}

func combineIM() {
	// Pop all alarms waiting to be merged from the intermediate queue
	dtos := popAllImDto()
	count := len(dtos)
	if count == 0 {
		return
	}
	dtoMap := make(map[string][]*ImDto)
	for i := 0; i < count; i++ {
		// Group alarms by priority, status, receiver and metric
		key := fmt.Sprintf("%d%s%s%s", dtos[i].Priority, dtos[i].Status, dtos[i].IM, dtos[i].Metric)
		if _, ok := dtoMap[key]; ok {
			dtoMap[key] = append(dtoMap[key], dtos[i])
		} else {
			dtoMap[key] = []*ImDto{dtos[i]}
		}
	}
	for _, arr := range dtoMap {
		size := len(arr)
		// A group with a single alarm goes straight to the Redis send queue
		if size == 1 {
			redi.WriteIM([]string{arr[0].IM}, arr[0].Content)
			continue
		}
		// For several alarms, store the full contents in the database
		// and send the user only a link to them
		contentArr := make([]string, size)
		for i := 0; i < size; i++ {
			contentArr[i] = arr[i].Content
		}
		content := strings.Join(contentArr, ",,")
		first := arr[0].Content
		t := strings.Split(first, "][")
		eg := ""
		if len(t) >= 3 {
			eg = t[2]
		}
		// Call the dashboard API to write the merged content to the falcon_portal.alert_link table
		path, err := api.LinkToSMS(content)
		chat := ""
		if err != nil || path == "" {
			chat = fmt.Sprintf("[P%d][%s] %d %s. e.g. %s. detail in email", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg)
			log.Error("create short link fail", err)
		} else {
			// Summary: priority, status, count, metric, an example, and the link URL
			chat = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg, g.Config().Api.Dashboard, path)
			log.Debugf("combined im is:%s", chat)
		}
		// Fall back to the user's email address when no IM id is set
		if arr[0].IM == "" {
			email := fmt.Sprintf("%s@bytedance.com", arr[0].Name)
			redi.WriteIM([]string{email}, chat)
		} else {
			redi.WriteIM([]string{arr[0].IM}, chat)
		}
	}
}
6. Finally, the alarm sending functions:
func ConsumeIM() {
	for {
		// RPOP all pending IM alarms into a slice
		L := redi.PopAllIM()
		if len(L) == 0 {
			time.Sleep(time.Millisecond * 200)
			continue
		}
		SendIMList(L)
	}
}

func SendIMList(L []*model.IM) {
	for _, im := range L {
		/* 1. IMWorkerChan is a buffered channel; its capacity is the number
		      of send jobs that may run at the same time.
		   2. Before starting each send, write a 1 into IMWorkerChan.
		   3. If the channel is not full this does not block; otherwise it
		      blocks until a running send finishes. */
		IMWorkerChan <- 1
		go SendIM(im)
	}
}

func SendIM(im *model.IM) {
	/* 1. The deferred receive means: send first, then read from the channel.
	   2. Reading first would free a slot and let another worker start before
	      this send is done, which defeats the concurrency limit.
	   3. The customized sending logic follows. */
	defer func() {
		<-IMWorkerChan
	}()
	if im.Tos == "" {
		log.Errorf("content_tos_empty_error %s ", im.Content)
		return
	}
	// ... customized sending call goes here
}