MediaWiki  master
JobQueueDB.php
Go to the documentation of this file.
1 <?php
28 
/**
 * Class to handle job queues stored in the DB.
 *
 * Jobs are stored as rows of the 'job' table; unclaimed rows have an empty
 * job_token, while claimed rows carry the claiming runner's token and a
 * claim timestamp.
 */
class JobQueueDB extends JobQueue {
	/** @var int Seconds to cache queue info (size/counts) without re-validating */
	const CACHE_TTL_SHORT = 30;
	/** @var int Seconds a job can live once claimed (7 days) before being pruned */
	const MAX_AGE_PRUNE = 604800;
	/** @var int 2^31 - 1; upper bound for job_random values */
	const MAX_JOB_RANDOM = 2147483647;
	/** @var int Maximum number of rows to skip when claiming a random job */
	const MAX_OFFSET = 255;

	/** @var WANObjectCache Process/shared cache for queue sizes and flags */
	protected $cache;
	/** @var IDatabase|DBError|null Cached direct connection, or the error from a failed attempt (see getDB()) */
	protected $conn;

	/** @var array|null Server configuration array (bypasses LBFactory when set) */
	protected $server;
	/** @var string|null Name of an external DB cluster to use, if any */
	protected $cluster;
61  protected function __construct( array $params ) {
62  parent::__construct( $params );
63 
64  if ( isset( $params['server'] ) ) {
65  $this->server = $params['server'];
66  } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
67  $this->cluster = $params['cluster'];
68  }
69 
70  $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
71  }
72 
73  protected function supportedOrders() {
74  return [ 'random', 'timestamp', 'fifo' ];
75  }
76 
77  protected function optimalOrder() {
78  return 'random';
79  }
80 
85  protected function doIsEmpty() {
86  $dbr = $this->getReplicaDB();
88  $scope = $this->getScopedNoTrxFlag( $dbr );
89  try {
90  $found = $dbr->selectField( // unclaimed job
91  'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
92  );
93  } catch ( DBError $e ) {
94  $this->throwDBException( $e );
95  }
96 
97  return !$found;
98  }
99 
104  protected function doGetSize() {
105  $key = $this->getCacheKey( 'size' );
106 
107  $size = $this->cache->get( $key );
108  if ( is_int( $size ) ) {
109  return $size;
110  }
111 
112  $dbr = $this->getReplicaDB();
114  $scope = $this->getScopedNoTrxFlag( $dbr );
115  try {
116  $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
117  [ 'job_cmd' => $this->type, 'job_token' => '' ],
118  __METHOD__
119  );
120  } catch ( DBError $e ) {
121  $this->throwDBException( $e );
122  }
123  $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
124 
125  return $size;
126  }
127 
132  protected function doGetAcquiredCount() {
133  if ( $this->claimTTL <= 0 ) {
134  return 0; // no acknowledgements
135  }
136 
137  $key = $this->getCacheKey( 'acquiredcount' );
138 
139  $count = $this->cache->get( $key );
140  if ( is_int( $count ) ) {
141  return $count;
142  }
143 
144  $dbr = $this->getReplicaDB();
146  $scope = $this->getScopedNoTrxFlag( $dbr );
147  try {
148  $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
149  [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
150  __METHOD__
151  );
152  } catch ( DBError $e ) {
153  $this->throwDBException( $e );
154  }
155  $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
156 
157  return $count;
158  }
159 
165  protected function doGetAbandonedCount() {
166  if ( $this->claimTTL <= 0 ) {
167  return 0; // no acknowledgements
168  }
169 
170  $key = $this->getCacheKey( 'abandonedcount' );
171 
172  $count = $this->cache->get( $key );
173  if ( is_int( $count ) ) {
174  return $count;
175  }
176 
177  $dbr = $this->getReplicaDB();
179  $scope = $this->getScopedNoTrxFlag( $dbr );
180  try {
181  $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
182  [
183  'job_cmd' => $this->type,
184  "job_token != {$dbr->addQuotes( '' )}",
185  "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
186  ],
187  __METHOD__
188  );
189  } catch ( DBError $e ) {
190  $this->throwDBException( $e );
191  }
192 
193  $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
194 
195  return $count;
196  }
197 
	/**
	 * Enqueue a batch of jobs, deferring the actual inserts until the
	 * connection is about to commit (or is idle).
	 *
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bit field; may include self::QOS_ATOMIC
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		// Make the queries below run outside of any transaction round
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		// If the connection is busy with a transaction, then defer the job writes
		// until right before the main round commit step. Any errors that bubble
		// up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		// No transaction is active nor will be started by writes, so enqueue the jobs
		// now so that any errors will show up immediately as the interface expects. Any
		// errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
				// Actual insert logic lives in doBatchPushInternal()
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
			},
			$fname
		);
	}
226 
	/**
	 * This function should not be called outside of JobQueueDB.
	 *
	 * Insert rows for the given jobs, skipping de-duplicatable jobs whose
	 * hash already matches an unclaimed row in the queue.
	 *
	 * @param IDatabase $dbw Handle to write the rows with
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bit field; self::QOS_ATOMIC wraps all inserts in one atomic section
	 * @param string $method Caller name, used as the query fname
	 * @throws DBError
	 * @throws JobQueueError On database errors
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return; // nothing to insert
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				// Keyed by hash so identical jobs in this very batch collapse too
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->select( 'job', 'job_sha1',
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_keys( $rowSet ),
						'job_token' => '' // unclaimed
					],
					$method
				);
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->insert( 'job', $rowBatch, $method );
			}
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			// dupe_inserts = rows that were dropped as already-enqueued duplicates
			$this->incrStats( 'dupe_inserts', $this->type,
				count( $rowSet ) + count( $rowList ) - count( $rows )
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}
290 
	/**
	 * Pop a job off the queue, claiming it with a random token.
	 *
	 * Also occasionally triggers recycling/deletion of stale claimed jobs.
	 *
	 * @return RunnableJob|bool Job object, or false if no job was claimed
	 * @throws JobQueueError On database errors
	 */
	protected function doPop() {
		$dbw = $this->getMasterDB();
		// Run each claim/recycle query in its own transaction
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			do { // retry when our row is invalid or deleted as a duplicate
				// Try to reserve a row in the DB...
				if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
					$row = $this->claimOldest( $uuid );
				} else { // random first
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
					$row = $this->claimRandom( $uuid, $rand, $gte );
				}
				// Check if we found a row to reserve...
				if ( !$row ) {
					break; // nothing to do
				}
				$this->incrStats( 'pops', $this->type );

				// Get the job object from the row...
				$params = self::extractBlob( $row->job_params );
				$params = is_array( $params ) ? $params : []; // sanity
				$params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
				$job = $this->factoryJob( $row->job_cmd, $params );
				$job->setMetadata( 'id', $row->job_id );
				$job->setMetadata( 'timestamp', $row->job_timestamp );
				break; // done
			} while ( true );

			// Roughly 1 in 10 pops (or any empty pop) does stale-job maintenance
			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
				// Handled jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $job;
	}
