Location: PHPKode > projects > Yioop! > yioop-v0.78/bin/queue_server.php
<?php
/** 
 *  SeekQuarry/Yioop --
 *  Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 *  Copyright (C) 2009, 2010, 2011  Chris Pollett hide@address.com
 *
 *  LICENSE:
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *  END LICENSE
 *
 * @author Chris Pollett hide@address.com
 * @package seek_quarry
 * @subpackage bin
 * @license http://www.gnu.org/licenses/ GPL3
 * @link http://www.seekquarry.com/
 * @copyright 2009, 2010, 2011
 * @filesource
 */

// This script is only meant to be run from the command line; any other
// SAPI gets a short refusal and the script ends.
if(php_sapi_name() != 'cli') {echo "BAD REQUEST"; exit();}


/** Calculate base directory of script: the real directory of the running
 *  script with its last four characters (the length of "/bin") removed
 * @ignore
 */
define("BASE_DIR", substr(
    dirname(realpath($_SERVER['PHP_SELF'])), 0,
    -strlen("/bin")));

ini_set("memory_limit","1000M"); //so have enough memory to crawl big pages

/** Load in global configuration settings */
require_once BASE_DIR.'/configs/config.php';
// PROFILE is not defined in this file (presumably it comes from config.php
// and is false until the instance has been configured -- TODO confirm).
if(!PROFILE) {
    echo "Please configure the search engine instance ".
        "by visiting its web interface on localhost.\n";
    exit();
}

/** NO_CACHE means don't try to use memcache
 * @ignore
 */
define("NO_CACHE", true);

/** Get the database library based on the current database type */
require_once BASE_DIR."/models/datasources/".DBMS."_manager.php";

/** Load the class that maintains our URL queue */
require_once BASE_DIR."/lib/web_queue_bundle.php";

/** Load word->{array of docs with word} index class */
require_once BASE_DIR."/lib/index_archive_bundle.php";

/** Used for manipulating urls*/
require_once BASE_DIR."/lib/url_parser.php";

/**  For crawlHash function */
require_once BASE_DIR."/lib/utility.php";

/** For crawlDaemon function  */
require_once BASE_DIR."/lib/crawl_daemon.php"; 
/**  */
require_once BASE_DIR."/lib/fetch_url.php";
/** Loads common constants for web crawling*/
require_once BASE_DIR."/lib/crawl_constants.php";

require_once BASE_DIR."/lib/phrase_parser.php";

/**
 * Load any indexing plugins found in lib/indexing_plugins.
 *
 * glob() may return false instead of an empty array (on error, and on some
 * systems when nothing matches), so fall back to an empty list rather than
 * handing false to foreach.
 */
foreach((glob(BASE_DIR."/lib/indexing_plugins/*_plugin.php") ?: array())
    as $filename) {
    require_once $filename;
}

/*
 *  We'll set up multi-byte string handling to use UTF-8
 */
mb_internal_encoding("UTF-8");
mb_regex_encoding("UTF-8");

/*
 * If Memcache is available, the queue_server can be used to load
 * index dictionaries and shards into memcache. Note that this is the
 * only situation in which NO_CACHE is ignored.
 *
 * USE_MEMCACHE and $MEMCACHES are not defined in this file (presumably they
 * come from the configuration -- TODO confirm). Each $MEMCACHES entry is
 * read for its 'host' and 'port' keys.
 */
if(USE_MEMCACHE) {
    $MEMCACHE = new Memcache();
    foreach($MEMCACHES as $mc) {
        $MEMCACHE->addServer($mc['host'], $mc['port']);
    }
    // do not leave the loop variable behind in the global scope
    unset($mc);
}

/**
 * Command line program responsible for managing Yioop crawls.
 *
 * It maintains a queue of urls that are going to be scheduled to be seen.
 * It also keeps track of what has been seen and robots.txt info.
 * Its last responsibility is to create and populate the IndexArchiveBundle
 * that is used by the search front end.
 *
 * @author Chris Pollett
 * @package seek_quarry
 */
class QueueServer implements CrawlConstants
{
    /**
     * Reference to a database object. Used because it has directory
     * manipulation functions
     * @var object
     */
    var $db;
    /**
     * Web-sites that crawler can crawl. If used, ONLY these will be crawled
     * @var array
     */
    var $allowed_sites;
    /**
     * Web-sites that the crawler must not crawl
     * @var array
     */
    var $disallowed_sites;
    /**
     * Constant saying the method used to order the priority queue for the crawl
     * @var string
     */
    var $crawl_order;
    /**
     * Indicates the kind of crawl being performed: self::WEB_CRAWL indicates
     * a new crawl of the web; self::ARCHIVE_CRAWL indicates a crawl of an
     * existing web archive
     * @var string
     */
    var $crawl_type;

    /**
     * If the crawl_type is self::ARCHIVE_CRAWL, then crawl_index is the
     * timestamp of the existing archive to crawl
     * @var string
     */
    var $crawl_index;
    /**
     * Says whether the $allowed_sites array is being used or not
     * @var bool
     */
    var $restrict_sites_by_url;
    /**
     * List of file extensions supported for the crawl
     * @var array
     */
    var $indexed_file_types;
    /**
     * Holds an array of word -> url patterns which are used to
     * add meta words to the words that are extracted from any given doc
     * @var array
     */
    var $meta_words;
    /**
     * Holds the WebQueueBundle for the crawl. This bundle encapsulates
     * the priority queue of urls that specifies what to crawl next
     * @var object
     */
    var $web_queue;
    /**
     * Holds the IndexArchiveBundle for the current crawl. This encapsulates
     * the inverted index word-->documents for the crawls as well as document
     * summaries of each document.
     * @var object
     */
    var $index_archive;
    /**
     * The timestamp of the current active crawl
     * @var int
     */
    var $crawl_time;
    /**
     * This is a list of hosts whose robots.txt file had a Crawl-delay directive
     * and which we have produced a schedule with urls for, but we have not
     * heard back from the fetcher who was processing those urls. Hosts on
     * this list will not be scheduled for more downloads until the fetcher
     * with earlier urls has gotten back to the queue_server.
     * @var array
     */
    var $waiting_hosts;
    /**
     * Name of the fetcher that most recently spoke with the queue_server.
     * It is taken from the keys of the robot table file; until then it holds
     * a placeholder message set in the constructor.
     * @var string
     */
    var $most_recent_fetcher;
    /**
     * Last time index was saved to disk
     * @var int
     */
    var $last_index_save_time;
    /**
     * Flag for whether the index has data still to be written to disk
     * @var bool
     */
     var $index_dirty;

    /**
     * This keeps track of the time the current archive info was last modified
     * This way the queue_server knows if the user has changed the crawl
     * parameters during the crawl.
     * @var int
     */
    var $archive_modified_time;

    /**
     * This is a list of indexing_plugins which might do
     * post processing after the crawl. A plugin's postProcessing function
     * is called if it is selected in the crawl options page.
     * @var array
     */
    var $indexing_plugins;

    /**
     * This is a list of hourly (timestamp, number_of_urls_crawled) data
     * @var array
     */
    var $hourly_crawl_data;

    /**
     * Creates a QueueServer for the given list of indexable file types.
     *
     * Instantiates the datasource manager named after the DBMS constant and
     * puts the crawl related fields into a neutral starting state; the real
     * values are assigned later by startCrawl().
     *
     * @param array $indexed_file_types file extensions supported for the crawl
     */
    function __construct($indexed_file_types)
    {
        $this->indexed_file_types = $indexed_file_types;
        $this->most_recent_fetcher = "No Fetcher has spoken with me";

        // the manager class name is built from the configured DBMS
        $manager_class = ucfirst(DBMS)."Manager";
        $this->db = new $manager_class();

        //the next values will be set for real in startCrawl
        $initial_values = array(
            "crawl_order" => self::PAGE_IMPORTANCE,
            "restrict_sites_by_url" => true,
            "allowed_sites" => array(),
            "disallowed_sites" => array(),
            "meta_words" => array(),
            "last_index_save_time" => 0,
            "index_dirty" => false,
            "hourly_crawl_data" => array(),
            "archive_modified_time" => 0,
            "crawl_time" => 0,
        );
        foreach($initial_values as $field => $value) {
            $this->$field = $value;
        }
    }

    /**
     * Entry point used to get the queue_server running. Hands the command
     * line arguments to CrawlDaemon::init() and then enters the
     * queue_server's main loop.
     */
    function start()
    {
        declare(ticks=1);

        global $argv;
        CrawlDaemon::init($argv, "queue_server");

        $this->loop();
    }

