Java 9 – Reactive stream

Trong nhiều ứng dụng, dữ liệu không được truy xuất từ một thiết bị lưu trữ cố định, nhưng đúng hơn, được xử lý  gần thời gian thực, với những người dùng hoặc các hệ thống khác đang đưa các thông tin một cách nhanh chóng vào trong hệ thống. Hầu hết thời gian việc đưa các thông tin này được thực hiện bất đồng bộ, nơi mà chúng ta không biết trước được thời điểm dữ liệu sẽ xuất hiện. Để có thể tạo điều kiện cho kiểu xử lý dữ liệu bất đồng bộ này, chúng ta phải nghĩ lại mô hình cũ dựa trên Polling và thay vào đó, sử dụng một phương pháp nhẹ hơn, hợp lý hơn.

Publisher, Subscriber, và Subscription

Thay vì có một kiểu xử lý dữ liệu Client và Server, nơi Client yêu cầu dữ liệu từ Server và Server trả về, có thể chạy bất đồng bộ cho Client với dữ liệu được yêu cầu, thay vào đó chúng ta sử dụng cơ chế publish-subscribe: Một Subscribler thông báo cho Publisher rằng nó sẵn sàng chấp nhận số phần tử nhất định (yêu cầu số phần tử nhất định), và nếu phần tử có sẵn, Publisher đẩy số phần tử tối đa phải nhận cho Subscriber. Điều quan trọng cần lưu ý rằng đây là một trao đổi 2 chiều, nới Subscriber thông báo cho Publisher có bao nhiêu phần tử nó sẵn sàng xử lý và Publisher đẩy số phần tử đó cho Subscriber.

Quá trình hạn chế số phần tử mà Subscriber sẵn sàng thu nhận (được đánh giá bởi chính Subscriber) được gọi là backpressure, và là cần thiết để giúp ngăn chặn cho Subscriber không bị quá tải (đẩy thêm nhiều phần tử hơn mà Subscriber có thể xử lý). Cách hoạt động này được miêu tả như hình bên dưới đây:

Đây là trao đổi hai chiều giữa Publisher và Subscriber được gọi là Subscription. Subscription này liên kết một Publisher đơn với một Subscriber đơn (liên kết một – một) và có thể là phát đơn hướng hoặc đa hướng.

Hơn nữa, một Publisher có thể có nhiều Subscriber đăng ký với nó, nhưng một Subscriber chỉ có thể đăng ký với một nhà sản suất (Publisher). Một Publisher có thể có nhiều Subscriber, nhưng một Subscriber có thể đăng ký nhiều nhất với một Publisher.

Khi một Subscriber đăng ký với một Publisher, Publisher thông báo tới Subscriber của Subscription đã được tạo, cho phép Subscriber lưu trữ một tham chiếu đến Subscription (nếu muốn). Khi quá trình thông báo này được hoàn tất, Subscriber có thể thông báo cho Publisher rằng nó đã sẵn sàng nhận một số n phần tử.

Khi Publisher có sẵn n phần tử, sau đó gửi nhiều nhất n phần tử cho Subscriber. Nếu một lỗi xảy ra trong Publisher, nó sẽ báo hiệu lỗi cho Subscriber. Nếu Publisher kết thúc hoàn toàn việc gửi dữ liệu, nó sẽ báo hiệu cho Subscriber rằng nó đã hoàn thành. Nếu Subscriber được thông báo rằng một lỗi xảy ra hoặc Publisher đã hoàn thành, Subscription cân nhắc hủy hoặc không tương tác thêm giữa Subscriber và Publisher (hoặc subscription) sẽ diễn ra. Quy trình làm việc của Subscription này được miêu tả như hình bên dưới đây

