MediaWiki  master
JobQueueDB.php
Go to the documentation of this file.
1 <?php
28 
35 class JobQueueDB extends JobQueue {
	/** @var int Seconds to cache info without re-validating */
	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	/** @var int Seconds a job can live once claimed before it is pruned as stale */
	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
	/** @var int 2^31 - 1; upper bound for job_random values */
	const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
	/** @var int Maximum number of rows to skip when claiming by OFFSET */
	const MAX_OFFSET = 255; // integer; maximum number of rows to skip

	/** @var WANObjectCache Cache for queue size/count info (set in __construct) */
	protected $cache;
	/** @var IDatabase|DBError|null Cached direct connection handle, or a cached connection error, when $server is set */
	protected $conn;

	/** @var array|null Explicit single-server configuration array, if given */
	protected $server;
	/** @var string|null Name of an external DB cluster to use, if given */
	protected $cluster;
50 
61  protected function __construct( array $params ) {
62  parent::__construct( $params );
63 
64  if ( isset( $params['server'] ) ) {
65  $this->server = $params['server'];
66  } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
67  $this->cluster = $params['cluster'];
68  }
69 
70  $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
71  }
72 
73  protected function supportedOrders() {
74  return [ 'random', 'timestamp', 'fifo' ];
75  }
76 
77  protected function optimalOrder() {
78  return 'random';
79  }
80 
85  protected function doIsEmpty() {
86  $dbr = $this->getReplicaDB();
88  $scope = $this->getScopedNoTrxFlag( $dbr );
89  try {
90  $found = $dbr->selectField( // unclaimed job
91  'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
92  );
93  } catch ( DBError $e ) {
94  throw $this->getDBException( $e );
95  }
96 
97  return !$found;
98  }
99 
104  protected function doGetSize() {
105  $key = $this->getCacheKey( 'size' );
106 
107  $size = $this->cache->get( $key );
108  if ( is_int( $size ) ) {
109  return $size;
110  }
111 
112  $dbr = $this->getReplicaDB();
114  $scope = $this->getScopedNoTrxFlag( $dbr );
115  try {
116  $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
117  [ 'job_cmd' => $this->type, 'job_token' => '' ],
118  __METHOD__
119  );
120  } catch ( DBError $e ) {
121  throw $this->getDBException( $e );
122  }
123  $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
124 
125  return $size;
126  }
127 
132  protected function doGetAcquiredCount() {
133  if ( $this->claimTTL <= 0 ) {
134  return 0; // no acknowledgements
135  }
136 
137  $key = $this->getCacheKey( 'acquiredcount' );
138 
139  $count = $this->cache->get( $key );
140  if ( is_int( $count ) ) {
141  return $count;
142  }
143 
144  $dbr = $this->getReplicaDB();
146  $scope = $this->getScopedNoTrxFlag( $dbr );
147  try {
148  $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
149  [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
150  __METHOD__
151  );
152  } catch ( DBError $e ) {
153  throw $this->getDBException( $e );
154  }
155  $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
156 
157  return $count;
158  }
159 
165  protected function doGetAbandonedCount() {
166  if ( $this->claimTTL <= 0 ) {
167  return 0; // no acknowledgements
168  }
169 
170  $key = $this->getCacheKey( 'abandonedcount' );
171 
172  $count = $this->cache->get( $key );
173  if ( is_int( $count ) ) {
174  return $count;
175  }
176 
177  $dbr = $this->getReplicaDB();
179  $scope = $this->getScopedNoTrxFlag( $dbr );
180  try {
181  $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
182  [
183  'job_cmd' => $this->type,
184  "job_token != {$dbr->addQuotes( '' )}",
185  "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
186  ],
187  __METHOD__
188  );
189  } catch ( DBError $e ) {
190  throw $this->getDBException( $e );
191  }
192 
193  $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
194 
195  return $count;
196  }
197 
	/**
	 * @see JobQueue::doBatchPush()
	 *
	 * Defers the actual inserts to doBatchPushInternal() via a pre-commit/idle
	 * callback on the master connection.
	 *
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bit field; QOS_ATOMIC wraps the inserts in one transaction
	 * @return void
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		// Make each query run in its own transaction on this handle
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		// If the connection is busy with a transaction, then defer the job writes
		// until right before the main round commit step. Any errors that bubble
		// up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		// No transaction is active nor will be started by writes, so enqueue the jobs
		// now so that any errors will show up immediately as the interface expects. Any
		// errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
			},
			$fname
		);
	}
226 
238  public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
239  if ( $jobs === [] ) {
240  return;
241  }
242 
243  $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
244  $rowList = []; // list of jobs for jobs that are not de-duplicated
245  foreach ( $jobs as $job ) {
246  $row = $this->insertFields( $job, $dbw );
247  if ( $job->ignoreDuplicates() ) {
248  $rowSet[$row['job_sha1']] = $row;
249  } else {
250  $rowList[] = $row;
251  }
252  }
253 
254  if ( $flags & self::QOS_ATOMIC ) {
255  $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
256  }
257  try {
258  // Strip out any duplicate jobs that are already in the queue...
259  if ( count( $rowSet ) ) {
260  $res = $dbw->select( 'job', 'job_sha1',
261  [
262  // No job_type condition since it's part of the job_sha1 hash
263  'job_sha1' => array_keys( $rowSet ),
264  'job_token' => '' // unclaimed
265  ],
266  $method
267  );
268  foreach ( $res as $row ) {
269  wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
270  unset( $rowSet[$row->job_sha1] ); // already enqueued
271  }
272  }
273  // Build the full list of job rows to insert
274  $rows = array_merge( $rowList, array_values( $rowSet ) );
275  // Insert the job rows in chunks to avoid replica DB lag...
276  foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
277  $dbw->insert( 'job', $rowBatch, $method );
278  }
279  $this->incrStats( 'inserts', $this->type, count( $rows ) );
280  $this->incrStats( 'dupe_inserts', $this->type,
281  count( $rowSet ) + count( $rowList ) - count( $rows )
282  );
283  } catch ( DBError $e ) {
284  throw $this->getDBException( $e );
285  }
286  if ( $flags & self::QOS_ATOMIC ) {
287  $dbw->endAtomic( $method );
288  }
289  }
290 
295  protected function doPop() {
296  $dbw = $this->getMasterDB();
298  $scope = $this->getScopedNoTrxFlag( $dbw );
299 
300  $job = false; // job popped off
301  try {
302  $uuid = wfRandomString( 32 ); // pop attempt
303  do { // retry when our row is invalid or deleted as a duplicate
304  // Try to reserve a row in the DB...
305  if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
306  $row = $this->claimOldest( $uuid );
307  } else { // random first
308  $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
309  $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
310  $row = $this->claimRandom( $uuid, $rand, $gte );
311  }
312  // Check if we found a row to reserve...
313  if ( !$row ) {
314  break; // nothing to do
315  }
316  $this->incrStats( 'pops', $this->type );
317 
318  // Get the job object from the row...
319  $job = $this->jobFromRow( $row );
320  break; // done
321  } while ( true );
322 
323  if ( !$job || mt_rand( 0, 9 ) == 0 ) {
324  // Handled jobs that need to be recycled/deleted;
325  // any recycled jobs will be picked up next attempt
326  $this->recycleAndDeleteStaleJobs();
327  }
328  } catch ( DBError $e ) {
329  throw $this->getDBException( $e );
330  }
331 
332  return $job;
333  }
334 
	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * Picks an unclaimed row (by job_random inequality for small queues, or by a
	 * random OFFSET otherwise) and then claims it by setting job_token, retrying
	 * if another runner raced this one out.
	 *
	 * @param string $uuid 32 char hex string to mark the claimed row with
	 * @param int $rand Random integer in [0, MAX_JOB_RANDOM] (from doPop())
	 * @param bool $gte Search for job_random >= $rand (instead of <= $rand)
	 * @return stdClass|false Claimed row, or false if nothing could be claimed
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		// Make each query run in its own transaction on this handle
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
					__METHOD__,
					[ 'ORDER BY' => "job_random {$dir}" ]
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					],
					__METHOD__,
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
					continue; // use job_random
				}
			}

			if ( $row ) { // claim the job
				$dbw->update( 'job', // update by PK
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
					__METHOD__
				);
				// This might get raced out by another runner when claiming the previously
				// selected row. The use of job_random should minimize this problem, however.
				if ( !$dbw->affectedRows() ) {
					$row = false; // raced out
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
414 
	/**
	 * Reserve the lowest-ID (oldest) unclaimed row for fifo/timestamp order.
	 *
	 * Claims via a single UPDATE (ORDER BY + LIMIT on MySQL, a subquery elsewhere),
	 * then re-selects the claimed row by its token.
	 *
	 * @param string $uuid 32 char hex string to mark the claimed row with
	 * @return stdClass|false Claimed row, or false if nothing could be claimed
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();
		// Make each query run in its own transaction on this handle
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Oracle and Postgre have no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid ) }, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$dbw->update( 'job',
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_id = (' .
						$dbw->selectSQLText( 'job', 'job_id',
							[ 'job_cmd' => $this->type, 'job_token' => '' ],
							__METHOD__,
							[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
						')'
					],
					__METHOD__
				);
			}
			// Fetch any row that we just reserved...
			if ( $dbw->affectedRows() ) {
				$row = $dbw->selectRow( 'job', self::selectFields(),
					[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
				);
				if ( !$row ) { // raced out by duplicate job removal
					wfDebug( "Row deleted as duplicate by another process.\n" );
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
477 
483  protected function doAck( RunnableJob $job ) {
484  $id = $job->getMetadata( 'id' );
485  if ( $id === null ) {
486  throw new MWException( "Job of type '{$job->getType()}' has no ID." );
487  }
488 
489  $dbw = $this->getMasterDB();
491  $scope = $this->getScopedNoTrxFlag( $dbw );
492  try {
493  // Delete a row with a single DELETE without holding row locks over RTTs...
494  $dbw->delete(
495  'job',
496  [ 'job_cmd' => $this->type, 'job_id' => $id ],
497  __METHOD__
498  );
499 
500  $this->incrStats( 'acks', $this->type );
501  } catch ( DBError $e ) {
502  throw $this->getDBException( $e );
503  }
504  }
505 
513  $params = $job->getParams();
514  if ( !isset( $params['rootJobSignature'] ) ) {
515  throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
516  } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
517  throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
518  }
519  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
520  // Callers should call JobQueueGroup::push() before this method so that if the insert
521  // fails, the de-duplication registration will be aborted. Since the insert is
522  // deferred till "transaction idle", do the same here, so that the ordering is
523  // maintained. Having only the de-duplication registration succeed would cause
524  // jobs to become no-ops without any actual jobs that made them redundant.
525  $dbw = $this->getMasterDB();
527  $scope = $this->getScopedNoTrxFlag( $dbw );
528 
530  $dbw->onTransactionCommitOrIdle(
531  function () use ( $cache, $params, $key ) {
532  $timestamp = $cache->get( $key ); // current last timestamp of this job
533  if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
534  return true; // a newer version of this root job was enqueued
535  }
536 
537  // Update the timestamp of the last root job started at the location...
538  return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
539  },
540  __METHOD__
541  );
542 
543  return true;
544  }
545 
550  protected function doDelete() {
551  $dbw = $this->getMasterDB();
553  $scope = $this->getScopedNoTrxFlag( $dbw );
554  try {
555  $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
556  } catch ( DBError $e ) {
557  throw $this->getDBException( $e );
558  }
559 
560  return true;
561  }
562 
567  protected function doWaitForBackups() {
568  if ( $this->server ) {
569  return; // not using LBFactory instance
570  }
571 
572  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
573  $lbFactory->waitForReplication( [
574  'domain' => $this->domain,
575  'cluster' => is_string( $this->cluster ) ? $this->cluster : false
576  ] );
577  }
578 
582  protected function doFlushCaches() {
583  foreach ( [ 'size', 'acquiredcount' ] as $type ) {
584  $this->cache->delete( $this->getCacheKey( $type ) );
585  }
586  }
587 
592  public function getAllQueuedJobs() {
593  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
594  }
595 
600  public function getAllAcquiredJobs() {
601  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
602  }
603 
608  protected function getJobIterator( array $conds ) {
609  $dbr = $this->getReplicaDB();
611  $scope = $this->getScopedNoTrxFlag( $dbr );
612  try {
613  return new MappedIterator(
614  $dbr->select( 'job', self::selectFields(), $conds ),
615  function ( $row ) {
616  return $this->jobFromRow( $row );
617  }
618  );
619  } catch ( DBError $e ) {
620  throw $this->getDBException( $e );
621  }
622  }
623 
624  public function getCoalesceLocationInternal() {
625  if ( $this->server ) {
626  return null; // not using the LBFactory instance
627  }
628 
629  return is_string( $this->cluster )
630  ? "DBCluster:{$this->cluster}:{$this->domain}"
631  : "LBFactory:{$this->domain}";
632  }
633 
634  protected function doGetSiblingQueuesWithJobs( array $types ) {
635  $dbr = $this->getReplicaDB();
637  $scope = $this->getScopedNoTrxFlag( $dbr );
638  // @note: this does not check whether the jobs are claimed or not.
639  // This is useful so JobQueueGroup::pop() also sees queues that only
640  // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
641  // failed jobs so that they can be popped again for that edge case.
642  $res = $dbr->select( 'job', 'DISTINCT job_cmd',
643  [ 'job_cmd' => $types ], __METHOD__ );
644 
645  $types = [];
646  foreach ( $res as $row ) {
647  $types[] = $row->job_cmd;
648  }
649 
650  return $types;
651  }
652 
653  protected function doGetSiblingQueueSizes( array $types ) {
654  $dbr = $this->getReplicaDB();
656  $scope = $this->getScopedNoTrxFlag( $dbr );
657 
658  $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
659  [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
660 
661  $sizes = [];
662  foreach ( $res as $row ) {
663  $sizes[$row->job_cmd] = (int)$row->count;
664  }
665 
666  return $sizes;
667  }
668 
	/**
	 * Recycle or destroy any jobs that have been claimed for too long.
	 *
	 * Claims older than claimTTL are released back to the queue when retries
	 * remain; rows older than MAX_AGE_PRUNE (or out of retries) are deleted.
	 * Guarded by a named DB lock so only one runner does this per queue type.
	 *
	 * @return int Number of jobs recycled/deleted
	 * @throws JobQueueError
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();
		// Make each query run in its own transaction on this handle
		$scope = $this->getScopedNoTrxFlag( $dbw );

		try {
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
					__METHOD__
				);
				$ids = array_map(
					function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
						[
							'job_id' => $ids ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map(
				function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $count;
	}
755 
	/**
	 * Build the job table row to insert for a given job specification.
	 *
	 * @param IJobSpecification $job
	 * @param IDatabase $db Handle used for timestamp formatting
	 * @return array Map of job table column names to values
	 */
	protected function insertFields( IJobSpecification $job, IDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'job_title' => $job->getParams()['title'] ?? '',
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			// 31 character base-36 form of the SHA-1 of the de-duplication info
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			// Used by claimRandom() to pick rows without ORDER BY scans
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}
777 
782  protected function getReplicaDB() {
783  try {
784  return $this->getDB( DB_REPLICA );
785  } catch ( DBConnectionError $e ) {
786  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
787  }
788  }
789 
794  protected function getMasterDB() {
795  try {
796  return $this->getDB( DB_MASTER );
797  } catch ( DBConnectionError $e ) {
798  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
799  }
800  }
801 
	/**
	 * Get a DB handle for this queue, caching the connection (or connection
	 * error) when an explicit server configuration was given.
	 *
	 * @param int $index DB_REPLICA or DB_MASTER
	 * @return IDatabase
	 * @throws DBError On connection failure (the error is cached and rethrown
	 *  on later calls)
	 */
	protected function getDB( $index ) {
		if ( $this->server ) {
			// Direct connection to a specific server
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn; // reuse the cached handle
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn; // rethrow the cached connection error
			}

			try {
				$this->conn = Database::factory( $this->server['type'], $this->server );
			} catch ( DBError $e ) {
				$this->conn = $e; // cache the failure so later calls fail fast
				throw $e;
			}

			return $this->conn;
		} else {
			// Use the load balancer for the configured cluster/domain
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
				// Keep a separate connection to avoid contention and deadlocks;
				// However, SQLite has the opposite behavior due to DB-level locking.
				? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
				// Jobs insertion will be defered until the PRESEND stage to reduce contention.
				: $lb->getConnectionRef( $index, [], $this->domain );
		}
	}
