Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
64.47% |
225 / 349 |
|
33.33% |
5 / 15 |
CRAP | |
0.00% |
0 / 1 |
DataSender | |
64.47% |
225 / 349 |
|
33.33% |
5 / 15 |
335.07 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
1 | |||
sendUpdateWeightedTags | |
100.00% |
15 / 15 |
|
100.00% |
1 / 1 |
3 | |||
sendWeightedTagsUpdate | |
69.81% |
37 / 53 |
|
0.00% |
0 / 1 |
9.76 | |||
sendResetWeightedTags | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
sendData | |
68.69% |
68 / 99 |
|
0.00% |
0 / 1 |
34.54 | |||
reportUpdateMetrics | |
0.00% |
0 / 26 |
|
0.00% |
0 / 1 |
56 | |||
sendDeletes | |
67.65% |
23 / 34 |
|
0.00% |
0 / 1 |
5.85 | |||
sendOtherIndexUpdates | |
75.56% |
34 / 45 |
|
0.00% |
0 / 1 |
7.72 | |||
decideRequiredSetAction | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
2.03 | |||
bulkResponseExceptionIsJustDocumentMissing | |
0.00% |
0 / 20 |
|
0.00% |
0 / 1 |
90 | |||
newLog | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
docToSuperDetectNoopScript | |
94.12% |
16 / 17 |
|
0.00% |
0 / 1 |
5.01 | |||
retryOnConflict | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
convertEncoding | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
reportDocSize | |
75.00% |
6 / 8 |
|
0.00% |
0 / 1 |
2.06 |
1 | <?php |
2 | |
3 | namespace CirrusSearch; |
4 | |
5 | use CirrusSearch\BuildDocument\BuildDocument; |
6 | use CirrusSearch\BuildDocument\BuildDocumentException; |
7 | use CirrusSearch\BuildDocument\DocumentSizeLimiter; |
8 | use CirrusSearch\Extra\MultiList\MultiListBuilder; |
9 | use CirrusSearch\Profile\SearchProfileService; |
10 | use CirrusSearch\Search\CirrusIndexField; |
11 | use CirrusSearch\Wikimedia\WeightedTagsHooks; |
12 | use Elastica\Bulk\Action\AbstractDocument; |
13 | use Elastica\Document; |
14 | use Elastica\Exception\Bulk\ResponseException; |
15 | use Elastica\Exception\RuntimeException; |
16 | use Elastica\JSON; |
17 | use Elastica\Response; |
18 | use MediaWiki\Logger\LoggerFactory; |
19 | use MediaWiki\MediaWikiServices; |
20 | use MediaWiki\Status\Status; |
21 | use MediaWiki\Title\Title; |
22 | use Wikimedia\Assert\Assert; |
23 | use Wikimedia\Rdbms\IDBAccessObject; |
24 | use Wikimedia\Stats\StatsFactory; |
25 | |
26 | /** |
27 | * Handles non-maintenance write operations to the elastic search cluster. |
28 | * |
29 | * This program is free software; you can redistribute it and/or modify |
30 | * it under the terms of the GNU General Public License as published by |
31 | * the Free Software Foundation; either version 2 of the License, or |
32 | * (at your option) any later version. |
33 | * |
34 | * This program is distributed in the hope that it will be useful, |
35 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
36 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
37 | * GNU General Public License for more details. |
38 | * |
39 | * You should have received a copy of the GNU General Public License along |
40 | * with this program; if not, write to the Free Software Foundation, Inc., |
41 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
42 | * http://www.gnu.org/copyleft/gpl.html |
43 | */ |
class DataSender extends ElasticsearchIntermediary {

	/** @var \Psr\Log\LoggerInterface Operational log (channel 'CirrusSearch') */
	private $log;

	/** @var \Psr\Log\LoggerInterface Log for failed writes (channel 'CirrusSearchChangeFailed') */
	private $failedLog;

	/**
	 * @var string Base name used when resolving per-suffix index names
	 */
	private $indexBaseName;

	/**
	 * @var SearchConfig
	 */
	private $searchConfig;

	/** @var StatsFactory Metrics sink (already prefixed with the right component) */
	private StatsFactory $stats;
	/**
	 * @var DocumentSizeLimiter Applies the configured document size profile
	 */
	private $docSizeLimiter;

