use Liuggio\StatsdClient\Factory\StatsdDataFactory;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Wikimedia\ScopedCallback;
$logger = LoggerFactory::getInstance( 'runJobs' );
$this->config = MediaWikiServices::getInstance()->getMainConfig();
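// Read the job handler map and the transaction profiler limits from site
// configuration, and start from a response that reports no jobs were ready.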
$jobClasses = $this->config->get( 'JobClasses' );
$profilerLimits = $this->config->get( 'TrxProfilerLimits' );

$response = [ 'jobs' => [], 'reached' => 'none-ready' ];
if ( $type !== false && !isset( $jobClasses[$type] ) ) {
	// …
}
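// Bail out before doing any work if the replica databases are already lagging
// by MAX_ALLOWED_LAG or more; running jobs would only add to the lag.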
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag();
if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
	$response['reached'] = 'replica-lag-limit';
	// …
}

$lbFactory->commitAll( __METHOD__ );
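// Profile DB transactions during the run: anything that exceeds the
// 'JobRunner' expectations is reported to the DBPerformance log channel.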
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
$trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );

$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
$startTime = microtime( true );
$blacklist = $noThrottle ? [] : array_keys( $backoffs );
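// Job types currently in a backoff window are skipped ("blacklisted") for
// this pass unless throttling was explicitly disabled.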
if ( $type === false ) {
	// …
} elseif ( in_array( $type, $blacklist ) ) {
	// …
}

$lbFactory->commitMasterChanges( __METHOD__ );
$jType = $job->getType();

$backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
	? $backoffDeltas[$jType] + $ttw
	: $ttw;
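// Acknowledge the job unless it failed and is allowed to be retried; on
// failure, one run in fifty also schedules an error backoff so a persistently
// failing job type does not monopolise the runner.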
if ( $info['status'] !== false || !$job->allowRetries() ) {
	// …
	$lbFactory->commitMasterChanges( __METHOD__ );
}
if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
	// …
	$backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
		? $backoffDeltas[$jType] + $ttw
		: $ttw;
}
$response['jobs'][] = [
	'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
	'error' => $info['error'],
	'time' => $info['timeMs']
];
$timeMsTotal += $info['timeMs'];
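// Stop popping jobs once the requested job count or wall-clock time budget
// has been used up.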
if ( $maxJobs && $jobsPopped >= $maxJobs ) {
	// …
} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
	// …
}

$timePassed = microtime( true ) - $lastCheckTime;
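// Every LAG_CHECK_PERIOD seconds, wait for replicas to catch up with writes
// made since the last check; every 100 popped jobs, also let the job queue
// back-end sync to its backups.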
if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
	try {
		$lbFactory->waitForReplication( [
			'ifWritesSince' => $lastCheckTime,
			'timeout' => self::MAX_ALLOWED_LAG
		] );
	} catch ( DBReplicationWaitError $e ) {
		$response['reached'] = 'replica-lag-limit';
		break;
	}
	$lastCheckTime = microtime( true );
}

if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
	$group->waitForBackups();
}
if ( $backoffDeltas ) {
	$this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
}
return strpos( $error, 'DBReadOnlyError' ) !== false
	? self::READONLY_BACKOFF_TTL
	: self::ERROR_BACKOFF_TTL;
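// Per-job execution: log a STARTING entry, time the run, and capture any
// error the job reports or any exception it throws.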
$jType = $job->getType();
$msg = $job->toString() . " STARTING";
$this->logger->debug( $msg, [
	'job_type' => $job->getType(),
] );
$jobStartTime = microtime( true );
$fnameTrxOwner = get_class( $job ) . '::run';
try {
	// …
	$error = $job->getLastError();
	// …
} catch ( Exception $e ) {
	// …
	$error = get_class( $e ) . ': ' . $e->getMessage();
}

try {
	// …
} catch ( Exception $e ) {
	// …
}
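// After the job: clear LinkCache state carried over from earlier snapshots
// and report statistics on pickup delay, root job age, run time and RSS growth.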
MediaWikiServices::getInstance()->getLinkCache()->clear();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );

$readyTs = $job->getReadyTimestamp();
$pickupDelay = max( 0, $popTime - $readyTs );
$stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
$stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
$rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
if ( $rootTimestamp ) {
	$age = max( 0, $popTime - wfTimestamp( TS_UNIX, $rootTimestamp ) );
	$stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
}

$stats->timing( "jobqueue.run.$jType", $timeMs );
if ( $rssStart && $rssEnd ) {
	$stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
}
if ( $status === false ) {
	$msg = $job->toString() . " t={job_duration} error={job_error}";
	$this->logger->error( $msg, [
		'job_type' => $job->getType(),
		'job_duration' => $timeMs,
		'job_error' => $error,
	] );

	$msg = $job->toString() . " t=$timeMs error={$error}";
} else {
	$msg = $job->toString() . " t={job_duration} good";
	$this->logger->info( $msg, [
		'job_type' => $job->getType(),
		'job_duration' => $timeMs,
	] );

	$msg = $job->toString() . " t=$timeMs good";
}
return [ 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ];
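// On Linux, ru_maxrss from getrusage() is reported in kilobytes.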
return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
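// Backoff throttling: JobBackoffThrottling maps a job type to how many work
// items it may process per second; the resulting wait uses randomized rounding
// so fractional seconds are honoured on average across many runs.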
$throttling = $this->config->get( 'JobBackoffThrottling' );

$itemsPerSecond = $throttling[$job->getType()];
if ( $itemsPerSecond <= 0 ) {
	return 0;
}

if ( $job->workItemCount() > 0 ) {
	$exactSeconds = $job->workItemCount() / $itemsPerSecond;
	$seconds = floor( $exactSeconds );
	$remainder = $exactSeconds - $seconds;
	$seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
}

return (int)$seconds;
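// Backoff state is shared between runner processes via a JSON file in the
// temp directory, guarded by flock(); in 'nowait' mode LOCK_NB is used so the
// caller skips the sync rather than blocking on the lock.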
$file = wfTempDir() . '/mw-runJobs-backoffs.json';
if ( is_file( $file ) ) {
	$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
	$handle = fopen( $file, 'rb' );
	if ( !flock( $handle, LOCK_SH | $noblock ) ) {
		// …
	}
	$content = stream_get_contents( $handle );
	flock( $handle, LOCK_UN );
	$ctime = microtime( true );
	$cBackoffs = json_decode( $content, true ) ?: [];
	foreach ( $cBackoffs as $type => $timestamp ) {
		if ( $timestamp < $ctime ) {
			unset( $cBackoffs[$type] );
		}
	}
}
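// Merging deltas: each entry maps a job type to a UNIX expiry time. A delta
// extends an entry that has not yet expired, otherwise it starts a new window
// at now + delta; expired entries are pruned before the file is rewritten.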
$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
$file = wfTempDir() . '/mw-runJobs-backoffs.json';
$handle = fopen( $file, 'wb+' );
if ( !flock( $handle, LOCK_EX | $noblock ) ) {
	// …
}
$ctime = microtime( true );
$content = stream_get_contents( $handle );
$cBackoffs = json_decode( $content, true ) ?: [];
foreach ( $deltas as $type => $seconds ) {
	$cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
		? $cBackoffs[$type] + $seconds
		: $ctime + $seconds;
}
foreach ( $cBackoffs as $type => $timestamp ) {
	if ( $timestamp < $ctime ) {
		unset( $cBackoffs[$type] );
	}
}
ftruncate( $handle, 0 );
fwrite( $handle, json_encode( $cBackoffs ) );
flock( $handle, LOCK_UN );
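// Parse PHP's memory_limit (with k/m/g suffixes) once and cache it in a
// static, then treat usage at or above 95% of that limit as excessive.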
static $maxBytes = null;
if ( $maxBytes === null ) {
	if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
		list( , $num, $unit ) = $m;
		$conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
		$maxBytes = $num * $conv[strtolower( $unit )];
	} else {
		$maxBytes = 0;
	}
}
$usedBytes = memory_get_usage();
if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
	$msg = "Detected excessive memory usage ({used_bytes}/{max_bytes}).";
	$this->logger->error( $msg, [
		'used_bytes' => $usedBytes,
		'max_bytes' => $maxBytes,
	] );

	$msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
	// …
}
if ( $this->debug ) {
	call_user_func_array( $this->debug, [ wfTimestamp( TS_DB ) . " $msg\n" ] );
}
$syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );

if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
	$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
	if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
		$time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
		if ( $time < $syncThreshold ) {
			$dbwSerial = false;
		}
	}
}
$lbFactory->commitMasterChanges(
	$fnameTrxOwner,
	[ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
);
$ms = intval( 1000 * $time );

$msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
$this->logger->info( $msg, [
	'job_type' => $job->getType(),
	'job_commit_write_ms' => $ms,
] );

$msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
	throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
}
$unlocker = new ScopedCallback( function () use ( $dbwSerial ) {
	$dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
} );

$pos = $lb->getMasterPos();
$lb->waitForAll( $pos );
$lbFactory->commitMasterChanges(
	$fnameTrxOwner,
	[ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
);
ScopedCallback::consume( $unlocker );