Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 370 |
|
0.00% |
0 / 26 |
CRAP | |
0.00% |
0 / 1 |
UpdateSuggesterIndex | |
0.00% |
0 / 363 |
|
0.00% |
0 / 26 |
4830 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 32 |
|
0.00% |
0 / 1 |
2 | |||
execute | |
0.00% |
0 / 52 |
|
0.00% |
0 / 1 |
72 | |||
workAroundBrokenMessageCache | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
checkAndDeleteBrokenIndices | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
30 | |||
rebuild | |
0.00% |
0 / 19 |
|
0.00% |
0 / 1 |
6 | |||
canRecycle | |
0.00% |
0 / 42 |
|
0.00% |
0 / 1 |
110 | |||
recycle | |
0.00% |
0 / 45 |
|
0.00% |
0 / 1 |
42 | |||
deleteOldIndex | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 | |||
deleteIndex | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
optimize | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
expungeDeletes | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
indexData | |
0.00% |
0 / 48 |
|
0.00% |
0 / 1 |
56 | |||
validateAlias | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
outputProgress | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
20 | |||
log | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
pickAnalyzer | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
createIndex | |
0.00% |
0 / 31 |
|
0.00% |
0 / 1 |
20 | |||
enableReplicas | |
0.00% |
0 / 15 |
|
0.00% |
0 / 1 |
2 | |||
waitForGreen | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
getReplicaCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getShardCount | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getMaxShardsPerNode | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
updateVersions | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
getIndex | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
getClient | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getIndexAliasName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Maintenance; |
4 | |
5 | use CirrusSearch\BuildDocument\Completion\SuggestBuilder; |
6 | use CirrusSearch\Connection; |
7 | use CirrusSearch\ElasticaErrorHandler; |
8 | use CirrusSearch\Maintenance\Validators\AnalyzersValidator; |
9 | use CirrusSearch\SearchConfig; |
10 | use Elastica; |
11 | use Elastica\Index; |
12 | use Elastica\Query; |
13 | use Elastica\Request; |
14 | use Elastica\Status; |
15 | use MediaWiki\Extension\Elastica\MWElasticUtils; |
16 | |
17 | /** |
18 | * Update the search configuration on the search backend for the title |
19 | * suggest index. |
20 | * |
21 | * This program is free software; you can redistribute it and/or modify |
22 | * it under the terms of the GNU General Public License as published by |
23 | * the Free Software Foundation; either version 2 of the License, or |
24 | * (at your option) any later version. |
25 | * |
26 | * This program is distributed in the hope that it will be useful, |
27 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
28 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
29 | * GNU General Public License for more details. |
30 | * |
31 | * You should have received a copy of the GNU General Public License along |
32 | * with this program; if not, write to the Free Software Foundation, Inc., |
33 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
34 | * http://www.gnu.org/copyleft/gpl.html |
35 | */ |
36 | |
// Find the MediaWiki core installation. MW_INSTALL_PATH wins when it is set
// (any value other than "unset", even an empty string); otherwise fall back
// to three directories above this script.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = ( $IP !== false ) ? $IP : __DIR__ . '/../../..';
require_once "$IP/maintenance/Maintenance.php";
// The extension-level Maintenance base class (presumably
// CirrusSearch\Maintenance\Maintenance, which the class below extends).
require_once __DIR__ . '/../includes/Maintenance/Maintenance.php';
43 | |
class UpdateSuggesterIndex extends Maintenance {
	/**
	 * @var string language code we're building for
	 */
	private $langCode;

	/**
	 * @var int number of documents requested per scroll page (one bulk batch per page)
	 */
	private $indexChunkSize;

	/**
	 * @var int number of times a failed write is retried before giving up
	 */
	private $indexRetryAttempts;

	/**
	 * @var string suffix of the index being built (Connection::TITLE_SUGGEST_INDEX_SUFFIX)
	 */
	private $indexSuffix;

	/**
	 * @var string base name used for all indexes (--baseName option or configured default)
	 */
	private $indexBaseName;

