Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
66.38% |
233 / 351 |
|
28.57% |
4 / 14 |
CRAP | |
0.00% |
0 / 1 |
DataSender | |
66.38% |
233 / 351 |
|
28.57% |
4 / 14 |
367.01 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
8 / 8 |
|
100.00% |
1 / 1 |
1 | |||
sendUpdateWeightedTags | |
78.95% |
60 / 76 |
|
0.00% |
0 / 1 |
25.11 | |||
sendResetWeightedTags | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
1 | |||
sendData | |
68.37% |
67 / 98 |
|
0.00% |
0 / 1 |
34.96 | |||
reportUpdateMetrics | |
0.00% |
0 / 20 |
|
0.00% |
0 / 1 |
56 | |||
sendDeletes | |
67.65% |
23 / 34 |
|
0.00% |
0 / 1 |
5.85 | |||
sendOtherIndexUpdates | |
75.56% |
34 / 45 |
|
0.00% |
0 / 1 |
7.72 | |||
decideRequiredSetAction | |
80.00% |
4 / 5 |
|
0.00% |
0 / 1 |
2.03 | |||
bulkResponseExceptionIsJustDocumentMissing | |
0.00% |
0 / 20 |
|
0.00% |
0 / 1 |
90 | |||
newLog | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
docToSuperDetectNoopScript | |
94.12% |
16 / 17 |
|
0.00% |
0 / 1 |
5.01 | |||
retryOnConflict | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
convertEncoding | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
12 | |||
reportDocSize | |
66.67% |
4 / 6 |
|
0.00% |
0 / 1 |
2.15 |
1 | <?php |
2 | |
3 | namespace CirrusSearch; |
4 | |
5 | use CirrusSearch\BuildDocument\BuildDocument; |
6 | use CirrusSearch\BuildDocument\BuildDocumentException; |
7 | use CirrusSearch\BuildDocument\DocumentSizeLimiter; |
8 | use CirrusSearch\Profile\SearchProfileService; |
9 | use CirrusSearch\Search\CirrusIndexField; |
10 | use Elastica\Bulk\Action\AbstractDocument; |
11 | use Elastica\Document; |
12 | use Elastica\Exception\Bulk\ResponseException; |
13 | use Elastica\Exception\RuntimeException; |
14 | use Elastica\JSON; |
15 | use Elastica\Response; |
16 | use IDBAccessObject; |
17 | use Liuggio\StatsdClient\Factory\StatsdDataFactory; |
18 | use MediaWiki\Logger\LoggerFactory; |
19 | use MediaWiki\MediaWikiServices; |
20 | use MediaWiki\Status\Status; |
21 | use MediaWiki\Title\Title; |
22 | use Wikimedia\Assert\Assert; |
23 | |
24 | /** |
25 | * Handles non-maintenance write operations to the elastic search cluster. |
26 | * |
27 | * This program is free software; you can redistribute it and/or modify |
28 | * it under the terms of the GNU General Public License as published by |
29 | * the Free Software Foundation; either version 2 of the License, or |
30 | * (at your option) any later version. |
31 | * |
32 | * This program is distributed in the hope that it will be useful, |
33 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
34 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
35 | * GNU General Public License for more details. |
36 | * |
37 | * You should have received a copy of the GNU General Public License along |
38 | * with this program; if not, write to the Free Software Foundation, Inc., |
39 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
40 | * http://www.gnu.org/copyleft/gpl.html |
41 | */ |
class DataSender extends ElasticsearchIntermediary {
	/** @var \Psr\Log\LoggerInterface General CirrusSearch log channel */
	private $log;

	/** @var \Psr\Log\LoggerInterface Log channel dedicated to failed write operations */
	private $failedLog;

	/**
	 * @var string Base name used to resolve the concrete elasticsearch index
	 */
	private $indexBaseName;

	/**
	 * @var SearchConfig
	 */
	private $searchConfig;

