MediaWiki  1.34.0
OrderedStreamingForkController.php
Go to the documentation of this file.
1 <?php
2 
/** @var callable Invoked once per unit of work with a single line of input data */
34  protected $workCallback;
/** @var resource Stream that work is read from, one line at a time; replaced by the worker's socket in a forked child */
36  protected $input;
/** @var resource Stream that results are written to; replaced by the worker's socket in a forked child */
38  protected $output;
/** @var int Id of the next result that receive() will write to the output */
40  protected $nextOutputId;
/** @var string[] Results that arrived ahead of their turn. Int key indicates order, value is data. */
42  protected $delayedOutputData = [];
43 
/**
 * Store the work callback and the two streams, after handing the process
 * count to the parent ForkController constructor.
 *
 * @param int $numProcs Forwarded unchanged to ForkController::__construct()
 * @param callable $workCallback Called with each line of input; its return
 *  value is written to $output followed by a newline
 * @param resource $input Stream the lines of work are read from
 * @param resource $output Stream the results are written to
 */
public function __construct( $numProcs, $workCallback, $input, $output ) {
	parent::__construct( $numProcs );

	// The three assignments are independent of each other.
	$this->output = $output;
	$this->input = $input;
	$this->workCallback = $workCallback;
}
57 
/**
 * Run the controller.
 *
 * When no processes are to be started, all work is done inline in the
 * current process. Otherwise the parent implementation is asked to start
 * the workers, and a process that finds itself to be a child goes on to
 * consume work from its socket.
 *
 * @return string 'parent' when no forking took place, otherwise whatever
 *  status ForkController::start() reported for this process
 */
public function start() {
	// Nothing to fork: process the whole input right here.
	if ( !( $this->procsToStart > 0 ) ) {
		$this->consumeNoFork();
		return 'parent';
	}

	$outcome = parent::start();
	if ( $outcome === 'child' ) {
		$this->consume();
	}
	return $outcome;
}
73 
/**
 * Fork the worker processes, each linked to the parent through its own
 * socket pair, then (in the parent) distribute the input among them.
 *
 * @param int $numProcs Number of child processes to create
 * @return string 'child' in every forked worker; 'parent' in the parent
 *  once feedChildren() has returned and the worker sockets are closed
 */
protected function forkWorkers( $numProcs ) {
	$this->prepareEnvironment();

	$childSockets = [];
	// Create the child processes
	for ( $i = 0; $i < $numProcs; $i++ ) {
		$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
		if ( $sockets === false ) {
			// Without a socket pair the worker could never be reached;
			// bail out the same way as for a failed fork.
			echo "Error creating socket pair for child process\n";
			exit( 1 );
		}
		// Do the fork
		$pid = pcntl_fork();
		if ( $pid === -1 || $pid === false ) {
			echo "Error creating child processes\n";
			exit( 1 );
		}

		if ( !$pid ) {
			$this->initChild();
			$this->childNumber = $i;
			// The fork duplicated the parent's ends of the sockets that
			// belong to previously created workers. Close those copies so
			// each earlier worker sees EOF as soon as the parent closes
			// its end, instead of only after this worker has exited.
			foreach ( $childSockets as $siblingSocket ) {
				fclose( $siblingSocket );
			}
			// A worker reads its work from and writes its results to the
			// same socket; the other end belongs to the parent.
			$this->input = $sockets[0];
			$this->output = $sockets[0];
			fclose( $sockets[1] );
			return 'child';
		} else {
			// This is the parent process
			$this->children[$pid] = true;
			fclose( $sockets[0] );
			$childSockets[] = $sockets[1];
		}
	}
	$this->feedChildren( $childSockets );
	foreach ( $childSockets as $socket ) {
		// if a child has already shutdown the sockets will be closed,
		// closing a second time would raise a warning.
		if ( is_resource( $socket ) ) {
			fclose( $socket );
		}
	}
	return 'parent';
}
116 
/**
 * Child worker process.
 *
 * Reads JSON encoded [ id, data ] pairs from the input, one per line, runs
 * the work callback on the data and writes a JSON encoded [ id, result ]
 * pair followed by a newline to the output. Stops once the input reports
 * end of file.
 */
protected function consume() {
	while ( !feof( $this->input ) ) {
		$message = trim( fgets( $this->input ) );
		if ( !$message ) {
			// Nothing usable on this line.
			continue;
		}
		list( $jobId, $payload ) = json_decode( $message );
		$answer = call_user_func( $this->workCallback, $payload );
		$reply = json_encode( [ $jobId, $answer ] ) . "\n";
		fwrite( $this->output, $reply );
	}
}
131 
/**
 * Special cased version of self::consume() when no forking occurs.
 *
 * Reads the input line by line, runs the work callback on every non-empty
 * line and writes each result, followed by a newline, to the output.
 */
protected function consumeNoFork() {
	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// fgets() found nothing more to read (typically the read that
			// detects end of file after a trailing newline). There is no
			// work item here, so do not hand false to the callback.
			continue;
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$result = call_user_func( $this->workCallback, $data );
		fwrite( $this->output, "$result\n" );
	}
}
150 
/**
 * Reads lines of work from $this->input and farms them out to the
 * provided sockets.
 *
 * Every non-empty line is sent to an idle socket as a JSON encoded
 * [ id, data ] pair, with ids counting up from 0 in input order. Once the
 * input is exhausted this waits until every busy socket has answered.
 *
 * @param resource[] $sockets Sockets connected to the worker processes
 */
