Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
93.91% covered (success)
93.91%
216 / 230
50.00% covered (danger)
50.00%
4 / 8
CRAP
0.00% covered (danger)
0.00%
0 / 1
AggregateParticipantAnswers
94.32% covered (success)
94.32%
216 / 229
50.00% covered (danger)
50.00%
4 / 8
35.22
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
 execute
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 aggregateAnswers
91.43% covered (success)
91.43%
32 / 35
0.00% covered (danger)
0.00%
0 / 1
2.00
 processBatch
98.28% covered (success)
98.28%
57 / 58
0.00% covered (danger)
0.00%
0 / 1
10
 loadDataForBatch
87.93% covered (warning)
87.93%
51 / 58
0.00% covered (danger)
0.00%
0 / 1
11.21
 updateAggregationTimestamps
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
2
 purgeAggregates
93.33% covered (success)
93.33%
28 / 30
0.00% covered (danger)
0.00%
0 / 1
3.00
 purgeAggregatesForEvents
100.00% covered (success)
100.00%
32 / 32
100.00% covered (success)
100.00%
1 / 1
5
1<?php
2
3declare( strict_types=1 );
4
5namespace MediaWiki\Extension\CampaignEvents\Maintenance;
6
7use Maintenance;
8use MediaWiki\Extension\CampaignEvents\CampaignEventsServices;
9use MediaWiki\Extension\CampaignEvents\Questions\EventAggregatedAnswersStore;
10use MediaWiki\Utils\MWTimestamp;
11use RuntimeException;
12use Wikimedia\Rdbms\IDatabase;
13
/**
 * Maintenance script that aggregates and deletes participant answers after a predefined amount of time.
 * It also purges aggregated answers to questions that are no longer enabled for events that have ended.
 */
class AggregateParticipantAnswers extends Maintenance {
    /** Primary DB connection; assigned at the start of aggregateAnswers(). */
    private ?IDatabase $dbw = null;
    /** Replica DB connection; assigned at the start of aggregateAnswers(). */
    private ?IDatabase $dbr = null;

    /** Current UNIX time, computed once at the start of aggregateAnswers(). */
    private int $curTimeUnix;
    /** Answers whose first answer time is before this UNIX time are due for aggregation. */
    private int $cutoffTimeUnix;

    /**
     * @var array<int,int> Map of [ event ID => end UNIX timestamp ]
     */
    private array $eventEndTimes = [];

    /**
     * Map of users to skip because their answers have already been aggregated in the past.
     *
     * @var array<int,array<int,true>> Map of [ event ID => [ user ID => true ] ]
     */
    private array $usersToSkipPerEvent = [];

    /**
     * Array of first answer time for each participant. Note that this doesn't include users who should be skipped.
     *
     * @var array<int,array<int,int>> Map of [ event ID => [ user ID => first answer UNIX timestamp ] ]
     */
    private array $userFirstAnswerPerEventTimes = [];

    /**
     * Map of cep_id values for participant rows that were scanned
     *
     * @var array<int,array<int,int>> Map of [ event ID => [ user ID => row ID ] ]
     */
    private array $participantRowIDsMap = [];

    /**
     * Map of cep_id values for rows where we need to update the aggregation timestamp.
     *
     * @var array<int,true> Map of [ row ID => true ]
     */
    private array $participantRowsToUpdateMap = [];

    /**
     * Rolls back the transaction opened in aggregateAnswers(); set there, and invoked before throwing on
     * inconsistent data.
     *
     * @var callable|null
     */
    private $rollbackTransactionFn;

    public function __construct() {
        parent::__construct();
        $this->addDescription( 'Aggregates and deletes participant answers after a predefined amount of time' );
        $this->setBatchSize( 500 );
        $this->requireExtension( 'CampaignEvents' );
    }

    /**
     * Runs the aggregation first, then the purge. purgeAggregates() relies on the DB connections and the current
     * time that aggregateAnswers() initializes before any early return.
     *
     * @inheritDoc
     */
    public function execute(): void {
        $this->aggregateAnswers();
        $this->purgeAggregates();
    }

