MediaWiki  master
JobQueueDB.php
Go to the documentation of this file.
1 <?php
28 use Wikimedia\ScopedCallback;
29 
36 class JobQueueDB extends JobQueue {
37  private const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
38  private const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
39  private const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
40  private const MAX_OFFSET = 255; // integer; maximum number of rows to skip
41 
43  protected $conn;
44 
46  protected $server;
48  protected $cluster;
49 
/**
 * Set up the queue and record which database backend it should use.
 *
 * An explicit 'server' entry takes precedence; otherwise a string 'cluster'
 * entry is used. When neither is supplied both properties keep their defaults.
 *
 * @param array $params Queue parameters; may contain 'server' and/or 'cluster'
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	if ( isset( $params['server'] ) ) {
		$this->server = $params['server'];

		return;
	}

	$clusterName = $params['cluster'] ?? null;
	if ( is_string( $clusterName ) ) {
		$this->cluster = $clusterName;
	}
}
68 
/**
 * List the queue orders that this backend accepts.
 *
 * @return string[]
 */
protected function supportedOrders() {
	$orders = [ 'random', 'timestamp', 'fifo' ];

	return $orders;
}
72 
/**
 * Name the queue order used when none is configured.
 *
 * @return string
 */
protected function optimalOrder() {
	$default = 'random';

	return $default;
}
76 
/**
 * Check whether the queue has no unclaimed jobs of this type.
 *
 * @return bool True when no row with an empty job_token exists for this type
 * @throws JobQueueError When the replica query fails
 */
protected function doIsEmpty() {
	$db = $this->getReplicaDB();
	// Keep the handle in a local so the DBO_TRX setting is restored when it goes away
	$scope = $this->getScopedNoTrxFlag( $db );
	try {
		$query = $db->newSelectQueryBuilder()
			->select( '1' )
			->from( 'job' )
			->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
			->caller( __METHOD__ );
		// Any returned value means at least one unclaimed job exists
		$hasUnclaimed = (bool)$query->fetchField();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return !$hasUnclaimed;
}
98 
/**
 * Count the unclaimed jobs of this type.
 *
 * An integer found in the WAN cache is returned as-is; otherwise the rows are
 * counted on a replica and the result is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the replica query fails
 */
protected function doGetSize() {
	$cacheKey = $this->getCacheKey( 'size' );

	$cached = $this->wanCache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$db = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $db );
	try {
		$unclaimed = $db->newSelectQueryBuilder()
			->from( 'job' )
			->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
			->caller( __METHOD__ )
			->fetchRowCount();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
	$this->wanCache->set( $cacheKey, $unclaimed, self::CACHE_TTL_SHORT );

	return $unclaimed;
}
126 
/**
 * Count the jobs of this type that currently carry a claim token.
 *
 * Always 0 when claimTTL is not positive. Otherwise an integer found in the
 * WAN cache is returned, else the rows are counted on a replica and the result
 * is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the replica query fails
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$cacheKey = $this->getCacheKey( 'acquiredcount' );

	$cached = $this->wanCache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$db = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $db );
	try {
		$conds = [
			'job_cmd' => $this->type,
			'job_token != ' . $db->addQuotes( '' ),
		];
		$claimed = $db->newSelectQueryBuilder()
			->from( 'job' )
			->where( $conds )
			->caller( __METHOD__ )
			->fetchRowCount();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
	$this->wanCache->set( $cacheKey, $claimed, self::CACHE_TTL_SHORT );

	return $claimed;
}
158 
/**
 * Count the claimed jobs of this type whose attempt counter reached maxTries.
 *
 * Always 0 when claimTTL is not positive. Otherwise an integer found in the
 * WAN cache is returned, else the rows are counted on a replica and the result
 * is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the replica query fails
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$cacheKey = $this->getCacheKey( 'abandonedcount' );

	$cached = $this->wanCache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$db = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $db );
	try {
		$conds = [
			'job_cmd' => $this->type,
			'job_token != ' . $db->addQuotes( '' ),
			$db->buildComparison( '>=', [ 'job_attempts' => $this->maxTries ] ),
		];
		$abandoned = $db->newSelectQueryBuilder()
			->from( 'job' )
			->where( $conds )
			->caller( __METHOD__ )
			->fetchRowCount();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	$this->wanCache->set( $cacheKey, $abandoned, self::CACHE_TTL_SHORT );

	return $abandoned;
}
198 
/**
 * Schedule a batch of jobs for insertion on the primary database.
 *
 * The actual insert is handed to onTransactionPreCommitOrIdle(), so it runs
 * either right away or just before the surrounding transaction commits.
 *
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield; QOS_ATOMIC is honoured by doBatchPushInternal()
 */
protected function doBatchPush( array $jobs, $flags ) {
	$dbw = $this->getPrimaryDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );
	// Two situations are expected here:
	// a) sqlite: the connection is probably a regular round-aware handle. When it
	//    is busy with a transaction, the job writes are deferred until right before
	//    the main round commit step. Errors that bubble up roll back the main
	//    commit round.
	// b) mysql/postgres: the connection is generally a separate CONN_TRX_AUTOCOMMIT
	//    handle. No transaction is active nor will one be started by writes, so the
	//    jobs are enqueued now and errors show up immediately as the interface
	//    expects. Errors that bubble up roll back the main commit round.
	$fname = __METHOD__;
	$pushCallback = function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
		$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
	};
	$dbw->onTransactionPreCommitOrIdle( $pushCallback, $fname );
}
227 
/**
 * Insert a batch of jobs, dropping de-duplicatable ones that are already queued.
 *
 * This function should not be called outside of JobQueueDB.
 *
 * Jobs whose ignoreDuplicates() is true are keyed by job_sha1, so repeats within
 * the batch collapse to one row and rows that already exist unclaimed in the
 * table are skipped. All other jobs are always inserted.
 *
 * @param IDatabase $dbw Primary database handle to write to
 * @param IJobSpecification[] $jobs
 * @param int $flags Bitfield; with QOS_ATOMIC all inserts share one atomic section
 * @param string $method Caller name used for query attribution
 * @throws JobQueueError When a query fails
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
	if ( $jobs === [] ) {
		return;
	}

	$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
	$rowList = []; // list of jobs for jobs that are not de-duplicated
	foreach ( $jobs as $job ) {
		$row = $this->insertFields( $job, $dbw );
		if ( $job->ignoreDuplicates() ) {
			$rowSet[$row['job_sha1']] = $row;
		} else {
			$rowList[] = $row;
		}
	}

	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
	}
	try {
		// Strip out any duplicate jobs that are already in the queue...
		if ( count( $rowSet ) ) {
			$res = $dbw->newSelectQueryBuilder()
				->select( 'job_sha1' )
				->from( 'job' )
				->where(
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
						'job_token' => '' // unclaimed
					]
				)
				->caller( $method )->fetchResultSet();
			foreach ( $res as $row ) {
				wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
				unset( $rowSet[$row->job_sha1] ); // already enqueued
			}
		}
		// Build the full list of job rows to insert
		$rows = array_merge( $rowList, array_values( $rowSet ) );
		// Insert the job rows in chunks to avoid replica DB lag...
		foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
			$dbw->newInsertQueryBuilder()
				->insertInto( 'job' )
				->rows( $rowBatch )
				->caller( $method )->execute();
		}
		$this->incrStats( 'inserts', $this->type, count( $rows ) );
		// Every submitted job that did not become a row was dropped as a duplicate,
		// either of another job in this batch or of a row already in the table.
		// $rowSet is already pruned at this point, so comparing it against $rows
		// would always yield zero; compare against the submitted batch instead.
		$this->incrStats( 'dupe_inserts', $this->type, count( $jobs ) - count( $rows ) );
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->endAtomic( $method );
	}
}
297 
/**
 * Claim one job from the queue and return it.
 *
 * The row is reserved with claimOldest() for the 'fifo' and 'timestamp' orders
 * and with claimRandom() otherwise. Stale-job maintenance runs whenever nothing
 * was popped, and on roughly one in ten successful pops.
 *
 * @return RunnableJob|false The claimed job, or false when no row could be claimed
 * @throws JobQueueError When a query fails
 */
