Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 221 |
|
0.00% |
0 / 17 |
CRAP | |
0.00% |
0 / 1 |
SaneitizeJobs | |
0.00% |
0 / 214 |
|
0.00% |
0 / 17 |
2450 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
2 | |||
execute | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
init | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
checkJobClusterMismatch | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
showJobDetail | |
0.00% |
0 / 63 |
|
0.00% |
0 / 1 |
20 | |||
pushJobs | |
0.00% |
0 / 33 |
|
0.00% |
0 / 1 |
30 | |||
initClusters | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
30 | |||
initMetaStores | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
initProfile | |
0.00% |
0 / 25 |
|
0.00% |
0 / 1 |
20 | |||
getJobInfo | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
30 | |||
updateJob | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
createNewJob | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
deleteJob | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
getPressure | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
log | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
error | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
fatalError | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Maintenance; |
4 | |
5 | use CirrusSearch\Connection; |
6 | use CirrusSearch\Job\CheckerJob; |
7 | use CirrusSearch\MetaStore\MetaSaneitizeJobStore; |
8 | use CirrusSearch\Profile\SearchProfileService; |
9 | use CirrusSearch\UpdateGroup; |
10 | use MediaWiki\MediaWikiServices; |
11 | |
12 | /** |
13 | * Push some sanitize jobs to the JobQueue |
14 | * |
15 | * This program is free software; you can redistribute it and/or modify |
16 | * it under the terms of the GNU General Public License as published by |
17 | * the Free Software Foundation; either version 2 of the License, or |
18 | * (at your option) any later version. |
19 | * |
20 | * This program is distributed in the hope that it will be useful, |
21 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
22 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
23 | * GNU General Public License for more details. |
24 | * |
25 | * You should have received a copy of the GNU General Public License along |
26 | * with this program; if not, write to the Free Software Foundation, Inc., |
27 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
28 | * http://www.gnu.org/copyleft/gpl.html |
29 | */ |
30 | |
// Resolve the MediaWiki installation directory: prefer the MW_INSTALL_PATH
// environment variable and fall back to three directories above this script.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = ( $IP !== false ) ? $IP : __DIR__ . '/../../..';
// Load core's maintenance bootstrap first, then CirrusSearch's own Maintenance base class.
require_once "$IP/maintenance/Maintenance.php";
require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
37 | |
class SaneitizeJobs extends Maintenance {
	/**
	 * @var MetaSaneitizeJobStore[] sanitize job stores of the targeted clusters,
	 *  keyed by cluster
	 */
	private $metaStores;

	/**
	 * @var int min page id (from db)
	 */
	private $minId;

	/**
	 * @var int max page id (from db)
	 */
	private $maxId;

	/**
	 * @var string profile name
	 */
	private $profileName;

	/**
	 * @var string[] list of clusters to check
	 */
	private $clusters;

	public function __construct() {
		parent::__construct();
		$this->addDescription( 'Manage sanitize jobs (CheckerJob). This ' .
			'script operates on all writable clusters by default. ' .
			'Add --cluster to work on a single cluster. Note that ' .
			'once a job has been pushed to a particular cluster the ' .
			'script will fail if you try to run the same job with ' .
			'different cluster options.'
		);
		$this->addOption( 'push', 'Push some jobs to the job queue.' );
		$this->addOption( 'show', 'Display job info.' );
		$this->addOption( 'delete-job', 'Delete the job.' );
		$this->addOption( 'refresh-freq', 'Refresh rate in seconds this ' .
			'script is run from your crontab. This will be ' .
			'used to spread jobs over time. Defaults to 7200 (2 ' .
			'hours).', false, true );
		$this->addOption( 'job-name', 'Tells the script the name of the ' .
			'sanitize job only useful to run multiple sanitize jobs. ' .
			'Defaults to "default".', false, true );
	}

