Batch Inference V2

A function for running inference on a given input through a given model, producing a Result Set and performing Data Drift Analysis.

In this notebook we will go over the function’s docs and outputs and see an end-to-end example of running it.

  1. Documentation

  2. Results Prediction

  3. Data Drift Analysis

  4. End-to-end Demo

1. Documentation

Perform a prediction on a given dataset with the given model. The function can also perform drift analysis between the sample set statistics stored in the model and the current input data. The drift rule is the per-feature mean of the TVD and Hellinger scores, compared against the thresholds configured here. When performing drift analysis, this function either creates a new model endpoint record or updates an existing one (depending on the provided endpoint_id).

At the moment, this function is supported only for mlrun>=1.5.0.

1.1. Parameters:

1.1.1. Batch Infer Parameters:

  • context: mlrun.MLClientCtx

    An MLRun context.

  • dataset: Union[mlrun.DataItem, list, dict, pd.DataFrame, pd.Series, np.ndarray]

    The dataset to infer through the model.

    • Can be passed in inputs as a dataset artifact / Feature vector URI.

    • Or, in parameters as a list, dictionary or numpy array.

  • model_path: Union[str, mlrun.DataItem]

    Model store URI (should start with store://). Provided as an input (DataItem) or as a string parameter. To generate a valid model store URI, log the model before running this function. If the endpoint_id of an existing model endpoint is provided, make sure it has the same model store path; otherwise, the drift analysis won’t be triggered.

  • drop_columns: Union[str, List[str], int, List[int]] = None

    A string / integer or a list of strings / integers that represent the column names / indices to drop. When the dataset is a list or a numpy array, this parameter must be given as integers.

  • label_columns: Union[str, List[str]] = None

    The target label(s) of the column(s) in the dataset. These names will be used as the column names for the predictions. The label columns can also be taken from the model object, or from the provided feature vector if available. The default name is "predicted_label_i" for the i-th column.

  • feature_columns: Union[str, List[str]] = None

    List of feature columns that will be used to build the DataFrame when the dataset is a list or a numpy array.

  • log_result_set: bool = True

    Whether to log the result set, a DataFrame of the given inputs concatenated with the predictions. Defaults to True.

  • result_set_name: str = "prediction"

    The DB key used as the name of the prediction result artifact and as its filename. Defaults to "prediction".

  • batch_id: str = None

    The ID of the given batch (inference dataset). If None, it will be generated. Will be logged as a result of the run.

  • artifacts_tag: str = ""

    Tag to use for all the artifacts produced by the function. Defaults to no tag.
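To make the list / numpy handling of drop_columns and feature_columns concrete, here is a minimal pandas sketch. The helper to_dataframe is hypothetical and not part of MLRun; it assumes integer drop_columns are removed first and feature_columns then names the remaining columns, which may differ from the function's actual order of operations.

```python
from typing import List, Optional, Union

import numpy as np
import pandas as pd


def to_dataframe(
    dataset: Union[list, np.ndarray, pd.DataFrame],
    feature_columns: Optional[List[str]] = None,
    drop_columns: Optional[Union[str, List[str], int, List[int]]] = None,
) -> pd.DataFrame:
    """Hypothetical helper illustrating the parameters above (not MLRun's code)."""
    if isinstance(dataset, pd.DataFrame):
        # A DataFrame already has column names, so drop_columns may be names.
        return dataset.drop(columns=drop_columns) if drop_columns is not None else dataset.copy()

    array = np.asarray(dataset)
    if drop_columns is not None:
        indices = [drop_columns] if isinstance(drop_columns, (int, str)) else list(drop_columns)
        # Lists and numpy arrays have no column names, so only integer indices work.
        if not all(isinstance(i, (int, np.integer)) for i in indices):
            raise ValueError("drop_columns must be integers for list / numpy datasets")
        array = np.delete(array, indices, axis=1)

    # Assumption: feature_columns names the columns that remain after dropping.
    return pd.DataFrame(array, columns=feature_columns)


df = to_dataframe([[1, 2, 3], [4, 5, 6]], feature_columns=["a", "c"], drop_columns=1)
print(df)
```

Here column index 1 is dropped from the raw rows and the two remaining columns are named "a" and "c".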

1.1.2. Drift Analysis Parameters:

  • perform_drift_analysis: bool = None

    Whether to perform drift analysis between the sample set of the model object and the given dataset. Defaults to None, which means drift analysis is performed if the model already has feature stats that serve as a reference sample set. Performing drift analysis on a new endpoint ID will generate a new model endpoint record. Note that to trigger the drift analysis job, you need to set trigger_monitoring_job=True. Otherwise, the drift analysis will be triggered only as part of the scheduled monitoring job (if one exists in the current project) or when triggered manually by the user.

  • trigger_monitoring_job: bool = False

    Whether to trigger the batch drift analysis after the infer job.

  • batch_image_job: str = "mlrun/mlrun"

    The image that will be used for the monitoring batch job analysis. Defaults to mlrun/mlrun.

  • endpoint_id: str = ""

    Model endpoint unique ID. If perform_drift_analysis was set, the endpoint_id will be used either to perform the analysis on an existing model endpoint or to generate a new model endpoint record.
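The tri-state default of perform_drift_analysis and its interplay with trigger_monitoring_job can be summarized in a few lines of Python. Both helpers are hypothetical and written only from the descriptions above; in particular, the behavior when perform_drift_analysis=True but the model has no feature stats is not specified here, and this sketch simply returns True.

```python
from typing import Optional


def should_perform_drift_analysis(
    perform_drift_analysis: Optional[bool], model_has_feature_stats: bool
) -> bool:
    """Hypothetical: resolve the documented None default of perform_drift_analysis."""
    if perform_drift_analysis is None:
        # Default: analyze only when the model carries a reference sample set.
        return model_has_feature_stats
    return perform_drift_analysis


def drift_job_runs_now(perform: bool, trigger_monitoring_job: bool) -> bool:
    """Hypothetical: without trigger_monitoring_job=True the analysis waits for a
    scheduled monitoring job or a manual trigger."""
    return perform and trigger_monitoring_job


print(should_perform_drift_analysis(None, model_has_feature_stats=True))
print(drift_job_runs_now(True, trigger_monitoring_job=False))
```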

1.1.3. New Model Endpoint Parameters:

  • model_endpoint_name: str = "batch-infer"

    If a new model endpoint is generated, the model will be presented under this endpoint name.

  • model_endpoint_drift_threshold: float = 0.7

    The threshold at which to mark drifts. Defaults to 0.7.

  • model_endpoint_possible_drift_threshold: float = 0.5

    The threshold at which to mark possible drifts. Defaults to 0.5.

  • model_endpoint_sample_set: Union[mlrun.DataItem, list, dict, pd.DataFrame, pd.Series, np.ndarray] = None

    A sample dataset to compare the inputs against in the drift analysis. Can be provided as an input or as a parameter. By default, the sample set stored in the model artifact itself is used.
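The two thresholds split the drift measure into three bands. A minimal sketch, assuming a value equal to a threshold counts as crossing it. "DRIFT_DETECTED" appears in the demo's monitoring log; the other two status names are assumptions.

```python
def drift_status(
    drift_measure: float,
    drift_threshold: float = 0.7,
    possible_drift_threshold: float = 0.5,
) -> str:
    """Hypothetical three-band classification of a drift measure."""
    if drift_measure >= drift_threshold:
        return "DRIFT_DETECTED"
    if drift_measure >= possible_drift_threshold:
        return "POSSIBLE_DRIFT"  # assumed name
    return "NO_DRIFT"  # assumed name


# The demo's thresholds (0.2 / 0.1) and its reported drift measure:
print(drift_status(0.4478631590778279, 0.2, 0.1))
```

With the demo's thresholds and drift measure this yields "DRIFT_DETECTED", which agrees with the status the monitoring job logs for that run.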

1.2. Outputs

The outputs are split between the two actions the function can perform:

  • Results Prediction - Will log:

    • A dataset artifact named by the result_set_name parameter.

    • A str result named "batch_id" of the given / generated batch ID.

  • Data Drift Analysis - Will log:

    • A plotly artifact named "drift_table_plot" with a visualization of the drift results and histograms.

    • A JSON artifact named "features_drift_results" with all the features’ metric values.

    • A bool result named "drift_status" of the overall drift status (True if there was a drift and False otherwise).

    • A float result named "drift_metric" of the overall drift metric score.

For more details, see the next chapters.

2. Results Prediction

The result set is a concatenated dataset of the inputs ($X$) provided and the predictions ($Y$) yielded by the model, so it will be $X | Y$.

For example, if the dataset given as inputs was:

| x1 | x2 | x3 | x4 | x5 |

And the outputs yielded by the model’s prediction were:

| y1 | y2 |

Then the result set will be:

| x1 | x2 | x3 | x4 | x5 | y1 | y2 |
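In pandas terms, $X | Y$ is a column-wise concatenation. A minimal sketch with toy values (not the function's actual code):

```python
import pandas as pd

# Toy inputs X (five feature columns) and predictions Y (two label columns),
# sharing the same row index.
x = pd.DataFrame([[0.1, 0.2, 0.3, 0.4, 0.5]], columns=["x1", "x2", "x3", "x4", "x5"])
y = pd.DataFrame([[0, 1]], columns=["y1", "y2"])

# axis=1 places each prediction row next to the input row with the same index label.
result_set = pd.concat([x, y], axis=1)
print(list(result_set.columns))
```

Because pd.concat aligns rows on the index, X and Y need the same index; otherwise rows are misaligned and padded with NaN.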

If the parameter log_result_set is True, the outputs of the results prediction will be:

  • The result set as described above.

  • The batch ID result - batch_id: str - a hash representing the batch that was inferred, either given by the user or generated randomly if not provided.

    {
        "batch_id": "884a0cb00d8ae16d132dd8259aac29aa78f50a9245d0e4bd58cfbf77",
    }
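The example ID above is 56 hexadecimal characters long, which is the length of a SHA-224 hex digest. A minimal sketch of generating such an ID, assuming (not confirmed here) that the random ID is a hash of random bytes; generate_batch_id is a hypothetical name:

```python
import hashlib
import secrets


def generate_batch_id() -> str:
    """Hypothetical: hash 32 random bytes into a 56-character SHA-224 hex string."""
    return hashlib.sha224(secrets.token_bytes(32)).hexdigest()


batch_id = generate_batch_id()
print(batch_id, len(batch_id))
```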
    

3. Data Drift Analysis

The data drift analysis is done per feature using two distance measure metrics for probability distributions.

Let us mark our sample set as $S$ and our inputs as $I$. We will look at one feature $x$ out of $n$ features. Assuming the histogram of feature $x$ is split into 20 bins $b_1,b_2,\dots,b_{20}$, we will match the histogram of feature $x$ in the inputs $I$ ($x_I$) into the same bins as the sample set histogram ($x_S$), where $x_S(b_i)$ and $x_I(b_i)$ denote the fraction of values falling into bin $b_i$, and compare their distributions using:

  • Total Variation Distance: $TVD(x_S,x_I) = \frac{1}{2}\sum_{i=1}^{20} |x_S(b_i) - x_I(b_i)|$

  • Hellinger Distance: $H(x_S,x_I) = \sqrt{1-\sum_{i=1}^{20}\sqrt{x_S(b_i) \cdot x_I(b_i)}}$

Our rule then calculates, for each feature $x$, the metric $\frac{H(x_S,x_I)+TVD(x_S,x_I)}{2}$ and checks whether it is below the given thresholds.
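The two distances and the rule metric can be computed with NumPy. This is a minimal sketch, not MLRun's implementation: the bin edges come from the sample set, and input values outside the sample range are clipped into the outermost bins, which is an assumption MLRun may not share. feature_drift is a hypothetical name.

```python
import numpy as np


def feature_drift(sample, inputs, bins: int = 20) -> dict:
    """TVD, Hellinger distance and their mean for one feature (hypothetical helper)."""
    sample = np.asarray(sample, dtype=float)
    inputs = np.asarray(inputs, dtype=float)

    # Bins b_1..b_20 are derived from the sample set and reused for the inputs.
    edges = np.histogram_bin_edges(sample, bins=bins)
    # Assumption: inputs outside the sample range fall into the outermost bins.
    clipped = np.clip(inputs, edges[0], edges[-1])

    # Normalized histograms x_S and x_I (each sums to 1).
    x_s = np.histogram(sample, bins=edges)[0] / sample.size
    x_i = np.histogram(clipped, bins=edges)[0] / inputs.size

    tvd = 0.5 * np.abs(x_s - x_i).sum()
    # Bhattacharyya coefficient; max() guards against tiny negative round-off.
    bc = np.sqrt(x_s * x_i).sum()
    hellinger = np.sqrt(max(0.0, 1.0 - bc))
    return {"tvd": float(tvd), "hellinger": float(hellinger), "metric": float((tvd + hellinger) / 2)}


rng = np.random.default_rng(0)
sample = rng.normal(size=2500)
same = feature_drift(sample, sample)            # identical data: both distances ~0
shifted = feature_drift(sample, sample + 3.0)   # shifted data: large distances
print(same)
print(shifted)
```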

The outputs of the analysis will be:

  • Drift table plot - The results are presented in a plotly table artifact named "drift_table_plot" that shows each feature’s statistics and its TVD, Hellinger and KLD (Kullback–Leibler divergence) results as follows:

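    | Feature | Count (Sample, Input) | Mean (Sample, Input) | Std (Sample, Input) | Min (Sample, Input) | Max (Sample, Input) | Tvd | Hellinger | Kld | Histograms |
    | x1 | | | | | | | | | |
    | x2 | | | | | | | | | |
    | x3 | | | | | | | | | |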

  • Features drift results - A dictionary of the rule metric per feature (Dict[str, float]), saved in a JSON file named "features_drift_results", where each key is a feature name and its value is that feature’s metric value:

    {
        "x1": 0.12,
        "x2": 0.345,
        "x3": 0.00678,
        ...
    }
    
  • In addition, two results are logged to summarize the drift analysis:

    • drift_status: bool - A boolean value indicating whether a drift was found.

    • drift_metric: float - The mean of all the features’ drift metric values (the rule above): for $n$ features and metric rule $M(x_S,x_I)=\frac{H(x_S,x_I)+TVD(x_S,x_I)}{2}$, drift_metric $=\frac{1}{n}\sum_{x}M(x_S,x_I)$, where the sum runs over all $n$ features $x$.

    {
        "drift_status": True,
        "drift_metric": 0.81234
    }
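Putting the pieces together, the per-feature dictionary and the two summary results can be derived from per-feature TVD and Hellinger scores. A minimal sketch with toy scores; summarize_drift is hypothetical, and it assumes drift_status is True once drift_metric reaches the drift threshold (how the possible-drift band maps to this boolean is not stated here).

```python
from typing import Dict, Tuple


def summarize_drift(
    per_feature: Dict[str, Dict[str, float]], drift_threshold: float
) -> Tuple[Dict[str, float], bool, float]:
    """Hypothetical helper deriving features_drift_results, drift_status and drift_metric."""
    # Rule metric per feature: M = (H + TVD) / 2.
    features_drift_results = {
        name: (scores["hellinger"] + scores["tvd"]) / 2 for name, scores in per_feature.items()
    }
    # drift_metric: the mean of M over all n features.
    drift_metric = sum(features_drift_results.values()) / len(features_drift_results)
    # Assumption: reaching the drift threshold marks a drift.
    drift_status = drift_metric >= drift_threshold
    return features_drift_results, drift_status, drift_metric


results, status, metric = summarize_drift(
    {"x1": {"tvd": 0.1, "hellinger": 0.3}, "x2": {"tvd": 0.5, "hellinger": 0.7}},
    drift_threshold=0.7,
)
print(results, status, metric)
```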
    

4. End-to-end Demo

We will see an end-to-end example that follows the steps below:

  1. Generate data.

  2. Train a model.

  3. Infer data through the model using the batch_inference_v2 function and review the outputs.

4.1. Code review

We are using a very simple example of training a decision tree on a binary classification problem. For that we wrote two functions:

  • generate_data - Generate binary classification data. The data will be split into a training set and a prediction set. Roughly half of the prediction set’s features will be drifted (each feature with probability 1/2) to showcase the drift plot later on.

  • train - Train a decision tree classifier on the given data.

import mlrun

# Create MLRun project
project_name = "batch-infer-demo"
project = mlrun.get_or_create_project(project_name, context="./")
> 2023-08-29 11:13:44,649 [warning] Failed resolving version info. Ignoring and using defaults
> 2023-08-29 11:13:46,598 [warning] Server or client version is unstable. Assuming compatible: {'server_version': '0.0.0+image-test', 'client_version': '0.0.0+unstable'}
> 2023-08-29 11:13:46,667 [info] Loading project from path: {'project_name': 'batch-infer-demo', 'path': './'}
> 2023-08-29 11:14:02,192 [info] Project loaded successfully: {'project_name': 'batch-infer-demo', 'path': './', 'stored_in_db': True}
# mlrun: start-code
# Load environment variables from an env file, if it exists
import os

# Specify path
path = "/tmp/examples_ci.env"

if os.path.exists(path):
    env_dict = mlrun.set_env_from_file(path, return_dict=True)
import numpy as np
import pandas as pd

from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier

from mlrun.frameworks.sklearn import apply_mlrun


def generate_data(n_samples: int = 5000, n_features: int = 20):
    # Generate classification data:
    x, y = make_classification(
        n_samples=n_samples, n_features=n_features, n_classes=2
    )

    # Split the data into a training set and a prediction set:
    x_train, x_prediction = x[: n_samples // 2], x[n_samples // 2 :]
    y_train = y[: n_samples // 2]
    
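    # Randomly drift roughly half of the features: each feature is shifted with
    # probability 1/2 by per-row uniform(2, 4) noise. Shapes follow x_prediction
    # so that an odd n_samples also works:
    x_prediction += (
        np.random.uniform(low=2, high=4, size=x_prediction.shape)
        * np.random.randint(low=0, high=2, size=x_prediction.shape[1], dtype=int)
    )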
    
    # Initialize dataframes:
    features = [f"feature_{i}" for i in range(n_features)]
    training_set = pd.DataFrame(data=x_train, columns=features)
    training_set.insert(
        loc=n_features, column="label", value=y_train, allow_duplicates=True
    )
    prediction_set = pd.DataFrame(data=x_prediction, columns=features)

    return training_set, prediction_set


def train(training_set: pd.DataFrame):
    # Get the data into x, y:
    labels = pd.DataFrame(training_set["label"])
    training_set.drop(columns=["label"], inplace=True)

    # Initialize a model:
    model = DecisionTreeClassifier()

    # Apply MLRun:
    apply_mlrun(model=model, model_name="model")

    # Train:
    model.fit(training_set, labels)
# mlrun: end-code
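Outside MLRun, the drift injection in generate_data can be checked on its own: a 0/1 mask is drawn once per feature, so each feature is drifted with probability 1/2, and drifted features receive a per-row shift drawn from uniform(2, 4). The standalone sketch below reproduces that logic; it uses NumPy's Generator API instead of the legacy np.random functions above, so its random draws differ from the notebook's.

```python
import numpy as np

rng = np.random.default_rng(42)
n_rows, n_features = 1000, 20
x = rng.normal(size=(n_rows, n_features))

# 0/1 mask per feature: 1 means the feature is drifted.
mask = rng.integers(low=0, high=2, size=n_features)
# Per-row shift in [2, 4), zeroed out for features that are not drifted.
shift = rng.uniform(low=2, high=4, size=(n_rows, n_features)) * mask
drifted = x + shift

mean_shift = (drifted - x).mean(axis=0)
print("drifted features:", np.flatnonzero(mask))
print("mean shift per feature:", mean_shift.round(2))
```

Drifted features show a mean shift close to 3, the midpoint of the uniform range, while the others are unchanged.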

4.2. Run the Example with MLRun

First, we will prepare our MLRun functions:

  1. We will use mlrun.code_to_function to turn this demo notebook into an MLRun function we can run.

  2. We will use mlrun.import_function to import the batch_inference_v2 function from the functions hub.

# Create an MLRun function to run the notebook:
demo_function = mlrun.code_to_function(name="batch-inference-demo", kind="job")

# Import the `batch_inference_v2` function from the functions hub:
batch_inference_function = mlrun.import_function("hub://batch_inference_v2")
# You can also import the function from a local function.yaml file:
# batch_inference_function = mlrun.import_function("function.yaml")
> 2023-08-29 11:14:04,182 [warning] Failed to add git metadata, ignore if path is not part of a git repo.: {'path': './', 'error': '/User/EYAL'}

Now, we will follow the demo steps as discussed above:

# 1. Generate data:
generate_data_run = demo_function.run(
    handler="generate_data",
    returns=["training_set : dataset", "prediction_set : dataset"],
)

# 2. Train a model:
train_run = demo_function.run(
    handler="train",
    inputs={"training_set": generate_data_run.outputs["training_set"]},
)

# 3. Perform batch prediction:
batch_inference_run = batch_inference_function.run(
    handler="infer",
    inputs={"dataset": generate_data_run.outputs["prediction_set"]},
    params={
        "model_path": train_run.outputs["model"],
        "label_columns": "label",
        "trigger_monitoring_job": True,
        "perform_drift_analysis": True,
        "model_endpoint_drift_threshold": 0.2,
        "model_endpoint_possible_drift_threshold": 0.1,
    },
)
> 2023-08-29 11:14:42,198 [warning] artifact/output path is not defined or is local and relative, artifacts will not be visible in the UI: {'output_path': './'}
> 2023-08-29 11:14:42,198 [info] Storing function: {'name': 'batch-inference-demo-generate-data', 'uid': 'd04e9f978132472695774f01b2becb6c', 'db': None}
project: batch-infer-demo | iter: 0 | start: Aug 29 11:14:42 | state: completed | name: batch-inference-demo-generate-data
labels: v3io_user=iguazio, kind=, owner=iguazio, host=jupyter-69ff7bc987-9fmj4
artifacts: training_set, prediction_set

> to track results use the .show() or .logs() methods or click here to open in UI
> 2023-08-29 11:14:44,943 [info] Run execution finished: {'status': 'completed', 'name': 'batch-inference-demo-generate-data'}
> 2023-08-29 11:14:44,945 [warning] artifact/output path is not defined or is local and relative, artifacts will not be visible in the UI: {'output_path': './'}
> 2023-08-29 11:14:44,946 [info] Storing function: {'name': 'batch-inference-demo-train', 'uid': 'c0a23b01a8ac4b36b78c6066780df84d', 'db': None}
project: batch-infer-demo | iter: 0 | start: Aug 29 11:14:45 | state: completed | name: batch-inference-demo-train
labels: v3io_user=iguazio, kind=, owner=iguazio, host=jupyter-69ff7bc987-9fmj4
inputs: training_set
artifacts: model

> to track results use the .show() or .logs() methods or click here to open in UI
> 2023-08-29 11:14:45,967 [info] Run execution finished: {'status': 'completed', 'name': 'batch-inference-demo-train'}
> 2023-08-29 11:14:45,971 [warning] artifact/output path is not defined or is local and relative, artifacts will not be visible in the UI: {'output_path': './'}
> 2023-08-29 11:14:45,971 [info] Storing function: {'name': 'batch-inference-demo-infer', 'uid': 'b4ff7a15a7594058ba3e5900546c8a4d', 'db': None}
> 2023-08-29 11:14:46,230 [info] Loading model...
> 2023-08-29 11:14:46,255 [info] Loading data...
> 2023-08-29 11:14:46,264 [info] Calculating prediction...
> 2023-08-29 11:14:46,268 [info] Logging result set (x | prediction)...
> 2023-08-29 11:14:46,560 [info] Performing drift analysis...
> 2023-08-29 11:14:48,232 [info] Storing function: {'name': 'model-monitoring-batch', 'uid': '54146baa72334a6aab0dc9d6ed3294b0', 'db': 'http://mlrun-api:8080'}
> 2023-08-29 11:14:48,480 [info] Job is running in the background, pod: model-monitoring-batch-klkxx
> 2023-08-29 11:15:03,762 [warning] Server or client version is unstable. Assuming compatible: {'server_version': '0.0.0+image-test', 'client_version': '0.0.0+image-test'}
> 2023-08-29 11:15:03,907 [info] Initializing BatchProcessor: {'project': 'batch-infer-demo'}
This will be deprecated in 1.3.0, and will be removed in 1.5.0
divide by zero encountered in log
> 2023-08-29 11:15:04,332 [info] Drift result: {'drift_result': defaultdict(<class 'dict'>, {'feature_5': {'tvd': 0.0434, 'hellinger': 0.05100089784945347, 'kld': 0.014494998206659872}, 'tvd_sum': 8.5102, 'tvd_mean': 0.405247619047619, 'hellinger_sum': 10.300052681268772, 'hellinger_mean': 0.49047869910803676, 'kld_sum': 62.6616191196174, 'kld_mean': 2.9838866247436857, 'feature_2': {'tvd': 0.02459999999999999, 'hellinger': 0.030899816438305312, 'kld': 0.006057604919573473}, 'feature_19': {'tvd': 0.7218000000000002, 'hellinger': 0.7922489156563376, 'kld': 5.537308189437186}, 'label': {'tvd': 0.24960000000000002, 'hellinger': 0.18384462140440389, 'kld': 0.27238393060366317}, 'feature_17': {'tvd': 0.647, 'hellinger': 0.798114529629571, 'kld': 4.721655573957096}, 'feature_16': {'tvd': 0.5, 'hellinger': 1.0, 'kld': 6.647696602384231}, 'feature_4': {'tvd': 0.652, 'hellinger': 0.7890180560465803, 'kld': 4.724331880103662}, 'feature_10': {'tvd': 0.6936, 'hellinger': 0.798649085236416, 'kld': 5.143133985824956}, 'feature_18': {'tvd': 0.029800000000000004, 'hellinger': 0.04016200720162751, 'kld': 0.010300448066673708}, 'feature_1': {'tvd': 0.6028, 'hellinger': 0.8034907163435347, 'kld': 4.375985111186623}, 'feature_0': {'tvd': 0.036800000000000006, 'hellinger': 0.038468796584326524, 'kld': 0.01082520912974786}, 'feature_12': {'tvd': 0.6426000000000001, 'hellinger': 0.7881935479078817, 'kld': 4.434165973058529}, 'feature_15': {'tvd': 0.05080000000000001, 'hellinger': 0.05278865255294484, 'kld': 0.02081902282461378}, 'feature_7': {'tvd': 0.6878, 'hellinger': 0.7901987054573444, 'kld': 5.043243793527386}, 'feature_3': {'tvd': 0.035600000000000014, 'hellinger': 0.04663589029042092, 'kld': 0.014301909461766606}, 'feature_14': {'tvd': 0.04439999999999999, 'hellinger': 0.06309191719567614, 'kld': 0.021311623053023344}, 'feature_9': {'tvd': 0.0424, 'hellinger': 0.05342675000294549, 'kld': 0.020325699772498724}, 'feature_8': {'tvd': 0.6944, 'hellinger': 0.7972137555552585, 'kld': 5.385980028089341}, 'feature_11': {'tvd': 0.7078, 'hellinger': 0.7934880907740085, 'kld': 5.3956608599070055}, 'feature_13': {'tvd': 0.6986, 'hellinger': 0.8067455940744085, 'kld': 5.650553956005931}, 'feature_6': {'tvd': 0.7043999999999999, 'hellinger': 0.7823723350673265, 'kld': 5.211082720097231}})}
> 2023-08-29 11:15:04,333 [info] Drift status: {'endpoint_id': '80daa6376087b987a2e8d97b4850772fee6ff503', 'drift_status': 'DRIFT_DETECTED', 'drift_measure': 0.4478631590778279}
> 2023-08-29 11:15:04,345 [warning] Could not write drift measures to TSDB: {'err': Error("cannot call API - write error: backend Write failed: failed to create adapter: No TSDB schema file found at 'v3io-webapi:8081/users/pipelines/batch-infer-demo/model-endpoints/events/'."), 'tsdb_path': 'pipelines/batch-infer-demo/model-endpoints/events/', 'endpoint': '80daa6376087b987a2e8d97b4850772fee6ff503'}
> 2023-08-29 11:15:04,345 [info] Done updating drift measures: {'endpoint_id': '80daa6376087b987a2e8d97b4850772fee6ff503'}
> 2023-08-29 11:15:04,451 [info] Run execution finished: {'status': 'completed', 'name': 'model-monitoring-batch'}
project: batch-infer-demo | iter: 0 | start: Aug 29 11:15:03 | state: completed | name: model-monitoring-batch
labels: v3io_user=iguazio, kind=job, owner=iguazio, mlrun/client_version=0.0.0+unstable, mlrun/client_python_version=3.9.16, host=model-monitoring-batch-klkxx
parameters:
  model_endpoints=['80daa6376087b987a2e8d97b4850772fee6ff503']
  batch_intervals_dict={'minutes': 0, 'hours': 1, 'days': 0}

> to track results use the .show() or .logs() methods or click here to open in UI
> 2023-08-29 11:15:06,630 [info] Run execution finished: {'status': 'completed', 'name': 'model-monitoring-batch'}
/User/.pythonlibs/mlrun-base/lib/python3.9/site-packages/mlrun/model_monitoring/features_drift_table.py:359: RuntimeWarning:

invalid value encountered in true_divide
project: batch-infer-demo | iter: 0 | start: Aug 29 11:14:46 | state: completed | name: batch-inference-demo-infer
labels: v3io_user=iguazio, kind=, owner=iguazio, host=jupyter-69ff7bc987-9fmj4
inputs: dataset
parameters:
  model_path=store://artifacts/batch-infer-demo/model:c0a23b01a8ac4b36b78c6066780df84d
  label_columns=label
  trigger_monitoring_job=True
  perform_drift_analysis=True
  model_endpoint_drift_threshold=0.2
  model_endpoint_possible_drift_threshold=0.1
  batch_image_job=eyaligu/mlrun-api:image-test
results:
  batch_id=3d5f6aa8a2d63cc0e84ebd95a0bc0000979a0989bbf4c211651a4e2a
  drift_status=True
  drift_metric=0.4478631590778279
artifacts: prediction, drift_table_plot, features_drift_results

> to track results use the .show() or .logs() methods or click here to open in UI
> 2023-08-29 11:15:08,835 [info] Run execution finished: {'status': 'completed', 'name': 'batch-inference-demo-infer'}

4.3. Review Outputs

We will review the outputs as explained in the notebook above.

4.3.1. Results Prediction

First, we will showcase the result set. As we didn’t pass any name, its default name is "prediction":

batch_inference_run.artifact("prediction").as_df()
feature_0 feature_1 feature_2 feature_3 feature_4 feature_5 feature_6 feature_7 feature_8 feature_9 ... feature_11 feature_12 feature_13 feature_14 feature_15 feature_16 feature_17 feature_18 feature_19 label
0 -0.839322 1.333380 0.096353 -1.406629 2.400648 -1.290373 3.977797 3.313780 5.283433 -1.052645 ... 2.453055 3.461347 2.907938 -0.101736 0.525861 3.303174 2.662573 -0.156935 3.249469 0
1 -1.396684 2.160555 -2.079091 -0.202079 2.884480 2.034316 4.885637 4.221477 2.573911 0.180173 ... 4.962888 1.372881 2.562159 0.662939 -0.805594 3.533896 3.790616 -0.154599 4.930846 0
2 1.040872 5.255454 -0.147940 0.294196 0.597649 -1.235295 4.313004 5.238071 5.605235 0.012568 ... 2.174434 3.278299 3.462563 -0.409624 0.322454 4.029577 2.600332 -1.107307 2.887775 0
3 0.997923 2.140354 -0.010764 0.354853 1.915891 -1.041988 2.193550 3.467318 2.996250 0.242412 ... 3.236173 2.916559 4.634331 -0.348694 -2.413849 2.261911 1.594445 -0.301463 2.758082 0
4 0.201835 2.702231 0.350186 1.007252 3.745162 1.618354 5.198454 4.954427 4.045311 -0.155478 ... 3.502037 5.419818 0.721887 0.780405 -1.332553 2.386323 3.399030 0.805458 5.311518 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
2495 -1.612271 1.136322 0.439253 -1.661940 2.503822 -0.445082 2.862625 2.228701 3.714258 -1.834704 ... 3.293596 5.575856 4.126424 0.011100 -0.096880 3.031544 1.944827 -0.927567 4.226295 0
2496 0.266415 3.206542 -0.116758 0.445226 3.721457 0.407119 2.387406 1.687102 2.824435 1.165571 ... 3.934637 1.732800 2.574786 0.529199 -0.287021 3.898641 0.147049 1.544828 2.825104 0
2497 -0.326930 2.356003 0.174470 -0.050694 3.098816 0.469552 3.528227 1.894595 3.937233 -0.107356 ... 1.889425 3.963069 2.947138 -0.626888 -0.797704 3.755991 2.711294 1.180842 4.045224 0
2498 0.153820 3.246263 0.519227 0.920204 3.802444 1.531671 2.902433 3.815808 2.586090 -1.919927 ... 2.679366 2.824765 3.152592 -0.303761 1.058759 3.185221 3.995456 0.490862 2.338864 0
2499 -1.521514 3.773368 -0.685662 0.175765 3.599282 2.990228 1.918675 4.382931 5.108156 0.764052 ... 4.139320 1.933803 2.970427 -0.553020 0.483961 2.124600 2.598879 -0.444126 2.429726 0

2500 rows × 21 columns

4.3.2. Data Drift Analysis

Next, we will review the data drift table plot and the drift results:

batch_inference_run.artifact("drift_table_plot").show()
batch_inference_run.status.results
{'batch_id': '3d5f6aa8a2d63cc0e84ebd95a0bc0000979a0989bbf4c211651a4e2a',
 'drift_status': True,
 'drift_metric': 0.4478631590778279}
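As a consistency check, the reported drift_metric can be recomputed from the sums printed in the model-monitoring-batch "Drift result" log above. Dividing tvd_sum by tvd_mean in that log gives 21, i.e. the 20 features plus the label column, so the label column appears to be included in the mean. This is an inference from the log, not documented behavior.

```python
# Values copied from the "Drift result" log line of this demo run.
tvd_sum = 8.5102
hellinger_sum = 10.300052681268772
n = 21  # 20 features plus the "label" column listed in the log

tvd_mean = tvd_sum / n
hellinger_mean = hellinger_sum / n
# The mean of (TVD + H) / 2 over features equals the average of the two means.
drift_metric = (tvd_mean + hellinger_mean) / 2
print(drift_metric)
```

The result matches the logged drift_metric of 0.4478631590778279 up to floating-point rounding.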