Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
74.65% covered (warning)
74.65%
53 / 71
40.00% covered (danger)
40.00%
2 / 5
CRAP
0.00% covered (danger)
0.00%
0 / 1
EventBusFactory
74.65% covered (warning)
74.65%
53 / 71
40.00% covered (danger)
40.00%
2 / 5
17.19
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
18 / 18
100.00% covered (success)
100.00%
1 / 1
1
 getInstance
95.45% covered (success)
95.45%
21 / 22
0.00% covered (danger)
0.00%
0 / 1
4
 getInstanceForStream
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
3
 isStreamEnabled
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 getEventServiceNameForStream
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 * http://www.gnu.org/copyleft/gpl.html
18 *
19 * @file
20 * @author Petr Pchelko
21 */
22
23namespace MediaWiki\Extension\EventBus;
24
25use InvalidArgumentException;
26use MediaWiki\Config\ServiceOptions;
27use MediaWiki\Extension\EventStreamConfig\StreamConfigs;
28use MultiHttpClient;
29use Psr\Log\LoggerInterface;
30
31/**
32 * Creates appropriate EventBus instance based on stream config.
33 *
34 * @package MediaWiki\Extension\EventBus
35 * @since 1.35
36 */
class EventBusFactory {

    /**
     * Names of the options that must be present in the ServiceOptions
     * object handed to the constructor.
     */
    public const CONSTRUCTOR_OPTIONS = [
        'EventServices',
        'EventServiceDefault',
        'EnableEventBus',
        'EventBusMaxBatchByteSize'
    ];

    /**
     * Key in wgEventStreams['stream_name']['producers'] that contains settings
     * for this MediaWiki EventBus producer.
     */
    public const EVENT_STREAM_CONFIG_PRODUCER_NAME = 'mediawiki_eventbus';

    /**
     * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME]
     * that specifies if the stream is enabled. A stream is 'disabled' only if
     * this setting is explicitly false, or if the stream name
     * does not have an entry in wgEventStreams
     * (and wgEventStreams is an array with other streams configured).
     */
    public const EVENT_STREAM_CONFIG_ENABLED_SETTING = 'enabled';

    /**
     * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME] that specifies
     * the event service name that should be used for a specific stream.
     * This should be a key into $eventServiceConfig, which usually is configured
     * using the EventBus MW config wgEventServices.
     * If not found via StreamConfigs, EventServiceDefault will be used.
     */
    public const EVENT_STREAM_CONFIG_SERVICE_SETTING = 'event_service_name';

    /**
     * Internal name of an EventBus instance that never sends events.
     * This is used for streams that are disabled or undeclared.
     * This will also be used as the dummy 'url' of that instance.
     * (Public only for testing purposes.)
     */
    public const EVENT_SERVICE_DISABLED_NAME = '_disabled_eventbus_';

    /**
     * Value of the 'EventServices' option: event service settings keyed by
     * event service name.
     *
     * @var array
     */
    private array $eventServiceConfig;

    /**
     * Value of the 'EventServiceDefault' option: the event service name used
     * when a stream does not select one itself.
     *
     * @var string
     */
    private string $eventServiceDefault;

    /**
     * Stream configuration lookup, or null when no stream configs were provided.
     *
     * @var StreamConfigs|null
     */
    private ?StreamConfigs $streamConfigs;

    /**
     * Value of the 'EnableEventBus' option. It is handed to every producing
     * EventBus instance in the same constructor position where the disabled
     * instance receives EventBus::TYPE_NONE.
     *
     * @var string
     */
    private string $enableEventBus;

    /**
     * Value of the 'EventBusMaxBatchByteSize' option, handed to every
     * EventBus instance this factory creates.
     *
     * @var int
     */
    private int $maxBatchByteSize;

    /**
     * @var EventFactory
     */
    private EventFactory $eventFactory;

    /**
     * @var MultiHttpClient
     */
    private MultiHttpClient $http;

    /**
     * @var LoggerInterface
     */
    private LoggerInterface $logger;

