4 datapipeline-övningar som du (förmodligen) inte visste om

Marina near Kaag

Bakom varje teknisk produkt finns det automatisering att göra för att hålla data ren och uppdaterad. Bara att veta hur man bygger coola datamodeller är inte tillräckligt för att överleva i dataforskarnas rymden. Du måste dyka djupare.

Nuförtiden prioriteras datapipelines mer.

Detta beror främst på branschens mognad. Dataforskare inser att för att få ren data för sina modeller måste de bygga en solid grund för sitt datalager.

Därför, förutom dedikerade dataingenjörer, måste datavetare själva förstå datapipelining. I den här artikeln kommer vi att prata om några datapipeline-praxis som du bör implementera i din design för att säkerställa ett stabilt dataekosystem.

Förutsättningar

Tillvägagångssätten i den här artikeln täcker allmänna koncept för datapipelining. Det spelar ingen roll vilket arbetsflödeshanteringssystem du använder, du kan tillämpa dessa koncept var som helst.

Personligen har jag använt Apache Airflow för batchjobb och Apache Beam för streamingjobb. Jag tycker att de är ganska effektiva.

Det kommer också att vara till hjälp om du kan grundläggande Python eftersom det är det dominerande programmeringsspråket i dataindustrin.

I traditionell pipelining finns det grundläggande ETL/ELT-jobb.

Oavsett vilken typ av jobb du arbetar med kommer det att finnas ett behov av triggern för att starta själva jobbet.

Vi kommer sedan fram till debatten om schematriggers vs algoritmiska triggers.

Schemalägg utlösare

Din grundläggande typ av trigger.

Schemaläggare finns i alla typer av arbetsflödeshanteringssystem.

Det låter dig starta jobbet med ett konstant tidsintervall som varje minut, timme, dag, vecka, etc.

Schemautlösare är användbara för oundvikliga jobb. Detta innebär att de måste köra oavsett beroende och ofta inte misslyckas under några omständigheter.

Algoritmiska triggers

En anpassad typ av trigger.

Algoritmiska triggers finns i många former. De vanligaste är sensorer och triggers från andra jobb. Det låter dig börja jobbet under vissa förutsättningar. Exempel är –

  • En fil importeras till en mapp
  • En rad läggs till i en tabell
  • Flera beroende jobb är klara


Algoritmiska utlösare är användbara för jobb med beroenden. Ofta kommer dessa jobb att misslyckas om inte vissa villkor är uppfyllda.

Domen

Vilken typ av trigger du ska använda beror på vilken typ av jobb du har.
Att ha rätt beroende och utlösande struktur är avgörande för att bygga en solid pipelining-arkitektur.

Ställ frågor som –

  • Ska det här jobbet vara igång varje dag?
  • Tar det här jobbet upp mer resurser än vad som krävs?
  • Hur ofta ska jobbet utföras?
  • Finns det flera förutsättningar för jobbets framgång?


I en perfekt värld fungerar alla våra jobb bra.
Men vi vet att det inte är möjligt.
Lär dig när ditt jobb misslyckas. Du blir bättre med erfarenhet.

Korrekta varningar

Våra pipelines går sönder. Det är inte slutet på världen.

Det viktiga är vad vi gör i efterhand. Vi måste få besked när detta händer. Oftast räcker det inte med grundläggande varning för att uppnå detta.

De flesta arbetsflödeshanteringssystem erbjuder grundläggande varningslösningar. Men organisationer lägger ofta till sin anpassade varningsnivå.

E-post

Den enklaste lösningen. Detta bör åtminstone inkluderas.

Ett e-postmeddelande skickas till alla medlemmar i teamet för att varna dem om att jobbet har misslyckats. Detta är inte den mest effektiva lösningen eftersom vi inte alla ständigt stirrar på våra e-postmeddelanden. Vi kan missa detta då och då.

Här är en mycket grundläggande form av e-postvarning i Airflow.

från airflow.utils.email import send_email
email_to = '[email protected]'
email_subject = '(FAILED) ' + jobbnamn + datum
email_content = 'Ditt jobb har misslyckats.'
send_email(email_to, email_subject, email_content)

Slack

Nästa nivå av varningar. Om du är inom tekniken är chansen stor att ditt företag har antagit slack som en form av kommunikationskanal. Slack skickar ett klickljud (som jag är ganska säker på att vi alla är bekanta med) när meddelanden skickas.

Skapa en kanal dedikerad till varningar. Låt dina pipelines skicka varningar till kanalen vid fel. Detta kommer att säkerställa att teamet blir varnat när något går fel.

Här är en grundläggande form av en slack-varning med dess API.

från slack import WebClient
klient = WebClient(token = 'din token här')
svar = client.chat_postMessage(
kanal = slack_cannel,
text = meddelande
)

Korrekt loggning

I programmering är loggning bara ett fint ord för att definiera processen att skriva ner allt du gör. Det finns två typer av stockar i pipeliningprocessen. Uppgiftsloggar och jobbloggar.

Uppgiftsloggar

Dessa loggar är till för att registrera viktig information under hela utförandet av uppgiften. Vi vill inte skriva ut varje rad med kod vi kör, men det bör finnas tillräckligt med loggar för att hjälpa oss att navigera genom uppgiften när vi tittar på dem.

Här är ett exempel –


Import PostgresHook
#Extraction Job
def ExtractFromSource(query):
    query_to_run = query
    logging.info("Query : %" query_to_run)
    
    cursor = PostgresHook(connection).get_conn().cursor()
    logging.info("Connecting to Postgres Connection %" connection)
    cursor.execute(query_to_run)
    result = cursor.fetchall()

Loggar fungerar som en brytpunkt under felsökning. De är användbara för oss för att fastställa felkällan vid uppgiftsfel.

I detta utvinningsjobb loggar vi frågan och anslutningen.Detta uppnår två saker –

  • Informerar oss om att uppgiften fungerar bra tills varje punkt.
  • Visar oss frågan och anslutningsvariabeln som skickas

Om något går fel kan vi tydligt avgöra vilket steg som är felaktigt, samt kontrollera om frågan eller anslutningsvariabeln är felaktig.

Jobbloggar

Jobbloggar är register över jobbkörningar. Åtminstone visar de start till slut tid och status för jobbet.
Om ditt arbetsflödeshanteringssystem inte redan har detta som standard bör du ha detta lagrat någonstans, kanske till och med i ditt datalager.

Skapa en revisionstabell i ditt datalager. Infoga en rad för varje kört jobb med dessa data. Dataforskare och analytiker kan enkelt navigera genom tabellen för att kontrollera den senaste jobbkörningen för vissa tabeller och om jobbet körs framgångsrikt.

Checking Duplications

If you’re extracting or transforming data, chances are you’ve encountered this error before. Duplications in your extraction/transformation.

This might be due to several reasons like —

  • Faulty joins
  • Job Triggered Twice
  • Dirty Data

This can be rectified by adding a step to check for duplications in your pipeline. One of the methods to achieve this is checking count by primary keys.

Select max(count) from (
Select primary_key, count(*) from table group by primary_key)

If the result returns more than 1, it shows that duplications exist in your data.
If your data does not have primary keys, you can also assign it to them.

alter table TABLENAME add primary key (ID,NAME,PMID)

Slutsats

Du har framgångsrikt förbättrat dina datapipelines. Jag har plockat upp dessa koncept genom åren och hoppas att du kan spara lite tid genom att dela dem. Du har framgångsrikt förbättrat dina datapipelines. Jag har plockat upp dessa koncept genom åren och hoppas att du kan spara lite tid genom att dela dem.

Leave a reply:

Your email address will not be published.