	/**
	 * Entry point: initialise clusters/metastores/profile, then run the single
	 * action selected on the command line (--show, --push or --delete-job, in
	 * that order of precedence). Prints help when no action is given.
	 *
	 * @return bool always true
	 */
	public function execute() {
		$this->init();
		if ( $this->hasOption( 'show' ) ) {
			$this->showJobDetail();
		} elseif ( $this->hasOption( 'push' ) ) {
			$this->pushJobs();
		} elseif ( $this->hasOption( 'delete-job' ) ) {
			$this->deleteJob();
		} else {
			$this->maybeHelp( true );
		}

		return true;
	}

	/**
	 * Resolve target clusters, open their metastores and pick a sanitizer
	 * profile. Order matters: metastores need the cluster list.
	 */
	private function init() {
		$this->initClusters();
		$this->initMetaStores();
		$this->initProfile();
	}

	/**
	 * Basically we support two modes:
	 * - all writable cluster, cluster = null
	 * - single cluster, cluster = 'clusterName'
	 * If we detect a mismatch here we fail.
	 * @param \Elastica\Document $jobInfo check if the stored job match
	 * cluster config used by this script, will die if clusters mismatch
	 */
	private function checkJobClusterMismatch( \Elastica\Document $jobInfo ) {
		$jobCluster = $jobInfo->get( 'sanitize_job_cluster' );
		$scriptCluster = $this->getOption( 'cluster' );
		// Loose comparison on purpose: an unset cluster may be stored/returned as null.
		if ( $jobCluster != $scriptCluster ) {
			$jobCluster = $jobCluster != null ? $jobCluster : "all writable clusters";
			$scriptCluster = $scriptCluster != null ? $scriptCluster : "all writable clusters";
			$this->fatalError( "Job cluster mismatch, stored job is configured to work on $jobCluster " .
				"but the script is configured to run on $scriptCluster.\n" );
		}
	}

	/**
	 * Print the stored state of the job selected by --job-name, the current
	 * CheckerJob/update queue pressure, observed throughput and an ETA for the
	 * current loop. Dies if the job does not exist.
	 */
	private function showJobDetail() {
		$profile = $this->getSearchConfig()
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
		'@phan-var array $profile';
		$minLoopDuration = $profile['min_loop_duration'];
		$maxJobs = $profile['max_checker_jobs'];
		$maxUpdates = $profile['update_jobs_max_pressure'];

		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName );
		if ( $jobInfo === null ) {
			$this->fatalError( "Unknown job $jobName, push some jobs first.\n" );
		}
		$fmt = 'Y-m-d H:i:s';
		$cluster = $jobInfo->get( 'sanitize_job_cluster' ) ?: 'All writable clusters';

		$createdTs = $jobInfo->get( 'sanitize_job_created' );
		$lastLoopTs = $jobInfo->get( 'sanitize_job_last_loop' );

		$created = date( $fmt, $createdTs );
		$updated = date( $fmt, $jobInfo->get( 'sanitize_job_updated' ) );
		$loopStart = date( $fmt, $lastLoopTs );

		$idsSent = $jobInfo->get( 'sanitize_job_ids_sent' );
		$idsSentTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' );

		$jobsSent = $jobInfo->get( 'sanitize_job_jobs_sent' );
		$jobsSentTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' );

		$updatePressure = CheckerJob::getPressure();

		$now = time();
		// Elapsed seconds, clamped to >= 1: running --show in the same second
		// as a push/job creation would otherwise divide by zero
		// (DivisionByZeroError on PHP 8).
		$loopTime = max( 1, $now - $lastLoopTs );
		$totalTime = max( 1, $now - $createdTs );

		// All rates below are per second.
		$jobsRate = $jobsSent / $loopTime;
		$jobsPerHour = round( $jobsRate * 3600, 2 );
		$jobsPerDay = round( $jobsRate * 3600 * 24, 2 );
		$jobsRateTotal = $jobsSentTotal / $totalTime;
		$jobsTotalPerHour = round( $jobsRateTotal * 3600, 2 );
		$jobsTotalPerDay = round( $jobsRateTotal * 3600 * 24, 2 );

