Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
62.23% covered (warning)
62.23%
173 / 278
47.06% covered (danger)
47.06%
8 / 17
CRAP
0.00% covered (danger)
0.00%
0 / 1
EventBus
62.23% covered (warning)
62.23%
173 / 278
47.06% covered (danger)
47.06%
8 / 17
317.14
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
24 / 24
100.00% covered (success)
100.00%
1 / 1
6
 partitionEvents
80.00% covered (warning)
80.00%
16 / 20
0.00% covered (danger)
0.00%
0 / 1
4.13
 incrementMetricByValue
8.33% covered (danger)
8.33%
1 / 12
0.00% covered (danger)
0.00%
0 / 1
24.26
 send
66.23% covered (warning)
66.23%
100 / 151
0.00% covered (danger)
0.00%
0 / 1
40.65
 getStreamNameFromEvent
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 groupEventsByStream
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
2
 serializeEvents
10.71% covered (danger)
10.71%
3 / 28
0.00% covered (danger)
0.00%
0 / 1
22.79
 prepareEventsForLogging
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
4.13
 replaceBinaryValues
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 replaceBinaryValuesRecursive
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 checkEventPart
40.00% covered (danger)
40.00%
4 / 10
0.00% covered (danger)
0.00%
0 / 1
10.40
 validateJSONSerializable
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 shouldSendEvent
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 logger
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getFactory
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getInstance
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getInstanceForStream
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * Event delivery.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 * http://www.gnu.org/copyleft/gpl.html
20 *
21 * @file
22 * @author Eric Evans, Andrew Otto
23 */
24
25namespace MediaWiki\Extension\EventBus;
26
27use Exception;
28use InvalidArgumentException;
29use MediaWiki\Context\RequestContext;
30use MediaWiki\Json\FormatJson;
31use MediaWiki\Logger\LoggerFactory;
32use MediaWiki\MediaWikiServices;
33use Psr\Log\LoggerInterface;
34use RuntimeException;
35use Wikimedia\Assert\Assert;
36use Wikimedia\Http\MultiHttpClient;
37use Wikimedia\Stats\StatsFactory;
38
39class EventBus {
40
41    /**
42     * @const int the special event type indicating no events should be accepted.
43     */
44    public const TYPE_NONE = 0;
45
46    /**
47     * @const int the event type indicating that the event is a regular mediawiki event.
48     */
49    public const TYPE_EVENT = 1;
50
51    /**
52     * @const int the event type indicating that the event is a serialized job.
53     */
54    public const TYPE_JOB = 2;
55
56    /**
57     * @const int the event type indicating that the event is a CDN purge.
58     */
59    public const TYPE_PURGE = 4;
60
61    /**
62     * @const int the event type indicating any event type. (TYPE_EVENT ^ TYPE_EVENT)
63     */
64    public const TYPE_ALL = self::TYPE_EVENT | self::TYPE_JOB | self::TYPE_PURGE;
65
66    /**
67     * @const array names of the event type constants defined above
68     */
69    private const EVENT_TYPE_NAMES = [
70        'TYPE_NONE' => self::TYPE_NONE,
71        'TYPE_EVENT' => self::TYPE_EVENT,
72        'TYPE_JOB' => self::TYPE_JOB,
73        'TYPE_PURGE' => self::TYPE_PURGE,
74        'TYPE_ALL' => self::TYPE_ALL,
75    ];
76
77    /** @const int Default HTTP request timeout in seconds */
78    private const DEFAULT_REQUEST_TIMEOUT = 10;
79
80    /** @const string fallback value to use when a prometheus metrics label value
81     * (e.g. `meta.stream`) is not assigned.
82     */
83    public const VALUE_UNKNOWN = "__value_unknown__";
84
85    /** @var LoggerInterface instance for all EventBus instances */
86    private static $logger;
87
88    /** @var MultiHttpClient */
89    private $http;
90
91    /** @var string EventServiceUrl for this EventBus instance */
92    private $url;
93
94    /** @var int HTTP request timeout for this EventBus instance */
95    private $timeout;
96
97    /** @var int which event types are allowed to be sent (TYPE_NONE|TYPE_EVENT|TYPE_JOB|TYPE_PURGE|TYPE_ALL) */
98    private $allowedEventTypes;
99
100    /** @var EventFactory|null event creator */
101    private $eventFactory;
102
103    /** @var int Maximum byte size of a batch */
104    private $maxBatchByteSize;
105
106    /** @var bool Whether to forward the X-Client-IP header, if present */
107    private $forwardXClientIP;
108
109    /** @var string intake event service name */
110    private string $eventServiceName;
111
112    /** @var ?StatsFactory wf:Stats factory instance */
113    private ?StatsFactory $statsFactory;
114
115    /**
116     * @param MultiHttpClient $http
117     * @param string|int $enableEventBus A value of the wgEnableEventBus config, or a bitmask
118     * of TYPE_* constants
119     * @param EventFactory $eventFactory EventFactory to use for event construction.
120     * @param string $url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events
121     * @param int $maxBatchByteSize Maximum byte size of a batch
122     * @param int|null $timeout HTTP request timeout in seconds, defaults to 5.
123     * @param bool $forwardXClientIP Whether the X-Client-IP header should be forwarded
124     *   to the intake service, if present
125     * @param string $eventServiceName TODO: pass the event service name so that it can be used
126     *     to label metrics. This is a hack put in place while refactoring efforts on this class are
127     *   ongoing. This variable is used to label metrics in send().
128     * @param ?StatsFactory|null $statsFactory wf:Stats factory instance
129     */
130    public function __construct(
131        MultiHttpClient $http,
132        $enableEventBus,
133        EventFactory $eventFactory,
134        string $url,
135        int $maxBatchByteSize,
136        ?int $timeout = null,
137        bool $forwardXClientIP = false,
138        string $eventServiceName = EventBusFactory::EVENT_SERVICE_DISABLED_NAME,
139        ?StatsFactory $statsFactory = null
140    ) {
141        $this->http = $http;
142        $this->url = $url;
143        $this->maxBatchByteSize = $maxBatchByteSize;
144        $this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
145        $this->eventFactory = $eventFactory;
146        $this->forwardXClientIP = $forwardXClientIP;
147        $this->eventServiceName = $eventServiceName;
148        $this->statsFactory = $statsFactory;
149
150        if ( is_int( $enableEventBus ) ) {
151            Assert::precondition(
152                (int)( $enableEventBus & self::TYPE_ALL ) === $enableEventBus,
153                'Invalid $enableEventBus parameter: ' . $enableEventBus
154            );
155            $this->allowedEventTypes = $enableEventBus;
156        } elseif ( is_string( $enableEventBus ) && $enableEventBus ) {
157            $this->allowedEventTypes = self::TYPE_NONE;
158            $allowedTypes = explode( '|', $enableEventBus );
159            foreach ( $allowedTypes as $allowedType ) {
160                Assert::precondition(
161                    array_key_exists( $allowedType, self::EVENT_TYPE_NAMES ),
162                    "EnableEventBus: $allowedType not recognized"
163                );
164                $this->allowedEventTypes |= self::EVENT_TYPE_NAMES[$allowedType];
165            }
166        } else {
167            $this->allowedEventTypes = self::TYPE_ALL;
168        }
169    }
170
171    /**
172     * @param array $events
173     * @param int $serializedSize
174     * @return array
175     */
176    private function partitionEvents( array $events, int $serializedSize ): array {
177        $results = [];
178
179        if ( count( $events ) > 1 ) {
180            $numOfChunks = ceil( $serializedSize / $this->maxBatchByteSize );
181            $partitions = array_chunk( $events, (int)floor( count( $events ) / $numOfChunks ) );
182            foreach ( $partitions as $partition ) {
183                $serializedPartition = self::serializeEvents( $partition );
184                if ( strlen( $serializedPartition ) > $this->maxBatchByteSize ) {
185                    $results = array_merge(
186                        $results,
187                        $this->partitionEvents( $partition, strlen( $serializedPartition ) )
188                    );
189                } else {
190                    $results[] = $serializedPartition;
191                }
192            }
193        } else {
194            self::logger()->warning(
195                "Event is larger than the maxBatchByteSize set.",
196                [
197                    'raw_event' => self::prepareEventsForLogging( $events )
198                ]
199            );
200            $results = [ self::serializeEvents( $events ) ];
201        }
202        return $results;
203    }
204
205    /**
206     * @param string $metricName
207     * @param int $value
208     * @param mixed ...$labels passed as $key => $value pairs
209     * @return void
210     */
211    private function incrementMetricByValue( string $metricName, int $value, ...$labels ): void {
212        if ( $this->statsFactory !== null ) {
213            $metric = $this->statsFactory->getCounter( $metricName );
214            foreach ( $labels as $label ) {
215                foreach ( $label as $k => $v ) {
216                    // Bug: T373086
217                    if ( $v === null ) {
218                        $v = self::VALUE_UNKNOWN;
219                        self::logger()->warning(
220                            ' Initialized metric label does not have an assigned value. ',
221                            [ "metric_label" => $k ]
222                        );
223                    }
224                    $metric->setLabel( $k, $v );
225                }
226            }
227            // Under the hood, Counter::incrementBy will update an integer
228            // valued counter, regardless of `$value` type.
229            $metric->incrementBy( $value );
230        }
231    }
232
233    /**
234     * Deliver an array of events to the remote event intake service.
235     *
236     * When emitting events, consider using {@link EventBusSendUpdate} instead of manually wrapping this
237     * method in a callable deferred update, to take advantage of automatic batching for events destined
238     * for the same underlying event service.
239     *
240     * Statslib metrics emitted by this method:
241     *
242     * - events_outgoing_total
243     * - events_outgoing_by_stream_total
244     * - events_accepted_total
245     * - events_failed_total
246     * - events_failed_by_stream_total
247     * - event_service_response_total
248     * - event_batch_not_enqueable_total
249     * - event_batch_is_string_total
250     * - event_batch_not_serializable_total
251     * - event_batch_partitioned_total
252     *     (incremented if $events had to be paratitioned and sent in multiple POST requests)
253     *
254     * @param array|string $events the events to send.
255     * @param int $type the type of the event being sent.
256     * @return array|bool|string True on success or an error string or array on failure
257     * @throws Exception
258     */
259    public function send( $events, $type = self::TYPE_EVENT ) {
260        // Label metrics by event type name. If the lookup fails,
261        // fall back to labeling with the $type id parameter. Unknown or invalid ids
262        // will be reported by the `events_are_not_enqueable` metric, which
263        // fires when an event type does not belong to this EventBus instance allow list.
264        // It should not be possible to supply a $type that does not belong
265        // to EVENT_TYPE_NAME. Falling back to a numerical id is just a guard
266        // to help us spot eventual bugs.
267        $eventType = array_search( $type, self::EVENT_TYPE_NAMES );
268        $eventType = ( $eventType === false ) ? $type : $eventType;
269        // Label metrics with the EventBus declared default service host.
270        // In the future, for streams that declare one, use the one provided by EventStreamConfig instead.
271        $baseMetricLabels = [
272            "function_name" => "send",
273            "event_type" => $eventType,
274            "event_service_name" => $this->eventServiceName,
275            "event_service_uri" => $this->url
276        ];
277
278        if ( !$this->shouldSendEvent( $type ) ) {
279            // Debug metric. How often is the `$events` param not enqueable?
280            $this->incrementMetricByValue(
281                "event_batch_not_enqueable_total",
282                1,
283                $baseMetricLabels
284            );
285            return "Events of type '$type' are not enqueueable";
286        }
287        if ( !$events ) {
288            // Logstash doesn't like the args, because they could be of various types
289            $context = [ 'exception' => new RuntimeException() ];
290            self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context );
291            return "Provided event list is empty";
292        }
293
294        // Historically, passing a JSON string has been supported, but we'd like to deprecate this feature.
295        // This was done to avoid having extra encode+decode steps if the caller already has a JSON string.
296        // But, we end up decoding always anyway, to properly increment metrics.
297        if ( is_string( $events ) ) {
298            // TODO: is $events ever passed as a string? We should refactor this block and simplify the union type.
299            // Debug metric. How often is the `$events` param a string?
300            $this->incrementMetricByValue(
301                "event_batch_is_string_total",
302                1,
303                $baseMetricLabels
304            );
305
306            $decodedEvents = FormatJson::decode( $events, true );
307
308            if ( $decodedEvents === null ) {
309                $context = [
310                    'exception' => new RuntimeException(),
311                    'raw_events' => self::prepareEventsForLogging( $events )
312                ];
313                self::logger()->error( 'Failed decoding events from JSON string.', $context );
314                return "Failed decoding events from JSON string";
315            }
316
317            $events = $decodedEvents;
318        }
319
320        // Code below expects that $events is a numeric array of event assoc arrays.
321        if ( !array_key_exists( 0, $events ) ) {
322            $events = [ $events ];
323        }
324        $outgoingEventsCount = count( $events );
325
326        // Increment events_outgoing_total
327        // NOTE: We could just use events_outgoing_by_stream_total and sum,
328        //       but below we want to emit an events_accepted_total.
329        //       In the case of a 207 partial success, we don't know
330        //       the stream names of the successful events
331        //       (without diffing the response from the event service and $events).
332        //       For consistency, we both events_outgoing_total and events_outgoing_by_stream_total.
333        $this->incrementMetricByValue(
334            "events_outgoing_total",
335            $outgoingEventsCount,
336            $baseMetricLabels,
337        );
338
339        // Increment events_outgoing_by_stream_total for each stream
340        $eventsByStream = self::groupEventsByStream( $events );
341        foreach ( $eventsByStream as $streamName => $eventsForStreamName ) {
342            $this->incrementMetricByValue(
343                "events_outgoing_by_stream_total",
344                count( $eventsForStreamName ),
345                $baseMetricLabels,
346                [ "stream_name" => $streamName ]
347            );
348        }
349
350        // validateJSONSerializable only logs if any part of the event is not serializable.
351        // It does not return anything or raise any exceptions.
352        self::validateJSONSerializable( $events );
353        // Serialize the array of events to a JSON string.
354        $serializedEvents = self::serializeEvents( $events );
355        if ( !$serializedEvents ) {
356            $this->incrementMetricByValue(
357                "event_batch_not_serializable_total",
358                1,
359                $baseMetricLabels
360            );
361            // serializeEvents has already logged, so we can just return.
362            return "Unable to serialize events";
363        }
364
365        // If the body would be too big, partition it into multiple bodies.
366        if ( strlen( $serializedEvents ) > $this->maxBatchByteSize ) {
367            $postBodies = $this->partitionEvents( $events, strlen( $serializedEvents ) );
368            // Measure the number of times we partition events into more than one batch.
369            $this->incrementMetricByValue(
370                "event_batch_partitioned_total",
371                1,
372                $baseMetricLabels
373            );
374        } else {
375            $postBodies = [ $serializedEvents ];
376        }
377
378        // Most of the time $postBodies will be a single element array, and we
379        // will only need to send one POST request.
380        // When the size is too large, $events will have been partitioned into
381        // multiple $postBodies, for which each will be sent as its own POST request.
382        $originalRequest = RequestContext::getMain()->getRequest();
383        $requests = array_map(
384            function ( $postBody ) use ( $originalRequest ) {
385                $req = [
386                    'url' => $this->url,
387                    'method' => 'POST',
388                    'body' => $postBody,
389                    'headers' => [ 'content-type' => 'application/json' ]
390                ];
391                if ( $this->forwardXClientIP ) {
392                    $req['headers']['x-client-ip'] = $originalRequest->getIP();
393                }
394                return $req;
395            },
396            $postBodies
397        );
398
399        // Do the POST requests.
400        $responses = $this->http->runMulti(
401            $requests,
402            [
403                'reqTimeout' => $this->timeout
404            ]
405        );
406
407        // Keep track of the total number of failed events.
408        // This will be used to calculate events_accepted_count later.
409        $failedEventsCountTotal = 0;
410
411        // 201: all events accepted.
412        // 202: all events accepted but not necessarily persisted. HTTP response is returned 'hastily'.
413        // 207: some but not all events accepted: either due to validation failure or error.
414        // 400: no events accepted: all failed schema validation.
415        // 500: no events accepted: at least one caused an error, but some might have been invalid.
416        $results = [];
417        foreach ( $responses as $i => $response ) {
418            $res = $response['response'];
419            $code = $res['code'];
420
421            $this->incrementMetricByValue(
422                "event_service_response_total",
423                1,
424                $baseMetricLabels,
425                [ "status_code" => $code ]
426            );
427
428            if ( $code == 207 || $code >= 300 ) {
429                $message = empty( $res['error'] ) ?
430                    (string)$code . ': ' . (string)$res['reason'] : $res['error'];
431                // Limit the maximum size of the logged context to 8 kilobytes as that's where logstash
432                // truncates the JSON anyway.
433                $context = [
434                    // $responses[$i] corresponds with $postBodies[$i]
435                    'raw_events' => self::prepareEventsForLogging( $postBodies[$i] ),
436                    'service_response' => $res,
437                    'exception' => new RuntimeException(),
438                ];
439                self::logger()->error( "Unable to deliver all events: {$message}", $context );
440
441                if ( isset( $res['body'] ) ) {
442                    // We expect the event service to return an array of objects
443                    // in the response body.
444                    // FormatJson::decode will return `null` if the message failed to parse.
445                    // If anything other than an array is parsed we treat it as unexpected
446                    // behaviour, and log the response at error severity.
447                    // See https://phabricator.wikimedia.org/T370428
448
449                    // $failureInfosByKind should look like:
450                    // {
451                    //       "<failure_kind">: [
452                    //         { ..., "event": {<failed event here>}, "context": {<failure context here>},
453                    //        { ... }
454                    //    ],
455                    // }
456                    $failureInfosByKind = FormatJson::decode( $res['body'], true );
457                    if ( is_array( $failureInfosByKind ) ) {
458                        foreach ( $failureInfosByKind as $failureKind => $failureInfos ) {
459                            // $failureInfos should not be null or empty.
460                            // This is just a guard against what the intake
461                            // service returns (or the behavior of different json parsing methods - possibly).
462                            // https://www.mediawiki.org/wiki/Manual:Coding_conventions/PHP#empty()
463                            if ( $failureInfos === null || $failureInfos === [] ) {
464                                continue;
465                            }
466
467                            // Get the events that failed from the response.
468                            $failedEvents = array_map(
469                                static function ( $failureStatus ) {
470                                    return $failureStatus['event'] ?? null;
471                                },
472                                $failureInfos
473                            );
474
475                            $failedEventsCount = count( $failedEvents );
476                            $failedEventsCountTotal += $failedEventsCount;
477
478                            // increment events_failed_total
479                            $this->incrementMetricByValue(
480                                "events_failed_total",
481                                $failedEventsCount,
482                                $baseMetricLabels,
483                                [
484                                    "failure_kind" => $failureKind,
485                                    "status_code" => $code,
486                                ]
487                            );
488
489                            // Group failed events by stream and increment events_failed_by_stream_total.
490                            $failedEventsByStream = self::groupEventsByStream( $failedEvents );
491                            foreach ( $failedEventsByStream as $streamName => $failedEventsForStream ) {
492                                $this->incrementMetricByValue(
493                                    "events_failed_by_stream_total",
494                                    count( $failedEventsForStream ),
495                                    $baseMetricLabels,
496                                    [
497                                        "failure_kind" => $failureKind,
498                                        "status_code" => $code,
499                                        "stream_name" => $streamName,
500                                    ]
501                                );
502                            }
503                        }
504                    } else {
505                        self::logger()->error( "Invalid event service response body", $context );
506                    }
507                }
508                $results[] = "Unable to deliver all events: $message";
509            }
510        }
511
512        // increment events_accepted_total as the difference between
513        // $outgoingEventsCount and $failedEventsCountTotal (if there were any failed events).
514        $this->incrementMetricByValue(
515            "events_accepted_total",
516            $outgoingEventsCount - $failedEventsCountTotal,
517            $baseMetricLabels,
518        );
519
520        return $results ?: true;
521    }
522
523    // == static helper functions below ==
524
525    /**
526     * Given an event assoc array, extracts the stream name from meta.stream,
527     * or returns STREAM_NAME_UNKNOWN
528     * @param array|null $event
529     * @return mixed|string
530     */
531    public static function getStreamNameFromEvent( ?array $event ) {
532        return is_array( $event ) && isset( $event['meta']['stream'] ) ?
533            $event['meta']['stream'] :
534            self::VALUE_UNKNOWN;
535    }
536
537    /**
538     * Given an assoc array of events, this returns them grouped by stream name.
539     * @param array $events
540     * @return array
541     */
542    public static function groupEventsByStream( array $events ): array {
543        $groupedEvents = [];
544        foreach ( $events as $event ) {
545            $streamName = self::getStreamNameFromEvent( $event );
546            $groupedEvents[$streamName] ??= [];
547            $groupedEvents[$streamName][] = $event;
548        }
549        return $groupedEvents;
550    }
551
552    /**
553     * Serializes $events array to a JSON string.  If FormatJson::encode()
554     * returns false, this will log a detailed error message and return null.
555     *
556     * @param array $events
557     * @return string|null JSON or null on failure
558     */
559    public static function serializeEvents( $events ) {
560        try {
561            $serializedEvents = FormatJson::encode( $events, false, FormatJson::ALL_OK );
562            if ( !$serializedEvents ) {
563                // Something failed. Let's figure out exactly which one.
564                $bad = [];
565                foreach ( $events as $event ) {
566                    $result = FormatJson::encode( $event, false, FormatJson::ALL_OK );
567                    if ( !$result ) {
568                        $bad[] = $event;
569                    }
570                }
571                $context = [
572                    'exception' => new RuntimeException(),
573                    'json_last_error' => json_last_error_msg(),
574                    // Use PHP serialization since that will *always* work.
575                    'events' => serialize( $bad ),
576                ];
577                self::logger()->error(
578                    'FormatJson::encode($events) failed: ' . $context['json_last_error'] .
579                    '. Aborting send.', $context
580                );
581                return null;
582            }
583            return $serializedEvents;
584        } catch ( Exception $exception ) {
585            $context = [
586                'exception' => $exception,
587                'json_last_error' => json_last_error_msg()
588            ];
589            self::logger()->error(
590                'FormatJson::encode($events) thrown exception: ' . $context['json_last_error'] .
591                '. Aborting send.', $context
592            );
593            return null;
594        }
595    }
596
597    /**
598     * Prepares events for logging - serializes if needed, limits the size
599     * of the serialized event to 8kb.
600     *
601     * @param string|array $events
602     * @return string|null
603     */
604    private static function prepareEventsForLogging( $events ) {
605        if ( is_array( $events ) ) {
606            $events = self::serializeEvents( $events );
607        }
608
609        if ( $events === null ) {
610            return null;
611        }
612
613        return strlen( $events ) > 8192 ? substr( $events, 0, 8192 ) : $events;
614    }
615
616    /**
617     * If $value is a string, but not UTF-8 encoded, then assume it is binary
618     * and base64 encode it and prefix it with a content type.
619     * @param mixed $value
620     * @return mixed
621     */
622    public static function replaceBinaryValues( $value ) {
623        if ( is_string( $value ) && !mb_check_encoding( $value, 'UTF-8' ) ) {
624            return 'data:application/octet-stream;base64,' . base64_encode( $value );
625        }
626        return $value;
627    }
628
629    /**
630     * Recursively calls replaceBinaryValues on an array and transforms
631     * any binary values.  $array is passed by reference and will be modified.
632     * @param array &$array
633     * @return bool return value of array_walk_recursive
634     */
635    public static function replaceBinaryValuesRecursive( &$array ) {
636        return array_walk_recursive( $array, function ( &$item, $key ) {
637            $item = self::replaceBinaryValues( $item );
638        } );
639    }
640
641    /**
642     * Checks a part of the event for JSON-serializability
643     *
644     * @param array $originalEvent an original event that is being checked.
645     * @param array $eventPart the sub-object nested in the original event to be checked.
646     */
647    private static function checkEventPart( $originalEvent, $eventPart ) {
648        foreach ( $eventPart as $p => $v ) {
649            if ( is_array( $v ) ) {
650                self::checkEventPart( $originalEvent, $v );
651            } elseif ( !is_scalar( $v ) && $v !== null ) {
652                // Only log the first appearance of non-scalar property per event as jobs
653                // might contain hundreds of properties and we do not want to log everything.
654                self::logger()->error( 'Non-scalar value found in the event', [
655                    'raw_events' => self::prepareEventsForLogging( [ $originalEvent ] ),
656                    'prop_name' => $p,
657                    'prop_val_type' => get_class( $v )
658                ] );
659                // Follow to the next event in the array
660                return;
661            }
662        }
663    }
664
665    /**
666     * Checks if the event is JSON-serializable (contains only scalar values)
667     * and logs the event if non-scalar found.
668     *
669     * @param array $events
670     */
671    private static function validateJSONSerializable( $events ) {
672        foreach ( $events as $event ) {
673            self::checkEventPart( $event, $event );
674        }
675    }
676
677    /**
678     * @param int $eventType
679     * @return int
680     */
681    private function shouldSendEvent( $eventType ) {
682        return $this->allowedEventTypes & $eventType;
683    }
684
685    /**
686     * Returns a singleton logger instance for all EventBus instances.
687     * Use like: self::logger()->info( $mesage )
688     * We use this so we don't have to check if the logger has been created
689     * before attempting to log a message.
690     * @return LoggerInterface
691     */
692    public static function logger() {
693        if ( !self::$logger ) {
694            self::$logger = LoggerFactory::getInstance( 'EventBus' );
695        }
696        return self::$logger;
697    }
698
699    /**
700     * Returns the EventFactory associated with this instance of EventBus
701     * @return EventFactory|null
702     */
703    public function getFactory() {
704        return $this->eventFactory;
705    }
706
707    /**
708     * @param string|null $eventServiceName
709     *        The name of a key in the EventServices config looked up via
710     *        MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
711     *        The EventService config is keyed by service name, and should at least contain
712     *        a 'url' entry pointing at the event service endpoint events should be
713     *        POSTed to. They can also optionally contain a 'timeout' entry specifying
714     *        the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry that can be
715     *        set to true if the X-Client-IP header from the originating request should be
716     *        forwarded to the event service. Instances are singletons identified by
717     *        $eventServiceName.
718     *
719     *        NOTE: Previously, this function took a $config object instead of an
720     *        event service name.  This is a backwards compatible change, but because
721     *        there are no other users of this extension, we can do this safely.
722     *
723     * @return EventBus
724     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
725     */
726    public static function getInstance( $eventServiceName ) {
727        return MediaWikiServices::getInstance()
728            ->get( 'EventBus.EventBusFactory' )
729            ->getInstance( $eventServiceName );
730    }
731
732    /**
733     * Uses EventStreamConfig.StreamConfigs to look up the
734     * EventBus instance to use for a $stream.
735     * If the stream is disabled, a non-producing EventBus instance will be used.
736     * If none is found, falls back to using wgEventServiceDefault.
737     *
738     * @param string $stream the stream to send an event to
739     * @return EventBus
740     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
741     */
742    public static function getInstanceForStream( $stream ) {
743        return MediaWikiServices::getInstance()
744            ->get( 'EventBus.EventBusFactory' )
745            ->getInstanceForStream( $stream );
746    }
747}