MediaWiki  REL1_28
JobQueueDB.php
Go to the documentation of this file.
1 <?php
24 use Wikimedia\ScopedCallback;
25 
32 class JobQueueDB extends JobQueue {
33  const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
34  const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
35  const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
36  const MAX_OFFSET = 255; // integer; maximum number of rows to skip
37 
39  protected $cache;
40 
42  protected $cluster = false;
43 
/**
 * Build a DB-backed job queue.
 *
 * Additional parameters include:
 *   - cluster : name of an external DB cluster holding the job table. When it is
 *               not given, the value stays false and the main load balancer of
 *               the wiki is used for connections.
 *
 * @param array $params
 */
protected function __construct( array $params ) {
    parent::__construct( $params );

    $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
    // The size/claim-count getters and claimRandom() all read $this->cache,
    // so it has to be set up before the queue is used.
    $this->cache = ObjectCache::getMainWANInstance();
}
58 
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return string[]
 */
protected function supportedOrders() {
    $orders = [ 'random', 'timestamp', 'fifo' ];

    return $orders;
}
62 
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string
 */
protected function optimalOrder() {
    $default = 'random';

    return $default;
}
66 
/**
 * Check whether this queue has no unclaimed jobs, using a replica connection.
 *
 * @return bool True when no row with an empty job_token exists for this type
 */
protected function doIsEmpty() {
    // An empty job_token marks a job that no runner has claimed yet
    $unclaimedConds = [ 'job_cmd' => $this->type, 'job_token' => '' ];

    $dbr = $this->getSlaveDB();
    try {
        $hasUnclaimed = $dbr->selectField( 'job', '1', $unclaimedConds, __METHOD__ );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }

    return !$hasUnclaimed;
}
83 
/**
 * Count the unclaimed jobs of this type.
 *
 * The count is served from cache when an integer is stored there; otherwise it
 * is read from a replica and cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 */
protected function doGetSize() {
    $cacheKey = $this->getCacheKey( 'size' );

    $cached = $this->cache->get( $cacheKey );
    if ( is_int( $cached ) ) {
        return $cached;
    }

    try {
        $total = (int)$this->getSlaveDB()->selectField(
            'job',
            'COUNT(*)',
            [ 'job_cmd' => $this->type, 'job_token' => '' ],
            __METHOD__
        );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }
    $this->cache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

    return $total;
}
109 
/**
 * Count the jobs of this type that currently carry a claim token.
 *
 * Always 0 when claimTTL is not positive. Otherwise the value is served from
 * cache when possible, or read from a replica and cached for CACHE_TTL_SHORT.
 *
 * @return int
 */
protected function doGetAcquiredCount() {
    if ( $this->claimTTL <= 0 ) {
        return 0; // no acknowledgements
    }

    $cacheKey = $this->getCacheKey( 'acquiredcount' );

    $cached = $this->cache->get( $cacheKey );
    if ( is_int( $cached ) ) {
        return $cached;
    }

    $dbr = $this->getSlaveDB();
    try {
        $claimed = 'job_token != ' . $dbr->addQuotes( '' );
        $total = (int)$dbr->selectField(
            'job',
            'COUNT(*)',
            [ 'job_cmd' => $this->type, $claimed ],
            __METHOD__
        );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }
    $this->cache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

    return $total;
}
139 
/**
 * Count the claimed jobs of this type whose attempts reached maxTries.
 *
 * Always 0 when claimTTL is not positive. Otherwise the value is served from
 * cache when possible, or read from a replica and cached for CACHE_TTL_SHORT.
 *
 * @return int
 */
protected function doGetAbandonedCount() {
    if ( $this->claimTTL <= 0 ) {
        return 0; // no acknowledgements
    }

    $cacheKey = $this->getCacheKey( 'abandonedcount' );

    $cached = $this->cache->get( $cacheKey );
    if ( is_int( $cached ) ) {
        return $cached;
    }

    $dbr = $this->getSlaveDB();
    try {
        $claimed = 'job_token != ' . $dbr->addQuotes( '' );
        $outOfTries = 'job_attempts >= ' . $dbr->addQuotes( $this->maxTries );
        $total = (int)$dbr->selectField(
            'job',
            'COUNT(*)',
            [ 'job_cmd' => $this->type, $claimed, $outOfTries ],
            __METHOD__
        );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }

    $this->cache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

    return $total;
}
175 
/**
 * Schedule a batch of jobs for insertion.
 *
 * The insert is not run inline: it is wrapped in an AutoCommitUpdate and handed
 * to DeferredUpdates for the PRESEND stage, where doBatchPushInternal() does the
 * actual work on the master connection.
 *
 * @param array $jobs Jobs to insert (each is passed to insertFields())
 * @param int $flags Bitfield; QOS_ATOMIC is honoured by doBatchPushInternal()
 */
protected function doBatchPush( array $jobs, $flags ) {
    DeferredUpdates::addUpdate(
        new AutoCommitUpdate(
            $this->getMasterDB(),
            __METHOD__,
            function ( IDatabase $dbw, $fname ) use ( $jobs, $flags ) {
                $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
            }
        ),
        DeferredUpdates::PRESEND
    );
}
195 
/**
 * Insert a batch of jobs into the job table.
 *
 * This function should not be called outside of JobQueueDB.
 *
 * Jobs that ask for de-duplication are keyed by their job_sha1, and any whose
 * hash already exists as an unclaimed row are dropped before the insert.
 *
 * @param IDatabase $dbw Master connection to write to
 * @param array $jobs Jobs to insert (each is passed to insertFields())
 * @param int $flags Bitfield; with QOS_ATOMIC all inserts share one atomic section
 * @param string $method Caller name used for query attribution
 * @throws JobQueueError On any DBError
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
    if ( !count( $jobs ) ) {
        return;
    }

    $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
    $rowList = []; // list of jobs for jobs that are not de-duplicated
    foreach ( $jobs as $job ) {
        $row = $this->insertFields( $job );
        if ( $job->ignoreDuplicates() ) {
            $rowSet[$row['job_sha1']] = $row;
        } else {
            $rowList[] = $row;
        }
    }
    // Remember the size before stripping; counting afterwards would always
    // report zero duplicates since the stripped rows are already gone.
    $dedupCandidates = count( $rowSet );

    if ( $flags & self::QOS_ATOMIC ) {
        $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
    }
    try {
        // Strip out any duplicate jobs that are already in the queue...
        if ( count( $rowSet ) ) {
            $res = $dbw->select( 'job', 'job_sha1',
                [
                    // No job_type condition since it's part of the job_sha1 hash
                    'job_sha1' => array_keys( $rowSet ),
                    'job_token' => '' // unclaimed
                ],
                $method
            );
            foreach ( $res as $row ) {
                wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
                unset( $rowSet[$row->job_sha1] ); // already enqueued
            }
        }
        // Build the full list of job rows to insert
        $rows = array_merge( $rowList, array_values( $rowSet ) );
        // Insert the job rows in chunks to avoid replica DB lag...
        foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
            $dbw->insert( 'job', $rowBatch, $method );
        }
        JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
        JobQueue::incrStats( 'dupe_inserts', $this->type,
            $dedupCandidates - count( $rowSet )
        );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }
    if ( $flags & self::QOS_ATOMIC ) {
        $dbw->endAtomic( $method );
    }
}
260 
/**
 * Claim one job from the queue and return it.
 *
 * Depending on the configured order the row is claimed oldest-first or at
 * random. When nothing was claimed, and on roughly one call in ten otherwise,
 * stale claims are recycled/pruned as a side effect.
 *
 * @return Job|bool The claimed job, or false when no row could be claimed
 * @throws JobQueueError On any DBError
 */
protected function doPop() {
    $dbw = $this->getMasterDB();
    try {
        // Run each query as its own transaction; the scoped callback puts the
        // previous DBO_TRX setting back when $scopedReset goes out of scope.
        $autoTrx = $dbw->getFlag( DBO_TRX );
        $dbw->clearFlag( DBO_TRX );
        $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
            $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
        } );

        $uuid = wfRandomString( 32 ); // token for this pop attempt
        $job = false; // job popped off

        // Try to reserve a row in the DB...
        if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
            $row = $this->claimOldest( $uuid );
        } else { // random first
            $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
            $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
            $row = $this->claimRandom( $uuid, $rand, $gte );
        }

        if ( $row ) {
            JobQueue::incrStats( 'pops', $this->type );
            // Turn the reserved row into a job object...
            $title = Title::makeTitle( $row->job_namespace, $row->job_title );
            $job = Job::factory( $row->job_cmd, $title,
                self::extractBlob( $row->job_params ), $row->job_id );
            $job->metadata['id'] = $row->job_id;
            $job->metadata['timestamp'] = $row->job_timestamp;
        }

        if ( !$job || mt_rand( 0, 9 ) == 0 ) {
            // Handle jobs that need to be recycled/deleted;
            // any recycled jobs will be picked up next attempt
            $this->recycleAndDeleteStaleJobs();
        }
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }

    return $job;
}
310 
/**
 * Reserve a pseudo-randomly chosen unclaimed row for this job type.
 *
 * A candidate row is SELECTed first and then claimed with an UPDATE keyed on
 * job_id and an empty job_token; if another runner claimed it in between, the
 * UPDATE affects no rows and another candidate is picked.
 *
 * @param string $uuid Token to write into job_token for the claimed row
 * @param int $rand Pivot value compared against job_random
 * @param bool $gte Whether to first look for job_random >= $rand (else <=)
 * @return stdClass|bool The claimed row, or false when no unclaimed row was found
 */
