15use UnexpectedValueException;
24use Wikimedia\ScopedCallback;
34 private const CACHE_TTL_SHORT = 30;
36 private const MAX_AGE_PRUNE = 7 * 24 * 3600;
41 private const MAX_JOB_RANDOM = 2_147_483_647;
43 private const MAX_OFFSET = 255;
62 parent::__construct( $params );
64 if ( isset( $params[
'server'] ) ) {
65 $this->server = $params[
'server'];
67 $this->server[
'flags'] ??= 0;
68 $this->server[
'flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
69 } elseif ( isset( $params[
'cluster'] ) && is_string( $params[
'cluster'] ) ) {
70 $this->cluster = $params[
'cluster'];
76 return [
'random',
'timestamp',
'fifo' ];
92 $found = (bool)$dbr->newSelectQueryBuilder()
95 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
96 ->caller( __METHOD__ )->fetchField();
109 $key = $this->getCacheKey(
'size' );
111 $size = $this->wanCache->get( $key );
112 if ( is_int( $size ) ) {
118 $size = $dbr->newSelectQueryBuilder()
120 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
121 ->caller( __METHOD__ )->fetchRowCount();
125 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
135 if ( $this->claimTTL <= 0 ) {
139 $key = $this->getCacheKey(
'acquiredcount' );
141 $count = $this->wanCache->get( $key );
142 if ( is_int( $count ) ) {
148 $count = $dbr->newSelectQueryBuilder()
151 'job_cmd' => $this->type,
152 $dbr->expr(
'job_token',
'!=',
'' ),
154 ->caller( __METHOD__ )->fetchRowCount();
158 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
170 if ( $this->claimTTL <= 0 ) {
174 $key = $this->getCacheKey(
'abandonedcount' );
176 $count = $this->wanCache->get( $key );
177 if ( is_int( $count ) ) {
183 $count = $dbr->newSelectQueryBuilder()
187 'job_cmd' => $this->type,
188 $dbr->expr(
'job_token',
'!=',
'' ),
189 $dbr->expr(
'job_attempts',
'>=', $this->maxTries ),
192 ->caller( __METHOD__ )->fetchRowCount();
197 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
211 $transactionProfiler = Profiler::instance()->getTransactionProfiler();
212 $scope = $transactionProfiler->silenceForScope();
214 ScopedCallback::consume( $scope );
225 $dbw->onTransactionPreCommitOrIdle(
243 if ( $jobs === [] ) {
249 foreach ( $jobs as
$job ) {
251 if (
$job->ignoreDuplicates() ) {
252 $rowSet[$row[
'job_sha1']] = $row;
258 if ( $flags & self::QOS_ATOMIC ) {
263 if ( count( $rowSet ) ) {
265 ->select(
'job_sha1' )
270 'job_sha1' => array_map(
'strval', array_keys( $rowSet ) ),
274 ->caller( $method )->fetchResultSet();
275 foreach ( $res as $row ) {
276 wfDebug(
"Job with hash '{$row->job_sha1}' is a duplicate." );
277 unset( $rowSet[$row->job_sha1] );
281 $rows = array_merge( $rowList, array_values( $rowSet ) );
284 $transactionProfiler = Profiler::instance()->getTransactionProfiler();
285 $scope = $transactionProfiler->silenceForScope();
287 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
289 ->insertInto(
'job' )
291 ->caller( $method )->execute();
293 ScopedCallback::consume( $scope );
294 $this->
incrStats(
'inserts', $this->type, count( $rows ) );
295 $this->
incrStats(
'dupe_inserts', $this->type,
296 count( $rowSet ) + count( $rowList ) - count( $rows )
301 if ( $flags & self::QOS_ATOMIC ) {
316 if ( in_array( $this->order, [
'fifo',
'timestamp' ] ) ) {
319 $rand = mt_rand( 0, self::MAX_JOB_RANDOM );
320 $gte = (bool)mt_rand( 0, 1 );
334 if ( !
$job || mt_rand( 0, 9 ) == 0 ) {
357 $tinyQueue = $this->wanCache->get( $this->getCacheKey(
'small' ) );
359 $invertedDirection =
false;
368 $row = $dbw->newSelectQueryBuilder()
369 ->select( self::selectFields() )
373 'job_cmd' => $this->type,
375 $dbw->expr(
'job_random', $gte ?
'>=' :
'<=', $rand )
380 $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
382 ->caller( __METHOD__ )->fetchRow();
383 if ( !$row && !$invertedDirection ) {
385 $invertedDirection =
true;
392 $row = $dbw->newSelectQueryBuilder()
393 ->select( self::selectFields() )
397 'job_cmd' => $this->type,
401 ->offset( mt_rand( 0, self::MAX_OFFSET ) )
402 ->caller( __METHOD__ )->fetchRow();
405 $this->wanCache->set( $this->getCacheKey(
'small' ), 1, 30 );
414 $dbw->newUpdateQueryBuilder()
417 'job_token' => $uuid,
418 'job_token_timestamp' => $dbw->timestamp(),
419 'job_attempts' =>
new RawSQLValue(
'job_attempts+1' ),
422 'job_cmd' => $this->type,
423 'job_id' => $row->job_id,
426 ->caller( __METHOD__ )->execute();
430 if ( !$dbw->affectedRows() ) {
449 if ( $dbw->getType() ===
'mysql' ) {
454 $dbw->query(
"UPDATE {$dbw->tableName( 'job' )} " .
456 "job_token = {$dbw->addQuotes( $uuid ) }, " .
457 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
458 "job_attempts = job_attempts+1 " .
460 "job_cmd = {$dbw->addQuotes( $this->type )} " .
461 "AND job_token = {$dbw->addQuotes( '' )} " .
462 ") ORDER BY job_id ASC LIMIT 1",
468 $qb = $dbw->newSelectQueryBuilder()
471 ->where( [
'job_cmd' => $this->type,
'job_token' =>
'' ] )
472 ->orderBy(
'job_id', SelectQueryBuilder::SORT_ASC )
475 $dbw->newUpdateQueryBuilder()
478 'job_token' => $uuid,
479 'job_token_timestamp' => $dbw->timestamp(),
480 'job_attempts' =>
new RawSQLValue(
'job_attempts+1' ),
482 ->where( [
'job_id' =>
new RawSQLValue(
'(' . $qb->getSQL() .
')' ) ] )
483 ->caller( __METHOD__ )->execute();
486 if ( !$dbw->affectedRows() ) {
491 $row = $dbw->newSelectQueryBuilder()
492 ->select( self::selectFields() )
494 ->where( [
'job_cmd' => $this->type,
'job_token' => $uuid ] )
495 ->caller( __METHOD__ )->fetchRow();
497 wfDebug(
"Row deleted as duplicate by another process." );
511 $id =
$job->getMetadata(
'id' );
512 if ( $id ===
null ) {
513 throw new UnexpectedValueException(
"Job of type '{$job->getType()}' has no ID." );
519 $dbw->newDeleteQueryBuilder()
520 ->deleteFrom(
'job' )
521 ->where( [
'job_cmd' => $this->type,
'job_id' => $id ] )
522 ->caller( __METHOD__ )->execute();
543 $dbw->onTransactionCommitOrIdle(
544 function () use (
$job ) {
545 parent::doDeduplicateRootJob(
$job );
560 $dbw->newDeleteQueryBuilder()
561 ->deleteFrom(
'job' )
562 ->where( [
'job_cmd' => $this->type ] )
563 ->caller( __METHOD__ )->execute();
576 if ( $this->server ) {
581 $lbFactory->waitForReplication();
588 foreach ( [
'size',
'acquiredcount' ] as
$type ) {
589 $this->wanCache->delete( $this->getCacheKey(
$type ) );
607 return $this->
getJobIterator( [
'job_cmd' => $this->
getType(), $dbr->expr(
'job_token',
'>',
'' ) ] );
618 $dbr->expr(
'job_token',
'>',
'' ),
619 $dbr->expr(
'job_attempts',
'>=', intval( $this->maxTries ) ),
629 $qb = $dbr->newSelectQueryBuilder()
630 ->select( self::selectFields() )
635 $qb->caller( __METHOD__ )->fetchResultSet(),
636 $this->jobFromRow( ... )
645 if ( $this->server ) {
649 return is_string( $this->cluster )
650 ?
"DBCluster:{$this->cluster}:{$this->domain}"
651 :
"LBFactory:{$this->domain}";
661 return $dbr->newSelectQueryBuilder()
662 ->select(
'job_cmd' )
665 ->where( [
'job_cmd' => $types ] )
666 ->caller( __METHOD__ )
667 ->fetchFieldValues();
674 $res = $dbr->newSelectQueryBuilder()
675 ->select( [
'job_cmd',
'count' =>
'COUNT(*)' ] )
677 ->where( [
'job_cmd' => $types ] )
678 ->groupBy(
'job_cmd' )
679 ->caller( __METHOD__ )->fetchResultSet();
682 foreach ( $res as $row ) {
683 $sizes[$row->job_cmd] = (int)$row->count;
700 if ( !$dbw->lock(
"jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
705 if ( $this->claimTTL > 0 ) {
706 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
710 $ids = $dbw->newSelectQueryBuilder()
715 'job_cmd' => $this->type,
716 $dbw->expr(
'job_token',
'!=',
'' ),
717 $dbw->expr(
'job_token_timestamp',
'<', $claimCutoff ),
718 $dbw->expr(
'job_attempts',
'<', $this->maxTries ),
721 ->caller( __METHOD__ )
722 ->fetchFieldValues();
727 $dbw->newUpdateQueryBuilder()
731 'job_token_timestamp' => $dbw->timestamp( $now )
735 $dbw->expr(
'job_token',
'!=',
'' ),
737 ->caller( __METHOD__ )->execute();
739 $affected = $dbw->affectedRows();
741 $this->
incrStats(
'recycles', $this->type, $affected );
746 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
747 $qb = $dbw->newSelectQueryBuilder()
752 'job_cmd' => $this->type,
753 $dbw->expr(
'job_token',
'!=',
'' ),
754 $dbw->expr(
'job_token_timestamp',
'<', $pruneCutoff )
757 if ( $this->claimTTL > 0 ) {
758 $qb->andWhere( $dbw->expr(
'job_attempts',
'>=', $this->maxTries ) );
762 $ids = $qb->caller( __METHOD__ )->fetchFieldValues();
764 $dbw->newDeleteQueryBuilder()
765 ->deleteFrom(
'job' )
766 ->where( [
'job_id' => $ids ] )
767 ->caller( __METHOD__ )->execute();
768 $affected = $dbw->affectedRows();
770 $this->
incrStats(
'abandons', $this->type, $affected );
773 $dbw->unlock(
"jobqueue-recycle-{$this->type}", __METHOD__ );
789 'job_cmd' =>
$job->getType(),
791 'job_title' =>
$job->getParams()[
'title'] ??
'',
795 'job_sha1' => \Wikimedia\base_convert(
796 sha1( serialize(
$job->getDeduplicationInfo() ) ),
799 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
832 protected function getDB( $index ) {
833 if ( $this->server ) {
834 if ( $this->conn instanceof
IDatabase ) {
836 } elseif ( $this->conn instanceof
DBError ) {
842 $this->server[
'type'],
853 $lb = is_string( $this->cluster )
854 ? $lbFactory->getExternalLB( $this->cluster )
855 : $lbFactory->getMainLB( $this->domain );
857 if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !==
'sqlite' ) {
860 $flags = $lb::CONN_TRX_AUTOCOMMIT;
866 return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
874 private function getCacheKey( $property ) {
875 $cluster = is_string( $this->cluster ) ? $this->cluster :
'main';
877 return $this->wanCache->makeGlobalKey(
891 if ( $params !==
false ) {
892 return serialize( $params );
903 $params = ( (string)$row->job_params !==
'' ) ? unserialize( $row->job_params ) : [];
904 if ( !is_array( $params ) ) {
905 throw new UnexpectedValueException(
906 "Could not unserialize job with ID '{$row->job_id}'." );
909 $params += [
'namespace' => $row->job_namespace,
'title' => $row->job_title ];
911 $job->setMetadata(
'id', $row->job_id );
912 $job->setMetadata(
'timestamp', $row->job_timestamp );
922 return new JobQueueError( get_class( $e ) .
": " . $e->getMessage() );
941 'job_token_timestamp',
948class_alias( JobQueueDB::class,
'JobQueueDB' );
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Convenience class for generating iterators from iterators.
Advanced database interface for IDatabase handles that include maintenance methods.
if(count( $args)< 1) $job