@igalarzab
and @maraujop
"Resiliency is the ability to provide and maintain an acceptable level of service in the face of faults and challenges to normal operation."
RQ (Redis)
At least for us :)
jobs.py
models.py
must existfrom sqjobs.job import Job class AdderJob(Job): name = 'adder_job' queue = 'my_queue' def run(self, *args, **kwargs): return sum(args)
from sqjobs import create_sqs_broker
from myapp.jobs import AdderJob

# NOTE(review): `settings` is not imported in this snippet; presumably it is
# the Django settings object -- confirm against the surrounding project.
kwargs = {
    'access_key': settings.SQJOBS_SQS_ACCESS_KEY,
    # The secret key must come from its own setting; the original snippet
    # reused the access-key setting for both entries.
    # NOTE(review): setting name assumed to be SQJOBS_SQS_SECRET_KEY -- confirm.
    'secret_key': settings.SQJOBS_SQS_SECRET_KEY,
}
broker = create_sqs_broker(**kwargs)
# Hand AdderJob to the broker with 1, 2, 3, 4 as its positional arguments.
broker.add_job(AdderJob, *[1, 2, 3, 4])
// Describe the job as a name plus its positional and keyword arguments.
$payload = array(
    'name' => $task_name,
    'args' => $args,
    'kwargs' => $kwargs
);
$json_payload = json_encode($payload);

$this->_sqs = new AmazonSQS(
    $amazon_config['aws_key'],
    $amazon_config['aws_secret_key']
);

// The message body is the base64-encoded JSON payload, sent to the URL
// registered for the requested queue.
$queue_url = $this->_queue_urls[$queue_name];
$message_body = base64_encode($json_payload);
$result = $this->_sqs->send_message($queue_url, $message_body);
$ ./manage.py sqjobs worker $queue_name
>>> from sqjobs import create_eager_broker
>>> broker = create_eager_broker()
>>> from jobs import AdderJob
>>> job_added = broker.add_job(AdderJob, *[1, 2, 3])
>>> job_added
('fdb005d3-276f-4f75-8e8e-c8fcde67043c', AdderJob())
>>> job_added[1].result
6
from sqjobs.contrib.django.djsqjobs.result_job import ResultJob


class DummyResultJob(ResultJob):
    """Minimal ResultJob subclass whose ``run`` does nothing."""

    name = 'dummy_result_job'
    queue = 'dummy_queue'

    def run(self, *args, **kwargs):
        """Accept any arguments and return ``None`` without doing work."""
        return None
>>> from sqjobs.contrib.django.djsqjobs.models import JobStatus
>>> my_job = JobStatus.objects.get(job_id='1234')
>>> if my_job.status == JobStatus.SUCCESS:
...     print my_job.result
called Beater
from djsqjobs import PeriodicJob


class DummyPeriodicJob(PeriodicJob):
    """Minimal PeriodicJob subclass whose ``run`` does nothing."""

    name = "dummy_periodic_job"
    queue = "my_queue"
    # NOTE(review): looks like a five-field cron expression evaluated in
    # `timezone` -- confirm against the PeriodicJob implementation.
    schedule = "1 0 * * *"
    timezone = "Europe/Madrid"

    def run(self, *args, **kwargs):
        """Accept any arguments and return ``None`` without doing work."""
        return None
$ ./manage.py sqjobs beater $queue_name
set_up
, run
, tear_down
run
is mandatory
set_up
would be called before run if exists
tear_down
right after run
if exists.from abc import abstractmethod, ABCMeta from six import add_metaclass import logging logger = logging.getLogger('timed_job') @add_metaclass(ABCMeta) class TimedJob(Job): def set_up(self, *args, **kwargs): super(TimedJob, self).set_up(*args, **kwargs) self.start_time = datetime.now() def tear_down(self, *args, **kwargs): end_time = datetime.now() delta = end_time - self.start_time logger.info('%s finished in %d seconds' % (self.name, (delta * 1000).seconds)) super(TimedJob, self).tear_down(*args, **kwargs) @abstractmethod def run(self, *args, **kwargs): raise NotImplementedError
on_success
and on_failure
methods will be called
depending on the output of our job execution.
on_success
and on_failure
from abc import abstractmethod, ABCMeta
from six import add_metaclass
import logging

from sqjobs.job import Job

logger = logging.getLogger('logger_job')


@add_metaclass(ABCMeta)
class LoggerJob(Job):
    """Abstract job that logs the outcome of each execution.

    Subclasses must implement ``run``.
    """

    def on_success(self, *args, **kwargs):
        """Log the successful run and then call the parent ``on_success``."""
        # Logger.log() requires a level as its first argument, so the
        # previous one-argument call raised TypeError instead of logging.
        logger.info('Successfully finished job %s', self.name)
        super(LoggerJob, self).on_success(*args, **kwargs)

    def on_failure(self, *args, **kwargs):
        """Log the failed run and then call the parent ``on_failure``."""
        logger.error('Failed job %s', self.name)
        super(LoggerJob, self).on_failure(*args, **kwargs)

    @abstractmethod
    def run(self, *args, **kwargs):
        """Do the job's work; must be overridden by subclasses."""
        raise NotImplementedError
@gnufede