Source code for tamr_unify_client.operation

from time import sleep, time as now

from tamr_unify_client.base_resource import BaseResource


[docs]class Operation(BaseResource): """A long-running operation performed by Tamr. Operations appear on the "Jobs" page of the Tamr UI. By design, client-side operations represent server-side operations *at a particular point in time* (namely, when the operation was fetched from the server). In other words: Operations *will not* pick up on server-side changes automatically. To get an up-to-date representation, refetch the operation e.g. ``op = op.poll()``. """ @classmethod def from_json(cls, client, resource_json, api_path=None): return super().from_data(client, resource_json, api_path)
[docs] @classmethod def from_response(cls, client, response): """ Handle idiosyncrasies in constructing Operations from Tamr responses. When a Tamr API call would start an operation, but all results that would be produced by that operation are already up-to-date, Tamr returns `HTTP 204 No Content` To make it easy for client code to handle these API responses without checking the response code, this method will either construct an Operation, or a dummy `NoOp` operation representing the 204 Success response. :param client: Delegate underlying API calls to this client. :type client: :class:`~tamr_unify_client.Client` :param response: HTTP Response from the request that started the operation. :type response: :class:`requests.Response` :return: Operation :rtype: :class:`~tamr_unify_client.operation.Operation` """ if response.status_code == 204: # Operation was successful, but the response contains no content. # Create a dummy operation to represent this. _never = "0000-00-00T00:00:00.000Z" _description = """Tamr returned HTTP 204 for this operation, indicating that all results that would be produced by the operation are already up-to-date.""" resource_json = { "id": "-1", "type": "NOOP", "description": _description, "status": { "state": "SUCCEEDED", "startTime": _never, "endTime": _never, "message": "", }, "created": {"username": "", "time": _never, "version": "-1"}, "lastModified": {"username": "", "time": _never, "version": "-1"}, "relativeId": "operations/-1", } else: resource_json = response.json() return Operation.from_json(client, resource_json)
[docs] def apply_options(self, asynchronous=False, **options): """Applies operation options to this operation. **NOTE**: This function **should not** be called directly. Rather, options should be passed in through a higher-level function e.g. :func:`~tamr_unify_client.dataset.resource.Dataset.refresh` . Synchronous mode: Automatically waits for operation to resolve before returning the operation. asynchronous mode: Immediately return the ``'PENDING'`` operation. It is up to the user to coordinate this operation with their code via :func:`~tamr_unify_client.operation.Operation.wait` and/or :func:`~tamr_unify_client.operation.Operation.poll` . :param asynchronous: Whether or not to run in asynchronous mode. Default: ``False``. :type asynchronous: bool :param ``**options``: When running in synchronous mode, these options are passed to the underlying :func:`~tamr_unify_client.operation.Operation.wait` call. :return: Operation with options applied. :rtype: :class:`~tamr_unify_client.operation.Operation` """ if asynchronous: return self return self.wait(**options)
@property def type(self): """:type: str""" return self._data.get("type") @property def description(self): """:type: str""" return self._data.get("description") @property def status(self): return self._data.get("status") @property def state(self): """Server-side state of this operation. Operation state can be unresolved (i.e. ``state`` is one of: ``'PENDING'``, ``'RUNNING'``), or resolved (i.e. `state` is one of: ``'CANCELED'``, ``'SUCCEEDED'``, ``'FAILED'``). Unless opting into asynchronous mode, all exposed operations should be resolved. Note: you only need to manually pick up server-side changes when opting into asynchronous mode when kicking off this operation. Usage: >>> op.state # operation is currently 'PENDING' 'PENDING' >>> op.wait() # continually polls until operation resolves >>> op.state # incorrect usage; operation object state never changes. 'PENDING' >>> op = op.poll() # correct usage; use value returned by Operation.poll or Operation.wait >>> op.state 'SUCCEEDED' """ return (self.status or {}).get("state")
[docs] def poll(self): """Poll this operation for server-side updates. Does not update the calling :class:`~tamr_unify_client.operation.Operation` object. Instead, returns a new :class:`~tamr_unify_client.operation.Operation`. :return: Updated representation of this operation. :rtype: :class:`~tamr_unify_client.operation.Operation` """ op_json = self.client.get(self.api_path).successful().json() return Operation.from_json(self.client, op_json)
[docs] def wait(self, poll_interval_seconds=3, timeout_seconds=None): """Continuously polls for this operation's server-side state. :param int poll_interval_seconds: Time interval (in seconds) between subsequent polls. :param int timeout_seconds: Time (in seconds) to wait for operation to resolve. :raises TimeoutError: If operation takes longer than `timeout_seconds` to resolve. :return: Resolved operation. :rtype: :class:`~tamr_unify_client.operation.Operation` """ started = now() op = self while timeout_seconds is None or now() - started < timeout_seconds: if op.state in ["PENDING", "RUNNING"]: sleep(poll_interval_seconds) elif op.state in ["CANCELED", "SUCCEEDED", "FAILED"]: return op op = op.poll() raise TimeoutError( f"Waiting for operation took longer than {timeout_seconds} seconds." )
[docs] def succeeded(self): """Convenience method for checking if operation was successful. :return: ``True`` if operation's state is ``'SUCCEEDED'``, ``False`` otherwise. :rtype: :py:class:`bool` """ return self.state == "SUCCEEDED"
def __repr__(self): return ( f"{self.__class__.__module__}." f"{self.__class__.__qualname__}(" f"relative_id={self.relative_id!r}, " f"description={self.description!r}, " f"state={self.state!r})" )