Location: PHPKode > scripts > MySQL Load Balancing PHP script > DB_Mgr.php
<?php

require_once('./error_report.php');

// $_SESSION['db_mgr'] = "1.0";

class db_manager
{
	private $db_masters;
	private $db_XXL_slaves;
	private $db_slaves;
	
	private $db_user;
	private $db_pass;
	private $db_default_schema;
	
	private $dbM;
	private $dbM_info;
	
	private $dbS;
	private $dbS_info;
	
	private $deep_replication_check;
	
	public $database;
	public $databaseType;
	public $debug;
	
	public function db_manager( $masters, $slaves, $user, $pass, $default_schema )
	{
		$this->db_masters = $masters;
		
		if( isset($slaves['XXL']) && is_array($slaves['XXL']) ) {
			$this->db_XXL_slaves = $slaves['XXL'];
		}
		unset($slaves['XXL']);
		
		$this->db_slaves  = $slaves;
		
		$this->db_user = $user;
		$this->db_pass = $pass;		
		$this->db_default_schema = $default_schema;
		$this->database = $default_schema;
		
		$this->databaseType = "mysql";
		
		$this->deep_replication_check = false;
		
		return;
	}
	
	public function info() 
	{
		print_r( $this->dbM_info );
		print_r( $this->dbS_info );
		return;
	}
	
	private function connect() 
	{
		$this->forceConnect( true, true );
		return;
	}
	
	public function forceConnect( $slave=true, $master=false )
	{
		if( $master ) $this->connectMaster();
		if( $slave  ) $this->connectSlave();
		return;
	}
	
	private function connectMaster() 
	{
		if( !$this->dbM ) {
			$dbServer = $this->db_masters;
			shuffle( $dbServer );
			
			foreach( $dbServer as $r ) {
				if( $dbM = @connect("mysql", $r, $this->db_user, $this->db_pass, $this->db_default_schema, "ADODB_FETCH_ASSOC", false) ) {
					$this->dbM = $dbM;
					$this->dbM->debug = ( $this->debug === true ? true : false );
					
						$NbMasterRows=0;
						if( $rs = $this->dbM->getAll( "SHOW TABLE STATUS FROM ".$this->db_default_schema ) ) foreach( $rs as $tb_stats ) {
							$NbMasterRows += $tb_stats['Rows'];
						}
					
						$this->dbM_info = array( "host"=>$r, "hide@address.com"=>$NbMasterRows );
						
					break;
				}
			}
			
			if( !$this->dbM ) {
				$this->close();
				die( "DBMGR: PROBLEME DE CONNEXION A LA BASE PRINCIPALE" );
			}
		}
	
		return;
	}
	
	public function getMasterTableCurrentID( $database, $table ) 
	{
			$this->needed( "w" );
			$rqst = "SHOW TABLE STATUS FROM ".$database ;
			$rqst = "# {$_SERVER['SCRIPT_NAME']} \n # {$_SERVER['REMOTE_ADDR']} \n # {$this->dbM_info['host']} \n # getMasterTableCurrentID \n " . $rqst ;
			if( $result = $this->dbM->getAll( $rqst ) ) {
				foreach( $result as $k => $v ) {
					if( $v['Name']==$table ) {
						$this->closeMaster();
						return max( $v['Rows'], $v['Auto_increment'] );
					}
				}
			}
			
			$this->closeMaster();
			return -1;
	}
	
