Ein­lei­tung

In unse­rem ers­ten Arti­kel haben wir das Fun­da­ment gelegt, um die Daten-Ware­house-Land­schaft im Zeit­al­ter der Cloud-Migra­tion zu revo­lu­tio­nie­ren. Wir haben die grund­le­gen­den Kon­zepte eines gene­ri­schen, meta­da­ten­ge­trie­be­nen Data Ware­house Frame­works erkun­det, das spe­zi­ell auf die Anfor­de­run­gen der Cloud-Umge­bung zuge­schnit­ten ist. Jetzt ist es an der Zeit, die­ses Kon­zept in die Tat umzusetzen.

Im zwei­ten Teil unse­rer Blog-Serie wer­den wir tie­fer ein­tau­chen und uns auf die Imple­men­tie­rung kon­zen­trie­ren. Die Sta­ging- und Trans­for­ma­ti­ons-Schich­ten ste­hen dabei im Mit­tel­punkt. Wir wer­den Ihnen einen pra­xis­na­hen Ein­blick in die Umset­zung geben, wie Daten in die Sta­ging-Schicht gelan­gen, wie sie dort ver­sio­niert und his­to­ri­siert wer­den, und wie die Trans­for­ma­ti­ons­schicht dazu bei­trägt, Daten in wert­volle Erkennt­nisse zu verwandeln.

Die­ser Arti­kel wird Ihnen zei­gen, wie unser Data Ware­house Frame­work als Kata­ly­sa­tor fun­giert und den Weg für eine naht­lose und effi­zi­ente Daten­ver­ar­bei­tung ebnet. Wir sind bereit, in die prak­ti­sche Welt der Cloud-Migra­tion und Daten­op­ti­mie­rung ein­zu­tau­chen. Will­kom­men im zwei­ten Teil unse­rer auf­re­gen­den Reise!

Gene­rel­les

Wie bereits im ers­ten Teil unse­rer Blog-Serie erläu­tert, bil­det Dat­ab­ricks die zen­trale Kom­po­nente unse­res Data Ware­house Frame­works für die Daten­ver­ar­bei­tung. Neben der Stan­dard­im­ple­men­tie­rung von Spark in Scala wer­den auch die Python- und R‑Bindings (pySpark und sparkR) unter­stützt. Auf­grund sei­ner weit ver­brei­te­ten Popu­la­ri­tät, sei­ner her­aus­ra­gen­den Nut­zer­freund­lich­keit und sei­ner Fülle an Funk­tio­nen fiel die Wahl der Pro­gram­mier­spra­che auf Python. Beach­ten Sie, dass die in die­sem Arti­kel vor­kom­men­den Code-Bei­spiele oft pro­blem­los in eine der ande­ren unter­stütz­ten Spra­chen über­setzt wer­den können.

Die Sta­ging Schicht – Von der Daten­er­fas­sung zur Historisierung

In unse­rem Data Ware­house Frame­work besteht der Ein­gangs­pro­zess aus zwei direkt auf­ein­an­der­fol­gen­den Schrit­ten: dem Laden der Daten in die Tem­po­rary Sta­ging Area (TSA) und die anschlie­ßende His­to­ri­sie­rung in der Glo­bal Sta­ging Area (GSA).

Vali­die­rung und Ein­le­sen in die Tem­po­rary Sta­ging Area

Die TSA dient dazu Daten aus den unter­schied­lichs­ten Quel­len in das Data Ware­house zu laden und gleich­zei­tig eine erste, tech­ni­sche Schema-Vali­die­rung durch­zu­füh­ren. Die Bela­dung ist modu­lar auf­ge­baut und kann fle­xi­bel um neue Quel­len erwei­tert und durch Meta­da­ten gesteu­ert wer­den. Bei­spiels­weise kann für Datei­lie­fe­run­gen fest­ge­legt wer­den, ob eine zusätz­li­che .md5 oder .prot Datei gelie­fert und abge­gli­chen wer­den muss, oder wel­che Trenn­zei­chen in einer CSV-Datei vor­han­den sind.

