Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 141
0.00% covered (danger)
0.00%
0 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueueGroup
0.00% covered (danger)
0.00%
0 / 141
0.00% covered (danger)
0.00%
0 / 15
3422
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
2
 get
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
 push
0.00% covered (danger)
0.00%
0 / 31
0.00% covered (danger)
0.00%
0 / 1
132
 lazyPush
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
20
 pop
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
132
 ack
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 deduplicateRootJob
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 waitForBackups
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getQueueTypes
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getDefaultQueueTypes
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 queuesHaveJobs
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
20
 getQueuesWithJobs
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
30
 getQueueSizes
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 getCoalescedQueues
0.00% covered (danger)
0.00%
0 / 20
0.00% covered (danger)
0.00%
0 / 1
30
 assertValidJobs
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
20
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21use MediaWiki\Deferred\DeferredUpdates;
22use MediaWiki\Deferred\JobQueueEnqueueUpdate;
23use Wikimedia\Rdbms\ReadOnlyMode;
24use Wikimedia\UUID\GlobalIdGenerator;
25
26/**
27 * Handle enqueueing of background jobs.
28 *
29 * @warning This service class supports queuing jobs to foreign wikis via JobQueueGroupFactory,
30 * but other operations may be called for the local wiki only. Exceptions may be thrown if you
31 * attempt to inspect, pop, or execute a foreign wiki's job queue.
32 *
33 * @since 1.21
34 * @ingroup JobQueue
35 */
36class JobQueueGroup {
37    /** @var MapCacheLRU */
38    protected $cache;
39
40    /** @var string Wiki domain ID */
41    protected $domain;
42    /** @var ReadOnlyMode Read only mode */
43    protected $readOnlyMode;
44    /** @var array|null */
45    private $localJobClasses;
46    /** @var array */
47    private $jobTypeConfiguration;
48    /** @var array */
49    private $jobTypesExcludedFromDefaultQueue;
50    /** @var IBufferingStatsdDataFactory */
51    private $statsdDataFactory;
52    /** @var WANObjectCache */
53    private $wanCache;
54    /** @var GlobalIdGenerator */
55    private $globalIdGenerator;
56
57    /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
58    protected $coalescedQueues;
59
60    public const TYPE_DEFAULT = 1; // integer; jobs popped by default
61    private const TYPE_ANY = 2; // integer; any job
62
63    public const USE_CACHE = 1; // integer; use process or persistent cache
64
65    private const PROC_CACHE_TTL = 15; // integer; seconds
66
67    /**
68     * @internal Use MediaWikiServices::getJobQueueGroupFactory
69     *
70     * @param string $domain Wiki domain ID
71     * @param ReadOnlyMode $readOnlyMode Read-only mode
72     * @param array|null $localJobClasses
73     * @param array $jobTypeConfiguration
74     * @param array $jobTypesExcludedFromDefaultQueue
75     * @param IBufferingStatsdDataFactory $statsdDataFactory
76     * @param WANObjectCache $wanCache
77     * @param GlobalIdGenerator $globalIdGenerator
78     */
79    public function __construct(
80        $domain,
81        ReadOnlyMode $readOnlyMode,
82        ?array $localJobClasses,
83        array $jobTypeConfiguration,
84        array $jobTypesExcludedFromDefaultQueue,
85        IBufferingStatsdDataFactory $statsdDataFactory,
86        WANObjectCache $wanCache,
87        GlobalIdGenerator $globalIdGenerator
88    ) {
89        $this->domain = $domain;
90        $this->readOnlyMode = $readOnlyMode;
91        $this->cache = new MapCacheLRU( 10 );
92        $this->localJobClasses = $localJobClasses;
93        $this->jobTypeConfiguration = $jobTypeConfiguration;
94        $this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
95        $this->statsdDataFactory = $statsdDataFactory;
96        $this->wanCache = $wanCache;
97        $this->globalIdGenerator = $globalIdGenerator;
98    }
99
100    /**
101     * Get the job queue object for a given queue type
102     *
103     * @param string $type
104     * @return JobQueue
105     */
106    public function get( $type ) {
107        $conf = [ 'domain' => $this->domain, 'type' => $type ];
108        $conf += $this->jobTypeConfiguration[$type] ?? $this->jobTypeConfiguration['default'];
109        if ( !isset( $conf['readOnlyReason'] ) ) {
110            $conf['readOnlyReason'] = $this->readOnlyMode->getConfiguredReason();
111        }
112
113        $conf['stats'] = $this->statsdDataFactory;
114        $conf['wanCache'] = $this->wanCache;
115        $conf['idGenerator'] = $this->globalIdGenerator;
116
117        return JobQueue::factory( $conf );
118    }
119
120    /**
121     * Insert jobs into the respective queues of which they belong
122     *
123     * This inserts the jobs into the queue specified by $wgJobTypeConf
124     * and updates the aggregate job queue information cache as needed.
125     *
126     * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
127     * @return void
128     */
129    public function push( $jobs ) {
130        $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
131        if ( $jobs === [] ) {
132            return;
133        }
134
135        $this->assertValidJobs( $jobs );
136
137        $jobsByType = []; // (job type => list of jobs)
138        foreach ( $jobs as $job ) {
139            $type = $job->getType();
140            if ( isset( $this->jobTypeConfiguration[$type] ) ) {
141                $jobsByType[$type][] = $job;
142            } else {
143                if (
144                    isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
145                    $this->jobTypeConfiguration['default']['typeAgnostic']
146                ) {
147                    $jobsByType['default'][] = $job;
148                } else {
149                    $jobsByType[$type][] = $job;
150                }
151            }
152        }
153
154        foreach ( $jobsByType as $type => $jobs ) {
155            $this->get( $type )->push( $jobs );
156        }
157
158        if ( $this->cache->hasField( 'queues-ready', 'list' ) ) {
159            $list = $this->cache->getField( 'queues-ready', 'list' );
160            if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
161                $this->cache->clear( 'queues-ready' );
162            }
163        }
164
165        $cache = ObjectCache::getLocalClusterInstance();
166        $cache->set(
167            $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
168            'true',
169            15
170        );
171        if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
172            $cache->set(
173                $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
174                'true',
175                15
176            );
177        }
178    }
179
180    /**
181     * Buffer jobs for insertion via push() or call it now if in CLI mode
182     *
183     * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
184     * @return void
185     * @since 1.26
186     */
187    public function lazyPush( $jobs ) {
188        if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
189            $this->push( $jobs );
190            return;
191        }
192
193        $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
194
195        // Throw errors now instead of on push(), when other jobs may be buffered
196        $this->assertValidJobs( $jobs );
197
198        DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
199    }
200
201    /**
202     * Pop one job off a job queue
203     *
204     * @warning May not be called on foreign wikis!
205     *
206     * This pops a job off a queue as specified by $wgJobTypeConf and
207     * updates the aggregate job queue information cache as needed.
208     *
209     * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
210     * @param int $flags Bitfield of JobQueueGroup::USE_* constants
211     * @param array $ignored List of job types to ignore
212     * @return RunnableJob|false Returns false on failure
213     * @throws JobQueueError
214     */
215    public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
216        $job = false;
217
218        if ( !$this->localJobClasses ) {
219            throw new JobQueueError(
220                "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
221        }
222        if ( is_string( $qtype ) && !isset( $this->localJobClasses[$qtype] ) ) {
223            // Do not pop jobs if there is no class for the queue type
224            throw new JobQueueError( "Unrecognized job type '$qtype'." );
225        }
226
227        if ( is_string( $qtype ) ) { // specific job type
228            if ( !in_array( $qtype, $ignored ) ) {
229                $job = $this->get( $qtype )->pop();
230            }
231        } else { // any job in the "default" jobs types
232            if ( $flags & self::USE_CACHE ) {
233                if ( !$this->cache->hasField( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
234                    $this->cache->setField( 'queues-ready', 'list', $this->getQueuesWithJobs() );
235                }
236                $types = $this->cache->getField( 'queues-ready', 'list' );
237            } else {
238                $types = $this->getQueuesWithJobs();
239            }
240
241            if ( $qtype == self::TYPE_DEFAULT ) {
242                $types = array_intersect( $types, $this->getDefaultQueueTypes() );
243            }
244
245            $types = array_diff( $types, $ignored ); // avoid selected types
246            shuffle( $types ); // avoid starvation
247
248            foreach ( $types as $type ) { // for each queue...
249                $job = $this->get( $type )->pop();
250                if ( $job ) { // found
251                    break;
252                } else { // not found
253                    $this->cache->clear( 'queues-ready' );
254                }
255            }
256        }
257
258        return $job;
259    }
260
261    /**
262     * Acknowledge that a job was completed
263     *
264     * @param RunnableJob $job
265     * @return void
266     */
267    public function ack( RunnableJob $job ) {
268        $this->get( $job->getType() )->ack( $job );
269    }
270
271    /**
272     * Register the "root job" of a given job into the queue for de-duplication.
273     * This should only be called right *after* all the new jobs have been inserted.
274     *
275     * @deprecated since 1.40
276     * @param RunnableJob $job
277     * @return bool
278     */
279    public function deduplicateRootJob( RunnableJob $job ) {
280        wfDeprecated( __METHOD__, '1.40' );
281        return true;
282    }
283
284    /**
285     * Wait for any replica DBs or backup queue servers to catch up.
286     *
287     * This does nothing for certain queue classes.
288     *
289     * @deprecated since 1.41, use JobQueue::waitForBackups() instead.
290     *
291     * @return void
292     */
293    public function waitForBackups() {
294        wfDeprecated( __METHOD__, '1.41' );
295        // Try to avoid doing this more than once per queue storage medium
296        foreach ( $this->jobTypeConfiguration as $type => $conf ) {
297            $this->get( $type )->waitForBackups();
298        }
299    }
300
301    /**
302     * Get the list of queue types
303     *
304     * @warning May not be called on foreign wikis!
305     *
306     * @return string[]
307     */
308    public function getQueueTypes() {
309        if ( !$this->localJobClasses ) {
310            throw new JobQueueError( 'Cannot inspect job queue from foreign wiki' );
311        }
312        return array_keys( $this->localJobClasses );
313    }
314
315    /**
316     * Get the list of default queue types
317     *
318     * @warning May not be called on foreign wikis!
319     *
320     * @return string[]
321     */
322    public function getDefaultQueueTypes() {
323        return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
324    }
325
326    /**
327     * Check if there are any queues with jobs (this is cached)
328     *
329     * @warning May not be called on foreign wikis!
330     *
331     * @since 1.23
332     * @param int $type JobQueueGroup::TYPE_* constant
333     * @return bool
334     */
335    public function queuesHaveJobs( $type = self::TYPE_ANY ) {
336        $cache = ObjectCache::getLocalClusterInstance();
337        $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
338
339        $value = $cache->get( $key );
340        if ( $value === false ) {
341            $queues = $this->getQueuesWithJobs();
342            if ( $type == self::TYPE_DEFAULT ) {
343                $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
344            }
345            $value = count( $queues ) ? 'true' : 'false';
346            $cache->add( $key, $value, 15 );
347        }
348
349        return ( $value === 'true' );
350    }
351
352    /**
353     * Get the list of job types that have non-empty queues
354     *
355     * @warning May not be called on foreign wikis!
356     *
357     * @return string[] List of job types that have non-empty queues
358     */
359    public function getQueuesWithJobs() {
360        $types = [];
361        foreach ( $this->getCoalescedQueues() as $info ) {
362            /** @var JobQueue $queue */
363            $queue = $info['queue'];
364            $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
365            if ( is_array( $nonEmpty ) ) { // batching features supported
366                $types = array_merge( $types, $nonEmpty );
367            } else { // we have to go through the queues in the bucket one-by-one
368                foreach ( $info['types'] as $type ) {
369                    if ( !$this->get( $type )->isEmpty() ) {
370                        $types[] = $type;
371                    }
372                }
373            }
374        }
375
376        return $types;
377    }
378
379    /**
380     * Get the size of the queues for a list of job types
381     *
382     * @warning May not be called on foreign wikis!
383     *
384     * @return int[] Map of (job type => size)
385     */
386    public function getQueueSizes() {
387        $sizeMap = [];
388        foreach ( $this->getCoalescedQueues() as $info ) {
389            /** @var JobQueue $queue */
390            $queue = $info['queue'];
391            $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
392            if ( is_array( $sizes ) ) { // batching features supported
393                $sizeMap += $sizes;
394            } else { // we have to go through the queues in the bucket one-by-one
395                foreach ( $info['types'] as $type ) {
396                    $sizeMap[$type] = $this->get( $type )->getSize();
397                }
398            }
399        }
400
401        return $sizeMap;
402    }
403
404    /**
405     * @return array[]
406     * @phan-return array<string,array{queue:JobQueue,types:array<string,class-string>}>
407     */
408    protected function getCoalescedQueues() {
409        if ( $this->coalescedQueues === null ) {
410            $this->coalescedQueues = [];
411            foreach ( $this->jobTypeConfiguration as $type => $conf ) {
412                $conf['domain'] = $this->domain;
413                $conf['type'] = 'null';
414                $conf['stats'] = $this->statsdDataFactory;
415                $conf['wanCache'] = $this->wanCache;
416                $conf['idGenerator'] = $this->globalIdGenerator;
417
418                $queue = JobQueue::factory( $conf );
419                $loc = $queue->getCoalesceLocationInternal();
420                if ( !isset( $this->coalescedQueues[$loc] ) ) {
421                    $this->coalescedQueues[$loc]['queue'] = $queue;
422                    $this->coalescedQueues[$loc]['types'] = [];
423                }
424                if ( $type === 'default' ) {
425                    $this->coalescedQueues[$loc]['types'] = array_merge(
426                        $this->coalescedQueues[$loc]['types'],
427                        array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
428                    );
429                } else {
430                    $this->coalescedQueues[$loc]['types'][] = $type;
431                }
432            }
433        }
434
435        return $this->coalescedQueues;
436    }
437
438    /**
439     * @param array $jobs
440     * @throws InvalidArgumentException
441     */
442    private function assertValidJobs( array $jobs ) {
443        foreach ( $jobs as $job ) {
444            if ( !( $job instanceof IJobSpecification ) ) {
445                $type = is_object( $job ) ? get_class( $job ) : gettype( $job );
446                throw new InvalidArgumentException( "Expected IJobSpecification objects, got " . $type );
447            }
448        }
449    }
450}