Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 133 |
|
0.00% |
0 / 9 |
CRAP | |
0.00% |
0 / 3 |
ExternalStoreMoveCluster | |
0.00% |
0 / 67 |
|
0.00% |
0 / 4 |
110 | |
0.00% |
0 / 1 |
schema | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
__construct | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
2 | |||
execute | |
0.00% |
0 / 53 |
|
0.00% |
0 / 1 |
56 | |||
output | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
error | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
ExternalStoreUpdateGenerator | |
0.00% |
0 / 49 |
|
0.00% |
0 / 4 |
110 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
update | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
read | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
write | |
0.00% |
0 / 24 |
|
0.00% |
0 / 1 |
20 | |||
FlowExternalStoreMoveCluster | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 1 |
schema | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | namespace Flow\Maintenance; |
4 | |
5 | use BatchRowIterator; |
6 | use BatchRowUpdate; |
7 | use BatchRowWriter; |
8 | use ExternalStore; |
9 | use Flow\Container; |
10 | use Flow\DbFactory; |
11 | use Flow\Model\UUID; |
12 | use Maintenance; |
13 | use MediaWiki\MediaWikiServices; |
14 | use MediaWiki\WikiMap\WikiMap; |
15 | use RowUpdateGenerator; |
16 | use RuntimeException; |
17 | use stdClass; |
18 | use Wikimedia\Rdbms\IDatabase; |
19 | use Wikimedia\Rdbms\IExpression; |
20 | use Wikimedia\Rdbms\IReadableDatabase; |
21 | use Wikimedia\Rdbms\LikeValue; |
22 | |
// Locate the MediaWiki installation: the MW_INSTALL_PATH environment variable
// wins; when it is unset, fall back to three directories above this file.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = $IP === false ? __DIR__ . '/../../..' : $IP;

require_once $IP . '/maintenance/Maintenance.php';
29 | |
/**
 * Base maintenance script that re-homes ExternalStore content: rows whose
 * content column points at one of the --from clusters are read, re-inserted
 * into one of the --to clusters, and the row's content/flags columns are
 * updated to point at the new location.
 *
 * Subclasses describe the table to operate on through schema().
 *
 * @ingroup Maintenance
 */
abstract class ExternalStoreMoveCluster extends Maintenance {
	/**
	 * Must return an array in the form:
	 * [
	 * 	'dbr' => IReadableDatabase object,
	 * 	'dbw' => IDatabase object,
	 * 	'table' => 'flow_revision',
	 * 	'pk' => 'rev_id',
	 * 	'content' => 'rev_content',
	 * 	'flags' => 'rev_flags',
	 * 	'wiki' => 'rev_user_wiki',
	 * ]
	 *
	 * PK is the unique key used to control batching & updates, content & flags
	 * are the columns to read from & update with new ES data, and wiki is the
	 * column that is matched against the current wiki ID.
	 * It will roughly translate into these queries:
	 *
	 * Against dbr: ('cluster' will be the argument passed to --from)
	 * SELECT <pk>, <content>, <flags>
	 * FROM <table>
	 * WHERE <wiki> = <current wiki ID>
	 * AND <flags> LIKE "%external%"
	 * AND <content> LIKE "DB://cluster/%";
	 *
	 * Against dbw:
	 * UPDATE <table>
	 * SET <content> = ..., <flags> = ...
	 * WHERE <pk> = ...;
	 *
	 * @return array{dbr:IReadableDatabase,dbw:IDatabase,table:string,pk:string,content:string,flags:string,wiki:string}
	 */
	abstract protected function schema();

