MediaWiki master
CdnCacheUpdate.php
<?php

use MediaWiki\MainConfigNames;
use MediaWiki\MediaWikiServices;
use MediaWiki\Page\PageReference;
use Wikimedia\Assert\Assert;
use Wikimedia\IPUtils;

/**
 * Handles purging the appropriate CDN objects given a list of URLs or Title instances.
 */
class CdnCacheUpdate implements MergeableUpdate {
    /** @var array[] List of (URL, rebound purge delay) tuples */
    private $urlTuples = [];
    /** @var array[] List of (PageReference, rebound purge delay) tuples */
    private $pageTuples = [];

    /** @var int Maximum seconds of rebound purge delay */
    private const MAX_REBOUND_DELAY = 300;
    /**
     * @param PageReference[]|string[] $targets Collection of URLs/titles to be purged
     * @param array $options Options map; supports "reboundDelay", the number of
     *  seconds after the initial purge at which to send a second ("rebound")
     *  purge, clamped to MAX_REBOUND_DELAY
     */
    public function __construct( array $targets, array $options = [] ) {
        $delay = min(
            (int)max( $options['reboundDelay'] ?? 0, 0 ),
            self::MAX_REBOUND_DELAY
        );

        foreach ( $targets as $target ) {
            if ( $target instanceof PageReference ) {
                $this->pageTuples[] = [ $target, $delay ];
            } else {
                $this->urlTuples[] = [ $target, $delay ];
            }
        }
    }
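    // Usage sketch (illustrative, not part of the original file): a caller
    // could queue an update with a 60-second rebound purge like this,
    // assuming $page is a PageReference:
    //
    //   DeferredUpdates::addUpdate( new CdnCacheUpdate(
    //       [ $page, 'https://example.org/static/logo.png' ],
    //       [ 'reboundDelay' => 60 ]
    //   ) );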

    /**
     * Merge this enqueued update with a new MergeableUpdate of the same
     * qualified class name.
     */
    public function merge( MergeableUpdate $update ) {
        Assert::parameterType( __CLASS__, $update, '$update' );
        '@phan-var self $update';

        $this->urlTuples = array_merge( $this->urlTuples, $update->urlTuples );
        $this->pageTuples = array_merge( $this->pageTuples, $update->pageTuples );
    }

    /**
     * Create an update object from an array of Title objects, or a TitleArray object.
     *
     * @deprecated since 1.35 Use HtmlCacheUpdater
     * @param Traversable|Title[] $pages
     * @param string[] $urls
     * @return CdnCacheUpdate
     */
    public static function newFromTitles( $pages, $urls = [] ) {
        wfDeprecated( __METHOD__, '1.35' );
        return new CdnCacheUpdate( array_merge( $pages, $urls ) );
    }

    /**
     * Perform the actual work: send the immediate purges and queue any
     * rebound purge jobs.
     */
    public function doUpdate() {
        // Resolve the final list of URLs just before purging them (T240083)
        $reboundDelayByUrl = $this->resolveReboundDelayByUrl();

        // Send the immediate purges to the CDN
        self::purge( array_keys( $reboundDelayByUrl ) );
        $immediatePurgeTimestamp = time();

        // Get the URLs that need rebound purges, grouped by seconds of purge delay
        $urlsWithReboundByDelay = [];
        foreach ( $reboundDelayByUrl as $url => $delay ) {
            if ( $delay > 0 ) {
                $urlsWithReboundByDelay[$delay][] = $url;
            }
        }
        // Enqueue delayed purge jobs for these URLs (usually only one job)
        $jobs = [];
        foreach ( $urlsWithReboundByDelay as $delay => $urls ) {
            $jobs[] = new CdnPurgeJob( [
                'urls' => $urls,
                'jobReleaseTimestamp' => $immediatePurgeTimestamp + $delay
            ] );
        }
        MediaWikiServices::getInstance()->getJobQueueGroup()->lazyPush( $jobs );
    }
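    // Worked example (illustrative values): given
    //   $reboundDelayByUrl = [ 'https://example.org/a' => 20, 'https://example.org/b' => 0 ],
    // doUpdate() purges both URLs immediately, then enqueues a single
    //   CdnPurgeJob( [ 'urls' => [ 'https://example.org/a' ], 'jobReleaseTimestamp' => time() + 20 ] ),
    // so only the delayed URL is purged a second time once replica lag has
    // likely been caught up.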

    /**
     * Purges a list of CDN nodes defined in $wgCdnServers.
     * $urls should contain the full URLs to purge as values.
     *
     * @param string[] $urls List of full URLs to purge
     */
    public static function purge( array $urls ) {
        $mainConfig = MediaWikiServices::getInstance()->getMainConfig();
        $cdnServers = $mainConfig->get( MainConfigNames::CdnServers );
        $htcpRouting = $mainConfig->get( MainConfigNames::HTCPRouting );
        if ( !$urls ) {
            return;
        }

        // Remove duplicate URLs from the list
        $urls = array_unique( $urls );

        wfDebugLog( 'squid', __METHOD__ . ': ' . implode( ' ', $urls ) );

        // Reliably broadcast the purge to all edge nodes
        $ts = microtime( true );
        $relayerGroup = MediaWikiServices::getInstance()->getEventRelayerGroup();
        $relayerGroup->getRelayer( 'cdn-url-purges' )->notifyMulti(
            'cdn-url-purges',
            array_map(
                static function ( $url ) use ( $ts ) {
                    return [
                        'url' => $url,
                        'timestamp' => $ts,
                    ];
                },
                $urls
            )
        );

        // Send lossy UDP multicast purges if enabled
        if ( $htcpRouting ) {
            self::HTCPPurge( $urls );
        }

        // Do direct server purges if enabled (this does not scale very well)
        if ( $cdnServers ) {
            self::naivePurge( $urls );
        }
    }
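    // Minimal usage sketch (hypothetical URLs): broadcast an immediate purge
    // for a page view and its history view.
    //
    //   CdnCacheUpdate::purge( [
    //       'https://example.org/wiki/Main_Page',
    //       'https://example.org/w/index.php?title=Main_Page&action=history',
    //   ] );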

    /**
     * @return string[] List of URLs that this update would purge
     */
    public function getUrls() {
        return array_keys( $this->resolveReboundDelayByUrl() );
    }

    /**
     * @return int[] Map of (URL => rebound purge delay in seconds)
     */
    private function resolveReboundDelayByUrl() {
        $services = MediaWikiServices::getInstance();

        // Batch the page lookups to avoid multiple queries in the
        // HtmlCacheUpdater::getUrls() calls below
        $lb = $services->getLinkBatchFactory()->newLinkBatch();
        foreach ( $this->pageTuples as list( $page, $delay ) ) {
            $lb->addObj( $page );
        }
        $lb->execute();

        $reboundDelayByUrl = [];

        // Resolve the titles into CDN URLs
        $htmlCacheUpdater = $services->getHtmlCacheUpdater();
        foreach ( $this->pageTuples as list( $page, $delay ) ) {
            foreach ( $htmlCacheUpdater->getUrls( $page ) as $url ) {
                // Use the highest rebound delay for duplicate URLs in order to handle the most lag
                $reboundDelayByUrl[$url] = max( $reboundDelayByUrl[$url] ?? 0, $delay );
            }
        }

        foreach ( $this->urlTuples as list( $url, $delay ) ) {
            // Use the highest rebound delay for duplicate URLs in order to handle the most lag
            $reboundDelayByUrl[$url] = max( $reboundDelayByUrl[$url] ?? 0, $delay );
        }

        return $reboundDelayByUrl;
    }
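    // Illustration (hypothetical inputs): if the same URL was queued twice,
    // once with delay 0 and once with delay 80, the resolved map keeps the
    // larger value so the rebound purge covers the most possible lag:
    //
    //   [ 'https://example.org/wiki/X' => 80 ]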

    /**
     * Send Hyper Text Caching Protocol (HTCP) CLR requests (RFC 2756).
     *
     * @throws MWException
     * @param string[] $urls Collection of URLs to purge
     */
    private static function HTCPPurge( array $urls ) {
        $mainConfig = MediaWikiServices::getInstance()->getMainConfig();
        $htcpRouting = $mainConfig->get( MainConfigNames::HTCPRouting );
        $htcpMulticastTTL = $mainConfig->get( MainConfigNames::HTCPMulticastTTL );
        // HTCP CLR operation
        $htcpOpCLR = 4;

        // @todo FIXME: PHP doesn't support these socket constants (include/linux/in.h)
        if ( !defined( "IPPROTO_IP" ) ) {
            define( "IPPROTO_IP", 0 );
            define( "IP_MULTICAST_LOOP", 34 );
            define( "IP_MULTICAST_TTL", 33 );
        }

        // pfsockopen doesn't work because we need set_sock_opt
        $conn = socket_create( AF_INET, SOCK_DGRAM, SOL_UDP );
        if ( !$conn ) {
            $errstr = socket_strerror( socket_last_error() );
            wfDebugLog( 'squid', __METHOD__ . ": Error opening UDP socket: $errstr" );

            return;
        }

        // Disable loopback of our own multicast packets
        socket_set_option( $conn, IPPROTO_IP, IP_MULTICAST_LOOP, 0 );
        if ( $htcpMulticastTTL != 1 ) {
            // Set the multicast time-to-live (hop count) option on the socket
            socket_set_option( $conn, IPPROTO_IP, IP_MULTICAST_TTL,
                $htcpMulticastTTL );
        }

        // Get sequential transaction IDs for packet loss counting
        $idGenerator = MediaWikiServices::getInstance()->getGlobalIdGenerator();
        $ids = $idGenerator->newSequentialPerNodeIDs(
            'squidhtcppurge',
            32,
            count( $urls )
        );

        foreach ( $urls as $url ) {
            if ( !is_string( $url ) ) {
                throw new MWException( 'Bad purge URL' );
            }
            $url = self::expand( $url );
            $conf = self::getRuleForURL( $url, $htcpRouting );
            if ( !$conf ) {
                wfDebugLog( 'squid', __METHOD__ .
                    ": No HTCP rule configured for URL {$url}, skipping" );
                continue;
            }

            if ( isset( $conf['host'] ) && isset( $conf['port'] ) ) {
                // Normalize single entries
                $conf = [ $conf ];
            }
            foreach ( $conf as $subconf ) {
                if ( !isset( $subconf['host'] ) || !isset( $subconf['port'] ) ) {
                    throw new MWException( "Invalid HTCP rule for URL $url\n" );
                }
            }

            // Construct a minimal HTCP request datagram as per RFC 2756:
            // opcode 'CLR', no response desired, no auth
            $htcpTransID = current( $ids );
            next( $ids );

            $htcpSpecifier = pack( 'na4na*na8n',
                4, 'HEAD', strlen( $url ), $url,
                8, 'HTTP/1.0', 0 );

            $htcpDataLen = 8 + 2 + strlen( $htcpSpecifier );
            $htcpLen = 4 + $htcpDataLen + 2;

            // Note! Squid gets the bit order of the first word wrong,
            // wrt the RFC. Apparently no other implementation exists,
            // so adapt to Squid.
            $htcpPacket = pack( 'nxxnCxNxxa*n',
                $htcpLen, $htcpDataLen, $htcpOpCLR,
                $htcpTransID, $htcpSpecifier, 2 );

            wfDebugLog( 'squid', __METHOD__ . ": Purging URL $url via HTCP" );
            foreach ( $conf as $subconf ) {
                socket_sendto( $conn, $htcpPacket, $htcpLen, 0,
                    $subconf['host'], $subconf['port'] );
            }
        }
    }
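    // Configuration sketch (example endpoints are assumptions): $wgHTCPRouting
    // maps URL regexes to one or more 'host'/'port' HTCP endpoints; an
    // empty-string key acts as a catch-all rule (see getRuleForURL() below).
    //
    //   $wgHTCPRouting = [
    //       '|^https?://upload\.example\.org|' => [ 'host' => '239.128.0.113', 'port' => 4827 ],
    //       '' => [ 'host' => '239.128.0.112', 'port' => 4827 ],
    //   ];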

    /**
     * Send an HTTP PURGE request for each URL to every CDN server in
     * $wgCdnServers, using the servers as HTTP proxies.
     *
     * @param string[] $urls Collection of URLs to purge
     */
    private static function naivePurge( array $urls ) {
        $cdnServers = MediaWikiServices::getInstance()->getMainConfig()->get( MainConfigNames::CdnServers );

        $reqs = [];
        foreach ( $urls as $url ) {
            $url = self::expand( $url );
            $urlInfo = wfParseUrl( $url );
            $urlHost = strlen( $urlInfo['port'] ?? '' )
                ? IPUtils::combineHostAndPort( $urlInfo['host'], (int)$urlInfo['port'] )
                : $urlInfo['host'];
            $baseReq = [
                'method' => 'PURGE',
                'url' => $url,
                'headers' => [
                    'Host' => $urlHost,
                    'Connection' => 'Keep-Alive',
                    'Proxy-Connection' => 'Keep-Alive',
                    'User-Agent' => 'MediaWiki/' . MW_VERSION . ' ' . __CLASS__
                ]
            ];
            foreach ( $cdnServers as $server ) {
                $reqs[] = ( $baseReq + [ 'proxy' => $server ] );
            }
        }

        $http = MediaWikiServices::getInstance()->getHttpRequestFactory()
            ->createMultiClient( [ 'maxConnsPerHost' => 8, 'usePipelining' => true ] );
        $http->runMulti( $reqs );
    }
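    // Illustration (hypothetical values): with $wgCdnServers = [ '127.0.0.1:3128' ],
    // each URL yields one request roughly equivalent to:
    //
    //   PURGE https://example.org/wiki/Main_Page HTTP/1.1
    //   Host: example.org
    //   Connection: Keep-Alive
    //   Proxy-Connection: Keep-Alive
    //   User-Agent: MediaWiki/<version> CdnCacheUpdate
    //
    // sent with the CDN server set as the HTTP proxy.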

    /**
     * Expand a potentially local URL to a fully-qualified URL (PROTO_INTERNAL).
     *
     * @param string $url
     * @return string
     */
    private static function expand( $url ) {
        return wfExpandUrl( $url, PROTO_INTERNAL );
    }

    /**
     * Find the HTCP routing rule to use for a given URL.
     *
     * @param string $url URL to match
     * @param array $rules Array of rules, see $wgHTCPRouting for format and behavior
     * @return mixed Element of $rules that matched, or false if nothing matched
     */
    private static function getRuleForURL( $url, $rules ) {
        foreach ( $rules as $regex => $routing ) {
            if ( $regex === '' || preg_match( $regex, $url ) ) {
                return $routing;
            }
        }

        return false;
    }
}
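Since newFromTitles() is deprecated, callers normally reach this class through
the HtmlCacheUpdater service rather than constructing it directly. A minimal
sketch, assuming the HtmlCacheUpdater API available since 1.35:

    $hcu = MediaWikiServices::getInstance()->getHtmlCacheUpdater();
    // Queues a CdnCacheUpdate (via DeferredUpdates) for the page's CDN URLs
    $hcu->purgeTitleUrls( $title, $hcu::PURGE_INTENT_TXROUND_REFLECTED );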
Cross-references:
const MW_VERSION: The running version of MediaWiki. (Defines.php:36)
const PROTO_INTERNAL (Defines.php:200)
wfParseUrl( $url ): parse_url() work-alike, but non-broken.
wfExpandUrl( $url, $defaultProto = PROTO_CURRENT ): Expand a potentially local URL to a fully-qualified URL.
wfDebugLog( $logGroup, $text, $dest = 'all', array $context = [] ): Send a line to a supplementary debug log file, if configured, or the main debug log if not.
wfDeprecated( $function, $version = false, $component = false, $callerOffset = 2 ): Logs a warning that a deprecated feature was used.
CdnCacheUpdate: Handles purging the appropriate CDN objects given a list of URLs or Title instances.
CdnCacheUpdate::__construct( array $targets, array $options = [] )
CdnCacheUpdate::newFromTitles( $pages, $urls = [] ): Create an update object from an array of Title objects, or a TitleArray object.
CdnCacheUpdate::purge( array $urls ): Purges a list of CDN nodes defined in $wgCdnServers.
CdnCacheUpdate::doUpdate(): Perform the actual work.
CdnCacheUpdate::merge( MergeableUpdate $update ): Merge this enqueued update with a new MergeableUpdate of the same qualified class name.
CdnPurgeJob: Job to purge a set of URLs from CDN. (CdnPurgeJob.php:30)
MWException: MediaWiki exception. (MWException.php:29)
MainConfigNames: A class containing constants representing the names of configuration variables.
MediaWikiServices: Service locator for MediaWiki core services.
DeferrableUpdate: Interface that deferrable updates should implement.
PageReference: Interface for objects (potentially) representing a page that can be viewable and linked to on a wiki.
MergeableUpdate: Interface that deferrable updates can implement to signal that updates can be combined.