MediaWiki REL1_39
WANObjectCacheReaper.php
Go to the documentation of this file.
1<?php
22use Psr\Log\LoggerAwareInterface;
23use Psr\Log\LoggerInterface;
24use Psr\Log\NullLogger;
25use Wikimedia\ScopedCallback;
26
37class WANObjectCacheReaper implements LoggerAwareInterface {
39 protected $cache;
41 protected $store;
47 protected $logger;
48
50 protected $channel;
53
76 public function __construct(
79 callable $logCallback,
80 callable $keyCallback,
81 array $params
82 ) {
83 $this->cache = $cache;
84 $this->store = $store;
85
86 $this->logChunkCallback = $logCallback;
87 $this->keyListCallback = $keyCallback;
88 if ( isset( $params['channel'] ) ) {
89 $this->channel = $params['channel'];
90 } else {
91 throw new UnexpectedValueException( "No channel specified." );
92 }
93
94 $this->initialStartWindow = $params['initialStartWindow'] ?? 3600;
95 $this->logger = $params['logger'] ?? new NullLogger();
96 }
97
98 public function setLogger( LoggerInterface $logger ) {
99 $this->logger = $logger;
100 }
101
108 final public function invoke( $n = 100 ) {
109 $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
110 $scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
111 if ( !$scopeLock ) {
112 return 0;
113 }
114
115 $now = time();
116 $status = $this->store->get( $posKey );
117 if ( !$status ) {
118 $status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
119 }
120
121 // Get events for entities who's keys tombstones/hold-off should have expired by now
122 $events = call_user_func_array(
123 $this->logChunkCallback,
124 [ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
125 );
126
127 $event = null;
128 $keyEvents = [];
129 foreach ( $events as $event ) {
130 $keys = call_user_func_array(
131 $this->keyListCallback,
132 [ $this->cache, $event['item'] ]
133 );
134 foreach ( $keys as $key ) {
135 // use only the latest per key
136 unset( $keyEvents[$key] );
137 $keyEvents[$key] = [
138 'pos' => $event['pos'],
139 'id' => $event['id']
140 ];
141 }
142 }
143
144 $purgeCount = 0;
145 $lastOkEvent = null;
146 foreach ( $keyEvents as $key => $keyEvent ) {
147 if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
148 break;
149 }
150 ++$purgeCount;
151 $lastOkEvent = $event;
152 }
153
154 if ( $lastOkEvent ) {
155 $ok = $this->store->merge(
156 $posKey,
157 static function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
158 if ( !$curValue ) {
159 // Use new position
160 } else {
161 $curCoord = [ $curValue['pos'], $curValue['id'] ];
162 $newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
163 if ( $newCoord < $curCoord ) {
164 // Keep prior position instead of rolling it back
165 return $curValue;
166 }
167 }
168
169 return [
170 'pos' => $lastOkEvent['pos'],
171 'id' => $lastOkEvent['id'],
172 'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
173 ];
174 },
175 BagOStuff::TTL_INDEFINITE
176 );
177
178 $pos = $lastOkEvent['pos'];
179 $id = $lastOkEvent['id'];
180 if ( $ok ) {
181 $this->logger->info( "Updated cache reap position ($pos, $id)." );
182 } else {
183 $this->logger->error( "Could not update cache reap position ($pos, $id)." );
184 }
185 }
186
187 ScopedCallback::consume( $scopeLock );
188
189 return $purgeCount;
190 }
191
195 public function getState() {
196 $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
197
198 return $this->store->get( $posKey );
199 }
200}
Class representing a cache/ephemeral data store.
Definition BagOStuff.php:85
Class for scanning through chronological, log-structured data or change logs and locally purging cach...
__construct(WANObjectCache $cache, BagOStuff $store, callable $logCallback, callable $keyCallback, array $params)
invoke( $n=100)
Check and reap stale keys based on a chunk of events.
setLogger(LoggerInterface $logger)
Multi-datacenter aware caching interface.