Wrapping Disruptor, a high-performance concurrency framework

Disruptor is a high-performance, lock-free concurrency framework for Java. It plays a role similar to the JDK's BlockingQueue, but it is much faster. It is said that "one thread can process 6 million orders per second" (not that my computer has ever managed it).

Disruptor is feature-rich. For example, it supports consumers that block while waiting; one-to-one, one-to-many, many-to-one and many-to-many producer/consumer setups; and serial or parallel consumer processing chains.

For specific conceptual models, please refer to: https://www.cnblogs.com/haiq/p/4112689.html

 

Below is the utility I built on top of the Disruptor framework. It uses a fluent coding style to simplify calls to Disruptor.

package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.EventProducer;
import com.gravel.demo.test.disruptor.base.Publisher;
import com.gravel.demo.test.disruptor.base.PublisherBuilder;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description: Disruptor High performance asynchronous processing framework
 */
public class DisruptorTest {

    public static void main(String[] args) throws Exception {
        builderTest();
    }

    private static void builderTest() throws Exception {
        // Create multiple producer instances
        EventProducer<Domain> producer1 = new EventProducer<>("producer1");
        EventProducer<Domain> producer2 = new EventProducer<>("producer2");

        // Create multiple consumer instances
        DomainConsumer handler1 = new DomainConsumer("handler1");
        DomainConsumer handler2 = new DomainConsumer("handler2");
        DomainConsumer after1 = new DomainConsumer("after1");
        DomainConsumer after2 = new DomainConsumer("after2");
        DomainConsumer after3 = new DomainConsumer("after3");
        DomainConsumer then = new DomainConsumer("then");

        // Create message publisher
        final Publisher<Domain> publisher = PublisherBuilder.newBuilder()
                // Set thread factory
                // .threadFactory(r -> new Thread(r))
                // .threadFactory(new LimitedThreadFactory())
                // .threadFactory(Executors.defaultThreadFactory())
                // Set production type
                // .producerType(ProducerType.SINGLE)
                // .producerType(ProducerType.MULTI)
                // Set event factory
                // .eventFactory(new EventFactory())
                // Set waiting policy
                // .waitStrategy(new SleepingWaitStrategy())
                // .waitStrategy(new YieldingWaitStrategy())
                // Set publishing method
                // .publishStrategy(PublishStrategy.TRANSLATOR)
                // .publishStrategy(PublishStrategy.NORMAL)
                // Set the ringBuffer size
                // .ringSize(1024 * 8)
                // Set exception handler
                .exceptionHandler(new DomainErrorHandler<>())
                // Initialize the Disruptor; this must happen before configuring producers and consumers.
                .disruptor()
                // Set single producer
                // .producer(producer1)
                // Configure individual consumers
                // .handler(handler1)
                // Configure multiple producers
                .producer(producer1, producer2)
                // ====== Start of the worker/handler processing chain ======
                // .worker(handler1)
                .handler(handler1, handler2)
                .after(handler1).handler(after1)
                .after(handler2).handler(after2)
                .after(after1, after2).handler(after3)
                // .then(after3)
                // ====== End of the worker/handler processing chain ======
                // Start the Disruptor
                .build();

        long start = System.currentTimeMillis();
        try {

            for (int i = 0; i < 500; i++) {
                publisher
                        // Chained publishing
                        //.publish(new Domain(String.valueOf("a" + i), "init"))
                        .publish(new Domain(String.valueOf(i), "init"));
            }
        } finally {
            long sleep = 200;
            Thread.sleep(sleep);
            System.out.println("used time: " + (System.currentTimeMillis() - start - sleep) + "ms");

            publisher.shutdown();
        }
    }
}

  

As the code above shows, the entry point of the wrapper is Publisher. It can be configured with everything the Disruptor needs, such as the ThreadFactory, EventFactory, WaitStrategy, message producers, and handlers/workers.

Producers and consumers are the core of Publisher, so they are described later. First, look at the PublisherBuilder class.

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class PublisherBuilder {
    // Publish default parameters
    private static final int RING_SIZE = 1024 * 8;
    private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
    private static final WaitStrategy WAIT_STRATEGY = new SleepingWaitStrategy();
    private static final com.lmax.disruptor.EventFactory EVENT_FACTORY = new EventFactory();
    private static final ProducerType PRODUCER_TYPE = ProducerType.SINGLE;
    private static final PublishStrategy PUBLISH_STRATEGY = PublishStrategy.TRANSLATOR;

    private com.lmax.disruptor.EventFactory eventFactory;
    private ThreadFactory threadFactory;
    private WaitStrategy waitStrategy;
    private ExceptionHandler exceptionHandler;
    private ProducerType type;
    private PublishStrategy publishStrategy;
    private EventPublisher publisher;
    private int ringSize;

    public static PublisherBuilder newBuilder() {
        return new PublisherBuilder();
    }
    
    /**
     * Specify the ringBuffer size, which must be a power of 2. Default 1024*8
     *
     * @param ringSize
     * @return
     */
    public PublisherBuilder ringSize(int ringSize) {
        this.ringSize = ringSize;
        return this;
    }

    /**
     * Specify the EventFactory. Defaults to the EventFactory class in this package
     *
     * @param eventFactory
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder eventFactory(com.lmax.disruptor.EventFactory eventFactory) {
        this.eventFactory = eventFactory;
        return this;
    }

    /**
     * Specify the ThreadFactory. Defaults to Executors.defaultThreadFactory()
     *
     * @param threadFactory
     * @return
     */
    public PublisherBuilder threadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /**
     * Specify the wait strategy. The default is SleepingWaitStrategy
     *
     * @param waitStrategy
     * @return
     */
    public PublisherBuilder waitStrategy(WaitStrategy waitStrategy) {
        this.waitStrategy = waitStrategy;
        return this;
    }

    public PublisherBuilder publishStrategy(PublishStrategy publishStrategy) {
        this.publishStrategy = publishStrategy;
        return this;
    }

    /**
     * Initializing the disruptor
     *
     * @return
     */
    public PublisherBuilder disruptor() {
        this.eventFactory = this.eventFactory == null ? EVENT_FACTORY : this.eventFactory;
        this.threadFactory = this.threadFactory == null ? THREAD_FACTORY : this.threadFactory;
        this.waitStrategy = this.waitStrategy == null ? WAIT_STRATEGY : this.waitStrategy;
        this.ringSize = this.ringSize <= 0 ? RING_SIZE : this.ringSize;
        this.type = this.type == null ? PRODUCER_TYPE : this.type;
        this.publishStrategy = this.publishStrategy == null ? PUBLISH_STRATEGY : this.publishStrategy;

        publisher = new EventPublisher<>(eventFactory, ringSize, threadFactory, waitStrategy, exceptionHandler, type, publishStrategy);
        return this;
    }

    /**
     * Configure producer
     * @param producers
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder producer(EventProducer<T> ...producers) {
        if (isInit()) {
            this.publisher.producer(producers);
        }
        return this;
    }

    /**
     * eventHandler: every event is handled by all handlers
     *
     * @param eventHandlers
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder handler(EventHandler<Event<T>>... eventHandlers) {
        if (isInit()) {
            this.publisher.eventHandler(eventHandlers);
        }
        return this;
    }

    /**
     * workHandler: every event is processed by only one worker
     *
     * @param workHandlers
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder worker(WorkHandler<Event<T>>... workHandlers) {
        if (isInit()) {
            this.publisher.workHandler(workHandlers);
        }
        return this;
    }

    /**
     * Handlers that run after the preceding handlers or workers
     *
     * @param thenEventHandlers
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder then(EventHandler<Event<T>>... thenEventHandlers) {
        if (isInit()) {
            this.publisher.thenHandler(thenEventHandlers);
        }
        return this;
    }

    /**
     * Select already-registered handlers that the next handler(...) call must run after. It cannot be called directly after the worker method
     *
     * @param afterEventHandlers
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder after(EventHandler<Event<T>>... afterEventHandlers) {
        if (isInit()) {
            this.publisher.afterHandler(afterEventHandlers);
        }
        return this;
    }

    /**
     * Specify producerType
     *
     * @param type
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder producerType(ProducerType type) {
        this.type = type;
        return this;
    }

    /**
     * Exception handling class
     *
     * @param exceptionHandler
     * @param <T>
     * @return
     */
    public <T> PublisherBuilder exceptionHandler(ExceptionHandler<T> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    private boolean isInit() {
        if (this.publisher == null) {
            throw new IllegalStateException("call disruptor() before configuring producers, handlers or workers.");
        }
        return true;
    }

    public <T> Publisher<T> build() {
        return this.publisher.start();
    }
}
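
The ringSize option above has to be a power of two. A minimal sketch of why ring buffers favour that, assuming the usual bit-mask trick for turning a sequence number into a slot index; RingIndex is a hypothetical helper for illustration, not a Disruptor class.

```java
// Hypothetical helper for illustration only; not part of the Disruptor API.
final class RingIndex {

    private RingIndex() {
    }

    /** True when size is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int size) {
        return size > 0 && Integer.bitCount(size) == 1;
    }

    /** Maps an ever-increasing sequence number onto a slot in [0, size). */
    static int slot(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of 2: " + size);
        }
        // For a power-of-two size and a non-negative sequence, masking gives
        // the same result as sequence % size without a division.
        return (int) (sequence & (size - 1));
    }

    public static void main(String[] args) {
        int size = 1024 * 8;
        System.out.println(isPowerOfTwo(size));    // prints true
        System.out.println(slot(size + 5L, size)); // prints 5
    }
}
```

Once the sequence passes the end of the buffer it wraps back to the first slots, which is what lets the buffer reuse its pre-allocated events.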

  

The key part is the disruptor() method, which is where the Disruptor is instantiated. Event handlers and work handlers can only be registered after the Disruptor instance exists and before disruptor.start() is called.

So we must call the tool class as follows:

PublisherBuilder.newBuilder()/*.Various configurations*/.disruptor()/*Specify producer and handler/worker*/.build();
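
The isInit() check enforces that calling order at run time. As a design alternative, the order could be enforced by the compiler by splitting the builder into two types. The sketch below is hypothetical (Config, Wiring and the String handlers are stand-ins, not the classes of this article) and leaves out everything Disruptor-specific.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-stage builder; the names are illustrative only.
final class StagedBuilderSketch {

    /** Stage 1: only configuration is possible here. */
    static final class Config {
        private int ringSize = 1024 * 8;

        Config ringSize(int ringSize) {
            this.ringSize = ringSize;
            return this;
        }

        /** Moving to stage 2 is the only way to reach handler(...) and build(). */
        Wiring disruptor() {
            return new Wiring(ringSize);
        }
    }

    /** Stage 2: handlers can be attached, then build() is called. */
    static final class Wiring {
        private final int ringSize;
        private final List<String> handlers = new ArrayList<>();

        private Wiring(int ringSize) {
            this.ringSize = ringSize;
        }

        Wiring handler(String name) {
            handlers.add(name);
            return this;
        }

        String build() {
            return "ringSize=" + ringSize + ", handlers=" + handlers;
        }
    }

    static Config newBuilder() {
        return new Config();
    }

    public static void main(String[] args) {
        String summary = newBuilder()
                .ringSize(1024)
                .disruptor()
                .handler("handler1")
                .handler("handler2")
                .build();
        System.out.println(summary);
        // newBuilder().handler("handler1") would not compile: Config has no handler method.
    }
}
```

The trade-off is one more class to maintain in exchange for catching a misordered call before the program runs.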

  

Next, look at the EventPublisher class used by PublisherBuilder. It implements the Publisher interface, which is shown first:

Publisher.java

package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public interface Publisher<T> {

    Publisher<T> start();

    Publisher<T> publish(T t);

    Publisher<T> shutdown();
}

  

EventPublisher.java

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class EventPublisher<T> implements Publisher<T> {

    private RingBuffer<Event<T>> ringBuffer;
    private Disruptor<Event<T>> disruptor;
    private EventHandlerGroup<Event<T>> handlerGroup;
    private PublisherState state;
    private PublishStrategy publishStrategy;
    private EventTranslator<T> translator;
    private Producer<T>[] producers;

    private enum PublisherState {
        START, SHUTDOWN
    }

    public EventPublisher(com.lmax.disruptor.EventFactory<Event<T>> factory,
                          int ringSize,
                          ThreadFactory threadFactory,
                          WaitStrategy waitStrategy,
                          ExceptionHandler<Event<T>> exceptionHandler,
                          ProducerType type,
                          PublishStrategy publishStrategy) {

        this.disruptor = new Disruptor<>(factory, ringSize, threadFactory,
                type, waitStrategy);

        if (!Objects.isNull(exceptionHandler)) {
            this.disruptor.setDefaultExceptionHandler(exceptionHandler);
        }

        this.ringBuffer = disruptor.getRingBuffer();

        this.publishStrategy = publishStrategy;
        if (publishStrategy == PublishStrategy.TRANSLATOR) {
            translator = new EventTranslator<>();
        }

        this.state = PublisherState.SHUTDOWN;
    }

    public EventPublisher<T> producer(EventProducer<T> ...producers) {
        if (!Objects.isNull(producers) && producers.length > 0) {
            for (EventProducer<T> producer : producers) {
                producer.setRingBuffer(this.ringBuffer).setTranslator(this.translator);
            }
            this.producers = producers;
        }
        return this;
    }

    public EventPublisher<T> eventHandler(EventHandler<Event<T>>... eventHandlers) {
        if (this.handlerGroup != null) {
            this.handlerGroup.handleEventsWith(eventHandlers);
        } else {
            this.handlerGroup = disruptor.handleEventsWith(eventHandlers);
        }
        return this;
    }

    public EventPublisher<T> workHandler(WorkHandler<Event<T>>... workHandlers) {
        if (this.handlerGroup != null) {
            this.handlerGroup.handleEventsWithWorkerPool(workHandlers);
        } else {
            this.handlerGroup = disruptor.handleEventsWithWorkerPool(workHandlers);
        }
        return this;
    }

    public EventPublisher<T> thenHandler(EventHandler<Event<T>>... thenHandlers) {
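        // then() needs a preceding handler() or worker() call; fail clearly instead of with a NullPointerException
        if (this.handlerGroup == null) {
            throw new IllegalStateException("then() requires a preceding handler() or worker() call.");
        }
        this.handlerGroup.then(thenHandlers);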
        return this;
    }

    public EventPublisher<T> afterHandler(EventHandler<Event<T>>... afterHandlers) {
        this.handlerGroup = this.disruptor.after(afterHandlers);
        return this;
    }

    public Disruptor<Event<T>> getDisruptor() {
        return disruptor;
    }

    @Override
    public EventPublisher<T> start() {
        this.disruptor.start();
        this.state = PublisherState.START;
        return this;
    }

    @Override
    public EventPublisher<T> shutdown() {
        this.disruptor.shutdown();
        this.state = PublisherState.SHUTDOWN;
        return this;
    }

    @Override
    public EventPublisher<T> publish(T t) {
        if (!isStarted()) {
            throw new IllegalStateException("publisher has not been started.");
        }
        if (producers == null || producers.length <= 0) {
            throw new IllegalStateException("at least one producer must be specified.");
        }

        for (Producer<T> producer : producers) {
            producer.produce(t);
        }
        return this;
    }

    private boolean isStarted() {
        return this.state == PublisherState.START;
    }
}

  

The most important code here is the part that wires up the producers and the consumer chain (it is really handy!). The varargs parameters mean that producers and consumers can be related one-to-one, one-to-many, many-to-one, or many-to-many.

First, the smallest unit of data the publisher works with, Event:

package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class Event<T> {

    private T data;

    public void set(T data) {
        this.data = data;
    }

    public T get() {
        return data;
    }
}

  

Next, the producer code. EventProducer implements Producer.

Producer.java

package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public interface Producer<T> {
    void produce(T t);
}

  

EventProducer.java

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.RingBuffer;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class EventProducer<T> implements Producer<T> {
    private String name;
    private RingBuffer<Event<T>> ringBuffer;
    private EventTranslator<T> translator;
    public EventProducer(String name) {
        this.name = name;
    }

    public EventProducer<T> setRingBuffer(RingBuffer<Event<T>> ringBuffer) {
        this.ringBuffer = ringBuffer;
        return this;
    }

    public EventProducer<T> setTranslator(EventTranslator<T> translator) {
        this.translator = translator;
        return this;
    }

    @Override
    public void produce(T t) {
        System.out.println(String.format("producer message by %s, data: %s", name, t));
        if (translator != null) {
            ringBuffer.publishEvent(translator, t);
        } else {
            long seq = ringBuffer.next();
            try {
                Event<T> event = ringBuffer.get(seq);
                event.set(t);
            } finally {
                // Disruptor requires that ringBuffer.publish is always called, even if an exception occurs.
                // If a claimed sequence is never published, later publish operations and other producers will be blocked.
                ringBuffer.publish(seq);
            }
        }
    }
}

  

The ringBuffer must be the one that belongs to the disruptor instance, so it is set on every producer when the producers are registered. The translator object is shared by all producers as well. I was not sure whether sharing it could let threads overwrite each other's data; the EventTranslator class in this article has no fields, so a single shared instance is safe, and only a translator that kept state would need one instance per producer. The produce() method shows that the disruptor publishes messages through the ringBuffer, in one of two ways: through a translator, or by claiming a sequence. Note the comment in the finally block.
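
To illustrate the thread-safety question, here is a plain-JDK model in which several threads share one translator that has no fields. Slot and Translator are hypothetical stand-ins for Event and EventTranslator; nothing here calls Disruptor itself.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model; Slot and Translator are hypothetical stand-ins, not Disruptor types.
final class SharedTranslatorSketch {

    static final class Slot<T> {
        private T data;

        void set(T data) {
            this.data = data;
        }

        T get() {
            return data;
        }
    }

    /** No fields: every call works only on its own arguments. */
    static final class Translator<T> {
        void translateTo(Slot<T> slot, T value) {
            slot.set(value);
        }
    }

    /** Fills each slot from its own thread, with all threads sharing one translator. */
    static List<Slot<Integer>> fillConcurrently(int count) {
        Translator<Integer> shared = new Translator<>();
        List<Slot<Integer>> slots = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Slot<Integer> slot = new Slot<>();
            slots.add(slot);
            final int value = i;
            threads.add(new Thread(() -> shared.translateTo(slot, value)));
        }
        for (Thread thread : threads) {
            thread.start();
        }
        try {
            // join() also makes each thread's write visible to the caller.
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return slots;
    }

    public static void main(String[] args) {
        List<Slot<Integer>> slots = fillConcurrently(8);
        for (int i = 0; i < slots.size(); i++) {
            System.out.println(i + " -> " + slots.get(i).get());
        }
    }
}
```

Each slot ends up with the value its own thread wrote, because the shared object holds nothing that one call could leak into another.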

Here is the code of EventTranslator, which implements EventTranslatorVararg. Its translateTo() method fills the instance created by EventFactory with data.

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.EventTranslatorVararg;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class EventTranslator<T> implements EventTranslatorVararg<Event<T>> {

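    @Override
    @SuppressWarnings("unchecked")
    public void translateTo(Event<T> event, long sequence, Object... args) {
        // args[0] is the payload passed to ringBuffer.publishEvent(translator, t)
        event.set((T) args[0]);
    }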
}

  

Now the consumers. So that results from workers and handlers can be handled uniformly, every consumer implements both EventHandler and WorkHandler from Disruptor, plus a custom Consumer interface.

Consumer.java

package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public interface Consumer<T> {
    void consume(T data, Boolean over) throws Exception;
}

  

EventConsumer.java

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public abstract class EventConsumer<T> implements EventHandler<Event<T>>, Consumer<T>, WorkHandler<Event<T>> {
    String name;

    public EventConsumer(String name) {
        this.name = name;
    }

    @Override
    public void onEvent(Event<T> event, long seq, boolean over) throws Exception {
        consume(event.get(), over);
    }

    @Override
    public void onEvent(Event<T> event) throws Exception {
        consume(event.get(), null);
    }

    protected String getName() {
        return name;
    }
}

  

The abstract class EventConsumer funnels the onEvent methods of both workers and handlers into consume(). The actual consumption logic is left to the user, as in DomainConsumer in the demo.
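
The difference between the two onEvent paths comes down to delivery semantics. Below is a single-threaded model of them that uses only the JDK; the round-robin assignment in workPool is an assumption made to keep the model deterministic, and DeliverySketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Single-threaded model of delivery semantics; it does not use the Disruptor API.
final class DeliverySketch {

    /** handler(...) semantics: every consumer sees every event. */
    static Map<String, List<Integer>> broadcast(List<Integer> events, List<String> consumers) {
        Map<String, List<Integer>> seen = emptyLog(consumers);
        for (Integer event : events) {
            for (String consumer : consumers) {
                seen.get(consumer).add(event);
            }
        }
        return seen;
    }

    /** worker(...) semantics: each event is taken by exactly one consumer. */
    static Map<String, List<Integer>> workPool(List<Integer> events, List<String> consumers) {
        Map<String, List<Integer>> seen = emptyLog(consumers);
        int next = 0;
        for (Integer event : events) {
            // Round-robin is an assumption of this model; in a real pool,
            // whichever worker is free claims the next event.
            String consumer = consumers.get(next++ % consumers.size());
            seen.get(consumer).add(event);
        }
        return seen;
    }

    private static Map<String, List<Integer>> emptyLog(List<String> consumers) {
        Map<String, List<Integer>> log = new LinkedHashMap<>();
        for (String consumer : consumers) {
            log.put(consumer, new ArrayList<>());
        }
        return log;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(0, 1, 2, 3);
        List<String> consumers = Arrays.asList("handler1", "handler2");
        System.out.println("broadcast: " + broadcast(events, consumers));
        System.out.println("workPool:  " + workPool(events, consumers));
    }
}
```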

That completes the core producer and consumer code.

Now for the non-core code.

Error handling class: ErrorHandler, which implements ExceptionHandler

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.ExceptionHandler;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public abstract class ErrorHandler<T> implements ExceptionHandler<Event<T>> {
    @Override
    public void handleEventException(Throwable throwable, long l, Event<T> t) {
        handle(t.get(), throwable);
    }

    @Override
    public void handleOnStartException(Throwable throwable) {
    }

    @Override
    public void handleOnShutdownException(Throwable throwable) {
    }

    protected abstract void handle(T object, Throwable throwable);
}

  

EventFactory: the event factory class, which implements Disruptor's EventFactory

package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class EventFactory<T> implements com.lmax.disruptor.EventFactory<Event<T>> {

    // Called once per ring buffer slot to pre-allocate the events
    @Override
    public Event<T> newInstance() {
        return new Event<>();
    }
}

  

PublishStrategy: how events are published to the ringBuffer

package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public enum PublishStrategy {
    NORMAL, TRANSLATOR
}

  

That is the whole Disruptor wrapper. The demo classes follow.

Entity Domain

package com.gravel.demo.test.disruptor;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class Domain {

    private String id;
    private String value;

    public Domain(String id, String value) {
        this.id = id;
        this.value = value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Domain{" +
                "id='" + id + '\'' +
                ", value='" + value + '\'' +
                '}';
    }
}

  

DomainConsumer

package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.EventConsumer;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class DomainConsumer extends EventConsumer<Domain> {

    public DomainConsumer() {
        this("FirstDisruptorHandler" + (Math.random() * 100));
    }

    public DomainConsumer(String name) {
        super(name);
    }

    @Override
    public void consume(Domain data, Boolean over) throws Exception {
        // errorHandler test: worker throws exceptions and handler handles them normally
        /*if (Objects.isNull(over)) {
            throw new RuntimeException(getName() + " handle exception.");
        }*/

        System.out.println(String.format("received by %s, data: %s, is over?%s", getName(), data.toString(), over));
        data.setValue(getName());
    }
}

  

DomainErrorHandler

package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.ErrorHandler;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class DomainErrorHandler<T> extends ErrorHandler<T> {
    @Override
    protected void handle(T object, Throwable throwable) {
        System.err.println(String.format("received an error message: %s, data: %s", throwable.getMessage(), object));
        // If no exception is thrown, the data will continue to flow to the next handler
        throw new IllegalStateException("interrupted.");
    }
}

  

LimitedThreadFactory

package com.gravel.demo.test.disruptor;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description: Single thread
 */
public class LimitedThreadFactory implements ThreadFactory {
    private final AtomicInteger count = new AtomicInteger(0);

    public Thread newThread(Runnable r) {
        if (count.compareAndSet(0, 1)) {
            return new Thread(r);
        } else {
            throw new IllegalStateException("Created more than one thread");
        }
    }
}
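
The factory above hands out exactly one thread. A minimal sketch of that behaviour follows; OneThreadFactory repeats the same logic under a hypothetical name so that the example compiles on its own. Assuming Disruptor requests one thread per consumer, this factory would only suit a chain with a single handler.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical check class for illustration; it does not start any threads.
final class LimitedFactoryCheck {

    // Same logic as LimitedThreadFactory, repeated so the sketch is self-contained.
    static final class OneThreadFactory implements ThreadFactory {
        private final AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            if (count.compareAndSet(0, 1)) {
                return new Thread(r);
            }
            throw new IllegalStateException("Created more than one thread");
        }
    }

    /** Returns how many threads the factory hands out before it refuses. */
    static int threadsAllowed(int requested) {
        ThreadFactory factory = new OneThreadFactory();
        int created = 0;
        try {
            for (int i = 0; i < requested; i++) {
                factory.newThread(() -> { });
                created++;
            }
        } catch (IllegalStateException expected) {
            // Every request after the first is rejected.
        }
        return created;
    }

    public static void main(String[] args) {
        System.out.println(threadsAllowed(3)); // prints 1
    }
}
```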

 

Demo tests (each test publishes only a single item)

Single producer, single consumer (worker and handler give the same result)

.producer(producer1)
.handler(handler1)

  

Call result

producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
used time: 19ms

  

Single producer, multiple consumers (handler type, with a diamond-shaped call chain)

.producer(producer1)
.handler(handler1, handler2)
.after(handler1).handler(after1)
.after(handler2).handler(after2)
.after(after1, after2).handler(after3)

  

Call result (every handler consumes the event)

producer message by producer1, data: Domain{id='0', value='init'}
received by handler2, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by after2, data: Domain{id='0', value='handler1'}, is over?true
received by after1, data: Domain{id='0', value='handler1'}, is over?true
received by after3, data: Domain{id='0', value='after1'}, is over?true
used time: 19ms
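
The diamond chain configured above can be pictured as a small dependency graph. The sketch below models only the ordering guarantees with the JDK's CompletableFuture; it is an analogy, not how Disruptor schedules handlers, and DiamondSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Models only the ordering of the diamond chain; it does not use the Disruptor API.
final class DiamondSketch {

    /** Runs the diamond once and returns the order in which the steps ran. */
    static List<String> runOnce() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());

        // handler1 and handler2 have no dependencies, so they may run in parallel.
        CompletableFuture<Void> handler1 = CompletableFuture.runAsync(() -> order.add("handler1"));
        CompletableFuture<Void> handler2 = CompletableFuture.runAsync(() -> order.add("handler2"));

        // .after(handler1).handler(after1) and .after(handler2).handler(after2)
        CompletableFuture<Void> after1 = handler1.thenRunAsync(() -> order.add("after1"));
        CompletableFuture<Void> after2 = handler2.thenRunAsync(() -> order.add("after2"));

        // .after(after1, after2).handler(after3)
        CompletableFuture<Void> after3 = CompletableFuture.allOf(after1, after2)
                .thenRun(() -> order.add("after3"));

        after3.join();
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        // The two branches may interleave differently from run to run; after3 is always last.
        System.out.println(runOnce());
    }
}
```

Ordering is all the chain guarantees. handler1 and handler2 run in parallel and both call setValue on the same Domain, which is why after2 can report value='handler1' in the output above.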

  

Single thread, single producer, multiple consumers (worker type; after() cannot be applied directly to workers)

.producer(producer1)
.worker(handler1, handler2)

   

Call result (compared with the handler case, the event is consumed by only one worker)

producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?null
used time: 21ms

  

Multiple producers, single consumer (worker and handler give the same result)

.handler(handler1)
.producer(producer1, producer2)

  

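Call result (the data is overwritten: publish() hands the same Domain instance to both producers, so the value handler1 sets on the first event is already there when the second one arrives)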

producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 18ms
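
The overwrite in this result comes from both producers being handed the same Domain instance. A minimal sketch of that aliasing, with hypothetical Item and Slot classes standing in for Domain and Event, and of one possible remedy (publishing a copy per event):

```java
// Plain-JDK illustration; Item and Slot are hypothetical stand-ins for Domain and Event.
final class SharedReferenceSketch {

    static final class Item {
        String value;

        Item(String value) {
            this.value = value;
        }

        Item copy() {
            return new Item(value);
        }
    }

    static final class Slot {
        Item data;
    }

    /** Returns the value the consumer finds on the second event. */
    static String secondEventValue(boolean copyPerEvent) {
        Item item = new Item("init");
        Slot first = new Slot();
        Slot second = new Slot();
        first.data = copyPerEvent ? item.copy() : item;
        second.data = copyPerEvent ? item.copy() : item;

        // The consumer handles the first event and mutates its payload.
        first.data.value = "handler1";

        // What the consumer will find when it reaches the second event.
        return second.data.value;
    }

    public static void main(String[] args) {
        System.out.println(secondEventValue(false)); // prints handler1
        System.out.println(secondEventValue(true));  // prints init
    }
}
```

Whether a copy per event is worth the allocation depends on the use case; making the payload immutable would be another way to avoid the surprise.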

  

Multiple producers, multiple consumers (workers and handlers mixed)

.producer(producer1, producer2)
.worker(after1, after2)
.handler(handler1, handler2)

   

Call result

producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by after2, data: Domain{id='0', value='init'}, is over?null
received by after1, data: Domain{id='0', value='after2'}, is over?null
received by handler2, data: Domain{id='0', value='after1'}, is over?false
received by handler1, data: Domain{id='0', value='after1'}, is over?false
received by handler2, data: Domain{id='0', value='handler2'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 26ms

  

Tags: Java

Posted by -Karl- on Mon, 30 May 2022 00:21:14 +0530