Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 132 |
|
0.00% |
0 / 9 |
CRAP | |
0.00% |
0 / 3 |
ExternalStoreMoveCluster | |
0.00% |
0 / 66 |
|
0.00% |
0 / 4 |
110 | |
0.00% |
0 / 1 |
schema | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
__construct | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
2 | |||
execute | |
0.00% |
0 / 52 |
|
0.00% |
0 / 1 |
56 | |||
output | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
error | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
ExternalStoreUpdateGenerator | |
0.00% |
0 / 49 |
|
0.00% |
0 / 4 |
110 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
update | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
read | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
write | |
0.00% |
0 / 24 |
|
0.00% |
0 / 1 |
20 | |||
FlowExternalStoreMoveCluster | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 1 |
schema | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace Flow\Maintenance; |
4 | |
5 | use BatchRowIterator; |
6 | use BatchRowUpdate; |
7 | use BatchRowWriter; |
8 | use ExternalStore; |
9 | use Flow\Container; |
10 | use Flow\DbFactory; |
11 | use Flow\Model\UUID; |
12 | use Maintenance; |
13 | use MediaWiki\MediaWikiServices; |
14 | use MediaWiki\WikiMap\WikiMap; |
15 | use RowUpdateGenerator; |
16 | use RuntimeException; |
17 | use stdClass; |
18 | use Wikimedia\Rdbms\IDatabase; |
19 | |
// Resolve the MediaWiki install path: use the MW_INSTALL_PATH environment
// variable when it is set, otherwise fall back to three directories above
// this file.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = $IP === false ? __DIR__ . '/../../..' : $IP;

require_once "$IP/maintenance/Maintenance.php";
26 | |
/**
 * Base maintenance script that moves ExternalStore content referenced by a
 * table from one or more clusters (--from) to one or more others (--to).
 *
 * Subclasses describe the table to process through schema(). This class
 * builds the batched read query and then either performs a dry run or
 * rewrites the matching rows.
 *
 * @ingroup Maintenance
 */
abstract class ExternalStoreMoveCluster extends Maintenance {
	/**
	 * Must return an array in the form:
	 * [
	 *     'dbr' => IDatabase object,
	 *     'dbw' => IDatabase object,
	 *     'table' => 'flow_revision',
	 *     'pk' => 'rev_id',
	 *     'content' => 'rev_content',
	 *     'flags' => 'rev_flags',
	 *     'wiki' => 'rev_user_wiki',
	 * ]
	 *
	 * PK is the unique key used to control batching & updates, content &
	 * flags are the columns to read from & update with new ES data, and
	 * wiki is the column that is compared against the current wiki ID.
	 * It will roughly translate into these queries:
	 *
	 * Against dbr: ('cluster' will be each of the values passed to --from,
	 * multiple clusters are OR-ed together)
	 *     SELECT <pk>, <content>, <flags>
	 *     FROM <table>
	 *     WHERE <wiki> = <current wiki ID>
	 *     AND <flags> LIKE "%external%"
	 *     AND <content> LIKE "DB://cluster/%";
	 *
	 * Against dbw:
	 *     UPDATE <table>
	 *     SET <content> = ..., <flags> = ...
	 *     WHERE <pk> = ...;
	 *
	 * @return array
	 */
	abstract protected function schema();

	/**
	 * Registers the script description, its options (--from, --to,
	 * --dry-run), the default batch size and the Flow requirement.
	 */
	public function __construct() {
		parent::__construct();

		$this->addDescription( 'Moves ExternalStore content from (a) particular cluster(s) to ' .
			'(an)other(s). Just make sure all clusters are valid $wgExternalServers.' );

		$this->addOption( 'from', 'ExternalStore cluster to move from (comma-separated). ' .
			'E.g.: --from=cluster24,cluster25', true, true );
		$this->addOption( 'to', 'ExternalStore cluster to move to (comma-separated). ' .
			'E.g.: --to=cluster26', true, true );
		$this->addOption( 'dry-run', 'Outputs the old user content, inserts into new ' .
			'External Store, gives hypothetical new column values for flow_revision (but does ' .
			'not actually change flow_revision), and checks that old and new ES are the same.' );

		$this->setBatchSize( 300 );

		$this->requireExtension( 'Flow' );
	}

