MediaWiki  master
JobQueueTest.php
Go to the documentation of this file.
1 <?php
2 
4 
11  protected $key;
13 
/**
 * Forward the constructor arguments to the parent test case and add the
 * `job` table to $this->tablesUsed.
 *
 * @param string|null $name Passed through unchanged to the parent constructor
 * @param array $data Passed through unchanged to the parent constructor
 * @param string $dataName Passed through unchanged to the parent constructor
 */
public function __construct( $name = null, array $data = [], $dataName = '' ) {
	parent::__construct( $name, $data, $dataName );

	// NOTE(review): presumably the base test case cleans the tables listed here
	// between tests - confirm against the parent class.
	$this->tablesUsed[] = 'job';
}
19 
/**
 * Create one JobQueue per order/claimTTL combination and store each one on
 * the test case under its variant name (for example $this->queueFifo).
 *
 * The backend configuration comes from $wgJobTypeConf when the `use-jobqueue`
 * CLI argument is set; otherwise JobQueueDBSingle is used. A variant whose
 * construction throws MWException is left as null, which the tests treat as
 * "skip this variant".
 *
 * @throws MWException If `use-jobqueue` names an entry missing from $wgJobTypeConf
 */
protected function setUp() {
	global $wgJobTypeConf;
	parent::setUp();

	$name = $this->getCliArg( 'use-jobqueue' );
	if ( $name ) {
		if ( !isset( $wgJobTypeConf[$name] ) ) {
			throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
		}
		$baseConfig = $wgJobTypeConf[$name];
	} else {
		$baseConfig = [ 'class' => JobQueueDBSingle::class ];
	}
	$baseConfig['type'] = 'null';
	$baseConfig['domain'] = WikiMap::getCurrentWikiDbDomain()->getId();
	$baseConfig['stash'] = new HashBagOStuff();
	$baseConfig['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );

	$variants = [
		'queueRand' => [ 'order' => 'random', 'claimTTL' => 0 ],
		'queueRandTTL' => [ 'order' => 'random', 'claimTTL' => 10 ],
		'queueTimestamp' => [ 'order' => 'timestamp', 'claimTTL' => 0 ],
		'queueTimestampTTL' => [ 'order' => 'timestamp', 'claimTTL' => 10 ],
		'queueFifo' => [ 'order' => 'fifo', 'claimTTL' => 0 ],
		'queueFifoTTL' => [ 'order' => 'fifo', 'claimTTL' => 10 ],
	];
	foreach ( $variants as $q => $settings ) {
		// Always define the property, so that tearDown() and the tests never
		// read an undefined property when the factory call below fails.
		$this->$q = null;
		try {
			// Array union: the per-variant settings win over the base configuration.
			$this->$q = JobQueue::factory( $settings + $baseConfig );
		} catch ( MWException $e ) {
			// unsupported?
			// @todo What if it was another error?
		}
	}
}
54 
/**
 * Run the parent teardown, then call delete() on every queue variant that
 * was created and reset all six queue properties to null.
 */
protected function tearDown() {
	parent::tearDown();

	$queueProperties = [
		'queueRand',
		'queueRandTTL',
		'queueTimestamp',
		'queueTimestampTTL',
		'queueFifo',
		'queueFifoTTL',
	];
	foreach ( $queueProperties as $property ) {
		$instance = $this->$property;
		if ( $instance ) {
			$instance->delete();
		}
		$this->$property = null;
	}
}
69 
/**
 * Check that the queue reports the current wiki ID and the domain ID it was
 * configured with.
 *
 * @dataProvider provider_queueLists
 * @covers JobQueue::getWiki
 *
 * @param string $queue Name of the test-case property holding the queue
 * @param bool $recycles Unused here; part of the shared provider row
 * @param string $desc Human-readable variant description used in messages
 */
public function testGetWiki( $queue, $recycles, $desc ) {
	$queue = $this->$queue;
	if ( !$queue ) {
		$this->markTestSkipped( $desc );
	}
	$this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" );
	// setUp() passes this exact domain ID as the 'domain' setting of every
	// queue, so getDomain() is expected to return it unchanged.
	$this->assertEquals(
		WikiMap::getCurrentWikiDbDomain()->getId(),
		$queue->getDomain(),
		"Proper wiki ID ($desc)"
	);
}
85 
/**
 * Check that the queue reports the 'null' job type it was configured with.
 *
 * @dataProvider provider_queueLists
 * @covers JobQueue::getType
 *
 * @param string $queue Name of the test-case property holding the queue
 * @param bool $recycles Unused here; part of the shared provider row
 * @param string $desc Human-readable variant description used in messages
 */
public function testGetType( $queue, $recycles, $desc ) {
	$jobQueue = $this->$queue;
	if ( !$jobQueue ) {
		$this->markTestSkipped( $desc );
	}

	$reportedType = $jobQueue->getType();
	$this->assertEquals( 'null', $reportedType, "Proper job type ($desc)" );
}
97 
/**
 * Walk a queue through push, pop, ack and delete, checking the size,
 * emptiness and acquired-count figures after every step.
 *
 * @dataProvider provider_queueLists
 * @covers JobQueue
 *
 * @param string $queue Name of the test-case property holding the queue
 * @param bool $recycles Whether acquired-count checks apply while jobs are claimed
 * @param string $desc Human-readable variant description used in messages
 */
public function testBasicOperations( $queue, $recycles, $desc ) {
	$jq = $this->$queue;
	if ( !$jq ) {
		$this->markTestSkipped( $desc );
	}

	$emptyMsg = "Queue is empty ($desc)";
	$notEmptyMsg = "Queue is not empty ($desc)";
	$pushMsg = "Push worked ($desc)";
	$sizeMsg = "Queue size is correct ($desc)";
	$activeMsg = "Active job count ($desc)";

	// Starting state: nothing queued, nothing claimed.
	$this->assertTrue( $jq->isEmpty(), $emptyMsg );

	$jq->flushCaches();
	$this->assertEquals( 0, $jq->getSize(), $emptyMsg );
	$this->assertEquals( 0, $jq->getAcquiredCount(), $emptyMsg );

	// Insert two jobs, one through each push entry point.
	$this->assertNull( $jq->push( $this->newJob() ), $pushMsg );
	$this->assertNull( $jq->batchPush( [ $this->newJob() ] ), $pushMsg );

	$this->assertFalse( $jq->isEmpty(), $notEmptyMsg );

	$jq->flushCaches();
	$this->assertEquals( 2, $jq->getSize(), $sizeMsg );
	$this->assertEquals( 0, $jq->getAcquiredCount(), "No jobs active ($desc)" );
	$queued = iterator_to_array( $jq->getAllQueuedJobs() );
	$this->assertCount( 2, $queued, "Queue iterator size is correct ($desc)" );

	// Claim the first job.
	$firstJob = $jq->pop();
	$this->assertFalse( $jq->isEmpty(), $notEmptyMsg );

	$jq->flushCaches();
	$this->assertEquals( 1, $jq->getSize(), $sizeMsg );

	$jq->flushCaches();
	if ( $recycles ) {
		$this->assertEquals( 1, $jq->getAcquiredCount(), $activeMsg );
	}

	// Claim the second job; the queue is now drained.
	$secondJob = $jq->pop();
	$this->assertTrue( $jq->isEmpty(), $emptyMsg );
	$this->assertEquals( 0, $jq->getSize(), $emptyMsg );

	$jq->flushCaches();
	if ( $recycles ) {
		$this->assertEquals( 2, $jq->getAcquiredCount(), $activeMsg );
	}

	// Acknowledge the jobs one at a time.
	$jq->ack( $firstJob );

	$jq->flushCaches();
	if ( $recycles ) {
		$this->assertEquals( 1, $jq->getAcquiredCount(), $activeMsg );
	}

	$jq->ack( $secondJob );

	$jq->flushCaches();
	$this->assertEquals( 0, $jq->getAcquiredCount(), $activeMsg );

	// Refill, then check that delete() leaves the queue empty.
	$this->assertNull(
		$jq->batchPush( [ $this->newJob(), $this->newJob() ] ),
		$pushMsg
	);
	$this->assertFalse( $jq->isEmpty(), $notEmptyMsg );

	$jq->delete();
	$jq->flushCaches();
	$this->assertTrue( $jq->isEmpty(), $emptyMsg );
	$this->assertEquals( 0, $jq->getSize(), $emptyMsg );
}
166 
/**
 * Push the same de-duplicated job several times, in two batches, and check
 * that the queue only ever holds one copy of it.
 *
 * @dataProvider provider_queueLists
 * @covers JobQueue
 *
 * @param string $queue Name of the test-case property holding the queue
 * @param bool $recycles Whether acquired-count checks apply while a job is claimed
 * @param string $desc Human-readable variant description used in messages
 */
public function testBasicDeduplication( $queue, $recycles, $desc ) {
	$jq = $this->$queue;
	if ( !$jq ) {
		$this->markTestSkipped( $desc );
	}

	$emptyMsg = "Queue is empty ($desc)";
	$activeMsg = "Active job count ($desc)";

	$this->assertTrue( $jq->isEmpty(), $emptyMsg );

	$jq->flushCaches();
	$this->assertEquals( 0, $jq->getSize(), $emptyMsg );
	$this->assertEquals( 0, $jq->getAcquiredCount(), $emptyMsg );

	// Two identical rounds: three duplicate jobs go in, exactly one is queued.
	for ( $round = 0; $round < 2; ++$round ) {
		$batch = [ $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ];
		$this->assertNull( $jq->batchPush( $batch ), "Push worked ($desc)" );

		$this->assertFalse( $jq->isEmpty(), "Queue is not empty ($desc)" );

		$jq->flushCaches();
		$this->assertEquals( 1, $jq->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $jq->getAcquiredCount(), "No jobs active ($desc)" );
	}

	// Claiming the single surviving job drains the queue.
	$claimed = $jq->pop();
	$this->assertTrue( $jq->isEmpty(), $emptyMsg );

	$jq->flushCaches();
	$this->assertEquals( 0, $jq->getSize(), $emptyMsg );
	if ( $recycles ) {
		$this->assertEquals( 1, $jq->getAcquiredCount(), $activeMsg );
	}

	$jq->ack( $claimed );

	$jq->flushCaches();
	$this->assertEquals( 0, $jq->getAcquiredCount(), $activeMsg );
}
222 
/**
 * Push a de-duplicated job again while its twin is claimed, acknowledge the
 * twin, and check that the second copy can still be popped.
 *
 * @dataProvider provider_queueLists
 * @covers JobQueue
 *
 * @param string $queue Name of the test-case property holding the queue
 * @param bool $recycles Unused here; part of the shared provider row
 * @param string $desc Human-readable variant description used when skipping
 */
public function testDeduplicationWhileClaimed( $queue, $recycles, $desc ) {
	$jq = $this->$queue;
	if ( !$jq ) {
		$this->markTestSkipped( $desc );
	}

	$dedupedJob = $this->newDedupedJob();
	$jq->push( $dedupedJob );

	// De-duplication does not apply to already-claimed jobs
	$twin = $jq->pop();
	$jq->push( $dedupedJob );
	$jq->ack( $twin );

	// Make sure ack() of the twin did not delete the sibling data
	$sibling = $jq->pop();
	$this->assertType( NullJob::class, $sibling );
}
245 
/**
 * Push five jobs for each of two root-job signatures one second apart, call
 * deduplicateRootJob() for both, and check that draining the queue yields
 * ten jobs of which five are DuplicateJob instances.
 *
 * @dataProvider provider_queueLists
 * @covers JobQueue
 *
 * @param string $queue Name of the test-case property holding the queue
 * @param bool $recycles Unused here; part of the shared provider row
 * @param string $desc Human-readable variant description used in messages
 */
public function testRootDeduplication( $queue, $recycles, $desc ) {
	$jq = $this->$queue;
	if ( !$jq ) {
		$this->markTestSkipped( $desc );
	}

	$emptyMsg = "Queue is empty ($desc)";
	$pushMsg = "Push worked ($desc)";

	$this->assertTrue( $jq->isEmpty(), $emptyMsg );

	$jq->flushCaches();
	$this->assertEquals( 0, $jq->getSize(), $emptyMsg );
	$this->assertEquals( 0, $jq->getAcquiredCount(), $emptyMsg );

	// First root signature (task ID/timestamp).
	$root1 = Job::newRootJobParams( "nulljobspam:testId" );
	$pushed = 0;
	while ( $pushed < 5 ) {
		$this->assertNull( $jq->push( $this->newJob( 0, $root1 ) ), $pushMsg );
		++$pushed;
	}
	$jq->deduplicateRootJob( $this->newJob( 0, $root1 ) );

	// Second root signature: same task, timestamp moved one second later.
	// Add a second to UNIX epoch and format back to TS_MW
	$root2 = $root1;
	$laterTs = strtotime( $root2['rootJobTimestamp'] );
	++$laterTs;
	$root2['rootJobTimestamp'] = wfTimestamp( TS_MW, $laterTs );

	$this->assertNotEquals(
		$root1['rootJobTimestamp'],
		$root2['rootJobTimestamp'],
		"Root job signatures have different timestamps."
	);
	$pushed = 0;
	while ( $pushed < 5 ) {
		$this->assertNull( $jq->push( $this->newJob( 0, $root2 ) ), $pushMsg );
		++$pushed;
	}
	$jq->deduplicateRootJob( $this->newJob( 0, $root2 ) );

	$this->assertFalse( $jq->isEmpty(), "Queue is not empty ($desc)" );

	$jq->flushCaches();
	$this->assertEquals( 10, $jq->getSize(), "Queue size is correct ($desc)" );
	$this->assertEquals( 0, $jq->getAcquiredCount(), "No jobs active ($desc)" );

	// Drain the queue, acknowledging each job and counting the duplicates.
	$duplicates = 0;
	$popped = [];
	while ( ( $job = $jq->pop() ) ) {
		$popped[] = $job;
		$jq->ack( $job );
		if ( $job instanceof DuplicateJob ) {
			++$duplicates;
		}
	}

	$this->assertCount( 10, $popped, "Correct number of jobs popped ($desc)" );
	$this->assertEquals( 5, $duplicates, "Correct number of duplicate jobs popped ($desc)" );
}
303 
/**
 * Push ten numbered jobs and check that they are popped in the same order,
 * after which the queue is empty again.
 *
 * @dataProvider provider_fifoQueueLists
 * @covers JobQueue
 *
 * @param string $queue Name of the test-case property holding the queue
 * @param bool $recycles Unused here; part of the shared provider row
 * @param string $desc Human-readable variant description used in messages
 */
public function testJobOrder( $queue, $recycles, $desc ) {
	$queue = $this->$queue;
	if ( !$queue ) {
		$this->markTestSkipped( $desc );
	}

	$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

	$queue->flushCaches();
	$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
	$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

	for ( $i = 0; $i < 10; ++$i ) {
		$this->assertNull( $queue->push( $this->newJob( $i ) ), "Push worked ($desc)" );
	}

	for ( $i = 0; $i < 10; ++$i ) {
		$job = $queue->pop();
		$this->assertInstanceOf( Job::class, $job, "Jobs popped from queue ($desc)" );
		$params = $job->getParams();
		$this->assertEquals( $i, $params['i'], "Job popped from queue is FIFO ($desc)" );
		$queue->ack( $job );
	}

	// All ten jobs were consumed above, so one more pop() must find nothing.
	$this->assertFalse( $queue->pop(), "Queue is empty ($desc)" );

	$queue->flushCaches();
	$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
	$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
}
338 
/**
 * Check that a queue appears in getServerQueuesWithJobs() only after a job
 * has been pushed to it. Skipped when the FIFO queue is unavailable or the
 * backend does not provide getServerQueuesWithJobs().
 *
 * @covers JobQueue
 */
public function testQueueAggregateTable() {
	// This test has no data provider, so it picks its queue itself; the FIFO
	// variant created by setUp() is used.
	$queue = $this->queueFifo;
	if ( !$queue || !method_exists( $queue, 'getServerQueuesWithJobs' ) ) {
		$this->markTestSkipped();
	}

	// Listing entries are [ type, wiki ] pairs.
	$entry = [ $queue->getType(), $queue->getWiki() ];

	$this->assertNotContains(
		$entry,
		$queue->getServerQueuesWithJobs(),
		"Null queue not in listing"
	);

	$queue->push( $this->newJob( 0 ) );

	$this->assertContains(
		$entry,
		$queue->getServerQueuesWithJobs(),
		"Null queue in listing"
	);
}
362 
/**
 * Data provider listing every queue variant.
 *
 * @return array[] Rows of [ property name, whether acquired-count checks
 *  apply, description ]
 */
public static function provider_queueLists() {
	$rows = [];

	$rows[] = [ 'queueRand', false, 'Random queue without ack()' ];
	$rows[] = [ 'queueRandTTL', true, 'Random queue with ack()' ];

	$rows[] = [ 'queueTimestamp', false, 'Time ordered queue without ack()' ];
	$rows[] = [ 'queueTimestampTTL', true, 'Time ordered queue with ack()' ];

	$rows[] = [ 'queueFifo', false, 'FIFO ordered queue without ack()' ];
	$rows[] = [ 'queueFifoTTL', true, 'FIFO ordered queue with ack()' ];

	return $rows;
}
373 
/**
 * Data provider listing only the FIFO-ordered queue variants.
 *
 * @return array[] Rows of [ property name, whether acquired-count checks
 *  apply, description ]
 */
public static function provider_fifoQueueLists() {
	$withoutAck = [ 'queueFifo', false, 'Ordered queue without ack()' ];
	$withAck = [ 'queueFifoTTL', true, 'Ordered queue with ack()' ];

	return [ $withoutAck, $withAck ];
}
380 
/**
 * Build a 'null' job for the main page with duplicate removal turned off.
 *
 * @param int $i Value stored under the 'i' parameter of the job
 * @param array $rootJob Extra parameters merged in; keys already set by this
 *  method take precedence
 * @return Job
 */
function newJob( $i = 0, $rootJob = [] ) {
	$jobParams = [ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 0, 'i' => $i ];

	// Array union keeps the left-hand value when a key exists on both sides.
	return Job::factory( 'null', Title::newMainPage(), $jobParams + $rootJob );
}
385 
/**
 * Build a 'null' job for the main page with duplicate removal turned on.
 *
 * @param int $i Value stored under the 'i' parameter of the job
 * @param array $rootJob Extra parameters merged in; keys already set by this
 *  method take precedence
 * @return Job
 */
function newDedupedJob( $i = 0, $rootJob = [] ) {
	$jobParams = [ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 1, 'i' => $i ];

	// Array union keeps the left-hand value when a key exists on both sides.
	return Job::factory( 'null', Title::newMainPage(), $jobParams + $rootJob );
}
390 }
391 
/**
 * Fetch a connection for this queue's domain from the main load balancer.
 *
 * @param mixed $index Forwarded as the first argument of getConnection();
 *  NOTE(review): presumably a DB_* index constant - confirm
 * @return mixed Whatever the load balancer's getConnection() returns
 */
