Ein­füh­rung

LLMs haben sich in vie­len Bran­chen und Anwen­dungs­fäl­len als unglaub­lich viel­sei­tig erwie­sen. Die Umwand­lung von Daten ist ein groß­ar­ti­ges Bei­spiel dafür, wo LLMs glän­zen kön­nen. In die­sem Arti­kel wird Daniel Myers von Snow­flake erläu­tern, wie LLMs JSON-Objekte auf kon­sis­tente Weise trans­for­mie­ren kön­nen und wie diese Trans­for­ma­tio­nen in Snow­flake in gro­ßem Umfang ange­wen­det wer­den können.

Über­blick über die Architektur

Die fol­gende Archi­tek­tur zeigt auf einer hohen Ebene, wie diese Daten­pipe­line aus­sieht. Ins­be­son­dere in den mit 2 und 3 gekenn­zeich­ne­ten Tei­len wird ein LLM zur Trans­for­ma­tion der ein­ge­hen­den Daten verwendet.

Kurz gesagt, die Rei­hen­folge der Vor­gänge umfasst:

  1. Spei­chern von JSON-Daten in einem AWS S3-Bucket
  2. Der Snow­pipe-Ser­vice von Snow­flake lauscht auf S3-Ereig­nisse und liest diese Daten auto­ma­tisch in eine Tabelle ein (wir nen­nen dies raw)
  3. Mit­hilfe von Snow­flake Streams und Tasks wan­deln wir die ein­ge­hen­den JSON-Roh­da­ten um und spei­chern sie in einer neuen Tabelle (wir nen­nen sie transformiert).
  4. Inner­halb der Auf­gabe, die wir für diese Umwand­lung defi­nie­ren, rufen wir den LLM als externe Snow­flake-Funk­tion mit einer vor­de­fi­nier­ten Ein­ga­be­auf­for­de­rung auf und über­ge­ben ihm eine Zeile aus dem Snowflake-Stream.
  5. Der LLM ant­wor­tet mit dem trans­for­mier­ten JSON und wir spei­chern die­ses neue JSON-Objekt in einem VARI­ANT-Spal­ten­typ inner­halb der trans­for­mier­ten Tabelle.

LLM Prompt Engi­nee­ring für Datenpipelines

Der erste Teil die­ses Pro­zes­ses besteht darin, einen LLM-Prompt zu ent­wi­ckeln und zu tes­ten, den wir zur dyna­mi­schen Gene­rie­rung von JSON-Trans­for­ma­tio­nen ver­wen­den kön­nen, und ihn dann für die Ver­wen­dung in unse­ren Anwen­dun­gen zu scha­blo­ni­sie­ren. Die­ser Pro­zess des Defi­nie­rens, Ver­fei­nerns und Tes­tens von LLM-Prompts für bestimmte Auf­ga­ben wird Prompt Engi­nee­ring genannt.

Hin­weis: LLMs kön­nen einige erstaun­li­che Dinge tun, aber ihre Aus­ga­ben sind nicht garan­tiert zu 100% kon­sis­tent (oder kor­rekt!), d.h. wenn Sie einen LLM bit­ten, den Durch­schnitt zu berech­nen, könnte er die Ant­wort falsch geben. Seien Sie sich des­sen bewusst, wenn Sie LLMs für kri­ti­sche Daten­pipe­lines ver­wen­den, bei denen Genau­ig­keit wich­tig ist.

Für den Anwen­dungs­fall der JSON-Trans­for­ma­tion hat Daniel Myers unten eine Auf­for­de­rung zusam­men­ge­stellt. Las­sen Sie uns genau auf­schlüs­seln, was in jedem Teil der Auf­for­de­rung pas­siert und warum jeder Teil die­ser Auf­for­de­rung benö­tigt wird.

Bei­spiel LLM Aufforderung:

Your job is to transform JSON objects. You will get a sample input and sample output, and you will apply the same transformation to a new JSON object. This transformation takes the average of the numbers in the 'temps' array and returns it as a new field 'avgTemp'

Here's an example of how your input and output should look:

Sample Input:

{
  "device": "124233",
  "location": "texas",
  "temps": [0,5,10,15,20]
}
Expected Output:

{
  "device": "124233",
  "location": "texas",
  "avgTemp": 10
}
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.

{
  "device": "231231",
  "location": "mars",
  "temps": [0,5,10,15,20,25,30]
}

Es gibt einige Teile in die­ser Auf­for­de­rung, die wich­tig zu ver­ste­hen sind:

Your job is to transform JSON objects. Daniel will give you sample input and sample output, and you will apply the same transformation to a new JSON object.

Dies gibt dem LLM einen kla­ren Kon­text, was sein Zweck ist. In die­sem Fall sollte der LLM nicht erklä­ren, was JSON ist, er sollte nicht beschrei­ben, was in dem JSON-Objekt ent­hal­ten ist, das wir ihm zur Ver­fü­gung stel­len, und er sollte kei­nen Code schrei­ben, der die Umwand­lung für uns durch­füh­ren kann – sein Zweck ist es, das JSON selbst umzu­wan­deln und es an uns zurückzugeben.

This transformation takes the average of the numbers in the ‘temps’ array and returns it as a new field ‘avgTemp’

Dies ist ein wich­ti­ger Aspekt der Ein­ga­be­auf­for­de­rung. Eine ein­zelne Bei­spiel­ein­gabe und ‑aus­gabe kann die Logik hin­ter der Trans­for­ma­tion selbst nicht erfas­sen oder genau beschrei­ben, so dass die Trans­for­ma­tion auf neue JSON-Objekte ange­wen­det wer­den kann. Die Beschrei­bung der Trans­for­ma­tion in ein­fa­cher Spra­che hilft dabei, Unter- oder Über­schrei­tun­gen des­sen, was eine ein­zelne Bei­spiel­ein­gabe und ‑aus­gabe zeigt, abzuschwächen.

Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.

