MediaWiki  1.29.1
JobQueueDB.php
Go to the documentation of this file.
1 <?php
28 use Wikimedia\ScopedCallback;
29 
36 class JobQueueDB extends JobQueue {
37  const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
38  const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
39  const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
40  const MAX_OFFSET = 255; // integer; maximum number of rows to skip
41 
43  protected $cache;
44 
46  protected $cluster = false;
47 
56  protected function __construct( array $params ) {
57  parent::__construct( $params );
58 
59  $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
61  }
62 
63  protected function supportedOrders() {
64  return [ 'random', 'timestamp', 'fifo' ];
65  }
66 
67  protected function optimalOrder() {
68  return 'random';
69  }
70 
75  protected function doIsEmpty() {
76  $dbr = $this->getReplicaDB();
77  try {
78  $found = $dbr->selectField( // unclaimed job
79  'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
80  );
81  } catch ( DBError $e ) {
82  $this->throwDBException( $e );
83  }
84 
85  return !$found;
86  }
87 
92  protected function doGetSize() {
93  $key = $this->getCacheKey( 'size' );
94 
95  $size = $this->cache->get( $key );
96  if ( is_int( $size ) ) {
97  return $size;
98  }
99 
100  try {
101  $dbr = $this->getReplicaDB();
102  $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
103  [ 'job_cmd' => $this->type, 'job_token' => '' ],
104  __METHOD__
105  );
106  } catch ( DBError $e ) {
107  $this->throwDBException( $e );
108  }
109  $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
110 
111  return $size;
112  }
113 
118  protected function doGetAcquiredCount() {
119  if ( $this->claimTTL <= 0 ) {
120  return 0; // no acknowledgements
121  }
122 
123  $key = $this->getCacheKey( 'acquiredcount' );
124 
125  $count = $this->cache->get( $key );
126  if ( is_int( $count ) ) {
127  return $count;
128  }
129 
130  $dbr = $this->getReplicaDB();
131  try {
132  $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
133  [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
134  __METHOD__
135  );
136  } catch ( DBError $e ) {
137  $this->throwDBException( $e );
138  }
139  $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
140 
141  return $count;
142  }
143 
149  protected function doGetAbandonedCount() {
150  if ( $this->claimTTL <= 0 ) {
151  return 0; // no acknowledgements
152  }
153 
154  $key = $this->getCacheKey( 'abandonedcount' );
155 
156  $count = $this->cache->get( $key );
157  if ( is_int( $count ) ) {
158  return $count;
159  }
160 
161  $dbr = $this->getReplicaDB();
162  try {
163  $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
164  [
165  'job_cmd' => $this->type,
166  "job_token != {$dbr->addQuotes( '' )}",
167  "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
168  ],
169  __METHOD__
170  );
171  } catch ( DBError $e ) {
172  $this->throwDBException( $e );
173  }
174 
175  $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
176 
177  return $count;
178  }
179 
187  protected function doBatchPush( array $jobs, $flags ) {
189  new AutoCommitUpdate(
190  $this->getMasterDB(),
191  __METHOD__,
192  function ( IDatabase $dbw, $fname ) use ( $jobs, $flags ) {
193  $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
194  }
195  ),
197  );
198  }
199 
210  public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
211  if ( !count( $jobs ) ) {
212  return;
213  }
214 
215  $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
216  $rowList = []; // list of jobs for jobs that are not de-duplicated
217  foreach ( $jobs as $job ) {
218  $row = $this->insertFields( $job );
219  if ( $job->ignoreDuplicates() ) {
220  $rowSet[$row['job_sha1']] = $row;
221  } else {
222  $rowList[] = $row;
223  }
224  }
225 
226  if ( $flags & self::QOS_ATOMIC ) {
227  $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
228  }
229  try {
230  // Strip out any duplicate jobs that are already in the queue...
231  if ( count( $rowSet ) ) {
232  $res = $dbw->select( 'job', 'job_sha1',
233  [
234  // No job_type condition since it's part of the job_sha1 hash
235  'job_sha1' => array_keys( $rowSet ),
236  'job_token' => '' // unclaimed
237  ],
238  $method
239  );
240  foreach ( $res as $row ) {
241  wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
242  unset( $rowSet[$row->job_sha1] ); // already enqueued
243  }
244  }
245  // Build the full list of job rows to insert
246  $rows = array_merge( $rowList, array_values( $rowSet ) );
247  // Insert the job rows in chunks to avoid replica DB lag...
248  foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
249  $dbw->insert( 'job', $rowBatch, $method );
250  }
251  JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
252  JobQueue::incrStats( 'dupe_inserts', $this->type,
253  count( $rowSet ) + count( $rowList ) - count( $rows )
254  );
255  } catch ( DBError $e ) {
256  $this->throwDBException( $e );
257  }
258  if ( $flags & self::QOS_ATOMIC ) {
259  $dbw->endAtomic( $method );
260  }
261 
262  return;
263  }
264 
	/**
	 * @see JobQueue::doPop()
	 *
	 * Claims one job row on the master and turns it into a Job object.
	 * Runs outside of any transaction round (DBO_TRX is temporarily cleared)
	 * so each claim query commits on its own.
	 *
	 * @return Job|bool A popped Job, or false if none was available
	 * @throws JobQueueError On DB errors (via throwDBException)
	 */
	protected function doPop() {
		$dbw = $this->getMasterDB();
		try {
			$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
			// RAII guard: restores the DBO_TRX flag when this scope exits,
			// including on the exception path
			$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
				$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
			} );

			$uuid = wfRandomString( 32 ); // pop attempt
			$job = false; // job popped off
			do { // retry when our row is invalid or deleted as a duplicate
				// Try to reserve a row in the DB...
				if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
					$row = $this->claimOldest( $uuid );
				} else { // random first
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
					$row = $this->claimRandom( $uuid, $rand, $gte );
				}
				// Check if we found a row to reserve...
				if ( !$row ) {
					break; // nothing to do
				}
				JobQueue::incrStats( 'pops', $this->type );
				// Get the job object from the row...
				$title = Title::makeTitle( $row->job_namespace, $row->job_title );
				$job = Job::factory( $row->job_cmd, $title,
					self::extractBlob( $row->job_params ), $row->job_id );
				$job->metadata['id'] = $row->job_id;
				$job->metadata['timestamp'] = $row->job_timestamp;
				break; // done
			} while ( true );

			// Opportunistic maintenance: always when the queue looked empty,
			// otherwise on ~10% of pops
			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
				// Handled jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $job;
	}
