Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
93.94% covered (success)
93.94%
217 / 231
50.00% covered (danger)
50.00%
4 / 8
CRAP
0.00% covered (danger)
0.00%
0 / 1
AggregateParticipantAnswers
94.35% covered (success)
94.35%
217 / 230
50.00% covered (danger)
50.00%
4 / 8
35.22
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 execute
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 aggregateAnswers
91.67% covered (success)
91.67%
33 / 36
0.00% covered (danger)
0.00%
0 / 1
2.00
 processBatch
98.28% covered (success)
98.28%
57 / 58
0.00% covered (danger)
0.00%
0 / 1
10
 loadDataForBatch
87.93% covered (warning)
87.93%
51 / 58
0.00% covered (danger)
0.00%
0 / 1
11.21
 updateAggregationTimestamps
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
2
 purgeAggregates
93.33% covered (success)
93.33%
28 / 30
0.00% covered (danger)
0.00%
0 / 1
3.00
 purgeAggregatesForEvents
100.00% covered (success)
100.00%
32 / 32
100.00% covered (success)
100.00%
1 / 1
5
1<?php
2
3declare( strict_types=1 );
4
5namespace MediaWiki\Extension\CampaignEvents\Maintenance;
6
7use MediaWiki\Extension\CampaignEvents\CampaignEventsServices;
8use MediaWiki\Extension\CampaignEvents\Questions\EventAggregatedAnswersStore;
9use MediaWiki\Maintenance\Maintenance;
10use MediaWiki\Utils\MWTimestamp;
11use RuntimeException;
12use Wikimedia\Rdbms\IDatabase;
13
/**
 * Maintenance script that aggregates and deletes participant answers after a predefined amount of time.
 *
 * The script runs in two phases (see execute()):
 *  1. Individual answers in `ce_question_answers` that belong to events that have ended, or whose author first
 *     answered before the cutoff time, are added to the per-option counters in `ce_question_aggregation` and
 *     deleted; the corresponding `ce_participants` rows then get their `cep_aggregation_timestamp` set.
 *  2. For events that have ended, aggregated answers to questions no longer enabled for the event are deleted.
 */
class AggregateParticipantAnswers extends Maintenance {
	/** Primary DB connection; assigned in aggregateAnswers() before any use. */
	private IDatabase $dbw;
	/** Replica DB connection; assigned in aggregateAnswers() before any use. */
	private IDatabase $dbr;

	/** Current UNIX time, captured once per run so that every check uses the same "now". */
	private int $curTimeUnix;
	/** Answers of users whose first answer is older than this UNIX time are due for aggregation. */
	private int $cutoffTimeUnix;

	/**
	 * @var array<int,int> Map of [ event ID => end UNIX timestamp ]
	 */
	private array $eventEndTimes = [];

	/**
	 * Map of users to skip because their answers have already been aggregated in the past.
	 *
	 * @var array<int,array<int,true>> Map of [ event ID => [ user ID => true ] ]
	 */
	private array $usersToSkipPerEvent = [];

	/**
	 * Array of first answer time for each participant. Note that this doesn't include users who should be skipped.
	 *
	 * @var array<int,array<int,int>> Map of [ event ID => [ user ID => first answer UNIX timestamp ] ]
	 */
	private array $userFirstAnswerPerEventTimes = [];

	/**
	 * Map of cep_id values for participant rows that were scanned
	 *
	 * @var array<int,array<int,int>> Map of [ event ID => [ user ID => row ID ] ]
	 */
	private array $participantRowIDsMap = [];

	/**
	 * Map of cep_id values for rows where we need to update the aggregation timestamp.
	 *
	 * @var array<int,true> Map of [ row ID => true ]
	 */
	private array $participantRowsToUpdateMap = [];

	public function __construct() {
		parent::__construct();
		$this->addDescription( 'Aggregates and deletes participant answers after a predefined amount of time' );
		$this->setBatchSize( 500 );
		$this->requireExtension( 'CampaignEvents' );
	}

	/**
	 * @inheritDoc
	 */
	public function execute(): void {
		$this->aggregateAnswers();
		$this->purgeAggregates();
	}

