<?php
/**
* Interface for Danga's Gearman job scheduling system
*
* PHP version 5.3.0+ (the file uses namespaces)
*
* LICENSE: This source file is subject to the New BSD license that is
* available through the world-wide-web at the following URI:
* http://www.opensource.org/licenses/bsd-license.php. If you did not receive
* a copy of the New BSD License and are unable to obtain it through the web,
* please send a note to license@php.net so we can mail you a copy immediately.
*
* @category Net
* @package ShSo\Net\Gearman
* @author Joe Stump <joe@joestump.net>
* @copyright 2007-2008 Digg.com, Inc.
* @license http://www.opensource.org/licenses/bsd-license.php New BSD License
* @version CVS: $Id$
* @link http://pear.php.net/package/Net_Gearman
* @link http://www.danga.com/gearman/
*/
namespace ShSo\Net\Gearman;
/**
* Gearman worker class
*
* Run an instance of a worker to listen for jobs. It then manages the running
* of jobs, etc.
*
* <code>
* <?php
*
* $servers = array(
* '127.0.0.1:7003',
* '127.0.0.1:7004'
* );
*
* $abilities = array('HelloWorld', 'Foo', 'Bar');
*
* try {
* $worker = new ShSo\Net\Gearman\Worker($servers);
* foreach ($abilities as $ability) {
* $worker->addAbility($ability);
* }
* $worker->beginWork();
* } catch (ShSo\Net\Gearman\Exception $e) {
* echo $e->getMessage() . "\n";
* exit;
* }
*
* ?>
* </code>
*
* @category Net
* @package ShSo\Net\Gearman
* @author Joe Stump <joe@joestump.net>
* @copyright 2007-2008 Digg.com, Inc.
* @license http://www.opensource.org/licenses/bsd-license.php New BSD License
* @version Release: @package_version@
* @link http://www.danga.com/gearman/
* @see ShSo\Net\Gearman\Job, ShSo\Net\Gearman\Connection
*/
class Worker
{
    /**
     * Callback types
     *
     * @const integer JOB_START    Ran when a job is started
     * @const integer JOB_COMPLETE Ran when a job is finished
     * @const integer JOB_FAIL     Ran when a job fails
     */
    const JOB_START    = 1;
    const JOB_COMPLETE = 2;
    const JOB_FAIL     = 3;

    /**
     * Pool of live connections to Gearman servers, indexed by server string
     *
     * @var array $conn
     */
    protected $conn = array();

    /**
     * Servers that are currently unreachable, indexed by server string.
     * The value is the unix timestamp of the last connection attempt.
     *
     * @var array $retryConn
     */
    protected $retryConn = array();

    /**
     * Pool of worker abilities: ability name => timeout (or null)
     *
     * @var array $abilities
     */
    protected $abilities = array();

    /**
     * Parameters for job constructors, indexed by ability name
     *
     * @var array $initParams
     */
    protected $initParams = array();

    /**
     * Callbacks registered for this worker, indexed by callback type
     *
     * @var array $callback
     * @see ShSo\Net\Gearman\Worker::JOB_START
     * @see ShSo\Net\Gearman\Worker::JOB_COMPLETE
     * @see ShSo\Net\Gearman\Worker::JOB_FAIL
     */
    protected $callback = array(
        self::JOB_START    => array(),
        self::JOB_COMPLETE => array(),
        self::JOB_FAIL     => array()
    );

    /**
     * Unique id for this worker
     *
     * @var string $id
     */
    protected $id = "";

    /**
     * Constructor
     *
     * Connects to every given server. Servers that cannot be reached are
     * remembered and retried later from beginWork().
     *
     * @param array|string $servers List of servers (or a single server) to
     *                              connect to; defaults to "localhost"
     * @param string       $id      Optional unique id for this worker
     *
     * @throws ShSo\Net\Gearman\Exception When the server list is empty or no
     *                                    server could be reached
     * @see ShSo\Net\Gearman\Connection
     */
    public function __construct($servers = null, $id = "")
    {
        if (is_null($servers)) {
            $servers = array("localhost");
        } elseif (!is_array($servers)) {
            // Reject empty scalars up front instead of letting foreach
            // choke on a non-array value further down.
            if (!strlen($servers)) {
                throw new Exception('Invalid servers specified');
            }
            $servers = array($servers);
        } elseif (!count($servers)) {
            throw new Exception('Invalid servers specified');
        }

        if (empty($id)) {
            $id = "pid_" . getmypid() . "_" . uniqid();
        }
        $this->id = $id;

        foreach ($servers as $s) {
            try {
                $this->conn[$s] = $this->openConnection($s);
            } catch (Exception $e) {
                $this->retryConn[$s] = time();
            }
        }

        if (empty($this->conn)) {
            throw new Exception(
                "Couldn't connect to any available servers"
            );
        }
    }

