MediaWiki  master
DeferredUpdatesScope.php
Go to the documentation of this file.
1 <?php
/** @var DeferredUpdatesScope|null Scope given as $parentScope at construction; null for a root scope */
32  private $parentScope;
/** @var DeferrableUpdate|null Update given at construction as the owner of this scope; null for a root scope */
34  private $activeUpdate;
/** @var int|null Stage given at construction; null for a root scope (presumably a DeferredUpdates stage constant) */
36  private $activeStage;
/** @var DeferrableUpdate[][] Pending updates, keyed by each stage of DeferredUpdates::STAGES and then by queue key */
38  private $queueByStage;
39 
/**
 * Set up a scope with one empty queue per stage listed in DeferredUpdates::STAGES.
 *
 * Instances are created through newRootScope() and newChildScope().
 *
 * @param int|null $activeStage Stage recorded as active for this scope (null for a root scope)
 * @param DeferrableUpdate|null $update Update recorded as the owner of this scope, if any
 * @param DeferredUpdatesScope|null $parentScope Enclosing scope, if any
 */
private function __construct(
	$activeStage,
	?DeferrableUpdate $update,
	?DeferredUpdatesScope $parentScope
) {
	// Start every known stage with an empty queue
	$this->queueByStage = array_fill_keys( DeferredUpdates::STAGES, [] );
	$this->parentScope = $parentScope;
	$this->activeUpdate = $update;
	$this->activeStage = $activeStage;
}
55 
/**
 * Create a scope that has no active stage, no owning update and no parent scope.
 *
 * @return DeferredUpdatesScope
 */
public static function newRootScope() {
	$rootScope = new self( null, null, null );

	return $rootScope;
}
62 
/**
 * Create a scope owned by the given update and nested inside the given parent scope.
 *
 * @param int $activeStage Stage recorded as active for the new scope
 * @param DeferrableUpdate $update Update that owns the new scope
 * @param DeferredUpdatesScope $parentScope Scope that encloses the new scope
 * @return DeferredUpdatesScope
 */
public static function newChildScope(
	$activeStage,
	DeferrableUpdate $update,
	DeferredUpdatesScope $parentScope
) {
	$childScope = new self( $activeStage, $update, $parentScope );

	return $childScope;
}
76 
/**
 * Get the deferred update that owns this scope (root scope has none).
 *
 * @return DeferrableUpdate|null
 */
public function getActiveUpdate() {
	$owner = $this->activeUpdate;

	return $owner;
}
85 
/**
 * Enqueue a deferred update within this scope using the specified "defer until" time.
 *
 * A MergeableUpdate is stored under its class name, so at most one instance per class
 * exists in a stage queue; a second instance of that class is merged into the first.
 * Any other update is appended to the stage queue.
 *
 * @param DeferrableUpdate $update Update to enqueue
 * @param int $stage Requested "defer until" stage
 */
public function addUpdate( DeferrableUpdate $update, $stage ) {
	// A requested stage below the active one is replaced by the active stage
	$targetStage = max( $stage, $this->activeStage );
	$stageQueue =& $this->queueByStage[$targetStage];

	if ( !( $update instanceof MergeableUpdate ) ) {
		$stageQueue[] = $update;
		return;
	}

	// Fully-qualified class name serves as the queue key for mergeable updates
	$key = get_class( $update );
	if ( !isset( $stageQueue[$key] ) ) {
		$stageQueue[$key] = $update;
		return;
	}

	$existingUpdate = $stageQueue[$key];
	'@phan-var MergeableUpdate $existingUpdate';
	$existingUpdate->merge( $update );
	// Re-insert the merged update at the tail of the queue, since things like mergeable
	// purge updates might depend on the prior updates in the queue having run
	unset( $stageQueue[$key] );
	$stageQueue[$key] = $existingUpdate;
}
116 
/**
 * Get the number of pending updates within this scope.
 *
 * @return int Sum of the sizes of all stage queues
 */
public function pendingUpdatesCount() {
	$total = 0;
	foreach ( $this->queueByStage as $stageQueue ) {
		$total += count( $stageQueue );
	}

	return $total;
}
125 
/**
 * Get pending updates within this scope with the given "defer until" stage.
 *
 * The result is a plain list. Stage queues hold mergeable updates under their class
 * name, so the queues must not be combined with array_merge(): a class-name key present
 * in several stage queues would let the later stage overwrite the earlier one, and the
 * result would then contain fewer updates than pendingUpdatesCount() reports.
 *
 * @param int $stage DeferredUpdates::ALL, or the stage whose queue should be returned
 * @return DeferrableUpdate[] Matching updates in queue order, with sequential integer keys;
 *  empty when no stage queue matches
 */
public function getPendingUpdates( $stage ) {
	$updates = [];
	foreach ( $this->queueByStage as $queueStage => $queue ) {
		if ( $stage === DeferredUpdates::ALL || $stage === $queueStage ) {
			// Append one by one so that queue keys are discarded rather than merged
			foreach ( $queue as $update ) {
				$updates[] = $update;
			}
		}
	}

	return $updates;
}
142 
/**
 * Cancel all pending updates within this scope.
 *
 * Every existing stage key is kept and its queue is replaced by an empty array.
 */
public function clearPendingUpdates() {
	foreach ( array_keys( $this->queueByStage ) as $queueStage ) {
		$this->queueByStage[$queueStage] = [];
	}
}
149 
/**
 * Remove pending updates of the specified stage/class and pass them to a callback.
 *
 * All matching updates are removed from the queues before the first callback call.
 *
 * @param int $stage DeferredUpdates::ALL, or the stage whose queue should be searched
 * @param string $class Class or interface name the updates are tested against
 * @param callable $callback Invoked once per removed update, with the update as its argument
 */
