In [243]:
from pyspark.sql import functions as F, types as T, Window, Row
import datetime
import calendar

# Event timestamp reconstructed from the year/month/day/hour partition columns.
row_timestamp = F.unix_timestamp(F.concat(
    F.col('year'), F.lit('-'), F.col('month'), F.lit('-'), F.col('day'),
    F.lit(' '), F.col('hour'), F.lit(':00:00')))

# A/B test identifiers: the two real buckets plus the two sanity buckets
# ('mismatch' / 'invalid') used to detect broken sessions.
test_name = "T266027_perfield_builder"
subtests = ['T266027_perfield_builder:perfield', 'T266027_perfield_builder:control', 'mismatch', 'invalid']

# Analysis window.
date_start = datetime.datetime(2020, 12, 9, 0)
date_end = datetime.datetime(2020, 12, 16, 0)

ts_start = calendar.timegm(date_start.timetuple())
ts_end = calendar.timegm(date_end.timetuple())

# Wikis enrolled in the test.
wikis = [
    'bowiki', 'dzwiki', 'ganwiki', 'jawiki', 'kmwiki', 'lowiki', 'mywiki',
    'thwiki', 'wuuwiki', 'zhwiki', 'zh_classicalwiki', 'zh_yuewiki',
    'bugwiki', 'cdowiki', 'crwiki', 'hakwiki', 'jvwiki',
    'zh_min_nanwiki',
]

# All SearchSatisfaction events in the window, restricted to the test wikis
# and subtests; cached because every later cell re-reads this frame.
satis = (spark.read.table('event.searchsatisfaction')
         .where(row_timestamp >= ts_start)
         .where(row_timestamp <= ts_end)
         .where(F.col('event.subtest').isin(subtests))
         # isin() accepts plain Python values; wrapping each in F.lit() was redundant.
         .where(F.col('wiki').isin(wikis))
         .cache())
In [250]:
# total number of events, per wiki plus an overall total row
import pandas as pd

number_of_events_per_wiki = satis.groupBy('wiki').count().orderBy('wiki').toPandas().set_index('wiki')
# DataFrame.append() was removed in pandas 2.0; pd.concat is the supported way
# to tack the column-wise totals onto the per-wiki table as a 'total' row.
pd.concat([number_of_events_per_wiki,
           number_of_events_per_wiki.sum().rename("total").to_frame().T])
Out[250]:
count
wiki
bowiki 561
bugwiki 31
cdowiki 157
crwiki 59
dzwiki 17
ganwiki 107
hakwiki 92
jawiki 3507495
jvwiki 1851
kmwiki 9652
lowiki 504
mywiki 37074
thwiki 136768
wuuwiki 1079
zh_classicalwiki 1107
zh_min_nanwiki 605
zh_yuewiki 8983
zhwiki 2340542
total 6046684
In [251]:
# number of events per group and type of source/action
(satis
 .groupBy('wiki', 'event.subtest', 'event.source', 'event.action')
 # After groupBy the output columns are named 'subtest', 'source', 'action',
 # so order by those names consistently (the original mixed 'event.subtest' in).
 .count()
 .orderBy('wiki', 'subtest', 'source', 'action')
 .show(10, False)
)
+------+---------------------------------+------------+----------------+-----+
|wiki  |subtest                          |source      |action          |count|
+------+---------------------------------+------------+----------------+-----+
|bowiki|T266027_perfield_builder:control |autocomplete|checkin         |1    |
|bowiki|T266027_perfield_builder:control |autocomplete|click           |20   |
|bowiki|T266027_perfield_builder:control |autocomplete|searchResultPage|100  |
|bowiki|T266027_perfield_builder:control |autocomplete|visitPage       |5    |
|bowiki|T266027_perfield_builder:control |fulltext    |checkin         |15   |
|bowiki|T266027_perfield_builder:control |fulltext    |click           |8    |
|bowiki|T266027_perfield_builder:control |fulltext    |searchResultPage|20   |
|bowiki|T266027_perfield_builder:control |fulltext    |visitPage       |8    |
|bowiki|T266027_perfield_builder:perfield|autocomplete|checkin         |2    |
|bowiki|T266027_perfield_builder:perfield|autocomplete|click           |28   |
+------+---------------------------------+------------+----------------+-----+
only showing top 10 rows

In [252]:
# Extract the session ids that have at least one event in the 'mismatch' or 'invalid' subtest.
# Columns are renamed with an 'inva_' prefix so the later join back onto satis is unambiguous.
invalid_sessions = (satis
                    # isin() takes plain values directly; no F.lit() wrapping needed.
                    .where(F.col('event.subtest').isin(['mismatch', 'invalid']))
                    .select(F.col('wiki').alias('inva_wiki'), F.col('event.searchSessionId').alias('inva_searchSessionId'))
                    .distinct())
