Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
65.64% covered (warning)
65.64%
235 / 358
28.57% covered (danger)
28.57%
4 / 14
CRAP
0.00% covered (danger)
0.00%
0 / 1
DataSender
65.64% covered (warning)
65.64%
235 / 358
28.57% covered (danger)
28.57%
4 / 14
385.96
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
1
 sendUpdateWeightedTags
78.95% covered (warning)
78.95%
60 / 76
0.00% covered (danger)
0.00%
0 / 1
25.11
 sendResetWeightedTags
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
1
 sendData
68.37% covered (warning)
68.37%
67 / 98
0.00% covered (danger)
0.00%
0 / 1
34.96
 reportUpdateMetrics
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
56
 sendDeletes
67.65% covered (warning)
67.65%
23 / 34
0.00% covered (danger)
0.00%
0 / 1
5.85
 sendOtherIndexUpdates
75.56% covered (warning)
75.56%
34 / 45
0.00% covered (danger)
0.00%
0 / 1
7.72
 decideRequiredSetAction
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
2.03
 bulkResponseExceptionIsJustDocumentMissing
0.00% covered (danger)
0.00%
0 / 20
0.00% covered (danger)
0.00%
0 / 1
90
 newLog
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 docToSuperDetectNoopScript
94.12% covered (success)
94.12%
16 / 17
0.00% covered (danger)
0.00%
0 / 1
5.01
 retryOnConflict
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 convertEncoding
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 reportDocSize
75.00% covered (warning)
75.00%
6 / 8
0.00% covered (danger)
0.00%
0 / 1
2.06
1<?php
2
3namespace CirrusSearch;
4
5use CirrusSearch\BuildDocument\BuildDocument;
6use CirrusSearch\BuildDocument\BuildDocumentException;
7use CirrusSearch\BuildDocument\DocumentSizeLimiter;
8use CirrusSearch\Profile\SearchProfileService;
9use CirrusSearch\Search\CirrusIndexField;
10use Elastica\Bulk\Action\AbstractDocument;
11use Elastica\Document;
12use Elastica\Exception\Bulk\ResponseException;
13use Elastica\Exception\RuntimeException;
14use Elastica\JSON;
15use Elastica\Response;
16use IDBAccessObject;
17use MediaWiki\Logger\LoggerFactory;
18use MediaWiki\MediaWikiServices;
19use MediaWiki\Status\Status;
20use MediaWiki\Title\Title;
21use Wikimedia\Assert\Assert;
22use Wikimedia\Stats\StatsFactory;
23
24/**
25 * Handles non-maintenance write operations to the elastic search cluster.
26 *
27 * This program is free software; you can redistribute it and/or modify
28 * it under the terms of the GNU General Public License as published by
29 * the Free Software Foundation; either version 2 of the License, or
30 * (at your option) any later version.
31 *
32 * This program is distributed in the hope that it will be useful,
33 * but WITHOUT ANY WARRANTY; without even the implied warranty of
34 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
35 * GNU General Public License for more details.
36 *
37 * You should have received a copy of the GNU General Public License along
38 * with this program; if not, write to the Free Software Foundation, Inc.,
39 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
40 * http://www.gnu.org/copyleft/gpl.html
41 */
42class DataSender extends ElasticsearchIntermediary {
43    /** @var \Psr\Log\LoggerInterface */
44    private $log;
45
46    /** @var \Psr\Log\LoggerInterface */
47    private $failedLog;
48
49    /**
50     * @var string
51     */
52    private $indexBaseName;
53
54    /**
55     * @var SearchConfig
56     */
57    private $searchConfig;
58
59    private StatsFactory $stats;
60    /**
61     * @var DocumentSizeLimiter
62     */
63    private $docSizeLimiter;
64
65    /**
66     * @param Connection $conn
67     * @param SearchConfig $config
68     * @param StatsFactory|null $stats A StatsFactory (already prefixed with the right component)
69     * @param DocumentSizeLimiter|null $docSizeLimiter
70     */
71    public function __construct(
72        Connection $conn,
73        SearchConfig $config,
74        StatsFactory $stats = null,
75        DocumentSizeLimiter $docSizeLimiter = null
76    ) {
77        parent::__construct( $conn, null, 0 );
78        $this->stats = $stats ?? Util::getStatsFactory();
79        $this->log = LoggerFactory::getInstance( 'CirrusSearch' );
80        $this->failedLog = LoggerFactory::getInstance( 'CirrusSearchChangeFailed' );
81        $this->indexBaseName = $config->get( SearchConfig::INDEX_BASE_NAME );
82        $this->searchConfig = $config;
83        $this->docSizeLimiter = $docSizeLimiter ?? new DocumentSizeLimiter(
84            $config->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
85    }
86
87    /**
88     * @param string $indexSuffix
89     * @param string[] $docIds
90     * @param string $tagField
91     * @param string $tagPrefix
92     * @param string|string[]|null $tagNames A tag name or list of tag names. Each tag will be
93     *   set for each document ID. Omit for tags which are fully defined by their prefix.
94     * @param int[]|int[][]|null $tagWeights An optional map of docid => weight. When $tagName is
95     *   null, the weight is an integer. When $tagName is not null, the weight is itself a
96     *   tagname => int map. Weights are between 1-1000, and can be omitted (in which case no
97     *   update will be sent for the corresponding docid/tag combination).
98     * @param int $batchSize
99     * @return Status
100     */
101    public function sendUpdateWeightedTags(
102        string $indexSuffix,
103        array $docIds,
104        string $tagField,
105        string $tagPrefix,
106        $tagNames = null,
107        array $tagWeights = null,
108        int $batchSize = 30
109    ): Status {
110        Assert::parameterType( [ 'string', 'array', 'null' ], $tagNames, '$tagNames' );
111        if ( is_array( $tagNames ) ) {
112            Assert::parameterElementType( 'string', $tagNames, '$tagNames' );
113        }
114        if ( $tagNames === null ) {
115            $tagNames = [ 'exists' ];
116            if ( $tagWeights !== null ) {
117                $tagWeights = [ 'exists' => $tagWeights ];
118            }
119        } elseif ( is_string( $tagNames ) ) {
120            $tagNames = [ $tagNames ];
121        }
122
123        Assert::precondition( strpos( $tagPrefix, '/' ) === false,
124            "invalid tag prefix $tagPrefix: must not contain /" );
125        foreach ( $tagNames as $tagName ) {
126            Assert::precondition( strpos( $tagName, '|' ) === false,
127                "invalid tag name $tagName: must not contain |" );
128        }
129        if ( $tagWeights ) {
130            foreach ( $tagWeights as $docId => $docWeights ) {
131                Assert::precondition( in_array( $docId, $docIds ),
132                    "document ID $docId used in \$tagWeights but not found in \$docIds" );
133                foreach ( $docWeights as $tagName => $weight ) {
134                    Assert::precondition( in_array( $tagName, $tagNames, true ),
135                        "tag name $tagName used in \$tagWeights but not found in \$tagNames" );
136                    Assert::precondition( is_int( $weight ), "weights must be integers but $weight is "
137                        . gettype( $weight ) );
138                    Assert::precondition( $weight >= 1 && $weight <= 1000,
139                        "weights must be between 1 and 1000 (found: $weight)" );
140                }
141            }
142        }
143
144        $client = $this->connection->getClient();
145        $status = Status::newGood();
146        $pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );
147        foreach ( array_chunk( $docIds, $batchSize ) as $docIdsChunk ) {
148            $bulk = new \Elastica\Bulk( $client );
149            $bulk->setIndex( $pageIndex );
150            foreach ( $docIdsChunk as $docId ) {
151                $tags = [];
152                foreach ( $tagNames as $tagName ) {
153                    $tagValue = "$tagPrefix/$tagName";
154                    if ( $tagWeights !== null ) {
155                        if ( !isset( $tagWeights[$docId][$tagName] ) ) {
156                            continue;
157                        }
158                        // To pass the assertions above, the weight must be either an int, a float
159                        // with an integer value, or a string representation of one of those.
160                        // Convert to int to ensure there is no trailing '.0'.
161                        $tagValue .= '|' . (int)$tagWeights[$docId][$tagName];
162                    }
163                    $tags[] = $tagValue;
164                }
165                if ( !$tags ) {
166                    continue;
167                }
168                $script = new \Elastica\Script\Script( 'super_detect_noop', [
169                    'source' => [ $tagField => $tags ],
170                    'handlers' => [ $tagField => CirrusIndexField::MULTILIST_HANDLER ],
171                ], 'super_detect_noop' );
172                $script->setId( $docId );
173                $bulk->addScript( $script, 'update' );
174            }
175
176            if ( !$bulk->getActions() ) {
177                continue;
178            }
179
180            // Execute the bulk update
181            $exception = null;
182            try {
183                $this->start( new BulkUpdateRequestLog( $this->connection->getClient(),
184                    'updating {numBulk} documents',
185                    'send_data_reset_weighted_tags',
186                    [ 'numBulk' => count( $docIdsChunk ), 'index' => $pageIndex->getName() ]
187                ) );
188                $bulk->send();
189            } catch ( ResponseException $e ) {
190                if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
191                    $exception = $e;
192                }
193            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
194                $exception = $e;
195            }
196            if ( $exception === null ) {
197                $this->success();
198            } else {
199                $this->failure( $exception );
200                $this->failedLog->warning( "Update weighted tag {weightedTagFieldName} for {weightedTagPrefix} in articles: {documents}",
201                    [
202                        'exception' => $exception,
203                        'weightedTagFieldName' => $tagField,
204                        'weightedTagPrefix' => $tagPrefix,
205                        'weightedTagNames' => implode( '|', $tagNames ),
206                        'weightedTagWeight' => $tagWeights,
207                        'docIds' => implode( ',', $docIds )
208                    ] );
209                $status->error( 'cirrussearch-failed-update-weighted-tags' );
210            }
211        }
212        return $status;
213    }
214
215    /**
216     * @param string $indexSuffix
217     * @param string[] $docIds
218     * @param string $tagField
219     * @param string $tagPrefix
220     * @param int $batchSize
221     * @return Status
222     */
223    public function sendResetWeightedTags(
224        string $indexSuffix,
225        array $docIds,
226        string $tagField,
227        string $tagPrefix,
228        int $batchSize = 30
229    ): Status {
230        return $this->sendUpdateWeightedTags(
231            $indexSuffix,
232            $docIds,
233            $tagField,
234            $tagPrefix,
235            CirrusIndexField::MULTILIST_DELETE_GROUPING,
236            null,
237            $batchSize
238        );
239    }
240
241    /**
242     * @param string $indexSuffix suffix of index to which to send $documents
243     * @param \Elastica\Document[] $documents documents to send
244     * @return Status
245     */
246    public function sendData( $indexSuffix, array $documents ) {
247        if ( !$documents ) {
248            return Status::newGood();
249        }
250
251        // Copy the docs so that modifications made in this method are not propagated up to the caller
252        $docsCopy = [];
253        foreach ( $documents as $doc ) {
254            $docsCopy[] = clone $doc;
255        }
256        $documents = $docsCopy;
257
258        // Perform final stage of document building. This only
259        // applies to `page` documents, docs built by something
260        // other than BuildDocument will pass through unchanged.
261        $services = MediaWikiServices::getInstance();
262        $builder = new BuildDocument(
263            $this->connection,
264            $services->getConnectionProvider()->getReplicaDatabase(),
265            $services->getRevisionStore(),
266            $services->getBacklinkCacheFactory(),
267            $this->docSizeLimiter,
268            $services->getTitleFormatter(),
269            $services->getWikiPageFactory()
270        );
271        try {
272            foreach ( $documents as $i => $doc ) {
273                if ( !$builder->finalize( $doc ) ) {
274                    // Something has changed while this was hanging out in the job
275                    // queue and should no longer be written to elastic.
276                    unset( $documents[$i] );
277                }
278                $this->reportDocSize( $doc );
279            }
280        } catch ( BuildDocumentException $be ) {
281            $this->failedLog->warning(
282                'Failed to update documents',
283                [ 'exception' => $be ]
284            );
285            return Status::newFatal( 'cirrussearch-failed-build-document' );
286        }
287
288        if ( !$documents ) {
289            // All documents noop'd
290            return Status::newGood();
291        }
292
293        /**
294         * Transform the finalized documents into noop scripts if possible
295         * to reduce update load.
296         */
297        if ( $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop' ) ) {
298            foreach ( $documents as $i => $doc ) {
299                // BC Check for jobs that used to contain Document|Script
300                if ( $doc instanceof \Elastica\Document ) {
301                    $documents[$i] = $this->docToSuperDetectNoopScript( $doc );
302                }
303            }
304        }
305
306        foreach ( $documents as $doc ) {
307            $doc->setRetryOnConflict( $this->retryOnConflict() );
308            // Hints need to be retained until after finalizing
309            // the documents and building the noop scripts.
310            CirrusIndexField::resetHints( $doc );
311        }
312
313        $exception = null;
314        $responseSet = null;
315        $justDocumentMissing = false;
316        try {
317            $pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );
318
319            $this->start( new BulkUpdateRequestLog(
320                $this->connection->getClient(),
321                'sending {numBulk} documents to the {index} index(s)',
322                'send_data_write',
323                [ 'numBulk' => count( $documents ), 'index' => $pageIndex->getName() ]
324            ) );
325            $bulk = new \Elastica\Bulk( $this->connection->getClient() );
326            $bulk->setShardTimeout( $this->searchConfig->get( 'CirrusSearchUpdateShardTimeout' ) );
327            $bulk->setIndex( $pageIndex );
328            if ( $this->searchConfig->getElement( 'CirrusSearchElasticQuirks', 'retry_on_conflict' ) ) {
329                $actions = [];
330                foreach ( $documents as $doc ) {
331                    $action = AbstractDocument::create( $doc, 'update' );
332                    $metadata = $action->getMetadata();
333                    // Rename deprecated _retry_on_conflict
334                    // TODO: fix upstream in Elastica.
335                    if ( isset( $metadata['_retry_on_conflict'] ) ) {
336                        $metadata['retry_on_conflict'] = $metadata['_retry_on_conflict'];
337                        unset( $metadata['_retry_on_conflict'] );
338                        $action->setMetadata( $metadata );
339                    }
340                    $actions[] = $action;
341                }
342
343                $bulk->addActions( $actions );
344            } else {
345                $bulk->addData( $documents, 'update' );
346            }
347            $responseSet = $bulk->send();
348        } catch ( ResponseException $e ) {
349            $justDocumentMissing = $this->bulkResponseExceptionIsJustDocumentMissing( $e,
350                function ( $docId ) use ( $e, $indexSuffix ) {
351                    $this->log->info(
352                        "Updating a page that doesn't yet exist in Elasticsearch: {docId}",
353                        [ 'docId' => $docId, 'indexSuffix' => $indexSuffix ]
354                    );
355                }
356            );
357            $exception = $e;
358        } catch ( \Elastica\Exception\ExceptionInterface $e ) {
359            $exception = $e;
360        }
361
362        if ( $justDocumentMissing ) {
363            // wa have a failure but this is just docs that are missing in the index
364            // missing docs are logged above
365            $this->success();
366            return Status::newGood();
367        }
368        // check if the response is valid by making sure that it has bulk responses
369        if ( $responseSet !== null && count( $responseSet->getBulkResponses() ) > 0 ) {
370            $this->success();
371            $this->reportUpdateMetrics( $responseSet, $indexSuffix, count( $documents ) );
372            return Status::newGood();
373        }
374        // Everything else should be a failure.
375        if ( $exception === null ) {
376            // Elastica failed to identify the error, reason is that the Elastica Bulk\Response
377            // does identify errors only in individual responses if the request fails without
378            // getting a formal elastic response Bulk\Response->isOk might remain true
379            // So here we construct the ResponseException that should have been built and thrown
380            // by Elastica
381            $lastRequest = $this->connection->getClient()->getLastRequest();
382            if ( $lastRequest !== null ) {
383                $exception = new \Elastica\Exception\ResponseException( $lastRequest,
384                    new Response( $responseSet->getData() ) );
385            } else {
386                $exception = new RuntimeException( "Unknown error in bulk request (Client::getLastRequest() is null)" );
387            }
388        }
389        $this->failure( $exception );
390        $documentIds = array_map( static function ( $d ) {
391            return (string)( $d->getId() );
392        }, $documents );
393        $this->failedLog->warning(
394            'Failed to update documents {docId}',
395            [
396                'docId' => implode( ', ', $documentIds ),
397                'exception' => $exception
398            ]
399        );
400        return Status::newFatal( 'cirrussearch-failed-send-data' );
401    }
402
403    /**
404     * @param \Elastica\Bulk\ResponseSet $responseSet
405     * @param string $indexSuffix
406     * @param int $sent
407     */
408    private function reportUpdateMetrics(
409        \Elastica\Bulk\ResponseSet $responseSet, $indexSuffix, $sent
410    ) {
411        $updateStats = [
412            'sent' => $sent,
413        ];
414        $allowedOps = [ 'created', 'updated', 'noop' ];
415        foreach ( $responseSet->getBulkResponses() as $bulk ) {
416            $opRes = 'unknown';
417            if ( $bulk instanceof \Elastica\Bulk\Response ) {
418                if ( isset( $bulk->getData()['result'] )
419                    && in_array( $bulk->getData()['result'], $allowedOps )
420                ) {
421                    $opRes = $bulk->getData()['result'];
422                }
423            }
424            if ( isset( $updateStats[$opRes] ) ) {
425                $updateStats[$opRes]++;
426            } else {
427                $updateStats[$opRes] = 1;
428            }
429        }
430        $cluster = $this->connection->getClusterName();
431        $metricsPrefix = "CirrusSearch.$cluster.updates";
432        foreach ( $updateStats as $what => $num ) {
433            $this->stats->getCounter( "update_total" )
434                ->setLabel( "status", $what )
435                ->setLabel( "search_cluster", $cluster )
436                ->setLabel( "index_name", $indexSuffix )
437                ->copyToStatsdAt( [
438                    "$metricsPrefix.details.{$this->indexBaseName}.$indexSuffix.$what",
439                    "$metricsPrefix.all.$what"
440                ] )
441                ->incrementBy( $num );
442        }
443    }
444
445    /**
446     * Send delete requests to Elasticsearch.
447     *
448     * @param string[] $docIds elasticsearch document ids to delete
449     * @param string|null $indexSuffix index from which to delete.  null means all.
450     * @return Status
451     */
452    public function sendDeletes( $docIds, $indexSuffix = null ) {
453        if ( $indexSuffix === null ) {
454            $indexes = $this->connection->getAllIndexSuffixes( Connection::PAGE_DOC_TYPE );
455        } else {
456            $indexes = [ $indexSuffix ];
457        }
458
459        $idCount = count( $docIds );
460        if ( $idCount !== 0 ) {
461            try {
462                foreach ( $indexes as $indexSuffix ) {
463                    $this->startNewLog(
464                        'deleting {numIds} from {indexSuffix}',
465                        'send_deletes', [
466                            'numIds' => $idCount,
467                            'indexSuffix' => $indexSuffix,
468                        ]
469                    );
470                    $this->connection
471                        ->getIndex( $this->indexBaseName, $indexSuffix )
472                        ->deleteDocuments(
473                            array_map(
474                                static function ( $id ) {
475                                    return new Document( $id );
476                                }, $docIds
477                            )
478                        );
479                    $this->success();
480                }
481            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
482                $this->failure( $e );
483                $this->failedLog->warning(
484                    'Failed to delete documents: {docId}',
485                    [
486                        'docId' => implode( ', ', $docIds ),
487                        'exception' => $e,
488                    ]
489                );
490                return Status::newFatal( 'cirrussearch-failed-send-deletes' );
491            }
492        }
493
494        return Status::newGood();
495    }
496
497    /**
498     * @param string $localSite The wikiId to add/remove from local_sites_with_dupe
499     * @param string $indexName The name of the index to perform updates to
500     * @param array[] $otherActions A list of arrays each containing the id within elasticsearch
501     *   ('docId') and the article namespace ('ns') and DB key ('dbKey') at the within $localSite
502     * @param int $batchSize number of docs to update in a single bulk
503     * @return Status
504     */
505    public function sendOtherIndexUpdates( $localSite, $indexName, array $otherActions, $batchSize = 30 ) {
506        $client = $this->connection->getClient();
507        $status = Status::newGood();
508        foreach ( array_chunk( $otherActions, $batchSize ) as $updates ) {
509            '@phan-var array[] $updates';
510            $bulk = new \Elastica\Bulk( $client );
511            $titles = [];
512            foreach ( $updates as $update ) {
513                $title = Title::makeTitle( $update['ns'], $update['dbKey'] );
514                $action = $this->decideRequiredSetAction( $title );
515                $script = new \Elastica\Script\Script(
516                    'super_detect_noop',
517                    [
518                        'source' => [
519                            'local_sites_with_dupe' => [ $action => $localSite ],
520                        ],
521                        'handlers' => [ 'local_sites_with_dupe' => 'set' ],
522                    ],
523                    'super_detect_noop'
524                );
525                $script->setId( $update['docId'] );
526                $script->setParam( '_type', '_doc' );
527                $script->setParam( '_index', $indexName );
528                $bulk->addScript( $script, 'update' );
529                $titles[] = $title;
530            }
531
532            // Execute the bulk update
533            $exception = null;
534            try {
535                $this->start( new BulkUpdateRequestLog(
536                    $this->connection->getClient(),
537                    'updating {numBulk} documents in other indexes',
538                    'send_data_other_idx_write',
539                        [ 'numBulk' => count( $updates ), 'index' => $indexName ]
540                    ) );
541                $bulk->send();
542            } catch ( ResponseException $e ) {
543                if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
544                    $exception = $e;
545                }
546            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
547                $exception = $e;
548            }
549            if ( $exception === null ) {
550                $this->success();
551            } else {
552                $this->failure( $exception );
553                $this->failedLog->warning(
554                    "OtherIndex update for articles: {titleStr}",
555                    [ 'exception' => $exception, 'titleStr' => implode( ',', $titles ) ]
556                );
557                $status->error( 'cirrussearch-failed-update-otherindex' );
558            }
559        }
560
561        return $status;
562    }
563
564    /**
565     * Decide what action is required to the other index to make it up
566     * to data with the current wiki state. This will always check against
567     * the master database.
568     *
569     * @param Title $title The title to decide the action for
570     * @return string The set action to be performed. Either 'add' or 'remove'
571     */
572    protected function decideRequiredSetAction( Title $title ) {
573        $page = MediaWikiServices::getInstance()->getWikiPageFactory()->newFromTitle( $title );
574        $page->loadPageData( IDBAccessObject::READ_LATEST );
575        if ( $page->exists() ) {
576            return 'add';
577        } else {
578            return 'remove';
579        }
580    }
581
582    /**
583     * Check if $exception is a bulk response exception that just contains
584     * document is missing failures.
585     *
586     * @param ResponseException $exception exception to check
587     * @param callable|null $logCallback Callback in which to do some logging.
588     *   Callback will be passed the id of the missing document.
589     * @return bool
590     */
591    protected function bulkResponseExceptionIsJustDocumentMissing(
592        ResponseException $exception, $logCallback = null
593    ) {
594        $justDocumentMissing = true;
595        foreach ( $exception->getResponseSet()->getBulkResponses() as $bulkResponse ) {
596            if ( !$bulkResponse->hasError() ) {
597                continue;
598            }
599
600            $error = $bulkResponse->getFullError();
601            if ( is_string( $error ) ) {
602                // es 1.7 cluster
603                $message = $bulkResponse->getError();
604                if ( strpos( $message, 'DocumentMissingException' ) === false ) {
605                    $justDocumentMissing = false;
606                    continue;
607                }
608            } else {
609                // es 2.x cluster
610                if ( $error !== null && $error['type'] !== 'document_missing_exception' ) {
611                    $justDocumentMissing = false;
612                    continue;
613                }
614            }
615
616            if ( $logCallback ) {
617                // This is generally not an error but we should
618                // log it to see how many we get
619                $action = $bulkResponse->getAction();
620                $docId = 'missing';
621                if ( $action instanceof \Elastica\Bulk\Action\AbstractDocument ) {
622                    $docId = $action->getData()->getId();
623                }
624                call_user_func( $logCallback, $docId );
625            }
626        }
627        return $justDocumentMissing;
628    }
629
630    /**
631     * @param string $description
632     * @param string $queryType
633     * @param string[] $extra
634     * @return SearchRequestLog
635     */
636    protected function newLog( $description, $queryType, array $extra = [] ) {
637        return new SearchRequestLog(
638            $this->connection->getClient(),
639            $description,
640            $queryType,
641            $extra
642        );
643    }
644
645    /**
646     * Converts a document into a call to super_detect_noop from the wikimedia-extra plugin.
647     * @internal made public for testing purposes
648     * @param \Elastica\Document $doc
649     * @return \Elastica\Script\Script
650     */
651    public function docToSuperDetectNoopScript( \Elastica\Document $doc ) {
652        $handlers = CirrusIndexField::getHint( $doc, CirrusIndexField::NOOP_HINT );
653        $params = array_diff_key( $doc->getParams(), [ CirrusIndexField::DOC_HINT_PARAM => 1 ] );
654
655        $params['source'] = $doc->getData();
656
657        if ( $handlers ) {
658            Assert::precondition( is_array( $handlers ), "Noop hints must be an array" );
659            $params['handlers'] = $handlers;
660        } else {
661            $params['handlers'] = [];
662        }
663        $extraHandlers = $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop_handlers' );
664        if ( is_array( $extraHandlers ) ) {
665            $params['handlers'] += $extraHandlers;
666        }
667
668        if ( $params['handlers'] === [] ) {
669            // The noop script only supports Map but an empty array
670            // may be transformed to [] instead of {} when serialized to json
671            // causing class cast exception failures
672            $params['handlers'] = (object)[];
673        }
674        $script = new \Elastica\Script\Script( 'super_detect_noop', $params, 'super_detect_noop' );
675        if ( $doc->getDocAsUpsert() ) {
676            CirrusIndexField::resetHints( $doc );
677            $script->setUpsert( $doc );
678        }
679
680        return $script;
681    }
682
683    /**
684     * @return int Number of times to instruct Elasticsearch to retry updates that fail on
685     *  version conflicts.
686     */
687    private function retryOnConflict(): int {
688        return $this->searchConfig->get(
689            'CirrusSearchUpdateConflictRetryCount' );
690    }
691
692    private function convertEncoding( $d ) {
693        if ( is_string( $d ) ) {
694            return mb_convert_encoding( $d, 'UTF-8', 'UTF-8' );
695        }
696
697        foreach ( $d as &$v ) {
698            $v = $this->convertEncoding( $v );
699        }
700
701        return $d;
702    }
703
704    private function reportDocSize( Document $doc ): void {
705        $cluster = $this->connection->getClusterName();
706        try {
707            // Use the same JSON output that Elastica uses, it might not be the options MW uses
708            // to populate event-gate (esp. regarding escaping UTF-8) but hopefully it's close
709            // to what we will be using.
710            $len = strlen( JSON::stringify( $doc->getData(), \JSON_UNESCAPED_UNICODE | \JSON_UNESCAPED_SLASHES ) );
711            // Use a timing stat as we'd like to have percentiles calculated (possibly use T348796 once available)
712            // note that prior to switching to prometheus we used to have min and max, if that's proven to be still useful
713            // to track abnormally large docs we might consider another approach (log a warning?)
714            $this->stats->getTiming( "update_doc_size_kb" )
715                ->setLabel( "search_cluster", $cluster )
716                ->copyToStatsdAt( "CirrusSearch.$cluster.updates.all.doc_size" )
717                ->observe( $len );
718
719        } catch ( \JsonException $e ) {
720            $this->log->warning( "Cannot estimate CirrusSearch doc size", [ "exception" => $e ] );
721        }
722    }
723
724}