home..

Airflow documentation - Mechanism

airflow는 자동으로 스케줄링하고 모니터링하기 위한 플랫폼이다. 워크플로우를 DAG구조( Directed Acyclic Graphs )로 파악할 수 있다.

다음과 같은 특징이 있다.

동작 원리

[ 예시 ]

Executor

Executor는 task 인스턴스를 실행하는 주체이다. Sequential Executor, Local Executor, Celery Executor, Dask Executor, Kubernetes Executor를 제공한다.

Local Executor

단일 장비에 웹 서버와 스케줄러를 같이 가동하고 task를 프로세스로 spawn하여 실행한다. Local Executor는 parallelism에 따라 나뉘는데 task_queue를 통해 실행한 task 수에 대해 제어한다.

parallelis이 0이 아니면 설정 수만큼 task 수를 제한한다.

class LocalExecutor(BaseExecutor)
    ...
    def start(self):
            self.manager = multiprocessing.Manager()
            self.result_queue = self.manager.Queue()
            self.workers = []
            self.workers_used = 0
            self.workers_active = 0
            self.impl = (LocalExecutor._UnlimitedParallelism(self) if self.parallelism == 0
                        else LocalExecutor._LimitedParallelism(self))
            self.impl.start()
class _UnlimitedParallelism(object) 
    ...
    def start(self):
        self.executor.workers_used = 0
        self.executor.workers_active = 0

    def execute_async(self, key, command):
        local_worker = LocalWorker(self.executor.result_queue) # result_queue를 대상으로 local를 생성한다.
        local_worker.key = key
        local_worker.command = command
        self.executor.workers_used += 1
        self.executor.workers_active += 1
        local_worker.start()


class _LimitedParallelism(object)
    ...
    def start(self):
        self.queue = self.executor.manager.Queue() # result_queue, task_queue를 대상으로 local 워커 수를 제한한다.
        self.executor.workers = [
            QueuedLocalWorker(self.queue, self.executor.result_queue)
            for _ in range(self.executor.parallelism)
        ]
        self.executor.workers_used = len(self.executor.workers)
        for w in self.executor.workers:
            w.start()

    def execute_async(self, key, command):
        self.queue.put((key, command))

[출처] https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/?fbclid=IwAR1g_gWEmZ9hdPvICeXfZI2pI2yHhvH4vMHO1Jl5VI2EeS-kk5Q55_BcFdQ [참고]

© 2024 Yujin Lee   •  Powered by Soopr   •  Theme  Moonwalk