MediaWiki  1.28.0
JobQueueDB.php
Go to the documentation of this file.
1 <?php
25 
32 class JobQueueDB extends JobQueue {
33  const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
34  const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
35  const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
36  const MAX_OFFSET = 255; // integer; maximum number of rows to skip
37 
39  protected $cache;
40 
42  protected $cluster = false;
43 
 // Additional parameters: 'cluster' (string|false) — name of an external DB
 // cluster to use instead of the wiki's main load balancer (see getDB()).
 52  protected function __construct( array $params ) {
 53  parent::__construct( $params );
 54 
 // Default to the main cluster when no external cluster is configured.
 55  $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
 // NOTE(review): the embedded numbering jumps 55 -> 57 here; the statement
 // initializing $this->cache (a WANObjectCache per the $cache property at
 // line 39) appears to have been dropped by extraction — confirm against
 // the original file before relying on this constructor as shown.
 57  }
58 
/**
 * List the dequeue orderings this backend can honor.
 *
 * @see JobQueue::supportedOrders()
 * @return string[] Supported order names
 */
protected function supportedOrders() {
	return [ 'random', 'timestamp', 'fifo' ];
}
62 
/**
 * The preferred dequeue ordering for the DB backend.
 *
 * @see JobQueue::optimalOrder()
 * @return string Always 'random'
 */
protected function optimalOrder() {
	return 'random';
}
66 
/**
 * Check whether there are any unclaimed jobs of this type.
 *
 * @see JobQueue::doIsEmpty()
 * @return bool True when no unclaimed (empty job_token) row exists
 * @throws JobQueueError On database failure
 */
protected function doIsEmpty() {
	$dbr = $this->getSlaveDB();
	try {
		// An unclaimed job is one whose token has not been set by a runner
		$conds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
		$found = $dbr->selectField( 'job', '1', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return !$found;
}
83 
/**
 * Count the unclaimed jobs of this type, using a short-lived cache.
 *
 * @see JobQueue::doGetSize()
 * @return int Number of unclaimed rows (cached for CACHE_TTL_SHORT seconds)
 * @throws JobQueueError On database failure
 */
protected function doGetSize() {
	$key = $this->getCacheKey( 'size' );

	$cached = $this->cache->get( $key );
	if ( is_int( $cached ) ) {
		return $cached; // cache hit
	}

	$dbr = $this->getSlaveDB();
	try {
		$conds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
		$size = (int)$dbr->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );

	return $size;
}
109 
/**
 * Count jobs of this type that are currently claimed by a runner.
 *
 * Returns 0 immediately when acknowledgements are disabled (claimTTL <= 0).
 *
 * @see JobQueue::doGetAcquiredCount()
 * @return int Number of claimed rows (cached for CACHE_TTL_SHORT seconds)
 * @throws JobQueueError On database failure
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$key = $this->getCacheKey( 'acquiredcount' );

	$cached = $this->cache->get( $key );
	if ( is_int( $cached ) ) {
		return $cached; // cache hit
	}

	$dbr = $this->getSlaveDB();
	try {
		// Claimed rows are those with a non-empty job_token
		$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
			[ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
			__METHOD__
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

	return $count;
}
139 
/**
 * Count claimed jobs of this type that exhausted their retry attempts.
 *
 * Returns 0 immediately when acknowledgements are disabled (claimTTL <= 0).
 *
 * @see JobQueue::doGetAbandonedCount()
 * @return int Number of abandoned rows (cached for CACHE_TTL_SHORT seconds)
 * @throws JobQueueError On database failure
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$key = $this->getCacheKey( 'abandonedcount' );

	$cached = $this->cache->get( $key );
	if ( is_int( $cached ) ) {
		return $cached; // cache hit
	}

	$dbr = $this->getSlaveDB();
	try {
		// Abandoned = claimed (non-empty token) and out of retries
		$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
			[
				'job_cmd' => $this->type,
				"job_token != {$dbr->addQuotes( '' )}",
				"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
			],
			__METHOD__
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

	return $count;
}
175 
/**
 * Queue a batch of jobs for insertion once the current transaction is idle.
 *
 * The actual row writes are deferred to doBatchPushInternal() via
 * onTransactionIdle() so they do not run inside a caller's transaction.
 *
 * @see JobQueue::doBatchPush()
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield (supports self::QOS_ATOMIC)
 */
protected function doBatchPush( array $jobs, $flags ) {
	$dbw = $this->getMasterDB();

	// Capture the caller name now; __METHOD__ inside the closure would differ
	$fname = __METHOD__;
	$callback = function () use ( $dbw, $jobs, $flags, $fname ) {
		$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
	};
	$dbw->onTransactionIdle( $callback, __METHOD__ );
}
194 
 // Insert a batch of job rows, skipping any de-duplicated jobs whose hash
 // already exists unclaimed in the queue. Should not be called outside of
 // JobQueueDB (public only so the onTransactionIdle closure can reach it).
 //
 // @param IDatabase $dbw Master connection the rows are written to
 // @param array $jobs Job specifications; each must support ignoreDuplicates()
 //   and the accessors used by insertFields()
 // @param int $flags Bitfield; QOS_ATOMIC wraps all writes in one atomic section
 // @param string $method Caller name used for query attribution
205  public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
206  if ( !count( $jobs ) ) {
207  return;
208  }
209 
210  $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
211  $rowList = []; // list of jobs for jobs that are not de-duplicated
212  foreach ( $jobs as $job ) {
213  $row = $this->insertFields( $job );
214  if ( $job->ignoreDuplicates() ) {
 // Keying by content hash collapses identical jobs within this batch too
215  $rowSet[$row['job_sha1']] = $row;
216  } else {
217  $rowList[] = $row;
218  }
219  }
220 
221  if ( $flags & self::QOS_ATOMIC ) {
222  $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
223  }
224  try {
225  // Strip out any duplicate jobs that are already in the queue...
226  if ( count( $rowSet ) ) {
227  $res = $dbw->select( 'job', 'job_sha1',
228  [
229  // No job_type condition since it's part of the job_sha1 hash
230  'job_sha1' => array_keys( $rowSet ),
231  'job_token' => '' // unclaimed
232  ],
233  $method
234  );
235  foreach ( $res as $row ) {
236  wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
237  unset( $rowSet[$row->job_sha1] ); // already enqueued
238  }
239  }
240  // Build the full list of job rows to insert
241  $rows = array_merge( $rowList, array_values( $rowSet ) );
242  // Insert the job rows in chunks to avoid replica DB lag...
243  foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
244  $dbw->insert( 'job', $rowBatch, $method );
245  }
 // dupe_inserts = jobs submitted minus rows actually written (batch-internal
 // hash collisions plus rows already present in the queue)
246  JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
247  JobQueue::incrStats( 'dupe_inserts', $this->type,
248  count( $rowSet ) + count( $rowList ) - count( $rows )
249  );
250  } catch ( DBError $e ) {
251  $this->throwDBException( $e );
252  }
253  if ( $flags & self::QOS_ATOMIC ) {
254  $dbw->endAtomic( $method );
255  }
256 
257  return;
258  }
259 
 // Pop a job off the queue by atomically claiming a row on the master.
 //
 // DBO_TRX is temporarily cleared so each claim query commits on its own
 // (a long-lived claim inside a wrapping transaction would hold locks and
 // not be visible to other runners); a ScopedCallback restores the flag.
 //
 // @return Job|bool New job object, or false if the queue is empty
 // @throws JobQueueError On database failure
264  protected function doPop() {
265  $dbw = $this->getMasterDB();
266  try {
267  $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
268  $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
269  $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
270  $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
271  } );
272 
273  $uuid = wfRandomString( 32 ); // pop attempt
274  $job = false; // job popped off
275  do { // retry when our row is invalid or deleted as a duplicate
276  // Try to reserve a row in the DB...
277  if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
278  $row = $this->claimOldest( $uuid );
279  } else { // random first
280  $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
281  $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
282  $row = $this->claimRandom( $uuid, $rand, $gte );
283  }
284  // Check if we found a row to reserve...
285  if ( !$row ) {
286  break; // nothing to do
287  }
288  JobQueue::incrStats( 'pops', $this->type );
289  // Get the job object from the row...
290  $title = Title::makeTitle( $row->job_namespace, $row->job_title );
291  $job = Job::factory( $row->job_cmd, $title,
292  self::extractBlob( $row->job_params ), $row->job_id );
293  $job->metadata['id'] = $row->job_id;
294  $job->metadata['timestamp'] = $row->job_timestamp;
295  break; // done
296  } while ( true );
297 
 // Probabilistic housekeeping: always run when the queue looked empty,
 // otherwise on ~10% of pops, so stale claims get recycled eventually
