MediaWiki REL1_39
DeferredUpdatesScope.php
Go to the documentation of this file.
1<?php
/** @var DeferredUpdatesScope|null Enclosing scope; null when this is the root scope */
private $parentScope;
/** @var DeferrableUpdate|null Update that owns this scope; null for the root scope */
private $activeUpdate;
/** @var int|null Stage in effect for this scope (presumably a DeferredUpdates stage constant); null for the root scope */
private $activeStage;
/** @var array[] Pending update queues, one per entry of DeferredUpdates::STAGES, keyed by stage */
private $queueByStage;
39
/**
 * Build a scope; callers go through newRootScope() or newChildScope().
 *
 * @param int|null $activeStage Stage in effect for this scope, or null for the root scope
 * @param DeferrableUpdate|null $update Update that owns this scope, or null for the root scope
 * @param DeferredUpdatesScope|null $parentScope Enclosing scope, or null for the root scope
 */
private function __construct(
	$activeStage,
	?DeferrableUpdate $update,
	?DeferredUpdatesScope $parentScope
) {
	$this->parentScope = $parentScope;
	$this->activeUpdate = $update;
	$this->activeStage = $activeStage;

	// Start out with one empty queue for every known stage
	$this->queueByStage = [];
	foreach ( DeferredUpdates::STAGES as $knownStage ) {
		$this->queueByStage[$knownStage] = [];
	}
}
55
/**
 * Create the top-level scope, which has no active stage, no owning update and no parent.
 *
 * @return DeferredUpdatesScope
 */
public static function newRootScope() {
	$rootScope = new self( null, null, null );

	return $rootScope;
}
62
/**
 * Create a scope nested under another scope and owned by the given update.
 *
 * @param int $activeStage Stage in effect for the new scope
 * @param DeferrableUpdate $update Update that owns the new scope
 * @param DeferredUpdatesScope $parentScope Scope that encloses the new scope
 * @return DeferredUpdatesScope
 */
public static function newChildScope(
	$activeStage,
	DeferrableUpdate $update,
	DeferredUpdatesScope $parentScope
) {
	$childScope = new self( $activeStage, $update, $parentScope );

	return $childScope;
}
76
/**
 * Get the deferred update that owns this scope.
 *
 * @return DeferrableUpdate|null The owning update; null for the root scope
 */
public function getActiveUpdate() {
	$owner = $this->activeUpdate;

	return $owner;
}
85
/**
 * Enqueue a deferred update within this scope for the given "defer until" stage.
 *
 * A MergeableUpdate is stored under its class name. If the target queue already holds
 * one of the same class, the new update is merged into the existing instance, which is
 * then moved to the end of the queue.
 *
 * @param DeferrableUpdate $update Update to enqueue
 * @param int $stage Requested "defer until" stage
 */
public function addUpdate( DeferrableUpdate $update, $stage ) {
	// A requested stage earlier than the active one has already passed, so the
	// update is filed under the active stage instead
	$stageEffective = max( $stage, $this->activeStage );

	if ( !( $update instanceof MergeableUpdate ) ) {
		// Plain updates are simply appended
		$this->queueByStage[$stageEffective][] = $update;

		return;
	}

	$stageQueue =& $this->queueByStage[$stageEffective];
	$class = get_class( $update ); // fully-qualified class

	if ( !isset( $stageQueue[$class] ) ) {
		// First update of this class for the stage
		$stageQueue[$class] = $update;

		return;
	}

	$existingUpdate = $stageQueue[$class];
	'@phan-var MergeableUpdate $existingUpdate';
	$existingUpdate->merge( $update );
	// Re-insert at the tail: things like mergeable purge updates might depend on
	// the updates queued before them having run
	unset( $stageQueue[$class] );
	$stageQueue[$class] = $existingUpdate;
}
116
/**
 * Get the number of pending updates within this scope, across all stage queues.
 *
 * @return int
 */
public function pendingUpdatesCount() {
	$total = 0;
	foreach ( $this->queueByStage as $stageQueue ) {
		$total += count( $stageQueue );
	}

	return $total;
}
125
/**
 * Get pending updates within this scope with the given "defer until" stage.
 *
 * The result is a flat list. Mergeable updates are stored under their class name
 * within each stage queue, so merging the queues by key (as array_merge() does for
 * string keys) would silently drop all but the last update of a class queued in
 * more than one stage when DeferredUpdates::ALL is requested. Collecting the values
 * keeps every pending update and stays consistent with pendingUpdatesCount().
 *
 * @param int $stage A stage from DeferredUpdates::STAGES, or DeferredUpdates::ALL
 * @return DeferrableUpdate[] List of matching updates, in stage order then queue order
 */
public function getPendingUpdates( $stage ) {
	$pendingUpdates = [];
	foreach ( $this->queueByStage as $queueStage => $queue ) {
		if ( $stage === DeferredUpdates::ALL || $stage === $queueStage ) {
			// Append by value only; per-queue keys (class names) must not collide
			foreach ( $queue as $update ) {
				$pendingUpdates[] = $update;
			}
		}
	}

	return $pendingUpdates;
}
142
/**
 * Cancel all pending updates within this scope by emptying every stage queue.
 */
public function clearPendingUpdates() {
	foreach ( array_keys( $this->queueByStage ) as $queueStage ) {
		$this->queueByStage[$queueStage] = [];
	}
}
149
/**
 * Remove pending updates of the specified stage/class and pass them to a callback.
 *
 * All matching updates are removed from the queues before the first callback
 * invocation (T268840), so recursion triggered by the callback cannot see them again.
 *
 * @param int $stage A stage from DeferredUpdates::STAGES, or DeferredUpdates::ALL
 * @param string $class Class or interface name the updates must be an instance of
 * @param callable $callback Invoked once per removed update, with that update as its argument
 */
public function consumeMatchingUpdates( $stage, $class, callable $callback ) {
	$claimedUpdates = [];

	foreach ( $this->queueByStage as $queueStage => $queue ) {
		if ( $stage !== DeferredUpdates::ALL && $stage !== $queueStage ) {
			// This queue belongs to a stage that was not requested
			continue;
		}
		foreach ( $queue as $queueKey => $candidate ) {
			if ( !( $candidate instanceof $class ) ) {
				continue;
			}
			$claimedUpdates[] = $candidate;
			unset( $this->queueByStage[$queueStage][$queueKey] );
		}
	}

	// Only now hand the claimed updates over, one at a time
	foreach ( $claimedUpdates as $claimedUpdate ) {
		$callback( $claimedUpdate );
	}
}
175
/**
 * Repeatedly hand unready pending updates to the parent scope (if there is one) and run
 * the ready pending updates in stage order through the callback, until a full round
 * neither reassigns nor runs anything.
 *
 * @param int $stage A stage from DeferredUpdates::STAGES, or DeferredUpdates::ALL
 * @param callable $callback Invoked as $callback( $update, $activeStage ) for each ready update
 */
public function processUpdates( $stage, callable $callback ) {
	// ALL means: go all the way to the last "defer until" stage. Otherwise never
	// go below the stage that is already active for this scope.
	$activeStage = ( $stage === DeferredUpdates::ALL )
		? DeferredUpdates::STAGES[count( DeferredUpdates::STAGES ) - 1]
		: max( $stage, $this->activeStage );

	// The set of stages to run does not change between rounds
	$stagesToRun = range( DeferredUpdates::STAGES[0], $activeStage );

	do {
		$handled = $this->upmergeUnreadyUpdates( $activeStage );
		foreach ( $stagesToRun as $queueStage ) {
			$handled += $this->processStageQueue( $queueStage, $activeStage, $callback );
		}
	} while ( $handled > 0 );
}
200
/**
 * Move updates that this scope should not run itself into the parent scope.
 *
 * An update is moved when it is a MergeableUpdate, or when it sits in a queue whose
 * stage lies beyond the given active stage. The root scope has no parent and moves nothing.
 *
 * @param int $activeStage Latest stage that is being processed
 * @return int Number of updates handed to the parent scope
 */
private function upmergeUnreadyUpdates( $activeStage ) {
	if ( !$this->parentScope ) {
		return 0;
	}

	$movedCount = 0;
	foreach ( $this->queueByStage as $queueStage => $stageQueue ) {
		foreach ( $stageQueue as $queueKey => $pendingUpdate ) {
			$belongsToParent = $pendingUpdate instanceof MergeableUpdate
				|| $queueStage > $activeStage;
			if ( !$belongsToParent ) {
				continue;
			}
			// Drop it here first, then re-enqueue it upstream under its original stage
			unset( $this->queueByStage[$queueStage][$queueKey] );
			$this->parentScope->addUpdate( $pendingUpdate, $queueStage );
			++$movedCount;
		}
	}

	return $movedCount;
}
230
/**
 * Run everything queued for one stage, including updates enqueued for that stage
 * while it is being run.
 *
 * @param int $stage Stage whose queue is drained
 * @param int $activeStage Stage passed through to the callback as its second argument
 * @param callable $callback Invoked as $callback( $update, $activeStage ) for each update
 * @return int Number of updates that were claimed for execution
 */
private function processStageQueue( $stage, $activeStage, callable $callback ) {
	$handled = 0;

	// One round per iteration; stop once a round finds the queue empty
	while ( true ) {
		// Take ownership of the queue contents up front so that recursion
		// triggered by the callback starts from an empty queue
		$batch = $this->queueByStage[$stage];
		$this->queueByStage[$stage] = [];
		if ( !$batch ) {
			break;
		}

		// For historical reasons DataUpdate instances run ahead of all other updates:
		// they once had their own static runner, called first by DeferredUpdates and,
		// before that, directly by page updater code.
		// @TODO: remove this logic given the existence of RefreshSecondaryDataUpdate
		$dataUpdates = [];
		$otherUpdates = [];
		foreach ( $batch as $queuedUpdate ) {
			if ( $queuedUpdate instanceof DataUpdate ) {
				$dataUpdates[] = $queuedUpdate;
			} else {
				$otherUpdates[] = $queuedUpdate;
			}
		}
		$handled += count( $batch );

		// DataUpdate instances first, then everything else, each in queue order
		foreach ( array_merge( $dataUpdates, $otherUpdates ) as $readyUpdate ) {
			$callback( $readyUpdate, $activeStage );
		}
	}

	return $handled;
}
277}
Abstract base class for update jobs that do something with some secondary data extracted from article...
DeferredUpdates helper class for managing DeferrableUpdate::doUpdate() nesting levels caused by neste...
consumeMatchingUpdates( $stage, $class, callable $callback)
Remove pending updates of the specified stage/class and pass them to a callback.
static newChildScope( $activeStage, DeferrableUpdate $update, DeferredUpdatesScope $parentScope)
pendingUpdatesCount()
Get the number of pending updates within this scope.
addUpdate(DeferrableUpdate $update, $stage)
Enqueue a deferred update within this scope using the specified "defer until" time.
getPendingUpdates( $stage)
Get pending updates within this scope with the given "defer until" stage.
processUpdates( $stage, callable $callback)
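Iteratively, reassign unready pending updates to the parent scope (if applicable) and process the ready pending updates in stage order with the callback, repeating until a round neither reassigns nor processes anything.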
clearPendingUpdates()
Cancel all pending updates within this scope.
getActiveUpdate()
Get the deferred update that owns this scope (root scope has none)
Interface that deferrable updates should implement.
Interface that deferrable updates can implement to signal that updates can be combined.