Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 132
0.00% covered (danger)
0.00%
0 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 3
ExternalStoreMoveCluster
0.00% covered (danger)
0.00%
0 / 66
0.00% covered (danger)
0.00%
0 / 4
110
0.00% covered (danger)
0.00%
0 / 1
 schema
n/a
0 / 0
n/a
0 / 0
0
 __construct
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
2
 execute
0.00% covered (danger)
0.00%
0 / 52
0.00% covered (danger)
0.00%
0 / 1
56
 output
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 error
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
ExternalStoreUpdateGenerator
0.00% covered (danger)
0.00%
0 / 49
0.00% covered (danger)
0.00%
0 / 4
110
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 update
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 read
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 write
0.00% covered (danger)
0.00%
0 / 24
0.00% covered (danger)
0.00%
0 / 1
20
FlowExternalStoreMoveCluster
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 1
 schema
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3namespace Flow\Maintenance;
4
5use BatchRowIterator;
6use BatchRowUpdate;
7use BatchRowWriter;
8use ExternalStore;
9use Flow\Container;
10use Flow\DbFactory;
11use Flow\Model\UUID;
12use Maintenance;
13use MediaWiki\MediaWikiServices;
14use MediaWiki\WikiMap\WikiMap;
15use RowUpdateGenerator;
16use RuntimeException;
17use stdClass;
18use Wikimedia\Rdbms\IDatabase;
19
20$IP = getenv( 'MW_INSTALL_PATH' );
21if ( $IP === false ) {
22    $IP = __DIR__ . '/../../..';
23}
24
25require_once "$IP/maintenance/Maintenance.php";
26
27/**
28 * @ingroup Maintenance
29 */
30abstract class ExternalStoreMoveCluster extends Maintenance {
31    /**
32     * Must return an array in the form:
33     * [
34     *     'dbr' => IDatabase object,
35     *     'dbw' => IDatabase object,
36     *     'table' => 'flow_revision',
37     *     'pk' => 'rev_id',
38     *     'content' => 'rev_content',
39     *     'flags' => 'rev_flags',
40     * ]
41     *
42     * It will roughly translate into these queries, where PK is the
43     * unique key to control batching & updates, content & flags are
44     * the columns to read from & update with new ES data.
45     * It will roughly translate into these queries:
46     *
47     * Against dbr: ('cluster' will be the argument passed to --from)
48     * SELECT <pk>, <content>, <flags>
49     * FROM <table>
50     * WHERE <flags> LIKE "%external%"
51     *     AND <content> LIKE "DB://cluster/%";
52     *
53     * Against dbw:
54     * UPDATE <table>
55     * SET <content> = ..., <flags> = ...
56     * WHERE <pk> = ...;
57     *
58     * @return array
59     */
60    abstract protected function schema();
61
62    public function __construct() {
63        parent::__construct();
64
65        $this->addDescription( 'Moves ExternalStore content from (a) particular cluster(s) to ' .
66            '(an)other(s). Just make sure all clusters are valid $wgExternalServers.' );
67
68        $this->addOption( 'from', 'ExternalStore cluster to move from (comma-separated). ' .
69            'E.g.: --from=cluster24,cluster25', true, true );
70        $this->addOption( 'to', 'ExternalStore cluster to move to (comma-separated). ' .
71            'E.g.: --to=cluster26', true, true );
72        $this->addOption( 'dry-run', 'Outputs the old user content, inserts into new ' .
73            'External Store, gives hypothetical new column values for flow_revision (but does ' .
74            'not actually change flow_revision), and checks that old and new ES are the same.' );
75
76        $this->setBatchSize( 300 );
77
78        $this->requireExtension( 'Flow' );
79    }
80
81    public function execute() {
82        $from = explode( ',', $this->getOption( 'from' ) );
83        $to = explode( ',', $this->getOption( 'to' ) );
84
85        $schema = $this->schema();
86        /** @var IDatabase $dbr */
87        $dbr = $schema['dbr'];
88        /** @var IDatabase $dbw */
89        $dbw = $schema['dbw'];
90
91        $iterator = new BatchRowIterator( $dbr, $schema['table'], $schema['pk'], $this->getBatchSize() );
92        $iterator->setFetchColumns( [ $schema['content'], $schema['flags'] ] );
93
94        $clusterConditions = [];
95        foreach ( $from as $cluster ) {
96            $clusterConditions[] = $schema['content'] . $dbr->buildLike( "DB://$cluster/", $dbr->anyString() );
97        }
98        $iterator->addConditions( [
99            $schema['wiki'] => WikiMap::getCurrentWikiId(),
100            $schema['flags'] . $dbr->buildLike( $dbr->anyString(), 'external', $dbr->anyString() ),
101            $dbr->makeList( $clusterConditions, LIST_OR ),
102        ] );
103
104        $iterator->setCaller( __METHOD__ );
105
106        $updateGenerator = new ExternalStoreUpdateGenerator( $this, $to, $schema );
107
108        if ( $this->hasOption( 'dry-run' ) ) {
109            $this->output( "Starting dry run\n\n" );
110            foreach ( $iterator as $rows ) {
111                $this->output( "Starting dry run batch\n" );
112                foreach ( $rows as $row ) {
113                    $url = $row->{$schema['content']};
114                    $flags = explode( ',', $row->{$schema['flags']} );
115
116                    $oldContent = $updateGenerator->read( $url, $flags );
117                    $this->output( "\nOld content: $oldContent\n" );
118
119                    // Update itself just generates the update, it doesn't write
120                    // to flow_revision.
121                    $updatedColumns = $updateGenerator->update( $row );
122                    $this->output( "flow_revision columns would become:\n" );
123                    $this->output( var_export( $updatedColumns, true ) . "\n" );
124
125                    $newContent = $updatedColumns[$schema['content']];
126                    $newFlags = explode( ',', $updatedColumns[$schema['flags']] );
127                    if ( in_array( 'external', $newFlags, true ) ) {
128                        $newContent = $updateGenerator->read( $newContent, $newFlags );
129                    }
130
131                    if ( $newContent === $oldContent ) {
132                        $this->output( "New external store content matches old external store content\n" );
133                    } else {
134                        $revIdStr = UUID::create( $row->rev_id )->getAlphadecimal();
135                        $this->error( "New content for ID $revIdStr does not match prior content.\n" .
136                            "New content: $newContent\nOld content: $oldContent\n\nTerminating dry run.\n",
137                            1
138                        );
139                    }
140                }
141
142                $this->output( "\n\n" );
143            }
144            $this->output( "Dry run completed\n" );
145            return;
146        }
147
148        $writer = new BatchRowWriter( $dbw, $schema['table'] );
149        $writer->setCaller( __METHOD__ );
150
151        $updater = new BatchRowUpdate(
152            $iterator,
153            $writer,
154            $updateGenerator
155        );
156        $updater->setOutput( [ $this, 'output' ] );
157        $updater->execute();
158    }
159
160    /**
161     * parent::output() is a protected method, only way to access it from a
162     * callback in php5.3 is to make a public function. In 5.4 can replace with
163     * a Closure.
164     *
165     * @param string $out
166     * @param mixed|null $channel
167     */
168    public function output( $out, $channel = null ) {
169        parent::output( $out, $channel );
170    }
171
172    /**
173     * parent::error() is a protected method, only way to access it from the
174     * outside is to make it public.
175     *
176     * @param string $err
177     * @param int $die
178     */
179    public function error( $err, $die = 0 ) {
180        parent::error( $err, $die );
181    }
182}
183
184class ExternalStoreUpdateGenerator implements RowUpdateGenerator {
185    /**
186     * @var ExternalStoreMoveCluster
187     */
188    protected $script;
189
190    /**
191     * @var array
192     */
193    protected $stores = [];
194
195    /**
196     * @var array
197     */
198    protected $schema = [];
199
200    /**
201     * @param ExternalStoreMoveCluster $script
202     * @param array $stores
203     * @param array $schema
204     */
205    public function __construct( ExternalStoreMoveCluster $script, array $stores, array $schema ) {
206        $this->script = $script;
207        $this->stores = $stores;
208        $this->schema = $schema;
209    }
210
211    /**
212     * @param stdClass $row
213     * @return array
214     */
215    public function update( $row ) {
216        $url = $row->{$this->schema['content']};
217        $flags = explode( ',', $row->{$this->schema['flags']} );
218
219        try {
220            $content = $this->read( $url, $flags );
221            $data = $this->write( $content, $flags );
222        } catch ( \Exception $e ) {
223            // something went wrong, just output the error & don't update!
224            $this->script->error( $e->getMessage() . "\n" );
225            return [];
226        }
227
228        return [
229            $this->schema['content'] => $data['content'],
230            $this->schema['flags'] => implode( ',', $data['flags'] ),
231        ];
232    }
233
234    /**
235     * @param string $url
236     * @param array $flags
237     * @return string
238     */
239    public function read( $url, array $flags = [] ) {
240        $content = ExternalStore::fetchFromURL( $url );
241        if ( $content === false ) {
242            throw new RuntimeException( "Failed to fetch content from URL: $url" );
243        }
244
245        $content = MediaWikiServices::getInstance()
246            ->getBlobStoreFactory()
247            ->newSqlBlobStore()
248            ->decompressData( $content, $flags );
249        if ( $content === false ) {
250            throw new RuntimeException( "Failed to decompress content from URL: $url" );
251        }
252
253        return $content;
254    }
255
256    /**
257     * @param string $content
258     * @param array $flags
259     * @return array New ExternalStore data in the form of ['content' => ..., 'flags' => [ ... ]]
260     */
261    protected function write( $content, array $flags = [] ) {
262        // external, utf-8 & gzip flags are no longer valid at this point
263        $oldFlags = array_diff( $flags, [ 'external', 'utf-8', 'gzip' ] );
264
265        if ( $content === '' ) {
266            // don't store empty content elsewhere
267            return [
268                'content' => $content,
269                'flags' => $oldFlags,
270            ];
271        }
272
273        // re-compress (if $wgCompressRevisions is enabled) the content & set flags accordingly
274        $compressed = MediaWikiServices::getInstance()
275            ->getBlobStoreFactory()
276            ->newSqlBlobStore()
277            ->compressData( $content );
278        $flags = array_filter( explode( ',', $compressed ) );
279
280        // ExternalStore::insertWithFallback expects stores with protocol
281        $stores = [];
282        foreach ( $this->stores as $store ) {
283            $stores[] = 'DB://' . $store;
284        }
285        $url = ExternalStore::insertWithFallback( $stores, $content );
286        if ( $url === false ) {
287            throw new RuntimeException( 'Failed to write content to stores ' . json_encode( $stores ) );
288        }
289
290        // add flag indicating content is external again, and restore unrelated flags
291        $flags[] = 'external';
292        $flags = array_merge( $flags, $oldFlags );
293
294        return [
295            'content' => $url,
296            'flags' => array_unique( $flags ),
297        ];
298    }
299}
300
301class FlowExternalStoreMoveCluster extends ExternalStoreMoveCluster {
302    protected function schema() {
303        $container = Container::getContainer();
304        /** @var DbFactory $dbFactory */
305        $dbFactory = $container['db.factory'];
306
307        return [
308            'dbr' => $dbFactory->getDB( DB_REPLICA ),
309            'dbw' => $dbFactory->getDB( DB_PRIMARY ),
310            'table' => 'flow_revision',
311            'pk' => 'rev_id',
312            'content' => 'rev_content',
313            'flags' => 'rev_flags',
314            'wiki' => 'rev_user_wiki',
315        ];
316    }
317}
318
319$maintClass = FlowExternalStoreMoveCluster::class;
320require_once RUN_MAINTENANCE_IF_MAIN;