Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
68.16% covered (warning)
68.16%
152 / 223
46.67% covered (danger)
46.67%
7 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
EventBus
68.16% covered (warning)
68.16%
152 / 223
46.67% covered (danger)
46.67%
7 / 15
191.10
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
24 / 24
100.00% covered (success)
100.00%
1 / 1
6
 partitionEvents
100.00% covered (success)
100.00%
20 / 20
100.00% covered (success)
100.00%
1 / 1
4
 incrementMetricByValue
16.67% covered (danger)
16.67%
1 / 6
0.00% covered (danger)
0.00%
0 / 1
19.47
 send
75.68% covered (warning)
75.68%
84 / 111
0.00% covered (danger)
0.00%
0 / 1
28.97
 serializeEvents
10.71% covered (danger)
10.71%
3 / 28
0.00% covered (danger)
0.00%
0 / 1
22.79
 prepareEventsForLogging
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
4.13
 replaceBinaryValues
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 replaceBinaryValuesRecursive
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 checkEventPart
40.00% covered (danger)
40.00%
4 / 10
0.00% covered (danger)
0.00%
0 / 1
10.40
 validateJSONSerializable
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 shouldSendEvent
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 logger
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getFactory
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getInstance
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getInstanceForStream
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * Event delivery.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 * http://www.gnu.org/copyleft/gpl.html
20 *
21 * @file
22 * @author Eric Evans, Andrew Otto
23 */
24
25namespace MediaWiki\Extension\EventBus;
26
27use Exception;
28use FormatJson;
29use InvalidArgumentException;
30use MediaWiki\Context\RequestContext;
31use MediaWiki\Logger\LoggerFactory;
32use MediaWiki\MediaWikiServices;
33use MultiHttpClient;
34use Psr\Log\LoggerInterface;
35use RuntimeException;
36use Wikimedia\Assert\Assert;
37use Wikimedia\Stats\StatsFactory;
38
39class EventBus {
40
41    /**
42     * @const int the special event type indicating no events should be accepted.
43     */
44    public const TYPE_NONE = 0;
45
46    /**
47     * @const int the event type indicating that the event is a regular mediawiki event.
48     */
49    public const TYPE_EVENT = 1;
50
51    /**
52     * @const int the event type indicating that the event is a serialized job.
53     */
54    public const TYPE_JOB = 2;
55
56    /**
57     * @const int the event type indicating that the event is a CDN purge.
58     */
59    public const TYPE_PURGE = 4;
60
61    /**
62     * @const int the event type indicating any event type. (TYPE_EVENT ^ TYPE_EVENT)
63     */
64    public const TYPE_ALL = self::TYPE_EVENT | self::TYPE_JOB | self::TYPE_PURGE;
65
66    /**
67     * @const array names of the event type constants defined above
68     */
69    private const EVENT_TYPE_NAMES = [
70        'TYPE_NONE' => self::TYPE_NONE,
71        'TYPE_EVENT' => self::TYPE_EVENT,
72        'TYPE_JOB' => self::TYPE_JOB,
73        'TYPE_PURGE' => self::TYPE_PURGE,
74        'TYPE_ALL' => self::TYPE_ALL,
75    ];
76
77    /** @const int Default HTTP request timeout in seconds */
78    private const DEFAULT_REQUEST_TIMEOUT = 10;
79    /** @const string fallback stream name to use when the `meta.stream` field is
80     * not set in events payload.
81     */
82    private const STREAM_UNKNOWN_NAME = "__stream_unknown__";
83
84    /** @var LoggerInterface instance for all EventBus instances */
85    private static $logger;
86
87    /** @var MultiHttpClient */
88    private $http;
89
90    /** @var string EventServiceUrl for this EventBus instance */
91    private $url;
92
93    /** @var int HTTP request timeout for this EventBus instance */
94    private $timeout;
95
96    /** @var int which event types are allowed to be sent (TYPE_NONE|TYPE_EVENT|TYPE_JOB|TYPE_PURGE|TYPE_ALL) */
97    private $allowedEventTypes;
98
99    /** @var EventFactory|null event creator */
100    private $eventFactory;
101
102    /** @var int Maximum byte size of a batch */
103    private $maxBatchByteSize;
104
105    /** @var bool Whether to forward the X-Client-IP header, if present */
106    private $forwardXClientIP;
107
108    /** @var string intake event service name */
109    private string $eventServiceName;
110
111    /** @var ?StatsFactory wf:Stats factory instance */
112    private ?StatsFactory $statsFactory;
113
114    /**
115     * @param MultiHttpClient $http
116     * @param string|int $enableEventBus A value of the wgEnableEventBus config, or a bitmask
117     * of TYPE_* constants
118     * @param EventFactory $eventFactory EventFactory to use for event construction.
119     * @param string $url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events
120     * @param int $maxBatchByteSize Maximum byte size of a batch
121     * @param int|null $timeout HTTP request timeout in seconds, defaults to 5.
122     * @param bool $forwardXClientIP Whether the X-Client-IP header should be forwarded
123     *   to the intake service, if present
124     * @param string $eventServiceName TODO: pass the event service name so that it can be used
125     *      to label metrics. This is a hack put in place while refactoring efforts on this class are
126     *   ongoing. This variable is used to label metrics in send().
127     * @param ?StatsFactory|null $statsFactory wf:Stats factory instance
128     */
129    public function __construct(
130        MultiHttpClient $http,
131        $enableEventBus,
132        EventFactory $eventFactory,
133        string $url,
134        int $maxBatchByteSize,
135        int $timeout = null,
136        bool $forwardXClientIP = false,
137        string $eventServiceName = EventBusFactory::EVENT_SERVICE_DISABLED_NAME,
138        ?StatsFactory $statsFactory = null
139    ) {
140        $this->http = $http;
141        $this->url = $url;
142        $this->maxBatchByteSize = $maxBatchByteSize;
143        $this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
144        $this->eventFactory = $eventFactory;
145        $this->forwardXClientIP = $forwardXClientIP;
146        $this->eventServiceName = $eventServiceName;
147        $this->statsFactory = $statsFactory;
148
149        if ( is_int( $enableEventBus ) ) {
150            Assert::precondition(
151                (int)( $enableEventBus & self::TYPE_ALL ) === $enableEventBus,
152                'Invalid $enableEventBus parameter: ' . $enableEventBus
153            );
154            $this->allowedEventTypes = $enableEventBus;
155        } elseif ( is_string( $enableEventBus ) && $enableEventBus ) {
156            $this->allowedEventTypes = self::TYPE_NONE;
157            $allowedTypes = explode( '|', $enableEventBus );
158            foreach ( $allowedTypes as $allowedType ) {
159                Assert::precondition(
160                    array_key_exists( $allowedType, self::EVENT_TYPE_NAMES ),
161                    "EnableEventBus: $allowedType not recognized"
162                );
163                $this->allowedEventTypes |= self::EVENT_TYPE_NAMES[$allowedType];
164            }
165        } else {
166            $this->allowedEventTypes = self::TYPE_ALL;
167        }
168    }
169
170    /**
171     * @param array $events
172     * @param int $serializedSize
173     * @return array
174     */
175    private function partitionEvents( array $events, int $serializedSize ): array {
176        $results = [];
177
178        if ( count( $events ) > 1 ) {
179            $numOfChunks = ceil( $serializedSize / $this->maxBatchByteSize );
180            $partitions = array_chunk( $events, (int)floor( count( $events ) / $numOfChunks ) );
181            foreach ( $partitions as $partition ) {
182                $serializedPartition = self::serializeEvents( $partition );
183                if ( strlen( $serializedPartition ) > $this->maxBatchByteSize ) {
184                    $results = array_merge(
185                        $results,
186                        $this->partitionEvents( $partition, strlen( $serializedPartition ) )
187                    );
188                } else {
189                    $results[] = $serializedPartition;
190                }
191            }
192        } else {
193            self::logger()->warning(
194                "Event is larger than the maxBatchByteSize set.",
195                [
196                    'raw_event' => self::prepareEventsForLogging( $events )
197                ]
198            );
199            $results = [ self::serializeEvents( $events ) ];
200        }
201        return $results;
202    }
203
204    /**
205     * @param string $metricName
206     * @param int $value
207     * @param mixed ...$labels passed as $key => $value pairs
208     * @return void
209     */
210    private function incrementMetricByValue( string $metricName, int $value, ...$labels ): void {
211        // Feature flag to enable instrumentation on Beta
212        // https://wikitech.wikimedia.org/wiki/Nova_Resource:Deployment-prep/How_code_is_updated#My_code_introduces_a_feature_that_is_not_yet_ready_for_production,_should_I_wait_to_merge_in_master?
213        global $wgEnableEventBusInstrumentation;
214        if ( $wgEnableEventBusInstrumentation && isset( $this->statsFactory ) ) {
215            $metric = $this->statsFactory->getCounter( $metricName );
216            foreach ( $labels as $label ) {
217                foreach ( $label as $k => $v ) {
218                    $metric->setLabel( $k, $v );
219                }
220            }
221            // Under the hood, Counter::incrementBy will update an integer
222            // valued counter, regardless of `$value` type.
223            $metric->incrementBy( $value );
224        }
225    }
226
227    /**
228     * Deliver an array of events to the remote service.
229     *
230     * @param array|string $events the events to send.
231     * @param int $type the type of the event being sent.
232     * @return array|bool|string True on success or an error string or array on failure
233     * @throws Exception
234     */
235    public function send( $events, $type = self::TYPE_EVENT ) {
236        // Label metrics by event type name. If the lookup fails,
237        // fall back to labeling with the $type id parameter. Unknown or invalid ids
238        // will be reported by the `events_are_not_enqueable` metric, which
239        // fires when an event type does not belong to this EventBus instance allow list.
240        // It should not be possible to supply a $type that does not belong
241        // to EVENT_TYPE_NAME. Falling back to a numerical id is just a guard
242        // to help us spot eventual bugs.
243        $eventType = array_search( $type, self::EVENT_TYPE_NAMES );
244        $eventType = ( $eventType === false ) ? $type : $eventType;
245        // Label metrics with the EventBus declared default service host.
246        // In the future, for streams that declare one, use the one provided by EventStreamConfig instead.
247        $baseMetricLabels = [ "function_name" => "send", "event_type" => $eventType,
248            "event_service_name" => $this->eventServiceName, "event_service_uri" => $this->url ];
249
250        if ( !$this->shouldSendEvent( $type ) ) {
251            // Debug metric. How often is the `$events` param not enqueable?
252            $this->incrementMetricByValue( "events_are_not_enqueable", 1, $baseMetricLabels );
253            return "Events of type '$type' are not enqueueable";
254        }
255        if ( !$events ) {
256            // Logstash doesn't like the args, because they could be of various types
257            $context = [ 'exception' => new RuntimeException() ];
258            self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context );
259            return "Provided event list is empty";
260        }
261
262        $numEvents = 0;
263        // If we already have a JSON string of events, just use it as the body.
264        if ( is_string( $events ) ) {
265            // TODO: is $events ever passed as a string? We should refactor this block and simplify the union type.
266            // Debug metric. How often is the `$events` param a string?
267            $this->incrementMetricByValue( "events_is_string",
268                1,
269                $baseMetricLabels
270            );
271            if ( strlen( $events ) > $this->maxBatchByteSize ) {
272                $decodeEvents = FormatJson::decode( $events, true );
273                $numEvents = count( $decodeEvents );
274                $body = $this->partitionEvents( $decodeEvents, strlen( $events ) );
275                // We have no guarantee that all events passed to send() will declare the
276                // same meta.stream. Iterate over the array an increment the counter by stream.
277                foreach ( $decodeEvents as $event ) {
278                    // TODO can we assume $decodeEvents will also have meta.stream set?
279                    // coerce to null for tests compat
280                    $streamName = $event['meta']['stream'] ?? self::STREAM_UNKNOWN_NAME;
281                        $this->incrementMetricByValue( "outgoing_events_total",
282                            1,
283                            $baseMetricLabels,
284                            [ "stream_name" => $streamName ]
285                        );
286                }
287            } else {
288                $body = $events;
289            }
290        } else {
291            $numEvents = count( $events );
292            self::validateJSONSerializable( $events );
293            // Else serialize the array of events to a JSON string.
294            $serializedEvents = self::serializeEvents( $events );
295            // If not $body, then something when wrong.
296            // serializeEvents has already logged, so we can just return.
297            if ( !$serializedEvents ) {
298                return "Unable to serialize events";
299            }
300            foreach ( $events as $event ) {
301                $streamName = $event['meta']['stream'] ?? self::STREAM_UNKNOWN_NAME;
302                    $this->incrementMetricByValue( "outgoing_events_total",
303                        1,
304                        $baseMetricLabels,
305                        [ "stream_name" => $streamName ]
306                    );
307            }
308
309            if ( strlen( $serializedEvents ) > $this->maxBatchByteSize ) {
310                $body = $this->partitionEvents( $events, strlen( $serializedEvents ) );
311            } else {
312                $body = $serializedEvents;
313            }
314        }
315
316        $originalRequest = RequestContext::getMain()->getRequest();
317
318        $reqs = array_map( function ( $body ) use ( $originalRequest ) {
319            $req = [
320                'url'        => $this->url,
321                'method'    => 'POST',
322                'body'        => $body,
323                'headers'    => [ 'content-type' => 'application/json' ]
324            ];
325            if ( $this->forwardXClientIP ) {
326                $req['headers']['x-client-ip'] = $originalRequest->getIP();
327            }
328            return $req;
329        }, is_array( $body ) ? $body : [ $body ] );
330
331        $responses = $this->http->runMulti(
332            $reqs,
333            [
334                'reqTimeout' => $this->timeout
335            ]
336        );
337
338        // 201: all events accepted.
339        // 202: all events accepted but not necessarily persisted. HTTP response is returned 'hastily'.
340        // 207: some but not all events accepted: either due to validation failure or error.
341        // 400: no events accepted: all failed schema validation.
342        // 500: no events accepted: at least one caused an error, but some might have been invalid.
343        $results = [];
344        foreach ( $responses as $response ) {
345            $res = $response['response'];
346            $code = $res['code'];
347
348            $this->incrementMetricByValue( "event_service_response_total",
349                1,
350                $baseMetricLabels,
351                [ "status_code" => $code ]
352            );
353
354            if ( $code == 207 || $code >= 300 ) {
355                $message = empty( $res['error'] ) ?
356                    (string)$code . ': ' . (string)$res['reason'] : $res['error'];
357                // Limit the maximum size of the logged context to 8 kilobytes as that's where logstash
358                // truncates the JSON anyway
359                $context = [
360                    'raw_events' => self::prepareEventsForLogging( $body ),
361                    'service_response' => $res,
362                    'exception' => new RuntimeException(),
363                ];
364                self::logger()->error( "Unable to deliver all events: {$message}", $context );
365
366                if ( isset( $res['body'] ) ) {
367                    // We expect the event service to return an array of objects
368                    // in the response body.
369                    // FormatJson::decode will return `null` if the message failed to parse.
370                    // If anything other than an array is parsed we treat it as unexpected
371                    // behaviour, and log the response at error severity.
372                    // See https://phabricator.wikimedia.org/T370428
373                    $failedEvents = FormatJson::decode( $res['body'], true );
374                    if ( is_array( $failedEvents ) ) {
375                        foreach ( $failedEvents as $failureKind => $failureList ) {
376                            // $failureList should not be null. This is just a guard against what the intake
377                            // service returns (or the behavior of different json parsing methods - possibly).
378                            $numFailedEvents = count( $failureList ?? [] );
379                            $this->incrementMetricByValue( "outgoing_events_failed_total",
380                                $numFailedEvents,
381                                $baseMetricLabels,
382                                [ "failure_kind" => $failureKind ]
383                            );
384
385                            foreach ( $failureList as $failurePayload ) {
386                                // TODO: can we assume that `error` messages can always be parsed?
387                                // Exception handling is expensive. This will need profiling.
388                                // At this point of execution, events have already been submitted to
389                                // to the event service, and the client should not experience latency.
390                                $event = FormatJson::decode( $failurePayload['event'], true );
391                                if ( $event === null ) {
392                                    self::logger()->error( "Unable to parse error messages from
393                                            the event service response body.", $context );
394                                }
395                                $streamName = $event['meta']['stream'] ?? self::STREAM_UNKNOWN_NAME;
396                                $this->incrementMetricByValue( "outgoing_events_failed_by_stream_total",
397                                    1,
398                                    $baseMetricLabels,
399                                    [ "stream_name" => $streamName, "failure_kind" => $failureKind ]
400                                );
401                            }
402                        }
403                    } else {
404                        self::logger()->error( "Invalid event service response body", $context );
405                    }
406                }
407                $results[] = "Unable to deliver all events: $message";
408            } else {
409                // 201, 202 all events have been accepted (but not necessarily persisted).
410                $this->incrementMetricByValue( "outgoing_events_accepted_total",
411                    $numEvents,
412                    $baseMetricLabels,
413                    [ "status_code" => $code ]
414                );
415            }
416        }
417
418        if ( $results !== [] ) {
419            return $results;
420        }
421
422        return true;
423    }
424
425    // == static helper functions below ==
426
427    /**
428     * Serializes $events array to a JSON string.  If FormatJson::encode()
429     * returns false, this will log a detailed error message and return null.
430     *
431     * @param array $events
432     * @return string|null JSON or null on failure
433     */
434    public static function serializeEvents( $events ) {
435        try {
436            $serializedEvents = FormatJson::encode( $events, false, FormatJson::ALL_OK );
437            if ( !$serializedEvents ) {
438                // Something failed. Let's figure out exactly which one.
439                $bad = [];
440                foreach ( $events as $event ) {
441                    $result = FormatJson::encode( $event, false, FormatJson::ALL_OK );
442                    if ( !$result ) {
443                        $bad[] = $event;
444                    }
445                }
446                $context = [
447                    'exception' => new RuntimeException(),
448                    'json_last_error' => json_last_error_msg(),
449                    // Use PHP serialization since that will *always* work.
450                    'events' => serialize( $bad ),
451                ];
452                self::logger()->error(
453                    'FormatJson::encode($events) failed: ' . $context['json_last_error'] .
454                    '. Aborting send.', $context
455                );
456                return null;
457            }
458            return $serializedEvents;
459        } catch ( Exception $exception ) {
460            $context = [
461                'exception' => $exception,
462                'json_last_error' => json_last_error_msg()
463            ];
464            self::logger()->error(
465                'FormatJson::encode($events) thrown exception: ' . $context['json_last_error'] .
466                '. Aborting send.', $context
467            );
468            return null;
469        }
470    }
471
472    /**
473     * Prepares events for logging - serializes if needed, limits the size
474     * of the serialized event to 8kb.
475     *
476     * @param string|array $events
477     * @return string|null
478     */
479    private static function prepareEventsForLogging( $events ) {
480        if ( is_array( $events ) ) {
481            $events = self::serializeEvents( $events );
482        }
483
484        if ( $events === null ) {
485            return null;
486        }
487
488        return strlen( $events ) > 8192 ? substr( $events, 0, 8192 ) : $events;
489    }
490
491    /**
492     * If $value is a string, but not UTF-8 encoded, then assume it is binary
493     * and base64 encode it and prefix it with a content type.
494     * @param mixed $value
495     * @return mixed
496     */
497    public static function replaceBinaryValues( $value ) {
498        if ( is_string( $value ) && !mb_check_encoding( $value, 'UTF-8' ) ) {
499            return 'data:application/octet-stream;base64,' . base64_encode( $value );
500        }
501        return $value;
502    }
503
504    /**
505     * Recursively calls replaceBinaryValues on an array and transforms
506     * any binary values.  $array is passed by reference and will be modified.
507     * @param array &$array
508     * @return bool return value of array_walk_recursive
509     */
510    public static function replaceBinaryValuesRecursive( &$array ) {
511        return array_walk_recursive( $array, function ( &$item, $key ) {
512            $item = self::replaceBinaryValues( $item );
513        } );
514    }
515
516    /**
517     * Checks a part of the event for JSON-serializability
518     *
519     * @param array $originalEvent an original event that is being checked.
520     * @param array $eventPart the sub-object nested in the original event to be checked.
521     */
522    private static function checkEventPart( $originalEvent, $eventPart ) {
523        foreach ( $eventPart as $p => $v ) {
524            if ( is_array( $v ) ) {
525                self::checkEventPart( $originalEvent, $v );
526            } elseif ( !is_scalar( $v ) && $v !== null ) {
527                // Only log the first appearance of non-scalar property per event as jobs
528                // might contain hundreds of properties and we do not want to log everything.
529                self::logger()->error( 'Non-scalar value found in the event', [
530                    'raw_events' => self::prepareEventsForLogging( [ $originalEvent ] ),
531                    'prop_name' => $p,
532                    'prop_val_type' => get_class( $v )
533                ] );
534                // Follow to the next event in the array
535                return;
536            }
537        }
538    }
539
540    /**
541     * Checks if the event is JSON-serializable (contains only scalar values)
542     * and logs the event if non-scalar found.
543     *
544     * @param array $events
545     */
546    private static function validateJSONSerializable( $events ) {
547        foreach ( $events as $event ) {
548            self::checkEventPart( $event, $event );
549        }
550    }
551
552    private function shouldSendEvent( $eventType ) {
553        return $this->allowedEventTypes & $eventType;
554    }
555
556    /**
557     * Returns a singleton logger instance for all EventBus instances.
558     * Use like: self::logger()->info( $mesage )
559     * We use this so we don't have to check if the logger has been created
560     * before attempting to log a message.
561     * @return LoggerInterface
562     */
563    public static function logger() {
564        if ( !self::$logger ) {
565            self::$logger = LoggerFactory::getInstance( 'EventBus' );
566        }
567        return self::$logger;
568    }
569
570    /**
571     * Returns the EventFactory associated with this instance of EventBus
572     * @return EventFactory|null
573     */
574    public function getFactory() {
575        return $this->eventFactory;
576    }
577
578    /**
579     * @param string|null $eventServiceName
580     *         The name of a key in the EventServices config looked up via
581     *         MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
582     *         The EventService config is keyed by service name, and should at least contain
583     *         a 'url' entry pointing at the event service endpoint events should be
584     *         POSTed to. They can also optionally contain a 'timeout' entry specifying
585     *         the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry that can be
586     *         set to true if the X-Client-IP header from the originating request should be
587     *         forwarded to the event service. Instances are singletons identified by
588     *         $eventServiceName.
589     *
590     *         NOTE: Previously, this function took a $config object instead of an
591     *         event service name.  This is a backwards compatible change, but because
592     *         there are no other users of this extension, we can do this safely.
593     *
594     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
595     * @return EventBus
596     */
597    public static function getInstance( $eventServiceName ) {
598        return MediaWikiServices::getInstance()
599            ->get( 'EventBus.EventBusFactory' )
600            ->getInstance( $eventServiceName );
601    }
602
603    /**
604     * Uses EventStreamConfig.StreamConfigs to look up the
605     * EventBus instance to use for a $stream.
606     * If the stream is disabled, a non-producing EventBus instance will be used.
607     * If none is found, falls back to using wgEventServiceDefault.
608     *
609     * @param string $stream the stream to send an event to
610     * @return EventBus
611     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
612     */
613    public static function getInstanceForStream( $stream ) {
614        return MediaWikiServices::getInstance()
615            ->get( 'EventBus.EventBusFactory' )
616            ->getInstanceForStream( $stream );
617    }
618}