Location: PHPKode > scripts > Shard-Query > shard-query/shard_query.php
#!/usr/bin/php
<?php
/*
Copyright (c) <year>, <copyright holder>
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
    * Neither the name of the <organization> nor the
      names of its contributors may be used to endorse or promote products
      derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

/*This script requires PEAR Net_Gearman */
/*It also requires Console_Getopt, but this should be installed by default with pear */
require_once 'Net/Gearman/Client.php';
require_once 'Console/Getopt.php';
# Script-wide settings; main() may switch these on/off from the command line
# (--verbose sets $verbose, --method=fetch switches $fetch_method).
$verbose = false;
$fetch_method = 'store_resultset';

# Run the script, then stop with a success status.
main();
exit(0);

/*
 * Script entry point.
 *
 * Parses the command line (printing usage and exiting for --help), loads the
 * shard map via get_shards() and randomly picks one shard ($tmp_shard) that
 * will host the aggregation table.  It then reads SQL from --file (stdin when
 * omitted) and passes every complete statement to process_sql().
 *
 * Input lines are trimmed and joined with single spaces; a statement is
 * complete when the accumulated text ends in ';' or when input ends.
 */
function main() {
	global $tmp_shard;
	global $row_count;
	global $verbose;
	global $fetch_method;

	$row_count = 0;
	$params = get_commandline();

	if(array_key_exists('help', $params)) {
		echo "Shard-Query usage:\n";
		echo "--help		This message\n";
		echo "--ini		Full path to the .ini file used for shard information.  Defaults to shards.ini in the working directory\n";
		echo "--verbose		Print verbose information about the work being done\n";
		echo "--file		Full path to the file to read queries from.  If not specified, then queries are read from stdin\n";
		echo "--user		username to use by default\n";
		echo "--password	The password to use by default\n";
		echo "--host		The host to use by default\n";
		echo "--db		The default DB for queries not uses schema.table syntax.  Also where temp table is written.\n";
		echo "--method=fetch|store	If --method=fetch then each worker will fetch the results and return them as an array.\n";
		echo "				If --method=store then each worker will insert results directly into one of the shards.\n";
		echo "                          The default method is store, and only up to max_allowed_packet bytes will be buffered in the worker.\n";
		echo "\n\nNOTE:\n";
		echo "Command line parameters will override any parameters specified in the [default] section of the INI file!\n";
		echo "Any parameters in a particular [shard] section override any default parameters.\n";
		echo "The host parameter doesn't really make any sense, but it is there for completeness\n";
		exit;
	}
	if(array_key_exists('verbose', $params)) {
		$verbose = true;
	}
	$shards = get_shards();

	/* Pick one of the shards to aggregate all of the matching values from each
	   of the shards.  Note that all conditions are "pushed down" to the shards
	   so the number of rows returned by each shard should hopefully be a manageable
	   amount.  The rows are stuffed into a temporary table.
	*/
	$shard_list = array_values($shards);
	$tmp_shard = $shard_list[mt_rand(0, count($shard_list) - 1)];

	if(empty($params['file'])) {
		$params['file'] = 'php://stdin';
	}
	if(!empty($params['method']) && $params['method'] == 'fetch') {
		$fetch_method = 'fetch_resultset';
	}

	$fh = fopen($params['file'], 'r');
	if(!$fh) {
		die('Could not open file: ' . $params['file'] . "\n");
	}

	# Accumulate lines until a statement is complete.  The buffer is only
	# cleared after a statement has been handed to process_sql(), so a
	# statement spanning several lines is kept together.
	$sql = "";
	while(($line = fgets($fh)) !== false) {
		if($sql !== "") $sql .= " ";
		$sql .= trim($line);
		if(feof($fh) || substr($sql, -1) == ";") {
			# skip blank/whitespace-only input instead of broadcasting empty SQL
			if(trim($sql) !== "") process_sql($sql, $shards);
			$sql = "";
		}
	}
	# feof() may only become true on the fgets() call that returns false, so a
	# last statement without a trailing ';' can still be sitting in the buffer.
	if(trim($sql) !== "") {
		process_sql($sql, $shards);
	}
	fclose($fh);
}