	/** @var StatsdDataFactory Metrics sink; always set by the constructor */
	private $stats;
	/**
	 * @var DocumentSizeLimiter Applies the configured document-size profile during finalization
	 */
	private $docSizeLimiter;
65 | /** |
66 | * @param Connection $conn |
67 | * @param SearchConfig $config |
68 | * @param StatsdDataFactory|null $stats |
69 | * @param DocumentSizeLimiter|null $docSizeLimiter |
70 | */ |
71 | public function __construct( |
72 | Connection $conn, |
73 | SearchConfig $config, |
74 | StatsdDataFactory $stats = null, |
75 | DocumentSizeLimiter $docSizeLimiter = null |
76 | ) { |
77 | parent::__construct( $conn, null, 0 ); |
78 | $this->stats = $stats ?? MediaWikiServices::getInstance()->getStatsdDataFactory(); |
79 | $this->log = LoggerFactory::getInstance( 'CirrusSearch' ); |
80 | $this->failedLog = LoggerFactory::getInstance( 'CirrusSearchChangeFailed' ); |
81 | $this->indexBaseName = $config->get( SearchConfig::INDEX_BASE_NAME ); |
82 | $this->searchConfig = $config; |
83 | $this->docSizeLimiter = $docSizeLimiter ?? new DocumentSizeLimiter( |
84 | $config->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) ); |
85 | } |
86 | |
87 | /** |
88 | * @param string $indexSuffix |
89 | * @param string[] $docIds |
90 | * @param string $tagField |
91 | * @param string $tagPrefix |
92 | * @param string|string[]|null $tagNames A tag name or list of tag names. Each tag will be |
93 | * set for each document ID. Omit for tags which are fully defined by their prefix. |
94 | * @param int[]|int[][]|null $tagWeights An optional map of docid => weight. When $tagName is |
95 | * null, the weight is an integer. When $tagName is not null, the weight is itself a |
96 | * tagname => int map. Weights are between 1-1000, and can be omitted (in which case no |
97 | * update will be sent for the corresponding docid/tag combination). |
98 | * @param int $batchSize |
99 | * @return Status |
100 | */ |
101 | public function sendUpdateWeightedTags( |
102 | string $indexSuffix, |
103 | array $docIds, |
104 | string $tagField, |
105 | string $tagPrefix, |
106 | $tagNames = null, |
107 | array $tagWeights = null, |
108 | int $batchSize = 30 |
109 | ): Status { |
110 | Assert::parameterType( [ 'string', 'array', 'null' ], $tagNames, '$tagNames' ); |
111 | if ( is_array( $tagNames ) ) { |
112 | Assert::parameterElementType( 'string', $tagNames, '$tagNames' ); |
113 | } |
114 | if ( $tagNames === null ) { |
115 | $tagNames = [ 'exists' ]; |
116 | if ( $tagWeights !== null ) { |
117 | $tagWeights = [ 'exists' => $tagWeights ]; |
118 | } |
119 | } elseif ( is_string( $tagNames ) ) { |
120 | $tagNames = [ $tagNames ]; |
121 | } |
122 | |
123 | Assert::precondition( strpos( $tagPrefix, '/' ) === false, |
124 | "invalid tag prefix $tagPrefix: must not contain /" ); |
125 | foreach ( $tagNames as $tagName ) { |
126 | Assert::precondition( strpos( $tagName, '|' ) === false, |
127 | "invalid tag name $tagName: must not contain |" ); |
128 | } |
129 | if ( $tagWeights ) { |
130 | foreach ( $tagWeights as $docId => $docWeights ) { |
131 | Assert::precondition( in_array( $docId, $docIds ), |
132 | "document ID $docId used in \$tagWeights but not found in \$docIds" ); |
133 | foreach ( $docWeights as $tagName => $weight ) { |
134 | Assert::precondition( in_array( $tagName, $tagNames, true ), |
135 | "tag name $tagName used in \$tagWeights but not found in \$tagNames" ); |
136 | Assert::precondition( is_int( $weight ), "weights must be integers but $weight is " |
137 | . gettype( $weight ) ); |
138 | Assert::precondition( $weight >= 1 && $weight <= 1000, |
139 | "weights must be between 1 and 1000 (found: $weight)" ); |
140 | } |
141 | } |
142 | } |
143 | |
144 | $client = $this->connection->getClient(); |
145 | $status = Status::newGood(); |
146 | $pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix ); |
147 | foreach ( array_chunk( $docIds, $batchSize ) as $docIdsChunk ) { |
148 | $bulk = new \Elastica\Bulk( $client ); |
149 | $bulk->setIndex( $pageIndex ); |
150 | foreach ( $docIdsChunk as $docId ) { |
151 | $tags = []; |
152 | foreach ( $tagNames as $tagName ) { |
153 | $tagValue = "$tagPrefix/$tagName"; |
154 | if ( $tagWeights !== null ) { |
155 | if ( !isset( $tagWeights[$docId][$tagName] ) ) { |
156 | continue; |
157 | } |
158 | // To pass the assertions above, the weight must be either an int, a float |
159 | // with an integer value, or a string representation of one of those. |
160 | // Convert to int to ensure there is no trailing '.0'. |
161 | $tagValue .= '|' . (int)$tagWeights[$docId][$tagName]; |
162 | } |
163 | $tags[] = $tagValue; |
164 | } |
165 | if ( !$tags ) { |
166 | continue; |
167 | } |
168 | $script = new \Elastica\Script\Script( 'super_detect_noop', [ |
169 | 'source' => [ $tagField => $tags ], |
170 | 'handlers' => [ $tagField => CirrusIndexField::MULTILIST_HANDLER ], |
171 | ], 'super_detect_noop' ); |
172 | $script->setId( $docId ); |
173 | $bulk->addScript( $script, 'update' ); |
174 | } |
175 | |
176 | if ( !$bulk->getActions() ) { |
177 | continue; |
178 | } |
179 | |
180 | // Execute the bulk update |
181 | $exception = null; |
182 | try { |
183 | $this->start( new BulkUpdateRequestLog( $this->connection->getClient(), |
184 | 'updating {numBulk} documents', |
185 | 'send_data_reset_weighted_tags', |
186 | [ 'numBulk' => count( $docIdsChunk ), 'index' => $pageIndex->getName() ] |
187 | ) ); |
188 | $bulk->send(); |
189 | } catch ( ResponseException $e ) { |
190 | if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) { |
191 | $exception = $e; |
192 | } |
193 | } catch ( \Elastica\Exception\ExceptionInterface $e ) { |
194 | $exception = $e; |
195 | } |
196 | if ( $exception === null ) { |
197 | $this->success(); |
198 | } else { |
199 | $this->failure( $exception ); |
200 | $this->failedLog->warning( "Update weighted tag {weightedTagFieldName} for {weightedTagPrefix} in articles: {documents}", |
201 | [ |
202 | 'exception' => $exception, |
203 | 'weightedTagFieldName' => $tagField, |
204 | 'weightedTagPrefix' => $tagPrefix, |
205 | 'weightedTagNames' => implode( '|', $tagNames ), |
206 | 'weightedTagWeight' => $tagWeights, |
207 | 'docIds' => implode( ',', $docIds ) |
208 | ] ); |
209 | $status->error( 'cirrussearch-failed-update-weighted-tags' ); |
210 | } |
211 | } |
212 | return $status; |
213 | } |
214 | |
215 | /** |
216 | * @param string $indexSuffix |
217 | * @param string[] $docIds |
218 | * @param string $tagField |
219 | * @param string $tagPrefix |
220 | * @param int $batchSize |
221 | * @return Status |
222 | */ |
223 | public function sendResetWeightedTags( |
224 | string $indexSuffix, |
225 | array $docIds, |
226 | string $tagField, |
227 | string $tagPrefix, |
228 | int $batchSize = 30 |
229 | ): Status { |
230 | return $this->sendUpdateWeightedTags( |
231 | $indexSuffix, |
232 | $docIds, |
233 | $tagField, |
234 | $tagPrefix, |
235 | CirrusIndexField::MULTILIST_DELETE_GROUPING, |
236 | null, |
237 | $batchSize |
238 | ); |
239 | } |
240 | |
	/**
	 * Finalize and send a batch of documents to one elasticsearch index.
	 *
	 * @param string $indexSuffix suffix of index to which to send $documents
	 * @param \Elastica\Document[] $documents documents to send
	 * @return Status good on success (including "nothing to do"), fatal on
	 *  build or send failure
	 */
	public function sendData( $indexSuffix, array $documents ) {
		if ( !$documents ) {
			return Status::newGood();
		}

		// Copy the docs so that modifications made in this method are not propagated up to the caller
		$docsCopy = [];
		foreach ( $documents as $doc ) {
			$docsCopy[] = clone $doc;
		}
		$documents = $docsCopy;

		// Perform final stage of document building. This only
		// applies to `page` documents, docs built by something
		// other than BuildDocument will pass through unchanged.
		$services = MediaWikiServices::getInstance();
		$builder = new BuildDocument(
			$this->connection,
			$services->getDBLoadBalancer()->getConnection( DB_REPLICA ),
			$services->getRevisionStore(),
			$services->getBacklinkCacheFactory(),
			$this->docSizeLimiter,
			$services->getTitleFormatter(),
			$services->getWikiPageFactory()
		);
		try {
			foreach ( $documents as $i => $doc ) {
				if ( !$builder->finalize( $doc ) ) {
					// Something has changed while this was hanging out in the job
					// queue and should no longer be written to elastic.
					unset( $documents[$i] );
				}
				// Track serialized doc size even for docs that were dropped above.
				$this->reportDocSize( $doc );
			}
		} catch ( BuildDocumentException $be ) {
			$this->failedLog->warning(
				'Failed to update documents',
				[ 'exception' => $be ]
			);
			return Status::newFatal( 'cirrussearch-failed-build-document' );
		}

		if ( !$documents ) {
			// All documents noop'd
			return Status::newGood();
		}

		/**
		 * Transform the finalized documents into noop scripts if possible
		 * to reduce update load.
		 */
		if ( $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop' ) ) {
			foreach ( $documents as $i => $doc ) {
				// BC Check for jobs that used to contain Document|Script
				if ( $doc instanceof \Elastica\Document ) {
					$documents[$i] = $this->docToSuperDetectNoopScript( $doc );
				}
			}
		}

		foreach ( $documents as $doc ) {
			$doc->setRetryOnConflict( $this->retryOnConflict() );
			// Hints need to be retained until after finalizing
			// the documents and building the noop scripts.
			CirrusIndexField::resetHints( $doc );
		}

		$exception = null;
		$responseSet = null;
		$justDocumentMissing = false;
		try {
			$pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );

			$this->start( new BulkUpdateRequestLog(
				$this->connection->getClient(),
				'sending {numBulk} documents to the {index} index(s)',
				'send_data_write',
				[ 'numBulk' => count( $documents ), 'index' => $pageIndex->getName() ]
			) );
			$bulk = new \Elastica\Bulk( $this->connection->getClient() );
			$bulk->setShardTimeout( $this->searchConfig->get( 'CirrusSearchUpdateShardTimeout' ) );
			$bulk->setIndex( $pageIndex );
			if ( $this->searchConfig->getElement( 'CirrusSearchElasticQuirks', 'retry_on_conflict' ) ) {
				$actions = [];
				foreach ( $documents as $doc ) {
					$action = AbstractDocument::create( $doc, 'update' );
					$metadata = $action->getMetadata();
					// Rename deprecated _retry_on_conflict
					// TODO: fix upstream in Elastica.
					if ( isset( $metadata['_retry_on_conflict'] ) ) {
						$metadata['retry_on_conflict'] = $metadata['_retry_on_conflict'];
						unset( $metadata['_retry_on_conflict'] );
						$action->setMetadata( $metadata );
					}
					$actions[] = $action;
				}

				$bulk->addActions( $actions );
			} else {
				$bulk->addData( $documents, 'update' );
			}
			$responseSet = $bulk->send();
		} catch ( ResponseException $e ) {
			$justDocumentMissing = $this->bulkResponseExceptionIsJustDocumentMissing( $e,
				function ( $docId ) use ( $e, $indexSuffix ) {
					$this->log->info(
						"Updating a page that doesn't yet exist in Elasticsearch: {docId}",
						[ 'docId' => $docId, 'indexSuffix' => $indexSuffix ]
					);
				}
			);
			$exception = $e;
		} catch ( \Elastica\Exception\ExceptionInterface $e ) {
			$exception = $e;
		}

		if ( $justDocumentMissing ) {
			// we have a failure but this is just docs that are missing in the index;
			// missing docs are logged above
			$this->success();
			return Status::newGood();
		}
		// check if the response is valid by making sure that it has bulk responses
		if ( $responseSet !== null && count( $responseSet->getBulkResponses() ) > 0 ) {
			$this->success();
			$this->reportUpdateMetrics( $responseSet, $indexSuffix, count( $documents ) );
			return Status::newGood();
		}
		// Everything else should be a failure.
		if ( $exception === null ) {
			// Elastica failed to identify the error, reason is that the Elastica Bulk\Response
			// does identify errors only in individual responses if the request fails without
			// getting a formal elastic response Bulk\Response->isOk might remain true
			// So here we construct the ResponseException that should have been built and thrown
			// by Elastica
			$lastRequest = $this->connection->getClient()->getLastRequest();
			if ( $lastRequest !== null ) {
				$exception = new \Elastica\Exception\ResponseException( $lastRequest,
					new Response( $responseSet->getData() ) );
			} else {
				$exception = new RuntimeException( "Unknown error in bulk request (Client::getLastRequest() is null)" );
			}
		}
		$this->failure( $exception );
		$documentIds = array_map( static function ( $d ) {
			return (string)( $d->getId() );
		}, $documents );
		$this->failedLog->warning(
			'Failed to update documents {docId}',
			[
				'docId' => implode( ', ', $documentIds ),
				'exception' => $exception
			]
		);
		return Status::newFatal( 'cirrussearch-failed-send-data' );
	}
