From 325f7724ff27371e2e8d52bac8c0781e31c386f7 Mon Sep 17 00:00:00 2001 From: Aaron Peddle Date: Thu, 7 Dec 2023 12:05:35 -0800 Subject: [PATCH] add worker id to logs --- example.py | 6 ++++++ potassium/potassium.py | 2 +- potassium/worker.py | 16 +++++++++++++--- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/example.py b/example.py index 96c458c..e37497a 100644 --- a/example.py +++ b/example.py @@ -30,6 +30,12 @@ def handler(context: dict, request: Request) -> Response: status=200 ) +@app.background("/background") +def background(context: dict, request: Request): + time.sleep(5) + print('hi') + + @app.handler("/stream") def stream(context: dict, request: Request): def stream(): diff --git a/potassium/potassium.py b/potassium/potassium.py index 3b3d05d..805040a 100644 --- a/potassium/potassium.py +++ b/potassium/potassium.py @@ -291,7 +291,7 @@ def _init_server(self): Pool = ThreadPool else: Pool = ProcessPool - self._worker_pool = Pool(self._num_workers, init_worker, (index_queue, self._event_queue, self._response_queue, self._init_func)) + self._worker_pool = Pool(self._num_workers, init_worker, (index_queue, self._event_queue, self._response_queue, self._init_func, self._num_workers)) # serve runs the http server def serve(self, host="0.0.0.0", port=8000): diff --git a/potassium/worker.py b/potassium/worker.py index 94d6525..66ceaed 100644 --- a/potassium/worker.py +++ b/potassium/worker.py @@ -45,13 +45,15 @@ def set_prefix(self, prefix): @dataclass class Worker(): + worker_num: int + total_workers: int context: Dict[Any, Any] event_queue: Queue response_queue: Queue stderr_redirect: FDRedirect stdout_redirect: FDRedirect -def init_worker(index_queue, event_queue, response_queue, init_func): +def init_worker(index_queue, event_queue, response_queue, init_func, total_workers): global worker worker_num = index_queue.get() @@ -76,6 +78,8 @@ def init_worker(index_queue, event_queue, response_queue, init_func): event_queue.put((StatusEvent.WORKER_STARTED,)) worker = Worker( + worker_num, + total_workers, context, event_queue, response_queue, @@ -85,9 +89,15 @@ def init_worker(index_queue, event_queue, response_queue, init_func): def run_worker(func, request, internal_id, use_response=False): assert worker is not None, "worker is not initialized" + + if worker.total_workers > 1: + prefix = f"[worker {worker.worker_num}, requestID {request.id}] " + else: + prefix = f"[requestID {request.id}] " - worker.stderr_redirect.set_prefix(f"[requestID {request.id}] ") - worker.stdout_redirect.set_prefix(f"[requestID {request.id}] ") + + worker.stderr_redirect.set_prefix(prefix) + worker.stdout_redirect.set_prefix(prefix) resp = None worker.event_queue.put((StatusEvent.INFERENCE_START, internal_id))