<?php
/**
* Asyncronous Stream Loader Class
*
* @author Alexander Over <hide@address.com>
*/
abstract class Stream
{
/**
* @var int $timeout default stream timeout in sec.
*/
private $timeout = 5;
/**
* @var int $readbuffer default readbuffer size in bytes.
*/
private $readbuffer = 1024;
/**
* @var string $protocol default transport protocol
*/
protected $protocol = 'tcp';
/**
* @var int $port connection port
*/
protected $port;
/**
* @var bool $isSSL use tls or not
*/
protected $isSSL = false;
/**
* @var array $request empty array to hold the request data
*/
protected $request = array();
const ASYNC_CONNECT = STREAM_CLIENT_ASYNC_CONNECT;
const CONNECT = STREAM_CLIENT_CONNECT;
/**
* magic function to set protected class variables
*/
final public function __set($name, $value)
{
if (property_exists($this, $name))
{
$this->$name = $value;
}
else
{
return false;
}
}
/**
* Core request handling
* @param int $maxParallel limit the count of parallel requests. 0 for no limitation
*
* @return array $buffer
*/
protected function doRequest($maxParallel = 0)
{
$requestCount = count($this->request);
if ( $requestCount > 0 )
{
if($maxParallel == 0 || $maxParallel >= $requestCount)
{
$maxParallel = $requestCount;
}
$buffer = $info = array();
$progressCounter = 0;
for (;$progressCounter < $requestCount; $progressCounter += $maxParallel)
{
$requestQueue = array_slice($this->request, $progressCounter, $maxParallel, true);
// start adding the requests to the pool
foreach ($requestQueue as $key => $request)
{
$errno = $errstr = false;
$sockets[$key] = stream_socket_client($this->protocol.'://' . $request['path'], &$errno, &$errstr, $this->timeout, self::CONNECT|self::ASYNC_CONNECT);
if (!$sockets[$key])
{
unset($sockets[$key]);
continue;
}
else
{
stream_set_timeout($sockets[$key], $this->timeout);
if ($this->isSSL)
{
stream_socket_enable_crypto($sockets[$key], true, STREAM_CRYPTO_METHOD_SSLv23_CLIENT);
}
}
}
// we have opened connections, so do some stuff
while (count($sockets))
{
$read = $write = $sockets;
$e = null;
$n = stream_select($read, $write, $e, $this->timeout);
if($n < 1)
{
break;
}
// iterate and read from sockets ...
foreach ($read as $r)
{
$key = array_search($r, $sockets);
$info[$key] = socket_get_status($sockets[$key]);
$buffer[$key] = '';
while (!feof($sockets[$key]) && !$info[$key]['timed_out'])
{
$sRead = fgets($sockets[$key], $this->readbuffer);
if (strlen($sRead) == 0)
{
break;
}
$buffer[$key].= $sRead;
$info[$key] = socket_get_status($sockets[$key]);
}
// we have a custom callback, so do something other with result and return only the result of the custom method.
if (isset($requestQueue[$key]['cust']))
{
if (method_exists( $this, $requestQueue[$key]['cust']))
{
$result = call_user_func_array( array($this, $requestQueue[$key]['cust']),
array('config' => $requestQueue[$key],
'data' => $buffer[$key]));
unset($buffer[$key]);
$buffer[$key] = $result;
} else
{
throw new Exception('Fatal: Custom Callback: '.$requestQueue[$key]['cust'].' not found!');
}
}
// the socket is done, so we can close him.
fclose($sockets[$key]);
unset($sockets[$key]);
}
// iterate and write through active sockets ...
foreach ($write as $w)
{
$key = array_search($w, $sockets);
if (isset($writehistory[$key]))
{
continue;
}
if (isset($sockets[$key]) && is_resource($sockets[$key]))
{
fputs($sockets[$key], $requestQueue[$key]['data']);
}
$writehistory[$key] = true;
}
}
}
return $buffer;
} else
{
throw new Exception('nothing to do');
}
}
/**
* Clear the interal request handler
*/
public function clearRequest()
{
$this->request = array();
}
}