Code Coverage

| | Lines | | Functions and Methods | | CRAP | Classes and Traits | |
|---|---:|---:|---:|---:|---:|---:|---:|
| Total | 93.94% | 217 / 231 | 50.00% | 4 / 8 | | 0.00% | 0 / 1 |
| AggregateParticipantAnswers | 94.35% | 217 / 230 | 50.00% | 4 / 8 | 35.22 | 0.00% | 0 / 1 |
| `__construct` | 100.00% | 4 / 4 | 100.00% | 1 / 1 | 1 | | |
| `execute` | 100.00% | 2 / 2 | 100.00% | 1 / 1 | 1 | | |
| `aggregateAnswers` | 91.67% | 33 / 36 | 0.00% | 0 / 1 | 2.00 | | |
| `processBatch` | 98.28% | 57 / 58 | 0.00% | 0 / 1 | 10 | | |
| `loadDataForBatch` | 87.93% | 51 / 58 | 0.00% | 0 / 1 | 11.21 | | |
| `updateAggregationTimestamps` | 100.00% | 10 / 10 | 100.00% | 1 / 1 | 2 | | |
| `purgeAggregates` | 93.33% | 28 / 30 | 0.00% | 0 / 1 | 3.00 | | |
| `purgeAggregatesForEvents` | 100.00% | 32 / 32 | 100.00% | 1 / 1 | 5 | | |
1 | <?php |
2 | |
3 | declare( strict_types=1 ); |
4 | |
5 | namespace MediaWiki\Extension\CampaignEvents\Maintenance; |
6 | |
use MediaWiki\Extension\CampaignEvents\CampaignEventsServices;
use MediaWiki\Extension\CampaignEvents\Questions\EventAggregatedAnswersStore;
use MediaWiki\Maintenance\Maintenance;
use MediaWiki\Utils\MWTimestamp;
use RuntimeException;
use Throwable;
use Wikimedia\Rdbms\IDatabase;
13 | |
/**
 * Maintenance script that aggregates and deletes participant answers after a predefined amount of time.
 *
 * The script runs in two phases:
 * - aggregateAnswers(): inside a single primary-DB transaction, scans `ce_question_answers` in ID batches, adds
 *   eligible answers to the counters in `ce_question_aggregation`, deletes those individual answers, and finally
 *   sets `cep_aggregation_timestamp` on the affected `ce_participants` rows.
 * - purgeAggregates(): for every event that has ended, deletes aggregated rows for questions that are no longer
 *   enabled for that event.
 */
class AggregateParticipantAnswers extends Maintenance {
	private ?IDatabase $dbw;
	private ?IDatabase $dbr;

	/** UNIX time at which aggregation started; used as "now" for the whole run. */
	private int $curTimeUnix;
	/**
	 * Answers of a participant whose first answer predates this UNIX time are aggregated even if the event has not
	 * ended yet. Equal to $curTimeUnix minus EventAggregatedAnswersStore::ANSWERS_TTL_SEC.
	 */
	private int $cutoffTimeUnix;

	/**
	 * @var array<int,int> Map of [ event ID => end UNIX timestamp ]
	 */
	private array $eventEndTimes = [];

	/**
	 * Map of users to skip because their answers have already been aggregated in the past.
	 *
	 * @var array<int,array<int,true>> Map of [ event ID => [ user ID => true ] ]
	 */
	private array $usersToSkipPerEvent = [];

	/**
	 * Array of first answer time for each participant. Note that this doesn't include users who should be skipped.
	 *
	 * @var array<int,array<int,int>> Map of [ event ID => [ user ID => first answer UNIX timestamp ] ]
	 */
	private array $userFirstAnswerPerEventTimes = [];

	/**
	 * Map of cep_id values for participant rows that were scanned
	 *
	 * @var array<int,array<int,int>> Map of [ event ID => [ user ID => row ID ] ]
	 */
	private array $participantRowIDsMap = [];

	/**
	 * Map of cep_id values for rows where we need to update the aggregation timestamp.
	 *
	 * @var array<int,true> Map of [ row ID => true ]
	 */
	private array $participantRowsToUpdateMap = [];

	public function __construct() {
		parent::__construct();
		$this->addDescription( 'Aggregates and deletes participant answers after a predefined amount of time' );
		$this->setBatchSize( 500 );
		$this->requireExtension( 'CampaignEvents' );
	}

