MediaWiki REL1_33
JobQueueDB.php
<?php

use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\ScopedCallback;

/**
 * Class to handle job queues stored in the DB.
 * @ingroup JobQueue
 * @since 1.21
 */
class JobQueueDB extends JobQueue {
	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
	const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
	const MAX_OFFSET = 255; // integer; maximum number of rows to skip

	/** @var WANObjectCache */
	protected $cache;
	/** @var IDatabase|DBError|null */
	protected $conn;

	/** @var array|null Server configuration array */
	protected $server;
	/** @var string|null Name of an external DB cluster or null for the local DB cluster */
	protected $cluster;
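
	/**
	 * Additional parameters include:
	 *   - server : Server configuration array for Database::factory()
	 *   - cluster : Name of an external DB cluster (as registered with LBFactory);
	 *     if unset, the local wiki's main DB cluster is used
	 * @param array $params
	 */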
	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['server'] ) ) {
			$this->server = $params['server'];
		} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		}

		$this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	}
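
	/**
	 * Get the allowed queue orders for configuration validation.
	 * @return array
	 */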
	protected function supportedOrders() {
		return [ 'random', 'timestamp', 'fifo' ];
	}
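
	/**
	 * Get the default queue order to use if configuration does not specify one.
	 * @return string
	 */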
	protected function optimalOrder() {
		return 'random';
	}
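
	/**
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 */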
	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$found = $dbr->selectField( // unclaimed job
				'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return !$found;
	}
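
	/**
	 * @see JobQueue::doGetSize()
	 * @return int
	 */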
	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		$size = $this->cache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}
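
	/**
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 */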
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'acquiredcount' );

		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
				__METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}
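
	/**
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 */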
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'abandonedcount' );

		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[
					'job_cmd' => $this->type,
					"job_token != {$dbr->addQuotes( '' )}",
					"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
				],
				__METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}
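
	/**
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @throws DBError
	 */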
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		//    If the connection is busy with a transaction, then defer the job writes
		//    until right before the main round commit step. Any errors that bubble
		//    up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		//    No transaction is active nor will be started by writes, so enqueue the jobs
		//    now so that any errors will show up immediately as the interface expects. Any
		//    errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
			},
			$fname
		);
	}
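
	/**
	 * This function should not be called outside of JobQueueDB.
	 *
	 * @param IDatabase $dbw
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @param string $method
	 * @throws DBError
	 * @return void
	 */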
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->select( 'job', 'job_sha1',
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_keys( $rowSet ),
						'job_token' => '' // unclaimed
					],
					$method
				);
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->insert( 'job', $rowBatch, $method );
			}
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			$this->incrStats( 'dupe_inserts', $this->type,
				count( $rowSet ) + count( $rowList ) - count( $rows )
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}
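
	/**
	 * @see JobQueue::doPop()
	 * @return Job|bool
	 */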
	protected function doPop() {
		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			do { // retry when our row is invalid or deleted as a duplicate
				// Try to reserve a row in the DB...
				if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
					$row = $this->claimOldest( $uuid );
				} else { // random first
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
					$row = $this->claimRandom( $uuid, $rand, $gte );
				}
				// Check if we found a row to reserve...
				if ( !$row ) {
					break; // nothing to do
				}
				$this->incrStats( 'pops', $this->type );
				// Get the job object from the row...
				$title = Title::makeTitle( $row->job_namespace, $row->job_title );
				$job = Job::factory( $row->job_cmd, $title,
					self::extractBlob( $row->job_params ) );
				$job->setMetadata( 'id', $row->job_id );
				$job->setMetadata( 'timestamp', $row->job_timestamp );
				break; // done
			} while ( true );

			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
				// Handled jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $job;
	}
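
	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $rand (otherwise <=)
	 * @return stdClass|bool Row|false
	 */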
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// Check cache to see if the queue has <= MAX_OFFSET items
		$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

		$row = false; // the row acquired
		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
					__METHOD__,
					[ 'ORDER BY' => "job_random {$dir}" ]
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					],
					__METHOD__,
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
					continue; // use job_random
				}
			}

			if ( $row ) { // claim the job
				$dbw->update( 'job', // update by PK
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
					__METHOD__
				);
				// This might get raced out by another runner when claiming the previously
				// selected row. The use of job_random should minimize this problem, however.
				if ( !$dbw->affectedRows() ) {
					$row = false; // raced out
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
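
	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|bool Row|false
	 */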
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Oracle and Postgres have no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid ) }, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$dbw->update( 'job',
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_id = (' .
						$dbw->selectSQLText( 'job', 'job_id',
							[ 'job_cmd' => $this->type, 'job_token' => '' ],
							__METHOD__,
							[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
						')'
					],
					__METHOD__
				);
			}
			// Fetch any row that we just reserved...
			if ( $dbw->affectedRows() ) {
				$row = $dbw->selectRow( 'job', self::selectFields(),
					[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
				);
				if ( !$row ) { // raced out by duplicate job removal
					wfDebug( "Row deleted as duplicate by another process.\n" );
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
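
	/**
	 * @see JobQueue::doAck()
	 * @param Job $job
	 * @throws MWException
	 */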
	protected function doAck( Job $job ) {
		$id = $job->getMetadata( 'id' );
		if ( $id === null ) {
			throw new MWException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			// Delete a row with a single DELETE without holding row locks over RTTs...
			$dbw->delete(
				'job',
				[ 'job_cmd' => $this->type, 'job_id' => $id ],
				__METHOD__
			);

			$this->incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @throws MWException
	 * @return bool
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->getParams();
		if ( !isset( $params['rootJobSignature'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
		} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
		}
		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call JobQueueGroup::push() before this method so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$cache = $this->dupCache;
		$dbw->onTransactionCommitOrIdle(
			function () use ( $cache, $params, $key ) {
				$timestamp = $cache->get( $key ); // current last timestamp of this job
				if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
					return true; // a newer version of this root job was enqueued
				}

				// Update the timestamp of the last root job started at the location...
				return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
			},
			__METHOD__
		);

		return true;
	}
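
	/**
	 * @see JobQueue::doDelete()
	 * @return bool
	 */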
	protected function doDelete() {
		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			$dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return true;
	}
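
	/**
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */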
	protected function doWaitForBackups() {
		if ( $this->server ) {
			return; // not using LBFactory instance
		}

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->waitForReplication( [
			'domain' => $this->domain,
			'cluster' => is_string( $this->cluster ) ? $this->cluster : false
		] );
	}
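
	/**
	 * @return void
	 */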
	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount' ] as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}
	}
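
	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 */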
	public function getAllQueuedJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
	}
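
	/**
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator
	 */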
	public function getAllAcquiredJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
	}
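
	/**
	 * @param array $conds Query conditions
	 * @return Iterator
	 */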
	protected function getJobIterator( array $conds ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			return new MappedIterator(
				$dbr->select( 'job', self::selectFields(), $conds ),
				function ( $row ) {
					$job = Job::factory(
						$row->job_cmd,
						Title::makeTitle( $row->job_namespace, $row->job_title ),
						strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
					);
					$job->setMetadata( 'id', $row->job_id );
					$job->setMetadata( 'timestamp', $row->job_timestamp );

					return $job;
				}
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
	}

	public function getCoalesceLocationInternal() {
		if ( $this->server ) {
			return null; // not using the LBFactory instance
		}

		return is_string( $this->cluster )
			? "DBCluster:{$this->cluster}:{$this->domain}"
			: "LBFactory:{$this->domain}";
	}

	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$res = $dbr->select( 'job', 'DISTINCT job_cmd',
			[ 'job_cmd' => $types ], __METHOD__ );

		$types = [];
		foreach ( $res as $row ) {
			$types[] = $row->job_cmd;
		}

		return $types;
	}

	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );

		$res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
			[ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );

		$sizes = [];
		foreach ( $res as $row ) {
			$sizes[$row->job_cmd] = (int)$row->count;
		}

		return $sizes;
	}
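
	/**
	 * Recycle or destroy any jobs that have been claimed for too long.
	 *
	 * @return int Number of jobs recycled/deleted
	 */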
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();
		$scope = $this->getScopedNoTrxFlag( $dbw );

		try {
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
					__METHOD__
				);
				$ids = array_map(
					function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
						[
							'job_id' => $ids ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map(
				function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $count;
	}
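
	/**
	 * @param IJobSpecification $job
	 * @param IDatabase $db
	 * @return array
	 */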
	protected function insertFields( IJobSpecification $job, IDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getTitle()->getNamespace(),
			'job_title' => $job->getTitle()->getDBkey(),
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}
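
	/**
	 * @throws JobQueueConnectionError
	 * @return IDatabase
	 */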
	protected function getReplicaDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}
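
	/**
	 * @throws JobQueueConnectionError
	 * @return IDatabase
	 */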
	protected function getMasterDB() {
		try {
			return $this->getDB( DB_MASTER );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}
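
	/**
	 * @param int $index (DB_REPLICA/DB_MASTER)
	 * @return IDatabase
	 */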
	protected function getDB( $index ) {
		if ( $this->server ) {
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn;
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn;
			}

			try {
				$this->conn = Database::factory( $this->server['type'], $this->server );
			} catch ( DBError $e ) {
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
				// Keep a separate connection to avoid contention and deadlocks;
				// however, SQLite has the opposite behavior due to DB-level locking.
				? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
				// Job insertion will be deferred until the PRESEND stage to reduce contention.
				: $lb->getConnectionRef( $index, [], $this->domain );
		}
	}
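
	/**
	 * @param IDatabase $db
	 * @return ScopedCallback
	 */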
	private function getScopedNoTrxFlag( IDatabase $db ) {
		$autoTrx = $db->getFlag( DBO_TRX ); // get current setting
		$db->clearFlag( DBO_TRX ); // make each query its own transaction

		return new ScopedCallback( function () use ( $db, $autoTrx ) {
			if ( $autoTrx ) {
				$db->setFlag( DBO_TRX ); // restore old setting
			}
		} );
	}
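
	/**
	 * @param string $property
	 * @return string
	 */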
	private function getCacheKey( $property ) {
		$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

		return $this->cache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$cluster,
			$this->type,
			$property
		);
	}
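
	/**
	 * @param array|bool $params
	 * @return string
	 */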
	protected static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}
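
	/**
	 * @param string $blob
	 * @return array|bool
	 */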
	protected static function extractBlob( $blob ) {
		if ( (string)$blob !== '' ) {
			return unserialize( $blob );
		} else {
			return false;
		}
	}
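
	/**
	 * @param DBError $e
	 * @throws JobQueueError
	 */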
	protected function throwDBException( DBError $e ) {
		throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}
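
	/**
	 * Return the list of job fields that should be selected.
	 *
	 * @return array
	 */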
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
}
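
Usage note: this class is normally instantiated through JobQueue::factory() by
JobQueueGroup rather than constructed directly. A minimal sketch of routing a job
type to this backend via $wgJobTypeConf (mirroring the shape of the 'default'
entry in DefaultSettings.php; the job type and TTL value here are illustrative):

	// LocalSettings.php (sketch): store 'refreshLinks' jobs in the DB-backed
	// queue, popping in random order and letting recycleAndDeleteStaleJobs()
	// release claims that are older than one hour.
	$wgJobTypeConf['refreshLinks'] = [
		'class' => 'JobQueueDB',
		'order' => 'random',
		'claimTTL' => 3600,
	];

	// Jobs are then enqueued through the group; $job is any IJobSpecification.
	JobQueueGroup::singleton()->push( $job );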