Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 221 |
|
0.00% |
0 / 17 |
CRAP | |
0.00% |
0 / 1 |
SaneitizeJobs | |
0.00% |
0 / 214 |
|
0.00% |
0 / 17 |
2450 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
2 | |||
execute | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
init | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
checkJobClusterMismatch | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
showJobDetail | |
0.00% |
0 / 63 |
|
0.00% |
0 / 1 |
20 | |||
pushJobs | |
0.00% |
0 / 33 |
|
0.00% |
0 / 1 |
30 | |||
initClusters | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
30 | |||
initMetaStores | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
initProfile | |
0.00% |
0 / 25 |
|
0.00% |
0 / 1 |
20 | |||
getJobInfo | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
30 | |||
updateJob | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
createNewJob | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
deleteJob | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
getPressure | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
log | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
error | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
fatalError | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Maintenance; |
4 | |
5 | use CirrusSearch\Connection; |
6 | use CirrusSearch\Job\CheckerJob; |
7 | use CirrusSearch\MetaStore\MetaSaneitizeJobStore; |
8 | use CirrusSearch\Profile\SearchProfileService; |
9 | use MediaWiki\MediaWikiServices; |
10 | |
11 | /** |
12 | * Push some sanitize jobs to the JobQueue |
13 | * |
14 | * This program is free software; you can redistribute it and/or modify |
15 | * it under the terms of the GNU General Public License as published by |
16 | * the Free Software Foundation; either version 2 of the License, or |
17 | * (at your option) any later version. |
18 | * |
19 | * This program is distributed in the hope that it will be useful, |
20 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
21 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
22 | * GNU General Public License for more details. |
23 | * |
24 | * You should have received a copy of the GNU General Public License along |
25 | * with this program; if not, write to the Free Software Foundation, Inc., |
26 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
27 | * http://www.gnu.org/copyleft/gpl.html |
28 | */ |
29 | |
// Resolve the MediaWiki installation directory: the MW_INSTALL_PATH
// environment variable wins, otherwise fall back to three directories
// above this script.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = ( $IP !== false ) ? $IP : __DIR__ . '/../../..';

// Load the core maintenance framework first, then the CirrusSearch
// specific Maintenance base class that SaneitizeJobs extends.
require_once "$IP/maintenance/Maintenance.php";
require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
36 | |
/**
 * Maintenance script managing sanitize jobs (CheckerJob): it can push a new
 * batch of jobs to the job queue (--push), display the state of a stored job
 * (--show) or delete a stored job (--delete-job).
 *
 * Job state is persisted in one MetaSaneitizeJobStore per target cluster.
 */
class SaneitizeJobs extends Maintenance {
	/**
	 * @var MetaSaneitizeJobStore[] all metastores for write clusters,
	 *  keyed by cluster name
	 */
	private $metaStores;

	/**
	 * @var int min page id (from db)
	 */
	private $minId;

	/**
	 * @var int max page id (from db)
	 */
	private $maxId;

	/**
	 * @var string profile name
	 */
	private $profileName;

	/**
	 * @var string[] list of clusters to check
	 */
	private $clusters;

	public function __construct() {
		parent::__construct();
		$this->addDescription( 'Manage sanitize jobs (CheckerJob). This ' .
			'script operates on all writable clusters by default. ' .
			'Add --cluster to work on a single cluster. Note that ' .
			'once a job has been pushed to a particular cluster the ' .
			'script will fail if you try to run the same job with ' .
			'different cluster options.'
		);
		$this->addOption( 'push', 'Push some jobs to the job queue.' );
		$this->addOption( 'show', 'Display job info.' );
		$this->addOption( 'delete-job', 'Delete the job.' );
		$this->addOption( 'refresh-freq', 'Refresh rate in seconds this ' .
			'script is run from your crontab. This will be ' .
			'used to spread jobs over time. Defaults to 7200 (2 ' .
			'hours).', false, true );
		$this->addOption( 'job-name', 'Tells the script the name of the ' .
			'sanitize job only useful to run multiple sanitize jobs. ' .
			'Defaults to "default".', false, true );
	}

	/**
	 * Entry point: initialize the script state then dispatch to the action
	 * selected on the command line. When several actions are given the
	 * first match of show, push, delete-job wins; with no action the help
	 * is displayed.
	 *
	 * @return bool always true
	 */
	public function execute() {
		$this->init();
		if ( $this->hasOption( 'show' ) ) {
			$this->showJobDetail();
		} elseif ( $this->hasOption( 'push' ) ) {
			$this->pushJobs();
		} elseif ( $this->hasOption( 'delete-job' ) ) {
			$this->deleteJob();
		} else {
			$this->maybeHelp( true );
		}

		return true;
	}

	/**
	 * Prepare the state shared by all actions. Order matters: the
	 * metastores are built from the cluster list.
	 */
	private function init() {
		$this->initClusters();
		$this->initMetaStores();
		$this->initProfile();
	}

	/**
	 * Basically we support two modes:
	 * - all writable cluster, cluster = null
	 * - single cluster, cluster = 'clusterName'
	 * If we detect a mismatch here we fail.
	 * @param \Elastica\Document $jobInfo check if the stored job match
	 * cluster config used by this script, will die if clusters mismatch
	 */
	private function checkJobClusterMismatch( \Elastica\Document $jobInfo ) {
		$jobCluster = $jobInfo->get( 'sanitize_job_cluster' );
		$scriptCluster = $this->getOption( 'cluster' );
		if ( $jobCluster != $scriptCluster ) {
			$jobCluster = $jobCluster != null ? $jobCluster : "all writable clusters";
			$scriptCluster = $scriptCluster != null ? $scriptCluster : "all writable clusters";
			$this->fatalError( "Job cluster mismatch, stored job is configured to work on $jobCluster " .
				"but the script is configured to run on $scriptCluster.\n" );
		}
	}

	/**
	 * Print a human readable report about the stored job named by
	 * --job-name (default "default"): timestamps, ids/jobs sent, current
	 * queue pressure, sending rates and an estimate of the loop end.
	 * Dies if the job is unknown.
	 */
	private function showJobDetail() {
		$profile = $this->getSearchConfig()
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
		'@phan-var array $profile';
		$minLoopDuration = $profile['min_loop_duration'];
		$maxJobs = $profile['max_checker_jobs'];
		$maxUpdates = $profile['update_jobs_max_pressure'];

		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName );
		if ( $jobInfo === null ) {
			$this->fatalError( "Unknown job $jobName, push some jobs first.\n" );
		}
		$fmt = 'Y-m-d H:i:s';
		// Take a single clock reading so every derived value is consistent.
		$now = time();
		$cluster = $jobInfo->get( 'sanitize_job_cluster' ) ?: 'All writable clusters';

		$createdTs = $jobInfo->get( 'sanitize_job_created' );
		$lastLoopTs = $jobInfo->get( 'sanitize_job_last_loop' );
		$idOffset = $jobInfo->get( 'sanitize_job_id_offset' );

		$created = date( $fmt, $createdTs );
		$updated = date( $fmt, $jobInfo->get( 'sanitize_job_updated' ) );
		$loopStart = date( $fmt, $lastLoopTs );

		$idsSent = $jobInfo->get( 'sanitize_job_ids_sent' );
		$idsSentTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' );

		$jobsSent = $jobInfo->get( 'sanitize_job_jobs_sent' );
		$jobsSentTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' );

		$updatePressure = CheckerJob::getPressure();

		// Elapsed times are used as divisors below: clamp them to one second
		// so a job created or restarted within the current second does not
		// trigger a division by zero.
		$loopTime = max( 1, $now - $lastLoopTs );
		$totalTime = max( 1, $now - $createdTs );

		// Rates are expressed per second, then scaled to hours and days.
		$jobsRate = $jobsSent / $loopTime;
		$jobsPerHour = round( $jobsRate * 3600, 2 );
		$jobsPerDay = round( $jobsRate * 3600 * 24, 2 );
		$jobsRateTotal = $jobsSentTotal / $totalTime;
		$jobsTotalPerHour = round( $jobsRateTotal * 3600, 2 );
		$jobsTotalPerDay = round( $jobsRateTotal * 3600 * 24, 2 );

