# Copyright 2019 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Generated by nuclio.export.NuclioExporter

import mlrun
from mlrun.platforms.iguazio import mount_v3io, mount_v3iod
from mlrun.datastore import DataItem
from mlrun.execution import MLClientCtx

import os
from subprocess import run
import pandas as pd
import numpy as np

from pyspark.sql.types import LongType
from pyspark.sql import SparkSession

import sys
import base64 as b64
import warnings
warnings.filterwarnings("ignore")

from itertools import product
import matplotlib

import numpy as np
import json
import pandas as pd
from matplotlib import pyplot as plt
from pkg_resources import resource_filename
import six
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import (abs as df_abs, col, count, countDistinct,
                                   max as df_max, mean, min as df_min,
                                   sum as df_sum, when
                                   )
from pyspark.sql.functions import variance, stddev, kurtosis, skewness


def describe(df, bins, corr_reject, config, **kwargs):
    if not isinstance(df, SparkDataFrame):
        raise TypeError("df must be of type pyspark.sql.DataFrame")

    table_stats = {"n": df.count()}
    if table_stats["n"] == 0:
        raise ValueError("df cannot be empty")

    try:
        matplotlib.style.use("default")
    except:
        pass

    def pretty_name(x):
        x *= 100
        if x == int(x):
            return '%.0f%%' % x
        else:
            return '%.1f%%' % x

    def corr_matrix(df, columns=None):
        if columns is None:
            columns = df.columns
        combinations = list(product(columns,columns))

        def separate(l, n):
            for i in range(0, len(l), n):
                yield l[i:i+n]

        grouped = list(separate(combinations,len(columns)))
        df_cleaned = df.select(*columns).na.drop(how="any")

        for i in grouped:
            for j in enumerate(i):
                i[j[0]] = i[j[0]] + (df_cleaned.corr(str(j[1][0]), str(j[1][1])),)

        df_pandas = pd.DataFrame(grouped).applymap(lambda x: x[2])
        df_pandas.columns = columns
        df_pandas.index = columns
        
        return df_pandas

    def create_hist_data(df, column, minim, maxim, bins=10):

        def create_all_conditions(current_col, column, left_edges, count=1):
            """
            Recursive function that exploits the
            ability to call the Spark SQL Column method
            .when() in a recursive way.
            """
            left_edges = left_edges[:]
            if len(left_edges) == 0:
                return current_col
            if len(left_edges) == 1:
                next_col = current_col.when(col(column) >= float(left_edges[0]), count)
                left_edges.pop(0)
                return create_all_conditions(next_col, column, left_edges[:], count+1)
            next_col = current_col.when((float(left_edges[0]) <= col(column))
                                        & (col(column) < float(left_edges[1])), count)
            left_edges.pop(0)
            return create_all_conditions(next_col, column, left_edges[:], count+1)

        num_range = maxim - minim
        bin_width = num_range / float(bins)
        left_edges = [minim]
        for _bin in range(bins):
            left_edges = left_edges + [left_edges[-1] + bin_width]
        left_edges.pop()
        expression_col = when((float(left_edges[0]) <= col(column))
                              & (col(column) < float(left_edges[1])), 0)
        left_edges_copy = left_edges[:]
        left_edges_copy.pop(0)
        bin_data = (df.select(col(column))
                    .na.drop()
                    .select(col(column),
                            create_all_conditions(expression_col,
                                                  column,
                                                  left_edges_copy
                                                 ).alias("bin_id")
                           )
                    .groupBy("bin_id").count()
                   ).toPandas()

        bin_data.index = bin_data["bin_id"]
        new_index = list(range(bins))
        bin_data = bin_data.reindex(new_index)
        bin_data["bin_id"] = bin_data.index
        bin_data = bin_data.fillna(0)

        bin_data["left_edge"] = left_edges
        bin_data["width"] = bin_width
        

        return bin_data


    def describe_integer_1d(df, column, current_result, nrows):
        
        stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
                                                       df_min(col(column)).alias("min"),
                                                       df_max(col(column)).alias("max"),
                                                       variance(col(column)).alias("variance"),
                                                       kurtosis(col(column)).alias("kurtosis"),
                                                       stddev(col(column)).alias("std"),
                                                       skewness(col(column)).alias("skewness"),
                                                       df_sum(col(column)).alias("sum")
                                                       ).toPandas()


        for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
            stats_df[pretty_name(x)] = (df.select(column)
                                        .na.drop()
                                        .selectExpr("percentile(`{col}`,CAST({n} AS DOUBLE))"
                                                    .format(col=column, n=x)).toPandas().iloc[:,0]
                                        )
        stats = stats_df.iloc[0].copy()
        stats.name = column
        stats["range"] = stats["max"] - stats["min"]
        stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
        stats["cv"] = stats["std"] / float(stats["mean"])
        stats["mad"] = (df.select(column)
                        .na.drop()
                        .select(df_abs(col(column)-stats["mean"]).alias("delta"))
                        .agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
        stats["type"] = "NUM"
        stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
        stats['p_zeros'] = stats['n_zeros'] / float(nrows)

        hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)

        return stats

    def describe_float_1d(df, column, current_result, nrows):
        stats_df = df.select(column).na.drop().agg(mean(col(column)).alias("mean"),
                                                       df_min(col(column)).alias("min"),
                                                       df_max(col(column)).alias("max"),
                                                       variance(col(column)).alias("variance"),
                                                       kurtosis(col(column)).alias("kurtosis"),
                                                       stddev(col(column)).alias("std"),
                                                       skewness(col(column)).alias("skewness"),
                                                       df_sum(col(column)).alias("sum")
                                                       ).toPandas()

        for x in np.array([0.05, 0.25, 0.5, 0.75, 0.95]):
            stats_df[pretty_name(x)] = (df.select(column)
                                        .na.drop()
                                        .selectExpr("percentile_approx(`{col}`,CAST({n} AS DOUBLE))"
                                                    .format(col=column, n=x)).toPandas().iloc[:,0]
                                        )
        stats = stats_df.iloc[0].copy()
        stats.name = column
        stats["range"] = stats["max"] - stats["min"]
        stats["iqr"] = stats[pretty_name(0.75)] - stats[pretty_name(0.25)]
        stats["cv"] = stats["std"] / float(stats["mean"])
        stats["mad"] = (df.select(column)
                        .na.drop()
                        .select(df_abs(col(column)-stats["mean"]).alias("delta"))
                        .agg(df_sum(col("delta"))).toPandas().iloc[0,0] / float(current_result["count"]))
        stats["type"] = "NUM"
        stats['n_zeros'] = df.select(column).where(col(column)==0.0).count()
        stats['p_zeros'] = stats['n_zeros'] / float(nrows)

        hist_data = create_hist_data(df, column, stats["min"], stats["max"], bins)

        return stats

    def describe_date_1d(df, column):
        stats_df = df.select(column).na.drop().agg(df_min(col(column)).alias("min"),
                                                   df_max(col(column)).alias("max")
                                                  ).toPandas()
        stats = stats_df.iloc[0].copy()
        stats.name = column

        if isinstance(stats["max"], pd.Timestamp):
            stats = stats.astype(object)
            stats["max"] = str(stats["max"].to_pydatetime())
            stats["min"] = str(stats["min"].to_pydatetime())

        else:
            stats["range"] = stats["max"] - stats["min"]
        stats["type"] = "DATE"
        return stats

    def guess_json_type(string_value):
        try:
            obj = json.loads(string_value)
        except:
            return None

        return type(obj)

    def describe_categorical_1d(df, column):
        value_counts = (df.select(column).na.drop()
                        .groupBy(column)
                        .agg(count(col(column)))
                        .orderBy("count({c})".format(c=column),ascending=False)
                       ).cache()

        stats = (value_counts
                 .limit(1)
                 .withColumnRenamed(column, "top")
                 .withColumnRenamed("count({c})".format(c=column), "freq")
                ).toPandas().iloc[0]

        top_50 = value_counts.limit(50).toPandas().sort_values("count({c})".format(c=column),
                                                               ascending=False)
        top_50_categories = top_50[column].values.tolist()

        others_count = pd.Series([df.select(column).na.drop()
                        .where(~(col(column).isin(*top_50_categories)))
                        .count()
                        ], index=["***Other Values***"])
        others_distinct_count = pd.Series([value_counts
                                .where(~(col(column).isin(*top_50_categories)))
                                .count()
                                ], index=["***Other Values Distinct Count***"])

        top = top_50.set_index(column)["count({c})".format(c=column)]
        top = top.append(others_count)
        top = top.append(others_distinct_count)
        stats["value_counts"] = top
        stats["type"] = "CAT"
        value_counts.unpersist()
        unparsed_valid_jsons = df.select(column).na.drop().rdd.map(
            lambda x: guess_json_type(x[column])).filter(
            lambda x: x).distinct().collect()
        stats["unparsed_json_types"] = unparsed_valid_jsons
        return stats

    def describe_constant_1d(df, column):
        stats = pd.Series(['CONST'], index=['type'], name=column)
        stats["value_counts"] = (df.select(column)
                                 .na.drop()
                                 .limit(1)).toPandas().iloc[:,0].value_counts()
        return stats

    def describe_unique_1d(df, column):
        stats = pd.Series(['UNIQUE'], index=['type'], name=column)
        stats["value_counts"] = (df.select(column)
                                 .na.drop()
                                 .limit(50)).toPandas().iloc[:,0].value_counts()
        return stats

    def describe_1d(df, column, nrows, lookup_config=None):
        column_type = df.select(column).dtypes[0][1]
        if ("array" in column_type) or ("stuct" in column_type) or ("map" in column_type):
            raise NotImplementedError("Column {c} is of type {t} and cannot be analyzed".format(c=column, t=column_type))

        distinct_count = df.select(column).agg(countDistinct(col(column)).alias("distinct_count")).toPandas()
        non_nan_count = df.select(column).na.drop().select(count(col(column)).alias("count")).toPandas()
        results_data = pd.concat([distinct_count, non_nan_count],axis=1)
        results_data["p_unique"] = results_data["distinct_count"] / float(results_data["count"])
        results_data["is_unique"] = results_data["distinct_count"] == nrows
        results_data["n_missing"] = nrows - results_data["count"]
        results_data["p_missing"] = results_data["n_missing"] / float(nrows)
        results_data["p_infinite"] = 0
        results_data["n_infinite"] = 0
        result = results_data.iloc[0].copy()
        result["memorysize"] = 0
        result.name = column

        if result["distinct_count"] <= 1:
            result = result.append(describe_constant_1d(df, column))
        elif column_type in {"tinyint", "smallint", "int", "bigint"}:
            result = result.append(describe_integer_1d(df, column, result, nrows))
        elif column_type in {"float", "double", "decimal"}:
            result = result.append(describe_float_1d(df, column, result, nrows))
        elif column_type in {"date", "timestamp"}:
            result = result.append(describe_date_1d(df, column))
        elif result["is_unique"] == True:
            result = result.append(describe_unique_1d(df, column))
        else:
            result = result.append(describe_categorical_1d(df, column))
            if result["n_missing"] > 0:
                result["distinct_count"] = result["distinct_count"] + 1

        if (result["count"] > result["distinct_count"] > 1):
            try:
                result["mode"] = result["top"]
            except KeyError:
                result["mode"] = 0
        else:
            try:
                result["mode"] = result["value_counts"].index[0]
            except KeyError:
                result["mode"] = 0
            except IndexError:
                result["mode"] = "MISSING"

        if lookup_config:
            lookup_object = lookup_config['object']
            col_name_in_db = lookup_config['col_name_in_db'] if 'col_name_in_db' in lookup_config else None
            try:
                matched, unmatched = lookup_object.lookup(df.select(column), col_name_in_db)
                result['lookedup_values'] = str(matched.count()) + "/" + str(df.select(column).count())
            except:
                result['lookedup_values'] = 'FAILED'
        else:
            result['lookedup_values'] = ''

        return result


    ldesc = {}
    for colum in df.columns:
        if colum in config:
            if 'lookup' in config[colum]:
                lookup_config = config[colum]['lookup']
                desc = describe_1d(df, colum, table_stats["n"], lookup_config=lookup_config)
            else:
                desc = describe_1d(df, colum, table_stats["n"])
        else:
            desc = describe_1d(df, colum, table_stats["n"])
        ldesc.update({colum: desc})

    if corr_reject is not None:
        computable_corrs = [colum for colum in ldesc if ldesc[colum]["type"] in {"NUM"}]

        if len(computable_corrs) > 0:
            corr = corr_matrix(df, columns=computable_corrs)
            for x, corr_x in corr.iterrows():
                for y, corr in corr_x.iteritems():
                    if x == y:
                        break

    variable_stats = pd.DataFrame(ldesc)

    table_stats["nvar"] = len(df.columns)
    table_stats["total_missing"] = float(variable_stats.loc["n_missing"].sum()) / (table_stats["n"] * table_stats["nvar"])
    memsize = 0
    table_stats['memsize'] = fmt_bytesize(memsize)
    table_stats['recordsize'] = fmt_bytesize(memsize / table_stats['n'])
    table_stats.update({k: 0 for k in ("NUM", "DATE", "CONST", "CAT", "UNIQUE", "CORR")})
    table_stats.update(dict(variable_stats.loc['type'].value_counts()))
    table_stats['REJECTED'] = table_stats['CONST'] + table_stats['CORR']

    freq_dict = {}
    for var in variable_stats:
        if "value_counts" not in variable_stats[var]:
            pass
        elif not(variable_stats[var]["value_counts"] is np.nan):
            freq_dict[var] = variable_stats[var]["value_counts"]
        else:
            pass
    try:
        variable_stats = variable_stats.drop("value_counts")
    except (ValueError, KeyError):
        pass

    return table_stats, variable_stats.T, freq_dict

