MediaWiki 1.39.10
OrderedStreamingForkController.php
Go to the documentation of this file.
1<?php
2
34 protected $workCallback;
36 protected $input;
38 protected $output;
40 protected $nextOutputId;
42 protected $delayedOutputData = [];
43
51 public function __construct( $numProcs, $workCallback, $input, $output ) {
52 parent::__construct( $numProcs );
53 $this->workCallback = $workCallback;
54 $this->input = $input;
55 $this->output = $output;
56 }
57
61 public function start() {
62 if ( $this->procsToStart > 0 ) {
63 $status = parent::start();
64 if ( $status === 'child' ) {
65 $this->consume();
66 }
67 } else {
68 $status = 'parent';
69 $this->consumeNoFork();
70 }
71 return $status;
72 }
73
78 protected function forkWorkers( $numProcs ) {
79 $this->prepareEnvironment();
80
81 $childSockets = [];
82 // Create the child processes
83 for ( $i = 0; $i < $numProcs; $i++ ) {
84 $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
85 // Do the fork
86 $pid = pcntl_fork();
87 if ( $pid === -1 ) {
88 echo "Error creating child processes\n";
89 exit( 1 );
90 }
91
92 if ( !$pid ) {
93 $this->initChild();
94 $this->childNumber = $i;
95 $this->input = $sockets[0];
96 $this->output = $sockets[0];
97 fclose( $sockets[1] );
98 return 'child';
99 } else {
100 // This is the parent process
101 $this->children[$pid] = true;
102 fclose( $sockets[0] );
103 $childSockets[] = $sockets[1];
104 }
105 }
106 $this->feedChildren( $childSockets );
107 foreach ( $childSockets as $socket ) {
108 fclose( $socket );
109 }
110 return 'parent';
111 }
112
117 protected function consume() {
118 while ( !feof( $this->input ) ) {
119 $line = trim( fgets( $this->input ) );
120 if ( $line ) {
121 list( $id, $data ) = json_decode( $line );
122 $result = call_user_func( $this->workCallback, $data );
123 fwrite( $this->output, json_encode( [ $id, $result ] ) . "\n" );
124 }
125 }
126 }
127
131 protected function consumeNoFork() {
132 while ( !feof( $this->input ) ) {
133 $data = fgets( $this->input );
134 if ( substr( $data, -1 ) === "\n" ) {
135 // Strip any final new line used to delimit lines of input.
136 // The last line of input might not have it, though.
137 $data = substr( $data, 0, -1 );
138 }
139 if ( $data === '' ) {
140 continue;
141 }
142 $result = call_user_func( $this->workCallback, $data );
143 fwrite( $this->output, "$result\n" );
144 }
145 }
146
153 protected function feedChildren( array $sockets ) {
154 $used = [];
155 $id = 0;
156 $this->nextOutputId = 0;
157
158 while ( !feof( $this->input ) ) {
159 $data = fgets( $this->input );
160 if ( $used ) {
161 do {
162 $this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
163 } while ( !$sockets );
164 }
165 if ( substr( $data, -1 ) === "\n" ) {
166 // Strip any final new line used to delimit lines of input.
167 // The last line of input might not have it, though.
168 $data = substr( $data, 0, -1 );
169 }
170 if ( $data === '' ) {
171 continue;
172 }
173 $socket = array_pop( $sockets );
174 fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
175 $used[] = $socket;
176 }
177 while ( $used ) {
178 $this->updateAvailableSockets( $sockets, $used, 5 );
179 }
180 }
181
191 protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
192 $read = $used;
193 $write = $except = [];
194 stream_select( $read, $write, $except, $timeout );
195 foreach ( $read as $socket ) {
196 $line = fgets( $socket );
197 list( $id, $data ) = json_decode( trim( $line ) );
198 $this->receive( (int)$id, $data );
199 $sockets[] = $socket;
200 $idx = array_search( $socket, $used );
201 unset( $used[$idx] );
202 }
203 }
204
209 protected function receive( $id, $data ) {
210 if ( $id !== $this->nextOutputId ) {
211 $this->delayedOutputData[$id] = $data;
212 return;
213 }
214 fwrite( $this->output, $data . "\n" );
215 $this->nextOutputId = $id + 1;
216 while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
217 fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
218 unset( $this->delayedOutputData[$this->nextOutputId] );
219 $this->nextOutputId++;
220 }
221 }
222}
Class for managing forking command line scripts.
Reads lines of work from an input stream and farms them out to multiple child streams.
start()
Start the child processes.This should only be called from the command line. It should be called as ea...
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided socket.
string[] $delayedOutputData
Int key indicates order, value is data.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
__construct( $numProcs, $workCallback, $input, $output)
$line
Definition mcc.php:119