Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
100.00% |
19 / 19 |
|
100.00% |
4 / 4 |
CRAP | |
100.00% |
1 / 1 |
| EventBusSendUpdate | |
100.00% |
19 / 19 |
|
100.00% |
4 / 4 |
7 | |
100.00% |
1 / 1 |
| __construct | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 | |||
| newForStream | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
| doUpdate | |
100.00% |
4 / 4 |
|
100.00% |
1 / 1 |
3 | |||
| merge | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
2 | |||
| 1 | <?php |
| 2 | namespace MediaWiki\Extension\EventBus; |
| 3 | |
| 4 | use MediaWiki\Deferred\DeferrableUpdate; |
| 5 | use MediaWiki\Deferred\MergeableUpdate; |
| 6 | use Wikimedia\Assert\Assert; |
| 7 | |
| 8 | /** |
| 9 | * A deferred update that provides automatic batching for EventBus events. |
| 10 | * |
| 11 | * During a single request, events may be sent at different points in the |
| 12 | * request lifecycle to various event streams that may however be backed by the same |
| 13 | * underlying event service, resulting in unnecessary HTTP requests. |
| 14 | * This deferred update provides automatic batching for these events |
| 15 | * so that at most a single HTTP request is made for each underlying event service. |
| 16 | */ |
| 17 | class EventBusSendUpdate implements DeferrableUpdate, MergeableUpdate { |
| 18 | |
| 19 | private EventBusFactory $eventBusFactory; |
| 20 | |
| 21 | /** |
| 22 | * Associative array of event lists keyed by event service name. |
| 23 | * @var array[] |
| 24 | */ |
| 25 | private array $eventsByService = []; |
| 26 | |
| 27 | /** |
| 28 | * Create a new EventBusSendUpdate instance for a specific event service. |
| 29 | * |
| 30 | * @param EventBusFactory $eventBusFactory |
| 31 | * @param string $eventServiceName The event service to send the events to |
| 32 | * @param array $events List of events to send |
| 33 | */ |
| 34 | public function __construct( |
| 35 | EventBusFactory $eventBusFactory, |
| 36 | string $eventServiceName, |
| 37 | array $events |
| 38 | ) { |
| 39 | Assert::parameter( |
| 40 | array_is_list( $events ), |
| 41 | '$events', |
| 42 | 'must be a flat list of events' |
| 43 | ); |
| 44 | |
| 45 | $this->eventsByService[$eventServiceName] = $events; |
| 46 | $this->eventBusFactory = $eventBusFactory; |
| 47 | } |
| 48 | |
| 49 | /** |
| 50 | * Create a new EventBusSendUpdate instance for a specific stream. |
| 51 | * The stream name will be used to determine the event service to send the events to. |
| 52 | * |
| 53 | * @param EventBusFactory $eventBusFactory |
| 54 | * @param string $streamName The event stream to send the events to |
| 55 | * @param array $events List of events to send |
| 56 | * @return self |
| 57 | */ |
| 58 | public static function newForStream( |
| 59 | EventBusFactory $eventBusFactory, |
| 60 | string $streamName, |
| 61 | array $events |
| 62 | ): self { |
| 63 | $eventServiceName = $eventBusFactory->getEventServiceNameForStream( $streamName ); |
| 64 | return new self( $eventBusFactory, $eventServiceName, $events ); |
| 65 | } |
| 66 | |
| 67 | public function doUpdate(): void { |
| 68 | foreach ( $this->eventsByService as $eventServiceName => $events ) { |
| 69 | if ( count( $events ) > 0 ) { |
| 70 | $this->eventBusFactory->getInstance( $eventServiceName ) |
| 71 | ->send( $events ); |
| 72 | } |
| 73 | } |
| 74 | } |
| 75 | |
| 76 | public function merge( MergeableUpdate $update ): void { |
| 77 | /** @var EventBusSendUpdate $update */ |
| 78 | Assert::parameterType( __CLASS__, $update, '$update' ); |
| 79 | '@phan-var EventBusSendUpdate $update'; |
| 80 | |
| 81 | foreach ( $update->eventsByService as $eventServiceName => $events ) { |
| 82 | $this->eventsByService[$eventServiceName] = array_merge( |
| 83 | $this->eventsByService[$eventServiceName] ?? [], |
| 84 | $events |
| 85 | ); |
| 86 | } |
| 87 | } |
| 88 | } |