Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
30.61% covered (danger)
30.61%
30 / 98
50.00% covered (danger)
50.00%
4 / 8
CRAP
0.00% covered (danger)
0.00%
0 / 1
ElasticaWrite
30.61% covered (danger)
30.61%
30 / 98
50.00% covered (danger)
50.00%
4 / 8
153.63
0.00% covered (danger)
0.00%
0 / 1
 build
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
1
 partitioningKey
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 serde
25.00% covered (danger)
25.00%
2 / 8
0.00% covered (danger)
0.00%
0 / 1
21.19
 __construct
100.00% covered (success)
100.00%
9 / 9
100.00% covered (success)
100.00%
1 / 1
1
 allowRetries
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doJob
0.00% covered (danger)
0.00%
0 / 36
0.00% covered (danger)
0.00%
0 / 1
30
 requeueError
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
6
 reportUpdateLag
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
3
1<?php
2
3namespace CirrusSearch\Job;
4
5use CirrusSearch\ClusterSettings;
6use CirrusSearch\Connection;
7use CirrusSearch\DataSender;
8use CirrusSearch\UpdateGroup;
9use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
10use MediaWiki\Logger\LoggerFactory;
11use MediaWiki\MediaWikiServices;
12use MediaWiki\Status\Status;
13use MediaWiki\Utils\MWTimestamp;
14use Wikimedia\Assert\Assert;
15
16/**
17 * Performs writes to elasticsearch indexes with requeuing and an
18 * exponential backoff (if supported by jobqueue) when the index
19 * writes fail.
20 *
21 * This program is free software; you can redistribute it and/or modify
22 * it under the terms of the GNU General Public License as published by
23 * the Free Software Foundation; either version 2 of the License, or
24 * (at your option) any later version.
25 *
26 * This program is distributed in the hope that it will be useful,
27 * but WITHOUT ANY WARRANTY; without even the implied warranty of
28 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * GNU General Public License for more details.
30 *
31 * You should have received a copy of the GNU General Public License along
32 * with this program; if not, write to the Free Software Foundation, Inc.,
33 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
34 * http://www.gnu.org/copyleft/gpl.html
35 */
36class ElasticaWrite extends CirrusGenericJob {
37
38    private const MAX_ERROR_RETRY = 4;
39
40    /**
41     * @var array Map from method name to list of classes to
42     *  handle serialization for each argument.
43     */
44    private static $SERDE = [
45        'sendData' => [ null, ElasticaDocumentsJsonSerde::class ],
46    ];
47
48    /**
49     * @param ClusterSettings $cluster
50     * @param string $updateGroup UpdateGroup::* constant
51     * @param string $method
52     * @param array $arguments
53     * @param array $params
54     * @param string|null $updateKind the kind of update to perform (used for monitoring)
55     * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
56     * @return ElasticaWrite
57     */
58    public static function build(
59        ClusterSettings $cluster,
60        string $updateGroup,
61        string $method,
62        array $arguments,
63        array $params = [],
64        string $updateKind = null,
65        int $rootEventTime = null
66    ) {
67        return new self( [
68            'method' => $method,
69            'arguments' => self::serde( $method, $arguments ),
70            'cluster' => $cluster->getName(),
71            // This does not directly partition the jobs, it only provides a value
72            // to use during partitioning. The job queue must be separately
73            // configured to utilize this value.
74            'jobqueue_partition' => self::partitioningKey( $cluster ),
75            'update_group' => $updateGroup,
76            CirrusTitleJob::UPDATE_KIND => $updateKind,
77            CirrusTitleJob::ROOT_EVENT_TIME => $rootEventTime
78        ] + $params );
79    }
80
81    /**
82     * Generate a cluster specific partitioning key
83     *
84     * Some job queue implementations, such as cpjobqueue, can partition the
85     * execution of jobs based on a parameter of the job. By default we
86     * provide one partition per cluster, but allow to configure multiple
87     * partitions per cluster if more throughput is necessary. Within a
88     * single cluster jobs are distributed randomly.
89     *
90     * @param ClusterSettings $settings
91     * @return string A value suitable for partitioning jobs per-cluster
92     */
93    private static function partitioningKey( ClusterSettings $settings ): string {
94        $numPartitions = $settings->getElasticaWritePartitionCount();
95        $partition = mt_rand() % $numPartitions;
96        return "{$settings->getName()}-{$partition}";
97    }
98
99    private static function serde( $method, array $arguments, $serialize = true ) {
100        if ( isset( self::$SERDE[$method] ) ) {
101            foreach ( self::$SERDE[$method] as $i => $serde ) {
102                if ( $serde !== null && array_key_exists( $i, $arguments ) ) {
103                    $impl = new $serde();
104                    if ( $serialize ) {
105                        $arguments[$i] = $impl->serialize( $arguments[$i] );
106                    } else {
107                        $arguments[$i] = $impl->deserialize( $arguments[$i] );
108                    }
109                }
110            }
111        }
112        return $arguments;
113    }
114
115    /**
116     * Entry point for jobs received from the job queue. Creating new
117     * jobs should be done via self::build.
118     *
119     * @param array $params
120     */
121    public function __construct( array $params ) {
122        parent::__construct( $params + [
123            'createdAt' => time(),
124            'errorCount' => 0,
125            'retryCount' => 0,
126            'cluster' => null,
127            CirrusTitleJob::UPDATE_KIND => null,
128            CirrusTitleJob::ROOT_EVENT_TIME => null,
129            // BC for jobs created pre-1.42
130            'update_group' => UpdateGroup::PAGE,
131        ] );
132    }
133
134    /**
135     * This job handles all its own retries internally. These jobs are so
136     * numerous that if they were to start failing they would possibly
137     * overflow the job queue and bring down redis in production.
138     *
139     * Basically we just can't let these jobs hang out in the abandoned
140     * queue for a week like retries typically do. If these jobs get
141     * failed they will log to CirrusSearchChangeFailed which is a signal
142     * that some point in time around the failure needs to be reindexed
143     * manually. See https://wikitech.wikimedia.org/wiki/Search for more
144     * details.
145     * @return bool
146     */
147    public function allowRetries() {
148        return false;
149    }
150
151    /**
152     * @return bool
153     */
154    protected function doJob() {
155        // While we can only have a single connection per job, we still
156        // use decideClusters() which includes a variety of safeguards.
157        $connections = $this->decideClusters( $this->params['update_group'] );
158        if ( !$connections ) {
159            // Chosen cluster no longer exists in configuration.
160            return true;
161        }
162        Assert::precondition( count( $connections ) == 1,
163            'per self::build() we must have a single connection' );
164
165        $conn = reset( $connections );
166        $arguments = self::serde( $this->params['method'], $this->params['arguments'], false );
167
168        LoggerFactory::getInstance( 'CirrusSearch' )->debug(
169            "Running {method} on cluster {cluster} {diff}s after insertion",
170            [
171                'method' => $this->params['method'],
172                'arguments' => $arguments,
173                'diff' => time() - $this->params['createdAt'],
174                'cluster' => $conn->getClusterName(),
175            ]
176        );
177
178        $sender = new DataSender( $conn, $this->searchConfig );
179        try {
180            $status = $sender->{$this->params['method']}( ...$arguments );
181        } catch ( \Exception $e ) {
182            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
183                "Exception thrown while running DataSender::{method} in cluster {cluster}: {errorMessage}",
184                [
185                    'method' => $this->params['method'],
186                    'cluster' => $conn->getClusterName(),
187                    'errorMessage' => $e->getMessage(),
188                    'exception' => $e,
189                ]
190            );
191            $status = Status::newFatal( 'cirrussearch-send-failure' );
192        }
193
194        $ok = true;
195        if ( !$status->isOK() ) {
196            $action = $this->requeueError( $conn ) ? "Requeued" : "Dropped";
197            $this->setLastError( "ElasticaWrite job failed: {$action}" );
198            $ok = false;
199        } else {
200            $this->reportUpdateLag( $conn->getClusterName() );
201        }
202
203        return $ok;
204    }
205
206    /**
207     * Re-queue job that failed, or drop the job if it has failed
208     * too many times
209     *
210     * @param Connection $conn
211     * @return bool True when the job has been queued
212     */
213    private function requeueError( Connection $conn ) {
214        if ( $this->params['errorCount'] >= self::MAX_ERROR_RETRY ) {
215            LoggerFactory::getInstance( 'CirrusSearchChangeFailed' )->warning(
216                "Dropping failing ElasticaWrite job for DataSender::{method} in cluster {cluster} after repeated failure",
217                [
218                    'method' => $this->params['method'],
219                    'cluster' => $conn->getClusterName(),
220                ]
221            );
222            return false;
223        } else {
224            $delay = $this->backoffDelay( $this->params['retryCount'] );
225            $params = $this->params;
226            $params['errorCount']++;
227            unset( $params['jobReleaseTimestamp'] );
228            $jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
229            $params += self::buildJobDelayOptions( self::class, $delay, $jobQueue );
230            $job = new self( $params );
231            // Individual failures should have already logged specific errors,
232            LoggerFactory::getInstance( 'CirrusSearch' )->info(
233                "ElasticaWrite job reported failure on cluster {cluster}. Requeueing job with delay of {delay}.",
234                [
235                    'cluster' => $conn->getClusterName(),
236                    'delay' => $delay
237                ]
238            );
239            $jobQueue->push( $job );
240            return true;
241        }
242    }
243
244    /**
245     * Report the update lag based on stored params if set.
246     * @param string $cluster
247     * @param StatsdDataFactoryInterface|null $statsdDataFactory
248     * @return void
249     */
250    public function reportUpdateLag( string $cluster, StatsdDataFactoryInterface $statsdDataFactory = null ): void {
251        $params = $this->getParams();
252        $updateKind = $params[CirrusTitleJob::UPDATE_KIND] ?? null;
253        $eventTime = $params[CirrusTitleJob::ROOT_EVENT_TIME] ?? null;
254        if ( $updateKind !== null && $eventTime !== null ) {
255            $now = MWTimestamp::time();
256            $statsdDataFactory ??= MediaWikiServices::getInstance()->getStatsdDataFactory();
257            $statsdDataFactory->timing( "CirrusSearch.$cluster.updates.all.lag.$updateKind", $now - $eventTime );
258        }
259    }
260}