protected function claimRandom( $uuid, $rand, $gte ) {
    $dbw = $this->getMasterDB();
    // Check cache to see if the queue has <= OFFSET items
    $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

    $row = false; // the row acquired
    $triedOtherDirection = false; // whether one job_random direction was already scanned
    // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
    // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
    // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
    // be used here with MySQL.
    while ( true ) {
        if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
            // For small queues, using OFFSET will overshoot and return no rows more often.
            // Instead, this uses job_random to pick a row (possibly checking both directions).
            $ineq = $gte ? '>=' : '<=';
            $dir = $gte ? 'ASC' : 'DESC';
            $row = $dbw->selectRow(
                'job',
                self::selectFields(),
                [
                    'job_cmd' => $this->type,
                    'job_token' => '', // unclaimed
                    "job_random {$ineq} {$dbw->addQuotes( $rand )}"
                ],
                __METHOD__,
                [ 'ORDER BY' => "job_random {$dir}" ]
            );
            if ( !$row && !$triedOtherDirection ) {
                $gte = !$gte;
                $triedOtherDirection = true;
                continue; // try the other direction
            }
        } else { // table *may* have >= MAX_OFFSET rows
            // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
            // in MySQL if there are many rows for some reason. This uses a small OFFSET
            // instead of job_random for reducing excess claim retries.
            $row = $dbw->selectRow(
                'job',
                self::selectFields(),
                [
                    'job_cmd' => $this->type,
                    'job_token' => '', // unclaimed
                ],
                __METHOD__,
                [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
            );
            if ( !$row ) {
                $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
                $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
                continue; // use job_random
            }
        }

        if ( !$row ) {
            break; // nothing to do
        }

        // Claim the job; the empty job_token condition makes this a no-op
        // when another runner got to the row first.
        $dbw->update(
            'job',
            [
                'job_token' => $uuid,
                'job_token_timestamp' => $dbw->timestamp(),
                'job_attempts = job_attempts+1'
            ],
            [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
            __METHOD__
        );
        if ( $dbw->affectedRows() ) {
            break; // the row is ours
        }
        // Raced out by another runner; the use of job_random should keep this rare.
        $row = false;
    }

    return $row;
}
389 
/**
 * Reserve a row with a single UPDATE without holding row locks over RTTs...
 *
 * The unclaimed row with the lowest job_id for this type is claimed, then
 * fetched back by the token that was just written.
 *
 * @param string $uuid Token to write into job_token for the claimed row
 * @return stdClass|bool The claimed row, or false when the UPDATE matched nothing
 */
protected function claimOldest( $uuid ) {
    $dbw = $this->getMasterDB();

    $row = false; // the row acquired
    while ( true ) {
        if ( $dbw->getType() === 'mysql' ) {
            // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
            // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
            // Oracle and Postgre have no such limitation. However, MySQL offers an
            // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
            $table = $dbw->tableName( 'job' );
            $qToken = $dbw->addQuotes( $uuid );
            $qNow = $dbw->addQuotes( $dbw->timestamp() );
            $qType = $dbw->addQuotes( $this->type );
            $qEmpty = $dbw->addQuotes( '' );
            $dbw->query(
                "UPDATE {$table} " .
                "SET " .
                "job_token = {$qToken}, " .
                "job_token_timestamp = {$qNow}, " .
                "job_attempts = job_attempts+1 " .
                "WHERE ( " .
                "job_cmd = {$qType} " .
                "AND job_token = {$qEmpty} " .
                ") ORDER BY job_id ASC LIMIT 1",
                __METHOD__
            );
        } else {
            // Use a subquery to find the job, within an UPDATE to claim it.
            // This uses as much of the DB wrapper functions as possible.
            $dbw->update(
                'job',
                [
                    'job_token' => $uuid,
                    'job_token_timestamp' => $dbw->timestamp(),
                    'job_attempts = job_attempts+1'
                ],
                [ 'job_id = (' .
                    $dbw->selectSQLText( 'job', 'job_id',
                        [ 'job_cmd' => $this->type, 'job_token' => '' ],
                        __METHOD__,
                        [ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
                    ')'
                ],
                __METHOD__
            );
        }

        if ( !$dbw->affectedRows() ) {
            break; // nothing to do
        }

        // Fetch the row that was just reserved...
        $row = $dbw->selectRow(
            'job',
            self::selectFields(),
            [ 'job_cmd' => $this->type, 'job_token' => $uuid ],
            __METHOD__
        );
        if ( $row ) {
            break;
        }
        // Raced out by duplicate job removal; try to claim another row
        wfDebug( "Row deleted as duplicate by another process.\n" );
    }

    return $row;
}
450 
/**
 * Acknowledge a finished job by deleting its row.
 *
 * @param Job $job Job whose metadata['id'] holds the job_id of its row
 * @throws MWException When the job carries no ID
 * @throws JobQueueError On any DBError
 */
protected function doAck( Job $job ) {
    if ( !isset( $job->metadata['id'] ) ) {
        throw new MWException( "Job of type '{$job->getType()}' has no ID." );
    }

    $dbw = $this->getMasterDB();
    try {
        // Run the DELETE as its own transaction; the scoped callback puts the
        // previous DBO_TRX setting back when $scopedReset goes out of scope.
        $autoTrx = $dbw->getFlag( DBO_TRX );
        $dbw->clearFlag( DBO_TRX );
        $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
            $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
        } );

        // Delete a row with a single DELETE without holding row locks over RTTs...
        $rowConds = [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ];
        $dbw->delete( 'job', $rowConds, __METHOD__ );

        JobQueue::incrStats( 'acks', $this->type );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }
}
478 
/**
 * Register the root job timestamp of a job for later de-duplication.
 *
 * The cache write is deferred until the master connection has no open
 * transaction, and is skipped when an equal or newer timestamp is already stored.
 *
 * @param IJobSpecification $job Job carrying rootJobSignature/rootJobTimestamp params
 * @return bool Always true; the deferred write itself is not awaited
 * @throws MWException When either root job parameter is missing
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
    $params = $job->getParams();
    if ( !isset( $params['rootJobSignature'] ) ) {
        throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
    } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
        throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
    }
    $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
    // Callers should call batchInsert() and then this function so that if the insert
    // fails, the de-duplication registration will be aborted. Since the insert is
    // deferred till "transaction idle", do the same here, so that the ordering is
    // maintained. Having only the de-duplication registration succeed would cause
    // jobs to become no-ops without any actual jobs that made them redundant.
    $dbw = $this->getMasterDB();
    // The closure below captures $cache, so it must be bound before it is created
    $cache = $this->dupCache;
    $dbw->onTransactionIdle(
        function () use ( $cache, $params, $key, $dbw ) {
            $timestamp = $cache->get( $key ); // current last timestamp of this job
            if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                return true; // a newer version of this root job was enqueued
            }

            // Update the timestamp of the last root job started at the location...
            return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
        },
        __METHOD__
    );

    return true;
}
515 
/**
 * Delete every job row of this queue's type, claimed or not.
 *
 * @return bool Always true when no DB error occurred
 * @throws JobQueueError On any DBError
 */
protected function doDelete() {
    $dbw = $this->getMasterDB();
    try {
        // Pass the caller name like every other query in this class, so the
        // DELETE is attributed to this method
        $dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }

    return true;
}
530 
/**
 * Wait for replication on this queue's wiki and cluster via the LB factory.
 *
 * @return void
 */
protected function doWaitForBackups() {
    $replicationScope = [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ];

    MediaWikiServices::getInstance()
        ->getDBLoadBalancerFactory()
        ->waitForReplication( $replicationScope );
}
539 
/**
 * Drop the cached queue counters so the next getters re-read them from the DB.
 *
 * @return void
 */
protected function doFlushCaches() {
    // 'abandonedcount' is cached by doGetAbandonedCount() just like the other
    // two counters, so it has to be flushed together with them.
    foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
        $this->cache->delete( $this->getCacheKey( $type ) );
    }
}
548 
/**
 * Get an iterator over the unclaimed jobs of this queue.
 *
 * @return Iterator Whatever getJobIterator() builds for these conditions
 */
