Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 187
0.00% covered (danger)
0.00%
0 / 6
CRAP
0.00% covered (danger)
0.00%
0 / 1
CheckerJob
0.00% covered (danger)
0.00%
0 / 187
0.00% covered (danger)
0.00%
0 / 6
1190
0.00% covered (danger)
0.00%
0 / 1
 build
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
2
 __construct
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 doJob
0.00% covered (danger)
0.00%
0 / 120
0.00% covered (danger)
0.00%
0 / 1
462
 getPressure
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
6
 allowRetries
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 retry
0.00% covered (danger)
0.00%
0 / 33
0.00% covered (danger)
0.00%
0 / 1
30
1<?php
2
3namespace CirrusSearch\Job;
4
5use ArrayObject;
6use CirrusSearch\Profile\SearchProfileService;
7use CirrusSearch\Sanity\AllClustersQueueingRemediator;
8use CirrusSearch\Sanity\BufferedRemediator;
9use CirrusSearch\Sanity\Checker;
10use CirrusSearch\Sanity\CheckerException;
11use CirrusSearch\Sanity\LogOnlyRemediator;
12use CirrusSearch\Sanity\MultiClusterRemediatorHelper;
13use CirrusSearch\Sanity\QueueingRemediator;
14use CirrusSearch\Searcher;
15use CirrusSearch\UpdateGroup;
16use MediaWiki\Logger\LoggerFactory;
17use MediaWiki\MediaWikiServices;
18
19/**
20 * Job wrapper around Sanity\Checker
21 *
22 * This program is free software; you can redistribute it and/or modify
23 * it under the terms of the GNU General Public License as published by
24 * the Free Software Foundation; either version 2 of the License, or
25 * (at your option) any later version.
26 *
27 * This program is distributed in the hope that it will be useful,
28 * but WITHOUT ANY WARRANTY; without even the implied warranty of
29 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * GNU General Public License for more details.
31 *
32 * You should have received a copy of the GNU General Public License along
33 * with this program; if not, write to the Free Software Foundation, Inc.,
34 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
35 * http://www.gnu.org/copyleft/gpl.html
36 */
class CheckerJob extends CirrusGenericJob {
    /**
     * @const int max number of retries, 3 means that the job can be run at
     * most 4 times.
     */
    private const JOB_MAX_RETRIES = 3;

    /**
     * Construct a new CheckerJob.
     * @param int $fromPageId First page id of the range to check (inclusive)
     * @param int $toPageId Last page id of the range to check (inclusive)
     * @param int $delay Number of seconds to delay the job execution by
     * @param string $profile sanitization profile to use
     * @param string|null $cluster Cluster to check, or null for all writable clusters
     * @param int $loopId The number of times the checker jobs have looped
     *  over the pages to be checked.
     * @param \JobQueueGroup $jobQueueGroup
     * @return CheckerJob
     */
    public static function build( $fromPageId, $toPageId, $delay, $profile, $cluster, $loopId, \JobQueueGroup $jobQueueGroup ) {
        // retryCount starts at 0; retry() increments it on every requeue and
        // gives up once it reaches JOB_MAX_RETRIES.
        $job = new self( [
            'fromPageId' => $fromPageId,
            'toPageId' => $toPageId,
            'createdAt' => time(),
            'retryCount' => 0,
            'profile' => $profile,
            'cluster' => $cluster,
            'loopId' => $loopId,
        ] + self::buildJobDelayOptions( self::class, $delay, $jobQueueGroup ) );
        return $job;
    }

    /**
     * Normalize legacy job parameters before delegating to the parent
     * constructor.
     * @param array $params
     */
    public function __construct( array $params ) {
        // BC for jobs created before id fields were clarified to be explicitly page id's
        if ( isset( $params['fromId'] ) ) {
            $params['fromPageId'] = $params['fromId'];
            unset( $params['fromId'] );
        }
        if ( isset( $params['toId'] ) ) {
            $params['toPageId'] = $params['toId'];
            unset( $params['toId'] );
        }
        // BC for jobs created before loop id existed
        if ( !isset( $params['loopId'] ) ) {
            $params['loopId'] = 0;
        }
        parent::__construct( $params );
    }

    /**
     * Validate the job parameters against the configured sanitization
     * profile, then run a sanity Checker over the [fromPageId, toPageId]
     * range on every selected cluster, in batches of checker_batch_size
     * page ids.
     *
     * NOTE: parameter/config errors return true (a fake success) on purpose,
     * so that the job queue does not retry an unrecoverable job; legitimate
     * retries (pressure/time limits, checker failures) are requeued
     * internally via retry().
     *
     * @return bool
     */
    protected function doJob() {
        $profile = $this->searchConfig
            ->getProfileService()
            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->params['profile'], false );

