Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
68.16% |
152 / 223 |
|
46.67% |
7 / 15 |
CRAP | |
0.00% |
0 / 1 |
EventBus | |
68.16% |
152 / 223 |
|
46.67% |
7 / 15 |
191.10 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
24 / 24 |
|
100.00% |
1 / 1 |
6 | |||
partitionEvents | |
100.00% |
20 / 20 |
|
100.00% |
1 / 1 |
4 | |||
incrementMetricByValue | |
16.67% |
1 / 6 |
|
0.00% |
0 / 1 |
19.47 | |||
send | |
75.68% |
84 / 111 |
|
0.00% |
0 / 1 |
28.97 | |||
serializeEvents | |
10.71% |
3 / 28 |
|
0.00% |
0 / 1 |
22.79 | |||
prepareEventsForLogging | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
4.13 | |||
replaceBinaryValues | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
3 | |||
replaceBinaryValuesRecursive | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
checkEventPart | |
40.00% |
4 / 10 |
|
0.00% |
0 / 1 |
10.40 | |||
validateJSONSerializable | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
2 | |||
shouldSendEvent | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
logger | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
getFactory | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getInstance | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getInstanceForStream | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | /** |
4 | * Event delivery. |
5 | * |
6 | * This program is free software; you can redistribute it and/or modify |
7 | * it under the terms of the GNU General Public License as published by |
8 | * the Free Software Foundation; either version 2 of the License, or |
9 | * (at your option) any later version. |
10 | * |
11 | * This program is distributed in the hope that it will be useful, |
12 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | * GNU General Public License for more details. |
15 | * |
16 | * You should have received a copy of the GNU General Public License along |
17 | * with this program; if not, write to the Free Software Foundation, Inc., |
18 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
19 | * http://www.gnu.org/copyleft/gpl.html |
20 | * |
21 | * @file |
22 | * @author Eric Evans, Andrew Otto |
23 | */ |
24 | |
25 | namespace MediaWiki\Extension\EventBus; |
26 | |
27 | use Exception; |
28 | use FormatJson; |
29 | use InvalidArgumentException; |
30 | use MediaWiki\Context\RequestContext; |
31 | use MediaWiki\Logger\LoggerFactory; |
32 | use MediaWiki\MediaWikiServices; |
33 | use MultiHttpClient; |
34 | use Psr\Log\LoggerInterface; |
35 | use RuntimeException; |
36 | use Wikimedia\Assert\Assert; |
37 | use Wikimedia\Stats\StatsFactory; |
38 | |
39 | class EventBus { |
40 | |
41 | /** |
42 | * @const int the special event type indicating no events should be accepted. |
43 | */ |
44 | public const TYPE_NONE = 0; |
45 | |
46 | /** |
47 | * @const int the event type indicating that the event is a regular mediawiki event. |
48 | */ |
49 | public const TYPE_EVENT = 1; |
50 | |
51 | /** |
52 | * @const int the event type indicating that the event is a serialized job. |
53 | */ |
54 | public const TYPE_JOB = 2; |
55 | |
56 | /** |
57 | * @const int the event type indicating that the event is a CDN purge. |
58 | */ |
59 | public const TYPE_PURGE = 4; |
60 | |
61 | /** |
62 | * @const int the event type indicating any event type. (TYPE_EVENT | TYPE_JOB | TYPE_PURGE) |
63 | */ |
64 | public const TYPE_ALL = self::TYPE_EVENT | self::TYPE_JOB | self::TYPE_PURGE; |
65 | |
66 | /** |
67 | * @const array names of the event type constants defined above |
68 | */ |
69 | private const EVENT_TYPE_NAMES = [ |
70 | 'TYPE_NONE' => self::TYPE_NONE, |
71 | 'TYPE_EVENT' => self::TYPE_EVENT, |
72 | 'TYPE_JOB' => self::TYPE_JOB, |
73 | 'TYPE_PURGE' => self::TYPE_PURGE, |
74 | 'TYPE_ALL' => self::TYPE_ALL, |
75 | ]; |
76 | |
77 | /** @const int Default HTTP request timeout in seconds */ |
78 | private const DEFAULT_REQUEST_TIMEOUT = 10; |
79 | /** @const string fallback stream name to use when the `meta.stream` field is |
80 | * not set in events payload. |
81 | */ |
82 | private const STREAM_UNKNOWN_NAME = "__stream_unknown__"; |
83 | |
84 | /** @var LoggerInterface instance for all EventBus instances */ |
85 | private static $logger; |
86 | |
87 | /** @var MultiHttpClient */ |
88 | private $http; |
89 | |
90 | /** @var string EventServiceUrl for this EventBus instance */ |
91 | private $url; |
92 | |
93 | /** @var int HTTP request timeout for this EventBus instance */ |
94 | private $timeout; |
95 | |
96 | /** @var int which event types are allowed to be sent (TYPE_NONE|TYPE_EVENT|TYPE_JOB|TYPE_PURGE|TYPE_ALL) */ |
97 | private $allowedEventTypes; |
98 | |
99 | /** @var EventFactory|null event creator */ |
100 | private $eventFactory; |
101 | |
102 | /** @var int Maximum byte size of a batch */ |
103 | private $maxBatchByteSize; |
104 | |
105 | /** @var bool Whether to forward the X-Client-IP header, if present */ |
106 | private $forwardXClientIP; |
107 | |
108 | /** @var string intake event service name */ |
109 | private string $eventServiceName; |
110 | |
111 | /** @var ?StatsFactory wf:Stats factory instance */ |
112 | private ?StatsFactory $statsFactory; |
113 | |
/**
 * @param MultiHttpClient $http HTTP client used by send() to POST event batches.
 * @param string|int $enableEventBus A value of the wgEnableEventBus config (TYPE_* constant
 *   names separated by '|'), or a bitmask of TYPE_* constants. Any other value, including
 *   an empty string, allows all event types.
 * @param EventFactory $eventFactory EventFactory to use for event construction.
 * @param string $url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events
 * @param int $maxBatchByteSize Maximum byte size of a batch
 * @param int|null $timeout HTTP request timeout in seconds. Null (or 0) selects
 *   self::DEFAULT_REQUEST_TIMEOUT.
 * @param bool $forwardXClientIP Whether the X-Client-IP header should be forwarded
 *   to the intake service, if present
 * @param string $eventServiceName TODO: pass the event service name so that it can be used
 *   to label metrics. This is a hack put in place while refactoring efforts on this class are
 *   ongoing. This variable is used to label metrics in send().
 * @param StatsFactory|null $statsFactory wf:Stats factory instance
 */
public function __construct(
	MultiHttpClient $http,
	$enableEventBus,
	EventFactory $eventFactory,
	string $url,
	int $maxBatchByteSize,
	?int $timeout = null,
	bool $forwardXClientIP = false,
	string $eventServiceName = EventBusFactory::EVENT_SERVICE_DISABLED_NAME,
	?StatsFactory $statsFactory = null
) {
	$this->http = $http;
	$this->url = $url;
	$this->maxBatchByteSize = $maxBatchByteSize;
	// `?:` on purpose: both null and 0 fall back to the default timeout.
	$this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
	$this->eventFactory = $eventFactory;
	$this->forwardXClientIP = $forwardXClientIP;
	$this->eventServiceName = $eventServiceName;
	$this->statsFactory = $statsFactory;

	if ( is_int( $enableEventBus ) ) {
		// A bitmask is valid only if it sets no bits outside of TYPE_ALL.
		Assert::precondition(
			(int)( $enableEventBus & self::TYPE_ALL ) === $enableEventBus,
			'Invalid $enableEventBus parameter: ' . $enableEventBus
		);
		$this->allowedEventTypes = $enableEventBus;
	} elseif ( is_string( $enableEventBus ) && $enableEventBus ) {
		// Fold a 'TYPE_A|TYPE_B' style string into the equivalent bitmask.
		$this->allowedEventTypes = self::TYPE_NONE;
		$allowedTypes = explode( '|', $enableEventBus );
		foreach ( $allowedTypes as $allowedType ) {
			Assert::precondition(
				array_key_exists( $allowedType, self::EVENT_TYPE_NAMES ),
				"EnableEventBus: $allowedType not recognized"
			);
			$this->allowedEventTypes |= self::EVENT_TYPE_NAMES[$allowedType];
		}
	} else {
		$this->allowedEventTypes = self::TYPE_ALL;
	}
}
169 | |
/**
 * Splits $events into serialized batches, recursing into any batch whose
 * serialized form is still larger than maxBatchByteSize.
 *
 * A single event that is too large cannot be split: a warning is logged and
 * the event is returned as its own batch.
 *
 * @param array $events events to partition
 * @param int $serializedSize byte size of the serialized form of $events
 * @return array list of serialized batches, as returned by serializeEvents()
 */
private function partitionEvents( array $events, int $serializedSize ): array {
	$numEvents = count( $events );

	if ( $numEvents <= 1 ) {
		self::logger()->warning(
			"Event is larger than the maxBatchByteSize set.",
			[
				'raw_event' => self::prepareEventsForLogging( $events )
			]
		);
		return [ self::serializeEvents( $events ) ];
	}

	// Estimate the number of batches from the size ratio. The divisor is clamped
	// so that a maxBatchByteSize of 0 cannot trigger a division by zero.
	$numOfChunks = max( 1, (int)ceil( $serializedSize / max( 1, $this->maxBatchByteSize ) ) );
	// array_chunk() rejects a length below 1, which the plain division yields
	// whenever more chunks than events are estimated (e.g. 2 events that are
	// three times the size limit). Capping at $numEvents - 1 guarantees that
	// every recursion step works on strictly fewer events.
	$chunkLength = max( 1, min( $numEvents - 1, intdiv( $numEvents, $numOfChunks ) ) );

	$results = [];
	foreach ( array_chunk( $events, $chunkLength ) as $partition ) {
		$serializedPartition = self::serializeEvents( $partition );
		$partitionSize = strlen( $serializedPartition );
		if ( $partitionSize > $this->maxBatchByteSize ) {
			$results = array_merge(
				$results,
				$this->partitionEvents( $partition, $partitionSize )
			);
		} else {
			$results[] = $serializedPartition;
		}
	}
	return $results;
}
203 | |
/**
 * Increments the counter named $metricName by $value, but only when the
 * $wgEnableEventBusInstrumentation feature flag is on and a StatsFactory
 * was provided to this instance. Otherwise this is a no-op.
 *
 * @param string $metricName name of the counter to increment
 * @param int $value amount to increment the counter by
 * @param mixed ...$labels each argument is an array of $key => $value label pairs
 * @return void
 */
private function incrementMetricByValue( string $metricName, int $value, ...$labels ): void {
	// Feature flag to enable instrumentation on Beta
	// https://wikitech.wikimedia.org/wiki/Nova_Resource:Deployment-prep/How_code_is_updated#My_code_introduces_a_feature_that_is_not_yet_ready_for_production,_should_I_wait_to_merge_in_master?
	global $wgEnableEventBusInstrumentation;
	if ( !$wgEnableEventBusInstrumentation || !isset( $this->statsFactory ) ) {
		return;
	}

	$counter = $this->statsFactory->getCounter( $metricName );
	foreach ( $labels as $labelSet ) {
		foreach ( $labelSet as $labelName => $labelValue ) {
			$counter->setLabel( $labelName, $labelValue );
		}
	}
	// Under the hood, Counter::incrementBy will update an integer
	// valued counter, regardless of `$value` type.
	$counter->incrementBy( $value );
}
226 | |
/**
 * Deliver an array of events to the remote service.
 *
 * Events whose serialized form fits in maxBatchByteSize are sent in a single
 * POST request; larger payloads are partitioned into several requests.
 *
 * @param array|string $events the events to send, either as an array of events
 *   or as an already serialized JSON string.
 * @param int $type the type of the event being sent.
 * @return array|bool|string True on success, an error string when nothing was sent,
 *   or an array of error strings (one per failed request) on delivery failure.
 * @throws Exception
 */
public function send( $events, $type = self::TYPE_EVENT ) {
	// Label metrics by event type name. If the lookup fails,
	// fall back to labeling with the $type id parameter. Unknown or invalid ids
	// will be reported by the `events_are_not_enqueable` metric, which
	// fires when an event type does not belong to this EventBus instance allow list.
	// It should not be possible to supply a $type that does not belong
	// to EVENT_TYPE_NAME. Falling back to a numerical id is just a guard
	// to help us spot eventual bugs.
	$eventType = array_search( $type, self::EVENT_TYPE_NAMES );
	$eventType = ( $eventType === false ) ? $type : $eventType;
	// Label metrics with the EventBus declared default service host.
	// In the future, for streams that declare one, use the one provided by EventStreamConfig instead.
	$baseMetricLabels = [ "function_name" => "send", "event_type" => $eventType,
		"event_service_name" => $this->eventServiceName, "event_service_uri" => $this->url ];

	if ( !$this->shouldSendEvent( $type ) ) {
		// Debug metric. How often is the `$events` param not enqueable?
		$this->incrementMetricByValue( "events_are_not_enqueable", 1, $baseMetricLabels );
		return "Events of type '$type' are not enqueueable";
	}
	if ( !$events ) {
		// Logstash doesn't like the args, because they could be of various types
		$context = [ 'exception' => new RuntimeException() ];
		self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context );
		return "Provided event list is empty";
	}

	$numEvents = 0;
	// If we already have a JSON string of events, just use it as the body.
	if ( is_string( $events ) ) {
		// TODO: is $events ever passed as a string? We should refactor this block and simplify the union type.
		// Debug metric. How often is the `$events` param a string?
		$this->incrementMetricByValue( "events_is_string",
			1,
			$baseMetricLabels
		);
		if ( strlen( $events ) > $this->maxBatchByteSize ) {
			$decodeEvents = FormatJson::decode( $events, true );
			if ( !is_array( $decodeEvents ) ) {
				// An oversized string that is not a JSON array cannot be partitioned.
				// Bail out instead of calling count() / partitionEvents() on a non-array.
				self::logger()->error(
					'Unable to decode the JSON string of events. Aborting send.',
					[
						'raw_events' => self::prepareEventsForLogging( $events ),
						'exception' => new RuntimeException(),
					]
				);
				return "Unable to decode events";
			}
			$numEvents = count( $decodeEvents );
			$body = $this->partitionEvents( $decodeEvents, strlen( $events ) );
			// We have no guarantee that all events passed to send() will declare the
			// same meta.stream. Iterate over the array and increment the counter by stream.
			foreach ( $decodeEvents as $event ) {
				// TODO can we assume $decodeEvents will also have meta.stream set?
				// Fall back to STREAM_UNKNOWN_NAME when it is not.
				$streamName = $event['meta']['stream'] ?? self::STREAM_UNKNOWN_NAME;
				$this->incrementMetricByValue( "outgoing_events_total",
					1,
					$baseMetricLabels,
					[ "stream_name" => $streamName ]
				);
			}
		} else {
			// NOTE(review): on this path $numEvents stays 0 and no
			// outgoing_events_total metric is emitted, since the string is never decoded.
			$body = $events;
		}
	} else {
		$numEvents = count( $events );
		self::validateJSONSerializable( $events );
		// Else serialize the array of events to a JSON string.
		$serializedEvents = self::serializeEvents( $events );
		// If not $serializedEvents, then something went wrong.
		// serializeEvents has already logged, so we can just return.
		if ( !$serializedEvents ) {
			return "Unable to serialize events";
		}
		foreach ( $events as $event ) {
			$streamName = $event['meta']['stream'] ?? self::STREAM_UNKNOWN_NAME;
			$this->incrementMetricByValue( "outgoing_events_total",
				1,
				$baseMetricLabels,
				[ "stream_name" => $streamName ]
			);
		}

		if ( strlen( $serializedEvents ) > $this->maxBatchByteSize ) {
			$body = $this->partitionEvents( $events, strlen( $serializedEvents ) );
		} else {
			$body = $serializedEvents;
		}
	}

	$originalRequest = RequestContext::getMain()->getRequest();

	// One POST request per batch; $body is either one serialized batch or a list of them.
	$reqs = array_map( function ( $body ) use ( $originalRequest ) {
		$req = [
			'url' => $this->url,
			'method' => 'POST',
			'body' => $body,
			'headers' => [ 'content-type' => 'application/json' ]
		];
		if ( $this->forwardXClientIP ) {
			$req['headers']['x-client-ip'] = $originalRequest->getIP();
		}
		return $req;
	}, is_array( $body ) ? $body : [ $body ] );

	$responses = $this->http->runMulti(
		$reqs,
		[
			'reqTimeout' => $this->timeout
		]
	);

	// 201: all events accepted.
	// 202: all events accepted but not necessarily persisted. HTTP response is returned 'hastily'.
	// 207: some but not all events accepted: either due to validation failure or error.
	// 400: no events accepted: all failed schema validation.
	// 500: no events accepted: at least one caused an error, but some might have been invalid.
	$results = [];
	foreach ( $responses as $response ) {
		$res = $response['response'];
		$code = $res['code'];

		$this->incrementMetricByValue( "event_service_response_total",
			1,
			$baseMetricLabels,
			[ "status_code" => $code ]
		);

		// NOTE(review): a response code below 200 (presumably 0 when the HTTP client
		// reports a transport error or timeout) takes the "accepted" branch below.
		// Confirm against MultiHttpClient whether that is intended.
		if ( $code == 207 || $code >= 300 ) {
			$message = empty( $res['error'] ) ?
				(string)$code . ': ' . (string)$res['reason'] : $res['error'];
			// Limit the maximum size of the logged context to 8 kilobytes as that's where logstash
			// truncates the JSON anyway
			$context = [
				'raw_events' => self::prepareEventsForLogging( $body ),
				'service_response' => $res,
				'exception' => new RuntimeException(),
			];
			self::logger()->error( "Unable to deliver all events: {$message}", $context );

			if ( isset( $res['body'] ) ) {
				// We expect the event service to return an array of objects
				// in the response body.
				// FormatJson::decode will return `null` if the message failed to parse.
				// If anything other than an array is parsed we treat it as unexpected
				// behaviour, and log the response at error severity.
				// See https://phabricator.wikimedia.org/T370428
				$failedEvents = FormatJson::decode( $res['body'], true );
				if ( is_array( $failedEvents ) ) {
					foreach ( $failedEvents as $failureKind => $failureList ) {
						// $failureList should be a list. This is just a guard against what the intake
						// service returns (or the behavior of different json parsing methods - possibly):
						// anything else is counted as zero failures and not iterated.
						if ( !is_array( $failureList ) ) {
							$failureList = [];
						}
						$numFailedEvents = count( $failureList );
						$this->incrementMetricByValue( "outgoing_events_failed_total",
							$numFailedEvents,
							$baseMetricLabels,
							[ "failure_kind" => $failureKind ]
						);

						foreach ( $failureList as $failurePayload ) {
							// TODO: can we assume that `error` messages can always be parsed?
							// Exception handling is expensive. This will need profiling.
							// At this point of execution, events have already been submitted
							// to the event service, and the client should not experience latency.
							$rawEvent = is_array( $failurePayload ) ? ( $failurePayload['event'] ?? null ) : null;
							if ( is_string( $rawEvent ) ) {
								$event = FormatJson::decode( $rawEvent, true );
							} elseif ( is_array( $rawEvent ) ) {
								// Already structured; nothing to parse.
								$event = $rawEvent;
							} else {
								$event = null;
							}
							if ( $event === null ) {
								self::logger()->error(
									'Unable to parse error messages from the event service response body.',
									$context
								);
							}
							$streamName = $event['meta']['stream'] ?? self::STREAM_UNKNOWN_NAME;
							$this->incrementMetricByValue( "outgoing_events_failed_by_stream_total",
								1,
								$baseMetricLabels,
								[ "stream_name" => $streamName, "failure_kind" => $failureKind ]
							);
						}
					}
				} else {
					self::logger()->error( "Invalid event service response body", $context );
				}
			}
			$results[] = "Unable to deliver all events: $message";
		} else {
			// 201, 202 all events have been accepted (but not necessarily persisted).
			// NOTE(review): $numEvents is the total passed to send(), so when the payload
			// was partitioned each successful request adds the full total rather than
			// the size of its own batch - verify whether that is intended.
			$this->incrementMetricByValue( "outgoing_events_accepted_total",
				$numEvents,
				$baseMetricLabels,
				[ "status_code" => $code ]
			);
		}
	}

	if ( $results !== [] ) {
		return $results;
	}

	return true;
}
424 | |
425 | // == static helper functions below == |
426 | |
/**
 * Serializes $events array to a JSON string. If FormatJson::encode()
 * returns false, this will log a detailed error message, including the
 * events that could not be encoded on their own, and return null.
 * An Exception thrown while encoding is logged and also results in null.
 *
 * @param array $events
 * @return string|null JSON or null on failure
 */