public function getAllQueuedJobs() {
    $unclaimedConds = [ 'job_cmd' => $this->getType(), 'job_token' => '' ];

    return $this->getJobIterator( $unclaimedConds );
}
556 
/**
 * Get an iterator over the jobs of this queue that carry a claim token.
 *
 * @return Iterator Whatever getJobIterator() builds for these conditions
 */
public function getAllAcquiredJobs() {
    $claimedConds = [ 'job_cmd' => $this->getType(), "job_token > ''" ];

    return $this->getJobIterator( $claimedConds );
}
564 
/**
 * Build an iterator that maps matching job rows to Job objects.
 *
 * Rows are read from a replica connection. Each resulting job gets its row's
 * job_id and job_timestamp stored in its metadata.
 *
 * @param array $conds Query conditions for the job table
 * @return MappedIterator
 * @throws JobQueueError On any DBError
 */
protected function getJobIterator( array $conds ) {
    $dbr = $this->getSlaveDB();
    try {
        return new MappedIterator(
            // Pass the caller name like every other query in this class, so
            // the SELECT is attributed to this method
            $dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
            function ( $row ) {
                $job = Job::factory(
                    $row->job_cmd,
                    Title::makeTitle( $row->job_namespace, $row->job_title ),
                    // An empty blob means the job was stored without parameters
                    strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
                );
                $job->metadata['id'] = $row->job_id;
                $job->metadata['timestamp'] = $row->job_timestamp;

                return $job;
            }
        );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }
}
590 
/**
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string "DBCluster:<cluster>:<wiki>" when a cluster is set,
 *   otherwise "LBFactory:<wiki>"
 */
