Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
89.36% |
42 / 47 |
|
75.00% |
3 / 4 |
CRAP | |
0.00% |
0 / 1 |
StreamConfigs | |
89.36% |
42 / 47 |
|
75.00% |
3 / 4 |
21.53 | |
0.00% |
0 / 1 |
__construct | |
64.29% |
9 / 14 |
|
0.00% |
0 / 1 |
6.14 | |||
get | |
100.00% |
11 / 11 |
|
100.00% |
1 / 1 |
8 | |||
selectByStreams | |
100.00% |
16 / 16 |
|
100.00% |
1 / 1 |
4 | |||
findByStream | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
4 |
1 | <?php |
2 | |
3 | namespace MediaWiki\Extension\EventStreamConfig; |
4 | |
5 | use MediaWiki\Config\ServiceOptions; |
6 | use Psr\Log\LoggerInterface; |
7 | use Psr\Log\NullLogger; |
8 | use Wikimedia\Assert\Assert; |
9 | |
/**
 * Helpers for exporting event stream configs. The configs are expected to live in
 * global MW config so that event stream settings (e.g. sample rates or
 * destination_event_service) can be adjusted dynamically.
 *
 * Terminology:
 * - StreamConfigs - the list of individual stream configs
 * - A StreamConfig - an object holding the settings of one event stream
 * - A stream setting - a single setting of a stream, e.g. 'schema_title'.
 *
 * See also:
 * - https://phabricator.wikimedia.org/T205319
 * - https://phabricator.wikimedia.org/T233634
 *
 * 'EventStreams' in MW Config is expected to be an associative array of stream
 * configs keyed by stream name or regex pattern. A single entry looks roughly like:
 *
 * "my.event.stream-name" => [
 *     "schema_title" => "my/event/schema",
 *     "sample" => [
 *         "rate" => 0.8,
 *         "unit" => "session",
 *     ],
 *     "destination_event_service" => "eventgate-analytics-external",
 *     ...
 * ],
 *
 * When an entry is keyed by a regex pattern, requested target streams are matched
 * against that pattern.
 */
class StreamConfigs {
	/**
	 * Names of the main config keys that hold the stream configuration.
	 * @var array
	 *
	 * @deprecated since 1.42
	 */
	public const CONSTRUCTOR_OPTIONS = [
		'EventStreams',
		'EventStreamsDefaultSettings'
	];

	/**
	 * StreamConfig objects, keyed by stream name/pattern
	 * @var array
	 */
	private $streamConfigs = [];

	/**
	 * @var \Psr\Log\LoggerInterface
	 */
	private $logger;

	/**
	 * Builds a StreamConfigs instance from the values of wgEventStreams and
	 * wgEventStreamsDefaultSettings.
	 *
	 * Deprecated calling convention (since 1.42): a ServiceOptions object as first
	 * argument and the logger as second argument. In that case both settings arrays
	 * are read from the ServiceOptions object and the third argument is ignored.
	 *
	 * @param array<string,array>|ServiceOptions $streamConfigs
	 * @param array|LoggerInterface $defaultSettings
	 * @param ?LoggerInterface $logger Falls back to a NullLogger when null.
	 */
	public function __construct( $streamConfigs, $defaultSettings, ?LoggerInterface $logger = null ) {
		if ( $streamConfigs instanceof ServiceOptions ) {
			wfDeprecatedMsg( __METHOD__ . ': calling with ServiceOptions is deprecated', '1.42' );

			$options = $streamConfigs;
			$options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );

			// In the legacy signature the second argument carries the logger.
			'@phan-var LoggerInterface $defaultSettings';
			$logger = $defaultSettings;

			$defaultSettings = $options->get( 'EventStreamsDefaultSettings' );
			$streamConfigs = $options->get( 'EventStreams' );
		}

		Assert::parameterType( 'array', $streamConfigs, 'EventStreams' );
		Assert::parameterType( 'array', $defaultSettings, 'EventStreamsDefaultSettings' );

		$this->logger = $logger ?? new NullLogger();

