Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
95.95% |
71 / 74 |
|
60.00% |
3 / 5 |
CRAP | |
0.00% |
0 / 1 |
| EventBusFactory | |
95.95% |
71 / 74 |
|
60.00% |
3 / 5 |
13 | |
0.00% |
0 / 1 |
| __construct | |
100.00% |
21 / 21 |
|
100.00% |
1 / 1 |
1 | |||
| getInstance | |
95.83% |
23 / 24 |
|
0.00% |
0 / 1 |
4 | |||
| getInstanceForStream | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
| isStreamEnabled | |
77.78% |
7 / 9 |
|
0.00% |
0 / 1 |
3.10 | |||
| getEventServiceNameForStream | |
100.00% |
19 / 19 |
|
100.00% |
1 / 1 |
4 | |||
| 1 | <?php |
| 2 | |
| 3 | /** |
| 4 | * This program is free software; you can redistribute it and/or modify |
| 5 | * it under the terms of the GNU General Public License as published by |
| 6 | * the Free Software Foundation; either version 2 of the License, or |
| 7 | * (at your option) any later version. |
| 8 | * |
| 9 | * This program is distributed in the hope that it will be useful, |
| 10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | * GNU General Public License for more details. |
| 13 | * |
| 14 | * You should have received a copy of the GNU General Public License along |
| 15 | * with this program; if not, write to the Free Software Foundation, Inc., |
| 16 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
| 17 | * http://www.gnu.org/copyleft/gpl.html |
| 18 | * |
| 19 | * @file |
| 20 | * @author Petr Pchelko |
| 21 | */ |
| 22 | |
| 23 | namespace MediaWiki\Extension\EventBus; |
| 24 | |
| 25 | use InvalidArgumentException; |
| 26 | use MediaWiki\Config\ServiceOptions; |
| 27 | use MediaWiki\Extension\EventStreamConfig\StreamConfigs; |
| 28 | use Psr\Log\LoggerInterface; |
| 29 | use Wikimedia\Http\MultiHttpClient; |
| 30 | use Wikimedia\Stats\StatsFactory; |
| 31 | |
| 32 | /** |
| 33 | * Creates appropriate EventBus instance based on stream config. |
| 34 | * |
| 35 | * @since 1.35 |
| 36 | */ |
class EventBusFactory {

	/**
	 * Names of the options that must be present in the ServiceOptions
	 * handed to the constructor.
	 */
	public const CONSTRUCTOR_OPTIONS = [
		'EventServices',
		'EventServiceDefault',
		'EnableEventBus',
		'EventBusMaxBatchByteSize'
	];

	/**
	 * Key in wgEventStreams['stream_name']['producers'] that contains settings
	 * for this MediaWiki EventBus producer.
	 */
	public const EVENT_STREAM_CONFIG_PRODUCER_NAME = 'mediawiki_eventbus';

	/**
	 * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME]
	 * that specifies if the stream is enabled. A stream is 'disabled' only if
	 * this setting is explicitly false, or if the stream name
	 * does not have an entry in wgEventStreams
	 * (and wgEventStreams is an array with other streams configured).
	 */
	public const EVENT_STREAM_CONFIG_ENABLED_SETTING = 'enabled';

	/**
	 * Key in wgEventStreams['stream_name']['producers'][EVENT_STREAM_CONFIG_PRODUCER_NAME] that specifies
	 * the event service name that should be used for a specific stream.
	 * This should be a key into $eventServiceConfig, which usually is configured
	 * using the EventBus MW config wgEventServices.
	 * If not found via StreamConfigs, EventServiceDefault will be used.
	 */
	public const EVENT_STREAM_CONFIG_SERVICE_SETTING = 'event_service_name';

	/**
	 * Internal name of an EventBus instance that never sends events.
	 * This is used for streams that are disabled or undeclared.
	 * This will also be used as the dummy 'url' of that instance.
	 * (Public only for testing purposes.)
	 */
	public const EVENT_SERVICE_DISABLED_NAME = '_disabled_eventbus_';

	/**
	 * @var array Value of the 'EventServices' option: event service settings
	 *  keyed by event service name.
	 */
	private array $eventServiceConfig;

	/**
	 * @var string Value of the 'EventServiceDefault' option: the event service name
	 *  used when a stream does not select one itself.
	 */
	private string $eventServiceDefault;

	/**
	 * @var StreamConfigs|null Stream configuration lookup, or null when no
	 *  stream configs were provided.
	 */
	private ?StreamConfigs $streamConfigs;

	/**
	 * @var string Value of the 'EnableEventBus' option. It is handed to every
	 *  configured EventBus instance in the same argument position where the
	 *  disabled instance receives EventBus::TYPE_NONE.
	 */
	private string $enableEventBus;

