TransWikia.com

Airflow DAG with configuration/parameter json and loop to that parameter to generate the operators

Stack Overflow Asked by alltej on November 24, 2021

I have a manually triggered DAG. It takes parameters like:

{"id_list":"3,5,1"}

In the DAG, I create the operators dynamically based on this list of integers:

for id in id_list:
   task = create_task(id)

I need to initialize id_list from the value of the id_list parameter.
How can I do that, given that I cannot reference the parameter directly outside a templated field? In the Graph View, I want to see one process task per entry in the id_list parameter.

I have seen examples of dynamically created tasks, but they are not really dynamic, because the tasks are generated from a list of hard-coded values.

2 Answers

First, create a fixed number of tasks; that number is the upper bound on how many IDs a single run can handle. This example uses PythonOperator. In the python_callable, if the task's index is less than the length of the parameter list, the task executes; otherwise it raises AirflowSkipException.

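        # Airflow 2.x import paths; assumes the tasks below are created
        # inside a `with DAG(...)` block
        from airflow.exceptions import AirflowSkipException
        from airflow.operators.python import PythonOperator


        def execute(index, account_ids):
            # account_ids arrives as the rendered string, e.g. "3,5,1"
            param_list = account_ids.split(',')
            if index < len(param_list):
                print(f"execute task index {index}")
            else:
                # No ID for this slot: mark the task as skipped
                raise AirflowSkipException


        def create_task(task_id, index):
            # op_kwargs is a templated field, so the Jinja expression is
            # rendered from the trigger configuration at run time
            return PythonOperator(task_id=task_id,
                                  python_callable=execute,
                                  op_kwargs={
                                      "index": index,
                                      "account_ids": "{{ dag_run.conf['account_ids'] }}"}
                                  )

        # Upper bound on the number of IDs a single run can handle
        record_size_limit = 5

        for idx in range(record_size_limit):
            create_task(f"task_{idx}", idx)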

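Trigger the DAG and pass the IDs as a comma-separated string under the account_ids key, for example:

{"account_ids":"3,5,1"}

In the Graph View, the tasks whose index is past the end of the list show up as skipped.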
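To see what the fixed-slot approach does for a given input without starting Airflow, here is a small dry-run sketch in plain Python. The helper name plan_slots and its return shape are made up for illustration and are not part of Airflow; it only mirrors the index check in execute above. It also surfaces one caveat: any IDs beyond the slot limit get no task at all.

```python
def plan_slots(account_ids: str, slot_count: int):
    """Hypothetical helper: mirror the index check from `execute`.

    Returns (plan, overflow), where plan has one (task_id, action, value)
    tuple per fixed slot and overflow lists IDs that got no slot.
    """
    ids = [part.strip() for part in account_ids.split(",") if part.strip()]
    plan = []
    for idx in range(slot_count):
        if idx < len(ids):
            plan.append((f"task_{idx}", "run", ids[idx]))
        else:
            # In the DAG this slot would raise AirflowSkipException
            plan.append((f"task_{idx}", "skip", None))
    # IDs past the last slot have no task and would never be processed
    overflow = ids[slot_count:]
    return plan, overflow


plan, overflow = plan_slots("3,5,1", 5)
for row in plan:
    print(row)
print("overflow:", overflow)
```

If the overflow list can be non-empty in practice, it is probably worth raising the limit or failing the run explicitly instead of dropping IDs silently.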

Answered by alltej on November 24, 2021

A DAG and its tasks must be resolved before they are available for use; this applies to the webserver, the scheduler, everywhere. The webserver is actually a perfect example of why: how would it render the process to the user if the tasks were not known until run time?

The only dynamic components of a process are the parameters that are available during template rendering. In most cases I've seen people use a PythonOperator to loop over the input and perform some action N times to solve the same issue.
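As a rough sketch of that single-task approach: the callable below receives the rendered parameter string and loops over it. The function name process_ids and the "processed ..." placeholder are assumptions for illustration; the real per-ID work would go inside the loop. The Airflow wiring is shown only as a comment so the snippet runs on its own.

```python
def process_ids(id_list: str):
    """Hypothetical callable: parse "3,5,1" and handle each ID in turn."""
    ids = [int(part) for part in id_list.split(",") if part.strip()]
    results = []
    for item_id in ids:
        # Placeholder for the real per-ID action
        results.append(f"processed {item_id}")
    return results


# Inside a DAG, this could be attached to one task roughly like so
# (op_kwargs is templated, so the value is rendered at run time):
#
#   PythonOperator(
#       task_id="process_all",
#       python_callable=process_ids,
#       op_kwargs={"id_list": "{{ dag_run.conf['id_list'] }}"},
#   )

print(process_ids("3,5,1"))
```

The trade-off is that the Graph View shows one task rather than one per ID, so a failure on a single ID fails (and retries) the whole loop.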

Answered by joebeeson on November 24, 2021
