Moti­va­tion

Für die meis­ten Fir­men stel­len Log-Daten (oder Log-Files) eine signi­fi­kante Daten­grund­lage für Ana­lyse, Opti­mie­rung und Infor­ma­ti­ons­ge­win­nung dar. Sie sind all­ge­gen­wär­tig, da ihre Gene­rie­rung mit den fun­da­men­tals­ten Vor­gän­gen der IT ein­her­ge­hen. Obwohl diese Daten bei nahezu allen Benut­zer-Hand­lun­gen und Sys­tem-Aktio­nen gene­riert wer­den, nut­zen viele Fir­men Log-Daten nach wie vor fast aus­schließ­lich für die Ent­wick­lung von neuem Code oder der Kor­rek­tur von Code auf pro­duk­ti­ven Systemen.

Daten aus Sys­tem-Vor­gän­gen, User-Hand­lun­gen oder die Über­wa­chung von Auto­ma­ti­sie­run­gen wer­den oft ver­nach­läs­sigt und auf Erfolgs­mel­dun­gen redu­ziert. Dabei besteht gerade an die­sen Stel­len oft Optimierungspotential.

Als Bei­spiel für inter­es­sante Daten kann man an die­ser Stelle z.B. Log-Daten von User-Inter­ak­tio­nen in einem Web-Ser­vice betrach­ten. Von Quell­da­ten, User-IDs, URLs, Zeit­punk­ten über ver­schie­dene wei­tere Iden­ti­fier kön­nen nahezu belie­big viele Daten aus jedem Ein­trag gezo­gen werden.

Um diese Daten für SQL-Daten­ban­ken ver­füg­bar zu machen, besteht die größte Hürde oft in der Struk­tu­rie­rung der Daten, da Log-Daten oft eine Reihe unter­schied­li­cher Infor­ma­tio­nen beinhal­ten, wel­che von Ein­trag zu Ein­trag vari­ie­ren kön­nen. Genau die­ses Pro­blem soll die­ser Bei­trag beleuch­ten, wobei eine klas­si­sche Methode für den Auf­bau eines Spark-Data­frames dar­ge­stellt wird.

Das klas­si­sche Vor­ge­hen besteht dabei zunächst in der Ana­lyse der Log-Daten, d.h. ihrer Text­struk­tur, und die anschlie­ßende Extrak­tion der Daten mit Hilfe der erhal­te­nen Informationen.

Vor­aus­set­zun­gen

Zunächst sollte kurz beschrie­ben wer­den, wel­che Vor­aus­set­zun­gen vorab erfüllt wer­den müs­sen, um die­sen Ansatz in sei­nen Grund­zü­gen imi­tie­ren zu kön­nen. Neben einer funk­ti­ons­fä­hi­gen Spark-Instanz wer­den die PySpark- und Reg­Exp-Biblio­the­ken aus Python verwendet.

Sobald die Lie­fer­stre­cke der Files bzw. das Spei­cher­ver­zeich­nis in die Umge­bung ein­ge­bun­den wur­den, kön­nen die Files mit klas­si­schen Read-Befeh­len (Spark.Read.Load(…)) nutz­bar gemacht werden.

An die­ser Stelle muss zunächst eine Ana­lyse der Struk­tur der Log-Daten erfol­gen, um die Daten für die wei­tere Ver­ar­bei­tung vor­be­rei­ten zu kön­nen. Die Struk­tur der Daten sowie ihre Kom­ple­xi­tät kön­nen sich dabei – je nach Anwen­dungs­be­zug – stark unter­schei­den. Diese Infor­ma­tio­nen wer­den benö­tigt, um die Daten mit ent­spre­chen­den Regu­lar Expres­si­ons fil­tern und berei­ni­gen zu können.

Der Code, der für die Extrak­tion der Daten genutzt wer­den soll, muss außer­dem noch an die Nodes (Arbeits­ein­hei­ten von Spark-Instan­zen) ver­teilt wer­den. Dafür muss das Paket in die ent­spre­chende Spark-Instanz ein­ge­bun­den werden.

Es sollte an die­ser Stelle auch gesagt wer­den, dass für die par­al­lele Ver­ar­bei­tung von Log-Files auf ver­schie­de­nen Umge­bun­gen ein ent­spre­chen­des Maß an Rechen­leis­tung benö­tigt wird. Um dabei Arbeits­spei­cher zu spa­ren, soll­ten so wenig Data­frames wie nötig auf­ge­baut werden.

