
Commit

Transition basket tasks to use rq
robhudson committed Jun 26, 2023
1 parent 4a30aa0 commit bf373e5
Showing 45 changed files with 1,014 additions and 762 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
@@ -2,7 +2,9 @@
source = basket
omit =
manage.py
basket/settings.py
basket/wsgi.py
basket/base/tests/*
basket/news/migrations/*
basket/news/tests/*

1 change: 0 additions & 1 deletion .demo_env
@@ -1,4 +1,3 @@
DEBUG=False
DATABASE_URL=sqlite:///basket.db
CELERY_TASK_ALWAYS_EAGER=True
SECRET_KEY=sssssssssshhhhhhhhhhh
1 change: 0 additions & 1 deletion .dockerignore
@@ -1,4 +1,3 @@
.git
.env
celerybeat-schedule
*.db
1 change: 0 additions & 1 deletion .gitignore
@@ -16,7 +16,6 @@
/static
Dockerfile.jenkins
build.py
celerybeat-schedule
docs/_build
docs/_gh-pages
pip-log.txt
2 changes: 1 addition & 1 deletion Makefile
@@ -21,7 +21,7 @@ pull: .env
@touch .make.docker.pull

run: .make.docker.pull
${DC} up web
${DC} up web worker

run-shell:
${DC} run --rm web bash
4 changes: 0 additions & 4 deletions Procfile

This file was deleted.

55 changes: 55 additions & 0 deletions basket/base/decorators.py
@@ -0,0 +1,55 @@
import functools

from django.conf import settings

from django_statsd.clients import statsd
from rq.decorators import job as rq_job

from basket.base.rq import enqueue_kwargs, get_queue


def rq_task(func):
"""
Decorator to standardize RQ tasks.
Uses RQ's job decorator, but:
- uses our default queue and connection
- adds retry logic with exponential backoff
- adds success/failure/retry callbacks
- adds statsd metrics for job success/failure/retry
- adds Sentry error reporting for failed jobs
"""
task_name = f"{func.__module__}.{func.__qualname__}"

queue = get_queue()
connection = queue.connection

kwargs = enqueue_kwargs(func)

@rq_job(
queue,
connection=connection,
**kwargs,
)
@functools.wraps(func)
def wrapped(*args, **kwargs):
# If in maintenance mode, queue the task for later.
if settings.MAINTENANCE_MODE:
if settings.READ_ONLY_MODE:
statsd.incr(f"{task_name}.not_queued")
else:
from basket.news.models import QueuedTask

QueuedTask.objects.create(
name=task_name,
args=args,
kwargs=kwargs,
)
statsd.incr(f"{task_name}.queued")
return

# NOTE: Exceptions are handled with the RQ_EXCEPTION_HANDLERS
return func(*args, **kwargs)

return wrapped
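
Tasks decorated with `rq_task` keep the familiar `.delay()` call style, since RQ's `job` decorator attaches it to the wrapped function, so call sites read much like the Celery tasks they replace. A minimal sketch, assuming a hypothetical `record_signup` task (only the decorator and the `.delay()` call mirror this commit):

# Hypothetical task module; `record_signup` and its arguments are
# illustrative, not part of this commit.
from basket.base.decorators import rq_task


@rq_task
def record_signup(email, newsletter_id):
    # Body runs on the worker; in maintenance mode the call is stored
    # as a QueuedTask instead (see the decorator above).
    ...


# Callers enqueue the task the same way they did with Celery:
record_signup.delay("user@example.com", 42)
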
15 changes: 15 additions & 0 deletions basket/base/exceptions.py
@@ -0,0 +1,15 @@
class BasketError(Exception):
"""
Tasks can raise this when an error happens that we should not retry.
E.g. if the error indicates we're passing bad parameters, as opposed to an
error where we'd typically raise `NewsletterException`.
"""

pass


class RetryTask(Exception):
"""An exception to raise within a task if you just want to retry."""

pass
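
Because `RetryTask` appears in `EXCEPTIONS_ALLOW_RETRY` (see basket/base/rq.py below) and is excluded from Sentry reporting, a task can raise it to request another attempt without generating noise, while `BasketError` signals a failure that should not be retried. A minimal sketch with a hypothetical task and upstream call:

from basket.base.decorators import rq_task
from basket.base.exceptions import BasketError, RetryTask


@rq_task
def sync_contact(email):
    resp = upstream_api_call(email)  # hypothetical helper
    if resp.status_code == 429:
        # Transient rate limit: ask RQ to retry, without a Sentry report.
        raise RetryTask("rate limited")
    if resp.status_code == 400:
        # Bad input: BasketError is not in EXCEPTIONS_ALLOW_RETRY, so don't retry.
        raise BasketError(f"invalid request for {email}")
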
37 changes: 37 additions & 0 deletions basket/base/management/commands/rqworker.py
@@ -0,0 +1,37 @@
import sys

from django.core.management.base import BaseCommand

from basket.base.rq import get_worker


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"-b",
"--burst",
action="store_true",
dest="burst",
default=False,
help="Run worker in burst mode and quit when queues are empty. Default: False",
)
parser.add_argument(
"-s",
"--with-scheduler",
action="store_true",
dest="with_scheduler",
default=False,
help="Run worker with scheduler enabled. Default: False",
)

def handle(self, *args, **options):
kwargs = {
"burst": options.get("burst", False),
"with_scheduler": options.get("with_scheduler", False),
}
try:
worker = get_worker()
worker.work(**kwargs)
except ConnectionError as e:
self.stderr.write(str(e))
sys.exit(1)
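
For completeness, the new worker command can also be driven programmatically (for example in tests) through Django's call_command; a small sketch using the flags defined above:

# Drain the default queue once and exit; kwargs map to the arguments above.
from django.core.management import call_command

call_command("rqworker", burst=True, with_scheduler=False)
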
10 changes: 10 additions & 0 deletions basket/base/management/commands/snitch.py
@@ -0,0 +1,10 @@
from time import time

from django.core.management.base import BaseCommand

from basket.base.tasks import snitch


class Command(BaseCommand):
def handle(self, *args, **options):
snitch.delay(time())
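
The `snitch` task is imported from basket/base/tasks.py, which is not shown in this excerpt. Passing `time()` through `.delay()` lets the task measure how long the job waited before a worker ran it; a hypothetical sketch of that pattern (not the actual task body):

from time import time

from django_statsd.clients import statsd

from basket.base.decorators import rq_task


@rq_task
def snitch(start_time=None):
    if start_time is None:
        return
    # Queue latency plus run pickup, in milliseconds; the metric name is illustrative.
    statsd.timing("news.tasks.snitch.timing", int((time() - start_time) * 1000))
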
233 changes: 233 additions & 0 deletions basket/base/rq.py
@@ -0,0 +1,233 @@
import random
import re
import traceback
from time import time

from django.conf import settings

import redis
import requests
import sentry_sdk
from django_statsd.clients import statsd
from rq import Callback, Retry, SimpleWorker
from rq.queue import Queue
from rq.serializers import JSONSerializer
from silverpop.api import SilverpopResponseException

from basket.base.exceptions import RetryTask
from basket.news.backends.common import NewsletterException

# don't propagate and don't retry if these are the error messages
IGNORE_ERROR_MSGS = [
"INVALID_EMAIL_ADDRESS",
"InvalidEmailAddress",
"An invalid phone number was provided",
"No valid subscribers were provided",
"There are no valid subscribers",
"email address is suppressed",
"invalid email address",
]
# don't propagate and don't retry if these regex match the error messages
IGNORE_ERROR_MSGS_RE = [re.compile(r"campaignId \d+ not found")]
# Exceptions we allow to retry, all others will abort retries.
EXCEPTIONS_ALLOW_RETRY = [
IOError,
NewsletterException,
requests.RequestException,
RetryTask,
SilverpopResponseException,
]


# Our cached Redis connection.
_REDIS_CONN = None


def get_redis_connection(url=None, force=False):
"""
Get a Redis connection.
Expects a URL including the db, or defaults to `settings.RQ_URL`.
Call example:
get_redis_connection("redis://localhost:6379/0")
"""
global _REDIS_CONN

if force or _REDIS_CONN is None:
if url is None:
if settings.RQ_URL is None:
# Note: RQ_URL is derived from REDIS_URL.
raise ValueError("No `settings.REDIS_URL` specified")

url = settings.RQ_URL
_REDIS_CONN = redis.Redis.from_url(url)

return _REDIS_CONN


def get_queue(queue="default"):
"""
Get an RQ queue with our chosen parameters.
"""
return Queue(
queue,
connection=get_redis_connection(),
is_async=settings.RQ_IS_ASYNC,
serializer=JSONSerializer,
)


def get_worker(queues=None):
"""
Get an RQ worker with our chosen parameters.
"""
if queues is None:
queues = ["default"]

return SimpleWorker(
queues,
connection=get_redis_connection(),
disable_default_exception_handler=True,
exception_handlers=[store_task_exception_handler],
serializer=JSONSerializer,
)


def enqueue_kwargs(func):
if isinstance(func, str):
task_name = func
else:
task_name = f"{func.__module__}.{func.__qualname__}"

# Start time is used to calculate the total time taken by the task, which includes queue time plus execution time of the task itself.
meta = {
"task_name": task_name,
"start_time": time(),
}

if settings.RQ_MAX_RETRIES == 0:
retry = None
else:
retry = Retry(settings.RQ_MAX_RETRIES, rq_exponential_backoff())

return {
"meta": meta,
"retry": retry,
"result_ttl": settings.RQ_RESULT_TTL,
"on_success": Callback(rq_on_success),
"on_failure": Callback(rq_on_failure),
}


def rq_exponential_backoff():
"""
Return an array of retry delays for RQ using an exponential back-off, using
jitter to even out the spikes, waiting at least 1 minute between retries.
"""
if settings.DEBUG:
# While debugging locally, enable faster retries.
return [5 for n in range(settings.RQ_MAX_RETRIES)]
else:
return [max(60, random.randrange(min(settings.RQ_MAX_RETRY_DELAY, 120 * (2**n)))) for n in range(settings.RQ_MAX_RETRIES)]


def log_timing(job):
if start_time := job.meta.get("start_time"):
total_time = int(time() - start_time) * 1000
statsd.timing(f"{job.meta['task_name']}.duration", total_time)


def rq_on_success(job, connection, result, *args, **kwargs):
# Don't fire statsd metrics in maintenance mode.
if not settings.MAINTENANCE_MODE:
log_timing(job)
task_name = job.meta["task_name"]
statsd.incr(f"{task_name}.success")
if not task_name.endswith("snitch"):
statsd.incr("news.tasks.success_total")


def rq_on_failure(job, connection, *exc_info, **kwargs):
# Don't fire statsd metrics in maintenance mode.
if not settings.MAINTENANCE_MODE:
log_timing(job)
task_name = job.meta["task_name"]
statsd.incr(f"{task_name}.failure")
if not task_name.endswith("snitch"):
statsd.incr("news.tasks.failure_total")


def ignore_error(exc, to_ignore=None, to_ignore_re=None):
to_ignore = to_ignore or IGNORE_ERROR_MSGS
to_ignore_re = to_ignore_re or IGNORE_ERROR_MSGS_RE
msg = str(exc)
for ignore_msg in to_ignore:
if ignore_msg in msg:
return True

for ignore_re in to_ignore_re:
if ignore_re.search(msg):
return True

return False


def store_task_exception_handler(job, *exc_info):
"""
Handler to store task failures in the database.
"""
task_name = job.meta["task_name"]

if task_name.endswith("snitch"):
return

# A job will retry if it's failed but not yet reached the max retries.
# A job that is going to be retried has status `is_scheduled`; otherwise its status is `is_failed`.

if job.is_scheduled:
# Job failed but is scheduled for a retry.
statsd.incr(f"{task_name}.retry")
statsd.incr(f"{task_name}.retries_left.{job.retries_left + 1}")
statsd.incr("news.tasks.retry_total")

if exc_info[1] not in EXCEPTIONS_ALLOW_RETRY:
# Force retries to abort.
# Since there's no way to abort retries at the moment in RQ, we can set the job `retries_left` to zero.
# This will retry this time but no further retries will be performed.
job.retries_left = 0

return

if job.is_failed:
statsd.incr(f"{task_name}.retry_max")
statsd.incr("news.tasks.retry_max_total")

# Job failed but no retries left.
if settings.STORE_TASK_FAILURES:
# Here to avoid a circular import.
from basket.news.models import FailedTask

FailedTask.objects.create(
task_id=job.id,
name=job.meta["task_name"],
args=job.args,
kwargs=job.kwargs,
exc=exc_info[1].__repr__(),
einfo="".join(traceback.format_exception(*exc_info)),
)

if ignore_error(exc_info[1]):
with sentry_sdk.push_scope() as scope:
scope.set_tag("action", "ignored")
sentry_sdk.capture_exception()
return

# Don't log to sentry if we explicitly raise `RetryTask`.
if not (isinstance(exc_info[1], RetryTask)):
with sentry_sdk.push_scope() as scope:
scope.set_tag("action", "retried")
sentry_sdk.capture_exception()
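
Taken together, `get_queue()` and `enqueue_kwargs()` are what the `rq_task` decorator uses to push work onto Redis with consistent metadata, retries, and callbacks, and `rq_exponential_backoff()` shapes the retry schedule. A rough sketch of how the helpers compose, using an illustrative task path and settings values rather than the project's defaults:

from basket.base.rq import enqueue_kwargs, get_queue, rq_exponential_backoff

# Enqueue a task by dotted path with the standard meta/retry/callback kwargs.
# The task path here is illustrative.
task_path = "basket.news.tasks.update_user_meta"
get_queue().enqueue(task_path, args=("token123",), **enqueue_kwargs(task_path))

# With RQ_MAX_RETRIES=4 and RQ_MAX_RETRY_DELAY=600 (illustrative values) and
# DEBUG off, each delay is drawn from [60, min(600, 120 * 2**n)), so attempts
# land roughly at 60-120s, 60-240s, 60-480s, and 60-600s.
print(rq_exponential_backoff())
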