MediaWiki master
JobQueueDB.php
Go to the documentation of this file.
1<?php
21namespace MediaWiki\JobQueue;
22
27use Profiler;
28use stdClass;
29use UnexpectedValueException;
38use Wikimedia\ScopedCallback;
39
/**
 * Database-backed job queue storage.
 */
class JobQueueDB extends JobQueue {
	/* seconds to cache info without re-validating */
	private const CACHE_TTL_SHORT = 30;
	/* seconds a job can live once claimed */
	private const MAX_AGE_PRUNE = 7 * 24 * 3600;
	/* highest job_random value (2^31 - 1); rows get a uniform value in [0, this] */
	private const MAX_JOB_RANDOM = 2_147_483_647;
	/* maximum number of rows to skip */
	private const MAX_OFFSET = 255;

	/** @var IMaintainableDatabase|DBError|null Cached dedicated handle, or the error that broke it */
	protected $conn;

	/** @var array|null Server configuration array (standalone server mode) */
	protected $server;
	/** @var string|null Name of an external DB cluster, or null for the local cluster */
	protected $cluster;

75 protected function __construct( array $params ) {
76 parent::__construct( $params );
77
78 if ( isset( $params['server'] ) ) {
79 $this->server = $params['server'];
80 // Always use autocommit mode, even if DBO_TRX is configured
81 $this->server['flags'] ??= 0;
82 $this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
83 } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
84 $this->cluster = $params['cluster'];
85 }
86 }
87
88 protected function supportedOrders() {
89 return [ 'random', 'timestamp', 'fifo' ];
90 }
91
92 protected function optimalOrder() {
93 return 'random';
94 }
95
100 protected function doIsEmpty() {
101 $dbr = $this->getReplicaDB();
102 try {
103 // unclaimed job
104 $found = (bool)$dbr->newSelectQueryBuilder()
105 ->select( '1' )
106 ->from( 'job' )
107 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
108 ->caller( __METHOD__ )->fetchField();
109 } catch ( DBError $e ) {
110 throw $this->getDBException( $e );
111 }
112
113 return !$found;
114 }
115
120 protected function doGetSize() {
121 $key = $this->getCacheKey( 'size' );
122
123 $size = $this->wanCache->get( $key );
124 if ( is_int( $size ) ) {
125 return $size;
126 }
127
128 $dbr = $this->getReplicaDB();
129 try {
130 $size = $dbr->newSelectQueryBuilder()
131 ->from( 'job' )
132 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
133 ->caller( __METHOD__ )->fetchRowCount();
134 } catch ( DBError $e ) {
135 throw $this->getDBException( $e );
136 }
137 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
138
139 return $size;
140 }
141
146 protected function doGetAcquiredCount() {
147 if ( $this->claimTTL <= 0 ) {
148 return 0; // no acknowledgements
149 }
150
151 $key = $this->getCacheKey( 'acquiredcount' );
152
153 $count = $this->wanCache->get( $key );
154 if ( is_int( $count ) ) {
155 return $count;
156 }
157
158 $dbr = $this->getReplicaDB();
159 try {
160 $count = $dbr->newSelectQueryBuilder()
161 ->from( 'job' )
162 ->where( [
163 'job_cmd' => $this->type,
164 $dbr->expr( 'job_token', '!=', '' ),
165 ] )
166 ->caller( __METHOD__ )->fetchRowCount();
167 } catch ( DBError $e ) {
168 throw $this->getDBException( $e );
169 }
170 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
171
172 return $count;
173 }
174
181 protected function doGetAbandonedCount() {
182 if ( $this->claimTTL <= 0 ) {
183 return 0; // no acknowledgements
184 }
185
186 $key = $this->getCacheKey( 'abandonedcount' );
187
188 $count = $this->wanCache->get( $key );
189 if ( is_int( $count ) ) {
190 return $count;
191 }
192
193 $dbr = $this->getReplicaDB();
194 try {
195 $count = $dbr->newSelectQueryBuilder()
196 ->from( 'job' )
197 ->where(
198 [
199 'job_cmd' => $this->type,
200 $dbr->expr( 'job_token', '!=', '' ),
201 $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
202 ]
203 )
204 ->caller( __METHOD__ )->fetchRowCount();
205 } catch ( DBError $e ) {
206 throw $this->getDBException( $e );
207 }
208
209 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
210
211 return $count;
212 }
213
221 protected function doBatchPush( array $jobs, $flags ) {
222 // Silence expectations related to getting a primary DB, as we have to get a primary DB to insert the job.
223 $transactionProfiler = Profiler::instance()->getTransactionProfiler();
224 $scope = $transactionProfiler->silenceForScope();
225 $dbw = $this->getPrimaryDB();
226 ScopedCallback::consume( $scope );
227 // In general, there will be two cases here:
228 // a) sqlite; DB connection is probably a regular round-aware handle.
229 // If the connection is busy with a transaction, then defer the job writes
230 // until right before the main round commit step. Any errors that bubble
231 // up will rollback the main commit round.
232 // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
233 // No transaction is active nor will be started by writes, so enqueue the jobs
234 // now so that any errors will show up immediately as the interface expects. Any
235 // errors that bubble up will rollback the main commit round.
236 $fname = __METHOD__;
237 $dbw->onTransactionPreCommitOrIdle(
238 fn () => $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname ),
239 $fname
240 );
241 }
242
254 public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
255 if ( $jobs === [] ) {
256 return;
257 }
258
259 $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
260 $rowList = []; // list of jobs for jobs that are not de-duplicated
261 foreach ( $jobs as $job ) {
262 $row = $this->insertFields( $job, $dbw );
263 if ( $job->ignoreDuplicates() ) {
264 $rowSet[$row['job_sha1']] = $row;
265 } else {
266 $rowList[] = $row;
267 }
268 }
269
270 if ( $flags & self::QOS_ATOMIC ) {
271 $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
272 }
273 try {
274 // Strip out any duplicate jobs that are already in the queue...
275 if ( count( $rowSet ) ) {
276 $res = $dbw->newSelectQueryBuilder()
277 ->select( 'job_sha1' )
278 ->from( 'job' )
279 ->where(
280 [
281 // No job_type condition since it's part of the job_sha1 hash
282 'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
283 'job_token' => '' // unclaimed
284 ]
285 )
286 ->caller( $method )->fetchResultSet();
287 foreach ( $res as $row ) {
288 wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
289 unset( $rowSet[$row->job_sha1] ); // already enqueued
290 }
291 }
292 // Build the full list of job rows to insert
293 $rows = array_merge( $rowList, array_values( $rowSet ) );
294 // Silence expectations related to inserting to the job table, because we have to perform the inserts to
295 // track the job.
296 $transactionProfiler = Profiler::instance()->getTransactionProfiler();
297 $scope = $transactionProfiler->silenceForScope();
298 // Insert the job rows in chunks to avoid replica DB lag...
299 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
301 ->insertInto( 'job' )
302 ->rows( $rowBatch )
303 ->caller( $method )->execute();
304 }
305 ScopedCallback::consume( $scope );
306 $this->incrStats( 'inserts', $this->type, count( $rows ) );
307 $this->incrStats( 'dupe_inserts', $this->type,
308 count( $rowSet ) + count( $rowList ) - count( $rows )
309 );
310 } catch ( DBError $e ) {
311 throw $this->getDBException( $e );
312 }
313 if ( $flags & self::QOS_ATOMIC ) {
314 $dbw->endAtomic( $method );
315 }
316 }
317
322 protected function doPop() {
323 $job = false; // job popped off
324 try {
325 $uuid = wfRandomString( 32 ); // pop attempt
326 do { // retry when our row is invalid or deleted as a duplicate
327 // Try to reserve a row in the DB...
328 if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
329 $row = $this->claimOldest( $uuid );
330 } else { // random first
331 $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
332 $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
333 $row = $this->claimRandom( $uuid, $rand, $gte );
334 }
335 // Check if we found a row to reserve...
336 if ( !$row ) {
337 break; // nothing to do
338 }
339 $this->incrStats( 'pops', $this->type );
340
341 // Get the job object from the row...
342 $job = $this->jobFromRow( $row );
343 break; // done
344 } while ( true );
345
346 if ( !$job || mt_rand( 0, 9 ) == 0 ) {
347 // Handled jobs that need to be recycled/deleted;
348 // any recycled jobs will be picked up next attempt
350 }
351 } catch ( DBError $e ) {
352 throw $this->getDBException( $e );
353 }
354
355 return $job;
356 }
357
	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs.
	 *
	 * @param string $uuid Token to mark the row as claimed by this runner
	 * @param int $rand Random value in [0, MAX_JOB_RANDOM] to pivot the scan on
	 * @param bool $gte Whether to scan job_random values >= $rand (else <=)
	 * @return stdClass|false Row of the claimed job, or false if none could be claimed
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getPrimaryDB();
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
							$dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
						]
					)
					->orderBy(
						'job_random',
						$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
					)
					->caller( __METHOD__ )->fetchRow();
				if ( !$row && !$invertedDirection ) {
					// No row on this side of $rand; flip the comparison once
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
						]
					)
					->offset( mt_rand( 0, self::MAX_OFFSET ) )
					->caller( __METHOD__ )->fetchRow();
				if ( !$row ) {
					// The OFFSET overshot the table; remember that the queue is small
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
					continue; // use job_random
				}
			}

			if ( !$row ) {
				break;
			}

			// Claim the selected row by primary key, only if still unclaimed
			$dbw->newUpdateQueryBuilder()
				->update( 'job' ) // update by PK
				->set( [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
				] )
				->where( [
					'job_cmd' => $this->type,
					'job_id' => $row->job_id,
					'job_token' => ''
				] )
				->caller( __METHOD__ )->execute();

			// This might get raced out by another runner when claiming the previously
			// selected row. The use of job_random should minimize this problem, however.
			if ( !$dbw->affectedRows() ) {
				$row = false; // raced out
			}
		} while ( !$row );

		return $row;
	}