402 | |
403 | /** |
404 | * @param \Elastica\Bulk\ResponseSet $responseSet |
405 | * @param string $indexSuffix |
406 | * @param int $sent |
407 | */ |
408 | private function reportUpdateMetrics( |
409 | \Elastica\Bulk\ResponseSet $responseSet, $indexSuffix, $sent |
410 | ) { |
411 | $updateStats = [ |
412 | 'sent' => $sent, |
413 | ]; |
414 | $allowedOps = [ 'created', 'updated', 'noop' ]; |
415 | foreach ( $responseSet->getBulkResponses() as $bulk ) { |
416 | $opRes = 'unknown'; |
417 | if ( $bulk instanceof \Elastica\Bulk\Response ) { |
418 | if ( isset( $bulk->getData()['result'] ) |
419 | && in_array( $bulk->getData()['result'], $allowedOps ) |
420 | ) { |
421 | $opRes = $bulk->getData()['result']; |
422 | } |
423 | } |
424 | if ( isset( $updateStats[$opRes] ) ) { |
425 | $updateStats[$opRes]++; |
426 | } else { |
427 | $updateStats[$opRes] = 1; |
428 | } |
429 | } |
430 | $cluster = $this->connection->getClusterName(); |
431 | $metricsPrefix = "CirrusSearch.$cluster.updates"; |
432 | foreach ( $updateStats as $what => $num ) { |
433 | $this->stats->updateCount( |
434 | "$metricsPrefix.details.{$this->indexBaseName}.$indexSuffix.$what", $num |
435 | ); |
436 | $this->stats->updateCount( "$metricsPrefix.all.$what", $num ); |
437 | } |
438 | } |
439 | |
440 | /** |
441 | * Send delete requests to Elasticsearch. |
442 | * |
443 | * @param string[] $docIds elasticsearch document ids to delete |
444 | * @param string|null $indexSuffix index from which to delete. null means all. |
445 | * @return Status |
446 | */ |
447 | public function sendDeletes( $docIds, $indexSuffix = null ) { |
448 | if ( $indexSuffix === null ) { |
449 | $indexes = $this->connection->getAllIndexSuffixes( Connection::PAGE_DOC_TYPE ); |
450 | } else { |
451 | $indexes = [ $indexSuffix ]; |
452 | } |
453 | |
454 | $idCount = count( $docIds ); |
455 | if ( $idCount !== 0 ) { |
456 | try { |
457 | foreach ( $indexes as $indexSuffix ) { |
458 | $this->startNewLog( |
459 | 'deleting {numIds} from {indexSuffix}', |
460 | 'send_deletes', [ |
461 | 'numIds' => $idCount, |
462 | 'indexSuffix' => $indexSuffix, |
463 | ] |
464 | ); |
465 | $this->connection |
466 | ->getIndex( $this->indexBaseName, $indexSuffix ) |
467 | ->deleteDocuments( |
468 | array_map( |
469 | static function ( $id ) { |
470 | return new Document( $id ); |
471 | }, $docIds |
472 | ) |
473 | ); |
474 | $this->success(); |
475 | } |
476 | } catch ( \Elastica\Exception\ExceptionInterface $e ) { |
477 | $this->failure( $e ); |
478 | $this->failedLog->warning( |
479 | 'Failed to delete documents: {docId}', |
480 | [ |
481 | 'docId' => implode( ', ', $docIds ), |
482 | 'exception' => $e, |
483 | ] |
484 | ); |
485 | return Status::newFatal( 'cirrussearch-failed-send-deletes' ); |
486 | } |
487 | } |
488 | |
489 | return Status::newGood(); |
490 | } |
491 | |
	/**
	 * Add or remove $localSite from the local_sites_with_dupe set of documents
	 * living in another wiki's index, depending on whether the corresponding
	 * title still exists locally.
	 *
	 * @param string $localSite The wikiId to add/remove from local_sites_with_dupe
	 * @param string $indexName The name of the index to perform updates to
	 * @param array[] $otherActions A list of arrays each containing the id within elasticsearch
	 *   ('docId') and the article namespace ('ns') and DB key ('dbKey') at the within $localSite
	 * @param int $batchSize number of docs to update in a single bulk
	 * @return Status good when every batch succeeded; accumulates an error per failed batch
	 */
	public function sendOtherIndexUpdates( $localSite, $indexName, array $otherActions, $batchSize = 30 ) {
		$client = $this->connection->getClient();
		$status = Status::newGood();
		foreach ( array_chunk( $otherActions, $batchSize ) as $updates ) {
			'@phan-var array[] $updates';
			$bulk = new \Elastica\Bulk( $client );
			$titles = [];
			foreach ( $updates as $update ) {
				$title = Title::makeTitle( $update['ns'], $update['dbKey'] );
				// 'add' when the title exists locally, 'remove' otherwise
				$action = $this->decideRequiredSetAction( $title );
				$script = new \Elastica\Script\Script(
					'super_detect_noop',
					[
						'source' => [
							'local_sites_with_dupe' => [ $action => $localSite ],
						],
						'handlers' => [ 'local_sites_with_dupe' => 'set' ],
					],
					'super_detect_noop'
				);
				$script->setId( $update['docId'] );
				$script->setParam( '_type', '_doc' );
				$script->setParam( '_index', $indexName );
				$bulk->addScript( $script, 'update' );
				$titles[] = $title;
			}

			// Execute the bulk update
			$exception = null;
			try {
				$this->start( new BulkUpdateRequestLog(
					$this->connection->getClient(),
					'updating {numBulk} documents in other indexes',
					'send_data_other_idx_write',
					[ 'numBulk' => count( $updates ), 'index' => $indexName ]
				) );
				$bulk->send();
			} catch ( ResponseException $e ) {
				// Documents missing from the other index are not treated as errors.
				if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
					$exception = $e;
				}
			} catch ( \Elastica\Exception\ExceptionInterface $e ) {
				$exception = $e;
			}
			if ( $exception === null ) {
				$this->success();
			} else {
				$this->failure( $exception );
				$this->failedLog->warning(
					"OtherIndex update for articles: {titleStr}",
					[ 'exception' => $exception, 'titleStr' => implode( ',', $titles ) ]
				);
				$status->error( 'cirrussearch-failed-update-otherindex' );
			}
		}

		return $status;
	}
