Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
62.50% covered (warning)
62.50%
175 / 280
47.06% covered (danger)
47.06%
8 / 17
CRAP
0.00% covered (danger)
0.00%
0 / 1
EventBus
62.50% covered (warning)
62.50%
175 / 280
47.06% covered (danger)
47.06%
8 / 17
311.84
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
24 / 24
100.00% covered (success)
100.00%
1 / 1
6
 partitionEvents
80.00% covered (warning)
80.00%
16 / 20
0.00% covered (danger)
0.00%
0 / 1
4.13
 incrementMetricByValue
8.33% covered (danger)
8.33%
1 / 12
0.00% covered (danger)
0.00%
0 / 1
24.26
 send
66.67% covered (warning)
66.67%
102 / 153
0.00% covered (danger)
0.00%
0 / 1
39.93
 getStreamNameFromEvent
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 groupEventsByStream
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
2
 serializeEvents
10.71% covered (danger)
10.71%
3 / 28
0.00% covered (danger)
0.00%
0 / 1
22.79
 prepareEventsForLogging
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
4.13
 replaceBinaryValues
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 replaceBinaryValuesRecursive
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 checkEventPart
40.00% covered (danger)
40.00%
4 / 10
0.00% covered (danger)
0.00%
0 / 1
10.40
 validateJSONSerializable
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 shouldSendEvent
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 logger
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getFactory
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getInstance
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getInstanceForStream
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * Event delivery.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 * http://www.gnu.org/copyleft/gpl.html
20 *
21 * @file
22 * @author Eric Evans, Andrew Otto
23 */
24
25namespace MediaWiki\Extension\EventBus;
26
27use Exception;
28use FormatJson;
29use InvalidArgumentException;
30use MediaWiki\Context\RequestContext;
31use MediaWiki\Logger\LoggerFactory;
32use MediaWiki\MediaWikiServices;
33use MultiHttpClient;
34use Psr\Log\LoggerInterface;
35use RuntimeException;
36use Wikimedia\Assert\Assert;
37use Wikimedia\Stats\StatsFactory;
38
39class EventBus {
40
41    /**
42     * @const int the special event type indicating no events should be accepted.
43     */
44    public const TYPE_NONE = 0;
45
46    /**
47     * @const int the event type indicating that the event is a regular mediawiki event.
48     */
49    public const TYPE_EVENT = 1;
50
51    /**
52     * @const int the event type indicating that the event is a serialized job.
53     */
54    public const TYPE_JOB = 2;
55
56    /**
57     * @const int the event type indicating that the event is a CDN purge.
58     */
59    public const TYPE_PURGE = 4;
60
61    /**
62     * @const int the event type indicating any event type. (TYPE_EVENT ^ TYPE_EVENT)
63     */
64    public const TYPE_ALL = self::TYPE_EVENT | self::TYPE_JOB | self::TYPE_PURGE;
65
66    /**
67     * @const array names of the event type constants defined above
68     */
69    private const EVENT_TYPE_NAMES = [
70        'TYPE_NONE' => self::TYPE_NONE,
71        'TYPE_EVENT' => self::TYPE_EVENT,
72        'TYPE_JOB' => self::TYPE_JOB,
73        'TYPE_PURGE' => self::TYPE_PURGE,
74        'TYPE_ALL' => self::TYPE_ALL,
75    ];
76
77    /** @const int Default HTTP request timeout in seconds */
78    private const DEFAULT_REQUEST_TIMEOUT = 10;
79
80    /** @const string fallback value to use when a prometheus metrics label value
81     * (e.g. `meta.stream`) is not assigned.
82     */
83    public const VALUE_UNKNOWN = "__value_unknown__";
84
85    /** @var LoggerInterface instance for all EventBus instances */
86    private static $logger;
87
88    /** @var MultiHttpClient */
89    private $http;
90
91    /** @var string EventServiceUrl for this EventBus instance */
92    private $url;
93
94    /** @var int HTTP request timeout for this EventBus instance */
95    private $timeout;
96
97    /** @var int which event types are allowed to be sent (TYPE_NONE|TYPE_EVENT|TYPE_JOB|TYPE_PURGE|TYPE_ALL) */
98    private $allowedEventTypes;
99
100    /** @var EventFactory|null event creator */
101    private $eventFactory;
102
103    /** @var int Maximum byte size of a batch */
104    private $maxBatchByteSize;
105
106    /** @var bool Whether to forward the X-Client-IP header, if present */
107    private $forwardXClientIP;
108
109    /** @var string intake event service name */
110    private string $eventServiceName;
111
112    /** @var ?StatsFactory wf:Stats factory instance */
113    private ?StatsFactory $statsFactory;
114
115    /**
116     * @param MultiHttpClient $http
117     * @param string|int $enableEventBus A value of the wgEnableEventBus config, or a bitmask
118     * of TYPE_* constants
119     * @param EventFactory $eventFactory EventFactory to use for event construction.
120     * @param string $url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events
121     * @param int $maxBatchByteSize Maximum byte size of a batch
122     * @param int|null $timeout HTTP request timeout in seconds, defaults to 5.
123     * @param bool $forwardXClientIP Whether the X-Client-IP header should be forwarded
124     *   to the intake service, if present
125     * @param string $eventServiceName TODO: pass the event service name so that it can be used
126     *     to label metrics. This is a hack put in place while refactoring efforts on this class are
127     *   ongoing. This variable is used to label metrics in send().
128     * @param ?StatsFactory|null $statsFactory wf:Stats factory instance
129     */
130    public function __construct(
131        MultiHttpClient $http,
132        $enableEventBus,
133        EventFactory $eventFactory,
134        string $url,
135        int $maxBatchByteSize,
136        int $timeout = null,
137        bool $forwardXClientIP = false,
138        string $eventServiceName = EventBusFactory::EVENT_SERVICE_DISABLED_NAME,
139        ?StatsFactory $statsFactory = null
140    ) {
141        $this->http = $http;
142        $this->url = $url;
143        $this->maxBatchByteSize = $maxBatchByteSize;
144        $this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
145        $this->eventFactory = $eventFactory;
146        $this->forwardXClientIP = $forwardXClientIP;
147        $this->eventServiceName = $eventServiceName;
148        $this->statsFactory = $statsFactory;
149
150        if ( is_int( $enableEventBus ) ) {
151            Assert::precondition(
152                (int)( $enableEventBus & self::TYPE_ALL ) === $enableEventBus,
153                'Invalid $enableEventBus parameter: ' . $enableEventBus
154            );
155            $this->allowedEventTypes = $enableEventBus;
156        } elseif ( is_string( $enableEventBus ) && $enableEventBus ) {
157            $this->allowedEventTypes = self::TYPE_NONE;
158            $allowedTypes = explode( '|', $enableEventBus );
159            foreach ( $allowedTypes as $allowedType ) {
160                Assert::precondition(
161                    array_key_exists( $allowedType, self::EVENT_TYPE_NAMES ),
162                    "EnableEventBus: $allowedType not recognized"
163                );
164                $this->allowedEventTypes |= self::EVENT_TYPE_NAMES[$allowedType];
165            }
166        } else {
167            $this->allowedEventTypes = self::TYPE_ALL;
168        }
169    }
170
171    /**
172     * @param array $events
173     * @param int $serializedSize
174     * @return array
175     */
176    private function partitionEvents( array $events, int $serializedSize ): array {
177        $results = [];
178
179        if ( count( $events ) > 1 ) {
180            $numOfChunks = ceil( $serializedSize / $this->maxBatchByteSize );
181            $partitions = array_chunk( $events, (int)floor( count( $events ) / $numOfChunks ) );
182            foreach ( $partitions as $partition ) {
183                $serializedPartition = self::serializeEvents( $partition );
184                if ( strlen( $serializedPartition ) > $this->maxBatchByteSize ) {
185                    $results = array_merge(
186                        $results,
187                        $this->partitionEvents( $partition, strlen( $serializedPartition ) )
188                    );
189                } else {
190                    $results[] = $serializedPartition;
191                }
192            }
193        } else {
194            self::logger()->warning(
195                "Event is larger than the maxBatchByteSize set.",
196                [
197                    'raw_event' => self::prepareEventsForLogging( $events )
198                ]
199            );
200            $results = [ self::serializeEvents( $events ) ];
201        }
202        return $results;
203    }
204
205    /**
206     * @param string $metricName
207     * @param int $value
208     * @param mixed ...$labels passed as $key => $value pairs
209     * @return void
210     */
211    private function incrementMetricByValue( string $metricName, int $value, ...$labels ): void {
212        if ( isset( $this->statsFactory ) ) {
213            $metric = $this->statsFactory->getCounter( $metricName );
214            foreach ( $labels as $label ) {
215                foreach ( $label as $k => $v ) {
216                    // Bug: T373086
217                    if ( !isset( $v ) ) {
218                        $v = self::VALUE_UNKNOWN;
219                        self::logger()->warning(
220                            ' Initialized metric label does not have an assigned value. ',
221                            [ "metric_label" => $k ]
222                        );
223                    }
224                    $metric->setLabel( $k, $v );
225                }
226            }
227            // Under the hood, Counter::incrementBy will update an integer
228            // valued counter, regardless of `$value` type.
229            $metric->incrementBy( $value );
230        }
231    }
232
233    /**
234     * Deliver an array of events to the remote event intake service.
235     *
236     * Statslib metrics emitted by this method:
237     *
238     * - events_outgoing_total
239     * - events_outgoing_by_stream_total
240     * - events_accepted_total
241     * - events_failed_total
242     * - events_failed_by_stream_total
243     * - event_service_response_total
244     * - event_batch_not_enqueable_total
245     * - event_batch_is_string_total
246     * - event_batch_not_serializable_total
247     * - event_batch_partitioned_total
248     *     (incremented if $events had to be paratitioned and sent in multiple POST requests)
249     *
250     * @param array|string $events the events to send.
251     * @param int $type the type of the event being sent.
252     * @return array|bool|string True on success or an error string or array on failure
253     * @throws Exception
254     */
255    public function send( $events, $type = self::TYPE_EVENT ) {
256        // Label metrics by event type name. If the lookup fails,
257        // fall back to labeling with the $type id parameter. Unknown or invalid ids
258        // will be reported by the `events_are_not_enqueable` metric, which
259        // fires when an event type does not belong to this EventBus instance allow list.
260        // It should not be possible to supply a $type that does not belong
261        // to EVENT_TYPE_NAME. Falling back to a numerical id is just a guard
262        // to help us spot eventual bugs.
263        $eventType = array_search( $type, self::EVENT_TYPE_NAMES );
264        $eventType = ( $eventType === false ) ? $type : $eventType;
265        // Label metrics with the EventBus declared default service host.
266        // In the future, for streams that declare one, use the one provided by EventStreamConfig instead.
267        $baseMetricLabels = [
268            "function_name" => "send",
269            "event_type" => $eventType,
270            "event_service_name" => $this->eventServiceName,
271            "event_service_uri" => $this->url
272        ];
273
274        if ( !$this->shouldSendEvent( $type ) ) {
275            // Debug metric. How often is the `$events` param not enqueable?
276            $this->incrementMetricByValue(
277                "event_batch_not_enqueable_total",
278                1,
279                $baseMetricLabels
280            );
281            return "Events of type '$type' are not enqueueable";
282        }
283        if ( !$events ) {
284            // Logstash doesn't like the args, because they could be of various types
285            $context = [ 'exception' => new RuntimeException() ];
286            self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context );
287            return "Provided event list is empty";
288        }
289
290        // Historically, passing a JSON string has been supported, but we'd like to deprecate this feature.
291        // This was done to avoid having extra encode+decode steps if the caller already has a JSON string.
292        // But, we end up decoding always anyway, to properly increment metrics.
293        if ( is_string( $events ) ) {
294            // TODO: is $events ever passed as a string? We should refactor this block and simplify the union type.
295            // Debug metric. How often is the `$events` param a string?
296            $this->incrementMetricByValue(
297                "event_batch_is_string_total",
298                1,
299                $baseMetricLabels
300            );
301
302            $decodedEvents = FormatJson::decode( $events, true );
303
304            if ( $decodedEvents === null ) {
305                $context = [
306                    'exception' => new RuntimeException(),
307                    'raw_events' => self::prepareEventsForLogging( $events )
308                ];
309                self::logger()->error( 'Failed decoding events from JSON string.', $context );
310                return "Failed decoding events from JSON string";
311            }
312
313            $events = $decodedEvents;
314        }
315
316        // Code below expects that $events is a numeric array of event assoc arrays.
317        if ( !array_key_exists( 0, $events ) ) {
318            $events = [ $events ];
319        }
320        $outgoingEventsCount = count( $events );
321
322        // Increment events_outgoing_total
323        // NOTE: We could just use events_outgoing_by_stream_total and sum,
324        //       but below we want to emit an events_accepted_total.
325        //       In the case of a 207 partial success, we don't know
326        //       the stream names of the successful events
327        //       (without diffing the response from the event service and $events).
328        //       For consistency, we both events_outgoing_total and events_outgoing_by_stream_total.
329        $this->incrementMetricByValue(
330            "events_outgoing_total",
331            $outgoingEventsCount,
332            $baseMetricLabels,
333        );
334
335        // Increment events_outgoing_by_stream_total for each stream
336        $eventsByStream = self::groupEventsByStream( $events );
337        foreach ( $eventsByStream as $streamName => $eventsForStreamName ) {
338            $this->incrementMetricByValue(
339                "events_outgoing_by_stream_total",
340                count( $eventsForStreamName ),
341                $baseMetricLabels,
342                [ "stream_name" => $streamName ]
343            );
344        }
345
346        // validateJSONSerializable only logs if any part of the event is not serializable.
347        // It does not return anything or raise any exceptions.
348        self::validateJSONSerializable( $events );
349        // Serialize the array of events to a JSON string.
350        $serializedEvents = self::serializeEvents( $events );
351        if ( !$serializedEvents ) {
352            $this->incrementMetricByValue(
353                "event_batch_not_serializable_total",
354                1,
355                $baseMetricLabels
356            );
357            // serializeEvents has already logged, so we can just return.
358            return "Unable to serialize events";
359        }
360
361        // If the body would be too big, partition it into multiple bodies.
362        if ( strlen( $serializedEvents ) > $this->maxBatchByteSize ) {
363            $postBodies = $this->partitionEvents( $events, strlen( $serializedEvents ) );
364            // Measure the number of times we partition events into more than one batch.
365            $this->incrementMetricByValue(
366                "event_batch_partitioned_total",
367                1,
368                $baseMetricLabels
369            );
370        } else {
371            $postBodies = [ $serializedEvents ];
372        }
373
374        // Most of the time $postBodies will be a single element array, and we
375        // will only need to send one POST request.
376        // When the size is too large, $events will have been partitioned into
377        // multiple $postBodies, for which each will be sent as its own POST request.
378        $originalRequest = RequestContext::getMain()->getRequest();
379        $requests = array_map(
380            function ( $postBody ) use ( $originalRequest ) {
381                $req = [
382                    'url' => $this->url,
383                    'method' => 'POST',
384                    'body' => $postBody,
385                    'headers' => [ 'content-type' => 'application/json' ]
386                ];
387                if ( $this->forwardXClientIP ) {
388                    $req['headers']['x-client-ip'] = $originalRequest->getIP();
389                }
390                return $req;
391            },
392            $postBodies
393        );
394
395        // Do the POST requests.
396        $responses = $this->http->runMulti(
397            $requests,
398            [
399                'reqTimeout' => $this->timeout
400            ]
401        );
402
403        // Keep track of the total number of failed events.
404        // This will be used to calculate events_accepted_count later.
405        $failedEventsCountTotal = 0;
406
407        // 201: all events accepted.
408        // 202: all events accepted but not necessarily persisted. HTTP response is returned 'hastily'.
409        // 207: some but not all events accepted: either due to validation failure or error.
410        // 400: no events accepted: all failed schema validation.
411        // 500: no events accepted: at least one caused an error, but some might have been invalid.
412        $results = [];
413        foreach ( $responses as $i => $response ) {
414            $res = $response['response'];
415            $code = $res['code'];
416
417            $this->incrementMetricByValue(
418                "event_service_response_total",
419                1,
420                $baseMetricLabels,
421                [ "status_code" => $code ]
422            );
423
424            if ( $code == 207 || $code >= 300 ) {
425                $message = empty( $res['error'] ) ?
426                    (string)$code . ': ' . (string)$res['reason'] : $res['error'];
427                // Limit the maximum size of the logged context to 8 kilobytes as that's where logstash
428                // truncates the JSON anyway.
429                $context = [
430                    // $responses[$i] corresponds with $postBodies[$i]
431                    'raw_events' => self::prepareEventsForLogging( $postBodies[$i] ),
432                    'service_response' => $res,
433                    'exception' => new RuntimeException(),
434                ];
435                self::logger()->error( "Unable to deliver all events: {$message}", $context );
436
437                if ( isset( $res['body'] ) ) {
438                    // We expect the event service to return an array of objects
439                    // in the response body.
440                    // FormatJson::decode will return `null` if the message failed to parse.
441                    // If anything other than an array is parsed we treat it as unexpected
442                    // behaviour, and log the response at error severity.
443                    // See https://phabricator.wikimedia.org/T370428
444
445                    // $failureInfosByKind should look like:
446                    // {
447                    //       "<failure_kind">: [
448                    //         { ..., "event": {<failed event here>}, "context": {<failure context here>},
449                    //        { ... }
450                    //    ],
451                    // }
452                    $failureInfosByKind = FormatJson::decode( $res['body'], true );
453                    if ( is_array( $failureInfosByKind ) ) {
454
455                        foreach ( $failureInfosByKind as $failureKind => $failureInfos ) {
456                            // $failureInfos should not be null or empty.
457                            // This is just a guard against what the intake
458                            // service returns (or the behavior of different json parsing methods - possibly).
459                            // https://www.mediawiki.org/wiki/Manual:Coding_conventions/PHP#empty()
460                            if ( !isset( $failureInfos ) || $failureInfos === [] ) {
461                                continue;
462                            }
463
464                            // Get the events that failed from the response.
465                            $failedEvents = array_map(
466                                static function ( $failureStatus ) {
467                                    return $failureStatus['event'] ?? null;
468                                },
469                                $failureInfos
470                            );
471
472                            $failedEventsCount = count( $failedEvents );
473                            $failedEventsCountTotal += $failedEventsCount;
474
475                            // increment events_failed_total
476                            $this->incrementMetricByValue(
477                                "events_failed_total",
478                                $failedEventsCount,
479                                $baseMetricLabels,
480                                [
481                                    "failure_kind" => $failureKind,
482                                    "status_code" => $code,
483                                ]
484                            );
485
486                            // Group failed events by stream and increment events_failed_by_stream_total.
487                            $failedEventsByStream = self::groupEventsByStream( $failedEvents );
488                            foreach ( $failedEventsByStream as $streamName => $failedEventsForStream ) {
489                                $this->incrementMetricByValue(
490                                    "events_failed_by_stream_total",
491                                    count( $failedEventsForStream ),
492                                    $baseMetricLabels,
493                                    [
494                                        "failure_kind" => $failureKind,
495                                        "status_code" => $code,
496                                        "stream_name" => $streamName,
497                                    ]
498                                );
499                            }
500                        }
501
502                    } else {
503                        self::logger()->error( "Invalid event service response body", $context );
504                    }
505                }
506                $results[] = "Unable to deliver all events: $message";
507            }
508        }
509
510        // increment events_accepted_total as the difference between
511        // $outgoingEventsCount and $failedEventsCountTotal (if there were any failed events).
512        $this->incrementMetricByValue(
513            "events_accepted_total",
514            $outgoingEventsCount - $failedEventsCountTotal,
515            $baseMetricLabels,
516        );
517
518        if ( $results !== [] ) {
519            return $results;
520        }
521
522        return true;
523    }
524
525    // == static helper functions below ==
526
527    /**
528     * Given an event assoc array, extracts the stream name from meta.stream,
529     * or returns STREAM_NAME_UNKNOWN
530     * @param array|null $event
531     * @return mixed|string
532     */
533    public static function getStreamNameFromEvent( ?array $event ) {
534        return is_array( $event ) && isset( $event['meta']['stream'] ) ?
535            $event['meta']['stream'] :
536            self::VALUE_UNKNOWN;
537    }
538
539    /**
540     * Given an assoc array of events, this returns them grouped by stream name.
541     * @param array $events
542     * @return array
543     */
544    public static function groupEventsByStream( array $events ): array {
545        $groupedEvents = [];
546        foreach ( $events as $event ) {
547            $streamName = self::getStreamNameFromEvent( $event );
548            $groupedEvents[$streamName] ??= [];
549            $groupedEvents[$streamName][] = $event;
550        }
551        return $groupedEvents;
552    }
553
554    /**
555     * Serializes $events array to a JSON string.  If FormatJson::encode()
556     * returns false, this will log a detailed error message and return null.
557     *
558     * @param array $events
559     * @return string|null JSON or null on failure
560     */
561    public static function serializeEvents( $events ) {
562        try {
563            $serializedEvents = FormatJson::encode( $events, false, FormatJson::ALL_OK );
564            if ( !$serializedEvents ) {
565                // Something failed. Let's figure out exactly which one.
566                $bad = [];
567                foreach ( $events as $event ) {
568                    $result = FormatJson::encode( $event, false, FormatJson::ALL_OK );
569                    if ( !$result ) {
570                        $bad[] = $event;
571                    }
572                }
573                $context = [
574                    'exception' => new RuntimeException(),
575                    'json_last_error' => json_last_error_msg(),
576                    // Use PHP serialization since that will *always* work.
577                    'events' => serialize( $bad ),
578                ];
579                self::logger()->error(
580                    'FormatJson::encode($events) failed: ' . $context['json_last_error'] .
581                    '. Aborting send.', $context
582                );
583                return null;
584            }
585            return $serializedEvents;
586        } catch ( Exception $exception ) {
587            $context = [
588                'exception' => $exception,
589                'json_last_error' => json_last_error_msg()
590            ];
591            self::logger()->error(
592                'FormatJson::encode($events) thrown exception: ' . $context['json_last_error'] .
593                '. Aborting send.', $context
594            );
595            return null;
596        }
597    }
598
599    /**
600     * Prepares events for logging - serializes if needed, limits the size
601     * of the serialized event to 8kb.
602     *
603     * @param string|array $events
604     * @return string|null
605     */
606    private static function prepareEventsForLogging( $events ) {
607        if ( is_array( $events ) ) {
608            $events = self::serializeEvents( $events );
609        }
610
611        if ( $events === null ) {
612            return null;
613        }
614
615        return strlen( $events ) > 8192 ? substr( $events, 0, 8192 ) : $events;
616    }
617
618    /**
619     * If $value is a string, but not UTF-8 encoded, then assume it is binary
620     * and base64 encode it and prefix it with a content type.
621     * @param mixed $value
622     * @return mixed
623     */
624    public static function replaceBinaryValues( $value ) {
625        if ( is_string( $value ) && !mb_check_encoding( $value, 'UTF-8' ) ) {
626            return 'data:application/octet-stream;base64,' . base64_encode( $value );
627        }
628        return $value;
629    }
630
631    /**
632     * Recursively calls replaceBinaryValues on an array and transforms
633     * any binary values.  $array is passed by reference and will be modified.
634     * @param array &$array
635     * @return bool return value of array_walk_recursive
636     */
637    public static function replaceBinaryValuesRecursive( &$array ) {
638        return array_walk_recursive( $array, function ( &$item, $key ) {
639            $item = self::replaceBinaryValues( $item );
640        } );
641    }
642
643    /**
644     * Checks a part of the event for JSON-serializability
645     *
646     * @param array $originalEvent an original event that is being checked.
647     * @param array $eventPart the sub-object nested in the original event to be checked.
648     */
649    private static function checkEventPart( $originalEvent, $eventPart ) {
650        foreach ( $eventPart as $p => $v ) {
651            if ( is_array( $v ) ) {
652                self::checkEventPart( $originalEvent, $v );
653            } elseif ( !is_scalar( $v ) && $v !== null ) {
654                // Only log the first appearance of non-scalar property per event as jobs
655                // might contain hundreds of properties and we do not want to log everything.
656                self::logger()->error( 'Non-scalar value found in the event', [
657                    'raw_events' => self::prepareEventsForLogging( [ $originalEvent ] ),
658                    'prop_name' => $p,
659                    'prop_val_type' => get_class( $v )
660                ] );
661                // Follow to the next event in the array
662                return;
663            }
664        }
665    }
666
667    /**
668     * Checks if the event is JSON-serializable (contains only scalar values)
669     * and logs the event if non-scalar found.
670     *
671     * @param array $events
672     */
673    private static function validateJSONSerializable( $events ) {
674        foreach ( $events as $event ) {
675            self::checkEventPart( $event, $event );
676        }
677    }
678
679    private function shouldSendEvent( $eventType ) {
680        return $this->allowedEventTypes & $eventType;
681    }
682
683    /**
684     * Returns a singleton logger instance for all EventBus instances.
685     * Use like: self::logger()->info( $mesage )
686     * We use this so we don't have to check if the logger has been created
687     * before attempting to log a message.
688     * @return LoggerInterface
689     */
690    public static function logger() {
691        if ( !self::$logger ) {
692            self::$logger = LoggerFactory::getInstance( 'EventBus' );
693        }
694        return self::$logger;
695    }
696
697    /**
698     * Returns the EventFactory associated with this instance of EventBus
699     * @return EventFactory|null
700     */
701    public function getFactory() {
702        return $this->eventFactory;
703    }
704
705    /**
706     * @param string|null $eventServiceName
707     *        The name of a key in the EventServices config looked up via
708     *        MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
709     *        The EventService config is keyed by service name, and should at least contain
710     *        a 'url' entry pointing at the event service endpoint events should be
711     *        POSTed to. They can also optionally contain a 'timeout' entry specifying
712     *        the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry that can be
713     *        set to true if the X-Client-IP header from the originating request should be
714     *        forwarded to the event service. Instances are singletons identified by
715     *        $eventServiceName.
716     *
717     *        NOTE: Previously, this function took a $config object instead of an
718     *        event service name.  This is a backwards compatible change, but because
719     *        there are no other users of this extension, we can do this safely.
720     *
721     * @return EventBus
722     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
723     */
724    public static function getInstance( $eventServiceName ) {
725        return MediaWikiServices::getInstance()
726            ->get( 'EventBus.EventBusFactory' )
727            ->getInstance( $eventServiceName );
728    }
729
730    /**
731     * Uses EventStreamConfig.StreamConfigs to look up the
732     * EventBus instance to use for a $stream.
733     * If the stream is disabled, a non-producing EventBus instance will be used.
734     * If none is found, falls back to using wgEventServiceDefault.
735     *
736     * @param string $stream the stream to send an event to
737     * @return EventBus
738     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
739     */
740    public static function getInstanceForStream( $stream ) {
741        return MediaWikiServices::getInstance()
742            ->get( 'EventBus.EventBusFactory' )
743            ->getInstanceForStream( $stream );
744    }
745}