public function getCoalesceLocationInternal() {
    if ( $this->cluster ) {
        return "DBCluster:{$this->cluster}:{$this->wiki}";
    }

    return "LBFactory:{$this->wiki}";
}
596 
/**
 * Find which of the given job types have at least one row in the job table.
 *
 * @param array $types Job types to look for
 * @return array List of the job types that have rows
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
    $dbr = $this->getSlaveDB();
    // @note: this does not check whether the jobs are claimed or not.
    // This is useful so JobQueueGroup::pop() also sees queues that only
    // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
    // failed jobs so that they can be popped again for that edge case.
    $res = $dbr->select(
        'job',
        'DISTINCT job_cmd',
        [ 'job_cmd' => $types ],
        __METHOD__
    );

    $nonEmptyTypes = [];
    foreach ( $res as $row ) {
        $nonEmptyTypes[] = $row->job_cmd;
    }

    return $nonEmptyTypes;
}
613 
/**
 * Count the job rows for each of the given job types.
 *
 * Types without any rows are absent from the result.
 *
 * @param array $types Job types to count
 * @return array Map of (job type => row count)
 */
protected function doGetSiblingQueueSizes( array $types ) {
    $fields = [ 'job_cmd', 'COUNT(*) AS count' ];
    $conds = [ 'job_cmd' => $types ];
    $options = [ 'GROUP BY' => 'job_cmd' ];

    $res = $this->getSlaveDB()->select( 'job', $fields, $conds, __METHOD__, $options );

    $sizeByType = [];
    foreach ( $res as $row ) {
        $sizeByType[$row->job_cmd] = (int)$row->count;
    }

    return $sizeByType;
}
626 
/**
 * Recycle or destroy any jobs that have been claimed for too long.
 *
 * Runs under a named DB lock per job type; when the lock cannot be obtained the
 * method returns right away.
 *
 * @return int Number of job rows that were released or deleted
 * @throws JobQueueError On any DBError
 */
