Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 273
0.00% covered (danger)
0.00%
0 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
MysqlReplicationReporter
0.00% covered (danger)
0.00%
0 / 273
0.00% covered (danger)
0.00%
0 / 15
3540
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 doGetLag
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getLagFromSlaveStatus
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 getLagFromPtHeartbeat
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
20
 fetchSecondsSinceHeartbeat
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
12
 getApproximateLagStatus
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 getReplicationSafetyInfo
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 useGTIDs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 primaryPosWait
0.00% covered (danger)
0.00%
0 / 98
0.00% covered (danger)
0.00%
0 / 1
182
 getReplicaPos
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
56
 getPrimaryPos
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
110
 getServerId
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
2
 getServerUUID
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
6
 getServerGTIDs
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
12
 getServerRoleStatus
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms\Replication;
21
22use InvalidArgumentException;
23use RuntimeException;
24use stdClass;
25use Wikimedia\Rdbms\DBPrimaryPos;
26use Wikimedia\Rdbms\DBQueryError;
27use Wikimedia\Rdbms\IDatabase;
28use Wikimedia\Rdbms\MySQLPrimaryPos;
29use Wikimedia\Rdbms\Platform\ISQLPlatform;
30use Wikimedia\Rdbms\Query;
31
32/**
33 * @internal
34 * @ingroup Database
35 * @since 1.40
36 */
37class MysqlReplicationReporter extends ReplicationReporter {
38    /** @var MySQLPrimaryPos */
39    protected $lastKnownReplicaPos;
40    /** @var string Method to detect replica DB lag */
41    protected $lagDetectionMethod;
42    /** @var array Method to detect replica DB lag */
43    protected $lagDetectionOptions = [];
44    /** @var bool bool Whether to use GTID methods */
45    protected $useGTIDs = false;
46    /** @var stdClass|null */
47    private $replicationInfoRow;
48    // Cache getServerId() for 24 hours
49    private const SERVER_ID_CACHE_TTL = 86400;
50
51    /** @var float Warn if lag estimates are made for transactions older than this many seconds */
52    private const LAG_STALE_WARN_THRESHOLD = 0.100;
53
54    public function __construct(
55        $topologyRole,
56        $logger,
57        $srvCache,
58        $lagDetectionMethod,
59        $lagDetectionOptions,
60        $useGTIDs
61    ) {
62        parent::__construct( $topologyRole, $logger, $srvCache );
63        $this->lagDetectionMethod = $lagDetectionMethod;
64        $this->lagDetectionOptions = $lagDetectionOptions;
65        $this->useGTIDs = $useGTIDs;
66    }
67
68    protected function doGetLag( IDatabase $conn ) {
69        if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
70            return $this->getLagFromPtHeartbeat( $conn );
71        } else {
72            return $this->getLagFromSlaveStatus( $conn );
73        }
74    }
75
76    /**
77     * @param IDatabase $conn To make queries
78     * @return int|false Second of lag
79     */
80    protected function getLagFromSlaveStatus( IDatabase $conn ) {
81        $query = new Query(
82            'SHOW SLAVE STATUS',
83            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
84            'SHOW',
85            null,
86            'SHOW SLAVE STATUS'
87        );
88        $res = $conn->query( $query );
89        $row = $res ? $res->fetchObject() : false;
90        // If the server is not replicating, there will be no row
91        if ( $row && strval( $row->Seconds_Behind_Master ) !== '' ) {
92            // https://mariadb.com/kb/en/delayed-replication/
93            // https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
94            return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
95        }
96
97        return false;
98    }
99
100    /**
101     * @param IDatabase $conn To make queries
102     * @return float|false Seconds of lag
103     */
104    protected function getLagFromPtHeartbeat( IDatabase $conn ) {
105        $currentTrxInfo = $this->getRecordedTransactionLagStatus( $conn );
106        if ( $currentTrxInfo ) {
107            // There is an active transaction and the initial lag was already queried
108            $staleness = microtime( true ) - $currentTrxInfo['since'];
109            if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
110                // Avoid returning higher and higher lag value due to snapshot age
111                // given that the isolation level will typically be REPEATABLE-READ
112                // but UTC_TIMESTAMP() is not affected by point-in-time snapshots
113                $this->logger->warning(
114                    "Using cached lag value for {db_server} due to active transaction",
115                    $this->getLogContext( $conn, [
116                        'method' => __METHOD__,
117                        'age' => $staleness,
118                        'exception' => new RuntimeException()
119                    ] )
120                );
121            }
122
123            return $currentTrxInfo['lag'];
124        }
125
126        $ago = $this->fetchSecondsSinceHeartbeat( $conn );
127        if ( $ago !== null ) {
128            return max( $ago, 0.0 );
129        }
130
131        $this->logger->error(
132            "Unable to find pt-heartbeat row for {db_server}",
133            $this->getLogContext( $conn, [
134                'method' => __METHOD__
135            ] )
136        );
137
138        return false;
139    }
140
141    /**
142     * @param IDatabase $conn To make queries
143     * @return float|null Elapsed seconds since the newest beat or null if none was found
144     * @see https://www.percona.com/doc/percona-toolkit/2.1/pt-heartbeat.html
145     */
146    protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
147        // Some setups might have pt-heartbeat running on each replica server.
148        // Exclude the row for events originating on this DB server. Assume that
149        // there is only one active replication channel and that any other row
150        // getting updates must be the row for the primary DB server.
151        $where = $conn->makeList(
152            $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ],
153            ISQLPlatform::LIST_AND
154        );
155        // User mysql server time so that query time and trip time are not counted.
156        // Use ORDER BY for channel based queries since that field might not be UNIQUE.
157        $query = new Query(
158            "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
159            "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
160            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
161            'SELECT',
162            null,
163            "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
164            "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
165        );
166        $res = $conn->query( $query );
167        $row = $res ? $res->fetchObject() : false;
168
169        return $row ? ( $row->us_ago / 1e6 ) : null;
170    }
171
172    public function getApproximateLagStatus( IDatabase $conn ) {
173        if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
174            // Disable caching since this is fast enough and we don't want
175            // to be *too* pessimistic by having both the cache TTL and the
176            // pt-heartbeat interval count as lag in getSessionLagStatus()
177            return parent::getApproximateLagStatus( $conn );
178        }
179
180        $key = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
181        $approxLag = $this->srvCache->get( $key );
182        if ( !$approxLag ) {
183            $approxLag = parent::getApproximateLagStatus( $conn );
184            $this->srvCache->set( $key, $approxLag, 1 );
185        }
186
187        return $approxLag;
188    }
189
190    /**
191     * @param IDatabase $conn To make queries
192     * @param string $fname
193     * @return stdClass Process cached row
194     */
195    public function getReplicationSafetyInfo( IDatabase $conn, $fname = __METHOD__ ) {
196        if ( $this->replicationInfoRow === null ) {
197            $this->replicationInfoRow = $conn->selectRow(
198                [],
199                [
200                    'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
201                    'binlog_format' => '@@binlog_format',
202                ],
203                [],
204                $fname
205            );
206        }
207        return $this->replicationInfoRow;
208    }
209
210    /**
211     * @return bool Whether GTID support is used (mockable for testing)
212     */
213    protected function useGTIDs() {
214        return $this->useGTIDs;
215    }
216
217    public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
218        if ( !( $pos instanceof MySQLPrimaryPos ) ) {
219            throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
220        }
221
222        if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
223            $this->logger->debug(
224                "Bypassed replication wait; database has a static dataset",
225                $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
226            );
227
228            return 0; // this is a copy of a read-only dataset with no primary DB
229        } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
230            $this->logger->debug(
231                "Bypassed replication wait; replication known to have reached {raw_pos}",
232                $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
233            );
234
235            return 0; // already reached this point for sure
236        }
237
238        // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
239        if ( $pos->getGTIDs() ) {
240            // Get the GTIDs from this replica server too see the domains (channels)
241            $refPos = $this->getReplicaPos( $conn );
242            if ( !$refPos ) {
243                $this->logger->error(
244                    "Could not get replication position on replica DB to compare to {raw_pos}",
245                    $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
246                );
247
248                return -1; // this is the primary DB itself?
249            }
250            // GTIDs with domains (channels) that are active and are present on the replica
251            $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
252            if ( !$gtidsWait ) {
253                $this->logger->error(
254                    "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
255                    $this->getLogContext( $conn, [
256                        'method' => __METHOD__,
257                        'raw_pos' => $pos,
258                        'current_pos' => $refPos
259                    ] )
260                );
261
262                return -1; // $pos is from the wrong cluster?
263            }
264            // Wait on the GTID set
265            $gtidArg = $conn->addQuotes( implode( ',', $gtidsWait ) );
266            if ( strpos( $gtidArg, ':' ) !== false ) {
267                // MySQL GTIDs, e.g "source_id:transaction_id"
268                $query = new Query(
269                    "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg$timeout)",
270                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
271                    'SELECT',
272                    null,
273                    "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
274                );
275            } else {
276                // MariaDB GTIDs, e.g."domain:server:sequence"
277                $query = new Query(
278                    "SELECT MASTER_GTID_WAIT($gtidArg$timeout)",
279                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
280                    'SELECT',
281                    null,
282                    "SELECT MASTER_GTID_WAIT(?, ?)"
283                );
284            }
285            $waitPos = implode( ',', $gtidsWait );
286        } else {
287            // Wait on the binlog coordinates
288            $encFile = $conn->addQuotes( $pos->getLogFile() );
289            // @phan-suppress-next-line PhanTypeArraySuspiciousNullable
290            $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
291            $query = new Query(
292                "SELECT MASTER_POS_WAIT($encFile$encPos$timeout)",
293                ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
294                'SELECT',
295                null,
296                "SELECT MASTER_POS_WAIT(?, ?, ?)"
297            );
298            $waitPos = $pos->__toString();
299        }
300
301        $start = microtime( true );
302        $res = $conn->query( $query, __METHOD__ );
303        $row = $res->fetchRow();
304        $seconds = max( microtime( true ) - $start, 0 );
305
306        // Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
307        $status = ( $row[0] !== null ) ? intval( $row[0] ) : null;
308        if ( $status === null ) {
309            $this->logger->error(
310                "An error occurred while waiting for replication to reach {wait_pos}",
311                $this->getLogContext( $conn, [
312                    'raw_pos' => $pos,
313                    'wait_pos' => $waitPos,
314                    'sql' => $query->getSQL(),
315                    'seconds_waited' => $seconds,
316                    'exception' => new RuntimeException()
317                ] )
318            );
319        } elseif ( $status < 0 ) {
320            $this->logger->info(
321                "Timed out waiting for replication to reach {wait_pos}",
322                $this->getLogContext( $conn, [
323                    'raw_pos' => $pos,
324                    'wait_pos' => $waitPos,
325                    'timeout' => $timeout,
326                    'sql' => $query->getSQL(),
327                    'seconds_waited' => $seconds,
328                ] )
329            );
330        } elseif ( $status >= 0 ) {
331            $this->logger->debug(
332                "Replication has reached {wait_pos}",
333                $this->getLogContext( $conn, [
334                    'raw_pos' => $pos,
335                    'wait_pos' => $waitPos,
336                    'seconds_waited' => $seconds,
337                ] )
338            );
339            // Remember that this position was reached to save queries next time
340            $this->lastKnownReplicaPos = $pos;
341        }
342
343        return $status;
344    }
345
346    /**
347     * Get the position of the primary DB from SHOW SLAVE STATUS
348     *
349     * @param IDatabase $conn To make queries
350     * @return MySQLPrimaryPos|false
351     */
352    public function getReplicaPos( IDatabase $conn ) {
353        $now = microtime( true ); // as-of-time *before* fetching GTID variables
354
355        if ( $this->useGTIDs() ) {
356            // Try to use GTIDs, fallbacking to binlog positions if not possible
357            $data = $this->getServerGTIDs( $conn, __METHOD__ );
358            // Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
359            foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $name ) {
360                if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
361                    return new MySQLPrimaryPos( $data[$name], $now );
362                }
363            }
364        }
365
366        $data = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
367        if ( $data && strlen( $data['Relay_Master_Log_File'] ) ) {
368            return new MySQLPrimaryPos(
369                "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
370                $now
371            );
372        }
373
374        return false;
375    }
376
377    /**
378     * Get the position of the primary DB from SHOW MASTER STATUS
379     *
380     * @param IDatabase $conn To make queries
381     * @return MySQLPrimaryPos|false
382     */
383    public function getPrimaryPos( IDatabase $conn ) {
384        $now = microtime( true ); // as-of-time *before* fetching GTID variables
385
386        $pos = false;
387        if ( $this->useGTIDs() ) {
388            // Try to use GTIDs, fallbacking to binlog positions if not possible
389            $data = $this->getServerGTIDs( $conn, __METHOD__ );
390            // Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
391            foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $name ) {
392                if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
393                    $pos = new MySQLPrimaryPos( $data[$name], $now );
394                    break;
395                }
396            }
397            // Filter domains that are inactive or not relevant to the session
398            if ( $pos ) {
399                $pos->setActiveOriginServerId( $this->getServerId( $conn ) );
400                $pos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
401                if ( isset( $data['gtid_domain_id'] ) ) {
402                    $pos->setActiveDomain( $data['gtid_domain_id'] );
403                }
404            }
405        }
406
407        if ( !$pos ) {
408            $data = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
409            if ( $data && strlen( $data['File'] ) ) {
410                $pos = new MySQLPrimaryPos( "{$data['File']}/{$data['Position']}", $now );
411            }
412        }
413
414        return $pos;
415    }
416
417    /**
418     * @param IDatabase $conn To make queries
419     * @return string Value of server_id (32-bit integer, unique to the replication topology)
420     * @throws DBQueryError
421     */
422    protected function getServerId( IDatabase $conn ) {
423        $fname = __METHOD__;
424        return $this->srvCache->getWithSetCallback(
425            $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() ),
426            self::SERVER_ID_CACHE_TTL,
427            static function () use ( $conn, $fname ) {
428                $query = new Query(
429                    "SELECT @@server_id AS id",
430                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
431                    'SELECT',
432                    null,
433                    "SELECT @@server_id AS id"
434                );
435                $res = $conn->query( $query, $fname );
436
437                return $res->fetchObject()->id;
438            }
439        );
440    }
441
442    /**
443     * @param IDatabase $conn To make queries
444     * @return string|null Value of server_uuid (hyphenated 128-bit hex string, globally unique)
445     * @throws DBQueryError
446     */
447    protected function getServerUUID( IDatabase $conn ) {
448        $fname = __METHOD__;
449        return $this->srvCache->getWithSetCallback(
450            $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() ),
451            self::SERVER_ID_CACHE_TTL,
452            static function () use ( $conn, $fname ) {
453                $query = new Query(
454                    "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
455                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
456                    'SHOW',
457                    null,
458                    "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
459                );
460                $res = $conn->query( $query, $fname );
461                $row = $res->fetchObject();
462
463                return $row ? $row->Value : null;
464            }
465        );
466    }
467
468    /**
469     * @param IDatabase $conn To make queries
470     * @param string $fname
471     * @return string[]
472     */
473    protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
474        $map = [];
475
476        $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
477
478        // Get global-only variables like gtid_executed
479        $query = new Query(
480            "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
481            $flags,
482            'SHOW',
483            null,
484            "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
485        );
486        $res = $conn->query( $query, $fname );
487        foreach ( $res as $row ) {
488            $map[$row->Variable_name] = $row->Value;
489        }
490        // Get session-specific (e.g. gtid_domain_id since that is were writes will log)
491        $query = new Query(
492            "SHOW SESSION VARIABLES LIKE 'gtid_%'",
493            $flags,
494            'SHOW',
495            null,
496            "SHOW SESSION VARIABLES LIKE 'gtid_%'"
497        );
498        $res = $conn->query( $query, $fname );
499        foreach ( $res as $row ) {
500            $map[$row->Variable_name] = $row->Value;
501        }
502
503        return $map;
504    }
505
506    /**
507     * @param IDatabase $conn To make queries
508     * @param string $role One of "MASTER"/"SLAVE"
509     * @param string $fname
510     * @return array<string,mixed>|null Latest available server status row; false on failure
511     */
512    protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
513        $query = new Query(
514            "SHOW $role STATUS",
515            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
516            'SHOW',
517            null,
518            "SHOW $role STATUS"
519        );
520        $res = $conn->query( $query, $fname );
521        $row = $res ? $res->fetchRow() : false;
522
523        return ( $row ?: null );
524    }
525
526}