// Collaborators held on the instance; each is assigned from a same-named variable further down.
55 private $jobQueueGroup;
58 private $readOnlyMode;
64 private $statsFactory;
// Lag threshold: run() stops with 'replica-lag-limit' once the lag value from getMaxLag() reaches it.
// NOTE(review): presumably expressed in seconds — confirm against getMaxLag().
73 private const MAX_ALLOWED_LAG = 3;
// Timeout passed to waitForReplication() and setDefaultReplicationWaitTimeout(); kept equal to the lag threshold.
75 private const SYNC_TIMEOUT = self::MAX_ALLOWED_LAG;
// Minimum microtime() delta, in seconds, between replication waits inside run().
77 private const LAG_CHECK_PERIOD = 1.0;
// Value returned by getErrorBackoffTTL() when no DBReadOnlyError was among the caught classes.
79 private const ERROR_BACKOFF_TTL = 1;
// Value returned by getErrorBackoffTTL() when a DBReadOnlyError was among the caught classes.
81 private const READONLY_BACKOFF_TTL = 30;
// Keep the debug handler; debugCallback() calls it with a formatted line whenever it is truthy.
// NOTE(review): the enclosing method header is not visible in this view.
87 $this->debug = $debug;
// NOTE(review): fragment — the start of the signature and the other parameters are not visible here;
// presumably this is the constructor.
105 LinkCache $linkCache,
107 LoggerInterface $logger
// Store every injected dependency on the instance for later use by the other methods.
110 $this->options = $serviceOptions;
111 $this->lbFactory = $lbFactory;
112 $this->jobQueueGroup = $jobQueueGroup;
113 $this->readOnlyMode = $readOnlyMode;
114 $this->linkCache = $linkCache;
115 $this->statsFactory = $statsFactory;
116 $this->logger = $logger;
/**
 * Pop jobs from the queue group and execute them until a stop condition is hit.
 *
 * Keys read from $options: 'type', 'maxJobs' and 'maxTime' (each defaulting to false)
 * and 'throttle' (defaulting to true).
 *
 * The response starts as [ 'jobs' => [], 'reached' => 'none-ready' ]. 'reached' is
 * overwritten with one of 'none-possible', 'read-only', 'replica-lag-limit', 'job-limit',
 * 'time-limit', 'exception' or 'memory-limit'; 'backoffs' and 'elapsed' are added last.
 *
 * NOTE(review): the loop construct, early returns and several initialisations of this
 * method are not visible in this view; the comments below cover visible statements only.
 *
 * @param array $options
 * @throws LogicException When a transaction round is already active
 */