836 
841  private function getScopedNoTrxFlag( IDatabase $db ) {
842  $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
843  $db->clearFlag( DBO_TRX ); // make each query its own transaction
844 
845  return new ScopedCallback( function () use ( $db, $autoTrx ) {
846  if ( $autoTrx ) {
847  $db->setFlag( DBO_TRX ); // restore old setting
848  }
849  } );
850  }
851 
856  private function getCacheKey( $property ) {
857  $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
858 
859  return $this->cache->makeGlobalKey(
860  'jobqueue',
861  $this->domain,
862  $cluster,
863  $this->type,
864  $property
865  );
866  }
867 
872  protected static function makeBlob( $params ) {
873  if ( $params !== false ) {
874  return serialize( $params );
875  } else {
876  return '';
877  }
878  }
879 
884  protected function jobFromRow( $row ) {
885  $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
886  if ( !is_array( $params ) ) { // this shouldn't happen
887  throw new UnexpectedValueException(
888  "Could not unserialize job with ID '{$row->job_id}'." );
889  }
890 
891  $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
892  $job = $this->factoryJob( $row->job_cmd, $params );
893  $job->setMetadata( 'id', $row->job_id );
894  $job->setMetadata( 'timestamp', $row->job_timestamp );
895 
896  return $job;
897  }
898 
903  protected function getDBException( DBError $e ) {
904  return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
905  }
906 
	/**
	 * Return the job table columns needed to construct a job object via
	 * jobFromRow().
	 *
	 * @return string[] List of job table column names
	 */
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
927 }
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:238
factoryJob( $command, $params)
Definition: JobQueue.php:694
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction $rows
Definition: hooks.txt:2633
insertFields(IJobSpecification $job, IDatabase $db)
Definition: JobQueueDB.php:761
$property
getCoalesceLocationInternal()
Definition: JobQueueDB.php:624
serialize()
getDB( $index)
Definition: JobQueueDB.php:806
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:61
getScopedNoTrxFlag(IDatabase $db)
Definition: JobQueueDB.php:841
set( $key, $value, $ttl=self::TTL_INDEFINITE, array $opts=[])
Set the value of a key in cache.
IDatabase DBError null $conn
Definition: JobQueueDB.php:44
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2159
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:634
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:716
This code would result in ircNotify being run twice when an article is and once for brion Hooks can return three possible true was required This is the default since MediaWiki *some string
Definition: hooks.txt:175
const NS_SPECIAL
Definition: Defines.php:49
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:37
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:35
BagOStuff $dupCache
Definition: JobQueue.php:48
const DB_MASTER
Definition: defines.php:26
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by wfTimestamp() to the format used for inserting ...
static makeBlob( $params)
Definition: JobQueueDB.php:872
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:674
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:343
string $type
Job type.
Definition: JobQueue.php:35
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
array null $server
Server configuration array.
Definition: JobQueueDB.php:47
jobFromRow( $row)
Definition: JobQueueDB.php:884
getAllAcquiredJobs()
Definition: JobQueueDB.php:600
getMetadata( $field=null)
getRootJobCacheKey( $signature)
Definition: JobQueue.php:521
const MAX_OFFSET
Definition: JobQueueDB.php:39
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
$res
Definition: database.txt:21
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
getDBException(DBError $e)
Definition: JobQueueDB.php:903
getDeduplicationInfo()
Subclasses may need to override this to make duplication detection work.
get( $key, &$curTTL=null, array $checkKeys=[], &$info=null)
Fetch the value of a key from cache.
$params
unserialize( $serialized)
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:36
doGetAcquiredCount()
Definition: JobQueueDB.php:132
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:780
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:38
getJobIterator(array $conds)
Definition: JobQueueDB.php:608
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:205
Convenience class for generating iterators from iterators.
supportedOrders()
Definition: JobQueueDB.php:73
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings, LocalSettings).
Definition: Setup.php:123
const DBO_TRX
Definition: defines.php:12
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:512
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
doGetAbandonedCount()
Definition: JobQueueDB.php:165
doWaitForBackups()
Definition: JobQueueDB.php:567
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
Definition: JobQueueDB.php:49
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count( $args)< 1) $job
const ROOTJOB_TTL
Definition: JobQueue.php:52
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:653
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:912
Interface for serializable objects that describe a job queue task.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such and we might be restricted by PHP settings such as safe mode or open_basedir We cannot assume that the software even has read access anywhere useful Many shared hosts run all users web applications under the same so they can t rely on Unix and must forbid reads to even standard directories like tmp lest users read each others files We cannot assume that the user has the ability to install or run any programs not written as web accessible PHP scripts Since anything that works on cheap shared hosting will work if you have shell or root access MediaWiki s design is based around catering to the lowest common denominator Although we support higher end setups as the way many things work by default is tailored toward shared hosting These defaults are unconventional from the point of view of and they certainly aren t ideal for someone who s installing MediaWiki as MediaWiki does not conform to normal Unix filesystem layout Hopefully we ll offer direct support for standard layouts in the but for now *any change to the location of files is unsupported *Moving things and leaving symlinks will *probably *not break but it is *strongly *advised not to try any more intrusive changes to get MediaWiki to conform more closely to your filesystem hierarchy Any such attempt will almost certainly result in unnecessary bugs The standard recommended location to install relative to the web is it should be possible to enable the appropriate rewrite rules by if you can reconfigure the web server
doAck(RunnableJob $job)
Definition: JobQueueDB.php:483
const DB_REPLICA
Definition: defines.php:25
getCacheKey( $property)
Definition: JobQueueDB.php:856
WANObjectCache $cache
Definition: JobQueueDB.php:42
getAllQueuedJobs()
Definition: JobQueueDB.php:592
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
Database error base class.
Definition: DBError.php:30
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:421