45 MainConfigNames::JobBackoffThrottling,
46 MainConfigNames::JobClasses,
47 MainConfigNames::MaxJobDBWriteDuration,
48 MainConfigNames::TrxProfilerLimits,
58 private $jobQueueGroup;
61 private $readOnlyMode;
76 private const MAX_ALLOWED_LAG = 3;
78 private const SYNC_TIMEOUT = self::MAX_ALLOWED_LAG;
80 private const LAG_CHECK_PERIOD = 1.0;
82 private const ERROR_BACKOFF_TTL = 1;
84 private const READONLY_BACKOFF_TTL = 30;
90 $this->debug = $debug;
109 StatsdDataFactoryInterface $statsdDataFactory,
110 LoggerInterface $logger
113 $this->options = $serviceOptions;
114 $this->lbFactory = $lbFactory;
115 $this->jobQueueGroup = $jobQueueGroup;
116 $this->readOnlyMode = $readOnlyMode;
117 $this->linkCache = $linkCache;
118 $this->stats = $statsdDataFactory;
119 $this->logger = $logger;
147 public function run( array $options ) {
148 $type = $options[
'type'] ??
false;
149 $maxJobs = $options[
'maxJobs'] ??
false;
150 $maxTime = $options[
'maxTime'] ??
false;
151 $throttle = $options[
'throttle'] ??
true;
153 $jobClasses = $this->options->get( MainConfigNames::JobClasses );
154 $profilerLimits = $this->options->get( MainConfigNames::TrxProfilerLimits );
156 $response = [
'jobs' => [],
'reached' =>
'none-ready' ];
158 if ( $type !==
false && !isset( $jobClasses[$type] ) ) {
160 $response[
'reached'] =
'none-possible';
164 if ( $this->readOnlyMode->isReadOnly() ) {
166 $response[
'reached'] =
'read-only';
170 [ , $maxLag ] = $this->lbFactory->getMainLB()->getMaxLag();
171 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
173 $response[
'reached'] =
'replica-lag-limit';
178 $this->lbFactory->getTransactionProfiler()
179 ->setExpectations( $profilerLimits[
'JobRunner'], __METHOD__ );
182 if ( $this->lbFactory->hasTransactionRound() ) {
183 throw new LogicException( __METHOD__ .
' called with an active transaction round.' );
191 $loopStartTime = microtime(
true );
198 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
199 $backoffKeys = $throttle ? array_keys( $backoffs ) : [];
202 if ( $type ===
false ) {
204 $job = $this->jobQueueGroup
205 ->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE, $backoffKeys );
209 $job = in_array( $type, $backoffKeys ) ? false : $this->jobQueueGroup->pop( $type );
214 $jType =
$job->getType();
217 $ttw = $this->getBackoffTimeToWait(
$job );
221 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
222 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
228 if ( $info[
'status'] !==
false || !
$job->allowRetries() ) {
229 $this->jobQueueGroup->ack(
$job );
233 if ( $info[
'status'] ===
false && mt_rand( 0, 49 ) == 0 ) {
234 $ttw = max( $ttw, $this->getErrorBackoffTTL( $info[
'caught'] ) );
235 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
238 $response[
'jobs'][] = [
240 'status' => ( $info[
'status'] === false ) ?
'failed' :
'ok',
241 'error' => $info[
'error'],
242 'time' => $info[
'timeMs']
244 $timeMsTotal += $info[
'timeMs'];
247 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
248 $response[
'reached'] =
'job-limit';
250 } elseif ( $maxTime && ( microtime(
true ) - $loopStartTime ) > $maxTime ) {
251 $response[
'reached'] =
'time-limit';
259 if ( in_array( DBConnectionError::class, $info[
'caught'],
true ) ) {
260 $response[
'reached'] =
'exception';
267 $timePassed = microtime(
true ) - $lastSyncTime;
268 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
269 $opts = [
'ifWritesSince' => $lastSyncTime,
'timeout' => self::SYNC_TIMEOUT ];
270 if ( !$this->lbFactory->waitForReplication( $opts ) ) {
271 $response[
'reached'] =
'replica-lag-limit';
274 $lastSyncTime = microtime(
true );
278 if ( !$this->checkMemoryOK() ) {
279 $response[
'reached'] =
'memory-limit';
286 if ( $backoffDeltas ) {
287 $this->syncBackoffDeltas( $backoffs, $backoffDeltas,
'wait' );
290 $response[
'backoffs'] = $backoffs;
291 $response[
'elapsed'] = $timeMsTotal;
315 $telemetry = Telemetry::getInstance();
316 $oldRequestId = $telemetry->getRequestId();
318 if (
$job->getRequestId() !==
null ) {
320 $telemetry->overrideRequestId(
$job->getRequestId() );
325 $telemetry->regenerateRequestId();
328 $oldTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
330 return $this->doExecuteJob(
$job );
332 $this->lbFactory->setDefaultReplicationWaitTimeout( $oldTimeout );
333 $telemetry->overrideRequestId( $oldRequestId );
346 $jType =
$job->getType();
347 $msg =
$job->toString() .
" STARTING";
348 $this->logger->debug( $msg, [
'job_type' =>
$job->getType() ] );
349 $this->debugCallback( $msg );
353 $this->linkCache->clear();
357 $rssStart = $this->getMaxRssKb();
358 $jobStartTime = microtime(
true );
360 $fnameTrxOwner = get_class(
$job ) .
'::run';
362 if (
$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
363 $this->lbFactory->commitPrimaryChanges( $fnameTrxOwner );
365 $this->lbFactory->beginPrimaryChanges( $fnameTrxOwner );
368 $this->lbFactory->flushReplicaSnapshots( $fnameTrxOwner );
369 $status =
$job->run();
370 $error =
$job->getLastError();
372 $this->lbFactory->commitPrimaryChanges(
375 $this->options->get( MainConfigNames::MaxJobDBWriteDuration )
378 DeferredUpdates::doUpdates();
379 }
catch ( Throwable $e ) {
382 $error = get_class( $e ) .
': ' . $e->getMessage() .
' in '
383 . $e->getFile() .
' on line ' . $e->getLine();
384 $caught[] = get_class( $e );
388 $job->tearDown( $status );
389 }
catch ( Throwable $e ) {
393 $timeMs = intval( ( microtime(
true ) - $jobStartTime ) * 1000 );
394 $rssEnd = $this->getMaxRssKb();
397 $readyTs =
$job->getReadyTimestamp();
399 $pickupDelay = max( 0, $jobStartTime - $readyTs );
400 $this->stats->timing(
'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
401 $this->stats->timing(
"jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
404 $rootTimestamp =
$job->getRootJobParams()[
'rootJobTimestamp'];
405 if ( $rootTimestamp ) {
406 $age = max( 0, $jobStartTime - (
int)
wfTimestamp( TS_UNIX, $rootTimestamp ) );
407 $this->stats->timing(
"jobqueue.pickup_root_age.$jType", 1000 * $age );
410 $this->stats->timing(
"jobqueue.run.$jType", $timeMs );
412 if ( $rssStart && $rssEnd ) {
413 $this->stats->updateCount(
"jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
416 if ( $status ===
false ) {
417 $msg =
$job->toString() .
" t={job_duration} error={job_error}";
418 $this->logger->error( $msg, [
419 'job_type' =>
$job->getType(),
420 'job_duration' => $timeMs,
421 'job_error' => $error,
424 $msg =
$job->toString() .
" t=$timeMs error={$error}";
425 $this->debugCallback( $msg );
427 $msg =
$job->toString() .
" t={job_duration} good";
428 $this->logger->info( $msg, [
429 'job_type' =>
$job->getType(),
430 'job_duration' => $timeMs,
433 $msg =
$job->toString() .
" t=$timeMs good";
434 $this->debugCallback( $msg );
449 private function getErrorBackoffTTL( array $caught ) {
450 return in_array( DBReadOnlyError::class, $caught )
451 ? self::READONLY_BACKOFF_TTL
452 : self::ERROR_BACKOFF_TTL;
458 private function getMaxRssKb() {
459 $info = getrusage( 0 );
461 return isset( $info[
'ru_maxrss'] ) ? (int)$info[
'ru_maxrss'] : null;
470 $throttling = $this->options->get( MainConfigNames::JobBackoffThrottling );
476 $itemsPerSecond = $throttling[
$job->getType()];
477 if ( $itemsPerSecond <= 0 ) {
482 if (
$job->workItemCount() > 0 ) {
483 $exactSeconds =
$job->workItemCount() / $itemsPerSecond;
485 $seconds = floor( $exactSeconds );
486 $remainder = $exactSeconds - $seconds;
487 $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
490 return (
int)$seconds;
501 private function loadBackoffs( array $backoffs, $mode =
'wait' ) {
502 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
503 if ( is_file( $file ) ) {
504 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
505 $handle = fopen( $file,
'rb' );
506 if ( !flock( $handle, LOCK_SH | $noblock ) ) {
510 $content = stream_get_contents( $handle );
511 flock( $handle, LOCK_UN );
513 $ctime = microtime(
true );
514 $cBackoffs = json_decode( $content,
true ) ?: [];
515 foreach ( $cBackoffs as $type => $timestamp ) {
516 if ( $timestamp < $ctime ) {
517 unset( $cBackoffs[$type] );
538 private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode =
'wait' ) {
540 return $this->loadBackoffs( $backoffs, $mode );
543 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
544 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
545 $handle = fopen( $file,
'wb+' );
546 if ( !flock( $handle, LOCK_EX | $noblock ) ) {
550 $ctime = microtime(
true );
551 $content = stream_get_contents( $handle );
552 $cBackoffs = json_decode( $content,
true ) ?: [];
553 foreach ( $deltas as $type => $seconds ) {
554 $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
555 ? $cBackoffs[$type] + $seconds
558 foreach ( $cBackoffs as $type => $timestamp ) {
559 if ( $timestamp < $ctime ) {
560 unset( $cBackoffs[$type] );
563 ftruncate( $handle, 0 );
564 fwrite( $handle, json_encode( $cBackoffs ) );
565 flock( $handle, LOCK_UN );
578 private function checkMemoryOK() {
579 static $maxBytes =
null;
580 if ( $maxBytes ===
null ) {
582 if ( preg_match(
'!^(\d+)(k|m|g|)$!i', ini_get(
'memory_limit' ), $m ) ) {
583 [ , $num, $unit ] = $m;
584 $conv = [
'g' => 1073741824,
'm' => 1048576,
'k' => 1024,
'' => 1 ];
585 $maxBytes = (int)$num * $conv[strtolower( $unit )];
590 $usedBytes = memory_get_usage();
591 if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
592 $msg =
"Detected excessive memory usage ({used_bytes}/{max_bytes}).";
593 $this->logger->error( $msg, [
594 'used_bytes' => $usedBytes,
595 'max_bytes' => $maxBytes,
598 $msg =
"Detected excessive memory usage ($usedBytes/$maxBytes).";
599 $this->debugCallback( $msg );
611 private function debugCallback( $msg ) {
612 if ( $this->debug ) {
613 call_user_func_array( $this->debug, [
wfTimestamp( TS_DB ) .
" $msg\n" ] );