MediaWiki master
JobRunner.php
Go to the documentation of this file.
1<?php
21namespace MediaWiki\JobQueue;
22
23use LogicException;
33use Psr\Log\LoggerInterface;
34use Throwable;
39use Wikimedia\ScopedCallback;
41
48class JobRunner {
49
// Config keys that must be supplied to the constructor via ServiceOptions; the
// constructor passes this list to assertRequiredOptions().
// NOTE(review): the entries of this array are missing from the reviewed copy. This
// class reads JobClasses, TrxProfilerLimits, JobBackoffThrottling and
// MaxJobDBWriteDuration from $this->options — confirm all four are listed here.
53 public const CONSTRUCTOR_OPTIONS = [
58 ];
59
/** @var ServiceOptions Config options, validated against CONSTRUCTOR_OPTIONS */
61 private $options;
62
/** @var ILBFactory Used for lag checks, transaction rounds and replication waits */
64 private $lbFactory;
65
/** @var JobQueueGroup Source of jobs to pop and acknowledge */
67 private $jobQueueGroup;
68
/** @var ReadOnlyMode Checked by run() before any job is popped */
70 private $readOnlyMode;
71
/** @var LinkCache Cleared before each job runs */
73 private $linkCache;
74
/** @var StatsFactory Sink for per-job-type metrics */
76 private $statsFactory;
77
/** @var callable|null Optional debug message handler, set via setDebugHandler() */
79 private $debug;
80
/** @var LoggerInterface */
82 private $logger;
83
/** Max replica lag (as reported by getMaxLag(), presumably seconds) at or above which run() won't start */
85 private const MAX_ALLOWED_LAG = 3;
/** Timeout for waitForReplication() in run() and the default replication wait timeout while a job runs */
87 private const SYNC_TIMEOUT = self::MAX_ALLOWED_LAG;
/** Minimum seconds between replication-wait checks in the run() loop */
89 private const LAG_CHECK_PERIOD = 1.0;
/** Minimum backoff (seconds) added to a job type after a sampled job failure */
91 private const ERROR_BACKOFF_TTL = 1;
/** Minimum backoff (seconds) added to a job type after a sampled failure that caught a DBReadOnlyError */
93 private const READONLY_BACKOFF_TTL = 30;
94
/**
 * Register a callback that receives human-readable progress messages.
 *
 * @param callable|null $debug Invoked with a single string (a timestamped,
 *  newline-terminated line) each time a debug message is emitted; a falsy
 *  value disables debug output
 */
public function setDebugHandler( $debug ) {
	// Replaces any handler registered earlier
	$this->debug = $debug;
}
101
/**
 * @param ServiceOptions $serviceOptions Must contain exactly CONSTRUCTOR_OPTIONS
 * @param ILBFactory $lbFactory
 * @param JobQueueGroup $jobQueueGroup
 * @param ReadOnlyMode $readOnlyMode
 * @param LinkCache $linkCache
 * @param StatsFactory $statsFactory
 * @param LoggerInterface $logger
 */
public function __construct(
	ServiceOptions $serviceOptions,
	ILBFactory $lbFactory,
	JobQueueGroup $jobQueueGroup,
	ReadOnlyMode $readOnlyMode,
	LinkCache $linkCache,
	StatsFactory $statsFactory,
	LoggerInterface $logger
) {
	// Fail fast unless the supplied options match the expected key list
	$serviceOptions->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );
	$this->options = $serviceOptions;

	// Database and queue services
	$this->lbFactory = $lbFactory;
	$this->readOnlyMode = $readOnlyMode;
	$this->jobQueueGroup = $jobQueueGroup;

	// Caching, metrics and logging
	$this->linkCache = $linkCache;
	$this->logger = $logger;
	$this->statsFactory = $statsFactory;
}
130
/**
 * Run jobs of the specified number/type for the specified time.
 *
 * @param array $options Map of:
 *  - type: job type to run, or false (default) to pop from the default queues
 *  - maxJobs: stop after this many jobs were popped (false = no limit)
 *  - maxTime: stop once this many seconds of wall time elapsed (false = no limit)
 *  - throttle: when false, the shared backoff list is not used to skip job types
 *    (default true)
 * @return array Map of:
 *  - jobs: list of (type, status 'ok'|'failed', error, time in ms) per executed job
 *  - reached: why running stopped: 'none-ready', 'none-possible', 'read-only',
 *    'replica-lag-limit', 'job-limit', 'time-limit', 'exception' or 'memory-limit'
 *  - backoffs: map of (job type => UNIX expiry); only set once the loop has run
 *  - elapsed: total job run time in milliseconds; only set once the loop has run
 * @throws LogicException If called while an explicit DB transaction round is active
 */
