MediaWiki master
JobRunner.php
Go to the documentation of this file.
1<?php
7namespace MediaWiki\JobQueue;
8
9use LogicException;
10use MediaWiki\Cache\LinkCache;
19use Psr\Log\LoggerInterface;
20use Throwable;
25use Wikimedia\ScopedCallback;
27use Wikimedia\Timestamp\ConvertibleTimestamp;
28use Wikimedia\Timestamp\TimestampFormat as TS;
29
36class JobRunner {
37
/**
 * Configuration keys that must be present in the ServiceOptions handed to the
 * constructor. These are exactly the settings this class reads through
 * $this->options->get().
 */
public const CONSTRUCTOR_OPTIONS = [
	MainConfigNames::JobBackoffThrottling,
	MainConfigNames::JobClasses,
	MainConfigNames::MaxJobDBWriteDuration,
	MainConfigNames::TrxProfilerLimits,
];

/** @var ServiceOptions Settings named in CONSTRUCTOR_OPTIONS */
private $options;

/** @var ILBFactory Used for lag checks, transaction rounds and replication waits */
private $lbFactory;

/** @var JobQueueGroup Queue group that jobs are popped from and acknowledged to */
private $jobQueueGroup;

/** @var ReadOnlyMode Consulted by run() before any job is popped */
private $readOnlyMode;

/** @var LinkCache Cleared before each job is executed */
private $linkCache;

/** @var StatsFactory Receives the per-job timing and RSS metrics */
private $statsFactory;

/** @var callable|null Debug output handler installed via setDebugHandler() */
private $debug;

/** @var LoggerInterface */
private $logger;

/** Replica lag at or above which run() refuses to start (presumably seconds; confirm) */
private const MAX_ALLOWED_LAG = 3;
/** Timeout passed to the LBFactory replication wait calls */
private const SYNC_TIMEOUT = self::MAX_ALLOWED_LAG;
/** Minimum number of seconds between replication wait checks inside run() */
private const LAG_CHECK_PERIOD = 1.0;
/** Seconds of backoff added for a job type after a generic job failure */
private const ERROR_BACKOFF_TTL = 1;
/** Seconds of backoff added for a job type after a DBReadOnlyError */
private const READONLY_BACKOFF_TTL = 30;
82
/**
 * Install the handler that receives debug output lines.
 *
 * The handler is stored as-is; debugCallback() later invokes it with a single
 * string argument (a timestamped, newline-terminated message).
 *
 * @param callable $debug Debug output handler
 */
public function setDebugHandler( $debug ) {
	$this->debug = $debug;
}
89
/**
 * Wire up the services the runner depends on.
 *
 * The supplied options are validated against CONSTRUCTOR_OPTIONS before
 * anything is stored.
 *
 * @param ServiceOptions $serviceOptions
 * @param ILBFactory $lbFactory
 * @param JobQueueGroup $jobQueueGroup
 * @param ReadOnlyMode $readOnlyMode
 * @param LinkCache $linkCache
 * @param StatsFactory $statsFactory
 * @param LoggerInterface $logger
 */
public function __construct(
	ServiceOptions $serviceOptions,
	ILBFactory $lbFactory,
	JobQueueGroup $jobQueueGroup,
	ReadOnlyMode $readOnlyMode,
	LinkCache $linkCache,
	StatsFactory $statsFactory,
	LoggerInterface $logger
) {
	// Fail early if the option set is not the expected one
	$serviceOptions->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );
	$this->options = $serviceOptions;

	// Database and queue services
	$this->lbFactory = $lbFactory;
	$this->jobQueueGroup = $jobQueueGroup;
	$this->readOnlyMode = $readOnlyMode;

	// Caching, metrics and logging
	$this->linkCache = $linkCache;
	$this->statsFactory = $statsFactory;
	$this->logger = $logger;
}
118
/**
 * Pop and execute jobs until the queue is drained or a limit is reached.
 *
 * Recognised keys of $options:
 *   - type     : (string|false) only run jobs of this type; false means the default queues
 *   - maxJobs  : (int|false) stop after this many jobs have been popped
 *   - maxTime  : (float|int|false) stop once the loop has run for more than this many seconds
 *   - throttle : (bool) whether the persisted per-type backoffs are honoured (default true)
 *
 * The returned array has these keys:
 *   - jobs     : list of [ 'type', 'status' ('ok'|'failed'), 'error', 'time' (ms) ] per job
 *   - reached  : why the run stopped; one of 'none-ready', 'none-possible', 'read-only',
 *                'replica-lag-limit', 'job-limit', 'time-limit', 'exception', 'memory-limit'
 *   - backoffs : map of (job type => UNIX expiry) as last synced with the backoff file
 *   - elapsed  : total job execution time in milliseconds
 *
 * The early returns for 'none-possible', 'read-only' and 'replica-lag-limit' only carry
 * the 'jobs' and 'reached' keys.
 *
 * @param array $options
 * @return array
 * @throws LogicException If an explicit DB transaction round is already active
 */
