Reactive Spring Cloud Stream a gyakorlatban

A SCS keretrendszer egy absztrakciós réteget biztosít, mely eseményvezérelt rendszerek közötti aszinkron, üzenet alapú kommunikációt tesz lehetővé, elfedve a rendszer által használt üzenetközvetítő réteget. Ennek köszönhetően könnyedén válthatunk az egyes szállítók implementációi között.

Előnyei

A példában Kotlin nyelv használatával tekintjük át a SCS által nyújtott főbb lehetőségeket. Üzenetközvetítő rendszerként Kafkát és RabbitMQt, adatbázisként MongoDBt használunk. A tesztesetek pedig TestContainers segítségével készülnek.

A SCS támogatja a reaktív programozási modellt, ehhez a Reactor nevű könyvtárat használja, mely lehetővé teszi az aszinkron, nonblocking programkód készítését deklaratív módon. Hátránya lehet, hogy sok műveletet tartalmazó pipeline esetén nehezen átlátható, keveredik a keretrendszer és az üzleti logika kódja. Ebben lehetnek segítségünkre a coroutineok, mely segítségével imperatív stílusban fogalmazhatunk meg aszinkron, nonblocking funkcionalitást. A cikk végén röviden áttekintjük a használatát.

SCS áttekintés

Az SCS modellje négy fő részből áll. Az üzenetközvetítő rendszerből, pl Kafka. Egy az üzenetközvetítő rendszert a belső csatornákkal (pipeline) összekötő csatolóból (binder), mely az üzenetküldő rendszer és a keretrendszer közötti integrációért felelős. A belső csatornákból, ami az alkalmazás és az csatoló közötti kommunikációt valósítja meg, valamint maga az alkalmazás, ahol az üzleti logikát implementálhatjuk.

Spring Cloud Stream architektúra

Az dokumentáció részletes leírást biztosít a keretrendszerről, valamint a létező csatolókról. Természetesen bármikot készíthetünk saját implementációt.

Az alábbiakban megnézzük a különböző üzenetküldési és fogadási módszereket. Az egyszerűség kedvéért az alkalmazás önmagával kommunikál. Az ábrán ennek működése látaható.

A feldolgozás egy REST végponton küldött kéréssel indul, mely tartalmát Kafkára küldi. Az ide érkező üzeneteket három feldolgozó is fogadja és feldolgozás után különböző csatornákra publikálja tovább.

Konfiguráció

Első lépésként az üzenetküldő rendszerekhez történő csatlakozási adatokat állítjuk be. Környezetünk egy docker-ben futó alap konfigurációkkal rendelkező Kafka. Természetesen itt megadhatjuk az authentikációs és egyéb beállításokat is az adott környezetnek megfelelően.

Majd a csatoló (binder) beállításai következnek. Melyek az adott csatolóra érvényes közös beállítások lesznek.

A default-binder beállítással megadhatjuk több csatoló esetén melyik legyen az alapértelmezett. Amennyiben csak egy csatolót használunk, vagy minden kötés esetben megadjuk, elhagyható. A kulcs és értékek szerializálásának módját is itt állíthatjuk be mind producer, mind consumer szinten. A példában JSON üzeneteket küldünk. Kafka esetében éles rendszeren a rendszerek közötti contractként érdemes Avro-t használni.

Ezt követően szükségünk van még a feldolgozó funkciók (function) beállítására, pontosvesszővel elválasztott lista formájában. Ezeket a funkciókat fogja a keretrendszer a megfelelő csatornákhoz kapcsolni.

Az feldolgozás három csoportba sorolható, mely típusokat Java interfészek reprezentálnak.

Forrás (Producer) - Supplier

A forrás segítségével küldhetünk üzenetet az üzenetközvetítő rendszeren keresztül.

Először létrehozzuk a feldolgozó metódust, mely egy magasabb rendű függvénnyel tér vissza. A metódus nevének egyeznie kell function.definion beállításban megadott névvel, vagy a @Bean annotáció name mezőjében megadott névvel. A keretrendszer ennek segítségével gyártja le és rendeli a megfelelő csatornához a feldolgozót.

