Module fusion_platform.models.process_service_execution

Process service execution model class file.

author: Matthew Casey

© Digital Content Analysis Technology Ltd

Classes

class ProcessServiceExecution (session, schema=None)
Expand source code
class ProcessServiceExecution(Model):
    """
    Process service execution model class providing attributes and methods to manipulate process execution item details.
    """

    # Override the schema.
    _SCHEMA = ProcessServiceExecutionSchema()

    # Override the base model class name.
    _BASE_MODEL_CLASS_NAME = 'Organisation'  # A string to prevent circular imports.

    # Base path.
    _PATH_BASE = '/organisations/{organisation_id}/process_service_executions/{process_service_execution_id}'

    # Override the standard model paths.
    _PATH_GET = _PATH_BASE

    # Add in the custom model paths.
    _PATH_LOGS = f"{_PATH_BASE}/logs"

    @property
    def inputs(self):
        """
        Provides an iterator through the process execution component's inputs.

        Returns:
            An iterator through the input data objects.

        Raises:
            RequestError: if any get fails.
            ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>&reg;</sup>.
        """
        inputs = self._model.get(self.__class__._FIELD_INPUTS, []) if self._model.get(self.__class__._FIELD_INPUTS) is not None else []

        return Data._models_from_api_ids(self._session, Data._get_ids_from_list(inputs, organisation_id=self.organisation_id))

    def download_log_file(self, path):
        """
        Downloads the log output from the component's execution to the specified path as a CSV file.

        Args:
            path: The path to save the log records to.

        Raises:
            RequestError if any get fails.
        """
        logs = ProcessServiceExecutionLog._models_from_api_path(self._session, self._get_path(self.__class__._PATH_LOGS), reverse=True)
        first = True

        try:
            with open(path, 'w') as file:
                for log in logs:
                    header, line = log.to_csv(exclude=['id', 'created_at', 'updated_at', 'process_service_execution_id'])

                    if first:
                        file.write(header)
                        file.write('\n')

                        first = False

                    file.write(line)
                    file.write('\n')
        except:
            # If an error occurred, delete the file.
            os.remove(path)

            # Make sure the error is raised.
            raise

    @property
    def outputs(self):
        """
        Provides an iterator through the process execution component's outputs.

        Returns:
            An iterator through the output data objects.

        Raises:
            RequestError: if any get fails.
            ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>&reg;</sup>.
        """
        outputs = self._model.get(self.__class__._FIELD_OUTPUTS, []) if self._model.get(self.__class__._FIELD_OUTPUTS) is not None else []

        return Data._models_from_api_ids(self._session, Data._get_ids_from_list(outputs, organisation_id=self.organisation_id))

Process service execution model class providing attributes and methods to manipulate process execution item details.

Initialises the object.

Args

session
The linked session object for interfacing with the Fusion Platform®.
schema
The optional schema to use instead of the schema defined by the class.

Ancestors

  • Model
  • fusion_platform.base.Base

Instance variables

prop attributes

Inherited from: Model.attributes

Returns

The model attributes as a dictionary.

prop inputs
Expand source code
@property
def inputs(self):
    """
    Provides an iterator through the process execution component's inputs.

    Returns:
        An iterator through the input data objects.

    Raises:
        RequestError: if any get fails.
        ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>&reg;</sup>.
    """
    inputs = self._model.get(self.__class__._FIELD_INPUTS, []) if self._model.get(self.__class__._FIELD_INPUTS) is not None else []

    return Data._models_from_api_ids(self._session, Data._get_ids_from_list(inputs, organisation_id=self.organisation_id))

Provides an iterator through the process execution component's inputs.

Returns

An iterator through the input data objects.

Raises

