from pyspark.sql import functions as F, types as T, Window, Row
import datetime
import calendar
# Rebuild an event timestamp string ("year-month-day hour:00:00") from the
# table's partition columns and convert it to a unix timestamp (seconds).
# NOTE(review): unix_timestamp's default pattern expects zero-padded fields;
# presumably the partition columns are formatted accordingly — confirm.
_ts_parts = [
    F.col('year'), F.lit('-'),
    F.col('month'), F.lit('-'),
    F.col('day'), F.lit(' '),
    F.col('hour'), F.lit(':00:00'),
]
row_timestamp = F.unix_timestamp(F.concat(*_ts_parts))
# Experiment identifiers: the two A/B buckets plus the two error buckets
# ('mismatch', 'invalid') that flag broken sessions.
test_name = "T266027_perfield_builder"
subtests = [
    'T266027_perfield_builder:perfield',
    'T266027_perfield_builder:control',
    'mismatch',
    'invalid',
]

# Analysis window: 2020-12-09 00:00 to 2020-12-16 00:00, converted to
# unix timestamps interpreting the naive datetimes as UTC (timegm).
date_start = datetime.datetime(2020, 12, 9, 0)
date_end = datetime.datetime(2020, 12, 16, 0)
ts_start = calendar.timegm(date_start.timetuple())
ts_end = calendar.timegm(date_end.timetuple())

# Wikis included in the analysis.
wikis = [
    'bowiki', 'dzwiki', 'ganwiki', 'jawiki', 'kmwiki', 'lowiki', 'mywiki',
    'thwiki', 'wuuwiki', 'zhwiki', 'zh_classicalwiki', 'zh_yuewiki',
    'bugwiki', 'cdowiki', 'crwiki', 'hakwiki', 'jvwiki',
    'zh_min_nanwiki',
]
# All SearchSatisfaction events for the experiment wikis/subtests within the
# analysis window. Cached because it is re-scanned by several reports below.
satis = (spark.read.table('event.searchsatisfaction')
    .where(row_timestamp >= ts_start)
    .where(row_timestamp <= ts_end)
    .where(F.col('event.subtest').isin(subtests))
    # Column.isin() accepts plain python values; wrapping each wiki in
    # F.lit() (and mixing .filter with .where) was needless noise — this now
    # matches the subtest filter above.
    .where(F.col('wiki').isin(wikis))
    .cache())
# Total number of events per wiki, with a grand-total row added.
number_of_events_per_wiki = satis.groupBy('wiki').count().orderBy('wiki').toPandas().set_index('wiki')
# BUG FIX: DataFrame.append() returns a *new* frame and its result was being
# discarded here (and .append() was removed entirely in pandas 2.0).
# Add the total row in place instead, then evaluate the frame so a notebook
# still displays it.
number_of_events_per_wiki.loc['total'] = number_of_events_per_wiki.sum()
number_of_events_per_wiki
# Number of events per wiki / subtest / event source / event action.
_per_type = satis.groupBy('wiki', 'event.subtest', 'event.source', 'event.action').count()
_per_type.orderBy('wiki', 'event.subtest', 'source', 'action').show(10, False)
# Session ids that have at least one event in the 'mismatch' or 'invalid'
# subtest; these sessions are flagged (and later excluded) downstream.
invalid_sessions = (satis
    # Column.isin() takes plain values directly; no need to wrap each one in
    # F.lit() — this matches how subtests are filtered elsewhere in the file.
    .where(F.col('event.subTest').isin(['mismatch', 'invalid']))
    .select(F.col('wiki').alias('inva_wiki'), F.col('event.searchSessionId').alias('inva_searchSessionId'))
    .distinct())
# Aggregate per session.
# Join every event onto the invalid-session list so each row can be flagged,
# then build one row per (searchSessionId, wiki, subtest) whose pivoted
# columns count events for each observed "<source>_<action>" pair
# (e.g. fulltext_searchResultPage, fulltext_click).
# NOTE(review): how="full" keeps unmatched rows from both sides, but rows
# existing only in invalid_sessions have a null event.subtest and are dropped
# by the .where() below — so this effectively behaves like a left join.
session_stats = (satis.join(invalid_sessions,
(satis.wiki == invalid_sessions.inva_wiki) & (F.col("event.searchSessionId") == F.col("inva_searchSessionId")),
how="full")
.withColumn('source_action', F.concat(F.col('event.source'), F.lit('_'), F.col('event.action')))
# Flag: this session also appeared in the 'mismatch'/'invalid' subtests.
.withColumn('invalid_session', F.col('inva_searchSessionId').isNotNull())
# Keep only the two real experiment buckets.
.where(
F.col('event.subtest').isin(
[F.lit('T266027_perfield_builder:control'),
F.lit('T266027_perfield_builder:perfield')]))
.groupBy('event.searchSessionId', 'wiki', 'event.subtest', 'invalid_session')
.pivot('source_action')
.count()
.fillna(0)
# Strip the 'T266027_perfield_builder:' prefix, leaving 'control'/'perfield'.
.withColumn('subtest', F.col('subtest').substr(len('T266027_perfield_builder:') + 1, 100))
.cache())
# Invalid sessions per wiki: pivot the boolean invalid_session flag into
# 'true'/'false' session-count columns, then report the invalid share.
_per_wiki = (session_stats
             .groupBy('wiki')
             .pivot('invalid_session')
             .count()
             .fillna(0))
