Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
0.00% |
0 / 133 |
|
0.00% |
0 / 9 |
CRAP | |
0.00% |
0 / 3 |
| ExternalStoreMoveCluster | |
0.00% |
0 / 67 |
|
0.00% |
0 / 4 |
110 | |
0.00% |
0 / 1 |
| schema | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
| __construct | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
2 | |||
| execute | |
0.00% |
0 / 53 |
|
0.00% |
0 / 1 |
56 | |||
| output | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| error | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| ExternalStoreUpdateGenerator | |
0.00% |
0 / 49 |
|
0.00% |
0 / 4 |
110 | |
0.00% |
0 / 1 |
| __construct | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
| update | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
6 | |||
| read | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
| write | |
0.00% |
0 / 24 |
|
0.00% |
0 / 1 |
20 | |||
| FlowExternalStoreMoveCluster | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 1 |
| schema | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
2 | |||
| 1 | <?php |
| 2 | |
| 3 | namespace Flow\Maintenance; |
| 4 | |
| 5 | use BatchRowIterator; |
| 6 | use BatchRowUpdate; |
| 7 | use BatchRowWriter; |
| 8 | use ExternalStore; |
| 9 | use Flow\Container; |
| 10 | use Flow\DbFactory; |
| 11 | use Flow\Model\UUID; |
| 12 | use MediaWiki\Maintenance\Maintenance; |
| 13 | use MediaWiki\MediaWikiServices; |
| 14 | use MediaWiki\WikiMap\WikiMap; |
| 15 | use RowUpdateGenerator; |
| 16 | use RuntimeException; |
| 17 | use stdClass; |
| 18 | use Wikimedia\Rdbms\IDatabase; |
| 19 | use Wikimedia\Rdbms\IExpression; |
| 20 | use Wikimedia\Rdbms\IReadableDatabase; |
| 21 | use Wikimedia\Rdbms\LikeValue; |
| 22 | |
// Locate the MediaWiki installation: use MW_INSTALL_PATH when the environment
// defines it, otherwise fall back to the directory three levels above this file.
$IP = getenv( 'MW_INSTALL_PATH' );
$IP = $IP === false ? __DIR__ . '/../../..' : $IP;

require_once $IP . '/maintenance/Maintenance.php';
| 29 | |
/**
 * Base maintenance script that moves ExternalStore content from one or more
 * clusters to one or more other clusters and rewrites the rows that reference
 * that content. Subclasses describe the table to process via schema().
 *
 * @ingroup Maintenance
 */
abstract class ExternalStoreMoveCluster extends Maintenance {
	/**
	 * Must return an array in the form:
	 * [
	 * 	'dbr' => IReadableDatabase object,
	 * 	'dbw' => IDatabase object,
	 * 	'table' => 'flow_revision',
	 * 	'pk' => 'rev_id',
	 * 	'content' => 'rev_content',
	 * 	'flags' => 'rev_flags',
	 * 	'wiki' => 'rev_user_wiki',
	 * ]
	 *
	 * PK is the unique key used to control batching & updates, content & flags
	 * are the columns to read from & update with new ES data, and wiki is the
	 * column that is matched against the current wiki ID.
	 * It will roughly translate into these queries:
	 *
	 * Against dbr: ('cluster' will be each of the values passed to --from)
	 * SELECT <pk>, <content>, <flags>
	 * FROM <table>
	 * WHERE <wiki> = <current wiki ID>
	 * AND <flags> LIKE "%external%"
	 * AND ( <content> LIKE "DB://cluster/%" OR ... );
	 *
	 * Against dbw:
	 * UPDATE <table>
	 * SET <content> = ..., <flags> = ...
	 * WHERE <pk> = ...;
	 *
	 * @return array{dbr:IReadableDatabase,dbw:IDatabase,table:string,pk:string,content:string,flags:string,wiki:string}
	 */
	abstract protected function schema();

	/**
	 * Registers the script description, the --from / --to / --dry-run options,
	 * a default batch size of 300 and the dependency on the Flow extension.
	 */
	public function __construct() {
		parent::__construct();

		$this->addDescription( 'Moves ExternalStore content from (a) particular cluster(s) to ' .
			'(an)other(s). Just make sure all clusters are valid $wgExternalServers.' );

		$this->addOption( 'from', 'ExternalStore cluster to move from (comma-separated). ' .
			'E.g.: --from=cluster24,cluster25', true, true );
		$this->addOption( 'to', 'ExternalStore cluster to move to (comma-separated). ' .
			'E.g.: --to=cluster26', true, true );
		$this->addOption( 'dry-run', 'Outputs the old user content, inserts into new ' .
			'External Store, gives hypothetical new column values for flow_revision (but does ' .
			'not actually change flow_revision), and checks that old and new ES are the same.' );

		$this->setBatchSize( 300 );

		$this->requireExtension( 'Flow' );
	}

