Skip to content

deep.task

Provides processing options for tasks on background threads.

IllegalStateException

Bases: BaseException

This is raised when we are in an incompatible state.

Source code in deep/task/__init__.py
class IllegalStateException(BaseException):
    """This is raised when we are in an incompatible state."""

    pass

TaskHandler

Allow processing of tasks without blocking the current thread.

Source code in deep/task/__init__.py
class TaskHandler:
    """Allow processing of tasks without blocking the current thread."""

    def __init__(self):
        """Create a new TaskHandler to process tasks in a separate thread."""
        self._pool = ThreadPoolExecutor(max_workers=2)
        self._pending = {}
        self._job_id = 0
        self._lock = threading.Lock()
        self._open = True

    def _next_id(self):
        with self._lock:
            self._job_id += 1
            next_id = self._job_id
        return next_id

    def __check_open(self):
        if not self._open:
            raise IllegalStateException

    def submit_task(self, task, *args) -> Future:
        """
        Submit a task to be processed in the task thread.

        :param task: the task function to process
        :param args: the args to pass to the function
        :return: a future that can be listened to for completion
        """
        self.__check_open()
        next_id = self._next_id()
        # there is an at exit in threading that prevents submitting tasks after shutdown, but no api to check this
        future = self._pool.submit(task, *args)
        self._pending[next_id] = future

        # cannot use 'del' in lambda: https://stackoverflow.com/a/41953232/5151254
        def callback(_future: Future):
            if _future.exception() is not None:
                logging.exception("Submitted task failed %s", task)
            if next_id in self._pending:
                del self._pending[next_id]

        future.add_done_callback(callback)
        return future

    def flush(self):
        """Await completion of all pending tasks."""
        self._open = False
        if len(self._pending) > 0:
            for key in dict(self._pending).keys():
                get = self._pending.get(key)
                if get is not None:
                    self._pending[key].result(10)

__init__()

Create a new TaskHandler to process tasks in a separate thread.

Source code in deep/task/__init__.py
def __init__(self):
    """Create a new TaskHandler to process tasks in a separate thread."""
    self._pool = ThreadPoolExecutor(max_workers=2)
    self._pending = {}
    self._job_id = 0
    self._lock = threading.Lock()
    self._open = True

flush()

Await completion of all pending tasks.

Source code in deep/task/__init__.py
def flush(self):
    """Await completion of all pending tasks."""
    self._open = False
    if len(self._pending) > 0:
        for key in dict(self._pending).keys():
            get = self._pending.get(key)
            if get is not None:
                self._pending[key].result(10)

submit_task(task, *args)

Submit a task to be processed in the task thread.

:param task: the task function to process :param args: the args to pass to the function :return: a future that can be listened to for completion

Source code in deep/task/__init__.py
def submit_task(self, task, *args) -> Future:
    """
    Submit a task to be processed in the task thread.

    :param task: the task function to process
    :param args: the args to pass to the function
    :return: a future that can be listened to for completion
    """
    self.__check_open()
    next_id = self._next_id()
    # there is an at exit in threading that prevents submitting tasks after shutdown, but no api to check this
    future = self._pool.submit(task, *args)
    self._pending[next_id] = future

    # cannot use 'del' in lambda: https://stackoverflow.com/a/41953232/5151254
    def callback(_future: Future):
        if _future.exception() is not None:
            logging.exception("Submitted task failed %s", task)
        if next_id in self._pending:
            del self._pending[next_id]

    future.add_done_callback(callback)
    return future