Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
62.50% |
175 / 280 |
|
47.06% |
8 / 17 |
CRAP | |
0.00% |
0 / 1 |
EventBus | |
62.50% |
175 / 280 |
|
47.06% |
8 / 17 |
311.84 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
24 / 24 |
|
100.00% |
1 / 1 |
6 | |||
partitionEvents | |
80.00% |
16 / 20 |
|
0.00% |
0 / 1 |
4.13 | |||
incrementMetricByValue | |
8.33% |
1 / 12 |
|
0.00% |
0 / 1 |
24.26 | |||
send | |
66.67% |
102 / 153 |
|
0.00% |
0 / 1 |
39.93 | |||
getStreamNameFromEvent | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
3 | |||
groupEventsByStream | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
2 | |||
serializeEvents | |
10.71% |
3 / 28 |
|
0.00% |
0 / 1 |
22.79 | |||
prepareEventsForLogging | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
4.13 | |||
replaceBinaryValues | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
3 | |||
replaceBinaryValuesRecursive | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
checkEventPart | |
40.00% |
4 / 10 |
|
0.00% |
0 / 1 |
10.40 | |||
validateJSONSerializable | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
shouldSendEvent | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
logger | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
getFactory | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getInstance | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getInstanceForStream | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | /** |
4 | * Event delivery. |
5 | * |
6 | * This program is free software; you can redistribute it and/or modify |
7 | * it under the terms of the GNU General Public License as published by |
8 | * the Free Software Foundation; either version 2 of the License, or |
9 | * (at your option) any later version. |
10 | * |
11 | * This program is distributed in the hope that it will be useful, |
12 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | * GNU General Public License for more details. |
15 | * |
16 | * You should have received a copy of the GNU General Public License along |
17 | * with this program; if not, write to the Free Software Foundation, Inc., |
18 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
19 | * http://www.gnu.org/copyleft/gpl.html |
20 | * |
21 | * @file |
22 | * @author Eric Evans, Andrew Otto |
23 | */ |
24 | |
25 | namespace MediaWiki\Extension\EventBus; |
26 | |
27 | use Exception; |
28 | use FormatJson; |
29 | use InvalidArgumentException; |
30 | use MediaWiki\Context\RequestContext; |
31 | use MediaWiki\Logger\LoggerFactory; |
32 | use MediaWiki\MediaWikiServices; |
33 | use MultiHttpClient; |
34 | use Psr\Log\LoggerInterface; |
35 | use RuntimeException; |
36 | use Wikimedia\Assert\Assert; |
37 | use Wikimedia\Stats\StatsFactory; |
38 | |
class EventBus {

	/**
	 * @const int the special event type indicating no events should be accepted.
	 */
	public const TYPE_NONE = 0;

	/**
	 * @const int the event type indicating that the event is a regular mediawiki event.
	 */
	public const TYPE_EVENT = 1;

	/**
	 * @const int the event type indicating that the event is a serialized job.
	 */
	public const TYPE_JOB = 2;

	/**
	 * @const int the event type indicating that the event is a CDN purge.
	 */
	public const TYPE_PURGE = 4;

	/**
	 * @const int the event type indicating any event type.
	 * (TYPE_EVENT | TYPE_JOB | TYPE_PURGE)
	 */
	public const TYPE_ALL = self::TYPE_EVENT | self::TYPE_JOB | self::TYPE_PURGE;

	/**
	 * @const array names of the event type constants defined above, mapped to their values.
	 * Used to parse a string-valued $enableEventBus and to label metrics by type name.
	 */
	private const EVENT_TYPE_NAMES = [
		'TYPE_NONE' => self::TYPE_NONE,
		'TYPE_EVENT' => self::TYPE_EVENT,
		'TYPE_JOB' => self::TYPE_JOB,
		'TYPE_PURGE' => self::TYPE_PURGE,
		'TYPE_ALL' => self::TYPE_ALL,
	];

	/** @const int Default HTTP request timeout in seconds */
	private const DEFAULT_REQUEST_TIMEOUT = 10;

	/**
	 * @const int Maximum number of bytes of serialized events included in a log context.
	 * Logstash truncates JSON beyond roughly this size anyway.
	 */
	private const MAX_LOGGED_EVENTS_BYTES = 8192;

