In [13]:
import wmfdata

# Get a fully customizable SparkSession via get_custom_session.
spark = wmfdata.spark.get_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "8g",
        "spark.executor.memory": "8g",
        "spark.executor.cores": 4,
        "spark.executor.memoryOverhead": "8g",
        "spark.sql.shuffle.partitions": 200,
        "spark.dynamicAllocation.maxExecutors": 30,
    }
)

fields = ["text_length", "source_text_length", "opening_text_length",
          "defaultsort_text_length", "auxiliary_text_length", "heading_length",
          "template_length", "category_length", "outgoing_link_length",
          "external_link_length", "statement_keywords_length"]

# all_fields additionally includes the computed total column
all_fields = fields + ["all_text_length"]
PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
In [104]:
"""
page_id             	string              	                    
auxiliary_text      	array<string>       	                    
category            	array<string>       	                    
content_model       	string              	                    
coordinates         	array<struct<coord:struct<lat:double,lon:double>,country:string,dim:bigint,globe:string,name:string,primary:boolean,region:string,type:string>>	                    
create_timestamp    	string              	                    
defaultsort         	string              	                    
descriptions        	map<string,string>  	                    
external_link       	array<string>       	                    
extra_source        	string              	                    
file_bits           	bigint              	                    
file_height         	bigint              	                    
file_media_type     	string              	                    
file_mime           	string              	                    
file_resolution     	bigint              	                    
file_size           	bigint              	                    
file_text           	string              	                    
file_width          	bigint              	                    
heading             	array<string>       	                    
incoming_links      	bigint              	                    
labels              	map<string,array<string>>	                    
language            	string              	                    
local_sites_with_dupe	array<string>       	                    
namespace           	bigint              	                    
namespace_text      	string              	                    
opening_text        	string              	                    
outgoing_link       	array<string>       	                    
popularity_score    	double              	                    
redirect            	array<struct<namespace:bigint,title:string>>	                    
source_text         	string              	                    
template            	array<string>       	                    
text                	string              	                    
text_bytes          	bigint              	                    
timestamp           	string              	                    
title               	string              	                    
version             	bigint              	                    
wikibase_item       	string              	                    
weighted_tags       	array<string>       	                    
statement_keywords  	array<string>       	                    
wiki                	string              	                    
snapshot            	string              	                    
"""

from pyspark.sql.functions import col
import pyspark.sql.functions as F
import pyspark.sql.types as T


# Sum of string lengths over an array<string> column; 0 for NULL arrays.
str_ar_size = F.udf(lambda ar: 0 if ar is None else sum(len(s) for s in ar), T.IntegerType())
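# Note: a Python UDF ships every array through the Python workers. On
# Spark >= 2.4 the same per-row sum can be computed natively with a
# higher-order function (a sketch, assuming non-null array elements, as
# the UDF above also assumes):
def str_ar_size_native(c):
    return F.coalesce(
        F.expr(f"aggregate({c}, 0, (acc, s) -> acc + length(s))"),
        F.lit(0))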

all_data = (spark.table("ebernhardson.cirrus2hive_v3")
    .where(col("snapshot")=="20221012")
    .select(col("wiki"),
            col("version"),
            col("title"),
            F.length(F.coalesce(col("text"), F.lit(""))).alias("text_length"),
            F.length(F.coalesce(col("source_text"), F.lit(""))).alias("source_text_length"),
            F.length(F.coalesce(col("opening_text"), F.lit(""))).alias("opening_text_length"),
            F.length(F.coalesce(col("defaultsort"), F.lit(""))).alias("defaultsort_text_length"),
            str_ar_size(col("auxiliary_text")).alias("auxiliary_text_length"),
            str_ar_size(col("heading")).alias("heading_length"),
            str_ar_size(col("template")).alias("template_length"),
            str_ar_size(col("category")).alias("category_length"),
            str_ar_size(col("outgoing_link")).alias("outgoing_link_length"),
            str_ar_size(col("external_link")).alias("external_link_length"),
            str_ar_size(col("statement_keywords")).alias("statement_keywords_length")))

# Build the SQL expression "text_length+source_text_length+..." summing the
# individual field lengths into a single total.
addExpr = F.expr("+".join(fields))

all_data = all_data.withColumn("all_text_length", addExpr)

# Persist the per-document stats, partitioned by wiki, for reuse below.
all_data.write.mode('overwrite').partitionBy("wiki").parquet("hdfs:///user/dcausse/cirrus_doc_size/size_stats.parquet")
In [105]:
all_data = spark.read.parquet("hdfs:///user/dcausse/cirrus_doc_size/size_stats.parquet")

# Note: despite the "_p99" suffix, these are 99.999th percentiles (0.99999).
expr = [F.expr(f'percentile({f}, 0.99999)').cast('int').alias(f"{f}_p99") for f in fields]

per_wiki_p = (all_data
              .groupBy(col("wiki"))
              .agg(
                  F.count(F.lit(1)).alias("articles"),
                  F.expr('percentile(all_text_length, 0.99999)').cast('int').alias("all_text_length_p99"),
                  *expr))
22/10/17 15:43:59 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
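In [ ]:
# Alternative sketch (not run here): percentile() is exact but buffers every
# value per group; percentile_approx(col, percentage, accuracy) bounds memory
# at the cost of exactness, which may matter on the largest wikis.
expr_approx = [F.expr(f'percentile_approx({f}, 0.99999, 100000)')
               .cast('int').alias(f"{f}_p99") for f in fields]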
In [ ]:
per_wiki_p.coalesce(1).write.option("header", True).mode('overwrite').csv("hdfs:///user/dcausse/cirrus_doc_size/per_wiki_p99999.csv")
In [106]:
# Collect the per-wiki percentile rows to the driver (one row per wiki).
p99_row = per_wiki_p.take(10000)
In [107]:
# pre-filter rows that have at least one field length above that wiki's p99.999
cols = {v: i for i, v in enumerate(all_data.columns)}  # column name -> Row index
all_p99s = {r['wiki']: r for r in p99_row}


def p99_filter(r):
    """True if any tracked field of row r exceeds its wiki's 99.999th percentile."""
    wiki = r[cols['wiki']]
    p99s = all_p99s[wiki]
    for f in all_fields:
        if f == 'defaultsort_text_length':
            # defaultsort is excluded from the outlier check
            continue
        threshold = p99s[f"{f}_p99"]
        if threshold < r[cols[f]]:
            return True
    return False

p99_data = all_data.rdd.filter(p99_filter).toDF(all_data.schema)
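In [ ]:
# Equivalent Spark-native filter (a sketch, not run here): broadcast-join the
# per-wiki percentiles back onto all_data and compare columns directly,
# avoiding the round-trip through Python. Only names defined above are used.
check_fields = [f for f in all_fields if f != 'defaultsort_text_length']
cond = F.lit(False)
for f in check_fields:
    cond = cond | (F.col(f) > F.col(f"{f}_p99"))
p99_data_alt = (all_data
                .join(F.broadcast(per_wiki_p), on="wiki")
                .filter(cond)
                .select(all_data.columns))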
In [108]:
p99_data.show(10, False)
+--------+--------------------------------------------+-----------+------------------+-------------------+-----------------------+---------------------+--------------+---------------+---------------+--------------------+--------------------+-------------------------+---------------+----------+
|version |title                                       |text_length|source_text_length|opening_text_length|defaultsort_text_length|auxiliary_text_length|heading_length|template_length|category_length|outgoing_link_length|external_link_length|statement_keywords_length|all_text_length|wiki      |
+--------+--------------------------------------------+-----------+------------------+-------------------+-----------------------+---------------------+--------------+---------------+---------------+--------------------+--------------------+-------------------------+---------------+----------+
|8980946 |Форум/Общий/Архив/2011                      |101406     |143846            |18182              |0                      |391                  |2496          |558            |42             |3904                |2763                |0                        |273588         |ruwikinews|
|467433  |Архив:Самые популярные новости/2016/сентябрь|1340       |26773             |0                  |0                      |17165                |0             |209            |31             |8909                |0                   |0                        |54427          |ruwikinews|
|59993   |Шаблоны/Участники/Города и страны           |206        |1034              |206                |0                      |8852                 |147           |3005           |98             |6038                |0                   |0                        |19586          |ruwikinews|
|11085748|Potd/2021-06                                |2229       |136               |2229               |0                      |3000                 |60            |1769           |71             |6797                |177                 |0                        |16468          |ruwikinews|
|15293711|VladimirPF                                  |24471      |31301             |13366              |0                      |1624                 |448           |84             |0              |2121                |122                 |0                        |73537          |ruwikinews|
|14982639|Zelio007                                    |14171      |23391             |1777               |0                      |1554                 |1559          |71             |0              |1551                |294                 |0                        |44368          |ruwikinews|
|14995475|Иван Абатуров                               |62366      |74653             |0                  |0                      |1398                 |599           |75             |0              |1451                |1587                |0                        |142129         |ruwikinews|
|114961  |Цвет/doc                                    |878        |8428              |0                  |0                      |2881                 |0             |3439           |20             |4873                |0                   |0                        |20519          |ruwikinews|
|8728076 |Esp rus4                                    |24361      |30268             |0                  |0                      |1393                 |488           |75             |0              |1340                |585                 |0                        |58510          |ruwikinews|
|11289071|Иллюстрирование                             |40901      |58249             |8715               |0                      |2075                 |1470          |1007           |304            |1988                |788                 |0                        |115497         |ruwikinews|
+--------+--------------------------------------------+-----------+------------------+-------------------+-----------------------+---------------------+--------------+---------------+---------------+--------------------+--------------------+-------------------------+---------------+----------+
only showing top 10 rows
In [109]:
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# From here on, rank by every field, including the computed total.
fields = all_fields

max_ds = p99_data.select(F.col("wiki"), F.col("version"), F.col("title"), *[F.col(f) for f in fields])

# For each field, rank documents within each wiki by descending length.
for f in fields:
    win = (Window
           .partitionBy("wiki")
           .orderBy(F.col(f).desc()))
    max_ds = max_ds.withColumn(f"row_{f}", F.row_number().over(win))

# Keep a row if it ranks in the top 2 for any field, and record which field
# put it there.
bool_expr = F.lit(False)

# Seed the when() chain with an always-false branch so the loop below can
# chain .when() calls uniformly; the "bug!" label can never be produced.
what_is_max = F.when(F.lit(False), F.lit("bug!"))

for f in fields:
    bool_expr = (bool_expr | (F.col(f"row_{f}") < 3))
    what_is_max = what_is_max.when(F.col(f"row_{f}") < 3, F.lit(f))

max_ds = (max_ds
          .filter(bool_expr)
          .withColumn("what_is_max", what_is_max.otherwise(F.lit("???"))))

# Drop the helper ranking columns (DataFrame.drop returns a new frame,
# so the result must be reassigned).
for f in fields:
    max_ds = max_ds.drop(f"row_{f}")

max_ds = max_ds.select(F.col("wiki"), F.col("what_is_max"), F.col("version"), F.col("title"), *[F.col(f) for f in fields])
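In [ ]:
# Quick distribution check (a sketch): which field most often carries a
# per-wiki maximum.
max_ds.groupBy("what_is_max").count().orderBy(F.col("count").desc()).show(20, False)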
In [110]:
max_ds.write.mode('overwrite').partitionBy("wiki").parquet("hdfs:///user/dcausse/cirrus_doc_size/max_per_wiki_field.parquet")
In [112]:
(spark.read
 .parquet('hdfs:///user/dcausse/cirrus_doc_size/max_per_wiki_field.parquet')
 .coalesce(1)
 .write
 .option("header", True)
 .mode('overwrite')
 .csv("hdfs:///user/dcausse/cirrus_doc_size/max_per_wiki_field.csv"))
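In [ ]:
# The single-part CSV can also be pulled into pandas for plotting or review
# (a sketch; assumes pandas is available on the notebook host).
per_wiki_pd = (spark.read.option("header", True)
               .csv("hdfs:///user/dcausse/cirrus_doc_size/per_wiki_p99999.csv")
               .toPandas())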
In [ ]:
per_wiki_p.show(10, False)