Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
70.97% |
110 / 155 |
|
50.00% |
7 / 14 |
CRAP | |
0.00% |
0 / 1 |
EventBus | |
70.97% |
110 / 155 |
|
50.00% |
7 / 14 |
114.65 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
22 / 22 |
|
100.00% |
1 / 1 |
6 | |||
partitionEvents | |
100.00% |
20 / 20 |
|
100.00% |
1 / 1 |
4 | |||
send | |
88.24% |
45 / 51 |
|
0.00% |
0 / 1 |
15.37 | |||
serializeEvents | |
10.71% |
3 / 28 |
|
0.00% |
0 / 1 |
22.79 | |||
prepareEventsForLogging | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
4.13 | |||
replaceBinaryValues | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
3 | |||
replaceBinaryValuesRecursive | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
checkEventPart | |
40.00% |
4 / 10 |
|
0.00% |
0 / 1 |
10.40 | |||
validateJSONSerializable | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
shouldSendEvent | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
logger | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
getFactory | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getInstance | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getInstanceForStream | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | /** |
4 | * Event delivery. |
5 | * |
6 | * This program is free software; you can redistribute it and/or modify |
7 | * it under the terms of the GNU General Public License as published by |
8 | * the Free Software Foundation; either version 2 of the License, or |
9 | * (at your option) any later version. |
10 | * |
11 | * This program is distributed in the hope that it will be useful, |
12 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | * GNU General Public License for more details. |
15 | * |
16 | * You should have received a copy of the GNU General Public License along |
17 | * with this program; if not, write to the Free Software Foundation, Inc., |
18 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
19 | * http://www.gnu.org/copyleft/gpl.html |
20 | * |
21 | * @file |
22 | * @author Eric Evans, Andrew Otto |
23 | */ |
24 | |
25 | namespace MediaWiki\Extension\EventBus; |
26 | |
27 | use Exception; |
28 | use FormatJson; |
29 | use InvalidArgumentException; |
30 | use MediaWiki\Logger\LoggerFactory; |
31 | use MediaWiki\MediaWikiServices; |
32 | use MultiHttpClient; |
33 | use Psr\Log\LoggerInterface; |
34 | use RuntimeException; |
35 | use Wikimedia\Assert\Assert; |
36 | |
class EventBus {

	/**
	 * @const int the special event type indicating no events should be accepted.
	 */
	public const TYPE_NONE = 0;

	/**
	 * @const int the event type indicating that the event is a regular mediawiki event.
	 */
	public const TYPE_EVENT = 1;

	/**
	 * @const int the event type indicating that the event is a serialized job.
	 */
	public const TYPE_JOB = 2;

	/**
	 * @const int the event type indicating that the event is a CDN purge.
	 */
	public const TYPE_PURGE = 4;

	/**
	 * @const int the event type indicating any event type.
	 *   (TYPE_EVENT | TYPE_JOB | TYPE_PURGE)
	 */
	public const TYPE_ALL = self::TYPE_EVENT | self::TYPE_JOB | self::TYPE_PURGE;

	/**
	 * @const array names of the event type constants defined above
	 */
	private const EVENT_TYPE_NAMES = [
		'TYPE_NONE' => self::TYPE_NONE,
		'TYPE_EVENT' => self::TYPE_EVENT,
		'TYPE_JOB' => self::TYPE_JOB,
		'TYPE_PURGE' => self::TYPE_PURGE,
		'TYPE_ALL' => self::TYPE_ALL,
	];

	/** @const int Default HTTP request timeout in seconds */
	private const DEFAULT_REQUEST_TIMEOUT = 10;

	/** @const int Maximum number of bytes of serialized events attached to a log record */
	private const MAX_LOGGED_EVENT_BYTES = 8192;

	/** @var LoggerInterface instance for all EventBus instances */
	private static $logger;

	/** @var MultiHttpClient */
	private $http;

	/** @var string EventServiceUrl for this EventBus instance */
	private $url;

	/** @var int HTTP request timeout for this EventBus instance */
	private $timeout;