    /**
     * Main method that handles the aggregation of individual answers (updates the aggregates and the aggregation
     * timestamps, and deletes the individual answers).
     */
    private function aggregateAnswers(): void {
        $this->output( "Aggregating old participant answers...\n" );

        $this->curTimeUnix = (int)MWTimestamp::now( TS_UNIX );
        $this->cutoffTimeUnix = $this->curTimeUnix - EventAggregatedAnswersStore::ANSWERS_TTL_SEC;
        $dbHelper = CampaignEventsServices::getDatabaseHelper();
        $this->dbr = $dbHelper->getDBConnection( DB_REPLICA );
        $this->dbw = $dbHelper->getDBConnection( DB_PRIMARY );
        $batchSize = $this->getBatchSize();

        // MAX() on an empty table yields NULL, which casts to 0.
        $maxRowID = (int)$this->dbr->newSelectQueryBuilder()
            ->select( 'MAX(ceqa_id)' )
            ->from( 'ce_question_answers' )
            ->caller( __METHOD__ )
            ->fetchField();
        if ( $maxRowID === 0 ) {
            $this->output( "Table is empty.\n" );
            return;
        }
        $minRowID = (int)$this->dbr->newSelectQueryBuilder()
            ->select( 'MIN(ceqa_id)' )
            ->from( 'ce_question_answers' )
            ->caller( __METHOD__ )
            ->fetchField();

        // Wrap all the changes into a transaction to make sure we don't leave incomplete updates behind. This is also
        // needed to avoid edge cases like:
        // - The script fails after aggregating a user's answers, but before updating their cep_aggregation_timestamp
        // - An event's end date is changed while we are aggregating answers
        // Because of the PII nature of participant answers, we want to try and avoid these edge cases as much as
        // possible, especially those that could inadvertently leak PII.
        $transactionName = __METHOD__;
        $this->beginTransaction( $this->dbw, $transactionName );
        $this->rollbackTransactionFn = function () use ( $transactionName ) {
            $this->rollbackTransaction( $this->dbw, $transactionName );
        };
        // Batches cover the ID ranges ( $prevID, $curID ], starting just below the smallest ID.
        $prevID = $minRowID - 1;
        $curID = $prevID + $batchSize;
        do {
            $this->processBatch( $prevID, $curID );
            $prevID = $curID;
            $curID += $batchSize;
            // NOTE(review): this runs while the primary transaction opened above is still uncommitted, so the
            // writes made by this script are presumably not yet replicated; confirm that waiting here is intended.
            $dbHelper->waitForReplication();
        } while ( $prevID < $maxRowID );

        $this->updateAggregationTimestamps();
        $this->commitTransaction( $this->dbw, $transactionName );

        $this->output( "Done.\n" );
    }

