MediaWiki master
OrderedStreamingForkController.php
Go to the documentation of this file.
1<?php
21namespace MediaWiki\Maintenance;
22
/** @var callable Invoked via call_user_func() with one line of input; its return value is the result for that line */
44 protected $workCallback;
/** @var resource Stream that work is read from; in a forked child this is replaced by the socket to the parent */
46 protected $input;
/** @var resource Stream that results are written to; in a forked child this is replaced by the socket to the parent */
48 protected $output;
/** @var int Id of the next result to be written to the output; reset to 0 when feeding starts */
50 protected $nextOutputId;
/** @var string[] Results received ahead of their turn. Int key indicates order, value is data. */
52 protected $delayedOutputData = [];
53
/**
 * Set up the controller with its worker callback and I/O streams.
 *
 * @param int $numProcs Number of worker processes; passed straight to the parent constructor.
 * @param callable $workCallback Called once for each line of work; its return value
 *  is the result written for that line.
 * @param resource $input Stream that lines of work are read from.
 * @param resource $output Stream that results are written to.
 */
public function __construct( $numProcs, $workCallback, $input, $output ) {
	parent::__construct( $numProcs );
	[ $this->workCallback, $this->input, $this->output ] = [ $workCallback, $input, $output ];
}
67
/**
 * Run the controller, forking workers only when some are requested.
 *
 * With no processes to start, all input is handled in the current process
 * through consumeNoFork(). Otherwise the parent implementation is started and,
 * when it reports that we are a child, this process begins consuming work.
 *
 * @return string 'parent' when no fork took place, otherwise whatever
 *  parent::start() returned ('child' inside a worker).
 */
public function start() {
	if ( !( $this->procsToStart > 0 ) ) {
		// Nothing to fork: process every line inline.
		$this->consumeNoFork();
		return 'parent';
	}

	$status = parent::start();
	if ( $status === 'child' ) {
		$this->consume();
	}
	return $status;
}
83
/**
 * Fork the worker processes, each connected to the parent by its own socket pair.
 *
 * In a child, the child's end of the pair becomes both $this->input and
 * $this->output and the method returns immediately. In the parent, all input
 * is distributed through feedChildren(); afterwards the parent's socket ends
 * are closed.
 *
 * Exits the process with status 1 if a socket pair or a fork cannot be created.
 *
 * @param int $numProcs Number of child processes to create
 * @return string 'child' inside a forked worker, 'parent' in the parent once
 *  feedChildren() has returned
 */
protected function forkWorkers( $numProcs ) {
	$this->prepareEnvironment();

	$childSockets = [];
	// Create the child processes
	for ( $i = 0; $i < $numProcs; $i++ ) {
		$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
		if ( $sockets === false ) {
			// Without a socket pair there is no channel to this worker;
			// fail the same way as a failed fork.
			echo "Error creating child sockets\n";
			exit( 1 );
		}

		// Do the fork
		$pid = pcntl_fork();
		if ( $pid === -1 ) {
			echo "Error creating child processes\n";
			exit( 1 );
		}

		if ( !$pid ) {
			// This is a child process
			$this->initChild();
			$this->childNumber = $i;
			$this->input = $sockets[0];
			$this->output = $sockets[0];
			fclose( $sockets[1] );
			// The fork also duplicated the parent's ends of the sockets belonging
			// to previously created siblings. Drop those copies so a sibling sees
			// end-of-input as soon as the parent closes its end.
			foreach ( $childSockets as $inherited ) {
				fclose( $inherited );
			}
			return 'child';
		}

		// This is the parent process
		$this->children[$pid] = true;
		fclose( $sockets[0] );
		$childSockets[] = $sockets[1];
	}

	$this->feedChildren( $childSockets );
	foreach ( $childSockets as $socket ) {
		fclose( $socket );
	}
	return 'parent';
}
122
/**
 * Worker loop: read work from $this->input until end of input and answer each item.
 *
 * Each non-empty line is a JSON array of [ id, data ]. The work callback is
 * called with the data, and a JSON array of [ id, result ] followed by a
 * newline is written to $this->output.
 */
protected function consume() {
	while ( !feof( $this->input ) ) {
		$message = trim( fgets( $this->input ) );
		if ( !$message ) {
			// Blank line or nothing read; wait for the next one.
			continue;
		}

		[ $id, $data ] = json_decode( $message );
		$result = call_user_func( $this->workCallback, $data );
		$reply = json_encode( [ $id, $result ] );
		fwrite( $this->output, $reply . "\n" );
	}
}
137
/**
 * Special cased version of self::consume() when no forking occurs.
 *
 * Reads $this->input line by line, passes each non-empty line (without its
 * trailing newline) to the work callback, and writes the result followed by a
 * newline to $this->output.
 */
