openlayer.InferencePipeline.stream_data#

InferencePipeline.stream_data(*args, **kwargs)#

Streams production data to the Openlayer platform.

Parameters:
stream_data : Union[Dict[str, Any], List[Dict[str, Any]]]

Dictionary or list of dictionaries containing the production data. E.g., {'CreditScore': 618, 'Geography': 'France', 'Balance': 321.92}.

stream_config : Dict[str, Any], optional

Dictionary containing the stream configuration. This is not needed if stream_config_file_path is provided.

What’s in the config?

The configuration for a stream of data depends on the TaskType. Refer to the How to write dataset configs guides for details; the configuration is the same for development and production data. A sketch of what such a config can look like follows this parameter list.

stream_config_file_path : str, optional

Path to the configuration YAML file. This is not needed if stream_config is provided.

What’s in the config file?

Same as stream_config above, but as a YAML file. The contents depend on the TaskType, and the configuration is the same for development and production data; refer to the How to write dataset configs guides for details.
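For orientation, here is a minimal sketch of a config for the churn example used throughout this page. The timestampsColumnName and inferenceIdColumnName keys are the ones described in the Notes below; the remaining keys and values are illustrative assumptions, since the exact schema depends on the TaskType (see the How to write dataset configs guides):

>>> config = {
...     "classNames": ["Retained", "Churned"],   # illustrative: depends on the TaskType
...     "predictionsColumnName": "Prediction",   # illustrative: depends on the TaskType
...     "timestampsColumnName": "Timestamp",     # see Notes below
...     "inferenceIdColumnName": "InferenceId",  # see Notes below
... }

The equivalent YAML file, passed via stream_config_file_path, could look like this (again, a sketch under the same assumptions):

# stream_config.yaml (illustrative sketch)
classNames:
  - Retained
  - Churned
predictionsColumnName: Prediction
timestampsColumnName: Timestamp
inferenceIdColumnName: InferenceId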

Notes

Production data usually contains the inference timestamps. This column is specified in the timestampsColumnName field of the stream config, and it should contain UNIX timestamps in seconds.

Production data also usually contains the prediction IDs. This column is specified in the inferenceIdColumnName field of the stream config. It is particularly important when the ground truths are not available at inference time and are uploaded later, since the IDs link each ground truth back to its prediction.

If these columns are not provided, Openlayer generates the inference IDs and uses the current time as the inference timestamp.
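For instance, assuming the config maps timestampsColumnName to "Timestamp" and inferenceIdColumnName to "InferenceId" (column names chosen here for illustration), each row could carry:

>>> import time
>>> import uuid
>>>
>>> row = {
...     "CreditScore": 618,
...     "Geography": "France",
...     "Balance": 321.92,
...     "Timestamp": int(time.time()),     # UNIX timestamp in seconds
...     "InferenceId": str(uuid.uuid4()),  # stable ID to attach ground truths later
... }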

Examples

Related guide: How to set up monitoring.

First, instantiate the client and retrieve an existing inference pipeline:

>>> import openlayer
>>>
>>> client = openlayer.OpenlayerClient('YOUR_API_KEY_HERE')
>>>
>>> project = client.load_project(name="Churn prediction")
>>>
>>> inference_pipeline = project.load_inference_pipeline(
...     name="XGBoost model inference pipeline",
... )
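Next, define the data to stream and its config. The row below mirrors the churn example from the parameter list, and the config keys are illustrative assumptions (the exact schema depends on your TaskType; see the How to write dataset configs guides):

>>> stream_data = {
...     "CreditScore": 618,
...     "Geography": "France",
...     "Balance": 321.92,
...     "Prediction": 0,  # illustrative: the model's output for this row
... }
>>>
>>> config = {
...     "classNames": ["Retained", "Churned"],  # illustrative: depends on the TaskType
...     "predictionsColumnName": "Prediction",  # illustrative: depends on the TaskType
... }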

With the InferencePipeline object retrieved, you can stream production data (in this example, stored in the dictionary stream_data) with:

>>> inference_pipeline.stream_data(
...     stream_data=stream_data,
...     stream_config=config,
... )
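
Alternatively, keep the configuration in a YAML file and pass its path instead (the file name below is illustrative):

>>> inference_pipeline.stream_data(
...     stream_data=stream_data,
...     stream_config_file_path="stream_config.yaml",
... )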