Location: PHPKode > projects > StreamOnTheFly > node/code/classes/sotf_Neighbour.class.php
<?php // -*- tab-width: 3; indent-tabs-mode: 1; -*-

/* 
 * $Id: sotf_Neighbour.class.php,v 1.37 2005/01/18 16:43:01 micsik Exp $
 *
 * Created for the StreamOnTheFly project (IST-2001-32226)
 * Authors: András Micsik, Máté Pataki, Tamás Déri 
 *				at MTA SZTAKI DSD, http://dsd.sztaki.hu
 */

require_once($config['classdir'] . "/rpc_Utils.class.php");

/**
 * A neighbour of the local node, stored as one row of the sotf_neighbours
 * table, together with the XML-RPC synchronisation logic used to exchange
 * modified objects with it.
 *
 * The class is written in PHP 4 style (var properties, old-style
 * constructor, static methods without the `static` keyword) because it
 * chains to the old-style sotf_Object constructor.
 */
class sotf_Neighbour extends sotf_Object {

	/** Name of the database table backing this class. */
	var $tablename = 'sotf_neighbours';

	/** Maximum number of modified objects requested per XML-RPC call in sync(). */
	var $objectsPerRPCRequest = 20;

	/**
	 * Constructor; forwards to the sotf_Object constructor with the table
	 * name fixed to 'sotf_neighbours'.
	 *
	 * @param mixed $id   passed through to sotf_Object
	 * @param mixed $data passed through to sotf_Object
	 */
	function sotf_Neighbour($id='', $data='') {
		$this->sotf_Object('sotf_neighbours', $id, $data);
	}

	/**
	 * Deletes this neighbour. Inside a single transaction it first calls
	 * sotf_NodeObject::nodeLeavingNetwork() with the neighbour's node id and
	 * then the inherited delete().
	 */
	function delete() {
		global $db;
		$db->begin();
		sotf_NodeObject::nodeLeavingNetwork($this->get('node_id'));
		parent::delete();
		$db->commit();
	}

	/**
	 * Looks up the neighbour entry belonging to a node id.
	 *
	 * NOTE(review): $nodeId is interpolated into the SQL text unescaped;
	 * callers must pass a validated id. Confirm against the callers and the
	 * escaping facilities offered by the $db wrapper.
	 *
	 * @method static getById
	 * @param mixed $nodeId value compared with sotf_neighbours.node_id
	 * @return sotf_Neighbour|null the neighbour, or null when the query yields no row
	 */
	function getById($nodeId) {
		global $db;
		$data = $db->getRow("SELECT * FROM sotf_neighbours WHERE node_id = '$nodeId'");
		if(!$data) {
			//logError('no such neighbour: $nodeId');
			return null;
		}
		return new sotf_Neighbour($data['id'], $data);
	}

	/**
	 * Returns the number of sotf_neighbours rows with the given node id
	 * (usable as a boolean: zero means "not a neighbour").
	 *
	 * NOTE(review): $nodeId is interpolated into the SQL text unescaped;
	 * see getById().
	 *
	 * @method static isNeighbour
	 * @param mixed $nodeId value compared with sotf_neighbours.node_id
	 */
	function isNeighbour($nodeId) {
		global $db;
		return $db->getOne("SELECT count(*) FROM sotf_neighbours WHERE node_id = '$nodeId'");
	}

	/**
	 * Returns a list of all neighbour objects ordered by id: can be slow!!
	 *
	 * @method static listAll
	 * @return array list of sotf_Neighbour objects
	 */
	function listAll() {
		global $db;
		$rows = $db->getAll("SELECT * FROM sotf_neighbours ORDER BY id");
		if(DB::isError($rows))
			raiseError($rows);
		$neighbours = array();
		foreach($rows as $row) {
			$neighbours[] = new sotf_Neighbour($row['id'], $row);
		}
		return $neighbours;
	}

	/**
	 * Returns the node ids of all neighbours ordered by node id.
	 * The result is cached in a function-static variable for the rest of the
	 * request; an empty result is not cached and is queried again.
	 *
	 * static
	 */
	function listIds() {
		global $db;
		static $list;
		if(!$list)
			$list = $db->getCol("SELECT node_id FROM sotf_neighbours ORDER BY node_id");
		return $list;
	}

	/**
	 * Returns whatever sotf_Node::getNodeById() yields for this neighbour's
	 * node id.
	 */
	function getNode() {
		return sotf_Node::getNodeById($this->get('node_id'));
	}

	/**
	 * Returns the URL of the neighbour: the 'url' of its node object when
	 * that object is available, otherwise the neighbour's 'pending_url'.
	 */
	function getUrl() {
		$remoteNode = $this->getNode();
		if($remoteNode)
			return $remoteNode->get('url');
		return $this->get('pending_url');
	}

	/**
	 * Passes $msg to debug() and, when $console is true, also adds it to the
	 * status messages of the global $page.
	 *
	 * private
	 */
	function log($console, $msg) {
		global $page;
		debug($msg);
		if($console)
			$page->addStatusMsg($msg);
	}