	/** @var int which event types are allowed to be sent (TYPE_NONE|TYPE_EVENT|TYPE_JOB|TYPE_PURGE|TYPE_ALL) */
	private $allowedEventTypes;

	/** @var EventFactory|null event creator */
	private $eventFactory;

	/** @var int Maximum byte size of a batch */
	private $maxBatchByteSize;

	/** @var bool Whether to forward the X-Client-IP header, if present */
	private $forwardXClientIP;

	/**
	 * @param MultiHttpClient $http
	 * @param string|int $enableEventBus A value of the wgEnableEventBus config
	 *   (a '|'-separated list of TYPE_* constant names), or a bitmask of TYPE_* constants.
	 *   Any other value (including an empty string) allows all event types.
	 * @param EventFactory $eventFactory EventFactory to use for event construction.
	 * @param string $url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events
	 * @param int $maxBatchByteSize Maximum byte size of a batch
	 * @param int|null $timeout HTTP request timeout in seconds. A null or zero value
	 *   falls back to DEFAULT_REQUEST_TIMEOUT (10 seconds).
	 * @param bool $forwardXClientIP Whether the X-Client-IP header should be forwarded
	 *   to the intake service, if present
	 */
	public function __construct(
		MultiHttpClient $http,
		$enableEventBus,
		EventFactory $eventFactory,
		string $url,
		int $maxBatchByteSize,
		?int $timeout = null,
		bool $forwardXClientIP = false
	) {
		$this->http = $http;
		$this->url = $url;
		$this->maxBatchByteSize = $maxBatchByteSize;
		$this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
		$this->eventFactory = $eventFactory;
		$this->forwardXClientIP = $forwardXClientIP;

		if ( is_int( $enableEventBus ) ) {
			// A bitmask must not contain bits outside of the known event types.
			Assert::precondition(
				(int)( $enableEventBus & self::TYPE_ALL ) === $enableEventBus,
				'Invalid $enableEventBus parameter: ' . $enableEventBus
			);
			$this->allowedEventTypes = $enableEventBus;
		} elseif ( is_string( $enableEventBus ) && $enableEventBus ) {
			// A string is a '|'-separated list of TYPE_* constant names.
			$this->allowedEventTypes = self::TYPE_NONE;
			foreach ( explode( '|', $enableEventBus ) as $allowedType ) {
				Assert::precondition(
					array_key_exists( $allowedType, self::EVENT_TYPE_NAMES ),
					"EnableEventBus: $allowedType not recognized"
				);
				$this->allowedEventTypes |= self::EVENT_TYPE_NAMES[$allowedType];
			}
		} else {
			$this->allowedEventTypes = self::TYPE_ALL;
		}
	}

	/**
	 * Splits a list of events into serialized batches that each fit into
	 * maxBatchByteSize, recursing into partitions that are still too large.
	 * A single event that does not fit is logged and returned as its own batch.
	 *
	 * @param array $events list of events to partition
	 * @param int $serializedSize byte size of the serialized $events
	 * @return array list of serialized (JSON string) batches
	 */
	private function partitionEvents( array $events, int $serializedSize ): array {
		$eventCount = count( $events );

		if ( $eventCount <= 1 ) {
			// Nothing left to split: send the oversized event on its own.
			self::logger()->warning(
				"Event is larger than the maxBatchByteSize set.",
				[
					'raw_event' => self::prepareEventsForLogging( $events )
				]
			);
			return [ self::serializeEvents( $events ) ];
		}

		// This method is only called for payloads exceeding maxBatchByteSize, so at
		// least two chunks are always needed; enforcing that guarantees every
		// partition is strictly smaller than $events and the recursion terminates.
		$numOfChunks = max( 2, (int)ceil( $serializedSize / $this->maxBatchByteSize ) );
		// When more chunks are needed than there are events the quotient is 0, which
		// array_chunk() rejects; fall back to one event per chunk.
		$chunkSize = max( 1, intdiv( $eventCount, $numOfChunks ) );

		$results = [];
		foreach ( array_chunk( $events, $chunkSize ) as $partition ) {
			$serializedPartition = self::serializeEvents( $partition );
			if ( strlen( $serializedPartition ) > $this->maxBatchByteSize ) {
				$results = array_merge(
					$results,
					$this->partitionEvents( $partition, strlen( $serializedPartition ) )
				);
			} else {
				$results[] = $serializedPartition;
			}
		}
		return $results;
	}

