MediaWiki REL1_35
JobRunner.php
Go to the documentation of this file.
1<?php
24use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
28use Psr\Log\LoggerAwareInterface;
29use Psr\Log\LoggerInterface;
33use Wikimedia\ScopedCallback;
34
41class JobRunner implements LoggerAwareInterface {
42
/**
 * Names of the configuration settings that the ServiceOptions object
 * built by (or handed to) the constructor is created with.
 */
public const CONSTRUCTOR_OPTIONS = [
	'JobBackoffThrottling',
	'JobClasses',
	'JobSerialCommitThreshold',
	'MaxJobDBWriteDuration',
	'TrxProfilerLimits',
];
50
/** @var ServiceOptions Settings named in CONSTRUCTOR_OPTIONS */
private $options;

/** @var ILBFactory */
private $lbFactory;

/** @var JobQueueGroup Queue group that jobs are popped from and acknowledged to */
private $jobQueueGroup;

/** @var ReadOnlyMode */
private $readOnlyMode;

/** @var LinkCache */
private $linkCache;

/** @var StatsdDataFactoryInterface */
private $stats;

/** @var callable|null Debug output handler */
private $debug;

/** @var LoggerInterface */
private $logger;

/** @var int run() refuses to start when the main LB reports at least this much lag */
private const MAX_ALLOWED_LAG = 3;
/** @var int Timeout passed to the replication wait calls */
private const SYNC_TIMEOUT = self::MAX_ALLOWED_LAG;
/** @var float Minimum number of seconds between replication checks in run() */
private const LAG_CHECK_PERIOD = 1.0;
/** @var int Seconds of backoff added for a job type after a (sampled) failed job */
private const ERROR_BACKOFF_TTL = 1;
/** @var int Seconds of backoff used instead when the failure was a DBReadOnlyError */
private const READONLY_BACKOFF_TTL = 30;
85
/**
 * Register the handler that receives timestamped debug lines.
 *
 * @param callable|null $debug Invoked with a single string argument; a
 *  falsy value disables debug output
 */
public function setDebugHandler( $debug ) {
	$this->debug = $debug;
}
92
/**
 * Replace the logger used by this runner.
 *
 * Calling this emits a deprecation warning (deprecated since 1.35).
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	// Warn first, then swap the logger
	wfDeprecated( __METHOD__, '1.35' );
	$this->logger = $logger;
}
102
/**
 * Calling this directly is deprecated.
 *
 * Two calling conventions are supported: the service-style one where
 * $serviceOptions is a ServiceOptions object, and the legacy one where the
 * first argument is a logger (or omitted). Any dependency left as null is
 * resolved through the global service container.
 *
 * @param ServiceOptions|LoggerInterface|null $serviceOptions
 * @param ILBFactory|null $lbFactory
 * @param JobQueueGroup|null $jobQueueGroup
 * @param ReadOnlyMode|null $readOnlyMode
 * @param LinkCache|null $linkCache
 * @param StatsdDataFactoryInterface|null $statsdDataFactory
 * @param LoggerInterface|null $logger
 */
public function __construct(
	$serviceOptions = null,
	ILBFactory $lbFactory = null,
	JobQueueGroup $jobQueueGroup = null,
	ReadOnlyMode $readOnlyMode = null,
	LinkCache $linkCache = null,
	StatsdDataFactoryInterface $statsdDataFactory = null,
	LoggerInterface $logger = null
) {
	if ( $serviceOptions instanceof LoggerInterface ) {
		// Legacy signature: the first argument is the logger. Only overwrite
		// $logger in this case so that an explicitly passed $logger is not
		// discarded when $serviceOptions is simply null.
		$logger = $serviceOptions;
		$serviceOptions = null;
	}
	if ( !$serviceOptions ) {
		// TODO: wfDeprecated( __METHOD__ . ' called directly. Use MediaWikiServices instead', '1.35' );
		$serviceOptions = new ServiceOptions(
			static::CONSTRUCTOR_OPTIONS,
			MediaWikiServices::getInstance()->getMainConfig()
		);
	}

	$this->options = $serviceOptions;
	$this->lbFactory = $lbFactory ?? MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	$this->jobQueueGroup = $jobQueueGroup ?? JobQueueGroup::singleton();
	$this->readOnlyMode = $readOnlyMode ?? MediaWikiServices::getInstance()->getReadOnlyMode();
	$this->linkCache = $linkCache ?? MediaWikiServices::getInstance()->getLinkCache();
	$this->stats = $statsdDataFactory ?? MediaWikiServices::getInstance()->getStatsdDataFactory();
	$this->logger = $logger ?? LoggerFactory::getInstance( 'runJobs' );
}
140
/**
 * Run jobs of the specified number/type for the specified time.
 *
 * Recognised keys in $options:
 *   - type     : only pop jobs of this type (default: the default job queues)
 *   - maxJobs  : stop after this many jobs have been popped
 *   - maxTime  : stop once this many seconds have passed since the loop started
 *   - throttle : whether job types under backoff are skipped (default: true)
 *
 * The response map contains:
 *   - jobs     : list of per-job maps with keys type, status ('ok' or 'failed'),
 *                error and time (milliseconds)
 *   - reached  : why the run stopped; one of none-ready, none-possible,
 *                read-only, replica-lag-limit, job-limit, time-limit, memory-limit
 *   - backoffs : map of (job type => UNIX expiry); only set once the job loop ran
 *   - elapsed  : total job run time in milliseconds; only set once the job loop ran
 *
 * @param array $options
 * @return array Response map as described above
 * @throws LogicException If an explicit DB transaction round is already active
 */
public function run( array $options ) {
	$type = $options['type'] ?? false;
	$maxJobs = $options['maxJobs'] ?? false;
	$maxTime = $options['maxTime'] ?? false;
	$throttle = $options['throttle'] ?? true;

	$jobClasses = $this->options->get( 'JobClasses' );
	$profilerLimits = $this->options->get( 'TrxProfilerLimits' );

	$response = [ 'jobs' => [], 'reached' => 'none-ready' ];

	if ( $type !== false && !isset( $jobClasses[$type] ) ) {
		// Invalid job type specified
		$response['reached'] = 'none-possible';
		return $response;
	}

	if ( $this->readOnlyMode->isReadOnly() ) {
		// Any jobs popped off the queue might fail to run and thus might end up lost
		$response['reached'] = 'read-only';
		return $response;
	}

	[ , $maxLag ] = $this->lbFactory->getMainLB()->getMaxLag();
	if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
		// DB lag is already too high; caller can immediately try other wikis if applicable
		$response['reached'] = 'replica-lag-limit';
		return $response;
	}

	// Narrow DB query expectations for this HTTP request
	$this->lbFactory->getTransactionProfiler()
		->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );

	// Error out if an explicit DB transaction round is somehow active
	if ( $this->lbFactory->hasTransactionRound() ) {
		throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
	}

	// Some jobs types should not run until a certain timestamp
	$backoffs = []; // map of (type => UNIX expiry)
	$backoffDeltas = []; // map of (type => seconds)
	$wait = 'wait'; // block to read backoffs the first time

	$loopStartTime = microtime( true );
	$jobsPopped = 0;
	$timeMsTotal = 0;
	$lastSyncTime = 1; // initialize "last sync check timestamp" to "ages ago"
	// Keep popping and running jobs until there are no more...
	do {
		// Sync the persistent backoffs with concurrent runners
		$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
		$blacklist = $throttle ? array_keys( $backoffs ) : [];
		$wait = 'nowait'; // less important now

		if ( $type === false ) {
			// Treat the default job type queues as a single queue and pop off a job
			$job = $this->jobQueueGroup
				->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE, $blacklist );
		} else {
			// Pop off a job from the specified job type queue unless the execution of
			// that type of job is currently rate-limited by the back-off blacklist
			$job = in_array( $type, $blacklist ) ? false : $this->jobQueueGroup->pop( $type );
		}

		if ( $job ) {
			++$jobsPopped;
			$jType = $job->getType();

			// Back off of certain jobs for a while (for throttling and for errors)
			$ttw = $this->getBackoffTimeToWait( $job );
			if ( $ttw > 0 ) {
				// Always add the delta for other runners in case the time running the
				// job negated the backoff for each individually but not collectively.
				$backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
				$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
			}

			$info = $this->executeJob( $job );

			// Mark completed or "one shot only" jobs as resolved
			if ( $info['status'] !== false || !$job->allowRetries() ) {
				$this->jobQueueGroup->ack( $job );
			}

			// Back off of certain jobs for a while (for throttling and for errors).
			// Only a random sample (1 in 50) of the failures adds an error backoff.
			if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
				$ttw = max( $ttw, $this->getErrorBackoffTTL( $info['caught'] ) );
				$backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
			}

			$response['jobs'][] = [
				'type' => $jType,
				'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
				'error' => $info['error'],
				'time' => $info['timeMs']
			];
			$timeMsTotal += $info['timeMs'];

			// Break out if we hit the job count or wall time limits
			if ( $maxJobs && $jobsPopped >= $maxJobs ) {
				$response['reached'] = 'job-limit';
				break;
			} elseif ( $maxTime && ( microtime( true ) - $loopStartTime ) > $maxTime ) {
				$response['reached'] = 'time-limit';
				break;
			}

			// Don't let any of the main DB replica DBs get backed up.
			// This only waits for so long before exiting and letting
			// other wikis in the farm (on different masters) get a chance.
			$timePassed = microtime( true ) - $lastSyncTime;
			if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
				$opts = [ 'ifWritesSince' => $lastSyncTime, 'timeout' => self::SYNC_TIMEOUT ];
				if ( !$this->lbFactory->waitForReplication( $opts ) ) {
					$response['reached'] = 'replica-lag-limit';
					break;
				}
				$lastSyncTime = microtime( true );
			}

			// Abort if nearing OOM to avoid erroring out in the middle of a job
			if ( !$this->checkMemoryOK() ) {
				$response['reached'] = 'memory-limit';
				break;
			}
		}
	} while ( $job );

	// Sync the persistent backoffs for the next runJobs.php pass. Keep the merged
	// result so that the response reports the backoffs that were just persisted
	// instead of the stale map from the last loop iteration.
	if ( $backoffDeltas ) {
		$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
	}

	$response['backoffs'] = $backoffs;
	$response['elapsed'] = $timeMsTotal;

	return $response;
}
307
/**
 * Run a specific job in a manner appropriate for mass use by job dispatchers.
 *
 * While the job runs, the request ID is overridden with the one recorded on
 * the job and the default replication wait timeout is set to SYNC_TIMEOUT;
 * both are restored afterwards, even if running the job throws.
 *
 * @param RunnableJob $job
 * @return array Map with the keys status, error, caught and timeMs
 */
