Einleitung
In unserem ersten Artikel haben wir das Fundament gelegt, um die Daten-Warehouse-Landschaft im Zeitalter der Cloud-Migration zu revolutionieren. Wir haben die grundlegenden Konzepte eines generischen, metadatengetriebenen Data Warehouse Frameworks erkundet, das speziell auf die Anforderungen der Cloud-Umgebung zugeschnitten ist. Jetzt ist es an der Zeit, dieses Konzept in die Tat umzusetzen.
Im zweiten Teil unserer Blog-Serie werden wir tiefer eintauchen und uns auf die Implementierung konzentrieren. Die Staging- und Transformations-Schichten stehen dabei im Mittelpunkt. Wir werden Ihnen einen praxisnahen Einblick in die Umsetzung geben, wie Daten in die Staging-Schicht gelangen, wie sie dort versioniert und historisiert werden, und wie die Transformationsschicht dazu beiträgt, Daten in wertvolle Erkenntnisse zu verwandeln.
Dieser Artikel wird Ihnen zeigen, wie unser Data Warehouse Framework als Katalysator fungiert und den Weg für eine nahtlose und effiziente Datenverarbeitung ebnet. Wir sind bereit, in die praktische Welt der Cloud-Migration und Datenoptimierung einzutauchen. Willkommen im zweiten Teil unserer aufregenden Reise!
Generelles
Wie bereits im ersten Teil unserer Blog-Serie erläutert, bildet Databricks die zentrale Komponente unseres Data Warehouse Frameworks für die Datenverarbeitung. Neben der Standardimplementierung von Spark in Scala werden auch die Python- und R‑Bindings (pySpark und sparkR) unterstützt. Aufgrund seiner weit verbreiteten Popularität, seiner herausragenden Nutzerfreundlichkeit und seiner Fülle an Funktionen fiel die Wahl der Programmiersprache auf Python. Beachten Sie, dass die in diesem Artikel vorkommenden Code-Beispiele oft problemlos in eine der anderen unterstützten Sprachen übersetzt werden können.
Die Staging Schicht – Von der Datenerfassung zur Historisierung
In unserem Data Warehouse Framework besteht der Eingangsprozess aus zwei direkt aufeinanderfolgenden Schritten: dem Laden der Daten in die Temporary Staging Area (TSA) und die anschließende Historisierung in der Global Staging Area (GSA).
Validierung und Einlesen in die Temporary Staging Area
Die TSA dient dazu Daten aus den unterschiedlichsten Quellen in das Data Warehouse zu laden und gleichzeitig eine erste, technische Schema-Validierung durchzuführen. Die Beladung ist modular aufgebaut und kann flexibel um neue Quellen erweitert und durch Metadaten gesteuert werden. Beispielsweise kann für Dateilieferungen festgelegt werden, ob eine zusätzliche .md5 oder .prot Datei geliefert und abgeglichen werden muss, oder welche Trennzeichen in einer CSV-Datei vorhanden sind.
Das nachfolgende Code-Beispiel illustriert, wie mühelos das Einlesen von CSV-Daten in Databricks unter Nutzung von nativen Spark-Operationen und spezifischen Databricks-Validierungen durchgeführt werden kann. Die als fehlerhaft erkannten Daten können mit einer anschließenden Fehlerbehandlung weiter verarbeitet werden, wohingegen die anderen Daten weiteren Validierungen unterzogen werden, bevor sie in schließlich in die TSA geschrieben werden.
def read_csv_file(source_file, csv_options, tsa_schema):
"""
Read a csv file into a DataFrame.
Load data from tmpDir csv file and return it as DataFrame.
Rows that cannot be parsed due to schema mismatch or
time / date parsing errors are returned in a separate DataFrame.
Parameters
----------
source_file : str
Full path to file to be read into TSA.
csv_options : dict
Dictionary defining spark.read.options like seperator,
encoding and quote characters.
tsa_schema : pyspark.sql.types.StructType
Schema used to read the csv file with.
Returns
-------
df_valid : DataFrame
Spark DataFrame containing the parsed records.
This includes records with missing columns.
These are set to ``null`` and have an entry in ``_corrupt_record``.
df_invalid : DataFrame
Spark DataFrame containing records that did not parse correctly.
This does not include records with missing columns, as they
are included in dfValid.
"""
# Options for improved error handling
csv_options ["mode"] = "PERMISSIVE"
csv_options ["rescuedDataColumn"] = "_rescued_data"
read_schema = tsa_schema.add("_corrupt_record", T.StringType(), True)
df_inbound = (
spark.read.format("csv")
.options(**csv_options)
.schema(read_schema)
.load(source_file)
)
df_valid = df_inbound .where(
"(_rescued_data IS NULL or _rescued_data = '{}') and _corrupt_record is NULL"
).drop("_rescued_data", "_corrupt_record")
# invalid records are split from valid ones
df_invalid = (
df_inbound.where(
"(_rescued_data IS NOT NULL and _rescued_data <> '{}') or _corrupt_record is NOT NULL"
)
)
return df_valid, df_invalid
Weg des Wissens: Historisierung in der Global Staging Area
Nachdem die Eingangsdaten in die TSA gelangt sind müssen diese in der GSA persistiert und historisiert werden. Dazu sind für jedes Datenobjekt fachliche Schlüssel sowie eine Historisierungsmethode definiert. Standardmäßig wird eine Delta-Beladung verwendet, die Inserts, Updates und Copies bestimmt und entsprechend behandelt.
Technische Spalten – Das Gerüst der Historisierung
Um eine zuverlässige historisierte Datenhaltung zu gewährleisten, müssen die fachlichen Daten aus der Quelle um technische Spalten erweitert werden. Dies ermöglicht die Rekonstruktion des Datenstands zu jedem beliebigen Zeitpunkt und gewährleistet die eindeutige Identifizierbarkeit jeder Zeile. Es gibt verschiedene Herangehensweisen um dieses Problem zu lösen und in userem Data Warehouse verwenden wir eine bitemporale Datenhaltung mit zusätzlichen Active- und Deleted-Flags. Ein wichtiger Grundsatz dabei ist, dass eine Zeile im Data Warehouse nur ein einziges mal geupdated werden darf, nämlich von aktiv auf inaktiv. Insgesamt werden folgende technischen Spalten in der GSA verwendet:
Spalte | Datentyp | Beispielwerte | Erklärung |
ID_JOBRUN_INS | Bigint | 1,2,3 | Ladelauf, wann die Zeile eingefügt wurde |
ID_JOBRUN_UPD | Bigint | Null,2,3 | Ladelauf, wann die Zeile geupdatet wurde |
DWH_ACTIVEFLAG | Bool | 0,1 | Ob die Zeile aktuell aktiv ist. |
DWH_DELETEDFLAG | Bool | 0,1 | Ob die Zeile als gelöscht markiert wurde. |
DWH_VALIDFROM | Timestamp | 2023–10-01T12:00:00 | Zeitstempel, wann die Zeile eingefügt wurde |
DWH_VALIDTO | Timestamp | Null, 2023–10-02T14:00:00 | Zeitstempel, wann die Zeile geupdated wurde |
DWH_ROWID | Bigint | 1,2,3 | Fortlaufende Nummer, die die Zeile eindeutig identifiziert |
Bestimmung und Verarbeitung von Inserts und Updates – Die Essenz der Historisierung
Im Sinne der bitemporalen Datenhaltung wie sie in unserem Data Warehouse Framework angewandt wird muss zwischen logischen und technischen (physischen) Inserts und Updates unterschieden werden.
Ein logisches Insert entspricht einer neuen Datenzeile mit einem Schlüssel, der derzeit nicht in der GSA aktiv ist, und führt direkt zu einem technischen Insert. Logische Updates hingegen tritt auf, wenn eine Zeile denselben Schlüsselwert wie eine in der GSA vorhandene Zeile aufweist und sich mindestens in einem anderen Feld unterscheidet. Ein logisches Update führt sowohl zu einem technischen Update (Inaktivierung der bereits bestehenden Zeile) als auch zu einem technischen Insert (Einfügen des neuen Wertes).
Um diesen Prozess zu initiieren, müssen zuerst die logischen Inserts und Updates identifiziert werden. Hierbei werden die neuen Daten aus der TSA mit den vorhandenen Daten (derzeit aktiv in der GSA) verglichen:
def determine_inserts(df_new, df_old, key_columns):
"""
Determine logical inserts.
Records need to be inserted when there are no records
with this business key combination yet.
Parameters
----------
df_new: DataFrame
Spark DataFrame with the freshly delivered data.
df_old: DataFrame
Spark DataFrame with the already existing data.
key_columns: list
List of key column names.
Returns
-------
DataFrame
Spark DataFrame with the determined inserts.
"""
df_log_inserts = df_new.join(
df_old.where("DWH_ACTIVEFLAG=1"),
on=key_columns,
how="left_anti",
)
return df_log_inserts
def determine_updates(df_new, df_old, key_columns, value_columns):
"""
Determine logical updates.
Old records need to be updated when there are rows
in new which have the same key column values but different
entries in any of the value columns.
Parameters
----------
df_new: DataFrame
Spark DataFrame with the freshly delivered data.
df_old: DataFrame
Spark DataFrame with the already existing data.
key_columns: list
List of key column names.
value_columns: list
List of value column names.
Returns
-------
DataFrame
Spark DataFrame with the determined updates.
"""
# We need to use the null safe equal operator <=> here and then negate the value,
# since there is no null safe unequal operator available.
update_condition = " or ".join([f"!(new.{v} <=> old.{v})" for v in value_columns])
df_log_updates= (
df_new.alias("new")
.join(
df_old.alias("old").where("DWH_ACTIVEFLAG=1"),
on=key_columns,
how="inner",
)
.where(update_condition)
.select("new.*")
)
return df_log_updates
Diese Datensets können nun um die technischen Spalten erweitert werden bevor sie in die GSA geschrieben werden:
import datetime as dt
def update(df_log_updates, df_log_inserts, dt_gsa):
"""
Update GSA with new data.
Generates technical inserts and updates and merges them into
the GSA.
Parameters
----------
df_log_updates : DataFrame
Logical updates to write.
df_log_inserts : DataFrame
Logical inserts to write.
dt_gsa : DeltaTable
Target GSA table to merge into.
"""
max_id = dt_gsa.toDf().agg({"DWH_ROWID":"max"}).head()[0]
timestamp = dt.datetime.now()
df_tech_updates = df_log_updates
# technical inserts need to have technical columns added
df_tech_inserts = df_log_inserts.unionByName(df_log_updates).selectExpr(
"*",
f"{job_run_id} as ID_JOBRUN_INS",
"NULL as ID_JOBRUN_UPD",
"1 as DWH_ACTIVEFLAG",
"0 as DWH_DELETEDFLAG",
f"'{timestamp}' as DWH_VALIDFROM",
"'9999-12-31 23:59:59.999999' as DWH_VALIDTO",
f"ROW_NUMBER() over (order by '') + {max_id} as DWH_ROWID"
)
# we build a single dataframe to update the gsa with a single merge operation
df_updates_inserts = df_tech_updates.unionByName(df_tech_inserts, allowMissingColumns=True)
# merge on the key columns, also only consider active entries in the gsa
# the upd.DWH_ACTIVEFLAG is used to differentiate updates from inserts
merge_condition = " and ".join([f"gsa.{k} = upd.{k}" for k in keyColumns])
merge_condition += " and gsa.DWH_ACTIVEFLAG =1 and upd.DWH_ACTIVEFLAG is NULL"
(
dt_gsa.alias("gsa)
.merge(source=df_updates_inserts.alias("upd"), condition=merge_condition)
.whenMatchedUpdate(
set={
"DWH_ACTIVEFLAG": "0",
"DWH_VALIDTO": f"'{timestamp}'",
"ID_JOBRUN_UPD": "job_run_id",
},
)
.whenNotMatchedInsertAll()
.execute()
)
Mithilfe der merge-Funktionalität von Delta Lake und den speziell präparierten Daten ist es möglich Updates und Inserts in einer einzigen ACID-Transaktion auszuführen und die Datenkonsistenz sicherzustellen.
Die Transformationsschicht – Von der Rohdatenverarbeitung zum Erkenntnisgewinn
In der Staging-Schicht haben wir gesehen, wie Daten aus verschiedenen Quellen erfasst, historisiert und persistiert werden. Jetzt konzentrieren wir uns auf die Transformationsschicht, die den entscheidenden Schritt in der Wertschöpfungskette darstellt.
Während der Beladungsprozess der Staging-Schicht sehr generisch aufgebaut werden kann, benötigt die Transformationsschicht normalerweise individuelle Businesslogiken. Das bedeutet, dass maßgeschneiderte Prozesse entwickelt werden müssen, um die spezifischen Anforderungen jedes Unternehmens zu erfüllen. Um diese Verarbeitungsjobs dennoch modular und generisch zu gestalten, teilen wir die Jobs in der Transformationsschicht in zwei Teile auf:
- Der generische äußere Teil befasst sich mit Aufgaben wie Logging und Metadatenmanagement. Dieser Teil sorgt für eine konsistente Protokollierung der Verarbeitungsschritte und verwaltet die statischen und dynamischen Metadaten.
- Der innere Teil ist individuell und beinhaltet die spezifische Businesslogik. Hierbei handelt es sich um eine Aneinanderkettung von einzelnen, wiederverwendbaren Bausteinen, die bestimmte Businesslogiken kapseln. Zum Beispiel kann ein Baustein eine Validierung der Daten durchführen oder die Daten mithilfe von Stammdaten anreichern.
Eine elegante Art diese Programmbausteine zu implementieren stellen Python Funktionen da:
def filterTransactions(limit=0):
"""
Filter transactions by amount.
Remove records with `TRANSACTION_AMOUNT` below the limit.
Parameters
----------
limit : float
Lower limit a transaction must have.
"""
def inner(df_input):
return df_input.where(f"TRANSACTION_AMOUNT >= {limit}"
return inner
Der Vorteil der zunächst kompliziert wirkenden verschachtelung von zwei Funktionen liegt in der Parametrisierbarkeit und gleichzeitigen Verwendbarkeit der .transform() Dataframe Methode:
df_filtered = df.transform(filterTransaction(limit=42.69)).transform(...)
Fazit
In diesem zweiten Teil unserer Blog-Serie haben wir einen ersten Überblick über die Implementierung unseres Data Warehouse Frameworks vermittelt. Wir haben gezeigt, wie Daten in der Staging-Schicht erfasst und vorbereitet werden, um die Datenqualität sicherzustellen. Die Global Staging Area ermöglicht eine zuverlässige Historisierung und Datenkonsistenz, wobei Updates und Inserts in einer einzigen ACID-Transaktion verarbeitet werden.
Die Transformationsschicht, als zentrales Element, setzt maßgeschneiderte Businesslogiken modular um, und wir präsentierten eine effektive Methode, um diese Logiken mithilfe von Python-Funktionen zu realisieren. Diese Kombination aus generischer äußerer Struktur und individuellen inneren Prozessen erfüllt die spezifischen Anforderungen von Unternehmen, während sie dennoch von modularer Flexibilität profitieren.
Selbstverständlich stellt die hier gezeigten Funktionalitäten nur einen Bruchteil der umfangreichen Logik unseres Data Warehouse Frameworks dar. Dank des modularen Aufbaus können außerdem Kundenwünsche einfach und schnell umgesetzt werden, da wir für individuelle Anforderungen maßgeschneiderte Module entwickeln können. Wenn Sie mehr über unser Data Warehouse Framework erfahren oder darüber sprechen möchten, wie es Ihr Unternehmen unterstützen kann, zögern Sie nicht, uns zu kontaktieren. Wir freuen uns auf den Austausch mit Ihnen und begleiten Sie gerne bei ihrere Reise vom On-Prem zum Cloud Data Warehouse.