314 
	/**
	 * Reserve a row using the job_random column (replication-safe claim).
	 *
	 * Strategy: for large queues, pick a row via a small random OFFSET; for
	 * small queues (<= MAX_OFFSET rows, tracked via a short-lived cache flag),
	 * pick via a job_random inequality, scanning both directions if needed.
	 * The claim itself is an UPDATE by primary key, re-checking job_token = ''
	 * so a concurrent claimer is detected via affectedRows().
	 *
	 * @param string $uuid 32 char hex string to mark the claim with
	 * @param int $rand Random unsigned integer (31 bits) used as the pivot
	 * @param bool $gte Search for job_random >= $rand (instead of <=)
	 * @return stdClass|bool Claimed row, or false if none could be reserved
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

		$row = false; // the row acquired
		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
					__METHOD__,
					[ 'ORDER BY' => "job_random {$dir}" ]
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					],
					__METHOD__,
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					// Remember this for 30s so other runners skip the OFFSET probe
					$this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
					continue; // use job_random
				}
			}

			if ( $row ) { // claim the job
				$dbw->update( 'job', // update by PK
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
					__METHOD__
				);
				// This might get raced out by another runner when claiming the previously
				// selected row. The use of job_random should minimize this problem, however.
				if ( !$dbw->affectedRows() ) {
					$row = false; // raced out
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
393 
	/**
	 * Reserve the oldest unclaimed row (FIFO/timestamp orders) with a single
	 * UPDATE, without holding row locks over round trips.
	 *
	 * MySQL gets a raw UPDATE ... ORDER BY job_id ASC LIMIT 1 (it cannot use a
	 * subquery on the table being updated); other engines use an UPDATE with a
	 * correlated subquery selecting the lowest job_id. The claimed row is then
	 * re-fetched by its token; a miss means duplicate-removal raced us.
	 *
	 * @param string $uuid 32 char hex string to mark the claim with
	 * @return stdClass|bool Claimed row, or false if none could be reserved
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Oracle and Postgre have no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid ) }, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$dbw->update( 'job',
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_id = (' .
						$dbw->selectSQLText( 'job', 'job_id',
							[ 'job_cmd' => $this->type, 'job_token' => '' ],
							__METHOD__,
							[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
						')'
					],
					__METHOD__
				);
			}
			// Fetch any row that we just reserved...
			if ( $dbw->affectedRows() ) {
				$row = $dbw->selectRow( 'job', self::selectFields(),
					[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
				);
				if ( !$row ) { // raced out by duplicate job removal
					wfDebug( "Row deleted as duplicate by another process.\n" );
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
454 
460  protected function doAck( Job $job ) {
461  if ( !isset( $job->metadata['id'] ) ) {
462  throw new MWException( "Job of type '{$job->getType()}' has no ID." );
463  }
464 
465  $dbw = $this->getMasterDB();
466  try {
467  $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
468  $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
469  $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
470  $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
471  } );
472 
473  // Delete a row with a single DELETE without holding row locks over RTTs...
474  $dbw->delete( 'job',
475  [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
476 
477  JobQueue::incrStats( 'acks', $this->type );
478  } catch ( DBError $e ) {
479  $this->throwDBException( $e );
480  }
481  }
482 
490  $params = $job->getParams();
491  if ( !isset( $params['rootJobSignature'] ) ) {
492  throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
493  } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
494  throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
495  }
496  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
497  // Callers should call batchInsert() and then this function so that if the insert
498  // fails, the de-duplication registration will be aborted. Since the insert is
499  // deferred till "transaction idle", do the same here, so that the ordering is
500  // maintained. Having only the de-duplication registration succeed would cause
501  // jobs to become no-ops without any actual jobs that made them redundant.
502  $dbw = $this->getMasterDB();
504  $dbw->onTransactionIdle(
505  function () use ( $cache, $params, $key, $dbw ) {
506  $timestamp = $cache->get( $key ); // current last timestamp of this job
507  if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
508  return true; // a newer version of this root job was enqueued
509  }
510 
511  // Update the timestamp of the last root job started at the location...
512  return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
513  },
514  __METHOD__
515  );
516 
517  return true;
518  }
519 
524  protected function doDelete() {
525  $dbw = $this->getMasterDB();
526  try {
527  $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
528  } catch ( DBError $e ) {
529  $this->throwDBException( $e );
530  }
531 
532  return true;
533  }
534 
539  protected function doWaitForBackups() {
540  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
541  $lbFactory->waitForReplication( [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ] );
542  }
543 
547  protected function doFlushCaches() {
548  foreach ( [ 'size', 'acquiredcount' ] as $type ) {
549  $this->cache->delete( $this->getCacheKey( $type ) );
550  }
551  }
552 
557  public function getAllQueuedJobs() {
558  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
559  }
560 
565  public function getAllAcquiredJobs() {
566  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
567  }
568 
573  protected function getJobIterator( array $conds ) {
574  $dbr = $this->getReplicaDB();
575  try {
576  return new MappedIterator(
577  $dbr->select( 'job', self::selectFields(), $conds ),
578  function ( $row ) {
579  $job = Job::factory(
580  $row->job_cmd,
581  Title::makeTitle( $row->job_namespace, $row->job_title ),
582  strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
583  );
584  $job->metadata['id'] = $row->job_id;
585  $job->metadata['timestamp'] = $row->job_timestamp;
586 
587  return $job;
588  }
589  );
590  } catch ( DBError $e ) {
591  $this->throwDBException( $e );
592  }
593  }
594 
595  public function getCoalesceLocationInternal() {
596  return $this->cluster
597  ? "DBCluster:{$this->cluster}:{$this->wiki}"
598  : "LBFactory:{$this->wiki}";
599  }
600 
601  protected function doGetSiblingQueuesWithJobs( array $types ) {
602  $dbr = $this->getReplicaDB();
603  // @note: this does not check whether the jobs are claimed or not.
604  // This is useful so JobQueueGroup::pop() also sees queues that only
605  // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
606  // failed jobs so that they can be popped again for that edge case.
607  $res = $dbr->select( 'job', 'DISTINCT job_cmd',
608  [ 'job_cmd' => $types ], __METHOD__ );
609 
610  $types = [];
611  foreach ( $res as $row ) {
612  $types[] = $row->job_cmd;
613  }
614 
615  return $types;
616  }
617 
618  protected function doGetSiblingQueueSizes( array $types ) {
619  $dbr = $this->getReplicaDB();
620  $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
621  [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
622 
623  $sizes = [];
624  foreach ( $res as $row ) {
625  $sizes[$row->job_cmd] = (int)$row->count;
626  }
627 
628  return $sizes;
629  }
630 
	/**
	 * Recycle or destroy any jobs that have been claimed for too long.
	 *
	 * Two phases, serialized across runners by a named DB lock:
	 *   1) If claims are enabled (claimTTL > 0), release stale claims that still
	 *      have retries left by blanking job_token (recycling them).
	 *   2) Delete rows whose claim is older than MAX_AGE_PRUNE (and, when claims
	 *      are enabled, that have exhausted maxTries attempts).
	 * Both phases SELECT the affected IDs first so the write is by primary key.
	 *
	 * @return int Number of rows recycled plus rows deleted
	 * @throws JobQueueError On DB errors (via throwDBException)
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();

		try {
			// Named lock (1s timeout) so only one runner prunes at a time
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have be claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
					__METHOD__
				);
				$ids = array_map(
					function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the
					// job was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
						[
							'job_id' => $ids ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					JobQueue::incrStats( 'recycles', $this->type, $affected );
					// Tell the aggregator these queues have runnable jobs again
					$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map(
				function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				JobQueue::incrStats( 'abandons', $this->type, $affected );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $count;
	}
716 
721  protected function insertFields( IJobSpecification $job ) {
722  $dbw = $this->getMasterDB();
723 
724  return [
725  // Fields that describe the nature of the job
726  'job_cmd' => $job->getType(),
727  'job_namespace' => $job->getTitle()->getNamespace(),
728  'job_title' => $job->getTitle()->getDBkey(),
729  'job_params' => self::makeBlob( $job->getParams() ),
730  // Additional job metadata
731  'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
732  'job_timestamp' => $dbw->timestamp(),
733  'job_sha1' => Wikimedia\base_convert(
734  sha1( serialize( $job->getDeduplicationInfo() ) ),
735  16, 36, 31
736  ),
737  'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
738  ];
739  }
740 
745  protected function getReplicaDB() {
746  try {
747  return $this->getDB( DB_REPLICA );
748  } catch ( DBConnectionError $e ) {
749  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
750  }
751  }
752 
757  protected function getMasterDB() {
758  try {
759  return $this->getDB( DB_MASTER );
760  } catch ( DBConnectionError $e ) {
761  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
762  }
763  }
764 
769  protected function getDB( $index ) {
770  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
771  $lb = ( $this->cluster !== false )
772  ? $lbFactory->getExternalLB( $this->cluster, $this->wiki )
773  : $lbFactory->getMainLB( $this->wiki );
774 
775  return $lb->getConnectionRef( $index, [], $this->wiki );
776  }
777 
782  private function getCacheKey( $property ) {
783  list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
784  $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
785 
786  return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
787  }
788 
793  protected static function makeBlob( $params ) {
794  if ( $params !== false ) {
795  return serialize( $params );
796  } else {
797  return '';
798  }
799  }
800 
805  protected static function extractBlob( $blob ) {
806  if ( (string)$blob !== '' ) {
807  return unserialize( $blob );
808  } else {
809  return false;
810  }
811  }
812 
817  protected function throwDBException( DBError $e ) {
818  throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
819  }
820 
826  public static function selectFields() {
827  return [
828  'job_id',
829  'job_cmd',
830  'job_namespace',
831  'job_title',
832  'job_timestamp',
833  'job_params',
834  'job_random',
835  'job_attempts',
836  'job_token',
837  'job_token_timestamp',
838  'job_sha1',
839  ];
840  }
841 }
JobQueueDB\MAX_AGE_PRUNE
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:38
JobQueueDB\doBatchPushInternal
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:210
MappedIterator
Convenience class for generating iterators from iterators.
Definition: MappedIterator.php:29
JobQueueDB\doWaitForBackups
doWaitForBackups()
Definition: JobQueueDB.php:539
JobQueueDB\doFlushCaches
doFlushCaches()
Definition: JobQueueDB.php:547
JobQueueDB\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:601
JobQueueDB\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueueDB.php:595
JobQueueDB\getAllAcquiredJobs
getAllAcquiredJobs()
Definition: JobQueueDB.php:565
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
JobQueue\incrStats
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:710
JobQueueDB\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:618
JobQueueDB\throwDBException
throwDBException(DBError $e)
Definition: JobQueueDB.php:817
AutoCommitUpdate
Deferrable Update for closure/callback updates that should use auto-commit mode.
Definition: AutoCommitUpdate.php:9
captcha-old.count
count
Definition: captcha-old.py:225
JobQueueDB\doGetSize
doGetSize()
Definition: JobQueueDB.php:92
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
Wikimedia\Rdbms\IDatabase\endAtomic
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
use
as see the revision history and available at free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to use
Definition: MIT-LICENSE.txt:10
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
Definition: DeferredUpdates.php:76
$fname
if(!defined( 'MEDIAWIKI')) $fname
This file is not a valid entry point, perform no further processing unless MEDIAWIKI is defined.
Definition: Setup.php:36
unserialize
unserialize( $serialized)
Definition: ApiMessage.php:185
JobQueueDB\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
Definition: JobQueueDB.php:67
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:53
JobQueueDB\getDB
getDB( $index)
Definition: JobQueueDB.php:769
$params
$params
Definition: styleTest.css.php:40
JobQueueDB\__construct
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:56
WANObjectCache\set
set( $key, $value, $ttl=0, array $opts=[])
Set the value of a key in cache.
Definition: WANObjectCache.php:433
serialize
serialize()
Definition: ApiMessage.php:177
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:3027
JobQueueDB\getMasterDB
getMasterDB()
Definition: JobQueueDB.php:757
JobQueueDB\insertFields
insertFields(IJobSpecification $job)
Definition: JobQueueDB.php:721
$res
$res
Definition: database.txt:21
JobQueueDB
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:36
$lbFactory
$lbFactory
Definition: doMaintenance.php:117
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueDB\CACHE_TTL_SHORT
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:37
Wikimedia\Rdbms\DBError
Database error base class.
Definition: DBError.php:30
Wikimedia\Rdbms\IDatabase\insert
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
DBO_TRX
const DBO_TRX
Definition: defines.php:12
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
Wikimedia\Rdbms\IDatabase
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:40
JobQueueDB\getReplicaDB
getReplicaDB()
Definition: JobQueueDB.php:745
JobQueueDB\getJobIterator
getJobIterator(array $conds)
Definition: JobQueueDB.php:573
JobQueueDB\makeBlob
static makeBlob( $params)
Definition: JobQueueDB.php:793
JobQueueDB\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
Definition: JobQueueDB.php:63
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
IJobSpecification\getType
getType()
Job\factory
static factory( $command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:68
MWException
MediaWiki exception.
Definition: MWException.php:26
$title
namespace and then decline to actually register it file or subcat img or subcat $title
Definition: hooks.txt:934
$property
$property
Definition: styleTest.css.php:44
JobQueueDB\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:489
$blob
$blob
Definition: testCompression.php:63
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:36
JobQueueDB\recycleAndDeleteStaleJobs
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:636
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:47
JobQueueDB\MAX_OFFSET
const MAX_OFFSET
Definition: JobQueueDB.php:40
JobQueueDB\doBatchPush
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:187
Title\makeTitle
static makeTitle( $ns, $title, $fragment='', $interwiki='')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:514
DB_REPLICA
const DB_REPLICA
Definition: defines.php:25
JobQueueDB\doGetAcquiredCount
doGetAcquiredCount()
Definition: JobQueueDB.php:118
JobQueueError
Definition: JobQueue.php:724
DB_MASTER
const DB_MASTER
Definition: defines.php:26
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
Definition: GlobalFunctions.php:2978
wfDebug
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
Definition: GlobalFunctions.php:999
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
$dir
$dir
Definition: Autoload.php:8
JobQueueDB\extractBlob
static extractBlob( $blob)
Definition: JobQueueDB.php:805
JobQueueDB\doIsEmpty
doIsEmpty()
Definition: JobQueueDB.php:75
WANObjectCache\get
get( $key, &$curTTL=null, array $checkKeys=[], &$asOf=null)
Fetch the value of a key from cache.
Definition: WANObjectCache.php:248
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2122
JobQueueDB\MAX_JOB_RANDOM
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:39
JobQueueDB\doDelete
doDelete()
Definition: JobQueueDB.php:524
JobQueueDB\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueueDB.php:149
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:81
JobQueueDB\getAllQueuedJobs
getAllQueuedJobs()
Definition: JobQueueDB.php:557
JobQueueDB\selectFields
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:826
JobQueueConnectionError
Definition: JobQueue.php:727
JobQueueDB\claimOldest
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:400
JobQueueDB\doAck
doAck(Job $job)
Definition: JobQueueDB.php:460
Wikimedia\Rdbms\DBConnRef
Helper class to handle automatically marking connections as reusable (via RAII pattern) as well handl...
Definition: DBConnRef.php:15
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:529
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
$dbr
if(! $regexes) $dbr
Definition: cleanup.php:94
ObjectCache\getMainWANInstance
static getMainWANInstance()
Get the main WAN cache object.
Definition: ObjectCache.php:370
JobQueueDB\getCacheKey
getCacheKey( $property)
Definition: JobQueueDB.php:782
DeferredUpdates\PRESEND
const PRESEND
Definition: DeferredUpdates.php:60
JobQueueDB\$cache
WANObjectCache $cache
Definition: JobQueueDB.php:43
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:32
Wikimedia\Rdbms\DBConnectionError
Definition: DBConnectionError.php:26
Wikimedia\Rdbms\IDatabase\select
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
JobQueueDB\doPop
doPop()
Definition: JobQueueDB.php:269
order
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2749
JobQueueDB\$cluster
bool string $cluster
Name of an external DB cluster.
Definition: JobQueueDB.php:46
JobQueue\getType
getType()
Definition: JobQueue.php:132
array
the array() calling protocol came about after MediaWiki 1.4rc1.
Wikimedia\Rdbms\IDatabase\startAtomic
startAtomic( $fname=__METHOD__)
Begin an atomic section of statements.
wfRandomString
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Definition: GlobalFunctions.php:336
JobQueueDB\claimRandom
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:323