Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
30.61% |
30 / 98 |
|
50.00% |
4 / 8 |
CRAP | |
0.00% |
0 / 1 |
ElasticaWrite | |
30.61% |
30 / 98 |
|
50.00% |
4 / 8 |
153.63 | |
0.00% |
0 / 1 |
build | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
1 | |||
partitioningKey | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
serde | |
25.00% |
2 / 8 |
|
0.00% |
0 / 1 |
21.19 | |||
__construct | |
100.00% |
9 / 9 |
|
100.00% |
1 / 1 |
1 | |||
allowRetries | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
doJob | |
0.00% |
0 / 36 |
|
0.00% |
0 / 1 |
30 | |||
requeueError | |
0.00% |
0 / 25 |
|
0.00% |
0 / 1 |
6 | |||
reportUpdateLag | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
3 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Job; |
4 | |
5 | use CirrusSearch\ClusterSettings; |
6 | use CirrusSearch\Connection; |
7 | use CirrusSearch\DataSender; |
8 | use CirrusSearch\UpdateGroup; |
9 | use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; |
10 | use MediaWiki\Logger\LoggerFactory; |
11 | use MediaWiki\MediaWikiServices; |
12 | use MediaWiki\Status\Status; |
13 | use MediaWiki\Utils\MWTimestamp; |
14 | use Wikimedia\Assert\Assert; |
15 | |
16 | /** |
17 | * Performs writes to elasticsearch indexes with requeuing and an |
18 | * exponential backoff (if supported by jobqueue) when the index |
19 | * writes fail. |
20 | * |
21 | * This program is free software; you can redistribute it and/or modify |
22 | * it under the terms of the GNU General Public License as published by |
23 | * the Free Software Foundation; either version 2 of the License, or |
24 | * (at your option) any later version. |
25 | * |
26 | * This program is distributed in the hope that it will be useful, |
27 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
28 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
29 | * GNU General Public License for more details. |
30 | * |
31 | * You should have received a copy of the GNU General Public License along |
32 | * with this program; if not, write to the Free Software Foundation, Inc., |
33 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
34 | * http://www.gnu.org/copyleft/gpl.html |
35 | */ |
class ElasticaWrite extends CirrusGenericJob {

	/** Number of failed attempts after which a job is dropped instead of requeued. */
	private const MAX_ERROR_RETRY = 4;

	/**
	 * @var array Map from method name to list of classes to
	 *  handle serialization for each argument. A null entry means the
	 *  argument at that position is stored and passed through unchanged.
	 */
	private static $SERDE = [
		'sendData' => [ null, ElasticaDocumentsJsonSerde::class ],
	];

	/**
	 * Create a job that will invoke DataSender::$method( ...$arguments )
	 * against a single cluster.
	 *
	 * @param ClusterSettings $cluster
	 * @param string $updateGroup UpdateGroup::* constant
	 * @param string $method Name of the DataSender method to invoke
	 * @param array $arguments Positional arguments for $method; positions
	 *  listed in self::$SERDE are serialized before being stored in the job
	 * @param array $params Extra job params. Keys computed by this method
	 *  take precedence over identical keys in $params.
	 * @param string|null $updateKind the kind of update to perform (used for monitoring)
	 * @param int|null $rootEventTime the time of MW event that caused this update (used for monitoring)
	 * @return ElasticaWrite
	 */
	public static function build(
		ClusterSettings $cluster,
		string $updateGroup,
		string $method,
		array $arguments,
		array $params = [],
		?string $updateKind = null,
		?int $rootEventTime = null
	) {
		return new self( [
			'method' => $method,
			'arguments' => self::serde( $method, $arguments ),
			'cluster' => $cluster->getName(),
			// This does not directly partition the jobs, it only provides a value
			// to use during partitioning. The job queue must be separately
			// configured to utilize this value.
			'jobqueue_partition' => self::partitioningKey( $cluster ),
			'update_group' => $updateGroup,
			CirrusTitleJob::UPDATE_KIND => $updateKind,
			CirrusTitleJob::ROOT_EVENT_TIME => $rootEventTime
		] + $params );
	}

