48 parent::__construct( $numProcs );
58 if ( $this->procsToStart > 0 ) {
59 $status = parent::start();
60 if ( $status ===
'child' ) {
79 for ( $i = 0; $i < $numProcs; $i++ ) {
80 $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
84 echo
"Error creating child processes\n";
90 $this->childNumber = $i;
91 $this->input = $sockets[0];
92 $this->output = $sockets[0];
93 fclose( $sockets[1] );
97 $this->children[$pid] =
true;
98 fclose( $sockets[0] );
99 $childSockets[] = $sockets[1];
103 foreach ( $childSockets as $socket ) {
114 while ( !feof( $this->input ) ) {
115 $line = trim( fgets( $this->input ) );
117 [ $id, $data ] = json_decode( $line );
119 fwrite( $this->output, json_encode( [ $id, $result ] ) .
"\n" );
128 while ( !feof( $this->input ) ) {
129 $data = fgets( $this->input );
130 if ( substr( $data, -1 ) ===
"\n" ) {
133 $data = substr( $data, 0, -1 );
135 if ( $data ===
'' ) {
139 fwrite( $this->output,
"$result\n" );
152 $this->nextOutputId = 0;
154 while ( !feof( $this->input ) ) {
155 $data = fgets( $this->input );
159 }
while ( !$sockets );
161 if ( substr( $data, -1 ) ===
"\n" ) {
164 $data = substr( $data, 0, -1 );
166 if ( $data ===
'' ) {
169 $socket = array_pop( $sockets );
170 fwrite( $socket, json_encode( [ $id++, $data ] ) .
"\n" );
189 $write = $except = [];
190 stream_select( $read, $write, $except, $timeout );
191 foreach ( $read as $socket ) {
192 $line = fgets( $socket );
193 [ $id, $data ] = json_decode( trim( $line ) );
194 $this->
receive( (
int)$id, $data );
195 $sockets[] = $socket;
196 $idx = array_search( $socket, $used );
197 unset( $used[$idx] );
206 if ( $id !== $this->nextOutputId ) {
207 $this->delayedOutputData[$id] = $data;
210 fwrite( $this->output, $data .
"\n" );
211 $this->nextOutputId = $id + 1;
212 while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
213 fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] .
"\n" );
214 unset( $this->delayedOutputData[$this->nextOutputId] );
215 $this->nextOutputId++;
221class_alias( OrderedStreamingForkController::class,
'OrderedStreamingForkController' );
Manage forking inside CLI maintenance scripts.
Apply a transformation to values via a pool of sub processes.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
__construct( $numProcs, $workCallback, $input, $output)
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided socket.
consume()
Child worker process.
string[] $delayedOutputData
Int key indicates order, value is data.
start()
Start the child processes.This should only be called from the command line. It should be called as ea...
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
Update the CREDITS list by merging in the list of git commit authors.