public function run( array $options ) {
	$type = $options['type'] ?? false;
	$maxJobs = $options['maxJobs'] ?? false;
	$maxTime = $options['maxTime'] ?? false;
	$throttle = $options['throttle'] ?? true;

	$jobClasses = $this->options->get( MainConfigNames::JobClasses );
	$profilerLimits = $this->options->get( MainConfigNames::TrxProfilerLimits );

	$response = [ 'jobs' => [], 'reached' => 'none-ready' ];

	if ( $type !== false && !isset( $jobClasses[$type] ) ) {
		// Invalid job type specified
		$response['reached'] = 'none-possible';
		return $response;
	}

	if ( $this->readOnlyMode->isReadOnly() ) {
		// Any jobs popped off the queue might fail to run and thus might end up lost
		$response['reached'] = 'read-only';
		return $response;
	}

	[ , $maxLag ] = $this->lbFactory->getMainLB()->getMaxLag();
	if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
		// DB lag is already too high; caller can immediately try other wikis if applicable
		$response['reached'] = 'replica-lag-limit';
		return $response;
	}

	// Narrow DB query expectations for this HTTP request
	$this->lbFactory->getTransactionProfiler()
		->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );

	// Error out if an explicit DB transaction round is somehow active
	if ( $this->lbFactory->hasTransactionRound() ) {
		throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
	}

	// Some jobs types should not run until a certain timestamp
	$backoffs = []; // map of (type => UNIX expiry)
	$backoffDeltas = []; // map of (type => seconds)
	$wait = 'wait'; // block to read backoffs the first time

	$loopStartTime = microtime( true );
	$jobsPopped = 0;
	$timeMsTotal = 0;
	$lastSyncTime = 1; // initialize "last sync check timestamp" to "ages ago"
	// Keep popping and running jobs until there are no more...
	do {
		// Sync the persistent backoffs with concurrent runners
		$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
		$backoffKeys = $throttle ? array_keys( $backoffs ) : [];
		$wait = 'nowait'; // less important now

		if ( $type === false ) {
			// Treat the default job type queues as a single queue and pop off a job,
			// skipping any job types that are currently backed off
			$job = $this->jobQueueGroup
				->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE, $backoffKeys );
		} else {
			// Pop off a job from the specified job type queue unless the execution of
			// that type of job is currently rate-limited by the back-off list.
			// The comparison is deliberately loose: array_keys() yields integers for
			// numeric-looking keys, which a strict check would never match.
			$job = in_array( $type, $backoffKeys ) ? false : $this->jobQueueGroup->pop( $type );
		}

		if ( $job ) {
			++$jobsPopped;
			$jType = $job->getType();

			// Back off of certain jobs for a while (for throttling and for errors)
			$ttw = $this->getBackoffTimeToWait( $job );
			if ( $ttw > 0 ) {
				// Always add the delta for other runners in case the time running the
				// job negated the backoff for each individually but not collectively.
				$backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
				$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
			}

			$info = $this->executeJob( $job );

			// Mark completed or "one shot only" jobs as resolved
			if ( $info['status'] !== false || !$job->allowRetries() ) {
				$this->jobQueueGroup->ack( $job );
			}

			// Back off of certain jobs for a while (for throttling and for errors).
			// Only a random 1-in-50 sample of failures adds an error backoff.
			if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
				$ttw = max( $ttw, $this->getErrorBackoffTTL( $info['caught'] ) );
				$backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
			}

			$response['jobs'][] = [
				'type' => $jType,
				'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
				'error' => $info['error'],
				'time' => $info['timeMs']
			];
			$timeMsTotal += $info['timeMs'];

			// Break out if we hit the job count or wall time limits
			if ( $maxJobs && $jobsPopped >= $maxJobs ) {
				$response['reached'] = 'job-limit';
				break;
			} elseif ( $maxTime && ( microtime( true ) - $loopStartTime ) > $maxTime ) {
				$response['reached'] = 'time-limit';
				break;
			}

			// Stop if we caught a DBConnectionError. In theory it would be
			// possible to explicitly reconnect, but the present behaviour
			// is to just throw more exceptions every time something database-
			// related is attempted.
			if ( in_array( DBConnectionError::class, $info['caught'], true ) ) {
				$response['reached'] = 'exception';
				break;
			}

			// Don't let any of the main DB replica DBs get backed up.
			// This only waits for so long before exiting and letting
			// other wikis in the farm (on different masters) get a chance.
			$timePassed = microtime( true ) - $lastSyncTime;
			if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
				$opts = [ 'ifWritesSince' => $lastSyncTime, 'timeout' => self::SYNC_TIMEOUT ];
				if ( !$this->lbFactory->waitForReplication( $opts ) ) {
					$response['reached'] = 'replica-lag-limit';
					break;
				}
				$lastSyncTime = microtime( true );
			}

			// Abort if nearing OOM to avoid erroring out in the middle of a job
			if ( !$this->checkMemoryOK() ) {
				$response['reached'] = 'memory-limit';
				break;
			}
		}
	} while ( $job );

	// Sync the persistent backoffs for the next runJobs.php pass, and keep the
	// result so the response reports what was actually persisted
	if ( $backoffDeltas ) {
		$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
	}

	$response['backoffs'] = $backoffs;
	$response['elapsed'] = $timeMsTotal;

	return $response;
}
293
/**
 * Execute a single job with the request ID and replication wait timeout
 * adjusted for the duration of the job, restoring both afterwards.
 *
 * @param RunnableJob $job
 * @return array Map with keys 'status', 'error', 'caught' and 'timeMs',
 *   as produced by doExecuteJob()
 */
