MediaWiki  1.23.0
JobQueueDB.php
Go to the documentation of this file.
1 <?php
30 class JobQueueDB extends JobQueue {
31  const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
32  const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
33  const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
34  const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
35  const MAX_OFFSET = 255; // integer; maximum number of rows to skip
36 
/** @var BagOStuff Cache for queue metadata ('empty', 'size', ... keys); set in the constructor */
38  protected $cache;
39 
/** @var bool|string Name of an external DB cluster; false means the wiki's own load balancer */
41  protected $cluster = false;
42 
/**
 * Set up the queue: pick the DB cluster and the cache used for queue metadata.
 *
 * @param array $params Queue parameters. The optional 'cluster' key is read here;
 *   the whole array is also forwarded to the parent constructor.
 */
protected function __construct( array $params ) {
	// Without this import $wgMemc is an undefined local variable and
	// $this->cache would silently become null.
	global $wgMemc;

	parent::__construct( $params );

	$this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
	// Make sure that we don't use the SQL cache, which would be harmful
	$this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
}
60 
/**
 * Get the allowed queue orders for configuration validation.
 *
 * @return array List of order names this queue class accepts
 */
protected function supportedOrders() {
	$orders = array( 'random', 'timestamp', 'fifo' );

	return $orders;
}
64 
/**
 * Get the default queue order to use if configuration does not specify one.
 *
 * @return string One of the values listed by supportedOrders()
 */
protected function optimalOrder() {
	$defaultOrder = 'random';

	return $defaultOrder;
}
68 
/**
 * Check whether there are no unclaimed jobs of this type.
 *
 * The answer is cached as the string 'true' or 'false'; on a cache miss a
 * slave DB is asked for any row with an empty job_token.
 *
 * @return bool True when no unclaimed job row was found
 * @throws JobQueueError via throwDBException() on DBError
 */
protected function doIsEmpty() {
	$cacheKey = $this->getCacheKey( 'empty' );

	$cached = $this->cache->get( $cacheKey );
	if ( $cached === 'true' || $cached === 'false' ) {
		return ( $cached === 'true' );
	}

	$dbr = $this->getSlaveDB();
	try {
		// Any row with an empty token is an unclaimed job
		$conds = array( 'job_cmd' => $this->type, 'job_token' => '' );
		$found = $dbr->selectField( 'job', '1', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$flag = $found ? 'false' : 'true';
	// add() rather than set(): the flag is only written through this call when
	// the cache accepts an add for the key
	$this->cache->add( $cacheKey, $flag, self::CACHE_TTL_LONG );

	return !$found;
}
95 
/**
 * Count the unclaimed jobs of this type.
 *
 * An integer found in the cache is returned as-is; otherwise the rows are
 * counted on a slave DB and the result is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError via throwDBException() on DBError
 */
protected function doGetSize() {
	$cacheKey = $this->getCacheKey( 'size' );

	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	try {
		$dbr = $this->getSlaveDB();
		$conds = array( 'job_cmd' => $this->type, 'job_token' => '' );
		$total = (int)$dbr->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

	return $total;
}
121 
/**
 * Count the jobs of this type that are currently claimed (non-empty job_token).
 *
 * @return int Always 0 when claimTTL is not positive (no acknowledgements)
 * @throws JobQueueError via throwDBException() on DBError
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$cacheKey = $this->getCacheKey( 'acquiredcount' );

	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$dbr = $this->getSlaveDB();
	try {
		$claimedCond = "job_token != {$dbr->addQuotes( '' )}";
		$conds = array( 'job_cmd' => $this->type, $claimedCond );
		$claimed = (int)$dbr->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $cacheKey, $claimed, self::CACHE_TTL_SHORT );

	return $claimed;
}
151 
/**
 * Count the claimed jobs of this type whose attempt count reached maxTries.
 *
 * Uses $this->cache like the other counters, so the SqlBagOStuff guard set up
 * in the constructor also applies here (previously the global $wgMemc was
 * used directly).
 *
 * @return int Always 0 when claimTTL is not positive (no acknowledgements)
 * @throws JobQueueError via throwDBException() on DBError
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$key = $this->getCacheKey( 'abandonedcount' );

	$count = $this->cache->get( $key );
	if ( is_int( $count ) ) {
		return $count;
	}

	$dbr = $this->getSlaveDB();
	try {
		$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
			array(
				'job_cmd' => $this->type,
				"job_token != {$dbr->addQuotes( '' )}", // was acquired
				"job_attempts >= " . $dbr->addQuotes( $this->maxTries ) // no retries left
			),
			__METHOD__
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

	return $count;
}
188 
/**
 * Queue a batch of jobs for insertion once the master DB has no open transaction.
 *
 * The actual work is done by doBatchPushInternal(), registered through
 * onTransactionIdle() on the master connection.
 *
 * @param array $jobs Jobs to insert
 * @param int $flags Bitfield (self::QOS_ATOMIC is checked by doBatchPushInternal())
 * @return bool Always true
 */
protected function doBatchPush( array $jobs, $flags ) {
	$dbw = $this->getMasterDB();

	// The queue object and method name are handed to the closure as plain variables
	$queue = $this;
	$fname = __METHOD__;
	$deferredInsert = function () use ( $dbw, $queue, $jobs, $flags, $fname ) {
		$queue->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
	};
	$dbw->onTransactionIdle( $deferredInsert );

	return true;
}
209 
/**
 * Insert a batch of jobs, skipping de-duplicable jobs that are already queued.
 *
 * This function should not be called outside of JobQueueDB.
 *
 * @param IDatabase $dbw Master DB connection
 * @param array $jobs Jobs to insert
 * @param int $flags Bitfield; with self::QOS_ATOMIC all inserts share one transaction
 * @param string $method Caller name used for query attribution
 * @return bool Always true
 * @throws DBError Re-thrown as-is (after a rollback when QOS_ATOMIC is set)
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
	if ( !count( $jobs ) ) {
		return true;
	}

	$rowSet = array(); // (sha1 => row) map for jobs that are de-duplicated
	$rowList = array(); // list of rows for jobs that are not de-duplicated
	$dedupCandidates = 0; // number of jobs submitted that allow de-duplication
	foreach ( $jobs as $job ) {
		$row = $this->insertFields( $job );
		if ( $job->ignoreDuplicates() ) {
			++$dedupCandidates;
			// Keyed by hash, so equal jobs within this batch collapse to one row
			$rowSet[$row['job_sha1']] = $row;
		} else {
			$rowList[] = $row;
		}
	}

	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->begin( $method ); // wrap all the job additions in one transaction
	}
	try {
		// Strip out any duplicate jobs that are already in the queue...
		if ( count( $rowSet ) ) {
			$res = $dbw->select( 'job', 'job_sha1',
				array(
					// No job_type condition since it's part of the job_sha1 hash
					'job_sha1' => array_keys( $rowSet ),
					'job_token' => '' // unclaimed
				),
				$method
			);
			foreach ( $res as $row ) {
				wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
				unset( $rowSet[$row->job_sha1] ); // already enqueued
			}
		}
		// Jobs dropped as duplicates, either against the queue or within this batch.
		// This must be derived from the submitted count: comparing $rowSet/$rowList
		// with the merged $rows (as before) always yields 0.
		$duplicates = $dedupCandidates - count( $rowSet );
		// Build the full list of job rows to insert
		$rows = array_merge( $rowList, array_values( $rowSet ) );
		// Insert the job rows in chunks to avoid slave lag...
		foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
			$dbw->insert( 'job', $rowBatch, $method );
		}
		JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
		JobQueue::incrStats( 'job-insert-duplicate', $this->type, $duplicates, $this->wiki );
	} catch ( DBError $e ) {
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->rollback( $method );
		}
		throw $e;
	}
	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->commit( $method );
	}

	$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );

	return true;
}
282 
/**
 * Claim one job from the queue and return it.
 *
 * Rows whose title cannot be built are deleted and another claim is attempted.
 * When no row can be claimed, the 'empty' cache flag is set to 'true'.
 *
 * @return Job|bool The claimed job, or false when nothing could be claimed
 * @throws JobQueueError via throwDBException() on DBError
 */
protected function doPop() {
	$emptyKey = $this->getCacheKey( 'empty' );
	if ( $this->cache->get( $emptyKey ) === 'true' ) {
		return false; // queue is empty
	}

	$dbw = $this->getMasterDB();
	try {
		$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
		$hadAutoTrx = $dbw->getFlag( DBO_TRX ); // remember current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		// Held in a local so the restore callback runs when this scope is left
		$scopedReset = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
			$dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 ); // restore old setting
		} );

		$uuid = wfRandomString( 32 ); // token identifying this pop attempt
		$job = false; // job popped off
		// Loop until a usable row is claimed or no row is left
		while ( true ) {
			// Try to reserve a row in the DB...
			if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
				$row = $this->claimOldest( $uuid );
			} else { // random first
				$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
				$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
				$row = $this->claimRandom( $uuid, $rand, $gte );
			}
			if ( !$row ) {
				// Nothing could be reserved
				$this->cache->set( $emptyKey, 'true', self::CACHE_TTL_LONG );
				break;
			}
			JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
			// Get the job object from the row...
			$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
			if ( !$title ) {
				$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
				wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
				continue; // try again
			}
			$params = self::extractBlob( $row->job_params );
			$job = Job::factory( $row->job_cmd, $title, $params, $row->job_id );
			$job->metadata['id'] = $row->job_id;
			break; // done
		}
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $job;
}
336 
/**
 * Reserve a pseudo-randomly chosen unclaimed row by selecting it and then
 * updating it by primary key.
 *
 * @param string $uuid Token to mark the claimed row with
 * @param int $rand Pivot value compared against job_random
 * @param bool $gte Search job_random >= $rand first (true) or <= $rand first (false)
 * @return stdClass|bool The row as selected before the claim, or false if none was claimed
 */
protected function claimRandom( $uuid, $rand, $gte ) {
	$dbw = $this->getMasterDB();
	$smallKey = $this->getCacheKey( 'small' );
	// Check cache to see if the queue has <= OFFSET items
	$tinyQueue = $this->cache->get( $smallKey );

	$row = false; // the row acquired
	$triedBothWays = false; // whether one job_random direction was already scanned
	// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
	// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
	// not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
	// be used here with MySQL.
	while ( true ) {
		if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
			// For small queues, using OFFSET will overshoot and return no rows more often.
			// Instead, this uses job_random to pick a row (possibly checking both directions).
			if ( $gte ) {
				$ineq = '>=';
				$dir = 'ASC';
			} else {
				$ineq = '<=';
				$dir = 'DESC';
			}
			$conds = array(
				'job_cmd' => $this->type,
				'job_token' => '', // unclaimed
				"job_random {$ineq} {$dbw->addQuotes( $rand )}"
			);
			$row = $dbw->selectRow( 'job', self::selectFields(), $conds, __METHOD__,
				array( 'ORDER BY' => "job_random {$dir}" )
			);
			if ( !$row && !$triedBothWays ) {
				$gte = !$gte;
				$triedBothWays = true;
				continue; // try the other direction
			}
		} else { // table *may* have >= MAX_OFFSET rows
			// Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
			// in MySQL if there are many rows for some reason. This uses a small OFFSET
			// instead of job_random for reducing excess claim retries.
			$conds = array(
				'job_cmd' => $this->type,
				'job_token' => '', // unclaimed
			);
			$row = $dbw->selectRow( 'job', self::selectFields(), $conds, __METHOD__,
				array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) )
			);
			if ( !$row ) {
				$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
				$this->cache->set( $smallKey, 1, 30 );
				continue; // use job_random
			}
		}

		if ( !$row ) {
			break; // nothing to do
		}

		// Claim the job: update by PK, but only while the row is still unclaimed
		$dbw->update( 'job',
			array(
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			),
			array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
			__METHOD__
		);
		// This might get raced out by another runner when claiming the previously
		// selected row. The use of job_random should minimize this problem, however.
		if ( $dbw->affectedRows() ) {
			break; // claimed
		}
		$row = false; // raced out; select again
	}

	return $row;
}
415 
/**
 * Reserve the unclaimed row with the lowest job_id using a single UPDATE,
 * then fetch the row that now carries the token.
 *
 * @param string $uuid Token to mark the claimed row with
 * @return stdClass|bool The claimed row, or false if no row could be updated
 */