protected function getDB( $index ) {
	// Override to not use CONN_TRX_AUTOCOMMIT so that we see the same temporary `job` table
	return MediaWikiServices::getInstance()
		->getDBLoadBalancer()
		->getConnection( $index, [], $this->domain );
}
398 }
testDeduplicationWhileClaimed( $queue, $recycles, $desc)
provider_queueLists JobQueue
testGetWiki( $queue, $recycles, $desc)
provider_queueLists JobQueue::getWiki
assertType( $type, $actual, $message='')
Asserts the type of the provided value.
static newMainPage(MessageLocalizer $localizer=null)
Create a new Title for the Main Page.
Definition: Title.php:653
static provider_fifoQueueLists()
newJob( $i=0, $rootJob=[])
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException' returning false will NOT prevent logging $e
Definition: hooks.txt:2159
testQueueAggregateTable()
JobQueue.
Class to both describe a background job and handle jobs.
Definition: Job.php:30
JobQueue medium Database.
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency MediaWikiServices
Definition: injection.txt:23
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:35
No-op job that does nothing.
static newRootJobParams( $key)
Get "root job" parameters for a task.
Definition: Job.php:335
testBasicOperations( $queue, $recycles, $desc)
provider_queueLists JobQueue
wfTimestamp( $outputtype=TS_UNIX, $ts=0)
Get a timestamp string in one of various formats.
testRootDeduplication( $queue, $recycles, $desc)
provider_queueLists JobQueue
testBasicDeduplication( $queue, $recycles, $desc)
provider_queueLists JobQueue
$params
this hook is for auditing only or null if authentication failed before getting that far or null if we can t even determine that When $user is not null
Definition: hooks.txt:780
static factory( $command, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:66
wfWikiID()
Get an ASCII string identifying this wiki. This is used as a prefix in memcached keys.
testJobOrder( $queue, $recycles, $desc)
provider_fifoQueueLists JobQueue
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
testGetType( $queue, $recycles, $desc)
provider_queueLists JobQueue::getType
newDedupedJob( $i=0, $rootJob=[])
static getCurrentWikiDbDomain()
Definition: WikiMap.php:302
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:106
$wgJobTypeConf
Map of job types to configuration arrays.
you have access to all of the normal MediaWiki so you can get a DB use the etc For full docs on the Maintenance class
Definition: maintenance.txt:52
if(count( $args)< 1) $job
Allows to change the fields on the form that will be generated $name
Definition: hooks.txt:271
__construct( $name=null, array $data=[], $dataName='')
static provider_queueLists()