Module fusion_platform.models.process_execution
Process execution model class file.
author: Matthew Casey
Classes
class ProcessExecution (session, schema=None)-
    class ProcessExecution(Model):
        """
        Process execution model class providing attributes and methods to manipulate process execution item details.
        """

        # Override the schema.
        _SCHEMA = ProcessExecutionSchema()

        # Override the base model class name.
        _BASE_MODEL_CLASS_NAME = 'Organisation'  # A string to prevent circular imports.

        # Base path.
        _PATH_BASE = '/organisations/{organisation_id}/process_executions/{process_execution_id}'

        # Override the standard model paths.
        _PATH_DELETE = _PATH_BASE
        _PATH_GET = _PATH_BASE
        _PATH_PATCH = _PATH_BASE

        # Add in the custom model paths.
        _PATH_COMPONENTS = f"{_PATH_BASE}/process_service_executions"
        _PATH_CHANGE_DELETE_EXPIRY = f"{_PATH_BASE}/change_delete_expiry"

        def change_delete_expiry(self, delete_expiry):
            """
            Changes the delete expiry. This will either change the delete expiry of a single execution, or if the execution is in a group, all the
            corresponding executions in the group.

            Args:
                delete_expiry: The new delete expiry.

            Raises:
                RequestError: if the update fails.
            """
            body = {self.__class__.__name__: {'delete_expiry': delete_expiry.isoformat()}}
            self._session.request(path=self._get_path(self.__class__._PATH_CHANGE_DELETE_EXPIRY), method=Session.METHOD_POST, body=body)

        def check_complete(self, wait=False):
            """
            Checks whether the execution has completed. Optionally waits for the execution to complete.

            Args:
                wait: Optionally wait for the execution to complete? Default False.

            Returns:
                True if the execution is complete.

            Raises:
                RequestError: if any request fails.
                ModelError: if the execution failed.
                ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>®</sup>.
            """
            complete = False

            # Optionally wait for the execution to finish.
            while not complete:
                # Load in the most recent version of the model.
                self.get(organisation_id=self.organisation_id)

                # See if the execution has completed.
                self._logger.debug('checking for execution %s to complete: %f', self.id, self.progress)
                complete = self.progress >= 100

                # Raise an exception if the execution failed.
                abort_reason = self.abort_reason if hasattr(self, Model._FIELD_ABORT_REASON) else None

                if complete and not self.success:
                    self._logger.error(i18n.t('models.process_execution.execution_failed', abort_reason=abort_reason))
                    raise ModelError(i18n.t('models.process_execution.execution_failed', abort_reason=abort_reason))

                if complete:
                    self._logger.debug('execution %s is complete', self.id)

                    if abort_reason is not None:
                        self._logger.warning(i18n.t('models.process_execution.execution_warning', abort_reason=abort_reason))

                if (not wait) or complete:
                    break

                # We are waiting, so block for a short while.
                sleep(self._session.api_update_wait_period)

            return complete

        @property
        def components(self):
            """
            Provides an iterator through the process execution's components.

            Returns:
                An iterator through the component objects.

            Raises:
                RequestError: if any get fails.
                ModelError: if a model could not be loaded or validated from the Fusion Platform<sup>®</sup>.
            """
            return ProcessServiceExecution._models_from_api_path(self._session, self._get_path(self.__class__._PATH_COMPONENTS))

Process execution model class providing attributes and methods to manipulate process execution item details.
Initialises the object.
Args
session- The linked session object for interfacing with the Fusion Platform®.
schema- The optional schema to use instead of the schema defined by the class.
Ancestors
- Model
- fusion_platform.base.Base
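The path constants in the class source are plain string templates with `{organisation_id}` and `{process_execution_id}` placeholders. The sketch below shows how such templates expand, assuming ordinary `str.format` substitution; how the SDK's own `_get_path` fills them in is not shown on this page, so `expand_path` and the identifier values are hypothetical.

```python
# Path templates copied from the class source; the leading underscores are
# dropped only to keep this sketch standalone.
PATH_BASE = '/organisations/{organisation_id}/process_executions/{process_execution_id}'
PATH_COMPONENTS = f"{PATH_BASE}/process_service_executions"
PATH_CHANGE_DELETE_EXPIRY = f"{PATH_BASE}/change_delete_expiry"


def expand_path(template, **ids):
    """Hypothetical helper: fill the placeholders using str.format."""
    return template.format(**ids)


# Placeholder identifiers, not real records.
ids = {'organisation_id': 'org-1', 'process_execution_id': 'exec-9'}

components_path = expand_path(PATH_COMPONENTS, **ids)
expiry_path = expand_path(PATH_CHANGE_DELETE_EXPIRY, **ids)

print(components_path)  # /organisations/org-1/process_executions/exec-9/process_service_executions
print(expiry_path)      # /organisations/org-1/process_executions/exec-9/change_delete_expiry
```

Because the f-strings only interpolate `PATH_BASE`, the braces inside its value stay literal until the later `format` call.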
Instance variables
prop attributes-
Inherited from: Model.attributes
Returns
The model attributes as a dictionary.
prop components-
Provides an iterator through the process execution's components.
Returns
An iterator through the component objects.
Raises
RequestError- if any get fails.
ModelError- if a model could not be loaded or validated from the Fusion Platform®.
Methods
def change_delete_expiry(self, delete_expiry)-
Changes the delete expiry. This will either change the delete expiry of a single execution, or if the execution is in a group, all the corresponding executions in the group.
Args
delete_expiry- The new delete expiry.
Raises
RequestError- if the update fails.
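According to the method source, the request body is keyed by the model class name and carries the new expiry as an ISO 8601 string produced by `isoformat()`. A minimal sketch of that body shape, with no request made; `build_delete_expiry_body` is a hypothetical helper and the timestamp is an arbitrary example value:

```python
from datetime import datetime, timezone


def build_delete_expiry_body(model_name, delete_expiry):
    """Hypothetical helper mirroring the body built by change_delete_expiry."""
    return {model_name: {'delete_expiry': delete_expiry.isoformat()}}


# Arbitrary example value; any object with an isoformat() method would do here.
new_expiry = datetime(2030, 1, 31, 12, 0, tzinfo=timezone.utc)
body = build_delete_expiry_body('ProcessExecution', new_expiry)

print(body)  # {'ProcessExecution': {'delete_expiry': '2030-01-31T12:00:00+00:00'}}
```

Since the method calls `delete_expiry.isoformat()`, passing a plain string would fail with an `AttributeError` before any request is sent. Whether the platform expects a timezone-aware value is not stated on this page.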
def check_complete(self, wait=False)-
Checks whether the execution has completed. Optionally waits for the execution to complete.
Args
wait- Optionally wait for the execution to complete? Default False.
Returns
True if the execution is complete.
Raises
RequestError- if any request fails.
ModelError- if the execution failed.
ModelError- if a model could not be loaded or validated from the Fusion Platform®.
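The polling behaviour described above can be sketched without the platform. `FakeExecution` below is a hypothetical stand-in that replays a list of progress values instead of calling the API, raises `RuntimeError` where the SDK raises `ModelError`, and takes a `wait_period` argument in place of the session's `api_update_wait_period`:

```python
from time import sleep


class FakeExecution:
    """Hypothetical stand-in for ProcessExecution; it makes no API requests."""

    def __init__(self, progress_steps, success=True):
        self._steps = iter(progress_steps)
        self.progress = 0
        self.success = success

    def get(self):
        # Stand-in for reloading the model: take the next scripted progress
        # value, or keep the current one when the script is exhausted.
        self.progress = next(self._steps, self.progress)

    def check_complete(self, wait=False, wait_period=0.01):
        complete = False
        while not complete:
            self.get()
            complete = self.progress >= 100
            # A finished but unsuccessful execution is reported as an error.
            if complete and not self.success:
                raise RuntimeError('execution failed')
            # Without wait, a single refresh is made and the loop exits.
            if (not wait) or complete:
                break
            sleep(wait_period)
        return complete


polling = FakeExecution([40, 80, 100])
first_check = polling.check_complete()      # one refresh, progress is 40
waited = polling.check_complete(wait=True)  # refreshes until progress is 100

failed = FakeExecution([100], success=False)
try:
    failed.check_complete(wait=True)
    raised = False
except RuntimeError:
    raised = True

print(first_check, waited, raised)  # False True True
```

With `wait=False` the call returns after one refresh, so callers that want to do other work between checks can poll it themselves; with `wait=True` it blocks until the execution completes or fails.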
def delete(self)-
Attempts to delete the model object. This assumes the model is deleted using a DELETE RESTful request …
def get(self, **kwargs)-
Gets the model object by loading it from the Fusion Platform®. Uses the model's current id and base model id for the get unless …
def to_csv(self, exclude=None)-
Converts the model attributes into a CSV string …
def update(self, **kwargs)-
Attempts to update the model object with the given values. For models which have not been persisted, the relevant fields are updated without …
class ProcessExecutionSchema (*,
only: types.StrSequenceOrSet | None = None,
exclude: types.StrSequenceOrSet = (),
many: bool | None = None,
load_only: types.StrSequenceOrSet = (),
dump_only: types.StrSequenceOrSet = (),
partial: bool | types.StrSequenceOrSet | None = None,
unknown: types.UnknownOption | None = None)-
    class ProcessExecutionSchema(Schema):
        """
        Schema class for process execution model.

        Each process execution model has the following fields (and nested fields):

        .. include::process_execution.md
        """
        id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

        created_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        updated_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

        organisation_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        process_id = fields.UUID(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

        group_id = fields.UUID(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        group_index = fields.Integer(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        group_count = fields.Integer(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

        options = fields.List(fields.Nested(ProcessExecutionOptionSchema()), allow_none=True)
        chains = fields.List(fields.Nested(ProcessExecutionChainSchema()), allow_none=True)

        # Removed creator.

        started_at = fields.DateTime(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        ended_at = fields.DateTime(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

        # Removed wait_for_inputs.

        stopped = fields.Boolean(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        abort = fields.Boolean(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        abort_reason = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        exit_type = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        success = fields.Boolean(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        progress = fields.Integer(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.

        delete_expiry = fields.DateTime(required=True)
        delete_warning_status = fields.String(required=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        deletable = fields.String(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.
        delete_protection = fields.Boolean(allow_none=True, metadata={'read_only': True})  # Changed to prevent this being updated.

        class Meta:
            """
            When loading an object, make sure we exclude any unknown fields, rather than raising an exception, and put fields in their definition order.
            """
            unknown = EXCLUDE

Schema class for process execution model.
Each process execution model has the following fields (and nested fields):
- id: The unique identifier for the record.
- created_at: When was the record created?
- updated_at: When was the record last updated?
- organisation_id: The owning organisation.
- process_id: The process executed.
- group_id: The optional group that this execution belongs to.
- group_index: The optional unique index for the execution within a group.
- group_count: The optional number of executions within a group.
- options: The options defined for this process execution.
  - ssd_id: The SSD for this option.
  - name: The name of the option.
  - value: The value for the option.
  - required: Is a value for the option required?
  - data_type: The data type associated with the selector.
  - validation: The optional validation for the option. This must be supplied for date/time and constrained values.
  - mutually_exclusive: The optional expression used by clients to determine whether this value is displayed compared with other option values.
  - advanced: Is this an option for advanced usage?
  - title: The title for the option.
  - description: The description of the option.
- chains: The processing chain of SSDs which will be executed for this process execution.
  - ssd_id: The SSD for this part of the chain.
  - service_id: The corresponding specific version of the SSD.
  - inputs: The inputs to the service.
  - outputs: The outputs from the service.
  - options: The options from the service.
    - name: The name of the option.
    - value: The value for the option.
    - data_type: The data type associated with the option.
    - validation: The optional validation for the option. This must be supplied for date/time and constrained values.
  - intermediate: Is this an intermediate service within the chain?
- started_at: When did the execution start?
- ended_at: When did the execution end?
- stopped: Has the execution been stopped by a user?
- abort: Has the execution been aborted?
- abort_reason: The reason for any abort of the execution.
- exit_type: The type of exit experienced by the execution.
- success: Has the execution completed successfully?
- progress: Percentage progress of the execution.
- delete_expiry: When will the execution expire and therefore be deleted?
- delete_warning_status: What is the notification status for the delete warning?
- deletable: Is this execution scheduled for deletion?
- delete_protection: Is this execution prevented from being deleted irrespective of its delete expiry?
Ancestors
- marshmallow.schema.Schema
Class variables
var OPTIONS_CLASS : type-
Defines defaults for marshmallow.Schema.Meta.
var TYPE_MAPPING : dict[type, type[Field]]-
Maps Python types to the marshmallow field classes used to represent them.
var dict_class : type[dict]-
The dict type returned when serializing.
var error_messages : dict[str, str]-
Overrides for the default schema-level error messages.