    /**
     * Processes the answers with ceqa_id in the range ( $startID, $endID ]. It locks them and adds the ones that
     * are due for aggregation to ce_question_aggregation. It then deletes those rows from ce_question_answers, and
     * records which participant rows need their aggregation timestamp updated at the end.
     *
     * @param int $startID Exclusive lower bound for ceqa_id
     * @param int $endID Inclusive upper bound for ceqa_id
     */
    private function processBatch( int $startID, int $endID ): void {
        // Lock the rows to prevent further changes.
        $res = $this->dbw->newSelectQueryBuilder()
            ->select( '*' )
            ->from( 'ce_question_answers' )
            ->where( [
                $this->dbw->expr( 'ceqa_id', '>', $startID ),
                $this->dbw->expr( 'ceqa_id', '<=', $endID ),
            ] )
            ->forUpdate()
            ->caller( __METHOD__ )
            ->fetchResultSet();

        $this->loadDataForBatch( $res );

        $deleteRowIDs = [];
        $newAggregateTuples = [];
        foreach ( $res as $row ) {
            $eventID = (int)$row->ceqa_event_id;
            $userID = (int)$row->ceqa_user_id;

            if ( isset( $this->usersToSkipPerEvent[$eventID][$userID] ) ) {
                // Note, this check should be redundant as long as we delete all user answers,
                // including the non-PII ones.
                continue;
            }

            if ( !$this->isDueForAggregation( $eventID, $userID ) ) {
                continue;
            }

            $deleteRowIDs[] = (int)$row->ceqa_id;

            $question = (int)$row->ceqa_question_id;
            $option = (int)$row->ceqa_answer_option;
            $newAggregateTuples[$eventID] ??= [];
            $newAggregateTuples[$eventID][$question] ??= [];
            $newAggregateTuples[$eventID][$question][$option] ??= 0;
            $newAggregateTuples[$eventID][$question][$option]++;

            // Only users with a row in ce_participants have a cep_id to update. Storing a missing (null) ID would
            // create an empty-string key, and thus a non-integer cep_id value in the final UPDATE.
            $participantID = $this->participantRowIDsMap[$eventID][$userID] ?? null;
            if ( $participantID !== null ) {
                $this->participantRowsToUpdateMap[$participantID] = true;
            }
        }

        if ( $newAggregateTuples ) {
            $newAggregateRows = [];
            foreach ( $newAggregateTuples as $event => $eventData ) {
                foreach ( $eventData as $question => $questionData ) {
                    foreach ( $questionData as $option => $amount ) {
                        $newAggregateRows[] = [
                            'ceqag_event_id' => $event,
                            'ceqag_question_id' => $question,
                            'ceqag_answer_option' => $option,
                            'ceqag_answers_amount' => $amount,
                        ];
                    }
                }
            }

            // Upsert: if an aggregate row already exists for the tuple, add the new amount to it.
            $this->dbw->newInsertQueryBuilder()
                ->insertInto( 'ce_question_aggregation' )
                ->rows( $newAggregateRows )
                ->onDuplicateKeyUpdate()
                ->uniqueIndexFields( [ 'ceqag_event_id', 'ceqag_question_id', 'ceqag_answer_option' ] )
                ->set( [
                    'ceqag_answers_amount = ceqag_answers_amount + ' .
                        $this->dbw->buildExcludedValue( 'ceqag_answers_amount' )
                ] )
                ->caller( __METHOD__ )
                ->execute();
        }

        if ( $deleteRowIDs ) {
            $this->dbw->newDeleteQueryBuilder()
                ->deleteFrom( 'ce_question_answers' )
                ->where( [ 'ceqa_id' => $deleteRowIDs ] )
                ->caller( __METHOD__ )
                ->execute();
        }

        $this->output( "Batch $startID-$endID done.\n" );
    }

    /**
     * Whether the answers of the given user to the given event are due for aggregation. This is the case if the
     * event has already ended, or if the user first answered before the cutoff time.
     *
     * Missing data (no loaded end time for the event, or no loaded first answer time for the user) counts as
     * "due". This matches the previous implicit behaviour of comparing a missing (null) value against a positive
     * integer, but without triggering undefined array key warnings.
     *
     * @param int $eventID
     * @param int $userID
     * @return bool
     */
    private function isDueForAggregation( int $eventID, int $userID ): bool {
        $eventEndTime = $this->eventEndTimes[$eventID] ?? null;
        if ( $eventEndTime === null || $eventEndTime < $this->curTimeUnix ) {
            return true;
        }
        $firstAnswerTime = $this->userFirstAnswerPerEventTimes[$eventID][$userID] ?? null;
        return $firstAnswerTime === null || $firstAnswerTime < $this->cutoffTimeUnix;
    }

