MediaWiki REL1_34
OrderedStreamingForkController.php
Go to the documentation of this file.
1<?php
2
/** @var callable Invoked through call_user_func() with each unit of work read from the input */
34 protected $workCallback;
/** @var resource Stream the work is read from, one unit per line (used with feof()/fgets()) */
36 protected $input;
/** @var resource Stream the results are written to, one per line (used with fwrite()) */
38 protected $output;
/** @var int Id of the next result that may be written; results carrying another id are buffered */
40 protected $nextOutputId;
/** @var string[] Buffered results awaiting their turn; int key indicates order, value is data */
42 protected $delayedOutputData = [];
43
/**
 * Remembers the streams and the work callback; the process count is
 * handed to the parent constructor untouched.
 *
 * @param int $numProcs Forwarded to the parent constructor
 * @param callable $workCallback Called with one unit of work at a time;
 *  its return value is what ends up on $output
 * @param resource $input Stream the work is read from, line by line
 * @param resource $output Stream the results are written to, line by line
 */
public function __construct( $numProcs, $workCallback, $input, $output ) {
	parent::__construct( $numProcs );

	// Streams first, then the callback that maps one onto the other.
	$this->input = $input;
	$this->output = $output;
	$this->workCallback = $workCallback;
}
57
/**
 * Runs the controller.
 *
 * With no processes to start, all work is done inline in the current
 * process. Otherwise the parent implementation does the forking, and a
 * process that comes back as 'child' starts consuming work right away.
 *
 * @return string 'parent' when no fork was requested, otherwise whatever
 *  parent::start() reported
 */
public function start() {
	if ( !( $this->procsToStart > 0 ) ) {
		// Nothing to fork: process the input in this very process.
		$this->consumeNoFork();
		return 'parent';
	}

	$role = parent::start();
	if ( $role === 'child' ) {
		$this->consume();
	}

	return $role;
}
73
/**
 * Forks $numProcs children, each connected to this process through its
 * own socket pair, then feeds them the input.
 *
 * In a child, $this->input and $this->output are both replaced by the
 * child's end of the socket pair and 'child' is returned immediately.
 * The original process feeds all work to the children, closes its
 * sockets and returns 'parent'.
 *
 * Exits the process with status 1 when a socket pair or a child process
 * cannot be created.
 *
 * @param int $numProcs Number of child processes to create
 * @return string 'child' or 'parent'
 */
protected function forkWorkers( $numProcs ) {
	$this->prepareEnvironment();

	$childSockets = [];
	// Create the child processes
	for ( $i = 0; $i < $numProcs; $i++ ) {
		$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
		if ( $sockets === false ) {
			// Without a socket pair the child could never be fed; forking
			// anyway would only fail later on a non-array offset.
			echo "Error creating child sockets\n";
			exit( 1 );
		}

		// Do the fork
		$pid = pcntl_fork();
		if ( $pid === -1 || $pid === false ) {
			echo "Error creating child processes\n";
			exit( 1 );
		}

		if ( !$pid ) {
			// This is the child process
			$this->initChild();
			$this->childNumber = $i;
			$this->input = $sockets[0];
			$this->output = $sockets[0];
			fclose( $sockets[1] );
			// The fork also duplicated the parent-side sockets of every
			// child created before this one. Drop those copies: they leak
			// descriptors and keep the earlier children from seeing EOF
			// until this process exits. Only this process's copies are
			// closed; the parent's remain usable.
			foreach ( $childSockets as $inheritedSocket ) {
				fclose( $inheritedSocket );
			}
			return 'child';
		} else {
			// This is the parent process
			$this->children[$pid] = true;
			fclose( $sockets[0] );
			$childSockets[] = $sockets[1];
		}
	}
	$this->feedChildren( $childSockets );
	foreach ( $childSockets as $socket ) {
		// if a child has already shutdown the sockets will be closed,
		// closing a second time would raise a warning.
		if ( is_resource( $socket ) ) {
			fclose( $socket );
		}
	}
	return 'parent';
}
116
/**
 * Worker loop of a forked child.
 *
 * Each non-empty line read from $this->input is decoded as a JSON
 * [ id, data ] pair. The work callback is run on the data and the
 * JSON-encoded [ id, result ] pair is written back, followed by a
 * newline. The loop ends once the input reports EOF.
 */
protected function consume() {
	while ( !feof( $this->input ) ) {
		$message = trim( fgets( $this->input ) );
		if ( !$message ) {
			// Blank line or nothing read: no job in this iteration.
			continue;
		}

		list( $id, $payload ) = json_decode( $message );
		$outcome = call_user_func( $this->workCallback, $payload );
		$reply = json_encode( [ $id, $outcome ] );
		fwrite( $this->output, $reply . "\n" );
	}
}
131
/**
 * Processes the input in the current process, without any forking.
 *
 * Every non-empty input line, minus its trailing newline, is passed to
 * the work callback. The result is written to $this->output followed by
 * a newline.
 */
