Kas yra Apache Airflow ir kam jis reikalingas
Jei kada nors teko kurti sudėtingus duomenų apdorojimo procesus, tikrai žinote tą jausmą, kai viską reikia suvaldyti – užduotys turi vykti tam tikra tvarka, vienos priklauso nuo kitų, o kai kas nepavyksta, reikia žinoti kur ieškoti problemos. Būtent čia į sceną įžengia Apache Airflow – atvirojo kodo platforma, skirta darbo srautų (angl. workflows) kūrimui, planavimui ir stebėjimui.
Airflow gimė 2014 metais Airbnb kompanijoje, kai inžinieriai susidūrė su būtinybe valdyti vis sudėtingesnius duomenų procesus. Vėliau projektas tapo Apache Software Foundation dalimi ir išaugo į vieną populiariausių orkestravimo įrankių. Šiandien jį naudoja tūkstančiai įmonių – nuo startuolių iki technologijų gigantų.
Pagrindinė Airflow idėja paprasta – darbo srautus aprašote Python kodu. Tai reiškia, kad nereikia mokytis naujos kalbos ar sudėtingų konfigūracijos failų. Jei mokate Python, jau esate pusę kelio. Airflow leidžia kurti DAG (Directed Acyclic Graph) – nukreiptus aciklinius grafus, kurie apibrėžia užduočių seką ir jų tarpusavio priklausomybes.
Pagrindinės Airflow sąvokos ir architektūra
Prieš pradedant dirbti su Airflow, svarbu suprasti kelis pagrindinius terminus. DAG – tai jūsų darbo srauto aprašas, kuris nurodo, kokios užduotys turi būti atliktos ir kokia tvarka. Pavyzdžiui, galite turėti DAG, kuris kas rytą parsisiunčia duomenis iš API, juos apdoroja, įkelia į duomenų bazę ir išsiunčia ataskaitą.
Task (užduotis) – tai atskiras darbo vienetas jūsų DAG viduje. Kiekviena užduotis atlieka konkretų veiksmą: vykdo Python funkciją, paleisdžia Bash komandą, siunčia SQL užklausą ar bet ką kita. Užduotys gali būti įvairių tipų – Airflow palaiko daugybę operatorių skirtingiems poreikiams.
Operator – tai šablonas, apibrėžiantis, ką konkreti užduotis daro. PythonOperator vykdo Python funkciją, BashOperator – bash komandą, EmailOperator siunčia el. laišką. Yra šimtai įvairių operatorių, o jei nerandate tinkamo, galite sukurti savo.
Airflow architektūra susideda iš kelių komponentų. Scheduler nustato, kada paleisti užduotis pagal nustatytą tvarkaraštį. Executor vykdo užduotis – tai gali būti viename procese (SequentialExecutor), keliuose procesuose (LocalExecutor) arba paskirstytoje sistemoje (CeleryExecutor, KubernetesExecutor). Webserver teikia grafinę sąsają, kur galite stebėti darbo srautus, peržiūrėti logus ir valdyti vykdymą. O Metadata Database saugo visą informaciją apie DAG būsenas, užduočių vykdymo istoriją ir kitus metaduomenis.
Pirmieji žingsniai: DAG kūrimas
Sukurkime paprastą DAG, kad suprastumėte, kaip tai veikia praktiškai. Airflow DAG failai paprastai saugomi dags/ kataloge. Štai pavyzdys:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def process_data():
print("Apdorojami duomenys...")
# Čia būtų jūsų duomenų apdorojimo logika
return "Duomenys apdoroti sėkmingai"
with DAG(
'simple_data_pipeline',
default_args=default_args,
description='Paprastas duomenų apdorojimo DAG',
schedule_interval='0 6 * * *', # Kasdien 6 val. ryto
catchup=False,
tags=['example', 'data'],
) as dag:
start = BashOperator(
task_id='start_task',
bash_command='echo "Pradedamas darbas"'
)
process = PythonOperator(
task_id='process_data',
python_callable=process_data
)
finish = BashOperator(
task_id='finish_task',
bash_command='echo "Darbas baigtas"'
)
start >> process >> finish
Šiame pavyzdyje matome kelis svarbius dalykus. default_args nustato bendrus parametrus visoms užduotims – kas yra savininkas, nuo kada pradėti, kaip elgtis klaidos atveju. schedule_interval naudoja cron sintaksę ir nurodo, kada DAG turėtų būti paleidžiamas. Simbolis >> nustato užduočių priklausomybes – šiuo atveju jos vykdomos nuosekliai.
Svarbu suprasti catchup parametrą. Jei jį nustatysite į True ir jūsų DAG turėjo būti vykdomas praeityje, Airflow bandys „prisivyti” ir paleisti visus praleistus vykdymus. Tai gali būti naudinga duomenų užpildymui, bet dažnai geriau nustatyti False, kad būtų vykdomi tik būsimi paleidimų.
Užduočių priklausomybių valdymas
Viena iš Airflow stipriųjų pusių – lankstus užduočių priklausomybių valdymas. Galite kurti sudėtingas schemas, kur vienos užduotys laukia kelių kitų, arba viena užduotis paleidžia kelias lygiagrečias.
Priklausomybes galite nurodyti keliais būdais:
# Paprasčiausias būdas task1 >> task2 >> task3 # Arba atvirkštine kryptimi task3 << task2 << task1 # Viena užduotis priklauso nuo kelių [task1, task2, task3] >> task4 # Viena užduotis paleidžia kelias task1 >> [task2, task3, task4] # Sudėtingesnės schemos task1 >> task2 task1 >> task3 [task2, task3] >> task4
Kai turite sudėtingą logiką, galite naudoti BranchPythonOperator, kuris leidžia dinamiškai pasirinkti, kurią užduotį vykdyti toliau. Pavyzdžiui, priklausomai nuo datos ar duomenų turinio:
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
execution_date = context['execution_date']
if execution_date.weekday() < 5: # Darbo diena
return 'weekday_task'
else:
return 'weekend_task'
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch,
provide_context=True
)
Dar viena naudinga funkcija – trigger rules. Pagal nutylėjimą užduotis vykdoma tik tada, kai visos priklausomos užduotys pavyko (all_success). Bet galite pakeisti šį elgesį:
all_failed– vykdyti, jei visos priklausomos užduotys nepavykoone_success– vykdyti, jei bent viena priklausoma užduotis pavykoone_failed– vykdyti, jei bent viena priklausoma užduotis nepavykoall_done– vykdyti, kai visos priklausomos užduotys baigtos, nepriklausomai nuo rezultato
Duomenų perdavimas tarp užduočių
Dažnai reikia perduoti duomenis iš vienos užduoties į kitą. Airflow tam siūlo XCom (cross-communication) mechanizmą. Tai leidžia užduotims "stumti" ir "traukti" nedidelius duomenų kiekius.
def extract_data(**context):
data = {'users': 150, 'revenue': 5000}
# Išsaugome duomenis XCom
context['task_instance'].xcom_push(key='metrics', value=data)
return data # return automatiškai irgi išsaugo XCom
def transform_data(**context):
# Gauname duomenis iš ankstesnės užduoties
ti = context['task_instance']
data = ti.xcom_pull(task_ids='extract_task', key='metrics')
# Arba tiesiog
data = ti.xcom_pull(task_ids='extract_task')
transformed = {
'users': data['users'] * 2,
'revenue': data['revenue'] * 1.1
}
return transformed
Tačiau atsargiai su XCom – jis skirtas nedideliems duomenims (tekstui, skaičiams, nedideliems žodynams). Jei reikia perduoti didelius duomenų kiekius, geriau naudoti išorinius saugyklas – S3, duomenų bazes ar failus. XCom saugo duomenis metaduomenų bazėje, todėl didelių objektų saugojimas gali ją perpildyti.
Dar vienas būdas dalintis informacija – Variables ir Connections. Variables leidžia saugoti globalius kintamuosius, kuriuos gali pasiekti bet kuri užduotis. Connections saugo prisijungimo prie išorinių sistemų informaciją (duomenų bazės, API, debesų paslaugos).
from airflow.models import Variable
# Nustatyti kintamąjį (paprastai per Web UI arba CLI)
# Variable.set("api_key", "secret_key_123")
# Gauti kintamąjį
api_key = Variable.get("api_key")
# Su numatytu reikšme, jei nerastas
timeout = Variable.get("api_timeout", default_var=30)
Klaidų valdymas ir stebėjimas
Realybėje ne viskas vyksta sklandžiai – API gali neveikti, duomenų bazė gali būti nepasiekiama, duomenys gali būti neteisingi. Airflow turi įrankių tokioms situacijoms valdyti.
Pirmiausia, galite nustatyti automatinį bandymą pakartoti (retries). Jei užduotis nepavyksta, Airflow automatiškai bandys ją paleisti dar kartą po nurodyto laiko:
task = PythonOperator(
task_id='unstable_task',
python_callable=my_function,
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True, # Kiekvienas bandymas laukia ilgiau
max_retry_delay=timedelta(hours=1)
)
Galite nustatyti timeouts, kad užduotys neužstrigtų amžinai:
task = BashOperator(
task_id='long_running_task',
bash_command='python long_script.py',
execution_timeout=timedelta(hours=2), # Maksimalus vykdymo laikas
)
Airflow Web UI suteikia puikią vizualizaciją. Graph View rodo DAG struktūrą ir užduočių būsenas spalvomis – žalia (pavyko), raudona (nepavyko), geltona (vykdoma), pilka (laukiama). Tree View rodo istorinius vykdymus laiko juostoje. Gantt Chart parodo, kiek laiko užtruko kiekviena užduotis.
Kai kas nors nepavyksta, galite peržiūrėti logus tiesiogiai Web UI. Kiekviena užduotis turi savo logą, kur matote visą išvestį. Tai neįtikėtinai naudinga debuginant problemas.
Pranešimams galite naudoti įvairius kanalus:
from airflow.operators.email import EmailOperator
send_alert = EmailOperator(
task_id='send_email',
to='[email protected]',
subject='DAG {{ dag.dag_id }} Failed',
html_content='Užduotis nepavyko: {{ task_instance.task_id }}
',
trigger_rule='one_failed'
)
Arba integruoti su Slack, PagerDuty, ar kitomis sistemomis naudojant atitinkamus operatorius ar hooks.
Skalės ir našumo optimizavimas
Kai jūsų Airflow naudojimas auga, pradedate galvoti apie našumą ir skalę. Štai keletas patarimų, kaip išlaikyti viską sklandžiai veikiant.
Executor pasirinkimas turi didelę įtaką. SequentialExecutor tinka tik testavimui – jis vykdo užduotis po vieną. LocalExecutor naudoja procesus ir gali vykdyti kelias užduotis vienu metu viename serveryje. CeleryExecutor leidžia paskirstyti darbą keliuose serveriuose (workers). KubernetesExecutor kiekvienai užduočiai sukuria atskirą Kubernetes pod'ą – tai suteikia maksimalų lankstumą ir izoliaciją.
DAG dizainas irgi svarbus. Venkite per didelių DAG su šimtais užduočių – geriau suskaidyti į kelis mažesnius. Naudokite depends_on_past=False, jei nereikia, kad kiekvienas vykdymas priklausytų nuo ankstesnio – tai leidžia lygiagrečiai vykdyti kelis DAG run'us.
# Vietoj vieno milžiniško DAG
# Sukurkite kelis mažesnius ir naudokite TriggerDagRunOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger = TriggerDagRunOperator(
task_id='trigger_another_dag',
trigger_dag_id='secondary_pipeline',
wait_for_completion=True
)
Pools leidžia riboti lygiagrečiai vykdomų užduočių skaičių. Tai naudinga, kai turite ribotus resursus (pvz., duomenų bazės prisijungimų limitą):
task = PythonOperator(
task_id='db_task',
python_callable=query_database,
pool='database_pool', # Sukuriamas per Web UI
)
Metaduomenų bazės optimizavimas irgi svarbus. Airflow gali generuoti daug įrašų, ypač jei turite dažnai vykdomus DAG. Reguliariai valykite senus įrašus:
# Naudokite airflow db clean komandą airflow db clean --clean-before-timestamp "2024-01-01" --yes
Arba nustatykite automatinius valymo DAG, kurie reguliariai šalina senas užduočių būsenas ir logus.
Realūs panaudojimo scenarijai
Pažiūrėkime, kaip Airflow naudojamas praktikoje įvairiose situacijose.
ETL procesai – tai klasikinis Airflow panaudojimas. Pavyzdžiui, kas naktį ištraukiate duomenis iš įvairių šaltinių (API, CSV failai, duomenų bazės), juos transformuojate ir įkeliami į duomenų saugyklą (data warehouse):
# Supaprastintas ETL DAG
extract_from_api >> extract_from_db >> extract_from_files >> \
validate_data >> transform_data >> load_to_warehouse >> \
update_reports >> send_notification
Machine Learning pipeline – treniruojate modelius reguliariai su naujais duomenimis. Airflow gali orkestravoti visą procesą: duomenų paruošimą, feature engineering, modelio treniravimą, validaciją, deployment'ą:
prepare_data >> create_features >> split_data >> \
train_model >> evaluate_model >> \
[deploy_model, archive_old_model] >> notify_team
Duomenų kokybės tikrinimas – reguliariai tikrinate duomenų integralumą, ieškote anomalijų, generuojate kokybės ataskaitas:
def check_data_quality(**context):
# Tikrinama, ar nėra null reikšmių
# Ar duomenys atitinka laukiamus intervalus
# Ar nėra dublikatų
issues = []
if null_count > threshold:
issues.append(f"Per daug null reikšmių: {null_count}")
if issues:
raise ValueError(f"Duomenų kokybės problemos: {issues}")
return "Duomenų kokybė OK"
Infrastruktūros automatizavimas – Airflow ne tik duomenims. Galite automatizuoti backup'us, serverių konfigūraciją, deployment'us:
check_server_health >> create_backup >> \
deploy_new_version >> run_tests >> \
[rollback, send_success_notification]
Kaip išspausti maksimumą iš Airflow
Baigiant šį žygį po Airflow pasaulį, verta apibendrinti geriausias praktikas ir patarimus, kurie padės jums efektyviai naudoti šį įrankį.
Visų pirma, laikykite DAG failus paprastus. Visa sudėtinga logika turėtų būti atskiruose Python moduliuose, o DAG failas tik orkestravimo aprašas. Tai palengvina testavimą ir kodą daro skaitomesnį. Naudokite version control sistemą (Git) visiems DAG failams – taip galėsite sekti pakeitimus ir grįžti prie ankstesnių versijų.
Testavimas yra kritiškai svarbus. Galite testuoti DAG struktūrą nepaleidžiant užduočių:
# pytest testas
def test_dag_loaded():
from airflow.models import DagBag
dagbag = DagBag()
assert 'my_dag_id' in dagbag.dags
assert len(dagbag.import_errors) == 0
def test_dag_structure():
dag = dagbag.get_dag('my_dag_id')
assert len(dag.tasks) == 5
assert 'start_task' in dag.task_ids
Dokumentacija jūsų DAG viduje labai padeda komandai. Naudokite doc_md parametrą, kuris rodomas Web UI:
dag = DAG(
'documented_dag',
doc_md="""
### Duomenų apdorojimo pipeline
Šis DAG atlieka:
- Duomenų ekstraktavimą iš API
- Transformavimą pagal verslo taisykles
- Įkėlimą į analytics DB
**Savininkas**: Data Team
**SLA**: 2 valandos
""",
)
Aplinkos valdymas – naudokite skirtingas konfigūracijas dev, staging ir production aplinkoms. Airflow palaiko aplinkos kintamuosius ir skirtingus konfigūracijos failus. Niekada nekietuokite slaptažodžių kode – naudokite Airflow Connections arba išorinę secrets valdymo sistemą (Vault, AWS Secrets Manager).
Ir paskutinis, bet ne mažiau svarbus dalykas – bendruomenė. Airflow turi aktyvią bendruomenę, puikią dokumentaciją ir daugybę pavyzdžių. Kai susiduriate su problema, tikėtina, kad kažkas jau ją sprendė. GitHub issues, Stack Overflow, Slack kanalas – visa tai puikūs šaltiniai pagalbai.
Airflow nėra tobulas – jis turi mokymosi kreivę, kartais gali būti per daug sudėtingas paprastiems poreikiams, o Web UI, nors ir funkcionalus, galėtų būti modernesnis. Bet kai reikia suvaldyti sudėtingus duomenų procesus, Airflow yra vienas geriausių įrankių. Jo lankstumas, Python ekosistemos parama ir platus funkcionalumas daro jį puikiu pasirinkimu daugeliui organizacijų. Pradėkite nuo paprastų DAG, eksperimentuokite, mokykitės iš klaidų – ir greitai Airflow taps neatsiejama jūsų duomenų infrastruktūros dalimi.
