delay_seconds = $delay;
    }

    public function getDelay() {
        return $this->delay_seconds;
    }
}

/**
 * Job handler interface which can be safely enqueued.
 */
interface DJJobHandlerInterface {
    /**
     * Method that will perform the job when retrieved from the jobs table.
     */
    public function perform();
}

/**
 * Base class for delayed job.
 *
 * Holds the shared (static) database configuration and connection, the
 * query helpers used by workers and jobs, and a minimal logger.
 */
class DJBase {

    // error severity levels
    const CRITICAL = 4;
    const ERROR = 3;
    const WARN = 2;
    const INFO = 1;
    const DEBUG = 0;

    /**
     * @var int Minimum severity a message needs in order to be printed.
     */
    private static $log_level = self::DEBUG;

    /**
     * @var null|PDO Lazily created (or injected) database connection.
     */
    private static $db = null;

    /**
     * @var string Name of the jobs table.
     */
    protected static $jobsTable = "";

    /**
     * @var string PDO connection string.
     */
    private static $dsn = "";

    /**
     * @var string Database user.
     */
    private static $user = "";

    /**
     * @var string Database password.
     */
    private static $password = "";

    /**
     * @var int How many times a query is attempted when the connection is lost.
     */
    private static $retries = 3; //default retries

    // use either `configure` or `setConnection`, depending on if
    // you already have a PDO object you can re-use

    /**
     * Configures DJJob with certain values for the database connection.
     *
     * Accepted call shapes:
     *  - configure(array $options [, string $jobsTable])
     *  - configure(string $dsn [, array $options [, string $jobsTable]])
     *
     * Any other number of arguments is ignored and leaves the configuration
     * untouched.
     *
     * @throws DJException Throws an exception with invalid parameters.
     */
    public static function configure() {
        $args = func_get_args();
        $numArgs = func_num_args();

        switch ($numArgs) {
            case 1:
                if (is_array($args[0])) {
                    self::configureWithOptions($args[0]);
                } else {
                    self::configureWithDsnAndOptions($args[0]);
                }
                break;

            case 2:
                if (is_array($args[0])) {
                    self::configureWithOptions($args[0], $args[1]);
                } else {
                    self::configureWithDsnAndOptions($args[0], $args[1]);
                }
                break;

            case 3:
                self::configureWithDsnAndOptions($args[0], $args[1], $args[2]);
                break;
        }
    }

    /**
     * Configures DJJob with certain values for the database connection.
     *
     * @param string $dsn       The PDO connection string.
     * @param array  $options   The options for the PDO connection. Requires
     *                          `mysql_user` and `mysql_pass`; `retries` is optional.
     * @param string $jobsTable Name of the jobs table.
     *
     * @throws DJException Throws an exception with invalid parameters.
     */
    protected static function configureWithDsnAndOptions($dsn, array $options = array(), $jobsTable = 'jobs') {
        if (!isset($options['mysql_user'])) {
            throw new DJException("Please provide the database user in configure options array.");
        }
        if (!isset($options['mysql_pass'])) {
            throw new DJException("Please provide the database password in configure options array.");
        }

        self::$dsn = $dsn;
        self::$jobsTable = $jobsTable;
        self::$user = $options['mysql_user'];
        self::$password = $options['mysql_pass'];

        // searches for retries
        if (isset($options['retries'])) {
            self::$retries = (int) $options['retries'];
        }
    }

    /**
     * Configures DJJob from an options array, building the DSN from it.
     *
     * `driver`, `user` and `password` are required. `retries` is consumed as
     * the retry count. Every remaining key/value pair is appended to the DSN
     * as `key=value;`.
     *
     * @param array  $options   The connection options.
     * @param string $jobsTable Name of the jobs table.
     *
     * @throws DJException Throws an exception with invalid parameters.
     */
    protected static function configureWithOptions(array $options, $jobsTable = 'jobs') {
        if (!isset($options['driver'])) {
            throw new DJException("Please provide the database driver used in configure options array.");
        }
        if (!isset($options['user'])) {
            throw new DJException("Please provide the database user in configure options array.");
        }
        if (!isset($options['password'])) {
            throw new DJException("Please provide the database password in configure options array.");
        }

        self::$user = $options['user'];
        self::$password = $options['password'];
        self::$jobsTable = $jobsTable;

        self::$dsn = $options['driver'] . ':';
        foreach ($options as $key => $value) {
            // skips options already used
            if ($key == 'driver' || $key == 'user' || $key == 'password') {
                continue;
            }

            // searches for retries
            if ($key == 'retries') {
                self::$retries = (int) $value;
                continue;
            }

            self::$dsn .= $key . '=' . $value . ';';
        }
    }

