Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 370
0.00% covered (danger)
0.00%
0 / 26
CRAP
0.00% covered (danger)
0.00%
0 / 1
UpdateSuggesterIndex
0.00% covered (danger)
0.00%
0 / 363
0.00% covered (danger)
0.00%
0 / 26
4830
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 32
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 52
0.00% covered (danger)
0.00%
0 / 1
72
 workAroundBrokenMessageCache
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 checkAndDeleteBrokenIndices
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
30
 rebuild
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
6
 canRecycle
0.00% covered (danger)
0.00%
0 / 42
0.00% covered (danger)
0.00%
0 / 1
110
 recycle
0.00% covered (danger)
0.00%
0 / 45
0.00% covered (danger)
0.00%
0 / 1
42
 deleteOldIndex
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 deleteIndex
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 optimize
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 expungeDeletes
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 indexData
0.00% covered (danger)
0.00%
0 / 48
0.00% covered (danger)
0.00%
0 / 1
56
 validateAlias
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 outputProgress
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
20
 log
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 pickAnalyzer
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 createIndex
0.00% covered (danger)
0.00%
0 / 31
0.00% covered (danger)
0.00%
0 / 1
20
 enableReplicas
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
2
 waitForGreen
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 getReplicaCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getShardCount
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getMaxShardsPerNode
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 updateVersions
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getIndex
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 getClient
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getIndexAliasName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace CirrusSearch\Maintenance;
4
5use CirrusSearch\BuildDocument\Completion\SuggestBuilder;
6use CirrusSearch\Connection;
7use CirrusSearch\ElasticaErrorHandler;
8use CirrusSearch\Maintenance\Validators\AnalyzersValidator;
9use CirrusSearch\SearchConfig;
10use Elastica;
11use Elastica\Index;
12use Elastica\Query;
13use Elastica\Request;
14use Elastica\Status;
15use MediaWiki\Extension\Elastica\MWElasticUtils;
16
17/**
18 * Update the search configuration on the search backend for the title
19 * suggest index.
20 *
21 * This program is free software; you can redistribute it and/or modify
22 * it under the terms of the GNU General Public License as published by
23 * the Free Software Foundation; either version 2 of the License, or
24 * (at your option) any later version.
25 *
26 * This program is distributed in the hope that it will be useful,
27 * but WITHOUT ANY WARRANTY; without even the implied warranty of
28 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * GNU General Public License for more details.
30 *
31 * You should have received a copy of the GNU General Public License along
32 * with this program; if not, write to the Free Software Foundation, Inc.,
33 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
34 * http://www.gnu.org/copyleft/gpl.html
35 */
36
37$IP = getenv( 'MW_INSTALL_PATH' );
38if ( $IP === false ) {
39    $IP = __DIR__ . '/../../..';
40}
41require_once "$IP/maintenance/Maintenance.php";
42require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
43
44class UpdateSuggesterIndex extends Maintenance {
45    /**
46     * @var string language code we're building for
47     */
48    private $langCode;
49
50    /**
51     * @var int
52     */
53    private $indexChunkSize;
54
55    /**
56     * @var int
57     */
58    private $indexRetryAttempts;
59
60    /**
61     * @var string
62     */
63    private $indexSuffix;
64
65    /**
66     * @var string
67     */
68    private $indexBaseName;
69
70    /**
71     * @var string
72     */
73    private $indexIdentifier;
74
75    /**
76     * @var Index old suggester index that will be deleted at the end of the process
77     */
78    private $oldIndex;
79
80    /**
81     * @var int
82     */
83    private $lastProgressPrinted;
84
85    /**
86     * @var bool optimize the index when done.
87     */
88    private $optimizeIndex;
89
90    /**
91     * @var array list of available plugins
92     */
93    private $availablePlugins;
94
95    /**
96     * @var string
97     */
98    private $masterTimeout;
99
100    /**
101     * @var ConfigUtils
102     */
103    private $utils;
104
105    /**
106     * @todo: public (used in closure)
107     * @var SuggestBuilder
108     */
109    public $builder;
110
111    /**
112     * @var array
113     */
114    private $analysisConfig;
115
116    /**
117     * @var bool
118     */
119    private $recycle = false;
120
121    /**
122     * @var string[]
123     */
124    private $bannedPlugins;
125
126    public function __construct() {
127        parent::__construct();
128        $this->addDescription( "Create a new suggester index. Always operates on a single cluster." );
129        $this->addOption( 'baseName', 'What basename to use for all indexes, ' .
130            'defaults to wiki id', false, true );
131        $this->addOption( 'indexChunkSize', 'Documents per shard to index in a batch.   ' .
132            'Note when changing the number of shards that the old shard size is used, not the new ' .
133            'one.  If you see many errors submitting documents in bulk but the automatic retry as ' .
134            'singles works then lower this number.  Defaults to 500.', false, true );
135        $this->addOption( 'indexRetryAttempts', 'Number of times to back off and retry ' .
136            'per failure.  Note that failures are not common but if Elasticsearch is in the process ' .
137            'of moving a shard this can time out.  This will retry the attempt after some backoff ' .
138            'rather than failing the whole reindex process.  Defaults to 5.', false, true );
139        $this->addOption( 'optimize',
140            'Optimize the index to 1 segment. Defaults to false.', false, false );
141        $this->addOption( 'scoringMethod',
142            'The scoring method to use when computing suggestion weights. ' .
143            'Defaults to $wgCirrusSearchCompletionDefaultScore or quality if unset.', false, true );
144        $this->addOption( 'masterTimeout',
145            'The amount of time to wait for the master to respond to mapping ' .
146            'updates before failing. Defaults to $wgCirrusSearchMasterTimeout.', false, true );
147        $this->addOption( 'replicationTimeout',
148            'The amount of time (seconds) to wait for the replica shards to initialize. ' .
149            'Defaults to 3600 seconds.', false, true );
150        $this->addOption( 'allocationIncludeTag',
151            'Set index.routing.allocation.include.tag on the created index. Useful if you want to ' .
152            'force the suggester index not to be allocated on a specific set of nodes.',
153            false, true );
154        $this->addOption( 'allocationExcludeTag',
155            'Set index.routing.allocation.exclude.tag on the created index. Useful if you want ' .
156            'to force the suggester index not to be allocated on a specific set of nodes.',
157            false, true );
158        $this->addOption( 'recreate', "Force the creation of a new index." );
159    }
160
161    public function execute() {
162        global $wgLanguageCode,
163            $wgCirrusSearchBannedPlugins,
164            $wgCirrusSearchMasterTimeout;
165
166        $this->disablePoolCountersAndLogging();
167        $this->workAroundBrokenMessageCache();
168        $this->masterTimeout = $this->getOption( 'masterTimeout', $wgCirrusSearchMasterTimeout );
169        $this->indexSuffix = Connection::TITLE_SUGGEST_INDEX_SUFFIX;
170
171        $useCompletion = $this->getSearchConfig()->get( 'CirrusSearchUseCompletionSuggester' );
172
173        if ( $useCompletion !== 'build' && $useCompletion !== 'yes' && $useCompletion !== true ) {
174            $this->fatalError( "Completion suggester disabled, quitting..." );
175        }
176
177        // Check that all shards and replicas settings are set
178        try {
179            $this->getShardCount();
180            $this->getReplicaCount();
181            $this->getMaxShardsPerNode();
182        } catch ( \Exception $e ) {
183            $this->fatalError(
184                "Failed to get shard count and replica count information: {$e->getMessage()}"
185            );
186        }
187
188        $this->indexBaseName = $this->getOption(
189            'baseName', $this->getSearchConfig()->get( SearchConfig::INDEX_BASE_NAME )
190        );
191        $this->indexChunkSize = $this->getOption( 'indexChunkSize', 500 );
192        $this->indexRetryAttempts = $this->getOption( 'reindexRetryAttempts', 5 );
193
194        $this->optimizeIndex = $this->getOption( 'optimize', false );
195
196        $this->utils = new ConfigUtils( $this->getClient(), $this );
197
198        $this->langCode = $wgLanguageCode;
199        $this->bannedPlugins = $wgCirrusSearchBannedPlugins;
200
201        $this->availablePlugins = $this->unwrap( $this->utils->scanAvailablePlugins( $this->bannedPlugins ) );
202        $this->analysisConfig = $this->pickAnalyzer( $this->langCode, $this->availablePlugins )
203            ->buildConfig();
204
205        $this->unwrap( $this->utils->checkElasticsearchVersion() );
206
207        try {
208            $this->requireCirrusReady();
209            $this->builder = SuggestBuilder::create( $this->getConnection(),
210                $this->getOption( 'scoringMethod' ), $this->indexBaseName );
211            # check for broken indices and delete them
212            $this->checkAndDeleteBrokenIndices();
213
214            if ( !$this->canRecycle() ) {
215                $this->rebuild();
216            } else {
217                $this->recycle();
218            }
219        } catch ( \Elastica\Exception\Connection\HttpException $e ) {
220            $message = $e->getMessage();
221            $this->log( "\nUnexpected Elasticsearch failure.\n" );
222            $this->fatalError( "Http error communicating with Elasticsearch:  $message.\n" );
223        } catch ( \Elastica\Exception\ExceptionInterface $e ) {
224            $type = get_class( $e );
225            $message = ElasticaErrorHandler::extractMessage( $e );
226            $trace = $e->getTraceAsString();
227            $this->log( "\nUnexpected Elasticsearch failure.\n" );
228            $this->fatalError( "Elasticsearch failed in an unexpected way.  " .
229                "This is always a bug in CirrusSearch.\n" .
230                "Error type: $type\n" .
231                "Message: $message\n" .
232                "Trace:\n" . $trace );
233        }
234
235        return true;
236    }
237
238    private function workAroundBrokenMessageCache() {
239        // Under some configurations (T288233) the i18n cache fails to
240        // initialize. After failing, at least in this particular deployment,
241        // it will fallback to local CDB files and ignore on-wiki overrides
242        // which is acceptable for this script.
243        try {
244            wfMessage( 'ok' )->text();
245        } catch ( \LogicException $e ) {
246            // The first failure should trigger the fallback mode, this second
247            // try should work (and not throw the LogicException deep in the updates).
248            wfMessage( 'ok' )->text();
249        }
250    }
251
252    /**
253     * Check for duplicate indices that may have been created
254     * by a previous update that failed.
255     */
256    private function checkAndDeleteBrokenIndices() {
257        $indices = $this->unwrap( $this->utils->getAllIndicesByType( $this->getIndexAliasName() ) );
258        if ( count( $indices ) < 2 ) {
259            return;
260        }
261        $indexByName = [];
262        foreach ( $indices as $indexName ) {
263            $status = $this->utils->isIndexLive( $indexName );
264            if ( !$status->isGood() ) {
265                $this->log( (string)$status );
266            } elseif ( $status->getValue() === false ) {
267                $this->log( "Deleting broken index {$indexName}\n" );
268                $this->deleteIndex( $this->getConnection()->getIndex( $indexName ) );
269            }
270        }
271        # If something went wrong the process will fail when calling pickIndexIdentifierFromOption
272    }
273
274    private function rebuild() {
275        $oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
276            'current', $this->getIndexAliasName()
277        ) );
278        $this->oldIndex = $this->getConnection()->getIndex(
279            $this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
280        );
281        $this->indexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
282            'now', $this->getIndexAliasName()
283        ) );
284
285        $this->createIndex();
286        $this->indexData();
287        if ( $this->optimizeIndex ) {
288            $this->optimize();
289        }
290        $this->enableReplicas();
291        $this->getIndex()->refresh();
292        $this->validateAlias();
293        $this->updateVersions();
294        $this->deleteOldIndex();
295        $this->log( "Done.\n" );
296    }
297
298    private function canRecycle() {
299        global $wgCirrusSearchRecycleCompletionSuggesterIndex;
300        if ( !$wgCirrusSearchRecycleCompletionSuggesterIndex ) {
301            return false;
302        }
303
304        if ( $this->getOption( "recreate", false ) ) {
305            return false;
306        }
307
308        $oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
309            'current', $this->getIndexAliasName()
310        ) );
311        $oldIndex = $this->getConnection()->getIndex(
312            $this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
313        );
314        if ( !$oldIndex->exists() ) {
315            $this->error( 'Index does not exist yet cannot recycle.' );
316            return false;
317        }
318        $refresh = $oldIndex->getSettings()->getRefreshInterval();
319        if ( $refresh != '-1' ) {
320            $this->error( 'Refresh interval is not -1, cannot recycle.' );
321            return false;
322        }
323
324        $shards = $oldIndex->getSettings()->get( 'number_of_shards' );
325        // We check only the number of shards since it cannot be updated.
326        if ( $shards != $this->getShardCount() ) {
327            $this->error( 'Number of shards mismatch cannot recycle.' );
328            return false;
329        }
330
331        $mMaj = explode( '.', SuggesterMappingConfigBuilder::VERSION, 2 )[0];
332        $aMaj = explode( '.', SuggesterAnalysisConfigBuilder::VERSION, 2 )[0];
333
334        try {
335            $versionDoc = $this->getMetaStore()
336                ->versionStore()
337                ->find( $this->indexBaseName, $this->indexSuffix );
338        } catch ( \Elastica\Exception\NotFoundException $nfe ) {
339            $this->error( 'Index missing in mw_cirrus_metastore::version, cannot recycle.' );
340            return false;
341        }
342
343        if ( $versionDoc->analysis_maj != $aMaj ) {
344            $this->error( 'Analysis config version mismatch, cannot recycle.' );
345            return false;
346        }
347
348        if ( $versionDoc->mapping_maj != $mMaj ) {
349            $this->error( 'Mapping config version mismatch, cannot recycle.' );
350            return false;
351        }
352
353        $validator = new AnalyzersValidator( $oldIndex, $this->analysisConfig, $this );
354        $status = $validator->validate();
355        if ( !$status->isOK() ) {
356            $this->error( 'Analysis config differs, cannot recycle.' );
357            return false;
358        }
359
360        return true;
361    }
362
363    /**
364     * Recycle a suggester index:
365     * 1/ index data (delete docs if it already exists)
366     * 2/ expunge deleted docs
367     * 3/ refresh the reader
368     *    - so we can run a quick delete on remaining docs
369     *      (the docs that were actually deleted)
370     *    - drawbacks we load the FST from an un-optimized index
371     * 4/ delete old docs
372     * 5/ optimize
373     * 6/ refresh the reader
374     *
375     * Drawbacks: the FST will be read from disk twice in a short
376     * amount of time.
377     * This is a trade off between cluster operation and disk operation.
378     * Recreating the index may require less disk operations but causes
379     * the cluster to rebalance.
380     * This is certainly the best strategy for small indices (less than 100k docs)
381     * but needs to be carefully tested on bigger indices with high QPS.
382     */
383    private function recycle() {
384        $this->log( "Recycling index {$this->getIndex()->getName()}\n" );
385        $this->recycle = true;
386        $this->indexData();
387        // This is fragile... hopefully most of the docs will be deleted from the old segments
388        // and will result in a fast operation.
389        // New segments should not be affected.
390        // Unfortunately if a failure causes the process to stop
391        // the FST will maybe contains duplicates as it cannot (elastic 1.7)
392        // filter deleted docs. We will rely on output deduplication
393        // but this will certainly affect performances.
394
395        $this->expungeDeletes();
396        // Refresh the reader so we can scroll over remaining docs.
397        // At this point we may read the new un-optimized FST segments
398        // Old ones should be pretty small after expungeDeletes
399        $this->getIndex()->refresh();
400
401        $boolNot = new Elastica\Query\BoolQuery();
402        $boolNot->addMustNot(
403            new Elastica\Query\Term( [ "batch_id" => $this->builder->getBatchId() ] )
404        );
405        $bool = new Elastica\Query\BoolQuery();
406        $bool->addFilter( $boolNot );
407
408        $query = new Elastica\Query();
409        $query->setQuery( $bool );
410        $query->setSize( $this->indexChunkSize );
411        $query->setSource( false );
412        $query->setSort( [ '_doc' ] );
413        // Explicitly ask for accurate total_hits even-though we use a scroll request
414        $query->setTrackTotalHits( true );
415        $search = new \Elastica\Search( $this->getClient() );
416        $search->setQuery( $query );
417        $search->addIndex( $this->getIndex() );
418        $scroll = new \Elastica\Scroll( $search, '15m' );
419
420        $totalDocsToDump = -1;
421        $docsDumped = 0;
422
423        $this->log( "Deleting remaining docs from previous batch\n" );
424        foreach ( $scroll as $results ) {
425            if ( $totalDocsToDump === -1 ) {
426                $totalDocsToDump = $results->getTotalHits();
427                if ( $totalDocsToDump === 0 ) {
428                    break;
429                }
430                $docsDumped = 0;
431            }
432            $docIds = [];
433            foreach ( $results as $result ) {
434                $docsDumped++;
435                $docIds[] = $result->getId();
436            }
437            $this->outputProgress( $docsDumped, $totalDocsToDump );
438            if ( empty( $docIds ) ) {
439                continue;
440            }
441
442            MWElasticUtils::withRetry( $this->indexRetryAttempts,
443                function () use ( $docIds ) {
444                    $this->getIndex()->deleteByQuery( new Query\Ids( $docIds ) );
445                }
446            );
447        }
448        $this->log( "Done.\n" );
449        // Old docs should be deleted now we can optimize and flush
450        $this->optimize();
451
452        // @todo add support for changing the number of replicas
453        // if the setting was changed in cirrus config.
454        // Workaround is to change the settings directly on the cluster.
455
456        // Refresh the reader so it now uses the optimized FST,
457        // and actually free and delete old segments.
458        $this->getIndex()->refresh();
459    }
460
461    private function deleteOldIndex() {
462        if ( $this->oldIndex && $this->oldIndex->exists() ) {
463            $this->log( "Deleting " . $this->oldIndex->getName() . " ... " );
464            // @todo Utilize $this->oldIndex->delete(...) once Elastica library is updated
465            // to allow passing the master_timeout
466            $this->oldIndex->request(
467                '',
468                Request::DELETE,
469                [],
470                [ 'master_timeout' => $this->masterTimeout ]
471            );
472            $this->output( "ok.\n" );
473        }
474    }
475
476    /**
477     * Delete an index
478     * @param \Elastica\Index $index
479     */
480    private function deleteIndex( \Elastica\Index $index ) {
481        // @todo Utilize $this->oldIndex->delete(...) once Elastica library is updated
482        // to allow passing the master_timeout
483        $index->request(
484            '',
485            Request::DELETE,
486            [],
487            [ 'master_timeout' => $this->masterTimeout ]
488        );
489    }
490
491    private function optimize() {
492        $this->log( "Optimizing index..." );
493        $this->getIndex()->forcemerge( [ 'max_num_segments' => 1 ] );
494        $this->output( "ok.\n" );
495    }
496
497    private function expungeDeletes() {
498        $this->log( "Purging deleted docs..." );
499        $this->getIndex()->forcemerge( [ 'only_expunge_deletes' => 'true', 'flush' => 'false' ] );
500        $this->output( "ok.\n" );
501    }
502
503    private function indexData() {
504        // We build the suggestions by reading CONTENT and GENERAL indices.
505        // This does not support extra indices like FILES on commons.
506        $sourceIndexSuffixes = [ Connection::CONTENT_INDEX_SUFFIX, Connection::GENERAL_INDEX_SUFFIX ];
507
508        $query = new Query();
509        $query->setSource( [
510            'includes' => $this->builder->getRequiredFields()
511        ] );
512
513        $pageAndNs = new Elastica\Query\BoolQuery();
514        $pageAndNs->addShould( new Elastica\Query\Term( [ "namespace" => NS_MAIN ] ) );
515        $pageAndNs->addShould( new Elastica\Query\Term( [ "redirect.namespace" => NS_MAIN ] ) );
516        $bool = new Elastica\Query\BoolQuery();
517        $bool->addFilter( $pageAndNs );
518
519        $query->setQuery( $bool );
520        $query->setSort( [ '_doc' ] );
521        // Explicitly ask for accurate total_hits even-though we use a scroll request
522        $query->setTrackTotalHits( true );
523
524        foreach ( $sourceIndexSuffixes as $sourceIndexSuffix ) {
525            $sourceIndex = $this->getConnection()->getIndex( $this->indexBaseName, $sourceIndexSuffix );
526            $search = new \Elastica\Search( $this->getClient() );
527            $search->setQuery( $query );
528            $search->addIndex( $sourceIndex );
529            $query->setSize( $this->indexChunkSize );
530            $totalDocsToDump = -1;
531            $scroll = new \Elastica\Scroll( $search, '15m' );
532
533            $docsDumped = 0;
534            $destinationIndex = $this->getIndex();
535
536            foreach ( $scroll as $results ) {
537                if ( $totalDocsToDump === -1 ) {
538                    $totalDocsToDump = $results->getTotalHits();
539                    if ( $totalDocsToDump === 0 ) {
540                        $this->log( "No documents to index from $sourceIndexSuffix\n" );
541                        break;
542                    }
543                    $this->log( "Indexing $totalDocsToDump documents from $sourceIndexSuffix with " .
544                        "batchId: {$this->builder->getBatchId()}\n" );
545                }
546                $inputDocs = [];
547                foreach ( $results as $result ) {
548                    $docsDumped++;
549                    $inputDocs[] = [
550                        'id' => $result->getId(),
551                        'source' => $result->getSource()
552                    ];
553                }
554
555                $suggestDocs = $this->builder->build( $inputDocs );
556                if ( empty( $suggestDocs ) ) {
557                    continue;
558                }
559                $this->outputProgress( $docsDumped, $totalDocsToDump );
560                MWElasticUtils::withRetry( $this->indexRetryAttempts,
561                    static function () use ( $destinationIndex, $suggestDocs ) {
562                        $destinationIndex->addDocuments( $suggestDocs );
563                    }
564                );
565            }
566            $this->log( "Indexing from $sourceIndexSuffix index done.\n" );
567        }
568    }
569
570    public function validateAlias() {
571        // @todo utilize the following once Elastica is updated to support passing
572        // master_timeout. This is a copy of the Elastica\Index::addAlias() method
573        // $this->getIndex()->addAlias( $this->getIndexTypeName(), true );
574        $index = $this->getIndex();
575        $name = $this->getIndexAliasName();
576
577        $path = '_aliases';
578        $data = [ 'actions' => [] ];
579        $status = new Status( $index->getClient() );
580        foreach ( $status->getIndicesWithAlias( $name ) as $aliased ) {
581            $data['actions'][] = [ 'remove' => [ 'index' => $aliased->getName(), 'alias' => $name ] ];
582        }
583
584        $data['actions'][] = [ 'add' => [ 'index' => $index->getName(), 'alias' => $name ] ];
585
586        $index->getClient()->request(
587            $path, Request::POST, $data, [ 'master_timeout' => $this->masterTimeout ]
588        );
589    }
590
591    /**
592     * public because php 5.3 does not support accessing private
593     * methods in a closure.
594     * @param int $docsDumped
595     * @param int $limit
596     */
597    public function outputProgress( $docsDumped, $limit ) {
598        if ( $docsDumped <= 0 ) {
599            return;
600        }
601        $pctDone = (int)( ( $docsDumped / $limit ) * 100 );
602        if ( $this->lastProgressPrinted == $pctDone ) {
603            return;
604        }
605        $this->lastProgressPrinted = $pctDone;
606        if ( ( $pctDone % 2 ) == 0 ) {
607            $this->outputIndented( "    $pctDone% done...\n" );
608        }
609    }
610
611    public function log( $message, $channel = null ) {
612        $date = new \DateTime();
613        parent::output( $date->format( 'Y-m-d H:i:s' ) . " " . $message, $channel );
614    }
615
616    /**
617     * @param string $langCode
618     * @param array $availablePlugins
619     * @return AnalysisConfigBuilder
620     */
621    private function pickAnalyzer( $langCode, array $availablePlugins = [] ) {
622        $analysisConfigBuilder = new SuggesterAnalysisConfigBuilder( $langCode, $availablePlugins );
623        $this->outputIndented( 'Picking analyzer...' .
624                                $analysisConfigBuilder->getDefaultTextAnalyzerType( $langCode ) .
625                                "\n" );
626        return $analysisConfigBuilder;
627    }
628
629    private function createIndex() {
630        // This is "create only" for now.
631        if ( $this->getIndex()->exists() ) {
632            throw new \Exception( "Index already exists." );
633        }
634
635        $mappingConfigBuilder = new SuggesterMappingConfigBuilder();
636
637        // We create the index with 0 replicas, this is faster and will
638        // stress less nodes with 4 shards and 2 replicas we would
639        // stress 12 nodes (moreover with the optimize flag)
640        $settings = [
641            'number_of_shards' => $this->getShardCount(),
642            // hacky but we still use auto_expand_replicas
643            // for convenience on small install.
644            'auto_expand_replicas' => "0-0",
645            'refresh_interval' => -1,
646            'analysis' => $this->analysisConfig,
647            'routing.allocation.total_shards_per_node' => $this->getMaxShardsPerNode(),
648        ];
649
650        if ( $this->hasOption( 'allocationIncludeTag' ) ) {
651            $this->output( "Using routing.allocation.include.tag: " .
652                "{$this->getOption( 'allocationIncludeTag' )}, the index might be stuck in red " .
653                "if the cluster is not properly configured.\n" );
654            $settings['routing.allocation.include.tag'] = $this->getOption( 'allocationIncludeTag' );
655        }
656
657        if ( $this->hasOption( 'allocationExcludeTag' ) ) {
658            $this->output( "Using routing.allocation.exclude.tag: " .
659                "{$this->getOption( 'allocationExcludeTag' )}, the index might be stuck in red " .
660                "if the cluster is not properly configured.\n" );
661            $settings['routing.allocation.exclude.tag'] = $this->getOption( 'allocationExcludeTag' );
662        }
663
664        $args = [
665            'settings' => [ 'index' => $settings ],
666            'mappings' => $mappingConfigBuilder->buildConfig()
667        ];
668        // @todo utilize $this->getIndex()->create(...) once it supports setting
669        // the master_timeout parameter.
670        $this->getIndex()->request(
671            '',
672            Request::PUT,
673            $args,
674            [ 'master_timeout' => $this->masterTimeout ]
675        );
676
677        // Index create is async, we have to make sure that the index is ready
678        // before sending any docs to it.
679        $this->waitForGreen();
680    }
681
682    private function enableReplicas() {
683        $this->log( "Enabling replicas...\n" );
684        $args = [
685            'index' => [
686                'auto_expand_replicas' => $this->getReplicaCount(),
687            ],
688        ];
689
690        $path = $this->getIndex()->getName() . "/_settings";
691        $this->getIndex()->getClient()->request(
692            $path,
693            Request::PUT,
694            $args,
695            [ 'master_timeout' => $this->masterTimeout ]
696        );
697
698        // The previous call seems to be async, let's wait few sec
699        // otherwise replication won't have time to start.
700        sleep( 20 );
701
702        // Index will be yellow while replica shards are being allocated.
703        $this->waitForGreen( $this->getOption( 'replicationTimeout', 3600 ) );
704    }
705
706    private function waitForGreen( $timeout = 600 ) {
707        $this->log( "Waiting for the index to go green...\n" );
708        // Wait for the index to go green ( default 10 min)
709        if ( !$this->utils->waitForGreen( $this->getIndex()->getName(), $timeout ) ) {
710            $this->fatalError( "Failed to wait for green... please check config and " .
711                "delete the {$this->getIndex()->getName()} index if it was created." );
712        }
713    }
714
715    /**
716     * @return string Number of replicas this index should have. May be a range such as '0-2'
717     */
718    private function getReplicaCount() {
719        return $this->getConnection()->getSettings()->getReplicaCount( $this->indexSuffix );
720    }
721
722    private function getShardCount() {
723        return $this->getConnection()->getSettings()->getShardCount( $this->indexSuffix );
724    }
725
726    /**
727     * @return int Maximum number of shards that can be allocated on a single elasticsearch
728     *  node. -1 for unlimited.
729     */
730    private function getMaxShardsPerNode() {
731        return $this->getConnection()->getSettings()->getMaxShardsPerNode( $this->indexSuffix );
732    }
733
734    private function updateVersions() {
735        $this->log( "Updating tracking indexes..." );
736        $this->getMetaStore()
737            ->versionStore()
738            ->update( $this->indexBaseName, $this->indexSuffix );
739        $this->output( "ok.\n" );
740    }
741
742    /**
743     * @return \Elastica\Index being updated
744     */
745    public function getIndex() {
746        return $this->getConnection()->getIndex(
747            $this->indexBaseName, $this->indexSuffix, $this->indexIdentifier
748        );
749    }
750
751    /**
752     * @return Elastica\Client
753     */
754    protected function getClient() {
755        return $this->getConnection()->getClient();
756    }
757
758    /**
759     * @return string name of the index type being updated
760     */
761    protected function getIndexAliasName() {
762        return $this->getConnection()->getIndexName( $this->indexBaseName, $this->indexSuffix );
763    }
764}
765
766$maintClass = UpdateSuggesterIndex::class;
767require_once RUN_MAINTENANCE_IF_MAIN;