ML Service Provider

This documentation provides an overview of the ML Service Provider (German: Dienstleister) module within the FastIoT ecosystem.

Services

This section provides an overview of the services available in the Dienstleister (Service Provider) module. Each service is responsible for a specific functionality within the FastIoT ecosystem.

DataProcessingService

class DataProcessingService(broker_connection: BrokerConnection | None = None, **kwargs)

Bases: FastIoTService

A Data Processing Service

Variables:

_preprocessor – Holds the steps of the processing pipeline

async get_labeled_dataset(_: str, msg: Thing) → Thing

Requests a labeled dataset from the broker and returns the corresponding processed dataset.

Parameters:
  • _ (str) – The topic of the message. This is not used in this method.

  • msg (Thing) – Actual message

Returns:

Thing to signal reception

Return type:

Thing
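The service keeps its processing steps in _preprocessor. The following minimal sketch illustrates the general idea under stated assumptions: the labeled dataset arrives as a list of dicts, and every step follows the scikit-learn fit/transform convention used by the Data Processing Utils. FillMissing and process_records are hypothetical names, and the real service may build and apply its pipeline differently (for example through sklearn.pipeline.Pipeline).

```python
import pandas as pd


class FillMissing:
    """Hypothetical toy step: replace missing values with a constant."""

    def __init__(self, value=0):
        self.value = value

    def fit(self, x, y=None):
        return self  # nothing to learn

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        return x.fillna(self.value)


def process_records(records: list[dict], steps: list) -> list[dict]:
    """Hypothetical helper: run records through fit/transform steps in order."""
    df = pd.DataFrame.from_records(records)
    for step in steps:
        # Fit each step on the current data, then transform it, as a Pipeline would.
        df = step.fit(df).transform(df)
    # Convert back to a list of dicts so the result can be sent as a message payload.
    return df.to_dict(orient="records")


labeled = [{"temp": 20.5, "label": 1}, {"temp": None, "label": 0}]
print(process_records(labeled, [FillMissing(0.0)]))
```

Applying the steps in a fixed order on a DataFrame and converting back to records keeps the message payload independent of pandas.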

EdcDienstleisterService

edc_fetch_dict_list(url)

Fetches EDC data

Parameters:

url (str) – URL from which the data is fetched

Returns:

The fetched data

Return type:

Any
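edc_fetch_dict_list() is documented only as fetching EDC data from a URL. As a minimal sketch, assuming the URL serves a JSON array of objects (as the name dict_list suggests) and needs no authentication, such a fetch could use the standard library as follows. fetch_dict_list and its timeout parameter are hypothetical, and any EDC-specific handshakes or headers are not modelled.

```python
import json
import urllib.request
from typing import Any


def fetch_dict_list(url: str, timeout: float = 10.0) -> list[dict[str, Any]]:
    """Hypothetical stand-in for edc_fetch_dict_list: GET a JSON array of objects."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        data = json.load(response)
    # Fail loudly if the payload is not the expected list of records.
    if not isinstance(data, list) or not all(isinstance(item, dict) for item in data):
        raise ValueError("expected a JSON array of objects")
    return data
```

The validation step turns an unexpected payload shape into an explicit ValueError instead of passing it on to later processing steps.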

class EdcDienstleisterService(broker_connection: BrokerConnection | None = None, **kwargs)

Bases: FastIoTService

EDC Dienstleister Service

async get_labeled_dataset(_: str, __: Thing) → Thing

Gets a labeled dataset using edc_fetch_dict_list().

Parameters:
  • _ (str) – The topic of the message. This is not used in this method.

  • __ (Thing) – Actual message

Returns:

Thing to signal reception

Return type:

Thing

MLTrainingService

class MlTrainingService(*args, **kwargs)

Bases: FastIoTService

ML Training Service (Dienstleister)

Parameters:
  • args – Positional arguments passed to the superclass or used internally.

  • kwargs – Arbitrary keyword arguments, allowing for extensibility or forwarding to the superclass constructor or other components.

async produce()

Produces a model at regular intervals using train_model().

async train_model(**kwargs)

Trains an MLflow-tracked model.

Parameters:

kwargs – Arbitrary keyword arguments, allowing for extensibility or forwarding to the superclass constructor or other components.
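produce() is described as training a model at regular intervals through train_model(). A minimal asyncio sketch of such a loop follows, assuming a fixed interval and a coroutine that performs one training run. produce_periodically, interval_s and max_runs are hypothetical, and the MLflow tracking done by the real train_model() is not modelled.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def produce_periodically(
    train_model: Callable[[], Awaitable[object]],
    interval_s: float,
    max_runs: Optional[int] = None,
) -> int:
    """Hypothetical sketch: run train_model, then wait interval_s, repeatedly.

    Runs forever when max_runs is None; returns the number of runs otherwise.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            await train_model()
        except Exception as exc:  # one failed run should not stop the loop
            print(f"training run failed: {exc!r}")
        runs += 1
        await asyncio.sleep(interval_s)
    return runs


async def fake_train_model() -> None:
    print("model trained")


asyncio.run(produce_periodically(fake_train_model, interval_s=0.01, max_runs=3))
```

Catching exceptions per run keeps a single failed training run from ending the loop; whether the real service behaves this way is not documented.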

Data Processing Utils

This section includes utility classes and functions used for data processing within the Dienstleister module. These utilities facilitate common data manipulation tasks such as dropping columns, setting column types, one-hot encoding, and normalizing columns.

class ColumnDropper(target: list)

Bases: BaseEstimator, TransformerMixin

Drop the specified columns from the DataFrame.

Parameters:

target (list) – List of columns to drop

fit(target)

Return self.

transform(x: DataFrame) → DataFrame

Drop the specified columns from the DataFrame.

Parameters:

x (pd.DataFrame) – The DataFrame to transform

Returns:

The transformed DataFrame

Return type:

pd.DataFrame
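A pandas-only sketch of the documented column dropping. ColumnDropperSketch is hypothetical; it uses scikit-learn's customary fit(x, y=None) signature instead of the documented fit(target), and it omits the BaseEstimator and TransformerMixin bases so that it needs only pandas.

```python
import pandas as pd


class ColumnDropperSketch:
    """Hypothetical pandas-only sketch of ColumnDropper."""

    def __init__(self, target: list):
        self.target = target

    def fit(self, x, y=None):
        return self  # nothing to learn

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        # drop() returns a new DataFrame and raises KeyError for unknown columns.
        return x.drop(columns=self.target)


df = pd.DataFrame({"id": [1, 2], "temp": [20.5, 21.0], "label": [0, 1]})
print(ColumnDropperSketch(["id"]).fit(df).transform(df).columns.tolist())
# ['temp', 'label']
```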

class ColumnTypeSetter(target: list)

Bases: BaseEstimator, TransformerMixin

Set the specified columns to type float in the DataFrame.

Parameters:

target (list) – List of columns to set

fit(target)

Return self.

transform(x: DataFrame) → DataFrame

Set the specified columns to type float in the DataFrame.

Parameters:

x (pd.DataFrame) – The DataFrame to transform

Returns:

The transformed DataFrame

Return type:

pd.DataFrame
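A pandas-only sketch of the documented float conversion. ColumnTypeSetterSketch is hypothetical; like the other sketches it uses fit(x, y=None) and omits the scikit-learn base classes.

```python
import pandas as pd


class ColumnTypeSetterSketch:
    """Hypothetical pandas-only sketch of ColumnTypeSetter."""

    def __init__(self, target: list):
        self.target = target

    def fit(self, x, y=None):
        return self  # stateless

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        x = x.copy()  # leave the caller's DataFrame untouched
        for col in self.target:
            # astype(float) raises ValueError for strings that are not numbers.
            x[col] = x[col].astype(float)
        return x


df = pd.DataFrame({"temp": ["20.5", "21"], "count": [1, 2]})
out = ColumnTypeSetterSketch(["temp", "count"]).fit(df).transform(df)
print(out.dtypes)
```

If malformed values should become NaN rather than raise, pd.to_numeric(x[col], errors="coerce") is the usual alternative.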

class OneHotEncodePd(target: str, prefix: str, sep: str, required_columns=None)

Bases: BaseEstimator, TransformerMixin

One-hot encode the specified column.

Parameters:
  • target (str) – The column to one-hot encode.

  • prefix (str) – The prefix to use for the one-hot encoded columns.

  • sep (str) – The separator to use for the one-hot encoded columns.

  • required_columns (list) – A list of columns that should be present in the DataFrame after one-hot encoding.

fit(target)

Return self.

transform(x: DataFrame) → DataFrame

One-hot encode the specified column.

Parameters:

x (pd.DataFrame) – The DataFrame to transform

Returns:

The transformed DataFrame

Return type:

pd.DataFrame
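A pandas-only sketch of one plausible reading of OneHotEncodePd: indicator columns named prefix + sep + category, with any column listed in required_columns added as all zeros when a batch lacks that category (useful at inference time). This handling of required_columns is an assumption, and OneHotEncodePdSketch is hypothetical.

```python
import pandas as pd


class OneHotEncodePdSketch:
    """Hypothetical pandas-only sketch of the documented OneHotEncodePd behaviour."""

    def __init__(self, target: str, prefix: str, sep: str, required_columns=None):
        self.target = target
        self.prefix = prefix
        self.sep = sep
        self.required_columns = list(required_columns or [])

    def fit(self, x, y=None):
        return self  # stateless

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        dummies = pd.get_dummies(
            x[self.target], prefix=self.prefix, prefix_sep=self.sep, dtype=int
        )
        # Assumption: required columns absent from this batch are added as all-zero.
        for col in self.required_columns:
            if col not in dummies.columns:
                dummies[col] = 0
        # Replace the original categorical column with its indicator columns.
        return pd.concat([x.drop(columns=[self.target]), dummies], axis=1)


df = pd.DataFrame({"color": ["red", "blue", "red"], "v": [1, 2, 3]})
encoder = OneHotEncodePdSketch(
    "color", "color", "_", required_columns=["color_red", "color_blue", "color_green"]
)
print(encoder.fit(df).transform(df))
```

Whether the real class also drops categories that are not in required_columns is not documented.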

class MultiOneHotEncodePd(target: str, prefix: str, sep: str, required_columns=None)

Bases: BaseEstimator, TransformerMixin

One-hot encode the specified column, whose cells contain lists of categorical values.

Parameters:
  • target (str) – The column to one-hot encode.

  • prefix (str) – The prefix to use for the one-hot encoded columns.

  • sep (str) – The separator to use for the one-hot encoded columns.

  • required_columns (list) – A list of columns that should be present in the DataFrame after one-hot encoding.

fit(target)

Return self.

transform(x: DataFrame) → DataFrame

One-hot encode the specified column containing lists of categorical values.

Parameters:

x (pd.DataFrame) – The DataFrame to transform

Returns:

The transformed DataFrame

Return type:

pd.DataFrame
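For a column whose cells are lists, one common pandas approach is to explode the lists into one row per element, one-hot encode, and collapse back per original row with a maximum. The sketch below uses that technique under the same assumption about required_columns as for OneHotEncodePd; MultiOneHotEncodePdSketch is hypothetical and requires a unique index.

```python
import pandas as pd


class MultiOneHotEncodePdSketch:
    """Hypothetical pandas-only sketch: one-hot encode a column of lists."""

    def __init__(self, target: str, prefix: str, sep: str, required_columns=None):
        self.target = target
        self.prefix = prefix
        self.sep = sep
        self.required_columns = list(required_columns or [])

    def fit(self, x, y=None):
        return self  # stateless

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        # One row per list element; an empty list becomes a single NaN row.
        exploded = x[self.target].explode()
        dummies = pd.get_dummies(
            exploded, prefix=self.prefix, prefix_sep=self.sep, dtype=int
        )
        # Collapse back to one row per original index (requires a unique index):
        # a category is 1 if any element of that row's list matched it.
        dummies = dummies.groupby(level=0).max().reindex(x.index)
        # Assumption: missing required columns are added as all-zero.
        for col in self.required_columns:
            if col not in dummies.columns:
                dummies[col] = 0
        return pd.concat([x.drop(columns=[self.target]), dummies], axis=1)


df = pd.DataFrame({"tags": [["a", "b"], ["b"], []], "v": [1, 2, 3]})
encoder = MultiOneHotEncodePdSketch(
    "tags", "tag", "_", required_columns=["tag_a", "tag_b", "tag_c"]
)
print(encoder.fit(df).transform(df))
```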

class NormalizeCols(target: str, feature_range: tuple, column_range: tuple)

Bases: BaseEstimator, TransformerMixin

Normalize the specified column to the specified feature range using the provided column range.

Parameters:
  • target (str) – The column to normalize.

  • feature_range (tuple) – The desired range of the transformed data.

  • column_range (tuple) – The actual range of the column data.

fit(target)

Return self.

transform(x: DataFrame) → DataFrame

Normalize the specified column to the specified feature range using the provided column range.

Parameters:

x (pd.DataFrame) – The DataFrame to transform

Returns:

The transformed DataFrame

Return type:

pd.DataFrame
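Assuming NormalizeCols performs min-max scaling with a fixed, caller-supplied column range (c_min, c_max) instead of one learned from the data, each value x maps to f_min + (x - c_min) / (c_max - c_min) * (f_max - f_min), where (f_min, f_max) is feature_range. NormalizeColsSketch is a hypothetical pandas-only illustration of that formula.

```python
import pandas as pd


class NormalizeColsSketch:
    """Hypothetical sketch: min-max scaling with a known column range."""

    def __init__(self, target: str, feature_range: tuple, column_range: tuple):
        self.target = target
        self.feature_range = feature_range
        self.column_range = column_range

    def fit(self, x, y=None):
        return self  # the range is given up front, so nothing is learned

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        f_min, f_max = self.feature_range
        c_min, c_max = self.column_range
        if c_max == c_min:
            raise ValueError("column_range must span a non-zero interval")
        x = x.copy()
        scaled = (x[self.target] - c_min) / (c_max - c_min)  # now in [0, 1]
        x[self.target] = scaled * (f_max - f_min) + f_min
        return x


df = pd.DataFrame({"temp": [0.0, 50.0, 100.0]})
out = NormalizeColsSketch("temp", (-1, 1), (0, 100)).fit(df).transform(df)
print(out["temp"].tolist())
# [-1.0, 0.0, 1.0]
```

Values outside column_range land outside feature_range, since the sketch does not clip.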

ML Training Utils

This section provides utility classes and functions for machine learning training tasks, including dataset handling and neural network definitions.

class MyDataset(df: DataFrame, prediction_columns: list)

Bases: Dataset

A custom dataset for the PyTorch regression service.

Parameters:
  • df (pd.DataFrame) – The dataframe containing the data.

  • prediction_columns (list) – The columns to be used as prediction targets.

__getitem__(idx)

Get an item from the dataset.

Parameters:

idx (int) – The index of the item to get.

Returns:

The input and output data.

Return type:

tuple

__len__()

Return the length of the dataset.

Returns:

The length of the dataset.

Return type:

int
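PyTorch defines a map-style dataset as one implementing __getitem__() and __len__(), which is the interface documented for MyDataset. The sketch below assumes that every non-target column is a numeric input feature; MyDatasetSketch is hypothetical and omits the torch.utils.data.Dataset base class and any tensor conversion (torch.from_numpy could be applied to each array) so that it needs only pandas.

```python
import pandas as pd


class MyDatasetSketch:
    """Hypothetical torch-free stand-in for MyDataset."""

    def __init__(self, df: pd.DataFrame, prediction_columns: list):
        # Assumption: all columns except the prediction targets are model inputs.
        self.x = df.drop(columns=prediction_columns).to_numpy(dtype="float32")
        self.y = df[prediction_columns].to_numpy(dtype="float32")

    def __len__(self) -> int:
        return len(self.x)

    def __getitem__(self, idx: int) -> tuple:
        # Returns (input features, prediction targets) for one row.
        return self.x[idx], self.y[idx]


df = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0], "y": [0.0, 1.0]})
dataset = MyDatasetSketch(df, prediction_columns=["y"])
features, target = dataset[1]
print(len(dataset), features, target)
```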

class DemonstratorNeuralNet(input_dim, hidden_dim, output_dim, *args, **kwargs)

Bases: Module

A simple neural network for demonstration purposes.

Parameters:
  • input_dim (int) – Dimension of the input layer.

  • hidden_dim (int) – Dimension of the hidden layers.

  • output_dim (int) – Dimension of the output layer.

  • args – Positional arguments passed to the superclass or used internally.

  • kwargs – Arbitrary keyword arguments, allowing for extensibility or forwarding to the superclass constructor or other components.

forward(x)

Forward pass through the network.

Parameters:

x – The input to the network.

Returns:

The output of the network.

Return type:

torch.Tensor

Broker Facade

ok_response_thing(payload: dict | list, fiot_service: FastIoTService) → Thing

Creates a Thing object with the payload and the name of the service that created it.

Parameters:
  • payload (dict | list) – The payload to be sent.

  • fiot_service (FastIoTService) – The service that created the payload.

Returns:

A Thing object with the payload and the name of the service that created it.

Return type:

Thing

error_response_thing(exception: Exception, fiot_service: FastIoTService) → Thing

Creates a Thing object with the error information and the name of the service that created it.

Parameters:
  • exception (Exception) – The exception that was raised.

  • fiot_service (FastIoTService) – The service that raised the exception.

Returns:

A Thing object with the error information and the name of the service that created it.

Return type:

Thing
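The two helpers wrap either a payload or an exception into a Thing together with the name of the producing service. The sketch below mirrors that idea with a stand-in dataclass, because the fields of fastiot.msg.thing.Thing are not documented here. ThingStandIn, ok_response and error_response are hypothetical, they take the service name as a string instead of a FastIoTService, and the error payload layout is an assumption.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ThingStandIn:
    """Hypothetical stand-in for fastiot.msg.thing.Thing; the real fields may differ."""

    name: str
    value: Any


def ok_response(payload: Any, service_name: str) -> ThingStandIn:
    return ThingStandIn(name=service_name, value=payload)


def error_response(exception: Exception, service_name: str) -> ThingStandIn:
    # Store the type name and message rather than the exception object itself.
    return ThingStandIn(
        name=service_name,
        value={"error": type(exception).__name__, "message": str(exception)},
    )


try:
    raise ValueError("empty dataset")
except ValueError as exc:
    print(error_response(exc, "DataProcessingService"))
```

Keeping only plain strings in the error payload presumably makes it easy to serialize for the broker.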

async request_replysubject_thing_wrapper(fiot_service: FastIoTService, data: dict | list[dict], subject: str, timeout: float) → Future[dict | list[dict] | int | float | str | None]

Wraps a request to the broker and handles the response.

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • data (dict | list[dict]) – The data to be sent to the broker.

  • subject (str) – The subject to which the request is sent.

  • timeout (float) – Seconds to wait for a response.

Returns:

Future placeholder for the awaited reply

Return type:

Future[dict | list[dict] | int | float | str | None]
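request_replysubject_thing_wrapper() sends a request on a subject, waits up to timeout seconds, and handles the reply. The sketch below shows that request/reply-with-timeout pattern in plain asyncio, assuming the reply carries its payload in a value field and that failures come back as a dict with an "error" key. ReplyStandIn, BrokerRequestError, request_wrapper and the send_request callable are hypothetical stand-ins for the FastIoT broker connection, which is not modelled; the sketch also returns the payload directly rather than a Future.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class ReplyStandIn:
    """Hypothetical reply object; only its value field is used here."""

    value: Any


class BrokerRequestError(RuntimeError):
    """Hypothetical exception raised for error replies."""


async def request_wrapper(
    send_request: Callable[[Any], Awaitable[ReplyStandIn]],
    data: Any,
    timeout: float,
) -> Any:
    """Send data, wait at most `timeout` seconds, and unwrap the reply payload."""
    # asyncio.wait_for cancels the pending request and raises on timeout.
    reply = await asyncio.wait_for(send_request(data), timeout=timeout)
    payload = reply.value
    # Assumption: the responder signals failures with a dict carrying an "error" key.
    if isinstance(payload, dict) and "error" in payload:
        raise BrokerRequestError(payload.get("message", payload["error"]))
    return payload


async def main() -> None:
    async def fake_broker(data):
        return ReplyStandIn(value={"received": data})

    print(await request_wrapper(fake_broker, {"x": 1}, timeout=1.0))


asyncio.run(main())
```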

async request_save_many_raw_data_points(fiot_service: FastIoTService, data: list[dict], timeout: float = 10) → Future[dict | list[dict]]

Sends a request to the broker on the DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT subject to save raw datapoints to the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • data (list[dict]) – The data to be sent to the broker.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_upsert_many_processed_data_points(fiot_service: FastIoTService, data: list[dict], timeout: float = 10) → Future[dict | list[dict]]

Sends a request to the broker on the DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT subject to upsert (insert or update) processed datapoints in the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • data (list[dict]) – The data to be sent to the broker.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_get_all_raw_data_points(fiot_service: FastIoTService, timeout: float = 10) → Future[dict | list[dict]]

Sends a request to the broker on the DB_GET_ALL_RAW_DATA_SUBJECT subject for all raw datapoints in the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_get_all_processed_data_points(fiot_service: FastIoTService, timeout: float = 10) → Future[dict | list[dict]]

Sends a request to the broker on the DB_GET_ALL_PROCESSED_DATA_SUBJECT subject for all processed datapoints in the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_get_processed_data_points_count(fiot_service: FastIoTService, timeout: float = 10) → Future[int]

Sends a request to the broker on the DB_GET_PROCESSED_DATA_POINTS_COUNT_SUBJECT subject for the number of processed datapoints in the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_get_processed_data_points_page(fiot_service: FastIoTService, page: int = 0, page_size: int = 10, timeout: float = 10) → Future[list[dict]]

Sends a request to the broker on the DB_GET_PROCESSED_DATA_PAGE_SUBJECT subject for a page of processed datapoints in the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • page (int) – The page to retrieve.

  • page_size (int) – The number of datapoints per page.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]
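The count and page requests can be combined to download all processed datapoints in chunks. A minimal sketch follows, assuming zero-based page numbers (suggested by the default page=0, but not documented) and that each page holds at most page_size datapoints. fetch_all_pages, get_count and get_page are hypothetical; in a service the two callables would wrap the count and page requests documented above.

```python
import asyncio
import math
from typing import Awaitable, Callable


async def fetch_all_pages(
    get_count: Callable[[], Awaitable[int]],
    get_page: Callable[[int, int], Awaitable[list[dict]]],
    page_size: int = 10,
) -> list[dict]:
    """Collect every processed datapoint by requesting zero-based pages in order."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    total = await get_count()
    pages = math.ceil(total / page_size)
    records: list[dict] = []
    for page in range(pages):
        records.extend(await get_page(page, page_size))
    return records


# In-memory stand-in for the database, for illustration only.
store = [{"id": i} for i in range(23)]


async def fake_count() -> int:
    return len(store)


async def fake_page(page: int, size: int) -> list[dict]:
    return store[page * size:(page + 1) * size]


print(len(asyncio.run(fetch_all_pages(fake_count, fake_page, page_size=10))))
```

Pages are fetched one after another to keep the load on the broker bounded; asyncio.gather could request them concurrently if the database can handle it.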

async request_get_processed_data_points_from_raw_data(fiot_service: FastIoTService, data: dict, timeout: float = 10) → Future[list[dict]]

Sends raw data to the broker on the DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT subject to be processed and returned to the requesting service, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • data (dict) – The data to be sent to the broker.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_get_prediction(fiot_service: FastIoTService, data: dict, timeout: float = 10) → Future[list[dict]]

Sends a request to the broker on the ML_SERVING_SUBJECT subject for a prediction, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • data (dict) – The data to be sent to the broker.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_get_labeled_dataset(fiot_service: FastIoTService, data: dict = {}, timeout: float = 10) → Future[list[dict]]

Sends a request to the broker on the DB_GET_LABELED_DATASET_SUBJECT subject for a labeled dataset from the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • data (dict) – The data to be sent to the broker.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

async request_get_edc_labeled_dataset(fiot_service: FastIoTService, data: dict = {}, timeout: float = 10) → Future[list[dict]]

Sends a request to the broker on the DB_GET_EDC_LABELED_DATASET_SUBJECT subject for a labeled EDC dataset from the database, using request_replysubject_thing_wrapper().

Parameters:
  • fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.

  • data (dict) – The data to be sent to the broker.

  • timeout (float) – Seconds to wait for a response

Returns:

Future placeholder for the awaited Reply

Return type:

Future[dict | list[dict] | int | float | str | None]

Broker Lifecycle Topics

DB_GET_LABELED_DATASET_SUBJECT = ReplySubject(name='get-labeled-dataset', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with retrieving a labeled dataset

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT = ReplySubject(name='save-many-raw-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with saving a set of raw datapoints to the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_GET_ALL_RAW_DATA_SUBJECT = ReplySubject(name='get-all-raw-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with retrieving all raw data contained in the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_GET_ALL_PROCESSED_DATA_SUBJECT = ReplySubject(name='get-all-processed-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with retrieving all processed data contained in the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_GET_PROCESSED_DATA_POINTS_COUNT_SUBJECT = ReplySubject(name='get-processed-datapoints-count', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with retrieving the number of processed datapoints in the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT = ReplySubject(name='upsert-many-processed-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with upserting (inserting or updating) a set of processed datapoints in the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_GET_PROCESSED_DATA_COUNT_SUBJECT = ReplySubject(name='get-processed-data-count', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with retrieving the count of processed datapoints in the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_GET_PROCESSED_DATA_PAGE_SUBJECT = ReplySubject(name='get-processed-data-page', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with retrieving a page of processed data from the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT = ReplySubject(name='process-raw-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with processing the attached raw data

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

ML_SERVING_SUBJECT = ReplySubject(name='get-prediction', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with requesting predictions from the ML serving service

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker

DB_GET_EDC_LABELED_DATASET_SUBJECT = ReplySubject(name='get-edc-labeled-dataset', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)

Broker Subject associated with requesting a labeled EDC dataset from the database

Parameters:
  • name (string) – Subject Label

  • msg_cls (Thing) – Class of the message sent to broker

  • reply_cls (Thing) – Class of the reply received from the broker