26 use Liuggio\StatsdClient\Factory\StatsdDataFactory;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Wikimedia\ScopedCallback;
76 $logger = LoggerFactory::getInstance(
'runJobs' );
79 $this->config = MediaWikiServices::getInstance()->getMainConfig();
107 $jobClasses = $this->config->get(
'JobClasses' );
108 $profilerLimits = $this->config->get(
'TrxProfilerLimits' );
110 $response = [
'jobs' => [],
'reached' =>
'none-ready' ];
118 if (
$type !==
false && !isset( $jobClasses[
$type] ) ) {
129 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
130 if ( $lbFactory->hasTransactionRound() ) {
131 throw new LogicException( __METHOD__ .
' called with an active transaction round.' );
135 list( , $maxLag ) = $lbFactory->getMainLB(
wfWikiID() )->getMaxLag();
136 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
137 $response[
'reached'] =
'replica-lag-limit';
143 $trxProfiler->setLogger( LoggerFactory::getInstance(
'DBPerformance' ) );
144 $trxProfiler->setExpectations( $profilerLimits[
'JobRunner'], __METHOD__ );
152 $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
155 $startTime = microtime(
true );
160 $blacklist = $noThrottle ? [] : array_keys( $backoffs );
163 if (
$type ===
false ) {
169 } elseif ( in_array(
$type, $blacklist ) ) {
178 $jType =
$job->getType();
187 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
188 ? $backoffDeltas[$jType] + $ttw
194 if ( $info[
'status'] !==
false || !
$job->allowRetries() ) {
199 if ( $info[
'status'] ===
false && mt_rand( 0, 49 ) == 0 ) {
201 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
202 ? $backoffDeltas[$jType] + $ttw
208 'status' => ( $info[
'status'] ===
false ) ?
'failed' :
'ok',
209 'error' => $info[
'error'],
210 'time' => $info[
'timeMs']
212 $timeMsTotal += $info[
'timeMs'];
215 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
218 } elseif ( $maxTime && ( microtime(
true ) - $startTime ) > $maxTime ) {
226 $timePassed = microtime(
true ) - $lastCheckTime;
227 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
229 $lbFactory->waitForReplication( [
230 'ifWritesSince' => $lastCheckTime,
231 'timeout' => self::MAX_ALLOWED_LAG
234 $response[
'reached'] =
'replica-lag-limit';
237 $lastCheckTime = microtime(
true );
240 if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
241 $group->waitForBackups();
253 if ( $backoffDeltas ) {
268 return strpos( $error,
'DBReadOnlyError' ) !==
false
281 $jType =
$job->getType();
282 $msg =
$job->toString() .
" STARTING";
283 $this->logger->debug( $msg, [
284 'job_type' =>
$job->getType(),
290 $jobStartTime = microtime(
true );
292 $fnameTrxOwner = get_class(
$job ) .
'::run';
293 if ( !
$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
297 $error =
$job->getLastError();
303 }
catch ( Exception
$e ) {
306 $error = get_class(
$e ) .
': ' .
$e->getMessage();
311 }
catch ( Exception
$e ) {
320 MediaWikiServices::getInstance()->getLinkCache()->clear();
321 $timeMs = intval( ( microtime(
true ) - $jobStartTime ) * 1000 );
325 $readyTs =
$job->getReadyTimestamp();
327 $pickupDelay = max( 0, $popTime - $readyTs );
328 $stats->timing(
'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
329 $stats->timing(
"jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
332 $rootTimestamp =
$job->getRootJobParams()[
'rootJobTimestamp'];
333 if ( $rootTimestamp ) {
334 $age = max( 0, $popTime -
wfTimestamp( TS_UNIX, $rootTimestamp ) );
335 $stats->timing(
"jobqueue.pickup_root_age.$jType", 1000 * $age );
338 $stats->timing(
"jobqueue.run.$jType", $timeMs );
340 if ( $rssStart && $rssEnd ) {
341 $stats->updateCount(
"jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
345 $msg =
$job->toString() .
" t={job_duration} error={job_error}";
346 $this->logger->error( $msg, [
347 'job_type' =>
$job->getType(),
348 'job_duration' => $timeMs,
349 'job_error' => $error,
352 $msg =
$job->toString() .
" t=$timeMs error={$error}";
355 $msg =
$job->toString() .
" t={job_duration} good";
356 $this->logger->info( $msg, [
357 'job_type' =>
$job->getType(),
358 'job_duration' => $timeMs,
361 $msg =
$job->toString() .
" t=$timeMs good";
365 return [
'status' =>
$status,
'error' => $error,
'timeMs' => $timeMs ];
374 return isset( $info[
'ru_maxrss'] ) ? (int)$info[
'ru_maxrss'] :
null;
383 $throttling = $this->config->get(
'JobBackoffThrottling' );
389 $itemsPerSecond = $throttling[
$job->getType()];
390 if ( $itemsPerSecond <= 0 ) {
395 if (
$job->workItemCount() > 0 ) {
396 $exactSeconds =
$job->workItemCount() / $itemsPerSecond;
398 $seconds = floor( $exactSeconds );
399 $remainder = $exactSeconds - $seconds;
400 $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
403 return (
int)$seconds;
415 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
416 if ( is_file( $file ) ) {
417 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
418 $handle = fopen( $file,
'rb' );
419 if ( !flock( $handle, LOCK_SH | $noblock ) ) {
423 $content = stream_get_contents( $handle );
424 flock( $handle, LOCK_UN );
426 $ctime = microtime(
true );
427 $cBackoffs = json_decode( $content,
true ) ?: [];
428 foreach ( $cBackoffs
as $type => $timestamp ) {
429 if ( $timestamp < $ctime ) {
430 unset( $cBackoffs[
$type] );
456 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
457 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
458 $handle = fopen( $file,
'wb+' );
459 if ( !flock( $handle, LOCK_EX | $noblock ) ) {
463 $ctime = microtime(
true );
464 $content = stream_get_contents( $handle );
465 $cBackoffs = json_decode( $content,
true ) ?: [];
466 foreach ( $deltas
as $type => $seconds ) {
467 $cBackoffs[
$type] = isset( $cBackoffs[
$type] ) && $cBackoffs[
$type] >= $ctime
468 ? $cBackoffs[
$type] + $seconds
471 foreach ( $cBackoffs
as $type => $timestamp ) {
472 if ( $timestamp < $ctime ) {
473 unset( $cBackoffs[
$type] );
476 ftruncate( $handle, 0 );
477 fwrite( $handle, json_encode( $cBackoffs ) );
478 flock( $handle, LOCK_UN );
492 static $maxBytes =
null;
493 if ( $maxBytes ===
null ) {
495 if ( preg_match(
'!^(\d+)(k|m|g|)$!i', ini_get(
'memory_limit' ), $m ) ) {
496 list( , $num, $unit ) = $m;
497 $conv = [
'g' => 1073741824,
'm' => 1048576,
'k' => 1024,
'' => 1 ];
498 $maxBytes = $num * $conv[strtolower( $unit )];
503 $usedBytes = memory_get_usage();
504 if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
505 $msg =
"Detected excessive memory usage ({used_bytes}/{max_bytes}).";
506 $this->logger->error( $msg, [
507 'used_bytes' => $usedBytes,
508 'max_bytes' => $maxBytes,
511 $msg =
"Detected excessive memory usage ($usedBytes/$maxBytes).";
525 if ( $this->debug ) {
526 call_user_func_array( $this->debug, [
wfTimestamp( TS_DB ) .
" $msg\n" ] );
542 $syncThreshold = $this->config->get(
'JobSerialCommitThreshold' );
546 if ( $syncThreshold !==
false && $lb->getServerCount() > 1 ) {
548 $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
550 if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
551 $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
552 if (
$time < $syncThreshold ) {
567 [
'maxWriteDuration' => $this->config->get(
'MaxJobDBWriteDuration' ) ]
573 $ms = intval( 1000 *
$time );
575 $msg =
$job->toString() .
" COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
576 $this->logger->info( $msg, [
577 'job_type' =>
$job->getType(),
578 'job_commit_write_ms' => $ms,
581 $msg =
$job->toString() .
" COMMIT ENQUEUED [{$ms}ms of writes]";
585 if ( !$dbwSerial->lock(
'jobrunner-serial-commit', __METHOD__, 30 ) ) {
587 throw new DBError( $dbwSerial,
"Timed out waiting on commit queue." );
589 $unlocker =
new ScopedCallback(
function ()
use ( $dbwSerial ) {
590 $dbwSerial->unlock(
'jobrunner-serial-commit', __METHOD__ );
594 $pos = $lb->getMasterPos();
596 $lb->waitForAll( $pos );
603 [
'maxWriteDuration' => $this->config->get(
'MaxJobDBWriteDuration' ) ]
605 ScopedCallback::consume( $unlocker );