go uses thread pool to limit request flow

Description: When it comes to request current limiting, MQ is generally used. No matter what kind of MQ, both producers and consumers are required to play the powerful role of MQ. However, in the docking project, there may be cases where the docking party cannot use MQ together. At this time, it is also a feasible idea to use the thread pool for current limiting.

process:

1. Need to manually implement a thread pool. When it comes to thread pools, the factors to consider are: number of core threads, task queue, maximum number of threads, thread idle time, retention policy.

①Open the thread pool, accept tasks, and create a thread for each task accepted.

② When the number of threads reaches the number of core threads, the following tasks are put into the task queue. It is recommended to use blocking queues to prevent memory overflow.

③ When the task queue is saturated, additional threads will be created in the thread pool to process tasks until the maximum number of threads is reached.

④ When this part of the extra threads in the thread pool is in an idle state and reaches the thread idle time requirement, this part of the threads will be destroyed.

⑤ When the maximum number of threads is reached, there are still follow-up tasks to be processed. At this time, a decision should be made on the removal and retention of this part of the task. Three retention policies are available:

Ⅰ. Discard directly without treatment.

Ⅱ. Open up threads that are separated from the thread pool for processing.

Ⅲ. Discard long-awaited tasks in the task queue and add them to subsequent tasks.

2. To limit the flow of requests, you must first understand the operating principle of the server

①The server needs to have a listener to monitor the request connection. When the client sends a request, the server will first establish a tcp connection with the client.

② Open up a thread to process the http request sent in this tcp connection separately, until the http request is read and the response is returned. The default tcp connection will live for 90 seconds. We need to perform the operation of request current limit here, see the code for detailed operation.

//Thread Pool
package myroutine

import (
	"fmt"
	"strconv"
)

/**
 * @ Author      : jgbb
 * @ Date        : Created in 2019/9/4 13:19
 * @ Description : TODO Thread Pool
 * @ Modified by :
 * @ Version     : 1.0
 */

func Init(poolSize int,name string) *RoutinePool{
	pool := &RoutinePool{
		Queue:make(chan func()),
		PoolSize:poolSize,
		Name:name,
	}
	defer pool.ExeTask()
	return pool
}

type RoutinePool struct {
	//cache task
	Queue chan func()
	PoolSize int
	Name string
}


// Add tasks to thread pool
func (pool *RoutinePool) AddTask(task func()){
	pool.Queue <- task
}

//perform tasks
func (pool *RoutinePool) ExeTask(){
	counter := make(chan int)
	for i:=0;i<pool.PoolSize;i++ {
		go func() {
			j := <- counter//which thread
			var count int64= 0//count (how many times the thread ran)
			var stdout =pool.Name+"\t thread"+strconv.Itoa(j)+"\t"
			for task := range pool.Queue{
				count++
				fmt.Printf("%p\t%s\n",pool,stdout+strconv.FormatInt(count,10))

				task()
			}
		}()

		counter <- i
	}
}

  

//Modify the server source code
const(
DefaultPoolSize = 10
)
//Thread pool corresponding to each request
var PoolMap  = make(map[string]*myroutine.RoutinePool)

//golang source code
func (srv *Server) Serve(l net.Listener) error {
	defer l.Close()
	if fn := testHookServerServe; fn != nil {
		fn(srv, l)
	}
	var tempDelay time.Duration // how long to sleep on accept failure

	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}

	srv.trackListener(l, true)
	defer srv.trackListener(l, false)

	baseCtx := context.Background() // base is always background, per Issue 16220
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, e := l.Accept()
		if e != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return e
		}
		tempDelay = 0
		c := srv.newConn(rw)
                
                /***********************Modify start *********************************/
		//Put the processing of c.server(ctx) into the thread pool
		//First, you need to request the path, and get the corresponding thread pool according to the path
		c.r = &connReader{conn: c}
		c.r.setReadLimit(c.server.initialReadLimitSize()) //Without setReadLimit, the data in the buffer stream cannot be read
		c.bufr = newBufioReader(c.r)//to read the stream
		s,err := c.bufr.Peek(100)//The buffered stream uses peek(), the cursor will not be counted, so that the data in the stream can be reused in subsequent processing. Otherwise, subsequent read streams will start from the cursor
		news := make([]byte,0)
		for i:=0;i<100;i++ {
			news = append(news,s[i])
			if s[i] == 10 {
                                //10 means newline, get the required information here
				break
			}
		}
		if err != nil {
			fmt.Errorf("my err:%v",err)
		}
		newss := string(news)
                //Request path as thread pool name
		poolName := newss[strings.Index(newss,"/"):strings.LastIndex(newss," ")]


		c.setState(c.rwc, StateNew) // before Serve can return
		//go c.serve(ctx) //source code

		//Put it into the thread pool to process the request
		putPoolMap(poolName).AddTask(func() {
			c.serve(ctx)
		})
                /***********************End of modification ******************************/
	}
}

//Generate thread pool
//-parameter 1: thread pool size
//-parameter 2: thread pool name
func PutPoolMap(poolSize int,name string) *myroutine.RoutinePool{
	if _,ok := PoolMap[name]; !ok {
                //If there is no corresponding thread pool, generate one
		PoolMap[name] = myroutine.Init(poolSize,name)
	}
        //Returns the corresponding thread pool
	return PoolMap[name]
}

//By default, this method is used to generate a thread pool
//-parameter 1: thread pool name
func putPoolMap(name string) *myroutine.RoutinePool{
	return PutPoolMap(DefaultPoolSize,name)
}        

  

Tags: Go

Posted by kosstr12 on Thu, 02 Jun 2022 06:30:10 +0530