Data/Airflow

Cent OS 9 내 Airflow 설치 및 DAG 구성&실행

chanstory 2025. 3. 9. 21:21
반응형

1. Airflow 란?

* Airflow 란 워크플로우(workflow) 관리 플랫폼으로, 데이터 파이프라인을 자동화하고 오케스트레이션하는 데 사용됩니다. 주로 ETL(Extract, Transform, Load) 작업, 데이터 처리, 머신러닝 파이프라인 관리 등에 활용됨

 

Airflow를 사용하게 된 계기는 프로젝트 진행 중 API를 통해 Data를 주기적으로 수집해오고 DB에 적재하는 과정을 반복해야 했고 이 과정을 자동화 시키고 코드 실행 여부만을 모니터링 하고자 했음

Airflow는 작업을 파이썬으로 쉽게 스케줄링 할 수 있고 UI를 통해 수집 결과 모니터링 및 로그 확인이 가능하다는 장점이 있어서 프로젝트에 적용하기로 했음

 

결과적으로는 데이터 수집, 변환, 적재 과정을 자동화 하여 데이터를 프로젝트에 잘 적용할 수 있었음 

현재는 MVAA(Amazon Managed Workflows for Apache Airflow)를 프로젝트에 적용할 계획임

 

지금 작성하는 게시글은 AWS 내 환경이 아닌 CentOS9 환경 즉, 온프레미스 환경에서의 구축 과정임

 

2. Airflow 설치

* VMWare17, CentOS9 환경에서 설치 진행

2.1 가상환경

$ python3 -m venv airflow_venv
$ source airflow_venv/bin/activate

# 필요 패키지 설치
$ sudo yum install -y gcc gcc-c++ libffi-devel python3-devel

먼저 가상환경을 구성함

이유는 다른 패키지와 의존성 충돌을 방지하며 독립적인 패키지 공간을 만들 수 있기 때문임

 

추후 추가적인 모듈을 설치하거나 버전관리 할 때도 용이함

 

지금은 개인 환경에 구성해보는 것이기에 중간에 문제가 생긴다면 빠르게 삭제하고 재설치가 가능함

위와 같이 설치가 팔요 패키지 설치가 완료된 것을 확인 할 수 있음

2.2 Apache Airflow 설치

$ pip install apache-airflow

# Airflow 홈 디렉토리 및 설정 초기화
$ export AIRFLOW_HOME=~/airflow
또는
$ echo 'export AIRFLOW_HOME=~/airflow' >> ~/.bashrc
$ source ~/.bashrc

Airflow는 파이썬 기반의 도구이기에 pip를 활용해서 설치할 수 있고

환경변수를 같이 설정해준다

apache-airflow 가 잘 설치됨

 

$ airflow db init

이후 DB init을 진행하는데 Airflow는 DB를 활용하기 때문이다

 

실행 코드가 들어있는 DAG 정보를 관리하고 태스크의 실행기록, 스케줄링 정보, 사용자 정보를 저장함

 

DB를 초기화 하지 않으면 Airflow가 정상적으로 동작하지 않음

 

 

$ airflow users create --username admin --firstname hong --lastname dong --role Admin --email test@123.com

이후 Apache Airflow의 UI에 접속할 수 있도록 사용자 계정을 생성함

 

Airflow UI는 웹에서 동작하며 DAG 설정 및 모니터링을 진행하는데 보안을 위해 사용자 계정을

생성하여 진행함

 

Role의 종류로는 Admin, User, Viewer, Op가 있으며

 

  • Admin → 모든 기능 사용 가능 (DAG 실행, 설정 변경)
  • User → DAG 실행 가능하지만 설정 변경 불가능
  • Viewer → DAG 모니터링만 가능 (수정/실행 불가)
  • Op → DAG 실행 가능하지만 코드 수정 불가능

 

각 위와 같은 기능을 수행할 수 있음

사용자 계정을 생성하고 Password를 두번 입력해주면 생성 완료!

 

$ airflow webserver --port 8085
또는
$ nohup airflow webserver --port 8085 &


# 방화벽 열기
$ sudo firewall-cmd --permanent --add-port=8085/tcp
$ sudo firewall-cmd --reload
$ sudo firewall-cmd --list-all