	/**
	 * Generate a cluster specific partitioning key
	 *
	 * Some job queue implementations, such as cpjobqueue, can partition the
	 * execution of jobs based on a parameter of the job. By default we
	 * provide one partition per cluster, but allow to configure multiple
	 * partitions per cluster if more throughput is necessary. Within a
	 * single cluster jobs are distributed randomly.
	 *
	 * @param ClusterSettings $settings
	 * @return string A value suitable for partitioning jobs per-cluster,
	 *  formatted as "<cluster name>-<partition number>"
	 */
	private static function partitioningKey( ClusterSettings $settings ): string {
		// A partition count below 1 would make the modulo below throw
		// DivisionByZeroError; treat it as the single-partition default.
		$numPartitions = max( 1, $settings->getElasticaWritePartitionCount() );
		$partition = mt_rand() % $numPartitions;
		return "{$settings->getName()}-{$partition}";
	}

	/**
	 * Apply the per-argument (de)serializers declared in self::$SERDE.
	 *
	 * @param string $method DataSender method name the arguments belong to
	 * @param array $arguments Positional arguments for $method
	 * @param bool $serialize True to serialize (when storing in job params),
	 *  false to deserialize (when running the job)
	 * @return array $arguments with the configured positions converted.
	 *  Methods without an entry in self::$SERDE, and positions that are
	 *  absent from $arguments, are left unchanged.
	 */
	private static function serde( $method, array $arguments, bool $serialize = true ): array {
		foreach ( self::$SERDE[$method] ?? [] as $i => $serdeClass ) {
			if ( $serdeClass === null || !array_key_exists( $i, $arguments ) ) {
				continue;
			}
			$impl = new $serdeClass();
			$arguments[$i] = $serialize
				? $impl->serialize( $arguments[$i] )
				: $impl->deserialize( $arguments[$i] );
		}
		return $arguments;
	}

	/**
	 * Entry point for jobs received from the job queue. Creating new
	 * jobs should be done via self::build.
	 *
	 * Missing params are filled with defaults; params already present
	 * (including those set by self::build) are kept as-is.
	 *
	 * @param array $params
	 */
	public function __construct( array $params ) {
		parent::__construct( $params + [
			'createdAt' => time(),
			'errorCount' => 0,
			'retryCount' => 0,
			'cluster' => null,
			CirrusTitleJob::UPDATE_KIND => null,
			CirrusTitleJob::ROOT_EVENT_TIME => null,
			// BC for jobs created pre-1.42
			'update_group' => UpdateGroup::PAGE,
		] );
	}

	/**
	 * This job handles all its own retries internally. These jobs are so
	 * numerous that if they were to start failing they would possibly
	 * overflow the job queue and bring down redis in production.
	 *
	 * Basically we just can't let these jobs hang out in the abandoned
	 * queue for a week like retries typically do. If these jobs get
	 * failed they will log to CirrusSearchChangeFailed which is a signal
	 * that some point in time around the failure needs to be reindexed
	 * manually. See https://wikitech.wikimedia.org/wiki/Search for more
	 * details.
	 * @return bool
	 */
	public function allowRetries() {
		return false;
	}

