유기농은 너무 비싸서 그런데 농약 친 건 어딨나요?

유기농은 너무 비싸서 그런데 농약 친 건 어딨나요?

29 Nov 2021

워크플로우 엔진 PREFECT : 개념

일주일 마다 한번씩 크롤링을 해서 정리한 후, 검색을 위해 엘라스틱 서치(ES)에 인덱스를 생성할 필요가 있다. 그래서 파이썬 기반의 새로운 워크플로우 관리 엔진인 PREFECT를 사용해보려 한다.

Prefect는 데이터 워크플로우들을 구축하는 툴이고, 워크플로우는 필요한 각 과정들을 특정 순서로 동작시킨다.

Prefect는 라틴어 praefectus 에서 시작 되는데 의미가 교사가 학교 규율에서 학생을 유지하도록 돕는 사람이라고 한다. 도메인을 감독하고 규칙을 잘 지키는지 확인하는 관리자 라는 의미에서 이름을 붙였다고 한다.

바로 아래는 Prefect의 Github과 공식 문서다. 여기를 참고해서 Prefect에서 사용하는 용어와 개념에 대해 정리한다.

Prefect 공식 Github

Prefect 공식 문서

1. 왜 Prefect 인데

Prefect는 데이터 파이프라인을 관리하는 워크플로우 관리 시스템으로 유저가 Tasks로 Flows를 구성하면 Prefect가 나머지를 맡는 구조로 동작 한다.

Prefect의 세가지 컨셉은 아래와 같다.

  • Automate all the things
  • Test local, deploy global
  • Simple but powerful

그리고 이 전제를 볼 수 있다.

너의 코드는 아마 동작할거야. 근데 안할 때도 있어.

워크플로우 관리 툴의 역할은 네거티브 엔지니어링 관점( 방어적인 코드를 짜라~ )으로 코드가 본래 역할을 다하는지, 실패 과정을 성공적인 보장 하는지를 관리하는 것이기 때문에 현실의 보험과 같은 리스크 관리 툴을 제공하려고 한다고 한다.

추가로 개발하기 쉽게 제공하고, 다른 엔진 들에 비해 제한하는 것들이 적고, 더 많은 단위 테스트와 더 거대한 테스트 범위를 보유한다고 한다.

왜 Prefect 인가?

Tasks are functions

Prefect의 Task는 언제 실행해야 하는지에 대한 특수한 규칙이 있는 함수고, 어떤 기능을 하는지에 대해서는 제한이 없다.

Workflows are containers

Prefect의 Workflow를 Flow로 계속 명시 하는데 Task 들을 위한 컨테이너 들이다. 로직이 없고 태스크 간 의존성을 표현한다.

Modularity

Prefect의 모든 컴포넌트는 각각의 모듈로 구성된다. 그래서 커스터마이징 하기 좋다.

Communication via state

Prefect는 State 개념으로 Task와 workflow 모두 동작에 대한 정보를 States로 생성한다

Massive concurrency

Prefect의 workflow가 언제든 동시성을 가지고 실행 가능하도록 설계 했고, 스케줄링이 오히려 특수한 케이스라고 한다.

Idempotency preferred ( but not required )

Prefect는 멱등성( 여러번 적용해도 결과가 달라지지 않는 성질 )에 대한 가정을 하지 않고 Task를 설계했기 때문에, 멱등성을 보장하도록 개발 하지 않아도 된다.

Automation framework

Prefect는 자동화 프레임워크가 갖추어야 할 간소화 된 워크플로우 정의, 워크플로우를 검증하고 실행하는 강력한 엔진, 워크플로우와 태스크의 상태를 모두 포함하고 있다. 특히 Prefect는 워크플로우 상태 정의를 풍부하게 제공해서 실행 중과 후에 대한 제한이 적어 자유롭다고 한다.

2. 왜 Airflow는 아닌데? ( Relative Works )

Apache Airflow는 현재 가장 많이 사용하고 있는 워크플로우 엔진이다. workflow-as-code의 첫 구현체로 데이터 엔지니어링 에코시스템에서 유용하고 유연한 패러다임을 제공했다. DAG(Directed Acyclic Graph) 모델을 파이썬과 결합한 것이 가장 큰 특징이다.