    /**
     * Connect to a server and identify this worker to it
     *
     * The connection is only returned once the client id was sent
     * successfully, so callers never pool a half-initialised connection.
     *
     * @param string $server Server string understood by Connection::connect()
     *
     * @return mixed The connection returned by Connection::connect()
     * @throws ShSo\Net\Gearman\Exception
     */
    protected function openConnection($server)
    {
        $conn = Connection::connect($server);
        Connection::send($conn, "set_client_id", array("client_id" => $this->id));

        return $conn;
    }

    /**
     * Announce a single ability on a single connection
     *
     * @param mixed   $conn    Connection to announce on
     * @param string  $ability Name of function/ability
     * @param integer $timeout How long to give this job (null for no limit)
     *
     * @return void
     * @see ShSo\Net\Gearman\Connection::send()
     */
    protected function announceAbility($conn, $ability, $timeout)
    {
        $call   = 'can_do';
        $params = array('func' => $ability);
        if (is_int($timeout) && $timeout > 0) {
            $params['timeout'] = $timeout;
            $call              = 'can_do_timeout';
        }

        Connection::send($conn, $call, $params);
    }

    /**
     * Announce an ability to the job server
     *
     * The ability is remembered so that it can be announced again on
     * connections that are (re-)established later.
     *
     * @param string  $ability    Name of function/ability
     * @param integer $timeout    How long to give this job
     * @param array   $initParams Parameters for job constructor
     *
     * @return void
     * @see ShSo\Net\Gearman\Connection::send()
     */
    public function addAbility($ability, $timeout = null, $initParams=array())
    {
        $this->initParams[$ability] = $initParams;
        $this->abilities[$ability]  = $timeout;

        foreach ($this->conn as $conn) {
            $this->announceAbility($conn, $ability, $timeout);
        }
    }

    /**
     * Begin working
     *
     * This starts the worker on its journey of actually working. The first
     * argument is a PHP callback to a function that can be used to monitor
     * the worker. If no callback is provided then the worker works until it
     * is killed. The monitor is passed two arguments; whether or not the
     * worker is idle and when the last job was ran. Returning true from the
     * monitor stops the loop.
     *
     * @param callback $monitor Function to monitor work
     *
     * @return void
     * @see ShSo\Net\Gearman\Connection::send(), ShSo\Net\Gearman\Connection::connect()
     * @see ShSo\Net\Gearman\Worker::doWork(), ShSo\Net\Gearman\Worker::addAbility()
     */
    public function beginWork($monitor = null)
    {
        if (!is_callable($monitor)) {
            $monitor = array($this, 'stopWork');
        }

        $write     = null;
        $except    = null;
        $working   = true;
        $lastJob   = time();
        $retryTime = 5;

        while ($working) {
            $sleep       = true;
            $currentTime = time();

            // Ask every connected server for a job.
            foreach ($this->conn as $server => $socket) {
                $worked = false;
                try {
                    $worked = $this->doWork($socket);
                } catch (Exception $e) {
                    unset($this->conn[$server]);
                    $this->retryConn[$server] = $currentTime;
                }
                if ($worked) {
                    $lastJob = time();
                    $sleep   = false;
                }
            }

            $idle = false;
            if ($sleep && count($this->conn)) {
                foreach ($this->conn as $server => $socket) {
                    try {
                        Connection::send($socket, 'pre_sleep');
                    } catch (Exception $e) {
                        // A dead socket must not abort the whole loop;
                        // treat it like a failure inside doWork().
                        unset($this->conn[$server]);
                        $this->retryConn[$server] = $currentTime;
                    }
                }

                $read = $this->conn;
                // socket_select() must not be called with only empty arrays.
                if (count($read)) {
                    socket_select($read, $write, $except, 60);
                    $idle = (count($read) == 0);
                }
            }

            // Try to bring back servers whose retry delay has elapsed.
            foreach ($this->retryConn as $s => $lastTry) {
                if (($lastTry + $retryTime) < $currentTime) {
                    try {
                        $conn = $this->openConnection($s);
                        // Register all known abilities on the fresh
                        // connection before it joins the pool.
                        foreach ($this->abilities as $ability => $timeout) {
                            $this->announceAbility($conn, $ability, $timeout);
                        }
                        $this->conn[$s] = $conn;
                        unset($this->retryConn[$s]);
                    } catch (Exception $e) {
                        $this->retryConn[$s] = $currentTime;
                    }
                }
            }

            if (count($this->conn) == 0) {
                // sleep to avoid wasted cpu cycles if no connections to block on using socket_select
                sleep(1);
            }

            if (call_user_func($monitor, $idle, $lastJob) == true) {
                $working = false;
            }
        }
    }