	/**
	 * @var string|null identifier of the index being built; only assigned by rebuild()
	 */
	private $indexIdentifier;

	/**
	 * @var Index old suggester index that will be deleted at the end of the process
	 */
	private $oldIndex;

	/**
	 * @var int last percentage value seen by outputProgress()
	 */
	private $lastProgressPrinted;

	/**
	 * @var bool optimize the index when done.
	 */
	private $optimizeIndex;

	/**
	 * @var array list of available plugins
	 */
	private $availablePlugins;

	/**
	 * @var string master_timeout passed along with index/alias/settings requests
	 */
	private $masterTimeout;

	/**
	 * @var ConfigUtils
	 */
	private $utils;

	/**
	 * @todo: public for backward compatibility only; nothing in this class needs it public
	 * @var SuggestBuilder
	 */
	public $builder;

	/**
	 * @var array analysis settings produced by pickAnalyzer()->buildConfig()
	 */
	private $analysisConfig;

	/**
	 * @var bool set to true when recycle() starts; not read elsewhere in this class
	 */
	private $recycle = false;

	/**
	 * @var string[] plugins that must not be used ($wgCirrusSearchBannedPlugins)
	 */
	private $bannedPlugins;

	public function __construct() {
		parent::__construct();
		$this->addDescription( "Create a new suggester index. Always operates on a single cluster." );
		$this->addOption( 'baseName', 'What basename to use for all indexes, ' .
			'defaults to wiki id', false, true );
		$this->addOption( 'indexChunkSize', 'Documents per shard to index in a batch. ' .
			'Note when changing the number of shards that the old shard size is used, not the new ' .
			'one. If you see many errors submitting documents in bulk but the automatic retry as ' .
			'singles works then lower this number. Defaults to 500.', false, true );
		$this->addOption( 'indexRetryAttempts', 'Number of times to back off and retry ' .
			'per failure. Note that failures are not common but if Elasticsearch is in the process ' .
			'of moving a shard this can time out. This will retry the attempt after some backoff ' .
			'rather than failing the whole reindex process. Defaults to 5.', false, true );
		$this->addOption( 'optimize',
			'Optimize the index to 1 segment. Defaults to false.', false, false );
		$this->addOption( 'scoringMethod',
			'The scoring method to use when computing suggestion weights. ' .
			'Defaults to $wgCirrusSearchCompletionDefaultScore or quality if unset.', false, true );
		$this->addOption( 'masterTimeout',
			'The amount of time to wait for the master to respond to mapping ' .
			'updates before failing. Defaults to $wgCirrusSearchMasterTimeout.', false, true );
		$this->addOption( 'replicationTimeout',
			'The amount of time (seconds) to wait for the replica shards to initialize. ' .
			'Defaults to 3600 seconds.', false, true );
		$this->addOption( 'allocationIncludeTag',
			'Set index.routing.allocation.include.tag on the created index. Useful if you want to ' .
			'force the suggester index not to be allocated on a specific set of nodes.',
			false, true );
		$this->addOption( 'allocationExcludeTag',
			'Set index.routing.allocation.exclude.tag on the created index. Useful if you want ' .
			'to force the suggester index not to be allocated on a specific set of nodes.',
			false, true );
		$this->addOption( 'recreate', "Force the creation of a new index." );
	}

