Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
70.97% covered (warning)
70.97%
110 / 155
50.00% covered (danger)
50.00%
7 / 14
CRAP
0.00% covered (danger)
0.00%
0 / 1
EventBus
70.97% covered (warning)
70.97%
110 / 155
50.00% covered (danger)
50.00%
7 / 14
114.65
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
22 / 22
100.00% covered (success)
100.00%
1 / 1
6
 partitionEvents
100.00% covered (success)
100.00%
20 / 20
100.00% covered (success)
100.00%
1 / 1
4
 send
88.24% covered (warning)
88.24%
45 / 51
0.00% covered (danger)
0.00%
0 / 1
15.37
 serializeEvents
10.71% covered (danger)
10.71%
3 / 28
0.00% covered (danger)
0.00%
0 / 1
22.79
 prepareEventsForLogging
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
4.13
 replaceBinaryValues
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 replaceBinaryValuesRecursive
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 checkEventPart
40.00% covered (danger)
40.00%
4 / 10
0.00% covered (danger)
0.00%
0 / 1
10.40
 validateJSONSerializable
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 shouldSendEvent
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 logger
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 getFactory
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getInstance
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getInstanceForStream
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * Event delivery.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 * http://www.gnu.org/copyleft/gpl.html
20 *
21 * @file
22 * @author Eric Evans, Andrew Otto
23 */
24
25namespace MediaWiki\Extension\EventBus;
26
27use Exception;
28use FormatJson;
29use InvalidArgumentException;
30use MediaWiki\Logger\LoggerFactory;
31use MediaWiki\MediaWikiServices;
32use MultiHttpClient;
33use Psr\Log\LoggerInterface;
34use RuntimeException;
35use Wikimedia\Assert\Assert;
36
37class EventBus {
38
39    /**
40     * @const int the special event type indicating no events should be accepted.
41     */
42    public const TYPE_NONE = 0;
43
44    /**
45     * @const int the event type indicating that the event is a regular mediawiki event.
46     */
47    public const TYPE_EVENT = 1;
48
49    /**
50     * @const int the event type indicating that the event is a serialized job.
51     */
52    public const TYPE_JOB = 2;
53
54    /**
55     * @const int the event type indicating that the event is a CDN purge.
56     */
57    public const TYPE_PURGE = 4;
58
59    /**
60     * @const int the event type indicating any event type. (TYPE_EVENT ^ TYPE_EVENT)
61     */
62    public const TYPE_ALL = self::TYPE_EVENT | self::TYPE_JOB | self::TYPE_PURGE;
63
64    /**
65     * @const array names of the event type constants defined above
66     */
67    private const EVENT_TYPE_NAMES = [
68        'TYPE_NONE' => self::TYPE_NONE,
69        'TYPE_EVENT' => self::TYPE_EVENT,
70        'TYPE_JOB' => self::TYPE_JOB,
71        'TYPE_PURGE' => self::TYPE_PURGE,
72        'TYPE_ALL' => self::TYPE_ALL,
73    ];
74
75    /** @const int Default HTTP request timeout in seconds */
76    private const DEFAULT_REQUEST_TIMEOUT = 10;
77
78    /** @var LoggerInterface instance for all EventBus instances */
79    private static $logger;
80
81    /** @var MultiHttpClient */
82    private $http;
83
84    /** @var string EventServiceUrl for this EventBus instance */
85    private $url;
86
87    /** @var int HTTP request timeout for this EventBus instance */
88    private $timeout;
89
90    /** @var int which event types are allowed to be sent (TYPE_NONE|TYPE_EVENT|TYPE_JOB|TYPE_PURGE|TYPE_ALL) */
91    private $allowedEventTypes;
92
93    /** @var EventFactory|null event creator */
94    private $eventFactory;
95
96    /** @var int Maximum byte size of a batch */
97    private $maxBatchByteSize;
98
99    /** @var bool Whether to forward the X-Client-IP header, if present */
100    private $forwardXClientIP;
101
102    /**
103     * @param MultiHttpClient $http
104     * @param string|int $enableEventBus A value of the wgEnableEventBus config, or a bitmask
105     * of TYPE_* constants
106     * @param EventFactory $eventFactory EventFactory to use for event construction.
107     * @param string $url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events
108     * @param int $maxBatchByteSize Maximum byte size of a batch
109     * @param int|null $timeout HTTP request timeout in seconds, defaults to 5.
110     * @param bool $forwardXClientIP Whether the X-Client-IP header should be forwarded
111     *   to the intake service, if present
112     */
113    public function __construct(
114        MultiHttpClient $http,
115        $enableEventBus,
116        EventFactory $eventFactory,
117        string $url,
118        int $maxBatchByteSize,
119        int $timeout = null,
120        bool $forwardXClientIP = false
121    ) {
122        $this->http = $http;
123        $this->url = $url;
124        $this->maxBatchByteSize = $maxBatchByteSize;
125        $this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
126        $this->eventFactory = $eventFactory;
127        $this->forwardXClientIP = $forwardXClientIP;
128
129        if ( is_int( $enableEventBus ) ) {
130            Assert::precondition(
131                (int)( $enableEventBus & self::TYPE_ALL ) === $enableEventBus,
132                'Invalid $enableEventBus parameter: ' . $enableEventBus
133            );
134            $this->allowedEventTypes = $enableEventBus;
135        } elseif ( is_string( $enableEventBus ) && $enableEventBus ) {
136            $this->allowedEventTypes = self::TYPE_NONE;
137            $allowedTypes = explode( '|', $enableEventBus );
138            foreach ( $allowedTypes as $allowedType ) {
139                Assert::precondition(
140                    array_key_exists( $allowedType, self::EVENT_TYPE_NAMES ),
141                    "EnableEventBus: $allowedType not recognized"
142                );
143                $this->allowedEventTypes |= self::EVENT_TYPE_NAMES[$allowedType];
144            }
145        } else {
146            $this->allowedEventTypes = self::TYPE_ALL;
147        }
148    }
149
150    /**
151     * @param array $events
152     * @param int $serializedSize
153     * @return array
154     */
155    private function partitionEvents( array $events, int $serializedSize ): array {
156        $results = [];
157
158        if ( count( $events ) > 1 ) {
159            $numOfChunks = ceil( $serializedSize / $this->maxBatchByteSize );
160            $partitions = array_chunk( $events, (int)floor( count( $events ) / $numOfChunks ) );
161            foreach ( $partitions as $partition ) {
162                $serializedPartition = self::serializeEvents( $partition );
163                if ( strlen( $serializedPartition ) > $this->maxBatchByteSize ) {
164                    $results = array_merge(
165                        $results,
166                        $this->partitionEvents( $partition, strlen( $serializedPartition ) )
167                    );
168                } else {
169                    $results[] = $serializedPartition;
170                }
171            }
172        } else {
173            self::logger()->warning(
174                "Event is larger than the maxBatchByteSize set.",
175                [
176                    'raw_event' => self::prepareEventsForLogging( $events )
177                ]
178            );
179            $results = [ self::serializeEvents( $events ) ];
180        }
181        return $results;
182    }
183
184    /**
185     * Deliver an array of events to the remote service.
186     *
187     * @param array|string $events the events to send.
188     * @param int $type the type of the event being sent.
189     * @return array|bool|string True on success or an error string or array on failure
190     * @throws Exception
191     */
192    public function send( $events, $type = self::TYPE_EVENT ) {
193        if ( !$this->shouldSendEvent( $type ) ) {
194            return "Events of type '$type' are not enqueueable";
195        }
196        if ( !$events ) {
197            // Logstash doesn't like the args, because they could be of various types
198            $context = [ 'exception' => new RuntimeException() ];
199            self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context );
200            return "Provided event list is empty";
201        }
202
203        // If we already have a JSON string of events, just use it as the body.
204        if ( is_string( $events ) ) {
205            if ( strlen( $events ) > $this->maxBatchByteSize ) {
206                $decodeEvents = FormatJson::decode( $events, true );
207                $body = $this->partitionEvents( $decodeEvents, strlen( $events ) );
208            } else {
209                $body = $events;
210            }
211        } else {
212            self::validateJSONSerializable( $events );
213            // Else serialize the array of events to a JSON string.
214            $serializedEvents = self::serializeEvents( $events );
215            // If not $body, then something when wrong.
216            // serializeEvents has already logged, so we can just return.
217            if ( !$serializedEvents ) {
218                return "Unable to serialize events";
219            }
220
221            if ( strlen( $serializedEvents ) > $this->maxBatchByteSize ) {
222                $body = $this->partitionEvents( $events, strlen( $serializedEvents ) );
223            } else {
224                $body = $serializedEvents;
225            }
226        }
227
228        $reqs = array_map( function ( $body ) {
229            $req = [
230                'url'        => $this->url,
231                'method'    => 'POST',
232                'body'        => $body,
233                'headers'    => [ 'content-type' => 'application/json' ]
234            ];
235            if ( $this->forwardXClientIP && isset( $_SERVER['HTTP_X_CLIENT_IP'] ) ) {
236                $req['headers']['x-client-ip'] = $_SERVER['HTTP_X_CLIENT_IP'];
237            }
238            return $req;
239        }, is_array( $body ) ? $body : [ $body ] );
240
241        $responses = $this->http->runMulti(
242            $reqs,
243            [
244                'reqTimeout' => $this->timeout
245            ]
246        );
247
248        // 201: all events accepted.
249        // 202: all events accepted but not necessarily persisted. HTTP response is returned 'hastily'.
250        // 207: some but not all events accepted: either due to validation failure or error.
251        // 400: no events accepted: all failed schema validation.
252        // 500: no events accepted: at least one caused an error, but some might have been invalid.
253        $results = [];
254        foreach ( $responses as $response ) {
255            $res = $response['response'];
256            if ( $res['code'] == 207 || $res['code'] >= 300 ) {
257                $message = empty( $res['error'] ) ?
258                    (string)$res['code'] . ': ' . (string)$res['reason'] : $res['error'];
259                // Limit the maximum size of the logged context to 8 kilobytes as that's where logstash
260                // truncates the JSON anyway
261                $context = [
262                    'raw_events' => self::prepareEventsForLogging( $body ),
263                    'service_response' => $res,
264                    'exception' => new RuntimeException(),
265                ];
266                self::logger()->error( "Unable to deliver all events: {$message}", $context );
267
268                $results[] = "Unable to deliver all events: $message";
269            }
270        }
271
272        if ( $results !== [] ) {
273            return $results;
274        }
275
276        return true;
277    }
278
279    // == static helper functions below ==
280
281    /**
282     * Serializes $events array to a JSON string.  If FormatJson::encode()
283     * returns false, this will log a detailed error message and return null.
284     *
285     * @param array $events
286     * @return string|null JSON or null on failure
287     */
288    public static function serializeEvents( $events ) {
289        try {
290            $serializedEvents = FormatJson::encode( $events, false, FormatJson::ALL_OK );
291            if ( !$serializedEvents ) {
292                // Something failed. Let's figure out exactly which one.
293                $bad = [];
294                foreach ( $events as $event ) {
295                    $result = FormatJson::encode( $event, false, FormatJson::ALL_OK );
296                    if ( !$result ) {
297                        $bad[] = $event;
298                    }
299                }
300                $context = [
301                    'exception' => new RuntimeException(),
302                    'json_last_error' => json_last_error_msg(),
303                    // Use PHP serialization since that will *always* work.
304                    'events' => serialize( $bad ),
305                ];
306                self::logger()->error(
307                    'FormatJson::encode($events) failed: ' . $context['json_last_error'] .
308                    '. Aborting send.', $context
309                );
310                return null;
311            }
312            return $serializedEvents;
313        } catch ( Exception $exception ) {
314            $context = [
315                'exception' => $exception,
316                'json_last_error' => json_last_error_msg()
317            ];
318            self::logger()->error(
319                'FormatJson::encode($events) thrown exception: ' . $context['json_last_error'] .
320                '. Aborting send.', $context
321            );
322            return null;
323        }
324    }
325
326    /**
327     * Prepares events for logging - serializes if needed, limits the size
328     * of the serialized event to 8kb.
329     *
330     * @param string|array $events
331     * @return string|null
332     */
333    private static function prepareEventsForLogging( $events ) {
334        if ( is_array( $events ) ) {
335            $events = self::serializeEvents( $events );
336        }
337
338        if ( $events === null ) {
339            return null;
340        }
341
342        return strlen( $events ) > 8192 ? substr( $events, 0, 8192 ) : $events;
343    }
344
345    /**
346     * If $value is a string, but not UTF-8 encoded, then assume it is binary
347     * and base64 encode it and prefix it with a content type.
348     * @param mixed $value
349     * @return mixed
350     */
351    public static function replaceBinaryValues( $value ) {
352        if ( is_string( $value ) && !mb_check_encoding( $value, 'UTF-8' ) ) {
353            return 'data:application/octet-stream;base64,' . base64_encode( $value );
354        }
355        return $value;
356    }
357
358    /**
359     * Recursively calls replaceBinaryValues on an array and transforms
360     * any binary values.  $array is passed by reference and will be modified.
361     * @param array &$array
362     * @return bool return value of array_walk_recursive
363     */
364    public static function replaceBinaryValuesRecursive( &$array ) {
365        return array_walk_recursive( $array, function ( &$item, $key ) {
366            $item = self::replaceBinaryValues( $item );
367        } );
368    }
369
370    /**
371     * Checks a part of the event for JSON-serializability
372     *
373     * @param array $originalEvent an original event that is being checked.
374     * @param array $eventPart the sub-object nested in the original event to be checked.
375     */
376    private static function checkEventPart( $originalEvent, $eventPart ) {
377        foreach ( $eventPart as $p => $v ) {
378            if ( is_array( $v ) ) {
379                self::checkEventPart( $originalEvent, $v );
380            } elseif ( !is_scalar( $v ) && $v !== null ) {
381                // Only log the first appearance of non-scalar property per event as jobs
382                // might contain hundreds of properties and we do not want to log everything.
383                self::logger()->error( 'Non-scalar value found in the event', [
384                    'raw_events' => self::prepareEventsForLogging( [ $originalEvent ] ),
385                    'prop_name' => $p,
386                    'prop_val_type' => get_class( $v )
387                ] );
388                // Follow to the next event in the array
389                return;
390            }
391        }
392    }
393
394    /**
395     * Checks if the event is JSON-serializable (contains only scalar values)
396     * and logs the event if non-scalar found.
397     *
398     * @param array $events
399     */
400    private static function validateJSONSerializable( $events ) {
401        foreach ( $events as $event ) {
402            self::checkEventPart( $event, $event );
403        }
404    }
405
406    private function shouldSendEvent( $eventType ) {
407        return $this->allowedEventTypes & $eventType;
408    }
409
410    /**
411     * Returns a singleton logger instance for all EventBus instances.
412     * Use like: self::logger()->info( $mesage )
413     * We use this so we don't have to check if the logger has been created
414     * before attempting to log a message.
415     * @return LoggerInterface
416     */
417    public static function logger() {
418        if ( !self::$logger ) {
419            self::$logger = LoggerFactory::getInstance( 'EventBus' );
420        }
421        return self::$logger;
422    }
423
424    /**
425     * Returns the EventFactory associated with this instance of EventBus
426     * @return EventFactory|null
427     */
428    public function getFactory() {
429        return $this->eventFactory;
430    }
431
432    /**
433     * @param string|null $eventServiceName
434     *         The name of a key in the EventServices config looked up via
435     *         MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
436     *         The EventService config is keyed by service name, and should at least contain
437     *         a 'url' entry pointing at the event service endpoint events should be
438     *         POSTed to. They can also optionally contain a 'timeout' entry specifying
439     *         the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry that can be
440     *         set to true if the X-Client-IP header from the originating request should be
441     *         forwarded to the event service. Instances are singletons identified by
442     *         $eventServiceName.
443     *
444     *         NOTE: Previously, this function took a $config object instead of an
445     *         event service name.  This is a backwards compatible change, but because
446     *         there are no other users of this extension, we can do this safely.
447     *
448     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
449     * @return EventBus
450     */
451    public static function getInstance( $eventServiceName ) {
452        return MediaWikiServices::getInstance()
453            ->get( 'EventBus.EventBusFactory' )
454            ->getInstance( $eventServiceName );
455    }
456
457    /**
458     * Uses EventStreamConfig.StreamConfigs to look up the
459     * EventBus instance to use for a $stream.
460     * If the stream is disabled, a non-producing EventBus instance will be used.
461     * If none is found, falls back to using wgEventServiceDefault.
462     *
463     * @param string $stream the stream to send an event to
464     * @return EventBus
465     * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
466     */
467    public static function getInstanceForStream( $stream ) {
468        return MediaWikiServices::getInstance()
469            ->get( 'EventBus.EventBusFactory' )
470            ->getInstanceForStream( $stream );
471    }
472}