    /**
     * Given a batch of answers, loads data that will be needed to process that batch:
     *  - The end time of each event
     *  - The time of first answer of each participant in the batch
     *  - Participants whose answers have already been aggregated
     * Data already loaded for previous batches is not queried again.
     *
     * @param iterable $res Rows from ce_question_answers
     * @return void
     * @throws RuntimeException If a participant that has not been aggregated has no first answer time; the
     *   transaction is rolled back first.
     */
    private function loadDataForBatch( iterable $res ): void {
        // Use maps to discard duplicates.
        $eventsMap = [];
        $usersByEventMap = [];
        foreach ( $res as $row ) {
            $eventID = (int)$row->ceqa_event_id;
            $eventsMap[$eventID] = true;
            $usersByEventMap[$eventID] ??= [];
            $userID = (int)$row->ceqa_user_id;
            $usersByEventMap[$eventID][$userID] = true;
        }

        $eventsWithNoInfo = array_keys( array_diff_key( $eventsMap, $this->eventEndTimes ) );
        if ( $eventsWithNoInfo ) {
            // Lock the event rows as well, to prevent changes while we aggregate the answers.
            $eventRows = $this->dbw->newSelectQueryBuilder()
                ->select( [ 'event_id', 'event_end_utc' ] )
                ->from( 'campaign_events' )
                ->where( [ 'event_id' => $eventsWithNoInfo ] )
                ->forUpdate()
                ->caller( __METHOD__ )
                ->fetchResultSet();
            foreach ( $eventRows as $row ) {
                $eventID = (int)$row->event_id;
                $this->eventEndTimes[$eventID] = (int)wfTimestamp( TS_UNIX, $row->event_end_utc );
            }
        }

        // Users in this batch for whom we have neither a skip flag nor a first answer time yet.
        $missingUsersByEventMap = [];
        $allMissingUsersMap = [];
        foreach ( $usersByEventMap as $eventID => $eventUsersMap ) {
            $usersToProcess = array_diff_key( $eventUsersMap, $this->usersToSkipPerEvent[$eventID] ?? [] );
            $usersToProcess = array_diff_key(
                $usersToProcess,
                $this->userFirstAnswerPerEventTimes[$eventID] ?? []
            );
            if ( $usersToProcess ) {
                $missingUsersByEventMap[$eventID] = $usersToProcess;
                $allMissingUsersMap += $usersToProcess;
            }
        }

        if ( !$missingUsersByEventMap ) {
            return;
        }
        // Note that this query includes more rows than the ones we're looking for, because we're not filtering
        // by tuples in the where condition. Results are filtered later.
        // No need to obtain these from the primary DB, because concurrent changes to a record should not really
        // affect the script execution.
        $userRows = $this->dbr->newSelectQueryBuilder()
            ->select( '*' )
            ->from( 'ce_participants' )
            ->where( [
                'cep_event_id' => array_keys( $missingUsersByEventMap ),
                'cep_user_id' => array_keys( $allMissingUsersMap ),
            ] )
            ->caller( __METHOD__ )
            ->fetchResultSet();

        foreach ( $userRows as $row ) {
            $eventID = (int)$row->cep_event_id;
            $userID = (int)$row->cep_user_id;
            if ( !isset( $missingUsersByEventMap[$eventID][$userID] ) ) {
                // Unwanted row, ignore it.
                continue;
            }
            if ( $row->cep_aggregation_timestamp !== null ) {
                $this->usersToSkipPerEvent[$eventID][$userID] = true;
                continue;
            }
            $firstAnswerTS = wfTimestampOrNull( TS_UNIX, $row->cep_first_answer_timestamp );
            if ( !is_string( $firstAnswerTS ) ) {
                ( $this->rollbackTransactionFn )();
                throw new RuntimeException(
                    "User with answers but no first answer time (event=$eventID, user=$userID)"
                );
            }
            $this->userFirstAnswerPerEventTimes[$eventID][$userID] = (int)$firstAnswerTS;
            $this->participantRowIDsMap[$eventID][$userID] = (int)$row->cep_id;
        }
        // Users with answers but no row in ce_participants get no entry in any map here. processBatch() treats
        // them as due for aggregation and skips the timestamp update for them.
    }

    /**
     * Updates the aggregation timestamp for all users processed thus far.
     * Note, this needs to be done after all the rows have been processed, to make sure that a non-null
     * cep_aggregation_timestamp doesn't cause the user to be skipped in subsequent batches.
     */
    private function updateAggregationTimestamps(): void {
        $this->output( "Updating aggregation timestamps...\n" );
        $participantRowsToUpdate = array_keys( $this->participantRowsToUpdateMap );
        $dbTimestamp = $this->dbw->timestamp( $this->curTimeUnix );
        foreach ( array_chunk( $participantRowsToUpdate, $this->getBatchSize() ) as $idBatch ) {
            $this->dbw->newUpdateQueryBuilder()
                ->update( 'ce_participants' )
                ->set( [ 'cep_aggregation_timestamp' => $dbTimestamp ] )
                ->where( [ 'cep_id' => $idBatch ] )
                ->caller( __METHOD__ )
                ->execute();
        }
    }

