diff --git a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py index 1d139710d5..51b4344a69 100644 --- a/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py +++ b/taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py @@ -37,7 +37,6 @@ def __init__(self, orchestrator: _AbstractOrchestrator): threading.Thread.__init__(self, name="Thread-Taipy-JobDispatcher") self.daemon = True self.orchestrator = orchestrator - self.lock = self.orchestrator.lock # type: ignore Config.block_update() def start(self): @@ -64,10 +63,9 @@ def run(self): while not self._STOP_FLAG: try: if self._can_execute(): - with self.lock: - if self._STOP_FLAG: - break - job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) + if self._STOP_FLAG: + break + job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1) self._execute_job(job) else: time.sleep(0.1) # We need to sleep to avoid busy waiting. @@ -99,11 +97,10 @@ def _execute_job(self, job: Job): def _execute_jobs_synchronously(self): while not self.orchestrator.jobs_to_run.empty(): - with self.lock: - try: - job = self.orchestrator.jobs_to_run.get() - except Exception: # In case the last job of the queue has been removed. - self._logger.warning(f"{job.id} is no longer in the list of jobs to run.") + try: + job = self.orchestrator.jobs_to_run.get() + except Empty: + pass self._execute_job(job) @staticmethod diff --git a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py index 43a428e685..63ad673c78 100644 --- a/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py +++ b/tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py @@ -40,7 +40,6 @@ def test_init_default(): job_dispatcher = _StandaloneJobDispatcher(orchestrator) assert job_dispatcher.orchestrator == orchestrator - assert job_dispatcher.lock == orchestrator.lock assert job_dispatcher._nb_available_workers == 1 assert isinstance(job_dispatcher._executor, ProcessPoolExecutor)