public function recycleAndDeleteStaleJobs() {
    $now = time();
    $count = 0; // affected rows
    $dbw = $this->getMasterDB();
    $lockKey = "jobqueue-recycle-{$this->type}";

    try {
        if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
            return $count; // already in progress
        }

        // Remove claims on jobs acquired for too long if enabled...
        if ( $this->claimTTL > 0 ) {
            $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
            // Get the IDs of jobs that have been claimed but not finished after too long.
            // These jobs can be recycled into the queue by expiring the claim. Selecting
            // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
            $res = $dbw->select(
                'job',
                'job_id',
                [
                    'job_cmd' => $this->type,
                    "job_token != {$dbw->addQuotes( '' )}", // was acquired
                    "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
                    "job_attempts < {$dbw->addQuotes( $this->maxTries )}" // retries left
                ],
                __METHOD__
            );
            $recycleIds = [];
            foreach ( $res as $o ) {
                $recycleIds[] = $o->job_id;
            }
            if ( count( $recycleIds ) ) {
                // Reset job_token for these jobs so that other runners will pick them up.
                // Set the timestamp to the current time, as it is useful to know that the job
                // was already tried before (the timestamp becomes the "released" time).
                $dbw->update(
                    'job',
                    [
                        'job_token' => '',
                        'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
                    ],
                    [ 'job_id' => $recycleIds ],
                    __METHOD__
                );
                $affected = $dbw->affectedRows();
                $count += $affected;
                JobQueue::incrStats( 'recycles', $this->type, $affected );
                $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
            }
        }

        // Just destroy any stale jobs...
        $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
        $conds = [
            'job_cmd' => $this->type,
            "job_token != {$dbw->addQuotes( '' )}", // was acquired
            "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
        ];
        if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
            $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
        }
        // Get the IDs of jobs that are considered stale and should be removed. Selecting
        // the IDs first means that the DELETE can be done by primary key (less deadlocks).
        $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
        $pruneIds = [];
        foreach ( $res as $o ) {
            $pruneIds[] = $o->job_id;
        }
        if ( count( $pruneIds ) ) {
            $dbw->delete( 'job', [ 'job_id' => $pruneIds ], __METHOD__ );
            $affected = $dbw->affectedRows();
            $count += $affected;
            JobQueue::incrStats( 'abandons', $this->type, $affected );
        }

        $dbw->unlock( $lockKey, __METHOD__ );
    } catch ( DBError $e ) {
        $this->throwDBException( $e );
    }

    return $count;
}
712 
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @return array Map of (column name => value) ready for insertion
 */