하지만 Airflow는 Monolithic Batch Scheduler 때문에 다른 Third-Party 시스템들의 오케스트레이션 지원이 제한 되는 경우가 있다. 사용자가 해결해야 하는 문제를 오히려 Airflow에 맞게 구현하게 되는 경우가 발생한다. 에어플로우가 지원하지 못하는 방식은 아래와 같다.

  • 스케줄 없이 실행해야하는 DAG
  • 동일한 시작 시간에 동시에 실행되는 DAG
  • 브랜치 로직이 복잡한 DAG
  • 빠른 작업이 많은 DAG
  • 데이터 교환에 의존하는 DAG
  • 파라미터화 된 DAG
  • 다이나믹 DAG

위의 이슈 해결을 위해 Custom DSL이나 Plugin 들을 자체적으로 개발해서 사용하는 경우가 많다. 내부적으로 구현한 툴을 유지 보수를 해야한다는 이슈가 추가로 발생한다. Prefect는 Airflow를 사용한 프로젝트를 진행하면서 이슈들을 해결한 수년간의 노하우를 기반으로 개발한 워크플로우 엔진이라고 한다.

API

Airlfow는 클래스 기반의 명령형 API를 제공하고 Workflow에서 할수 있는 것과 없는 것에 대한 제약 조건이 있다. 그래서 Airflow의 DAG 구현이 Airflow 코드 구현으로 느껴질 수 있다.

→ Prefect의 설계 목표는 Task에 대한 간섭을 최소화 하고, 잘못 될때 최대한 지원 하는 것이다. 따라서 “Functional API"로 제공되어야 하고 Task들은 함수처럼 동작한다. 파이썬 함수를 코드 한 줄로 태스크로 변환하고 그 함수를 호출하는 방식으로 DAG를 구축할 수 있다. Airflow 유저들을 위해 명령형 API도 제공하는데 더 복잡한 태스크 간 의존성 구현이나 명시적인 제어에 유용하다.

Scheduling and Time

Airflow는 특정 실행 시간인 execution_date에 대한 의존성이 강하다. 일단 동일한 execution_date로 두번 실행되는 DAG는 존재하지 않는다. 그리고 동시에 시작해서 연속 두번 실행 해야하는 DAG의 존재 자체를 가정하지 않는다. Airflow에서 execution_date는 DAG 시작 시간이 아닌 시작 시간으로 정해지는 interval 끝으로 인식된다.

  • execution_date에 대한 개념은 “2일의 데이터 작업이 3일에 수행된다"는 요구사항으로 시작된 것인데, 이 것 때문에 Workflow 스케줄링에 많은 오해가 발생한다.
  • interval에 대한 개념은 “DAG는 잘 정의된 스케줄이 있어야 한다” 요구사항에서 나왔다. 최근까지 스케줄 외 실행이 불가능 했다.

→ Prefect에서는 Workflow를 언제든지 동시에 실행 가능한 독립 객체로 본다. 스케줄은 간단하게도 복잡하게도 만들 수 있다. 시간에 의존한 워크플로우를 원하면 파라미터로 추가하면 된다.

Scheduler는 Airflow의 성능에 중요한 역할을 하는 서비스로 아래의 기능들을 담당한다

  • 매 초마다 DAG 폴더 파싱
  • DAG 실행 준비를 결정하기 위한 DAG 스케줄 체크
  • Task 실행 준비를 결정하기 위한 Task 의존성 체크
  • 최종 DAG 상태 DB 반영

→ Prefect는 위의 로직의 대부분을 서로 다른 프로세스로 분리했다. Prefect의 Flow 스케줄링은 워크플로우의 로직이나 실행에 관련되지 않는 가벼운 작업으로 새로운 Flow 실행을 생성하고 Scheduled 상태로 위치시킨다. Flow는 논리적으로 독립적인 Workflow 실행 단위로 스케줄러가 파싱을 하거나 결과에 대해 상호작용하지 않는다. Flow가 동작하면, 내부의 태스크들을 스케줄링한다. 중요한 몇가지 이유가 있는데

  • Flow는 워크플로우 로직의 소스 코드로, 책임을 져아하는 유일한 객체다
  • 중앙 스케줄러의 막대한 부담을 완화할 수 있다
  • Flow가 Task 동적 생성과 같은 유니크한 상황에서 결정을 할수 있게 한다 ( 예를 들어, Prefect의 map 연산자의 결과 )
  • Dask 같은 외부 시스템 들의 구체적인 실행을 아웃소싱 할 수 있게 한다

