What is Spring Cloud Stream?
Spring Cloud Stream is a framework for building message-driven microservices. Applications interact with the binder in Spring Cloud Stream through inputs and outputs. We configure the bindings, and the binder is responsible for interacting with the message middleware. Under the hood it uses Spring Integration to connect to the message broker, which is what makes the application message- and event-driven.
Spring Cloud Stream provides opinionated auto-configuration for several vendors' message middleware products and introduces three core concepts: publish-subscribe, consumer groups, and partitions. At present, the officially maintained binders target RabbitMQ and Kafka.
Spring Cloud Stream official documentation:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
Why Spring Cloud Stream?
Take RabbitMQ and Kafka as an example. The two message brokers have different architectures: RabbitMQ has exchanges, while Kafka has topics and partitions. These differences cause trouble in real project development. Suppose we have built on one of the two message queues and a later business requirement forces us to migrate to the other. That migration is painful: because the broker is coupled to our system, a lot of code has to be torn down and redone. Spring Cloud Stream gives us a way to decouple the application from the broker.
Binder, an important component of Spring Cloud Stream
Without the concept of a binder, a Spring Boot application has to interact with the message middleware directly. Because each middleware was designed with different goals, their implementation details differ considerably. Defining the binder as a middle layer isolates the application from those details. This further encapsulation of the message middleware makes the code layer unaware of which middleware is in use, so the middleware can even be switched (for example, from RabbitMQ to Kafka) mostly through dependencies and configuration. This keeps microservice development highly decoupled, and services can focus on their own business logic.
The binder creates bindings. A binding connects a producer or a consumer to the message middleware. There are two types, INPUT and OUTPUT: INPUT corresponds to consumers and OUTPUT corresponds to producers.
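The binder idea can be pictured with a small, framework-free sketch. Everything here is hypothetical and exists only for illustration: `SketchBinder`, `InMemoryBinder`, and `BinderSketch` are not Spring Cloud Stream types, and the "brokers" are plain in-memory maps. The point is only that `runApp` depends on the binder interface, so swapping the implementation does not touch application code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Application side of the sketch: it only ever sees the SketchBinder interface. */
final class BinderSketch {
    static List<String> runApp(SketchBinder binder) {
        List<String> received = new ArrayList<>();
        // INPUT binding: the consumer registers a handler for a destination.
        binder.bindConsumer("myExchange", received::add);
        // OUTPUT binding: the producer gets something it can send payloads to.
        Consumer<String> output = binder.bindProducer("myExchange");
        output.accept("hello");
        return received;
    }

    public static void main(String[] args) {
        // Same application code, two different (simulated) middleware back ends.
        System.out.println(runApp(new InMemoryBinder("rabbit-like")));
        System.out.println(runApp(new InMemoryBinder("kafka-like")));
    }
}

/** Hypothetical stand-in for a binder: hides broker details behind two operations. */
interface SketchBinder {
    Consumer<String> bindProducer(String destination);

    void bindConsumer(String destination, Consumer<String> handler);
}

/** In-memory "broker" used only to make the sketch runnable (assumption, not a real binder). */
final class InMemoryBinder implements SketchBinder {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
    private final String tag;

    InMemoryBinder(String tag) {
        this.tag = tag;
    }

    @Override
    public Consumer<String> bindProducer(String destination) {
        // Deliver each payload to every handler bound to the destination,
        // prefixed with the tag so the output shows which binder carried it.
        return payload -> handlers.getOrDefault(destination, List.of())
                .forEach(handler -> handler.accept(tag + ":" + payload));
    }

    @Override
    public void bindConsumer(String destination, Consumer<String> handler) {
        handlers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }
}
```

In the real framework the equivalent swap is done by changing the binder dependency and configuration rather than by passing an object around; the sketch only shows why the application code can stay the same.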
Spring Cloud Stream processing flow
Spring Cloud Stream API and annotation description
- Middleware: the message middleware; this tutorial covers RabbitMQ and Kafka
- Binder: the layer between the application and the message middleware. Binders for Kafka and RabbitMQ are available. Through the binder, the application can connect to the middleware easily and switch the message middleware type.
- @Input: marks an input channel, through which received messages enter the application
- @Output: marks an output channel, through which published messages leave the application
- @StreamListener: listens on a channel and receives the messages consumed from the queue
- @EnableBinding: binds the declared channels to the exchange
Spring Cloud Stream project integration
We set up RabbitMQ earlier, so the RabbitMQ environment is ready. Now we add three submodules to the project:
- pcloud-stream-rabbitmq-provider9010: the producer, which sends messages
- pcloud-stream-rabbitmq-consumer9011: a consumer, which receives messages
- pcloud-stream-rabbitmq-consumer9012: a second consumer, which receives messages
Create a new module pcloud-stream-rabbitmq-provider9010
Modify the pom file:
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
Add the application.yml file:
```yaml
server:
  port: 9010

spring:
  application:
    name: pcloud-stream-provider
  cloud:
    stream:
      binders: # RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings
          type: rabbit # Message middleware type
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Channel bindings
        output: # Channel name
          destination: myExchange # Exchange name; must match the destination the consumers bind to
          content-type: application/json # Message type: JSON here, "text/plain" for text
          binder: defaultRabbit # Binder definition to use for this channel

eureka:
  client: # Eureka registration
    service-url:
      defaultZone: http://eureka6001:6001/eureka/
```
New startup class: StreamMain9010
```java
package com.younger.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMain9010 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMain9010.class, args);
    }
}
```
New MessageProvider interface:
```java
package com.younger.springcloud.service;

public interface MessageProvider {

    String send();
}
```
New MessageProviderImpl class:
```java
package com.younger.springcloud.service.impl;

import com.younger.springcloud.service.MessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.Date;

// Source declares the "output" channel that this class publishes to
@EnableBinding(Source.class)
public class MessageProviderImpl implements MessageProvider {

    // Message sending channel. @Resource resolves by field name first, so the
    // field is named "output" to match the channel bean declared by Source;
    // several MessageChannel beans exist, so lookup by type alone is ambiguous.
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String message = new Date() + "@ younger Hello";
        output.send(MessageBuilder.withPayload(message).build());
        System.out.println("Send message:" + message);
        return message;
    }
}
```
New SendMessageController class:
```java
package com.younger.springcloud.controller;

import com.younger.springcloud.service.MessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class SendMessageController {

    @Resource
    private MessageProvider messageProvider;

    /**
     * Sends a message through the provider.
     *
     * @return the message that was sent
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}
```
Start the project and test:
Access address: http://localhost:9010/sendMessage
Create a new module pcloud-stream-rabbitmq-consumer9011
Add the pom file:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>pcloud</artifactId>
        <groupId>com.younger.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.younger.springcloud</groupId>
    <artifactId>pcloud-stream-rabbitmq-consumer9011</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>pcloud-stream-rabbitmq-consumer9011</name>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
```
Add the application.yml file:
```yaml
server:
  port: 9011

spring:
  application:
    name: pcloud-stream-consumer
  cloud:
    stream:
      binders: # RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings
          type: rabbit # Message middleware type
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Channel bindings
        input: # Channel name
          destination: myExchange # Exchange name to use
          content-type: application/json # Message type: JSON here, "text/plain" for text
          binder: defaultRabbit # Binder definition to use for this channel

eureka:
  client: # Eureka registration
    service-url:
      defaultZone: http://eureka6001:6001/eureka/
```
New startup class: StreamMain9011
```java
package com.younger.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMain9011 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMain9011.class, args);
    }
}
```
New ReceiveMessageListener class:
```java
package com.younger.springcloud.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class) // Sink declares the "input" channel
public class ReceiveMessageListener {

    @Value("${server.port}")
    private String serverPort;

    // Invoked for every message that arrives on the input channel
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("Message received:" + message.getPayload()
                + " The port number is:\t " + serverPort);
    }
}
```
Start the project and test:
Send message: http://localhost:9010/sendMessage
You can see that 9011 received the message sent by 9010.
Create a new module pcloud-stream-rabbitmq-consumer9012
Add the pom file:
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
Add the application.yml file:
```yaml
server:
  port: 9012

spring:
  application:
    name: pcloud-stream-consumer
  cloud:
    stream:
      binders: # RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings
          type: rabbit # Message middleware type
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Channel bindings
        input: # Channel name
          destination: myExchange # Exchange name to use
          content-type: application/json # Message type: JSON here, "text/plain" for text
          binder: defaultRabbit # Binder definition to use for this channel

eureka:
  client: # Eureka registration
    service-url:
      defaultZone: http://eureka6001:6001/eureka/
```
New startup class: StreamMain9012
```java
package com.younger.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMain9012 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMain9012.class, args);
    }
}
```
New ReceiveMessageListener class:
```java
package com.younger.springcloud.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class) // Sink declares the "input" channel
public class ReceiveMessageListener {

    @Value("${server.port}")
    private String serverPort;

    // Invoked for every message that arrives on the input channel
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("Message received:" + message.getPayload()
                + " The port number is:\t " + serverPort);
    }
}
```
Start the project and test:
Send message: http://localhost:9010/sendMessage
Both 9011 and 9012 received the message, which is the repeated-consumption problem. Spring Cloud Stream solves it with consumer groups: when several instances of a microservice are placed in the same group, each message is consumed by only one of them.
Every group receives its own copy of a message, so different groups can each consume it. Instances within the same group compete, and only one of them consumes a given message.
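The group rule can be modelled in a few lines of plain Java. This is a simulation under stated assumptions, not Spring Cloud Stream or RabbitMQ code: `GroupSketch`, the group names, and the round-robin choice of receiver inside a group are all hypothetical. It shows the two cases from the test above: two instances in separate groups both receive every message, while two instances in one shared group split the messages between them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupSketch {
    // group name -> instances subscribed in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered to that group so far
    private final Map<String, Integer> sent = new LinkedHashMap<>();
    private final List<String> deliveries = new ArrayList<>();

    void subscribe(String group, String instance) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
    }

    void publish(String payload) {
        // Publish-subscribe across groups: every group gets its own copy.
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            // Competition inside a group: exactly one member gets the copy.
            // Round-robin is an assumption of this sketch.
            int index = sent.merge(entry.getKey(), 1, Integer::sum) - 1;
            deliveries.add(members.get(index % members.size()) + " <- " + payload);
        }
    }

    List<String> deliveries() {
        return deliveries;
    }

    public static void main(String[] args) {
        GroupSketch separate = new GroupSketch();
        separate.subscribe("anonymous-9011", "9011");
        separate.subscribe("anonymous-9012", "9012");
        separate.publish("m1");
        System.out.println(separate.deliveries()); // both instances receive m1

        GroupSketch shared = new GroupSketch();
        shared.subscribe("groupA", "9011");
        shared.subscribe("groupA", "9012");
        shared.publish("m1");
        shared.publish("m2");
        System.out.println(shared.deliveries()); // each message reaches one instance only
    }
}
```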
We modify the yml files of 9011 and 9012:
Add a group property to the input binding (spring.cloud.stream.bindings.input.group) and give 9011 and 9012 the same group name.
Restart 9011 and 9012 and test:
Send message: http://localhost:9010/sendMessage
As you can see, only one consumer (here 9011) received the message.
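Now suppose we shut down 9011 and 9012 and then send messages through 9010. Will the messages be lost? Let's test:
Shut down 9011 and 9012
Send message: http://localhost:9010/sendMessage
Restart 9011 and 9012 and check:
As you can see, the messages are still received after the restart, so they were not lost. This relies on the group configured above: a consumer group gets a durable subscription on the broker, whereas a consumer without a group uses an anonymous, non-durable subscription and would miss messages sent while it was offline.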
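The restart test can be replayed with a small model of the queue behind a subscription. The sketch is hypothetical (`DurableQueueSketch` is not a RabbitMQ or Spring class) and assumes the simplest possible behaviour: a durable queue keeps buffering while its consumer is offline, and a non-durable queue disappears together with its consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class DurableQueueSketch {
    private final boolean durable;
    private final Deque<String> backlog = new ArrayDeque<>();
    private final List<String> received = new ArrayList<>();
    private boolean consumerOnline;

    DurableQueueSketch(boolean durable) {
        this.durable = durable;
    }

    /** Consumer starts: drain whatever was buffered while it was away. */
    void connect() {
        consumerOnline = true;
        while (!backlog.isEmpty()) {
            received.add(backlog.poll());
        }
    }

    /** Consumer stops: a non-durable queue is dropped along with its contents. */
    void disconnect() {
        consumerOnline = false;
        if (!durable) {
            backlog.clear();
        }
    }

    void publish(String payload) {
        if (consumerOnline) {
            received.add(payload);
        } else if (durable) {
            backlog.add(payload); // kept until the consumer comes back
        }
        // Non-durable and offline: no queue exists, so the message is lost.
    }

    List<String> received() {
        return received;
    }

    /** Replays the test: stop the consumer, send one message, start the consumer again. */
    static List<String> restartScenario(boolean durable) {
        DurableQueueSketch queue = new DurableQueueSketch(durable);
        queue.connect();
        queue.disconnect();
        queue.publish("sent while offline");
        queue.connect();
        return queue.received();
    }

    public static void main(String[] args) {
        System.out.println("durable:     " + restartScenario(true));
        System.out.println("non-durable: " + restartScenario(false));
    }
}
```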