Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
6.45% covered (danger)
6.45%
14 / 217
0.00% covered (danger)
0.00%
0 / 20
CRAP
0.00% covered (danger)
0.00%
0 / 1
Reindexer
6.45% covered (danger)
6.45%
14 / 217
0.00% covered (danger)
0.00%
0 / 20
3107.27
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 reindex
0.00% covered (danger)
0.00%
0 / 44
0.00% covered (danger)
0.00%
0 / 1
56
 waitForCounts
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
30
 waitForGreen
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 getHealth
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 decideMaxShardsPerNodeForReindex
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setConnectionTimeout
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 destroyClients
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 output
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 outputIndented
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 error
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 fatalError
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 makeUpdateFieldsScript
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
42
 makeRemoteReindexInfo
58.33% covered (warning)
58.33%
14 / 24
0.00% covered (danger)
0.00%
0 / 1
12.63
 monitorReindexTask
0.00% covered (danger)
0.00%
0 / 36
0.00% covered (danger)
0.00%
0 / 1
56
 monitorSleepSeconds
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 estimateTimeRemaining
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
30
 estimateSlices
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfNodes
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfShards
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3namespace CirrusSearch\Maintenance;
4
5use CirrusSearch\Connection;
6use CirrusSearch\Elastica\ReindexRequest;
7use CirrusSearch\Elastica\ReindexResponse;
8use CirrusSearch\Elastica\ReindexTask;
9use CirrusSearch\SearchConfig;
10use Elastica\Client;
11use Elastica\Exception\Connection\HttpException;
12use Elastica\Index;
13use Elastica\Request;
14use Elastica\Transport\Http;
15use Elastica\Transport\Https;
16use MediaWiki\Utils\MWTimestamp;
17
18/**
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License along
30 * with this program; if not, write to the Free Software Foundation, Inc.,
31 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
32 * http://www.gnu.org/copyleft/gpl.html
33 */
/**
 * Copies the documents of a live index into the index being built by issuing
 * a _reindex request, monitoring the resulting task and verifying the
 * document counts afterwards.
 */
class Reindexer {
    private const MAX_CONSECUTIVE_ERRORS = 5;
    private const MONITOR_SLEEP_SECONDS = 30;
    private const MAX_WAIT_FOR_COUNT_SEC = 600;
    private const AUTO_SLICE_CEILING = 20;

    /**
     * @var SearchConfig
     */
    private $searchConfig;

    /* "From" portion */
    /**
     * @var Index
     */
    private $oldIndex;

    /**
     * @var Connection
     */
    private $oldConnection;

    /* "To" portion */

    /**
     * @var Index
     */
    private $index;

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var Printer|null
     */
    private $out;

    /**
     * @var string[] list of fields to delete
     */
    private $fieldsToDelete;

    /**
     * @param SearchConfig $searchConfig
     * @param Connection $source Connection the documents are read from
     * @param Connection $target Connection the documents are written to
     * @param Index $index Index being populated
     * @param Index $oldIndex Index being read
     * @param Printer|null $out Destination for progress messages; when null
     *  nothing is printed
     * @param string[] $fieldsToDelete
     */
    public function __construct(
        SearchConfig $searchConfig,
        Connection $source,
        Connection $target,
        Index $index,
        Index $oldIndex,
        ?Printer $out = null,
        $fieldsToDelete = []
    ) {
        // @todo: this constructor has too many arguments - refactor!
        $this->searchConfig = $searchConfig;
        $this->oldConnection = $source;
        $this->connection = $target;
        $this->oldIndex = $oldIndex;
        $this->index = $index;
        $this->out = $out;
        $this->fieldsToDelete = $fieldsToDelete;
    }