protected function doPop() {
	$dbw = $this->getPrimaryDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$job = false; // job popped off
	try {
		$uuid = wfRandomString( 32 ); // pop attempt
		// Try to reserve a row in the DB. Retrying after a lost race is handled
		// inside the claim methods, so a single attempt is made here.
		if ( in_array( $this->order, [ 'fifo', 'timestamp' ], true ) ) {
			$row = $this->claimOldest( $uuid );
		} else { // random first
			$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
			$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
			$row = $this->claimRandom( $uuid, $rand, $gte );
		}

		if ( $row ) {
			$this->incrStats( 'pops', $this->type );
			// Get the job object from the row...
			$job = $this->jobFromRow( $row );
		}

		if ( !$job || mt_rand( 0, 9 ) === 0 ) {
			// Handle jobs that need to be recycled/deleted;
			// any recycled jobs will be picked up next attempt
			$this->recycleAndDeleteStaleJobs();
		}
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return $job;
}
341 
/**
 * Reserve a row with a single UPDATE without holding row locks over RTTs...
 *
 * A candidate row is selected first and then claimed by primary key with a
 * conditional UPDATE; when another runner wins the race the selection is retried.
 *
 * @param string $uuid Token to store in job_token for the claimed row
 * @param int $rand Pivot value compared against job_random
 * @param bool $gte Search job_random >= $rand first when true, <= $rand when false
 * @return stdClass|false Row object as selected (before the claim UPDATE), or false
 */
protected function claimRandom( $uuid, $rand, $gte ) {
	$dbw = $this->getPrimaryDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );
	// Check cache to see if the queue has <= MAX_OFFSET items
	$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );

	$invertedDirection = false; // whether one job_random direction was already scanned
	// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
	// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
	// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
	// be used here with MySQL.
	do {
		if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
			// For small queues, using OFFSET will overshoot and return no rows more often.
			// Instead, this uses job_random to pick a row (possibly checking both directions).
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						$dbw->buildComparison( $gte ? '>=' : '<=', [ 'job_random' => $rand ] )
					]
				)
				->orderBy(
					'job_random',
					$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
				)
				->caller( __METHOD__ )->fetchRow();
			if ( !$row && !$invertedDirection ) {
				$gte = !$gte;
				$invertedDirection = true;
				continue; // try the other direction
			}
		} else { // table *may* have >= MAX_OFFSET rows
			// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
			// in MySQL if there are many rows for some reason. This uses a small OFFSET
			// instead of job_random for reducing excess claim retries.
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					]
				)
				->offset( mt_rand( 0, self::MAX_OFFSET ) )
				->caller( __METHOD__ )->fetchRow();
			if ( !$row ) {
				$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
				// Same short TTL as the other cached queue properties
				$this->wanCache->set( $this->getCacheKey( 'small' ), 1, self::CACHE_TTL_SHORT );
				continue; // use job_random
			}
		}

		if ( !$row ) {
			break; // both job_random directions came up empty
		}

		$dbw->newUpdateQueryBuilder()
			->update( 'job' ) // update by PK
			->set( [
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1'
			] )
			->where( [
				'job_cmd' => $this->type,
				'job_id' => $row->job_id,
				'job_token' => ''
			] )
			->caller( __METHOD__ )->execute();

		// This might get raced out by another runner when claiming the previously
		// selected row. The use of job_random should minimize this problem, however.
		if ( !$dbw->affectedRows() ) {
			$row = false; // raced out
		}
	} while ( !$row );

	return $row;
}
435 
/**
 * Reserve a row with a single UPDATE without holding row locks over RTTs...
 *
 * The unclaimed row with the lowest job_id is stamped with $uuid and then read
 * back. When the stamped row can no longer be found the claim is attempted again.
 *
 * @param string $uuid Token to store in job_token for the claimed row
 * @return stdClass|false Row object, or false when no unclaimed row was updated
 */
