Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 84
0.00% covered (danger)
0.00%
0 / 8
CRAP
0.00% covered (danger)
0.00%
0 / 1
OrderedStreamingForkController
0.00% covered (danger)
0.00%
0 / 83
0.00% covered (danger)
0.00%
0 / 8
812
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 start
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 forkWorkers
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
30
 consume
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
12
 consumeNoFork
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
20
 feedChildren
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
56
 updateAvailableSockets
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
6
 receive
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21namespace MediaWiki\Maintenance;
22
23/**
24 * Apply a transformation to values via a pool of sub processes.
25 *
26 * The controller reads lines from a given input stream, where each line
27 * describes work to be done. This work is then farmed out to multiple
28 * child streams that correspond to child procesess. Each child has exactly
29 * one piece of work in-flight at a given moment. The result of each work
30 * is written to an output stream.
31 *
32 * If numProcs is zero, the fallback is to perform work in-process instead.
33 *
34 * This class guarantees that the output is produced in the same exact order
35 * as input values were.
36 *
37 * Currently used by CirrusSearch extension to implement CLI search script.
38 *
39 * @ingroup Maintenance
40 * @since 1.30
41 */
42class OrderedStreamingForkController extends ForkController {
43    /** @var callable */
44    protected $workCallback;
45    /** @var resource */
46    protected $input;
47    /** @var resource */
48    protected $output;
49    /** @var int */
50    protected $nextOutputId;
51    /** @var string[] Int key indicates order, value is data */
52    protected $delayedOutputData = [];
53
54    /**
55     * @param int $numProcs The number of worker processes to fork
56     * @param callable $workCallback A callback to call in the child process
57     *  once for each line of work to process.
58     * @param resource $input A socket to read work lines from
59     * @param resource $output A socket to write the result of work to.
60     */
61    public function __construct( $numProcs, $workCallback, $input, $output ) {
62        parent::__construct( $numProcs );
63        $this->workCallback = $workCallback;
64        $this->input = $input;
65        $this->output = $output;
66    }
67
68    /**
69     * @inheritDoc
70     */
71    public function start() {
72        if ( $this->procsToStart > 0 ) {
73            $status = parent::start();
74            if ( $status === 'child' ) {
75                $this->consume();
76            }
77        } else {
78            $status = 'parent';
79            $this->consumeNoFork();
80        }
81        return $status;
82    }
83
84    /**
85     * @param int $numProcs
86     * @return string
87     */
88    protected function forkWorkers( $numProcs ) {
89        $this->prepareEnvironment();
90
91        $childSockets = [];
92        // Create the child processes
93        for ( $i = 0; $i < $numProcs; $i++ ) {
94            $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
95            // Do the fork
96            $pid = pcntl_fork();
97            if ( $pid === -1 ) {
98                echo "Error creating child processes\n";
99                exit( 1 );
100            }
101
102            if ( !$pid ) {
103                $this->initChild();
104                $this->childNumber = $i;
105                $this->input = $sockets[0];
106                $this->output = $sockets[0];
107                fclose( $sockets[1] );
108                return 'child';
109            } else {
110                // This is the parent process
111                $this->children[$pid] = true;
112                fclose( $sockets[0] );
113                $childSockets[] = $sockets[1];
114            }
115        }
116        $this->feedChildren( $childSockets );
117        foreach ( $childSockets as $socket ) {
118            fclose( $socket );
119        }
120        return 'parent';
121    }
122
123    /**
124     * Child worker process. Reads work from $this->input and writes the
125     * result of that work to $this->output when completed.
126     */
127    protected function consume() {
128        while ( !feof( $this->input ) ) {
129            $line = trim( fgets( $this->input ) );
130            if ( $line ) {
131                [ $id, $data ] = json_decode( $line );
132                $result = call_user_func( $this->workCallback, $data );
133                fwrite( $this->output, json_encode( [ $id, $result ] ) . "\n" );
134            }
135        }
136    }
137
138    /**
139     * Special cased version of self::consume() when no forking occurs
140     */
141    protected function consumeNoFork() {
142        while ( !feof( $this->input ) ) {
143            $data = fgets( $this->input );
144            if ( substr( $data, -1 ) === "\n" ) {
145                // Strip any final new line used to delimit lines of input.
146                // The last line of input might not have it, though.
147                $data = substr( $data, 0, -1 );
148            }
149            if ( $data === '' ) {
150                continue;
151            }
152            $result = call_user_func( $this->workCallback, $data );
153            fwrite( $this->output, "$result\n" );
154        }
155    }
156
157    /**
158     * Reads lines of work from $this->input and farms them out to
159     * the provided socket.
160     *
161     * @param resource[] $sockets
162     */
163    protected function feedChildren( array $sockets ) {
164        $used = [];
165        $id = 0;
166        $this->nextOutputId = 0;
167
168        while ( !feof( $this->input ) ) {
169            $data = fgets( $this->input );
170            if ( $used ) {
171                do {
172                    $this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
173                } while ( !$sockets );
174            }
175            if ( substr( $data, -1 ) === "\n" ) {
176                // Strip any final new line used to delimit lines of input.
177                // The last line of input might not have it, though.
178                $data = substr( $data, 0, -1 );
179            }
180            if ( $data === '' ) {
181                continue;
182            }
183            $socket = array_pop( $sockets );
184            fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
185            $used[] = $socket;
186        }
187        while ( $used ) {
188            $this->updateAvailableSockets( $sockets, $used, 5 );
189        }
190    }
191
192    /**
193     * Moves sockets from $used to $sockets when they are available
194     * for more work
195     *
196     * @param resource[] &$sockets List of sockets that are waiting for work
197     * @param resource[] &$used List of sockets currently performing work
198     * @param int $timeout The number of seconds to block waiting. 0 for
199     *  non-blocking operation.
200     */
201    protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
202        $read = $used;
203        $write = $except = [];
204        stream_select( $read, $write, $except, $timeout );
205        foreach ( $read as $socket ) {
206            $line = fgets( $socket );
207            [ $id, $data ] = json_decode( trim( $line ) );
208            $this->receive( (int)$id, $data );
209            $sockets[] = $socket;
210            $idx = array_search( $socket, $used );
211            unset( $used[$idx] );
212        }
213    }
214
215    /**
216     * @param int $id
217     * @param string $data
218     */
219    protected function receive( $id, $data ) {
220        if ( $id !== $this->nextOutputId ) {
221            $this->delayedOutputData[$id] = $data;
222            return;
223        }
224        fwrite( $this->output, $data . "\n" );
225        $this->nextOutputId = $id + 1;
226        while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
227            fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
228            unset( $this->delayedOutputData[$this->nextOutputId] );
229            $this->nextOutputId++;
230        }
231    }
232}
233
234/** @deprecated class alias since 1.40 */
235class_alias( OrderedStreamingForkController::class, 'OrderedStreamingForkController' );