A stress test script for implementing MQTT protocol services based on Locust

Recently, during the interval of busy business, I did some performance tests interspersed.

1. Background

A brief introduction to the business background is that according to the national standard, vehicles need to upload some specified data to ZF’s specified platform, and at the same time, vehicles will also transmit data to enterprise cloud services, so some performance requirements have arisen.

At present, we have simply conducted a performance scenario test first, which is to evaluate whether the current service can support the expected maximum simultaneous online vehicle upload data. After evaluation, the online vehicle data is carried out 10 times as expected, and will continue to run for 12 hours later to check the stability of the service link.

This article is not a rigorous performance test process result sharing, mainly to share the writing of pressure test scripts about mqtt protocol services. Because I have never been in touch with the pressure test of the MQTT protocol before, and the content of the relevant pressure test scripts on the Internet is also quite messy, so record it for reference only.

Just click on the link to know what data needs to be generated (because the service has not yet been used online, so the generated pressure test data can be directly cleaned up later.):

  1. Some pre-data: such as database, vehicle data involved in the cache, communication key data, etc. These can be generated at one time by writing a script before.
  2. Data reported by vehicles: The data reported by vehicles to the cloud has undergone a series of encryption and transcoding, and decryption has to be designed during this period. After evaluation, this can simplify some of the environments, so all vehicles can directly send the same data That's it.
  3. Vehicle data: The last step is to generate the corresponding vehicle data, go online at the same time, and send the data according to the evaluation frequency.

Among them, the data in the first and second steps can be generated separately before, and the data sent by the vehicle in the third step is what the stress test script needs to do.

2. Technology selection

This one is pretty fast. I searched it with a search engine, but there is very little content, or very little content that is useful to me. I have seen that jmeter has related plug-ins, but I basically veto this solution. Firstly, I am not good at using it, and secondly, I think it will definitely be more troublesome to use than coding by myself.

So I continued to code. I still prefer python. I thought of the locust library. Later, when I read the official documents, I saw that locust also expanded some content for the mqtt protocol. But I tried it and it didn't meet my needs, and maybe I used it wrong at the time, so I had to write it from scratch by myself.

During the search, I found that the library used for the mqtt protocol in Python is called paho.mqtt, which supports connection agents, message subscription, sending and receiving, etc., so I finally decided to use the combination of locust+paho.mqtt to realize this load script.

3. Code writing

1. Script code

There is no code layering for the time being. The current scene is simple, so I put it in a module directly. It is a bit long, so I will post it first, and the key content of the script will be disassembled in the later part.

The script currently does these things:

  • Query all available test vehicle information data from db
  • According to the input parameters of the command line, specify the number of vehicles to start and the frequency of establishing a connection with the broker agent
  • After establishing a successful connection, the vehicle can send data to the broker according to the frequency specified in the script
  • The script counts the number of connections, requests, response time and other information and writes them into the report
  • When the debugging encounters the situation that the vehicles will be disconnected in batches, when the vehicle is disconnected, the disconnection time and vehicle information will be written to the local csv, which is convenient for viewing and analysis the next day.
import csv
import datetime
import queue
import os
import sys
import time
import ssl

from paho.mqtt import client as mqtt_client

# Path adaptation according to different systems
if os.name == "nt":
    path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    sys.path.insert(0, path)
    from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
    sys.path.append("/app/qa_test_app/")
    from GB_test.utils.mysql_operating import DB

from locust import User, TaskSet, events, task, between, run_single_user


BROKER_ADDRESS = "broker service address"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000  # overtime time
TEST_TOPIC = "test_topic"

TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # The test data used for publish ing is only for illustration

BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # The business needs to be converted into byte type before sending

# create queue
client_queue = queue.Queue()

# Connect to DB, read vehicle data
db = DB("db_vmd")
select_sql = "select xxxx"  
client_list = db.fetch_all(select_sql)
print("After the vehicle data query is completed, the data volume is:{}".format(len(client_list)))
for t in client_list:
    # Store available vehicle information in the queue
    client_queue.put(t)