public function consumeMatchingUpdates( $stage, $class, callable $callback ) {
	// T268840: take the updates out of the queues first, in case a callback recurses
	$taken = [];
	foreach ( $this->queueByStage as $queueStage => $stageQueue ) {
		if ( $stage !== DeferredUpdates::ALL && $stage !== $queueStage ) {
			continue;
		}
		foreach ( $stageQueue as $key => $candidate ) {
			if ( !( $candidate instanceof $class ) ) {
				continue;
			}
			$taken[] = $candidate;
			unset( $this->queueByStage[$queueStage][$key] );
		}
	}

	// Hand each removed update to the callback
	foreach ( $taken as $candidate ) {
		$callback( $candidate );
	}
}
175 
/**
 * Iteratively reassign unready pending updates to the parent scope (if applicable) and
 * process the remaining updates in stage order, until a full round handles nothing.
 *
 * @param int $stage DeferredUpdates::ALL, or the stage up to which updates should run
 * @param callable $callback Invoked for each processed update with the update and the
 *  effective stage as its arguments
 */
public function processUpdates( $stage, callable $callback ) {
	// ALL means running through to the last "defer until" stage; otherwise a stage that
	// has already passed is replaced by the active stage
	$activeStage = ( $stage === DeferredUpdates::ALL )
		? DeferredUpdates::STAGES[count( DeferredUpdates::STAGES ) - 1]
		: max( $stage, $this->activeStage );

	do {
		$handled = $this->upmergeUnreadyUpdates( $activeStage );
		$stagesToRun = range( DeferredUpdates::STAGES[0], $activeStage );
		foreach ( $stagesToRun as $queueStage ) {
			$handled += $this->processStageQueue( $queueStage, $activeStage, $callback );
		}
	} while ( $handled > 0 );
}
200 
/**
 * Move updates that should not run in this scope to the parent scope.
 *
 * An update is moved when it is a MergeableUpdate or when its queue stage lies beyond
 * the given stage. A scope without a parent moves nothing.
 *
 * @param int $activeStage Stage up to which updates are considered ready
 * @return int Number of updates handed to the parent scope
 */
private function upmergeUnreadyUpdates( $activeStage ) {
	if ( !$this->parentScope ) {
		return 0;
	}

	$moved = 0;
	foreach ( $this->queueByStage as $queueStage => $stageQueue ) {
		foreach ( $stageQueue as $key => $candidate ) {
			$belongsToParent = $candidate instanceof MergeableUpdate || $queueStage > $activeStage;
			if ( !$belongsToParent ) {
				continue;
			}
			unset( $this->queueByStage[$queueStage][$key] );
			$this->parentScope->addUpdate( $candidate, $queueStage );
			++$moved;
		}
	}

	return $moved;
}
230 
/**
 * Run the queue of one stage, repeating until no further updates show up in it.
 *
 * Within each round, DataUpdate instances are passed to the callback before all other
 * updates; relative order inside each of the two groups is kept.
 *
 * @param int $stage Stage whose queue is drained
 * @param int $activeStage Stage passed to the callback as its second argument
 * @param callable $callback Invoked for each update with the update and $activeStage
 * @return int Number of updates taken from the queue
 */
private function processStageQueue( $stage, $activeStage, callable $callback ) {
	$handled = 0;

	while ( true ) {
		// Take ownership of the queue contents up front, in case a callback recurses
		$batch = $this->queueByStage[$stage];
		$this->queueByStage[$stage] = [];
		if ( !$batch ) {
			// Nothing was enqueued since the previous round
			break;
		}

		// DataUpdate instances go first, for historical reasons: DataUpdate used to have
		// its own static runner that DeferredUpdates called before anything else, and
		// earlier still, page updater code invoked that runner directly.
		// @TODO: remove this logic given the existence of RefreshSecondaryDataUpdate
		$dataUpdates = [];
		$otherUpdates = [];
		foreach ( $batch as $item ) {
			if ( $item instanceof DataUpdate ) {
				$dataUpdates[] = $item;
			} else {
				$otherUpdates[] = $item;
			}
		}
		$handled += count( $batch );

		// Both groups are plain lists, so merging keeps DataUpdate items in front
		foreach ( array_merge( $dataUpdates, $otherUpdates ) as $item ) {
			$callback( $item, $activeStage );
		}
	}

	return $handled;
}
277 }
Abstract base class for update jobs that do something with some secondary data extracted from article...
Definition: DataUpdate.php:30
DeferredUpdates helper class for managing DeferrableUpdate::doUpdate() nesting levels caused by neste...
consumeMatchingUpdates( $stage, $class, callable $callback)
Remove pending updates of the specified stage/class and pass them to a callback.
static newChildScope( $activeStage, DeferrableUpdate $update, DeferredUpdatesScope $parentScope)
pendingUpdatesCount()
Get the number of pending updates within this scope.
addUpdate(DeferrableUpdate $update, $stage)
Enqueue a deferred update within this scope using the specified "defer until" time.
getPendingUpdates( $stage)
Get pending updates within this scope with the given "defer until" stage.
processUpdates( $stage, callable $callback)
Iteratively, reassign unready pending updates to the parent scope (if applicable) and process the rea...
clearPendingUpdates()
Cancel all pending updates within this scope.
getActiveUpdate()
Get the deferred update that owns this scope (root scope has none)
Interface that deferrable updates should implement.
Interface that deferrable updates can implement to signal that updates can be combined.