	/**
	 * Iterates, in batches, over all rows of the current wiki whose content is
	 * stored externally on one of the --from clusters and either rewrites them
	 * to point at the --to clusters, or (with --dry-run) only reports what
	 * would be written and verifies that the copied content matches.
	 */
	public function execute() {
		$from = explode( ',', $this->getOption( 'from' ) );
		$to = explode( ',', $this->getOption( 'to' ) );

		$schema = $this->schema();
		/** @var IReadableDatabase $dbr */
		$dbr = $schema['dbr'];
		/** @var IDatabase $dbw */
		$dbw = $schema['dbw'];

		$iterator = new BatchRowIterator( $dbr, $schema['table'], $schema['pk'], $this->getBatchSize() );
		$iterator->setFetchColumns( [ $schema['content'], $schema['flags'] ] );

		// One "content LIKE 'DB://<cluster>/%'" condition per source cluster, OR-ed together below
		$clusterConditions = [];
		foreach ( $from as $cluster ) {
			$clusterConditions[] = $dbr->expr( $schema['content'], IExpression::LIKE,
				new LikeValue( "DB://$cluster/", $dbr->anyString() ) );
		}
		$iterator->addConditions( [
			$schema['wiki'] => WikiMap::getCurrentWikiId(),
			$dbr->expr(
				$schema['flags'],
				IExpression::LIKE,
				new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() )
			),
			$dbr->orExpr( $clusterConditions ),
		] );

		$iterator->setCaller( __METHOD__ );

		$updateGenerator = new ExternalStoreUpdateGenerator( $this, $to, $schema );

		if ( $this->hasOption( 'dry-run' ) ) {
			$this->output( "Starting dry run\n\n" );
			foreach ( $iterator as $rows ) {
				$this->output( "Starting dry run batch\n" );
				foreach ( $rows as $row ) {
					$url = $row->{$schema['content']};
					$flags = explode( ',', $row->{$schema['flags']} );

					$oldContent = $updateGenerator->read( $url, $flags );
					$this->output( "\nOld content: $oldContent\n" );

					// Update itself just generates the update, it doesn't write
					// to the table.
					$updatedColumns = $updateGenerator->update( $row );
					if ( !$updatedColumns ) {
						// update() returns an empty array (after reporting the
						// error itself) when it could not generate an update;
						// there is nothing to compare for this row.
						$this->output( "No update could be generated, row would be left untouched\n" );
						continue;
					}
					$this->output( "{$schema['table']} columns would become:\n" );
					$this->output( var_export( $updatedColumns, true ) . "\n" );

					$newContent = $updatedColumns[$schema['content']];
					$newFlags = explode( ',', $updatedColumns[$schema['flags']] );
					if ( in_array( 'external', $newFlags, true ) ) {
						// New column value is an ES URL: resolve it to the actual content
						$newContent = $updateGenerator->read( $newContent, $newFlags );
					}

					if ( $newContent === $oldContent ) {
						$this->output( "New external store content matches old external store content\n" );
					} else {
						// Identify the row by the configured primary key column
						$revIdStr = UUID::create( $row->{$schema['pk']} )->getAlphadecimal();
						// fatalError() is used instead of the $die argument of
						// error() so that the dry run really terminates here.
						$this->fatalError( "New content for ID $revIdStr does not match prior content.\n" .
							"New content: $newContent\nOld content: $oldContent\n\nTerminating dry run.\n",
							1
						);
					}
				}

				$this->output( "\n\n" );
			}
			$this->output( "Dry run completed\n" );
			return;
		}

		$writer = new BatchRowWriter( $dbw, $schema['table'] );
		$writer->setCaller( __METHOD__ );

		$updater = new BatchRowUpdate(
			$iterator,
			$writer,
			$updateGenerator
		);
		$updater->setOutput( [ $this, 'output' ] );
		$updater->execute();
	}

	/**
	 * Public wrapper around parent::output(), so that the method can be handed
	 * to BatchRowUpdate::setOutput() as a callback.
	 *
	 * @param string $out
	 * @param string|null $channel
	 */
	public function output( $out, $channel = null ) {
		parent::output( $out, $channel );
	}

	/**
	 * Public wrapper around parent::error(), so that ExternalStoreUpdateGenerator
	 * can report errors through this script.
	 *
	 * @param string $err
	 * @param int $die
	 */
	public function error( $err, $die = 0 ) {
		parent::error( $err, $die );
	}
}
| 187 | |
/**
 * Row update generator that re-reads a row's ExternalStore content, writes it
 * to the configured target stores and returns the new column values.
 */
class ExternalStoreUpdateGenerator implements RowUpdateGenerator {
	/**
	 * @var ExternalStoreMoveCluster Script through which errors are reported
	 */
	protected $script;

	/**
	 * @var array Target store names, without the "DB://" protocol prefix
	 */
	protected $stores = [];

