Dynamic DAG generation with YAML and DAG Factory in Amazon MWAA


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that lets you use a familiar Apache Airflow environment with improved scalability, availability, and security. You can enhance and scale your business workflows without the operational burden of managing the underlying infrastructure. In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs are DAGs that are generated on the fly each time the DAG files are parsed, typically based on external conditions, configurations, or parameters. They let you create, schedule, and run tasks within a DAG based on data and configurations that may change over time.

There are many ways to introduce dynamism in Airflow DAGs (dynamic DAG generation), for example by using environment variables and external files. One approach is the DAG Factory method, which uses a YAML-based configuration file. This library aims to simplify the creation and configuration of new DAGs through declarative parameters in YAML. It supports default customizations and is open source, which makes it simple to create and customize new functionality.

In this post, we explore how to create dynamic DAGs from YAML files using the DAG Factory library. Dynamic DAGs offer several benefits:

  1. Enhanced code reusability – Structuring DAGs through YAML files promotes reusable components and reduces redundancy in your workflow definitions.
  2. Streamlined maintenance – YAML-based DAG generation simplifies modifying and updating workflows, which makes maintenance smoother.
  3. Flexible parameterization – With YAML, you can parameterize DAG configurations, so you can adjust workflows dynamically as requirements change.
  4. Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and improving overall workflow runs.
  5. Enhanced scalability – YAML-driven DAGs allow parallel runs, enabling scalable workflows that can handle increased workloads efficiently.

By combining YAML files with the DAG Factory library, you get a versatile approach to building and managing DAGs that helps you create robust, scalable, and maintainable data pipelines.

Overview of solution

In this post, we use an example DAG file that's designed to process a COVID-19 dataset. The workflow processes an open source dataset provided by WHO-COVID-19-Global. After we install the DAG Factory Python package, we create a YAML file that defines the various tasks. We process the country-specific death count by passing the country as a variable, which creates an individual DAG for each country.

The following diagram illustrates the overall solution, including the data flows between logical blocks.

Overview of the Solution

Prerequisites

For this walkthrough, you should have the following prerequisites:

Additionally, complete the following steps (run the setup in an AWS Region where Amazon MWAA is available):

  1. Create an Amazon MWAA environment (if you don't have one already). If this is your first time using Amazon MWAA, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Make sure that the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions:

The access policies mentioned here are only for the example in this post. In a production environment, grant only the granular permissions that are needed, following the principle of least privilege.

  2. While creating your Amazon MWAA environment, create an Amazon S3 bucket with a unique name (bucket names must be unique across all AWS accounts, not just within yours), and create folders called dags and requirements.
  3. Create a requirements.txt file with the following content and upload it to the requirements folder. Replace {Airflow-version} with your environment's Apache Airflow version number, and {Python-version} with the version of Python that's compatible with your environment:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas is required only for the instance use case described on this submit, and dag-factory is the one required plug-in. It is strongly recommended to verify the compatibility of the newest model of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base set up and don’t should be laid out in your necessities.txt file.

  4. Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.

Make sure that your environment points to the latest Amazon S3 object version of your requirements.txt file, or the additional packages won't be installed. The update typically takes 15 to 20 minutes, depending on your environment configuration.
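If you prefer to script this step instead of using the console, the following minimal sketch shows one way to do it. It assumes you pass in boto3-style clients (for example, boto3.client("s3") and boto3.client("mwaa")) and that object versioning is enabled on the bucket, so that head_object returns a VersionId. The helper names point_to_latest_requirements and wait_until_available are hypothetical. The sketch uses the S3 HeadObject call and the MWAA UpdateEnvironment and GetEnvironment calls.

```python
import time

# Sketch only: s3_client and mwaa_client are assumed to be boto3 clients,
# e.g. boto3.client("s3") and boto3.client("mwaa"). Bucket versioning must be
# enabled so that head_object returns a VersionId.


def point_to_latest_requirements(s3_client, mwaa_client, env_name, bucket,
                                 key="requirements/requirements.txt"):
    """Point the environment at the newest object version of requirements.txt."""
    version_id = s3_client.head_object(Bucket=bucket, Key=key)["VersionId"]
    mwaa_client.update_environment(
        Name=env_name,
        RequirementsS3Path=key,
        RequirementsS3ObjectVersion=version_id,
    )
    return version_id


def wait_until_available(mwaa_client, env_name, poll_seconds=60,
                         timeout_seconds=45 * 60, sleep=time.sleep):
    """Poll the environment status until it is AVAILABLE.

    Raises RuntimeError on a *_FAILED status and TimeoutError on timeout.
    """
    waited = 0
    while waited <= timeout_seconds:
        status = mwaa_client.get_environment(Name=env_name)["Environment"]["Status"]
        if status == "AVAILABLE":
            return status
        if status.endswith("_FAILED"):
            raise RuntimeError(f"Environment {env_name} is in state {status}")
        sleep(poll_seconds)
        waited += poll_seconds
    raise TimeoutError(f"{env_name} was not AVAILABLE after {timeout_seconds} seconds")
```

The sleep function is injectable so that the polling loop can be exercised without waiting. Depending on timing, the status might still read AVAILABLE for a moment right after update_environment returns, so consider a short pause before you start polling.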

Validate the DAGs

When your Amazon MWAA environment shows as Available on the Amazon MWAA console, open the Airflow UI by choosing Open Airflow UI next to your environment.


Check the existing DAGs on the DAGs tab.


Configure your DAGs

Complete the following steps:

  1. Create empty files named dynamic_dags.yml, example_dag_factory.py, and process_s3_data.py on your local machine.
  2. Add the following code to process_s3_data.py, save the file, and upload it to the dags folder of your Amazon S3 bucket. The code does some basic data processing:
    1. Read the file from an Amazon S3 location.
    2. Rename the country column to a lowercase name.
    3. Filter the data by the given country.
    4. Write the processed data in CSV format and upload it back to the S3 prefix.
import io

import boto3
import pandas as pd


def process_s3_data(COUNTRY):
    ### Top-level variables: replace S3_BUCKET with your bucket name ###
    s3 = boto3.client("s3")
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"

    ### Get the CSV file ###
    response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
    status = response["ResponseMetadata"]["HTTPStatusCode"]
    if status == 200:
        ### Read the CSV file, filter it by country, and write the result back ###
        df = pd.read_csv(response.get("Body"))
        # In the WHO file, Country_code holds ISO codes (such as AU) and Country holds
        # full names. op_args pass full names such as "Australia", so filter on names.
        df.rename(columns={"Country_code": "country_code", "Country": "country"}, inplace=True)
        filtered_df = df[df["country"] == COUNTRY]
        with io.StringIO() as csv_buffer:
            filtered_df.to_csv(csv_buffer, index=False)
            response = s3.put_object(
                Bucket=S3_BUCKET,
                Key=OUTPUT_KEY + "_" + COUNTRY + ".csv",
                Body=csv_buffer.getvalue(),
            )
        status = response["ResponseMetadata"]["HTTPStatusCode"]
        if status == 200:
            print(f"Successful S3 put_object response. Status - {status}")
        else:
            print(f"Unsuccessful S3 put_object response. Status - {status}")
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
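Before you upload process_s3_data.py, you can check the country filter locally. The following sketch uses Python's standard csv module in place of pandas, so it runs without S3 or extra packages. It is a stand-in for the filter step, not the code the DAG runs. It assumes the WHO file's header layout (Date_reported, Country_code, Country, WHO_region, ...). The two sample rows are invented placeholders with zero counts, not real WHO figures. The column parameter shows why the value passed in op_args must match the contents of the column you filter on.

```python
import csv
import io

# Invented placeholder rows (not real WHO figures) in the WHO file's header layout.
SAMPLE_CSV = (
    "Date_reported,Country_code,Country,WHO_region,New_cases,Cumulative_cases,"
    "New_deaths,Cumulative_deaths\n"
    "2024-01-07,AU,Australia,WPRO,0,0,0,0\n"
    "2024-01-07,BR,Brazil,AMRO,0,0,0,0\n"
)


def filter_by_country(csv_text: str, country: str, column: str = "Country") -> str:
    """Return the header plus the rows whose `column` equals `country`, as CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames, lineterminator="\n")
    writer.writeheader()
    for row in reader:
        if row[column] == country:
            writer.writerow(row)
    return out.getvalue()


if __name__ == "__main__":
    # Full name against the name column: header plus the Australia row.
    print(filter_by_country(SAMPLE_CSV, "Australia"), end="")
    # Full name against the ISO-code column: only the header comes back.
    print(filter_by_country(SAMPLE_CSV, "Australia", column="Country_code"), end="")
```

If the second call is the one that matches your data, the DAG still succeeds, but it writes a CSV file that contains only the header row. That is why this mismatch is easy to miss.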

  3. Add the following content to dynamic_dags.yml, save the file, and upload it to the dags folder. The file defines one DAG per country, as follows:
    1. Define the default settings (including default_args) that are applied to all DAGs.
    2. Create a DAG definition for each country by passing the country in op_args.
    3. Map the process_s3_data function with python_callable_name and python_callable_file.
    4. Use PythonOperator to process the CSV file data stored in the Amazon S3 bucket.
    5. We set schedule_interval to run every 10 minutes ("*/10 * * * *"). Adjust this value as needed.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"

module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"

module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"

module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"

module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"

module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"

module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        # Must match the country name exactly as it appears in the data file.
        - "Russia"

module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"
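The per-country blocks above differ only in the DAG name and the op_args value. Rather than copy a block for each new country, you could generate them. The following sketch uses a hypothetical helper, render_dag_blocks (not part of dag-factory), that renders one block per entry in a country list using plain string templating. You would then place the default: block shown above at the top of the file. Because DAG IDs can't contain spaces, the helper replaces spaces in the name with underscores.

```python
# Hypothetical helper (not part of dag-factory): renders the per-country
# DAG definitions of dynamic_dags.yml from a list of country names.
COUNTRIES = ["Australia", "Brazil", "India", "Japan", "Mexico", "Russia", "Spain"]

DAG_TEMPLATE = """\
module3_dynamic_dag_{name}:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "{country}"
"""


def render_dag_blocks(countries):
    """Return YAML text with one DAG definition per country, separated by blank lines."""
    blocks = []
    for country in countries:
        # DAG IDs may not contain spaces, so "New Zealand" becomes "New_Zealand".
        name = country.replace(" ", "_")
        blocks.append(DAG_TEMPLATE.format(name=name, country=country))
    return "\n".join(blocks)


if __name__ == "__main__":
    print(render_dag_blocks(COUNTRIES))
```

With this approach, adding a country means adding one list entry and regenerating the file, which keeps the YAML consistent as the number of DAGs grows.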

  4. Add the following code to example_dag_factory.py, save the file, and upload it to the dags folder. The code uses the clean_dags() method of the DagFactory instance to remove DAGs it generated earlier that are no longer defined in the YAML file. It then uses the generate_dags() method to create the DAGs that are defined in the YAML file.
# With DAG discovery safe mode enabled (the default), Airflow only parses files
# that mention both "airflow" and "dag"; importing DAG is a common way to ensure this.
from airflow import DAG
import dagfactory

config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)

## clean up DAGs that were generated earlier but are no longer in the YAML file ##
example_dag_factory.clean_dags(globals())
## generate the DAGs defined in the YAML file and register them in this module ##
example_dag_factory.generate_dags(globals())
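Why pass globals()? Airflow collects the DAG objects that are bound at the top level of a module it parses, so the generated DAGs have to end up in the module's global namespace. The following conceptual sketch illustrates that idea with a hypothetical SketchDag stand-in instead of airflow.DAG. It is not dag-factory's implementation: it merges the default block into each DAG entry (shallowly, for brevity) and binds one object per DAG ID in the namespace that is passed in. It does not model clean_dags().

```python
from dataclasses import dataclass, field


@dataclass
class SketchDag:
    """Hypothetical stand-in for airflow.DAG that only records its settings."""
    dag_id: str
    settings: dict = field(default_factory=dict)


def register_dags(config: dict, namespace: dict) -> list:
    """Bind one SketchDag per top-level key (except "default") in namespace.

    Returns the registered DAG IDs in config order.
    """
    defaults = config.get("default", {})
    registered = []
    for dag_id, dag_config in config.items():
        if dag_id == "default":
            continue
        # Shallow merge for brevity: per-DAG keys override the defaults.
        settings = {**defaults, **dag_config}
        namespace[dag_id] = SketchDag(dag_id, settings)
        registered.append(dag_id)
    return registered


# A trimmed-down, dict form of dynamic_dags.yml.
CONFIG = {
    "default": {"schedule_interval": "*/10 * * * *", "max_active_runs": 1},
    "module3_dynamic_dag_Australia": {"tasks": {"task_process_s3_data": {"op_args": ["Australia"]}}},
    "module3_dynamic_dag_Brazil": {"tasks": {"task_process_s3_data": {"op_args": ["Brazil"]}}},
}

if __name__ == "__main__":
    # Passing globals() makes each DAG a module-level name, as in example_dag_factory.py.
    print(register_dags(CONFIG, globals()))
```

The design point to take away is that the YAML file only describes the DAGs. The module-level namespace is what makes each generated DAG visible to the Airflow scheduler.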

  5. After you upload the files, go back to the Airflow UI and open the DAGs tab, where you will find the new DAGs.

You can enable the DAGs by making them active and then test them individually. When a DAG runs, it writes a CSV file named count_death_{COUNTRY}.csv (for example, count_death_Australia.csv) to the dags folder.
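To confirm that the per-country files were written without browsing the S3 console, you can list the objects under the output prefix. The following minimal sketch assumes a boto3-style S3 client and uses the list_objects_v2 paginator. The helper names output_keys and list_country_outputs are hypothetical. The default prefix matches the OUTPUT_KEY used in process_s3_data.py.

```python
def output_keys(pages):
    """Collect object keys from list_objects_v2 response pages."""
    keys = []
    for page in pages:
        # "Contents" is missing from a page when no objects match the prefix.
        for obj in page.get("Contents", []):
            keys.append(obj["Key"])
    return keys


def list_country_outputs(s3_client, bucket, prefix="dags/count_death_"):
    """List the per-country CSV files that process_s3_data wrote to the bucket."""
    paginator = s3_client.get_paginator("list_objects_v2")
    return output_keys(paginator.paginate(Bucket=bucket, Prefix=prefix))
```

With AWS credentials configured, you could call list_country_outputs(boto3.client("s3"), "your-bucket-name") and expect one key per DAG that has run. The pagination logic is kept separate in output_keys so that it can be checked without calling AWS.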

Cleaning up

Using the AWS services discussed in this post may incur costs. To avoid future charges, delete the Amazon MWAA environment after you have completed the tasks in this post, and then empty and delete the S3 bucket.

Conclusion

In this post, we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are generated from configuration each time the DAG file is parsed. Consider using dynamic DAGs in the following scenarios:

  • Automating a migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
  • Situations where only a parameter changes between DAGs, which streamlines workflow management
  • Managing DAGs that depend on the evolving structure of a source system, so that they can adapt to changes
  • Establishing standardized DAG practices across your team or organization by creating blueprints, which promotes consistency and efficiency
  • Preferring YAML-based declarations over complex Python code, which simplifies DAG configuration and maintenance
  • Creating data-driven workflows that adapt and evolve based on their data inputs, enabling efficient automation

By incorporating dynamic DAGs into your workflows, you can improve automation, adaptability, and standardization, and ultimately make your data pipeline management more efficient and effective.

To learn more about DAG Factory on Amazon MWAA, visit Amazon MWAA for Analytics Workshop: DAG Factory. For additional details and code examples, see the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repository.


About the Authors

Jayesh Shinde is a Sr. Software Architect with AWS ProServe India. He specializes in creating cloud-focused solutions using modern software development practices such as serverless, DevOps, and analytics.

Harshd Yeola is a Sr. Cloud Architect with AWS ProServe India, helping customers migrate and modernize their infrastructure on AWS. He specializes in building DevSecOps and scalable infrastructure using containers, AIOps, and AWS Developer Tools and services.