156 public function run( array $options ) {
157 $type = $options['type'] ?? false;
158 $maxJobs = $options['maxJobs'] ?? false;
159 $maxTime = $options['maxTime'] ?? false;
160 $throttle = $options['throttle'] ?? true;
161
162 $jobClasses = $this->options->get( MainConfigNames::JobClasses );
163 $profilerLimits = $this->options->get( MainConfigNames::TrxProfilerLimits );
164
165 $response = [ 'jobs' => [], 'reached' => 'none-ready' ];
166
167 if ( $type !== false && !isset( $jobClasses[$type] ) ) {
168 // Invalid job type specified
169 $response['reached'] = 'none-possible';
170 return $response;
171 }
172
173 if ( $this->readOnlyMode->isReadOnly() ) {
174 // Any jobs popped off the queue might fail to run and thus might end up lost
175 $response['reached'] = 'read-only';
176 return $response;
177 }
178
179 [ , $maxLag ] = $this->lbFactory->getMainLB()->getMaxLag();
180 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
181 // DB lag is already too high; caller can immediately try other wikis if applicable
182 $response['reached'] = 'replica-lag-limit';
183 return $response;
184 }
185
186 // Narrow DB query expectations for this HTTP request
187 $this->lbFactory->getTransactionProfiler()
188 ->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
189
190 // Error out if an explicit DB transaction round is somehow active
191 if ( $this->lbFactory->hasTransactionRound() ) {
192 throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
193 }
194
195 // Some jobs types should not run until a certain timestamp
196 $backoffs = []; // map of (type => UNIX expiry)
197 $backoffDeltas = []; // map of (type => seconds)
198 $wait = 'wait'; // block to read backoffs the first time
199
200 $loopStartTime = microtime( true );
201 $jobsPopped = 0;
202 $timeMsTotal = 0;
203 $lastSyncTime = 1; // initialize "last sync check timestamp" to "ages ago"
204 // Keep popping and running jobs until there are no more...
205 do {
206 // Sync the persistent backoffs with concurrent runners
207 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
208 $backoffKeys = $throttle ? array_keys( $backoffs ) : [];
209 $wait = 'nowait'; // less important now
210
211 if ( $type === false ) {
212 // Treat the default job type queues as a single queue and pop off a job
213 $job = $this->jobQueueGroup
// NOTE(review): the continuation of this statement (original line 214, presumably a
// ->pop(...) call on the default queues that is given $backoffKeys) is missing from
// this copy, leaving the statement incomplete — restore it from upstream.
215 } else {
216 // Pop off a job from the specified job type queue unless the execution of
217 // that type of job is currently rate-limited by the back-off list
// NOTE(review): consider a strict in_array( ..., true ), like the DBConnectionError check below.
218 $job = in_array( $type, $backoffKeys ) ? false : $this->jobQueueGroup->pop( $type );
219 }
220
221 if ( $job ) {
222 ++$jobsPopped;
223 $jType = $job->getType();
224
225 // Back off of certain jobs for a while (for throttling and for errors)
226 $ttw = $this->getBackoffTimeToWait( $job );
227 if ( $ttw > 0 ) {
228 // Always add the delta for other runners in case the time running the
229 // job negated the backoff for each individually but not collectively.
230 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
// $wait is 'nowait' by now, so this sync does not block on the file lock
231 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
232 }
233
234 $info = $this->executeJob( $job );
235
236 // Mark completed or "one shot only" jobs as resolved
237 if ( $info['status'] !== false || !$job->allowRetries() ) {
238 $this->jobQueueGroup->ack( $job );
239 }
240
241 // Back off of certain jobs for a while (for throttling and for errors)
// mt_rand( 0, 49 ) == 0 holds for roughly 1 in 50 failures, so only a
// sample of failures adds an error backoff for this job type.
242 if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
243 $ttw = max( $ttw, $this->getErrorBackoffTTL( $info['caught'] ) );
244 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
245 }
246
247 $response['jobs'][] = [
248 'type' => $jType,
249 'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
250 'error' => $info['error'],
251 'time' => $info['timeMs']
252 ];
253 $timeMsTotal += $info['timeMs'];
254
255 // Break out if we hit the job count or wall time limits
256 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
257 $response['reached'] = 'job-limit';
258 break;
259 } elseif ( $maxTime && ( microtime( true ) - $loopStartTime ) > $maxTime ) {
260 $response['reached'] = 'time-limit';
261 break;
262 }
263
264 // Stop if we caught a DBConnectionError. In theory it would be
265 // possible to explicitly reconnect, but the present behaviour
266 // is to just throw more exceptions every time something database-
267 // related is attempted.
268 if ( in_array( DBConnectionError::class, $info['caught'], true ) ) {
269 $response['reached'] = 'exception';
270 break;
271 }
272
273 // Don't let any of the main DB replica DBs get backed up.
274 // This only waits for so long before exiting and letting
275 // other wikis in the farm (on different masters) get a chance.
// A negative $timePassed means the clock went backwards; re-check in that case too.
276 $timePassed = microtime( true ) - $lastSyncTime;
277 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
278 $opts = [ 'ifWritesSince' => $lastSyncTime, 'timeout' => self::SYNC_TIMEOUT ];
279 if ( !$this->lbFactory->waitForReplication( $opts ) ) {
280 $response['reached'] = 'replica-lag-limit';
281 break;
282 }
283 $lastSyncTime = microtime( true );
284 }
285
286 // Abort if nearing OOM to avoid erroring out in the middle of a job
287 if ( !$this->checkMemoryOK() ) {
288 $response['reached'] = 'memory-limit';
289 break;
290 }
291 }
292 } while ( $job );
293
294 // Sync the persistent backoffs for the next runJobs.php pass
295 if ( $backoffDeltas ) {
296 $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
297 }
298
299 $response['backoffs'] = $backoffs;
300 $response['elapsed'] = $timeMsTotal;
301
302 return $response;
303 }
304
/**
 * Run a specific job in a manner appropriate for mass use by job dispatchers.
 *
 * While the job runs, the telemetry request ID is switched to the job's own
 * request ID (or regenerated when the job has none), and the default replication
 * wait timeout is set to SYNC_TIMEOUT. Both are restored afterwards, even if the
 * job execution throws.
 *
 * @param RunnableJob $job
 * @return array Result map from doExecuteJob() (status, error, caught, timeMs)
 */
