Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 274
0.00% covered (danger)
0.00%
0 / 16
CRAP
0.00% covered (danger)
0.00%
0 / 1
MysqlReplicationReporter
0.00% covered (danger)
0.00%
0 / 274
0.00% covered (danger)
0.00%
0 / 16
3660
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 doGetLag
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getLagFromSlaveStatus
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 getLagFromPtHeartbeat
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
20
 fetchSecondsSinceHeartbeat
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
12
 getApproximateLagStatus
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
12
 getReplicationSafetyInfo
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
6
 useGTIDs
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 primaryPosWait
0.00% covered (danger)
0.00%
0 / 98
0.00% covered (danger)
0.00%
0 / 1
182
 getReplicaPos
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
56
 getPrimaryPos
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
110
 getTopologyBasedServerId
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getServerId
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
2
 getServerUUID
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
6
 getServerGTIDs
0.00% covered (danger)
0.00%
0 / 23
0.00% covered (danger)
0.00%
0 / 1
12
 getServerRoleStatus
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
1<?php
2/**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20namespace Wikimedia\Rdbms\Replication;
21
22use InvalidArgumentException;
23use RuntimeException;
24use stdClass;
25use Wikimedia\Rdbms\DBPrimaryPos;
26use Wikimedia\Rdbms\DBQueryError;
27use Wikimedia\Rdbms\IDatabase;
28use Wikimedia\Rdbms\MySQLPrimaryPos;
29use Wikimedia\Rdbms\Platform\ISQLPlatform;
30use Wikimedia\Rdbms\Query;
31
32/**
33 * @internal
34 * @ingroup Database
35 * @since 1.40
36 */
37class MysqlReplicationReporter extends ReplicationReporter {
38    /** @var MySQLPrimaryPos */
39    protected $lastKnownReplicaPos;
40    /** @var string Method to detect replica DB lag */
41    protected $lagDetectionMethod;
42    /** @var array Method to detect replica DB lag */
43    protected $lagDetectionOptions = [];
44    /** @var bool bool Whether to use GTID methods */
45    protected $useGTIDs = false;
46    /** @var stdClass|null */
47    private $replicationInfoRow;
48    // Cache getServerId() for 24 hours
49    private const SERVER_ID_CACHE_TTL = 86400;
50
51    /** @var float Warn if lag estimates are made for transactions older than this many seconds */
52    private const LAG_STALE_WARN_THRESHOLD = 0.100;
53
54    public function __construct(
55        $topologyRole,
56        $logger,
57        $srvCache,
58        $lagDetectionMethod,
59        $lagDetectionOptions,
60        $useGTIDs
61    ) {
62        parent::__construct( $topologyRole, $logger, $srvCache );
63        $this->lagDetectionMethod = $lagDetectionMethod;
64        $this->lagDetectionOptions = $lagDetectionOptions;
65        $this->useGTIDs = $useGTIDs;
66    }
67
68    protected function doGetLag( IDatabase $conn ) {
69        if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
70            return $this->getLagFromPtHeartbeat( $conn );
71        } else {
72            return $this->getLagFromSlaveStatus( $conn );
73        }
74    }
75
76    /**
77     * @param IDatabase $conn To make queries
78     * @return int|false Second of lag
79     */
80    protected function getLagFromSlaveStatus( IDatabase $conn ) {
81        $query = new Query(
82            'SHOW SLAVE STATUS',
83            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
84            'SHOW',
85            null,
86            'SHOW SLAVE STATUS'
87        );
88        $res = $conn->query( $query );
89        $row = $res ? $res->fetchObject() : false;
90        // If the server is not replicating, there will be no row
91        if ( $row && strval( $row->Seconds_Behind_Master ) !== '' ) {
92            // https://mariadb.com/kb/en/delayed-replication/
93            // https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
94            return intval( $row->Seconds_Behind_Master + ( $row->SQL_Remaining_Delay ?? 0 ) );
95        }
96
97        return false;
98    }
99
100    /**
101     * @param IDatabase $conn To make queries
102     * @return float|false Seconds of lag
103     */
104    protected function getLagFromPtHeartbeat( IDatabase $conn ) {
105        $currentTrxInfo = $this->getRecordedTransactionLagStatus( $conn );
106        if ( $currentTrxInfo ) {
107            // There is an active transaction and the initial lag was already queried
108            $staleness = microtime( true ) - $currentTrxInfo['since'];
109            if ( $staleness > self::LAG_STALE_WARN_THRESHOLD ) {
110                // Avoid returning higher and higher lag value due to snapshot age
111                // given that the isolation level will typically be REPEATABLE-READ
112                // but UTC_TIMESTAMP() is not affected by point-in-time snapshots
113                $this->logger->warning(
114                    "Using cached lag value for {db_server} due to active transaction",
115                    $this->getLogContext( $conn, [
116                        'method' => __METHOD__,
117                        'age' => $staleness,
118                        'exception' => new RuntimeException()
119                    ] )
120                );
121            }
122
123            return $currentTrxInfo['lag'];
124        }
125
126        $ago = $this->fetchSecondsSinceHeartbeat( $conn );
127        if ( $ago !== null ) {
128            return max( $ago, 0.0 );
129        }
130
131        $this->logger->error(
132            "Unable to find pt-heartbeat row for {db_server}",
133            $this->getLogContext( $conn, [
134                'method' => __METHOD__
135            ] )
136        );
137
138        return false;
139    }
140
141    /**
142     * @param IDatabase $conn To make queries
143     * @return float|null Elapsed seconds since the newest beat or null if none was found
144     * @see https://www.percona.com/doc/percona-toolkit/2.1/pt-heartbeat.html
145     */
146    protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
147        // Some setups might have pt-heartbeat running on each replica server.
148        // Exclude the row for events originating on this DB server. Assume that
149        // there is only one active replication channel and that any other row
150        // getting updates must be the row for the primary DB server.
151        $where = $conn->makeList(
152            $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ],
153            ISQLPlatform::LIST_AND
154        );
155        // User mysql server time so that query time and trip time are not counted.
156        // Use ORDER BY for channel based queries since that field might not be UNIQUE.
157        $query = new Query(
158            "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
159            "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1",
160            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
161            'SELECT',
162            null,
163            "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
164            "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1",
165        );
166        $res = $conn->query( $query );
167        $row = $res ? $res->fetchObject() : false;
168
169        return $row ? ( $row->us_ago / 1e6 ) : null;
170    }
171
172    public function getApproximateLagStatus( IDatabase $conn ) {
173        if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
174            // Disable caching since this is fast enough and we don't want
175            // to be *too* pessimistic by having both the cache TTL and the
176            // pt-heartbeat interval count as lag in getSessionLagStatus()
177            return parent::getApproximateLagStatus( $conn );
178        }
179
180        $key = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
181        $approxLag = $this->srvCache->get( $key );
182        if ( !$approxLag ) {
183            $approxLag = parent::getApproximateLagStatus( $conn );
184            $this->srvCache->set( $key, $approxLag, 1 );
185        }
186
187        return $approxLag;
188    }
189
190    /**
191     * @param IDatabase $conn To make queries
192     * @return stdClass Process cached row
193     */
194    public function getReplicationSafetyInfo( IDatabase $conn ) {
195        if ( $this->replicationInfoRow === null ) {
196            $this->replicationInfoRow = $conn->selectRow(
197                [],
198                [
199                    'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
200                    'binlog_format' => '@@binlog_format',
201                ],
202                [],
203                __METHOD__
204            );
205        }
206        return $this->replicationInfoRow;
207    }
208
209    /**
210     * @return bool Whether GTID support is used (mockable for testing)
211     */
212    protected function useGTIDs() {
213        return $this->useGTIDs;
214    }
215
216    public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
217        if ( !( $pos instanceof MySQLPrimaryPos ) ) {
218            throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
219        }
220
221        if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
222            $this->logger->debug(
223                "Bypassed replication wait; database has a static dataset",
224                $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
225            );
226
227            return 0; // this is a copy of a read-only dataset with no primary DB
228        } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
229            $this->logger->debug(
230                "Bypassed replication wait; replication known to have reached {raw_pos}",
231                $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
232            );
233
234            return 0; // already reached this point for sure
235        }
236
237        // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
238        if ( $pos->getGTIDs() ) {
239            // Get the GTIDs from this replica server too see the domains (channels)
240            $refPos = $this->getReplicaPos( $conn );
241            if ( !$refPos ) {
242                $this->logger->error(
243                    "Could not get replication position on replica DB to compare to {raw_pos}",
244                    $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
245                );
246
247                return -1; // this is the primary DB itself?
248            }
249            // GTIDs with domains (channels) that are active and are present on the replica
250            $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
251            if ( !$gtidsWait ) {
252                $this->logger->error(
253                    "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
254                    $this->getLogContext( $conn, [
255                        'method' => __METHOD__,
256                        'raw_pos' => $pos,
257                        'current_pos' => $refPos
258                    ] )
259                );
260
261                return -1; // $pos is from the wrong cluster?
262            }
263            // Wait on the GTID set
264            $gtidArg = $conn->addQuotes( implode( ',', $gtidsWait ) );
265            if ( strpos( $gtidArg, ':' ) !== false ) {
266                // MySQL GTIDs, e.g "source_id:transaction_id"
267                $query = new Query(
268                    "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg$timeout)",
269                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
270                    'SELECT',
271                    null,
272                    "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
273                );
274            } else {
275                // MariaDB GTIDs, e.g."domain:server:sequence"
276                $query = new Query(
277                    "SELECT MASTER_GTID_WAIT($gtidArg$timeout)",
278                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
279                    'SELECT',
280                    null,
281                    "SELECT MASTER_GTID_WAIT(?, ?)"
282                );
283            }
284            $waitPos = implode( ',', $gtidsWait );
285        } else {
286            // Wait on the binlog coordinates
287            $encFile = $conn->addQuotes( $pos->getLogFile() );
288            // @phan-suppress-next-line PhanTypeArraySuspiciousNullable
289            $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
290            $query = new Query(
291                "SELECT MASTER_POS_WAIT($encFile$encPos$timeout)",
292                ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
293                'SELECT',
294                null,
295                "SELECT MASTER_POS_WAIT(?, ?, ?)"
296            );
297            $waitPos = $pos->__toString();
298        }
299
300        $start = microtime( true );
301        $res = $conn->query( $query, __METHOD__ );
302        $row = $res->fetchRow();
303        $seconds = max( microtime( true ) - $start, 0 );
304
305        // Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
306        $status = ( $row[0] !== null ) ? intval( $row[0] ) : null;
307        if ( $status === null ) {
308            $this->logger->error(
309                "An error occurred while waiting for replication to reach {wait_pos}",
310                $this->getLogContext( $conn, [
311                    'raw_pos' => $pos,
312                    'wait_pos' => $waitPos,
313                    'sql' => $query->getSQL(),
314                    'seconds_waited' => $seconds,
315                    'exception' => new RuntimeException()
316                ] )
317            );
318        } elseif ( $status < 0 ) {
319            $this->logger->info(
320                "Timed out waiting for replication to reach {wait_pos}",
321                $this->getLogContext( $conn, [
322                    'raw_pos' => $pos,
323                    'wait_pos' => $waitPos,
324                    'timeout' => $timeout,
325                    'sql' => $query->getSQL(),
326                    'seconds_waited' => $seconds,
327                ] )
328            );
329        } elseif ( $status >= 0 ) {
330            $this->logger->debug(
331                "Replication has reached {wait_pos}",
332                $this->getLogContext( $conn, [
333                    'raw_pos' => $pos,
334                    'wait_pos' => $waitPos,
335                    'seconds_waited' => $seconds,
336                ] )
337            );
338            // Remember that this position was reached to save queries next time
339            $this->lastKnownReplicaPos = $pos;
340        }
341
342        return $status;
343    }
344
345    /**
346     * Get the position of the primary DB from SHOW SLAVE STATUS
347     *
348     * @param IDatabase $conn To make queries
349     * @return MySQLPrimaryPos|false
350     */
351    public function getReplicaPos( IDatabase $conn ) {
352        $now = microtime( true ); // as-of-time *before* fetching GTID variables
353
354        if ( $this->useGTIDs() ) {
355            // Try to use GTIDs, fallbacking to binlog positions if not possible
356            $data = $this->getServerGTIDs( $conn, __METHOD__ );
357            // Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
358            foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $name ) {
359                if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
360                    return new MySQLPrimaryPos( $data[$name], $now );
361                }
362            }
363        }
364
365        $data = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
366        if ( $data && strlen( $data['Relay_Master_Log_File'] ) ) {
367            return new MySQLPrimaryPos(
368                "{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
369                $now
370            );
371        }
372
373        return false;
374    }
375
376    /**
377     * Get the position of the primary DB from SHOW MASTER STATUS
378     *
379     * @param IDatabase $conn To make queries
380     * @return MySQLPrimaryPos|false
381     */
382    public function getPrimaryPos( IDatabase $conn ) {
383        $now = microtime( true ); // as-of-time *before* fetching GTID variables
384
385        $pos = false;
386        if ( $this->useGTIDs() ) {
387            // Try to use GTIDs, fallbacking to binlog positions if not possible
388            $data = $this->getServerGTIDs( $conn, __METHOD__ );
389            // Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
390            foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $name ) {
391                if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
392                    $pos = new MySQLPrimaryPos( $data[$name], $now );
393                    break;
394                }
395            }
396            // Filter domains that are inactive or not relevant to the session
397            if ( $pos ) {
398                $pos->setActiveOriginServerId( $this->getServerId( $conn ) );
399                $pos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
400                if ( isset( $data['gtid_domain_id'] ) ) {
401                    $pos->setActiveDomain( $data['gtid_domain_id'] );
402                }
403            }
404        }
405
406        if ( !$pos ) {
407            $data = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
408            if ( $data && strlen( $data['File'] ) ) {
409                $pos = new MySQLPrimaryPos( "{$data['File']}/{$data['Position']}", $now );
410            }
411        }
412
413        return $pos;
414    }
415
416    /**
417     * @inheritDoc
418     * @param IDatabase $conn To make queries
419     * @return string|null 32 bit integer ID; null if not applicable or unknown
420     */
421    public function getTopologyBasedServerId( IDatabase $conn ) {
422        return $this->getServerId( $conn );
423    }
424
425    /**
426     * @param IDatabase $conn To make queries
427     * @return string Value of server_id (32-bit integer, unique to the replication topology)
428     * @throws DBQueryError
429     */
430    protected function getServerId( IDatabase $conn ) {
431        $fname = __METHOD__;
432        return $this->srvCache->getWithSetCallback(
433            $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() ),
434            self::SERVER_ID_CACHE_TTL,
435            static function () use ( $conn, $fname ) {
436                $query = new Query(
437                    "SELECT @@server_id AS id",
438                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
439                    'SELECT',
440                    null,
441                    "SELECT @@server_id AS id"
442                );
443                $res = $conn->query( $query, $fname );
444
445                return $res->fetchObject()->id;
446            }
447        );
448    }
449
450    /**
451     * @param IDatabase $conn To make queries
452     * @return string|null Value of server_uuid (hyphenated 128-bit hex string, globally unique)
453     * @throws DBQueryError
454     */
455    protected function getServerUUID( IDatabase $conn ) {
456        $fname = __METHOD__;
457        return $this->srvCache->getWithSetCallback(
458            $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() ),
459            self::SERVER_ID_CACHE_TTL,
460            static function () use ( $conn, $fname ) {
461                $query = new Query(
462                    "SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
463                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
464                    'SHOW',
465                    null,
466                    "SHOW GLOBAL VARIABLES LIKE 'server_uuid'"
467                );
468                $res = $conn->query( $query, $fname );
469                $row = $res->fetchObject();
470
471                return $row ? $row->Value : null;
472            }
473        );
474    }
475
476    /**
477     * @param IDatabase $conn To make queries
478     * @param string $fname
479     * @return string[]
480     */
481    protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
482        $map = [];
483
484        $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
485
486        // Get global-only variables like gtid_executed
487        $query = new Query(
488            "SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
489            $flags,
490            'SHOW',
491            null,
492            "SHOW GLOBAL VARIABLES LIKE 'gtid_%'"
493        );
494        $res = $conn->query( $query, $fname );
495        foreach ( $res as $row ) {
496            $map[$row->Variable_name] = $row->Value;
497        }
498        // Get session-specific (e.g. gtid_domain_id since that is were writes will log)
499        $query = new Query(
500            "SHOW SESSION VARIABLES LIKE 'gtid_%'",
501            $flags,
502            'SHOW',
503            null,
504            "SHOW SESSION VARIABLES LIKE 'gtid_%'"
505        );
506        $res = $conn->query( $query, $fname );
507        foreach ( $res as $row ) {
508            $map[$row->Variable_name] = $row->Value;
509        }
510
511        return $map;
512    }
513
514    /**
515     * @param IDatabase $conn To make queries
516     * @param string $role One of "MASTER"/"SLAVE"
517     * @param string $fname
518     * @return array<string,mixed>|null Latest available server status row; false on failure
519     */
520    protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
521        $query = new Query(
522            "SHOW $role STATUS",
523            ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
524            'SHOW',
525            null,
526            "SHOW $role STATUS"
527        );
528        $res = $conn->query( $query, $fname );
529        $row = $res ? $res->fetchRow() : false;
530
531        return ( $row ?: null );
532    }
533
534}