20 use Psr\Log\LoggerInterface;
78 private const AWAKE_ONE = 1;
79 private const AWAKE_ALL = 2;
85 parent::__construct( $conf,
$type,
$key );
87 $this->serversByLabel = $conf[
'servers'];
89 $serverLabels = array_keys( $conf[
'servers'] );
90 $this->ring =
new HashRing( array_fill_keys( $serverLabels, 10 ) );
92 $conf[
'redisConfig'][
'serializer'] =
'none';
96 $this->keySha1 = sha1( $this->key );
97 $met = ini_get(
'max_execution_time' );
98 $this->lockTTL = $met ? 2 * (int)$met : 3600;
100 if ( self::$active ===
null ) {
102 register_shutdown_function( [ __CLASS__,
'releaseAll' ] );
110 if ( !isset( $this->conn ) ) {
112 $servers = $this->ring->getLocations( $this->key, 3 );
114 foreach ( $servers as $server ) {
115 $conn = $this->pool->getConnection( $this->serversByLabel[$server], $this->logger );
130 if ( !$status->isGood() ) {
139 if ( !$status->isGood() ) {
147 if ( $this->slot ===
null ) {
152 if ( !$status->isOK() ) {
156 $conn = $status->value;
157 '@phan-var RedisConnRef $conn';
163 local kSlots,kSlotsNextRelease,kWakeup,kWaiting = unpack(KEYS)
164 local rMaxWorkers,rExpiry,rSlot,rSlotTime,rAwakeAll,rTime = unpack(ARGV)
165 -- Add the slots back to the list (if rSlot is "w" then it is not a slot).
166 -- Treat the list as expired if the "next release" time sorted-set is missing.
167 if rSlot ~=
'w' and redis.call(
'exists',kSlotsNextRelease) == 1 then
168 if 1*redis.call(
'zScore',kSlotsNextRelease,rSlot) ~= (rSlotTime + rExpiry) then
169 -- Slot lock expired and was released already
170 elseif redis.call(
'lLen',kSlots) >= 1*rMaxWorkers then
171 -- Slots somehow got out of sync; reset the list
172 redis.call(
'del',kSlots,kSlotsNextRelease)
173 elseif redis.call(
'lLen',kSlots) == (1*rMaxWorkers - 1) and redis.call(
'zCard',kWaiting) == 0 then
174 -- Slot list will be made full; clear it to save space (it re-inits as needed)
175 -- since nothing is waiting on being unblocked by a push to the list
176 redis.call(
'del',kSlots,kSlotsNextRelease)
178 -- Add slot back to pool and update the "next release" time
179 redis.call(
'rPush',kSlots,rSlot)
180 redis.call(
'zAdd',kSlotsNextRelease,rTime + 30,rSlot)
181 -- Always keep renewing the expiry on use
182 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
183 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
186 -- Update an ephemeral list to wake up other clients that can
187 -- reuse any cached work from this process. Only do this if no
188 -- slots are currently free (e.g. clients could be waiting).
189 if 1*rAwakeAll == 1 then
190 local count = redis.call(
'zCard',kWaiting)
192 redis.call(
'rPush',kWakeup,
'w')
194 redis.call(
'pexpire',kWakeup,1)
211 ( $this->
onRelease === self::AWAKE_ALL ) ? 1 : 0,
214 4 # number of first argument(s) that are keys
216 }
catch ( RedisException $e ) {
221 $this->slotTime =
null;
223 unset( self::$active[$this->session] );
236 if ( $this->slot !==
null ) {
241 if ( !$status->isOK() ) {
245 $conn = $status->value;
246 '@phan-var RedisConnRef $conn';
248 $now = microtime(
true );
252 if ( ctype_digit(
$slot ) ) {
255 } elseif (
$slot ===
'QUEUE_FULL' ) {
258 } elseif (
$slot ===
'QUEUE_WAIT' ) {
260 $keys = ( $doWakeup == self::AWAKE_ALL )
277 return Status::newFatal(
'pool-error-unknown',
"Server gave slot '$slot'." );
279 }
catch ( RedisException $e ) {
283 if (
$slot !==
'w' ) {
304 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS)
305 local rMaxWorkers,rMaxQueue,rTimeout,rExpiry,rSess,rTime = unpack(ARGV)
306 -- Initialize if the "next release" time sorted-set is empty. The slot key
307 -- itself is empty if all slots are busy or when nothing is initialized.
308 -- If the list is empty but the set is not, then it is the latter case.
309 -- If the list exists but not the set, then reset everything.
310 if redis.call(
'exists',kSlotsNextRelease) == 0 then
311 redis.call(
'del',kSlots)
312 for i = 1,1*rMaxWorkers
do
313 redis.call(
'rPush',kSlots,i)
314 redis.call(
'zAdd',kSlotsNextRelease,-1,i)
316 -- Otherwise do maintenance to clean up after network partitions
318 -- Find stale slot locks and free them (avoid duplicates)
319 local staleLocks = redis.call(
'zRangeByScore',kSlotsNextRelease,0,rTime)
320 for k,slot in ipairs(staleLocks)
do
321 redis.call(
'lRem',kSlots,0,slot)
322 redis.call(
'rPush',kSlots,slot)
323 redis.call(
'zAdd',kSlotsNextRelease,rTime + 30,slot)
325 -- Find stale wait slot entries and remove them
326 redis.call(
'zRemRangeByScore',kSlotWaits,0,rTime - 2*rTimeout)
329 -- Try to acquire a slot if possible now
330 if redis.call(
'lLen',kSlots) > 0 then
331 slot = redis.call(
'lPop',kSlots)
332 -- Update the slot "next release" time
333 redis.call(
'zAdd',kSlotsNextRelease,rTime + rExpiry,slot)
334 elseif redis.call(
'zCard',kSlotWaits) >= 1*rMaxQueue then
338 -- Register this process as waiting
339 redis.call(
'zAdd',kSlotWaits,rTime,rSess)
340 redis.call(
'expireAt',kSlotWaits,math.ceil(rTime + 2*rTimeout))
342 -- Always keep renewing the expiry on use
343 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
344 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
359 3 # number of first argument(s) that are keys
373 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS)
374 local rSlot,rExpiry,rSess,rTime = unpack(ARGV)
375 -- If rSlot is 'w' then the client was told to wake up but got no slot
377 -- Update the slot "next release" time
378 redis.call(
'zAdd',kSlotsNextRelease,rTime + rExpiry,rSlot)
379 -- Always keep renewing the expiry on use
380 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
381 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
383 -- Unregister this process as waiting
384 redis.call(
'zRem',kSlotWaits,rSess)
397 3 # number of first argument(s) that are keys
405 return "poolcounter:l-slots-{$this->keySha1}-{$this->workers}";
412 return "poolcounter:z-renewtime-{$this->keySha1}-{$this->workers}";
419 return "poolcounter:z-wait-{$this->keySha1}-{$this->workers}";
426 return "poolcounter:l-wakeup-{$this->keySha1}-{$this->workers}";
434 foreach ( self::$active as $poolCounter ) {
436 if ( $poolCounter->slot !==
null ) {
437 $poolCounter->release();
439 }
catch ( Exception $e ) {
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element value in $array.
Convenience class for weighted consistent hash rings.
Version of PoolCounter that uses Redis.
acquireForAnyone( $timeout=null)
I want to do this task, but if anyone else does it instead, it's also fine for me.
__construct( $conf, $type, $key)
int null $onRelease
AWAKE_* constant.
int null $slotTime
UNIX timestamp.
initAndPopPoolSlotList(RedisConnRef $conn, $now)
array $serversByLabel
(server label => host) map
string $session
Unique string to identify this process.
static releaseAll()
Try to make sure that locks get released (even with exceptions and fatals)
release()
I have successfully finished my task.
static PoolCounterRedis[] $active
List of active PoolCounterRedis objects in this script.
waitForSlotOrNotif( $doWakeup, $timeout=null)
registerAcquisitionTime(RedisConnRef $conn, $slot, $now)
acquireForMe( $timeout=null)
I want to do this task and I need to do it myself.
int $lockTTL
TTL for locks to expire (work should finish in this time)
string null $slot
Pool slot value.
string $keySha1
SHA-1 of the key.
RedisConnectionPool $pool
When you have many workers (threads/servers) giving service, and a cached item expensive to produce expires, you may get several workers doing the job at the same time.
onAcquire()
Update any lock tracking information when the lock is acquired.
string $key
All workers with the same key share the lock.
precheckAcquire()
Checks that the lock request is sensible.
int $timeout
Maximum time in seconds to wait for the lock.
onRelease()
Update any lock tracking information when the lock is released.
Helper class to handle automatically marking connections as reusable (via RAII pattern)
luaEval( $script, array $params, $numKeys)
static singleton(array $options)
static newFatal( $message,... $parameters)
Factory function for fatal errors.
static newGood( $value=null)
Factory function for good results.