# Copyright 2019 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Generated by nuclio.export.NuclioExporter
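# This module was exported from a notebook and implements a Spark-based data
# profiling job for MLRun: describe_spark() reads a CSV dataset into a Spark
# DataFrame, profiles every column via describe(), and logs a "summary_stats"
# dataset plus table-level results to the MLRun execution context.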
import mlrun
from mlrun.platforms.iguazio import mount_v3io, mount_v3iod
from mlrun.datastore import DataItem
from mlrun.execution import MLClientCtx
import os
from subprocess import run
import pandas as pd
import numpy as np
from pyspark.sql.types import LongType
from pyspark.sql import SparkSession
import sys
import base64 as b64
import warnings
warnings.filterwarnings("ignore")
from itertools import product
import matplotlib
import json
from matplotlib import pyplot as plt
from pkg_resources import resource_filename
import six
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import (abs as df_abs, col, count, countDistinct,
max as df_max, mean, min as df_min,
sum as df_sum, when
)
from pyspark.sql.functions import variance, stddev, kurtosis, skewness
def describe(df, bins, corr_reject, config, **kwargs):
if not isinstance(df, SparkDataFrame):
raise TypeError("df must be of type pyspark.sql.DataFrame")
table_stats = {"n": df.count()}
if table_stats["n"] == 0:
raise ValueError("df cannot be empty")
try:
matplotlib.style.use("default")
    except Exception:
pass
def pretty_name(x):
x *= 100
if x == int(x):
return '%.0f%%' % x
else:
return '%.1f%%' % x
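    # pretty_name() turns a quantile fraction into the label used as a Series
    # index, e.g. pretty_name(0.25) -> '25%' and pretty_name(0.125) -> '12.5%'.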
def corr_matrix(df, columns=None):
if columns is None:
columns = df.columns
combinations = list(product(columns,columns))
def separate(l, n):
for i in range(0, len(l), n):
yield l[i:i+n]
grouped = list(separate(combinations,len(columns)))
df_cleaned = df.select(*columns).na.drop(how="any")
for i in grouped:
for j in enumerate(i):
i[j[0]] = i[j[0]] + (df_cleaned.corr(str(j[1][0]), str(j[1][1])),)
df_pandas = pd.DataFrame(grouped).applymap(lambda x: x[2])
df_pandas.columns = columns
df_pandas.index = columns
return df_pandas
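    # corr_matrix() enumerates every (column, column) pair, computes the Pearson
    # correlation for each pair with DataFrame.corr() after dropping rows that
    # contain nulls, and reshapes the results into a square pandas DataFrame
    # indexed and labelled by column name.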
def create_hist_data(df, column, minim, maxim, bins=10):
def create_all_conditions(current_col, column, left_edges, count=1):
"""
Recursive function that exploits the
ability to call the Spark SQL Column method
.when() in a recursive way.
"""
left_edges = left_edges[:]
if len(left_edges) == 0:
return current_col
if len(left_edges) == 1:
next_col = current_col.when(col(column) >= float(left_edges[0]), count)
left_edges.pop(0)
return create_all_conditions(next_col, column, left_edges[:], count+1)
next_col = current_col.when((float(left_edges[0]) <= col(column))
& (col(column) < float(left_edges[1])), count)
left_edges.pop(0)
return create_all_conditions(next_col, column, left_edges[:], count+1)
num_range = maxim - minim
bin_width = num_range / float(bins)
left_edges = [minim]
for _bin in range(bins):
left_edges = left_edges + [left_edges[-1] + bin_width]
left_edges.pop()
expression_col = when((float(left_edges[0]) <= col(column))
& (col(column) < float(left_edges[1])), 0)
left_edges_copy = left_edges[:]
left_edges_copy.pop(0)
bin_data = (df.select(col(column))
.na.drop()
.select(col(column),
create_all_conditions(expression_col,
column,
left_edges_copy
).alias("bin_id")
)
.groupBy("bin_id").count()
).toPandas()
bin_data.index = bin_data["bin_id"]
new_index = list(range(bins))
bin_data = bin_data.reindex(new_index)
bin_data["bin_id"] = bin_data.index
bin_data = bin_data.fillna(0)
bin_data["left_edge"] = left_edges
bin_data["width"] = bin_width
return bin_data
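    # create_hist_data() builds a histogram without collecting the column: the
    # range [minim, maxim) is split into equal-width bins, each row is assigned
    # a bin_id through one chained when() expression (built recursively above),
    # and the bin counts come from a groupBy.  For example, minim=0, maxim=10,
    # bins=10 gives bin_width=1.0 and left edges 0.0, 1.0, ..., 9.0.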
def describe_integer_1d(df, column, current_result, nrows):
stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
df_min(col(column)).alias("min"),
df_max(col(column)).alias("max"),
variance(col(column)).alias("variance"),
kurtosis(col(column)).alias("kurtosis"),
stddev(col(column)).alias("std"),
skewness(col(column)).alias("skewness"),
df_sum(col(column)).alias("sum")
).toPandas()
for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
stats_df[pretty_name(x)] = (df.select(column)
.na.drop()
.selectExpr("percentile(`{col}`,CAST({n} AS DOUBLE))"
.format(col=column, n=x)).toPandas().iloc[:,0]
)
stats = stats_df.iloc[0].copy()
stats.name = column
stats["range"] = stats["max"] - stats["min"]
stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
stats["cv"] = stats["std"] / float(stats["mean"])
stats["mad"] = (df.select(column)
.na.drop()
.select(df_abs(col(column)-stats["mean"]).alias("delta"))
.agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
stats["type"] = "NUM"
stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
stats['p_zeros'] = stats['n_zeros'] / float(nrows)
hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)
return stats
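    # describe_integer_1d() computes mean/min/max/variance/std/kurtosis/
    # skewness/sum in a single Spark aggregation, then adds exact percentiles
    # (the SQL percentile() function), range, IQR, CV, MAD and zero counts.
    # The float variant below is identical except that it uses
    # percentile_approx().  Note that hist_data is computed but not returned in
    # this export.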
def describe_float_1d(df, column, current_result, nrows):
stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
df_min(col(column)).alias("min"),
df_max(col(column)).alias("max"),
variance(col(column)).alias("variance"),
kurtosis(col(column)).alias("kurtosis"),
stddev(col(column)).alias("std"),
skewness(col(column)).alias("skewness"),
df_sum(col(column)).alias("sum")
).toPandas()
for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
stats_df[pretty_name(x)] = (df.select(column)
.na.drop()
.selectExpr("percentile_approx(`{col}`,CAST({n} AS DOUBLE))"
.format(col=column, n=x)).toPandas().iloc[:,0]
)
stats = stats_df.iloc[0].copy()
stats.name = column
stats["range"] = stats["max"] - stats["min"]
stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
stats["cv"] = stats["std"] / float(stats["mean"])
stats["mad"] = (df.select(column)
.na.drop()
.select(df_abs(col(column)-stats["mean"]).alias("delta"))
.agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
stats["type"] = "NUM"
stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
stats['p_zeros'] = stats['n_zeros'] / float(nrows)
hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)
return stats
def describe_date_1d(df, column):
stats_df = df.select(column).na.drop().agg(df_min(col(column)).alias("min"),
df_max(col(column)).alias("max")
).toPandas()
stats = stats_df.iloc[0].copy()
stats.name = column
if isinstance(stats["max"], pd.Timestamp):
stats = stats.astype(object)
stats["max"] = str(stats["max"].to_pydatetime())
stats["min"] = str(stats["min"].to_pydatetime())
else:
stats["range"] = stats["max"] - stats["min"]
stats["type"] = "DATE"
return stats
def guess_json_type(string_value):
try:
obj = json.loads(string_value)
        except Exception:
return None
return type(obj)
def describe_categorical_1d(df, column):
value_counts = (df.select(column).na.drop()
.groupBy(column)
.agg(count(col(column)))
.orderBy("count({c})".format(c=column),ascending=False)
).cache()
stats = (value_counts
.limit(1)
.withColumnRenamed(column, "top")
.withColumnRenamed("count({c})".format(c=column), "freq")
).toPandas().iloc[0]
top_50 = value_counts.limit(50).toPandas().sort_values("count({c})".format(c=column),
ascending=False)
top_50_categories = top_50[column].values.tolist()
others_count = pd.Series([df.select(column).na.drop()
.where(~(col(column).isin(*top_50_categories)))
.count()
], index=["***Other Values***"])
others_distinct_count = pd.Series([value_counts
.where(~(col(column).isin(*top_50_categories)))
.count()
], index=["***Other Values Distinct Count***"])
top = top_50.set_index(column)["count({c})".format(c=column)]
        top = pd.concat([top, others_count])
        top = pd.concat([top, others_distinct_count])
stats["value_counts"] = top
stats["type"] = "CAT"
value_counts.unpersist()
unparsed_valid_jsons = df.select(column).na.drop().rdd.map(
lambda x: guess_json_type(x[column])).filter(
lambda x: x).distinct().collect()
stats["unparsed_json_types"] = unparsed_valid_jsons
return stats
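    # describe_categorical_1d() caches the per-value counts, keeps the 50 most
    # frequent categories, folds the remainder into "***Other Values***" /
    # "***Other Values Distinct Count***" buckets, and records which values
    # parse as JSON via guess_json_type().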
def describe_constant_1d(df, column):
stats = pd.Series(['CONST'], index=['type'], name=column)
stats["value_counts"] = (df.select(column)
.na.drop()
.limit(1)).toPandas().iloc[:,0].value_counts()
return stats
def describe_unique_1d(df, column):
stats = pd.Series(['UNIQUE'], index=['type'], name=column)
stats["value_counts"] = (df.select(column)
.na.drop()
.limit(50)).toPandas().iloc[:,0].value_counts()
return stats
def describe_1d(df, column, nrows, lookup_config=None):
column_type = df.select(column).dtypes[0][1]
if ("array" in column_type) or ("stuct" in column_type) or ("map" in column_type):
raise NotImplementedError("Column {c} is of type {t} and cannot be analyzed".format(c=column, t=column_type))
distinct_count = df.select(column).agg(countDistinct(col(column)).alias("distinct_count")).toPandas()
non_nan_count = df.select(column).na.drop().select(count(col(column)).alias("count")).toPandas()
results_data = pd.concat([distinct_count, non_nan_count],axis=1)
results_data["p_unique"] = results_data["distinct_count"] / float(results_data["count"])
results_data["is_unique"] = results_data["distinct_count"] == nrows
results_data["n_missing"] = nrows - results_data["count"]
results_data["p_missing"] = results_data["n_missing"] / float(nrows)
results_data["p_infinite"] = 0
results_data["n_infinite"] = 0
result = results_data.iloc[0].copy()
result["memorysize"] = 0
result.name = column
if result["distinct_count"] <= 1:
result = result.append(describe_constant_1d(df, column))
elif column_type in {"tinyint", "smallint", "int", "bigint"}:
result = result.append(describe_integer_1d(df, column, result, nrows))
elif column_type in {"float", "double", "decimal"}:
result = result.append(describe_float_1d(df, column, result, nrows))
elif column_type in {"date", "timestamp"}:
result = result.append(describe_date_1d(df, column))
elif result["is_unique"] == True:
result = result.append(describe_unique_1d(df, column))
else:
result = result.append(describe_categorical_1d(df, column))
if result["n_missing"] > 0:
result["distinct_count"] = result["distinct_count"] + 1
if (result["count"] > result["distinct_count"] > 1):
try:
result["mode"] = result["top"]
except KeyError:
result["mode"] = 0
else:
try:
result["mode"] = result["value_counts"].index[0]
except KeyError:
result["mode"] = 0
except IndexError:
result["mode"] = "MISSING"
if lookup_config:
lookup_object = lookup_config['object']
col_name_in_db = lookup_config['col_name_in_db'] if 'col_name_in_db' in lookup_config else None
try:
matched, unmatched = lookup_object.lookup(df.select(column), col_name_in_db)
result['lookedup_values'] = str(matched.count()) + "/" + str(df.select(column).count())
            except Exception:
result['lookedup_values'] = 'FAILED'
else:
result['lookedup_values'] = ''
return result
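    # describe_1d() dispatches on the Spark dtype: constant columns -> CONST,
    # integer and float types -> NUM, date/timestamp -> DATE, fully unique
    # columns -> UNIQUE, everything else -> CAT.  Array/struct/map columns are
    # rejected, and an optional per-column lookup_config allows values to be
    # matched against an external lookup object.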
ldesc = {}
for colum in df.columns:
if colum in config:
if 'lookup' in config[colum]:
lookup_config = config[colum]['lookup']
desc = describe_1d(df, colum, table_stats["n"], lookup_config=lookup_config)
else:
desc = describe_1d(df, colum, table_stats["n"])
else:
desc = describe_1d(df, colum, table_stats["n"])
ldesc.update({colum: desc})
if corr_reject is not None:
computable_corrs = [colum for colum in ldesc if ldesc[colum]["type"] in {"NUM"}]
if len(computable_corrs) > 0:
corr = corr_matrix(df, columns=computable_corrs)
            for x, corr_x in corr.iterrows():
                for y, corr_value in corr_x.items():
                    if x == y:
                        break
variable_stats = pd.DataFrame(ldesc)
table_stats["nvar"] = len(df.columns)
table_stats["total_missing"] = float(variable_stats.loc["n_missing"].sum()) / (table_stats["n"] * table_stats["nvar"])
memsize = 0
table_stats['memsize'] = fmt_bytesize(memsize)
table_stats['recordsize'] = fmt_bytesize(memsize / table_stats['n'])
table_stats.update({k: 0 for k in ("NUM", "DATE", "CONST", "CAT", "UNIQUE", "CORR")})
table_stats.update(dict(variable_stats.loc['type'].value_counts()))
table_stats['REJECTED'] = table_stats['CONST'] + table_stats['CORR']
freq_dict = {}
for var in variable_stats:
if "value_counts" not in variable_stats[var]:
pass
elif not(variable_stats[var]["value_counts"] is np.nan):
freq_dict[var] = variable_stats[var]["value_counts"]
else:
pass
try:
variable_stats = variable_stats.drop("value_counts")
except (ValueError, KeyError):
pass
return table_stats, variable_stats.T, freq_dict
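# describe() returns three objects: table_stats (dataset-level counters such as
# n, nvar, total_missing and per-type column counts), variable_stats transposed
# to one row per column, and freq_dict mapping categorical columns to their
# value counts.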
from pyspark.sql.functions import abs as absou
SKEWNESS_CUTOFF = 20
DEFAULT_FLOAT_FORMATTER = u'spark_df_profiling.__default_float_formatter'
def gradient_format(value, limit1, limit2, c1, c2):
def LerpColour(c1,c2,t):
return (int(c1[0]+(c2[0]-c1[0])*t),int(c1[1]+(c2[1]-c1[1])*t),int(c1[2]+(c2[2]-c1[2])*t))
c = LerpColour(c1, c2, (value-limit1)/(limit2-limit1))
return fmt_color(value,"rgb{}".format(str(c)))
def fmt_color(text, color):
return(u'{text}'.format(color=color,text=str(text)))
def fmt_class(text, cls):
return(u'{text}'.format(cls=cls,text=str(text)))
def fmt_bytesize(num, suffix='B'):
for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
if num < 0:
num = num*-1
if num < 1024.0:
return "%3.1f %s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f %s%s" % (num, 'Yi', suffix)
def fmt_percent(v):
return "{:2.1f}%".format(v*100)
def fmt_varname(v):
    return u'{0}'.format(v)
value_formatters={
u'freq': (lambda v: gradient_format(v, 0, 62000, (30, 198, 244), (99, 200, 72))),
u'p_missing': fmt_percent,
u'p_infinite': fmt_percent,
u'p_unique': fmt_percent,
u'p_zeros': fmt_percent,
u'memorysize': fmt_bytesize,
u'total_missing': fmt_percent,
DEFAULT_FLOAT_FORMATTER: lambda v: str(float('{:.5g}'.format(v))).rstrip('0').rstrip('.'),
u'correlation_var': lambda v: fmt_varname(v),
u'unparsed_json_types': lambda v: ', '.join([s.__name__ for s in v])
}
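# The default float formatter rounds to five significant digits and trims
# trailing zeros, e.g. 2.0 -> '2' and 0.123456 -> '0.12346'.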
def fmt_row_severity(v):
    if np.isnan(v) or v <= 0.01:
return "ignore"
else:
return "alert"
def fmt_skewness(v):
    if not np.isnan(v) and (v < -SKEWNESS_CUTOFF or v > SKEWNESS_CUTOFF):
return "alert"
else:
return ""
row_formatters={
u'p_zeros': fmt_row_severity,
u'p_missing': fmt_row_severity,
u'p_infinite': fmt_row_severity,
u'n_duplicates': fmt_row_severity,
u'skewness': fmt_skewness,
}
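# The value/row formatters above appear to be carried over from the
# spark_df_profiling report helpers and are not referenced elsewhere in this
# module.  The shell call below runs the Iguazio V3IO Spark operator setup
# script before any SparkSession is created.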
run(["/bin/bash", "/etc/config/v3io/v3io-spark-operator.sh"])
def describe_spark(context: MLClientCtx,
dataset: DataItem,
artifact_path,
bins: int=30,
describe_extended: bool=True):
location = dataset.local()
spark = SparkSession.builder.appName("Spark job").getOrCreate()
    df = spark.read.csv(location, header=True, inferSchema=True)
    config = {}  # per-column lookup configuration (none supplied here)
    float_cols = [item[0] for item in df.dtypes if item[1].startswith('float') or item[1].startswith('double')]
    if describe_extended:
        table, variables, freq = describe(df, bins, float_cols, config)
tbl_1 = variables.reset_index()
if len(freq) != 0:
tbl_2 = pd.DataFrame.from_dict(freq, orient = "index").sort_index().stack().reset_index()
tbl_2.columns = ['col', 'key', 'val']
tbl_2['Merged'] = [{key: val} for key, val in zip(tbl_2.key, tbl_2.val)]
tbl_2 = tbl_2.groupby('col', as_index=False).agg(lambda x: tuple(x))[['col','Merged']]
summary = pd.merge(tbl_1, tbl_2, how='left', left_on='index', right_on='col')
else:
summary = tbl_1
context.log_dataset("summary_stats",
df=summary,
format="csv", index=False,
artifact_path=context.artifact_subpath('data'))
context.log_results(table)
else:
tbl_1 = df.describe().toPandas()
summary = tbl_1.T
context.log_dataset("summary_stats",
df=summary,
format="csv", index=False,
artifact_path=context.artifact_subpath('data'))
spark.stop()
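# --- Usage sketch (illustrative only) ----------------------------------------
# A hypothetical way to invoke this handler through MLRun; the function name,
# file name, kind and dataset path below are assumptions that depend on the
# deployment, not values defined in this module:
#
#   import mlrun
#   fn = mlrun.code_to_function("describe-spark", filename="describe_spark.py",
#                               kind="job", handler="describe_spark")
#   fn.run(inputs={"dataset": "data.csv"},
#          params={"artifact_path": "./artifacts", "bins": 30,
#                  "describe_extended": True})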