    /**
     * Main runtime loop of the queue_server.
     *
     * Runs until the status becomes self::STOP_STATE. Each pass handles any
     * admin message (start, stop, resume), refreshes the crawl parameters,
     * deletes orphaned bundles, processes index data, periodically saves the
     * index and then does the work specific to the current crawl type before
     * sleeping for five seconds.
     */
    function loop()
    {
        $info = array(self::STATUS => self::WAITING_START_MESSAGE_STATE);
        crawlLog("In queue loop!!", "queue_server");

        $this->deleteOrphanedBundles();
        while($info[self::STATUS] != self::STOP_STATE) {
            crawlLog("Peak memory usage so far".memory_get_peak_usage()."!!");

            $info = $this->handleAdminMessages($info);
            $status = $info[self::STATUS];

            if($status == self::WAITING_START_MESSAGE_STATE) {
                crawlLog("Waiting for start message\n");
                sleep(5);
                continue;
            }
            if($status == self::STOP_STATE) {
                // the while condition ends the loop
                continue;
            }

            //check and update if necessary the crawl params of current crawl
            $this->checkUpdateCrawlParameters();

            /* look for orphaned queue bundles; this also makes sure the
               indexes of stopped crawls were closed properly
             */
            $this->deleteOrphanedBundles();

            $this->processIndexData();
            $seconds_since_save = time() - $this->last_index_save_time;
            if($seconds_since_save > FORCE_SAVE_TIME) {
                crawlLog("Periodic Index Save... \n");
                $start_time = microtime();
                $this->indexSave();
                crawlLog("... Save time".(changeInMicrotime($start_time)));
            }

            if($this->crawl_type == self::WEB_CRAWL) {
                $this->processRobotUrls();

                $count = $this->web_queue->to_crawl_queue->count;

                // only take in more urls while the in-memory queue has room
                $room_limit = NUM_URLS_QUEUE_RAM -
                    SEEN_URLS_BEFORE_UPDATE_SCHEDULER * MAX_LINKS_PER_PAGE;
                if($count < $room_limit) {
                    $info = $this->processQueueUrls();
                }

                if($count > 0) {
                    $top = $this->web_queue->peekQueue();
                    if($top[1] < MIN_QUEUE_WEIGHT) {
                        crawlLog("Normalizing Weights!!\n");
                        /* this will undercount the weights of URLS
                           from fetcher data that have not completed
                         */
                        $this->web_queue->normalize();
                    }

                    // only produce a batch when no schedule file is present
                    $schedule_file = CRAWL_DIR."/schedules/".
                        self::schedule_name.$this->crawl_time.".txt";
                    if(!file_exists($schedule_file)) {
                        $this->produceFetchBatch();
                    }
                }
            } else if($this->crawl_type == self::ARCHIVE_CRAWL) {
                $this->processRecrawlRobotUrls();
                $schedule_file = CRAWL_DIR."/schedules/".
                    self::schedule_name.$this->crawl_time.".txt";
                if(!file_exists($schedule_file)) {
                    $this->writeArchiveCrawlInfo();
                }
            }

            crawlLog("Taking five second sleep...");
            sleep(5);
        }

        crawlLog("Queue Server shutting down!!");
    }

    /**
     * Used to write info about the current recrawl to the schedule file as
     * well as to process any recrawl data files received.
     *
     * The schedule file gets the string returned by
     * calculateScheduleMetaInfo() for the current time. If that file cannot
     * be opened the problem is logged and nothing is written; the recrawl
     * data directory is processed either way.
     */
    function writeArchiveCrawlInfo()
    {
        $schedule_time = time();
        $first_line = $this->calculateScheduleMetaInfo($schedule_time);

        $schedule_file = CRAWL_DIR."/schedules/".self::schedule_name.
            $this->crawl_time.".txt";
        $fh = fopen($schedule_file, "wb");
        if($fh === false) {
            // never hand a failed handle to fwrite()/fclose()
            crawlLog("Could not open $schedule_file for writing!");
        } else {
            fwrite($fh, $first_line);
            fclose($fh);
        }

        $schedule_dir =
            CRAWL_DIR."/schedules/".
                self::schedule_data_base_name.$this->crawl_time;
        $this->processDataFile($schedule_dir, "processRecrawlDataArchive");

    }

    /**
     * Even during a recrawl the fetcher may send robot data to the
     * queue_server. This function logs that it is checking for such data
     * and lets processDataFile() pass the first robot file it finds to
     * processRecrawlRobotArchive().
     */
    function processRecrawlRobotUrls()
    {
        crawlLog("Checking for robots.txt files to process...");

        $robot_schedule_dir = CRAWL_DIR."/schedules/".
            self::robot_data_base_name.$this->crawl_time;
        $this->processDataFile($robot_schedule_dir,
            "processRecrawlRobotArchive");

        crawlLog("done. ");
    }

    /**
     * Robot data sent by a fetcher during a recrawl is not needed, so this
     * function logs a message and deletes the passed robot file.
     *
     * @param string $file robot file to delete
     */
    function processRecrawlRobotArchive($file)
    {
        crawlLog("Deleting unneeded robot schedule files");
        unlink($file);
    }

    /**
     * Used to get a data archive file (either during a normal crawl or a
     * recrawl). The file contents are web-decoded, uncompressed and
     * unserialized. As a side effect $this->most_recent_fetcher is set to
     * the robot table entry with the largest value at index 2.
     *
     * NOTE(review): unserialize() is applied here to data that, according
     * to the original description, arrives from fetchers through the web
     * server. If that channel is not fully trusted, a safer format should
     * be considered.
     *
     * @param string $file name of archive data file
     * @return mixed sites contained in the file from the fetcher; false if
     *      the file could not be read, uncompressed or unserialized
     */
    function &getDataArchiveFileData($file)
    {
        crawlLog("Processing File: $file");

        // decode step by step so a failure is not passed down the chain
        $sites = false;
        $raw_data = file_get_contents($file);
        if($raw_data !== false) {
            $serialized_sites = gzuncompress(webdecode($raw_data));
            if($serialized_sites !== false) {
                $sites = unserialize($serialized_sites);
            }
        }
        if($sites === false) {
            crawlLog("Could not decode data in file: $file");
        }

        $robot_table_name = CRAWL_DIR."/robot_table.txt";
        if(file_exists($robot_table_name)) {
            $robot_table = unserialize(file_get_contents($robot_table_name));
            // a corrupt or unreadable table unserializes to a non-array
            if(is_array($robot_table)) {
                $recent = 0;
                foreach($robot_table as $robot_name => $robot_data) {
                    if(isset($robot_data[2]) && $robot_data[2] > $recent) {
                        $this->most_recent_fetcher = $robot_name;
                        $recent = $robot_data[2];
                    }
                }
            }
        }
        // returned by reference, so this has to be a variable
        return $sites;
    }

    /**
     * Handles one fetcher data file during a recrawl: its sites are read,
     * the file is deleted and the crawl status is written from those sites.
     *
     * @param string $file a file which contains the info to process
     */
    function processRecrawlDataArchive($file)
    {
        $recrawl_sites = & $this->getDataArchiveFileData($file);

        // the file has been read into memory, so it can go
        unlink($file);

        $this->writeCrawlStatus($recrawl_sites);
    }


    /**
     * Handles messages passed via files to the QueueServer.
     *
     * Reads and deletes schedules/queue_server_messages.txt if it exists
     * and acts on its status: "NEW_CRAWL", "STOP_CRAWL" or "RESUME_CRAWL".
     * Any other status is returned to the caller untouched. A message file
     * that does not unserialize to an array with a status is logged and
     * ignored, so the state the server already had is kept.
     *
     * @param array $info associative array with info about current state of
     *      queue_server
     * @return array an updated version of $info reflecting changes that
     *      occurred during the handling of the admin message file
     */
    function handleAdminMessages($info)
    {
        $message_file = CRAWL_DIR."/schedules/queue_server_messages.txt";
        if(!file_exists($message_file)) {
            return $info;
        }

        $old_info = $info;
        $info = unserialize(file_get_contents($message_file));
        unlink($message_file);

        if(!is_array($info) || !isset($info[self::STATUS])) {
            // keep the previous state instead of replacing it with garbage
            crawlLog("Ignoring unreadable queue server message file");
            return $old_info;
        }

        $was_crawling = isset($old_info[self::STATUS]) &&
            $old_info[self::STATUS] == self::CONTINUE_STATE;

        switch($info[self::STATUS])
        {
            case "NEW_CRAWL":
                if($was_crawling) {
                    crawlLog("Stopping previous crawl before start".
                        " new crawl!");
                    $this->stopCrawl();
                }
                /* startCrawl() returns $info with its status set to
                   self::CONTINUE_STATE. Keeping that value lets the
                   "stop the previous crawl first" checks in this function
                   recognize that a crawl is running.
                 */
                $info = $this->startCrawl($info);
                crawlLog(
                    "Starting new crawl. Timestamp:".$this->crawl_time);
                if($this->crawl_type == self::WEB_CRAWL) {
                    crawlLog("Performing a web crawl!");
                } else {
                    crawlLog("Performing an archive crawl of ".
                        "archive with timestamp ".$this->crawl_index);
                }
                $this->writeAdminMessage("BEGIN_CRAWL");
            break;

            case "STOP_CRAWL":
                crawlLog("Stopping crawl !! This involves multiple steps!");
                $this->stopCrawl();
                if(file_exists(CRAWL_DIR."/schedules/crawl_status.txt")) {
                    unlink(CRAWL_DIR."/schedules/crawl_status.txt");
                }
                $info[self::STATUS] = self::WAITING_START_MESSAGE_STATE;
                crawlLog("Crawl has been successfully stopped!!");
            break;

            case "RESUME_CRAWL":
                $has_crawl_time = isset($info[self::CRAWL_TIME]);
                $has_bundle = $has_crawl_time &&
                    file_exists(CRAWL_DIR.'/cache/'.
                        self::queue_base_name.$info[self::CRAWL_TIME]);
                if($has_bundle) {
                    if($was_crawling) {
                        crawlLog("Resuming old crawl... Stopping current ".
                            "crawl first!");
                        $this->stopCrawl();
                    }
                    $info = $this->startCrawl($info);
                    $this->writeAdminMessage("RESUME_CRAWL");
                    crawlLog("Resuming crawl");
                } else {
                    $msg = "Restart failed!!!  ";
                    if(!$has_crawl_time) {
                        $msg .="crawl time of crawl to restart not given\n";
                    }
                    /* In this branch the bundle is missing: either no
                       crawl time was given, so there is nothing to look
                       up, or the lookup above failed. The unset
                       crawl time is therefore never read.
                     */
                    $msg .= "bundle for crawl restart doesn't exist\n";
                    $info["MESSAGE"] = $msg;
                    $this->writeAdminMessage($msg);
                    crawlLog($msg);
                    $info[self::STATUS] = self::WAITING_START_MESSAGE_STATE;
                }
            break;

        }

        return $info;
    }

