Motivation
Für die meisten Firmen stellen Log-Daten (oder Log-Files) eine signifikante Datengrundlage für Analyse, Optimierung und Informationsgewinnung dar. Sie sind allgegenwärtig, da ihre Generierung mit den fundamentalsten Vorgängen der IT einhergehen. Obwohl diese Daten bei nahezu allen Benutzer-Handlungen und System-Aktionen generiert werden, nutzen viele Firmen Log-Daten nach wie vor fast ausschließlich für die Entwicklung von neuem Code oder der Korrektur von Code auf produktiven Systemen.
Daten aus System-Vorgängen, User-Handlungen oder die Überwachung von Automatisierungen werden oft vernachlässigt und auf Erfolgsmeldungen reduziert. Dabei besteht gerade an diesen Stellen oft Optimierungspotential.
Als Beispiel für interessante Daten kann man an dieser Stelle z.B. Log-Daten von User-Interaktionen in einem Web-Service betrachten. Von Quelldaten, User-IDs, URLs, Zeitpunkten über verschiedene weitere Identifier können nahezu beliebig viele Daten aus jedem Eintrag gezogen werden.
Um diese Daten für SQL-Datenbanken verfügbar zu machen, besteht die größte Hürde oft in der Strukturierung der Daten, da Log-Daten oft eine Reihe unterschiedlicher Informationen beinhalten, welche von Eintrag zu Eintrag variieren können. Genau dieses Problem soll dieser Beitrag beleuchten, wobei eine klassische Methode für den Aufbau eines Spark-Dataframes dargestellt wird.
Das klassische Vorgehen besteht dabei zunächst in der Analyse der Log-Daten, d.h. ihrer Textstruktur, und die anschließende Extraktion der Daten mit Hilfe der erhaltenen Informationen.
Voraussetzungen
Zunächst sollte kurz beschrieben werden, welche Voraussetzungen vorab erfüllt werden müssen, um diesen Ansatz in seinen Grundzügen imitieren zu können. Neben einer funktionsfähigen Spark-Instanz werden die PySpark- und RegExp-Bibliotheken aus Python verwendet.
Sobald die Lieferstrecke der Files bzw. das Speicherverzeichnis in die Umgebung eingebunden wurden, können die Files mit klassischen Read-Befehlen (Spark.Read.Load(…)) nutzbar gemacht werden.
An dieser Stelle muss zunächst eine Analyse der Struktur der Log-Daten erfolgen, um die Daten für die weitere Verarbeitung vorbereiten zu können. Die Struktur der Daten sowie ihre Komplexität können sich dabei – je nach Anwendungsbezug – stark unterscheiden. Diese Informationen werden benötigt, um die Daten mit entsprechenden Regular Expressions filtern und bereinigen zu können.
Der Code, der für die Extraktion der Daten genutzt werden soll, muss außerdem noch an die Nodes (Arbeitseinheiten von Spark-Instanzen) verteilt werden. Dafür muss das Paket in die entsprechende Spark-Instanz eingebunden werden.
Es sollte an dieser Stelle auch gesagt werden, dass für die parallele Verarbeitung von Log-Files auf verschiedenen Umgebungen ein entsprechendes Maß an Rechenleistung benötigt wird. Um dabei Arbeitsspeicher zu sparen, sollten so wenig Dataframes wie nötig aufgebaut werden.
Methodik
Anlieferung der Daten
Für die Verarbeitung der Daten nehmen wir an dieser Stelle an, dass die Eingabedaten in Form eines einzeiligen Eintrags in einem Spark-Dataframe übergeben werden. Dieses Dataframe (DF) soll als Platzhalter für die weitere Verarbeitung dienen.
Zu Beginn erfolgt eine Typüberprüfung der Eingabedaten, der für das Auslesen der Daten benötigt wird. Nach jener Überprüfung werden die Daten ausgelesen. Dieser Schritt kann sich je nach Form der Daten ändern und unterschiedlich komplex ausfallen. Im Beispiel weiter unten werden die Daten zeilenweise als String im JSON-Format ausgelesen. Anschließen muss erneut geprüft werden, ob das Dataframe nun gefüllt oder leer ist.
Um Duplikate zu vermeiden, wird nun eine Korrektur der Spalten vorgenommen und eine Mapping-Liste parallel miterstellt. Das Ziel ist es, aus [„usr“, „use“, „use“] ein Objekt der Form [„usr“, „use_0“, „use_1“] zu machen. Die zugehörige Mapping-Struktur der geänderten Zeilen besitzt die Form {„use“: [„use_0“, „use_1“]}. Neue Spalten werden nun an den Dataframe angehängt und mit Hilfe der Mapping-Tabelle kann die Struktur erhalten werden und Mehrfachnennungen entfernt werden.
An dieser Stelle können nun auch die Zeitpunkte und Daten in bestimmten Formaten gespeichert werden. PySpark bringt mit den to_timestamp()- und to_date()-Funktionen die Möglichkeit mit, vordefinierte Formate als String zu übergeben.
Dataframe-Verarbeitung
Um die Struktur nun weiter zu glätten, müssen die verschiedenen Spalten nun auf ihren Datentypen überprüft werden. Alle Spalten, die den Typ Array haben, können mittels der explode-outer-Funktion von PySpark aufgeteilt werden. Dafür kann eine Liste der array-wertigen Spalten angelegt werden und für jedes Element, das entfernt wird, erfolgt ein neues Select auf den Dataframe, bei dem die skalaren, verbleibenden array-wertigen und neuen exploded-Spalten ausgewählt werden.
Nach diesem Schritt müssen die Spaltennamen, die einen Punkt beinhalten, angepasst werden, da diese eine Notation verwenden („ ‚a.B‚“), die nicht ausgegeben werden soll. Dafür werden zunächst nur die nicht-verschachtelten Zeilen angepasst.
Der vorletzte Schritt besteht darin, den Dataframe final zu glätten. Dafür kann man eine rekursive Funktion in einem Select verwenden, bei der eine Ergebnisliste entweder um den Schema-Key ergänzt wird, falls das Element kein StructType ist und nicht zu den Keys der oben definierten Mapping-Liste gehört, oder ansonsten um einen rekursiven Funktionsaufruf ergänzt wird.
Vor der finalen Ausgabe werden nun noch einmal alle Spaltennamen auf Namen mit der Punkt-Notation überprüft und ggf. geändert. Dies verhindert, dass Spaltennamen, die durch ihre vorher verschachtelte Struktur noch nicht geändert werden konnten, übersehen werden.
Das Ergebnis der Ausgabe ist nun der neu strukturierte Dataframe.
Beispiel Analyse
Wie oben bereits einmal beschrieben, wird an dieser Stelle auf Log-Daten von Onlinenutzern zurückgegriffen. Der Anwendungskontext dieses Beispiels ist eine Customer-Journey-Analyse, mit der es möglich ist, für bestimmte Firmenkampagnen Schwachstellen in der Kundenkonversion zu finden. Dafür müssen Kunden zunächst bestimmen Kampagnen eindeutig zugeordnet werden, was über die Aufruf-URL möglich ist.
Dies geschieht auf Basis von Log-Daten, welche über eine Datenstrecke im CSV-Format geliefert werden. In dieser CSV befindet sich lediglich eine einzige Spalte von Einträgen im JSON-Format.
Zunächst wird also wie oben beschrieben ein Dataframe erstellt, mit den extrahierten Daten beladen und anschließend mit dem weiteren Code angepasst. Der Extraktionsteil bestand in diesem Fall aus:
df_out = rdd.withColumn(column, _udf_logparse(col(column)))
df_out = self._spark.read.json(df_out.rdd.map(lambda _: _[column]))
Nach der Ausgabe des provisorischen Dataframes müssen zeilenbasierte Filter angewendet werden, um Informationen aus den gespeicherten URL-Einträgen zu extrahieren.
Dafür wird eine Reihe RegExp basierter Ausdrücke geschrieben, die mittels
getvalue_udf = udf ( lambda column, url: get_value(column, url) )
zu Funktionen werden, die im PySpark-Kontext auf Zeilen-Ebene ausgeführt werden können. Diese Funktionen können dann in jeder Zeile die URL auf Substrings untersuchen und diese in eine separate Spalte schreiben. Wie bereits zuvor wird auch hier darauf geachtet, den alten Dataframe zu erweitern.
Vor der Speicherung der Daten wird anschließend ein letztes Select auf den Dataframe angewendet, um die für die jeweilige Extraktion relevanten Daten zu extrahieren, und die gefilterten Daten aus dem Dataframe werden mittels JDBC-Connector in die analytische Datenbank des Kunden geschrieben.
Der Code kann parallel für verschiedene Unterbereiche des Unternehmens ausgeführt werden.
Was man sich merken sollte
Fassen wir hier also kurz noch einmal zusammen, welche Dinge wir im Kontext der Datenaufbereitung von Log-Daten im Kopf behalten sollten. Die Gewinnung der Informationen aus Log-Daten ist für die Optimierung von Geschäftsprozessen und der Kundenerfahrung langfristig von Wert. Die Probleme, die dabei auftreten, bestehen simpel gesagt aus dem Zusammenspiel von zwei Problemen: Der Komplexität der Strings und der Masse an Daten. Aus diesen resultieren hohe Anforderungen an Ressourcen und lange Laufzeiten, welche nur durch effizientes Programmieren vermieden werden können. Dabei ist explizit darauf zu achten, dass Dataframes weitestgehend weiterverwendet, d.h. umstrukturiert oder ergänzt werden sollten, um RAM zu sparen. Des Weiteren sollten die String-Search-Befehle ausschließlich auf Basis von RegExp erfolgen, da z.B. der klassische find-Befehl nicht ausreichend optimiert ist, um für größere Datenmengen bzw. Zeilenoperationen geeignet zu sein.