MediaWiki REL1_31
JobQueueDB.php
Go to the documentation of this file.
1<?php
27use Wikimedia\ScopedCallback;
28
35class JobQueueDB extends JobQueue {
/** Seconds to cache info without re-validating (integer) */
const CACHE_TTL_SHORT = 30;
/** Seconds a job can live once claimed (integer) */
const MAX_AGE_PRUNE = 604800;
/** 2^31 - 1, used for job_random (integer) */
const MAX_JOB_RANDOM = 2147483647;
/** Maximum number of rows to skip (integer) */
const MAX_OFFSET = 255;

/** @var WANObjectCache */
protected $cache;

/** @var bool|string Name of an external DB cluster; false when none was given */
protected $cluster = false;
46
/**
 * Set up the queue, reading the optional 'cluster' parameter and
 * binding the main WAN object cache.
 *
 * @param array $params Queue parameters; 'cluster' is read here, the
 *   rest is handled by the parent constructor
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	$this->cache = ObjectCache::getMainWANInstance();
	if ( isset( $params['cluster'] ) ) {
		$this->cluster = $params['cluster'];
	} else {
		$this->cluster = false;
	}
}
61
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'random', 'timestamp', 'fifo' ];

	return $orders;
}
65
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string
 */
protected function optimalOrder() {
	$default = 'random';

	return $default;
}
69
/**
 * Check on a replica DB whether this queue type has no unclaimed job rows.
 *
 * @return bool True when no row with an empty job_token was found
 * @throws JobQueueError On database errors
 */
protected function doIsEmpty() {
	// An empty job_token marks a job that nobody has claimed yet
	$unclaimedConds = [ 'job_cmd' => $this->type, 'job_token' => '' ];

	$dbr = $this->getReplicaDB();
	try {
		$hasUnclaimed = $dbr->selectField( 'job', '1', $unclaimedConds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return !$hasUnclaimed;
}
86
/**
 * Count the unclaimed jobs of this type, using a short-lived cache entry.
 *
 * @return int
 * @throws JobQueueError On database errors
 */
protected function doGetSize() {
	$cacheKey = $this->getCacheKey( 'size' );

	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		// Only an integer counts as a usable cached value
		return $cached;
	}

	try {
		$dbr = $this->getReplicaDB();
		$rowCount = $dbr->selectField(
			'job',
			'COUNT(*)',
			[ 'job_cmd' => $this->type, 'job_token' => '' ],
			__METHOD__
		);
		$size = (int)$rowCount;
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $cacheKey, $size, self::CACHE_TTL_SHORT );

	return $size;
}
112
/**
 * Count the jobs of this type that currently carry a claim token,
 * using a short-lived cache entry.
 *
 * @return int Always 0 when claimTTL is not positive
 * @throws JobQueueError On database errors
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		// Without a claim TTL there are no acknowledgements to track
		return 0;
	}

	$cacheKey = $this->getCacheKey( 'acquiredcount' );

	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getReplicaDB();
	try {
		$claimedConds = [
			'job_cmd' => $this->type,
			'job_token != ' . $dbr->addQuotes( '' )
		];
		$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $claimedConds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );

	return $count;
}
142
/**
 * Count the claimed jobs of this type whose attempt counter has reached
 * maxTries, using a short-lived cache entry.
 *
 * @return int Always 0 when claimTTL is not positive
 * @throws JobQueueError On database errors
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		// Without a claim TTL there are no acknowledgements to track
		return 0;
	}

	$cacheKey = $this->getCacheKey( 'abandonedcount' );

	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getReplicaDB();
	try {
		$abandonedConds = [
			'job_cmd' => $this->type,
			'job_token != ' . $dbr->addQuotes( '' ),
			"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
		];
		$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $abandonedConds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	$this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );

	return $count;
}
178
/**
 * Schedule the insertion of a batch of jobs on the master connection.
 *
 * The actual work happens in doBatchPushInternal(), run through
 * onTransactionPreCommitOrIdle(). Two situations are expected:
 *  - sqlite: the connection is probably a regular round-aware handle; if it
 *    is busy with a transaction the writes are deferred until right before
 *    the main round commit, and errors roll back that round.
 *  - mysql/postgres: the connection is generally a separate
 *    CONN_TRX_AUTOCOMMIT handle with no transaction active, so the jobs are
 *    enqueued right away and errors surface immediately.
 *
 * @param array $jobs Jobs to enqueue
 * @param int $flags Bitfield (checked against self::QOS_ATOMIC downstream)
 */
protected function doBatchPush( array $jobs, $flags ) {
	$dbw = $this->getMasterDB();
	$fname = __METHOD__;

	$pushJobs = function () use ( $dbw, $jobs, $flags, $fname ) {
		$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
	};

	$dbw->onTransactionPreCommitOrIdle( $pushJobs, $fname );
}
205
/**
 * Insert a batch of jobs, skipping de-duplicable jobs that are already queued.
 *
 * This function should not be called outside of JobQueueDB.
 *
 * @param IDatabase $dbw Master connection to write to
 * @param array $jobs Jobs to enqueue
 * @param int $flags Bitfield; with self::QOS_ATOMIC all inserts share one atomic section
 * @param string $method Caller name used for query attribution
 * @throws JobQueueError On database errors
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
	if ( !count( $jobs ) ) {
		return;
	}

	$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
	$rowList = []; // list of jobs for jobs that are not de-duplicated
	foreach ( $jobs as $job ) {
		$row = $this->insertFields( $job );
		if ( $job->ignoreDuplicates() ) {
			// Keying by hash also collapses duplicates within this batch
			$rowSet[$row['job_sha1']] = $row;
		} else {
			$rowList[] = $row;
		}
	}

	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
	}
	try {
		// Strip out any duplicate jobs that are already in the queue...
		if ( count( $rowSet ) ) {
			$res = $dbw->select( 'job', 'job_sha1',
				[
					// No job_type condition since it's part of the job_sha1 hash
					'job_sha1' => array_keys( $rowSet ),
					'job_token' => '' // unclaimed
				],
				$method
			);
			foreach ( $res as $row ) {
				wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
				unset( $rowSet[$row->job_sha1] ); // already enqueued
			}
		}
		// Build the full list of job rows to insert
		$rows = array_merge( $rowList, array_values( $rowSet ) );
		// Insert the job rows in chunks to avoid replica DB lag...
		foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
			$dbw->insert( 'job', $rowBatch, $method );
		}
		JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
		// Every job that was passed in but did not end up as an inserted row was
		// dropped as a duplicate. The previous formula compared $rows against the
		// arrays it was just built from and therefore always reported zero.
		JobQueue::incrStats( 'dupe_inserts', $this->type,
			count( $jobs ) - count( $rows )
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->endAtomic( $method );
	}
}
270
/**
 * Claim one job from the queue and build the Job object for it.
 *
 * @return Job|bool The claimed job, or false when no row could be reserved
 * @throws JobQueueError On database errors
 */
protected function doPop() {
	$dbw = $this->getMasterDB();
	try {
		$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		// Held only for its destructor, which restores the flag on scope exit
		$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
			$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
		} );

		$uuid = wfRandomString( 32 ); // pop attempt
		$job = false; // job popped off

		// Try to reserve a row in the DB...
		if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
			$row = $this->claimOldest( $uuid );
		} else { // random first
			$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
			$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
			$row = $this->claimRandom( $uuid, $rand, $gte );
		}

		if ( $row ) {
			JobQueue::incrStats( 'pops', $this->type );
			// Get the job object from the row...
			$title = Title::makeTitle( $row->job_namespace, $row->job_title );
			$job = Job::factory( $row->job_cmd, $title,
				self::extractBlob( $row->job_params ), $row->job_id );
			$job->metadata['id'] = $row->job_id;
			$job->metadata['timestamp'] = $row->job_timestamp;
		}

		if ( !$job || mt_rand( 0, 9 ) == 0 ) {
			// Handle jobs that need to be recycled/deleted;
			// any recycled jobs will be picked up next attempt
			$this->recycleAndDeleteStaleJobs();
		}
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $job;
}
320
/**
 * Reserve a row with a single UPDATE without holding row locks over RTTs...
 *
 * A candidate row is first selected (by job_random for small queues, by a
 * random OFFSET otherwise) and then claimed with an UPDATE that only matches
 * while the row is still unclaimed.
 *
 * @param string $uuid 32 char hex string used as the claim token
 * @param int $rand Random unsigned integer (31 bits) compared to job_random
 * @param bool $gte Search for job_random >= $rand (otherwise job_random <= $rand)
 * @return stdClass|bool Row of the claimed job, or false if none was claimed
 */
