Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
64.47% covered (warning)
64.47%
225 / 349
33.33% covered (danger)
33.33%
5 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
DataSender
64.47% covered (warning)
64.47%
225 / 349
33.33% covered (danger)
33.33%
5 / 15
335.07
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
1
 sendUpdateWeightedTags
100.00% covered (success)
100.00%
15 / 15
100.00% covered (success)
100.00%
1 / 1
3
 sendWeightedTagsUpdate
69.81% covered (warning)
69.81%
37 / 53
0.00% covered (danger)
0.00%
0 / 1
9.76
 sendResetWeightedTags
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 sendData
68.69% covered (warning)
68.69%
68 / 99
0.00% covered (danger)
0.00%
0 / 1
34.54
 reportUpdateMetrics
0.00% covered (danger)
0.00%
0 / 26
0.00% covered (danger)
0.00%
0 / 1
56
 sendDeletes
67.65% covered (warning)
67.65%
23 / 34
0.00% covered (danger)
0.00%
0 / 1
5.85
 sendOtherIndexUpdates
75.56% covered (warning)
75.56%
34 / 45
0.00% covered (danger)
0.00%
0 / 1
7.72
 decideRequiredSetAction
80.00% covered (warning)
80.00%
4 / 5
0.00% covered (danger)
0.00%
0 / 1
2.03
 bulkResponseExceptionIsJustDocumentMissing
