<?php
class Analytics_SocketConsumer extends Analytics_QueueConsumer {
protected $type = "Socket";
private $socket_failed;
/**
* Creates a new socket consumer for dispatching async requests immediately
* @param string $secret
* @param array $options
* number "timeout" - the timeout for connecting
* function "error_handler" - function called back on errors.
* boolean "debug" - whether to use debug output, wait for response.
*/
public function __construct($secret, $options = array()) {
if (!isset($options["timeout"]))
$options["timeout"] = 0.5;
if (!isset($options["host"]))
$options["host"] = "api.segment.io";
parent::__construct($secret, $options);
}
/**
* Tracks a user action
* @param string $user_id user id string
* @param string $event name of the event
* @param array $properties properties associated with the event
* @param string $timestamp iso8601 of the timestamp
* @return boolean whether the track call succeeded
*/
public function track($user_id, $event, $properties, $context, $timestamp) {
$body = array(
"secret" => $this->secret,
"userId" => $user_id,
"event" => $event,
"properties" => $properties,
"timestamp" => $timestamp,
"context" => $context,
"action" => "track"
);
return $this->enqueue($body);
}
/**
* Tags traits about the user.
* @param string $user_id
* @param array $traits
* @param string $timestamp iso8601 of the timestamp
* @return boolean whether the track call succeeded
*/
public function identify($user_id, $traits, $context, $timestamp) {
$body = array(
"secret" => $this->secret,
"userId" => $user_id,
"traits" => $traits,
"context" => $context,
"timestamp" => $timestamp,
"action" => "identify"
);
return $this->enqueue($body);
}
public function flushBatch($batch) {
$socket = $this->createSocket();
if (!$socket)
return;
$payload = array("secret" => $this->secret,
"batch" => $batch );
$payload = json_encode($payload);
$body = $this->createBody($this->options["host"], $payload);
return $this->makeRequest($socket, $body);
}
private function createSocket() {
if ($this->socket_failed)
return false;
$protocol = $this->ssl() ? "ssl" : "tcp";
$host = $this->options["host"];
$port = $this->ssl() ? 443 : 80;
$timeout = $this->options["timeout"];
try {
# Open our socket to the API Server.
$socket = pfsockopen($protocol . "://" . $host, $port, $errno,
$errstr, $timeout);
# If we couldn't open the socket, handle the error.
if ($errno != 0) {
$this->handleError($errno, $errstr);
$this->socket_failed = true;
return false;
}
return $socket;
} catch (Exception $e) {
$this->handleError($e->getCode(), $e->getMessage());
$this->socket_failed = true;
return false;
}
}
/**
* Attempt to write the request to the socket, wait for response if debug
* mode is enabled.
* @param stream $socket the handle for the socket
* @param string $req request body
* @return boolean $success
*/
private function makeRequest($socket, $req, $retry = true) {
$bytes_written = 0;
$bytes_total = strlen($req);
$closed = false;
# Write the request
while (!$closed && $bytes_written < $bytes_total) {
$written = fwrite($socket, $req);
if (!$written) {
$closed = true;
} else {
$bytes_written += $written;
}
}
# If the socket has been closed, attempt to retry a single time.
if ($closed) {
fclose($socket);
if ($retry) {
$socket = $this->createSocket();
if ($socket) return $this->makeRequest($socket, $req, false);
}
return false;
}
$success = true;
if ($this->debug()) {
$res = $this->parseResponse(fread($socket, 2048));
if ($res["status"] != "200") {
$this->handleError($res["status"], $res["message"]);
$success = false;
}
}
return $success;
}
/**
* Create the body to send as the post request.
* @param string $host
* @param string $content
* @return string body
*/
private function createBody($host, $content) {
$req = "";
$req.= "POST /v1/import HTTP/1.1\r\n";
$req.= "Host: " . $host . "\r\n";
$req.= "Content-Type: application/json\r\n";
$req.= "Accept: application/json\r\n";
$req.= "Content-length: " . strlen($content) . "\r\n";
$req.= "\r\n";
$req.= $content;
return $req;
}
/**
* Parse our response from the server, check header and body.
* @param string $res
* @return array
* string $status HTTP code, e.g. "200"
* string $message JSON response from the api
*/
private function parseResponse($res) {
$contents = explode("\n", $res);
# Response comes back as HTTP/1.1 200 OK
# Final line contains HTTP response.
$status = explode(" ", $contents[0], 3);
$result = $contents[count($contents) - 1];
return array(
"status" => $status[1],
"message" => $result
);
}
}