// Pattern for header names whose values are replaced by '[redacted]' before
// request headers are written to the debug log. It matches the words
// authorization, auth, password or cookie when delimited by the start/end of
// the name, '-' or '_'. The pattern has no case-insensitive flag.
58 private const SENSITIVE_HEADERS =
'/(^|-|_)(authorization|auth|password|cookie)($|-|_)/';
// User-Agent value used when a request carries no 'user-agent' header of its own.
85 protected $userAgent =
'wikimedia/multi-http-client v1.1';
// Can be set through the 'headers' option.
// NOTE(review): where these headers are merged into requests is not visible in this excerpt.
89 protected array $headers = [];
// Fraction of the smallest configured timeout that is used as the select wait time.
94 private const TIMEOUT_ACCURACY_FACTOR = 0.1;
// NOTE(review): the enclosing function header is not visible in this excerpt;
// this looks like the option-handling part of the constructor, reading $options.
// A configured CA bundle path is stored and must exist on disk, otherwise an
// Exception is thrown immediately.
121 if ( isset( $options[
'caBundlePath'] ) ) {
122 $this->caBundlePath = $options[
'caBundlePath'];
123 if ( !file_exists( $this->caBundlePath ) ) {
124 throw new Exception(
"Cannot find CA bundle: " . $this->caBundlePath );
// Option names; presumably the elements of the $opts list iterated below
// (the assignment itself is not visible here).
128 'connTimeout',
'maxConnTimeout',
'reqTimeout',
'maxReqTimeout',
129 'usePipelining',
'maxConnsPerHost',
'proxy',
'userAgent',
'logger',
130 'localProxy',
'localVirtualHosts',
'headers',
'telemetry'
// Copy each option that is set (and not null) onto the property of the same name.
132 foreach ( $opts as $key ) {
133 if ( isset( $options[$key] ) ) {
134 $this->$key = $options[$key];
// Default to a NullLogger instance when no logger has been set.
137 $this->logger ??=
new NullLogger;
/**
 * Execute a single request.
 *
 * Wraps $req in a one-element list, passes it to runMulti() together with
 * $opts, and returns the 'response' entry of the first result.
 *
 * @param array $req Request in the form accepted by runMulti()
 * @param array $opts Options forwarded unchanged to runMulti()
 * @return array The 'response' element of the only result
 */
163 public function run( array $req, array $opts = [] ) {
164 return $this->
runMulti( [ $req ], $opts )[0][
'response'];
/**
 * Execute a batch of requests.
 *
 * The requests are normalized in place, the connection and request timeouts
 * are defaulted from the instance and capped by maxConnTimeout/maxReqTimeout
 * when those are set, the 'httpVersion' option is mapped to a curl constant,
 * and the batch is handed to runMultiCurl() or runMultiHttp().
 *
 * @param array[] $reqs Requests; validated and filled in by normalizeRequests()
 * @param array $opts Options read here: 'connTimeout', 'reqTimeout', 'httpVersion'
 * @return array[] The value returned by the selected backend
 * @throws Exception Raised by normalizeRequests() for a request without method or url
 */
198 public function runMulti( array $reqs, array $opts = [] ) {
199 $this->normalizeRequests( $reqs );
// Array union: timeouts supplied by the caller win, instance values fill the gaps.
200 $opts += [
'connTimeout' => $this->connTimeout,
'reqTimeout' => $this->reqTimeout ];
// Enforce the configured upper bounds; a falsy maximum means no cap.
202 if ( $this->maxConnTimeout && $opts[
'connTimeout'] > $this->maxConnTimeout ) {
203 $opts[
'connTimeout'] = $this->maxConnTimeout;
205 if ( $this->maxReqTimeout && $opts[
'reqTimeout'] > $this->maxReqTimeout ) {
206 $opts[
'reqTimeout'] = $this->maxReqTimeout;
// Replace the 'httpVersion' option by a CURL_HTTP_VERSION_* constant.
// NOTE(review): the case labels are not visible in this excerpt, so the
// accepted input values cannot be confirmed from here.
210 switch ( $opts[
'httpVersion'] ??
null ) {
212 $opts[
'httpVersion'] = CURL_HTTP_VERSION_1_0;
215 $opts[
'httpVersion'] = CURL_HTTP_VERSION_1_1;
219 $opts[
'httpVersion'] = CURL_HTTP_VERSION_2_0;
222 $opts[
'httpVersion'] = CURL_HTTP_VERSION_NONE;
// NOTE(review): the condition that selects between the two backends is not
// visible here; presumably it depends on curl being available.
224 return $this->runMultiCurl( $reqs, $opts );
226 # TODO: Add handling for httpVersion option
227 return $this->runMultiHttp( $reqs, $opts );
// NOTE(review): the enclosing function header is not visible in this excerpt.
// True only when the curl extension is loaded and curl_multi_init() exists.
239 return extension_loaded(
'curl' ) && function_exists(
'curl_multi_init' );
/**
 * Execute the requests through a curl multi handle.
 *
 * For every request a curl handle is created and attached to the multi
 * handle, the transfers are driven until none is active or curl reports an
 * error, and each request's 'response' entry is then completed with error
 * information and numeric aliases.
 *
 * @param array[] $reqs Requests as prepared by normalizeRequests()
 * @param array $opts Options passed to getCurlMulti(), getSelectTimeout() and getCurlHandle()
 * @return array[] NOTE(review): the return statement is not visible in this
 *   excerpt; run() reads [0]['response'] from the result of runMulti()
 */
259 private function runMultiCurl( array $reqs, array $opts ) {
260 $chm = $this->getCurlMulti( $opts );
262 $selectTimeout = $this->getSelectTimeout( $opts );
// One curl handle per request, stored under the request's index.
266 foreach ( $reqs as $index => &$req ) {
267 $handles[$index] = $this->getCurlHandle( $req, $opts );
268 curl_multi_add_handle( $chm, $handles[$index] );
// Run the transfers and remember any completion message, keyed by the
// handle cast to int so it can be looked up per request afterwards.
278 $mrc = curl_multi_exec( $chm, $active );
279 $info = curl_multi_info_read( $chm );
280 if ( $info !==
false ) {
284 $infos[(int)$info[
'handle']] = $info;
286 }
while ( $mrc == CURLM_CALL_MULTI_PERFORM );
// While transfers are still active, wait for activity for at most $selectTimeout.
// NOTE(review): what happens when curl_multi_select() returns -1 is not visible here.
288 if ( $active > 0 && $mrc == CURLM_OK && curl_multi_select( $chm, $selectTimeout ) == -1 ) {
292 }
while ( $active > 0 && $mrc == CURLM_OK );
// Detach every handle and finish the matching response.
295 foreach ( $reqs as $index => &$req ) {
296 $ch = $handles[$index];
297 curl_multi_remove_handle( $chm, $ch );
299 if ( isset( $infos[(
int)$ch] ) ) {
300 $info = $infos[(int)$ch];
301 $errno = $info[
'result'];
// A non-zero result is reported as "(curl error: N)", extended with the
// curl_strerror() text when that function exists, and logged as a warning.
302 if ( $errno !== 0 ) {
303 $req[
'response'][
'error'] =
"(curl error: $errno)";
304 if ( function_exists(
'curl_strerror' ) ) {
305 $req[
'response'][
'error'] .=
" " . curl_strerror( $errno );
307 $this->logger->warning(
"Error fetching URL \"{$req['url']}\": " .
308 $req[
'response'][
'error'] );
// Debug log entry with method, URL, status code, download size and timings.
310 $this->logger->debug(
311 "HTTP complete: {method} {url} code={response_code} size={size} " .
312 "total={total_time} connect={connect_time}",
314 'method' => $req[
'method'],
315 'url' => $req[
'url'],
316 'response_code' => $req[
'response'][
'code'],
317 'size' => curl_getinfo( $ch, CURLINFO_SIZE_DOWNLOAD ),
318 'total_time' => $this->getCurlTime(
319 $ch, CURLINFO_TOTAL_TIME,
'CURLINFO_TOTAL_TIME_T'
321 'connect_time' => $this->getCurlTime(
322 $ch, CURLINFO_CONNECT_TIME,
'CURLINFO_CONNECT_TIME_T'
// NOTE(review): presumably the branch for handles without a recorded
// completion message; the `else` line is not visible in this excerpt.
328 $req[
'response'][
'error'] =
"(curl error: no status set)";
// Mirror the named response fields at numeric indexes 0-4
// (code, reason, headers, body, error).
332 $req[
'response'][0] = $req[
'response'][
'code'];
333 $req[
'response'][1] = $req[
'response'][
'reason'];
334 $req[
'response'][2] = $req[
'response'][
'headers'];
335 $req[
'response'][3] = $req[
'response'][
'body'];
336 $req[
'response'][4] = $req[
'response'][
'error'];
// Close and drop the stream stored under '_closeHandle', if there is one.
339 if ( isset( $req[
'_closeHandle'] ) ) {
340 fclose( $req[
'_closeHandle'] );
341 unset( $req[
'_closeHandle'] );
// NOTE(review): the enclosing function header and the creation of $ch are not
// visible in this excerpt. The code below configures the curl handle $ch for
// the request $req using $opts; presumably this is the body of getCurlHandle().
// A per-request 'proxy' overrides the instance-wide proxy.
364 curl_setopt( $ch, CURLOPT_PROXY, $req[
'proxy'] ?? $this->proxy );
// The timeout options are multiplied by 1e3 and truncated to int for the *_MS options.
365 curl_setopt( $ch, CURLOPT_CONNECTTIMEOUT_MS, intval( $opts[
'connTimeout'] * 1e3 ) );
366 curl_setopt( $ch, CURLOPT_TIMEOUT_MS, intval( $opts[
'reqTimeout'] * 1e3 ) );
// Follow redirects, at most 4 of them; CURLOPT_HEADER is switched off.
367 curl_setopt( $ch, CURLOPT_FOLLOWLOCATION, 1 );
368 curl_setopt( $ch, CURLOPT_MAXREDIRS, 4 );
369 curl_setopt( $ch, CURLOPT_HEADER, 0 );
// With a CA bundle configured, verify the peer against that bundle.
370 if ( $this->caBundlePath !==
null ) {
371 curl_setopt( $ch, CURLOPT_SSL_VERIFYPEER,
true );
372 curl_setopt( $ch, CURLOPT_CAINFO, $this->caBundlePath );
374 curl_setopt( $ch, CURLOPT_RETURNTRANSFER, 1 );
// Encode 'query' per RFC 3986 and append it with '?' or '&', depending on
// whether the request URL already contains a '?'.
// NOTE(review): the initial assignment of $url is not visible in this excerpt.
377 $query = http_build_query( $req[
'query'],
'',
'&', PHP_QUERY_RFC3986 );
378 if ( $query !=
'' ) {
379 $url .= strpos( $req[
'url'],
'?' ) ===
false ?
"?$query" :
"&$query";
381 curl_setopt( $ch, CURLOPT_URL, $url );
382 curl_setopt( $ch, CURLOPT_CUSTOMREQUEST, $req[
'method'] );
// HEAD requests are sent without expecting a body.
383 curl_setopt( $ch, CURLOPT_NOBODY, ( $req[
'method'] ===
'HEAD' ) );
384 curl_setopt( $ch, CURLOPT_HTTP_VERSION, $opts[
'httpVersion'] ?? CURL_HTTP_VERSION_NONE );
// PUT: the body is supplied to curl as an input stream.
386 if ( $req[
'method'] ===
'PUT' ) {
387 curl_setopt( $ch, CURLOPT_PUT, 1 );
// A resource body needs either a 'content-length' header or a
// 'transfer-encoding' header; otherwise an Exception is thrown.
// NOTE(review): the comparison below is against 'chunks', while the
// standard Transfer-Encoding token is 'chunked' — confirm which is intended.
389 if ( is_resource( $req[
'body'] ) ) {
390 curl_setopt( $ch, CURLOPT_INFILE, $req[
'body'] );
391 if ( isset( $req[
'headers'][
'content-length'] ) ) {
392 curl_setopt( $ch, CURLOPT_INFILESIZE, $req[
'headers'][
'content-length'] );
393 } elseif ( isset( $req[
'headers'][
'transfer-encoding'] ) &&
394 $req[
'headers'][
'transfer-encoding'] ===
'chunks'
396 curl_setopt( $ch, CURLOPT_UPLOAD,
true );
398 throw new Exception(
"Missing 'Content-Length' or 'Transfer-Encoding' header." );
// A non-empty string body is written to a php://temp stream, which is
// remembered under '_closeHandle' so that it can be closed later.
400 } elseif ( $req[
'body'] !==
'' ) {
401 $fp = fopen(
"php://temp",
"wb+" );
402 fwrite( $fp, $req[
'body'], strlen( $req[
'body'] ) );
404 curl_setopt( $ch, CURLOPT_INFILE, $fp );
405 curl_setopt( $ch, CURLOPT_INFILESIZE, strlen( $req[
'body'] ) );
406 $req[
'_closeHandle'] = $fp;
// NOTE(review): presumably the empty-body case; the `else` line is not visible here.
408 curl_setopt( $ch, CURLOPT_INFILESIZE, 0 );
// Read callback: returns up to $length bytes from the stream; a false
// result from fread() becomes an empty string through the cast.
410 curl_setopt( $ch, CURLOPT_READFUNCTION,
411 static function ( $ch, $fd, $length ) {
412 return (
string)fread( $fd, $length );
// POST: the body is passed as the post fields.
415 } elseif ( $req[
'method'] ===
'POST' ) {
416 curl_setopt( $ch, CURLOPT_POST, 1 );
417 curl_setopt( $ch, CURLOPT_POSTFIELDS, $req[
'body'] );
// Other methods: a resource or non-empty body is rejected, and the
// 'content-length' header is set to 0.
420 if ( is_resource( $req[
'body'] ) || $req[
'body'] !==
'' ) {
421 throw new Exception(
"HTTP body specified for a non PUT/POST request." );
423 $req[
'headers'][
'content-length'] = 0;
// Fall back to the instance user agent when the request has none.
426 if ( !isset( $req[
'headers'][
'user-agent'] ) ) {
427 $req[
'headers'][
'user-agent'] = $this->userAgent;
// Build "name: value" header lines; values are trimmed.
// NOTE(review): the check rejects any ':' in the name, while the message
// speaks of "colon-space" — confirm which rule is intended.
431 foreach ( $req[
'headers'] as $name => $value ) {
432 if ( strpos( $name,
':' ) !==
false ) {
433 throw new Exception(
"Header name must not contain colon-space." );
435 $headers[] = $name .
': ' . trim( $value );
437 curl_setopt( $ch, CURLOPT_HTTPHEADER, $headers );
// Header callback: fills $req['response'] by reference as header lines arrive.
// NOTE(review): the body of the 'relayResponseHeaders' branch and the
// callback's return statements are not visible in this excerpt.
439 curl_setopt( $ch, CURLOPT_HEADERFUNCTION,
440 static function ( $ch,
$header ) use ( &$req ) {
441 if ( !empty( $req[
'flags'][
'relayResponseHeaders'] ) && trim(
$header ) !==
'' ) {
// A status line (HTTP/1.0, HTTP/1.1 or HTTP/2) sets code and reason and
// resets the collected headers.
446 if ( preg_match(
"/^(HTTP\/(?:1\.[01]|2)) (\d{3}) (.*)/",
$header,
$matches ) ) {
447 $req[
'response'][
'code'] = (int)
$matches[2];
448 $req[
'response'][
'reason'] = trim(
$matches[3] );
451 $req[
'response'][
'headers'] = [];
// Lines without ':' are tested for here; their handling is not visible in this excerpt.
454 if ( strpos(
$header,
":" ) ===
false ) {
// Split at the first ':'; the name is lowercased and the value trimmed.
457 [ $name, $value ] = explode(
":",
$header, 2 );
458 $name = strtolower( $name );
459 $value = trim( $value );
// A repeated header name has its values joined with ', '.
460 if ( isset( $req[
'response'][
'headers'][$name] ) ) {
461 $req[
'response'][
'headers'][$name] .=
', ' . $value;
463 $req[
'response'][
'headers'][$name] = $value;
// Write callback: with a 'stream' set on the request the data is written to
// it and fwrite()'s result is returned; otherwise the data is appended to
// the response body and its length is returned.
470 $hasOutputStream = isset( $req[
'stream'] );
471 curl_setopt( $ch, CURLOPT_WRITEFUNCTION,
472 static function ( $ch, $data ) use ( &$req, $hasOutputStream ) {
473 if ( $hasOutputStream ) {
475 return fwrite( $req[
'stream'], $data );
478 $req[
'response'][
'body'] .= $data;
480 return strlen( $data );
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the body of getCurlMulti(), which prepares the curl
// multi handle used for a batch.
// Create a multi handle and set CURLMOPT_MAXCONNECTS to maxConnsPerHost.
// NOTE(review): the lines below use $this->cmh while these use $cmh; the
// assignment between the two and any reuse guard are not visible here.
496 $cmh = curl_multi_init();
499 curl_multi_setopt(
$cmh, CURLMOPT_MAXCONNECTS, (
int)$this->maxConnsPerHost );
503 $curlVersion = curl_version()[
'version'];
// CURLMOPT_MAX_HOST_CONNECTIONS is only set on curl 7.30.0 or newer; the
// 'maxConnsPerHost' option overrides the instance value.
506 if ( version_compare( $curlVersion,
'7.30.0',
'>=' ) ) {
508 $maxHostConns = $opts[
'maxConnsPerHost'] ?? $this->maxConnsPerHost;
509 curl_multi_setopt( $this->cmh, CURLMOPT_MAX_HOST_CONNECTIONS, (
int)$maxHostConns );
// Choose the pipelining value by curl version when pipelining is enabled
// through the 'usePipelining' option or the instance setting.
// NOTE(review): the value used for curl < 7.43 and for disabled pipelining is
// not visible in this excerpt.
512 if ( $opts[
'usePipelining'] ?? $this->usePipelining ) {
513 if ( version_compare( $curlVersion,
'7.43',
'<' ) ) {
516 } elseif ( version_compare( $curlVersion,
'7.62',
'<' ) ) {
518 $pipelining = CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX;
521 $pipelining = CURLPIPE_MULTIPLEX;
// The @ operator suppresses any warning raised while setting this option.
525 @curl_multi_setopt( $this->cmh, CURLMOPT_PIPELINING, $pipelining );
/**
 * Read a timing value from a curl handle and return it as a string.
 *
 * When a constant named $newConstName is defined, its curl_getinfo() value is
 * divided by 1e6 and formatted with six decimals. Otherwise the value of
 * $oldOption is read and cast to string.
 *
 * @param mixed $ch Curl handle to query
 * @param int $oldOption CURLINFO_* option used when the named constant is unavailable
 * @param string $newConstName Name of the preferred CURLINFO_* constant
 * @return string
 */
541 private function getCurlTime( $ch, $oldOption, $newConstName ): string {
542 if ( defined( $newConstName ) ) {
543 return sprintf(
"%.6F", curl_getinfo( $ch, constant( $newConstName ) ) / 1e6 );
545 return (
string)curl_getinfo( $ch, $oldOption );
/**
 * Execute the requests one after another through request objects created by
 * the MediaWiki HttpRequestFactory.
 *
 * @param array[] $reqs Requests as prepared by normalizeRequests()
 * @param array $opts Options read here: 'reqTimeout', 'connTimeout'
 * @return array[] NOTE(review): the return statement is not visible in this excerpt
 */
564 private function runMultiHttp( array $reqs, array $opts = [] ) {
// Options shared by all requests; timeouts fall back to the instance values.
566 'timeout' => $opts[
'reqTimeout'] ?? $this->reqTimeout,
567 'connectTimeout' => $opts[
'connTimeout'] ?? $this->connTimeout,
568 'logger' => $this->logger,
569 'caInfo' => $this->caBundlePath,
571 foreach ( $reqs as &$req ) {
// Per-request options are added to the shared ones by array union.
572 $reqOptions = $httpOptions + [
573 'method' => $req[
'method'],
574 'proxy' => $req[
'proxy'] ?? $this->proxy,
575 'userAgent' => $req[
'headers'][
'user-agent'] ?? $this->userAgent,
576 'postData' => $req[
'body'],
// Encode 'query' per RFC 3986 and append it with '?' or '&', depending on
// whether the request URL already contains a '?'.
// NOTE(review): the initial assignment of $url is not visible in this excerpt.
580 $query = http_build_query( $req[
'query'],
'',
'&', PHP_QUERY_RFC3986 );
581 if ( $query !=
'' ) {
582 $url .= strpos( $req[
'url'],
'?' ) ===
false ?
"?$query" :
"&$query";
585 $httpRequest = MediaWikiServices::getInstance()->getHttpRequestFactory()->create(
586 $url, $reqOptions, __METHOD__ );
587 $httpRequest->setLogger( $this->logger );
// Pass every request header on to the request object.
588 foreach ( $req[
'headers'] as
$header => $value ) {
589 $httpRequest->setHeader(
$header, $value );
591 $sv = $httpRequest->execute()->getStatusValue();
// Collapse each response header's list of values into one ', '-joined string.
593 $respHeaders = array_map(
594 static function ( $v ) {
595 return implode(
', ', $v );
597 $httpRequest->getResponseHeaders() );
// NOTE(review): presumably entries of $req['response']; the opening of this
// array and its remaining keys are not visible in this excerpt.
600 'code' => $httpRequest->getStatus(),
602 'headers' => $respHeaders,
603 'body' => $httpRequest->getContent(),
// On failure the first error's message becomes the response error, and its
// params [0]/[1] are used to fill in the reason.
// NOTE(review): the `else` lines are not visible, so the exact branching
// between params[1] and params[0] cannot be confirmed from here.
607 if ( !$sv->isOK() ) {
608 $svErrors = $sv->getErrors();
609 if ( isset( $svErrors[0] ) ) {
610 $req[
'response'][
'error'] = $svErrors[0][
'message'];
613 if ( isset( $svErrors[0][
'params'][0] ) ) {
614 if ( is_numeric( $svErrors[0][
'params'][0] ) ) {
615 if ( isset( $svErrors[0][
'params'][1] ) ) {
617 $req[
'response'][
'reason'] = $svErrors[0][
'params'][1];
620 $req[
'response'][
'reason'] = $svErrors[0][
'params'][0];
// Mirror the named response fields at numeric indexes 0-4
// (code, reason, headers, body, error).
626 $req[
'response'][0] = $req[
'response'][
'code'];
627 $req[
'response'][1] = $req[
'response'][
'reason'];
628 $req[
'response'][2] = $req[
'response'][
'headers'];
629 $req[
'response'][3] = $req[
'response'][
'body'];
630 $req[
'response'][4] = $req[
'response'][
'error'];
/**
 * Copy the given headers into a map keyed by the lowercased header name.
 *
 * Names that differ only in case end up under the same key, the later entry
 * overwriting the earlier one.
 *
 * @param array $headers Map of header name to value
 * @return array NOTE(review): initialisation and return of $normalized are not
 *   visible in this excerpt
 */
641 private function normalizeHeaders( array $headers ): array {
643 foreach ( $headers as $name => $value ) {
644 $normalized[strtolower( $name )] = $value;
/**
 * Validate the requests and fill in defaults, modifying them in place.
 *
 * @param array[] &$reqs Requests; each needs a method and a URL, given either
 *   as 'method'/'url' keys or at numeric indexes 0 and 1
 * @throws Exception When a request has no 'method' or no 'url'
 */
654 private function normalizeRequests( array &$reqs ) {
655 foreach ( $reqs as &$req ) {
// Positional form: index 0 is the method, index 1 the URL.
663 if ( isset( $req[0] ) ) {
664 $req[
'method'] = $req[0];
667 if ( isset( $req[1] ) ) {
668 $req[
'url'] = $req[1];
671 if ( !isset( $req[
'method'] ) ) {
672 throw new Exception(
"Request has no 'method' field set." );
673 } elseif ( !isset( $req[
'url'] ) ) {
674 throw new Exception(
"Request has no 'url' field set." );
// With a local proxy configured, requests to local URLs are rewritten to go through it.
676 if ( $this->localProxy !==
false && $this->isLocalURL( $req[
'url'] ) ) {
677 $this->useReverseProxy( $req, $this->localProxy );
679 $req[
'query'] ??= [];
// Header names are lowercased; telemetry headers, when telemetry is set, are
// part of the input together with the request's own headers.
// NOTE(review): the surrounding call (how these lists are combined and
// whether other headers are included) is not visible in this excerpt.
680 $req[
'headers'] = $this->normalizeHeaders(
683 $this->telemetry ? $this->telemetry->getRequestHeaders() : [],
684 $req[
'headers'] ?? []
// A request without a body gets a 'content-length' header of 0.
688 if ( !isset( $req[
'body'] ) ) {
690 $req[
'headers'][
'content-length'] = 0;
// Log a copy of the headers in which the value of every header whose name
// matches SENSITIVE_HEADERS is replaced by '[redacted]'.
693 $logHeaders = $req[
'headers'];
694 foreach ( $logHeaders as
$header => $value ) {
695 if ( preg_match( self::SENSITIVE_HEADERS,
$header ) === 1 ) {
696 $logHeaders[
$header] =
'[redacted]';
699 $this->logger->debug(
"HTTP start: {method} {url}",
701 'method' => $req[
'method'],
702 'url' => $req[
'url'],
703 'headers' => $logHeaders,
706 $req[
'flags'] ??= [];
/**
 * Redirect a request to a reverse proxy, modifying the request in place.
 *
 * The original host is stored in a 'Host' header, the scheme, host and port of
 * the parsed URL are replaced by those of the proxy, and the request's 'proxy'
 * entry is set to false.
 *
 * @param array &$req Request to modify
 * @param string $proxy URL of the reverse proxy
 * @throws Exception When the proxy or the request URL cannot be parsed
 */
710 private function useReverseProxy( array &$req, $proxy ) {
// NOTE(review): the calls producing $parsedProxy and $parsedUrl are not
// visible in this excerpt; both are compared with false to detect failure.
712 if ( $parsedProxy ===
false ) {
713 throw new Exception(
"Invalid reverseProxy configured: $proxy" );
716 if ( $parsedUrl ===
false ) {
717 throw new Exception(
"Invalid url specified: {$req['url']}" );
// Keep the original host in a header before it is replaced below.
720 $req[
'headers'][
'Host'] = $parsedUrl[
'host'];
722 $parsedUrl[
'scheme'] = $parsedProxy[
'scheme'];
723 $parsedUrl[
'host'] = $parsedProxy[
'host'];
// Use the proxy's port when it has one; the unset() below presumably sits in
// the `else` branch, which is not visible here.
724 if ( isset( $parsedProxy[
'port'] ) ) {
725 $parsedUrl[
'port'] = $parsedProxy[
'port'];
727 unset( $parsedUrl[
'port'] );
// NOTE(review): rebuilding $req['url'] from $parsedUrl is not visible in this excerpt.
732 $req[
'proxy'] =
false;
/**
 * Check whether the host of a URL, or any parent domain of it, is listed in
 * localVirtualHosts.
 *
 * @param string $url URL to check; only http:// and https:// URLs are matched
 * @return bool NOTE(review): the return statements are not visible in this excerpt
 */
742 private function isLocalURL( $url ) {
// With no local virtual hosts configured there is nothing to match.
743 if ( !$this->localVirtualHosts ) {
// Capture the host: word characters, dots and hyphens after the scheme,
// followed by '/' or ':'.
750 if ( preg_match(
'!^https?://([\w.-]+)[/:].*$!', $url,
$matches ) ) {
753 $domainParts = explode(
'.', $host );
// Start with the last label and prepend one label per iteration, so each
// candidate is a longer suffix of the host name.
755 $domainParts = array_reverse( $domainParts );
758 $countParts = count( $domainParts );
759 for ( $i = 0; $i < $countParts; $i++ ) {
760 $domainPart = $domainParts[$i];
762 $domain = $domainPart;
764 $domain = $domainPart .
'.' . $domain;
// NOTE(review): in_array() is called without the strict flag, so the
// comparison is loose — confirm that this is intended.
767 if ( in_array( $domain, $this->localVirtualHosts ) ) {
/**
 * Compute the wait time passed to curl_multi_select().
 *
 * Takes the smaller of the non-zero connection and request timeouts and
 * multiplies it by TIMEOUT_ACCURACY_FACTOR, with a lower bound of 10e-6.
 *
 * @param array $opts Options; 'connTimeout' and 'reqTimeout' override the instance values
 * @return float|int NOTE(review): the value returned when no timeout is set is
 *   not visible in this excerpt
 */
782 private function getSelectTimeout( $opts ) {
783 $connTimeout = $opts[
'connTimeout'] ?? $this->connTimeout;
784 $reqTimeout = $opts[
'reqTimeout'] ?? $this->reqTimeout;
// array_filter() without a callback drops falsy entries such as 0 and null.
785 $timeouts = array_filter( [ $connTimeout, $reqTimeout ] );
786 if ( count( $timeouts ) === 0 ) {
790 $selectTimeout = min( $timeouts ) * self::TIMEOUT_ACCURACY_FACTOR;
// Never go below 10e-6.
792 if ( $selectTimeout < 10e-6 ) {
793 $selectTimeout = 10e-6;
795 return $selectTimeout;
// NOTE(review): the enclosing function header is not visible in this excerpt.
// Replaces the logger used by this instance with $logger.
804 $this->logger = $logger;
// NOTE(review): the enclosing function header and any guard around this call
// are not visible in this excerpt; presumably this runs on object destruction.
// Closes the curl multi handle held in $this->cmh.
809 curl_multi_close( $this->cmh );