Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 133
0.00% covered (danger)
0.00%
0 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 3
ExternalStoreMoveCluster
0.00% covered (danger)
0.00%
0 / 67
0.00% covered (danger)
0.00%
0 / 4
110
0.00% covered (danger)
0.00%
0 / 1
 schema
n/a
0 / 0
n/a
0 / 0
0
 __construct
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 53
0.00% covered (danger)
0.00%
0 / 1
56
 output
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 error
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
ExternalStoreUpdateGenerator
0.00% covered (danger)
0.00%
0 / 49
0.00% covered (danger)
0.00%
0 / 4
110
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 update
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 read
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 write
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
20
FlowExternalStoreMoveCluster
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 1
 schema
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace Flow\Maintenance;
4
5use BatchRowIterator;
6use BatchRowUpdate;
7use BatchRowWriter;
8use ExternalStore;
9use Flow\Container;
10use Flow\DbFactory;
11use Flow\Model\UUID;
12use Maintenance;
13use MediaWiki\MediaWikiServices;
14use MediaWiki\WikiMap\WikiMap;
15use RowUpdateGenerator;
16use RuntimeException;
17use stdClass;
18use Wikimedia\Rdbms\IDatabase;
19use Wikimedia\Rdbms\IExpression;
20use Wikimedia\Rdbms\IReadableDatabase;
21use Wikimedia\Rdbms\LikeValue;
22
23$IP = getenv( 'MW_INSTALL_PATH' );
24if ( $IP === false ) {
25    $IP = __DIR__ . '/../../..';
26}
27
28require_once "$IP/maintenance/Maintenance.php";
29
30/**
31 * @ingroup Maintenance
32 */
33abstract class ExternalStoreMoveCluster extends Maintenance {
34    /**
35     * Must return an array in the form:
36     * [
37     *     'dbr' => IReadableDatabase object,
38     *     'dbw' => IDatabase object,
39     *     'table' => 'flow_revision',
40     *     'pk' => 'rev_id',
41     *     'content' => 'rev_content',
42     *     'flags' => 'rev_flags',
43     * ]
44     *
45     * It will roughly translate into these queries, where PK is the
46     * unique key to control batching & updates, content & flags are
47     * the columns to read from & update with new ES data.
48     * It will roughly translate into these queries:
49     *
50     * Against dbr: ('cluster' will be the argument passed to --from)
51     * SELECT <pk>, <content>, <flags>
52     * FROM <table>
53     * WHERE <flags> LIKE "%external%"
54     *     AND <content> LIKE "DB://cluster/%";
55     *
56     * Against dbw:
57     * UPDATE <table>
58     * SET <content> = ..., <flags> = ...
59     * WHERE <pk> = ...;
60     *
61     * @return array{dbr:IReadableDatabase,dbw:IDatabase,table:string,pk:string,content:string,flags:string,wiki:string}
62     */
63    abstract protected function schema();
64
65    public function __construct() {
66        parent::__construct();
67
68        $this->addDescription( 'Moves ExternalStore content from (a) particular cluster(s) to ' .
69            '(an)other(s). Just make sure all clusters are valid $wgExternalServers.' );
70
71        $this->addOption( 'from', 'ExternalStore cluster to move from (comma-separated). ' .
72            'E.g.: --from=cluster24,cluster25', true, true );
73        $this->addOption( 'to', 'ExternalStore cluster to move to (comma-separated). ' .
74            'E.g.: --to=cluster26', true, true );
75        $this->addOption( 'dry-run', 'Outputs the old user content, inserts into new ' .
76            'External Store, gives hypothetical new column values for flow_revision (but does ' .
77            'not actually change flow_revision), and checks that old and new ES are the same.' );
78
79        $this->setBatchSize( 300 );
80
81        $this->requireExtension( 'Flow' );
82    }
83
84    public function execute() {
85        $from = explode( ',', $this->getOption( 'from' ) );
86        $to = explode( ',', $this->getOption( 'to' ) );
87
88        $schema = $this->schema();
89        /** @var IReadableDatabase $dbr */
90        $dbr = $schema['dbr'];
91        /** @var IDatabase $dbw */
92        $dbw = $schema['dbw'];
93
94        $iterator = new BatchRowIterator( $dbr, $schema['table'], $schema['pk'], $this->getBatchSize() );
95        $iterator->setFetchColumns( [ $schema['content'], $schema['flags'] ] );
96
97        $clusterConditions = [];
98        foreach ( $from as $cluster ) {
99            $clusterConditions[] = $dbr->expr( $schema['content'], IExpression::LIKE,
100                new LikeValue( "DB://$cluster/", $dbr->anyString() ) );
101        }
102        $iterator->addConditions( [
103            $schema['wiki'] => WikiMap::getCurrentWikiId(),
104            $dbr->expr( $schema['flags'], IExpression::LIKE, new LikeValue( $dbr->anyString(), 'external', $dbr->anyString() ) ),
105            $dbr->orExpr( $clusterConditions ),
106        ] );
107
108        $iterator->setCaller( __METHOD__ );
109
110        $updateGenerator = new ExternalStoreUpdateGenerator( $this, $to, $schema );
111
112        if ( $this->hasOption( 'dry-run' ) ) {
113            $this->output( "Starting dry run\n\n" );
114            foreach ( $iterator as $rows ) {
115                $this->output( "Starting dry run batch\n" );
116                foreach ( $rows as $row ) {
117                    $url = $row->{$schema['content']};
118                    $flags = explode( ',', $row->{$schema['flags']} );
119
120                    $oldContent = $updateGenerator->read( $url, $flags );
121                    $this->output( "\nOld content: $oldContent\n" );
122
123                    // Update itself just generates the update, it doesn't write
124                    // to flow_revision.
125                    $updatedColumns = $updateGenerator->update( $row );
126                    $this->output( "flow_revision columns would become:\n" );
127                    $this->output( var_export( $updatedColumns, true ) . "\n" );
128
129                    $newContent = $updatedColumns[$schema['content']];
130                    $newFlags = explode( ',', $updatedColumns[$schema['flags']] );
131                    if ( in_array( 'external', $newFlags, true ) ) {
132                        $newContent = $updateGenerator->read( $newContent, $newFlags );
133                    }
134
135                    if ( $newContent === $oldContent ) {
136                        $this->output( "New external store content matches old external store content\n" );
137                    } else {
138                        $revIdStr = UUID::create( $row->rev_id )->getAlphadecimal();
139                        $this->error( "New content for ID $revIdStr does not match prior content.\n" .
140                            "New content: $newContent\nOld content: $oldContent\n\nTerminating dry run.\n",
141                            1
142                        );
143                    }
144                }
145
146                $this->output( "\n\n" );
147            }
148            $this->output( "Dry run completed\n" );
149            return;
150        }
151
152        $writer = new BatchRowWriter( $dbw, $schema['table'] );
153        $writer->setCaller( __METHOD__ );
154
155        $updater = new BatchRowUpdate(
156            $iterator,
157            $writer,
158            $updateGenerator
159        );
160        $updater->setOutput( [ $this, 'output' ] );
161        $updater->execute();
162    }
163
164    /**
165     * parent::output() is a protected method, only way to access it from a
166     * callback in php5.3 is to make a public function. In 5.4 can replace with
167     * a Closure.
168     *
169     * @param string $out
170     * @param mixed|null $channel
171     */
172    public function output( $out, $channel = null ) {
173        parent::output( $out, $channel );
174    }
175
176    /**
177     * parent::error() is a protected method, only way to access it from the
178     * outside is to make it public.
179     *
180     * @param string $err
181     * @param int $die
182     */
183    public function error( $err, $die = 0 ) {
184        parent::error( $err, $die );
185    }
186}
187
188class ExternalStoreUpdateGenerator implements RowUpdateGenerator {
189    /**
190     * @var ExternalStoreMoveCluster
191     */
192    protected $script;
193
194    /**
195     * @var array
196     */
197    protected $stores = [];
198
199    /**
200     * @var array
201     */
202    protected $schema = [];
203
204    /**
205     * @param ExternalStoreMoveCluster $script
206     * @param array $stores
207     * @param array $schema
208     */
209    public function __construct( ExternalStoreMoveCluster $script, array $stores, array $schema ) {
210        $this->script = $script;
211        $this->stores = $stores;
212        $this->schema = $schema;
213    }
214
215    /**
216     * @param stdClass $row
217     * @return array
218     */
219    public function update( $row ) {
220        $url = $row->{$this->schema['content']};
221        $flags = explode( ',', $row->{$this->schema['flags']} );
222
223        try {
224            $content = $this->read( $url, $flags );
225            $data = $this->write( $content, $flags );
226        } catch ( \Exception $e ) {
227            // something went wrong, just output the error & don't update!
228            $this->script->error( $e->getMessage() . "\n" );
229            return [];
230        }
231
232        return [
233            $this->schema['content'] => $data['content'],
234            $this->schema['flags'] => implode( ',', $data['flags'] ),
235        ];
236    }
237
238    /**
239     * @param string $url
240     * @param array $flags
241     * @return string
242     */
243    public function read( $url, array $flags = [] ) {
244        $content = ExternalStore::fetchFromURL( $url );
245        if ( $content === false ) {
246            throw new RuntimeException( "Failed to fetch content from URL: $url" );
247        }
248
249        $content = MediaWikiServices::getInstance()
250            ->getBlobStoreFactory()
251            ->newSqlBlobStore()
252            ->decompressData( $content, $flags );
253        if ( $content === false ) {
254            throw new RuntimeException( "Failed to decompress content from URL: $url" );
255        }
256
257        return $content;
258    }
259
260    /**
261     * @param string $content
262     * @param array $flags
263     * @return array New ExternalStore data in the form of ['content' => ..., 'flags' => [ ... ]]
264     */
265    protected function write( $content, array $flags = [] ) {
266        // external, utf-8 & gzip flags are no longer valid at this point
267        $oldFlags = array_diff( $flags, [ 'external', 'utf-8', 'gzip' ] );
268
269        if ( $content === '' ) {
270            // don't store empty content elsewhere
271            return [
272                'content' => $content,
273                'flags' => $oldFlags,
274            ];
275        }
276
277        // re-compress (if $wgCompressRevisions is enabled) the content & set flags accordingly
278        $compressed = MediaWikiServices::getInstance()
279            ->getBlobStoreFactory()
280            ->newSqlBlobStore()
281            ->compressData( $content );
282        $flags = array_filter( explode( ',', $compressed ) );
283
284        // ExternalStore::insertWithFallback expects stores with protocol
285        $stores = [];
286        foreach ( $this->stores as $store ) {
287            $stores[] = 'DB://' . $store;
288        }
289        $url = ExternalStore::insertWithFallback( $stores, $content );
290        if ( $url === false ) {
291            throw new RuntimeException( 'Failed to write content to stores ' . json_encode( $stores ) );
292        }
293
294        // add flag indicating content is external again, and restore unrelated flags
295        $flags[] = 'external';
296        $flags = array_merge( $flags, $oldFlags );
297
298        return [
299            'content' => $url,
300            'flags' => array_unique( $flags ),
301        ];
302    }
303}
304
305class FlowExternalStoreMoveCluster extends ExternalStoreMoveCluster {
306    protected function schema() {
307        $container = Container::getContainer();
308        /** @var DbFactory $dbFactory */
309        $dbFactory = $container['db.factory'];
310
311        return [
312            'dbr' => $dbFactory->getDB( DB_REPLICA ),
313            'dbw' => $dbFactory->getDB( DB_PRIMARY ),
314            'table' => 'flow_revision',
315            'pk' => 'rev_id',
316            'content' => 'rev_content',
317            'flags' => 'rev_flags',
318            'wiki' => 'rev_user_wiki',
319        ];
320    }
321}
322
323$maintClass = FlowExternalStoreMoveCluster::class;
324require_once RUN_MAINTENANCE_IF_MAIN;