Airflow 소개, 가장 많이 사용되는 데이터 파이프라인 관리 / 구현 프레임워크인 Airflow에 대해 알아보자
Airflow 소개 (1)
● Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임워크
○ Airbnb에서 시작한 아파치 오픈소스 프로젝트
○ 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임워
● 데이터 파이프라인 스케줄링 지원
○ 정해진 시간에 ETL 실행 혹은 한 ETL의 실행이 끝나면 다음 ETL 실행
○ 웹 UI를 제공하기도 함
Airflow 소개 (2)
● 데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌
○ 다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공
○ 데이터 파이프라인 관리 관련 다양한 기능을 제공해줌: 특히 Backfill
Backfill?
데이터베이스나 시스템에서의 결손된 데이터를 채우는 작업을 말한다. 보통 시간이나 순서에 따라 발생하는 데이터 누락이나 오류를 수정하고, 빠진 데이터를 채워넣는 프로세스를 의미하는데, Backfill 작업을 통해 데이터의 완전성과 정확성을 유지하며, 이후 분석이나 처리에 영향을 주는 부분을 보완하는데 사용한다.
● Airflow에서는 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
○ 하나의 DAG는 하나 이상의 태스크로 구성됨
● 2020년 12월에 Airflow 2.0이 릴리스됨
● Airflow 버전 선택 방법: 큰 회사에서 사용하는 버전이 무엇인지 확인.
○ https://cloud.google.com/composer/docs/concepts/versioning/composer-versions
Airflow 구성, Airflow를 구성하는 컴포넌트와 스케일링 방법과 코드 구조에 대해 알아보자
Airflow - 총 5개의 컴포넌트로 구성
1. Web Server
2. Scheduler
3. Worker
4. Database (Sqlite가 기본으로 설치됨)
5. Queue (기본적으로는 멀티노드 구성인 경우에만 사용됨)
a. 이 경우 Executor가 달라짐 (CeleryExecuter, KubernetesExecutor)
Airflow 구조 : 서버 한대
Airflow 스케일링 방법
● 스케일 업 (더 좋은 사양의 서버 사용)
● 스케일 아웃 (서버 추가)
● Docker와 K8s 사용
Airflow 구조 : 다수 서버
Airflow 개발의 장단점
● 장점
○ 데이터 파이프라인을 세밀하게 제어 가능
○ 다양한 데이터 소스와 데이터 웨어하우스를 지원
○ 백필(Backfill)이 쉬움
● 단점
○ 배우기가 쉽지 않음
○ 상대적으로 개발환경을 구성하기가 쉽지 않음
○ 직접 운영이 쉽지 않음. 클라우드 버전 사용이 선호됨
■ GCP provides “Cloud Composer”
■ AWS provides “Managed Workflows for Apache Airflow”
■ Azure provides “Azure Data Factory Managed Airflow”
DAG란 무엇인가?
● Directed Acyclic Graph의 줄임말(방향(Directed)이 있음 loop X)
● Airflow에서 ETL을 부르는 명칭
● DAG는 태스크로 구성됨
○ 예를 3개의 태스크로 구성된다면 Extract, Transform, Load로 구성
● 태스크란? - Airflow의 오퍼레이터(Operator)로 만들어짐
○ Airflow에서 이미 다양한 종류의 오퍼레이터를 제공함
○ 경우에 맞게 사용 오퍼레이터를 결정하거나 필요하다면 직접 개발
○ e.g., Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script
DAG 구성예제 (1)
DAG 구성예제 (2)
모든 Task에 필요한 기본 정보
default_args = {
'owner': 'sungwoo', # 사용자 이름
'start_date': datetime(2020, 8, 7, hour=0, minute=00), # 시작
'end_date': datetime(2020, 8, 31, hour=23, minute=00), # 끝
'email': ['sungwoo@example.com'], # error나면 공유 mail
'retries': 1, # error나면 재시도 횟수
'retry_delay': timedelta(minutes=3), # 작업실패시 3분 기다렸다가 다시시도
}
모든 DAG에 필요한 기본 정보 (1)
from airflow import DAG
test_dag = DAG(
"HelloWorld", # DAG name
schedule="0 9 * * *", # 작업이 어떤 주기로 실행되는지를 지정
tags=['test'] # DAG에 태그를 추가, 이 태그는 DAG를 그룹화하거나 필터링하는 데 사용
default_args=default_args # 이전에 정의한 default_args 딕셔너리를 DAG의 기본 인수로 설정
)
모든 DAG에 필요한 기본 정보 (2)
● schedule의 의미: 크론탭 문법을 따름 (UTC)
● * 분(0-59) / * 시(0-23) / * 일(1-31) / * 월(1-12) / * 요일(0-6, 0이나 7은 일요일)
● “0 * * * *”의 의미는? 매번 0분 마다 실행(1시간마다 실)
● “0 12 * * *”의 의미는? 매일 12시 0분(정) 마다 실행
● “30 6 * * 0”의 의미는? 매주 일요일 아침 6시 30분 마다 실행
Operators Creation Example #1
from airflow.operators.bash import BashOperator
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
Operators Creation Example #2
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(dag=dag, task_id="start", *args, **kwargs)
t1 = BashOperator(
task_id='ls1',
bash_command='ls /tmp/downloaded',
retries=3,
dag=dag)
t2 = BashOperator(
task_id='ls2',
bash_command='ls /tmp/downloaded',
dag=dag)
end = DummyOperator(dag=dag, task_id='end', *args, **kwargs)
데이터 파이프라인을 만들 때 데이터 파이프라인 작성시 고려할 점, best practices 점 들과 기억해두면 좋은 팁들에 대해 정리해보자
이상과 현실간의 괴리
● 이상 혹은 환상
○ 내가 만든 데이터 파이프라인은 문제 없이 동작할 것이다
○ 내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것이다
● 현실 혹은 실상
○ 데이터 파이프라인은 많은 이유로 실패함
■ 버그 :)
■ 데이터 소스상의 이슈: What if data sources are not available or change its data format
■ 데이터 파이프라인들간의 의존도에 이해도 부족
○ 데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
■ 데이터 소스간의 의존도가 생기면서 이는 더 복잡해짐. 만일 마케팅 채널 정보가 업데이트가 안된다면 마케팅 관련 다른 모든 정보들이 갱신되지 않음
■ More tables needs to be managed (source of truth, search cost, …)
Best Practices (1)
● 가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블을 만들기 (Full Refresh)
● Incremental update만이 가능하다면(최대한 Full Refresh하다가 안되는 상황), 대상 데이터소스가 갖춰야할 몇 가지 조건이 있음
○ 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요:
■ created (데이터 업데이트 관점에서 필요하지는 않음)
■ modified
■ deleted
○ 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함
Best Practices (2)
● 멱등성(Idempotency)을 보장하는 것이 중요
● 멱등성은 무엇인가?
○ 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 말아야함
○ 예를 들면 중복 데이터가 생기지 말아야함
Best Practices (3)
● 실패한 데이터 파이프라인을 재실행이 쉬어야함
● 과거 데이터를 다시 채우는 과정(Backfill)이 쉬어야 함
● Airflow는 이 부분(특히 backfill)에 강점을 갖고 있음
○ DAG의 catchup 파라미터가 True가 되어야하고 start_date과 end_date이 적절하게 설정되어야함
○ 대상 테이블이 incremental update가 되는 경우에만 의미가 있음
■ execution_date 파라미터를 사용해서 업데이트되는 날짜 혹은 시간을 알아내게 코드를 작성해야함
● 현재 시간을 기준으로 업데이트 대상을 선택하는 것은 안티 패턴
Best Practices (4)
● 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
○ 데이터 디스커버리 문제!
● 주기적으로 쓸모없는 데이터들을 삭제
○ Kill unused tables and data pipelines proactively
○ Retain only necessary data in DW and move past data to DL (or storage)
Best Practices (5)
● 데이터 파이프라인 사고시 마다 사고 리포트(post-mortem) 쓰기
○ 목적은 동일한 혹은 아주 비슷한 사고가 또 발생하는 것을 막기 위함
● 중요 데이터 파이프라인의 입력과 출력을 체크하기
○ 아주 간단하게 입력 레코드의 수와 출력 레코드의 수가 몇개인지 체크하는 것부터 시작
○ 써머리 테이블을 만들어내고 Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
○ 중복 레코드 체크
○ ...
Backfill이란? 관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되면 이를 어떻게 관리하느냐가 DE의 삶에 큰 영향을 준다.
Full Refresh vs. Incremental Update
● Full Refresh: 매번 소스의 내용을 다 읽어오는 방식
○ 효율성이 떨어질 수 있지만 간단하고 소스 데이터에 문제가 생겨도 다시 다 읽어오기에 유지보수가 쉬움
○ 데이터가 커지면 사용불가
● Incremental Update
○ 효율성이 좋지만 복잡해지고 유지보수가 힘들어짐
○ 보통 daily나 hourly로 동작해서 그 전 시간 혹은 그 전 날 데이터를 읽어오는 형태로 동작
Incremental Update가 실패하면?
Incremental Update시에만 의미하는 이야기
● 다시 한번 가능하면 Full Refresh를 사용하는 것이 좋음
○ 문제가 생겨도 다시 실행하면 됨
● Incremental Update는 효율성이 더 좋을 수 있지만 운영/유지보수의 난이도가 올라감
○ 실수등으로 데이터가 빠지는 일이 생길 수 있음
○ 과거 데이터를 다시 다 읽어와야하는 경우 다시 모두 재실행을 해주어야함
Backfill의 용이성 여부 -> 데이터 엔지니어 삶에 직접적인 영향!(*Incremental Update시에만)
● Backfill의 정의
○ 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미
● Backfill 해결은 Incremental Update에서 복잡해짐
○ Full Refresh에서는 간단. 그냥 다시 실행하면 됨
● 즉 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가?
○ 이게 잘 디자인된 것이 바로 Airflow
보통 Daily DAG를 작성한다고 하면 어떻게 할까?
● 지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
그런데 지난 1년치 데이터를 Backfill 해야한다면?
● 기존 ETL 코드를 조금 수정해서 지난 1년치 데이터에 대해 돌린다
● 실수하기 쉽고 수정하는데 시간이 걸림
from datetime import datetime, timedelta
# y = datetime.now() - timedelta(1)
# yesterday = datetime.strftime(y, '%Y-%m-%d')
yesterday = '2023-01-01'
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
어떻게 ETL을 구현해놓으면 이런 일이 편해질까?
● 시스템적으로 이걸 쉽게 해주는 방법을 구현한다
○ 날짜별로 backfill 결과를 기록하고 성공 여부 기록: 나중에 결과를 쉽게 확인
○ 이 날짜를 시스템에서 ETL의 인자로 제공
○ 데이터 엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고 시스템이 지정해준 날짜를 사용
Airflow 설치 - 도커 사용 도커를 사용해 Airflow를 설치해보자
'Data Engineering > 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트' 카테고리의 다른 글
[4주차] 데이터베이스 트랜잭션 (0) | 2023.09.05 |
---|---|
[3주차] Assignment (0) | 2023.08.31 |
[3주차] ETL (0) | 2023.08.30 |
[2주차] Assignment (0) | 2023.08.25 |
[2주차] SQL for Data Engineers(2) (0) | 2023.08.22 |