MediaWiki master
JobQueueDB.php
Go to the documentation of this file.
<?php

namespace MediaWiki\JobQueue;

use MappedIterator;
use MediaWiki\MediaWikiServices;
use Profiler;
use stdClass;
use UnexpectedValueException;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\Rdbms\IReadableDatabase;
use Wikimedia\Rdbms\RawSQLValue;
use Wikimedia\Rdbms\SelectQueryBuilder;
use Wikimedia\Rdbms\ServerInfo;
use Wikimedia\ScopedCallback;
32class JobQueueDB extends JobQueue {
33 /* seconds to cache info without re-validating */
34 private const CACHE_TTL_SHORT = 30;
35 /* seconds a job can live once claimed */
36 private const MAX_AGE_PRUNE = 7 * 24 * 3600;
41 private const MAX_JOB_RANDOM = 2_147_483_647;
42 /* maximum number of rows to skip */
43 private const MAX_OFFSET = 255;
44
46 protected $conn;
47
49 protected $server;
51 protected $cluster;
52
61 protected function __construct( array $params ) {
62 parent::__construct( $params );
63
64 if ( isset( $params['server'] ) ) {
65 $this->server = $params['server'];
66 // Always use autocommit mode, even if DBO_TRX is configured
67 $this->server['flags'] ??= 0;
68 $this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
69 } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
70 $this->cluster = $params['cluster'];
71 }
72 }
73
75 protected function supportedOrders() {
76 return [ 'random', 'timestamp', 'fifo' ];
77 }
78
80 protected function optimalOrder() {
81 return 'random';
82 }
83
88 protected function doIsEmpty() {
89 $dbr = $this->getReplicaDB();
90 try {
91 // unclaimed job
92 $found = (bool)$dbr->newSelectQueryBuilder()
93 ->select( '1' )
94 ->from( 'job' )
95 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
96 ->caller( __METHOD__ )->fetchField();
97 } catch ( DBError $e ) {
98 throw $this->getDBException( $e );
99 }
100
101 return !$found;
102 }
103
108 protected function doGetSize() {
109 $key = $this->getCacheKey( 'size' );
110
111 $size = $this->wanCache->get( $key );
112 if ( is_int( $size ) ) {
113 return $size;
114 }
115
116 $dbr = $this->getReplicaDB();
117 try {
118 $size = $dbr->newSelectQueryBuilder()
119 ->from( 'job' )
120 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
121 ->caller( __METHOD__ )->fetchRowCount();
122 } catch ( DBError $e ) {
123 throw $this->getDBException( $e );
124 }
125 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
126
127 return $size;
128 }
129
134 protected function doGetAcquiredCount() {
135 if ( $this->claimTTL <= 0 ) {
136 return 0; // no acknowledgements
137 }
138
139 $key = $this->getCacheKey( 'acquiredcount' );
140
141 $count = $this->wanCache->get( $key );
142 if ( is_int( $count ) ) {
143 return $count;
144 }
145
146 $dbr = $this->getReplicaDB();
147 try {
148 $count = $dbr->newSelectQueryBuilder()
149 ->from( 'job' )
150 ->where( [
151 'job_cmd' => $this->type,
152 $dbr->expr( 'job_token', '!=', '' ),
153 ] )
154 ->caller( __METHOD__ )->fetchRowCount();
155 } catch ( DBError $e ) {
156 throw $this->getDBException( $e );
157 }
158 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
159
160 return $count;
161 }
162
169 protected function doGetAbandonedCount() {
170 if ( $this->claimTTL <= 0 ) {
171 return 0; // no acknowledgements
172 }
173
174 $key = $this->getCacheKey( 'abandonedcount' );
175
176 $count = $this->wanCache->get( $key );
177 if ( is_int( $count ) ) {
178 return $count;
179 }
180
181 $dbr = $this->getReplicaDB();
182 try {
183 $count = $dbr->newSelectQueryBuilder()
184 ->from( 'job' )
185 ->where(
186 [
187 'job_cmd' => $this->type,
188 $dbr->expr( 'job_token', '!=', '' ),
189 $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
190 ]
191 )
192 ->caller( __METHOD__ )->fetchRowCount();
193 } catch ( DBError $e ) {
194 throw $this->getDBException( $e );
195 }
196
197 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
198
199 return $count;
200 }
201
209 protected function doBatchPush( array $jobs, $flags ) {
210 // Silence expectations related to getting a primary DB, as we have to get a primary DB to insert the job.
211 $transactionProfiler = Profiler::instance()->getTransactionProfiler();
212 $scope = $transactionProfiler->silenceForScope();
213 $dbw = $this->getPrimaryDB();
214 ScopedCallback::consume( $scope );
215 // In general, there will be two cases here:
216 // a) sqlite; DB connection is probably a regular round-aware handle.
217 // If the connection is busy with a transaction, then defer the job writes
218 // until right before the main round commit step. Any errors that bubble
219 // up will rollback the main commit round.
220 // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
221 // No transaction is active nor will be started by writes, so enqueue the jobs
222 // now so that any errors will show up immediately as the interface expects. Any
223 // errors that bubble up will rollback the main commit round.
224 $fname = __METHOD__;
225 $dbw->onTransactionPreCommitOrIdle(
226 fn () => $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname ),
227 $fname
228 );
229 }
230
242 public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
243 if ( $jobs === [] ) {
244 return;
245 }
246
247 $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
248 $rowList = []; // list of jobs for jobs that are not de-duplicated
249 foreach ( $jobs as $job ) {
250 $row = $this->insertFields( $job, $dbw );
251 if ( $job->ignoreDuplicates() ) {
252 $rowSet[$row['job_sha1']] = $row;
253 } else {
254 $rowList[] = $row;
255 }
256 }
257
258 if ( $flags & self::QOS_ATOMIC ) {
259 $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
260 }
261 try {
262 // Strip out any duplicate jobs that are already in the queue...
263 if ( count( $rowSet ) ) {
264 $res = $dbw->newSelectQueryBuilder()
265 ->select( 'job_sha1' )
266 ->from( 'job' )
267 ->where(
268 [
269 // No job_type condition since it's part of the job_sha1 hash
270 'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
271 'job_token' => '' // unclaimed
272 ]
273 )
274 ->caller( $method )->fetchResultSet();
275 foreach ( $res as $row ) {
276 wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
277 unset( $rowSet[$row->job_sha1] ); // already enqueued
278 }
279 }
280 // Build the full list of job rows to insert
281 $rows = array_merge( $rowList, array_values( $rowSet ) );
282 // Silence expectations related to inserting to the job table, because we have to perform the inserts to
283 // track the job.
284 $transactionProfiler = Profiler::instance()->getTransactionProfiler();
285 $scope = $transactionProfiler->silenceForScope();
286 // Insert the job rows in chunks to avoid replica DB lag...
287 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
289 ->insertInto( 'job' )
290 ->rows( $rowBatch )
291 ->caller( $method )->execute();
292 }
293 ScopedCallback::consume( $scope );
294 $this->incrStats( 'inserts', $this->type, count( $rows ) );
295 $this->incrStats( 'dupe_inserts', $this->type,
296 count( $rowSet ) + count( $rowList ) - count( $rows )
297 );
298 } catch ( DBError $e ) {
299 throw $this->getDBException( $e );
300 }
301 if ( $flags & self::QOS_ATOMIC ) {
302 $dbw->endAtomic( $method );
303 }
304 }
305
310 protected function doPop() {
311 $job = false; // job popped off
312 try {
313 $uuid = wfRandomString( 32 ); // pop attempt
314 do { // retry when our row is invalid or deleted as a duplicate
315 // Try to reserve a row in the DB...
316 if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
317 $row = $this->claimOldest( $uuid );
318 } else { // random first
319 $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
320 $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
321 $row = $this->claimRandom( $uuid, $rand, $gte );
322 }
323 // Check if we found a row to reserve...
324 if ( !$row ) {
325 break; // nothing to do
326 }
327 $this->incrStats( 'pops', $this->type );
328
329 // Get the job object from the row...
330 $job = $this->jobFromRow( $row );
331 break; // done
332 } while ( true );
333
334 if ( !$job || mt_rand( 0, 9 ) == 0 ) {
335 // Handled jobs that need to be recycled/deleted;
336 // any recycled jobs will be picked up next attempt
338 }
339 } catch ( DBError $e ) {
340 throw $this->getDBException( $e );
341 }
342
343 return $job;
344 }
345
354 protected function claimRandom( $uuid, $rand, $gte ) {
355 $dbw = $this->getPrimaryDB();
356 // Check cache to see if the queue has <= OFFSET items
357 $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
358
359 $invertedDirection = false; // whether one job_random direction was already scanned
360 // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
361 // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
362 // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
363 // be used here with MySQL.
364 do {
365 if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
366 // For small queues, using OFFSET will overshoot and return no rows more often.
367 // Instead, this uses job_random to pick a row (possibly checking both directions).
368 $row = $dbw->newSelectQueryBuilder()
369 ->select( self::selectFields() )
370 ->from( 'job' )
371 ->where(
372 [
373 'job_cmd' => $this->type,
374 'job_token' => '', // unclaimed
375 $dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
376 ]
377 )
378 ->orderBy(
379 'job_random',
380 $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
381 )
382 ->caller( __METHOD__ )->fetchRow();
383 if ( !$row && !$invertedDirection ) {
384 $gte = !$gte;
385 $invertedDirection = true;
386 continue; // try the other direction
387 }
388 } else { // table *may* have >= MAX_OFFSET rows
389 // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
390 // in MySQL if there are many rows for some reason. This uses a small OFFSET
391 // instead of job_random for reducing excess claim retries.
392 $row = $dbw->newSelectQueryBuilder()
393 ->select( self::selectFields() )
394 ->from( 'job' )
395 ->where(
396 [
397 'job_cmd' => $this->type,
398 'job_token' => '', // unclaimed
399 ]
400 )
401 ->offset( mt_rand( 0, self::MAX_OFFSET ) )
402 ->caller( __METHOD__ )->fetchRow();
403 if ( !$row ) {
404 $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
405 $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
406 continue; // use job_random
407 }
408 }
409
410 if ( !$row ) {
411 break;
412 }
413
414 $dbw->newUpdateQueryBuilder()
415 ->update( 'job' ) // update by PK
416 ->set( [
417 'job_token' => $uuid,
418 'job_token_timestamp' => $dbw->timestamp(),
419 'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
420 ] )
421 ->where( [
422 'job_cmd' => $this->type,
423 'job_id' => $row->job_id,
424 'job_token' => ''
425 ] )
426 ->caller( __METHOD__ )->execute();
427
428 // This might get raced out by another runner when claiming the previously
429 // selected row. The use of job_random should minimize this problem, however.
430 if ( !$dbw->affectedRows() ) {
431 $row = false; // raced out
432 }
433 } while ( !$row );
434
435 return $row;
436 }
437
444 protected function claimOldest( $uuid ) {
445 $dbw = $this->getPrimaryDB();
446
447 $row = false; // the row acquired
448 do {
449 if ( $dbw->getType() === 'mysql' ) {
450 // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
451 // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
452 // Postgres has no such limitation. However, MySQL offers an
453 // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
454 $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
455 "SET " .
456 "job_token = {$dbw->addQuotes( $uuid ) }, " .
457 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
458 "job_attempts = job_attempts+1 " .
459 "WHERE ( " .
460 "job_cmd = {$dbw->addQuotes( $this->type )} " .
461 "AND job_token = {$dbw->addQuotes( '' )} " .
462 ") ORDER BY job_id ASC LIMIT 1",
463 __METHOD__
464 );
465 } else {
466 // Use a subquery to find the job, within an UPDATE to claim it.
467 // This uses as much of the DB wrapper functions as possible.
468 $qb = $dbw->newSelectQueryBuilder()
469 ->select( 'job_id' )
470 ->from( 'job' )
471 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
472 ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
473 ->limit( 1 );
474
475 $dbw->newUpdateQueryBuilder()
476 ->update( 'job' )
477 ->set( [
478 'job_token' => $uuid,
479 'job_token_timestamp' => $dbw->timestamp(),
480 'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
481 ] )
482 ->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
483 ->caller( __METHOD__ )->execute();
484 }
485
486 if ( !$dbw->affectedRows() ) {
487 break;
488 }
489
490 // Fetch any row that we just reserved...
491 $row = $dbw->newSelectQueryBuilder()
492 ->select( self::selectFields() )
493 ->from( 'job' )
494 ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
495 ->caller( __METHOD__ )->fetchRow();
496 if ( !$row ) { // raced out by duplicate job removal
497 wfDebug( "Row deleted as duplicate by another process." );
498 }
499 } while ( !$row );
500
501 return $row;
502 }
503
510 protected function doAck( RunnableJob $job ) {
511 $id = $job->getMetadata( 'id' );
512 if ( $id === null ) {
513 throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
514 }
515
516 $dbw = $this->getPrimaryDB();
517 try {
518 // Delete a row with a single DELETE without holding row locks over RTTs...
519 $dbw->newDeleteQueryBuilder()
520 ->deleteFrom( 'job' )
521 ->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
522 ->caller( __METHOD__ )->execute();
523
524 $this->incrStats( 'acks', $this->type );
525 } catch ( DBError $e ) {
526 throw $this->getDBException( $e );
527 }
528 }
529
537 // Callers should call JobQueueGroup::push() before this method so that if the
538 // insert fails, the de-duplication registration will be aborted. Since the insert
539 // is deferred till "transaction idle", do the same here, so that the ordering is
540 // maintained. Having only the de-duplication registration succeed would cause
541 // jobs to become no-ops without any actual jobs that made them redundant.
542 $dbw = $this->getPrimaryDB();
543 $dbw->onTransactionCommitOrIdle(
544 function () use ( $job ) {
545 parent::doDeduplicateRootJob( $job );
546 },
547 __METHOD__
548 );
549
550 return true;
551 }
552
557 protected function doDelete() {
558 $dbw = $this->getPrimaryDB();
559 try {
560 $dbw->newDeleteQueryBuilder()
561 ->deleteFrom( 'job' )
562 ->where( [ 'job_cmd' => $this->type ] )
563 ->caller( __METHOD__ )->execute();
564 } catch ( DBError $e ) {
565 throw $this->getDBException( $e );
566 }
567
568 return true;
569 }
570
575 protected function doWaitForBackups() {
576 if ( $this->server ) {
577 return; // not using LBFactory instance
578 }
579
580 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
581 $lbFactory->waitForReplication();
582 }
583
587 protected function doFlushCaches() {
588 foreach ( [ 'size', 'acquiredcount' ] as $type ) {
589 $this->wanCache->delete( $this->getCacheKey( $type ) );
590 }
591 }
592
597 public function getAllQueuedJobs() {
598 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
599 }
600
605 public function getAllAcquiredJobs() {
606 $dbr = $this->getReplicaDB();
607 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] );
608 }
609
614 public function getAllAbandonedJobs() {
615 $dbr = $this->getReplicaDB();
616 return $this->getJobIterator( [
617 'job_cmd' => $this->getType(),
618 $dbr->expr( 'job_token', '>', '' ),
619 $dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),
620 ] );
621 }
622
627 protected function getJobIterator( array $conds ) {
628 $dbr = $this->getReplicaDB();
629 $qb = $dbr->newSelectQueryBuilder()
630 ->select( self::selectFields() )
631 ->from( 'job' )
632 ->where( $conds );
633 try {
634 return new MappedIterator(
635 $qb->caller( __METHOD__ )->fetchResultSet(),
636 $this->jobFromRow( ... )
637 );
638 } catch ( DBError $e ) {
639 throw $this->getDBException( $e );
640 }
641 }
642
644 public function getCoalesceLocationInternal() {
645 if ( $this->server ) {
646 return null; // not using the LBFactory instance
647 }
648
649 return is_string( $this->cluster )
650 ? "DBCluster:{$this->cluster}:{$this->domain}"
651 : "LBFactory:{$this->domain}";
652 }
653
655 protected function doGetSiblingQueuesWithJobs( array $types ) {
656 $dbr = $this->getReplicaDB();
657 // @note: this does not check whether the jobs are claimed or not.
658 // This is useful so JobQueueGroup::pop() also sees queues that only
659 // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
660 // failed jobs so that they can be popped again for that edge case.
661 return $dbr->newSelectQueryBuilder()
662 ->select( 'job_cmd' )
663 ->distinct()
664 ->from( 'job' )
665 ->where( [ 'job_cmd' => $types ] )
666 ->caller( __METHOD__ )
667 ->fetchFieldValues();
668 }
669
671 protected function doGetSiblingQueueSizes( array $types ) {
672 $dbr = $this->getReplicaDB();
673
674 $res = $dbr->newSelectQueryBuilder()
675 ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
676 ->from( 'job' )
677 ->where( [ 'job_cmd' => $types ] )
678 ->groupBy( 'job_cmd' )
679 ->caller( __METHOD__ )->fetchResultSet();
680
681 $sizes = [];
682 foreach ( $res as $row ) {
683 $sizes[$row->job_cmd] = (int)$row->count;
684 }
685
686 return $sizes;
687 }
688
694 public function recycleAndDeleteStaleJobs() {
695 $now = time();
696 $count = 0; // affected rows
697 $dbw = $this->getPrimaryDB();
698
699 try {
700 if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
701 return $count; // already in progress
702 }
703
704 // Remove claims on jobs acquired for too long if enabled...
705 if ( $this->claimTTL > 0 ) {
706 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
707 // Get the IDs of jobs that have be claimed but not finished after too long.
708 // These jobs can be recycled into the queue by expiring the claim. Selecting
709 // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
710 $ids = $dbw->newSelectQueryBuilder()
711 ->select( 'job_id' )
712 ->from( 'job' )
713 ->where(
714 [
715 'job_cmd' => $this->type,
716 $dbw->expr( 'job_token', '!=', '' ), // was acquired
717 $dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
718 $dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
719 ]
720 )
721 ->caller( __METHOD__ )
722 ->fetchFieldValues();
723 if ( $ids ) {
724 // Reset job_token for these jobs so that other runners will pick them up.
725 // Set the timestamp to the current time, as it is useful to now that the job
726 // was already tried before (the timestamp becomes the "released" time).
727 $dbw->newUpdateQueryBuilder()
728 ->update( 'job' )
729 ->set( [
730 'job_token' => '',
731 'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
732 ] )
733 ->where( [
734 'job_id' => $ids,
735 $dbw->expr( 'job_token', '!=', '' ),
736 ] )
737 ->caller( __METHOD__ )->execute();
738
739 $affected = $dbw->affectedRows();
740 $count += $affected;
741 $this->incrStats( 'recycles', $this->type, $affected );
742 }
743 }
744
745 // Just destroy any stale jobs...
746 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
747 $qb = $dbw->newSelectQueryBuilder()
748 ->select( 'job_id' )
749 ->from( 'job' )
750 ->where(
751 [
752 'job_cmd' => $this->type,
753 $dbw->expr( 'job_token', '!=', '' ), // was acquired
754 $dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
755 ]
756 );
757 if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
758 $qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
759 }
760 // Get the IDs of jobs that are considered stale and should be removed. Selecting
761 // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
762 $ids = $qb->caller( __METHOD__ )->fetchFieldValues();
763 if ( $ids ) {
764 $dbw->newDeleteQueryBuilder()
765 ->deleteFrom( 'job' )
766 ->where( [ 'job_id' => $ids ] )
767 ->caller( __METHOD__ )->execute();
768 $affected = $dbw->affectedRows();
769 $count += $affected;
770 $this->incrStats( 'abandons', $this->type, $affected );
771 }
772
773 $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
774 } catch ( DBError $e ) {
775 throw $this->getDBException( $e );
776 }
777
778 return $count;
779 }
780
787 return [
788 // Fields that describe the nature of the job
789 'job_cmd' => $job->getType(),
790 'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
791 'job_title' => $job->getParams()['title'] ?? '',
792 'job_params' => self::makeBlob( $job->getParams() ),
793 // Additional job metadata
794 'job_timestamp' => $db->timestamp(),
795 'job_sha1' => \Wikimedia\base_convert(
796 sha1( serialize( $job->getDeduplicationInfo() ) ),
797 16, 36, 31
798 ),
799 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
800 ];
801 }
802
807 protected function getReplicaDB() {
808 try {
809 return $this->getDB( DB_REPLICA );
810 } catch ( DBConnectionError $e ) {
811 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
812 }
813 }
814
820 protected function getPrimaryDB() {
821 try {
822 return $this->getDB( DB_PRIMARY );
823 } catch ( DBConnectionError $e ) {
824 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
825 }
826 }
827
832 protected function getDB( $index ) {
833 if ( $this->server ) {
834 if ( $this->conn instanceof IDatabase ) {
835 return $this->conn;
836 } elseif ( $this->conn instanceof DBError ) {
837 throw $this->conn;
838 }
839
840 try {
841 $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
842 $this->server['type'],
843 $this->server
844 );
845 } catch ( DBError $e ) {
846 $this->conn = $e;
847 throw $e;
848 }
849
850 return $this->conn;
851 } else {
852 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
853 $lb = is_string( $this->cluster )
854 ? $lbFactory->getExternalLB( $this->cluster )
855 : $lbFactory->getMainLB( $this->domain );
856
857 if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
858 // Keep a separate connection to avoid contention and deadlocks;
859 // However, SQLite has the opposite behavior due to DB-level locking.
860 $flags = $lb::CONN_TRX_AUTOCOMMIT;
861 } else {
862 // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
863 $flags = 0;
864 }
865
866 return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
867 }
868 }
869
874 private function getCacheKey( $property ) {
875 $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
876
877 return $this->wanCache->makeGlobalKey(
878 'jobqueue',
879 $this->domain,
880 $cluster,
881 $this->type,
882 $property
883 );
884 }
885
890 protected static function makeBlob( $params ) {
891 if ( $params !== false ) {
892 return serialize( $params );
893 } else {
894 return '';
895 }
896 }
897
902 protected function jobFromRow( $row ) {
903 $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
904 if ( !is_array( $params ) ) { // this shouldn't happen
905 throw new UnexpectedValueException(
906 "Could not unserialize job with ID '{$row->job_id}'." );
907 }
908
909 $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
910 $job = $this->factoryJob( $row->job_cmd, $params );
911 $job->setMetadata( 'id', $row->job_id );
912 $job->setMetadata( 'timestamp', $row->job_timestamp );
913
914 return $job;
915 }
916
921 protected function getDBException( DBError $e ) {
922 return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
923 }
924
930 public static function selectFields() {
931 return [
932 'job_id',
933 'job_cmd',
934 'job_namespace',
935 'job_title',
936 'job_timestamp',
937 'job_params',
938 'job_random',
939 'job_attempts',
940 'job_token',
941 'job_token_timestamp',
942 'job_sha1',
943 ];
944 }
945}
946
948class_alias( JobQueueDB::class, 'JobQueueDB' );
const NS_SPECIAL
Definition Defines.php:40
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28
Convenience class for generating iterators from iterators.
Database-backed job queue storage.
array null $server
Server configuration array.
IMaintainableDatabase DBError null $conn
doGetSiblingQueueSizes(array $types)
to override JobQueue::getSiblingQueuesSize() array|null (list of queue types) or null if unsupported
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.to override string|null 1....
doGetSiblingQueuesWithJobs(array $types)
to override JobQueue::getSiblingQueuesWithJobs() array|null (list of queue types) or null if unsupported
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
optimalOrder()
Get the default queue order to use if configuration does not specify one.string One of (random,...
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
static selectFields()
Return the list of job fields that should be selected.
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
supportedOrders()
Get the allowed queue orders for configuration validation.array Subset of (random,...
__construct(array $params)
Additional parameters include:
doBatchPush(array $jobs, $flags)
insertFields(IJobSpecification $job, IReadableDatabase $db)
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doDeduplicateRootJob(IJobSpecification $job)
Base class for queueing and running background jobs from a storage backend.
Definition JobQueue.php:36
incrStats( $event, $type, $delta=1)
Call StatsFactory::incrementBy() for the queue overall and for the queue type.
Definition JobQueue.php:770
string $type
Job type.
Definition JobQueue.php:40
factoryJob( $command, $params)
Definition JobQueue.php:736
Service locator for MediaWiki core services.
static getInstance()
Returns the global default instance of the top level service locator.
Profiler base class that defines the interface and some shared functionality.
Definition Profiler.php:25
Database error base class.
Definition DBError.php:22
Raw SQL value to be used in query builders.
Build SELECT queries with a fluent interface.
Container for accessing information about the database servers in a database cluster.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
Interface to a relational database.
Definition IDatabase.php:31
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
newInsertQueryBuilder()
Get an InsertQueryBuilder bound to this connection.
Advanced database interface for IDatabase handles that include maintenance methods.
A database connection without write operations.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
if(count( $args)< 1) $job