	private function connectSlave() 
	{
		if( !$this->dbS ) {
			
			if( empty($this->dbS_info['host']) ) {
				$dbSlavesReplicas = $this->db_slaves;
				shuffle( $dbSlavesReplicas );
				
				if( count($this->db_XXL_slaves)>0 ) {
					$dbXXLReplicas = $this->db_XXL_slaves;
					shuffle( $dbXXLReplicas );
					$dbReplicas = array_merge( $dbXXLReplicas, $dbSlavesReplicas );
				} else {
					$dbReplicas = $dbSlavesReplicas;
				}

				foreach( $this->db_masters as $m ) {
					//$dbReplicas[] = $m;
				}
			} else {
				$dbReplicas = array( $this->dbS_info['host'] );
			}
			
			foreach( $dbReplicas as $r ) {
				if( $dbS = @connect("mysql", $r, $this->db_user, $this->db_pass, $this->db_default_schema, "ADODB_FETCH_ASSOC", false) ) {
					
					if( $this->dbS_info['host'] != $r ) {
						
						$Safety_Replication = false;
						$Safety_Nb_Processes = false;
						$Safety_Nb_Rows = false;
						
						$rs = $dbS->getAll( "SHOW SLAVE STATUS" );
						if( $rs && empty($rs['Last_Error']) && $rs['Seconds_Behind_Master']==0 ) {
							$Safety_Replication = true;
						
							$rs = $dbS->getAll( "SHOW PROCESSLIST" );
							$NbSlaveProcesses = ( count($rs)>0 ? count($rs) : 999 );
							$Safety_Nb_Processes = ( $NbSlaveProcesses <= 40 ) && $rs; //( $NbSlaveProcesses <= ( 100/max(count($this->db_slaves),1) ) ) && $rs;
							
							foreach( $rs as $processes ) {
								$nb_locked = 0;
								if( $processes['State']=="Locked" ) {
									$nb_locked++;
									if( $nb_locked>=1 ) $Safety_Nb_Processes = false;
									break;
								}
							}
						
							if( $Safety_Nb_Processes ) {
								
								if( !empty($this->dbM_info['hide@address.com']) ) {
									$NbSlaveRows=0;
									if( $rs = $dbS->getAll( "SHOW TABLE STATUS FROM ".$this->db_default_schema ) ) foreach( $rs as $tb_stats ) {
										$NbSlaveRows += $tb_stats['Rows'];
									}
									$Safety_Nb_Rows = ( $NbSlaveRows >= ( $this->dbM_info['hide@address.com'] - 0 ) );
								}
								
								if( !$Safety_Nb_Rows ) {
									//if( !$this->dbM ) $this->connectMaster();
								
									if( !$NbSlaveRows ) {
										if( $rs = $dbS->getAll( "SHOW TABLE STATUS FROM ".$this->db_default_schema ) ) foreach( $rs as $tb_stats ) {
											$NbSlaveRows += $tb_stats['Rows'];
										}
									}
									
									$this->closeMaster();
									$this->connectMaster();
									
									$Safety_Nb_Rows = ( $NbSlaveRows >= ( $this->dbM_info['hide@address.com'] - 0 ) ); // TOLERANCE DECALAGE
								}
								
								$this->closeMaster();
							}
							
						}
						
						// echo "\n ######## SAFETY ### $r   >>   $Safety_Replication - $Safety_Nb_Processes - $Safety_Nb_Rows    ### \n";
						
						if( $Safety_Replication && $Safety_Nb_Processes && $Safety_Nb_Rows ) {
							$this->dbS = $dbS;
							$this->dbS->debug = ( $this->debug === true ? true : false );
							$this->dbS_info = array( "host"=>$r, "hide@address.com"=>$NbSlaveRows, "hide@address.com"=>$NbSlaveProcesses );
							$this->dbS_info['last_dispatch'] = $last_dispatch;
							break;
						} else {
							$dbS->close();
							unset($dbS);
						}
					} else {
						$this->dbS = $dbS;
						$this->dbS->debug = ( $this->debug === true ? true : false );
						break;
					}
				}
			}
			
			if( !$this->dbS ) {
				$this->connectMaster();
				$this->dbS = &$this->dbM;
				//$this->dbS_info = $this->dbM_info;
				//$this->close();
				//die( "DBMGR: PROBLEME DE CONNEXION A LA BASE DE CONSULTATION" );
			}
		}
	
		return;
	}
	
	private function closeMaster()
	{
		if( $this->dbM ) $this->dbM->close();
		unset( $this->dbM );
		return;
	}
	
	private function closeSlave( $reset=false )
	{
		if( $this->dbS ) $this->dbS->close();
		unset( $this->dbS );
		if( $reset && count($this->dbS_info)>0 ) foreach( $this->dbS_info as $k => $v ) unset( $this->dbS_info[$k] );
		return;
	}
	
	public function close()
	{
		$this->closeMaster();
		$this->closeSlave();
		
		return;
	}
	
	public function checkReplication( $deep_replication_check=true )
	{
		$this->closeMaster();
		unset( $this->dbM_info );
		$this->closeSlave(true);
		
		if( $deep_replication_check ) $this->deep_replication_check = true ;
		
		return;
	}
	
	private function isWriteOperation( $str ) {
		$str = strtolower( ltrim( $str ) );
		$str = substr( $str, 0, strpos( $str, " " ) );
		if( in_array( $str, array( "insert", "update", "delete", "create", "drop", "grant" ) ) ) return true;
		 else return false;
	}
	