RequestError
if any get fails.
ModelError
if a model could not be loaded or validated from the Fusion Platform®.
prop outputs
Expand source code
@property
def outputs(self):
    """
    Provides an iterator through the process execution component's outputs.

    Returns:
        An iterator through the output data objects.

    Raises:
        RequestError: if any get fails.
        ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>&reg;</sup>.
    """
    outputs = self._model.get(self.__class__._FIELD_OUTPUTS, []) if self._model.get(self.__class__._FIELD_OUTPUTS) is not None else []

    return Data._models_from_api_ids(self._session, Data._get_ids_from_list(outputs, organisation_id=self.organisation_id))

Provides an iterator through the process execution component's outputs.

Returns

An iterator through the output data objects.

Raises

RequestError
if any get fails.
ModelError
if a model could not be loaded or validated from the Fusion Platform®.

Methods

def delete(self)

Inherited from: Model.delete

Attempts to delete the model object. This assumes the model is deleted using a DELETE RESTful request …

def download_log_file(self, path)
Expand source code
def download_log_file(self, path):
    """
    Downloads the log output from the component's execution to the specified path as a CSV file.

    Args:
        path: The path to save the log records to.

    Raises:
        RequestError if any get fails.
    """
    logs = ProcessServiceExecutionLog._models_from_api_path(self._session, self._get_path(self.__class__._PATH_LOGS), reverse=True)
    first = True

    try:
        with open(path, 'w') as file:
            for log in logs:
                header, line = log.to_csv(exclude=['id', 'created_at', 'updated_at', 'process_service_execution_id'])

                if first:
                    file.write(header)
                    file.write('\n')

                    first = False

                file.write(line)
                file.write('\n')
    except:
        # If an error occurred, delete the file.
        os.remove(path)

        # Make sure the error is raised.
        raise

Downloads the log output from the component's execution to the specified path as a CSV file.

Args

path
The path to save the log records to.

Raises

RequestError if any get fails.

def get(self, **kwargs)

Inherited from: Model.get

Gets the model object by loading it from the Fusion Platform®. Uses the model's current id and base model id for the get unless …

def to_csv(self, exclude=None)

Inherited from: Model.to_csv

Converts the model attributes into a CSV string …

def update(self, **kwargs)

Inherited from: Model.update

Attempts to update the model object with the given values. For models which have not been persisted, the relevant fields are updated without …

