// Pattern for header names whose values must not appear in logs: request
// headers whose name matches it are logged as '[redacted]'.
47 private const SENSITIVE_HEADERS =
'/(^|-|_)(authorization|auth|password|cookie)($|-|_)/';
// User-Agent used as the fallback when a request carries no 'user-agent' header.
74 protected $userAgent =
'wikimedia/multi-http-client v1.1';
// Multiplier applied to the smallest effective timeout to obtain the
// wait interval passed to curl_multi_select().
82 private const TIMEOUT_ACCURACY_FACTOR = 0.1;
109 if ( isset( $options[
'caBundlePath'] ) ) {
110 $this->caBundlePath = $options[
'caBundlePath'];
111 if ( !file_exists( $this->caBundlePath ) ) {
112 throw new InvalidArgumentException(
"Cannot find CA bundle: " . $this->caBundlePath );
116 'connTimeout',
'maxConnTimeout',
'reqTimeout',
'maxReqTimeout',
117 'usePipelining',
'maxConnsPerHost',
'proxy',
'userAgent',
'logger',
118 'localProxy',
'localVirtualHosts',
'headers',
'telemetry'
120 foreach ( $opts as $key ) {
121 if ( isset( $options[$key] ) ) {
122 $this->$key = $options[$key];
125 $this->logger ??=
new NullLogger;
/**
 * Execute a single HTTP request.
 *
 * Convenience wrapper that sends the request through runMulti() as a
 * one-element batch and hands back only that request's response.
 *
 * @param array $req Request description, in the form accepted by runMulti()
 * @param array $opts Options, forwarded unchanged to runMulti()
 * @param string $caller Name of the calling method, forwarded to runMulti()
 * @return array The 'response' entry of the single result
 */
public function run( array $req, array $opts = [], string $caller = __METHOD__ ) {
	$results = $this->runMulti( [ $req ], $opts, $caller );
	$only = $results[0];

	return $only['response'];
}
/**
 * Execute a batch of HTTP requests.
 *
 * The requests are normalized in place, the requested timeouts are compared
 * with the configured maximums, the 'httpVersion' option is translated into
 * one of the CURL_HTTP_VERSION_* constants, and the batch is then handed to
 * either runMultiCurl() or runMultiHttp().
 *
 * @param array[] $reqs Requests; normalized by normalizeRequests()
 * @param array $opts Options; 'connTimeout', 'reqTimeout' and 'httpVersion' are read here
 * @param string $caller Name of the calling method, forwarded to runMultiCurl()
 * @return array[] Whatever runMultiCurl() or runMultiHttp() returns
 */
188 public function runMulti( array $reqs, array $opts = [],
string $caller = __METHOD__ ) {
189 $this->normalizeRequests( $reqs );
// The maximums only apply when they are configured (truthy).
192 if ( $this->maxConnTimeout && $opts[
'connTimeout'] > $this->maxConnTimeout ) {
195 if ( $this->maxReqTimeout && $opts[
'reqTimeout'] > $this->maxReqTimeout ) {
// Translate the requested HTTP version into the matching curl constant;
// a missing option is switched on as null.
200 switch ( $opts[
'httpVersion'] ??
null ) {
202 $opts[
'httpVersion'] = CURL_HTTP_VERSION_1_0;
205 $opts[
'httpVersion'] = CURL_HTTP_VERSION_1_1;
209 $opts[
'httpVersion'] = CURL_HTTP_VERSION_2_0;
213 $opts[
'httpVersion'] = CURL_HTTP_VERSION_3;
216 $opts[
'httpVersion'] = CURL_HTTP_VERSION_NONE;
// curl path: receives the translated options and the caller name.
218 return $this->runMultiCurl( $reqs, $opts, $caller );
220 # TODO: Add handling for httpVersion option
221 return $this->runMultiHttp( $reqs, $opts );
233 return extension_loaded(
'curl' ) && function_exists(
'curl_multi_init' );
/**
 * Execute a batch of requests through the curl multi interface.
 *
 * Every request's curl handle is added to the multi handle, the transfers
 * are driven with curl_multi_exec()/curl_multi_select() until none is
 * active, and the completion messages are collected per handle. Each
 * request's response then gets its 'error' field filled in where needed,
 * the outcome is logged, and the named response fields are mirrored into
 * list positions 0-4.
 *
 * @param array[] $reqs Requests to execute
 * @param array $opts Options; used here to compute the select timeout
 * @param string $caller Name of the calling method
 */
254 private function runMultiCurl( array $reqs, array $opts,
string $caller = __METHOD__ ) {
// Wait interval handed to curl_multi_select() below.
257 $selectTimeout = $this->getSelectTimeout( $opts );
// Attach the curl handle of every request to the multi handle.
261 foreach ( $reqs as $index => &$req ) {
263 curl_multi_add_handle( $chm, $handles[$index] );
// Drive the transfers; repeated while at least one handle is still active.
272 $mrc = curl_multi_exec( $chm, $active );
274 if ( $mrc !== CURLM_OK ) {
275 $error = curl_multi_strerror( $mrc );
276 $this->logger->error(
'curl_multi_exec() failed: {error}', [
278 'exception' =>
new RuntimeException(),
// Wait for activity only while transfers remain; -1 signals a select failure.
285 if ( $active > 0 && curl_multi_select( $chm, $selectTimeout ) === -1 ) {
286 $errno = curl_multi_errno( $chm );
287 $error = curl_multi_strerror( $errno );
288 $this->logger->error(
'curl_multi_select() failed: {error}', [
290 'exception' =>
new RuntimeException(),
294 }
while ( $active > 0 );
// Drain the message queue, remembering each CURLMSG_DONE message under
// the integer id of its handle.
296 $queuedMessages =
null;
298 $info = curl_multi_info_read( $chm, $queuedMessages );
299 if ( $info !==
false && $info[
'msg'] === CURLMSG_DONE ) {
303 $infos[(int)$info[
'handle']] = $info;
305 }
while ( $queuedMessages > 0 );
// Post-process every request: detach its handle, record errors, log the outcome.
308 foreach ( $reqs as $index => &$req ) {
309 $ch = $handles[$index];
310 curl_multi_remove_handle( $chm, $ch );
312 if ( isset( $infos[(
int)$ch] ) ) {
313 $info = $infos[(int)$ch];
// 'result' holds the curl error code of the transfer; 0 means no error.
314 $errno = $info[
'result'];
315 if ( $errno !== 0 ) {
316 $req[
'response'][
'error'] =
"(curl error: $errno)";
// Append the human-readable message when curl_strerror() is available.
317 if ( function_exists(
'curl_strerror' ) ) {
318 $req[
'response'][
'error'] .=
" " . curl_strerror( $errno );
320 $this->logger->error(
'Error fetching URL "{url}": {error}', [
321 'url' => $req[
'url'],
322 'error' => $req[
'response'][
'error'],
323 'exception' =>
new RuntimeException(),
// Debug entry with the status code, download size and curl timing figures.
327 $this->logger->debug(
328 "HTTP complete: {method} {url} code={response_code} size={size} " .
329 "total={total_time} connect={connect_time}",
331 'method' => $req[
'method'],
332 'url' => $req[
'url'],
333 'response_code' => $req[
'response'][
'code'],
334 'size' => curl_getinfo( $ch, CURLINFO_SIZE_DOWNLOAD ),
335 'total_time' => $this->getCurlTime(
336 $ch, CURLINFO_TOTAL_TIME,
'CURLINFO_TOTAL_TIME_T'
338 'connect_time' => $this->getCurlTime(
339 $ch, CURLINFO_CONNECT_TIME,
'CURLINFO_CONNECT_TIME_T'
// NOTE(review): presumably the branch for handles without a recorded
// completion message — confirm against the full method.
345 $req[
'response'][
'error'] =
"(curl error: no status set)";
// Mirror the named response fields into list positions 0-4.
349 $req[
'response'][0] = $req[
'response'][
'code'];
350 $req[
'response'][1] = $req[
'response'][
'reason'];
351 $req[
'response'][2] = $req[
'response'][
'headers'];
352 $req[
'response'][3] = $req[
'response'][
'body'];
353 $req[
'response'][4] = $req[
'response'][
'error'];
// Close and forget the stream stored under '_closeHandle', if there is one.
355 if ( isset( $req[
'_closeHandle'] ) ) {
356 fclose( $req[
'_closeHandle'] );
357 unset( $req[
'_closeHandle'] );
380 curl_setopt( $ch, CURLOPT_PROXY, $req[
'proxy'] ?? $this->proxy );
381 curl_setopt( $ch, CURLOPT_CONNECTTIMEOUT_MS, intval( $opts[
'connTimeout'] * 1e3 ) );
382 curl_setopt( $ch, CURLOPT_TIMEOUT_MS, intval( $opts[
'reqTimeout'] * 1e3 ) );
383 curl_setopt( $ch, CURLOPT_FOLLOWLOCATION, 1 );
384 curl_setopt( $ch, CURLOPT_MAXREDIRS, 4 );
385 curl_setopt( $ch, CURLOPT_HEADER, 0 );
386 if ( $this->caBundlePath !==
null ) {
387 curl_setopt( $ch, CURLOPT_SSL_VERIFYPEER,
true );
388 curl_setopt( $ch, CURLOPT_CAINFO, $this->caBundlePath );
390 curl_setopt( $ch, CURLOPT_RETURNTRANSFER, 1 );
393 $query = http_build_query( $req[
'query'],
'',
'&', PHP_QUERY_RFC3986 );
394 if ( $query !=
'' ) {
395 $url .= !str_contains( $req[
'url'],
'?' ) ?
"?$query" :
"&$query";
397 curl_setopt( $ch, CURLOPT_URL,
$url );
398 curl_setopt( $ch, CURLOPT_CUSTOMREQUEST, $req[
'method'] );
399 curl_setopt( $ch, CURLOPT_NOBODY, ( $req[
'method'] ===
'HEAD' ) );
400 curl_setopt( $ch, CURLOPT_HTTP_VERSION, $opts[
'httpVersion'] ?? CURL_HTTP_VERSION_NONE );
402 if ( $req[
'method'] ===
'PUT' ) {
403 curl_setopt( $ch, CURLOPT_PUT, 1 );
405 if ( is_resource( $req[
'body'] ) ) {
406 curl_setopt( $ch, CURLOPT_INFILE, $req[
'body'] );
407 if ( isset( $req[
'headers'][
'content-length'] ) ) {
408 curl_setopt( $ch, CURLOPT_INFILESIZE, $req[
'headers'][
'content-length'] );
409 } elseif ( isset( $req[
'headers'][
'transfer-encoding'] ) &&
410 $req[
'headers'][
'transfer-encoding'] ===
'chunks'
412 curl_setopt( $ch, CURLOPT_UPLOAD,
true );
414 throw new InvalidArgumentException(
"Missing 'Content-Length' or 'Transfer-Encoding' header." );
416 } elseif ( $req[
'body'] !==
'' ) {
417 $fp = fopen(
"php://temp",
"wb+" );
418 fwrite( $fp, $req[
'body'], strlen( $req[
'body'] ) );
420 curl_setopt( $ch, CURLOPT_INFILE, $fp );
421 curl_setopt( $ch, CURLOPT_INFILESIZE, strlen( $req[
'body'] ) );
422 $req[
'_closeHandle'] = $fp;
424 curl_setopt( $ch, CURLOPT_INFILESIZE, 0 );
426 curl_setopt( $ch, CURLOPT_READFUNCTION,
427 static function ( $ch, $fd, $length ) {
428 return (
string)fread( $fd, $length );
431 } elseif ( $req[
'method'] ===
'POST' ) {
432 curl_setopt( $ch, CURLOPT_POST, 1 );
433 curl_setopt( $ch, CURLOPT_POSTFIELDS, $req[
'body'] );
436 if ( is_resource( $req[
'body'] ) || $req[
'body'] !==
'' ) {
437 throw new InvalidArgumentException(
"HTTP body specified for a non PUT/POST request." );
439 $req[
'headers'][
'content-length'] = 0;
442 if ( !isset( $req[
'headers'][
'user-agent'] ) ) {
447 foreach ( $req[
'headers'] as $name => $value ) {
448 if ( str_contains( $name,
':' ) ) {
449 throw new InvalidArgumentException(
"Header name must not contain colon-space." );
451 $headers[] = $name .
': ' . trim( $value );
453 curl_setopt( $ch, CURLOPT_HTTPHEADER,
$headers );
455 curl_setopt( $ch, CURLOPT_HEADERFUNCTION,
456 static function ( $ch, $header ) use ( &$req ) {
457 if ( !empty( $req[
'flags'][
'relayResponseHeaders'] ) && trim( $header ) !==
'' ) {
460 $length = strlen( $header );
462 if ( preg_match(
"/^(HTTP\/(?:1\.[01]|2|3)) (\d{3}) (.*)/", $header,
$matches ) ) {
463 $req[
'response'][
'code'] = (int)
$matches[2];
464 $req[
'response'][
'reason'] = trim(
$matches[3] );
467 $req[
'response'][
'headers'] = [];
470 if ( !str_contains( $header,
":" ) ) {
473 [ $name, $value ] = explode(
":", $header, 2 );
474 $name = strtolower( $name );
475 $value = trim( $value );
476 if ( isset( $req[
'response'][
'headers'][$name] ) ) {
477 $req[
'response'][
'headers'][$name] .=
', ' . $value;
479 $req[
'response'][
'headers'][$name] = $value;
486 $hasOutputStream = isset( $req[
'stream'] );
487 curl_setopt( $ch, CURLOPT_WRITEFUNCTION,
488 static function ( $ch, $data ) use ( &$req, $hasOutputStream ) {
489 if ( $hasOutputStream ) {
490 return fwrite( $req[
'stream'], $data );
492 $req[
'response'][
'body'] .= $data;
494 return strlen( $data );
510 $cmh = curl_multi_init();
513 curl_multi_setopt(
$cmh, CURLMOPT_MAXCONNECTS, (
int)$this->maxConnsPerHost );
517 $curlVersion = curl_version()[
'version'];
520 if ( version_compare( $curlVersion,
'7.30.0',
'>=' ) ) {
523 curl_multi_setopt( $this->cmh, CURLMOPT_MAX_HOST_CONNECTIONS, (
int)$maxHostConns );
526 if ( $opts[
'usePipelining'] ?? $this->usePipelining ) {
527 if ( version_compare( $curlVersion,
'7.43',
'<' ) ) {
530 } elseif ( version_compare( $curlVersion,
'7.62',
'<' ) ) {
532 $pipelining = CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX;
535 $pipelining = CURLPIPE_MULTIPLEX;
539 @curl_multi_setopt( $this->cmh, CURLMOPT_PIPELINING, $pipelining );
/**
 * Read a timing value from a curl handle as a string.
 *
 * If the constant named by $newConstName is defined, the value of that
 * curl_getinfo() option is divided by 1e6 and formatted with six decimal
 * places. Otherwise the value of $oldOption is returned, cast to string.
 *
 * @param mixed $ch curl handle to query
 * @param int $oldOption CURLINFO_* option used as the fallback
 * @param string $newConstName Name of the preferred CURLINFO_*_T constant
 * @return string
 */
private function getCurlTime( $ch, $oldOption, $newConstName ): string {
	if ( !defined( $newConstName ) ) {
		return (string)curl_getinfo( $ch, $oldOption );
	}

	$rawTime = curl_getinfo( $ch, constant( $newConstName ) );

	return sprintf( "%.6F", $rawTime / 1e6 );
}
/**
 * Execute a batch of requests one after another through the
 * HttpRequestFactory service.
 *
 * For every request an HTTP request object is created, the request headers
 * are applied, the request is executed and its status, headers and body are
 * collected. When the resulting status is not OK, the first error of the
 * status supplies the 'error' and 'reason' response fields. The named
 * response fields are finally mirrored into list positions 0-4.
 *
 * @param array[] $reqs Requests to execute
 * @param array $opts Options; 'reqTimeout' and 'connTimeout' override the instance defaults
 */
578 private function runMultiHttp( array $reqs, array $opts = [] ) {
// Options shared by all requests of the batch.
580 'timeout' => $opts[
'reqTimeout'] ?? $this->reqTimeout,
581 'connectTimeout' => $opts[
'connTimeout'] ?? $this->connTimeout,
582 'logger' => $this->logger,
583 'caInfo' => $this->caBundlePath,
585 foreach ( $reqs as &$req ) {
// The "+" union keeps the shared options when a key exists on both sides.
586 $reqOptions = $httpOptions + [
587 'method' => $req[
'method'],
588 'proxy' => $req[
'proxy'] ?? $this->proxy,
589 'userAgent' => $req[
'headers'][
'user-agent'] ?? $this->userAgent,
590 'postData' => $req[
'body'],
// Append the query parameters, using "&" when the URL already contains a "?".
594 $query = http_build_query( $req[
'query'],
'',
'&', PHP_QUERY_RFC3986 );
595 if ( $query !=
'' ) {
596 $url .= !str_contains( $req[
'url'],
'?' ) ?
"?$query" :
"&$query";
599 $httpRequest = MediaWikiServices::getInstance()->getHttpRequestFactory()->create(
600 $url, $reqOptions, __METHOD__ );
601 $httpRequest->setLogger( $this->logger );
602 foreach ( $req[
'headers'] as $header => $value ) {
603 $httpRequest->setHeader( $header, $value );
605 $sv = $httpRequest->execute()->getStatusValue();
// Each response header's list of values is joined into one comma-separated string.
607 $respHeaders = array_map(
608 static fn ( $v ) => implode(
', ', $v ),
609 $httpRequest->getResponseHeaders() );
612 'code' => $httpRequest->getStatus(),
614 'headers' => $respHeaders,
615 'body' => $httpRequest->getContent(),
// On failure, take the error details from the first error of the status.
619 if ( !$sv->isOK() ) {
620 $svErrors = $sv->getErrors();
621 if ( isset( $svErrors[0] ) ) {
622 $req[
'response'][
'error'] = $svErrors[0][
'message'];
// The reason comes from the error parameters: the second one when the
// first is numeric, otherwise the first.
625 if ( isset( $svErrors[0][
'params'][0] ) ) {
626 if ( is_numeric( $svErrors[0][
'params'][0] ) ) {
627 if ( isset( $svErrors[0][
'params'][1] ) ) {
629 $req[
'response'][
'reason'] = $svErrors[0][
'params'][1];
632 $req[
'response'][
'reason'] = $svErrors[0][
'params'][0];
// Mirror the named response fields into list positions 0-4.
638 $req[
'response'][0] = $req[
'response'][
'code'];
639 $req[
'response'][1] = $req[
'response'][
'reason'];
640 $req[
'response'][2] = $req[
'response'][
'headers'];
641 $req[
'response'][3] = $req[
'response'][
'body'];
642 $req[
'response'][4] = $req[
'response'][
'error'];
/**
 * Lower-case the names of a header map.
 *
 * When two names differ only by case, the value that appears later in
 * $headers is the one that is kept.
 *
 * @param array $headers Header values keyed by header name
 * @return array The same values keyed by lower-cased header name
 */
private function normalizeHeaders( array $headers ): array {
	return array_change_key_case( $headers, CASE_LOWER );
}
/**
 * Bring every request of a batch into canonical form, in place.
 *
 * Positional entries 0 and 1 are copied to 'method' and 'url', both of
 * which are then required. Local URLs are redirected through the configured
 * local proxy, 'query' and 'flags' default to empty arrays, header names are
 * normalized, and the start of the request is written to the debug log with
 * sensitive header values redacted.
 *
 * @param array[] &$reqs Requests to normalize
 * @throws InvalidArgumentException If a request has no 'method' or no 'url'
 */
666 private function normalizeRequests( array &$reqs ) {
667 foreach ( $reqs as &$req ) {
// Accept the positional form: index 0 is the method, index 1 the URL.
675 if ( isset( $req[0] ) ) {
676 $req[
'method'] = $req[0];
679 if ( isset( $req[1] ) ) {
680 $req[
'url'] = $req[1];
683 if ( !isset( $req[
'method'] ) ) {
684 throw new InvalidArgumentException(
"Request has no 'method' field set." );
685 } elseif ( !isset( $req[
'url'] ) ) {
686 throw new InvalidArgumentException(
"Request has no 'url' field set." );
// With a local proxy configured, requests to local URLs are rewritten to go through it.
688 if ( $this->localProxy !==
false && $this->isLocalURL( $req[
'url'] ) ) {
689 $this->useReverseProxy( $req, $this->localProxy );
691 $req[
'query'] ??= [];
// Header names are normalized; the inputs are the telemetry request headers
// (when telemetry is set) and the request's own headers.
692 $req[
'headers'] = $this->normalizeHeaders(
695 $this->telemetry ? $this->telemetry->getRequestHeaders() : [],
696 $req[
'headers'] ?? []
700 if ( !isset( $req[
'body'] ) ) {
702 $req[
'headers'][
'content-length'] = 0;
// Log a copy of the headers in which sensitive values are replaced.
705 $logHeaders = $req[
'headers'];
706 foreach ( $logHeaders as $header => $value ) {
707 if ( preg_match( self::SENSITIVE_HEADERS, $header ) === 1 ) {
708 $logHeaders[$header] =
'[redacted]';
711 $this->logger->debug(
"HTTP start: {method} {url}",
713 'method' => $req[
'method'],
714 'url' => $req[
'url'],
715 'headers' => $logHeaders,
718 $req[
'flags'] ??= [];
/**
 * Rewrite a request so that it is sent to a reverse proxy.
 *
 * The original host is moved into the 'Host' header, the scheme, host and
 * port of the request URL are replaced with those of the proxy, and the
 * request's 'proxy' entry is set to false.
 *
 * @param array &$req Request to rewrite; 'url', 'headers' and 'proxy' are modified
 * @param string $proxy URL of the reverse proxy
 * @throws InvalidArgumentException If either URL cannot be parsed by parse_url()
 */
722 private function useReverseProxy( array &$req,
string $proxy ) {
723 $parsedProxy = parse_url( $proxy );
724 if ( $parsedProxy ===
false ) {
725 throw new InvalidArgumentException(
"Invalid reverseProxy configured: $proxy" );
727 $parsedUrl = parse_url( $req[
'url'] );
728 if ( $parsedUrl ===
false ) {
729 throw new InvalidArgumentException(
"Invalid url specified: {$req['url']}" );
// Keep the originally requested host in the Host header.
733 $req[
'headers'][
'Host'] = $parsedUrl[
'host'];
// Point the URL at the proxy: its scheme, host and port replace the original ones.
736 $parsedUrl[
'scheme'] = $parsedProxy[
'scheme'];
738 $parsedUrl[
'host'] = $parsedProxy[
'host'];
739 if ( isset( $parsedProxy[
'port'] ) ) {
740 $parsedUrl[
'port'] = $parsedProxy[
'port'];
742 unset( $parsedUrl[
'port'] );
744 $req[
'url'] = self::assembleUrl( $parsedUrl );
// false rather than null: the handle setup falls back to $this->proxy via "??",
// which only triggers on null.
747 $req[
'proxy'] =
false;
/**
 * Build a URL string from its components.
 *
 * The components are read from the keys 'scheme', 'user', 'pass', 'host',
 * 'port', 'path', 'query' and 'fragment'; every one of them is optional.
 * User, password and port are only considered when a host is present.
 *
 * @param array $urlParts URL components, keyed as listed above
 * @return string
 */
760 private static function assembleUrl( array $urlParts ): string {
761 $result = isset( $urlParts[
'scheme'] ) ? $urlParts[
'scheme'] .
'://' :
'';
// Authority part: optional user[:pass], then the host and an optional port.
763 if ( isset( $urlParts[
'host'] ) ) {
764 if ( isset( $urlParts[
'user'] ) ) {
765 $result .= $urlParts[
'user'];
766 if ( isset( $urlParts[
'pass'] ) ) {
767 $result .=
':' . $urlParts[
'pass'];
772 $result .= $urlParts[
'host'];
774 if ( isset( $urlParts[
'port'] ) ) {
775 $result .=
':' . $urlParts[
'port'];
779 if ( isset( $urlParts[
'path'] ) ) {
780 $result .= $urlParts[
'path'];
// An empty query string is left out entirely, including the "?".
783 if ( isset( $urlParts[
'query'] ) && $urlParts[
'query'] !==
'' ) {
784 $result .=
'?' . $urlParts[
'query'];
787 if ( isset( $urlParts[
'fragment'] ) ) {
788 $result .=
'#' . $urlParts[
'fragment'];
/**
 * Check a URL's host against the configured local virtual hosts.
 *
 * The host is split into its dot-separated labels, and each domain suffix,
 * from the last label alone up to the full host, is looked up in
 * $this->localVirtualHosts.
 *
 * NOTE(review): the return statements are not part of this excerpt;
 * presumably a match yields true and everything else false — confirm.
 *
 * @param string $url URL to check; only http and https URLs match the host pattern
 */
801 private function isLocalURL(
$url ) {
802 if ( !$this->localVirtualHosts ) {
// The first capture group is the host name.
809 if ( preg_match(
'!^https?://([\w.-]+)[/:].*$!',
$url,
$matches ) ) {
812 $domainParts = explode(
'.', $host );
// Reversed so that the loop starts at the last label of the host.
814 $domainParts = array_reverse( $domainParts );
817 $countParts = count( $domainParts );
818 for ( $i = 0; $i < $countParts; $i++ ) {
819 $domainPart = $domainParts[$i];
821 $domain = $domainPart;
// Prepend the next label, growing the candidate domain by one level.
823 $domain = $domainPart .
'.' . $domain;
826 if ( in_array( $domain, $this->localVirtualHosts ) ) {
/**
 * Compute the wait interval to pass to curl_multi_select().
 *
 * The interval is the smaller of the effective connect and request
 * timeouts multiplied by TIMEOUT_ACCURACY_FACTOR, and never less than 10e-6.
 *
 * NOTE(review): the value returned when neither timeout is set is not part
 * of this excerpt — confirm against the full method.
 *
 * @param array $opts Options; 'connTimeout' and 'reqTimeout' override the instance defaults
 */
841 private function getSelectTimeout( $opts ) {
842 $connTimeout = $opts[
'connTimeout'] ?? $this->connTimeout;
843 $reqTimeout = $opts[
'reqTimeout'] ?? $this->reqTimeout;
// array_filter() without a callback drops falsy entries, i.e. unset or zero timeouts.
844 $timeouts = array_filter( [ $connTimeout, $reqTimeout ] );
845 if ( count( $timeouts ) === 0 ) {
849 $selectTimeout = min( $timeouts ) * self::TIMEOUT_ACCURACY_FACTOR;
// Enforce a lower bound on the interval.
851 if ( $selectTimeout < 10e-6 ) {
852 $selectTimeout = 10e-6;
854 return $selectTimeout;
/**
 * Replace the logger this client writes its log messages to.
 *
 * @param LoggerInterface $logger Logger to use from now on
 */
public function setLogger( LoggerInterface $logger ): void {
	$this->logger = $logger;
}
866 curl_multi_close( $this->cmh );