	/**
	 * Registers the script description, the --from / --to / --dry-run options,
	 * the default batch size and the dependency on the Flow extension.
	 */
	public function __construct() {
		parent::__construct();

		$this->addDescription( 'Moves ExternalStore content from (a) particular cluster(s) to ' .
			'(an)other(s). Just make sure all clusters are valid $wgExternalServers.' );

		$this->addOption( 'from', 'ExternalStore cluster to move from (comma-separated). ' .
			'E.g.: --from=cluster24,cluster25', true, true );
		$this->addOption( 'to', 'ExternalStore cluster to move to (comma-separated). ' .
			'E.g.: --to=cluster26', true, true );
		$this->addOption( 'dry-run', 'Outputs the old user content, inserts into new ' .
			'External Store, gives hypothetical new column values for flow_revision (but does ' .
			'not actually change flow_revision), and checks that old and new ES are the same.' );

		$this->setBatchSize( 300 );

		$this->requireExtension( 'Flow' );
	}

	/**
	 * Selects every row of the current wiki whose content lives on one of the
	 * --from clusters and either rewrites it (default) or, with --dry-run,
	 * only reports what the rewrite would look like.
	 */
	public function execute() {
		$from = $this->parseClusterList( 'from' );
		$to = $this->parseClusterList( 'to' );

		$schema = $this->schema();
		/** @var IReadableDatabase $dbr */
		$dbr = $schema['dbr'];
		/** @var IDatabase $dbw */
		$dbw = $schema['dbw'];

		$iterator = new BatchRowIterator( $dbr, $schema['table'], $schema['pk'], $this->getBatchSize() );
		$iterator->setFetchColumns( [ $schema['content'], $schema['flags'] ] );

		// One "content LIKE 'DB://<cluster>/%'" expression per source cluster, OR-ed together below.
		$clusterConditions = [];
		foreach ( $from as $cluster ) {
			$clusterConditions[] = $dbr->expr( $schema['content'], IExpression::LIKE,
				new LikeValue( "DB://$cluster/", $dbr->anyString() ) );
		}
		$iterator->addConditions( [
			$schema['wiki'] => WikiMap::getCurrentWikiId(),
			$dbr->expr( $schema['flags'], IExpression::LIKE,
				new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() ) ),
			$dbr->orExpr( $clusterConditions ),
		] );

		$iterator->setCaller( __METHOD__ );

		$updateGenerator = new ExternalStoreUpdateGenerator( $this, $to, $schema );

		if ( $this->hasOption( 'dry-run' ) ) {
			$this->dryRun( $iterator, $updateGenerator, $schema );
			return;
		}

		$writer = new BatchRowWriter( $dbw, $schema['table'] );
		$writer->setCaller( __METHOD__ );

