Airflow을 사용하기 앞서 꼭 알아야하는 주요 개념
Apache Airflow는 워크플로우(Workflow) 자동화 및 스케줄링을 위한 오픈소스 플랫폼임. 데이터 파이프라인을 효율적으로 관리하려면 Airflow의 핵심 개념을 이해하는 것이 중요.
- DAG (Directed Acyclic Graph): Airflow에서 워크플로우를 정의하는 기본 단위로, DAG은 여러 개의 Task로 구성
- Task & Operator: DAG안에 실행되는 개별작업을 Task라고 함. Task은 airflow의 Operator라는 클레스로 정의
- Task depdendecies: 테스크의 실행순서를 정의
- Branching (조건 분기)
그외 기타..
- Xcom (cross-communication): Task간 데이터를 공유하는 기능
- Executor와 Scheduler: Scheduler: DAG을 주기적으로 실행하는 역할, Executor: Task 실행 방식 결정
설치: 설치 후 바로실행할 수 있는게 아니라 몇몇 작업 필요...
1. pip 패키지 관리자로 설치.
pip install apache-airflow
2. 데이터베이스 초기화: airflow 내부적으로 사용하는 데이터베이스를 초기화하기 위함.
airflow db init # Airflow의 실행을 위한 Metadata DB 설정
3. Web server 및 Scheduler 실행
airflow webserver --port 8080 # UI 실행
airflow scheduler # DAG 스케줄러 실행
DAG
- DAG는 비순환 유향 그래프 형태를 가지며, Task 간의 실행 순서를 정의
- 주기적인 실행을 위해 schedule_interval을 설정할 수 있음.
예제 DAG 코드
from airflow import DAG
from datetime import datetime
dag = DAG(
dag_id="my_dag",
start_date=datetime(2025, 3, 10),
schedule_interval="@daily",
catchup=False
)
Operator: task을 정의하는 요소
- EmptyOperator / DummyOperator: (Airflow version > 2.0).
- 사용 목적 1: 시작을 명시적으로 알려주기 위해서 주로 사용("start"라는 이름을 가진 Operator을 정의할 수 있기 때문. 이를 사용하는 경우에는 "start"->"preprocess"->... 이런식으로 시작되지만, 사사용하지 않는 경우에는 "preprocess"->"..."로 바로 시작되기 떄문에 DAG의 시작을 직관적으로 알기 불편함)
- 사용 목적 2: 특정 테스크의 그룹을 논리적으로 묶을때 사용. 예를 들어, 여러 테스크가 병렬로(동시에) 실행될 때, 그 테스크들이 완료된 다음에 task을 연이어 실행하는 경우. "start >> [task_1, task_2] >> group_end >> final_task"
- PythonOperator: Python 함수를 실행.
- BashOperator: Bash 명령어 실행.
- BranchPythonOperator: 조건에 따라 실행할 경로를 분기.
- Sensor: 외부 데이터의 존재를 감지할 때 사용
가장 많이 사용되는 PythonOperator은 python을 실행시켜주는 오퍼레이션입니다. 주요 사용되는 인자는 아래와 같습니다.
PythonOperator(
task_id="my_task",
python_callable=my_function,
provide_context=True,
op_args=["arg1", "arg2"],
op_kwargs={"key1": "value1", "key2": "value2"},
dag=dag
)
- task_id: 테스크의 고유한 ID (DAG내에서 유일해야함). UI에서 테스크 이름으로 표시가 됨.
- python_callable: 실제로 실행할 python함수. DAG 실행시 전달되는 함수가 실행됨.
- provide_context: airflow 실행 메타데이터(context)을 kwargs으로 전달할지 여부. 예를 들어, kwargs["execution_data"]와 같은 airflow의 실행 메타데이터를 가져올 수 있음.
- op_args: 함수의 위치에 positional arugments을 전달
- dag: 이 테스크가 어디의 DAG에 속할지. with dag구문을 이용하면 dag=dag을 안써도됨
두 번째로 많이 사용되는 오퍼레이터인 BranchPythonOperator.
BranchPythonOperator는 특정 조건에 따라 실행할 Task를 동적으로 결정하는 Operator입니다.즉, DAG 실행 중 어떤 Task를 실행할지 선택하는 역할을 한다.
BranchPythonOperator의 핵심 개념
- python_callable에 지정된 함수의 실행 결과에 따라 어떤 Task를 실행할지 결정함.
- 일반적인 Task와 다르게 하나 이상의 Task를 선택 가능.
- 선택되지 않은 Task는 건너뛰기(Skipped) 상태가 됨.
- 반드시 return 타입이 str이거나 List[str]이어야함. (str은 task_id이거나, 복수의 task_id여야함)
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
import random
def choose_branch():
return "task_A" if random.choice([True, False]) else "task_B"
# DAG 정의
with DAG(
dag_id="branching_example",
start_date=datetime(2025, 3, 10),
schedule_interval="@daily",
catchup=False
) as dag:
start = EmptyOperator(task_id="start")
branching = BranchPythonOperator(
task_id="branching_task",
python_callable=choose_branch # 실행할 Task를 결정하는 함수
)
task_A = EmptyOperator(task_id="task_A")
task_B = EmptyOperator(task_id="task_B")
end = EmptyOperator(task_id="end")
start >> branching >> [task_A, task_B] >> end
- Task depedencies (테스크 의존성): task1 >> task2 과 같이 실행순서를 정할 수 있음
Task Dependencies란?
- Airflow DAG에서 Task 간의 실행 순서를 정의하는 것
- DAG 내의 Task는 단순히 나열하는 것이 아니라, 어떤 Task가 먼저 실행되고, 어떤 Task가 나중에 실행될지 지정해야 함.
- 의존성(Dependency)을 정의하면 Airflow Scheduler가 올바른 순서로 Task를 실행함.
Airflow에서 Task 의존성을 정의하는 방법은 다음과 같음:
방법 | 코드 예시 | 설명 |
연산자 (>>, <<) 사용 | task1 >> task2 | task1 실행 후 task2 실행 |
.set_upstream() 사용 | task2.set_upstream(task1) | task1 실행 후 task2 실행 |
.set_downstream() 사용 | task1.set_downstream(task2) | task1 실행 후 task2 실행 |
실행 및 구성 요소
- Scheduler: DAG파일을 읽고 실행할 Task의 스케쥴을 관리
- Executor: Task 실행을 담당하는 엔진 (operator랑 유사한데, Operator은 DAG 내에서 개별 Task을 구성하는 요소라면, Executor은 Task을 실행하는 엔진, 실행환경을 결정하는 역할을 의미. 즉, DAG내에 정의된 Operator을 어떤방식으로 실행할것인지를 선택하는 것)
FAQ
Q1. Airflow DAG에서 if __name__ == "__main__": 필요 여부
Airflow에서 dag.py 파일을 실행할 때는 if __name__ == "__main__":을 따로 설정할 필요 없음.
즉, 일반적인 Python 스크립트처럼 직접 실행되는 게 아니라, Airflow의 Scheduler가 자동으로 DAG 파일을 로드하기 때문. Airflow는 "dags/"디렉토리에 있는 모든 python파일을 주기적으로 스캔함. 이 때, DAG 파일 내에 정의된 DAG 객체를 파싱하여 등록함.
Q2. with DAG(...) as dag의 컨텍스트 관리방식을 꼭 써야하나?
결론적으로 쓰는 것을 추천. with구문(context)을 관리해야하는것인가의 질문과 거의 유사함.
- 코드 가독성 향상: with DAG(...) 내에 with (=context) 블록내부에서 관리되어서 코드가 깔끔해짐. 그리고, 각 operator내에 dag=dag의 인자를 명시적으로 전달하지 않아도 됨.
- 코드 실행 순서와 DAG 구성간의 예측가능성 증가: dag=dag을 명시적으로 사용하는 경우, DAG 정의와 Task 생성이 분리될 수 있어 코드 실행 순서가 중요해질 수 있음. with DAG(...) 블록 내에서 Task를 정의하면, Airflow가 DAG을 로드하는 순서가 보장되므로 예측 가능성이 높아짐. with 구문을 사용안하면, 이후에 DAG정의 이후에도 Task가 추가될 수 있어 예측이 어려울 수 있음.
- 자원해지: with 구문을 이용하여 DAG객체의 생명주기가 컨텍스트메니저를 통해 자동으로 관리되므로, DAG가 메모리에서 안전하게 처리됨.
'Data science' 카테고리의 다른 글
GitHub Actions 의존성 캐싱하기 (0) | 2025.01.17 |
---|---|
[5분 안에 이해하는] 프롬프트 엔지니어링 핵심기법: Few shot ,CoT, SC, ReACT, RAG (0) | 2024.04.15 |
구글드라이브 파일 wget 사용하기 (0) | 2022.11.11 |
오토마우스: 간호사, 의사 보수교육 사이버연수 자동클릭 (0) | 2022.10.29 |
Data structure (0) | 2020.01.28 |