Thread workers
The objects in this module provide utilities for running tasks in a separate thread. In general (with the exception of new_worker_qthread), everything here wraps Qt's QRunnable API.
The highest level object is the @thread_worker decorator. It was originally written for napari, and was later extracted into superqt. You may also be interested in reading the napari documentation on this feature, which provides a more in-depth, introductory usage guide.
For additional control, you can create your own FunctionWorker or GeneratorWorker objects.
superqt.utils.WorkerBase
Bases: QRunnable, Generic[_R]
Base class for creating a Worker that can run in another thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
SignalsClass | type | A QObject subclass that contains signals, by default WorkerBaseSignals | WorkerBaseSignals |
Attributes:
Name | Type | Description |
---|---|---|
signals | WorkerBaseSignals | Signal emitter object. Makes it possible to identify which worker thread emitted a signal. |
abort_requested
property
Whether the worker has been requested to stop.
is_running
property
Whether the worker has been started.
__getattr__(name)
Pass through attr requests to signals to simplify connection API.
The goal is to enable worker.yielded.connect instead of worker.signals.yielded.connect. Because multiple inheritance of Qt classes is not well supported in PyQt, we have to use composition here (signals are provided by QObjects, and QRunnable is not a QObject). So this passthrough allows us to connect to signals on the _signals object.
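The passthrough described above can be sketched in plain Python, without Qt. This is only an illustration of the composition pattern: the `Signals` and `Worker` classes below are hypothetical stand-ins, and the `yielded` attribute is a placeholder object, not a real Qt signal.

```python
class Signals:
    """Hypothetical stand-in for the QObject that owns the signals."""

    def __init__(self):
        self.yielded = object()  # placeholder for a real signal


class Worker:
    """Hypothetical stand-in for a runnable that composes a Signals object."""

    def __init__(self):
        self._signals = Signals()

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails, so attributes
        # defined on Worker itself always take precedence.
        if name == "_signals":
            # Avoid infinite recursion if _signals has not been set yet.
            raise AttributeError(name)
        return getattr(self._signals, name)


worker = Worker()
# Both spellings reach the same object.
same = worker.yielded is worker._signals.yielded
```

Attributes that exist on neither object still raise `AttributeError`, so typos are not silently swallowed.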
await_workers(msecs=None)
classmethod
Ask all workers to quit, and wait up to msecs for them to quit.
Attempts to clean up all running workers by calling each worker's quit() method. Every worker in the WorkerBase._worker_set set will have this method called.
By default, this function will block indefinitely, until worker threads finish. If a timeout is provided, a RuntimeError will be raised if the workers do not gracefully exit in the time requested, but the threads will NOT be killed. It is (currently) left to the user to use their OS to force-quit rogue threads.
Important
If the function contains no yields and runs for a very long time, the worker cannot be interrupted and will simply hang. For instance, there's no graceful way to kill this thread in Python:
    @thread_worker
    def ZZZzzz():
        time.sleep(10000000)
This is why it's always advisable to use a generator that periodically yields for long-running computations in another thread.
See this stack-overflow post for a good discussion on the difficulty of killing a rogue python thread:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msecs | int | Waits up to msecs milliseconds for all threads to exit and removes all threads from the thread pool. If msecs is | None |
Raises:
Type | Description |
---|---|
RuntimeError | If a timeout is provided and workers do not quit successfully within the time allotted. |
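The timeout behaviour described above (wait, raise RuntimeError on timeout, but never kill the thread) can be illustrated with the standard library alone. This is a sketch using `threading`, not the actual implementation; `wait_for_all` is a hypothetical helper.

```python
import threading
import time


def wait_for_all(threads, msecs=None):
    """Join every thread; raise RuntimeError if the deadline passes."""
    deadline = None if msecs is None else time.monotonic() + msecs / 1000
    for t in threads:
        if deadline is None:
            remaining = None  # block until the thread exits
        else:
            remaining = max(0.0, deadline - time.monotonic())
        t.join(remaining)
        if t.is_alive():
            # The thread keeps running: nothing here kills it.
            raise RuntimeError(f"{t.name} did not exit within {msecs} ms")


stop = threading.Event()
quick = threading.Thread(target=lambda: None)
stuck = threading.Thread(target=stop.wait, daemon=True)
quick.start()
stuck.start()

wait_for_all([quick])  # no timeout: returns once the thread has finished

try:
    wait_for_all([stuck], msecs=50)
    timed_out = False
except RuntimeError:
    timed_out = True

still_running_after_timeout = stuck.is_alive()
stop.set()  # release the stuck thread so the example exits cleanly
stuck.join()
```

Note that the stuck thread is still alive after the RuntimeError is raised; it only exits because the example releases it explicitly.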
quit()
Send a request to abort the worker.
Note
It is entirely up to subclasses to honor this method by checking self.abort_requested periodically in their worker.work method, and exiting if True.
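The cooperative nature of quit() can be sketched with the standard library. The class below is a hypothetical plain-Python stand-in (it does not use Qt or WorkerBase); it only shows that the request is a flag, and that the work loop has to check it.

```python
import threading
import time


class CooperativeWorker:
    """Hypothetical stand-in for a worker with an abort flag."""

    def __init__(self):
        self._abort = threading.Event()
        self.iterations = 0

    @property
    def abort_requested(self):
        return self._abort.is_set()

    def quit(self):
        # Only records the request; work() must notice it and return.
        self._abort.set()

    def work(self):
        while not self.abort_requested:
            self.iterations += 1
            time.sleep(0.01)  # a small unit of work between checks


worker = CooperativeWorker()
thread = threading.Thread(target=worker.work)
thread.start()
time.sleep(0.05)
worker.quit()
thread.join(timeout=2)
```

If work() never checked the flag, quit() would have no effect and the join would time out.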
run()
Start the worker.
The end-user should never need to call this function. But it cannot be made private or renamed, since it is called by Qt.
The order of method calls when starting a worker is:
   calls QThreadPool.globalInstance().start(worker)
   |               triggered by the QThreadPool.start() method
   |               |             called by worker.run
   |               |             |
   V               V             V
 worker.start -> worker.run -> worker.work
This is the function that actually gets called when calling QThreadPool.start(worker). It simply wraps the work() method, and emits a few signals. Subclasses should NOT override this method (except with good reason), and instead should implement work().
start()
Start this worker in a thread and add it to the global threadpool.
The order of method calls when starting a worker is:
   calls QThreadPool.globalInstance().start(worker)
   |               triggered by the QThreadPool.start() method
   |               |             called by worker.run
   |               |             |
   V               V             V
 worker.start -> worker.run -> worker.work
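The call order in the diagram can be traced with a small plain-Python sketch. Everything here is hypothetical: `pool_start` stands in for the thread pool's start method and runs synchronously, whereas a real pool would call run() on a pooled thread.

```python
class SketchWorker:
    """Records the order in which start, run and work are called."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")
        pool_start(self)  # hand the worker to the (stand-in) pool

    def run(self):
        self.calls.append("run")
        self.work()  # run() wraps work()

    def work(self):
        self.calls.append("work")


def pool_start(runnable):
    # Hypothetical stand-in for the thread pool: calls run() directly.
    runnable.run()


worker = SketchWorker()
worker.start()
print(worker.calls)  # ['start', 'run', 'work']
```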
work()
Main method to execute the worker.
The end-user should never need to call this function. But subclasses must implement this method (see GeneratorWorker.work for an example implementation). Minimally, it should check self.abort_requested periodically and exit if True.
Examples:
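import time

from superqt.utils import WorkerBase


class MyWorker(WorkerBase):
    def work(self):
        max_iters = 10  # illustrative limit; not defined in the original snippet
        i = 0
        while True:
            if self.abort_requested:
                # assumes the signals class in use defines an `aborted` signal
                self.aborted.emit()
                break
            i += 1
            if i > max_iters:
                break
            time.sleep(0.5)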
superqt.utils.FunctionWorker
Bases: WorkerBase[_R]
QRunnable with signals that wraps a simple long-running function.
Note
FunctionWorker does not provide a way to stop a very long-running function (e.g. time.sleep(10000)). So whenever possible, it is better to implement your long-running function as a generator that yields periodically, and use the GeneratorWorker instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | Callable | A function to call in another thread | required |
*args | | will be passed to the function | () |
**kwargs | | will be passed to the function | {} |
Raises:
Type | Description |
---|---|
TypeError | If |
superqt.utils.GeneratorWorker
Bases: WorkerBase, Generic[_Y, _S, _R]
QRunnable with signals that wraps a long-running generator.
Provides a convenient way to run a generator function in another thread, while allowing 2-way communication between threads, using plain-python generator syntax in the original function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | callable | The function being run in another thread. May be a generator function. | required |
SignalsClass | type | A QObject subclass that contains signals, by default GeneratorWorkerSignals | GeneratorWorkerSignals |
*args | | Will be passed to func on instantiation | () |
**kwargs | | Will be passed to func on instantiation | {} |
is_paused
property
Whether the worker is currently paused.
pause()
Request to pause the worker.
resume()
Send a request to resume the worker.
send(value)
Send a value into the function (if a generator was used).
toggle_pause()
Request to pause the worker if playing or resume if paused.
work()
Core event loop that calls the original function.
Enters a continual loop, yielding and returning from the original function, and checks for various events (quit, pause, resume, etc.). To clarify: we are creating a rudimentary event loop here because there is no Qt event loop running in the other thread to hook into.
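The idea of a rudimentary loop that steps a generator while checking for requests can be sketched in plain Python. This is not the actual work() implementation: `drive` is a hypothetical helper, and the scripted `requests` list stands in for pause, resume and quit requests that would normally arrive from another thread.

```python
def drive(gen, requests):
    """Step `gen` once per request, honoring pause, resume and abort."""
    events = []
    paused = False
    for request in requests:
        if request == "abort":
            gen.close()
            events.append("aborted")
            return events
        if request == "pause" and not paused:
            paused = True
            events.append("paused")
        elif request == "resume" and paused:
            paused = False
            events.append("resumed")
        if paused:
            continue  # do not advance the generator while paused
        try:
            events.append(("yielded", next(gen)))
        except StopIteration as stop:
            events.append(("returned", stop.value))
            return events
    return events


def count():
    for i in range(10):
        yield i
    return "done"


events = drive(count(), [None, "pause", None, "resume", None, "abort"])
```

Each entry in `events` corresponds to something a real worker would report through a signal (yielded, paused, resumed, aborted, returned).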
Convenience functions
superqt.utils.thread_worker(function=None, start_thread=None, connect=None, worker_class=None, ignore_errors=False)
thread_worker(
    function: Callable[_P, Generator[_Y, _S, _R]],
    start_thread: bool | None = None,
    connect: dict[str, Callable | Sequence[Callable]] | None = None,
    worker_class: type[WorkerBase] | None = None,
    ignore_errors: bool = False,
) -> Callable[_P, GeneratorWorker[_Y, _S, _R]]

thread_worker(
    function: Callable[_P, _R],
    start_thread: bool | None = None,
    connect: dict[str, Callable | Sequence[Callable]] | None = None,
    worker_class: type[WorkerBase] | None = None,
    ignore_errors: bool = False,
) -> Callable[_P, FunctionWorker[_R]]

thread_worker(
    function: Literal[None] = None,
    start_thread: bool | None = None,
    connect: dict[str, Callable | Sequence[Callable]] | None = None,
    worker_class: type[WorkerBase] | None = None,
    ignore_errors: bool = False,
) -> Callable[
    [Callable],
    Callable[_P, FunctionWorker | GeneratorWorker],
]
Decorator that runs a function in a separate thread when called.
When called, the decorated function returns a WorkerBase. See create_worker for additional keyword arguments that can be used when calling the function.
The returned worker will have these signals:
- started: emitted when the work is started
- finished: emitted when the work is finished
- returned: emitted with return value
- errored: emitted with error object on Exception
It will also have a worker.start() method that can be used to start execution of the function in another thread (useful if you need to connect callbacks to signals prior to execution).
If the decorated function is a generator, the returned worker will also provide these signals:
- yielded: emitted with yielded values
- paused: emitted when a running job has successfully paused
- resumed: emitted when a paused job has successfully resumed
- aborted: emitted when a running job is successfully aborted
And these methods:
- quit: ask the thread to quit
- toggle_pause: toggle the running state of the thread.
- send: send a value into the generator. (This requires that your decorated function uses the value = yield syntax)
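For readers unfamiliar with the `value = yield` syntax mentioned above, here is how it behaves on an ordinary Python generator, independent of any worker. The `accumulator` function is a made-up example.

```python
def accumulator():
    total = 0
    while total < 10:
        # `yield total` hands a value out; the whole expression evaluates
        # to whatever the caller passes to send() (None when next() is used).
        received = yield total
        if received is not None:
            total += received
    return total


gen = accumulator()
first = next(gen)      # run to the first yield
second = gen.send(4)   # resumes the generator with received == 4
third = gen.send(3)    # resumes the generator with received == 3
try:
    gen.send(5)        # total reaches 12, so the generator returns
    result = None
except StopIteration as stop:
    result = stop.value
```

A generator written this way should tolerate `received` being None, since it may also be advanced without a value being sent.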
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | callable | Function to call in another thread. May be a generator function, to allow communication between threads. | None |
start_thread | bool | Whether to immediately start the thread. If False, the returned worker must be manually started with | None |
connect | Dict[str, Union[Callable, Sequence]] | A mapping of | None |
worker_class | Type[WorkerBase] | The | None |
ignore_errors | bool | If | False |
Returns:
Type | Description |
---|---|
callable | function that creates a worker, puts it in a new thread and returns the worker instance. |
Examples:
import time

from superqt.utils import thread_worker


@thread_worker
def long_function(start, end):
    # do work, periodically yielding
    i = start
    while i <= end:
        time.sleep(0.1)
        yield i
        i += 1

    # do teardown
    return "anything"


# call the function to create the worker
worker = long_function(0, 10)
# connect signals here if desired... or they may be added using the
# `connect` argument in the `@thread_worker` decorator... in which
# case the worker will start immediately when long_function() is called
worker.start()
superqt.utils.create_worker(func, *args, _start_thread=None, _connect=None, _worker_class=None, _ignore_errors=False, **kwargs)
create_worker(
    func: Callable[_P, Generator[_Y, _S, _R]],
    *args,
    _start_thread: bool | None = None,
    _connect: dict[str, Callable | Sequence[Callable]] | None = None,
    _worker_class: type[GeneratorWorker] | type[FunctionWorker] | None = None,
    _ignore_errors: bool = False,
    **kwargs,
) -> GeneratorWorker[_Y, _S, _R]

create_worker(
    func: Callable[_P, _R],
    *args,
    _start_thread: bool | None = None,
    _connect: dict[str, Callable | Sequence[Callable]] | None = None,
    _worker_class: type[GeneratorWorker] | type[FunctionWorker] | None = None,
    _ignore_errors: bool = False,
    **kwargs,
) -> FunctionWorker[_R]
Convenience function to start a function in another thread.
By default, uses FunctionWorker for functions and GeneratorWorker for generators, but a custom worker class may be provided. If so, it must be a subclass of WorkerBase, which defines a standard set of signals and a run method.
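One way such a default selection could work is sketched below in plain Python. This is an assumption for illustration, not the library's code: the three `...Sketch` classes and `pick_worker_class` are hypothetical, and the use of `inspect.isgeneratorfunction` is a guess at how "function" and "generator" might be told apart.

```python
import inspect


class WorkerSketch:
    """Hypothetical stand-in for WorkerBase."""


class FunctionWorkerSketch(WorkerSketch):
    """Hypothetical stand-in for FunctionWorker."""


class GeneratorWorkerSketch(WorkerSketch):
    """Hypothetical stand-in for GeneratorWorker."""


def pick_worker_class(func, worker_class=None):
    """Choose a worker class for `func`, honoring an explicit override."""
    if worker_class is not None:
        is_valid = inspect.isclass(worker_class) and issubclass(
            worker_class, WorkerSketch
        )
        if not is_valid:
            raise TypeError(f"{worker_class!r} is not a WorkerSketch subclass")
        return worker_class
    # Assumption: generator functions get the generator-aware worker.
    if inspect.isgeneratorfunction(func):
        return GeneratorWorkerSketch
    return FunctionWorkerSketch


def plain(x):
    return x * 2


def stepped(n):
    for i in range(n):
        yield i


chosen_plain = pick_worker_class(plain)
chosen_stepped = pick_worker_class(stepped)
```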
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | Callable | The function to call in another thread. | required |
_start_thread | bool | Whether to immediately start the thread. If False, the returned worker must be manually started with | None |
_connect | Dict[str, Union[Callable, Sequence]] | A mapping of | None |
_worker_class | type of `GeneratorWorker` or `FunctionWorker` | The | None |
_ignore_errors | bool | If | False |
*args | | will be passed to | () |
**kwargs | | will be passed to | {} |
Returns:
Name | Type | Description |
---|---|---|
worker | WorkerBase | An instantiated worker. If |
Raises:
Type | Description |
---|---|
TypeError | If a worker_class is provided that is not a subclass of WorkerBase. |
TypeError | If _connect is provided and is not a dict of |
Examples:
import time

from superqt.utils import create_worker


def long_function(duration):
    time.sleep(duration)


worker = create_worker(long_function, 10)
superqt.utils.new_worker_qthread(Worker, *args, _start_thread=False, _connect=None, **kwargs)
Convenience function to start a worker in a QThread.
A QThread object should be thought of as something that manages a thread, not as the actual code or object that runs in that thread. The QThread object is created on the main thread and lives there.
Worker objects which derive from QObject are the things that actually do the work. They can be moved to a QThread as is done here.
Mostly ignorable detail
While the signals/slots syntax of the worker looks very similar to standard "single-threaded" signals & slots, note that inter-thread signals and slots (automatically) use an event-based QueuedConnection, while intra-thread signals use a DirectConnection. See Signals and Slots Across Threads.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
Worker | QObject | QObject type that implements a | required |
_start_thread | bool | If True, the thread will be started immediately; otherwise, the thread must be manually started with thread.start(). | False |
_connect | dict | Optional dictionary of {signal: function} to connect to the new worker. For instance, _connect = {'incremented': myfunc} will result in worker.incremented.connect(myfunc). | None |
*args | | will be passed to the Worker class on instantiation. | () |
**kwargs | | will be passed to the Worker class on instantiation. | {} |
Returns:
Name | Type | Description |
---|---|---|
worker | WorkerBase | The created worker. |
thread | QThread | The thread on which the worker is running. |
Examples:
Create some QObject that has a long-running work method:
import time

from qtpy.QtCore import QObject, Signal, Slot
from superqt.utils import new_worker_qthread


class Worker(QObject):
    finished = Signal()
    increment = Signal(int)

    def __init__(self, argument):
        super().__init__()
        self.argument = argument

    @Slot()
    def work(self):
        # some long running task...
        for i in range(10):
            time.sleep(1)
            self.increment.emit(i)
        self.finished.emit()


worker, thread = new_worker_qthread(
    Worker,
    "argument",
    _start_thread=True,
    _connect={"increment": print},
)