Airflow는 Local, Celery, Dask 그리고 K8S 같은 다양한 실행 환경을 지원하지만, 자체 스케줄링에 대한 병목 현상이 남아 있다고 한다. 그리고 기본적으로는 “대규모” 태스크를 권장한다.

→ Prefect는 모던한 기술을 채용해서 Prefect 클라우드는 기본적으로 K8S의 Dask 클러스터에 배포된다고 한다. 그리고 모듈식인 작은 태스크로 구현을 권장하지만, 거대한 태스크도 다룰 수 있다.

Dataflow

데이터 파이프라인을 구축하기 위해서 대부분 Airflow를 사용한다. 하지만 아이러니 하게 데이터 플로우를 가장 좋은 방식으로 제공하지는 않는다. XCom으로 Task간 데이터 교환을 제공하는데, XCom은 관리자 접근으로 에어플로우 메타 DB에 실행 가능한 pickle 형식으로 작성하는 방식으로 Push/Pull 로 동작한다. 이전 태스크가 데이터를 저장한 경로를 다음 태스크가 전달 받는 것과 같은 작은 메타데이터 교환에는 적절하다. 하지만 데이터 파이프라인 메커니즘으로 사용하는데 많은 문제가 있다.

  1. 데이터 보안 상 이슈 ( .pkl 포맷 기반 데이터 저장 )
  2. 성능과 비용 이슈 ( TTL과 expire 기능 없음, 메타 DB 사용 ) .
  3. 태스크 간 업스트림과 다운스트림 종속성 강제 ( PushPull )

→ Prefect에서 Task들은 입력을 선택적으로 받고 출력값도 선택적으로 리턴한다. DB에 데이터를 작성하지 않는다. 대신 결과 스토리지가 유저가 쉽게 구성 가능한 보안 로직으로 관리된다. 아래와 같은 장점들이 있다.

  • 친숙한 파이썬 패턴으로 코드를 작성할 수 있다
  • 엔진이 의존성을 알고 있는 투명한 디버깅을 제공한다
  • 의존성 없는 Airflow 스타일 패턴들이 여전히 지원된다.
  • 태스크들 간 직접적인 데이터 교환으로 더 복잡한 분기 로직, 풍부한 태스크 상태를 제공할 수 있다. Flow 내에서 Task와 Runner간 엄격한 관계를 정의할 수 있다.

Parametrized Workflows

Airflow는 고정된 스케줄 상에서 동작한다고 가정되고 입력을 받지 않는다. 이 제약 사항에서도 Airflow에서 파라미터화된 워크플로우를 운영하기 위해서는 Hijacking의 개념을 사용해야 한다. 지속적인 DAG 파일을 파싱과 재분석하는 과정이 필요하고, DAG 파일에 동적으로 응답하는 Airflow 변수를 사용해야 한다. 문제가 생길 가능성이 높다.

→ Prefect는 런타임에 오버라이딩 될 수 있는 Parameter라는 태스크의 특별한 타입을 제공한다. 아래와 같은 장점 들이 있다.

  • 뭔가 잘못되고 있을 때를 위한 더 투명한 데이터 계보( linease )
  • 다른 파라미터 값을 위해 새롭게 실행 시키면 되고 Workflow를 재생성할 필요 없다
  • 이벤트에 응답하는 Workflow 셋업이 가능하고, Workflow가 이벤트의 컨텐츠나 타입에 따라 서로 다른 분기들을 이용할 수 있다

Dynamic Workflows

정확한 횟수를 모른채 반복되어야 하는 태스크가 필요한 경우가 있다. Airflow에서는 다운스트림 태스크 B를 구현해서 특정 액션을 수행하기 위해 루프문을 실행해야 한다. 이 구현의 단점은

  • UI가 동적 워크로드에 대해 알 수 없기 때문에 동작 모니터링이 어렵다.
  • 특정 레코드를 처리하는 태스크 실패 시, 전체 워크플로우가 실패 된다
  • 실패 후 재시도를 시스템이 이해하지 못하기 때문에. 멱등성을 보장하는 재시도 로직을 반복적으로 구현해야 한다.

