23 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
25 use Psr\Log\LoggerInterface;
89 self::$executeContext &&
90 self::$executeContext[
'stage'] >= $stage &&
96 self::$executeContext[
'subqueue'][] = $update;
101 if ( $stage === self::PRESEND ) {
104 self::push( self::$postSendUpdates, $update );
125 $callable, $stage = self::POSTSEND, $dbw =
null
139 public static function doUpdates( $mode =
'run', $stage = self::ALL ) {
140 $stageEffective = ( $stage ===
self::ALL ) ? self::POSTSEND : $stage;
144 if ( $stage === self::ALL || $stage === self::PRESEND ) {
148 if ( $stage === self::ALL || $stage == self::POSTSEND ) {
151 }
while ( $stage === self::ALL && self::$preSendUpdates );
160 $class = get_class( $update );
161 if ( isset(
$queue[$class] ) ) {
163 $existingUpdate =
$queue[$class];
164 '@phan-var MergeableUpdate $existingUpdate';
165 $existingUpdate->merge( $update );
169 $queue[$class] = $existingUpdate;
188 $services = MediaWikiServices::getInstance();
189 $stats = $services->getStatsdDataFactory();
190 $lbf = $services->getDBLoadBalancerFactory();
191 $logger = LoggerFactory::getInstance(
'DeferredUpdates' );
192 $httpMethod = $services->getMainConfig()->get(
'CommandLineMode' )
206 $dataUpdateQueue = [];
207 $genericUpdateQueue = [];
208 foreach ( $updates as $update ) {
210 $dataUpdateQueue[] = $update;
212 $genericUpdateQueue[] = $update;
216 foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
217 foreach ( $updateQueue as $du ) {
220 self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
224 self::$executeContext = [
'stage' => $stage,
'subqueue' => [] ];
226 $e =
self::run( $du, $lbf, $logger, $stats, $httpMethod );
229 while ( self::$executeContext[
'subqueue'] ) {
230 $duChild = reset( self::$executeContext[
'subqueue'] );
231 $firstKey = key( self::$executeContext[
'subqueue'] );
232 unset( self::$executeContext[
'subqueue'][$firstKey] );
234 $e =
self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
241 self::$executeContext =
null;
253 if ( $guiEx && $stage === self::PRESEND && !headers_sent() ) {
268 private static function run(
271 LoggerInterface $logger,
272 StatsdDataFactoryInterface $stats,
275 $name = get_class( $update );
277 $stats->increment(
"deferred_updates.$httpMethod.{$name}{$suffix}" );
282 }
catch ( Exception $e ) {
283 }
catch ( Throwable $e ) {
288 "Deferred update {type} failed: {message}",
290 'type' => $name . $suffix,
291 'message' => $e->getMessage(),
292 'trace' => $e->getTraceAsString()
298 if ( defined(
'MW_PHPUNIT_TEST' ) ) {
318 LoggerInterface $logger,
319 StatsdDataFactoryInterface $stats,
322 $stats->increment(
"deferred_updates.$httpMethod." . get_class( $update ) );
328 }
catch ( Exception $e ) {
329 }
catch ( Throwable $e ) {
334 "Job insertion of deferred update {type} failed: {message}",
336 'type' => get_class( $update ),
337 'message' => $e->getMessage(),
338 'trace' => $e->getTraceAsString()
363 $update->setTransactionTicket( $ticket );
368 ? $update->getOrigin()
369 : get_class( $update ) .
'::doUpdate';
371 $useExplicitTrxRound = !(
373 $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
377 if ( $useExplicitTrxRound ) {
401 if ( self::$executeContext ) {
406 if ( !self::areDatabaseTransactionsActive() ) {
411 if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
430 foreach ( $updates as $update ) {
432 $spec = $update->getAsJobSpecification();
433 $domain = $spec[
'domain'] ?? $spec[
'wiki'];
436 $remaining[] = $update;
448 return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
458 if ( $stage === self::ALL || $stage === self::PRESEND ) {
459 $updates = array_merge( $updates, self::$preSendUpdates );
461 if ( $stage === self::ALL || $stage === self::POSTSEND ) {
462 $updates = array_merge( $updates, self::$postSendUpdates );
472 self::$preSendUpdates = [];
473 self::$postSendUpdates = [];
480 $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
481 if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
486 $lbFactory->forEachLB(
function (
LoadBalancer $lb ) use ( &$connsBusy ) {