You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
In order to facilitate a lot of functional tests with Dramatiq, we wrote this little fixture for use with pytest:
@dataclasses.dataclassclassDramatiqFixture:
"""A fixture for running dramatiq jobs in the background. This encapsulates the broker, and the worker, and provides methods to pause and resume the worker, and to wait for it to finish. """worker: dramatiq.Workerbroker: dramatiq.brokers.stub.StubBroker_is_paused: bool=Falsedefget_actor(self, actor_name: str) ->dramatiq.Actor:
returnself.broker.get_actor(actor_name)
@propertydefqueues(self) ->dict[str, queue.Queue]:
returnself.broker.queuesdefwait(self, *, queue_name: str="default", fail_fast: bool=False):
"""Wait for the background worker to finish."""ifself._is_paused:
self.resume()
self.broker.join(queue_name=queue_name, fail_fast=fail_fast)
self.worker.join()
defpause(self):
"""Pause the background worker."""self.worker.pause()
self._is_paused=Truedefresume(self):
"""Resume the background worker."""self.worker.resume()
self._is_paused=False@pytest.fixture(name="dramatiq_q")defdramatiq_broker_worker_fixture(
transactional_db, restore_db
) ->Generator[DramatiqFixture, None, None]:
"""Using this fixture starts up a background worker to run dramatiq jobs."""broker=dramatiq.get_broker()
assertisinstance(broker, dramatiq.brokers.stub.StubBroker)
broker.flush_all()
worker=dramatiq.Worker(broker, worker_timeout=100)
worker.start()
yieldDramatiqFixture(worker=worker, broker=broker)
broker.flush_all()
worker.stop(timeout=5*1000)
The idea being you can import it in any test, and call things like wait() on it to force jobs to run. However, when redently adding more tests that use this, a prior test that was working, started always failing by timing out on the wait. So somehow the multiple tests are interfering with each other, but I can't figure out why. Running both the new test, or the old test, individually they run fine. It's only in combination that it fails.
What did you expect would happen?
Ideally we could have all tests be idempotent.
What happened?
Crashed. 400s timeout on calling dramatiq_q.wait() on a test that only runs one job and that job is being aborted by raising an error in the middleware and setting message.failed = True
The text was updated successfully, but these errors were encountered:
What OS are you using?
Debian Bookworm
What version of Dramatiq are you using?
1.14.2
What did you do?
In order to facilitate a lot of functional tests with Dramatiq, we wrote this little fixture for use with pytest:
The idea being you can import it in any test, and call things like wait() on it to force jobs to run. However, when redently adding more tests that use this, a prior test that was working, started always failing by timing out on the wait. So somehow the multiple tests are interfering with each other, but I can't figure out why. Running both the new test, or the old test, individually they run fine. It's only in combination that it fails.
What did you expect would happen?
Ideally we could have all tests be idempotent.
What happened?
Crashed. 400s timeout on calling dramatiq_q.wait() on a test that only runs one job and that job is being aborted by raising an error in the middleware and setting
message.failed = True
The text was updated successfully, but these errors were encountered: