Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 221
0.00% covered (danger)
0.00%
0 / 17
CRAP
0.00% covered (danger)
0.00%
0 / 1
SaneitizeJobs
0.00% covered (danger)
0.00%
0 / 214
0.00% covered (danger)
0.00%
0 / 17
2450
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 init
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 checkJobClusterMismatch
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 showJobDetail
0.00% covered (danger)
0.00%
0 / 63
0.00% covered (danger)
0.00%
0 / 1
20
 pushJobs
0.00% covered (danger)
0.00%
0 / 33
0.00% covered (danger)
0.00%
0 / 1
30
 initClusters
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 initMetaStores
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 initProfile
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
20
 getJobInfo
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
30
 updateJob
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 createNewJob
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 deleteJob
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 getPressure
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 log
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 error
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 fatalError
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch\Maintenance;
4
5use CirrusSearch\Connection;
6use CirrusSearch\Job\CheckerJob;
7use CirrusSearch\MetaStore\MetaSaneitizeJobStore;
8use CirrusSearch\Profile\SearchProfileService;
9use MediaWiki\MediaWikiServices;
10
11/**
12 * Push some sanitize jobs to the JobQueue
13 *
14 * This program is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * This program is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License along
25 * with this program; if not, write to the Free Software Foundation, Inc.,
26 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
27 * http://www.gnu.org/copyleft/gpl.html
28 */
29
30$IP = getenv( 'MW_INSTALL_PATH' );
31if ( $IP === false ) {
32    $IP = __DIR__ . '/../../..';
33}
34require_once "$IP/maintenance/Maintenance.php";
35require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
36
37class SaneitizeJobs extends Maintenance {
38    /**
39     * @var MetaSaneitizeJobStore[] all metastores for write clusters
40     */
41    private $metaStores;
42
43    /**
44     * @var int min page id (from db)
45     */
46    private $minId;
47
48    /**
49     * @var int max page id (from db)
50     */
51    private $maxId;
52
53    /**
54     * @var string profile name
55     */
56    private $profileName;
57
58    /**
59     * @var string[] list of clusters to check
60     */
61    private $clusters;
62
63    public function __construct() {
64        parent::__construct();
65        $this->addDescription( 'Manage sanitize jobs (CheckerJob). This ' .
66            'script operates on all writable clusters by default. ' .
67            'Add --cluster to work on a single cluster. Note that ' .
68            'once a job has been pushed to a particular cluster the ' .
69            'script will fail if you try to run the same job with ' .
70            'different cluster options.'
71        );
72        $this->addOption( 'push', 'Push some jobs to the job queue.' );
73        $this->addOption( 'show', 'Display job info.' );
74        $this->addOption( 'delete-job', 'Delete the job.' );
75        $this->addOption( 'refresh-freq', 'Refresh rate in seconds this ' .
76            'script is run from your crontab. This will be ' .
77            'used to spread jobs over time. Defaults to 7200 (2 ' .
78            'hours).', false, true );
79        $this->addOption( 'job-name', 'Tells the script the name of the ' .
80            'sanitize job only useful to run multiple sanitize jobs. ' .
81            'Defaults to "default".', false, true );
82    }
83
84    public function execute() {
85        $this->init();
86        if ( $this->hasOption( 'show' ) ) {
87            $this->showJobDetail();
88        } elseif ( $this->hasOption( 'push' ) ) {
89            $this->pushJobs();
90        } elseif ( $this->hasOption( 'delete-job' ) ) {
91            $this->deleteJob();
92        } else {
93            $this->maybeHelp( true );
94        }
95
96        return true;
97    }
98
99    private function init() {
100        $this->initClusters();
101        $this->initMetaStores();
102        $this->initProfile();
103    }
104
105    /**
106     * Basically we support two modes:
107     *   - all writable cluster, cluster = null
108     *   - single cluster, cluster = 'clusterName'
109     * If we detect a mismatch here we fail.
110     * @param \Elastica\Document $jobInfo check if the stored job match
111     * cluster config used by this script, will die if clusters mismatch
112     */
113    private function checkJobClusterMismatch( \Elastica\Document $jobInfo ) {
114        $jobCluster = $jobInfo->get( 'sanitize_job_cluster' );
115        $scriptCluster = $this->getOption( 'cluster' );
116        if ( $jobCluster != $scriptCluster ) {
117            $jobCluster = $jobCluster != null ? $jobCluster : "all writable clusters";
118            $scriptCluster = $scriptCluster != null ? $scriptCluster : "all writable clusters";
119            $this->fatalError( "Job cluster mismatch, stored job is configured to work on $jobCluster " .
120                "but the script is configured to run on $scriptCluster.\n" );
121        }
122    }
123
124    private function showJobDetail() {
125        $profile = $this->getSearchConfig()
126            ->getProfileService()
127            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
128        '@phan-var array $profile';
129        $minLoopDuration = $profile['min_loop_duration'];
130        $maxJobs = $profile['max_checker_jobs'];
131        $maxUpdates = $profile['update_jobs_max_pressure'];
132
133        $jobName = $this->getOption( 'job-name', 'default' );
134        $jobInfo = $this->getJobInfo( $jobName );
135        if ( $jobInfo === null ) {
136            $this->fatalError( "Unknown job $jobName, push some jobs first.\n" );
137        }
138        $fmt = 'Y-m-d H:i:s';
139        $cluster = $jobInfo->get( 'sanitize_job_cluster' ) ?: 'All writable clusters';
140
141        $created = date( $fmt, $jobInfo->get( 'sanitize_job_created' ) );
142        $updated = date( $fmt, $jobInfo->get( 'sanitize_job_updated' ) );
143        $loopStart = date( $fmt, $jobInfo->get( 'sanitize_job_last_loop' ) );
144
145        $idsSent = $jobInfo->get( 'sanitize_job_ids_sent' );
146        $idsSentTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' );
147
148        $jobsSent = $jobInfo->get( 'sanitize_job_jobs_sent' );
149        $jobsSentTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' );
150
151        $updatePressure = CheckerJob::getPressure();
152
153        $loopTime = time() - $jobInfo->get( 'sanitize_job_last_loop' );
154        $totalTime = time() - $jobInfo->get( 'sanitize_job_created' );
155
156        $jobsRate = $jobInfo->get( 'sanitize_job_jobs_sent' ) / $loopTime;
157        $jobsPerHour = round( $jobsRate * 3600, 2 );
158        $jobsPerDay = round( $jobsRate * 3600 * 24, 2 );
159        $jobsRateTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' ) / $totalTime;
160        $jobsTotalPerHour = round( $jobsRateTotal * 3600, 2 );
161        $jobsTotalPerDay = round( $jobsRateTotal * 3600 * 24, 2 );
162
163        $idsRate = $jobInfo->get( 'sanitize_job_ids_sent' ) / $loopTime;
164        $idsPerHour = round( $idsRate * 3600, 2 );
165        $idsPerDay = round( $idsRate * 3600 * 24, 2 );
166        $idsRateTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' ) / $totalTime;
167        $idsTotalPerHour = round( $idsRateTotal * 3600, 2 );
168        $idsTotalPerDay = round( $idsRateTotal * 3600 * 24, 2 );
169
170        $loopId = $jobInfo->has( 'sanitize_job_loop_id' ) ? $jobInfo->get( 'sanitize_job_loop_id' ) : 0;
171        $idsTodo = $this->maxId - $jobInfo->get( 'sanitize_job_id_offset' );
172        $loopEta = date( $fmt, time() + ( $idsTodo * $idsRate ) );
173        $loopRestartMinTime = date( $fmt, $jobInfo->get( 'sanitize_job_last_loop' ) + $minLoopDuration );
174
175        $this->output( <<<EOD
176JobDetail for {$jobName}
177    Target Wiki:     {$jobInfo->get( 'sanitize_job_wiki' )}
178    Cluster:     {$cluster}
179    Created:     {$created}
180    Updated:     {$updated}
181    Loop start:    {$loopStart}
182    Current id:    {$jobInfo->get( 'sanitize_job_id_offset' )}
183    Ids sent:    {$idsSent} ({$idsSentTotal} total)
184    Jobs sent:    {$jobsSent} ({$jobsSentTotal} total)
185    Pressure (CheckerJobs):
186        Cur:    {$this->getPressure()} jobs
187        Max:    {$maxJobs} jobs
188    Pressure (Updates):
189        Cur:    {$updatePressure} jobs
190        Max:    {$maxUpdates} jobs
191    Jobs rate:
192        Loop:    {$jobsPerHour} jobs/hour, {$jobsPerDay} jobs/day
193        Total:    {$jobsTotalPerHour} jobs/hour, {$jobsTotalPerDay} jobs/day
194    Ids rate :
195        Loop:    {$idsPerHour} ids/hour, {$idsPerDay} ids/day
196        Total:    {$idsTotalPerHour} ids/hour, {$idsTotalPerDay} ids/day
197    Loop:
198        Loop:    {$loopId}
199        Todo:    {$idsTodo} ids
200        ETA:    {$loopEta}
201    Loop restart min time: {$loopRestartMinTime}
202
203EOD
204        );
205    }
206
207    private function pushJobs() {
208        $profile = $this->getSearchConfig()
209            ->getProfileService()
210            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
211        '@phan-var array $profile';
212        $maxJobs = $profile['max_checker_jobs'];
213        if ( !$maxJobs || $maxJobs <= 0 ) {
214            $this->fatalError( "max_checker_jobs invalid abandonning.\n" );
215        }
216
217        $pressure = $this->getPressure();
218        if ( $pressure >= $maxJobs ) {
219            $this->fatalError( "Too many CheckerJob: $pressure in the queue, $maxJobs allowed.\n" );
220        }
221        $this->log( "$pressure checker job(s) in the queue.\n" );
222
223        $this->disablePoolCountersAndLogging();
224        $this->initMetaStores();
225
226        $jobName = $this->getOption( 'job-name', 'default' );
227        $jobInfo = $this->getJobInfo( $jobName ) ?? $this->createNewJob( $jobName );
228
229        $pushJobFreq = $this->getOption( 'refresh-freq', 2 * 3600 );
230        $jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
231        $loop = new SaneitizeLoop(
232            $this->profileName,
233            $pushJobFreq,
234            $profile['jobs_chunk_size'],
235            $profile['min_loop_duration'],
236            function ( $msg, $channel ) {
237                $this->log( $msg, $channel );
238            },
239            $jobQueue );
240        $jobs = $loop->run( $jobInfo, $maxJobs, $this->minId, $this->maxId );
241        if ( $jobs ) {
242            // Some job queues implementations ignore the timestamps and
243            // instead run these jobs with concurrency limits to keep them
244            // spread over time. Insert jobs in the order we asked for them
245            // to be run to have some semblance of sanity.
246            usort( $jobs, static function ( CheckerJob $job1, CheckerJob $job2 ) {
247                return $job1->getReadyTimestamp() - $job2->getReleaseTimestamp();
248            } );
249            MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs );
250            $this->updateJob( $jobInfo );
251        }
252    }
253
254    private function initClusters() {
255        $sanityCheckSetup = $this->getSearchConfig()->get( 'CirrusSearchSanityCheck' );
256        if ( !$sanityCheckSetup ) {
257            $this->fatalError( "Sanity check disabled, abandonning...\n" );
258        }
259        $assignment = $this->getSearchConfig()->getClusterAssignment();
260        if ( $this->hasOption( 'cluster' ) ) {
261            $cluster = $this->getOption( 'cluster' );
262            if ( $assignment->canWriteToCluster( $cluster ) ) {
263                $this->fatalError( "$cluster is not in the set of writable clusters\n" );
264            }
265            $this->clusters = [ $this->getOption( 'cluster' ) ];
266        }
267        $this->clusters = $assignment->getWritableClusters();
268        if ( count( $this->clusters ) === 0 ) {
269            $this->fatalError( 'No clusters are writable...' );
270        }
271    }
272
273    private function initMetaStores() {
274        $connections = Connection::getClusterConnections( $this->clusters, $this->getSearchConfig() );
275
276        if ( !$connections ) {
277            $this->fatalError( "No writable cluster found." );
278        }
279
280        $this->metaStores = [];
281        foreach ( $connections as $cluster => $connection ) {
282            $store = $this->getMetaStore( $connection );
283            if ( !$store->cirrusReady() ) {
284                $this->fatalError( "No metastore found in cluster $cluster" );
285            }
286            $this->metaStores[$cluster] = $store->saneitizeJobStore();
287        }
288    }
289
290    private function initProfile() {
291        $res = $this->getDB( DB_REPLICA )
292            ->newSelectQueryBuilder()
293            ->select( [ 'min_id' => 'MIN(page_id)', 'max_id' => 'MAX(page_id)' ] )
294            ->table( 'page' )
295            ->caller( __METHOD__ )
296            ->fetchResultSet();
297
298        $row = $res->fetchObject();
299        $this->minId = $row->min_id;
300        $this->maxId = $row->max_id;
301        $profiles =
302            $this->getSearchConfig()
303                ->getProfileService()
304                ->listExposedProfiles( SearchProfileService::SANEITIZER );
305        uasort( $profiles, static function ( $a, $b ) {
306            return $a['max_wiki_size'] <=> $b['max_wiki_size'];
307        } );
308        $wikiSize = $this->maxId - $this->minId;
309        foreach ( $profiles as $name => $settings ) {
310            '@phan-var array $settings';
311            if ( $settings['max_wiki_size'] > $wikiSize ) {
312                $this->profileName = $name;
313                $this->log( "Detected $wikiSize ids to check, selecting profile $name\n" );
314                break;
315            }
316        }
317        if ( !$this->profileName ) {
318            $this->fatalError( "No profile found for $wikiSize ids, please check sanitization profiles" );
319        }
320    }
321
322    /**
323     * @param string $jobName
324     * @return \Elastica\Document|null
325     */
326    private function getJobInfo( $jobName ) {
327        $latest = null;
328        // Fetch the lastest jobInfo from the metastore. Ideally all
329        // jobInfo should be the same but in the case a cluster has
330        // been decommissioned and re-added its job info may be outdated
331        foreach ( $this->metaStores as $store ) {
332            $current = $store->get( $jobName );
333            if ( $current === null ) {
334                continue;
335            }
336            $this->checkJobClusterMismatch( $current );
337            if ( $latest == null ) {
338                $latest = $current;
339            } elseif ( $current->get( 'sanitize_job_updated' ) > $latest->get( 'sanitize_job_updated' ) ) {
340                $latest = $current;
341            }
342        }
343        return $latest;
344    }
345
346    /**
347     * @param \Elastica\Document $jobInfo
348     */
349    private function updateJob( \Elastica\Document $jobInfo ) {
350        foreach ( $this->metaStores as $store ) {
351            $store->update( $jobInfo );
352        }
353    }
354
355    /**
356     * @param string $jobName
357     * @return \Elastica\Document
358     */
359    private function createNewJob( $jobName ) {
360        $job = null;
361        $scriptCluster = $this->getOption( 'cluster' );
362        foreach ( $this->metaStores as $store ) {
363            // TODO: It's a little awkward to let each cluster make
364            // it's own job, but it also seems sane to put all
365            // the doc building in the store?
366            $job = $store->create( $jobName, $this->minId, $scriptCluster );
367        }
368        if ( $job === null ) {
369            $this->fatalError( "No job created, metastores failed to create?" );
370        }
371        // @phan-suppress-next-line PhanTypeMismatchReturnNullable T240141
372        return $job;
373    }
374
375    private function deleteJob() {
376        $jobName = $this->getOption( 'job-name', 'default' );
377        $jobInfo = $this->getJobInfo( $jobName );
378        if ( $jobInfo === null ) {
379            $this->fatalError( "Unknown job $jobName" );
380        }
381        foreach ( $this->metaStores as $cluster => $store ) {
382            $store->delete( $jobName );
383            $this->log( "Deleted job $jobName from $cluster.\n" );
384        }
385    }
386
387    /**
388     * @return int the number of jobs in the CheckerJob queue
389     */
390    private function getPressure() {
391        $queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'cirrusSearchCheckerJob' );
392        return $queue->getSize() + $queue->getDelayedCount();
393    }
394
395    private function log( $msg, $channel = null ) {
396        $date = new \DateTime();
397        $this->output( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $channel );
398    }
399
400    /**
401     * @param string $msg The error to display
402     * @param int $die deprecated do not use
403     */
404    public function error( $msg, $die = 0 ) {
405        $date = new \DateTime();
406        parent::error( $date->format( 'Y-m-d H:i:s' ) . " " . $msg );
407    }
408
409    /**
410     * @param string $msg The error to display
411     * @param int $exitCode die out using this int as the code
412     * @return never
413     */
414    public function fatalError( $msg, $exitCode = 1 ) {
415        $date = new \DateTime();
416        parent::fatalError( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $exitCode );
417    }
418}
419
420$maintClass = SaneitizeJobs::class;
421require_once RUN_MAINTENANCE_IF_MAIN;