Import and Config#

# nuclio: ignore
import nuclio
%nuclio config kind = "job"
%nuclio config spec.image = "iguazio/shell:3.0_b5565_20201026062233_wsdf" # Docker image also available at idan707/spark_shell
%nuclio: setting kind to 'job'
%nuclio: setting spec.image to 'iguazio/shell:3.0_b5565_20201026062233_wsdf'
import mlrun
from mlrun.platforms.iguazio import mount_v3io, mount_v3iod
from mlrun.datastore import DataItem
from mlrun.execution import MLClientCtx

import os
from subprocess import run
import pandas as pd
import numpy as np

from pyspark.sql.types import LongType
from pyspark.sql import SparkSession

Build Spark Describe Helper Functions#

import sys
import base64 as b64
import warnings
warnings.filterwarnings("ignore")

from itertools import product
import matplotlib

import numpy as np
import json
import pandas as pd
from matplotlib import pyplot as plt
from pkg_resources import resource_filename
import six
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import (abs as df_abs, col, count, countDistinct,
                                   max as df_max, mean, min as df_min,
                                   sum as df_sum, when
                                   )
from pyspark.sql.functions import variance, stddev, kurtosis, skewness


def describe(df, bins, corr_reject, config, **kwargs):
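    """Compute per-column profiling statistics for a Spark DataFrame.

    Returns a tuple (table_stats, variable_stats, freq_dict): table-level
    statistics, a DataFrame of per-column statistics (one row per column),
    and a dict of value counts for the columns where they were computed.
    """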
    if not isinstance(df, SparkDataFrame):
        raise TypeError("df must be of type pyspark.sql.DataFrame")

    # Number of rows:
    table_stats = {"n": df.count()}
    if table_stats["n"] == 0:
        raise ValueError("df cannot be empty")

    try:
        matplotlib.style.use("default")
    except Exception:
        pass

    # Function to "pretty name" floats:
    def pretty_name(x):
        x *= 100
        if x == int(x):
            return '%.0f%%' % x
        else:
            return '%.1f%%' % x

    def corr_matrix(df, columns=None):
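        # Build a full Pearson correlation matrix by calling DataFrame.corr()
        # on every pair of columns; one Spark computation per pair, so this
        # is only practical for a modest number of numeric columns.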
        if columns is None:
            columns = df.columns
        combinations = list(product(columns,columns))

        def separate(l, n):
            for i in range(0, len(l), n):
                yield l[i:i+n]

        grouped = list(separate(combinations,len(columns)))
        df_cleaned = df.select(*columns).na.drop(how="any")

        for i in grouped:
            for j in enumerate(i):
                i[j[0]] = i[j[0]] + (df_cleaned.corr(str(j[1][0]), str(j[1][1])),)

        df_pandas = pd.DataFrame(grouped).applymap(lambda x: x[2])
        df_pandas.columns = columns
        df_pandas.index = columns
        
        return df_pandas

    def create_hist_data(df, column, minim, maxim, bins=10):

        def create_all_conditions(current_col, column, left_edges, count=1):
            """
            Recursive function that exploits the
            ability to call the Spark SQL Column method
            .when() in a recursive way.
            """
            left_edges = left_edges[:]
            if len(left_edges) == 0:
                return current_col
            if len(left_edges) == 1:
                next_col = current_col.when(col(column) >= float(left_edges[0]), count)
                left_edges.pop(0)
                return create_all_conditions(next_col, column, left_edges[:], count+1)
            next_col = current_col.when((float(left_edges[0]) <= col(column))
                                        & (col(column) < float(left_edges[1])), count)
            left_edges.pop(0)
            return create_all_conditions(next_col, column, left_edges[:], count+1)

        num_range = maxim - minim
        bin_width = num_range / float(bins)
        left_edges = [minim]
        for _bin in range(bins):
            left_edges = left_edges + [left_edges[-1] + bin_width]
        left_edges.pop()
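        # Example (hypothetical numbers): minim=0, maxim=10, bins=5 gives
        # bin_width=2.0 and left_edges=[0.0, 2.0, 4.0, 6.0, 8.0]; each value
        # is then assigned to the half-open interval [edge, edge + width).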
        expression_col = when((float(left_edges[0]) <= col(column))
                              & (col(column) < float(left_edges[1])), 0)
        left_edges_copy = left_edges[:]
        left_edges_copy.pop(0)
        bin_data = (df.select(col(column))
                    .na.drop()
                    .select(col(column),
                            create_all_conditions(expression_col,
                                                  column,
                                                  left_edges_copy
                                                 ).alias("bin_id")
                           )
                    .groupBy("bin_id").count()
                   ).toPandas()

        bin_data.index = bin_data["bin_id"]
        new_index = list(range(bins))
        bin_data = bin_data.reindex(new_index)
        bin_data["bin_id"] = bin_data.index
        bin_data = bin_data.fillna(0)

        bin_data["left_edge"] = left_edges
        bin_data["width"] = bin_width
        

        return bin_data


    def describe_integer_1d(df, column, current_result, nrows):
        
        stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
                                                       df_min(col(column)).alias("min"),
                                                       df_max(col(column)).alias("max"),
                                                       variance(col(column)).alias("variance"),
                                                       kurtosis(col(column)).alias("kurtosis"),
                                                       stddev(col(column)).alias("std"),
                                                       skewness(col(column)).alias("skewness"),
                                                       df_sum(col(column)).alias("sum")
                                                       ).toPandas()


        for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
            stats_df[pretty_name(x)] = (df.select(column)
                                        .na.drop()
                                        .selectExpr("percentile(`{col}`,CAST({n} AS DOUBLE))"
                                                    .format(col=column, n=x)).toPandas().iloc[:,0]
                                        )
        stats = stats_df.iloc[0].copy()
        stats.name = column
        stats["range"] = stats["max"] - stats["min"]
        stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
        stats["cv"] = stats["std"] / float(stats["mean"])
        stats["mad"] = (df.select(column)
                        .na.drop()
                        .select(df_abs(col(column)-stats["mean"]).alias("delta"))
                        .agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
        stats["type"] = "NUM"
        stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
        stats['p_zeros'] = stats['n_zeros'] / float(nrows)

        # Note: the histogram data is computed here but is not attached to the
        # returned stats in this version of the notebook.
        hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)

        return stats

    def describe_float_1d(df, column, current_result, nrows):
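        # Identical to describe_integer_1d, except that percentile_approx is
        # used below: the exact percentile() aggregate only supports integral
        # column types.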
        stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
                                                       df_min(col(column)).alias("min"),
                                                       df_max(col(column)).alias("max"),
                                                       variance(col(column)).alias("variance"),
                                                       kurtosis(col(column)).alias("kurtosis"),
                                                       stddev(col(column)).alias("std"),
                                                       skewness(col(column)).alias("skewness"),
                                                       df_sum(col(column)).alias("sum")
                                                       ).toPandas()

        for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
            stats_df[pretty_name(x)] = (df.select(column)
                                        .na.drop()
                                        .selectExpr("percentile_approx(`{col}`,CAST({n} AS DOUBLE))"
                                                    .format(col=column, n=x)).toPandas().iloc[:,0]
                                        )
        stats = stats_df.iloc[0].copy()
        stats.name = column
        stats["range"] = stats["max"] - stats["min"]
        stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
        stats["cv"] = stats["std"] / float(stats["mean"])
        stats["mad"] = (df.select(column)
                        .na.drop()
                        .select(df_abs(col(column)-stats["mean"]).alias("delta"))
                        .agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
        stats["type"] = "NUM"
        stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
        stats['p_zeros'] = stats['n_zeros'] / float(nrows)

        # As in describe_integer_1d, the histogram data is computed but unused.
        hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)

        return stats

    def describe_date_1d(df, column):
        stats_df = df.select(column).na.drop().agg(df_min(col(column)).alias("min"),
                                                   df_max(col(column)).alias("max")
                                                  ).toPandas()
        stats = stats_df.iloc[0].copy()
        stats.name = column

        if isinstance(stats["max"], pd.Timestamp):
            stats = stats.astype(object)
            stats["max"] = str(stats["max"].to_pydatetime())
            stats["min"] = str(stats["min"].to_pydatetime())

        else:
            stats["range"] = stats["max"] - stats["min"]
        stats["type"] = "DATE"
        return stats

    def guess_json_type(string_value):
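        # Return the parsed Python type when the string is valid JSON,
        # or None otherwise (None values are filtered out by the caller).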
        try:
            obj = json.loads(string_value)
        except Exception:
            return None

        return type(obj)

    def describe_categorical_1d(df, column):
        value_counts = (df.select(column).na.drop()
                        .groupBy(column)
                        .agg(count(col(column)))
                        .orderBy("count({c})".format(c=column),ascending=False)
                       ).cache()

        # Get the most frequent class:
        stats = (value_counts
                 .limit(1)
                 .withColumnRenamed(column, "top")
                 .withColumnRenamed("count({c})".format(c=column), "freq")
                ).toPandas().iloc[0]

        top_50 = value_counts.limit(50).toPandas().sort_values("count({c})".format(c=column),
                                                               ascending=False)
        top_50_categories = top_50[column].values.tolist()

        others_count = pd.Series([df.select(column).na.drop()
                        .where(~(col(column).isin(*top_50_categories)))
                        .count()
                        ], index=["***Other Values***"])
        others_distinct_count = pd.Series([value_counts
                                .where(~(col(column).isin(*top_50_categories)))
                                .count()
                                ], index=["***Other Values Distinct Count***"])

        top = top_50.set_index(column)["count({c})".format(c=column)]
        top = top.append(others_count)
        top = top.append(others_distinct_count)
        stats["value_counts"] = top
        stats["type"] = "CAT"
        value_counts.unpersist()
        unparsed_valid_jsons = df.select(column).na.drop().rdd.map(
            lambda x: guess_json_type(x[column])).filter(
            lambda x: x).distinct().collect()
        stats["unparsed_json_types"] = unparsed_valid_jsons
        return stats

    def describe_constant_1d(df, column):
        stats = pd.Series(['CONST'], index=['type'], name=column)
        stats["value_counts"] = (df.select(column)
                                 .na.drop()
                                 .limit(1)).toPandas().iloc[:,0].value_counts()
        return stats

    def describe_unique_1d(df, column):
        stats = pd.Series(['UNIQUE'], index=['type'], name=column)
        stats["value_counts"] = (df.select(column)
                                 .na.drop()
                                 .limit(50)).toPandas().iloc[:,0].value_counts()
        return stats

    def describe_1d(df, column, nrows, lookup_config=None):
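        # Dispatch on the Spark column type: constant, integer, float and
        # date columns get specialized statistics, unique columns are
        # sampled, and everything else is treated as categorical.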
        column_type = df.select(column).dtypes[0][1]
        if ("array" in column_type) or ("stuct" in column_type) or ("map" in column_type):
            raise NotImplementedError("Column {c} is of type {t} and cannot be analyzed".format(c=column, t=column_type))

        distinct_count = df.select(column).agg(countDistinct(col(column)).alias("distinct_count")).toPandas()
        non_nan_count = df.select(column).na.drop().select(count(col(column)).alias("count")).toPandas()
        results_data = pd.concat([distinct_count, non_nan_count],axis=1)
        results_data["p_unique"] = results_data["distinct_count"] / float(results_data["count"])
        results_data["is_unique"] = results_data["distinct_count"] == nrows
        results_data["n_missing"] = nrows - results_data["count"]
        results_data["p_missing"] = results_data["n_missing"] / float(nrows)
        results_data["p_infinite"] = 0
        results_data["n_infinite"] = 0
        result = results_data.iloc[0].copy()
        result["memorysize"] = 0
        result.name = column

        if result["distinct_count"] <= 1:
            result = result.append(describe_constant_1d(df, column))
        elif column_type in {"tinyint", "smallint", "int", "bigint"}:
            result = result.append(describe_integer_1d(df, column, result, nrows))
        elif column_type in {"float", "double", "decimal"}:
            result = result.append(describe_float_1d(df, column, result, nrows))
        elif column_type in {"date", "timestamp"}:
            result = result.append(describe_date_1d(df, column))
        elif result["is_unique"] == True:
            result = result.append(describe_unique_1d(df, column))
        else:
            result = result.append(describe_categorical_1d(df, column))
            # Fix to also count MISSING values in the distinct_count field:
            if result["n_missing"] > 0:
                result["distinct_count"] = result["distinct_count"] + 1

        if (result["count"] > result["distinct_count"] > 1):
            try:
                result["mode"] = result["top"]
            except KeyError:
                result["mode"] = 0
        else:
            try:
                result["mode"] = result["value_counts"].index[0]
            except KeyError:
                result["mode"] = 0
            # If an IndexError happens, it is because
            # all of the column's values are NULL:
            except IndexError:
                result["mode"] = "MISSING"

        if lookup_config:
            lookup_object = lookup_config['object']
            col_name_in_db = lookup_config.get('col_name_in_db')
            try:
                matched, unmatched = lookup_object.lookup(df.select(column), col_name_in_db)
                result['lookedup_values'] = str(matched.count()) + "/" + str(df.select(column).count())
            except Exception:
                result['lookedup_values'] = 'FAILED'
        else:
            result['lookedup_values'] = ''

        return result


    # Describe each column:
    ldesc = {}
    for column in df.columns:
        if column in config and 'lookup' in config[column]:
            desc = describe_1d(df, column, table_stats["n"], lookup_config=config[column]['lookup'])
        else:
            desc = describe_1d(df, column, table_stats["n"])
        ldesc[column] = desc

    # Compute the correlation matrix for the numeric columns. Note that this
    # version only computes the matrix; correlation-based rejection of highly
    # correlated columns is not applied, so the CORR count below stays 0.
    if corr_reject is not None:
        computable_corrs = [column for column in ldesc if ldesc[column]["type"] == "NUM"]

        if len(computable_corrs) > 0:
            corr = corr_matrix(df, columns=computable_corrs)
            for x, corr_x in corr.iterrows():
                for y, corr_xy in corr_x.iteritems():
                    if x == y:
                        break

    # Convert ldesc to a DataFrame
    variable_stats = pd.DataFrame(ldesc)

    # General statistics
    table_stats["nvar"] = len(df.columns)
    table_stats["total_missing"] = float(variable_stats.loc["n_missing"].sum()) / (table_stats["n"] * table_stats["nvar"])
    # Spark does not readily expose DataFrame memory usage, so size is reported as 0:
    memsize = 0
    table_stats['memsize'] = fmt_bytesize(memsize)
    table_stats['recordsize'] = fmt_bytesize(memsize / table_stats['n'])
    table_stats.update({k: 0 for k in ("NUM", "DATE", "CONST", "CAT", "UNIQUE", "CORR")})
    table_stats.update(dict(variable_stats.loc['type'].value_counts()))
    table_stats['REJECTED'] = table_stats['CONST'] + table_stats['CORR']

    # Collect per-column value counts (categorical/constant/unique columns):
    freq_dict = {}
    for var in variable_stats:
        if "value_counts" in variable_stats[var] and variable_stats[var]["value_counts"] is not np.nan:
            freq_dict[var] = variable_stats[var]["value_counts"]
    try:
        variable_stats = variable_stats.drop("value_counts")
    except (ValueError, KeyError):
        pass

    return table_stats, variable_stats.T, freq_dict
import numpy as np

SKEWNESS_CUTOFF = 20
DEFAULT_FLOAT_FORMATTER = u'spark_df_profiling.__default_float_formatter'


def gradient_format(value, limit1, limit2, c1, c2):
    def LerpColour(c1,c2,t):
        return (int(c1[0]+(c2[0]-c1[0])*t),int(c1[1]+(c2[1]-c1[1])*t),int(c1[2]+(c2[2]-c1[2])*t))
    c = LerpColour(c1, c2, (value-limit1)/(limit2-limit1))
    return fmt_color(value,"rgb{}".format(str(c)))


def fmt_color(text, color):
    return(u'<span style="color:{color}">{text}</span>'.format(color=color,text=str(text)))


def fmt_class(text, cls):
    return(u'<span class="{cls}">{text}</span>'.format(cls=cls,text=str(text)))


def fmt_bytesize(num, suffix='B'):
    """Format a byte count as a human-readable string (e.g. 1536 -> '1.5 KiB')."""
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            return "%3.1f %s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f %s%s" % (num, 'Yi', suffix)


def fmt_percent(v):
    return "{:2.1f}%".format(v * 100)

def fmt_varname(v):
    return u'<code>{0}</code>'.format(v)


value_formatters={
        u'freq': (lambda v: gradient_format(v, 0, 62000, (30, 198, 244), (99, 200, 72))),
        u'p_missing': fmt_percent,
        u'p_infinite': fmt_percent,
        u'p_unique': fmt_percent,
        u'p_zeros': fmt_percent,
        u'memorysize': fmt_bytesize,
        u'total_missing': fmt_percent,
        DEFAULT_FLOAT_FORMATTER: lambda v: str(float('{:.5g}'.format(v))).rstrip('0').rstrip('.'),
        u'correlation_var': lambda v: fmt_varname(v),
        u'unparsed_json_types': lambda v: ', '.join([s.__name__ for s in v])
        }

def fmt_row_severity(v):
    if np.isnan(v) or v <= 0.01:
        return "ignore"
    else:
        return "alert"

def fmt_skewness(v):
    if not np.isnan(v) and (v < -SKEWNESS_CUTOFF or v > SKEWNESS_CUTOFF):
        return "alert"
    else:
        return ""

row_formatters={
    u'p_zeros': fmt_row_severity,
    u'p_missing': fmt_row_severity,
    u'p_infinite': fmt_row_severity,
    u'n_duplicates': fmt_row_severity,
    u'skewness': fmt_skewness,
}
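
As a quick sanity check, the describe helper can be exercised on a tiny DataFrame before wiring it into the MLRun job below. This is a minimal sketch assuming the helper cells above have been run and a Spark runtime is available locally; the column names and values are hypothetical:

# Smoke test for describe() on toy data (hypothetical values).
spark = SparkSession.builder.appName("describe-smoke-test").getOrCreate()
toy_df = spark.createDataFrame(
    [(1, 2.0, "a"), (2, 0.5, "b"), (3, 7.5, "a"), (4, 1.0, None)],
    ["id", "score", "label"])
# corr_reject=None skips the pairwise correlation matrix for brevity
table_stats, variable_stats, freq = describe(toy_df, bins=10,
                                             corr_reject=None, config={})
print(table_stats["n"], table_stats["nvar"])  # -> 4 3
spark.stop()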

Build Spark Describe Function#

run(["/bin/bash", "/etc/config/v3io/v3io-spark-operator.sh"])

def describe_spark(context: MLClientCtx,
                   dataset: DataItem,
                   bins: int = 30,
                   describe_extended: bool = True) -> None:
    """
    Generate a profile report from an Apache Spark DataFrame.
    Based on pandas_profiling, but for Spark DataFrames instead of pandas ones.
    For each column, the following statistics are presented when relevant for the column type:

    Essentials: type, unique values, missing values
    Quantile statistics: minimum value, Q1, median, Q3, maximum, range, interquartile range
    Descriptive statistics: mean, mode, standard deviation, sum, median absolute deviation, coefficient of variation, kurtosis, skewness
    Most frequent values: for categorical data

    :param context:               Function context.
    :param dataset:               Raw data file (currently needs to be a local file located in v3io://User/bigdata)
    :param bins:                  Number of bins in the histograms
    :param describe_extended:     (True) set to False to get only Spark's simple .describe() output
    """
    
    # get file location
    location = dataset.local()
    
    # build spark session
    spark = SparkSession.builder.appName("Spark job").getOrCreate()
    
    # read the csv into a Spark DataFrame
    df = spark.read.csv(location, header=True, inferSchema=True)

    # Passed as the (currently unused) config argument of describe()
    kwargs = []

    # keep only float/double columns; passed as the corr_reject argument of describe()
    float_cols = [item[0] for item in df.dtypes if item[1].startswith('float') or item[1].startswith('double')]
    
    if describe_extended:
        
        # run describe function
        table, variables, freq = describe(df, bins, float_cols, kwargs)

        # get summary table
        tbl_1 = variables.reset_index()

        # prep report 
        if len(freq) != 0:
            tbl_2 = pd.DataFrame.from_dict(freq, orient="index").sort_index().stack().reset_index()
            tbl_2.columns = ['col', 'key', 'val']
            tbl_2['Merged'] = [{key: val} for key, val in zip(tbl_2.key, tbl_2.val)]
            tbl_2 = tbl_2.groupby('col', as_index=False).agg(lambda x: tuple(x))[['col','Merged']]

            # get summary
            summary = pd.merge(tbl_1, tbl_2, how='left', left_on='index', right_on='col')

        else:
            summary = tbl_1

        # log final report
        context.log_dataset("summary_stats", 
                            df=summary,
                            format="csv", index=False,
                            artifact_path=context.artifact_subpath('data'))

        # log overview
        context.log_results(table)
    
    else:
        # run simple describe and save to pandas
        tbl_1 = df.describe().toPandas()
        
        # save final report and transpose 
        summary = tbl_1.T
        
        # log final report
        context.log_dataset("summary_stats", 
                            df=summary,
                            format="csv", index=False,
                            artifact_path=context.artifact_subpath('data'))
    
    # stop spark session
    spark.stop()
# nuclio: end-code

Save and Config#

fn = mlrun.code_to_function(handler="describe_spark", code_output=".")
fn.apply(mount_v3io())
fn.apply(mount_v3iod(namespace="default-tenant", v3io_config_configmap="spark-operator-v3io-config"))
fn.spec.image_pull_policy = "IfNotPresent"
fn.export("function.yaml")
> 2020-10-28 13:54:47,724 [info] function spec saved to path: function.yaml
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fb4beb8fa50>

Set Environment#

artifact_path = mlrun.set_environment(api_path = 'http://mlrun-api:8080',
                                      artifact_path = os.path.abspath('./'))
> 2020-10-28 13:54:47,738 [warning] warning!, server (0.5.3-rc1) and client (0.5.2) ver dont match

Run and Save Outputs#

run_res = fn.run(inputs={"dataset": "iris_dataset.csv"},
                 artifact_path=artifact_path, watch=True)
> 2020-10-28 13:54:47,762 [warning] warning!, server (0.5.3-rc1) and client (0.5.2) ver dont match
> 2020-10-28 13:54:47,763 [info] starting run describe-spark-describe_spark uid=a9cc8a2b48ce42d180e490043091da52  -> http://mlrun-api:8080
> 2020-10-28 13:54:48,205 [info] Job is running in the background, pod: describe-spark-describe-spark-pxzxc
project: default | iter: 0 | start: Oct 28 13:54:47 | state: running | name: describe-spark-describe_spark
labels: v3io_user=admin, kind=job, owner=admin
inputs: dataset
to track results use .show() or .logs() or in CLI:
!mlrun get run a9cc8a2b48ce42d180e490043091da52 --project default , !mlrun logs a9cc8a2b48ce42d180e490043091da52 --project default
> 2020-10-28 13:54:48,285 [info] run executed, status=running
run_res.show()
> 2020-10-28 13:55:36,021 [warning] warning!, server (0.5.3-rc1) and client (0.5.2) ver dont match
project: default | iter: 0 | start: Oct 28 13:54:55 | state: completed | name: describe-spark-describe_spark
labels: v3io_user=admin, kind=job, owner=admin, host=describe-spark-describe-spark-pxzxc
inputs: dataset
results: n=150, nvar=5, total_missing=0.0, memsize=0.0 YiB, recordsize=0.0 YiB, NUM=5, DATE=0, CONST=0, CAT=0, UNIQUE=0, CORR=0, REJECTED=0
artifacts: summary_stats
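
Once the run completes, the overview metrics and the summary_stats artifact can be read back from the run object. A minimal sketch; run_res.outputs maps result keys and artifact keys to values and target paths, and mlrun.get_dataitem is assumed to be available in the installed MLRun version (artifact-retrieval APIs vary across releases):

# Read back logged results and the summary table (API availability assumed).
print(run_res.outputs["n"], run_res.outputs["nvar"])  # -> 150 5
summary = mlrun.get_dataitem(run_res.outputs["summary_stats"]).as_df()
summary.head()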