
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that improves security and availability, and reduces infrastructure management overhead when setting up and operating end-to-end data pipelines in the cloud.
Today, we are announcing the availability of Apache Airflow version 2.9.2 environments on Amazon MWAA. Apache Airflow 2.9.2 introduces several notable improvements, such as new API endpoints for improved dataset management, advanced scheduling options including conditional expressions for dataset dependencies, the combination of dataset and time-based schedules, and custom names in dynamic task mapping for better readability of your DAGs.
In this post, we walk you through some of these new features and capabilities, how you can use them, and how you can set up or upgrade your Amazon MWAA environments to Airflow 2.9.2.
With every new version release, the Apache Airflow community is innovating to make Airflow more data-aware, enabling you to build reactive, event-driven workflows that can respond to changes in datasets, either between Airflow environments or in external systems. Let's go through some of these new capabilities.
Logical operators and conditional expressions for DAG scheduling
Prior to the introduction of this capability, users faced significant limitations when working with complex scheduling scenarios involving multiple datasets. Airflow's scheduling capabilities were restricted to logical AND combinations of datasets, meaning that a DAG run would only be created after all specified datasets had been updated since the last run. This rigid approach posed challenges for workflows that required more nuanced triggering conditions, such as running a DAG when any one of several datasets was updated or when specific combinations of dataset updates occurred.
With the release of Airflow 2.9.2, you can now use logical operators (AND and OR) and conditional expressions to define intricate scheduling conditions based on dataset updates. This feature allows for granular control over workflow triggers, enabling DAGs to be scheduled whenever a specific dataset or combination of datasets is updated.
For example, in the financial services industry, a risk management process might need to run whenever trading data from any regional market is refreshed, or when both trading and regulatory updates are available. The new scheduling capabilities available in Amazon MWAA let you express such complex logic using simple expressions. The following diagram illustrates the dependency we need to establish.
The following DAG code contains the logical operations to implement these dependencies:
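The code below is a minimal sketch of how such a condition can be expressed; the dataset URIs, DAG ID, and task are illustrative placeholders rather than the original example.

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

# Illustrative dataset URIs for regional trading feeds, a consolidated feed, and regulatory updates
us_trades = Dataset("s3://market-data/us/trades")
eu_trades = Dataset("s3://market-data/eu/trades")
consolidated_trades = Dataset("s3://market-data/consolidated/trades")
regulatory_updates = Dataset("s3://compliance/regulatory-updates")

with DAG(
    dag_id="risk_management",
    start_date=datetime(2024, 7, 1),
    catchup=False,
    # Run when any regional trading dataset is refreshed,
    # or when both the consolidated trading feed and the regulatory feed are updated
    schedule=(us_trades | eu_trades) | (consolidated_trades & regulatory_updates),
) as dag:

    def compute_risk():
        print("Recomputing risk metrics")

    PythonOperator(task_id="compute_risk", python_callable=compute_risk)
```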
To learn more about this feature, refer to Logical operators for datasets in the Airflow documentation.
Combining dataset and time-based schedules
With Airflow 2.9.2 environments, Amazon MWAA now offers a more comprehensive scheduling mechanism that combines the flexibility of data-driven execution with the consistency of time-based schedules.
Consider a scenario where your team is responsible for managing a data pipeline that generates daily sales reports. This pipeline relies on data from multiple sources. Although it's essential to generate these sales reports on a daily basis to provide timely insights to business stakeholders, you also need to make sure the reports are up to date and reflect important data changes as soon as possible. For instance, if there's a significant influx of orders during a promotional campaign, or if inventory levels change unexpectedly, the report should incorporate these updates to maintain relevance.
Relying solely on time-based scheduling for such a data pipeline can lead to issues such as outdated information and wasted infrastructure resources.
The DatasetOrTimeSchedule feature introduced in Airflow 2.9 adds the capability to combine conditional dataset expressions with time-based schedules. This means your workflow can be invoked not only at predefined intervals but also whenever there are updates to the specified datasets, with the specific dependency relationship among them. The following diagram illustrates how you can use this capability to accommodate such scenarios.
See the following DAG code for an example implementation:
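The following is a minimal sketch under assumed dataset URIs and a daily cron expression; the names are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

orders = Dataset("s3://sales/orders")
inventory = Dataset("s3://sales/inventory")
customers = Dataset("s3://sales/customers")

with DAG(
    dag_id="daily_sales_report",
    start_date=datetime(2024, 7, 1),
    catchup=False,
    schedule=DatasetOrTimeSchedule(
        # Run every day at midnight UTC...
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        # ...or whenever both orders and inventory are updated,
        # or whenever customer data is updated
        datasets=((orders & inventory) | customers),
    ),
) as dag:

    def build_report():
        print("Building the daily sales report")

    PythonOperator(task_id="build_report", python_callable=build_report)
```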
In the example, the DAG will run under two conditions:
- When the time-based schedule is met (daily at midnight UTC)
- When the combined dataset condition is met: when there are updates to both orders and inventory data, or when there are updates to customer data, regardless of the other datasets
This flexibility allows you to create sophisticated scheduling rules that cater to the unique requirements of your data pipelines, so that they run when necessary and incorporate the latest data updates from multiple sources.
For more details on data-aware scheduling, refer to Data-aware scheduling in the Airflow documentation.
Dataset event REST API endpoints
Prior to the introduction of this feature, making your Airflow environment aware of changes to datasets in external systems was a challenge, because there was no option to mark a dataset as externally updated. With the new dataset event endpoints, you can programmatically create dataset-related events. The REST API has endpoints to create, list, and delete dataset events.
This capability allows external systems and applications to seamlessly integrate and interact with your Amazon MWAA environment. It significantly improves your ability to extend your data pipeline's capacity for dynamic data management.
For example, running the following code from an external system allows you to create a dataset event in the target Amazon MWAA environment. This event can then be handled by downstream processes or workflows, enabling greater connectivity and responsiveness in data-driven workflows that rely on timely data updates and interactions.
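The snippet below is a minimal sketch. It assumes you have already obtained your environment's web server hostname and an authenticated session (for example, by exchanging an Amazon MWAA web login token for a session cookie, as described in the REST API post referenced below); the dataset URI is illustrative.

```python
import requests

# Assumption: the session is already authenticated against the Airflow web server,
# for example via a web login token obtained with boto3 create_web_login_token.
web_server_hostname = "<your-environment-web-server-hostname>"
session = requests.Session()

response = session.post(
    f"https://{web_server_hostname}/api/v1/datasets/events",
    json={
        # URI of the dataset that was updated in the external system
        "dataset_uri": "s3://sales/orders",
        # Optional free-form metadata attached to the event
        "extra": {"source": "external-ingestion-job"},
    },
)
response.raise_for_status()
print(response.json())
```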
The following diagram illustrates how the different components in the scenario interact with each other.
For more details on how to use the Airflow REST API in Amazon MWAA, refer to Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling. To learn more about the dataset event REST API endpoints, refer to Dataset UI Enhancements in the Airflow documentation.
Airflow 2.9.2 also includes features that ease the operation and monitoring of your environments. Let's explore some of these new capabilities.
DAG auto-pausing
Customers use Amazon MWAA to build complex data pipelines with multiple interconnected tasks and dependencies. When one of these pipelines encounters an issue or failure, it can result in a cascade of unnecessary and redundant task runs, leading to wasted resources. This problem is particularly prevalent in scenarios where pipelines run at frequent intervals, such as hourly or daily. A typical scenario is a critical pipeline that starts failing during the evening and, because of the failure, continues to run and fail repeatedly until someone manually intervenes the next morning. This can result in dozens of unnecessary task runs, consuming valuable compute resources and potentially causing data corruption or inconsistencies.
The DAG auto-pausing feature aims to address this issue by introducing two new configuration parameters:
- max_consecutive_failed_dag_runs_per_dag – This is a global Airflow configuration setting. It allows you to specify the maximum number of consecutive failed DAG runs before the DAG is automatically paused.
- max_consecutive_failed_dag_runs – This is a DAG-level argument. It overrides the previous global configuration, allowing you to set a custom threshold for each DAG.
In the following code example, we define a DAG with a single PythonOperator. The failing_task is designed to fail by raising a ValueError. The key configuration for DAG auto-pausing is the max_consecutive_failed_dag_runs parameter set in the DAG object. By setting max_consecutive_failed_dag_runs=3, we instruct Airflow to automatically pause the DAG after it fails three consecutive times.
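The following is a minimal sketch of such a DAG; the DAG ID and schedule are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def failing_task():
    # Simulate a failure so that consecutive DAG runs fail
    raise ValueError("This task always fails")


with DAG(
    dag_id="auto_pausing_example",
    start_date=datetime(2024, 7, 1),
    schedule="@hourly",
    catchup=False,
    # Pause the DAG automatically after three consecutive failed runs
    max_consecutive_failed_dag_runs=3,
) as dag:
    PythonOperator(task_id="failing_task", python_callable=failing_task)
```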
With this parameter, you can now configure your Airflow DAGs to automatically pause after a specified number of consecutive failures.
To learn more, refer to DAG Auto-pausing in the Airflow documentation.
CLI support for bulk pause and resume of DAGs
As the number of DAGs in your environment grows, managing them becomes increasingly challenging. Whether for upgrading or migrating environments, or other operational activities, you may need to pause or resume multiple DAGs. This can become a daunting, repetitive task because you need to navigate the Airflow UI and manually pause or resume DAGs one at a time. These manual actions are time consuming and increase the risk of human error that can result in missteps and lead to data inconsistencies or pipeline disruptions. The previous CLI commands for pausing and resuming DAGs could only handle one DAG at a time, making them inefficient.
Airflow 2.9.2 improves these CLI commands by adding the capability to treat DAG IDs as regular expressions, allowing you to pause or resume multiple DAGs with a single command. This new feature eliminates the need for repetitive manual intervention or individual DAG operations, significantly reducing the risk of human error and providing reliability and consistency in your data pipelines.
For example, to pause all DAGs generating daily liquidity reports using Amazon Redshift as a data source, you can use the following CLI command with a regular expression:
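The command below is a sketch that assumes a hypothetical DAG naming convention (for example, daily_liquidity_*_redshift); adjust the pattern to match your own DAG IDs.

```bash
airflow dags pause --treat-dag-id-as-regex --yes "daily_liquidity_.*redshift.*"
```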
Custom names for Dynamic Task Mapping
Dynamic Task Mapping was added in Airflow 2.3. This powerful feature allows workflows to create tasks dynamically at runtime based on data. Instead of relying on the DAG author to predict the number of tasks needed in advance, the scheduler can generate the appropriate number of copies of a task based on the output of a previous task. Of course, with great power comes great responsibility. By default, dynamically mapped tasks were assigned numeric indexes as names. In complex workflows involving large numbers of mapped tasks, it becomes increasingly difficult to pinpoint the specific tasks that require attention, leading to potential delays and inefficiencies in managing and maintaining your data workflows.
Airflow 2.9 introduces the map_index_template parameter, a highly requested feature that addresses the challenge of task identification in Dynamic Task Mapping. With this capability, you can now provide custom names for your dynamically mapped tasks, improving visibility and manageability within the Airflow UI.
See the following example:
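This sketch follows the named mapping pattern from the Airflow documentation; the DAG ID and data source names are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process_source(source: str):
    print(f"Processing data from {source}")


with DAG(
    dag_id="named_mapping_example",
    start_date=datetime(2024, 7, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator.partial(
        task_id="process_source",
        python_callable=process_source,
        # Render each mapped task's index from its keyword arguments
        map_index_template="{{ task.op_kwargs['source'] }}",
    ).expand(
        op_kwargs=[
            {"source": "source_a"},
            {"source": "source_b"},
            {"source": "source_c"},
        ]
    )
```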
The key aspect in the code is the map_index_template parameter specified in the PythonOperator.partial call. This Jinja template instructs Airflow to use a value from each mapped task instance's arguments (here, the source entry of op_kwargs) as the map index. In the Airflow UI, you will see three task instances with the indexes source_a, source_b, and source_c, making it easy to identify and track the tasks associated with each data source. In case of failures, this capability improves monitoring and troubleshooting.
The map_index_template feature goes beyond simple template rendering, offering dynamic injection capabilities into the rendering context. This functionality unlocks greater flexibility and customization when naming dynamically mapped tasks.
Refer to Named mapping in the Airflow documentation to learn more about named mapping.
TaskFlow decorator for Bash commands
Writing complex Bash commands and scripts using the traditional Airflow BashOperator can bring challenges in areas such as code consistency, task dependency definition, and dynamic command generation. The new @task.bash decorator addresses these challenges, allowing you to define Bash statements using Python functions, making the code more readable and maintainable. It seamlessly integrates with Airflow's TaskFlow API, enabling you to define dependencies between tasks and create complex workflows. You can also use Airflow's scheduling and monitoring capabilities while maintaining a consistent coding style.
The following sample code showcases how the @task.bash decorator simplifies the integration of Bash commands into DAGs, while using the full capabilities of Python for dynamic command generation and data processing:
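A minimal sketch, assuming an illustrative file-archiving scenario; the decorated function returns the Bash command string that the task executes.

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2024, 7, 1), catchup=False)
def bash_taskflow_example():
    @task
    def list_files() -> list[str]:
        # In a real pipeline this list could come from an upstream system
        return ["orders.csv", "inventory.csv", "customers.csv"]

    @task.bash
    def archive_files(files: list[str]) -> str:
        # Build the Bash command dynamically with Python;
        # the returned string is executed as the task's Bash command
        return "tar -czf /tmp/daily_exports.tar.gz " + " ".join(files)

    archive_files(list_files())


bash_taskflow_example()
```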
You can learn more about the @task.bash decorator in the Airflow documentation.
Set up a new Airflow 2.9.2 environment in Amazon MWAA
You can initiate the setup in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you're adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.
Upon successful creation of an Airflow 2.9 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.
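As an illustration, an AWS CLI call to create an environment pinned to Airflow 2.9.2 might look like the following sketch; the environment name, bucket, role ARN, subnets, and security group are placeholders for your own resources.

```bash
aws mwaa create-environment \
    --name my-airflow-292-environment \
    --airflow-version 2.9.2 \
    --source-bucket-arn arn:aws:s3:::my-mwaa-bucket \
    --dag-s3-path dags \
    --execution-role-arn arn:aws:iam::111122223333:role/my-mwaa-execution-role \
    --network-configuration '{"SubnetIds":["subnet-aaaa1111","subnet-bbbb2222"],"SecurityGroupIds":["sg-cccc3333"]}'
```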
Upgrade from older versions of Airflow to version 2.9.2
You can take advantage of these latest capabilities by upgrading your older Airflow 2.x-based environments to version 2.9 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.
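For example, an in-place upgrade of an existing environment can be performed with an AWS CLI call similar to the following sketch; the environment name is a placeholder.

```bash
aws mwaa update-environment \
    --name my-existing-environment \
    --airflow-version 2.9.2
```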
Conclusion
In this post, we announced the availability of Apache Airflow 2.9 environments in Amazon MWAA. We discussed how some of the latest features in the release enable you to design more reactive, event-driven workflows, such as DAG scheduling based on the result of logical operations, and the availability of REST API endpoints to programmatically create dataset events. We also provided sample code to show the implementation in Amazon MWAA.
For the complete list of changes, refer to Airflow's release notes. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.
Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
About the authors
Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working with enterprises in the Financial Services Industry. He specializes in application modernization and supports customers in the adoption of serverless technologies.
Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new cloud native solutions using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space, helping customers adopt AWS services for their workflow orchestration needs.