Metho­dik

Anlie­fe­rung der Daten

Für die Ver­ar­bei­tung der Daten neh­men wir an die­ser Stelle an, dass die Ein­ga­be­da­ten in Form eines ein­zei­li­gen Ein­trags in einem Spark-Data­frame über­ge­ben wer­den. Die­ses Data­frame (DF) soll als Platz­hal­ter für die wei­tere Ver­ar­bei­tung dienen.

Zu Beginn erfolgt eine Typ­über­prü­fung der Ein­ga­be­da­ten, der für das Aus­le­sen der Daten benö­tigt wird. Nach jener Über­prü­fung wer­den die Daten aus­ge­le­sen. Die­ser Schritt kann sich je nach Form der Daten ändern und unter­schied­lich kom­plex aus­fal­len. Im Bei­spiel wei­ter unten wer­den die Daten zei­len­weise als String im JSON-For­mat aus­ge­le­sen. Anschlie­ßen muss erneut geprüft wer­den, ob das Data­frame nun gefüllt oder leer ist.

1. Auf­bau der CSV-Datei im Bei­spiel
(Sin­gle-Line JSON Spalte, semistrukturiert)

Um Dupli­kate zu ver­mei­den, wird nun eine Kor­rek­tur der Spal­ten vor­ge­nom­men und eine Map­ping-Liste par­al­lel mit­er­stellt. Das Ziel ist es, aus [„usr“, „use“, „use“] ein Objekt der Form [„usr“, „use_0“, „use_1“] zu machen. Die zuge­hö­rige Map­ping-Struk­tur der geän­der­ten Zei­len besitzt die Form {„use“: [„use_0“, „use_1“]}. Neue Spal­ten wer­den nun an den Data­frame ange­hängt und mit Hilfe der Map­ping-Tabelle kann die Struk­tur erhal­ten wer­den und Mehr­fach­nen­nun­gen ent­fernt werden.

An die­ser Stelle kön­nen nun auch die Zeit­punkte und Daten in bestimm­ten For­ma­ten gespei­chert wer­den. PySpark bringt mit den to_timestamp()- und to_date()-Funktionen die Mög­lich­keit mit, vor­de­fi­nierte For­mate als String zu übergeben.

Data­frame-Ver­ar­bei­tung

Um die Struk­tur nun wei­ter zu glät­ten, müs­sen die ver­schie­de­nen Spal­ten nun auf ihren Daten­ty­pen über­prüft wer­den. Alle Spal­ten, die den Typ Array haben, kön­nen mit­tels der explode-outer-Funk­tion von PySpark auf­ge­teilt wer­den. Dafür kann eine Liste der array-wer­ti­gen Spal­ten ange­legt wer­den und für jedes Ele­ment, das ent­fernt wird, erfolgt ein neues Sel­ect auf den Data­frame, bei dem die ska­la­ren, ver­blei­ben­den array-wer­ti­gen und neuen explo­ded-Spal­ten aus­ge­wählt werden.

Nach die­sem Schritt müs­sen die Spal­ten­na­men, die einen Punkt beinhal­ten, ange­passt wer­den, da diese eine Nota­tion ver­wen­den („ ‚a.B‚“), die nicht aus­ge­ge­ben wer­den soll. Dafür wer­den zunächst nur die nicht-ver­schach­tel­ten Zei­len angepasst.

Der vor­letzte Schritt besteht darin, den Data­frame final zu glät­ten. Dafür kann man eine rekur­sive Funk­tion in einem Sel­ect ver­wen­den, bei der eine Ergeb­nis­liste ent­we­der um den Schema-Key ergänzt wird, falls das Ele­ment kein Struct­Type ist und nicht zu den Keys der oben defi­nier­ten Map­ping-Liste gehört, oder ansons­ten um einen rekur­si­ven Funk­ti­ons­auf­ruf ergänzt wird.

Vor der fina­len Aus­gabe wer­den nun noch ein­mal alle Spal­ten­na­men auf Namen mit der Punkt-Nota­tion über­prüft und ggf. geän­dert. Dies ver­hin­dert, dass Spal­ten­na­men, die durch ihre vor­her ver­schach­telte Struk­tur noch nicht geän­dert wer­den konn­ten, über­se­hen werden.

Das Ergeb­nis der Aus­gabe ist nun der neu struk­tu­rierte Dataframe.

Bei­spiel Analyse