/*
 * Parses the process arguments with PEAR Console_Getopt.
 *
 * Recognised long options: --user, --password, --host, --db, --ini, --file,
 * --method (all take a value) and --help, --verbose (optional value).  The
 * short options -h and -v are reported under the names 'help' and 'verbose'.
 *
 * @return array Option name (without leading dashes) => value.  Options given
 *               without a value keep whatever value Console_Getopt reports
 *               for them (presumably null), so callers should test presence
 *               with array_key_exists() rather than empty().
 * Exits with status 1 when the arguments cannot be read or parsed.
 */
function &get_commandline() {
	$cg = new Console_Getopt();
	$args = $cg->readPHPArgv();
	if (PEAR::isError($args)) {
		echo 'Error: ' . $args->getMessage() . "\n";
		exit(1);
	}
	array_shift($args); # drop the script name

	$shortOpts = 'h::v::';
	# --ini and --file are documented in the --help text and read by the
	# callers, so they must be declared here or getopt2() rejects them.
	$longOpts  = array('user=', 'password=', 'host=', 'db=', 'ini=', 'file=', 'help==', 'verbose==', 'method=');
	$aliases   = array('h' => 'help', 'v' => 'verbose');

	$params = $cg->getopt2($args, $shortOpts, $longOpts);
	if (PEAR::isError($params)) {
		echo 'Error: ' . $params->getMessage() . "\n";
		exit(1);
	}

	$new_params = array();
	foreach ($params[0] as $param) {
		$name = $param[0];
		if (substr($name, 0, 2) == '--') {
			$name = substr($name, 2);
		}
		if (isset($aliases[$name])) {
			$name = $aliases[$name];
		}
		$new_params[$name] = $param[1];
	}
	return $new_params;
}

/*
 * Loads the shard map from the INI file named by --ini (default: shards.ini
 * in the working directory).
 *
 * Every INI section except [default] describes one shard.  The default
 * settings come from a non-empty [default] section, or otherwise from
 * host=127.0.0.1, user=root, db=test and an empty password.  Command-line
 * --host/--user/--password/--db override those defaults, and the result is
 * copied into every shard that leaves the corresponding key empty.
 *
 * @return array Shard settings keyed by INI section name.
 * Dies if the file is missing; exits with status 1 if fewer than two shards
 * are defined or the file cannot be parsed.
 */
function get_shards() {
	$params = get_commandline();

	$filename = !empty($params['ini']) ? $params['ini'] : 'shards.ini';
	if(!file_exists($filename)) {
		die("Could not find settings file: $filename\n");
	}
	$shards = @parse_ini_file($filename, true);

	# [default] holds shared settings, not a shard, so it never counts towards
	# the two-shard minimum - not even when the section is empty.
	$has_default = is_array($shards) && array_key_exists('default', $shards);
	$shard_count = is_array($shards) ? count($shards) - ($has_default ? 1 : 0) : 0;
	if($shard_count < 2) {
		echo "Found less than two shards in $filename, or $filename could not be parsed.\n";
		print_r($shards);
		exit(1);
	}

	if($has_default && !empty($shards['default'])) {
		$defaults = $shards['default'];
	} else {
		$defaults = array('host' => '127.0.0.1', 'user'=> 'root', 'db' => 'test','password' => '');
	}
	if($has_default) {
		unset($shards['default']);
	}

	# Command-line connection settings win over the defaults, even when the
	# [default] section does not mention that key at all.
	$overridable = array_unique(array_merge(array_keys($defaults), array('host', 'user', 'password', 'db')));
	foreach($overridable as $key) {
		if(!empty($params[$key])) {
			$defaults[$key] = $params[$key];
		}
	}

	# populate missing values with the defaults, in case any INI sections are missing things like 'db'
	foreach(array_keys($shards) as $name) {
		foreach($defaults as $key => $val) {
			if(empty($shards[$name][$key])) {
				$shards[$name][$key] = $val;
			}
		}
	}
	return $shards;
}

/*
 * Tokenizer used by process_sql().
 *
 * Splits $sql on bare words, parenthesised groups, quoted strings and
 * identifiers, and any other run of characters that are neither spaces nor
 * commas - keeping those matches as tokens (PREG_SPLIT_DELIM_CAPTURE).  The
 * text left between matches (spaces and commas) is returned as tokens of
 * its own; empty pieces are dropped.
 *
 * Note: the parenthesis pattern is non-greedy, so nested parentheses such
 * as "f(g(x))" do not end up in a single token.
 *
 * @param string $sql  One SQL statement.
 * @return array List of token strings in input order.
 */
