MediaWiki master
JobQueueDB.php
<?php

use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\Rdbms\SelectQueryBuilder;
use Wikimedia\ScopedCallback;

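/**
 * Database-backed job queue storage.
 */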
class JobQueueDB extends JobQueue {
	/* seconds to cache info without re-validating */
	private const CACHE_TTL_SHORT = 30;
	/* seconds a job can live once claimed */
	private const MAX_AGE_PRUNE = 7 * 24 * 3600;
	/* highest job_random value (2^31 - 1), used by the random pop strategy */
	private const MAX_JOB_RANDOM = 2_147_483_647;
	/* maximum number of rows to skip */
	private const MAX_OFFSET = 255;

	/** @var IMaintainableDatabase|DBError|null */
	protected $conn;

	/** @var array|null Server configuration array */
	protected $server;
	/** @var string|null Name of an external DB cluster or null for the local DB cluster */
	protected $cluster;

	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['server'] ) ) {
			$this->server = $params['server'];
		} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		}
	}

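	// Illustrative sketch, not part of this file: instances are normally built by
	// JobQueue::factory() from $wgJobTypeConf entries rather than constructed directly.
	// The option values below are assumptions for the example, not defaults:
	//
	//   $wgJobTypeConf['default'] = [
	//       'class'    => JobQueueDB::class,
	//       'order'    => 'random',   // one of supportedOrders() below
	//       'claimTTL' => 3600,       // seconds before a claimed job may be recycled
	//       // 'cluster' => 'extension1', // optional external DB cluster (example name)
	//   ];
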
	protected function supportedOrders() {
		return [ 'random', 'timestamp', 'fifo' ];
	}

	protected function optimalOrder() {
		return 'random';
	}

	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			// unclaimed job
			$found = (bool)$dbr->newSelectQueryBuilder()
				->select( '1' )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )->fetchField();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return !$found;
	}

	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		$size = $this->wanCache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$size = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}

131
136 protected function doGetAcquiredCount() {
137 if ( $this->claimTTL <= 0 ) {
138 return 0; // no acknowledgements
139 }
140
141 $key = $this->getCacheKey( 'acquiredcount' );
142
143 $count = $this->wanCache->get( $key );
144 if ( is_int( $count ) ) {
145 return $count;
146 }
147
148 $dbr = $this->getReplicaDB();
150 $scope = $this->getScopedNoTrxFlag( $dbr );
151 try {
152 $count = $dbr->newSelectQueryBuilder()
153 ->from( 'job' )
154 ->where( [
155 'job_cmd' => $this->type,
156 $dbr->expr( 'job_token', '!=', '' ),
157 ] )
158 ->caller( __METHOD__ )->fetchRowCount();
159 } catch ( DBError $e ) {
160 throw $this->getDBException( $e );
161 }
162 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
163
164 return $count;
165 }
166
173 protected function doGetAbandonedCount() {
174 if ( $this->claimTTL <= 0 ) {
175 return 0; // no acknowledgements
176 }
177
178 $key = $this->getCacheKey( 'abandonedcount' );
179
180 $count = $this->wanCache->get( $key );
181 if ( is_int( $count ) ) {
182 return $count;
183 }
184
185 $dbr = $this->getReplicaDB();
187 $scope = $this->getScopedNoTrxFlag( $dbr );
188 try {
189 $count = $dbr->newSelectQueryBuilder()
190 ->from( 'job' )
191 ->where(
192 [
193 'job_cmd' => $this->type,
194 $dbr->expr( 'job_token', '!=', '' ),
195 $dbr->expr( 'job_attempts', '>=', $this->maxTries ),
196 ]
197 )
198 ->caller( __METHOD__ )->fetchRowCount();
199 } catch ( DBError $e ) {
200 throw $this->getDBException( $e );
201 }
202
203 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
204
205 return $count;
206 }
207
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		//    If the connection is busy with a transaction, then defer the job writes
		//    until right before the main round commit step. Any errors that bubble
		//    up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		//    No transaction is active nor will be started by writes, so enqueue the jobs
		//    now so that any errors will show up immediately as the interface expects. Any
		//    errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
			},
			$fname
		);
	}

	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_sha1' )
					->from( 'job' )
					->where(
						[
							// No job_type condition since it's part of the job_sha1 hash
							'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
							'job_token' => '' // unclaimed
						]
					)
					->caller( $method )->fetchResultSet();
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->newInsertQueryBuilder()
					->insertInto( 'job' )
					->rows( $rowBatch )
					->caller( $method )->execute();
			}
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			$this->incrStats( 'dupe_inserts', $this->type,
				count( $rowSet ) + count( $rowList ) - count( $rows )
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}

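	// Illustrative sketch, not part of this file: when a job spec opts into
	// de-duplication, pushing it twice leaves only one unclaimed row, because the
	// second push matches an existing row with the same job_sha1 above. The job
	// type and parameters here are examples only:
	//
	//   $spec = new JobSpecification(
	//       'htmlCacheUpdate',
	//       [ 'table' => 'templatelinks' ],
	//       [ 'removeDuplicates' => true ]
	//   );
	//   $queue->push( [ $spec, $spec ] ); // second row is skipped as a duplicate
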
	protected function doPop() {
		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			do { // retry when our row is invalid or deleted as a duplicate
				// Try to reserve a row in the DB...
				if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
					$row = $this->claimOldest( $uuid );
				} else { // random first
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
					$row = $this->claimRandom( $uuid, $rand, $gte );
				}
				// Check if we found a row to reserve...
				if ( !$row ) {
					break; // nothing to do
				}
				$this->incrStats( 'pops', $this->type );

				// Get the job object from the row...
				$job = $this->jobFromRow( $row );
				break; // done
			} while ( true );

			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
				// Handle jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $job;
	}

	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
							$dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
						]
					)
					->orderBy(
						'job_random',
						$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
					)
					->caller( __METHOD__ )->fetchRow();
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
						]
					)
					->offset( mt_rand( 0, self::MAX_OFFSET ) )
					->caller( __METHOD__ )->fetchRow();
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
					continue; // use job_random
				}
			}

			if ( !$row ) {
				break;
			}

			$dbw->newUpdateQueryBuilder()
				->update( 'job' ) // update by PK
				->set( [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1'
				] )
				->where( [
					'job_cmd' => $this->type,
					'job_id' => $row->job_id,
					'job_token' => ''
				] )
				->caller( __METHOD__ )->execute();

			// This might get raced out by another runner when claiming the previously
			// selected row. The use of job_random should minimize this problem, however.
			if ( !$dbw->affectedRows() ) {
				$row = false; // raced out
			}
		} while ( !$row );

		return $row;
	}

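	// Illustrative sketch, not part of this file: the approximate SQL produced by the
	// two branches above (identifiers simplified, placeholders in angle brackets):
	//
	//   -- small-queue branch (job_random pivot):
	//   SELECT ... FROM job
	//   WHERE job_cmd = '<type>' AND job_token = '' AND job_random >= <rand>
	//   ORDER BY job_random ASC LIMIT 1;
	//
	//   -- larger-queue branch (random OFFSET):
	//   SELECT ... FROM job
	//   WHERE job_cmd = '<type>' AND job_token = ''
	//   LIMIT 1 OFFSET <0..MAX_OFFSET>;
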
	protected function claimOldest( $uuid ) {
		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Postgres has no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid )}, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$qb = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
					->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
					->limit( 1 );

				$dbw->newUpdateQueryBuilder()
					->update( 'job' )
					->set( [
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1'
					] )
					->where( [ 'job_id = (' . $qb->getSQL() . ')' ] )
					->caller( __METHOD__ )->execute();
			}

			if ( !$dbw->affectedRows() ) {
				break;
			}

			// Fetch any row that we just reserved...
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
				->caller( __METHOD__ )->fetchRow();
			if ( !$row ) { // raced out by duplicate job removal
				wfDebug( "Row deleted as duplicate by another process." );
			}
		} while ( !$row );

		return $row;
	}

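	// Illustrative sketch, not part of this file: the approximate SQL produced by the
	// non-MySQL branch above (placeholders in angle brackets):
	//
	//   UPDATE job
	//   SET job_token = '<uuid>',
	//       job_token_timestamp = '<now>',
	//       job_attempts = job_attempts+1
	//   WHERE job_id = (
	//       SELECT job_id FROM job
	//       WHERE job_cmd = '<type>' AND job_token = ''
	//       ORDER BY job_id ASC LIMIT 1
	//   );
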
	protected function doAck( RunnableJob $job ) {
		$id = $job->getMetadata( 'id' );
		if ( $id === null ) {
			throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			// Delete a row with a single DELETE without holding row locks over RTTs...
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
				->caller( __METHOD__ )->execute();

			$this->incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Since the insert
		// is deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		$dbw->onTransactionCommitOrIdle(
			function () use ( $job ) {
				parent::doDeduplicateRootJob( $job );
			},
			__METHOD__
		);

		return true;
	}

	protected function doDelete() {
		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_cmd' => $this->type ] )
				->caller( __METHOD__ )->execute();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return true;
	}

	protected function doWaitForBackups() {
		if ( $this->server ) {
			return; // not using LBFactory instance
		}

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->waitForReplication();
	}

	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount' ] as $type ) {
			$this->wanCache->delete( $this->getCacheKey( $type ) );
		}
	}

	public function getAllQueuedJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
	}

	public function getAllAcquiredJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
	}

	public function getAllAbandonedJobs() {
		return $this->getJobIterator( [
			'job_cmd' => $this->getType(),
			"job_token > ''",
			"job_attempts >= " . intval( $this->maxTries )
		] );
	}

	protected function getJobIterator( array $conds ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		$qb = $dbr->newSelectQueryBuilder()
			->select( self::selectFields() )
			->from( 'job' )
			->where( $conds );
		try {
			return new MappedIterator(
				$qb->caller( __METHOD__ )->fetchResultSet(),
				function ( $row ) {
					return $this->jobFromRow( $row );
				}
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	public function getCoalesceLocationInternal() {
		if ( $this->server ) {
			return null; // not using the LBFactory instance
		}

		return is_string( $this->cluster )
			? "DBCluster:{$this->cluster}:{$this->domain}"
			: "LBFactory:{$this->domain}";
	}

	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$res = $dbr->newSelectQueryBuilder()
			->select( 'job_cmd' )
			->distinct()
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->caller( __METHOD__ )->fetchResultSet();

		$types = [];
		foreach ( $res as $row ) {
			$types[] = $row->job_cmd;
		}

		return $types;
	}

	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );

		$res = $dbr->newSelectQueryBuilder()
			->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->groupBy( 'job_cmd' )
			->caller( __METHOD__ )->fetchResultSet();

		$sizes = [];
		foreach ( $res as $row ) {
			$sizes[$row->job_cmd] = (int)$row->count;
		}

		return $sizes;
	}

	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getPrimaryDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );

		try {
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							$dbw->expr( 'job_token', '!=', '' ), // was acquired
							$dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
							$dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
						]
					)
					->caller( __METHOD__ )->fetchResultSet();
				$ids = array_map(
					static function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->newUpdateQueryBuilder()
						->update( 'job' )
						->set( [
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
						] )
						->where( [
							'job_id' => $ids,
							$dbw->expr( 'job_token', '!=', '' ),
						] )
						->caller( __METHOD__ )->execute();

					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$qb = $dbw->newSelectQueryBuilder()
				->select( 'job_id' )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						$dbw->expr( 'job_token', '!=', '' ), // was acquired
						$dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
					]
				);
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the DELETE can be done by primary key (less deadlocks).
			$res = $qb->caller( __METHOD__ )->fetchResultSet();
			$ids = array_map(
				static function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->newDeleteQueryBuilder()
					->deleteFrom( 'job' )
					->where( [ 'job_id' => $ids ] )
					->caller( __METHOD__ )->execute();
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $count;
	}

	protected function insertFields( IJobSpecification $job, IDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'job_title' => $job->getParams()['title'] ?? '',
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}

	protected function getReplicaDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	protected function getPrimaryDB() {
		try {
			return $this->getDB( DB_PRIMARY );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	protected function getDB( $index ) {
		if ( $this->server ) {
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn;
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn;
			}

			try {
				$this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
					$this->server['type'],
					$this->server
				);
			} catch ( DBError $e ) {
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
				// Keep a separate connection to avoid contention and deadlocks;
				// however, SQLite has the opposite behavior due to DB-level locking.
				$flags = $lb::CONN_TRX_AUTOCOMMIT;
			} else {
				// Job insertion will be deferred until the PRESEND stage to reduce contention.
				$flags = 0;
			}

			return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
		}
	}

	private function getScopedNoTrxFlag( IDatabase $db ) {
		$autoTrx = $db->getFlag( DBO_TRX ); // get current setting
		$db->clearFlag( DBO_TRX ); // make each query its own transaction

		return new ScopedCallback( static function () use ( $db, $autoTrx ) {
			if ( $autoTrx ) {
				$db->setFlag( DBO_TRX ); // restore old setting
			}
		} );
	}

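	// Illustrative note, not part of this file: callers above assign the return value
	// to an otherwise unused local (e.g. `$scope = $this->getScopedNoTrxFlag( $dbw );`).
	// The ScopedCallback restores DBO_TRX automatically when $scope falls out of scope
	// at the end of the calling method, even if an exception is thrown.
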
	private function getCacheKey( $property ) {
		$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$cluster,
			$this->type,
			$property
		);
	}

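	// Illustrative note, not part of this file: with WANObjectCache::makeGlobalKey()
	// the resulting keys look roughly like "global:jobqueue:<domain>:main:<type>:size";
	// the exact key format is an internal detail of the cache layer.
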
	protected static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}

	protected function jobFromRow( $row ) {
		$params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
		if ( !is_array( $params ) ) { // this shouldn't happen
			throw new UnexpectedValueException(
				"Could not unserialize job with ID '{$row->job_id}'." );
		}

		$params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
		$job = $this->factoryJob( $row->job_cmd, $params );
		$job->setMetadata( 'id', $row->job_id );
		$job->setMetadata( 'timestamp', $row->job_timestamp );

		return $job;
	}

	protected function getDBException( DBError $e ) {
		return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}

	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
}
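
// Illustrative usage sketch, not part of this file. In practice jobs reach a
// JobQueueDB instance through JobQueueGroup rather than by calling this class
// directly; the 'null' job type and its parameter below are examples only:
//
//   use MediaWiki\MediaWikiServices;
//
//   $group = MediaWikiServices::getInstance()->getJobQueueGroup();
//   $group->push( new JobSpecification( 'null', [ 'lives' => 1 ] ) );
//
//   // A job runner later pops and acknowledges jobs, which maps onto
//   // JobQueueDB::doPop() and JobQueueDB::doAck() above.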