Environment for this article: CentOS 8.0, PHP 8.1, Nginx 1.8, and Workerman 4.0. If anything is unclear, leave a comment or contact me. Copyright belongs to OwenZhang. For commercial reprints, please contact OwenZhang for authorization; for non-commercial reprints, please credit the source.

Message mode introduction
Generally speaking, message queues have two modes:

Producer-consumer mode (Queue):
The message producer creates messages and sends them to the queue, and a message consumer then takes them out of the queue and consumes them.
Once a message has been consumed it is no longer stored in the queue, so no consumer can consume it a second time. A queue supports multiple consumers, but each message can be consumed by only one of them.

Publisher-subscriber mode (Topic):
The message producer (publisher) publishes a message to a topic, and multiple message consumers (subscribers) consume it at the same time.
Unlike the point-to-point queue mode, a message published to a topic is consumed by all subscribers.

Message queues for both scenarios can be implemented with Redis.
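To make the difference between the two modes concrete, here is a minimal in-memory sketch. It does not use Redis at all; the class names `InMemoryQueue` and `InMemoryTopic` are made up for illustration and only model the delivery rules described above.

```php
<?php
declare(strict_types=1);

/** Queue mode: each message is handed to exactly one consumer. */
final class InMemoryQueue
{
    /** @var list<string> */
    private array $messages = [];

    public function push(string $message): void
    {
        // New messages go to the head of the list, like LPUSH.
        array_unshift($this->messages, $message);
    }

    public function pop(): ?string
    {
        // Take from the tail, like RPOP; the message is gone afterwards.
        return array_pop($this->messages);
    }
}

/** Topic mode: every subscriber receives every published message. */
final class InMemoryTopic
{
    /** @var array<string, list<string>> messages received, keyed by subscriber name */
    private array $inboxes = [];

    public function subscribe(string $name): void
    {
        $this->inboxes[$name] = [];
    }

    public function publish(string $message): void
    {
        // Copy the message into the inbox of every current subscriber.
        foreach (array_keys($this->inboxes) as $name) {
            $this->inboxes[$name][] = $message;
        }
    }

    public function inbox(string $name): array
    {
        return $this->inboxes[$name] ?? [];
    }
}

// Queue: two messages, three pop attempts by competing consumers.
$queue = new InMemoryQueue();
$queue->push('job-1');
$queue->push('job-2');
$firstConsumerGot = $queue->pop();
$secondConsumerGot = $queue->pop();
$thirdConsumerGot = $queue->pop(); // nothing left for this one

// Topic: both subscribers see the same message.
$topic = new InMemoryTopic();
$topic->subscribe('a');
$topic->subscribe('b');
$topic->publish('news-1');
```

In the queue, `job-1` and `job-2` each go to a single consumer and the third pop finds nothing, while in the topic both `a` and `b` end up with `news-1` in their inbox.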
Queue mode introduction
The producer puts messages into the queue, and multiple consumers listen to the queue at the same time. Whichever consumer grabs a message first takes it off the queue, so each message is owned by at most one consumer.
Concretely, create a task queue: the producer pushes messages with lpush and the consumer pops them with rpop. The drawback is that the consumer has to keep asking for data, and periodically polling an empty queue wastes resources. What we want is for the consumer to be notified as soon as a new message is added to the queue, and the brpop command provides exactly that. brpop works like rpop, except that when the list is empty it blocks the connection until a new element is added (or the given timeout expires).
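The lpush/brpop pattern can be sketched roughly as follows. This is only an illustration, not how webman's redis-queue plugin is implemented internally. `waitForMessage` is a hypothetical helper, and `FakeRedis` is a made-up in-memory stand-in so the sketch runs without a Redis server; it assumes, as the phpredis `\Redis` class does to my knowledge, that `brPop()` returns `[listName, value]` on success and an empty array on timeout.

```php
<?php
declare(strict_types=1);

/**
 * Wait up to $timeout seconds for one message; return it, or null on timeout.
 * $redis is any object whose brPop() behaves like the phpredis method
 * (assumption: [listName, value] on success, empty result on timeout).
 */
function waitForMessage(object $redis, string $queue, int $timeout): ?string
{
    $result = $redis->brPop([$queue], $timeout);
    if (empty($result)) {
        return null; // timed out, the queue stayed empty
    }
    return $result[1];
}

/** Hypothetical in-memory stand-in for a Redis connection (illustration only). */
final class FakeRedis
{
    /** @var array<string, list<string>> */
    private array $lists = [];

    public function lPush(string $key, string $value): int
    {
        $this->lists[$key] ??= [];
        array_unshift($this->lists[$key], $value); // insert at the head
        return count($this->lists[$key]);
    }

    public function brPop(array $keys, int $timeout): array
    {
        foreach ($keys as $key) {
            if (!empty($this->lists[$key])) {
                return [$key, array_pop($this->lists[$key])]; // take from the tail
            }
        }
        // A real server would block here for up to $timeout seconds.
        return [];
    }
}

// Producer side: push two messages.
$redis = new FakeRedis();
$redis->lPush('task_queue', 'first');
$redis->lPush('task_queue', 'second');

// Consumer side: messages come out in the order they were pushed.
$a = waitForMessage($redis, 'task_queue', 5);
$b = waitForMessage($redis, 'task_queue', 5);
$c = waitForMessage($redis, 'task_queue', 5); // queue is empty now
```

With a real server you would pass a connected `\Redis` instance instead of `FakeRedis`, and the consumer would call `waitForMessage` in a loop; the blocking call means it sleeps inside Redis rather than polling.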
config/process.php
    // Queue consumption
    'redis_consumer' => [
        'handler' => Webman\RedisQueue\Process\Consumer::class,
        'count' => cpu_count() * 3, // Multiple processes can be set
        'constructor' => [
            // Consumer directory
            'consumer_dir' => app_path() . '/queue/redis',
        ],
    ],
config/redis_queue.php
    <?php
    return [
        'default' => [
            'host' => 'redis://127.0.0.1:6379',
            'options' => [
                'auth' => env('REDIS_PASSWORD', ''), // Password, optional
                'db' => 3, // Database
                'max_attempts' => 5, // Number of retries after a consumption failure
                'retry_seconds' => 5, // Retry interval in seconds
            ]
        ],
    ];
config/redis.php
    <?php
    /**
     * This file is part of webman.
     *
     * Licensed under The MIT License
     * For full copyright and license information, please see the MIT-LICENSE.txt
     * Redistributions of files must retain the above copyright notice.
     *
     * @author    walkor<walkor@workerman.net>
     * @copyright walkor<walkor@workerman.net>
     * @link      http://www.workerman.net/
     * @license   http://www.opensource.org/licenses/mit-license.php MIT License
     */

    return [
        'default' => [
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', ''),
            'port' => env('REDIS_PORT', '6379'),
            'database' => 0,
        ],
    ];
app/queue/redis/ConverterSend.php
Consumer class that processes the queue automatically
    <?php

    namespace app\queue\redis;

    use Exception;
    use support\Log;
    use Webman\RedisQueue\Consumer;

    class ConverterSend implements Consumer
    {
        // Queue name to consume
        public string $queue = 'converter_list';

        // Connection name, corresponding to a connection in config/redis_queue.php
        public string $connection = 'default';

        /**
         * Automatic consumption
         *
         * Queue production:
         *   $contact = [
         *       'title' => $i,
         *   ];
         *   $a = RedisQueueService::instance()->_send($contact);
         *
         * @throws Exception
         */
        public function consume($data)
        {
            Log::info($data['title']);
        }
    }
app/service/RedisQueueService.php
Producer service class for the queue
    <?php
    // *******************
    // *** Message queue producer
    // *******************

    namespace app\service;

    use support\Log;
    use Webman\RedisQueue\Client;

    class RedisQueueService
    {
        public static ?RedisQueueService $_instance = null;

        protected string $queue_name = 'converter_list';

        /**
         * @return RedisQueueService|mixed
         */
        public static function instance(): ?RedisQueueService
        {
            if (!static::$_instance) {
                static::$_instance = new self();
            }
            return static::$_instance;
        }

        /**
         * @param string $name
         * @return $this
         */
        public function setName(string $name = ''): RedisQueueService
        {
            $this->queue_name = $name;
            return $this;
        }

        /**
         * @return string
         */
        public function getName(): string
        {
            return $this->queue_name;
        }

        /**
         * Post message
         *
         * @param array $data
         * @param int $delay
         * @return bool
         */
        public function _send(array $data = [], int $delay = 0): bool
        {
            $queue = $this->queue_name;
            // Post a delayed message that will be processed in $delay seconds
            Client::send($queue, $data, $delay);
            return true;
        }
    }
Example of producing a message
    $contact = [
        'title' => 1,
    ];
    RedisQueueService::instance()->_send($contact);
Results: consumption speed
The consumer simply writes each message to the log:
    Log::info($data['title']);
Server configuration:
2 cores, 2 GB RAM
Number of queued messages and time to consume them:
- 10,000 messages: 2 s
- 100,000 messages: 21 s
- 1,000,000 messages: 1 min 33 s
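To put these timings in perspective, here is a small sketch that converts them into messages per second. It only does arithmetic on the figures listed above, reading "W" as 10,000; it is not a benchmark script and does not touch the queue.

```php
<?php
declare(strict_types=1);

// Figures from the test above: message count => elapsed seconds.
$runs = [
    10_000    => 2,
    100_000   => 21,
    1_000_000 => 93, // 1 min 33 s
];

$throughput = [];
foreach ($runs as $messages => $seconds) {
    // Average consumption rate for this run, rounded to whole messages.
    $throughput[$messages] = (int) round($messages / $seconds);
    printf(
        "%d messages in %d s: about %d msg/s\n",
        $messages,
        $seconds,
        $throughput[$messages]
    );
}
```

That works out to roughly 5,000 msg/s for the two smaller runs and a little over 10,000 msg/s for the largest one, all on the 2-core machine with `cpu_count() * 3` consumer processes.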
The results are pretty good, hee hee.