	/**
	 * @param Connection $conn
	 * @param SearchConfig $config
	 * @param StatsFactory|null $stats A StatsFactory (already prefixed with the right component);
	 *  defaults to Util::getStatsFactory()
	 * @param DocumentSizeLimiter|null $docSizeLimiter defaults to a limiter built from the
	 *  SearchProfileService::DOCUMENT_SIZE_LIMITER profile of $config
	 */
	public function __construct(
		Connection $conn,
		SearchConfig $config,
		?StatsFactory $stats = null,
		?DocumentSizeLimiter $docSizeLimiter = null
	) {
		parent::__construct( $conn, null, 0 );
		$this->stats = $stats ?? Util::getStatsFactory();
		$this->log = LoggerFactory::getInstance( 'CirrusSearch' );
		$this->failedLog = LoggerFactory::getInstance( 'CirrusSearchChangeFailed' );
		$this->indexBaseName = $config->get( SearchConfig::INDEX_BASE_NAME );
		$this->searchConfig = $config;
		$this->docSizeLimiter = $docSizeLimiter ?? new DocumentSizeLimiter(
			$config->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
	}
89 | |
90 | /** |
91 | * @deprecated use {@link sendWeightedTagsUpdate} instead. |
92 | */ |
93 | public function sendUpdateWeightedTags( |
94 | string $indexSuffix, |
95 | array $docIds, |
96 | string $tagField, |
97 | string $tagPrefix, |
98 | $tagNames = null, |
99 | ?array $tagWeights = null, |
100 | int $batchSize = 30 |
101 | ): Status { |
102 | return $this->sendWeightedTagsUpdate( |
103 | $indexSuffix, |
104 | $tagPrefix, |
105 | is_array( $tagWeights ) ? array_reduce( |
106 | $docIds, static function ( $docTagsWeights, $docId ) use ( $tagNames, $tagWeights ) { |
107 | if ( array_key_exists( $docId, $tagWeights ) ) { |
108 | $docTagsWeights[$docId] = MultiListBuilder::buildTagWeightsFromLegacyParameters( |
109 | $tagNames, |
110 | $tagWeights[$docId] |
111 | ); |
112 | } |
113 | |
114 | return $docTagsWeights; |
115 | }, [] |
116 | ) : array_fill_keys( $docIds, MultiListBuilder::buildTagWeightsFromLegacyParameters( $tagNames ) ), |
117 | $batchSize |
118 | ); |
119 | } |
120 | |
	/**
	 * Bulk-update the weighted_tags field for a set of documents, batching
	 * doc ids into bulk requests of $batchSize and merging via the
	 * super_detect_noop script (wikimedia-extra plugin).
	 *
	 * @param string $indexSuffix
	 * @param string $tagPrefix
	 * @param int[][]|null[][] $tagWeights a map of `[ docId: string => [ tagName: string => tagWeight: int|null ] ]`
	 * @param int $batchSize
	 *
	 * @return Status
	 */
	public function sendWeightedTagsUpdate(
		string $indexSuffix,
		string $tagPrefix,
		array $tagWeights,
		int $batchSize = 30
	): Status {
		$client = $this->connection->getClient();
		$status = Status::newGood();
		$pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );
		foreach ( array_chunk( array_keys( $tagWeights ), $batchSize ) as $docIdsChunk ) {
			$bulk = new \Elastica\Bulk( $client );
			$bulk->setIndex( $pageIndex );
			foreach ( $docIdsChunk as $docId ) {
				$docTags = MultiListBuilder::buildWeightedTags(
					$tagPrefix,
					$tagWeights[$docId],
				);
				// The MULTILIST handler merges tags for this prefix into the
				// existing weighted_tags field instead of replacing the field.
				$script = new \Elastica\Script\Script( 'super_detect_noop', [
					'source' => [
						WeightedTagsHooks::FIELD_NAME => array_map( static fn ( $docTag ) => (string)$docTag,
							$docTags )
					],
					'handlers' => [ WeightedTagsHooks::FIELD_NAME => CirrusIndexField::MULTILIST_HANDLER ],
				], 'super_detect_noop' );
				$script->setId( $docId );
				$bulk->addScript( $script, 'update' );
			}

			// A chunk can produce no actions; skip the round trip entirely.
			if ( !$bulk->getActions() ) {
				continue;
			}

			// Execute the bulk update
			$exception = null;
			try {
				$this->start(
					new BulkUpdateRequestLog(
						$this->connection->getClient(),
						'updating {numBulk} documents',
						'send_data_reset_weighted_tags',
						[
							'numBulk' => count( $docIdsChunk ),
							'index' => $pageIndex->getName()
						]
					)
				);
				$bulk->send();
			} catch ( ResponseException $e ) {
				// Bulk failures that are only "document missing" are tolerated:
				// the docs were deleted out from under the update.
				if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
					$exception = $e;
				}
			} catch ( \Elastica\Exception\ExceptionInterface $e ) {
				$exception = $e;
			}
			if ( $exception === null ) {
				$this->success();
			} else {
				$this->failure( $exception );
				$this->failedLog->warning(
					"Update weighted tag {weightedTagFieldName} for {weightedTagPrefix} in articles: {docIds}", [
						'exception' => $exception,
						'weightedTagFieldName' => WeightedTagsHooks::FIELD_NAME,
						'weightedTagPrefix' => $tagPrefix,
						'weightedTagWeight' => var_export( $tagWeights, true ),
						'docIds' => implode( ',', array_keys( $tagWeights ) )
					]
				);
				// NOTE(review): failures are logged but never recorded on
				// $status, so callers always receive a good Status — confirm
				// this best-effort behavior is intentional (compare
				// sendOtherIndexUpdates, which calls $status->error()).
			}
		}