public function executeJob( RunnableJob $job ) {
	$previousRequestId = WebRequest::getRequestId();
	// Temporarily inherit the original ID of the web request that spawned this job
	WebRequest::overrideRequestId( $job->getRequestId() );
	// Use an appropriate timeout to balance lag avoidance and job progress
	$previousTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );

	try {
		$result = $this->doExecuteJob( $job );
	} finally {
		// Undo the temporary overrides in reverse order
		$this->lbFactory->setDefaultReplicationWaitTimeout( $previousTimeout );
		WebRequest::overrideRequestId( $previousRequestId );
	}

	return $result;
}
339
/**
 * Run one job inside its own transaction round, then record stats and log the outcome.
 *
 * Anything thrown while the job runs is caught, rolled back and reported as a
 * failed status rather than propagated. tearDown() is always attempted.
 *
 * @param RunnableJob $job
 * @return array Map of:
 *  - status : value returned by the job's run(), or false if something was thrown
 *  - error  : the job's last error, or "<class>: <message>" of the thrown object
 *  - caught : list of class names of throwables caught while running
 *  - timeMs : run time in whole milliseconds
 */
private function doExecuteJob( RunnableJob $job ) {
	$jType = $job->getType();
	$startMsg = $job->toString() . " STARTING";
	$this->logger->debug( $startMsg, [ 'job_type' => $job->getType() ] );
	$this->debugCallback( $startMsg );

	// Clear out title cache data from prior snapshots
	// (e.g. from before JobRunner was invoked in this process)
	$this->linkCache->clear();

	// Run the job...
	$caughtClasses = [];
	$rssBefore = $this->getMaxRssKb();
	$startedAt = microtime( true );
	try {
		// Name the transaction owner after the job's run() to give it outer scope
		$trxOwner = get_class( $job ) . '::run';
		if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
			// Start a new explicit round
			$this->lbFactory->beginMasterChanges( $trxOwner );
		} else {
			// Flush any pending changes left over from an implicit transaction
			// round; this starts a new implicit round
			$this->lbFactory->commitMasterChanges( $trxOwner );
		}
		// Clear any stale REPEATABLE-READ snapshots from replica DB connections
		$this->lbFactory->flushReplicaSnapshots( $trxOwner );
		$status = $job->run();
		$error = $job->getLastError();
		// Commit all pending changes from this job
		$this->commitMasterChanges( $job, $trxOwner );
		// Run any deferred update tasks; doUpdates() manages transactions itself
		DeferredUpdates::doUpdates();
	} catch ( Throwable $e ) {
		MWExceptionHandler::rollbackMasterChangesAndLog( $e );
		$status = false;
		$error = get_class( $e ) . ': ' . $e->getMessage();
		$caughtClasses[] = get_class( $e );
	}

	// Always attempt to call teardown(), even if Job throws exception
	try {
		$job->tearDown( $status );
	} catch ( Throwable $e ) {
		MWExceptionHandler::logException( $e );
	}

	$timeMs = (int)( ( microtime( true ) - $startedAt ) * 1000 );
	$rssAfter = $this->getMaxRssKb();

	// Record how long jobs wait before getting popped
	$readyTs = $job->getReadyTimestamp();
	if ( $readyTs ) {
		$pickupDelayMs = 1000 * max( 0, $startedAt - $readyTs );
		$this->stats->timing( 'jobqueue.pickup_delay.all', $pickupDelayMs );
		$this->stats->timing( "jobqueue.pickup_delay.$jType", $pickupDelayMs );
	}

	// Record root job age for jobs being run
	$rootParams = $job->getRootJobParams();
	$rootTimestamp = $rootParams['rootJobTimestamp'];
	if ( $rootTimestamp ) {
		$rootAge = max( 0, $startedAt - wfTimestamp( TS_UNIX, $rootTimestamp ) );
		$this->stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $rootAge );
	}

	// Track the execution time for jobs
	$this->stats->timing( "jobqueue.run.$jType", $timeMs );

	// Track RSS increases for jobs (in case of memory leaks)
	if ( $rssBefore && $rssAfter ) {
		$this->stats->updateCount( "jobqueue.rss_delta.$jType", $rssAfter - $rssBefore );
	}

	// Report the outcome to both the PSR-3 logger and the debug handler
	if ( $status !== false ) {
		$logMsg = $job->toString() . " t={job_duration} good";
		$this->logger->info( $logMsg, [
			'job_type' => $job->getType(),
			'job_duration' => $timeMs,
		] );

		$debugMsg = $job->toString() . " t=$timeMs good";
		$this->debugCallback( $debugMsg );
	} else {
		$logMsg = $job->toString() . " t={job_duration} error={job_error}";
		$this->logger->error( $logMsg, [
			'job_type' => $job->getType(),
			'job_duration' => $timeMs,
			'job_error' => $error,
		] );

		$debugMsg = $job->toString() . " t=$timeMs error={$error}";
		$this->debugCallback( $debugMsg );
	}

	return [
		'status' => $status,
		'error' => $error,
		'caught' => $caughtClasses,
		'timeMs' => $timeMs
	];
}
442
/**
 * Pick the backoff period to apply after a failed job.
 *
 * @param string[] $caught Class names of the throwables caught while running the job
 * @return int READONLY_BACKOFF_TTL if a DBReadOnlyError is among them,
 *  ERROR_BACKOFF_TTL otherwise
 */
