MediaWiki REL1_40
EtcdConfig.php
Go to the documentation of this file.
1<?php
21use Psr\Log\LoggerAwareInterface;
22use Psr\Log\LoggerInterface;
23use Wikimedia\IPUtils;
24use Wikimedia\ObjectFactory\ObjectFactory;
25use Wikimedia\WaitConditionLoop;
26
/**
 * Config implementation backed by an etcd (v2 keys API) directory.
 *
 * The whole directory is fetched with one recursive GET, flattened into a map
 * keyed by slash-separated path, stored in a BagOStuff together with a logical
 * expiry timestamp, and memoised in this object after the first lookup.
 */
class EtcdConfig implements Config, LoggerAwareInterface {
	/** @var MultiHttpClient HTTP client used to query etcd */
	private $http;
	/** @var BagOStuff Cache holding the last fetched configuration and the refresh mutex */
	private $srvCache;
	/** @var array|null Loaded data ('config', 'expires', 'modifiedIndex'); null until load() ran */
	private $procCache;
	/** @var LoggerInterface */
	private $logger;
	/** @var DnsSrvDiscoverer Resolves the list of etcd servers to try */
	private $dsd;

	/** @var string Service name handed to the SRV discoverer */
	private $service;
	/** @var string Host name (with any embedded port or SRV prefix stripped) */
	private $host;
	/** @var int|null Port used when falling back to $host directly */
	private $port;
	/** @var string URL scheme used to build the request URL */
	private $protocol;
	/** @var string etcd directory path without leading/trailing slashes */
	private $directory;
	/** @var int|float Seconds a cache entry is always considered fresh (cacheTTL minus skewTTL, min 0) */
	private $baseCacheTTL;
	/** @var int|float Upper bound, in seconds, of the random amount added to the expiry */
	private $skewCacheTTL;
	/** @var int|float Seconds used for HTTP connect/request timeouts and for the wait loop */
	private $timeout;

	/**
	 * @param array $params Parameter map:
	 *   - host: host name; may embed a port ("host:port") or be in SRV record
	 *     form ("_service._tcp.domain") for backwards compatibility
	 *   - directory: etcd directory to read; surrounding slashes are trimmed
	 *   - service: service name for server discovery. Defaults to 'etcd'. [optional]
	 *   - port: port to use with the host. Defaults to null. [optional]
	 *   - protocol: URL scheme. Defaults to 'http'. [optional]
	 *   - cache: BagOStuff instance, or an ObjectFactory spec for one. A new
	 *     HashBagOStuff is used when omitted. [optional]
	 *   - cacheTTL: cache lifetime in seconds. Defaults to 10. [optional]
	 *   - skewTTL: seconds of cacheTTL that are randomised per refresh. Defaults to 1. [optional]
	 *   - timeout: timeout in seconds. Defaults to 2. [optional]
	 */
	public function __construct( array $params ) {
		$params += [
			'service' => 'etcd',
			'port' => null,
			'protocol' => 'http',
			'cacheTTL' => 10,
			'skewTTL' => 1,
			'timeout' => 2
		];

		$this->service = $params['service'];
		$this->host = $params['host'];
		$this->port = $params['port'];
		$this->protocol = $params['protocol'];
		$this->directory = trim( $params['directory'], '/' );
		$this->skewCacheTTL = $params['skewTTL'];
		$this->baseCacheTTL = max( $params['cacheTTL'] - $this->skewCacheTTL, 0 );
		$this->timeout = $params['timeout'];

		// For backwards compatibility, check the host for an embedded port
		$hostAndPort = IPUtils::splitHostAndPort( $this->host );

		if ( $hostAndPort ) {
			$this->host = $hostAndPort[0];

			if ( $hostAndPort[1] ) {
				$this->port = $hostAndPort[1];
			}
		}

		// Also for backwards compatibility, check for a host in the format of
		// an SRV record and use the service specified therein
		if ( preg_match( '/^_([^\.]+)\._tcp\.(.+)$/', $this->host, $m ) ) {
			$this->service = $m[1];
			$this->host = $m[2];
		}

		if ( !isset( $params['cache'] ) ) {
			$this->srvCache = new HashBagOStuff();
		} elseif ( $params['cache'] instanceof BagOStuff ) {
			$this->srvCache = $params['cache'];
		} else {
			$this->srvCache = ObjectFactory::getObjectFromSpec( $params['cache'] );
		}

		$this->logger = new Psr\Log\NullLogger();
		$this->http = new MultiHttpClient( [
			'connTimeout' => $this->timeout,
			'reqTimeout' => $this->timeout,
			'logger' => $this->logger
		] );
		$this->dsd = new DnsSrvDiscoverer( $this->service, 'tcp', $this->host );
	}