	/**
	 * Entry point: validate configuration, then either recycle the current
	 * suggester index or build a fresh one.
	 *
	 * Elastica failures are turned into fatal errors.
	 *
	 * @return bool true on success
	 */
	public function execute() {
		global $wgLanguageCode,
			$wgCirrusSearchBannedPlugins,
			$wgCirrusSearchMasterTimeout;

		$this->disablePoolCountersAndLogging();
		$this->workAroundBrokenMessageCache();
		$this->masterTimeout = $this->getOption( 'masterTimeout', $wgCirrusSearchMasterTimeout );
		$this->indexSuffix = Connection::TITLE_SUGGEST_INDEX_SUFFIX;

		$useCompletion = $this->getSearchConfig()->get( 'CirrusSearchUseCompletionSuggester' );

		if ( $useCompletion !== 'build' && $useCompletion !== 'yes' && $useCompletion !== true ) {
			$this->fatalError( "Completion suggester disabled, quitting..." );
		}

		// Check that all shards and replicas settings are set
		try {
			$this->getShardCount();
			$this->getReplicaCount();
			$this->getMaxShardsPerNode();
		} catch ( \Exception $e ) {
			$this->fatalError(
				"Failed to get shard count and replica count information: {$e->getMessage()}"
			);
		}

		$this->indexBaseName = $this->getOption(
			'baseName', $this->getSearchConfig()->get( SearchConfig::INDEX_BASE_NAME )
		);
		// Values given on the command line are strings; cast so the int-typed
		// properties really hold ints.
		$this->indexChunkSize = (int)$this->getOption( 'indexChunkSize', 500 );
		// Read the option under the name registered in __construct(). It used to
		// be read as 'reindexRetryAttempts', which is never registered, so
		// --indexRetryAttempts was ignored and the default was always used.
		$this->indexRetryAttempts = (int)$this->getOption( 'indexRetryAttempts', 5 );

		$this->optimizeIndex = $this->getOption( 'optimize', false );

		$this->utils = new ConfigUtils( $this->getClient(), $this );

		$this->langCode = $wgLanguageCode;
		$this->bannedPlugins = $wgCirrusSearchBannedPlugins;

		$this->availablePlugins = $this->unwrap( $this->utils->scanAvailablePlugins( $this->bannedPlugins ) );
		$this->analysisConfig = $this->pickAnalyzer( $this->langCode, $this->availablePlugins )
			->buildConfig();

		$this->unwrap( $this->utils->checkElasticsearchVersion() );

		try {
			$this->requireCirrusReady();
			$this->builder = SuggestBuilder::create( $this->getConnection(),
				$this->getOption( 'scoringMethod' ), $this->indexBaseName );
			# check for broken indices and delete them
			$this->checkAndDeleteBrokenIndices();

			if ( !$this->canRecycle() ) {
				$this->rebuild();
			} else {
				$this->recycle();
			}
		} catch ( \Elastica\Exception\Connection\HttpException $e ) {
			$message = $e->getMessage();
			$this->log( "\nUnexpected Elasticsearch failure.\n" );
			$this->fatalError( "Http error communicating with Elasticsearch: $message.\n" );
		} catch ( \Elastica\Exception\ExceptionInterface $e ) {
			$type = get_class( $e );
			$message = ElasticaErrorHandler::extractMessage( $e );
			$trace = $e->getTraceAsString();
			$this->log( "\nUnexpected Elasticsearch failure.\n" );
			$this->fatalError( "Elasticsearch failed in an unexpected way. " .
				"This is always a bug in CirrusSearch.\n" .
				"Error type: $type\n" .
				"Message: $message\n" .
				"Trace:\n" . $trace );
		}

		return true;
	}

	/**
	 * Force the message cache to initialize early so that a LogicException
	 * from a broken i18n cache is absorbed here rather than deep in the updates.
	 */
	private function workAroundBrokenMessageCache() {
		// Under some configurations (T288233) the i18n cache fails to
		// initialize. After failing, at least in this particular deployment,
		// it will fallback to local CDB files and ignore on-wiki overrides
		// which is acceptable for this script.
		try {
			wfMessage( 'ok' )->text();
		} catch ( \LogicException $e ) {
			// The first failure should trigger the fallback mode, this second
			// try should work (and not throw the LogicException deep in the updates).
			wfMessage( 'ok' )->text();
		}
	}

	/**
	 * Check for duplicate indices that may have been created
	 * by a previous update that failed, and delete those reported as not live.
	 *
	 * Does nothing when fewer than two indices exist for the alias.
	 */
	private function checkAndDeleteBrokenIndices() {
		$indices = $this->unwrap( $this->utils->getAllIndicesByType( $this->getIndexAliasName() ) );
		if ( count( $indices ) < 2 ) {
			return;
		}
		foreach ( $indices as $indexName ) {
			$status = $this->utils->isIndexLive( $indexName );
			if ( !$status->isGood() ) {
				// Liveness could not be determined: report it and leave the index alone.
				$this->log( (string)$status . "\n" );
			} elseif ( $status->getValue() === false ) {
				$this->log( "Deleting broken index {$indexName}\n" );
				$this->deleteIndex( $this->getConnection()->getIndex( $indexName ) );
			}
		}
		# If something went wrong the process will fail when calling pickIndexIdentifierFromOption
	}

