Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
72.04% |
67 / 93 |
|
25.00% |
2 / 8 |
CRAP | |
0.00% |
0 / 1 |
ReindexTask | |
72.04% |
67 / 93 |
|
25.00% |
2 / 8 |
34.56 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
1 | |||
getId | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
isComplete | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
cancel | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
delete | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
getResponse | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getStatus | |
66.67% |
14 / 21 |
|
0.00% |
0 / 1 |
8.81 | |||
mergeStatusWithChildren | |
85.96% |
49 / 57 |
|
0.00% |
0 / 1 |
8.18 |
1 | <?php |
2 | |
3 | namespace CirrusSearch\Elastica; |
4 | |
5 | use Elastica\Client; |
6 | use Elastica\Exception\ResponseException; |
7 | use Elastica\Exception\RuntimeException; |
8 | use Elastica\Request; |
9 | use LogicException; |
10 | use MediaWiki\Logger\LoggerFactory; |
11 | |
class ReindexTask {
	/** @var Client */
	private $client;
	/** @var string */
	private $taskId;
	/** @var ReindexResponse|null Final response; set once the task has been observed complete */
	private $response;
	/** @var \Psr\Log\LoggerInterface */
	private $log;

	/**
	 * @param Client $client Client used to query/cancel/delete the task
	 * @param string $taskId Elasticsearch task id of the reindex task
	 */
	public function __construct( Client $client, $taskId ) {
		$this->client = $client;
		$this->taskId = $taskId;
		$this->log = LoggerFactory::getInstance( 'CirrusSearch' );
	}

	/**
	 * @return string The Elasticsearch task id
	 */
	public function getId() {
		return $this->taskId;
	}

	/**
	 * @param bool $check When true queries the remote
	 *  to see if the task is complete. Otherwise reports
	 *  last requested status.
	 * @return bool True if the reindex task is complete
	 */
	public function isComplete( $check = false ) {
		if ( $check && $this->response === null ) {
			// getStatus() records the final response as a side effect when the
			// remote reports the task as finished. It may throw on transport errors.
			$this->getStatus();
		}

		return $this->response !== null;
	}

	/**
	 * Cancel the in-progress reindex task.
	 *
	 * @return bool True if cancel was successful
	 * @throws LogicException When the task is already known to be complete
	 */
	public function cancel() {
		if ( $this->response ) {
			throw new LogicException( 'Cannot cancel completed task' );
		}

		$response = $this->client->request( "_tasks/{$this->taskId}/_cancel", Request::POST );

		return $response->isOK();
	}

	/**
	 * Delete the task's result document from the .tasks index.
	 *
	 * @return bool True if delete was successful, false otherwise.
	 *  Throws Elastica NotFoundException for unknown task (already
	 *  deleted?) or HttpException for communication failures.
	 * @throws LogicException When the task has not been observed complete yet
	 */
	public function delete() {
		if ( !$this->response ) {
			throw new LogicException( 'Cannot delete in-progress task' );
		}
		$response = $this->client->getIndex( '.tasks' )->deleteById( $this->taskId );

		return $response->isOK();
	}

	/**
	 * Get the final result of the reindexing task.
	 *
	 * @return ReindexResponse|null The result of the reindex, or null
	 *  if the reindex is still running. self::getStatus must be used
	 *  to update the task completion status.
	 */
	public function getResponse() {
		return $this->response;
	}