class ProcessServiceExecutionSchema (*,
only: types.StrSequenceOrSet | None = None,
exclude: types.StrSequenceOrSet = (),
many: bool | None = None,
load_only: types.StrSequenceOrSet = (),
dump_only: types.StrSequenceOrSet = (),
partial: bool | types.StrSequenceOrSet | None = None,
unknown: types.UnknownOption | None = None)
Expand source code
class ProcessServiceExecutionSchema(Schema):
    """
    Schema class for process service execution model.

    Each process service execution model has the following fields (and nested fields):

    .. include::process_service_execution.md
    """
    id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    created_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    updated_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    organisation_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    process_execution_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    process_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    service_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    image_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    chain_index = fields.Integer(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    name = fields.String(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    started_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    ended_at = fields.DateTime(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    runtime = fields.Integer(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    # Removed input_size.
    architecture = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    cpu = fields.Decimal(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    gpu = fields.Decimal(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    memory = fields.Decimal(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    storage = fields.Decimal(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    instance_type = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    actions = fields.List(fields.Nested(ProcessServiceExecutionActionSchema()), allow_none=True)
    options = fields.List(fields.Nested(ProcessServiceExecutionOptionSchema()), allow_none=True,
                          metadata={'read_only': True})  # Changed to prevent this being updated.
    inputs = fields.List(fields.UUID(required=True), allow_none=True, metadata={'hide': True})  # Changed to hide as an attribute.
    outputs = fields.List(fields.UUID(required=True), allow_none=True, metadata={'hide': True})  # Changed to hide as an attribute.
    # Removed storage_id.
    intermediate = fields.Boolean(allow_none=True)
    # Removed task_id.
    # Removed task_starts.
    # Removed instance_id.
    success = fields.Boolean(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    metrics = fields.List(fields.Nested(ProcessServiceExecutionMetricSchema()), allow_none=True,
                          metadata={'read_only': True})  # Changed to prevent this being updated.

    class Meta:
        """
        When loading an object, make sure we exclude any unknown fields, rather than raising an exception, and put fields in their definition order.
        """
        unknown = EXCLUDE

Schema class for process service execution model.

Each process service execution model has the following fields (and nested fields):

  • id: The unique identifier for the record.
  • created_at: When was the record created?
  • updated_at: When was the record last updated?
  • organisation_id: The owning organisation.
  • process_execution_id: The specific execution of the process.
  • process_id: The process executed.
  • service_id: The explicit service version of the SSD which has been executed.
  • image_id: The image which was executed.
  • chain_index: The index in the processing chain for this executed service in the process.
  • name: The name of the service which has been executed.
  • started_at: When did the execution start?
  • ended_at: When did the execution end?
  • runtime: The execution runtime in seconds.
  • architecture: The processor architecture used to run the image.
  • cpu: The number of CPUs used to run the image.
  • gpu: The number of GPUs used to run the image.
  • memory: The memory in megabytes used to run the image.
  • storage: The local storage space in gigabytes used to run the image.
  • instance_type: The instance type allocated for use by the service.
  • actions: The custom actions associated with this execution.
    • name: The name of the value.
    • values: The values required for the action.
      • name: The name of the value.
      • required: Is the value required for the action?
      • data_type: The data type associated with the value.
      • default: The optional default value.
      • validation: The optional validation. This must be supplied for date/time and constrained values.
      • constant: Is this value constant and therefore cannot be changed?
      • ssd_id: The SSD from which this value is taken.
      • output: The output from the SSD being used for the value.
      • selector: The selector from the SSD output being used for the value.
      • data_id: The data item which has been output by the SSD to extract the value from.
      • expression: The expression used to calculate the value.
      • value: The actual value.
      • url: The URL which can be used to obtain the value.
      • advanced: Is this value for advanced usage?
      • title: The title for the value.
      • description: The description of the value.
    • url: The URL which can be executed to receive the value of the action.
    • title: The title for the action.
    • description: The description of the action.
  • options: The options used by this execution.
    • name: The name of the option.
    • value: The value for the option.
    • data_type: The data type associated with the option.
    • validation: The optional validation for the option. This must be supplied for date/time and constrained values.
  • intermediate: Is this an intermediate service?
  • success: Has the execution completed successfully?
  • metrics: Metrics recorded during the execution.
    • date: When was the metric recorded?
    • memory_total_bytes: The total memory in bytes.
    • memory_free_bytes: The free memory in bytes.
    • swap_total_bytes: The total swap space in bytes.
    • swap_free_bytes: The free swap space in bytes.
    • tmp_total_bytes: The total temporary disk space in bytes.
    • tmp_free_bytes: The free temporary disk space in bytes.
    • tmp_used_bytes: The used temporary disk space in bytes.
    • scratch_total_bytes: The total scratch disk space in bytes.
    • scratch_free_bytes: The free scratch disk space in bytes.
    • scratch_used_bytes: The used scratch disk space in bytes.
    • s3_transfer_bytes: The number of bytes transferred in from S3.
    • gcs_transfer_bytes: The number of bytes transferred in from Google Cloud Storage.
    • external_transfer_bytes: The number of bytes transferred in externally.
    • internal_transfer_bytes: The number of bytes transferred in internally.
    • comment: Any comment recorded with the metric.

Ancestors

  • marshmallow.schema.Schema

Class variables

var OPTIONS_CLASS : type

Defines defaults for marshmallow.Schema.Meta.

var TYPE_MAPPING : dict[type, type[Field]]

The type of the None singleton.

var dict_class : type[dict]

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

var error_messages : dict[str, str]

The type of the None singleton.