Das nach­fol­gende Code-Bei­spiel illus­triert, wie mühe­los das Ein­le­sen von CSV-Daten in Dat­ab­ricks unter Nut­zung von nati­ven Spark-Ope­ra­tio­nen und spe­zi­fi­schen Dat­ab­ricks-Vali­die­run­gen durch­ge­führt wer­den kann. Die als feh­ler­haft erkann­ten Daten kön­nen mit einer anschlie­ßen­den Feh­ler­be­hand­lung wei­ter ver­ar­bei­tet wer­den, wohin­ge­gen die ande­ren Daten wei­te­ren Vali­die­run­gen unter­zo­gen wer­den, bevor sie in schließ­lich in die TSA geschrie­ben werden.

def read_csv_file(source_file, csv_options, tsa_schema):
    """
    Read a csv file into a DataFrame.

    Load data from tmpDir csv file and return it as DataFrame.
    Rows that cannot be parsed due to schema mismatch or
    time / date parsing errors are returned in a separate DataFrame.

    Parameters
    ----------
    source_file : str
        Full path to file to be read into TSA.
    csv_options : dict
        Dictionary defining spark.read.options like seperator,
        encoding and quote characters.
    tsa_schema : pyspark.sql.types.StructType
        Schema used to read the csv file with.

    Returns
    -------
    df_valid : DataFrame
        Spark DataFrame containing the parsed records.
        This includes records with missing columns.
        These are set to ``null`` and have an entry in ``_corrupt_record``.
    df_invalid : DataFrame
        Spark DataFrame containing records that did not parse correctly.
        This does not include records with missing columns, as they
        are included in dfValid.
    """
    # Options for improved error handling
    csv_options ["mode"] = "PERMISSIVE"
    csv_options ["rescuedDataColumn"] = "_rescued_data"

    read_schema = tsa_schema.add("_corrupt_record", T.StringType(), True)

    df_inbound = (
        spark.read.format("csv")
        .options(**csv_options)
        .schema(read_schema)
        .load(source_file)
    )

    df_valid = df_inbound .where(
        "(_rescued_data IS NULL or _rescued_data = '{}') and _corrupt_record is NULL"
    ).drop("_rescued_data", "_corrupt_record")

    # invalid records are split from valid ones
    df_invalid = (
        df_inbound.where(
            "(_rescued_data IS NOT NULL and _rescued_data <> '{}') or _corrupt_record is NOT NULL"
        )
    )

    return df_valid, df_invalid
Weg des Wis­sens: His­to­ri­sie­rung in der Glo­bal Sta­ging Area

Nach­dem die Ein­gangs­da­ten in die TSA gelangt sind müs­sen diese in der GSA per­sis­tiert und his­to­ri­siert wer­den. Dazu sind für jedes Daten­ob­jekt fach­li­che Schlüs­sel sowie eine His­to­ri­sie­rungs­me­thode defi­niert. Stan­dard­mä­ßig wird eine Delta-Bela­dung ver­wen­det, die Inserts, Updates und Copies bestimmt und ent­spre­chend behandelt.

Tech­ni­sche Spal­ten – Das Gerüst der Historisierung

Um eine zuver­läs­sige his­to­ri­sierte Daten­hal­tung zu gewähr­leis­ten, müs­sen die fach­li­chen Daten aus der Quelle um tech­ni­sche Spal­ten erwei­tert wer­den. Dies ermög­licht die Rekon­struk­tion des Daten­stands zu jedem belie­bi­gen Zeit­punkt und gewähr­leis­tet die ein­deu­tige Iden­ti­fi­zier­bar­keit jeder Zeile. Es gibt ver­schie­dene Her­an­ge­hens­wei­sen um die­ses Pro­blem zu lösen und in use­rem Data Ware­house ver­wen­den wir eine bitem­po­rale Daten­hal­tung mit zusätz­li­chen Active- und Dele­ted-Flags. Ein wich­ti­ger Grund­satz dabei ist, dass eine Zeile im Data Ware­house nur ein ein­zi­ges mal geup­dated wer­den darf, näm­lich von aktiv auf inak­tiv. Ins­ge­samt wer­den fol­gende tech­ni­schen Spal­ten in der GSA verwendet:

SpalteDaten­typBei­spiel­werteErklä­rung
ID_JOBRUN_INSBig­int1,2,3Lade­lauf, wann die Zeile ein­ge­fügt wurde
ID_JOBRUN_UPDBig­intNull,2,3Lade­lauf, wann die Zeile geup­datet wurde
DWH_ACTIVEFLAGBool0,1Ob die Zeile aktu­ell aktiv ist.
DWH_DELETEDFLAGBool0,1Ob die Zeile als gelöscht mar­kiert wurde.
DWH_VALIDFROMTimestamp2023–10-01T12:00:00Zeit­stem­pel, wann die Zeile ein­ge­fügt wurde
DWH_VALIDTOTimestampNull, 2023–10-02T14:00:00Zeit­stem­pel, wann die Zeile geup­dated wurde
DWH_ROWIDBig­int1,2,3Fort­lau­fende Num­mer, die die Zeile ein­deu­tig identifiziert
Tabelle 1: Tech­ni­sche Spal­ten in der GSA
Bestim­mung und Ver­ar­bei­tung von Inserts und Updates – Die Essenz der Historisierung

Im Sinne der bitem­po­ra­len Daten­hal­tung wie sie in unse­rem Data Ware­house Frame­work ange­wandt wird muss zwi­schen logi­schen und tech­ni­schen (phy­si­schen) Inserts und Updates unter­schie­den werden. 

Ein logi­sches Insert ent­spricht einer neuen Daten­zeile mit einem Schlüs­sel, der der­zeit nicht in der GSA aktiv ist, und führt direkt zu einem tech­ni­schen Insert. Logi­sche Updates hin­ge­gen tritt auf, wenn eine Zeile den­sel­ben Schlüs­sel­wert wie eine in der GSA vor­han­dene Zeile auf­weist und sich min­des­tens in einem ande­ren Feld unter­schei­det. Ein logi­sches Update führt sowohl zu einem tech­ni­schen Update (Inak­ti­vie­rung der bereits bestehen­den Zeile) als auch zu einem tech­ni­schen Insert (Ein­fü­gen des neuen Wertes).

Um die­sen Pro­zess zu initi­ie­ren, müs­sen zuerst die logi­schen Inserts und Updates iden­ti­fi­ziert wer­den. Hier­bei wer­den die neuen Daten aus der TSA mit den vor­han­de­nen Daten (der­zeit aktiv in der GSA) verglichen:

def determine_inserts(df_new, df_old, key_columns):
    """
    Determine logical inserts.

    Records need to be inserted when there are no records
    with this business key combination yet.

    Parameters
    ----------
    df_new: DataFrame
        Spark DataFrame with the freshly delivered data.
    df_old: DataFrame
        Spark DataFrame with the already existing data.
    key_columns: list
        List of key column names.


    Returns
    -------
    DataFrame
        Spark DataFrame with the determined inserts.
    """
    df_log_inserts = df_new.join(
        df_old.where("DWH_ACTIVEFLAG=1"),
        on=key_columns,
        how="left_anti",
    )
    return df_log_inserts 

def determine_updates(df_new, df_old, key_columns, value_columns):
    """
    Determine logical updates.

    Old records need to be updated when there are rows
    in new which have the same key column values but different
    entries in any of the value columns.

    Parameters
    ----------
    df_new: DataFrame
        Spark DataFrame with the freshly delivered data.
    df_old: DataFrame
        Spark DataFrame with the already existing data.
    key_columns: list
        List of key column names.
    value_columns: list
        List of value column names.

    Returns
    -------
    DataFrame
        Spark DataFrame with the determined updates.
    """
    # We need to use the null safe equal operator <=> here and then negate the value,
    # since there is no null safe unequal operator available.
    update_condition = " or ".join([f"!(new.{v} <=> old.{v})" for v in value_columns])


    df_log_updates= (
        df_new.alias("new")
        .join(
            df_old.alias("old").where("DWH_ACTIVEFLAG=1"),
            on=key_columns,
            how="inner",
        )
        .where(update_condition)
        .select("new.*")
    )
    return df_log_updates

