Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 188
0.00% covered (danger)
0.00%
0 / 6
CRAP
0.00% covered (danger)
0.00%
0 / 1
CheckerJob
0.00% covered (danger)
0.00%
0 / 188
0.00% covered (danger)
0.00%
0 / 6
1190
0.00% covered (danger)
0.00%
0 / 1
 build
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
2
 __construct
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 doJob
0.00% covered (danger)
0.00%
0 / 121
0.00% covered (danger)
0.00%
0 / 1
462
 getPressure
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
6
 allowRetries
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 retry
0.00% covered (danger)
0.00%
0 / 33
0.00% covered (danger)
0.00%
0 / 1
30
1<?php
2
3namespace CirrusSearch\Job;
4
5use ArrayObject;
6use CirrusSearch\Profile\SearchProfileService;
7use CirrusSearch\Sanity\AllClustersQueueingRemediator;
8use CirrusSearch\Sanity\BufferedRemediator;
9use CirrusSearch\Sanity\Checker;
10use CirrusSearch\Sanity\CheckerException;
11use CirrusSearch\Sanity\LogOnlyRemediator;
12use CirrusSearch\Sanity\MultiClusterRemediatorHelper;
13use CirrusSearch\Sanity\QueueingRemediator;
14use CirrusSearch\Searcher;
15use CirrusSearch\UpdateGroup;
16use CirrusSearch\Util;
17use MediaWiki\Logger\LoggerFactory;
18use MediaWiki\MediaWikiServices;
19
20/**
21 * Job wrapper around Sanity\Checker
22 *
23 * This program is free software; you can redistribute it and/or modify
24 * it under the terms of the GNU General Public License as published by
25 * the Free Software Foundation; either version 2 of the License, or
26 * (at your option) any later version.
27 *
28 * This program is distributed in the hope that it will be useful,
29 * but WITHOUT ANY WARRANTY; without even the implied warranty of
30 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * GNU General Public License for more details.
32 *
33 * You should have received a copy of the GNU General Public License along
34 * with this program; if not, write to the Free Software Foundation, Inc.,
35 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
36 * http://www.gnu.org/copyleft/gpl.html
37 */
38class CheckerJob extends CirrusGenericJob {
39    /**
40     * @const int max number of retries, 3 means that the job can be run at
41     * most 4 times.
42     */
43    private const JOB_MAX_RETRIES = 3;
44
45    /**
46     * Construct a new CherckerJob.
47     * @param int $fromPageId
48     * @param int $toPageId
49     * @param int $delay
50     * @param string $profile sanitization profile to use
51     * @param string|null $cluster
52     * @param int $loopId The number of times the checker jobs have looped
53     *  over the pages to be checked.
54     * @param \JobQueueGroup $jobQueueGroup
55     * @return CheckerJob
56     */
57    public static function build( $fromPageId, $toPageId, $delay, $profile, $cluster, $loopId, \JobQueueGroup $jobQueueGroup ) {
58        $job = new self( [
59            'fromPageId' => $fromPageId,
60            'toPageId' => $toPageId,
61            'createdAt' => time(),
62            'retryCount' => 0,
63            'profile' => $profile,
64            'cluster' => $cluster,
65            'loopId' => $loopId,
66        ] + self::buildJobDelayOptions( self::class, $delay, $jobQueueGroup ) );
67        return $job;
68    }
69
70    /**
71     * @param array $params
72     */
73    public function __construct( array $params ) {
74        // BC for jobs created before id fields were clarified to be explicitly page id's
75        if ( isset( $params['fromId'] ) ) {
76            $params['fromPageId'] = $params['fromId'];
77            unset( $params['fromId'] );
78        }
79        if ( isset( $params['toId'] ) ) {
80            $params['toPageId'] = $params['toId'];
81            unset( $params['toId'] );
82        }
83        // BC for jobs created before loop id existed
84        if ( !isset( $params['loopId'] ) ) {
85            $params['loopId'] = 0;
86        }
87        parent::__construct( $params );
88    }
89
    /**
     * Validate the job parameters against the configured sanitization
     * profile, then run the sanity Checker over the page id range
     * [fromPageId, toPageId] in batches, once per eligible cluster.
     *
     * Always returns true: configuration/parameter errors are treated as
     * unrecoverable (success is faked to prevent queue-level retries), and
     * transient failures are re-queued explicitly via retry().
     *
     * @return bool
     */
    protected function doJob() {
        $profile = $this->searchConfig
            ->getProfileService()
            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->params['profile'], false );

        // First perform a set of sanity checks and return true to fake a success (to prevent retries)
        // in case the job params are incorrect. These errors are generally unrecoverable.
        if ( !$profile ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid profile {profile} provided, check CirrusSearchSanityCheck config.",
                [
                    'profile' => $this->params['profile']
                ]
            );
            return true;
        }
        // Threshold of queued update jobs above which this job backs off
        // (see getPressure() below).
        $maxPressure = $profile['update_jobs_max_pressure'] ?? null;
        if ( !$maxPressure || $maxPressure < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid update_jobs_max_pressure, check CirrusSearchSanityCheck config."
            );
            return true;
        }
        // Number of page ids checked per Checker::check() call.
        $batchSize = $profile['checker_batch_size'] ?? null;
        if ( !$batchSize || $batchSize < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid checker_batch_size, check CirrusSearchSanityCheck config."
            );
            return true;
        }

        // Upper bound on the size of the id range a single job may cover;
        // used below to detect corrupted/mismatched jobs.
        $chunkSize = $profile['jobs_chunk_size'] ?? null;
        if ( !$chunkSize || $chunkSize < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid jobs_chunk_size, check CirrusSearchSanityCheck config."
            );
            return true;
        }

        // Wall-clock budget (seconds) for this job run; exceeded time
        // triggers a retry with the remaining range.
        $maxTime = $profile['checker_job_max_time'] ?? null;
        if ( !$maxTime || $maxTime < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid checker_job_max_time, check CirrusSearchSanityCheck config."
            );
            return true;
        }

        // Clusters this job should check; empty means nothing to do.
        $connections = $this->decideClusters( UpdateGroup::CHECK_SANITY );
        if ( !$connections ) {
            return true;
        }

        $from = $this->params['fromPageId'];
        $to = $this->params['toPageId'];

        if ( $from > $to ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob: from > to ( {from} > {to} ), job is corrupted?",
                [
                    'from' => $from,
                    'to' => $to,
                ]
            );
            return true;
        }

        // A range wider than the configured chunk size indicates the job was
        // built against a different profile (or is corrupted).
        if ( ( $to - $from ) > $chunkSize ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob: to - from > chunkSize( {from}, {to} > {chunkSize} ), job is corrupted or profile mismatch?",
                [
                    'from' => $from,
                    'to' => $to,
                    'chunkSize' => $chunkSize,
                ]
            );
            return true;
        }

        $clusterNames = implode( ', ', array_keys( $connections ) );

        LoggerFactory::getInstance( 'CirrusSearch' )->debug(
            "Running CheckerJob on cluster $clusterNames {diff}s after insertion",
            [
                'diff' => time() - $this->params['createdAt'],
                'clusters' => array_keys( $connections ),
            ]
        );

        // Optional closure deciding whether a page is "old" and should be
        // re-indexed, based on the current loop id.
        $isOld = null;
        $reindexAfterLoops = $profile['reindex_after_loops'] ?? null;
        if ( $reindexAfterLoops ) {
            $isOld = Checker::makeIsOldClosure(
                $this->params['loopId'],
                $reindexAfterLoops
            );
        }

        $startTime = time();

        // Shared page cache so each batch of pages is loaded once and reused
        // by every cluster's checker; reset per batch below.
        $pageCache = new ArrayObject();
        /**
         * @var Checker[] $checkers
         */
        $checkers = [];
        $perClusterRemediators = [];
        $perClusterBufferedRemediators = [];
        $logger = LoggerFactory::getInstance( 'CirrusSearch' );
        $assignment = $this->searchConfig->getClusterAssignment();
        foreach ( $connections as $cluster => $connection ) {
            $searcher = new Searcher( $connection, 0, 0, $this->searchConfig, [], null );
            // Writable clusters get fixes queued; read-only clusters only log
            // the problems found.
            $remediator = $assignment->canWriteToCluster( $cluster, UpdateGroup::SANEITIZER )
                ? new QueueingRemediator( $cluster ) : new LogOnlyRemediator( $logger );
            // Buffer remediations so the multi-cluster helper below can
            // dispatch them together at the end of each batch.
            $bufferedRemediator = new BufferedRemediator();
            $checker = new Checker(
                $this->searchConfig,
                $connection,
                $bufferedRemediator,
                $searcher,
                Util::getStatsFactory(),
                false, // logSane
                false, // fastRedirectCheck
                $pageCache,
                $isOld
            );
            $checkers[$cluster] = $checker;
            $perClusterRemediators[$cluster] = $remediator;
            $perClusterBufferedRemediators[$cluster] = $bufferedRemediator;
        }

        $multiClusterRemediator = new MultiClusterRemediatorHelper( $perClusterRemediators, $perClusterBufferedRemediators,
            new AllClustersQueueingRemediator(
                $this->getSearchConfig()->getClusterAssignment(),
                MediaWikiServices::getInstance()->getJobQueueGroup()
            ) );

        // Walk the id range in batches of $batchSize pages.
        $ranges = array_chunk( range( $from, $to ), $batchSize );
        foreach ( $ranges as $pageIds ) {
            // Back off (re-queue the remaining range) when the update queues
            // are too full or the time budget is spent. reset( $pageIds )
            // is the first unchecked page id, i.e. the new starting point.
            if ( self::getPressure() > $maxPressure ) {
                $this->retry( "too much pressure on update jobs", reset( $pageIds ) );
                return true;
            }
            if ( time() - $startTime > $maxTime ) {
                $this->retry( "execution time exceeded checker_job_max_time", reset( $pageIds ) );
                return true;
            }
            // Clear the shared page cache before each batch.
            $pageCache->exchangeArray( [] );
            foreach ( $checkers as $cluster => $checker ) {
                try {
                    $checker->check( $pageIds );
                } catch ( CheckerException $checkerException ) {
                    // Re-queue the remaining range for the failing cluster
                    // only, and stop checking that cluster in this run.
                    $this->retry( "Failed to verify ids on $cluster" . $checkerException->getMessage(), reset( $pageIds ), $cluster );
                    unset( $checkers[$cluster] );
                }
            }
            // Flush the buffered remediations for this batch.
            $multiClusterRemediator->sendBatch();
        }
        return true;
    }
