A Python package that helps data and analytics engineers trigger on-demand job runs of Microsoft Fabric items from Apache Airflow DAGs.
Microsoft Fabric is an end-to-end analytics and data platform designed for enterprises that require a unified solution. It encompasses data movement, processing, ingestion, transformation, real-time event routing, and report building. It offers a comprehensive suite of services including Data Engineering, Data Factory, Data Science, Real-Time Analytics, Data Warehouse, and Databases.
PyPI package: https://pypi.org/project/apache-airflow-microsoft-fabric-plugin/
```shell
pip install apache-airflow-microsoft-fabric-plugin
```
Before diving in:
- The plugin supports authentication using user tokens. A tenant-level admin account must enable the "Allow user consent for apps" setting. Refer to: Configure user consent.
- Create a Microsoft Entra ID app if you don't have one.
- You must have a refresh token.
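For context, a refresh token is normally exchanged for an access token with a `refresh_token` grant against the Microsoft identity platform v2.0 token endpoint. The minimal sketch below only builds that request and sends nothing. It is not the plugin's implementation, and the helper names `build_refresh_request` and `rotate_refresh_token` are hypothetical. It falls back to the default scopes listed in this README and keeps the newest refresh token whenever the server returns a rotated one.

```python
# Default scopes listed in this README, used when no scopes are configured.
DEFAULT_SCOPES = [
    "https://api.fabric.microsoft.com/Item.Execute.All",
    "https://api.fabric.microsoft.com/Item.ReadWrite.All",
    "offline_access",
    "openid",
    "profile",
]


def build_refresh_request(tenant_id, client_id, refresh_token,
                          scopes=None, client_secret=None):
    """Return (token_url, form_fields) for a refresh_token grant. Nothing is sent."""
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    if not scopes:
        scopes = DEFAULT_SCOPES
    form = {
        "grant_type": "refresh_token",
        "client_id": client_id,
        "refresh_token": refresh_token,
        # The v2.0 endpoint takes scopes as one space-separated string.
        "scope": scopes if isinstance(scopes, str) else " ".join(scopes),
    }
    if client_secret:  # optional, mirrors the optional clientSecret extra
        form["client_secret"] = client_secret
    return token_url, form


def rotate_refresh_token(stored_token, token_response):
    """Return the refresh token to store next: the rotated one if the response has it."""
    return token_response.get("refresh_token", stored_token)
```

The form fields would be POSTed as `application/x-www-form-urlencoded`. Persisting the value returned by `rotate_refresh_token` after every successful refresh is what refresh token rotation amounts to on the client side.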
Since custom connection forms aren't feasible in Apache Airflow plugins, you can use the Generic connection type. Here's what you need to store:
- Connection Id: A name for the connection (passed to the operator as fabric_conn_id).
- Connection Type: Generic
- Login: The Client ID of your service principal.
- Password: The refresh token fetched using Microsoft OAuth.
- Extra:
  {
    "tenantId": "The Tenant Id of your service principal",
    "clientSecret": "(optional) The Client Secret for your Entra ID App",
    "scopes": "(optional) Scopes you used to fetch the refresh token"
  }
NOTE: Default scopes applied are: https://api.fabric.microsoft.com/Item.Execute.All, https://api.fabric.microsoft.com/Item.ReadWrite.All, offline_access, openid, profile
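As an alternative to creating the connection in the UI, Airflow (2.3 and later) can read a connection from an `AIRFLOW_CONN_<CONN_ID>` environment variable holding JSON. The sketch below assembles that JSON from the fields listed above. The helper `fabric_connection_env` is hypothetical, the values are placeholders, and `scopes` is assumed to be a space-separated string.

```python
import json


def fabric_connection_env(conn_id, client_id, refresh_token, tenant_id,
                          client_secret=None, scopes=None):
    """Return (env_var_name, json_value) describing a Generic Fabric connection."""
    extra = {"tenantId": tenant_id}
    if client_secret:
        extra["clientSecret"] = client_secret
    if scopes:  # assumed space-separated; the default scopes apply when omitted
        extra["scopes"] = scopes
    value = {
        "conn_type": "generic",
        "login": client_id,         # Client ID of the service principal / Entra ID app
        "password": refresh_token,  # refresh token fetched using Microsoft OAuth
        "extra": extra,
    }
    # Airflow looks up AIRFLOW_CONN_ followed by the upper-cased connection id.
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(value)


name, value = fabric_connection_env("fabric_conn_id", "<client_id>", "<refresh_token>", "<tenant_id>")
print(f"export {name}='{value}'")
```

Refresh tokens rotate, so check whether the plugin writes the new token back to the connection before relying on a read-only source such as an environment variable.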
The FabricRunItemOperator contains the core logic of this plugin. It triggers the Fabric item run and pushes the run details to XCom. It accepts the following parameters:
- workspace_id: The workspace Id.
- item_id: The item Id, e.g. of a Notebook or Pipeline.
- fabric_conn_id: Connection Id for Fabric.
- job_type: "RunNotebook" or "Pipeline".
- wait_for_termination: (Default value: True) Wait until the item run finishes.
- timeout: int (Default value: 60 * 60 * 24 * 7, i.e. 7 days). Time in seconds to wait for the pipeline or notebook. Used only if wait_for_termination is True.
- check_interval: int (Default value: 60s). Time in seconds to wait before rechecking the item run status.
- max_retries: int (Default value: 5 retries). Max number of times to poll the API for a valid response after starting a job.
- retry_delay: int (Default value: 1s). Polling retry delay.
- deferrable: Boolean. Use the operator in deferrable mode.
- job_params: Dict. Parameters to pass into the job.
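To make the interplay of `timeout`, `check_interval`, `max_retries` and `retry_delay` concrete, here is a simplified polling loop using the same defaults. It is an illustration, not the operator's code: it assumes "Completed", "Failed" and "Disabled" are terminal statuses, treats an empty response as "not yet valid", and takes the status-fetching function, clock and sleep as parameters so it can run without calling Fabric. Both function names are hypothetical.

```python
import time

# Assumption: these README statuses end a run; "In Progress" does not.
TERMINAL_STATUSES = {"Completed", "Failed", "Disabled"}


def fetch_status_with_retries(fetch, max_retries=5, retry_delay=1, sleep=time.sleep):
    """Call fetch() until it returns a non-empty status, at most max_retries times."""
    for attempt in range(max_retries):
        status = fetch()
        if status:
            return status
        if attempt < max_retries - 1:  # no pointless wait after the last attempt
            sleep(retry_delay)
    raise RuntimeError(f"No valid run status after {max_retries} attempts")


def wait_for_run(get_status, timeout=60 * 60 * 24 * 7, check_interval=60,
                 clock=time.monotonic, sleep=time.sleep):
    """Poll get_status() every check_interval seconds until a terminal status or timeout."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Item run still {status!r} after {timeout}s")
        sleep(check_interval)
```

The two pieces compose as `wait_for_run(lambda: fetch_status_with_retries(my_fetch))`, where `my_fetch` stands for whatever function queries the run status.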
- Refresh token rotation: the refresh token is replaced each time it is used to obtain a new access token. This improves security by reducing the risk of a stolen token being reused indefinitely.
- The Fabric run item enriches XCom with essential fields for downstream tasks:
  - run_id: Run Id of the Fabric item.
  - run_status: Fabric item run status:
    - In Progress: Item run is in progress.
    - Completed: Item run successfully completed.
    - Failed: Item run failed.
    - Disabled: Item run is disabled by a selective refresh.
  - run_location: The location of the item run status.
- The operator provides a redirect link to the Microsoft Fabric item run.
- The operator supports deferrable mode: with deferrable=True, the task is deferred, freeing its worker slot, until the item run reaches its target status.
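A downstream task can read the XCom fields listed above. The sketch below is a hypothetical callable for a PythonOperator. It assumes the operator pushes the fields under the keys `run_id`, `run_status` and `run_location` (confirm this against the plugin's source) and uses the task_id `run_fabric_notebook` from the sample DAG.

```python
def summarize_fabric_run(ti, task_id="run_fabric_notebook"):
    """Read the Fabric run details pushed to XCom by an upstream FabricRunItemOperator."""
    # Assumed XCom keys: the field names listed in this README.
    run_id = ti.xcom_pull(task_ids=task_id, key="run_id")
    run_status = ti.xcom_pull(task_ids=task_id, key="run_status")
    run_location = ti.xcom_pull(task_ids=task_id, key="run_location")
    if run_status == "Failed":
        # Fail this task too, pointing at where the run status can be inspected.
        raise RuntimeError(f"Fabric run {run_id} failed; status location: {run_location}")
    return {"run_id": run_id, "run_status": run_status, "run_location": run_location}
```

Inside a DAG it could be wired as `summarize = PythonOperator(task_id="summarize_fabric_run", python_callable=summarize_fabric_run)` followed by `run_notebook >> summarize`; Airflow passes `ti` because the callable declares it.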
Ready to give it a spin? Check out the sample DAG code below:
```python
from __future__ import annotations

import pendulum

from airflow import DAG
from apache_airflow_microsoft_fabric_plugin.operators.fabric import FabricRunItemOperator

default_args = {
    "owner": "airflow",
    # Replaces the deprecated airflow.utils.dates.days_ago(1): midnight UTC yesterday.
    "start_date": pendulum.today("UTC").add(days=-1),
}

with DAG(
    dag_id="fabric_items_dag",
    default_args=default_args,
    schedule="@daily",  # `schedule_interval` is deprecated since Airflow 2.4
    catchup=False,
) as dag:
    # Trigger a Fabric notebook run and wait (deferred) until it finishes.
    run_notebook = FabricRunItemOperator(
        task_id="run_fabric_notebook",
        workspace_id="<workspace_id>",
        item_id="<item_id>",
        fabric_conn_id="fabric_conn_id",
        job_type="RunNotebook",
        wait_for_termination=True,
        deferrable=True,
    )

    run_notebook
```
Feel free to tweak and tailor this DAG to suit your needs!
We welcome any contributions:
- Report all enhancements, bugs, and tasks as GitHub issues.
- Provide fixes or enhancements by opening pull requests in GitHub.