A függvény bementei- és/vagy visszatérési értékének típusa egy reaktív adatfolyam (Flux). Itt használhatjuk közvetlenül az üzenetet reprezentáló objektumot vagy becsomagolhatjuk a keretrendszer által biztosított Message osztályba. Ebben az esetben az üzenet mellett az üzenet fejlécekhez is hozzáférünk.

Az utolsó lépés a létrehozott funkció bekötése a megfelelő üzenetcsatornára (Bindings) és opcionálisan a további beállítások megadása.

A produceMessage funkciót kötjük a Kafka üzenetküldő rendszerhez. A keretrendszer a korábbi beállításaink alapján tudni fogja, hogy éri el. A kötés nevének a function.definition részben megadott névvel kell megegyeznie.
A binding nevének formátuma kötött, három részből épül fel, kötöjellel elválasztva.

Itt a produceMessage-out-0 azt jelenti hogy a productMessage funkció visszatérési értékei közül az első adatfolyamot (Flux) szeretnénk kötni kimeneti üzenetcsatornaként. A destination a Kafka topic, ahova az üzenetet publikálni szeretnénk.
A binder az a csatoló amelyet használni szeretnénk. Jelen esetben opcionális, hisz a Kafka csatolót állítottuk be alapértelmezettnek.

A producer ezzel elkészült, képes üzeneteket küldeni Kafkára.

A küldéshez létrehozunk egy unicast processort, melynek emitNext metodusát hívva tudunk üzenetet helyezni az adott Flux-ra. Ez a processor az, mellyel a produceMessage által visszaadott függvény visszatér.
A unicastProcessor egy adatfolyamot reprezentál melyre a keretrendszer feliratkozik és az általa kibocsátott objektumokat - szerializálás után - az üzenetközvetítő rendszer felé publikálja.

Fogyasztó (Consumer) - Consumer

A publikációhoz hasonlóan az üzenetek fogadásához is létre kell hoznunk egy funkciót.

Consumer esetében fel kell iratkoznunk a reactive streamre. A Consumer interfész accept metódusának nincs visszatérési értéke, így a keretrendszer nem kap referenciát a streamre, mellyel feliratkozhatna.
Ez a ‘probléma’ megkerülhető, amennyiben Consumer helyett Function típust használunk a consumeMessage metódus visszatérési értékeként, melynek bemenete a Consumernél is használt típus, kimenete pedig egy üres Mono.

A beállítások szintén hasonlóak a forrás beállításaihoz. Itt azonban az in kulcsszóval jelöljük hogy bejövő csatornát szeretnénk létrehozni. A group pedig a Kafka által használt consumer group, mely segítségével újracsatlakozás után meg tudja állapítani hogy az adott fogyasztónak honnan kell folytatni az üzenetek kiolvasását.

Feldolgozó (Processor) - Function

A feldolgozó a bejövő üzenetet (feldolgozás után) kimeneti üzenetként továbbítja az üzenetküldő rendszer felé.
Ebben a példában Kafkáról érkező üzeneteket dolgozunk fel és publikáljuk RabbitMQ felé.

Ehhez szükségünk van a Kafkához hasonlóan a RabbitMQ felparaméterezésére. Az üzenet formátuma alapértelmezetten JSON így nem konfiguráljuk külön.

A feldolgozó egy Function típussal tér vissza, melynek első generikus paramétere a bementei adatfolyam, a második pedig a kimeneti. A példában egy Product osztály által reprezentált üzenetet fogadunk Kafka felől, majd feldolgozás után InventoryInfo típusú üzenetként publikáljuk RabbitMQ-ra.

A ki-, és bemeneti adatfolyamot is be kell kötnünk a megfelelő csatornákra. A Kafka beállításokat már megismertük. RabbitMQnál meg kell adnunk egy speciális, RabbitMQhoz tartozó bindings részt is, mely tartalmazza az exchange típusát és a routing key-t. Ez alapján az exchange-re érkező üzeneteket a későbbi fogyasztók a megfelelő queue-ba irányíthatják (routing) direct típusú exchange esetén, mely ideális az egy forrás - egy fogyasztó típusú esetekben.

További típusok bemutatása a hivatalos oldalon.

Az üzeneteink az inventory.message.exchange csatornára érkeznek. Amennyiben van a megadott routing key-el queue hozzárendelve, az üzenetek automatikusan bekerülnek a sorba. A következő fejezetben egy ilyen queue-t létrehozó bindinget is készítünk.

