28use Wikimedia\ScopedCallback;
60 parent::__construct( $params );
62 if ( isset( $params[
'server'] ) ) {
63 $this->server = $params[
'server'];
64 } elseif ( isset( $params[
'cluster'] ) && is_string( $params[
'cluster'] ) ) {
65 $this->cluster = $params[
'cluster'];
70 return [
'random',
'timestamp',
'fifo' ];
86 $found =
$dbr->selectField(
87 'job',
'1', [
'job_cmd' => $this->type,
'job_token' =>
'' ], __METHOD__
103 $size = $this->wanCache->get( $key );
104 if ( is_int( $size ) ) {
112 $size = (int)
$dbr->selectField(
'job',
'COUNT(*)',
113 [
'job_cmd' => $this->type,
'job_token' =>
'' ],
119 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
129 if ( $this->claimTTL <= 0 ) {
135 $count = $this->wanCache->get( $key );
136 if ( is_int( $count ) ) {
144 $count = (int)
$dbr->selectField(
'job',
'COUNT(*)',
145 [
'job_cmd' => $this->type,
"job_token != {$dbr->addQuotes( '' )}" ],
151 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
162 if ( $this->claimTTL <= 0 ) {
168 $count = $this->wanCache->get( $key );
169 if ( is_int( $count ) ) {
177 $count = (int)
$dbr->selectField(
'job',
'COUNT(*)',
179 'job_cmd' => $this->type,
180 "job_token != {$dbr->addQuotes( '' )}",
181 "job_attempts >= " .
$dbr->addQuotes( $this->maxTries )
189 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
215 $dbw->onTransactionPreCommitOrIdle(
216 function (
IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
235 if ( $jobs === [] ) {
241 foreach ( $jobs as
$job ) {
243 if (
$job->ignoreDuplicates() ) {
244 $rowSet[$row[
'job_sha1']] = $row;
250 if ( $flags & self::QOS_ATOMIC ) {
255 if ( count( $rowSet ) ) {
259 'job_sha1' => array_map(
'strval', array_keys( $rowSet ) ),
264 foreach (
$res as $row ) {
265 wfDebug(
"Job with hash '{$row->job_sha1}' is a duplicate." );
266 unset( $rowSet[$row->job_sha1] );
270 $rows = array_merge( $rowList, array_values( $rowSet ) );
272 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
273 $dbw->
insert(
'job', $rowBatch, $method );
275 $this->
incrStats(
'inserts', $this->type, count( $rows ) );
276 $this->
incrStats(
'dupe_inserts', $this->type,
277 count( $rowSet ) + count( $rowList ) - count( $rows )
282 if ( $flags & self::QOS_ATOMIC ) {
301 if ( in_array( $this->order, [
'fifo',
'timestamp' ] ) ) {
304 $rand = mt_rand( 0, self::MAX_JOB_RANDOM );
305 $gte = (bool)mt_rand( 0, 1 );
319 if ( !
$job || mt_rand( 0, 9 ) == 0 ) {
344 $tinyQueue = $this->wanCache->get( $this->
getCacheKey(
'small' ) );
346 $invertedDirection =
false;
355 $ineq = $gte ?
'>=' :
'<=';
356 $dir = $gte ?
'ASC' :
'DESC';
357 $row = $dbw->selectRow(
'job', self::selectFields(),
359 'job_cmd' => $this->type,
361 "job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
363 [
'ORDER BY' =>
"job_random {$dir}" ]
365 if ( !$row && !$invertedDirection ) {
367 $invertedDirection =
true;
374 $row = $dbw->selectRow(
'job', self::selectFields(),
376 'job_cmd' => $this->type,
380 [
'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
384 $this->wanCache->set( $this->
getCacheKey(
'small' ), 1, 30 );
392 'job_token' => $uuid,
393 'job_token_timestamp' => $dbw->timestamp(),
394 'job_attempts = job_attempts+1' ],
395 [
'job_cmd' => $this->type,
'job_id' => $row->job_id,
'job_token' =>
'' ],
400 if ( !$dbw->affectedRows() ) {
424 if ( $dbw->getType() ===
'mysql' ) {
429 $dbw->query(
"UPDATE {$dbw->tableName( 'job' )} " .
431 "job_token = {$dbw->addQuotes( $uuid ) }, " .
432 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
433 "job_attempts = job_attempts+1 " .
435 "job_cmd = {$dbw->addQuotes( $this->type )} " .
436 "AND job_token = {$dbw->addQuotes( '' )} " .
437 ") ORDER BY job_id ASC LIMIT 1",
445 'job_token' => $uuid,
446 'job_token_timestamp' => $dbw->timestamp(),
447 'job_attempts = job_attempts+1' ],
449 $dbw->selectSQLText(
'job',
'job_id',
450 [
'job_cmd' => $this->type,
'job_token' =>
'' ],
452 [
'ORDER BY' =>
'job_id ASC',
'LIMIT' => 1 ] ) .
459 if ( $dbw->affectedRows() ) {
460 $row = $dbw->selectRow(
'job', self::selectFields(),
461 [
'job_cmd' => $this->type,
'job_token' => $uuid ], __METHOD__
464 wfDebug(
"Row deleted as duplicate by another process." );
480 $id =
$job->getMetadata(
'id' );
481 if ( $id ===
null ) {
482 throw new MWException(
"Job of type '{$job->getType()}' has no ID." );
492 [
'job_cmd' => $this->type,
'job_id' => $id ],
517 $dbw->onTransactionCommitOrIdle(
518 function () use (
$job ) {
519 parent::doDeduplicateRootJob(
$job );
536 $dbw->delete(
'job', [
'job_cmd' => $this->type ], __METHOD__ );
549 if ( $this->server ) {
553 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
554 $lbFactory->waitForReplication( [
555 'domain' => $this->domain,
556 'cluster' => is_string( $this->cluster ) ? $this->cluster : false
564 foreach ( [
'size',
'acquiredcount' ] as
$type ) {
565 $this->wanCache->delete( $this->
getCacheKey( $type ) );
593 "job_attempts >= " . intval( $this->maxTries )
607 $dbr->select(
'job', self::selectFields(), $conds, __METHOD__ ),
618 if ( $this->server ) {
622 return is_string( $this->cluster )
623 ?
"DBCluster:{$this->cluster}:{$this->domain}"
624 :
"LBFactory:{$this->domain}";
635 $res =
$dbr->select(
'job',
'DISTINCT job_cmd',
636 [
'job_cmd' => $types ], __METHOD__ );
639 foreach (
$res as $row ) {
640 $types[] = $row->job_cmd;
651 $res =
$dbr->select(
'job', [
'job_cmd',
'COUNT(*) AS count' ],
652 [
'job_cmd' => $types ], __METHOD__, [
'GROUP BY' =>
'job_cmd' ] );
655 foreach (
$res as $row ) {
656 $sizes[$row->job_cmd] = (int)$row->count;
675 if ( !$dbw->lock(
"jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
680 if ( $this->claimTTL > 0 ) {
681 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
685 $res = $dbw->select(
'job',
'job_id',
687 'job_cmd' => $this->type,
688 "job_token != {$dbw->addQuotes( '' )}",
689 "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}",
690 "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ],
696 }, iterator_to_array(
$res )
698 if ( count( $ids ) ) {
705 'job_token_timestamp' => $dbw->timestamp( $now )
707 [
'job_id' => $ids,
"job_token != ''" ],
710 $affected = $dbw->affectedRows();
712 $this->
incrStats(
'recycles', $this->type, $affected );
717 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
720 "job_token != {$dbw->addQuotes( '' )}",
721 "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}"
723 if ( $this->claimTTL > 0 ) {
724 $conds[] =
"job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
728 $res = $dbw->select(
'job',
'job_id', $conds, __METHOD__ );
732 }, iterator_to_array(
$res )
734 if ( count( $ids ) ) {
735 $dbw->delete(
'job', [
'job_id' => $ids ], __METHOD__ );
736 $affected = $dbw->affectedRows();
738 $this->
incrStats(
'abandons', $this->type, $affected );
741 $dbw->unlock(
"jobqueue-recycle-{$this->type}", __METHOD__ );
757 'job_cmd' =>
$job->getType(),
759 'job_title' =>
$job->getParams()[
'title'] ??
'',
763 'job_sha1' => Wikimedia\base_convert(
767 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
799 protected function getDB( $index ) {
800 if ( $this->server ) {
801 if ( $this->conn instanceof
IDatabase ) {
803 } elseif ( $this->conn instanceof
DBError ) {
808 $this->conn = Database::factory( $this->server[
'type'], $this->server );
816 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
817 $lb = is_string( $this->cluster )
818 ? $lbFactory->getExternalLB( $this->cluster )
819 : $lbFactory->getMainLB( $this->domain );
821 if ( $lb->getServerType( $lb->getWriterIndex() ) !==
'sqlite' ) {
824 $flags = $lb::CONN_TRX_AUTOCOMMIT;
830 return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
842 return new ScopedCallback(
function () use ( $db, $autoTrx ) {
854 $cluster = is_string( $this->cluster ) ? $this->cluster :
'main';
856 return $this->wanCache->makeGlobalKey(
870 if ( $params !==
false ) {
882 $params = ( (string)$row->job_params !==
'' ) ?
unserialize( $row->job_params ) : [];
883 if ( !is_array( $params ) ) {
884 throw new UnexpectedValueException(
885 "Could not unserialize job with ID '{$row->job_id}'." );
888 $params += [
'namespace' => $row->job_namespace,
'title' => $row->job_title ];
890 $job->setMetadata(
'id', $row->job_id );
891 $job->setMetadata(
'timestamp', $row->job_timestamp );
901 return new JobQueueError( get_class( $e ) .
": " . $e->getMessage() );
920 'job_token_timestamp',
unserialize( $serialized)
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Class to handle job queues stored in the DB.
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
supportedOrders()
Get the allowed queue orders for configuration validation.
doGetSiblingQueueSizes(array $types)
Stable to override.
insertFields(IJobSpecification $job, IDatabase $db)
__construct(array $params)
Additional parameters include:
getDBException(DBError $e)
doBatchPush(array $jobs, $flags)
static makeBlob( $params)
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doGetSiblingQueuesWithJobs(array $types)
Stable to override.
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
IMaintainableDatabase DBError null $conn
getScopedNoTrxFlag(IDatabase $db)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doDeduplicateRootJob(IJobSpecification $job)
static selectFields()
Return the list of job fields that should be selected.
getJobIterator(array $conds)
array null $server
Server configuration array.
Class to handle enqueueing and running of background jobs.
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
factoryJob( $command, $params)
Convenience class for generating iterators from iterators.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Advanced database interface for IDatabase handles that include maintenance methods.
if(count( $args)< 1) $job