public function executeJob( RunnableJob $job ) {
	$telemetry = Telemetry::getInstance();
	$savedRequestId = $telemetry->getRequestId();

	$jobRequestId = $job->getRequestId();
	if ( $jobRequestId === null ) {
		// TODO: do we need to regenerate if job doesn't have the request id?
		// If JobRunner was called with X-Request-ID header, regeneration will generate the
		// same value
		$telemetry->regenerateRequestId();
	} else {
		// Temporarily inherit the original ID of the web request that spawned this job
		$telemetry->overrideRequestId( $jobRequestId );
	}

	// Use an appropriate timeout to balance lag avoidance and job progress
	$savedTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
	try {
		return $this->doExecuteJob( $job );
	} finally {
		$this->lbFactory->setDefaultReplicationWaitTimeout( $savedTimeout );
		$telemetry->overrideRequestId( $savedRequestId );
	}
}
345
/**
 * Run a single job inside its own DB transaction round and record its outcome.
 *
 * Clears LinkCache, then either flushes the implicit transaction round (for jobs with
 * JOB_NO_EXPLICIT_TRX_ROUND) or begins an explicit one, runs the job with its type added
 * to the logging context, commits its changes (bounded by MaxJobDBWriteDuration) and
 * runs deferred updates. Any Throwable raised in that sequence is caught, primary
 * changes are rolled back, and the job counts as failed. tearDown() is always called.
 * Metrics and a result log line are emitted afterwards.
 *
 * @param RunnableJob $job
 * @return array Map of:
 *  - status: the job's run() result, or false if a Throwable was caught
 *  - error: the job's last error, or a description of the caught Throwable
 *  - caught: list of class names of caught Throwables
 *  - timeMs: wall-clock run time in milliseconds
 */
