본문 바로가기
데이터엔지니어링/Airflow

Airflow의 Operator와 Decorators의 활용 및 Backfill 정의

by goemgoem-i 2024. 11. 25.
반응형

PythonOperator 실행

- Airflow DAG 내에서 실행할 작업이 Python 코드일 때, PythonOperator를 사용하여 Python 함수를 실행 할 수 있음

- 데이터 처리, 모델 훈련, 웹 크롤링 등 다양한 Python 작업을 Airflow 워크플로우의 일부로 실행하려면 PythonOperator가 필요

 

 

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# 각 업무를 처리하는 Python 함수 정의
def task_1():
    print("업무 1: 이메일 확인 및 답장 완료")

def task_2():
    print("업무 2: 회의 준비 완료")

def task_3():
    print("업무 3: 보고서 작성 완료")

# DAG 설정
default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG('company_work_day',
         default_args=default_args,
         description='회사에서 하루의 업무를 처리하는 DAG',
         schedule_interval=None,  # 수동 실행
         start_date=datetime(2024, 11, 20),
         catchup=False) as dag:

    # 업무 정의
    task_1_operator = PythonOperator(
        task_id='task_1',
        python_callable=task_1,
    )

    task_2_operator = PythonOperator(
        task_id='task_2',
        python_callable=task_2,
    )

    task_3_operator = PythonOperator(
        task_id='task_3',
        python_callable=task_3,
    )

    # 업무 간 의존성 설정
    task_1_operator >> task_2_operator >> task_3_operator

 

 

 

Airflow Decorators를 활용

 

기존 코드에서는 PythonOperator를 사용하여 각각의 작업을 수동으로 생성하고 의존성을 설정

데코레이터를 사용하면 코드가 더 간결해지고, DAG와 작업을 쉽게 정의 가능

기존 코드에 비해 코드의 간결성과 가독성, 유지보수성 면에서 좋음

별도의 operate 작업이 필요 없어짐 

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

# 각 업무를 처리하는 작업 정의
@task
def task_1():
    print("업무 1: 이메일 확인 및 답장 완료")

@task
def task_2():
    print("업무 2: 회의 준비 완료")

@task
def task_3():
    print("업무 3: 보고서 작성 완료")

# DAG 정의
with DAG(
    dag_id='company_work_day_v2',
    description='회사에서 하루의 업무를 처리하는 DAG',
    start_date=datetime(2024, 11, 20),
    schedule_interval=None,  # 수동 실행
    catchup=False,
    tags=['example']
) as dag:
    # 작업 순서 정의
    task_1() >> task_2() >> task_3()

 

 

 

 

DAG Parameters vs. Task Parameters

- DAG 파라미터: DAG 객체를 생성할 때의 전역 설정

- Task 파라미터: 개별 작업(Task)을 정의할 때 사용하는 설정

 

Backfill

Airflow에서 start_date 이후부터 실행되지 않은 기간 동안의 DAG 실행을 보충하는 작업

DAG의 시작 날짜(start_date)가 과거로 설정되어 있는 경우,

Airflow는 해당 날짜부터 현재까지의 실행 간격에 맞춰 모든 작업을 실행하려고 시도

 

  • Backfill 활성화 (catchup=True): 과거의 모든 작업을 보충 실행.
  • Backfill 비활성화 (catchup=False): 현재 시점 이후의 작업만 실행.

 

 

중요한 이유 : 

- 과거 데이터 보충 가능

- 리소스 관리 

 

반응형