    /**
     * Stops the currently running crawl gracefully so that it can be
     * restarted. In order: the queue contents are written back to
     * schedules, the dictionary is shut down, the post processing plugins
     * are run, and finally a marker file saying the index was closed is
     * written for this crawl's timestamp.
     */
    function stopCrawl()
    {
        $this->dumpQueueToSchedules();
        $this->shutdownDictionary();
        // plugins only post process if selected in the crawl options page
        $this->runPostProcessingPlugins();

        $closed_marker = CRAWL_DIR.'/schedules/'.self::index_closed_name.
            $this->crawl_time.".txt";
        file_put_contents($closed_marker, "1");
    }

    /**
     * Writes an admin crawl status message, as used while a crawl is being
     * started or stopped. The status file is replaced by a serialized array
     * with an empty fetcher, no seen urls, a count of zero, the current
     * crawl time and the message as description, and is then made world
     * accessible.
     *
     * @param string $message to write into crawl_status.txt this will show
     *      up in the web crawl status element.
     */
    function writeAdminMessage($message)
    {
        $status_file = CRAWL_DIR."/schedules/crawl_status.txt";

        // key order is kept so the serialized form stays the same
        $status = array(
            'MOST_RECENT_FETCHER' => "",
            'MOST_RECENT_URLS_SEEN' => array(),
            'CRAWL_TIME' => $this->crawl_time,
            'COUNT' => 0,
            'DESCRIPTION' => $message,
        );

        file_put_contents($status_file, serialize($status));
        chmod($status_file, 0777);
    }

    /**
     * When a crawl is being shutdown, this function is called to write
     * the contents of the web queue bundle back to schedules. This allows
     * crawls to be resumed without losing urls.
     *
     * The urls are written as compressed, web-encoded schedule files into a
     * day folder one day before the crawl time, so they will be reloaded
     * before all other schedules. A file is written whenever at least
     * MAX_FETCH_SIZE queue positions have been passed since the last one,
     * plus a final file for what remains.
     */
    function dumpQueueToSchedules()
    {
        $this->writeAdminMessage("SHUTDOWN_QUEUE");
        if(!isset($this->web_queue->to_crawl_queue)) {
            crawlLog("URL queue appears to be empty or NULL");
            return;
        }
        crawlLog("Writing queue contents back to schedules...");
        $dir = CRAWL_DIR."/schedules/".self::schedule_data_base_name.
            $this->crawl_time;
        if(!file_exists($dir)) {
            mkdir($dir);
            chmod($dir, 0777);
        }

        $day = floor($this->crawl_time/86400) - 1;
            //want before all other schedules, so will be reloaded first

        $dir .= "/$day";
        if(!file_exists($dir)) {
            mkdir($dir);
            chmod($dir, 0777);
        }
        //get rid of previous restart attempts, if present
        $this->db->unlinkRecursive($dir, false);
        $count = $this->web_queue->to_crawl_queue->count;
        $old_time = 1;
        $now = time();
        $schedule_data = array();
        $schedule_data[self::SCHEDULE_TIME] = $this->crawl_time;
        $schedule_data[self::TO_CRAWL] = array();
        $fh = $this->web_queue->openUrlArchive();
        /* NOTE(review): positions are read starting at 1, so the last entry
           sits at position $count. The previous bound ($time < $count)
           never dumped it. This assumes the queue is 1-indexed -- confirm
           against WebQueueBundle::peekQueue. An out of range position is
           skipped by the error check below in any case.
         */
        for($time = 1; $time <= $count; $time++) {
            $tmp = $this->web_queue->peekQueue($time, $fh);
            // if queue error skip
            if($tmp === false) {
                continue;
            }
            list($url, $weight) = $tmp;
            if(strcmp($url, "LOOKUP ERROR") == 0) {
                continue;
            }
            $hash = crawlHash($now.$url);
            $schedule_data[self::TO_CRAWL][] = array($url, $weight, $hash);
            if($time - $old_time >= MAX_FETCH_SIZE) {
                if(count($schedule_data[self::TO_CRAWL]) > 0) {
                    $data_string = webencode(
                        gzcompress(serialize($schedule_data)));
                    $data_hash = crawlHash($data_string);
                    file_put_contents($dir."/At".$time."From127-0-0-1".
                        "WithHash$data_hash.txt", $data_string);
                    $schedule_data[self::TO_CRAWL] = array();
                }
                $old_time = $time;
            }
        }
        $this->web_queue->closeUrlArchive($fh);
        // whatever did not fill a complete batch goes into one last file
        if(count($schedule_data[self::TO_CRAWL]) > 0) {
            $data_string = webencode(
                gzcompress(serialize($schedule_data)));
            $data_hash = crawlHash($data_string);
            file_put_contents($dir."/At".$time."From127-0-0-1".
                "WithHash$data_hash.txt", $data_string);
        }
    }

    /**
     * Called during crawl shutdown. If there is an index archive, its
     * current shard dictionary is saved and added and all tiers of the
     * dictionary are merged. The index folder of the crawl is then given
     * world permissions.
     */
    function shutdownDictionary()
    {
        $this->writeAdminMessage("SHUTDOWN_DICTIONARY");

        $archive = $this->index_archive;
        if(is_object($archive)) {
            $archive->saveAndAddCurrentShardDictionary();
            $archive->dictionary->mergeAllTiers();
        }

        $index_dir = CRAWL_DIR.'/cache/'.
            self::index_data_base_name.$this->crawl_time;
        $this->db->setWorldPermissionsRecursive($index_dir);
    }

    /**
     * During crawl shutdown this is called to run any post processing
     * plugins.
     *
     * For each name in $this->indexing_plugins an instance of the class
     * "<name>Plugin" is created, stored on this object under the same name
     * with a lower case first letter, and its postProcessing() method is
     * called with the crawl time. A plugin whose class is not loaded is
     * logged and skipped. Instantiating it would be a fatal error, which
     * would end the shutdown before the remaining steps run.
     */
    function runPostProcessingPlugins()
    {
        if(!isset($this->indexing_plugins)) {
            return;
        }
        crawlLog("Post Processing....");
        $this->writeAdminMessage("SHUTDOWN_RUNPLUGINS");
        if(!is_array($this->indexing_plugins)) {
            // nothing that can be iterated over
            return;
        }
        foreach($this->indexing_plugins as $plugin) {
            $plugin_name = $plugin."Plugin";
            if(!class_exists($plugin_name)) {
                crawlLog("... skipping $plugin_name as it is not loaded");
                continue;
            }
            $plugin_instance_name =
                lcfirst($plugin)."Plugin";
            $this->$plugin_instance_name =
                new $plugin_name();
            crawlLog(
                "... executing $plugin_instance_name");
            $this->$plugin_instance_name->
                postProcessing($this->crawl_time);
        }
    }
    /**
     * Saves the index_archive to disk if there is one and it has unsaved
     * data. The time of the last save is updated on every call, whether or
     * not anything was written.
     */
    function indexSave()
    {
        $this->last_index_save_time = time();

        if(!isset($this->index_archive) || !$this->index_dirty) {
            return;
        }

        $this->index_archive->forceSave();
        $this->index_dirty = false;

        // chmod so apache can also write to these directories
        $index_dir = CRAWL_DIR.'/cache/'.
            self::index_data_base_name.$this->crawl_time;
        $this->db->setWorldPermissionsRecursive($index_dir);
    }

