<?php

use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\ScopedCallback;

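/**
 * Class to handle job queues stored in the DB.
 *
 * @ingroup JobQueue
 * @since 1.21
 */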
class JobQueueDB extends JobQueue {
	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
	const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
	const MAX_OFFSET = 255; // integer; maximum number of rows to skip

	/** @var IMaintainableDatabase|DBError|null */
	protected $conn;

	/** @var array|null Server configuration array */
	protected $server;
	/** @var string|null Name of an external DB cluster or null for the local DB cluster */
	protected $cluster;

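	/**
	 * Additional parameters include:
	 *   - server : Server configuration array for Database::factory(). Overrides "cluster".
	 *   - cluster : The name of an external cluster registered via LBFactory.
	 *     If not specified, the primary DB cluster for the wiki will be used.
	 *     This can be overridden with a custom cluster so that DB handles will
	 *     be retrieved via LBFactory::getExternalLB() and getConnection().
	 *
	 * As a rough sketch, a wiki might route jobs to a DB-backed queue via
	 * $wgJobTypeConf (the cluster name below is hypothetical):
	 * @code
	 * $wgJobTypeConf['default'] = [
	 *     'class' => JobQueueDB::class,
	 *     'cluster' => 'jobqueue-cluster', // hypothetical external cluster
	 *     'order' => 'random',
	 * ];
	 * @endcode
	 *
	 * @param array $params
	 */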
	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['server'] ) ) {
			$this->server = $params['server'];
		} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		}
	}

	protected function supportedOrders() {
		return [ 'random', 'timestamp', 'fifo' ];
	}

	protected function optimalOrder() {
		return 'random';
	}

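	/**
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 */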
	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$found = $dbr->selectField( // unclaimed job
				'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return !$found;
	}

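	/**
	 * @see JobQueue::doGetSize()
	 * @return int
	 */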
	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		$size = $this->wanCache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}

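	/**
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 */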
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'acquiredcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
				__METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

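	/**
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws MWException
	 */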
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'abandonedcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
				[
					'job_cmd' => $this->type,
					"job_token != {$dbr->addQuotes( '' )}",
					"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
				],
				__METHOD__
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

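	/**
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @throws DBError|Exception
	 * @return void
	 */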
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		//    If the connection is busy with a transaction, then defer the job writes
		//    until right before the main round commit step. Any errors that bubble
		//    up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		//    No transaction is active nor will be started by writes, so enqueue the jobs
		//    now so that any errors will show up immediately as the interface expects. Any
		//    errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
				$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
			},
			$fname
		);
	}

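	/**
	 * This function should not be called outside of JobQueueDB
	 *
	 * @param IDatabase $dbw
	 * @param array $jobs
	 * @param int $flags
	 * @param string $method
	 * @throws DBError
	 * @return void
	 */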
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->select( 'job', 'job_sha1',
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_keys( $rowSet ),
						'job_token' => '' // unclaimed
					],
					$method
				);
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->insert( 'job', $rowBatch, $method );
			}
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			$this->incrStats( 'dupe_inserts', $this->type,
				count( $rowSet ) + count( $rowList ) - count( $rows )
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}

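	/**
	 * @see JobQueue::doPop()
	 * @return RunnableJob|bool
	 */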
	protected function doPop() {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			do { // retry when our row is invalid or deleted as a duplicate
				// Try to reserve a row in the DB...
				if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
					$row = $this->claimOldest( $uuid );
				} else { // random first
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
					$row = $this->claimRandom( $uuid, $rand, $gte );
				}
				// Check if we found a row to reserve...
				if ( !$row ) {
					break; // nothing to do
				}
				$this->incrStats( 'pops', $this->type );

				// Get the job object from the row...
				$job = $this->jobFromRow( $row );
				break; // done
			} while ( true );

			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
				// Handle jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $job;
	}

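	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $rand (instead of job_random <= $rand)
	 * @return stdClass|false Row|false
	 */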
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
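		// Roughly, each iteration below amounts to (quoting and branch details elided):
		//   SELECT * FROM job WHERE job_cmd = <type> AND job_token = ''
		//     [AND job_random >= <rand> ORDER BY job_random ASC | OFFSET <n>] LIMIT 1;
		//   UPDATE job SET job_token = <uuid>, ... WHERE job_id = <id> AND job_token = '';
		// The UPDATE only wins if no other runner claimed the row in between.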
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
					__METHOD__,
					[ 'ORDER BY' => "job_random {$dir}" ]
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					],
					__METHOD__,
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
					continue; // use job_random
				}
			}

			if ( $row ) { // claim the job
				$dbw->update( 'job', // update by PK
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
					__METHOD__
				);
				// This might get raced out by another runner when claiming the previously
				// selected row. The use of job_random should minimize this problem, however.
				if ( !$dbw->affectedRows() ) {
					$row = false; // raced out
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}

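	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|false Row|false
	 */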
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Oracle and Postgres have no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid )}, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$dbw->update( 'job',
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_id = (' .
						$dbw->selectSQLText( 'job', 'job_id',
							[ 'job_cmd' => $this->type, 'job_token' => '' ],
							__METHOD__,
							[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
						')'
					],
					__METHOD__
				);
			}
			// Fetch any row that we just reserved...
			if ( $dbw->affectedRows() ) {
				$row = $dbw->selectRow( 'job', self::selectFields(),
					[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
				);
				if ( !$row ) { // raced out by duplicate job removal
					wfDebug( "Row deleted as duplicate by another process.\n" );
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}

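	/**
	 * @see JobQueue::doAck()
	 * @param RunnableJob $job
	 * @throws MWException
	 */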
	protected function doAck( RunnableJob $job ) {
		$id = $job->getMetadata( 'id' );
		if ( $id === null ) {
			throw new MWException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			// Delete a row with a single DELETE without holding row locks over RTTs...
			$dbw->delete(
				'job',
				[ 'job_cmd' => $this->type, 'job_id' => $id ],
				__METHOD__
			);

			$this->incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @throws MWException
	 * @return bool
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Since the insert
		// is deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		$dbw->onTransactionCommitOrIdle(
			function () use ( $job ) {
				parent::doDeduplicateRootJob( $job );
			},
			__METHOD__
		);

		return true;
	}

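	/**
	 * @see JobQueue::doDelete()
	 * @return bool
	 */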
	protected function doDelete() {
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );
		try {
			$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return true;
	}

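	/**
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */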
	protected function doWaitForBackups() {
		if ( $this->server ) {
			return; // not using LBFactory instance
		}

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->waitForReplication( [
			'domain' => $this->domain,
			'cluster' => is_string( $this->cluster ) ? $this->cluster : false
		] );
	}

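	/**
	 * @see JobQueue::doFlushCaches()
	 */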
	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount' ] as $type ) {
			$this->wanCache->delete( $this->getCacheKey( $type ) );
		}
	}

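	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 */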
	public function getAllQueuedJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
	}

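	/**
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator
	 */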
	public function getAllAcquiredJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
	}

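	/**
	 * @param array $conds Query conditions
	 * @return Iterator
	 */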
	protected function getJobIterator( array $conds ) {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		try {
			return new MappedIterator(
				$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
				function ( $row ) {
					return $this->jobFromRow( $row );
				}
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	public function getCoalesceLocationInternal() {
		if ( $this->server ) {
			return null; // not using the LBFactory instance
		}

		return is_string( $this->cluster )
			? "DBCluster:{$this->cluster}:{$this->domain}"
			: "LBFactory:{$this->domain}";
	}

	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$res = $dbr->select( 'job', 'DISTINCT job_cmd',
			[ 'job_cmd' => $types ], __METHOD__ );

		$types = [];
		foreach ( $res as $row ) {
			$types[] = $row->job_cmd;
		}

		return $types;
	}

	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getReplicaDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbr );

		$res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
			[ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );

		$sizes = [];
		foreach ( $res as $row ) {
			$sizes[$row->job_cmd] = (int)$row->count;
		}

		return $sizes;
	}

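	/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 *
	 * @return int Number of jobs recycled/deleted
	 */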
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = $this->getScopedNoTrxFlag( $dbw );

		try {
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
					__METHOD__
				);
				$ids = array_map(
					function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the
					// job was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
						],
						[ 'job_id' => $ids, "job_token != ''" ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the DELETE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map(
				function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $count;
	}

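	/**
	 * @param IJobSpecification $job
	 * @param IDatabase $db
	 * @return array
	 */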
	protected function insertFields( IJobSpecification $job, IDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'job_title' => $job->getParams()['title'] ?? '',
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}

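	/**
	 * @throws JobQueueConnectionError
	 * @return IDatabase
	 */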
	protected function getReplicaDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

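	/**
	 * @throws JobQueueConnectionError
	 * @return IDatabase
	 */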
	protected function getMasterDB() {
		try {
			return $this->getDB( DB_MASTER );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

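	/**
	 * @param int $index (DB_REPLICA/DB_MASTER)
	 * @return IDatabase
	 */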
	protected function getDB( $index ) {
		if ( $this->server ) {
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn;
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn;
			}

			try {
				$this->conn = Database::factory( $this->server['type'], $this->server );
			} catch ( DBError $e ) {
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
				// Keep a separate connection to avoid contention and deadlocks;
				// however, SQLite has the opposite behavior due to DB-level locking.
				$flags = $lb::CONN_TRX_AUTOCOMMIT;
			} else {
				// Job insertions will be deferred until the PRESEND stage to reduce contention.
				$flags = 0;
			}

			return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
		}
	}

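	/**
	 * Temporarily clear the DBO_TRX flag so each query runs in its own transaction
	 *
	 * @param IDatabase $db
	 * @return ScopedCallback Restores the old DBO_TRX setting when disposed
	 */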
	private function getScopedNoTrxFlag( IDatabase $db ) {
		$autoTrx = $db->getFlag( DBO_TRX ); // get current setting
		$db->clearFlag( DBO_TRX ); // make each query its own transaction

		return new ScopedCallback( function () use ( $db, $autoTrx ) {
			if ( $autoTrx ) {
				$db->setFlag( DBO_TRX ); // restore old setting
			}
		} );
	}

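	/**
	 * @param string $property
	 * @return string
	 */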
	private function getCacheKey( $property ) {
		$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$cluster,
			$this->type,
			$property
		);
	}

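	/**
	 * @param array|bool $params
	 * @return string
	 */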
	protected static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}

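	/**
	 * @param stdClass $row
	 * @return RunnableJob
	 */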
	protected function jobFromRow( $row ) {
		$params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
		if ( !is_array( $params ) ) { // this shouldn't happen
			throw new UnexpectedValueException(
				"Could not unserialize job with ID '{$row->job_id}'." );
		}

		$params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
		$job = $this->factoryJob( $row->job_cmd, $params );
		$job->setMetadata( 'id', $row->job_id );
		$job->setMetadata( 'timestamp', $row->job_timestamp );

		return $job;
	}

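	/**
	 * @param DBError $e
	 * @return JobQueueError
	 */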
	protected function getDBException( DBError $e ) {
		return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}

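	/**
	 * Return the list of job fields that should be selected.
	 *
	 * @since 1.23
	 * @return array
	 */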
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
}