Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
11.46% |
11 / 96 |
|
11.11% |
1 / 9 |
CRAP | |
0.00% |
0 / 1 |
MWElasticUtils | |
11.58% |
11 / 95 |
|
11.11% |
1 / 9 |
457.06 | |
0.00% |
0 / 1 |
withRetry | |
90.91% |
10 / 11 |
|
0.00% |
0 / 1 |
5.02 | |||
backoffDelay | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
getIndexHealth | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
waitForGreen | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
20 | |||
deleteByQuery | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
increasingDelay | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
deleteByQueryWithStatus | |
0.00% |
0 / 52 |
|
0.00% |
0 / 1 |
42 | |||
fetchClusterName | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
isFrozen | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace MediaWiki\Extension\Elastica; |
4 | |
5 | use Elastica\Client; |
6 | use Elastica\Index; |
7 | use Elastica\Query; |
8 | use Elastica\Task; |
9 | use Elasticsearch\Endpoints\Cluster\Health; |
10 | use Exception; |
11 | use FormatJson; |
12 | use Generator; |
13 | use MediaWiki\Logger\LoggerFactory; |
14 | use MWTimestamp; |
15 | |
16 | /** |
17 | * This program is free software; you can redistribute it and/or modify |
18 | * it under the terms of the GNU General Public License as published by |
19 | * the Free Software Foundation; either version 2 of the License, or |
20 | * (at your option) any later version. |
21 | * |
22 | * This program is distributed in the hope that it will be useful, |
23 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
24 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
25 | * GNU General Public License for more details. |
26 | * |
27 | * You should have received a copy of the GNU General Public License along |
28 | * with this program; if not, write to the Free Software Foundation, Inc., |
29 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
30 | * |
31 | */ |
32 | |
/**
 * Utility helpers for talking to Elasticsearch through Elastica: retrying
 * callbacks, polling index health and running delete-by-query tasks.
 */
class MWElasticUtils {

	/** Number of microseconds in one second, used to convert delays for usleep() */
	private const ONE_SEC_IN_MICROSEC = 1000000;

	/** Seconds to pause between two index health polls in waitForGreen() */
	private const GREEN_POLL_INTERVAL_SEC = 5;

	/**
	 * A function that retries callback $func if it throws an exception.
	 * The $beforeRetry is called before a retry and receives the underlying
	 * Exception object and the number of failed attempts so far.
	 * It's generally used to log and sleep between retries. Default behaviour
	 * is to sleep with a random backoff.
	 * @see self::backoffDelay
	 *
	 * $func is called at most $attempts + 1 times: up to $attempts guarded
	 * calls, then one final unguarded call whose exception propagates.
	 *
	 * @param int $attempts the number of times we retry
	 * @param callable $func
	 * @param callable|null $beforeRetry function called before each retry
	 * @return mixed whatever $func returns
	 * @throws Exception whatever $func throws on the final attempt
	 */
	public static function withRetry( $attempts, $func, $beforeRetry = null ) {
		$errors = 0;
		while ( $errors < $attempts ) {
			try {
				return $func();
			} catch ( Exception $e ) {
				$errors++;
				if ( $beforeRetry ) {
					$beforeRetry( $e, $errors );
				} else {
					$seconds = static::backoffDelay( $errors );
					usleep( $seconds * self::ONE_SEC_IN_MICROSEC );
				}
			}
		}
		// Retries exhausted: last attempt, any exception reaches the caller.
		return $func();
	}

	/**
	 * Backoff with lowest possible upper bound as 16 seconds.
	 * With the default maximum number of errors (5) this maxes out at 256 seconds.
	 *
	 * @param int $errorCount number of failures seen so far
	 * @return int random number of seconds between 1 and 2^(3 + $errorCount)
	 */
	public static function backoffDelay( $errorCount ) {
		return rand( 1, (int)pow( 2, 3 + $errorCount ) );
	}

	/**
	 * Get index health
	 *
	 * @param Client $client
	 * @param string $indexName
	 * @return array the index health status
	 * @throws Exception when the health response reports an error
	 */
	public static function getIndexHealth( Client $client, $indexName ) {
		$endpoint = new Health();
		$endpoint->setIndex( $indexName );
		$response = $client->requestEndpoint( $endpoint );
		if ( $response->hasError() ) {
			throw new \Exception( "Error while fetching index health status: " . $response->getError() );
		}
		return $response->getData();
	}

