PHP 5 Multi-Tasking Class
Through these tests, I've determined that PHP multi-tasking should be used only as a means of processing large data sources, not as a way to decrease execution time. Testing was done with an external data source containing 539,221 email addresses. The data source was read into an array and split across the sockets. Each socket inserted one row per email address in its chunk of data into a table containing an ID and an arrKey column. Each socket closed its process once its script finished executing.
The average memory size of the initial process was 115M.
1 Socket: Failure - Memory failed for data import to socket process.
5 Sockets: Failure - Memory failed for data import to each socket process.
10 Sockets: Started at: 2008-04-24 15:22:58, Ended at: 2008-04-24 15:28:08, Execution Time: 5 minutes 10 seconds
15 Sockets: Started at: 2008-04-24 15:28:56, Ended at: 2008-04-24 15:34:05, Execution Time: 5 minutes 9 seconds
20 Sockets: Started at: 2008-04-24 15:37:01, Ended at: 2008-04-24 15:42:17, Execution Time: 5 minutes 16 seconds
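For a sense of scale, the class divides the input with ceil(count / sockets), so the size of each process's chunk follows directly from the socket count. The sketch below only illustrates that arithmetic for the five test runs; chunk_size() is a hypothetical helper and is not part of the class.

```php
<?php
// Hypothetical helper (not part of the Terminal class): mirrors the
// ceil(count / sockets) calculation used by split_input().
function chunk_size($total, $sockets)
{
    return (int) ceil($total / $sockets);
}

$total = 539221; // number of email addresses in the test data source
foreach (array(1, 5, 10, 15, 20) as $sockets) {
    printf("%2d sockets: up to %d addresses per process\n", $sockets, chunk_size($total, $sockets));
}
```

With 10 sockets each process receives at most 53,923 addresses, compared with 107,845 for 5 sockets and the full 539,221 for a single socket.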
Usage
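$MT = new Terminal;
$MT->cmd = 'php /home/jayfortner/webroot/dev/tasking/pointer.php';
$MT->totalSockets = 15;
// Raise the memory limit before loading the whole data source into memory
ini_set("memory_limit","128M");
$file ='http://www.website.com/file.txt';
$handle = fopen($file, "r");
if ($handle === false) {
    exit('Unable to open '.$file."\r\n");
}
// Read the remote file in 8 KB blocks
$contents = '';
while (!feof($handle)) {
    $contents .= fread($handle, 8192);
}
fclose($handle);
// Turn every line ending into a comma, then split into one array entry per line
$replace = array("\r\n", "\r", "\n");
$contents = str_replace($replace, ',', $contents);
$content = explode(',', $contents);
$MT->input = $content;
$MT->run();
exit();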
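One thing to watch in the usage script: replacing line endings with commas and then exploding on commas also splits any line that already contains a comma, and it leaves an empty last entry when the file ends with a newline. A possible alternative, shown here only as a sketch, is to split on the line endings directly and drop empty entries. split_lines() is a hypothetical helper name and the sample addresses are made up.

```php
<?php
// Hypothetical helper (not in the original script): split raw file
// contents into non-empty lines, whichever line ending the file uses.
function split_lines($contents)
{
    return preg_split('/\r\n|\r|\n/', $contents, -1, PREG_SPLIT_NO_EMPTY);
}

// Made-up sample mixing Windows, Unix and old Mac line endings
$sample = "a@example.com\r\nb@example.com\nc@example.com\r";
$content = split_lines($sample);
print_r($content);
```

The resulting $content array could be assigned to $MT->input in the same way as before.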
PHP 5 Multi-Tasking Class
class Terminal {
    var $startTime;
    var $endTime;
    var $timeout;
    var $totalSockets;
    var $readBlock;
    var $cmd;
    var $input = array(); // data to split across the child processes
    var $execute;
    var $descriptorspec;
    var $sockets = array();
    var $streams = array();
    var $pipes = array();
    var $handles = array();
    var $__status = array();
    /********************************************************************/
    // Initializes Class
    // Build default values
    function __construct() {
        $this->endTime = 0;
        $this->timeout = 10;
        $this->totalSockets = 4;
        $this->readBlock = 1024;
        $this->descriptorspec = array(
            0 => array("pipe", "r"),
            1 => array("pipe", "w"),
            2 => array("pipe", "w")
        );
    } // function
    /********************************************************************/
    // Run Full Class
    // Use this if you want the startTime, endTime, and average loads.
    // Otherwise, use process();
    function run()
    {
        $this->startTime = date('Y-m-d H:i:s');
        $this->__status['start_loadavg'] = sys_getloadavg();
        $this->process();
        $this->__status['end_loadavg'] = sys_getloadavg();
        $this->endTime = date('Y-m-d H:i:s');
        return TRUE;
    } // function
    /********************************************************************/
    // Split input array into sections for each stream
    // Returns the slice for stream $i, keeping the original array keys
    function split_input($i)
    {
        $input = $this->input;
        $sockets = $this->totalSockets;
        $offset = ceil(count($input) / $sockets);
        $a = $offset * $i;
        return array_slice($input, $a, $offset, TRUE);
    } // function
    /********************************************************************/
    // Handle Multi-Tasking
    // Run Multiple Tasks from Split Input
    function process()
    {
        // Start one process per socket and pass it its slice of the input as PHP source
        for($i=0;$i<=$this->totalSockets-1;$i++) {
            $this->execute = '$inputArr = '.var_export($this->split_input($i), TRUE).';';
            $this->start_process($i, $this->cmd, $this->execute);
        } // for
        echo "\r\n".'Waiting on Processes to end...'."\r\n\r\n";
        // Poll once per second and close each process as soon as it has finished
        while(count($this->streams)) {
            sleep(1);
            $read = $this->streams;
            foreach($read as $id => $r) {
                $this->kill_process($id);
            } // foreach
        } // while
        return TRUE;
    } // function
    /********************************************************************/
    // Handle Script Execution
    function start_process($id, $cmd, $execute = null)
    {
        $this->handles[$id] = $process = proc_open($cmd, $this->descriptorspec, $this->pipes[$id], null, null);
        if(!is_resource($process)) {
            // proc_open failed: forget this id so process() does not wait on it forever
            unset($this->handles[$id], $this->pipes[$id]);
            echo 'Starting Process '.$id."\t".'Failed.'."\r\n";
            return FALSE;
        } // if
        $this->streams[$id] = $this->pipes[$id][1];
        stream_set_blocking($this->pipes[$id][2], 0);
        echo 'Starting Process '.$id."\t";
        // Send the exported input to the child's stdin, then always close it so the child sees EOF
        $written = fwrite($this->pipes[$id][0], $execute);
        fclose($this->pipes[$id][0]);
        echo ($written ? 'Started.' : 'Failed.')."\r\n";
        return $process;
    } // function
    /********************************************************************/
    // Handle Script Completion
    // Closes process $id only if it has already finished; running processes are left alone
    function kill_process($id)
    {
        $status = proc_get_status($this->handles[$id]);
        if($status['running'] === false) {
            echo 'Closing Process '.$id."\t";
            fclose($this->pipes[$id][1]);
            fclose($this->pipes[$id][2]);
            proc_close($this->handles[$id]);
            echo 'Closed.'."\r\n";
            unset($this->streams[$id]);
            unset($this->pipes[$id]);
            unset($this->handles[$id]);
        } // if
        return TRUE;
    } // function
} // class
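To see how split_input() divides the data, the same calculation can be run on a small array outside the class. This is only a sketch: split_chunk() is a hypothetical standalone copy of the method's logic, and the ten-element input is made up.

```php
<?php
// Standalone copy of the split_input() logic, written as a plain function
// (hypothetical name) so it can be run without the Terminal class.
function split_chunk(array $input, $sockets, $i)
{
    $offset = (int) ceil(count($input) / $sockets);
    // TRUE preserves the original array keys, as in the class
    return array_slice($input, $offset * $i, $offset, TRUE);
}

$input = range(1, 10); // keys 0..9

// 4 processes: chunk size is ceil(10 / 4) = 3
for ($i = 0; $i < 4; $i++) {
    echo 'Process '.$i.' gets keys: '.implode(',', array_keys(split_chunk($input, 4, $i)))."\n";
}

// 6 processes: chunk size is ceil(10 / 6) = 2, so process 5 starts at offset 10
var_dump(split_chunk($input, 6, 5));
```

Because the chunk size is rounded up, the last process usually receives a shorter slice, and with some combinations (10 items across 6 processes here) it receives an empty array, so the script being run should be prepared for empty input.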
Pointer File
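# read incoming command
$val_in = '';
if($fh = fopen('php://stdin','rb')) {
    $val_in = stream_get_contents($fh);
    fclose($fh);
}
# execute incoming command (this defines $inputArr)
# NOTE: eval() runs whatever arrives on stdin, so only the trusted parent process should feed this script
if($val_in)
    eval($val_in);
sleep(1);
# stop early if no input array was received
if(!isset($inputArr) || !is_array($inputArr))
    exit(1);
# run script for task $inputArr given
$resource = mysql_connect('server', 'username','password');
if(!$resource)
    exit(1);
mysql_select_db('database', $resource);
foreach($inputArr as $key => $value) {
    $query = "INSERT INTO tasking SET arrKey = '".mysql_real_escape_string($key, $resource)."'";
    mysql_query($query, $resource);
}
mysql_close($resource);
exit();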
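The pointer file receives its data as PHP source and runs it through eval(). One possible alternative, not what the class above does, is to send the chunk as JSON and decode it in the child. The sketch below assumes PHP 5.4 or later run from the command line (for PHP_BINARY and STDIN); run_with_json() is a hypothetical helper, and a one-line php -r command stands in for pointer.php.

```php
<?php
// Hypothetical helper: start $cmd, send $data as JSON on stdin, return stdout.
function run_with_json($cmd, array $data)
{
    $spec = array(
        0 => array('pipe', 'r'), // child's stdin
        1 => array('pipe', 'w'), // child's stdout
        2 => array('pipe', 'w'), // child's stderr
    );
    $process = proc_open($cmd, $spec, $pipes);
    if (!is_resource($process)) {
        return FALSE;
    }
    fwrite($pipes[0], json_encode($data));
    fclose($pipes[0]); // EOF tells the child the input is complete
    $output = stream_get_contents($pipes[1]); // blocks until the child closes stdout
    fclose($pipes[1]);
    fclose($pipes[2]);
    proc_close($process);
    return $output;
}

// Stand-in for pointer.php: decode the JSON from stdin and print the entry count.
$child = 'echo count(json_decode(stream_get_contents(STDIN), true));';
$cmd = escapeshellarg(PHP_BINARY).' -r '.escapeshellarg($child);

$chunk = array(5 => 'a@example.com', 6 => 'b@example.com', 7 => 'c@example.com');
$output = run_with_json($cmd, $chunk);
echo $output."\n"; // prints 3
```

Unlike the class, this helper waits for one child at a time, so it shows only the data hand-off and not the parallel part. The child would use json_decode() with TRUE as the second argument to get back an array it can loop over like $inputArr.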