public static function serializeEvents( $events ) {
	try {
		$serializedEvents = FormatJson::encode( $events, false, FormatJson::ALL_OK );
		if ( $serializedEvents ) {
			return $serializedEvents;
		}

		// Capture the error right away: json_last_error_msg() reports the state of the
		// most recent JSON call, and the per-event encodes below would overwrite it
		// (typically with "No error" when the last event is fine).
		$encodeError = json_last_error_msg();

		// Something failed. Let's figure out exactly which one.
		$bad = [];
		foreach ( $events as $event ) {
			if ( !FormatJson::encode( $event, false, FormatJson::ALL_OK ) ) {
				$bad[] = $event;
			}
		}
		$context = [
			'exception' => new RuntimeException(),
			'json_last_error' => $encodeError,
			// Use PHP serialization since that will *always* work.
			'events' => serialize( $bad ),
		];
		self::logger()->error(
			'FormatJson::encode($events) failed: ' . $context['json_last_error'] .
			'. Aborting send.', $context
		);
		return null;
	} catch ( Exception $exception ) {
		$context = [
			'exception' => $exception,
			'json_last_error' => json_last_error_msg()
		];
		self::logger()->error(
			'FormatJson::encode($events) thrown exception: ' . $context['json_last_error'] .
			'. Aborting send.', $context
		);
		return null;
	}
}
471 | |
/**
 * Prepares events for logging - serializes if needed, limits the size
 * of the serialized event to 8kb.
 *
 * @param string|array $events a serialized JSON string, or an array to serialize
 * @return string|null at most 8192 bytes of the serialized events, or null when
 *   serialization failed
 */