private function getErrorBackoffTTL( array $caught ) {
	if ( in_array( DBReadOnlyError::class, $caught ) ) {
		return self::READONLY_BACKOFF_TTL;
	}

	return self::ERROR_BACKOFF_TTL;
}
452
/**
 * Read the peak resident set size of this process from getrusage().
 *
 * @return int|null The 'ru_maxrss' value cast to int, or null when
 *  getrusage() does not report it
 */
private function getMaxRssKb() {
	// see https://linux.die.net/man/2/getrusage
	$usage = getrusage( 0 /* RUSAGE_SELF */ );
	if ( !isset( $usage['ru_maxrss'] ) ) {
		return null;
	}

	return (int)$usage['ru_maxrss'];
}
461
/**
 * Work out how many seconds the given job's type should back off for,
 * based on the 'JobBackoffThrottling' setting (job type => items per second).
 *
 * The exact value (work items / items per second) is rounded up or down at
 * random, weighted by its fractional part, so that the average is preserved.
 *
 * @param RunnableJob $job
 * @return int Seconds to back off; 0 if the job type is not throttled
 */
private function getBackoffTimeToWait( RunnableJob $job ) {
	$throttling = $this->options->get( 'JobBackoffThrottling' );

	if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
		return 0; // not throttled
	}

	$itemsPerSecond = $throttling[$job->getType()];
	if ( $itemsPerSecond <= 0 ) {
		return 0; // not throttled
	}

	$seconds = 0;
	$workItems = $job->workItemCount();
	if ( $workItems > 0 ) {
		$exactSeconds = $workItems / $itemsPerSecond;
		// use randomized rounding
		$seconds = floor( $exactSeconds );
		$remainder = $exactSeconds - $seconds;
		$seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
	}

	return (int)$seconds;
}
490
/**
 * Get the previous backoff expiries from persistent storage.
 *
 * On I/O or lock acquisition failure this returns the original $backoffs.
 * When the backoff file does not exist, an empty map is returned.
 *
 * @param array $backoffs Map of (job type => UNIX expiry) to fall back to
 * @param string $mode Lock wait mode - "wait" or "nowait"
 * @return array Map of (job type => UNIX expiry) with expired entries removed
 */
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	if ( !is_file( $file ) ) {
		return [];
	}

	$handle = fopen( $file, 'rb' );
	if ( !$handle ) {
		// The file vanished or is unreadable; calling flock() on false would
		// raise a TypeError on PHP 8, so treat this as an I/O failure
		return $backoffs;
	}

	$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
	if ( !flock( $handle, LOCK_SH | $noblock ) ) {
		fclose( $handle );
		return $backoffs; // don't wait on lock
	}
	$content = stream_get_contents( $handle );
	flock( $handle, LOCK_UN );
	fclose( $handle );

	if ( $content === false ) {
		return $backoffs; // read failure
	}

	$ctime = microtime( true );
	$cBackoffs = json_decode( $content, true );
	if ( !is_array( $cBackoffs ) ) {
		// Empty, corrupt or non-map content
		$cBackoffs = [];
	}
	// Drop the entries that have already expired
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp < $ctime ) {
			unset( $cBackoffs[$type] );
		}
	}

	return $cBackoffs;
}
524
/**
 * Merge the current backoff expiries from persistent storage.
 *
 * The $deltas map is added to the stored expiries and written back under an
 * exclusive lock, and is set to an empty array once that succeeds. If the
 * file cannot be opened or locked, the original $backoffs are returned and
 * $deltas is left untouched so that the caller can retry later. With no
 * deltas pending this only loads the stored backoffs.
 *
 * @param array $backoffs Map of (job type => UNIX expiry) to fall back to
 * @param array &$deltas Map of (job type => seconds) to add
 * @param string $mode Lock wait mode - "wait" or "nowait"
 * @return array The new backoffs map, accounting for $deltas and the stored expiries
 */