		return $status;
	}
201 | |
202 | /** |
203 | * @deprecated use {@link sendWeightedTagsUpdate} instead. |
204 | */ |
205 | public function sendResetWeightedTags( |
206 | string $indexSuffix, |
207 | array $docIds, |
208 | string $tagField, |
209 | string $tagPrefix, |
210 | int $batchSize = 30 |
211 | ): Status { |
212 | return $this->sendWeightedTagsUpdate( |
213 | $indexSuffix, |
214 | $tagPrefix, |
215 | array_fill_keys( $docIds, [ CirrusIndexField::MULTILIST_DELETE_GROUPING => null ] ), |
216 | $batchSize |
217 | ); |
218 | } |
219 | |
	/**
	 * Send a batch of documents to a single index: finalize page documents,
	 * convert them to super_detect_noop scripts when the wikimedia-extra
	 * plugin is available, bulk-send them, and report update metrics.
	 *
	 * @param string $indexSuffix suffix of index to which to send $documents
	 * @param \Elastica\Document[] $documents documents to send
	 * @return Status
	 */
	public function sendData( $indexSuffix, array $documents ) {
		if ( !$documents ) {
			return Status::newGood();
		}

		// Copy the docs so that modifications made in this method are not propagated up to the caller
		$docsCopy = [];
		foreach ( $documents as $doc ) {
			$docsCopy[] = clone $doc;
		}
		$documents = $docsCopy;

		// Perform final stage of document building. This only
		// applies to `page` documents, docs built by something
		// other than BuildDocument will pass through unchanged.
		$services = MediaWikiServices::getInstance();
		$builder = new BuildDocument(
			$this->connection,
			$services->getConnectionProvider()->getReplicaDatabase(),
			$services->getRevisionStore(),
			$services->getBacklinkCacheFactory(),
			$this->docSizeLimiter,
			$services->getTitleFormatter(),
			$services->getWikiPageFactory(),
			$services->getTitleFactory()
		);
		try {
			foreach ( $documents as $i => $doc ) {
				if ( !$builder->finalize( $doc ) ) {
					// Something has changed while this was hanging out in the job
					// queue and should no longer be written to elastic.
					unset( $documents[$i] );
				}
				// Size is reported even for docs dropped above.
				$this->reportDocSize( $doc );
			}
		} catch ( BuildDocumentException $be ) {
			$this->failedLog->warning(
				'Failed to update documents',
				[ 'exception' => $be ]
			);
			return Status::newFatal( 'cirrussearch-failed-build-document' );
		}

		if ( !$documents ) {
			// All documents noop'd
			return Status::newGood();
		}

		/**
		 * Transform the finalized documents into noop scripts if possible
		 * to reduce update load.
		 */
		if ( $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop' ) ) {
			foreach ( $documents as $i => $doc ) {
				// BC Check for jobs that used to contain Document|Script
				if ( $doc instanceof \Elastica\Document ) {
					$documents[$i] = $this->docToSuperDetectNoopScript( $doc );
				}
			}
		}

		foreach ( $documents as $doc ) {
			$doc->setRetryOnConflict( $this->retryOnConflict() );
			// Hints need to be retained until after finalizing
			// the documents and building the noop scripts.
			CirrusIndexField::resetHints( $doc );
		}

		$exception = null;
		$responseSet = null;
		$justDocumentMissing = false;
		try {
			$pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );

			$this->start( new BulkUpdateRequestLog(
				$this->connection->getClient(),
				'sending {numBulk} documents to the {index} index(s)',
				'send_data_write',
				[ 'numBulk' => count( $documents ), 'index' => $pageIndex->getName() ]
			) );
			$bulk = new \Elastica\Bulk( $this->connection->getClient() );
			$bulk->setShardTimeout( $this->searchConfig->get( 'CirrusSearchUpdateShardTimeout' ) );
			$bulk->setIndex( $pageIndex );
			if ( $this->searchConfig->getElement( 'CirrusSearchElasticQuirks', 'retry_on_conflict' ) ) {
				$actions = [];
				foreach ( $documents as $doc ) {
					$action = AbstractDocument::create( $doc, 'update' );
					$metadata = $action->getMetadata();
					// Rename deprecated _retry_on_conflict
					// TODO: fix upstream in Elastica.
					if ( isset( $metadata['_retry_on_conflict'] ) ) {
						$metadata['retry_on_conflict'] = $metadata['_retry_on_conflict'];
						unset( $metadata['_retry_on_conflict'] );
						$action->setMetadata( $metadata );
					}
					$actions[] = $action;
				}

				$bulk->addActions( $actions );
			} else {
				$bulk->addData( $documents, 'update' );
			}
			$responseSet = $bulk->send();
		} catch ( ResponseException $e ) {
			$justDocumentMissing = $this->bulkResponseExceptionIsJustDocumentMissing( $e,
				function ( $docId ) use ( $e, $indexSuffix ) {
					$this->log->info(
						"Updating a page that doesn't yet exist in Elasticsearch: {docId}",
						[ 'docId' => $docId, 'indexSuffix' => $indexSuffix ]
					);
				}
			);
			$exception = $e;
		} catch ( \Elastica\Exception\ExceptionInterface $e ) {
			$exception = $e;
		}

		if ( $justDocumentMissing ) {
			// we have a failure but this is just docs that are missing in the index
			// missing docs are logged above
			$this->success();
			return Status::newGood();
		}
		// check if the response is valid by making sure that it has bulk responses
		if ( $responseSet !== null && count( $responseSet->getBulkResponses() ) > 0 ) {
			$this->success();
			$this->reportUpdateMetrics( $responseSet, $indexSuffix, count( $documents ) );
			return Status::newGood();
		}
		// Everything else should be a failure.
		if ( $exception === null ) {
			// Elastica failed to identify the error, reason is that the Elastica Bulk\Response
			// does identify errors only in individual responses if the request fails without
			// getting a formal elastic response Bulk\Response->isOk might remain true
			// So here we construct the ResponseException that should have been built and thrown
			// by Elastica
			$lastRequest = $this->connection->getClient()->getLastRequest();
			if ( $lastRequest !== null ) {
				$exception = new \Elastica\Exception\ResponseException( $lastRequest,
					new Response( $responseSet->getData() ) );
			} else {
				$exception = new RuntimeException( "Unknown error in bulk request (Client::getLastRequest() is null)" );
			}
		}
		$this->failure( $exception );
		$documentIds = array_map( static function ( $d ) {
			return (string)( $d->getId() );
		}, $documents );
		$this->failedLog->warning(
			'Failed to update documents {docId}',
			[
				'docId' => implode( ', ', $documentIds ),
				'exception' => $exception
			]
		);
		return Status::newFatal( 'cirrussearch-failed-send-data' );
	}