private static function prepareEventsForLogging( $events ) {
	if ( is_array( $events ) ) {
		$events = self::serializeEvents( $events );
	}

	if ( $events === null ) {
		return null;
	}

	if ( strlen( $events ) <= 8192 ) {
		return $events;
	}

	// mb_strcut() works on byte offsets like substr(), but moves the cut back to a
	// character boundary, so the logged string never ends in a partial UTF-8 sequence.
	return mb_strcut( $events, 0, 8192, 'UTF-8' );
}
490 | |
/**
 * Returns $value unchanged unless it is a string that is not valid UTF-8.
 * Such a string is assumed to be binary: it is base64 encoded and prefixed
 * with a content type.
 *
 * @param mixed $value
 * @return mixed
 */
public static function replaceBinaryValues( $value ) {
	$isBinaryString = is_string( $value ) && !mb_check_encoding( $value, 'UTF-8' );

	return $isBinaryString
		? 'data:application/octet-stream;base64,' . base64_encode( $value )
		: $value;
}
503 | |
/**
 * Applies replaceBinaryValues to every leaf of $array, in place.
 * $array is passed by reference and will be modified.
 *
 * @param array &$array
 * @return bool return value of array_walk_recursive
 */
public static function replaceBinaryValuesRecursive( &$array ) {
	$replaceLeaf = static function ( &$leaf ) {
		$leaf = self::replaceBinaryValues( $leaf );
	};

	return array_walk_recursive( $array, $replaceLeaf );
}
515 | |
/**
 * Checks a part of the event for JSON-serializability.
 *
 * Walks $eventPart recursively and logs an error for the first value that is
 * neither scalar, null nor an array, then stops checking the event.
 *
 * @param array $originalEvent an original event that is being checked.
 * @param array $eventPart the sub-object nested in the original event to be checked.
 * @return bool true if a non-scalar value was found (and logged), false otherwise.
 */
