55 private $baseCacheTTL;
57 private $skewCacheTTL;
84 $this->service = $params[
'service'];
85 $this->host = $params[
'host'];
86 $this->port = $params[
'port'];
87 $this->protocol = $params[
'protocol'];
88 $this->directory = trim( $params[
'directory'],
'/' );
89 $this->skewCacheTTL = $params[
'skewTTL'];
90 $this->baseCacheTTL = max( $params[
'cacheTTL'] - $this->skewCacheTTL, 0 );
91 $this->timeout = $params[
'timeout'];
94 $hostAndPort = IPUtils::splitHostAndPort( $this->host );
97 $this->host = $hostAndPort[0];
99 if ( $hostAndPort[1] ) {
100 $this->port = $hostAndPort[1];
106 if ( preg_match(
'/^_([^\.]+)\._tcp\.(.+)$/', $this->host, $m ) ) {
107 $this->service = $m[1];
111 if ( !isset( $params[
'cache'] ) ) {
113 } elseif ( $params[
'cache'] instanceof
BagOStuff ) {
114 $this->srvCache = $params[
'cache'];
116 $this->srvCache = ObjectFactory::getObjectFromSpec( $params[
'cache'] );
119 $this->logger =
new Psr\Log\NullLogger();
121 'connTimeout' => $this->timeout,
122 'reqTimeout' => $this->timeout,
123 'logger' => $this->logger
129 $this->logger = $logger;
130 $this->http->setLogger( $logger );
133 public function has( $name ) {
136 return array_key_exists( $name, $this->procCache[
'config'] );
139 public function get( $name ) {
142 if ( !array_key_exists( $name, $this->procCache[
'config'] ) ) {
146 return $this->procCache[
'config'][$name];
151 return $this->procCache[
'modifiedIndex'];
157 private function load() {
158 if ( $this->procCache !==
null ) {
162 $now = microtime(
true );
163 $key = $this->srvCache->makeGlobalKey(
172 $loop =
new WaitConditionLoop(
173 function () use ( $key, $now, &$data, &$error ) {
175 $data = $this->srvCache->get( $key );
176 if ( is_array( $data ) && $data[
'expires'] > $now ) {
177 $this->logger->debug(
"Found up-to-date etcd configuration cache." );
179 return WaitConditionLoop::CONDITION_REACHED;
184 if ( $this->srvCache->lock( $key, 0, $this->baseCacheTTL ) ) {
186 $etcdResponse = $this->fetchAllFromEtcd();
187 $error = $etcdResponse[
'error'];
188 if ( is_array( $etcdResponse[
'config'] ) ) {
190 $expiry = microtime( true ) + $this->baseCacheTTL;
192 $expiry += mt_rand( 0, 1e6 ) / 1e6 * $this->skewCacheTTL;
194 'config' => $etcdResponse[
'config'],
195 'expires' => $expiry,
196 'modifiedIndex' => $etcdResponse[
'modifiedIndex']
198 $this->srvCache->set( $key, $data, BagOStuff::TTL_INDEFINITE );
200 $this->logger->info(
"Refreshed stale etcd configuration cache." );
202 return WaitConditionLoop::CONDITION_REACHED;
204 $this->logger->error(
"Failed to fetch configuration: $error" );
205 if ( !$etcdResponse[
'retry'] ) {
207 return WaitConditionLoop::CONDITION_FAILED;
211 $this->srvCache->unlock( $key );
215 if ( is_array( $data ) ) {
216 $this->logger->info(
"Using stale etcd configuration cache." );
218 return WaitConditionLoop::CONDITION_REACHED;
221 return WaitConditionLoop::CONDITION_CONTINUE;
226 if ( $loop->invoke() !== WaitConditionLoop::CONDITION_REACHED ) {
229 throw new ConfigException(
"Failed to load configuration from etcd: $error" );
233 $this->procCache = $data;
240 $servers = $this->dsd->getServers() ?: [ [ $this->host, $this->port ] ];
242 foreach ( $servers as $server ) {
243 list( $host, $port ) = $server;
246 $response = $this->fetchAllFromEtcdServer( $host, $port );
247 if ( is_array( $response[
'config'] ) || $response[
'retry'] ) {
263 if ( $port !==
null ) {
264 $host = IPUtils::combineHostAndPort( $address, $port );
268 list( $rcode, $rdesc, , $rbody, $rerr ) = $this->http->run( [
270 'url' =>
"{$this->protocol}://{$host}/v2/keys/{$this->directory}/?recursive=true",
272 'content-type' =>
'application/json',
276 $response = [
'config' =>
null,
'error' =>
null,
'retry' =>
false,
'modifiedIndex' => 0 ];
278 static $terminalCodes = [ 404 =>
true ];
279 if ( $rcode < 200 || $rcode > 399 ) {
280 $response[
'error'] = strlen( $rerr ??
'' ) ? $rerr :
"HTTP $rcode ($rdesc)";
281 $response[
'retry'] = empty( $terminalCodes[$rcode] );
286 $parsedResponse = $this->parseResponse( $rbody );
288 $parsedResponse = [
'error' => $e->getMessage() ];
290 return array_merge( $response, $parsedResponse );
300 $info = json_decode( $rbody,
true );
301 if ( $info ===
null ) {
304 if ( !isset( $info[
'node'] ) || !is_array( $info[
'node'] ) ) {
306 "Unexpected JSON response: Missing or invalid node at top level." );
309 $lastModifiedIndex = $this->parseDirectory(
'', $info[
'node'], $config );
310 return [
'modifiedIndex' => $lastModifiedIndex,
'config' => $config ];
323 $lastModifiedIndex = 0;
324 if ( !isset( $dirNode[
'nodes'] ) ) {
326 "Unexpected JSON response in dir '$dirName'; missing 'nodes' list." );
328 if ( !is_array( $dirNode[
'nodes'] ) ) {
330 "Unexpected JSON response in dir '$dirName'; 'nodes' is not an array." );
333 foreach ( $dirNode[
'nodes'] as $node ) {
334 '@phan-var array $node';
335 $baseName = basename( $node[
'key'] );
336 $fullName = $dirName ===
'' ? $baseName :
"$dirName/$baseName";
337 if ( !empty( $node[
'dir'] ) ) {
338 $lastModifiedIndex = max(
339 $this->parseDirectory( $fullName, $node, $config ),
340 $lastModifiedIndex );
343 if ( !is_array( $value ) || !array_key_exists(
'val', $value ) ) {
346 $lastModifiedIndex = max( $node[
'modifiedIndex'], $lastModifiedIndex );
347 $config[$fullName] = $value[
'val'];
350 return $lastModifiedIndex;
358 return json_decode( $string,
true );
parseDirectory( $dirName, $dirNode, &$config)
Recursively parse a directory node and populate the array passed by reference, throwing EtcdConfigPar...