Több Forrás - Több Fogyasztó

Lehetőségünk van egy funkción belül több forrásból adatot fogadni, illetve több üzenetközvetítő felé publikálni. Ehhez a feldolgozóknál megismert Funtion interfacet használhatjuk. Mivel az interface egy visszetérési értékkel és egy bemeneti paraméterrel rendelkezik, így a bejövő és kimenő csatornákat egy TupleN osztályba csomagolva tudjuk fogadni és visszaadni, ahol N a paraméterek száma. Az alábbi funkciónak két bemeneti csatornája van. Az egyik Kafkára a másik RabbitMQhoz kerül bekötésre. Mindkettő különböző típusú üzeneteket fogad, majd az objektumokból a neveket és mennyiségeket kinyerve és becsomagolva 1–1 Kafka csatornára továbbítja azokat.

A csatornák beállítása hasonló az eddig megismertekhez, de itt nyer értelmet a névkonvenció végén helyet foglaló index szám. Az első bemeneti paraméter a 0 indexet kapja a második pedig az 1-et és így tovább.
Így például a Flux<ProductMessage> bemeneti adatfolyam teljes neve multiInMultiOut-in-1 mivel a bemeneti Tuple2 második helyén szerepel.

RabbitMQ fogyasztónál is szükséges a gyártóspecifikus paraméterek beállítása. Jelen esetben a legfontosabb a bindingRoutingKey. Ez esetünkben megegyezik az enrichAndSendToRabbitMq kimeneti csatornán beállítottal, így az általa küldött üzenetek az exchangeről ehhez a bemeneti csatornához tartozó queueba kerülnek továbbításra. A queue automatikusan létrehozásra kerül és csatolva lesz a beállított exchangehez (paraméterezéssel módosítható a vislekedés)

Kafka az alapértelmezett binder, így nem kötelező megadni.

Érdemes a bindings-okat külön property fileban tárolni, így egy file cseréjével könnyen átállíthatjuk a szoftvert más üzenetküldő rendszerek használatára.

Komponens teszt

Teszteléshez a TestContainers könyvtárat használjuk, mely a teszt futtatása előtt elindítja az általunk meghatározott konténereket a végén pedig leállítja és törli őket.
A környezethez egy docker compose file készült, ami az összes szükséges konténert tartalmazza. Ezt adjuk át a TestContainersnek.

A példában egy üzenetet küldünk Kafkán keresztül és megnézzük, az adatbázisba történő mentés sikeresen megtörténik-e. Mivel az üzenetküldés aszinkron, így az Awaitility nevű könyvtár segítségével megvárjuk míg az üzenet feldolgozása és mentése megtörténik vagy a meghatározott időlimit letelik, mely esetben a tesztet sikertelennek nyilvánítjuk. A példában azt vizsgáljuk, hogy üres adatbázisba kerül-e rekord az üzenet publikálása után. Természetesen érdemes ennél jóval részletesebb vizsgálatot végezni.

Coroutineok és a Reactive Stream

Vegyük az alábbi streamet. A Flux egy reaktív adatfolyam reprezentációja, melyen különböző műveleteket hajthatunk végre.

Mock stream

Tegyük fel hogy szeretnénk a járművek típusú objektumból autó típusút képezni, ezt adatbázisba lementeni majd egy webszolgáltatáson keresztül lekérdezni a típusát és ebből előállítani a végleges adatmodellt, melyet egy másik rendszer felé publikálunk.
Az első példában a Reactor által nyújtott deklaratív megoldás látható, míg a második coroutin használatával készült.

A teljes reactor + coroutine példa megtalálható a linken.

Reactor

Coroutine

A mono itt egy coroutine builder, mely segítségével elindítjuk a coroutine-t és a végeredményét becsomagoljuk egy Mono-ba, mely kompatibilis a Reactor keretrendszerrel. A kettő kombinációjának nagy előnye hogy könnyedén különválaszthatjuk az üzleti logikát a keretrendszertől, ám nem veszítjtük el a keretrendszer által biztosított funkcionalitást — mint például hibakezelés, retry, stb - sem.

A teljes forráskód megtalálható GitHub-on.

Kotlin, Spring, Microservice enthusiast

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store