    /**
     * Dump everything from the live index into the one being worked on.
     *
     * @param int|null $slices The number of slices to use, or null to use
     *  the number of shards
     * @param int $chunkSize
     * @param float $acceptableCountDeviation
     */
    public function reindex(
        $slices = null,
        $chunkSize = 100,
        $acceptableCountDeviation = 0.05
    ) {
        // Set some settings that should help io load during bulk indexing.  We'll have to
        // optimize after this to consolidate down to a proper number of segments but that
        // is worth the price.  total_shards_per_node will help to make sure that each shard
        // has as few neighbors as possible.
        $this->outputIndented( "Preparing index settings for reindex\n" );
        $this->setConnectionTimeout();
        $settings = $this->index->getSettings();
        $oldSettings = $settings->get();
        if ( !is_array( $oldSettings ) ) {
            throw new \RuntimeException( 'Invalid response from index settings' );
        }
        $settings->set( [
            'refresh_interval' => -1,
            'routing.allocation.total_shards_per_node' =>
                $this->decideMaxShardsPerNodeForReindex( $oldSettings ),
            // It's probably inefficient to let the index be created with replicas,
            // then drop the empty replicas a few moments later. Doing it like this
            // allows reindexing and index creation to operate independently without
            // needing to know about each other.
            'auto_expand_replicas' => 'false',
            'number_of_replicas' => 0,
        ] );
        $this->waitForGreen();

        $request = new ReindexRequest( $this->oldIndex, $this->index, $chunkSize );
        if ( $slices === null ) {
            $request->setSlices( $this->estimateSlices( $this->oldIndex ) );
        } else {
            $request->setSlices( $slices );
        }
        $remote = self::makeRemoteReindexInfo( $this->oldConnection, $this->connection );
        if ( $remote !== null ) {
            $request->setRemoteInfo( $remote );
        }

        $script = $this->makeUpdateFieldsScript();
        if ( $script !== null ) {
            $request->setScript( $script );
        }

        try {
            $task = $request->reindexTask();
        } catch ( \Exception $e ) {
            // fatalError() terminates the process, so $task is always set below.
            $this->fatalError( $e->getMessage() );
        }

        $this->outputIndented( "Started reindex task: " . $task->getId() . "\n" );
        $response = $this->monitorReindexTask( $task, $this->index );
        $task->delete();
        if ( !$response->isSuccessful() ) {
            $this->fatalError(
                "Reindex task was not successful: " . $response->getUnsuccessfulReason()
            );
        }

        $this->outputIndented( "Verifying counts..." );
        // We can't verify counts are exactly equal because they won't be - we still push updates
        // into the old index while reindexing the new one.
        $this->waitForCounts( $acceptableCountDeviation );
        $this->output( "done\n" );

        // Revert settings changed just for reindexing. Although we set number_of_replicas above
        // we do not reset its value here, rather allowing auto_expand_replicas to pick an
        // appropriate value. Settings missing from the original response are sent as null,
        // without raising an undefined-key warning.
        $newSettings = [
            'refresh_interval' => $oldSettings['refresh_interval'] ?? null,
            'auto_expand_replicas' => $oldSettings['auto_expand_replicas'] ?? null,
            'routing.allocation.total_shards_per_node' =>
                $oldSettings['routing']['allocation']['total_shards_per_node'] ?? -1,
        ];
        $settings->set( $newSettings );
    }

    /**
     * Block until the document count of the new index is within the accepted
     * deviation of the old index, or terminate once MAX_WAIT_FOR_COUNT_SEC
     * has elapsed.
     *
     * @param float $acceptableCountDeviation Maximum accepted relative
     *  difference between the old and the new document count
     */
    private function waitForCounts( float $acceptableCountDeviation ) {
        $oldCount = (float)$this->oldIndex->count();
        $this->index->refresh();
        // While elasticsearch should be ready immediately after a refresh, we have seen this return
        // exceptionally low values in 2% of reindex attempts. Wait around a bit and hope the refresh
        // becomes available
        $start = microtime( true );
        $timeoutAfter = $start + self::MAX_WAIT_FOR_COUNT_SEC;
        while ( true ) {
            $newCount = (float)$this->index->count();
            // An empty source index can not deviate, and must not divide by zero.
            $difference = $oldCount > 0 ? abs( $oldCount - $newCount ) / $oldCount : 0;
            if ( $difference <= $acceptableCountDeviation ) {
                break;
            }
            $this->output(
                "Not close enough!  old=$oldCount new=$newCount difference=$difference\n"
            );
            if ( microtime( true ) > $timeoutAfter ) {
                $this->fatalError( 'Failed to load index - counts not close enough.  ' .
                    "old=$oldCount new=$newCount difference=$difference.  " .
                    'Check for warnings above.' );
            } else {
                $this->output( "Waiting to re-check counts..." );
                sleep( 30 );
            }
        }
    }

