MediaWiki  REL1_39
WANObjectCacheReaper.php
Go to the documentation of this file.
1 <?php
22 use Psr\Log\LoggerAwareInterface;
23 use Psr\Log\LoggerInterface;
24 use Psr\Log\NullLogger;
25 use Wikimedia\ScopedCallback;
26 
37 class WANObjectCacheReaper implements LoggerAwareInterface {
39  protected $cache;
41  protected $store;
43  protected $logChunkCallback;
45  protected $keyListCallback;
47  protected $logger;
48 
50  protected $channel;
53 
76  public function __construct(
79  callable $logCallback,
80  callable $keyCallback,
81  array $params
82  ) {
83  $this->cache = $cache;
84  $this->store = $store;
85 
86  $this->logChunkCallback = $logCallback;
87  $this->keyListCallback = $keyCallback;
88  if ( isset( $params['channel'] ) ) {
89  $this->channel = $params['channel'];
90  } else {
91  throw new UnexpectedValueException( "No channel specified." );
92  }
93 
94  $this->initialStartWindow = $params['initialStartWindow'] ?? 3600;
95  $this->logger = $params['logger'] ?? new NullLogger();
96  }
97 
98  public function setLogger( LoggerInterface $logger ) {
99  $this->logger = $logger;
100  }
101 
108  final public function invoke( $n = 100 ) {
109  $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
110  $scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
111  if ( !$scopeLock ) {
112  return 0;
113  }
114 
115  $now = time();
116  $status = $this->store->get( $posKey );
117  if ( !$status ) {
118  $status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
119  }
120 
121  // Get events for entities who's keys tombstones/hold-off should have expired by now
122  $events = call_user_func_array(
123  $this->logChunkCallback,
124  [ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
125  );
126 
127  $event = null;
128  $keyEvents = [];
129  foreach ( $events as $event ) {
130  $keys = call_user_func_array(
131  $this->keyListCallback,
132  [ $this->cache, $event['item'] ]
133  );
134  foreach ( $keys as $key ) {
135  // use only the latest per key
136  unset( $keyEvents[$key] );
137  $keyEvents[$key] = [
138  'pos' => $event['pos'],
139  'id' => $event['id']
140  ];
141  }
142  }
143 
144  $purgeCount = 0;
145  $lastOkEvent = null;
146  foreach ( $keyEvents as $key => $keyEvent ) {
147  if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
148  break;
149  }
150  ++$purgeCount;
151  $lastOkEvent = $event;
152  }
153 
154  if ( $lastOkEvent ) {
155  $ok = $this->store->merge(
156  $posKey,
157  static function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
158  if ( !$curValue ) {
159  // Use new position
160  } else {
161  $curCoord = [ $curValue['pos'], $curValue['id'] ];
162  $newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
163  if ( $newCoord < $curCoord ) {
164  // Keep prior position instead of rolling it back
165  return $curValue;
166  }
167  }
168 
169  return [
170  'pos' => $lastOkEvent['pos'],
171  'id' => $lastOkEvent['id'],
172  'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
173  ];
174  },
175  BagOStuff::TTL_INDEFINITE
176  );
177 
178  $pos = $lastOkEvent['pos'];
179  $id = $lastOkEvent['id'];
180  if ( $ok ) {
181  $this->logger->info( "Updated cache reap position ($pos, $id)." );
182  } else {
183  $this->logger->error( "Could not update cache reap position ($pos, $id)." );
184  }
185  }
186 
187  ScopedCallback::consume( $scopeLock );
188 
189  return $purgeCount;
190  }
191 
195  public function getState() {
196  $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
197 
198  return $this->store->get( $posKey );
199  }
200 }
Class representing a cache/ephemeral data store.
Definition: BagOStuff.php:85
Class for scanning through chronological, log-structured data or change logs and locally purging cach...
__construct(WANObjectCache $cache, BagOStuff $store, callable $logCallback, callable $keyCallback, array $params)
invoke( $n=100)
Check and reap stale keys based on a chunk of events.
setLogger(LoggerInterface $logger)
Multi-datacenter aware caching interface.
const HOLDOFF_TTL
Seconds to tombstone keys on delete() and to treat keys as volatile after purges.