MediaWiki  master
OrderedStreamingForkController.php
Go to the documentation of this file.
1 <?php
2 
/** @var callable Invoked with each unit of work; see consume() and consumeNoFork() */
protected $workCallback;
/** @var resource Stream that lines of work are read from */
protected $input;
/** @var resource Stream that results are written to */
protected $output;
/** @var int Id of the next result to write to $output; reset to 0 by feedChildren() */
protected $nextOutputId;
/** @var string[] Results received out of order. Int key indicates order, value is data. */
protected $delayedOutputData = [];
43 
/**
 * @param int $numProcs Forwarded unchanged to the parent constructor
 * @param callable $workCallback Called with a single unit of work (one line
 *  of input); its return value is written to $output
 * @param resource $input Stream to read lines of work from
 * @param resource $output Stream to write results to
 */
public function __construct( $numProcs, $workCallback, $input, $output ) {
	parent::__construct( $numProcs );

	// Keep the streams and the worker callable for the consume*/feed* methods.
	$this->output = $output;
	$this->input = $input;
	$this->workCallback = $workCallback;
}
57 
/**
 * Start processing the input.
 *
 * When there are no processes to start, all work is done inline in the
 * current process and 'parent' is returned. Otherwise the parent
 * implementation decides the status, and a process that ends up as a
 * 'child' consumes work from its socket before returning.
 *
 * @return string 'parent', or whatever status parent::start() reports
 */
public function start() {
	if ( !( $this->procsToStart > 0 ) ) {
		// Nothing to fork: handle every line in this process.
		$this->consumeNoFork();
		return 'parent';
	}

	$status = parent::start();
	if ( $status === 'child' ) {
		$this->consume();
	}
	return $status;
}
73 
/**
 * Fork $numProcs children, each connected to this process through its own
 * socket pair, then feed them work from $this->input.
 *
 * In a child, $this->input and $this->output are both replaced by the
 * child's end of the socket pair and the method returns immediately. In the
 * parent, the method only returns after feedChildren() has finished and the
 * parent-side sockets have been closed.
 *
 * @param int $numProcs Number of child processes to create
 * @return string 'child' in a forked child, 'parent' in the parent
 */
protected function forkWorkers( $numProcs ) {
	$this->prepareEnvironment();

	$childSockets = [];
	// Create the child processes
	for ( $i = 0; $i < $numProcs; $i++ ) {
		$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
		if ( $sockets === false ) {
			// Without a socket pair the child could never receive work, so
			// fail before forking rather than fork a child with no channel.
			echo "Error creating socket pair for child process\n";
			exit( 1 );
		}

		// Do the fork
		$pid = pcntl_fork();
		if ( $pid === -1 || $pid === false ) {
			echo "Error creating child processes\n";
			exit( 1 );
		}

		if ( !$pid ) {
			// This is the child process: talk to the parent over $sockets[0]
			// and drop the end that belongs to the parent.
			$this->initChild();
			$this->childNumber = $i;
			$this->input = $sockets[0];
			$this->output = $sockets[0];
			fclose( $sockets[1] );
			return 'child';
		} else {
			// This is the parent process: keep $sockets[1] and drop the
			// child's end.
			$this->children[$pid] = true;
			fclose( $sockets[0] );
			$childSockets[] = $sockets[1];
		}
	}

	$this->feedChildren( $childSockets );
	foreach ( $childSockets as $socket ) {
		fclose( $socket );
	}
	return 'parent';
}
112 
/**
 * Child-side work loop.
 *
 * Reads JSON encoded [ id, data ] messages, one per line, from
 * $this->input, runs the work callback on the data and writes a JSON
 * encoded [ id, result ] line to $this->output. Lines that are empty after
 * trimming are ignored. Returns once $this->input reports end of file.
 */
protected function consume() {
	while ( !feof( $this->input ) ) {
		$message = trim( fgets( $this->input ) );
		if ( !$message ) {
			continue;
		}

		list( $id, $payload ) = json_decode( $message );
		$answer = call_user_func( $this->workCallback, $payload );
		$reply = json_encode( [ $id, $answer ] ) . "\n";
		fwrite( $this->output, $reply );
	}
}
127 
/**
 * Special cased version of self::consume() when no forking occurs.
 *
 * Each non-empty line of $this->input is passed to the work callback
 * without its trailing newline, and the callback's result is written to
 * $this->output followed by a newline.
 */
