Import and Config#
# nuclio: ignore
import nuclio
%nuclio config kind = "job"
%nuclio config spec.image = "iguazio/shell:3.0_b5565_20201026062233_wsdf" # docker image available on idan707/spark_shell
%nuclio: setting kind to 'job'
%nuclio: setting spec.image to 'iguazio/shell:3.0_b5565_20201026062233_wsdf'
import mlrun
from mlrun.platforms.iguazio import mount_v3io, mount_v3iod
from mlrun.datastore import DataItem
from mlrun.execution import MLClientCtx
import os
from subprocess import run
import pandas as pd
import numpy as np
from pyspark.sql.types import LongType
from pyspark.sql import SparkSession
Build Spark Describe Helper Functions#
import sys
import base64 as b64
import warnings
warnings.filterwarnings("ignore")
from itertools import product
import matplotlib
import numpy as np
import json
import pandas as pd
from matplotlib import pyplot as plt
from pkg_resources import resource_filename
import six
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import (abs as df_abs, col, count, countDistinct,
max as df_max, mean, min as df_min,
sum as df_sum, when
)
from pyspark.sql.functions import variance, stddev, kurtosis, skewness
def describe(df, bins, corr_reject, config, **kwargs):
if not isinstance(df, SparkDataFrame):
raise TypeError("df must be of type pyspark.sql.DataFrame")
# Number of rows:
table_stats = {"n": df.count()}
if table_stats["n"] == 0:
raise ValueError("df cannot be empty")
try:
matplotlib.style.use("default")
except:
pass
# Function to "pretty name" floats:
def pretty_name(x):
x *= 100
if x == int(x):
return '%.0f%%' % x
else:
return '%.1f%%' % x
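    # e.g. pretty_name(0.05) -> "5%" and pretty_name(0.25) -> "25%"; used to label the quantile columns below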
def corr_matrix(df, columns=None):
if columns is None:
columns = df.columns
combinations = list(product(columns,columns))
def separate(l, n):
for i in range(0, len(l), n):
yield l[i:i+n]
grouped = list(separate(combinations,len(columns)))
df_cleaned = df.select(*columns).na.drop(how="any")
for i in grouped:
for j in enumerate(i):
i[j[0]] = i[j[0]] + (df_cleaned.corr(str(j[1][0]), str(j[1][1])),)
df_pandas = pd.DataFrame(grouped).applymap(lambda x: x[2])
df_pandas.columns = columns
df_pandas.index = columns
return df_pandas
def create_hist_data(df, column, minim, maxim, bins=10):
def create_all_conditions(current_col, column, left_edges, count=1):
"""
Recursive function that exploits the
ability to call the Spark SQL Column method
.when() in a recursive way.
"""
left_edges = left_edges[:]
if len(left_edges) == 0:
return current_col
if len(left_edges) == 1:
next_col = current_col.when(col(column) >= float(left_edges[0]), count)
left_edges.pop(0)
return create_all_conditions(next_col, column, left_edges[:], count+1)
next_col = current_col.when((float(left_edges[0]) <= col(column))
& (col(column) < float(left_edges[1])), count)
left_edges.pop(0)
return create_all_conditions(next_col, column, left_edges[:], count+1)
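        # Illustration (hypothetical values): with minim=0, maxim=3 and bins=3, left_edges becomes
        # [0.0, 1.0, 2.0] and the chained expression built below is equivalent to
        #   when((0.0 <= col(c)) & (col(c) < 1.0), 0)
        #       .when((1.0 <= col(c)) & (col(c) < 2.0), 1)
        #       .when(col(c) >= 2.0, 2)
        # so every value in [minim, maxim] is mapped to a bin_id in [0, bins).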
num_range = maxim - minim
bin_width = num_range / float(bins)
left_edges = [minim]
for _bin in range(bins):
left_edges = left_edges + [left_edges[-1] + bin_width]
left_edges.pop()
expression_col = when((float(left_edges[0]) <= col(column))
& (col(column) < float(left_edges[1])), 0)
left_edges_copy = left_edges[:]
left_edges_copy.pop(0)
bin_data = (df.select(col(column))
.na.drop()
.select(col(column),
create_all_conditions(expression_col,
column,
left_edges_copy
).alias("bin_id")
)
.groupBy("bin_id").count()
).toPandas()
bin_data.index = bin_data["bin_id"]
new_index = list(range(bins))
bin_data = bin_data.reindex(new_index)
bin_data["bin_id"] = bin_data.index
bin_data = bin_data.fillna(0)
bin_data["left_edge"] = left_edges
bin_data["width"] = bin_width
return bin_data
def describe_integer_1d(df, column, current_result, nrows):
stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
df_min(col(column)).alias("min"),
df_max(col(column)).alias("max"),
variance(col(column)).alias("variance"),
kurtosis(col(column)).alias("kurtosis"),
stddev(col(column)).alias("std"),
skewness(col(column)).alias("skewness"),
df_sum(col(column)).alias("sum")
).toPandas()
for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
stats_df[pretty_name(x)] = (df.select(column)
.na.drop()
.selectExpr("percentile(`{col}`,CAST({n} AS DOUBLE))"
.format(col=column, n=x)).toPandas().iloc[:,0]
)
stats = stats_df.iloc[0].copy()
stats.name = column
stats["range"] = stats["max"] - stats["min"]
stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
stats["cv"] = stats["std"] / float(stats["mean"])
stats["mad"] = (df.select(column)
.na.drop()
.select(df_abs(col(column)-stats["mean"]).alias("delta"))
.agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
stats["type"] = "NUM"
stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
stats['p_zeros'] = stats['n_zeros'] / float(nrows)
hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)
return stats
def describe_float_1d(df, column, current_result, nrows):
stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
df_min(col(column)).alias("min"),
df_max(col(column)).alias("max"),
variance(col(column)).alias("variance"),
kurtosis(col(column)).alias("kurtosis"),
stddev(col(column)).alias("std"),
skewness(col(column)).alias("skewness"),
df_sum(col(column)).alias("sum")
).toPandas()
for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
stats_df[pretty_name(x)] = (df.select(column)
.na.drop()
.selectExpr("percentile_approx(`{col}`,CAST({n} AS DOUBLE))"
.format(col=column, n=x)).toPandas().iloc[:,0]
)
stats = stats_df.iloc[0].copy()
stats.name = column
stats["range"] = stats["max"] - stats["min"]
stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
stats["cv"] = stats["std"] / float(stats["mean"])
stats["mad"] = (df.select(column)
.na.drop()
.select(df_abs(col(column)-stats["mean"]).alias("delta"))
.agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
stats["type"] = "NUM"
stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
stats['p_zeros'] = stats['n_zeros'] / float(nrows)
hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)
return stats
def describe_date_1d(df, column):
stats_df = df.select(column).na.drop().agg(df_min(col(column)).alias("min"),
df_max(col(column)).alias("max")
).toPandas()
stats = stats_df.iloc[0].copy()
stats.name = column
if isinstance(stats["max"], pd.Timestamp):
stats = stats.astype(object)
stats["max"] = str(stats["max"].to_pydatetime())
stats["min"] = str(stats["min"].to_pydatetime())
else:
stats["range"] = stats["max"] - stats["min"]
stats["type"] = "DATE"
return stats
def guess_json_type(string_value):
try:
obj = json.loads(string_value)
except:
return None
return type(obj)
def describe_categorical_1d(df, column):
value_counts = (df.select(column).na.drop()
.groupBy(column)
.agg(count(col(column)))
.orderBy("count({c})".format(c=column),ascending=False)
).cache()
# Get the most frequent class:
stats = (value_counts
.limit(1)
.withColumnRenamed(column, "top")
.withColumnRenamed("count({c})".format(c=column), "freq")
).toPandas().iloc[0]
top_50 = value_counts.limit(50).toPandas().sort_values("count({c})".format(c=column),
ascending=False)
top_50_categories = top_50[column].values.tolist()
others_count = pd.Series([df.select(column).na.drop()
.where(~(col(column).isin(*top_50_categories)))
.count()
], index=["***Other Values***"])
others_distinct_count = pd.Series([value_counts
.where(~(col(column).isin(*top_50_categories)))
.count()
], index=["***Other Values Distinct Count***"])
top = top_50.set_index(column)["count({c})".format(c=column)]
top = top.append(others_count)
top = top.append(others_distinct_count)
stats["value_counts"] = top
stats["type"] = "CAT"
value_counts.unpersist()
unparsed_valid_jsons = df.select(column).na.drop().rdd.map(
lambda x: guess_json_type(x[column])).filter(
lambda x: x).distinct().collect()
stats["unparsed_json_types"] = unparsed_valid_jsons
return stats
def describe_constant_1d(df, column):
stats = pd.Series(['CONST'], index=['type'], name=column)
stats["value_counts"] = (df.select(column)
.na.drop()
.limit(1)).toPandas().iloc[:,0].value_counts()
return stats
def describe_unique_1d(df, column):
stats = pd.Series(['UNIQUE'], index=['type'], name=column)
stats["value_counts"] = (df.select(column)
.na.drop()
.limit(50)).toPandas().iloc[:,0].value_counts()
return stats
def describe_1d(df, column, nrows, lookup_config=None):
column_type = df.select(column).dtypes[0][1]
if ("array" in column_type) or ("stuct" in column_type) or ("map" in column_type):
raise NotImplementedError("Column {c} is of type {t} and cannot be analyzed".format(c=column, t=column_type))
distinct_count = df.select(column).agg(countDistinct(col(column)).alias("distinct_count")).toPandas()
non_nan_count = df.select(column).na.drop().select(count(col(column)).alias("count")).toPandas()
results_data = pd.concat([distinct_count, non_nan_count],axis=1)
results_data["p_unique"] = results_data["distinct_count"] / float(results_data["count"])
results_data["is_unique"] = results_data["distinct_count"] == nrows
results_data["n_missing"] = nrows - results_data["count"]
results_data["p_missing"] = results_data["n_missing"] / float(nrows)
results_data["p_infinite"] = 0
results_data["n_infinite"] = 0
result = results_data.iloc[0].copy()
result["memorysize"] = 0
result.name = column
if result["distinct_count"] <= 1:
result = result.append(describe_constant_1d(df, column))
elif column_type in {"tinyint", "smallint", "int", "bigint"}:
result = result.append(describe_integer_1d(df, column, result, nrows))
elif column_type in {"float", "double", "decimal"}:
result = result.append(describe_float_1d(df, column, result, nrows))
elif column_type in {"date", "timestamp"}:
result = result.append(describe_date_1d(df, column))
elif result["is_unique"] == True:
result = result.append(describe_unique_1d(df, column))
else:
result = result.append(describe_categorical_1d(df, column))
        # Fix to also count MISSING values in the distinct_count field:
if result["n_missing"] > 0:
result["distinct_count"] = result["distinct_count"] + 1
if (result["count"] > result["distinct_count"] > 1):
try:
result["mode"] = result["top"]
except KeyError:
result["mode"] = 0
else:
try:
result["mode"] = result["value_counts"].index[0]
except KeyError:
result["mode"] = 0
            # If an IndexError happens,
            # it is because all values in the column are NULL:
except IndexError:
result["mode"] = "MISSING"
if lookup_config:
lookup_object = lookup_config['object']
col_name_in_db = lookup_config['col_name_in_db'] if 'col_name_in_db' in lookup_config else None
try:
matched, unmatched = lookup_object.lookup(df.select(column), col_name_in_db)
result['lookedup_values'] = str(matched.count()) + "/" + str(df.select(column).count())
except:
result['lookedup_values'] = 'FAILED'
else:
result['lookedup_values'] = ''
return result
    # Describe each column:
    ldesc = {}
    for column in df.columns:
        if column in config:
            if 'lookup' in config[column]:
                lookup_config = config[column]['lookup']
                desc = describe_1d(df, column, table_stats["n"], lookup_config=lookup_config)
            else:
                desc = describe_1d(df, column, table_stats["n"])
        else:
            desc = describe_1d(df, column, table_stats["n"])
        ldesc.update({column: desc})
    # Compute the correlation matrix for the numeric columns
    # (correlation-based column rejection is not applied in this simplified version):
    if corr_reject is not None:
        computable_corrs = [column for column in ldesc if ldesc[column]["type"] in {"NUM"}]
        if len(computable_corrs) > 0:
            corr = corr_matrix(df, columns=computable_corrs)
            for x, corr_x in corr.iterrows():
                for y, corr_value in corr_x.items():
                    if x == y:
                        break
# Convert ldesc to a DataFrame
variable_stats = pd.DataFrame(ldesc)
# General statistics
table_stats["nvar"] = len(df.columns)
table_stats["total_missing"] = float(variable_stats.loc["n_missing"].sum()) / (table_stats["n"] * table_stats["nvar"])
memsize = 0
table_stats['memsize'] = fmt_bytesize(memsize)
table_stats['recordsize'] = fmt_bytesize(memsize / table_stats['n'])
table_stats.update({k: 0 for k in ("NUM", "DATE", "CONST", "CAT", "UNIQUE", "CORR")})
table_stats.update(dict(variable_stats.loc['type'].value_counts()))
table_stats['REJECTED'] = table_stats['CONST'] + table_stats['CORR']
freq_dict = {}
for var in variable_stats:
if "value_counts" not in variable_stats[var]:
pass
elif not(variable_stats[var]["value_counts"] is np.nan):
freq_dict[var] = variable_stats[var]["value_counts"]
else:
pass
try:
variable_stats = variable_stats.drop("value_counts")
except (ValueError, KeyError):
pass
return table_stats, variable_stats.T, freq_dict
import numpy as np
from pyspark.sql.functions import abs as absou
SKEWNESS_CUTOFF = 20
DEFAULT_FLOAT_FORMATTER = u'spark_df_profiling.__default_float_formatter'
def gradient_format(value, limit1, limit2, c1, c2):
def LerpColour(c1,c2,t):
return (int(c1[0]+(c2[0]-c1[0])*t),int(c1[1]+(c2[1]-c1[1])*t),int(c1[2]+(c2[2]-c1[2])*t))
c = LerpColour(c1, c2, (value-limit1)/(limit2-limit1))
return fmt_color(value,"rgb{}".format(str(c)))
def fmt_color(text, color):
return(u'<span style="color:{color}">{text}</span>'.format(color=color,text=str(text)))
def fmt_class(text, cls):
return(u'<span class="{cls}">{text}</span>'.format(cls=cls,text=str(text)))
def fmt_bytesize(num, suffix='B'):
for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
if num < 0:
num = num*-1
if num < 1024.0:
return "%3.1f %s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f %s%s" % (num, 'Yi', suffix)
def fmt_percent(v):
return "{:2.1f}%".format(v*100)
def fmt_varname(v):
return u'<code>{0}</code>'.format(v)
value_formatters={
u'freq': (lambda v: gradient_format(v, 0, 62000, (30, 198, 244), (99, 200, 72))),
u'p_missing': fmt_percent,
u'p_infinite': fmt_percent,
u'p_unique': fmt_percent,
u'p_zeros': fmt_percent,
u'memorysize': fmt_bytesize,
u'total_missing': fmt_percent,
DEFAULT_FLOAT_FORMATTER: lambda v: str(float('{:.5g}'.format(v))).rstrip('0').rstrip('.'),
u'correlation_var': lambda v: fmt_varname(v),
u'unparsed_json_types': lambda v: ', '.join([s.__name__ for s in v])
}
def fmt_row_severity(v):
if np.isnan(v) or v<= 0.01:
return "ignore"
else:
return "alert"
def fmt_skewness(v):
if not np.isnan(v) and (v<-SKEWNESS_CUTOFF or v> SKEWNESS_CUTOFF):
return "alert"
else:
return ""
row_formatters={
u'p_zeros': fmt_row_severity,
u'p_missing': fmt_row_severity,
u'p_infinite': fmt_row_severity,
u'n_duplicates': fmt_row_severity,
u'skewness': fmt_skewness,
}
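As a quick, optional sanity check, the describe helper can be exercised locally against a small in-memory DataFrame before wiring it into the MLRun job. The snippet below is a minimal sketch with made-up column names and values; it assumes PySpark and a pandas version matching the pinned image are available in the notebook, and the # nuclio: ignore marker keeps it out of the deployed function code.
# nuclio: ignore
# Minimal local sketch (hypothetical data), excluded from the built function code
spark = SparkSession.builder.appName("describe-local-check").getOrCreate()
sample_df = spark.createDataFrame(
    pd.DataFrame({"sepal_length": [5.1, 4.9, 6.2, 5.8],
                  "species": ["setosa", "setosa", "virginica", "virginica"]})
)
table_stats, variable_stats, freq = describe(sample_df, bins=5, corr_reject=None, config={})
print(table_stats["n"], table_stats["nvar"])  # row count and number of variables
spark.stop()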
Build Spark Describe Function#
run(["/bin/bash", "/etc/config/v3io/v3io-spark-operator.sh"])
def describe_spark(context: MLClientCtx,
dataset: DataItem,
bins: int=30,
describe_extended: bool=True)-> None:
"""
Generates profile reports from an Apache Spark DataFrame.
Based on pandas_profiling, but for Spark's DataFrames instead of pandas.
For each column the following statistics - if relevant for the column type - are presented:
Essentials: type, unique values, missing values
Quantile statistics: minimum value, Q1, median, Q3, maximum, range, interquartile range
Descriptive statistics: mean, mode, standard deviation, sum, median absolute deviation, coefficient of variation, kurtosis, skewness
Most frequent values: for categorical data
:param context: Function context.
:param dataset: Raw data file (currently needs to be a local file located in v3io://User/bigdata)
    :param bins: Number of bins in the histograms
    :param describe_extended: (True by default) set to False to get only the output of Spark's simple .describe()
"""
# get file location
location = dataset.local()
# build spark session
spark = SparkSession.builder.appName("Spark job").getOrCreate()
    # Read the CSV into a Spark DataFrame
    df = spark.read.csv(location, header=True, inferSchema=True)
    # Empty config; passed to describe() as its `config` argument (no per-column lookup settings)
    kwargs = []
    # Collect the numeric (float/double) column names; passed to describe() as corr_reject
    float_cols = [item[0] for item in df.dtypes if item[1].startswith('float') or item[1].startswith('double')]
    if describe_extended:
# run describe function
table, variables, freq = describe(df, bins, float_cols, kwargs)
# get summary table
tbl_1 = variables.reset_index()
# prep report
if len(freq) != 0:
tbl_2 = pd.DataFrame.from_dict(freq, orient = "index").sort_index().stack().reset_index()
tbl_2.columns = ['col', 'key', 'val']
tbl_2['Merged'] = [{key: val} for key, val in zip(tbl_2.key, tbl_2.val)]
tbl_2 = tbl_2.groupby('col', as_index=False).agg(lambda x: tuple(x))[['col','Merged']]
# get summary
summary = pd.merge(tbl_1, tbl_2, how='left', left_on='index', right_on='col')
else:
summary = tbl_1
# log final report
context.log_dataset("summary_stats",
df=summary,
format="csv", index=False,
artifact_path=context.artifact_subpath('data'))
# log overview
context.log_results(table)
else:
# run simple describe and save to pandas
tbl_1 = df.describe().toPandas()
# save final report and transpose
summary = tbl_1.T
# log final report
context.log_dataset("summary_stats",
df=summary,
format="csv", index=False,
artifact_path=context.artifact_subpath('data'))
# stop spark session
spark.stop()
# nuclio: end-code
Save and Config#
fn = mlrun.code_to_function(handler="describe_spark", code_output=".")
fn.apply(mount_v3io())
fn.apply(mount_v3iod(namespace="default-tenant", v3io_config_configmap="spark-operator-v3io-config"))
fn.spec.image_pull_policy = "IfNotPresent"
fn.export("function.yaml")
> 2020-10-28 13:54:47,724 [info] function spec saved to path: function.yaml
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fb4beb8fa50>
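As a side note, the exported spec can later be reloaded in another notebook or project with mlrun.import_function; a minimal sketch (the relative path to function.yaml is an assumption):
# Sketch: re-import the exported function spec elsewhere (path assumed to be ./function.yaml)
imported_fn = mlrun.import_function("function.yaml")
imported_fn.apply(mount_v3io())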
Set Environment#
artifact_path = mlrun.set_environment(api_path = 'http://mlrun-api:8080',
artifact_path = os.path.abspath('./'))
> 2020-10-28 13:54:47,738 [warning] warning!, server (0.5.3-rc1) and client (0.5.2) ver dont match
Run and Save Outputs#
run_res = fn.run(inputs={"dataset": "iris_dataset.csv"},
artifact_path=artifact_path, watch=True)
> 2020-10-28 13:54:47,762 [warning] warning!, server (0.5.3-rc1) and client (0.5.2) ver dont match
> 2020-10-28 13:54:47,763 [info] starting run describe-spark-describe_spark uid=a9cc8a2b48ce42d180e490043091da52 -> http://mlrun-api:8080
> 2020-10-28 13:54:48,205 [info] Job is running in the background, pod: describe-spark-describe-spark-pxzxc
| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| default |  | 0 | Oct 28 13:54:47 | running | describe-spark-describe_spark | v3io_user=admin kind=job owner=admin | dataset |  |  |  |
to track results use .show() or .logs() or in CLI:
!mlrun get run a9cc8a2b48ce42d180e490043091da52 --project default , !mlrun logs a9cc8a2b48ce42d180e490043091da52 --project default
> 2020-10-28 13:54:48,285 [info] run executed, status=running
run_res.show()
> 2020-10-28 13:55:36,021 [warning] warning!, server (0.5.3-rc1) and client (0.5.2) ver dont match
| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| default |  | 0 | Oct 28 13:54:55 | completed | describe-spark-describe_spark | v3io_user=admin kind=job owner=admin host=describe-spark-describe-spark-pxzxc | dataset |  | n=150 nvar=5 total_missing=0.0 memsize=0.0 YiB recordsize=0.0 YiB NUM=5 DATE=0 CONST=0 CAT=0 UNIQUE=0 CORR=0 REJECTED=0 | summary_stats |
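To work with the logged dataset programmatically rather than through the UI, the artifact's target path is exposed in the run outputs. A minimal sketch, assuming the run completed and a recent MLRun client that exposes mlrun.get_dataitem:
# Sketch: load the logged summary_stats artifact back into pandas
summary_path = run_res.outputs["summary_stats"]        # target path of the CSV artifact
summary_df = mlrun.get_dataitem(summary_path).as_df()  # DataItem -> pandas DataFrame
summary_df.head()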