Wie oben bereits ein­mal beschrie­ben, wird an die­ser Stelle auf Log-Daten von Onlinen­ut­zern zurück­ge­grif­fen. Der Anwen­dungs­kon­text die­ses Bei­spiels ist eine Cus­to­mer-Jour­ney-Ana­lyse, mit der es mög­lich ist, für bestimmte Fir­men­kam­pa­gnen Schwach­stel­len in der Kun­den­kon­ver­sion zu fin­den. Dafür müs­sen Kun­den zunächst bestim­men Kam­pa­gnen ein­deu­tig zuge­ord­net wer­den, was über die Auf­ruf-URL mög­lich ist.

2. Reale Bei­spiel-Datei der Log-Daten

Dies geschieht auf Basis von Log-Daten, wel­che über eine Daten­stre­cke im CSV-For­mat gelie­fert wer­den. In die­ser CSV befin­det sich ledig­lich eine ein­zige Spalte von Ein­trä­gen im JSON-For­mat.
Zunächst wird also wie oben beschrie­ben ein Data­frame erstellt, mit den extra­hier­ten Daten bela­den und anschlie­ßend mit dem wei­te­ren Code ange­passt. Der Extrak­ti­ons­teil bestand in die­sem Fall aus:

df_out = rdd.withColumn(column, _udf_logparse(col(column)))
df_out = self._spark.read.json(df_out.rdd.map(lambda _: _[column]))

Nach der Aus­gabe des pro­vi­so­ri­schen Data­frames müs­sen zei­len­ba­sierte Fil­ter ange­wen­det wer­den, um Infor­ma­tio­nen aus den gespei­cher­ten URL-Ein­trä­gen zu extrahieren.

Dafür wird eine Reihe Reg­Exp basier­ter Aus­drü­cke geschrie­ben, die mittels

getvalue_udf = udf ( lambda column, url: get_value(column, url) )

zu Funk­tio­nen wer­den, die im PySpark-Kon­text auf Zei­len-Ebene aus­ge­führt wer­den kön­nen. Diese Funk­tio­nen kön­nen dann in jeder Zeile die URL auf Sub­strings unter­su­chen und diese in eine sepa­rate Spalte schrei­ben. Wie bereits zuvor wird auch hier dar­auf geach­tet, den alten Data­frame zu erweitern.

Vor der Spei­che­rung der Daten wird anschlie­ßend ein letz­tes Sel­ect auf den Data­frame ange­wen­det, um die für die jewei­lige Extrak­tion rele­van­ten Daten zu extra­hie­ren, und die gefil­ter­ten Daten aus dem Data­frame wer­den mit­tels JDBC-Con­nec­tor in die ana­ly­ti­sche Daten­bank des Kun­den geschrieben.

Der Code kann par­al­lel für ver­schie­dene Unter­be­rei­che des Unter­neh­mens aus­ge­führt werden.

3. Bei­spiel Aus­gabe eines erstel­len Spark Dataframes

Was man sich mer­ken sollte

Fas­sen wir hier also kurz noch ein­mal zusam­men, wel­che Dinge wir im Kon­text der Daten­auf­be­rei­tung von Log-Daten im Kopf behal­ten soll­ten. Die Gewin­nung der Infor­ma­tio­nen aus Log-Daten ist für die Opti­mie­rung von Geschäfts­pro­zes­sen und der Kun­de­n­er­fah­rung lang­fris­tig von Wert. Die Pro­bleme, die dabei auf­tre­ten, bestehen sim­pel gesagt aus dem Zusam­men­spiel von zwei Pro­ble­men: Der Kom­ple­xi­tät der Strings und der Masse an Daten. Aus die­sen resul­tie­ren hohe Anfor­de­run­gen an Res­sour­cen und lange Lauf­zei­ten, wel­che nur durch effi­zi­en­tes Pro­gram­mie­ren ver­mie­den wer­den kön­nen. Dabei ist expli­zit dar­auf zu ach­ten, dass Data­frames wei­test­ge­hend wei­ter­ver­wen­det, d.h. umstruk­tu­riert oder ergänzt wer­den soll­ten, um RAM zu spa­ren. Des Wei­te­ren soll­ten die String-Search-Befehle aus­schließ­lich auf Basis von Reg­Exp erfol­gen, da z.B. der klas­si­sche find-Befehl nicht aus­rei­chend opti­miert ist, um für grö­ßere Daten­men­gen bzw. Zei­len­ope­ra­tio­nen geeig­net zu sein.