protected function claimOldest( $uuid ) {
	$dbw = $this->getPrimaryDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );

	$row = false; // the row acquired
	while ( !$row ) {
		if ( $dbw->getType() !== 'mysql' ) {
			// Use a subquery to find the job, within an UPDATE to claim it.
			// This uses as much of the DB wrapper functions as possible.
			$oldestIdQuery = $dbw->newSelectQueryBuilder()
				->select( 'job_id' )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
				->limit( 1 );

			$dbw->newUpdateQueryBuilder()
				->update( 'job' )
				->set( [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1'
				] )
				->where( [ 'job_id = (' . $oldestIdQuery->getSQL() . ')' ] )
				->caller( __METHOD__ )->execute();
		} else {
			// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
			// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
			// Postgres has no such limitation. However, MySQL offers an
			// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
			$sql = implode( ' ', [
				'UPDATE ' . $dbw->tableName( 'job' ),
				'SET job_token = ' . $dbw->addQuotes( $uuid ) . ',',
				'job_token_timestamp = ' . $dbw->addQuotes( $dbw->timestamp() ) . ',',
				'job_attempts = job_attempts+1',
				'WHERE (',
				'job_cmd = ' . $dbw->addQuotes( $this->type ),
				'AND job_token = ' . $dbw->addQuotes( '' ),
				') ORDER BY job_id ASC LIMIT 1',
			] );
			$dbw->query( $sql, __METHOD__ );
		}

		if ( !$dbw->affectedRows() ) {
			break; // nothing was claimed
		}

		// Fetch any row that we just reserved...
		$row = $dbw->newSelectQueryBuilder()
			->select( self::selectFields() )
			->from( 'job' )
			->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
			->caller( __METHOD__ )->fetchRow();
		if ( !$row ) { // raced out by duplicate job removal
			wfDebug( "Row deleted as duplicate by another process." );
		}
	}

	return $row;
}
503 
/**
 * Acknowledge a finished job by deleting its row.
 *
 * @param RunnableJob $job Job whose 'id' metadata identifies the row
 * @throws MWException When the job carries no 'id' metadata
 * @throws JobQueueError When the delete query fails
 */