298  if ( !$job || mt_rand( 0, 9 ) == 0 ) {
299  // Handled jobs that need to be recycled/deleted;
300  // any recycled jobs will be picked up next attempt
301  $this->recycleAndDeleteStaleJobs();
302  }
303  } catch ( DBError $e ) {
304  $this->throwDBException( $e );
305  }
306 
307  return $job;
308  }
309 
 // Reserve a random unclaimed row via SELECT-then-UPDATE-by-PK (no row locks
 // held over round trips; replication safe, unlike UPDATE ... LIMIT).
 //
 // Two strategies: for small queues (<= MAX_OFFSET rows, tracked via the
 // 'small' cache key) pick by job_random inequality, scanning both
 // directions if needed; otherwise select with a random OFFSET.
 //
 // @param string $uuid Claim token for this pop attempt
 // @param int $rand Random job_random pivot in [0, MAX_JOB_RANDOM]
 // @param bool $gte Whether to scan job_random >= $rand (else <=)
 // @return stdClass|bool Claimed row, or false if none could be claimed
318  protected function claimRandom( $uuid, $rand, $gte ) {
319  $dbw = $this->getMasterDB();
320  // Check cache to see if the queue has <= OFFSET items
321  $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
322 
323  $row = false; // the row acquired
324  $invertedDirection = false; // whether one job_random direction was already scanned
325  // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
326  // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
327  // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
328  // be used here with MySQL.
329  do {
330  if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
331  // For small queues, using OFFSET will overshoot and return no rows more often.
332  // Instead, this uses job_random to pick a row (possibly checking both directions).
333  $ineq = $gte ? '>=' : '<=';
334  $dir = $gte ? 'ASC' : 'DESC';
335  $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
336  [
337  'job_cmd' => $this->type,
338  'job_token' => '', // unclaimed
339  "job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
340  __METHOD__,
341  [ 'ORDER BY' => "job_random {$dir}" ]
342  );
343  if ( !$row && !$invertedDirection ) {
344  $gte = !$gte;
345  $invertedDirection = true;
346  continue; // try the other direction
347  }
348  } else { // table *may* have >= MAX_OFFSET rows
349  // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
350  // in MySQL if there are many rows for some reason. This uses a small OFFSET
351  // instead of job_random for reducing excess claim retries.
352  $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
353  [
354  'job_cmd' => $this->type,
355  'job_token' => '', // unclaimed
356  ],
357  __METHOD__,
358  [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
359  );
360  if ( !$row ) {
361  $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
 // Remember the small-queue state for 30s so later pops skip the OFFSET probe
362  $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
363  continue; // use job_random
364  }
365  }
366 
367  if ( $row ) { // claim the job
 // The job_token => '' condition makes this claim atomic: it only
 // succeeds if no other runner claimed the row since the SELECT
368  $dbw->update( 'job', // update by PK
369  [
370  'job_token' => $uuid,
371  'job_token_timestamp' => $dbw->timestamp(),
372  'job_attempts = job_attempts+1' ],
373  [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
374  __METHOD__
375  );
376  // This might get raced out by another runner when claiming the previously
377  // selected row. The use of job_random should minimize this problem, however.
378  if ( !$dbw->affectedRows() ) {
379  $row = false; // raced out
380  }
381  } else {
382  break; // nothing to do
383  }
384  } while ( !$row );
385 
386  return $row;
387  }
388 
 // Reserve the oldest unclaimed row (lowest job_id) with a single UPDATE,
 // then fetch it back by the claim token. Used for 'fifo'/'timestamp' order.
 //
 // @param string $uuid Claim token for this pop attempt
 // @return stdClass|bool Claimed row, or false if no unclaimed row remains