	/** @const string fallback value to use when a prometheus metrics label value
	 * (e.g. `meta.stream`) is not assigned.
	 */
	public const VALUE_UNKNOWN = "__value_unknown__";

	/** @var LoggerInterface|null lazily created logger shared by all EventBus instances */
	private static $logger;

	/** @var MultiHttpClient */
	private $http;

	/** @var string EventServiceUrl for this EventBus instance */
	private $url;

	/** @var int HTTP request timeout for this EventBus instance */
	private $timeout;

	/** @var int which event types are allowed to be sent (bitmask of TYPE_* constants) */
	private $allowedEventTypes;

	/** @var EventFactory|null event creator */
	private $eventFactory;

	/** @var int Maximum byte size of a batch */
	private $maxBatchByteSize;

	/** @var bool Whether to forward the X-Client-IP header, if present */
	private $forwardXClientIP;

	/** @var string intake event service name */
	private string $eventServiceName;

	/** @var ?StatsFactory wf:Stats factory instance */
	private ?StatsFactory $statsFactory;

	/**
	 * @param MultiHttpClient $http
	 * @param string|int $enableEventBus A value of the wgEnableEventBus config, or a bitmask
	 *   of TYPE_* constants. A non-empty string is a '|'-separated list of TYPE_* constant
	 *   names (whitespace around each name is ignored). Any other value (e.g. '' or null)
	 *   allows TYPE_ALL.
	 * @param EventFactory $eventFactory EventFactory to use for event construction.
	 * @param string $url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events
	 * @param int $maxBatchByteSize Maximum byte size of a batch
	 * @param int|null $timeout HTTP request timeout in seconds. null or 0 falls back to
	 *   DEFAULT_REQUEST_TIMEOUT (10 seconds).
	 * @param bool $forwardXClientIP Whether the X-Client-IP header should be forwarded
	 *   to the intake service, if present
	 * @param string $eventServiceName TODO: pass the event service name so that it can be used
	 *   to label metrics. This is a hack put in place while refactoring efforts on this class are
	 *   ongoing. This variable is used to label metrics in send().
	 * @param StatsFactory|null $statsFactory wf:Stats factory instance; when null no metrics
	 *   are emitted.
	 *
	 * Assert::precondition() fails if an int $enableEventBus has bits outside TYPE_ALL,
	 * or if a string $enableEventBus names an unknown type.
	 */
	public function __construct(
		MultiHttpClient $http,
		$enableEventBus,
		EventFactory $eventFactory,
		string $url,
		int $maxBatchByteSize,
		?int $timeout = null,
		bool $forwardXClientIP = false,
		string $eventServiceName = EventBusFactory::EVENT_SERVICE_DISABLED_NAME,
		?StatsFactory $statsFactory = null
	) {
		$this->http = $http;
		$this->url = $url;
		$this->maxBatchByteSize = $maxBatchByteSize;
		$this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
		$this->eventFactory = $eventFactory;
		$this->forwardXClientIP = $forwardXClientIP;
		$this->eventServiceName = $eventServiceName;
		$this->statsFactory = $statsFactory;

		if ( is_int( $enableEventBus ) ) {
			Assert::precondition(
				(int)( $enableEventBus & self::TYPE_ALL ) === $enableEventBus,
				'Invalid $enableEventBus parameter: ' . $enableEventBus
			);
			$this->allowedEventTypes = $enableEventBus;
		} elseif ( is_string( $enableEventBus ) && $enableEventBus ) {
			$this->allowedEventTypes = self::TYPE_NONE;
			foreach ( explode( '|', $enableEventBus ) as $allowedType ) {
				// Tolerate config written as e.g. "TYPE_EVENT | TYPE_JOB".
				$allowedType = trim( $allowedType );
				Assert::precondition(
					array_key_exists( $allowedType, self::EVENT_TYPE_NAMES ),
					"EnableEventBus: $allowedType not recognized"
				);
				$this->allowedEventTypes |= self::EVENT_TYPE_NAMES[$allowedType];
			}
		} else {
			$this->allowedEventTypes = self::TYPE_ALL;
		}
	}

