Skip to content

Commit

Permalink
Merge pull request #878 from Avaiga/fix/block-config-and-create-versi…
Browse files Browse the repository at this point in the history
…on-when-create-entities
  • Loading branch information
trgiangdo authored Feb 27, 2024
2 parents ea50048 + 1e4d007 commit 832ccaa
Show file tree
Hide file tree
Showing 60 changed files with 309 additions and 365 deletions.
8 changes: 6 additions & 2 deletions taipy/config/common/_config_blocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ class _ConfigBlocker:

@classmethod
def _block(cls):
cls.__block_config_update = True
if not cls.__block_config_update:
cls.__logger.info("Blocking configuration update.")
cls.__block_config_update = True

@classmethod
def _unblock(cls):
cls.__block_config_update = False
if cls.__block_config_update:
cls.__logger.info("Unblocking configuration update.")
cls.__block_config_update = False

@classmethod
def _check(cls):
Expand Down
48 changes: 33 additions & 15 deletions taipy/core/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class Core:
_is_running = False
__lock_is_running = Lock()

_version_is_initialized = False
__lock_version_is_initialized = Lock()

__logger = _TaipyLogger._get_logger()

_orchestrator: Optional[_Orchestrator] = None
Expand All @@ -48,22 +51,16 @@ def run(self, force_restart=False):
"""
Start a Core service.
This function checks the configuration, manages application's version,
and starts a dispatcher and lock the Config.
This function checks and locks the configuration, manages application's version,
and starts a job dispatcher.
"""
if self.__class__._is_running:
raise CoreServiceIsAlreadyRunning

with self.__class__.__lock_is_running:
self.__class__._is_running = True

self.__update_core_section()
self.__manage_version()
self.__check_and_block_config()

if self._orchestrator is None:
self._orchestrator = _OrchestratorFactory._build_orchestrator()

self._manage_version_and_block_config()
self.__start_dispatcher(force_restart)

def stop(self, wait: bool = True, timeout: Optional[float] = None):
Expand All @@ -84,25 +81,46 @@ def stop(self, wait: bool = True, timeout: Optional[float] = None):
with self.__class__.__lock_is_running:
self.__class__._is_running = False

@staticmethod
def __update_core_section():
with self.__class__.__lock_version_is_initialized:
self.__class__._version_is_initialized = False

@classmethod
def _manage_version_and_block_config(cls):
"""
Manage the application's version and block the Config from updates.
"""
if cls._version_is_initialized:
return

with cls.__lock_version_is_initialized:
cls._version_is_initialized = True

cls.__update_core_section()
cls.__manage_version()
cls.__check_and_block_config()

@classmethod
def __update_core_section(cls):
_CoreCLI.create_parser()
Config._applied_config._unique_sections[CoreSection.name]._update(_CoreCLI.parse_arguments())

@staticmethod
def __manage_version():
@classmethod
def __manage_version(cls):
_VersionManagerFactory._build_manager()._manage_version()
Config._applied_config._unique_sections[CoreSection.name]._update(
{"version_number": _VersionManagerFactory._build_manager()._get_latest_version()}
)

@staticmethod
def __check_and_block_config():
@classmethod
def __check_and_block_config(cls):
Config.check()
Config.block_update()
_init_backup_file_with_storage_folder()

def __start_dispatcher(self, force_restart):
if self._orchestrator is None:
self._orchestrator = _OrchestratorFactory._build_orchestrator()

if dispatcher := _OrchestratorFactory._build_dispatcher(force_restart=force_restart):
self._dispatcher = dispatcher