395  protected function claimOldest( $uuid ) {
396  $dbw = $this->getMasterDB();
397 
398  $row = false; // the row acquired
399  do {
400  if ( $dbw->getType() === 'mysql' ) {
401  // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
402  // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
403  // Oracle and Postgre have no such limitation. However, MySQL offers an
404  // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
405  $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
406  "SET " .
407  "job_token = {$dbw->addQuotes( $uuid ) }, " .
408  "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
409  "job_attempts = job_attempts+1 " .
410  "WHERE ( " .
411  "job_cmd = {$dbw->addQuotes( $this->type )} " .
412  "AND job_token = {$dbw->addQuotes( '' )} " .
413  ") ORDER BY job_id ASC LIMIT 1",
414  __METHOD__
415  );
416  } else {
417  // Use a subquery to find the job, within an UPDATE to claim it.
418  // This uses as much of the DB wrapper functions as possible.
419  $dbw->update( 'job',
420  [
421  'job_token' => $uuid,
422  'job_token_timestamp' => $dbw->timestamp(),
423  'job_attempts = job_attempts+1' ],
424  [ 'job_id = (' .
425  $dbw->selectSQLText( 'job', 'job_id',
426  [ 'job_cmd' => $this->type, 'job_token' => '' ],
427  __METHOD__,
428  [ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
429  ')'
430  ],
431  __METHOD__
432  );
433  }
434  // Fetch any row that we just reserved...
435  if ( $dbw->affectedRows() ) {
436  $row = $dbw->selectRow( 'job', self::selectFields(),
437  [ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
438  );
439  if ( !$row ) { // raced out by duplicate job removal
 // Loop again to claim another row rather than giving up
440  wfDebug( "Row deleted as duplicate by another process.\n" );
441  }
442  } else {
443  break; // nothing to do
444  }
445  } while ( !$row );
446 
447  return $row;
448  }
449 
 // Acknowledge (delete) a completed job by its row ID.
 //
 // DBO_TRX is temporarily cleared so the DELETE commits immediately instead
 // of holding row locks inside a wrapping transaction; a ScopedCallback
 // restores the previous flag state when this method returns or throws.
 //
 // @param Job $job Job whose metadata['id'] was set by doPop()
 // @throws MWException If the job carries no row ID
 // @throws JobQueueError On database failure
455  protected function doAck( Job $job ) {
456  if ( !isset( $job->metadata['id'] ) ) {
457  throw new MWException( "Job of type '{$job->getType()}' has no ID." );
458  }
459 
460  $dbw = $this->getMasterDB();
461  try {
462  $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
463  $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
464  $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
465  $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
466  } );
467 
468  // Delete a row with a single DELETE without holding row locks over RTTs...
469  $dbw->delete( 'job',
470  [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
471 
472  JobQueue::incrStats( 'acks', $this->type );
473  } catch ( DBError $e ) {
474  $this->throwDBException( $e );
475  }
476  }
477 
 // NOTE(review): this span is the body of doDeduplicateRootJob(
 // IJobSpecification $job ) per the doxygen index (definition line 484), but
 // the method signature line and the assignment defining $cache (embedded
 // numbering jumps 497 -> 499; presumably $cache = $this->dupCache, a
 // BagOStuff per JobQueue.php line 46) were dropped by extraction — confirm
 // against the original file.
485  $params = $job->getParams();
486  if ( !isset( $params['rootJobSignature'] ) ) {
487  throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
488  } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
489  throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
490  }
491  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
492  // Callers should call batchInsert() and then this function so that if the insert
493  // fails, the de-duplication registration will be aborted. Since the insert is
494  // deferred till "transaction idle", do the same here, so that the ordering is
495  // maintained. Having only the de-duplication registration succeed would cause
496  // jobs to become no-ops without any actual jobs that made them redundant.
497  $dbw = $this->getMasterDB();
499  $dbw->onTransactionIdle(
500  function () use ( $cache, $params, $key, $dbw ) {
501  $timestamp = $cache->get( $key ); // current last timestamp of this job
502  if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
503  return true; // a newer version of this root job was enqueued
504  }
505 
506  // Update the timestamp of the last root job started at the location...
507  return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
508  },
509  __METHOD__
510  );
511 
 // Unconditionally true: the registration itself is deferred above