protected function claimOldest( $uuid ) {
	$dbw = $this->getMasterDB();

	$row = false; // the row acquired
	while ( true ) {
		if ( $dbw->getType() === 'mysql' ) {
			// Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
			// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
			// Oracle and Postgre have no such limitation. However, MySQL offers an
			// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
			$table = $dbw->tableName( 'job' );
			$encToken = $dbw->addQuotes( $uuid );
			$encNow = $dbw->addQuotes( $dbw->timestamp() );
			$encType = $dbw->addQuotes( $this->type );
			$encEmpty = $dbw->addQuotes( '' );
			$sql = "UPDATE {$table} SET " .
				"job_token = {$encToken}, " .
				"job_token_timestamp = {$encNow}, " .
				"job_attempts = job_attempts+1 " .
				"WHERE ( job_cmd = {$encType} AND job_token = {$encEmpty} " .
				") ORDER BY job_id ASC LIMIT 1";
			$dbw->query( $sql, __METHOD__ );
		} else {
			// Use a subquery to find the job, within an UPDATE to claim it.
			// This uses as much of the DB wrapper functions as possible.
			$oldestIdSql = $dbw->selectSQLText( 'job', 'job_id',
				array( 'job_cmd' => $this->type, 'job_token' => '' ),
				__METHOD__,
				array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 )
			);
			$dbw->update( 'job',
				array(
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1'
				),
				array( 'job_id = (' . $oldestIdSql . ')' ),
				__METHOD__
			);
		}

		if ( !$dbw->affectedRows() ) {
			break; // nothing to do
		}
		// Fetch the row that we just reserved...
		$row = $dbw->selectRow( 'job', self::selectFields(),
			array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
		);
		if ( $row ) {
			break;
		}
		// Raced out by duplicate job removal; try to claim another row
		wfDebug( "Row deleted as duplicate by another process.\n" );
	}

	return $row;
}
476 
/**
 * Acknowledge a finished job by deleting its row.
 *
 * @param Job $job Job carrying its row ID in $job->metadata['id']
 * @return bool Always true
 * @throws MWException When the job has no ID in its metadata
 * @throws JobQueueError via throwDBException() on DBError
 */
