Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 42
0.00% covered (danger)
0.00%
0 / 11
CRAP
0.00% covered (danger)
0.00%
0 / 1
JobQueueEventBus
0.00% covered (danger)
0.00%
0 / 42
0.00% covered (danger)
0.00%
0 / 11
380
0.00% covered (danger)
0.00%
0 / 1
 supportedOrders
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 supportsDelayedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 optimalOrder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doIsEmpty
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doGetSize
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doGetAcquiredCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 supportsTypeAgnostic
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doBatchPush
0.00% covered (danger)
0.00%
0 / 32
0.00% covered (danger)
0.00%
0 / 1
90
 doPop
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doAck
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getAllQueuedJobs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace MediaWiki\Extension\EventBus\Adapters\JobQueue;
4
5use ArrayIterator;
6use IJobSpecification;
7use Iterator;
8use Job;
9use JobQueue;
10use JobQueueError;
11use MediaWiki\Extension\EventBus\EventBus;
12use RunnableJob;
13
14class JobQueueEventBus extends JobQueue {
15    /**
16     * Get the allowed queue orders for configuration validation
17     *
18     * @return array Subset of (random, timestamp, fifo, undefined)
19     */
20    protected function supportedOrders() {
21        return [ 'fifo' ];
22    }
23
24    /**
25     * Find out if delayed jobs are supported for configuration validation
26     *
27     * @return bool Whether delayed jobs are supported
28     */
29    protected function supportsDelayedJobs() {
30        return true;
31    }
32
33    /**
34     * Get the default queue order to use if configuration does not specify one
35     *
36     * @return string One of (random, timestamp, fifo, undefined)
37     */
38    protected function optimalOrder() {
39        return 'fifo';
40    }
41
42    /**
43     * @see JobQueue::isEmpty()
44     * @return bool
45     */
46    protected function doIsEmpty() {
47        // not implemented
48        return false;
49    }
50
51    /**
52     * @see JobQueue::getSize()
53     * @return int
54     */
55    protected function doGetSize() {
56        // not implemented
57        return 0;
58    }
59
60    /**
61     * @see JobQueue::getAcquiredCount()
62     * @return int
63     */
64    protected function doGetAcquiredCount() {
65        // not implemented
66        return 0;
67    }
68
69    /**
70     * @see JobQueue::supportsTypeAgnostic()
71     * @return bool
72     */
73    protected function supportsTypeAgnostic(): bool {
74        return true;
75    }
76
77    /**
78     * @param IJobSpecification[] $jobs
79     * @param int $flags
80     * @throws JobQueueError
81     * @see JobQueue::batchPush()
82     */
83    protected function doBatchPush( array $jobs, $flags ) {
84        $streamEvents = [];
85        $streamBuses = [];
86        $count = 0;
87
88        foreach ( $jobs as $job ) {
89            $stream = 'mediawiki.job.' . $job->getType();
90            if ( !isset( $streamBuses[$stream] ) ) {
91                $streamBuses[$stream] = EventBus::getInstanceForStream( $stream );
92            }
93            $item = $streamBuses[$stream]->getFactory()->createJobEvent(
94                $stream,
95                $this->getDomain(),
96                $job
97            );
98
99            if ( $item === null ) {
100                continue;
101            }
102
103            $count++;
104            // hash identifier => de-duplicate
105            if ( isset( $item['sha1'] ) ) {
106                $streamEvents[$stream][$item['sha1']] = $item;
107            } else {
108                $streamEvents[$stream][$item['meta']['id']] = $item;
109            }
110        }
111
112        if ( !$count ) {
113            // nothing to do
114            return;
115        }
116
117        foreach ( array_keys( $streamEvents ) as $stream ) {
118            $result = $streamBuses[$stream]->send(
119                array_values( $streamEvents[$stream] ),
120                EventBus::TYPE_JOB
121            );
122
123            // This means sending jobs to the $stream has failed.
124            if ( is_array( $result ) || is_string( $result ) ) {
125                // Details of backend failure are logged by EventBus::send().
126                // Details of which job failed is logged here.
127                EventBus::logger()->error( 'Could not enqueue jobs for stream {stream}', [
128                    'stream' => $stream,
129                    'exception' => new JobQueueError( "Could not enqueue jobs" ),
130                    'response' => $result
131                ] );
132                // Avoid fragmenting exception by job or stream name, since backend
133                // issues are generally unrelated to the job (T249745).
134                throw new JobQueueError( "Could not enqueue jobs" );
135            }
136        }
137    }
138
139    /**
140     * @see JobQueue::pop()
141     * @return Job|bool
142     */
143    protected function doPop() {
144        // not implemented
145        return false;
146    }
147
148    /**
149     * @see JobQueue::ack()
150     *
151     * @param RunnableJob $job
152     */
153    protected function doAck( RunnableJob $job ) {
154        // not implemented
155    }
156
157    /**
158     * Get an iterator to traverse over all available jobs in this queue.
159     * This does not include jobs that are currently acquired or delayed.
160     * Note: results may be stale if the queue is concurrently modified.
161     *
162     * @return Iterator
163     * @throws JobQueueError
164     */
165    public function getAllQueuedJobs() {
166        // not implemented
167        return new ArrayIterator( [] );
168    }
169}