protected function doAck( RunnableJob $job ) {
	$jobId = $job->getMetadata( 'id' );
	if ( $jobId === null ) {
		throw new MWException( "Job of type '" . $job->getType() . "' has no ID." );
	}

	$dbw = $this->getPrimaryDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );
	try {
		// Delete a row with a single DELETE without holding row locks over RTTs...
		$delete = $dbw->newDeleteQueryBuilder()
			->deleteFrom( 'job' )
			->where( [ 'job_cmd' => $this->type, 'job_id' => $jobId ] )
			->caller( __METHOD__ );
		$delete->execute();

		$this->incrStats( 'acks', $this->type );
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
}
530 
538  // Callers should call JobQueueGroup::push() before this method so that if the
539  // insert fails, the de-duplication registration will be aborted. Since the insert
540  // is deferred till "transaction idle", do the same here, so that the ordering is
541  // maintained. Having only the de-duplication registration succeed would cause
542  // jobs to become no-ops without any actual jobs that made them redundant.
543  $dbw = $this->getPrimaryDB();
545  $scope = $this->getScopedNoTrxFlag( $dbw );
546  $dbw->onTransactionCommitOrIdle(
547  function () use ( $job ) {
548  parent::doDeduplicateRootJob( $job );
549  },
550  __METHOD__
551  );
552 
553  return true;
554  }
555 
/**
 * Delete every row of this job type, claimed or not.
 *
 * @return bool Always true
 * @throws JobQueueError When the delete query fails
 */
protected function doDelete() {
	$dbw = $this->getPrimaryDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );
	try {
		$delete = $dbw->newDeleteQueryBuilder()
			->deleteFrom( 'job' )
			->where( [ 'job_cmd' => $this->type ] )
			->caller( __METHOD__ );
		$delete->execute();
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return true;
}
575 
/**
 * Wait for replication via the load balancer factory.
 *
 * Does nothing when an explicit server is configured, because the
 * LBFactory instance is not used in that case.
 */
