Query
This Hive query is a simplified version of generate_wikipedia_preview_stats_daily.hql.
--
-- Populates the wikipedia_preview_stats table daily,
-- filtering webrequest data and aggregating it into interesting dimensions.
--
-- Parameters:
-- source_table -- Fully qualified table name of the source webrequest data.
-- destination_table -- Fully qualified table name for the wikipedia preview stats.
-- year -- Year of the partition to process.
-- month -- Month of the partition to process.
-- day -- Day of the partition to process.
--
-- Usage:
-- spark3-sql -f generate_wikipedia_preview_stats_daily.hql \
-- -d source_table=wmf.webrequest \
-- -d destination_table=wmf_product.wikipedia_preview_stats \
-- -d year=2023 \
-- -d month=5 \
-- -d day=21
--
INSERT OVERWRITE TABLE ${destination_table}
PARTITION(year=${year}, month=${month}, day=${day})
SELECT /*+ COALESCE(1) */
SUM(CAST(is_pageview AS INT)) AS pageviews
FROM
${source_table}
WHERE
x_analytics_map['wprov'] REGEXP '^wppw(\\d+)(t?)$'
AND agent_type = 'user'
AND webrequest_source = 'text'
AND year = ${year}
AND month = ${month}
AND day = ${day}
;
Table-populating queries like this one should always be accompanied by table-creating queries, for example create_wikipedia_preview_stats_table.hql. Note that both table names are parameterized as source_table and destination_table.
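For illustration, here is a minimal sketch of what such a table-creating query could look like. The column definitions, comments, and storage format below are assumptions, not the actual contents of create_wikipedia_preview_stats_table.hql.
-- Sketch only: column types, comments, and storage format are assumptions.
CREATE TABLE IF NOT EXISTS ${destination_table} (
    pageviews BIGINT COMMENT 'Number of pageviews attributed to Wikipedia Preview'
)
PARTITIONED BY (
    year  INT COMMENT 'Unpadded year of the request',
    month INT COMMENT 'Unpadded month of the request',
    day   INT COMMENT 'Unpadded day of the request'
)
STORED AS PARQUET
;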
Data Dependencies
The file datasets.yaml
specifies all the source tables used by the queries to populate destination tables.
hive_wmf_webrequest_text:
  datastore: hive
  table_name: wmf.webrequest
  partitioning: "@hourly"
  pre_partitions: ["webrequest_source=text"]
For more examples refer to Data Engineering's datasets.yaml.
The datastore field can be druid, hive, or hive_snapshot (for tables with "YYYY-MM"-formatted partitions). For example:
hive_wmf_mediawiki_history:
  datastore: hive_snapshot
  table_name: wmf.mediawiki_history
  partitioning: "@monthly"
When specifying partitioning or a DAG schedule, prefer the cron presets (e.g. @daily instead of 0 0 * * *). For weekly DAGs, note that the preset @weekly starts the DAGs on Sunday. If you want to start your DAG on Monday, use 0 0 * * 1. If possible, all your weekly DAGs should start on the same day (either Sunday or Monday).
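As a sketch of the Monday case (the DAG id and property values below are hypothetical), a weekly DAG would pass the explicit cron expression to the create_easy_dag() helper described later on this page:
from datetime import datetime, timedelta

from analytics_product.config.dag_config import alerts_email, create_easy_dag

with create_easy_dag(
    dag_id="example_stats_weekly",      # hypothetical DAG id
    doc_md="Example weekly job.",
    start_date=datetime(2023, 5, 29),   # a Monday
    schedule="0 0 * * 1",               # Monday at 00:00, instead of "@weekly" (Sunday)
    sla=timedelta(hours=30),            # hypothetical SLA
    email=alerts_email,
) as dag:
    ...                                 # tasks would go here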
Job Specification
The job is specified as an Airflow DAG (Directed Acyclic Graph) in Python, and it consists of multiple sections.
The examples below are from wikipedia_preview_stats_daily_dag.py. DAG filenames should be the same as their DAG IDs, plus a _dag.py suffix.
Preamble
This is where you document details about the job that will be displayed in the Airflow Web UI. You can use basic Markdown syntax.
"""
Computes wikipedia_preview_stats data daily.
* Waits for the corresponding webrequest partitions to be available.
* Populates the destination partition using a SparkSQL query file.
"""
from analytics_product.config.dag_config import alerts_email, dataset, create_easy_dag
from datetime import datetime, timedelta
from wmf_airflow_common.config.dag_properties import DagProperties
from wmf_airflow_common.operators.spark import SparkSqlOperator
The imports will be the same across the DAGs. You can find more information about the wmf_airflow_common modules here.
Properties
The DagProperties class helps to override DAG properties dynamically, altering them without changing the DAG file in version control. To achieve that, it uses Airflow Variables, which can be populated from Airflow's UI or the CLI, and accessed from Airflow code (i.e. within a DAG file).
props = DagProperties(
    start_date=datetime(2023, 5, 28),
    hql_uri=(
        "https://gitlab.wikimedia.org/repos/product-analytics/data-pipelines"
        "/-/raw/4c3e9fd235ad44b50839c2d8781f896cd73c4d0b"
        "/wikipedia_preview/generate_wikipedia_preview_stats_daily.hql"
    ),
    webrequest_table="wmf.webrequest",
    wikipedia_preview_stats_table="wmf_product.wikipedia_preview_stats",
    sla=timedelta(hours=6),
    alerts_email=alerts_email
)
Note that alerts_email comes from dag_config.py.
DAG
The create_easy_dag() (source) helper function makes it much simpler to pass parameters and configure a DAG.
with create_easy_dag(
    dag_id="wikipedia_preview_stats_daily",
    doc_md=__doc__,
    start_date=props.start_date,
    schedule="@daily",
    sla=props.sla,
    email=props.alerts_email
) as dag:

    sensor = dataset("hive_wmf_webrequest_text").get_sensor_for(dag)

    compute = SparkSqlOperator(
        task_id="compute_wikipedia_preview_stats",
        sql=props.hql_uri,
        query_parameters={
            "source_table": props.webrequest_table,
            "destination_table": props.wikipedia_preview_stats_table,
            "year": "{{data_interval_start.year}}",
            "month": "{{data_interval_start.month}}",
            "day": "{{data_interval_start.day}}"
        },
        driver_cores=2,
        driver_memory="4G",
        executor_cores=4,
        executor_memory="8G",
        conf={
            "spark.dynamicAllocation.maxExecutors": 16,
            "spark.yarn.executor.memoryOverhead": 2048
        }
    )

    sensor >> compute
It's possible to add more tasks to the DAG.
For example, suppose you have an ETL that aggregates event data from instrumentation, and then a Presto query in Superset that generates a virtual dataset.
An unfortunate quirk of virtual datasets in Superset is that the results aren't shared between the charts using that data: each chart runs an identical query independently of the others.
In some cases you may end up with many or all of your charts timing out. What you can do there is offload the computations to another ETL and build a chain.
The task chain might look like sensor >> aggregate_low_level >> aggregate_high_level >> ingest_aggregates_into_druid.
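As a rough sketch of what that could look like inside the with create_easy_dag(...) block (every task id, property, and table name below is hypothetical, and the final Druid ingestion task is omitted):
    aggregate_low_level = SparkSqlOperator(
        task_id="aggregate_low_level",
        sql=props.low_level_hql_uri,                  # hypothetical property
        query_parameters={
            "source_table": props.event_table,        # hypothetical property
            "destination_table": props.low_level_table,
            "year": "{{data_interval_start.year}}",
            "month": "{{data_interval_start.month}}",
            "day": "{{data_interval_start.day}}"
        }
    )

    aggregate_high_level = SparkSqlOperator(
        task_id="aggregate_high_level",
        sql=props.high_level_hql_uri,                 # hypothetical property
        query_parameters={
            "source_table": props.low_level_table,    # reads the previous step's output
            "destination_table": props.high_level_table,
            "year": "{{data_interval_start.year}}",
            "month": "{{data_interval_start.month}}",
            "day": "{{data_interval_start.day}}"
        }
    )

    # Chain the tasks; the Druid ingestion task would be appended with another >>.
    sensor >> aggregate_low_level >> aggregate_high_level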
The DAG identifier needs to use snake_case
and needs to be unique across the entire Airflow instance.
Some ideas of what a DAG id could contain: the name of the resulting dataset, the granularity of the job (if there are several jobs with different granularities for the same dataset), and the tool that the DAG uses to process or store the resulting dataset (e.g. druid or anomaly_detection). Avoid very generic words like "data", "table" or "dag", which don't give much information.
The task identifier needs to use snake_case
and needs to be unique within the DAG.
Task IDs should describe actions (e.g. "process_certain_dataset"), if possible starting with a verb followed by an object. It is a good idea to be explicit about which data is the object of the action.
The wmfdata.spark module has some Spark session configurations you can reference (yarn-regular and yarn-large).
Until the sensor confirms the presence of its target, the subsequent tasks (compute) in the DAG aren't triggered.
Unit Test
Unlike the other files (datasets.yaml and the DAGs), which are under the analytics_product sub-directory in repos/data-engineering/airflow-dags, the unit tests for the jobs are under tests/analytics_product.
The example below is from wikipedia_preview_stats_daily_dag_test.py. The test checks that the DAG parses without errors and then checks how many tasks are defined.
import pytest


@pytest.fixture(name='dag_path')
def fixture_dagpath():
    return ['analytics_product', 'dags', 'wikipedia_preview',
            'wikipedia_preview_stats_daily_dag.py']


def test_wikipedia_preview_stats_daily_dag(dagbag):
    assert dagbag.import_errors == {}
    dag = dagbag.get_dag(dag_id='wikipedia_preview_stats_daily')
    assert dag is not None
    assert len(dag.tasks) == 2
The two tasks are sensor and compute.
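If you want a slightly stricter check, you could also assert on task ids. The sketch below is an additional test, not part of the actual test file, and reuses the dag_path fixture above:
def test_wikipedia_preview_stats_daily_dag_task_ids(dagbag):
    # Sketch only: verify the compute task defined in the DAG is present by id.
    dag = dagbag.get_dag(dag_id='wikipedia_preview_stats_daily')
    task_ids = {task.task_id for task in dag.tasks}
    assert 'compute_wikipedia_preview_stats' in task_ids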