Strea­ming gewinnt in unse­rer schnell­le­bi­gen Welt immer mehr Bedeu­tung und löst immer öfter Batch Pro­zesse ab. Dies pas­siert haupt­säch­lich, wenn Daten in nahezu Echt­zeit (near real time) zur Ver­fü­gung ste­hen müs­sen. In sol­chen Fäl­len wer­den Batch Pro­zesse in immer kür­ze­ren Abstän­den auf­ge­führt, sodass sie quasi dau­er­haft lau­fen, oder eine Strea­ming Appli­ka­tion wird ver­wen­det. Eines der wich­tigs­ten Öko­sys­teme für Strea­ming ist Apa­che Kafka inklu­sive Kafka Streams. Was einige der Unter­schiede zwi­schen Batch Pro­zes­sen und Strea­ming (über Kafka Streams) sind, erfahrt ihr in einem unse­rer Blog­bei­träge. In die­sem Blog­bei­trag wer­det ihr jedoch einen Ein­blick über das Design einer (ein­fa­chen) Kafka Streams Appli­ka­tion bekom­men. Um eine Kafka Streams Appli­ka­tion zu ent­wi­ckeln, muss man zwar an viele Details den­ken, aber zwei Dinge sind am wich­tigs­ten: die Daten, und die Architektur.

Daten

Bei der Ent­wick­lung einer Kafka Streams Appli­ka­tion ste­hen wie so oft die Daten im Vor­der­grund. Man muss dabei einige Fra­gen beden­ken, zum Bei­spiel, wie bekommt man die Daten, was pas­siert mit ihnen und wie gibt man sie wei­ter. Über jedes die­ser drei The­men­be­rei­che könnte man Stun­den lang spre­chen, daher wird hier davon aus­ge­gan­gen, dass die Daten bereits in Kafka Topics bereit gestellt und von dort auch abge­holt wer­den. Dies könnte zum Bei­spiel mit­tels Kafka Con­nect pas­sie­ren, wodurch Daten über einen JDBC oder ODBC Dri­ver in die Kafka Topics bzw. in die Daten­ban­ken geschrie­ben wer­den. In die­sem Blog­ein­trag wird aber nur beleuch­tet, was mit den Daten in einer Kafka Streams Appli­ka­tion passiert.

Archi­tek­tur

Die Daten wer­den lei­der sel­ten im Vor­feld per­fekt bereit­ge­stellt, so dass man sie direkt wei­ter­lei­ten kann. Meist müs­sen die bereit­ge­stell­ten Daten in irgend­ei­ner Form auf­be­rei­tet und ange­rei­chert wer­den, bevor der Abneh­mer mit den Daten etwas anfan­gen kann. Dabei muss man vor allem dar­über nach­den­ken, ob man Daten aus ver­schie­de­nen Quel­len zusam­men­füh­ren, also joinen, muss, ob man sta­te­l­ess oder sta­teful Logik ver­wen­det und wie man die Logik imple­men­tiert. Weil diese The­men für jede Appli­ka­tion spe­zi­ell beant­wor­tet wer­den, gibt es kei­nen all­ge­mei­nen Fall. Um das Design bild­lich erklä­ren zu kön­nen, wird ange­nom­men, dass Daten mehr­fach geschickt wer­den, das Ziel­sys­tem damit aber nicht umge­hen kann. Daher müs­sen die Daten in die­sem Bei­spiel dedu­pli­ziert werden.

Bei­spiel: Dedu­pli­ka­tion von Daten

