26 use Liuggio\StatsdClient\Factory\StatsdDataFactory;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Wikimedia\ScopedCallback;
75 $logger = LoggerFactory::getInstance(
'runJobs' );
78 $this->config = MediaWikiServices::getInstance()->getMainConfig();
106 $jobClasses = $this->config->get(
'JobClasses' );
107 $profilerLimits = $this->config->get(
'TrxProfilerLimits' );
109 $response = [
'jobs' => [],
'reached' =>
'none-ready' ];
112 $maxJobs =
$options[
'maxJobs'] ??
false;
113 $maxTime =
$options[
'maxTime'] ??
false;
117 if (
$type !==
false && !isset( $jobClasses[
$type] ) ) {
128 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
129 if ( $lbFactory->hasTransactionRound() ) {
130 throw new LogicException( __METHOD__ .
' called with an active transaction round.' );
134 list( , $maxLag ) = $lbFactory->getMainLB()->getMaxLag();
135 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
136 $response[
'reached'] =
'replica-lag-limit';
142 $trxProfiler->setLogger( LoggerFactory::getInstance(
'DBPerformance' ) );
143 $trxProfiler->setExpectations( $profilerLimits[
'JobRunner'], __METHOD__ );
151 $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
154 $startTime = microtime(
true );
159 $blacklist = $noThrottle ? [] : array_keys( $backoffs );
162 if (
$type ===
false ) {
168 } elseif ( in_array(
$type, $blacklist ) ) {
177 $jType =
$job->getType();
186 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
187 ? $backoffDeltas[$jType] + $ttw
193 if ( $info[
'status'] !==
false || !
$job->allowRetries() ) {
198 if ( $info[
'status'] ===
false && mt_rand( 0, 49 ) == 0 ) {
200 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
201 ? $backoffDeltas[$jType] + $ttw
207 'status' => ( $info[
'status'] ===
false ) ?
'failed' :
'ok',
208 'error' => $info[
'error'],
209 'time' => $info[
'timeMs']
211 $timeMsTotal += $info[
'timeMs'];
214 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
217 } elseif ( $maxTime && ( microtime(
true ) - $startTime ) > $maxTime ) {
225 $timePassed = microtime(
true ) - $lastCheckTime;
226 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
227 $success = $lbFactory->waitForReplication( [
228 'ifWritesSince' => $lastCheckTime,
229 'timeout' => self::MAX_ALLOWED_LAG,
232 $response[
'reached'] =
'replica-lag-limit';
235 $lastCheckTime = microtime(
true );
247 if ( $backoffDeltas ) {
262 return strpos( $error,
'DBReadOnlyError' ) !==
false
275 $jType =
$job->getType();
276 $msg =
$job->toString() .
" STARTING";
277 $this->logger->debug( $msg, [
278 'job_type' =>
$job->getType(),
284 $jobStartTime = microtime(
true );
286 $fnameTrxOwner = get_class(
$job ) .
'::run';
287 if ( !
$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
291 $error =
$job->getLastError();
295 }
catch ( Exception
$e ) {
298 $error = get_class(
$e ) .
': ' .
$e->getMessage();
303 }
catch ( Exception
$e ) {
312 MediaWikiServices::getInstance()->getLinkCache()->clear();
313 $timeMs = intval( ( microtime(
true ) - $jobStartTime ) * 1000 );
317 $readyTs =
$job->getReadyTimestamp();
319 $pickupDelay = max( 0, $popTime - $readyTs );
320 $stats->timing(
'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
321 $stats->timing(
"jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
324 $rootTimestamp =
$job->getRootJobParams()[
'rootJobTimestamp'];
325 if ( $rootTimestamp ) {
326 $age = max( 0, $popTime -
wfTimestamp( TS_UNIX, $rootTimestamp ) );
327 $stats->timing(
"jobqueue.pickup_root_age.$jType", 1000 * $age );
330 $stats->timing(
"jobqueue.run.$jType", $timeMs );
332 if ( $rssStart && $rssEnd ) {
333 $stats->updateCount(
"jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
337 $msg =
$job->toString() .
" t={job_duration} error={job_error}";
338 $this->logger->error( $msg, [
339 'job_type' =>
$job->getType(),
340 'job_duration' => $timeMs,
341 'job_error' => $error,
344 $msg =
$job->toString() .
" t=$timeMs error={$error}";
347 $msg =
$job->toString() .
" t={job_duration} good";
348 $this->logger->info( $msg, [
349 'job_type' =>
$job->getType(),
350 'job_duration' => $timeMs,
353 $msg =
$job->toString() .
" t=$timeMs good";
357 return [
'status' =>
$status,
'error' => $error,
'timeMs' => $timeMs ];
366 return isset( $info[
'ru_maxrss'] ) ? (int)$info[
'ru_maxrss'] :
null;
375 $throttling = $this->config->get(
'JobBackoffThrottling' );
381 $itemsPerSecond = $throttling[
$job->getType()];
382 if ( $itemsPerSecond <= 0 ) {
387 if (
$job->workItemCount() > 0 ) {
388 $exactSeconds =
$job->workItemCount() / $itemsPerSecond;
390 $seconds = floor( $exactSeconds );
391 $remainder = $exactSeconds - $seconds;
392 $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
395 return (
int)$seconds;
407 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
408 if ( is_file( $file ) ) {
409 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
410 $handle = fopen( $file,
'rb' );
411 if ( !flock( $handle, LOCK_SH | $noblock ) ) {
415 $content = stream_get_contents( $handle );
416 flock( $handle, LOCK_UN );
418 $ctime = microtime(
true );
419 $cBackoffs = json_decode(
$content,
true ) ?: [];
420 foreach ( $cBackoffs
as $type => $timestamp ) {
421 if ( $timestamp < $ctime ) {
422 unset( $cBackoffs[
$type] );
448 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
449 $file =
wfTempDir() .
'/mw-runJobs-backoffs.json';
450 $handle = fopen( $file,
'wb+' );
451 if ( !flock( $handle, LOCK_EX | $noblock ) ) {
455 $ctime = microtime(
true );
456 $content = stream_get_contents( $handle );
457 $cBackoffs = json_decode(
$content,
true ) ?: [];
458 foreach ( $deltas
as $type => $seconds ) {
459 $cBackoffs[
$type] = isset( $cBackoffs[
$type] ) && $cBackoffs[
$type] >= $ctime
460 ? $cBackoffs[
$type] + $seconds
463 foreach ( $cBackoffs
as $type => $timestamp ) {
464 if ( $timestamp < $ctime ) {
465 unset( $cBackoffs[
$type] );
468 ftruncate( $handle, 0 );
469 fwrite( $handle, json_encode( $cBackoffs ) );
470 flock( $handle, LOCK_UN );
484 static $maxBytes =
null;
485 if ( $maxBytes ===
null ) {
487 if ( preg_match(
'!^(\d+)(k|m|g|)$!i', ini_get(
'memory_limit' ), $m ) ) {
488 list( , $num, $unit ) = $m;
489 $conv = [
'g' => 1073741824,
'm' => 1048576,
'k' => 1024,
'' => 1 ];
490 $maxBytes = $num * $conv[strtolower( $unit )];
495 $usedBytes = memory_get_usage();
496 if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
497 $msg =
"Detected excessive memory usage ({used_bytes}/{max_bytes}).";
498 $this->logger->error( $msg, [
499 'used_bytes' => $usedBytes,
500 'max_bytes' => $maxBytes,
503 $msg =
"Detected excessive memory usage ($usedBytes/$maxBytes).";
517 if ( $this->debug ) {
518 call_user_func_array( $this->debug, [
wfTimestamp( TS_DB ) .
" $msg\n" ] );
534 $syncThreshold = $this->config->get(
'JobSerialCommitThreshold' );
538 if ( $syncThreshold !==
false && $lb->getServerCount() > 1 ) {
540 $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
542 if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
543 $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
544 if (
$time < $syncThreshold ) {
559 [
'maxWriteDuration' => $this->config->get(
'MaxJobDBWriteDuration' ) ]
565 $ms = intval( 1000 *
$time );
567 $msg =
$job->toString() .
" COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
568 $this->logger->info( $msg, [
569 'job_type' =>
$job->getType(),
570 'job_commit_write_ms' => $ms,
573 $msg =
$job->toString() .
" COMMIT ENQUEUED [{$ms}ms of writes]";
577 if ( !$dbwSerial->lock(
'jobrunner-serial-commit', $fnameTrxOwner, 30 ) ) {
579 throw new DBError( $dbwSerial,
"Timed out waiting on commit queue." );
581 $unlocker =
new ScopedCallback(
function ()
use ( $dbwSerial, $fnameTrxOwner ) {
582 $dbwSerial->unlock(
'jobrunner-serial-commit', $fnameTrxOwner );
586 $pos = $lb->getMasterPos();
588 $lb->waitForAll( $pos );
595 [
'maxWriteDuration' => $this->config->get(
'MaxJobDBWriteDuration' ) ]
597 ScopedCallback::consume( $unlocker );