Die­ser Blog­bei­trag stellt die Ver­wen­dung und die Not­wen­dig­keit von sor­tier­ten Daten in Tal­end Data Inte­gra­tion (Tal­end DI) Jobs dar. Es gibt drei grund­le­gende Anwen­dungs­fälle. Zum einen set­zen bestimmte Trans­for­ma­tio­nen eine spe­zi­fi­sche Sor­tie­rung vor­aus, wie zum Bei­spiel das Anrei­chern von Infor­ma­tio­nen aus Vor­gän­ger- oder Nach­fol­ger­zei­len ana­log der SQL-typi­schen ana­ly­ti­schen Funk­tio­nen bzw. Fens­ter­funk­tio­nen lag(…) oder lead(…) over( partition by … order by …). Tal­end bie­tet keine dedi­zierte Kom­po­nente für diese Art Trans­for­ma­tion an und muss mit einer Kom­bi­na­tion aus sor­tier­ten Daten und tJa­va­Flex oder tMap umge­setzt werden.

Zum ande­ren kann die Ver­ar­bei­tung von gro­ßen Daten­men­gen eine Sor­tie­rung von Daten erzwin­gen. Wenn bei­spiels­weise ein gro­ßer Daten­satz mehr­fach agg­re­giert und/oder denor­ma­li­siert wer­den soll, so kann eine initiale (auf­wen­dige) Sor­tie­rung mehr­fach von den spe­zi­el­len Sor­te­d­Row-Kom­po­nen­ten ver­wen­det werden.

Der dritte Fall ist eine gän­gige Anfor­de­rung der Ziel­da­ten­bank: Das Laden von sor­tier­ten Daten kann in einer Daten­bank vor­teil­haft sein, wenn die Tabelle die glei­che Sor­tie­rung wie der Pri­mary Key besitzt. Auch in aktu­el­len Cloud-nati­ven Daten­ba­ken wie z.B. Snow­flake ist ein sor­tier­tes Bela­den vor­teil­haft bezüg­lich pru­ning der micro-par­ti­ti­ons bzw. clus­te­ring. Wenn eine Sor­tie­rung im Tal­end­job für solch einen Use Case schon gefor­dert ist, kön­nen wir diese auch zu unse­rem Vor­teil im Sinne der ers­ten bei­den Anwen­dungs­fälle nut­zen. Diese sol­len im Fol­gen­den näher betrach­tet werden.

Sor­tierte Daten erstellen

Zunächst müs­sen die Daten sor­tiert wer­den. Unab­hän­gig von der Quelle bzw. der Art der Input­kom­po­nente lässt sich ein Daten­strom inner­halb des Tal­end Jobs mit der tSor­tRow Kom­po­nente sor­tie­ren. Es las­sen sich belie­big viele Spal­ten aus dem Input­schema auf- oder abstei­gend sor­tie­ren. Ent­schei­dend ist, ob die Sor­tie­rung rein nume­risch oder alpha­nu­me­risch statt­fin­den soll. Bei nume­ri­scher Sor­tie­rung darf die Spalte aus­schließ­lich nume­ri­sche Inhalte beinhal­ten. Bei grö­ße­ren Daten­men­gen ist die Option „sort on disc“ unter den advan­ced set­tings hilf­reich. Ist diese Option aktiv, so wird die Daten­menge nicht kom­plett im Java Heap­space gehal­ten, son­dern phy­sisch „on disc“ zwi­schen­ge­spei­chert. Die Lauf­zeit erhöht sich mit die­ser Option zwangs­läu­fig und zum Teil dras­tisch, kann aber bei begrenz­tem Arbeits­spei­cher / Heap­size die ein­zige Mög­lich­keit sein den Job über­haupt aus­füh­ren zu kön­nen. Falls große Daten­men­gen on-disc sor­tiert wer­den müs­sen, sind on-Prem Job­ser­ver mit schnel­len (NVMe) SSDs zu emp­feh­len. Falls der Job­ser­ver auf einem Hypers­ca­ler deployt wird, sind sto­rage-opti­mi­zed Instan­zen bezie­hungs­weise Instan­zen mit hohen IOPS empfehlenswert.