protected function doWaitForBackups() {
	if ( !$this->server ) {
		$services = MediaWikiServices::getInstance();
		$services->getDBLoadBalancerFactory()->waitForReplication();
	}
}
588 
/**
 * Drop every cached queue property that this class stores in the WAN cache.
 *
 * Covers the three cached counters as well as the 'small' marker that
 * claimRandom() sets once it has seen the queue hold at most MAX_OFFSET rows.
 */
protected function doFlushCaches() {
	foreach ( [ 'size', 'acquiredcount', 'abandonedcount', 'small' ] as $type ) {
		$this->wanCache->delete( $this->getCacheKey( $type ) );
	}
}
597 
/**
 * Iterate over the unclaimed jobs of this type.
 *
 * @return Iterator
 * @throws JobQueueError When the query fails
 */
public function getAllQueuedJobs() {
	$conds = [
		'job_cmd' => $this->getType(),
		'job_token' => '',
	];

	return $this->getJobIterator( $conds );
}
605 
/**
 * Iterate over the jobs of this type that carry a claim token.
 *
 * @return Iterator
 * @throws JobQueueError When the query fails
 */
public function getAllAcquiredJobs() {
	$conds = [
		'job_cmd' => $this->getType(),
		"job_token > ''",
	];

	return $this->getJobIterator( $conds );
}
613 
/**
 * Iterate over the claimed jobs of this type whose attempts reached maxTries.
 *
 * @return Iterator
 * @throws JobQueueError When the query fails
 */
public function getAllAbandonedJobs() {
	$attemptLimit = (int)$this->maxTries;
	$conds = [
		'job_cmd' => $this->getType(),
		"job_token > ''",
		'job_attempts >= ' . $attemptLimit,
	];

	return $this->getJobIterator( $conds );
}
625 
/**
 * Run a replica query for job rows and wrap the result so it yields job objects.
 *
 * @param array $conds Query conditions for the job table
 * @return Iterator Iterator that maps each row through jobFromRow()
 * @throws JobQueueError When the query fails
 */
protected function getJobIterator( array $conds ) {
	$db = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $db );
	$query = $db->newSelectQueryBuilder()
		->select( self::selectFields() )
		->from( 'job' )
		->where( $conds );
	$rowToJob = function ( $row ) {
		return $this->jobFromRow( $row );
	};
	try {
		$res = $query->caller( __METHOD__ )->fetchResultSet();

		return new MappedIterator( $res, $rowToJob );
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}
}
649 
/**
 * Describe where this queue's table lives, for grouping sibling queues.
 *
 * Do not use this function outside of JobQueue/JobQueueGroup.
 *
 * @return string|null Null when an explicit server is configured
 */
public function getCoalesceLocationInternal() {
	if ( $this->server ) {
		return null; // not using the LBFactory instance
	}

	if ( is_string( $this->cluster ) ) {
		return "DBCluster:{$this->cluster}:{$this->domain}";
	}

	return "LBFactory:{$this->domain}";
}
659 
/**
 * Find which of the given job types have at least one row in the job table.
 *
 * @param string[] $types Job types to look for
 * @return string[] The subset of types that have rows
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$db = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $db );
	// @note: this does not check whether the jobs are claimed or not.
	// This is useful so JobQueueGroup::pop() also sees queues that only
	// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
	// failed jobs so that they can be popped again for that edge case.
	$query = $db->newSelectQueryBuilder()
		->select( 'job_cmd' )
		->distinct()
		->from( 'job' )
		->where( [ 'job_cmd' => $types ] )
		->caller( __METHOD__ );

	$nonEmptyTypes = [];
	foreach ( $query->fetchResultSet() as $row ) {
		$nonEmptyTypes[] = $row->job_cmd;
	}

	return $nonEmptyTypes;
}
682 
/**
 * Count the rows in the job table for each of the given job types.
 *
 * Types without any rows do not appear in the result.
 *
 * @param string[] $types Job types to count
 * @return int[] Map of (job type => row count)
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$db = $this->getReplicaDB();
	$scope = $this->getScopedNoTrxFlag( $db );

	$query = $db->newSelectQueryBuilder()
		->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
		->from( 'job' )
		->where( [ 'job_cmd' => $types ] )
		->groupBy( 'job_cmd' )
		->caller( __METHOD__ );

	$sizeByType = [];
	foreach ( $query->fetchResultSet() as $row ) {
		$sizeByType[$row->job_cmd] = (int)$row->count;
	}

	return $sizeByType;
}
702 
/**
 * Recycle or destroy any jobs that have been claimed for too long.
 *
 * Runs under a named database lock; when the lock cannot be obtained the
 * method returns immediately. The lock is released even when a step fails.
 *
 * @return int Number of rows that were released or deleted
 * @throws JobQueueError When a query fails
 */