In [253]:
# Aggregate per session: one row per (searchSessionId, wiki, subtest) with a
# count of events per source_action kind, plus a flag marking sessions that
# also emitted 'mismatch'/'invalid' events.
session_stats = (satis
 # Left join flags satis rows that appear in the invalid-session list.
 # (invalid_sessions is derived from satis, so a full outer join would only
 # add all-null rows that the subtest filter below drops anyway.)
 .join(invalid_sessions,
       (satis.wiki == invalid_sessions.inva_wiki)
       & (F.col("event.searchSessionId") == F.col("inva_searchSessionId")),
       how="left")
 .withColumn('source_action', F.concat(F.col('event.source'), F.lit('_'), F.col('event.action')))
 .withColumn('invalid_session', F.col('inva_searchSessionId').isNotNull())
 # Keep only the two real test buckets.
 .where(F.col('event.subtest').isin(
     ['T266027_perfield_builder:control', 'T266027_perfield_builder:perfield']))
 .groupBy('event.searchSessionId', 'wiki', 'event.subtest', 'invalid_session')
 .pivot('source_action')
 .count()
 .fillna(0)
 # Strip the 'T266027_perfield_builder:' prefix, leaving 'control' / 'perfield'.
 .withColumn('subtest', F.col('subtest').substr(len('T266027_perfield_builder:') + 1, 100))
 .cache())
In [254]:
# Invalid sessions per wiki: pivot the boolean invalid_session flag into
# 'true'/'false' count columns, then derive totals and a percentage.
sessions_by_validity = (session_stats
                        .groupBy('wiki')
                        .pivot('invalid_session')
                        .count()
                        .fillna(0))

(sessions_by_validity
 .withColumn('total_sessions', F.col('false') + F.col('true'))
 .withColumn('invalid_sessions', F.col('true'))
 .withColumn('pct_invalid', F.round((F.col('invalid_sessions') / F.col('total_sessions')) * F.lit(100), 2))
 .orderBy('wiki')
 .drop('true', 'false')
 .toPandas()
 .set_index('wiki')
)
Out[254]:
total_sessions invalid_sessions pct_invalid
wiki
bowiki 36 2 5.56
bugwiki 2 0 0.00
cdowiki 10 2 20.00
crwiki 9 0 0.00
dzwiki 3 0 0.00
ganwiki 20 0 0.00
hakwiki 21 0 0.00
jawiki 601480 4887 0.81
jvwiki 128 2 1.56
kmwiki 487 23 4.72
lowiki 27 1 3.70
mywiki 1746 43 2.46
thwiki 10415 281 2.70
wuuwiki 156 1 0.64
zh_classicalwiki 131 3 2.29
zh_min_nanwiki 80 0 0.00
zh_yuewiki 1356 26 1.92
zhwiki 355668 4447 1.25
In [255]:
# Fulltext search session outcomes per wiki and bucket.
# Successful = at least one fulltext SERP and at least one click.
# Abandoned  = at least one fulltext SERP and no click.
# Invalid sessions are excluded.

