MediaWiki REL1_35
syncFileBackend.php
Go to the documentation of this file.
1<?php
25
26require_once __DIR__ . '/Maintenance.php';
27
/**
 * Default constructor: registers the script description, the accepted
 * command-line options and the default batch size.
 */
public function __construct() {
	parent::__construct();
	$this->addDescription( 'Sync one file backend with another using the journal' );

	// One row per option, in registration order:
	// [ name, description, required, takes an argument, short name ]
	$optionRows = [
		[ 'src', 'Name of backend to sync from', true, true, false ],
		[ 'dst', 'Name of destination backend to sync', false, true, false ],
		[ 'start', 'Starting journal ID', false, true, false ],
		[ 'end', 'Ending journal ID', false, true, false ],
		[ 'posdir', 'Directory to read/record journal positions', false, true, false ],
		[ 'posdump', 'Just dump current journal position into the position dir.', false, false, false ],
		[ 'postime', 'For position dumps, get the ID at this time', false, true, false ],
		[ 'backoff', 'Stop at entries younger than this age (sec).', false, true, false ],
		[ 'verbose', 'Verbose mode', false, false, 'v' ],
	];
	foreach ( $optionRows as $optionRow ) {
		$this->addOption( ...$optionRow );
	}

	// Number of journal entries handled per batch unless overridden
	$this->setBatchSize( 50 );
}
49
/**
 * Do the actual work.
 *
 * With --posdump, only a journal position of the source backend (the current one,
 * or the one at --postime) is written to the position file under --posdir and the
 * method returns. Otherwise the journal entries of the source backend between the
 * starting and ending positions are replayed onto the --dst backend through
 * syncBackends(), and the position file is advanced when the starting position
 * was read from it.
 */
public function execute() {
	$backendGroup = MediaWikiServices::getInstance()->getFileBackendGroup();
	$src = $backendGroup->get( $this->getOption( 'src' ) );

	// The position file lives in --posdir and is named after the source backend domain
	$posDir = $this->getOption( 'posdir' );
	if ( $posDir != '' ) {
		$posFile = "$posDir/" . rawurlencode( $src->getDomainId() );
	} else {
		$posFile = false;
	}

	if ( $this->hasOption( 'posdump' ) ) {
		// Just dump the current position into the specified position dir.
		// Test the computed path rather than the mere presence of --posdir, so that
		// an empty --posdir value can never reach file_put_contents() as a filename.
		if ( $posFile === false ) {
			$this->fatalError( "Param posdir required!" );
		}
		if ( $this->hasOption( 'postime' ) ) {
			$id = (int)$src->getJournal()->getPositionAtTime( $this->getOption( 'postime' ) );
			$this->output( "Requested journal position is $id.\n" );
		} else {
			$id = (int)$src->getJournal()->getCurrentPosition();
			$this->output( "Current journal position is $id.\n" );
		}
		if ( file_put_contents( $posFile, $id, LOCK_EX ) !== false ) {
			$this->output( "Saved journal position file.\n" );
		} else {
			// Report failures through the error channel, like the other failures in
			// this script. NOTE(review): presumably output() is silenced in quiet
			// mode, which would have hidden this failure entirely - confirm.
			$this->error( "Could not save journal position file." );
		}
		if ( $this->isQuiet() ) {
			print $id; // give a single machine-readable number
		}

		return;
	}

	if ( !$this->hasOption( 'dst' ) ) {
		$this->fatalError( "Param dst required!" );
	}
	$dst = $backendGroup->get( $this->getOption( 'dst' ) );

	// Command-line values arrive as strings; normalize them to integers before they
	// are used in arithmetic and comparisons
	$start = (int)$this->getOption( 'start', 0 );
	if ( !$start && $posFile && is_dir( $posDir ) ) {
		$start = is_file( $posFile )
			? (int)trim( file_get_contents( $posFile ) )
			: 0;
		++$start; // we already did this ID, start with the next one
		$startFromPosFile = true;
	} else {
		$startFromPosFile = false;
	}

	if ( $this->hasOption( 'backoff' ) ) {
		// Stop at the journal position that was current "backoff" seconds ago
		$time = time() - (int)$this->getOption( 'backoff', 0 );
		$end = (int)$src->getJournal()->getPositionAtTime( $time );
	} else {
		// No explicit end means "run until the journal is exhausted"
		$end = $this->hasOption( 'end' ) ? (int)$this->getOption( 'end' ) : INF;
	}

	$this->output( "Synchronizing backend '{$dst->getName()}' to '{$src->getName()}'...\n" );
	$this->output( "Starting journal position is $start.\n" );
	if ( is_finite( $end ) ) {
		$this->output( "Ending journal position is $end.\n" );
	}

	// Periodically update the position file
	$callback = function ( $pos ) use ( $startFromPosFile, $posFile, $start ) {
		if ( $startFromPosFile && $pos >= $start ) { // successfully advanced
			file_put_contents( $posFile, $pos, LOCK_EX );
		}
	};

	// Actually sync the dest backend with the reference backend
	$lastOKPos = $this->syncBackends( $src, $dst, $start, $end, $callback );

	// Update the sync position file. syncBackends() returns false when there was
	// nothing to do; rule that out explicitly instead of relying on loose comparison.
	if ( $startFromPosFile && $lastOKPos !== false && $lastOKPos >= $start ) { // successfully advanced
		if ( file_put_contents( $posFile, $lastOKPos, LOCK_EX ) !== false ) {
			$this->output( "Updated journal position file.\n" );
		} else {
			$this->error( "Could not update journal position file." );
		}
	}

	if ( $lastOKPos === false ) {
		if ( !$start ) {
			$this->output( "No journal entries found.\n" );
		} else {
			$this->output( "No new journal entries found.\n" );
		}
	} else {
		$this->output( "Stopped synchronization at journal position $lastOKPos.\n" );
	}

	if ( $this->isQuiet() ) {
		print $lastOKPos; // give a single machine-readable number
	}
}
147
/**
 * Sync $dst backend to $src backend based on the $src journal entries
 * from position $start up to position $end.
 *
 * Entries are fetched in batches of at most getBatchSize() items. After each
 * batch that syncs successfully, $callback is invoked with the highest journal
 * position known to be synced. The first failing batch stops the run.
 *
 * @param FileBackend $src Backend whose journal is read
 * @param FileBackend $dst Backend that is brought in line with $src
 * @param int $start Starting journal position
 * @param int|float $end Ending journal position (may be INF)
 * @param Closure $callback Called with the last successfully synced position
 * @return int|false Last successfully synced position (0 if the first batch
 *   failed), or false if the first batch contained no entries
 */
