Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 370
0.00% covered (danger)
0.00%
0 / 26
CRAP
0.00% covered (danger)
0.00%
0 / 1
UpdateSuggesterIndex
0.00% covered (danger)
0.00%
0 / 363
0.00% covered (danger)
0.00%
0 / 26
4830
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 32
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 52
0.00% covered (danger)
0.00%
0 / 1
72
 workAroundBrokenMessageCache
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 checkAndDeleteBrokenIndices
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
30
 rebuild
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
6
 canRecycle
0.00% covered (danger)
0.00%
0 / 42
0.00% covered (danger)
0.00%
0 / 1
110
 recycle
0.00% covered (danger)
0.00%
0 / 45
0.00% covered (danger)
0.00%
0 / 1
42
 deleteOldIndex
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 deleteIndex
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 optimize
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 expungeDeletes
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 indexData
0.00% covered (danger)
0.00%
0 / 48
0.00% covered (danger)
0.00%
0 / 1
56
 validateAlias
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 outputProgress
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
20
 log
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 pickAnalyzer
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 createIndex
0.00% covered (danger)
0.00%
0 / 31
0.00% covered (danger)
0.00%
0 / 1
20
 enableReplicas
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
2
 waitForGreen
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 getReplicaCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getShardCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getMaxShardsPerNode
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 updateVersions
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getIndex
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getClient
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getIndexAliasName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch\Maintenance;
4
5use CirrusSearch\BuildDocument\Completion\SuggestBuilder;
6use CirrusSearch\Connection;
7use CirrusSearch\ElasticaErrorHandler;
8use CirrusSearch\Maintenance\Validators\AnalyzersValidator;
9use CirrusSearch\SearchConfig;
10use Elastica;
11use Elastica\Index;
12use Elastica\Query;
13use Elastica\Request;
14use Elastica\Status;
15use MediaWiki\Extension\Elastica\MWElasticUtils;
16use RuntimeException;
17
18/**
19 * Update the search configuration on the search backend for the title
20 * suggest index.
21 *
22 * This program is free software; you can redistribute it and/or modify
23 * it under the terms of the GNU General Public License as published by
24 * the Free Software Foundation; either version 2 of the License, or
25 * (at your option) any later version.
26 *
27 * This program is distributed in the hope that it will be useful,
28 * but WITHOUT ANY WARRANTY; without even the implied warranty of
29 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * GNU General Public License for more details.
31 *
32 * You should have received a copy of the GNU General Public License along
33 * with this program; if not, write to the Free Software Foundation, Inc.,
34 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
35 * http://www.gnu.org/copyleft/gpl.html
36 */
37
// Resolve the MediaWiki installation directory: use MW_INSTALL_PATH when it
// is set, otherwise fall back to three directories above this script.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = ( $IP === false ) ? __DIR__ . '/../../..' : $IP;
require_once "$IP/maintenance/Maintenance.php";
require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
44
45class UpdateSuggesterIndex extends Maintenance {
46    /**
47     * @var string language code we're building for
48     */
49    private $langCode;
50
51    /**
52     * @var int
53     */
54    private $indexChunkSize;
55
56    /**
57     * @var int
58     */
59    private $indexRetryAttempts;
60
61    /**
62     * @var string
63     */
64    private $indexSuffix;
65
66    /**
67     * @var string
68     */
69    private $indexBaseName;
70
71    /**
72     * @var string
73     */
74    private $indexIdentifier;
75
76    /**
77     * @var Index old suggester index that will be deleted at the end of the process
78     */
79    private $oldIndex;
80
81    /**
82     * @var int
83     */
84    private $lastProgressPrinted;
85
86    /**
87     * @var bool optimize the index when done.
88     */
89    private $optimizeIndex;
90
91    /**
92     * @var array list of available plugins
93     */
94    private $availablePlugins;
95
96    /**
97     * @var string
98     */
99    private $masterTimeout;
100
101    /**
102     * @var ConfigUtils
103     */
104    private $utils;
105
106    /**
107     * @todo: public (used in closure)
108     * @var SuggestBuilder
109     */
110    public $builder;
111
112    /**
113     * @var array
114     */
115    private $analysisConfig;
116
117    /**
118     * @var bool
119     */
120    private $recycle = false;
121
122    /**
123     * @var string[]
124     */
125    private $bannedPlugins;
126
127    public function __construct() {
128        parent::__construct();
129        $this->addDescription( "Create a new suggester index. Always operates on a single cluster." );
130        $this->addOption( 'baseName', 'What basename to use for all indexes, ' .
131            'defaults to wiki id', false, true );
132        $this->addOption( 'indexChunkSize', 'Documents per shard to index in a batch.   ' .
133            'Note when changing the number of shards that the old shard size is used, not the new ' .
134            'one.  If you see many errors submitting documents in bulk but the automatic retry as ' .
135            'singles works then lower this number.  Defaults to 500.', false, true );
136        $this->addOption( 'indexRetryAttempts', 'Number of times to back off and retry ' .
137            'per failure.  Note that failures are not common but if Elasticsearch is in the process ' .
138            'of moving a shard this can time out.  This will retry the attempt after some backoff ' .
139            'rather than failing the whole reindex process.  Defaults to 5.', false, true );
140        $this->addOption( 'optimize',
141            'Optimize the index to 1 segment. Defaults to false.', false, false );
142        $this->addOption( 'scoringMethod',
143            'The scoring method to use when computing suggestion weights. ' .
144            'Defaults to $wgCirrusSearchCompletionDefaultScore or quality if unset.', false, true );
145        $this->addOption( 'masterTimeout',
146            'The amount of time to wait for the master to respond to mapping ' .
147            'updates before failing. Defaults to $wgCirrusSearchMasterTimeout.', false, true );
148        $this->addOption( 'replicationTimeout',
149            'The amount of time (seconds) to wait for the replica shards to initialize. ' .
150            'Defaults to 3600 seconds.', false, true );
151        $this->addOption( 'allocationIncludeTag',
152            'Set index.routing.allocation.include.tag on the created index. Useful if you want to ' .
153            'force the suggester index not to be allocated on a specific set of nodes.',
154            false, true );
155        $this->addOption( 'allocationExcludeTag',
156            'Set index.routing.allocation.exclude.tag on the created index. Useful if you want ' .
157            'to force the suggester index not to be allocated on a specific set of nodes.',
158            false, true );
159        $this->addOption( 'recreate', "Force the creation of a new index." );
160    }
161
162    public function execute() {
163        global $wgLanguageCode,
164            $wgCirrusSearchBannedPlugins,
165            $wgCirrusSearchMasterTimeout;
166
167        $this->disablePoolCountersAndLogging();
168        $this->workAroundBrokenMessageCache();
169        $this->masterTimeout = $this->getOption( 'masterTimeout', $wgCirrusSearchMasterTimeout );
170        $this->indexSuffix = Connection::TITLE_SUGGEST_INDEX_SUFFIX;
171
172        $useCompletion = $this->getSearchConfig()->get( 'CirrusSearchUseCompletionSuggester' );
173
174        if ( $useCompletion !== 'build' && $useCompletion !== 'yes' && $useCompletion !== true ) {
175            $this->fatalError( "Completion suggester disabled, quitting..." );
176        }
177
178        // Check that all shards and replicas settings are set
179        try {
180            $this->getShardCount();
181            $this->getReplicaCount();
182            $this->getMaxShardsPerNode();
183        } catch ( \Exception $e ) {
184            $this->fatalError(
185                "Failed to get shard count and replica count information: {$e->getMessage()}"
186            );
187        }
188
189        $this->indexBaseName = $this->getOption(
190            'baseName', $this->getSearchConfig()->get( SearchConfig::INDEX_BASE_NAME )
191        );
192        $this->indexChunkSize = $this->getOption( 'indexChunkSize', 500 );
193        $this->indexRetryAttempts = $this->getOption( 'reindexRetryAttempts', 5 );
194
195        $this->optimizeIndex = $this->getOption( 'optimize', false );
196
197        $this->utils = new ConfigUtils( $this->getClient(), $this );
198
199        $this->langCode = $wgLanguageCode;
200        $this->bannedPlugins = $wgCirrusSearchBannedPlugins;
201
202        $this->availablePlugins = $this->unwrap( $this->utils->scanAvailablePlugins( $this->bannedPlugins ) );
203        $this->analysisConfig = $this->pickAnalyzer( $this->langCode, $this->availablePlugins )
204            ->buildConfig();
205
206        $this->unwrap( $this->utils->checkElasticsearchVersion() );
207
208        try {
209            $this->requireCirrusReady();
210            $this->builder = SuggestBuilder::create( $this->getConnection(),
211                $this->getOption( 'scoringMethod' ), $this->indexBaseName );
212            # check for broken indices and delete them
213            $this->checkAndDeleteBrokenIndices();
214
215            if ( !$this->canRecycle() ) {
216                $this->rebuild();
217            } else {
218                $this->recycle();
219            }
220        } catch ( \Elastica\Exception\Connection\HttpException $e ) {
221            $message = $e->getMessage();
222            $this->log( "\nUnexpected Elasticsearch failure.\n" );
223            $this->fatalError( "Http error communicating with Elasticsearch:  $message.\n" );
224        } catch ( \Elastica\Exception\ExceptionInterface $e ) {
225            $type = get_class( $e );
226            $message = ElasticaErrorHandler::extractMessage( $e );
227            $trace = $e->getTraceAsString();
228            $this->log( "\nUnexpected Elasticsearch failure.\n" );
229            $this->fatalError( "Elasticsearch failed in an unexpected way.  " .
230                "This is always a bug in CirrusSearch.\n" .
231                "Error type: $type\n" .
232                "Message: $message\n" .
233                "Trace:\n" . $trace );
234        }
235
236        return true;
237    }
238
239    private function workAroundBrokenMessageCache() {
240        // Under some configurations (T288233) the i18n cache fails to
241        // initialize. After failing, at least in this particular deployment,
242        // it will fallback to local CDB files and ignore on-wiki overrides
243        // which is acceptable for this script.
244        try {
245            wfMessage( 'ok' )->text();
246        } catch ( \LogicException $e ) {
247            // The first failure should trigger the fallback mode, this second
248            // try should work (and not throw the LogicException deep in the updates).
249            wfMessage( 'ok' )->text();
250        }
251    }
252
253    /**
254     * Check for duplicate indices that may have been created
255     * by a previous update that failed.
256     */
257    private function checkAndDeleteBrokenIndices() {
258        $indices = $this->unwrap( $this->utils->getAllIndicesByType( $this->getIndexAliasName() ) );
259        if ( count( $indices ) < 2 ) {
260            return;
261        }
262        $indexByName = [];
263        foreach ( $indices as $indexName ) {
264            $status = $this->utils->isIndexLive( $indexName );
265            if ( !$status->isGood() ) {
266                $this->log( (string)$status );
267            } elseif ( $status->getValue() === false ) {
268                $this->log( "Deleting broken index {$indexName}\n" );
269                $this->deleteIndex( $this->getConnection()->getIndex( $indexName ) );
270            }
271        }
272        # If something went wrong the process will fail when calling pickIndexIdentifierFromOption
273    }
274
275    private function rebuild() {
276        $oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
277            'current', $this->getIndexAliasName()
278        ) );
279        $this->oldIndex = $this->getConnection()->getIndex(
280            $this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
281        );
282        $this->indexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
283            'now', $this->getIndexAliasName()
284        ) );
285
286        $this->createIndex();
287        $this->indexData();
288        if ( $this->optimizeIndex ) {
289            $this->optimize();
290        }
291        $this->enableReplicas();
292        $this->getIndex()->refresh();
293        $this->validateAlias();
294        $this->updateVersions();
295        $this->deleteOldIndex();
296        $this->log( "Done.\n" );
297    }
298
299    private function canRecycle() {
300        global $wgCirrusSearchRecycleCompletionSuggesterIndex;
301        if ( !$wgCirrusSearchRecycleCompletionSuggesterIndex ) {
302            return false;
303        }
304
305        if ( $this->getOption( "recreate", false ) ) {
306            return false;
307        }
308
309        $oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
310            'current', $this->getIndexAliasName()
311        ) );
312        $oldIndex = $this->getConnection()->getIndex(
313            $this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
314        );
315        if ( !$oldIndex->exists() ) {
316            $this->error( 'Index does not exist yet cannot recycle.' );
317            return false;
318        }
319        $refresh = $oldIndex->getSettings()->getRefreshInterval();
320        if ( $refresh != '-1' ) {
321            $this->error( 'Refresh interval is not -1, cannot recycle.' );
322            return false;
323        }
324
325        $shards = $oldIndex->getSettings()->get( 'number_of_shards' );
326        // We check only the number of shards since it cannot be updated.
327        if ( $shards != $this->getShardCount() ) {
328            $this->error( 'Number of shards mismatch cannot recycle.' );
329            return false;
330        }
331
332        $mMaj = explode( '.', SuggesterMappingConfigBuilder::VERSION, 2 )[0];
333        $aMaj = explode( '.', SuggesterAnalysisConfigBuilder::VERSION, 2 )[0];
334
335        try {
336            $versionDoc = $this->getMetaStore()
337                ->versionStore()
338                ->find( $this->indexBaseName, $this->indexSuffix );
339        } catch ( \Elastica\Exception\NotFoundException $nfe ) {
340            $this->error( 'Index missing in mw_cirrus_metastore::version, cannot recycle.' );
341            return false;
342        }
343
344        if ( $versionDoc->analysis_maj != $aMaj ) {
345            $this->error( 'Analysis config version mismatch, cannot recycle.' );
346            return false;
347        }
348
349        if ( $versionDoc->mapping_maj != $mMaj ) {
350            $this->error( 'Mapping config version mismatch, cannot recycle.' );
351            return false;
352        }
353
354        $validator = new AnalyzersValidator( $oldIndex, $this->analysisConfig, $this );
355        $status = $validator->validate();
356        if ( !$status->isOK() ) {
357            $this->error( 'Analysis config differs, cannot recycle.' );
358            return false;
359        }
360
361        return true;
362    }
363
364    /**
365     * Recycle a suggester index:
366     * 1/ index data (delete docs if it already exists)
367     * 2/ expunge deleted docs
368     * 3/ refresh the reader
369     *    - so we can run a quick delete on remaining docs
370     *      (the docs that were actually deleted)
371     *    - drawbacks we load the FST from an un-optimized index
372     * 4/ delete old docs
373     * 5/ optimize
374     * 6/ refresh the reader
375     *
376     * Drawbacks: the FST will be read from disk twice in a short
377     * amount of time.
378     * This is a trade off between cluster operation and disk operation.
379     * Recreating the index may require less disk operations but causes
380     * the cluster to rebalance.
381     * This is certainly the best strategy for small indices (less than 100k docs)
382     * but needs to be carefully tested on bigger indices with high QPS.
383     */
384    private function recycle() {
385        $this->log( "Recycling index {$this->getIndex()->getName()}\n" );
386        $this->recycle = true;
387        $this->indexData();
388        // This is fragile... hopefully most of the docs will be deleted from the old segments
389        // and will result in a fast operation.
390        // New segments should not be affected.
391        // Unfortunately if a failure causes the process to stop
392        // the FST will maybe contains duplicates as it cannot (elastic 1.7)
393        // filter deleted docs. We will rely on output deduplication
394        // but this will certainly affect performances.
395
396        $this->expungeDeletes();
397        // Refresh the reader so we can scroll over remaining docs.
398        // At this point we may read the new un-optimized FST segments
399        // Old ones should be pretty small after expungeDeletes
400        $this->getIndex()->refresh();
401
402        $boolNot = new Elastica\Query\BoolQuery();
403        $boolNot->addMustNot(
404            new Elastica\Query\Term( [ "batch_id" => $this->builder->getBatchId() ] )
405        );
406        $bool = new Elastica\Query\BoolQuery();
407        $bool->addFilter( $boolNot );
408
409        $query = new Elastica\Query();
410        $query->setQuery( $bool );
411        $query->setSize( $this->indexChunkSize );
412        $query->setSource( false );
413        $query->setSort( [ '_doc' ] );
414        // Explicitly ask for accurate total_hits even-though we use a scroll request
415        $query->setTrackTotalHits( true );
416        $search = new \Elastica\Search( $this->getClient() );
417        $search->setQuery( $query );
418        $search->addIndex( $this->getIndex() );
419        $scroll = new \Elastica\Scroll( $search, '15m' );
420
421        $totalDocsToDump = -1;
422        $docsDumped = 0;
423
424        $this->log( "Deleting remaining docs from previous batch\n" );
425        foreach ( $scroll as $results ) {
426            if ( $totalDocsToDump === -1 ) {
427                $totalDocsToDump = $results->getTotalHits();
428                if ( $totalDocsToDump === 0 ) {
429                    break;
430                }
431                $docsDumped = 0;
432            }
433            $docIds = [];
434            foreach ( $results as $result ) {
435                $docsDumped++;
436                $docIds[] = $result->getId();
437            }
438            $this->outputProgress( $docsDumped, $totalDocsToDump );
439            if ( !$docIds ) {
440                continue;
441            }
442
443            MWElasticUtils::withRetry( $this->indexRetryAttempts,
444                function () use ( $docIds ) {
445                    $this->getIndex()->deleteByQuery( new Query\Ids( $docIds ) );
446                }
447            );
448        }
449        $this->log( "Done.\n" );
450        // Old docs should be deleted now we can optimize and flush
451        $this->optimize();
452
453        // @todo add support for changing the number of replicas
454        // if the setting was changed in cirrus config.
455        // Workaround is to change the settings directly on the cluster.
456
457        // Refresh the reader so it now uses the optimized FST,
458        // and actually free and delete old segments.
459        $this->getIndex()->refresh();
460    }
461
462    private function deleteOldIndex() {
463        if ( $this->oldIndex && $this->oldIndex->exists() ) {
464            $this->log( "Deleting " . $this->oldIndex->getName() . " ... " );
465            // @todo Utilize $this->oldIndex->delete(...) once Elastica library is updated
466            // to allow passing the master_timeout
467            $this->oldIndex->request(
468                '',
469                Request::DELETE,
470                [],
471                [ 'master_timeout' => $this->masterTimeout ]
472            );
473            $this->output( "ok.\n" );
474        }
475    }
476
477    /**
478     * Delete an index
479     * @param \Elastica\Index $index
480     */
481    private function deleteIndex( \Elastica\Index $index ) {
482        // @todo Utilize $this->oldIndex->delete(...) once Elastica library is updated
483        // to allow passing the master_timeout
484        $index->request(
485            '',
486            Request::DELETE,
487            [],
488            [ 'master_timeout' => $this->masterTimeout ]
489        );
490    }
491
492    private function optimize() {
493        $this->log( "Optimizing index..." );
494        $this->getIndex()->forcemerge( [ 'max_num_segments' => 1 ] );
495        $this->output( "ok.\n" );
496    }
497
498    private function expungeDeletes() {
499        $this->log( "Purging deleted docs..." );
500        $this->getIndex()->forcemerge( [ 'only_expunge_deletes' => 'true', 'flush' => 'false' ] );
501        $this->output( "ok.\n" );
502    }
503
504    private function indexData() {
505        // We build the suggestions by reading CONTENT and GENERAL indices.
506        // This does not support extra indices like FILES on commons.
507        $sourceIndexSuffixes = [ Connection::CONTENT_INDEX_SUFFIX, Connection::GENERAL_INDEX_SUFFIX ];
508
509        $query = new Query();
510        $query->setSource( [
511            'includes' => $this->builder->getRequiredFields()
512        ] );
513
514        $pageAndNs = new Elastica\Query\BoolQuery();
515        $pageAndNs->addShould( new Elastica\Query\Term( [ "namespace" => NS_MAIN ] ) );
516        $pageAndNs->addShould( new Elastica\Query\Term( [ "redirect.namespace" => NS_MAIN ] ) );
517        $bool = new Elastica\Query\BoolQuery();
518        $bool->addFilter( $pageAndNs );
519
520        $query->setQuery( $bool );
521        $query->setSort( [ '_doc' ] );
522        // Explicitly ask for accurate total_hits even-though we use a scroll request
523        $query->setTrackTotalHits( true );
524
525        foreach ( $sourceIndexSuffixes as $sourceIndexSuffix ) {
526            $sourceIndex = $this->getConnection()->getIndex( $this->indexBaseName, $sourceIndexSuffix );
527            $search = new \Elastica\Search( $this->getClient() );
528            $search->setQuery( $query );
529            $search->addIndex( $sourceIndex );
530            $query->setSize( $this->indexChunkSize );
531            $totalDocsToDump = -1;
532            $scroll = new \Elastica\Scroll( $search, '15m' );
533
534            $docsDumped = 0;
535            $destinationIndex = $this->getIndex();
536
537            foreach ( $scroll as $results ) {
538                if ( $totalDocsToDump === -1 ) {
539                    $totalDocsToDump = $results->getTotalHits();
540                    if ( $totalDocsToDump === 0 ) {
541                        $this->log( "No documents to index from $sourceIndexSuffix\n" );
542                        break;
543                    }
544                    $this->log( "Indexing $totalDocsToDump documents from $sourceIndexSuffix with " .
545                        "batchId: {$this->builder->getBatchId()}\n" );
546                }
547                $inputDocs = [];
548                foreach ( $results as $result ) {
549                    $docsDumped++;
550                    $inputDocs[] = [
551                        'id' => $result->getId(),
552                        'source' => $result->getSource()
553                    ];
554                }
555
556                $suggestDocs = $this->builder->build( $inputDocs );
557                if ( !$suggestDocs ) {
558                    continue;
559                }
560                $this->outputProgress( $docsDumped, $totalDocsToDump );
561                MWElasticUtils::withRetry( $this->indexRetryAttempts,
562                    static function () use ( $destinationIndex, $suggestDocs ) {
563                        $destinationIndex->addDocuments( $suggestDocs );
564                    }
565                );
566            }
567            $this->log( "Indexing from $sourceIndexSuffix index done.\n" );
568        }
569    }
570
571    public function validateAlias() {
572        // @todo utilize the following once Elastica is updated to support passing
573        // master_timeout. This is a copy of the Elastica\Index::addAlias() method
574        // $this->getIndex()->addAlias( $this->getIndexTypeName(), true );
575        $index = $this->getIndex();
576        $name = $this->getIndexAliasName();
577
578        $path = '_aliases';
579        $data = [ 'actions' => [] ];
580        $status = new Status( $index->getClient() );
581        foreach ( $status->getIndicesWithAlias( $name ) as $aliased ) {
582            $data['actions'][] = [ 'remove' => [ 'index' => $aliased->getName(), 'alias' => $name ] ];
583        }
584
585        $data['actions'][] = [ 'add' => [ 'index' => $index->getName(), 'alias' => $name ] ];
586
587        $index->getClient()->request(
588            $path, Request::POST, $data, [ 'master_timeout' => $this->masterTimeout ]
589        );
590    }
591
592    /**
593     * public because php 5.3 does not support accessing private
594     * methods in a closure.
595     * @param int $docsDumped
596     * @param int $limit
597     */
598    public function outputProgress( $docsDumped, $limit ) {
599        if ( $docsDumped <= 0 ) {
600            return;
601        }
602        $pctDone = (int)( ( $docsDumped / $limit ) * 100 );
603        if ( $this->lastProgressPrinted == $pctDone ) {
604            return;
605        }
606        $this->lastProgressPrinted = $pctDone;
607        if ( ( $pctDone % 2 ) == 0 ) {
608            $this->outputIndented( "    $pctDone% done...\n" );
609        }
610    }
611
612    public function log( $message, $channel = null ) {
613        $date = new \DateTime();
614        parent::output( $date->format( 'Y-m-d H:i:s' ) . " " . $message, $channel );
615    }
616
617    /**
618     * @param string $langCode
619     * @param array $availablePlugins
620     * @return AnalysisConfigBuilder
621     */
622    private function pickAnalyzer( $langCode, array $availablePlugins = [] ) {
623        $analysisConfigBuilder = new SuggesterAnalysisConfigBuilder( $langCode, $availablePlugins );
624        $this->outputIndented( 'Picking analyzer...' .
625                                $analysisConfigBuilder->getDefaultTextAnalyzerType( $langCode ) .
626                                "\n" );
627        return $analysisConfigBuilder;
628    }
629
630    private function createIndex() {
631        // This is "create only" for now.
632        if ( $this->getIndex()->exists() ) {
633            throw new RuntimeException( "Index already exists." );
634        }
635
636        $mappingConfigBuilder = new SuggesterMappingConfigBuilder();
637
638        // We create the index with 0 replicas, this is faster and will
639        // stress less nodes with 4 shards and 2 replicas we would
640        // stress 12 nodes (moreover with the optimize flag)
641        $settings = [
642            'number_of_shards' => $this->getShardCount(),
643            // hacky but we still use auto_expand_replicas
644            // for convenience on small install.
645            'auto_expand_replicas' => "0-0",
646            'refresh_interval' => -1,
647            'analysis' => $this->analysisConfig,
648            'routing.allocation.total_shards_per_node' => $this->getMaxShardsPerNode(),
649        ];
650
651        if ( $this->hasOption( 'allocationIncludeTag' ) ) {
652            $this->output( "Using routing.allocation.include.tag: " .
653                "{$this->getOption( 'allocationIncludeTag' )}, the index might be stuck in red " .
654                "if the cluster is not properly configured.\n" );
655            $settings['routing.allocation.include.tag'] = $this->getOption( 'allocationIncludeTag' );
656        }
657
658        if ( $this->hasOption( 'allocationExcludeTag' ) ) {
659            $this->output( "Using routing.allocation.exclude.tag: " .
660                "{$this->getOption( 'allocationExcludeTag' )}, the index might be stuck in red " .
661                "if the cluster is not properly configured.\n" );
662            $settings['routing.allocation.exclude.tag'] = $this->getOption( 'allocationExcludeTag' );
663        }
664
665        $args = [
666            'settings' => [ 'index' => $settings ],
667            'mappings' => $mappingConfigBuilder->buildConfig()
668        ];
669        // @todo utilize $this->getIndex()->create(...) once it supports setting
670        // the master_timeout parameter.
671        $this->getIndex()->request(
672            '',
673            Request::PUT,
674            $args,
675            [ 'master_timeout' => $this->masterTimeout ]
676        );
677
678        // Index create is async, we have to make sure that the index is ready
679        // before sending any docs to it.
680        $this->waitForGreen();
681    }
682
683    private function enableReplicas() {
684        $this->log( "Enabling replicas...\n" );
685        $args = [
686            'index' => [
687                'auto_expand_replicas' => $this->getReplicaCount(),
688            ],
689        ];
690
691        $path = $this->getIndex()->getName() . "/_settings";
692        $this->getIndex()->getClient()->request(
693            $path,
694            Request::PUT,
695            $args,
696            [ 'master_timeout' => $this->masterTimeout ]
697        );
698
699        // The previous call seems to be async, let's wait few sec
700        // otherwise replication won't have time to start.
701        sleep( 20 );
702
703        // Index will be yellow while replica shards are being allocated.
704        $this->waitForGreen( $this->getOption( 'replicationTimeout', 3600 ) );
705    }
706
707    private function waitForGreen( $timeout = 600 ) {
708        $this->log( "Waiting for the index to go green...\n" );
709        // Wait for the index to go green ( default 10 min)
710        if ( !$this->utils->waitForGreen( $this->getIndex()->getName(), $timeout ) ) {
711            $this->fatalError( "Failed to wait for green... please check config and " .
712                "delete the {$this->getIndex()->getName()} index if it was created." );
713        }
714    }
715
716    /**
717     * @return string Number of replicas this index should have. May be a range such as '0-2'
718     */
719    private function getReplicaCount() {
720        return $this->getConnection()->getSettings()->getReplicaCount( $this->indexSuffix );
721    }
722
723    private function getShardCount() {
724        return $this->getConnection()->getSettings()->getShardCount( $this->indexSuffix );
725    }
726
727    /**
728     * @return int Maximum number of shards that can be allocated on a single elasticsearch
729     *  node. -1 for unlimited.
730     */
731    private function getMaxShardsPerNode() {
732        return $this->getConnection()->getSettings()->getMaxShardsPerNode( $this->indexSuffix );
733    }
734
735    private function updateVersions() {
736        $this->log( "Updating tracking indexes..." );
737        $this->getMetaStore()
738            ->versionStore()
739            ->update( $this->indexBaseName, $this->indexSuffix );
740        $this->output( "ok.\n" );
741    }
742
743    /**
744     * @return \Elastica\Index being updated
745     */
746    public function getIndex() {
747        return $this->getConnection()->getIndex(
748            $this->indexBaseName, $this->indexSuffix, $this->indexIdentifier
749        );
750    }
751
752    /**
753     * @return Elastica\Client
754     */
755    protected function getClient() {
756        return $this->getConnection()->getClient();
757    }
758
759    /**
760     * @return string name of the index type being updated
761     */
762    protected function getIndexAliasName() {
763        return $this->getConnection()->getIndexName( $this->indexBaseName, $this->indexSuffix );
764    }
765}
766
767$maintClass = UpdateSuggesterIndex::class;
768require_once RUN_MAINTENANCE_IF_MAIN;