Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 84 |
|
0.00% |
0 / 8 |
CRAP | |
0.00% |
0 / 1 |
OrderedStreamingForkController | |
0.00% |
0 / 83 |
|
0.00% |
0 / 8 |
812 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
start | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
forkWorkers | |
0.00% |
0 / 22 |
|
0.00% |
0 / 1 |
30 | |||
consume | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
12 | |||
consumeNoFork | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
20 | |||
feedChildren | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
56 | |||
updateAvailableSockets | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
6 | |||
receive | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 |
1 | <?php |
2 | /** |
3 | * This program is free software; you can redistribute it and/or modify |
4 | * it under the terms of the GNU General Public License as published by |
5 | * the Free Software Foundation; either version 2 of the License, or |
6 | * (at your option) any later version. |
7 | * |
8 | * This program is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | * GNU General Public License for more details. |
12 | * |
13 | * You should have received a copy of the GNU General Public License along |
14 | * with this program; if not, write to the Free Software Foundation, Inc., |
15 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
16 | * http://www.gnu.org/copyleft/gpl.html |
17 | * |
18 | * @file |
19 | */ |
20 | |
21 | namespace MediaWiki\Maintenance; |
22 | |
/**
 * Apply a transformation to values via a pool of sub processes.
 *
 * The controller reads lines from a given input stream, where each line
 * describes work to be done. This work is then farmed out to multiple
 * child streams that correspond to child processes. Each child has exactly
 * one piece of work in-flight at a given moment. The result of each work
 * is written to an output stream.
 *
 * If numProcs is zero, the fallback is to perform work in-process instead.
 *
 * This class guarantees that the output is produced in the same exact order
 * as input values were.
 *
 * Currently used by CirrusSearch extension to implement CLI search script.
 *
 * @ingroup Maintenance
 * @since 1.30
 */
class OrderedStreamingForkController extends ForkController {
	/** @var callable Invoked once per line of work; its return value is the result */
	protected $workCallback;
	/** @var resource Stream that work lines are read from */
	protected $input;
	/** @var resource Stream that results are written to */
	protected $output;
	/** @var int Id of the next result that may be written to the output */
	protected $nextOutputId;
	/** @var array Results that arrived early. Int key indicates order, value is data */
	protected $delayedOutputData = [];

	/**
	 * @param int $numProcs The number of worker processes to fork
	 * @param callable $workCallback A callback to call in the child process
	 *  once for each line of work to process.
	 * @param resource $input A socket to read work lines from
	 * @param resource $output A socket to write the result of work to.
	 */
	public function __construct( $numProcs, $workCallback, $input, $output ) {
		parent::__construct( $numProcs );
		$this->workCallback = $workCallback;
		$this->input = $input;
		$this->output = $output;
	}

	/**
	 * Run the controller.
	 *
	 * With at least one process to start, forking is delegated to the parent
	 * class and a process that comes back as 'child' immediately starts
	 * consuming work. With no processes to start, all work is performed
	 * in-process and 'parent' is returned.
	 *
	 * @inheritDoc
	 */
	public function start() {
		if ( $this->procsToStart > 0 ) {
			$status = parent::start();
			if ( $status === 'child' ) {
				$this->consume();
			}
		} else {
			$status = 'parent';
			$this->consumeNoFork();
		}
		return $status;
	}

	/**
	 * Fork the workers, each connected to the parent through its own socket
	 * pair. The parent then feeds all input to the children and closes the
	 * sockets once every result has been received.
	 *
	 * @param int $numProcs
	 * @return string 'child' in a forked worker, 'parent' in the parent process
	 */
	protected function forkWorkers( $numProcs ) {
		$this->prepareEnvironment();

		$childSockets = [];
		// Create the child processes
		for ( $i = 0; $i < $numProcs; $i++ ) {
			$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
			if ( $sockets === false ) {
				// Without a socket pair there is no way to talk to the child.
				echo "Error creating child processes\n";
				exit( 1 );
			}
			// Do the fork
			$pid = pcntl_fork();
			if ( $pid === -1 ) {
				echo "Error creating child processes\n";
				exit( 1 );
			}

			if ( !$pid ) {
				// This is the child process: it reads work from, and writes
				// results to, its own end of the socket pair.
				$this->initChild();
				$this->childNumber = $i;
				$this->input = $sockets[0];
				$this->output = $sockets[0];
				fclose( $sockets[1] );
				// The fork also copied the parent's ends of the sockets made
				// for earlier siblings. Close those copies so that a sibling
				// sees EOF as soon as the parent closes its end.
				foreach ( $childSockets as $siblingSocket ) {
					fclose( $siblingSocket );
				}
				return 'child';
			}

			// This is the parent process
			$this->children[$pid] = true;
			fclose( $sockets[0] );
			$childSockets[] = $sockets[1];
		}
		$this->feedChildren( $childSockets );
		foreach ( $childSockets as $socket ) {
			fclose( $socket );
		}
		return 'parent';
	}