    /**
     * Poll the index health once per second until it reports green, printing
     * a progress dot every 20 polls.
     */
    public function waitForGreen() {
        $this->outputIndented( "Waiting for index green status..." );
        $each = 0;
        $status = $this->getHealth();
        // A payload without a status key is treated as "not green yet".
        while ( ( $status['status'] ?? null ) !== 'green' ) {
            if ( $each === 0 ) {
                $this->output( '.' );
            }
            $each = ( $each + 1 ) % 20;
            sleep( 1 );
            $status = $this->getHealth();
        }
        $this->output( "done\n" );
    }

    /**
     * Get health information about the index
     *
     * Error responses are reported and retried after one second, without an
     * upper bound on the number of attempts.
     *
     * @return array Response data array
     */
    private function getHealth() {
        $indexName = $this->index->getName();
        $path = "_cluster/health/$indexName";
        while ( true ) {
            $response = $this->index->getClient()->request( $path );
            if ( $response->hasError() ) {
                $this->error( 'Error fetching index health but going to retry.  Message: ' .
                    $response->getError() );
                sleep( 1 );
                continue;
            }
            return $response->getData();
        }
    }

    /**
     * Decide shards per node during reindex operation
     *
     * While reindexing we run with no replicas, meaning the default
     * configuration for max shards per node might allow things to
     * become very unbalanced. Choose a value that spreads the
     * indexing load across as many instances as possible.
     *
     * @param array $settings Configured live index settings
     * @return int
     */
    private function decideMaxShardsPerNodeForReindex( array $settings ): int {
        // Never divide by less than one node, whatever the health payload says.
        $numberOfNodes = max( 1, (int)( $this->getHealth()['number_of_nodes'] ?? 1 ) );
        $numberOfShards = (int)$settings['number_of_shards'];
        return (int)ceil( $numberOfShards / $numberOfNodes );
    }

    /**
     * Set the maintenance timeout to the connection we will issue the reindex request
     * to, so that it does not timeout while the reindex is running.
     */
    private function setConnectionTimeout() {
        $timeout = $this->searchConfig->get( 'CirrusSearchMaintenanceTimeout' );
        $this->connection->setTimeout( $timeout );
    }

    /**
     * Destroy client connections
     */
    private function destroyClients() {
        $this->connection->destroyClient();
        $this->oldConnection->destroyClient();
        // Destroying connections resets timeouts, so we have to reinstate them
        $this->setConnectionTimeout();
    }

    /**
     * Forward a message to the printer, if one was provided.
     *
     * @param string $message
     * @param mixed|null $channel
     */
    protected function output( $message, $channel = null ) {
        if ( $this->out ) {
            $this->out->output( $message, $channel );
        }
    }

    /**
     * Forward an indented message to the printer, if one was provided.
     *
     * @param string $message
     * @param string $prefix By default prefixes tab to fake an
     *  additional indentation level.
     */
    private function outputIndented( $message, $prefix = "\t" ) {
        if ( $this->out ) {
            $this->out->outputIndented( $prefix . $message );
        }
    }

    /**
     * Forward an error message to the printer, if one was provided.
     *
     * @param string $message
     */
    private function error( $message ) {
        if ( $this->out ) {
            $this->out->error( $message );
        }
    }

    /**
     * Report an error and terminate the process.
     *
     * @param string $message
     * @param int $exitCode
     * @return never
     */
    private function fatalError( $message, $exitCode = 1 ) {
        $this->error( $message );
        exit( $exitCode );
    }