	/**
	 * @var array Schema description; the 'content' and 'flags' column names are read from it
	 */
	protected $schema = [];

	/**
	 * @param ExternalStoreMoveCluster $script
	 * @param array $stores
	 * @param array $schema
	 */
	public function __construct( ExternalStoreMoveCluster $script, array $stores, array $schema ) {
		$this->schema = $schema;
		$this->stores = $stores;
		$this->script = $script;
	}

	/**
	 * Builds the new content & flags column values for one row.
	 *
	 * @param stdClass $row
	 * @return array New column values keyed by column name, or an empty array
	 *  when reading or writing the content failed (the error is reported
	 *  through the script)
	 */
	public function update( $row ) {
		$contentColumn = $this->schema['content'];
		$flagsColumn = $this->schema['flags'];

		$url = $row->$contentColumn;
		$flags = explode( ',', $row->$flagsColumn );

		try {
			$data = $this->write( $this->read( $url, $flags ), $flags );
		} catch ( \Exception $e ) {
			// Report the problem and skip this row instead of aborting the run
			$this->script->error( $e->getMessage() . "\n" );
			return [];
		}

		return [
			$contentColumn => $data['content'],
			$flagsColumn => implode( ',', $data['flags'] ),
		];
	}

	/**
	 * Fetches the content behind an ExternalStore URL and decompresses it
	 * according to the given flags.
	 *
	 * @param string $url
	 * @param array $flags
	 * @return string
	 * @throws RuntimeException When fetching or decompressing fails
	 */
	public function read( $url, array $flags = [] ) {
		$raw = ExternalStore::fetchFromURL( $url );
		if ( $raw === false ) {
			throw new RuntimeException( "Failed to fetch content from URL: $url" );
		}

		$blobStore = MediaWikiServices::getInstance()->getBlobStoreFactory()->newSqlBlobStore();
		$decompressed = $blobStore->decompressData( $raw, $flags );
		if ( $decompressed === false ) {
			throw new RuntimeException( "Failed to decompress content from URL: $url" );
		}

		return $decompressed;
	}

	/**
	 * Stores the content in one of the target stores.
	 *
	 * @param string $content
	 * @param array $flags
	 * @return array New ExternalStore data in the form of ['content' => ..., 'flags' => [ ... ]]
	 * @throws RuntimeException When the content could not be written
	 */
	protected function write( $content, array $flags = [] ) {
		// Flags describing the previous storage form (external, utf-8, gzip)
		// are dropped; everything else is carried over untouched.
		$unrelatedFlags = array_diff( $flags, [ 'external', 'utf-8', 'gzip' ] );

		// Empty content is kept inline rather than being stored elsewhere
		if ( $content === '' ) {
			return [
				'content' => $content,
				'flags' => $unrelatedFlags,
			];
		}

		// compressData() returns the comma-separated flags for the (re-)compressed content.
		// NOTE(review): $content is presumably modified in place by this call -
		// confirm against SqlBlobStore::compressData().
		$blobStore = MediaWikiServices::getInstance()->getBlobStoreFactory()->newSqlBlobStore();
		$compressionFlags = $blobStore->compressData( $content );
		$newFlags = array_filter( explode( ',', $compressionFlags ) );

		// ExternalStore::insertWithFallback expects stores with protocol
		$targets = array_map(
			static function ( $store ) {
				return 'DB://' . $store;
			},
			array_values( $this->stores )
		);

		$url = ExternalStore::insertWithFallback( $targets, $content );
		if ( $url === false ) {
			throw new RuntimeException( 'Failed to write content to stores ' . json_encode( $targets ) );
		}

		// Mark the content as external again and restore the unrelated flags
		$newFlags[] = 'external';

		return [
			'content' => $url,
			'flags' => array_unique( array_merge( $newFlags, $unrelatedFlags ) ),
		];
	}
}
| 304 | |
/**
 * Moves the ExternalStore content referenced by the flow_revision table.
 */
class FlowExternalStoreMoveCluster extends ExternalStoreMoveCluster {
	/**
	 * Describes the flow_revision table, using the connections supplied by
	 * Flow's DbFactory.
	 *
	 * @return array
	 */
	protected function schema() {
		/** @var DbFactory $dbFactory */
		$dbFactory = Container::getContainer()['db.factory'];

		$replica = $dbFactory->getDB( DB_REPLICA );
		$primary = $dbFactory->getDB( DB_PRIMARY );

		return [
			'dbr' => $replica,
			'dbw' => $primary,
			'table' => 'flow_revision',
			'pk' => 'rev_id',
			'content' => 'rev_content',
			'flags' => 'rev_flags',
			'wiki' => 'rev_user_wiki',
		];
	}
}

// Script entry point: name the maintenance class, then include the
// RUN_MAINTENANCE_IF_MAIN file.
$maintClass = FlowExternalStoreMoveCluster::class;
require_once RUN_MAINTENANCE_IF_MAIN;