		foreach ( $streamConfigs as $key => $config ) {
			// Entries in a plain list (integer keys) may name their stream through
			// the stream setting; every other entry is named by its array key.
			$stream = $key;
			if ( is_int( $key ) && isset( $config[ StreamConfig::STREAM_SETTING ] ) ) {
				$stream = $config[ StreamConfig::STREAM_SETTING ];
			}

			$this->streamConfigs[ $stream ] = new StreamConfig( $stream, $config, $defaultSettings );
		}
	}

	/**
	 * Resolves the target stream names and returns the matching stream configs as
	 * arrays, keyed by stream name.
	 *
	 * Deprecated calling conventions (since 1.41): passing a boolean as second
	 * argument, or passing a third argument. With three arguments the third one is
	 * cast to an array and used as the settings constraints; with a boolean second
	 * argument no constraints are applied.
	 *
	 * @param array|null $targetStreams
	 *   List of stream names. If not provided, all stream configs are returned.
	 * @param array|null $settingsConstraints
	 *   If given (and non-empty), only stream configs that match these settings
	 *   are returned.
	 *
	 * @return array
	 */
	public function get(
		?array $targetStreams = null,
		$settingsConstraints = null
	): array {
		$args = func_get_args();
		$argCount = count( $args );

		$legacyBooleanArg = $argCount === 2 && is_bool( $settingsConstraints );
		if ( $legacyBooleanArg || $argCount === 3 ) {
			wfDeprecatedMsg( __METHOD__ . ': $includeAllSettings parameter is deprecated', '1.41' );

			$settingsConstraints = $argCount === 3 ? (array)$args[2] : null;
		}

		$result = [];
		foreach ( $this->selectByStreams( $targetStreams ) as $stream => $streamConfigEntry ) {
			// Constraints are only consulted when some were actually given.
			if ( $settingsConstraints && !$streamConfigEntry->matchesSettings( $settingsConstraints ) ) {
				continue;
			}
			$result[$stream] = $streamConfigEntry->toArray( $stream );
		}
		return $result;
	}

	/**
	 * Selects the StreamConfig objects for the streams named in $targetStreams.
	 * Target streams without a matching config are logged as a warning and skipped.
	 *
	 * @param array|null $targetStreams
	 *   If not provided, all stream configs are returned as stored (keyed by
	 *   stream name/pattern). Otherwise the result is keyed by the requested
	 *   stream names.
	 *
	 * @return StreamConfig[]
	 */
	private function selectByStreams( ?array $targetStreams = null ): array {
		// No explicit targets means every configured stream is wanted.
		if ( $targetStreams === null ) {
			$this->logger->debug( 'Selecting all stream configs.' );
			return $this->streamConfigs;
		}

		$this->logger->debug(
			'Selecting stream configs for target streams: {streams}',
			[ 'streams' => implode( " ", $targetStreams ) ]
		);

		$selected = [];
		foreach ( $targetStreams as $stream ) {
			// Configured stream names may be exact names or regexes;
			// findByStream() checks $stream against both.
			$match = $this->findByStream( $stream );

			if ( $match !== null ) {
				$selected[$stream] = $match;
				continue;
			}

			$this->logger->warning(
				"Stream '$stream' does not match any `stream` in stream config"
			);
		}
		return $selected;
	}

	/**
	 * Looks up the StreamConfig for the given $stream name: an entry stored under
	 * exactly that name wins, otherwise the first stored config whose matches()
	 * accepts the name is used. Returns null when nothing matches.
	 *
	 * @param string $stream
	 * @return StreamConfig|null
	 */
	private function findByStream( $stream ) {
		// An entry defined for the exact stream name takes precedence.
		if ( isset( $this->streamConfigs[$stream] ) ) {
			return $this->streamConfigs[$stream];
		}

		// Otherwise walk the configs in definition order and take the first
		// one whose pattern matches the requested stream name.
		foreach ( $this->streamConfigs as $candidate ) {
			if ( $candidate->matches( $stream ) ) {
				return $candidate;
			}
		}

		return null;
	}

}