382 | |
383 | /** |
384 | * @param \Elastica\Bulk\ResponseSet $responseSet |
385 | * @param string $indexSuffix |
386 | * @param int $sent |
387 | */ |
388 | private function reportUpdateMetrics( |
389 | \Elastica\Bulk\ResponseSet $responseSet, $indexSuffix, $sent |
390 | ) { |
391 | $updateStats = [ |
392 | 'sent' => $sent, |
393 | ]; |
394 | $allowedOps = [ 'created', 'updated', 'noop' ]; |
395 | foreach ( $responseSet->getBulkResponses() as $bulk ) { |
396 | $opRes = 'unknown'; |
397 | if ( $bulk instanceof \Elastica\Bulk\Response ) { |
398 | if ( isset( $bulk->getData()['result'] ) |
399 | && in_array( $bulk->getData()['result'], $allowedOps ) |
400 | ) { |
401 | $opRes = $bulk->getData()['result']; |
402 | } |
403 | } |
404 | if ( isset( $updateStats[$opRes] ) ) { |
405 | $updateStats[$opRes]++; |
406 | } else { |
407 | $updateStats[$opRes] = 1; |
408 | } |
409 | } |
410 | $cluster = $this->connection->getClusterName(); |
411 | $metricsPrefix = "CirrusSearch.$cluster.updates"; |
412 | foreach ( $updateStats as $what => $num ) { |
413 | $this->stats->getCounter( "update_total" ) |
414 | ->setLabel( "status", $what ) |
415 | ->setLabel( "search_cluster", $cluster ) |
416 | ->setLabel( "index_name", $this->indexBaseName ) |
417 | ->setLabel( "index_suffix", $indexSuffix ) |
418 | ->copyToStatsdAt( [ |
419 | "$metricsPrefix.details.{$this->indexBaseName}.$indexSuffix.$what", |
420 | "$metricsPrefix.all.$what" |
421 | ] ) |
422 | ->incrementBy( $num ); |
423 | } |
424 | } |
425 | |
426 | /** |
427 | * Send delete requests to Elasticsearch. |
428 | * |
429 | * @param string[] $docIds elasticsearch document ids to delete |
430 | * @param string|null $indexSuffix index from which to delete. null means all. |
431 | * @return Status |
432 | */ |
433 | public function sendDeletes( $docIds, $indexSuffix = null ) { |
434 | if ( $indexSuffix === null ) { |
435 | $indexes = $this->connection->getAllIndexSuffixes( Connection::PAGE_DOC_TYPE ); |
436 | } else { |
437 | $indexes = [ $indexSuffix ]; |
438 | } |
439 | |
440 | $idCount = count( $docIds ); |
441 | if ( $idCount !== 0 ) { |
442 | try { |
443 | foreach ( $indexes as $indexSuffix ) { |
444 | $this->startNewLog( |
445 | 'deleting {numIds} from {indexSuffix}', |
446 | 'send_deletes', [ |
447 | 'numIds' => $idCount, |
448 | 'indexSuffix' => $indexSuffix, |
449 | ] |
450 | ); |
451 | $this->connection |
452 | ->getIndex( $this->indexBaseName, $indexSuffix ) |
453 | ->deleteDocuments( |
454 | array_map( |
455 | static function ( $id ) { |
456 | return new Document( $id ); |
457 | }, $docIds |
458 | ) |
459 | ); |
460 | $this->success(); |
461 | } |
462 | } catch ( \Elastica\Exception\ExceptionInterface $e ) { |
463 | $this->failure( $e ); |
464 | $this->failedLog->warning( |
465 | 'Failed to delete documents: {docId}', |
466 | [ |
467 | 'docId' => implode( ', ', $docIds ), |
468 | 'exception' => $e, |
469 | ] |
470 | ); |
471 | return Status::newFatal( 'cirrussearch-failed-send-deletes' ); |
472 | } |
473 | } |
474 | |
475 | return Status::newGood(); |
476 | } |
477 | |
	/**
	 * Update the local_sites_with_dupe set on documents in another index,
	 * adding or removing $localSite per document depending on whether the
	 * local page currently exists (see decideRequiredSetAction).
	 *
	 * @param string $localSite The wikiId to add/remove from local_sites_with_dupe
	 * @param string $indexName The name of the index to perform updates to
	 * @param array[] $otherActions A list of arrays each containing the id within elasticsearch
	 *  ('docId') and the article namespace ('ns') and DB key ('dbKey') within $localSite
	 * @param int $batchSize number of docs to update in a single bulk
	 * @return Status good when all batches succeed; accumulates one error per failed batch
	 */
	public function sendOtherIndexUpdates( $localSite, $indexName, array $otherActions, $batchSize = 30 ) {
		$client = $this->connection->getClient();
		$status = Status::newGood();
		foreach ( array_chunk( $otherActions, $batchSize ) as $updates ) {
			'@phan-var array[] $updates';
			$bulk = new \Elastica\Bulk( $client );
			$titles = [];
			foreach ( $updates as $update ) {
				$title = Title::makeTitle( $update['ns'], $update['dbKey'] );
				$action = $this->decideRequiredSetAction( $title );
				// The 'set' handler adds/removes a single member of
				// local_sites_with_dupe without replacing the whole field.
				$script = new \Elastica\Script\Script(
					'super_detect_noop',
					[
						'source' => [
							'local_sites_with_dupe' => [ $action => $localSite ],
						],
						'handlers' => [ 'local_sites_with_dupe' => 'set' ],
					],
					'super_detect_noop'
				);
				$script->setId( $update['docId'] );
				$script->setParam( '_type', '_doc' );
				$script->setParam( '_index', $indexName );
				$bulk->addScript( $script, 'update' );
				$titles[] = $title;
			}

			// Execute the bulk update
			$exception = null;
			try {
				$this->start( new BulkUpdateRequestLog(
					$this->connection->getClient(),
					'updating {numBulk} documents in other indexes',
					'send_data_other_idx_write',
					[ 'numBulk' => count( $updates ), 'index' => $indexName ]
				) );
				$bulk->send();
			} catch ( ResponseException $e ) {
				// Bulk failures that are only "document missing" are not
				// treated as errors.
				if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
					$exception = $e;
				}
			} catch ( \Elastica\Exception\ExceptionInterface $e ) {
				$exception = $e;
			}
			if ( $exception === null ) {
				$this->success();
			} else {
				$this->failure( $exception );
				$this->failedLog->warning(
					"OtherIndex update for articles: {titleStr}",
					[ 'exception' => $exception, 'titleStr' => implode( ',', $titles ) ]
				);
				$status->error( 'cirrussearch-failed-update-otherindex' );
			}
		}

		return $status;
	}
