Ein klas­si­sches Data Ware­house ist dar­auf aus­ge­legt, den Anwen­dern tags­über kon­sis­tente Unter­neh­mens­da­ten für Berichte und Ana­ly­sen zen­tral zur Ver­fü­gung zu stel­len. Um Kon­flikte zu ver­mei­den, wer­den aktu­elle ope­ra­tive Daten in der Regel tags­über gesam­melt und die Inte­gra­tion fin­det nachts statt, wenn keine Anwen­der auf das Data Ware­house zugrei­fen. Die Belas­tung der Com­pu­ter­res­sour­cen ist somit asym­me­trisch: in der Nacht sehr hoch, wenn große Daten­men­gen ange­lie­fert, auf ver­schie­dene Arten trans­for­miert und anschlie­ßend in die Daten­ban­ken gela­den wer­den, tags­über dage­gen abhän­gig von den Abfra­gen und Aus­wer­tun­gen der Anwender.

Aus die­sem Grund kann es sinn­voll sein, das Data Ware­house zum Teil oder Voll­stän­dig in die Cloud zu migrie­ren. Zum einen kön­nen Anbie­ter von Cloud­lö­sun­gen ihre Rechen­ka­pa­zi­tä­ten kos­ten­güns­tig anbie­ten, da in gro­ßen Rechen­clus­tern Ska­len­ef­fekte zum Tra­gen kom­men, zum ande­ren wer­den die Kapa­zi­tä­ten elas­tisch und ska­lier­bar ange­bo­ten und nach Nut­zungs­zeit abge­rech­net. Um die Mög­lich­kei­ten der Cloud effi­zi­ent zu nut­zen, kann im Zuge des­sen auch die klas­si­sche Batch­ver­ar­bei­tung neu orga­ni­siert wer­den. Eine erste Alter­na­tive ist das Stream Pro­ces­sing, bei dem die Ver­ar­bei­tung bei Anlie­fe­rung in Echt­zeit erfolgt, eine zweite das Micro-batch Pro­ces­sing als Hybrid aus bei­den Ver­fah­ren, bei dem kleine Bat­ches mit kur­zer zeit­li­cher Ver­zö­ge­rung ver­ar­bei­tet wer­den. Der Migra­ti­ons­auf­wand ist hier­bei gering, wenn die bestehende Logik kaum ange­passt wer­den muss und even­tu­ell vor­han­dene Trans­for­ma­ti­ons­stre­cken über­nom­men wer­den kön­nen. Ein sol­ches Ver­fah­ren wird hier anhand von Azure Syn­apse Ana­ly­tics dargestellt.

Was ist Syn­apse Analytics?

Inner­halb der Azure-Land­schaft bie­tet Syn­apse Ana­ly­tics eine Platt­form für Daten­in­te­gra­tion und Data Ware­housing. Es bestehen Mög­lich­kei­ten zur direk­ten Anbin­dung von SQL- oder Azure Cos­mos-Daten­ban­ken sowie dem Azure Data Lake Sto­rage, alter­na­tiv kön­nen viel­fäl­tige andere Quel­len – wie zum Bei­spiel MariaDB, Ora­cle, Post­greSQL oder Mon­goDB – über Kon­nek­to­ren ange­bun­den wer­den, ähn­lich zur Azure Data Fac­tory. Auch die Inte­gra­tion ori­en­tiert sich an der Data Fac­tory und fin­det mit­tels ser­ver­lo­ser oder dedi­zier­ter SQL-Pools statt, im Bereich Big Data über ser­ver­lose Spark-Pools.

Der Daten­fluss wird durch Pipe­lines gesteu­ert, inner­halb derer ver­schie­dene Akti­vi­tä­ten logisch zusam­men­ge­fasst sind. Die Pipe­lines kön­nen unab­hän­gig von­ein­an­der arbei­ten, aber auch zu kom­ple­xen Net­zen ver­knüpft und orches­triert wer­den. Dar­über hin­aus ermög­li­chen Pipe­lines einen struk­tu­rier­ten Deploymentprozess.

Durch die Ver­net­zung inner­halb der Azure-Land­schaft ist es ein Leich­tes, andere Ser­vices zu nut­zen und etwa Azure Active Direc­tory für Iden­ti­täts­ma­nage­ment und Authen­ti­fi­zie­rung zu nut­zen oder die Daten aus dem Ware­house in PowerBI auf­zu­be­rei­ten. In die­sem Bei­spiel wird das Azure Event­Grid genutzt, um die Inte­gra­tion zu automatisieren.

Micro-batch Pro­ces­sing und ELT in der Cloud