    /**
     * Begins crawling based on the time, order and site restrictions in
     * $info. Setting up a crawl involves creating a queue bundle (for web
     * crawls) and an index archive bundle. Settings missing from $info are
     * taken from the description of an already existing index archive, if
     * there is one for this crawl time.
     *
     * @param array $info parameters for the crawl
     * @return array $info with its status set to self::CONTINUE_STATE
     */
    function startCrawl($info)
    {
        //to get here we at least have to have a crawl_time
        $this->crawl_time = $info[self::CRAWL_TIME];

        // field of this object => key under which $info carries its value
        $field_to_info_key = array(
            "crawl_order" => self::CRAWL_ORDER,
            "crawl_type" => self::CRAWL_TYPE,
            "crawl_index" => self::CRAWL_INDEX,
            "restrict_sites_by_url" => self::RESTRICT_SITES_BY_URL,
            "allowed_sites" => self::ALLOWED_SITES,
            "disallowed_sites" => self::DISALLOWED_SITES,
            "meta_words" => self::META_WORDS,
            "indexing_plugins" => self::INDEXING_PLUGINS,
        );
        $fields_not_in_info = array();
        foreach($field_to_info_key as $field => $info_key) {
            if(!isset($info[$info_key])) {
                $fields_not_in_info[] = $field;
                continue;
            }
            $this->$field = $info[$info_key];
        }

        // breadth first uses a min queue, everything else a max queue
        $min_or_max = ($this->crawl_order == self::BREADTH_FIRST) ?
            self::MIN : self::MAX;

        $this->web_queue = NULL;
        $this->index_archive = NULL;

        gc_collect_cycles(); // garbage collect old crawls

        $queue_dir = CRAWL_DIR.'/cache/'.self::queue_base_name.
            $this->crawl_time;
        if(!isset($this->crawl_type) ||
            $this->crawl_type == self::WEB_CRAWL) {
            $this->web_queue = new WebQueueBundle($queue_dir,
                URL_FILTER_SIZE, NUM_URLS_QUEUE_RAM, $min_or_max);
        }

        $index_dir = CRAWL_DIR.'/cache/'.self::index_data_base_name.
            $this->crawl_time;
        if(file_exists($index_dir)) {
            $this->index_archive = new IndexArchiveBundle($index_dir, false);
            $archive_info = IndexArchiveBundle::getArchiveInfo($index_dir);
            $index_info = unserialize($archive_info['DESCRIPTION']);

            // fall back to what the existing index says for missing fields
            foreach($fields_not_in_info as $field) {
                $info_key = $field_to_info_key[$field];
                if(isset($index_info[$info_key])) {
                    $this->$field = $index_info[$info_key];
                }
            }
        } else {
            $this->index_archive = new IndexArchiveBundle($index_dir, false,
                serialize($info));
        }

        // a crawl that is starting is no longer marked as closed
        $closed_marker = CRAWL_DIR.'/schedules/'. self::index_closed_name.
            $this->crawl_time.".txt";
        if(file_exists($closed_marker)) {
            unlink($closed_marker);
        }

        // chmod so web server can also write to these directories
        if($this->crawl_type == self::WEB_CRAWL) {
            $this->db->setWorldPermissionsRecursive($queue_dir);
        }
        $this->db->setWorldPermissionsRecursive($index_dir);

        //Get modified time of initial setting of crawl params
        $this->archive_modified_time =
            IndexArchiveBundle::getParamModifiedTime($index_dir);

        $info[self::STATUS] = self::CONTINUE_STATE;
        return $info;
    }

    /**
     * Checks whether the parameters of the active crawl were modified since
     * they were last copied into the queue server's fields. If the modified
     * time of the index archive's parameters differs from the remembered
     * one, the updatable fields present in the archive description are
     * copied over and the new modified time is remembered.
     */
    function checkUpdateCrawlParameters()
    {
        crawlLog("Check for update in crawl parameters...");
        $dir = CRAWL_DIR.'/cache/'.self::index_data_base_name.$this->crawl_time;
        $modified_time = IndexArchiveBundle::getParamModifiedTime($dir);
        if($modified_time == $this->archive_modified_time) {
            crawlLog("...none.");
            return;
        }

        // only these settings may change while a crawl is running
        $updatable_info = array(
            "restrict_sites_by_url" => self::RESTRICT_SITES_BY_URL,
            "allowed_sites" => self::ALLOWED_SITES,
            "disallowed_sites" => self::DISALLOWED_SITES,
            "meta_words" => self::META_WORDS,
            "indexing_plugins" => self::INDEXING_PLUGINS,
        );
        $archive_info = IndexArchiveBundle::getArchiveInfo($dir);
        $index_info = unserialize($archive_info['DESCRIPTION']);
        foreach($updatable_info as $index_field => $info_key) {
            if(!isset($index_info[$info_key])) {
                continue;
            }
            $this->$index_field = $index_info[$info_key];
            crawlLog("Updating ...$index_field.");
        }
        $this->archive_modified_time = $modified_time;
    }

    /**
     * Delete all the queue bundles and schedules that don't have an
     * associated index bundle as this means that crawl has been deleted.
     *
     * While scanning the cache folder, any index bundle other than the
     * active crawl's that lacks an index-closed marker gets its dictionary
     * tiers merged and the marker written. Schedule entries whose timestamp
     * does not belong to a remaining index bundle are then removed.
     */
    function deleteOrphanedBundles()
    {
        // glob() can return false; treat that as "nothing found"
        $dirs = glob(CRAWL_DIR.'/cache/*', GLOB_ONLYDIR);
        if(!is_array($dirs)) {
            $dirs = array();
        }
        $living_stamps = array();
        foreach($dirs as $dir) {
            $pre_timestamp = strstr($dir, self::queue_base_name);
            if(strlen($pre_timestamp) > 0) {
                $timestamp =
                    substr($pre_timestamp, strlen(self::queue_base_name));
                // a queue bundle without an index bundle is an orphan
                if(!file_exists(
                    CRAWL_DIR.'/cache/'.
                        self::index_data_base_name.$timestamp)) {
                    $this->db->unlinkRecursive($dir, true);
                }
            }
            $pre_timestamp = strstr($dir, self::index_data_base_name);
            if(strlen($pre_timestamp) > 0) {
                $timestamp =
                    substr($pre_timestamp, strlen(self::index_data_base_name));
                if(!file_exists(CRAWL_DIR.'/schedules/'.self::index_closed_name.
                    $timestamp.".txt") && $this->crawl_time != $timestamp) {
                    crawlLog("Properly closing Index with Timestamp ".
                        $timestamp. ". Will contain all but last shard before ".
                        "crash.");
                    $index_archive = new IndexArchiveBundle($dir, false);
                    $index_archive->dictionary->mergeAllTiers();
                    file_put_contents(CRAWL_DIR.'/schedules/'.
                        self::index_closed_name.$timestamp.".txt", "1");
                    /*
                       touch crawl_status so it doesn't stop the crawl right
                       after it just restarted
                     */
                    touch(CRAWL_DIR."/schedules/crawl_status.txt", time());
                }
                $living_stamps[] = $timestamp;
            }
        }

        $files = glob(CRAWL_DIR.'/schedules/*');
        if(!is_array($files)) {
            $files = array();
        }
        /* The first name found in a path wins, so the order of this list
           decides how a schedule entry is classified.
         */
        $names = array(self::schedule_data_base_name, self::schedule_name,
            self::index_data_base_name, self::robot_data_base_name,
            self::index_closed_name);
        foreach($files as $file) {
            $timestamp = "";
            // true for plain files (unlink), false for folders (recursive)
            $unlink_flag = false;
            foreach($names as $name) {
                $pre_timestamp = strstr($file, $name);
                if(strlen($pre_timestamp) > 0) {
                    // only the first 10 characters after the name are used
                    $timestamp =  substr($pre_timestamp, strlen($name), 10);
                    if($name == self::schedule_name ||
                        $name == self::index_closed_name){
                        $unlink_flag = true;
                    }
                    break;
                }
            }
            if($timestamp !== "" && !in_array($timestamp, $living_stamps)) {
                if($unlink_flag) {
                    unlink($file);
                } else {
                    $this->db->unlinkRecursive($file, true);
                }
            }
        }
    }