		$idsRate = $idsSent / $loopTime;
		$idsPerHour = round( $idsRate * 3600, 2 );
		$idsPerDay = round( $idsRate * 3600 * 24, 2 );
		$idsRateTotal = $idsSentTotal / $totalTime;
		$idsTotalPerHour = round( $idsRateTotal * 3600, 2 );
		$idsTotalPerDay = round( $idsRateTotal * 3600 * 24, 2 );

		$loopId = $jobInfo->has( 'sanitize_job_loop_id' ) ? $jobInfo->get( 'sanitize_job_loop_id' ) : 0;
		$idsTodo = $this->maxId - $jobInfo->get( 'sanitize_job_id_offset' );
		// Remaining seconds = remaining ids / (ids per second). Without any
		// progress in this loop there is no meaningful estimate.
		$loopEta = $idsRate > 0
			? date( $fmt, $now + (int)round( $idsTodo / $idsRate ) )
			: 'unknown';
		$loopRestartMinTime = date( $fmt, $lastLoopTs + $minLoopDuration );

		$this->output( <<<EOD
JobDetail for {$jobName}
	Target Wiki: 	{$jobInfo->get( 'sanitize_job_wiki' )}
	Cluster: 	{$cluster}
	Created: 	{$created}
	Updated: 	{$updated}
	Loop start:	{$loopStart}
	Current id:	{$jobInfo->get( 'sanitize_job_id_offset' )}
	Ids sent:	{$idsSent} ({$idsSentTotal} total)
	Jobs sent:	{$jobsSent} ({$jobsSentTotal} total)
	Pressure (CheckerJobs):
		Cur:	{$this->getPressure()} jobs
		Max:	{$maxJobs} jobs
	Pressure (Updates):
		Cur:	{$updatePressure} jobs
		Max:	{$maxUpdates} jobs
	Jobs rate:
		Loop:	{$jobsPerHour} jobs/hour, {$jobsPerDay} jobs/day
		Total:	{$jobsTotalPerHour} jobs/hour, {$jobsTotalPerDay} jobs/day
	Ids rate :
		Loop:	{$idsPerHour} ids/hour, {$idsPerDay} ids/day
		Total:	{$idsTotalPerHour} ids/hour, {$idsTotalPerDay} ids/day
	Loop:
		Loop:	{$loopId}
		Todo:	{$idsTodo} ids
		ETA:	{$loopEta}
	Loop restart min time: {$loopRestartMinTime}

EOD
		);
	}

	/**
	 * Push the next batch of CheckerJobs for the job selected by --job-name
	 * (creating the job on first use), then persist the updated job state to
	 * every metastore. Dies if the profile's max_checker_jobs is invalid or
	 * the CheckerJob queue is already at that limit.
	 */
	private function pushJobs() {
		$profile = $this->getSearchConfig()
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
		'@phan-var array $profile';
		$maxJobs = $profile['max_checker_jobs'];
		if ( !$maxJobs || $maxJobs <= 0 ) {
			$this->fatalError( "max_checker_jobs invalid abandonning.\n" );
		}

		$pressure = $this->getPressure();
		if ( $pressure >= $maxJobs ) {
			$this->fatalError( "Too many CheckerJob: $pressure in the queue, $maxJobs allowed.\n" );
		}
		$this->log( "$pressure checker job(s) in the queue.\n" );

		$this->disablePoolCountersAndLogging();
		// NOTE(review): init() already built the metastores; they are rebuilt
		// here after disablePoolCountersAndLogging(), presumably so the new
		// connections pick up that setting. Confirm before removing.
		$this->initMetaStores();

		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName ) ?? $this->createNewJob( $jobName );

		$pushJobFreq = $this->getOption( 'refresh-freq', 2 * 3600 );
		$jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
		$loop = new SaneitizeLoop(
			$this->profileName,
			$pushJobFreq,
			$profile['jobs_chunk_size'],
			$profile['min_loop_duration'],
			function ( $msg, $channel ) {
				$this->log( $msg, $channel );
			},
			$jobQueue
		);
		$jobs = $loop->run( $jobInfo, $maxJobs, $this->minId, $this->maxId );
		if ( $jobs ) {
			// Some job queues implementations ignore the timestamps and
			// instead run these jobs with concurrency limits to keep them
			// spread over time. Insert jobs in the order we asked for them
			// to be run to have some semblance of sanity.
			// Both sides must use the same accessor, otherwise the comparator
			// is not a consistent ordering.
			usort( $jobs, static function ( CheckerJob $job1, CheckerJob $job2 ) {
				return $job1->getReadyTimestamp() <=> $job2->getReadyTimestamp();
			} );
			$jobQueue->push( $jobs );
			$this->updateJob( $jobInfo );
		}
	}

	/**
	 * Decide which clusters to operate on: the one given by --cluster (which
	 * must accept check_sanity writes) or, by default, every writable
	 * check_sanity cluster. Dies if sanity checking is disabled or no cluster
	 * remains.
	 */
	private function initClusters() {
		$sanityCheckSetup = $this->getSearchConfig()->get( 'CirrusSearchSanityCheck' );
		if ( !$sanityCheckSetup ) {
			$this->fatalError( "Sanity check disabled, abandonning...\n" );
		}
		$assignment = $this->getSearchConfig()->getClusterAssignment();
		if ( $this->hasOption( 'cluster' ) ) {
			$cluster = $this->getOption( 'cluster' );
			// Reject clusters we are NOT allowed to write check_sanity updates to.
			if ( !$assignment->canWriteToCluster( $cluster, UpdateGroup::CHECK_SANITY ) ) {
				$this->fatalError( "$cluster is not in the set of check_sanity clusters\n" );
			}
			$this->clusters = [ $cluster ];
		} else {
			$this->clusters = $assignment->getWritableClusters( UpdateGroup::CHECK_SANITY );
		}
		if ( count( $this->clusters ) === 0 ) {
			$this->fatalError( 'No clusters are available...' );
		}
	}

	/**
	 * Open a connection to every target cluster and collect its sanitize job
	 * store. Dies if no connection is available or a cluster has no metastore.
	 */
	private function initMetaStores() {
		$connections = Connection::getClusterConnections( $this->clusters, $this->getSearchConfig() );

		if ( !$connections ) {
			$this->fatalError( "No available cluster found." );
		}

		$this->metaStores = [];
		foreach ( $connections as $cluster => $connection ) {
			$store = $this->getMetaStore( $connection );
			if ( !$store->cirrusReady() ) {
				$this->fatalError( "No metastore found in cluster $cluster" );
			}
			$this->metaStores[$cluster] = $store->saneitizeJobStore();
		}
	}

	/**
	 * Read the page id range from the replica DB and select the exposed
	 * sanitizer profile with the smallest max_wiki_size that is still larger
	 * than that range. Dies if no profile fits.
	 */
	private function initProfile() {
		$res = $this->getDB( DB_REPLICA )
			->newSelectQueryBuilder()
			->select( [ 'min_id' => 'MIN(page_id)', 'max_id' => 'MAX(page_id)' ] )
			->table( 'page' )
			->caller( __METHOD__ )
			->fetchResultSet();

		$row = $res->fetchObject();
		// Normalise to int (the properties are documented as int); aggregates
		// may come back as strings, or null on an empty page table.
		$this->minId = (int)$row->min_id;
		$this->maxId = (int)$row->max_id;
		$profiles =
			$this->getSearchConfig()
				->getProfileService()
				->listExposedProfiles( SearchProfileService::SANEITIZER );
		uasort( $profiles, static function ( $a, $b ) {
			return $a['max_wiki_size'] <=> $b['max_wiki_size'];
		} );
		$wikiSize = $this->maxId - $this->minId;
		foreach ( $profiles as $name => $settings ) {
			'@phan-var array $settings';
			if ( $settings['max_wiki_size'] > $wikiSize ) {
				$this->profileName = $name;
				$this->log( "Detected $wikiSize ids to check, selecting profile $name\n" );
				break;
			}
		}
		if ( !$this->profileName ) {
			$this->fatalError( "No profile found for $wikiSize ids, please check sanitization profiles" );
		}
	}

	/**
	 * Fetch the most recently updated copy of a job across all metastores.
	 * Every copy found is checked against the script's --cluster option.
	 *
	 * @param string $jobName
	 * @return \Elastica\Document|null null when no metastore knows the job
	 */
	private function getJobInfo( $jobName ) {
		$latest = null;
		// Fetch the lastest jobInfo from the metastore. Ideally all
		// jobInfo should be the same but in the case a cluster has
		// been decommissioned and re-added its job info may be outdated
		foreach ( $this->metaStores as $store ) {
			$current = $store->get( $jobName );
			if ( $current === null ) {
				continue;
			}
			$this->checkJobClusterMismatch( $current );
			if ( $latest === null
				|| $current->get( 'sanitize_job_updated' ) > $latest->get( 'sanitize_job_updated' )
			) {
				$latest = $current;
			}
		}
		return $latest;
	}

	/**
	 * Write the given job state to every metastore.
	 *
	 * @param \Elastica\Document $jobInfo
	 */
	private function updateJob( \Elastica\Document $jobInfo ) {
		foreach ( $this->metaStores as $store ) {
			$store->update( $jobInfo );
		}
	}

	/**
	 * Create the job in every metastore, starting at the minimum page id and
	 * bound to the --cluster option (null for all writable clusters).
	 *
	 * @param string $jobName
	 * @return \Elastica\Document the document returned by the last store
	 */
	private function createNewJob( $jobName ) {
		$job = null;
		$scriptCluster = $this->getOption( 'cluster' );
		foreach ( $this->metaStores as $store ) {
			// TODO: It's a little awkward to let each cluster make
			// it's own job, but it also seems sane to put all
			// the doc building in the store?
			$job = $store->create( $jobName, $this->minId, $scriptCluster );
		}
		if ( $job === null ) {
			$this->fatalError( "No job created, metastores failed to create?" );
		}
		// @phan-suppress-next-line PhanTypeMismatchReturnNullable T240141
		return $job;
	}

	/**
	 * Delete the job selected by --job-name from every metastore. Dies if the
	 * job is unknown.
	 */
	private function deleteJob() {
		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName );
		if ( $jobInfo === null ) {
			$this->fatalError( "Unknown job $jobName" );
		}
		foreach ( $this->metaStores as $cluster => $store ) {
			$store->delete( $jobName );
			$this->log( "Deleted job $jobName from $cluster.\n" );
		}
	}

	/**
	 * @return int the number of jobs in the CheckerJob queue, including delayed ones
	 */
	private function getPressure() {
		$queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'cirrusSearchCheckerJob' );
		return $queue->getSize() + $queue->getDelayedCount();
	}

	/**
	 * Output a message prefixed with the current date/time.
	 *
	 * @param string $msg
	 * @param mixed $channel forwarded unchanged to Maintenance::output()
	 */
	private function log( $msg, $channel = null ) {
		$date = new \DateTime();
		$this->output( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $channel );
	}

	/**
	 * @param string $msg The error to display
	 * @param int $die deprecated do not use
	 */
	public function error( $msg, $die = 0 ) {
		$date = new \DateTime();
		parent::error( $date->format( 'Y-m-d H:i:s' ) . " " . $msg );
	}

	/**
	 * @param string $msg The error to display
	 * @param int $exitCode die out using this int as the code
	 * @return never
	 */
	public function fatalError( $msg, $exitCode = 1 ) {
		$date = new \DateTime();
		parent::fatalError( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $exitCode );
	}
}
420 | |
// Name the Maintenance subclass for MediaWiki's runner, then hand control to it
// (RUN_MAINTENANCE_IF_MAIN is provided by core's maintenance bootstrap).
$maintClass = SaneitizeJobs::class;
require_once RUN_MAINTENANCE_IF_MAIN;