		$idsRate = $idsSent / $loopTime;
		$idsPerHour = round( $idsRate * 3600, 2 );
		$idsPerDay = round( $idsRate * 3600 * 24, 2 );
		$idsRateTotal = $idsSentTotal / $totalTime;
		$idsTotalPerHour = round( $idsRateTotal * 3600, 2 );
		$idsTotalPerDay = round( $idsRateTotal * 3600 * 24, 2 );

		$loopId = $jobInfo->has( 'sanitize_job_loop_id' ) ? $jobInfo->get( 'sanitize_job_loop_id' ) : 0;
		$idsTodo = $this->maxId - $idOffset;
		// Remaining seconds = remaining ids / (ids per second). Without any
		// id sent during this loop no estimate can be made.
		$loopEta = $idsRate > 0
			? date( $fmt, $now + (int)ceil( max( 0, $idsTodo ) / $idsRate ) )
			: 'unknown';
		$loopRestartMinTime = date( $fmt, $lastLoopTs + $minLoopDuration );

		$this->output( <<<EOD
JobDetail for {$jobName}
Target Wiki: {$jobInfo->get( 'sanitize_job_wiki' )}
Cluster: {$cluster}
Created: {$created}
Updated: {$updated}
Loop start: {$loopStart}
Current id: {$idOffset}
Ids sent: {$idsSent} ({$idsSentTotal} total)
Jobs sent: {$jobsSent} ({$jobsSentTotal} total)
Pressure (CheckerJobs):
Cur: {$this->getPressure()} jobs
Max: {$maxJobs} jobs
Pressure (Updates):
Cur: {$updatePressure} jobs
Max: {$maxUpdates} jobs
Jobs rate:
Loop: {$jobsPerHour} jobs/hour, {$jobsPerDay} jobs/day
Total: {$jobsTotalPerHour} jobs/hour, {$jobsTotalPerDay} jobs/day
Ids rate :
Loop: {$idsPerHour} ids/hour, {$idsPerDay} ids/day
Total: {$idsTotalPerHour} ids/hour, {$idsTotalPerDay} ids/day
Loop:
Loop: {$loopId}
Todo: {$idsTodo} ids
ETA: {$loopEta}
Loop restart min time: {$loopRestartMinTime}

EOD
		);
	}

	/**
	 * Push a new batch of CheckerJob to the job queue for the job named by
	 * --job-name (default "default"), creating the stored job if needed,
	 * then persist the updated job state to every metastore.
	 *
	 * Dies if the profile has no valid max_checker_jobs or if the queue
	 * already holds that many checker jobs.
	 */
	private function pushJobs() {
		$profile = $this->getSearchConfig()
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
		'@phan-var array $profile';
		$maxJobs = $profile['max_checker_jobs'];
		if ( !$maxJobs || $maxJobs <= 0 ) {
			$this->fatalError( "max_checker_jobs invalid abandonning.\n" );
		}

		$pressure = $this->getPressure();
		if ( $pressure >= $maxJobs ) {
			$this->fatalError( "Too many CheckerJob: $pressure in the queue, $maxJobs allowed.\n" );
		}
		$this->log( "$pressure checker job(s) in the queue.\n" );

		$this->disablePoolCountersAndLogging();
		// NOTE(review): init() already built the metastores; they are rebuilt
		// here after pool counters/logging were disabled - confirm whether
		// this second call is still required.
		$this->initMetaStores();

		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName ) ?? $this->createNewJob( $jobName );

		$pushJobFreq = $this->getOption( 'refresh-freq', 2 * 3600 );
		$jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
		$loop = new SaneitizeLoop(
			$this->profileName,
			$pushJobFreq,
			$profile['jobs_chunk_size'],
			$profile['min_loop_duration'],
			function ( $msg, $channel ) {
				$this->log( $msg, $channel );
			},
			$jobQueue );
		$jobs = $loop->run( $jobInfo, $maxJobs, $this->minId, $this->maxId );
		if ( $jobs ) {
			// Some job queues implementations ignore the timestamps and
			// instead run these jobs with concurrency limits to keep them
			// spread over time. Insert jobs in the order we asked for them
			// to be run to have some semblance of sanity.
			// Both sides must be compared on the same field, otherwise the
			// comparator is inconsistent and the resulting order undefined.
			usort( $jobs, static function ( CheckerJob $job1, CheckerJob $job2 ) {
				return $job1->getReadyTimestamp() <=> $job2->getReadyTimestamp();
			} );
			$jobQueue->push( $jobs );
			$this->updateJob( $jobInfo );
		}
	}

	/**
	 * Populate $this->clusters: the single cluster given with --cluster
	 * when present (it must be writable), all writable clusters otherwise.
	 * Dies if sanity checks are disabled, if the requested cluster is not
	 * writable or if no cluster is left.
	 */
	private function initClusters() {
		$sanityCheckSetup = $this->getSearchConfig()->get( 'CirrusSearchSanityCheck' );
		if ( !$sanityCheckSetup ) {
			$this->fatalError( "Sanity check disabled, abandonning...\n" );
		}
		$assignment = $this->getSearchConfig()->getClusterAssignment();
		if ( $this->hasOption( 'cluster' ) ) {
			$cluster = $this->getOption( 'cluster' );
			if ( !$assignment->canWriteToCluster( $cluster ) ) {
				$this->fatalError( "$cluster is not in the set of writable clusters\n" );
			}
			$this->clusters = [ $cluster ];
		} else {
			$this->clusters = $assignment->getWritableClusters();
		}
		if ( count( $this->clusters ) === 0 ) {
			$this->fatalError( 'No clusters are writable...' );
		}
	}

	/**
	 * Build one sanitize job store per cluster in $this->clusters and keep
	 * them in $this->metaStores keyed by cluster name. Dies if no
	 * connection is available or if a cluster has no ready metastore.
	 */
	private function initMetaStores() {
		$connections = Connection::getClusterConnections( $this->clusters, $this->getSearchConfig() );

		if ( !$connections ) {
			$this->fatalError( "No writable cluster found." );
		}

		$this->metaStores = [];
		foreach ( $connections as $cluster => $connection ) {
			$store = $this->getMetaStore( $connection );
			if ( !$store->cirrusReady() ) {
				$this->fatalError( "No metastore found in cluster $cluster" );
			}
			$this->metaStores[$cluster] = $store->saneitizeJobStore();
		}
	}

	/**
	 * Read the min/max page ids from the replica database and select the
	 * sanitization profile with the smallest max_wiki_size that is strictly
	 * greater than the id range. Dies if no profile matches.
	 */
	private function initProfile() {
		$res = $this->getDB( DB_REPLICA )
			->newSelectQueryBuilder()
			->select( [ 'min_id' => 'MIN(page_id)', 'max_id' => 'MAX(page_id)' ] )
			->table( 'page' )
			->caller( __METHOD__ )
			->fetchResultSet();

		$row = $res->fetchObject();
		// Normalize the aggregates to the documented int type.
		$this->minId = (int)$row->min_id;
		$this->maxId = (int)$row->max_id;
		$profiles =
			$this->getSearchConfig()
				->getProfileService()
				->listExposedProfiles( SearchProfileService::SANEITIZER );
		// Smallest profiles first so that the first match is the tightest fit.
		uasort( $profiles, static function ( $a, $b ) {
			return $a['max_wiki_size'] <=> $b['max_wiki_size'];
		} );
		$wikiSize = $this->maxId - $this->minId;
		foreach ( $profiles as $name => $settings ) {
			'@phan-var array $settings';
			if ( $settings['max_wiki_size'] > $wikiSize ) {
				$this->profileName = $name;
				$this->log( "Detected $wikiSize ids to check, selecting profile $name\n" );
				break;
			}
		}
		if ( !$this->profileName ) {
			$this->fatalError( "No profile found for $wikiSize ids, please check sanitization profiles" );
		}
	}

	/**
	 * Fetch the most recently updated job document among all metastores.
	 * Every document found is checked against the script cluster options.
	 *
	 * @param string $jobName
	 * @return \Elastica\Document|null null if no metastore knows this job
	 */
	private function getJobInfo( $jobName ) {
		$latest = null;
		// Fetch the latest jobInfo from the metastore. Ideally all
		// jobInfo should be the same but in the case a cluster has
		// been decommissioned and re-added its job info may be outdated
		foreach ( $this->metaStores as $store ) {
			$current = $store->get( $jobName );
			if ( $current === null ) {
				continue;
			}
			$this->checkJobClusterMismatch( $current );
			if ( $latest === null
				|| $current->get( 'sanitize_job_updated' ) > $latest->get( 'sanitize_job_updated' )
			) {
				$latest = $current;
			}
		}
		return $latest;
	}

	/**
	 * Write the job document to every metastore.
	 *
	 * @param \Elastica\Document $jobInfo
	 */
	private function updateJob( \Elastica\Document $jobInfo ) {
		foreach ( $this->metaStores as $store ) {
			$store->update( $jobInfo );
		}
	}

	/**
	 * Create the job in every metastore, starting at the min page id, and
	 * return the document built by the last store. Dies if no store
	 * produced a document.
	 *
	 * @param string $jobName
	 * @return \Elastica\Document
	 */
	private function createNewJob( $jobName ) {
		$job = null;
		$scriptCluster = $this->getOption( 'cluster' );
		foreach ( $this->metaStores as $store ) {
			// TODO: It's a little awkward to let each cluster make
			// it's own job, but it also seems sane to put all
			// the doc building in the store?
			$job = $store->create( $jobName, $this->minId, $scriptCluster );
		}
		if ( $job === null ) {
			$this->fatalError( "No job created, metastores failed to create?" );
		}
		// @phan-suppress-next-line PhanTypeMismatchReturnNullable T240141
		return $job;
	}

	/**
	 * Delete the job named by --job-name (default "default") from every
	 * metastore. Dies if the job is unknown.
	 */
	private function deleteJob() {
		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName );
		if ( $jobInfo === null ) {
			$this->fatalError( "Unknown job $jobName" );
		}
		foreach ( $this->metaStores as $cluster => $store ) {
			$store->delete( $jobName );
			$this->log( "Deleted job $jobName from $cluster.\n" );
		}
	}

	/**
	 * @return int the number of jobs in the CheckerJob queue
	 *  (queued plus delayed)
	 */
	private function getPressure() {
		$queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'cirrusSearchCheckerJob' );
		return $queue->getSize() + $queue->getDelayedCount();
	}

	/**
	 * Prefix a message with the current date and time.
	 *
	 * @param string $msg
	 * @return string
	 */
	private function prefixWithDate( $msg ) {
		return ( new \DateTimeImmutable() )->format( 'Y-m-d H:i:s' ) . " " . $msg;
	}

	/**
	 * Output a message prefixed with the current date and time.
	 *
	 * @param string $msg
	 * @param string|null $channel forwarded as is to output()
	 */
	private function log( $msg, $channel = null ) {
		$this->output( $this->prefixWithDate( $msg ), $channel );
	}

	/**
	 * @param string $msg The error to display
	 * @param int $die deprecated do not use
	 */
	public function error( $msg, $die = 0 ) {
		parent::error( $this->prefixWithDate( $msg ) );
	}

	/**
	 * @param string $msg The error to display
	 * @param int $exitCode die out using this int as the code
	 * @return never
	 */
	public function fatalError( $msg, $exitCode = 1 ) {
		parent::fatalError( $this->prefixWithDate( $msg ), $exitCode );
	}
}
419 | |
// Tell the maintenance runner which class to instantiate, then hand over
// control to it (RUN_MAINTENANCE_IF_MAIN is presumably defined by the core
// Maintenance.php required at the top of this file - verify).
$maintClass = \CirrusSearch\Maintenance\SaneitizeJobs::class;
require_once RUN_MAINTENANCE_IF_MAIN;