Bei einer Daten­bank­ta­belle als Input kann die Query ein ORDER BY beinhal­ten. Diese Aus­la­ge­rung der Sor­tie­rung auf das Quell­sys­tem macht das Sor­tie­ren inner­halb von Tal­end über­flüs­sig. Ob die Sor­tie­rung auf dem Quell­sys­tem erlaubt ist, oder bei gro­ßen Daten­men­gen über­haupt durch­führ­bar ist, ist mit dem Quell­sys­tem­ei­gen­tü­mer zu klä­ren und kann nicht pau­schal beant­wor­tet oder emp­foh­len werden.

Trans­for­ma­tio­nen, die eine Sor­tie­rung voraussetzen

Vor­gän­ger- oder Nachfolgerzeile?

Da die Bezeich­nung Vor­gän­ger- und Nach­fol­ger­zeile bei sor­tier­ten Daten­men­gen und zei­len­wei­ser Ver­ar­bei­tung in Tal­end schnell zu Ver­wir­rung führt, sol­len sie an die­ser Stelle defi­niert werden:

Der Vor­gän­ger-Daten­satz ist der Daten­satz, der aus Sicht der Ver­ar­bei­tung von Tal­end bereits eine Zeile zuvor ver­ar­bei­tet wurde. Der Nach­fol­ger­satz folgt.

IDGUELTIG_ABTal­end­kon­text
aJanuar 2022bereits ver­ar­bei­tet
bJanuar 2022Vor­gän­ger­zeile und bereits verarbeitet
bMärz 2022aktu­elle Zeile
cJanuar 2022Nach­fol­ger­zeile noch nicht verarbeitet
Daten­satz sor­tiert nach ID und GUELTIG_AB. Der dritte Daten­satz ist der aktu­ell betrach­tete bzw. der­je­nige der aktu­ell im Tal­end­job ver­ar­bei­tet wird.

Erken­nen und Fil­tern von Dubletten

Via tMap: Die Umset­zung in einer tMap-Kom­po­nente ist auf die glei­che Art und Weise in Infor­ma­tica Power Cen­ter in einer expres­sion umsetz­bar und wurde bereits in einem frü­he­ren Bei­trag beschrieben.

Via tMe­mo­ri­ze­Rows: Tal­end bie­tet eine expli­zite Methode, auf Vor­gän­ger­zei­len zuzu­grei­fen. Die Kom­po­nente tMe­mo­ri­ze­Rows spei­chert eine defi­nierte Anzahl Zei­len zwi­schen. Die Kon­fi­gu­ra­tion von „einer Zeile“ ent­spricht ledig­lich der aktu­el­len Zeile. Bei zwei Zei­len lässt sich auf die Vor­gän­ger­zeile zugrei­fen, usw. Für jede Spalte im Schema wird ein eige­nes Java-Array bereit­ge­stellt mit dem Namens­kon­zept <column_name>_tMemorizeRows_x. Dabei ist x ent­spre­chend dem ein­deu­ti­gen Namen der Kom­po­nente anzu­pas­sen. Die­ses Array hat nun an ers­ter Stelle (Arrayindex=0) den Wert aus der aktu­el­len Zeile und an zwei­ter Stelle (Arrayindex=1) den Wert aus der Vor­gän­ger­zeile, usw.

flow of data in Talend Data Integration

Die Abbil­dung zeigt ein Bei­spiel eines Tal­end­jobs, wel­cher eine Vor­gän­ger­zeile mit­tels tMe­mo­ri­ze­Rows (memo­rize 2 rows) spei­chert und in einer tJa­va­Row-Kom­po­nente eva­lu­iert. id_tMemorizeRows_1[1] ist in die­sem Fall der Wert von id aus der Vor­gän­ger­zeile (Arrayindex=1).

