28 use Wikimedia\ScopedCallback;
37 private const CACHE_TTL_SHORT = 30;
38 private const MAX_AGE_PRUNE = 604800;
39 private const MAX_JOB_RANDOM = 2147483647;
40 private const MAX_OFFSET = 255;
60 parent::__construct( $params );
62 if ( isset( $params[
'server'] ) ) {
63 $this->server = $params[
'server'];
64 } elseif ( isset( $params[
'cluster'] ) && is_string( $params[
'cluster'] ) ) {
65 $this->cluster = $params[
'cluster'];
70 return [
'random',
'timestamp',
'fifo' ];
84 $scope = $this->getScopedNoTrxFlag( $dbr );
87 $found = (bool)$dbr->newSelectQueryBuilder()
90 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
91 ->caller( __METHOD__ )->fetchField();
104 $key = $this->getCacheKey(
'size' );
106 $size = $this->wanCache->get( $key );
107 if ( is_int( $size ) ) {
113 $scope = $this->getScopedNoTrxFlag( $dbr );
115 $size = $dbr->newSelectQueryBuilder()
117 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
118 ->caller( __METHOD__ )->fetchRowCount();
122 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
132 if ( $this->claimTTL <= 0 ) {
136 $key = $this->getCacheKey(
'acquiredcount' );
138 $count = $this->wanCache->get( $key );
139 if ( is_int( $count ) ) {
145 $scope = $this->getScopedNoTrxFlag( $dbr );
147 $count = $dbr->newSelectQueryBuilder()
149 ->where( [
'job_cmd' => $this->type,
"job_token != {$dbr->addQuotes( '' )}" ] )
150 ->caller( __METHOD__ )->fetchRowCount();
154 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
165 if ( $this->claimTTL <= 0 ) {
169 $key = $this->getCacheKey(
'abandonedcount' );
171 $count = $this->wanCache->get( $key );
172 if ( is_int( $count ) ) {
178 $scope = $this->getScopedNoTrxFlag( $dbr );
180 $count = $dbr->newSelectQueryBuilder()
184 'job_cmd' => $this->type,
185 "job_token != {$dbr->addQuotes( '' )}",
186 $dbr->buildComparison(
'>=', [
'job_attempts' => $this->maxTries ] ),
189 ->caller( __METHOD__ )->fetchRowCount();
194 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
209 $scope = $this->getScopedNoTrxFlag( $dbw );
220 $dbw->onTransactionPreCommitOrIdle(
221 function (
IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
240 if ( $jobs === [] ) {
246 foreach ( $jobs as
$job ) {
248 if (
$job->ignoreDuplicates() ) {
249 $rowSet[$row[
'job_sha1']] = $row;
255 if ( $flags & self::QOS_ATOMIC ) {
260 if ( count( $rowSet ) ) {
262 ->select(
'job_sha1' )
267 'job_sha1' => array_map(
'strval', array_keys( $rowSet ) ),
271 ->caller( $method )->fetchResultSet();
272 foreach ( $res as $row ) {
273 wfDebug(
"Job with hash '{$row->job_sha1}' is a duplicate." );
274 unset( $rowSet[$row->job_sha1] );
278 $rows = array_merge( $rowList, array_values( $rowSet ) );
280 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
282 ->insertInto(
'job' )
284 ->caller( $method )->execute();
286 $this->
incrStats(
'inserts', $this->type, count( $rows ) );
287 $this->
incrStats(
'dupe_inserts', $this->type,
288 count( $rowSet ) + count( $rowList ) - count( $rows )
293 if ( $flags & self::QOS_ATOMIC ) {
305 $scope = $this->getScopedNoTrxFlag( $dbw );
312 if ( in_array( $this->order, [
'fifo',
'timestamp' ] ) ) {
315 $rand = mt_rand( 0, self::MAX_JOB_RANDOM );
316 $gte = (bool)mt_rand( 0, 1 );
330 if ( !
$job || mt_rand( 0, 9 ) == 0 ) {
353 $scope = $this->getScopedNoTrxFlag( $dbw );
355 $tinyQueue = $this->wanCache->get( $this->getCacheKey(
'small' ) );
357 $invertedDirection =
false;
366 $row = $dbw->newSelectQueryBuilder()
367 ->select( self::selectFields() )
371 'job_cmd' => $this->type,
373 $dbw->buildComparison( $gte ?
'>=' :
'<=', [
'job_random' => $rand ] )
378 $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
380 ->caller( __METHOD__ )->fetchRow();
381 if ( !$row && !$invertedDirection ) {
383 $invertedDirection =
true;
390 $row = $dbw->newSelectQueryBuilder()
391 ->select( self::selectFields() )
395 'job_cmd' => $this->type,
399 ->offset( mt_rand( 0, self::MAX_OFFSET ) )
400 ->caller( __METHOD__ )->fetchRow();
403 $this->wanCache->set( $this->getCacheKey(
'small' ), 1, 30 );
412 $dbw->newUpdateQueryBuilder()
415 'job_token' => $uuid,
416 'job_token_timestamp' => $dbw->timestamp(),
417 'job_attempts = job_attempts+1'
420 'job_cmd' => $this->type,
421 'job_id' => $row->job_id,
424 ->caller( __METHOD__ )->execute();
428 if ( !$dbw->affectedRows() ) {
445 $scope = $this->getScopedNoTrxFlag( $dbw );
449 if ( $dbw->getType() ===
'mysql' ) {
454 $dbw->query(
"UPDATE {$dbw->tableName( 'job' )} " .
456 "job_token = {$dbw->addQuotes( $uuid ) }, " .
457 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
458 "job_attempts = job_attempts+1 " .
460 "job_cmd = {$dbw->addQuotes( $this->type )} " .
461 "AND job_token = {$dbw->addQuotes( '' )} " .
462 ") ORDER BY job_id ASC LIMIT 1",
468 $qb = $dbw->newSelectQueryBuilder()
471 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
472 ->orderBy(
'job_id', SelectQueryBuilder::SORT_ASC )
475 $dbw->newUpdateQueryBuilder()
478 'job_token' => $uuid,
479 'job_token_timestamp' => $dbw->timestamp(),
480 'job_attempts = job_attempts+1'
482 ->where( [
'job_id = (' . $qb->getSQL() .
')' ] )
483 ->caller( __METHOD__ )->execute();
486 if ( !$dbw->affectedRows() ) {
491 $row = $dbw->newSelectQueryBuilder()
492 ->select( self::selectFields() )
494 ->where( [
'job_cmd' => $this->type,
'job_token' => $uuid ] )
495 ->caller( __METHOD__ )->fetchRow();
497 wfDebug(
"Row deleted as duplicate by another process." );
510 $id =
$job->getMetadata(
'id' );
511 if ( $id ===
null ) {
512 throw new MWException(
"Job of type '{$job->getType()}' has no ID." );
517 $scope = $this->getScopedNoTrxFlag( $dbw );
520 $dbw->newDeleteQueryBuilder()
521 ->deleteFrom(
'job' )
522 ->where( [
'job_cmd' => $this->type,
'job_id' => $id ] )
523 ->caller( __METHOD__ )->execute();
545 $scope = $this->getScopedNoTrxFlag( $dbw );
546 $dbw->onTransactionCommitOrIdle(
547 function () use (
$job ) {
548 parent::doDeduplicateRootJob(
$job );
563 $scope = $this->getScopedNoTrxFlag( $dbw );
565 $dbw->newDeleteQueryBuilder()
566 ->deleteFrom(
'job' )
567 ->where( [
'job_cmd' => $this->type ] )
568 ->caller( __METHOD__ )->execute();
581 if ( $this->server ) {
585 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
586 $lbFactory->waitForReplication();
593 foreach ( [
'size',
'acquiredcount' ] as
$type ) {
594 $this->wanCache->delete( $this->getCacheKey(
$type ) );
622 "job_attempts >= " . intval( $this->maxTries )
633 $scope = $this->getScopedNoTrxFlag( $dbr );
634 $qb = $dbr->newSelectQueryBuilder()
635 ->select( self::selectFields() )
640 $qb->caller( __METHOD__ )->fetchResultSet(),
651 if ( $this->server ) {
655 return is_string( $this->cluster )
656 ?
"DBCluster:{$this->cluster}:{$this->domain}"
657 :
"LBFactory:{$this->domain}";
663 $scope = $this->getScopedNoTrxFlag( $dbr );
668 $res = $dbr->newSelectQueryBuilder()
669 ->select(
'job_cmd' )
672 ->where( [
'job_cmd' => $types ] )
673 ->caller( __METHOD__ )->fetchResultSet();
676 foreach ( $res as $row ) {
677 $types[] = $row->job_cmd;
686 $scope = $this->getScopedNoTrxFlag( $dbr );
688 $res = $dbr->newSelectQueryBuilder()
689 ->select( [
'job_cmd',
'count' =>
'COUNT(*)' ] )
691 ->where( [
'job_cmd' => $types ] )
692 ->groupBy(
'job_cmd' )
693 ->caller( __METHOD__ )->fetchResultSet();
696 foreach ( $res as $row ) {
697 $sizes[$row->job_cmd] = (int)$row->count;
713 $scope = $this->getScopedNoTrxFlag( $dbw );
716 if ( !$dbw->lock(
"jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
721 if ( $this->claimTTL > 0 ) {
722 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
726 $res = $dbw->newSelectQueryBuilder()
731 'job_cmd' => $this->type,
732 "job_token != {$dbw->addQuotes( '' )}",
733 $dbw->buildComparison(
'<', [
'job_token_timestamp' => $claimCutoff ] ),
734 $dbw->buildComparison(
'<', [
'job_attempts' => $this->maxTries ] ),
737 ->caller( __METHOD__ )->fetchResultSet();
739 static function ( $o ) {
741 }, iterator_to_array( $res )
743 if ( count( $ids ) ) {
747 $dbw->newUpdateQueryBuilder()
751 'job_token_timestamp' => $dbw->timestamp( $now )
755 "job_token != {$dbw->addQuotes( '' )}"
757 ->caller( __METHOD__ )->execute();
759 $affected = $dbw->affectedRows();
761 $this->
incrStats(
'recycles', $this->type, $affected );
766 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
767 $qb = $dbw->newSelectQueryBuilder()
772 'job_cmd' => $this->type,
773 "job_token != {$dbw->addQuotes( '' )}",
774 $dbw->buildComparison(
'<', [
'job_token_timestamp' => $pruneCutoff ] )
777 if ( $this->claimTTL > 0 ) {
778 $qb->andWhere(
"job_attempts >= {$dbw->addQuotes( $this->maxTries )}" );
782 $res = $qb->caller( __METHOD__ )->fetchResultSet();
784 static function ( $o ) {
786 }, iterator_to_array( $res )
788 if ( count( $ids ) ) {
789 $dbw->newDeleteQueryBuilder()
790 ->deleteFrom(
'job' )
791 ->where( [
'job_id' => $ids ] )
792 ->caller( __METHOD__ )->execute();
793 $affected = $dbw->affectedRows();
795 $this->
incrStats(
'abandons', $this->type, $affected );
798 $dbw->unlock(
"jobqueue-recycle-{$this->type}", __METHOD__ );
814 'job_cmd' =>
$job->getType(),
816 'job_title' =>
$job->getParams()[
'title'] ??
'',
820 'job_sha1' => Wikimedia\base_convert(
821 sha1( serialize(
$job->getDeduplicationInfo() ) ),
824 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
857 protected function getDB( $index ) {
858 if ( $this->server ) {
859 if ( $this->conn instanceof
IDatabase ) {
861 } elseif ( $this->conn instanceof
DBError ) {
866 $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
867 $this->server[
'type'],
877 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
878 $lb = is_string( $this->cluster )
879 ? $lbFactory->getExternalLB( $this->cluster )
880 : $lbFactory->getMainLB( $this->domain );
882 if ( $lb->getServerType( $lb->getWriterIndex() ) !==
'sqlite' ) {
885 $flags = $lb::CONN_TRX_AUTOCOMMIT;
891 return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
899 private function getScopedNoTrxFlag(
IDatabase $db ) {
903 return new ScopedCallback(
static function () use ( $db, $autoTrx ) {
914 private function getCacheKey( $property ) {
915 $cluster = is_string( $this->cluster ) ? $this->cluster :
'main';
917 return $this->wanCache->makeGlobalKey(
931 if ( $params !==
false ) {
932 return serialize( $params );
943 $params = ( (string)$row->job_params !==
'' ) ? unserialize( $row->job_params ) : [];
944 if ( !is_array( $params ) ) {
945 throw new UnexpectedValueException(
946 "Could not unserialize job with ID '{$row->job_id}'." );
949 $params += [
'namespace' => $row->job_namespace,
'title' => $row->job_title ];
951 $job->setMetadata(
'id', $row->job_id );
952 $job->setMetadata(
'timestamp', $row->job_timestamp );
962 return new JobQueueError( get_class( $e ) .
": " . $e->getMessage() );
981 'job_token_timestamp',
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Class to handle job queues stored in the DB.
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
supportedOrders()
Get the allowed queue orders for configuration validation.
doGetSiblingQueueSizes(array $types)
insertFields(IJobSpecification $job, IDatabase $db)
__construct(array $params)
Additional parameters include:
getDBException(DBError $e)
doBatchPush(array $jobs, $flags)
static makeBlob( $params)
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doGetSiblingQueuesWithJobs(array $types)
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
string|null $cluster
Name of an external DB cluster or null for the local DB cluster.
IMaintainableDatabase|DBError|null $conn
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doDeduplicateRootJob(IJobSpecification $job)
static selectFields()
Return the list of job fields that should be selected.
getJobIterator(array $conds)
array|null $server
Server configuration array.
Class to handle enqueueing and running of background jobs.
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
factoryJob( $command, $params)
Convenience class for generating iterators from iterators.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Advanced database interface for IDatabase handles that include maintenance methods.
if(count( $args)< 1) $job