Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
66.31% covered (warning)
66.31%
185 / 279
28.57% covered (danger)
28.57%
4 / 14
CRAP
0.00% covered (danger)
0.00%
0 / 1
DataSender
66.31% covered (warning)
66.31%
185 / 279
28.57% covered (danger)
28.57%
4 / 14
399.78
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
1
 sendUpdateWeightedTags
84.38% covered (warning)
84.38%
54 / 64
0.00% covered (danger)
0.00%
0 / 1
22.68
 sendResetWeightedTags
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 sendData
71.08% covered (warning)
71.08%
59 / 83
0.00% covered (danger)
0.00%
0 / 1
40.11
 reportUpdateMetrics
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
56
 sendDeletes
70.00% covered (warning)
70.00%
14 / 20
0.00% covered (danger)
0.00%
0 / 1
5.68
 sendOtherIndexUpdates
70.97% covered (warning)
70.97%
22 / 31
0.00% covered (danger)
0.00%
0 / 1
8.20
 decideRequiredSetAction
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
2.03
 bulkResponseExceptionIsJustDocumentMissing
0.00% covered (danger)
0.00%
0 / 20
0.00% covered (danger)
0.00%
0 / 1
90
 newLog
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 docToSuperDetectNoopScript
94.12% covered (success)
94.12%
16 / 17
0.00% covered (danger)
0.00%
0 / 1
5.01
 retryOnConflict
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 convertEncoding
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 reportDocSize
66.67% covered (warning)
66.67%
4 / 6
0.00% covered (danger)
0.00%
0 / 1
2.15
1<?php
2
3namespace CirrusSearch;
4
5use CirrusSearch\BuildDocument\BuildDocument;
6use CirrusSearch\BuildDocument\BuildDocumentException;
7use CirrusSearch\BuildDocument\DocumentSizeLimiter;
8use CirrusSearch\Profile\SearchProfileService;
9use CirrusSearch\Search\CirrusIndexField;
10use Elastica\Bulk\Action\AbstractDocument;
11use Elastica\Document;
12use Elastica\Exception\Bulk\ResponseException;
13use Elastica\JSON;
14use Liuggio\StatsdClient\Factory\StatsdDataFactory;
15use MediaWiki\Logger\LoggerFactory;
16use MediaWiki\MediaWikiServices;
17use Status;
18use Title;
19use Wikimedia\Assert\Assert;
20use WikiPage;
21
22/**
23 * Handles non-maintenance write operations to the elastic search cluster.
24 *
25 * This program is free software; you can redistribute it and/or modify
26 * it under the terms of the GNU General Public License as published by
27 * the Free Software Foundation; either version 2 of the License, or
28 * (at your option) any later version.
29 *
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33 * GNU General Public License for more details.
34 *
35 * You should have received a copy of the GNU General Public License along
36 * with this program; if not, write to the Free Software Foundation, Inc.,
37 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
38 * http://www.gnu.org/copyleft/gpl.html
39 */
40class DataSender extends ElasticsearchIntermediary {
41    /** @var \Psr\Log\LoggerInterface */
42    private $log;
43
44    /** @var \Psr\Log\LoggerInterface */
45    private $failedLog;
46
47    /**
48     * @var string
49     */
50    private $indexBaseName;
51
52    /**
53     * @var SearchConfig
54     */
55    private $searchConfig;
56
57    private $stats;
58    /**
59     * @var DocumentSizeLimiter
60     */
61    private $docSizeLimiter;
62
63    /**
64     * @param Connection $conn
65     * @param SearchConfig $config
66     * @param StatsdDataFactory|null $stats
67     * @param DocumentSizeLimiter|null $docSizeLimiter
68     */
69    public function __construct(
70        Connection $conn,
71        SearchConfig $config,
72        StatsdDataFactory $stats = null,
73        DocumentSizeLimiter $docSizeLimiter = null
74    ) {
75        parent::__construct( $conn, null, 0 );
76        $this->stats = $stats ?? MediaWikiServices::getInstance()->getStatsdDataFactory();
77        $this->log = LoggerFactory::getInstance( 'CirrusSearch' );
78        $this->failedLog = LoggerFactory::getInstance( 'CirrusSearchChangeFailed' );
79        $this->indexBaseName = $config->get( SearchConfig::INDEX_BASE_NAME );
80        $this->searchConfig = $config;
81        $this->docSizeLimiter = $docSizeLimiter ?? new DocumentSizeLimiter(
82            $config->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
83    }
84
85    /**
86     * @param string $indexSuffix
87     * @param string[] $docIds
88     * @param string $tagField
89     * @param string $tagPrefix
90     * @param string|string[]|null $tagNames A tag name or list of tag names. Each tag will be
91     *   set for each document ID. Omit for tags which are fully defined by their prefix.
92     * @param int[]|int[][]|null $tagWeights An optional map of docid => weight. When $tagName is
93     *   null, the weight is an integer. When $tagName is not null, the weight is itself a
94     *   tagname => int map. Weights are between 1-1000, and can be omitted (in which case no
95     *   update will be sent for the corresponding docid/tag combination).
96     * @param int $batchSize
97     * @return Status
98     */
99    public function sendUpdateWeightedTags(
100        string $indexSuffix,
101        array $docIds,
102        string $tagField,
103        string $tagPrefix,
104        $tagNames = null,
105        array $tagWeights = null,
106        int $batchSize = 30
107    ): Status {
108        Assert::parameterType( [ 'string', 'array', 'null' ], $tagNames, '$tagNames' );
109        if ( is_array( $tagNames ) ) {
110            Assert::parameterElementType( 'string', $tagNames, '$tagNames' );
111        }
112        if ( $tagNames === null ) {
113            $tagNames = [ 'exists' ];
114            if ( $tagWeights !== null ) {
115                $tagWeights = [ 'exists' => $tagWeights ];
116            }
117        } elseif ( is_string( $tagNames ) ) {
118            $tagNames = [ $tagNames ];
119        }
120
121        Assert::precondition( strpos( $tagPrefix, '/' ) === false,
122            "invalid tag prefix $tagPrefix: must not contain /" );
123        foreach ( $tagNames as $tagName ) {
124            Assert::precondition( strpos( $tagName, '|' ) === false,
125                "invalid tag name $tagName: must not contain |" );
126        }
127        if ( $tagWeights ) {
128            foreach ( $tagWeights as $docId => $docWeights ) {
129                Assert::precondition( in_array( $docId, $docIds ),
130                    "document ID $docId used in \$tagWeights but not found in \$docIds" );
131                foreach ( $docWeights as $tagName => $weight ) {
132                    Assert::precondition( in_array( $tagName, $tagNames, true ),
133                        "tag name $tagName used in \$tagWeights but not found in \$tagNames" );
134                    Assert::precondition( is_int( $weight ), "weights must be integers but $weight is "
135                        . gettype( $weight ) );
136                    Assert::precondition( $weight >= 1 && $weight <= 1000,
137                        "weights must be between 1 and 1000 (found: $weight)" );
138                }
139            }
140        }
141
142        $client = $this->connection->getClient();
143        $status = Status::newGood();
144        $pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );
145        foreach ( array_chunk( $docIds, $batchSize ) as $docIdsChunk ) {
146            $bulk = new \Elastica\Bulk( $client );
147            $bulk->setIndex( $pageIndex );
148            foreach ( $docIdsChunk as $docId ) {
149                $tags = [];
150                foreach ( $tagNames as $tagName ) {
151                    $tagValue = "$tagPrefix/$tagName";
152                    if ( $tagWeights !== null ) {
153                        if ( !isset( $tagWeights[$docId][$tagName] ) ) {
154                            continue;
155                        }
156                        // To pass the assertions above, the weight must be either an int, a float
157                        // with an integer value, or a string representation of one of those.
158                        // Convert to int to ensure there is no trailing '.0'.
159                        $tagValue .= '|' . (int)$tagWeights[$docId][$tagName];
160                    }
161                    $tags[] = $tagValue;
162                }
163                if ( !$tags ) {
164                    continue;
165                }
166                $script = new \Elastica\Script\Script( 'super_detect_noop', [
167                    'source' => [ $tagField => $tags ],
168                    'handlers' => [ $tagField => CirrusIndexField::MULTILIST_HANDLER ],
169                ], 'super_detect_noop' );
170                $script->setId( $docId );
171                $bulk->addScript( $script, 'update' );
172            }
173
174            if ( !$bulk->getActions() ) {
175                continue;
176            }
177
178            // Execute the bulk update
179            $exception = null;
180            try {
181                $this->start( new BulkUpdateRequestLog( $this->connection->getClient(),
182                    'updating {numBulk} documents',
183                    'send_data_reset_weighted_tags',
184                    [ 'numBulk' => count( $docIdsChunk ), 'index' => $pageIndex->getName() ]
185                ) );
186                $bulk->send();
187            } catch ( ResponseException $e ) {
188                if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
189                    $exception = $e;
190                }
191            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
192                $exception = $e;
193            }
194            if ( $exception === null ) {
195                $this->success();
196            } else {
197                $this->failure( $exception );
198                $this->failedLog->warning( "Update weighted tag {weightedTagFieldName} for {weightedTagPrefix} in articles: {documents}",
199                    [
200                        'exception' => $exception,
201                        'weightedTagFieldName' => $tagField,
202                        'weightedTagPrefix' => $tagPrefix,
203                        'weightedTagNames' => implode( '|', $tagNames ),
204                        'weightedTagWeight' => $tagWeights,
205                        'docIds' => implode( ',', $docIds )
206                    ] );
207                $status->error( 'cirrussearch-failed-update-weighted-tags' );
208            }
209        }
210        return $status;
211    }
212
213    /**
214     * @param string $indexSuffix
215     * @param string[] $docIds
216     * @param string $tagField
217     * @param string $tagPrefix
218     * @param int $batchSize
219     * @return Status
220     */
221    public function sendResetWeightedTags(
222        string $indexSuffix,
223        array $docIds,
224        string $tagField,
225        string $tagPrefix,
226        int $batchSize = 30
227    ): Status {
228        return $this->sendUpdateWeightedTags(
229            $indexSuffix,
230            $docIds,
231            $tagField,
232            $tagPrefix,
233            CirrusIndexField::MULTILIST_DELETE_GROUPING,
234            null,
235            $batchSize
236        );
237    }
238
239    /**
240     * @param string $indexSuffix suffix of index to which to send $documents
241     * @param \Elastica\Document[] $documents documents to send
242     * @return Status
243     */
244    public function sendData( $indexSuffix, array $documents ) {
245        if ( !$documents ) {
246            return Status::newGood();
247        }
248
249        // Copy the docs so that modifications made in this method are not propagated up to the caller
250        $docsCopy = [];
251        foreach ( $documents as $doc ) {
252            $docsCopy[] = clone $doc;
253        }
254        $documents = $docsCopy;
255
256        // Perform final stage of document building. This only
257        // applies to `page` documents, docs built by something
258        // other than BuildDocument will pass through unchanged.
259        $services = MediaWikiServices::getInstance();
260        $builder = new BuildDocument(
261            $this->connection,
262            $services->getDBLoadBalancer()->getConnection( DB_REPLICA ),
263            $services->getParserCache(),
264            $services->getRevisionStore(),
265            $services->getBacklinkCacheFactory(),
266            $this->docSizeLimiter,
267            $services->getTitleFormatter()
268        );
269        try {
270            foreach ( $documents as $i => $doc ) {
271                if ( !$builder->finalize( $doc ) ) {
272                    // Something has changed while this was hanging out in the job
273                    // queue and should no longer be written to elastic.
274                    unset( $documents[$i] );
275                }
276                $this->reportDocSize( $doc );
277            }
278        } catch ( BuildDocumentException $be ) {
279            $this->failedLog->warning(
280                'Failed to update documents',
281                [ 'exception' => $be ]
282            );
283            return Status::newFatal( 'cirrussearch-failed-build-document' );
284        }
285
286        if ( !$documents ) {
287            // All documents noop'd
288            return Status::newGood();
289        }
290
291        /**
292         * Transform the finalized documents into noop scripts if possible
293         * to reduce update load.
294         */
295        if ( $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop' ) ) {
296            foreach ( $documents as $i => $doc ) {
297                // BC Check for jobs that used to contain Document|Script
298                if ( $doc instanceof \Elastica\Document ) {
299                    $documents[$i] = $this->docToSuperDetectNoopScript( $doc );
300                }
301            }
302        }
303
304        foreach ( $documents as $doc ) {
305            $doc->setRetryOnConflict( $this->retryOnConflict() );
306            // Hints need to be retained until after finalizing
307            // the documents and building the noop scripts.
308            CirrusIndexField::resetHints( $doc );
309        }
310
311        $exception = null;
312        $responseSet = null;
313        $justDocumentMissing = false;
314        try {
315            $pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );
316
317            $this->start( new BulkUpdateRequestLog(
318                $this->connection->getClient(),
319                'sending {numBulk} documents to the {index} index(s)',
320                'send_data_write',
321                [ 'numBulk' => count( $documents ), 'index' => $pageIndex->getName() ]
322            ) );
323            $bulk = new \Elastica\Bulk( $this->connection->getClient() );
324            $bulk->setShardTimeout( $this->searchConfig->get( 'CirrusSearchUpdateShardTimeout' ) );
325            $bulk->setIndex( $pageIndex );
326            if ( $this->searchConfig->getElement( 'CirrusSearchElasticQuirks', 'retry_on_conflict' ) ) {
327                $actions = [];
328                foreach ( $documents as $doc ) {
329                    $action = AbstractDocument::create( $doc, 'update' );
330                    $metadata = $action->getMetadata();
331                    // Rename deprecated _retry_on_conflict
332                    // TODO: fix upstream in Elastica.
333                    if ( isset( $metadata['_retry_on_conflict'] ) ) {
334                        $metadata['retry_on_conflict'] = $metadata['_retry_on_conflict'];
335                        unset( $metadata['_retry_on_conflict'] );
336                        $action->setMetadata( $metadata );
337                    }
338                    $actions[] = $action;
339                }
340
341                $bulk->addActions( $actions );
342            } else {
343                $bulk->addData( $documents, 'update' );
344            }
345            $responseSet = $bulk->send();
346        } catch ( ResponseException $e ) {
347            $justDocumentMissing = $this->bulkResponseExceptionIsJustDocumentMissing( $e,
348                function ( $docId ) use ( $e, $indexSuffix ) {
349                    $this->log->info(
350                        "Updating a page that doesn't yet exist in Elasticsearch: {docId}",
351                        [ 'docId' => $docId, 'indexSuffix' => $indexSuffix ]
352                    );
353                }
354            );
355            if ( !$justDocumentMissing ) {
356                $exception = $e;
357            }
358        } catch ( \Elastica\Exception\ExceptionInterface $e ) {
359            $exception = $e;
360        }
361
362        // TODO: rewrite error handling, the logic here is hard to follow
363        $validResponse = $responseSet !== null && count( $responseSet->getBulkResponses() ) > 0;
364        if ( $exception === null && ( $justDocumentMissing || $validResponse ) ) {
365            $this->success();
366            if ( $validResponse ) {
367                // @phan-suppress-next-line PhanTypeMismatchArgumentNullable responseset is not null
368                $this->reportUpdateMetrics( $responseSet, $indexSuffix, count( $documents ) );
369            }
370            return Status::newGood();
371        } else {
372            $this->failure( $exception );
373            $documentIds = array_map( static function ( $d ) {
374                return (string)( $d->getId() );
375            }, $documents );
376            $logContext = [ 'docId' => implode( ', ', $documentIds ) ];
377            if ( $exception ) {
378                $logContext['exception'] = $exception;
379            } else {
380                // we want to figure out error_massage from the responseData log, because
381                // error_message is currently not set when exception is null and response is not
382                // valid
383                $responseData = $responseSet->getData();
384                $responseDataString = json_encode( $responseData );
385
386                // in logstash some error_message seems to be empty we are assuming its due to
387                // non UTF-8 sequences in the response data causing the json_encode to return empty
388                // string,so we added a logic to validate the assumption
389                if ( json_last_error() === JSON_ERROR_UTF8 ) {
390                    $responseDataString =
391                        json_encode( $this->convertEncoding( $responseData ) );
392                } elseif ( json_last_error() !== JSON_ERROR_NONE ) {
393                    $responseDataString = json_last_error_msg();
394                }
395
396                $logContext['error_message'] = mb_substr( $responseDataString, 0, 4096 );
397            }
398            $this->failedLog->warning(
399                'Failed to update documents {docId}',
400                $logContext
401            );
402            return Status::newFatal( 'cirrussearch-failed-send-data' );
403        }
404    }
405
406    /**
407     * @param \Elastica\Bulk\ResponseSet $responseSet
408     * @param string $indexSuffix
409     * @param int $sent
410     */
411    private function reportUpdateMetrics(
412        \Elastica\Bulk\ResponseSet $responseSet, $indexSuffix, $sent
413    ) {
414        $updateStats = [
415            'sent' => $sent,
416        ];
417        $allowedOps = [ 'created', 'updated', 'noop' ];
418        foreach ( $responseSet->getBulkResponses() as $bulk ) {
419            $opRes = 'unknown';
420            if ( $bulk instanceof \Elastica\Bulk\Response ) {
421                if ( isset( $bulk->getData()['result'] )
422                    && in_array( $bulk->getData()['result'], $allowedOps )
423                ) {
424                    $opRes = $bulk->getData()['result'];
425                }
426            }
427            if ( isset( $updateStats[$opRes] ) ) {
428                $updateStats[$opRes]++;
429            } else {
430                $updateStats[$opRes] = 1;
431            }
432        }
433        $cluster = $this->connection->getClusterName();
434        $metricsPrefix = "CirrusSearch.$cluster.updates";
435        foreach ( $updateStats as $what => $num ) {
436            $this->stats->updateCount(
437                "$metricsPrefix.details.{$this->indexBaseName}.$indexSuffix.$what", $num
438            );
439            $this->stats->updateCount( "$metricsPrefix.all.$what", $num );
440        }
441    }
442
443    /**
444     * Send delete requests to Elasticsearch.
445     *
446     * @param string[] $docIds elasticsearch document ids to delete
447     * @param string|null $indexSuffix index from which to delete.  null means all.
448     * @return Status
449     */
450    public function sendDeletes( $docIds, $indexSuffix = null ) {
451        if ( $indexSuffix === null ) {
452            $indexes = $this->connection->getAllIndexSuffixes( Connection::PAGE_DOC_TYPE );
453        } else {
454            $indexes = [ $indexSuffix ];
455        }
456
457        $idCount = count( $docIds );
458        if ( $idCount !== 0 ) {
459            try {
460                foreach ( $indexes as $indexSuffix ) {
461                    $this->startNewLog(
462                        'deleting {numIds} from {indexSuffix}',
463                        'send_deletes', [
464                            'numIds' => $idCount,
465                            'indexSuffix' => $indexSuffix,
466                        ]
467                    );
468                    $this->connection
469                        ->getIndex( $this->indexBaseName, $indexSuffix )
470                        ->deleteDocuments(
471                            array_map(
472                                static function ( $id ) {
473                                    return new Document( $id );
474                                }, $docIds
475                            )
476                        );
477                    $this->success();
478                }
479            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
480                $this->failure( $e );
481                $this->failedLog->warning(
482                    'Failed to delete documents: {docId}',
483                    [
484                        'docId' => implode( ', ', $docIds ),
485                        'exception' => $e,
486                    ]
487                );
488                return Status::newFatal( 'cirrussearch-failed-send-deletes' );
489            }
490        }
491
492        return Status::newGood();
493    }
494
495    /**
496     * @param string $localSite The wikiId to add/remove from local_sites_with_dupe
497     * @param string $indexName The name of the index to perform updates to
498     * @param array[] $otherActions A list of arrays each containing the id within elasticsearch
499     *   ('docId') and the article namespace ('ns') and DB key ('dbKey') at the within $localSite
500     * @param int $batchSize number of docs to update in a single bulk
501     * @return Status
502     */
503    public function sendOtherIndexUpdates( $localSite, $indexName, array $otherActions, $batchSize = 30 ) {
504        $client = $this->connection->getClient();
505        $status = Status::newGood();
506        foreach ( array_chunk( $otherActions, $batchSize ) as $updates ) {
507            '@phan-var array[] $updates';
508            $bulk = new \Elastica\Bulk( $client );
509            $titles = [];
510            foreach ( $updates as $update ) {
511                $title = Title::makeTitle( $update['ns'], $update['dbKey'] );
512                $action = $this->decideRequiredSetAction( $title );
513                $script = new \Elastica\Script\Script(
514                    'super_detect_noop',
515                    [
516                        'source' => [
517                            'local_sites_with_dupe' => [ $action => $localSite ],
518                        ],
519                        'handlers' => [ 'local_sites_with_dupe' => 'set' ],
520                    ],
521                    'super_detect_noop'
522                );
523                $script->setId( $update['docId'] );
524                $script->setParam( '_type', '_doc' );
525                $script->setParam( '_index', $indexName );
526                $bulk->addScript( $script, 'update' );
527                $titles[] = $title;
528            }
529
530            // Execute the bulk update
531            $exception = null;
532            try {
533                $this->start( new BulkUpdateRequestLog(
534                    $this->connection->getClient(),
535                    'updating {numBulk} documents in other indexes',
536                    'send_data_other_idx_write',
537                        [ 'numBulk' => count( $updates ), 'index' => $indexName ]
538                    ) );
539                $bulk->send();
540            } catch ( ResponseException $e ) {
541                if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
542                    $exception = $e;
543                }
544            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
545                $exception = $e;
546            }
547            if ( $exception === null ) {
548                $this->success();
549            } else {
550                $this->failure( $exception );
551                $this->failedLog->warning(
552                    "OtherIndex update for articles: {titleStr}",
553                    [ 'exception' => $exception, 'titleStr' => implode( ',', $titles ) ]
554                );
555                $status->error( 'cirrussearch-failed-update-otherindex' );
556            }
557        }
558
559        return $status;
560    }
561
562    /**
563     * Decide what action is required to the other index to make it up
564     * to data with the current wiki state. This will always check against
565     * the master database.
566     *
567     * @param Title $title The title to decide the action for
568     * @return string The set action to be performed. Either 'add' or 'remove'
569     */
570    protected function decideRequiredSetAction( Title $title ) {
571        $page = MediaWikiServices::getInstance()->getWikiPageFactory()->newFromTitle( $title );
572        $page->loadPageData( WikiPage::READ_LATEST );
573        if ( $page->exists() ) {
574            return 'add';
575        } else {
576            return 'remove';
577        }
578    }
579
580    /**
581     * Check if $exception is a bulk response exception that just contains
582     * document is missing failures.
583     *
584     * @param ResponseException $exception exception to check
585     * @param callable|null $logCallback Callback in which to do some logging.
586     *   Callback will be passed the id of the missing document.
587     * @return bool
588     */
589    protected function bulkResponseExceptionIsJustDocumentMissing(
590        ResponseException $exception, $logCallback = null
591    ) {
592        $justDocumentMissing = true;
593        foreach ( $exception->getResponseSet()->getBulkResponses() as $bulkResponse ) {
594            if ( !$bulkResponse->hasError() ) {
595                continue;
596            }
597
598            $error = $bulkResponse->getFullError();
599            if ( is_string( $error ) ) {
600                // es 1.7 cluster
601                $message = $bulkResponse->getError();
602                if ( strpos( $message, 'DocumentMissingException' ) === false ) {
603                    $justDocumentMissing = false;
604                    continue;
605                }
606            } else {
607                // es 2.x cluster
608                if ( $error !== null && $error['type'] !== 'document_missing_exception' ) {
609                    $justDocumentMissing = false;
610                    continue;
611                }
612            }
613
614            if ( $logCallback ) {
615                // This is generally not an error but we should
616                // log it to see how many we get
617                $action = $bulkResponse->getAction();
618                $docId = 'missing';
619                if ( $action instanceof \Elastica\Bulk\Action\AbstractDocument ) {
620                    $docId = $action->getData()->getId();
621                }
622                call_user_func( $logCallback, $docId );
623            }
624        }
625        return $justDocumentMissing;
626    }
627
628    /**
629     * @param string $description
630     * @param string $queryType
631     * @param string[] $extra
632     * @return SearchRequestLog
633     */
634    protected function newLog( $description, $queryType, array $extra = [] ) {
635        return new SearchRequestLog(
636            $this->connection->getClient(),
637            $description,
638            $queryType,
639            $extra
640        );
641    }
642
643    /**
644     * Converts a document into a call to super_detect_noop from the wikimedia-extra plugin.
645     * @internal made public for testing purposes
646     * @param \Elastica\Document $doc
647     * @return \Elastica\Script\Script
648     */
649    public function docToSuperDetectNoopScript( \Elastica\Document $doc ) {
650        $handlers = CirrusIndexField::getHint( $doc, CirrusIndexField::NOOP_HINT );
651        $params = array_diff_key( $doc->getParams(), [ CirrusIndexField::DOC_HINT_PARAM => 1 ] );
652
653        $params['source'] = $doc->getData();
654
655        if ( $handlers ) {
656            Assert::precondition( is_array( $handlers ), "Noop hints must be an array" );
657            $params['handlers'] = $handlers;
658        } else {
659            $params['handlers'] = [];
660        }
661        $extraHandlers = $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop_handlers' );
662        if ( is_array( $extraHandlers ) ) {
663            $params['handlers'] += $extraHandlers;
664        }
665
666        if ( $params['handlers'] === [] ) {
667            // The noop script only supports Map but an empty array
668            // may be transformed to [] instead of {} when serialized to json
669            // causing class cast exception failures
670            $params['handlers'] = (object)[];
671        }
672        $script = new \Elastica\Script\Script( 'super_detect_noop', $params, 'super_detect_noop' );
673        if ( $doc->getDocAsUpsert() ) {
674            CirrusIndexField::resetHints( $doc );
675            $script->setUpsert( $doc );
676        }
677
678        return $script;
679    }
680
681    /**
682     * @return int Number of times to instruct Elasticsearch to retry updates that fail on
683     *  version conflicts.
684     */
685    private function retryOnConflict(): int {
686        return $this->searchConfig->get(
687            'CirrusSearchUpdateConflictRetryCount' );
688    }
689
690    private function convertEncoding( $d ) {
691        if ( is_string( $d ) ) {
692            return mb_convert_encoding( $d, 'UTF-8', 'UTF-8' );
693        }
694
695        foreach ( $d as &$v ) {
696            $v = $this->convertEncoding( $v );
697        }
698
699        return $d;
700    }
701
702    private function reportDocSize( Document $doc ): void {
703        $cluster = $this->connection->getClusterName();
704        $metricsPrefix = "CirrusSearch.$cluster.updates.all.doc_size";
705        try {
706            // Use the same JSON output that Elastica uses, it might be the options MW uses
707            // to populate event-gate (esp. regarding escaping UTF-8) but hopefully it's close
708            // to what we will be using.
709            $len = strlen( JSON::stringify( $doc->getData(), \JSON_UNESCAPED_UNICODE | \JSON_UNESCAPED_SLASHES ) );
710            // Use a timing stat as we'd like to have some min, max and percentiles calculated
711            $this->stats->timing( $metricsPrefix, $len );
712        } catch ( \JsonException $e ) {
713            $this->log->warning( "Cannot estimate CirrusSearch doc size", [ "exception" => $e ] );
714        }
715    }
716
717}