Streaming gewinnt in unserer schnelllebigen Welt immer mehr Bedeutung und löst immer öfter Batch Prozesse ab. Dies passiert hauptsächlich, wenn Daten in nahezu Echtzeit (near real time) zur Verfügung stehen müssen. In solchen Fällen werden Batch Prozesse in immer kürzeren Abständen aufgeführt, sodass sie quasi dauerhaft laufen, oder eine Streaming Applikation wird verwendet. Eines der wichtigsten Ökosysteme für Streaming ist Apache Kafka inklusive Kafka Streams. Was einige der Unterschiede zwischen Batch Prozessen und Streaming (über Kafka Streams) sind, erfahrt ihr in einem unserer Blogbeiträge. In diesem Blogbeitrag werdet ihr jedoch einen Einblick über das Design einer (einfachen) Kafka Streams Applikation bekommen. Um eine Kafka Streams Applikation zu entwickeln, muss man zwar an viele Details denken, aber zwei Dinge sind am wichtigsten: die Daten, und die Architektur.
Daten
Bei der Entwicklung einer Kafka Streams Applikation stehen wie so oft die Daten im Vordergrund. Man muss dabei einige Fragen bedenken, zum Beispiel, wie bekommt man die Daten, was passiert mit ihnen und wie gibt man sie weiter. Über jedes dieser drei Themenbereiche könnte man Stunden lang sprechen, daher wird hier davon ausgegangen, dass die Daten bereits in Kafka Topics bereit gestellt und von dort auch abgeholt werden. Dies könnte zum Beispiel mittels Kafka Connect passieren, wodurch Daten über einen JDBC oder ODBC Driver in die Kafka Topics bzw. in die Datenbanken geschrieben werden. In diesem Blogeintrag wird aber nur beleuchtet, was mit den Daten in einer Kafka Streams Applikation passiert.
Architektur
Die Daten werden leider selten im Vorfeld perfekt bereitgestellt, so dass man sie direkt weiterleiten kann. Meist müssen die bereitgestellten Daten in irgendeiner Form aufbereitet und angereichert werden, bevor der Abnehmer mit den Daten etwas anfangen kann. Dabei muss man vor allem darüber nachdenken, ob man Daten aus verschiedenen Quellen zusammenführen, also joinen, muss, ob man stateless oder stateful Logik verwendet und wie man die Logik implementiert. Weil diese Themen für jede Applikation speziell beantwortet werden, gibt es keinen allgemeinen Fall. Um das Design bildlich erklären zu können, wird angenommen, dass Daten mehrfach geschickt werden, das Zielsystem damit aber nicht umgehen kann. Daher müssen die Daten in diesem Beispiel dedupliziert werden.
Beispiel: Deduplikation von Daten
Für die Deduplikation von Daten braucht man stateful Logik, da man feststellen muss, ob ein Datensatz jemals geschickt wurde. Wenn dies der Fall ist, muss der Datensatz mit dem vorherigen Zustand abgeglichen werden. Daher werden verschiedene Anteile benötigt: Im ersten Schritt müssen die Daten mit Serdes (Serializer und Deserializer Paar) aus dem Kafka Topic gelesen werden. Sobald die Daten verfügbar sind, muss mit einem Statestore persistiert werden, um feststellen zu können, ob die Daten schonmal verschickt und ob sie verändert wurden. Im letzten Schritt müssen die Daten dann mit Serdes wieder in ein Kafka Topic geschrieben werden.
Schritt 1: Daten aus einem Topic lesen
Um Daten aus einem Topic zu lesen, müssen 3 Anteile implementiert werden:
Man muss die Topologie, einen Serde und den Daten-Stream definieren. Oft definiert man für jeden der Anteile eine eigene Java Klasse. In diesem Beispiel stehen die Daten in einem Input Topic, in dem das Schlüssel-Wert Paar <Long, Partner>, wobei Partner ein bekanntes Avro ist. Um die Daten in einem KStream zu lesen steht, wird ein StreamsBuilder verwendet. Aus diesem StreamsBuilder kann dann die Topologie erzeugt werden. Das kann wie folgt Implementiert werden:
@Produces
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<Long, Partner> partners = builder.stream("NAME_DES_INPUT_TOPICS",
Consumed.with(serdes.partnerKey,
serdes.partnerValue).withName("NAME_PARTNER_SOURCE"));
Topology topology = builder.build();
return topology;
}
Die Daten im KStream werden dabei mit den Serdes ausgelesen. In diesem Beispiel werden die Serdes wie folgt definiert:
@Singleton
public class Serdes {
public Serde<Long> partnerKey;
public Serde<Partner> partnerValue;
@PostConstruct
public void init() {
serdeConfig();
partnerKey = org.apache.kafka.common.serialization.Serdes.Long();
partnerKey.configure(serdeConfig, true);
partnerValue = new SpecificAvroSerde<>();
partnerValue.configure(serdeConfig, false);
}
}
Jegliche Logik zur Erkennung von Duplikaten muss nach dem Auslesen der Daten stattfinden. Oft wird dafür eine Transformer Klasse erzeugt, in der man entweder die Processor oder Transformer API benutzt. Die Logik wird im zweiten Schritt vorgestellt.
Schritt 2: Duplikate erkennen
Nun da die Daten aus dem Topic gelesen wurden, kann festgestellt werden, welche Daten zum ersten Mal verarbeitet werden und welche Daten schon bekannt sind. Da es sich hier um sogenannte stateful Logik handelt, muss der State der Daten festgehalten werden. Dies wird in Kafka Streams in Statestores gemacht. Statestores speichern Daten als Schlüssel-Werte Paare in Kafka Topics in ihrer Byte-Repräsentation und können jederzeit abgefragt werden. Ein Statestore muss im StreamsBuilder hinzugefügt werden bevor die Topologie gebaut wird. Das kann wie fett markiert eingerichtet werden:
@Produces
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<Long, Partner> partners = builder.stream("NAME_DES_INPUT_TOPICS",
Consumed.with(serdes.partnerKey,
serdes.partnerValue).withName("NAME_PARTNER_SOURCE"));
builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("nameStateStore"),
this.serdes.partnerKey, this.serdes.partnerValue));
Topology topology = builder.build();
return topology;
}
Implementation der Duplikatserkennung
Sobald der Statestore eingerichtet ist, kann die Logik implementiert werden, um die Duplikate zu erkennen. Dabei muss man Datensätze, die zuvor schonmal verschickt wurden und daher im Statestore vorhanden sind, mit dem neuen Datensatz abgleichen. Nur Events, die entweder noch nicht geschickt wurden oder von dem letzten Datensatz abweichen werden verschickt. Mit der Prozessor API kann das so aussehen:
class MappingProcessor implements Processor<Long, Partner> {
ProcessorContext context;
private final String stateStoreName;
private KeyValueStore<Long, Partner > stateStore;
public MappingProcessor(String stateStoreName){
this.stateStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext processorContext) {
context = processorContext;
this.stateStore = (KeyValueStore<Long, Partner>)
context.getStateStore(stateStoreName);
}
@Override
public void process(Long originalKey, Partner partner) {
if(!isDuplicate(originalKey, partner)) {
//todo: write data to output Topic
}
stateStore.put(originalKey, partner);
}
@Override
public void close() {
}
boolean isDuplicate(Long originalKey, Partner partner){
Partner partnerState = stateStore.get(originalKey);
if(partnerState!=null && partner != null)
return partnerState.equals(partner);
if(partnerState== null && partner == null)
return true;
//Case: either state or current partner is null
//and the other is not null.
//This is not a duplicate!
return false;
}
}
Der Processor hat drei wichtige Funktionen. Die init Funktion, in der der Processor initialisiert wird und der Statestore angebunden wird, die isDuplicate Funktion, in der die Duplikatslogik eingebaut wird und die process Funktion, in der geprüft wird, ob ein Datensatz, der kein Duplikat ist, verschickt wird, und der Statestore auf den neusten Stand gebracht wird. Das Verschicken der Daten ist in der process Funktion noch als Todo eingetragen.
Schritt 3: Schreiben der Daten in Topics
Um die Daten zu verschicken, muss man in der Processor API den ProcessorContext benutzen, um Daten weiterzuleiten. Dabei wird ein Schlüssel-Werte Paar in eine Sink geschrieben. In der process Funktion des MappingProcessors kann so aussehen:
context.forward(originalKey, partner, To.child("Name der Sink"));
Die Sink muss dabei in der Topologie hinzugefügt werden, bevor die topologly zurück gegeben wird.
topology.addSink("Name der Sink",
OutputPartnerTopic,
serdes.myPartnerKey.serializer(),
serdes.myPartnerValue.serializer(),
"Name des Processors");
Zusammenfassung
In diesem Blogbeitrag wurde vorgestellt, wie man eine einfache Kafka Streams Applikation designed. Dabei wurde das Beispiel der Deduplikation von Datensätzen benutzt. Eine typische Kafka Streams Applikation besteht aus drei Teilen: Zunächst einen Anteil, der Daten aus einer Datenquelle, meist einem Kafka Topic, liest. Im zweiten Anteil wird die Logik implementiert, die je nach Anwendungsfall stateful oder stateless sein kann. Im dritten und letzten Anteil werden die aufbereiteten Daten weggeschrieben, meist in ein Ziel Topic.