Positionierung von Spark und Iceberg im Data Lake
Spark ist eine vielseitige Plattform zum Transformieren und Analysieren von grossen Datasets. In der Regel werden Daten aus Files (Parquet, Avro) gelesen, verarbeitet und in neuen Files abgelegt. Um die Administration dieser Files zu erleichtern, insbesondere wenn mit strukturierten Daten gearbeitet wird, kommt ein Metastore zum Einsatz. Dieser bietet eine Abstraktionsebene zwischen logisch angelegten Tabellen und den effektiv hinterlegten Files.
Apache Iceberg ist ein neues File Format, welches immer häufiger in Data lakes eingesetzt wird. Als Einführung zu Iceberg haben wir vor kurzen bereits einem Blogbeitrag veröffentlicht: saracus Blog: Apache Iceberg, eine revolutionäre Lösung zur Verwaltung von Data Lakes. Iceberg nutzt ein ausgeklügeltes System an Metadaten Files, um seine neuen Features wie Time-Travel oder Deletes auf File-basierten Tabellen zu ermöglichen. Neben den File-basierten Metadaten kommt bei Iceberg ein Metastore zum Einsatz. Damit das Zusammenspiel zwischen den Metastores von Spark und Iceberg korrekt funktioniert, sind einige Anpassungen in der Spark Config notwendig. Die Dokumentation hierfür ist spärlich und Erfahrungswerte sind nur bedingt online zu finden. Insbesondere der Umgang mit Spark Catalogs ist nur vage dokumentiert. Deshalb haben wir hier unser Wissen zusammengetragen, damit Sie erfolgreich mit Spark Catalogs und dem Iceberg Tabellenformat arbeiten können.
Verwendung des internen Metadaten-Katalogs in Spark
Per Default wird in Spark ein in-memory Data Catalog verwendet, in welchem Metadaten abgelegt werden können. Analog zur Implementation in databricks werden sogenannte three-tier Namespaces genutzt, um ein Objekt im Catalog eindeutig zu identifizieren. Der default in-memory Catalog wird über den Namen spark_catalog angesteuert:
# Show available catalogs, databases and tables
spark.sql("SHOW CATALOGS").show()
spark.sql("SHOW DATABASES IN spark_catalog").show()
spark.sql("SHOW TABLES IN spark_catalog.default").show()
Output:
+-------------+
| catalog|
+-------------+
|spark_catalog|
+-------------+
+---------+
|namespace|
+---------+
| default|
+---------+
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+
Die häufigste Verwendung für den in-memory Catalog ist das Erstellen von temporären Views. Diese werden automatisch im default Spark Catalog, hier «spark_catalog» genannt, registriert und werden nach Beenden der Spark Session wieder gelöscht.
# Read in source data
berries_df = spark.read.option("header",True).csv("s3a://some_bucket/berries.csv")
# Create TempView and select from it
berries_df.createOrReplaceTempView("all_berries_temp")
spark.sql("SHOW TABLES IN spark_catalog.default").show()
spark.sql("SELECT * FROM all_berries_temp").show()
Output:
+---------+----------------+-----------+
|namespace| tableName|isTemporary|
+---------+----------------+-----------+
| |all_berries_temp| true|
+---------+----------------+-----------+
+------------+------+-----+
| Name| Color|Taste|
+------------+------+-----+
| Strawberry| red| 3|
| Blueberry| blue| 3|
|Blackcurrent| blue| 2|
| Physalis|orange| 1|
+------------+------+-----+
Der Nachteil dieser Lösung ist, dass die Metadaten des in-memory Catalogs nur für die aktive Spark Session verfügbar sind. Eine andere Spark Session hat keinen Zugang zu den Metadaten. Auch andere Services können darauf nicht zugreifen. Sollen die Metadaten persistiert werden und für andere Services zugänglich sein, gibt es die Möglichkeit einen externen Metastore, z.B. einen Hive Metastore, zu verwenden.
Verwendung eines externen Hive Metastores in Spark
Seit einiger Zeit verwendet Apache Hive den Hive Metastore, mit dem Metadaten zentral geführt werden. Der Hive Metastore unterstützt sowohl Tabellen als auch Views. Im Gegensatz zu den temporären Views sind die eigentlichen Views persistent und stehen dadurch auch anderen Applikationen zur Verfügung. Tabellen und persistierte Views können mit Spark SQL erstellt werden, wie weiter unten im Beispiel gezeigt wird. Alternativ kann die pyspark.sql.Catalog.createTable Syntax zur Erstellung von Tabellen verwendet werden.
Zur Verwendung eines externen Hive Metastores in Spark muss die Spark Config angepasst werden. Der default spark_catalog kann direkt durch den HMS ersetzt werden. Damit wird sichergestellt, dass in jedem Falle mit dem externen HMS gearbeitet wird und es nicht zu Verwechslungen mit dem in-memory Catalog kommt. Beim Aufruf von Spark können die folgenden Argumente mitgegeben werden:
spark = (SparkSession
.builder
.appName("TEST-APP")
.config("spark.sql.catalogImplementation","hive")
.config("spark.hadoop.hive.metastore.uris","thrift://METASTORE:9083")
.getOrCreate()
)
Mit dieser Konfiguration kann nun die Erstellung von Tabellen und Views im Hive Metastore demonstriert werden:
# Create table in default warehouse location
spark.sql("CREATE TABLE spark_catalog.default.berry_table AS SELECT * FROM all_berries_temp")
# Create view based on newly defined table
spark.sql("CREATE VIEW spark_catalog.default.berry_view AS SELECT * FROM spark_catalog.default.berry_table where color = 'blue'")
spark.sql("SELECT * FROM spark_catalog.default.berry_view").show()
Output:
+------------+-----+-----+
| Name|Color|Taste|
+------------+-----+-----+
| Blueberry| blue| 3|
|Blackcurrent| blue| 2|
+------------+-----+-----+
Im Gegensatz zum in-memory Catalogs, sind auch nach Beenden der Spark Session die so erstellten Tabellen und Views im Metastore persistiert und von einer neuen Session aus abrufbar.
Apache Iceberg und der Metadatenkatalog
Wie bereits in der Einführung erwähnt, greift auch das Iceberg Tabellenformat auf einen Metastore zurück. Hier kann ein Hive Metastore verwendet werden. Spark benötigt verschiedene Packages, um Tabellen im Iceberg Format zu schreiben. Diese müssen vorgängig installiert und in der Spark Config mitgegeben werden. Dies wird in den Grundzügen in der Iceberg Dokumentation aufgezeigt: Apache Iceberg, Spark Configuration
Die zuvor gezeigte Spark Config muss angepasst werden, damit beim Schreiben im Iceberg Format die korrekten Packages verwendet werden. Auf Empfehlung des Herstellers wird neben dem eigentlichen Iceberg Catalog ein sogenannter Session Catalog verwendet. So kann sowohl mit Iceberg als auch konventionellen Objekten im HMS gearbeitet werden.
spark = (SparkSession
.builder
.appName("TEST-APP")
.config("spark.sql.catalog.iceberg_catalog","org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_catalog.type","hive")
.config("spark.sql.catalog.iceberg_catalog.uri","thrift://METASTORE:9083")
.config("spark.sql.catalog.spark_catalog.type","hive")
.config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")
.getOrCreate()
)
Der Name des Iceberg Catalogs wird in der Spark Config gesetzt und kann beliebig angepasst werden. Auch hier wird die three-tier Namespace Nomenklatur verwendet, um Tabellen und Views zu referenzieren.
# Read & Write to/from iceberg using the iceberg catalog
spark.sql("CREATE TABLE iceberg_catalog.default.berry_iceberg_table USING ICEBERG AS SELECT * FROM all_berries_temp where taste > 2")
spark.sql("SELECT * FROM iceberg_catalog.default.berry_iceberg_table").show()
Output:
+----------+-----+-----+
| Name|Color|Taste|
+----------+-----+-----+
|Strawberry| red| 3|
| Blueberry| blue| 3|
+----------+-----+-----+
Der Nachteil an diesem Setup ist, dass die Catalog Klasse für den Session Catalog «org.apache.iceberg.spark.SparkSessionCatalog» keine Views unterstützt. Es gibt zwar Pläne, dies in Zukunft umzusetzen, im Moment ist dies jedoch nicht direkt möglich. Beim Anzeigen von Views erhält man eine entsprechende Fehlermeldung:
# Working with views in the session catalog results in error messages
try:
spark.sql("SHOW VIEWS IN iceberg_catalog.default").show()
except AnalysisException as e:
print(e)
Output:
Catalog iceberg_catalog does not support views.
Catalog-übergreifende Queries ermöglichen Views auf Iceberg Tabellen
Glücklicherweise muss man nicht auf das Apache Iceberg Entwicklerteam warten, um Views nativ auf Iceberg nutzen zu können. Mit der richtigen Spark Konfiguration und der Verwendung eines zusätzlichen Catalogs ist es trotz der oben beschriebenen Limitation möglich, Views auf eine Iceberg Tabelle aufzusetzen.
spark = (SparkSession
.builder
.appName("TEST-APP")
.config("spark.sql.catalog.iceberg_catalog","org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_catalog.type","hive")
.config("spark.sql.catalog.iceberg_catalog.uri","thrift://METASTORE:9083")
.config("spark.sql.catalogImplementation","hive")
.config("spark.hadoop.hive.metastore.uris","thrift://METASTORE:9083")
.getOrCreate()
)
Die hier angegebene Spark Config ist eine Kombination der bereits zuvor gezeigten Code Snippets. Der iceberg_catalog stellt die Iceberg Funktionalitäten zur Verfügung. Im zweiten Teil der Config wird aber nicht der org.apache.iceberg.spark.SparkSessionCatalog verwendet, sondern ein regulärer, externer Hive Metastore. Dieser ersetzt den Default spark_catalog und kann für externe Tabellen, aber auch Views verwendet werden.
Wie an der Config zu sehen ist, zeigen beide definierten catalogs auf denselben HMS. Dies ist problemlos möglich. In beiden Catalogs sind dieselben Tabellen zu sehen. Beim Lesen und Schreiben muss darauf Acht gegeben werden, dass der korrekte three-tier Namespace verwendet wird. Im spark_catalog können nun Views erstellt und abgefragt werden, welche auf Iceberg Tabellen im iceberg_catalog zugreifen.
# Create and read view built on an iceberg table
spark.sql("CREATE VIEW spark_catalog.default.berry_view AS SELECT * FROM iceberg_catalog.default.berry_iceberg_table WHERE color = 'red'")
spark.sql("SHOW VIEWS IN spark_catalog.default").show()
spark.sql("SELECT * FROM spark_catalog.default.berry_view").show()
Output:
+---------+----------+-----------+
|namespace| viewName|isTemporary|
+---------+----------+-----------+
| default|berry_view| false|
+---------+----------+-----------+
+----------+-----+-----+
| Name|Color|Taste|
+----------+-----+-----+
|Strawberry| red| 3|
+----------+-----+-----+
Ausbau und Personalisierung von externen Hive Metastores
Wie in diesem Artikel beschrieben, gibt es für Spark Metadaten Kataloge viele verschiedene Konfigurationsmöglichkeiten. Einige Anbieter, wie Apache Iceberg, stellen sogar eigene Packages und Klassen zur Verfügung, um die Funktionen des Metadatenkataloges zu erweitern. Die Funktionen der Kataloge an sich können beliebig ausgebaut werden. Leider ist die Dokumentation für die Konfiguration und den Umgang mit den Metastores oft lückenhaft und nicht besonders detailliert. Wir hoffen, dass wir Ihnen mit diesem Artikel das Rüstzeug vermitteln konnten, damit Sie in Zukunft erfolgreich mit Spark und Apache Iceberg arbeiten können.