Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 219
0.00% covered (danger)
0.00%
0 / 17
CRAP
0.00% covered (danger)
0.00%
0 / 1
SaneitizeJobs
0.00% covered (danger)
0.00%
0 / 212
0.00% covered (danger)
0.00%
0 / 17
2450
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 init
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 checkJobClusterMismatch
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 showJobDetail
0.00% covered (danger)
0.00%
0 / 63
0.00% covered (danger)
0.00%
0 / 1
20
 pushJobs
0.00% covered (danger)
0.00%
0 / 31
0.00% covered (danger)
0.00%
0 / 1
30
 initClusters
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
30
 initMetaStores
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 initProfile
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
20
 getJobInfo
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
30
 updateJob
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 createNewJob
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 deleteJob
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 getPressure
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 log
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 error
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 fatalError
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch\Maintenance;
4
5use CirrusSearch\Connection;
6use CirrusSearch\Job\CheckerJob;
7use CirrusSearch\MetaStore\MetaSaneitizeJobStore;
8use CirrusSearch\Profile\SearchProfileService;
9use MediaWiki\MediaWikiServices;
10
11/**
12 * Push some sanitize jobs to the JobQueue
13 *
14 * This program is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * This program is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License along
25 * with this program; if not, write to the Free Software Foundation, Inc.,
26 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
27 * http://www.gnu.org/copyleft/gpl.html
28 */
29
30$IP = getenv( 'MW_INSTALL_PATH' );
31if ( $IP === false ) {
32    $IP = __DIR__ . '/../../..';
33}
34require_once "$IP/maintenance/Maintenance.php";
35require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
36
37class SaneitizeJobs extends Maintenance {
38    /**
39     * @var MetaSaneitizeJobStore[] all metastores for write clusters
40     */
41    private $metaStores;
42
43    /**
44     * @var int min page id (from db)
45     */
46    private $minId;
47
48    /**
49     * @var int max page id (from db)
50     */
51    private $maxId;
52
53    /**
54     * @var string profile name
55     */
56    private $profileName;
57
58    /**
59     * @var string[] list of clusters to check
60     */
61    private $clusters;
62
63    public function __construct() {
64        parent::__construct();
65        $this->addDescription( 'Manage sanitize jobs (CheckerJob). This ' .
66            'script operates on all writable clusters by default. ' .
67            'Add --cluster to work on a single cluster. Note that ' .
68            'once a job has been pushed to a particular cluster the ' .
69            'script will fail if you try to run the same job with ' .
70            'different cluster options.'
71        );
72        $this->addOption( 'push', 'Push some jobs to the job queue.' );
73        $this->addOption( 'show', 'Display job info.' );
74        $this->addOption( 'delete-job', 'Delete the job.' );
75        $this->addOption( 'refresh-freq', 'Refresh rate in seconds this ' .
76            'script is run from your crontab. This will be ' .
77            'used to spread jobs over time. Defaults to 7200 (2 ' .
78            'hours).', false, true );
79        $this->addOption( 'job-name', 'Tells the script the name of the ' .
80            'sanitize job only useful to run multiple sanitize jobs. ' .
81            'Defaults to "default".', false, true );
82    }
83
84    public function execute() {
85        $this->init();
86        if ( $this->hasOption( 'show' ) ) {
87            $this->showJobDetail();
88        } elseif ( $this->hasOption( 'push' ) ) {
89            $this->pushJobs();
90        } elseif ( $this->hasOption( 'delete-job' ) ) {
91            $this->deleteJob();
92        } else {
93            $this->maybeHelp( true );
94        }
95
96        return true;
97    }
98
99    private function init() {
100        $this->initClusters();
101        $this->initMetaStores();
102        $this->initProfile();
103    }
104
105    /**
106     * Basically we support two modes:
107     *   - all writable cluster, cluster = null
108     *   - single cluster, cluster = 'clusterName'
109     * If we detect a mismatch here we fail.
110     * @param \Elastica\Document $jobInfo check if the stored job match
111     * cluster config used by this script, will die if clusters mismatch
112     */
113    private function checkJobClusterMismatch( \Elastica\Document $jobInfo ) {
114        $jobCluster = $jobInfo->get( 'sanitize_job_cluster' );
115        $scriptCluster = $this->getOption( 'cluster' );
116        if ( $jobCluster != $scriptCluster ) {
117            $jobCluster = $jobCluster != null ? $jobCluster : "all writable clusters";
118            $scriptCluster = $scriptCluster != null ? $scriptCluster : "all writable clusters";
119            $this->fatalError( "Job cluster mismatch, stored job is configured to work on $jobCluster " .
120                "but the script is configured to run on $scriptCluster.\n" );
121        }
122    }
123
124    private function showJobDetail() {
125        $profile = $this->getSearchConfig()
126            ->getProfileService()
127            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
128        '@phan-var array $profile';
129        $minLoopDuration = $profile['min_loop_duration'];
130        $maxJobs = $profile['max_checker_jobs'];
131        $maxUpdates = $profile['update_jobs_max_pressure'];
132
133        $jobName = $this->getOption( 'job-name', 'default' );
134        $jobInfo = $this->getJobInfo( $jobName );
135        if ( $jobInfo === null ) {
136            $this->fatalError( "Unknown job $jobName, push some jobs first.\n" );
137        }
138        $fmt = 'Y-m-d H:i:s';
139        $cluster = $jobInfo->get( 'sanitize_job_cluster' ) ?: 'All writable clusters';
140
141        $created = date( $fmt, $jobInfo->get( 'sanitize_job_created' ) );
142        $updated = date( $fmt, $jobInfo->get( 'sanitize_job_updated' ) );
143        $loopStart = date( $fmt, $jobInfo->get( 'sanitize_job_last_loop' ) );
144
145        $idsSent = $jobInfo->get( 'sanitize_job_ids_sent' );
146        $idsSentTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' );
147
148        $jobsSent = $jobInfo->get( 'sanitize_job_jobs_sent' );
149        $jobsSentTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' );
150
151        $updatePressure = CheckerJob::getPressure();
152
153        $loopTime = time() - $jobInfo->get( 'sanitize_job_last_loop' );
154        $totalTime = time() - $jobInfo->get( 'sanitize_job_created' );
155
156        $jobsRate = $jobInfo->get( 'sanitize_job_jobs_sent' ) / $loopTime;
157        $jobsPerHour = round( $jobsRate * 3600, 2 );
158        $jobsPerDay = round( $jobsRate * 3600 * 24, 2 );
159        $jobsRateTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' ) / $totalTime;
160        $jobsTotalPerHour = round( $jobsRateTotal * 3600, 2 );
161        $jobsTotalPerDay = round( $jobsRateTotal * 3600 * 24, 2 );
162
163        $idsRate = $jobInfo->get( 'sanitize_job_ids_sent' ) / $loopTime;
164        $idsPerHour = round( $idsRate * 3600, 2 );
165        $idsPerDay = round( $idsRate * 3600 * 24, 2 );
166        $idsRateTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' ) / $totalTime;
167        $idsTotalPerHour = round( $idsRateTotal * 3600, 2 );
168        $idsTotalPerDay = round( $idsRateTotal * 3600 * 24, 2 );
169
170        $loopId = $jobInfo->has( 'sanitize_job_loop_id' ) ? $jobInfo->get( 'sanitize_job_loop_id' ) : 0;
171        $idsTodo = $this->maxId - $jobInfo->get( 'sanitize_job_id_offset' );
172        $loopEta = date( $fmt, time() + ( $idsTodo * $idsRate ) );
173        $loopRestartMinTime = date( $fmt, $jobInfo->get( 'sanitize_job_last_loop' ) + $minLoopDuration );
174
175        $this->output( <<<EOD
176JobDetail for {$jobName}
177    Target Wiki:     {$jobInfo->get( 'sanitize_job_wiki' )}
178    Cluster:     {$cluster}
179    Created:     {$created}
180    Updated:     {$updated}
181    Loop start:    {$loopStart}
182    Current id:    {$jobInfo->get( 'sanitize_job_id_offset' )}
183    Ids sent:    {$idsSent} ({$idsSentTotal} total)
184    Jobs sent:    {$jobsSent} ({$jobsSentTotal} total)
185    Pressure (CheckerJobs):
186        Cur:    {$this->getPressure()} jobs
187        Max:    {$maxJobs} jobs
188    Pressure (Updates):
189        Cur:    {$updatePressure} jobs
190        Max:    {$maxUpdates} jobs
191    Jobs rate:
192        Loop:    {$jobsPerHour} jobs/hour, {$jobsPerDay} jobs/day
193        Total:    {$jobsTotalPerHour} jobs/hour, {$jobsTotalPerDay} jobs/day
194    Ids rate :
195        Loop:    {$idsPerHour} ids/hour, {$idsPerDay} ids/day
196        Total:    {$idsTotalPerHour} ids/hour, {$idsTotalPerDay} ids/day
197    Loop:
198        Loop:    {$loopId}
199        Todo:    {$idsTodo} ids
200        ETA:    {$loopEta}
201    Loop restart min time: {$loopRestartMinTime}
202
203EOD
204        );
205    }
206
207    private function pushJobs() {
208        $profile = $this->getSearchConfig()
209            ->getProfileService()
210            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->profileName );
211        '@phan-var array $profile';
212        $maxJobs = $profile['max_checker_jobs'];
213        if ( !$maxJobs || $maxJobs <= 0 ) {
214            $this->fatalError( "max_checker_jobs invalid abandonning.\n" );
215        }
216
217        $pressure = $this->getPressure();
218        if ( $pressure >= $maxJobs ) {
219            $this->fatalError( "Too many CheckerJob: $pressure in the queue, $maxJobs allowed.\n" );
220        }
221        $this->log( "$pressure checker job(s) in the queue.\n" );
222
223        $this->disablePoolCountersAndLogging();
224        $this->initMetaStores();
225
226        $jobName = $this->getOption( 'job-name', 'default' );
227        $jobInfo = $this->getJobInfo( $jobName ) ?? $this->createNewJob( $jobName );
228
229        $pushJobFreq = $this->getOption( 'refresh-freq', 2 * 3600 );
230        $loop = new SaneitizeLoop(
231            $this->profileName,
232            $pushJobFreq,
233            $profile['jobs_chunk_size'],
234            $profile['min_loop_duration'],
235            function ( $msg, $channel ) {
236                $this->log( $msg, $channel );
237            } );
238        $jobs = $loop->run( $jobInfo, $maxJobs, $this->minId, $this->maxId );
239        if ( $jobs ) {
240            // Some job queues implementations ignore the timestamps and
241            // instead run these jobs with concurrency limits to keep them
242            // spread over time. Insert jobs in the order we asked for them
243            // to be run to have some semblance of sanity.
244            usort( $jobs, static function ( CheckerJob $job1, CheckerJob $job2 ) {
245                return $job1->getReadyTimestamp() - $job2->getReleaseTimestamp();
246            } );
247            MediaWikiServices::getInstance()->getJobQueueGroup()->push( $jobs );
248            $this->updateJob( $jobInfo );
249        }
250    }
251
252    private function initClusters() {
253        $sanityCheckSetup = $this->getSearchConfig()->get( 'CirrusSearchSanityCheck' );
254        if ( !$sanityCheckSetup ) {
255            $this->fatalError( "Sanity check disabled, abandonning...\n" );
256        }
257        $assignment = $this->getSearchConfig()->getClusterAssignment();
258        if ( $this->hasOption( 'cluster' ) ) {
259            $cluster = $this->getOption( 'cluster' );
260            if ( $assignment->canWriteToCluster( $cluster ) ) {
261                $this->fatalError( "$cluster is not in the set of writable clusters\n" );
262            }
263            $this->clusters = [ $this->getOption( 'cluster' ) ];
264        }
265        $this->clusters = $assignment->getWritableClusters();
266        if ( count( $this->clusters ) === 0 ) {
267            $this->fatalError( 'No clusters are writable...' );
268        }
269    }
270
271    private function initMetaStores() {
272        $connections = Connection::getClusterConnections( $this->clusters, $this->getSearchConfig() );
273
274        if ( empty( $connections ) ) {
275            $this->fatalError( "No writable cluster found." );
276        }
277
278        $this->metaStores = [];
279        foreach ( $connections as $cluster => $connection ) {
280            $store = $this->getMetaStore( $connection );
281            if ( !$store->cirrusReady() ) {
282                $this->fatalError( "No metastore found in cluster $cluster" );
283            }
284            $this->metaStores[$cluster] = $store->saneitizeJobStore();
285        }
286    }
287
288    private function initProfile() {
289        $res = $this->getDB( DB_REPLICA )
290            ->newSelectQueryBuilder()
291            ->select( [ 'min_id' => 'MIN(page_id)', 'max_id' => 'MAX(page_id)' ] )
292            ->table( 'page' )
293            ->caller( __METHOD__ )
294            ->fetchResultSet();
295
296        $row = $res->fetchObject();
297        $this->minId = $row->min_id;
298        $this->maxId = $row->max_id;
299        $profiles =
300            $this->getSearchConfig()
301                ->getProfileService()
302                ->listExposedProfiles( SearchProfileService::SANEITIZER );
303        uasort( $profiles, static function ( $a, $b ) {
304            return $a['max_wiki_size'] <=> $b['max_wiki_size'];
305        } );
306        $wikiSize = $this->maxId - $this->minId;
307        foreach ( $profiles as $name => $settings ) {
308            '@phan-var array $settings';
309            if ( $settings['max_wiki_size'] > $wikiSize ) {
310                $this->profileName = $name;
311                $this->log( "Detected $wikiSize ids to check, selecting profile $name\n" );
312                break;
313            }
314        }
315        if ( !$this->profileName ) {
316            $this->fatalError( "No profile found for $wikiSize ids, please check sanitization profiles" );
317        }
318    }
319
320    /**
321     * @param string $jobName
322     * @return \Elastica\Document|null
323     */
324    private function getJobInfo( $jobName ) {
325        $latest = null;
326        // Fetch the lastest jobInfo from the metastore. Ideally all
327        // jobInfo should be the same but in the case a cluster has
328        // been decommissioned and re-added its job info may be outdated
329        foreach ( $this->metaStores as $store ) {
330            $current = $store->get( $jobName );
331            if ( $current === null ) {
332                continue;
333            }
334            $this->checkJobClusterMismatch( $current );
335            if ( $latest == null ) {
336                $latest = $current;
337            } elseif ( $current->get( 'sanitize_job_updated' ) > $latest->get( 'sanitize_job_updated' ) ) {
338                $latest = $current;
339            }
340        }
341        return $latest;
342    }
343
344    /**
345     * @param \Elastica\Document $jobInfo
346     */
347    private function updateJob( \Elastica\Document $jobInfo ) {
348        foreach ( $this->metaStores as $store ) {
349            $store->update( $jobInfo );
350        }
351    }
352
353    /**
354     * @param string $jobName
355     * @return \Elastica\Document
356     */
357    private function createNewJob( $jobName ) {
358        $job = null;
359        $scriptCluster = $this->getOption( 'cluster' );
360        foreach ( $this->metaStores as $store ) {
361            // TODO: It's a little awkward to let each cluster make
362            // it's own job, but it also seems sane to put all
363            // the doc building in the store?
364            $job = $store->create( $jobName, $this->minId, $scriptCluster );
365        }
366        if ( $job === null ) {
367            $this->fatalError( "No job created, metastores failed to create?" );
368        }
369        // @phan-suppress-next-line PhanTypeMismatchReturnNullable T240141
370        return $job;
371    }
372
373    private function deleteJob() {
374        $jobName = $this->getOption( 'job-name', 'default' );
375        $jobInfo = $this->getJobInfo( $jobName );
376        if ( $jobInfo === null ) {
377            $this->fatalError( "Unknown job $jobName" );
378        }
379        foreach ( $this->metaStores as $cluster => $store ) {
380            $store->delete( $jobName );
381            $this->log( "Deleted job $jobName from $cluster.\n" );
382        }
383    }
384
385    /**
386     * @return int the number of jobs in the CheckerJob queue
387     */
388    private function getPressure() {
389        $queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'cirrusSearchCheckerJob' );
390        return $queue->getSize() + $queue->getDelayedCount();
391    }
392
393    private function log( $msg, $channel = null ) {
394        $date = new \DateTime();
395        $this->output( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $channel );
396    }
397
398    /**
399     * @param string $msg The error to display
400     * @param int $die deprecated do not use
401     */
402    public function error( $msg, $die = 0 ) {
403        $date = new \DateTime();
404        parent::error( $date->format( 'Y-m-d H:i:s' ) . " " . $msg );
405    }
406
407    /**
408     * @param string $msg The error to display
409     * @param int $exitCode die out using this int as the code
410     * @return never
411     */
412    public function fatalError( $msg, $exitCode = 1 ) {
413        $date = new \DateTime();
414        parent::fatalError( $date->format( 'Y-m-d H:i:s' ) . " " . $msg, $exitCode );
415    }
416}
417
418$maintClass = SaneitizeJobs::class;
419require_once RUN_MAINTENANCE_IF_MAIN;