protected function insertFields( IJobSpecification $job ) {
    $dbw = $this->getMasterDB();

    // Fields that describe the nature of the job
    $row = [
        'job_cmd' => $job->getType(),
        'job_namespace' => $job->getTitle()->getNamespace(),
        'job_title' => $job->getTitle()->getDBkey(),
        'job_params' => self::makeBlob( $job->getParams() ),
    ];

    // Additional job metadata
    $row['job_id'] = $dbw->nextSequenceValue( 'job_job_id_seq' );
    $row['job_timestamp'] = $dbw->timestamp();
    // Base-36 form of the SHA-1 over the de-duplication info, padded to 31 characters
    $hexDigest = sha1( serialize( $job->getDeduplicationInfo() ) );
    $row['job_sha1'] = Wikimedia\base_convert( $hexDigest, 16, 36, 31 );
    $row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

    return $row;
}
736 
/**
 * Get a replica (DB_REPLICA) connection for the job table.
 *
 * @return mixed Whatever getDB() returns for DB_REPLICA
 * @throws JobQueueConnectionError When the connection cannot be obtained
 */
protected function getSlaveDB() {
    try {
        $conn = $this->getDB( DB_REPLICA );
    } catch ( DBConnectionError $e ) {
        throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
    }

    return $conn;
}
748 
/**
 * Get a master (DB_MASTER) connection for the job table.
 *
 * @return mixed Whatever getDB() returns for DB_MASTER
 * @throws JobQueueConnectionError When the connection cannot be obtained
 */
protected function getMasterDB() {
    try {
        $conn = $this->getDB( DB_MASTER );
    } catch ( DBConnectionError $e ) {
        throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
    }

    return $conn;
}
760 
/**
 * Get a connection reference from the load balancer that serves this queue.
 *
 * The external load balancer of the configured cluster is used when a cluster
 * is set; otherwise the main load balancer of the wiki is used.
 *
 * @param int $index DB_MASTER or DB_REPLICA
 * @return mixed The value of getConnectionRef() on the chosen load balancer
 */
protected function getDB( $index ) {
    $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

    if ( $this->cluster !== false ) {
        $lb = $lbFactory->getExternalLB( $this->cluster, $this->wiki );
    } else {
        $lb = $lbFactory->getMainLB( $this->wiki );
    }

    return $lb->getConnectionRef( $index, [], $this->wiki );
}
773 
/**
 * Build the cache key for one cached property of this queue.
 *
 * The key is scoped by wiki, by cluster name ('main' when the cluster is not a
 * string) and by job type.
 *
 * @param string $property Name of the cached property, e.g. 'size'
 * @return string
 */