protected function consumeNoFork() {
	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// No more data could be read. On some streams feof() only turns
			// true after such a failed read, so without this check the
			// callback would be invoked once more with false.
			break;
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$result = call_user_func( $this->workCallback, $data );
		fwrite( $this->output, "$result\n" );
	}
}
146 
/**
 * Reads lines of work from $this->input and farms them out to the
 * provided sockets.
 *
 * Every non-empty line gets an increasing id (starting at 0) and is written
 * to an idle socket as a JSON encoded [ id, data ] line. A socket that was
 * handed work is treated as busy until updateAvailableSockets() has read a
 * result from it. Once the input is exhausted this waits until every busy
 * socket has reported back.
 *
 * @param resource[] $sockets Sockets connected to the children
 */
protected function feedChildren( array $sockets ) {
	$used = [];
	$id = 0;
	$this->nextOutputId = 0;

	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// No more data could be read. On some streams feof() only turns
			// true after such a failed read, so without this check a bogus
			// [ id, false ] work item would be sent to a child.
			break;
		}
		if ( $used ) {
			// Collect finished work. Only block (up to 5s per attempt) when
			// no socket is idle, and keep trying until one is.
			do {
				$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
			} while ( !$sockets );
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$socket = array_pop( $sockets );
		fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
		$used[] = $socket;
	}

	// Input is exhausted; wait for the outstanding results.
	while ( $used ) {
		$this->updateAvailableSockets( $sockets, $used, 5 );
	}
}
181 
/**
 * Moves sockets from $used to $sockets when they are available for more work.
 *
 * Waits up to $timeout seconds for any socket in $used to become readable.
 * For each socket reported readable, one JSON encoded [ id, data ] line is
 * read and passed to receive(), then the socket is appended to $sockets and
 * removed from $used.
 *
 * NOTE(review): the return values of stream_select() and fgets() are not
 * checked here, so a failed select or a closed socket is not detected.
 *
 * @param resource[] &$sockets Sockets that are ready for more work
 * @param resource[] &$used Sockets that are currently waiting on a result
 * @param int $timeout Seconds to wait for a socket to become readable
 */
protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
	// stream_select() takes its arrays by reference, so work on a copy of $used.
	$readable = $used;
	$writable = [];
	$exceptional = [];
	stream_select( $readable, $writable, $exceptional, $timeout );

	foreach ( $readable as $finished ) {
		list( $id, $data ) = json_decode( trim( fgets( $finished ) ) );
		$this->receive( (int)$id, $data );
		$sockets[] = $finished;
		unset( $used[array_search( $finished, $used )] );
	}
}
204 
/**
 * Writes a result to $this->output in id order.
 *
 * A result whose id is not the next expected one is buffered in
 * $this->delayedOutputData. When the expected id arrives it is written
 * immediately, followed by every buffered result that directly continues
 * the sequence.
 *
 * @param int $id Position of this result in the output order
 * @param mixed $data Result to write, followed by a newline
 */
protected function receive( $id, $data ) {
	if ( $id !== $this->nextOutputId ) {
		$this->delayedOutputData[$id] = $data;
		return;
	}
	fwrite( $this->output, $data . "\n" );
	$this->nextOutputId = $id + 1;
	// array_key_exists() rather than isset(): a buffered null result must
	// still be flushed, otherwise all later output would be held back forever.
	while ( array_key_exists( $this->nextOutputId, $this->delayedOutputData ) ) {
		fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
		unset( $this->delayedOutputData[$this->nextOutputId] );
		$this->nextOutputId++;
	}
}
222 }
Class for managing forking command line scripts.
Reads lines of work from an input stream and farms them out to multiple child streams.
start()
Start the child processes. This should only be called from the command line. It should be called as ea...
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided socket.
string[] $delayedOutputData
Int key indicates order, value is data.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
__construct( $numProcs, $workCallback, $input, $output)
$line
Definition: mcc.php:119