Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
20.00% covered (danger)
20.00%
17 / 85
42.86% covered (danger)
42.86%
3 / 7
CRAP
0.00% covered (danger)
0.00%
0 / 1
ElasticaWrite
20.00% covered (danger)
20.00%
17 / 85
42.86% covered (danger)
42.86%
3 / 7
164.97
0.00% covered (danger)
0.00%
0 / 1
 build
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 partitioningKey
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 serde
25.00% covered (danger)
25.00%
2 / 8
0.00% covered (danger)
0.00%
0 / 1
21.19
 __construct
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 allowRetries
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 doJob
0.00% covered (danger)
0.00%
0 / 37
0.00% covered (danger)
0.00%
0 / 1
30
 requeueError
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3namespace CirrusSearch\Job;
4
5use CirrusSearch\ClusterSettings;
6use CirrusSearch\Connection;
7use CirrusSearch\DataSender;
8use MediaWiki\Logger\LoggerFactory;
9use MediaWiki\MediaWikiServices;
10use Status;
11use Wikimedia\Assert\Assert;
12
13/**
14 * Performs writes to elasticsearch indexes with requeuing and an
15 * exponential backoff (if supported by jobqueue) when the index
16 * writes fail.
17 *
18 * This program is free software; you can redistribute it and/or modify
19 * it under the terms of the GNU General Public License as published by
20 * the Free Software Foundation; either version 2 of the License, or
21 * (at your option) any later version.
22 *
23 * This program is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * GNU General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License along
29 * with this program; if not, write to the Free Software Foundation, Inc.,
30 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
31 * http://www.gnu.org/copyleft/gpl.html
32 */
33class ElasticaWrite extends CirrusGenericJob {
34    private const MAX_ERROR_RETRY = 4;
35
36    /**
37     * @var array Map from method name to list of classes to
38     *  handle serialization for each argument.
39     */
40    private static $SERDE = [
41        'sendData' => [ null, ElasticaDocumentsJsonSerde::class ],
42    ];
43
44    /**
45     * @param ClusterSettings $cluster
46     * @param string $method
47     * @param array $arguments
48     * @param array $params
49     * @return ElasticaWrite
50     */
51    public static function build( ClusterSettings $cluster, string $method, array $arguments, array $params = [] ) {
52        return new self( [
53            'method' => $method,
54            'arguments' => self::serde( $method, $arguments ),
55            'cluster' => $cluster->getName(),
56            // This does not directly partition the jobs, it only provides a value
57            // to use during partitioning. The job queue must be separately
58            // configured to utilize this value.
59            'jobqueue_partition' => self::partitioningKey( $cluster ),
60        ] + $params );
61    }
62
63    /**
64     * Generate a cluster specific partitioning key
65     *
66     * Some job queue implementations, such as cpjobqueue, can partition the
67     * execution of jobs based on a parameter of the job. By default we
68     * provide one partition per cluster, but allow to configure multiple
69     * partitions per cluster if more throughput is necessary. Within a
70     * single cluster jobs are distributed randomly.
71     *
72     * @param ClusterSettings $settings
73     * @return string A value suitable for partitioning jobs per-cluster
74     */
75    private static function partitioningKey( ClusterSettings $settings ): string {
76        $numPartitions = $settings->getElasticaWritePartitionCount();
77        $partition = mt_rand() % $numPartitions;
78        return "{$settings->getName()}-{$partition}";
79    }
80
81    private static function serde( $method, array $arguments, $serialize = true ) {
82        if ( isset( self::$SERDE[$method] ) ) {
83            foreach ( self::$SERDE[$method] as $i => $serde ) {
84                if ( $serde !== null && array_key_exists( $i, $arguments ) ) {
85                    $impl = new $serde();
86                    if ( $serialize ) {
87                        $arguments[$i] = $impl->serialize( $arguments[$i] );
88                    } else {
89                        $arguments[$i] = $impl->deserialize( $arguments[$i] );
90                    }
91                }
92            }
93        }
94        return $arguments;
95    }
96
97    /**
98     * Entry point for jobs received from the job queue. Creating new
99     * jobs should be done via self::build.
100     *
101     * @param array $params
102     */
103    public function __construct( array $params ) {
104        parent::__construct( $params + [
105            'createdAt' => time(),
106            'errorCount' => 0,
107            'retryCount' => 0,
108            'cluster' => null,
109        ] );
110    }
111
112    /**
113     * This job handles all its own retries internally. These jobs are so
114     * numerous that if they were to start failing they would possibly
115     * overflow the job queue and bring down redis in production.
116     *
117     * Basically we just can't let these jobs hang out in the abandoned
118     * queue for a week like retries typically do. If these jobs get
119     * failed they will log to CirrusSearchChangeFailed which is a signal
120     * that some point in time around the failure needs to be reindexed
121     * manually. See https://wikitech.wikimedia.org/wiki/Search for more
122     * details.
123     * @return bool
124     */
125    public function allowRetries() {
126        return false;
127    }
128
129    /**
130     * @return bool
131     */
132    protected function doJob() {
133        // While we can only have a single connection per job, we still
134        // use decideClusters() which includes a variety of safeguards.
135        $connections = $this->decideClusters();
136        if ( empty( $connections ) ) {
137            // Chosen cluster no longer exists in configuration.
138            return true;
139        }
140        Assert::precondition( count( $connections ) == 1,
141            'per self::build() we must have a single connection' );
142
143        $conn = reset( $connections );
144        $arguments = self::serde( $this->params['method'], $this->params['arguments'], false );
145
146        LoggerFactory::getInstance( 'CirrusSearch' )->debug(
147            "Running {method} on cluster {cluster} {diff}s after insertion",
148            [
149                'method' => $this->params['method'],
150                'arguments' => $arguments,
151                'diff' => time() - $this->params['createdAt'],
152                'cluster' => $conn->getClusterName(),
153            ]
154        );
155
156        $retry = [];
157        $error = [];
158        $sender = new DataSender( $conn, $this->searchConfig );
159        try {
160            $status = $sender->{$this->params['method']}( ...$arguments );
161        } catch ( \Exception $e ) {
162            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
163                "Exception thrown while running DataSender::{method} in cluster {cluster}: {errorMessage}",
164                [
165                    'method' => $this->params['method'],
166                    'cluster' => $conn->getClusterName(),
167                    'errorMessage' => $e->getMessage(),
168                    'exception' => $e,
169                ]
170            );
171            $status = Status::newFatal( 'cirrussearch-send-failure' );
172        }
173
174        $ok = true;
175        if ( !$status->isOK() ) {
176            $action = $this->requeueError( $conn ) ? "Requeued" : "Dropped";
177            $this->setLastError( "ElasticaWrite job failed: {$action}" );
178            $ok = false;
179        }
180
181        return $ok;
182    }
183
184    /**
185     * Re-queue job that failed, or drop the job if it has failed
186     * too many times
187     *
188     * @param Connection $conn
189     * @return bool True when the job has been queued
190     */
191    private function requeueError( Connection $conn ) {
192        if ( $this->params['errorCount'] >= self::MAX_ERROR_RETRY ) {
193            LoggerFactory::getInstance( 'CirrusSearchChangeFailed' )->warning(
194                "Dropping failing ElasticaWrite job for DataSender::{method} in cluster {cluster} after repeated failure",
195                [
196                    'method' => $this->params['method'],
197                    'cluster' => $conn->getClusterName(),
198                ]
199            );
200            return false;
201        } else {
202            $delay = $this->backoffDelay( $this->params['retryCount'] );
203            $params = $this->params;
204            $params['errorCount']++;
205            unset( $params['jobReleaseTimestamp'] );
206            $params += self::buildJobDelayOptions( self::class, $delay );
207            $job = new self( $params );
208            // Individual failures should have already logged specific errors,
209            LoggerFactory::getInstance( 'CirrusSearch' )->info(
210                "ElasticaWrite job reported failure on cluster {cluster}. Requeueing job with delay of {delay}.",
211                [
212                    'cluster' => $conn->getClusterName(),
213                    'delay' => $delay
214                ]
215            );
216            MediaWikiServices::getInstance()->getJobQueueGroup()->push( $job );
217            return true;
218        }
219    }
220}