MediaWiki master
OrderedStreamingForkController.php
Go to the documentation of this file.
1<?php
8
/** @var callable Invoked with each unit of input data; its return value is sent to the output */
30 protected $workCallback;
/** @var resource Stream that work is read from (read with feof()/fgets()) */
32 protected $input;
/** @var resource Stream that results are written to (written with fwrite()) */
34 protected $output;
/** @var int ID of the next result that may be written to the output; reset to 0 by feedChildren() */
36 protected $nextOutputId;
/** @var string[] Results received ahead of their turn. Int key indicates order, value is data. */
38 protected $delayedOutputData = [];
39
/**
 * Remember the work callback and the streams that work and results travel over.
 *
 * @param int $numProcs Handed to the parent constructor unchanged
 *  (presumably the number of worker processes to fork; confirm against the parent class)
 * @param callable $workCallback Invoked with each unit of input data
 * @param resource $input Stream that work is read from
 * @param resource $output Stream that results are written to
 */
public function __construct( $numProcs, $workCallback, $input, $output ) {
	parent::__construct( $numProcs );
	[ $this->workCallback, $this->input, $this->output ] = [ $workCallback, $input, $output ];
}
53
/**
 * Run the controller, forking only when there are processes to start.
 *
 * @return string Whatever parent::start() reports when forking (the work loop
 *  is run here when that is 'child'); otherwise 'parent', after all input has
 *  been processed inline.
 */
public function start() {
	if ( !( $this->procsToStart > 0 ) ) {
		// Nothing to fork: handle every line of input in this process.
		$this->consumeNoFork();
		return 'parent';
	}

	$status = parent::start();
	if ( $status === 'child' ) {
		// Forked workers process the work sent to them over their socket.
		$this->consume();
	}
	return $status;
}
69
/**
 * Fork the worker processes, each connected to the parent by its own socket pair.
 *
 * In a child this returns as soon as the child has been wired to its socket.
 * In the parent it returns only after feedChildren() has distributed all of
 * the input and the parent-side sockets have been closed.
 *
 * @param int $numProcs The number of child processes to fork
 * @return string 'child' in a forked worker, 'parent' in the parent process
 */
protected function forkWorkers( $numProcs ) {
	$this->prepareEnvironment();

	$childSockets = [];
	// Create the child processes
	for ( $i = 0; $i < $numProcs; $i++ ) {
		$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
		if ( $sockets === false ) {
			// Without a socket pair there is no way to talk to the child, so
			// fail the same way a failed fork does instead of indexing false.
			echo "Error creating socket pair for child process\n";
			exit( 1 );
		}

		// Do the fork
		$pid = pcntl_fork();
		if ( $pid === -1 ) {
			echo "Error creating child processes\n";
			exit( 1 );
		}

		if ( !$pid ) {
			// This is the child process: it reads work from and writes
			// results to its own end of the pair.
			$this->initChild();
			$this->childNumber = $i;
			$this->input = $sockets[0];
			$this->output = $sockets[0];
			fclose( $sockets[1] );
			// The fork also copied the parent-side sockets of every child
			// created before this one. Close those copies so that earlier
			// children see EOF as soon as the parent closes its end, rather
			// than only once this process has exited.
			foreach ( $childSockets as $inheritedSocket ) {
				fclose( $inheritedSocket );
			}
			return 'child';
		}

		// This is the parent process
		$this->children[$pid] = true;
		fclose( $sockets[0] );
		$childSockets[] = $sockets[1];
	}

	$this->feedChildren( $childSockets );
	// Closing the parent side signals EOF to each child's work loop.
	foreach ( $childSockets as $socket ) {
		fclose( $socket );
	}
	return 'parent';
}
108
/**
 * Work loop of a child process.
 *
 * Reads JSON encoded [ id, data ] lines from the input until EOF, passes the
 * data to the work callback, and writes a JSON encoded [ id, result ] line
 * to the output for each one.
 */
protected function consume() {
	while ( !feof( $this->input ) ) {
		$line = trim( fgets( $this->input ) );
		if ( !$line ) {
			// Nothing usable was read (blank line or end of input).
			continue;
		}
		[ $id, $data ] = json_decode( $line );
		$result = ( $this->workCallback )( $data );
		$reply = json_encode( [ $id, $result ] );
		fwrite( $this->output, $reply . "\n" );
	}
}
123
/**
 * Special cased version of self::consume() when no forking occurs.
 *
 * Reads the input line by line, passes each non-empty line (without its
 * trailing newline) to the work callback, and writes each result followed by
 * a newline to the output.
 */
