Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#1704 - Store the status changes in a records #1837

Merged
merged 8 commits into from
Sep 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import datetime
from typing import Optional

from ...job.job import Job
Expand Down Expand Up @@ -45,7 +44,5 @@ def _dispatch(self, job: Job):
Parameters:
job (Job^): The job to submit on an executor with an available worker.
"""
job.execution_started_at = datetime.datetime.now()
rs = _TaskFunctionWrapper(job.id, job.task).execute()
self._update_job_status(job, rs)
job.execution_ended_at = datetime.datetime.now()
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import datetime
import multiprocessing as mp
from concurrent.futures import Executor, ProcessPoolExecutor
from functools import partial
Expand Down Expand Up @@ -61,7 +60,6 @@ def _dispatch(self, job: Job):
self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the dispatch method.")
config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined]

job.execution_started_at = datetime.datetime.now()
future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
future.add_done_callback(partial(self._update_job_status_from_future, job))

Expand All @@ -70,4 +68,3 @@ def _update_job_status_from_future(self, job: Job, ft):
self._nb_available_workers += 1
self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the callback method.")
self._update_job_status(job, ft.result())
job.execution_ended_at = datetime.datetime.now()
10 changes: 4 additions & 6 deletions taipy/core/job/_job_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ def _entity_to_model(cls, job: Job) -> _JobModel:
job.id,
job._task.id,
job._status,
{status: timestamp.isoformat() for status, timestamp in job._status_change_records.items()},
job._force,
job.submit_id,
job.submit_entity_id,
job._creation_date.isoformat(),
job._execution_started_at.isoformat() if job._execution_started_at else None,
job._execution_ended_at.isoformat() if job._execution_ended_at else None,
cls.__serialize_subscribers(job._subscribers),
job._stacktrace,
version=job._version,
Expand All @@ -52,12 +51,11 @@ def _model_to_entity(cls, model: _JobModel) -> Job:
)

job._status = model.status # type: ignore
job._status_change_records = {
status: datetime.fromisoformat(timestamp) for status, timestamp in model.status_change_records.items()
}
job._force = model.force # type: ignore
job._creation_date = datetime.fromisoformat(model.creation_date) # type: ignore
job._execution_started_at = (
datetime.fromisoformat(model.execution_started_at) if model.execution_started_at else None
)
job._execution_ended_at = datetime.fromisoformat(model.execution_ended_at) if model.execution_ended_at else None
for it in model.subscribers:
try:
fct_module, fct_name = it.get("fct_module"), it.get("fct_name")
Expand Down
11 changes: 4 additions & 7 deletions taipy/core/job/_job_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# specific language governing permissions and limitations under the License.

from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List

from .._repository._base_taipy_model import _BaseModel
from .job_id import JobId
Expand All @@ -22,12 +22,11 @@ class _JobModel(_BaseModel):
id: JobId
task_id: str
status: Status
status_change_records: Dict[str, str]
force: bool
submit_id: str
submit_entity_id: str
creation_date: str
execution_started_at: Optional[str]
execution_ended_at: Optional[str]
subscribers: List[Dict]
stacktrace: List[str]
version: str
Expand All @@ -38,12 +37,11 @@ def from_dict(data: Dict[str, Any]):
id=data["id"],
task_id=data["task_id"],
status=Status._from_repr(data["status"]),
status_change_records=_BaseModel._deserialize_attribute(data["status_change_records"]),
force=data["force"],
submit_id=data["submit_id"],
submit_entity_id=data["submit_entity_id"],
creation_date=data["creation_date"],
execution_started_at=data["execution_started_at"],
execution_ended_at=data["execution_ended_at"],
subscribers=_BaseModel._deserialize_attribute(data["subscribers"]),
stacktrace=_BaseModel._deserialize_attribute(data["stacktrace"]),
version=data["version"],
Expand All @@ -54,12 +52,11 @@ def to_list(self):
self.id,
self.task_id,
repr(self.status),
_BaseModel._serialize_attribute(self.status_change_records),
self.force,
self.submit_id,
self.submit_entity_id,
self.creation_date,
self.execution_started_at,
self.execution_ended_at,
_BaseModel._serialize_attribute(self.subscribers),
_BaseModel._serialize_attribute(self.stacktrace),
self.version,
Expand Down
121 changes: 99 additions & 22 deletions taipy/core/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__all__ = ["Job"]

from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, List, Optional
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

from taipy.logger._taipy_logger import _TaipyLogger

Expand Down Expand Up @@ -49,8 +49,8 @@ class Job(_Entity, _Labeled):

Every time a task is submitted for execution, a new *Job* is created. A job represents a
single execution of a task. It holds all the information related to the task execution,
including the **creation date**, the execution `Status^`, and the **stacktrace** of any
exception that may be raised by the user function.
including the **creation date**, the execution `Status^`, the timestamp of status changes,
and the **stacktrace** of any exception that may be raised by the user function.

In addition, a job notifies scenario or sequence subscribers on its status change.

Expand Down Expand Up @@ -78,8 +78,7 @@ def __init__(self, id: JobId, task: "Task", submit_id: str, submit_entity_id: st
self._creation_date = datetime.now()
self._submit_id: str = submit_id
self._submit_entity_id: str = submit_entity_id
self._execution_started_at: Optional[datetime] = None
self._execution_ended_at: Optional[datetime] = None
self._status_change_records: Dict[str, datetime] = {"SUBMITTED": self._creation_date}
self._subscribers: List[Callable] = []
self._stacktrace: List[str] = []
self.__logger = _TaipyLogger._get_logger()
Expand Down Expand Up @@ -134,6 +133,7 @@ def status(self):
@status.setter # type: ignore
@_self_setter(_MANAGER_NAME)
def status(self, val):
self._status_change_records[val.name] = datetime.now()
self._status = val

@property # type: ignore
Expand All @@ -148,36 +148,113 @@ def creation_date(self, val):

@property
@_self_reload(_MANAGER_NAME)
def execution_started_at(self) -> Optional[datetime]:
return self._execution_started_at
def submitted_at(self) -> datetime:
"""Get the date time when the job was submitted.

