MediaWiki REL1_32
JobQueueDB.php
Go to the documentation of this file.
1<?php
27use Wikimedia\ScopedCallback;
28
35class JobQueueDB extends JobQueue {
36 const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
37 const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
38 const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
39 const MAX_OFFSET = 255; // integer; maximum number of rows to skip
40
42 protected $cache;
43
45 protected $cluster = false;
46
55 protected function __construct( array $params ) {
56 parent::__construct( $params );
57
58 $this->cluster = $params['cluster'] ?? false;
59 $this->cache = ObjectCache::getMainWANInstance();
60 }
61
62 protected function supportedOrders() {
63 return [ 'random', 'timestamp', 'fifo' ];
64 }
65
66 protected function optimalOrder() {
67 return 'random';
68 }
69
74 protected function doIsEmpty() {
75 $dbr = $this->getReplicaDB();
76 try {
77 $found = $dbr->selectField( // unclaimed job
78 'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
79 );
80 } catch ( DBError $e ) {
81 $this->throwDBException( $e );
82 }
83
84 return !$found;
85 }
86
91 protected function doGetSize() {
92 $key = $this->getCacheKey( 'size' );
93
94 $size = $this->cache->get( $key );
95 if ( is_int( $size ) ) {
96 return $size;
97 }
98
99 try {
100 $dbr = $this->getReplicaDB();
101 $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
102 [ 'job_cmd' => $this->type, 'job_token' => '' ],
103 __METHOD__
104 );
105 } catch ( DBError $e ) {
106 $this->throwDBException( $e );
107 }
108 $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
109
110 return $size;
111 }
112
117 protected function doGetAcquiredCount() {
118 if ( $this->claimTTL <= 0 ) {
119 return 0; // no acknowledgements
120 }
121
122 $key = $this->getCacheKey( 'acquiredcount' );
123
124 $count = $this->cache->get( $key );
125 if ( is_int( $count ) ) {
126 return $count;
127 }
128
129 $dbr = $this->getReplicaDB();
130 try {
131 $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
132 [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
133 __METHOD__
134 );
135 } catch ( DBError $e ) {
136 $this->throwDBException( $e );
137 }
138 $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
139
140 return $count;
141 }
142
148 protected function doGetAbandonedCount() {
149 if ( $this->claimTTL <= 0 ) {
150 return 0; // no acknowledgements
151 }
152
153 $key = $this->getCacheKey( 'abandonedcount' );
154
155 $count = $this->cache->get( $key );
156 if ( is_int( $count ) ) {
157 return $count;
158 }
159
160 $dbr = $this->getReplicaDB();
161 try {
162 $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
163 [
164 'job_cmd' => $this->type,
165 "job_token != {$dbr->addQuotes( '' )}",
166 "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
167 ],
168 __METHOD__
169 );
170 } catch ( DBError $e ) {
171 $this->throwDBException( $e );
172 }
173
174 $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
175
176 return $count;
177 }
178
186 protected function doBatchPush( array $jobs, $flags ) {
187 $dbw = $this->getMasterDB();
188 // In general, there will be two cases here:
189 // a) sqlite; DB connection is probably a regular round-aware handle.
190 // If the connection is busy with a transaction, then defer the job writes
191 // until right before the main round commit step. Any errors that bubble
192 // up will rollback the main commit round.
193 // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
194 // No transaction is active nor will be started by writes, so enqueue the jobs
195 // now so that any errors will show up immediately as the interface expects. Any
196 // errors that bubble up will rollback the main commit round.
197 $fname = __METHOD__;
198 $dbw->onTransactionPreCommitOrIdle(
199 function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
200 $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
201 },
202 $fname
203 );
204 }
205
216 public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
217 if ( !count( $jobs ) ) {
218 return;
219 }
220
221 $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
222 $rowList = []; // list of jobs for jobs that are not de-duplicated
223 foreach ( $jobs as $job ) {
224 $row = $this->insertFields( $job, $dbw );
225 if ( $job->ignoreDuplicates() ) {
226 $rowSet[$row['job_sha1']] = $row;
227 } else {
228 $rowList[] = $row;
229 }
230 }
231
232 if ( $flags & self::QOS_ATOMIC ) {
233 $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
234 }
235 try {
236 // Strip out any duplicate jobs that are already in the queue...
237 if ( count( $rowSet ) ) {
238 $res = $dbw->select( 'job', 'job_sha1',
239 [
240 // No job_type condition since it's part of the job_sha1 hash
241 'job_sha1' => array_keys( $rowSet ),
242 'job_token' => '' // unclaimed
243 ],
244 $method
245 );
246 foreach ( $res as $row ) {
247 wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
248 unset( $rowSet[$row->job_sha1] ); // already enqueued
249 }
250 }
251 // Build the full list of job rows to insert
252 $rows = array_merge( $rowList, array_values( $rowSet ) );
253 // Insert the job rows in chunks to avoid replica DB lag...
254 foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
255 $dbw->insert( 'job', $rowBatch, $method );
256 }
257 JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
258 JobQueue::incrStats( 'dupe_inserts', $this->type,
259 count( $rowSet ) + count( $rowList ) - count( $rows )
260 );
261 } catch ( DBError $e ) {
262 $this->throwDBException( $e );
263 }
264 if ( $flags & self::QOS_ATOMIC ) {
265 $dbw->endAtomic( $method );
266 }
267 }
268
273 protected function doPop() {
274 $dbw = $this->getMasterDB();
275 try {
276 $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
277 $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
278 $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
279 $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
280 } );
281
282 $uuid = wfRandomString( 32 ); // pop attempt
283 $job = false; // job popped off
284 do { // retry when our row is invalid or deleted as a duplicate
285 // Try to reserve a row in the DB...
286 if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
287 $row = $this->claimOldest( $uuid );
288 } else { // random first
289 $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
290 $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
291 $row = $this->claimRandom( $uuid, $rand, $gte );
292 }
293 // Check if we found a row to reserve...
294 if ( !$row ) {
295 break; // nothing to do
296 }
297 JobQueue::incrStats( 'pops', $this->type );
298 // Get the job object from the row...
299 $title = Title::makeTitle( $row->job_namespace, $row->job_title );
300 $job = Job::factory( $row->job_cmd, $title,
301 self::extractBlob( $row->job_params ), $row->job_id );
302 $job->metadata['id'] = $row->job_id;
303 $job->metadata['timestamp'] = $row->job_timestamp;
304 break; // done
305 } while ( true );
306
307 if ( !$job || mt_rand( 0, 9 ) == 0 ) {
308 // Handled jobs that need to be recycled/deleted;
309 // any recycled jobs will be picked up next attempt
311 }
312 } catch ( DBError $e ) {
313 $this->throwDBException( $e );
314 }
315
316 return $job;
317 }
318
327 protected function claimRandom( $uuid, $rand, $gte ) {
328 $dbw = $this->getMasterDB();
329 // Check cache to see if the queue has <= OFFSET items
330 $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
331
332 $row = false; // the row acquired
333 $invertedDirection = false; // whether one job_random direction was already scanned
334 // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
335 // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
336 // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
337 // be used here with MySQL.
338 do {
339 if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
340 // For small queues, using OFFSET will overshoot and return no rows more often.
341 // Instead, this uses job_random to pick a row (possibly checking both directions).
342 $ineq = $gte ? '>=' : '<=';
343 $dir = $gte ? 'ASC' : 'DESC';
344 $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
345 [
346 'job_cmd' => $this->type,
347 'job_token' => '', // unclaimed
348 "job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
349 __METHOD__,
350 [ 'ORDER BY' => "job_random {$dir}" ]
351 );
352 if ( !$row && !$invertedDirection ) {
353 $gte = !$gte;
354 $invertedDirection = true;
355 continue; // try the other direction
356 }
357 } else { // table *may* have >= MAX_OFFSET rows
358 // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
359 // in MySQL if there are many rows for some reason. This uses a small OFFSET
360 // instead of job_random for reducing excess claim retries.
361 $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
362 [
363 'job_cmd' => $this->type,
364 'job_token' => '', // unclaimed
365 ],
366 __METHOD__,
367 [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
368 );
369 if ( !$row ) {
370 $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
371 $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
372 continue; // use job_random
373 }
374 }
375
376 if ( $row ) { // claim the job
377 $dbw->update( 'job', // update by PK
378 [
379 'job_token' => $uuid,
380 'job_token_timestamp' => $dbw->timestamp(),
381 'job_attempts = job_attempts+1' ],
382 [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
383 __METHOD__
384 );
385 // This might get raced out by another runner when claiming the previously
386 // selected row. The use of job_random should minimize this problem, however.
387 if ( !$dbw->affectedRows() ) {
388 $row = false; // raced out
389 }
390 } else {
391 break; // nothing to do
392 }
393 } while ( !$row );
394
395 return $row;
396 }
397
404 protected function claimOldest( $uuid ) {
405 $dbw = $this->getMasterDB();
406
407 $row = false; // the row acquired
408 do {
409 if ( $dbw->getType() === 'mysql' ) {
410 // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
411 // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
412 // Oracle and Postgre have no such limitation. However, MySQL offers an
413 // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
414 $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
415 "SET " .
416 "job_token = {$dbw->addQuotes( $uuid ) }, " .
417 "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
418 "job_attempts = job_attempts+1 " .
419 "WHERE ( " .
420 "job_cmd = {$dbw->addQuotes( $this->type )} " .
421 "AND job_token = {$dbw->addQuotes( '' )} " .
422 ") ORDER BY job_id ASC LIMIT 1",
423 __METHOD__
424 );
425 } else {
426 // Use a subquery to find the job, within an UPDATE to claim it.
427 // This uses as much of the DB wrapper functions as possible.
428 $dbw->update( 'job',
429 [
430 'job_token' => $uuid,
431 'job_token_timestamp' => $dbw->timestamp(),
432 'job_attempts = job_attempts+1' ],
433 [ 'job_id = (' .
434 $dbw->selectSQLText( 'job', 'job_id',
435 [ 'job_cmd' => $this->type, 'job_token' => '' ],
436 __METHOD__,
437 [ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
438 ')'
439 ],
440 __METHOD__
441 );
442 }
443 // Fetch any row that we just reserved...
444 if ( $dbw->affectedRows() ) {
445 $row = $dbw->selectRow( 'job', self::selectFields(),
446 [ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
447 );
448 if ( !$row ) { // raced out by duplicate job removal
449 wfDebug( "Row deleted as duplicate by another process.\n" );
450 }
451 } else {
452 break; // nothing to do
453 }
454 } while ( !$row );
455
456 return $row;
457 }
458
464 protected function doAck( Job $job ) {
465 if ( !isset( $job->metadata['id'] ) ) {
466 throw new MWException( "Job of type '{$job->getType()}' has no ID." );
467 }
468
469 $dbw = $this->getMasterDB();
470 try {
471 $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
472 $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
473 $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
474 $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
475 } );
476
477 // Delete a row with a single DELETE without holding row locks over RTTs...
478 $dbw->delete( 'job',
479 [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
480
481 JobQueue::incrStats( 'acks', $this->type );
482 } catch ( DBError $e ) {
483 $this->throwDBException( $e );
484 }
485 }
486
494 $params = $job->getParams();
495 if ( !isset( $params['rootJobSignature'] ) ) {
496 throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
497 } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
498 throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
499 }
500 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
501 // Callers should call JobQueueGroup::push() before this method so that if the insert
502 // fails, the de-duplication registration will be aborted. Since the insert is
503 // deferred till "transaction idle", do the same here, so that the ordering is
504 // maintained. Having only the de-duplication registration succeed would cause
505 // jobs to become no-ops without any actual jobs that made them redundant.
506 $dbw = $this->getMasterDB();
508 $dbw->onTransactionCommitOrIdle(
509 function () use ( $cache, $params, $key ) {
510 $timestamp = $cache->get( $key ); // current last timestamp of this job
511 if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
512 return true; // a newer version of this root job was enqueued
513 }
514
515 // Update the timestamp of the last root job started at the location...
516 return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
517 },
518 __METHOD__
519 );
520
521 return true;
522 }
523
528 protected function doDelete() {
529 $dbw = $this->getMasterDB();
530 try {
531 $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
532 } catch ( DBError $e ) {
533 $this->throwDBException( $e );
534 }
535
536 return true;
537 }
538
543 protected function doWaitForBackups() {
544 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
545 $lbFactory->waitForReplication( [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ] );
546 }
547
551 protected function doFlushCaches() {
552 foreach ( [ 'size', 'acquiredcount' ] as $type ) {
553 $this->cache->delete( $this->getCacheKey( $type ) );
554 }
555 }
556
561 public function getAllQueuedJobs() {
562 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
563 }
564
569 public function getAllAcquiredJobs() {
570 return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
571 }
572
577 protected function getJobIterator( array $conds ) {
578 $dbr = $this->getReplicaDB();
579 try {
580 return new MappedIterator(
581 $dbr->select( 'job', self::selectFields(), $conds ),
582 function ( $row ) {
584 $row->job_cmd,
585 Title::makeTitle( $row->job_namespace, $row->job_title ),
586 strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
587 );
588 $job->metadata['id'] = $row->job_id;
589 $job->metadata['timestamp'] = $row->job_timestamp;
590
591 return $job;
592 }
593 );
594 } catch ( DBError $e ) {
595 $this->throwDBException( $e );
596 }
597 }
598
599 public function getCoalesceLocationInternal() {
600 return $this->cluster
601 ? "DBCluster:{$this->cluster}:{$this->wiki}"
602 : "LBFactory:{$this->wiki}";
603 }
604
605 protected function doGetSiblingQueuesWithJobs( array $types ) {
606 $dbr = $this->getReplicaDB();
607 // @note: this does not check whether the jobs are claimed or not.
608 // This is useful so JobQueueGroup::pop() also sees queues that only
609 // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
610 // failed jobs so that they can be popped again for that edge case.
611 $res = $dbr->select( 'job', 'DISTINCT job_cmd',
612 [ 'job_cmd' => $types ], __METHOD__ );
613
614 $types = [];
615 foreach ( $res as $row ) {
616 $types[] = $row->job_cmd;
617 }
618
619 return $types;
620 }
621
622 protected function doGetSiblingQueueSizes( array $types ) {
623 $dbr = $this->getReplicaDB();
624 $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
625 [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
626
627 $sizes = [];
628 foreach ( $res as $row ) {
629 $sizes[$row->job_cmd] = (int)$row->count;
630 }
631
632 return $sizes;
633 }
634
640 public function recycleAndDeleteStaleJobs() {
641 $now = time();
642 $count = 0; // affected rows
643 $dbw = $this->getMasterDB();
644
645 try {
646 if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
647 return $count; // already in progress
648 }
649
650 // Remove claims on jobs acquired for too long if enabled...
651 if ( $this->claimTTL > 0 ) {
652 $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
653 // Get the IDs of jobs that have be claimed but not finished after too long.
654 // These jobs can be recycled into the queue by expiring the claim. Selecting
655 // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
656 $res = $dbw->select( 'job', 'job_id',
657 [
658 'job_cmd' => $this->type,
659 "job_token != {$dbw->addQuotes( '' )}", // was acquired
660 "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
661 "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
662 __METHOD__
663 );
664 $ids = array_map(
665 function ( $o ) {
666 return $o->job_id;
667 }, iterator_to_array( $res )
668 );
669 if ( count( $ids ) ) {
670 // Reset job_token for these jobs so that other runners will pick them up.
671 // Set the timestamp to the current time, as it is useful to now that the job
672 // was already tried before (the timestamp becomes the "released" time).
673 $dbw->update( 'job',
674 [
675 'job_token' => '',
676 'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
677 [
678 'job_id' => $ids ],
679 __METHOD__
680 );
681 $affected = $dbw->affectedRows();
682 $count += $affected;
683 JobQueue::incrStats( 'recycles', $this->type, $affected );
684 $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
685 }
686 }
687
688 // Just destroy any stale jobs...
689 $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
690 $conds = [
691 'job_cmd' => $this->type,
692 "job_token != {$dbw->addQuotes( '' )}", // was acquired
693 "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
694 ];
695 if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
696 $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
697 }
698 // Get the IDs of jobs that are considered stale and should be removed. Selecting
699 // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
700 $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
701 $ids = array_map(
702 function ( $o ) {
703 return $o->job_id;
704 }, iterator_to_array( $res )
705 );
706 if ( count( $ids ) ) {
707 $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
708 $affected = $dbw->affectedRows();
709 $count += $affected;
710 JobQueue::incrStats( 'abandons', $this->type, $affected );
711 }
712
713 $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
714 } catch ( DBError $e ) {
715 $this->throwDBException( $e );
716 }
717
718 return $count;
719 }
720
726 protected function insertFields( IJobSpecification $job, IDatabase $db ) {
727 return [
728 // Fields that describe the nature of the job
729 'job_cmd' => $job->getType(),
730 'job_namespace' => $job->getTitle()->getNamespace(),
731 'job_title' => $job->getTitle()->getDBkey(),
732 'job_params' => self::makeBlob( $job->getParams() ),
733 // Additional job metadata
734 'job_timestamp' => $db->timestamp(),
735 'job_sha1' => Wikimedia\base_convert(
736 sha1( serialize( $job->getDeduplicationInfo() ) ),
737 16, 36, 31
738 ),
739 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
740 ];
741 }
742
747 protected function getReplicaDB() {
748 try {
749 return $this->getDB( DB_REPLICA );
750 } catch ( DBConnectionError $e ) {
751 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
752 }
753 }
754
759 protected function getMasterDB() {
760 try {
761 return $this->getDB( DB_MASTER );
762 } catch ( DBConnectionError $e ) {
763 throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
764 }
765 }
766
771 protected function getDB( $index ) {
772 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
773 $lb = ( $this->cluster !== false )
774 ? $lbFactory->getExternalLB( $this->cluster )
775 : $lbFactory->getMainLB( $this->wiki );
776
777 return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
778 // Keep a separate connection to avoid contention and deadlocks;
779 // However, SQLite has the opposite behavior due to DB-level locking.
780 ? $lb->getConnectionRef( $index, [], $this->wiki, $lb::CONN_TRX_AUTOCOMMIT )
781 // Jobs insertion will be defered until the PRESEND stage to reduce contention.
782 : $lb->getConnectionRef( $index, [], $this->wiki );
783 }
784
789 private function getCacheKey( $property ) {
790 $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
791
792 return $this->cache->makeGlobalKey(
793 'jobqueue', $this->wiki, $cluster, $this->type, $property );
794 }
795
800 protected static function makeBlob( $params ) {
801 if ( $params !== false ) {
802 return serialize( $params );
803 } else {
804 return '';
805 }
806 }
807
812 protected static function extractBlob( $blob ) {
813 if ( (string)$blob !== '' ) {
814 return unserialize( $blob );
815 } else {
816 return false;
817 }
818 }
819
824 protected function throwDBException( DBError $e ) {
825 throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
826 }
827
833 public static function selectFields() {
834 return [
835 'job_id',
836 'job_cmd',
837 'job_namespace',
838 'job_title',
839 'job_timestamp',
840 'job_params',
841 'job_random',
842 'job_attempts',
843 'job_token',
844 'job_token_timestamp',
845 'job_sha1',
846 ];
847 }
848}
serialize()
unserialize( $serialized)
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings,...
Definition Setup.php:121
Class to handle job queues stored in the DB.
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
supportedOrders()
Get the allowed queue orders for configuration validation.
doAck(Job $job)
const MAX_JOB_RANDOM
doGetSiblingQueueSizes(array $types)
insertFields(IJobSpecification $job, IDatabase $db)
const MAX_OFFSET
WANObjectCache $cache
bool string $cluster
Name of an external DB cluster.
getCacheKey( $property)
__construct(array $params)
Additional parameters include:
const CACHE_TTL_SHORT
doBatchPush(array $jobs, $flags)
throwDBException(DBError $e)
static makeBlob( $params)
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
doGetSiblingQueuesWithJobs(array $types)
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
doGetAbandonedCount()
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
optimalOrder()
Get the default queue order to use if configuration does not specify one.
getDB( $index)
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
static extractBlob( $blob)
doDeduplicateRootJob(IJobSpecification $job)
static selectFields()
Return the list of job fields that should be selected.
const MAX_AGE_PRUNE
getJobIterator(array $conds)
Class to handle enqueueing and running of background jobs.
Definition JobQueue.php:31
BagOStuff $dupCache
Definition JobQueue.php:46
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition JobQueue.php:705
const ROOTJOB_TTL
Definition JobQueue.php:52
string $type
Job type.
Definition JobQueue.php:35
getRootJobCacheKey( $signature)
Definition JobQueue.php:524
Class to both describe a background job and handle jobs.
Definition Job.php:30
static factory( $command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Definition Job.php:73
MediaWiki exception.
Convenience class for generating iterators from iterators.
MediaWikiServices is the service locator for the application scope of MediaWiki.
Multi-datacenter aware caching interface.
get( $key, &$curTTL=null, array $checkKeys=[], &$asOf=null)
Fetch the value of a key from cache.
set( $key, $value, $ttl=0, array $opts=[])
Set the value of a key in cache.
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition DBConnRef.php:15
Database error base class.
Definition DBError.php:30
$res
Definition database.txt:21
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction $rows
Definition hooks.txt:2857
processing should stop and the error should be shown to the user * false
Definition hooks.txt:187
returning false will NOT prevent logging $e
Definition hooks.txt:2226
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback function
Definition injection.txt:30
Job queue task description interface.
Basic database interface for live and lazy-loaded relation database handles.
Definition IDatabase.php:38
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by wfTimestamp() to the format used for inserting ...
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
you have access to all of the normal MediaWiki so you can get a DB use the cache
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition postgres.txt:36
const DB_REPLICA
Definition defines.php:25
const DB_MASTER
Definition defines.php:26
const DBO_TRX
Definition defines.php:12
if(count( $args)< 1) $job
$property
$params