    /**
     * Purge redundant aggregated data: for each event that has ended, delete any aggregated answers to questions that
     * are no longer enabled for that event.
     * Note that the aggregation itself ({@see self::processBatch()}) uses `ce_question_answers` as its data source,
     * and does not make any distinction between old answers and finished events; therefore, it cannot handle the
     * deletion of redundant aggregates as part of its execution: if a certain event has redundant aggregates but
     * nothing to aggregate, it won't be processed at all.
     * Also, note that the DB schema does not store whether the "final" aggregation (i.e., the first aggregation after
     * the event end date) has occurred for a given event. Therefore, we need to check all the events here.
     */
    private function purgeAggregates(): void {
        $this->output( "Purging old aggregated data...\n" );

        $maxEventID = (int)$this->dbr->newSelectQueryBuilder()
            ->select( 'MAX(event_id)' )
            ->from( 'campaign_events' )
            ->caller( __METHOD__ )
            ->fetchField();
        if ( $maxEventID === 0 ) {
            $this->output( "No events.\n" );
            return;
        }

        $batchSize = $this->getBatchSize();
        // Batches cover the event ID ranges ( $startID, $endID ].
        $startID = 0;
        $endID = $batchSize;
        $curDBTimestamp = $this->dbr->timestamp( $this->curTimeUnix );
        do {
            // Note, we may already have partial data in $this->eventEndTimes. However, since it's partial, we'll need
            // to query the DB again. Excluding events for which we already have data is probably useless, as it would
            // introduce complexity and give little to nothing in return.
            $eventsToCheck = $this->dbr->newSelectQueryBuilder()
                ->select( 'event_id' )
                ->from( 'campaign_events' )
                ->where( [
                    $this->dbr->expr( 'event_id', '>', $startID ),
                    $this->dbr->expr( 'event_id', '<=', $endID ),
                    $this->dbr->expr( 'event_end_utc', '<', $curDBTimestamp ),
                ] )
                ->caller( __METHOD__ )
                ->fetchFieldValues();
            if ( $eventsToCheck ) {
                $eventsToCheck = array_map( 'intval', $eventsToCheck );
                $this->purgeAggregatesForEvents( $eventsToCheck );
            }

            $startID = $endID;
            $endID += $batchSize;
        } while ( $startID < $maxEventID );

        $this->output( "Done.\n" );
    }

    /**
     * Deletes the aggregated answers of the given events whose question is not currently enabled for the event
     * (i.e., has no matching row in ce_event_questions).
     *
     * @param int[] $eventIDs
     */
    private function purgeAggregatesForEvents( array $eventIDs ): void {
        $eventQuestionRows = $this->dbr->newSelectQueryBuilder()
            ->select( [ 'ceeq_event_id', 'ceeq_question_id' ] )
            ->from( 'ce_event_questions' )
            ->where( [
                'ceeq_event_id' => $eventIDs
            ] )
            ->caller( __METHOD__ )
            ->fetchResultSet();
        // Map of [ event ID => [ enabled question ID => true ] ], for constant-time membership checks.
        $enabledQuestionsByEvent = array_fill_keys( $eventIDs, [] );
        foreach ( $eventQuestionRows as $eventQuestionRow ) {
            $eventID = (int)$eventQuestionRow->ceeq_event_id;
            $enabledQuestionsByEvent[$eventID][(int)$eventQuestionRow->ceeq_question_id] = true;
        }

        $aggregatedAnswersRows = $this->dbr->newSelectQueryBuilder()
            ->select( [ 'ceqag_id', 'ceqag_event_id', 'ceqag_question_id' ] )
            ->from( 'ce_question_aggregation' )
            ->where( [
                'ceqag_event_id' => $eventIDs
            ] )
            ->caller( __METHOD__ )
            ->fetchResultSet();
        $deleteRowIDs = [];
        foreach ( $aggregatedAnswersRows as $aggregatedAnswersRow ) {
            $eventID = (int)$aggregatedAnswersRow->ceqag_event_id;
            $questionID = (int)$aggregatedAnswersRow->ceqag_question_id;
            if ( !isset( $enabledQuestionsByEvent[$eventID][$questionID] ) ) {
                $deleteRowIDs[] = (int)$aggregatedAnswersRow->ceqag_id;
            }
        }

        if ( $deleteRowIDs ) {
            $this->dbw->newDeleteQueryBuilder()
                ->deleteFrom( 'ce_question_aggregation' )
                ->where( [ 'ceqag_id' => $deleteRowIDs ] )
                ->caller( __METHOD__ )
                ->execute();
        }
    }
}

return AggregateParticipantAnswers::class;