Heute möchte ich mei­nen ers­ten Ein­druck von Micro­soft Fabric Lake­house mit Ihnen tei­len. Micro­soft Fabric ist ein neues Pro­dukt im Ange­bot von Micro­soft, das sich der­zeit in der Pre­view-Phase befin­det. Sie kön­nen sich jedoch regis­trie­ren und es 60 Tage lang tes­ten. Fabric ent­hält meh­rere Kom­po­nen­ten wie Data Fac­tory, One­Lake Data, Ware­house und Lake­house, die ich in die­sem Bei­trag vor­stel­len werde.

Die Lake­house-Platt­form stützt sich voll­stän­dig auf die Spark-Engine. Spark selbst bie­tet Fle­xi­bi­li­tät und ermög­licht es uns, ver­schie­dene Pro­gram­mier­spra­chen wie Spark, Scala, SQL oder Python in Note­books für Daten­ope­ra­tio­nen mit den in One­Lake gespei­cher­ten Dateien zu ver­wen­den. Es ist erwäh­nens­wert, dass Lake­house und Ware­house zwar einige Gemein­sam­kei­ten auf­wei­sen, aber auch einen bedeu­ten­den Unter­schied haben. Das Ware­house bie­tet ein umfas­sen­des T‑SQL-Erleb­nis, wäh­rend Lake­house in ers­ter Linie als Spark-basier­tes Engi­nee­ring-Tool dient.

Fabric Lake­house erstellen

Sie kön­nen ein neues Lake­house erstel­len, indem Sie im Menü auf der lin­ken Seite auf die Schalt­flä­che „Erstel­len“ klicken.

Nach eini­ger Zeit sollte Ihr Lake­house fer­tig sein. Über die­ses Menü (Bild­schirm unten) kön­nen Sie auf Tabel­len und Dateien in Ihrem Lake­house zugrei­fen und neue Ele­mente wie Daten­pipe­lines (Data Fac­tory), Notiz­bü­cher und Dataf­lows (die neue Ver­sion von Dataf­low in Data Fac­tory) erstellen.

Die Aus­füh­rung von Code in Notizbüchern

Die­ser Bei­trag wird sich auf die Python/S­park-Funk­tio­nen in Fabric kon­zen­trie­ren, also wähle ich „Neues Notiz­buch“ aus dem obe­ren Menü, um mit der Code­ent­wick­lung zu beginnen.

Ich habe ein Notiz­buch erstellt und einen Code geschrie­ben, um mit Python Daten aus dem Inter­net zu impor­tie­ren, den Sie unten finden.

# Welcome to your new notebook
# Type here in the cell editor to add code!
from datetime import datetime, timedelta
import requests
import json
import pandas as pd
from builtin.utils import create_array_first_last_day_of_year

LINK = "/lakehouse/default/Files/nbp/"

def import_gold_prices(date: tuple) -> list[(str, str)]:
    """
    This function returns an array of date and gold price.

    Args:
        date: tuple: date range.

    Returns:
        list[str, str]: list of prices.
    """
    url = f"https://api.nbp.pl/api/cenyzlota/{date[0]}/{date[1]}"
    response = requests.get(url)
    response.raise_for_status()
    data = json.loads(response.text)
    gold_prices = []

    for cena_zlota in data:
        date = cena_zlota["data"]
        price = cena_zlota["cena"]
        gold_prices.append((date, price))

    return gold_prices

def import_usd_prices(date: tuple[str, str]) -> list[(str, str)]:
    """
    This function returns an array of date and usd price.

    Args:
        date: tuple: date range.

    Returns:
        list[str, str]: list of prices.
    """
    url = f"https://api.nbp.pl/api/exchangerates/rates/a/usd/{date[0]}/{date[1]}/"
    response = requests.get(url)
    response.raise_for_status()
    data = json.loads(response.text)
    prices = []

    for cena_usd in data["rates"]:
        date = cena_usd["effectiveDate"]
        price = cena_usd["mid"]
        prices.append((date, price))
        
    return prices