    /**
     * Cache of already created EventBus instances, keyed by event service name
     * (plus the EVENT_SERVICE_DISABLED_NAME entry created in the constructor).
     *
     * @var EventBus[]
     */
    private array $eventBusInstances = [];

    /**
     * @param ServiceOptions $options must provide all of self::CONSTRUCTOR_OPTIONS
     * @param StreamConfigs|null $streamConfigs null if no stream configs are available
     * @param EventFactory $eventFactory
     * @param MultiHttpClient $http
     * @param LoggerInterface $logger
     */
    public function __construct(
        ServiceOptions $options,
        ?StreamConfigs $streamConfigs,
        EventFactory $eventFactory,
        MultiHttpClient $http,
        LoggerInterface $logger
    ) {
        $options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );

        $this->eventServiceConfig = $options->get( 'EventServices' );
        $this->eventServiceDefault = $options->get( 'EventServiceDefault' );
        $this->enableEventBus = $options->get( 'EnableEventBus' );
        $this->maxBatchByteSize = $options->get( 'EventBusMaxBatchByteSize' );
        $this->streamConfigs = $streamConfigs;
        $this->eventFactory = $eventFactory;
        $this->http = $http;
        $this->logger = $logger;

        // Save a 'disabled' non producing EventBus instance that sets
        // the allowed event type to TYPE_NONE. No
        // events sent through this instance will actually be sent to an event service.
        // This is done to allow us to easily 'disable' streams.
        $this->eventBusInstances[self::EVENT_SERVICE_DISABLED_NAME] = new EventBus(
            $this->http,
            EventBus::TYPE_NONE,
            $this->eventFactory,
            // The disabled name doubles as the dummy 'url' of this instance.
            self::EVENT_SERVICE_DISABLED_NAME,
            $this->maxBatchByteSize,
            // Same positions as $timeout and $forwardXClientIP in getInstance().
            0,
            false
        );
    }

    /**
     * Returns the EventBus instance for an event service name, creating and
     * caching it on first use.
     *
     * @param string $eventServiceName
     *   The name of a key in the EventServices config looked up via
     *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
     *   The EventService config is keyed by service name, and should at least contain
     *   a 'url' entry pointing at the event service endpoint events should be
     *   POSTed to. They can also optionally contain a 'timeout' entry specifying
     *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry
     *   that can be set to true if the X-Client-IP header from the originating request
     *   should be forwarded to the event service. Instances are singletons identified
     *   by $eventServiceName.
     *
     * @note Previously, this function took a $config object instead of an
     * event service name.  This is a backwards incompatible change, but because
     * there are no other users of this extension, we can do this safely.
     *
     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
     * @return EventBus
     */
    public function getInstance( string $eventServiceName ): EventBus {
        // If eventServiceName has already been instantiated, return it.
        if ( array_key_exists( $eventServiceName, $this->eventBusInstances ) ) {
            return $this->eventBusInstances[$eventServiceName];
        }

        $eventServiceSettings = $this->eventServiceConfig[$eventServiceName] ?? null;

        // The is_array() guard makes a malformed (non-array) entry fail with the
        // documented InvalidArgumentException rather than a TypeError raised by
        // array_key_exists().
        if (
            !is_array( $eventServiceSettings ) ||
            !array_key_exists( 'url', $eventServiceSettings )
        ) {
            $error = "Could not get EventBus instance for event service '$eventServiceName'. " .
                'This event service name must exist in EventServices config with a url setting.';
            $this->logger->error( $error );
            throw new InvalidArgumentException( $error );
        }

        // Create the eventServiceName instance from config
        // and save it in eventBusInstances.
        $url = $eventServiceSettings['url'];
        $timeout = $eventServiceSettings['timeout'] ?? null;
        $forwardXClientIP = $eventServiceSettings['x_client_ip_forwarding_enabled'] ?? false;

        $this->eventBusInstances[$eventServiceName] = new EventBus(
            $this->http,
            $this->enableEventBus,
            $this->eventFactory,
            $url,
            $this->maxBatchByteSize,
            $timeout,
            $forwardXClientIP
        );

        return $this->eventBusInstances[$eventServiceName];
    }

    /**
     * Gets an EventBus instance for a $stream.
     *
     * If EventStreamConfig is not configured, or if the stream is configured but
     * does not set ['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING],
     * EventServiceDefault will be used.
     *
     * If EventStreamConfig is configured, but the stream is not or the stream has
     * ['producers']['mediawiki_eventbus']['enabled'] = false, this will return
     * a non-producing EventBus instance.
     *
     * @param string $streamName the stream to send an event to
     * @return EventBus
     * @throws InvalidArgumentException
     */
    public function getInstanceForStream( string $streamName ): EventBus {
        if ( $this->streamConfigs === null ) {
            $eventServiceName = $this->eventServiceDefault;
        } elseif ( !$this->isStreamEnabled( $streamName ) ) {
            // Don't send events if $streamName is undeclared or explicitly disabled.
            $eventServiceName = self::EVENT_SERVICE_DISABLED_NAME;
            $this->logger->debug(
                "Using non-producing EventBus instance for stream $streamName. " .
                'This stream is either undeclared, or is explicitly disabled.'
            );
        } else {
            $eventServiceName = $this->getEventServiceNameForStream( $streamName ) ??
                $this->eventServiceDefault;
            $this->logger->debug(
                "Using event intake service $eventServiceName for stream $streamName."
            );
        }

        return $this->getInstance( $eventServiceName );
    }

    /**
     * Uses StreamConfigs to determine if a stream is enabled.
     * By default, a stream is enabled.  It is disabled only if:
     *
     * - wgEventStreams[$streamName]['producers']['mediawiki_eventbus']['enabled'] === false
     * OR
     * - wgEventStreams != null, but, wgEventStreams[$streamName] is not declared
     *
     * @param string $streamName
     * @return bool
     */
    private function isStreamEnabled( string $streamName ): bool {
        // No streamConfigs means any stream is enabled
        if ( $this->streamConfigs === null ) {
            return true;
        }

        $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

        // If $streamName is not declared in EventStreamConfig, then it is not enabled.
        if ( !array_key_exists( $streamName, $streamConfigEntries ) ) {
            return false;
        }

        $streamSettings = $streamConfigEntries[$streamName];

        // A missing producer section or a missing 'enabled' key means enabled.
        return $streamSettings['producers'][
            self::EVENT_STREAM_CONFIG_PRODUCER_NAME
        ][self::EVENT_STREAM_CONFIG_ENABLED_SETTING] ?? true;
    }

    /**
     * Looks up the wgEventStreams[$streamName]['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING]
     * setting for this stream.
     * If wgEventStreams is not configured, or if the stream is not configured in wgEventStreams,
     * or if the stream sets neither EVENT_STREAM_CONFIG_SERVICE_SETTING nor the legacy
     * top level 'destination_event_service' setting, then this will return null.
     *
     * @param string $streamName
     * @return string|null
     */
    private function getEventServiceNameForStream( string $streamName ): ?string {
        // Returning null lets the caller fall back to eventServiceDefault
        // when no streamConfigs were provided.
        if ( $this->streamConfigs === null ) {
            return null;
        }

        // Else attempt to lookup EVENT_STREAM_CONFIG_SERVICE_SETTING for this stream.
        $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

        $streamSettings = $streamConfigEntries[$streamName] ?? [];

        $eventServiceName = $streamSettings['producers'][
            self::EVENT_STREAM_CONFIG_PRODUCER_NAME
        ][self::EVENT_STREAM_CONFIG_SERVICE_SETTING] ?? null;

        // For backwards compatibility: the event service name setting used to be a top level
        // stream setting 'destination_event_service'. If EVENT_STREAM_CONFIG_SERVICE_SETTING
        // is not set (or is empty), fall back to that legacy setting.
        // This can be removed once all streams have been migrated to using the
        // producers.mediawiki_eventbus specific setting.
        // https://phabricator.wikimedia.org/T321557
        return $eventServiceName ?: $streamSettings['destination_event_service'] ?? null;
    }
}