26 use Liuggio\StatsdClient\Factory\StatsdDataFactory;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Wikimedia\ScopedCallback;
74 $logger = LoggerFactory::getInstance(
'runJobs' );
104 global $wgJobClasses, $wgTrxProfilerLimits;
106 $response = [
'jobs' => [],
'reached' =>
'none-ready' ];
114 if (
$type !==
false && !isset( $wgJobClasses[
$type] ) ) {
124 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
128 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
129 $response[
'reached'] =
'replica-lag-limit';
138 $trxProfiler->setLogger( LoggerFactory::getInstance(
'DBPerformance' ) );
139 $trxProfiler->setExpectations( $wgTrxProfilerLimits[
'JobRunner'], __METHOD__ );
147 $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
150 $startTime = microtime(
true );
155 $blacklist = $noThrottle ? [] : array_keys( $backoffs );
158 if (
$type ===
false ) {
164 } elseif ( in_array(
$type, $blacklist ) ) {
169 $lbFactory->commitMasterChanges( __METHOD__ );
174 $jType =
$job->getType();
183 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
184 ? $backoffDeltas[$jType] + $ttw
190 if ( $info[
'status'] !==
false || !
$job->allowRetries() ) {
192 $lbFactory->commitMasterChanges( __METHOD__ );
196 if ( $info[
'status'] ===
false && mt_rand( 0, 49 ) == 0 ) {
198 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
199 ? $backoffDeltas[$jType] + $ttw
205 'status' => ( $info[
'status'] ===
false ) ?
'failed' :
'ok',
206 'error' => $info[
'error'],
207 'time' => $info[
'timeMs']
209 $timeMsTotal += $info[
'timeMs'];
212 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
215 } elseif ( $maxTime && ( microtime(
true ) - $startTime ) > $maxTime ) {
223 $timePassed = microtime(
true ) - $lastCheckTime;
224 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
227 'ifWritesSince' => $lastCheckTime,
228 'timeout' => self::MAX_ALLOWED_LAG
231 $response[
'reached'] =
'replica-lag-limit';
234 $lastCheckTime = microtime(
true );
237 if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
238 $group->waitForBackups();
250 if ( $backoffDeltas ) {
265 return strpos( $error,
'DBReadOnlyError' ) !==
false
278 $jType =
$job->getType();
279 $msg =
$job->toString() .
" STARTING";
280 $this->logger->debug( $msg );
285 $jobStartTime = microtime(
true );
287 $fnameTrxOwner = get_class(
$job ) .
'::run';
288 $lbFactory->beginMasterChanges( $fnameTrxOwner );
290 $error =
$job->getLastError();
296 }
catch ( Exception
$e ) {
299 $error = get_class(
$e ) .
': ' .
$e->getMessage();
304 }
catch ( Exception
$e ) {
311 $lbFactory->flushReplicaSnapshots( __METHOD__ );
313 MediaWikiServices::getInstance()->getLinkCache()->clear();
314 $timeMs = intval( ( microtime(
true ) - $jobStartTime ) * 1000 );
318 $readyTs =
$job->getReadyTimestamp();
320 $pickupDelay = max( 0, $popTime - $readyTs );
321 $stats->timing(
'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
322 $stats->timing(
"jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
325 $rootTimestamp =
$job->getRootJobParams()[
'rootJobTimestamp'];
326 if ( $rootTimestamp ) {
327 $age = max( 0, $popTime -
wfTimestamp( TS_UNIX, $rootTimestamp ) );
328 $stats->timing(
"jobqueue.pickup_root_age.$jType", 1000 * $age );
331 $stats->timing(
"jobqueue.run.$jType", $timeMs );
333 if ( $rssStart && $rssEnd ) {
334 $stats->updateCount(
"jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
338 $msg =
$job->toString() .
" t=$timeMs error={$error}";
339 $this->logger->error( $msg );
342 $msg =
$job->toString() .
" t=$timeMs good";
343 $this->logger->info( $msg );
347 return [
'status' =>
$status,
'error' => $error,
'timeMs' => $timeMs ];
356 return isset( $info[
'ru_maxrss'] ) ? (int)$info[
'ru_maxrss'] :
null;
365 global $wgJobBackoffThrottling;
367 if ( !isset( $wgJobBackoffThrottling[
$job->getType()] ) ||
373 $itemsPerSecond = $wgJobBackoffThrottling[
$job->getType()];
374 if ( $itemsPerSecond <= 0 ) {
379 if (
$job->workItemCount() > 0 ) {
380 $exactSeconds =
$job->workItemCount() / $itemsPerSecond;
382 $seconds = floor( $exactSeconds );
383 $remainder = $exactSeconds - $seconds;
384 $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
387 return (
int)$seconds;
399 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
400 if ( is_file( $file ) ) {
401 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
402 $handle = fopen( $file,
'rb' );
403 if ( !flock( $handle, LOCK_SH | $noblock ) ) {
407 $content = stream_get_contents( $handle );
408 flock( $handle, LOCK_UN );
410 $ctime = microtime(
true );
411 $cBackoffs = json_decode(
$content,
true ) ?: [];
412 foreach ( $cBackoffs
as $type => $timestamp ) {
413 if ( $timestamp < $ctime ) {
414 unset( $cBackoffs[
$type] );
440 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
441 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
442 $handle = fopen( $file,
'wb+' );
443 if ( !flock( $handle, LOCK_EX | $noblock ) ) {
447 $ctime = microtime(
true );
448 $content = stream_get_contents( $handle );
449 $cBackoffs = json_decode(
$content,
true ) ?: [];
450 foreach ( $deltas
as $type => $seconds ) {
451 $cBackoffs[
$type] = isset( $cBackoffs[
$type] ) && $cBackoffs[
$type] >= $ctime
452 ? $cBackoffs[
$type] + $seconds
455 foreach ( $cBackoffs
as $type => $timestamp ) {
456 if ( $timestamp < $ctime ) {
457 unset( $cBackoffs[
$type] );
460 ftruncate( $handle, 0 );
461 fwrite( $handle, json_encode( $cBackoffs ) );
462 flock( $handle, LOCK_UN );
476 static $maxBytes =
null;
477 if ( $maxBytes ===
null ) {
479 if ( preg_match(
'!^(\d+)(k|m|g|)$!i', ini_get(
'memory_limit' ), $m ) ) {
480 list( , $num, $unit ) = $m;
481 $conv = [
'g' => 1073741824,
'm' => 1048576,
'k' => 1024,
'' => 1 ];
482 $maxBytes = $num * $conv[strtolower( $unit )];
487 $usedBytes = memory_get_usage();
488 if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
489 $msg =
"Detected excessive memory usage ($usedBytes/$maxBytes).";
491 $this->logger->error( $msg );
504 if ( $this->debug ) {
505 call_user_func_array( $this->debug, [
wfTimestamp( TS_DB ) .
" $msg\n" ] );
521 global $wgJobSerialCommitThreshold;
525 if ( $wgJobSerialCommitThreshold !==
false && $lb->getServerCount() > 1 ) {
527 $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
529 if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
530 $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
531 if (
$time < $wgJobSerialCommitThreshold ) {
543 $lbFactory->commitMasterChanges( $fnameTrxOwner );
547 $ms = intval( 1000 *
$time );
548 $msg =
$job->toString() .
" COMMIT ENQUEUED [{$ms}ms of writes]";
549 $this->logger->info( $msg );
553 if ( !$dbwSerial->lock(
'jobrunner-serial-commit', __METHOD__, 30 ) ) {
555 throw new DBError( $dbwSerial,
"Timed out waiting on commit queue." );
557 $unlocker =
new ScopedCallback(
function ()
use ( $dbwSerial ) {
558 $dbwSerial->unlock(
'jobrunner-serial-commit', __METHOD__ );
562 $pos = $lb->getMasterPos();
564 $lb->waitForAll( $pos );
568 $lbFactory->commitMasterChanges( $fnameTrxOwner );
569 ScopedCallback::consume( $unlocker );