20 use Psr\Log\LoggerInterface;
77 private const AWAKE_ONE = 1;
78 private const AWAKE_ALL = 2;
84 parent::__construct( $conf,
$type,
$key );
86 $this->serversByLabel = $conf[
'servers'];
88 $serverLabels = array_keys( $conf[
'servers'] );
89 $this->ring =
new HashRing( array_fill_keys( $serverLabels, 10 ) );
91 $conf[
'redisConfig'][
'serializer'] =
'none';
95 $this->keySha1 = sha1( $this->key );
96 $met = ini_get(
'max_execution_time' );
97 $this->lockTTL = $met ? 2 * (int)$met : 3600;
99 if ( self::$active ===
null ) {
101 register_shutdown_function( [ __CLASS__,
'releaseAll' ] );
109 if ( !isset( $this->conn ) ) {
111 $servers = $this->ring->getLocations( $this->key, 3 );
113 foreach ( $servers as $server ) {
114 $conn = $this->pool->getConnection( $this->serversByLabel[$server], $this->logger );
129 if ( !$status->isGood() ) {
138 if ( !$status->isGood() ) {
146 if ( $this->slot ===
null ) {
151 if ( !$status->isOK() ) {
155 $conn = $status->value;
156 '@phan-var RedisConnRef $conn';
162 local kSlots,kSlotsNextRelease,kWakeup,kWaiting = unpack(KEYS)
163 local rMaxWorkers,rExpiry,rSlot,rSlotTime,rAwakeAll,rTime = unpack(ARGV)
164 -- Add the slots back to the list (if rSlot is "w" then it is not a slot).
165 -- Treat the list as expired if the "next release" time sorted-set is missing.
166 if rSlot ~=
'w' and redis.call(
'exists',kSlotsNextRelease) == 1 then
167 if 1*redis.call(
'zScore',kSlotsNextRelease,rSlot) ~= (rSlotTime + rExpiry) then
168 -- Slot lock expired and was released already
169 elseif redis.call(
'lLen',kSlots) >= 1*rMaxWorkers then
170 -- Slots somehow got out of sync; reset the list
171 redis.call(
'del',kSlots,kSlotsNextRelease)
172 elseif redis.call(
'lLen',kSlots) == (1*rMaxWorkers - 1) and redis.call(
'zCard',kWaiting) == 0 then
173 -- Slot list will be made full; clear it to save space (it re-inits as needed)
174 -- since nothing is waiting on being unblocked by a push to the list
175 redis.call(
'del',kSlots,kSlotsNextRelease)
177 -- Add slot back to pool and update the "next release" time
178 redis.call(
'rPush',kSlots,rSlot)
179 redis.call(
'zAdd',kSlotsNextRelease,rTime + 30,rSlot)
180 -- Always keep renewing the expiry on use
181 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
182 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
185 -- Update an ephemeral list to wake up other clients that can
186 -- reuse any cached work from this process. Only do this if no
187 -- slots are currently free (e.g. clients could be waiting).
188 if 1*rAwakeAll == 1 then
189 local count = redis.call(
'zCard',kWaiting)
191 redis.call(
'rPush',kWakeup,
'w')
193 redis.call(
'pexpire',kWakeup,1)
210 ( $this->
onRelease === self::AWAKE_ALL ) ? 1 : 0,
213 4 # number of first argument(s) that are keys
215 }
catch ( RedisException $e ) {
220 $this->slotTime =
null;
222 unset( self::$active[$this->session] );
235 if ( $this->slot !==
null ) {
240 if ( !$status->isOK() ) {
244 $conn = $status->value;
245 '@phan-var RedisConnRef $conn';
247 $now = microtime(
true );
251 if ( ctype_digit(
$slot ) ) {
254 } elseif (
$slot ===
'QUEUE_FULL' ) {
257 } elseif (
$slot ===
'QUEUE_WAIT' ) {
259 $keys = ( $doWakeup == self::AWAKE_ALL )
276 return Status::newFatal(
'pool-error-unknown',
"Server gave slot '$slot'." );
278 }
catch ( RedisException $e ) {
282 if (
$slot !==
'w' ) {
303 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS)
304 local rMaxWorkers,rMaxQueue,rTimeout,rExpiry,rSess,rTime = unpack(ARGV)
305 -- Initialize if the "next release" time sorted-set is empty. The slot key
306 -- itself is empty if all slots are busy or when nothing is initialized.
307 -- If the list is empty but the set is not, then it is the former case.
308 -- If the list exists but not the set, then reset everything.
309 if redis.call(
'exists',kSlotsNextRelease) == 0 then
310 redis.call(
'del',kSlots)
311 for i = 1,1*rMaxWorkers
do
312 redis.call(
'rPush',kSlots,i)
313 redis.call(
'zAdd',kSlotsNextRelease,-1,i)
315 -- Otherwise do maintenance to clean up after network partitions
317 -- Find stale slot locks and free them (avoid duplicates)
318 local staleLocks = redis.call(
'zRangeByScore',kSlotsNextRelease,0,rTime)
319 for k,slot in ipairs(staleLocks)
do
320 redis.call(
'lRem',kSlots,0,slot)
321 redis.call(
'rPush',kSlots,slot)
322 redis.call(
'zAdd',kSlotsNextRelease,rTime + 30,slot)
324 -- Find stale wait slot entries and remove them
325 redis.call(
'zRemRangeByScore',kSlotWaits,0,rTime - 2*rTimeout)
328 -- Try to acquire a slot if possible now
329 if redis.call(
'lLen',kSlots) > 0 then
330 slot = redis.call(
'lPop',kSlots)
331 -- Update the slot "next release" time
332 redis.call(
'zAdd',kSlotsNextRelease,rTime + rExpiry,slot)
333 elseif redis.call(
'zCard',kSlotWaits) >= 1*rMaxQueue then
337 -- Register this process as waiting
338 redis.call(
'zAdd',kSlotWaits,rTime,rSess)
339 redis.call(
'expireAt',kSlotWaits,math.ceil(rTime + 2*rTimeout))
341 -- Always keep renewing the expiry on use
342 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
343 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
358 3 # number of first argument(s) that are keys
372 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS)
373 local rSlot,rExpiry,rSess,rTime = unpack(ARGV)
374 -- If rSlot is 'w' then the client was told to wake up but got no slot
376 -- Update the slot "next release" time
377 redis.call(
'zAdd',kSlotsNextRelease,rTime + rExpiry,rSlot)
378 -- Always keep renewing the expiry on use
379 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
380 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
382 -- Unregister this process as waiting
383 redis.call(
'zRem',kSlotWaits,rSess)
396 3 # number of first argument(s) that are keys
404 return "poolcounter:l-slots-{$this->keySha1}-{$this->workers}";
411 return "poolcounter:z-renewtime-{$this->keySha1}-{$this->workers}";
418 return "poolcounter:z-wait-{$this->keySha1}-{$this->workers}";
425 return "poolcounter:l-wakeup-{$this->keySha1}-{$this->workers}";
433 foreach ( self::$active as $poolCounter ) {
435 if ( $poolCounter->slot !==
null ) {
436 $poolCounter->release();
438 }
catch ( Exception $e ) {
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element value.
Convenience class for weighted consistent hash rings.
Version of PoolCounter that uses Redis.
acquireForAnyone( $timeout=null)
I want to do this task, but if anyone else does it instead, it's also fine for me.
__construct( $conf, $type, $key)
int|null $onRelease
AWAKE_* constant.
float|null $slotTime
UNIX timestamp.
initAndPopPoolSlotList(RedisConnRef $conn, $now)
array $serversByLabel
(server label => host) map
string $session
Unique string to identify this process.
static releaseAll()
Try to make sure that locks get released (even with exceptions and fatals)
release()
I have successfully finished my task.
static PoolCounterRedis[] $active
List of active PoolCounterRedis objects in this script.
waitForSlotOrNotif( $doWakeup, $timeout=null)
registerAcquisitionTime(RedisConnRef $conn, $slot, $now)
acquireForMe( $timeout=null)
I want to do this task and I need to do it myself.
int $lockTTL
TTL for locks to expire (work should finish in this time)
string|null $slot
Pool slot value.
string $keySha1
SHA-1 of the key.
RedisConnectionPool $pool
Semaphore semantics to restrict how many workers may concurrently perform a task.
onAcquire()
Update any lock tracking information when the lock is acquired.
string $key
All workers with the same key share the lock.
precheckAcquire()
Checks that the lock request is sensible.
int $timeout
Maximum time in seconds to wait for the lock.
onRelease()
Update any lock tracking information when the lock is released.
Helper class to handle automatically marking connections as reusable (via RAII pattern)
luaEval( $script, array $params, $numKeys)
static singleton(array $options)
static newFatal( $message,... $parameters)
Factory function for fatal errors.
static newGood( $value=null)
Factory function for good results.