Commit

Merge branch 'develop' into feature/decouple-app-syncrho
dinhlongviolin1 authored Feb 20, 2024
2 parents 1f00912 + 16dfc8c commit 88fd6ab
Showing 10 changed files with 116 additions and 141 deletions.
@@ -8,8 +8,6 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
from typing import Optional

from ...job.job import Job
from .._abstract_orchestrator import _AbstractOrchestrator
from ._job_dispatcher import _JobDispatcher
@@ -19,7 +17,7 @@
class _DevelopmentJobDispatcher(_JobDispatcher):
"""Manages job dispatching (instances of `Job^` class) in a synchronous way."""

def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
def __init__(self, orchestrator: _AbstractOrchestrator):
super().__init__(orchestrator)

def start(self):
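
Both hunks above drop Optional from the dispatcher constructor: the orchestrator argument is now required rather than possibly None. A minimal sketch, not part of the diff, of what that contract change means for callers (the classes below are illustrative stand-ins, not the real taipy types):

class OrchestratorStandIn:
    """Illustrative stand-in for _AbstractOrchestrator."""


class DispatcherSketch:
    def __init__(self, orchestrator: OrchestratorStandIn) -> None:
        # A non-optional parameter means the dispatcher never has to guard
        # against None; callers must supply a real orchestrator instance.
        self.orchestrator = orchestrator


dispatcher = DispatcherSketch(OrchestratorStandIn())  # OK
# DispatcherSketch(None) would now be flagged by a type checker.
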
8 changes: 3 additions & 5 deletions taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
@@ -12,7 +12,7 @@
import threading
from abc import abstractmethod
from queue import Empty
from typing import Dict, Optional
from typing import Dict

from taipy.config.config import Config
from taipy.logger._taipy_logger import _TaipyLogger
@@ -32,7 +32,7 @@ class _JobDispatcher(threading.Thread):
_logger = _TaipyLogger._get_logger()
_nb_available_workers: int = 1

def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
def __init__(self, orchestrator: _AbstractOrchestrator):
threading.Thread.__init__(self, name="Thread-Taipy-JobDispatcher")
self.daemon = True
self.orchestrator = orchestrator
@@ -66,9 +66,7 @@ def run(self):
except Exception as e:
_TaipyLogger._get_logger().exception(e)
pass

# The dispatcher is now shutting down, let's shutdown its executor.
self._executor.shutdown(wait=True)
self._logger.info("Job dispatcher stopped.")

def _can_execute(self) -> bool:
"""Returns True if the dispatcher have resources to execute a new job."""
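
The run() hunk above removes the executor shutdown from the base class; the loop now only logs that it stopped, and executor lifetime is handled by the standalone subclass in the next file. A hedged sketch of that shape using standard-library names (the queue, stop flag, and _dispatch hook are assumptions for illustration, not the actual taipy internals):

import logging
import threading
from queue import Empty, Queue

logger = logging.getLogger(__name__)


class DispatcherLoopSketch(threading.Thread):
    """Sketch of a dispatcher thread whose run() owns no executor."""

    def __init__(self) -> None:
        super().__init__(daemon=True, name="Thread-Sketch-JobDispatcher")
        self._queue: Queue = Queue()
        self._stop_requested = threading.Event()

    def run(self) -> None:
        while not self._stop_requested.is_set():
            try:
                job = self._queue.get(timeout=0.1)
                self._dispatch(job)
            except Empty:
                continue
            except Exception as e:
                logger.exception(e)
        # Any pool/executor cleanup is now a subclass concern.
        logger.info("Job dispatcher stopped.")

    def _dispatch(self, job) -> None:
        raise NotImplementedError  # provided by concrete dispatchers
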
14 changes: 11 additions & 3 deletions taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py
@@ -24,20 +24,28 @@
class _StandaloneJobDispatcher(_JobDispatcher):
"""Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor."""

def __init__(self, orchestrator: Optional[_AbstractOrchestrator], subproc_initializer: Optional[Callable] = None):
def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Optional[Callable] = None):
super().__init__(orchestrator)
max_workers = Config.job_config.max_nb_of_workers or 1
self._executor: Executor = ProcessPoolExecutor(max_workers=max_workers, initializer=subproc_initializer) # type: ignore
self._executor: Executor = ProcessPoolExecutor(
max_workers=max_workers,
initializer=subproc_initializer
) # type: ignore
self._nb_available_workers = self._executor._max_workers # type: ignore

def run(self):
with self._executor:
super().run()
self._logger.info("Standalone job dispatcher: Pool executor shut down")

def _dispatch(self, job: Job):
"""Dispatches the given `Job^` on an available worker for execution.
Parameters:
job (Job^): The job to submit on an executor with an available worker.
"""
self._nb_available_workers -= 1

self._nb_available_workers -= 1
config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined]
future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)

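
With the shutdown call gone from the base class, _StandaloneJobDispatcher now wraps the inherited loop in "with self._executor:", so ProcessPoolExecutor.shutdown(wait=True) runs when run() returns, whether normally or via an exception. A small self-contained illustration of that context-manager guarantee (not taipy code):

from concurrent.futures import ProcessPoolExecutor


def square(x: int) -> int:
    return x * x


def run_batch(values):
    # Leaving the with-block calls executor.shutdown(wait=True), so all
    # submitted work completes (or raises) before control returns, even if
    # the body of the block raises.
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(square, v) for v in values]
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_batch([1, 2, 3]))  # [1, 4, 9]
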
10 changes: 5 additions & 5 deletions taipy/core/_orchestrator/_orchestrator_factory.py
@@ -8,7 +8,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import typing
from importlib import util
from typing import Optional, Type

@@ -27,7 +27,7 @@ class _OrchestratorFactory:
_TAIPY_ENTERPRISE_CORE_DISPATCHER_MODULE = _TAIPY_ENTERPRISE_MODULE + ".core._orchestrator._dispatcher"
__TAIPY_ENTERPRISE_BUILD_DISPATCHER_METHOD = "_build_dispatcher"

_orchestrator: Optional[_Orchestrator] = None
_orchestrator: Optional[_AbstractOrchestrator] = None
_dispatcher: Optional[_JobDispatcher] = None

@classmethod
@@ -80,20 +80,20 @@ def __build_standalone_job_dispatcher(cls, force_restart=False):
cls._TAIPY_ENTERPRISE_CORE_DISPATCHER_MODULE, cls.__TAIPY_ENTERPRISE_BUILD_DISPATCHER_METHOD
)(cls._orchestrator)
else:
cls._dispatcher = _StandaloneJobDispatcher(cls._orchestrator) # type: ignore
cls._dispatcher = _StandaloneJobDispatcher(typing.cast(_AbstractOrchestrator, cls._orchestrator))
cls._dispatcher.start() # type: ignore

@classmethod
def __build_development_job_dispatcher(cls):
if isinstance(cls._dispatcher, _StandaloneJobDispatcher):
cls._dispatcher.stop()
cls._dispatcher = _DevelopmentJobDispatcher(cls._orchestrator) # type: ignore
cls._dispatcher = _DevelopmentJobDispatcher(typing.cast(_AbstractOrchestrator, cls._orchestrator))

@classmethod
def __build_enterprise_job_dispatcher(cls, force_restart=False):
cls._dispatcher = _load_fct(
cls._TAIPY_ENTERPRISE_CORE_DISPATCHER_MODULE, cls.__TAIPY_ENTERPRISE_BUILD_DISPATCHER_METHOD
)(cls._orchestrator, force_restart)
)(typing.cast(_AbstractOrchestrator, cls._orchestrator), force_restart)
if cls._dispatcher:
cls._dispatcher.start()
else:
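
The factory keeps _orchestrator declared as Optional[_AbstractOrchestrator], so the diff replaces the old "# type: ignore" comments with typing.cast once the attribute is known to be set. A hedged sketch of that narrowing pattern, with illustrative names rather than the real factory:

import typing
from typing import Optional


class OrchestratorStandIn:
    pass


class DispatcherStandIn:
    def __init__(self, orchestrator: OrchestratorStandIn) -> None:
        self.orchestrator = orchestrator


class FactorySketch:
    _orchestrator: Optional[OrchestratorStandIn] = None

    @classmethod
    def build_dispatcher(cls) -> DispatcherStandIn:
        if cls._orchestrator is None:
            cls._orchestrator = OrchestratorStandIn()
        # cast() is a no-op at runtime; it only tells the type checker that
        # the attribute is non-None here, which the non-optional
        # dispatcher constructor requires.
        return DispatcherStandIn(typing.cast(OrchestratorStandIn, cls._orchestrator))
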
105 changes: 53 additions & 52 deletions taipy/core/data/excel.py
@@ -195,58 +195,59 @@ def _read(self):
return self._read_as()

def _read_as(self):
excel_file = load_workbook(self._path)
exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
work_books = dict()
sheet_names = excel_file.sheetnames

user_provided_sheet_names = self.properties.get(self.__SHEET_NAME_PROPERTY) or []
if not isinstance(user_provided_sheet_names, (List, Set, Tuple)):
user_provided_sheet_names = [user_provided_sheet_names]

provided_sheet_names = user_provided_sheet_names or sheet_names

for sheet_name in provided_sheet_names:
if sheet_name not in sheet_names:
raise NonExistingExcelSheet(sheet_name, self._path)

if isinstance(exposed_type, List):
if len(provided_sheet_names) != len(self.properties[self._EXPOSED_TYPE_PROPERTY]):
raise ExposedTypeLengthMismatch(
f"Expected {len(provided_sheet_names)} exposed types, got "
f"{len(self.properties[self._EXPOSED_TYPE_PROPERTY])}"
)

for i, sheet_name in enumerate(provided_sheet_names):
work_sheet = excel_file[sheet_name]
sheet_exposed_type = exposed_type

if not isinstance(sheet_exposed_type, str):
if isinstance(exposed_type, dict):
sheet_exposed_type = exposed_type.get(sheet_name, self._EXPOSED_TYPE_PANDAS)
elif isinstance(exposed_type, List):
sheet_exposed_type = exposed_type[i]

if isinstance(sheet_exposed_type, str):
if sheet_exposed_type == self._EXPOSED_TYPE_NUMPY:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name).to_numpy()
elif sheet_exposed_type == self._EXPOSED_TYPE_PANDAS:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name)
continue

res = list()
for row in work_sheet.rows:
res.append([col.value for col in row])
if self.properties[self._HAS_HEADER_PROPERTY] and res:
header = res.pop(0)
for i, row in enumerate(res):
res[i] = sheet_exposed_type(**dict([[h, r] for h, r in zip(header, row)]))
else:
for i, row in enumerate(res):
res[i] = sheet_exposed_type(*row)
work_books[sheet_name] = res

excel_file.close()
try:
excel_file = load_workbook(self._path)
exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
work_books = dict()
sheet_names = excel_file.sheetnames

user_provided_sheet_names = self.properties.get(self.__SHEET_NAME_PROPERTY) or []
if not isinstance(user_provided_sheet_names, (List, Set, Tuple)):
user_provided_sheet_names = [user_provided_sheet_names]

provided_sheet_names = user_provided_sheet_names or sheet_names

for sheet_name in provided_sheet_names:
if sheet_name not in sheet_names:
raise NonExistingExcelSheet(sheet_name, self._path)

if isinstance(exposed_type, List):
if len(provided_sheet_names) != len(self.properties[self._EXPOSED_TYPE_PROPERTY]):
raise ExposedTypeLengthMismatch(
f"Expected {len(provided_sheet_names)} exposed types, got "
f"{len(self.properties[self._EXPOSED_TYPE_PROPERTY])}"
)

for i, sheet_name in enumerate(provided_sheet_names):
work_sheet = excel_file[sheet_name]
sheet_exposed_type = exposed_type

if not isinstance(sheet_exposed_type, str):
if isinstance(exposed_type, dict):
sheet_exposed_type = exposed_type.get(sheet_name, self._EXPOSED_TYPE_PANDAS)
elif isinstance(exposed_type, List):
sheet_exposed_type = exposed_type[i]

if isinstance(sheet_exposed_type, str):
if sheet_exposed_type == self._EXPOSED_TYPE_NUMPY:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name).to_numpy()
elif sheet_exposed_type == self._EXPOSED_TYPE_PANDAS:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name)
continue

res = list()
for row in work_sheet.rows:
res.append([col.value for col in row])
if self.properties[self._HAS_HEADER_PROPERTY] and res:
header = res.pop(0)
for i, row in enumerate(res):
res[i] = sheet_exposed_type(**dict([[h, r] for h, r in zip(header, row)]))
else:
for i, row in enumerate(res):
res[i] = sheet_exposed_type(*row)
work_books[sheet_name] = res
finally:
excel_file.close()

if len(provided_sheet_names) == 1:
return work_books[provided_sheet_names[0]]
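
The excel.py change is purely structural: the body of _read_as() is wrapped in try/finally so the openpyxl workbook is closed even when a missing sheet or an exposed-type mismatch raises. A condensed sketch of the same pattern (not the full taipy reader; load_workbook, sheetnames, rows, and close are the real openpyxl API):

from openpyxl import load_workbook


def read_sheet_rows(path: str, sheet_name: str):
    excel_file = load_workbook(path)
    try:
        if sheet_name not in excel_file.sheetnames:
            raise ValueError(f"Sheet {sheet_name} does not exist in {path}")
        work_sheet = excel_file[sheet_name]
        return [[cell.value for cell in row] for row in work_sheet.rows]
    finally:
        # Runs whether the body returned or raised, mirroring the diff.
        excel_file.close()
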
@@ -10,7 +10,7 @@
# specific language governing permissions and limitations under the License.

from concurrent.futures import Executor, Future
from typing import List, Optional
from typing import List

from taipy.core import Job
from taipy.core._orchestrator._abstract_orchestrator import _AbstractOrchestrator
@@ -35,7 +35,7 @@ def submit(self, fn, *args, **kwargs):


class MockStandaloneDispatcher(_StandaloneJobDispatcher):
def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
def __init__(self, orchestrator: _AbstractOrchestrator):
super(_StandaloneJobDispatcher, self).__init__(orchestrator)
self._executor: Executor = MockProcessPoolExecutor()
self.dispatch_calls: List = []
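
The test helper gets the same non-optional constructor; it swaps in a synchronous mock executor so dispatch calls can be asserted on directly. A hedged sketch of such an inline executor (this mirrors the idea of MockProcessPoolExecutor but is not the exact taipy test helper):

from concurrent.futures import Future
from typing import Any, Callable, List


class InlineExecutor:
    """Runs submitted callables immediately and records them for assertions."""

    def __init__(self) -> None:
        self.submit_calls: List[Callable] = []

    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Future:
        self.submit_calls.append(fn)
        future: Future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Surface errors through the Future, like a real pool would.
            future.set_exception(exc)
        return future
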
25 changes: 19 additions & 6 deletions tests/core/data/test_write_multiple_sheet_excel_data_node.py
@@ -31,7 +31,12 @@ def tmp_excel_file():
def cleanup(tmp_excel_file):
yield
if os.path.exists(tmp_excel_file):
os.remove(tmp_excel_file)
try:
os.remove(tmp_excel_file)
except Exception as e:
from taipy.logger._taipy_logger import _TaipyLogger
logger = _TaipyLogger._get_logger()
logger.error(f"Failed to delete {tmp_excel_file}. {e}")


@dataclasses.dataclass
@@ -167,7 +172,6 @@ def test_write_with_header_multiple_sheet_custom_exposed_type_with_sheet_name(tm
Scope.SCENARIO,
properties={"path": tmp_excel_file, "sheet_name": sheet_names, "exposed_type": MyCustomObject},
)

row_1 = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
row_2 = [MyCustomObject(0, 4, "hello"), MyCustomObject(1, 5, "abc"), MyCustomObject(2, 6, ".")]
sheet_data = {"Sheet1": row_1, "Sheet2": row_2}
@@ -180,7 +184,10 @@ def test_write_with_header_multiple_sheet_custom_exposed_type_without_sheet_name


def test_write_with_header_multiple_sheet_custom_exposed_type_without_sheet_name(tmp_excel_file):
excel_dn = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": tmp_excel_file, "exposed_type": MyCustomObject})
excel_dn = ExcelDataNode(
"foo",
Scope.SCENARIO,
properties={"path": tmp_excel_file, "exposed_type": MyCustomObject})

row_1 = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
row_2 = [MyCustomObject(0, 4, "hello"), MyCustomObject(1, 5, "abc"), MyCustomObject(2, 6, ".")]
@@ -195,7 +202,9 @@ def test_write_with_header_multiple_sheet_custom_exposed_type_without_sheet_name

def test_write_without_header_multiple_sheet_pandas_with_sheet_name(tmp_excel_file):
excel_dn = ExcelDataNode(
"foo", Scope.SCENARIO, properties={"path": tmp_excel_file, "sheet_name": sheet_names, "has_header": False}
"foo",
Scope.SCENARIO,
properties={"path": tmp_excel_file, "sheet_name": sheet_names, "has_header": False}
)

df_1 = pd.DataFrame([*zip([1, 2, 3])])
@@ -283,7 +292,9 @@ def test_write_without_header_multiple_sheet_numpy_with_sheet_fil

def test_write_without_header_multiple_sheet_numpy_without_sheet_name(tmp_excel_file):
excel_dn = ExcelDataNode(
"foo", Scope.SCENARIO, properties={"path": tmp_excel_file, "exposed_type": "numpy", "has_header": False}
"foo",
Scope.SCENARIO,
properties={"path": tmp_excel_file, "exposed_type": "numpy", "has_header": False}
)

arr_1 = np.array([[1], [2], [3]])
@@ -332,7 +343,9 @@ def test_write_without_header_multiple_sheet_custom_exposed_type_with_sheet_name

def test_write_without_header_multiple_sheet_custom_exposed_type_without_sheet_name(tmp_excel_file):
excel_dn = ExcelDataNode(
"foo", Scope.SCENARIO, properties={"path": tmp_excel_file, "exposed_type": MyCustomObject, "has_header": False}
"foo",
Scope.SCENARIO,
properties={"path": tmp_excel_file, "exposed_type": MyCustomObject, "has_header": False}
)

row_1 = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
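
This test module (and the single-sheet one below) now guards the temporary-file removal in the cleanup fixture, so a file the OS still holds open (for example on Windows) is logged instead of failing the teardown. A minimal sketch of the same pattern using the standard logging module rather than taipy's internal logger (fixture names here are illustrative):

import logging
import os

import pytest

logger = logging.getLogger(__name__)


@pytest.fixture
def tmp_excel_path(tmp_path):
    path = os.path.join(tmp_path, "data.xlsx")
    yield path
    if os.path.exists(path):
        try:
            os.remove(path)
        except OSError as e:
            # Teardown should not fail the run just because a handle is
            # still open on the file.
            logger.error(f"Failed to delete {path}. {e}")
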
25 changes: 17 additions & 8 deletions tests/core/data/test_write_single_sheet_excel_data_node.py
@@ -8,7 +8,6 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import dataclasses
import os
import pathlib
@@ -32,7 +31,12 @@ def tmp_excel_file():
def cleanup(tmp_excel_file):
yield
if os.path.exists(tmp_excel_file):
os.remove(tmp_excel_file)
try:
os.remove(tmp_excel_file)
except Exception as e:
from taipy.logger._taipy_logger import _TaipyLogger
logger = _TaipyLogger._get_logger()
logger.error(f"Failed to delete {tmp_excel_file}. {e}")


@dataclasses.dataclass
@@ -248,17 +252,21 @@ def test_write_with_header_single_sheet_custom_exposed_type_with_sheet_name(tmp_
Scope.SCENARIO,
properties={"path": tmp_excel_file, "sheet_name": "Sheet1", "exposed_type": MyCustomObject},
)
expected_data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]

data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
excel_dn.write(data)
assert all(actual == expected for actual, expected in zip(excel_dn.read(), data))
excel_dn.write(expected_data)
actual_data = excel_dn.read()

assert all(actual == expected for actual, expected in zip(actual_data, expected_data))

excel_dn.write(None)
assert excel_dn.read() == []
actual_data = excel_dn.read()
assert actual_data == []


def test_write_with_header_single_sheet_custom_exposed_type_without_sheet_name(tmp_excel_file):
excel_dn = ExcelDataNode("foo", Scope.SCENARIO, properties={"path": tmp_excel_file, "exposed_type": MyCustomObject})
excel_dn = ExcelDataNode("foo", Scope.SCENARIO,
properties={"path": tmp_excel_file, "exposed_type": MyCustomObject})

data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
excel_dn.write(data)
@@ -290,7 +298,8 @@ def test_write_without_header_single_sheet_custom_exposed_type_with_sheet_name(t

def test_write_without_header_single_sheet_custom_exposed_type_without_sheet_name(tmp_excel_file):
excel_dn = ExcelDataNode(
"foo", Scope.SCENARIO, properties={"path": tmp_excel_file, "exposed_type": MyCustomObject, "has_header": False}
"foo", Scope.SCENARIO,
properties={"path": tmp_excel_file, "exposed_type": MyCustomObject, "has_header": False}
)

data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
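
The reworked single-sheet tests name the written value expected_data and the re-read value actual_data before comparing, which keeps the round trip explicit. A hedged sketch of that write/read check for a custom exposed type (ExcelDataNode and Scope are the real taipy classes; the import paths, dataclass fields, and file path are assumptions for illustration):

import dataclasses

from taipy.config.common.scope import Scope
from taipy.core.data.excel import ExcelDataNode


@dataclasses.dataclass
class MyCustomObject:
    id: int
    integer: int
    text: str


def check_round_trip(tmp_excel_file: str) -> None:
    excel_dn = ExcelDataNode(
        "foo",
        Scope.SCENARIO,
        properties={"path": tmp_excel_file, "exposed_type": MyCustomObject},
    )
    expected_data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world")]
    excel_dn.write(expected_data)
    actual_data = excel_dn.read()
    # Dataclass equality compares field values, so the round trip holds as
    # long as the cell values survive the Excel write/read.
    assert all(a == e for a, e in zip(actual_data, expected_data))
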