	/**
	 * Splits $events into several serialized JSON bodies, each no larger than
	 * maxBatchByteSize where possible. A single event that is still too large on
	 * its own is logged and returned as its own (oversized) body.
	 *
	 * @param array $events numeric array of events
	 * @param int $serializedSize byte size of $events serialized as one batch
	 * @return array list of serialized JSON bodies
	 */
	private function partitionEvents( array $events, int $serializedSize ): array {
		if ( count( $events ) <= 1 ) {
			self::logger()->warning(
				"Event is larger than the maxBatchByteSize set.",
				[
					'raw_event' => self::prepareEventsForLogging( $events )
				]
			);
			return [ self::serializeEvents( $events ) ];
		}

		$numOfChunks = ceil( $serializedSize / $this->maxBatchByteSize );
		// When individual events are large, $numOfChunks can exceed the number of
		// events, making the floored chunk size 0. array_chunk() rejects sizes below 1,
		// so clamp it; oversized chunks are split further by the recursion below.
		$chunkSize = max( 1, (int)floor( count( $events ) / $numOfChunks ) );

		$results = [];
		foreach ( array_chunk( $events, $chunkSize ) as $partition ) {
			$serializedPartition = self::serializeEvents( $partition );
			if ( strlen( $serializedPartition ) > $this->maxBatchByteSize ) {
				$results = array_merge(
					$results,
					$this->partitionEvents( $partition, strlen( $serializedPartition ) )
				);
			} else {
				$results[] = $serializedPartition;
			}
		}
		return $results;
	}

	/**
	 * Increments a counter metric by $value, if a StatsFactory was provided.
	 * Labels whose value is null are replaced with VALUE_UNKNOWN and logged.
	 *
	 * @param string $metricName
	 * @param int $value
	 * @param mixed ...$labels assoc arrays of label $key => $value pairs
	 * @return void
	 */
	private function incrementMetricByValue( string $metricName, int $value, ...$labels ): void {
		if ( !isset( $this->statsFactory ) ) {
			return;
		}
		$metric = $this->statsFactory->getCounter( $metricName );
		foreach ( $labels as $label ) {
			foreach ( $label as $k => $v ) {
				// Bug: T373086
				if ( !isset( $v ) ) {
					$v = self::VALUE_UNKNOWN;
					self::logger()->warning(
						' Initialized metric label does not have an assigned value. ',
						[ "metric_label" => $k ]
					);
				}
				$metric->setLabel( $k, $v );
			}
		}
		// Under the hood, Counter::incrementBy will update an integer
		// valued counter, regardless of `$value` type.
		$metric->incrementBy( $value );
	}

	/**
	 * Deliver an array of events to the remote event intake service.
	 *
	 * Statslib metrics emitted by this method:
	 *
	 * - events_outgoing_total
	 * - events_outgoing_by_stream_total
	 * - events_accepted_total
	 * - events_failed_total
	 * - events_failed_by_stream_total
	 * - event_service_response_total
	 * - event_batch_not_enqueable_total
	 * - event_batch_is_string_total
	 * - event_batch_not_serializable_total
	 * - event_batch_partitioned_total
	 *   (incremented if $events had to be partitioned and sent in multiple POST requests)
	 *
	 * @param array|string $events the events to send: a single event assoc array, a numeric
	 *   array of events, or (deprecated) a JSON string encoding either of those.
	 * @param int $type the type of the event being sent.
	 * @return array|bool|string True on success or an error string or array on failure
	 * @throws Exception
	 */
	public function send( $events, $type = self::TYPE_EVENT ) {
		// Label metrics by event type name. If the lookup fails,
		// fall back to labeling with the $type id parameter. Unknown or invalid ids
		// will be reported by the `event_batch_not_enqueable_total` metric, which
		// fires when an event type does not belong to this EventBus instance allow list.
		// It should not be possible to supply a $type that does not belong
		// to EVENT_TYPE_NAMES. Falling back to a numerical id is just a guard
		// to help us spot eventual bugs.
		$eventType = array_search( $type, self::EVENT_TYPE_NAMES );
		$eventType = ( $eventType === false ) ? $type : $eventType;
		// Label metrics with the EventBus declared default service host.
		// In the future, for streams that declare one, use the one provided by EventStreamConfig instead.
		$baseMetricLabels = [
			"function_name" => "send",
			"event_type" => $eventType,
			"event_service_name" => $this->eventServiceName,
			"event_service_uri" => $this->url
		];

		if ( !$this->shouldSendEvent( $type ) ) {
			// Debug metric. How often is the `$events` param not enqueable?
			$this->incrementMetricByValue(
				"event_batch_not_enqueable_total",
				1,
				$baseMetricLabels
			);
			return "Events of type '$type' are not enqueueable";
		}
		if ( !$events ) {
			// Logstash doesn't like the args, because they could be of various types
			$context = [ 'exception' => new RuntimeException() ];
			self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context );
			return "Provided event list is empty";
		}