private function doExecuteJob( RunnableJob $job ) {
	$jType = $job->getType();
	$msg = $job->toString() . " STARTING";
	$this->logger->debug( $msg, [ 'job_type' => $jType ] );
	$this->debugCallback( $msg );

	// Clear out title cache data from prior snapshots
	// (e.g. from before JobRunner was invoked in this process)
	$this->linkCache->clear();

	// Run the job...
	$caught = [];
	$rssStart = $this->getMaxRssKb();
	$jobStartTime = microtime( true );
	try {
		$fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
		// Flush any pending changes left over from an implicit transaction round
		if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
			$this->lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
		} else {
			$this->lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
		}
		// Clear any stale REPEATABLE-READ snapshots from replica DB connections
		// NOTE(review): no statement follows this comment in the reviewed copy of the
		// file; confirm whether a replica-snapshot flush call was lost here.

		// Add the job type to the logging context while the job itself runs
		$scope = LoggerFactory::getContext()->addScoped( [
			'context.job_type' => $jType,
		] );
		$status = $job->run();
		$error = $job->getLastError();
		ScopedCallback::consume( $scope );

		// Commit all pending changes from this job
		$this->lbFactory->commitPrimaryChanges(
			$fnameTrxOwner,
			// Abort if any transaction was too big
			$this->options->get( MainConfigNames::MaxJobDBWriteDuration )
		);
		// Run any deferred update tasks; doUpdates() manages transactions itself
		DeferredUpdates::doUpdates();
	} catch ( Throwable $e ) {
		MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
		$status = false;
		$error = get_class( $e ) . ': ' . $e->getMessage() . ' in '
			. $e->getFile() . ' on line ' . $e->getLine();
		$caught[] = get_class( $e );
	}
	// Always attempt to call teardown(), even if Job throws exception
	try {
		$job->tearDown( $status );
	} catch ( Throwable $e ) {
		MWExceptionHandler::logException( $e );
	}

	$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
	$rssEnd = $this->getMaxRssKb();

	$this->recordJobMetrics( $job, $jType, $jobStartTime, $timeMs, $rssStart, $rssEnd );

	if ( $status === false ) {
		$msg = $job->toString() . " t={job_duration} error={job_error}";
		$this->logger->error( $msg, [
			'job_type' => $jType,
			'job_duration' => $timeMs,
			'job_error' => $error,
		] );

		$msg = $job->toString() . " t=$timeMs error={$error}";
		$this->debugCallback( $msg );
	} else {
		$msg = $job->toString() . " t={job_duration} good";
		$this->logger->info( $msg, [
			'job_type' => $jType,
			'job_duration' => $timeMs,
		] );

		$msg = $job->toString() . " t=$timeMs good";
		$this->debugCallback( $msg );
	}

	return [
		'status' => $status,
		'error' => $error,
		'caught' => $caught,
		'timeMs' => $timeMs
	];
}

/**
 * Emit pickup-delay, root-job-age, runtime and RSS-growth metrics for one job run.
 *
 * @param RunnableJob $job
 * @param string $jType Job type; used as the "jobtype" label on every metric
 * @param float $jobStartTime microtime( true ) value taken when the job started
 * @param int $timeMs Job run time in milliseconds
 * @param int|null $rssStart getMaxRssKb() result before the job ran
 * @param int|null $rssEnd getMaxRssKb() result after the job ran
 */
