Open-Falcon alarm code analysis

Summary: the alarm module consumes the alarm events that judge writes to redis and sends them to the different alarm channels. Whether events are merged before sending depends on their priority.

For high-priority alarms such as P0: judge generates an alarm event --> the event is written to the redis event:p0 queue --> alarm consumes it --> alarm looks up the receivers and handles the callback (if any) --> alarm generates messages for the different channels (im, sms, mail, phone) according to the policy --> each message is written to the redis send queue of its channel (/im, /sms, /mail, /phone) --> the sending worker takes the message out of the queue and sends it

For low-priority alarms such as P4: judge generates an alarm event --> the event is written to the redis event:p4 queue --> alarm consumes it --> alarm looks up the receivers and handles the callback (if any) --> alarm generates merge events for the different channels (im, sms, mail, phone) according to the policy and writes them to the intermediate merge queues (named in the configuration file, for example /queue/user/im) --> the merge function of each channel processes them, calls the dashboard api to store the original messages in the falcon_portal.alert_link table so that users can view them later, and generates a dashboard link --> the merged message is written to the redis send queue of its channel (/im, /sms, /mail, /phone) --> the sending worker takes the message out of the queue and sends it

Let's take a look at the code

1. The core of the main function is these goroutines

    // Consume alarm events
    go cron.ReadHighEvent()
    go cron.ReadLowEvent()
    // Merge low-priority alarms
    go cron.CombineSms()
    go cron.CombineMail()
    go cron.CombineIM()
    // Actually send the alarms
    go cron.ConsumeIM()
    go cron.ConsumeSms()
    go cron.ConsumeMail()
    go cron.ConsumePhone()
    go cron.CleanExpiredEvent()

2. ReadHighEvent and ReadLowEvent differ only in the priority of the queues they consume

func ReadHighEvent() {
    queues := g.Config().Redis.HighQueues
    if len(queues) == 0 {
        return
    }

    for {
        /* BRPOP returns one event from multiple queues.
        1. A list of high-priority queues is passed in, such as [p0,p1,p2].
           The event:p0 queue is always popped first, then p1 and p2 (I verified this by testing).
        2. popEvent on its own is very fast, but every loop iteration also runs consume.
           If consume is slow, it directly limits the overall pop speed. Before the goroutine
           was added I observed a pop speed of about 5 events/s. With many alarms the events
           piled up, and at one point the delay reached about 4 hours. */
        event, err := popEvent(queues)
        if err != nil {
            time.Sleep(time.Second)
            continue
        }
        // consume does not depend on popEvent, so it runs asynchronously.
        // Note that this may create too many goroutines.
        go consume(event, true)
    }
}
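The priority order described in the comment comes from how BRPOP treats several keys: it checks them in the order given and pops from the first non-empty list. The following is a minimal in-memory sketch of that behavior, not the alarm module's code. The fakeRedis type and its lpush, pop and drain helpers are hypothetical names introduced only for this illustration, and the blocking part of BRPOP is left out.

```go
package main

import "fmt"

// fakeRedis is a hypothetical in-memory stand-in for redis lists.
// Index 0 is the head (left side) of each list.
type fakeRedis map[string][]string

// lpush adds a value at the head of the list, like LPUSH.
func (r fakeRedis) lpush(key, val string) {
	r[key] = append([]string{val}, r[key]...)
}

// pop mimics the non-blocking part of BRPOP key1 key2 ...: keys are
// checked in the order given, and the tail element of the first
// non-empty list is returned.
func (r fakeRedis) pop(keys ...string) (key, val string, ok bool) {
	for _, k := range keys {
		if n := len(r[k]); n > 0 {
			val = r[k][n-1]
			r[k] = r[k][:n-1]
			return k, val, true
		}
	}
	return "", "", false
}

// drain pops until every listed queue is empty and returns the
// values in the order they were popped.
func drain(r fakeRedis, keys ...string) []string {
	var out []string
	for {
		_, v, ok := r.pop(keys...)
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	r := fakeRedis{}
	r.lpush("event:p2", "c1")
	r.lpush("event:p0", "a1")
	r.lpush("event:p1", "b1")
	r.lpush("event:p0", "a2")
	// p0 is drained first (oldest event first), then p1, then p2.
	fmt.Println(drain(r, "event:p0", "event:p1", "event:p2"))
}
```

Because producers LPUSH and the consumer pops from the tail, each queue behaves as a FIFO, and a lower-priority queue is only read when every queue listed before it is empty.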

3. The function that consumes alarm events

func consume(event *cmodel.Event, isHigh bool) {
    actionId := event.ActionId()
    if actionId <= 0 {
        return
    }
    /* Look up the action by the actionId in the event.
    The action carries the alarm group name, whether there is a callback, and so on */
    action := api.GetAction(actionId)
    if action == nil {
        return
    }
    // If there is a callback, the callback URL is called with an HTTP GET
    // and the alarm information is passed along as parameters
    if action.Callback == 1 {
        HandleCallback(event, action)
    }

    if isHigh {
        consumeHighEvents(event, action)
    } else {
        consumeLowEvents(event, action)
    }
}

4. Let's take a look at the high- and low-priority consume functions

// High-priority alarms are not merged
func consumeHighEvents(event *cmodel.Event, action *api.Action) {
    // If the alarm has no receiving group, return directly
    if action.Uic == "" {
        return
    }

    phones, mails, ims := api.ParseTeams(action.Uic)
    log.Infof("api.ParseTeams--phones:%v, mails:%v, ims:%v, action.uic:%s", phones, mails, ims, action.Uic)
    // Generate the alarm content, which can be customized per channel
    smsContent := GenerateSmsContent(event)
    mailContent := GenerateMailContent(event)
    //imContent := GenerateIMContent(event)
    phoneContent := GeneratePhoneContent(event)

    /* The channels can be customized by alarm level,
    for example SMS only for <= P2, a phone call for P9, and so on.
    redi.WriteSms and the other redi.Write* methods below LPUSH the
    alarm content to the send queue of the corresponding channel */
    if event.Priority() < 3 {
        redi.WriteSms(phones, smsContent)
    }
    // P9: phone alarm
    if event.Priority() == 9 {
        redi.WriteSms(phones, smsContent)
        redi.WritePhone(phones, phoneContent)
    }
    redi.WriteIM(mails, mailContent)
    redi.WriteMail(mails, smsContent, mailContent)
}
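To make the channel selection easier to see at a glance, here is a small standalone sketch of the same thresholds. channelsFor is a hypothetical helper written for this illustration. It only returns channel names and writes nothing to redis, and it assumes P9 is routed through the high-priority path by the configuration.

```go
package main

import "fmt"

// channelsFor returns the channels an un-merged alarm of the given
// priority is written to, following the thresholds used in
// consumeHighEvents: SMS below P3, SMS plus phone for P9, and
// IM and mail for every priority.
func channelsFor(priority int) []string {
	var chans []string
	if priority < 3 {
		chans = append(chans, "sms")
	}
	if priority == 9 {
		chans = append(chans, "sms", "phone")
	}
	return append(chans, "im", "mail")
}

func main() {
	for _, p := range []int{0, 3, 9} {
		fmt.Printf("P%d -> %v\n", p, channelsFor(p))
	}
}
```

Keeping the policy in one pure function like this would also make it easy to unit-test when the thresholds change.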

// Low-priority alarms are merged
func consumeLowEvents(event *cmodel.Event, action *api.Action) {
    if action.Uic == "" {
        return
    }

    // Send SMS only for <= P2
    // The ParseUser* functions convert an event into a merge message and write it to the intermediate queue
    if event.Priority() < 3 {
        ParseUserSms(event, action)
    }
    ParseUserIm(event, action)
    ParseUserMail(event, action)
}

Let's take ParseUserMail as an example

func ParseUserMail(event *cmodel.Event, action *api.Action) {
    // The api returns the group members for the alarm group
    userMap := api.GetUsers(action.Uic)

    metric := event.Metric()
    subject := GenerateSmsContent(event)
    content := GenerateMailContent(event)
    status := event.Status
    priority := event.Priority()

    queue := g.Config().Redis.UserMailQueue

    rc := g.RedisConnPool.Get()
    defer rc.Close()
    // Traverse userMap, build one intermediate message per user and LPUSH it to the intermediate queue
    for _, user := range userMap {
        dto := MailDto{
            Priority: priority,
            Metric:   metric,
            Subject:  subject,
            Content:  content,
            Email:    user.Email,
            Status:   status,
        }
        bs, err := json.Marshal(dto)
        if err != nil {
            log.Error("json marshal MailDto fail:", err)
            continue
        }

        _, err = rc.Do("LPUSH", queue, string(bs))
        if err != nil {
            log.Error("LPUSH redis", queue, "fail:", err, "dto:", string(bs))
        }
    }
}

At this point the low-priority alarm sits in the intermediate redis queue whose name comes from the configuration file, /queue/user/mail
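What ends up in the intermediate queue is one JSON string per receiver. The sketch below shows the round trip a producer and a merge consumer would go through. The MailDto fields are the ones set in ParseUserMail, but the field types and JSON tags here are assumptions, and encode and decode are hypothetical helper names. The redis calls are left out so the example runs on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MailDto mirrors the fields set in ParseUserMail.
// The field types and JSON tags are assumed for this sketch.
type MailDto struct {
	Priority int    `json:"priority"`
	Metric   string `json:"metric"`
	Subject  string `json:"subject"`
	Content  string `json:"content"`
	Email    string `json:"email"`
	Status   string `json:"status"`
}

// encode serializes one dto into the string that would be handed to LPUSH.
func encode(dto MailDto) (string, error) {
	bs, err := json.Marshal(dto)
	return string(bs), err
}

// decode is what a consumer of the intermediate queue would do after popping.
func decode(s string) (MailDto, error) {
	var dto MailDto
	err := json.Unmarshal([]byte(s), &dto)
	return dto, err
}

func main() {
	in := MailDto{
		Priority: 4,
		Metric:   "cpu.idle",
		Subject:  "subject text",
		Content:  "mail body",
		Email:    "ops@example.com",
		Status:   "PROBLEM",
	}
	s, err := encode(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)

	out, err := decode(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Email, out.Metric, out.Status)
}
```

Since every receiver gets its own message, the merge step can later group by receiver without looking the alarm group up again.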

5. The alarm merge functions

func CombineSms() {
    for {
        // Read and process once a minute
        time.Sleep(time.Minute)
        combineSms()
    }
}

func combineIM() {
    // Pop the alarms to be merged from the intermediate queue
    dtos := popAllImDto()
    count := len(dtos)
    if count == 0 {
        return
    }

    dtoMap := make(map[string][]*ImDto)
    for i := 0; i < count; i++ {
        // Group the alarms into lists keyed by priority, status, receiver and metric
        key := fmt.Sprintf("%d%s%s%s", dtos[i].Priority, dtos[i].Status, dtos[i].IM, dtos[i].Metric)
        if _, ok := dtoMap[key]; ok {
            dtoMap[key] = append(dtoMap[key], dtos[i])
        } else {
            dtoMap[key] = []*ImDto{dtos[i]}
        }
    }

    for _, arr := range dtoMap {
        size := len(arr)
        // If the group holds a single alarm, write it directly to the redis send queue
        if size == 1 {
            redi.WriteIM([]string{arr[0].IM}, arr[0].Content)
            continue
        }

        // Write multiple im contents to the database and only provide a link to the user
        contentArr := make([]string, size)
        for i := 0; i < size; i++ {
            contentArr[i] = arr[i].Content
        }
        content := strings.Join(contentArr, ",,")

        first := arr[0].Content
        t := strings.Split(first, "][")
        eg := ""
        if len(t) >= 3 {
            eg = t[2]
        }
        // Call the dashboard api to write the merged information to the falcon_portal.alert_link table
        path, err := api.LinkToSMS(content)
        chat := ""
        if err != nil || path == "" {
            chat = fmt.Sprintf("[P%d][%s] %d %s.  e.g. %s. detail in email", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg)
            log.Error("create short link fail", err)
        } else {
            // Build the summary message: priority, status, count, metric, an example and the link URL
            chat = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
                arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg, g.Config().Api.Dashboard, path)
            log.Debugf("combined im is:%s", chat)
        }
        if arr[0].IM == "" {
            // Fall back to an address built from the user name (the format string is empty here)
            email := fmt.Sprintf("", arr[0].Name)
            redi.WriteIM([]string{email}, chat)
        } else {
            redi.WriteIM([]string{arr[0].IM}, chat)
        }
    }
}
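The grouping step and the "e.g." extraction can be tried in isolation. The sketch below is a simplified version under stated assumptions: the ImDto here carries only the fields used for grouping, groupByKey and example are hypothetical helper names, and the bracketed message layout in main is invented sample data, not the real output of GenerateIMContent.

```go
package main

import (
	"fmt"
	"strings"
)

// ImDto holds only the fields used for grouping in this sketch;
// the real struct may have more.
type ImDto struct {
	Priority int
	Status   string
	IM       string
	Metric   string
	Content  string
}

// groupByKey buckets dtos by priority, status, receiver and metric,
// using the same key format as combineIM.
func groupByKey(dtos []*ImDto) map[string][]*ImDto {
	m := make(map[string][]*ImDto)
	for _, d := range dtos {
		key := fmt.Sprintf("%d%s%s%s", d.Priority, d.Status, d.IM, d.Metric)
		m[key] = append(m[key], d) // append to a missing key starts a new slice
	}
	return m
}

// example returns the third "]["-separated field of a message,
// or "" when the message has fewer than three fields.
func example(content string) string {
	t := strings.Split(content, "][")
	if len(t) >= 3 {
		return t[2]
	}
	return ""
}

func main() {
	// Invented sample messages in a bracketed layout.
	dtos := []*ImDto{
		{4, "PROBLEM", "alice", "cpu.idle", "[P4][PROBLEM][host-a][][cpu.idle < 10]"},
		{4, "PROBLEM", "alice", "cpu.idle", "[P4][PROBLEM][host-b][][cpu.idle < 10]"},
		{4, "OK", "alice", "cpu.idle", "[P4][OK][host-a][][cpu.idle >= 10]"},
	}
	// Map iteration order is random, so the two lines may print in either order.
	for _, arr := range groupByKey(dtos) {
		fmt.Printf("%s x%d e.g. %s\n", arr[0].Status, len(arr), example(arr[0].Content))
	}
}
```

One thing to keep in mind with this key format: the fields are concatenated without a separator, so two different receiver and metric pairs could in principle produce the same key. Adding a delimiter to the format string would rule that out.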


6. Finally, take a look at the alarm sending functions

func ConsumeIM() {
    for {
        // RPOP all pending alarm messages into a slice
        L := redi.PopAllIM()
        if len(L) == 0 {
            time.Sleep(time.Millisecond * 200)
            continue
        }
        SendIMList(L)
    }
}

func SendIMList(L []*model.IM) {
    for _, im := range L {
        /* 1. IMWorkerChan is a buffered channel. Its capacity is the number of send jobs that can run at the same time
        2. Writing a 1 into the worker channel before each send marks one send as in progress
        3. If the channel is not full, this does not block; otherwise it blocks here */
        IMWorkerChan <- 1
        go SendIM(im)
    }
}

func SendIM(im *model.IM) {
    // 1. defer is used so that the send happens first and the channel is read afterwards
    // 2. Reading first would signal that another worker may start, which defeats the limit
    // 3. What follows is the customized sending method
    defer func() {
        <-IMWorkerChan
    }()
    if im.Tos == "" {
        log.Errorf("content_tos_empty_error %s ", im.Content)
        return
    }
    // ...
}
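The worker channel described above is a counting semaphore built from a buffered channel. The following standalone sketch shows the pattern and measures the peak concurrency to confirm the limit holds. runLimited and workerChan are hypothetical names for this illustration, and a short sleep stands in for the real send.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts one goroutine per job but lets at most limit of
// them run at once. It returns the highest concurrency observed.
func runLimited(jobs, limit int) int32 {
	workerChan := make(chan int, limit) // plays the role of IMWorkerChan
	var wg sync.WaitGroup
	var running, peak int32

	for i := 0; i < jobs; i++ {
		workerChan <- 1 // blocks while limit sends are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Free the slot only after the send has finished.
			defer func() { <-workerChan }()

			n := atomic.AddInt32(&running, 1)
			// Record a new peak if this is the highest count seen so far.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // stand-in for the real send
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	// With a limit of 3 the reported peak is never more than 3.
	fmt.Println("peak concurrency:", runLimited(20, 3))
}
```

The same pattern could also bound the `go consume(event, true)` call in ReadHighEvent, which would address the concern there about creating too many goroutines.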

Tags: Go monitor and control

Posted by joquius on Tue, 31 May 2022 21:59:07 +0530