	/**
	 * Set the logger used by this object and by its HTTP client.
	 *
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
		$this->http->setLogger( $logger );
	}

	/**
	 * Check whether a configuration entry exists, loading the configuration if needed.
	 *
	 * @param string $name
	 * @return bool True even when the stored value is null
	 * @throws ConfigException If the configuration cannot be loaded
	 */
	public function has( $name ) {
		$this->load();

		return array_key_exists( $name, $this->procCache['config'] );
	}

	/**
	 * Get a configuration value, loading the configuration if needed.
	 *
	 * @param string $name
	 * @return mixed
	 * @throws ConfigException If there is no such entry or the configuration cannot be loaded
	 */
	public function get( $name ) {
		$this->load();

		if ( !array_key_exists( $name, $this->procCache['config'] ) ) {
			throw new ConfigException( "No entry found for '$name'." );
		}

		return $this->procCache['config'][$name];
	}

	/**
	 * Get the highest modifiedIndex seen among the loaded value nodes.
	 *
	 * @return mixed
	 * @throws ConfigException If the configuration cannot be loaded
	 */
	public function getModifiedIndex() {
		$this->load();
		return $this->procCache['modifiedIndex'];
	}

	/**
	 * Populate $this->procCache from the cache or from etcd; no-op once loaded.
	 *
	 * @throws ConfigException If no cached value exists and etcd could not be queried
	 */
	private function load() {
		if ( $this->procCache !== null ) {
			return; // already loaded
		}

		$now = microtime( true );
		$key = $this->srvCache->makeGlobalKey(
			__CLASS__,
			$this->host,
			$this->directory
		);

		// Get the cached value or block until it is regenerated (by this or another thread)...
		$data = null; // latest config info
		$error = null; // last error message
		$loop = new WaitConditionLoop(
			function () use ( $key, $now, &$data, &$error ) {
				// Check if the values are in cache yet...
				$data = $this->srvCache->get( $key );
				if ( is_array( $data ) && $data['expires'] > $now ) {
					$this->logger->debug( "Found up-to-date etcd configuration cache." );

					return WaitConditionLoop::CONDITION_REACHED;
				}

				// Cache is either empty or stale;
				// refresh the cache from etcd, using a mutex to reduce stampedes...
				if ( $this->srvCache->lock( $key, 0, $this->baseCacheTTL ) ) {
					try {
						$etcdResponse = $this->fetchAllFromEtcd();
						$error = $etcdResponse['error'];
						if ( is_array( $etcdResponse['config'] ) ) {
							// Avoid having all servers expire cache keys at the same time
							$expiry = microtime( true ) + $this->baseCacheTTL;
							// Integer bound (not the float 1e6) to match mt_rand()'s signature
							$expiry += mt_rand( 0, 1000000 ) / 1e6 * $this->skewCacheTTL;
							$data = [
								'config' => $etcdResponse['config'],
								'expires' => $expiry,
								'modifiedIndex' => $etcdResponse['modifiedIndex']
							];
							// Stored without a TTL so that a stale copy remains usable
							// as a fallback; freshness is decided via 'expires'
							$this->srvCache->set( $key, $data, BagOStuff::TTL_INDEFINITE );

							$this->logger->info( "Refreshed stale etcd configuration cache." );

							return WaitConditionLoop::CONDITION_REACHED;
						} else {
							$this->logger->error( "Failed to fetch configuration: $error" );
							if ( !$etcdResponse['retry'] ) {
								// Fail fast since the error is likely to keep happening
								return WaitConditionLoop::CONDITION_FAILED;
							}
						}
					} finally {
						$this->srvCache->unlock( $key ); // release mutex
					}
				}

				if ( is_array( $data ) ) {
					$this->logger->info( "Using stale etcd configuration cache." );

					return WaitConditionLoop::CONDITION_REACHED;
				}

				return WaitConditionLoop::CONDITION_CONTINUE;
			},
			$this->timeout
		);

		if ( $loop->invoke() !== WaitConditionLoop::CONDITION_REACHED ) {
			// No cached value exists and etcd query failed; throw an error
			// @phan-suppress-next-line PhanTypeSuspiciousStringExpression WaitConditionLoop throws or error set
			throw new ConfigException( "Failed to load configuration from etcd: $error" );
		}

		// @phan-suppress-next-line PhanTypeMismatchProperty WaitConditionLoop throws or data set
		$this->procCache = $data;
	}

