Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
76.32% covered (warning)
76.32%
58 / 76
40.00% covered (danger)
40.00%
2 / 5
CRAP
0.00% covered (danger)
0.00%
0 / 1
EventBusFactory
76.32% covered (warning)
76.32%
58 / 76
40.00% covered (danger)
40.00%
2 / 5
16.60
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
21 / 21
100.00% covered (success)
100.00%
1 / 1
1
 getInstance
95.83% covered (success)
95.83%
23 / 24
0.00% covered (danger)
0.00%
0 / 1
4
 getInstanceForStream
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
3
 isStreamEnabled
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 getEventServiceNameForStream
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 * http://www.gnu.org/copyleft/gpl.html
18 *
19 * @file
20 * @author Petr Pchelko
21 */
22
23namespace MediaWiki\Extension\EventBus;
24
25use InvalidArgumentException;
26use MediaWiki\Config\ServiceOptions;
27use MediaWiki\Extension\EventStreamConfig\StreamConfigs;
28use MultiHttpClient;
29use Psr\Log\LoggerInterface;
30use Wikimedia\Stats\StatsFactory;
31
/**
 * Creates appropriate EventBus instance based on stream config.
 *
 * EventBus instances are cached by event service name, so repeated lookups
 * for the same event service return the same object.
 *
 * @package MediaWiki\Extension\EventBus
 * @since 1.35
 */
class EventBusFactory {

    /**
     * Names of the options that must be present in the ServiceOptions
     * passed to the constructor.
     */
    public const CONSTRUCTOR_OPTIONS = [
        'EventServices',
        'EventServiceDefault',
        'EnableEventBus',
        'EventBusMaxBatchByteSize'
    ];

    /**
     * Key in wgEventStreams['stream_name']['producers'] that contains settings
     * for this MediaWiki EventBus producer.
     */
    public const EVENT_STREAM_CONFIG_PRODUCER_NAME = 'mediawiki_eventbus';

    /**
     * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME]
     * that specifies if the stream is enabled. A stream is 'disabled' only if
     * this setting is explicitly false, or if the stream name
     * does not have an entry in wgEventStreams
     * (and wgEventStreams is an array with other streams configured).
     */
    public const EVENT_STREAM_CONFIG_ENABLED_SETTING = 'enabled';

    /**
     * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME] that specifies
     * the event service name that should be used for a specific stream.
     * This should be a key into $eventServiceConfig, which usually is configured
     * using the EventBus MW config wgEventServices.
     * If not found via StreamConfigs, EventServiceDefault will be used.
     */
    public const EVENT_STREAM_CONFIG_SERVICE_SETTING = 'event_service_name';

    /**
     * Internal name of an EventBus instance that never sends events.
     * This is used for streams that are disabled or undeclared.
     * This will also be used as the dummy 'url' of that instance.
     * (Public only for testing purposes.)
     */
    public const EVENT_SERVICE_DISABLED_NAME = '_disabled_eventbus_';

    /**
     * @var array Value of the 'EventServices' option: event service settings
     *   keyed by event service name.
     */
    private array $eventServiceConfig;

    /**
     * @var string Value of the 'EventServiceDefault' option: the event service
     *   name used when a stream does not select one itself.
     */
    private string $eventServiceDefault;

    /**
     * @var StreamConfigs|null Stream configuration lookup, or null when no
     *   stream config is available (every stream is then treated as enabled).
     */
    private ?StreamConfigs $streamConfigs;

    /**
     * @var string Value of the 'EnableEventBus' option, passed through to each
     *   producing EventBus instance.
     */
    private string $enableEventBus;

    /**
     * @var int Value of the 'EventBusMaxBatchByteSize' option, passed through
     *   to every EventBus instance.
     */
    private int $maxBatchByteSize;

    /**
     * @var EventFactory
     */
    private EventFactory $eventFactory;

    /**
     * @var MultiHttpClient
     */
    private MultiHttpClient $http;

    /**
     * @var LoggerInterface
     */
    private LoggerInterface $logger;

    /** @var ?StatsFactory wf:Stats factory instance */
    private ?StatsFactory $statsFactory;