protected function syncBackends(
	FileBackend $src, FileBackend $dst, $start, $end, Closure $callback
) {
	if ( $start > $end ) { // sanity
		$this->fatalError( "Error: given starting ID greater than ending ID." );
	}

	$highestSynced = 0; // stays 0 if nothing could be synced
	$isFirstBatch = true;
	$cursor = $start; // journal position the next batch begins at
	$resumeAt = null; // filled in by getChangeEntries()

	do {
		// Never request entries past the ending ID
		$batchLimit = min( $this->getBatchSize(), $end - $cursor + 1 );
		$this->output( "Doing id $cursor to " . ( $cursor + $batchLimit - 1 ) . "...\n" );

		$entries = $src->getJournal()->getChangeEntries( $cursor, $batchLimit, $resumeAt );
		$cursor = $resumeAt; // the following batch picks up from here
		if ( $isFirstBatch && count( $entries ) === 0 ) {
			return false; // nothing to do
		}
		$isFirstBatch = false;

		// Collect the distinct changed paths and remember the last entry ID seen
		$batchTailId = 0;
		$changedPaths = [];
		foreach ( $entries as $entry ) {
			$batchTailId = $entry['id'];
			if ( $entry['op'] !== 'null' ) { // null ops are just for reference
				$changedPaths[$entry['path']] = 1; // keys de-duplicate the paths
			}
		}

		$status = $this->syncFileBatch( array_keys( $changedPaths ), $src, $dst );
		if ( !$status->isOK() ) {
			$this->error( print_r( $status->getErrorsArray(), true ) );
			break; // no gaps; everything up to the last OK position must be synced
		}
		$highestSynced = max( $highestSynced, $batchTailId );
		$callback( $highestSynced ); // update position file

		if ( !$cursor ) {
			$this->output( "End of journal entries.\n" );
		}
	} while ( $cursor && $cursor <= $end );

	return $highestSynced;
}
206
/**
 * Sync particular files of backend $src to the corresponding $dst backend files.
 *
 * Files that exist in $src and differ from $dst are stored into $dst; files that
 * do not exist in $src are deleted from $dst. The batch of operations is retried
 * once, after a pause, if the first attempt fails.
 *
 * @param string[] $paths Storage paths taken from the journal
 * @param FileBackend $src
 * @param FileBackend $dst
 * @return Status
 */
protected function syncFileBatch( array $paths, FileBackend $src, FileBackend $dst ) {
	$status = Status::newGood();
	if ( count( $paths ) === 0 ) {
		return $status; // nothing to do
	}

	// Rewrite the backend name in each path: the source's public name
	// (journal paths may carry internal FileBackendMultiWrite names)...
	$sPaths = $this->replaceNamePaths( $paths, $src );
	// ...and the destination's name for the matching target paths
	$dPaths = $this->replaceNamePaths( $paths, $dst );

	// Hold locks on both sides for the duration of this method; the scoped lock
	// variables are kept only so the locks stay alive until return
	$sLock = $src->getScopedFileLocks( $sPaths, LockManager::LOCK_UW, $status );
	$eLock = $dst->getScopedFileLocks( $dPaths, LockManager::LOCK_EX, $status );
	if ( !$status->isOK() ) {
		return $status;
	}

	$src->preloadFileStat( [ 'srcs' => $sPaths, 'latest' => 1 ] );
	$dst->preloadFileStat( [ 'srcs' => $dPaths, 'latest' => 1 ] );

	$ops = [];
	$fsFiles = [];
	foreach ( $sPaths as $i => $sPath ) {
		$dPath = $dPaths[$i]; // destination counterpart
		$sExists = $src->fileExists( [ 'src' => $sPath, 'latest' => 1 ] );

		if ( $sExists === false ) { // gone from the source: remove from destination
			$ops[] = [ 'op' => 'delete', 'src' => $dPath, 'ignoreMissingSource' => 1 ];
			continue;
		}
		if ( $sExists !== true ) { // neither true nor false: existence is unknown
			$this->error( "Unable to sync '$dPath': could not stat file." );
			$status->fatal( 'backend-fail-internal', $src->getName() );

			return $status;
		}

		// Exists in the source
		if ( $this->filesAreSame( $src, $dst, $sPath, $dPath ) ) {
			continue; // avoid local copies for non-FS backends
		}
		// Note: getLocalReference() is fast for FS backends
		$fsFile = $src->getLocalReference( [ 'src' => $sPath, 'latest' => 1 ] );
		if ( !$fsFile ) {
			$this->error( "Unable to sync '$dPath': could not get local copy." );
			$status->fatal( 'backend-fail-internal', $src->getName() );

			return $status;
		}
		$fsFiles[] = $fsFile; // keep TempFSFile objects alive as needed
		// Note: prepare() is usually fast for key/value backends
		$prepareStatus = $dst->prepare( [
			'dir' => dirname( $dPath ), 'bypassReadOnly' => 1 ] );
		$status->merge( $prepareStatus );
		if ( !$status->isOK() ) {
			return $status;
		}
		$ops[] = [ 'op' => 'store',
			'src' => $fsFile->getPath(), 'dst' => $dPath, 'overwrite' => 1 ];
	}

	$t_start = microtime( true );
	$status = $dst->doQuickOperations( $ops, [ 'bypassReadOnly' => 1 ] );
	if ( !$status->isOK() ) {
		sleep( 10 ); // wait, then give the whole batch one more try
		$status = $dst->doQuickOperations( $ops, [ 'bypassReadOnly' => 1 ] );
	}
	$elapsed_ms = floor( ( microtime( true ) - $t_start ) * 1000 );
	if ( $status->isOK() && $this->getOption( 'verbose' ) ) {
		$pathList = implode( "\n", $dPaths );
		$this->output( "Synchronized these file(s) [{$elapsed_ms}ms]:\n" . $pathList . "\n" );
	}

	return $status;
}
286
/**
 * Substitute the backend name of storage paths with that of a given one.
 *
 * @param string[]|string $paths List of paths or single string path
 * @param FileBackend $backend Backend whose name is put into the paths
 * @return string[]|string The path(s) with the leading "mwstore://<name>" replaced
 */
protected function replaceNamePaths( $paths, FileBackend $backend ) {
	// Escape the new prefix so it is treated literally as a replacement string
	$newPrefix = StringUtils::escapeRegexReplacement( "mwstore://" . $backend->getName() );

	// preg_replace() accepts a string or an array as subject
	return preg_replace( '!^mwstore://([^/]+)!', $newPrefix, $paths );
}
301
/**
 * Check whether a file in $src and its counterpart in $dst have the same
 * size and the same SHA-1 hash.
 *
 * A size or hash that could not be determined never counts as a match;
 * otherwise two failed lookups would compare equal and the file would be
 * silently skipped by the caller instead of being synced.
 *
 * @param FileBackend $src
 * @param FileBackend $dst
 * @param string $sPath Storage path in $src
 * @param string $dPath Storage path in $dst
 * @return bool True only if both size and hash are known and identical
 */
protected function filesAreSame( FileBackend $src, FileBackend $dst, $sPath, $dPath ) {
	// Read with 'latest', like the other stat/read calls made while syncing a batch
	$sSize = $src->getFileSize( [ 'src' => $sPath, 'latest' => 1 ] );
	if ( !is_int( $sSize ) ) {
		return false; // size unknown: cannot claim the files are equal
	}
	if ( $sSize !== $dst->getFileSize( [ 'src' => $dPath, 'latest' => 1 ] ) ) {
		return false; // short-circuit: no need to hash when the sizes differ
	}

	$sSha1 = $src->getFileSha1Base36( [ 'src' => $sPath, 'latest' => 1 ] );
	if ( !is_string( $sSha1 ) ) {
		return false; // hash unknown: cannot claim the files are equal
	}

	return $sSha1 === $dst->getFileSha1Base36( [ 'src' => $dPath, 'latest' => 1 ] );
}
311}
312
313$maintClass = SyncFileBackend::class;
314require_once RUN_MAINTENANCE_IF_MAIN;
const RUN_MAINTENANCE_IF_MAIN
Base class for all file backend classes (including multi-write backends).
preloadFileStat(array $params)
Preload file stat information (concurrently if possible) into in-process cache.
getFileSha1Base36(array $params)
Get a SHA-1 hash of the content of the file at a storage path in the backend.
fileExists(array $params)
Check if a file exists at a storage path in the backend.
getScopedFileLocks(array $paths, $type, StatusValue $status, $timeout=0)
Lock the files at the given storage paths in the backend.
getFileSize(array $params)
Get the size (bytes) of a file at a storage path in the backend.
prepare(array $params)
Prepare a storage directory for usage.
doQuickOperations(array $ops, array $opts=[])
Perform a set of independent file operations on some files.
getLocalReference(array $params)
Returns a file system file, identical in content to the file at a storage path.
getName()
Get the unique backend name.
getJournal()
Get the file journal object for this backend.
Abstract maintenance class for quickly writing and churning out maintenance scripts with minimal effo...
error( $err, $die=0)
Throw an error to the user.
output( $out, $channel=null)
Throw some output to the user.
hasOption( $name)
Checks to see if a particular option was set.
getBatchSize()
Returns batch size.
addDescription( $text)
Set the description text.
addOption( $name, $description, $required=false, $withArg=false, $shortName=false, $multiOccurrence=false)
Add a parameter to the script.
getOption( $name, $default=null)
Get an option, or return the default.
setBatchSize( $s=0)
Set the batch size.
fatalError( $msg, $exitCode=1)
Output a message and terminate the current script.
MediaWikiServices is the service locator for the application scope of MediaWiki.
static escapeRegexReplacement( $string)
Escape a string to make it suitable for inclusion in a preg_replace() replacement parameter.
Maintenance script that syncs one file backend to another based on the journal of the latter.
syncBackends(FileBackend $src, FileBackend $dst, $start, $end, Closure $callback)
Sync $dst backend to $src backend based on the $src logs given after $start.
__construct()
Default constructor.
syncFileBatch(array $paths, FileBackend $src, FileBackend $dst)
Sync particular files of backend $src to the corresponding $dst backend files.
filesAreSame(FileBackend $src, FileBackend $dst, $sPath, $dPath)
execute()
Do the actual work.
replaceNamePaths( $paths, FileBackend $backend)
Substitute the backend name of storage paths with that of a given one.
while(( $__line=Maintenance::readconsole()) !==false) print
Definition eval.php:64