Dieser Blogbeitrag stellt die Verwendung und die Notwendigkeit von sortierten Daten in Talend Data Integration (Talend DI) Jobs dar. Es gibt drei grundlegende Anwendungsfälle. Zum einen setzen bestimmte Transformationen eine spezifische Sortierung voraus, wie zum Beispiel das Anreichern von Informationen aus Vorgänger- oder Nachfolgerzeilen analog der SQL-typischen analytischen Funktionen bzw. Fensterfunktionen lag(…)
oder lead(…) over( partition by … order by …)
. Talend bietet keine dedizierte Komponente für diese Art Transformation an und muss mit einer Kombination aus sortierten Daten und tJavaFlex oder tMap umgesetzt werden.
Zum anderen kann die Verarbeitung von großen Datenmengen eine Sortierung von Daten erzwingen. Wenn beispielsweise ein großer Datensatz mehrfach aggregiert und/oder denormalisiert werden soll, so kann eine initiale (aufwendige) Sortierung mehrfach von den speziellen SortedRow-Komponenten verwendet werden.
Der dritte Fall ist eine gängige Anforderung der Zieldatenbank: Das Laden von sortierten Daten kann in einer Datenbank vorteilhaft sein, wenn die Tabelle die gleiche Sortierung wie der Primary Key besitzt. Auch in aktuellen Cloud-nativen Datenbaken wie z.B. Snowflake ist ein sortiertes Beladen vorteilhaft bezüglich pruning der micro-partitions bzw. clustering. Wenn eine Sortierung im Talendjob für solch einen Use Case schon gefordert ist, können wir diese auch zu unserem Vorteil im Sinne der ersten beiden Anwendungsfälle nutzen. Diese sollen im Folgenden näher betrachtet werden.
Sortierte Daten erstellen
Zunächst müssen die Daten sortiert werden. Unabhängig von der Quelle bzw. der Art der Inputkomponente lässt sich ein Datenstrom innerhalb des Talend Jobs mit der tSortRow Komponente sortieren. Es lassen sich beliebig viele Spalten aus dem Inputschema auf- oder absteigend sortieren. Entscheidend ist, ob die Sortierung rein numerisch oder alphanumerisch stattfinden soll. Bei numerischer Sortierung darf die Spalte ausschließlich numerische Inhalte beinhalten. Bei größeren Datenmengen ist die Option „sort on disc“ unter den advanced settings hilfreich. Ist diese Option aktiv, so wird die Datenmenge nicht komplett im Java Heapspace gehalten, sondern physisch „on disc“ zwischengespeichert. Die Laufzeit erhöht sich mit dieser Option zwangsläufig und zum Teil drastisch, kann aber bei begrenztem Arbeitsspeicher / Heapsize die einzige Möglichkeit sein den Job überhaupt ausführen zu können. Falls große Datenmengen on-disc sortiert werden müssen, sind on-Prem Jobserver mit schnellen (NVMe) SSDs zu empfehlen. Falls der Jobserver auf einem Hyperscaler deployt wird, sind storage-optimized Instanzen beziehungsweise Instanzen mit hohen IOPS empfehlenswert.
Bei einer Datenbanktabelle als Input kann die Query ein ORDER BY
beinhalten. Diese Auslagerung der Sortierung auf das Quellsystem macht das Sortieren innerhalb von Talend überflüssig. Ob die Sortierung auf dem Quellsystem erlaubt ist, oder bei großen Datenmengen überhaupt durchführbar ist, ist mit dem Quellsystemeigentümer zu klären und kann nicht pauschal beantwortet oder empfohlen werden.
Transformationen, die eine Sortierung voraussetzen
Vorgänger- oder Nachfolgerzeile?
Da die Bezeichnung Vorgänger- und Nachfolgerzeile bei sortierten Datenmengen und zeilenweiser Verarbeitung in Talend schnell zu Verwirrung führt, sollen sie an dieser Stelle definiert werden:
Der Vorgänger-Datensatz ist der Datensatz, der aus Sicht der Verarbeitung von Talend bereits eine Zeile zuvor verarbeitet wurde. Der Nachfolgersatz folgt.
ID | GUELTIG_AB | Talendkontext |
a | Januar 2022 | bereits verarbeitet |
b | Januar 2022 | Vorgängerzeile und bereits verarbeitet |
b | März 2022 | aktuelle Zeile |
c | Januar 2022 | Nachfolgerzeile noch nicht verarbeitet |
ID
und GUELTIG_AB
. Der dritte Datensatz ist der aktuell betrachtete bzw. derjenige der aktuell im Talendjob verarbeitet wird.Erkennen und Filtern von Dubletten
Via tMap: Die Umsetzung in einer tMap-Komponente ist auf die gleiche Art und Weise in Informatica Power Center in einer expression umsetzbar und wurde bereits in einem früheren Beitrag beschrieben.
Via tMemorizeRows: Talend bietet eine explizite Methode, auf Vorgängerzeilen zuzugreifen. Die Komponente tMemorizeRows speichert eine definierte Anzahl Zeilen zwischen. Die Konfiguration von „einer Zeile“ entspricht lediglich der aktuellen Zeile. Bei zwei Zeilen lässt sich auf die Vorgängerzeile zugreifen, usw. Für jede Spalte im Schema wird ein eigenes Java-Array bereitgestellt mit dem Namenskonzept <column_name>_tMemorizeRows_x. Dabei ist x entsprechend dem eindeutigen Namen der Komponente anzupassen. Dieses Array hat nun an erster Stelle (Arrayindex=0) den Wert aus der aktuellen Zeile und an zweiter Stelle (Arrayindex=1) den Wert aus der Vorgängerzeile, usw.
Die Abbildung zeigt ein Beispiel eines Talendjobs, welcher eine Vorgängerzeile mittels tMemorizeRows (memorize 2 rows) speichert und in einer tJavaRow-Komponente evaluiert. id_tMemorizeRows_1[1] ist in diesem Fall der Wert von id aus der Vorgängerzeile (Arrayindex=1).
// add column is_duplicate to output
if ( nb_line_tJavaRow_1 == 0 ){
output_row.is_duplicate = false;} // first row cannot be a duplicate
else {
output_row.is_duplicate = input_row.id.equals( id_tMemorizeRows_1[1] )
&& input_row.gueltig_ab.equals( gueltig_ab_tMemorizeRows_1[1] );}
Die Sortierung ist in diesem Beispiel über die Spalten ID asc
, GUELTIG_AB asc
und AENDERUNG_TS desc
durchgeführt worden, sodass im Falle einer Dublette von ID
und GUELTIG_AB
die Zeile mit dem höchsten bzw. neustem/aktuellem Änderungszeitstempel AENDERUNG_TS
übernommen wird, und die weiteren Zeilen mit älterem Änderungszeitstempel verworfen werden, indem diese markiert werden (is_duplicate = true
) und anschließend über die tMap Komponente gefiltert werden.
Lag, Lead, Row_Number, Rank und kumulative Summe
Die analytischen Funktionen lag()
, lead()
, row_number()
, rank()
, und kumaltive Summen lassen sich sehr ähnlich wie die oben beschriebene Vorgehensweise zur Dublettenerkennung in Talend nachbilden:
- Sortiere die Daten analog den Spalten in dem
OVER
statement. Wenn beispielsweise innerhalb der GruppeID
sortiert nachGUELTIG_AB
auf Vorgängerzeilen zugegriffen werden soll ->over(partition by ID order by GUELTIG_AB)
, so muss die Sortierung nachID
undGUELTIG_AB
erfolgen. - Konfiguriere die tMemorizeRows-Komponente entsprechend wie viele Zeilen zwischengespeichert werden. Bei einer Vorgängerzeile müssen 2 rows konfiguriert werden.
- Setze in einer tJavaRow- oder tMap-Komponente die neue Spalte mit Hilfe des Java-Arrays aus der tMemorizeRows-Komponente.
Die Vorgehensweise ist bei lead
und lag
identisch. Da aber nicht auf Nachfolgerzeilen, sondern nur auf Vorgängerzeilen zugegriffen werden kann, muss die lead Funktionalität durch eine absteigende Sortierung umgesetzt werden. lag
und lead
kann mit dieser Methode nicht zeitgleich durchgeführt werden. Die beiden Schritte müssen sequenziell mit einer neuen Sortierung erfolgen.
Es gibt Strategien, lag
und lead
auch in einem Schritt mit einer Sortierung durchzuführen. Dieses Vorgehen ist jedoch aufwendig und sollte nur bei großen Datenmengen in Betracht gezogen werden. Die Beschreibung der Umsetzung übersteigt die Kapazität dieses Blogbeitrags.
row_number
und rank
- Sortiere nach den
partition by
undorder by
Spalten - Konfiguriere die tMemorizeRows mit 2 Zeilen
- In einer tJavaFlex-Komponente:
- Initialisiere eine int-Variable als Sequenzwert mit 1.
- Vergleich der
partition by
Spalten mit der Vorgängerzeile, um den Gruppenwechsel zu erkennen. Bei einem Gruppenwechsel wird der Sequenzwert auf 1 zurückgesetzt. - Sequenzwert in einer output_row ausgeben.
- Bei rank: Vergleich der
order by
Spalten. Falls keine Änderung, wird der Sequenzwert nicht erhöht.
kumulative Summe
- analog
sum(value) over(partition by id order by col_order ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
- Sortiere nach den partition by und
order by
Spalten. - Konfiguriere die tMemorizeRows mit 2 Zeilen.
- In einer tJavaFlex Komponente:
- Initialisiere eine int oder float Variable mit 0.
- Vergleich der
partition by
Spalten mit der Vorgängerzeile, um den Gruppenwechsel zu erkennen. Bei einem Gruppenwechsel wird die Summe auf 0 zurückgesetzt. - Addiere den aktuellen Wert zu der Summe und gebe den Wert aus.
SQL-analoges AVG, SUM
Transformationen, die nicht nur von Vorgänger- oder Nachfolgerzeilen abhängen, müssen in einem separaten Schritt durchgeführt werden. Beispiele sind analytische Funktionen mit ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
.
Diese Aggregationen müssen in einem separaten Subjob durchgeführt werden und anschließend über einen Join dem (erneut eingelesenen) Hauptdatenfluss angereichert werden.
Sortierung für einen Performancegewinn
Talend bietet zwei auf sortierte Eingangsdaten spezialisierte Komponenten an: tAggregateSortedRow und tDenormalizeSortedRow. Diese beiden Komponenten verarbeiten den Datenstrom „im Stream“, also ohne den gesamten Datensatz zwischenspeichern zu müssen. Es kann also im Sinne der Performance der Datenverarbeitung sinnvoll sein den Datenstrom zuerst zu sortieren und dann mehrfach durch die zwei oben genannten Komponenten und eventuell zusätzlich in Verbindung mit den zuvor beschriebenen Methoden zu Vorgänger-/Nachfolgerbeziehungen zu verarbeiten.
Caveats and missing features
Die beiden SortedRow-Komponenten müssen mit der erwarteten Zeilenanzahl konfiguriert werden. Das hat zur Folge, dass bei der Verwendung dieser Komponenten zuvor die Gesamtanzahl der Zeilen zu bestimmen ist. Entweder ist diese über eine globalMap-Variable (nb_rows) bekannt oder es muss durch eine separate Abfrage ermittelt werden. Die Anzahl ist genau zu bestimmen, da sonst mit Datenverlust zu rechnen ist (!).
Es ist nicht möglich, durch eine Sortierung die Performance von Joins zu verbessern oder durch eine Sortierung das Aufteilen und Joinen eines Datenstroms zu ermöglichen, wie es z.B. mit Informatica IDMC oder IBM DataStage möglich ist.
Fazit
Hat Ihnen dieser Beitrag in unserem saracus Blog gefallen? Dann schreiben Sie uns doch und geben Feedback. Haben Sie einen Wunsch für ein zukünftiges Thema? Denkbar wäre ein talendspezifischer Beitrag zu verschieden Teststrategien.