    /**
     * @var EventBus[] Already created EventBus instances, keyed by event service name.
     *   Always contains the non-producing instance under EVENT_SERVICE_DISABLED_NAME.
     */
    private array $eventBusInstances = [];

    /**
     * @param ServiceOptions $options Must provide every option listed in CONSTRUCTOR_OPTIONS.
     * @param StreamConfigs|null $streamConfigs
     * @param EventFactory $eventFactory
     * @param MultiHttpClient $http
     * @param LoggerInterface $logger
     * @param StatsFactory|null $statsFactory
     */
    public function __construct(
        ServiceOptions $options,
        ?StreamConfigs $streamConfigs,
        EventFactory $eventFactory,
        MultiHttpClient $http,
        LoggerInterface $logger,
        ?StatsFactory $statsFactory = null
    ) {
        $options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );

        $this->eventServiceConfig = $options->get( 'EventServices' );
        $this->eventServiceDefault = $options->get( 'EventServiceDefault' );
        $this->enableEventBus = $options->get( 'EnableEventBus' );
        $this->maxBatchByteSize = $options->get( 'EventBusMaxBatchByteSize' );

        $this->streamConfigs = $streamConfigs;
        $this->eventFactory = $eventFactory;
        $this->http = $http;
        $this->logger = $logger;
        $this->statsFactory = $statsFactory;

