Introduzione a Spring Reactive Stream

1. Introduzione

Spring Reactive è una libreria di Spring che consente di usare i Reactive Stream. Per comprendere questi ultimi, vedremo prima i concetti che hanno portato alla loro creazione: gli stream di Java 8 e la Reactive Processing.

1.1 Stream di Java 8

Gli Stream di Java 8 (java.util.stream) sono un meccanismo usato per elaborare in modo efficiente una collezione di dati e prevede queste entità.

  • Lo Stream: una sequenza di dati letti da una Data Source (sorgente dati) che può essere una collezione (array, lista, set, altro). Lo Stream poi viene trasformato da una o più Operation in un nuovo Stream attraverso una Stream Pipeline.
  • L’Operation: la trasformazione dei dati contenuti nello Stream fatta da una funzione operatore, ad esempio: filter, map e reduce.
  • La Stream Pipeline: una sequenza di più Stream e Operation consecutive che sostanzialemete trasforma i dati in più passaggi senza alterare i dati di origine, fornendo un risultato finale.

Quindi una Stream Pipeline esegue questi passi:

  1. Legge i dati da un Data Source per inserirli in uno Stream.
  2. Esegue una o piu operazioni di elaborazione in sequenza producendo uno o più Stream aggiornati con i dati elaborati, fino ad ottenere il risultato desiderato.
  3. Restitusce il risultato finale dell’elaborazione che può essere usato da oggetti che ne fanno richiesta (Consumer).

1.2 Reactive Processing

La Reactive Processing (Elaborazione Reattiva) è un paradigma di sviluppo software dichiarativo, permette di sviluppare applicazioni asincrone, non bloccanti che possono trasformare flussi di dati in maniera semplice e strutturata. Introdotto per sfruttare al meglio i processori moderni multi-core e dare resilienza e disaccoppiamento ai componenti dell’applicazione.

1.3 Reactive Stream

I Reactive Stream sono una API Specification implementata da diversi framework tra cui anche Spring con il Progetto Reactor. L’API è simile agli Stream di Java 8 e la differenza sostanziale risiede nella gestione dei tempi in cui i dati vengono spostati e processati nell’applicazione:

  • Negli Stream di Java 8 i dati arrivano immediatamente da un Data Source già popolato: la collection (list, set, map, ecc.).
  • Nei Reactive Stream invece, i dati possono arrivare in un periodo di tempo indefinito da un nuovo Data Source asincrono: il Producer. Questo può essere qualsiasi cosa che emette dati in modo asincrono come ad esempio una chiamata http, un evento proveniente da un Message Broker (Kafka, RabbitMQ), ecc.

I Reactive Stream aiutano a creare, combinare e ascoltare i flussi di dati. Astraggono (rendono trasparente per lo sviluppatore) diversi aspetti low-level come i thread, la thread-safety, la sincronizzazione, le strutture dati in concorrenza, il non-blocking I/O.

Permettono di leggere dati provenienti da un Producer e trasformarli; combinare e trasformare dati provenienti da più Producer e resituire un risultato attraverso l’uso di operatori reactive.

2. Riferimenti

3. Spring Reactive Stack

Spring ha introdotto il Reactive Stack dalla release 5.0 con il progetto Reactor (projectreactor.io).

I Reactive Stream che fanno parte dello stack usano tipi di dati reattivi: reactor.core.publisher.Mono e Flux che sono implementati con l’unione di due pattern: Iterator e Observer:

  • L’Iterator pattern è usato per leggere dal Data Source.
  • L’Observer riceve i dati dall’Iterator in modo asincrono, li processa e li invia ai Consumer.
  • I Consumer sottoscritti allo stream (Mono/Flux) ricevono i dati.

3.1 Produrre uno stream di dati

Mono e Flux sono Publisher cioè implementano l’interfaccia Publisher della Reactive Stream API. Leggono (iterano) i dati dal Data Source e li emettono, quando sono disponibili, verso i loro Subscriber (Consumer).

Flux può emettere 0..n elementi. Esempio di Stream statico di 4 elementi: Publisher<Integer> staticFlux = Flux.just(1, 2, 3, 4);

Mono può emettere al massimo 0..1 elemento. Esempio di Stream Mono di 1 elemento: Publisher<Integer> staticMono = Mono.just(1);

3.2 Sottoscriversi ad uno stream di dati

I dati non fluiscono dal Producer (Publisher) verso un Consumer finchè quest’ultimo non si sottoscrive e diventa quindi Subscriber. I dati fluiscono secondo il Push Model: Il Publisher invia dati (eventi) ai Subscriber quando sono disponibili e ed è istanziato almeno un Subscriber.

3.3 Esempio - Cold Stream

Ecco un semplice esempio di Producer Flux che emette i dati provenienti da un Data Source statico verso un Consumer.