	/**
	 * @var int Value of the 'EventBusMaxBatchByteSize' option, handed to every EventBus instance.
	 */
	private int $maxBatchByteSize;

	/**
	 * @var EventFactory
	 */
	private EventFactory $eventFactory;

	/**
	 * @var MultiHttpClient
	 */
	private MultiHttpClient $http;

	/**
	 * @var LoggerInterface
	 */
	private LoggerInterface $logger;

	/**
	 * @var StatsFactory|null Stats factory instance handed to every EventBus instance.
	 */
	private ?StatsFactory $statsFactory;

	/**
	 * @var EventBus[] Already created EventBus instances, keyed by event service name.
	 */
	private array $eventBusInstances = [];

	/**
	 * @param ServiceOptions $options must contain all of self::CONSTRUCTOR_OPTIONS
	 * @param StreamConfigs|null $streamConfigs
	 * @param EventFactory $eventFactory
	 * @param MultiHttpClient $http
	 * @param LoggerInterface $logger
	 * @param StatsFactory|null $statsFactory
	 */
	public function __construct(
		ServiceOptions $options,
		?StreamConfigs $streamConfigs,
		EventFactory $eventFactory,
		MultiHttpClient $http,
		LoggerInterface $logger,
		?StatsFactory $statsFactory = null
	) {
		$options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );

		$this->eventServiceConfig = $options->get( 'EventServices' );
		$this->eventServiceDefault = $options->get( 'EventServiceDefault' );
		$this->enableEventBus = $options->get( 'EnableEventBus' );
		$this->maxBatchByteSize = $options->get( 'EventBusMaxBatchByteSize' );

		$this->streamConfigs = $streamConfigs;
		$this->eventFactory = $eventFactory;
		$this->http = $http;
		$this->logger = $logger;
		$this->statsFactory = $statsFactory;

