Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
74.65% |
53 / 71 |
|
40.00% |
2 / 5 |
CRAP | |
0.00% |
0 / 1 |
EventBusFactory | |
74.65% |
53 / 71 |
|
40.00% |
2 / 5 |
17.19 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
18 / 18 |
|
100.00% |
1 / 1 |
1 | |||
getInstance | |
95.45% |
21 / 22 |
|
0.00% |
0 / 1 |
4 | |||
getInstanceForStream | |
100.00% |
14 / 14 |
|
100.00% |
1 / 1 |
3 | |||
isStreamEnabled | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
getEventServiceNameForStream | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or modify |
5 | * it under the terms of the GNU General Public License as published by |
6 | * the Free Software Foundation; either version 2 of the License, or |
7 | * (at your option) any later version. |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License along |
15 | * with this program; if not, write to the Free Software Foundation, Inc., |
16 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * http://www.gnu.org/copyleft/gpl.html |
18 | * |
19 | * @file |
20 | * @author Petr Pchelko |
21 | */ |
22 | |
23 | namespace MediaWiki\Extension\EventBus; |
24 | |
25 | use InvalidArgumentException; |
26 | use MediaWiki\Config\ServiceOptions; |
27 | use MediaWiki\Extension\EventStreamConfig\StreamConfigs; |
28 | use MultiHttpClient; |
29 | use Psr\Log\LoggerInterface; |
30 | |
31 | /** |
32 |  * Creates appropriate EventBus instance based on stream config. |
33 |  * |
34 |  * Instances are cached by event service name, so asking for the same event |
35 |  * service twice returns the same EventBus object. A non-producing instance is |
36 |  * registered under EVENT_SERVICE_DISABLED_NAME for disabled or undeclared streams. |
37 |  * |
38 |  * @package MediaWiki\Extension\EventBus |
39 |  * @since 1.35 |
40 |  */ |
41 | class EventBusFactory { |
42 | |
43 |     public const CONSTRUCTOR_OPTIONS = [ |
44 |         'EventServices', |
45 |         'EventServiceDefault', |
46 |         'EnableEventBus', |
47 |         'EventBusMaxBatchByteSize' |
48 |     ]; |
49 | |
50 |     /** |
51 |      * Key in wgEventStreams['stream_name']['producers'] that contains settings |
52 |      * for this MediaWiki EventBus producer. |
53 |      */ |
54 |     public const EVENT_STREAM_CONFIG_PRODUCER_NAME = 'mediawiki_eventbus'; |
55 | |
56 |     /** |
57 |      * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME] |
58 |      * that specifies if the stream is enabled. A stream is 'disabled' only if |
59 |      * this setting is explicitly false, or if the stream name |
60 |      * does not have an entry in wgEventStreams |
61 |      * (and wgEventStreams is an array with other streams configured). |
62 |      */ |
63 |     public const EVENT_STREAM_CONFIG_ENABLED_SETTING = 'enabled'; |
64 | |
65 |     /** |
66 |      * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME] that specifies |
67 |      * the event service name that should be used for a specific stream. |
68 |      * This should be a key into $eventServiceConfig, which usually is configured |
69 |      * using the EventBus MW config wgEventServices. |
70 |      * If not found via StreamConfigs, EventServiceDefault will be used. |
71 |      */ |
72 |     public const EVENT_STREAM_CONFIG_SERVICE_SETTING = 'event_service_name'; |
73 | |
74 |     /** |
75 |      * Internal name of an EventBus instance that never sends events. |
76 |      * This is used for streams that are disabled or undeclared. |
77 |      * This will also be used as the dummy 'url' of that instance. |
78 |      * (Public only for testing purposes.) |
79 |      */ |
80 |     public const EVENT_SERVICE_DISABLED_NAME = '_disabled_eventbus_'; |
81 | |
82 |     /** |
83 |      * @var array Event service settings keyed by event service name |
84 |      * (the 'EventServices' option). |
85 |      */ |
86 |     private array $eventServiceConfig; |
87 | |
88 |     /** |
89 |      * @var string Name of the event service used when a stream does not |
90 |      * select one itself (the 'EventServiceDefault' option). |
91 |      */ |
92 |     private string $eventServiceDefault; |
93 | |
94 |     /** |
95 |      * @var StreamConfigs|null Stream configuration, or null when none was provided. |
96 |      */ |
97 |     private ?StreamConfigs $streamConfigs; |
98 | |
99 |     /** |
100 |      * @var string Value of the 'EnableEventBus' option, handed to every |
101 |      * EventBus instance built from the EventServices config. |
102 |      */ |
103 |     private string $enableEventBus; |
104 | |
105 |     /** |
106 |      * @var int Value of the 'EventBusMaxBatchByteSize' option, handed to every |
107 |      * EventBus instance this factory creates. |
108 |      */ |
109 |     private int $maxBatchByteSize; |
110 | |
111 |     /** |
112 |      * @var EventFactory |
113 |      */ |
114 |     private EventFactory $eventFactory; |
115 | |
116 |     /** |
117 |      * @var MultiHttpClient |
118 |      */ |
119 |     private MultiHttpClient $http; |
120 | |
121 |     /** |
122 |      * @var LoggerInterface |
123 |      */ |
124 |     private LoggerInterface $logger; |
125 | |
126 |     /** |
127 |      * @var EventBus[] Already created EventBus instances, keyed by event service name. |
128 |      */ |
129 |     private array $eventBusInstances = []; |
130 | |
131 |     /** |
132 |      * @param ServiceOptions $options must provide every key in CONSTRUCTOR_OPTIONS |
133 |      * @param StreamConfigs|null $streamConfigs null if no stream configs are available |
134 |      * @param EventFactory $eventFactory |
135 |      * @param MultiHttpClient $http |
136 |      * @param LoggerInterface $logger |
137 |      */ |
138 |     public function __construct( |
139 |         ServiceOptions $options, |
140 |         ?StreamConfigs $streamConfigs, |
141 |         EventFactory $eventFactory, |
142 |         MultiHttpClient $http, |
143 |         LoggerInterface $logger |
144 |     ) { |
145 |         $options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS ); |
146 | |
147 |         $this->eventServiceConfig = $options->get( 'EventServices' ); |
148 |         $this->eventServiceDefault = $options->get( 'EventServiceDefault' ); |
149 |         $this->enableEventBus = $options->get( 'EnableEventBus' ); |
150 |         $this->maxBatchByteSize = $options->get( 'EventBusMaxBatchByteSize' ); |
151 |         $this->streamConfigs = $streamConfigs; |
152 |         $this->eventFactory = $eventFactory; |
153 |         $this->http = $http; |
154 |         $this->logger = $logger; |
155 | |
156 |         // Save a 'disabled' non producing EventBus instance that sets |
157 |         // the allowed event type to TYPE_NONE. No |
158 |         // events sent through this instance will actually be sent to an event service. |
159 |         // This is done to allow us to easily 'disable' streams. |
160 |         $this->eventBusInstances[self::EVENT_SERVICE_DISABLED_NAME] = new EventBus( |
161 |             $this->http, |
162 |             EventBus::TYPE_NONE, |
163 |             $this->eventFactory, |
164 |             self::EVENT_SERVICE_DISABLED_NAME, |
165 |             $this->maxBatchByteSize, |
166 |             0, |
167 |             false |
168 |         ); |
169 |     } |
170 | |
171 |     /** |
172 |      * @param string $eventServiceName |
173 |      *   The name of a key in the EventServices config looked up via |
174 |      *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices'). |
175 |      *   The EventService config is keyed by service name, and should at least contain |
176 |      *   a 'url' entry pointing at the event service endpoint events should be |
177 |      *   POSTed to. They can also optionally contain a 'timeout' entry specifying |
178 |      *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry |
179 |      *   that can be set to true if the X-Client-IP header from the originating request |
180 |      *   should be forwarded to the event service. Instances are singletons identified |
181 |      *   by $eventServiceName. |
182 |      * |
183 |      * @note Previously, this function took a $config object instead of an |
184 |      * event service name. This is a backwards incompatible change, but because |
185 |      * there are no other users of this extension, we can do this safely. |
186 |      * |
187 |      * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured. |
188 |      * @return EventBus |
189 |      */ |
190 |     public function getInstance( string $eventServiceName ): EventBus { |
191 |         // If eventServiceName has already been instantiated, return it. |
192 |         if ( array_key_exists( $eventServiceName, $this->eventBusInstances ) ) { |
193 |             return $this->eventBusInstances[$eventServiceName]; |
194 |         } |
195 | |
196 |         // A usable event service must be declared in EventServices and have a 'url'. |
197 |         if ( |
198 |             !array_key_exists( $eventServiceName, $this->eventServiceConfig ) || |
199 |             !array_key_exists( 'url', $this->eventServiceConfig[$eventServiceName] ) |
200 |         ) { |
201 |             $error = "Could not get EventBus instance for event service '$eventServiceName'. " . |
202 |                 'This event service name must exist in EventServices config with a url setting.'; |
203 |             $this->logger->error( $error ); |
204 |             throw new InvalidArgumentException( $error ); |
205 |         } |
206 | |
207 |         // Create the eventServiceName instance from config |
208 |         // and save it in eventBusInstances. |
209 |         $eventServiceSettings = $this->eventServiceConfig[$eventServiceName]; |
210 |         $url = $eventServiceSettings['url']; |
211 |         $timeout = $eventServiceSettings['timeout'] ?? null; |
212 |         $forwardXClientIP = $eventServiceSettings['x_client_ip_forwarding_enabled'] ?? false; |
213 | |
214 |         $this->eventBusInstances[$eventServiceName] = new EventBus( |
215 |             $this->http, |
216 |             $this->enableEventBus, |
217 |             $this->eventFactory, |
218 |             $url, |
219 |             $this->maxBatchByteSize, |
220 |             $timeout, |
221 |             $forwardXClientIP |
222 |         ); |
223 |         return $this->eventBusInstances[$eventServiceName]; |
224 |     } |
225 | |
226 |     /** |
227 |      * Gets an EventBus instance for a $stream. |
228 |      * |
229 |      * If EventStreamConfig is not configured, or if the stream is configured but |
230 |      * does not set ['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING], |
231 |      * EventServiceDefault will be used. |
232 |      * |
233 |      * If EventStreamConfig is configured, but the stream is not or the stream has |
234 |      * ['producers']['mediawiki_eventbus']['enabled'] = false, this will return |
235 |      * a non-producing EventBus instance. |
236 |      * |
237 |      * @param string $streamName the stream to send an event to |
238 |      * @return EventBus |
239 |      * @throws InvalidArgumentException if the selected event service is misconfigured. |
240 |      */ |
241 |     public function getInstanceForStream( string $streamName ): EventBus { |
242 |         if ( $this->streamConfigs === null ) { |
243 |             $eventServiceName = $this->eventServiceDefault; |
244 |         } elseif ( !$this->isStreamEnabled( $streamName ) ) { |
245 |             // Don't send events if $streamName is undeclared or explicitly disabled. |
246 |             $eventServiceName = self::EVENT_SERVICE_DISABLED_NAME; |
247 |             $this->logger->debug( |
248 |                 "Using non-producing EventBus instance for stream $streamName. " . |
249 |                 'This stream is either undeclared, or is explicitly disabled.' |
250 |             ); |
251 |         } else { |
252 |             $eventServiceName = $this->getEventServiceNameForStream( $streamName ) ?? |
253 |                 $this->eventServiceDefault; |
254 |             $this->logger->debug( |
255 |                 "Using event intake service $eventServiceName for stream $streamName." |
256 |             ); |
257 |         } |
258 | |
259 |         // getInstance() is an instance method, so call it on $this rather than via self::. |
260 |         return $this->getInstance( $eventServiceName ); |
261 |     } |
262 | |
263 |     /** |
264 |      * Uses StreamConfigs to determine if a stream is enabled. |
265 |      * By default, a stream is enabled. It is disabled only if: |
266 |      * |
267 |      * - wgEventStreams[$streamName]['producers']['mediawiki_eventbus']['enabled'] === false |
268 |      *   OR |
269 |      * - wgEventStreams != null, but wgEventStreams[$streamName] is not declared |
270 |      * |
271 |      * @param string $streamName |
272 |      * @return bool |
273 |      */ |
274 |     private function isStreamEnabled( string $streamName ): bool { |
275 |         // No streamConfigs means any stream is enabled |
276 |         if ( $this->streamConfigs === null ) { |
277 |             return true; |
278 |         } |
279 | |
280 |         $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] ); |
281 | |
282 |         // If $streamName is not declared in EventStreamConfig, then it is not enabled. |
283 |         if ( !array_key_exists( $streamName, $streamConfigEntries ) ) { |
284 |             return false; |
285 |         } |
286 | |
287 |         $streamSettings = $streamConfigEntries[$streamName]; |
288 | |
289 |         // A declared stream without an 'enabled' setting counts as enabled. |
290 |         return $streamSettings['producers'][ |
291 |             self::EVENT_STREAM_CONFIG_PRODUCER_NAME |
292 |         ][self::EVENT_STREAM_CONFIG_ENABLED_SETTING] ?? true; |
293 |     } |
294 | |
295 |     /** |
296 |      * Looks up the wgEventStreams[$streamName]['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING] |
297 |      * setting for this stream. |
298 |      * If wgEventStreams is not configured, or if the stream is not configured in wgEventStreams, |
299 |      * or if the stream has neither EVENT_STREAM_CONFIG_SERVICE_SETTING nor the legacy |
300 |      * top level 'destination_event_service' setting, then this will return null. |
301 |      * |
302 |      * @param string $streamName |
303 |      * @return string|null |
304 |      */ |
305 |     private function getEventServiceNameForStream( string $streamName ): ?string { |
306 |         // Return null if no streamConfigs were provided; the caller then falls back |
307 |         // to eventServiceDefault. |
308 |         if ( $this->streamConfigs === null ) { |
309 |             return null; |
310 |         } |
311 | |
312 |         // Else attempt to lookup EVENT_STREAM_CONFIG_SERVICE_SETTING for this stream. |
313 |         $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] ); |
314 | |
315 |         $streamSettings = $streamConfigEntries[$streamName] ?? []; |
316 | |
317 |         $eventServiceName = $streamSettings['producers'][ |
318 |             self::EVENT_STREAM_CONFIG_PRODUCER_NAME |
319 |         ][self::EVENT_STREAM_CONFIG_SERVICE_SETTING] ?? null; |
320 | |
321 |         // For backwards compatibility, the event service name setting used to be a top level |
322 |         // stream setting 'destination_event_service'. If EVENT_STREAM_CONFIG_SERVICE_SETTING |
323 |         // is not set (or is empty), fall back to that legacy setting instead. |
324 |         // This can be removed once all streams have been migrated to using the |
325 |         // producers.mediawiki_eventbus specific setting. |
326 |         // https://phabricator.wikimedia.org/T321557 |
327 |         return $eventServiceName ?: $streamSettings['destination_event_service'] ?? null; |
328 |     } |
329 | } |