Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
6.45% covered (danger)
6.45%
14 / 217
0.00% covered (danger)
0.00%
0 / 20
CRAP
0.00% covered (danger)
0.00%
0 / 1
Reindexer
6.45% covered (danger)
6.45%
14 / 217
0.00% covered (danger)
0.00%
0 / 20
3107.27
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 reindex
0.00% covered (danger)
0.00%
0 / 44
0.00% covered (danger)
0.00%
0 / 1
56
 waitForCounts
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
30
 waitForGreen
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 getHealth
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 decideMaxShardsPerNodeForReindex
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setConnectionTimeout
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 destroyClients
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 output
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 outputIndented
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 error
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 fatalError
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 makeUpdateFieldsScript
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
42
 makeRemoteReindexInfo
58.33% covered (warning)
58.33%
14 / 24
0.00% covered (danger)
0.00%
0 / 1
12.63
 monitorReindexTask
0.00% covered (danger)
0.00%
0 / 36
0.00% covered (danger)
0.00%
0 / 1
56
 monitorSleepSeconds
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 estimateTimeRemaining
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
30
 estimateSlices
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfNodes
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfShards
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3namespace CirrusSearch\Maintenance;
4
5use CirrusSearch\Connection;
6use CirrusSearch\Elastica\ReindexRequest;
7use CirrusSearch\Elastica\ReindexResponse;
8use CirrusSearch\Elastica\ReindexTask;
9use CirrusSearch\SearchConfig;
10use Elastica\Client;
11use Elastica\Exception\Connection\HttpException;
12use Elastica\Index;
13use Elastica\Request;
14use Elastica\Transport\Http;
15use Elastica\Transport\Https;
16
17/**
18 * This program is free software; you can redistribute it and/or modify
19 * it under the terms of the GNU General Public License as published by
20 * the Free Software Foundation; either version 2 of the License, or
21 * (at your option) any later version.
22 *
23 * This program is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * GNU General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License along
29 * with this program; if not, write to the Free Software Foundation, Inc.,
30 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
31 * http://www.gnu.org/copyleft/gpl.html
32 */
33class Reindexer {
34    private const MAX_CONSECUTIVE_ERRORS = 5;
35    private const MONITOR_SLEEP_SECONDS = 30;
36    private const MAX_WAIT_FOR_COUNT_SEC = 600;
37    private const AUTO_SLICE_CEILING = 20;
38
39    /**
40     * @var SearchConfig
41     */
42    private $searchConfig;
43
44    /* "From" portion */
45    /**
46     * @var Index
47     */
48    private $oldIndex;
49
50    /**
51     * @var Connection
52     */
53    private $oldConnection;
54
55    /* "To" portion */
56
57    /**
58     * @var Index
59     */
60    private $index;
61
62    /**
63     * @var Connection
64     */
65    private $connection;
66
67    /**
68     * @var Printer
69     */
70    private $out;
71
72    /**
73     * @var string[] list of fields to delete
74     */
75    private $fieldsToDelete;
76
77    /**
78     * @param SearchConfig $searchConfig
79     * @param Connection $source
80     * @param Connection $target
81     * @param Index $index
82     * @param Index $oldIndex
83     * @param Printer|null $out
84     * @param string[] $fieldsToDelete
85     */
86    public function __construct(
87        SearchConfig $searchConfig,
88        Connection $source,
89        Connection $target,
90        Index $index,
91        Index $oldIndex,
92        Printer $out = null,
93        $fieldsToDelete = []
94    ) {
95        // @todo: this constructor has too many arguments - refactor!
96        $this->searchConfig = $searchConfig;
97        $this->oldConnection = $source;
98        $this->connection = $target;
99        $this->oldIndex = $oldIndex;
100        $this->index = $index;
101        $this->out = $out;
102        $this->fieldsToDelete = $fieldsToDelete;
103    }
104
105    /**
106     * Dump everything from the live index into the one being worked on.
107     *
108     * @param int|null $slices The number of slices to use, or null to use
109     *  the number of shards
110     * @param int $chunkSize
111     * @param float $acceptableCountDeviation
112     */
113    public function reindex(
114        $slices = null,
115        $chunkSize = 100,
116        $acceptableCountDeviation = 0.05
117    ) {
118        // Set some settings that should help io load during bulk indexing.  We'll have to
119        // optimize after this to consolidate down to a proper number of segments but that is
120        // is worth the price.  total_shards_per_node will help to make sure that each shard
121        // has as few neighbors as possible.
122        $this->outputIndented( "Preparing index settings for reindex\n" );
123        $this->setConnectionTimeout();
124        $settings = $this->index->getSettings();
125        $oldSettings = $settings->get();
126        if ( !is_array( $oldSettings ) ) {
127            throw new \RuntimeException( 'Invalid response from index settings' );
128        }
129        $settings->set( [
130            'refresh_interval' => -1,
131            'routing.allocation.total_shards_per_node' =>
132                $this->decideMaxShardsPerNodeForReindex( $oldSettings ),
133            // It's probably inefficient to let the index be created with replicas,
134            // then drop the empty replicas a few moments later. Doing it like this
135            // allows reindexing and index creation to operate independantly without
136            // needing to know about each other.
137            'auto_expand_replicas' => 'false',
138            'number_of_replicas' => 0,
139        ] );
140        $this->waitForGreen();
141
142        $request = new ReindexRequest( $this->oldIndex, $this->index, $chunkSize );
143        if ( $slices === null ) {
144            $request->setSlices( $this->estimateSlices( $this->oldIndex ) );
145        } else {
146            $request->setSlices( $slices );
147        }
148        $remote = self::makeRemoteReindexInfo( $this->oldConnection, $this->connection );
149        if ( $remote !== null ) {
150            $request->setRemoteInfo( $remote );
151        }
152
153        $script = $this->makeUpdateFieldsScript();
154        if ( $script !== null ) {
155            $request->setScript( $script );
156        }
157
158        try {
159            $task = $request->reindexTask();
160        } catch ( \Exception $e ) {
161            $this->fatalError( $e->getMessage() );
162        }
163
164        $this->outputIndented( "Started reindex task: " . $task->getId() . "\n" );
165        $response = $this->monitorReindexTask( $task, $this->index );
166        $task->delete();
167        if ( !$response->isSuccessful() ) {
168            $this->fatalError(
169                "Reindex task was not successful: " . $response->getUnsuccessfulReason()
170            );
171        }
172
173        $this->outputIndented( "Verifying counts..." );
174        // We can't verify counts are exactly equal because they won't be - we still push updates
175        // into the old index while reindexing the new one.
176        $this->waitForCounts( $acceptableCountDeviation );
177        $this->output( "done\n" );
178
179        // Revert settings changed just for reindexing. Although we set number_of_replicas above
180        // we do not reset it's value here, rather allowing auto_expand_replicas to pick an
181        // appropriate value.
182        $newSettings = [
183            'refresh_interval' => $oldSettings['refresh_interval'],
184            'auto_expand_replicas' => $oldSettings['auto_expand_replicas'],
185            'routing.allocation.total_shards_per_node' =>
186                $oldSettings['routing']['allocation']['total_shards_per_node'] ?? -1,
187        ];
188        $settings->set( $newSettings );
189    }
190
191    /**
192     * @param float $acceptableCountDeviation
193     */
194    private function waitForCounts( float $acceptableCountDeviation ) {
195        $oldCount = (float)$this->oldIndex->count();
196        $this->index->refresh();
197        // While elasticsearch should be ready immediately after a refresh, we have seen this return
198        // exceptionally low values in 2% of reindex attempts. Wait around a bit and hope the refresh
199        // becomes available
200        $start = microtime( true );
201        $timeoutAfter = $start + self::MAX_WAIT_FOR_COUNT_SEC;
202        while ( true ) {
203            $newCount = (float)$this->index->count();
204            $difference = $oldCount > 0 ? abs( $oldCount - $newCount ) / $oldCount : 0;
205            if ( $difference <= $acceptableCountDeviation ) {
206                break;
207            }
208            $this->output(
209                "Not close enough!  old=$oldCount new=$newCount difference=$difference\n"
210            );
211            if ( microtime( true ) > $timeoutAfter ) {
212                $this->fatalError( 'Failed to load index - counts not close enough.  ' .
213                    "old=$oldCount new=$newCount difference=$difference.  " .
214                    'Check for warnings above.' );
215            } else {
216                $this->output( "Waiting to re-check counts..." );
217                sleep( 30 );
218            }
219        }
220    }
221
222    public function waitForGreen() {
223        $this->outputIndented( "Waiting for index green status..." );
224        $each = 0;
225        $status = $this->getHealth();
226        while ( $status['status'] !== 'green' ) {
227            if ( $each === 0 ) {
228                $this->output( '.' );
229            }
230            $each = ( $each + 1 ) % 20;
231            sleep( 1 );
232            $status = $this->getHealth();
233        }
234        $this->output( "done\n" );
235    }
236
237    /**
238     * Get health information about the index
239     *
240     * @return array Response data array
241     */
242    private function getHealth() {
243        $indexName = $this->index->getName();
244        $path = "_cluster/health/$indexName";
245        while ( true ) {
246            $response = $this->index->getClient()->request( $path );
247            if ( $response->hasError() ) {
248                $this->error( 'Error fetching index health but going to retry.  Message: ' .
249                    $response->getError() );
250                sleep( 1 );
251                continue;
252            }
253            return $response->getData();
254        }
255    }
256
257    /**
258     * Decide shards per node during reindex operation
259     *
260     * While reindexing we run with no replicas, meaning the default
261     * configuration for max shards per node might allow things to
262     * become very unbalanced. Choose a value that spreads the
263     * indexing load across as many instances as possible.
264     *
265     * @param array $settings Configured live index settings
266     * @return int
267     */
268    private function decideMaxShardsPerNodeForReindex( array $settings ): int {
269        $numberOfNodes = $this->getHealth()[ 'number_of_nodes' ];
270        $numberOfShards = $settings['number_of_shards'];
271        return (int)ceil( $numberOfShards / $numberOfNodes );
272    }
273
274    /**
275     * Set the maintenance timeout to the connection we will issue the reindex request
276     * to, so that it does not timeout while the reindex is running.
277     */
278    private function setConnectionTimeout() {
279        $timeout = $this->searchConfig->get( 'CirrusSearchMaintenanceTimeout' );
280        $this->connection->setTimeout( $timeout );
281    }
282
283    /**
284     * Destroy client connections
285     */
286    private function destroyClients() {
287        $this->connection->destroyClient();
288        $this->oldConnection->destroyClient();
289        // Destroying connections resets timeouts, so we have to reinstate them
290        $this->setConnectionTimeout();
291    }
292
293    /**
294     * @param string $message
295     * @param mixed|null $channel
296     */
297    protected function output( $message, $channel = null ) {
298        if ( $this->out ) {
299            $this->out->output( $message, $channel );
300        }
301    }
302
303    /**
304     * @param string $message
305     * @param string $prefix By default prefixes tab to fake an
306     *  additional indentation level.
307     */
308    private function outputIndented( $message, $prefix = "\t" ) {
309        if ( $this->out ) {
310            $this->out->outputIndented( $prefix . $message );
311        }
312    }
313
314    /**
315     * @param string $message
316     */
317    private function error( $message ) {
318        if ( $this->out ) {
319            $this->out->error( $message );
320        }
321    }
322
323    /**
324     * @param string $message
325     * @param int $exitCode
326     * @return never
327     */
328    private function fatalError( $message, $exitCode = 1 ) {
329        $this->error( $message );
330        exit( $exitCode );
331    }
332
333    /**
334     * @return array|null Returns an array suitable for use as
335     *  the _reindex api script parameter to delete fields from
336     *  the copied documents, or null if no script is needed.
337     */
338    private function makeUpdateFieldsScript() {
339        $script = [
340            'source' => '',
341            'lang' => 'painless',
342        ];
343        foreach ( $this->fieldsToDelete as $field ) {
344            $field = trim( $field );
345            if ( strlen( $field ) ) {
346                $script['source'] .= "ctx._source.remove('$field');";
347            }
348        }
349        // Populate the page_id if it's the first time we add the page_id field to the mapping
350        if ( !isset( $this->oldIndex->getMapping()['properties']['page_id'] )
351                 && isset( $this->index->getMapping()['properties']['page_id'] ) ) {
352            $this->outputIndented( "Populating the page_id field if not set\n" );
353            $prefLen = strlen( $this->searchConfig->makeId( 1 ) ) - 1;
354            $script['source'] .= "if (ctx._source.page_id == null) {ctx._source.page_id = Long.parseLong(ctx._id.substring($prefLen));}";
355        }
356        if ( $script['source'] === '' ) {
357            return null;
358        }
359
360        return $script;
361    }
362
363    /**
364     * Creates an array suitable for use as the _reindex api source.remote
365     * parameter to read from $oldConnection.
366     *
367     * This is very fragile, but the transports don't expose enough to do more really
368     *
369     * @param Connection $source Connection to read data from
370     * @param Connection $dest Connection to reindex data into
371     * @return array|null
372     */
373    public static function makeRemoteReindexInfo( Connection $source, Connection $dest ) {
374        if ( $source->getClusterName() === $dest->getClusterName() ) {
375            return null;
376        }
377
378        $innerConnection = $source->getClient()->getConnection();
379        $transport = $innerConnection->getTransportObject();
380        if ( !$transport instanceof Http ) {
381            throw new \RuntimeException(
382                'Remote reindex not implemented for transport: ' . get_class( $transport )
383            );
384        }
385
386        // We make some pretty bold assumptions that classes extending from \Elastica\Transport\Http
387        // don't change how any of this works.
388        $url = $innerConnection->hasConfig( 'url' )
389            ? $innerConnection->getConfig( 'url' )
390            : '';
391        if ( $url === '' ) {
392            $scheme = ( $transport instanceof Https )
393                ? 'https'
394                : 'http';
395            $url = $scheme . '://' . $innerConnection->getHost() . ':' .
396                $innerConnection->getPort() . '/' . $innerConnection->getPath();
397        }
398
399        if ( $innerConnection->getUsername() && $innerConnection->getPassword() ) {
400            return [
401                'host' => $url,
402                'username' => $innerConnection->getUsername(),
403                'password' => $innerConnection->getPassword(),
404            ];
405        } else {
406            return [ 'host' => $url ];
407        }
408    }
409
410    /**
411     * @param ReindexTask $task
412     * @param Index $target
413     * @return ReindexResponse
414     */
415    private function monitorReindexTask( ReindexTask $task, Index $target ) {
416        $consecutiveErrors = 0;
417        $sleepSeconds = self::monitorSleepSeconds( 1, 2, self::MONITOR_SLEEP_SECONDS );
418        $completionEstimateGen = self::estimateTimeRemaining();
419        while ( !$task->isComplete() ) {
420            try {
421                $status = $task->getStatus();
422            } catch ( \Exception $e ) {
423                if ( ++$consecutiveErrors > self::MAX_CONSECUTIVE_ERRORS ) {
424                    $this->output( "\n" );
425                    $this->fatalError(
426                        "$e\n\n" .
427                        "Lost connection to elasticsearch cluster. The reindex task "
428                        . "{$task->getId()} is still running.\nThe task should be manually "
429                        . "canceled, and the index {$target->getName()}\n"
430                        . "should be removed.\n" .
431                        $e->getMessage()
432                    );
433                }
434                if ( $e instanceof HttpException ) {
435                    // Allow through potentially intermittent network problems:
436                    // * couldn't connect,
437                    // * 28: timeout out
438                    // * 52: connected, closed with no response
439                    if ( !in_array( $e->getError(), [ CURLE_COULDNT_CONNECT, 28, 52 ] ) ) {
440                        // Wrap exception to include info about task id?
441                        throw $e;
442                    }
443                }
444                $this->outputIndented( "Error: {$e->getMessage()}\n" );
445                usleep( 500000 );
446                continue;
447            }
448
449            $consecutiveErrors = 0;
450
451            $estCompletion = $completionEstimateGen->send(
452                $status->getTotal() - $status->getCreated() );
453            // What is worth reporting here?
454            $this->outputIndented(
455                "Task: {$task->getId()} "
456                . "Search Retries: {$status->getSearchRetries()} "
457                . "Bulk Retries: {$status->getBulkRetries()} "
458                . "Indexed: {$status->getCreated()} / {$status->getTotal()} "
459                . "Complete: $estCompletion\n"
460            );
461            if ( !$status->isComplete() ) {
462                sleep( $sleepSeconds->current() );
463                $sleepSeconds->next();
464            }
465        }
466
467        return $task->getResponse();
468    }
469
470    private static function monitorSleepSeconds( $base, $ratio, $max ) {
471        $val = $base;
472        // @phan-suppress-next-line PhanInfiniteLoop https://github.com/phan/phan/issues/3545
473        while ( true ) {
474            yield $val;
475            $val = min( $max, $val * $ratio );
476        }
477    }
478
479    /**
480     * Generator returning the estimated timestamp of completion.
481     * @return \Generator Must be provided the remaining count via Generator::send, replies
482     *  with a unix timestamp estimating the completion time.
483     */
484    private static function estimateTimeRemaining(): \Generator {
485        $estimatedStr = null;
486        $remain = null;
487        $prevRemain = null;
488        $now = microtime( true );
489        while ( true ) {
490            $start = $now;
491            $prevRemain = $remain;
492            $remain = yield $estimatedStr;
493            $now = microtime( true );
494            if ( $remain === null || $prevRemain === null ) {
495                continue;
496            }
497            # Very simple calc, no smoothing and will vary wildly. Could be
498            # improved if deemed useful.
499            $elapsed  = $now - $start;
500            $rate = ( $prevRemain - $remain ) / $elapsed;
501            if ( $rate > 0 ) {
502                $estimatedCompletion = $now + ( $remain / $rate );
503                $estimatedStr = \MWTimestamp::convert( TS_RFC2822, $estimatedCompletion );
504            }
505        }
506    }
507
508    /**
509     * Auto detect the number of slices to use when reindexing.
510     *
511     * Note that elasticseach 7.x added an 'auto' setting, but we are on
512     * 6.x. That setting uses one slice per shard, up to a certain limit (20 in
513     * 7.9). This implementation provides the same limits, and adds an additional
514     * constraint that the auto-detected value must be <= the number of nodes.
515     *
516     * @param Index $index The index the estimate a slice count for
517     * @return int The number of slices to reindex with
518     */
519    private function estimateSlices( Index $index ) {
520        return min(
521            $this->getNumberOfNodes( $index->getClient() ),
522            $this->getNumberOfShards( $index ),
523            self::AUTO_SLICE_CEILING
524        );
525    }
526
527    private function getNumberOfNodes( Client $client ) {
528        $endpoint = ( new \Elasticsearch\Endpoints\Cat\Nodes() )
529            ->setParams( [ 'format' => 'json' ] );
530        return count( $client->requestEndpoint( $endpoint )->getData() );
531    }
532
533    private function getNumberOfShards( Index $index ) {
534        $response = $index->request( '_settings/index.number_of_shards', Request::GET );
535        $data = $response->getData();
536        // Can't use $index->getName() because that is probably an alias
537        $realIndexName = array_keys( $data )[0];
538        // In theory this should never happen, we will get a ResponseException if the index doesn't
539        // exist and every index must have a number_of_shards settings. But better safe than sorry.
540        if ( !isset( $data[$realIndexName]['settings']['index']['number_of_shards'] ) ) {
541            throw new \RuntimeException(
542                "Couldn't detect number of shards in {$index->getName()}"
543            );
544        }
545        return $data[$realIndexName]['settings']['index']['number_of_shards'];
546    }
547}