(session_stats
 .filter(F.col('invalid_session') == F.lit(False))
 .groupBy('wiki', 'subtest')
 .agg(
     F.count(F.when((F.col('fulltext_searchResultPage') > 0) & (F.col('fulltext_click') == 0), True)).alias('abandoned'),
     F.count(F.when((F.col('fulltext_searchResultPage') > 0) & (F.col('fulltext_click') > 0), True)).alias('successful')
 )
 .withColumn('pct_successful', F.round(F.lit(100) * F.col('successful') / (F.col('successful') + F.col('abandoned')), 2))
 .groupBy('wiki')
 .pivot('subtest')
 # Each (wiki, subtest) group is a single row here, so sum() just carries the
 # value through the pivot. The '..successfull' spelling in the output aliases
 # is kept so the result columns match earlier runs of this notebook.
 .agg(F.sum('pct_successful').alias('pct_successfull'),
      F.sum(F.col('successful') + F.col('abandoned')).alias('num_sessions'))
 .withColumn('best',
             F.when((F.col('control_pct_successfull').isNull()) | (F.col('perfield_pct_successfull').isNull()), F.lit('unknown'))
              .when(F.col('control_pct_successfull') > F.col('perfield_pct_successfull'), F.lit('control'))
              .otherwise(F.lit('perfield')))
 # delta mixes the string placeholder '?' with rounded numbers, so the whole
 # column comes back as strings.
 .withColumn('delta',
             F.when((F.col('control_pct_successfull').isNull()) | (F.col('perfield_pct_successfull').isNull()), F.lit('?'))
              .otherwise(F.round(F.col('perfield_pct_successfull') - F.col('control_pct_successfull'), 2)))

 .orderBy('wiki')
 .toPandas()
 .set_index('wiki')
)
Out[255]:
control_pct_successfull control_num_sessions perfield_pct_successfull perfield_num_sessions best delta
wiki
bowiki 45.45 11.0 22.22 9 control -23.23
bugwiki NaN NaN 0.00 1 unknown ?
cdowiki 100.00 1.0 100.00 3 perfield 0.0
crwiki 0.00 2.0 50.00 2 perfield 50.0
dzwiki NaN NaN 33.33 3 unknown ?
ganwiki 0.00 5.0 22.22 9 perfield 22.22
hakwiki 16.67 6.0 50.00 2 perfield 33.33
jawiki 58.71 81397.0 60.95 81840 perfield 2.24
jvwiki 13.33 30.0 31.82 22 perfield 18.49
kmwiki 17.01 147.0 26.72 131 perfield 9.71
lowiki 33.33 9.0 33.33 9 perfield 0.0
mywiki 32.77 534.0 34.21 535 perfield 1.44
thwiki 29.57 2252.0 31.07 2208 perfield 1.5
wuuwiki 22.45 49.0 22.00 50 control -0.45
zh_classicalwiki 35.71 42.0 43.90 41 perfield 8.19
zh_min_nanwiki 23.08 13.0 23.81 21 perfield 0.73
zh_yuewiki 27.56 381.0 25.07 355 control -2.49
zhwiki 47.52 60573.0 48.93 60806 perfield 1.41
In [256]:
# Zero-results rate (ZRR) per subtest: share of fulltext SERPs with no hits.
# note: invalid sessions are NOT removed here

(satis
 .where(F.col('event.subtest').isin(['T266027_perfield_builder:control', 'T266027_perfield_builder:perfield']))
 .where((F.col('event.source') == F.lit('fulltext')) & (F.col('event.action') == F.lit('searchResultPage')))
 .withColumn('subtest', F.col('event.subtest').substr(len('T266027_perfield_builder:') + 1, 100))
 .groupBy('wiki', 'subtest')
 .agg(
     # A null hitsReturned is counted as zero results.
     F.count(F.when(F.coalesce(F.col('event.hitsReturned'), F.lit(0)) <= F.lit(0), True)).alias('zero_results'),
     F.count(F.lit(1)).alias('total'))
 .withColumn('zrr', F.round(F.lit(100) * F.col('zero_results')/F.col('total'), 2))
 .groupBy('wiki')
 .pivot('subtest')
 .agg(
     # Each (wiki, subtest) group is a single row, so sum() just carries the
     # value through the pivot.
     F.sum(F.col('zrr')).alias('zrr'),
     # T is the pyspark.sql.types alias imported at the top of the notebook;
     # no need for a separate mid-notebook IntegerType import.
     F.sum(F.col('total')).cast(T.IntegerType()).alias('total_searches')
 )
 # Positive delta = perfield has a lower zero-results rate than control.
 .withColumn('delta', F.round(F.col('control_zrr') - F.col('perfield_zrr'), 2))
 .orderBy('wiki')
 .toPandas()
 .set_index('wiki'))
Out[256]:
control_zrr control_total_searches perfield_zrr perfield_total_searches delta
wiki
bowiki 20.00 20.0 25.00 20 -5.00
bugwiki NaN NaN 100.00 1 NaN
cdowiki 16.67 6.0 25.00 16 -8.33
crwiki 100.00 2.0 75.00 4 25.00
dzwiki NaN NaN 66.67 3 NaN
ganwiki 77.78 9.0 73.33 15 4.45
hakwiki 25.00 8.0 28.57 7 -3.57
jawiki 4.15 154762.0 3.98 153098 0.17
jvwiki 24.36 78.0 26.03 73 -1.67
kmwiki 16.33 392.0 16.32 337 0.01
lowiki 9.38 32.0 4.55 22 4.83
mywiki 20.06 1565.0 18.62 1364 1.44
thwiki 10.34 4925.0 10.19 4801 0.15
wuuwiki 16.82 107.0 21.37 131 -4.55
zh_classicalwiki 30.36 112.0 27.27 77 3.09
zh_min_nanwiki 16.67 30.0 32.35 34 -15.68
zh_yuewiki 18.39 658.0 23.35 668 -4.96
zhwiki 5.62 115782.0 5.71 116005 -0.09
In [ ]: