Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 219 |
|
0.00% |
0 / 17 |
CRAP | |
0.00% |
0 / 1 |
SaneitizeJobs | |
0.00% |
0 / 212 |
|
0.00% |
0 / 17 |
2450 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 18 |
|
0.00% |
0 / 1 |
2 | |||
execute | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
init | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
checkJobClusterMismatch | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
showJobDetail | |
0.00% |
0 / 63 |
|
0.00% |
0 / 1 |
20 | |||
pushJobs | |
0.00% |
0 / 31 |
|
0.00% |
0 / 1 |
30 | |||
initClusters | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
30 | |||
initMetaStores | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
initProfile | |
0.00% |
0 / 25 |
|
0.00% |
0 / 1 |
20 | |||
getJobInfo | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
30 | |||
updateJob | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
createNewJob | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
deleteJob | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
getPressure | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
log | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
error | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
fatalError | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Maintenance; |
4 | |
5 | use CirrusSearch\Connection; |
6 | use CirrusSearch\Job\CheckerJob; |
7 | use CirrusSearch\MetaStore\MetaSaneitizeJobStore; |
8 | use CirrusSearch\Profile\SearchProfileService; |
9 | use MediaWiki\MediaWikiServices; |
10 | |
11 | /** |
12 | * Push some sanitize jobs to the JobQueue |
13 | * |
14 | * This program is free software; you can redistribute it and/or modify |
15 | * it under the terms of the GNU General Public License as published by |
16 | * the Free Software Foundation; either version 2 of the License, or |
17 | * (at your option) any later version. |
18 | * |
19 | * This program is distributed in the hope that it will be useful, |
20 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
21 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
22 | * GNU General Public License for more details. |
23 | * |
24 | * You should have received a copy of the GNU General Public License along |
25 | * with this program; if not, write to the Free Software Foundation, Inc., |
26 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
27 | * http://www.gnu.org/copyleft/gpl.html |
28 | */ |
29 | |
// Locate MediaWiki core: honour MW_INSTALL_PATH when it is set, otherwise
// fall back to the directory three levels above this script.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = ( $IP !== false ) ? $IP : __DIR__ . '/../../..';
require_once $IP . '/maintenance/Maintenance.php';
// Presumably defines the CirrusSearch\Maintenance\Maintenance base class
// that SaneitizeJobs extends — TODO confirm.
require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
36 | |
/**
 * Maintenance script that manages the saneitizer (CheckerJob) job: it can
 * push a new batch of CheckerJobs, show the stored job state, or delete the
 * stored job state from the metastore of every targeted cluster.
 */
class SaneitizeJobs extends Maintenance {
	/**
	 * @var MetaSaneitizeJobStore[] all metastores for write clusters, keyed by cluster name
	 */
	private $metaStores;

	/**
	 * @var int min page id (from db)
	 */
	private $minId;

	/**
	 * @var int max page id (from db)
	 */
	private $maxId;

	/**
	 * @var string profile name
	 */
	private $profileName;

	/**
	 * @var string[] list of clusters to check
	 */
	private $clusters;

	public function __construct() {
		parent::__construct();
		$this->addDescription( 'Manage sanitize jobs (CheckerJob). This ' .
			'script operates on all writable clusters by default. ' .
			'Add --cluster to work on a single cluster. Note that ' .
			'once a job has been pushed to a particular cluster the ' .
			'script will fail if you try to run the same job with ' .
			'different cluster options.'
		);
		$this->addOption( 'push', 'Push some jobs to the job queue.' );
		$this->addOption( 'show', 'Display job info.' );
		$this->addOption( 'delete-job', 'Delete the job.' );
		$this->addOption( 'refresh-freq', 'Refresh rate in seconds this ' .
			'script is run from your crontab. This will be ' .
			'used to spread jobs over time. Defaults to 7200 (2 ' .
			'hours).', false, true );
		$this->addOption( 'job-name', 'Tells the script the name of the ' .
			'sanitize job only useful to run multiple sanitize jobs. ' .
			'Defaults to "default".', false, true );
	}

	/**
	 * Initialise clusters, metastores and profile, then run the one action
	 * selected by --show, --push or --delete-job (checked in that order).
	 * When none is given, fall back to the help output.
	 *
	 * @return bool
	 */
	public function execute() {
		$this->init();
		if ( $this->hasOption( 'show' ) ) {
			$this->showJobDetail();
		} elseif ( $this->hasOption( 'push' ) ) {
			$this->pushJobs();
		} elseif ( $this->hasOption( 'delete-job' ) ) {
			$this->deleteJob();
		} else {
			$this->maybeHelp( true );
		}

		return true;
	}

	/**
	 * Resolve target clusters, open their metastores and pick a profile.
	 * Order matters: metastores are built from the resolved clusters.
	 */
	private function init() {
		$this->initClusters();
		$this->initMetaStores();
		$this->initProfile();
	}

	/**
	 * Basically we support two modes:
	 * - all writable cluster, cluster = null
	 * - single cluster, cluster = 'clusterName'
	 * If we detect a mismatch here we fail.
	 * @param \Elastica\Document $jobInfo check if the stored job match
	 * cluster config used by this script, will die if clusters mismatch
	 */
	private function checkJobClusterMismatch( \Elastica\Document $jobInfo ) {
		$jobCluster = $jobInfo->get( 'sanitize_job_cluster' );
		$scriptCluster = $this->getOption( 'cluster' );
		// Loose comparison on purpose: a null stored cluster and a missing
		// --cluster option both mean "all writable clusters".
		if ( $jobCluster != $scriptCluster ) {
			$jobCluster = $jobCluster != null ? $jobCluster : "all writable clusters";
			$scriptCluster = $scriptCluster != null ? $scriptCluster : "all writable clusters";
			$this->fatalError( "Job cluster mismatch, stored job is configured to work on $jobCluster " .
				"but the script is configured to run on $scriptCluster.\n" );
		}
	}

	/**
	 * Print the stored state of the job named by --job-name (default
	 * "default"), together with queue pressure and send-rate statistics.
	 * Aborts when no such job exists.
	 */
	private function showJobDetail() {
		$profile = $this->getSearchConfig()
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
		'@phan-var array $profile';
		$minLoopDuration = $profile['min_loop_duration'];
		$maxJobs = $profile['max_checker_jobs'];
		$maxUpdates = $profile['update_jobs_max_pressure'];

		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName );
		if ( $jobInfo === null ) {
			$this->fatalError( "Unknown job $jobName, push some jobs first.\n" );
		}
		$fmt = 'Y-m-d H:i:s';
		$cluster = $jobInfo->get( 'sanitize_job_cluster' ) ?: 'All writable clusters';

		$createdTs = $jobInfo->get( 'sanitize_job_created' );
		$lastLoopTs = $jobInfo->get( 'sanitize_job_last_loop' );

		$created = date( $fmt, $createdTs );
		$updated = date( $fmt, $jobInfo->get( 'sanitize_job_updated' ) );
		$loopStart = date( $fmt, $lastLoopTs );

		$idsSent = $jobInfo->get( 'sanitize_job_ids_sent' );
		$idsSentTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' );

		$jobsSent = $jobInfo->get( 'sanitize_job_jobs_sent' );
		$jobsSentTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' );

		$updatePressure = CheckerJob::getPressure();

		// Clamp elapsed times to at least one second. The job document may
		// have been written during the current second, and `/` by zero
		// throws DivisionByZeroError on PHP 8.
		$now = time();
		$loopTime = max( 1, $now - $lastLoopTs );
		$totalTime = max( 1, $now - $createdTs );

		$jobsRate = $jobsSent / $loopTime;
		$jobsPerHour = round( $jobsRate * 3600, 2 );
		$jobsPerDay = round( $jobsRate * 3600 * 24, 2 );
		$jobsRateTotal = $jobsSentTotal / $totalTime;
		$jobsTotalPerHour = round( $jobsRateTotal * 3600, 2 );
		$jobsTotalPerDay = round( $jobsRateTotal * 3600 * 24, 2 );