Airflow는 기본적으로 8080포트를 사용하는데 이미 다른 프로그램에서 사용중일 경우

Airflow를 다른 포트에서 실행시켜주면 됨

 

방화벽 확인 후 열어놓으면 됨

 

전 nohup으로 백그라운드로 실행시켰음

$ airflow scheduler
또는
$ nohup airflow scheduler &

Airflow scheduler는 DAG를 감지하고 실행하는 역할을 하고 예약된 태스크가 자동으로 실행되도록 해줌

그렇기 때문에 꼭 실행시켜주어야 함

 

스케줄러를 실행시켜주지 않으면

 

  • DAG이 "scheduled" 상태에서 실행되지 않음 🚨
  • 수동으로 실행(Trigger DAG)을 해야만 DAG이 동작함
  • 스케줄링 기반 자동 실행이 불가능

 

위와 같은 문제가 생길 수 있음

 

 

3. Airflow UI

ifconfig를 통해 서버의 주소를 알아내고 해당 주소:8085로 접속하면 됨

전 192.168.126.128:8085로 접속함 VMWare설정(NAT)에 따라 VMWare 외부 브라우저에서도 접속됨

 

Airflow UI로 접속됐으며 이전에 생성해둔 Airflow 계정으로 접속

접속하면 example DAG들이 기본적으로 세팅되어 있음

 

좌측 버튼을 통해 DAG를 On/Off시킬 수 있고 Runs를 통해 DAG 실행 횟수를 알 수 있음

또한 Recent Teasks를 통해 현재 동작중인 DAG가 어떤 상태인지를 알 수 있음

example DAG들은 airflow.cfg에서 load_examples = False로 설정해주면 사라짐

4. Airflow DAG 구성

환경변수에서 설정했던 Airflow home 경로를 확인하고

dags 디렉토리를 생성함

 

해당 경로에 .py로 airflow dags 코드를 작성하면 됨

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 실행할 Python 함수 정의
def print_hello():
    print("Hello, Airflow!")

# 기본 설정
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
with DAG(
    dag_id='basic_dag',                  # DAG의 ID (이름)
    default_args=default_args,           # 기본 설정 적용
    description='A simple Airflow DAG',  # DAG 설명
    schedule_interval='*/5 * * * *',     # 5분마다 실행 (Cron 표현식)
    start_date=datetime(2024, 3, 1),     # 시작 날짜
    catchup=False,                        # 지난 실행 시간을 무시하고 최신 실행만 수행
) as dag:

    # PythonOperator를 사용해 태스크 정의
    task_hello = PythonOperator(
        task_id='print_hello',  # 태스크 ID
        python_callable=print_hello,  # 실행할 함수
    )

    # DAG 실행 순서 정의
    task_hello  # 단일 태스크이므로 따로 설정할 필요 없음

전 GPT에게 기본적인 코드를 달라고 요청했고 이러한 코드를 넣었습니다!

 

5분 마다 실행하도록 Cron 표현식을 적용하였고

basic_dag라는 이름으로 dag를 구성했습니당 다른 옵션들은 천천히 살펴보시고

DAG이름과 task_id를 잘 확인하면 됩니다

 

DAGS를 실행시켜주면 초록색 동그라미로 성공표시가 뜸!

실패에는 빨간 동그라미의 숫자가 올라가는데 로그를 확인해야함

(본인이 Error 로그를 코드 속에 넣었으면 굿!, 다른 설정의 문제면 알아서 로그로 Airflow가 알려줌)

로그는 DAG명을 누르고 Total success누르면 됨 (실패시엔 Fail이 있을 것)

스케줄 동작시간을 확인하고 Task ID 클릭!

Logs 탭으로 가면 성공 로그 또는 실패 로그를 확인 할 수 있음

 

이게 진짜 편한게 서버 상태나 리소스, 코드 내 오류, API 수집 과정에서의 오류 등을 쉽게 알 수 있음

(본인이 error log 넣으면 더 빨리 찾음)

 

기본적인 Airflow 설치 및 동작은 이정도로 마치고

다음엔 DB를 연결해보도록 하겠습니다

 

그럼 간단한 ETL이 완성되겠쥬?

반응형