protected function doAck( Job $job ) {
	if ( !isset( $job->metadata['id'] ) ) {
		throw new MWException( "Job of type '{$job->getType()}' has no ID." );
	}

	$dbw = $this->getMasterDB();
	try {
		$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
		$hadAutoTrx = $dbw->getFlag( DBO_TRX ); // remember current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		// Held in a local so the restore callback runs when this scope is left
		$scopedReset = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
			$dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 ); // restore old setting
		} );

		// Delete a row with a single DELETE without holding row locks over RTTs...
		$conds = array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] );
		$dbw->delete( 'job', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return true;
}
506 
/**
 * Register the root job timestamp of $job so that older jobs with the same
 * root job signature can be recognised as superseded.
 *
 * The cache write is deferred until the master DB has no open transaction.
 *
 * @param Job $job Job whose params hold 'rootJobSignature' and 'rootJobTimestamp'
 * @return bool Always true
 * @throws MWException When either root job parameter is missing
 */
protected function doDeduplicateRootJob( Job $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
	} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
	}
	$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
	// Callers should call batchInsert() and then this function so that if the insert
	// fails, the de-duplication registration will be aborted. Since the insert is
	// deferred till "transaction idle", do the same here, so that the ordering is
	// maintained. Having only the de-duplication registration succeed would cause
	// jobs to become no-ops without any actual jobs that made them redundant.
	$dbw = $this->getMasterDB();
	// The closure needs the cache as a plain variable; without this assignment
	// $cache is undefined and the callback would fail on a null object.
	$cache = $this->dupCache;
	$dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
		$timestamp = $cache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
	} );

	return true;
}
540 
/**
 * Delete every job row of this queue's type, claimed or not.
 *
 * @return bool Always true
 * @throws JobQueueError via throwDBException() on DBError
 */
