/**
 * Pattern for header names whose values are replaced with '[redacted]' in the
 * "HTTP start" debug log (e.g. "authorization", "x-auth-token", "cookie").
 * The pattern is case-sensitive; header names are lowercased by
 * normalizeHeaders() before they are matched against it.
 */
47 private const SENSITIVE_HEADERS =
'/(^|-|_)(authorization|auth|password|cookie)($|-|_)/';
/** Fallback User-Agent used when a request carries no 'user-agent' header. */
74 protected $userAgent =
'wikimedia/multi-http-client v1.1';
/**
 * Fraction of the smallest configured timeout used as the curl_multi_select()
 * wait interval (see getSelectTimeout()), so timeouts are noticed with
 * roughly 10% accuracy.
 */
82 private const TIMEOUT_ACCURACY_FACTOR = 0.1;
// Option handling; the enclosing method header is outside this excerpt.
// A configured CA bundle must exist on disk, otherwise construction fails early.
109 if ( isset( $options[
'caBundlePath'] ) ) {
110 $this->caBundlePath = $options[
'caBundlePath'];
111 if ( !file_exists( $this->caBundlePath ) ) {
112 throw new InvalidArgumentException(
"Cannot find CA bundle: " . $this->caBundlePath );
116 'connTimeout',
'maxConnTimeout',
'reqTimeout',
'maxReqTimeout',
117 'usePipelining',
'maxConnsPerHost',
'proxy',
'userAgent',
'logger',
118 'localProxy',
'localVirtualHosts',
'headers',
'telemetry'
// Copy each whitelisted option key that is set onto the property of the same name.
120 foreach ( $opts as $key ) {
121 if ( isset( $options[$key] ) ) {
122 $this->$key = $options[$key];
// Fall back to a no-op logger so $this->logger can be used unconditionally.
125 $this->logger ??=
new NullLogger;
/**
 * Execute a single HTTP request.
 *
 * Convenience wrapper that runs a one-element batch through runMulti().
 *
 * @param array $req Request descriptor, in the same format as a runMulti() entry
 * @param array $opts Per-call options, forwarded to runMulti()
 * @param string $caller Name of the calling method, forwarded to runMulti()
 * @return array The 'response' element of the completed request
 */
152 public function run( array $req, array $opts = [],
string $caller = __METHOD__ ) {
153 return $this->
runMulti( [ $req ], $opts, $caller )[0][
'response'];
/**
 * Execute a batch of HTTP requests.
 *
 * Requests are normalized first. The requested timeouts are then compared
 * against maxConnTimeout/maxReqTimeout (the branch bodies, presumably
 * clamping, are not visible in this excerpt). 'httpVersion' is mapped to a
 * CURL_HTTP_VERSION_* constant, and the batch is dispatched either to cURL or
 * to the fallback HTTP client. The condition that chooses between them is on
 * a line not visible here.
 *
 * @param array $reqs List of request descriptors (see normalizeRequests())
 * @param array $opts Per-call options (e.g. connTimeout, reqTimeout, httpVersion)
 * @param string $caller Name of the calling method, passed to the cURL path
 * @return array Presumably the request list with a 'response' entry filled in
 *   for each request (run() reads [0]['response']) — confirm.
 */
188 public function runMulti( array $reqs, array $opts = [],
string $caller = __METHOD__ ) {
189 $this->normalizeRequests( $reqs );
192 if ( $this->maxConnTimeout && $opts[
'connTimeout'] > $this->maxConnTimeout ) {
195 if ( $this->maxReqTimeout && $opts[
'reqTimeout'] > $this->maxReqTimeout ) {
// Translate the requested HTTP version into a cURL constant; the case labels are outside this excerpt.
200 switch ( $opts[
'httpVersion'] ??
null ) {
202 $opts[
'httpVersion'] = CURL_HTTP_VERSION_1_0;
205 $opts[
'httpVersion'] = CURL_HTTP_VERSION_1_1;
209 $opts[
'httpVersion'] = CURL_HTTP_VERSION_2_0;
212 $opts[
'httpVersion'] = CURL_HTTP_VERSION_NONE;
214 return $this->runMultiCurl( $reqs, $opts, $caller );
// The fallback path ignores 'httpVersion' and does not receive $caller.
216 # TODO: Add handling for httpVersion option
217 return $this->runMultiHttp( $reqs, $opts );
// cURL is usable only when the extension is loaded and exposes the multi-handle API
// (the enclosing method header is outside this excerpt).
229 return extension_loaded(
'curl' ) && function_exists(
'curl_multi_init' );
/**
 * Execute a batch of normalized requests in parallel using a cURL multi handle.
 *
 * Each request gets its own cURL handle, registered with the multi handle.
 * The transfers are driven with curl_multi_exec()/curl_multi_select(), and
 * completion messages are collected with curl_multi_info_read(). Each
 * request's 'response' is then finalized, including the numeric keys 0-4.
 *
 * @param array $reqs Normalized requests (see normalizeRequests())
 * @param array $opts Per-call options (timeouts, httpVersion, ...)
 * @param string $caller Name of the calling method (its use is on lines not visible here)
 * @return array Presumably the requests with 'response' filled in — the return
 *   statement is outside this excerpt.
 */
250 private function runMultiCurl( array $reqs, array $opts,
string $caller = __METHOD__ ) {
// Poll interval for curl_multi_select(), derived from the configured timeouts.
253 $selectTimeout = $this->getSelectTimeout( $opts );
// Register each request's cURL handle with the multi handle $chm.
257 foreach ( $reqs as $index => &$req ) {
259 curl_multi_add_handle( $chm, $handles[$index] );
// Drive all transfers; multi-handle level failures are logged.
268 $mrc = curl_multi_exec( $chm, $active );
270 if ( $mrc !== CURLM_OK ) {
271 $error = curl_multi_strerror( $mrc );
272 $this->logger->error(
'curl_multi_exec() failed: {error}', [
274 'exception' =>
new RuntimeException(),
// While transfers are still active, wait for socket activity or until $selectTimeout
// elapses; a return value of -1 signals a select failure.
281 if ( $active > 0 && curl_multi_select( $chm, $selectTimeout ) === -1 ) {
282 $errno = curl_multi_errno( $chm );
283 $error = curl_multi_strerror( $errno );
284 $this->logger->error(
'curl_multi_select() failed: {error}', [
286 'exception' =>
new RuntimeException(),
290 }
while ( $active > 0 );
// Drain the completion queue. CURLMSG_DONE messages are keyed by handle id
// so they can be matched back to their requests below.
292 $queuedMessages =
null;
294 $info = curl_multi_info_read( $chm, $queuedMessages );
295 if ( $info !==
false && $info[
'msg'] === CURLMSG_DONE ) {
299 $infos[(int)$info[
'handle']] = $info;
301 }
while ( $queuedMessages > 0 );
// Finalize each request and detach its handle from the multi handle.
304 foreach ( $reqs as $index => &$req ) {
305 $ch = $handles[$index];
306 curl_multi_remove_handle( $chm, $ch );
308 if ( isset( $infos[(
int)$ch] ) ) {
309 $info = $infos[(int)$ch];
310 $errno = $info[
'result'];
// A non-zero 'result' is a cURL error code for this transfer.
311 if ( $errno !== 0 ) {
312 $req[
'response'][
'error'] =
"(curl error: $errno)";
313 if ( function_exists(
'curl_strerror' ) ) {
314 $req[
'response'][
'error'] .=
" " . curl_strerror( $errno );
316 $this->logger->error(
'Error fetching URL "{url}": {error}', [
317 'url' => $req[
'url'],
318 'error' => $req[
'response'][
'error'],
319 'exception' =>
new RuntimeException(),
// Per-request timing/size summary; times come from getCurlTime().
323 $this->logger->debug(
324 "HTTP complete: {method} {url} code={response_code} size={size} " .
325 "total={total_time} connect={connect_time}",
327 'method' => $req[
'method'],
328 'url' => $req[
'url'],
329 'response_code' => $req[
'response'][
'code'],
330 'size' => curl_getinfo( $ch, CURLINFO_SIZE_DOWNLOAD ),
331 'total_time' => $this->getCurlTime(
332 $ch, CURLINFO_TOTAL_TIME,
'CURLINFO_TOTAL_TIME_T'
334 'connect_time' => $this->getCurlTime(
335 $ch, CURLINFO_CONNECT_TIME,
'CURLINFO_CONNECT_TIME_T'
// Presumably the branch where no CURLMSG_DONE message was recorded for this handle — confirm.
341 $req[
'response'][
'error'] =
"(curl error: no status set)";
// Mirror the named response fields into numeric keys 0-4 so callers can
// destructure the response as a list.
345 $req[
'response'][0] = $req[
'response'][
'code'];
346 $req[
'response'][1] = $req[
'response'][
'reason'];
347 $req[
'response'][2] = $req[
'response'][
'headers'];
348 $req[
'response'][3] = $req[
'response'][
'body'];
349 $req[
'response'][4] = $req[
'response'][
'error'];
// Close the temporary php://temp stream created for string PUT bodies.
352 if ( isset( $req[
'_closeHandle'] ) ) {
353 fclose( $req[
'_closeHandle'] );
354 unset( $req[
'_closeHandle'] );
// Configure the cURL handle $ch for a single request $req. The enclosing method
// header and the creation of $ch are outside this excerpt.
// A per-request proxy overrides the client default. Timeouts are given in
// seconds and converted to milliseconds here.
377 curl_setopt( $ch, CURLOPT_PROXY, $req[
'proxy'] ?? $this->proxy );
378 curl_setopt( $ch, CURLOPT_CONNECTTIMEOUT_MS, intval( $opts[
'connTimeout'] * 1e3 ) );
379 curl_setopt( $ch, CURLOPT_TIMEOUT_MS, intval( $opts[
'reqTimeout'] * 1e3 ) );
// Follow at most 4 redirects. Response headers are delivered through
// CURLOPT_HEADERFUNCTION, not mixed into the body.
380 curl_setopt( $ch, CURLOPT_FOLLOWLOCATION, 1 );
381 curl_setopt( $ch, CURLOPT_MAXREDIRS, 4 );
382 curl_setopt( $ch, CURLOPT_HEADER, 0 );
// Verify TLS peers against the configured CA bundle when one was supplied.
383 if ( $this->caBundlePath !==
null ) {
384 curl_setopt( $ch, CURLOPT_SSL_VERIFYPEER,
true );
385 curl_setopt( $ch, CURLOPT_CAINFO, $this->caBundlePath );
387 curl_setopt( $ch, CURLOPT_RETURNTRANSFER, 1 );
// Append the RFC 3986-encoded 'query' array, using '?' or '&' depending on
// whether the URL already contains a query string.
390 $query = http_build_query( $req[
'query'],
'',
'&', PHP_QUERY_RFC3986 );
391 if ( $query !=
'' ) {
392 $url .= !str_contains( $req[
'url'],
'?' ) ?
"?$query" :
"&$query";
394 curl_setopt( $ch, CURLOPT_URL,
$url );
395 curl_setopt( $ch, CURLOPT_CUSTOMREQUEST, $req[
'method'] );
// HEAD requests are told not to expect a response body.
396 curl_setopt( $ch, CURLOPT_NOBODY, ( $req[
'method'] ===
'HEAD' ) );
397 curl_setopt( $ch, CURLOPT_HTTP_VERSION, $opts[
'httpVersion'] ?? CURL_HTTP_VERSION_NONE );
// PUT: the body is either a caller-supplied stream or a string copied into a temp stream.
// NOTE(review): CURLOPT_PUT is deprecated in libcurl in favour of CURLOPT_UPLOAD — confirm before changing.
399 if ( $req[
'method'] ===
'PUT' ) {
400 curl_setopt( $ch, CURLOPT_PUT, 1 );
402 if ( is_resource( $req[
'body'] ) ) {
403 curl_setopt( $ch, CURLOPT_INFILE, $req[
'body'] );
404 if ( isset( $req[
'headers'][
'content-length'] ) ) {
405 curl_setopt( $ch, CURLOPT_INFILESIZE, $req[
'headers'][
'content-length'] );
// NOTE(review): the standard Transfer-Encoding token is 'chunked' (RFC 9112), but the
// comparison below uses 'chunks'. A chunked stream upload therefore likely falls
// through to the exception — confirm and fix.
406 } elseif ( isset( $req[
'headers'][
'transfer-encoding'] ) &&
407 $req[
'headers'][
'transfer-encoding'] ===
'chunks'
409 curl_setopt( $ch, CURLOPT_UPLOAD,
true );
411 throw new InvalidArgumentException(
"Missing 'Content-Length' or 'Transfer-Encoding' header." );
413 } elseif ( $req[
'body'] !==
'' ) {
// Copy the string body into a temporary stream. The stream is closed after the
// transfer via the '_closeHandle' key.
414 $fp = fopen(
"php://temp",
"wb+" );
415 fwrite( $fp, $req[
'body'], strlen( $req[
'body'] ) );
417 curl_setopt( $ch, CURLOPT_INFILE, $fp );
418 curl_setopt( $ch, CURLOPT_INFILESIZE, strlen( $req[
'body'] ) );
419 $req[
'_closeHandle'] = $fp;
421 curl_setopt( $ch, CURLOPT_INFILESIZE, 0 );
// Read upload data from the INFILE stream. The (string) cast turns a false
// returned by fread() into ''.
423 curl_setopt( $ch, CURLOPT_READFUNCTION,
424 static function ( $ch, $fd, $length ) {
425 return (
string)fread( $fd, $length );
428 } elseif ( $req[
'method'] ===
'POST' ) {
429 curl_setopt( $ch, CURLOPT_POST, 1 );
430 curl_setopt( $ch, CURLOPT_POSTFIELDS, $req[
'body'] );
// Presumably the branch for methods other than PUT/POST: these must not carry a body.
433 if ( is_resource( $req[
'body'] ) || $req[
'body'] !==
'' ) {
434 throw new InvalidArgumentException(
"HTTP body specified for a non PUT/POST request." );
436 $req[
'headers'][
'content-length'] = 0;
439 if ( !isset( $req[
'headers'][
'user-agent'] ) ) {
// Serialize headers as "name: value" lines.
// NOTE(review): the check rejects any ':' in the name, but the message says "colon-space".
444 foreach ( $req[
'headers'] as $name => $value ) {
445 if ( str_contains( $name,
':' ) ) {
446 throw new InvalidArgumentException(
"Header name must not contain colon-space." );
448 $headers[] = $name .
': ' . trim( $value );
450 curl_setopt( $ch, CURLOPT_HTTPHEADER,
$headers );
// Parse response headers as they arrive. A status line sets code/reason (and the
// header map appears to be reset there). Other lines are stored under lowercased
// names, and repeated headers are comma-joined.
452 curl_setopt( $ch, CURLOPT_HEADERFUNCTION,
453 static function ( $ch, $header ) use ( &$req ) {
454 if ( !empty( $req[
'flags'][
'relayResponseHeaders'] ) && trim( $header ) !==
'' ) {
457 $length = strlen( $header );
459 if ( preg_match(
"/^(HTTP\/(?:1\.[01]|2)) (\d{3}) (.*)/", $header,
$matches ) ) {
460 $req[
'response'][
'code'] = (int)
$matches[2];
461 $req[
'response'][
'reason'] = trim(
$matches[3] );
464 $req[
'response'][
'headers'] = [];
467 if ( !str_contains( $header,
":" ) ) {
470 [ $name, $value ] = explode(
":", $header, 2 );
471 $name = strtolower( $name );
472 $value = trim( $value );
473 if ( isset( $req[
'response'][
'headers'][$name] ) ) {
474 $req[
'response'][
'headers'][$name] .=
', ' . $value;
476 $req[
'response'][
'headers'][$name] = $value;
// The response body is written to $req['stream'] when one is provided; otherwise it
// accumulates in $req['response']['body'].
483 $hasOutputStream = isset( $req[
'stream'] );
484 curl_setopt( $ch, CURLOPT_WRITEFUNCTION,
485 static function ( $ch, $data ) use ( &$req, $hasOutputStream ) {
486 if ( $hasOutputStream ) {
488 return fwrite( $req[
'stream'], $data );
491 $req[
'response'][
'body'] .= $data;
493 return strlen( $data );
// Shared cURL multi-handle setup (the enclosing method header and the guard around
// initialization are outside this excerpt).
509 $cmh = curl_multi_init();
// Cap the idle connection cache at maxConnsPerHost.
512 curl_multi_setopt(
$cmh, CURLMOPT_MAXCONNECTS, (
int)$this->maxConnsPerHost );
516 $curlVersion = curl_version()[
'version'];
// CURLMOPT_MAX_HOST_CONNECTIONS is applied only on cURL >= 7.30.0.
// $maxHostConns is assigned on a line not visible in this excerpt.
519 if ( version_compare( $curlVersion,
'7.30.0',
'>=' ) ) {
522 curl_multi_setopt( $this->cmh, CURLMOPT_MAX_HOST_CONNECTIONS, (
int)$maxHostConns );
// Choose the pipelining mode by cURL version. Before 7.62, HTTP/1 pipelining and
// multiplexing are both requested; from 7.62 on, only multiplexing. The < 7.43
// branch body is not visible here.
525 if ( $opts[
'usePipelining'] ?? $this->usePipelining ) {
526 if ( version_compare( $curlVersion,
'7.43',
'<' ) ) {
529 } elseif ( version_compare( $curlVersion,
'7.62',
'<' ) ) {
531 $pipelining = CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX;
534 $pipelining = CURLPIPE_MULTIPLEX;
// NOTE(review): '@' hides any warning from an unsupported value; consider checking the return value instead.
538 @curl_multi_setopt( $this->cmh, CURLMOPT_PIPELINING, $pipelining );
/**
 * Read a transfer time from a cURL handle, as a string of seconds.
 *
 * The *_T variant named by $newConstName is used when it is defined. Its value is
 * divided by 1e6 (microseconds per the cURL docs) and formatted with 6 decimals.
 * Otherwise the legacy option's value is cast to string.
 *
 * @param mixed $ch cURL handle
 * @param int $oldOption Legacy CURLINFO_* constant (e.g. CURLINFO_TOTAL_TIME)
 * @param string $newConstName Name of the *_T constant. It is resolved through
 *   defined()/constant() because it may not exist in every PHP build.
 * @return string Elapsed time in seconds
 */
554 private function getCurlTime( $ch, $oldOption, $newConstName ): string {
555 if ( defined( $newConstName ) ) {
556 return sprintf(
"%.6F", curl_getinfo( $ch, constant( $newConstName ) ) / 1e6 );
558 return (
string)curl_getinfo( $ch, $oldOption );
/**
 * Execute normalized requests sequentially via the MediaWiki HTTP request factory.
 *
 * This is the fallback path used instead of cURL. 'httpVersion' is not handled here.
 *
 * @param array $reqs Normalized requests (see normalizeRequests())
 * @param array $opts Per-call options; reqTimeout/connTimeout override the client defaults
 * @return array Presumably the requests with 'response' filled in — the return
 *   statement is outside this excerpt.
 */
577 private function runMultiHttp( array $reqs, array $opts = [] ) {
579 'timeout' => $opts[
'reqTimeout'] ?? $this->reqTimeout,
580 'connectTimeout' => $opts[
'connTimeout'] ?? $this->connTimeout,
581 'logger' => $this->logger,
582 'caInfo' => $this->caBundlePath,
// Combine the shared options with per-request ones. With '+', the left-hand
// ($httpOptions) value wins on key collisions.
584 foreach ( $reqs as &$req ) {
585 $reqOptions = $httpOptions + [
586 'method' => $req[
'method'],
587 'proxy' => $req[
'proxy'] ?? $this->proxy,
588 'userAgent' => $req[
'headers'][
'user-agent'] ?? $this->userAgent,
589 'postData' => $req[
'body'],
// Append the RFC 3986-encoded 'query' array using '?' or '&' as appropriate.
593 $query = http_build_query( $req[
'query'],
'',
'&', PHP_QUERY_RFC3986 );
594 if ( $query !=
'' ) {
595 $url .= !str_contains( $req[
'url'],
'?' ) ?
"?$query" :
"&$query";
// NOTE(review): __METHOD__ is passed as the caller here. runMulti() receives a
// $caller but does not pass it to this method.
598 $httpRequest = MediaWikiServices::getInstance()->getHttpRequestFactory()->create(
599 $url, $reqOptions, __METHOD__ );
600 $httpRequest->setLogger( $this->logger );
601 foreach ( $req[
'headers'] as $header => $value ) {
602 $httpRequest->setHeader( $header, $value );
604 $sv = $httpRequest->execute()->getStatusValue();
// Response header values are arrays; join each into one comma-separated string.
606 $respHeaders = array_map(
607 static function ( $v ) {
608 return implode(
', ', $v );
610 $httpRequest->getResponseHeaders() );
613 'code' => $httpRequest->getStatus(),
615 'headers' => $respHeaders,
616 'body' => $httpRequest->getContent(),
// On failure, expose the first status error's message as 'error' and derive
// 'reason' from its params.
620 if ( !$sv->isOK() ) {
621 $svErrors = $sv->getErrors();
622 if ( isset( $svErrors[0] ) ) {
623 $req[
'response'][
'error'] = $svErrors[0][
'message'];
626 if ( isset( $svErrors[0][
'params'][0] ) ) {
627 if ( is_numeric( $svErrors[0][
'params'][0] ) ) {
628 if ( isset( $svErrors[0][
'params'][1] ) ) {
630 $req[
'response'][
'reason'] = $svErrors[0][
'params'][1];
633 $req[
'response'][
'reason'] = $svErrors[0][
'params'][0];
// Mirror the named response fields into numeric keys 0-4 (same layout as the cURL path).
639 $req[
'response'][0] = $req[
'response'][
'code'];
640 $req[
'response'][1] = $req[
'response'][
'reason'];
641 $req[
'response'][2] = $req[
'response'][
'headers'];
642 $req[
'response'][3] = $req[
'response'][
'body'];
643 $req[
'response'][4] = $req[
'response'][
'error'];
/**
 * Lowercase all header names.
 *
 * When two names differ only in case, the later entry overwrites the earlier one.
 *
 * @param array $headers Map of header name => value
 * @return array Presumably the lowercased map $normalized — the return statement
 *   is outside this excerpt.
 */
654 private function normalizeHeaders( array $headers ): array {
656 foreach ( $headers as $name => $value ) {
657 $normalized[strtolower( $name )] = $value;
/**
 * Normalize request descriptors in place.
 *
 * For each request, this method:
 * - accepts the positional form [method, url];
 * - validates that 'method' and 'url' are present;
 * - applies the local reverse proxy where configured;
 * - defaults 'query' and 'flags';
 * - lowercases header names;
 * - logs the start of the request with sensitive header values redacted.
 *
 * @param array &$reqs List of request descriptors, modified in place
 * @throws InvalidArgumentException If a request has no 'method' or 'url'
 */
667 private function normalizeRequests( array &$reqs ) {
668 foreach ( $reqs as &$req ) {
// Positional form: index 0 is the method and index 1 is the URL.
676 if ( isset( $req[0] ) ) {
677 $req[
'method'] = $req[0];
680 if ( isset( $req[1] ) ) {
681 $req[
'url'] = $req[1];
684 if ( !isset( $req[
'method'] ) ) {
685 throw new InvalidArgumentException(
"Request has no 'method' field set." );
686 } elseif ( !isset( $req[
'url'] ) ) {
687 throw new InvalidArgumentException(
"Request has no 'url' field set." );
// Route URLs on configured local virtual hosts through the local reverse proxy.
// This runs before header normalization, so the 'Host' header it adds is lowercased below.
689 if ( $this->localProxy !==
false && $this->isLocalURL( $req[
'url'] ) ) {
690 $this->useReverseProxy( $req, $this->localProxy );
692 $req[
'query'] ??= [];
// Combine the telemetry headers with the request's own headers and lowercase the
// names (part of this expression is outside this excerpt).
693 $req[
'headers'] = $this->normalizeHeaders(
696 $this->telemetry ? $this->telemetry->getRequestHeaders() : [],
697 $req[
'headers'] ?? []
// Requests without a body are sent with Content-Length: 0. The body default is
// presumably set on a line not visible here.
701 if ( !isset( $req[
'body'] ) ) {
703 $req[
'headers'][
'content-length'] = 0;
// Log a copy of the headers with values of sensitive names replaced.
706 $logHeaders = $req[
'headers'];
707 foreach ( $logHeaders as $header => $value ) {
708 if ( preg_match( self::SENSITIVE_HEADERS, $header ) === 1 ) {
709 $logHeaders[$header] =
'[redacted]';
712 $this->logger->debug(
"HTTP start: {method} {url}",
714 'method' => $req[
'method'],
715 'url' => $req[
'url'],
716 'headers' => $logHeaders,
719 $req[
'flags'] ??= [];
/**
 * Rewrite a request so it is sent to a reverse proxy.
 *
 * The original host is kept in a 'Host' header. The URL's scheme, host and port
 * are replaced with the proxy's, and the proxy's port is dropped if it has none.
 * Any forward proxy is disabled.
 *
 * @param array &$req Request descriptor, modified in place
 * @param string $proxy Reverse proxy URL
 * @throws InvalidArgumentException If parse_url() fails on the proxy or the request URL
 */
723 private function useReverseProxy( array &$req,
string $proxy ) {
724 $parsedProxy = parse_url( $proxy );
725 if ( $parsedProxy ===
false ) {
726 throw new InvalidArgumentException(
"Invalid reverseProxy configured: $proxy" );
728 $parsedUrl = parse_url( $req[
'url'] );
729 if ( $parsedUrl ===
false ) {
730 throw new InvalidArgumentException(
"Invalid url specified: {$req['url']}" );
// Keep the original host so the proxy can route the request.
// NOTE(review): assumes parse_url() returned 'host' here and 'scheme'/'host' for the proxy — confirm.
734 $req[
'headers'][
'Host'] = $parsedUrl[
'host'];
737 $parsedUrl[
'scheme'] = $parsedProxy[
'scheme'];
739 $parsedUrl[
'host'] = $parsedProxy[
'host'];
740 if ( isset( $parsedProxy[
'port'] ) ) {
741 $parsedUrl[
'port'] = $parsedProxy[
'port'];
743 unset( $parsedUrl[
'port'] );
745 $req[
'url'] = self::assembleUrl( $parsedUrl );
// false is not null, so the '?? $this->proxy' fallbacks do not re-enable a forward proxy.
748 $req[
'proxy'] =
false;
/**
 * Rebuild a URL string from parse_url()-style components.
 *
 * User and password are included only when a host is present; the separator
 * before the host is on lines not visible in this excerpt. An empty query
 * string is omitted.
 *
 * @param array $urlParts Components as returned by parse_url()
 * @return string The assembled URL
 */
761 private static function assembleUrl( array $urlParts ): string {
762 $result = isset( $urlParts[
'scheme'] ) ? $urlParts[
'scheme'] .
'://' :
'';
764 if ( isset( $urlParts[
'host'] ) ) {
765 if ( isset( $urlParts[
'user'] ) ) {
766 $result .= $urlParts[
'user'];
767 if ( isset( $urlParts[
'pass'] ) ) {
768 $result .=
':' . $urlParts[
'pass'];
773 $result .= $urlParts[
'host'];
775 if ( isset( $urlParts[
'port'] ) ) {
776 $result .=
':' . $urlParts[
'port'];
780 if ( isset( $urlParts[
'path'] ) ) {
781 $result .= $urlParts[
'path'];
784 if ( isset( $urlParts[
'query'] ) && $urlParts[
'query'] !==
'' ) {
785 $result .=
'?' . $urlParts[
'query'];
788 if ( isset( $urlParts[
'fragment'] ) ) {
789 $result .=
'#' . $urlParts[
'fragment'];
/**
 * Check whether a URL's host is served by a local virtual host.
 *
 * The host's labels are reversed and joined back together one at a time
 * ("org", "example.org", "www.example.org", ...). Each suffix appears to be
 * checked against localVirtualHosts, so a parent-domain entry also matches
 * its subdomains. The return statements are outside this excerpt.
 *
 * NOTE(review): the regex requires '/' or ':' after the host, so a URL with no
 * path or port (e.g. "http://example.org") does not match — confirm whether this
 * is intended.
 * NOTE(review): in_array() is called without strict mode.
 *
 * @param string $url Absolute http(s) URL
 * @return bool Presumably true when a suffix of the host is a local virtual host
 */
802 private function isLocalURL(
$url ) {
803 if ( !$this->localVirtualHosts ) {
810 if ( preg_match(
'!^https?://([\w.-]+)[/:].*$!',
$url,
$matches ) ) {
813 $domainParts = explode(
'.', $host );
815 $domainParts = array_reverse( $domainParts );
818 $countParts = count( $domainParts );
819 for ( $i = 0; $i < $countParts; $i++ ) {
820 $domainPart = $domainParts[$i];
822 $domain = $domainPart;
824 $domain = $domainPart .
'.' . $domain;
827 if ( in_array( $domain, $this->localVirtualHosts ) ) {
/**
 * Compute the curl_multi_select() wait interval in seconds.
 *
 * Takes the smaller of the connect and request timeouts (per-call options
 * override the client defaults) and scales it by TIMEOUT_ACCURACY_FACTOR.
 * The result is floored at 10e-6 s (10 microseconds). Zero/null timeouts are
 * dropped by array_filter(). When none remain, an early return on lines not
 * visible here applies.
 *
 * @param array $opts Per-call options
 * @return float Wait interval in seconds
 */
842 private function getSelectTimeout( $opts ) {
843 $connTimeout = $opts[
'connTimeout'] ?? $this->connTimeout;
844 $reqTimeout = $opts[
'reqTimeout'] ?? $this->reqTimeout;
845 $timeouts = array_filter( [ $connTimeout, $reqTimeout ] );
846 if ( count( $timeouts ) === 0 ) {
850 $selectTimeout = min( $timeouts ) * self::TIMEOUT_ACCURACY_FACTOR;
// Avoid a zero or near-zero wait, which would turn the select loop into a busy loop.
852 if ( $selectTimeout < 10e-6 ) {
853 $selectTimeout = 10e-6;
855 return $selectTimeout;
/**
 * Replace the logger used for request/response logging.
 *
 * @param LoggerInterface $logger
 */
861 public function setLogger( LoggerInterface $logger ): void {
862 $this->logger = $logger;
// Release the shared cURL multi handle (the enclosing method and any guard on
// $this->cmh are outside this excerpt).
867 curl_multi_close( $this->cmh );