public function executeJob( RunnableJob $job ) {
	$telemetry = Telemetry::getInstance();
	$previousRequestId = $telemetry->getRequestId();

	if ( $job->getRequestId() === null ) {
		// TODO: do we need to regenerate if job doesn't have the request id?
		// If JobRunner was called with X-Request-ID header, regeneration will generate the
		// same value
		$telemetry->regenerateRequestId();
	} else {
		// Temporarily inherit the original ID of the web request that spawned this job
		$telemetry->overrideRequestId( $job->getRequestId() );
	}

	// Use an appropriate timeout to balance lag avoidance and job progress
	$previousTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
	try {
		$result = $this->doExecuteJob( $job );
	} finally {
		// Put the prior settings back whether or not the job threw
		$this->lbFactory->setDefaultReplicationWaitTimeout( $previousTimeout );
		$telemetry->overrideRequestId( $previousRequestId );
	}

	return $result;
}
334
/**
 * Run one job inside its own transaction handling, then record metrics and logs.
 *
 * Any Throwable raised while running the job is caught: primary changes are
 * rolled back and logged, and the job is reported as failed. tearDown() is
 * always attempted afterwards, and its own exceptions are only logged.
 *
 * @param RunnableJob $job
 * @return array Map of:
 *   - status : return value of $job->run(), or false if it threw
 *   - error  : $job->getLastError(), or a description of the caught exception
 *   - caught : list of class names of caught exceptions
 *   - timeMs : wall time in milliseconds from just before the run until after tearDown()
 */
