22 use Psr\Log\LoggerInterface;
79 private const AWAKE_ONE = 1;
80 private const AWAKE_ALL = 2;
86 parent::__construct( $conf, $type,
$key );
88 $this->serversByLabel = $conf[
'servers'];
90 $serverLabels = array_keys( $conf[
'servers'] );
91 $this->ring =
new HashRing( array_fill_keys( $serverLabels, 10 ) );
93 $conf[
'redisConfig'][
'serializer'] =
'none';
97 $this->keySha1 = sha1( $this->key );
98 $met = ini_get(
'max_execution_time' );
99 $this->lockTTL = $met ? 2 * (int)$met : 3600;
101 if ( self::$active ===
null ) {
103 register_shutdown_function( [ __CLASS__,
'releaseAll' ] );
111 if ( !isset( $this->conn ) ) {
113 $servers = $this->ring->getLocations( $this->key, 3 );
115 foreach ( $servers as $server ) {
116 $conn = $this->pool->getConnection( $this->serversByLabel[$server], $this->logger );
122 return Status::newFatal(
'pool-servererror', implode(
', ', $servers ) );
126 return Status::newGood( $this->conn );
131 if ( !$status->isGood() ) {
140 if ( !$status->isGood() ) {
148 if ( $this->slot ===
null ) {
153 if ( !$status->isOK() ) {
157 $conn = $status->value;
158 '@phan-var RedisConnRef $conn';
164 local kSlots,kSlotsNextRelease,kWakeup,kWaiting = unpack(KEYS)
165 local rMaxWorkers,rExpiry,rSlot,rSlotTime,rAwakeAll,rTime = unpack(ARGV)
166 -- Add the slots back to the list (
if rSlot is
"w" then it is not a slot).
167 -- Treat the list as expired
if the
"next release" time sorted-
set is missing.
168 if rSlot ~=
'w' and redis.call(
'exists',kSlotsNextRelease) == 1 then
169 if 1*redis.call(
'zScore',kSlotsNextRelease,rSlot) ~= (rSlotTime + rExpiry) then
170 -- Slot lock expired and was released already
171 elseif redis.call(
'lLen',kSlots) >= 1*rMaxWorkers then
172 -- Slots somehow got out of sync; reset the list
173 redis.call(
'del',kSlots,kSlotsNextRelease)
174 elseif redis.call(
'lLen',kSlots) == (1*rMaxWorkers - 1) and redis.call(
'zCard',kWaiting) == 0 then
175 -- Slot list will be made full; clear it to save space (it re-inits as needed)
176 -- since nothing is waiting on being unblocked by a push to the list
177 redis.call(
'del',kSlots,kSlotsNextRelease)
179 -- Add slot back to pool and update the
"next release" time
180 redis.call(
'rPush',kSlots,rSlot)
181 redis.call(
'zAdd',kSlotsNextRelease,rTime + 30,rSlot)
182 -- Always keep renewing the expiry on use
183 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
184 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
187 -- Update an ephemeral list to wake up other clients that can
188 -- reuse any cached work from
this process. Only
do this if no
189 -- slots are currently free (e.g. clients could be waiting).
190 if 1*rAwakeAll == 1 then
191 local count = redis.call(
'zCard',kWaiting)
193 redis.call(
'rPush',kWakeup,
'w')
195 redis.call(
'pexpire',kWakeup,1)
212 ( $this->
onRelease === self::AWAKE_ALL ) ? 1 : 0,
215 4 # number of first argument(s) that are keys
217 }
catch ( RedisException $e ) {
218 return Status::newFatal(
'pool-error-unknown', $e->getMessage() );
222 $this->slotTime =
null;
224 unset( self::$active[$this->session] );
237 if ( $this->slot !==
null ) {
242 if ( !$status->isOK() ) {
246 $conn = $status->value;
247 '@phan-var RedisConnRef $conn';
249 $now = microtime(
true );
253 if ( ctype_digit(
$slot ) ) {
256 } elseif (
$slot ===
'QUEUE_FULL' ) {
259 } elseif (
$slot ===
'QUEUE_WAIT' ) {
261 $keys = ( $doWakeup == self::AWAKE_ALL )
278 return Status::newFatal(
'pool-error-unknown',
"Server gave slot '$slot'." );
280 }
catch ( RedisException $e ) {
281 return Status::newFatal(
'pool-error-unknown', $e->getMessage() );
284 if (
$slot !==
'w' ) {
305 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS)
306 local rMaxWorkers,rMaxQueue,rTimeout,rExpiry,rSess,rTime = unpack(ARGV)
307 -- Initialize
if the
"next release" time sorted-
set is empty. The slot key
308 -- itself is empty
if all slots are busy or when nothing is initialized.
309 -- If the list is empty but the
set is not, then it is the latter
case.
310 -- If the list exists but not the
set, then reset everything.
311 if redis.call(
'exists',kSlotsNextRelease) == 0 then
312 redis.call(
'del',kSlots)
313 for i = 1,1*rMaxWorkers
do
314 redis.call(
'rPush',kSlots,i)
315 redis.call(
'zAdd',kSlotsNextRelease,-1,i)
317 -- Otherwise
do maintenance to clean up after network partitions
319 -- Find stale slot locks and free them (avoid duplicates)
320 local staleLocks = redis.call(
'zRangeByScore',kSlotsNextRelease,0,rTime)
321 for k,slot in ipairs(staleLocks)
do
322 redis.call(
'lRem',kSlots,0,slot)
323 redis.call(
'rPush',kSlots,slot)
324 redis.call(
'zAdd',kSlotsNextRelease,rTime + 30,slot)
326 -- Find stale wait slot entries and
remove them
327 redis.call(
'zRemRangeByScore',kSlotWaits,0,rTime - 2*rTimeout)
330 -- Try to acquire a slot
if possible now
331 if redis.call(
'lLen',kSlots) > 0 then
332 slot = redis.call(
'lPop',kSlots)
333 -- Update the slot
"next release" time
334 redis.call(
'zAdd',kSlotsNextRelease,rTime + rExpiry,slot)
335 elseif redis.call(
'zCard',kSlotWaits) >= 1*rMaxQueue then
339 -- Register
this process as waiting
340 redis.call(
'zAdd',kSlotWaits,rTime,rSess)
341 redis.call(
'expireAt',kSlotWaits,math.ceil(rTime + 2*rTimeout))
343 -- Always keep renewing the expiry on use
344 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
345 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
360 3 # number of first argument(s) that are keys
374 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS)
375 local rSlot,rExpiry,rSess,rTime = unpack(ARGV)
376 -- If rSlot is
'w' then the client was told to wake up but got no slot
378 -- Update the slot
"next release" time
379 redis.call(
'zAdd',kSlotsNextRelease,rTime + rExpiry,rSlot)
380 -- Always keep renewing the expiry on use
381 redis.call(
'expireAt',kSlots,math.ceil(rTime + rExpiry))
382 redis.call(
'expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry))
384 -- Unregister
this process as waiting
385 redis.call(
'zRem',kSlotWaits,rSess)
398 3 # number of first argument(s) that are keys
406 return "poolcounter:l-slots-{$this->keySha1}-{$this->workers}";
413 return "poolcounter:z-renewtime-{$this->keySha1}-{$this->workers}";
420 return "poolcounter:z-wait-{$this->keySha1}-{$this->workers}";
427 return "poolcounter:l-wakeup-{$this->keySha1}-{$this->workers}";
435 foreach ( self::$active as $poolCounter ) {
437 if ( $poolCounter->slot !==
null ) {
438 $poolCounter->release();
440 }
catch ( Exception $e ) {
static consistentHashSort(&$array, $key, $separator="\000")
Sort the given array in a pseudo-random order which depends only on the given key and each element value.
Convenience class for weighted consistent hash rings.
Version of PoolCounter that uses Redis.
acquireForAnyone( $timeout=null)
I want to do this task, but if anyone else does it instead, it's also fine for me.
__construct( $conf, $type, $key)
int|null $onRelease
AWAKE_* constant.
float|null $slotTime
UNIX timestamp.
initAndPopPoolSlotList(RedisConnRef $conn, $now)
array $serversByLabel
(server label => host) map
string $session
Unique string to identify this process.
static releaseAll()
Try to make sure that locks get released (even with exceptions and fatals)
release()
I have successfully finished my task.
static PoolCounterRedis[] $active
List of active PoolCounterRedis objects in this script.
waitForSlotOrNotif( $doWakeup, $timeout=null)
registerAcquisitionTime(RedisConnRef $conn, $slot, $now)
acquireForMe( $timeout=null)
I want to do this task and I need to do it myself.
int $lockTTL
TTL for locks to expire (work should finish in this time)
string|null $slot
Pool slot value.
string $keySha1
SHA-1 of the key.
RedisConnectionPool $pool
Semaphore semantics to restrict how many workers may concurrently perform a task.
onAcquire()
Update any lock tracking information when the lock is acquired.
string $key
All workers with the same key share the lock.
precheckAcquire()
Checks that the lock request is sensible.
int $timeout
Maximum time in seconds to wait for the lock.
onRelease()
Update any lock tracking information when the lock is released.
Helper class to handle automatically marking connections as reusable (via RAII pattern)
luaEval( $script, array $params, $numKeys)
static singleton(array $options)