Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
95.95% covered (success)
95.95%
71 / 74
60.00% covered (warning)
60.00%
3 / 5
CRAP
0.00% covered (danger)
0.00%
0 / 1
EventBusFactory
95.95% covered (success)
95.95%
71 / 74
60.00% covered (warning)
60.00%
3 / 5
13
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
21 / 21
100.00% covered (success)
100.00%
1 / 1
1
 getInstance
95.83% covered (success)
95.83%
23 / 24
0.00% covered (danger)
0.00%
0 / 1
4
 getInstanceForStream
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isStreamEnabled
77.78% covered (warning)
77.78%
7 / 9
0.00% covered (danger)
0.00%
0 / 1
3.10
 getEventServiceNameForStream
100.00% covered (success)
100.00%
19 / 19
100.00% covered (success)
100.00%
1 / 1
4
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 * http://www.gnu.org/copyleft/gpl.html
18 *
19 * @file
20 * @author Petr Pchelko
21 */
22
23namespace MediaWiki\Extension\EventBus;
24
25use InvalidArgumentException;
26use MediaWiki\Config\ServiceOptions;
27use MediaWiki\Extension\EventStreamConfig\StreamConfigs;
28use Psr\Log\LoggerInterface;
29use Wikimedia\Http\MultiHttpClient;
30use Wikimedia\Stats\StatsFactory;
31
32/**
33 * Creates appropriate EventBus instance based on stream config.
34 *
35 * @since 1.35
36 */
37class EventBusFactory {
38
39    public const CONSTRUCTOR_OPTIONS = [
40        'EventServices',
41        'EventServiceDefault',
42        'EnableEventBus',
43        'EventBusMaxBatchByteSize'
44    ];
45
46    /**
47     * Key in wgEventStreams['stream_name']['producers'] that contains settings
48     * for this MediaWiki EventBus producer.
49     */
50    public const EVENT_STREAM_CONFIG_PRODUCER_NAME = 'mediawiki_eventbus';
51
52    /**
53     * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME]
54     * that specifies if the stream is enabled. A stream is 'disabled' only if
55     * this setting is explicitly false, or if the stream name
56     * does not have an entry in wgEventStreams
57     * (and wgEventStreams is an array with other streams configured).
58     */
59    public const EVENT_STREAM_CONFIG_ENABLED_SETTING = 'enabled';
60
61    /**
62     * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME] that specifies
63     * the event service name that should be used for a specific stream.
64     * This should be a key into $eventServiceConfig, which usually is configured
65     * using the EventBus MW config wgEventServices.
66     * If not found via StreamConfigs, EventServiceDefault will be used.
67     */
68    public const EVENT_STREAM_CONFIG_SERVICE_SETTING = 'event_service_name';
69
70    /**
71     * Internal name of an EventBus instance that never sends events.
72     * This is used for streams that are disabled or undeclared.
73     * This will also be used as the dummy 'url' of that instance.
74     * (Public only for testing purposes.)
75     */
76    public const EVENT_SERVICE_DISABLED_NAME = '_disabled_eventbus_';
77
78    /**
79     * @var array|mixed
80     */
81    private array $eventServiceConfig;
82
83    /**
84     * @var string|mixed
85     */
86    private string $eventServiceDefault;
87
88    /**
89     * @var StreamConfigs|null
90     */
91    private ?StreamConfigs $streamConfigs;
92
93    /**
94     * @var string|mixed
95     */
96    private string $enableEventBus;
97
98    /**
99     * @var int|mixed
100     */
101    private int $maxBatchByteSize;
102
103    /**
104     * @var EventFactory
105     */
106    private EventFactory $eventFactory;
107
108    /**
109     * @var MultiHttpClient
110     */
111    private MultiHttpClient $http;
112
113    /**
114     * @var LoggerInterface
115     */
116    private LoggerInterface $logger;
117
118    /** @var ?StatsFactory wf:Stats factory instance */
119    private ?StatsFactory $statsFactory;
120
121    /**
122     * @var array
123     */
124    private array $eventBusInstances = [];
125
126    /**
127     * @param ServiceOptions $options
128     * @param StreamConfigs|null $streamConfigs
129     * @param EventFactory $eventFactory
130     * @param MultiHttpClient $http
131     * @param LoggerInterface $logger
132     * @param StatsFactory|null $statsFactory
133     */
134    public function __construct(
135        ServiceOptions $options,
136        ?StreamConfigs $streamConfigs,
137        EventFactory $eventFactory,
138        MultiHttpClient $http,
139        LoggerInterface $logger,
140        ?StatsFactory $statsFactory = null
141    ) {
142        $options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );
143
144        $this->eventServiceConfig = $options->get( 'EventServices' );
145        $this->eventServiceDefault = $options->get( 'EventServiceDefault' );
146        $this->enableEventBus = $options->get( 'EnableEventBus' );
147        $this->maxBatchByteSize = $options->get( 'EventBusMaxBatchByteSize' );
148
149        $this->streamConfigs = $streamConfigs;
150        $this->eventFactory = $eventFactory;
151        $this->http = $http;
152        $this->logger = $logger;
153        $this->statsFactory = $statsFactory;
154
155        // Save a 'disabled' non producing EventBus instance that sets
156        // the allowed event type to TYPE_NONE. No
157        // events sent through this instance will actually be sent to an event service.
158        // This is done to allow us to easily 'disable' streams.
159        $this->eventBusInstances[self::EVENT_SERVICE_DISABLED_NAME] = new EventBus(
160            $this->http,
161            EventBus::TYPE_NONE,
162            $this->eventFactory,
163            self::EVENT_SERVICE_DISABLED_NAME,
164            $this->maxBatchByteSize,
165            0,
166            false,
167            self::EVENT_SERVICE_DISABLED_NAME,
168            $this->statsFactory
169        );
170    }
171
172    /**
173     * @param string $eventServiceName
174     *   The name of a key in the EventServices config looked up via
175     *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
176     *   The EventService config is keyed by service name, and should at least contain
177     *   a 'url' entry pointing at the event service endpoint events should be
178     *   POSTed to. They can also optionally contain a 'timeout' entry specifying
179     *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry
180     *   that can be set to true if the X-Client-IP header from the originating request
181     *   should be forwarded to the event service. Instances are singletons identified
182     *   by $eventServiceName.
183     *
184     * @note Previously, this function took a $config object instead of an
185     * event service name.  This is a backwards compatible change, but because
186     * there are no other users of this extension, we can do this safely.
187     *
188     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
189     * @return EventBus
190     */
191    public function getInstance( string $eventServiceName ): EventBus {
192        if ( array_key_exists( $eventServiceName, $this->eventBusInstances ) ) {
193            // If eventServiceName has already been instantiated, return it.
194            return $this->eventBusInstances[$eventServiceName];
195        } elseif (
196            array_key_exists( $eventServiceName, $this->eventServiceConfig ) &&
197            array_key_exists( 'url', $this->eventServiceConfig[$eventServiceName] )
198        ) {
199            // else, create eventServiceName instance from config
200            // and save it in eventBusInstances.
201            $eventServiceSettings = $this->eventServiceConfig[$eventServiceName];
202            $url = $eventServiceSettings['url'];
203            $timeout = $eventServiceSettings['timeout'] ?? null;
204            $forwardXClientIP = $eventServiceSettings['x_client_ip_forwarding_enabled'] ?? false;
205
206            $this->eventBusInstances[$eventServiceName] = new EventBus(
207                $this->http,
208                $this->enableEventBus,
209                $this->eventFactory,
210                $url,
211                $this->maxBatchByteSize,
212                $timeout,
213                $forwardXClientIP,
214                $eventServiceName,
215                $this->statsFactory
216            );
217            return $this->eventBusInstances[$eventServiceName];
218        } else {
219            $error = "Could not get EventBus instance for event service '$eventServiceName'. " .
220                'This event service name must exist in EventServices config with a url setting.';
221            $this->logger->error( $error );
222            throw new InvalidArgumentException( $error );
223        }
224    }
225
226    /**
227     * Gets an EventBus instance for a $stream.
228     *
229     * If EventStreamConfig is not configured, or if the stream is configured but
230     * does not set ['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING],
231     * EventServiceDefault will be used.
232     *
233     * If EventStreamConfig is configured, but the stream is not or the stream has
234     * ['producers']['mediawiki_eventbus']['enabled'] = false, this will return
235     * a non-producing EventBus instance.
236     *
237     * @param string $streamName the stream to send an event to
238     * @return EventBus
239     * @throws InvalidArgumentException
240     */
241    public function getInstanceForStream( string $streamName ): EventBus {
242        return self::getInstance( $this->getEventServiceNameForStream( $streamName ) );
243    }
244
245    /**
246     * Uses StreamConfigs to determine if a stream is enabled.
247     * By default, a stream is enabled.  It is disabled only if:
248     *
249     * - wgEventStreams[$streamName]['producers']['mediawiki_eventbus']['enabled'] === false
250     * OR
251     * - wgEventStreams != null, but, wgEventStreams[$streamName] is not declared
252     *
253     * @param string $streamName
254     * @return bool
255     */
256    private function isStreamEnabled( string $streamName ): bool {
257        // No streamConfigs means any stream is enabled
258        if ( $this->streamConfigs === null ) {
259            return true;
260        }
261
262        $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );
263
264        // If $streamName is not declared in EventStreamConfig, then it is not enabled.
265        if ( !array_key_exists( $streamName, $streamConfigEntries ) ) {
266            return false;
267        }
268
269        $streamSettings = $streamConfigEntries[$streamName];
270
271        return $streamSettings['producers'][
272            self::EVENT_STREAM_CONFIG_PRODUCER_NAME
273        ][self::EVENT_STREAM_CONFIG_ENABLED_SETTING] ?? true;
274    }
275
276    /**
277     * Looks up the wgEventStreams[$streamName]['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING]
278     * setting for this stream.
279     * If wgEventStreams is not configured, or if the stream is not configured in wgEventStreams,
280     * or if the stream does not have EVENT_STREAM_CONFIG_SERVICE_SETTING set,
281     * then this falls back to the default stream.
282     * If the stream is explicitly marked as disabled, this will return a non-producing stream name.
283     *
284     * @param string $streamName
285     * @return string
286     */
287    public function getEventServiceNameForStream( string $streamName ): string {
288        // Use eventServiceDefault if no streamConfigs were provided.
289        if ( $this->streamConfigs === null ) {
290            return $this->eventServiceDefault;
291        }
292
293        if ( !$this->isStreamEnabled( $streamName ) ) {
294            // Don't send event if $streamName is explicitly disabled.
295
296            $this->logger->debug(
297                "Using non-producing EventBus instance for stream $streamName" .
298                'This stream is either undeclared, or is explicitly disabled.'
299            );
300
301            return self::EVENT_SERVICE_DISABLED_NAME;
302        }
303
304        // Else attempt to lookup EVENT_STREAM_CONFIG_SERVICE_SETTING for this stream.
305        $streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );
306
307        $streamSettings = $streamConfigEntries[$streamName] ?? [];
308
309        $eventServiceName = $streamSettings['producers'][
310            self::EVENT_STREAM_CONFIG_PRODUCER_NAME
311        ][self::EVENT_STREAM_CONFIG_SERVICE_SETTING] ?? null;
312
313        // For backwards compatibility, the event service name setting used to be a top level
314        // stream setting 'destination_event_service'. If EVENT_STREAM_CONFIG_SERVICE_SETTING, use it instead.
315        // This can be removed once all streams have been migrated to using the
316        // producers.mediawiki_eventbus specific setting.
317        // https://phabricator.wikimedia.org/T321557
318        $eventServiceName = $eventServiceName ?: $streamSettings['destination_event_service'] ??
319            $this->eventServiceDefault;
320
321        $this->logger->debug(
322            "Using event intake service $eventServiceName for stream $streamName."
323        );
324
325        return $eventServiceName;
326    }
327}