	/**
	 * Selects every row of the current wiki whose content lives on one of
	 * the --from clusters and either dry-runs the move or rewrites the rows
	 * to point at a fresh copy stored on the --to clusters.
	 */
	public function execute() {
		$from = explode( ',', $this->getOption( 'from' ) );
		$to = explode( ',', $this->getOption( 'to' ) );

		$schema = $this->schema();
		/** @var IDatabase $dbr */
		$dbr = $schema['dbr'];
		/** @var IDatabase $dbw */
		$dbw = $schema['dbw'];

		$iterator = new BatchRowIterator( $dbr, $schema['table'], $schema['pk'], $this->getBatchSize() );
		$iterator->setFetchColumns( [ $schema['content'], $schema['flags'] ] );

		// One "content LIKE 'DB://<cluster>/%'" condition per source cluster
		$clusterConditions = [];
		foreach ( $from as $cluster ) {
			$clusterConditions[] = $schema['content'] . $dbr->buildLike( "DB://$cluster/", $dbr->anyString() );
		}
		$iterator->addConditions( [
			$schema['wiki'] => WikiMap::getCurrentWikiId(),
			$schema['flags'] . $dbr->buildLike( $dbr->anyString(), 'external', $dbr->anyString() ),
			$dbr->makeList( $clusterConditions, LIST_OR ),
		] );

		$iterator->setCaller( __METHOD__ );

		$updateGenerator = new ExternalStoreUpdateGenerator( $this, $to, $schema );

		if ( $this->hasOption( 'dry-run' ) ) {
			$this->dryRun( $iterator, $updateGenerator, $schema );
			return;
		}

		$writer = new BatchRowWriter( $dbw, $schema['table'] );
		$writer->setCaller( __METHOD__ );

		$updater = new BatchRowUpdate(
			$iterator,
			$writer,
			$updateGenerator
		);
		$updater->setOutput( [ $this, 'output' ] );
		$updater->execute();
	}

	/**
	 * Walks all matching rows, generating (but never writing) the updated
	 * column values and verifying the relocated content for each row.
	 *
	 * @param BatchRowIterator $iterator
	 * @param ExternalStoreUpdateGenerator $updateGenerator
	 * @param array $schema Schema as returned by schema()
	 */
	private function dryRun(
		BatchRowIterator $iterator,
		ExternalStoreUpdateGenerator $updateGenerator,
		array $schema
	) {
		$this->output( "Starting dry run\n\n" );
		foreach ( $iterator as $rows ) {
			$this->output( "Starting dry run batch\n" );
			foreach ( $rows as $row ) {
				$this->dryRunRow( $row, $updateGenerator, $schema );
			}

			$this->output( "\n\n" );
		}
		$this->output( "Dry run completed\n" );
	}