	/**
	 * Fetch the configuration, trying each discovered server in turn.
	 *
	 * Falls back to the configured host/port when discovery yields no servers.
	 * Iteration stops at the first server that returns a config array or whose
	 * failure is flagged as retryable.
	 *
	 * @return array Map with keys 'config', 'error', 'retry' and 'modifiedIndex'
	 */
	public function fetchAllFromEtcd() {
		$servers = $this->dsd->getServers() ?: [ [ $this->host, $this->port ] ];

		foreach ( $servers as $server ) {
			[ $host, $port ] = $server;

			// Try to load the config from this particular server
			$response = $this->fetchAllFromEtcdServer( $host, $port );
			if ( is_array( $response['config'] ) || $response['retry'] ) {
				break;
			}
		}

		return $response;
	}

	/**
	 * Fetch and parse the configuration directory from a single server.
	 *
	 * @param string $address Host name or address
	 * @param int|null $port Port, or null to use the address as given
	 * @return array Map with keys:
	 *   - config: parsed configuration array, or null on failure
	 *   - error: error message, or null
	 *   - retry: whether the failure is considered worth retrying
	 *   - modifiedIndex: highest modifiedIndex seen, 0 on failure
	 */
	protected function fetchAllFromEtcdServer( string $address, ?int $port = null ) {
		$host = $address;

		if ( $port !== null ) {
			$host = IPUtils::combineHostAndPort( $address, $port );
		}

		// Retrieve all the values under the MediaWiki config directory
		[ $rcode, $rdesc, /* $rhdrs */, $rbody, $rerr ] = $this->http->run( [
			'method' => 'GET',
			'url' => "{$this->protocol}://{$host}/v2/keys/{$this->directory}/?recursive=true",
			'headers' => [
				'content-type' => 'application/json',
			]
		] );

		$response = [ 'config' => null, 'error' => null, 'retry' => false, 'modifiedIndex' => 0 ];

		// HTTP status codes for which a retry is not requested
		static $terminalCodes = [ 404 => true ];
		if ( $rcode < 200 || $rcode > 399 ) {
			$response['error'] = strlen( $rerr ?? '' ) ? $rerr : "HTTP $rcode ($rdesc)";
			$response['retry'] = empty( $terminalCodes[$rcode] );
			return $response;
		}

		try {
			$parsedResponse = $this->parseResponse( $rbody );
		} catch ( EtcdConfigParseError $e ) {
			// Leaves 'config' null and 'retry' false in the merged result
			$parsedResponse = [ 'error' => $e->getMessage() ];
		}
		return array_merge( $response, $parsedResponse );
	}