    /**
     * @return array|null Returns an array suitable for use as
     *  the _reindex api script parameter to delete fields from
     *  the copied documents, or null if no script is needed.
     */
    private function makeUpdateFieldsScript() {
        $script = [
            'source' => '',
            'lang' => 'painless',
        ];
        foreach ( $this->fieldsToDelete as $field ) {
            $field = trim( $field );
            if ( $field !== '' ) {
                // The name lands inside a single-quoted painless literal, so quotes and
                // backslashes have to be escaped to keep the script well formed.
                $escaped = addcslashes( $field, "'\\" );
                $script['source'] .= "ctx._source.remove('$escaped');";
            }
        }
        // Populate the page_id if it's the first time we add the page_id field to the mapping
        if ( !isset( $this->oldIndex->getMapping()['properties']['page_id'] )
                 && isset( $this->index->getMapping()['properties']['page_id'] ) ) {
            $this->outputIndented( "Populating the page_id field if not set\n" );
            // Length of whatever makeId() puts in front of the numeric page id.
            $prefLen = strlen( $this->searchConfig->makeId( 1 ) ) - 1;
            $script['source'] .= "if (ctx._source.page_id == null) {ctx._source.page_id = Long.parseLong(ctx._id.substring($prefLen));}";
        }
        if ( $script['source'] === '' ) {
            return null;
        }

        return $script;
    }

    /**
     * Creates an array suitable for use as the _reindex api source.remote
     * parameter to read from $oldConnection.
     *
     * This is very fragile, but the transports don't expose enough to do more really
     *
     * @param Connection $source Connection to read data from
     * @param Connection $dest Connection to reindex data into
     * @return array|null Null when both connections use the same cluster name
     * @throws \RuntimeException When the source transport is not http based
     */
    public static function makeRemoteReindexInfo( Connection $source, Connection $dest ) {
        if ( $source->getClusterName() === $dest->getClusterName() ) {
            return null;
        }

        $innerConnection = $source->getClient()->getConnection();
        $transport = $innerConnection->getTransportObject();
        if ( !$transport instanceof Http ) {
            throw new \RuntimeException(
                'Remote reindex not implemented for transport: ' . get_class( $transport )
            );
        }

        // We make some pretty bold assumptions that classes extending from \Elastica\Transport\Http
        // don't change how any of this works.
        $url = $innerConnection->hasConfig( 'url' )
            ? $innerConnection->getConfig( 'url' )
            : '';
        if ( $url === '' ) {
            $scheme = ( $transport instanceof Https )
                ? 'https'
                : 'http';
            $url = $scheme . '://' . $innerConnection->getHost() . ':' .
                $innerConnection->getPort() . '/' . $innerConnection->getPath();
        }

        $username = $innerConnection->getUsername();
        $password = $innerConnection->getPassword();
        if ( $username && $password ) {
            return [
                'host' => $url,
                'username' => $username,
                'password' => $password,
            ];
        }

        return [ 'host' => $url ];
    }

    /**
     * Poll the reindex task until it completes, printing progress along the way.
     *
     * @param ReindexTask $task
     * @param Index $target
     * @return ReindexResponse
     */
    private function monitorReindexTask( ReindexTask $task, Index $target ) {
        $consecutiveErrors = 0;
        $sleepSeconds = self::monitorSleepSeconds( 1, 2, self::MONITOR_SLEEP_SECONDS );
        $completionEstimateGen = self::estimateTimeRemaining();
        while ( !$task->isComplete() ) {
            try {
                $status = $task->getStatus();
            } catch ( \Exception $e ) {
                if ( ++$consecutiveErrors > self::MAX_CONSECUTIVE_ERRORS ) {
                    $this->output( "\n" );
                    $this->fatalError(
                        "$e\n\n" .
                        "Lost connection to elasticsearch cluster. The reindex task "
                        . "{$task->getId()} is still running.\nThe task should be manually "
                        . "canceled, and the index {$target->getName()}\n"
                        . "should be removed.\n" .
                        $e->getMessage()
                    );
                }
                if ( $e instanceof HttpException ) {
                    // Allow through potentially intermittent network problems:
                    // * couldn't connect,
                    // * 28: timed out
                    // * 52: connected, closed with no response
                    $retryable = [ CURLE_COULDNT_CONNECT, 28, 52 ];
                    if ( !in_array( (int)$e->getError(), $retryable, true ) ) {
                        // Wrap exception to include info about task id?
                        throw $e;
                    }
                }
                $this->outputIndented( "Error: {$e->getMessage()}\n" );
                usleep( 500000 );
                continue;
            }

            $consecutiveErrors = 0;

            $estCompletion = $completionEstimateGen->send(
                $status->getTotal() - $status->getCreated() );
            // What is worth reporting here?
            $this->outputIndented(
                "Task: {$task->getId()} "
                . "Search Retries: {$status->getSearchRetries()} "
                . "Bulk Retries: {$status->getBulkRetries()} "
                . "Indexed: {$status->getCreated()} / {$status->getTotal()} "
                . "Complete: $estCompletion\n"
            );
            if ( !$status->isComplete() ) {
                sleep( $sleepSeconds->current() );
                $sleepSeconds->next();
            }
        }

        return $task->getResponse();
    }

