Location: PHPKode > projects > phc > test/framework/lib/async_test.php
<?php
/*
 * phc -- the open source PHP compiler
 * See doc/license/README.license for licensing information
 *
 * Allows a test to run programs asynchronously
 */

// Allow the number of processes to be specified locally

require_once ("test.php");

if (isset($_ENV["PHC_NUM_PROCS"]))
	define ("PHC_NUM_PROCS", (int)($_ENV["PHC_NUM_PROCS"]));
else
	define ("PHC_NUM_PROCS", 1);

#function inst ($string)
#{
#	print "\n" . microtime () . " $string\n";
#}

class AsyncBundle
{
	function __construct ($object, $subject)
	{
		$this->object = $object;
		$this->subject = $subject;
		$this->state = 0;
	}

	function start ()
	{
		$this->object->start_new_program ($this);
	}

	function get_command ()
	{
		return $this->commands[$this->state];
	}

	function get_in ()
	{
		if (isset ($this->ins))
			return $this->ins[$this->state];

		return NULL;
	}

	function continuation ()
	{
#		inst ("continuing from state {$this->state} of {$this->subject}\n");

		$state = $this->state; // state is just an int
		$this->outs[$state] = $this->out; // copy
		$this->errs[$state] = $this->err; // copy
		$this->exits[$state] = $this->exit; // copy
		unset ($this->out);
		unset ($this->err);
		unset ($this->exit);
		$object = $this->object;

		// Handlers let you modify the stdout, stderr or the exit
		// code. They return FALSE for failure.
		if (isset ($this->out_handlers[$state]))
		{
			$handler = $this->out_handlers[$state];
			$result = $object->$handler ($this->outs[$state], $this);
			if ($result === false)
				return;
			$this->outs[$state] = $result;
		}

		if (isset ($this->err_handlers[$state]))
		{
			$handler = $this->err_handlers[$state];
			$result = $object->$handler ($this->errs[$state], $this);
			if ($result === false)
				return;
			$this->errs[$state] = $result;
		}

		if (isset ($this->exit_handlers[$state]))
		{
			$handler = $this->exit_handlers[$state];
			$result = $object->$handler ($this->exits[$state], $this);
			if ($result === false)
				return;
			$this->exits[$state] = $result;
		}

		// The callback doesnt modify, just calls a function given
		// stdout, stderr and exit_code.
		if (isset ($this->callbacks [$state]))
		{
			$callback = $this->callbacks [$state];
			$result = $object->$callback ($this->outs[$state], $this->errs[$state], $this->exits[$state], $this);
			if ($result === false)
				return;
		}


		$this->state += 1;
		$state = $this->state;

		if (isset ($this->commands[$state]))
		{
#			inst ("Start next program: {$this->commands[$state]}");
			$object->start_program ($this);
		}
		elseif (isset ($this->final))
		{
#			inst ("Finalize");
			$object->{$this->final} ($this);
		}
		else
		{
			die ("uhoh\n");
		}
	}

	function read_streams ()
	{
		$this->out .= stream_get_contents ($this->pipes[1]);
		$this->err .= stream_get_contents ($this->pipes[2]);
	}

}

/* In order to take advantage of multiple cores, and to avoid
 * wasting a lot of time waiting for sequential programs to
 * run, we instead try to run programs in parallel.  Once it
 * has started the program, it will check if any programs
 * have finished execution, and if so, it will continue the
 * next part of their test.
 *
 * This also adds a nice abstraction level to the test suite.
 * Nearly all the tests can be described using a very simple
 * data description language, so this provides it.
 */
abstract class AsyncTest extends Test
{
	function __construct ()
	{
		$this->waiting_procs = array ();
		$this->running_procs = array ();
		parent::__construct ();
	}

	function async_failure ($reason, $bundle)
	{
		$this->mark_failure (
					$bundle->subject, 
					$bundle->commands,
					$bundle->outs,
					$bundle->errs,
					$bundle->exits,
					$reason);
	}

	function async_timeout ($reason, $bundle)
	{
		$this->mark_timeout (
					$bundle->subject, 
					$bundle->commands,
					$bundle->outs,
					$bundle->errs,
					$bundle->exits);
	}

	function async_success ($bundle)
	{
		$this->mark_success (
					$bundle->subject, 
					$bundle->commands,
					$bundle->outs,
					$bundle->errs,
					$bundle->exits);
	}

	function async_skipped ($bundle)
	{
		$this->mark_skipped (
					$bundle->subject, 
					$bundle->commands,
					$bundle->outs,
					$bundle->errs,
					$bundle->exits);
	}