Dipendenze maven:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>(inserire l'ultima versione stabile)</version>
</dependency>

<dependency> 
    <groupId>ch.qos.logback</groupId> 
    <artifactId>logback-classic</artifactId> 
    <version>(inserire l'ultima versione stabile)</version> 
</dependency>

Codice Java:

// Consumer
List<Integer> consumer = new ArrayList<>();

// Producer a cui viene sottoscritto il Consumer
Flux.just(1, 2, 3, 4)
    .log()
    .subscribe(consumer::add);

// Verifica che il Consumer abbia ricevuto tutti i dati
assertThat(consumer).containsExactly(1, 2, 3, 4);

Dal log si può vedere come i dati fluiscono attraverso lo Stream, dalla sottoscrizione del consumer al completamento:

20:25:19.550 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onComplete()

Il flusso è definito nell’interfaccia del Subscriber visible tramite implementazione con classe anonima dell’esempio precedente:

// Consumer
List<Integer> consumer = new ArrayList<>();

// Producer a cui viene sottoscritto il Consumer
Flux.just(1, 2, 3, 4)
    .log()
    .subscribe(new Subscriber<Integer>() {
        
        @Override
        public void onSubscribe(Subscription s) {
          s.request(Long.MAX_VALUE);
        }
    
        @Override
        public void onNext(Integer integer) {
          consumer.add(integer);
        }
    
        @Override
        public void onError(Throwable t) {}
    
        @Override
        public void onComplete() {}
    });

// Verifica che il Consumer abbia ricevuto tutti i dati
assertThat(consumer).containsExactly(1, 2, 3, 4);

3.4 Backpressure

Backpressure (contropressione) è un termine preso in prestito dalla fluidodinamica, definisce la pressione o forza che si oppone alla fuoriuscita di un fluido da un’apertura.

In ambito informatico è il sovraccarico di dati che un componente software sta inviando ad un altro componente nel sistema.

Nei Reactive Stream la Backpressure è un meccanismo di comunicazione tra il Producer e il Consumer di eventi che permette di regolare il flusso di messaggi tra di essi affinchè sia sostenibile e non faccia andare in sovraccarico l’applicazione, evitando la perdita di messaggi in transito, errori di overflow, rendendo quindi l’applicazione robusta nel tempo.
Quindi, un Subscriber può dire ad un Publisher di inviargli meno dati per evitare sovraccarico.

Volendo applicare la Backpressure nell’esempio mostrato in precedenza, si può usare il metodo request() sulla subscription per chiedere al Publisher di mandare solo una certa quantità di elementi (2) e solo quando il subscriber è pronto:

Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(new Subscriber<Integer>() {
    private Subscription s;
    int onNextAmount;

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        s.request(2);  // <- backpressure, chiede 2 dati alla volta
    }

    @Override
    public void onNext(Integer integer) {
        elements.add(integer);
        onNextAmount++;
        if (onNextAmount % 2 == 0) {
            s.request(2);
        }
    }

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
});

Se si esegue l’applicazione adesso, nel log si vede la chiamta al metodo request(2) seguita da due chiamate onNext():

23:31:15.395 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onComplete()

3.5 Mappare dati in uno Stream

Considerando l’Esempio 1, possiamo introdurre una Operation di mapping con il metodo map() sul reactive stream:

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)  // <- mapping operation
  .subscribe(elements::add);

3.6 Combinare due Stream

In questo esempio si definisce un flusso con i valori 1, 2, 3, 4 che vengono mappati col loro doppio. Poi si combina questo fusso con uno nuovo tramite l’operaotore zipWith(). Il flusso nuovo emette dati 0, 1, 2, 3, ecc. L’operatore zipWith combina i due flussi stampando i rspettivi valori in una stringa. Questa stringa poi è inviata ad un subscriber.

// Consumer
List<Integer> consumer = new ArrayList<>();

Flux.just(1, 2, 3, 4)
    .log()
    .map(i -> i * 2)
    .zipWith(Flux.range(0, Integer.MAX_VALUE), 
        (one, two) -> String.format("1st Flux: %d, 2nd Flux: %d", one, two))
    .subscribe(consumer::add);

assertThat(elements).containsExactly(
  "First Flux: 2, Second Flux: 0",
  "First Flux: 4, Second Flux: 1",
  "First Flux: 6, Second Flux: 2",
  "First Flux: 8, Second Flux: 3");
20:04:38.064 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO  reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO  reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | cancel()

3.7 Esempio - Hot Stream

Gli esempi visti fin’ora hanno fatto uso di Cold Stream, cioè stream che usano Data Source statici. Gli Hot Stream invece:

  • hanno Data Source dinamici che possono essere aggiornati nel tempo in modo indefinito.
  • sono sempre in esecuzione.
  • possono essere sottoscritti in qualsiasi momento e i nuovi Subscriber riceveranno dati continuamente ma avranno perso quelli che sono passati prima della loro sottoscrizione.

Esempi di Hot Stream:

  • Uno stream di RSS Feed che regisce quando viene pubblicato nuovo articolo su un blog e invia un messaggio su un canale Telegram.
  • Uno stream di movimenti del mouse che deve reagire continuamente e inviare i dati ad un logger.

Creare un Flux collegabile

Creiamo un Flux che dura per sempre ed emette risultati sulla console. Simula uno stream di dati infinito che arriva da un Data Source esterno, System.currentTimeMillis():

ConnectableFlux<Object> publisher = Flux.create(
    fluxSink -> {
        while(true) {
            fluxSink.next(System.currentTimeMillis());
        }
    })
    .publish();  // Setup the flux

publisher.subscribe(System.out::println);

publisher.connect()

  • create() crea un nuovo Flux che dura per sempre e che legge i millisecondi. Qui lo stiamo solo creando, ma non è ancora attivo cioè non emette dati.
  • publish() prepara il Flux alla emissione dei dati verso i subscriber con una backpressure impostata.
  • subscribe() sottoscrive un consumer come Subscriber.
  • connect() connette il Flux al Data Source e ciò lo fa attivare: il Flux inizia ad emettere i dati.
0%