Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
100.00% covered (success)
100.00%
54 / 54
100.00% covered (success)
100.00%
5 / 5
CRAP
100.00% covered (success)
100.00%
1 / 1
SaneitizeLoop
100.00% covered (success)
100.00%
54 / 54
100.00% covered (success)
100.00%
5 / 5
13
100.00% covered (success)
100.00%
1 / 1
 __construct
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
 run
100.00% covered (success)
100.00%
37 / 37
100.00% covered (success)
100.00%
1 / 1
7
 createCheckerJob
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 checkMinLoopDuration
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
3
 log
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 */
18
19namespace CirrusSearch\Maintenance;
20
21use CirrusSearch\Job\CheckerJob;
22use Elastica\Document;
23use MediaWiki\JobQueue\JobQueueGroup;
24use MediaWiki\MediaWikiServices;
25use MediaWiki\Utils\MWTimestamp;
26
27/**
28 * Create saneitize jobs for a single execution of a saneitizer loop
29 *
30 * Maintains state in the job info pertaining to current position in
31 * the loop. The job info must be persisted between runs.
32 */
33class SaneitizeLoop {
34    /** @var string Name of the saneitizer profile to use in created jobs */
35    private $profileName;
36
37    /** @var int The frequency, in seconds, that the saneitize loop is executed */
38    private $pushJobFreq;
39
40    /** @var int The number of pages to include per job */
41    private $chunkSize;
42
43    /** @var int Minimum number of seconds between loop restarts */
44    private $minLoopDuration;
45
46    /** @var callable */
47    private $logger;
48    /**
49     * @var JobQueueGroup
50     */
51    private $jobQueueGroup;
52
53    /**
54     * @param string $profileName Name of the saneitizer profile to use in created jobs
55     * @param int $pushJobFreq The frequency, in seconds, that the saneitize loop is executed
56     * @param int $chunkSize The number of pages to include per job
57     * @param int $minLoopDuration Minimum number of seconds between loop restarts
58     * @param callable|null $logger Callable accepting 2 arguments, first a log
59     *  message and second either a channel name or null.
60     * @param JobQueueGroup|null $jobQueueGroup
61     */
62    public function __construct(
63        $profileName, $pushJobFreq, $chunkSize, $minLoopDuration, $logger = null, ?JobQueueGroup $jobQueueGroup = null
64    ) {
65        $this->profileName = $profileName;
66        $this->pushJobFreq = $pushJobFreq;
67        $this->chunkSize = $chunkSize;
68        $this->minLoopDuration = $minLoopDuration;
69        $this->logger = $logger ?? static function ( $msg, $channel = null ) {
70        };
71        $this->jobQueueGroup = $jobQueueGroup ?? MediaWikiServices::getInstance()->getJobQueueGroup();
72    }
73
74    /**
75     * Generate jobs for one run of a saneitize loop
76     *
77     * @param Document $jobInfo
78     * @param int $numJobs The number of jobs to create
79     * @param int $minId Minimum page_id on the wiki
80     * @param int $maxId Maximum page_id on the wiki
81     * @return CheckerJob[] The created jobs. May be less than requested.
82     */
83    public function run( Document $jobInfo, $numJobs, $minId, $maxId ) {
84        // @var int
85        $from = $jobInfo->get( 'sanitize_job_id_offset' );
86        $lastLoop = $jobInfo->get( 'sanitize_job_last_loop' );
87        // ternary is BC for when loop_id didn't exist.
88        $loopId = $jobInfo->has( 'sanitize_job_loop_id' ) ? $jobInfo->get( 'sanitize_job_loop_id' ) : 0;
89        $jobsSent = $jobInfo->get( 'sanitize_job_jobs_sent' );
90        $jobsSentTotal = $jobInfo->get( 'sanitize_job_jobs_sent_total' );
91        $jobsSentCurLoop = 0;
92        $idsSent = $jobInfo->get( 'sanitize_job_ids_sent' );
93        $idsSentTotal = $jobInfo->get( 'sanitize_job_ids_sent_total' );
94        $jobs = [];
95        for ( $i = 0; $i < $numJobs; $i++ ) {
96            if ( $from <= $minId || $from >= $maxId ) {
97                // The previous loop has completed. Wait until that loop
98                // has taken the minimum required duration before starting
99                // the next one.
100                if ( !$this->checkMinLoopDuration( $lastLoop ) ) {
101                    break;
102                }
103                $from = $minId;
104                $idsSent = 0;
105                $jobsSent = 0;
106                $lastLoop = MWTimestamp::time();
107                $loopId += 1;
108            }
109            $to = min( $from + $this->chunkSize - 1, $maxId );
110            $jobs[] = $this->createCheckerJob( $from, $to, $jobInfo->get( 'sanitize_job_cluster' ), $loopId );
111            $jobsSent++;
112            $jobsSentTotal++;
113            $jobsSentCurLoop++;
114            $idsSent += $to - $from;
115            $idsSentTotal += $to - $from;
116            $from = $to + 1;
117        }
118
119        if ( $jobs ) {
120            $jobInfo->set( 'sanitize_job_loop_id', $loopId );
121            $jobInfo->set( 'sanitize_job_last_loop', $lastLoop );
122            $jobInfo->set( 'sanitize_job_id_offset', $from );
123            $jobInfo->set( 'sanitize_job_jobs_sent', $jobsSent );
124            $jobInfo->set( 'sanitize_job_jobs_sent_total', $jobsSentTotal );
125            $jobInfo->set( 'sanitize_job_ids_sent', $idsSent );
126            $jobInfo->set( 'sanitize_job_ids_sent_total', $idsSentTotal );
127            $this->log( "Created $jobsSentCurLoop jobs, setting from offset to $from.\n" );
128        } else {
129            $this->log( "No jobs created.\n" );
130        }
131
132        return $jobs;
133    }
134
135    /**
136     * @param int $from
137     * @param int $to
138     * @param string|null $cluster
139     * @param int $loopId
140     * @return CheckerJob
141     */
142    private function createCheckerJob( $from, $to, $cluster, $loopId ) {
143        $delay = mt_rand( 0, $this->pushJobFreq );
144        $this->log( "Creating CheckerJob( $from$to$delay{$this->profileName}$cluster$loopId )\n" );
145        return CheckerJob::build( $from, $to, $delay, $this->profileName, $cluster, $loopId, $this->jobQueueGroup );
146    }
147
148    /**
149     * @param int|null $lastLoop last loop start time
150     * @return bool true if minLoopDuration is not reached false otherwize
151     */
152    private function checkMinLoopDuration( $lastLoop ) {
153        if ( $lastLoop !== null && ( MWTimestamp::time() - $lastLoop ) < $this->minLoopDuration ) {
154            $date = date( 'Y-m-d H:i:s', $lastLoop );
155            $newLoop = date( 'Y-m-d H:i:s', $lastLoop + $this->minLoopDuration );
156            $this->log( "Last loop ended at $date, new jobs will be sent when min_loop_duration is reached at $newLoop\n" );
157            return false;
158        }
159        return true;
160    }
161
162    /**
163     * @param string $msg
164     * @param string|null $channel
165     */
166    private function log( $msg, $channel = null ) {
167        ( $this->logger )( $msg, $channel );
168    }
169}