	/**
	 * Run the stored DataSender method against the job's cluster.
	 *
	 * @return bool True when the write succeeded or the cluster is no longer
	 *  configured. False when the write failed; by then the job has already
	 *  been requeued or dropped by requeueError().
	 */
	protected function doJob() {
		// While we can only have a single connection per job, we still
		// use decideClusters() which includes a variety of safeguards.
		$connections = $this->decideClusters( $this->params['update_group'] );
		if ( !$connections ) {
			// Chosen cluster no longer exists in configuration.
			return true;
		}
		Assert::precondition( count( $connections ) === 1,
			'per self::build() we must have a single connection' );

		$conn = reset( $connections );
		$method = $this->params['method'];
		$arguments = self::serde( $method, $this->params['arguments'], false );
		$logger = LoggerFactory::getInstance( 'CirrusSearch' );

		$logger->debug(
			"Running {method} on cluster {cluster} {diff}s after insertion",
			[
				'method' => $method,
				'arguments' => $arguments,
				'diff' => time() - $this->params['createdAt'],
				'cluster' => $conn->getClusterName(),
			]
		);

		$sender = new DataSender( $conn, $this->searchConfig );
		try {
			$status = $sender->{$method}( ...$arguments );
		} catch ( \Exception $e ) {
			$logger->warning(
				"Exception thrown while running DataSender::{method} in cluster {cluster}: {errorMessage}",
				[
					'method' => $method,
					'cluster' => $conn->getClusterName(),
					'errorMessage' => $e->getMessage(),
					'exception' => $e,
				]
			);
			// Funnel exceptions into the same failure path as a fatal status.
			$status = Status::newFatal( 'cirrussearch-send-failure' );
		}

		if ( !$status->isOK() ) {
			$action = $this->requeueError( $conn ) ? "Requeued" : "Dropped";
			$this->setLastError( "ElasticaWrite job failed: {$action}" );
			return false;
		}

		$this->reportUpdateLag( $conn->getClusterName() );
		return true;
	}

	/**
	 * Re-queue job that failed, or drop the job if it has failed
	 * too many times
	 *
	 * @param Connection $conn
	 * @return bool True when the job has been queued, false when it was
	 *  dropped after reaching MAX_ERROR_RETRY failures
	 */
	private function requeueError( Connection $conn ) {
		if ( $this->params['errorCount'] >= self::MAX_ERROR_RETRY ) {
			LoggerFactory::getInstance( 'CirrusSearchChangeFailed' )->warning(
				"Dropping failing ElasticaWrite job for DataSender::{method} in cluster {cluster} after repeated failure",
				[
					'method' => $this->params['method'],
					'cluster' => $conn->getClusterName(),
				]
			);
			return false;
		}

		// Base the delay on how many times this write has already failed.
		// 'retryCount' is never incremented by this class, so passing it
		// would yield the same delay on every requeue.
		// NOTE(review): assumes backoffDelay() grows with its argument.
		$delay = $this->backoffDelay( $this->params['errorCount'] );
		$params = $this->params;
		$params['errorCount']++;
		// Drop the previous release time so the new delay options apply.
		unset( $params['jobReleaseTimestamp'] );
		$jobQueue = MediaWikiServices::getInstance()->getJobQueueGroup();
		$params += self::buildJobDelayOptions( self::class, $delay, $jobQueue );
		$job = new self( $params );
		// Individual failures should have already logged specific errors;
		// this only records that the job is being requeued.
		LoggerFactory::getInstance( 'CirrusSearch' )->info(
			"ElasticaWrite job reported failure on cluster {cluster}. Requeueing job with delay of {delay}.",
			[
				'cluster' => $conn->getClusterName(),
				'delay' => $delay
			]
		);
		$jobQueue->push( $job );
		return true;
	}

	/**
	 * Report the update lag based on stored params if set.
	 *
	 * Nothing is reported unless both the update kind and the root event
	 * time params are non-null.
	 *
	 * @param string $cluster
	 * @param StatsdDataFactoryInterface|null $statsdDataFactory Defaults to
	 *  the service container's statsd data factory
	 * @return void
	 */
	public function reportUpdateLag( string $cluster, ?StatsdDataFactoryInterface $statsdDataFactory = null ): void {
		$params = $this->getParams();
		$updateKind = $params[CirrusTitleJob::UPDATE_KIND] ?? null;
		$eventTime = $params[CirrusTitleJob::ROOT_EVENT_TIME] ?? null;
		if ( $updateKind !== null && $eventTime !== null ) {
			$now = MWTimestamp::time();
			$statsdDataFactory ??= MediaWikiServices::getInstance()->getStatsdDataFactory();
			$statsdDataFactory->timing( "CirrusSearch.$cluster.updates.all.lag.$updateKind", $now - $eventTime );
		}
	}
}