Thread workers#

The objects in this module provide utilities for running tasks in a separate thread. In general (with the exception of new_worker_qthread), everything here wraps Qt's QRunnable API.

The highest level object is the @thread_worker decorator. It was originally written for napari, and was later extracted into superqt. You may also be interested in reading the napari documentation on this feature, which provides a more in-depth/introductory usage guide.

For additional control, you can create your own FunctionWorker or GeneratorWorker objects.

superqt.utils.WorkerBase #

Bases: QRunnable, Generic[_R]

Base class for creating a Worker that can run in another thread.

Parameters:

Name Type Description Default
SignalsClass type

A QObject subclass that contains signals, by default WorkerBaseSignals

WorkerBaseSignals

Attributes:

Name Type Description
signals WorkerBaseSignals

Signal emitter object, which makes it possible to identify which worker thread emitted a signal.

abort_requested property #

Whether the worker has been requested to stop.

is_running property #

Whether the worker has been started.

__getattr__(name) #

Pass through attr requests to signals to simplify connection API.

The goal is to enable worker.yielded.connect instead of worker.signals.yielded.connect. Because multiple inheritance of Qt classes is not well supported in PyQt, we have to use composition here (signals are provided by QObjects, and QRunnable is not a QObject). So this passthrough allows us to connect to signals on the _signals object.
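The passthrough described above can be sketched in plain Python, without Qt. This is only an illustration of the composition pattern: the `Signal`, `Signals`, and `Worker` classes below are hypothetical stand-ins, not superqt's actual classes.

```python
class Signal:
    """Hypothetical stand-in for a Qt signal: stores callbacks and calls them."""

    def __init__(self):
        self._callbacks = []

    def connect(self, callback):
        self._callbacks.append(callback)

    def emit(self, *args):
        for callback in self._callbacks:
            callback(*args)


class Signals:
    """Plays the role of the QObject that owns the signals."""

    def __init__(self):
        self.yielded = Signal()


class Worker:
    """Plays the role of the QRunnable, which cannot define signals itself."""

    def __init__(self):
        self._signals = Signals()

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails, so real attributes win.
        if name == "_signals":
            # Avoid infinite recursion if _signals has not been set yet.
            raise AttributeError(name)
        try:
            return getattr(self._signals, name)
        except AttributeError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None


worker = Worker()
received = []
worker.yielded.connect(received.append)  # instead of worker._signals.yielded
worker.yielded.emit(42)
```

Because `__getattr__` is consulted only after regular attribute lookup fails, methods and properties defined on the worker itself are never shadowed by a signal of the same name.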

await_workers(msecs=None) classmethod #

Ask all workers to quit, and wait up to msecs milliseconds for them to do so.

Attempts to clean up all running workers by calling each worker's quit() method. This applies to every worker in the WorkerBase._worker_set set.

By default, this function will block indefinitely, until worker threads finish. If a timeout is provided, a RuntimeError will be raised if the workers do not gracefully exit in the time requested, but the threads will NOT be killed. It is (currently) left to the user to use their OS to force-quit rogue threads.

Important

If the function contains no yields and runs for a very long time, it will simply hang. For instance, there is no graceful way to kill this thread in Python:

import time
from superqt.utils import thread_worker

@thread_worker
def ZZZzzz():
    time.sleep(10000000)

This is why it's always advisable to use a generator that periodically yields for long-running computations in another thread.

See this Stack Overflow post for a good discussion of the difficulty of killing a rogue Python thread.

Parameters:

Name Type Description Default
msecs int

Waits up to msecs milliseconds for all threads to exit and removes all threads from the thread pool. If msecs is None (the default), the timeout is ignored (waits for the last thread to exit).

None

Raises:

Type Description
RuntimeError

If a timeout is provided and workers do not quit successfully within the time allotted.
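The timeout behavior described for await_workers can be approximated with the standard library alone. The sketch below is an analogue built on `threading`, not superqt's implementation, and `wait_for_threads` is a hypothetical helper name.

```python
import threading
import time


def wait_for_threads(threads, msecs=None):
    """Join every thread, raising RuntimeError if the deadline passes."""
    deadline = None if msecs is None else time.monotonic() + msecs / 1000
    for thread in threads:
        if deadline is None:
            thread.join()  # block indefinitely
        else:
            thread.join(max(0.0, deadline - time.monotonic()))
    still_running = [t for t in threads if t.is_alive()]
    if still_running:
        # The threads are only reported, never killed.
        raise RuntimeError(f"{len(still_running)} thread(s) did not exit in time")


stop = threading.Event()
# This thread exits only once the stop flag has been set.
waiter = threading.Thread(target=stop.wait, daemon=True)
waiter.start()

try:
    wait_for_threads([waiter], msecs=50)
    timed_out = False
except RuntimeError:
    timed_out = True  # nobody asked the thread to stop yet

stop.set()  # the equivalent of requesting a quit
wait_for_threads([waiter], msecs=1000)
```

The first call times out because nothing has asked the thread to stop; once the flag is set, the second call returns normally.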

quit() #

Send a request to abort the worker.

Note

It is entirely up to subclasses to honor this method by checking self.abort_requested periodically in their worker.work method, and exiting if True.

run() #

Start the worker.

The end-user should never need to call this function. But it cannot be made private or renamed, since it is called by Qt.

The order of method calls when starting a worker is:

   calls QThreadPool.globalInstance().start(worker)
   |               triggered by the QThreadPool.start() method
   |               |             called by worker.run
   |               |             |
   V               V             V
   worker.start -> worker.run -> worker.work

This is the function that actually gets called when calling QThreadPool.start(worker). It simply wraps the work() method, and emits a few signals. Subclasses should NOT override this method (except with good reason), and instead should implement work().
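As a rough illustration of how a run() method can wrap work() and report its outcome, here is a Qt-free sketch. The class names are hypothetical, the `events` list stands in for emitted signals, and the exact order in which superqt emits its signals is an assumption here.

```python
class SketchWorker:
    """Hypothetical worker showing how run() can wrap work()."""

    def __init__(self):
        self.events = []  # stands in for emitted signals

    def work(self):
        # Subclasses override this with the real computation.
        raise NotImplementedError

    def run(self):
        self.events.append(("started",))
        try:
            result = self.work()
            self.events.append(("returned", result))
        except Exception as exc:
            self.events.append(("errored", type(exc).__name__))
        finally:
            self.events.append(("finished",))


class Doubler(SketchWorker):
    def __init__(self, value):
        super().__init__()
        self.value = value

    def work(self):
        return self.value * 2


class Broken(SketchWorker):
    def work(self):
        raise ValueError("boom")


ok = Doubler(21)
ok.run()
bad = Broken()
bad.run()
```

Keeping the bookkeeping in run() means a subclass only has to implement work(), which is the reason overriding run() is discouraged.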

start() #

Start this worker in a thread and add it to the global threadpool.

The order of method calls when starting a worker is:

   calls QThreadPool.globalInstance().start(worker)
   |               triggered by the QThreadPool.start() method
   |               |             called by worker.run
   |               |             |
   V               V             V
   worker.start -> worker.run -> worker.work

work() #

Main method to execute the worker.

The end-user should never need to call this function. But subclasses must implement this method (see GeneratorWorker.work for an example implementation). Minimally, it should check self.abort_requested periodically and exit if True.

Examples:

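import time

from superqt.utils import WorkerBase


class MyWorker(WorkerBase):
    def work(self):
        max_iters = 10  # illustrative upper bound on the number of iterations
        i = 0
        while True:
            # honor quit() requests by checking the flag on every iteration
            if self.abort_requested:
                self.aborted.emit()
                break
            i += 1
            if i > max_iters:
                break
            time.sleep(0.5)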

superqt.utils.FunctionWorker #

Bases: WorkerBase[_R]

QRunnable with signals that wraps a simple long-running function.

Note

FunctionWorker does not provide a way to stop a very long-running function (e.g. time.sleep(10000)). So whenever possible, it is better to implement your long running function as a generator that yields periodically, and use the GeneratorWorker instead.

Parameters:

Name Type Description Default
func Callable

A function to call in another thread

required
*args

will be passed to the function

()
**kwargs

will be passed to the function

{}

Raises:

Type Description
TypeError

If func is a generator function and not a regular function.

superqt.utils.GeneratorWorker #

Bases: WorkerBase, Generic[_Y, _S, _R]

QRunnable with signals that wraps a long-running generator.

Provides a convenient way to run a generator function in another thread, while allowing 2-way communication between threads, using plain-python generator syntax in the original function.

Parameters:

Name Type Description Default
func callable

The function being run in another thread. May be a generator function.

required
SignalsClass type

A QObject subclass that contains signals, by default GeneratorWorkerSignals

GeneratorWorkerSignals
*args

Will be passed to func on instantiation

()
**kwargs

Will be passed to func on instantiation

{}

is_paused property #

Whether the worker is currently paused.

pause() #

Request to pause the worker.

resume() #

Send a request to resume the worker.

send(value) #

Send a value into the function (if a generator was used).
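For readers unfamiliar with sending values into a generator, the underlying plain-Python mechanism looks like this. The example involves no threads or Qt; `running_total` is a hypothetical generator written for illustration.

```python
def running_total():
    """Yield the running total of every value sent in."""
    total = 0
    while True:
        received = yield total  # pauses here until next() or send() is called
        if received is not None:
            total += received


gen = running_total()
first = next(gen)      # prime the generator; runs to the first yield
second = gen.send(5)   # resumes with received == 5
third = gen.send(10)   # resumes with received == 10
gen.close()
```

A generator must reach its first yield before it can accept a non-None value, which is why the example calls next() once before send().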

toggle_pause() #

Request to pause the worker if playing or resume if paused.

work() #

Core event loop that calls the original function.

Enters a continual loop, yielding and returning from the original function. Checks for various events (quit, pause, resume, etc...). (To clarify: we are creating a rudimentary event loop here because there IS NO Qt event loop running in the other thread to hook into)
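A much-simplified version of such a loop can be written with the standard library. This is a sketch under stated assumptions, not superqt's code: `LoopSketch` is hypothetical, plain attributes replace signals, and send() support is left out.

```python
import threading
import time


class LoopSketch:
    """Hypothetical, simplified driver for a generator in a worker thread."""

    def __init__(self, gen):
        self._gen = gen
        self._abort = threading.Event()
        self._paused = threading.Event()
        self.yielded = []
        self.result = None
        self.aborted = False

    def quit(self):
        self._abort.set()

    def pause(self):
        self._paused.set()

    def resume(self):
        self._paused.clear()

    def work(self):
        while True:
            if self._abort.is_set():
                self._gen.close()
                self.aborted = True
                return
            if self._paused.is_set():
                time.sleep(0.01)  # idle briefly, then re-check the flags
                continue
            try:
                self.yielded.append(next(self._gen))
            except StopIteration as stop:
                self.result = stop.value  # the generator's return value
                return


def count_to(n):
    for i in range(n):
        yield i
    return "done"


def forever():
    i = 0
    while True:
        yield i
        i += 1


loop = LoopSketch(count_to(3))
loop.pause()  # pause before starting, so nothing is yielded yet
thread = threading.Thread(target=loop.work)
thread.start()
time.sleep(0.05)
yielded_while_paused = list(loop.yielded)
loop.resume()
thread.join()

endless = LoopSketch(forever())
endless.quit()  # the abort flag is checked on the first iteration
endless.work()
```

The flags are checked only between yields, so the generator's responsiveness to quit and pause requests depends on how often it yields.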

Convenience functions#

superqt.utils.thread_worker(function=None, start_thread=None, connect=None, worker_class=None, ignore_errors=False) #

thread_worker(
    function: Callable[_P, Generator[_Y, _S, _R]],
    start_thread: bool | None = None,
    connect: dict[str, Callable | Sequence[Callable]]
    | None = None,
    worker_class: type[WorkerBase] | None = None,
    ignore_errors: bool = False,
) -> Callable[_P, GeneratorWorker[_Y, _S, _R]]
thread_worker(
    function: Callable[_P, _R],
    start_thread: bool | None = None,
    connect: dict[str, Callable | Sequence[Callable]]
    | None = None,
    worker_class: type[WorkerBase] | None = None,
    ignore_errors: bool = False,
) -> Callable[_P, FunctionWorker[_R]]
thread_worker(
    function: Literal[None] = None,
    start_thread: bool | None = None,
    connect: dict[str, Callable | Sequence[Callable]]
    | None = None,
    worker_class: type[WorkerBase] | None = None,
    ignore_errors: bool = False,
) -> Callable[
    [Callable],
    Callable[_P, FunctionWorker | GeneratorWorker],
]

Decorator that runs a function in a separate thread when called.

When called, the decorated function returns a WorkerBase. See create_worker for additional keyword arguments that can be used when calling the function.

The returned worker will have these signals:

  • started: emitted when the work is started
  • finished: emitted when the work is finished
  • returned: emitted with return value
  • errored: emitted with error object on Exception

It will also have a worker.start() method that can be used to start execution of the function in another thread. (useful if you need to connect callbacks to signals prior to execution)

If the decorated function is a generator, the returned worker will also provide these signals:

  • yielded: emitted with yielded values
  • paused: emitted when a running job has successfully paused
  • resumed: emitted when a paused job has successfully resumed
  • aborted: emitted when a running job is successfully aborted

And these methods:

  • quit: ask the thread to quit
  • toggle_pause: toggle the running state of the thread.
  • send: send a value into the generator. (This requires that your decorated function uses the value = yield syntax)

Parameters:

Name Type Description Default
function callable

Function to call in another thread. May be a generator function, which allows communication between threads.

None
start_thread bool

Whether to immediately start the thread. If False, the returned worker must be manually started with worker.start(). By default, it will be False if the connect argument is None, otherwise True.

None
connect Dict[str, Union[Callable, Sequence]]

A mapping of "signal_name" -> callable or list of callable: callback functions to connect to the various signals offered by the worker class. by default None

None
worker_class Type[WorkerBase]

The WorkerBase to instantiate. By default, FunctionWorker will be used if function is a regular function, and GeneratorWorker will be used if it is a generator.

None
ignore_errors bool

If False (the default), errors raised in the other thread will be reraised in the main thread (makes debugging significantly easier).

False

Returns:

Type Description
callable

function that creates a worker, puts it in a new thread and returns the worker instance.

Examples:

import time

from superqt.utils import thread_worker


@thread_worker
def long_function(start, end):
    # do work, periodically yielding
    i = start
    while i <= end:
        time.sleep(0.1)
        yield i
        i += 1

    # do teardown
    return "anything"


# call the function to create the worker (not started yet, because no
# `connect` argument was given to the decorator)
worker = long_function(0, 10)

# connect signals here if desired... or they may be added using the
# `connect` argument in the `@thread_worker` decorator... in which
# case the worker will start immediately when long_function() is called
worker.start()

superqt.utils.create_worker(func, *args, _start_thread=None, _connect=None, _worker_class=None, _ignore_errors=False, **kwargs) #

create_worker(
    func: Callable[_P, Generator[_Y, _S, _R]],
    *args,
    _start_thread: bool | None = None,
    _connect: dict[str, Callable | Sequence[Callable]]
    | None = None,
    _worker_class: type[GeneratorWorker]
    | type[FunctionWorker]
    | None = None,
    _ignore_errors: bool = False,
    **kwargs,
) -> GeneratorWorker[_Y, _S, _R]
create_worker(
    func: Callable[_P, _R],
    *args,
    _start_thread: bool | None = None,
    _connect: dict[str, Callable | Sequence[Callable]]
    | None = None,
    _worker_class: type[GeneratorWorker]
    | type[FunctionWorker]
    | None = None,
    _ignore_errors: bool = False,
    **kwargs,
) -> FunctionWorker[_R]

Convenience function to start a function in another thread.

By default, uses FunctionWorker for functions and GeneratorWorker for generators, but a custom class may be provided. If so, it must be a subclass of WorkerBase, which defines a standard set of signals and a run method.
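One way to make this kind of choice in plain Python is `inspect.isgeneratorfunction`. The sketch below only illustrates the selection rule; the two placeholder classes and `pick_worker_class` are hypothetical names, not part of superqt.

```python
import inspect


class FunctionWorkerSketch:
    """Placeholder for a worker that wraps a regular function."""


class GeneratorWorkerSketch:
    """Placeholder for a worker that wraps a generator function."""


def pick_worker_class(func, worker_class=None):
    """Return the explicit class if given, else choose by function kind."""
    if worker_class is not None:
        return worker_class
    if inspect.isgeneratorfunction(func):
        return GeneratorWorkerSketch
    return FunctionWorkerSketch


def regular():
    return 1


def generator():
    yield 1


chosen_for_regular = pick_worker_class(regular)
chosen_for_generator = pick_worker_class(generator)
chosen_explicitly = pick_worker_class(regular, GeneratorWorkerSketch)
```

Note that `inspect.isgeneratorfunction` inspects the function definition itself, so the choice is made before the function is ever called.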

Parameters:

Name Type Description Default
func Callable

The function to call in another thread.

required
_start_thread bool

Whether to immediately start the thread. If False, the returned worker must be manually started with worker.start(). By default, it will be False if the _connect argument is None, otherwise True.

None
_connect Dict[str, Union[Callable, Sequence]]

A mapping of "signal_name" -> callable or list of callable: callback functions to connect to the various signals offered by the worker class. by default None

None
_worker_class type of `GeneratorWorker` or `FunctionWorker`

The WorkerBase to instantiate, by default FunctionWorker will be used if func is a regular function, and GeneratorWorker will be used if it is a generator.

None
_ignore_errors bool

If False (the default), errors raised in the other thread will be reraised in the main thread (makes debugging significantly easier).

False
*args

will be passed to func

()
**kwargs

will be passed to func

{}

Returns:

Name Type Description
worker WorkerBase

An instantiated worker. If _start_thread was False, the worker will have a .start() method that can be used to start the thread.

Raises:

Type Description
TypeError

If a _worker_class is provided that is not a subclass of WorkerBase.

TypeError

If _connect is provided and is not a dict of {str: callable}
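The rules for _connect and _start_thread described above can be expressed as two small helpers. This is a sketch of the documented behavior, not superqt's code; `normalize_connect` and `should_start` are hypothetical names.

```python
def normalize_connect(connect):
    """Return {signal_name: [callbacks]}, raising TypeError on bad input."""
    if connect is None:
        return {}
    if not isinstance(connect, dict):
        raise TypeError("connect must be a dict of {str: callable}")
    normalized = {}
    for name, callbacks in connect.items():
        if callable(callbacks):
            callbacks = [callbacks]  # a single callback becomes a list of one
        elif not isinstance(callbacks, (list, tuple)):
            raise TypeError("connect values must be callables or sequences")
        if not isinstance(name, str) or not all(callable(c) for c in callbacks):
            raise TypeError("connect must be a dict of {str: callable}")
        normalized[name] = list(callbacks)
    return normalized


def should_start(start_thread, connect):
    """Apply the documented default: start only if callbacks were supplied."""
    if start_thread is None:
        return connect is not None
    return start_thread


single = normalize_connect({"yielded": print})
several = normalize_connect({"finished": [print, repr]})

try:
    normalize_connect(["not", "a", "dict"])
    rejected = False
except TypeError:
    rejected = True
```

With these rules, passing callbacks implies an immediate start unless the caller explicitly asks otherwise, which matches the intent of connecting signals before execution begins.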

Examples:

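import time

from superqt.utils import create_worker


def long_function(duration):
    time.sleep(duration)


# no `_connect` was given, so the worker must be started manually
worker = create_worker(long_function, 10)
worker.start()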

superqt.utils.new_worker_qthread(Worker, *args, _start_thread=False, _connect=None, **kwargs) #

Convenience function to start a worker in a QThread.

A QThread object should be thought of as a class that manages a thread, not as the actual code or object that runs in that thread. The QThread object is created on the main thread and lives there.

Worker objects which derive from QObject are the things that actually do the work. They can be moved to a QThread as is done here.

Mostly ignorable detail

While the signals/slots syntax of the worker looks very similar to standard "single-threaded" signals and slots, note that inter-thread signals and slots (automatically) use an event-based QueuedConnection, while intra-thread signals use a DirectConnection. See "Signals and Slots Across Threads" in the Qt documentation.

Parameters:

Name Type Description Default
Worker QObject

QObject type that implements a work() method. The Worker should also emit a finished signal when the work is done.

required
_start_thread bool

If True, thread will be started immediately, otherwise, thread must be manually started with thread.start().

False
_connect dict

Optional dictionary of {signal: function} to connect to the new worker. For instance, _connect = {'incremented': myfunc} will result in worker.incremented.connect(myfunc).

None
*args

will be passed to the Worker class on instantiation.

()
**kwargs

will be passed to the Worker class on instantiation.

{}

Returns:

Name Type Description
worker WorkerBase

The created worker.

thread QThread

The thread on which the worker is running.

Examples:

Create some QObject that has a long-running work method:

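# imports assume qtpy is used as the Qt binding wrapper
from qtpy.QtCore import QObject, Signal, Slot

from superqt.utils import new_worker_qthread


class Worker(QObject):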
    finished = Signal()
    increment = Signal(int)

    def __init__(self, argument):
        super().__init__()
        self.argument = argument

    @Slot()
    def work(self):
        # some long running task...
        import time

        for i in range(10):
            time.sleep(1)
            self.increment.emit(i)
        self.finished.emit()


worker, thread = new_worker_qthread(
    Worker,
    "argument",
    _start_thread=True,
    _connect={"increment": print},
)