3use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
7use Wikimedia\Assert\Assert;
11use Wikimedia\ScopedCallback;
118 $this->deferredUpdatesAddCallableUpdateCallback =
119 [ DeferredUpdates::class,
'addCallableUpdate' ];
124 $this->latestUpdateCache =
new HashBagOStuff( [
'maxKeys' => 3 ] );
146 if ( !defined(
'MW_PHPUNIT_TEST' ) ) {
148 'Cannot override DeferredUpdates::addCallableUpdate callback in operation.'
152 $this->deferredUpdatesAddCallableUpdateCallback = $callback;
153 return new ScopedCallback(
function () use ( $previousValue ) {
154 $this->deferredUpdatesAddCallableUpdateCallback = $previousValue;
159 return $this->
cache->makeKey(
162 (
string)$user->
getId()
170 $this->
cache->set( $key, $item );
171 $this->cacheIndex[$target->getNamespace()][$target->getDBkey()][$user->getId()] = $key;
172 $this->stats->increment(
'WatchedItemStore.cache' );
178 $this->stats->increment(
'WatchedItemStore.uncache' );
182 $this->stats->increment(
'WatchedItemStore.uncacheLinkTarget' );
187 $this->stats->increment(
'WatchedItemStore.uncacheLinkTarget.items' );
188 $this->
cache->delete( $key );
193 $this->stats->increment(
'WatchedItemStore.uncacheUser' );
194 foreach ( $this->cacheIndex as $ns => $dbKeyArray ) {
195 foreach ( $dbKeyArray as $dbKey => $userArray ) {
196 if ( isset( $userArray[$user->
getId()] ) ) {
197 $this->stats->increment(
'WatchedItemStore.uncacheUser.items' );
198 $this->
cache->delete( $userArray[$user->
getId()] );
204 $this->latestUpdateCache->delete( $pageSeenKey );
205 $this->stash->delete( $pageSeenKey );
229 'wl_user' => $user->
getId(),
241 return $this->loadBalancer->getConnectionRef( $dbIndex, [
'watchlist' ] );
259 $dbw = $this->loadBalancer->getConnectionRef(
DB_MASTER );
262 [
'wl_user' => $user->
getId() ],
271 $userId = $user->
getId();
272 foreach ( $this->cacheIndex as $ns => $dbKeyIndex ) {
273 foreach ( $dbKeyIndex as $dbKey => $userIndex ) {
274 if ( array_key_exists( $userId, $userIndex ) ) {
275 $this->
cache->delete( $userIndex[$userId] );
276 unset( $this->cacheIndex[$ns][$dbKey][$userId] );
282 foreach ( $this->cacheIndex as $ns => $dbKeyIndex ) {
283 foreach ( $dbKeyIndex as $dbKey => $userIndex ) {
284 if ( empty( $this->cacheIndex[$ns][$dbKey] ) ) {
285 unset( $this->cacheIndex[$ns][$dbKey] );
288 if ( empty( $this->cacheIndex[$ns] ) ) {
289 unset( $this->cacheIndex[$ns] );
303 $this->queueGroup->push(
$job );
312 return (
int)
$dbr->selectField(
327 $return = (int)
$dbr->selectField(
331 'wl_user' => $user->
getId()
346 $return = (int)
$dbr->selectField(
367 $visitingWatchers = (int)
$dbr->selectField(
373 'wl_notificationtimestamp >= ' .
374 $dbr->addQuotes(
$dbr->timestamp( $threshold ) ) .
375 ' OR wl_notificationtimestamp IS NULL'
380 return $visitingWatchers;
389 if ( $this->readOnlyMode->isReadOnly() ) {
403 $ticket = count( $titles ) > $this->updateRowsPerQuery ?
404 $this->lbFactory->getEmptyTransactionTicket( __METHOD__ ) :
null;
408 foreach ( $rows as $namespace => $namespaceTitles ) {
409 $rowBatches = array_chunk( $namespaceTitles, $this->updateRowsPerQuery );
410 foreach ( $rowBatches as $toDelete ) {
411 $dbw->delete(
'watchlist', [
412 'wl_user' => $user->
getId(),
413 'wl_namespace' => $namespace,
414 'wl_title' => $toDelete
416 $affectedRows += $dbw->affectedRows();
418 $this->lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
423 return (
bool)$affectedRows;
433 $dbOptions = [
'GROUP BY' => [
'wl_namespace',
'wl_title' ] ];
437 if ( array_key_exists(
'minimumWatchers', $options ) ) {
438 $dbOptions[
'HAVING'] =
'COUNT(*) >= ' . (int)$options[
'minimumWatchers'];
444 [
'wl_title',
'wl_namespace',
'watchers' =>
'COUNT(*)' ],
445 [ $lb->constructSet(
'wl',
$dbr ) ],
451 foreach ( $targets as $linkTarget ) {
452 $watchCounts[$linkTarget->getNamespace()][$linkTarget->getDBkey()] = 0;
455 foreach (
$res as $row ) {
456 $watchCounts[$row->wl_namespace][$row->wl_title] = (int)$row->watchers;
469 array $targetsWithVisitThresholds,
470 $minimumWatchers =
null
472 if ( $targetsWithVisitThresholds === [] ) {
481 $dbOptions = [
'GROUP BY' => [
'wl_namespace',
'wl_title' ] ];
482 if ( $minimumWatchers !==
null ) {
483 $dbOptions[
'HAVING'] =
'COUNT(*) >= ' . (int)$minimumWatchers;
487 [
'wl_namespace',
'wl_title',
'watchers' =>
'COUNT(*)' ],
494 foreach ( $targetsWithVisitThresholds as list( $target ) ) {
496 $watcherCounts[$target->getNamespace()][$target->getDBkey()] = 0;
499 foreach (
$res as $row ) {
500 $watcherCounts[$row->wl_namespace][$row->wl_title] = (int)$row->watchers;
503 return $watcherCounts;
515 array $targetsWithVisitThresholds
517 $missingTargets = [];
518 $namespaceConds = [];
519 foreach ( $targetsWithVisitThresholds as list( $target, $threshold ) ) {
520 if ( $threshold ===
null ) {
521 $missingTargets[] = $target;
525 $namespaceConds[$target->getNamespace()][] = $db->
makeList( [
526 'wl_title = ' . $db->
addQuotes( $target->getDBkey() ),
529 'wl_notificationtimestamp IS NULL'
535 foreach ( $namespaceConds as $namespace => $pageConds ) {
537 'wl_namespace = ' . $namespace,
542 if ( $missingTargets ) {
544 $conds[] = $lb->constructSet(
'wl', $db );
561 $cached = $this->
getCached( $user, $target );
563 $this->stats->increment(
'WatchedItemStore.getWatchedItem.cached' );
566 $this->stats->increment(
'WatchedItemStore.getWatchedItem.load' );
584 $row =
$dbr->selectRow(
586 'wl_notificationtimestamp',
587 $this->
dbCond( $user, $target ),
600 $this->
cache( $item );
612 $options += [
'forWrite' => false ];
615 if ( array_key_exists(
'sort', $options ) ) {
617 ( in_array( $options[
'sort'], [ self::SORT_ASC, self::SORT_DESC ] ) ),
618 '$options[\'sort\']',
619 'must be SORT_ASC or SORT_DESC'
621 $dbOptions[
'ORDER BY'] = [
622 "wl_namespace {$options['sort']}",
623 "wl_title {$options['sort']}"
630 [
'wl_namespace',
'wl_title',
'wl_notificationtimestamp' ],
631 [
'wl_user' => $user->
getId() ],
637 foreach (
$res as $row ) {
638 $target =
new TitleValue( (
int)$row->wl_namespace, $row->wl_title );
644 $row->wl_notificationtimestamp, $user, $target )
648 return $watchedItems;
669 foreach ( $targets as $target ) {
670 $timestamps[$target->getNamespace()][$target->getDBkey()] =
false;
678 foreach ( $targets as $target ) {
679 $cachedItem = $this->
getCached( $user, $target );
681 $timestamps[$target->getNamespace()][$target->getDBkey()] =
682 $cachedItem->getNotificationTimestamp();
684 $targetsToLoad[] = $target;
688 if ( !$targetsToLoad ) {
697 [
'wl_namespace',
'wl_title',
'wl_notificationtimestamp' ],
699 $lb->constructSet(
'wl',
$dbr ),
700 'wl_user' => $user->
getId(),
705 foreach (
$res as $row ) {
706 $target =
new TitleValue( (
int)$row->wl_namespace, $row->wl_title );
707 $timestamps[$row->wl_namespace][$row->wl_title] =
709 $row->wl_notificationtimestamp, $user, $target );
731 if ( $this->readOnlyMode->isReadOnly() ) {
745 foreach ( $targets as $target ) {
747 'wl_user' => $user->
getId(),
748 'wl_namespace' => $target->getNamespace(),
749 'wl_title' => $target->getDBkey(),
750 'wl_notificationtimestamp' =>
null,
757 $this->
uncache( $user, $target );
761 $ticket = count( $targets ) > $this->updateRowsPerQuery ?
762 $this->lbFactory->getEmptyTransactionTicket( __METHOD__ ) :
null;
764 $rowBatches = array_chunk( $rows, $this->updateRowsPerQuery );
765 foreach ( $rowBatches as $toInsert ) {
768 $dbw->insert(
'watchlist', $toInsert, __METHOD__, [
'IGNORE' ] );
769 $affectedRows += $dbw->affectedRows();
771 $this->lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
777 foreach ( $items as $item ) {
778 $this->
cache( $item );
781 return (
bool)$affectedRows;
815 if ( !$user->
isRegistered() || $this->readOnlyMode->isReadOnly() ) {
828 if ( $timestamp !==
null ) {
829 $timestamp = $dbw->timestamp( $timestamp );
831 $ticket = $this->lbFactory->getEmptyTransactionTicket( __METHOD__ );
832 $affectedSinceWait = 0;
835 foreach ( $rows as $namespace => $namespaceTitles ) {
836 $rowBatches = array_chunk( $namespaceTitles, $this->updateRowsPerQuery );
837 foreach ( $rowBatches as $toUpdate ) {
840 [
'wl_notificationtimestamp' => $timestamp ],
842 'wl_user' => $user->
getId(),
843 'wl_namespace' => $namespace,
844 'wl_title' => $toUpdate
847 $affectedSinceWait += $dbw->affectedRows();
849 if ( $affectedSinceWait >= $this->updateRowsPerQuery ) {
850 $this->lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
851 $affectedSinceWait = 0;
865 if ( $timestamp ===
null ) {
872 $seenTimestamps->get( $this->getPageSeenKey( $target ) ) >= $timestamp
895 'userId' => $user->
getId(),
'timestamp' => $timestamp,
'casTime' => time()
901 $this->deferredUpdatesAddCallableUpdateCallback,
902 function () use (
$job ) {
919 $uids = $dbw->selectFieldValues(
923 'wl_user != ' . intval( $editor->
getId() ),
926 'wl_notificationtimestamp IS NULL',
931 $watchers = array_map(
'intval', $uids );
935 DeferredUpdates::addCallableUpdate(
936 function () use ( $timestamp, $watchers, $target, $fname ) {
938 $ticket = $this->lbFactory->getEmptyTransactionTicket( $fname );
940 $watchersChunks = array_chunk( $watchers, $this->updateRowsPerQuery );
941 foreach ( $watchersChunks as $watchersChunk ) {
942 $dbw->update(
'watchlist',
944 'wl_notificationtimestamp' => $dbw->timestamp( $timestamp )
946 'wl_user' => $watchersChunk,
951 if ( count( $watchersChunks ) > 1 ) {
952 $this->lbFactory->commitAndWaitForReplication(
953 $fname, $ticket, [
'domain' => $dbw->getDomainID() ]
959 DeferredUpdates::POSTSEND,
981 if ( $this->readOnlyMode->isReadOnly() || !$user->
isRegistered() ) {
986 $userObj = User::newFromId( $user->
getId() );
987 $titleObj = Title::castFromLinkTarget(
$title );
988 if ( !Hooks::run(
'BeforeResetNotificationTimestamp',
989 [ &$userObj, &$titleObj, $force, &$oldid ] )
993 if ( !$userObj->equals( $user ) ) {
996 if ( !$titleObj->equals(
$title ) ) {
1001 if ( $force !=
'force' ) {
1003 if ( !$item || $item->getNotificationTimestamp() ===
null ) {
1012 $latestRev = $this->revisionLookup->getRevisionByTitle(
$title );
1014 $id = $latestRev->getId();
1016 $seenTime = $latestRev->getTimestamp();
1019 if ( $seenTime ===
null ) {
1020 $seenTime = $this->revisionLookup->getTimestampFromId( $id );
1024 $this->stash->merge(
1026 function (
$cache, $key, $current ) use (
$title, $seenTime ) {
1030 if ( $seenTime > $value->get( $subKey ) ) {
1032 $value->set( $subKey, $seenTime );
1033 $this->latestUpdateCache->set( $key, $value, BagOStuff::TTL_PROC_LONG );
1034 } elseif ( $seenTime ===
false ) {
1037 $this->latestUpdateCache->set( $key, $value, BagOStuff::TTL_PROC_LONG );
1051 'type' =>
'updateWatchlistNotification',
1052 'userid' => $user->
getId(),
1053 'notifTime' => $this->getNotificationTimestamp( $user,
$title, $item, $force, $oldid ),
1058 $this->queueGroup->lazyPush(
$job );
1072 return $this->latestUpdateCache->getWithSetCallback(
1074 BagOStuff::TTL_PROC_LONG,
1075 function () use ( $key ) {
1076 return $this->stash->get( $key ) ?:
null;
1086 return $this->stash->makeGlobalKey(
1087 'watchlist-recent-updates',
1088 $this->lbFactory->getLocalDomainID(),
1098 return "{$target->getNamespace()}:{$target->getDBkey()}";
1117 $oldRev = $this->revisionLookup->getRevisionById( $oldid );
1123 $nextRev = $this->revisionLookup->getNextRevision( $oldRev );
1129 if ( $item ===
null ) {
1140 $notificationTimestamp = $this->revisionLookup->getTimestampFromId( $oldid );
1147 $ts->timestamp->add(
new DateInterval(
'PT1S' ) );
1148 $notificationTimestamp = $ts->getTimestamp( TS_MW );
1151 if ( $force !=
'force' ) {
1155 return $item->getNotificationTimestamp();
1159 return $notificationTimestamp;
1172 if ( $unreadLimit !==
null ) {
1173 $unreadLimit = (int)$unreadLimit;
1174 $queryOptions[
'LIMIT'] = $unreadLimit;
1178 'wl_user' => $user->
getId(),
1179 'wl_notificationtimestamp IS NOT NULL'
1182 $rowCount =
$dbr->selectRowCount(
'watchlist',
'1', $conds, __METHOD__, $queryOptions );
1184 if ( $unreadLimit ===
null ) {
1188 if ( $rowCount >= $unreadLimit ) {
1203 $this->nsInfo->getSubjectPage( $oldTarget ),
1204 $this->nsInfo->getSubjectPage( $newTarget )
1207 $this->nsInfo->getTalkPage( $oldTarget ),
1208 $this->nsInfo->getTalkPage( $newTarget )
1220 $result = $dbw->select(
1222 [
'wl_user',
'wl_notificationtimestamp' ],
1225 'wl_title' => $oldTarget->
getDBkey(),
1232 $newDBkey = $newTarget->
getDBkey();
1234 # Construct array to replace into the watchlist
1236 foreach ( $result as $row ) {
1238 'wl_user' => $row->wl_user,
1239 'wl_namespace' => $newNamespace,
1240 'wl_title' => $newDBkey,
1241 'wl_notificationtimestamp' => $row->wl_notificationtimestamp,
1245 if ( !empty( $values ) ) {
1247 # Note that multi-row replace is very efficient for MySQL but may be inefficient for
1248 # some other DBMSes, mostly due to poor simulation by us
1251 [ [
'wl_user',
'wl_namespace',
'wl_title' ] ],
1264 foreach ( $titles as
$title ) {
1266 $rows[
$title->getNamespace() ][] =
$title->getDBkey();
1276 foreach ( $titles as
$title ) {
wfTimestampOrNull( $outputtype=TS_UNIX, $ts=null)
Return a formatted timestamp, or null if input is null.
wfTimestamp( $outputtype=TS_UNIX, $ts=0)
Get a timestamp string in one of various formats.
Job for updating user activity like "last viewed" timestamps.
Class representing a cache/ephemeral data store.
static newForUser(UserIdentity $user, $maxWatchlistId)
Job for clearing all of the "last viewed" timestamps for a user's watchlist, or setting them all to t...
Simple store for keeping values in an associative array for the current process.
Class to handle enqueueing of background jobs.
Class representing a list of titles The execute() method checks them all for existence and adds them ...
Library for creating and parsing MW-style timestamps.
Handles a simple LRU key/value map with a maximum number of entries.
This is a utility class for dealing with namespaces that encodes all the "magic" behaviors of them ba...
A service class for fetching the wiki's current read-only mode.
Represents a page (or page fragment) title within MediaWiki.
Storage layer class for WatchedItems.
getPageSeenTimestamps(UserIdentity $user)
uncacheLinkTarget(LinkTarget $target)
callable null $deferredUpdatesAddCallableUpdateCallback
countWatchedItems(UserIdentity $user)
addWatchBatchForUser(UserIdentity $user, array $targets)
duplicateEntry(LinkTarget $oldTarget, LinkTarget $newTarget)
uncacheAllItemsForUser(UserIdentity $user)
clearUserWatchedItems(UserIdentity $user)
Deletes ALL watched items for the given user when under $updateRowsPerQuery entries exist.
setNotificationTimestampsForUser(UserIdentity $user, $timestamp, array $targets=[])
Set the "last viewed" timestamps for certain titles on a user's watchlist.
duplicateAllAssociatedEntries(LinkTarget $oldTarget, LinkTarget $newTarget)
resetAllNotificationTimestampsForUser(UserIdentity $user, $timestamp=null)
Schedule a DeferredUpdate that sets all of the "last viewed" timestamps for a given user to the same ...
updateNotificationTimestamp(UserIdentity $editor, LinkTarget $target, $timestamp)
countVisitingWatchersMultiple(array $targetsWithVisitThresholds, $minimumWatchers=null)
getCacheKey(UserIdentity $user, LinkTarget $target)
ReadOnlyMode $readOnlyMode
dbCond(UserIdentity $user, LinkTarget $target)
Return an array of conditions to select or update the appropriate database row.
HashBagOStuff $latestUpdateCache
uncacheTitlesForUser(UserIdentity $user, array $titles)
getCached(UserIdentity $user, LinkTarget $target)
removeWatchBatchForUser(UserIdentity $user, array $titles)
countVisitingWatchers(LinkTarget $target, $threshold)
uncache(UserIdentity $user, LinkTarget $target)
setStatsdDataFactory(StatsdDataFactoryInterface $stats)
getWatchedItem(UserIdentity $user, LinkTarget $target)
JobQueueGroup $queueGroup
getNotificationTimestampsBatch(UserIdentity $user, array $targets)
countUnreadNotifications(UserIdentity $user, $unreadLimit=null)
getPageSeenKey(LinkTarget $target)
addWatch(UserIdentity $user, LinkTarget $target)
RevisionLookup $revisionLookup
clearUserWatchedItemsUsingJobQueue(UserIdentity $user)
Queues a job that will clear the users watchlist using the Job Queue.
getPageSeenTimestampsKey(UserIdentity $user)
isWatched(UserIdentity $user, LinkTarget $target)
__construct(ILBFactory $lbFactory, JobQueueGroup $queueGroup, BagOStuff $stash, HashBagOStuff $cache, ReadOnlyMode $readOnlyMode, $updateRowsPerQuery, NamespaceInfo $nsInfo, RevisionLookup $revisionLookup)
getLatestNotificationTimestamp( $timestamp, UserIdentity $user, LinkTarget $target)
Convert $timestamp to TS_MW or return null if the page was visited since then by $user.
removeWatch(UserIdentity $user, LinkTarget $target)
getNotificationTimestamp(UserIdentity $user, LinkTarget $title, $item, $force, $oldid)
getConnectionRef( $dbIndex)
countWatchers(LinkTarget $target)
resetNotificationTimestamp(UserIdentity $user, LinkTarget $title, $force='', $oldid=0)
countWatchersMultiple(array $targets, array $options=[])
StatsdDataFactoryInterface $stats
uncacheUser(UserIdentity $user)
getVisitingWatchersCondition(IDatabase $db, array $targetsWithVisitThresholds)
Generates condition for the query used in a batch count visiting watchers.
getTitleDbKeysGroupedByNamespace(array $titles)
loadWatchedItem(UserIdentity $user, LinkTarget $target)
LoadBalancer $loadBalancer
overrideDeferredUpdatesAddCallableUpdateCallback(callable $callback)
Overrides the DeferredUpdates::addCallableUpdate callback This is intended for use while testing and ...
array[] $cacheIndex
Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key' The index is needed so that on ...
getWatchedItemsForUser(UserIdentity $user, array $options=[])
Representation of a pair of user and title for watchlist entries.
Describes a Statsd aware interface.
if(count( $args)< 1) $job