Spring5 reactor functional programming

  • content {:toc}

preface

Reactive programming is a programming paradigm that can replace imperative programming. The reason for this substitution is that reactive programming solves some of the limitations of imperative programming. Understanding these limitations will help you better understand the advantages of the reactive programming model

Reactive flow specification

  • Comparing streams in Java

    Java flows and reactive flows there are many similarities between Java flows and reactive flows. First, they all have the word Stream in their names.

    They also provide functional API s for processing data. In fact, as you will see later when you introduce Reactor, they can even share many of the same operations.

    Java streams are usually synchronized and can only handle a limited set of data. Essentially, they are just a way to iterate over a set using functions.

    Reactive flow supports asynchronous processing of data sets of any size, as well as infinite data sets. As long as the data is ready, they can process the data in real time, and can avoid crushing the data consumers through backpressure.

  • Reactive flow specification

    The reactive flow specification can be summarized into four interfaces: Publisher, Subscriber, Subscription and Processor.

    Publisher is responsible for generating data and sending the data to the Subscription (one Subscription for each Subscriber).

    The Publisher interface declares a method subscribe(), through which a Subscriber can initiate a subscription to Publisher.

  public interface Publisher<T> {
      void subscribe(Subscriber<? super T> var1);
  }


  public interface Publisher<T> {
      void subscribe(Subscriber<? super T> var1);
  }

  public interface Subscriber<T> {
      void onSubscribe(Subscription var1);

      void onNext(T var1);

      void onError(Throwable var1);

      void onComplete();
  }

  public interface Subscription {
      void request(long var1);

      void cancel();
  }

  public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
  
  }

First meet Reactor

The Reactor project is an implementation of the reactive flow specification and provides a set of functional API s for assembling reactive flows.

Reactive programming requires us to think differently from imperative programming. At this point, we will not describe the steps to be carried out at each step. Reactive programming means to build the pipeline through which the data will flow. When the data flows through the pipeline, they can be modified or used in some form.

Imperative programming:

 String a = "Apple";
 String s = a.toUpperCase();
 String s1 = "hello" + s + "!";
 System.out.println(s1);

Reactive programming:

  Mono.just("Apple")
 .map(String::toUpperCase)
 .map(x-> "hello" + x + "!")
 .subscribe(System.out::println);

Console:

helloAPPLE!
14:36:38.685 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
helloAPPLE!

Mono in this example is one of the two core types of Reactor. The other type is flux. Both implement the Publisher interface of reactive flow. Flux represents a pipeline with zero, one or more (possibly infinite) data items

Add Reactor dependency

To start using Reactor, add the following dependencies to the project's build file:

        <!--reactor core-->
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
	</dependency>
		<!--reactor test-->
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<version>3.2.6.RELEASE</version>
		<scope>compile</scope>
	</dependency>
		

Use common reactive operations

Flux and Mono are the most basic building blocks provided by Reactor, and the operators provided by these two types of reactive expressions are adhesives that combine them to build data flow pipelines.

Flux and Mono have more than 500 operations, which can be roughly classified as:

• create operation;

• combined operation;

• switching operation;

• logic operation.

  • 1 create
       Flux<String> fruitFlux = Flux.just("Apple","Orange");

       fruitFlux.subscribe(System.out::println);

The lambda expression passed to the subscribe() method here is actually a java Util Consumer, a Subscriber used to create a reactive flow. After you call subscribe(), the data starts flowing. In this example, there is no intermediate operation, so the data flows directly from Flux to the Subscriber

To verify whether the predefined data flows through fruitFlux, we can write the following test code:

       StepVerifier.create(fruitFlux)
       .expectNext("Apple")
       .expectNext("Orange")
       .verifyComplete();

In this example, StepVerifier subscribes to fruitFlux and then asserts whether each data item in Flux matches the expected fruit name. Finally, it verifies that the whole fruitFlux completes normally after the Flux releases the "Strawberry".

You can also use the Java Stream from the array collection as the source of Flux.

      List<String> list = Lists.newArrayList();
        list.add("Apple");
        list.add("Orange");
        Flux<String> stringFlux = Flux.fromIterable(list);