	/**
	 * Main method that handles the aggregation of individual answers (updates the aggregates and the aggregation
	 * timestamps, and deletes the individual answers).
	 *
	 * All writes happen inside a single transaction on the primary DB. If anything fails before the final commit,
	 * the transaction is rolled back and the exception is propagated.
	 */
	private function aggregateAnswers(): void {
		$this->output( "Aggregating old participant answers...\n" );

		$this->curTimeUnix = (int)MWTimestamp::now( TS_UNIX );
		$this->cutoffTimeUnix = $this->curTimeUnix - EventAggregatedAnswersStore::ANSWERS_TTL_SEC;
		$dbHelper = CampaignEventsServices::getDatabaseHelper();
		$this->dbr = $dbHelper->getDBConnection( DB_REPLICA );
		$this->dbw = $dbHelper->getDBConnection( DB_PRIMARY );
		$batchSize = $this->getBatchSize();

		$maxRowID = (int)$this->dbr->newSelectQueryBuilder()
			->select( 'MAX(ceqa_id)' )
			->from( 'ce_question_answers' )
			->caller( __METHOD__ )
			->fetchField();
		if ( $maxRowID === 0 ) {
			$this->output( "Table is empty.\n" );
			return;
		}
		$minRowID = (int)$this->dbr->newSelectQueryBuilder()
			->select( 'MIN(ceqa_id)' )
			->from( 'ce_question_answers' )
			->caller( __METHOD__ )
			->fetchField();

		// Wrap all the changes into a transaction to make sure we don't leave incomplete updates behind. This is also
		// needed to avoid edge cases like:
		// - The script fails after aggregating a user's answers, but before updating their cep_aggregation_timestamp
		// - An event's end date is changed while we are aggregating answers
		// Because of the PII nature of participant answers, we want to try and avoid these edge cases as much as
		// possible, especially those that could inadvertently leak PII.
		$transactionName = __METHOD__;
		$this->beginTransaction( $this->dbw, $transactionName );
		$committed = false;
		try {
			$prevID = $minRowID - 1;
			$curID = $prevID + $batchSize;
			do {
				// Avoid locking nonexistent rows (T381622)
				$batchEnd = min( $curID, $maxRowID );
				$this->processBatch( $prevID, $batchEnd );
				$prevID = $curID;
				$curID += $batchSize;
				$this->waitForReplication();
			} while ( $prevID < $maxRowID );

			$this->updateAggregationTimestamps();
			$this->commitTransaction( $this->dbw, $transactionName );
			$committed = true;
		} finally {
			if ( !$committed ) {
				// Any failure (inconsistent data detected in loadDataForBatch(), a DB error, ...) discards all the
				// changes made so far, instead of leaving partial aggregations behind.
				$this->rollbackTransaction( $this->dbw, $transactionName );
			}
		}

		$this->output( "Done.\n" );
	}

