Guide to productionizing a Hive query

Data Engineering & Product Analytics

This is a guide to setting up an ETL job (a data pipeline) based on a Hive query. The queries to be executed live in repos/product-analytics/data-pipelines; the jobs executing those queries live in repos/data-engineering/airflow-dags. Deployment instructions are documented on Wikitech.

The work to enable Jupyter notebook-based pipelines is in progress.

Some of the text in this guide comes from the Airflow developer guide on Wikitech, available under the Creative Commons Attribution-ShareAlike License like the rest of the text in this guide.

Table of Contents

  1. Query
  2. Data Dependencies
  3. Job Specification
    1. Preamble
    2. Properties
    3. DAG
  4. Unit Test

Query

This Hive query is a simplified version of generate_wikipedia_preview_stats_daily.hql.


--
-- Populates the wikipedia_preview_stats table daily,
-- filtering webrequest data and aggregating it into interesting dimensions.
--
-- Parameters:
--     source_table         -- Fully qualified table name of the source webrequest data.
--     destination_table    -- Fully qualified table name for the wikipedia preview stats.
--     year                 -- Year of the partition to process.
--     month                -- Month of the partition to process.
--     day                  -- Day of the partition to process.
--
-- Usage:
--     spark3-sql -f generate_wikipedia_preview_stats_daily.hql \
--         -d source_table=wmf.webrequest \
--         -d destination_table=wmf_product.wikipedia_preview_stats \
--         -d year=2023 \
--         -d month=5 \
--         -d day=21
--

INSERT OVERWRITE TABLE ${destination_table}
    PARTITION(year=${year}, month=${month}, day=${day})
SELECT /*+ COALESCE(1) */
    SUM(CAST(is_pageview AS INT)) AS pageviews
FROM
    ${source_table}
WHERE
    x_analytics_map['wprov'] REGEXP '^wppw(\\d+)(t?)$'
    AND agent_type = 'user'
    AND webrequest_source = 'text'
    AND year = ${year}
    AND month = ${month}
    AND day = ${day}
;
          

Table-populating queries like this one should always be accompanied by table-creating queries, for example create_wikipedia_preview_stats_table.hql.

Data Dependencies

The file datasets.yaml specifies all the source tables used by the queries to populate destination tables.


hive_wmf_webrequest_text:
  datastore: hive
  table_name: wmf.webrequest
  partitioning: "@hourly"
  pre_partitions: ["webrequest_source=text"]
          

For more examples refer to Data Engineering's datasets.yaml.
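
Each entry's key is what a DAG uses to obtain a sensor for that dataset. Below is a minimal sketch of that connection, assuming the dataset() helper and the dag object introduced in the DAG section further down:

from analytics_product.config.dag_config import dataset

# The key defined in datasets.yaml identifies the dataset; get_sensor_for()
# returns an Airflow sensor that waits for the relevant partitions of
# wmf.webrequest (pre-partitioned on webrequest_source=text) to land.
sensor = dataset("hive_wmf_webrequest_text").get_sensor_for(dag)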

Job Specification

The job is specified as an Airflow DAG (Directed Acyclic Graph) written in Python, and it consists of multiple sections. The examples below are from wikipedia_preview_stats_daily_dag.py. DAG filenames should match their DAG IDs, with an _dag.py suffix appended.

Preamble

This is where you document details about the job; they will be displayed in the Airflow web UI. You can use basic Markdown syntax.


"""
Computes wikipedia_preview_stats data daily.

* Waits for the corresponding webrequest partitions to be available.
* Populates the destination partition using a SparkSQL query file.
"""

from analytics_product.config.dag_config import alerts_email, dataset, create_easy_dag
from datetime import datetime, timedelta
from wmf_airflow_common.config.dag_properties import DagProperties
from wmf_airflow_common.operators.spark import SparkSqlOperator
          

The imports will be the same across the DAGs.

Properties

The DagProperties class helps override DAG properties dynamically, altering them without changing the DAG file in version control. To achieve that, it uses Airflow Variables, which can be populated from Airflow's UI or from the CLI, and can be accessed from Airflow code (i.e. within a DAG file).


props = DagProperties(
  start_date=datetime(2023, 5, 28),
  hql_uri=(
      "https://gitlab.wikimedia.org/repos/product-analytics/data-pipelines"
      "/-/raw/4c3e9fd235ad44b50839c2d8781f896cd73c4d0b"
      "/wikipedia_preview/generate_wikipedia_preview_stats_daily.hql"
  ),
  webrequest_table="wmf.webrequest",
  wikipedia_preview_stats_table="wmf_product.wikipedia_preview_stats",
  sla=timedelta(hours=6),
  alerts_email=alerts_email
)
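
For instance, the sla defined above could be overridden without editing the DAG file by setting an Airflow Variable. The sketch below uses Airflow's generic Variable API; the Variable name and the JSON shape that DagProperties actually looks up are assumptions here, since they are determined by wmf_airflow_common.

import json

from airflow.models import Variable

# Hypothetical override: the Variable name "wikipedia_preview_stats_daily_config"
# and the JSON shape (including how a timedelta is encoded) are assumptions
# for illustration only. Setting the Variable can also be done from the UI or CLI.
Variable.set(
    "wikipedia_preview_stats_daily_config",
    json.dumps({"sla": "8:00:00"})
)

# Reading the override back, which is roughly what a properties helper would do.
overrides = json.loads(
    Variable.get("wikipedia_preview_stats_daily_config", default_var="{}")
)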
          

DAG

The create_easy_dag() (source) helper function simplifies passing parameters and configuring a DAG.


with create_easy_dag(
  dag_id="wikipedia_preview_stats_daily",
  doc_md=__doc__,
  start_date=props.start_date,
  schedule="@daily",
  sla=props.sla,
  email=props.alerts_email
) as dag:

  sensor = dataset("hive_wmf_webrequest_text").get_sensor_for(dag)

  compute = SparkSqlOperator(
    task_id="compute_wikipedia_preview_stats",
    sql=props.hql_uri,
    query_parameters={
        "source_table": props.webrequest_table,
        "destination_table": props.wikipedia_preview_stats_table,
        "year": "{{data_interval_start.year}}",
        "month": "{{data_interval_start.month}}",
        "day": "{{data_interval_start.day}}"
    },
    driver_cores=2,
    driver_memory="4G",
    executor_cores=4,
    executor_memory="8G",
    conf={
        "spark.dynamicAllocation.maxExecutors": 16,
        "spark.yarn.executor.memoryOverhead": 2048
    }
  )

  sensor >> compute
          

It's possible to add more tasks to the DAG. For example, suppose you have an ETL that aggregates event data from instrumentation, and a Presto query in Superset that generates a virtual dataset from the result. An unfortunate quirk of virtual datasets in Superset is that results aren't shared between the different charts using that data: each chart runs an identical query independently of the other charts, so in some cases many or all of your charts may time out. In that situation you can offload the computation to another ETL task and build a chain, which might look like sensor >> aggregate_low_level >> aggregate_high_level >> ingest_aggregates_into_druid, as sketched below.
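
Here is a hypothetical sketch of such a chain, reusing the helpers and the props pattern from the DAG above. The DAG ID, dataset key, HQL properties, and query parameters are invented for illustration, and the Druid ingestion task is left out because its operator depends on the repository's helpers.

with create_easy_dag(
  dag_id="example_event_aggregation_daily",  # hypothetical DAG ID
  doc_md=__doc__,
  start_date=props.start_date,
  schedule="@daily",
  sla=props.sla,
  email=props.alerts_email
) as dag:

  sensor = dataset("hive_event_example").get_sensor_for(dag)  # assumed datasets.yaml key

  aggregate_low_level = SparkSqlOperator(
    task_id="aggregate_low_level",
    sql=props.low_level_hql_uri,  # assumed property pointing at an HQL file
    query_parameters={
        "year": "{{data_interval_start.year}}",
        "month": "{{data_interval_start.month}}",
        "day": "{{data_interval_start.day}}"
    }
  )

  aggregate_high_level = SparkSqlOperator(
    task_id="aggregate_high_level",
    sql=props.high_level_hql_uri,  # assumed property pointing at an HQL file
    query_parameters={
        "year": "{{data_interval_start.year}}",
        "month": "{{data_interval_start.month}}",
        "day": "{{data_interval_start.day}}"
    }
  )

  # The Druid ingestion task is omitted here; it would use whichever ingestion
  # operator the repository provides for loading aggregates into Druid.
  sensor >> aggregate_low_level >> aggregate_high_level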

Unit Test

Unlike the other files (datasets.yaml and the DAGs), which are under the analytics_product sub-directory in repos/data-engineering/airflow-dags, the unit tests for the jobs are under tests/analytics_product. The example below is from wikipedia_preview_stats_daily_dag_test.py. The test checks that the DAG parses without errors and then checks how many tasks it defines.


import pytest

@pytest.fixture(name='dag_path')
def fixture_dagpath():
    return ['analytics_product', 'dags', 'wikipedia_preview',
        'wikipedia_preview_stats_daily_dag.py']

def test_wikipedia_preview_stats_daily_dag(dagbag):
    assert dagbag.import_errors == {}
    dag = dagbag.get_dag(dag_id='wikipedia_preview_stats_daily')
    assert dag is not None
    assert len(dag.tasks) == 2
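
The dagbag fixture is provided by the test suite's shared configuration. Below is a minimal sketch of what such a conftest.py fixture could look like, assuming the repository root is the working directory; the real fixture in airflow-dags may differ.

import os

import pytest
from airflow.models import DagBag

@pytest.fixture(name='dagbag')
def fixture_dagbag(dag_path):
    # Join the path components returned by each test's dag_path fixture and
    # parse only that DAG file, skipping Airflow's bundled example DAGs.
    return DagBag(dag_folder=os.path.join(*dag_path), include_examples=False)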