public function recycleAndDeleteStaleJobs() {
	$now = time();
	$count = 0; // affected rows
	$dbw = $this->getPrimaryDB();
	$scope = $this->getScopedNoTrxFlag( $dbw );
	$lockKey = "jobqueue-recycle-{$this->type}";

	try {
		if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
			return $count; // already in progress
		}

		try {
			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							"job_token != {$dbw->addQuotes( '' )}", // was acquired
							$dbw->buildComparison( '<', [ 'job_token_timestamp' => $claimCutoff ] ), // stale
							$dbw->buildComparison( '<', [ 'job_attempts' => $this->maxTries ] ), // retries left
						]
					)
					->caller( __METHOD__ )->fetchResultSet();
				$ids = array_map(
					static function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->newUpdateQueryBuilder()
						->update( 'job' )
						->set( [
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
						] )
						->where( [
							'job_id' => $ids,
							"job_token != {$dbw->addQuotes( '' )}"
						] )
						->caller( __METHOD__ )->execute();

					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$qb = $dbw->newSelectQueryBuilder()
				->select( 'job_id' )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						$dbw->buildComparison( '<', [ 'job_token_timestamp' => $pruneCutoff ] ) // stale
					]
				);
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$qb->andWhere(
					$dbw->buildComparison( '>=', [ 'job_attempts' => $this->maxTries ] )
				);
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the DELETE can be done by primary key (less deadlocks).
			$res = $qb->caller( __METHOD__ )->fetchResultSet();
			$ids = array_map(
				static function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->newDeleteQueryBuilder()
					->deleteFrom( 'job' )
					->where( [ 'job_id' => $ids ] )
					->caller( __METHOD__ )->execute();
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}
		} finally {
			// Release the lock on every exit path so that a failed run does not
			// leave it held for the remaining lifetime of the connection
			$dbw->unlock( $lockKey, __METHOD__ );
		}
	} catch ( DBError $e ) {
		throw $this->getDBException( $e );
	}

	return $count;
}
805 
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @param IDatabase $db Handle used to format the insertion timestamp
 * @return array Map of (column name => value)
 */
protected function insertFields( IJobSpecification $job, IDatabase $db ) {
	$row = [];

	// Fields that describe the nature of the job
	$row['job_cmd'] = $job->getType();
	$row['job_namespace'] = $job->getParams()['namespace'] ?? NS_SPECIAL;
	$row['job_title'] = $job->getParams()['title'] ?? '';
	$row['job_params'] = self::makeBlob( $job->getParams() );

	// Additional job metadata
	$row['job_timestamp'] = $db->timestamp();
	$dedupHashHex = sha1( serialize( $job->getDeduplicationInfo() ) );
	$row['job_sha1'] = Wikimedia\base_convert( $dedupHashHex, 16, 36, 31 );
	$row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

	return $row;
}
827 
/**
 * Get a replica database handle for reading the job table.
 *
 * @return IDatabase
 * @throws JobQueueConnectionError When the connection cannot be obtained
 */
protected function getReplicaDB() {
	try {
		return $this->getDB( DB_REPLICA );
	} catch ( DBConnectionError $e ) {
		// Chain the original error so that its stack trace is not lost
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
	}
}
839 
/**
 * Get a primary database handle for writing to the job table.
 *
 * @return IDatabase
 * @throws JobQueueConnectionError When the connection cannot be obtained
 */
