Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
76.32% |
58 / 76 |
|
40.00% |
2 / 5 |
CRAP | |
0.00% |
0 / 1 |
EventBusFactory | |
76.32% |
58 / 76 |
|
40.00% |
2 / 5 |
16.60 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
21 / 21 |
|
100.00% |
1 / 1 |
1 | |||
getInstance | |
95.83% |
23 / 24 |
|
0.00% |
0 / 1 |
4 | |||
getInstanceForStream | |
100.00% |
14 / 14 |
|
100.00% |
1 / 1 |
3 | |||
isStreamEnabled | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
getEventServiceNameForStream | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
12 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or modify |
5 | * it under the terms of the GNU General Public License as published by |
6 | * the Free Software Foundation; either version 2 of the License, or |
7 | * (at your option) any later version. |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License along |
15 | * with this program; if not, write to the Free Software Foundation, Inc., |
16 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * http://www.gnu.org/copyleft/gpl.html |
18 | * |
19 | * @file |
20 | * @author Petr Pchelko |
21 | */ |
22 | |
23 | namespace MediaWiki\Extension\EventBus; |
24 | |
25 | use InvalidArgumentException; |
26 | use MediaWiki\Config\ServiceOptions; |
27 | use MediaWiki\Extension\EventStreamConfig\StreamConfigs; |
28 | use MultiHttpClient; |
29 | use Psr\Log\LoggerInterface; |
30 | use Wikimedia\Stats\StatsFactory; |
31 | |
/**
 * Creates appropriate EventBus instance based on stream config.
 *
 * Instances are created lazily and cached per event service name, so repeated
 * lookups for the same service return the same EventBus object.
 *
 * @package MediaWiki\Extension\EventBus
 * @since 1.35
 */
class EventBusFactory {

	public const CONSTRUCTOR_OPTIONS = [
		'EventServices',
		'EventServiceDefault',
		'EnableEventBus',
		'EventBusMaxBatchByteSize'
	];

	/**
	 * Key in wgEventStreams['stream_name']['producers'] that contains settings
	 * for this MediaWiki EventBus producer.
	 */
	public const EVENT_STREAM_CONFIG_PRODUCER_NAME = 'mediawiki_eventbus';

	/**
	 * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME]
	 * that specifies if the stream is enabled. A stream is 'disabled' only if
	 * this setting is explicitly false, or if the stream name
	 * does not have an entry in wgEventStreams
	 * (and wgEventStreams is an array with other streams configured).
	 */
	public const EVENT_STREAM_CONFIG_ENABLED_SETTING = 'enabled';

	/**
	 * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME] that specifies
	 * the event service name that should be used for a specific stream.
	 * This should be a key into $eventServiceConfig, which usually is configured
	 * using the EventBus MW config wgEventServices.
	 * If not found via StreamConfigs, EventServiceDefault will be used.
	 */
	public const EVENT_STREAM_CONFIG_SERVICE_SETTING = 'event_service_name';

	/**
	 * Internal name of an EventBus instance that never sends events.
	 * This is used for streams that are disabled or undeclared.
	 * This will also be used as the dummy 'url' of that instance.
	 * (Public only for testing purposes.)
	 */
	public const EVENT_SERVICE_DISABLED_NAME = '_disabled_eventbus_';

	/**
	 * @var array|mixed The 'EventServices' option: settings keyed by event service name.
	 */
	private array $eventServiceConfig;

	/**
	 * @var string|mixed The 'EventServiceDefault' option: service name used when a
	 *  stream does not select one itself.
	 */
	private string $eventServiceDefault;

	/**
	 * @var StreamConfigs|null
	 */
	private ?StreamConfigs $streamConfigs;

	/**
	 * @var string|mixed The 'EnableEventBus' option, handed to every producing
	 *  EventBus instance in the same position where the disabled instance
	 *  receives EventBus::TYPE_NONE.
	 */
	private string $enableEventBus;

	/**
	 * @var int|mixed The 'EventBusMaxBatchByteSize' option.
	 */
	private int $maxBatchByteSize;

	/**
	 * @var EventFactory
	 */
	private EventFactory $eventFactory;

	/**
	 * @var MultiHttpClient
	 */
	private MultiHttpClient $http;

	/**
	 * @var LoggerInterface
	 */
	private LoggerInterface $logger;

	/** @var ?StatsFactory wf:Stats factory instance */
	private ?StatsFactory $statsFactory;