449
	/**
	 * Reserve the oldest unclaimed row with a single UPDATE without holding
	 * row locks over RTTs (used for 'fifo'/'timestamp' queue orders).
	 *
	 * @param string $uuid Token to mark the row as claimed by this runner
	 * @return stdClass|false Row of the claimed job, or false if none could be claimed
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getPrimaryDB();

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Postgres has no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid ) }, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$qb = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
					->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
					->limit( 1 );

				$dbw->newUpdateQueryBuilder()
					->update( 'job' )
					->set( [
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
					] )
					->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
					->caller( __METHOD__ )->execute();
			}

			// If no row was claimed, the queue has no unclaimed jobs; give up
			if ( !$dbw->affectedRows() ) {
				break;
			}

			// Fetch any row that we just reserved...
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
				->caller( __METHOD__ )->fetchRow();
			if ( !$row ) { // raced out by duplicate job removal
				wfDebug( "Row deleted as duplicate by another process." );
			}
		} while ( !$row );

		return $row;
	}
515
522 protected function doAck( RunnableJob $job ) {
523 $id = $job->getMetadata( 'id' );
524 if ( $id === null ) {
525 throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
526 }
527
528 $dbw = $this->getPrimaryDB();
529 try {
530 // Delete a row with a single DELETE without holding row locks over RTTs...
531 $dbw->newDeleteQueryBuilder()
532 ->deleteFrom( 'job' )
533 ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
534 ->caller( __METHOD__ )->execute();
535
536 $this->incrStats( 'acks', $this->type );
537 } catch ( DBError $e ) {
538 throw $this->getDBException( $e );
539 }
540 }
541
549 // Callers should call JobQueueGroup::push() before this method so that if the
550 // insert fails, the de-duplication registration will be aborted. Since the insert
551 // is deferred till "transaction idle", do the same here, so that the ordering is
552 // maintained. Having only the de-duplication registration succeed would cause
553 // jobs to become no-ops without any actual jobs that made them redundant.
554 $dbw = $this->getPrimaryDB();
555 $dbw->onTransactionCommitOrIdle(
556 function () use ( $job ) {
557 parent::doDeduplicateRootJob( $job );
558 },
559 __METHOD__
560 );
561
562 return true;
563 }
564
569 protected function doDelete() {
570 $dbw = $this->getPrimaryDB();
571 try {
572 $dbw->newDeleteQueryBuilder()
573 ->deleteFrom( 'job' )
574 ->where( [ 'job_cmd' => $this->type ] )
575 ->caller( __METHOD__ )->execute();
576 } catch ( DBError $e ) {
577 throw $this->getDBException( $e );
578 }
579
580 return true;
581 }
582
587 protected function doWaitForBackups() {
588 if ( $this->server ) {
589 return; // not using LBFactory instance
590 }
591
592 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
593 $lbFactory->waitForReplication();
594 }
595
599 protected function doFlushCaches() {
600 foreach ( [ 'size', 'acquiredcount' ] as $type ) {
601 $this->wanCache->delete( $this->getCacheKey( $type ) );
602 }
603 }
604
609 public function getAllQueuedJobs() {
610 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
611 }
612
617 public function getAllAcquiredJobs() {
618 $dbr = $this->getReplicaDB();
619 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] );
620 }
621
626 public function getAllAbandonedJobs() {
627 $dbr = $this->getReplicaDB();
628 return $this->getJobIterator( [
629 'job_cmd' => $this->getType(),
630 $dbr->expr( 'job_token', '>', '' ),
631 $dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),
632 ] );
633 }
634
639 protected function getJobIterator( array $conds ) {
640 $dbr = $this->getReplicaDB();
641 $qb = $dbr->newSelectQueryBuilder()
642 ->select( self::selectFields() )
643 ->from( 'job' )
644 ->where( $conds );
645 try {
646 return new MappedIterator(
647 $qb->caller( __METHOD__ )->fetchResultSet(),
648 function ( $row ) {
649 return $this->jobFromRow( $row );
650 }
651 );
652 } catch ( DBError $e ) {
653 throw $this->getDBException( $e );
654 }
655 }
656
657 public function getCoalesceLocationInternal() {
658 if ( $this->server ) {
659 return null; // not using the LBFactory instance
660 }
661
662 return is_string( $this->cluster )
663 ? "DBCluster:{$this->cluster}:{$this->domain}"
664 : "LBFactory:{$this->domain}";
665 }
666
667 protected function doGetSiblingQueuesWithJobs( array $types ) {
668 $dbr = $this->getReplicaDB();
669 // @note: this does not check whether the jobs are claimed or not.
670 // This is useful so JobQueueGroup::pop() also sees queues that only
671 // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
672 // failed jobs so that they can be popped again for that edge case.
673 $res = $dbr->newSelectQueryBuilder()
674 ->select( 'job_cmd' )
675 ->distinct()
676 ->from( 'job' )
677 ->where( [ 'job_cmd' => $types ] )
678 ->caller( __METHOD__ )->fetchResultSet();
679
680 $types = [];
681 foreach ( $res as $row ) {
682 $types[] = $row->job_cmd;
683 }
684
685 return $types;
686 }
687
688 protected function doGetSiblingQueueSizes( array $types ) {
689 $dbr = $this->getReplicaDB();
690
691 $res = $dbr->newSelectQueryBuilder()
692 ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
693 ->from( 'job' )
694 ->where( [ 'job_cmd' => $types ] )
695 ->groupBy( 'job_cmd' )
696 ->caller( __METHOD__ )->fetchResultSet();
697
698 $sizes = [];
699 foreach ( $res as $row ) {
700 $sizes[$row->job_cmd] = (int)$row->count;
701 }
702
703 return $sizes;
704 }
705
	/**
	 * Recycle or destroy any jobs that have been claimed for too long.
	 *
	 * Stale claims with retries left are released back to the queue; claims
	 * past MAX_AGE_PRUNE (and, if claims are TTL'd, past maxTries) are deleted.
	 * Uses an advisory lock so only one runner does this at a time.
	 *
	 * @return int Number of jobs recycled/deleted
	 * @throws JobQueueError On database failure
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getPrimaryDB();

		try {
			// Advisory lock (1s timeout); bail if another runner holds it
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have be claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							$dbw->expr( 'job_token', '!=', '' ), // was acquired
							$dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
							$dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
						]
					)
					->caller( __METHOD__ )->fetchResultSet();
				$ids = array_map(
					static function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to now that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->newUpdateQueryBuilder()
						->update( 'job' )
						->set( [
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
						] )
						->where( [
							'job_id' => $ids,
							$dbw->expr( 'job_token', '!=', '' ),
						] )
						->caller( __METHOD__ )->execute();

					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$qb = $dbw->newSelectQueryBuilder()
				->select( 'job_id' )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						$dbw->expr( 'job_token', '!=', '' ), // was acquired
						$dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
					]
				);
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$res = $qb->caller( __METHOD__ )->fetchResultSet();
			$ids = array_map(
				static function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				// Delete the stale rows by primary key
				$dbw->newDeleteQueryBuilder()
					->deleteFrom( 'job' )
					->where( [ 'job_id' => $ids ] )
					->caller( __METHOD__ )->execute();
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			// NOTE(review): the advisory lock is not released on this path;
			// presumably it expires with the session — confirm upstream intent
			throw $this->getDBException( $e );
		}

		return $count;
	}
806
813 return [
814 // Fields that describe the nature of the job
815 'job_cmd' => $job->getType(),
816 'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
817 'job_title' => $job->getParams()['title'] ?? '',
818 'job_params' => self::makeBlob( $job->getParams() ),
819 // Additional job metadata
820 'job_timestamp' => $db->timestamp(),
821 'job_sha1' => \Wikimedia\base_convert(
822 sha1( serialize( $job->getDeduplicationInfo() ) ),
823 16, 36, 31
824 ),
825 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
826 ];
827 }
828
833 protected function getReplicaDB() {
834 try {
835 return $this->getDB( DB_REPLICA );
836 } catch ( DBConnectionError $e ) {
837 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
838 }
839 }
840
846 protected function getPrimaryDB() {
847 try {
848 return $this->getDB( DB_PRIMARY );
849 } catch ( DBConnectionError $e ) {
850 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
851 }
852 }
853
	/**
	 * Get a DB handle for the given role, either a cached dedicated
	 * connection (server mode) or one from the load balancer.
	 *
	 * @param int $index DB_PRIMARY or DB_REPLICA
	 * @return IDatabase
	 * @throws DBError If connecting fails (the failure is cached for reuse)
	 */
	protected function getDB( $index ) {
		if ( $this->server ) {
			// Reuse the cached dedicated handle, or rethrow a cached failure
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn;
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn;
			}

			try {
				$this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
					$this->server['type'],
					$this->server
				);
			} catch ( DBError $e ) {
				// Cache the error so later calls fail fast instead of reconnecting
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
				// Keep a separate connection to avoid contention and deadlocks;
				// However, SQLite has the opposite behavior due to DB-level locking.
				$flags = $lb::CONN_TRX_AUTOCOMMIT;
			} else {
				// Jobs insertion will be deferred until the PRESEND stage to reduce contention.
				$flags = 0;
			}

			return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
		}
	}