private function recordJobMetrics(
	RunnableJob $job, $jType, $jobStartTime, $timeMs, $rssStart, $rssEnd
) {
	// Record how long jobs wait before getting popped.
	// Timing values are passed in milliseconds, like $timeMs below.
	$readyTs = $job->getReadyTimestamp();
	if ( $readyTs ) {
		$pickupDelay = max( 0, $jobStartTime - $readyTs );
		$this->statsFactory->getTiming( 'jobqueue_pickup_delay_seconds' )
			->setLabel( 'jobtype', $jType )
			->copyToStatsdAt( [
				"jobqueue_pickup_delay_all_mean", "jobqueue.pickup_delay.$jType"
			] )
			->observe( 1000 * $pickupDelay );
	}

	// Record root job age for jobs being run; tolerate a missing timestamp key
	$rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'] ?? null;
	if ( $rootTimestamp ) {
		$age = max( 0, $jobStartTime - (int)wfTimestamp( TS_UNIX, $rootTimestamp ) );
		$this->statsFactory->getTiming( "jobqueue_pickup_root_age_seconds" )
			->setLabel( 'jobtype', $jType )
			->copyToStatsdAt( "jobqueue.pickup_root_age.$jType" )
			->observe( 1000 * $age );
	}

	// Track the execution time for jobs
	$this->statsFactory->getTiming( 'jobqueue_runtime_seconds' )
		->setLabel( 'jobtype', $jType )
		->copyToStatsdAt( "jobqueue.run.$jType" )
		->observe( $timeMs );

	// Track RSS increases for jobs (in case of memory leaks). The job type goes under
	// the same 'jobtype' label key as the other job metrics; it was previously filed
	// under a 'rss_delta' label key.
	if ( $rssStart && $rssEnd ) {
		$this->statsFactory->getCounter( 'jobqueue_rss_delta_total' )
			->setLabel( 'jobtype', $jType )
			->copyToStatsdAt( "jobqueue.rss_delta.$jType" )
			->incrementBy( $rssEnd - $rssStart );
	}
}
472
/**
 * Get the minimum backoff to apply to a job type after one of its jobs failed.
 *
 * @param string[] $caught Class names of Throwables caught while running the job
 * @return int Seconds: READONLY_BACKOFF_TTL if a DBReadOnlyError was caught,
 *  otherwise ERROR_BACKOFF_TTL
 */
private function getErrorBackoffTTL( array $caught ) {
	// Compare class names strictly, as run() does for DBConnectionError
	return in_array( DBReadOnlyError::class, $caught, true )
		? self::READONLY_BACKOFF_TTL
		: self::ERROR_BACKOFF_TTL;
}
482
/**
 * Get the peak resident set size of the current process.
 *
 * @see https://linux.die.net/man/2/getrusage
 * @return int|null The ru_maxrss value from getrusage() (KB per this method's name;
 *  the unit is defined by the platform's getrusage(2)), or null if not reported
 */
private function getMaxRssKb() {
	// 0 selects RUSAGE_SELF, i.e. the calling process
	$usage = getrusage( 0 );
	if ( !isset( $usage['ru_maxrss'] ) ) {
		return null;
	}

	return (int)$usage['ru_maxrss'];
}
491
/**
 * Compute how long runners should hold off this job's type after running it.
 *
 * The delay is the job's work item count divided by the items-per-second rate
 * configured for its type in JobBackoffThrottling. It is turned into an integer
 * with randomized rounding, so that fractional delays average out over many jobs.
 *
 * @param RunnableJob $job
 * @return int Seconds to wait; 0 if the type is not throttled, the job is a
 *  DuplicateJob, or the job reports no work items
 */