Expand Down
1 change: 1 addition & 0 deletions taipy/core/_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .submission.submission_id import SubmissionId
from .taipy import (
cancel_job,
clean_all_entities,
clean_all_entities_by_version,
compare_scenarios,
create_global_data_node,
Expand Down
4 changes: 2 additions & 2 deletions taipy/core/_version/_cli/_version_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ...job._job_manager_factory import _JobManagerFactory
from ...scenario._scenario_manager_factory import _ScenarioManagerFactory
from ...sequence._sequence_manager_factory import _SequenceManagerFactory
from ...taipy import clean_all_entities_by_version
from ...taipy import clean_all_entities
from ...task._task_manager_factory import _TaskManagerFactory
from .._version_manager_factory import _VersionManagerFactory
from ._bcolor import _Bcolors
Expand Down Expand Up @@ -102,7 +102,7 @@ def parse_arguments(cls):
raise SystemExit(e) from None

if args.delete:
if clean_all_entities_by_version(args.delete):
if clean_all_entities(args.delete):
cls.__logger.info(f"Successfully delete version {args.delete}.")
else:
sys.exit(1)
Expand Down
4 changes: 2 additions & 2 deletions taipy/core/_version/_version_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ def _replace_version_number(cls, version_number: Optional[str] = None):

@classmethod
def _manage_version(cls):
from ..taipy import clean_all_entities_by_version
from ..taipy import clean_all_entities

if Config.core.mode == "development":
current_version_number = cls._get_development_version()
cls.__logger.info(f"Development mode: Clean all entities of version {current_version_number}")
clean_all_entities_by_version(current_version_number)
clean_all_entities(current_version_number)
cls._set_development_version(current_version_number)

elif Config.core.mode in ["experiment", "production"]:
Expand Down
4 changes: 2 additions & 2 deletions taipy/core/common/_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ def _warn_deprecated(deprecated: str, suggest: Optional[str] = None, stacklevel:
warnings.warn(message=message, category=category, stacklevel=stacklevel)


def _warn_no_core_service(stacklevel: int = 3):
def _warn_no_core_service(specific_message, stacklevel: int = 3):
def inner(f):
@functools.wraps(f)
def _check_if_core_service_is_running(*args, **kwargs):
from .._orchestrator._orchestrator_factory import _OrchestratorFactory

if not _OrchestratorFactory._dispatcher:
message = "The Core service is NOT running"
message = f"The Core service is NOT running. {specific_message}"
warnings.warn(message=message, category=ResourceWarning, stacklevel=stacklevel)

return f(*args, **kwargs)
Expand Down
46 changes: 37 additions & 9 deletions taipy/core/taipy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from taipy.config.common.scope import Scope
from taipy.logger._taipy_logger import _TaipyLogger

from ._core import Core
from ._entity._entity import _Entity
from ._version._version_manager_factory import _VersionManagerFactory
from .common._check_instance import (
Expand All @@ -28,7 +29,7 @@
_is_submission,
_is_task,
)
from .common._warnings import _warn_no_core_service
from .common._warnings import _warn_deprecated, _warn_no_core_service
from .config.data_node_config import DataNodeConfig
from .config.scenario_config import ScenarioConfig
from .cycle._cycle_manager_factory import _CycleManagerFactory
Expand Down Expand Up @@ -219,7 +220,7 @@ def is_readable(
return False


@_warn_no_core_service()
@_warn_no_core_service("The submitted entity will not be executed until the Core service is running.")
def submit(
entity: Union[Scenario, Sequence, Task],
force: bool = False,
Expand Down Expand Up @@ -835,7 +836,10 @@ def create_scenario(
) -> Scenario:
"""Create and return a new scenario based on a scenario configuration.
If the scenario belongs to a cycle, a cycle (corresponding to the _creation_date_
This function checks and locks the configuration, manages application's version,
and creates a new scenario from the scenario configuration provided.
If the scenario belongs to a cycle, the cycle (corresponding to the _creation_date_
and the configuration frequency attribute) is created if it does not exist yet.
Parameters:
Expand All @@ -846,13 +850,22 @@ def create_scenario(
Returns:
The new scenario.
Raises:
SystemExit: If the configuration check returns some errors.
"""
Core._manage_version_and_block_config()

return _ScenarioManagerFactory._build_manager()._create(config, creation_date, name)


def create_global_data_node(config: DataNodeConfig) -> DataNode:
"""Create and return a new GLOBAL data node from a data node configuration.
This function checks and locks the configuration, manages application's version,
and creates the new data node from the data node configuration provided.
Parameters:
config (DataNodeConfig^): The data node configuration. It must have a `GLOBAL` scope.
Expand All @@ -861,32 +874,40 @@ def create_global_data_node(config: DataNodeConfig) -> DataNode:
Raises:
DataNodeConfigIsNotGlobal^: If the data node configuration does not have GLOBAL scope.
SystemExit: If the configuration check returns some errors.
"""
# Check if the data node config has GLOBAL scope
if config.scope is not Scope.GLOBAL:
raise DataNodeConfigIsNotGlobal(config.id)

Core._manage_version_and_block_config()

if dns := _DataManagerFactory._build_manager()._get_by_config_id(config.id):
return dns[0]
return _DataManagerFactory._build_manager()._create_and_set(config, None, None)


def clean_all_entities_by_version(version_number=None) -> bool:
"""Delete all entities of a specific version.
"""Deprecated. Use `clean_all_entities` function instead."""
_warn_deprecated("'clean_all_entities_by_version'", suggest="the 'clean_all_entities' function")
return clean_all_entities(version_number)


This function deletes all entities associated with the specified version.
def clean_all_entities(version_number: str) -> bool:
"""Deletes all entities associated with the specified version.
Parameters:
version_number (optional[str]): The version number of the entities to be deleted.
If None, the default behavior may apply.
version_number (str): The version number of the entities to be deleted.
The version_number should not be a production version.
Returns:
True if the operation succeeded, False otherwise.
Notes:
- If the specified version does not exist, the operation will be aborted, and False will be returned.
- This function cleans all entities, including jobs, scenarios, sequences, tasks, and data nodes.
- The production version of the specified version is also deleted if it exists.
- If the specified version is a production version, the operation will be aborted, and False will be returned.
- This function cleans all entities, including jobs, submissions, scenarios, cycles, sequences, tasks,
and data nodes.
"""
version_manager = _VersionManagerFactory._build_manager()
try:
Expand All @@ -895,6 +916,12 @@ def clean_all_entities_by_version(version_number=None) -> bool:
__logger.warning(f"{e.message} Abort cleaning the entities of version '{version_number}'.")
return False

if version_number in version_manager._get_production_versions():
__logger.warning(
f"Abort cleaning the entities of version '{version_number}'. A production version can not be deleted."
)
return False

_JobManagerFactory._build_manager()._delete_by_version(version_number)
_SubmissionManagerFactory._build_manager()._delete_by_version(version_number)
_ScenarioManagerFactory._build_manager()._delete_by_version(version_number)
Expand All @@ -903,6 +930,7 @@ def clean_all_entities_by_version(version_number=None) -> bool:
_DataManagerFactory._build_manager()._delete_by_version(version_number)

version_manager._delete(version_number)

try:
version_manager._delete_production_version(version_number)
except VersionIsNotProductionVersion:
Expand Down
6 changes: 2 additions & 4 deletions tests/core/_backup/test_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# specific language governing permissions and limitations under the License.

import os
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -47,9 +46,8 @@ def init_backup_file():


def test_backup_storage_folder_when_core_run():
with patch("sys.argv", ["prog"]):
core = Core()
core.run()
core = Core()
core.run()
backup_files = read_backup_file(backup_file_path)
assert backup_files == [f"{Config.core.storage_folder}\n"]
core.stop()
Expand Down
1 change: 1 addition & 0 deletions tests/core/_orchestrator/test_orchestrator__cancel_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from taipy import Job, JobId, Status
from taipy.config import Config
from taipy.core import taipy
Expand Down
8 changes: 4 additions & 4 deletions tests/core/_orchestrator/test_orchestrator__is_blocked.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_is_not_blocked_task_multiple_input_and_output():
dn_0 = Config.configure_data_node("in_0", default_data="THIS")
dn_1 = Config.configure_data_node("in_1", default_data="IS")
dn_2 = Config.configure_data_node("in_2", default_data="DEFAULT")
out = Config.configure_data_node("output")
out = Config.configure_data_node("output_dn")
t = Config.configure_task("the_task", nothing, [dn_0, dn_1, dn_2], [out])
sc_conf = Config.configure_scenario("scenario", [t])
scenario = taipy.create_scenario(sc_conf)
Expand Down Expand Up @@ -74,7 +74,7 @@ def test_is_blocked_task_single_input_edit_in_progress():
def test_is_blocked_task_multiple_input_no_data():
dn_0 = Config.configure_data_node("input_0", default_data="THIS")
dn_1 = Config.configure_data_node("input_1")
out = Config.configure_data_node("output")
out = Config.configure_data_node("output_dn")
t_config = Config.configure_task("the_task", nothing, [dn_0, dn_1], [out])
sc_conf = Config.configure_scenario("scenario", [t_config])
scenario = taipy.create_scenario(sc_conf)
Expand All @@ -101,7 +101,7 @@ def test_is_not_blocked_job_single_input():
def test_is_not_blocked_job_multiple_input_and_output():
in_0 = Config.configure_data_node("in_0", default_data="THIS")
in_1 = Config.configure_data_node("in_1", default_data="IS")
out = Config.configure_data_node("output")
out = Config.configure_data_node("output_dn")
t = Config.configure_task("the_task", nothing, [in_0, in_1], [out])
sc_conf = Config.configure_scenario("scenario", [t])
scenario = taipy.create_scenario(sc_conf)
Expand Down Expand Up @@ -144,7 +144,7 @@ def test_is_blocked_job_multiple_input_no_data():
dn_0 = Config.configure_data_node("in_0", default_data="THIS")
dn_1 = Config.configure_data_node("in_1", default_data="IS")
dn_2 = Config.configure_data_node("in_2")
out = Config.configure_data_node("output")
out = Config.configure_data_node("output_dn")
t = Config.configure_task("the_task", nothing, [dn_0, dn_1, dn_2], [out])
sc_conf = Config.configure_scenario("scenario", [t])
scenario = taipy.create_scenario(sc_conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ def test_lock_dn_and_create_job_with_callback_and_force():


def test_lock_dn_and_create_job_one_output():
dn = Config.configure_data_node("output")
dn = Config.configure_data_node("output_dn")
t = Config.configure_task("one_output", nothing, [], [dn])
sc_conf = Config.configure_scenario("scenario", [t])
scenario = taipy.create_scenario(sc_conf)
orchestrator = _OrchestratorFactory._build_orchestrator()
orchestrator._lock_dn_output_and_create_job(scenario.one_output, "submit_id", "scenario_id")

assert scenario.output.edit_in_progress
assert scenario.output_dn.edit_in_progress


def test_lock_dn_and_create_job_multiple_outputs_one_input():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from taipy import Status
from taipy.config import Config
from taipy.core import taipy
Expand Down Expand Up @@ -50,9 +51,9 @@ def test_orchestrate_job_to_run_or_block_single_pending_job():


def test_orchestrate_job_to_run_or_block_multiple_jobs():
input = Config.configure_data_node("input", default_data=1) # Has default data
input = Config.configure_data_node("input_dn", default_data=1) # Has default data
intermediate = Config.configure_data_node("intermediate") # Has default data
output = Config.configure_data_node("output") # Has default data
output = Config.configure_data_node("output_dn") # Has default data
t1 = Config.configure_task("my_task_1", nothing, [input], [])
t2 = Config.configure_task("my_task_2", nothing, [], [intermediate])
t3 = Config.configure_task("my_task_3", nothing, [intermediate], [output])
Expand Down
Loading

0 comments on commit 832ccaa

Please sign in to comment.