@execution_started_at.setter
@_self_setter(_MANAGER_NAME)
def execution_started_at(self, val):
self._execution_started_at = val
Returns:
datetime: The date time when the job was submitted.
"""
return self._status_change_records["SUBMITTED"]

@property
@_self_reload(_MANAGER_NAME)
def execution_ended_at(self) -> Optional[datetime]:
return self._execution_ended_at
def run_at(self) -> Optional[datetime]:
"""Get the date time when the job was run.

@execution_ended_at.setter
@_self_setter(_MANAGER_NAME)
def execution_ended_at(self, val):
self._execution_ended_at = val
Returns:
Optional[datetime]: The date time when the job was run.
If the job is not run, None is returned.
"""
return self._status_change_records.get(Status.RUNNING.name, None)

@property
@_self_reload(_MANAGER_NAME)
def finished_at(self) -> Optional[datetime]:
"""Get the date time when the job was finished.

Returns:
Optional[datetime]: The date time when the job was finished.
If the job is not finished, None is returned.
"""
if self.is_finished():
if self.is_completed():
return self._status_change_records[Status.COMPLETED.name]
elif self.is_failed():
return self._status_change_records[Status.FAILED.name]
elif self.is_canceled():
return self._status_change_records[Status.CANCELED.name]
elif self.is_skipped():
return self._status_change_records[Status.SKIPPED.name]
elif self.is_abandoned():
return self._status_change_records[Status.ABANDONED.name]

return None

@property
@_self_reload(_MANAGER_NAME)
def execution_duration(self) -> Optional[float]:
"""Get the duration of the job execution in seconds.
The execution time is the duration from the job running to the job completion.

Returns:
Optional[float]: The duration of the job execution in seconds. If the job is not
completed, None is returned.
Optional[float]: The duration of the job execution in seconds.
- If the job was not run, None is returned.
- If the job is not finished, the execution time is the duration
from the running time to the current time.
"""
if self._execution_started_at and self._execution_ended_at:
return (self._execution_ended_at - self._execution_started_at).total_seconds()
return None
if Status.RUNNING.name not in self._status_change_records:
return None

if self.is_finished():
return (self.finished_at - self._status_change_records[Status.RUNNING.name]).total_seconds()

return (datetime.now() - self._status_change_records[Status.RUNNING.name]).total_seconds()

@property
@_self_reload(_MANAGER_NAME)
def pending_duration(self) -> Optional[float]:
trgiangdo marked this conversation as resolved.
Show resolved Hide resolved
"""Get the duration of the job in the pending state in seconds.