		// Historically, passing a JSON string has been supported, but we'd like to deprecate this feature.
		// This was done to avoid having extra encode+decode steps if the caller already has a JSON string.
		// But, we end up decoding always anyway, to properly increment metrics.
		if ( is_string( $events ) ) {
			// TODO: is $events ever passed as a string? We should refactor this block and simplify the union type.
			// Debug metric. How often is the `$events` param a string?
			$this->incrementMetricByValue(
				"event_batch_is_string_total",
				1,
				$baseMetricLabels
			);

			$decodedEvents = FormatJson::decode( $events, true );

			// FormatJson::decode returns null for invalid JSON. Valid JSON that is not an
			// object/array (e.g. '"foo"' or '42') cannot be an event list either, and would
			// make array_key_exists() below throw a TypeError.
			if ( !is_array( $decodedEvents ) ) {
				$context = [
					'exception' => new RuntimeException(),
					'raw_events' => self::prepareEventsForLogging( $events )
				];
				self::logger()->error( 'Failed decoding events from JSON string.', $context );
				return "Failed decoding events from JSON string";
			}

			$events = $decodedEvents;
		}

		// Code below expects that $events is a numeric array of event assoc arrays.
		if ( !array_key_exists( 0, $events ) ) {
			$events = [ $events ];
		}
		$outgoingEventsCount = count( $events );

		// Increment events_outgoing_total
		// NOTE: We could just use events_outgoing_by_stream_total and sum,
		// but below we want to emit an events_accepted_total.
		// In the case of a 207 partial success, we don't know
		// the stream names of the successful events
		// (without diffing the response from the event service and $events).
		// For consistency, we emit both events_outgoing_total and events_outgoing_by_stream_total.
		$this->incrementMetricByValue(
			"events_outgoing_total",
			$outgoingEventsCount,
			$baseMetricLabels,
		);

		// Increment events_outgoing_by_stream_total for each stream
		$eventsByStream = self::groupEventsByStream( $events );
		foreach ( $eventsByStream as $streamName => $eventsForStreamName ) {
			$this->incrementMetricByValue(
				"events_outgoing_by_stream_total",
				count( $eventsForStreamName ),
				$baseMetricLabels,
				[ "stream_name" => $streamName ]
			);
		}

		// validateJSONSerializable only logs if any part of the event is not serializable.
		// It does not return anything or raise any exceptions.
		self::validateJSONSerializable( $events );
		// Serialize the array of events to a JSON string.
		$serializedEvents = self::serializeEvents( $events );
		if ( !$serializedEvents ) {
			$this->incrementMetricByValue(
				"event_batch_not_serializable_total",
				1,
				$baseMetricLabels
			);
			// serializeEvents has already logged, so we can just return.
			return "Unable to serialize events";
		}

		// If the body would be too big, partition it into multiple bodies.
		if ( strlen( $serializedEvents ) > $this->maxBatchByteSize ) {
			$postBodies = $this->partitionEvents( $events, strlen( $serializedEvents ) );
			// Measure the number of times we partition events into more than one batch.
			$this->incrementMetricByValue(
				"event_batch_partitioned_total",
				1,
				$baseMetricLabels
			);
		} else {
			$postBodies = [ $serializedEvents ];
		}

		// Most of the time $postBodies will be a single element array, and we
		// will only need to send one POST request.
		// When the size is too large, $events will have been partitioned into
		// multiple $postBodies, for which each will be sent as its own POST request.
		$originalRequest = RequestContext::getMain()->getRequest();
		$requests = array_map(
			function ( $postBody ) use ( $originalRequest ) {
				$req = [
					'url' => $this->url,
					'method' => 'POST',
					'body' => $postBody,
					'headers' => [ 'content-type' => 'application/json' ]
				];
				if ( $this->forwardXClientIP ) {
					$req['headers']['x-client-ip'] = $originalRequest->getIP();
				}
				return $req;
			},
			$postBodies
		);

