Введение в RxJava: наблюдаемый паттерн

фото Даниэль Грегуар

на Unsplash

"RxJava является реализацией Java VM Реактивные расширения: библиотека для составления асинхронных и событийных программ с использованием наблюдаемых последовательностей. "от разработчиков RxJava.

Вот и все. Это в основном позволяет вам следовать парадигме реактивного программирования.

Цель этой статьи — познакомить вас с наблюдаемый, Для этого я расскажу вам о простом сценарии использования, с которым я столкнулся и получил пользу от использования Observable.

Случай использования:

Возбудить внешний процесс, используя ProcessBuilder и используйте Observable для наблюдения за процессом.

Сначала я напишу полную программу, если не буду использовать RxJava, а затем объясню. Затем к Реактивному программированию.

Реализация 1: Обычный метод с использованием Java Consumer

package io.convert2pdf.commons.http;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.function.Consumer;

public class RxJavaObservableProcess {
    ProcessBuilder processBuilder = null;
    Process process = null;
    
    Consumer inputConsumer;
    Consumer errorConsumer;
    Consumer completeConsumer;
    
    public RxJavaObservableProcess(List command, Consumer inputConsumer, Consumer errorConsumer, Consumer completeConsumer) {
        processBuilder = new ProcessBuilder(command);
        this.inputConsumer = inputConsumer;
        this.errorConsumer = errorConsumer;
        this.completeConsumer = completeConsumer;
    }
    
    public void start() throws Exception {
        process = processBuilder.start();
        readInputStream(process);
        readErrorStream(process);
        process.waitFor();
    }
    
