29use UnexpectedValueException;
38use Wikimedia\ScopedCallback;
48 private const CACHE_TTL_SHORT = 30;
50 private const MAX_AGE_PRUNE = 7 * 24 * 3600;
55 private const MAX_JOB_RANDOM = 2_147_483_647;
57 private const MAX_OFFSET = 255;
76 parent::__construct( $params );
78 if ( isset( $params[
'server'] ) ) {
79 $this->server = $params[
'server'];
81 $this->server[
'flags'] ??= 0;
82 $this->server[
'flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
83 } elseif ( isset( $params[
'cluster'] ) && is_string( $params[
'cluster'] ) ) {
84 $this->cluster = $params[
'cluster'];
89 return [
'random',
'timestamp',
'fifo' ];
104 $found = (bool)$dbr->newSelectQueryBuilder()
107 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
108 ->caller( __METHOD__ )->fetchField();
121 $key = $this->getCacheKey(
'size' );
123 $size = $this->wanCache->get( $key );
124 if ( is_int( $size ) ) {
130 $size = $dbr->newSelectQueryBuilder()
132 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
133 ->caller( __METHOD__ )->fetchRowCount();
137 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
147 if ( $this->claimTTL <= 0 ) {
151 $key = $this->getCacheKey(
'acquiredcount' );
153 $count = $this->wanCache->get( $key );
154 if ( is_int( $count ) ) {
160 $count = $dbr->newSelectQueryBuilder()
163 'job_cmd' => $this->type,
164 $dbr->expr(
'job_token',
'!=',
'' ),
166 ->caller( __METHOD__ )->fetchRowCount();
170 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
182 if ( $this->claimTTL <= 0 ) {
186 $key = $this->getCacheKey(
'abandonedcount' );
188 $count = $this->wanCache->get( $key );
189 if ( is_int( $count ) ) {
195 $count = $dbr->newSelectQueryBuilder()
199 'job_cmd' => $this->type,
200 $dbr->expr(
'job_token',
'!=',
'' ),
201 $dbr->expr(
'job_attempts',
'>=', $this->maxTries ),
204 ->caller( __METHOD__ )->fetchRowCount();
209 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
224 $scope = $transactionProfiler->silenceForScope();
226 ScopedCallback::consume( $scope );
237 $dbw->onTransactionPreCommitOrIdle(
255 if ( $jobs === [] ) {
261 foreach ( $jobs as
$job ) {
263 if (
$job->ignoreDuplicates() ) {
264 $rowSet[$row[
'job_sha1']] = $row;
270 if ( $flags & self::QOS_ATOMIC ) {
275 if ( count( $rowSet ) ) {
277 ->select(
'job_sha1' )
282 'job_sha1' => array_map(
'strval', array_keys( $rowSet ) ),
286 ->caller( $method )->fetchResultSet();
287 foreach ( $res as $row ) {
288 wfDebug(
"Job with hash '{$row->job_sha1}' is a duplicate." );
289 unset( $rowSet[$row->job_sha1] );
293 $rows = array_merge( $rowList, array_values( $rowSet ) );
297 $scope = $transactionProfiler->silenceForScope();
299 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
301 ->insertInto(
'job' )
303 ->caller( $method )->execute();
305 ScopedCallback::consume( $scope );
306 $this->
incrStats(
'inserts', $this->type, count( $rows ) );
307 $this->
incrStats(
'dupe_inserts', $this->type,
308 count( $rowSet ) + count( $rowList ) - count( $rows )
313 if ( $flags & self::QOS_ATOMIC ) {
328 if ( in_array( $this->order, [
'fifo',
'timestamp' ] ) ) {
331 $rand = mt_rand( 0, self::MAX_JOB_RANDOM );
332 $gte = (bool)mt_rand( 0, 1 );
346 if ( !
$job || mt_rand( 0, 9 ) == 0 ) {
369 $tinyQueue = $this->wanCache->get( $this->getCacheKey(
'small' ) );
371 $invertedDirection =
false;
380 $row = $dbw->newSelectQueryBuilder()
381 ->select( self::selectFields() )
385 'job_cmd' => $this->type,
387 $dbw->expr(
'job_random', $gte ?
'>=' :
'<=', $rand )
392 $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
394 ->caller( __METHOD__ )->fetchRow();
395 if ( !$row && !$invertedDirection ) {
397 $invertedDirection =
true;
404 $row = $dbw->newSelectQueryBuilder()
405 ->select( self::selectFields() )
409 'job_cmd' => $this->type,
413 ->offset( mt_rand( 0, self::MAX_OFFSET ) )
414 ->caller( __METHOD__ )->fetchRow();
417 $this->wanCache->set( $this->getCacheKey(
'small' ), 1, 30 );
426 $dbw->newUpdateQueryBuilder()
429 'job_token' => $uuid,
430 'job_token_timestamp' => $dbw->timestamp(),
431 'job_attempts' =>
new RawSQLValue(
'job_attempts+1' ),
434 'job_cmd' => $this->type,
435 'job_id' => $row->job_id,
438 ->caller( __METHOD__ )->execute();
442 if ( !$dbw->affectedRows() ) {
461 if ( $dbw->getType() ===
'mysql' ) {
466 $dbw->query(
"UPDATE {$dbw->tableName( 'job' )} " .
468 "job_token = {$dbw->addQuotes( $uuid ) }, " .
469 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
470 "job_attempts = job_attempts+1 " .
472 "job_cmd = {$dbw->addQuotes( $this->type )} " .
473 "AND job_token = {$dbw->addQuotes( '' )} " .
474 ") ORDER BY job_id ASC LIMIT 1",
480 $qb = $dbw->newSelectQueryBuilder()
483 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
484 ->orderBy(
'job_id', SelectQueryBuilder::SORT_ASC )
487 $dbw->newUpdateQueryBuilder()
490 'job_token' => $uuid,
491 'job_token_timestamp' => $dbw->timestamp(),
492 'job_attempts' =>
new RawSQLValue(
'job_attempts+1' ),
494 ->where( [
'job_id' =>
new RawSQLValue(
'(' . $qb->getSQL() .
')' ) ] )
495 ->caller( __METHOD__ )->execute();
498 if ( !$dbw->affectedRows() ) {
503 $row = $dbw->newSelectQueryBuilder()
504 ->select( self::selectFields() )
506 ->where( [
'job_cmd' => $this->type,
'job_token' => $uuid ] )
507 ->caller( __METHOD__ )->fetchRow();
509 wfDebug(
"Row deleted as duplicate by another process." );
523 $id =
$job->getMetadata(
'id' );
524 if ( $id ===
null ) {
525 throw new UnexpectedValueException(
"Job of type '{$job->getType()}' has no ID." );
531 $dbw->newDeleteQueryBuilder()
532 ->deleteFrom(
'job' )
533 ->where( [
'job_cmd' => $this->type,
'job_id' => $id ] )
534 ->caller( __METHOD__ )->execute();
555 $dbw->onTransactionCommitOrIdle(
556 function () use (
$job ) {
557 parent::doDeduplicateRootJob(
$job );
572 $dbw->newDeleteQueryBuilder()
573 ->deleteFrom(
'job' )
574 ->where( [
'job_cmd' => $this->type ] )
575 ->caller( __METHOD__ )->execute();
588 if ( $this->server ) {
593 $lbFactory->waitForReplication();
600 foreach ( [
'size',
'acquiredcount' ] as
$type ) {
601 $this->wanCache->delete( $this->getCacheKey(
$type ) );
619 return $this->
getJobIterator( [
'job_cmd' => $this->
getType(), $dbr->expr(
'job_token',
'>',
'' ) ] );
630 $dbr->expr(
'job_token',
'>',
'' ),
631 $dbr->expr(
'job_attempts',
'>=', intval( $this->maxTries ) ),
641 $qb = $dbr->newSelectQueryBuilder()
642 ->select( self::selectFields() )
647 $qb->caller( __METHOD__ )->fetchResultSet(),
658 if ( $this->server ) {
662 return is_string( $this->cluster )
663 ?
"DBCluster:{$this->cluster}:{$this->domain}"
664 :
"LBFactory:{$this->domain}";
673 $res = $dbr->newSelectQueryBuilder()
674 ->select(
'job_cmd' )
677 ->where( [
'job_cmd' => $types ] )
678 ->caller( __METHOD__ )->fetchResultSet();
681 foreach ( $res as $row ) {
682 $types[] = $row->job_cmd;
691 $res = $dbr->newSelectQueryBuilder()
692 ->select( [
'job_cmd',
'count' =>
'COUNT(*)' ] )
694 ->where( [
'job_cmd' => $types ] )
695 ->groupBy(
'job_cmd' )
696 ->caller( __METHOD__ )->fetchResultSet();
699 foreach ( $res as $row ) {
700 $sizes[$row->job_cmd] = (int)$row->count;
717 if ( !$dbw->lock(
"jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
722 if ( $this->claimTTL > 0 ) {
723 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
727 $res = $dbw->newSelectQueryBuilder()
732 'job_cmd' => $this->type,
733 $dbw->expr(
'job_token',
'!=',
'' ),
734 $dbw->expr(
'job_token_timestamp',
'<', $claimCutoff ),
735 $dbw->expr(
'job_attempts',
'<', $this->maxTries ),
738 ->caller( __METHOD__ )->fetchResultSet();
740 static function ( $o ) {
742 }, iterator_to_array( $res )
744 if ( count( $ids ) ) {
748 $dbw->newUpdateQueryBuilder()
752 'job_token_timestamp' => $dbw->timestamp( $now )
756 $dbw->expr(
'job_token',
'!=',
'' ),
758 ->caller( __METHOD__ )->execute();
760 $affected = $dbw->affectedRows();
762 $this->
incrStats(
'recycles', $this->type, $affected );
767 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
768 $qb = $dbw->newSelectQueryBuilder()
773 'job_cmd' => $this->type,
774 $dbw->expr(
'job_token',
'!=',
'' ),
775 $dbw->expr(
'job_token_timestamp',
'<', $pruneCutoff )
778 if ( $this->claimTTL > 0 ) {
779 $qb->andWhere( $dbw->expr(
'job_attempts',
'>=', $this->maxTries ) );
783 $res = $qb->caller( __METHOD__ )->fetchResultSet();
785 static function ( $o ) {
787 }, iterator_to_array( $res )
789 if ( count( $ids ) ) {
790 $dbw->newDeleteQueryBuilder()
791 ->deleteFrom(
'job' )
792 ->where( [
'job_id' => $ids ] )
793 ->caller( __METHOD__ )->execute();
794 $affected = $dbw->affectedRows();
796 $this->
incrStats(
'abandons', $this->type, $affected );
799 $dbw->unlock(
"jobqueue-recycle-{$this->type}", __METHOD__ );
815 'job_cmd' =>
$job->getType(),
817 'job_title' =>
$job->getParams()[
'title'] ??
'',
821 'job_sha1' => \Wikimedia\base_convert(
822 sha1( serialize(
$job->getDeduplicationInfo() ) ),
825 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
858 protected function getDB( $index ) {
859 if ( $this->server ) {
860 if ( $this->conn instanceof
IDatabase ) {
862 } elseif ( $this->conn instanceof
DBError ) {
868 $this->server[
'type'],
879 $lb = is_string( $this->cluster )
880 ? $lbFactory->getExternalLB( $this->cluster )
881 : $lbFactory->getMainLB( $this->domain );
883 if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !==
'sqlite' ) {
886 $flags = $lb::CONN_TRX_AUTOCOMMIT;
892 return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
900 private function getCacheKey( $property ) {
901 $cluster = is_string( $this->cluster ) ? $this->cluster :
'main';
903 return $this->wanCache->makeGlobalKey(
917 if ( $params !==
false ) {
918 return serialize( $params );
929 $params = ( (string)$row->job_params !==
'' ) ? unserialize( $row->job_params ) : [];
930 if ( !is_array( $params ) ) {
931 throw new UnexpectedValueException(
932 "Could not unserialize job with ID '{$row->job_id}'." );
935 $params += [
'namespace' => $row->job_namespace,
'title' => $row->job_title ];
937 $job->setMetadata(
'id', $row->job_id );
938 $job->setMetadata(
'timestamp', $row->job_timestamp );
948 return new JobQueueError( get_class( $e ) .
": " . $e->getMessage() );
967 'job_token_timestamp',
974class_alias( JobQueueDB::class,
'JobQueueDB' );
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Convenience class for generating iterators from iterators.
Profiler base class that defines the interface and some shared functionality.
Advanced database interface for IDatabase handles that include maintenance methods.
if(count( $args)< 1) $job