def fire_success(**kwargs):
    """Called when the request is successful"""
    events.request.fire(**kwargs)


def calculate_resp_time(t1, t2):
    """Calculate response time"""
    return int((t2 - t1) * 1000)


class MQTTMessage:
    """Sent message entity class"""
    def __init__(self, _type, qos, topic, payload, start_time, timeout):
        self.type = _type,
        self.qos = qos,
        self.topic = topic
        self.payload = payload
        self.start_time = start_time
        self.timeout = timeout


# Count the total number of successfully sent messages
total_published = 0
disconnect_record_list = []  # Defines a list container for disconnected records


class PublishTask(TaskSet):

    @task
    def task_publish(self):
        self.client.loop_start()
        topic = TEST_TOPIC
        payload = BYTES_DATA
        # Record sending start time
        start_time = time.time()
        mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
        published_mid = mqtt_msg_info.mid
        # Put the successfully sent message content into the published_message field of the client instance
        self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
                                                                   0,
                                                                   topic,
                                                                   payload,
                                                                   start_time,
                                                                   PUBLISH_TIMEOUT)
        # send success callback
        self.client.on_publish = self.on_publish
        # disconnect callback
        self.client.on_disconnect = self.on_disconnect

    @staticmethod
    def on_disconnect(client, userdata, rc):
        """ broker The connection is broken, put into the list container"""
        disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
        disconnect_record_list.append(disconnected_info)
        print("rc state:{} - -".format(rc), "{}-broker The line is disconnected".format(str(client._client_id)))

    @staticmethod
    def on_publish(client, userdata, mid):
        if mid:
            # Record the time when the message was successfully sent
            end_time = time.time()
            # Get the message from the sent message container
            message = client.published_message.pop(mid, None)
            # Calculate the time from the start of sending to the successful sending
            publish_resp_time = calculate_resp_time(message.start_time, end_time)
            fire_success(
                request_type="p_success",
                name="client_id: " + str(client._client_id),
                response_time=publish_resp_time,
                response_length=len(message.payload),
                exception=None,
                context=None
            )
            global total_published
            # Successfully sent accumulative 1
            total_published += 1


