MediaWiki  master
EtcdConfig.php
Go to the documentation of this file.
1 <?php
21 use Psr\Log\LoggerAwareInterface;
22 use Psr\Log\LoggerInterface;
23 use Wikimedia\IPUtils;
24 use Wikimedia\ObjectFactory\ObjectFactory;
25 use Wikimedia\WaitConditionLoop;
26 
32 class EtcdConfig implements Config, LoggerAwareInterface {
34  private $http;
36  private $srvCache;
38  private $procCache;
40  private $logger;
42  private $dsd;
43 
45  private $service;
47  private $host;
49  private $port;
51  private $protocol;
53  private $directory;
55  private $baseCacheTTL;
57  private $skewCacheTTL;
59  private $timeout;
60 
74  public function __construct( array $params ) {
75  $params += [
76  'service' => 'etcd',
77  'port' => null,
78  'protocol' => 'http',
79  'cacheTTL' => 10,
80  'skewTTL' => 1,
81  'timeout' => 2
82  ];
83 
84  $this->service = $params['service'];
85  $this->host = $params['host'];
86  $this->port = $params['port'];
87  $this->protocol = $params['protocol'];
88  $this->directory = trim( $params['directory'], '/' );
89  $this->skewCacheTTL = $params['skewTTL'];
90  $this->baseCacheTTL = max( $params['cacheTTL'] - $this->skewCacheTTL, 0 );
91  $this->timeout = $params['timeout'];
92 
93  // For backwards compatibility, check the host for an embedded port
94  $hostAndPort = IPUtils::splitHostAndPort( $this->host );
95 
96  if ( $hostAndPort ) {
97  $this->host = $hostAndPort[0];
98 
99  if ( $hostAndPort[1] ) {
100  $this->port = $hostAndPort[1];
101  }
102  }
103 
104  // Also for backwards compatibility, check for a host in the format of
105  // an SRV record and use the service specified therein
106  if ( preg_match( '/^_([^\.]+)\._tcp\.(.+)$/', $this->host, $m ) ) {
107  $this->service = $m[1];
108  $this->host = $m[2];
109  }
110 
111  if ( !isset( $params['cache'] ) ) {
112  $this->srvCache = new HashBagOStuff();
113  } elseif ( $params['cache'] instanceof BagOStuff ) {
114  $this->srvCache = $params['cache'];
115  } else {
116  $this->srvCache = ObjectFactory::getObjectFromSpec( $params['cache'] );
117  }
118 
119  $this->logger = new Psr\Log\NullLogger();
120  $this->http = new MultiHttpClient( [
121  'connTimeout' => $this->timeout,
122  'reqTimeout' => $this->timeout,
123  'logger' => $this->logger
124  ] );
125  $this->dsd = new DnsSrvDiscoverer( $this->service, 'tcp', $this->host );
126  }
127 
128  public function setLogger( LoggerInterface $logger ) {
129  $this->logger = $logger;
130  $this->http->setLogger( $logger );
131  }
132 
133  public function has( $name ) {
134  $this->load();
135 
136  return array_key_exists( $name, $this->procCache['config'] );
137  }
138 
139  public function get( $name ) {
140  $this->load();
141 
142  if ( !array_key_exists( $name, $this->procCache['config'] ) ) {
143  throw new ConfigException( "No entry found for '$name'." );
144  }
145 
146  return $this->procCache['config'][$name];
147  }
148 
149  public function getModifiedIndex() {
150  $this->load();
151  return $this->procCache['modifiedIndex'];
152  }
153 
157  private function load() {
158  if ( $this->procCache !== null ) {
159  return; // already loaded
160  }
161 
162  $now = microtime( true );
163  $key = $this->srvCache->makeGlobalKey(
164  __CLASS__,
165  $this->host,
166  $this->directory
167  );
168 
169  // Get the cached value or block until it is regenerated (by this or another thread)...
170  $data = null; // latest config info
171  $error = null; // last error message
172  $loop = new WaitConditionLoop(
173  function () use ( $key, $now, &$data, &$error ) {
174  // Check if the values are in cache yet...
175  $data = $this->srvCache->get( $key );
176  if ( is_array( $data ) && $data['expires'] > $now ) {
177  $this->logger->debug( "Found up-to-date etcd configuration cache." );
178 
179  return WaitConditionLoop::CONDITION_REACHED;
180  }
181 
182  // Cache is either empty or stale;
183  // refresh the cache from etcd, using a mutex to reduce stampedes...
184  if ( $this->srvCache->lock( $key, 0, $this->baseCacheTTL ) ) {
185  try {
186  $etcdResponse = $this->fetchAllFromEtcd();
187  $error = $etcdResponse['error'];
188  if ( is_array( $etcdResponse['config'] ) ) {
189  // Avoid having all servers expire cache keys at the same time
190  $expiry = microtime( true ) + $this->baseCacheTTL;
191  // @phan-suppress-next-line PhanTypeMismatchArgumentInternal
192  $expiry += mt_rand( 0, 1e6 ) / 1e6 * $this->skewCacheTTL;
193  $data = [
194  'config' => $etcdResponse['config'],
195  'expires' => $expiry,
196  'modifiedIndex' => $etcdResponse['modifiedIndex']
197  ];
198  $this->srvCache->set( $key, $data, BagOStuff::TTL_INDEFINITE );
199 
200  $this->logger->info( "Refreshed stale etcd configuration cache." );
201 
202  return WaitConditionLoop::CONDITION_REACHED;
203  } else {
204  $this->logger->error( "Failed to fetch configuration: $error" );
205  if ( !$etcdResponse['retry'] ) {
206  // Fail fast since the error is likely to keep happening
207  return WaitConditionLoop::CONDITION_FAILED;
208  }
209  }
210  } finally {
211  $this->srvCache->unlock( $key ); // release mutex
212  }
213  }
214 
215  if ( is_array( $data ) ) {
216  $this->logger->info( "Using stale etcd configuration cache." );
217 
218  return WaitConditionLoop::CONDITION_REACHED;
219  }
220 
221  return WaitConditionLoop::CONDITION_CONTINUE;
222  },
223  $this->timeout
224  );
225 
226  if ( $loop->invoke() !== WaitConditionLoop::CONDITION_REACHED ) {
227  // No cached value exists and etcd query failed; throw an error
228  // @phan-suppress-next-line PhanTypeSuspiciousStringExpression WaitConditionLoop throws or error set
229  throw new ConfigException( "Failed to load configuration from etcd: $error" );
230  }
231 
232  // @phan-suppress-next-line PhanTypeMismatchProperty WaitConditionLoop throws ore data set
233  $this->procCache = $data;
234  }
235 
239  public function fetchAllFromEtcd() {
240  $servers = $this->dsd->getServers() ?: [ [ $this->host, $this->port ] ];
241 
242  foreach ( $servers as $server ) {
243  [ $host, $port ] = $server;
244 
245  // Try to load the config from this particular server
246  $response = $this->fetchAllFromEtcdServer( $host, $port );
247  if ( is_array( $response['config'] ) || $response['retry'] ) {
248  break;
249  }
250  }
251 
252  return $response;
253  }
254 
260  protected function fetchAllFromEtcdServer( string $address, ?int $port = null ) {
261  $host = $address;
262 
263  if ( $port !== null ) {
264  $host = IPUtils::combineHostAndPort( $address, $port );
265  }
266 
267  // Retrieve all the values under the MediaWiki config directory
268  [ $rcode, $rdesc, /* $rhdrs */, $rbody, $rerr ] = $this->http->run( [
269  'method' => 'GET',
270  'url' => "{$this->protocol}://{$host}/v2/keys/{$this->directory}/?recursive=true",
271  'headers' => [
272  'content-type' => 'application/json',
273  ]
274  ] );
275 
276  $response = [ 'config' => null, 'error' => null, 'retry' => false, 'modifiedIndex' => 0 ];
277 
278  static $terminalCodes = [ 404 => true ];
279  if ( $rcode < 200 || $rcode > 399 ) {
280  $response['error'] = strlen( $rerr ?? '' ) ? $rerr : "HTTP $rcode ($rdesc)";
281  $response['retry'] = empty( $terminalCodes[$rcode] );
282  return $response;
283  }
284 
285  try {
286  $parsedResponse = $this->parseResponse( $rbody );
287  } catch ( EtcdConfigParseError $e ) {
288  $parsedResponse = [ 'error' => $e->getMessage() ];
289  }
290  return array_merge( $response, $parsedResponse );
291  }
292 
299  protected function parseResponse( $rbody ) {
300  $info = json_decode( $rbody, true );
301  if ( $info === null ) {
302  throw new EtcdConfigParseError( "Error unserializing JSON response." );
303  }
304  if ( !isset( $info['node'] ) || !is_array( $info['node'] ) ) {
305  throw new EtcdConfigParseError(
306  "Unexpected JSON response: Missing or invalid node at top level." );
307  }
308  $config = [];
309  $lastModifiedIndex = $this->parseDirectory( '', $info['node'], $config );
310  return [ 'modifiedIndex' => $lastModifiedIndex, 'config' => $config ];
311  }
312 
322  protected function parseDirectory( $dirName, $dirNode, &$config ) {
323  $lastModifiedIndex = 0;
324  if ( !isset( $dirNode['nodes'] ) ) {
325  throw new EtcdConfigParseError(
326  "Unexpected JSON response in dir '$dirName'; missing 'nodes' list." );
327  }
328  if ( !is_array( $dirNode['nodes'] ) ) {
329  throw new EtcdConfigParseError(
330  "Unexpected JSON response in dir '$dirName'; 'nodes' is not an array." );
331  }
332 
333  foreach ( $dirNode['nodes'] as $node ) {
334  '@phan-var array $node';
335  $baseName = basename( $node['key'] );
336  $fullName = $dirName === '' ? $baseName : "$dirName/$baseName";
337  if ( !empty( $node['dir'] ) ) {
338  $lastModifiedIndex = max(
339  $this->parseDirectory( $fullName, $node, $config ),
340  $lastModifiedIndex );
341  } else {
342  $value = $this->unserialize( $node['value'] );
343  if ( !is_array( $value ) || !array_key_exists( 'val', $value ) ) {
344  throw new EtcdConfigParseError( "Failed to parse value for '$fullName'." );
345  }
346  $lastModifiedIndex = max( $node['modifiedIndex'], $lastModifiedIndex );
347  $config[$fullName] = $value['val'];
348  }
349  }
350  return $lastModifiedIndex;
351  }
352 
357  private function unserialize( $string ) {
358  return json_decode( $string, true );
359  }
360 }
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:85
Exceptions for config failures.
Interface for configuration instances.
Definition: EtcdConfig.php:32
fetchAllFromEtcd()
Definition: EtcdConfig.php:239
fetchAllFromEtcdServer(string $address, ?int $port=null)
Definition: EtcdConfig.php:260
parseDirectory( $dirName, $dirNode, &$config)
Recursively parse a directory node and populate the array passed by reference, throwing EtcdConfigPar...
Definition: EtcdConfig.php:322
getModifiedIndex()
Definition: EtcdConfig.php:149
setLogger(LoggerInterface $logger)
Definition: EtcdConfig.php:128
__construct(array $params)
Definition: EtcdConfig.php:74
has( $name)
Check whether a configuration option is set for the given name.
Definition: EtcdConfig.php:133
parseResponse( $rbody)
Parse a response body, throwing EtcdConfigParseError if there is a validation error.
Definition: EtcdConfig.php:299
Simple store for keeping values in an associative array for the current process.
Class to handle multiple HTTP requests.
Interface for configuration instances.
Definition: Config.php:30
return true
Definition: router.php:90