private static function checkEventPart( $originalEvent, $eventPart ) {
	foreach ( $eventPart as $p => $v ) {
		if ( is_array( $v ) ) {
			// Propagate a hit from the nested part, so that the walk stops at the
			// first offending value instead of carrying on at this level.
			if ( self::checkEventPart( $originalEvent, $v ) ) {
				return true;
			}
		} elseif ( !is_scalar( $v ) && $v !== null ) {
			// Only log the first appearance of non-scalar property per event as jobs
			// might contain hundreds of properties and we do not want to log everything.
			self::logger()->error( 'Non-scalar value found in the event', [
				'raw_events' => self::prepareEventsForLogging( [ $originalEvent ] ),
				'prop_name' => $p,
				// get_class() only accepts objects; other non-scalars (e.g. resources)
				// are reported by their type name.
				'prop_val_type' => is_object( $v ) ? get_class( $v ) : gettype( $v )
			] );
			// Follow to the next event in the array
			return true;
		}
	}
	return false;
}
539 | |
/**
 * Runs checkEventPart over every event in $events, which logs the event
 * if it contains a value that is not scalar, null or an array.
 * Nothing is returned and nothing is thrown by this method itself.
 *
 * @param array $events
 */
private static function validateJSONSerializable( $events ) {
	foreach ( $events as $candidate ) {
		// The whole event is both the logging context and the part to walk.
		self::checkEventPart( $candidate, $candidate );
	}
}
551 | |
/**
 * Tests $eventType against the bitmask of event types allowed for this instance.
 *
 * @param int $eventType one of the TYPE_* constants
 * @return int non-zero if the event type is allowed, 0 otherwise
 */
