Microservice spring cloud (stream message driven) integration

What is spring cloud stream?

Spring Cloud Stream is a framework for building message driven microservices. Applications interact with the binder object in Spring Cloud Stream through inputs or outputs. We configure binding, and the binder object of Spring Cloud Stream is responsible for interacting with message middleware. By using Spring Integration to connect the message broker middleware to realize message event driven.
SpringCloud Stream provides personalized automatic configuration for some vendors' message middleware products, citing the three core concepts of publish subscribe, consumer group and partition. At present, it only supports RabbitMQ and Kafka.

SpringCloud Stream official website address:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/

Why spring cloud stream

For example, we use RabbitMQ and Kafka. Due to the different architectures of the two message oriented middleware, RabbitMQ has exchange and Kafka has Topic and Partitions partitions. The differences of these middleware lead to some troubles in our actual project development. If we use one of the two message queues and the business requirements behind it, I want to migrate to another message queue. At this time, it is undoubtedly disastrous. A lot of things have to be pushed down and redone because it is coupled with our system, At this time, springcloud Stream provides us with a way to decouple.

Binder, an important component of spring cloud stream

Without the concept of binder, when our SpringBoot application wants to directly interact with message middleware, due to the different original intention of each message middleware, there will be great differences in their implementation details. By defining binder as the middle layer, it perfectly realizes the isolation between application and message middleware details. The further encapsulation of message oriented middleware by Stream can make the code layer insensitive to the middleware, and even dynamically switch the middleware (rabbitmq to kafka), which makes the development of micro services highly decoupled, and services can pay more attention to their own business processes.


By defining binder as the middle layer, the isolation between application and message middleware details is realized. Binder can generate binding. Binding is used to bind producers and consumers of message containers. It has two types: INPUT and OUTPUT. INPUT corresponds to consumers and OUTPUT corresponds to producers.

Spring cloudstream processing flow

Spring cloud stream API and annotation description

  • Middleware middleware currently only supports RabbitMQ and Kafka
  • Binder: binder is the encapsulation between application and message. At present, binder of Kafka and RabbitMQ is implemented. Binder can easily connect middleware and dynamically change message types.
  • @Input: the annotation identifies the input channel through which messages received enter the application
  • @Output: the annotation identifies the output channel through which the published message will leave the application
  • @StreamListener: listens to the queue and is used to receive messages from the consumption queue
  • @EnableBinding: the value channel is bound with Exchange

Spring cloud stream project integration

We have built RabbitMQ before, so the RabbitMQ environment has no problem. Now we add three sub modules in the project
pcloud-stream-rabbitmq-provider9010 acts as a message sending module for producers
Pccloud stream rabbitmq consumer 9011 as message receiving module
Pcloud-stream-rabbitmq-consumer 9012 as message receiving module
Create a new module pccloud stream rabbitmq provider9010


Modify pom file

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

Add application YML file:

server:
  port: 9010

spring:
  application:
    name: pcloud-stream-provider
  cloud:
    stream:
      binders: # Configure the service information of rabbitmq to be bound here;
        defaultRabbit: # Represents the name of the definition, which is used for binding integration
          type: rabbit # Message component type
          environment: # Set the related environment configuration of rabbitmq
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Integration of services
        output: # This name is the name of a channel
          destination: studyExchange # Indicates the Exchange name definition to use
          content-type: application/json # Set the message type, json this time, and "text/plain" for text
          binder: defaultRabbit # Set the specific settings of the message service to be bound

# Configure Eureka registration on the client
eureka:
  client:
    service-url:
      defaultZone: http://eureka6001:6001/eureka/

New startup class: StreamMain9010

package com.younger.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMain9010 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMain9010.class, args);
    }
}


New MessageProvider class

package com.younger.springcloud.service;

public interface MessageProvider {

    String send();
}

New MessageProviderImpl class

package com.younger.springcloud.service.impl;

import com.younger.springcloud.service.MessageProvider;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.Date;

@EnableBinding(Source.class)
public class MessageProviderImpl implements MessageProvider {

    // Message sending pipeline Output
    @Resource
    private MessageChannel messageChannel;