		// Do the POST requests.
		$responses = $this->http->runMulti(
			$requests,
			[
				'reqTimeout' => $this->timeout
			]
		);

		// Keep track of the total number of failed events.
		// This will be used to calculate events_accepted_total later.
		// NOTE(review): only failures itemized in a response body are counted, so a batch
		// that fails without per-event details is still counted as accepted.
		$failedEventsCountTotal = 0;

		// 201: all events accepted.
		// 202: all events accepted but not necessarily persisted. HTTP response is returned 'hastily'.
		// 207: some but not all events accepted: either due to validation failure or error.
		// 400: no events accepted: all failed schema validation.
		// 500: no events accepted: at least one caused an error, but some might have been invalid.
		$results = [];
		foreach ( $responses as $i => $response ) {
			$res = $response['response'];
			$code = $res['code'];

			$this->incrementMetricByValue(
				"event_service_response_total",
				1,
				$baseMetricLabels,
				[ "status_code" => $code ]
			);

			// Anything but a non-207 2xx status is a (partial) failure. This includes
			// codes below 200, e.g. code 0, which MultiHttpClient presumably reports when no
			// HTTP response was received at all (connection error, timeout).
			if ( $code < 200 || $code == 207 || $code >= 300 ) {
				$message = empty( $res['error'] ) ?
					(string)$code . ': ' . (string)$res['reason'] : $res['error'];
				// Limit the maximum size of the logged context to 8 kilobytes as that's where logstash
				// truncates the JSON anyway.
				$context = [
					// $responses[$i] corresponds with $postBodies[$i]
					'raw_events' => self::prepareEventsForLogging( $postBodies[$i] ),
					'service_response' => $res,
					'exception' => new RuntimeException(),
				];
				self::logger()->error( "Unable to deliver all events: {$message}", $context );

				if ( isset( $res['body'] ) ) {
					// We expect the event service to return an array of objects
					// in the response body.
					// FormatJson::decode will return `null` if the message failed to parse.
					// If anything other than an array is parsed we treat it as unexpected
					// behaviour, and log the response at error severity.
					// See https://phabricator.wikimedia.org/T370428

					// $failureInfosByKind should look like:
					// {
					//   "<failure_kind>": [
					//     { ..., "event": {<failed event here>}, "context": {<failure context here>} },
					//     { ... }
					//   ],
					// }
					$failureInfosByKind = FormatJson::decode( $res['body'], true );
					if ( is_array( $failureInfosByKind ) ) {
						foreach ( $failureInfosByKind as $failureKind => $failureInfos ) {
							// $failureInfos should be a non-empty list. This is just a guard
							// against what the intake service returns (or the behavior of
							// different json parsing methods - possibly); array_map() below
							// would throw on a non-array.
							// https://www.mediawiki.org/wiki/Manual:Coding_conventions/PHP#empty()
							if ( !is_array( $failureInfos ) || $failureInfos === [] ) {
								continue;
							}

							// Get the events that failed from the response.
							$failedEvents = array_map(
								static function ( $failureStatus ) {
									return $failureStatus['event'] ?? null;
								},
								$failureInfos
							);

							$failedEventsCount = count( $failedEvents );
							$failedEventsCountTotal += $failedEventsCount;

							// increment events_failed_total
							$this->incrementMetricByValue(
								"events_failed_total",
								$failedEventsCount,
								$baseMetricLabels,
								[
									"failure_kind" => $failureKind,
									"status_code" => $code,
								]
							);

							// Group failed events by stream and increment events_failed_by_stream_total.
							$failedEventsByStream = self::groupEventsByStream( $failedEvents );
							foreach ( $failedEventsByStream as $streamName => $failedEventsForStream ) {
								$this->incrementMetricByValue(
									"events_failed_by_stream_total",
									count( $failedEventsForStream ),
									$baseMetricLabels,
									[
										"failure_kind" => $failureKind,
										"status_code" => $code,
										"stream_name" => $streamName,
									]
								);
							}
						}
					} else {
						self::logger()->error( "Invalid event service response body", $context );
					}
				}
				$results[] = "Unable to deliver all events: $message";
			}
		}