251
252    /**
253     * @return int the total number of update jobs enqueued
254     */
255    public static function getPressure() {
256        $queues = [
257            'cirrusSearchLinksUpdatePrioritized',
258            'cirrusSearchLinksUpdate',
259            'cirrusSearchElasticaWrite',
260            'cirrusSearchOtherIndex',
261            'cirrusSearchDeletePages',
262        ];
263        $size = 0;
264        $jobQueueGroup = MediaWikiServices::getInstance()->getJobQueueGroup();
265        foreach ( $queues as $queueName ) {
266            $queue = $jobQueueGroup->get( $queueName );
267            $size += $queue->getSize();
268            $size += $queue->getDelayedCount();
269        }
270
271        return $size;
272    }
273
    /**
     * This job handles all its own retries internally.
     *
     * doJob() always returns true (failures are re-queued explicitly via
     * retry(), capped at JOB_MAX_RETRIES), so queue-level retries should
     * never be triggered by a failed run.
     * @return bool
     */
    public function allowRetries() {
        return true;
    }
281
    /**
     * Retry the job later with a new from offset.
     *
     * Re-queues a copy of this job starting at $newFrom with an incremented
     * retry count and a backoff delay; abandons (with a log entry) once
     * JOB_MAX_RETRIES retries have already been spent.
     *
     * @param string $cause why we retry
     * @param int $newFrom the new from offset
     * @param string|null $cluster Cluster job is for; null targets all clusters
     */
    private function retry( $cause, $newFrom, $cluster = null ) {
        // Retry budget exhausted: log and give up.
        if ( $this->params['retryCount'] >= self::JOB_MAX_RETRIES ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->info(
                "Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Abandonning CheckerJob after {retries} retries " .
                "for {cluster}, (jobs_chunk_size too high?).",
                [
                    'retries' => $this->params['retryCount'],
                    'fromPageId' => $this->params['fromPageId'],
                    'toPageId' => $this->params['toPageId'],
                    'cluster' => $cluster ?: 'all clusters'
                ]
            );
            return;
        }

        // Delay grows with the retry count (see backoffDelay() in the parent
        // class — presumably exponential; confirm there).
        $delay = $this->backoffDelay( $this->params['retryCount'] );
        $params = $this->params;
        if ( $cluster !== null ) {
            // Restrict the retried job to the cluster that failed.
            $params['cluster'] = $cluster;
        }
        $params['retryCount']++;
        // Resume from the first unchecked page instead of re-checking the
        // range already verified.
        $params['fromPageId'] = $newFrom;
        // Drop the stale release timestamp; buildJobDelayOptions() computes
        // a fresh one from $delay below.
        unset( $params['jobReleaseTimestamp'] );
        $jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
        $params += self::buildJobDelayOptions( self::class, $delay, $jobQueue );
        $job = new self( $params );
        LoggerFactory::getInstance( 'CirrusSearch' )->info(
            "Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Requeueing CheckerJob " .
            "for {cluster} with a delay of {delay}s.",
            [
                'delay' => $delay,
                'fromPageId' => $job->params['fromPageId'],
                'toPageId' => $job->params['toPageId'],
                'cluster' => $cluster ?: 'all clusters'
            ]
        );
        $jobQueue->push( $job );
    }
326}