Description: Request rate limiting is usually handled with a message queue (MQ). Whatever MQ you choose, both the producer and the consumer must participate for the MQ to do its job. In an integration project, however, the other party may not be able to use an MQ at all. In that case, rate limiting with a thread pool is a workable alternative.
Process:
1. First, implement a thread pool by hand. The factors to consider are: the number of core threads, the task queue, the maximum number of threads, the thread idle timeout, and the rejection policy.
① Start the thread pool and accept tasks, creating one thread for each task accepted.
② Once the number of threads reaches the core thread count, subsequent tasks go into the task queue. A bounded (blocking) queue is recommended to prevent memory exhaustion.
③ When the task queue is full, the pool creates additional threads to process tasks, up to the maximum thread count.
④ When these extra threads sit idle for longer than the idle timeout, they are destroyed.
⑤ If tasks keep arriving after the maximum thread count is reached, the pool must decide which tasks to drop and which to keep. Three rejection policies are available:
Ⅰ. Discard the new task outright.
Ⅱ. Run the task on a thread outside the pool.
Ⅲ. Discard the oldest task waiting in the queue and enqueue the new one.
2. To rate-limit requests, you first need to understand how the server works.
① The server runs a listener that waits for connections. When a client sends a request, the server first establishes a TCP connection with the client.
② The server then spawns a thread to handle the HTTP requests sent over that TCP connection, reading each request and writing back its response. By default the TCP connection is kept alive for 90 seconds. This is where we hook in the rate limiting; see the code below for the details.
```go
// Thread pool
package myroutine

import (
	"fmt"
	"strconv"
)

/**
 * @ Author      : jgbb
 * @ Date        : Created in 2019/9/4 13:19
 * @ Description : thread pool
 * @ Modified by :
 * @ Version     : 1.0
 */

// Init builds a pool and starts its workers before returning.
func Init(poolSize int, name string) *RoutinePool {
	pool := &RoutinePool{
		Queue:    make(chan func()),
		PoolSize: poolSize,
		Name:     name,
	}
	defer pool.ExeTask()
	return pool
}

type RoutinePool struct {
	// Unbuffered task channel: AddTask blocks until a worker is free,
	// which is what throttles the callers.
	Queue    chan func()
	PoolSize int
	Name     string
}

// AddTask hands a task to the pool.
func (pool *RoutinePool) AddTask(task func()) {
	pool.Queue <- task
}

// ExeTask starts PoolSize worker goroutines that drain the queue.
func (pool *RoutinePool) ExeTask() {
	counter := make(chan int)
	for i := 0; i < pool.PoolSize; i++ {
		go func() {
			j := <-counter      // index of this worker
			var count int64 = 0 // how many tasks this worker has run
			stdout := pool.Name + "\t thread" + strconv.Itoa(j) + "\t"
			for task := range pool.Queue {
				count++
				fmt.Printf("%p\t%s\n", pool, stdout+strconv.FormatInt(count, 10))
				task()
			}
		}()
		counter <- i
	}
}
```
// Modify the server source code (Go's net/http Serve loop).

```go
const (
	DefaultPoolSize = 10
)

// One thread pool per request path.
var PoolMap = make(map[string]*myroutine.RoutinePool)

// golang source code
func (srv *Server) Serve(l net.Listener) error {
	defer l.Close()
	if fn := testHookServerServe; fn != nil {
		fn(srv, l)
	}
	var tempDelay time.Duration // how long to sleep on accept failure

	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}

	srv.trackListener(l, true)
	defer srv.trackListener(l, false)

	baseCtx := context.Background() // base is always background, per Issue 16220
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, e := l.Accept()
		if e != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return e
		}
		tempDelay = 0
		c := srv.newConn(rw)

		/*********************** Modification start ***********************/
		// Hand c.serve(ctx) to a thread pool instead of a new goroutine.
		// To pick the pool we need the request path, so peek at the
		// request line first.
		c.r = &connReader{conn: c}
		c.r.setReadLimit(c.server.initialReadLimitSize()) // without setReadLimit, nothing can be read from the buffered stream
		c.bufr = newBufioReader(c.r)
		// Peek does not advance the read cursor, so the bytes stay in the
		// buffer and the normal request handling can re-read them later.
		s, err := c.bufr.Peek(100)
		if err != nil {
			srv.logf("http: peek error: %v", err)
		}
		news := make([]byte, 0)
		for i := 0; i < len(s); i++ {
			news = append(news, s[i])
			if s[i] == 10 { // 10 is '\n': the request line ends here
				break
			}
		}
		newss := string(news)
		// Use the request path as the pool name,
		// e.g. "GET /orders HTTP/1.1" -> "/orders".
		poolName := newss[strings.Index(newss, "/"):strings.LastIndex(newss, " ")]
		c.setState(c.rwc, StateNew) // before Serve can return
		//go c.serve(ctx) // original code
		// Process the request in the pool instead.
		putPoolMap(poolName).AddTask(func() {
			c.serve(ctx)
		})
		/*********************** Modification end *************************/
	}
}

// PutPoolMap returns the thread pool registered under name.
// - parameter 1: thread pool size
// - parameter 2: thread pool name
func PutPoolMap(poolSize int, name string) *myroutine.RoutinePool {
	if _, ok := PoolMap[name]; !ok {
		// If there is no corresponding pool yet, create one.
		PoolMap[name] = myroutine.Init(poolSize, name)
	}
	// Return the corresponding pool.
	return PoolMap[name]
}

// putPoolMap returns the pool for name, creating it with the default size.
// - parameter 1: thread pool name
func putPoolMap(name string) *myroutine.RoutinePool {
	return PutPoolMap(DefaultPoolSize, name)
}
```