<?php
/**
* Interface for Danga's Gearman job scheduling system
*
* PHP version 5.1.0+
*
* LICENSE: This source file is subject to the New BSD license that is
* available through the world-wide-web at the following URI:
* http://www.opensource.org/licenses/bsd-license.php. If you did not receive
* a copy of the New BSD License and are unable to obtain it through the web,
* please send a note to license@php.net so we can mail you a copy immediately.
*
* @category Net
* @package ShSo\Net\Gearman
* @author Joe Stump <joe@joestump.net>
* @copyright 2007-2008 Digg.com, Inc.
* @license http://www.opensource.org/licenses/bsd-license.php New BSD License
* @version CVS: $Id$
* @link http://pear.php.net/package/Net_Gearman
* @link http://www.danga.com/gearman/
*/
namespace ShSo\Net\Gearman;
/**
* The base connection class
*
* @category Net
* @package ShSo\Net\Gearman
* @author Joe Stump <joe@joestump.net>
* @copyright 2007-2008 Digg.com, Inc.
* @license http://www.opensource.org/licenses/bsd-license.php New BSD License
* @version Release: @package_version@
* @link http://www.danga.com/gearman/
*/
class Connection
{
/**
* A list of valid Gearman commands
*
* This is a list of valid Gearman commands (the key of the array), their
* integery type (first key in second array) used in the binary header, and
* the arguments / order of arguments to send/receive.
*
* @var array $commands
* @see ShSo\Net\Gearman\Connection::$magic
* @see ShSo\Net\Gearman\Connection::connect()
*/
static protected $commands = array(
'can_do' => array(1, array('func')),
'can_do_timeout' => array(23, array('func', 'timeout')),
'cant_do' => array(2, array('func')),
'reset_abilities' => array(3, array()),
'set_client_id' => array(22, array('client_id')),
'pre_sleep' => array(4, array()),
'noop' => array(6, array()),
'submit_job' => array(7, array('func', 'uniq', 'arg')),
'submit_job_high' => array(21, array('func', 'uniq', 'arg')),
'submit_job_bg' => array(18, array('func', 'uniq', 'arg')),
'submit_job_high_bg' => array(32, array('func', 'uniq', 'arg')),
'submit_job_low' => array(33, array('func', 'uniq', 'arg')),
'submit_job_low_bg' => array(34, array('func', 'uniq', 'arg')),
'job_created' => array(8, array('handle')),
'grab_job' => array(9, array()),
'no_job' => array(10, array()),
'job_assign' => array(11, array('handle', 'func', 'arg')),
'work_status' => array(12, array('handle', 'numerator', 'denominator')),
'work_complete' => array(13, array('handle', 'result')),
'work_fail' => array(14, array('handle')),
'get_status' => array(15, array('handle')),
'status_res' => array(20, array('handle', 'known', 'running', 'numerator', 'denominator')),
'echo_req' => array(16, array('text')),
'echo_res' => array(17, array('text')),
'error' => array(19, array('err_code', 'err_text')),
'all_yours' => array(24, array())
);
/**
* The reverse of ShSo\Net\Gearman\Connection::$commands
*
* This is the same as the ShSo\Net\Gearman\Connection::$commands array only
* it's keyed by the magic (integer value) value of the command.
*
* @var array $magic
* @see ShSo\Net\Gearman\Connection::$commands
* @see ShSo\Net\Gearman\Connection::connect()
*/
static protected $magic = array();
/**
* Tasks waiting for a handle
*
* Tasks are popped onto this queue as they're submitted so that they can
* later be popped off of the queue once a handle has been assigned via
* the job_created command.
*
* @access public
* @var array $waiting
* @static
*/
static public $waiting = array();
/**
* Is PHP's multibyte overload turned on?
*
* @var integer $multiByteSupport
*/
static protected $multiByteSupport = null;
/**
* Constructor
*
* @return void
*/
final private function __construct()
{
// Don't allow this class to be instantiated
}
/**
* Connect to Gearman
*
* Opens the socket to the Gearman Job server. It throws an exception if
* a socket error occurs. Also populates ShSo\Net\Gearman\Connection::$magic.
*
* @param string $host e.g. 127.0.0.1 or 127.0.0.1:7003
* @param int $timeout Timeout in milliseconds
*
* @return resource A connection to a Gearman server
* @throws ShSo\Net\Gearman\Exception when it can't connect to server
* @see ShSo\Net\Gearman\Connection::$waiting
* @see ShSo\Net\Gearman\Connection::$magic
* @see ShSo\Net\Gearman\Connection::$commands
*/
static public function connect($host = 'localhost', $timeout = 2000)
{
if (!count(self::$magic)) {
foreach (self::$commands as $cmd => $i) {
self::$magic[$i[0]] = array($cmd, $i[1]);
}
}
$err = '';
$errno = 0;
$port = 4730;
if (strpos($host, ':')) {
list($host, $port) = explode(':', $host);
}
$start = microtime(true);
do {
$socket = \socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$socket_connected = @socket_connect($socket, $host, $port);
if ($socket_connected) {
socket_set_nonblock($socket);
socket_set_option($socket, SOL_TCP, 1, 1);
$timeLeft = ((microtime(true) - $start) * 1000);
}
} while (!$socket_connected && ($timeLeft ?? 0)< $timeout);
if (!$socket_connected) {
$errno = socket_last_error($socket);
$errstr = socket_strerror($errno);
throw new Exception(
"Can't connect to server ($errno: $errstr)"
);
}
self::$waiting[(int)$socket] = array();
return $socket;
}
/**
* Send a command to Gearman
*
* This is the command that takes the string version of the command you
* wish to run (e.g. 'can_do', 'grab_job', etc.) along with an array of
* parameters (in key value pairings) and packs it all up to send across
* the socket.
*
* @param resource $socket The socket to send the command to
* @param string $command Command to send (e.g. 'can_do')
* @param array $params Params to send
*
* @see ShSo\Net\Gearman\Connection::$commands, ShSo\Net\Gearman\Connection::$socket
* @return boolean
* @throws ShSo\Net\Gearman\Exception on invalid command or unable to write
*/
static public function send($socket, $command, array $params = array())
{
if (!isset(self::$commands[$command])) {
throw new Exception('Invalid command: ' . $command);
}
$data = array();
foreach (self::$commands[$command][1] as $field) {
if (isset($params[$field])) {
$data[] = $params[$field];
}
}
$d = implode("\x00", $data);
$cmd = "\0REQ" . pack("NN",
self::$commands[$command][0],
self::stringLength($d)) . $d;
$cmdLength = self::stringLength($cmd);
$written = 0;
$error = false;
do {
$check = @socket_write($socket,
self::subString($cmd, $written, $cmdLength),
$cmdLength);
if ($check === false) {
if (socket_last_error($socket) == SOCKET_EAGAIN or
socket_last_error($socket) == SOCKET_EWOULDBLOCK or
socket_last_error($socket) == SOCKET_EINPROGRESS)
{
// skip this is okay
}
else
{
$error = true;
break;
}
}
$written += (int)$check;
} while ($written < $cmdLength);
if ($error === true) {
$errno = socket_last_error($socket);
$errstr = socket_strerror($errno);
throw new Exception(
"Could not write command to socket ($errno: $errstr)"
);
}
}
/**
* Read command from Gearman
*
* @param resource $socket The socket to read from
*
* @see ShSo\Net\Gearman\Connection::$magic
* @return array Result read back from Gearman
* @throws ShSo\Net\Gearman\Exception connection issues or invalid responses
*/
static public function read($socket)
{
$header = '';
do {
$buf = socket_read($socket, 12 - self::stringLength($header));
$header .= $buf;
} while ($buf !== false &&
$buf !== '' && self::stringLength($header) < 12);
if ($buf === '') {
throw new Exception("Connection was reset");
}
if (self::stringLength($header) == 0) {
return array();
}
$resp = @unpack('a4magic/Ntype/Nlen', $header);
if (!count($resp) == 3) {
throw new Exception('Received an invalid response');
}
if (!isset(self::$magic[$resp['type']])) {
throw new Exception(
'Invalid response magic returned: ' . $resp['type']
);
}
$return = array();
if ($resp['len'] > 0) {
$data = '';
while (self::stringLength($data) < $resp['len']) {
$data .= socket_read($socket, $resp['len'] - self::stringLength($data));
}
$d = explode("\x00", $data);
foreach (self::$magic[$resp['type']][1] as $i => $a) {
$return[$a] = $d[$i];
}
}
$function = self::$magic[$resp['type']][0];
if ($function == 'error') {
if (!self::stringLength($return['err_text'])) {
$return['err_text'] = 'Unknown error; see error code.';
}
throw new Exception(
$return['err_text'], $return['err_code']
);
}
return array('function' => self::$magic[$resp['type']][0],
'type' => $resp['type'],
'data' => $return);
}
/**
* Blocking socket read
*
* @param resource $socket The socket to read from
* @param float $timeout The timeout for the read
*
* @throws ShSo\Net\Gearman\Exception on timeouts
* @return array
*/
static public function blockingRead($socket, $timeout = 500)
{
static $cmds = array();
$tv_sec = floor(($timeout % 1000));
$tv_usec = ($timeout * 1000);
$start = microtime(true);
while (count($cmds) == 0) {
if (((microtime(true) - $start) * 1000) > $timeout) {
throw new Exception('Blocking read timed out');
}
$write = null;
$except = null;
$read = array($socket);
socket_select($read, $write, $except, $tv_sec, $tv_usec);
foreach ($read as $s) {
$cmds[] = Connection::read($s);
}
}
return array_shift($cmds);
}
/**
* Close the connection
*
* @param resource $socket The connection/socket to close
*
* @return void
*/
static public function close($socket)
{
if (is_resource($socket)) {
socket_close($socket);
}
}
/**
* Are we connected?
*
* @param resource $conn The connection/socket to check
*
* @return boolean False if we aren't connected
*/
static public function isConnected($conn)
{
return (is_null($conn) !== true &&
is_resource($conn) === true &&
strtolower(get_resource_type($conn)) == 'socket');
}
/**
* Determine if we should use mb_strlen or stock strlen
*
* @param string $value The string value to check
*
* @return integer Size of string
* @see ShSo\Net\Gearman\Connection::$multiByteSupport
*/
static public function stringLength($value)
{
if (is_null(self::$multiByteSupport)) {
self::$multiByteSupport = intval(ini_get('mbstring.func_overload'));
}
if (self::$multiByteSupport & 2) {
return mb_strlen($value, '8bit');
} else {
return strlen($value);
}
}
/**
* Multibyte substr() implementation
*
* @param string $str The string to substr()
* @param integer $start The first position used
* @param integer $length The maximum length of the returned string
*
* @return string Portion of $str specified by $start and $length
* @see ShSo\Net\Gearman\Connection::$multiByteSupport
* @link http://us3.php.net/mb_substr
* @link http://us3.php.net/substr
*/
static public function subString($str, $start, $length)
{
if (is_null(self::$multiByteSupport)) {
self::$multiByteSupport = intval(ini_get('mbstring.func_overload'));
}
if (self::$multiByteSupport & 2) {
return mb_substr($str, $start, $length, '8bit');
} else {
return substr($str, $start, $length);
}
}
}