Posi­tio­nie­rung von Spark und Ice­berg im Data Lake

Spark ist eine viel­sei­tige Platt­form zum Trans­for­mie­ren und Ana­ly­sie­ren von gros­sen Data­sets. In der Regel wer­den Daten aus Files (Par­quet, Avro) gele­sen, ver­ar­bei­tet und in neuen Files abge­legt. Um die Admi­nis­tra­tion die­ser Files zu erleich­tern, ins­be­son­dere wenn mit struk­tu­rier­ten Daten gear­bei­tet wird, kommt ein Metastore zum Ein­satz. Die­ser bie­tet eine Abs­trak­ti­ons­ebene zwi­schen logisch ange­leg­ten Tabel­len und den effek­tiv hin­ter­leg­ten Files. 

Apa­che Ice­berg ist ein neues File For­mat, wel­ches immer häu­fi­ger in Data lakes ein­ge­setzt wird. Als Ein­füh­rung zu Ice­berg haben wir vor kur­zen bereits einem Blog­bei­trag ver­öf­fent­licht: saracus Blog: Apa­che Ice­berg, eine revo­lu­tio­näre Lösung zur Ver­wal­tung von Data Lakes. Ice­berg nutzt ein aus­ge­klü­gel­tes Sys­tem an Meta­da­ten Files, um seine neuen Fea­tures wie Time-Tra­vel oder Dele­tes auf File-basier­ten Tabel­len zu ermög­li­chen. Neben den File-basier­ten Meta­da­ten kommt bei Ice­berg ein Metastore zum Ein­satz. Damit das Zusam­men­spiel zwi­schen den Metasto­res von Spark und Ice­berg kor­rekt funk­tio­niert, sind einige Anpas­sun­gen in der Spark Con­fig not­wen­dig. Die Doku­men­ta­tion hier­für ist spär­lich und Erfah­rungs­werte sind nur bedingt online zu fin­den. Ins­be­son­dere der Umgang mit Spark Cata­logs ist nur vage doku­men­tiert. Des­halb haben wir hier unser Wis­sen zusam­men­ge­tra­gen, damit Sie erfolg­reich mit Spark Cata­logs und dem Ice­berg Tabel­len­for­mat arbei­ten können.

Ver­wen­dung des inter­nen Meta­da­ten-Kata­logs in Spark

Per Default wird in Spark ein in-memory Data Cata­log ver­wen­det, in wel­chem Meta­da­ten abge­legt wer­den kön­nen. Ana­log zur Imple­men­ta­tion in dat­ab­ricks wer­den soge­nannte three-tier Name­spaces genutzt, um ein Objekt im Cata­log ein­deu­tig zu iden­ti­fi­zie­ren. Der default in-memory Cata­log wird über den Namen spark_catalog angesteuert:

# Show available catalogs, databases and tables
spark.sql("SHOW CATALOGS").show()
spark.sql("SHOW DATABASES IN spark_catalog").show()
spark.sql("SHOW TABLES IN spark_catalog.default").show()
Output:
+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+
+---------+
|namespace|
+---------+
|  default|
+---------+
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

Die häu­figste Ver­wen­dung für den in-memory Cata­log ist das Erstel­len von tem­po­rä­ren Views. Diese wer­den auto­ma­tisch im default Spark Cata­log, hier «spark_catalog» genannt, regis­triert und wer­den nach Been­den der Spark Ses­sion wie­der gelöscht.

# Read in source data
berries_df = spark.read.option("header",True).csv("s3a://some_bucket/berries.csv")
# Create TempView and select from it
berries_df.createOrReplaceTempView("all_berries_temp")
spark.sql("SHOW TABLES IN spark_catalog.default").show()
spark.sql("SELECT * FROM all_berries_temp").show()
Output:
+---------+----------------+-----------+
|namespace|       tableName|isTemporary|
+---------+----------------+-----------+
|         |all_berries_temp|       true|
+---------+----------------+-----------+
+------------+------+-----+
|        Name| Color|Taste|
+------------+------+-----+
|  Strawberry|   red|    3|
|   Blueberry|  blue|    3|
|Blackcurrent|  blue|    2|
|    Physalis|orange|    1|
+------------+------+-----+

Der Nach­teil die­ser Lösung ist, dass die Meta­da­ten des in-memory Cata­logs nur für die aktive Spark Ses­sion ver­füg­bar sind. Eine andere Spark Ses­sion hat kei­nen Zugang zu den Meta­da­ten. Auch andere Ser­vices kön­nen dar­auf nicht zugrei­fen. Sol­len die Meta­da­ten per­sis­tiert wer­den und für andere Ser­vices zugäng­lich sein, gibt es die Mög­lich­keit einen exter­nen Metastore, z.B. einen Hive Metastore, zu verwenden.