def save_gold_df(dest_path: str) -> None:
    start_year = 2013 
    end_year = 2023
    first_last_days_of_years = create_array_first_last_day_of_year(start_year, end_year)
    arr = []

    for date in first_last_days_of_years:
            
        arr_gold = import_gold_prices(date)
        arr.extend(arr_gold)

    pd.DataFrame(arr, columns=["date","price"]).to_csv(f'{dest_path}gold.csv', index=False)   

   
def save_usd_df(dest_path: str) -> None:
    """
    Saves USD dataframe

    Args:
    Returns:
    """
    start_year = 2006
    end_year = 2023
    first_last_days_of_years = create_array_first_last_day_of_year(start_year, end_year)
    arr = []


    for date in first_last_days_of_years:
        gold_prices_data = import_usd_prices(date)
        arr.extend(gold_prices_data)
        pd.DataFrame(arr, columns=["date","price"]).to_csv(f'{dest_path}usd.csv', index=False) 

def import_flat_price(dest_path) -> None:
    url = "https://static.nbp.pl/dane/rynek-nieruchomosci/ceny_mieszkan.xlsx"
    response = requests.get(url)
    response.raise_for_status()
    with open(f"{dest_path}flat_prices.xlsx", "wb") as file:
        file.write(response.content)

# 
if __name__ == "__main__":
    save_gold_df("/lakehouse/default/Files/nbp/")
    save_usd_df("/lakehouse/default/Files/nbp/")
    import_flat_price("/lakehouse/default/Files/nbp/")

Um den Code aus­zu­füh­ren, habe ich auf die Schalt­flä­che „Alles aus­füh­ren“ geklickt.

Die Geschwin­dig­keit der Aus­füh­rung war beein­dru­ckend. Anstatt minu­ten­lang zu war­ten, wie bei Azure Syn­apse Spark Pool oder Dat­ab­ricks, dau­erte es nur 5 Sekun­den, um mein Skript zu star­ten und aus­zu­füh­ren. Der Bild­schirm unten zeigt das Ergeb­nis der Code-Aus­füh­rung. Wie Sie sehen kön­nen, sind die Dateien im rech­ten Menü zugäng­lich. Ich kann auf sie in mei­nem Note­book über ein Menü im Linux-Stil zugreifen.

Schön, die Dateien wur­den impor­tiert, und jetzt brau­che ich ein Skript, um sie zu berei­ni­gen und in das erwar­tete For­mat zu kon­ver­tie­ren. Ich habe ein wei­te­res Note­book und ein Skript im sel­ben Lake­house erstellt. Ich konnte keine Option fin­den, die es mir erlaubt, Abhän­gig­kei­ten auf dem Clus­ter zu instal­lie­ren, aber ich kann den Befehl „pip install“ aus­füh­ren, um die erfor­der­li­chen Python-Biblio­the­ken in mein Note­book zu importieren.

pip install duckdb
import pandas as pd 
import duckdb
from builtin.utils import convert_to_last_day_of_quarter, generate_days_in_years


def clean_flats(dest_path):
    flats = pd.read_excel(f"{dest_path}flat_prices.xlsx", header=6, usecols="X:AO", sheet_name="Rynek pierwotny")
    cities = ['Białystok','Bydgoszcz','Gdańsk','Gdynia','Katowice','Kielce','Kraków','Lublin','Łódź','Olsztyn','Opole',
    'Poznań','Rzeszów','Szczecin','Warszawa','Wrocław','Zielona Góra']
    flats.columns = flats.columns.str.replace(".1", "")
    flats.columns = flats.columns.str.replace("*", "")

    flats = flats[flats['Kwartał'].notna()]
    flats_unpivot = pd.melt(flats, id_vars='Kwartał', value_vars=cities)
    flats_unpivot['date'] = flats_unpivot.apply(lambda row: convert_to_last_day_of_quarter(row['Kwartał']),axis=1)
    flats_unpivot['date'] = pd.to_datetime(flats_unpivot['date'])
    flats_unpivot['city'] = flats_unpivot['variable']
   
    flats_unpivot.to_parquet(f"{dest_path}flats_price.parquet")