→ Prefect는 Task Mapping으로 런타임에 새로운 복사본 태스크들을 업스트림 태스크의 출력으로 동적으로 생성 가능하다. 쉽게 말해 동적인 병렬 파이프라인들을 만들 수 있다. 동적 파이프라인들의 결과를 리듀싱하거나 모으는 것도 매핑되지 않은 태스크의 입력으로 피딩하면 되어 간단하다. Task Mapping이 가진 아래의 장점이 있다.

  • 매핑 패턴이 Flow를 구체화 하기 쉽다
  • 각 Task는 독립적인 객체라서 개별적이고 독립적으로 ‘재시도/알람’을 할 수 있다. 동적으로 생성되는 파이프라인도 직접 생성한 파이프라인이 가지는 상태 관리에 대한 모든 기능적인 장점을 가진다
  • Flow를 실행 마다 동적으로 태스크의 수가 결정 될 수 있다
  • UI가 매핑된 작업을 표현 할 수 있다

Versioned Workflows

Airflow는 중앙 스케줄러가 DAG 파일의 경로를 DAG 로드하는 방식으로 동작한다. DAG 코드가 업데이트 되면 새롭게 DAG를 로드 하지만 DAG의 변경점을 모르는 상태로 실행하기 때문에 아래와 같은 문제가 있다.

  • 과거 DAG 재사용을 위해 별도의 엔티티로 이름을 지정해야 한다
  • UI는 버전 시스템에 대해 모르기 때문에 버전에 대한 정보를 확인 할 수 없다.

→ Prefect에서는 모든 Workflow가 version group 파트가 될 수 있다. 히스토리 유지와 트래킹이 쉬워진다. 기본적인 세팅은 다음과 같다.

  • 동일한 이름의 Flow를 배포하면 자동으로 버저닝 된다
  • Flow가 새롭게 버저닝 되면 이전 버전에 비해 높은 버전을 얻게 된다

Local Testing

Airflow와 Prefect 모두 파이썬으로 작성되었기 때문에 표준 파이썬 패턴을 사용해서 태스크 / 오퍼레이터를 유닛 테스트 할 수 있다. Airflow에서는 DagBag을 임포트할 수 있고 Prefect에서는 Flow를 임포트 할 수 있다.

Airflow의 워크플로우 로직 테스트는 Prefect에 비해 두 가지가 더 어려울 수 있다.

  • CI 파이프라인에 포함하기 어려울 수 있다. Airflow의 DAG 레벨의 실행은 중앙 스케줄러에 의해 제어 되기 때문에, Airflow DB와 스케줄러 서비스가 실행 중이어야 테스트가 가능하다
  • 테스팅 복잡도가 높다. Airflow의 태스크 State 개념은 상태를 묘사하는 String이다. 어떤 예외가 발생했는지 확인을 하기 위해 DB 쿼리를 해야한다

→ Prefect에서는 flow 들이 flow.run이나 FlowRunner를 사용해서 로컬에 자체적으로 실행될 수 있다. 추가로 업스트림 태스크의 상태를 수동 지정하는 방법을 포함해 Flow를 테스트 할 수 있도록 수많은 키워드 파라미터를 제공한다.

UI

Airflow가 인기있었던 이유 중 하나는 웹 인터페이스 였다. UI에서 스케줄을 켜거나 끄고 DAG를 시각화하고 SQL 쿼리를 사용해서 Airflow의 메타 데이터에 접근할 수 있었다. 사실은 데이터베이스 뷰를 노출하는 간단한 방식 이었다.

Prefect UI

→ Prefect는 실시간 UI를 지원한다.

  • 시스템 오버뷰용 대시보드
  • 파라미터화된 실행 스케줄링
  • 태스크와 실행 상태 실시간 업데이트
  • 메뉴얼 하게 상태 업데이트
  • 스트리밍 로그들, 최신 에러 로그로 즉시 점프
  • 글로벌 검색
  • 에이전트 관리
  • flow들을 조직화 하는 프로젝트
  • 팀 관리와 퍼미션
  • API 키 생성
  • 글로벌 동시성 제한
  • 타임 존
  • 기타 등등