	/**
	 * Aggregates and deletes the answers in the given ID range that are due for aggregation, and records which
	 * participant rows need their aggregation timestamp updated (see updateAggregationTimestamps()).
	 *
	 * An answer is due for aggregation if its event has ended, or if its author's first answer to that event is
	 * older than the cutoff time. Answers of participants whose answers were already aggregated are skipped.
	 *
	 * @param int $startID Exclusive lower bound for ceqa_id
	 * @param int $endID Inclusive upper bound for ceqa_id
	 */
	private function processBatch( int $startID, int $endID ): void {
		// Lock the rows to prevent further changes.
		$res = $this->dbw->newSelectQueryBuilder()
			->select( '*' )
			->from( 'ce_question_answers' )
			->where( [
				$this->dbw->expr( 'ceqa_id', '>', $startID ),
				$this->dbw->expr( 'ceqa_id', '<=', $endID ),
			] )
			->forUpdate()
			->caller( __METHOD__ )
			->fetchResultSet();

		$this->loadDataForBatch( $res );

		$deleteRowIDs = [];
		/** @var array<int,array<int,array<int,int>>> $newAggregateTuples [ event => [ question => [ option => n ] ] ] */
		$newAggregateTuples = [];
		foreach ( $res as $row ) {
			$eventID = (int)$row->ceqa_event_id;
			$userID = (int)$row->ceqa_user_id;

			if ( isset( $this->usersToSkipPerEvent[$eventID][$userID] ) ) {
				// Note, this check should be redundant as long as we delete all user answers,
				// including the non-PII ones.
				continue;
			}

			// Missing data (no campaign_events row for the event, or no ce_participants row for the user) counts as
			// expired, so such answers are aggregated and deleted as well. Reading with `??` avoids undefined-key
			// warnings and a loose `null < int` comparison.
			$eventEndUnix = $this->eventEndTimes[$eventID] ?? null;
			$firstAnswerUnix = $this->userFirstAnswerPerEventTimes[$eventID][$userID] ?? null;
			$isDue = $eventEndUnix === null || $eventEndUnix < $this->curTimeUnix ||
				$firstAnswerUnix === null || $firstAnswerUnix < $this->cutoffTimeUnix;
			if ( !$isDue ) {
				continue;
			}

			$deleteRowIDs[] = (int)$row->ceqa_id;

			$questionID = (int)$row->ceqa_question_id;
			$optionID = (int)$row->ceqa_answer_option;
			// `??=` creates the intermediate arrays as needed.
			$newAggregateTuples[$eventID][$questionID][$optionID] ??= 0;
			$newAggregateTuples[$eventID][$questionID][$optionID]++;

			// Without a participant row there is no aggregation timestamp to set.
			$participantRowID = $this->participantRowIDsMap[$eventID][$userID] ?? null;
			if ( $participantRowID !== null ) {
				$this->participantRowsToUpdateMap[$participantRowID] = true;
			}
		}

		if ( $newAggregateTuples ) {
			$newAggregateRows = [];
			foreach ( $newAggregateTuples as $event => $eventData ) {
				foreach ( $eventData as $question => $questionData ) {
					foreach ( $questionData as $option => $amount ) {
						$newAggregateRows[] = [
							'ceqag_event_id' => $event,
							'ceqag_question_id' => $question,
							'ceqag_answer_option' => $option,
							'ceqag_answers_amount' => $amount,
						];
					}
				}
			}

			// Add to the existing counters when an aggregate row for the same tuple already exists.
			$this->dbw->newInsertQueryBuilder()
				->insertInto( 'ce_question_aggregation' )
				->rows( $newAggregateRows )
				->onDuplicateKeyUpdate()
				->uniqueIndexFields( [ 'ceqag_event_id', 'ceqag_question_id', 'ceqag_answer_option' ] )
				->set( [
					'ceqag_answers_amount = ceqag_answers_amount + ' .
						$this->dbw->buildExcludedValue( 'ceqag_answers_amount' )
				] )
				->caller( __METHOD__ )
				->execute();
		}

		if ( $deleteRowIDs ) {
			$this->dbw->newDeleteQueryBuilder()
				->deleteFrom( 'ce_question_answers' )
				->where( [ 'ceqa_id' => $deleteRowIDs ] )
				->caller( __METHOD__ )
				->execute();
		}

		$this->output( "Batch $startID-$endID done.\n" );
	}