protected function claimRandom( $uuid, $rand, $gte ) {
	$dbw = $this->getMasterDB();
	// Check cache to see if the queue has <= OFFSET items
	$smallQueueKey = $this->getCacheKey( 'small' );
	$tinyQueue = $this->cache->get( $smallQueueKey );

	$row = false; // the row acquired
	$triedBothDirections = false; // whether one job_random direction was already scanned
	// This is a replication safe way of acquiring jobs. UPDATE+LIMIT would either
	// need ORDER BY (which deadlocks in MySQL) or not be replication safe, and due
	// to https://bugs.mysql.com/bug.php?id=6980 subqueries cannot be used with MySQL.
	do {
		if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
			// OFFSET would overshoot and return nothing too often on a small queue,
			// so pick by job_random instead (possibly checking both directions).
			if ( $gte ) {
				$ineq = '>=';
				$dir = 'ASC';
			} else {
				$ineq = '<=';
				$dir = 'DESC';
			}
			$conds = [
				'job_cmd' => $this->type,
				'job_token' => '', // unclaimed
				"job_random {$ineq} {$dbw->addQuotes( $rand )}"
			];
			$row = $dbw->selectRow( 'job', self::selectFields(), $conds, __METHOD__,
				[ 'ORDER BY' => "job_random {$dir}" ]
			);
			if ( !$row && !$triedBothDirections ) {
				$gte = !$gte;
				$triedBothDirections = true;
				continue; // try the other direction
			}
		} else { // table *may* have >= MAX_OFFSET rows
			// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
			// in MySQL if there are many rows for some reason. A small OFFSET is used
			// instead of job_random to reduce excess claim retries.
			$conds = [
				'job_cmd' => $this->type,
				'job_token' => '', // unclaimed
			];
			$row = $dbw->selectRow( 'job', self::selectFields(), $conds, __METHOD__,
				[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
			);
			if ( !$row ) {
				$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
				$this->cache->set( $smallQueueKey, 1, 30 );
				continue; // use job_random
			}
		}

		if ( !$row ) {
			break; // nothing to do
		}

		// Claim the job; the update goes by PK and only matches an unclaimed row
		$dbw->update( 'job',
			[
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			],
			[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
			__METHOD__
		);
		// Another runner may have claimed the selected row in the meantime.
		// The use of job_random should minimize this problem, however.
		if ( !$dbw->affectedRows() ) {
			$row = false; // raced out
		}
	} while ( !$row );

	return $row;
}
399
/**
 * Reserve a row with a single UPDATE without holding row locks over RTTs...
 *
 * The unclaimed row with the lowest job_id for this queue type is claimed,
 * then re-read by its claim token.
 *
 * @param string $uuid 32 char hex string used as the claim token
 * @return stdClass|bool Row of the claimed job, or false if none was claimed
 */
protected function claimOldest( $uuid ) {
	$dbw = $this->getMasterDB();

	$row = false; // the row acquired
	do {
		if ( $dbw->getType() === 'mysql' ) {
			// Per https://bugs.mysql.com/bug.php?id=6980, MySQL rejects subqueries on
			// the table being changed by an UPDATE (Error: 1093). Oracle and Postgres
			// have no such limitation. MySQL does support ORDER BY + LIMIT on UPDATE
			// queries, which is used here instead.
			$table = $dbw->tableName( 'job' );
			$quotedUuid = $dbw->addQuotes( $uuid );
			$quotedNow = $dbw->addQuotes( $dbw->timestamp() );
			$quotedType = $dbw->addQuotes( $this->type );
			$quotedEmpty = $dbw->addQuotes( '' );
			$sql = "UPDATE {$table} SET " .
				"job_token = {$quotedUuid}, " .
				"job_token_timestamp = {$quotedNow}, " .
				"job_attempts = job_attempts+1 " .
				"WHERE ( job_cmd = {$quotedType} AND job_token = {$quotedEmpty} ) " .
				"ORDER BY job_id ASC LIMIT 1";
			$dbw->query( $sql, __METHOD__ );
		} else {
			// Use a subquery to find the job, within an UPDATE to claim it.
			// This uses as much of the DB wrapper functions as possible.
			$oldestIdSql = $dbw->selectSQLText( 'job', 'job_id',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__,
				[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ]
			);
			$dbw->update( 'job',
				[
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1'
				],
				[ 'job_id = (' . $oldestIdSql . ')' ],
				__METHOD__
			);
		}

		if ( !$dbw->affectedRows() ) {
			break; // nothing to do
		}

		// Fetch the row that was just reserved...
		$row = $dbw->selectRow( 'job', self::selectFields(),
			[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
		);
		if ( !$row ) { // raced out by duplicate job removal
			wfDebug( "Row deleted as duplicate by another process.\n" );
		}
	} while ( !$row );

	return $row;
}
460
/**
 * Acknowledge a finished job by deleting its row.
 *
 * @param Job $job Job whose metadata carries the row ID
 * @throws MWException When the job has no 'id' metadata
 * @throws JobQueueError On database errors
 */
protected function doAck( Job $job ) {
	if ( !isset( $job->metadata['id'] ) ) {
		throw new MWException( "Job of type '{$job->getType()}' has no ID." );
	}

	$dbw = $this->getMasterDB();
	try {
		$hadAutoTrx = $dbw->getFlag( DBO_TRX ); // remember current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		// Held only for its destructor, which restores the flag on scope exit
		$scopedReset = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
			$dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 ); // restore old setting
		} );

		// Delete a row with a single DELETE without holding row locks over RTTs...
		$rowConds = [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ];
		$dbw->delete( 'job', $rowConds, __METHOD__ );

		JobQueue::incrStats( 'acks', $this->type );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
488
/**
 * Register the timestamp of a root job so that older jobs carrying the same
 * root job signature can be recognised as redundant.
 *
 * @param IJobSpecification $job Job carrying 'rootJobSignature' and 'rootJobTimestamp'
 * @return bool Always true; the cache write itself is deferred
 * @throws MWException When either root job parameter is missing
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
	} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
	}
	$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
	// Callers should call batchInsert() and then this function so that if the insert
	// fails, the de-duplication registration will be aborted. Since the insert is
	// deferred till "transaction idle", do the same here, so that the ordering is
	// maintained. Having only the de-duplication registration succeed would cause
	// jobs to become no-ops without any actual jobs that made them redundant.
	$dbw = $this->getMasterDB();
	// The closure needs the de-duplication cache; it must be captured by value here
	$cache = $this->dupCache;
	$dbw->onTransactionIdle(
		function () use ( $cache, $params, $key ) {
			$timestamp = $cache->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
		},
		__METHOD__
	);

	return true;
}
525
/**
 * Delete every job row of this queue type, claimed or not.
 *
 * @return bool Always true
 * @throws JobQueueError On database errors
 */
protected function doDelete() {
	$dbw = $this->getMasterDB();
	try {
		// Pass the caller name like every other query here, so that the
		// DELETE is attributed to this method in logs and profiling
		$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return true;
}
540
/**
 * Wait for replication to catch up for this queue's wiki and cluster.
 */
protected function doWaitForBackups() {
	$waitOpts = [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ];

	MediaWikiServices::getInstance()
		->getDBLoadBalancerFactory()
		->waitForReplication( $waitOpts );
}
549
/**
 * Drop the cached counters of this queue.
 *
 * All three counters that the doGet*Count()/doGetSize() methods cache are
 * removed; 'abandonedcount' used to be left behind and could stay stale
 * for up to CACHE_TTL_SHORT seconds after a flush.
 */
protected function doFlushCaches() {
	foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
		$this->cache->delete( $this->getCacheKey( $type ) );
	}
}
558
/**
 * Iterate over the jobs of this type that carry no claim token.
 *
 * @return Iterator
 */
public function getAllQueuedJobs() {
	$conds = [
		'job_cmd' => $this->getType(),
		'job_token' => ''
	];

	return $this->getJobIterator( $conds );
}
566
/**
 * Iterate over the jobs of this type that carry a claim token.
 *
 * @return Iterator
 */
public function getAllAcquiredJobs() {
	$conds = [
		'job_cmd' => $this->getType(),
		"job_token > ''"
	];

	return $this->getJobIterator( $conds );
}
574
/**
 * Iterate over the claimed jobs of this type whose attempt counter has
 * reached maxTries.
 *
 * @return Iterator
 */
public function getAllAbandonedJobs() {
	$minAttempts = intval( $this->maxTries );
	$conds = [
		'job_cmd' => $this->getType(),
		"job_token > ''",
		"job_attempts >= " . $minAttempts
	];

	return $this->getJobIterator( $conds );
}
586
/**
 * Build an iterator that yields a Job object for each matching job row
 * read from a replica DB.
 *
 * @param array $conds Query conditions for the job table
 * @return Iterator
 * @throws JobQueueError On database errors
 */
protected function getJobIterator( array $conds ) {
	$dbr = $this->getReplicaDB();
	try {
		return new MappedIterator(
			$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
			function ( $row ) {
				$job = Job::factory(
					$row->job_cmd,
					Title::makeTitle( $row->job_namespace, $row->job_title ),
					// An empty blob stands for "no parameters"
					strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
				);
				$job->metadata['id'] = $row->job_id;
				$job->metadata['timestamp'] = $row->job_timestamp;

				return $job;
			}
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
612
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string Location string built from the cluster (if any) and the wiki ID
 */
public function getCoalesceLocationInternal() {
	if ( $this->cluster ) {
		return "DBCluster:{$this->cluster}:{$this->wiki}";
	}

	return "LBFactory:{$this->wiki}";
}
618
/**
 * Find which of the given job types have at least one row in the job table.
 *
 * @param array $types Job types to look for
 * @return string[] The subset of types that have rows
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$dbr = $this->getReplicaDB();
	// @note: claimed and unclaimed jobs are not told apart here. That way
	// JobQueueGroup::pop() also sees queues that only have stale jobs, which
	// lets recycleAndDeleteStaleJobs() re-enqueue failed jobs so that they
	// can be popped again for that edge case.
	$res = $dbr->select(
		'job',
		'DISTINCT job_cmd',
		[ 'job_cmd' => $types ],
		__METHOD__
	);

	$typesWithJobs = [];
	foreach ( $res as $row ) {
		$typesWithJobs[] = $row->job_cmd;
	}

	return $typesWithJobs;
}
635
/**
 * Count the job rows for each of the given job types with one grouped query.
 *
 * @param array $types Job types to count
 * @return int[] Map of (job type => row count) for types that have rows
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$dbr = $this->getReplicaDB();
	$res = $dbr->select(
		'job',
		[ 'job_cmd', 'COUNT(*) AS count' ],
		[ 'job_cmd' => $types ],
		__METHOD__,
		[ 'GROUP BY' => 'job_cmd' ]
	);

	$sizeByType = [];
	foreach ( $res as $row ) {
		$sizeByType[$row->job_cmd] = (int)$row->count;
	}

	return $sizeByType;
}
648
/**
 * Recycle or destroy any jobs that have been claimed for too long.
 *
 * @return int Number of job rows released or deleted
 * @throws JobQueueError On database errors
 */
public function recycleAndDeleteStaleJobs() {
	$now = time();
	$count = 0; // affected rows
	$dbw = $this->getMasterDB();
	$lockName = "jobqueue-recycle-{$this->type}";

	try {
		if ( !$dbw->lock( $lockName, __METHOD__, 1 ) ) {
			return $count; // already in progress
		}

		// Remove claims on jobs acquired for too long if enabled...
		if ( $this->claimTTL > 0 ) {
			$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
			// Find the jobs that were claimed but not finished within the claim TTL.
			// Expiring their claim puts them back into the queue. The IDs are selected
			// first so that the UPDATE can go by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id',
				[
					'job_cmd' => $this->type,
					"job_token != {$dbw->addQuotes( '' )}", // was acquired
					"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
					"job_attempts < {$dbw->addQuotes( $this->maxTries )}" // retries left
				],
				__METHOD__
			);
			$recycleIds = [];
			foreach ( $res as $staleRow ) {
				$recycleIds[] = $staleRow->job_id;
			}
			if ( count( $recycleIds ) ) {
				// Clear job_token so that other runners will pick these jobs up. The
				// timestamp is set to the current time, which shows that the job was
				// tried before (it becomes the "released" time).
				$dbw->update( 'job',
					[
						'job_token' => '',
						'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
					],
					[ 'job_id' => $recycleIds ],
					__METHOD__
				);
				$released = $dbw->affectedRows();
				$count += $released;
				JobQueue::incrStats( 'recycles', $this->type, $released );
				$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
			}
		}

		// Just destroy any stale jobs...
		$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
		$pruneConds = [
			'job_cmd' => $this->type,
			"job_token != {$dbw->addQuotes( '' )}", // was acquired
			"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
		];
		if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
			$pruneConds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
		}
		// The IDs of the stale jobs are selected first so that the DELETE
		// can go by primary key (less deadlocks).
		$res = $dbw->select( 'job', 'job_id', $pruneConds, __METHOD__ );
		$pruneIds = [];
		foreach ( $res as $staleRow ) {
			$pruneIds[] = $staleRow->job_id;
		}
		if ( count( $pruneIds ) ) {
			$dbw->delete( 'job', [ 'job_id' => $pruneIds ], __METHOD__ );
			$deleted = $dbw->affectedRows();
			$count += $deleted;
			JobQueue::incrStats( 'abandons', $this->type, $deleted );
		}

		$dbw->unlock( $lockName, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $count;
}
734
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @return array Map of (column name => value)
 */
protected function insertFields( IJobSpecification $job ) {
	$dbw = $this->getMasterDB();

	// Fields that describe the nature of the job
	$fields = [
		'job_cmd' => $job->getType(),
		'job_namespace' => $job->getTitle()->getNamespace(),
		'job_title' => $job->getTitle()->getDBkey(),
		'job_params' => self::makeBlob( $job->getParams() ),
	];

	// Additional job metadata
	$fields['job_timestamp'] = $dbw->timestamp();
	// Base-36 form of the SHA-1 of the de-duplication info, padded to 31 chars
	$dedupHash = sha1( serialize( $job->getDeduplicationInfo() ) );
	$fields['job_sha1'] = Wikimedia\base_convert( $dedupHash, 16, 36, 31 );
	$fields['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

	return $fields;
}
757
/**
 * Get a replica DB connection for this queue.
 *
 * @return mixed Whatever getDB( DB_REPLICA ) returns
 * @throws JobQueueConnectionError When the connection attempt fails
 */
protected function getReplicaDB() {
	try {
		$conn = $this->getDB( DB_REPLICA );
	} catch ( DBConnectionError $e ) {
		$message = "DBConnectionError:" . $e->getMessage();
		throw new JobQueueConnectionError( $message );
	}

	return $conn;
}
769
/**
 * Get a master DB connection for this queue.
 *
 * @return mixed Whatever getDB( DB_MASTER ) returns
 * @throws JobQueueConnectionError When the connection attempt fails
 */
protected function getMasterDB() {
	try {
		$conn = $this->getDB( DB_MASTER );
	} catch ( DBConnectionError $e ) {
		$message = "DBConnectionError:" . $e->getMessage();
		throw new JobQueueConnectionError( $message );
	}

	return $conn;
}
781
/**
 * Get a connection handle from the load balancer that serves this queue.
 *
 * @param int $index DB_MASTER or DB_REPLICA
 * @return mixed Result of the load balancer's getConnectionRef()
 */
protected function getDB( $index ) {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $this->cluster !== false ) {
		$lb = $lbFactory->getExternalLB( $this->cluster );
	} else {
		$lb = $lbFactory->getMainLB( $this->wiki );
	}

	if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
		// Keep a separate connection to avoid contention and deadlocks
		return $lb->getConnectionRef( $index, [], $this->wiki, $lb::CONN_TRX_AUTOCOMMIT );
	}

	// SQLite has the opposite behavior due to DB-level locking, so the regular
	// handle is used; job insertion is deferred to reduce contention.
	return $lb->getConnectionRef( $index, [], $this->wiki );
}
799
/**
 * Build the cache key for one cached property of this queue.
 *
 * @param string $property Property name, e.g. 'size'
 * @return string
 */
private function getCacheKey( $property ) {
	$wikiParts = wfSplitWikiID( $this->wiki );
	$db = $wikiParts[0];
	$prefix = $wikiParts[1];

	// Queues without a named cluster share the 'main' key segment
	if ( is_string( $this->cluster ) ) {
		$cluster = $this->cluster;
	} else {
		$cluster = 'main';
	}

	return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}
810
/**
 * Serialize job parameters for storage in the job_params column.
 *
 * @param array|bool $params Parameters, or false for none
 * @return string Serialized parameters, or '' when $params is false
 */
protected static function makeBlob( $params ) {
	if ( $params === false ) {
		return '';
	}

	return serialize( $params );
}
822
/**
 * Unserialize the contents of a job_params column.
 *
 * @param string $blob Stored blob
 * @return mixed Unserialized value, or false when the blob is empty
 */
protected static function extractBlob( $blob ) {
	if ( (string)$blob === '' ) {
		return false;
	}

	return unserialize( $blob );
}
834
/**
 * Rethrow a database error as a JobQueueError whose message names the
 * original exception class.
 *
 * @param DBError $e
 * @throws JobQueueError Always
 */
protected function throwDBException( DBError $e ) {
	$message = get_class( $e ) . ": " . $e->getMessage();

	throw new JobQueueError( $message );
}
842
/**
 * Return the list of job fields that should be selected.
 *
 * @return string[] Column names of the job table
 */
public static function selectFields() {
	$fields = [
		// Identity and description of the job
		'job_id',
		'job_cmd',
		'job_namespace',
		'job_title',
		'job_timestamp',
		'job_params',
		// Claim and de-duplication bookkeeping
		'job_random',
		'job_attempts',
		'job_token',
		'job_token_timestamp',
		'job_sha1',
	];

	return $fields;
}
863}
serialize()
unserialize( $serialized)
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings,...
Definition Setup.php:112
Class to handle job queues stored in the DB.
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
supportedOrders()
Get the allowed queue orders for configuration validation.
doAck(Job $job)
insertFields(IJobSpecification $job)
const MAX_JOB_RANDOM
doGetSiblingQueueSizes(array $types)
const MAX_OFFSET
WANObjectCache $cache
bool string $cluster
Name of an external DB cluster.
getCacheKey( $property)
__construct(array $params)
Additional parameters include:
const CACHE_TTL_SHORT
getAllAbandonedJobs()
doBatchPush(array $jobs, $flags)
throwDBException(DBError $e)
static makeBlob( $params)
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doGetSiblingQueuesWithJobs(array $types)
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
doGetAbandonedCount()
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDB( $index)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
static extractBlob( $blob)
doDeduplicateRootJob(IJobSpecification $job)
static selectFields()
Return the list of job fields that should be selected.
const MAX_AGE_PRUNE
getJobIterator(array $conds)
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:31
BagOStuff $dupCache
Definition JobQueue.php:46
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition JobQueue.php:709
const ROOTJOB_TTL
Definition JobQueue.php:52
string $type
Job type.
Definition JobQueue.php:35
getRootJobCacheKey( $signature)
Definition JobQueue.php:528
Class to both describe a background job and handle jobs.
Definition Job.php:31
static factory( $command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Definition Job.php:74
MediaWiki exception.
Convenience class for generating iterators from iterators.
MediaWikiServices is the service locator for the application scope of MediaWiki.
Multi-datacenter aware caching interface.
get( $key, &$curTTL=null, array $checkKeys=[], &$asOf=null)
Fetch the value of a key from cache.
set( $key, $value, $ttl=0, array $opts=[])
Set the value of a key in cache.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition DBConnRef.php:15
Database error base class.
Definition DBError.php:30
$res
Definition database.txt:21
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition deferred.txt:11
when a variable name is used in a function
Definition design.txt:94
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition design.txt:19
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction $rows
Definition hooks.txt:2783
processing should stop and the error should be shown to the user * false
Definition hooks.txt:187
returning false will NOT prevent logging $e
Definition hooks.txt:2176
Job queue task description interface.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
you have access to all of the normal MediaWiki so you can get a DB use the cache
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition postgres.txt:30
const DB_REPLICA
Definition defines.php:25
const DB_MASTER
Definition defines.php:29
const DBO_TRX
Definition defines.php:12
if(count( $args)< 1) $job
$property
$params