<?php
#THIS JOB INSERTS DATA DIRECTLY INTO the tmp_shard
#This job also uses an unbuffered query
require_once 'Net/Gearman/Job.php';
class Net_Gearman_Job_store_resultset extends Net_Gearman_Job_Common
{
public function run($arg) {
$stmt = false;
if(!$arg) return;
$arg = (object)$arg;
$arg->shard = (object)$arg->shard;
$arg->tmp_shard = (object)$arg->tmp_shard;
$conn = false;
$conn = @mysql_connect($arg->shard->host, $arg->shard->user, $arg->shard->password,true);
if(!$conn) throw new Net_Gearman_Job_Exception(mysql_error());
$tmp_conn = @mysql_connect($arg->shard->host, $arg->shard->user, $arg->shard->password,true);
if(!$tmp_conn) throw new Net_Gearman_Job_Exception(mysql_error());
if(!mysql_select_db($arg->shard->db, $conn)) throw new Net_Gearman_Job_Exception(mysql_error());
if(!mysql_select_db($arg->shard->db, $tmp_conn)) throw new Net_Gearman_Job_Exception(mysql_error());
$stmt = mysql_query("select @@max_allowed_packet", $tmp_conn);
if(!$stmt) throw new Net_Gearman_Job_Exception(mysql_error());
$row = mysql_fetch_array($stmt);
$max_len = $row[0] - 4096;
mysql_free_result($stmt);
#get the data from the shard using MYSQL_USE_RESULT
$stmt = mysql_unbuffered_query($arg->sql, $conn);
if(!$stmt) throw new Net_Gearman_Job_Exception(mysql_error());
$created_sql = false;
$sql = "INSERT INTO `{$arg->table_name}` VALUES ";
$values = "";
while($row = mysql_fetch_assoc($stmt)) {
if(!$created_sql) {
$created_sql = "CREATE TABLE IF NOT EXISTS `{$arg->table_name}` (";
$col_sql = "";
$cols = array_keys($row);
for($i = 0; $i<count($cols);++$i) {
if($cols[$i] == "0") continue;
if($col_sql) $col_sql .= ", ";
$col_sql .= "`{$cols[$i]}` BINARY(255)";
}
$created_sql .= $col_sql . ") ENGINE=INNODB";
#echo $created_sql . "\n";
if(!mysql_query($created_sql, $tmp_conn)) {
throw new Net_Gearman_Job_Exception(mysql_error());
}
}
if($values) $values .= ",";
$val_list = "";
foreach($row as $col => $val) {
#echo "COL: $col, VAL: $val\n";
if($col == "0") continue;
if($val_list) $val_list .= ',';
$val_list .= "'{$val}'";
}
$values .= "({$val_list})";
#we don't want to exceed max_packet_len
if(strlen($values) >= $max_len) {
#echo $sql . "$values\n";
if(!mysql_query($sql . $values, $tmp_conn)) {
throw new Net_Gearman_Job_Exception(mysql_error($tmp_conn));
}
$values = "";
}
}
#any rows left over?
if($values) {
#echo $sql . "$values\n";
if(!mysql_query($sql . $values, $tmp_conn)) {
throw new Net_Gearman_Job_Exception(mysql_error($tmp_conn));
}
}
mysql_free_result($stmt);
return(array('done' =>true));
}
}
?>