function split_sql($sql) {
	$pattern = <<<EOREGEX
/([A-Za-z0-9_]+)
|(\(.*?\))   # Match FUNCTION(...) OR BAREWORDS
|("[^"](?:|\"|"")*?"+)
|('[^'](?:|\'|'')*?'+)
|(`(?:[^`]|``)*`+)
|([^ ,]+)
/x
EOREGEX;
	$flags = PREG_SPLIT_DELIM_CAPTURE | PREG_SPLIT_NO_EMPTY;

	return preg_split($pattern, $sql, -1, $flags);
}

/*
 * Runs one SQL statement across all shards and prints the aggregated result.
 *
 * Step 1: split_sql() tokenizes $sql and the token loop builds two rewrites:
 *   - $shard_sql, broadcast to every shard.  Aggregate function names
 *     (SUM, COUNT, ...) are dropped so the shards return the raw argument
 *     expressions, the GROUP/BY keywords are not forwarded, the HAVING clause
 *     is withheld, and first occurrences of select-list expressions are
 *     aliased to backtick-quoted names.
 *   - $magic_sql, run afterwards on $tmp_shard.  It re-applies the original
 *     aggregates to those aliased columns, reads FROM a per-statement table
 *     named aggregation_tmp_#<random>, and omits the WHERE conditions
 *     (they were already applied on the shards).
 * Step 2: one Gearman task per shard (function name = $fetch_method) runs
 *   $shard_sql; collect_rowset() is the completion callback and
 *   handle_failure() the failure callback.  runSet() blocks until every
 *   task has finished.
 * Step 3: $magic_sql is executed on $tmp_shard, the rows are printed
 *   tab-separated below a header line, and the aggregation table is dropped.
 *
 * @param string $sql     One complete SQL statement.
 * @param array  $shards  Shard connection settings keyed by INI section name.
 */
function process_sql($sql, $shards) {
	global $table_name;
	global $shard_result_count;
	global $tmp_shard;
	global $verbose;
	global $fetch_method;
	global $shown_temp_table_create;
	$shown_temp_table_create = false;

	$tokens = split_sql($sql);

	$shard_sql = ""; #identical SQL which will be broadcast to all shards
	$prev_token = ""; #Only set for FROM, GROUP, ORDER, WHERE and HAVING tokens;
	                  #the clauses after those keywords get special treatment
	                  #depending on whether the SQL goes to every shard or into the magic sql
	$magic_sql = ""; #the magic SQL variable
	$uniqueExpressions = array(); #we might have sum(c1), count(c1).  only push down (c1) once.

	$skip_next = false;
	foreach($tokens as $token) {
		$magic_token = ""; # the token as it will be sent to tmp_shard
		$new_token = "";   # the token as it will be sent to each shard

		if($skip_next) {
			if(trim($token) == "") {
				continue;
			}
			$new_token = "";
			$skip_next = false;
		}

		#Handle supported SQL syntax
		switch(trim(strtoupper($token))) {
			case 'GROUP':
				$prev_token = 'GROUP';
				$new_token = " ";
				break;
			case 'ORDER':
				$prev_token = 'ORDER';
				$new_token = $token;
				break;
			case 'BY':
				if($prev_token != 'ORDER') {
					$skip_next = true;
					$new_token = " ";
				} else {
					$new_token = $token;
				}
				break;
			case '(*)':
				# COUNT(*): the shards return the constant 1, the magic sql sees (0)
				$token = "(0)";
				$new_token = "1";
				break;

			# aggregate functions are only evaluated by the magic sql
			case 'SUM':
			case 'AVG':
			case 'COUNT':
			case 'MIN':
			case 'MAX':
			case 'GROUP_CONCAT':
			case 'STDDEV':
			case 'BIT_AND':
			case 'BIT_OR':
			case 'BIT_XOR':
			case 'STD':
			case 'STDDEV_SAMP':
			case 'STDDEV_POP':
			case 'VARIANCE':
			case 'VAR_SAMP':
			case 'VAR_POP':
				$new_token = "";
				break;

			case 'HAVING':
				$new_token = $token;
				$prev_token = "HAVING";
				break;

			case 'FROM':
				$new_token = $token;
				$prev_token = "FROM";
				break;

			case 'WHERE':
				$new_token = $token;
				$prev_token = "WHERE";
				break;

			case 'AS':
				$skip_next = true;
				break;

			#operators are passed through unchanged
			case 'AND':
			case '&&':
			case 'BETWEEN':
			case 'BINARY':
			case '&':
			case '~':
			case '|':
			case '^':
			case 'CASE':
			case 'WHEN':
			case 'DIV':
			case '/':
			case '<=>':
			case '=':
			case '>=':
			case '>':
			case 'IS':
			case 'NOT':
			case 'NULL':
			case '<<':
			case '<=':
			case '<':
			case 'LIKE':
			case '-':
			case '%':
			case '!=':
			case '<>':
			case 'REGEXP':
			case '!':
			case '||':
			case 'OR':
			case '+':
			case '>>':
			case 'RLIKE':
			case 'SOUNDS':
			case '*':
			case 'XOR':
				$new_token = $token;
				break;

			#separators and statement keywords are passed through unchanged
			case '':
			case 'SELECT':
			case 'LIMIT':
			case ',':
			case ';':
				$new_token = $token;
				break;

			default:
				if(trim($token)) {
					$magic_token = '`' . str_replace('`', '``', $token) . '`';
					if(empty($uniqueExpressions[$token])) {
						if($prev_token == "") {
							# select list: alias the expression so the magic sql can reference it
							$new_token = $token . " as $magic_token";
						} else {
							$new_token = $token;
						}
						$uniqueExpressions[$token] = 1;
					} else {
						if($prev_token == '') {
							# already pushed down once; emit a placeholder column
							$new_token = "0";
						} elseif($prev_token != 'GROUP') {
							$new_token = $token;
						}
					}
				}
				break;
		}
		if($prev_token != 'HAVING') {
			$shard_sql .= " " . $new_token;
		}

		# WHERE conditions were already applied on the shards
		if($prev_token == 'WHERE') {
			$magic_token = " ";
		}

		if(strtoupper(trim($token)) == 'FROM') {
			$table_name = "aggregation_tmp_#" . mt_rand(1, 100000000);
			$magic_token = "FROM `$table_name` ";
		} else {
			if($prev_token == 'FROM') {
				$magic_token = "";
				$token = "";
			}
		}

		if($magic_token) {
			if(!$prev_token) {
				$magic_sql .= "($magic_token)";
			} else {
				$magic_sql .= " " . $magic_token;
			}
		} else {
			$magic_sql .= $token;
		}
	}
	if($verbose) {
		echo "-- INPUT SQL: $sql\n";
		echo "-- SQL TO SEND TO SHARDS: $shard_sql\n";
		echo "-- AGGREGATION SQL: $magic_sql\n";
	}

	#run Gearman jobs to collect the resultsets from each shard.
	#every completed job is reported to the collect_rowset callback,
	#which counts the completions in $shard_result_count.
	$shard_result_count = 0; # reset per statement; the counter is global
	$shard_num = 0;
	$set = new Net_Gearman_Set();
	foreach($shards as $shard) {
		$task = new Net_Gearman_Task($fetch_method, array('table_name' => $table_name, 'sql' => $shard_sql, 'shard'=>$shard,'tmp_shard'=>$tmp_shard, 'shard_num' => $shard_num));
		$task->attachCallback('collect_rowset',Net_Gearman_Task::TASK_COMPLETE);
		$task->attachCallback('handle_failure',Net_Gearman_Task::TASK_FAIL);
		$set->addTask($task);
		++$shard_num;
	}

	if($verbose) {
		echo "-- Waiting for workers to complete their jobs\n";
	}
	$client = new Net_Gearman_Client(array('localhost:7000'));
	# runSet() blocks until the set is finished; callbacks only run inside it,
	# so there is nothing left to wait for afterwards - just verify the count.
	$client->runSet($set);
	if($shard_result_count < count($shards)) {
		die("ERROR: only $shard_result_count of " . count($shards) . " shards returned results.   ABORTED.\n");
	}

	$conn = @mysql_connect($tmp_shard['host'], $tmp_shard['user'], $tmp_shard['password']);
	if(!$conn) {
		die(mysql_error() . "\n");
	}
	if(!mysql_select_db($tmp_shard['db'], $conn)) {
		die(mysql_error($conn) . "\n");
	}

	#SEND THE FINAL "magic" SQL
	$stmt = mysql_query($magic_sql, $conn);
	if(!$stmt) die(mysql_error($conn) . "\n");
	$sent_header = false;
	while($row = mysql_fetch_assoc($stmt)) {
		if(!$sent_header) {
			$col_str = join("\t", array_keys($row));
			echo $col_str . "\n";
			echo preg_replace("/[^\t]+/", '-', $col_str) . "\n";
			$sent_header = true;
		}
		echo join("\t", $row) . "\n";
	}
	mysql_free_result($stmt);

	# The table name contains '#', which starts a comment in MySQL, so it must
	# be backtick-quoted here exactly as it is in the magic sql.
	$sql = "DROP TABLE IF EXISTS `" . str_replace('`', '``', $table_name) . "`;";
	mysql_query($sql, $conn) or die(mysql_error($conn) . "\n");
}