	/**
	 * Given a batch of answers, loads data that will be needed to process that batch:
	 *  - The end time of each event
	 *  - The time of first answer of each participant in the batch
	 *  - Participants whose answers have already been aggregated
	 * Data already loaded for previous batches is not queried again.
	 *
	 * @param iterable $res Rows from ce_question_answers
	 * @throws RuntimeException If a participant with answers has no first answer time. The caller
	 *   (aggregateAnswers()) rolls back the transaction in that case.
	 */
	private function loadDataForBatch( iterable $res ): void {
		// Use a map to discard duplicates: [ event ID => [ user ID => true ] ]
		$usersByEventMap = [];
		foreach ( $res as $row ) {
			$usersByEventMap[(int)$row->ceqa_event_id][(int)$row->ceqa_user_id] = true;
		}

		$eventsWithNoInfo = array_keys( array_diff_key( $usersByEventMap, $this->eventEndTimes ) );
		if ( $eventsWithNoInfo ) {
			// Lock the event rows as well, to prevent changes while we aggregate the answers.
			$eventRows = $this->dbw->newSelectQueryBuilder()
				->select( [ 'event_id', 'event_end_utc' ] )
				->from( 'campaign_events' )
				->where( [ 'event_id' => $eventsWithNoInfo ] )
				->forUpdate()
				->caller( __METHOD__ )
				->fetchResultSet();
			foreach ( $eventRows as $row ) {
				$eventID = (int)$row->event_id;
				$this->eventEndTimes[$eventID] = (int)wfTimestamp( TS_UNIX, $row->event_end_utc );
			}
		}

		$missingUsersByEventMap = [];
		$allMissingUsersMap = [];
		foreach ( $usersByEventMap as $eventID => $eventUsersMap ) {
			$usersToProcess = array_diff_key( $eventUsersMap, $this->usersToSkipPerEvent[$eventID] ?? [] );
			$usersToProcess = array_diff_key(
				$usersToProcess,
				$this->userFirstAnswerPerEventTimes[$eventID] ?? []
			);
			if ( $usersToProcess ) {
				$missingUsersByEventMap[$eventID] = $usersToProcess;
				$allMissingUsersMap += $usersToProcess;
			}
		}

		if ( !$missingUsersByEventMap ) {
			return;
		}
		// Note that this query includes more rows than the ones we're looking for, because we're not filtering
		// by tuples in the where condition. Results are filtered later.
		// No need to obtain these from the primary DB, because concurrent changes to a record should not really
		// affect the script execution.
		$userRows = $this->dbr->newSelectQueryBuilder()
			->select( '*' )
			->from( 'ce_participants' )
			->where( [
				'cep_event_id' => array_keys( $missingUsersByEventMap ),
				'cep_user_id' => array_keys( $allMissingUsersMap ),
			] )
			->caller( __METHOD__ )
			->fetchResultSet();

		foreach ( $userRows as $row ) {
			$eventID = (int)$row->cep_event_id;
			$userID = (int)$row->cep_user_id;
			if ( !isset( $missingUsersByEventMap[$eventID][$userID] ) ) {
				// Unwanted row, ignore it.
				continue;
			}
			if ( $row->cep_aggregation_timestamp !== null ) {
				$this->usersToSkipPerEvent[$eventID][$userID] = true;
				continue;
			}
			$firstAnswerTS = wfTimestampOrNull( TS_UNIX, $row->cep_first_answer_timestamp );
			if ( !is_string( $firstAnswerTS ) ) {
				throw new RuntimeException(
					"User with answers but no first answer time (event=$eventID, user=$userID)"
				);
			}
			$this->userFirstAnswerPerEventTimes[$eventID][$userID] = (int)$firstAnswerTS;
			$this->participantRowIDsMap[$eventID][$userID] = (int)$row->cep_id;
		}
	}

	/**
	 * Updates the aggregation timestamp for all users processed thus far.
	 * Note, this needs to be done after all the rows have been processed, to make sure that a non-null
	 * cep_aggregation_timestamp doesn't cause the user to be skipped in subsequent batches.
	 */
	private function updateAggregationTimestamps(): void {
		$this->output( "Updating aggregation timestamps...\n" );
		$participantRowsToUpdate = array_keys( $this->participantRowsToUpdateMap );
		$dbTimestamp = $this->dbw->timestamp( $this->curTimeUnix );
		foreach ( array_chunk( $participantRowsToUpdate, $this->getBatchSize() ) as $idBatch ) {
			$this->dbw->newUpdateQueryBuilder()
				->update( 'ce_participants' )
				->set( [ 'cep_aggregation_timestamp' => $dbTimestamp ] )
				->where( [ 'cep_id' => $idBatch ] )
				->caller( __METHOD__ )
				->execute();
		}
	}