protected function consumeNoFork() {
	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// fgets() yields false when nothing could be read, typically
			// because only EOF was left after the final newline. That is
			// not a unit of work and must not reach the callback.
			continue;
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$result = call_user_func( $this->workCallback, $data );
		fwrite( $this->output, "$result\n" );
	}
}
150
/**
 * Reads lines of work from $this->input and farms them out to the
 * provided sockets, then waits for the outstanding results.
 *
 * Each non-empty line is sent to an idle socket as a JSON [ id, data ]
 * pair, with ids counting up from 0 in input order. Results are picked
 * up through updateAvailableSockets().
 *
 * @param resource[] $sockets Sockets of the idle children
 */
protected function feedChildren( array $sockets ) {
	// Sockets with a job in flight whose result has not been read yet
	$used = [];
	$id = 0;
	$this->nextOutputId = 0;

	while ( !feof( $this->input ) ) {
		$data = fgets( $this->input );
		if ( $data === false ) {
			// fgets() yields false when nothing could be read, typically
			// because only EOF was left after the final newline. Sending
			// that to a child would produce a bogus extra result.
			continue;
		}
		if ( $used ) {
			// Collect finished results without blocking while idle
			// sockets remain; otherwise wait until one frees up.
			do {
				$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
			} while ( !$sockets );
		}
		if ( substr( $data, -1 ) === "\n" ) {
			// Strip any final new line used to delimit lines of input.
			// The last line of input might not have it, though.
			$data = substr( $data, 0, -1 );
		}
		if ( $data === '' ) {
			continue;
		}
		$socket = array_pop( $sockets );
		fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
		$used[] = $socket;
	}
	// Input exhausted: drain the results that are still outstanding.
	while ( $used ) {
		$this->updateAvailableSockets( $sockets, $used, 5 );
	}
}
185
/**
 * Moves sockets from $used to $sockets when they are available for more
 * work.
 *
 * Waits up to $timeout seconds for any busy socket to become readable.
 * For each readable socket, one line is read and decoded as a JSON
 * [ id, data ] pair and passed to receive(). The socket is then returned
 * to the idle list.
 *
 * @param resource[] &$sockets Idle sockets; freed sockets are appended
 * @param resource[] &$used Busy sockets; freed sockets are removed
 * @param int $timeout Seconds stream_select() may wait
 */
protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
	// stream_select() rewrites the arrays it is given, so work on copies.
	$readable = $used;
	$writable = [];
	$exceptional = [];
	stream_select( $readable, $writable, $exceptional, $timeout );

	foreach ( $readable as $childSocket ) {
		$reply = json_decode( trim( fgets( $childSocket ) ) );
		list( $id, $data ) = $reply;
		$this->receive( (int)$id, $data );

		// The child is idle again: hand its socket back.
		$sockets[] = $childSocket;
		$position = array_search( $childSocket, $used );
		unset( $used[$position] );
	}
}
208
/**
 * Writes a result to $this->output, keeping the results in id order.
 *
 * A result whose id is not the next one expected is buffered. When the
 * expected result arrives it is written, followed by every buffered
 * result that directly continues the sequence.
 *
 * @param int $id Position of the result in the input order
 * @param string $data Result to write; a newline is appended
 */
protected function receive( $id, $data ) {
	if ( $id !== $this->nextOutputId ) {
		// Arrived early: park it until its predecessors are written.
		$this->delayedOutputData[$id] = $data;
		return;
	}
	fwrite( $this->output, $data . "\n" );
	$this->nextOutputId = $id + 1;
	// array_key_exists() rather than isset(): a buffered null result
	// counts as unset for isset(), so it would never be flushed and every
	// later result would pile up behind it.
	while ( array_key_exists( $this->nextOutputId, $this->delayedOutputData ) ) {
		fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
		unset( $this->delayedOutputData[$this->nextOutputId] );
		$this->nextOutputId++;
	}
}
226}
$line
Definition cdb.php:59
Class for managing forking command line scripts.
Reads lines of work from an input stream and farms them out to multiple child streams.
start()
Start the child processes. This should only be called from the command line. It should be called as ea...
feedChildren(array $sockets)
Reads lines of work from $this->input and farms them out to the provided socket.
string[] $delayedOutputData
Int key indicates order, value is data.
updateAvailableSockets(&$sockets, &$used, $timeout)
Moves sockets from $used to $sockets when they are available for more work.
consumeNoFork()
Special cased version of self::consume() when no forking occurs.
__construct( $numProcs, $workCallback, $input, $output)