    /**
     * @param int $const The log level to set.
     */
    public static function setLogLevel($const) {
        self::$log_level = $const;
    }

    /**
     * @param PDO $db The database connection to use.
     */
    public static function setConnection(PDO $db) {
        self::$db = $db;
    }

    /**
     * Returns the connection DJBase knows about.
     *
     * Tries to connect if no connection is present.
     *
     * @return null|PDO The connection if a valid connection is present.
     * @throws Exception If the connection could not be established. The
     *                   original PDOException is attached as the previous
     *                   exception.
     */
    protected static function getConnection() {
        if (self::$db === null) {
            try {
                self::$db = new PDO(self::$dsn, self::$user, self::$password);
                self::$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            } catch (PDOException $e) {
                // Chain the PDOException so its driver code and trace are not lost.
                throw new Exception("DJJob couldn't connect to the database. PDO said [{$e->getMessage()}]", 0, $e);
            }
        }
        return self::$db;
    }

    /**
     * Tells whether a PDOException is the MySQL "server has gone away" error.
     *
     * @param PDOException $e The exception to inspect.
     *
     * @return bool True when the driver error code is 2006.
     */
    private static function isConnectionLost(PDOException $e) {
        // errorInfo may be null, so guard the offset before reading it.
        return isset($e->errorInfo[1]) && $e->errorInfo[1] == 2006;
    }

    /**
     * Runs a query with a resultset against the database.
     *
     * When the server has gone away the connection is dropped and the query
     * is attempted again, up to the configured number of retries.
     *
     * @param string $sql    The query to execute.
     * @param array  $params The params necessary for a prepared statement.
     *
     * @return array Returns the complete resultset.
     * @throws DJException  Throws if the retries were exhausted.
     * @throws PDOException Any other database error is rethrown unchanged.
     */
    public static function runQuery($sql, $params = array()) {
        for ($attempts = 0; $attempts < self::$retries; $attempts++) {
            try {
                $stmt = self::getConnection()->prepare($sql);
                $stmt->execute($params);

                $ret = array();
                if ($stmt->rowCount()) {
                    // calling fetchAll on a result set with no rows throws a
                    // "general error" exception
                    $rows = $stmt->fetchAll(PDO::FETCH_ASSOC);
                    if (is_array($rows)) {
                        $ret = array_values($rows);
                    }
                }

                $stmt->closeCursor();
                return $ret;
            } catch (PDOException $e) {
                // Only "MySQL server has gone away" is retried, on a fresh connection.
                if (!self::isConnectionLost($e)) {
                    throw $e;
                }
                self::$db = null;
            }
        }

        throw new DJException("DJJob exhausted retries connecting to database");
    }

    /**
     * Runs an update query against the database.
     *
     * When the server has gone away the connection is dropped and the query
     * is attempted again, up to the configured number of retries.
     *
     * @param string $sql    The query to execute.
     * @param array  $params The params necessary for the prepared statement.
     *
     * @return int The amount of affected rows.
     * @throws DJException  Throws if the retries were exhausted.
     * @throws PDOException Any other database error is rethrown unchanged.
     */
    public static function runUpdate($sql, $params = array()) {
        for ($attempts = 0; $attempts < self::$retries; $attempts++) {
            try {
                $stmt = self::getConnection()->prepare($sql);
                $stmt->execute($params);
                return $stmt->rowCount();
            } catch (PDOException $e) {
                // Only "MySQL server has gone away" is retried, on a fresh connection.
                if (!self::isConnectionLost($e)) {
                    throw $e;
                }
                self::$db = null;
            }
        }

        throw new DJException("DJJob exhausted retries connecting to database");
    }