private function doExecuteJob( RunnableJob $job ) {
	$jobType = $job->getType();
	$startMessage = $job->toString() . ' STARTING';
	$this->logger->debug( $startMessage, [ 'job_type' => $job->getType() ] );
	$this->debugCallback( $startMessage );

	// Clear out title cache data from prior snapshots
	// (e.g. from before JobRunner was invoked in this process)
	$this->linkCache->clear();

	// Run the job...
	$caughtClasses = [];
	$rssBefore = $this->getMaxRssKb();
	$startedAt = microtime( true );
	try {
		$trxOwner = get_class( $job ) . '::run'; // give run() outer scope
		if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
			// Start a new explicit transaction round for the job
			$this->lbFactory->beginPrimaryChanges( $trxOwner );
		} else {
			// Flush any pending changes left over from an implicit transaction round
			// and carry on in a new implicit round
			$this->lbFactory->commitPrimaryChanges( $trxOwner );
		}

		// Tag log entries emitted while the job runs with its type
		$loggingScope = LoggerFactory::getContext()->addScoped( [
			'context.job_type' => $jobType,
		] );
		$status = $job->run();
		$error = $job->getLastError();
		ScopedCallback::consume( $loggingScope );

		// Commit all pending changes from this job; abort if any transaction was too big
		$maxWriteDuration = $this->options->get( MainConfigNames::MaxJobDBWriteDuration );
		$this->lbFactory->commitPrimaryChanges( $trxOwner, $maxWriteDuration );
		// Run any deferred update tasks; doUpdates() manages transactions itself
		DeferredUpdates::doUpdates();
	} catch ( Throwable $e ) {
		MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
		$status = false;
		$error = sprintf(
			'%s: %s in %s on line %s',
			get_class( $e ),
			$e->getMessage(),
			$e->getFile(),
			$e->getLine()
		);
		$caughtClasses[] = get_class( $e );
	}

	// Always attempt to call teardown(), even if Job throws exception
	try {
		$job->tearDown( $status );
	} catch ( Throwable $e ) {
		MWExceptionHandler::logException( $e );
	}

	$elapsedMs = (int)( ( microtime( true ) - $startedAt ) * 1000 );
	$rssAfter = $this->getMaxRssKb();

	// Record how long jobs wait before getting popped
	$readyTimestamp = $job->getReadyTimestamp();
	if ( $readyTimestamp ) {
		$pickupDelay = max( 0, $startedAt - $readyTimestamp );
		$this->statsFactory->getTiming( 'jobqueue_pickup_delay_seconds' )
			->setLabel( 'jobtype', $jobType )
			->observe( 1000 * $pickupDelay );
	}

	// Record root job age for jobs being run
	$rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
	if ( $rootTimestamp ) {
		$rootAge = max( 0, $startedAt - (int)wfTimestamp( TS::UNIX, $rootTimestamp ) );
		$this->statsFactory->getTiming( 'jobqueue_pickup_root_age_seconds' )
			->setLabel( 'jobtype', $jobType )
			->observe( 1000 * $rootAge );
	}

	// Track the execution time for jobs
	$this->statsFactory->getTiming( 'jobqueue_runtime_seconds' )
		->setLabel( 'jobtype', $jobType )
		->observe( $elapsedMs );

	// Track RSS increases for jobs (in case of memory leaks)
	if ( $rssBefore && $rssAfter ) {
		$this->statsFactory->getCounter( 'jobqueue_rss_delta_total' )
			->setLabel( 'rss_delta', $jobType )
			->incrementBy( $rssAfter - $rssBefore );
	}

	if ( $status !== false ) {
		// Success: structured log entry first, then the plain debug line
		$this->logger->info( $job->toString() . ' t={job_duration} good', [
			'job_type' => $job->getType(),
			'job_duration' => $elapsedMs,
		] );
		$this->debugCallback( $job->toString() . ' t=' . $elapsedMs . ' good' );
	} else {
		// Failure: structured log entry first, then the plain debug line
		$this->logger->error( $job->toString() . ' t={job_duration} error={job_error}', [
			'job_type' => $job->getType(),
			'job_duration' => $elapsedMs,
			'job_error' => $error,
		] );
		$this->debugCallback( $job->toString() . ' t=' . $elapsedMs . ' error=' . $error );
	}

	return [
		'status' => $status,
		'error' => $error,
		'caught' => $caughtClasses,
		'timeMs' => $elapsedMs
	];
}
455
/**
 * Pick the backoff duration to apply after a failed job.
 *
 * @param string[] $caught Class names of the exceptions caught while running the job
 * @return int READONLY_BACKOFF_TTL if a DBReadOnlyError was caught, else ERROR_BACKOFF_TTL
 */
private function getErrorBackoffTTL( array $caught ) {
	// Strict comparison, matching the DBConnectionError check in run(): the list
	// holds class-name strings, so loose matching can only cause false positives
	return in_array( DBReadOnlyError::class, $caught, true )
		? self::READONLY_BACKOFF_TTL
		: self::ERROR_BACKOFF_TTL;
}
465
/**
 * Read the 'ru_maxrss' figure that getrusage() reports for this process.
 *
 * @see https://linux.die.net/man/2/getrusage
 * @return int|null The reported value, or null when it is not available
 */
