Introduction to RxPy, the Python reactive programming library

RxPy is the Python implementation of ReactiveX, a very popular reactive programming framework. The ReactiveX libraries share the same concepts across languages; only the implementation differs from one language to another. So once you have learned one of them, picking up the others is easy. I had heard of this framework before and recently decided to study it.

Basic concepts

ReactiveX has several core concepts. Let's briefly introduce them first.

Observable and Observer

The first pair is Observable and Observer. An Observable can be understood as an asynchronous data source that emits a series of values. An Observer is similar to a consumer: it must subscribe to an Observable before it can receive the emitted values. This pair of concepts can be seen as a combination of the observer pattern and the producer-consumer pattern.

Operator

Another very important concept is the operator. The operator acts on the Observable data stream and can perform various operations on it. More importantly, operators can also be chained together. Such chained function calls not only separate data from operations, but also make the code more readable. Once you master it, you will love it.

Single

In RxJava and its variants there is a special concept called Single. It is an Observable that emits exactly one value or an error, rather than a series of values. Despite the similar name, it has nothing to do with the singleton design pattern familiar from Java and other languages.

Subject

Subject is a special concept: it is both an Observable and an Observer. Because of this, a Subject can subscribe to other Observables and can also emit values to its own observers. In some scenarios, Subject plays an important role.

Scheduler

By default, ReactiveX runs only on the current thread. If necessary, however, schedulers can be used to run ReactiveX code in a multi-threaded environment. There are many schedulers and corresponding operators to handle the various requirements of multi-threaded scenarios.

Observer and Observable

Let's take a look at the simplest example. Running it prints the numbers in order. Here, of is a creation function that builds a new Observable from the given arguments. Once the Observable is created, you can subscribe to it, and the three callbacks (on_next, on_error and on_completed) are invoked at the corresponding times. Once an Observer has subscribed to an Observable, it receives the values the Observable emits from then on.

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')
)

This example seems simple and useless. But once you understand the core concepts of Rx, you will see how powerful this tool is. More importantly, an Observable can produce data and deliver it to its subscribers asynchronously. Once you are familiar with this, you can use it to do many things.

Operator

Another very important concept in RxPy is the operator; it can even be said to be the most important one. Almost any functionality can be built by combining operators, so mastering them is the key to learning RxPy well. Operators are chained together with the pipe method to form complex operation chains.

from rx import of, operators as op

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

RxPy has a large number of operators that cover a wide variety of functions. Let's take a brief look at some commonly used ones. If you are familiar with the Java 8 Stream API or other functional programming libraries, these operators should feel very familiar.

Creation operators

The first group is the operators that create Observables. Some common ones are listed below.

Operator                                          Effect
just(v)                                           Observable that emits only the single value v
repeat_value(v, n)                                Observable that repeats the value v n times
of(a, b, c, d)                                    Observable that emits all of the given arguments
empty()                                           Empty Observable that completes without emitting anything
from_iterable(iter)                               Observable created from an iterable
generate(0, lambda x: x < 10, lambda x: x + 1)    Observable generated from an initial value, a loop condition and an iteration function
interval(n)                                       Observable that emits an increasing integer sequence every n seconds

Filter operators

Filter operators select which values of an Observable are passed on.

Operator      Effect
debounce      Emit a value only after a given time span has passed without another value; values arriving in between are dropped
distinct      Ignore duplicate values
element_at    Emit only the value at index n
filter        Keep only the values that satisfy a predicate
first/last    Emit only the first / last value
skip          Skip the first n values
take          Take only the first n values

Transformation operators

Operator    Effect
flat_map    Map each value to an Observable and merge the resulting Observables into one
group_by    Group the values by key and emit one Observable per group
map         Apply a function to each value and emit the results
scan        Apply an accumulator function to each value and emit every intermediate result

Arithmetic operators

Operator    Effect
average     Average of all values
count       Number of values
max         Maximum value
min         Minimum value
reduce      Apply an accumulator function to each value and emit only the final result
sum         Sum of all values

Subject

Subject is a special object that is both an Observer and an Observable. It is not used very often, but it is very useful for some purposes, so I still want to introduce it. The following code prints only the values emitted after the subscription, because the first value was emitted before anyone had subscribed.

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject is both Observer and Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

There are also several special subjects, which are introduced below.

ReplaySubject

ReplaySubject is a special Subject that records every emitted value, no matter when it is subscribed to, so it can be used as a cache. ReplaySubject also accepts a buffer_size parameter that limits how many of the most recent values are cached. By default, all values are cached.

The following code is almost identical to the above code, but because ReplaySubject is used, all values will be printed. Of course, you can also try to put the subscription statements in other places to see if the output will change.

# ReplaySubject will cache all values. If parameters are specified, only the latest values will be cached
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

BehaviorSubject

BehaviorSubject is a special Subject that records only the last emitted value. You must specify an initial value when creating it, and an observer that subscribes before anything else has been emitted receives this initial value. Note that if the subscription comes later, the initial value has already been overwritten by the values emitted in the meantime.

# BehaviorSubject caches the last emitted value, unless the Observable has completed
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

AsyncSubject

AsyncSubject is a special Subject that emits only when it completes, and then only the last value. So the following code outputs only 4. If you comment out the final on_completed call, nothing is output at all.

# AsyncSubject caches the last emitted value and emits it only after the Observable has completed
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

Scheduler

Although RxPy is an asynchronous framework, it still runs on a single thread by default. So if you use operations that block the thread, the program stalls. In these cases we can use other schedulers to schedule the work and keep the program running efficiently.

The following example creates a ThreadPoolScheduler, a scheduler backed by a thread pool. Both Observables use the subscribe_on operator to specify this scheduler, so they run on different threads.

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_scheduler)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_scheduler)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

If you look at the API of the operators, you will find that most of them accept an optional scheduler parameter. If a scheduler is specified on the operator, it takes precedence; otherwise, the scheduler passed to the subscribe method is used; if neither is specified, the default scheduler is used.

Application scenarios

OK, having introduced the basics of ReactiveX, let's look at how to use it. In many application scenarios, ReactiveX can be used to abstract data processing and simplify the concepts involved.

Prevent duplicate sending

In many cases we need to control the interval between events. For example, a button is accidentally pressed several times and we want only the first press to take effect. The debounce operator filters an Observable by time: a value is emitted only after the specified interval has passed without a newer value, so in a rapid burst only the last value gets through. If you want the opposite, emitting the first value and dropping the ones that follow within the interval, use the throttle_first operator.

The following code demonstrates these operators. Press Enter repeatedly and quickly to send data, and watch how the key presses relate to what is displayed. You can also replace the throttle_first operator with the debounce operator and see how the output changes, or comment out the operators in the pipe entirely and see what happens.

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# throttle_first emits the first value and ignores the rest for 3 seconds; debounce emits only after 3 quiet seconds

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

Operating on data streams

If you need to process data, there are plenty of operators to meet your needs. Of course, these features are not unique to ReactiveX. If you know the Java 8 Stream API, you will find the two almost identical.

The following is a simple example. Combine the two data sources and find all the even numbers.

import rx
from rx import operators as op

# Merge two data sources and keep only the even numbers
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

Or a simple example of using reduce to find the sum of integers from 1 to 100.

import rx
from rx import operators as op

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))

Tags: Python Functional Programming

Posted by harley1387 on Mon, 30 May 2022 06:37:03 +0530