	/**
	 * Deliver an array of events to the remote service.
	 *
	 * Payloads larger than maxBatchByteSize are split into several requests.
	 *
	 * @param array|string $events the events to send, either as an array of events
	 *   or as an already serialized JSON string.
	 * @param int $type the type of the event being sent.
	 * @return array|bool|string True on success; an error string if nothing was sent
	 *   (type not allowed, empty event list, serialization failure); or an array of
	 *   error strings, one per failed request.
	 * @throws Exception
	 */
	public function send( $events, $type = self::TYPE_EVENT ) {
		if ( !$this->shouldSendEvent( $type ) ) {
			return "Events of type '$type' are not enqueueable";
		}
		if ( !$events ) {
			// Logstash doesn't like the args, because they could be of various types
			$context = [ 'exception' => new RuntimeException() ];
			self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context );
			return "Provided event list is empty";
		}

		// If we already have a JSON string of events, just use it as the body.
		if ( is_string( $events ) ) {
			$body = $events;
			if ( strlen( $events ) > $this->maxBatchByteSize ) {
				$decodedEvents = FormatJson::decode( $events, true );
				if ( is_array( $decodedEvents ) && array_values( $decodedEvents ) === $decodedEvents ) {
					$body = $this->partitionEvents( $decodedEvents, strlen( $events ) );
				} else {
					// Not a JSON list of events (invalid JSON or a single event object),
					// so it cannot be split into batches. Send it unmodified.
					self::logger()->warning(
						"Event is larger than the maxBatchByteSize set.",
						[
							'raw_event' => self::prepareEventsForLogging( $events )
						]
					);
				}
			}
		} else {
			self::validateJSONSerializable( $events );
			// Else serialize the array of events to a JSON string.
			$serializedEvents = self::serializeEvents( $events );
			// If not $serializedEvents, then something went wrong.
			// serializeEvents has already logged, so we can just return.
			if ( !$serializedEvents ) {
				return "Unable to serialize events";
			}

			if ( strlen( $serializedEvents ) > $this->maxBatchByteSize ) {
				$body = $this->partitionEvents( $events, strlen( $serializedEvents ) );
			} else {
				$body = $serializedEvents;
			}
		}

		// One POST request per batch.
		$reqs = array_map( function ( $batch ) {
			$req = [
				'url' => $this->url,
				'method' => 'POST',
				'body' => $batch,
				'headers' => [ 'content-type' => 'application/json' ]
			];
			if ( $this->forwardXClientIP && isset( $_SERVER['HTTP_X_CLIENT_IP'] ) ) {
				$req['headers']['x-client-ip'] = $_SERVER['HTTP_X_CLIENT_IP'];
			}
			return $req;
		}, is_array( $body ) ? $body : [ $body ] );

		$responses = $this->http->runMulti(
			$reqs,
			[
				'reqTimeout' => $this->timeout
			]
		);

		// 201: all events accepted.
		// 202: all events accepted but not necessarily persisted. HTTP response is returned 'hastily'.
		// 207: some but not all events accepted: either due to validation failure or error.
		// 400: no events accepted: all failed schema validation.
		// 500: no events accepted: at least one caused an error, but some might have been invalid.
		$results = [];
		foreach ( $responses as $response ) {
			$res = $response['response'];
			$code = $res['code'];
			// A non-empty 'error' marks a request that failed without a usable HTTP
			// status (e.g. timeout or connection failure); it must not count as success.
			$hasTransportError = !empty( $res['error'] );
			if ( !$hasTransportError && $code != 207 && $code < 300 ) {
				continue;
			}

			$message = $hasTransportError ?
				$res['error'] : (string)$code . ': ' . (string)$res['reason'];
			// Limit the maximum size of the logged context to 8 kilobytes as that's where logstash
			// truncates the JSON anyway
			$context = [
				'raw_events' => self::prepareEventsForLogging( $body ),
				'service_response' => $res,
				'exception' => new RuntimeException(),
			];
			self::logger()->error( "Unable to deliver all events: {$message}", $context );

			$results[] = "Unable to deliver all events: $message";
		}

