Skip to content

Commit

Permalink
add worker id to logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Peddle committed Dec 7, 2023
1 parent 90404a2 commit 325f772
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
6 changes: 6 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ def handler(context: dict, request: Request) -> Response:
status=200
)

@app.background("/background")
def background(context: dict, request: Request):
time.sleep(5)
print('hi')


@app.handler("/stream")
def stream(context: dict, request: Request):
def stream():
Expand Down
2 changes: 1 addition & 1 deletion potassium/potassium.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def _init_server(self):
Pool = ThreadPool
else:
Pool = ProcessPool
self._worker_pool = Pool(self._num_workers, init_worker, (index_queue, self._event_queue, self._response_queue, self._init_func))
self._worker_pool = Pool(self._num_workers, init_worker, (index_queue, self._event_queue, self._response_queue, self._init_func, self._num_workers))

# serve runs the http server
def serve(self, host="0.0.0.0", port=8000):
Expand Down
16 changes: 13 additions & 3 deletions potassium/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ def set_prefix(self, prefix):

@dataclass
class Worker():
worker_num: int
total_workers: int
context: Dict[Any, Any]
event_queue: Queue
response_queue: Queue
stderr_redirect: FDRedirect
stdout_redirect: FDRedirect

def init_worker(index_queue, event_queue, response_queue, init_func):
def init_worker(index_queue, event_queue, response_queue, init_func, total_workers):
global worker
worker_num = index_queue.get()

Expand All @@ -76,6 +78,8 @@ def init_worker(index_queue, event_queue, response_queue, init_func):
event_queue.put((StatusEvent.WORKER_STARTED,))

worker = Worker(
worker_num,
total_workers,
context,
event_queue,
response_queue,
Expand All @@ -85,9 +89,15 @@ def init_worker(index_queue, event_queue, response_queue, init_func):

def run_worker(func, request, internal_id, use_response=False):
assert worker is not None, "worker is not initialized"

if worker.total_workers > 1:
prefix = f"[worker {worker.worker_num}, requestID {request.id}] "
else:
prefix = f"[requestID {request.id}] "

worker.stderr_redirect.set_prefix(f"[requestID {request.id}] ")
worker.stdout_redirect.set_prefix(f"[requestID {request.id}] ")

worker.stderr_redirect.set_prefix(prefix)
worker.stdout_redirect.set_prefix(prefix)

resp = None
worker.event_queue.put((StatusEvent.INFERENCE_START, internal_id))
Expand Down

0 comments on commit 325f772

Please sign in to comment.