Is this the Next Best Way to Create DAGs?
Reuse one or more tasks in more than one DAG without duplicating your code.

Probably not, but it was fun to write nevertheless. Not much fun to read though, as it's meant to be rude. So if you are easily offended or a true python purist, please click away now.
Check this out:
- Do you want to be able to reuse one or more tasks in more than one DAG without duplicating your code?
- Do you despise jinja and templating with a passion because they make your code look like sh*t?
Here's what we're going to do. We'll use good ol' python in a weird and creative way that may make you raise your eyebrows. Tough potatoes. Whatever that means. Let's go.
This is the directory structure. If you have import issues, you are in the wrong place. Ask chatGPT what to do and leave me alone.
|--i_am_not_common
| |--tasks
| | |--config.py
| | |--model.py
|--dags
| |--tasks
| | |--dance.py
| | |--finish.py
| | |--start.py
| |--the_big_tamale.py
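Fine, one hint before you run off to the robot (this is my assumption about the setup, not something spelled out here): these folders are python packages, so each needs an __init__.py, and the project root must be on PYTHONPATH so that both i_am_not_common and dags are importable:

i_am_not_common/__init__.py
i_am_not_common/tasks/__init__.py
dags/__init__.py
dags/tasks/__init__.py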
Model
First, let's create our model which, once initialized, will contain the id of the DAG and its tasks.
# i_am_not_common/tasks/model.py
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Pipeline:
    dag_id: str
    tasks: List[Dict[str, Any]]
Tasks
Now let's create three tasks in separate files.
# dags/tasks/start.py
import os

from airflow.decorators import task

TASK_ID = os.path.basename(__file__).partition(".")[0]


@task(task_id=TASK_ID)
def run() -> None:
    print("I'm starting")
# dags/tasks/dance.py
import os

from airflow.decorators import task

TASK_ID = os.path.basename(__file__).partition(".")[0]


@task(task_id=TASK_ID)
def run() -> None:
    print("I'm dancing")
# dags/tasks/finish.py
import os

from airflow.decorators import task

TASK_ID = os.path.basename(__file__).partition(".")[0]


@task(task_id=TASK_ID)
def run() -> None:
    print("I'm finished")
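In case the TASK_ID one-liner looks cryptic: it just takes the module's filename and chops off everything from the first dot, so each task id matches its file name. Roughly (the path below is made up for illustration):

os.path.basename("/opt/airflow/dags/tasks/start.py")  # -> "start.py"
"start.py".partition(".")[0]                          # -> "start"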
Configuration
Let's describe the task sequence. Note that this exercise demos the capability to execute a linear task progression (ordinal is here only for illustration purposes, and maybe as inspiration for a later exercise). Where is the fun in giving you everything?
# i_am_not_common/tasks/config.py
from i_am_not_common.tasks.model import Pipeline

from dags.tasks import dance, finish, start

the_big_tamale = Pipeline(
    dag_id="the_big_tamale",
    tasks=[
        {"ordinal": 1, "name": "start", "callable": start.run},
        {"ordinal": 2, "name": "dance", "callable": dance.run},
        {"ordinal": 3, "name": "finish", "callable": finish.run},
    ],
)
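And here is the payoff the subtitle promised, sketched by me rather than given above: a second Pipeline in the same config.py can reuse the exact same task callables, in a different order or subset, without duplicating a single line of task code. The_small_tamale is my invention, purely for illustration:

# Hypothetical second pipeline reusing the same tasks
the_small_tamale = Pipeline(
    dag_id="the_small_tamale",
    tasks=[
        {"ordinal": 1, "name": "start", "callable": start.run},
        {"ordinal": 2, "name": "finish", "callable": finish.run},
    ],
)

Point a copy of the_big_tamale.py at this config entry instead, and you get a second DAG for free.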
The Big Tamale
Proceed with caution. You might hurt yourself.
# dags/the_big_tamale.py
import importlib
from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG

from i_am_not_common.tasks import config

# Dynamically import every task module listed in the config.
for task_info in config.the_big_tamale.tasks:
    importlib.import_module(name=f"""dags.tasks.{task_info["name"]}""")

LOCALS = locals()

with DAG(
    catchup=False,
    dag_id=config.the_big_tamale.dag_id,
    description="experiment",
    start_date=datetime(year=2023, month=1, day=1),
    tags=["experiment"],
    max_active_runs=1,
) as dag:
    # Expose each callable under its task name in the module namespace.
    for task_info in config.the_big_tamale.tasks:
        LOCALS[task_info["name"]] = task_info["callable"]

    callables = []
    for task_info in config.the_big_tamale.tasks:
        task_name = task_info["name"]
        task_callable = task_info["callable"]
        task_callable.dag = dag
        callables.append(task_callable())
    chain(*callables)
Well, hello? Does your DAG look like this?
[Image: the Airflow graph view showing start → dance → finish]
If not, then good luck with that. If yes, then life is beautiful. Either way, let's unpack it for fun.
The unpacking
for task_info in config.the_big_tamale.tasks:
    importlib.import_module(name=f"""dags.tasks.{task_info["name"]}""")
importlib is used to dynamically import the relevant task modules; the name argument is an f-string resolving to a dotted module path, constructed from your directory/module structure.
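In other words, for the first task in the list the call boils down to this (my paraphrase, not code from above):

importlib.import_module("dags.tasks.start")  # same effect as: import dags.tasks.start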
LOCALS = locals()
I use this because when we later add keys to our locals, the value assignment looks marginally better. E.g. LOCALS["key"] = "value" is better than locals()["key"] = "value". Eeerm, or not, they both look like crap so it doesn't matter; it doesn't look pythonic... or does it? I told you, you may raise your eyebrows. (The only reason this works at all is that we're at module level, where locals() and globals() are the same dict.) This is dangerous territory, especially if you overwrite a local you shouldn't, so pay attention to the naming conventions.
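One extra caveat I'll throw in (mine, not from the walkthrough above): this trick only behaves at module level. Inside a function, CPython ignores writes to locals():

def do_not_try_this_in_a_function():
    LOCALS = locals()
    LOCALS["surprise"] = 42
    print(surprise)  # NameError: the write to locals() was silently ignored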
with DAG(
    catchup=False,
    dag_id=config.the_big_tamale.dag_id,
    description="experiment",
    start_date=datetime(year=2023, month=1, day=1),
    tags=["experiment"],
    max_active_runs=1,
) as dag:
Hang on, why is this hardcoded? Isn't the point of this exercise to be fully parameterised? Keep reading, you lazy bum.
Use the model.py and config.py to further parameterise this DAG. Ha! Let's continue unpacking:
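Since I'm feeling generous, here's one hedged sketch of what that parameterisation might look like; the extra field names are my invention, not part of the exercise:

# i_am_not_common/tasks/model.py — hypothetical extension
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class Pipeline:
    dag_id: str
    tasks: List[Dict[str, Any]]
    description: str = "experiment"
    start_date: datetime = datetime(2023, 1, 1)
    tags: List[str] = field(default_factory=lambda: ["experiment"])

Then the DAG constructor reads description=config.the_big_tamale.description and friends, and nothing is hardcoded any more.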
for task_info in config.the_big_tamale.tasks:
    LOCALS[task_info["name"]] = task_info["callable"]
Hang on, what's that? That's interpreted the same as:
@task
def start():
    ...


@task
def dance():
    ...


@task
def finish():
    ...
It just makes the DAG a little more compact and mysterious.
callables = []
for task_info in config.the_big_tamale.tasks:
    task_name = task_info["name"]
    task_callable = task_info["callable"]
    task_callable.dag = dag
    callables.append(task_callable())
chain(*callables)
The above monstrosity iterates through the tasks of the Pipeline class instance, assigns a dag object to each task (otherwise you are doomed to fail with a treacherous airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again), and at the very end chains everything together nicely.
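If chain is new to you: with our three task instances it's just shorthand for the usual bit-shift wiring, i.e. this sketch is equivalent:

# chain(*callables) with three items is the same as:
callables[0] >> callables[1] >> callables[2]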
Homework
1. Create a new task called spin.py that prints "I am spinning".
2. spin.py is kicked off by start.py and is executed at the same time as dance.py.
3. finish.py will only execute once both dance.py and spin.py have completed.
4. I could have used a diagram but couldn't be bothered (one possible wiring is sketched below).
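And since item 4 cheaped out on the diagram, here's one possible hand-wired answer; the config-driven version is still your problem. spin.py is assumed to look exactly like the other task files:

# Inside the `with DAG(...)` block, instead of the linear chain:
from dags.tasks import dance, finish, spin, start  # spin.py is your new task

s = start.run()
d = dance.run()
sp = spin.run()
f = finish.run()
s >> [d, sp]   # start fans out to dance and spin
[d, sp] >> f   # finish waits for both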
Can you design this in another way? E.g. could the dag object be assigned to a task post Pipeline class initialization, and if yes, what would your DAG look like? This questionable yet clean design will allow you to test your tasks independently, remove duplication and unlock reusability across multiple DAGs.
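On the "test your tasks independently" claim, here's a minimal sketch of what I mean, assuming pytest; TaskFlow-decorated callables expose the undecorated function via .function:

# test_tasks.py — hypothetical
from dags.tasks import start


def test_start_prints(capsys):
    start.run.function()  # call the raw python function, no Airflow involved
    assert "I'm starting" in capsys.readouterr().out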
There are of course other ways of doing the same thing, such as gusty or dag-factory, which look pretty good if you want an existing solution.
Thanks for reading. Bye.