    /**
     * Endless generator of sleep durations growing geometrically from $base
     * by $ratio and capped at $max.
     *
     * @param int|float $base First value yielded
     * @param int|float $ratio Multiplier applied after every yield
     * @param int|float $max Upper bound of the yielded values
     * @return \Generator
     */
    private static function monitorSleepSeconds( $base, $ratio, $max ) {
        $val = $base;
        // @phan-suppress-next-line PhanInfiniteLoop https://github.com/phan/phan/issues/3545
        while ( true ) {
            yield $val;
            $val = min( $max, $val * $ratio );
        }
    }

    /**
     * Generator returning the estimated time of completion.
     * @return \Generator Must be provided the remaining count via Generator::send, replies
     *  with an RFC 2822 formatted timestamp estimating the completion time, or null
     *  as long as no estimate could be computed.
     */
    private static function estimateTimeRemaining(): \Generator {
        $estimatedStr = null;
        $remain = null;
        $now = microtime( true );
        while ( true ) {
            $start = $now;
            $prevRemain = $remain;
            $remain = yield $estimatedStr;
            $now = microtime( true );
            if ( $remain === null || $prevRemain === null ) {
                continue;
            }
            # Very simple calc, no smoothing and will vary wildly. Could be
            # improved if deemed useful.
            $elapsed = $now - $start;
            if ( $elapsed <= 0 ) {
                # No measurable time passed, a rate can not be derived; keep
                # the previous estimate rather than dividing by zero.
                continue;
            }
            $rate = ( $prevRemain - $remain ) / $elapsed;
            if ( $rate > 0 ) {
                $estimatedCompletion = $now + ( $remain / $rate );
                $estimatedStr = MWTimestamp::convert( TS_RFC2822, $estimatedCompletion );
            }
        }
    }

    /**
     * Auto detect the number of slices to use when reindexing.
     *
     * Note that elasticsearch 7.x added an 'auto' setting, but we are on
     * 6.x. That setting uses one slice per shard, up to a certain limit (20 in
     * 7.9). This implementation provides the same limits, and adds an additional
     * constraint that the auto-detected value must be <= the number of nodes.
     *
     * @param Index $index The index to estimate a slice count for
     * @return int The number of slices to reindex with, never less than 1
     */
    private function estimateSlices( Index $index ) {
        $detected = min(
            $this->getNumberOfNodes( $index->getClient() ),
            $this->getNumberOfShards( $index ),
            self::AUTO_SLICE_CEILING
        );
        return max( 1, $detected );
    }

    /**
     * @param Client $client
     * @return int Number of entries returned by the cat nodes endpoint
     */
    private function getNumberOfNodes( Client $client ) {
        $endpoint = ( new \Elasticsearch\Endpoints\Cat\Nodes() )
            ->setParams( [ 'format' => 'json' ] );
        return count( $client->requestEndpoint( $endpoint )->getData() );
    }

    /**
     * @param Index $index
     * @return int The number_of_shards setting of the index
     * @throws \RuntimeException When the setting is absent from the response
     */
    private function getNumberOfShards( Index $index ) {
        $response = $index->request( '_settings/index.number_of_shards', Request::GET );
        $data = $response->getData();
        // Can't use $index->getName() because that is probably an alias
        $realIndexName = is_array( $data ) ? ( array_keys( $data )[0] ?? null ) : null;
        // In theory this should never happen, we will get a ResponseException if the index doesn't
        // exist and every index must have a number_of_shards settings. But better safe than sorry.
        if ( $realIndexName === null
            || !isset( $data[$realIndexName]['settings']['index']['number_of_shards'] )
        ) {
            throw new \RuntimeException(
                "Couldn't detect number of shards in {$index->getName()}"
            );
        }
        // Cast so the value compares as a number against the other limits.
        return (int)$data[$realIndexName]['settings']['index']['number_of_shards'];
    }
}