	/**
	 * Query the current status of the task.
	 *
	 * Once the remote reports a final response, that response is cached and
	 * returned by this and all later calls, without further requests.
	 *
	 * @return ReindexStatus|ReindexResponse The in-progress status, or the
	 *  final response once the task is complete.
	 * @throws ResponseException When the status request is not OK
	 * @throws RuntimeException When the failed request cannot be retrieved
	 *  from the client. Transport may also throw exceptions for network failures.
	 */
	public function getStatus() {
		if ( $this->response ) {
			// task complete
			return $this->response;
		}

		$response = $this->client->request( "_tasks/{$this->taskId}", Request::GET );
		if ( !$response->isOK() ) {
			$lastRequest = $this->client->getLastRequest();
			if ( $lastRequest === null ) {
				throw new RuntimeException( "Client::getLastRequest() should not be null" );
			}
			throw new ResponseException( $lastRequest, $response );
		}
		$data = $response->getData();

		if ( isset( $data['response'] ) ) {
			// task complete: remember the final response
			$this->response = new ReindexResponse( $data['response'] );

			return $this->response;
		}

		$status = $data['task']['status'];
		// The task.status.slices array contains null for each incomplete child
		// task. This fetches the children and merges their status in.
		if ( isset( $status['slices'] ) ) {
			$childResponse = $this->client->request( "_tasks", Request::GET, [], [
				'parent_task_id' => $this->taskId,
				'detailed' => 'true',
			] );
			if ( $childResponse->isOK() ) {
				$status = $this->mergeStatusWithChildren( $status, $childResponse->getData() );
			}
		}

		return new ReindexStatus( $status );
	}

	/**
	 * Merge the status of running child (slice) tasks into the parent status,
	 * then recompute the aggregate counters from the per-slice values.
	 *
	 * @param array $status Parent task status (task.status of the _tasks response)
	 * @param array $childResponse Decoded body of the child-task listing; read
	 *  as nodes.*.tasks.*.status
	 * @return array The parent status with slices filled in and totals recomputed
	 */
	private function mergeStatusWithChildren( array $status, array $childResponse ) {
		foreach ( $childResponse['nodes'] ?? [] as $nodeData ) {
			foreach ( $nodeData['tasks'] ?? [] as $childData ) {
				$sliceId = $childData['status']['slice_id'];
				$status['slices'][$sliceId] = $childData['status'];
			}
		}

		// Below mimics org.elasticsearch.action.bulk.byscroll.BulkByScrollTask.Status::Status
		// except that class doesn't have data about in-progress tasks.
		// Every aggregate, including retries, is reset before summing. The parent
		// status may already carry totals for completed slices, which are re-added
		// from the slices below.
		$summedFields = [
			'total',
			'updated',
			'created',
			'deleted',
			'batches',
			'version_conflicts',
			'noops',
			'throttled_millis',
		];
		foreach ( $summedFields as $field ) {
			$status[$field] = 0;
		}
		$status['retries'] = [ 'bulk' => 0, 'search' => 0 ];
		$status['requests_per_second'] = 0;
		// Minimum across slices; stays PHP_INT_MAX if no slice contributes.
		$status['throttled_until_millis'] = PHP_INT_MAX;

		$requiredFields = [
			'total',
			'updated',
			'created',
			'deleted',
			'batches',
			'version_conflicts',
			'noops',
			'retries',
			'throttled_millis',
			'requests_per_second',
			'throttled_until_millis',
		];
		foreach ( $status['slices'] ?? [] as $slice ) {
			if ( $slice === null ) {
				// slice has failed catastrophically
				continue;
			}
			$missing_status_fields = array_diff_key( array_flip( $requiredFields ), $slice );
			if ( $missing_status_fields !== [] ) {
				// slice has missing key fields
				$slice_to_json = json_encode( $slice );
				$this->log->warning( 'Missing key field(s) for reindex task status', [
					'cirrus_reindex_task_slice' => $slice_to_json,
					'exact_missing_fields' => $missing_status_fields,
				] );
				continue;
			}
			foreach ( $summedFields as $field ) {
				$status[$field] += $slice[$field];
			}
			$status['retries']['bulk'] += $slice['retries']['bulk'];
			$status['retries']['search'] += $slice['retries']['search'];
			// -1 means "unlimited". The decoded JSON value may be a float (-1.0),
			// which never === the int -1, so compare as floats.
			$status['requests_per_second'] += (float)$slice['requests_per_second'] === -1.0
				? INF
				: $slice['requests_per_second'];
			$status['throttled_until_millis'] = min( $status['throttled_until_millis'],
				$slice['throttled_until_millis'] );
		}

		if ( $status['requests_per_second'] === INF ) {
			// At least one slice is unthrottled, so the whole task is.
			$status['requests_per_second'] = -1;
		}

		return $status;
	}
}