52 parent::__construct( $numProcs );
62 if ( $this->procsToStart > 0 ) {
83 for ( $i = 0; $i < $numProcs; $i++ ) {
84 $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
87 if ( $pid === -1 || $pid ===
false ) {
88 echo
"Error creating child processes\n";
94 $this->childNumber = $i;
95 $this->input = $sockets[0];
96 $this->output = $sockets[0];
97 fclose( $sockets[1] );
101 $this->children[$pid] =
true;
102 fclose( $sockets[0] );
103 $childSockets[] = $sockets[1];
107 foreach ( $childSockets as $socket ) {
110 if ( is_resource( $socket ) ) {
122 while ( !feof( $this->input ) ) {
123 $line = trim( fgets( $this->input ) );
125 list( $id, $data ) = json_decode(
$line );
126 $result = call_user_func( $this->workCallback, $data );
127 fwrite( $this->output, json_encode( [ $id, $result ] ) .
"\n" );
136 while ( !feof( $this->input ) ) {
137 $data = fgets( $this->input );
138 if ( substr( $data, -1 ) ===
"\n" ) {
141 $data = substr( $data, 0, -1 );
143 if ( $data ===
'' ) {
146 $result = call_user_func( $this->workCallback, $data );
147 fwrite( $this->output,
"$result\n" );
160 $this->nextOutputId = 0;
162 while ( !feof( $this->input ) ) {
163 $data = fgets( $this->input );
167 }
while ( !$sockets );
169 if ( substr( $data, - 1 ) ===
"\n" ) {
172 $data = substr( $data, 0, -1 );
174 if ( $data ===
'' ) {
177 $socket = array_pop( $sockets );
178 fwrite( $socket, json_encode( [ $id++, $data ] ) .
"\n" );
197 $write = $except = [];
198 stream_select( $read, $write, $except, $timeout );
199 foreach ( $read as $socket ) {
200 $line = fgets( $socket );
201 list( $id, $data ) = json_decode( trim(
$line ) );
202 $this->
receive( (
int)$id, $data );
203 $sockets[] = $socket;
204 $idx = array_search( $socket, $used );
205 unset( $used[$idx] );
214 if ( $id !== $this->nextOutputId ) {
215 $this->delayedOutputData[$id] = $data;
218 fwrite( $this->output, $data .
"\n" );
219 $this->nextOutputId = $id + 1;
220 while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
221 fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] .
"\n" );
222 unset( $this->delayedOutputData[$this->nextOutputId] );
223 $this->nextOutputId++;