private function getCacheKey( $property ) {
    $wikiParts = wfSplitWikiID( $this->wiki );
    $db = $wikiParts[0];
    $prefix = $wikiParts[1];

    if ( is_string( $this->cluster ) ) {
        $cluster = $this->cluster;
    } else {
        $cluster = 'main';
    }

    return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}
784 
/**
 * Serialize job parameters for storage in the job_params column.
 *
 * @param array|bool $params Parameters, or false for none
 * @return string Serialized form, or the empty string when $params is false
 */
protected static function makeBlob( $params ) {
    return ( $params !== false ) ? serialize( $params ) : '';
}
796 
/**
 * Unserialize a job_params value read from the job table.
 *
 * @param string $blob Stored value
 * @return mixed The unserialized value, or false when the blob is empty
 */
protected static function extractBlob( $blob ) {
    if ( (string)$blob === '' ) {
        return false;
    }

    return unserialize( $blob );
}
808 
/**
 * Re-throw a database error as a JobQueueError.
 *
 * The message is the class name of the original error, a colon and a space,
 * followed by the original message.
 *
 * @param DBError $e
 * @throws JobQueueError Always
 */
protected function throwDBException( DBError $e ) {
    $errorClass = get_class( $e );

    throw new JobQueueError( "{$errorClass}: {$e->getMessage()}" );
}
816 
/**
 * Return the list of job fields that should be selected.
 *
 * @return string[] Column names of the job table
 */