Diese abschlie­ßende Beschrei­bung vor der Bereit­stel­lung des neuen JSON-Objekts macht dem LLM klar, dass er nichts ande­res tun soll, als das umge­wan­delte JSON zurück­zu­ge­ben – keine Erklä­run­gen usw. Wir brau­chen den zurück­ge­ge­be­nen Text als gül­ti­ges JSON für unsere Datenpipeline.

Snow­flake Externe Funk­tio­nen & LLMs


Rufen wir nun die Ein­ga­be­auf­for­de­rung mit ChatGPT von Ope­nAI auf, um diese Trans­for­ma­tio­nen in Snow­flake durch­zu­füh­ren! Dazu wer­den wir eine Snow­flake Exter­nal Func­tion ver­wen­den. Eine externe Funk­tion ist eine Mög­lich­keit, eine API außer­halb der Snow­flake-Umge­bung aufzurufen.

Um diese externe Funk­tion ein­zu­rich­ten, benö­ti­gen Sie einen API-Schlüs­sel von Ope­nAI. Eine aus­führ­li­che Anlei­tung fin­den Sie in die­sem Arti­kel über die Ein­rich­tung der Integration.

Sobald Sie Ihre Schlüs­sel haben und die externe Funk­tion ein­ge­rich­tet haben, kön­nen Sie die REST-API von Ope­nAI direkt von Snow­flake SQL aus auf­ru­fen, etwa so:

select OPENAI_EXT_FUNC(‘Classify this sentiment: OpenAI is Awesome!’)::VARIANT:choices[0]:text as response;

LLMs in Snow­flake Streams und Tasks


Anhand die­ser Auf­for­de­rung rich­ten wir einen Snow­flake-Task ein, um eine externe Funk­tion auf­zu­ru­fen und Daten zu über­ge­ben, die wir umwan­deln möchten:

-- Create a landing table to store raw JSON data.
-- Snowpipe could load data into this table.
create or replace table raw (var variant);

-- Create a table that we will store the transformed data
create or replace table transformed (var variant);

-- Create a stream to capture inserts to the landing table.
-- A task will consume a set of columns from this stream.
create or replace stream rawstream1 on table raw;


-- Create a task that inserts new transformed device records from the rawstream1 stream into the transformed table
-- every minute when the stream contains records.
-- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on.
create or replace task transform_json
warehouse = mywh
schedule = '1 minute'
when
system$stream_has_data('rawstream1')
as
insert into transformed 
  select OPENAI_EXT_FUNC($$Your job is to transform JSON objects. I will give you a sample input and sample output, and you will apply the same transformation to a new JSON object. This transformation takes the average of the numbers in the 'temps' array and returns it as a new field 'avgTemp'

Here's an example of how your input and output should look:

Sample Input:

{
  "device": "124233",
  "location": "texas",
  "temps": [0,5,10,15,20]
}
Expected Output:

{
  "device": "124233",
  "location": "texas",
  "avgTemp": 10
}
Now apply the same transformation to this JSON object. Do not explain the transformation, only return the new JSON object.

$$||rawstream1.var)::VARIANT:choices[0]:text as var from rawstream1;


-- Resume task.
alter task transform_json resume;

-- Insert a set of records into the landing table.
insert into raw
  select parse_json(column1)
  from values
  ('{"device": "124233","location": "texas","temps": [0,5,10,15,20]}'),
  ('{"device": "124243","location": "texas","temps": [0,5,10,15,20,25,30]}');

-- Query the change data capture record in the table streams
select * from rawstream1;

-- Wait for the tasks to run.
-- A tiny buffer is added to the wait time
-- because absolute precision in task scheduling is not guaranteed.
call system$wait(70);

-- Query the table streams again.
-- Records should be consumed and no longer visible in streams.

-- Verify the records were inserted into the target tables.
select * from transformed;

-- Insert another set of records into the landing table.
insert into raw
  select parse_json(column1)
  from values
  ('{"device": "124123","location": "texas","temps": [90,95,100]}'),
  ('{"device": "123412","location": "texas","temps": [3,4,5]}');

-- Wait for the tasks to run.
call system$wait(70);

-- Records should be consumed and no longer visible in streams.
select * from rawstream1;

-- Verify the records were inserted into the target tables.
select * from transformed;

Aus dem obi­gen Code erge­ben sich ein paar kri­ti­sche Teile:

  • Ein­rich­ten der Roh­ta­belle und der trans­for­mier­ten Tabellen.
  • Rich­ten Sie den rawstream1 auf der rohen Tabelle ein.
  • Erstel­len Sie die transform_json Auf­gabe, die aus­ge­führt wird, wenn der Stream Daten ent­hält. Diese Auf­gabe ruft unsere externe Funk­tion OPENAI_EXT_FUNC auf und spei­chert die Ergeb­nisse in der trans­for­mier­ten Tabelle.
  • Wir über­prü­fen, ob alles funk­tio­niert, indem wir einige Test­da­ten ein­fü­gen und die Stream-Aus­gabe und die resul­tie­ren­den trans­for­mier­ten Daten untersuchen.

Quelle: Snowflake.com

Erfah­ren Sie hier mehr über Lösun­gen im Bereich Snow­flake oder besu­chen Sie eines unse­rer kos­ten­lo­sen Web­i­nare.