protected function consumeNoFork() {
	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// fgets() returns false when there is nothing left to read, which can
			// happen once after the last line because feof() may still be false
			// until a read actually hits the end. Never hand false to the callback.
			continue;
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$result = call_user_func( $this->workCallback, $data );
		fwrite( $this->output, "$result\n" );
	}
}
156
/**
 * Reads lines of work from $this->input and farms them out to the provided sockets.
 *
 * Every non-empty line gets an increasing id (starting at 0) and is sent to an
 * idle worker as a JSON array of [ id, data ]. Results are collected through
 * updateAvailableSockets(). Once the input is exhausted, this keeps collecting
 * until no worker is busy any more.
 *
 * @param resource[] $sockets Sockets connected to the worker processes
 */
protected function feedChildren( array $sockets ) {
	// Sockets whose worker is currently busy with a work item.
	$used = [];
	$id = 0;
	$this->nextOutputId = 0;

	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $used ) {
			// Collect finished results. While idle workers remain this only polls
			// (timeout 0); when all are busy it waits, up to 5 seconds per
			// attempt, until at least one becomes available again.
			do {
				$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
			} while ( !$sockets );
		}
		if ( $data === false ) {
			// fgets() returns false when there is nothing left to read, which can
			// happen once after the last line because feof() may still be false
			// until a read actually hits the end. Do not dispatch it as work.
			continue;
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$socket = array_pop( $sockets );
		fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
		$used[] = $socket;
	}

	// All work has been handed out; wait for the outstanding results.
	while ( $used ) {
		$this->updateAvailableSockets( $sockets, $used, 5 );
	}
}
191
/**
 * Moves sockets from $used to $sockets when they are available for more work.
 *
 * Waits up to $timeout seconds for any busy worker socket to become readable,
 * reads one result line from each readable socket, passes it to receive(),
 * and marks that socket as idle again.
 *
 * @param resource[] &$sockets Idle sockets; sockets that delivered a result are appended
 * @param resource[] &$used Busy sockets; sockets that delivered a result are removed.
 *  Must not be empty.
 * @param int $timeout Maximum number of seconds to wait for a result
 * @throws \RuntimeException If a worker closed its socket or sent a line that
 *  is not a JSON [ id, result ] array
 */
protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
	$read = $used;
	$write = $except = [];
	if ( !stream_select( $read, $write, $except, $timeout ) ) {
		// 0: nothing became readable within the timeout.
		// false: the call failed (e.g. interrupted by a signal). In that case
		// $read cannot be trusted to hold only ready sockets, and reading from
		// them could block. Callers that need a socket call again.
		return;
	}

	foreach ( $read as $socket ) {
		$line = fgets( $socket );
		if ( $line === false ) {
			// Readable but no data: the worker went away. Continuing would
			// record a bogus result for id 0 and corrupt the ordered output.
			throw new \RuntimeException( 'Worker closed its socket without returning a result' );
		}

		$message = json_decode( trim( $line ) );
		if ( !is_array( $message ) || count( $message ) < 2 ) {
			throw new \RuntimeException( 'Worker returned a malformed result line' );
		}

		[ $id, $data ] = $message;
		$this->receive( (int)$id, $data );

		// The worker is idle again: hand its socket back to the pool.
		$sockets[] = $socket;
		$idx = array_search( $socket, $used, true );
		if ( $idx !== false ) {
			unset( $used[$idx] );
		}
	}
}
214
/**
 * Accept one result and write it out in id order.
 *
 * A result that arrives ahead of its turn is held in $this->delayedOutputData.
 * A result that is next in line is written to $this->output right away,
 * followed by any held results that directly follow it.
 *
 * @param int $id Sequence number of the work item the result belongs to
 * @param string $data Result to write, without trailing newline
 */
protected function receive( $id, $data ) {
	if ( $id === $this->nextOutputId ) {
		fwrite( $this->output, $data . "\n" );

		// Flush the consecutive run of results that were waiting on this one.
		$next = $id + 1;
		while ( isset( $this->delayedOutputData[$next] ) ) {
			fwrite( $this->output, $this->delayedOutputData[$next] . "\n" );
			unset( $this->delayedOutputData[$next] );
			$next++;
		}
		$this->nextOutputId = $next;
	} else {
		// Out of order: keep it until its predecessors have been written.
		$this->delayedOutputData[$id] = $data;
	}
}
232}
233
// Make the namespaced class also reachable under the global name
// 'OrderedStreamingForkController' (presumably for backward compatibility — confirm).
235class_alias( OrderedStreamingForkController::class, 'OrderedStreamingForkController' );
Manage forking inside CLI maintenance scripts.
Apply a transformation to values via a pool of sub processes.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided sockets.
string[] $delayedOutputData
Int key indicates order, value is data.
start()
Start the child processes. This should only be called from the command line. It should be called as ea...
consumeNoFork()
Special cased version of self::consume() when no forking occurs.