Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
89.36% covered (warning)
89.36%
42 / 47
75.00% covered (warning)
75.00%
3 / 4
CRAP
0.00% covered (danger)
0.00%
0 / 1
StreamConfigs
89.36% covered (warning)
89.36%
42 / 47
75.00% covered (warning)
75.00%
3 / 4
21.53
0.00% covered (danger)
0.00%
0 / 1
 __construct
64.29% covered (warning)
64.29%
9 / 14
0.00% covered (danger)
0.00%
0 / 1
6.14
 get
100.00% covered (success)
100.00%
11 / 11
100.00% covered (success)
100.00%
1 / 1
8
 selectByStreams
100.00% covered (success)
100.00%
16 / 16
100.00% covered (success)
100.00%
1 / 1
4
 findByStream
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
4
1<?php
2
3namespace MediaWiki\Extension\EventStreamConfig;
4
5use MediaWiki\Config\ServiceOptions;
6use Psr\Log\LoggerInterface;
7use Psr\Log\NullLogger;
8use Wikimedia\Assert\Assert;
9
10/**
11 * Functions to aid in exporting event stream configs.  These configs should be set
12 * in Global MW config to allow for more dynamic configuration of event stream settings
13 * e.g. sample rates or destination_event_service.
14 *
15 * Some terms:
16 * - StreamConfigs    - List of individual Stream Configs
17 * - A StreamConfig   - an object of event stream settings
18 * - A stream setting - an individual setting for an stream, e.g. 'schema_title'.
19 *
20 * See also:
21 * - https://phabricator.wikimedia.org/T205319
22 * - https://phabricator.wikimedia.org/T233634
23 *
24 * This expects that 'EventStreams' is set in MW Config to an associative array of stream
25 * configs keyed by stream name or regex pattern. Each stream config entry should look something
26 * like:
27 *
28 * "my.event.stream-name" => [
29 *      "schema_title" => "my/event/schema",
30 *      "sample" => [
31 *          "rate" => 0.8,
32 *          "unit" => "session",
33 *      ],
34 *      "destination_event_service" => "eventgate-analytics-external",
35 *      ...
36 *     ],
37 *
38 * If the stream is associated with a regex pattern, the functions here will match requested
39 * target streams against that pattern.
40 */
41class StreamConfigs {
42    /**
43     * Name of the main config key(s) for stream configuration.
44     * @var array
45     *
46     * @deprecated since 1.42
47     */
48    public const CONSTRUCTOR_OPTIONS = [
49        'EventStreams',
50        'EventStreamsDefaultSettings'
51    ];
52
53    /**
54     * Associative array of StreamConfigs keyed by stream name/pattern
55     * @var array
56     */
57    private $streamConfigs = [];
58
59    /**
60     * @var \Psr\Log\LoggerInterface
61     */
62    private $logger;
63
64    /**
65     * Constructs a new StreamConfigs instance initialized
66     * from wgEventStreams and wgEventStreamsDefaultSettings
67     *
68     * @param array<string,array>|ServiceOptions $streamConfigs
69     * @param array|LoggerInterface $defaultSettings
70     * @param ?LoggerInterface $logger
71     */
72    public function __construct( $streamConfigs, $defaultSettings, ?LoggerInterface $logger = null ) {
73        if ( $streamConfigs instanceof ServiceOptions ) {
74            wfDeprecatedMsg( __METHOD__ . ': calling with ServiceOptions is deprecated', '1.42' );
75
76            $streamConfigs->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );
77
78            '@phan-var LoggerInterface $defaultSettings';
79            $logger = $defaultSettings;
80
81            $defaultSettings = $streamConfigs->get( 'EventStreamsDefaultSettings' );
82            $streamConfigs = $streamConfigs->get( 'EventStreams' );
83        }
84
85        Assert::parameterType( 'array', $streamConfigs, 'EventStreams' );
86        Assert::parameterType( 'array', $defaultSettings, 'EventStreamsDefaultSettings' );
87
88        foreach ( $streamConfigs as $key => $config ) {
89            $stream = is_int( $key ) && isset( $config[ StreamConfig::STREAM_SETTING ] ) ?
90                $config[ StreamConfig::STREAM_SETTING ] :
91                $key;
92
93            $this->streamConfigs[ $stream ] = new StreamConfig( $stream, $config, $defaultSettings );
94        }
95
96        $this->logger = $logger ?? new NullLogger();
97    }
98
99    /**
100     * Looks for target stream names and returns matched stream configs keyed by stream name.
101     *
102     * @param array|null $targetStreams
103     *     List of stream names. If not provided, all stream configs will be returned.
104     * @param array|null $settingsConstraints
105     *     If given, returned stream config entries will be filtered for those that
106     *     have these settings.
107     *
108     * @return array
109     */
110    public function get(
111        array $targetStreams = null,
112        $settingsConstraints = null
113    ): array {
114        $args = func_get_args();
115
116        if (
117            ( count( $args ) === 2 && is_bool( $settingsConstraints ) ) ||
118            count( $args ) === 3
119        ) {
120            wfDeprecatedMsg( __METHOD__ . ': $includeAllSettings parameter is deprecated', '1.41' );
121
122            $settingsConstraints = count( $args ) === 3 ? (array)$args[2] : null;
123        }
124
125        $result = [];
126        foreach ( $this->selectByStreams( $targetStreams ) as $stream => $streamConfigEntry ) {
127            if (
128                !$settingsConstraints ||
129                $streamConfigEntry->matchesSettings( $settingsConstraints )
130            ) {
131                $result[$stream] = $streamConfigEntry->toArray( $stream );
132            }
133        }
134        return $result;
135    }
136
137    /**
138     * Filter for stream names that match streams in $targetStreamNames.
139     *
140     * @param array|null $targetStreams
141     *     If not provided, all $streamConfigs will be returned, keyed by 'stream'.
142     *
143     * @return StreamConfig[]
144     */
145    private function selectByStreams( array $targetStreams = null ): array {
146        // If no $targetStreams were specified, then assume all are desired.
147        if ( $targetStreams === null ) {
148            $this->logger->debug( 'Selecting all stream configs.' );
149            return $this->streamConfigs;
150        }
151        $groupedStreamConfigs = [];
152        $this->logger->debug(
153            'Selecting stream configs for target streams: {streams}',
154            [ 'streams' => implode( " ", $targetStreams ) ]
155        );
156        foreach ( $targetStreams as $stream ) {
157            // Find the config for this $stream.
158            // configured stream names can be exact streams or regexes.
159            // $stream will be matched against either.
160            $streamConfig = $this->findByStream( $stream );
161
162            if ( $streamConfig === null ) {
163                $this->logger->warning(
164                    "Stream '$stream' does not match any `stream` in stream config"
165                );
166            } else {
167                // Else include the settings in the stream config result..
168                $groupedStreamConfigs[$stream] = $streamConfig;
169            }
170        }
171        return $groupedStreamConfigs;
172    }
173
174    /**
175     * Given a $stream name to get, this matches $stream against
176     * `stream` in $streamConfigs and returns the first found StreamConfig object.
177     * If no match is found, returns null.
178     *
179     * @param string $stream
180     * @return StreamConfig|null
181     */
182    private function findByStream( $stream ) {
183        // If a stream config is defined for the exact stream name provided, return it.
184        if ( isset( $this->streamConfigs[$stream] ) ) {
185            return $this->streamConfigs[$stream];
186        }
187
188        // If no exact match is found, iterate over $streamConfigs and return the config
189        // for the first pattern found that matches that matches the provided stream name.
190        foreach ( $this->streamConfigs as $config ) {
191            if ( $config->matches( $stream ) ) {
192                return $config;
193            }
194        }
195
196        return null;
197    }
198
199}