Implementing WebSocket clustering in Spring Boot with Kafka as message-oriented middleware

1. In real projects, as data volume and concurrency grow, a single WebSocket service is no longer enough; we need a WebSocket cluster. Clustering, however, brings its own problems. The first idea that comes to mind is to put the WebSocket sessions into a cache server such as Redis and fetch them from Redis when needed. But WebSocket sessions are special: they cannot be serialized. A session is stateful and tied to a live connection; once the connection is closed, the session becomes invalid.

2. Three ways to solve the WebSocket cluster problem

2.1. Assign all associated users (i.e., users who may chat with each other) to the same WebSocket server through an appropriate algorithm. This guarantees that a chat partner can always receive messages. However, this method has an obvious limitation: users can only chat with users they are already associated with, not with users for whom no association has been established.
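The routing idea in 2.1 can be sketched in a few lines; the class and method names below are illustrative, not from the original code. Any deterministic hash of the user ID works, as long as associated users are fed the same ID space and server count:

```java
// Sketch of deterministic user-to-server assignment: hashing the user id
// always yields the same instance, so associated users can be co-located.
public class ShardRouter {

    /** Deterministically map a userId to one of serverCount instances. */
    public static int serverFor(String userId, int serverCount) {
        // floorMod avoids a negative index when hashCode() is negative
        return Math.floorMod(userId.hashCode(), serverCount);
    }

    public static void main(String[] args) {
        // the same user always lands on the same instance
        System.out.println(ShardRouter.serverFor("user42", 3));
    }
}
```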

2.2. Use Redis' message subscription (pub/sub) feature to implement the WebSocket cluster. The general idea: every server instance subscribes to the same channel; when a message is published, every instance receives it, and the instance that holds the receiver's session delivers it.
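The broadcast-and-deliver-locally idea can be illustrated with a small in-process analogue (illustrative only; a real setup would publish to a Redis channel, e.g. via Spring Data Redis, instead of an in-memory list):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-process analogue of the pub/sub idea: every "server" subscribes to the
// channel, but only the one holding the receiver's session delivers the message.
public class PubSubSketch {

    static class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        void subscribe(Consumer<String> s) { subscribers.add(s); }
        void publish(String msg) { subscribers.forEach(s -> s.accept(msg)); }
    }

    public static void main(String[] args) {
        Channel chat = new Channel();
        StringBuilder delivered = new StringBuilder();

        // server A holds the session for "alice"; server B does not
        chat.subscribe(msg -> { if (msg.startsWith("alice:")) delivered.append("A got ").append(msg); });
        chat.subscribe(msg -> { /* server B: no matching local session, drop */ });

        chat.publish("alice:hello");
        System.out.println(delivered); // → A got alice:hello
    }
}
```

The Kafka approach below follows exactly the same pattern, with Kafka topics playing the role of the channel.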

2.3. Use message-oriented middleware such as Kafka to implement the WebSocket cluster. This is the approach I currently use. It is essentially similar to Redis' message subscription, but we only want Redis to act as a cache and do not want it involved in too much business processing, so we chose Kafka.

2.3.1 Add the Kafka dependency to the project's pom file (note: the spring-kafka version must be compatible with the Kafka version installed on the server)

        <!-- Kafka dependency -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
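As an aside, newer Spring Boot versions can declare the same producer/consumer settings in `application.properties` instead of Java config. A sketch using Spring Boot's `spring.kafka.*` auto-configuration properties (the broker addresses are placeholders):

```properties
spring.kafka.bootstrap-servers=kafka-host-1:9092,kafka-host-2:9092
spring.kafka.producer.acks=all
spring.kafka.producer.retries=0
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```

The explicit Java configuration below works with the older spring-kafka 1.1.0 used in this post.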

2.3.2 Create the Kafka producer Bean

 

package com.yxl.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: yxl
 * @Description: Kafka Producer (message sender)
 * @DATE: Created in 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    public Map<String, Object> producerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka-cluster-ip1:9092,kafka-cluster-ip2:9092"); //replace with your broker addresses
        properties.put("acks", "all");//acks controls when a send counts as complete; "all" waits for the full set of in-sync replicas - the slowest but most reliable setting
        properties.put("retries", 0);//number of automatic retries after a failed send; we use 0, because enabling retries can produce duplicate messages
        properties.put("batch.size", 16384);//the producer buffers unsent records per partition; the buffer size is set by batch.size
        properties.put("linger.ms", 1);//how long to wait for more records before sending a batch
        properties.put("buffer.memory", 33554432);//total memory available to the producer for buffering
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

}
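Before wiring up the consumer, it helps to pin down the message format. The WebSocket code later in this post assumes a JSON payload carrying `sender_id` and `receiver_id`; a minimal helper for building such a payload (the `content` field and the helper itself are illustrative assumptions, not part of the original code):

```java
public class ChatMessages {

    /**
     * Build the JSON payload the websocket code parses.
     * sender_id / receiver_id appear in the original code; "content" is an assumed field.
     */
    public static String chatMessage(String senderId, String receiverId, String content) {
        return String.format(
                "{\"sender_id\":\"%s\",\"receiver_id\":\"%s\",\"content\":\"%s\"}",
                senderId, receiverId, content);
    }

    public static void main(String[] args) {
        System.out.println(chatMessage("u1", "u2", "hello"));
        // → {"sender_id":"u1","receiver_id":"u2","content":"hello"}
    }
}
```

A payload built this way would be handed to `kafkaTemplate.send("chatMessage", message)` on the sending side.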

 

2.3.3 Create the Kafka consumer Bean and the consumer listener

package com.yxl.configuration;

import com.yxl.myListener.MyKafkaListener;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @Author: yxl
 * @Description: Kafka consumer
 * @DATE: Created in 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka-cluster-ip1:9092,kafka-cluster-ip2:9092"); //replace with your broker addresses
        properties.put("group.id", getIPAddress()); //use the server IP as the group.id, so each instance is in its own consumer group and receives every message
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    public String getIPAddress() {
        try {
            InetAddress address = InetAddress.getLocalHost();
            if (address != null && StringUtils.isNotBlank(address.getHostAddress())) {
                return address.getHostAddress();
            }
        }catch (UnknownHostException e) {
            return UUID.randomUUID().toString().replace("-","");
        }
        return UUID.randomUUID().toString().replace("-","");
    }

    /**
     * Custom listener bean
     */
    @Bean
    public MyKafkaListener listener() {
        return new MyKafkaListener();
    }
}

2.3.4 The consumer listener

package com.yxl.myListener;

import com.yxl.websocket.ChatWebsocket;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @Author: yxl
 * @Description:
 * @DATE: Created in 2018/11/14
 */
public class MyKafkaListener {

    Logger logger = Logger.getLogger(MyKafkaListener.class);

    /**
     * Invoked when a chat message is sent
     * @param record
     */
    @KafkaListener(topics = {"chatMessage"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("chatMessage topic - chat message received: " + record.value().toString());
        //creating a new instance works only because the session map inside ChatWebsocket is static
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaReceiveMsg(record.value().toString());
    }

    /**
     * Invoked when a connection is closed
     * @param record
     */
    @KafkaListener(topics = {"closeWebsocket"})
    public void closeListener(ConsumerRecord<?, ?> record) {
        logger.info("closeWebsocket topic - close request received: " + record.value().toString());
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaCloseWebsocket(record.value().toString());
    }

}

2.3.5 WebSocket cluster Java code

package com.yxl.websocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket cluster endpoint
 * Created by yxl on 2018-11-17.
 */
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatWebsocket {

    private Logger logger = Logger.getLogger(ChatWebsocket.class);

    private static ApplicationContext applicationContext;

    private KafkaTemplate kafkaTemplate;

    //Tracks the current number of online connections; should be made thread safe.
    private static int onlineCount = 0;
    //Thread-safe map from the java.util.concurrent package, storing each client's Session keyed by user ID,
    //so that the server can push a message to a single client.
    private static Map<String, Session> drWebSocketSet = new ConcurrentHashMap<>(); //doctor-side sessions


    /**
     * Method successfully called for connection establishment
     *
     * @param userId     User ID
     */
    @OnOpen
    public void onOpen(@PathParam("userId") String userId, Session session) {

        if (kafkaTemplate == null) {
            kafkaTemplate = applicationContext.getBean(KafkaTemplate.class); //Get the Bean instance of kafka
        }

        drWebSocketSet.put(userId, session);
    }


    /**
     * Method called after receiving a client message
     *
     * @param message message sent by the client
     * @param session current session
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        if ("ping".equals(message)) {
            session.getBasicRemote().sendText("pong"); //heartbeat
        } else {
            sendMessage(message, session); //Call Kafka for message distribution
        }


    }

    /**
     * send message
     *
     * @param message
     * @param session
     * @throws IOException
     */
    public void sendMessage(String message, Session session) throws IOException {
        if (StringUtils.isNotBlank(message)) {

            JSONObject jsonObject = JSONObject.parseObject(message);

            String sender_id = jsonObject.getString("sender_id"); //sender ID
            String receiver_id = jsonObject.getString("receiver_id"); //recipient ID

            //TODO possible optimization: first check whether the receiver (receiver_id) is connected to this
            //server; if so, fetch the local session and send directly, skipping Kafka to save resources
            kafkaTemplate.send("chatMessage", message);
        }
    }
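    /**
     * A sketch of the TODO above (not part of the original code): deliver locally
     * when the receiver's session lives on this instance, and fall back to Kafka otherwise.
     */
    private void sendLocalOrForward(String message, String receiverId) throws IOException {
        Session local = drWebSocketSet.get(receiverId);
        if (local != null && local.isOpen()) {
            local.getBasicRemote().sendText(message); //receiver is on this server: send directly
        } else {
            kafkaTemplate.send("chatMessage", message); //otherwise broadcast via Kafka
        }
    }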

    /**
     * Method invoked for connection closure
     */
    @OnClose
    public void onClose(Session session) {
        Map<String, String> pathParameters = session.getPathParameters();
        String userId = pathParameters.get("userId"); //get the userId from the session's path parameters
        Map<String, String> map = new HashMap<>();
        map.put("userId", userId); //key must match what kafkaCloseWebsocket() reads
        kafkaTemplate.send("closeWebsocket", JSON.toJSONString(map));
    }

    /**
     * Close a connection
     *
     * @param map map of currently logged-in clients
     * @param username client to remove
     */
    private void close(Map<String, Session> map, String username) {
        if (StringUtils.isNotBlank(username)) {
            logger.info("closing websocket connection for client: " + username);
            map.remove(username); //ConcurrentHashMap.remove is a no-op when the key is absent
        }
    }

    /**
     * Kafka chat-message listener callback: distributes the message to the local session
     *
     * @param message
     * @author yxl
     */
    public void kafkaReceiveMsg(String message) {
        JSONObject jsonObject = JSONObject.parseObject(message);

        String receiver_id = jsonObject.getString("receiver_id"); //recipient ID

        Session receiverSession = drWebSocketSet.get(receiver_id);
        if (receiverSession != null) {
            try {
                receiverSession.getBasicRemote().sendText(message); //send only if the receiver is on this instance
            } catch (IOException e) {
                logger.error("failed to send message to " + receiver_id, e);
            }
        }
    }

    /**
     * Kafka close-connection listener callback: removes the local session
     *
     * @param closeMessage
     */
    public void kafkaCloseWebsocket(String closeMessage) {
        JSONObject jsonObject = JSONObject.parseObject(closeMessage);
        String userId = jsonObject.getString("userId");
        drWebSocketSet.remove(userId);
    }



    /**
     * Called when an error occurs
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("websocket error occurred, closing the websocket connection");
        //onClose(session);
        error.printStackTrace();
        logger.info("websocket error: " + error.getMessage());
    }

}

Bean instances cannot be injected directly into a WebSocket endpoint (the container creates a new endpoint instance per connection). For a workaround, see my other blog post: https://www.cnblogs.com/Amaris-Lin/p/9038813.html

Tags: websocket

Posted by shneoh on Mon, 30 May 2022 03:49:40 +0530