def clean_currency(dest_path):
    
    gold = pd.read_csv(f"{dest_path}gold.csv")
    gold['date'] = pd.to_datetime(gold['date'])
    gold['currency'] = 'gold'
    calendar = pd.DataFrame(generate_days_in_years(2006,2023), columns=["date","last_date"])
    calendar['date'] = pd.to_datetime(calendar['date'])
    usd = pd.read_csv(f"{dest_path}usd.csv")
    usd['date'] = pd.to_datetime(usd['date'])
    usd['currency'] = 'usd'

    currency = pd.concat([gold, usd], ignore_index=True, sort=False)
    # fill gups
    usd = duckdb.sql("""
                     
                     select
                         date,
                         price,
                         currency,
                     from
                     (
                     select
                     row_number() over (partition by currency, a.date order by b.date desc) lp,
                     a.date,
                     b.date org_date,
                     b.price,    
                     currency,
                     from
                     calendar a left join currency b on b.date between a.date - INTERVAL 3 DAY and a.date 
                     
                     )
                     WHERE
                     lp = 1
                     order by date
                     """).to_df()

    usd.to_parquet(f"{dest_path}currency.parquet")

if __name__ == "__main__":
    clean_flats("/lakehouse/default/Files/nbp/")
    clean_currency("/lakehouse/default/Files/nbp/")
    clean_currency("/lakehouse/default/Files/nbp/")

Wie­derum wurde mein Code inner­halb weni­ger Sekun­den aus­ge­führt, und ich konnte die neuen Dateien in mei­nem Lake­house sehen.

Schließ­lich kann ich mei­nen Bericht auf der Grund­lage der gesam­mel­ten und umge­wan­del­ten Dateien erstel­len. Ich habe das Notiz­buch „Bericht“ erstellt und den fol­gen­den Code aus­ge­führt, um die Berichts­da­tei zu erstellen.

pip install duckdb
import pandas as pd 
import duckdb
from builtin.utils import convert_to_last_day_of_quarter, generate_days_in_years


def report(source_path):
     calendar = pd.DataFrame(generate_days_in_years(2006,2023), columns=["date","last_date"])
     flats_price = pd.read_parquet(f"{source_path}flats_price.parquet")
     currency = pd.read_parquet(f"{source_path}currency.parquet")

     df_data = duckdb.sql("""
               select 
                    a.date, 
                    a.value  flat_price, 
                    b.price gold, 
                    b.price*31 ounce, 
                    c.price usd,
                    a.value / c.price flat_price_usd,
                    a.value  / (b.price * 31) flat_price_gold,
                    (a.value - lag(a.value) over (order by a.date))/lag(a.value) over (order by a.date) m2mgrow
                                                
               from 
               flats_price a 
               left join currency b on a.date = b.date and b.currency = 'gold'
               left join currency c on a.date = c.date and c.currency = 'usd'
               where
               city = 'Warszawa'
               order by a.date
               """).to_df()

     df_data.to_csv(f"{source_path}data.csv", encoding='utf-8', index=False)
     df_data.to_parquet(f"{source_path}flats_report.parquet")


if __name__ == "__main__":
     report("/lakehouse/default/Files/nbp/")

Wie Sie viel­leicht bemer­ken, ver­wende ich Spark nicht, weil ich einen Code repli­ziere, den ich lokal auf mei­nem Com­pu­ter ent­wi­ckelt und getes­tet habe. Außer­dem ist nicht jeder Data Sci­en­tist mit Spark ver­traut, und es gibt Anwen­dungs­fälle, in denen es nicht benö­tigt wird.

Wenn Sie es jedoch vor­zie­hen, mit Apa­che Spark zu arbei­ten, kön­nen Sie natür­lich Spark-Befehle ver­wen­den, um Daten umzu­wan­deln, wie im fol­gen­den Bei­spiel gezeigt. Wenn Sie Ihre Dateien im Delta-For­mat spei­chern, ste­hen Ihnen außer­dem Tabel­len in der Lake­house-Struk­tur unter dem Kno­ten „tables“ zur Verfügung.