	/**
	 * Wait for the index to go green
	 *
	 * The health is polled until it is green or $timeout seconds have
	 * elapsed. The generator pauses between polls both when the index is not
	 * yet green and when the health request failed, so an unreachable or
	 * erroring cluster is not queried in a tight loop.
	 *
	 * @param Client $client
	 * @param string $indexName Name of index to wait for
	 * @param int $timeout In seconds
	 * @return Generator|string[]|bool Returns a generator. Generator yields
	 *  string status messages. Generator return value is true if the index is
	 *  green false otherwise.
	 */
	public static function waitForGreen( Client $client, $indexName, $timeout ) {
		$startTime = time();
		while ( ( $startTime + $timeout ) > time() ) {
			try {
				$response = self::getIndexHealth( $client, $indexName );
				$status = $response['status'] ?? 'unknown';
				if ( $status === 'green' ) {
					yield "\tGreen!";
					return true;
				}
				yield "\tIndex is $status retrying...";
			} catch ( \Exception $e ) {
				yield "Error while waiting for green ({$e->getMessage()}), retrying...";
			}
			// Pause on both the "not green yet" and the error path.
			sleep( self::GREEN_POLL_INTERVAL_SEC );
		}
		return false;
	}

	/**
	 * Delete docs by query and wait for it to complete via tasks api.
	 *
	 * @param Index $index the source index
	 * @param Query $query the query
	 * @param bool $allowConflicts When true documents updated since starting
	 *  the query will not be deleted, and will not fail the delete-by-query. When
	 *  false (default) the updated document will not be deleted and the delete-by-query
	 *  will abort. Deletes are not transactional, some subset of matching documents
	 *  will have been deleted.
	 * @param int $reportEveryNumSec Log task status on this interval of seconds
	 * @return Task The Task instance, once the task has completed.
	 * @throws Exception when task reports failures
	 */
	public static function deleteByQuery(
		Index $index,
		Query $query,
		$allowConflicts = false,
		$reportEveryNumSec = 300
	) {
		$gen = self::deleteByQueryWithStatus( $index, $query, $allowConflicts, $reportEveryNumSec );
		// @phan-suppress-next-line PhanTypeNoAccessiblePropertiesForeach always a generator object
		foreach ( $gen as $status ) {
			// We don't need these status updates. But we need to iterate
			// the generator until it is done.
		}
		return $gen->getReturn();
	}

	/**
	 * Endless generator of delays growing geometrically from $minDelay and
	 * capped at $maxDelay.
	 *
	 * @param float $minDelay Starting value of generator
	 * @param float $maxDelay Maximum value to return
	 * @param float $increaseByRatio Increase by this ratio on each iteration, up to $maxDelay
	 * @return Generator|float[] Returns a generator. Generator yields floats between
	 *  $minDelay and $maxDelay
	 * @suppress PhanInfiniteLoop
	 */
	private static function increasingDelay( $minDelay, $maxDelay, $increaseByRatio = 1.5 ) {
		$delay = $minDelay;
		while ( true ) {
			yield $delay;
			$delay = min( $delay * $increaseByRatio, $maxDelay );
		}
	}