/*
 * Gearman TASK_FAIL callback (attached to every task by process_sql()).
 *
 * Dumps the callback argument when --verbose is on, then terminates the
 * whole script with an error message.
 */
function handle_failure($func) {
	global $verbose;
	$message = "ERROR ENCOUNTERED IN A WORKER.   ABORTED.\n";

	if($verbose) print_r($func);

	exit($message);
}


/*
 * Gearman TASK_COMPLETE callback, invoked once per finished shard job.
 *
 * Always increments the global $shard_result_count so the caller can tell
 * how many shards have answered.  With the store method the worker has
 * already inserted its rows, so nothing else happens.  With --method=fetch
 * ($fetch_method == 'fetch_resultset') the rows returned by the worker are
 * copied into the aggregation table on $tmp_shard:
 *   - the table is created on first use (CREATE TEMPORARY TABLE IF NOT
 *     EXISTS) with one BINARY(255) column per result column, skipping the
 *     placeholder column "0";
 *   - each row is inserted with escaped values, and PHP nulls are stored as
 *     SQL NULL so aggregates such as COUNT(col) ignore them.
 *
 * @param mixed $func    Gearman function name (unused).
 * @param mixed $handle  Gearman job handle (unused).
 * @param array $result  Worker result.  For fetch_resultset it is read as
 *                       'err' (null on success) and 'rows' (list of
 *                       associative arrays) - NOTE(review): shape inferred
 *                       from how it is read here; confirm against the worker.
 */
