Aggregate time-series dataframe#

Performs a rolling aggregation on df_artifact over window, by the selected keys, applying metric_aggs to metrics and label_aggs to labels.
Adds a suffix to the resulting feature names.
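
To make the description concrete, here is a minimal pandas sketch of a rolling aggregation that names each new feature `<column>_<agg>`, as in the output columns shown later on this page (for example `Temperature_mean`). The helper `rolling_aggregate` and the `sample` frame are hypothetical and only illustrate the idea; this is not the hub function's implementation, and grouping by keys is left out for brevity.

```python
import pandas as pd


def rolling_aggregate(df, columns, aggs, window, center=False):
    """Append one '<column>_<agg>' feature per column/aggregation pair.

    Hypothetical helper for illustration only.
    """
    out = df.copy()
    for col in columns:
        # One rolling object per column, reused for every aggregation
        roller = df[col].rolling(window=window, center=center)
        for agg in aggs:
            out[f"{col}_{agg}"] = roller.agg(agg)
    return out


# Tiny made-up sample, not taken from the occupancy dataset
sample = pd.DataFrame({"Temperature": [1.0, 2.0, 3.0, 4.0, 5.0]})
result = rolling_aggregate(sample, ["Temperature"], ["mean", "max"], window=3)
print(result)
```

With a trailing window of 3, the first two rows of each new column are NaN because their windows are incomplete.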

Steps#

  1. Data exploration

  2. Importing the function

  3. Running the function locally

  4. Running the function remotely

Data exploration#

The dataset is the UCI Occupancy Detection Data Set, as used in the article how-to-predict-room-occupancy-based-on-environmental-factors.

Attribute Information:
date - timestamp in the format year-month-day hour:minute:second
Temperature - in Celsius
Relative Humidity - %
Light - in Lux
CO2 - in ppm
Humidity Ratio - derived quantity from temperature and relative humidity, in kg water vapor / kg air
Occupancy - 0 or 1, 0 for not occupied, 1 for occupied status

# Load environment variables from an env file, if it exists
import os

import mlrun

# Path to the env file
path = "/tmp/examples_ci.env"

if os.path.exists(path):
    env_dict = mlrun.set_env_from_file(path, return_dict=True)

# Name of the new project
project_name = 'aggregate-example'

# Load the MLRun project, creating it if it does not exist yet
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)
> 2022-11-30 13:51:49,512 [info] loaded project aggregate-example from MLRun DB
import pandas as pd

data_path = 'https://s3.wasabisys.com/iguazio/data/function-marketplace-data/aggregate/train_room_occupancy.csv'
df = pd.read_csv(data_path).set_index('date',drop=False)
df.head()
date Temperature Humidity Light CO2 HumidityRatio Occupancy
date
2015-02-04 17:51:00 2015-02-04 17:51:00 23.18 27.2720 426.0 721.25 0.004793 1
2015-02-04 17:51:59 2015-02-04 17:51:59 23.15 27.2675 429.5 714.00 0.004783 1
2015-02-04 17:53:00 2015-02-04 17:53:00 23.15 27.2450 426.0 713.50 0.004779 1
2015-02-04 17:54:00 2015-02-04 17:54:00 23.15 27.2000 426.0 708.25 0.004772 1
2015-02-04 17:55:00 2015-02-04 17:55:00 23.10 27.2000 426.0 704.50 0.004757 1

Importing the function#

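import os

# Import the aggregate function from the function hub
aggregate_function = mlrun.import_function("hub://aggregate")

# Mount the function automatically when a V3IO access key is set in the environment
if os.getenv('V3IO_ACCESS_KEY'):
    aggregate_function.apply(mlrun.auto_mount())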
import numpy as np

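# Declare a custom aggregation function: the absolute distance between
# the element at index 3 of the window and the window mean
def dist_from_mean(values):
    mean = np.mean(values)
    return abs(list(values)[3] - mean)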
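
As a quick check of what the custom aggregation computes, the sketch below applies it to two hand-made windows of five values. The function is repeated so the snippet runs on its own; the `series` values are made up for illustration and are not taken from the dataset. Note that index 3 is the fourth element of the window, one position after the middle of a five-element window, and that the function needs at least four values per window.

```python
import numpy as np


def dist_from_mean(values):
    # Absolute distance between the element at index 3 and the window mean
    mean = np.mean(values)
    return abs(list(values)[3] - mean)


# Made-up values, slid over manually with a window of 5
series = [1.0, 2.0, 3.0, 8.0, 6.0, 4.0]
window = 5
per_window = [
    float(dist_from_mean(series[i:i + window]))
    for i in range(len(series) - window + 1)
]

# First window: mean is 4.0 and the element at index 3 is 8.0
# Second window: mean is 4.6 and the element at index 3 is 6.0
print(per_window)
```

A window with fewer than four values raises an IndexError, so this aggregation only works when every window passed to it is long enough.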

Running the function locally#

aggregate_run = aggregate_function.run(name='aggregate',
                       params = {'metrics': ['Temperature','Humidity'],
                                 'labels': ['Occupancy'],
                                 'metric_aggs': ['mean','std',dist_from_mean],
                                 'label_aggs': ['sum'],
                                 'window': 5,
                                 'center': True},
                       inputs={'df_artifact': data_path},
                       local=True)
> 2022-11-30 13:51:52,271 [info] starting run aggregate uid=62de1243460c487b875b4c66c652bb5f DB=http://mlrun-api:8080
> 2022-11-30 13:51:52,424 [info] Aggregating https://s3.wasabisys.com/iguazio/data/function-marketplace-data/aggregate/train_room_occupancy.csv
> 2022-11-30 13:51:55,166 [info] Logging artifact
project: aggregate-example-avia
uid:
iter: 0
start: Nov 30 13:51:52
state: completed
name: aggregate
labels: v3io_user=avia, kind=, owner=avia, host=jupyter-avia-757b4bc677-wn6mf
inputs: df_artifact
parameters: metrics=['Temperature', 'Humidity'], labels=['Occupancy'], metric_aggs=['mean', 'std', ], label_aggs=['sum'], window=5, center=True
results:
artifacts: aggregate

> to track results use the .show() or .logs() methods
> 2022-11-30 13:51:55,604 [info] run executed, status=completed
aggregate_run.artifact('aggregate').as_df()

final state: completed
date Temperature Humidity Light CO2 HumidityRatio Occupancy Temperature_mean Humidity_mean Occupancy_max
2 2015-02-04 17:53:00 23.15 27.2450 426.0 713.500000 0.004779 1 23.146 27.2369 1.0
3 2015-02-04 17:54:00 23.15 27.2000 426.0 708.250000 0.004772 1 23.130 27.2225 1.0
4 2015-02-04 17:55:00 23.10 27.2000 426.0 704.500000 0.004757 1 23.120 27.2090 1.0
5 2015-02-04 17:55:59 23.10 27.2000 419.0 701.000000 0.004757 1 23.110 27.2000 1.0
6 2015-02-04 17:57:00 23.10 27.2000 419.0 701.666667 0.004757 1 23.100 27.2000 1.0
... ... ... ... ... ... ... ... ... ... ...
8136 2015-02-10 09:27:00 21.00 35.8600 433.0 771.333333 0.005525 1 21.025 35.9315 1.0
8137 2015-02-10 09:28:00 21.05 36.0500 433.0 780.250000 0.005571 1 21.035 35.9905 1.0
8138 2015-02-10 09:29:00 21.05 36.0975 433.0 787.250000 0.005579 1 21.050 36.0195 1.0
8139 2015-02-10 09:29:59 21.05 35.9950 433.0 789.500000 0.005563 1 21.070 36.0995 1.0
8140 2015-02-10 09:30:59 21.10 36.0950 433.0 798.500000 0.005596 1 21.080 36.1295 1.0

8139 rows × 10 columns
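
The first output row can be checked by hand. With window=5 and center=True, the window for the row at index 2 covers rows 0 to 4, which are the five rows shown by df.head() above. The sketch below recomputes the two means from those values in plain Python; `centered_mean` is a hypothetical helper, and returning None for incomplete windows is an assumption chosen to match the output starting at index 2.

```python
from statistics import mean

# Temperature and Humidity from the first five rows shown by df.head()
temperature = [23.18, 23.15, 23.15, 23.15, 23.10]
humidity = [27.2720, 27.2675, 27.2450, 27.2000, 27.2000]


def centered_mean(values, position, window):
    """Mean of the window centered on `position`, or None if it is incomplete."""
    half = window // 2
    start, stop = position - half, position + half + 1
    if start < 0 or stop > len(values):
        return None  # assumption: rows without a full window are dropped
    return mean(values[start:stop])


print(round(centered_mean(temperature, 2, 5), 3))  # 23.146
print(round(centered_mean(humidity, 2, 5), 4))     # 27.2369
print(centered_mean(temperature, 1, 5))            # None
```

The two values agree with Temperature_mean and Humidity_mean in the row at index 2 of the output above.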

Running the function remotely#

aggregate_run = aggregate_function.run(name='aggregate',
                       params = {'metrics': ['Temperature','Humidity'],
                                 'labels': ['Occupancy'],
                                 'metric_aggs': ['mean','std'],
                                 'label_aggs': ['sum'],
                                 'window': 5,
                                 'center': True},
                       inputs={'df_artifact': data_path},
                       local=False)
> 2022-11-30 13:51:55,832 [info] starting run aggregate uid=6b4637497e564c158d985b7986954110 DB=http://mlrun-api:8080
> 2022-11-30 13:51:56,020 [info] Job is running in the background, pod: aggregate-bqk9s
> 2022-11-30 13:52:00,372 [info] Aggregating https://s3.wasabisys.com/iguazio/data/function-marketplace-data/aggregate/train_room_occupancy.csv
> 2022-11-30 13:52:02,082 [info] Logging artifact
> 2022-11-30 13:52:02,284 [info] run executed, status=completed
final state: completed
project: aggregate-example-avia
uid:
iter: 0
start: Nov 30 13:52:00
state: completed
name: aggregate
labels: v3io_user=avia, kind=job, owner=avia, mlrun/client_version=1.2.0-rc18, host=aggregate-bqk9s
inputs: df_artifact
parameters: metrics=['Temperature', 'Humidity'], labels=['Occupancy'], metric_aggs=['mean', 'std'], label_aggs=['sum'], window=5, center=True
results:
artifacts: aggregate

> to track results use the .show() or .logs() methods
> 2022-11-30 13:52:05,399 [info] run executed, status=completed