protected function feedChildren( array $sockets ) {
	// Sockets that were given work and have not answered yet.
	$used = [];
	$id = 0;
	$this->nextOutputId = 0;

	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// Nothing was read (typically the read that detects end of
			// file). Sending false to a worker would produce a bogus
			// result line and consume an id.
			continue;
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			// Blank lines are not work; skip them before waiting on a
			// socket they would never use.
			continue;
		}
		if ( $used ) {
			// Collect finished results. Only block (5 second timeout per
			// attempt) when no socket is currently idle.
			do {
				$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
			} while ( !$sockets );
		}
		$socket = array_pop( $sockets );
		fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
		$used[] = $socket;
	}
	// Input exhausted: wait for the outstanding results.
	while ( $used ) {
		$this->updateAvailableSockets( $sockets, $used, 5 );
	}
}
185 
/**
 * Moves sockets from $used to $sockets when they are available for more
 * work.
 *
 * Every socket that stream_select() reports as readable is expected to
 * deliver one JSON encoded [ id, result ] line, which is passed on to
 * self::receive().
 *
 * @param resource[] &$sockets Idle sockets; answered sockets are appended
 * @param resource[] &$used Busy sockets; answered sockets are removed
 * @param int $timeout Seconds stream_select() may wait for a readable socket
 * @throws \RuntimeException When a worker closed its socket or sent a line
 *  that does not decode to an [ id, result ] pair. The id of the lost work
 *  item is unknown then, so the ordered output could never be completed.
 */
protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
	$read = $used;
	$write = $except = [];
	stream_select( $read, $write, $except, $timeout );
	foreach ( $read as $socket ) {
		$line = fgets( $socket );
		// fgets() yields false when the worker has gone away; such a
		// socket still shows up as readable because it is at end of file.
		$message = $line === false ? null : json_decode( trim( $line ) );
		if ( !is_array( $message ) || count( $message ) < 2 ) {
			// Previously this was reported as a result with id 0 and the
			// dead socket was put back into the idle pool, silently
			// corrupting the output.
			throw new \RuntimeException(
				'Worker process closed its socket or sent a malformed result'
			);
		}
		list( $id, $data ) = $message;
		$this->receive( (int)$id, $data );
		$sockets[] = $socket;
		$idx = array_search( $socket, $used, true );
		if ( $idx !== false ) {
			unset( $used[$idx] );
		}
	}
}
208 
/**
 * Accept one result and emit results strictly in id order.
 *
 * A result whose id is not the one expected next is parked in
 * $this->delayedOutputData. The expected result is written straight away,
 * followed by every parked result that directly continues the sequence.
 *
 * @param int $id Position of the result in the original input order
 * @param string $data Result to write, without trailing newline
 */
protected function receive( $id, $data ) {
	if ( $id === $this->nextOutputId ) {
		fwrite( $this->output, $data . "\n" );
		$this->nextOutputId = $id + 1;
		// Flush the parked results that are now next in line.
		for ( ; isset( $this->delayedOutputData[$this->nextOutputId] ); $this->nextOutputId++ ) {
			fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
			unset( $this->delayedOutputData[$this->nextOutputId] );
		}
	} else {
		// Arrived ahead of its turn; keep it until its predecessors are out.
		$this->delayedOutputData[$id] = $data;
	}
}
226 }
ForkController
Class for managing forking command line scripts.
Definition: ForkController.php:33
OrderedStreamingForkController\$input
resource $input
Definition: OrderedStreamingForkController.php:36
OrderedStreamingForkController\$delayedOutputData
string[] $delayedOutputData
Int key indicates order, value is data.
Definition: OrderedStreamingForkController.php:42
OrderedStreamingForkController\$output
resource $output
Definition: OrderedStreamingForkController.php:38
OrderedStreamingForkController\consume
consume()
Child worker process.
Definition: OrderedStreamingForkController.php:121
OrderedStreamingForkController\consumeNoFork
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
Definition: OrderedStreamingForkController.php:135
OrderedStreamingForkController\__construct
__construct( $numProcs, $workCallback, $input, $output)
Definition: OrderedStreamingForkController.php:51
OrderedStreamingForkController\receive
receive( $id, $data)
Definition: OrderedStreamingForkController.php:213
OrderedStreamingForkController\feedChildren
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided socket.
Definition: OrderedStreamingForkController.php:157
OrderedStreamingForkController\$workCallback
callable $workCallback
Definition: OrderedStreamingForkController.php:34
OrderedStreamingForkController\start
start()
Start the child processes. This should only be called from the command line. It should be called as ea...
Definition: OrderedStreamingForkController.php:61
$line
$line
Definition: cdb.php:59
OrderedStreamingForkController\forkWorkers
forkWorkers( $numProcs)
Definition: OrderedStreamingForkController.php:78
OrderedStreamingForkController\$nextOutputId
int $nextOutputId
Definition: OrderedStreamingForkController.php:40
$status
return $status
Definition: SyntaxHighlight.php:347
ForkController\prepareEnvironment
prepareEnvironment()
Definition: ForkController.php:152
OrderedStreamingForkController
Reads lines of work from an input stream and farms them out to multiple child streams.
Definition: OrderedStreamingForkController.php:32
OrderedStreamingForkController\updateAvailableSockets
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
Definition: OrderedStreamingForkController.php:195
ForkController\initChild
initChild()
Definition: ForkController.php:194