62 parent::__construct( $numProcs );
72 if ( $this->procsToStart > 0 ) {
73 $status = parent::start();
74 if ( $status ===
'child' ) {
93 for ( $i = 0; $i < $numProcs; $i++ ) {
94 $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
98 echo
"Error creating child processes\n";
104 $this->childNumber = $i;
105 $this->input = $sockets[0];
106 $this->output = $sockets[0];
107 fclose( $sockets[1] );
111 $this->children[$pid] =
true;
112 fclose( $sockets[0] );
113 $childSockets[] = $sockets[1];
117 foreach ( $childSockets as $socket ) {
128 while ( !feof( $this->input ) ) {
129 $line = trim( fgets( $this->input ) );
131 [ $id, $data ] = json_decode( $line );
132 $result = call_user_func( $this->workCallback, $data );
133 fwrite( $this->output, json_encode( [ $id, $result ] ) .
"\n" );
142 while ( !feof( $this->input ) ) {
143 $data = fgets( $this->input );
144 if ( substr( $data, -1 ) ===
"\n" ) {
147 $data = substr( $data, 0, -1 );
149 if ( $data ===
'' ) {
152 $result = call_user_func( $this->workCallback, $data );
153 fwrite( $this->output,
"$result\n" );
166 $this->nextOutputId = 0;
168 while ( !feof( $this->input ) ) {
169 $data = fgets( $this->input );
173 }
while ( !$sockets );
175 if ( substr( $data, -1 ) ===
"\n" ) {
178 $data = substr( $data, 0, -1 );
180 if ( $data ===
'' ) {
183 $socket = array_pop( $sockets );
184 fwrite( $socket, json_encode( [ $id++, $data ] ) .
"\n" );
203 $write = $except = [];
204 stream_select( $read, $write, $except, $timeout );
205 foreach ( $read as $socket ) {
206 $line = fgets( $socket );
207 [ $id, $data ] = json_decode( trim( $line ) );
208 $this->
receive( (
int)$id, $data );
209 $sockets[] = $socket;
210 $idx = array_search( $socket, $used );
211 unset( $used[$idx] );
220 if ( $id !== $this->nextOutputId ) {
221 $this->delayedOutputData[$id] = $data;
224 fwrite( $this->output, $data .
"\n" );
225 $this->nextOutputId = $id + 1;
226 while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
227 fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] .
"\n" );
228 unset( $this->delayedOutputData[$this->nextOutputId] );
229 $this->nextOutputId++;
234class_alias( OrderedStreamingForkController::class,
'OrderedStreamingForkController' );
Manage forking inside CLI maintenance scripts.
Apply a transformation to values via a pool of sub processes.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
__construct( $numProcs, $workCallback, $input, $output)
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided sockets.
consume()
Child worker process.
string[] $delayedOutputData
Int key indicates order, value is data.
start()
Start the child processes. This should only be called from the command line. It should be called as ea...
consumeNoFork()
Special-cased version of self::consume() when no forking occurs.