339 
348  protected function claimRandom( $uuid, $rand, $gte ) {
349  $dbw = $this->getMasterDB();
351  $scope = $this->getScopedNoTrxFlag( $dbw );
352  // Check cache to see if the queue has <= OFFSET items
353  $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
354 
355  $row = false; // the row acquired
356  $invertedDirection = false; // whether one job_random direction was already scanned
357  // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
358  // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
359  // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
360  // be used here with MySQL.
361  do {
362  if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
363  // For small queues, using OFFSET will overshoot and return no rows more often.
364  // Instead, this uses job_random to pick a row (possibly checking both directions).
365  $ineq = $gte ? '>=' : '<=';
366  $dir = $gte ? 'ASC' : 'DESC';
367  $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
368  [
369  'job_cmd' => $this->type,
370  'job_token' => '', // unclaimed
371  "job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
372  __METHOD__,
373  [ 'ORDER BY' => "job_random {$dir}" ]
374  );
375  if ( !$row && !$invertedDirection ) {
376  $gte = !$gte;
377  $invertedDirection = true;
378  continue; // try the other direction
379  }
380  } else { // table *may* have >= MAX_OFFSET rows
381  // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
382  // in MySQL if there are many rows for some reason. This uses a small OFFSET
383  // instead of job_random for reducing excess claim retries.
384  $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
385  [
386  'job_cmd' => $this->type,
387  'job_token' => '', // unclaimed
388  ],
389  __METHOD__,
390  [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
391  );
392  if ( !$row ) {
393  $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
394  $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
395  continue; // use job_random
396  }
397  }
398 
399  if ( $row ) { // claim the job
400  $dbw->update( 'job', // update by PK
401  [
402  'job_token' => $uuid,
403  'job_token_timestamp' => $dbw->timestamp(),
404  'job_attempts = job_attempts+1' ],
405  [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
406  __METHOD__
407  );
408  // This might get raced out by another runner when claiming the previously
409  // selected row. The use of job_random should minimize this problem, however.
410  if ( !$dbw->affectedRows() ) {
411  $row = false; // raced out
412  }
413  } else {
414  break; // nothing to do
415  }
416  } while ( !$row );
417 
418  return $row;
419  }
420 
	/**
	 * Reserve the oldest unclaimed job row (by job_id) with a single UPDATE,
	 * without holding row locks over RTTs.
	 *
	 * @param string $uuid 32 char hex string to use as the claim token
	 * @return stdClass|bool Claimed row, or false if no unclaimed job was found
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();
		// Run each query below in its own transaction
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Oracle and Postgre have no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid ) }, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$dbw->update( 'job',
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_id = (' .
						$dbw->selectSQLText( 'job', 'job_id',
							[ 'job_cmd' => $this->type, 'job_token' => '' ],
							__METHOD__,
							[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
						')'
					],
					__METHOD__
				);
			}
			// Fetch any row that we just reserved...
			if ( $dbw->affectedRows() ) {
				$row = $dbw->selectRow( 'job', self::selectFields(),
					[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
				);
				if ( !$row ) { // raced out by duplicate job removal
					wfDebug( "Row deleted as duplicate by another process.\n" );
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
483 
489  protected function doAck( RunnableJob $job ) {
490  $id = $job->getMetadata( 'id' );
491  if ( $id === null ) {
492  throw new MWException( "Job of type '{$job->getType()}' has no ID." );
493  }
494 
495  $dbw = $this->getMasterDB();
497  $scope = $this->getScopedNoTrxFlag( $dbw );
498  try {
499  // Delete a row with a single DELETE without holding row locks over RTTs...
500  $dbw->delete(
501  'job',
502  [ 'job_cmd' => $this->type, 'job_id' => $id ],
503  __METHOD__
504  );
505 
506  $this->incrStats( 'acks', $this->type );
507  } catch ( DBError $e ) {
508  $this->throwDBException( $e );
509  }
510  }
511 
519  $params = $job->getParams();
520  if ( !isset( $params['rootJobSignature'] ) ) {
521  throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
522  } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
523  throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
524  }
525  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
526  // Callers should call JobQueueGroup::push() before this method so that if the insert
527  // fails, the de-duplication registration will be aborted. Since the insert is
528  // deferred till "transaction idle", do the same here, so that the ordering is
529  // maintained. Having only the de-duplication registration succeed would cause
530  // jobs to become no-ops without any actual jobs that made them redundant.
531  $dbw = $this->getMasterDB();
533  $scope = $this->getScopedNoTrxFlag( $dbw );
534 
536  $dbw->onTransactionCommitOrIdle(
537  function () use ( $cache, $params, $key ) {
538  $timestamp = $cache->get( $key ); // current last timestamp of this job
539  if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
540  return true; // a newer version of this root job was enqueued
541  }
542 
543  // Update the timestamp of the last root job started at the location...
544  return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
545  },
546  __METHOD__
547  );
548 
549  return true;
550  }
551 
556  protected function doDelete() {
557  $dbw = $this->getMasterDB();
559  $scope = $this->getScopedNoTrxFlag( $dbw );
560  try {
561  $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
562  } catch ( DBError $e ) {
563  $this->throwDBException( $e );
564  }
565 
566  return true;
567  }
568 
573  protected function doWaitForBackups() {
574  if ( $this->server ) {
575  return; // not using LBFactory instance
576  }
577 
578  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
579  $lbFactory->waitForReplication( [
580  'domain' => $this->domain,
581  'cluster' => is_string( $this->cluster ) ? $this->cluster : false
582  ] );
583  }
584 
588  protected function doFlushCaches() {
589  foreach ( [ 'size', 'acquiredcount' ] as $type ) {
590  $this->cache->delete( $this->getCacheKey( $type ) );
591  }
592  }
593 
598  public function getAllQueuedJobs() {
599  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
600  }
601 
606  public function getAllAcquiredJobs() {
607  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
608  }
609 
614  protected function getJobIterator( array $conds ) {
615  $dbr = $this->getReplicaDB();
617  $scope = $this->getScopedNoTrxFlag( $dbr );
618  try {
619  return new MappedIterator(
620  $dbr->select( 'job', self::selectFields(), $conds ),
621  function ( $row ) {
622  $params = strlen( $row->job_params ) ? unserialize( $row->job_params ) : [];
623  $params = is_array( $params ) ? $params : []; // sanity
624  $params += [
625  'namespace' => $row->job_namespace,
626  'title' => $row->job_title
627  ];
628 
629  $job = $this->factoryJob( $row->job_cmd, $params );
630  $job->setMetadata( 'id', $row->job_id );
631  $job->setMetadata( 'timestamp', $row->job_timestamp );
632 
633  return $job;
634  }
635  );
636  } catch ( DBError $e ) {
637  $this->throwDBException( $e );
638  }
639  }
640 
641  public function getCoalesceLocationInternal() {
642  if ( $this->server ) {
643  return null; // not using the LBFactory instance
644  }
645 
646  return is_string( $this->cluster )
647  ? "DBCluster:{$this->cluster}:{$this->domain}"
648  : "LBFactory:{$this->domain}";
649  }
650 
651  protected function doGetSiblingQueuesWithJobs( array $types ) {
652  $dbr = $this->getReplicaDB();
654  $scope = $this->getScopedNoTrxFlag( $dbr );
655  // @note: this does not check whether the jobs are claimed or not.
656  // This is useful so JobQueueGroup::pop() also sees queues that only
657  // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
658  // failed jobs so that they can be popped again for that edge case.
659  $res = $dbr->select( 'job', 'DISTINCT job_cmd',
660  [ 'job_cmd' => $types ], __METHOD__ );
661 
662  $types = [];
663  foreach ( $res as $row ) {
664  $types[] = $row->job_cmd;
665  }
666 
667  return $types;
668  }
669 
670  protected function doGetSiblingQueueSizes( array $types ) {
671  $dbr = $this->getReplicaDB();
673  $scope = $this->getScopedNoTrxFlag( $dbr );
674 
675  $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
676  [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
677 
678  $sizes = [];
679  foreach ( $res as $row ) {
680  $sizes[$row->job_cmd] = (int)$row->count;
681  }
682 
683  return $sizes;
684  }
685 
	/**
	 * Recycle or destroy any jobs that have been claimed for too long.
	 *
	 * Jobs whose claim expired but still have retries left get their job_token
	 * reset (re-enqueued); jobs claimed longer than MAX_AGE_PRUNE that are out
	 * of retries are deleted. Guarded by a short-lived advisory lock so only
	 * one runner does this at a time.
	 *
	 * @return int Number of job rows recycled plus deleted
	 * @throws JobQueueError On database errors
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();
		// Run each query below in its own transaction
		$scope = $this->getScopedNoTrxFlag( $dbw );

		try {
			// 1 second lock timeout: if another runner holds it, just skip
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have be claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
					__METHOD__
				);
				$ids = array_map(
					function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to now that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
						[
							'job_id' => $ids ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map(
				function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}

			// NOTE(review): if a DBError is thrown above, this unlock is skipped;
			// presumably the advisory lock is released with the connection — confirm.
			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $count;
	}
772 
	/**
	 * Build a 'job' table row for the given job specification.
	 *
	 * @param IJobSpecification $job
	 * @param IDatabase $db Handle used for timestamp formatting
	 * @return array Map of job table column names to values
	 */
	protected function insertFields( IJobSpecification $job, IDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'job_title' => $job->getParams()['title'] ?? '',
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			// SHA-1 of the dedup info, re-encoded from base 16 to base 36 (31 chars)
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			// Uniformly random value used by claimRandom() for lock-free pops
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}
794 
799  protected function getReplicaDB() {
800  try {
801  return $this->getDB( DB_REPLICA );
802  } catch ( DBConnectionError $e ) {
803  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
804  }
805  }
806 
811  protected function getMasterDB() {
812  try {
813  return $this->getDB( DB_MASTER );
814  } catch ( DBConnectionError $e ) {
815  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
816  }
817  }
818 
	/**
	 * Get a DB handle for this queue, either a cached direct connection
	 * (when a 'server' config was given) or one from the load balancer.
	 *
	 * @param int $index DB_REPLICA or DB_MASTER
	 * @return IDatabase
	 * @throws DBError If a direct connection previously failed or fails now
	 */
	protected function getDB( $index ) {
		if ( $this->server ) {
			// Explicit server config: manage a direct connection ourselves
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn; // reuse the cached handle
			} elseif ( $this->conn instanceof DBError ) {
				// A previous connection attempt failed; re-throw the cached error
				throw $this->conn;
			}

			try {
				$this->conn = Database::factory( $this->server['type'], $this->server );
			} catch ( DBError $e ) {
				// Cache the error so later calls hit the re-throw branch above
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
				// Keep a separate connection to avoid contention and deadlocks;
				// However, SQLite has the opposite behavior due to DB-level locking.
				? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
				// Jobs insertion will be defered until the PRESEND stage to reduce contention.
				: $lb->getConnectionRef( $index, [], $this->domain );
		}
	}