    /**
     * Listen on the socket for work
     *
     * Sends the 'grab_job' command and then listens for either the 'noop' or
     * the 'no_job' command to come back. If the 'job_assign' comes down the
     * pipe then we run that job.
     *
     * @param resource $socket The socket to work on
     *
     * @return boolean Returns true if work was done, false if not
     * @throws ShSo\Net\Gearman\Exception When the response is empty,
     *                                    malformed or of an unexpected type
     * @see ShSo\Net\Gearman\Connection::send()
     */
    protected function doWork($socket)
    {
        Connection::send($socket, 'grab_job');

        // Skip over any number of 'noop' packets.
        $resp = array('function' => 'noop');
        while (isset($resp['function']) && $resp['function'] == 'noop') {
            $resp = Connection::blockingRead($socket);
        }

        if (!is_array($resp) || !isset($resp['function'])) {
            // Fail explicitly instead of tripping over an undefined index;
            // the caller reacts by scheduling a reconnect.
            throw new Exception('Empty or malformed response from job server');
        }

        if ($resp['function'] === 'no_job') {
            return false;
        }

        if ($resp['function'] != 'job_assign') {
            throw new Exception('Holy Cow! What are you doing?!');
        }

        $name   = $resp['data']['func'];
        $handle = $resp['data']['handle'];
        $arg    = array();

        if (isset($resp['data']['arg']) &&
            Connection::stringLength($resp['data']['arg'])) {
            $arg = json_decode($resp['data']['arg'], true);
            if ($arg === null) {
                // Not decodable as JSON: hand the raw payload to the job.
                $arg = $resp['data']['arg'];
            }
        }

        $initParams = isset($this->initParams[$name])
            ? $this->initParams[$name]
            : array();

        $job = Job::factory($name, $socket, $handle, $initParams);

        try {
            $this->start($handle, $name, $arg);
            $res = $job->run($arg);
            if (!is_array($res)) {
                $res = array('result' => $res);
            }
            $job->complete($res);
            $this->complete($handle, $name, $res);
        } catch (Job\Exception $e) {
            $job->fail();
            $this->fail($handle, $name, $e);
        }

        // Force the job's destructor to run
        $job = null;

        return true;
    }

    /**
     * Attach a callback
     *
     * @param callback $callback A valid PHP callback
     * @param integer  $type     Type of callback
     *
     * @return void
     * @throws ShSo\Net\Gearman\Exception When an invalid callback is specified.
     * @throws ShSo\Net\Gearman\Exception When an invalid type is specified.
     */
    public function attachCallback($callback, $type = self::JOB_COMPLETE)
    {
        if (!is_callable($callback)) {
            throw new Exception('Invalid callback specified');
        }
        if (!isset($this->callback[$type])) {
            throw new Exception('Invalid callback type specified.');
        }

        $this->callback[$type][] = $callback;
    }

    /**
     * Run the job start callbacks
     *
     * @param string $handle The job's Gearman handle
     * @param string $job    The name of the job
     * @param mixed  $args   The job's argument list
     *
     * @return void
     */
    protected function start($handle, $job, $args)
    {
        foreach ($this->callback[self::JOB_START] as $callback) {
            call_user_func($callback, $handle, $job, $args);
        }
    }

    /**
     * Run the complete callbacks
     *
     * @param string $handle The job's Gearman handle
     * @param string $job    The name of the job
     * @param array  $result The job's returned result
     *
     * @return void
     */
    protected function complete($handle, $job, array $result)
    {
        foreach ($this->callback[self::JOB_COMPLETE] as $callback) {
            call_user_func($callback, $handle, $job, $result);
        }
    }

    /**
     * Run the fail callbacks
     *
     * @param string $handle The job's Gearman handle
     * @param string $job    The name of the job
     * @param object $error  The exception thrown
     *
     * @return void
     */
    protected function fail($handle, $job, \Exception $error)
    {
        foreach ($this->callback[self::JOB_FAIL] as $callback) {
            call_user_func($callback, $handle, $job, $error);
        }
    }

    /**
     * Stop working
     *
     * Closes every pooled connection. The pool is emptied first so that a
     * second call (for example from the destructor) never closes the same
     * connection twice.
     *
     * @return void
     */
    public function endWork()
    {
        $open       = $this->conn;
        $this->conn = array();

        foreach ($open as $conn) {
            Connection::close($conn);
        }
    }

    /**
     * Destructor
     *
     * @return void
     * @see ShSo\Net\Gearman\Worker::endWork()
     */
    public function __destruct()
    {
        $this->endWork();
    }

    /**
     * Should we stop work?
     *
     * Default monitor used by beginWork(); never asks the loop to stop.
     *
     * @return boolean
     */
    public function stopWork()
    {
        return false;
    }
}