	/**
	 * Dry-runs a single row: reads the old content, generates the update,
	 * reads the content back from its new location and compares both.
	 * Any failure terminates the script.
	 *
	 * @param stdClass $row
	 * @param ExternalStoreUpdateGenerator $updateGenerator
	 * @param array $schema Schema as returned by schema()
	 */
	private function dryRunRow( $row, ExternalStoreUpdateGenerator $updateGenerator, array $schema ) {
		$contentColumn = $schema['content'];
		$flagsColumn = $schema['flags'];

		try {
			$oldContent = $updateGenerator->read(
				$row->$contentColumn,
				explode( ',', $row->$flagsColumn )
			);
		} catch ( \Exception $e ) {
			$this->fatalError( 'Could not read old content for ID ' .
				$this->rowIdForDisplay( $row, $schema ) . ': ' . $e->getMessage() .
				"\n\nTerminating dry run.\n" );
		}
		$this->output( "\nOld content: $oldContent\n" );

		// Update itself just generates the update, it doesn't write
		// to the table.
		$updatedColumns = $updateGenerator->update( $row );
		if ( $updatedColumns === [] ) {
			// update() returns an empty array after reporting a failure;
			// there are no new column values to inspect in that case.
			$this->fatalError( 'Could not generate new column values for ID ' .
				$this->rowIdForDisplay( $row, $schema ) . ".\n\nTerminating dry run.\n" );
		}
		$this->output( "{$schema['table']} columns would become:\n" );
		$this->output( var_export( $updatedColumns, true ) . "\n" );

		$newContent = $updatedColumns[$contentColumn];
		$newFlags = explode( ',', $updatedColumns[$flagsColumn] );
		if ( in_array( 'external', $newFlags, true ) ) {
			// The column holds a URL again; resolve it to the actual content
			try {
				$newContent = $updateGenerator->read( $newContent, $newFlags );
			} catch ( \Exception $e ) {
				$this->fatalError( 'Could not read new content for ID ' .
					$this->rowIdForDisplay( $row, $schema ) . ': ' . $e->getMessage() .
					"\n\nTerminating dry run.\n" );
			}
		}

		if ( $newContent !== $oldContent ) {
			$revIdStr = $this->rowIdForDisplay( $row, $schema );
			$this->fatalError( "New content for ID $revIdStr does not match prior content.\n" .
				"New content: $newContent\nOld content: $oldContent\n\nTerminating dry run.\n" );
		}

		$this->output( "New external store content matches old external store content\n" );
	}

	/**
	 * Formats the primary key of a row (the column named by the schema's
	 * 'pk' entry) as an alphadecimal UUID for use in messages.
	 *
	 * @param stdClass $row
	 * @param array $schema Schema as returned by schema()
	 * @return string
	 */
	private function rowIdForDisplay( $row, array $schema ) {
		return UUID::create( $row->{$schema['pk']} )->getAlphadecimal();
	}

	/**
	 * Public wrapper around parent::output(), so it can be handed to
	 * BatchRowUpdate::setOutput() as a callback.
	 *
	 * @param string $out
	 * @param mixed|null $channel
	 */
	public function output( $out, $channel = null ) {
		parent::output( $out, $channel );
	}

	/**
	 * Public wrapper around parent::error(), so ExternalStoreUpdateGenerator
	 * can report failures through this script.
	 *
	 * @param string $err
	 * @param int $die
	 */
	public function error( $err, $die = 0 ) {
		parent::error( $err, $die );
	}
}
183 | |
/**
 * Generates the column updates that re-point a row at a copy of its
 * content stored in (one of) the given ExternalStore clusters.
 */
class ExternalStoreUpdateGenerator implements RowUpdateGenerator {
	/**
	 * @var ExternalStoreMoveCluster Script used to report errors
	 */
	protected $script;

	/**
	 * @var string[] Target cluster names, without the "DB://" protocol
	 */
	protected $stores = [];

	/**
	 * @var array Schema description; the 'content' and 'flags' entries are used here
	 */
	protected $schema = [];

	/**
	 * @param ExternalStoreMoveCluster $script
	 * @param array $stores
	 * @param array $schema
	 */
	public function __construct( ExternalStoreMoveCluster $script, array $stores, array $schema ) {
		$this->script = $script;
		$this->stores = $stores;
		$this->schema = $schema;
	}

	/**
	 * Copies the row's content to the target stores and returns the new
	 * values for the content & flags columns.
	 *
	 * @param stdClass $row
	 * @return array New column values, or an empty array (after reporting
	 *  the error through the script) when reading or writing failed
	 */
	public function update( $row ) {
		$url = $row->{$this->schema['content']};
		$flags = explode( ',', $row->{$this->schema['flags']} );

		try {
			$content = $this->read( $url, $flags );
			$data = $this->write( $content, $flags );
		} catch ( \Exception $e ) {
			// something went wrong, just output the error & don't update!
			$this->script->error( $e->getMessage() . "\n" );
			return [];
		}

		return [
			$this->schema['content'] => $data['content'],
			$this->schema['flags'] => implode( ',', $data['flags'] ),
		];
	}

