Designing workflow with Airflow

I have been using Oozie for a while now and was dissatisfied with how it manages Hadoop jobs, not to mention the pain of debugging its vague errors. While evaluating alternative workflow engines, Airflow by Airbnb caught my eye. I’ll skip the introduction for now; you can read more about it here. This post highlights its key features and demonstrates a Hadoop job.

Before I begin with the example, I’d like to mention the key advantages of Airflow over other tools:

  • Amazing UI for viewing job flow (DAG), run stats, logs, etc.
  • You write an actual Python program instead of ugly configuration files
  • Exceptional monitoring options for batch jobs
  • Ability to query metadata and generate custom charts
  • Most contributors in the developer community have worked with or evaluated other similar tools, so Airflow picks up the best ideas from each as it evolves.

Moving on to the example, let’s say we have an “Orders” table in a MySQL database that is continually populated with new order records, and our job is to load the new records into a Hive target table (assume the first full load into the Hive table is already done). We’ll do an incremental load via Sqoop from MySQL to HDFS and store the data in Avro format. Then we’ll have a Hive external table point to it and use HQL to load the target table (Parquet format) from the external table. This is just the basic sequence of tasks; in a real use case we’d also have file-check and record-check tasks in the workflow.
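
To make the incremental load step concrete, here is a minimal sketch of the kind of Sqoop command such a job might run. It is not the wrapper script used in this post; the JDBC URL, the target directory, and the choice of order_id as the check column are assumptions for illustration, and credentials are left out.

```python
def build_sqoop_incremental_cmd(
    last_value,
    jdbc_url="jdbc:mysql://mysql-host/sales",  # hypothetical host and database
    table="orders",                            # assumed source table name
    check_column="order_id",                   # assumed monotonically increasing key
    target_dir="/data/staging/orders",         # hypothetical HDFS directory
):
    """Return the argument list for an append-mode incremental Sqoop import.

    Only rows whose check column is greater than last_value are imported,
    and the output is written to HDFS as Avro data files.
    """
    return [
        "sqoop", "import",
        "--connect", jdbc_url,
        "--table", table,
        "--incremental", "append",
        "--check-column", check_column,
        "--last-value", str(last_value),
        "--as-avrodatafile",
        "--target-dir", target_dir,
    ]


if __name__ == "__main__":
    # Print the command as it would appear on a shell command line.
    print(" ".join(build_sqoop_incremental_cmd(last_value=1000)))
```

Building the command as a list keeps each flag and its value together, which makes it easy to pass to subprocess or to join into a single string for a shell script.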

Here I have used a wrapper shell script with a property file to run the Sqoop job; you can get the full code here. Our main workflow is the following Python script:

#File Name: wf_incremental_load.py
from airflow import DAG
from airflow.operators import BashOperator, HiveOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'udaysharma',
    'start_date': datetime(2016, 1, 14),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('incremental_load', default_args=default_args)

sqoop_job = """
 exec ./scripts/sqoop_incremental.sh
"""
# Importing the data from Mysql table to HDFS
task1 = BashOperator(
    task_id='sqoop_import',
    bash_command=sqoop_job,
    dag=dag
)

# Inserting the data from Hive external table to the target table
task2 = HiveOperator(
    task_id='hive_insert',
    hql='INSERT INTO TABLE orders_trans SELECT order_id, first_name, last_name, item_code, order_date FROM orders_stg;',
    depends_on_past=True,
    dag=dag
)
# defining the job dependency
task2.set_upstream(task1)
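
To see what the dependency declaration means for execution order, here is a small standalone sketch using only the Python standard library. It is not how Airflow's scheduler is implemented; it just shows that declaring upstream tasks fixes the order in which tasks may run. The file_check and record_check task names are hypothetical stand-ins for the extra checks mentioned earlier.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def run_order(upstream_map):
    """Return one valid execution order for a task -> upstream-tasks mapping."""
    return list(TopologicalSorter(upstream_map).static_order())


# The workflow from this post: hive_insert runs only after sqoop_import.
upstream = {
    "sqoop_import": set(),
    "hive_insert": {"sqoop_import"},
}

# A hypothetical extension with check tasks between the import and the insert.
extended = {
    "sqoop_import": set(),
    "file_check": {"sqoop_import"},
    "record_check": {"file_check"},
    "hive_insert": {"record_check"},
}

if __name__ == "__main__":
    print(run_order(upstream))  # ['sqoop_import', 'hive_insert']
    print(run_order(extended))  # ['sqoop_import', 'file_check', 'record_check', 'hive_insert']
```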

On submitting the workflow, we can view its DAG, its scheduled instances, and the run time, code, and logs for each task.

To keep up with Airflow questions and issues, join the user community on their Google group.
