Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
92.16% |
47 / 51 |
|
62.50% |
5 / 8 |
CRAP | |
0.00% |
0 / 1 |
WRStatsWriter | |
92.16% |
47 / 51 |
|
62.50% |
5 / 8 |
19.17 | |
0.00% |
0 / 1 |
__construct | |
75.00% |
6 / 8 |
|
0.00% |
0 / 1 |
4.25 | |||
incr | |
94.44% |
17 / 18 |
|
0.00% |
0 / 1 |
4.00 | |||
setCurrentTime | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
resetCurrentTime | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
now | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
flush | |
100.00% |
3 / 3 |
|
100.00% |
1 / 1 |
2 | |||
__destruct | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
resetAll | |
100.00% |
17 / 17 |
|
100.00% |
1 / 1 |
5 |
1 | <?php |
2 | |
3 | namespace Wikimedia\WRStats; |
4 | |
/**
 * Writers gather a batch of increment operations and then
 * commit them when flush() is called, or when the writer is destroyed.
 *
 * @since 1.39
 */
class WRStatsWriter {
	/** @var StatsStore The store that receives the increments and deletions */
	private $store;
	/** @var array<string,MetricSpec> Metric specifications indexed by metric name */
	private $metricSpecs;
	/** @var (float|int)[][] Pending increments indexed by TTL and storage key */
	private $queuedValues = [];
	/** @var float|int|null The UNIX timestamp used for the current time, or null if not yet set */
	private $now;
	/** @var string[] Leading components passed to the store when building keys */
	private $prefixComponents;

	/**
	 * @internal Use WRStatsFactory::createWriter instead
	 * @param StatsStore $store
	 * @param array<string,array> $specs Raw metric specifications indexed by
	 *   metric name. Each one is wrapped in a MetricSpec.
	 * @param string|string[] $prefix One prefix component, or an array of them
	 * @throws WRStatsError If $prefix is an empty array
	 */
	public function __construct( StatsStore $store, $specs, $prefix ) {
		$this->store = $store;
		$this->metricSpecs = [];
		foreach ( $specs as $name => $spec ) {
			$this->metricSpecs[$name] = new MetricSpec( $spec );
		}
		// A scalar prefix is treated as a single-component prefix.
		$this->prefixComponents = is_array( $prefix ) ? $prefix : [ $prefix ];
		if ( !count( $this->prefixComponents ) ) {
			throw new WRStatsError( __METHOD__ .
				': there must be at least one prefix component' );
		}
	}

	/**
	 * Queue an increment operation.
	 *
	 * The value is divided by the metric's resolution and rounded to the
	 * nearest integer, then added to the pending total of the current time
	 * bucket of every sequence of the metric. Nothing is sent to the store
	 * until flush() is called.
	 *
	 * @param string $name The metric name
	 * @param EntityKey|null $entity Additional storage key components.
	 *   Defaults to a new LocalEntityKey.
	 * @param float|int $value The value to add
	 * @throws WRStatsError If $name was not among the specs given to the constructor
	 */
	public function incr( $name, ?EntityKey $entity = null, $value = 1 ) {
		$metricSpec = $this->metricSpecs[$name] ?? null;
		if ( $metricSpec === null ) {
			throw new WRStatsError( "Unrecognised metric \"$name\"" );
		}
		// Only allocate the default entity once the metric name is known to be valid.
		$entity ??= new LocalEntityKey;

		// The scaled, rounded increment is the same for every sequence, so
		// compute it once rather than on each loop iteration.
		$increment = (int)round( $value / $metricSpec->resolution );

		foreach ( $metricSpec->sequences as $seqSpec ) {
			$timeBucket = (int)( $this->now() / $seqSpec->timeStep );
			$key = $this->store->makeKey(
				$this->prefixComponents,
				[ $name, $seqSpec->name, $timeBucket ],
				$entity
			);

			// Increments are grouped by TTL because flush() hands the store
			// one batch per TTL.
			$ttl = $seqSpec->hardExpiry;
			$this->queuedValues[$ttl][$key] =
				( $this->queuedValues[$ttl][$key] ?? 0 ) + $increment;
		}
	}

	/**
	 * Set the time to be used as the current time
	 *
	 * @param float|int $now
	 */
	public function setCurrentTime( $now ) {
		$this->now = $now;
	}

	/**
	 * Reset the stored current time, so that the next operation that needs
	 * the time reads the clock again. In a long-running process this should
	 * be called regularly to write new results.
	 *
	 * @return void
	 */
	public function resetCurrentTime() {
		$this->now = null;
	}

	/**
	 * Get the current time. If no time has been stored, the result of
	 * microtime( true ) is stored and reused by subsequent calls.
	 *
	 * @return float|int
	 */
	private function now() {
		$this->now ??= microtime( true );
		return $this->now;
	}

	/**
	 * Commit the batch of increment operations.
	 *
	 * One call to the store is made for each distinct TTL. A batch is removed
	 * from the queue as soon as the store call for it returns, so if a later
	 * batch throws, a subsequent flush() (including the one performed by the
	 * destructor) does not send the already-committed batches a second time.
	 */
	public function flush() {
		// foreach iterates by value over a snapshot of the array, so removing
		// entries from the property inside the loop is safe.
		foreach ( $this->queuedValues as $ttl => $values ) {
			$this->store->incr( $values, $ttl );
			unset( $this->queuedValues[$ttl] );
		}
		$this->queuedValues = [];
	}

	/**
	 * Commit any increment operations that are still queued when the writer
	 * is destroyed.
	 */
	public function __destruct() {
		$this->flush();
	}

	/**
	 * Delete all stored metrics corresponding to the specs supplied to the
	 * constructor, resetting the counters to zero. Any queued increments that
	 * have not been flushed are discarded.
	 *
	 * @param EntityKey[]|null $entities An array of additional storage key
	 *   components. The default is the empty local entity.
	 */
	public function resetAll( ?array $entities = null ) {
		$entities ??= [ new LocalEntityKey ];
		$this->queuedValues = [];
		$keys = [];
		foreach ( $this->metricSpecs as $name => $metricSpec ) {
			foreach ( $metricSpec->sequences as $seqSpec ) {
				$timeStep = $seqSpec->timeStep;
				$now = $this->now();
				// Cover every bucket from one expiry period ago up to now,
				// widened by one bucket at each end.
				$firstBucket = (int)( ( $now - $seqSpec->hardExpiry ) / $timeStep ) - 1;
				$lastBucket = (int)( $now / $timeStep ) + 1;
				for ( $bucket = $firstBucket; $bucket <= $lastBucket; $bucket++ ) {
					foreach ( $entities as $entity ) {
						$keys[] = $this->store->makeKey(
							$this->prefixComponents,
							[ $name, $seqSpec->name, $bucket ],
							$entity
						);
					}
				}
			}
		}
		$this->store->delete( $keys );
	}
}