Location: PHPKode > projects > StreamOnTheFly > node/code/classes/sotf_Node.class.php
<?php // -*- tab-width: 3; indent-tabs-mode: 1; -*-

/*
 * $Id: sotf_Node.class.php,v 1.19 2005/02/01 10:06:18 micsik Exp $
 *
 * Created for the StreamOnTheFly project (IST-2001-32226)
 * Authors: András Micsik, Máté Pataki, Tamás Déri
 *					at MTA SZTAKI DSD, http://dsd.sztaki.hu
 */

class sotf_Node extends sotf_NodeObject {

	var $tablename = 'sotf_nodes';

	function sotf_Node($id='', $data='') {
		$this->sotf_NodeObject($this->tablename, $id, $data);
	}

	/** 
	 * @method static getNodeById
	 */
	function getNodeById($nodeId) {
		global $db;
		$id = $db->getOne("SELECT id FROM sotf_nodes WHERE node_id = '$nodeId'");
		if(DB::isError($id))
			raiseError($id);
		if($id)
			return new sotf_Node($id);
		else
			return NULL;
	}

	/** 
	 * @method static getLocalNode
	 */
	function getLocalNode() {
		global $db, $config;
		return sotf_Node::getNodeById($config['nodeId']);
	}

	/** static */
	function redirectToHomeNode($obj, $script) {
	  global $page;

	  $url = sotf_Node::getHomeNodeRootUrl($obj);
	  $oldParams = substr(strstr(myGetenv("REQUEST_URI"), '.php'), 4);
	  $url = $url . "/$script" . $oldParams;
	  $page->redirect($url);
	  exit;
	}

	/** static */
	function getHomeNodeRootUrl($obj) {
	  if($obj->isLocal()) {
		 global $config;
		 return $config['rootUrl'];
	  } else {
		 $node = sotf_Node::getNodeById($obj->getNodeId());
		 if(!$node) {
			raiseError("Could not find home node for programme: " . $obj->id);
		 }
		 return $node->get('url');
	  }
	}

	/** returns a list of all such objects: can be slow!!
	 * @method static listAll
	 */
	function listAll() {
		global $db;
		$sql = "SELECT * FROM sotf_nodes ORDER BY name";
		$res = $db->getAll($sql);
		if(DB::isError($res))
			raiseError($res);
		$slist = array();
		foreach($res as $st) {
			$slist[] = new sotf_Node($st['id'], $st);
		}
		return $slist;
	}

	/** 
	 * @method static countAll
	 */
	function countAll() {
		global $db;
		return $db->getOne("SELECT count(*) FROM sotf_nodes");
	}

  /**************************************************
	*
	*					  MESSAGE FORWARD SUPPORT
	*
	**************************************************/

	var $objectsPerRPCRequest = 50;
	
	function forwardObjects() {
	  global $db, $config;
	  
	  global $page;
	  if(!$console && $this->getBool('use_for_outgoing')) {
		 debug("node $this->id is not used for outgoing sync");
		 return;
	  }
	  debug("FORWARDING TO ", $this->get("node_id"));

	  $rpc = new rpc_Utils;
	  if($config['debug'])
		 $rpc->debug = true;
	  $timestamp = $db->getTimestampTz();
	  $remoteId = $this->get('node_id');
	  $url = $this->get('url');
	  // remove trailing '/'
	  while(substr($url, -1) == '/')
		 $url = substr($url, 0, -1);
	  // calculate chunking
	  $thisChunk = 1;
	  // do XML-RPC conversation
	  $objectsSent = 0;
	  $more = sotf_NodeObject::countForwardObjects($remoteId);
	  if(!$more)
		 debug("No new objects to send");
	  while($more) {
		 $db->begin(true);
		 $modifiedObjects = sotf_NodeObject::getForwardObjects($remoteId, $this->objectsPerRPCRequest);
		 $more = sotf_NodeObject::countForwardObjects($remoteId);
		 $chunkInfo = array('this_chunk' => $thisChunk,
								  'from_node' => $config['nodeId'],
								  'objects_remaining' => $more
								 );
		 debug("chunk info", $chunkInfo);
		 debug("number of sent objects", count($modifiedObjects));
		 $objectsSent = $objectsSent + count($modifiedObjects);
		 $objs = array($chunkInfo, $modifiedObjects);
		 $response = $rpc->call($url . "/xmlrpcServer.php/forward/$thisChunk", 'sotf.forward', $objs);
		 // error handling
		 if(is_null($response)) {
			$db->rollback();
			return;
		 }
		 $db->commit();
		 $replyInfo = $response[0];
		 debug("replyInfo", $replyInfo);
		 $thisChunk++;
	  }

	  debug("total number of objects sent",$objectsSent );
	  //$this->log($console, "number of updated objects: " .count($updatedObjects));
	}

  function forwardResponse($chunkInfo, $objects) {
	 global $db;

	 $remoteId = $this->get('node_id');
	 // save modified objects
	 $db->begin(true);
	 $updatedObjects = sotf_NodeObject::saveForwardObjects($objects);
	 // if db error: don't commit!
	 $db->commit();
	 debug("number of updated objects", $updatedObjects);
	 $replyInfo = array('received' => count($objects),
							  'updated' => $updatedObjects);

	 if($chunkInfo['objects_remaining'] == 0) {
		// last chunk, do something useful!!
		sotf_Object::doUpdates();
	 }
	 return array($replyInfo);
  }

}

?>
Return current item: StreamOnTheFly