	/**
	 * Fetches the blob behind an ExternalStore URL and decompresses it
	 * according to the given flags.
	 *
	 * @param string $url
	 * @param array $flags
	 * @return string
	 * @throws RuntimeException When fetching or decompressing fails
	 */
	public function read( $url, array $flags = [] ) {
		$content = ExternalStore::fetchFromURL( $url );
		if ( $content === false ) {
			throw new RuntimeException( "Failed to fetch content from URL: $url" );
		}

		$content = MediaWikiServices::getInstance()
			->getBlobStoreFactory()
			->newSqlBlobStore()
			->decompressData( $content, $flags );
		if ( $content === false ) {
			throw new RuntimeException( "Failed to decompress content from URL: $url" );
		}

		return $content;
	}

	/**
	 * Stores the content in one of the target stores.
	 *
	 * @param string $content
	 * @param array $flags Flags of the row the content was read from
	 * @return array New ExternalStore data in the form of ['content' => ..., 'flags' => [ ... ]]
	 * @throws RuntimeException When the content could not be written
	 */
	protected function write( $content, array $flags = [] ) {
		// external, utf-8 & gzip flags are no longer valid at this point;
		// empty entries (from exploding an empty or dangling-comma flags
		// string) are dropped so they don't end up in the new flags column
		$preservedFlags = array_values(
			array_diff( $flags, [ 'external', 'utf-8', 'gzip', '' ] )
		);

		if ( $content === '' ) {
			// don't store empty content elsewhere
			return [
				'content' => $content,
				'flags' => $preservedFlags,
			];
		}

		// re-compress (if $wgCompressRevisions is enabled) the content & set
		// flags accordingly. compressData() returns the flags as a
		// comma-separated string; the (possibly compressed) data is left in
		// $content, which is what gets inserted below.
		$compressionFlags = MediaWikiServices::getInstance()
			->getBlobStoreFactory()
			->newSqlBlobStore()
			->compressData( $content );
		$newFlags = array_filter( explode( ',', $compressionFlags ) );

		// ExternalStore::insertWithFallback expects stores with protocol
		$stores = [];
		foreach ( $this->stores as $store ) {
			$stores[] = 'DB://' . $store;
		}
		$url = ExternalStore::insertWithFallback( $stores, $content );
		if ( $url === false ) {
			throw new RuntimeException( 'Failed to write content to stores ' . json_encode( $stores ) );
		}

		// add flag indicating content is external again, and restore unrelated flags
		$newFlags[] = 'external';
		$newFlags = array_merge( $newFlags, $preservedFlags );

		return [
			'content' => $url,
			'flags' => array_unique( $newFlags ),
		];
	}
}
300 | |
/**
 * Concrete move script for the ExternalStore content referenced by the
 * flow_revision table.
 */
class FlowExternalStoreMoveCluster extends ExternalStoreMoveCluster {
	/**
	 * @return array Database connections plus the flow_revision table &
	 *  column names
	 */
	protected function schema() {
		/** @var DbFactory $factory */
		$factory = Container::getContainer()['db.factory'];

		$replica = $factory->getDB( DB_REPLICA );
		$primary = $factory->getDB( DB_PRIMARY );

		return [
			'dbr' => $replica,
			'dbw' => $primary,
			'table' => 'flow_revision',
			'pk' => 'rev_id',
			'content' => 'rev_content',
			'flags' => 'rev_flags',
			'wiki' => 'rev_user_wiki',
		];
	}
}
318 | |
// Name the class to run, then include the file that the
// RUN_MAINTENANCE_IF_MAIN constant points to.
$maintClass = FlowExternalStoreMoveCluster::class;
require_once RUN_MAINTENANCE_IF_MAIN;