private function getBackoffTimeToWait( RunnableJob $job ) {
	$throttling = $this->options->get( MainConfigNames::JobBackoffThrottling );
	$jobType = $job->getType();

	if ( $job instanceof DuplicateJob || !isset( $throttling[$jobType] ) ) {
		return 0; // not throttled
	}

	$rate = $throttling[$jobType];
	if ( $rate <= 0 ) {
		return 0; // not throttled
	}

	$workItems = $job->workItemCount();
	if ( $workItems <= 0 ) {
		return 0;
	}

	$exact = $workItems / $rate;
	$whole = floor( $exact );
	// Round up with probability equal to the fractional part
	$roundUp = ( mt_rand() / mt_getrandmax() ) < ( $exact - $whole );

	return (int)( $roundUp ? $whole + 1 : $whole );
}
520
/**
 * Read the job-type backoff expiries shared between concurrent runners.
 *
 * @param array $backoffs Map of (job type => UNIX expiry) to return unchanged when the
 *  shared file exists but cannot be opened, or cannot be locked ('nowait' mode)
 * @param string $mode 'wait' to block on the shared lock, 'nowait' to give up at once
 * @return array Map of (job type => UNIX expiry) from the shared file, with expired
 *  entries removed; empty if the file is missing or does not hold a JSON object
 */
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	if ( !is_file( $file ) ) {
		return [];
	}

	// The file may vanish between is_file() and fopen(); never hand false to flock()
	$handle = fopen( $file, 'rb' );
	if ( $handle === false ) {
		return $backoffs;
	}

	$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
	if ( !flock( $handle, LOCK_SH | $noblock ) ) {
		fclose( $handle );
		return $backoffs; // don't wait on lock
	}
	$content = stream_get_contents( $handle );
	flock( $handle, LOCK_UN );
	fclose( $handle );

	$ctime = microtime( true );
	$cBackoffs = is_string( $content ) ? json_decode( $content, true ) : null;
	if ( !is_array( $cBackoffs ) ) {
		// Empty, corrupt, or non-object content means "no backoffs"
		return [];
	}
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp < $ctime ) {
			unset( $cBackoffs[$type] );
		}
	}

	return $cBackoffs;
}
554
/**
 * Merge pending backoff deltas into the shared backoff file and return the result.
 *
 * With no pending deltas this only reads the file via loadBackoffs(). Otherwise the
 * file is opened and exclusively locked. Each delta is added either to the type's
 * still-valid expiry or to the current time, expired entries are dropped, and the
 * merged map is written back. $deltas is cleared only after a successful write.
 *
 * @param array $backoffs Map of (job type => UNIX expiry) returned unchanged when the
 *  file cannot be opened or locked ('nowait' mode)
 * @param array &$deltas Map of (job type => seconds) to add; emptied on success
 * @param string $mode 'wait' to block on the file lock, 'nowait' to give up at once
 * @return array Map of (job type => UNIX expiry)
 */
private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
	if ( !$deltas ) {
		return $this->loadBackoffs( $backoffs, $mode );
	}

	$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	// 'c+' creates the file if needed but never truncates it on open. A 'w' mode would
	// wipe the shared state before the lock is held: other runners' backoffs would be
	// clobbered (even when the lock is then not obtained), and the read below would
	// always see an empty file.
	$handle = fopen( $file, 'c+' );
	if ( $handle === false ) {
		return $backoffs; // keep $deltas for a later attempt
	}
	if ( !flock( $handle, LOCK_EX | $noblock ) ) {
		fclose( $handle );
		return $backoffs; // don't wait on lock
	}

	$ctime = microtime( true );
	$content = stream_get_contents( $handle );
	$cBackoffs = is_string( $content ) ? json_decode( $content, true ) : null;
	if ( !is_array( $cBackoffs ) ) {
		$cBackoffs = [];
	}
	foreach ( $deltas as $type => $seconds ) {
		$cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
			? $cBackoffs[$type] + $seconds
			: $ctime + $seconds;
	}
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp < $ctime ) {
			unset( $cBackoffs[$type] );
		}
	}

	// Replace the contents in place. Reading left the pointer at EOF, so rewind after
	// truncating. Flush before unlocking so the next lock holder sees the new data.
	ftruncate( $handle, 0 );
	rewind( $handle );
	fwrite( $handle, json_encode( $cBackoffs ) );
	fflush( $handle );
	flock( $handle, LOCK_UN );
	fclose( $handle );

	$deltas = [];

	return $cBackoffs;
}
600
/**
 * Check whether memory usage is still safely below the PHP memory_limit.
 *
 * The limit is parsed once from the memory_limit ini setting (plain bytes or a
 * k/m/g suffix); any other value, such as -1, disables the check. Usage at or
 * above 95% of the limit is logged as an error and reported as not OK.
 *
 * @return bool False if memory usage is excessive
 */