	/**
	 * Delete docs by query and wait for it to complete via tasks api. This
	 * method returns a generator which must be iterated on at least once
	 * or the deletion will not occur.
	 *
	 * Client code that doesn't care about the result or when the deleteByQuery
	 * completes are safe to call next( $gen ) a single time to start the deletion,
	 * and then throw away the generator. Note that logging about how long the task
	 * has been running will not be logged if the generator is not iterated.
	 *
	 * @param Index $index the source index
	 * @param Query $query the query
	 * @param bool $allowConflicts When true documents updated since starting
	 *  the query will not be deleted, and will not fail the delete-by-query. When
	 *  false (default) the updated document will not be deleted and the delete-by-query
	 *  will abort. Deletes are not transactional, some subset of matching documents
	 *  will have been deleted.
	 * @param int $reportEveryNumSec Log task status on this interval of seconds
	 * @return \Generator|array[]|\Elastica\Task Returns a generator. Generator yields
	 *  arrays containing task status responses. Generator returns the Task instance
	 *  on completion via Generator::getReturn.
	 * @throws Exception when no task id is returned or the task reports failures
	 */
	public static function deleteByQueryWithStatus(
		Index $index,
		Query $query,
		$allowConflicts = false,
		$reportEveryNumSec = 300
	) {
		$params = [
			'wait_for_completion' => 'false',
			'scroll' => '15m',
		];
		if ( $allowConflicts ) {
			$params['conflicts'] = 'proceed';
		}
		$response = $index->deleteByQuery( $query, $params )->getData();
		if ( !isset( $response['task'] ) ) {
			throw new \Exception( 'No task returned: ' . var_export( $response, true ) );
		}
		$log = LoggerFactory::getInstance( 'Elastica' );
		$clusterName = self::fetchClusterName( $index->getClient() );
		$logContext = [
			'index' => $index->getName(),
			'cluster' => $clusterName,
			'taskId' => $response['task'],
		];
		$logPrefix = 'deleteByQuery against [{index}] on cluster [{cluster}] with task id [{taskId}]';
		$log->info( "$logPrefix starting", $logContext + [
			'elastic_query' => FormatJson::encode( $query->toArray() )
		] );

		// Warn every $reportEveryNumSec seconds while the task is still running,
		// to help track down job runner timeouts. T219234
		$start = MWTimestamp::time();
		$reportAfter = $start + $reportEveryNumSec;
		$task = new \Elastica\Task(
			$index->getClient(),
			$response['task'] );
		$delay = self::increasingDelay( 0.05, 5 );
		while ( !$task->isCompleted() ) {
			$now = MWTimestamp::time();
			if ( $now >= $reportAfter ) {
				$reportAfter = $now + $reportEveryNumSec;
				$log->warning( "$logPrefix still running after [{runtime}] seconds", $logContext + [
					'runtime' => $now - $start,
					// json encode to ensure we don't add a bunch of properties in
					// logstash, we only really need the content and this will still be
					// searchable.
					'status' => FormatJson::encode( $task->getData() ),
				] );
			}
			yield $task->getData();
			// Read the delay before advancing so the starting delay is used for
			// the first wait; cast explicitly since usleep() expects an int.
			usleep( (int)( $delay->current() * self::ONE_SEC_IN_MICROSEC ) );
			$delay->next();
			$task->refresh();
		}

		$now = MWTimestamp::time();
		$taskData = $task->getData();
		// Tolerate a missing 'response' or 'failures' key instead of raising notices.
		$failures = $taskData['response']['failures'] ?? [];
		if ( $failures ) {
			$log->error( "$logPrefix failed", $logContext + [
				'runtime' => $now - $start,
				'status' => FormatJson::encode( $taskData ),
			] );
			// Entries are presumably arrays rather than strings (TODO confirm),
			// so encode them; implode() would render each as "Array".
			throw new \Exception( 'Failed deleteByQuery: '
				. FormatJson::encode( $failures ) );
		}

		$log->info( "$logPrefix completed", $logContext + [
			'runtime' => $now - $start,
			'status' => FormatJson::encode( $taskData ),
		] );

		return $task;
	}

	/**
	 * Fetch the name of the cluster client is communicating with.
	 *
	 * @param Client $client Elasticsearch client to fetch name for
	 * @return string Name of cluster $client is communicating with
	 * @throws Exception when the info request does not answer with status 200
	 */
	public static function fetchClusterName( Client $client ) {
		$response = $client->requestEndpoint( new \Elasticsearch\Endpoints\Info );
		if ( $response->getStatus() !== 200 ) {
			throw new Exception(
				"Failed requesting cluster name, got status code [{$response->getStatus()}]" );
		}
		return $response->getData()['cluster_name'];
	}

	/**
	 * @param Client $client
	 * @return bool True when no writes should be sent via $client
	 * @deprecated (always returns false)
	 */
	public static function isFrozen( Client $client ) {
		return false;
	}
}
297 | |
298 | class_alias( MWElasticUtils::class, 'MWElasticUtils' ); |