Der im Fol­gen­den beschrie­bene Ver­ar­bei­tungs­pro­zess unter­schei­det sich in zwei Arten vom oben skiz­zier­ten. Ers­tens wer­den in der klas­si­schen Batch­ver­ar­bei­tung die Daten gesam­melt und gemein­sam ver­ar­bei­tet, häu­fig über Nacht oder in noch grö­ße­ren Abstän­den und ist daher mit hoher Latenz ver­bun­den. Wenn Daten dage­gen schnell zur Ver­fü­gung ste­hen sol­len, ist ein Stream Pro­ces­sing erfor­der­lich. Oft kommt es aber nicht auf jede Mil­li­se­kunde an, son­dern es reicht, die Daten in kur­zen Inter­val­len oder bei Über­schrei­ten eines Schwel­len­wer­tes in einem Micro-batch zu ver­ar­bei­ten. Somit müs­sen die Rechen­res­sour­cen nicht per­ma­nent bereit­ste­hen. Gleich­zei­tig wird ein even­tu­el­ler Over­head zum Bereit­stel­len gering gehal­ten, da das Sys­tem nicht für jeden Ein­zel­satz anlau­fen muss. Wie groß ein Micro-batch gewählt wird, hängt von den Anfor­de­run­gen und der Infrastruktur ab.

Die Ver­ar­bei­tung geschieht klas­si­scher­weise in einem ETL-Pro­zess, in dem die Daten aus einem Quell­sys­tem zunächst extra­hiert, dann trans­for­miert und auf­be­rei­tet, schließ­lich in die Ziel­da­ten­bank gela­den wer­den. Dage­gen wird beim ELT-Pro­zess die Rei­hen­folge geän­dert und die Daten zunächst in die Ziel­da­ten­bank gela­den und dann trans­for­miert. Dadurch ste­hen die Roh­da­ten schnel­ler zur Ver­fü­gung und kön­nen, wenn es für Ana­ly­sen von Vor­teil ist, sepa­rat von den trans­for­mier­ten Daten gespei­chert und bereit­ge­stellt wer­den. Ein wei­te­rer Vor­teil der Sepa­ra­tion von Laden und Trans­for­ma­tion ist, dass beide Pro­zesse par­al­lel gesche­hen kön­nen und die Trans­for­ma­tion in der Cloud leicht den Anfor­de­run­gen ent­spre­chend ska­liert wer­den kann. Im klas­si­schen Betrieb dage­gen ist es bei wach­sen­den Daten­men­gen not­wen­dig, wenn zum Bei­spiel die Ver­ar­bei­tung nicht mehr im zur Ver­fü­gung ste­hen­den Zeit­fens­ter mög­lich ist, die Rechen­leis­tung auf­wen­dig zu erwei­tern. Die Leis­tungs­fä­hig­keit muss sich in der Cloud also nicht mehr daran ori­en­tie­ren, was als Maxi­mum benö­tigt wird.

Ein­rich­ten von Synapse

Ein sol­cher Pro­zess soll nun in Syn­apse Ana­ly­tics abge­bil­det wer­den. Zunächst wird ein Syn­apse-Arbeits­be­reich erzeugt, der einem Abon­ne­ment und einer Res­sour­cen­gruppe zuge­ord­net ist. Zudem wird ein Data-Lake-Sto­rage ange­legt, des­sen Name Azure-weit ein­deu­tig sein muss, sowie ein Datei­sys­tem für die Daten des Arbeitsbereiches.

Bild 1 Automatisiertes Micro-batch Processing
Abbil­dung 1 Erstel­lung eines Syn­apse Arbeitsbereichs

Die Bereit­stel­lung erfolgt auto­ma­ti­siert, aller­dings sollte im Anschluss über­prüft wer­den, ob im Abon­ne­ment alle erfor­der­li­chen Res­sour­cen­an­bie­ter regis­triert wor­den sind. Für das auto­ma­ti­sierte Star­ten von Pipe­lines wird das Event­Grid ver­wen­det und die Pipe­line basiert auf der Data­Fac­tory. Wenn einer der Dienste nicht akti­viert ist, kommt es im Fol­gen­den zu Fehlermeldungen.

Ein­rich­ten der Pipelines

Der Arbeits­be­reich steht anschlie­ßend als Web­ober­flä­che zur Ver­fü­gung, der Link steht in der Über­sicht der Syn­apse-Res­source. Die Ober­flä­che ist in ver­schie­dene Berei­che ein­ge­teilt: einer für die Anbin­dung von Daten­ban­ken, einer zum Ent­wi­ckeln von SQL-Skrip­ten, Spark-Note­books oder Daten­flüs­sen, einer für die Inte­gra­tion durch Pipe­lines. Dar­über hin­aus gibt es einen Moni­tor-Bereich zur Über­sicht über die Aus­füh­rung von Pipe­lines oder Trig­gern sowie Akti­vi­tä­ten der Spark- oder SQL-Pools.

