Spring Batch from Getting Started to Mastering (3): Parallel Processing

1. Background

Many batch processing problems can be solved with a single-threaded, single-process job, so it is worth checking whether such a job meets your needs before considering a more complex implementation. To gauge the performance of a realistic job, start by measuring whether the simplest implementation is fast enough: even on commodity hardware, you can read and write files of several hundred megabytes per minute.
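As a baseline for such a measurement, a plain single-threaded chunk step can look like the sketch below. It reuses the DemoUser/DemoReader/DemoWriter placeholder components that appear in the later examples; they stand in for your real reader and writer.

```java
@Bean
public Step singleThreadedStep() {
    // Baseline: one thread reads, processes, and writes every chunk in sequence.
    // DemoReader/DemoWriter are placeholder components, not Spring Batch classes.
    return this.stepBuilderFactory.get("singleThreadedStep")
            .<DemoUser, DemoUser>chunk(2)
            .reader(new DemoReader(10))
            .writer(new DemoWriter<>())
            .build();
}
```

Timing this step against production-sized data tells you whether any of the parallel options below are worth their extra complexity.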

2. Parallel processing

Spring Batch offers a range of options, most of which are described in this chapter, although some features are covered elsewhere. At a high level, there are two modes of parallel processing:

  • Single-process, multi-threaded
  • Multi-process

These can be further divided into the following categories:

  • Multi-threaded step (single process)
  • Parallel steps (single process)
  • Remote chunking of a step (multi-process)
  • Partitioning a step (single or multi-process)

2.1 Multi-threaded step

The easiest way to start parallel processing is to add a TaskExecutor to the step configuration.

When using Java configuration, you can add a TaskExecutor to the step as shown in the following example:

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleMultiStep(TaskExecutor taskExecutor) {
    return this.stepBuilderFactory.get("sampleMultiStep")
            .<DemoUser, DemoUser>chunk(2)
            .reader(new DemoReader(10))
            .writer(new DemoWriter<>())
            .taskExecutor(taskExecutor)
            .throttleLimit(5)
            .build();
}
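One caveat the example above glosses over: many stateful ItemReader implementations are not thread-safe, and a multi-threaded step calls read() from several threads at once. A common mitigation (a sketch, assuming your reader implements ItemStreamReader) is to wrap it in Spring Batch's SynchronizedItemStreamReader:

```java
@Bean
public SynchronizedItemStreamReader<DemoUser> synchronizedReader(
        ItemStreamReader<DemoUser> delegate) {
    // Serializes calls to read() so a stateful delegate reader
    // can be shared safely by the step's worker threads.
    SynchronizedItemStreamReader<DemoUser> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(delegate);
    return reader;
}
```

Keep in mind that restartability is limited in a multi-threaded step, since items are no longer read and committed in a predictable order.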

2.2 Parallel steps

As long as the application logic that needs to be parallelized can be split into distinct responsibilities and assigned to individual steps, it can be parallelized within a single process. Parallel step execution is easy to configure and use.

When using Java configuration, executing flow1 (step1 then step2) in parallel with flow2 (step3), followed by step4, is straightforward, as the following example shows:

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
            .start(splitFlow())
            .next(step4())
            .build()        // builds FlowJobBuilder instance
            .build();       // builds Job instance
}

@Bean
public TaskExecutor paralTaskExecutor() {
    // Executor that runs the two flows of the split concurrently
    return new SimpleAsyncTaskExecutor("paral_steps");
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
            .split(paralTaskExecutor())
            .add(flow1(), flow2())
            .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
            .start(step1())
            .next(step2())
            .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
            .start(step3())
            .build();
}

2.3 Remote chunking

In remote chunking, step processing is split across multiple processes that communicate with each other through some middleware.

The manager component is a single process, and the workers are multiple remote processes. This pattern works best when the manager is not the bottleneck, so the cost of processing an item must be higher than the cost of reading it (which is often the case in practice).

The manager is an implementation of a Spring Batch step whose ItemWriter is replaced by a generic version that knows how to send chunks of items as messages to the middleware. The workers are standard listeners for whatever middleware is used (for example, with JMS they would be MessageListener implementations), and their role is to process chunks of items through the ChunkProcessor interface, using a standard ItemWriter or an ItemProcessor plus ItemWriter. One advantage of this pattern is that the reader, processor, and writer components are used out of the box (they are the same components you would use to run the step locally). Items are divided dynamically and work is shared through the middleware, so if the listeners are all eager consumers, load balancing is automatic. The middleware must be durable, with guaranteed delivery and a single consumer per message. JMS is the obvious candidate, but other options (such as JavaSpaces) exist in the grid computing and shared-memory product space.

Going further, you can externalize chunk processing by using the ChunkMessageChannelItemWriter (provided by Spring Batch Integration), which sends items out and collects the results. Once items have been sent, Spring Batch continues reading and grouping items without waiting for the results; it is the ChunkMessageChannelItemWriter that is responsible for collecting the results and integrating them back into the Spring Batch process.

With Spring Integration, you have full control over the concurrency of your processes (for example, by using a QueueChannel instead of a DirectChannel). In addition, by relying on Spring Integration's rich collection of channel adapters (such as JMS and AMQP), you can distribute chunks of a batch job to external systems for processing.
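As a concrete illustration of that control (a sketch; the capacity of 10 is an arbitrary choice), replacing a DirectChannel with a bounded QueueChannel turns the hand-off into an asynchronous buffer that a poller drains at its own pace:

```java
@Bean
public QueueChannel requests() {
    // Bounded buffer: at most 10 chunk messages may be queued before
    // senders block, instead of each send being handled synchronously
    // on the caller's thread as with DirectChannel.
    return new QueueChannel(10);
}
```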

A simple job with a remotely chunked step can be configured as shown in the rest of this section.

Starting with version 4.1, Spring Batch Integration introduced the @EnableBatchIntegration annotation, which simplifies the remote chunking setup. The annotation provides two beans that can be autowired in the application context:

RemoteChunkingManagerStepBuilderFactory: used to configure the manager step

RemoteChunkingWorkerBuilder: used to configure the remote worker integration flow

These APIs take care of configuring a number of components behind the scenes:

  • First, take a look at the producer side, the remote chunking manager:
    @Autowired
    private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public Job remoteChunkingJob() {
        return this.jobBuilderFactory.get("remoteChunkingJob")
                .start(remoteChunkManagerStep()).build();
    }

    @Bean
    public DirectChannel managerRequests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow managerOutboundFlow() {
        return IntegrationFlows.from(managerRequests())
                .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests"))
                .get();
    }

    @Bean
    public QueueChannel managerReplies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow managerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(rabbitmqConnectionFactory, "replies"))
                .channel(managerReplies()).get();
    }

    @Bean
    public TaskletStep remoteChunkManagerStep() {
        return this.managerStepBuilderFactory.get("remoteChunkManagerStep")
                .<Integer, Integer>chunk(3)
                .reader(itemReader())
                .outputChannel(managerRequests())
                .inputChannel(managerReplies()).build();
    }

    @Bean
    public ListItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
    }

As you can see, the manager only needs to define a reader, plus the message channel on which to send requests and the message channel on which to receive replies.

But if only the reader is defined, what exactly is sent out, and what is received?

The ChunkMessageChannelItemWriter that RemoteChunkingManagerStepBuilder#build() adds internally completes the step: for each chunk, it sends a ChunkRequest message out on the request channel.
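To make the sending side concrete, here is a plain-Java sketch (no Spring dependencies) of the grouping the manager performs: the reader's items are gathered into fixed-size chunks, and each chunk then becomes the payload of one ChunkRequest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkGrouping {

    // Splits the items read so far into chunks of at most chunkSize elements,
    // mirroring how a chunk(3) step groups items before each send.
    static <T> List<List<T>> split(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += chunkSize) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(i + chunkSize, items.size()))));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // The ListItemReader above yields 1..6; with chunk(3) the manager
        // sends two requests, carrying [1, 2, 3] and [4, 5, 6].
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5, 6), 3));
    }
}
```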

  • Now take a look at the consumer side, the remote chunking worker:

    @Autowired
    private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public DirectChannel workerRequests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow workerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(rabbitmqConnectionFactory, "requests"))
                .channel(workerRequests()).get();
    }

    @Bean
    public DirectChannel workerReplies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow workerOutboundFlow() {
        return IntegrationFlows.from(workerReplies())
                .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies"))
                .get();
    }

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            System.out.println("processing item " + item);
            return item;
        };
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                System.out.println("writing item " + item);
            }
        };
    }

    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        return this.remoteChunkingWorkerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(workerRequests())
                .outputChannel(workerReplies()).build();
    }

    How is a received message handled?

    The worker executes the processor and writer for the chunk and, once finished, returns a ChunkResponse.

    The response is sent to the reply queue and travels back to the producer. How does the producer handle it?

    The producer receives the message from the reply channel and updates its in-memory state accordingly; the entity that tracks this state is LocalState.

  • To recap:

    • On the manager side, RemoteChunkingManagerStepBuilderFactory lets you configure the manager step by declaring:

      The item reader that reads items and sends them to the workers

      The output channel ("requests") for sending requests to the workers

      The input channel ("replies") for receiving the workers' replies

      The ChunkMessageChannelItemWriter and MessagingTemplate do not need to be configured explicitly (they can still be configured explicitly if necessary)

    • On the worker side, RemoteChunkingWorkerBuilder lets you configure the worker to:

      Listen for requests sent by the manager on the input channel ("requests")

      Call the handleChunk method of ChunkProcessorChunkHandler for each request, using the configured ItemProcessor and ItemWriter

      Send replies to the manager on the output channel ("replies")

      The SimpleChunkProcessor and ChunkProcessorChunkHandler do not need to be configured explicitly (they can be configured explicitly if necessary).

The overall process can be summarized as follows:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
    @Configuration
    public static class ManagerConfiguration {
        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }
        // Middleware beans setup omitted
    }
    @Configuration
    public static class WorkerConfiguration {
        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;
        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }
        // Middleware beans setup omitted
    }
}

2.4 Remote partitioning

Remote partitioning controls the overall flow through a Partitioner; the real execution (each worker runs a complete step, including its own reader) is what distinguishes it from remote chunking.

Remote partitioning, on the other hand, is useful when the bottleneck is not item processing but the associated I/O. With remote partitioning, work is assigned to workers that each execute a full Spring Batch step, so each worker has its own ItemReader, ItemProcessor, and ItemWriter. For this purpose, Spring Batch Integration provides the MessageChannelPartitionHandler.

This implementation of the PartitionHandler interface uses MessageChannel instances to send instructions to remote workers and receive their responses, which provides a nice abstraction over the transport (such as JMS or AMQP; the examples below use RabbitMQ) used to communicate with the remote workers.

The section on remote partitioning in the scalability chapter outlines the concepts and components required to configure partitioning, and shows an example of partitioning into separate local execution threads using the default TaskExecutorPartitionHandler. For remote partitioning across multiple JVMs, two additional components are required:

  • A remoting fabric or grid environment
  • A PartitionHandler implementation that supports the chosen remoting fabric or grid environment

Similar to remote chunking, JMS or AMQP can be used as the remoting fabric, in which case a MessageChannelPartitionHandler instance is used as the PartitionHandler implementation, as described above. The following example assumes an existing partitioned job and focuses on the MessageChannelPartitionHandler and JMS/AMQP configuration:

  • The code is as follows:

    • First, the manager code:
        @Bean
        public DirectChannel managerRequests() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow managerOutboundFlow() {
            return IntegrationFlows.from(managerRequests())
                    .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests"))
                    .get();
        }
        @Bean
        public DirectChannel managerReplies() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow managerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
            return IntegrationFlows
                    .from(Amqp.inboundAdapter(
                            rabbitmqConnectionFactory,"replies"))
                    .channel(managerReplies()).get();
        }
        @Bean
        public Step managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                    .partitioner("workerStep", new SimplePartitioner())
                    .gridSize(GRID_SIZE)
                    .outputChannel(managerRequests())
                    .inputChannel(managerReplies()).build();
        }
        @Bean
        public Job remotePartitioningJob() {
            return this.jobBuilderFactory
                    .get("remotePartitioningJob")
                    .start(managerStep())
                    .build();
        }
    • Next, the worker code:
        @Bean
        public DirectChannel workerRequests() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow workerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
            return IntegrationFlows
                    .from(Amqp.inboundAdapter(
                            rabbitmqConnectionFactory,"requests"))
                    .channel(workerRequests()).get();
        }
        @Bean
        public DirectChannel workerReplies() {
            return new DirectChannel();
        }
        @Bean
        public IntegrationFlow workerOutboundFlow() {
            return IntegrationFlows.from(workerReplies())
                    .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies"))
                    .get();
        }
        @Bean
        public Step workerStep() {
            return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(workerRequests())
                    .outputChannel(workerReplies())
                    .tasklet(tasklet(null)).build();
        }
        @Bean
        @StepScope
        public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
            return (contribution, chunkContext) -> {
                System.out.println("processing " + partition);
                return RepeatStatus.FINISHED;
            };
        }

The @EnableBatchIntegration annotation can also be used to simplify the remote partitioning setup. It provides two beans that are useful for remote partitioning:

RemotePartitioningManagerStepBuilderFactory: used to configure the manager step

RemotePartitioningWorkerStepBuilderFactory: used to configure the worker step

These APIs take care of configuring a number of components, in one of two modes (illustrated by figures in the original documentation):

Remote partitioning configuration (with job repository polling)

Remote partitioning configuration (with replies aggregation)

On the manager side, RemotePartitioningManagerStepBuilderFactory lets you configure the manager step by declaring the following:

  • The Partitioner used to partition the data
  • The output channel ("outgoing requests") for sending requests to the workers
  • The input channel ("incoming replies") for receiving the workers' replies (when configuring replies aggregation)
  • The polling interval and timeout parameters (when configuring job repository polling)

MessageChannelPartitionHandler and MessagingTemplate do not need to be explicitly configured (they can still be explicitly configured if necessary).
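The Partitioner mentioned above is the piece you typically write yourself: it produces one ExecutionContext per worker step execution. The core of a typical column-range partitioner boils down to arithmetic like this plain-Java sketch (no Spring types; the min/max ids and grid size are hypothetical inputs):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RangePartitioning {

    // Computes one inclusive [min, max] id range per partition, the same
    // arithmetic a custom Partitioner#partition(gridSize) usually performs
    // before storing the bounds in each partition's ExecutionContext.
    static Map<String, long[]> partition(long minId, long maxId, int gridSize) {
        Map<String, long[]> partitions = new LinkedHashMap<>();
        long targetSize = (maxId - minId) / gridSize + 1;
        long start = minId;
        int number = 0;
        while (start <= maxId) {
            long end = Math.min(start + targetSize - 1, maxId);
            partitions.put("partition" + number++, new long[] {start, end});
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // 100 ids split across 3 workers -> 1..34, 35..68, 69..100
        partition(1, 100, 3).forEach((name, range) ->
                System.out.println(name + ": " + range[0] + ".." + range[1]));
    }
}
```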

On the worker side, RemotePartitioningWorkerStepBuilderFactory lets you configure the worker to:

  • Listen for requests sent by the manager on the input channel ("incoming requests")
  • Call the handle method of StepExecutionRequestHandler for each request
  • Send replies to the manager on the output channel ("outgoing replies")

StepExecutionRequestHandler does not need to be explicitly configured (it can be explicitly configured if necessary).

The following example shows how to use these APIs:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
    @Configuration
    public static class ManagerConfiguration {
        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
        @Bean
        public Step managerStep() {
            return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }
        // Middleware beans setup omitted
    }
    @Configuration
    public static class WorkerConfiguration {
        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
        @Bean
        public Step workerStep() {
            return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }
        // Middleware beans setup omitted
    }
}

Tags: Java Spring Spring Boot

Posted by watthehell on Mon, 30 May 2022 19:37:17 +0530