public static function selectFields() {
    $fields = [
        // What the job is
        'job_id', 'job_cmd', 'job_namespace', 'job_title',
        'job_timestamp', 'job_params',
        // Claim bookkeeping
        'job_random', 'job_attempts', 'job_token', 'job_token_timestamp',
        'job_sha1',
    ];

    return $fields;
}
837 }
JobQueueDB\MAX_AGE_PRUNE
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:34
JobQueueDB\doBatchPushInternal
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:206
MappedIterator
Convenience class for generating iterators from iterators.
Definition: MappedIterator.php:29
JobQueueDB\doWaitForBackups
doWaitForBackups()
Definition: JobQueueDB.php:535
JobQueueDB\doFlushCaches
doFlushCaches()
Definition: JobQueueDB.php:543
JobQueueDB\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:597
JobQueueDB\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueueDB.php:591
JobQueueDB\getAllAcquiredJobs
getAllAcquiredJobs()
Definition: JobQueueDB.php:561
JobQueue\incrStats
static incrStats( $key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:709
JobQueueDB\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:614
use
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
Definition: APACHE-LICENSE-2.0.txt:3
JobQueueDB\throwDBException
throwDBException(DBError $e)
Definition: JobQueueDB.php:813
array
the array() calling protocol came about after MediaWiki 1.4rc1.
AutoCommitUpdate
Deferrable Update for closure/callback updates that should use auto-commit mode.
Definition: AutoCommitUpdate.php:7
JobQueueDB\doGetSize
doGetSize()
Definition: JobQueueDB.php:88
$timestamp
if( $limit) $timestamp
Definition: importImages.php:119
JobQueueDB\getSlaveDB
getSlaveDB()
Definition: JobQueueDB.php:741
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
DeferredUpdates\addUpdate
static addUpdate(DeferrableUpdate $update, $stage=self::POSTSEND)
Add an update to the deferred list to be run later by execute()
Definition: DeferredUpdates.php:73
$fname
if(!defined( 'MEDIAWIKI')) $fname
This file is not a valid entry point, perform no further processing unless MEDIAWIKI is defined.
Definition: Setup.php:36
unserialize
unserialize( $serialized)
Definition: ApiMessage.php:102
JobQueueDB\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
Definition: JobQueueDB.php:63
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:52
JobQueueDB\getDB
getDB( $index)
Definition: JobQueueDB.php:765
$params
$params
Definition: styleTest.css.php:40
$title
namespace and then decline to actually register it file or subcat img or subcat $title
Definition: hooks.txt:956
JobQueueDB\__construct
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:52
WANObjectCache\set
set( $key, $value, $ttl=0, array $opts=[])
Set the value of a key in cache.
Definition: WANObjectCache.php:428
serialize
serialize()
Definition: ApiMessage.php:94
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:3040
JobQueueDB\getMasterDB
getMasterDB()
Definition: JobQueueDB.php:753
JobQueueDB\insertFields
insertFields(IJobSpecification $job)
Definition: JobQueueDB.php:717
$res
$res
Definition: database.txt:21
JobQueueDB
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:32
$lbFactory
$lbFactory
Definition: doMaintenance.php:117
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2706
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueDB\CACHE_TTL_SHORT
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:33
DBO_TRX
const DBO_TRX
Definition: defines.php:9
php
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
JobQueueDB\getJobIterator
getJobIterator(array $conds)
Definition: JobQueueDB.php:569
$dbr
$dbr
Definition: testCompression.php:50
JobQueueDB\makeBlob
static makeBlob( $params)
Definition: JobQueueDB.php:789
JobQueueDB\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
Definition: JobQueueDB.php:59
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
IJobSpecification\getType
getType()
Job\factory
static factory( $command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:68
MWException
MediaWiki exception.
Definition: MWException.php:26
$property
$property
Definition: styleTest.css.php:44
JobQueueDB\doDeduplicateRootJob
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:485
$blob
$blob
Definition: testCompression.php:63
JobQueue\$type
string $type
Job type.
Definition: JobQueue.php:35
IDatabase\insert
insert( $table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
$count
$count
Definition: importImages.php:141
JobQueueDB\recycleAndDeleteStaleJobs
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:632
IDatabase\startAtomic
startAtomic( $fname=__METHOD__)
Begin an atomic section of statements.
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:46
JobQueueDB\MAX_OFFSET
const MAX_OFFSET
Definition: JobQueueDB.php:36
JobQueueDB\doBatchPush
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:183
IDatabase\endAtomic
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
Title\makeTitle
static makeTitle( $ns, $title, $fragment='', $interwiki='')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:511
DB_REPLICA
const DB_REPLICA
Definition: defines.php:22
DBConnectionError
Definition: DBConnectionError.php:25
JobQueueDB\doGetAcquiredCount
doGetAcquiredCount()
Definition: JobQueueDB.php:114
JobQueueError
Definition: JobQueue.php:723
DB_MASTER
const DB_MASTER
Definition: defines.php:23
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Make a cache key for a foreign DB.
Definition: GlobalFunctions.php:2991
wfDebug
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
Definition: GlobalFunctions.php:997
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
JobQueueDB\extractBlob
static extractBlob( $blob)
Definition: JobQueueDB.php:801
DBError
Database error base class.
Definition: DBError.php:26
JobQueueDB\doIsEmpty
doIsEmpty()
Definition: JobQueueDB.php:71
WANObjectCache\get
get( $key, &$curTTL=null, array $checkKeys=[], &$asOf=null)
Fetch the value of a key from cache.
Definition: WANObjectCache.php:243
JobQueueDB\MAX_JOB_RANDOM
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:35
JobQueueDB\doDelete
doDelete()
Definition: JobQueueDB.php:520
JobQueueDB\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueueDB.php:145
WANObjectCache
Multi-datacenter aware caching interface.
Definition: WANObjectCache.php:76
JobQueueDB\getAllQueuedJobs
getAllQueuedJobs()
Definition: JobQueueDB.php:553
JobQueueDB\selectFields
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:822
JobQueueConnectionError
Definition: JobQueue.php:726
JobQueueDB\claimOldest
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:396
IDatabase
Basic database interface for live and lazy-loaded relational database handles.
Definition: IDatabase.php:34
JobQueueDB\doAck
doAck(Job $job)
Definition: JobQueueDB.php:456
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:528
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
$dir
if(count( $args)==0) $dir
Definition: importImages.php:56
ObjectCache\getMainWANInstance
static getMainWANInstance()
Get the main WAN cache object.
Definition: ObjectCache.php:370
JobQueueDB\getCacheKey
getCacheKey( $property)
Definition: JobQueueDB.php:778
DeferredUpdates\PRESEND
const PRESEND
Definition: DeferredUpdates.php:57
JobQueueDB\$cache
WANObjectCache $cache
Definition: JobQueueDB.php:39
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:47
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users. It's targeted particularly at maintainers for Linux, since it's been observed that distribution packages of MediaWiki often break. We've consistently had to recommend that users seeking support use official tarballs instead of their distribution's, and this often solves whatever problem the user is having. It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
IDatabase\select
select( $table, $vars, $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
MediaWikiServices
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
JobQueueDB\doPop
doPop()
Definition: JobQueueDB.php:265
order
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
JobQueueDB\$cluster
bool string $cluster
Name of an external DB cluster.
Definition: JobQueueDB.php:42
JobQueue\getType
getType()
Definition: JobQueue.php:131
$e
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2105
wfRandomString
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Definition: GlobalFunctions.php:334
JobQueueDB\claimRandom
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:319