Einführung
LLMs haben sich in vielen Branchen und Anwendungsfällen als unglaublich vielseitig erwiesen. Die Umwandlung von Daten ist ein großartiges Beispiel dafür, wo LLMs glänzen können. In diesem Artikel wird Daniel Myers von Snowflake erläutern, wie LLMs JSON-Objekte auf konsistente Weise transformieren können und wie diese Transformationen in Snowflake in großem Umfang angewendet werden können.
Überblick über die Architektur
Die folgende Architektur zeigt auf einer hohen Ebene, wie diese Datenpipeline aussieht. Insbesondere in den mit 2 und 3 gekennzeichneten Teilen wird ein LLM zur Transformation der eingehenden Daten verwendet.
Kurz gesagt, die Reihenfolge der Vorgänge umfasst:
- Speichern von JSON-Daten in einem AWS S3-Bucket
- Der Snowpipe-Service von Snowflake lauscht auf S3-Ereignisse und liest diese Daten automatisch in eine Tabelle ein (wir nennen dies raw)
- Mithilfe von Snowflake Streams und Tasks wandeln wir die eingehenden JSON-Rohdaten um und speichern sie in einer neuen Tabelle (wir nennen sie transformiert).
- Innerhalb der Aufgabe, die wir für diese Umwandlung definieren, rufen wir den LLM als externe Snowflake-Funktion mit einer vordefinierten Eingabeaufforderung auf und übergeben ihm eine Zeile aus dem Snowflake-Stream.
- Der LLM antwortet mit dem transformierten JSON und wir speichern dieses neue JSON-Objekt in einem VARIANT-Spaltentyp innerhalb der transformierten Tabelle.
LLM Prompt Engineering für Datenpipelines
Der erste Teil dieses Prozesses besteht darin, einen LLM-Prompt zu entwickeln und zu testen, den wir zur dynamischen Generierung von JSON-Transformationen verwenden können, und ihn dann für die Verwendung in unseren Anwendungen zu schablonisieren. Dieser Prozess des Definierens, Verfeinerns und Testens von LLM-Prompts für bestimmte Aufgaben wird Prompt Engineering genannt.
Hinweis: LLMs können einige erstaunliche Dinge tun, aber ihre Ausgaben sind nicht garantiert zu 100% konsistent (oder korrekt!), d.h. wenn Sie einen LLM bitten, den Durchschnitt zu berechnen, könnte er die Antwort falsch geben. Seien Sie sich dessen bewusst, wenn Sie LLMs für kritische Datenpipelines verwenden, bei denen Genauigkeit wichtig ist.
Für den Anwendungsfall der JSON-Transformation hat Daniel Myers unten eine Aufforderung zusammengestellt. Lassen Sie uns genau aufschlüsseln, was in jedem Teil der Aufforderung passiert und warum jeder Teil dieser Aufforderung benötigt wird.
Beispiel LLM Aufforderung:
Your job is to transform JSON objects. You will get a sample input and sample output, and you will apply the same transformation to a new JSON object. This transformation takes the average of the numbers in the 'temps' array and returns it as a new field 'avgTemp'
Here's an example of how your input and output should look:
Sample Input:
{
"device": "124233",
"location": "texas",
"temps": [0,5,10,15,20]
}
Expected Output:
{
"device": "124233",
"location": "texas",
"avgTemp": 10
}
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.
{
"device": "231231",
"location": "mars",
"temps": [0,5,10,15,20,25,30]
}
Es gibt einige Teile in dieser Aufforderung, die wichtig zu verstehen sind:
Your job is to transform JSON objects. Daniel will give you sample input and sample output, and you will apply the same transformation to a new JSON object.
Dies gibt dem LLM einen klaren Kontext, was sein Zweck ist. In diesem Fall sollte der LLM nicht erklären, was JSON ist, er sollte nicht beschreiben, was in dem JSON-Objekt enthalten ist, das wir ihm zur Verfügung stellen, und er sollte keinen Code schreiben, der die Umwandlung für uns durchführen kann – sein Zweck ist es, das JSON selbst umzuwandeln und es an uns zurückzugeben.
This transformation takes the average of the numbers in the ‘temps’ array and returns it as a new field ‘avgTemp’
Dies ist ein wichtiger Aspekt der Eingabeaufforderung. Eine einzelne Beispieleingabe und ‑ausgabe kann die Logik hinter der Transformation selbst nicht erfassen oder genau beschreiben, so dass die Transformation auf neue JSON-Objekte angewendet werden kann. Die Beschreibung der Transformation in einfacher Sprache hilft dabei, Unter- oder Überschreitungen dessen, was eine einzelne Beispieleingabe und ‑ausgabe zeigt, abzuschwächen.
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.
Diese abschließende Beschreibung vor der Bereitstellung des neuen JSON-Objekts macht dem LLM klar, dass er nichts anderes tun soll, als das umgewandelte JSON zurückzugeben – keine Erklärungen usw. Wir brauchen den zurückgegebenen Text als gültiges JSON für unsere Datenpipeline.
Snowflake Externe Funktionen & LLMs
Rufen wir nun die Eingabeaufforderung mit ChatGPT von OpenAI auf, um diese Transformationen in Snowflake durchzuführen! Dazu werden wir eine Snowflake External Function verwenden. Eine externe Funktion ist eine Möglichkeit, eine API außerhalb der Snowflake-Umgebung aufzurufen.
Um diese externe Funktion einzurichten, benötigen Sie einen API-Schlüssel von OpenAI. Eine ausführliche Anleitung finden Sie in diesem Artikel über die Einrichtung der Integration.
Sobald Sie Ihre Schlüssel haben und die externe Funktion eingerichtet haben, können Sie die REST-API von OpenAI direkt von Snowflake SQL aus aufrufen, etwa so:
select OPENAI_EXT_FUNC(‘Classify this sentiment: OpenAI is Awesome!’)::VARIANT:choices[0]:text as response;
LLMs in Snowflake Streams und Tasks
Anhand dieser Aufforderung richten wir einen Snowflake-Task ein, um eine externe Funktion aufzurufen und Daten zu übergeben, die wir umwandeln möchten:
-- Create a landing table to store raw JSON data.
-- Snowpipe could load data into this table.
create or replace table raw (var variant);
-- Create a table that we will store the transformed data
create or replace table transformed (var variant);
-- Create a stream to capture inserts to the landing table.
-- A task will consume a set of columns from this stream.
create or replace stream rawstream1 on table raw;
-- Create a task that inserts new transformed device records from the rawstream1 stream into the transformed table
-- every minute when the stream contains records.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task transform_json
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream1')
as
insert into transformed
select OPENAI_EXT_FUNC($$Your job is to transform JSON objects. I will give you a sample input and sample output, and you will apply the same transformation to a new JSON object. This transformation takes the average of the numbers in the 'temps' array and returns it as a new field 'avgTemp'
Here's an example of how your input and output should look:
Sample Input:
{
"device": "124233",
"location": "texas",
"temps": [0,5,10,15,20]
}
Expected Output:
{
"device": "124233",
"location": "texas",
"avgTemp": 10
}
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.
$$||rawstream1.var)::VARIANT:choices[0]:text as var from rawstream1;
-- Resume task.
alter task transform_json resume;
-- Insert a set of records into the landing table.
insert into raw
select parse_json(column1)
from values
('{"device": "124233","location": "texas","temps": [0,5,10,15,20]}'),
('{"device": "124243","location": "texas","temps": [0,5,10,15,20,25,30]}');
-- Query the change data capture record in the table streams
select * from rawstream1;
-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);
-- Query the table streams again.
-- Records should be consumed and no longer visible in streams.
-- Verify the records were inserted into the target tables.
select * from transformed;
-- Insert another set of records into the landing table.
insert into raw
select parse_json(column1)
from values
('{"device": "124123","location": "texas","temps": [90,95,100]}'),
('{"device": "123412","location": "texas","temps": [3,4,5]}');
-- Wait for the tasks to run.
call system$wait(70);
-- Records should be consumed and no longer visible in streams.
select * from rawstream1;
-- Verify the records were inserted into the target tables.
select * from transformed;
Aus dem obigen Code ergeben sich ein paar kritische Teile:
- Einrichten der Rohtabelle und der transformierten Tabellen.
- Richten Sie den rawstream1 auf der rohen Tabelle ein.
- Erstellen Sie die transform_json Aufgabe, die ausgeführt wird, wenn der Stream Daten enthält. Diese Aufgabe ruft unsere externe Funktion OPENAI_EXT_FUNC auf und speichert die Ergebnisse in der transformierten Tabelle.
- Wir überprüfen, ob alles funktioniert, indem wir einige Testdaten einfügen und die Stream-Ausgabe und die resultierenden transformierten Daten untersuchen.
Quelle: Snowflake.com
Erfahren Sie hier mehr über Lösungen im Bereich Snowflake oder besuchen Sie eines unserer kostenlosen Webinare.