Ver­wen­dung eines exter­nen Hive Metasto­res in Spark

Seit eini­ger Zeit ver­wen­det Apa­che Hive den Hive Metastore, mit dem Meta­da­ten zen­tral geführt wer­den. Der Hive Metastore unter­stützt sowohl Tabel­len als auch Views. Im Gegen­satz zu den tem­po­rä­ren Views sind die eigent­li­chen Views per­sis­tent und ste­hen dadurch auch ande­ren Appli­ka­tio­nen zur Ver­fü­gung. Tabel­len und per­sis­tierte Views kön­nen mit Spark SQL erstellt wer­den, wie wei­ter unten im Bei­spiel gezeigt wird. Alter­na­tiv kann die pyspark.sql.Catalog.createTable Syn­tax zur Erstel­lung von Tabel­len ver­wen­det werden.

Zur Ver­wen­dung eines exter­nen Hive Metasto­res in Spark muss die Spark Con­fig ange­passt wer­den. Der default spark_catalog kann direkt durch den HMS ersetzt wer­den. Damit wird sicher­ge­stellt, dass in jedem Falle mit dem exter­nen HMS gear­bei­tet wird und es nicht zu Ver­wechs­lun­gen mit dem in-memory Cata­log kommt. Beim Auf­ruf von Spark kön­nen die fol­gen­den Argu­mente mit­ge­ge­ben werden:

spark = (SparkSession
         .builder
         .appName("TEST-APP")
         .config("spark.sql.catalogImplementation","hive")
         .config("spark.hadoop.hive.metastore.uris","thrift://METASTORE:9083")
         .getOrCreate()
        )

Mit die­ser Kon­fi­gu­ra­tion kann nun die Erstel­lung von Tabel­len und Views im Hive Metastore demons­triert werden:

# Create table in default warehouse location
spark.sql("CREATE TABLE spark_catalog.default.berry_table AS SELECT * FROM all_berries_temp")
# Create view based on newly defined table
spark.sql("CREATE VIEW spark_catalog.default.berry_view AS SELECT * FROM spark_catalog.default.berry_table where color = 'blue'")
spark.sql("SELECT * FROM spark_catalog.default.berry_view").show()
Output:
+------------+-----+-----+
|        Name|Color|Taste|
+------------+-----+-----+
|   Blueberry| blue|    3|
|Blackcurrent| blue|    2|
+------------+-----+-----+

Im Gegen­satz zum in-memory Cata­logs, sind auch nach Been­den der Spark Ses­sion die so erstell­ten Tabel­len und Views im Metastore per­sis­tiert und von einer neuen Ses­sion aus abrufbar.

Apa­che Ice­berg und der Metadatenkatalog

Wie bereits in der Ein­füh­rung erwähnt, greift auch das Ice­berg Tabel­len­for­mat auf einen Metastore zurück. Hier kann ein Hive Metastore ver­wen­det wer­den. Spark benö­tigt ver­schie­dene Packa­ges, um Tabel­len im Ice­berg For­mat zu schrei­ben. Diese müs­sen vor­gän­gig instal­liert und in der Spark Con­fig mit­ge­ge­ben wer­den. Dies wird in den Grund­zü­gen in der Ice­berg Doku­men­ta­tion auf­ge­zeigt: Apa­che Ice­berg, Spark Configuration

Die zuvor gezeigte Spark Con­fig muss ange­passt wer­den, damit beim Schrei­ben im Ice­berg For­mat die kor­rek­ten Packa­ges ver­wen­det wer­den. Auf Emp­feh­lung des Her­stel­lers wird neben dem eigent­li­chen Ice­berg Cata­log ein soge­nann­ter Ses­sion Cata­log ver­wen­det. So kann sowohl mit Ice­berg als auch kon­ven­tio­nel­len Objek­ten im HMS gear­bei­tet werden.

spark = (SparkSession
        .builder
        .appName("TEST-APP")
        .config("spark.sql.catalog.iceberg_catalog","org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.iceberg_catalog.type","hive")
        .config("spark.sql.catalog.iceberg_catalog.uri","thrift://METASTORE:9083")
        .config("spark.sql.catalog.spark_catalog.type","hive")
        .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")
        .getOrCreate()
        )

Der Name des Ice­berg Cata­logs wird in der Spark Con­fig gesetzt und kann belie­big ange­passt wer­den. Auch hier wird die three-tier Name­space Nomen­kla­tur ver­wen­det, um Tabel­len und Views zu referenzieren.

# Read & Write to/from iceberg using the iceberg catalog
spark.sql("CREATE TABLE iceberg_catalog.default.berry_iceberg_table USING ICEBERG AS SELECT * FROM all_berries_temp where taste > 2")
spark.sql("SELECT * FROM iceberg_catalog.default.berry_iceberg_table").show()
Output:
+----------+-----+-----+
|      Name|Color|Taste|
+----------+-----+-----+
|Strawberry|  red|    3|
| Blueberry| blue|    3|
+----------+-----+-----+