Điều quan trọng cần chú ý rằng về mặt lý thuyết có 2 cách tiếp cận để truyền tải dữ liệu cho Subscriber: (1) Subscription nắm giữ các phần tử hoặc (2) Publisher nắm giữ các phần tử. Ở trường hợp đầu tiên, Publisher đẩy các phần tử cho Subcription khi chúng đã sẵn sàng, sau đó, Subscriber yêu cầu n phần tử, Subscription cung cấp n phần tử hoặc ít hơn đã được đưa cho Publisher trước đấy. Điều này có thể được sử dụng khi Publisher quản lý các phần tử trong hàng đợi, như là các HTTP Request đến. Trong trường hợp thứ hai, Subscriber chuyển tiếp các yêu cầu cho Publisher, điều này đẩy n phần tử hoặc ít hơn cho Subscription, từ đó đẩy các phần tử tương tự cho Subscription. Kịch bản này có thể phù hợp hơn cho các trường hợp các sản phẩm được sinh ra khi cần, chẳng hạn sinh ra các số nguyên tố.

Cũng cần phải lưu ý rằng các phần tử không cần thiết phải xuất hiện trước khi một yêu cầu được tạo. Nếu một Subscriber tạo một yêu cầu cho n phần tử và không có phần tử nào có sẵn, Subscriber sẽ đợi cho tới khi ít nhất một phần tử sẵn sàng và được đẩy cho Subscriber. Nếu có i phần tử sẵn sàng thì Subsciber tạo một yêu cầu n phần tử, trong đó i nhỏ hơn n, và i phần tử được đẩy cho Subscriber. Khi có thêm nhiều j phần tử sẵn sàng, n-j phần tử của j phần tử cũng được đẩy cho Subscriber cho tới khi số lượng n của tổng phần tử đã được đẩy cho Subscriber (n = i+j), hoặc Subscriber đã yêu cầu thêm m phần tử, trong trường hợp này, toàn bộ sộ j phần tử có thể được đẩy cho Subscriber miễn là i + j nhỏ hơn hoặc bằng m + n. Số phần tử mà Subscriber có thể chấp nhận tại bất cứ thời điểm nào (có thể bằng hoặc không bằng n, dựa trên số phần tử được đấy cho Subscriber) được gọi là yêu cầu đang giải quyết.

Ví dụ, giả sử Subscriber yêu cầu 5 phần tử và hiện tại có 7 phần tử sẵn sàng trong Publisher. Yêu cầu cần giải quyết cho Subsciber là 5, nên 5 của 7 phần tử được đẩy cho Subscriber. 2 phần tử còn lại được duy trì bởi Publisher, chờ Subscriber yêu cầu thêm phần tử. Nếu Subscriber sau đó yêu cầu thêm 10 phần tử thì 2 phần tử còn lại được đẩy cho Subscriber, dẫn đến yêu cầu cần giải quyết là 8. Nếu có thêm 5 phần tử đã sẵn sàng trong Publisher, 5 phần tử này được đẩy cho Subscriber, để lại yêu cầu cần giải quyết giờ là 3. Yêu cầu cần giải quyết sẽ vẫn ở mức 3 trừ khi Subscriber yêu cầu thêm n phần tử, trong trường hợp này yêu cầu cần giải quyết tăng lên là n + 3, hoặc thêm i phần tử được đẩy cho Subscriber, trong trường hợp này yêu cầu cần giải quyết giảm xuống còn 3 – i

Processor

Nếu một thực thể vừa là Publisher vừa là Suscriber, nó được gọi là Processor. Thông thường Processor hoạt động đứng trung gian ở giữa Publisher và Subscriber (một trong 2 có thể là Processor khác), thực hiện một vài chuyển đổi trên luồng dữ liệu. Ví dụ, một Processor có thể được tạo dùng để lọc các phần tử mà phù hợp với một vài tiêu chí trước khi truyền chúng cho Subscriber. Diễn tả trực quan về Processor được biểu diễn như bên dưới đây

Với những hiểu biết cơ bản cách Reative Stream hoạt động, chúng ta có thể chuyển đổi những khái niệm này vào trong Java bằng cách tạo các interface cho chúng.

Miêu tả Interface

