Code Coverage

|                 | Lines            | Functions and Methods | CRAP   | Classes and Traits |
|-----------------|------------------|-----------------------|--------|--------------------|
| Total           | 20.00% (17 / 85) | 42.86% (3 / 7)        |        | 0.00% (0 / 1)      |
| ElasticaWrite   | 20.00% (17 / 85) | 42.86% (3 / 7)        | 164.97 | 0.00% (0 / 1)      |
| build           | 100.00% (6 / 6)  | 100.00% (1 / 1)       | 1      |                    |
| partitioningKey | 100.00% (3 / 3)  | 100.00% (1 / 1)       | 1      |                    |
| serde           | 25.00% (2 / 8)   | 0.00% (0 / 1)         | 21.19  |                    |
| __construct     | 100.00% (6 / 6)  | 100.00% (1 / 1)       | 1      |                    |
| allowRetries    | 0.00% (0 / 1)    | 0.00% (0 / 1)         | 2      |                    |
| doJob           | 0.00% (0 / 37)   | 0.00% (0 / 1)         | 30     |                    |
| requeueError    | 0.00% (0 / 24)   | 0.00% (0 / 1)         | 6      |                    |
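The CRAP column is the Change Risk Anti-Patterns score, computed from a method's cyclomatic complexity comp(m) and its line-coverage fraction cov(m) as CRAP(m) = comp(m)^2 * (1 - cov(m))^3 + comp(m). A fully covered method scores exactly its complexity (build, partitioningKey and __construct each score 1), while serde's 21.19 is consistent with a complexity of 6 at 25% line coverage: 6^2 * 0.75^3 + 6 = 15.1875 + 6 ≈ 21.19.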
<?php

namespace CirrusSearch\Job;

use CirrusSearch\ClusterSettings;
use CirrusSearch\Connection;
use CirrusSearch\DataSender;
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
use Status;
use Wikimedia\Assert\Assert;

/**
 * Performs writes to elasticsearch indexes, requeueing with exponential
 * backoff (if supported by the job queue) when the index writes fail.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 */
class ElasticaWrite extends CirrusGenericJob {
	private const MAX_ERROR_RETRY = 4;

	/**
	 * @var array Map from method name to a per-argument list of serializer
	 * classes; null entries are passed through unchanged.
	 */
	private static $SERDE = [
		'sendData' => [ null, ElasticaDocumentsJsonSerde::class ],
	];

	/**
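	 * Build a job that will invoke the named DataSender method on a
	 * specific cluster.
	 *
	 * Illustrative usage (the argument names are hypothetical; the list
	 * must match the signature of the DataSender method being called):
	 *
	 *   $job = ElasticaWrite::build( $cluster, 'sendData', [ $indexSuffix, $documents ] );
	 *   MediaWikiServices::getInstance()->getJobQueueGroup()->push( $job );
	 *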
	 * @param ClusterSettings $cluster
	 * @param string $method
	 * @param array $arguments
	 * @param array $params
	 * @return ElasticaWrite
	 */
	public static function build( ClusterSettings $cluster, string $method, array $arguments, array $params = [] ) {
		return new self( [
			'method' => $method,
			'arguments' => self::serde( $method, $arguments ),
			'cluster' => $cluster->getName(),
			// This does not directly partition the jobs, it only provides a value
			// to use during partitioning. The job queue must be separately
			// configured to utilize this value.
			'jobqueue_partition' => self::partitioningKey( $cluster ),
		] + $params );
	}

	/**
	 * Generate a cluster-specific partitioning key
	 *
	 * Some job queue implementations, such as cpjobqueue, can partition the
	 * execution of jobs based on a parameter of the job. By default we
	 * provide one partition per cluster, but multiple partitions per cluster
	 * can be configured if more throughput is necessary. Within a single
	 * cluster jobs are distributed randomly.
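	 *
	 * For example, a cluster named "eqiad" (name purely illustrative) with a
	 * partition count of 3 yields "eqiad-0", "eqiad-1" or "eqiad-2", chosen
	 * uniformly at random.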
	 *
	 * @param ClusterSettings $settings
	 * @return string A value suitable for partitioning jobs per-cluster
	 */
	private static function partitioningKey( ClusterSettings $settings ): string {
		$numPartitions = $settings->getElasticaWritePartitionCount();
		$partition = mt_rand() % $numPartitions;
		return "{$settings->getName()}-{$partition}";
	}

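	/**
	 * Run the per-argument serializers declared in self::$SERDE over
	 * $arguments, either converting values into a job-queue-safe form
	 * or reversing that conversion when the job executes.
	 *
	 * @param string $method DataSender method name the arguments belong to
	 * @param array $arguments
	 * @param bool $serialize True to serialize, false to deserialize
	 * @return array
	 */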
	private static function serde( string $method, array $arguments, bool $serialize = true ): array {
		if ( isset( self::$SERDE[$method] ) ) {
			foreach ( self::$SERDE[$method] as $i => $serde ) {
				if ( $serde !== null && array_key_exists( $i, $arguments ) ) {
					$impl = new $serde();
					if ( $serialize ) {
						$arguments[$i] = $impl->serialize( $arguments[$i] );
					} else {
						$arguments[$i] = $impl->deserialize( $arguments[$i] );
					}
				}
			}
		}
		return $arguments;
	}

	/**
	 * Entry point for jobs received from the job queue. Creating new
	 * jobs should be done via self::build.
	 *
	 * @param array $params
	 */
	public function __construct( array $params ) {
		parent::__construct( $params + [
			'createdAt' => time(),
			'errorCount' => 0,
			'retryCount' => 0,
			'cluster' => null,
		] );
	}

	/**
	 * This job handles all its own retries internally. These jobs are so
	 * numerous that if they were to start failing they could overflow the
	 * job queue and bring down redis in production.
	 *
	 * Basically we just can't let these jobs hang out in the abandoned
	 * queue for a week like retries typically do. When these jobs fail
	 * they log to CirrusSearchChangeFailed, which is a signal that some
	 * point in time around the failure needs to be reindexed manually.
	 * See https://wikitech.wikimedia.org/wiki/Search for more details.
	 * @return bool
	 */
	public function allowRetries() {
		return false;
	}

	/**
	 * @return bool
	 */
	protected function doJob() {
		// While we can only have a single connection per job, we still
		// use decideClusters() which includes a variety of safeguards.
		$connections = $this->decideClusters();
		if ( empty( $connections ) ) {
			// Chosen cluster no longer exists in configuration.
			return true;
		}
		Assert::precondition( count( $connections ) === 1,
			'per self::build() we must have a single connection' );

		$conn = reset( $connections );
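		// Reverse the serialization applied in self::build() so complex
		// arguments (e.g. Elastica documents for sendData) are restored
		// before being passed to DataSender.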
		$arguments = self::serde( $this->params['method'], $this->params['arguments'], false );

		LoggerFactory::getInstance( 'CirrusSearch' )->debug(
			"Running {method} on cluster {cluster} {diff}s after insertion",
			[
				'method' => $this->params['method'],
				'arguments' => $arguments,
				'diff' => time() - $this->params['createdAt'],
				'cluster' => $conn->getClusterName(),
			]
		);

		$retry = [];
		$error = [];
		$sender = new DataSender( $conn, $this->searchConfig );
		try {
			$status = $sender->{$this->params['method']}( ...$arguments );
		} catch ( \Exception $e ) {
			LoggerFactory::getInstance( 'CirrusSearch' )->warning(
				"Exception thrown while running DataSender::{method} in cluster {cluster}: {errorMessage}",
				[
					'method' => $this->params['method'],
					'cluster' => $conn->getClusterName(),
					'errorMessage' => $e->getMessage(),
					'exception' => $e,
				]
			);
			$status = Status::newFatal( 'cirrussearch-send-failure' );
		}

		$ok = true;
		if ( !$status->isOK() ) {
			$action = $this->requeueError( $conn ) ? "Requeued" : "Dropped";
			$this->setLastError( "ElasticaWrite job failed: {$action}" );
			$ok = false;
		}

		return $ok;
	}

	/**
	 * Re-queue a job that failed, or drop it if it has already failed
	 * too many times.
	 *
	 * @param Connection $conn
	 * @return bool True when the job has been re-queued
	 */
	private function requeueError( Connection $conn ) {
		if ( $this->params['errorCount'] >= self::MAX_ERROR_RETRY ) {
			LoggerFactory::getInstance( 'CirrusSearchChangeFailed' )->warning(
				"Dropping failing ElasticaWrite job for DataSender::{method} in cluster {cluster} after repeated failure",
				[
					'method' => $this->params['method'],
					'cluster' => $conn->getClusterName(),
				]
			);
			return false;
		} else {
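			// backoffDelay() is provided by the parent job class; per the
			// class docblock the delay is expected to grow exponentially
			// with the retry count, where the job queue supports it.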
			$delay = $this->backoffDelay( $this->params['retryCount'] );
			$params = $this->params;
			$params['errorCount']++;
			unset( $params['jobReleaseTimestamp'] );
			$params += self::buildJobDelayOptions( self::class, $delay );
			$job = new self( $params );
			// Individual failures should have already logged specific errors.
			LoggerFactory::getInstance( 'CirrusSearch' )->info(
				"ElasticaWrite job reported failure on cluster {cluster}. Requeueing job with delay of {delay}.",
				[
					'cluster' => $conn->getClusterName(),
					'delay' => $delay
				]
			);
			MediaWikiServices::getInstance()->getJobQueueGroup()->push( $job );
			return true;
		}
	}
}
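
Per the coverage table above, allowRetries(), doJob() and requeueError() are entirely uncovered. A minimal test sketch for the simplest of these, assuming MediaWiki's PHPUnit harness (the test class name is hypothetical, and the exact constructor requirements depend on CirrusGenericJob, which is not shown here):

<?php

namespace CirrusSearch\Job;

class ElasticaWriteTest extends \MediaWikiIntegrationTestCase {

	public function testRetriesAreHandledInternally() {
		// ElasticaWrite manages its own requeueing with backoff, so the
		// job queue's built-in retry mechanism must remain disabled.
		$job = new ElasticaWrite( [
			'method' => 'sendData',
			'arguments' => [],
			'cluster' => 'default',
		] );
		$this->assertFalse( $job->allowRetries() );
	}
}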