Der Nach­teil an die­sem Setup ist, dass die Cata­log Klasse für den Ses­sion Cata­log «org.apache.iceberg.spark.SparkSessionCatalog» keine Views unter­stützt. Es gibt zwar Pläne, dies in Zukunft umzu­set­zen, im Moment ist dies jedoch nicht direkt mög­lich. Beim Anzei­gen von Views erhält man eine ent­spre­chende Fehlermeldung:

# Working with views in the session catalog results in error messages
try:
    spark.sql("SHOW VIEWS IN iceberg_catalog.default").show()
except AnalysisException as e:
    print(e)
Output:
Catalog iceberg_catalog does not support views.

Cata­log-über­grei­fende Queries ermög­li­chen Views auf Ice­berg Tabellen

Glück­li­cher­weise muss man nicht auf das Apa­che Ice­berg Ent­wick­ler­team war­ten, um Views nativ auf Ice­berg nut­zen zu kön­nen. Mit der rich­ti­gen Spark Kon­fi­gu­ra­tion und der Ver­wen­dung eines zusätz­li­chen Cata­logs ist es trotz der oben beschrie­be­nen Limi­ta­tion mög­lich, Views auf eine Ice­berg Tabelle aufzusetzen.

spark = (SparkSession
         .builder
         .appName("TEST-APP")
         .config("spark.sql.catalog.iceberg_catalog","org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.iceberg_catalog.type","hive")
         .config("spark.sql.catalog.iceberg_catalog.uri","thrift://METASTORE:9083")
         .config("spark.sql.catalogImplementation","hive")
         .config("spark.hadoop.hive.metastore.uris","thrift://METASTORE:9083")
         .getOrCreate()
        )

Die hier ange­ge­bene Spark Con­fig ist eine Kom­bi­na­tion der bereits zuvor gezeig­ten Code Snip­pets. Der iceberg_catalog stellt die Ice­berg Funk­tio­na­li­tä­ten zur Ver­fü­gung. Im zwei­ten Teil der Con­fig wird aber nicht der org.apache.iceberg.spark.SparkSessionCatalog ver­wen­det, son­dern ein regu­lä­rer, exter­ner Hive Metastore. Die­ser ersetzt den Default spark_catalog und kann für externe Tabel­len, aber auch Views ver­wen­det werden.

Wie an der Con­fig zu sehen ist, zei­gen beide defi­nier­ten cata­logs auf den­sel­ben HMS. Dies ist pro­blem­los mög­lich. In bei­den Cata­logs sind die­sel­ben Tabel­len zu sehen. Beim Lesen und Schrei­ben muss dar­auf Acht gege­ben wer­den, dass der kor­rekte three-tier Name­space ver­wen­det wird. Im spark_catalog kön­nen nun Views erstellt und abge­fragt wer­den, wel­che auf Ice­berg Tabel­len im iceberg_catalog zugreifen.

# Create and read view built on an iceberg table
spark.sql("CREATE VIEW spark_catalog.default.berry_view AS SELECT * FROM iceberg_catalog.default.berry_iceberg_table WHERE color = 'red'")
spark.sql("SHOW VIEWS IN spark_catalog.default").show()
spark.sql("SELECT * FROM spark_catalog.default.berry_view").show()
Output:
+---------+----------+-----------+
|namespace|  viewName|isTemporary|
+---------+----------+-----------+
|  default|berry_view|      false|
+---------+----------+-----------+
+----------+-----+-----+
|      Name|Color|Taste|
+----------+-----+-----+
|Strawberry|  red|    3|
+----------+-----+-----+

Aus­bau und Per­so­na­li­sie­rung von exter­nen Hive Metastores

Wie in die­sem Arti­kel beschrie­ben, gibt es für Spark Meta­da­ten Kata­loge viele ver­schie­dene Kon­fi­gu­ra­ti­ons­mög­lich­kei­ten. Einige Anbie­ter, wie Apa­che Ice­berg, stel­len sogar eigene Packa­ges und Klas­sen zur Ver­fü­gung, um die Funk­tio­nen des Meta­da­ten­ka­ta­lo­ges zu erwei­tern. Die Funk­tio­nen der Kata­loge an sich kön­nen belie­big aus­ge­baut wer­den. Lei­der ist die Doku­men­ta­tion für die Kon­fi­gu­ra­tion und den Umgang mit den Metasto­res oft lücken­haft und nicht beson­ders detail­liert. Wir hof­fen, dass wir Ihnen mit die­sem Arti­kel das Rüst­zeug ver­mit­teln konn­ten, damit Sie in Zukunft erfolg­reich mit Spark und Apa­che Ice­berg arbei­ten können.