Với những miêu tả bên trên, Reactive Stream được tạo thành từ 4 thực thể chính: (1) Publisher, (2) Subscriber, (3) Subscription, và (4) Processor. Từ góc độ Interface, các Publisher được yêu cầu chỉ cho phép các Subscriber đăng ký chúng. Vì thế, chúng ta có thể tạo một interface đơn giản cho Publisher, trong đó tham số kiểu generic hình thức, T, tượng trưng cho kiểu của các phần tử mà Publisher tạo:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Định nghĩa interface này yêu cầu chúng ta sau đó định nghĩa interface cho Subscriber. Như đã miêu tả ở bên trên, một Subscriber có 4 tương tác chính: (1) thông báo về việc đăng ký, (2) chấp nhận các phần tử được đẩy, (3) chấp nhận các lỗi xảy ra trong Publisher đã đăng ký, và (4) thông báo khi một Publisher hoàn thành. Điều này dẫn đến interface tiếp theo, được tham số hóa tương tự theo kiểu phần tử mà nó yêu cầu:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Tiếp theo, chúng ta phải định nghĩa interface cho Subscription. Thực thể này đơn giản hơn Subscriber và chỉ chịu trách nhiệm cho 2 hành động: (1) Chấp nhận các yêu cầu cho các phần tử và (2) bị hủy bỏ. Điều này dẫn đến định nghĩa interface như sau:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Cuối cùng, chúng ta định nghĩa Processor là sự kết hợp giữa interface Publisher và Subscriber, với một giải quyết quan trọng:  Một Processor có thể tạo các phần tử thuộc kiểu khác với kiểu phần tử nó dùng. Do đó, chúng ta sẽ sử dụng tham số kiểu generic hình thức T để tượng trưng cho kiểu của các phần tử Processor dùng và R tượng trưng cho kiểu của phần tử trả về. Chú ý rằng việc thực thi của Publisher có thể dùng hoặc tạo ra các phần tử cùng kiểu nhưng không có giới hạn thời gian biên dịch mà nó phải làm như vậy. Kết quả interface này như sau:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

Trong khi 4 interface này tạo thành một giao kèo được code cho Reative Stream, có một số hạn chế và cách xử lý có chủ ý khác mà những interface này phải tuân theo. Những đặc điểm này, cùng với các định nghĩa interface ở bên trên, có thể được tìm thấy trong Reactive Streams JVM Specification. Chúng ta sẽ được thấy trong phần tiếp theo, phần thực thi chuẩn Java của đặc tả kỹ thuật Reactive Stream gần như giống hệt với đặc tả kỹ thuật Reactive Stream của JVM và hoạt động như một tiêu chuẩn hóa của giao kéo Reactive Stream trong  Java Standard Library.

Reactive Stream hoạt động như thế nào trong Java?

Cổng Java chuẩn của Interface Reactive Stream có thể tìm ở trong class java.util.concurrent.Flow và được đóng gói như các interface static trong class Flow. Khi Javadocs bị xóa, class Flow được định nghĩa như sau:

public final class Flow {
    private Flow() {} // uninstantiable
    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }
    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
}

Mặc dù không có nhiều điều để nói khi so sánh đặc tả kỹ thuật của Reactive Stream JVM với các định nghĩa tiêu chuẩn Java, các phiên bản Java tiêu chuẩn bao gồm một phần thực thi của Publisher: SubmissionPublisher. Class SubmissionPublisher hoạt động như một Publisher đơn giản, chấp nhận các phần tử để đẩy cho các Subscriber sử dụng phương thức submit(T item). Khi một phần tử được gửi cho phương thức submit,nó được đẩy bất đồng bộ cho các Subscriber như ví dụ sau:

