Skip to content

Commit

Permalink
add logs on lock acquire and release
Browse files Browse the repository at this point in the history
  • Loading branch information
trgiangdo committed Mar 8, 2024
1 parent 87a0082 commit 19c316b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
8 changes: 8 additions & 0 deletions taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,31 @@ def run(self):
# Main loop of the dispatcher thread: while the stop flag is not set, take the next job from the
# orchestrator's `jobs_to_run` queue (under `self.lock`) and hand it to `_execute_job`.
# NOTE(review): the block structure below was reconstructed from a diff view that lost its
# indentation; in particular, confirm that `_execute_job(job)` is meant to run after the lock
# has been released rather than while holding it.
self._logger.info("Start job dispatcher...")
while not self._STOP_FLAG:
    try:
        self._logger.info("Check if can execute before getting from queue")
        if self._can_execute():
            self._logger.info(f"run() TRY TO acquired the {self.lock}")
            with self.lock:
                self._logger.info(f"run() acquired the {self.lock}")
                # The stop flag may have been set while we were waiting for the lock.
                if self._STOP_FLAG:
                    break
                # Logger methods treat extra positional arguments as %-format arguments, so the
                # message needs a placeholder; without it the record fails to format and the
                # logging module prints a "Logging error" traceback instead of the message.
                self._logger.info("Getting job from %s", self.orchestrator.jobs_to_run)
                # Short timeout so the loop re-checks the stop flag regularly.
                job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1)
            self._execute_job(job)
            self._logger.info(f"run() release the {self.lock}")
        else:
            time.sleep(0.1)  # We need to sleep to avoid busy waiting.
    except Empty:  # In case the last job of the queue has been removed.
        self._logger.info(f"run() release the {self.lock} because of Empty")
    except Exception as e:
        # Keep the dispatcher alive: log the failure with its traceback and keep looping.
        self._logger.info(f"run() release the {self.lock} because of {e}")
        self._logger.exception(e)

self._logger.info("Job dispatcher stopped.")

def _can_execute(self) -> bool:
    """Return True if the dispatcher has resources to execute a new job.

    A new job can be executed when `self._nb_available_workers` is strictly positive.
    The current number of available workers is logged at INFO level on every call.
    """
    # Lazy %-style arguments: the message is only formatted if the record is actually emitted,
    # which matters because this method is polled on every iteration of the dispatcher loop.
    self._logger.info("self._nb_available_workers: %s", self._nb_available_workers)
    return self._nb_available_workers > 0

def _execute_job(self, job: Job):
Expand Down
10 changes: 10 additions & 0 deletions taipy/core/_orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def submit(
jobs = []
tasks = submittable._get_sorted_tasks()
with cls.lock:
cls.__logger.info(f"submit() acquired the {cls.lock}")
for ts in tasks:
for task in ts:
jobs.append(
Expand All @@ -90,6 +91,7 @@ def submit(
force=force, # type: ignore
)
)
cls.__logger.info(f"submit() released the {cls.lock}")
submission.jobs = jobs # type: ignore
cls._orchestrate_job_to_run_or_block(jobs)
if Config.job_config.is_development:
Expand Down Expand Up @@ -129,13 +131,15 @@ def submit_task(
)
submit_id = submission.id
with cls.lock:
cls.__logger.info(f"submit_task() acquired the {cls.lock}")
job = cls._lock_dn_output_and_create_job(
task,
submit_id,
submission.entity_id,
itertools.chain([cls._update_submission_status], callbacks or []),
force,
)
cls.__logger.info(f"submit_task() released the {cls.lock}")
jobs = [job]
submission.jobs = jobs # type: ignore
cls._orchestrate_job_to_run_or_block(jobs)
Expand Down Expand Up @@ -233,10 +237,12 @@ def _on_status_change(cls, job: Job):
def __unblock_jobs(cls):
    """Move every blocked job that is no longer blocked to the queue of jobs to run.

    For each job of `cls.blocked_jobs` for which `cls._is_blocked(job)` is false, and while
    holding `cls.lock`, the job is set to pending, passed to `cls.__remove_blocked_job` and put
    on `cls.jobs_to_run`. The lock acquisition and release are logged at INFO level.
    """
    # Iterate over a snapshot: `__remove_blocked_job` presumably removes the job from
    # `cls.blocked_jobs` (its body is not visible here, to be confirmed), and removing items
    # from a list while iterating over it makes the loop skip the element that follows.
    for job in list(cls.blocked_jobs):
        if cls._is_blocked(job):
            continue
        with cls.lock:
            # Logged inside the `with` block so that "acquired" is only reported once the lock
            # is really held (it used to be logged before the acquisition).
            cls.__logger.info(f"unblock_jobs() acquired the {cls.lock}")
            job.pending()
            cls.__remove_blocked_job(job)
            cls.jobs_to_run.put(job)
        cls.__logger.info(f"unblock_jobs() released the {cls.lock}")

@classmethod
def __remove_blocked_job(cls, job):
Expand All @@ -255,12 +261,14 @@ def cancel_job(cls, job: Job):
cls.__logger.info(f"{job.id} has already failed and cannot be canceled.")
else:
with cls.lock:
cls.__logger.info(f"cancel_job() acquired the {cls.lock}")
to_cancel_or_abandon_jobs = set([job])
to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys())))
cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs)
cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs)
cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs)
cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs)
cls.__logger.info(f"cancel_job() released the {cls.lock}")

@classmethod
def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]:
Expand Down Expand Up @@ -294,6 +302,7 @@ def __remove_jobs_to_run(cls, jobs):
@classmethod
def _fail_subsequent_jobs(cls, failed_job: Job):
with cls.lock:
cls.__logger.info(f"_fail_subsequent_jobs() acquired the {cls.lock}")
to_fail_or_abandon_jobs = set()
to_fail_or_abandon_jobs.update(
cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys()))
Expand All @@ -304,6 +313,7 @@ def _fail_subsequent_jobs(cls, failed_job: Job):
cls.__remove_blocked_jobs(to_fail_or_abandon_jobs)
cls.__remove_jobs_to_run(to_fail_or_abandon_jobs)
cls._unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs)
cls.__logger.info(f"_fail_subsequent_jobs() released the {cls.lock}")

@classmethod
def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]):
Expand Down

0 comments on commit 19c316b

Please sign in to comment.