Module fusion_platform.models.process_execution

Process execution model class file.

author: Matthew Casey

© Digital Content Analysis Technology Ltd

Classes

class ProcessExecution (session, schema=None)
class ProcessExecution(Model):
    """
    Process execution model class providing attributes and methods to manipulate process execution item details.
    """

    # Override the schema.
    _SCHEMA = ProcessExecutionSchema()

    # Override the base model class name.
    _BASE_MODEL_CLASS_NAME = 'Organisation'  # A string to prevent circular imports.

    # Base path.
    _PATH_BASE = '/organisations/{organisation_id}/process_executions/{process_execution_id}'

    # Override the standard model paths.
    _PATH_DELETE = _PATH_BASE
    _PATH_GET = _PATH_BASE
    _PATH_PATCH = _PATH_BASE

    # Add in the custom model paths.
    _PATH_COMPONENTS = f"{_PATH_BASE}/process_service_executions"
    _PATH_CHANGE_DELETE_EXPIRY = f"{_PATH_BASE}/change_delete_expiry"

    def change_delete_expiry(self, delete_expiry):
        """
        Changes the delete expiry. This will either change the delete expiry of a single execution, or if the execution is in a group, all the corresponding
        executions in the group.

        Args:
            delete_expiry: The new delete expiry.

        Raises:
            RequestError: if the update fails.
        """
        body = {self.__class__.__name__: {'delete_expiry': delete_expiry.isoformat()}}
        self._session.request(path=self._get_path(self.__class__._PATH_CHANGE_DELETE_EXPIRY), method=Session.METHOD_POST, body=body)

    def check_complete(self, wait=False):
        """
        Checks whether the execution has completed. Optionally waits for the execution to complete.

        Args:
            wait: Optionally wait for the execution to complete? Default False.

        Returns:
            True if the execution is complete.

        Raises:
            RequestError: if any request fails.
            ModelError: if the execution failed.
            ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>&reg;</sup>.
        """
        complete = False

        # Optionally wait for the execution to finish.
        while not complete:
            # Load in the most recent version of the model.
            self.get(organisation_id=self.organisation_id)

            # See if the execution has completed.
            self._logger.debug('checking for execution %s to complete: %f', self.id, self.progress)
            complete = self.progress >= 100

            # Raise an exception if the execution failed.
            abort_reason = self.abort_reason if hasattr(self, Model._FIELD_ABORT_REASON) else None

            if complete and not self.success:
                self._logger.error(i18n.t('models.process_execution.execution_failed', abort_reason=abort_reason))
                raise ModelError(i18n.t('models.process_execution.execution_failed', abort_reason=abort_reason))

            if complete:
                self._logger.debug('execution %s is complete', self.id)

                if abort_reason is not None:
                    self._logger.warning(i18n.t('models.process_execution.execution_warning', abort_reason=abort_reason))

            if (not wait) or complete:
                break

            # We are waiting, so block for a short while.
            sleep(self._session.api_update_wait_period)

        return complete

    @property
    def components(self):
        """
        Provides an iterator through the process execution's components.

        Returns:
            An iterator through the component objects.

        Raises:
            RequestError: if any get fails.
            ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>&reg;</sup>.
        """
        return ProcessServiceExecution._models_from_api_path(self._session, self._get_path(self.__class__._PATH_COMPONENTS))

Process execution model class providing attributes and methods to manipulate process execution item details.

Initialises the object.

Args

session
The linked session object for interfacing with the Fusion Platform®.
schema
The optional schema to use instead of the schema defined by the class.

Ancestors

  • Model
  • fusion_platform.base.Base

Instance variables

prop attributes

Inherited from: Model.attributes

Returns

The model attributes as a dictionary.

prop components

Provides an iterator through the process execution's components.

Returns

An iterator through the component objects.

Raises

RequestError
if any get fails.
ModelError
if a model could not be loaded or validated from the Fusion Platform®.
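The components property reads from the `_PATH_COMPONENTS` template defined on the class. The sketch below shows how such a template could expand into a concrete API path. It assumes the placeholders are filled with `str.format`; `expand_path` is a hypothetical stand-in for `Model._get_path`, whose real implementation is not shown on this page, and the identifiers are illustrative only.

```python
# Path templates copied from the ProcessExecution class source.
PATH_BASE = '/organisations/{organisation_id}/process_executions/{process_execution_id}'
PATH_COMPONENTS = f"{PATH_BASE}/process_service_executions"


def expand_path(template, **ids):
    """Hypothetical stand-in for Model._get_path: fills the placeholders with str.format."""
    return template.format(**ids)


components_path = expand_path(
    PATH_COMPONENTS,
    organisation_id='org-1',  # Illustrative identifiers, not real UUIDs.
    process_execution_id='exec-1',
)
print(components_path)
```

With a loaded model, the equivalent call is simply iterating over `execution.components`, which yields the component objects.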

Methods

def change_delete_expiry(self, delete_expiry)

Changes the delete expiry. This will either change the delete expiry of a single execution, or if the execution is in a group, all the corresponding executions in the group.

Args

delete_expiry
The new delete expiry, as a datetime. It is sent to the API in ISO 8601 form via isoformat().

Raises

RequestError
if the update fails.
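As a rough illustration of what change_delete_expiry sends, the sketch below rebuilds the request body outside the model. The helper name `build_change_delete_expiry_body` is hypothetical; only the body layout (class name wrapping an ISO 8601 `delete_expiry`) comes from the method source.

```python
from datetime import datetime, timezone


def build_change_delete_expiry_body(model_name, delete_expiry):
    """Mirrors the body built in change_delete_expiry: the class name wraps the ISO 8601 timestamp."""
    return {model_name: {'delete_expiry': delete_expiry.isoformat()}}


# A timezone-aware expiry keeps the serialised value unambiguous.
new_expiry = datetime(2030, 1, 1, 12, 0, tzinfo=timezone.utc)
body = build_change_delete_expiry_body('ProcessExecution', new_expiry)
print(body)
```

On a real model the same effect is obtained with `execution.change_delete_expiry(new_expiry)`, which posts this body to the `change_delete_expiry` path.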

def check_complete(self, wait=False)

Checks whether the execution has completed. Optionally waits for the execution to complete.

Args

wait
If True, block until the execution completes, reloading the model and sleeping for the session's api_update_wait_period between checks. Default False.

Returns

True if the execution is complete, otherwise False. False can only be returned when wait is False.

Raises

RequestError
if any request fails.
ModelError
if the execution failed.
ModelError
if a model could not be loaded or validated from the Fusion Platform®.
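The polling behaviour of check_complete can be sketched without a live session. Everything below is a simplified stand-in: `FakeExecution` and `ExecutionFailed` are hypothetical replacements for the real model and for ModelError, and the wait period is a plain argument rather than a session setting.

```python
from time import sleep


class ExecutionFailed(Exception):
    """Hypothetical stand-in for ModelError."""


class FakeExecution:
    """Hypothetical stand-in for a ProcessExecution whose progress advances on each refresh."""

    def __init__(self, progress_steps, success=True):
        self._steps = iter(progress_steps)
        self.progress = 0
        self.success = success
        self.refreshes = 0

    def get(self):
        # The real model reloads itself from the API here.
        self.progress = next(self._steps)
        self.refreshes += 1


def check_complete(execution, wait=False, wait_period=0.0):
    """Refresh, test progress, raise on failure, and sleep between polls when waiting."""
    while True:
        execution.get()
        complete = execution.progress >= 100
        if complete and not execution.success:
            raise ExecutionFailed('execution failed')
        if complete or not wait:
            return complete
        sleep(wait_period)


# Without wait, a single refresh is made and the current state is reported.
single_poll = check_complete(FakeExecution([40, 100]))

# With wait, the loop keeps refreshing until progress reaches 100.
waited = FakeExecution([0, 50, 100])
waited_result = check_complete(waited, wait=True)
print(single_poll, waited_result, waited.refreshes)
```

Because a failed execution raises rather than returning False, callers that wait would normally wrap the call in a try/except for ModelError.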

def delete(self)

Inherited from: Model.delete

Attempts to delete the model object. This assumes the model is deleted using a DELETE RESTful request …

def get(self, **kwargs)

Inherited from: Model.get

Gets the model object by loading it from the Fusion Platform®. Uses the model's current id and base model id for the get unless …

def to_csv(self, exclude=None)

Inherited from: Model.to_csv

Converts the model attributes into a CSV string …

def update(self, **kwargs)

Inherited from: Model.update

Attempts to update the model object with the given values. For models which have not been persisted, the relevant fields are updated without …

class ProcessExecutionSchema (*,
only: types.StrSequenceOrSet | None = None,
exclude: types.StrSequenceOrSet = (),
many: bool | None = None,
load_only: types.StrSequenceOrSet = (),
dump_only: types.StrSequenceOrSet = (),
partial: bool | types.StrSequenceOrSet | None = None,
unknown: types.UnknownOption | None = None)
class ProcessExecutionSchema(Schema):
    """
    Schema class for process execution model.

    Each process execution model has the following fields (and nested fields):

    .. include::process_execution.md
    """
    id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    created_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    updated_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    organisation_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    process_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    group_id = fields.UUID(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    group_index = fields.Integer(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    group_count = fields.Integer(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    options = fields.List(fields.Nested(ProcessExecutionOptionSchema()), allow_none=True)
    chains = fields.List(fields.Nested(ProcessExecutionChainSchema()), allow_none=True)
    # Removed creator.

    started_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    ended_at = fields.DateTime(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    # Removed wait_for_inputs.
    stopped = fields.Boolean(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    abort = fields.Boolean(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    abort_reason = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    exit_type = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    success = fields.Boolean(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    progress = fields.Integer(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    delete_expiry = fields.DateTime(required=True)
    delete_warning_status = fields.String(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    deletable = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
    delete_protection = fields.Boolean(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

    class Meta:
        """
        When loading an object, exclude any unknown fields rather than raising an exception.
        """
        unknown = EXCLUDE

Schema class for process execution model.

Each process execution model has the following fields (and nested fields):

  • id: The unique identifier for the record.
  • created_at: When was the record created?
  • updated_at: When was the record last updated?
  • organisation_id: The owning organisation.
  • process_id: The process executed.
  • group_id: The optional group that this execution belongs to.
  • group_index: The optional unique index for the execution within a group.
  • group_count: The optional number of executions within a group.
  • options: The options defined for this process execution.
    • ssd_id: The SSD for this option.
    • name: The name of the option.
    • value: The value for the option.
    • required: Is a value for the option required?
    • data_type: The data type associated with the option.
    • validation: The optional validation for the option. This must be supplied for date/time and constrained values.
    • mutually_exclusive: The optional expression used by clients to determine whether this value is displayed compared with other option values.
    • advanced: Is this an option for advanced usage?
    • title: The title for the option.
    • description: The description of the option.
  • chains: The processing chain of SSDs which will be executed for this process execution.
    • ssd_id: The SSD for this part of the chain.
    • service_id: The corresponding specific version of the SSD.
    • inputs: The inputs to the service.
    • outputs: The outputs from the service.
    • options: The options from the service.
      • name: The name of the option.
      • value: The value for the option.
      • data_type: The data type associated with the option.
      • validation: The optional validation for the option. This must be supplied for date/time and constrained values.
    • intermediate: Is this an intermediate service within the chain?
  • started_at: When did the execution start?
  • ended_at: When did the execution end?
  • stopped: Has the execution been stopped by a user?
  • abort: Has the execution been aborted?
  • abort_reason: The reason for any abort of the execution.
  • exit_type: The type of exit experienced by the execution.
  • success: Has the execution completed successfully?
  • progress: Percentage progress of the execution.
  • delete_expiry: When will the execution expire and therefore be deleted?
  • delete_warning_status: What is the notification status for the delete warning?
  • deletable: Is this execution scheduled for deletion?
  • delete_protection: Is this execution prevented from being deleted irrespective of its delete expiry?
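To make the effect of `unknown = EXCLUDE` concrete, the sketch below imitates it with the standard library only. It is not marshmallow and not the library's own loader: `KNOWN_FIELDS` and `load_execution` are hypothetical, cover only a handful of the fields listed above, and skip the required-field validation that the real schema performs.

```python
from datetime import datetime
from uuid import UUID

# A small subset of the schema fields, each paired with a converter.
KNOWN_FIELDS = {
    'id': UUID,
    'organisation_id': UUID,
    'started_at': datetime.fromisoformat,
    'success': bool,
    'progress': int,
}


def load_execution(payload):
    """Converts known fields and silently drops unknown ones, imitating unknown = EXCLUDE."""
    return {name: convert(payload[name]) for name, convert in KNOWN_FIELDS.items() if name in payload}


payload = {
    'id': '11111111-1111-1111-1111-111111111111',
    'organisation_id': '22222222-2222-2222-2222-222222222222',
    'started_at': '2024-05-01T09:30:00+00:00',
    'success': True,
    'progress': 100,
    'some_future_field': 'ignored',  # Not a known field, so it is excluded.
}
execution = load_execution(payload)
print(sorted(execution))
```

Excluding unknown keys means a response carrying fields that the client does not yet know about still loads, instead of failing validation.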

Ancestors

  • marshmallow.schema.Schema

Class variables

var OPTIONS_CLASS : type

Defines defaults for marshmallow.Schema.Meta.

var TYPE_MAPPING : dict[type, type[Field]]

Maps Python types to the marshmallow field classes used to represent them.

var dict_class : type[dict]

The dictionary type returned when serializing.

var error_messages : dict[str, str]

Overrides for the default schema-level error messages.