    private void readInputStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line = null;
            try {
                while((line = br.readLine()) != null) {
                    this.inputConsumer.accept(line);
                }
                this.completeConsumer.accept(null);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void readErrorStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            String line = null;
            try {
                while((line = br.readLine()) != null) {
                    this.errorConsumer.accept(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    public static void main(String() args) throws Exception {
        RxJavaObservableProcess process = new RxJavaObservableProcess(
                List.of("tasklist"), 
                (input) -> System.out.println("Input: " + input), 
                (error) -> System.out.println("Error: " + error), 
                (complete) -> System.out.println("Complete: " + complete)
        );
        
        process.start();
    }
}

Примечание: если вы работаете в Unix, измените List.of («список задач») на List.of («ps», «aux»)

Объяснение:

В основном методе я инициализирую RxJavaObservableProcess, который принимает 4 аргумента

  1. Команда. В этом случае я просто хочу получить текущие запущенные процессы на моем ноутбуке с Windows.
  2. Потребитель входного потока. Потребитель, который вызывается, когда есть доступная строка ввода.
  3. Ошибка потока потребителя. Потребитель, который вызывается при появлении строки ошибки.
  4. Полный государственный потребитель. Я добавил этого потребителя, чтобы при чтении кода с помощью RxJava вы могли установить связь между ними.

Я создаю процесс с заданными аргументами на моем Начните метод и читать входные и ошибочные потоки на readInputStream и readErrorStream методы соответственно. Чтение потоков ввода / ошибок является блокирующей операцией, поэтому я обрабатываю их в отдельном потоке.

Внутри каждого потока методы чтения я использую BufferedReader прочитать строку и написать ее потребителю. В конце чтения входного потока я вызываю completeConsumer с нулевым, чтобы сообщить пользователю, что процесс завершен.

Это простая задача, но для ее написания потребовалось так много кода. Кроме того, мне пришлось заранее передать всех потребителей, чтобы я мог присоединить их к соответствующему потоку, прежде чем я вызову блокирующий вызов process.waitFor ().

Это не плохая программа, у меня фактически нет проблем с ней. Но я думаю, что парадигма реактивного программирования может помочь мне сделать ее лучше.

Реализация 2: использование RxJava

package io.convert2pdf.commons.http;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.ReplaySubject;

public class RxJavaObservableProcessV2 {
    ProcessBuilder processBuilder = null;
    Process process = null;
    
    ReplaySubject processInputs = ReplaySubject.create();
    
    public RxJavaObservableProcessV2(List command) {
        processBuilder = new ProcessBuilder(command);
    }
    
    public void startAsync() throws Exception {
        new Thread(() -> {
            try {
                process = processBuilder.start();
                readInputStream(process);
                readErrorStream(process);
                process.waitFor();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    
    public Observable getInputs() {
        return processInputs;
    }
    
    private void readInputStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line = null;
            try {
                while((line = br.readLine()) != null) {
                    this.processInputs.onNext(line);
                }
                this.processInputs.onComplete();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void readErrorStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            String line = null;
            try {
                ArrayList buffer = new ArrayList();
                while((line = br.readLine()) != null) {
                    buffer.add(line);
                }
                if(buffer.size() > 0) {
                    this.processInputs.onError(new Throwable(Arrays.toString(buffer.toArray())));                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    public static void main(String() args) throws Exception {
        RxJavaObservableProcessV2 process = new RxJavaObservableProcessV2(List.of("tasklist"));
        process.startAsync();
        
        process.getInputs().subscribe(
                (input) -> System.out.println("Input: " + input), 
                (error) -> System.out.println("Error: " + error), 
                () -> System.out.println("Completed")
        );
    }
}

Обратите внимание, что полная реализация должна позаботиться и об отмене подписки. Subscribe возвращает Disposable, которое следует утилизировать после завершения.

Объяснение:

Код выглядит довольно похоже с небольшим изменением. Теперь все потребители заменены одним ReplaySubject processInputs. Это открывает много возможностей для развязки вашей программы и выполнения операций в отдельности (разделение интересов). ReplaySubject Я использовал здесь, это тип наблюдателя, который испускает события. Эти события могут быть отправлены как

  1. простое сообщение, хотя onNext
  2. ошибка типа Throwable through OnError
  3. хотя полное состояние OnComplete

После того, как наблюдатель завершает работу, он не может генерировать сообщения о следующих ошибках или ошибках. То же самое относится и к ошибке. В RxJava все предметы действуют как наблюдатель и наблюдаемый,

Это удобно После создания и запуска процесса запуска на startAsync, Я могу передать этот объект и, тем не менее, получить доступ к потоку ввода и ошибок, подписавшись на Observable return by getInputs ().

Это обращение, чтобы вернуть наблюдаемое, чтобы мы не открывали Субъект внешнему миру. В отличие от Observer, Observable позволяет пользователю только наблюдать за источником (субъектом) и не позволяет генерировать события через него.

Это отличается от предыдущего примера. Ранее, когда потребитель потребляет сообщение, оно исчезает. Если другие сервисы должны обрабатывать данные из процесса, нам нужно явно вызывать эти сервисы внутри наших потребителей. С ReplaySubject каждый сервис, который подписан на processInputs Наблюдаемый будет получать все сообщения, пока OnError/OnComplete не называется. Это улучшает управление событиями, позволяя наблюдать и обрабатывать сообщения в разных сервисах.

RxJava — это крутая технология, которая имеет много преимуществ. Хотя я только коснулся общего использования Observable и ReplaySubject, вы можете узнать больше о нем, перейдя на веб-сайты RxJava или ReactiveX.

Я использовал вариант этого кода для управления процессами на сайте PS2PDF. Ps2pdf.com позволяет пользователям сжать pdf и конвертировать изображения такие как JPEG Все эти преобразования порождают процессы, которые выполняются FFMPEG или какая-либо другая программа с открытым исходным кодом для выполнения фактического преобразования. Одно из мест, где я использую это, когда я хочу обновить процент завершения преобразование видео конечному пользователю.

Я порождаю процесс FFMPEG и передаю этот процесс Observable в службу под названием WebSocketService. Который управляет связью в реальном времени между пользователями и системой. Этот WebSocketService подписывается на входной поток Observable и ожидает данные. В случае FFMPEG входной поток строки принимает следующую форму.

frame= 1185 fps=337 q=38.0 size=    2048kB time=00:00:20.94 bitrate= 801.1kbits/s speed=5.95x

WebSocketService декодирует это сообщение и отправляет ход выполнения внешнему интерфейсу, который будет отображаться с индикатором выполнения, чтобы пользователь знал, что происходит.

Примечание. Этот код был написан на Java 14 и RxJava 3. Он должен работать с Java 8+ с любой версией RxJava с текущим импортом.



Источник: Введение в RxJava: наблюдаемый паттерн


Похожие материалы по теме: Введение в RxJava: наблюдаемый паттерн

Leave a comment