512  return true;
513  }
514 
/**
 * Delete all jobs of this type from the queue.
 *
 * @see JobQueue::doDelete()
 * @return bool Always true
 * @throws JobQueueError On database failure
 */
protected function doDelete() {
	$dbw = $this->getMasterDB();
	try {
		// Pass __METHOD__ as the query's caller name for log/profiling
		// attribution, consistent with every other query in this class.
		$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return true;
}
529 
/**
 * Block until the replica DBs of this queue's cluster catch up to the master.
 *
 * @see JobQueue::doWaitForBackups()
 */
protected function doWaitForBackups() {
	$services = MediaWikiServices::getInstance();
	$services->getDBLoadBalancerFactory()->waitForReplication(
		[ 'wiki' => $this->wiki, 'cluster' => $this->cluster ]
	);
}
538 
/**
 * Purge the cached queue statistics for this queue.
 *
 * Includes 'abandonedcount': doGetAbandonedCount() caches under that key as
 * well, so omitting it here would let a stale abandoned-job count survive a
 * cache flush (the original list only covered 'size' and 'acquiredcount').
 *
 * @see JobQueue::doFlushCaches()
 */
protected function doFlushCaches() {
	foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
		$this->cache->delete( $this->getCacheKey( $type ) );
	}
}
547 
/**
 * Iterate over all unclaimed jobs of this type.
 *
 * @see JobQueue::getAllQueuedJobs()
 * @return MappedIterator Iterator of Job objects
 */
