PHP 5 Multi-Tasking Class

Through these tests, I've determined that PHP Multi-Tasking should be used only as a means to process large data sources, and not to decrease execution time. Testing was done with an external data source containing 539,221 email addresses. The external data source was read into an array and split between the number of sockets. Each socket inserted each email address from it's chunk of data into a table containing an ID and arrKey column. Sockets closed their process as their script execution completed.

Average Memory size for initial process was 115M

1 Socket:
Failure - Memory failed for data import to socket process.

5 Sockets:
Failure - Memory failed for data import to each socket process.

10 Sockets:
Started at:     2008-04-24 15:22:58
Ended at:       2008-04-24 15:28:08
Execution Time:	5 minutes 10 seconds

15 Sockets:
Started at:     2008-04-24 15:28:56
Ended at:       2008-04-24 15:34:05
Execution Time:	5 minutes 9 seconds

20 Sockets:
Started at:     2008-04-24 15:37:01
Ended at:       2008-04-24 15:42:17
Execution Time:	5 minutes 16 seconds

Usage

$MT = new Terminal;
$MT->cmd = 'php /home/jayfortner/webroot/dev/tasking/pointer.php';
$MT->totalSockets = 15;

ini_set("memory_limit","128M");
$file ='http://www.website.com/file.txt';


$handle = fopen($file, "r");

$contents = '';
while (!feof($handle)) {
  $contents .= fread($handle, 8192);
}

$replace = array("\r\n", "\r", "\n");

$contents = str_replace($replace, ',', $contents);

$content = explode(',', $contents);

$MT->input = $content;

fclose($handle);

$MT->run();
exit();

PHP 5 Multi-Tasking Class

class Terminal {

	var $startTime;
	var $endTime;
	var $timeout;
	var $totalSockets;
	var $readBlock;
	var $cmd;
	var $execute;
	var $descriptorspec;
	var $sockets = array();
	var $streams = array();
	var $pipes = array();
	var $handles = array();
	var $__status = array();
	
	/********************************************************************/
	// Initializes Class

	// Build default values

	function __construct() {
		$this->endTime = 0;
		$this->timeout = 10;
		$this->totalSockets = 4;
		$this->readBlock = 1024;
		$this->descriptorspec = array(
			0 => array("pipe", "r"),
			1 => array("pipe", "w"),
			2 => array("pipe", "w")
			);
	} // function

	
	/********************************************************************/
	// Run Full Class

	// Use this if you want the startTime, endTime, and average loads.

	// Otherwise, use process();

	function run()
	{
		$this->startTime = date('Y-m-d H:i:s');
		$this->__status['start_loadavg'] = sys_getloadavg();
		$this->process();
		$this->__status['end_loadavg'] = sys_getloadavg();
		$this->endTime = date('Y-m-d H:i:s');
		return TRUE;
	} // function

	
	/********************************************************************/
	// Split input array into sections for each stream

	function split_input($i)
	{
		$input = $this->input;
		$sockets = $this->totalSockets;
		$offset = ceil(count($input) / $sockets);
		$a = $offset * $i;
		return array_slice($input, $a, $offset, TRUE);
	} // function

	
	/********************************************************************/
	// Handle Multi-Tasking

	// Run Multiple Tasks from Split Input

	function process()
	{
		
		for($i=0;$i<=$this->totalSockets-1;$i++) {
			$this->execute = '$inputArr = '.var_export($this->split_input($i), TRUE).';';
			
			$this->start_process($i, $this->cmd, $this->execute);
		} // for


		echo "\r\n".'Waiting on Processes to end...'."\r\n\r\n";

		while(count($this->streams)) {
			sleep(1);
			
			$read = $this->streams;
			foreach($read as $id => $r) {
				$this->kill_process($id);
			} // foreach

		} // while

		return TRUE;
	} // function

	
	/********************************************************************/
	// Handle Script Execution

	function start_process($id, $cmd, $execute = null)
	{		
	  $this->handles[$id] = $process = proc_open($cmd, $this->descriptorspec, $this->pipes[$id], null, null);
		$this->streams[$id] = $this->pipes[$id][1];
		stream_set_blocking($this->pipes[$id][2], 0);
		if(is_resource($process)) {
			echo 'Starting Process '.$id."\t";
			
			if(fwrite($this->pipes[$id][0], $execute)) {
				fclose($this->pipes[$id][0]);
				echo 'Started.'."\r\n";
			} else {
				echo 'Failed.'."\r\n";
			}// if

			
			return $process;
		} // if

		return TRUE;
	} // function

	
	/********************************************************************/
	// Handle Script Completion

	function kill_process($id)
	{		
		$status = proc_get_status($this->handles[$id]);
		if($status['running'] === false) {
			echo 'Closing Process '.$id."\t";
		
			fclose($this->pipes[$id][1]);
			fclose($this->pipes[$id][2]);
		
			proc_close($this->handles[$id]);
		
			echo 'Closed.'."\r\n";
	
			unset($this->streams[$id]);
			unset($this->pipes[$id]);
			unset($this->handles[$id]);
		} // if

		return TRUE;
	} // function


} // class

Pointer File

  # read incoming command
  if($fh = fopen('php://stdin','rb')) {

	$val_in = stream_get_contents($fh);
    fclose($fh);
  }

  # execute incoming command

  if($val_in)
		eval($val_in);

sleep(1);

# run script for task $inputArr given
	$resource = mysql_connect('server', 'username','password');
	mysql_select_db('database');
	foreach($inputArr as $key => $value) {
		$query = "INSERT INTO tasking SET arrKey = '".addslashes($key)."'";
		mysql_query($query);
	}
	mysql_close($resource);
	
	exit();

Share PHP 5 Multi-Tasking Class on Digg, Delicious, and StumbleUpon

Comments

Add your Comment



Search