    /**
     * Generic function used to process Data, Index, and Robot info schedules.
     * Finds the first file in the directory of schedules of the given
     * type, and calls the appropriate callback method for that type.
     * 
     * @param string $base_dir directory of schedules
     * @param string $callback_method what method should be called to handle
     *      a schedule
     */
    function processDataFile($base_dir, $callback_method)
    {
        /*
           Walks the sub-directories of $base_dir in glob() order and passes
           the first .txt file whose name starts with self::data_base_name to
           $this->$callback_method. At most one file is handled per call.
         */
        $dirs = glob($base_dir.'/*', GLOB_ONLYDIR);
        if(!is_array($dirs)) {
            // glob() returns false on error; there is nothing to process
            return;
        }
        $len_name = strlen(self::data_base_name);

        foreach($dirs as $dir) {
            $files = glob($dir.'/*.txt');
            if(!is_array($files)) {
                // treat an unreadable directory like one without data files
                $files = array();
            }
            if(isset($old_dir)) {
                crawlLog("Deleting $old_dir\n");
                $this->db->unlinkRecursive($old_dir);
                /* The idea is that only go through outer loop more than once 
                   if earlier data directory empty.
                   Note: older directories should only have data dirs or 
                   deleting like this might cause problems! 
                 */
            }
            foreach($files as $file) {
                $base_name = basename($file);
                $file_root_name = substr($base_name, 0, $len_name);

                if(strcmp($file_root_name, self::data_base_name) == 0) {
                    $this->$callback_method($file);
                    return;
                }
            }
            // no usable file here, so this directory is removed on next pass
            $old_dir = $dir;
        }

    }

    /**
     * Sets up the directory to look for a file of unprocessed 
     * index archive data from fetchers then calls the function 
     * processDataFile to process the oldest file found
     */
    function processIndexData()
    {
        crawlLog("Checking for index data files to process...");

        // index data for the active crawl is looked up under this
        // schedules sub-directory and handed on to processIndexArchive
        $this->processDataFile(
            CRAWL_DIR."/schedules/".self::index_data_base_name.
            $this->crawl_time, "processIndexArchive");

        crawlLog("done.");
    }

    /**
     * Adds the summary and index data in $file to summary bundle and word index
     *
     * @param string $file containing web pages summaries and a mini-inverted
     *      index for their content
     */
    function processIndexArchive($file)
    {
        /*
           Outline of this method:
           1. decode $file and split it into a list of page summaries and
              a serialized index shard
           2. compute a hash key for every summary (documents and links
              are keyed differently)
           3. store the summaries in the index archive, copy their offsets
              into the shard, and add the shard to the archive
           4. write the crawl status and delete $file
         */
        // NOTE(review): $first is never read in this method
        static $first = true;
        crawlLog(
            "Start processing index data memory usage".
            memory_get_usage() . "...");
        crawlLog("Processing index data in $file...");

        $start_time = microtime();

        $pre_sites = webdecode(file_get_contents($file));

        // first four bytes give the byte length of the summaries section
        $len_urls = unpackInt(substr($pre_sites, 0, 4));

        $seen_urls_string = substr($pre_sites, 4, $len_urls);
        // whatever follows the summaries section is the index shard data
        $pre_sites = substr($pre_sites, 4 + $len_urls);

        $sites[self::SEEN_URLS] = array();
        $pos = 0;
        /* each record in the summaries section is a four byte length 
           followed by a gzcompress'd, serialized array of that length
         */
        while($pos < $len_urls) {
            $len_site = unpackInt(substr($seen_urls_string, $pos ,4));
            // an implausibly large record length is treated as corruption
            if($len_site > 2*PAGE_RANGE_REQUEST) {
                crawlLog("Site string too long, $len_site,".
                    " data file may be corrupted? Skip rest.");
                break;
            }
            $pos += 4;
            $site_string = substr($seen_urls_string, $pos, $len_site);
            // advance by the bytes actually read, which may be fewer than
            // $len_site if the section is truncated
            $pos += strlen($site_string);
            $tmp = unserialize(gzuncompress($site_string));
            if(!$tmp  || !is_array($tmp)) {
                crawlLog("Compressed array null,".
                    " data file may be corrupted? Skip rest.");
                break;
            }
            $sites[self::SEEN_URLS][] = $tmp;
        }

        $sites[self::INVERTED_INDEX] = IndexShard::load("fetcher_shard", 
            $pre_sites);
        unset($pre_sites);

        crawlLog("A memory usage".memory_get_usage() .
          " time: ".(changeInMicrotime($start_time)));
        $start_time = microtime();

        //do deduplication of summaries
        if(isset($sites[self::SEEN_URLS]) && 
            count($sites[self::SEEN_URLS]) > 0) {
            $seen_sites = $sites[self::SEEN_URLS];
            $seen_sites = array_values($seen_sites);
            $num_seen = count($seen_sites);
 
        } else {
            // $seen_sites is deliberately left undefined in this case; it
            // is tested with isset() further down
            $num_seen = 0;
        }

        $visited_urls_count = 0;
        $recent_urls_count = 0;
        $recent_urls = array();
        for($i = 0; $i < $num_seen; $i++) {
            $seen_sites[$i][self::HASH_URL] = 
                crawlHash($seen_sites[$i][self::URL], true);
            $link_url_parts = explode("|", $seen_sites[$i][self::URL]);
            /* a summary whose url field begins with "url|" is handled as a
               link record: its key is built from the hash of part 1, the 
               hash of the whole field, a one letter reference type ("e" 
               when part 4 is "eref", otherwise "i"), and a hash of the 
               host of part 5.
               NOTE(review): parts 4 and 5 are read without checking that 
               they exist
             */
            if(strcmp("url", $link_url_parts[0]) == 0) {
                $reftype =  (strcmp("eref", $link_url_parts[4]) == 0) ?
                    "e" : "i";
                $seen_sites[$i][self::HASH_URL] = 
                    crawlHash($link_url_parts[1], true)
                    . crawlHash($seen_sites[$i][self::URL], true)
                    . $reftype. substr(crawlHash(
                      UrlParser::getHost($link_url_parts[5])."/", true), 1);
                $seen_sites[$i][self::IS_DOC] = false;
            } else {
                $seen_sites[$i][self::IS_DOC] = true;
                $visited_urls_count++;
                // keep only the newest NUM_RECENT_URLS_TO_DISPLAY urls:
                // once that many were pushed, each push drops the oldest
                array_push($recent_urls, $seen_sites[$i][self::URL]);
                if($recent_urls_count >= NUM_RECENT_URLS_TO_DISPLAY)
                {
                    array_shift($recent_urls);
                }
                $recent_urls_count++;
            }
        }

        if(isset($sites[self::INVERTED_INDEX])) {
            $index_shard =  & $sites[self::INVERTED_INDEX];
            if($index_shard->word_docs_packed) {
                $index_shard->unpackWordDocs();
            }
            $generation = 
                $this->index_archive->initGenerationToAdd($index_shard);

            $summary_offsets = array();
            if(isset($seen_sites)) {
                /* presumably addPages() stores each summary and writes its
                   offset into the self::SUMMARY_OFFSET field of $seen_sites
                   (read back in the loop below) -- TODO confirm against
                   IndexArchiveBundle
                 */
                $this->index_archive->addPages(
                    $generation, self::SUMMARY_OFFSET, $seen_sites,
                    $visited_urls_count);

                // map each summary's key to its SUMMARY_OFFSET field
                foreach($seen_sites as $site) {
                    if($site[self::IS_DOC]){ // so not link
                        $hash = $site[self::HASH_URL]. 
                            $site[self::HASH] . 
                            "d". substr(crawlHash(
                            UrlParser::getHost($site[self::URL])."/", true), 1);
                    } else {
                        $hash = $site[self::HASH_URL];
                    }
                    $summary_offsets[$hash] = $site[self::SUMMARY_OFFSET];
                }
            }
            crawlLog("B Init Shard, Store Summaries memory usage".
                memory_get_usage() .
                " time: ".(changeInMicrotime($start_time)));
            $start_time = microtime();
            // added summary offset info to inverted index data

            $index_shard->changeDocumentOffsets($summary_offsets);

            crawlLog("C (update shard offsets) memory usage".memory_get_usage().
                " time: ".(changeInMicrotime($start_time)));
            $start_time = microtime();

            $this->index_archive->addIndexData($index_shard);
            // flag that the index archive has been modified
            $this->index_dirty = true;
        }
        crawlLog("D (add index shard) memory usage".memory_get_usage(). 
            " time: ".(changeInMicrotime($start_time)));


        crawlLog("Done Processing File: $file");
        // NOTE(review): $recent_urls is always initialized above, so this
        // test is always true
        if(isset($recent_urls)) {
            $sites[self::RECENT_URLS] = & $recent_urls;
            $this->writeCrawlStatus($sites);
        }

        // the data file has been consumed, so remove it
        unlink($file);
    }

    /**
     * Checks how old the oldest robot data is and dumps it if older than a 
     * threshold, then sets up the path to the robot schedule directory
     * and tries to process a file of robots.txt robot paths data from there
     */
    function processRobotUrls()
    {
        // without a web queue there is no robot data to age or to update
        if(!isset($this->web_queue) ) {
            return;
        }
        crawlLog("Checking age of robot data in queue server ");

        $robot_txt_age = $this->web_queue->getRobotTxtAge();
        if($robot_txt_age > CACHE_ROBOT_TXT_TIME) {
            $this->deleteRobotData();
        } else {
            crawlLog("... less than max age\n");
            $num_delayed = count($this->waiting_hosts);
            crawlLog("Number of Crawl-Delayed Hosts: ".$num_delayed);
        }

        crawlLog("Checking for robots.txt files to process...");
        // robot data for the active crawl is looked up under this
        // schedules sub-directory and handed on to processRobotArchive
        $this->processDataFile(
            CRAWL_DIR."/schedules/".self::robot_data_base_name.
            $this->crawl_time, "processRobotArchive");
        crawlLog("done. ");
    }