protected function consumeNoFork() {
	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// fgets() returns false when there is nothing left to read, and
			// feof() may only report EOF after such a read attempt. false is
			// not a unit of work, so do not hand it to the callback.
			continue;
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$result = ( $this->workCallback )( $data );
		fwrite( $this->output, "$result\n" );
	}
}
142
/**
 * Reads lines of work from $this->input and farms them out to the provided sockets.
 *
 * Each non-empty line is tagged with an increasing ID and written, JSON
 * encoded, to a socket that is not currently busy. Returns once all input has
 * been sent and every outstanding result has been collected.
 *
 * @param resource[] $sockets Sockets of the child processes, all initially idle
 */
protected function feedChildren( array $sockets ) {
	$used = [];
	$id = 0;
	$this->nextOutputId = 0;

	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// fgets() returns false when there is nothing left to read, and
			// feof() may only report EOF after such a read attempt. Do not
			// dispatch false to a child as if it were a line of work.
			continue;
		}
		if ( $used ) {
			// Collect finished results. Poll without waiting while an idle
			// socket exists; otherwise wait until one becomes available.
			do {
				$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
			} while ( !$sockets );
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$socket = array_pop( $sockets );
		fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
		$used[] = $socket;
	}

	// All input has been handed out; wait for the remaining results.
	while ( $used ) {
		$this->updateAvailableSockets( $sockets, $used, 5 );
	}
}
177
/**
 * Moves sockets from $used to $sockets when they are available for more work.
 *
 * Every busy socket that has become readable has one result line read from
 * it and passed to self::receive(); the socket is then returned to $sockets.
 *
 * @param resource[] &$sockets Idle sockets; freed sockets are appended here
 * @param resource[] &$used Busy sockets; freed sockets are removed from here
 * @param int $timeout Seconds to wait for a socket to become readable
 */
protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
	$read = $used;
	$write = $except = [];
	$ready = stream_select( $read, $write, $except, $timeout );
	if ( $ready === false ) {
		// The select call failed (for example it was interrupted by a
		// signal), so $read does not say which sockets are readable.
		// Leave both lists untouched; the callers loop and will retry.
		return;
	}
	foreach ( $read as $socket ) {
		// NOTE(review): a socket whose peer has closed yields false here,
		// which decodes to an empty result for ID 0; that case is not handled.
		$line = fgets( $socket );
		[ $id, $data ] = json_decode( trim( $line ) );
		$this->receive( (int)$id, $data );
		$sockets[] = $socket;
		// Strict search: match this exact stream resource.
		$idx = array_search( $socket, $used, true );
		unset( $used[$idx] );
	}
}
200
/**
 * Accept one result and write it out in ID order.
 *
 * A result whose ID is not the next one due is held back in
 * $this->delayedOutputData. When the due result arrives it is written,
 * followed by any held results that have become consecutive.
 *
 * @param int $id ID of the work item the result belongs to
 * @param string $data The result to write
 */
protected function receive( $id, $data ) {
	if ( $id === $this->nextOutputId ) {
		fwrite( $this->output, $data . "\n" );
		$this->nextOutputId = $id + 1;
		// Flush held-back results that are now next in line.
		for ( ; isset( $this->delayedOutputData[$this->nextOutputId] ); $this->nextOutputId++ ) {
			fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
			unset( $this->delayedOutputData[$this->nextOutputId] );
		}
		return;
	}

	// Arrived ahead of its turn: keep it until the earlier results are out.
	$this->delayedOutputData[$id] = $data;
}
218}
219
// Register 'OrderedStreamingForkController' as an alias of this class.
// NOTE(review): presumably kept so code using the un-namespaced name keeps working;
// the namespace declaration is not visible in this listing, so confirm.
221class_alias( OrderedStreamingForkController::class, 'OrderedStreamingForkController' );
Manage forking inside CLI maintenance scripts.
Apply a transformation to values via a pool of sub processes.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided sockets.
string[] $delayedOutputData
Int key indicates order, value is data.
start()
Start the child processes. This should only be called from the command line. It should be called as ea...
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
Update the CREDITS list by merging in the list of git commit authors.