Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - Reset threading lock at conftest #901

Closed
wants to merge 11 commits into from
11 changes: 5 additions & 6 deletions taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ def stop(self, wait: bool = True, timeout: Optional[float] = None):
wait (bool): If True, the method will wait for the dispatcher to stop.
timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
"""
self.stop_wait = wait
self.stop_timeout = timeout
self._STOP_FLAG = True

if wait and self.is_alive():
self._logger.debug("Waiting for the dispatcher thread to stop...")
self.join(timeout=timeout)

def run(self):
self._logger.debug("Job dispatcher started.")
while not self._STOP_FLAG:
Expand All @@ -75,10 +77,7 @@ def run(self):
pass
except Exception as e:
self._logger.exception(e)
pass
if self.stop_wait:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

self._logger.debug("Waiting for the dispatcher thread to stop...")
self.join(timeout=self.stop_timeout)

self._logger.debug("Job dispatcher stopped.")

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Opt

def _can_execute(self) -> bool:
"""Returns True if the dispatcher have resources to dispatch a job."""
self._logger.info(f"self._nb_available_workers: {self._nb_available_workers}")
return self._nb_available_workers > 0

def run(self):
Expand Down
2 changes: 1 addition & 1 deletion taipy/core/_orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class _Orchestrator(_AbstractOrchestrator):
"""

jobs_to_run: Queue = Queue()
blocked_jobs: List = []
blocked_jobs: List[Job] = []

lock = Lock()
__logger = _TaipyLogger._get_logger()
Expand Down
3 changes: 2 additions & 1 deletion taipy/core/submission/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def __ge__(self, other):

def _update_submission_status(self, job: Job):
from ._submission_manager_factory import _SubmissionManagerFactory

with self.lock:
submission_manager = _SubmissionManagerFactory._build_manager()
submission = submission_manager._get(self)
Expand All @@ -205,7 +206,7 @@ def _update_submission_status(self, job: Job):
job_status = job.status
if job_status == Status.FAILED:
submission._submission_status = SubmissionStatus.FAILED
_SubmissionManagerFactory._build_manager()._set(submission)
submission_manager._set(submission)
trgiangdo marked this conversation as resolved.
Show resolved Hide resolved
return
if job_status == Status.CANCELED:
submission._is_canceled = True
Expand Down
16 changes: 7 additions & 9 deletions taipy/gui_core/_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
from taipy.core import (
Cycle,
DataNode,
Job,
Scenario,
Sequence,
Task,
is_deletable,
is_editable,
is_promotable,
Expand All @@ -37,12 +34,13 @@ def __repr__(self):
return self.get_label() if hasattr(self, "get_label") else super().__repr__()


Scenario.__bases__ += (_GCDoNotUpdate,)
Sequence.__bases__ += (_GCDoNotUpdate,)
DataNode.__bases__ += (_GCDoNotUpdate,)
Cycle.__bases__ += (_GCDoNotUpdate,)
Job.__bases__ += (_GCDoNotUpdate,)
Task.__bases__ += (_GCDoNotUpdate,)
# Scenario.__bases__ += (_GCDoNotUpdate,)
# Sequence.__bases__ += (_GCDoNotUpdate,)
# DataNode.__bases__ += (_GCDoNotUpdate,)
# Cycle.__bases__ += (_GCDoNotUpdate,)
# Job.__bases__ += (_GCDoNotUpdate,)
# Task.__bases__ += (_GCDoNotUpdate,)
# Submission.__bases__ += (_GCDoNotUpdate,)


class _EntityType(Enum):
Expand Down
3 changes: 3 additions & 0 deletions tests/core/_orchestrator/test_orchestrator_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def test_build_unknown_dispatcher():
_OrchestratorFactory._build_dispatcher()
assert _OrchestratorFactory._dispatcher is None

Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
_OrchestratorFactory._build_dispatcher()


def test_remove_dispatcher_not_built():
_OrchestratorFactory._dispatcher = None
Expand Down
5 changes: 3 additions & 2 deletions tests/core/config/test_override_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ def test_override_default_configuration_with_code_configuration():
def test_override_default_config_with_code_config_including_env_variable_values():
Config.configure_core()
assert Config.core.repository_type == "filesystem"
Config.configure_core(repository_type="othertype")
assert Config.core.repository_type == "othertype"

with mock.patch.dict(os.environ, {"REPOSITORY_TYPE": "foo"}):
Config.configure_core(repository_type="ENV[REPOSITORY_TYPE]")
assert Config.core.repository_type == "foo"

Config.configure_core(repository_type="othertype")
assert Config.core.repository_type == "othertype"


def test_override_default_configuration_with_file_configuration():
tf = NamedTemporaryFile(
Expand Down
19 changes: 11 additions & 8 deletions tests/core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import shutil
from datetime import datetime
from queue import Queue
from threading import Lock
from unittest.mock import patch

import pandas as pd
Expand Down Expand Up @@ -180,14 +181,9 @@ def default_multi_sheet_data_frame():
def cleanup_files():
yield

if os.path.exists(".data"):
shutil.rmtree(".data", ignore_errors=True)
if os.path.exists("user_data"):
shutil.rmtree("user_data", ignore_errors=True)
if os.path.exists(".taipy"):
shutil.rmtree(".taipy", ignore_errors=True)
if os.path.exists(".my_data"):
shutil.rmtree(".my_data", ignore_errors=True)
for path in [".data", ".my_data", "user_data", ".taipy"]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if os.path.exists(path):
shutil.rmtree(path, ignore_errors=True)


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -331,6 +327,12 @@ def clean_repository(init_config, init_managers, init_orchestrator, init_notifie
with patch("sys.argv", ["prog"]):
yield

close_all_sessions()
init_orchestrator()
init_managers()
init_config()
init_notifier()


@pytest.fixture
def init_config(reset_configuration_singleton, inject_core_sections):
Expand Down Expand Up @@ -378,6 +380,7 @@ def _init_orchestrator():
_OrchestratorFactory._build_dispatcher(force_restart=True)
_OrchestratorFactory._orchestrator.jobs_to_run = Queue()
_OrchestratorFactory._orchestrator.blocked_jobs = []
_OrchestratorFactory._orchestrator.lock = Lock()

return _init_orchestrator

Expand Down
10 changes: 1 addition & 9 deletions tests/core/job/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,6 @@ def test_delete_job():
assert _JobManager._get(job_1.id) is None


m = multiprocessing.Manager()
lock = m.Lock()


def inner_lock_multiply(nb1: float, nb2: float):
with lock:
return multiply(1 or nb1, 2 or nb2)


def test_raise_when_trying_to_delete_unfinished_job():
Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
m = multiprocessing.Manager()
Expand Down Expand Up @@ -326,6 +317,7 @@ def test_cancel_subsequent_jobs():
orchestrator = _OrchestratorFactory._orchestrator
submission_manager = _SubmissionManagerFactory._build_manager()

m = multiprocessing.Manager()
lock_0 = m.Lock()

dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
Expand Down
Loading