	/**
	 * Build a brand new index: create, fill, (optionally) optimize, add
	 * replicas, switch the alias, record versions and drop the old index.
	 */
	private function rebuild() {
		$oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
			'current', $this->getIndexAliasName()
		) );
		$this->oldIndex = $this->getConnection()->getIndex(
			$this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
		);
		$this->indexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
			'now', $this->getIndexAliasName()
		) );

		$this->createIndex();
		$this->indexData();
		if ( $this->optimizeIndex ) {
			$this->optimize();
		}
		$this->enableReplicas();
		$this->getIndex()->refresh();
		$this->validateAlias();
		$this->updateVersions();
		$this->deleteOldIndex();
		$this->log( "Done.\n" );
	}

	/**
	 * Decide whether the current index can be reused instead of rebuilt.
	 *
	 * Requires recycling to be enabled ($wgCirrusSearchRecycleCompletionSuggesterIndex),
	 * no --recreate, an existing index with refresh_interval -1, the configured
	 * shard count, matching major mapping/analysis versions in the metastore,
	 * and analyzers that validate against the current analysis config.
	 *
	 * @return bool
	 */
	private function canRecycle() {
		global $wgCirrusSearchRecycleCompletionSuggesterIndex;
		if ( !$wgCirrusSearchRecycleCompletionSuggesterIndex ) {
			return false;
		}

		if ( $this->getOption( "recreate", false ) ) {
			return false;
		}

		$oldIndexIdentifier = $this->unwrap( $this->utils->pickIndexIdentifierFromOption(
			'current', $this->getIndexAliasName()
		) );
		$oldIndex = $this->getConnection()->getIndex(
			$this->indexBaseName, $this->indexSuffix, $oldIndexIdentifier
		);
		if ( !$oldIndex->exists() ) {
			$this->error( 'Index does not exist yet cannot recycle.' );
			return false;
		}
		$refresh = $oldIndex->getSettings()->getRefreshInterval();
		if ( $refresh != '-1' ) {
			$this->error( 'Refresh interval is not -1, cannot recycle.' );
			return false;
		}

		$shards = $oldIndex->getSettings()->get( 'number_of_shards' );
		// We check only the number of shards since it cannot be updated.
		if ( $shards != $this->getShardCount() ) {
			$this->error( 'Number of shards mismatch cannot recycle.' );
			return false;
		}

		$mMaj = explode( '.', SuggesterMappingConfigBuilder::VERSION, 2 )[0];
		$aMaj = explode( '.', SuggesterAnalysisConfigBuilder::VERSION, 2 )[0];

		try {
			$versionDoc = $this->getMetaStore()
				->versionStore()
				->find( $this->indexBaseName, $this->indexSuffix );
		} catch ( \Elastica\Exception\NotFoundException $nfe ) {
			$this->error( 'Index missing in mw_cirrus_metastore::version, cannot recycle.' );
			return false;
		}

		if ( $versionDoc->analysis_maj != $aMaj ) {
			$this->error( 'Analysis config version mismatch, cannot recycle.' );
			return false;
		}

		if ( $versionDoc->mapping_maj != $mMaj ) {
			$this->error( 'Mapping config version mismatch, cannot recycle.' );
			return false;
		}

		$validator = new AnalyzersValidator( $oldIndex, $this->analysisConfig, $this );
		$status = $validator->validate();
		if ( !$status->isOK() ) {
			$this->error( 'Analysis config differs, cannot recycle.' );
			return false;
		}

		return true;
	}

	/**
	 * Recycle a suggester index:
	 * 1/ index data (delete docs if it already exists)
	 * 2/ expunge deleted docs
	 * 3/ refresh the reader
	 *    - so we can run a quick delete on remaining docs
	 *      (the docs that were actually deleted)
	 *    - drawbacks we load the FST from an un-optimized index
	 * 4/ delete old docs
	 * 5/ optimize
	 * 6/ refresh the reader
	 *
	 * Drawbacks: the FST will be read from disk twice in a short
	 * amount of time.
	 * This is a trade off between cluster operation and disk operation.
	 * Recreating the index may require less disk operations but causes
	 * the cluster to rebalance.
	 * This is certainly the best strategy for small indices (less than 100k docs)
	 * but needs to be carefully tested on bigger indices with high QPS.
	 *
	 * Note: $this->indexIdentifier is not assigned on this path, so getIndex()
	 * is called with a null identifier.
	 */
	private function recycle() {
		$this->log( "Recycling index {$this->getIndex()->getName()}\n" );
		$this->recycle = true;
		$this->indexData();
		// This is fragile... hopefully most of the docs will be deleted from the old segments
		// and will result in a fast operation.
		// New segments should not be affected.
		// Unfortunately if a failure causes the process to stop
		// the FST will maybe contains duplicates as it cannot (elastic 1.7)
		// filter deleted docs. We will rely on output deduplication
		// but this will certainly affect performances.

		$this->expungeDeletes();
		// Refresh the reader so we can scroll over remaining docs.
		// At this point we may read the new un-optimized FST segments
		// Old ones should be pretty small after expungeDeletes
		$this->getIndex()->refresh();

		// Match every doc whose batch_id differs from the batch just indexed.
		$boolNot = new Elastica\Query\BoolQuery();
		$boolNot->addMustNot(
			new Elastica\Query\Term( [ "batch_id" => $this->builder->getBatchId() ] )
		);
		$bool = new Elastica\Query\BoolQuery();
		$bool->addFilter( $boolNot );

		$query = new Elastica\Query();
		$query->setQuery( $bool );
		$query->setSize( $this->indexChunkSize );
		$query->setSource( false );
		$query->setSort( [ '_doc' ] );
		// Explicitly ask for accurate total_hits even-though we use a scroll request
		$query->setTrackTotalHits( true );
		$search = new \Elastica\Search( $this->getClient() );
		$search->setQuery( $query );
		$search->addIndex( $this->getIndex() );
		$scroll = new \Elastica\Scroll( $search, '15m' );

		$totalDocsToDump = -1;
		$docsDumped = 0;

		$this->log( "Deleting remaining docs from previous batch\n" );
		foreach ( $scroll as $results ) {
			if ( $totalDocsToDump === -1 ) {
				$totalDocsToDump = $results->getTotalHits();
				if ( $totalDocsToDump === 0 ) {
					break;
				}
			}
			$docIds = [];
			foreach ( $results as $result ) {
				$docsDumped++;
				$docIds[] = $result->getId();
			}
			$this->outputProgress( $docsDumped, $totalDocsToDump );
			if ( empty( $docIds ) ) {
				continue;
			}

			MWElasticUtils::withRetry( $this->indexRetryAttempts,
				function () use ( $docIds ) {
					$this->getIndex()->deleteByQuery( new Query\Ids( $docIds ) );
				}
			);
		}
		$this->log( "Done.\n" );
		// Old docs should be deleted now we can optimize and flush
		$this->optimize();

		// @todo add support for changing the number of replicas
		// if the setting was changed in cirrus config.
		// Workaround is to change the settings directly on the cluster.

		// Refresh the reader so it now uses the optimized FST,
		// and actually free and delete old segments.
		$this->getIndex()->refresh();
	}

	/**
	 * Delete the index recorded by rebuild() as the previous one, if it exists.
	 */
	private function deleteOldIndex() {
		if ( $this->oldIndex && $this->oldIndex->exists() ) {
			$this->log( "Deleting " . $this->oldIndex->getName() . " ... " );
			$this->deleteIndex( $this->oldIndex );
			$this->output( "ok.\n" );
		}
	}

	/**
	 * Delete an index, passing the configured master_timeout.
	 * @param \Elastica\Index $index
	 */
	private function deleteIndex( \Elastica\Index $index ) {
		// @todo Utilize $index->delete(...) once Elastica library is updated
		// to allow passing the master_timeout
		$index->request(
			'',
			Request::DELETE,
			[],
			[ 'master_timeout' => $this->masterTimeout ]
		);
	}

	/**
	 * Force-merge the index down to a single segment.
	 */
	private function optimize() {
		$this->log( "Optimizing index..." );
		$this->getIndex()->forcemerge( [ 'max_num_segments' => 1 ] );
		$this->output( "ok.\n" );
	}

	/**
	 * Force-merge only to expunge deleted docs, without flushing.
	 */
	private function expungeDeletes() {
		$this->log( "Purging deleted docs..." );
		$this->getIndex()->forcemerge( [ 'only_expunge_deletes' => 'true', 'flush' => 'false' ] );
		$this->output( "ok.\n" );
	}

	/**
	 * Scroll the content and general indices for NS_MAIN pages (or redirects
	 * to NS_MAIN), turn each page of hits into suggestion docs via the
	 * builder, and bulk-add them to the index being updated.
	 */
	private function indexData() {
		// We build the suggestions by reading CONTENT and GENERAL indices.
		// This does not support extra indices like FILES on commons.
		$sourceIndexSuffixes = [ Connection::CONTENT_INDEX_SUFFIX, Connection::GENERAL_INDEX_SUFFIX ];

		$query = new Query();
		$query->setSource( [
			'includes' => $this->builder->getRequiredFields()
		] );

		$pageAndNs = new Elastica\Query\BoolQuery();
		$pageAndNs->addShould( new Elastica\Query\Term( [ "namespace" => NS_MAIN ] ) );
		$pageAndNs->addShould( new Elastica\Query\Term( [ "redirect.namespace" => NS_MAIN ] ) );
		$bool = new Elastica\Query\BoolQuery();
		$bool->addFilter( $pageAndNs );

		$query->setQuery( $bool );
		$query->setSort( [ '_doc' ] );
		$query->setSize( $this->indexChunkSize );
		// Explicitly ask for accurate total_hits even-though we use a scroll request
		$query->setTrackTotalHits( true );

		// The destination does not change while indexing, resolve it once.
		$destinationIndex = $this->getIndex();

		foreach ( $sourceIndexSuffixes as $sourceIndexSuffix ) {
			$sourceIndex = $this->getConnection()->getIndex( $this->indexBaseName, $sourceIndexSuffix );
			$search = new \Elastica\Search( $this->getClient() );
			$search->setQuery( $query );
			$search->addIndex( $sourceIndex );
			$scroll = new \Elastica\Scroll( $search, '15m' );

			$totalDocsToDump = -1;
			$docsDumped = 0;

			foreach ( $scroll as $results ) {
				if ( $totalDocsToDump === -1 ) {
					$totalDocsToDump = $results->getTotalHits();
					if ( $totalDocsToDump === 0 ) {
						$this->log( "No documents to index from $sourceIndexSuffix\n" );
						break;
					}
					$this->log( "Indexing $totalDocsToDump documents from $sourceIndexSuffix with " .
						"batchId: {$this->builder->getBatchId()}\n" );
				}
				$inputDocs = [];
				foreach ( $results as $result ) {
					$docsDumped++;
					$inputDocs[] = [
						'id' => $result->getId(),
						'source' => $result->getSource()
					];
				}

				$suggestDocs = $this->builder->build( $inputDocs );
				if ( empty( $suggestDocs ) ) {
					continue;
				}
				$this->outputProgress( $docsDumped, $totalDocsToDump );
				MWElasticUtils::withRetry( $this->indexRetryAttempts,
					static function () use ( $destinationIndex, $suggestDocs ) {
						$destinationIndex->addDocuments( $suggestDocs );
					}
				);
			}
			$this->log( "Indexing from $sourceIndexSuffix index done.\n" );
		}
	}

	/**
	 * Atomically point the alias at the index being updated, removing it
	 * from every index that currently holds it.
	 */
	public function validateAlias() {
		// @todo utilize the following once Elastica is updated to support passing
		// master_timeout. This is a copy of the Elastica\Index::addAlias() method
		// $this->getIndex()->addAlias( $this->getIndexAliasName(), true );
		$index = $this->getIndex();
		$name = $this->getIndexAliasName();

		$path = '_aliases';
		$data = [ 'actions' => [] ];
		$status = new Status( $index->getClient() );
		foreach ( $status->getIndicesWithAlias( $name ) as $aliased ) {
			$data['actions'][] = [ 'remove' => [ 'index' => $aliased->getName(), 'alias' => $name ] ];
		}

		$data['actions'][] = [ 'add' => [ 'index' => $index->getName(), 'alias' => $name ] ];

		$index->getClient()->request(
			$path, Request::POST, $data, [ 'master_timeout' => $this->masterTimeout ]
		);
	}

	/**
	 * Print "N% done..." when the integer percentage changes to an even value.
	 *
	 * @param int $docsDumped documents processed so far
	 * @param int $limit total documents expected; nothing is printed if <= 0
	 */
	public function outputProgress( $docsDumped, $limit ) {
		// Guard the division below as well as the "nothing done yet" case.
		if ( $docsDumped <= 0 || $limit <= 0 ) {
			return;
		}
		$pctDone = (int)( ( $docsDumped / $limit ) * 100 );
		if ( $this->lastProgressPrinted == $pctDone ) {
			return;
		}
		$this->lastProgressPrinted = $pctDone;
		if ( ( $pctDone % 2 ) == 0 ) {
			$this->outputIndented( "	$pctDone% done...\n" );
		}
	}

	/**
	 * Output a message prefixed with the current date/time.
	 * @param string $message
	 * @param string|null $channel
	 */
	public function log( $message, $channel = null ) {
		$date = new \DateTime();
		parent::output( $date->format( 'Y-m-d H:i:s' ) . " " . $message, $channel );
	}

	/**
	 * @param string $langCode
	 * @param array $availablePlugins
	 * @return AnalysisConfigBuilder a SuggesterAnalysisConfigBuilder for $langCode
	 */
	private function pickAnalyzer( $langCode, array $availablePlugins = [] ) {
		$analysisConfigBuilder = new SuggesterAnalysisConfigBuilder( $langCode, $availablePlugins );
		$this->outputIndented( 'Picking analyzer...' .
			$analysisConfigBuilder->getDefaultTextAnalyzerType( $langCode ) .
			"\n" );
		return $analysisConfigBuilder;
	}

	/**
	 * Create the new index with 0 replicas and refresh disabled, then wait
	 * for it to go green.
	 *
	 * @throws \Exception if the index already exists
	 */
	private function createIndex() {
		// This is "create only" for now.
		if ( $this->getIndex()->exists() ) {
			throw new \Exception( "Index already exists." );
		}

		$mappingConfigBuilder = new SuggesterMappingConfigBuilder();

		// We create the index with 0 replicas, this is faster and will
		// stress less nodes with 4 shards and 2 replicas we would
		// stress 12 nodes (moreover with the optimize flag)
		$settings = [
			'number_of_shards' => $this->getShardCount(),
			// hacky but we still use auto_expand_replicas
			// for convenience on small install.
			'auto_expand_replicas' => "0-0",
			'refresh_interval' => -1,
			'analysis' => $this->analysisConfig,
			'routing.allocation.total_shards_per_node' => $this->getMaxShardsPerNode(),
		];

		if ( $this->hasOption( 'allocationIncludeTag' ) ) {
			$includeTag = $this->getOption( 'allocationIncludeTag' );
			$this->output( "Using routing.allocation.include.tag: " .
				"{$includeTag}, the index might be stuck in red " .
				"if the cluster is not properly configured.\n" );
			$settings['routing.allocation.include.tag'] = $includeTag;
		}

		if ( $this->hasOption( 'allocationExcludeTag' ) ) {
			$excludeTag = $this->getOption( 'allocationExcludeTag' );
			$this->output( "Using routing.allocation.exclude.tag: " .
				"{$excludeTag}, the index might be stuck in red " .
				"if the cluster is not properly configured.\n" );
			$settings['routing.allocation.exclude.tag'] = $excludeTag;
		}

		$args = [
			'settings' => [ 'index' => $settings ],
			'mappings' => $mappingConfigBuilder->buildConfig()
		];
		// @todo utilize $this->getIndex()->create(...) once it supports setting
		// the master_timeout parameter.
		$this->getIndex()->request(
			'',
			Request::PUT,
			$args,
			[ 'master_timeout' => $this->masterTimeout ]
		);

		// Index create is async, we have to make sure that the index is ready
		// before sending any docs to it.
		$this->waitForGreen();
	}

	/**
	 * Switch auto_expand_replicas to the configured replica count and wait
	 * (up to --replicationTimeout seconds) for the index to go green.
	 */
	private function enableReplicas() {
		$this->log( "Enabling replicas...\n" );
		$args = [
			'index' => [
				'auto_expand_replicas' => $this->getReplicaCount(),
			],
		];

		$path = $this->getIndex()->getName() . "/_settings";
		$this->getIndex()->getClient()->request(
			$path,
			Request::PUT,
			$args,
			[ 'master_timeout' => $this->masterTimeout ]
		);

		// The previous call seems to be async, let's wait few sec
		// otherwise replication won't have time to start.
		sleep( 20 );

		// Index will be yellow while replica shards are being allocated.
		$this->waitForGreen( (int)$this->getOption( 'replicationTimeout', 3600 ) );
	}

	/**
	 * Wait for the index to go green; fatal error if it does not.
	 * @param int $timeout seconds to wait (default 600, i.e. 10 minutes)
	 */
	private function waitForGreen( $timeout = 600 ) {
		$this->log( "Waiting for the index to go green...\n" );
		// Wait for the index to go green ( default 10 min)
		if ( !$this->utils->waitForGreen( $this->getIndex()->getName(), $timeout ) ) {
			$this->fatalError( "Failed to wait for green... please check config and " .
				"delete the {$this->getIndex()->getName()} index if it was created." );
		}
	}

	/**
	 * @return string Number of replicas this index should have. May be a range such as '0-2'
	 */
	private function getReplicaCount() {
		return $this->getConnection()->getSettings()->getReplicaCount( $this->indexSuffix );
	}

	/**
	 * @return int configured number of shards for the suggester index
	 */
	private function getShardCount() {
		return $this->getConnection()->getSettings()->getShardCount( $this->indexSuffix );
	}

	/**
	 * @return int Maximum number of shards that can be allocated on a single elasticsearch
	 *  node. -1 for unlimited.
	 */
	private function getMaxShardsPerNode() {
		return $this->getConnection()->getSettings()->getMaxShardsPerNode( $this->indexSuffix );
	}

	/**
	 * Record the current mapping/analysis versions in the metastore.
	 */
	private function updateVersions() {
		$this->log( "Updating tracking indexes..." );
		$this->getMetaStore()
			->versionStore()
			->update( $this->indexBaseName, $this->indexSuffix );
		$this->output( "ok.\n" );
	}

	/**
	 * @return \Elastica\Index being updated
	 */
	public function getIndex() {
		return $this->getConnection()->getIndex(
			$this->indexBaseName, $this->indexSuffix, $this->indexIdentifier
		);
	}

	/**
	 * @return Elastica\Client
	 */
	protected function getClient() {
		return $this->getConnection()->getClient();
	}

	/**
	 * @return string name of the alias (base name + suggester suffix) for the index being updated
	 */
	protected function getIndexAliasName() {
		return $this->getConnection()->getIndexName( $this->indexBaseName, $this->indexSuffix );
	}
}
765 | |
// Register this script's class with the maintenance runner, then hand over
// control via the RUN_MAINTENANCE_IF_MAIN entry file.
$maintClass = \CirrusSearch\Maintenance\UpdateSuggesterIndex::class;
require_once RUN_MAINTENANCE_IF_MAIN;