feat: enable requests batching (#109)
Co-authored-by: Avram Tudor <[email protected]>
quitrk and Avram Tudor authored Oct 17, 2024
1 parent a2abf14 commit 8ef27ec
Showing 3 changed files with 19 additions and 10 deletions.
1 change: 1 addition & 0 deletions skynet/env.py
@@ -89,6 +89,7 @@ def tobool(val: str | None):
 
 # summaries
 summary_minimum_payload_length = int(os.environ.get('SUMMARY_MINIMUM_PAYLOAD_LENGTH', 100))
+enable_batching = tobool(os.environ.get('ENABLE_BATCHING', 'true'))
 
 # monitoring
 enable_metrics = tobool(os.environ.get('ENABLE_METRICS', 'true'))
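The new flag defaults to on. A minimal sketch of how it resolves, assuming `tobool` is a standard truthy-string parser (the real implementation lives earlier in `skynet/env.py`):

```python
import os

def tobool(val: str | None) -> bool:
    # Assumption: mirrors skynet/env.py's parser; truthy strings map to True.
    return str(val).strip().lower() in ('1', 'true', 'yes', 'on')

# Batching is enabled unless ENABLE_BATCHING is explicitly set to a falsy value.
enable_batching = tobool(os.environ.get('ENABLE_BATCHING', 'true'))
print(enable_batching)  # True by default; ENABLE_BATCHING=false disables it
```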
1 change: 1 addition & 0 deletions skynet/modules/ttt/summaries/app.py
@@ -22,6 +22,7 @@
 app.include_router(v1_router)
 
 if echo_requests_base_url:
+    log.info(f'Echoing requests enabled for url: {echo_requests_base_url}')
 
     @app.middleware("http")
     async def echo_requests(request: Request, call_next):
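The middleware body itself is collapsed in this view, so the following is only a sketch of what an echoing middleware of this shape might look like, assuming `httpx` for the outbound copy. Only `echo_requests_base_url`, the decorator, and the `echo_requests` signature come from the diff; everything else is illustrative:

```python
import httpx
from fastapi import FastAPI, Request

app = FastAPI()
echo_requests_base_url = 'http://localhost:9000'  # stand-in value

@app.middleware("http")
async def echo_requests(request: Request, call_next):
    # Fire a best-effort copy of the request at the echo target,
    # then continue normal processing regardless of the copy's fate.
    try:
        async with httpx.AsyncClient() as client:
            await client.request(
                request.method,
                f'{echo_requests_base_url}{request.url.path}',
            )
    except httpx.HTTPError:
        pass  # echoing must never break the real request
    return await call_next(request)
```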
27 changes: 17 additions & 10 deletions skynet/modules/ttt/summaries/jobs.py
@@ -5,7 +5,7 @@
 
 from skynet.auth.openai import CredentialsType, get_credentials
 
-from skynet.env import job_timeout, modules, redis_exp_seconds, summary_minimum_payload_length
+from skynet.env import enable_batching, job_timeout, modules, redis_exp_seconds, summary_minimum_payload_length
 from skynet.logs import get_logger
 from skynet.modules.monitoring import (
     OPENAI_API_RESTART_COUNTER,
@@ -24,7 +24,7 @@
 
 log = get_logger(__name__)
 
-TIME_BETWEEN_JOBS_CHECK = 2
+TIME_BETWEEN_JOBS_CHECK = 1
 TIME_BETWEEN_JOBS_CHECK_ON_ERROR = 10
 
 PENDING_JOBS_KEY = "jobs:pending"
@@ -45,7 +45,13 @@ def restart():
 
 
 def can_run_next_job() -> bool:
-    return 'summaries:executor' in modules and (current_task is None or current_task.done())
+    if 'summaries:executor' not in modules:
+        return False
+
+    if enable_batching:
+        return True
+
+    return current_task is None or current_task.done()
 
 
 def get_job_processor(customer_id: str) -> Processors:
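This hunk is the core of the change: without batching, a new job starts only once `current_task` has finished; with batching, jobs are dispatched as soon as they are popped. A small self-contained demo of the gate (`fake_job` and the module-level state are stand-ins for illustration):

```python
import asyncio

# Stand-ins for skynet's module-level state, for illustration only.
modules = {'summaries:executor'}
enable_batching = True
current_task: asyncio.Task | None = None

def can_run_next_job() -> bool:
    if 'summaries:executor' not in modules:
        return False
    if enable_batching:
        return True  # dispatch regardless of in-flight work
    return current_task is None or current_task.done()

async def fake_job(i: int) -> None:
    await asyncio.sleep(1)  # stand-in for a summarisation call
    print(f'job {i} done')

async def main() -> None:
    global current_task
    started = []
    for i in range(3):
        if can_run_next_job():
            current_task = asyncio.create_task(fake_job(i))
            started.append(current_task)
    # Batching on: all three jobs overlap (~1s total).
    # Batching off: only job 0 starts in this pass.
    await asyncio.gather(*started)

asyncio.run(main())
```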
@@ -167,8 +173,9 @@ async def update_done_job(job: Job, result: str, processor: Processors, has_fail
     SUMMARY_FULL_DURATION_METRIC.observe(updated_job.computed_full_duration)
     SUMMARY_INPUT_LENGTH_METRIC.observe(len(updated_job.payload.text))
 
-    log.info(f"Job {updated_job.id} duration: {updated_job.computed_duration} seconds")
-    log.info(f"Job {updated_job.id} full duration: {updated_job.computed_full_duration} seconds")
+    log.info(
+        f"Job {updated_job.id} duration: {updated_job.computed_duration}s full duration: {updated_job.computed_full_duration}s"
+    )
 
 
 async def _run_job(job: Job) -> None:
@@ -236,14 +243,9 @@ def create_run_job_task(job: Job) -> asyncio.Task:
 
 
 async def maybe_run_next_job() -> None:
-    if not await is_openai_api_ready():
-        return
-
     if not can_run_next_job():
         return
 
-    await restore_stale_jobs()
-
     next_job_id = await db.lpop(PENDING_JOBS_KEY)
 
     await update_summary_queue_metric()
@@ -256,6 +258,11 @@
 
 
 async def monitor_candidate_jobs() -> None:
+    await restore_stale_jobs()
+
+    while not await is_openai_api_ready():
+        await asyncio.sleep(TIME_BETWEEN_JOBS_CHECK)
+
     while True:
         try:
             await maybe_run_next_job()
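Net effect: the stale-job sweep and the OpenAI readiness check move out of the per-poll path in `maybe_run_next_job` and run once at monitor start-up. A sketch of the resulting loop shape, with the dependencies stubbed out; the `except` tail is collapsed in this view, so the error backoff below is an assumption based on the `TIME_BETWEEN_JOBS_CHECK_ON_ERROR` constant:

```python
import asyncio

TIME_BETWEEN_JOBS_CHECK = 1
TIME_BETWEEN_JOBS_CHECK_ON_ERROR = 10

async def is_openai_api_ready() -> bool:
    return True  # stub: the real check pings the inference endpoint

async def restore_stale_jobs() -> None:
    pass  # stub: re-queues jobs left behind by a dead worker

async def maybe_run_next_job() -> None:
    print('polled queue')  # stub: pops PENDING_JOBS_KEY and dispatches

async def monitor_candidate_jobs() -> None:
    # Sweep stale jobs once, block until the API is up,
    # then poll on a fixed cadence (now every 1s instead of 2s).
    await restore_stale_jobs()

    while not await is_openai_api_ready():
        await asyncio.sleep(TIME_BETWEEN_JOBS_CHECK)

    while True:
        try:
            await maybe_run_next_job()
            await asyncio.sleep(TIME_BETWEEN_JOBS_CHECK)
        except Exception:
            # Assumption: the collapsed tail backs off harder on errors.
            await asyncio.sleep(TIME_BETWEEN_JOBS_CHECK_ON_ERROR)
```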