558 | |
559 | /** |
560 | * Decide what action is required to the other index to make it up |
561 | * to data with the current wiki state. This will always check against |
562 | * the master database. |
563 | * |
564 | * @param Title $title The title to decide the action for |
565 | * @return string The set action to be performed. Either 'add' or 'remove' |
566 | */ |
567 | protected function decideRequiredSetAction( Title $title ) { |
568 | $page = MediaWikiServices::getInstance()->getWikiPageFactory()->newFromTitle( $title ); |
569 | $page->loadPageData( IDBAccessObject::READ_LATEST ); |
570 | if ( $page->exists() ) { |
571 | return 'add'; |
572 | } else { |
573 | return 'remove'; |
574 | } |
575 | } |
576 | |
577 | /** |
578 | * Check if $exception is a bulk response exception that just contains |
579 | * document is missing failures. |
580 | * |
581 | * @param ResponseException $exception exception to check |
582 | * @param callable|null $logCallback Callback in which to do some logging. |
583 | * Callback will be passed the id of the missing document. |
584 | * @return bool |
585 | */ |
586 | protected function bulkResponseExceptionIsJustDocumentMissing( |
587 | ResponseException $exception, $logCallback = null |
588 | ) { |
589 | $justDocumentMissing = true; |
590 | foreach ( $exception->getResponseSet()->getBulkResponses() as $bulkResponse ) { |
591 | if ( !$bulkResponse->hasError() ) { |
592 | continue; |
593 | } |
594 | |
595 | $error = $bulkResponse->getFullError(); |
596 | if ( is_string( $error ) ) { |
597 | // es 1.7 cluster |
598 | $message = $bulkResponse->getError(); |
599 | if ( strpos( $message, 'DocumentMissingException' ) === false ) { |
600 | $justDocumentMissing = false; |
601 | continue; |
602 | } |
603 | } else { |
604 | // es 2.x cluster |
605 | if ( $error !== null && $error['type'] !== 'document_missing_exception' ) { |
606 | $justDocumentMissing = false; |
607 | continue; |
608 | } |
609 | } |
610 | |
611 | if ( $logCallback ) { |
612 | // This is generally not an error but we should |
613 | // log it to see how many we get |
614 | $action = $bulkResponse->getAction(); |
615 | $docId = 'missing'; |
616 | if ( $action instanceof \Elastica\Bulk\Action\AbstractDocument ) { |
617 | $docId = $action->getData()->getId(); |
618 | } |
619 | call_user_func( $logCallback, $docId ); |
620 | } |
621 | } |
622 | return $justDocumentMissing; |
623 | } |
624 | |
625 | /** |
626 | * @param string $description |
627 | * @param string $queryType |
628 | * @param string[] $extra |
629 | * @return SearchRequestLog |
630 | */ |
631 | protected function newLog( $description, $queryType, array $extra = [] ) { |
632 | return new SearchRequestLog( |
633 | $this->connection->getClient(), |
634 | $description, |
635 | $queryType, |
636 | $extra |
637 | ); |
638 | } |
639 | |
640 | /** |
641 | * Converts a document into a call to super_detect_noop from the wikimedia-extra plugin. |
642 | * @internal made public for testing purposes |
643 | * @param \Elastica\Document $doc |
644 | * @return \Elastica\Script\Script |
645 | */ |
646 | public function docToSuperDetectNoopScript( \Elastica\Document $doc ) { |
647 | $handlers = CirrusIndexField::getHint( $doc, CirrusIndexField::NOOP_HINT ); |
648 | $params = array_diff_key( $doc->getParams(), [ CirrusIndexField::DOC_HINT_PARAM => 1 ] ); |
649 | |
650 | $params['source'] = $doc->getData(); |
651 | |
652 | if ( $handlers ) { |
653 | Assert::precondition( is_array( $handlers ), "Noop hints must be an array" ); |
654 | $params['handlers'] = $handlers; |
655 | } else { |
656 | $params['handlers'] = []; |
657 | } |
658 | $extraHandlers = $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop_handlers' ); |
659 | if ( is_array( $extraHandlers ) ) { |
660 | $params['handlers'] += $extraHandlers; |
661 | } |
662 | |
663 | if ( $params['handlers'] === [] ) { |
664 | // The noop script only supports Map but an empty array |
665 | // may be transformed to [] instead of {} when serialized to json |
666 | // causing class cast exception failures |
667 | $params['handlers'] = (object)[]; |
668 | } |
669 | $script = new \Elastica\Script\Script( 'super_detect_noop', $params, 'super_detect_noop' ); |
670 | if ( $doc->getDocAsUpsert() ) { |
671 | CirrusIndexField::resetHints( $doc ); |
672 | $script->setUpsert( $doc ); |
673 | } |
674 | |
675 | return $script; |
676 | } |
677 | |
678 | /** |
679 | * @return int Number of times to instruct Elasticsearch to retry updates that fail on |
680 | * version conflicts. |
681 | */ |
682 | private function retryOnConflict(): int { |
683 | return $this->searchConfig->get( |
684 | 'CirrusSearchUpdateConflictRetryCount' ); |
685 | } |
686 | |
687 | private function convertEncoding( $d ) { |
688 | if ( is_string( $d ) ) { |
689 | return mb_convert_encoding( $d, 'UTF-8', 'UTF-8' ); |
690 | } |
691 | |
692 | foreach ( $d as &$v ) { |
693 | $v = $this->convertEncoding( $v ); |
694 | } |
695 | |
696 | return $d; |
697 | } |
698 | |
699 | private function reportDocSize( Document $doc ): void { |
700 | $cluster = $this->connection->getClusterName(); |
701 | $metricsPrefix = "CirrusSearch.$cluster.updates.all.doc_size"; |
702 | try { |
703 | // Use the same JSON output that Elastica uses, it might be the options MW uses |
704 | // to populate event-gate (esp. regarding escaping UTF-8) but hopefully it's close |
705 | // to what we will be using. |
706 | $len = strlen( JSON::stringify( $doc->getData(), \JSON_UNESCAPED_UNICODE | \JSON_UNESCAPED_SLASHES ) ); |
707 | // Use a timing stat as we'd like to have some min, max and percentiles calculated |
708 | $this->stats->timing( $metricsPrefix, $len ); |
709 | } catch ( \JsonException $e ) { |
710 | $this->log->warning( "Cannot estimate CirrusSearch doc size", [ "exception" => $e ] ); |
711 | } |
712 | } |
713 | |
714 | } |