    /**
     * Reads in $file of robot data adding host-paths to the disallowed
     * robot filter and setting the delay in the delay filter of
     * crawled delayed hosts
     * @param string $file file to read of robot data, is removed after
     *      processing
     */
    function processRobotArchive($file)
    {
        /*
           Decodes $file into an array of robot_host => robot_info, records
           for each host that its robots.txt was received, applies any crawl
           delay and disallowed paths, and finally deletes $file.
         */
        crawlLog("Processing Robots data in $file");
        $start_time = microtime();

        $sites = unserialize(gzuncompress(webdecode(file_get_contents($file))));
        /* unserialize() gives false (not null) for unreadable or corrupt 
           data, so is_array() rather than isset() is needed to keep bad 
           data away from the foreach
         */
        if(is_array($sites)) {
            foreach($sites as $robot_host => $robot_info) {
                $this->web_queue->addGotRobotTxtFilter($robot_host);
                // a still-queued request for this robots.txt is now moot
                $robot_url = $robot_host."/robots.txt";
                if($this->web_queue->containsUrlQueue($robot_url)) {
                    crawlLog("Removing $robot_url from Queue");
                    $this->web_queue->removeQueue($robot_url);
                }

                if(isset($robot_info[self::CRAWL_DELAY])) {
                    $this->web_queue->setCrawlDelay($robot_host,
                        $robot_info[self::CRAWL_DELAY]);
                }

                if(isset($robot_info[self::ROBOT_PATHS])) {
                    foreach($robot_info[self::ROBOT_PATHS] as $path) {
                        $this->web_queue->addDisallowedRobotFilter(
                            $robot_host.$path); 
                    }
                }
            }
        } else {
            crawlLog("Robots data in $file could not be decoded, skipping.");
        }

        crawlLog(" time: ".(changeInMicrotime($start_time))."\n");

        crawlLog("Done Processing File: $file");

        // the file is removed whether or not it could be decoded, so that a
        // bad file is not picked up again on the next pass
        unlink($file);
    }

    /**
     * Deletes all robot information stored by the QueueServer. 
     * 
     * This function is called roughly every CACHE_ROBOT_TXT_TIME.
     * It forces the crawler to redownload robots.txt files before hosts
     * can continue to be crawled. This ensures that the cached robots.txt
     * file is never too old. Thus, if someone changes it to allow or disallow
     * the crawler it will be noticed reasonably promptly.
     */
    function deleteRobotData()
    {
        // 1. drop the robot schedule directory of the active crawl
        crawlLog("... unlinking robot schedule files ...");
        $this->db->unlinkRecursive(
            CRAWL_DIR.'/schedules/'.self::robot_data_base_name.
            $this->crawl_time, true);

        // 2. reset the robot filters kept by the web queue
        crawlLog("... resetting robot bloom filters ...");
        $this->web_queue->emptyRobotFilters();

        // 3. forget which hosts were waiting
        crawlLog("...Clearing Waiting Hosts");
        $this->waiting_hosts = array();
    }

    /**
     * Checks for a new crawl file or schedule data for the current crawl
     * and, if such a file exists, processes its contents, adding the
     * relevant urls to the priority queue
     *
     * @return array info array with continue status
     */
    function processQueueUrls()
    {
        crawlLog("Start checking for new URLs data memory usage".
            memory_get_usage());

        $info = array(self::STATUS => self::CONTINUE_STATE);

        $schedules_path = CRAWL_DIR."/schedules/";
        $start_schedule = $schedules_path.self::schedule_start_name;

        if(!file_exists($start_schedule)) {
            // no start schedule: look for schedule data of the active crawl
            $this->processDataFile(
                $schedules_path.self::schedule_data_base_name.
                $this->crawl_time, "processDataArchive");

            crawlLog("done.");
        } else {
            // a start schedule takes precedence over any other schedule data
            crawlLog("Start schedule urls".$start_schedule); 
            $this->processDataArchive($start_schedule);
        }

        return $info;
    }

    /**
     * Process a file of to-crawl urls adding to or adjusting the weight in
     * the PriorityQueue of those which have not been seen. Also 
     * updates the queue with seen url info
     *
     * @param string $file containing serialized to crawl and seen url info
     */
    function processDataArchive($file)
    {
        /*
           Steps:
           1. release crawl-delayed hosts that were waiting on the schedule
              this data came from
           2. remove urls reported as seen from the queue
           3. for the to-crawl urls, adjust the weight of those already
              queued and collect the new ones (plus robots.txt urls of their
              hosts where needed) to add in one go
           4. delete $file
         */
        $sites = & $this->getDataArchiveFileData($file);

        crawlLog("...Updating Delayed Hosts Array ...");
        $start_time = microtime();
        if(isset($sites[self::SCHEDULE_TIME])) {
            $schedule_time = $sites[self::SCHEDULE_TIME];
            if(isset($this->waiting_hosts[$schedule_time])) {
                $delayed_hosts = $this->waiting_hosts[$schedule_time];
                unset($this->waiting_hosts[$schedule_time]);
                foreach($delayed_hosts as $hash_host) {
                    unset($this->waiting_hosts[$hash_host]); 
                        //allows crawl-delayed host to be scheduled again
                }
            }
        }
        crawlLog(" time: ".(changeInMicrotime($start_time)));

        crawlLog("...Seen Urls ...");
        $start_time = microtime();

        if(isset($sites[self::HASH_SEEN_URLS])) {
            foreach($sites[self::HASH_SEEN_URLS] as $hash_url) {
                if($this->web_queue->lookupHashTable($hash_url)) {
                    crawlLog("Removing hash ". base64_encode($hash_url).
                        " from Queue");
                    $this->web_queue->removeQueue($hash_url, true);
                }
            }
        }

        crawlLog(" time: ".(changeInMicrotime($start_time)));

        crawlLog("... To Crawl ...");
        $start_time = microtime();
        if(isset($sites[self::TO_CRAWL])) {

            crawlLog("A.. Delete previously seen urls from add set");
            // by reference: deleteSeenUrls() receives the array itself
            $to_crawl_sites = & $sites[self::TO_CRAWL];
            $this->deleteSeenUrls($to_crawl_sites);
            crawlLog(" time: ".(changeInMicrotime($start_time)));

            crawlLog("B..Insert unseen robots.txt urls;adjust changed weights");
            $start_time = microtime();

            $added_urls = array();
            $added_pairs = array();
            foreach($to_crawl_sites as $triple) {
                /* plain copies are used here (no reference into $triple) so
                   that the arrays stored in $added_pairs hold ordinary 
                   values
                 */
                $url = $triple[0];
                $weight = $triple[1];
                $this->web_queue->addSeenUrlFilter($triple[2]); //add for dedup
                unset($triple[2]); // so triple is now a pair
                $host_url = UrlParser::getHost($url);
                $host_with_robots = $host_url."/robots.txt";
                $robots_in_queue = 
                    $this->web_queue->containsUrlQueue($host_with_robots);

                if($this->web_queue->containsUrlQueue($url)) {
                    // already queued: only the weights need adjusting
                    if($robots_in_queue) {
                        $this->web_queue->adjustQueueWeight(
                            $host_with_robots, $weight);
                    }
                    $this->web_queue->adjustQueueWeight($url, $weight);
                } else if($this->allowedToCrawlSite($url) && 
                    !$this->disallowedToCrawlSite($url)  ) {

                    // queue the host's robots.txt once if it is neither 
                    // already received, queued, nor added in this pass
                    if(!$this->web_queue->containsGotRobotTxt($host_url) 
                        && !$robots_in_queue
                        && !isset($added_urls[$host_with_robots])
                        ) {
                            $added_pairs[] = array($host_with_robots, $weight);
                            $added_urls[$host_with_robots] = true;
                    }

                    if(!isset($added_urls[$url])) {
                        // $triple was reduced to (url, weight) above
                        $added_pairs[] = $triple;
                        $added_urls[$url] = true;
                    }

                }
            }

            crawlLog(" time: ".(changeInMicrotime($start_time)));

            crawlLog("C.. Add urls to queue");
            $start_time = microtime();
            /* 
                 adding urls to queue involves disk contains and adjust do not
                 so group and do last
             */
            $this->web_queue->addUrlsQueue($added_pairs);

        }
        crawlLog(" time: ".(changeInMicrotime($start_time)));

        crawlLog("Done Processing File: $file");

        unlink($file);

    }