		$idsRate = $idsSent / $loopTime;
		$idsPerHour = round( $idsRate * 3600, 2 );
		$idsPerDay = round( $idsRate * 3600 * 24, 2 );
		$idsRateTotal = $idsSentTotal / $totalTime;
		$idsTotalPerHour = round( $idsRateTotal * 3600, 2 );
		$idsTotalPerDay = round( $idsRateTotal * 3600 * 24, 2 );

		$loopId = $jobInfo->has( 'sanitize_job_loop_id' ) ? $jobInfo->get( 'sanitize_job_loop_id' ) : 0;
		$idsTodo = $this->maxId - $jobInfo->get( 'sanitize_job_id_offset' );
		// $idsRate is in ids per second, so the remaining seconds are
		// ids / rate (not ids * rate). With no progress yet the ETA is unknown.
		$loopEta = $idsRate > 0
			? date( $fmt, (int)( $now + $idsTodo / $idsRate ) )
			: 'unknown';
		$loopRestartMinTime = date( $fmt, $lastLoopTs + $minLoopDuration );

		$this->output( <<<EOD
JobDetail for {$jobName}
Target Wiki: {$jobInfo->get( 'sanitize_job_wiki' )}
Cluster: {$cluster}
Created: {$created}
Updated: {$updated}
Loop start: {$loopStart}
Current id: {$jobInfo->get( 'sanitize_job_id_offset' )}
Ids sent: {$idsSent} ({$idsSentTotal} total)
Jobs sent: {$jobsSent} ({$jobsSentTotal} total)
Pressure (CheckerJobs):
Cur: {$this->getPressure()} jobs
Max: {$maxJobs} jobs
Pressure (Updates):
Cur: {$updatePressure} jobs
Max: {$maxUpdates} jobs
Jobs rate:
Loop: {$jobsPerHour} jobs/hour, {$jobsPerDay} jobs/day
Total: {$jobsTotalPerHour} jobs/hour, {$jobsTotalPerDay} jobs/day
Ids rate :
Loop: {$idsPerHour} ids/hour, {$idsPerDay} ids/day
Total: {$idsTotalPerHour} ids/hour, {$idsTotalPerDay} ids/day
Loop:
Loop: {$loopId}
Todo: {$idsTodo} ids
ETA: {$loopEta}
Loop restart min time: {$loopRestartMinTime}

EOD
		);
	}

	/**
	 * Push the next batch of CheckerJobs for the job named by --job-name,
	 * creating the job state if it does not exist yet. Aborts when the
	 * profile's max_checker_jobs is not positive or the CheckerJob queue is
	 * already at that limit.
	 */
	private function pushJobs() {
		$profile = $this->getSearchConfig()
			->getProfileService()
			->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
		'@phan-var array $profile';
		$maxJobs = $profile['max_checker_jobs'];
		if ( !$maxJobs || $maxJobs <= 0 ) {
			$this->fatalError( "max_checker_jobs invalid abandonning.\n" );
		}

		$pressure = $this->getPressure();
		if ( $pressure >= $maxJobs ) {
			$this->fatalError( "Too many CheckerJob: $pressure in the queue, $maxJobs allowed.\n" );
		}
		$this->log( "$pressure checker job(s) in the queue.\n" );

		$this->disablePoolCountersAndLogging();
		$this->initMetaStores();

		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName ) ?? $this->createNewJob( $jobName );

		// CLI option values arrive as strings; the frequency is a number of seconds.
		$pushJobFreq = (int)$this->getOption( 'refresh-freq', 2 * 3600 );
		$loop = new SaneitizeLoop(
			$this->profileName,
			$pushJobFreq,
			$profile['jobs_chunk_size'],
			$profile['min_loop_duration'],
			function ( $msg, $channel ) {
				$this->log( $msg, $channel );
			} );
		$jobs = $loop->run( $jobInfo, $maxJobs, $this->minId, $this->maxId );
		if ( $jobs ) {
			// Some job queues implementations ignore the timestamps and
			// instead run these jobs with concurrency limits to keep them
			// spread over time. Insert jobs in the order we asked for them
			// to be run to have some semblance of sanity. Both sides must use
			// the same timestamp accessor or the ordering is inconsistent.
			usort( $jobs, static function ( CheckerJob $job1, CheckerJob $job2 ) {
				return $job1->getReadyTimestamp() <=> $job2->getReadyTimestamp();
			} );
			MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs );
			$this->updateJob( $jobInfo );
		}
	}

	/**
	 * Resolve the clusters to operate on. With --cluster, use only that
	 * cluster, after verifying it is writable. Otherwise use every writable
	 * cluster. Aborts when the sanity check is disabled or no cluster is
	 * left.
	 */
	private function initClusters() {
		$sanityCheckSetup = $this->getSearchConfig()->get( 'CirrusSearchSanityCheck' );
		if ( !$sanityCheckSetup ) {
			$this->fatalError( "Sanity check disabled, abandonning...\n" );
		}
		$assignment = $this->getSearchConfig()->getClusterAssignment();
		if ( $this->hasOption( 'cluster' ) ) {
			$cluster = $this->getOption( 'cluster' );
			if ( !$assignment->canWriteToCluster( $cluster ) ) {
				$this->fatalError( "$cluster is not in the set of writable clusters\n" );
			}
			$this->clusters = [ $cluster ];
		} else {
			$this->clusters = $assignment->getWritableClusters();
		}
		if ( count( $this->clusters ) === 0 ) {
			$this->fatalError( 'No clusters are writable...' );
		}
	}

	/**
	 * Open a connection to each selected cluster and keep its saneitize job
	 * store, keyed by cluster name. Aborts when no connection is available
	 * or a cluster's metastore is not ready.
	 */
	private function initMetaStores() {
		$connections = Connection::getClusterConnections( $this->clusters, $this->getSearchConfig() );

		if ( empty( $connections ) ) {
			$this->fatalError( "No writable cluster found." );
		}

		$this->metaStores = [];
		foreach ( $connections as $cluster => $connection ) {
			$store = $this->getMetaStore( $connection );
			if ( !$store->cirrusReady() ) {
				$this->fatalError( "No metastore found in cluster $cluster" );
			}
			$this->metaStores[$cluster] = $store->saneitizeJobStore();
		}
	}

	/**
	 * Read the page id range from a replica and select the exposed
	 * saneitizer profile with the smallest max_wiki_size that is strictly
	 * greater than (maxId - minId). Aborts when no profile fits.
	 */
	private function initProfile() {
		$row = $this->getDB( DB_REPLICA )
			->newSelectQueryBuilder()
			->select( [ 'min_id' => 'MIN(page_id)', 'max_id' => 'MAX(page_id)' ] )
			->table( 'page' )
			->caller( __METHOD__ )
			->fetchRow();

		// The DB layer returns strings (or null on an empty table); the
		// properties are documented as int.
		$this->minId = (int)$row->min_id;
		$this->maxId = (int)$row->max_id;
		$profiles = $this->getSearchConfig()
			->getProfileService()
			->listExposedProfiles( SearchProfileService::SANEITIZER );
		uasort( $profiles, static function ( $a, $b ) {
			return $a['max_wiki_size'] <=> $b['max_wiki_size'];
		} );
		$wikiSize = $this->maxId - $this->minId;
		foreach ( $profiles as $name => $settings ) {
			'@phan-var array $settings';
			if ( $settings['max_wiki_size'] > $wikiSize ) {
				$this->profileName = $name;
				$this->log( "Detected $wikiSize ids to check, selecting profile $name\n" );
				break;
			}
		}
		if ( !$this->profileName ) {
			$this->fatalError( "No profile found for $wikiSize ids, please check sanitization profiles" );
		}
	}

	/**
	 * Fetch the most recently updated copy of a job's state across all
	 * metastores, aborting if any stored copy targets different clusters
	 * than this run.
	 *
	 * @param string $jobName
	 * @return \Elastica\Document|null null when no metastore knows the job
	 */
	private function getJobInfo( $jobName ) {
		$latest = null;
		// Fetch the lastest jobInfo from the metastore. Ideally all
		// jobInfo should be the same but in the case a cluster has
		// been decommissioned and re-added its job info may be outdated
		foreach ( $this->metaStores as $store ) {
			$current = $store->get( $jobName );
			if ( $current === null ) {
				continue;
			}
			$this->checkJobClusterMismatch( $current );
			if ( $latest === null
				|| $current->get( 'sanitize_job_updated' ) > $latest->get( 'sanitize_job_updated' )
			) {
				$latest = $current;
			}
		}
		return $latest;
	}

	/**
	 * Write the given job state to every metastore.
	 *
	 * @param \Elastica\Document $jobInfo
	 */
	private function updateJob( \Elastica\Document $jobInfo ) {
		foreach ( $this->metaStores as $store ) {
			$store->update( $jobInfo );
		}
	}

	/**
	 * Create fresh job state in every metastore, starting at the minimum
	 * page id and recording the --cluster option (null for all clusters).
	 *
	 * @param string $jobName
	 * @return \Elastica\Document the document returned by the last store
	 */
	private function createNewJob( $jobName ) {
		$job = null;
		$scriptCluster = $this->getOption( 'cluster' );
		foreach ( $this->metaStores as $store ) {
			// TODO: It's a little awkward to let each cluster make
			// it's own job, but it also seems sane to put all
			// the doc building in the store?
			$job = $store->create( $jobName, $this->minId, $scriptCluster );
		}
		if ( $job === null ) {
			$this->fatalError( "No job created, metastores failed to create?" );
		}
		// @phan-suppress-next-line PhanTypeMismatchReturnNullable T240141
		return $job;
	}

	/**
	 * Delete the job named by --job-name from every metastore, aborting when
	 * no metastore knows it.
	 */
	private function deleteJob() {
		$jobName = $this->getOption( 'job-name', 'default' );
		$jobInfo = $this->getJobInfo( $jobName );
		if ( $jobInfo === null ) {
			$this->fatalError( "Unknown job $jobName" );
		}
		foreach ( $this->metaStores as $cluster => $store ) {
			$store->delete( $jobName );
			$this->log( "Deleted job $jobName from $cluster.\n" );
		}
	}

	/**
	 * @return int the number of jobs in the CheckerJob queue
	 *  (queued plus delayed)
	 */
	private function getPressure() {
		$queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'cirrusSearchCheckerJob' );
		return $queue->getSize() + $queue->getDelayedCount();
	}

	/**
	 * Output a message prefixed with the current local date and time.
	 *
	 * @param string $msg
	 * @param string|null $channel passed through to output()
	 */
	private function log( $msg, $channel = null ) {
		$date = new \DateTime();
		$this->output( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $channel );
	}

	/**
	 * @param string $msg The error to display
	 * @param int $die deprecated do not use
	 */
	public function error( $msg, $die = 0 ) {
		$date = new \DateTime();
		parent::error( $date->format( 'Y-m-d H:i:s' ) . " " . $msg );
	}

	/**
	 * @param string $msg The error to display
	 * @param int $exitCode die out using this int as the code
	 * @return never
	 */
	public function fatalError( $msg, $exitCode = 1 ) {
		$date = new \DateTime();
		parent::fatalError( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $exitCode );
	}
}
417 | |
// Tell the MediaWiki maintenance runner which class to execute, then hand
// control to it.
$maintClass = \CirrusSearch\Maintenance\SaneitizeJobs::class;
require_once RUN_MAINTENANCE_IF_MAIN;