Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 188
0.00% covered (danger)
0.00%
0 / 7
CRAP
0.00% covered (danger)
0.00%
0 / 1
CheckerJob
0.00% covered (danger)
0.00%
0 / 188
0.00% covered (danger)
0.00%
0 / 7
1190
0.00% covered (danger)
0.00%
0 / 1
 build
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
2
 __construct
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 doJob
0.00% covered (danger)
0.00%
0 / 117
0.00% covered (danger)
0.00%
0 / 1
420
 makeIsOldClosure
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getPressure
0.00% covered (danger)
0.00%
0 / 14
0.00% covered (danger)
0.00%
0 / 1
6
 allowRetries
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 retry
0.00% covered (danger)
0.00%
0 / 32
0.00% covered (danger)
0.00%
0 / 1
30
1<?php
2
3namespace CirrusSearch\Job;
4
5use ArrayObject;
6use CirrusSearch\Profile\SearchProfileService;
7use CirrusSearch\Sanity\AllClustersQueueingRemediator;
8use CirrusSearch\Sanity\BufferedRemediator;
9use CirrusSearch\Sanity\Checker;
10use CirrusSearch\Sanity\CheckerException;
11use CirrusSearch\Sanity\MultiClusterRemediatorHelper;
12use CirrusSearch\Sanity\QueueingRemediator;
13use CirrusSearch\Searcher;
14use MediaWiki\Logger\LoggerFactory;
15use MediaWiki\MediaWikiServices;
16
17/**
18 * Job wrapper around Sanity\Checker
19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License along
31 * with this program; if not, write to the Free Software Foundation, Inc.,
32 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
33 * http://www.gnu.org/copyleft/gpl.html
34 */
35class CheckerJob extends CirrusGenericJob {
36    /**
37     * @const int max number of retries, 3 means that the job can be run at
38     * most 4 times.
39     */
40    private const JOB_MAX_RETRIES = 3;
41
42    /**
43     * Construct a new CherckerJob.
44     * @param int $fromPageId
45     * @param int $toPageId
46     * @param int $delay
47     * @param string $profile sanitization profile to use
48     * @param string|null $cluster
49     * @param int $loopId The number of times the checker jobs have looped
50     *  over the pages to be checked.
51     * @return CheckerJob
52     */
53    public static function build( $fromPageId, $toPageId, $delay, $profile, $cluster, $loopId ) {
54        $job = new self( [
55            'fromPageId' => $fromPageId,
56            'toPageId' => $toPageId,
57            'createdAt' => time(),
58            'retryCount' => 0,
59            'profile' => $profile,
60            'cluster' => $cluster,
61            'loopId' => $loopId,
62        ] + self::buildJobDelayOptions( self::class, $delay ) );
63        return $job;
64    }
65
66    /**
67     * @param array $params
68     */
69    public function __construct( array $params ) {
70        // BC for jobs created before id fields were clarified to be explicitly page id's
71        if ( isset( $params['fromId'] ) ) {
72            $params['fromPageId'] = $params['fromId'];
73            unset( $params['fromId'] );
74        }
75        if ( isset( $params['toId'] ) ) {
76            $params['toPageId'] = $params['toId'];
77            unset( $params['toId'] );
78        }
79        // BC for jobs created before loop id existed
80        if ( !isset( $params['loopId'] ) ) {
81            $params['loopId'] = 0;
82        }
83        parent::__construct( $params );
84    }
85
86    /**
87     * @return bool
88     * @throws \MWException
89     */
90    protected function doJob() {
91        $profile = $this->searchConfig
92            ->getProfileService()
93            ->loadProfileByName( SearchProfileService::SANEITIZER, $this->params['profile'], false );
94
95        // First perform a set of sanity checks and return true to fake a success (to prevent retries)
96        // in case the job params are incorrect. These errors are generally unrecoverable.
97        if ( !$profile ) {
98            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
99                "Cannot run CheckerJob invalid profile {profile} provided, check CirrusSearchSanityCheck config.",
100                [
101                    'profile' => $this->params['profile']
102                ]
103            );
104            return true;
105        }
106        $maxPressure = $profile['update_jobs_max_pressure'] ?? null;
107        if ( !$maxPressure || $maxPressure < 0 ) {
108            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
109                "Cannot run CheckerJob invalid update_jobs_max_pressure, check CirrusSearchSanityCheck config."
110            );
111            return true;
112        }
113        $batchSize = $profile['checker_batch_size'] ?? null;
114        if ( !$batchSize || $batchSize < 0 ) {
115            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
116                "Cannot run CheckerJob invalid checker_batch_size, check CirrusSearchSanityCheck config."
117            );
118            return true;
119        }
120
121        $chunkSize = $profile['jobs_chunk_size'] ?? null;
122        if ( !$chunkSize || $chunkSize < 0 ) {
123            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
124                "Cannot run CheckerJob invalid jobs_chunk_size, check CirrusSearchSanityCheck config."
125            );
126            return true;
127        }
128
129        $maxTime = $profile['checker_job_max_time'] ?? null;
130        if ( !$maxTime || $maxTime < 0 ) {
131            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
132                "Cannot run CheckerJob invalid checker_job_max_time, check CirrusSearchSanityCheck config."
133            );
134            return true;
135        }
136
137        $connections = $this->decideClusters();
138        if ( empty( $connections ) ) {
139            return true;
140        }
141
142        $from = $this->params['fromPageId'];
143        $to = $this->params['toPageId'];
144
145        if ( $from > $to ) {
146            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
147                "Cannot run CheckerJob: from > to ( {from} > {to} ), job is corrupted?",
148                [
149                    'from' => $from,
150                    'to' => $to,
151                ]
152            );
153            return true;
154        }
155
156        if ( ( $to - $from ) > $chunkSize ) {
157            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
158                "Cannot run CheckerJob: to - from > chunkSize( {from}, {to} > {chunkSize} ), job is corrupted or profile mismatch?",
159                [
160                    'from' => $from,
161                    'to' => $to,
162                    'chunkSize' => $chunkSize,
163                ]
164            );
165            return true;
166        }
167
168        $clusterNames = implode( ', ', array_keys( $connections ) );
169
170        LoggerFactory::getInstance( 'CirrusSearch' )->debug(
171            "Running CheckerJob on cluster $clusterNames {diff}s after insertion",
172            [
173                'diff' => time() - $this->params['createdAt'],
174                'clusters' => array_keys( $connections ),
175            ]
176        );
177
178        $isOld = null;
179        $reindexAfterLoops = $profile['reindex_after_loops'] ?? null;
180        if ( $reindexAfterLoops ) {
181            $isOld = self::makeIsOldClosure(
182                $this->params['loopId'],
183                $reindexAfterLoops
184            );
185        }
186
187        $startTime = time();
188
189        $pageCache = new ArrayObject();
190        /**
191         * @var Checker[] $checkers
192         */
193        $checkers = [];
194        $perClusterRemediators = [];
195        $perClusterBufferedRemediators = [];
196        foreach ( $connections as $cluster => $connection ) {
197            $searcher = new Searcher( $connection, 0, 0, $this->searchConfig, [], null );
198            $remediator = new QueueingRemediator( $cluster );
199            $bufferedRemediator = new BufferedRemediator();
200            $checker = new Checker(
201                $this->searchConfig,
202                $connection,
203                $bufferedRemediator,
204                $searcher,
205                false, // logSane
206                false, // fastRedirectCheck
207                $pageCache,
208                $isOld
209            );
210            $checkers[$cluster] = $checker;
211            $perClusterRemediators[$cluster] = $remediator;
212            $perClusterBufferedRemediators[$cluster] = $bufferedRemediator;
213        }
214
215        $multiClusterRemediator = new MultiClusterRemediatorHelper( $perClusterRemediators, $perClusterBufferedRemediators,
216            new AllClustersQueueingRemediator(
217                $this->getSearchConfig()->getClusterAssignment(),
218                MediaWikiServices::getInstance()->getJobQueueGroup()
219            ) );
220
221        $ranges = array_chunk( range( $from, $to ), $batchSize );
222        while ( $pageIds = array_shift( $ranges ) ) {
223            if ( self::getPressure() > $maxPressure ) {
224                $this->retry( "too much pressure on update jobs", reset( $pageIds ) );
225                return true;
226            }
227            if ( time() - $startTime > $maxTime ) {
228                $this->retry( "execution time exceeded checker_job_max_time", reset( $pageIds ) );
229                return true;
230            }
231            $pageCache->exchangeArray( [] );
232            foreach ( $checkers as $cluster => $checker ) {
233                try {
234                    $checker->check( $pageIds );
235                } catch ( CheckerException $checkerException ) {
236                    $this->retry( "Failed to verify ids: " . $checkerException->getMessage(), reset( $pageIds ), $cluster );
237                    unset( $checkers[$cluster] );
238                }
239            }
240            $multiClusterRemediator->sendBatch();
241        }
242        return true;
243    }
244
245    /**
246     * Decide if a document should be reindexed based on time since last reindex
247     *
248     * Consider a page as old every $numCycles times the saneitizer loops over
249     * the same document. This ensures documents have been reindexed within the
250     * last `$numCycles * actual_loop_duration` (note that the configured
251     * duration is min_loop_duration, but in practice configuration ensures min
252     * and actual are typically the same).
253     *
254     * @param int $loopId The number of times the checker has looped over
255     *  the document set.
256     * @param int $numCycles The number of loops after which a document
257     *  is considered old.
258     * @return \Closure
259     */
260    private static function makeIsOldClosure( $loopId, $numCycles ) {
261        $loopMod = $loopId % $numCycles;
262        return static function ( \WikiPage $page ) use ( $numCycles, $loopMod ) {
263            $pageIdMod = $page->getId() % $numCycles;
264            return $pageIdMod == $loopMod;
265        };
266    }
267
268    /**
269     * @return int the total number of update jobs enqueued
270     */
271    public static function getPressure() {
272        $queues = [
273            'cirrusSearchLinksUpdatePrioritized',
274            'cirrusSearchLinksUpdate',
275            'cirrusSearchElasticaWrite',
276            'cirrusSearchOtherIndex',
277            'cirrusSearchDeletePages',
278        ];
279        $size = 0;
280        $jobQueueGroup = MediaWikiServices::getInstance()->getJobQueueGroup();
281        foreach ( $queues as $queueName ) {
282            $queue = $jobQueueGroup->get( $queueName );
283            $size += $queue->getSize();
284            $size += $queue->getDelayedCount();
285        }
286
287        return $size;
288    }
289
290    /**
291     * This job handles all its own retries internally.
292     * @return bool
293     */
294    public function allowRetries() {
295        return true;
296    }
297
298    /**
299     * Retry the job later with a new from offset
300     * @param string $cause why we retry
301     * @param int $newFrom the new from offset
302     * @param string|null $cluster Cluster job is for
303     */
304    private function retry( $cause, $newFrom, $cluster = null ) {
305        if ( $this->params['retryCount'] >= self::JOB_MAX_RETRIES ) {
306            LoggerFactory::getInstance( 'CirrusSearch' )->info(
307                "Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Abandonning CheckerJob after {retries} retries " .
308                "for {cluster}, (jobs_chunk_size too high?).",
309                [
310                    'retries' => $this->params['retryCount'],
311                    'fromPageId' => $this->params['fromPageId'],
312                    'toPageId' => $this->params['toPageId'],
313                    'cluster' => $cluster ?: 'all clusters'
314                ]
315            );
316            return;
317        }
318
319        $delay = $this->backoffDelay( $this->params['retryCount'] );
320        $params = $this->params;
321        if ( $cluster !== null ) {
322            $params['cluster'] = $cluster;
323        }
324        $params['retryCount']++;
325        $params['fromPageId'] = $newFrom;
326        unset( $params['jobReleaseTimestamp'] );
327        $params += self::buildJobDelayOptions( self::class, $delay );
328        $job = new self( $params );
329        LoggerFactory::getInstance( 'CirrusSearch' )->info(
330            "Sanitize CheckerJob: $cause ({fromPageId}:{toPageId}), Requeueing CheckerJob " .
331            "for {cluster} with a delay of {delay}s.",
332            [
333                'delay' => $delay,
334                'fromPageId' => $job->params['fromPageId'],
335                'toPageId' => $job->params['toPageId'],
336                'cluster' => $cluster ?: 'all clusters'
337            ]
338        );
339        MediaWikiServices::getInstance()->getJobQueueGroup()->push( $job );
340    }
341}