/** Job queue access: used to pop jobs and to acknowledge them after they run. */
67 private $jobQueueGroup;
/** Read-only mode service: run() reports 'read-only' when it says the site is read-only. */
70 private $readOnlyMode;
/** Metrics factory used for the per-job timing and counter metrics. */
76 private $statsFactory;
/**
 * Replica lag threshold: run() records 'replica-lag-limit' when the main load
 * balancer's max lag is at or above this. Presumably seconds - TODO confirm.
 */
85 private const MAX_ALLOWED_LAG = 3;
/**
 * Timeout passed to waitForReplication() in run(), and the default replication
 * wait timeout set while a job executes.
 */
87 private const SYNC_TIMEOUT = self::MAX_ALLOWED_LAG;
/** Minimum number of seconds between replication waits in run(). */
89 private const LAG_CHECK_PERIOD = 1.0;
/** Backoff, in seconds, added to a job type after a (sampled) failed run. */
91 private const ERROR_BACKOFF_TTL = 1;
/** Backoff, in seconds, used instead of ERROR_BACKOFF_TTL when a DBReadOnlyError was caught. */
93 private const READONLY_BACKOFF_TTL = 30;
// Store the debug handler; debugCallback() invokes it with timestamped lines when it is set.
// NOTE(review): the enclosing method's header is not visible in this view.
99 $this->debug = $debug;
// NOTE(review): constructor fragment - only the trailing $logger parameter and the
// property assignments are visible in this view.
119 LoggerInterface $logger
// Keep the injected services for use by run() and the per-job execution code.
122 $this->options = $serviceOptions;
123 $this->lbFactory = $lbFactory;
124 $this->jobQueueGroup = $jobQueueGroup;
125 $this->readOnlyMode = $readOnlyMode;
126 $this->linkCache = $linkCache;
127 $this->statsFactory = $statsFactory;
128 $this->logger = $logger;
/**
 * Pop and run queued jobs until a stop condition is reached, and summarise the run.
 *
 * NOTE(review): this view omits a number of lines from the method, including where
 * $jobClasses, $profilerLimits, $backoffs, $backoffDeltas, $wait, $jobsPopped,
 * $timeMsTotal, $lastSyncTime and $info are set, the loop header, early returns and
 * the closing braces; the comments below describe only the visible lines.
 *
 * @param array $options Reads 'type' (default false = any type), 'maxJobs'
 *   (default false), 'maxTime' (default false) and 'throttle' (default true)
 * @return array Includes 'jobs' (per-job entries with 'status' 'ok'/'failed',
 *   'error' and 'time'), 'reached' (stop reason: 'none-ready', 'none-possible',
 *   'read-only', 'replica-lag-limit', 'job-limit', 'time-limit', 'exception' or
 *   'memory-limit'), 'backoffs', and 'elapsed' (sum of the per-job 'timeMs')
 * @throws LogicException If called while a transaction round is active
 */