0.00% covered (danger)
0.00%
0 / 20
0.00% covered (danger)
0.00%
0 / 1
90
 newLog
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 docToSuperDetectNoopScript
94.12% covered (success)
94.12%
16 / 17
0.00% covered (danger)
0.00%
0 / 1
5.01
 retryOnConflict
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 convertEncoding
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 reportDocSize
75.00% covered (warning)
75.00%
6 / 8
0.00% covered (danger)
0.00%
0 / 1
2.06
1<?php
2
3namespace CirrusSearch;
4
5use CirrusSearch\BuildDocument\BuildDocument;
6use CirrusSearch\BuildDocument\BuildDocumentException;
7use CirrusSearch\BuildDocument\DocumentSizeLimiter;
8use CirrusSearch\Extra\MultiList\MultiListBuilder;
9use CirrusSearch\Profile\SearchProfileService;
10use CirrusSearch\Search\CirrusIndexField;
11use CirrusSearch\Wikimedia\WeightedTagsHooks;
12use Elastica\Bulk\Action\AbstractDocument;
13use Elastica\Document;
14use Elastica\Exception\Bulk\ResponseException;
15use Elastica\Exception\RuntimeException;
16use Elastica\JSON;
17use Elastica\Response;
18use MediaWiki\Logger\LoggerFactory;
19use MediaWiki\MediaWikiServices;
20use MediaWiki\Status\Status;
21use MediaWiki\Title\Title;
22use Wikimedia\Assert\Assert;
23use Wikimedia\Rdbms\IDBAccessObject;
24use Wikimedia\Stats\StatsFactory;
25
26/**
27 * Handles non-maintenance write operations to the elastic search cluster.
28 *
29 * This program is free software; you can redistribute it and/or modify
30 * it under the terms of the GNU General Public License as published by
31 * the Free Software Foundation; either version 2 of the License, or
32 * (at your option) any later version.
33 *
34 * This program is distributed in the hope that it will be useful,
35 * but WITHOUT ANY WARRANTY; without even the implied warranty of
36 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
37 * GNU General Public License for more details.
38 *
39 * You should have received a copy of the GNU General Public License along
40 * with this program; if not, write to the Free Software Foundation, Inc.,
41 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
42 * http://www.gnu.org/copyleft/gpl.html
43 */
44class DataSender extends ElasticsearchIntermediary {
45
46    /** @var \Psr\Log\LoggerInterface */
47    private $log;
48
49    /** @var \Psr\Log\LoggerInterface */
50    private $failedLog;
51
52    /**
53     * @var string
54     */
55    private $indexBaseName;
56
57    /**
58     * @var SearchConfig
59     */
60    private $searchConfig;
61
62    private StatsFactory $stats;
63    /**
64     * @var DocumentSizeLimiter
65     */
66    private $docSizeLimiter;
67
68    /**
69     * @param Connection $conn
70     * @param SearchConfig $config
71     * @param StatsFactory|null $stats A StatsFactory (already prefixed with the right component)
72     * @param DocumentSizeLimiter|null $docSizeLimiter
73     */
74    public function __construct(
75        Connection $conn,
76        SearchConfig $config,
77        ?StatsFactory $stats = null,
78        ?DocumentSizeLimiter $docSizeLimiter = null
79    ) {
80        parent::__construct( $conn, null, 0 );
81        $this->stats = $stats ?? Util::getStatsFactory();
82        $this->log = LoggerFactory::getInstance( 'CirrusSearch' );
83        $this->failedLog = LoggerFactory::getInstance( 'CirrusSearchChangeFailed' );
84        $this->indexBaseName = $config->get( SearchConfig::INDEX_BASE_NAME );
85        $this->searchConfig = $config;
86        $this->docSizeLimiter = $docSizeLimiter ?? new DocumentSizeLimiter(
87            $config->getProfileService()->loadProfile( SearchProfileService::DOCUMENT_SIZE_LIMITER ) );
88    }
89
90    /**
91     * @deprecated use {@link sendWeightedTagsUpdate} instead.
92     */
93    public function sendUpdateWeightedTags(
94        string $indexSuffix,
95        array $docIds,
96        string $tagField,
97        string $tagPrefix,
98        $tagNames = null,
99        ?array $tagWeights = null,
100        int $batchSize = 30
101    ): Status {
102        return $this->sendWeightedTagsUpdate(
103            $indexSuffix,
104            $tagPrefix,
105            is_array( $tagWeights ) ? array_reduce(
106                $docIds, static function ( $docTagsWeights, $docId ) use ( $tagNames, $tagWeights ) {
107                    if ( array_key_exists( $docId, $tagWeights ) ) {
108                        $docTagsWeights[$docId] = MultiListBuilder::buildTagWeightsFromLegacyParameters(
109                        $tagNames,
110                        $tagWeights[$docId]
111                        );
112                    }
113
114                    return $docTagsWeights;
115                }, []
116            ) : array_fill_keys( $docIds, MultiListBuilder::buildTagWeightsFromLegacyParameters( $tagNames ) ),
117            $batchSize
118        );
119    }
120
121    /**
122     * @param string $indexSuffix
123     * @param string $tagPrefix
124     * @param int[][]|null[][] $tagWeights a map of `[ docId: string => [ tagName: string => tagWeight: int|null ] ]`
125     * @param int $batchSize
126     *
127     * @return Status
128     */
129    public function sendWeightedTagsUpdate(
130        string $indexSuffix,
131        string $tagPrefix,
132        array $tagWeights,
133        int $batchSize = 30
134    ): Status {
135        $client = $this->connection->getClient();
136        $status = Status::newGood();
137        $pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );
138        foreach ( array_chunk( array_keys( $tagWeights ), $batchSize ) as $docIdsChunk ) {
139            $bulk = new \Elastica\Bulk( $client );
140            $bulk->setIndex( $pageIndex );
141            foreach ( $docIdsChunk as $docId ) {
142                $docTags = MultiListBuilder::buildWeightedTags(
143                    $tagPrefix,
144                    $tagWeights[$docId],
145                );
146                $script = new \Elastica\Script\Script( 'super_detect_noop', [
147                    'source' => [
148                        WeightedTagsHooks::FIELD_NAME => array_map( static fn ( $docTag ) => (string)$docTag,
149                            $docTags )
150                    ],
151                    'handlers' => [ WeightedTagsHooks::FIELD_NAME => CirrusIndexField::MULTILIST_HANDLER ],
152                ], 'super_detect_noop' );
153                $script->setId( $docId );
154                $bulk->addScript( $script, 'update' );
155            }
156
157            if ( !$bulk->getActions() ) {
158                continue;
159            }
160
161            // Execute the bulk update
162            $exception = null;
163            try {
164                $this->start(
165                    new BulkUpdateRequestLog(
166                        $this->connection->getClient(),
167                        'updating {numBulk} documents',
168                        'send_data_reset_weighted_tags',
169                        [
170                            'numBulk' => count( $docIdsChunk ),
171                            'index' => $pageIndex->getName()
172                        ]
173                    )
174                );
175                $bulk->send();
176            } catch ( ResponseException $e ) {
177                if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
178                    $exception = $e;
179                }
180            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
181                $exception = $e;
182            }
183            if ( $exception === null ) {
184                $this->success();
185            } else {
186                $this->failure( $exception );
187                $this->failedLog->warning(
188                    "Update weighted tag {weightedTagFieldName} for {weightedTagPrefix} in articles: {docIds}", [
189                        'exception' => $exception,
190                        'weightedTagFieldName' => WeightedTagsHooks::FIELD_NAME,
191                        'weightedTagPrefix' => $tagPrefix,
192                        'weightedTagWeight' => var_export( $tagWeights, true ),
193                        'docIds' => implode( ',', array_keys( $tagWeights ) )
194                    ]
195                );
196            }
197        }
198
199        return $status;
200    }
201
202    /**
203     * @deprecated use {@link sendWeightedTagsUpdate} instead.
204     */
205    public function sendResetWeightedTags(
206        string $indexSuffix,
207        array $docIds,
208        string $tagField,
209        string $tagPrefix,
210        int $batchSize = 30
211    ): Status {
212        return $this->sendWeightedTagsUpdate(
213            $indexSuffix,
214            $tagPrefix,
215            array_fill_keys( $docIds, [ CirrusIndexField::MULTILIST_DELETE_GROUPING => null ] ),
216            $batchSize
217        );
218    }
219
220    /**
221     * @param string $indexSuffix suffix of index to which to send $documents
222     * @param \Elastica\Document[] $documents documents to send
223     * @return Status
224     */
225    public function sendData( $indexSuffix, array $documents ) {
226        if ( !$documents ) {
227            return Status::newGood();
228        }
229
230        // Copy the docs so that modifications made in this method are not propagated up to the caller
231        $docsCopy = [];
232        foreach ( $documents as $doc ) {
233            $docsCopy[] = clone $doc;
234        }
235        $documents = $docsCopy;
236
237        // Perform final stage of document building. This only
238        // applies to `page` documents, docs built by something
239        // other than BuildDocument will pass through unchanged.
240        $services = MediaWikiServices::getInstance();
241        $builder = new BuildDocument(
242            $this->connection,
243            $services->getConnectionProvider()->getReplicaDatabase(),
244            $services->getRevisionStore(),
245            $services->getBacklinkCacheFactory(),
246            $this->docSizeLimiter,
247            $services->getTitleFormatter(),
248            $services->getWikiPageFactory(),
249            $services->getTitleFactory()
250        );
251        try {
252            foreach ( $documents as $i => $doc ) {
253                if ( !$builder->finalize( $doc ) ) {
254                    // Something has changed while this was hanging out in the job
255                    // queue and should no longer be written to elastic.
256                    unset( $documents[$i] );
257                }
258                $this->reportDocSize( $doc );
259            }
260        } catch ( BuildDocumentException $be ) {
261            $this->failedLog->warning(
262                'Failed to update documents',
263                [ 'exception' => $be ]
264            );
265            return Status::newFatal( 'cirrussearch-failed-build-document' );
266        }
267
268        if ( !$documents ) {
269            // All documents noop'd
270            return Status::newGood();
271        }
272
273        /**
274         * Transform the finalized documents into noop scripts if possible
275         * to reduce update load.
276         */
277        if ( $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop' ) ) {
278            foreach ( $documents as $i => $doc ) {
279                // BC Check for jobs that used to contain Document|Script
280                if ( $doc instanceof \Elastica\Document ) {
281                    $documents[$i] = $this->docToSuperDetectNoopScript( $doc );
282                }
283            }
284        }
285
286        foreach ( $documents as $doc ) {
287            $doc->setRetryOnConflict( $this->retryOnConflict() );
288            // Hints need to be retained until after finalizing
289            // the documents and building the noop scripts.
290            CirrusIndexField::resetHints( $doc );
291        }
292
293        $exception = null;
294        $responseSet = null;
295        $justDocumentMissing = false;
296        try {
297            $pageIndex = $this->connection->getIndex( $this->indexBaseName, $indexSuffix );
298
299            $this->start( new BulkUpdateRequestLog(
300                $this->connection->getClient(),
301                'sending {numBulk} documents to the {index} index(s)',
302                'send_data_write',
303                [ 'numBulk' => count( $documents ), 'index' => $pageIndex->getName() ]
304            ) );
305            $bulk = new \Elastica\Bulk( $this->connection->getClient() );
306            $bulk->setShardTimeout( $this->searchConfig->get( 'CirrusSearchUpdateShardTimeout' ) );
307            $bulk->setIndex( $pageIndex );
308            if ( $this->searchConfig->getElement( 'CirrusSearchElasticQuirks', 'retry_on_conflict' ) ) {
309                $actions = [];
310                foreach ( $documents as $doc ) {
311                    $action = AbstractDocument::create( $doc, 'update' );
312                    $metadata = $action->getMetadata();
313                    // Rename deprecated _retry_on_conflict
314                    // TODO: fix upstream in Elastica.
315                    if ( isset( $metadata['_retry_on_conflict'] ) ) {
316                        $metadata['retry_on_conflict'] = $metadata['_retry_on_conflict'];
317                        unset( $metadata['_retry_on_conflict'] );
318                        $action->setMetadata( $metadata );
319                    }
320                    $actions[] = $action;
321                }
322
323                $bulk->addActions( $actions );
324            } else {
325                $bulk->addData( $documents, 'update' );
326            }
327            $responseSet = $bulk->send();
328        } catch ( ResponseException $e ) {
329            $justDocumentMissing = $this->bulkResponseExceptionIsJustDocumentMissing( $e,
330                function ( $docId ) use ( $e, $indexSuffix ) {
331                    $this->log->info(
332                        "Updating a page that doesn't yet exist in Elasticsearch: {docId}",
333                        [ 'docId' => $docId, 'indexSuffix' => $indexSuffix ]
334                    );
335                }
336            );
337            $exception = $e;
338        } catch ( \Elastica\Exception\ExceptionInterface $e ) {
339            $exception = $e;
340        }
341
342        if ( $justDocumentMissing ) {
343            // wa have a failure but this is just docs that are missing in the index
344            // missing docs are logged above
345            $this->success();
346            return Status::newGood();
347        }
348        // check if the response is valid by making sure that it has bulk responses
349        if ( $responseSet !== null && count( $responseSet->getBulkResponses() ) > 0 ) {
350            $this->success();
351            $this->reportUpdateMetrics( $responseSet, $indexSuffix, count( $documents ) );
352            return Status::newGood();
353        }
354        // Everything else should be a failure.
355        if ( $exception === null ) {
356            // Elastica failed to identify the error, reason is that the Elastica Bulk\Response
357            // does identify errors only in individual responses if the request fails without
358            // getting a formal elastic response Bulk\Response->isOk might remain true
359            // So here we construct the ResponseException that should have been built and thrown
360            // by Elastica
361            $lastRequest = $this->connection->getClient()->getLastRequest();
362            if ( $lastRequest !== null ) {
363                $exception = new \Elastica\Exception\ResponseException( $lastRequest,
364                    new Response( $responseSet->getData() ) );
365            } else {
366                $exception = new RuntimeException( "Unknown error in bulk request (Client::getLastRequest() is null)" );
367            }
368        }
369        $this->failure( $exception );
370        $documentIds = array_map( static function ( $d ) {
371            return (string)( $d->getId() );
372        }, $documents );
373        $this->failedLog->warning(
374            'Failed to update documents {docId}',
375            [
376                'docId' => implode( ', ', $documentIds ),
377                'exception' => $exception
378            ]
379        );
380        return Status::newFatal( 'cirrussearch-failed-send-data' );
381    }
382
383    /**
384     * @param \Elastica\Bulk\ResponseSet $responseSet
385     * @param string $indexSuffix
386     * @param int $sent
387     */
388    private function reportUpdateMetrics(
389        \Elastica\Bulk\ResponseSet $responseSet, $indexSuffix, $sent
390    ) {
391        $updateStats = [
392            'sent' => $sent,
393        ];
394        $allowedOps = [ 'created', 'updated', 'noop' ];
395        foreach ( $responseSet->getBulkResponses() as $bulk ) {
396            $opRes = 'unknown';
397            if ( $bulk instanceof \Elastica\Bulk\Response ) {
398                if ( isset( $bulk->getData()['result'] )
399                    && in_array( $bulk->getData()['result'], $allowedOps )
400                ) {
401                    $opRes = $bulk->getData()['result'];
402                }
403            }
404            if ( isset( $updateStats[$opRes] ) ) {
405                $updateStats[$opRes]++;
406            } else {
407                $updateStats[$opRes] = 1;
408            }
409        }
410        $cluster = $this->connection->getClusterName();
411        $metricsPrefix = "CirrusSearch.$cluster.updates";
412        foreach ( $updateStats as $what => $num ) {
413            $this->stats->getCounter( "update_total" )
414                ->setLabel( "status", $what )
415                ->setLabel( "search_cluster", $cluster )
416                ->setLabel( "index_name", $this->indexBaseName )
417                ->setLabel( "index_suffix", $indexSuffix )
418                ->copyToStatsdAt( [
419                    "$metricsPrefix.details.{$this->indexBaseName}.$indexSuffix.$what",
420                    "$metricsPrefix.all.$what"
421                ] )
422                ->incrementBy( $num );
423        }
424    }
425
426    /**
427     * Send delete requests to Elasticsearch.
428     *
429     * @param string[] $docIds elasticsearch document ids to delete
430     * @param string|null $indexSuffix index from which to delete.  null means all.
431     * @return Status
432     */
433    public function sendDeletes( $docIds, $indexSuffix = null ) {
434        if ( $indexSuffix === null ) {
435            $indexes = $this->connection->getAllIndexSuffixes( Connection::PAGE_DOC_TYPE );
436        } else {
437            $indexes = [ $indexSuffix ];
438        }
439
440        $idCount = count( $docIds );
441        if ( $idCount !== 0 ) {
442            try {
443                foreach ( $indexes as $indexSuffix ) {
444                    $this->startNewLog(
445                        'deleting {numIds} from {indexSuffix}',
446                        'send_deletes', [
447                            'numIds' => $idCount,
448                            'indexSuffix' => $indexSuffix,
449                        ]
450                    );
451                    $this->connection
452                        ->getIndex( $this->indexBaseName, $indexSuffix )
453                        ->deleteDocuments(
454                            array_map(
455                                static function ( $id ) {
456                                    return new Document( $id );
457                                }, $docIds
458                            )
459                        );
460                    $this->success();
461                }
462            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
463                $this->failure( $e );
464                $this->failedLog->warning(
465                    'Failed to delete documents: {docId}',
466                    [
467                        'docId' => implode( ', ', $docIds ),
468                        'exception' => $e,
469                    ]
470                );
471                return Status::newFatal( 'cirrussearch-failed-send-deletes' );
472            }
473        }
474
475        return Status::newGood();
476    }
477
478    /**
479     * @param string $localSite The wikiId to add/remove from local_sites_with_dupe
480     * @param string $indexName The name of the index to perform updates to
481     * @param array[] $otherActions A list of arrays each containing the id within elasticsearch
482     *   ('docId') and the article namespace ('ns') and DB key ('dbKey') at the within $localSite
483     * @param int $batchSize number of docs to update in a single bulk
484     * @return Status
485     */
486    public function sendOtherIndexUpdates( $localSite, $indexName, array $otherActions, $batchSize = 30 ) {
487        $client = $this->connection->getClient();
488        $status = Status::newGood();
489        foreach ( array_chunk( $otherActions, $batchSize ) as $updates ) {
490            '@phan-var array[] $updates';
491            $bulk = new \Elastica\Bulk( $client );
492            $titles = [];
493            foreach ( $updates as $update ) {
494                $title = Title::makeTitle( $update['ns'], $update['dbKey'] );
495                $action = $this->decideRequiredSetAction( $title );
496                $script = new \Elastica\Script\Script(
497                    'super_detect_noop',
498                    [
499                        'source' => [
500                            'local_sites_with_dupe' => [ $action => $localSite ],
501                        ],
502                        'handlers' => [ 'local_sites_with_dupe' => 'set' ],
503                    ],
504                    'super_detect_noop'
505                );
506                $script->setId( $update['docId'] );
507                $script->setParam( '_type', '_doc' );
508                $script->setParam( '_index', $indexName );
509                $bulk->addScript( $script, 'update' );
510                $titles[] = $title;
511            }
512
513            // Execute the bulk update
514            $exception = null;
515            try {
516                $this->start( new BulkUpdateRequestLog(
517                    $this->connection->getClient(),
518                    'updating {numBulk} documents in other indexes',
519                    'send_data_other_idx_write',
520                    [ 'numBulk' => count( $updates ), 'index' => $indexName ]
521                ) );
522                $bulk->send();
523            } catch ( ResponseException $e ) {
524                if ( !$this->bulkResponseExceptionIsJustDocumentMissing( $e ) ) {
525                    $exception = $e;
526                }
527            } catch ( \Elastica\Exception\ExceptionInterface $e ) {
528                $exception = $e;
529            }
530            if ( $exception === null ) {
531                $this->success();
532            } else {
533                $this->failure( $exception );
534                $this->failedLog->warning(
535                    "OtherIndex update for articles: {titleStr}",
536                    [ 'exception' => $exception, 'titleStr' => implode( ',', $titles ) ]
537                );
538                $status->error( 'cirrussearch-failed-update-otherindex' );
539            }
540        }
541
542        return $status;
543    }
544
545    /**
546     * Decide what action is required to the other index to make it up
547     * to data with the current wiki state. This will always check against
548     * the master database.
549     *
550     * @param Title $title The title to decide the action for
551     * @return string The set action to be performed. Either 'add' or 'remove'
552     */
553    protected function decideRequiredSetAction( Title $title ) {
554        $page = MediaWikiServices::getInstance()->getWikiPageFactory()->newFromTitle( $title );
555        $page->loadPageData( IDBAccessObject::READ_LATEST );
556        if ( $page->exists() ) {
557            return 'add';
558        } else {
559            return 'remove';
560        }
561    }
562
563    /**
564     * Check if $exception is a bulk response exception that just contains
565     * document is missing failures.
566     *
567     * @param ResponseException $exception exception to check
568     * @param callable|null $logCallback Callback in which to do some logging.
569     *   Callback will be passed the id of the missing document.
570     * @return bool
571     */
572    protected function bulkResponseExceptionIsJustDocumentMissing(
573        ResponseException $exception, $logCallback = null
574    ) {
575        $justDocumentMissing = true;
576        foreach ( $exception->getResponseSet()->getBulkResponses() as $bulkResponse ) {
577            if ( !$bulkResponse->hasError() ) {
578                continue;
579            }
580
581            $error = $bulkResponse->getFullError();
582            if ( is_string( $error ) ) {
583                // es 1.7 cluster
584                $message = $bulkResponse->getError();
585                if ( strpos( $message, 'DocumentMissingException' ) === false ) {
586                    $justDocumentMissing = false;
587                    continue;
588                }
589            } else {
590                // es 2.x cluster
591                if ( $error !== null && $error['type'] !== 'document_missing_exception' ) {
592                    $justDocumentMissing = false;
593                    continue;
594                }
595            }
596
597            if ( $logCallback ) {
598                // This is generally not an error but we should
599                // log it to see how many we get
600                $action = $bulkResponse->getAction();
601                $docId = 'missing';
602                if ( $action instanceof \Elastica\Bulk\Action\AbstractDocument ) {
603                    $docId = $action->getData()->getId();
604                }
605                call_user_func( $logCallback, $docId );
606            }
607        }
608        return $justDocumentMissing;
609    }
610
611    /**
612     * @param string $description
613     * @param string $queryType
614     * @param string[] $extra
615     * @return SearchRequestLog
616     */
617    protected function newLog( $description, $queryType, array $extra = [] ) {
618        return new SearchRequestLog(
619            $this->connection->getClient(),
620            $description,
621            $queryType,
622            $extra
623        );
624    }
625
626    /**
627     * Converts a document into a call to super_detect_noop from the wikimedia-extra plugin.
628     * @param \Elastica\Document $doc
629     * @return \Elastica\Script\Script
630     * @internal made public for testing purposes
631     */
632    public function docToSuperDetectNoopScript( \Elastica\Document $doc ) {
633        $handlers = CirrusIndexField::getHint( $doc, CirrusIndexField::NOOP_HINT );
634        $params = array_diff_key( $doc->getParams(), [ CirrusIndexField::DOC_HINT_PARAM => 1 ] );
635
636        $params['source'] = $doc->getData();
637
638        if ( $handlers ) {
639            Assert::precondition( is_array( $handlers ), "Noop hints must be an array" );
640            $params['handlers'] = $handlers;
641        } else {
642            $params['handlers'] = [];
643        }
644        $extraHandlers = $this->searchConfig->getElement( 'CirrusSearchWikimediaExtraPlugin', 'super_detect_noop_handlers' );
645        if ( is_array( $extraHandlers ) ) {
646            $params['handlers'] += $extraHandlers;
647        }
648
649        if ( $params['handlers'] === [] ) {
650            // The noop script only supports Map but an empty array
651            // may be transformed to [] instead of {} when serialized to json
652            // causing class cast exception failures
653            $params['handlers'] = (object)[];
654        }
655        $script = new \Elastica\Script\Script( 'super_detect_noop', $params, 'super_detect_noop' );
656        if ( $doc->getDocAsUpsert() ) {
657            CirrusIndexField::resetHints( $doc );
658            $script->setUpsert( $doc );
659        }
660
661        return $script;
662    }
663
664    /**
665     * @return int Number of times to instruct Elasticsearch to retry updates that fail on
666     *  version conflicts.
667     */
668    private function retryOnConflict(): int {
669        return $this->searchConfig->get(
670            'CirrusSearchUpdateConflictRetryCount' );
671    }
672
673    private function convertEncoding( $d ) {
674        if ( is_string( $d ) ) {
675            return mb_convert_encoding( $d, 'UTF-8', 'UTF-8' );
676        }
677
678        foreach ( $d as &$v ) {
679            $v = $this->convertEncoding( $v );
680        }
681
682        return $d;
683    }
684
685    private function reportDocSize( Document $doc ): void {
686        $cluster = $this->connection->getClusterName();
687        try {
688            // Use the same JSON output that Elastica uses, it might not be the options MW uses
689            // to populate event-gate (esp. regarding escaping UTF-8) but hopefully it's close
690            // to what we will be using.
691            $len = strlen( JSON::stringify( $doc->getData(), \JSON_UNESCAPED_UNICODE | \JSON_UNESCAPED_SLASHES ) );
692            // Use a timing stat as we'd like to have percentiles calculated (possibly use T348796 once available)
693            // note that prior to switching to prometheus we used to have min and max, if that's proven to be still useful
694            // to track abnormally large docs we might consider another approach (log a warning?)
695            $this->stats->getTiming( "update_doc_size_kb" )
696                ->setLabel( "search_cluster", $cluster )
697                ->copyToStatsdAt( "CirrusSearch.$cluster.updates.all.doc_size" )
698                ->observe( $len );
699
700        } catch ( \JsonException $e ) {
701            $this->log->warning( "Cannot estimate CirrusSearch doc size", [ "exception" => $e ] );
702        }
703    }
704
705}