# Model Deployment Pipeline

<a id="model-deployment-with-streaming"></a>


Deploy a model that processes streaming data. The demo covers the use case of 1<sup>st</sup>-day churn.

The importance of 1<sup>st</sup>-day churn prediction:
- In some segments of the gaming industry, the average 1<sup>st</sup>-day churn is as high as 70%.
- Acquiring new customers is 5x&ndash;25x more expensive than retaining existing ones.
- Reducing churn by just 5% can boost profitability by 75%.
- Improving retention has a 2x&ndash;4x greater impact on growth than acquisition.
- The probability of selling to an existing customer is 60%&ndash;70%, but only 5%&ndash;20% for a prospect.
- Churn rate also informs metrics like customer lifetime value (LTV).

This demo consists of several steps:

![Model deployment Pipeline Real-time operational Pipeline](assets/model-deployment-pipeline.png)

While this demo covers the use case of 1<sup>st</sup>-day churn, you can easily replace the data, the related features, and the training model, and reuse the same workflow for other business cases.

The demo covers the following steps:

- [**1. Data generator**](functions/data-generator.ipynb) - Generates events for training and serving, and creates an enrichment table (lookup values).
- [**2. Event handler**](functions/event-handler.ipynb) - Receives data from the input. This is a common input stream for all the data, so you can easily replace the source of the events (in this case, a data generator) without affecting the rest of the flow. The event handler also stores all incoming data to Parquet files.
- [**3. Stream to features**](functions/stream-to-features.ipynb) - Enriches the stream using the enrichment table and updates the aggregation features from the events received by the event handler.
- **4. Optional model training steps:**
  - [**4.1 Get Data Snapshot**](functions/get-data-snapshot.ipynb) - Takes a snapshot of the feature table for training.
  - [**4.2 Describe the Dataset**](https://github.com/mlrun/functions/tree/master/describe) - Runs common analysis on the dataset and produces plots such as histograms, feature importance, correlation, and more.
  - [**4.3 Training**](https://github.com/mlrun/functions/tree/master/sklearn_classifier) - Trains multiple classification models.
  - [**4.4 Testing**](https://github.com/mlrun/functions/tree/master/test_classifier) - Tests the best-performing model.
- [**5. Serving**](https://github.com/mlrun/functions/tree/master/model_server) - Serves the model and processes the data from the enriched stream and aggregation features.
- [**6. Inference logger**](functions/event-handler.ipynb) - Reuses the event-handler function, but only for its capability to store incoming data to Parquet files.
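The steps above hand data to each other through V3IO streams. As a sketch, assuming only the stream names configured later in this notebook (`generated-stream`, `incoming-events-stream`, `serving-stream`, `inference-stream`), the mapping below records which function reads and writes which stream, and derives the processing order with the standard-library `graphlib` module (Python 3.9+). `PIPELINE` is illustrative; none of the deployed functions reads it.

```python
from graphlib import TopologicalSorter

# Illustrative map of each function to the stream it consumes and the stream it
# produces (None means the function has no stream on that side).
PIPELINE = {
    "data-generator": {"reads": None, "writes": "generated-stream"},
    "event-handler": {"reads": "generated-stream", "writes": "incoming-events-stream"},
    "stream-to-features": {"reads": "incoming-events-stream", "writes": "serving-stream"},
    "serving": {"reads": "serving-stream", "writes": "inference-stream"},
    "inference-logger": {"reads": "inference-stream", "writes": None},
}


def processing_order(pipeline):
    """Return function names ordered so that every producer precedes its consumers."""
    producers = {spec["writes"]: name for name, spec in pipeline.items() if spec["writes"]}
    graph = {}
    for name, spec in pipeline.items():
        upstream = producers.get(spec["reads"])
        # TopologicalSorter expects: node -> set of predecessor nodes.
        graph[name] = {upstream} if upstream else set()
    return list(TopologicalSorter(graph).static_order())


print(processing_order(PIPELINE))
```

Because each function has exactly one upstream stream, the graph is a simple chain, which is why the event source can be swapped without touching the downstream functions.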

This demo comes with a pre-trained model that uses the base features, enrichment data, and derived features, calculated from the same generated data. You can retrain the model or train a new model by running the **optional model training steps**. To train a new model, make sure that enough data has been collected through the streams into the data storage.

## About this demo

### Input Data

The data generator ([data-generator.ipynb](functions/data-generator.ipynb)) creates the following events: `new_registration`, `new_purchases`, `new_bet`, and `new_win`, with the following data:

| new_registration | new_purchases | new_bet    | new_win    |
|------------------|---------------|------------|------------|
| user_id          | user_id       | user_id    | user_id    |
| event_type       | event_type    | event_type | event_type |
| event_time       | event_time    | event_time | event_time |
| name             | amount        | bet_amount | win_amount |
| date_of_birth    |               |            |            |
| street_address   |               |            |            |
| city             |               |            |            |
| country          |               |            |            |
| postcode         |               |            |            |
| affiliate_url    |               |            |            |
| campaign         |               |            |            |

In addition, `new_registration` includes a `label` column that indicates whether the user has churned (1 for churned, 0 for not churned).
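To make the schema concrete, here is a minimal sketch of event records that follow the table above. The `make_event` helper, the sample values, and the use of the event names as `event_type` strings are assumptions for illustration; the generator in `functions/data-generator.ipynb` may build its records differently. The timestamp format is the `TS_FORMAT` that the event handler is configured with later in this notebook.

```python
from datetime import datetime

# Event-specific fields from the input-data table; every event also carries
# user_id, event_type and event_time.
EVENT_FIELDS = {
    "new_registration": ["name", "date_of_birth", "street_address", "city",
                         "country", "postcode", "affiliate_url", "campaign", "label"],
    "new_purchases": ["amount"],
    "new_bet": ["bet_amount"],
    "new_win": ["win_amount"],
}

TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"  # same format the event handler is configured with


def make_event(user_id, event_type, when, **fields):
    """Build one event dict (hypothetical helper), rejecting records with missing fields."""
    missing = set(EVENT_FIELDS[event_type]) - set(fields)
    if missing:
        raise ValueError(f"{event_type} is missing fields: {sorted(missing)}")
    return {"user_id": user_id, "event_type": event_type,
            "event_time": when.strftime(TS_FORMAT), **fields}


bet = make_event("u-001", "new_bet", datetime(2021, 5, 20, 19, 52, 15, 416000), bet_amount=12.5)
print(bet)
```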

## Enrichment

The same data generator ([data-generator.ipynb](functions/data-generator.ipynb)) also creates the enrichment table, which maps each postcode to a socioeconomic index (`socioeconomic_idx`).
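Enrichment is a key lookup: each event's `postcode` is used to fetch `socioeconomic_idx`. In the demo the table is a V3IO key-value table; in this sketch a plain dict stands in for it, the postcodes are made up, and returning `None` for an unknown postcode is an assumption rather than documented behavior.

```python
# Hypothetical enrichment table: postcode -> socioeconomic index.
ENRICHMENT_TABLE = {"SW1A 1AA": 7, "M1 1AE": 4}


def enrich(event, table, key="postcode"):
    """Return a copy of `event` with `socioeconomic_idx` looked up by `key`.

    Events without the key, or with an unknown value, get None (an assumption;
    the demo's function may handle misses differently).
    """
    enriched = dict(event)
    enriched["socioeconomic_idx"] = table.get(event.get(key))
    return enriched


print(enrich({"user_id": "u-001", "postcode": "M1 1AE"}, ENRICHMENT_TABLE))
```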

## Feature calculation

The feature-calculation function ([stream-to-features.ipynb](functions/stream-to-features.ipynb)) enriches the events using the enrichment table and calculates the sum, mean, count, and variance of the three amount fields (`amount`, `bet_amount`, and `win_amount` for `new_purchases`, `new_bet`, and `new_win`, respectively). This results in the following list of fields:

- purchase_sum
- purchase_mean
- purchase_count
- purchase_var
- bet_sum
- bet_mean
- bet_count
- bet_var
- win_sum
- win_mean
- win_count
- win_var
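Because events arrive one at a time, these aggregates have to be updated incrementally rather than recomputed from each user's full history. The sketch below uses Welford's online algorithm, which keeps a running count, mean, and sum of squared deviations. It is an illustration, not the demo's code: whether `stream-to-features` computes sample or population variance, and how it persists state in the feature table, are not stated here. This sketch assumes sample variance and keeps state in memory.

```python
from collections import defaultdict


class RunningStats:
    """Incremental sum/mean/count/variance via Welford's online algorithm."""

    def __init__(self):
        self.count = 0
        self.sum = 0.0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, x):
        self.count += 1
        self.sum += x
        delta = x - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (x - self.mean)

    @property
    def var(self):
        # Sample variance; reported as 0.0 for fewer than two observations.
        return self._m2 / (self.count - 1) if self.count > 1 else 0.0


# Event type -> (amount field, feature-name prefix)
AMOUNT_FIELDS = {"new_purchases": ("amount", "purchase"),
                 "new_bet": ("bet_amount", "bet"),
                 "new_win": ("win_amount", "win")}

# state[user_id][prefix] -> RunningStats (in memory only, for illustration)
state = defaultdict(lambda: defaultdict(RunningStats))


def update_features(event):
    """Update the user's aggregates from one event and return the flattened features."""
    if event["event_type"] in AMOUNT_FIELDS:
        field, prefix = AMOUNT_FIELDS[event["event_type"]]
        state[event["user_id"]][prefix].update(float(event[field]))
    features = {}
    for prefix, stats in state[event["user_id"]].items():
        features.update({f"{prefix}_sum": stats.sum, f"{prefix}_mean": stats.mean,
                         f"{prefix}_count": stats.count, f"{prefix}_var": stats.var})
    return features


for amount in [2, 4, 4, 4, 5, 5, 7, 9]:
    feats = update_features({"user_id": "u-001", "event_type": "new_bet", "bet_amount": amount})
print(feats)
```

Welford's update avoids the catastrophic cancellation of the naive "sum of squares minus square of sum" formula, which matters when amounts are large and a user has many events.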

## Configure

The configuration below is shared across the notebooks. Change the values in this subsection if you would like different configuration settings.

In [1]:
%pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-0.17.1-py2.py3-none-any.whl (18 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-0.17.1
Note: you may need to restart the kernel to use updated packages.


This demo requires access to the Iguazio multi-model data layer (V3IO). Set the environment variables in `env.txt`.
> **Note**: When running this demo from the Iguazio Data Science Platform, the variables are already configured and therefore you do not need
> to edit these variables.

In [2]:
from dotenv import load_dotenv

load_dotenv('env.txt')

True
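The `True` returned by `load_dotenv` does not by itself confirm that every variable this notebook reads was set. A small standard-library check, assuming the four names read later via `getenv` (`V3IO_API`, `V3IO_ACCESS_KEY`, `V3IO_FRAMESD`, `V3IO_USERNAME`), can catch a missing value before it surfaces as a confusing error in a later cell:

```python
import os

# Variables read elsewhere in this notebook via getenv().
REQUIRED_ENV_VARS = ("V3IO_API", "V3IO_ACCESS_KEY", "V3IO_FRAMESD", "V3IO_USERNAME")


def missing_env_vars(names=REQUIRED_ENV_VARS, environ=os.environ):
    """Return the names that are unset or empty in `environ`."""
    return [name for name in names if not environ.get(name)]


missing = missing_env_vars()
if missing:
    print(f"Set these variables in env.txt before continuing: {missing}")
```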

### Project

Projects in the platform are used to package multiple functions, workflows, and artifacts. Set the project base name here.

In [3]:
project_base_name = "model-deployment-pipeline"

### Optional Training

In [4]:
run_training = False

### Data

All data in the platform is stored in user-defined data containers. This demo uses the predefined "users" container. For more information, see the platform's [data-containers](https://www.iguazio.com/docs/latest-release/data-layer/containers/) documentation.

In [5]:
container = 'users'

Set the data path for storing stream data and key-value (KV) tables:

In [6]:
from os import getenv, path, getcwd

# Set parameters and environment variables
v3io_envs = {'V3IO_API': getenv('V3IO_API'),
             'V3IO_ACCESS_KEY': getenv('V3IO_ACCESS_KEY'),
             'V3IO_FRAMESD': getenv('V3IO_FRAMESD')}
v3io_username = getenv('V3IO_USERNAME')
data_path = path.join(v3io_username, 'examples', project_base_name, 'data')

Configure the streams used by the pipeline:

In [7]:
from urllib.parse import urljoin, urlparse

# Make sure the web-API URL has a scheme (e.g., http://)
web_api = getenv('V3IO_API')
if not urlparse(web_api).scheme:
    web_api = 'http://' + web_api
web_api_users = urljoin(web_api, container)

# Stream name -> V3IO path and number of shards
stream_configs = {
    'generated-stream': {
        'path': path.join(data_path, 'generated-stream'),
        'shard_count': 8},
    'incoming-events-stream': {
        'path': path.join(data_path, 'incoming-events-stream'),
        'shard_count': 8},
    'serving-stream': {
        'path': path.join(data_path, 'serving-stream'),
        'shard_count': 8},
    'inference-stream': {
        'path': path.join(data_path, 'inference-stream'),
        'shard_count': 8},
}
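The trigger URLs built later in this notebook combine the web-API URL, the container, the stream path, and a suffix such as `@eh`, `@stf`, `@ms`, or `@il`; the suffix appears to name a per-function consumer group, though that reading is an assumption. The hypothetical helper `stream_trigger_url` below packages both steps so the pieces can be checked without a cluster. It tests for `://` instead of `urlparse(url).scheme`, because recent Python versions parse a bare `host:port` such as `v3io-webapi:8081` as having the scheme `v3io-webapi`.

```python
from urllib.parse import urljoin


def normalize_web_api(url):
    """Prefix http:// when the address has no scheme.

    Checks for '://' instead of urlparse(url).scheme, because recent Python
    versions parse a bare 'host:port' as scheme='host'.
    """
    return url if "://" in url else "http://" + url


def stream_trigger_url(web_api, container, stream_path, consumer_group):
    """Build '<web-api>/<container>/<stream path>@<consumer group>' (hypothetical helper)."""
    base = urljoin(normalize_web_api(web_api), container)
    return "/".join(part.strip("/") for part in (base, stream_path)) + "@" + consumer_group


print(stream_trigger_url("v3io-webapi:8081", "users",
                         "admin/examples/model-deployment-pipeline/data/generated-stream", "eh"))
```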

When streaming data, each record is associated with a partition key, so that records with the same key (here, the same user) are assigned to the same shard. For more information, see the [stream sharding and partitioning description](https://www.iguazio.com/docs/latest-release/data-layer/stream/#stream-sharding-and-partitioning).

In [8]:
partition_attr = "user_id"
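Conceptually, a partition key pins records to a shard by hashing, so every event with the same `user_id` lands on the same shard and a consumer sees each user's events in order. The sketch below illustrates the idea with `zlib.crc32` modulo the shard count (8, as configured above). The hash that V3IO actually uses is not described here, so the shard numbers will not match the platform's.

```python
import zlib


def shard_for(partition_key, shard_count=8):
    """Map a partition key to a shard index with a stable hash (illustrative only)."""
    return zlib.crc32(str(partition_key).encode("utf-8")) % shard_count


events = [{"user_id": u, "event_type": "new_bet"} for u in ("u-1", "u-2", "u-1", "u-3", "u-1")]
shards = [shard_for(e["user_id"]) for e in events]
print(shards)
```

A stable hash such as CRC32 is used instead of Python's built-in `hash()`, which is randomized per process for strings and would send the same user to different shards across restarts.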

Set the target paths for storing the raw data and the inference data as Parquet files.
The Parquet files are written through a file mount, so the paths start with `/User`, which is mounted to your home directory.

In [9]:
raw_parquet_target_path = path.join(data_path.replace(v3io_username, '/User'),  'events-pq')
inference_parquet_target_path = path.join(data_path.replace(v3io_username, '/User'),  'inference-pq')
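The cell above derives the mount path with `str.replace`, which substitutes every occurrence of the username, not only the leading one. That is harmless for a username such as `admin`, but a username that also appears later in the path (for example, a hypothetical user named `data`) would corrupt the suffix. A prefix-only variant, sketched as the hypothetical helper `to_mount_path`:

```python
import posixpath


def to_mount_path(v3io_path, username, mount_root="/User"):
    """Replace only the leading '<username>/' component with the mount root."""
    prefix = username.rstrip("/") + "/"
    if not v3io_path.startswith(prefix):
        raise ValueError(f"{v3io_path!r} does not start with {prefix!r}")
    return posixpath.join(mount_root, v3io_path[len(prefix):])


sample_path = "data/examples/model-deployment-pipeline/data"
print(sample_path.replace("data", "/User"))  # every occurrence replaced
print(to_mount_path(sample_path, "data"))    # only the leading component replaced
```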

Target path to store the enrichment table (a key-value table)

In [10]:
enrichment_table_path = path.join(data_path, 'enrichment-table')

Target path to store the calculated features

In [11]:
feature_table_path = path.join(data_path, 'feature-table')

The list of features

In [12]:
feature_list = ['socioeconomic_idx','purchase_sum','purchase_mean','purchase_count',
                'purchase_var','bet_sum','bet_mean','bet_count',
                'bet_var','win_sum','win_mean','win_count','win_var']
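The 13 names above are the enrichment column plus the four statistics for each amount field described under "Feature calculation". Deriving them from those two parts, as in the sketch below, keeps the list and the description from drifting apart; the order presumably matters because the same list is passed to the functions as `FEATURE_LIST` and `FEATURES`.

```python
# Derive the feature names from the statistics computed per amount field.
STATS = ("sum", "mean", "count", "var")
PREFIXES = ("purchase", "bet", "win")

derived_feature_list = ["socioeconomic_idx"] + [f"{p}_{s}" for p in PREFIXES for s in STATS]
print(derived_feature_list)
```

In the notebook, `assert derived_feature_list == feature_list` would flag any divergence from the hand-written list.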

## Create V3IO Client

With the dataplane client you can manipulate data in the platform's multi-model data layer, including:
* Objects
* Key-values (NoSQL)
* Streams
* Containers

Under the hood, the client connects through the platform's [web APIs](https://www.iguazio.com/docs/latest-release/data-layer/reference/web-apis/) and wraps each low-level API with an interface. Calls are blocking, but you can use the batching interface to send multiple requests in parallel for better performance.

In [13]:
import v3io.dataplane
v3io_client = v3io.dataplane.Client(endpoint=web_api,
                                    access_key=getenv('V3IO_ACCESS_KEY'))

## Manage Streams

#### Delete all streams

Clean up previous streams:

In [14]:
for stream_name, stream_config in stream_configs.items():
    resp = v3io_client.stream.delete(container=container, stream_path=stream_config['path'], 
                                     raise_for_status=v3io.dataplane.RaiseForStatus.never)
    print(f'Delete Stream call for stream {stream_name} returned with status {resp.status_code}, and content: {resp.body.decode("utf-8")}')

Delete Stream call for stream generated-stream returned with status 204, and content: 
Delete Stream call for stream incoming-events-stream returned with status 204, and content: 
Delete Stream call for stream serving-stream returned with status 204, and content: 
Delete Stream call for stream inference-stream returned with status 204, and content: 


#### Create all streams

In [15]:
for stream_name, stream_config in stream_configs.items():
    print(stream_config['path'])
    resp = v3io_client.stream.create(container=container,
                                     stream_path=stream_config['path'],
                                     shard_count=stream_config['shard_count'],
                                     raise_for_status=v3io.dataplane.RaiseForStatus.never)
    print(f'Create Stream call for stream {stream_name} returned with status {resp.status_code}, and content: {resp.body.decode("utf-8")}')

admin/examples/model-deployment-pipeline/data/generated-stream
Create Stream call for stream generated-stream returned with status 204, and content: 
admin/examples/model-deployment-pipeline/data/incoming-events-stream
Create Stream call for stream incoming-events-stream returned with status 204, and content: 
admin/examples/model-deployment-pipeline/data/serving-stream
Create Stream call for stream serving-stream returned with status 204, and content: 
admin/examples/model-deployment-pipeline/data/inference-stream
Create Stream call for stream inference-stream returned with status 204, and content: 


## Set Up the MLRun Project

[MLRun](https://docs.mlrun.org) is a generic and convenient mechanism for data scientists and software developers to describe and run tasks related to machine learning in various, scalable runtime environments and ML pipelines while automatically tracking executed code, metadata, inputs, and outputs.
MLRun integrates with the Nuclio serverless framework and with the Kubeflow Pipelines framework for running ML pipelines.
The demo uses MLRun to create a project, run Nuclio serverless functions, and train the model.
Before running your code, you need to set some MLRun configurations:

Projects are visible in the MLRun dashboard only after they're saved to the MLRun database, which happens whenever you run code for a project.

The following code creates a project using the `project_base_name`, concatenated with your current running username, and sets the project directory to a **conf** directory in the current demo directory (**./conf**).

> - You can easily change the default project name for this tutorial by changing the value of the `project_base_name` variable, which is defined at the beginning of the notebook.
> - Don't include in your project any proprietary information that you don't want to expose to other users.
> - While projects are a useful tool, you can easily develop and run code in the platform without using projects.

- <a id="gs-mlrun-config-artifcats-path"></a>**Artifacts path** &mdash; the location for storing versioned data artifacts (such as files, objects, data sets, and models) that are produced or consumed by functions, runs, and workflows.
  The path can be defined either as a local directory path or as a URL (of the format `s3://*`, `v3io://*`, etc.).
  You can set the artifacts path either by defining an `MLRUN_ARTIFACT_PATH` environment variable (which applies globally throughout the current environment) or as part of the MLRun configuration.
 
  If the target directory doesn't exist, MLRun creates it.
  You can use the notation `{{run.uid}}` in the path to signify the current run ID.
  For pipelines, you can use the notation `{{workflow.uid}}` to signify the workflow ID.
  This allows you to create a unique artifacts directory for each executed job or workflow.

  After you run an MLRun job, the artifacts directory might contain one or more of the following directories:
 
  - **plots** &mdash; a directory for storing images, figures, and plotlines.
  - **models** &mdash; a directory for storing all trained models.
  - **data** &mdash; a directory for storing any other type of data artifact, such as data sets.
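MLRun resolves `{{run.uid}}` (and `{{workflow.uid}}` in pipelines) itself. To illustrate what that substitution produces, here is a hypothetical stand-in using plain string replacement; it is not MLRun's implementation, and the base path and UIDs are made up.

```python
def resolve_artifact_path(template, run_uid=None, workflow_uid=None):
    """Substitute the {{run.uid}} / {{workflow.uid}} placeholders (illustrative only)."""
    path = template
    if run_uid is not None:
        path = path.replace("{{run.uid}}", run_uid)
    if workflow_uid is not None:
        path = path.replace("{{workflow.uid}}", workflow_uid)
    return path


template = "v3io:///users/admin/artifacts/{{run.uid}}"
print(resolve_artifact_path(template, run_uid="abc123"))
```

Each run gets a different UID, so each run's plots, models, and data land in their own directory instead of overwriting the previous run's artifacts.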

In [16]:
import mlrun
project_name, artifact_path = mlrun.set_environment(project=project_base_name,
                                                    user_project=True)

project_path = path.abspath('conf')
project = mlrun.new_project(project_base_name,
                            context=project_path,                           
                            user_project=True,
                            init_git=True)

print(f'Project path: {project_path}\nProject name: {project.name}')

Project path: /home/jovyan/data/demos/model-deployment-pipeline/conf
Project name: model-deployment-pipeline-admin


## Set the Project's Functions

#### Data Generator

In [17]:
import nuclio

data_generator = mlrun.code_to_function(name='data-generator', handler='main', kind='job', filename='functions/data-generator.ipynb')
project.set_function(data_generator)

dg_params = {'container': container,
             'output_stream_path': stream_configs['generated-stream']['path'],
             'enrichment_table_path': enrichment_table_path,
             'num_users_group1': 280,
             'num_users_group2': 120,
             'events_per_user': 200}

project.func('data-generator').set_envs(v3io_envs)

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fdd43d63820>

In [18]:
# Build the image
project.func('data-generator').deploy()

> 2021-05-20 19:15:39,298 [info] starting remote build, image: .gshaham/func-model-deployment-pipeline-admin-data-generator:latest
E0520 19:15:41.148163       1 aws_credentials.go:77] while getting AWS credentials NoCredentialProviders: no valid providers in chain. Deprecated.
	For verbose messaging see aws.Config.CredentialsChainVerboseErrors
INFO[0003] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0005] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0008] Built cross stage deps: map[]
INFO[0008] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0010] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0013] Executing 0 build triggers
INFO[0013] Unpacking rootfs as cmd RUN python -m pip install faker requires it.
INFO[0090] RUN python -m pip install faker
INFO[0090] Taking snapshot of full filesystem...
INFO[0093] cmd:

True

In [20]:
# Run the job
project.func('data-generator').run(params=dg_params, artifact_path=project.artifact_path)

> 2021-05-20 19:52:15,416 [info] starting run data-generator-main uid=460e4dcc1d8849fba33fa0e7e8de415a DB=http://mlrun-api:8080
> 2021-05-20 19:52:15,504 [info] Job is running in the background, pod: data-generator-main-ft7jx
> 2021-05-20 20:31:51,587 [info] Created enrichment table with 89999 items
> 2021-05-20 20:32:48,721 [info] Records sent 80400
> 2021-05-20 20:32:48,757 [info] All data streamed successfully.
> 2021-05-20 20:32:48,869 [info] run executed, status=completed
final state: completed


| Field | Value |
|---|---|
| project | model-deployment-pipeline-admin |
| uid | ...e8de415a |
| iter | 0 |
| start | May 20 19:52:22 |
| state | completed |
| name | data-generator-main |
| labels | v3io_user=admin, kind=job, owner=admin, host=data-generator-main-ft7jx |
| inputs | |
| parameters | container=users, output_stream_path=admin/examples/model-deployment-pipeline/data/generated-stream, enrichment_table_path=admin/examples/model-deployment-pipeline/data/enrichment-table, num_users_group1=280, num_users_group2=120, events_per_user=200 |
| results | |
| artifacts | |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 460e4dcc1d8849fba33fa0e7e8de415a --project model-deployment-pipeline-admin , !mlrun logs 460e4dcc1d8849fba33fa0e7e8de415a --project model-deployment-pipeline-admin
> 2021-05-20 20:32:52,411 [info] run executed, status=completed


<mlrun.model.RunObject at 0x7fdd59c84fa0>

#### Event Handler

In [21]:
event_handler = mlrun.code_to_function(name='event-handler', handler='handler', kind='nuclio', filename='functions/event-handler.ipynb')
project.set_function(event_handler)

eh_envs = {'PARQUET_SINK_FLAG': 'true',
           'STREAM_SINK_FLAG': 'true',
           'PARQUET_TARGET_PATH' : raw_parquet_target_path,
           'PARQUET_BATCH_SIZE': 8192,
           'TS_KEY': 'event_time',
           'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f',
           'CONTAINER': container,
           'OUTPUT_STREAM_PATH': stream_configs['incoming-events-stream']['path'],
           'PARTITION_ATTR': partition_attr}

project.func('event-handler').set_envs({**v3io_envs, **eh_envs})
project.func('event-handler').apply(mlrun.platforms.auto_mount())

generated_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['generated-stream']['path']]) + '@eh'
project.func('event-handler').add_trigger('serving_stream',
                                           nuclio.triggers.V3IOStreamTrigger(url=generated_stream,
                                                                             maxWorkers=stream_configs['generated-stream']['shard_count']+2,
                                                                             seekTo='earliest'))

project.func('event-handler').spec.replicas=1
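Nuclio passes these settings to the function as environment variables, so every value arrives as a string: `'true'` must be parsed into a boolean, `PARQUET_BATCH_SIZE` into an integer, and the `TS_KEY` field into a timestamp using `TS_FORMAT`. The handler's own code lives in `functions/event-handler.ipynb` and is not reproduced here; the sketch below is a hypothetical illustration of that parsing and of buffering events until a batch is full (a batch size of 3 keeps the example small).

```python
import os
from datetime import datetime


def env_flag(name, default="false", environ=os.environ):
    """Interpret 'true'/'1'/'yes' (case-insensitive) as True."""
    return environ.get(name, default).strip().lower() in ("true", "1", "yes")


class BatchBuffer:
    """Collect parsed events and hand back a full batch once batch_size is reached."""

    def __init__(self, batch_size, ts_key, ts_format):
        self.batch_size = batch_size
        self.ts_key = ts_key
        self.ts_format = ts_format
        self.events = []

    def add(self, event):
        event = dict(event)
        event[self.ts_key] = datetime.strptime(event[self.ts_key], self.ts_format)
        self.events.append(event)
        if len(self.events) >= self.batch_size:
            batch, self.events = self.events, []
            return batch  # the caller would write this batch to Parquet
        return None


env = {"PARQUET_SINK_FLAG": "true", "PARQUET_BATCH_SIZE": "3",
       "TS_KEY": "event_time", "TS_FORMAT": "%Y-%m-%d %H:%M:%S.%f"}
buffer = BatchBuffer(int(env["PARQUET_BATCH_SIZE"]), env["TS_KEY"], env["TS_FORMAT"])
flushed = [buffer.add({"user_id": f"u-{i}", "event_time": f"2021-05-20 19:52:1{i}.000000"})
           for i in range(4)]
print(env_flag("PARQUET_SINK_FLAG", environ=env), [len(b) if b else 0 for b in flushed])
```

Writing in batches rather than per event keeps the number of small Parquet files down; a production handler would also flush a partial batch on a timer or at shutdown, which this sketch omits.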

In [22]:
project.func('event-handler').deploy()

> 2021-05-21 00:58:25,795 [info] Starting remote function deploy
2021-05-21 00:58:25  (info) Deploying function
2021-05-21 00:58:25  (info) Building
2021-05-21 00:58:25  (info) Staging files and preparing base images
2021-05-21 00:58:25  (info) Building processor image
2021-05-21 00:59:31  (info) Build complete
2021-05-21 00:59:45  (info) Function deploy complete
> 2021-05-21 00:59:46,280 [info] function deployed, address=192.168.65.4:31072


'http://192.168.65.4:31072'

#### Stream to Features

In [23]:
stream_to_features = mlrun.code_to_function(name='stream-to-features', handler='handler', kind='nuclio', filename='functions/stream-to-features.ipynb')
project.set_function(stream_to_features)

stf_envs = {'FEATURE_TABLE_PATH': feature_table_path,
            'SERVING_EVENTS': ",".join(['bet','win']),
            'FEATURE_LIST': ",".join(feature_list),
            'CONTAINER': container,
            'OUTPUT_STREAM_PATH': stream_configs['serving-stream']['path'],
            'PARTITION_ATTR': partition_attr,
            'ENRICHMENT_TABLE_PATH': enrichment_table_path,
            'ENRICHMENT_KEY':"postcode"}

project.func('stream-to-features').set_envs({**v3io_envs, **stf_envs})

incoming_events_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['incoming-events-stream']['path']]) + '@stf'
project.func('stream-to-features').add_trigger('serving_stream',
                                               nuclio.triggers.V3IOStreamTrigger(url=incoming_events_stream,
                                                                                 maxWorkers=stream_configs['incoming-events-stream']['shard_count']+2,
                                                                                 seekTo='earliest'))

project.func('stream-to-features').spec.readiness_timeout = 200
project.func('stream-to-features').spec.replicas=1
project.func('stream-to-features').deploy()

> 2021-05-21 01:07:19,798 [info] Starting remote function deploy
2021-05-21 01:07:19  (info) Deploying function
2021-05-21 01:07:19  (info) Building
2021-05-21 01:07:19  (info) Staging files and preparing base images
2021-05-21 01:07:19  (info) Building processor image
2021-05-21 01:07:49  (info) Build complete
2021-05-21 01:08:01  (info) Function deploy complete
> 2021-05-21 01:08:02,408 [info] function deployed, address=192.168.65.4:32174


'http://192.168.65.4:32174'
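`SERVING_EVENTS` and `FEATURE_LIST` suggest that only bet and win events are forwarded to the serving stream, carrying the features in a fixed order. A hypothetical sketch of that routing step follows. The record layout (`user_id`, `features`) and filling missing features with `None` are assumptions, and because this notebook names the events `new_bet`/`new_win` while `SERVING_EVENTS` lists `bet,win`, the sketch accepts either form.

```python
def to_serving_record(event, features, serving_events, feature_list):
    """Return a serving-stream record for bet/win events, or None to skip (hypothetical).

    The event type is compared with and without a 'new_' prefix, because this
    notebook names the events new_bet/new_win while SERVING_EVENTS lists bet,win.
    """
    event_type = event["event_type"]
    kind = event_type[len("new_"):] if event_type.startswith("new_") else event_type
    if kind not in serving_events:
        return None
    # Keep the model's expected column order; missing features become None.
    return {"user_id": event["user_id"],
            "features": [features.get(name) for name in feature_list]}


serving_events = "bet,win".split(",")
feature_list_demo = ["socioeconomic_idx", "bet_sum", "bet_count"]
sample_features = {"socioeconomic_idx": 4, "bet_sum": 40.0, "bet_count": 8}
print(to_serving_record({"user_id": "u-001", "event_type": "new_bet"}, sample_features,
                        serving_events, feature_list_demo))
print(to_serving_record({"user_id": "u-001", "event_type": "new_purchases"}, sample_features,
                        serving_events, feature_list_demo))
```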

#### Get Data Snapshot (part of optional model training)

In [33]:
if run_training:
    get_data_snapshot = mlrun.code_to_function(name='get-data-snapshot', handler='snapshot_data', kind='job', filename='functions/get-data-snapshot.ipynb')
    project.set_function(get_data_snapshot)

    # set parameters and  environment variables
    gds_params = {'container': container, 
                  'table_path': feature_table_path, 
                  'columns': ['label']+feature_list, 
                  'format': 'csv'}

    project.func('get-data-snapshot').set_envs(v3io_envs)
    project.func('get-data-snapshot').apply(mlrun.platforms.auto_mount())
    project.func('get-data-snapshot').deploy()

> 2021-05-21 01:11:12,830 [info] starting remote build, image: .gshaham/func-model-deployment-pipeline-admin-get-data-snapshot:latest
E0521 01:11:15.037815       1 aws_credentials.go:77] while getting AWS credentials NoCredentialProviders: no valid providers in chain. Deprecated.
	For verbose messaging see aws.Config.CredentialsChainVerboseErrors
INFO[0003] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0005] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0008] Built cross stage deps: map[]
INFO[0008] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0010] Retrieving image manifest mlrun/mlrun:0.6.4-rc3
INFO[0013] Executing 0 build triggers
INFO[0013] Unpacking rootfs as cmd RUN pip install v3io-frames==0.8.* requires it.
INFO[0099] RUN pip install v3io-frames==0.8.*
INFO[0099] Taking snapshot of full filesystem...
INFO[0105

In [34]:
if run_training:
    snapshot_data_run = project.func('get-data-snapshot').run(params=gds_params, artifact_path=project.artifact_path)

> 2021-05-21 01:13:14,657 [info] starting run get-data-snapshot-snapshot_data uid=664a06569151466bba7591b2c6cb058e DB=http://mlrun-api:8080
> 2021-05-21 01:13:14,851 [info] Job is running in the background, pod: get-data-snapshot-snapshot-data-gjdkj
> 2021-05-21 01:15:08,817 [info] Saving snapshot data set to /home/jovyan/data/data ...
> 2021-05-21 01:15:08,998 [info] run executed, status=completed
final state: completed


| Field | Value |
|---|---|
| project | model-deployment-pipeline-admin |
| uid | ...c6cb058e |
| iter | 0 |
| start | May 21 01:13:25 |
| state | completed |
| name | get-data-snapshot-snapshot_data |
| labels | v3io_user=admin, kind=job, owner=admin, host=get-data-snapshot-snapshot-data-gjdkj |
| inputs | |
| parameters | container=users, table_path=admin/examples/model-deployment-pipeline/data/feature-table, columns=['label', 'socioeconomic_idx', 'purchase_sum', 'purchase_mean', 'purchase_count', 'purchase_var', 'bet_sum', 'bet_mean', 'bet_count', 'bet_var', 'win_sum', 'win_mean', 'win_count', 'win_var'], format=csv |
| results | |
| artifacts | snapshot_dataset |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 664a06569151466bba7591b2c6cb058e --project model-deployment-pipeline-admin , !mlrun logs 664a06569151466bba7591b2c6cb058e --project model-deployment-pipeline-admin
> 2021-05-21 01:15:14,547 [info] run executed, status=completed


#### Describe the Dataset (part of optional model training)
You can review the generated plots under `artifacts/plots/`.

In [35]:
if run_training:
    project.set_function('hub://describe', 'describe')

    project.func('describe').apply(mlrun.platforms.auto_mount())
    describe_run = project.func('describe').run(params={'label_column': 'label'},
                                inputs={"table":
                                        snapshot_data_run.outputs['snapshot_dataset']},
                                artifact_path=project.artifact_path)

> 2021-05-21 01:15:15,006 [info] starting run describe-summarize uid=d9c87b3a5824444282efb429df459187 DB=http://mlrun-api:8080
> 2021-05-21 01:15:15,142 [info] Job is running in the background, pod: describe-summarize-rnff8
> 2021-05-21 01:24:02,980 [info] run executed, status=completed
final state: completed


| Field | Value |
|---|---|
| project | model-deployment-pipeline-admin |
| uid | ...df459187 |
| iter | 0 |
| start | May 21 01:22:59 |
| state | completed |
| name | describe-summarize |
| labels | v3io_user=admin, kind=job, owner=admin, host=describe-summarize-rnff8 |
| inputs | table |
| parameters | label_column=label |
| results | |
| artifacts | histograms, violin, imbalance, imbalance-weights-vec, correlation-matrix, correlation |


to track results use .show() or .logs() or in CLI: 
!mlrun get run d9c87b3a5824444282efb429df459187 --project model-deployment-pipeline-admin , !mlrun logs d9c87b3a5824444282efb429df459187 --project model-deployment-pipeline-admin
> 2021-05-21 01:24:06,001 [info] run executed, status=completed


#### Training (part of optional model training)
The function's source code and full docstrings are available at https://github.com/mlrun/functions/tree/master/sklearn_classifier.

In [36]:
if run_training:
    project.set_function('hub://sklearn_classifier', 'train')
    project.func('train').apply(mlrun.platforms.auto_mount())
    
    # Configure the models to train
    models = ["sklearn.ensemble.RandomForestClassifier", 
              "sklearn.linear_model.LogisticRegression",
              "sklearn.ensemble.AdaBoostClassifier"]
    
    # Create a training task
    train_task = mlrun.NewTask(name="train",
                               params={"sample": -1,
                                       "label_column": "label",
                                       "test_size": 0.10},
                               inputs={"dataset": snapshot_data_run.outputs['snapshot_dataset']})
    
    # Run the training task
    train_run = project.func('train').run(
        train_task.with_hyper_params({'model_pkg_class': models}, selector='max.accuracy'),
        artifact_path=project.artifact_path)

> 2021-05-21 01:24:06,497 [info] starting run train uid=21df4c23caf8458ebc731f1677d16a5a DB=http://mlrun-api:8080
> 2021-05-21 01:24:06,597 [info] Job is running in the background, pod: train-njqbh
> 2021-05-21 01:24:19,011 [info] best iteration=1, used criteria max.accuracy
> 2021-05-21 01:24:19,181 [info] run executed, status=completed
lbfgs failed to converge (status=1):
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
final state: completed


| Field | Value |
|---|---|
| project | model-deployment-pipeline-admin |
| uid | ...77d16a5a |
| iter | 0 |
| start | May 21 01:24:12 |
| state | completed |
| name | train |
| labels | v3io_user=admin, kind=job, owner=admin |
| inputs | dataset |
| parameters | sample=-1, label_column=label, test_size=0.1 |
| results | best_iteration=1, accuracy=0.9292035398230089, test-error=0.07079646017699115, rocauc=0.9825428194993412, brier_score=0.05947433628318583, f1-score=0.9420289855072463, precision_score=0.9420289855072463, recall_score=0.9420289855072463 |
| artifacts | test_set, probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, model, iteration_results |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 21df4c23caf8458ebc731f1677d16a5a --project model-deployment-pipeline-admin , !mlrun logs 21df4c23caf8458ebc731f1677d16a5a --project model-deployment-pipeline-admin
> 2021-05-21 01:24:25,838 [info] run executed, status=completed


In [37]:
if run_training:
    # Display the name of the selected model
    print(f'Best model: {models[train_run.outputs["best_iteration"]-1]}')

    # Display the accuracy for the optimal run iteration
    print(f'Accuracy: {train_run.outputs["accuracy"]}')


Best model: sklearn.ensemble.RandomForestClassifier
Accuracy: 0.9292035398230089
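With `selector='max.accuracy'`, the training run evaluates one iteration per entry in `models` and reports the iteration with the highest accuracy as `best_iteration`; the cell above then maps that 1-based number back to the list by subtracting one. The sketch below reproduces that bookkeeping with made-up accuracies. It illustrates the selection rule and is not MLRun's code.

```python
def select_best(iteration_results, criteria="max.accuracy"):
    """Pick the 1-based iteration that optimizes `criteria` ('max.<metric>' or 'min.<metric>')."""
    direction, metric = criteria.split(".", 1)
    if direction not in ("max", "min"):
        raise ValueError(f"unsupported selector direction: {direction}")
    pick = max if direction == "max" else min
    best_index = pick(range(len(iteration_results)), key=lambda i: iteration_results[i][metric])
    return best_index + 1


candidate_models = ["sklearn.ensemble.RandomForestClassifier",
                    "sklearn.linear_model.LogisticRegression",
                    "sklearn.ensemble.AdaBoostClassifier"]
# Hypothetical per-iteration results, in the same order as candidate_models.
sample_results = [{"accuracy": 0.93}, {"accuracy": 0.88}, {"accuracy": 0.91}]
best_iteration = select_best(sample_results)
print(best_iteration, candidate_models[best_iteration - 1])
```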


#### Testing (part of optional model training)

In [38]:
if run_training:
    project.set_function('hub://test_classifier', 'test')
    project.func('test').apply(mlrun.platforms.auto_mount())
    
    test_task = mlrun.NewTask(name="test",
                              params={"label_column": "label",
                                      "plots_dest": path.join("plots", "test")},
                              inputs={"models_path": train_run.outputs['model'],
                                      "test_set": train_run.outputs['test_set']}
                              )
    test_run = project.func('test').run(test_task,
                                        artifact_path=project.artifact_path)

> 2021-05-21 01:24:26,256 [info] starting run test uid=b114879dba184f0a85e49bfb06875939 DB=http://mlrun-api:8080
> 2021-05-21 01:24:26,403 [info] Job is running in the background, pod: test-4d4nk
> 2021-05-21 01:24:37,135 [info] run executed, status=completed
final state: completed


| Field | Value |
|---|---|
| project | model-deployment-pipeline-admin |
| uid | ...06875939 |
| iter | 0 |
| start | May 21 01:24:34 |
| state | completed |
| name | test |
| labels | v3io_user=admin, kind=job, owner=admin, host=test-4d4nk |
| inputs | models_path, test_set |
| parameters | label_column=label, plots_dest=plots/test |
| results | accuracy=0.8571428571428571, test-error=0.14285714285714285, rocauc=0.9345679012345679, brier_score=0.09414523809523809, f1-score=0.8888888888888888, precision_score=0.8888888888888888, recall_score=0.8888888888888888 |
| artifacts | probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, test_set_preds |


to track results use .show() or .logs() or in CLI: 
!mlrun get run b114879dba184f0a85e49bfb06875939 --project model-deployment-pipeline-admin , !mlrun logs b114879dba184f0a85e49bfb06875939 --project model-deployment-pipeline-admin
> 2021-05-21 01:24:45,659 [info] run executed, status=completed


In [39]:
if run_training:
    # Display the model accuracy
    print(f'Test Accuracy: {test_run.outputs["accuracy"]}')

Test Accuracy: 0.8571428571428571
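The training and test runs report accuracy, precision, recall, and F1. For a binary label these follow directly from the confusion-matrix counts. The worked example below uses made-up counts purely to show the definitions; they are not the counts behind the numbers reported above.

```python
def binary_metrics(tp, tn, fp, fn):
    """Accuracy, precision, recall and F1 from binary confusion-matrix counts."""
    total = tp + tn + fp + fn
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": (tp + tn) / total, "precision": precision,
            "recall": recall, "f1": f1}


m = binary_metrics(tp=40, tn=45, fp=10, fn=5)
print({k: round(v, 4) for k, v in m.items()})
```

Precision and recall are equal exactly when false positives equal false negatives (for a nonzero true-positive count), and F1 then equals both. That is the pattern in both runs above, where `precision_score`, `recall_score`, and `f1-score` are identical.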


#### Serving

In [40]:
project.set_function('hub://model_server:development', 'serving')

serving = project.func('serving').apply(mlrun.platforms.auto_mount())
if 'train_run' in locals() and train_run.outputs.get('model') is not None:
    serving.add_model('my_model', train_run.outputs.get('model'))
else:
    serving.add_model('my_model', path.join(getcwd(), 'assets/model.pkl'))
        
serving.set_envs({'INFERENCE_STREAM' : path.join(container, stream_configs['inference-stream']['path']) })
serving.set_envs(v3io_envs)


serving_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['serving-stream']['path']]) + '@ms'
serving.add_trigger('serving_stream',
                    nuclio.triggers.V3IOStreamTrigger(url=serving_stream,
                                                      maxWorkers=stream_configs['serving-stream']['shard_count']+2,
                                                      seekTo='earliest'))
serving.spec.config.pop('spec.triggers.http')
serving.spec.readiness_timeout = 200
serving.spec.replicas = 1

serving.deploy()

> 2021-05-21 01:24:46,092 [info] Starting remote function deploy
2021-05-21 01:24:46  (info) Deploying function
2021-05-21 01:24:46  (info) Building
2021-05-21 01:24:46  (info) Staging files and preparing base images
2021-05-21 01:24:46  (info) Building processor image
2021-05-21 01:25:03  (info) Build complete
2021-05-21 01:25:52  (info) Function deploy complete
> 2021-05-21 01:25:53,408 [info] function deployed, address=192.168.65.4:30864


'http://192.168.65.4:30864'

#### Inference logger

In [41]:
# We will use the same event-handler function for logging the inference stream to parquet.
inference_logger = mlrun.code_to_function(name='inference-logger', handler='handler', kind='nuclio', filename='functions/event-handler.ipynb')
project.set_function(inference_logger)

il_envs = {'PARQUET_SINK_FLAG': 'true',
           'STREAM_SINK_FLAG': 'false',
           'PARQUET_TARGET_PATH' : inference_parquet_target_path,
           'PARQUET_BATCH_SIZE': 8192,
           'TS_KEY': 'when',
           'TS_FORMAT': '%Y-%m-%d %H:%M:%S.%f',
           'FEATURES': ",".join(feature_list),
           'PREDICTIONS': 'about_to_churn',
           'CONTAINER': container}
project.func('inference-logger').set_envs({**v3io_envs, **il_envs})

project.func('inference-logger').apply(mlrun.platforms.auto_mount())

inference_stream = '/'.join(s.strip('/') for s in [web_api_users, stream_configs['inference-stream']['path']]) + '@il'
project.func('inference-logger').add_trigger('inference_stream',
                                               nuclio.triggers.V3IOStreamTrigger(url=inference_stream,
                                                                                 maxWorkers=stream_configs['inference-stream']['shard_count']+2,
                                                                                 seekTo='earliest'))
project.func('inference-logger').spec.replicas=1
project.func('inference-logger').deploy()

> 2021-05-21 01:38:56,166 [info] Starting remote function deploy
2021-05-21 01:38:56  (info) Deploying function
2021-05-21 01:38:56  (info) Building
2021-05-21 01:38:56  (info) Staging files and preparing base images
2021-05-21 01:38:56  (info) Building processor image
2021-05-21 01:41:00  (info) Build complete
2021-05-21 01:41:14  (info) Function deploy complete
> 2021-05-21 01:41:15,489 [info] function deployed, address=192.168.65.4:31976


'http://192.168.65.4:31976'
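The inference logger is configured with `FEATURES`, `PREDICTIONS`, and `TS_KEY='when'`, which suggests that it expands each inference record into named Parquet columns. The record layout assumed below (a `when` timestamp, a `features` list of input vectors, and an `outputs` list of predictions) is hypothetical; the serving function's actual message format is not shown in this notebook.

```python
from datetime import datetime


def flatten_inference(record, feature_names, prediction_name, ts_key="when",
                      ts_format="%Y-%m-%d %H:%M:%S.%f"):
    """Expand one inference record into one flat row per input vector (hypothetical layout)."""
    when = datetime.strptime(record[ts_key], ts_format)
    rows = []
    for vector, prediction in zip(record["features"], record["outputs"]):
        if len(vector) != len(feature_names):
            raise ValueError("feature vector length does not match FEATURES")
        row = {ts_key: when, **dict(zip(feature_names, vector))}
        row[prediction_name] = prediction
        rows.append(row)
    return rows


record = {"when": "2021-05-21 01:41:20.123456",
          "features": [[4, 40.0, 8], [7, 5.0, 1]],
          "outputs": [0, 1]}
rows = flatten_inference(record, ["socioeconomic_idx", "bet_sum", "bet_count"], "about_to_churn")
print(rows)
```

Flat, named columns make the logged inferences directly comparable with the training snapshot, which is the basis for checking drift or retraining later.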

### Save the Project

In [42]:
project.save()

## Done