JobQueueDB.php
1 <?php
22 use MediaWiki\MediaWikiServices;
23 use Wikimedia\Rdbms\DBConnectionError;
24 use Wikimedia\Rdbms\DBError;
25 use Wikimedia\Rdbms\IDatabase;
26 use Wikimedia\Rdbms\IMaintainableDatabase;
27 use Wikimedia\Rdbms\SelectQueryBuilder;
28 use Wikimedia\ScopedCallback;
29 
36 class JobQueueDB extends JobQueue {
37  private const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
38  private const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
39  private const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
40  private const MAX_OFFSET = 255; // integer; maximum number of rows to skip
41 
43  protected $conn;
44 
46  protected $server;
48  protected $cluster;
49 
59  protected function __construct( array $params ) {
60  parent::__construct( $params );
61 
62  if ( isset( $params['server'] ) ) {
63  $this->server = $params['server'];
64  } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
65  $this->cluster = $params['cluster'];
66  }
67  }
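 // Illustrative configuration sketch (the $wgJobTypeConf keys shown are assumptions
 // for the example, not requirements of this class): a wiki routes job storage to
 // JobQueueDB via something like the following in LocalSettings.php.
 //
 //   $wgJobTypeConf['default'] = [
 //       'class'    => JobQueueDB::class,
 //       'order'    => 'random',
 //       'claimTTL' => 3600,
 //   ];
 //
 // 'order' and 'claimTTL' are handled by the JobQueue base class; 'server' or
 // 'cluster' would be picked up by the constructor above.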
68 
69  protected function supportedOrders() {
70  return [ 'random', 'timestamp', 'fifo' ];
71  }
72 
73  protected function optimalOrder() {
74  return 'random';
75  }
76 
81  protected function doIsEmpty() {
82  $dbr = $this->getReplicaDB();
84  $scope = $this->getScopedNoTrxFlag( $dbr );
85  try {
86  // unclaimed job
87  $found = (bool)$dbr->newSelectQueryBuilder()
88  ->select( '1' )
89  ->from( 'job' )
90  ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
91  ->caller( __METHOD__ )->fetchField();
92  } catch ( DBError $e ) {
93  throw $this->getDBException( $e );
94  }
95 
96  return !$found;
97  }
98 
103  protected function doGetSize() {
104  $key = $this->getCacheKey( 'size' );
105 
106  $size = $this->wanCache->get( $key );
107  if ( is_int( $size ) ) {
108  return $size;
109  }
110 
111  $dbr = $this->getReplicaDB();
113  $scope = $this->getScopedNoTrxFlag( $dbr );
114  try {
115  $size = $dbr->newSelectQueryBuilder()
116  ->from( 'job' )
117  ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
118  ->caller( __METHOD__ )->fetchRowCount();
119  } catch ( DBError $e ) {
120  throw $this->getDBException( $e );
121  }
122  $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
123 
124  return $size;
125  }
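 // Schematic form of the cache-miss query above (assuming the default schema;
 // the query builder takes care of quoting and table prefixes):
 //
 //   SELECT COUNT(*) FROM job WHERE job_cmd = '<type>' AND job_token = '';
 //
 // i.e. the reported "size" is the number of unclaimed rows for this job type,
 // cached for CACHE_TTL_SHORT seconds.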
126 
131  protected function doGetAcquiredCount() {
132  if ( $this->claimTTL <= 0 ) {
133  return 0; // no acknowledgements
134  }
135 
136  $key = $this->getCacheKey( 'acquiredcount' );
137 
138  $count = $this->wanCache->get( $key );
139  if ( is_int( $count ) ) {
140  return $count;
141  }
142 
143  $dbr = $this->getReplicaDB();
145  $scope = $this->getScopedNoTrxFlag( $dbr );
146  try {
147  $count = $dbr->newSelectQueryBuilder()
148  ->from( 'job' )
149  ->where( [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ] )
150  ->caller( __METHOD__ )->fetchRowCount();
151  } catch ( DBError $e ) {
152  throw $this->getDBException( $e );
153  }
154  $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
155 
156  return $count;
157  }
158 
164  protected function doGetAbandonedCount() {
165  if ( $this->claimTTL <= 0 ) {
166  return 0; // no acknowledgements
167  }
168 
169  $key = $this->getCacheKey( 'abandonedcount' );
170 
171  $count = $this->wanCache->get( $key );
172  if ( is_int( $count ) ) {
173  return $count;
174  }
175 
176  $dbr = $this->getReplicaDB();
178  $scope = $this->getScopedNoTrxFlag( $dbr );
179  try {
180  $count = $dbr->newSelectQueryBuilder()
181  ->from( 'job' )
182  ->where(
183  [
184  'job_cmd' => $this->type,
185  "job_token != {$dbr->addQuotes( '' )}",
186  $dbr->buildComparison( '>=', [ 'job_attempts' => $this->maxTries ] ),
187  ]
188  )
189  ->caller( __METHOD__ )->fetchRowCount();
190  } catch ( DBError $e ) {
191  throw $this->getDBException( $e );
192  }
193 
194  $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
195 
196  return $count;
197  }
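 // Taken together, the three counters above partition rows by claim state:
 //   queued    -> job_token = ''                               (doGetSize)
 //   acquired  -> job_token != ''                              (doGetAcquiredCount)
 //   abandoned -> job_token != '' AND job_attempts >= maxTries (doGetAbandonedCount)
 // Abandoned rows remain a subset of the acquired rows until they are pruned
 // by recycleAndDeleteStaleJobs().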
198 
206  protected function doBatchPush( array $jobs, $flags ) {
207  $dbw = $this->getPrimaryDB();
209  $scope = $this->getScopedNoTrxFlag( $dbw );
210  // In general, there will be two cases here:
211  // a) sqlite; DB connection is probably a regular round-aware handle.
212  // If the connection is busy with a transaction, then defer the job writes
213  // until right before the main round commit step. Any errors that bubble
214  // up will rollback the main commit round.
215  // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
216  // No transaction is active nor will be started by writes, so enqueue the jobs
217  // now so that any errors will show up immediately as the interface expects. Any
218  // errors that bubble up will rollback the main commit round.
219  $fname = __METHOD__;
220  $dbw->onTransactionPreCommitOrIdle(
221  function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
222  $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
223  },
224  $fname
225  );
226  }
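 // Caller-side sketch (assumes the JobQueueGroup service and the built-in 'null'
 // job type; this is roughly how rows normally reach doBatchPush()):
 //
 //   $job = new JobSpecification( 'null', [ 'lives' => 1 ] );
 //   MediaWikiServices::getInstance()->getJobQueueGroup()->push( $job );
 //
 // push() picks the queue configured for the job type and calls doBatchPush(),
 // which defers the actual INSERTs to doBatchPushInternal() below.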
227 
239  public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
240  if ( $jobs === [] ) {
241  return;
242  }
243 
244  $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
245  $rowList = []; // list of jobs for jobs that are not de-duplicated
246  foreach ( $jobs as $job ) {
247  $row = $this->insertFields( $job, $dbw );
248  if ( $job->ignoreDuplicates() ) {
249  $rowSet[$row['job_sha1']] = $row;
250  } else {
251  $rowList[] = $row;
252  }
253  }
254 
255  if ( $flags & self::QOS_ATOMIC ) {
256  $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
257  }
258  try {
259  // Strip out any duplicate jobs that are already in the queue...
260  if ( count( $rowSet ) ) {
261  $res = $dbw->newSelectQueryBuilder()
262  ->select( 'job_sha1' )
263  ->from( 'job' )
264  ->where(
265  [
266  // No job_type condition since it's part of the job_sha1 hash
267  'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
268  'job_token' => '' // unclaimed
269  ]
270  )
271  ->caller( $method )->fetchResultSet();
272  foreach ( $res as $row ) {
273  wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
274  unset( $rowSet[$row->job_sha1] ); // already enqueued
275  }
276  }
277  // Build the full list of job rows to insert
278  $rows = array_merge( $rowList, array_values( $rowSet ) );
279  // Insert the job rows in chunks to avoid replica DB lag...
280  foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
281  $dbw->insert( 'job', $rowBatch, $method );
282  }
283  $this->incrStats( 'inserts', $this->type, count( $rows ) );
284  $this->incrStats( 'dupe_inserts', $this->type,
285  count( $rowSet ) + count( $rowList ) - count( $rows )
286  );
287  } catch ( DBError $e ) {
288  throw $this->getDBException( $e );
289  }
290  if ( $flags & self::QOS_ATOMIC ) {
291  $dbw->endAtomic( $method );
292  }
293  }
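 // Note on the de-duplication above: only jobs whose ignoreDuplicates() returns
 // true are keyed by job_sha1, and only unclaimed rows (job_token = '') suppress
 // re-insertion; an identical job that is already claimed by a runner does not,
 // so work that became necessary after the claim is not silently dropped.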
294 
299  protected function doPop() {
300  $dbw = $this->getPrimaryDB();
302  $scope = $this->getScopedNoTrxFlag( $dbw );
303 
304  $job = false; // job popped off
305  try {
306  $uuid = wfRandomString( 32 ); // pop attempt
307  do { // retry when our row is invalid or deleted as a duplicate
308  // Try to reserve a row in the DB...
309  if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
310  $row = $this->claimOldest( $uuid );
311  } else { // random first
312  $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
313  $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
314  $row = $this->claimRandom( $uuid, $rand, $gte );
315  }
316  // Check if we found a row to reserve...
317  if ( !$row ) {
318  break; // nothing to do
319  }
320  $this->incrStats( 'pops', $this->type );
321 
322  // Get the job object from the row...
323  $job = $this->jobFromRow( $row );
324  break; // done
325  } while ( true );
326 
327  if ( !$job || mt_rand( 0, 9 ) == 0 ) {
 328  // Handle jobs that need to be recycled/deleted;
329  // any recycled jobs will be picked up next attempt
330  $this->recycleAndDeleteStaleJobs();
331  }
332  } catch ( DBError $e ) {
333  throw $this->getDBException( $e );
334  }
335 
336  return $job;
337  }
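 // Runner-side sketch (assumes a queue obtained from the JobQueueGroup service;
 // the 'refreshLinks' type and the lack of error handling are illustrative):
 //
 //   $queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'refreshLinks' );
 //   while ( ( $job = $queue->pop() ) !== false ) {
 //       $job->run();
 //       $queue->ack( $job ); // deletes the claimed row via doAck()
 //   }
 //
 // Rows that are popped but never acked keep their job_token and are later
 // recycled or deleted by recycleAndDeleteStaleJobs().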
338 
347  protected function claimRandom( $uuid, $rand, $gte ) {
348  $dbw = $this->getPrimaryDB();
350  $scope = $this->getScopedNoTrxFlag( $dbw );
351  // Check cache to see if the queue has <= OFFSET items
352  $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
353 
354  $invertedDirection = false; // whether one job_random direction was already scanned
355  // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
356  // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
357  // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
358  // be used here with MySQL.
359  do {
360  if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
361  // For small queues, using OFFSET will overshoot and return no rows more often.
362  // Instead, this uses job_random to pick a row (possibly checking both directions).
363  $row = $dbw->newSelectQueryBuilder()
364  ->select( self::selectFields() )
365  ->from( 'job' )
366  ->where(
367  [
368  'job_cmd' => $this->type,
369  'job_token' => '', // unclaimed
370  $dbw->buildComparison( $gte ? '>=' : '<=', [ 'job_random' => $rand ] )
371  ]
372  )
373  ->orderBy(
374  'job_random',
375  $gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
376  )
377  ->caller( __METHOD__ )->fetchRow();
378  if ( !$row && !$invertedDirection ) {
379  $gte = !$gte;
380  $invertedDirection = true;
381  continue; // try the other direction
382  }
383  } else { // table *may* have >= MAX_OFFSET rows
384  // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
385  // in MySQL if there are many rows for some reason. This uses a small OFFSET
386  // instead of job_random for reducing excess claim retries.
387  $row = $dbw->newSelectQueryBuilder()
388  ->select( self::selectFields() )
389  ->from( 'job' )
390  ->where(
391  [
392  'job_cmd' => $this->type,
393  'job_token' => '', // unclaimed
394  ]
395  )
396  ->offset( mt_rand( 0, self::MAX_OFFSET ) )
397  ->caller( __METHOD__ )->fetchRow();
398  if ( !$row ) {
399  $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
400  $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
401  continue; // use job_random
402  }
403  }
404 
405  if ( !$row ) {
406  break;
407  }
408 
409  $dbw->update( 'job', // update by PK
410  [
411  'job_token' => $uuid,
412  'job_token_timestamp' => $dbw->timestamp(),
413  'job_attempts = job_attempts+1' ],
414  [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
415  __METHOD__
416  );
417  // This might get raced out by another runner when claiming the previously
418  // selected row. The use of job_random should minimize this problem, however.
419  if ( !$dbw->affectedRows() ) {
420  $row = false; // raced out
421  }
422  } while ( !$row );
423 
424  return $row;
425  }
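 // The claim UPDATE above is, schematically:
 //
 //   UPDATE job
 //      SET job_token = '<uuid>', job_token_timestamp = <now>,
 //          job_attempts = job_attempts + 1
 //    WHERE job_cmd = '<type>' AND job_id = <selected id> AND job_token = '';
 //
 // Because the WHERE clause re-checks job_token = '', only one of several
 // concurrent runners can win a given row; the losers see affectedRows() = 0
 // and loop to select another candidate.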
426 
433  protected function claimOldest( $uuid ) {
434  $dbw = $this->getPrimaryDB();
436  $scope = $this->getScopedNoTrxFlag( $dbw );
437 
438  $row = false; // the row acquired
439  do {
440  if ( $dbw->getType() === 'mysql' ) {
441  // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
442  // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
443  // Postgres has no such limitation. However, MySQL offers an
444  // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
445  $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
446  "SET " .
447  "job_token = {$dbw->addQuotes( $uuid ) }, " .
448  "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
449  "job_attempts = job_attempts+1 " .
450  "WHERE ( " .
451  "job_cmd = {$dbw->addQuotes( $this->type )} " .
452  "AND job_token = {$dbw->addQuotes( '' )} " .
453  ") ORDER BY job_id ASC LIMIT 1",
454  __METHOD__
455  );
456  } else {
457  // Use a subquery to find the job, within an UPDATE to claim it.
458  // This uses as much of the DB wrapper functions as possible.
459  $qb = $dbw->newSelectQueryBuilder()
460  ->select( 'job_id' )
461  ->from( 'job' )
462  ->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
463  ->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
464  ->limit( 1 );
465 
466  $dbw->update( 'job',
467  [
468  'job_token' => $uuid,
469  'job_token_timestamp' => $dbw->timestamp(),
470  'job_attempts = job_attempts+1' ],
471  [ 'job_id = (' . $qb->getSQL() . ')'
472  ],
473  __METHOD__
474  );
475  }
476 
477  if ( !$dbw->affectedRows() ) {
478  break;
479  }
480 
481  // Fetch any row that we just reserved...
482  $row = $dbw->newSelectQueryBuilder()
483  ->select( self::selectFields() )
484  ->from( 'job' )
485  ->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
486  ->caller( __METHOD__ )->fetchRow();
487  if ( !$row ) { // raced out by duplicate job removal
488  wfDebug( "Row deleted as duplicate by another process." );
489  }
490  } while ( !$row );
491 
492  return $row;
493  }
494 
500  protected function doAck( RunnableJob $job ) {
501  $id = $job->getMetadata( 'id' );
502  if ( $id === null ) {
503  throw new MWException( "Job of type '{$job->getType()}' has no ID." );
504  }
505 
506  $dbw = $this->getPrimaryDB();
508  $scope = $this->getScopedNoTrxFlag( $dbw );
509  try {
510  // Delete a row with a single DELETE without holding row locks over RTTs...
511  $dbw->delete(
512  'job',
513  [ 'job_cmd' => $this->type, 'job_id' => $id ],
514  __METHOD__
515  );
516 
517  $this->incrStats( 'acks', $this->type );
518  } catch ( DBError $e ) {
519  throw $this->getDBException( $e );
520  }
521  }
522 
 529  protected function doDeduplicateRootJob( IJobSpecification $job ) {
 530  // Callers should call JobQueueGroup::push() before this method so that if the
531  // insert fails, the de-duplication registration will be aborted. Since the insert
532  // is deferred till "transaction idle", do the same here, so that the ordering is
533  // maintained. Having only the de-duplication registration succeed would cause
534  // jobs to become no-ops without any actual jobs that made them redundant.
535  $dbw = $this->getPrimaryDB();
537  $scope = $this->getScopedNoTrxFlag( $dbw );
538  $dbw->onTransactionCommitOrIdle(
539  function () use ( $job ) {
540  parent::doDeduplicateRootJob( $job );
541  },
542  __METHOD__
543  );
544 
545  return true;
546  }
547 
552  protected function doDelete() {
553  $dbw = $this->getPrimaryDB();
555  $scope = $this->getScopedNoTrxFlag( $dbw );
556  try {
557  $dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
558  } catch ( DBError $e ) {
559  throw $this->getDBException( $e );
560  }
561 
562  return true;
563  }
564 
569  protected function doWaitForBackups() {
570  if ( $this->server ) {
571  return; // not using LBFactory instance
572  }
573 
574  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
575  $lbFactory->waitForReplication( [
576  'domain' => $this->domain,
577  'cluster' => is_string( $this->cluster ) ? $this->cluster : false
578  ] );
579  }
580 
584  protected function doFlushCaches() {
585  foreach ( [ 'size', 'acquiredcount' ] as $type ) {
586  $this->wanCache->delete( $this->getCacheKey( $type ) );
587  }
588  }
589 
594  public function getAllQueuedJobs() {
595  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
596  }
597 
602  public function getAllAcquiredJobs() {
603  return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
604  }
605 
610  public function getAllAbandonedJobs() {
611  return $this->getJobIterator( [
612  'job_cmd' => $this->getType(),
613  "job_token > ''",
614  "job_attempts >= " . intval( $this->maxTries )
615  ] );
616  }
617 
622  protected function getJobIterator( array $conds ) {
623  $dbr = $this->getReplicaDB();
625  $scope = $this->getScopedNoTrxFlag( $dbr );
626  $qb = $dbr->newSelectQueryBuilder()
627  ->select( self::selectFields() )
628  ->from( 'job' )
629  ->where( $conds );
630  try {
631  return new MappedIterator(
632  $qb->caller( __METHOD__ )->fetchResultSet(),
633  function ( $row ) {
634  return $this->jobFromRow( $row );
635  }
636  );
637  } catch ( DBError $e ) {
638  throw $this->getDBException( $e );
639  }
640  }
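 // Usage sketch for the iterators above ($queue stands for an instance of this
 // class; getMetadata( 'id' ) is populated by jobFromRow() further down):
 //
 //   foreach ( $queue->getAllAbandonedJobs() as $job ) {
 //       wfDebug( "abandoned {$job->getType()} job #{$job->getMetadata( 'id' )}" );
 //   }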
641 
642  public function getCoalesceLocationInternal() {
643  if ( $this->server ) {
644  return null; // not using the LBFactory instance
645  }
646 
647  return is_string( $this->cluster )
648  ? "DBCluster:{$this->cluster}:{$this->domain}"
649  : "LBFactory:{$this->domain}";
650  }
651 
652  protected function doGetSiblingQueuesWithJobs( array $types ) {
653  $dbr = $this->getReplicaDB();
655  $scope = $this->getScopedNoTrxFlag( $dbr );
656  // @note: this does not check whether the jobs are claimed or not.
657  // This is useful so JobQueueGroup::pop() also sees queues that only
658  // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
659  // failed jobs so that they can be popped again for that edge case.
660  $res = $dbr->newSelectQueryBuilder()
661  ->select( 'job_cmd' )
662  ->distinct()
663  ->from( 'job' )
664  ->where( [ 'job_cmd' => $types ] )
665  ->caller( __METHOD__ )->fetchResultSet();
666 
667  $types = [];
668  foreach ( $res as $row ) {
669  $types[] = $row->job_cmd;
670  }
671 
672  return $types;
673  }
674 
675  protected function doGetSiblingQueueSizes( array $types ) {
676  $dbr = $this->getReplicaDB();
678  $scope = $this->getScopedNoTrxFlag( $dbr );
679 
680  $res = $dbr->newSelectQueryBuilder()
681  ->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
682  ->from( 'job' )
683  ->where( [ 'job_cmd' => $types ] )
684  ->groupBy( 'job_cmd' )
685  ->caller( __METHOD__ )->fetchResultSet();
686 
687  $sizes = [];
688  foreach ( $res as $row ) {
689  $sizes[$row->job_cmd] = (int)$row->count;
690  }
691 
692  return $sizes;
693  }
694 
700  public function recycleAndDeleteStaleJobs() {
701  $now = time();
702  $count = 0; // affected rows
703  $dbw = $this->getPrimaryDB();
705  $scope = $this->getScopedNoTrxFlag( $dbw );
706 
707  try {
708  if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
709  return $count; // already in progress
710  }
711 
712  // Remove claims on jobs acquired for too long if enabled...
713  if ( $this->claimTTL > 0 ) {
714  $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
 715  // Get the IDs of jobs that have been claimed but not finished after too long.
716  // These jobs can be recycled into the queue by expiring the claim. Selecting
717  // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
718  $res = $dbw->newSelectQueryBuilder()
719  ->select( 'job_id' )
720  ->from( 'job' )
721  ->where(
722  [
723  'job_cmd' => $this->type,
724  "job_token != {$dbw->addQuotes( '' )}", // was acquired
725  $dbw->buildComparison( '<', [ 'job_token_timestamp' => $claimCutoff ] ), // stale
726  $dbw->buildComparison( '<', [ 'job_attempts' => $this->maxTries ] ), // retries left
727  ]
728  )
729  ->caller( __METHOD__ )->fetchResultSet();
730  $ids = array_map(
731  static function ( $o ) {
732  return $o->job_id;
733  }, iterator_to_array( $res )
734  );
735  if ( count( $ids ) ) {
736  // Reset job_token for these jobs so that other runners will pick them up.
 737  // Set the timestamp to the current time, as it is useful to know that the job
738  // was already tried before (the timestamp becomes the "released" time).
739  $dbw->update( 'job',
740  [
741  'job_token' => '',
742  'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
743  ],
744  [ 'job_id' => $ids, "job_token != {$dbw->addQuotes( '' )}" ],
745  __METHOD__
746  );
747  $affected = $dbw->affectedRows();
748  $count += $affected;
749  $this->incrStats( 'recycles', $this->type, $affected );
750  }
751  }
752 
753  // Just destroy any stale jobs...
754  $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
755  $qb = $dbw->newSelectQueryBuilder()
756  ->select( 'job_id' )
757  ->from( 'job' )
758  ->where(
759  [
760  'job_cmd' => $this->type,
761  "job_token != {$dbw->addQuotes( '' )}", // was acquired
762  $dbw->buildComparison( '<', [ 'job_token_timestamp' => $pruneCutoff ] ) // stale
763  ]
764  );
765  if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
766  $qb->andWhere( "job_attempts >= {$dbw->addQuotes( $this->maxTries )}" );
767  }
768  // Get the IDs of jobs that are considered stale and should be removed. Selecting
769  // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
770  $res = $qb->caller( __METHOD__ )->fetchResultSet();
771  $ids = array_map(
772  static function ( $o ) {
773  return $o->job_id;
774  }, iterator_to_array( $res )
775  );
776  if ( count( $ids ) ) {
777  $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
778  $affected = $dbw->affectedRows();
779  $count += $affected;
780  $this->incrStats( 'abandons', $this->type, $affected );
781  }
782 
783  $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
784  } catch ( DBError $e ) {
785  throw $this->getDBException( $e );
786  }
787 
788  return $count;
789  }
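 // Summary of the sweep above: claimed rows older than claimTTL that still have
 // attempts left get job_token cleared and become claimable again ("recycles");
 // claimed rows older than MAX_AGE_PRUNE that are out of attempts (when claimTTL
 // is enabled) are deleted outright ("abandons"). The jobqueue-recycle-<type>
 // advisory lock ensures only one runner performs the sweep per job type at a time.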
790 
796  protected function insertFields( IJobSpecification $job, IDatabase $db ) {
797  return [
798  // Fields that describe the nature of the job
799  'job_cmd' => $job->getType(),
800  'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
801  'job_title' => $job->getParams()['title'] ?? '',
802  'job_params' => self::makeBlob( $job->getParams() ),
803  // Additional job metadata
804  'job_timestamp' => $db->timestamp(),
805  'job_sha1' => Wikimedia\base_convert(
806  sha1( serialize( $job->getDeduplicationInfo() ) ),
807  16, 36, 31
808  ),
809  'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
810  ];
811  }
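 // Note on the fields above: job_sha1 is the SHA-1 of the job's de-duplication
 // info re-encoded in base 36, so logically identical jobs share a hash and can
 // be skipped by doBatchPushInternal(); job_random spreads rows over
 // [0, MAX_JOB_RANDOM] so that claimRandom() can probe the queue cheaply.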
812 
817  protected function getReplicaDB() {
818  try {
819  return $this->getDB( DB_REPLICA );
820  } catch ( DBConnectionError $e ) {
821  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
822  }
823  }
824 
830  protected function getPrimaryDB() {
831  try {
832  return $this->getDB( DB_PRIMARY );
833  } catch ( DBConnectionError $e ) {
834  throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
835  }
836  }
837 
842  protected function getDB( $index ) {
843  if ( $this->server ) {
844  if ( $this->conn instanceof IDatabase ) {
845  return $this->conn;
846  } elseif ( $this->conn instanceof DBError ) {
847  throw $this->conn;
848  }
849 
850  try {
851  $this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
852  $this->server['type'],
853  $this->server
854  );
855  } catch ( DBError $e ) {
856  $this->conn = $e;
857  throw $e;
858  }
859 
860  return $this->conn;
861  } else {
862  $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
863  $lb = is_string( $this->cluster )
864  ? $lbFactory->getExternalLB( $this->cluster )
865  : $lbFactory->getMainLB( $this->domain );
866 
867  if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
868  // Keep a separate connection to avoid contention and deadlocks;
869  // However, SQLite has the opposite behavior due to DB-level locking.
870  $flags = $lb::CONN_TRX_AUTOCOMMIT;
871  } else {
 872  // Job insertion will be deferred until the PRESEND stage to reduce contention.
873  $flags = 0;
874  }
875 
876  return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
877  }
878  }
879 
884  private function getScopedNoTrxFlag( IDatabase $db ) {
885  $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
886  $db->clearFlag( DBO_TRX ); // make each query its own transaction
887 
888  return new ScopedCallback( static function () use ( $db, $autoTrx ) {
889  if ( $autoTrx ) {
890  $db->setFlag( DBO_TRX ); // restore old setting
891  }
892  } );
893  }
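 // Callers assign the returned ScopedCallback to an (otherwise unused) $scope
 // variable: when it goes out of scope the callback runs and restores DBO_TRX,
 // so each helper gets autocommit behavior only for its own duration.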
894 
899  private function getCacheKey( $property ) {
900  $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
901 
902  return $this->wanCache->makeGlobalKey(
903  'jobqueue',
904  $this->domain,
905  $cluster,
906  $this->type,
907  $property
908  );
909  }
910 
915  protected static function makeBlob( $params ) {
916  if ( $params !== false ) {
917  return serialize( $params );
918  } else {
919  return '';
920  }
921  }
922 
927  protected function jobFromRow( $row ) {
928  $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
929  if ( !is_array( $params ) ) { // this shouldn't happen
930  throw new UnexpectedValueException(
931  "Could not unserialize job with ID '{$row->job_id}'." );
932  }
933 
934  $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
935  $job = $this->factoryJob( $row->job_cmd, $params );
936  $job->setMetadata( 'id', $row->job_id );
937  $job->setMetadata( 'timestamp', $row->job_timestamp );
938 
939  return $job;
940  }
941 
946  protected function getDBException( DBError $e ) {
947  return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
948  }
949 
955  public static function selectFields() {
956  return [
957  'job_id',
958  'job_cmd',
959  'job_namespace',
960  'job_title',
961  'job_timestamp',
962  'job_params',
963  'job_random',
964  'job_attempts',
965  'job_token',
966  'job_token_timestamp',
967  'job_sha1',
968  ];
969  }
970 }