private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
	if ( !$deltas ) {
		return $this->loadBackoffs( $backoffs, $mode );
	}

	$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
	$file = wfTempDir() . '/mw-runJobs-backoffs.json';
	// Open with 'c+' rather than 'w+': 'w+' truncates the file on open, i.e.
	// before the lock is held, which wipes the expiries stored by other
	// runners and makes the merge below always start from an empty map.
	$handle = fopen( $file, 'cb+' );
	if ( !$handle ) {
		return $backoffs; // I/O failure; keep the deltas for a later attempt
	}
	if ( !flock( $handle, LOCK_EX | $noblock ) ) {
		fclose( $handle );
		return $backoffs; // don't wait on lock
	}

	$ctime = microtime( true );
	$content = stream_get_contents( $handle );
	$cBackoffs = is_string( $content ) ? json_decode( $content, true ) : null;
	if ( !is_array( $cBackoffs ) ) {
		// Empty, corrupt or non-map content
		$cBackoffs = [];
	}
	// Extend the expiries that are still pending; start the others from now
	foreach ( $deltas as $type => $seconds ) {
		$cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
			? $cBackoffs[$type] + $seconds
			: $ctime + $seconds;
	}
	// Drop the entries that have already expired
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp < $ctime ) {
			unset( $cBackoffs[$type] );
		}
	}

	// Replace the file content while still holding the lock. ftruncate() does
	// not move the file pointer, so rewind before writing.
	ftruncate( $handle, 0 );
	rewind( $handle );
	fwrite( $handle, json_encode( $cBackoffs ) );
	fflush( $handle );
	flock( $handle, LOCK_UN );
	fclose( $handle );

	$deltas = [];

	return $cBackoffs;
}
570
/**
 * Make sure that this script is not too close to the memory usage limit.
 *
 * The 'memory_limit' setting is parsed once per process and cached. A value
 * that is not a plain number with an optional k/m/g suffix counts as "no limit".
 *
 * @return bool False if current usage is at or above 95% of the limit, true otherwise
 */