		if ( $results !== [] ) {
			return $results;
		}

		return true;
	}

	// == static helper functions below ==

	/**
	 * Serializes $events array to a JSON string. If FormatJson::encode()
	 * fails or throws, this will log a detailed error message and return null.
	 *
	 * @param array $events
	 * @return string|null JSON or null on failure
	 */
	public static function serializeEvents( $events ) {
		try {
			$serializedEvents = FormatJson::encode( $events, false, FormatJson::ALL_OK );
			if ( $serializedEvents ) {
				return $serializedEvents;
			}

			// Capture the error now: the per-event encodes below would overwrite it,
			// possibly with "No error" if the last event happens to be encodable.
			$jsonError = json_last_error_msg();

			// Something failed. Let's figure out exactly which one.
			$bad = [];
			foreach ( $events as $event ) {
				if ( !FormatJson::encode( $event, false, FormatJson::ALL_OK ) ) {
					$bad[] = $event;
				}
			}
			$context = [
				'exception' => new RuntimeException(),
				'json_last_error' => $jsonError,
				// Fall back to PHP serialization for events JSON could not encode.
				'events' => serialize( $bad ),
			];
			self::logger()->error(
				'FormatJson::encode($events) failed: ' . $context['json_last_error'] .
				'. Aborting send.', $context
			);
			return null;
		} catch ( Exception $exception ) {
			$context = [
				'exception' => $exception,
				'json_last_error' => json_last_error_msg()
			];
			self::logger()->error(
				'FormatJson::encode($events) thrown exception: ' . $context['json_last_error'] .
				'. Aborting send.', $context
			);
			return null;
		}
	}

	/**
	 * Prepares events for logging - serializes if needed, limits the size
	 * of the serialized event to 8kb.
	 *
	 * @param string|array $events
	 * @return string|null the (possibly truncated) serialized events, or null
	 *   if they could not be serialized
	 */
	private static function prepareEventsForLogging( $events ) {
		if ( is_array( $events ) ) {
			$events = self::serializeEvents( $events );
		}

		if ( $events === null ) {
			return null;
		}

		if ( strlen( $events ) <= self::MAX_LOGGED_EVENT_BYTES ) {
			return $events;
		}

		// Truncate by bytes but on a character boundary, so that a multi-byte
		// character is never cut in half (which would make the log context invalid UTF-8).
		return mb_strcut( $events, 0, self::MAX_LOGGED_EVENT_BYTES, 'UTF-8' );
	}

	/**
	 * If $value is a string, but not UTF-8 encoded, then assume it is binary
	 * and base64 encode it and prefix it with a content type.
	 * @param mixed $value
	 * @return mixed the data URI string for binary strings, $value unchanged otherwise
	 */
	public static function replaceBinaryValues( $value ) {
		if ( is_string( $value ) && !mb_check_encoding( $value, 'UTF-8' ) ) {
			return 'data:application/octet-stream;base64,' . base64_encode( $value );
		}
		return $value;
	}

	/**
	 * Recursively calls replaceBinaryValues on an array and transforms
	 * any binary values. $array is passed by reference and will be modified.
	 * @param array &$array
	 * @return bool return value of array_walk_recursive
	 */
	public static function replaceBinaryValuesRecursive( &$array ) {
		return array_walk_recursive( $array, static function ( &$item ) {
			$item = self::replaceBinaryValues( $item );
		} );
	}

	/**
	 * Checks a part of the event for JSON-serializability and logs the first
	 * non-scalar, non-array, non-null value found.
	 *
	 * @param array $originalEvent an original event that is being checked.
	 * @param array $eventPart the sub-object nested in the original event to be checked.
	 * @return bool true if a non-scalar value was found (and logged), false otherwise
	 */
	private static function checkEventPart( $originalEvent, $eventPart ) {
		foreach ( $eventPart as $p => $v ) {
			if ( is_array( $v ) ) {
				// Propagate a hit from the nested part so that only one
				// message is logged per event.
				if ( self::checkEventPart( $originalEvent, $v ) ) {
					return true;
				}
			} elseif ( !is_scalar( $v ) && $v !== null ) {
				// Only log the first appearance of non-scalar property per event as jobs
				// might contain hundreds of properties and we do not want to log everything.
				self::logger()->error( 'Non-scalar value found in the event', [
					'raw_events' => self::prepareEventsForLogging( [ $originalEvent ] ),
					'prop_name' => $p,
					// get_class() only accepts objects; resources are reported by type name.
					'prop_val_type' => is_object( $v ) ? get_class( $v ) : gettype( $v )
				] );
				// Follow to the next event in the array
				return true;
			}
		}
		return false;
	}

	/**
	 * Checks if the event is JSON-serializable (contains only scalar values)
	 * and logs the event if non-scalar found.
	 *
	 * @param array $events
	 */
	private static function validateJSONSerializable( $events ) {
		foreach ( $events as $event ) {
			self::checkEventPart( $event, $event );
		}
	}

	/**
	 * Checks whether events of the given type are allowed by this instance.
	 *
	 * @param int $eventType one of (or a bitmask of) the TYPE_* constants
	 * @return bool true if any bit of $eventType is in the allowed event types
	 */
	private function shouldSendEvent( $eventType ) {
		return (bool)( $this->allowedEventTypes & $eventType );
	}

	/**
	 * Returns a singleton logger instance for all EventBus instances.
	 * Use like: self::logger()->info( $message )
	 * We use this so we don't have to check if the logger has been created
	 * before attempting to log a message.
	 * @return LoggerInterface
	 */
	public static function logger() {
		if ( !self::$logger ) {
			self::$logger = LoggerFactory::getInstance( 'EventBus' );
		}
		return self::$logger;
	}

	/**
	 * Returns the EventFactory associated with this instance of EventBus
	 * @return EventFactory|null
	 */
	public function getFactory() {
		return $this->eventFactory;
	}

	/**
	 * @param string|null $eventServiceName
	 *   The name of a key in the EventServices config looked up via
	 *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
	 *   The EventService config is keyed by service name, and should at least contain
	 *   a 'url' entry pointing at the event service endpoint events should be
	 *   POSTed to. They can also optionally contain a 'timeout' entry specifying
	 *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry that can be
	 *   set to true if the X-Client-IP header from the originating request should be
	 *   forwarded to the event service. Instances are singletons identified by
	 *   $eventServiceName.
	 *
	 *   NOTE: Previously, this function took a $config object instead of an
	 *   event service name. This is a backwards incompatible change, but because
	 *   there are no other users of this extension, we can do this safely.
	 *
	 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
	 * @return EventBus
	 */
	public static function getInstance( $eventServiceName ) {
		return MediaWikiServices::getInstance()
			->get( 'EventBus.EventBusFactory' )
			->getInstance( $eventServiceName );
	}

	/**
	 * Uses EventStreamConfig.StreamConfigs to look up the
	 * EventBus instance to use for a $stream.
	 * If the stream is disabled, a non-producing EventBus instance will be used.
	 * If none is found, falls back to using wgEventServiceDefault.
	 *
	 * @param string $stream the stream to send an event to
	 * @return EventBus
	 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
	 */
	public static function getInstanceForStream( $stream ) {
		return MediaWikiServices::getInstance()
			->get( 'EventBus.EventBusFactory' )
			->getInstanceForStream( $stream );
	}
}