    @Override
    public String send() {
        String message = new Date() + "@ younger Hello";
        messageChannel.send(MessageBuilder.withPayload(message).build());
        System.out.println("Send message:" + message);
        return message;
    }
}

New SendMessageController class

package com.younger.springcloud.controller;

import com.younger.springcloud.service.MessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class SendMessageController {

    @Resource
    private MessageProvider messageProvider;

    /**
     * send message
     * @return
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}

Start project test:

Access address: http://localhost:9010/sendMessage

Create a new module pccloud stream rabbitmq consumer9011



Add pom file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>


  <parent>
    <artifactId>pcloud</artifactId>
    <groupId>com.younger.springcloud</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>

  <groupId>com.younger.springcloud</groupId>
  <artifactId>pcloud-stream-rabbitmq-consumer9011</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>pcloud-stream-rabbitmq-consumer9011</name>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
 
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

Add application YML file:

server:
  port: 9011

spring:
  application:
    name: pcloud-stream-consumer
  cloud:
    stream:
      binders: # Configure the service information of rabbitmq to be bound here;
        defaultRabbit: # Represents the name of the definition, which is used for binding integration
          type: rabbit # Message component type
          environment: # Set the related environment configuration of rabbitmq
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Integration of services
        input: # This name is the name of a channel
          destination: myExchange # Indicates the Exchange name definition to use
          content-type: application/json # Set the message type. This time, it is the object json. If it is text, set "text/plain"
          binder: defaultRabbit # Set the specific settings of the message service to be bound

eureka:
  client: # Configure Eureka registration on the client
    service-url:
      defaultZone: http://eureka6001:6001/eureka/

New startup class: StreamMain9011

package com.younger.springcloud;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMain9011 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMain9011.class, args);
    }
}


New ReceiveMessageListener class:

package com.younger.springcloud.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("Message received:"+ message.getPayload() +" The port number is:\t " + serverPort);
    }
}

Start project test:

Send message: http://localhost:9010/sendMessage


You can see that we received the message on 9010.

We will create a new Module pcloud-stream-rabbitmq-consumer9012


Add pom file:

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

Add application YML file:

server:
  port: 9012

spring:
  application:
    name: pcloud-stream-consumer
  cloud:
    stream:
      binders: # Configure the service information of rabbitmq to be bound here;
        defaultRabbit: # Represents the name of the definition, which is used for binding integration
          type: rabbit # Message component type
          environment: # Set the related environment configuration of rabbitmq
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Integration of services
        input: # This name is the name of a channel
          destination: myExchange # Indicates the Exchange name definition to use
          content-type: application/json # Set the message type. This time, it is the object json. If it is text, set "text/plain"
          binder: defaultRabbit # Set the specific settings of the message service to be bound

eureka:
  client: # Configure Eureka registration on the client
    service-url:
      defaultZone: http://eureka6001:6001/eureka/

New startup class: StreamMain9012

package com.younger.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMain9012 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMain9012.class, args);
    }
    
}

New ReceiveMessageListener class:

package com.younger.springcloud.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("Message received:"+ message.getPayload() +" The port number is:\t "+ serverPort);
    }
}

Start project test:

Send message: http://localhost:9010/sendMessage


9011 and 9012 have received messages, which leads to the problem of repeated consumption. In order to avoid the problem of repeated consumption, Stream is solved by message grouping. When microservice applications are placed in the same group, it can ensure that messages will only be consumed by one of them once.
Different groups can be consumed. There will be competition in the same group, and only one of them can be consumed.

We modify the yml files of 9011 and 9012:

Set 9011 and 9012 to the same group.

Restart 9011 and 9012 tests:

Send message: http://localhost:9010/sendMessage



As you can see, only 9011 received the message.

Let's now imagine that we close 9011 and 9012 and then send messages through 9010. Will our message be lost? Let's test:
Close 9011 and 9012

Send message: http://localhost:9010/sendMessage

Let's restart 9011 and 9012 to see:


As you can see, we can receive messages, which shows that spring cloud stream can persist messages.

Tags: Spring Cloud Microservices Middleware

Posted by osram on Thu, 17 Mar 2022 00:13:25 +0530