In die­sem Bei­spiel soll die Ver­ar­bei­tung durch Spark-Note­books vor­ge­nom­men wer­den, also muss im Bereich “Ent­wi­ckeln” ein neues Note­book ange­legt wer­den. Im ers­ten Schritt wird ein von Azure bereit­ge­stell­ter Test­da­ten­satz von Taxi­fah­ren in New York ver­wen­det, der im Par­quet-For­mat vor­liegt. Die­ses For­mat ist spal­ten­ori­en­tiert und ermög­licht eine effi­zi­ente Ver­ar­bei­tung und Spei­che­rung von Daten. Als Spra­chen sind im Note­book PySpark, Spark, .NET Spark und Spark SQL mög­lich, für das Bei­spiel wird PySpark verwendet.

Ers­tes Note­book: Bereit­stel­len der Testdaten

Der erste Schritt stellt Test­da­ten bereit und sieht wie folgt aus:

start_date = parser.parse(‘2009-02-10’)
end_date = parser.parse(‘2009-02-20’)
df = NycTlcYellow(start_date=start_date, end_date=end_date).to_spark_dataframe()

Hier­durch wird ein Spark Data­frame erzeugt, aus dem nun ein­zelne Tage aus­ge­wählt und als sepa­rate Dateien gespei­chert wer­den. Das Zeit­fens­ter ist schmal gewählt, um nur wenige Test­sätze für jeden Tag zu erhal­ten. In der Pra­xis wür­den die Daten aus einem ope­ra­ti­ven Sys­tem dyna­misch ange­lie­fert wer­den, was für die­ses Bei­spiel aller­dings uner­heb­lich ist, hier wer­den ledig­lich drei Sätze über eine Schleife erzeugt:

for i in range(3):
    timestamp = datetime.datetime.now().strftime(“%Y-%m-%d_%H-%M-%S”)
    date_from = datetime.datetime(2009, 2, 11, 10, 0, 0) + timedelta(days = i)
    date_to = datetime.datetime(2009, 2, 11, 12, 0, 0) + timedelta(days = i)
    data_path = “abfss://%s@%s.dfs.core.windows.net/%s” % 
                (data_container_name, account_name, data_relative_path + 
                timestamp + “.data”)
    df_filtered = df.filter(date_from <= df[“tpepPickupDateTime”])
                  .filter(df[“tpepPickupDateTime”] <= date_to)
    df_filtered.write.parquet(data_path, mode = “overwrite”)

Der Pfad inner­halb des Data Lakes ist para­me­tri­siert über container_name, account_name und data_relative_path. Zudem wird ein Zeit­stem­pel ange­hängt, um die Daten­sätze zu unter­schei­den. Über Fil­ter wird das pas­sende Zeit­in­ter­vall ausgewählt.

Zusätz­lich wer­den Meta­da­ten abge­legt, die den Zeit­punkt der Lie­fe­rung, den letz­ten Zugriff, die Anzahl der ent­hal­te­nen Daten­sätze sowie ein Kenn­zei­chen, ob die Daten bereits ein­ge­le­sen wur­den, ent­hal­ten. Dies geschieht in einem sepa­ra­ten Con­tai­ner, der Pfad ist eben­falls parametrisiert.

Zwei­tes Note­book: Laden der Testdaten

Der zweite Teil besteht dar­aus, die zuvor bereit­ge­stell­ten Daten ent­ge­gen­zu­neh­men und gesam­melt zu laden. Die­ses Note­book wird auto­ma­ti­siert gestar­tet, wenn im Data Lake eine neue Datei ange­legt wird. Als ers­tes wer­den die Meta­da­ten aus­ge­le­sen und die Dateien aus­ge­wählt, die neu gelie­fert und noch nicht ver­ar­bei­tet wurden.

Über einen Schwel­len­wert wird die Größe des Micro-bat­ches gesteu­ert. Inder Pra­xis wäre das Ziel der Ver­ar­bei­tung das Data Ware­house, hier wer­den die Daten­sätze gesam­melt in einem Data­frame abge­legt. Wenn der Schwel­len­wert über­schrit­ten ist, wird zunächst ein lee­res Data­frame anhand eines zuvor gespei­cher­ten Sche­mas angelegt.

if df_meta.agg({“datasets” : “sum”}).first()[0] > dataset_threshold:
    schema_path = “abfss://%s@%s.dfs.core.windows.net/%s” % 
                  (meta_container_name, account_name, “nyctlc1.json”)
    schema_txt = json.loads(str(schema_rdd.collect()[0][1]))
    schema = StructType.fromJson(schema_txt)
    df_total = spark.createDataFrame([], schema)