Diese Daten­sets kön­nen nun um die tech­ni­schen Spal­ten erwei­tert wer­den bevor sie in die GSA geschrie­ben werden:

import datetime as dt

def update(df_log_updates, df_log_inserts, dt_gsa):
    """
    Update GSA with new data.

    Generates technical inserts and updates and merges them into
    the GSA.

    Parameters
    ----------
    df_log_updates : DataFrame
        Logical updates to write.
    df_log_inserts : DataFrame
        Logical inserts to write.
    dt_gsa : DeltaTable
        Target GSA table to merge into.
    """
    max_id = dt_gsa.toDf().agg({"DWH_ROWID":"max"}).head()[0]
    timestamp = dt.datetime.now()

    df_tech_updates = df_log_updates

    # technical inserts need to have technical columns added
    df_tech_inserts = df_log_inserts.unionByName(df_log_updates).selectExpr(
        "*",
        f"{job_run_id} as ID_JOBRUN_INS",
        "NULL as ID_JOBRUN_UPD",
        "1 as DWH_ACTIVEFLAG",
        "0 as DWH_DELETEDFLAG",
        f"'{timestamp}' as DWH_VALIDFROM",
        "'9999-12-31 23:59:59.999999' as DWH_VALIDTO",
        f"ROW_NUMBER() over (order by '') + {max_id} as DWH_ROWID"
    )
    # we build a single dataframe to update the gsa with a single merge operation
    df_updates_inserts = df_tech_updates.unionByName(df_tech_inserts, allowMissingColumns=True)

    # merge on the key columns, also only consider active entries in the gsa
    # the upd.DWH_ACTIVEFLAG is used to differentiate updates from inserts
    merge_condition = " and ".join([f"gsa.{k} = upd.{k}" for k in keyColumns]) 
    merge_condition += " and gsa.DWH_ACTIVEFLAG =1 and upd.DWH_ACTIVEFLAG is NULL"

    (
        dt_gsa.alias("gsa)
        .merge(source=df_updates_inserts.alias("upd"), condition=merge_condition)
        .whenMatchedUpdate(
            set={
                "DWH_ACTIVEFLAG": "0",
                "DWH_VALIDTO": f"'{timestamp}'",
                "ID_JOBRUN_UPD": "job_run_id",
            },
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

Mit­hilfe der merge-Funk­tio­na­li­tät von Delta Lake und den spe­zi­ell prä­pa­rier­ten Daten ist es mög­lich Updates und Inserts in einer ein­zi­gen ACID-Trans­ak­tion aus­zu­füh­ren und die Daten­kon­sis­tenz sicherzustellen.

Die Trans­for­ma­ti­ons­schicht – Von der Roh­da­ten­ver­ar­bei­tung zum Erkenntnisgewinn

In der Sta­ging-Schicht haben wir gese­hen, wie Daten aus ver­schie­de­nen Quel­len erfasst, his­to­ri­siert und per­sis­tiert wer­den. Jetzt kon­zen­trie­ren wir uns auf die Trans­for­ma­ti­ons­schicht, die den ent­schei­den­den Schritt in der Wert­schöp­fungs­kette darstellt.

Wäh­rend der Bela­dungs­pro­zess der Sta­ging-Schicht sehr gene­risch auf­ge­baut wer­den kann, benö­tigt die Trans­for­ma­ti­ons­schicht nor­ma­ler­weise indi­vi­du­elle Busi­ness­lo­gi­ken. Das bedeu­tet, dass maß­ge­schnei­derte Pro­zesse ent­wi­ckelt wer­den müs­sen, um die spe­zi­fi­schen Anfor­de­run­gen jedes Unter­neh­mens zu erfül­len. Um diese Ver­ar­bei­tungs­jobs den­noch modu­lar und gene­risch zu gestal­ten, tei­len wir die Jobs in der Trans­for­ma­ti­ons­schicht in zwei Teile auf:

  1. Der gene­ri­sche äußere Teil befasst sich mit Auf­ga­ben wie Log­ging und Meta­da­ten­ma­nage­ment. Die­ser Teil sorgt für eine kon­sis­tente Pro­to­kol­lie­rung der Ver­ar­bei­tungs­schritte und ver­wal­tet die sta­ti­schen und dyna­mi­schen Meta­da­ten.
  2. Der innere Teil ist indi­vi­du­ell und beinhal­tet die spe­zi­fi­sche Busi­ness­lo­gik. Hier­bei han­delt es sich um eine Anein­an­der­ket­tung von ein­zel­nen, wie­der­ver­wend­ba­ren Bau­stei­nen, die bestimmte Busi­ness­lo­gi­ken kap­seln. Zum Bei­spiel kann ein Bau­stein eine Vali­die­rung der Daten durch­füh­ren oder die Daten mit­hilfe von Stamm­da­ten anreichern.

Eine ele­gante Art diese Pro­gramm­bau­steine zu imple­men­tie­ren stel­len Python Funk­tio­nen da:

def filterTransactions(limit=0):
    """
    Filter transactions by amount.
    
    Remove records with `TRANSACTION_AMOUNT` below the limit.

    Parameters
    ----------
    limit : float
        Lower limit a transaction must have.
    """
    def inner(df_input):
        return df_input.where(f"TRANSACTION_AMOUNT >= {limit}"

    return inner

Der Vor­teil der zunächst kom­pli­ziert wir­ken­den ver­schach­te­lung von zwei Funk­tio­nen liegt in der Para­me­tri­sier­bar­keit und gleich­zei­ti­gen Ver­wend­bar­keit der .trans­form() Data­frame Methode:

df_filtered = df.transform(filterTransaction(limit=42.69)).transform(...)

Fazit

In die­sem zwei­ten Teil unse­rer Blog-Serie haben wir einen ers­ten Über­blick über die Imple­men­tie­rung unse­res Data Ware­house Frame­works ver­mit­telt. Wir haben gezeigt, wie Daten in der Sta­ging-Schicht erfasst und vor­be­rei­tet wer­den, um die Daten­qua­li­tät sicher­zu­stel­len. Die Glo­bal Sta­ging Area ermög­licht eine zuver­läs­sige His­to­ri­sie­rung und Daten­kon­sis­tenz, wobei Updates und Inserts in einer ein­zi­gen ACID-Trans­ak­tion ver­ar­bei­tet werden.

Die Trans­for­ma­ti­ons­schicht, als zen­tra­les Ele­ment, setzt maß­ge­schnei­derte Busi­ness­lo­gi­ken modu­lar um, und wir prä­sen­tier­ten eine effek­tive Methode, um diese Logi­ken mit­hilfe von Python-Funk­tio­nen zu rea­li­sie­ren. Diese Kom­bi­na­tion aus gene­ri­scher äuße­rer Struk­tur und indi­vi­du­el­len inne­ren Pro­zes­sen erfüllt die spe­zi­fi­schen Anfor­de­run­gen von Unter­neh­men, wäh­rend sie den­noch von modu­la­rer Fle­xi­bi­li­tät profitieren.

Selbst­ver­ständ­lich stellt die hier gezeig­ten Funk­tio­na­li­tä­ten nur einen Bruch­teil der umfang­rei­chen Logik unse­res Data Ware­house Frame­works dar. Dank des modu­la­ren Auf­baus kön­nen außer­dem Kun­den­wün­sche ein­fach und schnell umge­setzt wer­den, da wir für indi­vi­du­elle Anfor­de­run­gen maß­ge­schnei­derte Module ent­wi­ckeln kön­nen. Wenn Sie mehr über unser Data Ware­house Frame­work erfah­ren oder dar­über spre­chen möch­ten, wie es Ihr Unter­neh­men unter­stüt­zen kann, zögern Sie nicht, uns zu kon­tak­tie­ren. Wir freuen uns auf den Aus­tausch mit Ihnen und beglei­ten Sie gerne bei ihrere Reise vom On-Prem zum Cloud Data Warehouse.