protected function doDelete() {
	$dbw = $this->getMasterDB();
	try {
		// Pass __METHOD__ like every other query here, so the DELETE is
		// attributed to this method rather than to the DB layer.
		$dbw->delete( 'job', array( 'job_cmd' => $this->type ), __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return true;
}
555 
/**
 * Wait for the slave DBs that back this queue to catch up.
 *
 * The queue's wiki and cluster are passed along so the wait applies to the
 * databases this queue uses (see getDB()), not to the local wiki's defaults.
 */
protected function doWaitForBackups() {
	// $this->cluster is false when no external cluster is configured, which is
	// also the "no cluster" value of wfWaitForSlaves()
	wfWaitForSlaves( false, $this->wiki, $this->cluster );
}
563 
/**
 * Describe the maintenance tasks this queue wants run periodically.
 *
 * @return array Map of task name => array with 'callback' and 'period' keys;
 *   the period is half of claimTTL, rounded up
 */
protected function doGetPeriodicTasks() {
	$tasks = array();
	$tasks['recycleAndDeleteStaleJobs'] = array(
		'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
		'period' => ceil( $this->claimTTL / 2 )
	);

	return $tasks;
}
575 
/**
 * Drop the cached queue status values so the next read goes to the DB.
 *
 * Covers every counter/flag key this class caches, including
 * 'abandonedcount' (written by doGetAbandonedCount()), which was
 * previously left stale.
 */
protected function doFlushCaches() {
	foreach ( array( 'empty', 'size', 'acquiredcount', 'abandonedcount' ) as $type ) {
		$this->cache->delete( $this->getCacheKey( $type ) );
	}
}
584 
/**
 * Get an iterator over all unclaimed jobs of this type, read from a slave DB.
 *
 * @return MappedIterator Iterator that turns each job row into a Job object
 * @throws JobQueueError via throwDBException() on DBError
 */
public function getAllQueuedJobs() {
	$dbr = $this->getSlaveDB();
	try {
		$conds = array( 'job_cmd' => $this->getType(), 'job_token' => '' );
		$rows = $dbr->select( 'job', self::selectFields(), $conds );
		// NOTE(review): $dbr is captured although the callback never reads it;
		// kept as in the original, presumably so the connection handle lives
		// as long as the iterator - confirm before removing.
		$rowToJob = function ( $row ) use ( $dbr ) {
			$title = Title::makeTitle( $row->job_namespace, $row->job_title );
			// An empty params blob means "no parameters"
			$params = strlen( $row->job_params ) ? unserialize( $row->job_params ) : false;
			$job = Job::factory( $row->job_cmd, $title, $params );
			$job->metadata['id'] = $row->job_id;

			return $job;
		};

		return new MappedIterator( $rows, $rowToJob );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
609 
/**
 * Get a string naming where this queue's rows are stored.
 *
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string "DBCluster:<cluster>:<wiki>" when a cluster is set,
 *   otherwise "LBFactory:<wiki>"
 */
public function getCoalesceLocationInternal() {
	if ( $this->cluster ) {
		return "DBCluster:{$this->cluster}:{$this->wiki}";
	}

	return "LBFactory:{$this->wiki}";
}
615 
/**
 * Find which of the given job types have at least one row in the job table.
 *
 * @param array $types Job types to check
 * @return array Subset of $types that appear in the job_cmd column
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$dbr = $this->getSlaveDB();
	$conds = array( 'job_cmd' => $types );
	$res = $dbr->select( 'job', 'DISTINCT job_cmd', $conds, __METHOD__ );

	$found = array();
	foreach ( $res as $typeRow ) {
		$found[] = $typeRow->job_cmd;
	}

	return $found;
}
628 
/**
 * Count the job rows for each of the given job types.
 *
 * @param array $types Job types to count
 * @return array Map of job type => row count; types without rows are absent
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$dbr = $this->getSlaveDB();
	$fields = array( 'job_cmd', 'COUNT(*) AS count' );
	$conds = array( 'job_cmd' => $types );
	$options = array( 'GROUP BY' => 'job_cmd' );
	$res = $dbr->select( 'job', $fields, $conds, __METHOD__, $options );

	$sizeByType = array();
	foreach ( $res as $countRow ) {
		$sizeByType[$countRow->job_cmd] = (int)$countRow->count;
	}

	return $sizeByType;
}
641 
/**
 * Recycle or destroy any jobs that have been claimed for too long.
 *
 * Runs under a named DB lock per job type; if the lock cannot be obtained
 * the method returns immediately.
 *
 * @return int Number of rows released back to the queue plus rows deleted
 * @throws JobQueueError via throwDBException() on DBError
 */
public function recycleAndDeleteStaleJobs() {
	$now = time();
	$count = 0; // affected rows
	$dbw = $this->getMasterDB();
	$lockName = "jobqueue-recycle-{$this->type}";

	try {
		if ( !$dbw->lock( $lockName, __METHOD__, 1 ) ) {
			return $count; // already in progress
		}

		// Remove claims on jobs acquired for too long if enabled...
		if ( $this->claimTTL > 0 ) {
			$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
			// Find jobs that were claimed but not finished in time and still have
			// retries left. The IDs are selected first so that the UPDATE below
			// can be done by primary key (less deadlocks).
			$staleClaims = $dbw->select( 'job', 'job_id',
				array(
					'job_cmd' => $this->type,
					"job_token != {$dbw->addQuotes( '' )}", // was acquired
					"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
					"job_attempts < {$dbw->addQuotes( $this->maxTries )}" // retries left
				),
				__METHOD__
			);
			$recycleIds = array();
			foreach ( $staleClaims as $claimRow ) {
				$recycleIds[] = $claimRow->job_id;
			}
			if ( count( $recycleIds ) ) {
				// Clear job_token so that other runners will pick these jobs up.
				// The timestamp is set to the current time, so it records when the
				// job was released after an earlier attempt.
				$dbw->update( 'job',
					array(
						'job_token' => '',
						'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
					),
					array( 'job_id' => $recycleIds ),
					__METHOD__
				);
				$released = $dbw->affectedRows();
				$count += $released;
				JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
				$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
			}
		}

		// Just destroy any stale jobs...
		$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
		$pruneConds = array(
			'job_cmd' => $this->type,
			"job_token != {$dbw->addQuotes( '' )}", // was acquired
			"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
		);
		if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
			$pruneConds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
		}
		// Select the IDs of the stale jobs first so that the DELETE can be done
		// by primary key (less deadlocks).
		$staleRows = $dbw->select( 'job', 'job_id', $pruneConds, __METHOD__ );
		$pruneIds = array();
		foreach ( $staleRows as $staleRow ) {
			$pruneIds[] = $staleRow->job_id;
		}
		if ( count( $pruneIds ) ) {
			$dbw->delete( 'job', array( 'job_id' => $pruneIds ), __METHOD__ );
			$deleted = $dbw->affectedRows();
			$count += $deleted;
			JobQueue::incrStats( 'job-abandon', $this->type, $deleted, $this->wiki );
		}

		$dbw->unlock( $lockName, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $count;
}
727 
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @return array Map of job table column => value, ready for insert()
 */
protected function insertFields( IJobSpecification $job ) {
	$dbw = $this->getMasterDB();

	$row = array();
	// Fields that describe the nature of the job
	$row['job_cmd'] = $job->getType();
	$row['job_namespace'] = $job->getTitle()->getNamespace();
	$row['job_title'] = $job->getTitle()->getDBkey();
	$row['job_params'] = self::makeBlob( $job->getParams() );
	// Additional job metadata
	$row['job_id'] = $dbw->nextSequenceValue( 'job_job_id_seq' );
	$row['job_timestamp'] = $dbw->timestamp();
	// SHA-1 of the de-duplication info, converted from base 16 to base 36
	// and padded to 31 characters
	$hexHash = sha1( serialize( $job->getDeduplicationInfo() ) );
	$row['job_sha1'] = wfBaseConvert( $hexHash, 16, 36, 31 );
	$row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

	return $row;
}
751 
/**
 * Get a slave DB connection for this queue.
 *
 * @return mixed Connection handle as returned by getDB( DB_SLAVE )
 * @throws JobQueueConnectionError When the connection attempt raises DBConnectionError
 */
protected function getSlaveDB() {
	try {
		$conn = $this->getDB( DB_SLAVE );
	} catch ( DBConnectionError $e ) {
		$reason = "DBConnectionError:" . $e->getMessage();
		throw new JobQueueConnectionError( $reason );
	}

	return $conn;
}
763 
/**
 * Get a master DB connection for this queue.
 *
 * @return mixed Connection handle as returned by getDB( DB_MASTER )
 * @throws JobQueueConnectionError When the connection attempt raises DBConnectionError
 */
protected function getMasterDB() {
	try {
		$conn = $this->getDB( DB_MASTER );
	} catch ( DBConnectionError $e ) {
		$reason = "DBConnectionError:" . $e->getMessage();
		throw new JobQueueConnectionError( $reason );
	}

	return $conn;
}
775 
/**
 * Get a connection reference from the load balancer that serves this queue.
 *
 * @param int $index DB_SLAVE or DB_MASTER
 * @return mixed Result of getConnectionRef() on the chosen load balancer
 */
protected function getDB( $index ) {
	if ( $this->cluster !== false ) {
		// Queue lives on an external cluster
		$lb = wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki );
	} else {
		$lb = wfGetLB( $this->wiki );
	}

	return $lb->getConnectionRef( $index, array(), $this->wiki );
}
787 
/**
 * Build the cache key for one cached property of this queue.
 *
 * The key is made from the wiki's DB name and prefix, 'jobqueue', the
 * cluster name ('main' when no cluster name is set), the job type and
 * the property.
 *
 * @param string $property Property name such as 'empty' or 'size'
 * @return string
 */
private function getCacheKey( $property ) {
	$clusterName = 'main';
	if ( is_string( $this->cluster ) ) {
		$clusterName = $this->cluster;
	}
	list( $dbName, $tablePrefix ) = wfSplitWikiID( $this->wiki );

	return wfForeignMemcKey(
		$dbName, $tablePrefix, 'jobqueue', $clusterName, $this->type, $property
	);
}
798 
/**
 * Serialize job parameters for storage in the job_params column.
 *
 * @param array|bool $params Parameters, or false for none
 * @return string Serialized form; the empty string when $params is false
 */
protected static function makeBlob( $params ) {
	return ( $params === false ) ? '' : serialize( $params );
}
810 
/**
 * Turn a job_params column value back into job parameters.
 *
 * @param string $blob Stored value
 * @return mixed Unserialized value; false when the blob is an empty string
 */
protected static function extractBlob( $blob ) {
	if ( (string)$blob === '' ) {
		return false;
	}

	return unserialize( $blob );
}
822 
/**
 * Convert a DB error into a JobQueueError and throw it.
 *
 * @param DBError $e
 * @throws JobQueueError Always; the message is "<class of $e>: <message of $e>"
 */
protected function throwDBException( DBError $e ) {
	$message = get_class( $e ) . ": " . $e->getMessage();

	throw new JobQueueError( $message );
}
830 
/**
 * Return the list of job fields that should be selected.
 *
 * @return array Column names of the job table
 */
public static function selectFields() {
	$fields = array(
		// What the job is
		'job_id', 'job_cmd', 'job_namespace', 'job_title',
		'job_timestamp', 'job_params',
		// Claim bookkeeping and de-duplication
		'job_random', 'job_attempts', 'job_token',
		'job_token_timestamp', 'job_sha1',
	);

	return $fields;
}
851 }
JobQueueDB\MAX_AGE_PRUNE
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:33
JobQueueDB\doBatchPushInternal
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:218
MappedIterator
Convenience class for generating iterators from iterators.
Definition: MappedIterator.php:29
Title\makeTitle
static & makeTitle( $ns, $title, $fragment='', $interwiki='')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:398
JobQueueDB\doWaitForBackups
doWaitForBackups()
Definition: JobQueueDB.php:558
JobQueueDB\doFlushCaches
doFlushCaches()
Definition: JobQueueDB.php:577
JobQueueDB\doGetSiblingQueuesWithJobs
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:614
DB_MASTER
const DB_MASTER
Definition: Defines.php:56
JobQueueDB\getCoalesceLocationInternal
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueueDB.php:608
JobQueueDB\CACHE_TTL_LONG
const CACHE_TTL_LONG
Definition: JobQueueDB.php:32
JobQueueDB\doGetSiblingQueueSizes
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:627
php
skin txt MediaWiki includes four core it has been set as the default in MediaWiki since the replacing Monobook it had been been the default skin since before being replaced by Vector largely rewritten in while keeping its appearance Several legacy skins were removed in the as the burden of supporting them became too heavy to bear Those in etc for skin dependent CSS etc for skin dependent JavaScript These can also be customised on a per user by etc This feature has led to a wide variety of user styles becoming that gallery is a good place to ending in php
Definition: skin.txt:62
JobQueue\incrStats
static incrStats( $key, $type, $delta=1, $wiki=null)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:714
JobQueueDB\throwDBException
throwDBException(DBError $e)
Definition: JobQueueDB.php:825
EmptyBagOStuff
A BagOStuff object with no objects in it.
Definition: EmptyBagOStuff.php:29
$wgMemc
globals will be eliminated from MediaWiki replaced by an application object which would be passed to constructors Whether that would be an convenient solution remains to be but certainly PHP makes such object oriented programming models easier than they were in previous versions For the time being MediaWiki programmers will have to work in an environment with some global context At the time of globals were initialised on startup by MediaWiki of these were configuration which are documented in DefaultSettings php There is no comprehensive documentation for the remaining however some of the most important ones are listed below They are typically initialised either in index php or in Setup php For a description of the see design txt $wgTitle Title object created from the request URL $wgOut OutputPage object for HTTP response $wgUser User object for the user associated with the current request $wgLang Language object selected by user preferences $wgContLang Language object associated with the wiki being viewed $wgParser Parser object Parser extensions register their hooks here $wgRequest WebRequest to get request data $wgMemc
Definition: globals.txt:25
wfGetLB
wfGetLB( $wiki=false)
Get a load balancer object.
Definition: GlobalFunctions.php:3660
JobQueueDB\doGetSize
doGetSize()
Definition: JobQueueDB.php:98
$timestamp
if( $limit) $timestamp
Definition: importImages.php:104
JobQueueDB\getSlaveDB
getSlaveDB()
Definition: JobQueueDB.php:754
wiki
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
JobQueueDB\optimalOrder
optimalOrder()
Get the default queue order to use if configuration does not specify one.
Definition: JobQueueDB.php:63
JobQueue\ROOTJOB_TTL
const ROOTJOB_TTL
Definition: JobQueue.php:48
JobQueueDB\getDB
getDB( $index)
Definition: JobQueueDB.php:778
$params
$params
Definition: styleTest.css.php:40
JobQueueDB\__construct
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:49
BagOStuff
interface is intended to be more or less compatible with the PHP memcached client.
Definition: BagOStuff.php:43
wfSplitWikiID
wfSplitWikiID( $wiki)
Split a wiki ID into DB name and table prefix.
Definition: GlobalFunctions.php:3620
JobQueueDB\getMasterDB
getMasterDB()
Definition: JobQueueDB.php:766
JobQueueDB\insertFields
insertFields(IJobSpecification $job)
Definition: JobQueueDB.php:730
JobQueueDB
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:30
$flags
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2113
cache
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
JobQueueDB\CACHE_TTL_SHORT
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:31
$dbr
$dbr
Definition: testCompression.php:48
JobQueueDB\makeBlob
static makeBlob( $params)
Definition: JobQueueDB.php:801
BagOStuff\set
set( $key, $value, $exptime=0)
Set an item.
JobQueueDB\supportedOrders
supportedOrders()
Get the allowed queue orders for configuration validation.
Definition: JobQueueDB.php:59
Job
Class to both describe a background job and handle jobs.
Definition: Job.php:31
BagOStuff\get
get( $key, &$casToken=null)
Get an item with the given key.
IJobSpecification\getType
getType()
$lb
if( $wgAPIRequestLog) $lb
Definition: api.php:126
MWException
MediaWiki exception.
Definition: MWException.php:26
$property
$property
Definition: styleTest.css.php:44
$blob
$blob
Definition: testCompression.php:61
JobQueue\$type
string $type
Job type *.
Definition: JobQueue.php:34
JobQueueDB\recycleAndDeleteStaleJobs
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:645
JobQueue\$dupCache
BagOStuff $dupCache
Definition: JobQueue.php:44
JobQueueDB\MAX_OFFSET
const MAX_OFFSET
Definition: JobQueueDB.php:35
JobQueueDB\doBatchPush
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:194
array
the array() calling protocol came about after MediaWiki 1.4rc1.
List of Api Query prop modules.
JobQueueDB\doDeduplicateRootJob
doDeduplicateRootJob(Job $job)
Definition: JobQueueDB.php:511
global
when a variable name is used in a it is silently declared as a new masking the global
Definition: design.txt:93
DBConnectionError
Definition: DatabaseError.php:98
JobQueueDB\doGetAcquiredCount
doGetAcquiredCount()
Definition: JobQueueDB.php:124
JobQueueError
Definition: JobQueue.php:738
wfWaitForSlaves
wfWaitForSlaves( $maxLag=false, $wiki=false, $cluster=false)
Modern version of wfWaitForSlaves().
Definition: GlobalFunctions.php:3795
wfForeignMemcKey
wfForeignMemcKey( $db, $prefix)
Get a cache key for a foreign DB.
Definition: GlobalFunctions.php:3588
list
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
false
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:188
JobQueueDB\extractBlob
static extractBlob( $blob)
Definition: JobQueueDB.php:813
DBError
Database error base class.
Definition: DatabaseError.php:28
JobQueueDB\doIsEmpty
doIsEmpty()
Definition: JobQueueDB.php:71
ScopedCallback
Class for asserting that a callback happens when an dummy object leaves scope.
Definition: ScopedCallback.php:28
wfDebug
wfDebug( $text, $dest='all')
Sends a line to the debug log if enabled or, optionally, to a comment in output.
Definition: GlobalFunctions.php:933
Title\makeTitleSafe
static makeTitleSafe( $ns, $title, $fragment='', $interwiki='')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:422
$title
presenting them properly to the user as errors is done by the caller $title
Definition: hooks.txt:1324
$size
$size
Definition: RandomTest.php:75
JobQueueDB\MAX_JOB_RANDOM
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:34
JobQueueDB\doDelete
doDelete()
Definition: JobQueueDB.php:543
JobQueueDB\doGetAbandonedCount
doGetAbandonedCount()
Definition: JobQueueDB.php:155
JobQueueDB\getAllQueuedJobs
getAllQueuedJobs()
Definition: JobQueueDB.php:587
Job\factory
static factory( $command, Title $title, $params=false)
Create the appropriate object to handle a specific job.
Definition: Job.php:67
JobQueueDB\doGetPeriodicTasks
doGetPeriodicTasks()
Definition: JobQueueDB.php:565
JobQueueDB\selectFields
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:834
JobQueueConnectionError
Definition: JobQueue.php:741
JobQueueDB\claimOldest
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:420
IDatabase
Interface for classes that implement or wrap DatabaseBase.
Definition: Database.php:212
JobQueueDB\doAck
doAck(Job $job)
Definition: JobQueueDB.php:481
SqlBagOStuff
Class to store objects in the database.
Definition: SqlBagOStuff.php:29
$count
$count
Definition: UtfNormalTest2.php:96
JobQueue\getRootJobCacheKey
getRootJobCacheKey( $signature)
Definition: JobQueue.php:526
DB_SLAVE
const DB_SLAVE
Definition: Defines.php:55
type
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
wfGetLBFactory
& wfGetLBFactory()
Get the load balancer factory object.
Definition: GlobalFunctions.php:3669
$dir
if(count( $args)==0) $dir
Definition: importImages.php:49
JobQueueDB\getCacheKey
getCacheKey( $property)
Definition: JobQueueDB.php:790
wfBaseConvert
wfBaseConvert( $input, $sourceBase, $destBase, $pad=1, $lowercase=true, $engine='auto')
Convert an arbitrarily-long digit string from one numeric base to another, optionally zero-padding to...
Definition: GlobalFunctions.php:3368
$job
if(count( $args)< 1) $job
Definition: recompressTracked.php:42
as
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
JobQueue
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
$e
if( $useReadline) $e
Definition: eval.php:66
JobQueueDB\doPop
doPop()
Definition: JobQueueDB.php:285
order
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
$res
$res
Definition: database.txt:21
DBO_TRX
const DBO_TRX
Definition: Defines.php:42
IJobSpecification
Job queue task description interface.
Definition: JobSpecification.php:30
JobQueueDB\$cluster
bool string $cluster
Name of an external DB cluster.
Definition: JobQueueDB.php:39
JobQueue\getType
getType()
Definition: JobQueue.php:128
JobQueueDB\$cache
BagOStuff $cache
Definition: JobQueueDB.php:37
wfRandomString
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Definition: GlobalFunctions.php:300
JobQueueDB\claimRandom
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:343