MediaWiki REL1_40
JobQueueDB.php
Go to the documentation of this file.
1<?php
28use Wikimedia\ScopedCallback;
29
36class JobQueueDB extends JobQueue {
37 private const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
38 private const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
39 private const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
40 private const MAX_OFFSET = 255; // integer; maximum number of rows to skip
41
43 protected $conn;
44
46 protected $server;
48 protected $cluster;
49
59 protected function __construct( array $params ) {
60 parent::__construct( $params );
61
62 if ( isset( $params['server'] ) ) {
63 $this->server = $params['server'];
64 } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
65 $this->cluster = $params['cluster'];
66 }
67 }
68
69 protected function supportedOrders() {
70 return [ 'random', 'timestamp', 'fifo' ];
71 }
72
73 protected function optimalOrder() {
74 return 'random';
75 }
76
81 protected function doIsEmpty() {
82 $dbr = $this->getReplicaDB();
84 $scope = $this->getScopedNoTrxFlag( $dbr );
85 try {
86 // unclaimed job
87 $found = (bool)$dbr->newSelectQueryBuilder()
88 ->select( '1' )
89 ->from( 'job' )
90 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
91 ->caller( __METHOD__ )->fetchField();
92 } catch ( DBError $e ) {
93 throw $this->getDBException( $e );
94 }
95
96 return !$found;
97 }
98
103 protected function doGetSize() {
104 $key = $this->getCacheKey( 'size' );
105
106 $size = $this->wanCache->get( $key );
107 if ( is_int( $size ) ) {
108 return $size;
109 }
110
111 $dbr = $this->getReplicaDB();
113 $scope = $this->getScopedNoTrxFlag( $dbr );
114 try {
115 $size = $dbr->newSelectQueryBuilder()
116 ->from( 'job' )
117 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
118 ->caller( __METHOD__ )->fetchRowCount();
119 } catch ( DBError $e ) {
120 throw $this->getDBException( $e );
121 }
122 $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
123
124 return $size;
125 }
126
131 protected function doGetAcquiredCount() {
132 if ( $this->claimTTL <= 0 ) {
133 return 0; // no acknowledgements
134 }
135
136 $key = $this->getCacheKey( 'acquiredcount' );
137
138 $count = $this->wanCache->get( $key );
139 if ( is_int( $count ) ) {
140 return $count;
141 }
142
143 $dbr = $this->getReplicaDB();
145 $scope = $this->getScopedNoTrxFlag( $dbr );
146 try {
147 $count = $dbr->newSelectQueryBuilder()
148 ->from( 'job' )
149 ->where( [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ] )
150 ->caller( __METHOD__ )->fetchRowCount();
151 } catch ( DBError $e ) {
152 throw $this->getDBException( $e );
153 }
154 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
155
156 return $count;
157 }
158
164 protected function doGetAbandonedCount() {
165 if ( $this->claimTTL <= 0 ) {
166 return 0; // no acknowledgements
167 }
168
169 $key = $this->getCacheKey( 'abandonedcount' );
170
171 $count = $this->wanCache->get( $key );
172 if ( is_int( $count ) ) {
173 return $count;
174 }
175
176 $dbr = $this->getReplicaDB();
178 $scope = $this->getScopedNoTrxFlag( $dbr );
179 try {
180 $count = $dbr->newSelectQueryBuilder()
181 ->from( 'job' )
182 ->where(
183 [
184 'job_cmd' => $this->type,
185 "job_token != {$dbr->addQuotes( '' )}",
186 $dbr->buildComparison( '>=', [ 'job_attempts' => $this->maxTries ] ),
187 ]
188 )
189 ->caller( __METHOD__ )->fetchRowCount();
190 } catch ( DBError $e ) {
191 throw $this->getDBException( $e );
192 }
193
194 $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
195
196 return $count;
197 }
198
206 protected function doBatchPush( array $jobs, $flags ) {
207 $dbw = $this->getPrimaryDB();
209 $scope = $this->getScopedNoTrxFlag( $dbw );
210 // In general, there will be two cases here:
211 // a) sqlite; DB connection is probably a regular round-aware handle.
212 // If the connection is busy with a transaction, then defer the job writes
213 // until right before the main round commit step. Any errors that bubble
214 // up will rollback the main commit round.
215 // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
216 // No transaction is active nor will be started by writes, so enqueue the jobs
217 // now so that any errors will show up immediately as the interface expects. Any
218 // errors that bubble up will rollback the main commit round.
219 $fname = __METHOD__;
220 $dbw->onTransactionPreCommitOrIdle(
221 function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
222 $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
223 },
224 $fname
225 );
226 }
227
239 public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
240 if ( $jobs === [] ) {
241 return;
242 }
243
244 $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
245 $rowList = []; // list of jobs for jobs that are not de-duplicated
246 foreach ( $jobs as $job ) {
247 $row = $this->insertFields( $job, $dbw );
248 if ( $job->ignoreDuplicates() ) {
249 $rowSet[$row['job_sha1']] = $row;
250 } else {
251 $rowList[] = $row;
252 }
253 }
254
255 if ( $flags & self::QOS_ATOMIC ) {
256 $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
257 }
258 try {
259 // Strip out any duplicate jobs that are already in the queue...
260 if ( count( $rowSet ) ) {
262 ->select( 'job_sha1' )
263 ->from( 'job' )
264 ->where(
265 [
266 // No job_type condition since it's part of the job_sha1 hash
267 'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
268 'job_token' => '' // unclaimed
269 ]
270 )
271 ->caller( $method )->fetchResultSet();
272 foreach ( $res as $row ) {
273 wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
274 unset( $rowSet[$row->job_sha1] ); // already enqueued
275 }
276 }
277 // Build the full list of job rows to insert
278 $rows = array_merge( $rowList, array_values( $rowSet ) );
279 // Insert the job rows in chunks to avoid replica DB lag...
280 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
281 $dbw->insert( 'job', $rowBatch, $method );
282 }
283 $this->incrStats( 'inserts', $this->type, count( $rows ) );
284 $this->incrStats( 'dupe_inserts', $this->type,
285 count( $rowSet ) + count( $rowList ) - count( $rows )
286 );
287 } catch ( DBError $e ) {
288 throw $this->getDBException( $e );
289 }
290 if ( $flags & self::QOS_ATOMIC ) {
291 $dbw->endAtomic( $method );
292 }
293 }
294
299 protected function doPop() {
300 $dbw = $this->getPrimaryDB();
302 $scope = $this->getScopedNoTrxFlag( $dbw );
303
304 $job = false; // job popped off
305 try {
306 $uuid = wfRandomString( 32 ); // pop attempt
307 do { // retry when our row is invalid or deleted as a duplicate
308 // Try to reserve a row in the DB...
309 if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
310 $row = $this->claimOldest( $uuid );
311 } else { // random first
312 $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
313 $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
314 $row = $this->claimRandom( $uuid, $rand, $gte );
315 }
316 // Check if we found a row to reserve...
317 if ( !$row ) {
318 break; // nothing to do
319 }
320 $this->incrStats( 'pops', $this->type );
321
322 // Get the job object from the row...
323 $job = $this->jobFromRow( $row );
324 break; // done
325 } while ( true );
326
327 if ( !$job || mt_rand( 0, 9 ) == 0 ) {
328 // Handled jobs that need to be recycled/deleted;
329 // any recycled jobs will be picked up next attempt
331 }
332 } catch ( DBError $e ) {
333 throw $this->getDBException( $e );
334 }
335
336 return $job;
337 }
338
347 protected function claimRandom( $uuid, $rand, $gte ) {
348 $dbw = $this->getPrimaryDB();
350 $scope = $this->getScopedNoTrxFlag( $dbw );
351 // Check cache to see if the queue has <= OFFSET items
352 $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
353
354 $invertedDirection = false; // whether one job_random direction was already scanned
355 // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
356 // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
357 // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
358 // be used here with MySQL.
359 do {
360 if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
361 // For small queues, using OFFSET will overshoot and return no rows more often.
362 // Instead, this uses job_random to pick a row (possibly checking both directions).
363 $row = $dbw->newSelectQueryBuilder()
364 ->select( self::selectFields() )
365 ->from( 'job' )
366 ->where(
367 [
368 'job_cmd' => $this->type,
369 'job_token' => '', // unclaimed
370 $dbw->buildComparison( $gte ? '>=' : '<=', [ 'job_random' => $rand ] )
371 ]
372 )
373 ->orderBy(
374 'job_random',
375 $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
376 )
377 ->caller( __METHOD__ )->fetchRow();
378 if ( !$row && !$invertedDirection ) {
379 $gte = !$gte;
380 $invertedDirection = true;
381 continue; // try the other direction
382 }
383 } else { // table *may* have >= MAX_OFFSET rows
384 // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
385 // in MySQL if there are many rows for some reason. This uses a small OFFSET
386 // instead of job_random for reducing excess claim retries.
387 $row = $dbw->newSelectQueryBuilder()
388 ->select( self::selectFields() )
389 ->from( 'job' )
390 ->where(
391 [
392 'job_cmd' => $this->type,
393 'job_token' => '', // unclaimed
394 ]
395 )
396 ->offset( mt_rand( 0, self::MAX_OFFSET ) )
397 ->caller( __METHOD__ )->fetchRow();
398 if ( !$row ) {
399 $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
400 $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
401 continue; // use job_random
402 }
403 }
404
405 if ( !$row ) {
406 break;
407 }
408
409 $dbw->update( 'job', // update by PK
410 [
411 'job_token' => $uuid,
412 'job_token_timestamp' => $dbw->timestamp(),
413 'job_attempts = job_attempts+1' ],
414 [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
415 __METHOD__
416 );
417 // This might get raced out by another runner when claiming the previously
418 // selected row. The use of job_random should minimize this problem, however.
419 if ( !$dbw->affectedRows() ) {
420 $row = false; // raced out
421 }
422 } while ( !$row );
423
424 return $row;
425 }
426
433 protected function claimOldest( $uuid ) {
434 $dbw = $this->getPrimaryDB();
436 $scope = $this->getScopedNoTrxFlag( $dbw );
437
438 $row = false; // the row acquired
439 do {
440 if ( $dbw->getType() === 'mysql' ) {
441 // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
442 // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
443 // Postgres has no such limitation. However, MySQL offers an
444 // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
445 $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
446 "SET " .
447 "job_token = {$dbw->addQuotes( $uuid ) }, " .
448 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
449 "job_attempts = job_attempts+1 " .
450 "WHERE ( " .
451 "job_cmd = {$dbw->addQuotes( $this->type )} " .
452 "AND job_token = {$dbw->addQuotes( '' )} " .
453 ") ORDER BY job_id ASC LIMIT 1",
454 __METHOD__
455 );
456 } else {
457 // Use a subquery to find the job, within an UPDATE to claim it.
458 // This uses as much of the DB wrapper functions as possible.
459 $qb = $dbw->newSelectQueryBuilder()
460 ->select( 'job_id' )
461 ->from( 'job' )
462 ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
463 ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
464 ->limit( 1 );
465
466 $dbw->update( 'job',
467 [
468 'job_token' => $uuid,
469 'job_token_timestamp' => $dbw->timestamp(),
470 'job_attempts = job_attempts+1' ],
471 [ 'job_id = (' . $qb->getSQL() . ')'
472 ],
473 __METHOD__
474 );
475 }
476
477 if ( !$dbw->affectedRows() ) {
478 break;
479 }
480
481 // Fetch any row that we just reserved...
482 $row = $dbw->newSelectQueryBuilder()
483 ->select( self::selectFields() )
484 ->from( 'job' )
485 ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
486 ->caller( __METHOD__ )->fetchRow();
487 if ( !$row ) { // raced out by duplicate job removal
488 wfDebug( "Row deleted as duplicate by another process." );
489 }
490 } while ( !$row );
491
492 return $row;
493 }
494
500 protected function doAck( RunnableJob $job ) {
501 $id = $job->getMetadata( 'id' );
502 if ( $id === null ) {
503 throw new MWException( "Job of type '{$job->getType()}' has no ID." );
504 }
505
506 $dbw = $this->getPrimaryDB();
508 $scope = $this->getScopedNoTrxFlag( $dbw );
509 try {
510 // Delete a row with a single DELETE without holding row locks over RTTs...
511 $dbw->delete(
512 'job',
513 [ 'job_cmd' => $this->type, 'job_id' => $id ],
514 __METHOD__
515 );
516
517 $this->incrStats( 'acks', $this->type );
518 } catch ( DBError $e ) {
519 throw $this->getDBException( $e );
520 }
521 }
522
530 // Callers should call JobQueueGroup::push() before this method so that if the
531 // insert fails, the de-duplication registration will be aborted. Since the insert
532 // is deferred till "transaction idle", do the same here, so that the ordering is
533 // maintained. Having only the de-duplication registration succeed would cause
534 // jobs to become no-ops without any actual jobs that made them redundant.
535 $dbw = $this->getPrimaryDB();
537 $scope = $this->getScopedNoTrxFlag( $dbw );
538 $dbw->onTransactionCommitOrIdle(
539 function () use ( $job ) {
540 parent::doDeduplicateRootJob( $job );
541 },
542 __METHOD__
543 );
544
545 return true;
546 }
547
552 protected function doDelete() {
553 $dbw = $this->getPrimaryDB();
555 $scope = $this->getScopedNoTrxFlag( $dbw );
556 try {
557 $dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
558 } catch ( DBError $e ) {
559 throw $this->getDBException( $e );
560 }
561
562 return true;
563 }
564
569 protected function doWaitForBackups() {
570 if ( $this->server ) {
571 return; // not using LBFactory instance
572 }
573
574 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
575 $lbFactory->waitForReplication( [
576 'domain' => $this->domain,
577 'cluster' => is_string( $this->cluster ) ? $this->cluster : false
578 ] );
579 }
580
584 protected function doFlushCaches() {
585 foreach ( [ 'size', 'acquiredcount' ] as $type ) {
586 $this->wanCache->delete( $this->getCacheKey( $type ) );
587 }
588 }
589
594 public function getAllQueuedJobs() {
595 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
596 }
597
602 public function getAllAcquiredJobs() {
603 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
604 }
605
610 public function getAllAbandonedJobs() {
611 return $this->getJobIterator( [
612 'job_cmd' => $this->getType(),
613 "job_token > ''",
614 "job_attempts >= " . intval( $this->maxTries )
615 ] );
616 }
617
622 protected function getJobIterator( array $conds ) {
623 $dbr = $this->getReplicaDB();
625 $scope = $this->getScopedNoTrxFlag( $dbr );
626 $qb = $dbr->newSelectQueryBuilder()
627 ->select( self::selectFields() )
628 ->from( 'job' )
629 ->where( $conds );
630 try {
631 return new MappedIterator(
632 $qb->caller( __METHOD__ )->fetchResultSet(),
633 function ( $row ) {
634 return $this->jobFromRow( $row );
635 }
636 );
637 } catch ( DBError $e ) {
638 throw $this->getDBException( $e );
639 }
640 }
641
642 public function getCoalesceLocationInternal() {
643 if ( $this->server ) {
644 return null; // not using the LBFactory instance
645 }
646
647 return is_string( $this->cluster )
648 ? "DBCluster:{$this->cluster}:{$this->domain}"
649 : "LBFactory:{$this->domain}";
650 }
651
652 protected function doGetSiblingQueuesWithJobs( array $types ) {
653 $dbr = $this->getReplicaDB();
655 $scope = $this->getScopedNoTrxFlag( $dbr );
656 // @note: this does not check whether the jobs are claimed or not.
657 // This is useful so JobQueueGroup::pop() also sees queues that only
658 // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
659 // failed jobs so that they can be popped again for that edge case.
660 $res = $dbr->newSelectQueryBuilder()
661 ->select( 'job_cmd' )
662 ->distinct()
663 ->from( 'job' )
664 ->where( [ 'job_cmd' => $types ] )
665 ->caller( __METHOD__ )->fetchResultSet();
666
667 $types = [];
668 foreach ( $res as $row ) {
669 $types[] = $row->job_cmd;
670 }
671
672 return $types;
673 }
674
675 protected function doGetSiblingQueueSizes( array $types ) {
676 $dbr = $this->getReplicaDB();
678 $scope = $this->getScopedNoTrxFlag( $dbr );
679
680 $res = $dbr->newSelectQueryBuilder()
681 ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
682 ->from( 'job' )
683 ->where( [ 'job_cmd' => $types ] )
684 ->groupBy( 'job_cmd' )
685 ->caller( __METHOD__ )->fetchResultSet();
686
687 $sizes = [];
688 foreach ( $res as $row ) {
689 $sizes[$row->job_cmd] = (int)$row->count;
690 }
691
692 return $sizes;
693 }
694
700 public function recycleAndDeleteStaleJobs() {
701 $now = time();
702 $count = 0; // affected rows
703 $dbw = $this->getPrimaryDB();
705 $scope = $this->getScopedNoTrxFlag( $dbw );
706
707 try {
708 if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
709 return $count; // already in progress
710 }
711
712 // Remove claims on jobs acquired for too long if enabled...
713 if ( $this->claimTTL > 0 ) {
714 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
715 // Get the IDs of jobs that have be claimed but not finished after too long.
716 // These jobs can be recycled into the queue by expiring the claim. Selecting
717 // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
718 $res = $dbw->newSelectQueryBuilder()
719 ->select( 'job_id' )
720 ->from( 'job' )
721 ->where(
722 [
723 'job_cmd' => $this->type,
724 "job_token != {$dbw->addQuotes( '' )}", // was acquired
725 $dbw->buildComparison( '<', [ 'job_token_timestamp' => $claimCutoff ] ), // stale
726 $dbw->buildComparison( '<', [ 'job_attempts' => $this->maxTries ] ), // retries left
727 ]
728 )
729 ->caller( __METHOD__ )->fetchResultSet();
730 $ids = array_map(
731 static function ( $o ) {
732 return $o->job_id;
733 }, iterator_to_array( $res )
734 );
735 if ( count( $ids ) ) {
736 // Reset job_token for these jobs so that other runners will pick them up.
737 // Set the timestamp to the current time, as it is useful to now that the job
738 // was already tried before (the timestamp becomes the "released" time).
739 $dbw->update( 'job',
740 [
741 'job_token' => '',
742 'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
743 ],
744 [ 'job_id' => $ids, "job_token != {$dbw->addQuotes( '' )}" ],
745 __METHOD__
746 );
747 $affected = $dbw->affectedRows();
748 $count += $affected;
749 $this->incrStats( 'recycles', $this->type, $affected );
750 }
751 }
752
753 // Just destroy any stale jobs...
754 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
755 $qb = $dbw->newSelectQueryBuilder()
756 ->select( 'job_id' )
757 ->from( 'job' )
758 ->where(
759 [
760 'job_cmd' => $this->type,
761 "job_token != {$dbw->addQuotes( '' )}", // was acquired
762 $dbw->buildComparison( '<', [ 'job_token_timestamp' => $pruneCutoff ] ) // stale
763 ]
764 );
765 if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
766 $qb->andWhere( "job_attempts >= {$dbw->addQuotes( $this->maxTries )}" );
767 }
768 // Get the IDs of jobs that are considered stale and should be removed. Selecting
769 // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
770 $res = $qb->caller( __METHOD__ )->fetchResultSet();
771 $ids = array_map(
772 static function ( $o ) {
773 return $o->job_id;
774 }, iterator_to_array( $res )
775 );
776 if ( count( $ids ) ) {
777 $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
778 $affected = $dbw->affectedRows();
779 $count += $affected;
780 $this->incrStats( 'abandons', $this->type, $affected );
781 }
782
783 $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
784 } catch ( DBError $e ) {
785 throw $this->getDBException( $e );
786 }
787
788 return $count;
789 }
790
796 protected function insertFields( IJobSpecification $job, IDatabase $db ) {
797 return [
798 // Fields that describe the nature of the job
799 'job_cmd' => $job->getType(),
800 'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
801 'job_title' => $job->getParams()['title'] ?? '',
802 'job_params' => self::makeBlob( $job->getParams() ),
803 // Additional job metadata
804 'job_timestamp' => $db->timestamp(),
805 'job_sha1' => Wikimedia\base_convert(
806 sha1( serialize( $job->getDeduplicationInfo() ) ),
807 16, 36, 31
808 ),
809 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
810 ];
811 }
812
817 protected function getReplicaDB() {
818 try {
819 return $this->getDB( DB_REPLICA );
820 } catch ( DBConnectionError $e ) {
821 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
822 }
823 }
824
830 protected function getPrimaryDB() {
831 try {
832 return $this->getDB( DB_PRIMARY );
833 } catch ( DBConnectionError $e ) {
834 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
835 }
836 }
837
842 protected function getDB( $index ) {
843 if ( $this->server ) {
844 if ( $this->conn instanceof IDatabase ) {
845 return $this->conn;
846 } elseif ( $this->conn instanceof DBError ) {
847 throw $this->conn;
848 }
849
850 try {
851 $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
852 $this->server['type'],
853 $this->server
854 );
855 } catch ( DBError $e ) {
856 $this->conn = $e;
857 throw $e;
858 }
859
860 return $this->conn;
861 } else {
862 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
863 $lb = is_string( $this->cluster )
864 ? $lbFactory->getExternalLB( $this->cluster )
865 : $lbFactory->getMainLB( $this->domain );
866
867 if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
868 // Keep a separate connection to avoid contention and deadlocks;
869 // However, SQLite has the opposite behavior due to DB-level locking.
870 $flags = $lb::CONN_TRX_AUTOCOMMIT;
871 } else {
872 // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
873 $flags = 0;
874 }
875
876 return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
877 }
878 }
879
884 private function getScopedNoTrxFlag( IDatabase $db ) {
885 $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
886 $db->clearFlag( DBO_TRX ); // make each query its own transaction
887
888 return new ScopedCallback( static function () use ( $db, $autoTrx ) {
889 if ( $autoTrx ) {
890 $db->setFlag( DBO_TRX ); // restore old setting
891 }
892 } );
893 }
894
899 private function getCacheKey( $property ) {
900 $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
901
902 return $this->wanCache->makeGlobalKey(
903 'jobqueue',
904 $this->domain,
905 $cluster,
906 $this->type,
907 $property
908 );
909 }
910
915 protected static function makeBlob( $params ) {
916 if ( $params !== false ) {
917 return serialize( $params );
918 } else {
919 return '';
920 }
921 }
922
927 protected function jobFromRow( $row ) {
928 $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
929 if ( !is_array( $params ) ) { // this shouldn't happen
930 throw new UnexpectedValueException(
931 "Could not unserialize job with ID '{$row->job_id}'." );
932 }
933
934 $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
935 $job = $this->factoryJob( $row->job_cmd, $params );
936 $job->setMetadata( 'id', $row->job_id );
937 $job->setMetadata( 'timestamp', $row->job_timestamp );
938
939 return $job;
940 }
941
946 protected function getDBException( DBError $e ) {
947 return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
948 }
949
955 public static function selectFields() {
956 return [
957 'job_id',
958 'job_cmd',
959 'job_namespace',
960 'job_title',
961 'job_timestamp',
962 'job_params',
963 'job_random',
964 'job_attempts',
965 'job_token',
966 'job_token_timestamp',
967 'job_sha1',
968 ];
969 }
970}
getDB()
const NS_SPECIAL
Definition Defines.php:53
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Class to handle job queues stored in the DB.
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doAck(RunnableJob $job)
supportedOrders()
Get the allowed queue orders for configuration validation.
doGetSiblingQueueSizes(array $types)
insertFields(IJobSpecification $job, IDatabase $db)
__construct(array $params)
Additional parameters include:
getDBException(DBError $e)
getAllAbandonedJobs()
jobFromRow( $row)
doBatchPush(array $jobs, $flags)
static makeBlob( $params)
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doGetSiblingQueuesWithJobs(array $types)
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
doGetAbandonedCount()
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDB( $index)
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
IMaintainableDatabase DBError null $conn
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
doDeduplicateRootJob(IJobSpecification $job)
static selectFields()
Return the list of job fields that should be selected.
getJobIterator(array $conds)
array null $server
Server configuration array.
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:39
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition JobQueue.php:780
factoryJob( $command, $params)
Definition JobQueue.php:746
MediaWiki exception.
Convenience class for generating iterators from iterators.
Service locator for MediaWiki core services.
Database error base class.
Definition DBError.php:31
A query builder for SELECT queries with a fluent interface.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:36
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
insert( $table, $rows, $fname=__METHOD__, $options=[])
Insert row(s) into a table, in the provided order.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
Advanced database interface for IDatabase handles that include maintenance methods.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...
const DB_REPLICA
Definition defines.php:26
const DB_PRIMARY
Definition defines.php:28
const DBO_TRX
Definition defines.php:12
if(count( $args)< 1) $job