redis queue consumption instance of workerman

This article environment CentOS8.0, PHP8.1, Nginx1.8, and workman 4.0\ If you don't understand, you can comment and contact me. OwenZhang owns the copyright. For commercial reprint, please contact OwenZhang for authorization. For non-commercial reprint, please indicate the source.

Message mode introduction

Generally speaking, message queuing has two modes:

Producer consumer mode (Queue);

The message producer produces messages and sends them to the queue, and then the message consumer takes them out of the queue and consumes them.

After the message is consumed, there is no storage in the queue, so it is impossible for the message consumer to consume the consumed message. The queue supports multiple consumers, but only one consumer can consume a message.

One is the publisher subscriber mode (Topic);

The message producer (publish) publishes the message to topic, and multiple message consumers (subscribers) consume the message at the same time.

Unlike the peer-to-peer method, messages published to topic will be consumed by all subscribers.

Message queues in both scenarios can be implemented using redis.

Queue mode introduction

The producer puts the production messages into the queue, and multiple consumers listen to the queue at the same time. Whoever grabs the message first will take the message from the queue, that is, each message can only be owned by one consumer at most.

The specific method is to create a task queue. The producer takes the initiative to lpush messages while the consumer takes the initiative to rpop data. However, there is a problem that consumers need to take the initiative to request data. Periodic requests will cause a waste of resources. If it is possible to notify the consumer once a new message is added to the queue, then this requirement can be realized with the help of brpop command. The brpop command is similar to the rpop command. The only difference is that when there is no element in the list, the brpop command will block the connection until a new element is added.

config/process.php

    // Queue consumption
    'redis_consumer' => [
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => cpu_count() * 3, // Multiple processes can be set
        'constructor' => [
            // Consumer catalog
            'consumer_dir' => app_path() . '/queue/redis',
        ],
    ],
	
copy

config/redis_queue.php

<?php

return [
    'default' => [
        'host'    => 'redis://127.0.0.1:6379',
        'options' => [
            'auth'          => env('REDIS_PASSWORD',''),            // Password, optional parameters
            'db'            => 3,                       // database
            'max_attempts'  => 5,                       // Retry times after consumption failure
            'retry_seconds' => 5,                       // Retry interval in seconds
        ]
    ],
];
copy

config/redis.php

<?php
/**
 * This file is part of webman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */

return [
    'default' => [
        'host'     => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', ''),
        'port'     => env('REDIS_PORT', '6379'),
        'database' => 0,
    ],
];
copy

app/queue/redis/ConverterSend.php

Queue auto consumption class

<?php

namespace app\queue\redis;

use Exception;
use support\Log;
use Webman\RedisQueue\Consumer;

class ConverterSend implements Consumer
{
    // Queue name to consume
    public string $queue = 'converter_list';

    // Connection name, corresponding to config/redis_ Queue Connections in PHP`
    public string $connection = 'default';

    // consumption

    /**
     * Automatic consumption
     *
     * Queue production $contact =[
     * 'title' => $i,
     * ];
     * $a = RedisQueueService::instance()->_send($contact);
     *
     * @throws Exception
     */
    public function consume($data)
    {
        Log::info($data['title']);
    }
}
copy

app/service/RedisQueueService.php

Queue production service class

<?php

// *******************
// ***Message queue producer
// *******************

namespace app\service;

use support\Log;
use Webman\RedisQueue\Client;

class RedisQueueService
{
    public static ?RedisQueueService $_instance  = null;
    protected string                 $queue_name = 'converter_list';

    /**
     * @return RedisQueueService|mixed
     */
    public static function instance(): ?RedisQueueService
    {
        if (!static::$_instance) static::$_instance = new self();
        return static::$_instance;
    }

    /**
     * @param string $name
     * @return $this
     */
    public function setName(string $name = ''): RedisQueueService
    {
        $this->queue_name = $name;
        return $this;
    }

    /**
     * @return string
     */
    public function getName(): string
    {
        return $this->queue_name;
    }

    /**
     * Post message
     *
     * @param array $data
     * @param int   $delay
     * @return bool
     */
    public function _send(array $data = [], int $delay = 0): bool
    {
        $queue = $this->queue_name;
        Client::send($queue, $data, $delay);      // Post a delayed message that will be processed in $delay seconds
        return true;
    }
}
copy

Queue production instance

$contact = [
            'title' => 1,
        ];
RedisQueueService::instance()->_send($contact);
copy

Effect consumption capacity speed

Simple consumption write log

Log::info($data'title');

Server configuration:

2-core 2G

Queue quantity consumption time

  • 1W=2S
  • 10W=21S
  • 100W=1m33s

The effect is good, hee hee

Posted by wattsup88 on Tue, 31 May 2022 06:13:00 +0530