반응형
PythonOperator 실행
- Airflow DAG 내에서 실행할 작업이 Python 코드일 때, PythonOperator를 사용하여 Python 함수를 실행 할 수 있음
- 데이터 처리, 모델 훈련, 웹 크롤링 등 다양한 Python 작업을 Airflow 워크플로우의 일부로 실행하려면 PythonOperator가 필요
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 각 업무를 처리하는 Python 함수 정의
def task_1():
print("업무 1: 이메일 확인 및 답장 완료")
def task_2():
print("업무 2: 회의 준비 완료")
def task_3():
print("업무 3: 보고서 작성 완료")
# DAG 설정
default_args = {
'owner': 'airflow',
'retries': 1,
}
with DAG('company_work_day',
default_args=default_args,
description='회사에서 하루의 업무를 처리하는 DAG',
schedule_interval=None, # 수동 실행
start_date=datetime(2024, 11, 20),
catchup=False) as dag:
# 업무 정의
task_1_operator = PythonOperator(
task_id='task_1',
python_callable=task_1,
)
task_2_operator = PythonOperator(
task_id='task_2',
python_callable=task_2,
)
task_3_operator = PythonOperator(
task_id='task_3',
python_callable=task_3,
)
# 업무 간 의존성 설정
task_1_operator >> task_2_operator >> task_3_operator
Airflow Decorators를 활용
기존 코드에서는 PythonOperator를 사용하여 각각의 작업을 수동으로 생성하고 의존성을 설정
데코레이터를 사용하면 코드가 더 간결해지고, DAG와 작업을 쉽게 정의 가능
기존 코드에 비해 코드의 간결성과 가독성, 유지보수성 면에서 좋음
별도의 operate 작업이 필요 없어짐
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
# 각 업무를 처리하는 작업 정의
@task
def task_1():
print("업무 1: 이메일 확인 및 답장 완료")
@task
def task_2():
print("업무 2: 회의 준비 완료")
@task
def task_3():
print("업무 3: 보고서 작성 완료")
# DAG 정의
with DAG(
dag_id='company_work_day_v2',
description='회사에서 하루의 업무를 처리하는 DAG',
start_date=datetime(2024, 11, 20),
schedule_interval=None, # 수동 실행
catchup=False,
tags=['example']
) as dag:
# 작업 순서 정의
task_1() >> task_2() >> task_3()
DAG Parameters vs. Task Parameters
- DAG 파라미터: DAG 객체를 생성할 때의 전역 설정
- Task 파라미터: 개별 작업(Task)을 정의할 때 사용하는 설정
Backfill
Airflow에서 start_date 이후부터 실행되지 않은 기간 동안의 DAG 실행을 보충하는 작업
DAG의 시작 날짜(start_date)가 과거로 설정되어 있는 경우,
Airflow는 해당 날짜부터 현재까지의 실행 간격에 맞춰 모든 작업을 실행하려고 시도
- Backfill 활성화 (catchup=True): 과거의 모든 작업을 보충 실행.
- Backfill 비활성화 (catchup=False): 현재 시점 이후의 작업만 실행.
중요한 이유 :
- 과거 데이터 보충 가능
- 리소스 관리
반응형
'데이터엔지니어링 > Airflow' 카테고리의 다른 글
anaconda 비쥬얼스튜디오코드 가상환경 설정방법 윈도우 (1) | 2024.11.26 |
---|---|
docker 설치하고 airflow 설치하기 윈도우 (1) | 2024.11.25 |
gcp 코드 4003에러 해결 : Cloud Identity-Aware Proxy를 통한 연결 실패 VM에 IP 범위 35.235.240.0/20, 포트 22에서 오는 TCP 인그레스 트래픽을 허용하는 방화벽 규칙 (0) | 2024.11.23 |
트랜잭션과 aws ec2에 docker 설치 윈도우 버전 (1) | 2024.11.23 |
키움 open api에서 데이터 가지고 오기 (3) | 2024.11.22 |