Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.
By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach offers a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. It also enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.
This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.
Solution overview
For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to separate the raw data into its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A in Account B, which is dedicated to data handling tasks. This makes sure the raw and processed data can be kept securely separated across multiple accounts, if required, for enhanced data governance and security.
Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.
Because security is paramount, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.
The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.
The workflow consists of the following components:
- The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to be able to load and unload data.
- In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured within private subnets across three Availability Zones.
- Secrets like the user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
- VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
- Typically, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
- The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
- The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.
Prerequisites
Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.
Deploy resources in Account A using AWS CloudFormation
In Account A, launch the provided AWS CloudFormation stack to create the following resources:
- The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with Hive-style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
- A sample dataset called products.csv, which we use in this post.
Upload the AWS Glue job to Amazon S3 in Account B
In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to that Amazon S3 location.
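This step can also be scripted. The following is a minimal boto3 sketch, assuming the job script is saved locally as sample_glue_job.py and that your credentials target Account B (the account ID and Region shown are placeholders):

```python
import boto3
from botocore.exceptions import ClientError

# Placeholders -- replace with your Account B ID and Region.
account_id = "222233334444"
region = "us-east-1"
bucket = f"aws-glue-assets-{account_id}-{region}"

s3 = boto3.client("s3", region_name=region)

# Create the assets bucket if it does not exist yet.
try:
    s3.head_bucket(Bucket=bucket)
except ClientError:
    if region == "us-east-1":
        s3.create_bucket(Bucket=bucket)
    else:
        s3.create_bucket(Bucket=bucket, CreateBucketConfiguration={"LocationConstraint": region})

# Upload the AWS Glue job script to the standard scripts/ prefix.
s3.upload_file("sample_glue_job.py", bucket, "scripts/sample_glue_job.py")
```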
Deploy resources in Account B using AWS CloudFormation
In Account B, launch the provided CloudFormation stack template to create the following resources:
- The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
  - dags – The folder for DAG files.
  - plugins – The file for any custom or community Airflow plugins.
  - requirements – The requirements.txt file for any Python packages.
  - scripts – Any SQL scripts used in the DAG.
  - data – Any datasets used in the DAG.
- A Redshift Serverless environment. The names of the workgroup and namespace are prefixed with sample.
- An AWS Glue environment, which contains the following:
  - An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
  - A database called products_db in the AWS Glue Data Catalog.
  - An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
- A VPC gateway endpoint to Amazon S3.
- An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).
Create Amazon Redshift resources
Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file.
In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.
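The actual DDL lives in the products.sql file; the following is a hedged sketch of how such statements could be run against the Serverless workgroup with the Redshift Data API. The table columns, procedure body, workgroup name, and Region are illustrative assumptions, not the contents of products.sql:

```python
import boto3

client = boto3.client("redshift-data", region_name="us-east-1")  # Region is a placeholder

# Illustrative statements only -- the real definitions come from products.sql.
statements = [
    """CREATE TABLE IF NOT EXISTS products (
           product_id   INTEGER,
           product_name VARCHAR(200),
           price        DECIMAL(10, 2),
           updated_at   TIMESTAMP)""",
    "CREATE TABLE IF NOT EXISTS products_f (LIKE products)",
    """CREATE OR REPLACE PROCEDURE sp_products() AS $$
       BEGIN
           -- Illustrative logic: move staged rows into the final table.
           INSERT INTO products_f SELECT * FROM products;
       END;
       $$ LANGUAGE plpgsql""",
]

for sql in statements:
    # Redshift Serverless is addressed by workgroup name instead of a cluster identifier.
    response = client.execute_statement(WorkgroupName="sample-workgroup", Database="dev", Sql=sql)
    print(response["Id"])
```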
Configure Airflow permissions
After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.
Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.
The policies attached to the Amazon MWAA role grant full access and should only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.
Set up the environment
This section outlines the steps to configure the environment. The process involves the following high-level steps:
- Update any necessary providers.
- Set up cross-account access.
- Establish a VPC peering connection between the Amazon MWAA VPC and the Amazon Redshift VPC.
- Configure Secrets Manager to integrate with Amazon MWAA.
- Define Airflow connections.
Update the providers
Follow the steps in this section if your version of Amazon MWAA is less than 2.8.1 (the latest version as of writing this post).
Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.
Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until Amazon provider package version 8.4.0. Because the default bundled provider version is older than when Redshift Serverless support was introduced, the provider version must be upgraded in order to use that functionality.
The first step is to update the constraints file and the requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.
- Specify the requirements as follows (a sample requirements.txt sketch follows this list):
  - Update the version in the constraints file to 8.4.0 or greater.
  - Add the constraints-3.11-updated.txt file to the /dags folder.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for the correct versions of the constraints file depending on the Airflow version.
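A requirements.txt for this setup might look like the following, assuming the edited constraints file was uploaded to the dags folder (Amazon MWAA mounts that folder at /usr/local/airflow/dags):

```
--constraint "/usr/local/airflow/dags/constraints-3.11-updated.txt"
apache-airflow-providers-amazon==8.4.0
```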
- Navigate to the Amazon MWAA environment and choose Edit.
- Under DAG code in Amazon S3, for Requirements file, choose the latest version.
- Choose Save.

This will update the environment, and the new providers will be in effect.

- To verify the providers version, go to Providers under the Admin tab.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.
Set up cross-account access
You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:
- In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username> (a hedged sketch of such a policy appears after this list):
- Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to the Amazon MWAA roles in Account B to put objects in this bucket:
- In Account A, create an IAM policy called policy_for_roleA, which allows the necessary Amazon S3 actions on the output bucket:
- Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform the necessary Amazon S3 actions on the output bucket.
- In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
- Add this policy to the AWS Glue role and the Amazon MWAA role:
- In Account B, create the IAM policy policy_for_roleB specifying Account A as a trusted entity. The following is the trust policy to assume RoleA in Account A:
- Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
- Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
- Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.
Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.
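As a reference for the first step, the following is a hedged sketch of a cross-account bucket policy applied from Account A with boto3; the role ARNs, account ID, and bucket name are placeholders for the AWS Glue and Amazon MWAA execution roles in Account B:

```python
import json
import boto3

# Placeholders -- substitute your Account B role ARNs and the Account A bucket name.
glue_role_arn = "arn:aws:iam::222233334444:role/sample-glue-role"
mwaa_role_arn = "arn:aws:iam::222233334444:role/sample-mwaa-execution-role"
bucket = "sample-inp-bucket-etl-username"

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowCrossAccountRead",
            "Effect": "Allow",
            "Principal": {"AWS": [glue_role_arn, mwaa_role_arn]},
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
        }
    ],
}

# Run with credentials from Account A, which owns the input bucket.
boto3.client("s3").put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))
```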
Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs
Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.
Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.
If any of the preceding steps are configured incorrectly, you are likely to encounter a "Connection Timeout" error in the DAG run.
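The same wiring can be sketched with boto3; the VPC, route table, and security group IDs and the CIDR ranges below are placeholders, under the assumption that both VPCs live in Account B in the same Region:

```python
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # Region is a placeholder

# Placeholder identifiers -- replace with the values from your environment.
mwaa_vpc_id, redshift_vpc_id = "vpc-0aaaaaaaaaaaaaaaa", "vpc-0bbbbbbbbbbbbbbbb"
mwaa_rtb_id, redshift_rtb_id = "rtb-0aaaaaaaaaaaaaaaa", "rtb-0bbbbbbbbbbbbbbbb"
mwaa_cidr, redshift_cidr = "10.0.0.0/16", "10.1.0.0/16"
redshift_sg_id = "sg-0bbbbbbbbbbbbbbbb"

# Request and accept the peering connection (both VPCs are in the same account here).
pcx = ec2.create_vpc_peering_connection(VpcId=mwaa_vpc_id, PeerVpcId=redshift_vpc_id)
pcx_id = pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
ec2.accept_vpc_peering_connection(VpcPeeringConnectionId=pcx_id)

# Route each VPC's traffic for the other VPC's CIDR range over the peering connection.
ec2.create_route(RouteTableId=mwaa_rtb_id, DestinationCidrBlock=redshift_cidr,
                 VpcPeeringConnectionId=pcx_id)
ec2.create_route(RouteTableId=redshift_rtb_id, DestinationCidrBlock=mwaa_cidr,
                 VpcPeeringConnectionId=pcx_id)

# Allow the Amazon MWAA CIDR range to reach Redshift Serverless on its port.
ec2.authorize_security_group_ingress(
    GroupId=redshift_sg_id,
    IpPermissions=[{"IpProtocol": "tcp", "FromPort": 5439, "ToPort": 5439,
                    "IpRanges": [{"CidrIp": mwaa_cidr}]}],
)
```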
Configure the Amazon MWAA connection with Secrets Manager
When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.
Complete the following steps:
- Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.

- To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
- To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:
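Assuming the standard Secrets Manager backend settings, the key-value pairs take the following form:

```
secrets.backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}
```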
- To generate an Airflow connection URI string, go to AWS CloudShell and enter a Python shell.
- Run the following code to generate the connection URI string.

The connection string is generated in the form redshift://user:password@host:port/database, with any extras (such as the Region) appended as query parameters.
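A minimal sketch of that step, with placeholder values for the user, password, endpoint, and database:

```python
import urllib.parse

# Placeholder values -- substitute your Redshift Serverless admin user, password, endpoint, and database.
user = "admin"
password = urllib.parse.quote_plus("my-Secret-Passw0rd")
host = "sample-workgroup.222233334444.us-east-1.redshift-serverless.amazonaws.com"
port = 5439
database = "dev"
region = "us-east-1"

# Airflow connection URIs encode extras (such as the Region) as query parameters.
conn_uri = f"redshift://{user}:{password}@{host}:{port}/{database}?region={region}"
print(conn_uri)
# Example output (placeholder values):
# redshift://admin:my-Secret-Passw0rd@sample-workgroup.222233334444.us-east-1.redshift-serverless.amazonaws.com:5439/dev?region=us-east-1
```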
- Add the connection in Secrets Manager using the AWS Command Line Interface (AWS CLI) create-secret command or an equivalent SDK call.

This can also be done from the Secrets Manager console. The connection is added in Secrets Manager as plaintext.
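A hedged boto3 equivalent of that step, reusing the URI generated above (the secret name must sit under the connections_prefix configured earlier):

```python
import boto3

# Placeholder URI -- use the value printed by the previous snippet.
conn_uri = "redshift://admin:my-Secret-Passw0rd@sample-workgroup.222233334444.us-east-1.redshift-serverless.amazonaws.com:5439/dev?region=us-east-1"

secretsmanager = boto3.client("secretsmanager", region_name="us-east-1")  # Region is a placeholder
secretsmanager.create_secret(
    Name="airflow/connections/secrets_redshift_connection",
    Description="Airflow connection for Redshift Serverless",
    SecretString=conn_uri,  # stored as plaintext and resolved by the Secrets Manager backend
)
```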
Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.
You can also add secrets using the Secrets Manager console as key-value pairs.
- Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.
Create an Airflow connection through the metadata database
You can also create connections in the UI. In this case, the connection details will be stored in the Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.
- For Connection Id, enter a name for the connection.
- For Connection Type, choose Amazon Redshift.
- For Host, enter the Redshift endpoint (without the port and database) for Redshift Serverless.
- For Database, enter dev.
- For User, enter your admin user name.
- For Password, enter your password.
- For Port, use port 5439.
- For Extra, set the region and timeout parameters.
- Test the connection, then save your settings.
Create and run a DAG
In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.
Create a DAG
In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:
- The DAG starts by looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
  - For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derive the current date in the DAG and pass this to S3KeySensor, which looks for any new files in the current day's folder.
  - We also set wildcard_match to True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and it is rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
- After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
- The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
- When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator together with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter.
- The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it's complete, the Redshift staging table products will be loaded with data from the S3 file.
- You can connect to Amazon Redshift from Airflow using three different operators:
  - PythonOperator.
  - SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
  - RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.
Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like the user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.
In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed instead.
In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.
- As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account A. The connection airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
- TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
- The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.
The complete DAG code ties these components together. The dag_id should match the DAG script name, otherwise it won't be synced into the Airflow UI.
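The following is a condensed sketch of such a DAG rather than the full original script; the bucket, crawler, workgroup, connection, and secret names are placeholders to replace with your own:

```python
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.trigger_rule import TriggerRule

# Placeholder names -- replace with the resources created earlier.
SRC_BUCKET = "sample-inp-bucket-etl-username"
OPT_BUCKET = "sample-opt-bucket-etl-username"
GLUE_CRAWLER = "sample_crawler"
GLUE_JOB = "sample_glue_job"
REDSHIFT_CONN_ID = "secrets_redshift_connection"  # resolved from Secrets Manager
UNLOAD_CONN_ID = "redshift_conn_test"
SECRET_ARN = "arn:aws:secretsmanager:us-east-1:222233334444:secret:airflow/connections/redshift_conn_test-AbCdEf"
WORKGROUP = "sample-workgroup"

with DAG(
    dag_id="sample_mwaa_etl_dag",  # must match the DAG file name (sample_mwaa_etl_dag.py)
    start_date=datetime(2024, 1, 1),
    schedule="@once",
    catchup=False,
) as dag:
    # Wait for today's partition in Account A's input bucket to receive new files.
    wait_for_files = S3KeySensor(
        task_id="wait_for_source_files",
        bucket_name=SRC_BUCKET,
        bucket_key="products/{{ ds.replace('-', '/') }}/*",
        wildcard_match=True,
        mode="reschedule",
        poke_interval=120,
    )

    # Crawl the source bucket and refresh the table metadata in the Data Catalog.
    run_crawler = GlueCrawlerOperator(
        task_id="run_glue_crawler",
        config={"Name": GLUE_CRAWLER},
        wait_for_completion=False,
    )
    wait_for_crawler = GlueCrawlerSensor(task_id="wait_for_glue_crawler", crawler_name=GLUE_CRAWLER)

    # Run the Glue ETL job that loads the Redshift staging table.
    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name=GLUE_JOB,
        wait_for_completion=False,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )
    wait_for_glue_job = GlueJobSensor(
        task_id="wait_for_glue_job",
        job_name=GLUE_JOB,
        run_id=run_glue_job.output,
    )

    # Run the stored procedure through the Redshift Data API, authenticating with the secret ARN.
    run_stored_procedure = RedshiftDataOperator(
        task_id="run_sp_products",
        workgroup_name=WORKGROUP,
        database="dev",
        sql="CALL sp_products();",
        secret_arn=SECRET_ARN,
        wait_for_completion=True,
    )

    # Clear the staging table using the connection resolved from Secrets Manager.
    delete_staging_data = SQLExecuteQueryOperator(
        task_id="delete_staging_data",
        conn_id=REDSHIFT_CONN_ID,
        sql="DELETE FROM products;",
    )

    # Unload the final table back to the output bucket, partitioned by date.
    unload_to_s3 = RedshiftToS3Operator(
        task_id="unload_products_f",
        redshift_conn_id=UNLOAD_CONN_ID,
        schema="public",
        table="products_f",
        s3_bucket=OPT_BUCKET,
        s3_key="products/{{ ds.replace('-', '/') }}/",
        unload_options=["FORMAT AS PARQUET"],
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Run everything in sequence.
    chain(wait_for_files, run_crawler, wait_for_crawler, run_glue_job,
          wait_for_glue_job, run_stored_procedure, delete_staging_data, unload_to_s3)
```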
Verify the DAG run
After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.
In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.
Verify the results
On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the S3 files.
On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account A. The product_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.
Clean up
Clean up the resources created as part of this post to avoid incurring ongoing costs:
- Delete the CloudFormation stacks and the S3 bucket that you created as prerequisites.
- Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.
Conclusion
With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.
About the Authors
Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.
Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for cost, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.