Data Engineering/Workflow

2. Airflow on Kubernetes - git sync

Kim Jinung 2022. 11. 22. 17:47

Prerequisite

 

1. Airflow on Kubernetes

Apache Airflow는 쿠버네티스에 친숙한 프로젝트를 목표로 하고 있으며, 공식 helm 차트를 관리하고 있다. 해당 차트를 이용해서 쿠버네티스 클러스터에 비교적 쉽게 에어플로우를 구축할 수 있다. G

jinyes-tistory.tistory.com


Native Airflow를 사용하면 local directory에 dag를 저장하고 사용하면 되지만, kubernetes는 컨테이너 형태(상태)로 리소스를 관리하기 때문에, 언제든 Airflow가 종료되었다가 재시작이 될 수 있다. Airflow는 remote directory 기능인 git sync를 제공하는데, 해당 기능을 이용해서 DAG 파일 저장소로 github repository를 이용할 수 있다.

 

Goal

- Airflow git sync가 무엇인가?

- Airflow on Kubernetes with gitsync


1. Airflow git sync가 무엇인가?

github repository를 Airflow DAG 디렉토리로 사용하는 것이다. 즉 아래 이미지에서 DAG Directory가 Github repository가 되는 것이다.

https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html

 

1. 여러 사람이 하나의 repository를 통해 dag 파일을 관리할 수 있다.

2. Github 플랫폼을 그대로 사용하므로 코드 리뷰가 쉬워진다.

3. DAG 디렉토리를 별도의 컨테이너로 관리할 필요가 없다.

 


2. Airflow on Kubernetes with git sync

 

Setting up Apache Airflow on Kubernetes with GitSync

When I first started studying data engineering I felt overwhelmed by the number of tools and techniques that were required to run data…

blog.devgenius.io

위 아티클을 참조하였으나, 조금 더 따라가기 쉽게 디테일을 추가했다.

 

(1) Repository 생성

github에서 dag directory에 사용할 private repository를 생성한다. 

(2) ssh key 생성

ssh-keygen -t rsa -b 4096 -C "your_email@example.com"

.ssh 디렉토리 내에 airflow_ssh_key 라는 이름으로 ssh key를 생성한다.

 

주의할 점은 passphrase를 별도로 주입하지 않고 빈값으로 놔둬야 한다.

 

클립보드에 미리 키를 복사해둔다.

pbcopy < airflow_ssh_key.pub

 

(3) Deploy Keys 설정 

repositry setting 페이지로 넘어가면, 왼쪽 사이드 바에 Deploy keys 항목이 있다.

복사한 key를 붙여넣고 Allow write access를 허용해준 후 키를 등록한다.

 

(4) Create  K8s secret

아래 커맨드 /path/to 경로에는 자신의 .ssh 디렉토리 경로를 주입해서 k8s secret을 생성한다.

kubectl create secret generic airflow-git-ssh-secret \
  --from-file=gitSshKey=/path/to/.ssh/airflow_ssh_key \
  --from-file=id_ed25519.pub=/path/to/.ssh/airflow_ssh_key.pub \
  -n airflow
kubectl get secret -n airflow

airflow-git-ssh-secret이 생성된 것을 확인할 수 있다.

 

 

(5) Editing the airflow helm YAML file

아래 helm 커맨드로 airflow value.yaml 파일을 생성한다.

helm show values apache-airflow/airflow > values.yaml

 

해당 야믈 파일에서는 airflow 설정을 조작할 수 있다. 

 

 

쭉 내리다 보면 Git sync를 항목을 확인할 수 있다.

아래와 같이 수정하되 repo에는 private으로 생성한 자신의 repo 주소로 변경한다. 

  • enabled: 해당 기능을 사용할 것인지에 대한 여부
  • repo: sync에 사용할 github repository 주소
  • branch: repository의 어떤 branch를 sync 할 것인지
  • depth: root 디렉토리의 depth. 디렉토리 자체를 dags 디렉토리로 사용할 것이므로 1로 놔둔다.
  • subPath: 추가로 사용할 경로 path. 참조한 자료에서는 테스트 코드를 subPath에 추가해서 사용한다.
  • sshKeySecret: 앞서 작성한 k8s secret의 이름

 

# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: true
    # Volume size for dags
    size: 10Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
  gitSync:
    enabled: true
    repo: git@github.com:your-github-username/repository-name.git
    branch: main
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: ""

    credentialsSecret: git-credentials

    sshKeySecret: airflow-git-ssh-secret

    wait: 10
    containerName: git-sync
    uid: 65533

    # When not set, the values defined in the global securityContext will be used
    securityContext: {}
    #  runAsUser: 65533
    #  runAsGroup: 0

    extraVolumeMounts: []
    env: []
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

 

(6). Airflow upgrade

Airflow 업그레이드를 진행한다. (airflow가 이미 실행 중이더라도 상관없다.)

helm upgrade --install airflow apache-airflow/airflow -n airflow -f values.yaml

 

3. Sample DAG 작성

Airflow upgrade를 진행하는 동안 sample dag를 하나 작성한다.

 

아래 sample_dag.py 파일을 앞서 만든 airflow-dags-demo 리포지토리에 push 한다.

Airflow 에서 제공하는 튜토리얼 DAG의 스케쥴 타임만 수정했다.

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG

from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator

from datetime import datetime 
def test():
    print(datetime.now())

    
with DAG(
    'tutorial',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval='1 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = PythonOperator(
        task_id='print_date',
        python_callable=test,
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

 

4. Test - Airflow git sync 

port forward 진행 후 8080 포트로 웹서버에 접속한다.

kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow

 

DAGs 목록에 tutorial DAG가 정상적으로 잡힌다.

 

 

DAG를 Active 상태로 변경하여 정상 작동하는 것을 확인한다.

 


마치며

지금까지 Airflow git sync가 무엇이고, 어떻게 사용하는지 정리했다. 그런데 Airflow를 Kubernetes에 올리면서 exexcutor 설정은 건드리지 않았다.

pod 목록을 확인 해보면 airflow-worker가 항상 running 중인 것을 확인할 수 있다. 

이제 이 worker pod를 DAG job이 실행될 때만 생성 되도록 Kubernetes executor를 사용해볼 것이다.