        // First perform a set of sanity checks and return true to fake a success (to prevent retries)
        // in case the job params are incorrect. These errors are generally unrecoverable.
        if ( !$profile ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid profile {profile} provided, check CirrusSearchSanityCheck config.",
                [
                    'profile' => $this->params['profile']
                ]
            );
            return true;
        }
        // Note: the `!$x` half of these checks rejects null/0/''; the `< 0`
        // half is still needed because negative numbers are truthy in PHP.
        $maxPressure = $profile['update_jobs_max_pressure'] ?? null;
        if ( !$maxPressure || $maxPressure < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid update_jobs_max_pressure, check CirrusSearchSanityCheck config."
            );
            return true;
        }
        $batchSize = $profile['checker_batch_size'] ?? null;
        if ( !$batchSize || $batchSize < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid checker_batch_size, check CirrusSearchSanityCheck config."
            );
            return true;
        }

        $chunkSize = $profile['jobs_chunk_size'] ?? null;
        if ( !$chunkSize || $chunkSize < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid jobs_chunk_size, check CirrusSearchSanityCheck config."
            );
            return true;
        }

        $maxTime = $profile['checker_job_max_time'] ?? null;
        if ( !$maxTime || $maxTime < 0 ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob invalid checker_job_max_time, check CirrusSearchSanityCheck config."
            );
            return true;
        }

        // decideClusters() is inherited from CirrusGenericJob (not visible
        // here); presumably it maps the job's cluster param to connection
        // objects keyed by cluster name — TODO confirm against the parent.
        $connections = $this->decideClusters( UpdateGroup::CHECK_SANITY );
        if ( !$connections ) {
            // Nothing to check against; treat as done.
            return true;
        }

        $from = $this->params['fromPageId'];
        $to = $this->params['toPageId'];

        // An inverted range means the job payload is corrupted; log and drop.
        if ( $from > $to ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob: from > to ( {from} > {to} ), job is corrupted?",
                [
                    'from' => $from,
                    'to' => $to,
                ]
            );
            return true;
        }

        // A range wider than jobs_chunk_size means the job was built with a
        // different profile (or is corrupted); log and drop.
        if ( ( $to - $from ) > $chunkSize ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Cannot run CheckerJob: to - from > chunkSize( {from}, {to} > {chunkSize} ), job is corrupted or profile mismatch?",
                [
                    'from' => $from,
                    'to' => $to,
                    'chunkSize' => $chunkSize,
                ]
            );
            return true;
        }

        $clusterNames = implode( ', ', array_keys( $connections ) );

        LoggerFactory::getInstance( 'CirrusSearch' )->debug(
            "Running CheckerJob on cluster $clusterNames {diff}s after insertion",
            [
                'diff' => time() - $this->params['createdAt'],
                'clusters' => array_keys( $connections ),
            ]
        );

        // Optional "is this document too old" closure: only built when the
        // profile enables periodic reindexing via reindex_after_loops.
        $isOld = null;
        $reindexAfterLoops = $profile['reindex_after_loops'] ?? null;
        if ( $reindexAfterLoops ) {
            $isOld = Checker::makeIsOldClosure(
                $this->params['loopId'],
                $reindexAfterLoops
            );
        }

        $startTime = time();

        // Page cache shared by all per-cluster checkers so each batch of
        // pages is loaded once, not once per cluster.
        $pageCache = new ArrayObject();
        /**
         * @var Checker[] $checkers
         */
        $checkers = [];
        $perClusterRemediators = [];
        $perClusterBufferedRemediators = [];
        $logger = LoggerFactory::getInstance( 'CirrusSearch' );
        $assignment = $this->searchConfig->getClusterAssignment();
        // Build one Checker per cluster. Fixes are buffered per cluster
        // (BufferedRemediator) and flushed after each batch; clusters we
        // cannot write to only get their problems logged.
        foreach ( $connections as $cluster => $connection ) {
            $searcher = new Searcher( $connection, 0, 0, $this->searchConfig, [], null );
            $remediator = $assignment->canWriteToCluster( $cluster, UpdateGroup::SANEITIZER )
                ? new QueueingRemediator( $cluster ) : new LogOnlyRemediator( $logger );
            $bufferedRemediator = new BufferedRemediator();
            $checker = new Checker(
                $this->searchConfig,
                $connection,
                $bufferedRemediator,
                $searcher,
                false, // logSane
                false, // fastRedirectCheck
                $pageCache,
                $isOld
            );
            $checkers[$cluster] = $checker;
            $perClusterRemediators[$cluster] = $remediator;
            $perClusterBufferedRemediators[$cluster] = $bufferedRemediator;
        }

        $multiClusterRemediator = new MultiClusterRemediatorHelper( $perClusterRemediators, $perClusterBufferedRemediators,
            new AllClustersQueueingRemediator(
                $this->getSearchConfig()->getClusterAssignment(),
                MediaWikiServices::getInstance()->getJobQueueGroup()
            ) );

        // Walk the id range in batches of checker_batch_size page ids.
        $ranges = array_chunk( range( $from, $to ), $batchSize );
        foreach ( $ranges as $pageIds ) {
            // Back off (requeue the remainder of the range) when the update
            // job queues are saturated or the time budget is spent. Returning
            // true because the retry was scheduled internally.
            if ( self::getPressure() > $maxPressure ) {
                $this->retry( "too much pressure on update jobs", reset( $pageIds ) );
                return true;
            }
            if ( time() - $startTime > $maxTime ) {
                $this->retry( "execution time exceeded checker_job_max_time", reset( $pageIds ) );
                return true;
            }
            // Drop the previous batch's cached pages before checking the next.
            $pageCache->exchangeArray( [] );
            foreach ( $checkers as $cluster => $checker ) {
                try {
                    $checker->check( $pageIds );
                } catch ( CheckerException $checkerException ) {
                    // Requeue the rest of the range for this cluster only and
                    // stop checking it for the remainder of this job; the
                    // other clusters keep going.
                    $this->retry( "Failed to verify ids on $cluster" . $checkerException->getMessage(), reset( $pageIds ), $cluster );
                    unset( $checkers[$cluster] );
                }
            }
            // Flush the fixes buffered for this batch on all clusters.
            $multiClusterRemediator->sendBatch();
        }
        return true;
    }

    /**
     * Measure the current backlog of CirrusSearch update jobs, counting both
     * ready and delayed jobs across all update queues.
     *
     * @return int the total number of update jobs enqueued
     */
    public static function getPressure() {
        $queues = [
            'cirrusSearchLinksUpdatePrioritized',
            'cirrusSearchLinksUpdate',
            'cirrusSearchElasticaWrite',
            'cirrusSearchOtherIndex',
            'cirrusSearchDeletePages',
        ];
        $size = 0;
        $jobQueueGroup = MediaWikiServices::getInstance()->getJobQueueGroup();
        foreach ( $queues as $queueName ) {
            $queue = $jobQueueGroup->get( $queueName );
            $size += $queue->getSize();
            $size += $queue->getDelayedCount();
        }

        return $size;
    }

    /**
     * This job handles all its own retries internally.
     * @return bool
     */
    public function allowRetries() {
        return true;
    }

    /**
     * Retry the job later with a new from offset: a copy of this job is
     * pushed with retryCount incremented, fromPageId moved forward to
     * $newFrom, and a backoff delay. Gives up (logging only) once
     * JOB_MAX_RETRIES requeues have already happened.
     *
     * @param string $cause why we retry
     * @param int $newFrom the new from offset
     * @param string|null $cluster Cluster job is for
     */
    private function retry( $cause, $newFrom, $cluster = null ) {
        if ( $this->params['retryCount'] >= self::JOB_MAX_RETRIES ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->info(
                "Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Abandonning CheckerJob after {retries} retries " .
                "for {cluster}, (jobs_chunk_size too high?).",
                [
                    'retries' => $this->params['retryCount'],
                    'fromPageId' => $this->params['fromPageId'],
                    'toPageId' => $this->params['toPageId'],
                    'cluster' => $cluster ?: 'all clusters'
                ]
            );
            return;
        }

        // backoffDelay() comes from the parent class (not visible here);
        // presumably it grows with retryCount — TODO confirm.
        $delay = $this->backoffDelay( $this->params['retryCount'] );
        $params = $this->params;
        if ( $cluster !== null ) {
            $params['cluster'] = $cluster;
        }
        $params['retryCount']++;
        $params['fromPageId'] = $newFrom;
        // Drop the stale release timestamp so buildJobDelayOptions() can set
        // a fresh one based on the new backoff delay.
        unset( $params['jobReleaseTimestamp'] );
        $jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
        $params += self::buildJobDelayOptions( self::class, $delay, $jobQueue );
        $job = new self( $params );
        LoggerFactory::getInstance( 'CirrusSearch' )->info(
            "Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Requeueing CheckerJob " .
            "for {cluster} with a delay of {delay}s.",
            [
                'delay' => $delay,
                'fromPageId' => $job->params['fromPageId'],
                'toPageId' => $job->params['toPageId'],
                'cluster' => $cluster ?: 'all clusters'
            ]
        );
        $jobQueue->push( $job );
    }
}