protected function getPrimaryDB() {
	try {
		return $this->getDB( DB_PRIMARY );
	} catch ( DBConnectionError $e ) {
		// Chain the original error so that its stack trace is not lost
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
	}
}
852 
/**
 * Obtain the database handle for the requested role.
 *
 * With an explicit server configuration a single connection is created once
 * and reused for every role; a failed creation is remembered and re-thrown on
 * later calls. Otherwise the handle comes from the external cluster or main
 * load balancer.
 *
 * @param int $index DB_REPLICA or DB_PRIMARY; unused for an explicit server
 * @return IDatabase
 * @throws DBError When creating the explicit-server connection fails
 */
protected function getDB( $index ) {
	if ( !$this->server ) {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		if ( is_string( $this->cluster ) ) {
			$lb = $lbFactory->getExternalLB( $this->cluster );
		} else {
			$lb = $lbFactory->getMainLB( $this->domain );
		}

		// Keep a separate connection to avoid contention and deadlocks;
		// However, SQLite has the opposite behavior due to DB-level locking,
		// so there the jobs insertion will be deferred until the PRESEND stage
		// to reduce contention.
		$isSqlite = $lb->getServerType( $lb->getWriterIndex() ) === 'sqlite';
		$flags = $isSqlite ? 0 : $lb::CONN_TRX_AUTOCOMMIT;

		return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
	}

	if ( $this->conn instanceof IDatabase ) {
		return $this->conn;
	}
	if ( $this->conn instanceof DBError ) {
		throw $this->conn; // earlier attempt failed; do not retry
	}

	try {
		$factory = MediaWikiServices::getInstance()->getDatabaseFactory();
		$this->conn = $factory->create( $this->server['type'], $this->server );
	} catch ( DBError $e ) {
		$this->conn = $e;
		throw $e;
	}

	return $this->conn;
}
894 
/**
 * Clear DBO_TRX on a handle and return a callback object that re-sets it.
 *
 * The flag is only set again if it was set when this method was called.
 *
 * @param IDatabase $db
 * @return ScopedCallback
 */
private function getScopedNoTrxFlag( IDatabase $db ) {
	$hadAutoTrx = $db->getFlag( DBO_TRX ); // get current setting
	$db->clearFlag( DBO_TRX ); // make each query its own transaction

	$restore = static function () use ( $db, $hadAutoTrx ) {
		if ( !$hadAutoTrx ) {
			return;
		}
		$db->setFlag( DBO_TRX ); // restore old setting
	};

	return new ScopedCallback( $restore );
}
909 
/**
 * Build the global WAN cache key for one cached property of this queue.
 *
 * @param string $property Property name, e.g. 'size'
 * @return string
 */
private function getCacheKey( $property ) {
	$clusterPart = 'main';
	if ( is_string( $this->cluster ) ) {
		$clusterPart = $this->cluster;
	}

	$keyParts = [ 'jobqueue', $this->domain, $clusterPart, $this->type, $property ];

	return $this->wanCache->makeGlobalKey( ...$keyParts );
}
925 
/**
 * Serialize job parameters for storage in job_params.
 *
 * @param array|false $params
 * @return string Serialized parameters, or '' when $params is false
 */
protected static function makeBlob( $params ) {
	return ( $params === false ) ? '' : serialize( $params );
}
937 
/**
 * Turn a job table row into a job object.
 *
 * The stored namespace and title are only used for parameters that the
 * serialized blob does not already define.
 *
 * @param stdClass $row Row with the fields from selectFields()
 * @return RunnableJob|null
 * @throws UnexpectedValueException When job_params does not unserialize to an array
 */