Für die Dedu­pli­ka­tion von Daten braucht man sta­teful Logik, da man fest­stel­len muss, ob ein Daten­satz jemals geschickt wurde. Wenn dies der Fall ist, muss der Daten­satz mit dem vor­he­ri­gen Zustand abge­gli­chen wer­den. Daher wer­den ver­schie­dene Anteile benö­tigt: Im ers­ten Schritt müs­sen die Daten mit Ser­des (Seria­li­zer und Dese­ria­li­zer Paar) aus dem Kafka Topic gele­sen wer­den. Sobald die Daten ver­füg­bar sind, muss mit einem Sta­tes­tore per­sis­tiert wer­den, um fest­stel­len zu kön­nen, ob die Daten schon­mal ver­schickt und ob sie ver­än­dert wur­den. Im letz­ten Schritt müs­sen die Daten dann mit Ser­des wie­der in ein Kafka Topic geschrie­ben werden.

Schritt 1: Daten aus einem Topic lesen

Um Daten aus einem Topic zu lesen, müs­sen 3 Anteile imple­men­tiert wer­den:
Man muss die Topo­lo­gie, einen Serde und den Daten-Stream defi­nie­ren. Oft defi­niert man für jeden der Anteile eine eigene Java Klasse. In die­sem Bei­spiel ste­hen die Daten in einem Input Topic, in dem das Schlüs­sel-Wert Paar <Long, Part­ner>, wobei Part­ner ein bekann­tes Avro ist. Um die Daten in einem KStream zu lesen steht, wird ein Stream­sBuil­der ver­wen­det. Aus die­sem Stream­sBuil­der kann dann die Topo­lo­gie erzeugt wer­den. Das kann wie folgt Imple­men­tiert werden: 

@Produces
public Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Long, Partner> partners = builder.stream("NAME_DES_INPUT_TOPICS",
	        Consumed.with(serdes.partnerKey, 
                              serdes.partnerValue).withName("NAME_PARTNER_SOURCE"));
    Topology topology = builder.build();

    return topology;
}

Die Daten im KStream wer­den dabei mit den Ser­des aus­ge­le­sen. In die­sem Bei­spiel wer­den die Ser­des wie folgt definiert:

@Singleton
public class Serdes {
    public Serde<Long> partnerKey;
    public Serde<Partner> partnerValue;

    @PostConstruct
    public void init() {
        serdeConfig();
        partnerKey = org.apache.kafka.common.serialization.Serdes.Long();
        partnerKey.configure(serdeConfig, true);
        partnerValue = new SpecificAvroSerde<>();
        partnerValue.configure(serdeConfig, false);
    }
}

Jeg­li­che Logik zur Erken­nung von Dupli­ka­ten muss nach dem Aus­le­sen der Daten statt­fin­den. Oft wird dafür eine Trans­for­mer Klasse erzeugt, in der man ent­we­der die Pro­ces­sor oder Trans­for­mer API benutzt. Die Logik wird im zwei­ten Schritt vorgestellt.

Schritt 2: Dupli­kate erkennen

Nun da die Daten aus dem Topic gele­sen wur­den, kann fest­ge­stellt wer­den, wel­che Daten zum ers­ten Mal ver­ar­bei­tet wer­den und wel­che Daten schon bekannt sind. Da es sich hier um soge­nannte sta­teful Logik han­delt, muss der State der Daten fest­ge­hal­ten wer­den. Dies wird in Kafka Streams in Sta­tes­to­res gemacht. Sta­tes­to­res spei­chern Daten als Schlüs­sel-Werte Paare in Kafka Topics in ihrer Byte-Reprä­sen­ta­tion und kön­nen jeder­zeit abge­fragt wer­den. Ein Sta­tes­tore muss im Stream­sBuil­der hin­zu­ge­fügt wer­den bevor die Topo­lo­gie gebaut wird. Das kann wie fett mar­kiert ein­ge­rich­tet werden:

@Produces
public Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Long, Partner> partners = builder.stream("NAME_DES_INPUT_TOPICS",
            Consumed.with(serdes.partnerKey, 
            serdes.partnerValue).withName("NAME_PARTNER_SOURCE"));

    builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("nameStateStore"),
        this.serdes.partnerKey, this.serdes.partnerValue));

    Topology topology = builder.build();
    return topology;
}
Imple­men­ta­tion der Duplikatserkennung