	/**
	 * Child worker process. Reads work from $this->input and writes the
	 * result of that work to $this->output when completed.
	 *
	 * Each line of work is a JSON encoded [ id, data ] pair; the response is
	 * a JSON encoded [ id, result ] pair on a line of its own.
	 */
	protected function consume() {
		while ( !feof( $this->input ) ) {
			$line = fgets( $this->input );
			if ( $line === false ) {
				// Nothing was read; the loop condition decides whether to stop.
				continue;
			}
			$line = trim( $line );
			if ( $line === '' ) {
				continue;
			}
			[ $id, $data ] = json_decode( $line );
			$result = call_user_func( $this->workCallback, $data );
			fwrite( $this->output, json_encode( [ $id, $result ] ) . "\n" );
		}
	}

	/**
	 * Special cased version of self::consume() when no forking occurs
	 *
	 * Lines are handed to the callback as-is (minus the trailing new line)
	 * and each result is written directly to the output.
	 */
	protected function consumeNoFork() {
		while ( !feof( $this->input ) ) {
			$data = fgets( $this->input );
			if ( $data === false ) {
				// fgets() reports EOF by returning false, typically on the
				// read that follows the last line. That is not a line of
				// work and must not reach the callback.
				continue;
			}
			if ( substr( $data, -1 ) === "\n" ) {
				// Strip any final new line used to delimit lines of input.
				// The last line of input might not have it, though.
				$data = substr( $data, 0, -1 );
			}
			if ( $data === '' ) {
				continue;
			}
			$result = call_user_func( $this->workCallback, $data );
			fwrite( $this->output, "$result\n" );
		}
	}

	/**
	 * Reads lines of work from $this->input and farms them out to
	 * the provided socket.
	 *
	 * Every line is tagged with an increasing id so that self::receive()
	 * can restore the input order. Returns once all input has been read and
	 * all outstanding results have been collected.
	 *
	 * @param resource[] $sockets
	 */
	protected function feedChildren( array $sockets ) {
		$used = [];
		$id = 0;
		$this->nextOutputId = 0;

		while ( !feof( $this->input ) ) {
			$data = fgets( $this->input );
			if ( $data === false ) {
				// EOF is reported by fgets() returning false; there is no
				// work to dispatch for it.
				continue;
			}
			if ( $used ) {
				// Collect any finished results. Only block when no child is
				// idle, as the line read above needs somewhere to go.
				do {
					$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
				} while ( !$sockets );
			}
			if ( substr( $data, -1 ) === "\n" ) {
				// Strip any final new line used to delimit lines of input.
				// The last line of input might not have it, though.
				$data = substr( $data, 0, -1 );
			}
			if ( $data === '' ) {
				continue;
			}
			$socket = array_pop( $sockets );
			fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
			$used[] = $socket;
		}
		// Input is exhausted; wait for the work that is still in flight.
		while ( $used ) {
			$this->updateAvailableSockets( $sockets, $used, 5 );
		}
	}

	/**
	 * Moves sockets from $used to $sockets when they are available
	 * for more work
	 *
	 * @param resource[] &$sockets List of sockets that are waiting for work
	 * @param resource[] &$used List of sockets currently performing work
	 * @param int $timeout The number of seconds to block waiting. 0 for
	 *  non-blocking operation.
	 */
	protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
		$read = $used;
		$write = $except = [];
		$ready = stream_select( $read, $write, $except, $timeout );
		if ( !$ready ) {
			// 0 means the timeout expired, false means the call failed (for
			// example it was interrupted by a signal). Either way no socket
			// is known to be readable, so leave it to the caller to retry.
			return;
		}
		foreach ( $read as $socket ) {
			$line = fgets( $socket );
			// NOTE(review): if the child has gone away, $line is false and the
			// decode below yields nulls, which are recorded as id 0. Confirm
			// how a dead worker should be handled.
			[ $id, $data ] = json_decode( trim( $line ) );
			$this->receive( (int)$id, $data );
			$sockets[] = $socket;
			$idx = array_search( $socket, $used, true );
			if ( $idx !== false ) {
				unset( $used[$idx] );
			}
		}
	}

	/**
	 * Write a result to the output, or hold on to it until every result
	 * with a lower id has been written.
	 *
	 * @param int $id
	 * @param string $data
	 */
	protected function receive( $id, $data ) {
		if ( $id !== $this->nextOutputId ) {
			$this->delayedOutputData[$id] = $data;
			return;
		}
		fwrite( $this->output, $data . "\n" );
		$this->nextOutputId = $id + 1;
		// array_key_exists() rather than isset(): a buffered null result must
		// still be flushed, otherwise the output would stall on it forever.
		while ( array_key_exists( $this->nextOutputId, $this->delayedOutputData ) ) {
			fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
			unset( $this->delayedOutputData[$this->nextOutputId] );
			$this->nextOutputId++;
		}
	}
}
233 | |
/**
 * Register the un-namespaced class name as an alias of the namespaced class.
 *
 * @deprecated class alias since 1.40
 */
class_alias(
	OrderedStreamingForkController::class,
	'OrderedStreamingForkController'
);