Airflow callbacks to Slack notifications for DAG monitoring and alerting

Alireza Sadeghi
5 min read · Jul 23, 2023

In this post I’ll walk through a step-by-step guide to integrating Airflow workflows with Slack for notification and monitoring purposes. The main objective is to show how to encapsulate the Slack notification logic and send DAG callback notifications to Slack with minimal effort.

Configuring Slack App and Webhook

As the first step you need to create a Slack App and an incoming Webhook in your Slack workspace to be used by Airflow for posting messages. There are other posts and guides explaining the process of creating a Slack app and configuring an incoming Webhook, such as the official Slack guide, so I won’t go through this process here.

Creating a connection on Airflow

Once we have configured the required Slack app and incoming Webhook, we need to create a new HTTP connection in the Airflow UI.

A separate connection is needed for each Slack channel you want to send notifications to. The following screenshot shows creating a new connection for posting DAG callback notifications to a Slack channel called #airflow-callback-alerts, which we set as the Login parameter.

The Password is the token portion at the end of the Webhook URL configured on the Slack app, as explained in the previous step:

Example Webhook URL, where the token portion following https://hooks.slack.com/services/ needs to be copied into the Password field:

https://hooks.slack.com/services/Txxxxxx/Bxxxxxxxx/XXXXXXXXXXXXXXXXXXXXX
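To quickly confirm the connection is configured as expected, you can read it back with BaseHook (a minimal sketch, assuming the connection is named slack_report, the name used later in this post):

from airflow.hooks.base_hook import BaseHook

# Fetch the connection created in the Airflow UI
conn = BaseHook.get_connection('slack_report')
print(conn.login)     # the Slack channel, e.g. #airflow-callback-alerts
print(conn.password)  # the token portion of the Webhook URL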

Creating a Slack Bot Wrapper class

To make the code for posting alerts to Slack reusable and follow the DRY principle, we can encapsulate the calls to the Slack API in a wrapper class that can easily be imported and used in other classes or DAG scripts.

Airflow already provides BaseHook and SlackWebhookOperator for interacting with the Slack API, which makes our job a lot easier, but we can still add an additional wrapper around these objects.

The following is a sample SlackBot() class that exposes a few useful functions built on top of the BaseHook and SlackWebhookOperator provided by Airflow. To create an instance of SlackBot we need to pass the Slack connection ID created in the previous step and the username, which is essentially the name of the Slack App (in my example I called the app airflow).

The post_message function creates a SlackWebhookOperator and executes it immediately using the passed parameters.

from airflow.hooks.base_hook import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


class SlackBot():

    def __init__(self, conn_id, username):
        self.conn_id = conn_id
        self.username = username
        # The webhook token is stored in the password field of the connection
        self.token = BaseHook.get_connection(self.conn_id).password

    def post_message(self, context, message):
        # Build a SlackWebhookOperator and execute it immediately
        slack_alert = SlackWebhookOperator(
            task_id='slack_alert',
            http_conn_id=self.conn_id,
            webhook_token=self.token,
            message=message,
            username=self.username)
        return slack_alert.execute(context=context)

To post messages in a standard format, we define a function that formats the message consistently for all alerts posted to Slack. In the message body we can include useful information for quick debugging, such as the Airflow DAG name, the task ID, and the task log URL, so that we can open the task logs by simply clicking the link in the alert. To get these details we need to pass the context object.

We can also pass details such as the icon and the title through the kwargs parameter, as in this example. You can include additional pieces of information applicable to your environment if needed.

    def __format_message(self, context, **kwargs):
        # Convert the execution date to the local timezone for readability
        local_dt = context.get('execution_date').astimezone()
        options = {
            'icon': ':large_blue_circle:',
            'title': '[not provided]'
        }
        options.update(kwargs)
        slack_msg = """
        ==================================================
        {icon} *{title}*
        ==================================================
        *Task*: {task}
        *Dag*: {dag}
        *Execution Date*: {execution_date}
        *Log Url*: {log_url}
        """.format(
            icon=options['icon'],
            title=options['title'],
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            execution_date=local_dt.strftime('%Y%m%d %H:00:00'),
            log_url=context.get('task_instance').log_url
        )
        return slack_msg

To complete our SlackBot() class, we can define the following public functions for formatting and customising messages based on their criticality, such as alert, info or warning. For each type a custom icon is used:

    def post_alert(self, context, **kwargs):
        kwargs['icon'] = ':red_circle:'
        message = self.__format_message(context, **kwargs)
        return self.post_message(context, message)

    def post_info(self, context, **kwargs):
        kwargs['icon'] = ':large_green_circle:'
        message = self.__format_message(context, **kwargs)
        return self.post_message(context, message)

    def post_warning(self, context, **kwargs):
        kwargs['icon'] = ':large_orange_circle:'
        message = self.__format_message(context, **kwargs)
        return self.post_message(context, message)
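As a quick check, the class can also be used on its own inside a PythonOperator callable, for example to post an informational message when a pipeline starts. This is a minimal sketch; the task_id, callable name and message title are just illustrative, while the connection ID, username and slack_bot module name match those used later in this post:

from airflow.operators.python import PythonOperator
from slack_bot import SlackBot

def notify_start(**context):
    # Post an info message using the connection and Slack app created earlier
    bot = SlackBot('slack_report', 'airflow')
    bot.post_info(context, title='Pipeline started')

notify_task = PythonOperator(
    task_id='notify_start',
    python_callable=notify_start,
    dag=dag,
)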

Creating a callback notification class

The wrapper class above around the Airflow Slack operator and hook might be good enough on its own, but we can add a further helper class with static functions that can be assigned directly to Airflow callbacks such as the failure or retry callbacks.

At the top we import the SlackBot class from the Python file created in the previous step. Make sure the SLACK_CONN_ID constant in the snippet is set to the name of your own Slack connection created in the initial step. USERNAME in this case is the name of the Slack app created in the first step.

from slack_bot import SlackBot

class CallbackNotifier:

    SLACK_CONN_ID = 'slack_report'
    USERNAME = 'airflow'

    @staticmethod
    def on_failure_callback(context):
        bot = SlackBot(CallbackNotifier.SLACK_CONN_ID, CallbackNotifier.USERNAME)
        bot.post_alert(context, title="Failed Task Alert")

    @staticmethod
    def on_retry_callback(context):
        bot = SlackBot(CallbackNotifier.SLACK_CONN_ID, CallbackNotifier.USERNAME)
        bot.post_warning(context, title="Retry Task Alert")

With this additional class in place, all we need to do is reference the callback method in the Airflow DAG script, which minimises the amount of code required for posting callback messages to Slack.

Using the callback notification class in DAG scripts

With the above two classes we can now easily post alerts to Slack by assigning on_retry_callback and on_failure_callback to the corresponding task callback parameters, with minimal code.

By binding these functions to the callbacks in the default_args dictionary, we will receive notifications for all tasks in the DAG whenever any of them is scheduled for a retry or fails.

from datetime import datetime
from callback_notifier import CallbackNotifier

# local_tz is assumed to be a timezone object defined elsewhere (e.g. via pendulum)
default_args = {
    'start_date': datetime(2023, 1, 1, 0, 0, 0, tzinfo=local_tz),
    'on_retry_callback': CallbackNotifier().on_retry_callback,
    'on_failure_callback': CallbackNotifier().on_failure_callback,
}
....
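To see the alerts end to end, the default_args above can be passed to a DAG containing a task that deliberately fails. This is a minimal sketch; the DAG id, schedule and the failing BashOperator are just illustrative:

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='slack_alert_demo',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    # Deliberately failing task to trigger on_failure_callback
    always_fails = BashOperator(
        task_id='always_fails',
        bash_command='exit 1',
    )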

Alternatively, we can attach the callbacks only to specific operators:

from airflow.operators.dummy import DummyOperator

dag_success_check = DummyOperator(
    task_id="dag_success_check",
    retries=0,
    dag=dag,
    on_failure_callback=CallbackNotifier().on_failure_callback
)

The following screenshot shows a sample alert received when a task has failed. As seen in the screenshot, I have included some additional custom parameters such as environment, pipeline and processdate in the alert message.
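One way to include such extras (a sketch, not the exact code behind the screenshot) is to pass them as additional keyword arguments from the callback and have __format_message render any keys other than icon and title; for example:

    @staticmethod
    def on_failure_callback(context):
        bot = SlackBot(CallbackNotifier.SLACK_CONN_ID, CallbackNotifier.USERNAME)
        # environment and pipeline values here are illustrative, not part of the
        # class above; __format_message would need a small change to append
        # keyword arguments other than icon and title to the message body.
        bot.post_alert(context,
                       title="Failed Task Alert",
                       environment="production",
                       pipeline=context['task_instance'].dag_id)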
