개요
문서 작성 시점 기준으로 Composer에서 제공하는 Airflow 버전 중 EOS 기간이 가장 많이 남은 버전이며, Data-aware의 기능이 처음 도입된 버전으로서 Data-aware를 소개함이 이 문서의 핵심 목표입니다.
Data-aware란 시간 기반으로만 스케줄링 되던 Airflow에서 데이터 인식을 통한 DAG 스케줄링을 지원하는 기능을 말합니다.
현재로선 해당 기능이 출시(2022-09)된 지 얼마 되지 않아 강력하다고 볼 수 없지만 지원하기 이전의 파이프라인을 보다 다양하고 효율적인 구성을 구현하게 하는 개념이라 볼 수 있습니다.
앞서 말씀드린 Data-aware 포함한 Airflow 2.4의 변경된 기능을 짚어보도록 합시다.
Data-aware scheduling
Data-aware는 Airflow 내 특정 태스크에서 데이터셋을 업데이트할 때 새로운 DAG를 스케줄링할 수 있습니다.
해당 기능에 대해 자세히 말씀드리자면, DAG 생성 시 작고 독립적인 DAG를 만들어, 이를 큰 데이터 기반 워크플로우로 연결할 수 있음을 의미합니다.
현재 ExternalTaskSensor 나 TriggerDagRunOperator를 사용하고 계신다면, Data-aware 기능으로 대체하여 스케줄링을 효율적으로 관리 할 수 있습니다.
이해를 돕기 위해 아래 예시를 확인해 주세요, 먼저, `my_task`라는 작업이 `my-dataset`이라는 데이터셋을 생성하는 간단한 DAG를 작성해봅시다:
from airflow import Dataset
dataset = Dataset(uri='my-dataset')
with DAG(dag_id='producer', ...)
@task(outlets=[dataset])
def my_task():
...
데이터셋은 URI로 정의됩니다. 이 데이터셋이 변경될 때마다 스케줄되는 두 번째 DAG(consumer)를 다음과 같이 작성할 수 있습니다:
from airflow import Dataset
dataset = Dataset(uri='my-dataset')
with DAG(dag_id='dataset-consumer', schedule=[dataset]):
...
이 두 개의 DAG가 있으면, my_task가 끝나는 즉시 Airflow는 dataset-consumer 워크플로우를 위한 DAG 실행합니다.
현재 지원하는 기능만으로는 모든 사용 사례에 적용할 수 없습니다, 하지만 차기 버전 릴리즈(2.5, 2.6 등)에서는 이 기반을 확장하고 개선할 예정입니다.
데이터셋은 현재로서는 데이터셋의 추상적인 개념을 나타내며 직접 읽기 또는 쓰기 기능이 없습니다. 이번 버전에서는 앞으로 더욱 발전시킬 기본 기능을 추가하였으며, 앞으로 더욱 발전된 기능을 제공할 예정입니다.
데이터셋에 대한 자세한 정보는 데이터 중심 스케줄링에 대한 문서를 참조해 주세요. 이는 데이터셋이 어떻게 식별되는지 (URI), 여러 데이터셋에 어떻게 의존할 수 있는지, 데이터셋이 무엇인지에 대해 어떻게 생각해야 하는지에 대한 세부 정보가 포함되어 있습니다.
Easier management of conflicting python dependencies using the new ExternalPythonOperator
모든 파이썬 라이브러리가 함께 이상 없이 사용될 수 있다면 좋겠지만, 현실적으로 불가능합니다. 때때로 여러 파이썬 라이브러리를 Airflow 설치에 동시에 설치하려 할 때 충돌이 발생합니다.
이 문제를 해결하기 위해 @task.external_python (ExternalPythonOperator 포함)을 도입했습니다.
이를 통해 사전에 구성된 가상 환경 또는 완전히 다른 파이썬 버전에서 파이썬 함수를 Airflow 작업으로 실행할 수 있습니다:
@task.external_python(python='/opt/venvs/task_deps/bin/python')
def my_task(data_interval_start, data_interval_env)
print(f'Looking at data between {data_interval_start} and {data_interval_end}')
...
가상 환경에 설치해야 하는 것은 어떤 컨텍스트 변수에 접근하는지에 따라 다소 미묘하게 달라지니, 상세 ExternalPythonOperator 사용법에 대해 반드시 읽어보시기를 바랍니다.
More improvements to Dynamic Task Mapping
이제 동적 작업 매핑은 다음을 지원합니다:
자세한 내용은 각 링크를 참조해주세요.
Auto-register DAGS used in a context manager (더이상 `as dag:` 구문이 필요하지 않습니다)
앞서 언급되었던 내용들과는 달리 사소한 개선이지만, 이 문법은 기존에 무의미한 반복과 단순한 휴먼에러를 야기 해왔습니다.
with DAG(dag_id="example") as dag:
...
@dag
def dag_maker():
...
dag2 = dag_maker()
‘as dag:’를 사용하는 구문은 다음과 같이 변경될 수 있습니다.
with DAG(dag_id="example"):
...
@dag
def my_dag():
...
my_dag()
‘as dag:’ 없이 간단하게 DAG를 정의하고 함수를 호출할 수 있습니다. 이 동작을 비활성화하려는 경우에는, DAG에서 auto_register=False로 설정하면 됩니다.
# 이 DAG는 변수에 할당되지 않았기 때문에 Airflow에서 인식되지 않습니다.
with DAG(dag_id="example", auto_register=False):
...
Additional improvements
650개 이상의 커밋으로 기능, 수정 사항, 변경 사항의 전체 목록은 모두 나열하기 어려우므로 전체 변경 점은 릴리즈 노트를 확인해 주세요.
주목해야 할 주요 내용은 아래와 같습니다:
- 홈페이지 자동 새로고침
- 데코레이터 추가(@task.short_circuit)
- 데코레이터 추가(@task.kubernetes)
- CLI를 통하여 role 삭제 기능 추가
- ExternalTaskSensor내 TaskGroup 지원 가능
- 워커에서 Dynamic DAG의 최적화를 처리하도록 하는 parsing_context 실험적 기능 추가
- 하나의 schedule 변수로 통합
- 기존 불안정했던 컨피그 마스킹 기능을 이제 Admin → Configuration에서 민감하지 않은 데이터로 설정할 수 있도록 기능 제공
- 클래스 이름에서 operator 이름을 분리