Prefect 적으로 생각하기

데이터 워크플로우의 각 과정은 Task라고 하고 Task들을 결합하는 것을 Flow라고 한다.

Tasks

태스크는 파이썬 함수다. 기존 파이썬 함수에 데코레이터를 추가하면 새로운 태스크를 생성할 수 있다. Prefect에서는 “작은 태스크"들로 워크플로우를 구성하기를 권장하지만 Prefect Engine에서 각 태스크 처리를 위해 많은 기능을 제공하기 때문에 사실상 제한은 없다. 논리적인 단위로 구분되도록 동작하기만 하면 된다. 작게 구성하면 크게 구성하는데에 비해 더 유용하게 지원하는 기능들을 사용 할 수 있을 뿐이다.

Task 입력과 출력

입력 값들을 선택적으로 받을 수 있고 출력 값도 선택적으로 생성 할 수 있다. 추가적으로 파이썬3의 어노테이션으로 타이핑을 할 수 있다.

Task classes

클래스를 사용하면 더 복잡하게 태스크를 설계 할 수 있다. Task 클래스의 서브 클래스로 구성하고 __init__과 run() 메소드를 구현할 수 있다. 태스크를 클래스로 구현하게 되면 초기화 한 후 사용해야 한다는 단점이 있다. ( 함수에 데코레이터 방식으로 사용하면 초기화는 자동으로 된다 )

Flows

태스크 간 의존성 정의에 사용한다. 태스크라는 함수를 결합하는 스크립트라고 생각하면 된다

Functional API

Flow를 구축하는 가장 간단한 방법으로 스크립트와 유사한 스타일을 제공한다. Flow를 컨텍스트 매니저로 생성하고 각 태스크를 함수로 호출한다. Prefect는 각 함수 호출을 트래킹하고 그래프를 구축해서 Workflow로 구축한다. 실제로 실행 되지는 않는다.

Running the flow

Flow가 한번 생성되면, flow.run()을 호출해서 실행 할 수 있다.

Parameters

런타임에 Flow로 정보를 제공하는 기능은 유용하다. Prefect는 Parameter라는 특별한 태스크로 이 기능을 제공한다.

Imperative API

더 프로그램 적인 방식으로 Flow를 구축하는 방법이다. Functional API와 함께 사용할 수 있다.

Orchestrating flows

Prefect가 이미 완벽하게 구축된 상태에서 Flow를 오케스트레이션하고 모니터링 할 수 있는데 Prefect Cloud와 오픈소스를 통해 Backend 서버를 제공하고 있다.

Trigger

태스크를 좀 더 복잡한 방법으로 동작시키는 Flow를 구축하는게 더 좋은 경우가 있다. 만약 스파크를 이용한 워크플로우를 예로 들면

  1. 첫번째 작업에서 원격 클러스터를 구축
  2. 두번째 작업에서 구축한 클러스터에서 잡을 진행
  3. 세번째에서 원격 클러스터를 회수

위 Flow의 두번째 작업이 실패하면 세번째인 클러스터 회수가 동작하지 않아 클러스터가 영원히 동작하게 된다. 따라서 Prefect 에서는 다양한 내장 트리거를 통해 이러한 상황을 해결할 방법을 제공한다.

Reference Tasks

트리거만 사용했을 때 발생하는 시나리오로 실제 두번째 태스크가 실패해도 트리거를 사용해서 세번째 태스크를 성공적으로 마치면 전체 워크플로우가 성공 상태가 된다. 따라서 Prefect는 Reference Task를 추가로 지원해서 자체적으로 최종 상태를 결정할 수 있다. 두번째 태스크가 실패하고 세번째 작업이 최종적으로 성공적으로 동작해도 Flow의 State를 실패로 만들 수 있다.

Signals

Task를 특정 State로 즉시 변경해야 한다는 것을 Prefect Engine에 알리는 기능이다. Signal을 이용해서 Task의 상태를 디테일하게 관리할 수 있다.

Categories