        // Save a 'disabled' non producing EventBus instance that sets
        // the allowed event type to TYPE_NONE. No
        // events sent through this instance will actually be sent to an event service.
        // This is done to allow us to easily 'disable' streams.
        // The disabled name doubles as both the dummy url and the service name.
        $this->eventBusInstances[self::EVENT_SERVICE_DISABLED_NAME] = new EventBus(
            $this->http,
            EventBus::TYPE_NONE,
            $this->eventFactory,
            self::EVENT_SERVICE_DISABLED_NAME,
            $this->maxBatchByteSize,
            0,
            false,
            self::EVENT_SERVICE_DISABLED_NAME,
            $this->statsFactory
        );
    }

    /**
     * Gets the EventBus instance for an event service, creating and caching it
     * on first use.
     *
     * @param string $eventServiceName
     *   The name of a key in the EventServices config looked up via
     *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
     *   The EventService config is keyed by service name, and should at least contain
     *   a 'url' entry pointing at the event service endpoint events should be
     *   POSTed to. They can also optionally contain a 'timeout' entry specifying
     *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry
     *   that can be set to true if the X-Client-IP header from the originating request
     *   should be forwarded to the event service. Instances are singletons identified
     *   by $eventServiceName.
     *
     * @note Previously, this function took a $config object instead of an
     * event service name. That was a backwards incompatible change, made on the
     * assumption that there were no other users of this extension.
     *
     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured,
     *   i.e. there is no settings array with a 'url' entry for $eventServiceName.
     * @return EventBus
     */
    public function getInstance( string $eventServiceName ): EventBus {
        // If eventServiceName has already been instantiated, return it.
        if ( array_key_exists( $eventServiceName, $this->eventBusInstances ) ) {
            return $this->eventBusInstances[$eventServiceName];
        }

        $eventServiceSettings = $this->eventServiceConfig[$eventServiceName] ?? null;

        // Check is_array() first: array_key_exists() throws a TypeError on PHP 8 when
        // given a non-array, which would bypass the logged InvalidArgumentException
        // for a malformed entry (e.g. a bare URL string instead of a settings array).
        if (
            !is_array( $eventServiceSettings ) ||
            !array_key_exists( 'url', $eventServiceSettings )
        ) {
            $error = "Could not get EventBus instance for event service '$eventServiceName'. " .
                'This event service name must exist in EventServices config with a url setting.';
            $this->logger->error( $error );
            throw new InvalidArgumentException( $error );
        }

        // Create the instance from config and save it in eventBusInstances.
        $eventBus = new EventBus(
            $this->http,
            $this->enableEventBus,
            $this->eventFactory,
            $eventServiceSettings['url'],
            $this->maxBatchByteSize,
            $eventServiceSettings['timeout'] ?? null,
            $eventServiceSettings['x_client_ip_forwarding_enabled'] ?? false,
            $eventServiceName,
            $this->statsFactory
        );
        $this->eventBusInstances[$eventServiceName] = $eventBus;

        return $eventBus;
    }

    /**
     * Gets an EventBus instance for a $stream.
     *
     * If EventStreamConfig is not configured, or if the stream is configured but
     * does not set ['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING],
     * EventServiceDefault will be used.
     *
     * If EventStreamConfig is configured, but the stream is not or the stream has
     * ['producers']['mediawiki_eventbus']['enabled'] = false, this will return
     * a non-producing EventBus instance.
     *
     * @param string $streamName the stream to send an event to
     * @return EventBus
     * @throws InvalidArgumentException if the selected event service is misconfigured
     *   (see getInstance()).
     */
    public function getInstanceForStream( string $streamName ): EventBus {
        if ( $this->streamConfigs === null ) {
            // No stream config at all: every stream goes to the default event service.
            $eventServiceName = $this->eventServiceDefault;
        } elseif ( !$this->isStreamEnabled( $streamName ) ) {
            // Don't send events if $streamName is undeclared or explicitly disabled.
            $eventServiceName = self::EVENT_SERVICE_DISABLED_NAME;
            $this->logger->debug(
                "Using non-producing EventBus instance for stream $streamName. " .
                'This stream is either undeclared, or is explicitly disabled.'
            );
        } else {
            $eventServiceName = $this->getEventServiceNameForStream( $streamName ) ??
                $this->eventServiceDefault;
            $this->logger->debug(
                "Using event intake service $eventServiceName for stream $streamName."
            );
        }

        // getInstance() is an instance method, so call it on $this rather than via self::.
        return $this->getInstance( $eventServiceName );
    }

    /**
     * Uses StreamConfigs to determine if a stream is enabled.
     * By default, a stream is enabled.  It is disabled only if:
     *
     * - wgEventStreams[$streamName]['producers']['mediawiki_eventbus']['enabled'] === false
     * OR
     * - wgEventStreams != null, but, wgEventStreams[$streamName] is not declared
     *
     * @param string $streamName
     * @return bool
     */
    private function isStreamEnabled( string $streamName ): bool {
        // No streamConfigs means any stream is enabled
        if ( $this->streamConfigs === null ) {
            return true;
        }

        $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

        // If $streamName is not declared in EventStreamConfig, then it is not enabled.
        if ( !array_key_exists( $streamName, $streamConfigEntries ) ) {
            return false;
        }

        $streamSettings = $streamConfigEntries[$streamName];

        // A missing 'enabled' setting (at any level of nesting) means enabled.
        return $streamSettings['producers'][
            self::EVENT_STREAM_CONFIG_PRODUCER_NAME
        ][self::EVENT_STREAM_CONFIG_ENABLED_SETTING] ?? true;
    }

    /**
     * Looks up the wgEventStreams[$streamName]['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING]
     * setting for this stream.
     * If wgEventStreams is not configured, or if the stream is not configured in wgEventStreams,
     * or if the stream has neither EVENT_STREAM_CONFIG_SERVICE_SETTING nor the legacy
     * top level 'destination_event_service' setting,
     * then this will return null.
     *
     * @param string $streamName
     * @return string|null
     */
    private function getEventServiceNameForStream( string $streamName ): ?string {
        // Return null (caller falls back to eventServiceDefault) if no streamConfigs were provided.
        if ( $this->streamConfigs === null ) {
            return null;
        }

        // Else attempt to lookup EVENT_STREAM_CONFIG_SERVICE_SETTING for this stream.
        $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

        $streamSettings = $streamConfigEntries[$streamName] ?? [];

        $eventServiceName = $streamSettings['producers'][
            self::EVENT_STREAM_CONFIG_PRODUCER_NAME
        ][self::EVENT_STREAM_CONFIG_SERVICE_SETTING] ?? null;

        // For backwards compatibility, the event service name setting used to be a top level
        // stream setting 'destination_event_service'. If EVENT_STREAM_CONFIG_SERVICE_SETTING
        // is not set (or is empty), fall back to that legacy setting.
        // This can be removed once all streams have been migrated to using the
        // producers.mediawiki_eventbus specific setting.
        // https://phabricator.wikimedia.org/T321557
        return $eventServiceName ?: $streamSettings['destination_event_service'] ?? null;
    }
}