MediaWiki  1.30.0
JobQueueDB.php
Go to the documentation of this file.
1 <?php
27 use Wikimedia\ScopedCallback;
28 
35 class JobQueueDB extends JobQueue {
36  const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
37  const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
38  const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
39  const MAX_OFFSET = 255; // integer; maximum number of rows to skip
40 
42  protected $cache;
43 
45  protected $cluster = false;
46 
/**
 * Set up the queue and the handles it needs.
 *
 * Additional parameters include:
 *   - cluster : name of an external DB cluster to store the jobs on; when absent
 *               the queue uses the main load balancer of the wiki (see getDB()).
 *
 * @param array $params Queue configuration, forwarded to the parent constructor
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	$this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
	// The size/count getters and claimRandom() call get()/set() on $this->cache,
	// so the WAN cache handle has to be assigned before any of them can run.
	$this->cache = ObjectCache::getMainWANInstance();
}
61 
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return string[] Order names accepted by this queue backend
 */
protected function supportedOrders() {
	$orders = [ 'random', 'timestamp', 'fifo' ];

	return $orders;
}
65 
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string One of the values listed by supportedOrders()
 */
protected function optimalOrder() {
	$default = 'random';

	return $default;
}
69 
/**
 * Check whether the queue has no unclaimed jobs of this type.
 *
 * Reads from a replica, so the answer may lag slightly behind the master.
 *
 * @return bool True when no row with an empty job_token was found
 * @throws JobQueueError When the query fails
 */
protected function doIsEmpty() {
	$dbr = $this->getReplicaDB();
	// An empty job_token marks a job that no runner has claimed yet
	$unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
	try {
		$found = $dbr->selectField( 'job', '1', $unclaimed, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return !$found;
}
86 
/**
 * Count the unclaimed jobs of this type.
 *
 * The count is cached for CACHE_TTL_SHORT seconds; only an integer cache value
 * is treated as a hit.
 *
 * @return int
 * @throws JobQueueError When the query fails
 */
protected function doGetSize() {
	$key = $this->getCacheKey( 'size' );

	$cached = $this->cache->get( $key );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	try {
		$dbr = $this->getReplicaDB();
		$size = (int)$dbr->selectField(
			'job',
			'COUNT(*)',
			[ 'job_cmd' => $this->type, 'job_token' => '' ],
			__METHOD__
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );

	return $size;
}
112 
/**
 * Count the jobs of this type that are currently claimed by a runner.
 *
 * Always 0 when claimTTL is not positive (no acknowledgements in use).
 * Otherwise the count is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the query fails
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$key = $this->getCacheKey( 'acquiredcount' );

	$cached = $this->cache->get( $key );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getReplicaDB();
	try {
		// A non-empty job_token means the row has been claimed
		$claimed = [
			'job_cmd' => $this->type,
			"job_token != {$dbr->addQuotes( '' )}"
		];
		$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $claimed, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

	return $count;
}
142 
/**
 * Count the claimed jobs of this type that have used up all their attempts.
 *
 * Always 0 when claimTTL is not positive (no acknowledgements in use).
 * Otherwise the count is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the query fails
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$key = $this->getCacheKey( 'abandonedcount' );

	$cached = $this->cache->get( $key );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getReplicaDB();
	try {
		// Claimed rows whose attempt counter has reached the retry limit
		$abandoned = [
			'job_cmd' => $this->type,
			"job_token != {$dbr->addQuotes( '' )}",
			"job_attempts >= {$dbr->addQuotes( $this->maxTries )}"
		];
		$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $abandoned, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

	return $count;
}
178 
/**
 * Schedule a batch of jobs for insertion on the master connection.
 *
 * The actual insert is registered through onTransactionPreCommitOrIdle(), which
 * covers the two connection setups produced by getDB():
 *   a) sqlite: the connection is probably a regular round-aware handle. If it is
 *      busy with a transaction, the job writes are deferred until right before the
 *      main round commit step.
 *   b) mysql/postgres: the connection is generally a separate CONN_TRX_AUTO handle.
 *      No transaction is active nor will one be started by writes, so the jobs are
 *      enqueued now and errors show up immediately, as the interface expects.
 * In both cases, errors that bubble up will roll back the main commit round.
 *
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield; QOS_ATOMIC is honoured by doBatchPushInternal()
 */
protected function doBatchPush( array $jobs, $flags ) {
	$dbw = $this->getMasterDB();
	$fname = __METHOD__;

	$pushRows = function () use ( $dbw, $jobs, $flags, $fname ) {
		$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
	};

	$dbw->onTransactionPreCommitOrIdle( $pushRows, $fname );
}
205 
/**
 * Insert a batch of jobs, skipping de-duplicated jobs that are already queued.
 *
 * This function should not be called outside of JobQueueDB.
 *
 * @param IDatabase $dbw Master connection to write to
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield; with QOS_ATOMIC all inserts share one atomic section
 * @param string $method Caller name used for query attribution
 * @throws JobQueueError When any query fails
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
	if ( !count( $jobs ) ) {
		return;
	}

	$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
	$rowList = []; // list of jobs for jobs that are not de-duplicated
	foreach ( $jobs as $job ) {
		$row = $this->insertFields( $job );
		if ( $job->ignoreDuplicates() ) {
			$rowSet[$row['job_sha1']] = $row;
		} else {
			$rowList[] = $row;
		}
	}
	// Number of candidate rows before already-enqueued duplicates are stripped.
	// This has to be captured here: once $rowSet is pruned below, it can no longer
	// be used to work out how many duplicates were dropped.
	$candidateCount = count( $rowSet ) + count( $rowList );

	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
	}
	try {
		// Strip out any duplicate jobs that are already in the queue...
		if ( count( $rowSet ) ) {
			$res = $dbw->select( 'job', 'job_sha1',
				[
					// No job_type condition since it's part of the job_sha1 hash
					'job_sha1' => array_keys( $rowSet ),
					'job_token' => '' // unclaimed
				],
				$method
			);
			foreach ( $res as $row ) {
				wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
				unset( $rowSet[$row->job_sha1] ); // already enqueued
			}
		}
		// Build the full list of job rows to insert
		$rows = array_merge( $rowList, array_values( $rowSet ) );
		// Insert the job rows in chunks to avoid replica DB lag...
		foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
			$dbw->insert( 'job', $rowBatch, $method );
		}
		JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
		JobQueue::incrStats( 'dupe_inserts', $this->type, $candidateCount - count( $rows ) );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->endAtomic( $method );
	}
}
270 
/**
 * Claim one job from the queue and build its Job object.
 *
 * DBO_TRX is cleared for the duration of the call so that each query is its own
 * transaction; the previous setting is restored when $scopedReset goes out of
 * scope. Stale-job maintenance runs whenever nothing was popped, and otherwise
 * on roughly one call in ten.
 *
 * @return Job|bool The popped job, or false when no row could be claimed
 * @throws JobQueueError When any query fails
 */
protected function doPop() {
	$dbw = $this->getMasterDB();
	try {
		$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		$restoreTrx = function () use ( $dbw, $autoTrx ) {
			$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
		};
		$scopedReset = new ScopedCallback( $restoreTrx );

		$uuid = wfRandomString( 32 ); // pop attempt
		$job = false; // job popped off

		// Try to reserve a row in the DB...
		if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
			$row = $this->claimOldest( $uuid );
		} else { // random first
			$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
			$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
			$row = $this->claimRandom( $uuid, $rand, $gte );
		}

		// Only build a job object when a row was actually reserved
		if ( $row ) {
			JobQueue::incrStats( 'pops', $this->type );
			$title = Title::makeTitle( $row->job_namespace, $row->job_title );
			$params = self::extractBlob( $row->job_params );
			$job = Job::factory( $row->job_cmd, $title, $params, $row->job_id );
			$job->metadata['id'] = $row->job_id;
			$job->metadata['timestamp'] = $row->job_timestamp;
		}

		if ( !$job || mt_rand( 0, 9 ) == 0 ) {
			// Handle jobs that need to be recycled/deleted;
			// any recycled jobs will be picked up next attempt
			$this->recycleAndDeleteStaleJobs();
		}
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $job;
}
320 
/**
 * Reserve a random unclaimed row by selecting a candidate and then updating it
 * by primary key.
 *
 * This is a replication-safe way of acquiring jobs. UPDATE+LIMIT could be used
 * instead, but that either needs ORDER BY (which deadlocks in MySQL) or is not
 * replication safe, and https://bugs.mysql.com/bug.php?id=6980 rules out
 * subqueries with MySQL.
 *
 * @param string $uuid 32 char hex string to store as the claim token
 * @param int $rand Random unsigned integer (31 bits) compared against job_random
 * @param bool $gte Search for job_random >= $rand (otherwise job_random <= $rand)
 * @return stdClass|bool The claimed row, or false when nothing could be claimed
 */
protected function claimRandom( $uuid, $rand, $gte ) {
	$dbw = $this->getMasterDB();
	$smallKey = $this->getCacheKey( 'small' );
	// Check cache to see if the queue has <= MAX_OFFSET items
	$isSmallQueue = $this->cache->get( $smallKey );

	$row = false; // the row acquired
	$flippedOnce = false; // whether one job_random direction was already scanned
	do {
		if ( $isSmallQueue ) {
			// For small queues, using OFFSET will overshoot and return no rows more
			// often. Instead, pick a row via job_random (possibly both directions).
			$conds = [
				'job_cmd' => $this->type,
				'job_token' => '', // unclaimed
				'job_random ' . ( $gte ? '>=' : '<=' ) . ' ' . $dbw->addQuotes( $rand )
			];
			$options = [ 'ORDER BY' => 'job_random ' . ( $gte ? 'ASC' : 'DESC' ) ];
			$row = $dbw->selectRow( 'job', self::selectFields(), $conds, __METHOD__, $options );
			if ( !$row && !$flippedOnce ) {
				$gte = !$gte;
				$flippedOnce = true;
				continue; // try the other direction
			}
		} else {
			// The table *may* have >= MAX_OFFSET rows.
			// T44614: "ORDER BY job_random" with a job_random inequality causes high
			// CPU in MySQL if there are many rows for some reason. A small OFFSET is
			// used instead of job_random to reduce excess claim retries.
			$conds = [
				'job_cmd' => $this->type,
				'job_token' => '', // unclaimed
			];
			$options = [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ];
			$row = $dbw->selectRow( 'job', self::selectFields(), $conds, __METHOD__, $options );
			if ( !$row ) {
				$isSmallQueue = true; // the queue must have <= MAX_OFFSET rows
				$this->cache->set( $smallKey, 1, 30 );
				continue; // use job_random
			}
		}

		if ( !$row ) {
			break; // nothing to do
		}

		// Claim the job, updating by PK and only if it is still unclaimed
		$dbw->update( 'job',
			[
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			],
			[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
			__METHOD__
		);
		// Another runner may have claimed the selected row first. The use of
		// job_random should minimize this problem, however.
		if ( !$dbw->affectedRows() ) {
			$row = false; // raced out
		}
	} while ( !$row );

	return $row;
}
399 
/**
 * Reserve the unclaimed row with the lowest job_id using a single UPDATE, so that
 * no row locks are held across round trips.
 *
 * @param string $uuid 32 char hex string to store as the claim token
 * @return stdClass|bool The claimed row, or false when nothing could be claimed
 */
protected function claimOldest( $uuid ) {
	$dbw = $this->getMasterDB();

	$row = false; // the row acquired
	do {
		if ( $dbw->getType() === 'mysql' ) {
			// Per https://bugs.mysql.com/bug.php?id=6980, MySQL cannot use subqueries
			// on the same table being changed in an UPDATE query (gives Error: 1093).
			// Oracle and Postgres have no such limitation. MySQL offers an alternative
			// here by supporting ORDER BY + LIMIT for UPDATE queries.
			$table = $dbw->tableName( 'job' );
			$token = $dbw->addQuotes( $uuid );
			$claimedAt = $dbw->addQuotes( $dbw->timestamp() );
			$cmd = $dbw->addQuotes( $this->type );
			$noToken = $dbw->addQuotes( '' );
			$sql = "UPDATE {$table} SET job_token = {$token}, " .
				"job_token_timestamp = {$claimedAt}, " .
				"job_attempts = job_attempts+1 " .
				"WHERE ( job_cmd = {$cmd} AND job_token = {$noToken} " .
				") ORDER BY job_id ASC LIMIT 1";
			$dbw->query( $sql, __METHOD__ );
		} else {
			// Use a subquery to find the job, within an UPDATE to claim it.
			// This uses as much of the DB wrapper functions as possible.
			$claim = [
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			];
			$oldestId = $dbw->selectSQLText( 'job', 'job_id',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__,
				[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ]
			);
			$dbw->update( 'job', $claim, [ 'job_id = (' . $oldestId . ')' ], __METHOD__ );
		}

		if ( !$dbw->affectedRows() ) {
			break; // nothing to do
		}

		// Fetch the row that was just reserved...
		$row = $dbw->selectRow( 'job', self::selectFields(),
			[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
		);
		if ( !$row ) { // raced out by duplicate job removal
			wfDebug( "Row deleted as duplicate by another process.\n" );
		}
	} while ( !$row );

	return $row;
}
460 
/**
 * Acknowledge a finished job by deleting its row.
 *
 * DBO_TRX is cleared for the duration of the call so the DELETE is its own
 * transaction; the previous setting is restored when $scopedReset goes out of
 * scope.
 *
 * @param Job $job Job previously popped from this queue (needs metadata['id'])
 * @throws MWException When the job carries no row ID
 * @throws JobQueueError When the query fails
 */
protected function doAck( Job $job ) {
	if ( !isset( $job->metadata['id'] ) ) {
		throw new MWException( "Job of type '{$job->getType()}' has no ID." );
	}

	$dbw = $this->getMasterDB();
	try {
		$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		$restoreTrx = function () use ( $dbw, $autoTrx ) {
			$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
		};
		$scopedReset = new ScopedCallback( $restoreTrx );

		// Delete a row with a single DELETE without holding row locks over RTTs...
		$target = [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ];
		$dbw->delete( 'job', $target, __METHOD__ );

		JobQueue::incrStats( 'acks', $this->type );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
488 
/**
 * Register the timestamp of a root job so that older jobs with the same
 * signature can be recognised as redundant.
 *
 * @param IJobSpecification $job Job whose params carry 'rootJobSignature' and
 *   'rootJobTimestamp'
 * @return bool Always true; the cache write itself happens later, on transaction idle
 * @throws MWException When either root job parameter is missing
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
	} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
	}
	$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
	// Callers should call batchInsert() and then this function so that if the insert
	// fails, the de-duplication registration will be aborted. Since the insert is
	// deferred till "transaction idle", do the same here, so that the ordering is
	// maintained. Having only the de-duplication registration succeed would cause
	// jobs to become no-ops without any actual jobs that made them redundant.
	$dbw = $this->getMasterDB();
	// The closure cannot reach $this->dupCache through a captured local unless it
	// is bound here first.
	$cache = $this->dupCache;
	$dbw->onTransactionIdle(
		function () use ( $cache, $params, $key, $dbw ) {
			$timestamp = $cache->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
		},
		__METHOD__
	);

	return true;
}
525 
/**
 * Delete every job of this queue's type, claimed or not.
 *
 * @return bool Always true when the query did not fail
 * @throws JobQueueError When the query fails
 */
protected function doDelete() {
	$dbw = $this->getMasterDB();
	try {
		// Pass the caller name like every other write in this class, so the query
		// is attributed to this method rather than to the DB wrapper's default.
		$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return true;
}
540 
/**
 * Ask the load balancer factory to wait for replication on the wiki/cluster
 * this queue is stored on.
 */
protected function doWaitForBackups() {
	$services = MediaWikiServices::getInstance();
	$target = [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ];

	$services->getDBLoadBalancerFactory()->waitForReplication( $target );
}
549 
/**
 * Drop the cached queue counters so the next getter call re-queries the DB.
 *
 * Covers every counter this class caches with CACHE_TTL_SHORT: the size, the
 * acquired count and the abandoned count.
 */
protected function doFlushCaches() {
	foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
		$this->cache->delete( $this->getCacheKey( $type ) );
	}
}
558 
/**
 * Iterate over all unclaimed jobs of this queue's type.
 *
 * @return Iterator Yields Job objects built by getJobIterator()
 */
public function getAllQueuedJobs() {
	$unclaimed = [ 'job_cmd' => $this->getType(), 'job_token' => '' ];

	return $this->getJobIterator( $unclaimed );
}
566 
/**
 * Iterate over all claimed jobs of this queue's type.
 *
 * @return Iterator Yields Job objects built by getJobIterator()
 */
public function getAllAcquiredJobs() {
	$claimed = [ 'job_cmd' => $this->getType(), "job_token > ''" ];

	return $this->getJobIterator( $claimed );
}
574 
/**
 * Select job rows from a replica and map each one to a Job object.
 *
 * @param array $conds Query conditions for the job table
 * @return Iterator Yields Job objects with metadata 'id' and 'timestamp' set
 * @throws JobQueueError When the query fails
 */
protected function getJobIterator( array $conds ) {
	$dbr = $this->getReplicaDB();
	try {
		return new MappedIterator(
			// Pass the caller name like the other queries in this class, so the
			// SELECT is attributed to this method rather than the wrapper default.
			$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
			function ( $row ) {
				$job = Job::factory(
					$row->job_cmd,
					Title::makeTitle( $row->job_namespace, $row->job_title ),
					// An empty blob means the job was stored without parameters
					strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
				);
				$job->metadata['id'] = $row->job_id;
				$job->metadata['timestamp'] = $row->job_timestamp;

				return $job;
			}
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
600 
/**
 * Describe where this queue's rows live, as a string built from the cluster
 * (when one is set) and the wiki ID.
 *
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string
 */
public function getCoalesceLocationInternal() {
	if ( $this->cluster ) {
		return "DBCluster:{$this->cluster}:{$this->wiki}";
	}

	return "LBFactory:{$this->wiki}";
}
606 
/**
 * Find which of the given job types have any rows in the job table.
 *
 * @note This does not check whether the jobs are claimed or not. That is useful
 * so JobQueueGroup::pop() also sees queues that only have stale jobs, which lets
 * recycleAndDeleteStaleJobs() re-enqueue failed jobs so that they can be popped
 * again for that edge case.
 *
 * @param string[] $types Job types to look for
 * @return string[] The subset of $types that has at least one row
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$dbr = $this->getReplicaDB();
	$res = $dbr->select(
		'job',
		'DISTINCT job_cmd',
		[ 'job_cmd' => $types ],
		__METHOD__
	);

	$present = [];
	foreach ( $res as $row ) {
		$present[] = $row->job_cmd;
	}

	return $present;
}
623 
/**
 * Count the rows of each given job type in the job table.
 *
 * The counts include claimed and unclaimed rows alike, since the query only
 * filters on job_cmd.
 *
 * @param string[] $types Job types to count
 * @return int[] Map of (job type => row count) for types that have rows
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$dbr = $this->getReplicaDB();
	$res = $dbr->select(
		'job',
		[ 'job_cmd', 'COUNT(*) AS count' ],
		[ 'job_cmd' => $types ],
		__METHOD__,
		[ 'GROUP BY' => 'job_cmd' ]
	);

	$sizeByType = [];
	foreach ( $res as $row ) {
		$sizeByType[$row->job_cmd] = (int)$row->count;
	}

	return $sizeByType;
}
636 
/**
 * Recycle or destroy any jobs that have been claimed for too long.
 *
 * Runs under a per-type named DB lock so that only one process does this at a
 * time. The lock is released even when a query or notification throws.
 *
 * @return int Number of job rows recycled or deleted
 * @throws JobQueueError When any query fails
 */
public function recycleAndDeleteStaleJobs() {
	$now = time();
	$count = 0; // affected rows
	$dbw = $this->getMasterDB();
	$lockKey = "jobqueue-recycle-{$this->type}";

	try {
		if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
			return $count; // already in progress
		}

		try {
			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too
				// long. These jobs can be recycled into the queue by expiring the claim.
				// Selecting the IDs first means that the UPDATE can be done by primary
				// key (fewer deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" // retries left
					],
					__METHOD__
				);
				$ids = array_map(
					function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them
					// up. Set the timestamp to the current time, as it is useful to know
					// that the job was already tried before (the timestamp becomes the
					// "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
						],
						[ 'job_id' => $ids ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					JobQueue::incrStats( 'recycles', $this->type, $affected );
					$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed.
			// Selecting the IDs first means that the DELETE can be done by primary key
			// (fewer deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map(
				function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				JobQueue::incrStats( 'abandons', $this->type, $affected );
			}
		} finally {
			// Always hand the named lock back; without this, an exception in the
			// block above would leave the lock held by this connection.
			$dbw->unlock( $lockKey, __METHOD__ );
		}
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $count;
}
722 
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @return array Map of (column name => value) ready for IDatabase::insert()
 */
protected function insertFields( IJobSpecification $job ) {
	$dbw = $this->getMasterDB();
	$title = $job->getTitle();
	// Hash of the de-duplication info, converted from base 16 to base 36 and
	// padded to 31 characters
	$dedupHash = Wikimedia\base_convert(
		sha1( serialize( $job->getDeduplicationInfo() ) ),
		16, 36, 31
	);

	return [
		// Fields that describe the nature of the job
		'job_cmd' => $job->getType(),
		'job_namespace' => $title->getNamespace(),
		'job_title' => $title->getDBkey(),
		'job_params' => self::makeBlob( $job->getParams() ),
		// Additional job metadata
		'job_timestamp' => $dbw->timestamp(),
		'job_sha1' => $dedupHash,
		'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
	];
}
745 
/**
 * Get a replica DB connection for reading the job table.
 *
 * @return mixed Connection handle returned by getDB()
 * @throws JobQueueConnectionError When the connection cannot be established
 */
protected function getReplicaDB() {
	try {
		$conn = $this->getDB( DB_REPLICA );
	} catch ( DBConnectionError $e ) {
		$message = "DBConnectionError:" . $e->getMessage();
		throw new JobQueueConnectionError( $message );
	}

	return $conn;
}
757 
/**
 * Get a master DB connection for writing to the job table.
 *
 * @return mixed Connection handle returned by getDB()
 * @throws JobQueueConnectionError When the connection cannot be established
 */
protected function getMasterDB() {
	try {
		$conn = $this->getDB( DB_MASTER );
	} catch ( DBConnectionError $e ) {
		$message = "DBConnectionError:" . $e->getMessage();
		throw new JobQueueConnectionError( $message );
	}

	return $conn;
}
769 
/**
 * Get a connection from the load balancer this queue is stored on.
 *
 * The external load balancer of $this->cluster is used when a cluster is set;
 * otherwise the main load balancer of the wiki is used.
 *
 * @param int $index DB_MASTER or DB_REPLICA
 * @return mixed Connection handle from the load balancer's getConnectionRef()
 */
protected function getDB( $index ) {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $this->cluster !== false ) {
		$lb = $lbFactory->getExternalLB( $this->cluster );
	} else {
		$lb = $lbFactory->getMainLB( $this->wiki );
	}

	if ( $lb->getServerType( $lb->getWriterIndex() ) === 'sqlite' ) {
		// SQLite locks at the DB level, so a separate connection would cause
		// contention instead of avoiding it. Job insertion will be deferred until
		// the PRESEND stage to reduce contention.
		return $lb->getConnectionRef( $index, [], $this->wiki );
	}

	// Keep a separate connection to avoid contention and deadlocks
	return $lb->getConnectionRef( $index, [], $this->wiki, $lb::CONN_TRX_AUTO );
}
787 
/**
 * Build the cache key for one cached property of this queue.
 *
 * The key is made from the wiki's DB name and prefix, the cluster name
 * ('main' when no cluster string is set), the job type and the property.
 *
 * @param string $property Name of the cached property, e.g. 'size'
 * @return string
 */
private function getCacheKey( $property ) {
	list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
	if ( is_string( $this->cluster ) ) {
		$cluster = $this->cluster;
	} else {
		$cluster = 'main';
	}

	return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}
798 
/**
 * Serialize job parameters for storage in the job_params column.
 *
 * @param array|bool $params Job parameters, or false for none
 * @return string Serialized parameters, or an empty string when $params is false
 */
protected static function makeBlob( $params ) {
	if ( $params === false ) {
		return '';
	}

	return serialize( $params );
}
810 
/**
 * Unserialize job parameters read from the job_params column.
 *
 * @param string $blob Stored value
 * @return mixed The unserialized value, or false when the blob is empty
 */
protected static function extractBlob( $blob ) {
	if ( (string)$blob === '' ) {
		return false;
	}

	return unserialize( $blob );
}
822 
/**
 * Convert a database error into a JobQueueError and throw it.
 *
 * @param DBError $e The original error; its class name and message are kept
 * @throws JobQueueError Always
 */
protected function throwDBException( DBError $e ) {
	$message = get_class( $e ) . ": " . $e->getMessage();

	throw new JobQueueError( $message );
}
830 
/**
 * Return the list of job fields that should be selected.
 *
 * @return string[] Column names of the job table
 */
public static function selectFields() {
	$fields = [
		'job_id',
		'job_cmd',
		'job_namespace',
		'job_title',
		'job_timestamp',
		'job_params',
		'job_random',
		'job_attempts',
		'job_token',
		'job_token_timestamp',
		'job_sha1',
	];

	return $fields;
}
851 }
JobQueueDB\MAX_AGE_PRUNE
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:37
JobQueueDB\doBatchPushInternal
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:216
MappedIterator
Convenience class for generating iterators from iterators.
Definition: MappedIterator.php:28
JobQueueDB\doWaitForBackups
doWaitForBackups()
Definition: JobQueueDB.php:545
JobQueueDB\doFlushCaches
doFlushCaches()
Definition: JobQueueDB.php:553
JobQueueDB\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:607
JobQueueDB\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueueDB.php:601
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
JobQueueDB\getAllAcquiredJobs
getAllAcquiredJobs()
Definition: JobQueueDB.php:571
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:187
JobQueue\incrStats
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:709
JobQueueDB\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:624
JobQueueDB\throwDBException
throwDBException(DBError $e)
Definition: JobQueueDB.php:827
captcha-old.count
count
Definition: captcha-old.py:249
JobQueueDB\doGetSize
doGetSize()
Definition: JobQueueDB.php:91
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
Wikimedia\Rdbms\IDatabase\endAtomic
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
$fname
if(!defined( 'MEDIAWIKI')) $fname
This file is not a valid entry point, perform no further processing unless MEDIAWIKI is defined.
Definition: Setup.php:36
unserialize
unserialize( $serialized)
Definition: ApiMessage.php:185
JobQueueDB\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
Definition: JobQueueDB.php:66
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:52
JobQueueDB\getDB
getDB( $index)
Definition: JobQueueDB.php:774
$params
$params
Definition: styleTest.css.php:40
JobQueueDB\__construct
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:55
WANObjectCache\set
set( $key, $value, $ttl=0, array $opts=[])
Set the value of a key in cache.
Definition: WANObjectCache.php:436
serialize
serialize()
Definition: ApiMessage.php:177
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:2823
JobQueueDB\getMasterDB
getMasterDB()
Definition: JobQueueDB.php:762
JobQueueDB\insertFields
insertFields(IJobSpecification $job)
Definition: JobQueueDB.php:727
$res
$res
Definition: database.txt:21
JobQueueDB
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:35
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueDB\CACHE_TTL_SHORT
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:36
Wikimedia\Rdbms\DBError
Database error base class.
Definition: DBError.php:30
Wikimedia\Rdbms\IDatabase\insert
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
DBO_TRX
const DBO_TRX
Definition: defines.php:12
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:40
JobQueueDB\getReplicaDB
getReplicaDB()
Definition: JobQueueDB.php:750
JobQueueDB\getJobIterator
getJobIterator(array $conds)
Definition: JobQueueDB.php:579
JobQueueDB\makeBlob
static makeBlob( $params)
Definition: JobQueueDB.php:803
JobQueueDB\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
Definition: JobQueueDB.php:62
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
IJobSpecification\getType
getType()
Job\factory
static factory( $command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:68
MWException
MediaWiki exception.
Definition: MWException.php:26
$title
namespace and then decline to actually register it file or subcat img or subcat $title
Definition: hooks.txt:932
$property
$property
Definition: styleTest.css.php:44
JobQueueDB\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:495
$blob
$blob
Definition: testCompression.php:63
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:35
JobQueueDB\recycleAndDeleteStaleJobs
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:642
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:46
JobQueueDB\MAX_OFFSET
const MAX_OFFSET
Definition: JobQueueDB.php:39
JobQueueDB\doBatchPush
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:186
Title\makeTitle
static makeTitle( $ns, $title, $fragment='', $interwiki='')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:529
DB_REPLICA
const DB_REPLICA
Definition: defines.php:25
JobQueueDB\doGetAcquiredCount
doGetAcquiredCount()
Definition: JobQueueDB.php:117
JobQueueError
Definition: JobQueue.php:723
DB_MASTER
const DB_MASTER
Definition: defines.php:26
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
Definition: GlobalFunctions.php:2773
wfDebug
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
Definition: GlobalFunctions.php:1047
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
$dir
$dir
Definition: Autoload.php:8
JobQueueDB\extractBlob
static extractBlob( $blob)
Definition: JobQueueDB.php:815
JobQueueDB\doIsEmpty
doIsEmpty()
Definition: JobQueueDB.php:74
WANObjectCache\get
get( $key, &$curTTL=null, array $checkKeys=[], &$asOf=null)
Fetch the value of a key from cache.
Definition: WANObjectCache.php:248
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2141
JobQueueDB\MAX_JOB_RANDOM
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:38
JobQueueDB\doDelete
doDelete()
Definition: JobQueueDB.php:530
JobQueueDB\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueueDB.php:148
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:80
JobQueueDB\getAllQueuedJobs
getAllQueuedJobs()
Definition: JobQueueDB.php:563
JobQueueDB\selectFields
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:836
JobQueueConnectionError
Definition: JobQueue.php:726
JobQueueDB\claimOldest
claimOldest( $uuid)
Reserve a row with a single UPDATE, without holding row locks across network round trips (RTTs).
Definition: JobQueueDB.php:406
JobQueueDB\doAck
doAck(Job $job)
Definition: JobQueueDB.php:466
Wikimedia\Rdbms\DBConnRef
Helper class to handle automatically marking connections as reusable (via RAII pattern), as well as handling deferral of the actual network connection until the handle is used.
Definition: DBConnRef.php:15
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:528
$dbr
if(! $regexes) $dbr
Definition: cleanup.php:94
$rows
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction $rows
Definition: hooks.txt:2581
ObjectCache\getMainWANInstance
static getMainWANInstance()
Get the main WAN cache object.
Definition: ObjectCache.php:370
JobQueueDB\getCacheKey
getCacheKey( $property)
Definition: JobQueueDB.php:792
JobQueueDB\$cache
WANObjectCache $cache
Definition: JobQueueDB.php:42
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users. It's targeted particularly at maintainers for Linux distributions, since it's been observed that distribution packages of MediaWiki often break. We've consistently had to recommend that users seeking support use official tarballs instead of their distribution's packages, and this often solves whatever problem the user is having. It would be nice if this could change. [...] such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
Wikimedia\Rdbms\DBConnectionError
Definition: DBConnectionError.php:26
Wikimedia\Rdbms\IDatabase\select
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
JobQueueDB\doPop
doPop()
Definition: JobQueueDB.php:275
order
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2801
JobQueueDB\$cluster
bool string $cluster
Name of an external DB cluster.
Definition: JobQueueDB.php:45
JobQueue\getType
getType()
Definition: JobQueue.php:131
array
The array() calling protocol came about after MediaWiki 1.4rc1.
Wikimedia\Rdbms\IDatabase\startAtomic
startAtomic( $fname=__METHOD__)
Begin an atomic section of statements.
wfRandomString
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Definition: GlobalFunctions.php:370
JobQueueDB\claimRandom
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE, without holding row locks across network round trips (RTTs).
Definition: JobQueueDB.php:329