class MQTTLocustUser(User):
    tasks = [PublishTask]
    wait_time = between(2, 2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Get the client username and client_id from the queue
        current_client = client_queue.get()

        self.client = mqtt_client.Client(current_client[1])
        self.client.username_pw_set(current_client[0], PASSWORD)
        # self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # Simulate client connection error

        # Define a container to store sent messages
        self.client.published_message = {}

    def on_start(self):
        # set tls
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        self.client.tls_set_context(context)

        self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
        self.client.on_connect = self.on_connect

    def on_stop(self):
        print("publish success, The current number of successfully sent:{}".format(total_published))
        if len(disconnect_record_list) == 0:
            print("no disconnected client")
        else:
            # Write the information in the disconnection record to csv
            with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
                for i in disconnect_record_list:
                    writer.writerow(i)
            print("disconnected client information has been written csv document")

    @staticmethod
    def on_connect(client, userdata, flags, rc, props=None):
        if rc == 0:
            print("rc state:{} - -".format(rc), "{}-connect broker success".format(str(client._client_id)))
            fire_success(
                request_type="c_success",
                name='count_connected',
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )
        else:
            print("rc state:{} - -".format(rc), "{}-connect broker fail".format(str(client._client_id)))
            fire_success(
                request_type="c_fail",
                name="client_id: " + str(client._client_id),
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )


if __name__ == '__main__':
    run_single_user(MQTTLocustUser)

2. Code analysis - locust library part

The concurrent request capability is still the capability of the locust library used. The official only provides related classes of the http protocol interface, and does not directly provide the mqtt protocol, but we can customize related classes according to the official specifications, as long as we inherit User and TaskSet.

User class

The first is to define the User class, which is used to generate the vehicle I want to test.


When the class is initialized, in the yellow box, it will go to the queue to take out the vehicle information and use it to make some related settings. The client comes from the capability provided by from paho.mqtt import client as mqtt_client, fixed usage, just use it according to other people's documents.

In the red box are two important familiar attributes of the User class:

tasks: This defines what the generated user needs to do, that is, the content defined under the PublishTask class in the corresponding script.

wait_time: The time that the user stays during the execution of the task can be an interval, which is random in it. What I mean here is to send data to the broker every 2s.

In the green box, a dictionary container is defined, which is used to store the content of the message that the current user has successfully sent, because I will take it out later and write the relevant data in it to the generated report.

There are 2 methods in the blue box, which are also the capabilities provided by locust:

on_start: Called when the user starts running. Here I have done the processing of the vehicle connection broker proxy. Note that tls needs to be set here because the service connection requires it.

on_stop: Called when the user finishes running, here I do some other processing, such as writing the vehicle information that is disconnected during the running to the local csv.

TaskSet class

After defining the User class, you need to define the TaskSet class. You have to tell the generated users what to do.

According to my business needs, I just let the vehicle send data to the broker continuously.

The red part is also the capability provided by paho.mqtt, which will start a new thread to execute the things you define.

The yellow part is to do the operation of sending data, and I can get some returns. Check the source code to know that the return is the MQTTMessageInfo class.

Note the 2 properties returned:

  • mid: returns the order in which this message was sent
  • rc: Indicates the status of the response sent, 0 means success

For the green part, remember that I defined a container in the User class above, and here I put the information related to the sent message into the container for later use.

2. Code analysis - part of the paho.mqtt library

The above code has already used a lot of capabilities of paho.mqtt, and here is an overall review.

  • client.Client(): declare a client
  • client.username_pw_set(): Set the client's username and password
  • client.tls_set_context: set ssl mode
  • client.connect(): connect proxy
  • client.publish: Push messages to the agent

Some callback functions are also used:

  • on_connect: callback when the connection operation is successful
  • on_publish: callback when publishing succeeds
  • on_disconnect: callback when the client disconnects from the broker

In addition, an event function events.request is used.

It will be called when the client sends a request, whether the request is successful or the request fails; when I need to customize my report content, I need to use this event.

Check the source code and know which parameters to pass in it, then we need to pass in the corresponding parameters when calling.

For example, I called this method in the send callback function.

So in the end, the report displayed on the console has the content I defined.

Since I found out later in use that I don’t know when batch disconnection will occur, so I added corresponding processing in the on_disconnect callback function, recorded the relevant disconnection information, and wrote it to the local file at the end of the operation. .

Later, I took the initiative to test the writing result of the file when the client was disconnected, and the function was normal.

3. Summary

It will start to run later. During the running process, the development focuses on various indicators of link services, which will not be expanded here. If you are busy with business, you don't do this too much, and it's not professional. Indeed, many problems were found, which will be gradually optimized later, and then continue testing.

Now it has been running stably for 12 hours, and the service is normal, so it will come to an end for the time being. There will be other related performance test scenarios later, and then we can share them in a targeted manner.

In addition, this script sharing is only for reference. Now I am using it simply, as long as it can be used, there may be some unreasonable places that need to be optimized. If you need it, please refer to the relevant documents yourself.

Finally: The following complete software testing video learning tutorial has been sorted out and uploaded, and friends can get it for free if they need it [Guaranteed 100% free]

These materials should be the most comprehensive and complete preparation warehouse for [software testing] friends. This warehouse has also accompanied tens of thousands of test engineers through the most difficult journey. I hope it can help you too!

Tags: Python Programmer Testing software testing programming language

Posted by pukstpr12 on Fri, 24 Mar 2023 21:22:00 +0530