Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 221
0.00% covered (danger)
0.00%
0 / 17
CRAP
0.00% covered (danger)
0.00%
0 / 1
SaneitizeJobs
0.00% covered (danger)
0.00%
0 / 214
0.00% covered (danger)
0.00%
0 / 17
2450
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 init
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 checkJobClusterMismatch
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 showJobDetail
0.00% covered (danger)
0.00%
0 / 63
0.00% covered (danger)
0.00%
0 / 1
20
 pushJobs
0.00% covered (danger)
0.00%
0 / 33
0.00% covered (danger)
0.00%
0 / 1
30
 initClusters
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 initMetaStores
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 initProfile
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
20
 getJobInfo
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
30
 updateJob
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 createNewJob
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 deleteJob
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 getPressure
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 log
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 error
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 fatalError
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch\Maintenance;
4
5use CirrusSearch\Connection;
6use CirrusSearch\Job\CheckerJob;
7use CirrusSearch\MetaStore\MetaSaneitizeJobStore;
8use CirrusSearch\Profile\SearchProfileService;
9use CirrusSearch\UpdateGroup;
10use MediaWiki\MediaWikiServices;
11
12/**
13 * Push some sanitize jobs to the JobQueue
14 *
15 * This program is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * This program is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License along
26 * with this program; if not, write to the Free Software Foundation, Inc.,
27 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
28 * http://www.gnu.org/copyleft/gpl.html
29 */
30
31$IP = getenv( 'MW_INSTALL_PATH' );
32if ( $IP === false ) {
33    $IP = __DIR__ . '/../../..';
34}
35require_once "$IP/maintenance/Maintenance.php";
36require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
37
38class SaneitizeJobs extends Maintenance {
39    /**
40     * @var MetaSaneitizeJobStore[] all metastores for write clusters
41     */
42    private $metaStores;
43
44    /**
45     * @var int min page id (from db)
46     */
47    private $minId;
48
49    /**
50     * @var int max page id (from db)
51     */
52    private $maxId;
53
54    /**
55     * @var string profile name
56     */
57    private $profileName;
58
59    /**
60     * @var string[] list of clusters to check
61     */
62    private $clusters;
63
64    public function __construct() {
65        parent::__construct();
66        $this->addDescription( 'Manage sanitize jobs (CheckerJob). This ' .
67            'script operates on all writable clusters by default. ' .
68            'Add --cluster to work on a single cluster. Note that ' .
69            'once a job has been pushed to a particular cluster the ' .
70            'script will fail if you try to run the same job with ' .
71            'different cluster options.'
72        );
73        $this->addOption( 'push', 'Push some jobs to the job queue.' );
74        $this->addOption( 'show', 'Display job info.' );
75        $this->addOption( 'delete-job', 'Delete the job.' );
76        $this->addOption( 'refresh-freq', 'Refresh rate in seconds this ' .
77            'script is run from your crontab. This will be ' .
78            'used to spread jobs over time. Defaults to 7200 (2 ' .
79            'hours).', false, true );
80        $this->addOption( 'job-name', 'Tells the script the name of the ' .
81            'sanitize job only useful to run multiple sanitize jobs. ' .
82            'Defaults to "default".', false, true );
83    }
84
85    public function execute() {
86        $this->init();
87        if ( $this->hasOption( 'show' ) ) {
88            $this->showJobDetail();
89        } elseif ( $this->hasOption( 'push' ) ) {
90            $this->pushJobs();
91        } elseif ( $this->hasOption( 'delete-job' ) ) {
92            $this->deleteJob();
93        } else {
94            $this->maybeHelp( true );
95        }
96
97        return true;
98    }
99
100    private function init() {
101        $this->initClusters();
102        $this->initMetaStores();
103        $this->initProfile();
104    }
105
106    /**
107     * Basically we support two modes:
108     *   - all writable cluster, cluster = null
109     *   - single cluster, cluster = 'clusterName'
110     * If we detect a mismatch here we fail.
111     * @param \Elastica\Document $jobInfo check if the stored job match
112     * cluster config used by this script, will die if clusters mismatch
113     */
114    private function checkJobClusterMismatch( \Elastica\Document $jobInfo ) {
115        $jobCluster = $jobInfo->get( 'sanitize_job_cluster' );
116        $scriptCluster = $this->getOption( 'cluster' );
117        if ( $jobCluster != $scriptCluster ) {
118            $jobCluster = $jobCluster != null ? $jobCluster : "all writable clusters";
119            $scriptCluster = $scriptCluster != null ? $scriptCluster : "all writable clusters";
120            $this->fatalError( "Job cluster mismatch, stored job is configured to work on $jobCluster " .
121                "but the script is configured to run on $scriptCluster.\n" );
122        }
123    }
124
125    private function showJobDetail() {
126        $profile = $this->getSearchConfig()
127            ->getProfileService()
128            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
129        '@phan-var array $profile';
130        $minLoopDuration = $profile['min_loop_duration'];
131        $maxJobs = $profile['max_checker_jobs'];
132        $maxUpdates = $profile['update_jobs_max_pressure'];
133
134        $jobName = $this->getOption( 'job-name', 'default' );
135        $jobInfo = $this->getJobInfo( $jobName );
136        if ( $jobInfo === null ) {
137            $this->fatalError( "Unknown job $jobName, push some jobs first.\n" );
138        }
139        $fmt = 'Y-m-d H:i:s';
140        $cluster = $jobInfo->get( 'sanitize_job_cluster' ) ?: 'All writable clusters';
141
142        $created = date( $fmt, $jobInfo->get( 'sanitize_job_created' ) );
143        $updated = date( $fmt, $jobInfo->get( 'sanitize_job_updated' ) );
144        $loopStart = date( $fmt, $jobInfo->get( 'sanitize_job_last_loop' ) );
145
146        $idsSent = $jobInfo->get( 'sanitize_job_ids_sent' );
147        $idsSentTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' );
148
149        $jobsSent = $jobInfo->get( 'sanitize_job_jobs_sent' );
150        $jobsSentTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' );
151
152        $updatePressure = CheckerJob::getPressure();
153
154        $loopTime = time() - $jobInfo->get( 'sanitize_job_last_loop' );
155        $totalTime = time() - $jobInfo->get( 'sanitize_job_created' );
156
157        $jobsRate = $jobInfo->get( 'sanitize_job_jobs_sent' ) / $loopTime;
158        $jobsPerHour = round( $jobsRate * 3600, 2 );
159        $jobsPerDay = round( $jobsRate * 3600 * 24, 2 );
160        $jobsRateTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' ) / $totalTime;
161        $jobsTotalPerHour = round( $jobsRateTotal * 3600, 2 );
162        $jobsTotalPerDay = round( $jobsRateTotal * 3600 * 24, 2 );
163
164        $idsRate = $jobInfo->get( 'sanitize_job_ids_sent' ) / $loopTime;
165        $idsPerHour = round( $idsRate * 3600, 2 );
166        $idsPerDay = round( $idsRate * 3600 * 24, 2 );
167        $idsRateTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' ) / $totalTime;
168        $idsTotalPerHour = round( $idsRateTotal * 3600, 2 );
169        $idsTotalPerDay = round( $idsRateTotal * 3600 * 24, 2 );
170
171        $loopId = $jobInfo->has( 'sanitize_job_loop_id' ) ? $jobInfo->get( 'sanitize_job_loop_id' ) : 0;
172        $idsTodo = $this->maxId - $jobInfo->get( 'sanitize_job_id_offset' );
173        $loopEta = date( $fmt, time() + ( $idsTodo * $idsRate ) );
174        $loopRestartMinTime = date( $fmt, $jobInfo->get( 'sanitize_job_last_loop' ) + $minLoopDuration );
175
176        $this->output( <<<EOD
177JobDetail for {$jobName}
178    Target Wiki:     {$jobInfo->get( 'sanitize_job_wiki' )}
179    Cluster:     {$cluster}
180    Created:     {$created}
181    Updated:     {$updated}
182    Loop start:    {$loopStart}
183    Current id:    {$jobInfo->get( 'sanitize_job_id_offset' )}
184    Ids sent:    {$idsSent} ({$idsSentTotal} total)
185    Jobs sent:    {$jobsSent} ({$jobsSentTotal} total)
186    Pressure (CheckerJobs):
187        Cur:    {$this->getPressure()} jobs
188        Max:    {$maxJobs} jobs
189    Pressure (Updates):
190        Cur:    {$updatePressure} jobs
191        Max:    {$maxUpdates} jobs
192    Jobs rate:
193        Loop:    {$jobsPerHour} jobs/hour, {$jobsPerDay} jobs/day
194        Total:    {$jobsTotalPerHour} jobs/hour, {$jobsTotalPerDay} jobs/day
195    Ids rate :
196        Loop:    {$idsPerHour} ids/hour, {$idsPerDay} ids/day
197        Total:    {$idsTotalPerHour} ids/hour, {$idsTotalPerDay} ids/day
198    Loop:
199        Loop:    {$loopId}
200        Todo:    {$idsTodo} ids
201        ETA:    {$loopEta}
202    Loop restart min time: {$loopRestartMinTime}
203
204EOD
205        );
206    }
207
208    private function pushJobs() {
209        $profile = $this->getSearchConfig()
210            ->getProfileService()
211            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
212        '@phan-var array $profile';
213        $maxJobs = $profile['max_checker_jobs'];
214        if ( !$maxJobs || $maxJobs <= 0 ) {
215            $this->fatalError( "max_checker_jobs invalid abandonning.\n" );
216        }
217
218        $pressure = $this->getPressure();
219        if ( $pressure >= $maxJobs ) {
220            $this->fatalError( "Too many CheckerJob: $pressure in the queue, $maxJobs allowed.\n" );
221        }
222        $this->log( "$pressure checker job(s) in the queue.\n" );
223
224        $this->disablePoolCountersAndLogging();
225        $this->initMetaStores();
226
227        $jobName = $this->getOption( 'job-name', 'default' );
228        $jobInfo = $this->getJobInfo( $jobName ) ?? $this->createNewJob( $jobName );
229
230        $pushJobFreq = $this->getOption( 'refresh-freq', 2 * 3600 );
231        $jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
232        $loop = new SaneitizeLoop(
233            $this->profileName,
234            $pushJobFreq,
235            $profile['jobs_chunk_size'],
236            $profile['min_loop_duration'],
237            function ( $msg, $channel ) {
238                $this->log( $msg, $channel );
239            },
240            $jobQueue );
241        $jobs = $loop->run( $jobInfo, $maxJobs, $this->minId, $this->maxId );
242        if ( $jobs ) {
243            // Some job queues implementations ignore the timestamps and
244            // instead run these jobs with concurrency limits to keep them
245            // spread over time. Insert jobs in the order we asked for them
246            // to be run to have some semblance of sanity.
247            usort( $jobs, static function ( CheckerJob $job1, CheckerJob $job2 ) {
248                return $job1->getReadyTimestamp() - $job2->getReleaseTimestamp();
249            } );
250            MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs );
251            $this->updateJob( $jobInfo );
252        }
253    }
254
255    private function initClusters() {
256        $sanityCheckSetup = $this->getSearchConfig()->get( 'CirrusSearchSanityCheck' );
257        if ( !$sanityCheckSetup ) {
258            $this->fatalError( "Sanity check disabled, abandonning...\n" );
259        }
260        $assignment = $this->getSearchConfig()->getClusterAssignment();
261        if ( $this->hasOption( 'cluster' ) ) {
262            $cluster = $this->getOption( 'cluster' );
263            if ( $assignment->canWriteToCluster( $cluster, UpdateGroup::CHECK_SANITY ) ) {
264                $this->fatalError( "$cluster is not in the set of check_sanity clusters\n" );
265            }
266            $this->clusters = [ $this->getOption( 'cluster' ) ];
267        }
268        $this->clusters = $assignment->getWritableClusters( UpdateGroup::CHECK_SANITY );
269        if ( count( $this->clusters ) === 0 ) {
270            $this->fatalError( 'No clusters are available...' );
271        }
272    }
273
274    private function initMetaStores() {
275        $connections = Connection::getClusterConnections( $this->clusters, $this->getSearchConfig() );
276
277        if ( !$connections ) {
278            $this->fatalError( "No available cluster found." );
279        }
280
281        $this->metaStores = [];
282        foreach ( $connections as $cluster => $connection ) {
283            $store = $this->getMetaStore( $connection );
284            if ( !$store->cirrusReady() ) {
285                $this->fatalError( "No metastore found in cluster $cluster" );
286            }
287            $this->metaStores[$cluster] = $store->saneitizeJobStore();
288        }
289    }
290
291    private function initProfile() {
292        $res = $this->getDB( DB_REPLICA )
293            ->newSelectQueryBuilder()
294            ->select( [ 'min_id' => 'MIN(page_id)', 'max_id' => 'MAX(page_id)' ] )
295            ->table( 'page' )
296            ->caller( __METHOD__ )
297            ->fetchResultSet();
298
299        $row = $res->fetchObject();
300        $this->minId = $row->min_id;
301        $this->maxId = $row->max_id;
302        $profiles =
303            $this->getSearchConfig()
304                ->getProfileService()
305                ->listExposedProfiles( SearchProfileService::SANEITIZER );
306        uasort( $profiles, static function ( $a, $b ) {
307            return $a['max_wiki_size'] <=> $b['max_wiki_size'];
308        } );
309        $wikiSize = $this->maxId - $this->minId;
310        foreach ( $profiles as $name => $settings ) {
311            '@phan-var array $settings';
312            if ( $settings['max_wiki_size'] > $wikiSize ) {
313                $this->profileName = $name;
314                $this->log( "Detected $wikiSize ids to check, selecting profile $name\n" );
315                break;
316            }
317        }
318        if ( !$this->profileName ) {
319            $this->fatalError( "No profile found for $wikiSize ids, please check sanitization profiles" );
320        }
321    }
322
323    /**
324     * @param string $jobName
325     * @return \Elastica\Document|null
326     */
327    private function getJobInfo( $jobName ) {
328        $latest = null;
329        // Fetch the lastest jobInfo from the metastore. Ideally all
330        // jobInfo should be the same but in the case a cluster has
331        // been decommissioned and re-added its job info may be outdated
332        foreach ( $this->metaStores as $store ) {
333            $current = $store->get( $jobName );
334            if ( $current === null ) {
335                continue;
336            }
337            $this->checkJobClusterMismatch( $current );
338            if ( $latest == null ) {
339                $latest = $current;
340            } elseif ( $current->get( 'sanitize_job_updated' ) > $latest->get( 'sanitize_job_updated' ) ) {
341                $latest = $current;
342            }
343        }
344        return $latest;
345    }
346
347    /**
348     * @param \Elastica\Document $jobInfo
349     */
350    private function updateJob( \Elastica\Document $jobInfo ) {
351        foreach ( $this->metaStores as $store ) {
352            $store->update( $jobInfo );
353        }
354    }
355
356    /**
357     * @param string $jobName
358     * @return \Elastica\Document
359     */
360    private function createNewJob( $jobName ) {
361        $job = null;
362        $scriptCluster = $this->getOption( 'cluster' );
363        foreach ( $this->metaStores as $store ) {
364            // TODO: It's a little awkward to let each cluster make
365            // it's own job, but it also seems sane to put all
366            // the doc building in the store?
367            $job = $store->create( $jobName, $this->minId, $scriptCluster );
368        }
369        if ( $job === null ) {
370            $this->fatalError( "No job created, metastores failed to create?" );
371        }
372        // @phan-suppress-next-line PhanTypeMismatchReturnNullable T240141
373        return $job;
374    }
375
376    private function deleteJob() {
377        $jobName = $this->getOption( 'job-name', 'default' );
378        $jobInfo = $this->getJobInfo( $jobName );
379        if ( $jobInfo === null ) {
380            $this->fatalError( "Unknown job $jobName" );
381        }
382        foreach ( $this->metaStores as $cluster => $store ) {
383            $store->delete( $jobName );
384            $this->log( "Deleted job $jobName from $cluster.\n" );
385        }
386    }
387
388    /**
389     * @return int the number of jobs in the CheckerJob queue
390     */
391    private function getPressure() {
392        $queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'cirrusSearchCheckerJob' );
393        return $queue->getSize() + $queue->getDelayedCount();
394    }
395
396    private function log( $msg, $channel = null ) {
397        $date = new \DateTime();
398        $this->output( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $channel );
399    }
400
401    /**
402     * @param string $msg The error to display
403     * @param int $die deprecated do not use
404     */
405    public function error( $msg, $die = 0 ) {
406        $date = new \DateTime();
407        parent::error( $date->format( 'Y-m-d H:i:s' ) . " " . $msg );
408    }
409
410    /**
411     * @param string $msg The error to display
412     * @param int $exitCode die out using this int as the code
413     * @return never
414     */
415    public function fatalError( $msg, $exitCode = 1 ) {
416        $date = new \DateTime();
417        parent::fatalError( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $exitCode );
418    }
419}
420
421$maintClass = SaneitizeJobs::class;
422require_once RUN_MAINTENANCE_IF_MAIN;