df = spark.read.parquet("Files/nbp/flats_price.parquet")
df.write.mode("overwrite").saveAsTable("flats_report")
df = spark.read.parquet("Files/nbp/currency.parquet")
df.write.mode("overwrite").saveAsTable("dim_currency")

Fabric Pipe­lines

Wenn wir die Ent­wick­lung des Note­books abge­schlos­sen haben, kön­nen wir mit der Pro­zes­sor­ches­trie­rung unter Ver­wen­dung von Fabric fort­fah­ren. Fabric unter­stützt dies durch das Modul Data Fac­tory, das eine breite Palette von Akti­vi­tä­ten bie­tet. Zu die­sen Akti­vi­tä­ten gehö­ren das Kopie­ren von Daten aus exter­nen Quel­len, die Aus­füh­rung gespei­cher­ter Pro­ze­du­ren, die Ite­ra­tion durch Ele­mente, die Aus­füh­rung von Notiz­bü­chern und vie­les mehr.

Hier sehen Sie den Fluss, den ich für die Orches­trie­rung mei­nes Pro­zes­ses zur Ana­lyse von Pau­schal­prei­sen erstellt habe. Mit­hilfe von Pipe­lines kön­nen wir Abhän­gig­kei­ten zwi­schen den Schrit­ten des Pro­zes­ses herstellen.

SQL End­point

Die Funk­tion, die mich am meis­ten inter­es­siert, ist der SQL-End­punkt, bei dem es sich um ein auto­ma­tisch aus einem Lake­house gene­rier­tes Ware­house han­delt. Damit kann ich eine von Spark erstellte Tabelle mit SQL abfra­gen und sie dann mit Power BI nut­zen. Unten fin­den Sie einen Bericht, den ich auf der Grund­lage der Aus­gabe des Berichts­no­tiz­buchs erstellt habe. Es ist wirk­lich erstaun­lich, wie schnell man ver­schie­dene Tech­no­lo­gien mit­ein­an­der ver­bin­den kann, um seine Daten zu visualisieren.

Anmer­kung:

Um die Dateien mit SQL End­point abzu­fra­gen, müs­sen Sie sie im Delta-For­mat speichern.

df = spark.read.parquet("Files/nbp/flats_report.parquet")
# df now is a Spark DataFrame containing parquet data from "Files/nbp/flats_report.parquet".
df.write.format("delta").mode("overwrite").saveAsTable("flats_report1")

Zusam­men­fas­sung

Micro­soft Fabric befin­det sich noch in der Vor­schau­phase, aber es scheint sich in Rich­tung einer ein­heit­li­chen Umge­bung für die Arbeit mit Daten inner­halb Ihres Unter­neh­mens zu bewe­gen. Diese Lösung zielt dar­auf ab, die Inte­gra­tion von Data Engi­nee­ring, Data Sci­ence und Power BI Report­ing zu erleich­tern. Es ist jedoch anzu­mer­ken, dass sie noch nicht pro­duk­ti­ons­reif ist und einige Ver­bes­se­run­gen erfor­dert. Kleine Aktua­li­sie­run­gen sind erfor­der­lich, um die Benut­zer­freund­lich­keit zu ver­bes­sern; ich habe zum Bei­spiel eine Weile gebraucht, um her­aus­zu­fin­den, wie man den Namen eines Notiz­buchs ändert. Außer­dem ist es wich­tig zu erwäh­nen, dass die Notiz­bü­cher nicht die­sel­ben Res­sour­cen nut­zen, so dass man für jedes Notiz­buch sepa­rate Dateien bereit­stel­len muss (z. B. ein Python-Modul). Ich frage mich, was mit bestehen­den Tools wie ADF und Azure Syn­apse in Bezug auf die Fabric-Imple­men­tie­rung pas­sie­ren wird. Nichts­des­to­trotz arbei­tet Micro­soft aktiv daran, und wir kön­nen erwar­ten, neue Funk­tio­nen in die­sem leis­tungs­star­ken Tool zu sehen.

Quelle: medium.com

Erfah­ren Sie mehr über Lösun­gen im Bereich Data Manage­ment oder besu­chen Sie eines unse­rer kos­ten­lo­sen Web­i­nare.