MediaWiki  1.33.0
JobQueueDB.php
Go to the documentation of this file.
1 <?php
27 use Wikimedia\ScopedCallback;
28 
35 class JobQueueDB extends JobQueue {
36  const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
37  const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
38  const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
39  const MAX_OFFSET = 255; // integer; maximum number of rows to skip
40 
/** @var WANObjectCache Cache holding the size/count/"small queue" entries; set in the constructor */
42  protected $cache;
/** @var IDatabase|DBError|null Connection, or the connection error, remembered by getDB() when 'server' is set */
44  protected $conn;
45 
/** @var array|null Server configuration array */
47  protected $server;
/** @var string|null Name of an external DB cluster or null for the local DB cluster */
49  protected $cluster;
50 
/**
 * Set up the queue from its configuration array.
 *
 * Keys read here, besides whatever the parent constructor consumes:
 *   - server   : when set, kept as the dedicated DB server configuration
 *   - cluster  : only honoured when 'server' is not set and the value is a string
 *   - wanCache : cache instance; WANObjectCache::newEmpty() is used when absent or null
 *
 * @param array $params
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	$hasServer = isset( $params['server'] );
	$hasNamedCluster = isset( $params['cluster'] ) && is_string( $params['cluster'] );

	// 'server' takes precedence; 'cluster' is ignored when both are given
	if ( $hasServer ) {
		$this->server = $params['server'];
	} elseif ( $hasNamedCluster ) {
		$this->cluster = $params['cluster'];
	}

	$this->cache = isset( $params['wanCache'] )
		? $params['wanCache']
		: WANObjectCache::newEmpty();
}
72 
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'random', 'timestamp', 'fifo' ];

	return $orders;
}
76 
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string
 */
protected function optimalOrder() {
	$default = 'random';

	return $default;
}
80 
/**
 * Check whether this queue has no unclaimed jobs, using a replica connection.
 *
 * @return bool True when no row with this job type and an empty job_token was found
 */
protected function doIsEmpty() {
	$dbr = $this->getReplicaDB();
	// Keep the returned handle in a local for the duration of this method
	$scope = $this->getScopedNoTrxFlag( $dbr );

	$unclaimedConds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
	try {
		$found = $dbr->selectField( 'job', '1', $unclaimedConds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return !$found;
}
99 
/**
 * Count the unclaimed jobs of this type, caching the result for CACHE_TTL_SHORT seconds.
 *
 * @return int
 */
protected function doGetSize() {
	$cacheKey = $this->getCacheKey( 'size' );

	$size = $this->cache->get( $cacheKey );
	if ( !is_int( $size ) ) {
		// Nothing usable in the cache: count on a replica and remember the result
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		$unclaimedConds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
		try {
			$size = (int)$dbr->selectField( 'job', 'COUNT(*)', $unclaimedConds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $cacheKey, $size, self::CACHE_TTL_SHORT );
	}

	return $size;
}
127 
/**
 * Count the jobs of this type that currently carry a claim token.
 *
 * Always 0 when claimTTL is not positive (no acknowledgements). Otherwise the
 * count is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$cacheKey = $this->getCacheKey( 'acquiredcount' );

	$count = $this->cache->get( $cacheKey );
	if ( !is_int( $count ) ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		$claimedConds = [
			'job_cmd' => $this->type,
			'job_token != ' . $dbr->addQuotes( '' )
		];
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $claimedConds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );
	}

	return $count;
}
159 
/**
 * Count the claimed jobs of this type whose attempt counter has reached maxTries.
 *
 * Always 0 when claimTTL is not positive (no acknowledgements). Otherwise the
 * count is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$cacheKey = $this->getCacheKey( 'abandonedcount' );

	$count = $this->cache->get( $cacheKey );
	if ( !is_int( $count ) ) {
		$dbr = $this->getReplicaDB();
		$scope = $this->getScopedNoTrxFlag( $dbr );
		$abandonedConds = [
			'job_cmd' => $this->type,
			'job_token != ' . $dbr->addQuotes( '' ),
			"job_attempts >= {$dbr->addQuotes( $this->maxTries )}"
		];
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $abandonedConds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		$this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );
	}

	return $count;
}
197 
/**
 * Queue the insertion of a batch of jobs on the master connection.
 *
 * The actual insert is handed to onTransactionPreCommitOrIdle(). Two situations
 * are expected:
 *   a) sqlite: the connection is probably a regular round-aware handle. If it is
 *      busy with a transaction, the job writes are deferred until right before
 *      the main round commit step, and errors roll back the main commit round.
 *   b) mysql/postgres: the connection is generally a separate CONN_TRX_AUTOCOMMIT
 *      handle with no active transaction, so the jobs are enqueued right away
 *      and errors show up immediately, as the interface expects.
 *
 * @param array $jobs List of job specifications
 * @param int $flags Bitfield; forwarded unchanged to doBatchPushInternal()
 */
protected function doBatchPush( array $jobs, $flags ) {
	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$caller = __METHOD__;
	$insertJobs = function ( IDatabase $conn ) use ( $jobs, $flags, $caller ) {
		$this->doBatchPushInternal( $conn, $jobs, $flags, $caller );
	};

	$dbw->onTransactionPreCommitOrIdle( $insertJobs, $caller );
}
226 
/**
 * Insert a batch of jobs, skipping de-duplicated jobs that are already queued.
 *
 * This function should not be called outside of JobQueueDB.
 *
 * Jobs that ask for de-duplication are keyed by their job_sha1, so repeats within
 * the batch collapse to one row, and any whose hash already exists as an unclaimed
 * row are dropped. Everything else is inserted in chunks of 50 rows.
 *
 * @param IDatabase $dbw Connection to write to
 * @param array $jobs List of job specifications
 * @param int $flags Bitfield; QOS_ATOMIC wraps the work in an atomic section
 * @param string $method Caller name used for the atomic section and the queries
 * @throws JobQueueError Via throwDBException() on any DBError
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
	if ( $jobs === [] ) {
		return;
	}

	$rowSet = []; // (sha1 => row) map for jobs that are de-duplicated
	$rowList = []; // list of rows for jobs that are not de-duplicated
	foreach ( $jobs as $job ) {
		$row = $this->insertFields( $job, $dbw );
		if ( $job->ignoreDuplicates() ) {
			$rowSet[$row['job_sha1']] = $row;
		} else {
			$rowList[] = $row;
		}
	}

	$atomic = (bool)( $flags & self::QOS_ATOMIC );
	if ( $atomic ) {
		$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
	}
	try {
		// Strip out any duplicate jobs that are already in the queue...
		if ( count( $rowSet ) ) {
			$res = $dbw->select( 'job', 'job_sha1',
				[
					// No job_type condition since it's part of the job_sha1 hash
					'job_sha1' => array_keys( $rowSet ),
					'job_token' => '' // unclaimed
				],
				$method
			);
			foreach ( $res as $row ) {
				wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
				unset( $rowSet[$row->job_sha1] ); // already enqueued
			}
		}
		// Build the full list of job rows to insert
		$rows = array_merge( $rowList, array_values( $rowSet ) );
		// Insert the job rows in chunks to avoid replica DB lag...
		foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
			$dbw->insert( 'job', $rowBatch, $method );
		}
		$this->incrStats( 'inserts', $this->type, count( $rows ) );
		// Every submitted job that did not become a row was skipped as a duplicate,
		// either of another job in this batch or of an already queued job. The
		// previous formula compared $rows with the arrays it was built from and
		// therefore always reported 0.
		$this->incrStats( 'dupe_inserts', $this->type, count( $jobs ) - count( $rows ) );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	if ( $atomic ) {
		$dbw->endAtomic( $method );
	}
}
290 
/**
 * Claim one job from the queue and build its Job object.
 *
 * The row is claimed through claimOldest() for the 'fifo' and 'timestamp'
 * orders and through claimRandom() otherwise. When nothing could be claimed, and
 * also on roughly one call in ten, recycleAndDeleteStaleJobs() is run.
 *
 * @return Job|bool The popped job, or false when no row could be claimed
 */
protected function doPop() {
	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$job = false; // job popped off
	try {
		$uuid = wfRandomString( 32 ); // pop attempt

		// Try to reserve a row in the DB...
		$useInsertionOrder = in_array( $this->order, [ 'fifo', 'timestamp' ] );
		if ( $useInsertionOrder ) {
			$row = $this->claimOldest( $uuid );
		} else { // random first
			$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
			$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
			$row = $this->claimRandom( $uuid, $rand, $gte );
		}

		if ( $row ) {
			$this->incrStats( 'pops', $this->type );
			// Get the job object from the row...
			$title = Title::makeTitle( $row->job_namespace, $row->job_title );
			$params = self::extractBlob( $row->job_params );
			$job = Job::factory( $row->job_cmd, $title, $params );
			$job->setMetadata( 'id', $row->job_id );
			$job->setMetadata( 'timestamp', $row->job_timestamp );
		}

		if ( !$job || mt_rand( 0, 9 ) === 0 ) {
			// Handle jobs that need to be recycled/deleted;
			// any recycled jobs will be picked up next attempt
			$this->recycleAndDeleteStaleJobs();
		}
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $job;
}
337 
/**
 * Reserve a pseudo-randomly chosen unclaimed row for this job type.
 *
 * This uses a replication safe method for acquiring jobs: a row is SELECTed first
 * and then claimed with an UPDATE by primary key that only matches while the row
 * is still unclaimed. One could use UPDATE+LIMIT instead, but that either uses
 * ORDER BY (in which case it deadlocks in MySQL) or is not replication safe. Due
 * to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot be used here with
 * MySQL.
 *
 * @param string $uuid Token written to job_token to mark the claim
 * @param int $rand Pivot value compared against job_random
 * @param bool $gte Whether to look at rows with job_random >= $rand (else <=) first
 * @return stdClass|bool The claimed row, or false when no row could be claimed
 */
protected function claimRandom( $uuid, $rand, $gte ) {
	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );
	// Check cache to see if the queue has <= OFFSET items
	$smallKey = $this->getCacheKey( 'small' );
	$tinyQueue = $this->cache->get( $smallKey );

	$row = false; // the row acquired
	$invertedDirection = false; // whether one job_random direction was already scanned
	do {
		if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
			// For small queues, using OFFSET will overshoot and return no rows more often.
			// Instead, this uses job_random to pick a row (possibly checking both directions).
			$ineq = $gte ? '>=' : '<=';
			$dir = $gte ? 'ASC' : 'DESC';
			$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
				[
					'job_cmd' => $this->type,
					'job_token' => '', // unclaimed
					"job_random {$ineq} {$dbw->addQuotes( $rand )}"
				],
				__METHOD__,
				[ 'ORDER BY' => "job_random {$dir}" ]
			);
			if ( !$row && !$invertedDirection ) {
				$gte = !$gte;
				$invertedDirection = true;
				continue; // try the other direction
			}
		} else { // table *may* have >= MAX_OFFSET rows
			// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
			// in MySQL if there are many rows for some reason. This uses a small OFFSET
			// instead of job_random for reducing excess claim retries.
			$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
				[
					'job_cmd' => $this->type,
					'job_token' => '', // unclaimed
				],
				__METHOD__,
				[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
			);
			if ( !$row ) {
				$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
				// Same 30 second lifetime as before, now taken from the shared constant
				$this->cache->set( $smallKey, 1, self::CACHE_TTL_SHORT );
				continue; // use job_random
			}
		}

		if ( !$row ) {
			break; // nothing to do
		}

		// Claim the job; the job_token condition makes the UPDATE a no-op if
		// another runner claimed the selected row in the meantime
		$dbw->update( 'job', // update by PK
			[
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			],
			[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
			__METHOD__
		);
		// This might get raced out by another runner when claiming the previously
		// selected row. The use of job_random should minimize this problem, however.
		if ( !$dbw->affectedRows() ) {
			$row = false; // raced out
		}
	} while ( !$row );

	return $row;
}
418 
/**
 * Reserve a row with a single UPDATE without holding row locks over RTTs...
 *
 * The unclaimed row with the lowest job_id for this job type is stamped with
 * $uuid, then read back by that token.
 *
 * @param string $uuid Token written to job_token to mark the claim
 * @return stdClass|bool The claimed row, or false when no row could be claimed
 */
protected function claimOldest( $uuid ) {
	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$row = false; // the row acquired
	do {
		if ( $dbw->getType() === 'mysql' ) {
			// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
			// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
			// Oracle and Postgre have no such limitation. However, MySQL offers an
			// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
			$table = $dbw->tableName( 'job' );
			$qToken = $dbw->addQuotes( $uuid );
			$qNow = $dbw->addQuotes( $dbw->timestamp() );
			$qType = $dbw->addQuotes( $this->type );
			$qEmpty = $dbw->addQuotes( '' );
			$sql = "UPDATE {$table} SET job_token = {$qToken}, " .
				"job_token_timestamp = {$qNow}, job_attempts = job_attempts+1 " .
				"WHERE ( job_cmd = {$qType} AND job_token = {$qEmpty} ) " .
				"ORDER BY job_id ASC LIMIT 1";
			$dbw->query( $sql, __METHOD__ );
		} else {
			// Use a subquery to find the job, within an UPDATE to claim it.
			// This uses as much of the DB wrapper functions as possible.
			$claimFields = [
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			];
			$oldestIdSql = $dbw->selectSQLText( 'job', 'job_id',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__,
				[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ]
			);
			$dbw->update( 'job', $claimFields, [ 'job_id = (' . $oldestIdSql . ')' ], __METHOD__ );
		}

		if ( !$dbw->affectedRows() ) {
			break; // nothing to do
		}

		// Fetch the row that was just reserved...
		$row = $dbw->selectRow( 'job', self::selectFields(),
			[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
		);
		if ( !$row ) { // raced out by duplicate job removal
			wfDebug( "Row deleted as duplicate by another process.\n" );
		}
	} while ( !$row );

	return $row;
}
481 
/**
 * Acknowledge a finished job by deleting its row.
 *
 * @param Job $job Job whose 'id' metadata identifies the row
 * @throws MWException When the job carries no 'id' metadata
 * @throws JobQueueError Via throwDBException() on any DBError
 */
protected function doAck( Job $job ) {
	$jobId = $job->getMetadata( 'id' );
	if ( $jobId === null ) {
		throw new MWException( "Job of type '" . $job->getType() . "' has no ID." );
	}

	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$rowConds = [ 'job_cmd' => $this->type, 'job_id' => $jobId ];
	try {
		// Delete a row with a single DELETE without holding row locks over RTTs...
		$dbw->delete( 'job', $rowConds, __METHOD__ );

		$this->incrStats( 'acks', $this->type );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
509 
/**
 * Record the timestamp of a root job so that older copies can be treated as redundant.
 *
 * Callers should call JobQueueGroup::push() before this method so that if the insert
 * fails, the de-duplication registration will be aborted. Since the insert is
 * deferred till "transaction idle", the cache write is deferred the same way, so
 * that the ordering is maintained. Having only the de-duplication registration
 * succeed would cause jobs to become no-ops without any actual jobs that made them
 * redundant.
 *
 * NOTE(review): the signature line and the $cache assignment were missing from this
 * listing. Both are restored from the cross-reference index (method at line 516,
 * JobQueue::$dupCache); 'protected' matches the sibling do*() methods. Confirm
 * against the parent class.
 *
 * @param IJobSpecification $job
 * @throws MWException When 'rootJobSignature' or 'rootJobTimestamp' is missing
 * @return bool Always true; the cache update itself runs in the deferred callback
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
	} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
	}
	$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	// The closure below captures $cache by value, so it must be defined here
	$cache = $this->dupCache;
	$dbw->onTransactionCommitOrIdle(
		function () use ( $cache, $params, $key ) {
			$timestamp = $cache->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $cache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
		},
		__METHOD__
	);

	return true;
}
549 
/**
 * Delete every job of this queue's type, claimed or not.
 *
 * @return bool Always true
 * @throws JobQueueError Via throwDBException() on any DBError
 */
protected function doDelete() {
	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );
	try {
		// Pass the caller name like every other query in this class does
		$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return true;
}
566 
/**
 * Wait for replication on this queue's domain and cluster.
 *
 * Does nothing when a dedicated 'server' is configured, since the LBFactory
 * instance is not used in that case.
 */
protected function doWaitForBackups() {
	if ( !$this->server ) {
		$cluster = false;
		if ( is_string( $this->cluster ) ) {
			$cluster = $this->cluster;
		}

		$waitOptions = [
			'domain' => $this->domain,
			'cluster' => $cluster
		];
		MediaWikiServices::getInstance()
			->getDBLoadBalancerFactory()
			->waitForReplication( $waitOptions );
	}
}
582 
/**
 * Drop the cached counters kept for this queue.
 *
 * Covers every count that this class caches under CACHE_TTL_SHORT, including
 * 'abandonedcount', which doGetAbandonedCount() writes and which was previously
 * left behind by a flush.
 */
protected function doFlushCaches() {
	foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
		$this->cache->delete( $this->getCacheKey( $type ) );
	}
}
591 
/**
 * Iterate over the unclaimed jobs of this type.
 *
 * @return MappedIterator Result of getJobIterator()
 */
public function getAllQueuedJobs() {
	$unclaimed = [
		'job_cmd' => $this->getType(),
		'job_token' => ''
	];

	return $this->getJobIterator( $unclaimed );
}
599 
/**
 * Iterate over the jobs of this type that carry a claim token.
 *
 * @return MappedIterator Result of getJobIterator()
 */
public function getAllAcquiredJobs() {
	$claimed = [
		'job_cmd' => $this->getType(),
		"job_token > ''"
	];

	return $this->getJobIterator( $claimed );
}
607 
/**
 * Select job rows from a replica and expose them as Job objects.
 *
 * @param array $conds Query conditions
 * @return MappedIterator Yields one Job per selected row, with 'id' and
 *   'timestamp' metadata set from the row
 * @throws JobQueueError Via throwDBException() when the SELECT fails
 */
protected function getJobIterator( array $conds ) {
	$dbr = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $dbr );
	try {
		return new MappedIterator(
			// Pass the caller name like every other query in this class does
			$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
			function ( $row ) {
				$job = Job::factory(
					$row->job_cmd,
					Title::makeTitle( $row->job_namespace, $row->job_title ),
					// An empty job_params column maps to an empty parameter list
					strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
				);
				$job->setMetadata( 'id', $row->job_id );
				$job->setMetadata( 'timestamp', $row->job_timestamp );

				return $job;
			}
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
635 
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string|null Location string built from the cluster and/or domain, or
 *   null when a dedicated 'server' is configured
 */
public function getCoalesceLocationInternal() {
	if ( $this->server ) {
		return null; // not using the LBFactory instance
	}

	if ( is_string( $this->cluster ) ) {
		return "DBCluster:{$this->cluster}:{$this->domain}";
	}

	return "LBFactory:{$this->domain}";
}
645 
/**
 * Find which of the given job types have at least one row in the job table.
 *
 * @note This does not check whether the jobs are claimed or not.
 * This is useful so JobQueueGroup::pop() also sees queues that only
 * have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
 * failed jobs so that they can be popped again for that edge case.
 *
 * @param array $types List of job types to look for
 * @return string[] The job types that were found
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$dbr = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $dbr );

	$res = $dbr->select(
		'job',
		'DISTINCT job_cmd',
		[ 'job_cmd' => $types ],
		__METHOD__
	);

	$nonEmptyTypes = [];
	foreach ( $res as $row ) {
		$nonEmptyTypes[] = $row->job_cmd;
	}

	return $nonEmptyTypes;
}
664 
/**
 * Count the rows in the job table for each of the given job types.
 *
 * The query has no job_token condition, so claimed rows are counted as well.
 *
 * @param array $types List of job types to count
 * @return int[] Map of (job type => row count) for the types that have rows
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$dbr = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $dbr );

	$res = $dbr->select(
		'job',
		[ 'job_cmd', 'COUNT(*) AS count' ],
		[ 'job_cmd' => $types ],
		__METHOD__,
		[ 'GROUP BY' => 'job_cmd' ]
	);

	$sizeByType = [];
	foreach ( $res as $row ) {
		$sizeByType[$row->job_cmd] = (int)$row->count;
	}

	return $sizeByType;
}
680 
/**
 * Recycle or destroy any jobs that have been claimed for too long.
 *
 * Runs under the named lock "jobqueue-recycle-<type>"; when the lock cannot be
 * obtained, nothing is done. The lock is released even when a query fails
 * part-way through. Previously a DBError raised between lock() and unlock()
 * skipped the unlock() call.
 *
 * @return int Number of rows recycled plus number of rows deleted
 * @throws JobQueueError Via throwDBException() on any DBError
 */
public function recycleAndDeleteStaleJobs() {
	$now = time();
	$count = 0; // affected rows
	$dbw = $this->getMasterDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$lockKey = "jobqueue-recycle-{$this->type}";
	$toJobId = function ( $o ) {
		return $o->job_id;
	};

	try {
		if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
			return $count; // already in progress
		}

		try {
			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" // retries left
					],
					__METHOD__
				);
				$ids = array_map( $toJobId, iterator_to_array( $res ) );
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
						],
						[ 'job_id' => $ids ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the DELETE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map( $toJobId, iterator_to_array( $res ) );
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}
		} finally {
			// Always give the lock back, including when one of the queries above threw
			$dbw->unlock( $lockKey, __METHOD__ );
		}
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $count;
}
767 
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @param IDatabase $db Connection used to format the current timestamp
 * @return array Map of (column name => value) for an INSERT
 */
protected function insertFields( IJobSpecification $job, IDatabase $db ) {
	// Base-36 form of the SHA-1 of the serialized de-duplication info
	$dedupHash = Wikimedia\base_convert(
		sha1( serialize( $job->getDeduplicationInfo() ) ),
		16, 36, 31
	);

	$row = [];
	// Fields that describe the nature of the job
	$row['job_cmd'] = $job->getType();
	$row['job_namespace'] = $job->getTitle()->getNamespace();
	$row['job_title'] = $job->getTitle()->getDBkey();
	$row['job_params'] = self::makeBlob( $job->getParams() );
	// Additional job metadata
	$row['job_timestamp'] = $db->timestamp();
	$row['job_sha1'] = $dedupHash;
	$row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

	return $row;
}
789 
/**
 * Get a DB_REPLICA connection through getDB().
 *
 * @return IDatabase
 * @throws JobQueueConnectionError When getDB() raises a DBConnectionError
 */
protected function getReplicaDB() {
	try {
		$conn = $this->getDB( DB_REPLICA );
	} catch ( DBConnectionError $e ) {
		throw new JobQueueConnectionError( "DBConnectionError:{$e->getMessage()}" );
	}

	return $conn;
}
801 
/**
 * Get a DB_MASTER connection through getDB().
 *
 * @return IDatabase
 * @throws JobQueueConnectionError When getDB() raises a DBConnectionError
 */
protected function getMasterDB() {
	try {
		$conn = $this->getDB( DB_MASTER );
	} catch ( DBConnectionError $e ) {
		throw new JobQueueConnectionError( "DBConnectionError:{$e->getMessage()}" );
	}

	return $conn;
}
813 
/**
 * Get the connection to use for the given index.
 *
 * With a dedicated 'server' configured, one connection is created through
 * Database::factory() and reused for every index. A failed attempt is remembered
 * and rethrown on later calls. Otherwise the connection comes from the external
 * load balancer of the configured cluster, or from the main load balancer.
 *
 * @param int $index DB_MASTER or DB_REPLICA
 * @return IDatabase
 * @throws DBError
 */
protected function getDB( $index ) {
	if ( !$this->server ) {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		if ( is_string( $this->cluster ) ) {
			$lb = $lbFactory->getExternalLB( $this->cluster );
		} else {
			$lb = $lbFactory->getMainLB( $this->domain );
		}

		if ( $lb->getServerType( $lb->getWriterIndex() ) === 'sqlite' ) {
			// SQLite uses DB-level locking, so share the regular handle.
			// Jobs insertion will be deferred until the PRESEND stage to reduce contention.
			return $lb->getConnectionRef( $index, [], $this->domain );
		}

		// Keep a separate connection to avoid contention and deadlocks
		return $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT );
	}

	if ( $this->conn instanceof IDatabase ) {
		return $this->conn;
	}
	if ( $this->conn instanceof DBError ) {
		throw $this->conn; // an earlier attempt already failed
	}

	try {
		$this->conn = Database::factory( $this->server['type'], $this->server );
	} catch ( DBError $e ) {
		$this->conn = $e;
		throw $e;
	}

	return $this->conn;
}
848 
/**
 * Clear DBO_TRX on a connection and return a callback object that restores it.
 *
 * The flag is only set again by the callback if it was set when this method
 * was called.
 *
 * @param IDatabase $db
 * @return ScopedCallback Wraps the restoring closure (presumably run when the
 *   object is released; confirm against ScopedCallback)
 */
private function getScopedNoTrxFlag( IDatabase $db ) {
	$hadAutoTrx = $db->getFlag( DBO_TRX ); // get current setting
	$db->clearFlag( DBO_TRX ); // make each query its own transaction

	$restore = function () use ( $db, $hadAutoTrx ) {
		if ( !$hadAutoTrx ) {
			return;
		}
		$db->setFlag( DBO_TRX ); // restore old setting
	};

	return new ScopedCallback( $restore );
}
863 
/**
 * Build the global cache key for one property of this queue.
 *
 * The key is made of 'jobqueue', the domain, the cluster name ('main' when no
 * cluster name is configured), the job type and the property.
 *
 * @param string $property
 * @return string
 */
private function getCacheKey( $property ) {
	$cluster = 'main';
	if ( is_string( $this->cluster ) ) {
		$cluster = $this->cluster;
	}

	$components = [ 'jobqueue', $this->domain, $cluster, $this->type, $property ];

	return $this->cache->makeGlobalKey( ...$components );
}
879 
/**
 * Turn job parameters into the string stored in job_params.
 *
 * @param array|bool $params Parameters, or false for none
 * @return string Serialized parameters, or '' when $params is exactly false
 */
protected static function makeBlob( $params ) {
	return ( $params === false ) ? '' : serialize( $params );
}
891 
/**
 * Turn a stored job_params string back into job parameters.
 *
 * @param string $blob
 * @return mixed The unserialized value, or false when the blob is an empty string
 */
protected static function extractBlob( $blob ) {
	if ( (string)$blob === '' ) {
		return false;
	}

	return unserialize( $blob );
}
903 
/**
 * Rethrow a database error as a JobQueueError.
 *
 * @param DBError $e
 * @throws JobQueueError Always; the message is the DBError class name and message
 */
protected function throwDBException( DBError $e ) {
	$message = get_class( $e ) . ": " . $e->getMessage();

	throw new JobQueueError( $message );
}
911 
/**
 * Return the list of job fields that should be selected.
 *
 * @return string[]
 */
public static function selectFields() {
	$fields = [
		// Identity and payload of the job
		'job_id',
		'job_cmd',
		'job_namespace',
		'job_title',
		'job_timestamp',
		'job_params',
		// Bookkeeping used for claiming and de-duplication
		'job_random',
		'job_attempts',
		'job_token',
		'job_token_timestamp',
		'job_sha1',
	];

	return $fields;
}
932 }
JobQueueDB\MAX_AGE_PRUNE
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:37
JobQueueDB\doBatchPushInternal
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:238
JobQueueDB\insertFields
insertFields(IJobSpecification $job, IDatabase $db)
Definition: JobQueueDB.php:773
MappedIterator
Convenience class for generating iterators from iterators.
Definition: MappedIterator.php:28
Wikimedia\Rdbms\Database
Relational database abstraction object.
Definition: Database.php:48
JobQueueDB\doWaitForBackups
doWaitForBackups()
Definition: JobQueueDB.php:571
JobQueueDB\doFlushCaches
doFlushCaches()
Definition: JobQueueDB.php:586
JobQueueDB\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:646
JobQueueDB\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueueDB.php:636
JobQueueDB\getAllAcquiredJobs
getAllAcquiredJobs()
Definition: JobQueueDB.php:604
JobQueueDB\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:665
JobQueueDB\throwDBException
throwDBException(DBError $e)
Definition: JobQueueDB.php:908
captcha-old.count
count
Definition: captcha-old.py:249
WANObjectCache\set
set( $key, $value, $ttl=self::TTL_INDEFINITE, array $opts=[])
Set the value of a key in cache.
Definition: WANObjectCache.php:554
JobQueue\incrStats
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:706
JobQueueDB\doGetSize
doGetSize()
Definition: JobQueueDB.php:104
Wikimedia\Rdbms\IDatabase\endAtomic
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
JobQueueDB\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
Definition: JobQueueDB.php:77
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:52
JobQueueDB\getDB
getDB( $index)
Definition: JobQueueDB.php:818
$params
$params
Definition: styleTest.css.php:44
JobQueueDB\__construct
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:61
JobQueueDB\getScopedNoTrxFlag
getScopedNoTrxFlag(IDatabase $db)
Definition: JobQueueDB.php:853
JobQueueDB\getMasterDB
getMasterDB()
Definition: JobQueueDB.php:806
$res
$res
Definition: database.txt:21
JobQueueDB
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:35
serialize
serialize()
Definition: ApiMessageTrait.php:134
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueDB\CACHE_TTL_SHORT
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:36
Wikimedia\Rdbms\DBError
Database error base class.
Definition: DBError.php:30
Wikimedia\Rdbms\IDatabase\insert
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
DBO_TRX
const DBO_TRX
Definition: defines.php:12
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
JobQueueDB\getReplicaDB
getReplicaDB()
Definition: JobQueueDB.php:794
JobQueueDB\getJobIterator
getJobIterator(array $conds)
Definition: JobQueueDB.php:612
$dbr
$dbr
Definition: testCompression.php:50
JobQueueDB\makeBlob
static makeBlob( $params)
Definition: JobQueueDB.php:884
JobQueueDB\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
Definition: JobQueueDB.php:73
WANObjectCache\newEmpty
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
Definition: WANObjectCache.php:278
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:30
Wikimedia\Rdbms\IDatabase\timestamp
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by wfTimestamp() to the format used for inserting ...
MWException
MediaWiki exception.
Definition: MWException.php:26
$title
namespace and then decline to actually register it file or subcat img or subcat $title
Definition: hooks.txt:925
WANObjectCache\get
get( $key, &$curTTL=null, array $checkKeys=[], &$info=null)
Fetch the value of a key from cache.
Definition: WANObjectCache.php:331
$property
$property
Definition: styleTest.css.php:48
JobQueueDB\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:516
$blob
$blob
Definition: testCompression.php:65
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:35
JobQueueDB\recycleAndDeleteStaleJobs
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:686
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:48
JobQueueDB\MAX_OFFSET
const MAX_OFFSET
Definition: JobQueueDB.php:39
JobQueueDB\doBatchPush
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:205
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
Title\makeTitle
static makeTitle( $ns, $title, $fragment='', $interwiki='')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:576
DB_REPLICA
const DB_REPLICA
Definition: defines.php:25
JobQueueDB\doGetAcquiredCount
doGetAcquiredCount()
Definition: JobQueueDB.php:132
JobQueueError
Definition: JobQueueError.php:28
DB_MASTER
const DB_MASTER
Definition: defines.php:26
array
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
wfDebug
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
Definition: GlobalFunctions.php:949
JobQueueDB\extractBlob
static extractBlob( $blob)
Definition: JobQueueDB.php:896
JobQueueDB\doIsEmpty
doIsEmpty()
Definition: JobQueueDB.php:85
$fname
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings,...
Definition: Setup.php:123
JobQueueDB\$cluster
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
Definition: JobQueueDB.php:49
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2162
Job\factory
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:72
JobQueueDB\MAX_JOB_RANDOM
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:38
JobQueueDB\doDelete
doDelete()
Definition: JobQueueDB.php:554
JobQueueDB\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueueDB.php:165
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:116
JobQueueDB\getAllQueuedJobs
getAllQueuedJobs()
Definition: JobQueueDB.php:596
JobQueueDB\selectFields
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:917
JobQueueConnectionError
Definition: JobQueueConnectionError.php:28
Wikimedia\Rdbms\IDatabase\getFlag
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
JobQueueDB\claimOldest
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:425
JobQueueDB\$server
array null $server
Server configuration array.
Definition: JobQueueDB.php:47
JobQueueDB\doAck
doAck(Job $job)
Definition: JobQueueDB.php:487
unserialize
unserialize( $serialized)
Definition: ApiMessageTrait.php:142
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:521
$rows
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction $rows
Definition: hooks.txt:2636
JobQueueDB\getCacheKey
getCacheKey( $property)
Definition: JobQueueDB.php:868
JobQueueDB\$cache
WANObjectCache $cache
Definition: JobQueueDB.php:42
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:49
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
Wikimedia\Rdbms\DBConnectionError
Definition: DBConnectionError.php:26
Wikimedia\Rdbms\IDatabase\select
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
JobQueueDB\$conn
IDatabase DBError null $conn
Definition: JobQueueDB.php:44
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
JobQueueDB\doPop
doPop()
Definition: JobQueueDB.php:295
Wikimedia\Rdbms\IDatabase\setFlag
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
server
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such and we might be restricted by PHP settings such as safe mode or open_basedir We cannot assume that the software even has read access anywhere useful Many shared hosts run all users web applications under the same so they can t rely on Unix and must forbid reads to even standard directories like tmp lest users read each others files We cannot assume that the user has the ability to install or run any programs not written as web accessible PHP scripts Since anything that works on cheap shared hosting will work if you have shell or root access MediaWiki s design is based around catering to the lowest common denominator Although we support higher end setups as the way many things work by default is tailored toward shared hosting These defaults are unconventional from the point of view of and they certainly aren t ideal for someone who s installing MediaWiki as MediaWiki does not conform to normal Unix filesystem layout Hopefully we ll offer direct support for standard layouts in the but for now *any change to the location of files is unsupported *Moving things and leaving symlinks will *probably *not break but it is *strongly *advised not to try any more intrusive changes to get MediaWiki to conform more closely to your filesystem hierarchy Any such attempt will almost certainly result in unnecessary bugs The standard recommended location to install relative to the web is it should be possible to enable the appropriate rewrite rules by if you can reconfigure the web server
Definition: distributors.txt:53
IJobSpecification
Job queue task description interface.
Definition: IJobSpecification.php:29
JobQueue\getType
getType()
Definition: JobQueue.php:137
Wikimedia\Rdbms\IDatabase\clearFlag
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
wfRandomString
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Definition: GlobalFunctions.php:298
JobQueueDB\claimRandom
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:346
Wikimedia\Rdbms\IDatabase\startAtomic
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.