MediaWiki  master
JobQueueMemory.php
Go to the documentation of this file.
1 <?php
31 class JobQueueMemory extends JobQueue {
33  protected static $data = [];
34 
35  public function __construct( array $params ) {
36  $params['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );
37 
38  parent::__construct( $params );
39  }
40 
47  protected function doBatchPush( array $jobs, $flags ) {
48  $unclaimed =& $this->getQueueData( 'unclaimed', [] );
49 
50  foreach ( $jobs as $job ) {
51  if ( $job->ignoreDuplicates() ) {
52  $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
53  if ( !isset( $unclaimed[$sha1] ) ) {
54  $unclaimed[$sha1] = $job;
55  }
56  } else {
57  $unclaimed[] = $job;
58  }
59  }
60  }
61 
67  protected function supportedOrders() {
68  return [ 'random', 'timestamp', 'fifo' ];
69  }
70 
76  protected function optimalOrder() {
77  return 'fifo';
78  }
79 
85  protected function doIsEmpty() {
86  return ( $this->doGetSize() == 0 );
87  }
88 
94  protected function doGetSize() {
95  $unclaimed = $this->getQueueData( 'unclaimed' );
96 
97  return $unclaimed ? count( $unclaimed ) : 0;
98  }
99 
105  protected function doGetAcquiredCount() {
106  $claimed = $this->getQueueData( 'claimed' );
107 
108  return $claimed ? count( $claimed ) : 0;
109  }
110 
116  protected function doPop() {
117  if ( $this->doGetSize() == 0 ) {
118  return false;
119  }
120 
121  $unclaimed =& $this->getQueueData( 'unclaimed' );
122  $claimed =& $this->getQueueData( 'claimed', [] );
123 
124  if ( $this->order === 'random' ) {
125  $key = array_rand( $unclaimed );
126  } else {
127  $key = array_key_first( $unclaimed );
128  }
129 
130  $spec = $unclaimed[$key];
131  unset( $unclaimed[$key] );
132  $claimed[] = $spec;
133 
134  $job = $this->jobFromSpecInternal( $spec );
135 
136  $job->setMetadata( 'claimId', array_key_last( $claimed ) );
137 
138  return $job;
139  }
140 
146  protected function doAck( RunnableJob $job ) {
147  if ( $this->getAcquiredCount() == 0 ) {
148  return;
149  }
150 
151  $claimed =& $this->getQueueData( 'claimed' );
152  unset( $claimed[$job->getMetadata( 'claimId' )] );
153  }
154 
158  protected function doDelete() {
159  if ( isset( self::$data[$this->type][$this->domain] ) ) {
160  unset( self::$data[$this->type][$this->domain] );
161  if ( !self::$data[$this->type] ) {
162  unset( self::$data[$this->type] );
163  }
164  }
165  }
166 
172  public function getAllQueuedJobs() {
173  $unclaimed = $this->getQueueData( 'unclaimed' );
174  if ( !$unclaimed ) {
175  return new ArrayIterator( [] );
176  }
177 
178  return new MappedIterator(
179  $unclaimed,
180  function ( $value ) {
181  return $this->jobFromSpecInternal( $value );
182  }
183  );
184  }
185 
191  public function getAllAcquiredJobs() {
192  $claimed = $this->getQueueData( 'claimed' );
193  if ( !$claimed ) {
194  return new ArrayIterator( [] );
195  }
196 
197  return new MappedIterator(
198  $claimed,
199  function ( $value ) {
200  return $this->jobFromSpecInternal( $value );
201  }
202  );
203  }
204 
209  public function jobFromSpecInternal( IJobSpecification $spec ) {
210  return $this->factoryJob( $spec->getType(), $spec->getParams() );
211  }
212 
219  private function &getQueueData( $field, $init = null ) {
220  if ( !isset( self::$data[$this->type][$this->domain][$field] ) ) {
221  if ( $init !== null ) {
222  self::$data[$this->type][$this->domain][$field] = $init;
223  } else {
224  return $init;
225  }
226  }
227 
228  return self::$data[$this->type][$this->domain][$field];
229  }
230 }
Simple store for keeping values in an associative array for the current process.
Class to handle job queues stored in PHP memory for testing.
doDelete()
Stability: stableto override JobQueue::delete()
__construct(array $params)
doBatchPush(array $jobs, $flags)
static array[] $data
doAck(RunnableJob $job)
jobFromSpecInternal(IJobSpecification $spec)
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:39
string $type
Job type.
Definition: JobQueue.php:43
factoryJob( $command, $params)
Definition: JobQueue.php:745
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:282
string $domain
DB domain ID.
Definition: JobQueue.php:41
Convenience class for generating iterators from iterators.
Multi-datacenter aware caching interface.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack()
Definition: RunnableJob.php:37
if(count( $args)< 1) $job