	/**
	 * Parse a response body, throwing EtcdConfigParseError if there is a validation error.
	 *
	 * @param string $rbody JSON response body
	 * @return array Map with keys 'modifiedIndex' and 'config'
	 * @throws EtcdConfigParseError
	 */
	protected function parseResponse( $rbody ) {
		$info = json_decode( $rbody, true );
		if ( $info === null ) {
			throw new EtcdConfigParseError( "Error unserializing JSON response." );
		}
		if ( !isset( $info['node'] ) || !is_array( $info['node'] ) ) {
			throw new EtcdConfigParseError(
				"Unexpected JSON response: Missing or invalid node at top level." );
		}
		$config = [];
		$lastModifiedIndex = $this->parseDirectory( '', $info['node'], $config );
		return [ 'modifiedIndex' => $lastModifiedIndex, 'config' => $config ];
	}

	/**
	 * Recursively parse a directory node and populate the array passed by
	 * reference, throwing EtcdConfigParseError if there is a validation error.
	 *
	 * @param string $dirName Path of this directory relative to the root; '' for the root
	 * @param array $dirNode Decoded directory node; must contain a 'nodes' array
	 * @param array &$config Receives one entry per value node, keyed by relative path
	 * @return int Highest modifiedIndex among the value nodes visited; 0 if there were none
	 * @throws EtcdConfigParseError
	 */
	protected function parseDirectory( $dirName, $dirNode, &$config ) {
		$lastModifiedIndex = 0;
		if ( !isset( $dirNode['nodes'] ) ) {
			throw new EtcdConfigParseError(
				"Unexpected JSON response in dir '$dirName'; missing 'nodes' list." );
		}
		if ( !is_array( $dirNode['nodes'] ) ) {
			throw new EtcdConfigParseError(
				"Unexpected JSON response in dir '$dirName'; 'nodes' is not an array." );
		}

		foreach ( $dirNode['nodes'] as $node ) {
			// Reject malformed children up front so that bad input surfaces as a
			// parse error rather than as PHP warnings or a TypeError
			if ( !is_array( $node ) || !isset( $node['key'] ) || !is_scalar( $node['key'] ) ) {
				throw new EtcdConfigParseError(
					"Unexpected JSON response in dir '$dirName'; invalid child node." );
			}
			$baseName = basename( (string)$node['key'] );
			$fullName = $dirName === '' ? $baseName : "$dirName/$baseName";
			if ( !empty( $node['dir'] ) ) {
				$lastModifiedIndex = max(
					$this->parseDirectory( $fullName, $node, $config ),
					$lastModifiedIndex );
			} else {
				// A missing or non-string value cannot decode to the expected
				// { "val": ... } wrapper, so treat it as unparseable
				$rawValue = $node['value'] ?? null;
				$value = is_string( $rawValue ) ? $this->unserialize( $rawValue ) : null;
				if ( !is_array( $value ) || !array_key_exists( 'val', $value ) ) {
					throw new EtcdConfigParseError( "Failed to parse value for '$fullName'." );
				}
				// Default to 0 so a missing index cannot turn the result into null
				$lastModifiedIndex = max( $node['modifiedIndex'] ?? 0, $lastModifiedIndex );
				$config[$fullName] = $value['val'];
			}
		}
		return $lastModifiedIndex;
	}

	/**
	 * Decode a JSON-encoded node value into PHP data (objects become arrays).
	 *
	 * @param string $string
	 * @return mixed Decoded value, or null if decoding failed
	 */
	private function unserialize( $string ) {
		return json_decode( $string, true );
	}
}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:85
Exceptions for config failures.
Interface for configuration instances.
fetchAllFromEtcdServer(string $address, ?int $port=null)
parseDirectory( $dirName, $dirNode, &$config)
Recursively parse a directory node and populate the array passed by reference, throwing EtcdConfigParseError if there is a validation error.
setLogger(LoggerInterface $logger)
__construct(array $params)
has( $name)
Check whether a configuration option is set for the given name.
parseResponse( $rbody)
Parse a response body, throwing EtcdConfigParseError if there is a validation error.
Simple store for keeping values in an associative array for the current process.
Class to handle multiple HTTP requests.
Interface for configuration instances.
Definition Config.php:30
return true
Definition router.php:92