29use Wikimedia\ScopedCallback;
39 private const CACHE_TTL_SHORT = 30;
41 private const MAX_AGE_PRUNE = 7 * 24 * 3600;
46 private const MAX_JOB_RANDOM = 2_147_483_647;
48 private const MAX_OFFSET = 255;
70 if ( isset(
$params[
'server'] ) ) {
71 $this->server =
$params[
'server'];
72 } elseif ( isset(
$params[
'cluster'] ) && is_string(
$params[
'cluster'] ) ) {
73 $this->cluster =
$params[
'cluster'];
78 return [
'random',
'timestamp',
'fifo' ];
92 $scope = $this->getScopedNoTrxFlag( $dbr );
95 $found = (bool)$dbr->newSelectQueryBuilder()
98 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
99 ->caller( __METHOD__ )->fetchField();
114 $size = $this->wanCache->get( $key );
115 if ( is_int( $size ) ) {
121 $scope = $this->getScopedNoTrxFlag( $dbr );
123 $size = $dbr->newSelectQueryBuilder()
125 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
126 ->caller( __METHOD__ )->fetchRowCount();
130 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
140 if ( $this->claimTTL <= 0 ) {
146 $count = $this->wanCache->get( $key );
147 if ( is_int( $count ) ) {
153 $scope = $this->getScopedNoTrxFlag( $dbr );
155 $count = $dbr->newSelectQueryBuilder()
158 'job_cmd' => $this->type,
159 $dbr->expr(
'job_token',
'!=',
'' ),
161 ->caller( __METHOD__ )->fetchRowCount();
165 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
177 if ( $this->claimTTL <= 0 ) {
183 $count = $this->wanCache->get( $key );
184 if ( is_int( $count ) ) {
190 $scope = $this->getScopedNoTrxFlag( $dbr );
192 $count = $dbr->newSelectQueryBuilder()
196 'job_cmd' => $this->type,
197 $dbr->expr(
'job_token',
'!=',
'' ),
198 $dbr->expr(
'job_attempts',
'>=', $this->maxTries ),
201 ->caller( __METHOD__ )->fetchRowCount();
206 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
221 $scope = $this->getScopedNoTrxFlag( $dbw );
232 $dbw->onTransactionPreCommitOrIdle(
233 function (
IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
252 if ( $jobs === [] ) {
258 foreach ( $jobs as
$job ) {
260 if (
$job->ignoreDuplicates() ) {
261 $rowSet[$row[
'job_sha1']] = $row;
267 if ( $flags & self::QOS_ATOMIC ) {
272 if ( count( $rowSet ) ) {
274 ->select(
'job_sha1' )
279 'job_sha1' => array_map(
'strval', array_keys( $rowSet ) ),
283 ->caller( $method )->fetchResultSet();
284 foreach ( $res as $row ) {
285 wfDebug(
"Job with hash '{$row->job_sha1}' is a duplicate." );
286 unset( $rowSet[$row->job_sha1] );
290 $rows = array_merge( $rowList, array_values( $rowSet ) );
292 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
294 ->insertInto(
'job' )
296 ->caller( $method )->execute();
298 $this->
incrStats(
'inserts', $this->type, count( $rows ) );
299 $this->
incrStats(
'dupe_inserts', $this->type,
300 count( $rowSet ) + count( $rowList ) - count( $rows )
305 if ( $flags & self::QOS_ATOMIC ) {
317 $scope = $this->getScopedNoTrxFlag( $dbw );
324 if ( in_array( $this->order, [
'fifo',
'timestamp' ] ) ) {
327 $rand = mt_rand( 0, self::MAX_JOB_RANDOM );
328 $gte = (bool)mt_rand( 0, 1 );
342 if ( !
$job || mt_rand( 0, 9 ) == 0 ) {
365 $scope = $this->getScopedNoTrxFlag( $dbw );
367 $tinyQueue = $this->wanCache->get( $this->
getCacheKey(
'small' ) );
369 $invertedDirection =
false;
378 $row = $dbw->newSelectQueryBuilder()
379 ->select( self::selectFields() )
383 'job_cmd' => $this->type,
385 $dbw->expr(
'job_random', $gte ?
'>=' :
'<=', $rand )
390 $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
392 ->caller( __METHOD__ )->fetchRow();
393 if ( !$row && !$invertedDirection ) {
395 $invertedDirection =
true;
402 $row = $dbw->newSelectQueryBuilder()
403 ->select( self::selectFields() )
407 'job_cmd' => $this->type,
411 ->offset( mt_rand( 0, self::MAX_OFFSET ) )
412 ->caller( __METHOD__ )->fetchRow();
415 $this->wanCache->set( $this->
getCacheKey(
'small' ), 1, 30 );
424 $dbw->newUpdateQueryBuilder()
427 'job_token' => $uuid,
428 'job_token_timestamp' => $dbw->timestamp(),
429 'job_attempts' =>
new RawSQLValue(
'job_attempts+1' ),
432 'job_cmd' => $this->type,
433 'job_id' => $row->job_id,
436 ->caller( __METHOD__ )->execute();
440 if ( !$dbw->affectedRows() ) {
457 $scope = $this->getScopedNoTrxFlag( $dbw );
461 if ( $dbw->getType() ===
'mysql' ) {
466 $dbw->query(
"UPDATE {$dbw->tableName( 'job' )} " .
468 "job_token = {$dbw->addQuotes( $uuid ) }, " .
469 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
470 "job_attempts = job_attempts+1 " .
472 "job_cmd = {$dbw->addQuotes( $this->type )} " .
473 "AND job_token = {$dbw->addQuotes( '' )} " .
474 ") ORDER BY job_id ASC LIMIT 1",
480 $qb = $dbw->newSelectQueryBuilder()
483 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
484 ->orderBy(
'job_id', SelectQueryBuilder::SORT_ASC )
487 $dbw->newUpdateQueryBuilder()
490 'job_token' => $uuid,
491 'job_token_timestamp' => $dbw->timestamp(),
492 'job_attempts' =>
new RawSQLValue(
'job_attempts+1' ),
494 ->where( [
'job_id' =>
new RawSQLValue(
'(' . $qb->getSQL() .
')' ) ] )
495 ->caller( __METHOD__ )->execute();
498 if ( !$dbw->affectedRows() ) {
503 $row = $dbw->newSelectQueryBuilder()
504 ->select( self::selectFields() )
506 ->where( [
'job_cmd' => $this->type,
'job_token' => $uuid ] )
507 ->caller( __METHOD__ )->fetchRow();
509 wfDebug(
"Row deleted as duplicate by another process." );
523 $id =
$job->getMetadata(
'id' );
524 if ( $id ===
null ) {
525 throw new UnexpectedValueException(
"Job of type '{$job->getType()}' has no ID." );
530 $scope = $this->getScopedNoTrxFlag( $dbw );
533 $dbw->newDeleteQueryBuilder()
534 ->deleteFrom(
'job' )
535 ->where( [
'job_cmd' => $this->type,
'job_id' => $id ] )
536 ->caller( __METHOD__ )->execute();
558 $scope = $this->getScopedNoTrxFlag( $dbw );
559 $dbw->onTransactionCommitOrIdle(
560 function () use (
$job ) {
561 parent::doDeduplicateRootJob(
$job );
576 $scope = $this->getScopedNoTrxFlag( $dbw );
578 $dbw->newDeleteQueryBuilder()
579 ->deleteFrom(
'job' )
580 ->where( [
'job_cmd' => $this->type ] )
581 ->caller( __METHOD__ )->execute();
594 if ( $this->server ) {
598 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
599 $lbFactory->waitForReplication();
606 foreach ( [
'size',
'acquiredcount' ] as
$type ) {
607 $this->wanCache->delete( $this->
getCacheKey( $type ) );
625 return $this->
getJobIterator( [
'job_cmd' => $this->
getType(), $dbr->expr(
'job_token',
'>',
'' ) ] );
636 $dbr->expr(
'job_token',
'>',
'' ),
637 $dbr->expr(
'job_attempts',
'>=', intval( $this->maxTries ) ),
648 $scope = $this->getScopedNoTrxFlag( $dbr );
649 $qb = $dbr->newSelectQueryBuilder()
650 ->select( self::selectFields() )
655 $qb->caller( __METHOD__ )->fetchResultSet(),
666 if ( $this->server ) {
670 return is_string( $this->cluster )
671 ?
"DBCluster:{$this->cluster}:{$this->domain}"
672 :
"LBFactory:{$this->domain}";
678 $scope = $this->getScopedNoTrxFlag( $dbr );
683 $res = $dbr->newSelectQueryBuilder()
684 ->select(
'job_cmd' )
687 ->where( [
'job_cmd' => $types ] )
688 ->caller( __METHOD__ )->fetchResultSet();
691 foreach ( $res as $row ) {
692 $types[] = $row->job_cmd;
701 $scope = $this->getScopedNoTrxFlag( $dbr );
703 $res = $dbr->newSelectQueryBuilder()
704 ->select( [
'job_cmd',
'count' =>
'COUNT(*)' ] )
706 ->where( [
'job_cmd' => $types ] )
707 ->groupBy(
'job_cmd' )
708 ->caller( __METHOD__ )->fetchResultSet();
711 foreach ( $res as $row ) {
712 $sizes[$row->job_cmd] = (int)$row->count;
728 $scope = $this->getScopedNoTrxFlag( $dbw );
731 if ( !$dbw->lock(
"jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
736 if ( $this->claimTTL > 0 ) {
737 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
741 $res = $dbw->newSelectQueryBuilder()
746 'job_cmd' => $this->type,
747 $dbw->expr(
'job_token',
'!=',
'' ),
748 $dbw->expr(
'job_token_timestamp',
'<', $claimCutoff ),
749 $dbw->expr(
'job_attempts',
'<', $this->maxTries ),
752 ->caller( __METHOD__ )->fetchResultSet();
754 static function ( $o ) {
756 }, iterator_to_array( $res )
758 if ( count( $ids ) ) {
762 $dbw->newUpdateQueryBuilder()
766 'job_token_timestamp' => $dbw->timestamp( $now )
770 $dbw->expr(
'job_token',
'!=',
'' ),
772 ->caller( __METHOD__ )->execute();
774 $affected = $dbw->affectedRows();
776 $this->
incrStats(
'recycles', $this->type, $affected );
781 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
782 $qb = $dbw->newSelectQueryBuilder()
787 'job_cmd' => $this->type,
788 $dbw->expr(
'job_token',
'!=',
'' ),
789 $dbw->expr(
'job_token_timestamp',
'<', $pruneCutoff )
792 if ( $this->claimTTL > 0 ) {
793 $qb->andWhere( $dbw->expr(
'job_attempts',
'>=', $this->maxTries ) );
797 $res = $qb->caller( __METHOD__ )->fetchResultSet();
799 static function ( $o ) {
801 }, iterator_to_array( $res )
803 if ( count( $ids ) ) {
804 $dbw->newDeleteQueryBuilder()
805 ->deleteFrom(
'job' )
806 ->where( [
'job_id' => $ids ] )
807 ->caller( __METHOD__ )->execute();
808 $affected = $dbw->affectedRows();
810 $this->
incrStats(
'abandons', $this->type, $affected );
813 $dbw->unlock(
"jobqueue-recycle-{$this->type}", __METHOD__ );
829 'job_cmd' =>
$job->getType(),
831 'job_title' =>
$job->getParams()[
'title'] ??
'',
832 'job_params' => self::makeBlob(
$job->getParams() ),
835 'job_sha1' => Wikimedia\base_convert(
836 sha1( serialize(
$job->getDeduplicationInfo() ) ),
839 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
872 protected function getDB( $index ) {
873 if ( $this->server ) {
874 if ( $this->conn instanceof
IDatabase ) {
876 } elseif ( $this->conn instanceof
DBError ) {
881 $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
882 $this->server[
'type'],
892 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
893 $lb = is_string( $this->cluster )
894 ? $lbFactory->getExternalLB( $this->cluster )
895 : $lbFactory->getMainLB( $this->domain );
897 if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !==
'sqlite' ) {
900 $flags = $lb::CONN_TRX_AUTOCOMMIT;
906 return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
918 return new ScopedCallback(
static function () use ( $db, $autoTrx ) {
930 $cluster = is_string( $this->cluster ) ? $this->cluster :
'main';
932 return $this->wanCache->makeGlobalKey(
958 $params = ( (string)$row->job_params !==
'' ) ? unserialize( $row->job_params ) : [];
960 throw new UnexpectedValueException(
961 "Could not unserialize job with ID '{$row->job_id}'." );
964 $params += [
'namespace' => $row->job_namespace,
'title' => $row->job_title ];
966 $job->setMetadata(
'id', $row->job_id );
967 $job->setMetadata(
'timestamp', $row->job_timestamp );
977 return new JobQueueError( get_class( $e ) .
": " . $e->getMessage() );
996 'job_token_timestamp',
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
array $params
The job parameters.
getCacheKey()
Get the cache key used to store status.
Database-backed job queue storage.
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
supportedOrders()
Get the allowed queue orders for configuration validation.
doGetSiblingQueueSizes(array $types)
__construct(array $params)
Additional parameters include:
getDBException(DBError $e)
doBatchPush(array $jobs, $flags)
insertFields(IJobSpecification $job, IReadableDatabase $db)
static makeBlob( $params)
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doGetSiblingQueuesWithJobs(array $types)
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
IMaintainableDatabase DBError null $conn
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doDeduplicateRootJob(IJobSpecification $job)
static selectFields()
Return the list of job fields that should be selected.
getJobIterator(array $conds)
array null $server
Server configuration array.
Base class for queueing and running background jobs from a storage backend.
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
factoryJob( $command, $params)
Convenience class for generating iterators from iterators.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
Advanced database interface for IDatabase handles that include maintenance methods.
if(count( $args)< 1) $job