26 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
27 use Psr\Log\LoggerAwareInterface;
28 use Psr\Log\LoggerInterface;
29 use Wikimedia\ScopedCallback;
75 $logger = LoggerFactory::getInstance(
'runJobs' );
78 $this->config = MediaWikiServices::getInstance()->getMainConfig();
105 public function run( array $options ) {
106 $jobClasses = $this->config->get(
'JobClasses' );
107 $profilerLimits = $this->config->get(
'TrxProfilerLimits' );
109 $response = [
'jobs' => [],
'reached' =>
'none-ready' ];
111 $type = $options[
'type'] ??
false;
112 $maxJobs = $options[
'maxJobs'] ??
false;
113 $maxTime = $options[
'maxTime'] ??
false;
114 $noThrottle = isset( $options[
'throttle'] ) && !$options[
'throttle'];
117 if (
$type !==
false && !isset( $jobClasses[
$type] ) ) {
128 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
129 if ( $lbFactory->hasTransactionRound() ) {
130 throw new LogicException( __METHOD__ .
' called with an active transaction round.' );
134 list( , $maxLag ) = $lbFactory->getMainLB()->getMaxLag();
135 if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
136 $response[
'reached'] =
'replica-lag-limit';
142 $trxProfiler->setLogger( LoggerFactory::getInstance(
'DBPerformance' ) );
143 $trxProfiler->setExpectations( $profilerLimits[
'JobRunner'], __METHOD__ );
151 $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
154 $startTime = microtime(
true );
159 $blacklist = $noThrottle ? [] : array_keys( $backoffs );
162 if (
$type ===
false ) {
168 } elseif ( in_array(
$type, $blacklist ) ) {
177 $jType =
$job->getType();
186 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
187 ? $backoffDeltas[$jType] + $ttw
193 if ( $info[
'status'] !==
false || !
$job->allowRetries() ) {
198 if ( $info[
'status'] ===
false && mt_rand( 0, 49 ) == 0 ) {
200 $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
201 ? $backoffDeltas[$jType] + $ttw
207 'status' => ( $info[
'status'] === false ) ?
'failed' :
'ok',
208 'error' => $info[
'error'],
209 'time' => $info[
'timeMs']
211 $timeMsTotal += $info[
'timeMs'];
214 if ( $maxJobs && $jobsPopped >= $maxJobs ) {
217 } elseif ( $maxTime && ( microtime(
true ) - $startTime ) > $maxTime ) {
225 $timePassed = microtime(
true ) - $lastCheckTime;
226 if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
227 $success = $lbFactory->waitForReplication( [
228 'ifWritesSince' => $lastCheckTime,
229 'timeout' => self::MAX_ALLOWED_LAG,
232 $response[
'reached'] =
'replica-lag-limit';
235 $lastCheckTime = microtime(
true );
247 if ( $backoffDeltas ) {
262 return strpos( $error,
'DBReadOnlyError' ) !==
false
275 $jType =
$job->getType();
276 $msg =
$job->toString() .
" STARTING";
277 $this->logger->debug( $msg, [
278 'job_type' =>
$job->getType(),
284 MediaWikiServices::getInstance()->getLinkCache()->clear();
288 $jobStartTime = microtime(
true );
290 $fnameTrxOwner = get_class(
$job ) .
'::run';
292 if (
$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
300 $error =
$job->getLastError();
305 }
catch ( Exception $e ) {
308 $error = get_class( $e ) .
': ' . $e->getMessage();
313 }
catch ( Exception $e ) {
317 $timeMs = intval( ( microtime(
true ) - $jobStartTime ) * 1000 );
321 $readyTs =
$job->getReadyTimestamp();
323 $pickupDelay = max( 0, $popTime - $readyTs );
324 $stats->timing(
'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
325 $stats->timing(
"jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
328 $rootTimestamp =
$job->getRootJobParams()[
'rootJobTimestamp'];
329 if ( $rootTimestamp ) {
330 $age = max( 0, $popTime -
wfTimestamp( TS_UNIX, $rootTimestamp ) );
331 $stats->timing(
"jobqueue.pickup_root_age.$jType", 1000 * $age );
334 $stats->timing(
"jobqueue.run.$jType", $timeMs );
336 if ( $rssStart && $rssEnd ) {
337 $stats->updateCount(
"jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
341 $msg =
$job->toString() .
" t={job_duration} error={job_error}";
342 $this->logger->error( $msg, [
343 'job_type' =>
$job->getType(),
344 'job_duration' => $timeMs,
345 'job_error' => $error,
348 $msg =
$job->toString() .
" t=$timeMs error={$error}";
351 $msg =
$job->toString() .
" t={job_duration} good";
352 $this->logger->info( $msg, [
353 'job_type' =>
$job->getType(),
354 'job_duration' => $timeMs,
357 $msg =
$job->toString() .
" t=$timeMs good";
361 return [
'status' =>
$status,
'error' => $error,
'timeMs' => $timeMs ];
370 return isset( $info[
'ru_maxrss'] ) ? (int)$info[
'ru_maxrss'] :
null;
379 $throttling = $this->config->get(
'JobBackoffThrottling' );
385 $itemsPerSecond = $throttling[
$job->getType()];
386 if ( $itemsPerSecond <= 0 ) {
391 if (
$job->workItemCount() > 0 ) {
392 $exactSeconds =
$job->workItemCount() / $itemsPerSecond;
394 $seconds = floor( $exactSeconds );
395 $remainder = $exactSeconds - $seconds;
396 $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
399 return (
int)$seconds;
412 if ( is_file(
$file ) ) {
413 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
414 $handle = fopen(
$file,
'rb' );
415 if ( !flock( $handle, LOCK_SH | $noblock ) ) {
419 $content = stream_get_contents( $handle );
420 flock( $handle, LOCK_UN );
422 $ctime = microtime(
true );
423 $cBackoffs = json_decode(
$content,
true ) ?: [];
424 foreach ( $cBackoffs as
$type => $timestamp ) {
425 if ( $timestamp < $ctime ) {
426 unset( $cBackoffs[
$type] );
452 $noblock = ( $mode ===
'nowait' ) ? LOCK_NB : 0;
454 $handle = fopen(
$file,
'wb+' );
455 if ( !flock( $handle, LOCK_EX | $noblock ) ) {
459 $ctime = microtime(
true );
460 $content = stream_get_contents( $handle );
461 $cBackoffs = json_decode(
$content,
true ) ?: [];
462 foreach ( $deltas as
$type => $seconds ) {
463 $cBackoffs[
$type] = isset( $cBackoffs[
$type] ) && $cBackoffs[
$type] >= $ctime
464 ? $cBackoffs[
$type] + $seconds
467 foreach ( $cBackoffs as
$type => $timestamp ) {
468 if ( $timestamp < $ctime ) {
469 unset( $cBackoffs[
$type] );
472 ftruncate( $handle, 0 );
473 fwrite( $handle, json_encode( $cBackoffs ) );
474 flock( $handle, LOCK_UN );
488 static $maxBytes =
null;
489 if ( $maxBytes ===
null ) {
491 if ( preg_match(
'!^(\d+)(k|m|g|)$!i', ini_get(
'memory_limit' ), $m ) ) {
492 list( , $num, $unit ) = $m;
493 $conv = [
'g' => 1073741824,
'm' => 1048576,
'k' => 1024,
'' => 1 ];
494 $maxBytes = $num * $conv[strtolower( $unit )];
499 $usedBytes = memory_get_usage();
500 if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
501 $msg =
"Detected excessive memory usage ({used_bytes}/{max_bytes}).";
502 $this->logger->error( $msg, [
503 'used_bytes' => $usedBytes,
504 'max_bytes' => $maxBytes,
507 $msg =
"Detected excessive memory usage ($usedBytes/$maxBytes).";
521 if ( $this->debug ) {
522 call_user_func_array( $this->debug, [
wfTimestamp( TS_DB ) .
" $msg\n" ] );
538 $syncThreshold = $this->config->get(
'JobSerialCommitThreshold' );
542 if ( $syncThreshold !==
false && $lb->hasStreamingReplicaServers() ) {
544 $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
546 if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
547 $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
548 if ( $time < $syncThreshold ) {
563 [
'maxWriteDuration' => $this->config->get(
'MaxJobDBWriteDuration' ) ]
569 $ms = intval( 1000 * $time );
571 $msg =
$job->toString() .
" COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
572 $this->logger->info( $msg, [
573 'job_type' =>
$job->getType(),
574 'job_commit_write_ms' => $ms,
577 $msg =
$job->toString() .
" COMMIT ENQUEUED [{$ms}ms of writes]";
581 if ( !$dbwSerial->lock(
'jobrunner-serial-commit', $fnameTrxOwner, 30 ) ) {
583 throw new DBError( $dbwSerial,
"Timed out waiting on commit queue." );
585 $unlocker =
new ScopedCallback(
function () use ( $dbwSerial, $fnameTrxOwner ) {
586 $dbwSerial->unlock(
'jobrunner-serial-commit', $fnameTrxOwner );
590 $pos = $lb->getMasterPos();
592 $lb->waitForAll( $pos );
599 [
'maxWriteDuration' => $this->config->get(
'MaxJobDBWriteDuration' ) ]
601 ScopedCallback::consume( $unlocker );