    /**
     * Writes status information about the current crawl so that the webserver
     * app can use it for its display. 
     *
     * @param array $sites contains the most recently crawled sites
     */
    function writeCrawlStatus(&$sites)
    {
        /*
           Merges fresh crawl statistics into the existing contents of
           crawl_status.txt (if any), rewrites that file, and logs a summary.
         */
        $crawl_status = array();
        $stat_file = CRAWL_DIR."/schedules/crawl_status.txt";
        if(file_exists($stat_file) ) {
            $old_status = unserialize(file_get_contents($stat_file));
            // unserialize() returns false for a corrupt or partially
            // written file; start from an empty status in that case
            if(is_array($old_status)) {
                $crawl_status = $old_status;
            }
        }
        $crawl_status['MOST_RECENT_FETCHER'] = $this->most_recent_fetcher;
        if(isset($sites[self::RECENT_URLS])) {
            $crawl_status['MOST_RECENT_URLS_SEEN'] = $sites[self::RECENT_URLS]; 
        }
        $crawl_status['CRAWL_TIME'] = $this->crawl_time;
        $info_bundle = IndexArchiveBundle::getArchiveInfo(
            CRAWL_DIR.'/cache/'.self::index_data_base_name.$this->crawl_time);
        $index_archive_info = unserialize($info_bundle['DESCRIPTION']);
        // same guard as above: an undecodable description yields null
        $description = null;
        if(is_array($index_archive_info) && 
            isset($index_archive_info['DESCRIPTION'])) {
            $description = $index_archive_info['DESCRIPTION'];
        }
        $crawl_status['COUNT'] = $info_bundle['COUNT'];
        $now = time();
        /* the history is kept newest first; the oldest entry is dropped 
           once it is more than an hour old (at most one entry per call)
         */
        if(count($this->hourly_crawl_data) > 0 ) {
            $least_recent_hourly_pair = array_pop($this->hourly_crawl_data);
            $change_in_time = 
                ($now - $least_recent_hourly_pair[0]);
            if($change_in_time <= 3600) {
                $this->hourly_crawl_data[] = $least_recent_hourly_pair;
            }
        }
        array_unshift($this->hourly_crawl_data, 
            array($now, $info_bundle['VISITED_URLS_COUNT']));
        $crawl_status['VISITED_COUNT_HISTORY'] = $this->hourly_crawl_data;
        $crawl_status['VISITED_URLS_COUNT'] =$info_bundle['VISITED_URLS_COUNT'];
        $crawl_status['DESCRIPTION'] = $description;
        $crawl_status['QUEUE_PEAK_MEMORY'] = memory_get_peak_usage();
        file_put_contents($stat_file, serialize($crawl_status));
        // NOTE(review): world-writable, presumably so the web app's user 
        // can also update the file -- confirm this is still required
        chmod($stat_file, 0777);
        crawlLog(
            "End checking for new URLs data memory usage".memory_get_usage());

        crawlLog(
            "The current crawl description is: ".
                $description);
        crawlLog("Number of unique pages so far: ".
            $info_bundle['VISITED_URLS_COUNT']);
        crawlLog("Total urls extracted so far: ".$info_bundle['COUNT']); 
        if(isset($sites[self::RECENT_URLS])) {
            crawlLog("Of these, the most recent urls are:");
            foreach($sites[self::RECENT_URLS] as $url) {
                crawlLog("URL: $url");
            }
        }
    }

    /**
     * Removes the already seen urls from the supplied array
     *
     * @param array &$sites url data to check if seen
     */
    function deleteSeenUrls(&$sites)
    {
        /* presumably the positions within each entry of $sites that the
           web queue should test (processDataArchive reads the url from
           position 0 and adds position 2 to the seen filter) -- TODO 
           confirm against WebQueueBundle::differenceSeenUrls
         */
        $field_positions = array(0, 2);
        $this->web_queue->differenceSeenUrls($sites, $field_positions);
    }

    /**
     * Used to create and encode a string containing the meta info for
     * a fetcher schedule.
     *
     * @param int $schedule_time timestamp of the schedule
     * @return string base64 encoded meta info
     */
    function calculateScheduleMetaInfo($schedule_time)
    {
        // entries are listed in the order in which they get serialized
        $meta_info = array(
            self::CRAWL_TIME => $this->crawl_time,
            self::SCHEDULE_TIME => $schedule_time,
            // fetcher should delete any crawl time not listed here
            self::SAVED_CRAWL_TIMES => $this->getCrawlTimes(),
            self::CRAWL_ORDER => $this->crawl_order,
            self::CRAWL_TYPE => $this->crawl_type,
            self::CRAWL_INDEX => $this->crawl_index,
            self::META_WORDS => $this->meta_words,
            self::INDEXING_PLUGINS => $this->indexing_plugins,
            self::SITES => array()
        );

        // the result is one newline-terminated line
        $encoded_info = base64_encode(serialize($meta_info));
        return $encoded_info."\n";
    }