156 public function run( array $options ) {
157 $type = $options[
'type'] ??
false;
158 $maxJobs = $options[
'maxJobs'] ??
false;
159 $maxTime = $options[
'maxTime'] ??
false;
160 $throttle = $options[
'throttle'] ??
true;
// 'reached' records why the run stopped; 'none-ready' is the initial value.
165 $response = [
'jobs' => [],
'reached' =>
'none-ready' ];
// A specific type was requested but is not in $jobClasses, so no job of it can run.
167 if ( $type !==
false && !isset( $jobClasses[$type] ) ) {
169 $response[
'reached'] =
'none-possible';
// Do not run jobs while the site is in read-only mode.
173 if ( $this->readOnlyMode->isReadOnly() ) {
175 $response[
'reached'] =
'read-only';
// Do not start when replica lag is already at or above MAX_ALLOWED_LAG.
179 [ , $maxLag ] = $this->lbFactory->getMainLB()->getMaxLag();
180 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
182 $response[
'reached'] =
'replica-lag-limit';
// Apply the 'JobRunner' transaction profiler expectations for this run.
187 $this->lbFactory->getTransactionProfiler()
188 ->setExpectations( $profilerLimits[
'JobRunner'], __METHOD__ );
// Callers must not invoke this inside an active transaction round.
191 if ( $this->lbFactory->hasTransactionRound() ) {
192 throw new LogicException( __METHOD__ .
' called with an active transaction round.' );
// Reference point for the 'maxTime' limit checked after each job.
200 $loopStartTime = microtime(
true );
// Sync pending backoff deltas; when throttling, types with an active backoff are skipped.
207 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
208 $backoffKeys = $throttle ? array_keys( $backoffs ) : [];
// Pop a job: from any type when $type is false, otherwise from $type unless it is
// currently backed off. NOTE(review): the rest of the first pop call is not visible.
211 if ( $type ===
false ) {
213 $job = $this->jobQueueGroup
218 $job = in_array( $type, $backoffKeys ) ? false : $this->jobQueueGroup->pop( $type );
223 $jType =
$job->getType();
// Throttling: add this job's wait time to its type's backoff delta and sync it.
226 $ttw = $this->getBackoffTimeToWait(
$job );
230 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
231 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
// Acknowledge the job unless it failed and is allowed to be retried.
237 if ( $info[
'status'] !==
false || !
$job->allowRetries() ) {
238 $this->jobQueueGroup->ack(
$job );
// About 1 in 50 failed runs (mt_rand( 0, 49 ) == 0) also back off the type by the error TTL.
242 if ( $info[
'status'] ===
false && mt_rand( 0, 49 ) == 0 ) {
243 $ttw = max( $ttw, $this->getErrorBackoffTTL( $info[
'caught'] ) );
244 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
// Record a per-job summary and accumulate the total job time.
247 $response[
'jobs'][] = [
249 'status' => ( $info[
'status'] === false ) ?
'failed' :
'ok',
250 'error' => $info[
'error'],
251 'time' => $info[
'timeMs']
253 $timeMsTotal += $info[
'timeMs'];
// Record the job-count or wall-clock limit as the stop reason once exceeded.
256 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
257 $response[
'reached'] =
'job-limit';
259 } elseif ( $maxTime && ( microtime(
true ) - $loopStartTime ) > $maxTime ) {
260 $response[
'reached'] =
'time-limit';
// A database connection error caught by the job ends the run.
268 if ( in_array( DBConnectionError::class, $info[
'caught'],
true ) ) {
269 $response[
'reached'] =
'exception';
// At most once per LAG_CHECK_PERIOD seconds (or if the clock moved backwards), wait
// for replicas to catch up with writes since the last sync; flag
// 'replica-lag-limit' if they do not within SYNC_TIMEOUT.
276 $timePassed = microtime(
true ) - $lastSyncTime;
277 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
278 $opts = [
'ifWritesSince' => $lastSyncTime,
'timeout' => self::SYNC_TIMEOUT ];
279 if ( !$this->lbFactory->waitForReplication( $opts ) ) {
280 $response[
'reached'] =
'replica-lag-limit';
283 $lastSyncTime = microtime(
true );
// Stop when memory usage approaches the PHP memory limit.
287 if ( !$this->checkMemoryOK() ) {
288 $response[
'reached'] =
'memory-limit';
// Flush any remaining backoff deltas, blocking on the file lock ('wait' mode).
295 if ( $backoffDeltas ) {
296 $this->syncBackoffDeltas( $backoffs, $backoffDeltas,
'wait' );
// Attach the final backoff map and the total job time.
299 $response[
'backoffs'] = $backoffs;
300 $response[
'elapsed'] = $timeMsTotal;
// NOTE(review): the header of this method and some of its lines are not visible in
// this view. The visible lines set the telemetry request ID for the job, run
// doExecuteJob() with SYNC_TIMEOUT as the default replication wait timeout, and then
// restore the previous timeout and request ID. As shown, the restore lines follow the
// return statement; presumably they sit in a finally block - confirm.
324 $telemetry = Telemetry::getInstance();
325 $oldRequestId = $telemetry->getRequestId();
// Reuse the job's own request ID when it carries one.
327 if (
$job->getRequestId() !==
null ) {
329 $telemetry->overrideRequestId(
$job->getRequestId() );
// Presumably the other branch: give the job a fresh request ID (the else is not visible).
334 $telemetry->regenerateRequestId();
// Bound replication waits inside the job by SYNC_TIMEOUT, remembering the old default.
337 $oldTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
339 return $this->doExecuteJob(
$job );
// Restore the previous replication wait timeout and request ID.
341 $this->lbFactory->setDefaultReplicationWaitTimeout( $oldTimeout );
342 $telemetry->overrideRequestId( $oldRequestId );
// NOTE(review): body of the per-job execution routine (presumably doExecuteJob());
// its header, the try openings, the initialisation of $status/$error/$caught, parts
// of several calls and the closing braces are not visible in this view. Comments
// describe only the visible lines.
355 $jType =
$job->getType();
// Log the start of the job (logger at debug level, plus the debug callback).
356 $msg =
$job->toString() .
" STARTING";
357 $this->logger->debug( $msg, [
'job_type' =>
$job->getType() ] );
358 $this->debugCallback( $msg );
// Clear the link cache before each job, presumably so entries cached by earlier jobs are not reused.
362 $this->linkCache->clear();
// Baselines for the RSS-delta and runtime metrics.
366 $rssStart = $this->getMaxRssKb();
367 $jobStartTime = microtime(
true );
// Transaction owner name: "<job class>::run".
369 $fnameTrxOwner = get_class(
$job ) .
'::run';
// Jobs flagged JOB_NO_EXPLICIT_TRX_ROUND get pending primary changes committed; the
// beginPrimaryChanges() call presumably belongs to the other branch (else not visible).
371 if (
$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
372 $this->lbFactory->commitPrimaryChanges( $fnameTrxOwner );
374 $this->lbFactory->beginPrimaryChanges( $fnameTrxOwner );
// Run the job with 'context.job_type' in the logging context; the scoped context is
// released right after run() returns.
378 $scope = LoggerFactory::getContext()->addScoped( [
379 'context.job_type' => $jType,
381 $status =
$job->run();
382 $error =
$job->getLastError();
383 ScopedCallback::consume( $scope );
// Commit the job's primary changes (arguments not visible here), then run deferred updates.
386 $this->lbFactory->commitPrimaryChanges(
392 DeferredUpdates::doUpdates();
393 }
catch ( Throwable $e ) {
// On failure: roll back primary changes and log, build an
// "<class>: <message> in <file> on line <n>" error string, and record the class name.
394 MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
396 $error = get_class( $e ) .
': ' . $e->getMessage() .
' in '
397 . $e->getFile() .
' on line ' . $e->getLine();
398 $caught[] = get_class( $e );
// Let the job clean up; a throwable from tearDown() is only logged in the visible catch.
402 $job->tearDown( $status );
403 }
catch ( Throwable $e ) {
404 MWExceptionHandler::logException( $e );
// Wall-clock job duration in milliseconds.
407 $timeMs = intval( ( microtime(
true ) - $jobStartTime ) * 1000 );
408 $rssEnd = $this->getMaxRssKb();
// Pickup delay: time from the job's ready timestamp until it started, clamped at 0.
// observe() is given milliseconds, as with $timeMs below.
// NOTE(review): assumes getReadyTimestamp() returns UNIX seconds - confirm.
411 $readyTs =
$job->getReadyTimestamp();
413 $pickupDelay = max( 0, $jobStartTime - $readyTs );
414 $this->statsFactory->getTiming(
'jobqueue_pickup_delay_seconds' )
415 ->setLabel(
'jobtype', $jType )
417 "jobqueue_pickup_delay_all_mean",
"jobqueue.pickup_delay.$jType"
419 ->observe( 1000 * $pickupDelay );
// Root-job age: time since the root job's timestamp, only when the job has one.
422 $rootTimestamp =
$job->getRootJobParams()[
'rootJobTimestamp'];
423 if ( $rootTimestamp ) {
424 $age = max( 0, $jobStartTime - (
int)
wfTimestamp( TS_UNIX, $rootTimestamp ) );
426 $this->statsFactory->getTiming(
"jobqueue_pickup_root_age_seconds" )
427 ->setLabel(
'jobtype', $jType )
428 ->copyToStatsdAt(
"jobqueue.pickup_root_age.$jType" )
429 ->observe( 1000 * $age );
// Job runtime metric (milliseconds).
432 $this->statsFactory->getTiming(
'jobqueue_runtime_seconds' )
433 ->setLabel(
'jobtype', $jType )
434 ->copyToStatsdAt(
"jobqueue.run.$jType" )
435 ->observe( $timeMs );
// Peak-RSS growth during the job, when both readings are available.
// NOTE(review): the label key here is 'rss_delta' while the other job metrics use
// 'jobtype' for the same $jType value; renaming it would change existing metric
// series, so confirm intent before touching it.
437 if ( $rssStart && $rssEnd ) {
438 $this->statsFactory->getCounter(
'jobqueue_rss_delta_total' )
439 ->setLabel(
'rss_delta', $jType )
440 ->copyToStatsdAt(
"jobqueue.rss_delta.$jType" )
441 ->incrementBy( $rssEnd - $rssStart );
// Log the outcome: error level on failure, info level on success, plus the debug callback.
444 if ( $status ===
false ) {
445 $msg =
$job->toString() .
" t={job_duration} error={job_error}";
446 $this->logger->error( $msg, [
447 'job_type' =>
$job->getType(),
448 'job_duration' => $timeMs,
449 'job_error' => $error,
452 $msg =
$job->toString() .
" t=$timeMs error={$error}";
453 $this->debugCallback( $msg );
455 $msg =
$job->toString() .
" t={job_duration} good";
456 $this->logger->info( $msg, [
457 'job_type' =>
$job->getType(),
458 'job_duration' => $timeMs,
461 $msg =
$job->toString() .
" t=$timeMs good";
462 $this->debugCallback( $msg );
/**
 * Pick the error backoff TTL for a job type after a failed run.
 *
 * A DBReadOnlyError among the caught throwables selects the longer
 * READONLY_BACKOFF_TTL; any other failure gets ERROR_BACKOFF_TTL.
 *
 * @param string[] $caught Class names of the throwables caught while running the job
 * @return int Backoff TTL in seconds
 */
private function getErrorBackoffTTL( array $caught ) {
	// Strict comparison, like the DBConnectionError check in run(); $caught only
	// holds class-name strings, so loose matching is never needed.
	return in_array( DBReadOnlyError::class, $caught, true )
		? self::READONLY_BACKOFF_TTL
		: self::ERROR_BACKOFF_TTL;
}
/**
 * Peak resident set size of the current process, as reported by getrusage().
 *
 * ru_maxrss is in kilobytes on Linux (see getrusage(2)); other platforms may
 * report a different unit.
 *
 * @return int|null The ru_maxrss value, or null when it is unavailable
 */
private function getMaxRssKb() {
	// Mode 0 asks for RUSAGE_SELF, i.e. the calling process.
	$usage = getrusage( 0 );
	if ( !isset( $usage['ru_maxrss'] ) ) {
		return null;
	}

	return (int)$usage['ru_maxrss'];
}
/**
 * Throttling delay, in whole seconds, to add to a job type's backoff after popping a job.
 *
 * NOTE(review): the line defining $throttling (presumably a per-type items-per-second
 * map) and the bodies of the early-exit branches are not visible in this view.
 *
 * @param RunnableJob $job Job just popped from the queue
 * @return int Seconds to wait
 */
497 private function getBackoffTimeToWait( RunnableJob
$job ) {
// Types without a throttling entry, and DuplicateJob instances, take an early exit
// (its body is not visible here).
500 if ( !isset( $throttling[
$job->getType()] ) ||
$job instanceof DuplicateJob ) {
504 $itemsPerSecond = $throttling[
$job->getType()];
// A non-positive rate also takes an early exit (body not visible here).
505 if ( $itemsPerSecond <= 0 ) {
510 if (
$job->workItemCount() > 0 ) {
511 $exactSeconds =
$job->workItemCount() / $itemsPerSecond;
// Randomised rounding: round down, then add one second with probability equal to
// the fractional part, so the expected delay equals $exactSeconds.
513 $seconds = floor( $exactSeconds );
514 $remainder = $exactSeconds - $seconds;
515 $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
518 return (
int)$seconds;
/**
 * Read the shared backoff file and keep only backoffs that have not yet expired.
 *
 * The file (mw-runJobs-backoffs.json under wfTempDir()) holds a JSON object mapping
 * job type to the microtime( true ) timestamp at which its backoff ends; entries
 * whose timestamp is already in the past are dropped.
 *
 * NOTE(review): the failed-lock branch, the fclose() and the return statement are not
 * visible in this view. Also, fopen() returns false if the file disappears after
 * is_file(), and flock() on false throws a TypeError under PHP 8; consider checking
 * the handle.
 *
 * @param array $backoffs Presumably the map returned when the file is absent or the
 *   lock cannot be taken - TODO confirm
 * @param string $mode 'nowait' makes the shared lock non-blocking
 */
529 private function loadBackoffs( array $backoffs, $mode =
'wait' ) {
530 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
531 if ( is_file( $file ) ) {
532 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
533 $handle = fopen( $file,
'rb' );
534 if ( !flock( $handle, LOCK_SH | $noblock ) ) {
// Read the whole file under the shared lock, then release it.
538 $content = stream_get_contents( $handle );
539 flock( $handle, LOCK_UN );
// Decode (invalid or empty JSON becomes []) and drop expired backoffs.
541 $ctime = microtime(
true );
542 $cBackoffs = json_decode( $content,
true ) ?: [];
543 foreach ( $cBackoffs as $type => $timestamp ) {
544 if ( $timestamp < $ctime ) {
545 unset( $cBackoffs[$type] );
/**
 * Merge per-type backoff deltas (in seconds) into the shared backoff file.
 *
 * For each type in $deltas, an expiry that is still in the future is extended by the
 * delta; expired entries are then removed and the map is written back as JSON while
 * holding an exclusive lock.
 *
 * NOTE(review): several lines are not visible in this view: the condition guarding
 * the early loadBackoffs() return (presumably "no deltas"), the failed-lock branch,
 * the second arm of the ternary below, the fclose(), any reset of $deltas (it is
 * passed by reference), and the return value.
 *
 * NOTE(review): fopen() mode 'wb+' truncates the file to zero length as soon as it is
 * opened, before the exclusive lock is held. As a result stream_get_contents() below
 * always reads an empty string, and backoffs recorded by other runners are discarded
 * on every sync. Concurrent loadBackoffs() readers can also see the emptied file.
 * Opening with 'cb+' (create, no truncation) and calling rewind() after ftruncate()
 * would preserve them; confirm and fix.
 *
 * @param array $backoffs
 * @param array &$deltas Map of job type to seconds to add to that type's backoff
 * @param string $mode 'nowait' makes the exclusive lock non-blocking
 */
566 private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode =
'wait' ) {
568 return $this->loadBackoffs( $backoffs, $mode );
571 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
572 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
573 $handle = fopen( $file,
'wb+' );
574 if ( !flock( $handle, LOCK_EX | $noblock ) ) {
578 $ctime = microtime(
true );
579 $content = stream_get_contents( $handle );
580 $cBackoffs = json_decode( $content,
true ) ?: [];
// Extend still-active backoffs by the delta; the other arm (not visible) presumably
// starts a new expiry from $ctime.
581 foreach ( $deltas as $type => $seconds ) {
582 $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
583 ? $cBackoffs[$type] + $seconds
// Drop expired entries before writing the map back.
586 foreach ( $cBackoffs as $type => $timestamp ) {
587 if ( $timestamp < $ctime ) {
588 unset( $cBackoffs[$type] );
591 ftruncate( $handle, 0 );
592 fwrite( $handle, json_encode( $cBackoffs ) );
593 flock( $handle, LOCK_UN );
/**
 * Check whether memory usage is still below 95% of PHP's memory_limit.
 *
 * The limit is parsed once from ini_get( 'memory_limit' ) (digits with an optional
 * k/m/g suffix, case-insensitive) and cached in a function-static variable. When
 * usage reaches the threshold, an error is logged and sent to the debug callback.
 *
 * NOTE(review): the handling of memory_limit values the pattern does not match (such
 * as -1) and the return statements are not visible in this view; presumably false is
 * returned when the threshold is reached - confirm.
 */
606 private function checkMemoryOK() {
607 static $maxBytes =
null;
608 if ( $maxBytes ===
null ) {
610 if ( preg_match(
'!^(\d+)(k|m|g|)$!i', ini_get(
'memory_limit' ), $m ) ) {
611 [ , $num, $unit ] = $m;
// Suffix multipliers; no suffix means plain bytes.
612 $conv = [
'g' => 1073741824,
'm' => 1048576,
'k' => 1024,
'' => 1 ];
613 $maxBytes = (int)$num * $conv[strtolower( $unit )];
618 $usedBytes = memory_get_usage();
// Skipped entirely when $maxBytes is 0 or null (no usable limit).
619 if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
620 $msg =
"Detected excessive memory usage ({used_bytes}/{max_bytes}).";
621 $this->logger->error( $msg, [
622 'used_bytes' => $usedBytes,
623 'max_bytes' => $maxBytes,
626 $msg =
"Detected excessive memory usage ($usedBytes/$maxBytes).";
627 $this->debugCallback( $msg );
/**
 * Forward a timestamped debug line to the debug handler, if one is set.
 *
 * @param string $msg Message text; it is prefixed with a TS_DB timestamp and
 *   terminated with a newline before being passed on
 */
private function debugCallback( $msg ) {
	$handler = $this->debug;
	if ( !$handler ) {
		return;
	}

	$handler( wfTimestamp( TS_DB ) . " $msg\n" );
}