		// increment events_accepted_total as the difference between
		// $outgoingEventsCount and $failedEventsCountTotal (if there were any failed events).
		$this->incrementMetricByValue(
			"events_accepted_total",
			$outgoingEventsCount - $failedEventsCountTotal,
			$baseMetricLabels,
		);

		if ( $results !== [] ) {
			return $results;
		}

		return true;
	}

	// == static helper functions below ==

	/**
	 * Given an event assoc array, extracts the stream name from meta.stream,
	 * or returns VALUE_UNKNOWN if the event is null or has no meta.stream.
	 * @param array|null $event
	 * @return mixed|string
	 */
	public static function getStreamNameFromEvent( ?array $event ) {
		return is_array( $event ) && isset( $event['meta']['stream'] ) ?
			$event['meta']['stream'] :
			self::VALUE_UNKNOWN;
	}

	/**
	 * Given an array of events, this returns them grouped by stream name,
	 * preserving their order within each group. Items that are not arrays
	 * (e.g. a malformed failed event from the service response) are grouped
	 * under VALUE_UNKNOWN.
	 * @param array $events
	 * @return array stream name => list of events
	 */
	public static function groupEventsByStream( array $events ): array {
		$groupedEvents = [];
		foreach ( $events as $event ) {
			$streamName = self::getStreamNameFromEvent( is_array( $event ) ? $event : null );
			$groupedEvents[$streamName] ??= [];
			$groupedEvents[$streamName][] = $event;
		}
		return $groupedEvents;
	}

	/**
	 * Serializes $events array to a JSON string. If FormatJson::encode()
	 * fails, this logs which events could not be encoded and returns null.
	 *
	 * @param array $events
	 * @return string|null JSON or null on failure
	 */
	public static function serializeEvents( $events ) {
		try {
			$serializedEvents = FormatJson::encode( $events, false, FormatJson::ALL_OK );
			if ( $serializedEvents ) {
				return $serializedEvents;
			}
			// Capture the error now: each FormatJson::encode() call below resets it,
			// so reading it after the loop would report the last per-event result.
			$jsonLastError = json_last_error_msg();

			// Something failed. Let's figure out exactly which one.
			$bad = [];
			foreach ( $events as $event ) {
				if ( !FormatJson::encode( $event, false, FormatJson::ALL_OK ) ) {
					$bad[] = $event;
				}
			}
			$context = [
				'exception' => new RuntimeException(),
				'json_last_error' => $jsonLastError,
				// Use PHP serialization since that will *always* work.
				'events' => serialize( $bad ),
			];
			self::logger()->error(
				'FormatJson::encode($events) failed: ' . $context['json_last_error'] .
				'. Aborting send.', $context
			);
			return null;
		} catch ( Exception $exception ) {
			$context = [
				'exception' => $exception,
				'json_last_error' => json_last_error_msg()
			];
			self::logger()->error(
				'FormatJson::encode($events) thrown exception: ' . $context['json_last_error'] .
				'. Aborting send.', $context
			);
			return null;
		}
	}

	/**
	 * Prepares events for logging - serializes if needed, limits the size
	 * of the serialized event to 8kb.
	 *
	 * @param string|array $events
	 * @return string|null null if $events is an array that could not be serialized
	 */
	private static function prepareEventsForLogging( $events ) {
		if ( is_array( $events ) ) {
			$events = self::serializeEvents( $events );
		}

		if ( $events === null ) {
			return null;
		}

		if ( strlen( $events ) <= self::MAX_LOGGED_EVENTS_BYTES ) {
			return $events;
		}
		// mb_strcut() cuts by bytes but never splits a multi-byte UTF-8 character,
		// unlike substr(), so the truncated string stays valid UTF-8 for the log encoder.
		return mb_strcut( $events, 0, self::MAX_LOGGED_EVENTS_BYTES, 'UTF-8' );
	}

	/**
	 * If $value is a string, but not UTF-8 encoded, then assume it is binary
	 * and base64 encode it and prefix it with a content type.
	 * @param mixed $value
	 * @return mixed
	 */
	public static function replaceBinaryValues( $value ) {
		if ( is_string( $value ) && !mb_check_encoding( $value, 'UTF-8' ) ) {
			return 'data:application/octet-stream;base64,' . base64_encode( $value );
		}
		return $value;
	}

	/**
	 * Recursively calls replaceBinaryValues on an array and transforms
	 * any binary values. $array is passed by reference and will be modified.
	 * @param array &$array
	 * @return bool return value of array_walk_recursive
	 */
	public static function replaceBinaryValuesRecursive( &$array ) {
		return array_walk_recursive( $array, static function ( &$item ) {
			$item = self::replaceBinaryValues( $item );
		} );
	}

	/**
	 * Checks a part of the event for JSON-serializability.
	 *
	 * Only the first non-scalar (non-null, non-array) value found in an event is
	 * logged, as jobs might contain hundreds of properties and we do not want to
	 * log everything; checking of that event stops there.
	 *
	 * @param array $originalEvent an original event that is being checked.
	 * @param array $eventPart the sub-object nested in the original event to be checked.
	 * @return bool true if a non-scalar value was found (and logged)
	 */
	private static function checkEventPart( $originalEvent, $eventPart ): bool {
		foreach ( $eventPart as $p => $v ) {
			if ( is_array( $v ) ) {
				// Propagate "already logged" so outer levels stop too.
				if ( self::checkEventPart( $originalEvent, $v ) ) {
					return true;
				}
			} elseif ( !is_scalar( $v ) && $v !== null ) {
				self::logger()->error( 'Non-scalar value found in the event', [
					'raw_events' => self::prepareEventsForLogging( [ $originalEvent ] ),
					'prop_name' => $p,
					// get_class() only accepts objects; resources would throw.
					'prop_val_type' => is_object( $v ) ? get_class( $v ) : gettype( $v )
				] );
				// Follow to the next event in the array
				return true;
			}
		}
		return false;
	}

	/**
	 * Checks if the event is JSON-serializable (contains only scalar values)
	 * and logs the event if non-scalar found.
	 *
	 * @param array $events
	 */
	private static function validateJSONSerializable( $events ) {
		foreach ( $events as $event ) {
			self::checkEventPart( $event, $event );
		}
	}

	/**
	 * @param int $eventType a TYPE_* constant
	 * @return bool whether $eventType shares any bit with the allowed event types
	 */
	private function shouldSendEvent( $eventType ) {
		return ( $this->allowedEventTypes & $eventType ) !== 0;
	}

	/**
	 * Returns a singleton logger instance for all EventBus instances.
	 * Use like: self::logger()->info( $message )
	 * We use this so we don't have to check if the logger has been created
	 * before attempting to log a message.
	 * @return LoggerInterface
	 */
	public static function logger() {
		if ( !self::$logger ) {
			self::$logger = LoggerFactory::getInstance( 'EventBus' );
		}
		return self::$logger;
	}

	/**
	 * Returns the EventFactory associated with this instance of EventBus
	 * @return EventFactory|null
	 */
	public function getFactory() {
		return $this->eventFactory;
	}

	/**
	 * @param string|null $eventServiceName
	 *   The name of a key in the EventServices config looked up via
	 *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
	 *   The EventService config is keyed by service name, and should at least contain
	 *   a 'url' entry pointing at the event service endpoint events should be
	 *   POSTed to. They can also optionally contain a 'timeout' entry specifying
	 *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry that can be
	 *   set to true if the X-Client-IP header from the originating request should be
	 *   forwarded to the event service. Instances are singletons identified by
	 *   $eventServiceName.
	 *
	 * NOTE: Previously, this function took a $config object instead of an
	 * event service name. This is a backwards compatible change, but because
	 * there are no other users of this extension, we can do this safely.
	 *
	 * @return EventBus
	 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
	 */
	public static function getInstance( $eventServiceName ) {
		return MediaWikiServices::getInstance()
			->get( 'EventBus.EventBusFactory' )
			->getInstance( $eventServiceName );
	}

	/**
	 * Uses EventStreamConfig.StreamConfigs to look up the
	 * EventBus instance to use for a $stream.
	 * If the stream is disabled, a non-producing EventBus instance will be used.
	 * If none is found, falls back to using wgEventServiceDefault.
	 *
	 * @param string $stream the stream to send an event to
	 * @return EventBus
	 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
	 */
	public static function getInstanceForStream( $stream ) {
		return MediaWikiServices::getInstance()
			->get( 'EventBus.EventBusFactory' )
			->getInstanceForStream( $stream );
	}
}