For example, to create a Flux that publishes a value every second, you can use the static interval() method on Flux, as shown below:

   Flux<Long> take = Flux.interval(Duration.ofSeconds(1)).take(5);
   

The Flux created by the interval() method will publish the value from 0, and the subsequent entries will be incremented in turn. In addition, because the interval() method does not specify a maximum value, it may run forever. We can also use the take() method to limit the results to the first five entries

  • 2 type of combined reaction

Sometimes, we need to manipulate two types of reactions and merge them together in some way. Or, in other cases, we may need to split Flux into multiple reactive types

Consolidation:

  Flux<String> fruitFluxA = Flux.just("Apple","Orange");

  Flux<String> fruitFluxB = Flux.just("Banana","watermelon");
        
  fruitFluxA.mergeWith(fruitFluxB).subscribe(System.out::println);
 com.ckj.superlearn.superlearn.base.ReactorStrategy
16:03:07.343 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Apple
Orange
Banana
watermelon

Process finished with exit code 0

The mergeWith() method cannot guarantee the order of source fluxes perfectly, so we can consider using the zip() method

  Flux<String> fruitFluxA = Flux.just("Apple","Orange").delayElements(Duration.ofMillis(10));

  Flux<String> fruitFluxB = Flux.just("Banana","watermelon").delayElements(Duration.ofMillis(50));

  Flux<String> allFlux = fruitFluxA.mergeWith(fruitFluxB);

  allFlux.subscribe(x-> System.out.println("allFlux:"+x));

  Flux<Tuple2<String, String>> zip = Flux.zip(fruitFluxA, fruitFluxB);

  zip.subscribe(x-> System.out.println("zip:"+x));

  Thread.sleep(1000);

Console:

/com.ckj.superlearn.superlearn.base.ReactorStrategy
16:49:44.543 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
allFlux:Apple
allFlux:Orange
allFlux:Banana
zip:[Apple,Banana]
allFlux:watermelon
zip:[Orange,watermelon]

Process finished with exit code 0

  • 3 conversion and filtration of reactive flow

For a Flux with multiple data items, the skip operation will create a new Flux, which will first skip the specified number of data items, and then publish the remaining data items from the source Flux. The following test method shows how to use the skip() method:

  Flux<String> fruitFluxA = Flux.just("Apple","Orange","Banana","watermelon").skip(2);

    fruitFluxA.subscribe(x->{

        System.out.println(x);

        });
/com.ckj.superlearn.superlearn.base.ReactorStrategy
17:05:00.141 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Banana
watermelon

Process finished with exit code 0

The opposite is take()

 Flux<String> fruitFluxA = Flux.just("Apple","Orange","Banana","watermelon").take(2);

        fruitFluxA.subscribe(x->{

            System.out.println(x);

        });
 com.ckj.superlearn.superlearn.base.ReactorStrategy
17:20:59.483 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Apple
Orange

Process finished with exit code 0

filter() filtering effect

  Flux<String> fruitFluxA = Flux.just("Apple","Orange","Banana","watermelon").take(2);

  fruitFluxA.filter(x->x.equals("Apple")).subscribe(x->{

     System.out.println(x);

	 });

com.ckj.superlearn.superlearn.base.ReactorStrategy
17:24:03.242 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Apple

Process finished with exit code 0

How to use the flatMap() method and the subscribeOn() method

  Flux<String> fruitFluxA = Flux.just("Apple", "Orange", "Banana", "watermelon", "Apple", "Orange", "Banana",
                "watermelon", "Apple", "Orange", "Banana", "watermelon", "Apple", "Orange", "Banana", "watermelon");

   fruitFluxA.flatMap(Mono::just).map(String::toUpperCase).subscribeOn(Schedulers.parallel());

The advantage of using flatMap() and subscribeOn() is that we can split the work among multiple parallel threads, thereby increasing the throughput of the stream. Because the work is completed in parallel, it is impossible to guarantee which work is completed first, so the release order of data items in Flux is unknown

It's not easy to be original. If you think it's useful, please mercilessly give me a like and forward it. This will be the strongest driving force for me to continue to output high-quality articles.

Tags: flux

Posted by shan111 on Mon, 30 May 2022 10:17:35 +0530