	private function isReadOperation( $str ) {
		$str = strtolower( ltrim( $str ) );
		$str = substr( $str, 0, strpos( $str, " " ) );
		if( in_array( $str, array( "select" ) ) ) return true;
		 else return false;
	}
	
	private function needed( $rw, $str="" ) 
	{
		switch( $rw ) {
			case 'r' :
			case 'R' :
			
				if( !$this->dbS ) $this->connectSlave();
				if( !$this->dbS ) die( "DBMGR: LA BASE DE DONNEES DE CONSULTATION N'EST PAS DISPONIBLE" );
				if( !empty( $str ) ) {
					if( $this->isWriteOperation( $str ) ) {
						$this->close();
						die( "DBMGR: UNE OPERATION D'ECRITURE NON AUTORISEE A ETE REJETEE" );
					}
				}
				break;
			
			case 'w' :
			case 'W' :
			default :
				if( !$this->dbM ) $this->connectMaster();
				if( !$this->dbM ) {
					$this->close();
					die( "DBMGR: LA BASE DE DONNEES PRINCIPALE N'EST PAS DISPONIBLE" );
				}
				break;
		}
		
		return;
	}
	
	
	public function getOne( $rqst )
	{
		$this->needed( "r", $rqst );
		$rqst = "# {$_SERVER['SCRIPT_NAME']} \n # {$_SERVER['REMOTE_ADDR']} \n # {$this->dbS_info['host']} \n # getOne \n " . $rqst ;
		
		/* $result = $this->dbS->getOne( $rqst ); */ // TODO : Get a single value in return
		
		$this->error_parsed( $this->dbS, $rqst );
		//$this->closeSlave();
		return $result;
	}
	
	public function getRow( $rqst )
	{
		$this->needed( "r", $rqst );
		$rqst = "# {$_SERVER['SCRIPT_NAME']} \n # {$_SERVER['REMOTE_ADDR']} \n # {$this->dbS_info['host']} \n # getRow \n " . $rqst ;
		
		/* $result = $this->dbS->getRow( $rqst ); */ // TODO : Get a whole row (the first one ?) in return
		
		$this->error_parsed( $this->dbS, $rqst );
		//$this->closeSlave();
		return $result;
	}

	public function getAll( $rqst )
	{
		$this->needed( "r", $rqst );
		$rqst = "# {$_SERVER['SCRIPT_NAME']} \n # {$_SERVER['REMOTE_ADDR']} \n # {$this->dbS_info['host']} \n # getAll \n " . $rqst ;
		
		/* $result = $this->dbS->getAll( $rqst ); */ // TODO : Return all results (multidimensional array)
		
		$this->error_parsed( $this->dbS, $rqst );
		//$this->closeSlave();
		return $result;
	}
	
	public function execute( $rqst )
	{
		if( $this->isReadOperation( $rqst ) ) {
			$this->needed( "r" );
			$rqst = "# {$_SERVER['SCRIPT_NAME']} \n # {$_SERVER['REMOTE_ADDR']} \n # {$this->dbS_info['host']} \n # execute \n " . $rqst ;
			/* $result = $this->dbS->execute( $rqst ); */ // TODO : execute the request
			$this->error_parsed( $this->dbS, $rqst );
			//$this->closeSlave();
			return $result;
		} else {
			$this->needed( "w" );
			$rqst = "# {$_SERVER['SCRIPT_NAME']} \n # {$_SERVER['REMOTE_ADDR']} \n # {$this->dbM_info['host']} \n # execute \n " . $rqst ;
			/* $result = $this->dbM->execute( $rqst ); */ // TODO : execute the request
			if( $this->error_parsed( $this->dbM, $rqst ) ) {
				$this->close();
				die( "DBMGR: UN PROBLEME D'ENREGISTREMENT A ETE DETECTE ET L'EXECUTION DOIT ETRE ARRETEE" );
			}
			//$this->closeMaster();
			//$this->closeSlave(true);
			
			$this->checkReplication();
			
			return $result;
		}
	}
	
	public function qstr( $str )
	{
		$this->needed( "r" );
		/* $result = $this->dbS->qstr( $str ); */ // TODO : protect the query against SQL injection
		//$this->closeSlave();
		return $result;
	}
	
	private function error_parsed( &$db, $rqst ) {
		$error = $db->ErrorMsg();
		if( !empty( $error ) ) {
			error_report::send_report( array("hide@address.com"), $rqst . "\n --- \n" . $error );
			return true;
		}
		
		return false;
	}

}

?>
Return current item: MySQL Load Balancing PHP script