_per_wiki = _per_wiki.withColumn('total_sessions', F.col('false') + F.col('true'))
_per_wiki = _per_wiki.withColumn('invalid_sessions', F.col('true'))
_per_wiki = _per_wiki.withColumn(
    'pct_invalid',
    F.round((F.col('invalid_sessions') / F.col('total_sessions')) * F.lit(100), 2))
(_per_wiki
 .orderBy('wiki')
 .drop('true', 'false')
 .toPandas()
 .set_index('wiki')
)
# Number of successful fulltext search sessions per wiki and subtest.
# Successful = at least one fulltext search and at least one click;
# abandoned = at least one fulltext search and no click at all.
# Sessions flagged as invalid are excluded.
# NOTE: the misspelled aliases below ('abandonned', 'successfull',
# 'pct_successull') are kept as-is — they are internal column names that must
# stay consistent with the later references in this same expression.
(session_stats
.filter(F.col('invalid_session') == F.lit(False))
.groupBy('wiki', 'subtest')
.agg(
F.count(F.when((F.col('fulltext_searchResultPage') > 0) & (F.col('fulltext_click') == 0), True)).alias('abandonned'),
F.count(F.when((F.col('fulltext_searchResultPage') > 0) & (F.col('fulltext_click') > 0), True)).alias('successfull')
)
# Share of successful sessions among sessions that ran a fulltext search.
.withColumn('pct_successull', F.round(F.lit(100) * F.col('successfull') / (F.col('successfull') + F.col('abandonned')), 2))
# One row per wiki with <subtest>_pct_successfull / <subtest>_num_sessions.
.groupBy('wiki')
.pivot('subtest')
.agg(F.sum('pct_successull').alias('pct_successfull'),
F.sum(F.col('successfull') + F.col('abandonned')).alias('num_sessions'))
# Which bucket had the higher success rate ('unknown' when one side is missing).
.withColumn('best',
F.when((F.col('control_pct_successfull').isNull()) | (F.col('perfield_pct_successfull').isNull()), F.lit('unknown'))
.when(F.col('control_pct_successfull') > F.col('perfield_pct_successfull'), F.lit('control'))
.otherwise(F.lit('perfield')))
# perfield minus control success rate, in percentage points.
.withColumn('delta',
F.when((F.col('control_pct_successfull').isNull()) | (F.col('perfield_pct_successfull').isNull()), F.lit('?'))
.otherwise(F.round(F.col('perfield_pct_successfull') - F.col('control_pct_successfull'), 2)))
.orderBy('wiki')
.toPandas()
.set_index('wiki')
)
# Calculate ZRR (zero-results rate) per subtest: the percentage of fulltext
# searchResultPage events that returned zero hits.
# NOTE: invalid sessions are NOT removed here, unlike the session stats above.
from pyspark.sql.types import IntegerType
(satis
    # Keep only the two experiment buckets; isin() takes plain strings, so
    # the F.lit() wrappers used previously were unnecessary.
    .where(F.col('event.subtest').isin(
        ['T266027_perfield_builder:control', 'T266027_perfield_builder:perfield']))
    .where((F.col('event.source') == F.lit('fulltext')) & (F.col('event.action') == F.lit('searchResultPage')))
    # Strip the test-name prefix, leaving 'control' / 'perfield'.
    .withColumn('subtest', F.col('event.subTest').substr(len('T266027_perfield_builder:') + 1, 100))
    .groupBy('wiki', 'subtest')
    .agg(
        # hitsReturned can be null; treat null as zero results.
        F.count(F.when(F.coalesce(F.col('event.hitsReturned'), F.lit(0)) <= F.lit(0), True)).alias('zero_results'),
        F.count(F.lit(1)).alias('total'))
    .withColumn('zrr', F.round(F.lit(100) * F.col('zero_results') / F.col('total'), 2))
    # One row per wiki with <subtest>_zrr / <subtest>_total_searches columns.
    .groupBy('wiki')
    .pivot('subtest')
    .agg(
        F.sum(F.col('zrr')).alias('zrr'),
        F.sum(F.col('total')).cast(IntegerType()).alias('total_searches'))
    # Positive delta means perfield produced fewer zero-result searches.
    .withColumn('delta', F.round(F.col('control_zrr') - F.col('perfield_zrr'), 2))
    .orderBy('wiki')
    .toPandas()
    .set_index('wiki'))