145 public function run( array $options ) {
146 $type = $options[
'type'] ??
false;
147 $maxJobs = $options[
'maxJobs'] ??
false;
148 $maxTime = $options[
'maxTime'] ??
false;
149 $throttle = $options[
'throttle'] ??
true;
// Default outcome when nothing below changes it.
154 $response = [
'jobs' => [],
'reached' =>
'none-ready' ];
// A specific type was requested but it is not a key of $jobClasses (defined on a line not shown here).
156 if ( $type !==
false && !isset( $jobClasses[$type] ) ) {
158 $response[
'reached'] =
'none-possible';
// Do not run jobs while the read-only check reports true.
162 if ( $this->readOnlyMode->isReadOnly() ) {
164 $response[
'reached'] =
'read-only';
// Only the second element of getMaxLag() is used; it is compared with the allowed threshold.
168 [ , $maxLag ] = $this->lbFactory->getMainLB()->getMaxLag();
169 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
171 $response[
'reached'] =
'replica-lag-limit';
// Apply the 'JobRunner' entry of $profilerLimits to the transaction profiler.
176 $this->lbFactory->getTransactionProfiler()
177 ->setExpectations( $profilerLimits[
'JobRunner'], __METHOD__ );
// Refuse to start inside an already open transaction round.
180 if ( $this->lbFactory->hasTransactionRound() ) {
181 throw new LogicException( __METHOD__ .
' called with an active transaction round.' );
// Wall-clock reference for the 'maxTime' check further down.
189 $loopStartTime = microtime(
true );
// Merge pending deltas into the shared backoff state. With throttling enabled, the keys of
// the result are the job types currently backed off; otherwise no type is treated as backed off.
196 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
197 $backoffKeys = $throttle ? array_keys( $backoffs ) : [];
// No type filter was given (the rest of this call chain is not visible in this view).
200 if ( $type ===
false ) {
202 $job = $this->jobQueueGroup
// A requested type that is currently backed off yields false instead of popping a job.
207 $job = in_array( $type, $backoffKeys ) ? false : $this->jobQueueGroup->pop( $type );
212 $jType =
$job->getType();
// Time to wait for this job as computed by getBackoffTimeToWait().
215 $ttw = $this->getBackoffTimeToWait(
$job );
// Accumulate the wait for this job type and sync it with the shared backoff state.
219 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
220 $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
// Acknowledge the job unless it failed and still allows retries.
226 if ( $info[
'status'] !==
false || !
$job->allowRetries() ) {
227 $this->jobQueueGroup->ack(
$job );
// For a failed job, one time in fifty (mt_rand( 0, 49 ) == 0) raise the wait to at least
// the error backoff TTL and add it to the pending delta of this job type.
231 if ( $info[
'status'] ===
false && mt_rand( 0, 49 ) == 0 ) {
232 $ttw = max( $ttw, $this->getErrorBackoffTTL( $info[
'caught'] ) );
233 $backoffDeltas[$jType] = ( $backoffDeltas[$jType] ?? 0 ) + $ttw;
// Append the per-job result to the response and add its duration to the running total.
236 $response[
'jobs'][] = [
238 'status' => ( $info[
'status'] === false ) ?
'failed' :
'ok',
239 'error' => $info[
'error'],
240 'time' => $info[
'timeMs']
242 $timeMsTotal += $info[
'timeMs'];
// Stop conditions: job count limit, then elapsed wall-clock time limit.
245 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
246 $response[
'reached'] =
'job-limit';
248 } elseif ( $maxTime && ( microtime(
true ) - $loopStartTime ) > $maxTime ) {
249 $response[
'reached'] =
'time-limit';
// A DBConnectionError among the caught exception classes ends the run (strict comparison).
257 if ( in_array( DBConnectionError::class, $info[
'caught'],
true ) ) {
258 $response[
'reached'] =
'exception';
// Wait for replication once LAG_CHECK_PERIOD has elapsed since the last sync; a negative
// delta also triggers the wait (NOTE(review): presumably guards against clock changes).
265 $timePassed = microtime(
true ) - $lastSyncTime;
266 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
267 $opts = [
'ifWritesSince' => $lastSyncTime,
'timeout' => self::SYNC_TIMEOUT ];
268 if ( !$this->lbFactory->waitForReplication( $opts ) ) {
269 $response[
'reached'] =
'replica-lag-limit';
272 $lastSyncTime = microtime(
true );
// Stop when checkMemoryOK() reports that memory usage is too high.
276 if ( !$this->checkMemoryOK() ) {
277 $response[
'reached'] =
'memory-limit';
// Flush any remaining deltas using the blocking 'wait' mode.
// NOTE(review): the return value is not captured here, so the $backoffs reported below is
// whatever the last assignment above left in it — confirm that this is intended.
284 if ( $backoffDeltas ) {
285 $this->syncBackoffDeltas( $backoffs, $backoffDeltas,
'wait' );
// Final bookkeeping: backoff map and accumulated job time.
288 $response[
'backoffs'] = $backoffs;
289 $response[
'elapsed'] = $timeMsTotal;
// NOTE(review): fragment — the enclosing method header is not visible in this view.
// Remember the current request ID so that it can be put back after the job has run.
313 $telemetry = Telemetry::getInstance();
314 $oldRequestId = $telemetry->getRequestId();
// A job that carries its own request ID runs under that ID.
316 if (
$job->getRequestId() !==
null ) {
318 $telemetry->overrideRequestId(
$job->getRequestId() );
// NOTE(review): presumably the else-branch — a fresh request ID is generated instead.
323 $telemetry->regenerateRequestId();
// Use SYNC_TIMEOUT as the default replication wait timeout and keep the returned value for restoring.
326 $oldTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
// Delegate the actual work.
328 return $this->doExecuteJob(
$job );
// Restore the saved timeout and request ID.
// NOTE(review): these statements follow a return, so presumably they sit in a finally block
// that is not visible here — confirm.
330 $this->lbFactory->setDefaultReplicationWaitTimeout( $oldTimeout );
331 $telemetry->overrideRequestId( $oldRequestId );
// NOTE(review): fragment — the enclosing method header, the try/else structure and the return
// are not visible in this view; presumably this is the body of doExecuteJob().
// Announce the start of the job to the logger (debug level) and to the debug handler.
344 $jType =
$job->getType();
345 $msg =
$job->toString() .
" STARTING";
346 $this->logger->debug( $msg, [
'job_type' =>
$job->getType() ] );
347 $this->debugCallback( $msg );
// Clear the link cache before the job runs.
351 $this->linkCache->clear();
// Snapshot the max RSS value and the wall-clock time before the job runs.
355 $rssStart = $this->getMaxRssKb();
356 $jobStartTime = microtime(
true );
// Name used as owner of the primary-changes round: "<job class>::run".
358 $fnameTrxOwner = get_class(
$job ) .
'::run';
// Jobs flagged JOB_NO_EXPLICIT_TRX_ROUND get a commit here.
// NOTE(review): the beginPrimaryChanges() call below is presumably the else-branch — confirm.
360 if (
$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
361 $this->lbFactory->commitPrimaryChanges( $fnameTrxOwner );
363 $this->lbFactory->beginPrimaryChanges( $fnameTrxOwner );
// Add the job type to the logging context for the duration of the run, then release the scope.
367 $scope = LoggerFactory::getContext()->addScoped( [
368 'context.job_type' => $jType,
370 $status =
$job->run();
371 $error =
$job->getLastError();
372 ScopedCallback::consume( $scope );
// Commit primary changes (arguments not visible here), then run deferred updates.
375 $this->lbFactory->commitPrimaryChanges(
381 DeferredUpdates::doUpdates();
382 }
catch ( Throwable $e ) {
// Any throwable: roll back and log, build a one-line error description, and record the
// exception class name in $caught.
383 MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
385 $error = get_class( $e ) .
': ' . $e->getMessage() .
' in '
386 . $e->getFile() .
' on line ' . $e->getLine();
387 $caught[] = get_class( $e );
// Give the job a chance to clean up; a throwable raised here is only logged.
391 $job->tearDown( $status );
392 }
catch ( Throwable $e ) {
393 MWExceptionHandler::logException( $e );
// Elapsed wall-clock time in whole milliseconds, and the max RSS value after the run.
396 $timeMs = intval( ( microtime(
true ) - $jobStartTime ) * 1000 );
397 $rssEnd = $this->getMaxRssKb();
// Pickup delay: time between the job's ready timestamp and its start, never negative.
// NOTE(review): the metric name says seconds but the value is multiplied by 1000 —
// presumably getTiming() expects milliseconds; confirm.
400 $readyTs =
$job->getReadyTimestamp();
402 $pickupDelay = max( 0, $jobStartTime - $readyTs );
403 $this->statsFactory->getTiming(
'jobqueue_pickup_delay_seconds' )
404 ->setLabel(
'jobtype', $jType )
405 ->observe( 1000 * $pickupDelay );
// Age of the root job at pickup, reported only when a root job timestamp is set.
408 $rootTimestamp =
$job->getRootJobParams()[
'rootJobTimestamp'];
409 if ( $rootTimestamp ) {
410 $age = max( 0, $jobStartTime - (
int)
wfTimestamp( TS::UNIX, $rootTimestamp ) );
412 $this->statsFactory->getTiming(
"jobqueue_pickup_root_age_seconds" )
413 ->setLabel(
'jobtype', $jType )
414 ->observe( 1000 * $age );
// Job run time, observed as the millisecond value computed above.
417 $this->statsFactory->getTiming(
'jobqueue_runtime_seconds' )
418 ->setLabel(
'jobtype', $jType )
419 ->observe( $timeMs );
// Difference of the max RSS value across the run, reported only when both readings are truthy.
// NOTE(review): the label key here is 'rss_delta' although its value is the job type and the
// metrics above use 'jobtype' — confirm whether this is intentional.
421 if ( $rssStart && $rssEnd ) {
422 $this->statsFactory->getCounter(
'jobqueue_rss_delta_total' )
423 ->setLabel(
'rss_delta', $jType )
424 ->incrementBy( $rssEnd - $rssStart );
// Failure: log at error level with placeholders, and send an expanded line to the debug handler.
427 if ( $status ===
false ) {
428 $msg =
$job->toString() .
" t={job_duration} error={job_error}";
429 $this->logger->error( $msg, [
430 'job_type' =>
$job->getType(),
431 'job_duration' => $timeMs,
432 'job_error' => $error,
435 $msg =
$job->toString() .
" t=$timeMs error={$error}";
436 $this->debugCallback( $msg );
// Success (NOTE(review): presumably the else-branch): log at info level and notify the debug handler.
438 $msg =
$job->toString() .
" t={job_duration} good";
439 $this->logger->info( $msg, [
440 'job_type' =>
$job->getType(),
441 'job_duration' => $timeMs,
444 $msg =
$job->toString() .
" t=$timeMs good";
445 $this->debugCallback( $msg );
/**
 * Choose the backoff TTL for a failed job from the values in $caught.
 *
 * NOTE(review): in_array() is used without the strict flag here, whereas the
 * DBConnectionError check in run() passes true — consider aligning the two.
 *
 * @param array $caught Values to search; run() passes $info['caught'] (presumably exception class names)
 * @return int READONLY_BACKOFF_TTL if DBReadOnlyError::class is present, otherwise ERROR_BACKOFF_TTL
 */
460 private function getErrorBackoffTTL( array $caught ) {
461 return in_array( DBReadOnlyError::class, $caught )
462 ? self::READONLY_BACKOFF_TTL
463 : self::ERROR_BACKOFF_TTL;
/**
 * Read the 'ru_maxrss' field of getrusage() for the current process.
 *
 * NOTE(review): the unit of 'ru_maxrss' is whatever the operating system reports;
 * the method name suggests kilobytes — confirm for the target platforms.
 *
 * @return int|null The value cast to int, or null when the key is not set
 */
469 private function getMaxRssKb() {
// Mode 0 asks for the usage of the calling process itself.
470 $info = getrusage( 0 );
472 return isset( $info[
'ru_maxrss'] ) ? (int)$info[
'ru_maxrss'] : null;
/**
 * Compute how many whole seconds to back off after popping the given job.
 *
 * NOTE(review): the definition of $throttling and the early returns of the guard
 * branches are not visible in this view.
 *
 * @param RunnableJob $job
 * @return int Seconds, after casting $seconds to int
 */
480 private function getBackoffTimeToWait( RunnableJob
$job ) {
// Guard: no $throttling entry for this job type, or the job is a DuplicateJob.
483 if ( !isset( $throttling[
$job->getType()] ) ||
$job instanceof DuplicateJob ) {
// Configured rate for this type; non-positive rates are handled by the guard below.
487 $itemsPerSecond = $throttling[
$job->getType()];
488 if ( $itemsPerSecond <= 0 ) {
// Exact wait is work items divided by the allowed items per second.
493 if (
$job->workItemCount() > 0 ) {
494 $exactSeconds =
$job->workItemCount() / $itemsPerSecond;
// Randomised rounding: take the floor, then add one second with a probability equal to the
// fractional part, so the result is floor or floor + 1.
496 $seconds = floor( $exactSeconds );
497 $remainder = $exactSeconds - $seconds;
498 $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
501 return (
int)$seconds;
/**
 * Read the backoff map from the JSON file in the temp directory and drop expired entries.
 *
 * NOTE(review): the handling of a failed lock, the closing of the handle and the
 * return statement are not visible in this view.
 *
 * @param array $backoffs
 * @param string $mode 'nowait' makes the lock attempt non-blocking; any other value blocks
 */
512 private function loadBackoffs( array $backoffs, $mode =
'wait' ) {
513 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
514 if ( is_file( $file ) ) {
// LOCK_NB is added to the lock operation only in 'nowait' mode.
515 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
// NOTE(review): the fopen() result is handed to flock() without a check for false.
516 $handle = fopen( $file,
'rb' );
// Take a shared lock for reading.
517 if ( !flock( $handle, LOCK_SH | $noblock ) ) {
// Read the whole file, then release the lock.
521 $content = stream_get_contents( $handle );
522 flock( $handle, LOCK_UN );
// Decode to an array (empty array when decoding yields a falsy value) and remove every
// entry whose timestamp is earlier than the current time.
524 $ctime = microtime(
true );
525 $cBackoffs = json_decode( $content,
true ) ?: [];
526 foreach ( $cBackoffs as $type => $timestamp ) {
527 if ( $timestamp < $ctime ) {
528 unset( $cBackoffs[$type] );
/**
 * Merge pending per-type backoff deltas into the JSON file in the temp directory.
 *
 * NOTE(review): the guard around the early return, the failed-lock handling, the
 * else-value of the merge expression and the final return are not visible in this view.
 *
 * @param array $backoffs
 * @param array &$deltas Map of job type to seconds to add, taken by reference
 * @param string $mode 'nowait' makes the lock attempt non-blocking; any other value blocks
 */
549 private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode =
'wait' ) {
// Early exit that only reloads the stored backoffs (its condition is on a line not shown here).
551 return $this->loadBackoffs( $backoffs, $mode );
// LOCK_NB is added to the lock operation only in 'nowait' mode.
554 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
555 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
// NOTE(review): mode 'wb+' truncates the file when it is opened, i.e. before the exclusive
// lock is taken, so the stream_get_contents() call below would read an empty file and the
// entries written by other processes would be lost. Confirm whether 'cb+' (no truncation)
// plus a rewind before writing was intended. The fopen() result is also not checked.
556 $handle = fopen( $file,
'wb+' );
// Take an exclusive lock for the read-modify-write cycle.
557 if ( !flock( $handle, LOCK_EX | $noblock ) ) {
561 $ctime = microtime(
true );
562 $content = stream_get_contents( $handle );
563 $cBackoffs = json_decode( $content,
true ) ?: [];
// A backoff that has not expired yet is extended by the delta for its type.
564 foreach ( $deltas as $type => $seconds ) {
565 $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
566 ? $cBackoffs[$type] + $seconds
// Remove every entry whose timestamp is earlier than the current time.
569 foreach ( $cBackoffs as $type => $timestamp ) {
570 if ( $timestamp < $ctime ) {
571 unset( $cBackoffs[$type] );
// Replace the file content with the merged map, then release the lock.
574 ftruncate( $handle, 0 );
575 fwrite( $handle, json_encode( $cBackoffs ) );
576 flock( $handle, LOCK_UN );
/**
 * Compare current memory usage with 95% of the configured memory_limit.
 *
 * When the threshold is reached, an error is logged and the debug handler is notified.
 *
 * NOTE(review): the branch for a memory_limit value that does not match the pattern
 * and the return statements are not visible in this view.
 */
589 private function checkMemoryOK() {
// The parsed limit is kept in a static variable, so the ini value is parsed only while it is still null.
590 static $maxBytes =
null;
591 if ( $maxBytes ===
null ) {
// Accept digits followed by an optional k, m or g suffix (case-insensitive).
593 if ( preg_match(
'!^(\d+)(k|m|g|)$!i', ini_get(
'memory_limit' ), $m ) ) {
594 [ , $num, $unit ] = $m;
// Multipliers for each suffix; an empty suffix means plain bytes.
595 $conv = [
'g' => 1073741824,
'm' => 1048576,
'k' => 1024,
'' => 1 ];
596 $maxBytes = (int)$num * $conv[strtolower( $unit )];
// A falsy $maxBytes disables the check.
601 $usedBytes = memory_get_usage();
602 if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
603 $msg =
"Detected excessive memory usage ({used_bytes}/{max_bytes}).";
604 $this->logger->error( $msg, [
605 'used_bytes' => $usedBytes,
606 'max_bytes' => $maxBytes,
// The debug handler receives the message with the numbers already interpolated.
609 $msg =
"Detected excessive memory usage ($usedBytes/$maxBytes).";
610 $this->debugCallback( $msg );
/**
 * Pass a timestamped message to the debug handler, if one is set.
 *
 * The handler receives the TS::DB formatted current time, a space, the message and a newline.
 *
 * @param string $msg
 */
622 private function debugCallback( $msg ) {
// Nothing happens when no handler is stored in $this->debug.
623 if ( $this->debug ) {
624 ( $this->debug )( ConvertibleTimestamp::now( TS::DB ) .
" $msg\n" );