	/**
	 * @inheritDoc
	 */
	public function execute(): void {
		// The ID-range loops below advance by the batch size; a non-positive value would make them loop forever.
		$batchSize = $this->getBatchSize();
		if ( $batchSize === null || $batchSize < 1 ) {
			$this->fatalError( "The batch size must be a positive integer.\n" );
		}

		$this->aggregateAnswers();
		$this->purgeAggregates();
	}

	/**
	 * Main method that handles the aggregation of individual answers (updates the aggregates and the aggregation
	 * timestamps, and deletes the individual answers).
	 *
	 * Also initialises the DB handles and $curTimeUnix, which purgeAggregates() relies on.
	 * All writes happen in one primary-DB transaction. If anything throws before the commit, the transaction is
	 * rolled back and the exception is rethrown.
	 */
	private function aggregateAnswers(): void {
		$this->output( "Aggregating old participant answers...\n" );

		$this->curTimeUnix = (int)MWTimestamp::now( TS_UNIX );
		$this->cutoffTimeUnix = $this->curTimeUnix - EventAggregatedAnswersStore::ANSWERS_TTL_SEC;
		$dbHelper = CampaignEventsServices::getDatabaseHelper();
		$this->dbr = $dbHelper->getDBConnection( DB_REPLICA );
		$this->dbw = $dbHelper->getDBConnection( DB_PRIMARY );
		$batchSize = $this->getBatchSize();

		$maxRowID = (int)$this->dbr->newSelectQueryBuilder()
			->select( 'MAX(ceqa_id)' )
			->from( 'ce_question_answers' )
			->caller( __METHOD__ )
			->fetchField();
		if ( $maxRowID === 0 ) {
			$this->output( "Table is empty.\n" );
			return;
		}
		$minRowID = (int)$this->dbr->newSelectQueryBuilder()
			->select( 'MIN(ceqa_id)' )
			->from( 'ce_question_answers' )
			->caller( __METHOD__ )
			->fetchField();

		// Wrap all the changes into a transaction to make sure we don't leave incomplete updates behind. This is also
		// needed to avoid edge cases like:
		// - The script fails after aggregating a user's answers, but before updating their cep_aggregation_timestamp
		// - An event's end date is changed while we are aggregating answers
		// Because of the PII nature of participant answers, we want to try and avoid these edge cases as much as
		// possible, especially those that could inadvertently leak PII.
		$transactionName = __METHOD__;
		$this->beginTransaction( $this->dbw, $transactionName );
		try {
			$prevID = $minRowID - 1;
			$curID = $prevID + $batchSize;
			do {
				// Avoid locking nonexistent rows (T381622)
				$batchEnd = min( $curID, $maxRowID );
				$this->processBatch( $prevID, $batchEnd );
				$prevID = $curID;
				$curID += $batchSize;
				$this->waitForReplication();
			} while ( $prevID < $maxRowID );

			$this->updateAggregationTimestamps();
		} catch ( Throwable $e ) {
			// Any failure (inconsistent data, DB error, ...) must discard every partial change made so far.
			$this->rollbackTransaction( $this->dbw, $transactionName );
			throw $e;
		}
		$this->commitTransaction( $this->dbw, $transactionName );

		$this->output( "Done.\n" );
	}