	/**
	 * Purge redundant aggregated data: for each event that has ended, delete any aggregated answers to questions that
	 * are no longer enabled for that event.
	 * Note that the aggregation itself ({@see self::processBatch()}) uses `ce_question_answers` as its data source,
	 * and does not make any distinction between old answers and finished events; therefore, it cannot handle the
	 * deletion of redundant aggregates as part of its execution: if a certain event has redundant aggregates but
	 * nothing to aggregate, it won't be processed at all.
	 * Also, note that the DB schema does not store whether the "final" aggregation (i.e., the first aggregation after
	 * the event end date) has occurred for a given event. Therefore, we need to check all the events here.
	 *
	 * Relies on $this->dbr, $this->dbw and $this->curTimeUnix having been set by aggregateAnswers().
	 */
	private function purgeAggregates(): void {
		$this->output( "Purging old aggregated data...\n" );

		$maxEventID = (int)$this->dbr->newSelectQueryBuilder()
			->select( 'MAX(event_id)' )
			->from( 'campaign_events' )
			->caller( __METHOD__ )
			->fetchField();
		if ( $maxEventID === 0 ) {
			$this->output( "No events.\n" );
			return;
		}

		$batchSize = $this->getBatchSize();
		$startID = 0;
		$endID = $batchSize;
		$curDBTimestamp = $this->dbr->timestamp( $this->curTimeUnix );
		do {
			// Note, we may already have partial data in $this->eventEndTimes. However, since it's partial, we'll need
			// to query the DB again. Excluding events for which we already have data is probably useless, as it would
			// introduce complexity and give little to nothing in return.
			$eventsToCheck = $this->dbr->newSelectQueryBuilder()
				->select( 'event_id' )
				->from( 'campaign_events' )
				->where( [
					$this->dbr->expr( 'event_id', '>', $startID ),
					$this->dbr->expr( 'event_id', '<=', $endID ),
					$this->dbr->expr( 'event_end_utc', '<', $curDBTimestamp ),
				] )
				->caller( __METHOD__ )
				->fetchFieldValues();
			if ( $eventsToCheck ) {
				$eventsToCheck = array_map( 'intval', $eventsToCheck );
				$this->purgeAggregatesForEvents( $eventsToCheck );
			}

			$startID = $endID;
			$endID += $batchSize;
		} while ( $startID < $maxEventID );

		$this->output( "Done.\n" );
	}

	/**
	 * Deletes the aggregated answers of the given events that refer to questions not currently enabled for the
	 * corresponding event (i.e., with no matching ce_event_questions row).
	 *
	 * @param int[] $eventIDs
	 */
	private function purgeAggregatesForEvents( array $eventIDs ): void {
		$eventQuestionRows = $this->dbr->newSelectQueryBuilder()
			->select( [ 'ceeq_event_id', 'ceeq_question_id' ] )
			->from( 'ce_event_questions' )
			->where( [
				'ceeq_event_id' => $eventIDs
			] )
			->caller( __METHOD__ )
			->fetchResultSet();
		// Keyed set for constant-time membership checks: [ event ID => [ question ID => true ] ]
		$enabledQuestionsByEvent = [];
		foreach ( $eventQuestionRows as $eventQuestionRow ) {
			$eventID = (int)$eventQuestionRow->ceeq_event_id;
			$enabledQuestionsByEvent[$eventID][(int)$eventQuestionRow->ceeq_question_id] = true;
		}

		$aggregatedAnswersRows = $this->dbr->newSelectQueryBuilder()
			->select( [ 'ceqag_id', 'ceqag_event_id', 'ceqag_question_id' ] )
			->from( 'ce_question_aggregation' )
			->where( [
				'ceqag_event_id' => $eventIDs
			] )
			->caller( __METHOD__ )
			->fetchResultSet();
		$deleteRowIDs = [];
		foreach ( $aggregatedAnswersRows as $aggregatedAnswersRow ) {
			$eventID = (int)$aggregatedAnswersRow->ceqag_event_id;
			$questionID = (int)$aggregatedAnswersRow->ceqag_question_id;
			if ( !isset( $enabledQuestionsByEvent[$eventID][$questionID] ) ) {
				$deleteRowIDs[] = (int)$aggregatedAnswersRow->ceqag_id;
			}
		}

		if ( $deleteRowIDs ) {
			$this->dbw->newDeleteQueryBuilder()
				->deleteFrom( 'ce_question_aggregation' )
				->where( [ 'ceqag_id' => $deleteRowIDs ] )
				->caller( __METHOD__ )
				->execute();
		}
	}
}

return AggregateParticipantAnswers::class;