895
900 private function getCacheKey( $property ) {
901 $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
902
903 return $this->wanCache->makeGlobalKey(
904 'jobqueue',
905 $this->domain,
906 $cluster,
907 $this->type,
908 $property
909 );
910 }
911
916 protected static function makeBlob( $params ) {
917 if ( $params !== false ) {
918 return serialize( $params );
919 } else {
920 return '';
921 }
922 }
923
928 protected function jobFromRow( $row ) {
929 $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
930 if ( !is_array( $params ) ) { // this shouldn't happen
931 throw new UnexpectedValueException(
932 "Could not unserialize job with ID '{$row->job_id}'." );
933 }
934
935 $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
936 $job = $this->factoryJob( $row->job_cmd, $params );
937 $job->setMetadata( 'id', $row->job_id );
938 $job->setMetadata( 'timestamp', $row->job_timestamp );
939
940 return $job;
941 }
942
947 protected function getDBException( DBError $e ) {
948 return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
949 }
950
	/**
	 * Return the list of job table fields that should be selected when
	 * rows will be passed to jobFromRow().
	 *
	 * @return string[]
	 */
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
971}
972
// Back-compat alias so code referencing the un-namespaced class name still resolves
class_alias( JobQueueDB::class, 'JobQueueDB' );
const NS_SPECIAL
Definition Defines.php:54
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Convenience class for generating iterators from iterators.
Database-backed job queue storage.
array null $server
Server configuration array.
IMaintainableDatabase DBError null $conn
doGetSiblingQueueSizes(array $types)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doGetSiblingQueuesWithJobs(array $types)
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
static selectFields()
Return the list of job fields that should be selected.
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
supportedOrders()
Get the allowed queue orders for configuration validation.
__construct(array $params)
Additional parameters include:
doBatchPush(array $jobs, $flags)
insertFields(IJobSpecification $job, IReadableDatabase $db)
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doDeduplicateRootJob(IJobSpecification $job)
Base class for queueing and running background jobs from a storage backend.
Definition JobQueue.php:50
incrStats( $event, $type, $delta=1)
Call StatsFactory::incrementBy() for the queue overall and for the queue type.
Definition JobQueue.php:784
string $type
Job type.
Definition JobQueue.php:54
factoryJob( $command, $params)
Definition JobQueue.php:750
Service locator for MediaWiki core services.
static getInstance()
Returns the global default instance of the top level service locator.
Profiler base class that defines the interface and some shared functionality.
Definition Profiler.php:37
static instance()
Definition Profiler.php:105
Database error base class.
Definition DBError.php:36
Raw SQL value to be used in query builders.
Build SELECT queries with a fluent interface.
Container for accessing information about the database servers in a database cluster.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
Interface to a relational database.
Definition IDatabase.php:45
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
newInsertQueryBuilder()
Get an InsertQueryBuilder bound to this connection.
Advanced database interface for IDatabase handles that include maintenance methods.
A database connection without write operations.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28
if(count( $args)< 1) $job