Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 42 |
|
0.00% |
0 / 11 |
CRAP | |
0.00% |
0 / 1 |
JobQueueEventBus | |
0.00% |
0 / 42 |
|
0.00% |
0 / 11 |
380 | |
0.00% |
0 / 1 |
supportedOrders | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
supportsDelayedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
optimalOrder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doIsEmpty | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetSize | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doGetAcquiredCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
supportsTypeAgnostic | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doBatchPush | |
0.00% |
0 / 32 |
|
0.00% |
0 / 1 |
90 | |||
doPop | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doAck | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getAllQueuedJobs | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace MediaWiki\Extension\EventBus\Adapters\JobQueue; |
4 | |
5 | use ArrayIterator; |
6 | use IJobSpecification; |
7 | use Iterator; |
8 | use Job; |
9 | use JobQueue; |
10 | use JobQueueError; |
11 | use MediaWiki\Extension\EventBus\EventBus; |
12 | use RunnableJob; |
13 | |
14 | class JobQueueEventBus extends JobQueue { |
15 | /** |
16 | * Get the allowed queue orders for configuration validation |
17 | * |
18 | * @return array Subset of (random, timestamp, fifo, undefined) |
19 | */ |
20 | protected function supportedOrders() { |
21 | return [ 'fifo' ]; |
22 | } |
23 | |
24 | /** |
25 | * Find out if delayed jobs are supported for configuration validation |
26 | * |
27 | * @return bool Whether delayed jobs are supported |
28 | */ |
29 | protected function supportsDelayedJobs() { |
30 | return true; |
31 | } |
32 | |
33 | /** |
34 | * Get the default queue order to use if configuration does not specify one |
35 | * |
36 | * @return string One of (random, timestamp, fifo, undefined) |
37 | */ |
38 | protected function optimalOrder() { |
39 | return 'fifo'; |
40 | } |
41 | |
42 | /** |
43 | * @see JobQueue::isEmpty() |
44 | * @return bool |
45 | */ |
46 | protected function doIsEmpty() { |
47 | // not implemented |
48 | return false; |
49 | } |
50 | |
51 | /** |
52 | * @see JobQueue::getSize() |
53 | * @return int |
54 | */ |
55 | protected function doGetSize() { |
56 | // not implemented |
57 | return 0; |
58 | } |
59 | |
60 | /** |
61 | * @see JobQueue::getAcquiredCount() |
62 | * @return int |
63 | */ |
64 | protected function doGetAcquiredCount() { |
65 | // not implemented |
66 | return 0; |
67 | } |
68 | |
69 | /** |
70 | * @see JobQueue::supportsTypeAgnostic() |
71 | * @return bool |
72 | */ |
73 | protected function supportsTypeAgnostic(): bool { |
74 | return true; |
75 | } |
76 | |
77 | /** |
78 | * @param IJobSpecification[] $jobs |
79 | * @param int $flags |
80 | * @throws JobQueueError |
81 | * @see JobQueue::batchPush() |
82 | */ |
83 | protected function doBatchPush( array $jobs, $flags ) { |
84 | $streamEvents = []; |
85 | $streamBuses = []; |
86 | $count = 0; |
87 | |
88 | foreach ( $jobs as $job ) { |
89 | $stream = 'mediawiki.job.' . $job->getType(); |
90 | if ( !isset( $streamBuses[$stream] ) ) { |
91 | $streamBuses[$stream] = EventBus::getInstanceForStream( $stream ); |
92 | } |
93 | $item = $streamBuses[$stream]->getFactory()->createJobEvent( |
94 | $stream, |
95 | $this->getDomain(), |
96 | $job |
97 | ); |
98 | |
99 | if ( $item === null ) { |
100 | continue; |
101 | } |
102 | |
103 | $count++; |
104 | // hash identifier => de-duplicate |
105 | if ( isset( $item['sha1'] ) ) { |
106 | $streamEvents[$stream][$item['sha1']] = $item; |
107 | } else { |
108 | $streamEvents[$stream][$item['meta']['id']] = $item; |
109 | } |
110 | } |
111 | |
112 | if ( !$count ) { |
113 | // nothing to do |
114 | return; |
115 | } |
116 | |
117 | foreach ( array_keys( $streamEvents ) as $stream ) { |
118 | $result = $streamBuses[$stream]->send( |
119 | array_values( $streamEvents[$stream] ), |
120 | EventBus::TYPE_JOB |
121 | ); |
122 | |
123 | // This means sending jobs to the $stream has failed. |
124 | if ( is_array( $result ) || is_string( $result ) ) { |
125 | // Details of backend failure are logged by EventBus::send(). |
126 | // Details of which job failed is logged here. |
127 | EventBus::logger()->error( 'Could not enqueue jobs for stream {stream}', [ |
128 | 'stream' => $stream, |
129 | 'exception' => new JobQueueError( "Could not enqueue jobs" ), |
130 | 'invalidresponse' => (object)$result |
131 | ] ); |
132 | // Avoid fragmenting exception by job or stream name, since backend |
133 | // issues are generally unrelated to the job (T249745). |
134 | throw new JobQueueError( "Could not enqueue jobs" ); |
135 | } |
136 | } |
137 | } |
138 | |
139 | /** |
140 | * @see JobQueue::pop() |
141 | * @return Job|bool |
142 | */ |
143 | protected function doPop() { |
144 | // not implemented |
145 | return false; |
146 | } |
147 | |
148 | /** |
149 | * @see JobQueue::ack() |
150 | * |
151 | * @param RunnableJob $job |
152 | */ |
153 | protected function doAck( RunnableJob $job ) { |
154 | // not implemented |
155 | } |
156 | |
157 | /** |
158 | * Get an iterator to traverse over all available jobs in this queue. |
159 | * This does not include jobs that are currently acquired or delayed. |
160 | * Note: results may be stale if the queue is concurrently modified. |
161 | * |
162 | * @return Iterator |
163 | * @throws JobQueueError |
164 | */ |
165 | public function getAllQueuedJobs() { |
166 | // not implemented |
167 | return new ArrayIterator( [] ); |
168 | } |
169 | } |