		// Save a 'disabled' non producing EventBus instance that sets
		// the allowed event type to TYPE_NONE. No
		// events sent through this instance will actually be sent to an event service.
		// This is done to allow us to easily 'disable' streams.
		$this->eventBusInstances[self::EVENT_SERVICE_DISABLED_NAME] = new EventBus(
			$this->http,
			EventBus::TYPE_NONE,
			$this->eventFactory,
			self::EVENT_SERVICE_DISABLED_NAME,
			$this->maxBatchByteSize,
			0,
			false,
			self::EVENT_SERVICE_DISABLED_NAME,
			$this->statsFactory
		);
	}

	/**
	 * Gets the EventBus instance for an event service name, creating and
	 * caching it on first use.
	 *
	 * @param string $eventServiceName
	 *   The name of a key in the EventServices config option supplied to the constructor.
	 *   The EventServices config is keyed by service name, and each entry should at least
	 *   contain a 'url' entry pointing at the event service endpoint events should be
	 *   POSTed to. Entries can also optionally contain a 'timeout' entry specifying
	 *   the HTTP POST request timeout, and a 'x_client_ip_forwarding_enabled' entry
	 *   that can be set to true if the X-Client-IP header from the originating request
	 *   should be forwarded to the event service. Instances are singletons identified
	 *   by $eventServiceName.
	 *
	 * @note Previously, this function took a $config object instead of an
	 * event service name. This is a backwards incompatible change, but because
	 * there are no other users of this extension, we can do this safely.
	 *
	 * @throws InvalidArgumentException if EventServices or $eventServiceName is misconfigured.
	 * @return EventBus
	 */
	public function getInstance( string $eventServiceName ): EventBus {
		// If eventServiceName has already been instantiated, return it.
		// (The non-producing instance created in the constructor is found here too.)
		if ( array_key_exists( $eventServiceName, $this->eventBusInstances ) ) {
			return $this->eventBusInstances[$eventServiceName];
		}

		$eventServiceSettings = $this->eventServiceConfig[$eventServiceName] ?? null;

		// The is_array() guard makes a malformed (non-array) service entry fail with the
		// documented InvalidArgumentException instead of a TypeError from array_key_exists().
		if ( !is_array( $eventServiceSettings ) || !array_key_exists( 'url', $eventServiceSettings ) ) {
			$error = "Could not get EventBus instance for event service '$eventServiceName'. " .
				'This event service name must exist in EventServices config with a url setting.';
			$this->logger->error( $error );
			throw new InvalidArgumentException( $error );
		}

		// Create the eventServiceName instance from config and save it in eventBusInstances.
		$url = $eventServiceSettings['url'];
		$timeout = $eventServiceSettings['timeout'] ?? null;
		$forwardXClientIP = $eventServiceSettings['x_client_ip_forwarding_enabled'] ?? false;

		$this->eventBusInstances[$eventServiceName] = new EventBus(
			$this->http,
			$this->enableEventBus,
			$this->eventFactory,
			$url,
			$this->maxBatchByteSize,
			$timeout,
			$forwardXClientIP,
			$eventServiceName,
			$this->statsFactory
		);

		return $this->eventBusInstances[$eventServiceName];
	}

	/**
	 * Gets an EventBus instance for a $stream.
	 *
	 * If EventStreamConfig is not configured, or if the stream is configured but
	 * does not set ['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING],
	 * EventServiceDefault will be used.
	 *
	 * If EventStreamConfig is configured, but the stream is not or the stream has
	 * ['producers']['mediawiki_eventbus']['enabled'] = false, this will return
	 * a non-producing EventBus instance.
	 *
	 * @param string $streamName the stream to send an event to
	 * @return EventBus
	 * @throws InvalidArgumentException if the resolved event service name is misconfigured.
	 */
	public function getInstanceForStream( string $streamName ): EventBus {
		// getInstance() is an instance method, so call it on $this rather than via self::.
		return $this->getInstance( $this->getEventServiceNameForStream( $streamName ) );
	}

	/**
	 * Uses StreamConfigs to determine if a stream is enabled.
	 * By default, a stream is enabled. It is disabled only if:
	 *
	 * - wgEventStreams[$streamName]['producers']['mediawiki_eventbus']['enabled'] is false
	 *   OR
	 * - wgEventStreams != null, but, wgEventStreams[$streamName] is not declared
	 *
	 * @param string $streamName
	 * @return bool
	 */
	private function isStreamEnabled( string $streamName ): bool {
		// No streamConfigs means any stream is enabled
		if ( $this->streamConfigs === null ) {
			return true;
		}

		$streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

		// If $streamName is not declared in EventStreamConfig, then it is not enabled.
		if ( !array_key_exists( $streamName, $streamConfigEntries ) ) {
			return false;
		}

		$streamSettings = $streamConfigEntries[$streamName];

		// Only a missing (or null) 'enabled' setting defaults to true; any other
		// configured value is returned through the bool return type.
		return $streamSettings['producers'][
			self::EVENT_STREAM_CONFIG_PRODUCER_NAME
		][self::EVENT_STREAM_CONFIG_ENABLED_SETTING] ?? true;
	}

	/**
	 * Looks up the wgEventStreams[$streamName]['producers']['mediawiki_eventbus'][EVENT_STREAM_CONFIG_SERVICE_SETTING]
	 * setting for this stream.
	 *
	 * - If no StreamConfigs were provided, EventServiceDefault is returned.
	 * - If StreamConfigs were provided but the stream is undeclared or is explicitly
	 *   marked as disabled, EVENT_SERVICE_DISABLED_NAME (the name of the non-producing
	 *   event service) is returned.
	 * - If the stream is enabled but does not have EVENT_STREAM_CONFIG_SERVICE_SETTING set,
	 *   the legacy top level 'destination_event_service' stream setting is used if present,
	 *   otherwise EventServiceDefault.
	 *
	 * @param string $streamName
	 * @return string event service name, usable with getInstance()
	 */
	public function getEventServiceNameForStream( string $streamName ): string {
		// Use eventServiceDefault if no streamConfigs were provided.
		if ( $this->streamConfigs === null ) {
			return $this->eventServiceDefault;
		}

		if ( !$this->isStreamEnabled( $streamName ) ) {
			// Don't send events if $streamName is undeclared or explicitly disabled.

			$this->logger->debug(
				"Using non-producing EventBus instance for stream $streamName. " .
				'This stream is either undeclared, or is explicitly disabled.'
			);

			return self::EVENT_SERVICE_DISABLED_NAME;
		}

		// Else attempt to lookup EVENT_STREAM_CONFIG_SERVICE_SETTING for this stream.
		$streamConfigEntries = $this->streamConfigs->get( [ $streamName ] );

		$streamSettings = $streamConfigEntries[$streamName] ?? [];

		$eventServiceName = $streamSettings['producers'][
			self::EVENT_STREAM_CONFIG_PRODUCER_NAME
		][self::EVENT_STREAM_CONFIG_SERVICE_SETTING] ?? null;

		// For backwards compatibility: the event service name setting used to be a top level
		// stream setting 'destination_event_service'. If EVENT_STREAM_CONFIG_SERVICE_SETTING
		// is not set, fall back to that legacy setting, and finally to EventServiceDefault.
		// This can be removed once all streams have been migrated to using the
		// producers.mediawiki_eventbus specific setting.
		// https://phabricator.wikimedia.org/T321557
		// (Parentheses only spell out the existing precedence: ?? binds tighter than ?:.)
		$eventServiceName = $eventServiceName ?: (
			$streamSettings['destination_event_service'] ?? $this->eventServiceDefault
		);

		$this->logger->debug(
			"Using event intake service $eventServiceName for stream $streamName."
		);

		return $eventServiceName;
	}
}