	/**
	 * @var EventBus[] Already created instances, keyed by event service name.
	 */
	private array $eventBusInstances = [];

	/**
	 * @param ServiceOptions $options must provide every key of self::CONSTRUCTOR_OPTIONS
	 * @param StreamConfigs|null $streamConfigs null if no stream config is available;
	 *  in that case every stream uses EventServiceDefault.
	 * @param EventFactory $eventFactory
	 * @param MultiHttpClient $http
	 * @param LoggerInterface $logger
	 * @param StatsFactory|null $statsFactory
	 */
	public function __construct(
		ServiceOptions $options,
		?StreamConfigs $streamConfigs,
		EventFactory $eventFactory,
		MultiHttpClient $http,
		LoggerInterface $logger,
		?StatsFactory $statsFactory = null
	) {
		$options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );

		$this->eventServiceConfig = $options->get( 'EventServices' );
		$this->eventServiceDefault = $options->get( 'EventServiceDefault' );
		$this->enableEventBus = $options->get( 'EnableEventBus' );
		$this->maxBatchByteSize = $options->get( 'EventBusMaxBatchByteSize' );

		$this->streamConfigs = $streamConfigs;
		$this->eventFactory = $eventFactory;
		$this->http = $http;
		$this->logger = $logger;
		$this->statsFactory = $statsFactory;