    /**
     * Logs a message to the output.
     *
     * @param string $mesg     The message to log.
     * @param int    $severity The log level necessary for this message to display.
     */
    protected static function log($mesg, $severity = self::CRITICAL) {
        if ($severity >= self::$log_level) {
            printf("[%s] %s\n", date('c'), $mesg);
        }
    }
}

/**
 * The worker class that can empty a queue.
 */
class DJWorker extends DJBase {
    # This is a singleton-ish thing. It wouldn't really make sense to
    # instantiate more than one in a single request (or commandline task)

    // Declared explicitly: creating properties dynamically is deprecated as
    // of PHP 8.2. They stay public, which is what dynamic properties were.

    /** @var string The queue to work on. */
    public $queue;

    /** @var int How many polling iterations to run before exiting (0 = no limit). */
    public $count;

    /** @var int Seconds to sleep when no job could be fetched. */
    public $sleep;

    /** @var int How many times to try a job before bailing out. */
    public $max_attempts;

    /** @var bool Whether a job fails when its handler produces output. */
    public $fail_on_output;

    /** @var string Identifier of this worker, used as the lock owner. */
    public $name;

    /**
     * DJWorker constructor.
     *
     * The following options are available:
     *  `queue`: The queue to work on. Default: 'default'
     *  `count`: How many jobs to execute before exiting. Use '0' for no-limit. Default: '0'
     *  `sleep`: How long to sleep if no jobs are found. Default: '5'
     *  `max_attempts`: How many times to try a job before bailing out. Default: '5'
     *  `fail_on_output`: Whether to fail on output. Default: 'false'
     *
     * @param array $options The settings for this worker.
     */
    public function __construct($options = array()) {
        $options = array_merge(array(
            "queue" => "default",
            "count" => 0,
            "sleep" => 5,
            "max_attempts" => 5,
            "fail_on_output" => false
        ), $options);

        $this->queue = $options["queue"];
        $this->count = $options["count"];
        $this->sleep = $options["sleep"];
        $this->max_attempts = $options["max_attempts"];
        $this->fail_on_output = $options["fail_on_output"];

        $hostname = trim(gethostname());
        $pid = getmypid();
        $this->name = "host::$hostname pid::$pid";

        // Signal handling is only available when the pcntl extension is loaded.
        if (function_exists("pcntl_signal")) {
            pcntl_signal(SIGTERM, array($this, "handleSignal"));
            pcntl_signal(SIGINT, array($this, "handleSignal"));
        }
    }

    /**
     * Handles a signal from the operating system.
     *
     * Releases every lock held by this worker and terminates the process.
     *
     * @param int $signo The signal number received from the OS.
     */
    public function handleSignal($signo) {
        $signals = array(
            SIGTERM => "SIGTERM",
            SIGINT  => "SIGINT"
        );
        // Fall back to the raw number for a signal that has no name here.
        $signal = isset($signals[$signo]) ? $signals[$signo] : "signal {$signo}";

        $this->log("[WORKER] Received {$signal}... Shutting down", self::INFO);
        $this->releaseLocks();
        die(0);
    }

    /**
     * Releases all locks this worker has on the jobs table.
     */
    public function releaseLocks() {
        $this->runUpdate("
            UPDATE " . self::$jobsTable . "
            SET locked_at = NULL, locked_by = NULL
            WHERE locked_by = ?",
            array($this->name)
        );
    }