	/**
	 * Sends the locally modified objects to this neighbour via XML-RPC
	 * ('sotf.sync'), at most $objectsPerRPCRequest objects per call, and
	 * records the outcome in the neighbour and local node statistics.
	 *
	 * Each chunk is processed in its own transaction, which is committed
	 * after a successful call and rolled back when the call returns null.
	 * In the latter case the neighbour's 'errors' counter is incremented and
	 * the method returns without touching the success statistics.
	 *
	 * @param bool $console when true, the sync is performed even if the
	 *                      neighbour is not flagged 'use_for_outgoing'
	 */
	function sync($console = false) {
		global $db, $config;

		$remoteId = $this->get('node_id');
		if(!$console && $this->get('use_for_outgoing') != 't') {
			debug("node $remoteId is not used for outgoing sync");
			return;
		}
		debug("SYNCing with ", $remoteId);

		$rpc = new rpc_Utils;
		if($config['debug'])
			$rpc->debug = true;
		$timestamp = $db->getTimestampTz();
		// strip trailing '/' so the request path can be appended below
		$url = rtrim($this->getUrl(), '/');

		// collect local data to send
		$localNode = sotf_Node::getLocalNode();
		// make sure the stored url of the local node matches the configuration
		if($localNode->get('url') != $config['rootUrl']) {
			$localNode->set('url', $config['rootUrl']);
			$localNode->update();
		}
		$localNodeData = $localNode->getAll();

		// do XML-RPC conversation, one chunk per loop iteration
		$thisChunk = 1;
		$objectsSent = 0;
		$more = sotf_NodeObject::countModifiedObjects($remoteId);
		if(!$more)
			debug("No new objects to send");
		while($more) {
			$db->begin(true);
			$modifiedObjects = sotf_NodeObject::getModifiedObjects($remoteId, $this->objectsPerRPCRequest);
			$chunkSize = count($modifiedObjects);
			$remaining = sotf_NodeObject::countModifiedObjects($remoteId);
			if($chunkSize == 0 && $remaining > 0) {
				logError("DATA integrity problem", "$remaining objects remained in sotf_object_status after sync");
			}
			// continue only while something is left and this chunk was not empty
			$more = ($remaining != 0 && $chunkSize != 0);
			$chunkInfo = array('this_chunk' => $thisChunk,
									 'node' => $localNodeData,
									 'objects_remaining' => $remaining
									 );
			debug("chunk info", $chunkInfo);
			debug("number of sent objects", $chunkSize);
			$objectsSent = $objectsSent + $chunkSize;
			$objs = array($chunkInfo, $modifiedObjects);
			$response = $rpc->call($url . "/xmlrpcServer.php/sync/$thisChunk", 'sotf.sync', $objs);
			// error handling
			if(is_null($response)) {
				// Roll back the chunk's transaction BEFORE saving the error
				// counter; updating first would let the rollback discard it.
				$db->rollback();
				$this->set('errors', $this->get('errors')+1);
				$this->update();
				return;
			}
			$db->commit();
			// the reply is only logged
			$replyInfo = $response[0];
			debug("replyInfo", $replyInfo);
			$thisChunk++;
		}

		debug("total number of objects sent",$objectsSent );
		//$this->log($console, "number of updated objects: " .count($updatedObjects));

		// save node and neighbour stats
		$this->set('success', $this->get('success')+1);
		$this->set('last_sync_out', $timestamp);
		$localNode->set('last_sync_out', $timestamp);
		// take out from pending nodes
		if($this->get('pending_url')) {
			$remoteNode = sotf_Node::getNodeById($remoteId);
			// TODO: problem is that if this is the first sync or a one-way
			// connection, then the object for the remote node may not exist
			if($remoteNode) {
				$this->set('pending_url','');
			}
			$localNode->set('neighbours', $this->getNeighbourString());
		}
		$this->update();
		$localNode->update();
	}

	/**
	 * Handles one incoming sync chunk from this neighbour: saves the
	 * received objects and, on the last chunk ('objects_remaining' == 0),
	 * updates the 'last_sync_in' statistics and clears a pending url.
	 *
	 * NOTE(review): the transaction is committed unconditionally although
	 * the inline comment asks not to commit on a db error; how
	 * saveModifiedObjects() reports errors is not visible here.
	 *
	 * @param array $chunkInfo chunk description; only 'objects_remaining' is read
	 * @param array $objects   objects handed to sotf_NodeObject::saveModifiedObjects()
	 * @return array one-element array holding array('received' => ..., 'updated' => ...)
	 */
	function syncResponse($chunkInfo, $objects) {
		global $db;

		$timestamp = $db->getTimestampTz();
		$remoteId = $this->get('node_id');
		// save modified objects
		$db->begin(true);
		$updatedObjects = sotf_NodeObject::saveModifiedObjects($objects, $remoteId);
		// if db error: don't commit!
		$db->commit();
		debug("number of updated objects", $updatedObjects);
		$replyInfo = array('received' => count($objects),
								 'updated' => $updatedObjects);

		if($chunkInfo['objects_remaining'] == 0) {
			// last chunk, save node and neighbour stats
			$node = sotf_Node::getLocalNode();
			$this->set('last_sync_in', $timestamp);
			$node->set('last_sync_in', $timestamp);
			// take out from pending nodes, update neighbour list
			if($this->get('pending_url')) {
				$this->set('pending_url','');
				$node->set('neighbours', $this->getNeighbourString());
			}
			$this->update();
			$node->update();
			//$replyInfo['node'] = $node->getAll();
		}
		return array($replyInfo);
	}

	/**
	 * Builds the comma-separated neighbour list: one entry per neighbour
	 * (in listAll() order), consisting of the node id followed by 'i' when
	 * 'accept_incoming' is set and 'o' when 'use_for_outgoing' is set.
	 *
	 * @return string the list; an empty string when there are no neighbours
	 */
	function getNeighbourString() {
		$neighbours = sotf_Neighbour::listAll();
		$entries = array();
		foreach($neighbours as $nei) {
			$entry = $nei->get('node_id');
			if($nei->getBool('accept_incoming'))
				$entry .= 'i';
			if($nei->getBool('use_for_outgoing'))
				$entry .= 'o';
			$entries[] = $entry;
		}
		return implode(',', $entries);
	}

}

?>
Return current item: StreamOnTheFly