from pyspark.sql import functions as F, types as T
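# Per-wiki counts of identities that issued full-text searches against
# commonswiki, split by API vs. web UI, over the first week of August 2020.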
# Build a predicate matching days 1-7 (the first week of the month).
dates = F.lit(False)
for day in range(1, 8):
    dates |= F.col('day') == day
PARTITION_COND = (
    (F.col('year') == 2020)
    & (F.col('month') == 8)
    & dates
)
df = (
    spark.read.table('event.mediawiki_cirrussearch_request')
    .where(PARTITION_COND)
    # Full-text-only searches default to query_type "full_text", so this
    # filter excludes some keyword usage. elasticsearch_requests.syntax
    # could be used instead.
    .where(F.expr('array_contains(elasticsearch_requests.query_type, "full_text")'))
    .select('database', 'identity', 'source',
            F.explode('elasticsearch_requests').alias('req'))
    .where(F.col('req.query_type') == 'full_text')
    .withColumn('queried_commonswiki',
                F.array_contains(F.col('req.indices'), 'commonswiki')
                | F.array_contains(F.col('req.indices'), 'commonswiki_file'))
    .withColumn('is_api_search',
                F.col('queried_commonswiki') & (F.col('source') == 'api'))
    .withColumn('is_web_search',
                F.col('queried_commonswiki') & (F.col('source') == 'web'))
)
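# At this point df holds one row per full-text request, with booleans
# marking whether it hit commonswiki via the API or the web UI.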
def sum_bool(col_name):
    """Count the rows where the named boolean column is true."""
    return F.sum(F.col(col_name).cast(T.IntegerType())).alias(col_name)
df_stats_by_wiki = (
    df
    # Collapse to per-identity booleans indicating whether that identity
    # performed each kind of search.
    .groupBy('database', 'identity')
    .agg(
        F.max(F.col('is_api_search')).alias('used_api_search'),
        F.max(F.col('is_web_search')).alias('used_web_search')
    )
    # Count identities per class per wiki.
    .groupBy('database')
    .agg(
        F.sum(F.lit(1)).alias('total_identities'),
        sum_bool('used_api_search'),
        sum_bool('used_web_search')
    )
)
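# The aggregated result has one row per wiki, so it is cheap to pull
# into pandas for inspection.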
stats = df_stats_by_wiki.toPandas()
# Top 20 wikis by each class of search usage.
stats.sort_values('used_api_search', ascending=False).iloc[:20]
stats.sort_values('used_web_search', ascending=False).iloc[:20]