	/**
	 * Aggregates and deletes the eligible answers with ceqa_id in the range ($startID, $endID].
	 *
	 * An answer is eligible if its event has ended, or if its participant's first answer is older than
	 * $cutoffTimeUnix. Participants whose answers were aggregated are queued for updateAggregationTimestamps().
	 *
	 * @param int $startID Exclusive lower bound for ceqa_id
	 * @param int $endID Inclusive upper bound for ceqa_id
	 */
	private function processBatch( int $startID, int $endID ): void {
		// Lock the rows to prevent further changes.
		$res = $this->dbw->newSelectQueryBuilder()
			->select( '*' )
			->from( 'ce_question_answers' )
			->where( [
				$this->dbw->expr( 'ceqa_id', '>', $startID ),
				$this->dbw->expr( 'ceqa_id', '<=', $endID ),
			] )
			->forUpdate()
			->caller( __METHOD__ )
			->fetchResultSet();

		$this->loadDataForBatch( $res );

		$deleteRowIDs = [];
		$newAggregateTuples = [];
		foreach ( $res as $row ) {
			$eventID = (int)$row->ceqa_event_id;
			$userID = (int)$row->ceqa_user_id;

			if ( isset( $this->usersToSkipPerEvent[$eventID][$userID] ) ) {
				// Note, this check should be redundant as long as we delete all user answers,
				// including the non-PII ones.
				continue;
			}

			$eventEndTime = $this->eventEndTimes[$eventID] ?? null;
			$participantRowID = $this->participantRowIDsMap[$eventID][$userID] ?? null;
			if ( $eventEndTime === null || $participantRowID === null ) {
				// The event or participant row was not found. The participant data is read from a replica,
				// so a very recent registration may not be visible yet. Without this data we can't tell
				// whether the answer is old enough, nor mark the participant as aggregated. Leave the answer
				// untouched; a later run will handle it.
				$this->output(
					"Skipping answer {$row->ceqa_id}: missing event or participant data " .
					"(event=$eventID, user=$userID)\n"
				);
				continue;
			}

			if (
				$eventEndTime < $this->curTimeUnix ||
				$this->userFirstAnswerPerEventTimes[$eventID][$userID] < $this->cutoffTimeUnix
			) {
				$deleteRowIDs[] = (int)$row->ceqa_id;

				$question = (int)$row->ceqa_question_id;
				$option = (int)$row->ceqa_answer_option;
				$newAggregateTuples[$eventID][$question][$option] ??= 0;
				$newAggregateTuples[$eventID][$question][$option]++;

				$this->participantRowsToUpdateMap[$participantRowID] = true;
			}
		}

		if ( $newAggregateTuples ) {
			$newAggregateRows = [];
			foreach ( $newAggregateTuples as $event => $eventData ) {
				foreach ( $eventData as $question => $questionData ) {
					foreach ( $questionData as $option => $amount ) {
						$newAggregateRows[] = [
							'ceqag_event_id' => $event,
							'ceqag_question_id' => $question,
							'ceqag_answer_option' => $option,
							'ceqag_answers_amount' => $amount,
						];
					}
				}
			}

			// Add to existing counters instead of overwriting them.
			$this->dbw->newInsertQueryBuilder()
				->insertInto( 'ce_question_aggregation' )
				->rows( $newAggregateRows )
				->onDuplicateKeyUpdate()
				->uniqueIndexFields( [ 'ceqag_event_id', 'ceqag_question_id', 'ceqag_answer_option' ] )
				->set( [
					'ceqag_answers_amount = ceqag_answers_amount + ' .
						$this->dbw->buildExcludedValue( 'ceqag_answers_amount' )
				] )
				->caller( __METHOD__ )
				->execute();
		}

		if ( $deleteRowIDs ) {
			$this->dbw->newDeleteQueryBuilder()
				->deleteFrom( 'ce_question_answers' )
				->where( [ 'ceqa_id' => $deleteRowIDs ] )
				->caller( __METHOD__ )
				->execute();
		}

		$this->output( "Batch $startID-$endID done.\n" );
	}