private function shouldSendEvent( $eventType ) {
	$allowedMask = $this->allowedEventTypes;
	return $allowedMask & $eventType;
}
555 | |
/**
 * Returns the logger shared by all EventBus instances, creating it on
 * first use from the 'EventBus' channel.
 * Use like: self::logger()->info( $message )
 * Callers therefore never have to check whether the logger exists
 * before logging a message.
 * @return LoggerInterface
 */
public static function logger() {
	self::$logger ??= LoggerFactory::getInstance( 'EventBus' );
	return self::$logger;
}
569 | |
/**
 * Accessor for the EventFactory this EventBus instance was constructed with.
 * @return EventFactory|null
 */
public function getFactory() {
	return $this->eventFactory;
}
577 | |
/**
 * Returns the EventBus instance for $eventServiceName, as provided by the
 * 'EventBus.EventBusFactory' service.
 *
 * @param string|null $eventServiceName
 *   The name of a key in the EventServices config looked up via
 *   MediaWikiServices::getInstance()->getMainConfig()->get('EventServices').
 *   The EventService config is keyed by service name, and should at least contain
 *   a 'url' entry pointing at the event service endpoint events should be
 *   POSTed to. They can also optionally contain a 'timeout' entry specifying
 *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry that can be
 *   set to true if the X-Client-IP header from the originating request should be
 *   forwarded to the event service. Instances are singletons identified by
 *   $eventServiceName.
 *
 * NOTE: Previously, this function took a $config object instead of an
 * event service name. That is not a backwards-compatible change, but because
 * there are no other users of this extension, it was considered safe.
 *
 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
 * @return EventBus
 */
public static function getInstance( $eventServiceName ) {
	$busFactory = MediaWikiServices::getInstance()->get( 'EventBus.EventBusFactory' );

	return $busFactory->getInstance( $eventServiceName );
}
602 | |
/**
 * Returns the EventBus instance to use for $stream, as resolved by the
 * 'EventBus.EventBusFactory' service from EventStreamConfig.StreamConfigs.
 * If the stream is disabled, a non-producing EventBus instance will be used.
 * If none is found, falls back to using wgEventServiceDefault.
 *
 * @param string $stream the stream to send an event to
 * @return EventBus
 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
 */
public static function getInstanceForStream( $stream ) {
	$busFactory = MediaWikiServices::getInstance()->get( 'EventBus.EventBusFactory' );

	return $busFactory->getInstanceForStream( $stream );
}
618 | } |