// add column is_duplicate to output
if ( nb_line_tJavaRow_1 == 0 ){
	output_row.is_duplicate = false;} // first row cannot be a duplicate
else {
	output_row.is_duplicate = input_row.id.equals( id_tMemorizeRows_1[1] ) 
	    && input_row.gueltig_ab.equals( gueltig_ab_tMemorizeRows_1[1] );}

Die Sor­tie­rung ist in die­sem Bei­spiel über die Spal­ten ID asc, GUELTIG_AB asc und AENDERUNG_TS desc durch­ge­führt wor­den, sodass im Falle einer Dublette von ID und GUELTIG_AB die Zeile mit dem höchs­ten bzw. neustem/aktuellem Ände­rungs­zeit­stem­pel AENDERUNG_TS über­nom­men wird, und die wei­te­ren Zei­len mit älte­rem Ände­rungs­zeit­stem­pel ver­wor­fen wer­den, indem diese mar­kiert wer­den (is_duplicate = true) und anschlie­ßend über die tMap Kom­po­nente gefil­tert werden.

Lag, Lead, Row_Number, Rank und kumu­la­tive Summe

Die ana­ly­ti­schen Funk­tio­nen lag(), lead(), row_number(), rank(), und kumal­tive Sum­men las­sen sich sehr ähn­lich wie die oben beschrie­bene Vor­ge­hens­weise zur Dublet­ten­er­ken­nung in Tal­end nachbilden:

  • Sor­tiere die Daten ana­log den Spal­ten in dem OVER state­ment. Wenn bei­spiels­weise inner­halb der Gruppe ID sor­tiert nach GUELTIG_AB auf Vor­gän­ger­zei­len zuge­grif­fen wer­den soll -> over(partition by ID order by GUELTIG_AB), so muss die Sor­tie­rung nach ID und GUELTIG_AB erfol­gen.
  • Kon­fi­gu­riere die tMe­mo­ri­ze­Rows-Kom­po­nente ent­spre­chend wie viele Zei­len zwi­schen­ge­spei­chert wer­den. Bei einer Vor­gän­ger­zeile müs­sen 2 rows kon­fi­gu­riert werden.
  • Setze in einer tJa­va­Row- oder tMap-Kom­po­nente die neue Spalte mit Hilfe des Java-Arrays aus der tMemorizeRows-Komponente.

Die Vor­ge­hens­weise ist bei lead und lag iden­tisch. Da aber nicht auf Nach­fol­ger­zei­len, son­dern nur auf Vor­gän­ger­zei­len zuge­grif­fen wer­den kann, muss die lead Funk­tio­na­li­tät durch eine abstei­gende Sor­tie­rung umge­setzt wer­den. lag und lead kann mit die­ser Methode nicht zeit­gleich durch­ge­führt wer­den. Die bei­den Schritte müs­sen sequen­zi­ell mit einer neuen Sor­tie­rung erfolgen.

Es gibt Stra­te­gien, lag und lead auch in einem Schritt mit einer Sor­tie­rung durch­zu­füh­ren. Die­ses Vor­ge­hen ist jedoch auf­wen­dig und sollte nur bei gro­ßen Daten­men­gen in Betracht gezo­gen wer­den. Die Beschrei­bung der Umset­zung über­steigt die Kapa­zi­tät die­ses Blogbeitrags.

row_number und rank
  • Sor­tiere nach den partition by und order by Spalten
  • Kon­fi­gu­riere die tMe­mo­ri­ze­Rows mit 2 Zeilen
  • In einer tJavaFlex-Komponente: 
    • Initia­li­siere eine int-Varia­ble als Sequenz­wert mit 1.
    • Ver­gleich der partition by Spal­ten mit der Vor­gän­ger­zeile, um den Grup­pen­wech­sel zu erken­nen. Bei einem Grup­pen­wech­sel wird der Sequenz­wert auf 1 zurückgesetzt.
    • Sequenz­wert in einer output_row ausgeben.
    • Bei rank: Ver­gleich der order by Spal­ten. Falls keine Ände­rung, wird der Sequenz­wert nicht erhöht.
