52 parent::__construct( $numProcs );
62 if ( $this->procsToStart > 0 ) {
63 $status = parent::start();
64 if ( $status ===
'child' ) {
83 for ( $i = 0; $i < $numProcs; $i++ ) {
84 $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
87 if ( $pid === -1 || $pid ===
false ) {
88 echo
"Error creating child processes\n";
94 $this->childNumber = $i;
95 $this->input = $sockets[0];
96 $this->output = $sockets[0];
97 fclose( $sockets[1] );
101 $this->children[$pid] =
true;
102 fclose( $sockets[0] );
103 $childSockets[] = $sockets[1];
107 foreach ( $childSockets as $socket ) {
118 while ( !feof( $this->input ) ) {
119 $line = trim( fgets( $this->input ) );
121 list( $id, $data ) = json_decode(
$line );
122 $result = call_user_func( $this->workCallback, $data );
123 fwrite( $this->output, json_encode( [ $id, $result ] ) .
"\n" );
132 while ( !feof( $this->input ) ) {
133 $data = fgets( $this->input );
134 if ( substr( $data, -1 ) ===
"\n" ) {
137 $data = substr( $data, 0, -1 );
139 if ( $data ===
'' ) {
142 $result = call_user_func( $this->workCallback, $data );
143 fwrite( $this->output,
"$result\n" );
156 $this->nextOutputId = 0;
158 while ( !feof( $this->input ) ) {
159 $data = fgets( $this->input );
163 }
while ( !$sockets );
165 if ( substr( $data, -1 ) ===
"\n" ) {
168 $data = substr( $data, 0, -1 );
170 if ( $data ===
'' ) {
173 $socket = array_pop( $sockets );
174 fwrite( $socket, json_encode( [ $id++, $data ] ) .
"\n" );
193 $write = $except = [];
194 stream_select( $read, $write, $except, $timeout );
195 foreach ( $read as $socket ) {
196 $line = fgets( $socket );
197 list( $id, $data ) = json_decode( trim(
$line ) );
198 $this->
receive( (
int)$id, $data );
199 $sockets[] = $socket;
200 $idx = array_search( $socket, $used );
201 unset( $used[$idx] );
210 if ( $id !== $this->nextOutputId ) {
211 $this->delayedOutputData[$id] = $data;
214 fwrite( $this->output, $data .
"\n" );
215 $this->nextOutputId = $id + 1;
216 while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
217 fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] .
"\n" );
218 unset( $this->delayedOutputData[$this->nextOutputId] );
219 $this->nextOutputId++;
Class for managing forking command line scripts.
Reads lines of work from an input stream and farms them out to multiple child streams.
start()
Start the child processes.This should only be called from the command line. It should be called as ea...
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided socket.
string[] $delayedOutputData
Int key indicates order, value is data.
consume()
Child worker process.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
__construct( $numProcs, $workCallback, $input, $output)