    /**
     * Produces a schedule.txt file of url data for a fetcher to crawl next.
     *
     * The hard part of scheduling is to make sure that the overall crawl
     * process obeys robots.txt files. This involves checking the url is in
     * an allowed path for that host and it also involves making sure the
     * Crawl-delay directive is respected. The first fetcher that contacts the 
     * server requesting data to crawl will get the schedule.txt
     * produced by produceFetchBatch() at which point it will be unlinked
     * (these latter things are controlled in FetchController).
     *
     * @see FetchController
     */
    function produceFetchBatch()
    {
        /*
           Overview: peeks at queue entries from position 1 upward, places
           each schedulable url into a slot of $sites, remembers which queue
           entries to delete, removes those entries, and finally writes the
           slots out as the schedule file of the current crawl.
         */

        $i = 1; // array implementation of priority queue starts at 1 not 0
        $fetch_size = 0;

        crawlLog("Start Produce Fetch Batch Memory usage".memory_get_usage() );

        $count = $this->web_queue->to_crawl_queue->count;

        $schedule_time = time();
        $first_line = $this->calculateScheduleMetaInfo($schedule_time);

        $sites = array();

        $delete_urls = array();
        $crawl_delay_hosts = array();
        $time_per_request_guess = MINIMUM_FETCH_LOOP_TIME ; 
            // it would be impressive if we can achieve this speed

        $current_crawl_index = -1;

        crawlLog("Trying to Produce Fetch Batch; Queue Size $count");
        $start_time = microtime();

        $fh = $this->web_queue->openUrlArchive();
        /*
            $delete_urls - array of items we will delete from the queue after
                we have selected all of the items for fetch batch
            $sites - array of urls for fetch batch indices in this array we'll
                call slots. Crawled-delayed host urls are spaced by a certain
                number of slots
        */
        while ($i <= $count && $fetch_size < MAX_FETCH_SIZE) {

            //look in queue for url and its weight
            $tmp = $this->web_queue->peekQueue($i, $fh);

            // only destructure a real (url, weight) pair; a failed peek
            // leaves both unset values as null
            $lookup_failed = !is_array($tmp);
            if($lookup_failed) {
                $url = null;
                $weight = null;
            } else {
                list($url, $weight) = $tmp;
            }

            // if queue error remove entry and move on
            if($lookup_failed || strcmp($url, "LOOKUP ERROR") == 0) {
                $delete_urls[$i] = $url;
                crawlLog("Removing lookup error index");
                $i++;
                continue;
            }

            $host_url = UrlParser::getHost($url);

            //if $url is a robots.txt url see if we need to schedule or not
            if(strcmp($host_url."/robots.txt", $url) == 0) {
                if($this->web_queue->containsGotRobotTxt($host_url)) {
                    $delete_urls[$i] = $url;
                    $i++;
                } else {
                    $next_slot = $this->getEarliestSlot($current_crawl_index, 
                        $sites);
                    
                    if($next_slot < MAX_FETCH_SIZE) {
                        $sites[$next_slot] = 
                            array($url, $weight, 0);
                        $delete_urls[$i] = $url;
                        /* note don't add to seen url filter 
                           since check robots every 24 hours as needed
                         */
                        $current_crawl_index = $next_slot;
                        $fetch_size++;
                        $i++;
                    } else {
                        /* no more available slots so stop scanning. A 
                           break is used as resetting $i to $count and 
                           continuing would re-enter the loop at the last 
                           queue position and could repeat forever
                         */
                        break;
                    }
                }
                continue;
            }

            //Now handle the non-robots.txt url case
            $robots_okay = true;

            /* urls of hosts whose robots.txt has not been received yet are
               skipped (left in the queue) by this test
             */
            if($this->web_queue->containsGotRobotTxt($host_url)) {
                $host_paths = UrlParser::getHostPaths($url);

                foreach($host_paths as $host_path) {
                    if($this->web_queue->containsDisallowedRobot($host_path)) {
                        $robots_okay = false;
                        $delete_urls[$i] = $url; 
                            //want to remove from queue since robots forbid it
                        $this->web_queue->addSeenUrlFilter($url); 
                            /* at this point we might miss some sites by marking
                               them seen: the robot url might change in 24 hours
                             */
                        break;
                    }
                }

                if(!$robots_okay) {
                    $i++;
                    continue;
                }

                $delay = $this->web_queue->getCrawlDelay($host_url);
                $num_waiting = count($this->waiting_hosts);

                if($delay > 0 ) { 
                    // handle adding a url if there is a crawl delay
                    $hash_host = crawlHash($host_url);
                    $host_is_waiting = isset($this->waiting_hosts[$hash_host]);
                    /* schedule only if the host is not waiting on another
                       schedule: either it is new (and there is room for 
                       another waiting host) or it was already marked for 
                       this very schedule
                     */
                    if((!$host_is_waiting
                        && $num_waiting < MAX_WAITING_HOSTS)
                        || ($host_is_waiting &&
                            $this->waiting_hosts[$hash_host] == 
                            $schedule_time)) {

                        $this->waiting_hosts[$hash_host] = 
                           $schedule_time;
                        $this->waiting_hosts[$schedule_time][] = 
                            $hash_host;
                        $request_batches_per_delay = 
                            ceil($delay/$time_per_request_guess); 
                        
                        if(!isset($crawl_delay_hosts[$host_url])) {
                            $next_earliest_slot = $current_crawl_index;
                            $crawl_delay_hosts[$host_url] = $next_earliest_slot;
                        } else {
                            // space this url out from the host's last slot
                            $next_earliest_slot = $crawl_delay_hosts[$host_url] 
                                + $request_batches_per_delay
                                * NUM_MULTI_CURL_PAGES;
                        }
                        
                        if(($next_slot = 
                            $this->getEarliestSlot( $next_earliest_slot, 
                                $sites)) < MAX_FETCH_SIZE) {
                            $crawl_delay_hosts[$host_url] = $next_slot;
                            $sites[$next_slot] = 
                                array($url, $weight, $delay);
                            $delete_urls[$i] = $url;
                            $this->web_queue->addSeenUrlFilter($url); 
                                /* we might miss some sites by marking them 
                                   seen after only scheduling them
                                 */

                            $fetch_size++;
                        }
                    }
                } else { // add a url no crawl delay
                    $next_slot = $this->getEarliestSlot(
                        $current_crawl_index, $sites);
                    if($next_slot < MAX_FETCH_SIZE) {
                        $sites[$next_slot] = 
                            array($url, $weight, 0);
                        $delete_urls[$i] = $url;
                        $this->web_queue->addSeenUrlFilter($url);
                            /* we might miss some sites by marking them 
                               seen after only scheduling them
                             */

                        $current_crawl_index = $next_slot;
                        $fetch_size++;
                    } else { 
                        //no more available slots so stop scanning
                        break;
                    }
                } //if delay else
            } // if containsGotRobotTxt

            $i++;
        } //end while
        $this->web_queue->closeUrlArchive($fh);

        foreach($delete_urls as $delete_url) {
            $this->web_queue->removeQueue($delete_url);
        }

        if(count($sites) > 0 ) {
            $dummy_slot = array(self::DUMMY, 0.0, 0);
            /* dummy's are used for crawl delays of sites with longer delays
               when we don't have much else to crawl.
             */
            $cnt = 0;
            for($j = 0; $j < MAX_FETCH_SIZE; $j++) {
                if(isset( $sites[$j])) {
                    $cnt++;
                    if($cnt == $fetch_size) {break; }
                } else {
                    if($j % NUM_MULTI_CURL_PAGES == 0) {
                        $sites[$j] = $dummy_slot;
                    }
                }
            }
            // slots were filled out of order, the file must be in slot order
            ksort($sites);

            //write schedule to disk
            $schedule_file = CRAWL_DIR.
                "/schedules/".
                self::schedule_name.$this->crawl_time.".txt";
            $fh = fopen($schedule_file, "wb");
            if($fh === false) {
                // writing to a failed handle would only produce errors
                crawlLog("Could not open $schedule_file to write fetch batch");
                return;
            }
            fwrite($fh, $first_line);
            foreach($sites as $site) {
                list($url, $weight, $delay) = $site;
                $out_string = base64_encode(
                    packFloat($weight).packInt($delay).$url)."\n";
                fwrite($fh, $out_string);
            }
            fclose($fh);

            crawlLog("End Produce Fetch Memory usage".memory_get_usage() );
            crawlLog("Created fetch batch... Queue size is now ".
                $this->web_queue->to_crawl_queue->count.
                "...Time to create batch: ".
                (changeInMicrotime($start_time)));
        } else {
            crawlLog("No fetch batch created!!");
        }

    }

    /**
     * Gets the first unfilled schedule slot after $index in $arr
     *
     * A schedule of sites for a fetcher to crawl consists of MAX_FETCH_SIZE
     * many slots each of which could eventually hold url information.
     * This function is used to schedule slots for crawl-delayed host.
     *
     * @param int $index location to begin searching for an empty slot
     * @param array &$arr list of slots to look in
     * @return int index of first available slot
     */
    function getEarliestSlot($index, &$arr)
    {
        /* Returns the smallest position greater than $index that is not 
           set in $arr. 

           $arr may be sparse (slots can be filled far ahead of the others),
           so count($arr) is not an upper bound on its filled positions and 
           cannot be used to end the scan: doing so could hand back a 
           position that is already taken.
         */
        $i = $index + 1;
        while(isset($arr[$i])) {
            $i++;
        }

        return $i;
    }


    /**
     * Checks if url belongs to a list of sites that are allowed to be 
     * crawled
     *
     * @param string $url url to check
     * @return bool whether is allowed to be crawled or not
     */
    function allowedToCrawlSite($url) 
    {
        // the document type of the url has to be one of the indexed types
        $type = UrlParser::getDocumentType($url);
        $type_is_indexed = in_array($type, $this->indexed_file_types);

        // when sites are restricted, the url must also be on the allowed list
        if($type_is_indexed && $this->restrict_sites_by_url) {
            return $this->urlMemberSiteArray($url, $this->allowed_sites);
        }

        return $type_is_indexed;
    }

    /**
     * Checks if url belongs to a list of sites that aren't supposed to be 
     * crawled
     *
     * @param string $url url to check
     * @return bool whether it shouldn't be crawled
     */
    function disallowedToCrawlSite($url)
    {
        // a url is disallowed exactly when it matches the disallowed list
        $is_disallowed = 
            $this->urlMemberSiteArray($url, $this->disallowed_sites);
        return $is_disallowed;
    }

    /**
     * Checks if the url belongs to one of the sites listed in site_array
     * Sites can be either given in the form domain:host or
     * in the form of a url in which case it is checked that the site url
     * is a substring of the passed url.
     *
     * @param string $url url to check
     * @param array $site_array sites to check against
     * @return bool whether the url belongs to one of the sites
     */
    function urlMemberSiteArray($url, $site_array)
    {
        /* Returns true as soon as one entry of $site_array matches $url:
           - for an entry containing "domain:", the text after it must occur
             in the host of $url
           - any entry matches if it occurs as a substring of $url
           Anything that is not an array is treated as an empty list.
         */
        if(!is_array($site_array)) {return false;}
        foreach($site_array as $site) {
            /* an empty entry (say from a blank line in a site list) must 
               not match: as a search needle it is either an error or, on 
               newer PHP versions, found in every string
             */
            if(!is_string($site) || $site === "") {
                continue;
            }

            $site_parts = mb_split("domain:", $site);
            // results are compared with false explicitly so that a match
            // is never lost to PHP's truthiness rules (e.g. the string "0")
            if(isset($site_parts[1]) && $site_parts[1] !== "" &&
                mb_strstr(UrlParser::getHost($url), $site_parts[1]) 
                    !== false) {
                return true;
            }

            if(mb_strstr($url, $site) !== false) { 
                return true;
            }
        }
        return false;
    }

    /**
     * Gets a list of all the timestamps of previously stored crawls
     *
     * @return array list of timestamps
     */
    function getCrawlTimes()
    {
        $prefix = self::index_data_base_name;
        $prefix_length = strlen($prefix);
        $timestamps = array();

        foreach(glob(CRAWL_DIR.'/cache/*', GLOB_ONLYDIR) as $cache_dir) {
            // part of the path from the first occurrence of the prefix on
            $tail = strstr($cache_dir, $prefix);
            if($tail === false) {
                continue; // not an index archive directory
            }
            // what follows the prefix is taken as the crawl's timestamp
            $timestamps[] = substr($tail, $prefix_length);
        }

        return $timestamps;
    }


}

/*
 *  Instantiates and runs the QueueServer.
 *  $INDEXED_FILE_TYPES is not defined in this part of the file; it is
 *  presumably set by one of the files required at the top -- TODO confirm.
 */
$queue_server =  new QueueServer($INDEXED_FILE_TYPES);
$queue_server->start();


?>
Return current item: Yioop!