public function getAllQueuedJobs() {
	$conds = [ 'job_cmd' => $this->getType(), 'job_token' => '' ];

	return $this->getJobIterator( $conds );
}
555 
/**
 * Iterate over all claimed (non-empty token) jobs of this type.
 *
 * @see JobQueue::getAllAcquiredJobs()
 * @return MappedIterator Iterator of Job objects
 */
public function getAllAcquiredJobs() {
	$conds = [ 'job_cmd' => $this->getType(), "job_token > ''" ];

	return $this->getJobIterator( $conds );
}
563 
/**
 * Build a lazy iterator of Job objects matching the given conditions.
 *
 * @param array $conds Query conditions for the job table
 * @return MappedIterator Iterator mapping each row to a Job (with
 *   metadata['id'] and metadata['timestamp'] populated)
 * @throws JobQueueError On database failure
 */
protected function getJobIterator( array $conds ) {
	$dbr = $this->getSlaveDB();
	try {
		return new MappedIterator(
			// Pass __METHOD__ for query attribution, consistent with the
			// other queries in this class (it was omitted here originally).
			$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
			function ( $row ) {
				$job = Job::factory(
					$row->job_cmd,
					Title::makeTitle( $row->job_namespace, $row->job_title ),
					strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
				);
				$job->metadata['id'] = $row->job_id;
				$job->metadata['timestamp'] = $row->job_timestamp;

				return $job;
			}
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
589 
/**
 * Identify the storage location of this queue for coalescing purposes.
 *
 * @return string "DBCluster:<cluster>:<wiki>" for external clusters,
 *   otherwise "LBFactory:<wiki>"
 */
public function getCoalesceLocationInternal() {
	if ( $this->cluster ) {
		return "DBCluster:{$this->cluster}:{$this->wiki}";
	}

	return "LBFactory:{$this->wiki}";
}
595 
/**
 * Of the given job types, list those that have any rows in the job table.
 *
 * @note This does not check whether the jobs are claimed or not. This is
 *   useful so JobQueueGroup::pop() also sees queues that only have stale
 *   jobs, letting recycleAndDeleteStaleJobs() re-enqueue failed jobs so
 *   they can be popped again for that edge case.
 *
 * @param string[] $types Job types to check
 * @return string[] Job types that have at least one row
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$dbr = $this->getSlaveDB();
	$res = $dbr->select( 'job', 'DISTINCT job_cmd',
		[ 'job_cmd' => $types ], __METHOD__ );

	$nonEmpty = [];
	foreach ( $res as $row ) {
		$nonEmpty[] = $row->job_cmd;
	}

	return $nonEmpty;
}
612 
/**
 * Count the job table rows for each of the given job types.
 *
 * @param string[] $types Job types to count
 * @return int[] Map of (job type => row count); types with no rows are omitted
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$dbr = $this->getSlaveDB();
	$res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
		[ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );

	$sizeByType = [];
	foreach ( $res as $row ) {
		$sizeByType[$row->job_cmd] = (int)$row->count;
	}

	return $sizeByType;
}
625 
 // Recycle or destroy any jobs that have been claimed for too long.
 //
 // Guarded by a named DB lock so only one runner performs housekeeping per
 // queue at a time. Two phases: (1) if acknowledgements are enabled, release
 // the claims of stale-but-retryable jobs by blanking job_token; (2) delete
 // jobs claimed longer than MAX_AGE_PRUNE (and, when claimTTL > 0, only
 // those also out of retries). Both phases select IDs first so the write
 // can be done by primary key (fewer deadlocks).
 //
 // @return int Number of rows recycled plus rows deleted
 // @throws JobQueueError On database failure
631  public function recycleAndDeleteStaleJobs() {
632  $now = time();
633  $count = 0; // affected rows
634  $dbw = $this->getMasterDB();
635 
636  try {
 // Non-blocking (1s timeout) named lock; bail out if another runner holds it
637  if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
638  return $count; // already in progress
639  }
640 
641  // Remove claims on jobs acquired for too long if enabled...
642  if ( $this->claimTTL > 0 ) {
643  $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
644  // Get the IDs of jobs that have be claimed but not finished after too long.
645  // These jobs can be recycled into the queue by expiring the claim. Selecting
646  // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
647  $res = $dbw->select( 'job', 'job_id',
648  [
649  'job_cmd' => $this->type,
650  "job_token != {$dbw->addQuotes( '' )}", // was acquired
651  "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
652  "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
653  __METHOD__
654  );
655  $ids = array_map(
656  function ( $o ) {
657  return $o->job_id;
658  }, iterator_to_array( $res )
659  );
660  if ( count( $ids ) ) {
661  // Reset job_token for these jobs so that other runners will pick them up.
662  // Set the timestamp to the current time, as it is useful to now that the job
663  // was already tried before (the timestamp becomes the "released" time).
664  $dbw->update( 'job',
665  [
666  'job_token' => '',
667  'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
668  [
669  'job_id' => $ids ],
670  __METHOD__
671  );
672  $affected = $dbw->affectedRows();
673  $count += $affected;
674  JobQueue::incrStats( 'recycles', $this->type, $affected );
 // Recycled jobs make the queue non-empty again; notify the aggregator
675  $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
676  }
677  }
678 
679  // Just destroy any stale jobs...
680  $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
681  $conds = [
682  'job_cmd' => $this->type,
683  "job_token != {$dbw->addQuotes( '' )}", // was acquired
684  "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
685  ];
686  if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
687  $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
688  }
689  // Get the IDs of jobs that are considered stale and should be removed. Selecting
690  // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
691  $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
692  $ids = array_map(
693  function ( $o ) {
694  return $o->job_id;
695  }, iterator_to_array( $res )
696  );
697  if ( count( $ids ) ) {
698  $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
699  $affected = $dbw->affectedRows();
700  $count += $affected;
701  JobQueue::incrStats( 'abandons', $this->type, $affected );
702  }
703 
704  $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
705  } catch ( DBError $e ) {
 // NOTE(review): a DBError after lock() skips unlock(); presumably the
 // session-scoped DB lock is released when the connection closes — confirm.
706  $this->throwDBException( $e );
707  }
708 
709  return $count;
710  }
711 
 // Map a job specification to a row array for INSERT into the job table.
 //
 // @param IJobSpecification $job
 // @return array Field map: command, title coordinates, serialized params
 //   (via makeBlob), sequence-based ID, timestamp, 31-char base-36 SHA-1 of
 //   the deduplication info, and a random value in [0, MAX_JOB_RANDOM] used
 //   by claimRandom()
716  protected function insertFields( IJobSpecification $job ) {
717  $dbw = $this->getMasterDB();
718 
719  return [
720  // Fields that describe the nature of the job
721  'job_cmd' => $job->getType(),
722  'job_namespace' => $job->getTitle()->getNamespace(),
723  'job_title' => $job->getTitle()->getDBkey(),
724  'job_params' => self::makeBlob( $job->getParams() ),
725  // Additional job metadata
726  'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
727  'job_timestamp' => $dbw->timestamp(),
728  'job_sha1' => Wikimedia\base_convert(
729  sha1( serialize( $job->getDeduplicationInfo() ) ),
730  16, 36, 31
731  ),
732  'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
733  ];
734  }
735 
/**
 * Get a replica DB connection for this queue's cluster.
 *
 * @return DBConnRef
 * @throws JobQueueConnectionError When the connection cannot be established
 */
protected function getSlaveDB() {
	try {
		$conn = $this->getDB( DB_REPLICA );
	} catch ( DBConnectionError $e ) {
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
	}

	return $conn;
}
747 
/**
 * Get a master DB connection for this queue's cluster.
 *
 * @return DBConnRef
 * @throws JobQueueConnectionError When the connection cannot be established
 */
protected function getMasterDB() {
	try {
		$conn = $this->getDB( DB_MASTER );
	} catch ( DBConnectionError $e ) {
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
	}

	return $conn;
}
759 
/**
 * Get a connection from the load balancer for this queue's wiki.
 *
 * Uses the external cluster's load balancer when one is configured,
 * otherwise the wiki's main load balancer.
 *
 * @param int $index DB_REPLICA or DB_MASTER
 * @return DBConnRef
 */
protected function getDB( $index ) {
	$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
	if ( $this->cluster !== false ) {
		$lb = $lbFactory->getExternalLB( $this->cluster, $this->wiki );
	} else {
		$lb = $lbFactory->getMainLB( $this->wiki );
	}

	return $lb->getConnectionRef( $index, [], $this->wiki );
}
772 
/**
 * Build the cache key for a per-queue statistic.
 *
 * @param string $property Statistic name (e.g. 'size', 'acquiredcount')
 * @return string Foreign-wiki cache key scoped by DB, prefix, cluster and type
 */
private function getCacheKey( $property ) {
	list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
	if ( is_string( $this->cluster ) ) {
		$cluster = $this->cluster;
	} else {
		$cluster = 'main';
	}

	return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}
783 
/**
 * Serialize job parameters for storage in the job_params column.
 *
 * @param mixed $params Job parameters, or false for none
 * @return string Serialized params, or the empty string when $params is false
 */
protected static function makeBlob( $params ) {
	return ( $params !== false ) ? serialize( $params ) : '';
}
795 
/**
 * Unserialize a job_params column value (inverse of makeBlob()).
 *
 * @param string|null $blob Stored blob
 * @return mixed Unserialized params, or false for an empty blob
 */
protected static function extractBlob( $blob ) {
	return ( (string)$blob !== '' ) ? unserialize( $blob ) : false;
}
807 
/**
 * Rethrow a database error as a queue-level error.
 *
 * @param DBError $e
 * @throws JobQueueError Always; message carries the original class and text
 */
protected function throwDBException( DBError $e ) {
	$message = get_class( $e ) . ": " . $e->getMessage();
	throw new JobQueueError( $message );
}
815 
 // Return the list of job table fields that should be selected when rows
 // will be turned back into Job objects (see doPop(), claimRandom(),
 // claimOldest() and getJobIterator()).
 //
 // @return string[] Column names of the job table
821  public static function selectFields() {
822  return [
823  'job_id',
824  'job_cmd',
825  'job_namespace',
826  'job_title',
827  'job_timestamp',
828  'job_params',
829  'job_random',
830  'job_attempts',
831  'job_token',
832  'job_token_timestamp',
833  'job_sha1',
834  ];
835  }
836 }
set($key, $value, $ttl=0, array $opts=[])
Set the value of a key in cache.
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:205
static extractBlob($blob)
Definition: JobQueueDB.php:800
static incrStats($key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:709
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
static getMainWANInstance()
Get the main WAN cache object.
insert($table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
Database error base class.
Definition: DBError.php:26
the array() calling protocol came about after MediaWiki 1.4rc1.
$property
if(count($args)==0) $dir
wfForeignMemcKey($db, $prefix)
Make a cache key for a foreign DB.
getCoalesceLocationInternal()
Definition: JobQueueDB.php:590
doAck(Job $job)
Definition: JobQueueDB.php:455
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:52
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:2102
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:596
Class to both describe a background job and handle jobs.
Definition: Job.php:31
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2703
get($key, &$curTTL=null, array $checkKeys=[], &$asOf=null)
Fetch the value of a key from cache.
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:34
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:32
BagOStuff $dupCache
Definition: JobQueue.php:46
const DB_MASTER
Definition: defines.php:23
wfRandomString($length=32)
Get a random string containing a number of pseudo-random hex characters.
wfDebug($text, $dest= 'all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:631
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
static makeBlob($params)
Definition: JobQueueDB.php:788
throwDBException(DBError $e)
Definition: JobQueueDB.php:812
string $type
Job type.
Definition: JobQueue.php:35
getAllAcquiredJobs()
Definition: JobQueueDB.php:560
unserialize($serialized)
Definition: ApiMessage.php:102
const MAX_OFFSET
Definition: JobQueueDB.php:36
getRootJobCacheKey($signature)
Definition: JobQueue.php:528
if($limit) $timestamp
$res
Definition: database.txt:21
getDeduplicationInfo()
Subclasses may need to override this to make duplication detection work.
getCacheKey($property)
Definition: JobQueueDB.php:777
$params
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:33
doGetAcquiredCount()
Definition: JobQueueDB.php:114
namespace and then decline to actually register it file or subcat img or subcat $title
Definition: hooks.txt:953
endAtomic($fname=__METHOD__)
Ends an atomic section of SQL statements.
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:35
getJobIterator(array $conds)
Definition: JobQueueDB.php:568
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:183
Convenience class for generating iterators from iterators.
supportedOrders()
Definition: JobQueueDB.php:59
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
getDB($index)
Definition: JobQueueDB.php:764
claimRandom($uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:318
startAtomic($fname=__METHOD__)
Begin an atomic section of statements.
const DBO_TRX
Definition: defines.php:9
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:484
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
doGetAbandonedCount()
Definition: JobQueueDB.php:145
doWaitForBackups()
Definition: JobQueueDB.php:534
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
$lbFactory
static factory($command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:68
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count($args)< 1) $job
const ROOTJOB_TTL
Definition: JobQueue.php:52
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:613
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:821
$count
Job queue task description interface.
claimOldest($uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:395
const DB_REPLICA
Definition: defines.php:22
wfSplitWikiID($wiki)
Split a wiki ID into DB name and table prefix.
serialize()
Definition: ApiMessage.php:94
select($table, $vars, $conds= '', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
WANObjectCache $cache
Definition: JobQueueDB.php:39
getAllQueuedJobs()
Definition: JobQueueDB.php:552
static makeTitle($ns, $title, $fragment= '', $interwiki= '')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:511
Basic database interface for live and lazy-loaded relation database handles.
Definition: IDatabase.php:34
insertFields(IJobSpecification $job)
Definition: JobQueueDB.php:716
bool string $cluster
Name of an external DB cluster.
Definition: JobQueueDB.php:42