private function checkMemoryOK() {
	static $maxBytes = null;

	if ( $maxBytes === null ) {
		$parts = [];
		if ( !preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $parts ) ) {
			$maxBytes = 0;
		} else {
			$multipliers = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
			$maxBytes = $parts[1] * $multipliers[strtolower( $parts[2] )];
		}
	}

	$usedBytes = memory_get_usage();
	if ( !$maxBytes || $usedBytes < 0.95 * $maxBytes ) {
		// No usable limit, or still comfortably below it
		return true;
	}

	$this->logger->error(
		"Detected excessive memory usage ({used_bytes}/{max_bytes}).",
		[
			'used_bytes' => $usedBytes,
			'max_bytes' => $maxBytes,
		]
	);
	$this->debugCallback( "Detected excessive memory usage ($usedBytes/$maxBytes)." );

	return false;
}
604
/**
 * Log the job message.
 *
 * Does nothing unless a debug handler was registered. The handler receives
 * the message prefixed with the current TS_DB timestamp and ending in a newline.
 *
 * @param string $msg The message to log
 */
private function debugCallback( $msg ) {
	if ( !$this->debug ) {
		return;
	}

	$line = wfTimestamp( TS_DB ) . " $msg\n";
	call_user_func( $this->debug, $line );
}
614
/**
 * Issue a commit on all masters who are currently in a transaction and have
 * made changes to the database.
 *
 * When 'JobSerialCommitThreshold' is not false, the main load balancer has
 * streaming replica servers, and the open master connection supports
 * enqueueing named locks and has at least that much estimated pending write
 * time, the commit is serialized: a named lock is acquired and all replicas
 * are waited on before committing. Otherwise the commit happens right away.
 *
 * @param RunnableJob $job
 * @param string $fnameTrxOwner Name of the owner of the transaction round
 * @throws DBError If the serial commit lock could not be acquired
 */