import numpy as np
from pyspark.sql.functions import abs as absou

SKEWNESS_CUTOFF = 20
DEFAULT_FLOAT_FORMATTER = u'spark_df_profiling.__default_float_formatter'


def gradient_format(value, limit1, limit2, c1, c2):
    def LerpColour(c1,c2,t):
        return (int(c1[0]+(c2[0]-c1[0])*t),int(c1[1]+(c2[1]-c1[1])*t),int(c1[2]+(c2[2]-c1[2])*t))
    c = LerpColour(c1, c2, (value-limit1)/(limit2-limit1))
    return fmt_color(value,"rgb{}".format(str(c)))


def fmt_color(text, color):
    return(u'{text}'.format(color=color,text=str(text)))


def fmt_class(text, cls):
    return(u'{text}'.format(cls=cls,text=str(text)))


def fmt_bytesize(num, suffix='B'):
    for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
        if num < 0:
            num = num*-1
            if num < 1024.0:
                return "%3.1f %s%s" % (num, unit, suffix)
            num /= 1024.0
    return "%.1f %s%s" % (num, 'Yi', suffix)


def fmt_percent(v):
    return  "{:2.1f}%".format(v*100)

def fmt_varname(v):
    return u'{0}'.format(v)


value_formatters={
        u'freq': (lambda v: gradient_format(v, 0, 62000, (30, 198, 244), (99, 200, 72))),
        u'p_missing': fmt_percent,
        u'p_infinite': fmt_percent,
        u'p_unique': fmt_percent,
        u'p_zeros': fmt_percent,
        u'memorysize': fmt_bytesize,
        u'total_missing': fmt_percent,
        DEFAULT_FLOAT_FORMATTER: lambda v: str(float('{:.5g}'.format(v))).rstrip('0').rstrip('.'),
        u'correlation_var': lambda v: fmt_varname(v),
        u'unparsed_json_types': lambda v: ', '.join([s.__name__ for s in v])
        }