	/**
	 * Given a batch of answers, loads data that will be needed to process that batch:
	 * - The end time of each event
	 * - The time of first answer of each participant in the batch
	 * - Participants whose answers have already been aggregated
	 *
	 * Results are cached in the instance maps, so each event/participant is only looked up once per run.
	 * Note: $res is iterated here and again by the caller, so it must be re-iterable.
	 *
	 * @param iterable $res Rows from ce_question_answers
	 * @throws RuntimeException If a participant without aggregation timestamp has no first answer time
	 */
	private function loadDataForBatch( iterable $res ): void {
		// Use maps to discard duplicates.
		$eventsMap = [];
		$usersByEventMap = [];
		foreach ( $res as $row ) {
			$eventID = (int)$row->ceqa_event_id;
			$eventsMap[$eventID] = true;
			$userID = (int)$row->ceqa_user_id;
			$usersByEventMap[$eventID][$userID] = true;
		}

		$eventsWithNoInfo = array_keys( array_diff_key( $eventsMap, $this->eventEndTimes ) );
		if ( $eventsWithNoInfo ) {
			// Lock the event rows as well, to prevent changes while we aggregate the answers.
			$eventRows = $this->dbw->newSelectQueryBuilder()
				->select( [ 'event_id', 'event_end_utc' ] )
				->from( 'campaign_events' )
				->where( [ 'event_id' => $eventsWithNoInfo ] )
				->forUpdate()
				->caller( __METHOD__ )
				->fetchResultSet();
			foreach ( $eventRows as $row ) {
				$eventID = (int)$row->event_id;
				$this->eventEndTimes[$eventID] = (int)wfTimestamp( TS_UNIX, $row->event_end_utc );
			}
		}

		// Only look up participants that are neither known-skipped nor already loaded.
		$missingUsersByEventMap = [];
		$allMissingUsersMap = [];
		foreach ( $usersByEventMap as $eventID => $eventUsersMap ) {
			$usersToProcess = array_diff_key( $eventUsersMap, $this->usersToSkipPerEvent[$eventID] ?? [] );
			$usersToProcess = array_diff_key(
				$usersToProcess,
				$this->userFirstAnswerPerEventTimes[$eventID] ?? []
			);
			if ( $usersToProcess ) {
				$missingUsersByEventMap[$eventID] = $usersToProcess;
				$allMissingUsersMap += $usersToProcess;
			}
		}

		if ( !$missingUsersByEventMap ) {
			return;
		}
		// Note that this query includes more rows than the ones we're looking for, because we're not filtering
		// by tuples in the where condition. Results are filtered later.
		// No need to obtain these from the primary DB, because concurrent changes to a record should not really
		// affect the script execution.
		$userRows = $this->dbr->newSelectQueryBuilder()
			->select( '*' )
			->from( 'ce_participants' )
			->where( [
				'cep_event_id' => array_keys( $missingUsersByEventMap ),
				'cep_user_id' => array_keys( $allMissingUsersMap ),
			] )
			->caller( __METHOD__ )
			->fetchResultSet();

		foreach ( $userRows as $row ) {
			$eventID = (int)$row->cep_event_id;
			$userID = (int)$row->cep_user_id;
			if ( !isset( $missingUsersByEventMap[$eventID][$userID] ) ) {
				// Unwanted row, ignore it.
				continue;
			}
			if ( $row->cep_aggregation_timestamp !== null ) {
				$this->usersToSkipPerEvent[$eventID][$userID] = true;
				continue;
			}
			$firstAnswerTS = wfTimestampOrNull( TS_UNIX, $row->cep_first_answer_timestamp );
			if ( !is_string( $firstAnswerTS ) ) {
				// aggregateAnswers() rolls back the transaction when this exception propagates.
				throw new RuntimeException(
					"User with answers but no first answer time (event=$eventID, user=$userID)"
				);
			}
			$this->userFirstAnswerPerEventTimes[$eventID][$userID] = (int)$firstAnswerTS;
			$this->participantRowIDsMap[$eventID][$userID] = (int)$row->cep_id;
		}
	}

	/**
	 * Updates the aggregation timestamp for all users processed thus far.
	 * Note, this needs to be done after all the rows have been processed, to make sure that a non-null
	 * cep_aggregation_timestamp doesn't cause the user to be skipped in subsequent batches.
	 */
	private function updateAggregationTimestamps(): void {
		$this->output( "Updating aggregation timestamps...\n" );
		$participantRowsToUpdate = array_keys( $this->participantRowsToUpdateMap );
		$dbTimestamp = $this->dbw->timestamp( $this->curTimeUnix );
		foreach ( array_chunk( $participantRowsToUpdate, $this->getBatchSize() ) as $idBatch ) {
			$this->dbw->newUpdateQueryBuilder()
				->update( 'ce_participants' )
				->set( [ 'cep_aggregation_timestamp' => $dbTimestamp ] )
				->where( [ 'cep_id' => $idBatch ] )
				->caller( __METHOD__ )
				->execute();
		}
	}