Sobald der Sta­tes­tore ein­ge­rich­tet ist, kann die Logik imple­men­tiert wer­den, um die Dupli­kate zu erken­nen. Dabei muss man Daten­sätze, die zuvor schon­mal ver­schickt wur­den und daher im Sta­tes­tore vor­han­den sind, mit dem neuen Daten­satz abglei­chen. Nur Events, die ent­we­der noch nicht geschickt wur­den oder von dem letz­ten Daten­satz abwei­chen wer­den ver­schickt. Mit der Pro­zes­sor API kann das so aussehen:

class MappingProcessor implements Processor<Long, Partner> {

	ProcessorContext context;

	private final String stateStoreName;
	private KeyValueStore<Long, Partner > stateStore;

	public MappingProcessor(String stateStoreName){
		this.stateStoreName = stateStoreName;
	}

	@Override
	public void init(ProcessorContext processorContext) {
		context = processorContext;
		this.stateStore = (KeyValueStore<Long, Partner>) 
                          context.getStateStore(stateStoreName);
	}

	@Override
	public void process(Long originalKey, Partner partner) {

		if(!isDuplicate(originalKey, partner)) {
			//todo: write data to output Topic
		}
		stateStore.put(originalKey, partner);

	}

	@Override
	public void close() {
	}

	boolean isDuplicate(Long originalKey, Partner partner){
		Partner partnerState = stateStore.get(originalKey);
		if(partnerState!=null && partner != null)
			return partnerState.equals(partner);
		if(partnerState== null && partner == null)
			return true;
		//Case: either state or current partner is null 
                //and the other is not null. 
                //This is not a duplicate!
		return false;
	}
}

Der Pro­ces­sor hat drei wich­tige Funk­tio­nen. Die init Funk­tion, in der der Pro­ces­sor initia­li­siert wird und der Sta­tes­tore ange­bun­den wird, die isDu­pli­cate Funk­tion, in der die Dupli­kats­lo­gik ein­ge­baut wird und die pro­cess Funk­tion, in der geprüft wird, ob ein Daten­satz, der kein Dupli­kat ist, ver­schickt wird, und der Sta­tes­tore auf den neus­ten Stand gebracht wird. Das Ver­schi­cken der Daten ist in der pro­cess Funk­tion noch als Todo eingetragen. 

Schritt 3: Schrei­ben der Daten in Topics

Um die Daten zu ver­schi­cken, muss man in der Pro­ces­sor API den Pro­ces­sor­Con­text benut­zen, um Daten wei­ter­zu­lei­ten. Dabei wird ein Schlüs­sel-Werte Paar in eine Sink geschrie­ben. In der pro­cess Funk­tion des Map­ping­Pro­ces­sors kann so aussehen: 

context.forward(originalKey, partner, To.child("Name der Sink"));

Die Sink muss dabei in der Topo­lo­gie hin­zu­ge­fügt wer­den, bevor die topo­lo­gly zurück gege­ben wird. 

topology.addSink("Name der Sink", 
                 OutputPartnerTopic, 
                 serdes.myPartnerKey.serializer(), 
                 serdes.myPartnerValue.serializer(), 
                 "Name des Processors");

Zusam­men­fas­sung

In die­sem Blog­bei­trag wurde vor­ge­stellt, wie man eine ein­fa­che Kafka Streams Appli­ka­tion desi­gned. Dabei wurde das Bei­spiel der Dedu­pli­ka­tion von Daten­sät­zen benutzt. Eine typi­sche Kafka Streams Appli­ka­tion besteht aus drei Tei­len: Zunächst einen Anteil, der Daten aus einer Daten­quelle, meist einem Kafka Topic, liest. Im zwei­ten Anteil wird die Logik imple­men­tiert, die je nach Anwen­dungs­fall sta­teful oder sta­te­l­ess sein kann. Im drit­ten und letz­ten Anteil wer­den die auf­be­rei­te­ten Daten weg­ge­schrie­ben, meist in ein Ziel Topic.