protected function jobFromRow( $row ) {
	$hasBlob = (string)$row->job_params !== '';
	$params = $hasBlob ? unserialize( $row->job_params ) : [];
	if ( !is_array( $params ) ) { // this shouldn't happen
		throw new UnexpectedValueException(
			"Could not unserialize job with ID '{$row->job_id}'." );
	}

	$rowDefaults = [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
	$params += $rowDefaults;

	$job = $this->factoryJob( $row->job_cmd, $params );
	$job->setMetadata( 'id', $row->job_id );
	$job->setMetadata( 'timestamp', $row->job_timestamp );

	return $job;
}
956 
/**
 * Wrap a database error in a JobQueueError.
 *
 * The message is prefixed with the class name of the original error, and the
 * original error is chained as the previous exception so its trace survives.
 *
 * @param DBError $e
 * @return JobQueueError
 */
protected function getDBException( DBError $e ) {
	return new JobQueueError( get_class( $e ) . ": " . $e->getMessage(), 0, $e );
}
964 
/**
 * Return the list of job fields that should be selected.
 *
 * @return string[]
 */
public static function selectFields() {
	$fields = [
		'job_id',
		'job_cmd',
		'job_namespace',
		'job_title',
		'job_timestamp',
		'job_params',
		'job_random',
		'job_attempts',
		'job_token',
		'job_token_timestamp',
		'job_sha1',
	];

	return $fields;
}
985 }
const NS_SPECIAL
Definition: Defines.php:53
wfDebug( $text, $dest='all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length=32)
Get a random string containing a number of pseudo-random hex characters.
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:36
claimOldest( $uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:442
doAck(RunnableJob $job)
Definition: JobQueueDB.php:509
supportedOrders()
Get the allowed queue orders for configuration validation.
Definition: JobQueueDB.php:69
getAllAcquiredJobs()
Definition: JobQueueDB.php:610
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:683
insertFields(IJobSpecification $job, IDatabase $db)
Definition: JobQueueDB.php:811
doWaitForBackups()
Definition: JobQueueDB.php:580
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:59
getDBException(DBError $e)
Definition: JobQueueDB.php:961
getAllAbandonedJobs()
Definition: JobQueueDB.php:618
jobFromRow( $row)
Definition: JobQueueDB.php:942
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:206
static makeBlob( $params)
Definition: JobQueueDB.php:930
claimRandom( $uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:350
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:660
doGetAcquiredCount()
Definition: JobQueueDB.php:131
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:708
doGetAbandonedCount()
Definition: JobQueueDB.php:164
getAllQueuedJobs()
Definition: JobQueueDB.php:602
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:239
optimalOrder()
Get the default queue order to use if configuration does not specify one.
Definition: JobQueueDB.php:73
getDB( $index)
Definition: JobQueueDB.php:857
string|null $cluster
Name of an external DB cluster or null for the local DB cluster.
Definition: JobQueueDB.php:48
IMaintainableDatabase|DBError|null $conn
Definition: JobQueueDB.php:43
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueueDB.php:650
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:537
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:970
getJobIterator(array $conds)
Definition: JobQueueDB.php:630
array|null $server
Server configuration array.
Definition: JobQueueDB.php:46
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:38
incrStats( $key, $type, $delta=1)
Call StatsdDataFactoryInterface::updateCount() for the queue overall and for the queue type.
Definition: JobQueue.php:770
string $type
Job type.
Definition: JobQueue.php:42
factoryJob( $command, $params)
Definition: JobQueue.php:736
MediaWiki exception.
Definition: MWException.php:33
Convenience class for generating iterators from iterators.
Service locator for MediaWiki core services.
Database error base class.
Definition: DBError.php:37
Build SELECT queries with a fluent interface.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
setFlag( $flag, $remember=self::REMEMBER_NOTHING)
Set a flag for this connection.
clearFlag( $flag, $remember=self::REMEMBER_NOTHING)
Clear a flag for this connection.
getFlag( $flag)
Returns a boolean whether the flag $flag is set for this connection.
Basic database interface for live and lazy-loaded relational database handles.
Definition: IDatabase.php:36
endAtomic( $fname=__METHOD__)
Ends an atomic section of SQL statements.
startAtomic( $fname=__METHOD__, $cancelable=self::ATOMIC_NOT_CANCELABLE)
Begin an atomic section of SQL statements.
newInsertQueryBuilder()
Get an InsertQueryBuilder bound to this connection.
Advanced database interface for IDatabase handles that include maintenance methods.
newSelectQueryBuilder()
Create an empty SelectQueryBuilder which can be used to run queries against this connection.
timestamp( $ts=0)
Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for inserting into timestamp fields in this DBMS.
const DB_REPLICA
Definition: defines.php:26
const DB_PRIMARY
Definition: defines.php:28
const DBO_TRX
Definition: defines.php:12
if(count( $args)< 1) $job