ML Service Provider
This documentation provides an overview of the ML Service Provider (German: Dienstleister) module within the FastIoT ecosystem.
Services
This section provides an overview of the services available in the Dienstleister (Service Provider) module. Each service provides one specific piece of functionality within the FastIoT ecosystem.
DataProcessingService
- class DataProcessingService(broker_connection: BrokerConnection | None = None, **kwargs)
Bases:
FastIoTService
A Data Processing Service
- Variables:
_preprocessor – Holds the steps of the processing pipeline
- async get_labeled_dataset(_: str, msg: Thing) → Thing
Requests a labeled dataset from the broker and returns a corresponding processed dataset
- Parameters:
_ (str) – The topic of the message. This is not used in this method.
msg (Thing) – Actual message
- Returns:
Thing to signal reception
- Return type:
Thing
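The _preprocessor attribute holds the steps of the processing pipeline. A minimal sketch of how such a pipeline might turn a labeled dataset into processed records, assuming the dataset arrives as a list of dicts and that the pipeline is a scikit-learn Pipeline. The two steps, the column names id and value, and the helper process_labeled_dataset are hypothetical stand-ins for the transformers described under Data Processing Utils.

```python
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

# Hypothetical steps standing in for the service's real _preprocessor.
preprocessor = Pipeline([
    # Drop an identifier column that carries no signal for training.
    ("drop_id", FunctionTransformer(lambda df: df.drop(columns=["id"]))),
    # Make sure the numeric feature is stored as float.
    ("value_to_float", FunctionTransformer(lambda df: df.astype({"value": float}))),
])


def process_labeled_dataset(records: list[dict]) -> list[dict]:
    """Run labeled records through the pipeline and return records again."""
    df = pd.DataFrame.from_records(records)
    processed = preprocessor.fit_transform(df)
    return processed.to_dict(orient="records")


if __name__ == "__main__":
    print(process_labeled_dataset([{"id": 1, "value": "2.5", "label": "ok"}]))
```

Converting back to a list of dicts keeps the result easy to place in a broker message payload.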
EdcDienstleisterService
- edc_fetch_dict_list(url)
Fetches EDC data from the given URL
- Parameters:
url (str) – URL from which the data is fetched
- Returns:
The fetched data
- Return type:
Any
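A minimal standard-library sketch of a fetcher like edc_fetch_dict_list(), assuming the URL answers a plain HTTP GET with a JSON array of objects. A real EDC endpoint may additionally require authentication headers. The timeout parameter and the validation step are illustrative additions, not part of the documented signature.

```python
import json
import urllib.request


def edc_fetch_dict_list(url: str, timeout: float = 10.0) -> list[dict]:
    """Fetch a JSON array of objects from url (illustrative sketch)."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        payload = json.load(response)
    # Guard against endpoints that return a single object or an error document.
    if not isinstance(payload, list) or not all(isinstance(item, dict) for item in payload):
        raise ValueError("expected a JSON array of objects")
    return payload
```

Because urllib also understands data: URLs, the function can be exercised offline, for example with "data:application/json," followed by a percent-encoded JSON array.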
- class EdcDienstleisterService(broker_connection: BrokerConnection | None = None, **kwargs)
Bases:
FastIoTService
EDC Dienstleister Service
- async get_labeled_dataset(_: str, __: Thing) → Thing
Gets a labeled dataset using edc_fetch_dict_list().
- Parameters:
_ (str) – The topic of the message. This is not used in this method.
__ (Thing) – Actual message
- Returns:
Thing to signal reception
- Return type:
Thing
MlTrainingService
- class MlTrainingService(*args, **kwargs)
Bases:
FastIoTService
ML Training Service (Dienstleister)
- Parameters:
args – Positional arguments passed to the superclass or used internally.
kwargs – Arbitrary keyword arguments, allowing for extensibility or forwarding to the superclass constructor or other components.
- async produce()
Produces a model at regular intervals using train_model()
- async train_model(**kwargs)
Trains an MLflow-tracked model
- Parameters:
kwargs – Arbitrary keyword arguments, allowing for extensibility or forwarding to other components.
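produce() retrains at regular intervals by calling train_model(). A plain asyncio sketch of such a loop, assuming a fixed interval in seconds. The function produce_periodically and its parameters interval_s and max_runs are hypothetical (max_runs only makes the loop testable), and the actual service may instead rely on FastIoT's own scheduling.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def produce_periodically(
    train_model: Callable[[], Awaitable[None]],
    interval_s: float,
    max_runs: Optional[int] = None,
) -> int:
    """Call train_model every interval_s seconds; return how many runs completed."""
    runs = 0
    while max_runs is None or runs < max_runs:
        # One training round; per the docs, train_model() trains an MLflow-tracked model.
        await train_model()
        runs += 1
        # Yield to the event loop until the next training round is due.
        await asyncio.sleep(interval_s)
    return runs
```

Awaiting asyncio.sleep rather than blocking keeps the service responsive to broker messages between training rounds.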
Data Processing Utils
This section includes utility classes and functions used for data processing within the Dienstleister module. These utilities facilitate common data manipulation tasks such as dropping columns, setting column types, and encoding.
- class ColumnDropper(target: list)
Bases:
BaseEstimator, TransformerMixin
Drop the specified columns from the DataFrame.
- Parameters:
target (list) – List of columns to drop
- fit(target)
Return self.
- transform(x: DataFrame) → DataFrame
Drop the specified columns from the DataFrame.
- Parameters:
x (pd.DataFrame) – The DataFrame to transform
- Returns:
The transformed DataFrame
- Return type:
pd.DataFrame
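A minimal sketch of a transformer with this behaviour, assuming it simply wraps DataFrame.drop. It uses the scikit-learn fit(X, y=None) convention rather than the fit(target) signature documented above, so it composes with sklearn.pipeline.Pipeline.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class ColumnDropper(BaseEstimator, TransformerMixin):
    """Drop the columns listed in target (illustrative sketch)."""

    def __init__(self, target: list):
        # BaseEstimator.get_params() expects attributes named like the __init__ arguments.
        self.target = target

    def fit(self, x, y=None):
        # Stateless transformer: nothing to learn.
        return self

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        # drop() returns a new frame, so the caller's DataFrame is left untouched.
        return x.drop(columns=self.target)
```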
- class ColumnTypeSetter(target: list)
Bases:
BaseEstimator, TransformerMixin
Set the specified columns to type float in the DataFrame.
- Parameters:
target (list) – List of columns to set
- fit(target)
Return self.
- transform(x: DataFrame) → DataFrame
Set the specified columns to type float in the DataFrame.
- Parameters:
x (pd.DataFrame) – The DataFrame to transform
- Returns:
The transformed DataFrame
- Return type:
pd.DataFrame
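A minimal sketch, assuming the cast is a single DataFrame.astype call with a column-to-dtype mapping; as above, fit follows the scikit-learn fit(X, y=None) convention.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class ColumnTypeSetter(BaseEstimator, TransformerMixin):
    """Cast the columns listed in target to float (illustrative sketch)."""

    def __init__(self, target: list):
        self.target = target

    def fit(self, x, y=None):
        return self

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        # astype with a column->dtype mapping only touches the listed columns.
        return x.astype({column: float for column in self.target})
```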
- class OneHotEncodePd(target: str, prefix: str, sep: str, required_columns=None)
Bases:
BaseEstimator, TransformerMixin
One-hot encode the specified column.
- Parameters:
target (str) – The column to one-hot encode.
prefix (str) – The prefix to use for the one-hot encoded columns.
sep (str) – The separator to use for the one-hot encoded columns.
required_columns (list) – A list of columns that should be present in the DataFrame after one-hot encoding.
- fit(target)
Return self.
- transform(x: DataFrame) → DataFrame
One-hot encode the specified column.
- Parameters:
x (pd.DataFrame) – The DataFrame to transform
- Returns:
The transformed DataFrame
- Return type:
pd.DataFrame
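A sketch built on pandas.get_dummies. How required_columns is applied is an assumption: here the encoded columns are reindexed to exactly that list, adding missing ones filled with 0 and discarding unlisted categories, so that training and inference data end up with identical columns. Replacing the source column with its encoded columns is likewise assumed.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class OneHotEncodePd(BaseEstimator, TransformerMixin):
    """One-hot encode a single categorical column (illustrative sketch)."""

    def __init__(self, target: str, prefix: str, sep: str, required_columns=None):
        self.target = target
        self.prefix = prefix
        self.sep = sep
        self.required_columns = required_columns

    def fit(self, x, y=None):
        return self

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        # Column names take the form f"{prefix}{sep}{category}", e.g. "color_red".
        dummies = pd.get_dummies(x[self.target], prefix=self.prefix, prefix_sep=self.sep, dtype=int)
        if self.required_columns is not None:
            # Assumed semantics: force exactly these columns, filling absent categories with 0.
            dummies = dummies.reindex(columns=self.required_columns, fill_value=0)
        return pd.concat([x.drop(columns=[self.target]), dummies], axis=1)
```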
- class MultiOneHotEncodePd(target: str, prefix: str, sep: str, required_columns=None)
Bases:
BaseEstimator, TransformerMixin
One-hot encode the specified column, whose cells contain lists of categorical values, into one indicator column per category.
- Parameters:
target (str) – The column to one-hot encode.
prefix (str) – The prefix to use for the one-hot encoded columns.
sep (str) – The separator to use for the one-hot encoded columns.
required_columns (list) – A list of columns that should be present in the DataFrame after one-hot encoding.
- fit(target)
Return self.
- transform(x: DataFrame) → DataFrame
One-hot encode the specified column containing lists of categorical values.
- Parameters:
x (pd.DataFrame) – The DataFrame to transform
- Returns:
The transformed DataFrame
- Return type:
pd.DataFrame
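For list-valued cells, one way to build the indicator columns is to explode each list into separate rows, one-hot encode those, and collapse back to one row per original index with a maximum. This sketch assumes that approach, a unique DataFrame index, and the same required_columns semantics as a plain one-hot encoder (reindex to exactly that list, filling with 0); none of this is confirmed by the documentation.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class MultiOneHotEncodePd(BaseEstimator, TransformerMixin):
    """Multi-hot encode a column whose cells hold lists of categories (illustrative sketch)."""

    def __init__(self, target: str, prefix: str, sep: str, required_columns=None):
        self.target = target
        self.prefix = prefix
        self.sep = sep
        self.required_columns = required_columns

    def fit(self, x, y=None):
        return self

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        # One row per (original row, category); empty lists become NaN and encode as all zeros.
        exploded = x[self.target].explode()
        dummies = pd.get_dummies(exploded, prefix=self.prefix, prefix_sep=self.sep, dtype=int)
        # Collapse back to one row per original index (assumes the index is unique).
        dummies = dummies.groupby(level=0).max()
        if self.required_columns is not None:
            dummies = dummies.reindex(columns=self.required_columns, fill_value=0)
        return pd.concat([x.drop(columns=[self.target]), dummies], axis=1)
```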
- class NormalizeCols(target: str, feature_range: tuple, column_range: tuple)
Bases:
BaseEstimator, TransformerMixin
Normalize the specified column to the specified feature range using the provided column range.
- Parameters:
target (str) – The column to normalize.
feature_range (tuple) – The desired range of the transformed data.
column_range (tuple) – The actual range of the column data.
- fit(target)
Return self.
- transform(x: DataFrame) → DataFrame
Normalize the specified column to the specified feature range using the provided column range.
- Parameters:
x (pd.DataFrame) – The DataFrame to transform
- Returns:
The transformed DataFrame
- Return type:
pd.DataFrame
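Assuming this is ordinary min-max scaling with a fixed, known column range, a value v is mapped from column_range (c_min, c_max) to feature_range (f_min, f_max) as f_min + (v - c_min) * (f_max - f_min) / (c_max - c_min). A sketch of that assumption:

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class NormalizeCols(BaseEstimator, TransformerMixin):
    """Min-max scale one column using a known column range (illustrative sketch)."""

    def __init__(self, target: str, feature_range: tuple, column_range: tuple):
        self.target = target
        self.feature_range = feature_range
        self.column_range = column_range

    def fit(self, x, y=None):
        # The column range is supplied up front, so nothing is learned from the data.
        return self

    def transform(self, x: pd.DataFrame) -> pd.DataFrame:
        f_min, f_max = self.feature_range
        c_min, c_max = self.column_range
        out = x.copy()
        scaled = (out[self.target] - c_min) / (c_max - c_min)
        out[self.target] = scaled * (f_max - f_min) + f_min
        return out
```

Passing column_range explicitly, instead of fitting the minimum and maximum from the data, gives identical scaling at training and inference time; values outside column_range map linearly outside feature_range rather than being clipped.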
ML Training Utils
This section provides utility classes and functions for machine learning training tasks, including dataset handling and neural network definitions.
- class MyDataset(df: DataFrame, prediction_columns: list)
Bases:
Dataset
A custom dataset for the PyTorch regression service.
- Parameters:
df (pd.DataFrame) – The dataframe containing the data.
prediction_columns (list) – The columns to be used as prediction targets.
- __getitem__(idx)
Get an item from the dataset.
- Parameters:
idx (int) – The index of the item to get.
- Returns:
The input and output data.
- Return type:
tuple
- __len__()
Return the length of the dataset.
- Returns:
The length of the dataset.
- Return type:
int
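A sketch of such a dataset, assuming that every column not listed in prediction_columns is an input feature and that all values are numeric; both are assumptions, not documented behaviour.

```python
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset


class MyDataset(Dataset):
    """Split a DataFrame into float32 input and target tensors (illustrative sketch)."""

    def __init__(self, df: pd.DataFrame, prediction_columns: list):
        features = df.drop(columns=prediction_columns)
        # Convert once up front so __getitem__ is a cheap tensor index.
        self.x = torch.tensor(features.to_numpy(dtype="float32"))
        self.y = torch.tensor(df[prediction_columns].to_numpy(dtype="float32"))

    def __len__(self) -> int:
        return len(self.x)

    def __getitem__(self, idx: int):
        # Returns (inputs, targets) for one row.
        return self.x[idx], self.y[idx]


if __name__ == "__main__":
    df = pd.DataFrame({"a": [0.0, 1.0, 2.0], "b": [1.0, 1.0, 1.0], "y": [0.5, 1.5, 2.5]})
    for inputs, targets in DataLoader(MyDataset(df, ["y"]), batch_size=2):
        print(inputs.shape, targets.shape)
```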
- class DemonstratorNeuralNet(input_dim, hidden_dim, output_dim, *args, **kwargs)
Bases:
Module
A simple neural network for demonstration purposes.
- Parameters:
input_dim (int) – Dimension of the input layer.
hidden_dim (int) – Dimension of the hidden layers.
output_dim (int) – Dimension of the output layer.
args – Positional arguments passed to the superclass or used internally.
kwargs – Arbitrary keyword arguments, allowing for extensibility or forwarding to the superclass constructor or other components.
- forward(x)
Forward pass through the network.
- Parameters:
x – The input to the network.
- Returns:
The output of the network.
- Return type:
torch.Tensor
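A sketch of a network matching this description, assuming two hidden layers of width hidden_dim with ReLU activations and a linear output layer; the documentation only says "hidden layers", so the depth and activation are guesses.

```python
import torch
from torch import nn


class DemonstratorNeuralNet(nn.Module):
    """Small fully connected regressor (illustrative sketch)."""

    def __init__(self, input_dim, hidden_dim, output_dim, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            # No activation on the output layer, as is usual for regression.
            nn.Linear(hidden_dim, output_dim),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.layers(x)
```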
Broker Facade
- ok_response_thing(payload: dict | list, fiot_service: FastIoTService) → Thing
Creates a Thing object with the payload and the name of the service that created it.
- Parameters:
payload (dict | list) – The payload to be sent.
fiot_service (FastIoTService) – The service that created the payload.
- Returns:
A Thing object with the payload and the name of the service that created it.
- Return type:
Thing
- error_response_thing(exception: Exception, fiot_service: FastIoTService) → Thing
Creates a Thing object with the error information and the name of the service that created it.
- Parameters:
exception (Exception) – The exception that was raised.
fiot_service (FastIoTService) – The service that raised the exception.
- Returns:
A Thing object with the error information and the name of the service that created it.
- Return type:
Thing
- async request_replysubject_thing_wrapper(fiot_service: FastIoTService, data: dict | list[dict], subject: str, timeout: float) → Future[dict | list[dict] | int | float | str | None]
This function wraps the request to the broker and handles the response.
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
data (dict | list[dict]) – The data to be sent to the broker.
subject (str) – The subject to which the request is sent.
timeout (float) – Seconds to wait for a response.
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
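The timeout semantics can be illustrated without a broker. In this plain asyncio sketch, request_with_timeout and send_request are hypothetical: send_request stands in for the broker round trip. It shows only the "wait at most timeout seconds, then fail" behaviour and makes no claim about how request_replysubject_thing_wrapper() is implemented or what it does on timeout.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def request_with_timeout(
    send_request: Callable[[Any], Awaitable[Any]],
    data: Any,
    timeout: float,
) -> Any:
    """Await a reply for data, raising asyncio.TimeoutError after timeout seconds."""
    # wait_for cancels the pending request if no reply arrives in time.
    return await asyncio.wait_for(send_request(data), timeout=timeout)
```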
- async request_save_many_raw_data_points(fiot_service: FastIoTService, data: list[dict], timeout: float = 10) → Future[dict | list[dict]]
This function sends a request to the broker under the DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT subject to save raw datapoints to the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
data (list[dict]) – The data to be sent to the broker.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_upsert_many_processed_data_points(fiot_service: FastIoTService, data: list[dict], timeout: float = 10) → Future[dict | list[dict]]
This function sends a request to the broker under the DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT subject to upsert (insert or update) processed datapoints in the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
data (list[dict]) – The data to be sent to the broker.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_get_all_raw_data_points(fiot_service: FastIoTService, timeout: float = 10) → Future[dict | list[dict]]
This function sends a request to the broker under the DB_GET_ALL_RAW_DATA_SUBJECT subject for all raw datapoints in the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_get_all_processed_data_points(fiot_service: FastIoTService, timeout: float = 10) → Future[dict | list[dict]]
This function sends a request to the broker under the DB_GET_ALL_PROCESSED_DATA_SUBJECT subject for all processed datapoints in the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_get_processed_data_points_count(fiot_service: FastIoTService, timeout: float = 10) → Future[int]
This function sends a request to the broker under the DB_GET_PROCESSED_DATA_POINTS_COUNT_SUBJECT subject for the number of processed datapoints in the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_get_processed_data_points_page(fiot_service: FastIoTService, page: int = 0, page_size: int = 10, timeout: float = 10) → Future[list[dict]]
This function sends a request to the broker under the DB_GET_PROCESSED_DATA_PAGE_SUBJECT subject for a page of processed datapoints in the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
page (int) – Index of the page to retrieve.
page_size (int) – Number of datapoints per page.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
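The count and page requests can be combined to read all processed datapoints page by page. This sketch assumes a zero-based page index (suggested by the default page=0) and a count reply that is a plain integer; fetch_all_processed, get_count and get_page are hypothetical, with get_count and get_page standing in for the two broker requests.

```python
import asyncio
import math
from typing import Awaitable, Callable


async def fetch_all_processed(
    get_count: Callable[[], Awaitable[int]],
    get_page: Callable[..., Awaitable[list]],
    page_size: int = 10,
) -> list:
    """Collect every processed datapoint by walking through all pages."""
    total = await get_count()
    # Ceiling division: 25 datapoints at page_size=10 means pages 0, 1 and 2.
    n_pages = math.ceil(total / page_size)
    datapoints = []
    for page in range(n_pages):
        datapoints.extend(await get_page(page=page, page_size=page_size))
    return datapoints


if __name__ == "__main__":
    store = [{"id": i} for i in range(25)]

    async def count() -> int:
        return len(store)

    async def page_of(page: int, page_size: int) -> list:
        return store[page * page_size:(page + 1) * page_size]

    print(len(asyncio.run(fetch_all_processed(count, page_of))))  # prints 25
```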
- async request_get_processed_data_points_from_raw_data(fiot_service: FastIoTService, data: dict, timeout: float = 10) → Future[list[dict]]
This function sends a request to the broker under the DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT subject for a set of raw data to be processed and handed back to the requesting service, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
data (dict) – The raw data to be processed.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_get_prediction(fiot_service: FastIoTService, data: dict, timeout: float = 10) → Future[list[dict]]
This function sends a request to the broker under the ML_SERVING_SUBJECT subject for a prediction, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
data (dict) – The data to be sent to the broker.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_get_labeled_dataset(fiot_service: FastIoTService, data: dict = {}, timeout: float = 10) → Future[list[dict]]
This function sends a request to the broker under the DB_GET_LABELED_DATASET_SUBJECT subject for a labeled dataset from the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
data (dict) – The data to be sent to the broker.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
- async request_get_edc_labeled_dataset(fiot_service: FastIoTService, data: dict = {}, timeout: float = 10) → Future[list[dict]]
This function sends a request to the broker under the DB_GET_EDC_LABELED_DATASET_SUBJECT subject for a labeled EDC dataset from the database, using request_replysubject_thing_wrapper().
- Parameters:
fiot_service (FastIoTService) – The FastIoTService instance that is used to send the request.
data (dict) – The data to be sent to the broker.
timeout (float) – Seconds to wait for a response
- Returns:
Future placeholder for the awaited Reply
- Return type:
Future[dict | list[dict] | int | float | str | None]
Broker Lifecycle Topics
- DB_GET_LABELED_DATASET_SUBJECT = ReplySubject(name='get-labeled-dataset', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with retrieving a labeled dataset
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_SAVE_MANY_RAW_DATAPOINTS_SUBJECT = ReplySubject(name='save-many-raw-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with saving a set of raw datapoints to the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_GET_ALL_RAW_DATA_SUBJECT = ReplySubject(name='get-all-raw-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with retrieving all raw data contained in the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_GET_ALL_PROCESSED_DATA_SUBJECT = ReplySubject(name='get-all-processed-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with retrieving all processed data contained in the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_GET_PROCESSED_DATA_POINTS_COUNT_SUBJECT = ReplySubject(name='get-processed-datapoints-count', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with retrieving the number of processed datapoints in the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_UPSERT_MANY_PROCESSED_DATAPOINTS_SUBJECT = ReplySubject(name='upsert-many-processed-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with upserting (inserting or updating) a set of processed datapoints in the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_GET_PROCESSED_DATA_COUNT_SUBJECT = ReplySubject(name='get-processed-data-count', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with retrieving the count of processed datapoints in the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_GET_PROCESSED_DATA_PAGE_SUBJECT = ReplySubject(name='get-processed-data-page', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with retrieving a page of processed data from the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DATA_PROCESSING_PROCESS_RAW_DATA_SUBJECT = ReplySubject(name='process-raw-datapoints', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with processing the attached raw data
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- ML_SERVING_SUBJECT = ReplySubject(name='get-prediction', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with requesting a prediction from the ML serving service
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker
- DB_GET_EDC_LABELED_DATASET_SUBJECT = ReplySubject(name='get-edc-labeled-dataset', msg_cls=<class 'fastiot.msg.thing.Thing'>, reply_cls=<class 'fastiot.msg.thing.Thing'>)
Broker Subject associated with requesting a labeled EDC dataset from the database
- Parameters:
name (string) – Subject Label
msg_cls (Thing) – Class of the message sent to broker
reply_cls (Thing) – Class of the reply received from the broker