def fmt_row_severity(v):
    if np.isnan(v) or v<= 0.01:
        return "ignore"
    else:
        return "alert"

def fmt_skewness(v):
    if not np.isnan(v) and (v<-SKEWNESS_CUTOFF or v> SKEWNESS_CUTOFF):
        return "alert"
    else:
        return ""

row_formatters={
    u'p_zeros': fmt_row_severity,
    u'p_missing': fmt_row_severity,
    u'p_infinite': fmt_row_severity,
    u'n_duplicates': fmt_row_severity,
    u'skewness': fmt_skewness,
}

run(["/bin/bash", "/etc/config/v3io/v3io-spark-operator.sh"])

def describe_spark(context: MLClientCtx, 
                   dataset: DataItem, 
                   artifact_path,
                   bins: int=30,
                   describe_extended: bool=True):
    
    location = dataset.local()
    
    spark = SparkSession.builder.appName("Spark job").getOrCreate()
    
    df = spark.read.csv(location, header=True, inferSchema= True)

    kwargs = []
    
    float_cols = [item[0] for item in df.dtypes if item[1].startswith('float') or item[1].startswith('double')]
    
    if describe_extended == True:
        
        table, variables, freq = describe(df, bins, float_cols, kwargs)

        tbl_1 = variables.reset_index()

        if len(freq) != 0:
            tbl_2 = pd.DataFrame.from_dict(freq, orient = "index").sort_index().stack().reset_index()
            tbl_2.columns = ['col', 'key', 'val']
            tbl_2['Merged'] = [{key: val} for key, val in zip(tbl_2.key, tbl_2.val)]
            tbl_2 = tbl_2.groupby('col', as_index=False).agg(lambda x: tuple(x))[['col','Merged']]

            summary = pd.merge(tbl_1, tbl_2, how='left', left_on='index', right_on='col')

        else:
            summary = tbl_1

        context.log_dataset("summary_stats", 
                            df=summary,
                            format="csv", index=False,
                            artifact_path=context.artifact_subpath('data'))

        context.log_results(table)
    
    else:
        tbl_1 = df.describe().toPandas()
        
        summary = tbl_1.T
        
        context.log_dataset("summary_stats", 
                            df=summary,
                            format="csv", index=False,
                            artifact_path=context.artifact_subpath('data'))
    
    spark.stop()