function collect_rowset($func, $handle, &$result) {
	global $shard_result_count;
	global $fetch_method;

	++$shard_result_count;
	if ($fetch_method != 'fetch_resultset') {
		return;
	}

	global $verbose;
	global $tmp_shard;
	global $table_name;
	global $cols;
	global $row_count;
	global $shown_temp_table_create;

	if(isset($result['err']) && $result['err'] != NULL) {
		die('ERROR: ' . $result['err'] . "\n");
	}

	# work on a copy: $result is a reference into the caller's data
	$rowset = (object)$result;
	if(empty($rowset->rows)) {
		return; # nothing to store from this shard
	}

	$conn = @mysql_connect($tmp_shard['host'], $tmp_shard['user'], $tmp_shard['password']);
	if(!$conn) die(mysql_error() . "\n");
	if(!mysql_select_db($tmp_shard['db'], $conn)) die(mysql_error($conn) . "\n");

	# identifiers are quoted the same way the magic sql quotes them (`` for `)
	$quoted_table = '`' . str_replace('`', '``', $table_name) . '`';
	$cols = array_keys(reset($rowset->rows));

	$col_defs = array();
	foreach($cols as $col) {
		if($col == "0") continue; # placeholder for a duplicate expression
		$col_defs[] = '`' . str_replace('`', '``', $col) . '` BINARY(255)';
	}
	$sql = "CREATE TEMPORARY TABLE IF NOT EXISTS $quoted_table (" . join(', ', $col_defs) . ") ENGINE=INNODB";

	if($verbose && !$shown_temp_table_create) {
		echo "-- TEMPORARY_TABLE: $sql\n";
		$shown_temp_table_create = true;
	}
	mysql_query($sql, $conn) or die(mysql_error($conn) . "\n");

	foreach($rowset->rows as $row) {
		$vals = array();
		foreach($cols as $col) {
			if($col == "0") continue;
			$val = array_key_exists($col, $row) ? $row[$col] : null;
			# escape shard data instead of interpolating it raw into the INSERT
			$vals[] = ($val === null) ? 'NULL' : "'" . mysql_real_escape_string($val, $conn) . "'";
		}
		++$row_count;
		$sql = "INSERT INTO $quoted_table VALUES (" . join(', ', $vals) . ")";
		mysql_query($sql, $conn) or die(mysql_error($conn) . "\n");
	}
}


Return current item: Shard-Query