public class PrintSubscriber implements Subscriber<Integer> {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(Integer item) {
        System.out.println("Received item: " + item);
        subscription.request(1);
    }
    @Override
    public void onError(Throwable error) {
        System.out.println("Error occurred: " + error.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("PrintSubscriber is complete");
    }
}
public class SubmissionPublisherExample {
    public static void main(String... args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new PrintSubscriber());
        System.out.println("Submitting items...");
        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }
        Thread.sleep(1000);
        publisher.close();
    }
}

Kết quả sau khi chạy được in ra như sau:

Submitting items...
Received item: 0
Received item: 1
Received item: 2
Received item: 3
Received item: 4
Received item: 5
Received item: 6
Received item: 7
Received item: 8
Received item: 9
PrintSubscriber is complete

Bên trong Subscriber, chúng ta bắt được Object Subscription được truyền đến phương thức onSubscribe, cho phép chúng ta tương tác với Subscription sau đó. Khi chúng ta lưu object Subscription, chúng ta ngay lập tức thông báo cho Subscription rằng Subscriber đã sẵn sàng nhận một phần tử (bằng cách gọi subscription.request(1)). Chúng ta có thể làm tương tự như vậy trong phương thức onNext sau khi in ra phần tử nhận được. Số lượng này để thông báo đến Publisher rằng chúng tôi đã sẵn sàng chấp nhận phần tử khác ngay sau khi xử lý xong một phần tử.

Trong phương thức main, chúng ta chỉ cần khởi tạo SubmissionPublisher và PrintSubscriber và đăng ký cái sau với các trước. Sau khi Subscription được khởi tạo, chúng ta gửi các giá trị từ 0 – 9 cho Publisher, lần lượt đẩy các giá trị cho Subscriber một cách bất đồng bộ. Subscriber sau đó xử lý từng phần tử bằng cách bằng cách in giá trị của nó ra và thông báo với Subscription rằng nó đã sẵn sàng nhận các giá trị khác. Chúng ta sau đó tạm dừng thread của main 1 giây để cho phép phần gửi bất động bộ hoàn tất. Đây là một bước rất quan trọng khi phương thức submit đẩy bất đồng bộ các phần tử được gửi cho các Subcriber của nó. Vì thế, chúng ta phải cung cấp một khoảng thời gian hợp lý cho hoạt động bất đồng bộ hoàn tất. Cuối cùng, chúng ta đóng Publisher, lần lượt thông báo cho Subscriber rằng Subscription đã hoàn thành.

Chúng ta cũng có thể dùng một Processor và xâu chuỗi Publisher và Subscriber đầu tiên với Processor này. Trong ví dụ sau, chúng ta tạo một Processor tăng giá trị nhận được lên 10 và đẩy các giá trị đã tăng này cho Subscriber của nó.

public class PlusTenProcessor extends SubmissionPublisher<Integer> implements Subscriber<Integer> {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(Integer item) {
        submit(item + 10);
        subscription.request(1);
    }
    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
        closeExceptionally(error);
    }
    @Override
    public void onComplete() {
        System.out.println("PlusTenProcessor completed");
        close();
    }
}
public class SubmissionPublisherExample {
    public static void main(String... args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        PlusTenProcessor processor = new PlusTenProcessor();
        PrintSubscriber subscriber = new PrintSubscriber();
        publisher.subscribe(processor);
        processor.subscribe(subscriber);
        System.out.println("Submitting items...");
        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }
        Thread.sleep(1000);
        publisher.close();
    }
}

Chạy ví dụ này kết quả đầu ra như sau:

Submitting items...
Received item: 10
Received item: 11
Received item: 12
Received item: 13
Received item: 14
Received item: 15
Received item: 16
Received item: 17
Received item: 18
Received item: 19
PlusTenProcessor completed
PrintSubscriber is complete

Như mong đợi, mỗi giá trị được đẩy đã tăng thêm 10 và kết quả nhận được bởi Processor được chuyển đến Subscriber, cuối cùng một thông báo hoàn tất được in ra ở cả PlusTenProcessor và PrintSubscriber

You May Also Like

About the Author: Nguyen Dinh Thuc

Leave a Reply

Your email address will not be published.