		$updater = new BatchRowUpdate(
			$iterator,
			$writer,
			$updateGenerator
		);
		$updater->setOutput( [ $this, 'output' ] );
		$updater->execute();
	}

	/**
	 * Reads a comma-separated cluster list from the given option, trimming
	 * whitespace and dropping empty entries; aborts when nothing is left.
	 *
	 * @param string $option Option name ('from' or 'to')
	 * @return string[] Non-empty list of cluster names
	 */
	private function parseClusterList( $option ) {
		$clusters = array_values( array_filter(
			array_map( 'trim', explode( ',', (string)$this->getOption( $option ) ) ),
			static function ( $cluster ) {
				return $cluster !== '';
			}
		) );

		if ( !$clusters ) {
			// An empty name would otherwise turn into a "DB:///%" pattern or a bare "DB://" store.
			$this->fatalError( "--$option must name at least one ExternalStore cluster.\n" );
		}

		return $clusters;
	}

	/**
	 * Walks all matching rows, generates (but never writes) the updated
	 * columns for each of them, and verifies that the content readable via
	 * the new columns equals the content readable via the old ones.
	 * Terminates the script on the first row that cannot be verified.
	 *
	 * @param BatchRowIterator $iterator Iterator over the rows to move
	 * @param ExternalStoreUpdateGenerator $updateGenerator
	 * @param array $schema Result of schema()
	 */
	private function dryRun(
		BatchRowIterator $iterator,
		ExternalStoreUpdateGenerator $updateGenerator,
		array $schema
	): void {
		$this->output( "Starting dry run\n\n" );
		foreach ( $iterator as $rows ) {
			$this->output( "Starting dry run batch\n" );
			foreach ( $rows as $row ) {
				$url = $row->{$schema['content']};
				$flags = explode( ',', $row->{$schema['flags']} );

				$oldContent = $updateGenerator->read( $url, $flags );
				$this->output( "\nOld content: $oldContent\n" );

				// update() only generates the new column values; it does not
				// write them to the table.
				$updatedColumns = $updateGenerator->update( $row );
				if ( !$updatedColumns ) {
					// update() returns an empty array after reporting a read/write
					// failure; there is nothing to compare, so stop here instead
					// of indexing into the empty array.
					$idStr = UUID::create( $row->{$schema['pk']} )->getAlphadecimal();
					$this->fatalError( "Could not generate new column values for ID $idStr.\n\n" .
						"Terminating dry run.\n" );
				}
				$this->output( "{$schema['table']} columns would become:\n" );
				$this->output( var_export( $updatedColumns, true ) . "\n" );

				$newContent = $updatedColumns[$schema['content']];
				$newFlags = explode( ',', $updatedColumns[$schema['flags']] );
				if ( in_array( 'external', $newFlags, true ) ) {
					// The column holds a URL, resolve it to the actual content.
					$newContent = $updateGenerator->read( $newContent, $newFlags );
				}

				if ( $newContent === $oldContent ) {
					$this->output( "New external store content matches old external store content\n" );
				} else {
					$idStr = UUID::create( $row->{$schema['pk']} )->getAlphadecimal();
					$this->fatalError( "New content for ID $idStr does not match prior content.\n" .
						"New content: $newContent\nOld content: $oldContent\n\nTerminating dry run.\n" );
				}
			}

			$this->output( "\n\n" );
		}
		$this->output( "Dry run completed\n" );
	}

	/**
	 * Public proxy for parent::output(), so that it can be handed to
	 * BatchRowUpdate::setOutput() as a callback.
	 *
	 * @param string $out
	 * @param mixed|null $channel
	 */
	public function output( $out, $channel = null ) {
		parent::output( $out, $channel );
	}

	/**
	 * Public proxy for parent::error(), so that collaborating objects (e.g.
	 * ExternalStoreUpdateGenerator) can report errors through the script.
	 *
	 * @param string $err
	 * @param int $die Non-zero terminates the script with that exit code.
	 *  Kept for backward compatibility; routed through fatalError().
	 */
	public function error( $err, $die = 0 ) {
		if ( intval( $die ) !== 0 ) {
			$this->fatalError( $err, intval( $die ) );
		}
		parent::error( $err );
	}
}
187 | |
/**
 * Generates the new content/flags column values for a row by reading its
 * current ExternalStore content and inserting it into one of the target
 * stores.
 */
class ExternalStoreUpdateGenerator implements RowUpdateGenerator {
	/**
	 * Script used for error reporting.
	 *
	 * @var ExternalStoreMoveCluster
	 */
	protected $script;

	/**
	 * Target store names, without the "DB://" protocol prefix.
	 *
	 * @var array
	 */
	protected $stores = [];

	/**
	 * Schema description; the 'content' and 'flags' column names are read from it.
	 *
	 * @var array
	 */
	protected $schema = [];

	/**
	 * @param ExternalStoreMoveCluster $script
	 * @param array $stores
	 * @param array $schema
	 */
	public function __construct( ExternalStoreMoveCluster $script, array $stores, array $schema ) {
		$this->schema = $schema;
		$this->stores = $stores;
		$this->script = $script;
	}

	/**
	 * Builds the replacement column values for one row.
	 *
	 * @param stdClass $row
	 * @return array Map of column name to new value; empty when reading or
	 *  writing failed (the failure is reported through the script)
	 */
	public function update( $row ) {
		$contentColumn = $this->schema['content'];
		$flagsColumn = $this->schema['flags'];

		$currentUrl = $row->{$contentColumn};
		$currentFlags = explode( ',', $row->{$flagsColumn} );

		try {
			$moved = $this->write( $this->read( $currentUrl, $currentFlags ), $currentFlags );
		} catch ( \Exception $e ) {
			// Report the problem and leave this row untouched.
			$this->script->error( $e->getMessage() . "\n" );
			return [];
		}

		return [
			$contentColumn => $moved['content'],
			$flagsColumn => implode( ',', $moved['flags'] ),
		];
	}