	function fail_on_output (&$stream, $bundle)
	{
		if ($stream === 0 or $stream === "")
			return $stream;

		$this->async_failure ("exit or err not clear", $bundle);
		return false;
	}

	# Put this program to the front of the queue, and start waiting progs
	function start_program ($bundle)
	{
		array_unshift ($this->waiting_procs, $bundle);

		$this->check_capacity ();
	}

	# Put this program to the back of the queue, and start waiting progs
	function start_new_program ($bundle)
	{
		$this->waiting_procs[] = $bundle;

		$this->check_capacity ();
	}




	function finish_test ()
	{
		while (count ($this->running_procs) or count ($this->waiting_procs))
		{
			usleep (30000);
			$this->run_waiting_procs ();
			$this->check_running_procs ();
		}
		parent::finish_test ();
	}

	function check_capacity ()
	{
		if (count ($this->running_procs) 
				>= ($this->get_num_procs () / $this->get_phc_num_procs_divisor()))
		{
			// check each and see if we can remove some
			$this->check_running_procs ();
		}

		if (count ($this->waiting_procs) > 1)
		{
			// start some programs
			$this->run_waiting_procs ();
		}
		else
			; // return and carry on, adding programs to the queue

	}

	function check_running_procs ()
	{
		// try to keep this really lightweight
#		inst ("Check running");

		foreach ($this->running_procs as $index => $bundle)
		{
			$status = proc_get_status ($bundle->handle);

			// read the streams in case they block
			$bundle->read_streams ();

			if ($status["running"] !== true)
			{
				$bundle->exit = $status["exitcode"];
				unset ($this->running_procs [$index]); // remove from the running list
				$bundle->continuation (); // start the next bit straight away
			}
			else if ($this->max_time != 0
				&& time () > $bundle->start_time + $this->max_time)
			{
				kill_properly ($bundle->handle, $bundle->pipes);

				$bundle->exits[] = "Timeout";
				$bundle->out .= "\n--- TIMEOUT ---";
				$bundle->outs[] = $bundle->out;
				$bundle->errs[] = $bundle->err;
				$this->async_timeout ("Timeout", $bundle);

				unset ($this->running_procs [$index]); // remove from the running list
			}
			else
			{
				// let it keep running
			}
		}
	}

	function run_waiting_procs ()
	{
		while (count ($this->running_procs) 
				< ($this->get_num_procs () / $this->get_phc_num_procs_divisor()))
		{
#			inst ("Poll waiting");
			if (count ($this->waiting_procs) == 0)
			{
#				inst ("No procs waiting");
				break;
			}

			$bundle = array_shift ($this->waiting_procs);
			$this->run_program ($bundle);
		}
	}

	function run_program ($bundle)
	{
		$command = $bundle->get_command ();
		$command = "ulimit -v 262144 && $command";
		global $opt_verbose;
		if ($opt_verbose)
			print "Running command: $command\n";

#		inst ("Running prog: $command");

		// now start this process and add it to the list
		$descriptorspec = array(1 => array("pipe", "w"),
										2 => array("pipe", "w"));

		$in = $bundle-> get_in ();
		if ($in)
			$descriptorspec[0] = array ("pipe", "r");
		$pipes = array();
		$handle = proc_open ($command, $descriptorspec, &$pipes);
		stream_set_blocking ($pipes[1], 0);
		stream_set_blocking ($pipes[2], 0);
		if ($in)
		{
			fwrite ($pipes[0], $in);
			fclose ($pipes[0]);
		}


		$bundle->handle = $handle;
		$bundle->pipes = $pipes;
		$bundle->start_time = time ();
		$bundle->out = "";
		$bundle->err = "";
		unset ($bundle->exit);

		$this->running_procs[] = $bundle;
	}

	// We use this a lot.
	function two_command_finish ($async)
	{
		if ($async->outs[0] !== $async->outs[1]
			or $async->errs[0] !== $async->errs[1]
			or $async->exits[0] !== $async->exits[1])
		{
			$output = diff ($async->outs[0], $async->outs[1]);
			$async->outs = $output;
			$this->async_failure ("Outputs dont match", $async);
		}
		else
		{
			$this->async_success ($async);
		}
	}

	/* Some tests are very CPU intensive. Overriding this will reduce the
	 * number of tests run at once. */
	function get_phc_num_procs_divisor ()
	{
		return 1;
	}

	function get_num_procs ()
	{
		return PHC_NUM_PROCS;
	}
}

?>
Return current item: phc