544 | |
545 | /** |
546 | * Decide what action is required to the other index to make it up |
547 | * to data with the current wiki state. This will always check against |
548 | * the master database. |
549 | * |
550 | * @param Title $title The title to decide the action for |
551 | * @return string The set action to be performed. Either 'add' or 'remove' |
552 | */ |
553 | protected function decideRequiredSetAction( Title $title ) { |
554 | $page = MediaWikiServices::getInstance()->getWikiPageFactory()->newFromTitle( $title ); |
555 | $page->loadPageData( IDBAccessObject::READ_LATEST ); |
556 | if ( $page->exists() ) { |
557 | return 'add'; |
558 | } else { |
559 | return 'remove'; |
560 | } |
561 | } |
562 | |
563 | /** |
564 | * Check if $exception is a bulk response exception that just contains |
565 | * document is missing failures. |
566 | * |
567 | * @param ResponseException $exception exception to check |
568 | * @param callable|null $logCallback Callback in which to do some logging. |
569 | * Callback will be passed the id of the missing document. |
570 | * @return bool |
571 | */ |
572 | protected function bulkResponseExceptionIsJustDocumentMissing( |
573 | ResponseException $exception, $logCallback = null |
574 | ) { |
575 | $justDocumentMissing = true; |
576 | foreach ( $exception->getResponseSet()->getBulkResponses() as $bulkResponse ) { |
577 | if ( !$bulkResponse->hasError() ) { |
578 | continue; |
579 | } |
580 | |
581 | $error = $bulkResponse->getFullError(); |
582 | if ( is_string( $error ) ) { |
583 | // es 1.7 cluster |
584 | $message = $bulkResponse->getError(); |
585 | if ( strpos( $message, 'DocumentMissingException' ) === false ) { |
586 | $justDocumentMissing = false; |
587 | continue; |
588 | } |
589 | } else { |
590 | // es 2.x cluster |
591 | if ( $error !== null && $error['type'] !== 'document_missing_exception' ) { |
592 | $justDocumentMissing = false; |
593 | continue; |
594 | } |
595 | } |
596 | |
597 | if ( $logCallback ) { |
598 | // This is generally not an error but we should |
599 | // log it to see how many we get |
600 | $action = $bulkResponse->getAction(); |
601 | $docId = 'missing'; |
602 | if ( $action instanceof \Elastica\Bulk\Action\AbstractDocument ) { |
603 | $docId = $action->getData()->getId(); |
604 | } |
605 | call_user_func( $logCallback, $docId ); |
606 | } |
607 | } |
608 | return $justDocumentMissing; |
609 | } |
610 | |
611 | /** |
612 | * @param string $description |
613 | * @param string $queryType |
614 | * @param string[] $extra |
615 | * @return SearchRequestLog |
616 | */ |
617 | protected function newLog( $description, $queryType, array $extra = [] ) { |
618 | return new SearchRequestLog( |
619 | $this->connection->getClient(), |
620 | $description, |
621 | $queryType, |
622 | $extra |
623 | ); |
624 | } |
625 | |
626 | /** |
627 | * Converts a document into a call to super_detect_noop from the wikimedia-extra plugin. |
628 | * @param \Elastica\Document $doc |
629 | * @return \Elastica\Script\Script |
630 | * @internal made public for testing purposes |
631 | */ |
632 | public function docToSuperDetectNoopScript( \Elastica\Document $doc ) { |
633 | $handlers = CirrusIndexField::getHint( $doc, CirrusIndexField::NOOP_HINT ); |
634 | $params = array_diff_key( $doc->getParams(), [ CirrusIndexField::DOC_HINT_PARAM => 1 ] ); |
635 | |
636 | $params['source'] = $doc->getData(); |
637 | |
638 | if ( $handlers ) { |
639 | Assert::precondition( is_array( $handlers ), "Noop hints must be an array" ); |
640 | $params['handlers'] = $handlers; |
641 | } else { |
642 | $params['handlers'] = []; |
643 | } |
644 | $extraHandlers = $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop_handlers' ); |
645 | if ( is_array( $extraHandlers ) ) { |
646 | $params['handlers'] += $extraHandlers; |
647 | } |
648 | |
649 | if ( $params['handlers'] === [] ) { |
650 | // The noop script only supports Map but an empty array |
651 | // may be transformed to [] instead of {} when serialized to json |
652 | // causing class cast exception failures |
653 | $params['handlers'] = (object)[]; |
654 | } |
655 | $script = new \Elastica\Script\Script( 'super_detect_noop', $params, 'super_detect_noop' ); |
656 | if ( $doc->getDocAsUpsert() ) { |
657 | CirrusIndexField::resetHints( $doc ); |
658 | $script->setUpsert( $doc ); |
659 | } |
660 | |
661 | return $script; |
662 | } |
663 | |
664 | /** |
665 | * @return int Number of times to instruct Elasticsearch to retry updates that fail on |
666 | * version conflicts. |
667 | */ |
668 | private function retryOnConflict(): int { |
669 | return $this->searchConfig->get( |
670 | 'CirrusSearchUpdateConflictRetryCount' ); |
671 | } |
672 | |
673 | private function convertEncoding( $d ) { |
674 | if ( is_string( $d ) ) { |
675 | return mb_convert_encoding( $d, 'UTF-8', 'UTF-8' ); |
676 | } |
677 | |
678 | foreach ( $d as &$v ) { |
679 | $v = $this->convertEncoding( $v ); |
680 | } |
681 | |
682 | return $d; |
683 | } |
684 | |
685 | private function reportDocSize( Document $doc ): void { |
686 | $cluster = $this->connection->getClusterName(); |
687 | try { |
688 | // Use the same JSON output that Elastica uses, it might not be the options MW uses |
689 | // to populate event-gate (esp. regarding escaping UTF-8) but hopefully it's close |
690 | // to what we will be using. |
691 | $len = strlen( JSON::stringify( $doc->getData(), \JSON_UNESCAPED_UNICODE | \JSON_UNESCAPED_SLASHES ) ); |
692 | // Use a timing stat as we'd like to have percentiles calculated (possibly use T348796 once available) |
693 | // note that prior to switching to prometheus we used to have min and max, if that's proven to be still useful |
694 | // to track abnormally large docs we might consider another approach (log a warning?) |
695 | $this->stats->getTiming( "update_doc_size_kb" ) |
696 | ->setLabel( "search_cluster", $cluster ) |
697 | ->copyToStatsdAt( "CirrusSearch.$cluster.updates.all.doc_size" ) |
698 | ->observe( $len ); |
699 | |
700 | } catch ( \JsonException $e ) { |
701 | $this->log->warning( "Cannot estimate CirrusSearch doc size", [ "exception" => $e ] ); |
702 | } |
703 | } |
704 | |
705 | } |