private function commitMasterChanges( RunnableJob $job, $fnameTrxOwner ) {
	$syncThreshold = $this->options->get( 'JobSerialCommitThreshold' );

	$time = false;
	// Connection to serialize the commit on; stays false for a plain commit
	$dbwSerial = false;
	$lb = $this->lbFactory->getMainLB();
	// With no replica DBs, or writes that all go to a foreign DB (not handled
	// here), the commit is not serialized
	if ( $syncThreshold !== false && $lb->hasStreamingReplicaServers() ) {
		// Generally, there is one master connection to the local DB
		$masterConn = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
		// We need natively blocking fast locks
		if ( $masterConn && $masterConn->namedLocksEnqueue() ) {
			$time = $masterConn->pendingWriteQueryDuration( $masterConn::ESTIMATE_DB_APPLY );
			if ( $time >= $syncThreshold ) {
				$dbwSerial = $masterConn;
			}
		}
	}

	if ( !$dbwSerial ) {
		$this->lbFactory->commitMasterChanges(
			$fnameTrxOwner,
			// Abort if any transaction was too big
			[ 'maxWriteDuration' => $this->options->get( 'MaxJobDBWriteDuration' ) ]
		);

		return;
	}

	$ms = (int)( 1000 * $time );

	$logMsg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
	$this->logger->info( $logMsg, [
		'job_type' => $job->getType(),
		'job_commit_write_ms' => $ms,
	] );

	$debugMsg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
	$this->debugCallback( $debugMsg );

	// Wait for an exclusive lock to commit
	$gotLock = $dbwSerial->lock( 'jobrunner-serial-commit', $fnameTrxOwner, 30 );
	if ( !$gotLock ) {
		// This will trigger a rollback in the main loop
		throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
	}
	// Release the lock when this goes out of scope or is consumed below
	$unlocker = new ScopedCallback( function () use ( $dbwSerial, $fnameTrxOwner ) {
		$dbwSerial->unlock( 'jobrunner-serial-commit', $fnameTrxOwner );
	} );

	// Wait for the replica DBs to catch up
	$masterPos = $lb->getMasterPos();
	if ( $masterPos ) {
		$lb->waitForAll( $masterPos );
	}

	// Actually commit the DB master changes
	$this->lbFactory->commitMasterChanges(
		$fnameTrxOwner,
		// Abort if any transaction was too big
		[ 'maxWriteDuration' => $this->options->get( 'MaxJobDBWriteDuration' ) ]
	);
	ScopedCallback::consume( $unlocker );
}
691}
wfTempDir()
Tries to get the system directory for temporary files.
wfTimestamp( $outputtype=TS_UNIX, $ts=0)
Get a timestamp string in one of various formats.
wfDeprecated( $function, $version=false, $component=false, $callerOffset=2)
Logs a warning that $function is deprecated.
No-op job that does nothing.
Class to handle enqueueing of background jobs.
Job queue runner utility methods.
Definition JobRunner.php:41
ILBFactory $lbFactory
Definition JobRunner.php:55
commitMasterChanges(RunnableJob $job, $fnameTrxOwner)
Issue a commit on all masters who are currently in a transaction and have made changes to the database.
setDebugHandler( $debug)
Definition JobRunner.php:89
ServiceOptions $options
Definition JobRunner.php:52
LinkCache $linkCache
Definition JobRunner.php:64
run(array $options)
Run jobs of the specified number/type for the specified time.
getErrorBackoffTTL(array $caught)
ReadOnlyMode $readOnlyMode
Definition JobRunner.php:61
const CONSTRUCTOR_OPTIONS
Definition JobRunner.php:43
callable null $debug
Debug output handler.
Definition JobRunner.php:70
setLogger(LoggerInterface $logger)
Definition JobRunner.php:98
executeJob(RunnableJob $job)
Run a specific job in a manner appropriate for mass use by job dispatchers.
syncBackoffDeltas(array $backoffs, array &$deltas, $mode='wait')
Merge the current backoff expiries from persistent storage.
JobQueueGroup $jobQueueGroup
Definition JobRunner.php:58
StatsdDataFactoryInterface $stats
Definition JobRunner.php:67
doExecuteJob(RunnableJob $job)
debugCallback( $msg)
Log the job message.
getBackoffTimeToWait(RunnableJob $job)
checkMemoryOK()
Make sure that this script is not too close to the memory usage limit.
LoggerInterface $logger
Definition JobRunner.php:73
__construct( $serviceOptions=null, ILBFactory $lbFactory=null, JobQueueGroup $jobQueueGroup=null, ReadOnlyMode $readOnlyMode=null, LinkCache $linkCache=null, StatsdDataFactoryInterface $statsdDataFactory=null, LoggerInterface $logger=null)
Calling this directly is deprecated.
loadBackoffs(array $backoffs, $mode='wait')
Get the previous backoff expiries from persistent storage. On I/O or lock acquisition failure this returns the original $backoffs.
Cache for article titles (prefixed DB keys) and ids linked from one source.
Definition LinkCache.php:34
A class for passing options to services.
PSR-3 logger instance factory.
MediaWikiServices is the service locator for the application scope of MediaWiki.
A service class for fetching the wiki's current read-only mode.
Database error base class @newable Stable to extend.
Definition DBError.php:32
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
An interface for generating database load balancers.
$debug
Definition mcc.php:31
if(count( $args)< 1) $job
$content
Definition router.php:76
if(PHP_SAPI !='cli-server') if(!isset( $_SERVER['SCRIPT_FILENAME'])) $file
Item class for a filearchive table row.
Definition router.php:42