    /**
     * Returns a new job ordered by most recent first
     * why this?
     *     run newest first, some jobs get left behind
     *     run oldest first, all jobs get left behind
     *
     * @return \DJJob|false A job if one was successfully locked. Otherwise false.
     */
    public function getNewJob() {
        # we can grab a locked job if we own the lock
        $rs = $this->runQuery("
            SELECT id
            FROM   " . self::$jobsTable . "
            WHERE  queue = ?
            AND    (run_at IS NULL OR NOW() >= run_at)
            AND    (locked_at IS NULL OR locked_by = ?)
            AND    failed_at IS NULL
            AND    attempts < ?
            ORDER BY created_at DESC
            LIMIT  10
        ", array($this->queue, $this->name, $this->max_attempts));

        // randomly order the 10 to prevent lock contention among workers
        shuffle($rs);

        foreach ($rs as $r) {
            $job = new DJJob($this->name, $r["id"], array(
                "max_attempts" => $this->max_attempts,
                "fail_on_output" => $this->fail_on_output
            ));
            if ($job->acquireLock()) {
                return $job;
            }
        }

        return false;
    }

    /**
     * Starts the worker process.
     *
     * Polls for jobs until `count` polling iterations have run (forever when
     * `count` is 0). An exception escaping the loop is logged and ends it.
     */
    public function start() {
        $this->log("[JOB] Starting worker {$this->name} on queue::{$this->queue}", self::INFO);

        $count = 0;
        $job_count = 0;
        try {
            while ($this->count == 0 || $count < $this->count) {
                // Give pending SIGTERM/SIGINT handlers a chance to run.
                if (function_exists("pcntl_signal_dispatch")) {
                    pcntl_signal_dispatch();
                }

                $count += 1;
                $job = $this->getNewJob();

                if (!$job) {
                    $this->log("[JOB] Failed to get a job, queue::{$this->queue} may be empty", self::DEBUG);
                    sleep($this->sleep);
                    continue;
                }

                $job_count += 1;
                $job->run();
            }
        } catch (Exception $e) {
            $this->log("[JOB] unhandled exception::\"{$e->getMessage()}\"", self::ERROR);
        }

        $this->log("[JOB] worker shutting down after running {$job_count} jobs, over {$count} polling iterations", self::INFO);
    }
}

/**
 * Represents a job that needs to be executed.
 */
class DJJob extends DJBase {

    // Declared explicitly: creating properties dynamically is deprecated as
    // of PHP 8.2. They stay public, which is what dynamic properties were.

    /** @var string Name of the worker that created this job. */
    public $worker_name;

    /** @var int ID of this job. */
    public $job_id;

    /** @var int The amount of attempts before bailing out. */
    public $max_attempts;

    /** @var bool Whether the job fails if the handler produces output. */
    public $fail_on_output;

    /**
     * Constructs the Job
     *
     * Possible options:
     *  `max_attempts`: The amount of attempts before bailing out. Default: '5'
     *  `fail_on_output`: Whether the job fails if there is output in the handler. Default: 'false'
     *
     * @param string $worker_name Name of the worker that created this job.
     * @param int    $job_id      ID of this job.
     * @param array  $options     The options.
     */
    public function __construct($worker_name, $job_id, $options = array()) {
        $options = array_merge(array(
            "max_attempts" => 5,
            "fail_on_output" => false
        ), $options);

        $this->worker_name = $worker_name;
        $this->job_id = $job_id;
        $this->max_attempts = $options["max_attempts"];
        $this->fail_on_output = $options["fail_on_output"];
    }

    /**
     * Runs this job.
     *
     * First retrieves the handler from the database. Then performs the job.
     *
     * @return bool Whether or not the job succeeded.
     */
    public function run() {
        # pull the handler from the db
        $handler = $this->getHandler();
        if (!is_object($handler)) {
            $msg = "[JOB] bad handler for job::{$this->job_id}";
            $this->finishWithError($msg);
            return false;
        }

        // True only while the buffer opened below is still open, so the
        // catch blocks never close a buffer that is not ours.
        $buffering = false;

        # run the handler
        try {
            if ($this->fail_on_output) {
                ob_start();
                $buffering = true;
            }

            $handler->perform();

            if ($buffering) {
                $output = ob_get_contents();
                ob_end_clean();
                $buffering = false;

                // Compare against '' explicitly: empty() would treat "0" as no output.
                if (is_string($output) && $output !== '') {
                    throw new Exception("Job produced unexpected output: $output");
                }
            }

            # cleanup
            $this->finish();
            return true;

        } catch (DJRetryException $e) {
            if ($buffering) {
                ob_end_flush();
            }

            # attempts hasn't been incremented yet.
            $attempts = $this->getAttempts() + 1;

            $msg = "Caught DJRetryException \"{$e->getMessage()}\" on attempt $attempts/{$this->max_attempts}.";

            // >= instead of ==, so a job already past the limit gives up too.
            if ($attempts >= $this->max_attempts) {
                $msg = "[JOB] job::{$this->job_id} $msg Giving up.";
                $this->finishWithError($msg, $handler);
            } else {
                $this->log("[JOB] job::{$this->job_id} $msg Try again in {$e->getDelay()} seconds.", self::WARN);
                $this->retryLater($e->getDelay());
            }
            return false;

        } catch (Exception $e) {
            if ($buffering) {
                ob_end_flush();
            }

            $this->finishWithError($e->getMessage(), $handler);
            return false;
        }
    }

