Hacking Apache Airflow to trigger DAGs based on FileSystem events

Phani Kumar Yadavilli
Mar 9, 2020

In this blog, I would like to cover how to trigger Airflow DAGs based on filesystem events, in other words with a file-watcher.

At the moment, Airflow does not have any operator or sensor that works as a file-watcher. The Airflow FileSensor waits for a file or directory to appear at a given path, but it only checks once per poke interval. That is polling the filesystem rather than reacting to events.
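To make the distinction concrete, here is a minimal, stdlib-only sketch of the check, sleep, check-again loop that a poke interval implies. It is not FileSensor's implementation; wait_for_file, poke_interval and timeout are illustrative names chosen for this example.

```python
import os
import time


def wait_for_file(path: str, poke_interval: float = 1.0, timeout: float = 10.0) -> bool:
    """Poll for `path` until it exists or `timeout` seconds have passed.

    Check, sleep, check again: a file that appears just after a check is only
    noticed on the next poke, up to `poke_interval` seconds later.
    """
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)
```

That built-in latency, plus the repeated checks while nothing changes, is what an event-driven watcher avoids.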

The data pipeline I am building needs a file-watcher that triggers a DAG in Airflow. Therefore, I implemented one that combines two pieces: the watchdog library, which monitors filesystem events, and the TriggerDagRunOperator provided by Airflow.

TriggerDagRunOperator is used to kick off another DAG. I created a target DAG and passed its DAG ID to the operator through the trigger_dag_id argument.

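```python
# Airflow 2.x import path; Airflow 1.10 used airflow.operators.dagrun_operator.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# `dag` is the controlling DAG object defined earlier in the same DAG file.
trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",  # DAG ID of the target DAG
    conf={"message": "Hello World"},  # payload handed to the triggered run
    dag=dag,
)
```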
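On the receiving side, the conf dictionary is exposed to the target run as dag_run.conf. The sketch below is a hypothetical task callable (print_message) for the target dummy_operator DAG; it assumes Airflow 2.x, where PythonOperator passes the context to a callable that accepts **kwargs (Airflow 1.10 additionally needs provide_context=True). The __main__ block uses a SimpleNamespace stand-in for the DagRun so the function runs without Airflow installed.

```python
from types import SimpleNamespace


def print_message(**context):
    """Task callable for the target DAG (hypothetical name).

    In the target DAG it could be wired up as, for example:
        PythonOperator(task_id="print_message", python_callable=print_message)
    """
    # Guard against a run that was started without any conf.
    conf = context["dag_run"].conf or {}
    message = conf.get("message", "<no message>")
    print(message)
    return message


if __name__ == "__main__":
    # Stand-in for the runtime context; only dag_run.conf is mimicked here.
    fake_context = {"dag_run": SimpleNamespace(conf={"message": "Hello World"})}
    print_message(**fake_context)  # prints "Hello World"
```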

The watchdog file-watcher triggers the DAG when it detects a change in the watched directory. My requirement is to trigger the DAG whenever a file is created in that directory.
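The decision of when and how to trigger can be kept separate from watchdog so that it is easy to test. The sketch below relies only on the event_type, is_directory and src_path attributes that watchdog events carry; should_trigger and build_trigger_command are hypothetical helpers, the command uses the Airflow 2.x "airflow dags trigger" CLI, and the file_path key in conf is an illustrative convention that tells the DAG which file arrived.

```python
import json
from types import SimpleNamespace


def should_trigger(event) -> bool:
    """Trigger only for newly created files, not for new directories."""
    return event.event_type == "created" and not event.is_directory


def build_trigger_command(dag_id: str, event) -> list:
    """Return an Airflow 2.x CLI argument list that triggers dag_id.

    "file_path" is a hypothetical conf key, not something Airflow defines.
    """
    conf = {"message": "Hello World", "file_path": event.src_path}
    return ["airflow", "dags", "trigger", dag_id, "--conf", json.dumps(conf)]


if __name__ == "__main__":
    # Stand-in for a watchdog file-created event; only the attributes used above.
    event = SimpleNamespace(event_type="created", is_directory=False, src_path="/data/in/a.csv")
    if should_trigger(event):
        print(build_trigger_command("dummy_operator", event))
```

Inside a watchdog handler, the returned list would be passed to subprocess.run(..., check=True).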

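```python
import json
import os
import subprocess

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:  # on_created also fires for new sub-directories
            return
        print("file created:", event.src_path)
        print("Executing the dag")
        # A bare `trigger` reference runs nothing; create the run via the Airflow 2.x CLI.
        subprocess.run(["airflow", "dags", "trigger", "dummy_operator",
                        "--conf", json.dumps({"message": "Hello World"})], check=True)

def main():
    observer = Observer()
    observer.schedule(Handler(), os.getcwd(), recursive=False)
    observer.start()
    observer.join()  # block while the observer thread watches the directory

if __name__ == "__main__":
    main()
```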
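Because the watcher runs as its own process outside the scheduler, another way to start the target DAG from the handler is the stable REST API in Airflow 2.x, which creates a run via POST /api/v1/dags/{dag_id}/dagRuns with the payload under "conf". The sketch only builds the request with urllib; build_dag_run_request is a hypothetical helper, the base URL is an assumption, and actually sending the request requires a running webserver with an API authentication backend enabled (for example, basic auth with an Authorization header).

```python
import json
import urllib.request


def build_dag_run_request(base_url: str, dag_id: str, conf: dict) -> urllib.request.Request:
    """Build (but do not send) a POST that asks Airflow 2.x to create a DAG run."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_dag_run_request("http://localhost:8080", "dummy_operator", {"message": "Hello World"})
    print(req.get_method(), req.full_url)
    # Sending it needs a live webserver and credentials, e.g.:
    # urllib.request.urlopen(req)
```

Keeping request construction in a small function leaves the handler as build-then-send and makes the URL and payload easy to check without a live Airflow instance.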

Conclusion

Airflow is one of the best choices for building ETL pipelines, especially when it comes to integrating with a wide range of libraries, and it offers many developer-friendly features. I hope this helps.

You can find the complete code here.
