import wmfdata
# Or get a totally customizable SparkSession using get_custom_session.
spark = wmfdata.spark.get_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "8g",
        "spark.executor.memory": "8g",
        "spark.executor.cores": 4,
        "spark.executor.memoryOverhead": "8g",
        "spark.sql.shuffle.partitions": 200,
        "spark.dynamicAllocation.maxExecutors": 30
    }
)
fields = ["text_length", "source_text_length", "opening_text_length",
"defaultsort_text_length", "auxiliary_text_length", "heading_length",
"template_length", "category_length", "outgoing_link_length",
"external_link_length", "statement_keywords_length"]
all_fields = ["text_length", "source_text_length", "opening_text_length",
"defaultsort_text_length", "auxiliary_text_length", "heading_length",
"template_length", "category_length", "outgoing_link_length",
"external_link_length", "statement_keywords_length", "all_text_length"]
# PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
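# Schema of the ebernhardson.cirrus2hive_v3 table queried below: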
"""
page_id string
auxiliary_text array<string>
category array<string>
content_model string
coordinates array<struct<coord:struct<lat:double,lon:double>,country:string,dim:bigint,globe:string,name:string,primary:boolean,region:string,type:string>>
create_timestamp string
defaultsort string
descriptions map<string,string>
external_link array<string>
extra_source string
file_bits bigint
file_height bigint
file_media_type string
file_mime string
file_resolution bigint
file_size bigint
file_text string
file_width bigint
heading array<string>
incoming_links bigint
labels map<string,array<string>>
language string
local_sites_with_dupe array<string>
namespace bigint
namespace_text string
opening_text string
outgoing_link array<string>
popularity_score double
redirect array<struct<namespace:bigint,title:string>>
source_text string
template array<string>
text string
text_bytes bigint
timestamp string
title string
version bigint
wikibase_item string
weighted_tags array<string>
statement_keywords array<string>
wiki string
snapshot string
"""
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import pyspark.sql.types as T
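# Sum of the string lengths of an array<string> column (0 when the array is NULL).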
str_ar_size = F.udf(lambda ar: 0 if ar is None else sum(len(s) for s in ar), T.IntegerType())
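# Per-document character length of each indexed field, for the 20221012 snapshot.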
all_data = (spark.table("ebernhardson.cirrus2hive_v3")
    .where(col("snapshot") == "20221012")
    .select(col("wiki"),
            col("version"),
            col("title"),
            F.length(F.coalesce(col("text"), F.lit(""))).alias("text_length"),
            F.length(F.coalesce(col("source_text"), F.lit(""))).alias("source_text_length"),
            F.length(F.coalesce(col("opening_text"), F.lit(""))).alias("opening_text_length"),
            F.length(F.coalesce(col("defaultsort"), F.lit(""))).alias("defaultsort_text_length"),
            str_ar_size(col("auxiliary_text")).alias("auxiliary_text_length"),
            str_ar_size(col("heading")).alias("heading_length"),
            str_ar_size(col("template")).alias("template_length"),
            str_ar_size(col("category")).alias("category_length"),
            str_ar_size(col("outgoing_link")).alias("outgoing_link_length"),
            str_ar_size(col("external_link")).alias("external_link_length"),
            str_ar_size(col("statement_keywords")).alias("statement_keywords_length")))
addExpr = F.expr("+".join(fields))
all_data = all_data.withColumn("all_text_length", addExpr)
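# Persist the per-document stats to parquet and read them back, so later steps reuse
# this much smaller dataset instead of rescanning the source table.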
all_data.write.mode('overwrite').partitionBy("wiki").parquet("hdfs:///user/dcausse/cirrus_doc_size/size_stats.parquet")
all_data = spark.read.parquet("hdfs:///user/dcausse/cirrus_doc_size/size_stats.parquet")
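# Per-wiki 99.999th percentile for each length field (note: despite the *_p99 aliases,
# these are 99.999th percentiles).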
expr = [F.expr(f'percentile({f}, 0.99999)').cast('int').alias(f"{f}_p99") for f in fields]
per_wiki_p = (all_data
    .groupBy(col("wiki"))
    .agg(
        F.count(F.lit(1)).alias("articles"),
        F.expr('percentile(all_text_length, 0.99999)').cast('int').alias("all_text_length_p99"),
        *expr))
per_wiki_p.coalesce(1).write.option("header", True).mode('overwrite').csv("hdfs:///user/dcausse/cirrus_doc_size/per_wiki_p99999.csv")
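# Collect the per-wiki percentile rows to the driver so the filter below can run as a
# plain Python function over the RDD.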
p99_row = per_wiki_p.take(10000)
import pyspark.sql.types as T
# pre-filter rows that have a field length greater than the per-wiki 99.999th percentile
cols = {v: i for i, v in enumerate(all_data.columns)}
all_p99s = {}
for r in p99_row:
    all_p99s[r['wiki']] = r

def p99_filter(r):
    wiki = r[cols['wiki']]
    p99s = all_p99s[wiki]
    for f in all_fields:
        if f == 'defaultsort_text_length':
            continue
        l = p99s[f"{f}_p99"]
        v = r[cols[f]]
        if l < v:
            return True
    return False
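# Keep only the documents that exceed their wiki's 99.999th percentile in at least one field
# (defaultsort excluded).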
p99_data = all_data.rdd.filter(p99_filter).toDF(all_data.schema)
p99_data.show(10, False)
version | title | text_length | source_text_length | opening_text_length | defaultsort_text_length | auxiliary_text_length | heading_length | template_length | category_length | outgoing_link_length | external_link_length | statement_keywords_length | all_text_length | wiki
8980946 | Форум/Общий/Архив/2011 | 101406 | 143846 | 18182 | 0 | 391 | 2496 | 558 | 42 | 3904 | 2763 | 0 | 273588 | ruwikinews
467433 | Архив:Самые популярные новости/2016/сентябрь | 1340 | 26773 | 0 | 0 | 17165 | 0 | 209 | 31 | 8909 | 0 | 0 | 54427 | ruwikinews
59993 | Шаблоны/Участники/Города и страны | 206 | 1034 | 206 | 0 | 8852 | 147 | 3005 | 98 | 6038 | 0 | 0 | 19586 | ruwikinews
11085748 | Potd/2021-06 | 2229 | 136 | 2229 | 0 | 3000 | 60 | 1769 | 71 | 6797 | 177 | 0 | 16468 | ruwikinews
15293711 | VladimirPF | 24471 | 31301 | 13366 | 0 | 1624 | 448 | 84 | 0 | 2121 | 122 | 0 | 73537 | ruwikinews
14982639 | Zelio007 | 14171 | 23391 | 1777 | 0 | 1554 | 1559 | 71 | 0 | 1551 | 294 | 0 | 44368 | ruwikinews
14995475 | Иван Абатуров | 62366 | 74653 | 0 | 0 | 1398 | 599 | 75 | 0 | 1451 | 1587 | 0 | 142129 | ruwikinews
114961 | Цвет/doc | 878 | 8428 | 0 | 0 | 2881 | 0 | 3439 | 20 | 4873 | 0 | 0 | 20519 | ruwikinews
8728076 | Esp rus4 | 24361 | 30268 | 0 | 0 | 1393 | 488 | 75 | 0 | 1340 | 585 | 0 | 58510 | ruwikinews
11289071 | Иллюстрирование | 40901 | 58249 | 8715 | 0 | 2075 | 1470 | 1007 | 304 | 1988 | 788 | 0 | 115497 | ruwikinews
only showing top 10 rows
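# Next: for each wiki, find the top documents (top 2) for each length field.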
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.window import Window
addExpr = F.expr("+".join(fields))
fields = ["text_length", "source_text_length", "opening_text_length",
"defaultsort_text_length", "auxiliary_text_length", "heading_length",
"template_length", "category_length", "outgoing_link_length",
"external_link_length", "statement_keywords_length", "all_text_length"]
max_ds = p99_data.select(F.col("wiki"), F.col("version"), F.col("title"), *[F.col(f) for f in fields])
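# Rank documents within each wiki by each length field, descending: row_<field> == 1 is the largest.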
for f in fields:
    win = (Window
           .partitionBy("wiki")
           .orderBy(F.col(f).desc()))
    max_ds = max_ds.withColumn(f"row_{f}", F.row_number().over(win))
bool_expr = F.lit(False)
what_is_max = F.when(F.lit(False), F.lit("bug!"))
for f in fields:
    bool_expr = (bool_expr | (F.col(f"row_{f}") < 3))
    what_is_max = what_is_max.when(F.col(f"row_{f}") < 3, F.lit(f))
max_ds = (max_ds
    .filter(bool_expr)
    .withColumn("what_is_max", what_is_max.otherwise(F.lit("???"))))
for f in fields:
    max_ds = max_ds.drop(f"row_{f}")
max_ds = max_ds.select(F.col("wiki"), F.col("what_is_max"), F.col("version"), F.col("title"), *[F.col(f) for f in fields])
max_ds.write.mode('overwrite').partitionBy("wiki").parquet("hdfs:///user/dcausse/cirrus_doc_size/max_per_wiki_field.parquet")
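# Re-read the parquet output and export it as a single CSV file with a header.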
(spark.read
    .parquet('hdfs:///user/dcausse/cirrus_doc_size/max_per_wiki_field.parquet')
    .coalesce(1)
    .write
    .option("header", True)
    .mode('overwrite')
    .csv("hdfs:///user/dcausse/cirrus_doc_size/max_per_wiki_field.csv"))
per_wiki_p.show(10, False)