52 parent::__construct( $numProcs );
62 if ( $this->procsToStart > 0 ) {
63 $status = parent::start();
64 if ( $status ===
'child' ) {
83 for ( $i = 0; $i < $numProcs; $i++ ) {
84 $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
87 if ( $pid === -1 || $pid ===
false ) {
88 echo
"Error creating child processes\n";
94 $this->childNumber = $i;
95 $this->input = $sockets[0];
96 $this->output = $sockets[0];
97 fclose( $sockets[1] );
101 $this->children[$pid] =
true;
102 fclose( $sockets[0] );
103 $childSockets[] = $sockets[1];
107 foreach ( $childSockets as $socket ) {
110 if ( is_resource( $socket ) ) {
122 while ( !feof( $this->input ) ) {
123 $line = trim( fgets( $this->input ) );
125 list( $id, $data ) = json_decode(
$line );
126 $result = call_user_func( $this->workCallback, $data );
127 fwrite( $this->output, json_encode( [ $id, $result ] ) .
"\n" );
136 while ( !feof( $this->input ) ) {
137 $data = fgets( $this->input );
138 if ( substr( $data, -1 ) ===
"\n" ) {
141 $data = substr( $data, 0, -1 );
143 if ( $data ===
'' ) {
146 $result = call_user_func( $this->workCallback, $data );
147 fwrite( $this->output,
"$result\n" );
160 $this->nextOutputId = 0;
162 while ( !feof( $this->input ) ) {
163 $data = fgets( $this->input );
167 }
while ( !$sockets );
169 if ( substr( $data, - 1 ) ===
"\n" ) {
172 $data = substr( $data, 0, -1 );
174 if ( $data ===
'' ) {
177 $socket = array_pop( $sockets );
178 fwrite( $socket, json_encode( [ $id++, $data ] ) .
"\n" );
197 $write = $except = [];
198 stream_select( $read, $write, $except, $timeout );
199 foreach ( $read as $socket ) {
200 $line = fgets( $socket );
201 list( $id, $data ) = json_decode( trim(
$line ) );
202 $this->
receive( (
int)$id, $data );
203 $sockets[] = $socket;
204 $idx = array_search( $socket, $used );
205 unset( $used[$idx] );
214 if ( $id !== $this->nextOutputId ) {
215 $this->delayedOutputData[$id] = $data;
218 fwrite( $this->output, $data .
"\n" );
219 $this->nextOutputId = $id + 1;
220 while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
221 fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] .
"\n" );
222 unset( $this->delayedOutputData[$this->nextOutputId] );
223 $this->nextOutputId++;
Class for managing forking command line scripts.
Reads lines of work from an input stream and farms them out to multiple child streams.
start()
Start the child processes.This should only be called from the command line. It should be called as ea...
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided socket.
string[] $delayedOutputData
Int key indicates order, value is data.
consume()
Child worker process.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
__construct( $numProcs, $workCallback, $input, $output)