		// Save a 'disabled' non producing EventBus instance that sets
		// the allowed event type to TYPE_NONE. No
		// events sent through this instance will actually be sent to an event service.
		// This is done to allow us to easily 'disable' streams.
		$this->eventBusInstances[self::EVENT_SERVICE_DISABLED_NAME] = new EventBus(
			$this->http,
			EventBus::TYPE_NONE,
			$this->eventFactory,
			self::EVENT_SERVICE_DISABLED_NAME,
			$this->maxBatchByteSize,
			0,
			false,
			self::EVENT_SERVICE_DISABLED_NAME,
			$this->statsFactory
		);
	}

	/**
	 * @param string $eventServiceName
	 *   The name of a key in the EventServices config looked up via
	 *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
	 *   The EventService config is keyed by service name, and should at least contain
	 *   a 'url' entry pointing at the event service endpoint events should be
	 *   POSTed to. They can also optionally contain a 'timeout' entry specifying
	 *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry
	 *   that can be set to true if the X-Client-IP header from the originating request
	 *   should be forwarded to the event service. Instances are singletons identified
	 *   by $eventServiceName.
	 *
	 * @note Previously, this function took a $config object instead of an
	 * event service name. This is a backwards incompatible change, but because
	 * there are no other users of this extension, we can do this safely.
	 *
	 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
	 * @return EventBus
	 */
	public function getInstance( string $eventServiceName ): EventBus {
		// If eventServiceName has already been instantiated, return it.
		// (This also covers the pre-built EVENT_SERVICE_DISABLED_NAME instance.)
		if ( array_key_exists( $eventServiceName, $this->eventBusInstances ) ) {
			return $this->eventBusInstances[$eventServiceName];
		}

		$eventServiceSettings = $this->eventServiceConfig[$eventServiceName] ?? null;

		// A missing entry, a non-array entry, or an entry without 'url' are all
		// misconfigurations. Checking is_array() first makes a malformed entry
		// produce the documented InvalidArgumentException rather than a TypeError
		// from array_key_exists().
		if (
			!is_array( $eventServiceSettings ) ||
			!array_key_exists( 'url', $eventServiceSettings )
		) {
			$error = "Could not get EventBus instance for event service '$eventServiceName'. " .
				'This event service name must exist in EventServices config with a url setting.';
			$this->logger->error( $error );
			throw new InvalidArgumentException( $error );
		}

		// Create the eventServiceName instance from config
		// and save it in eventBusInstances.
		$url = $eventServiceSettings['url'];
		$timeout = $eventServiceSettings['timeout'] ?? null;
		$forwardXClientIP = $eventServiceSettings['x_client_ip_forwarding_enabled'] ?? false;

		$this->eventBusInstances[$eventServiceName] = new EventBus(
			$this->http,
			$this->enableEventBus,
			$this->eventFactory,
			$url,
			$this->maxBatchByteSize,
			$timeout,
			$forwardXClientIP,
			$eventServiceName,
			$this->statsFactory
		);

		return $this->eventBusInstances[$eventServiceName];
	}

	/**
	 * Gets an EventBus instance for a $stream.
	 *
	 * If EventStreamConfig is not configured, or if the stream is configured but
	 * does not set ['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING],
	 * EventServiceDefault will be used.
	 *
	 * If EventStreamConfig is configured, but the stream is not or the stream has
	 * ['producers']['mediawiki_eventbus']['enabled'] = false, this will return
	 * a non-producing EventBus instance.
	 *
	 * @param string $streamName the stream to send an event to
	 * @return EventBus
	 * @throws InvalidArgumentException if the resolved event service name is misconfigured.
	 */
	public function getInstanceForStream( string $streamName ): EventBus {
		if ( $this->streamConfigs === null ) {
			$eventServiceName = $this->eventServiceDefault;
		} elseif ( !$this->isStreamEnabled( $streamName ) ) {
			// Don't send event if $streamName is undeclared or explicitly disabled.
			$eventServiceName = self::EVENT_SERVICE_DISABLED_NAME;
			$this->logger->debug(
				"Using non-producing EventBus instance for stream $streamName. " .
				'This stream is either undeclared, or is explicitly disabled.'
			);
		} else {
			$eventServiceName = $this->getEventServiceNameForStream( $streamName ) ??
				$this->eventServiceDefault;
			$this->logger->debug(
				"Using event intake service $eventServiceName for stream $streamName."
			);
		}

		// getInstance() is an instance method, so call it on $this rather than
		// through the static-looking self:: syntax.
		return $this->getInstance( $eventServiceName );
	}

	/**
	 * Uses StreamConfigs to determine if a stream is enabled.
	 * By default, a stream is enabled. It is disabled only if:
	 *
	 * - wgEventStreams[$streamName]['producers']['mediawiki_eventbus']['enabled'] === false
	 *   OR
	 * - wgEventStreams != null, but, wgEventStreams[$streamName] is not declared
	 *
	 * NOTE(review): the configured 'enabled' value is returned through the bool
	 * return type, so a non-bool scalar is coerced rather than compared with
	 * === false. Confirm whether the setting is always a real boolean.
	 *
	 * @param string $streamName
	 * @return bool
	 */
	private function isStreamEnabled( string $streamName ): bool {
		// No streamConfigs means any stream is enabled
		if ( $this->streamConfigs === null ) {
			return true;
		}

		$streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

		// If $streamName is not declared in EventStreamConfig, then it is not enabled.
		if ( !array_key_exists( $streamName, $streamConfigEntries ) ) {
			return false;
		}

		$streamSettings = $streamConfigEntries[$streamName];

		// A missing (or null) setting means the stream is enabled.
		return $streamSettings['producers'][
			self::EVENT_STREAM_CONFIG_PRODUCER_NAME
		][self::EVENT_STREAM_CONFIG_ENABLED_SETTING] ?? true;
	}

	/**
	 * Looks up the wgEventStreams[$streamName]['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING]
	 * setting for this stream.
	 * If wgEventStreams is not configured, or if the stream is not configured in wgEventStreams,
	 * or if the stream does not have EVENT_STREAM_CONFIG_SERVICE_SETTING set,
	 * then this will return null.
	 *
	 * @param string $streamName
	 * @return string|null
	 */
	private function getEventServiceNameForStream( string $streamName ): ?string {
		// Use eventServiceDefault if no streamConfigs were provided.
		if ( $this->streamConfigs === null ) {
			return null;
		}

		// Else attempt to lookup EVENT_STREAM_CONFIG_SERVICE_SETTING for this stream.
		$streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

		$streamSettings = $streamConfigEntries[$streamName] ?? [];

		$eventServiceName = $streamSettings['producers'][
			self::EVENT_STREAM_CONFIG_PRODUCER_NAME
		][self::EVENT_STREAM_CONFIG_SERVICE_SETTING] ?? null;

		// For backwards compatibility, the event service name setting used to be a top level
		// stream setting 'destination_event_service'. If EVENT_STREAM_CONFIG_SERVICE_SETTING
		// is not set (or is empty), fall back to that legacy setting.
		// This can be removed once all streams have been migrated to using the
		// producers.mediawiki_eventbus specific setting.
		// https://phabricator.wikimedia.org/T321557
		return $eventServiceName ?: ( $streamSettings['destination_event_service'] ?? null );
	}
}