Anschlie­ßend wer­den alle noch nicht gele­se­nen Par­quet-Dateien durch­lau­fen und an das leere Data­frame ange­fügt. In den Meta­da­ten wird dabei der Daten­satz auf „read“ gesetzt und der Zeit­stem­pel des Zugriffs aktualisiert.

for timestamp in df_meta.select(df_metadata[“created”]).collect()[0]:
    timestamp = timestamp[0]
    data_path = “abfss://%s@%s.dfs.core.windows.net/%s” % (data_container_name, 
                account_name, data_relative_path + timestamp + “.data”)
    df_new = spark.read.parquet(data_path)
    df_total = df_total.union(df_new)
    timestamp_new = datetime.datetime.now().strftime(“%Y-%m-%d_%H-%M-%S”)
    df_meta_update = df_meta.filter(df_meta[“created”] == timestamp)
                     .withColumn(“read”, lit(True)).withColumn(“accessed”,
                     lit(timestamp_new))
    df_meta = df_meta.filter(df_meta[“created”] != timestamp)
              .union(df_meta_update)
    out_path = “abfss://%s@%s.dfs.core.windows.net/%s” % (out_container_name, 
               account_name, out_relative_path)
    df_total.write.parquet(out_path, mode = “append”)

Auf­bau der Pipelines

Die bei­den Note­books wer­den über eine Akti­vi­tät jeweils zunächst in eine Pipe­line ein­ge­bun­den und para­me­tri­siert. Anschlie­ßend kön­nen die Varia­blen als dyna­mi­scher Inhalt, also als einer der Pipe­line zuge­ord­ne­ten Varia­ble, ver­wen­det werden.

Bild 2 Automatisiertes Micro-batch Processing
Abbil­dung 2 Variablendefinition

Für die zweite Pipe­line, die die Test­da­ten lädt, wird ein Trig­ger gesetzt, der die Aus­füh­rung auto­ma­tisch star­tet, sobald ein Blob im Data Lake erstellt wird. In der Kon­fi­gu­ra­tion kann über den Blob­pfad selek­tiert wer­den, wann ein Ereig­nis aus­ge­löst wird:

Bild 3 Automatisiertes Micro-batch Processing
Abbil­dung 3 Trig­ger der zwei­ten Pipeline

Vom Spei­cher­konto wird eine Benach­rich­ti­gung an das Event­Grid gesen­det, wenn eine Datei ange­legt wird. Das Event­Grid ist mit Azure ver­knüpft, so dass der Trig­ger durch die­ses Ereig­nis aus­ge­löst wird.

Ergeb­nis

Mit den gewähl­ten Para­me­tern wer­den drei Par­quet-Dateien mit jeweils knapp über 40.000 Daten­sät­zen geschrie­ben, wie der Aus­zug der ange­leg­ten Meta­da­ten zeigt:

+-------------------+-------------------+--------+-----+-------+
|           accessed|            created|datasets| read|version|
+-------------------+-------------------+--------+-----+-------+
|2021-04-30_06-35-58|2021-04-30_06-35-58|   43944|false|nyctcl1|
|2021-04-30_06-37-06|2021-04-30_06-37-06|   45148|false|nyctcl1|
|2021-04-30_06-35-08|2021-04-30_06-35-08|   41132|false|nyctcl1|
+-------------------+-------------------+--------+-----+-------+

Dabei wird bei jeder Datei ein Event erzeugt, das die Pipe­line mit dem zwei­ten Note­book star­tet. Da jedoch der Schwel­len­wert auf 100.000 gesetzt ist, fin­det eine Ver­ar­bei­tung erst mit der drit­ten Datei statt.

Bild 5 Automatisiertes Micro-batch Processing
Abbil­dung 4 Durch­lau­fene Tasks

Im Anschluss sind die gelie­fer­ten Dateien in eine zusam­men­ge­fasst und wür­den in der Pra­xis wei­ter ver­ar­bei­tet wer­den, außer­dem sind in den Meta­da­ten der Zeit­punkt des Zugrif­fes sowie der Sta­tus „read“ aktualisiert.

+-------------------+-------------------+--------+----+-------+
|           accessed|            created|datasets|read|version|
+-------------------+-------------------+--------+----+-------+
|2021-04-30_06-35-58|2021-04-30_06-35-58|   43944|true|nyctcl1|
|2021-04-30_06-37-06|2021-04-30_06-37-06|   45148|true|nyctcl1|
|2021-04-30_06-35-08|2021-04-30_06-35-08|   41132|true|nyctcl1|
+-------------------+-------------------+--------+----+-------+