	/**
	 * Fetches the blob behind an ExternalStore URL and decompresses it
	 * according to the given flags.
	 *
	 * @param string $url
	 * @param array $flags
	 * @return string
	 * @throws RuntimeException When fetching or decompressing fails
	 */
	public function read( $url, array $flags = [] ) {
		$raw = ExternalStore::fetchFromURL( $url );
		if ( $raw === false ) {
			throw new RuntimeException( "Failed to fetch content from URL: $url" );
		}

		$blobStore = MediaWikiServices::getInstance()
			->getBlobStoreFactory()
			->newSqlBlobStore();
		$decoded = $blobStore->decompressData( $raw, $flags );
		if ( $decoded === false ) {
			throw new RuntimeException( "Failed to decompress content from URL: $url" );
		}

		return $decoded;
	}

	/**
	 * Stores the content in one of the target stores and computes the flags
	 * that go with it. Empty content is returned inline instead of stored.
	 *
	 * @param string $content
	 * @param array $flags
	 * @return array New ExternalStore data in the form of ['content' => ..., 'flags' => [ ... ]]
	 * @throws RuntimeException When the content could not be inserted
	 */
	protected function write( $content, array $flags = [] ) {
		// Storage-related flags are recomputed below; everything else is carried over.
		$carriedFlags = array_diff( $flags, [ 'external', 'utf-8', 'gzip' ] );

		if ( $content === '' ) {
			// Nothing worth storing externally.
			return [
				'content' => $content,
				'flags' => $carriedFlags,
			];
		}

		// compressData() returns a comma-separated flag string.
		// NOTE(review): presumably it also compresses $content in place
		// (depending on $wgCompressRevisions) - confirm against SqlBlobStore.
		$blobStore = MediaWikiServices::getInstance()
			->getBlobStoreFactory()
			->newSqlBlobStore();
		$flagString = $blobStore->compressData( $content );
		$storageFlags = array_filter( explode( ',', $flagString ) );

		// ExternalStore::insertWithFallback expects stores with protocol
		$targets = array_map(
			static function ( $store ) {
				return 'DB://' . $store;
			},
			array_values( $this->stores )
		);

		$newUrl = ExternalStore::insertWithFallback( $targets, $content );
		if ( $newUrl === false ) {
			throw new RuntimeException( 'Failed to write content to stores ' . json_encode( $targets ) );
		}

		// Recomputed storage flags first, then 'external', then the carried-over flags.
		$combined = array_merge( $storageFlags, [ 'external' ], $carriedFlags );

		return [
			'content' => $newUrl,
			'flags' => array_unique( $combined ),
		];
	}
}
304 | |
/**
 * Concrete script that operates on the flow_revision table.
 */
class FlowExternalStoreMoveCluster extends ExternalStoreMoveCluster {
	/**
	 * Describes flow_revision: connections come from Flow's DbFactory, the
	 * content URL lives in rev_content and its flags in rev_flags.
	 *
	 * @return array
	 */
	protected function schema() {
		/** @var DbFactory $dbFactory */
		$dbFactory = Container::getContainer()['db.factory'];

		$replica = $dbFactory->getDB( DB_REPLICA );
		$primary = $dbFactory->getDB( DB_PRIMARY );

		return [
			'dbr' => $replica,
			'dbw' => $primary,
			'table' => 'flow_revision',
			'pk' => 'rev_id',
			'content' => 'rev_content',
			'flags' => 'rev_flags',
			'wiki' => 'rev_user_wiki',
		];
	}
}

// Name the script class, then include the file named by RUN_MAINTENANCE_IF_MAIN
// (a constant defined outside this file).
$maintClass = FlowExternalStoreMoveCluster::class;
require_once RUN_MAINTENANCE_IF_MAIN;