private function getMaxRssKb() {
	$usage = getrusage( 0 /* RUSAGE_SELF */ );
	if ( !isset( $usage['ru_maxrss'] ) ) {
		return null;
	}

	return (int)$usage['ru_maxrss'];
}
474
/**
 * Work out how many seconds of backoff a job should add for its type.
 *
 * The configured JobBackoffThrottling value for the job type is treated as a
 * rate of work items per second. The exact duration is rounded up or down at
 * random, with the chance of rounding up equal to its fractional part.
 *
 * @param RunnableJob $job
 * @return int Seconds to back off; 0 when the job type is not throttled
 */
private function getBackoffTimeToWait( RunnableJob $job ) {
	$throttling = $this->options->get( MainConfigNames::JobBackoffThrottling );

	// No configured rate for this type, or a duplicate job: not throttled
	if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
		return 0;
	}

	$rate = $throttling[$job->getType()];
	if ( $rate <= 0 ) {
		return 0; // not throttled
	}

	// Jobs without work items add no backoff
	if ( $job->workItemCount() <= 0 ) {
		return 0;
	}

	$exact = $job->workItemCount() / $rate;
	$whole = floor( $exact );
	$fraction = $exact - $whole;
	// use randomized rounding
	if ( mt_rand() / mt_getrandmax() < $fraction ) {
		$whole += 1;
	}

	return (int)$whole;
}
503
/**
 * Read the persisted backoff map from the shared temp file.
 *
 * Entries whose expiry lies in the past are dropped from the result. When the
 * file cannot be opened, or its lock cannot be obtained, the caller's current
 * map is returned unchanged.
 *
 * @param array $backoffs Map of (job type => UNIX expiry) currently known to the caller
 * @param string $mode 'wait' to block for the shared lock, 'nowait' to give up at once
 * @return array Map of (job type => UNIX expiry)
 */
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	if ( !is_file( $file ) ) {
		return [];
	}

	$handle = fopen( $file, 'rb' );
	if ( $handle === false ) {
		// The file vanished or became unreadable after the is_file() check;
		// passing false on to flock() would be a TypeError on PHP 8
		return $backoffs;
	}

	$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
	if ( !flock( $handle, LOCK_SH | $noblock ) ) {
		fclose( $handle );
		return $backoffs; // don't wait on lock
	}
	$content = stream_get_contents( $handle );
	flock( $handle, LOCK_UN );
	fclose( $handle );

	// Treat unreadable or malformed content as an empty map
	$cBackoffs = is_string( $content ) ? json_decode( $content, true ) : null;
	if ( !is_array( $cBackoffs ) ) {
		$cBackoffs = [];
	}

	$ctime = microtime( true );
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp < $ctime ) {
			unset( $cBackoffs[$type] );
		}
	}

	return $cBackoffs;
}
537
/**
 * Merge pending backoff deltas into the shared backoff file and return the result.
 *
 * With no pending deltas this only reads the file via loadBackoffs(). Otherwise
 * the file is locked exclusively, each delta is added to the stored expiry for
 * its type (or to the current time if none is pending), expired entries are
 * dropped and the map is written back. On success $deltas is emptied; if the
 * file cannot be opened or locked, $deltas is left untouched so that a later
 * call can retry, and the caller's map is returned unchanged.
 *
 * @param array $backoffs Map of (job type => UNIX expiry) currently known to the caller
 * @param array &$deltas Map of (job type => seconds) to add; cleared once persisted
 * @param string $mode 'wait' to block for the lock, 'nowait' to give up at once
 * @return array Map of (job type => UNIX expiry)
 */