private function checkMemoryOK() {
	// Byte limit, computed on first use; 0 means "no usable limit"
	static $maxBytes = null;

	if ( $maxBytes === null ) {
		$maxBytes = 0;
		$matches = [];
		if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $matches ) ) {
			$multipliers = [ '' => 1, 'k' => 1024, 'm' => 1048576, 'g' => 1073741824 ];
			$maxBytes = (int)$matches[1] * $multipliers[strtolower( $matches[2] )];
		}
	}

	$usedBytes = memory_get_usage();
	$nearLimit = $maxBytes && $usedBytes >= 0.95 * $maxBytes;
	if ( !$nearLimit ) {
		return true;
	}

	$this->logger->error( "Detected excessive memory usage ({used_bytes}/{max_bytes}).", [
		'used_bytes' => $usedBytes,
		'max_bytes' => $maxBytes,
	] );
	$this->debugCallback( "Detected excessive memory usage ($usedBytes/$maxBytes)." );

	return false;
}
634
/**
 * Pass a timestamped, newline-terminated message to the debug handler, if one is set.
 *
 * @param string $msg
 */
private function debugCallback( $msg ) {
	$handler = $this->debug;
	if ( !$handler ) {
		return;
	}

	$handler( wfTimestamp( TS_DB ) . " $msg\n" );
}
644}
645
// Register the unnamespaced name 'JobRunner' as an alias of this class
// (presumably so code that still refers to the old global name keeps working).
647class_alias( JobRunner::class, 'JobRunner' );
wfTempDir()
Tries to get the system directory for temporary files.
wfTimestamp( $outputtype=TS_UNIX, $ts=0)
Get a timestamp string in one of various formats.
Cache for article titles (prefixed DB keys) and ids linked from one source.
Definition LinkCache.php:52
A class for passing options to services.
assertRequiredOptions(array $expectedKeys)
Assert that the list of options provided in this instance exactly match $expectedKeys,...
Defer callable updates to run later in the PHP process.
Handler class for MWExceptions.
Service for handling telemetry data.
Definition Telemetry.php:29
Handle enqueueing of background jobs.
Job queue runner utility methods.
Definition JobRunner.php:48
run(array $options)
Run jobs of the specified number/type for the specified time.
__construct(ServiceOptions $serviceOptions, ILBFactory $lbFactory, JobQueueGroup $jobQueueGroup, ReadOnlyMode $readOnlyMode, LinkCache $linkCache, StatsFactory $statsFactory, LoggerInterface $logger)
executeJob(RunnableJob $job)
Run a specific job in a manner appropriate for mass use by job dispatchers.
No-op job that does nothing.
Create PSR-3 logger objects.
A class containing constants representing the names of configuration variables.
const TrxProfilerLimits
Name constant for the TrxProfilerLimits setting, for use with Config::get()
const JobClasses
Name constant for the JobClasses setting, for use with Config::get()
const JobBackoffThrottling
Name constant for the JobBackoffThrottling setting, for use with Config::get()
const MaxJobDBWriteDuration
Name constant for the MaxJobDBWriteDuration setting, for use with Config::get()
Determine whether a site is currently in read-only mode.
This is the primary interface for validating metrics definitions, caching defined metrics,...
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
Manager of ILoadBalancer objects and, indirectly, IDatabase connections.
if(count( $args)< 1) $job