Returns:
Optional[float]: The duration of the job in the pending state in seconds.
- If the job is not running, None is returned.
- If the job is not pending, the pending time is the duration
from the submission to the current time.
"""
if Status.PENDING.name not in self._status_change_records:
return None

if self.is_finished() or self.is_running():
toan-quach marked this conversation as resolved.
Show resolved Hide resolved
return (
self._status_change_records[Status.RUNNING.name] - self._status_change_records[Status.PENDING.name]
).total_seconds()

return (datetime.now() - self._status_change_records[Status.PENDING.name]).total_seconds()

@property
@_self_reload(_MANAGER_NAME)
def blocked_duration(self) -> Optional[float]:
"""Get the duration of the job in the blocked state in seconds.

Returns:
Optional[float]: The duration of the job in the blocked state in seconds.
- If the job is not running, None is returned.
- If the job is not blocked, the blocked time is the duration
from the submission to the current time.
"""
if Status.BLOCKED.name not in self._status_change_records:
return None

if Status.PENDING.name in self._status_change_records:
return (
self._status_change_records[Status.PENDING.name] - self._status_change_records[Status.BLOCKED.name]
).total_seconds()
if self.is_finished():
return (self.finished_at - self._status_change_records[Status.BLOCKED.name]).total_seconds()

# If pending time is not recorded, and the job is not finished, the only possible status left is blocked
# which means the current status is blocked.
return (datetime.now() - self._status_change_records[Status.BLOCKED.name]).total_seconds()

@property # type: ignore
@_self_reload(_MANAGER_NAME)
Expand Down
27 changes: 0 additions & 27 deletions taipy/core/submission/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,33 +139,6 @@ def properties(self):
def creation_date(self):
return self._creation_date

@property
@_self_reload(_MANAGER_NAME)
def execution_started_at(self) -> Optional[datetime]:
if all(job.execution_started_at is not None for job in self.jobs):
return min(job.execution_started_at for job in self.jobs)
return None

@property
@_self_reload(_MANAGER_NAME)
def execution_ended_at(self) -> Optional[datetime]:
if all(job.execution_ended_at is not None for job in self.jobs):
return max(job.execution_ended_at for job in self.jobs)
return None

@property
@_self_reload(_MANAGER_NAME)
def execution_duration(self) -> Optional[float]:
"""Get the duration of the submission in seconds.

Returns:
Optional[float]: The duration of the submission in seconds. If the job is not
completed, None is returned.
"""
if self.execution_started_at and self.execution_ended_at:
return (self.execution_ended_at - self.execution_started_at).total_seconds()
return None

def get_label(self) -> str:
"""Returns the submission simple label prefixed by its owner label.

Expand Down
24 changes: 9 additions & 15 deletions tests/core/_orchestrator/test_orchestrator__submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,17 +535,14 @@ def test_submit_duration_development_mode():
jobs = submission.jobs
orchestrator.stop()

assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
assert all(isinstance(job.submitted_at, datetime) for job in jobs)
assert all(isinstance(job.run_at, datetime) for job in jobs)
assert all(isinstance(job.finished_at, datetime) for job in jobs)
jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
assert jobs_1s.execution_duration >= 1
assert jobs_2s.execution_duration >= 2

assert submission.execution_duration >= 3
assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)


@pytest.mark.standalone
def test_submit_duration_standalone_mode():
jrobinAV marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -562,19 +559,16 @@ def test_submit_duration_standalone_mode():
scenario = Scenario("scenario", {task_1, task_2}, {})
_ScenarioManager._set(scenario)
submission = taipy.submit(scenario)
jobs = submission.jobs

assert_true_after_time(jobs[1].is_completed)

assert_true_after_time(lambda: all(job is not None and job.is_completed() for job in submission.jobs))
orchestrator.stop()

assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
jobs = submission.jobs

assert all(isinstance(job.submitted_at, datetime) for job in jobs)
assert all(isinstance(job.run_at, datetime) for job in jobs)
assert all(isinstance(job.finished_at, datetime) for job in jobs)
jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
assert jobs_1s.execution_duration >= 1
assert jobs_2s.execution_duration >= 2

assert submission.execution_duration >= 2 # Both tasks are executed in parallel so the duration may smaller than 3
assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)
Loading
Loading