private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
	if ( !$deltas ) {
		return $this->loadBackoffs( $backoffs, $mode );
	}

	$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	// Open with 'c' rather than 'w': 'w' truncates the file as soon as it is
	// opened, i.e. before the lock is held, which discards the backoffs written
	// by other runners and makes the read below always come back empty
	$handle = fopen( $file, 'cb+' );
	if ( $handle === false ) {
		return $backoffs; // cannot persist now; keep the deltas for a later attempt
	}
	if ( !flock( $handle, LOCK_EX | $noblock ) ) {
		fclose( $handle );
		return $backoffs; // don't wait on lock
	}

	$ctime = microtime( true );
	$content = stream_get_contents( $handle );
	// Treat unreadable or malformed content as an empty map
	$cBackoffs = is_string( $content ) ? json_decode( $content, true ) : null;
	if ( !is_array( $cBackoffs ) ) {
		$cBackoffs = [];
	}

	// Extend a still-pending backoff, or start a new one from the current time
	foreach ( $deltas as $type => $seconds ) {
		$cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
			? $cBackoffs[$type] + $seconds
			: $ctime + $seconds;
	}
	// Drop entries that have already expired
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp < $ctime ) {
			unset( $cBackoffs[$type] );
		}
	}

	// Replace the file contents while still holding the lock. ftruncate() does
	// not move the file pointer, which sits at the end after the read above.
	ftruncate( $handle, 0 );
	rewind( $handle );
	fwrite( $handle, json_encode( $cBackoffs ) );
	fflush( $handle );
	flock( $handle, LOCK_UN );
	fclose( $handle );

	$deltas = [];

	return $cBackoffs;
}
583
/**
 * Check that memory usage is still below 95% of the PHP memory limit.
 *
 * The 'memory_limit' ini value is parsed once and remembered for later calls.
 * A value that does not look like "<digits>[k|m|g]" is treated as "no limit".
 *
 * @return bool False (after logging) if usage has reached the threshold, else true
 */
private function checkMemoryOK() {
	static $limitBytes = null;
	if ( $limitBytes === null ) {
		$limitBytes = 0;
		$parts = [];
		if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $parts ) ) {
			$multipliers = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
			$limitBytes = (int)$parts[1] * $multipliers[strtolower( $parts[2] )];
		}
	}

	$usedBytes = memory_get_usage();
	if ( !$limitBytes || $usedBytes < 0.95 * $limitBytes ) {
		return true;
	}

	$this->logger->error( 'Detected excessive memory usage ({used_bytes}/{max_bytes}).', [
		'used_bytes' => $usedBytes,
		'max_bytes' => $limitBytes,
	] );
	$this->debugCallback( "Detected excessive memory usage ($usedBytes/$limitBytes)." );

	return false;
}
617
/**
 * Pass a message to the debug handler, if one has been installed.
 *
 * The message is prefixed with the current timestamp in TS::DB format and
 * terminated with a newline.
 *
 * @param string $msg
 */
private function debugCallback( $msg ) {
	if ( !$this->debug ) {
		return;
	}

	$line = ConvertibleTimestamp::now( TS::DB ) . " $msg\n";
	( $this->debug )( $line );
}
627}
628
630class_alias( JobRunner::class, 'JobRunner' );
wfTempDir()
Tries to get the system directory for temporary files.
wfTimestamp( $outputtype=TS::UNIX, $ts=0)
Get a timestamp string in one of various formats.
A class for passing options to services.
assertRequiredOptions(array $expectedKeys)
Assert that the list of options provided in this instance exactly match $expectedKeys,...
Defer callable updates to run later in the PHP process.
Handler class for MWExceptions.
Service for handling telemetry data.
Definition Telemetry.php:15
Handle enqueueing of background jobs.
Job queue runner utility methods.
Definition JobRunner.php:36
run(array $options)
Run jobs of the specified number/type for the specified time.
__construct(ServiceOptions $serviceOptions, ILBFactory $lbFactory, JobQueueGroup $jobQueueGroup, ReadOnlyMode $readOnlyMode, LinkCache $linkCache, StatsFactory $statsFactory, LoggerInterface $logger)
executeJob(RunnableJob $job)
Run a specific job in a manner appropriate for mass use by job dispatchers.
No-op job that does nothing.
Create PSR-3 logger objects.
A class containing constants representing the names of configuration variables.
const TrxProfilerLimits
Name constant for the TrxProfilerLimits setting, for use with Config::get()
const JobClasses
Name constant for the JobClasses setting, for use with Config::get()
const JobBackoffThrottling
Name constant for the JobBackoffThrottling setting, for use with Config::get()
const MaxJobDBWriteDuration
Name constant for the MaxJobDBWriteDuration setting, for use with Config::get()
Determine whether a site is currently in read-only mode.
This is the primary interface for validating metrics definitions, caching defined metrics,...
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
Manager of ILoadBalancer objects and, indirectly, IDatabase connections.
if(count( $args)< 1) $job