853 
858  private function getScopedNoTrxFlag( IDatabase $db ) {
859  $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
860  $db->clearFlag( DBO_TRX ); // make each query its own transaction
861 
862  return new ScopedCallback( function () use ( $db, $autoTrx ) {
863  if ( $autoTrx ) {
864  $db->setFlag( DBO_TRX ); // restore old setting
865  }
866  } );
867  }
868 
873  private function getCacheKey( $property ) {
874  $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
875 
876  return $this->cache->makeGlobalKey(
877  'jobqueue',
878  $this->domain,
879  $cluster,
880  $this->type,
881  $property
882  );
883  }
884 
889  protected static function makeBlob( $params ) {
890  if ( $params !== false ) {
891  return serialize( $params );
892  } else {
893  return '';
894  }
895  }
896 
901  protected static function extractBlob( $blob ) {
902  if ( (string)$blob !== '' ) {
903  return unserialize( $blob );
904  } else {
905  return false;
906  }
907  }
908 
	/**
	 * Convert a low-level DBError into a queue-level JobQueueError.
	 *
	 * @param DBError $e
	 * @throws JobQueueError Always
	 */
	protected function throwDBException( DBError $e ) {
		throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}
916 
	/**
	 * Return the 'job' table fields needed to build a Job instance from a row.
	 *
	 * @return string[] Column names
	 */
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
937 }
The wiki should then use memcached to cache various data To use multiple just add more items to the array To increase the weight of a make its entry a array("192.168.0.1:11211", 2))
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:238
factoryJob( $command, $params)
Definition: JobQueue.php:694
do that in ParserLimitReportFormat instead use this to modify the parameters of the image all existing parser cache entries will be invalid To avoid you ll need to handle that somehow(e.g. with the RejectParserCacheValue hook) because MediaWiki won 't do it for you. & $defaults also a ContextSource after deleting those rows but within the same transaction $rows
Definition: hooks.txt:2633
insertFields(IJobSpecification $job, IDatabase $db)
Definition: JobQueueDB.php:778
$property
getCoalesceLocationInternal()
Definition: JobQueueDB.php:641
serialize()
getDB( $index)
Definition: JobQueueDB.php:823
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:61
getScopedNoTrxFlag(IDatabase $db)
Definition: JobQueueDB.php:858
set( $key, $value, $ttl=self::TTL_INDEFINITE, array $opts=[])
Set the value of a key in cache.
IDatabase DBError null $conn
Definition: JobQueueDB.php:44
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2159
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:651
incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:716
const NS_SPECIAL
Definition: Defines.php:53
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:37
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:35
BagOStuff $dupCache
Definition: JobQueue.php:48
const DB_MASTER
Definition: defines.php:26
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by wfTimestamp() to the format used for inserting ...
static makeBlob( $params)
Definition: JobQueueDB.php:889
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:691
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
throwDBException(DBError $e)
Definition: JobQueueDB.php:913
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:348
string $type
Job type.
Definition: JobQueue.php:35
static newEmpty()
Get an instance that wraps EmptyBagOStuff.
array null $server
Server configuration array.
Definition: JobQueueDB.php:47
getAllAcquiredJobs()
Definition: JobQueueDB.php:606
getMetadata( $field=null)
static extractBlob( $blob)
Definition: JobQueueDB.php:901
getRootJobCacheKey( $signature)
Definition: JobQueue.php:521
const MAX_OFFSET
Definition: JobQueueDB.php:39
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
$res
Definition: database.txt:21
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
getDeduplicationInfo()
Subclasses may need to override this to make duplication detection work.
get( $key, &$curTTL=null, array $checkKeys=[], &$info=null)
Fetch the value of a key from cache.
$params
unserialize( $serialized)
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:36
doGetAcquiredCount()
Definition: JobQueueDB.php:132
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:780
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack() ...
Definition: RunnableJob.php:35
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:38
getJobIterator(array $conds)
Definition: JobQueueDB.php:614
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:205
Convenience class for generating iterators from iterators.
supportedOrders()
Definition: JobQueueDB.php:73
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
if(defined( 'MW_SETUP_CALLBACK')) $fname
Customization point after all loading (constants, functions, classes, DefaultSettings, LocalSettings).
Definition: Setup.php:123
const DBO_TRX
Definition: defines.php:12
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:518
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
doGetAbandonedCount()
Definition: JobQueueDB.php:165
doWaitForBackups()
Definition: JobQueueDB.php:573
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:38
string null $cluster
Name of an external DB cluster or null for the local DB cluster.
Definition: JobQueueDB.php:49
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count( $args)< 1) $job
const ROOTJOB_TTL
Definition: JobQueue.php:52
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:670
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:922
Interface for serializable objects that describe a job queue task.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such and we might be restricted by PHP settings such as safe mode or open_basedir We cannot assume that the software even has read access anywhere useful Many shared hosts run all users web applications under the same so they can t rely on Unix and must forbid reads to even standard directories like tmp lest users read each others files We cannot assume that the user has the ability to install or run any programs not written as web accessible PHP scripts Since anything that works on cheap shared hosting will work if you have shell or root access MediaWiki s design is based around catering to the lowest common denominator Although we support higher end setups as the way many things work by default is tailored toward shared hosting These defaults are unconventional from the point of view of and they certainly aren t ideal for someone who s installing MediaWiki as MediaWiki does not conform to normal Unix filesystem layout Hopefully we ll offer direct support for standard layouts in the but for now *any change to the location of files is unsupported *Moving things and leaving symlinks will *probably *not break but it is *strongly *advised not to try any more intrusive changes to get MediaWiki to conform more closely to your filesystem hierarchy Any such attempt will almost certainly result in unnecessary bugs The standard recommended location to install relative to the web is it should be possible to enable the appropriate rewrite rules by if you can reconfigure the web server
doAck(RunnableJob $job)
Definition: JobQueueDB.php:489
const DB_REPLICA
Definition: defines.php:25
getCacheKey( $property)
Definition: JobQueueDB.php:873
WANObjectCache $cache
Definition: JobQueueDB.php:42
getAllQueuedJobs()
Definition: JobQueueDB.php:598
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
Database error base class.
Definition: DBError.php:30
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:427