    /**
     * Acquires lock on this job.
     *
     * @return bool Whether or not acquiring the lock succeeded.
     */
    public function acquireLock() {
        $this->log("[JOB] attempting to acquire lock for job::{$this->job_id} on {$this->worker_name}", self::INFO);

        $lock = $this->runUpdate("
            UPDATE " . self::$jobsTable . "
            SET    locked_at = NOW(), locked_by = ?
            WHERE  id = ? AND (locked_at IS NULL OR locked_by = ?) AND failed_at IS NULL
        ", array($this->worker_name, $this->job_id, $this->worker_name));

        // Zero affected rows means the row was not available for locking.
        if (!$lock) {
            $this->log("[JOB] failed to acquire lock for job::{$this->job_id}", self::INFO);
            return false;
        }

        return true;
    }

    /**
     * Releases the lock on this job.
     */
    public function releaseLock() {
        $this->runUpdate("
            UPDATE " . self::$jobsTable . "
            SET locked_at = NULL, locked_by = NULL
            WHERE id = ?",
            array($this->job_id)
        );
    }

    /**
     * Finishes this job. Will delete it from the jobs table.
     */
    public function finish() {
        $this->runUpdate(
            "DELETE FROM " . self::$jobsTable . " WHERE id = ?",
            array($this->job_id)
        );
        $this->log("[JOB] completed job::{$this->job_id}", self::INFO);
    }

    /**
     * Finishes this job, but with an error. Keeps the job in the jobs table.
     *
     * Increments the attempt counter. The SQL sets `failed_at` and `error`
     * once `attempts` has reached `max_attempts`. If the handler defines
     * `_onDjjobRetryError`, it is called with the error at that point.
     *
     * @param string      $error   The error message to write to the job.
     * @param null|object $handler The handler that ran this job.
     */
    public function finishWithError($error, $handler = null) {
        $this->runUpdate("
            UPDATE " . self::$jobsTable . "
            SET attempts = attempts + 1,
                failed_at = IF(attempts >= ?, NOW(), NULL),
                error = IF(attempts >= ?, ?, NULL)
            WHERE id = ?",
            array(
                $this->max_attempts,
                $this->max_attempts,
                $error,
                $this->job_id
            )
        );
        $this->log($error, self::ERROR);
        $this->log("[JOB] failure in job::{$this->job_id}", self::ERROR);
        $this->releaseLock();

        // >= mirrors the SQL condition above that marks the job as failed.
        if ($handler && ($this->getAttempts() >= $this->max_attempts) && method_exists($handler, '_onDjjobRetryError')) {
            $handler->_onDjjobRetryError($error);
        }
    }

    /**
     * Saves a retry date to this job.
     *
     * Also increments the attempt counter and releases the lock.
     *
     * @param int $delay The amount of seconds to delay this job.
     */
    public function retryLater($delay) {
        $this->runUpdate("
            UPDATE " . self::$jobsTable . "
            SET run_at = DATE_ADD(NOW(), INTERVAL ? SECOND),
                attempts = attempts + 1
            WHERE id = ?",
            array(
                $delay,
                $this->job_id
            )
        );
        $this->releaseLock();
    }

    /**
     * Returns the handler for this job.
     *
     * @return bool|object The handler object for this job. Or false if it failed.
     */
    public function getHandler() {
        $rs = $this->runQuery(
            "SELECT handler FROM " . self::$jobsTable . " WHERE id = ?",
            array($this->job_id)
        );
        // NOTE(review): unserialize() on table content is only safe while the
        // jobs table is trusted; a tampered handler column can instantiate
        // arbitrary classes.
        foreach ($rs as $r) {
            return unserialize($r["handler"]);
        }
        return false;
    }

    /**
     * Returns the number of attempts recorded for this job so far.
     *
     * @return int|string|false The value of the `attempts` column, or false
     *                          if the job row was not found.
     */
    public function getAttempts() {
        $rs = $this->runQuery(
            "SELECT attempts FROM " . self::$jobsTable . " WHERE id = ?",
            array($this->job_id)
        );
        foreach ($rs as $r) {
            return $r["attempts"];
        }
        return false;
    }

    /**
     * Enqueues a job to the database.
     *
     * @param object $handler The handler that can execute this job.
     * @param string $queue   The queue to enqueue this job to. All queues are saved in the same table.
     * @param string $run_at  A valid mysql DATETIME string at which to run the jobs.
     *
     * @return bool|string Returns the last inserted ID or false if enqueuing failed.
     */
    public static function enqueue($handler, $queue = "default", $run_at = null) {
        $affected = self::runUpdate(
            "INSERT INTO " . self::$jobsTable . " (handler, queue, run_at, created_at) VALUES(?, ?, ?, NOW())",
            array(serialize($handler), (string) $queue, $run_at)
        );

        if ($affected < 1) {
            self::log("[JOB] failed to enqueue new job", self::ERROR);
            return false;
        }

        return self::getConnection()->lastInsertId(); // return the job ID, for manipulation later
    }

    /**
     * Bulk enqueues a lot of jobs to the database.
     *
     * @param object[] $handlers An array of handlers to enqueue.
     * @param string   $queue    The queue to enqueue the handlers to.
     * @param string   $run_at   A valid mysql DATETIME string at which to run the jobs.
     *
     * @return bool True when at least one job was inserted. False when
     *              nothing was inserted, including for an empty `$handlers`.
     */
    public static function bulkEnqueue($handlers, $queue = "default", $run_at = null) {
        // An empty list would build "INSERT ... VALUES" with no tuples,
        // which is not valid SQL, so stop before touching the database.
        if (count($handlers) < 1) {
            self::log("[JOB] no handlers given to bulkEnqueue, nothing enqueued", self::DEBUG);
            return false;
        }

        $sql = "INSERT INTO " . self::$jobsTable . " (handler, queue, run_at, created_at) VALUES";
        $sql .= implode(",", array_fill(0, count($handlers), "(?, ?, ?, NOW())"));

        $parameters = array();
        foreach ($handlers as $handler) {
            $parameters[] = serialize($handler);
            $parameters[] = (string) $queue;
            $parameters[] = $run_at;
        }
        $affected = self::runUpdate($sql, $parameters);

        if ($affected < 1) {
            self::log("[JOB] failed to enqueue new jobs", self::ERROR);
            return false;
        }

        if ($affected != count($handlers)) {
            self::log("[JOB] failed to enqueue some new jobs", self::ERROR);
        }

        return true;
    }

    /**
     * Returns the general status of the jobs table.
     *
     * @param string $queue The queue of which to see the status for.
     *
     * @return array Information about the status, with the keys
     *               `outstanding`, `locked`, `failed` and `total`.
     */
    public static function status($queue = "default") {
        $rs = self::runQuery("
            SELECT COUNT(*) as total, COUNT(failed_at) as failed, COUNT(locked_at) as locked
            FROM `" . self::$jobsTable . "`
            WHERE queue = ?
        ", array($queue));
        $rs = $rs[0];

        $failed = $rs["failed"];
        $locked = $rs["locked"];
        $total  = $rs["total"];
        // Outstanding = total minus the locked and failed counts.
        $outstanding = $total - $locked - $failed;

        return array(
            "outstanding" => $outstanding,
            "locked" => $locked,
            "failed" => $failed,
            "total"  => $total
        );
    }

}