kumu­la­tive Summe
  • ana­log sum(value) over(partition by id order by col_order ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
  • Sor­tiere nach den par­ti­tion by und order by Spalten.
  • Kon­fi­gu­riere die tMe­mo­ri­ze­Rows mit 2 Zeilen.
  • In einer tJa­va­Flex Komponente: 
    • Initia­li­siere eine int oder float Varia­ble mit 0.
    • Ver­gleich der partition by Spal­ten mit der Vor­gän­ger­zeile, um den Grup­pen­wech­sel zu erken­nen. Bei einem Grup­pen­wech­sel wird die Summe auf 0 zurückgesetzt.
    • Addiere den aktu­el­len Wert zu der Summe und gebe den Wert aus.

SQL-ana­lo­ges AVG, SUM

Trans­for­ma­tio­nen, die nicht nur von Vor­gän­ger- oder Nach­fol­ger­zei­len abhän­gen, müs­sen in einem sepa­ra­ten Schritt durch­ge­führt wer­den. Bei­spiele sind ana­ly­ti­sche Funk­tio­nen mit ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.

Diese Aggre­ga­tio­nen müs­sen in einem sepa­ra­ten Sub­job durch­ge­führt wer­den und anschlie­ßend über einen Join dem (erneut ein­ge­le­se­nen) Haupt­da­ten­fluss ange­rei­chert werden.

Sor­tie­rung für einen Performancegewinn

Tal­end bie­tet zwei auf sor­tierte Ein­gangs­da­ten spe­zia­li­sierte Kom­po­nen­ten an: tAg­gre­ga­te­Sor­te­d­Row und tDe­nor­ma­li­ze­Sor­te­d­Row. Diese bei­den Kom­po­nen­ten ver­ar­bei­ten den Daten­strom „im Stream“, also ohne den gesam­ten Daten­satz zwi­schen­spei­chern zu müs­sen. Es kann also im Sinne der Per­for­mance der Daten­ver­ar­bei­tung sinn­voll sein den Daten­strom zuerst zu sor­tie­ren und dann mehr­fach durch die zwei oben genann­ten Kom­po­nen­ten und even­tu­ell zusätz­lich in Ver­bin­dung mit den zuvor beschrie­be­nen Metho­den zu Vor­gän­ger-/Nach­fol­ger­be­zie­hun­gen zu verarbeiten.

Caveats and miss­ing features

Die bei­den Sor­te­d­Row-Kom­po­nen­ten müs­sen mit der erwar­te­ten Zei­len­an­zahl kon­fi­gu­riert wer­den. Das hat zur Folge, dass bei der Ver­wen­dung die­ser Kom­po­nen­ten zuvor die Gesamt­an­zahl der Zei­len zu bestim­men ist. Ent­we­der ist diese über eine glo­bal­Map-Varia­ble (nb_rows) bekannt oder es muss durch eine sepa­rate Abfrage ermit­telt wer­den. Die Anzahl ist genau zu bestim­men, da sonst mit Daten­ver­lust zu rech­nen ist (!).

Es ist nicht mög­lich, durch eine Sor­tie­rung die Per­for­mance von Joins zu ver­bes­sern oder durch eine Sor­tie­rung das Auf­tei­len und Joinen eines Daten­stroms zu ermög­li­chen, wie es z.B. mit Infor­ma­tica IDMC oder IBM Data­S­tage mög­lich ist.

Fazit

Hat Ihnen die­ser Bei­trag in unse­rem saracus Blog gefal­len? Dann schrei­ben Sie uns doch und geben Feed­back. Haben Sie einen Wunsch für ein zukünf­ti­ges Thema? Denk­bar wäre ein tal­end­spe­zi­fi­scher Bei­trag zu ver­schie­den Teststrategien.