	/**
	 * Purge redundant aggregated data: for each event that has ended, delete any aggregated answers to questions that
	 * are no longer enabled for that event.
	 * Note that the aggregation itself ({@see self::processBatch()}) uses `ce_question_answers` as its data source,
	 * and does not make any distinction between old answers and finished events; therefore, it cannot handle the
	 * deletion of redundant aggregates as part of its execution: if a certain event has redundant aggregates but
	 * nothing to aggregate, it won't be processed at all.
	 * Also, note that the DB schema does not store whether the "final" aggregation (i.e., the first aggregation after
	 * the event end date) has occurred for a given event. Therefore, we need to check all the events here.
	 * Requires aggregateAnswers() to have run first, as it sets $dbr, $dbw and $curTimeUnix.
	 */
	private function purgeAggregates(): void {
		$this->output( "Purging old aggregated data...\n" );

		$maxEventID = (int)$this->dbr->newSelectQueryBuilder()
			->select( 'MAX(event_id)' )
			->from( 'campaign_events' )
			->caller( __METHOD__ )
			->fetchField();
		if ( $maxEventID === 0 ) {
			$this->output( "No events.\n" );
			return;
		}

		$batchSize = $this->getBatchSize();
		$startID = 0;
		$endID = $batchSize;
		$curDBTimestamp = $this->dbr->timestamp( $this->curTimeUnix );
		do {
			// Note, we may already have partial data in $this->eventEndTimes. However, since it's partial, we'll need
			// to query the DB again. Excluding events for which we already have data is probably useless, as it would
			// introduce complexity and give little to nothing in return.
			$eventsToCheck = $this->dbr->newSelectQueryBuilder()
				->select( 'event_id' )
				->from( 'campaign_events' )
				->where( [
					$this->dbr->expr( 'event_id', '>', $startID ),
					$this->dbr->expr( 'event_id', '<=', $endID ),
					$this->dbr->expr( 'event_end_utc', '<', $curDBTimestamp ),
				] )
				->caller( __METHOD__ )
				->fetchFieldValues();
			if ( $eventsToCheck ) {
				$eventsToCheck = array_map( 'intval', $eventsToCheck );
				$this->purgeAggregatesForEvents( $eventsToCheck );
			}

			$startID = $endID;
			$endID += $batchSize;
		} while ( $startID < $maxEventID );

		$this->output( "Done.\n" );
	}

	/**
	 * Deletes the aggregated rows of the given events whose question is not currently enabled for that event.
	 *
	 * @param int[] $eventIDs
	 */
	private function purgeAggregatesForEvents( array $eventIDs ): void {
		$eventQuestionRows = $this->dbr->newSelectQueryBuilder()
			->select( [ 'ceeq_event_id', 'ceeq_question_id' ] )
			->from( 'ce_event_questions' )
			->where( [
				'ceeq_event_id' => $eventIDs
			] )
			->caller( __METHOD__ )
			->fetchResultSet();
		// Set of enabled questions: [ event ID => [ question ID => true ] ], for O(1) membership checks.
		$enabledQuestionsByEvent = array_fill_keys( $eventIDs, [] );
		foreach ( $eventQuestionRows as $eventQuestionRow ) {
			$eventID = (int)$eventQuestionRow->ceeq_event_id;
			$enabledQuestionsByEvent[$eventID][(int)$eventQuestionRow->ceeq_question_id] = true;
		}

		$aggregatedAnswersRows = $this->dbr->newSelectQueryBuilder()
			->select( [ 'ceqag_id', 'ceqag_event_id', 'ceqag_question_id' ] )
			->from( 'ce_question_aggregation' )
			->where( [
				'ceqag_event_id' => $eventIDs
			] )
			->caller( __METHOD__ )
			->fetchResultSet();
		$deleteRowIDs = [];
		foreach ( $aggregatedAnswersRows as $aggregatedAnswersRow ) {
			$eventID = (int)$aggregatedAnswersRow->ceqag_event_id;
			$questionID = (int)$aggregatedAnswersRow->ceqag_question_id;
			if ( !isset( $enabledQuestionsByEvent[$eventID][$questionID] ) ) {
				$deleteRowIDs[] = (int)$aggregatedAnswersRow->ceqag_id;
			}
		}

		if ( $deleteRowIDs ) {
			$this->dbw->newDeleteQueryBuilder()
				->deleteFrom( 'ce_question_aggregation' )
				->where( [ 'ceqag_id' => $deleteRowIDs ] )
				->caller( __METHOD__ )
				->execute();
		}
	}
}

return AggregateParticipantAnswers::class;