MediaWiki REL1_40
DatabasePostgres.php
Go to the documentation of this file.
1<?php
20namespace Wikimedia\Rdbms;
21
22use RuntimeException;
25use Wikimedia\WaitConditionLoop;
26
/** @var int Port taken from the 'port' constructor parameter (0 when absent) */
private $port;
/** @var string|null Temporary-table schema name, cached by getCoreSchemas() */
private $tempSchema;
/** @var mixed Value of the deprecated 'keywordTableMap' constructor parameter, stored as passed */
private $keywordTableMap;
/** @var string|null Server version string, cached by getServerVersion() */
private $numericVersion;

/** @var mixed Result handle returned by the most recent pg_get_result() call of the query methods */
private $lastResultHandle;

/** @var PostgresPlatform */
protected $platform;
47
/**
 * Build the Postgres handle: remember the port, run the parent constructor
 * and then create the platform and replication reporter helpers.
 *
 * @param array $params Connection parameters. Besides what the parent
 *  constructor consumes, this reads 'port', 'keywordTableMap' (deprecated
 *  since 1.37), 'topologyRole' and 'srvCache'.
 */
public function __construct( array $params ) {
	$this->port = intval( $params['port'] ?? null );

	$hasKeywordMap = isset( $params['keywordTableMap'] );
	if ( $hasKeywordMap ) {
		wfDeprecatedMsg( 'Passing keywordTableMap parameter to ' .
			'DatabasePostgres::__construct() is deprecated', '1.37'
		);

		$this->keywordTableMap = $params['keywordTableMap'];
	}

	parent::__construct( $params );

	// The helpers below need the logger/domain set up by the parent constructor
	$platform = new PostgresPlatform(
		$this,
		$this->logger,
		$this->currentDomain,
		$this->errorLogger
	);
	$this->platform = $platform;

	$reporter = new ReplicationReporter(
		$params['topologyRole'],
		$this->logger,
		$params['srvCache']
	);
	$this->replicationReporter = $reporter;
}
80
/**
 * Get the RDBMS type of the server.
 *
 * @return string Always 'postgres'
 */
public function getType() {
	$type = 'postgres';

	return $type;
}
84
/**
 * Open a new connection to the database (closing any existing one).
 *
 * After connecting, a fixed set of session variables is applied and the core
 * schema is determined before the current domain is recorded.
 *
 * @param string|null $server Host; left out of the connection string when null or ''
 * @param string $user
 * @param string $password
 * @param string|null $db Database name; "postgres" is used when null or ''
 * @param string|null $schema Desired schema, handed to determineCoreSchema()
 * @param string $tablePrefix
 * @throws \Exception The exception built by newExceptionAfterConnectError() when the
 *  pgsql extension is missing, the connection fails, or session setup fails
 */
protected function open( $server, $user, $password, $db, $schema, $tablePrefix ) {
	if ( !function_exists( 'pg_connect' ) ) {
		throw $this->newExceptionAfterConnectError(
			"Postgres functions missing, have you compiled PHP with the --with-pgsql\n" .
			"option? (Note: if you recently installed PHP, you may need to restart your\n" .
			"webserver and database)"
		);
	}

	$this->close( __METHOD__ );

	$connectVars = [
		// A database must be specified in order to connect to Postgres. If $db is not
		// specified, then use the standard "postgres" database that should exist by default.
		'dbname' => ( $db !== null && $db !== '' ) ? $db : 'postgres',
		'user' => $user,
		'password' => $password
	];
	if ( $server !== null && $server !== '' ) {
		$connectVars['host'] = $server;
	}
	if ( $this->port > 0 ) {
		$connectVars['port'] = $this->port;
	}
	if ( $this->ssl ) {
		$connectVars['sslmode'] = 'require';
	}
	$connectString = $this->makeConnectionString( $connectVars );

	$this->installErrorHandler();
	try {
		// pg_connect() returns false on failure; normalize that to null
		$this->conn = pg_connect( $connectString, PGSQL_CONNECT_FORCE_NEW ) ?: null;
	} catch ( RuntimeException $e ) {
		$this->restoreErrorHandler();
		throw $this->newExceptionAfterConnectError( $e->getMessage() );
	}
	$error = $this->restoreErrorHandler();

	if ( !$this->conn ) {
		throw $this->newExceptionAfterConnectError( $error ?: $this->lastError() );
	}

	try {
		// Since no transaction is active at this point, any SET commands should apply
		// for the entire session (e.g. will not be reverted on transaction rollback).
		// See https://www.postgresql.org/docs/8.3/sql-set.html
		$variables = [
			'client_encoding' => 'UTF8',
			'datestyle' => 'ISO, YMD',
			'timezone' => 'GMT',
			'standard_conforming_strings' => 'on',
			'bytea_output' => 'escape',
			'client_min_messages' => 'ERROR'
		];
		foreach ( $variables as $var => $val ) {
			$this->query(
				'SET ' . $this->platform->addIdentifierQuotes( $var ) . ' = ' . $this->addQuotes( $val ),
				__METHOD__,
				self::QUERY_NO_RETRY | self::QUERY_CHANGE_TRX
			);
		}
		$this->determineCoreSchema( $schema );
		$this->currentDomain = new DatabaseDomain( $db, $schema, $tablePrefix );
		$this->platform->setCurrentDomain( $this->currentDomain );
	} catch ( RuntimeException $e ) {
		throw $this->newExceptionAfterConnectError( $e->getMessage() );
	}
}
153
/**
 * Returns true if DBs are assumed to be on potentially different servers.
 *
 * @return bool Always true for this driver
 */
public function databasesAreIndependent() {
	$independent = true;

	return $independent;
}
157
/**
 * Switch the handle to another database/schema/prefix domain.
 *
 * A different database name forces a brand new connection; otherwise only
 * the recorded domain (and the platform's copy of it) is updated.
 *
 * @param DatabaseDomain $domain
 * @return bool Always true
 */
public function doSelectDomain( DatabaseDomain $domain ) {
	$database = $domain->getDatabase();

	if ( $database !== null && $this->getDBname() !== $database ) {
		// Postgres doesn't support selectDB in the same way MySQL does.
		// So if the DB name doesn't match the open connection, open a new one
		$this->open(
			$this->connectionParams[self::CONN_HOST],
			$this->connectionParams[self::CONN_USER],
			$this->connectionParams[self::CONN_PASSWORD],
			$database,
			$domain->getSchema(),
			$domain->getTablePrefix()
		);

		return true;
	}

	if ( $database === null ) {
		// A null database means "don't care" so leave it as is and update the table prefix
		$newDomain = new DatabaseDomain(
			$this->currentDomain->getDatabase(),
			$domain->getSchema() ?? $this->currentDomain->getSchema(),
			$domain->getTablePrefix()
		);
	} else {
		// Same database as the open connection: adopt the domain as given
		$newDomain = $domain;
	}
	$this->currentDomain = $newDomain;
	$this->platform->setCurrentDomain( $this->currentDomain );

	return true;
}
186
/**
 * Serialize connection variables into a "name='value' " sequence, escaping
 * backslashes and single quotes inside each value.
 *
 * @param array $vars Map of connection keyword to value
 * @return string Each pair is followed by one space, including the last
 */
private function makeConnectionString( $vars ) {
	$connString = '';
	foreach ( $vars as $keyword => $raw ) {
		$escaped = str_replace( [ "\\", "'" ], [ "\\\\", "\\'" ], $raw );
		$connString .= "$keyword='" . $escaped . "' ";
	}

	return $connString;
}
199
/**
 * Closes underlying database connection.
 *
 * @return bool Result of pg_close(), or true when there is no connection
 */
protected function closeConnection() {
	if ( !$this->conn ) {
		return true;
	}

	return pg_close( $this->conn );
}
203
/**
 * Run a query and return a QueryStatus instance with the query result information.
 *
 * @param string $sql
 * @return QueryStatus
 * @throws DBUnexpectedError If the query could not be sent to the server
 */
public function doSingleStatementQuery( string $sql ): QueryStatus {
	$conn = $this->getBindingHandle();

	$sql = mb_convert_encoding( $sql, 'UTF-8' );
	// Clear any previously left over result
	for ( $stale = pg_get_result( $conn ); $stale; $stale = pg_get_result( $conn ) ) {
		pg_free_result( $stale );
	}

	$sent = pg_send_query( $conn, $sql );
	if ( $sent === false ) {
		throw new DBUnexpectedError( $this, "Unable to post new query to PostgreSQL\n" );
	}

	// Newer PHP versions use PgSql\Result instead of resource variables
	// https://www.php.net/manual/en/function.pg-get-result.php
	$pgRes = pg_get_result( $conn );
	$this->lastResultHandle = $pgRes;
	if ( pg_result_error( $pgRes ) ) {
		$res = false;
	} else {
		$res = $pgRes;
	}

	// Boolean outcomes are passed through; real results get wrapped
	$result = is_bool( $res ) ? $res : new PostgresResultWrapper( $this, $conn, $res );

	return new QueryStatus(
		$result,
		$this->affectedRows(),
		$this->lastError(),
		$this->lastErrno()
	);
}
230
/**
 * Execute a batch of query statements, aborting remaining statements if one fails.
 *
 * The statements are joined with ";\n" and sent as one query; each result set
 * read back is attributed to the next statement key in order.
 *
 * @param string[] $sqls Map of statement ID to SQL text
 * @return QueryStatus[] Map of statement ID to status
 */
protected function doMultiStatementQuery( array $sqls ): array {
	$statusById = [];

	$conn = $this->getBindingHandle();
	// Clear any previously left over result
	for ( $stale = pg_get_result( $conn ); $stale; $stale = pg_get_result( $conn ) ) {
		pg_free_result( $stale );
	}

	$combinedSql = mb_convert_encoding( implode( ";\n", $sqls ), 'UTF-8' );
	// NOTE(review): the return value of pg_send_query() is not checked here,
	// unlike in doSingleStatementQuery()
	pg_send_query( $conn, $combinedSql );

	reset( $sqls );
	while ( ( $pgResultSet = pg_get_result( $conn ) ) !== false ) {
		$this->lastResultHandle = $pgResultSet;

		// Take the current statement key, then advance for the next result set
		$statementId = key( $sqls );
		next( $sqls );
		if ( $statementId === null ) {
			// More result sets than statements; nothing to attribute this one to
			continue;
		}

		$errorText = pg_result_error( $pgResultSet );
		$res = $errorText
			? false
			: new PostgresResultWrapper( $this, $conn, $pgResultSet );
		$statusById[$statementId] = new QueryStatus(
			$res,
			pg_affected_rows( $pgResultSet ),
			(string)$errorText,
			pg_result_error_field( $pgResultSet, PGSQL_DIAG_SQLSTATE )
		);
	}

	// Fill in status for statements aborted due to prior statement failure
	for ( $statementId = key( $sqls ); $statementId !== null; $statementId = key( $sqls ) ) {
		$statusById[$statementId] = new QueryStatus( false, 0, 'Query aborted', 0 );
		next( $sqls );
	}

	return $statusById;
}
271
/**
 * Write every available diagnostic field of the last result to the debug log.
 *
 * Does nothing when no result handle has been recorded yet, since
 * pg_result_error_field() requires a result to inspect.
 */
protected function dumpError() {
	if ( !$this->lastResultHandle ) {
		// No query has produced a result yet; there is nothing to dump
		return;
	}

	$diags = [
		PGSQL_DIAG_SEVERITY,
		PGSQL_DIAG_SQLSTATE,
		PGSQL_DIAG_MESSAGE_PRIMARY,
		PGSQL_DIAG_MESSAGE_DETAIL,
		PGSQL_DIAG_MESSAGE_HINT,
		PGSQL_DIAG_STATEMENT_POSITION,
		PGSQL_DIAG_INTERNAL_POSITION,
		PGSQL_DIAG_INTERNAL_QUERY,
		PGSQL_DIAG_CONTEXT,
		PGSQL_DIAG_SOURCE_FILE,
		PGSQL_DIAG_SOURCE_LINE,
		PGSQL_DIAG_SOURCE_FUNCTION
	];
	foreach ( $diags as $d ) {
		$this->logger->debug( sprintf( "PgSQL ERROR(%d): %s",
			$d, pg_result_error_field( $this->lastResultHandle, $d ) ) );
	}
}
292
/**
 * Get the inserted value of an auto-increment row.
 *
 * @return int|null Result of "SELECT lastval()", or null when that yields NULL
 */
public function insertId() {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$res = $this->query( "SELECT lastval()", __METHOD__, $flags );
	$row = $res->fetchRow();
	$lastVal = $row[0];

	if ( $lastVal === null ) {
		// @phan-suppress-next-line PhanTypeMismatchReturnProbablyReal Return type is undefined for no lastval
		return null;
	}

	return (int)$lastVal;
}
304
/**
 * Get the RDBMS-specific error description from the last attempted query statement.
 *
 * @return string Error text of the last result if there is one, otherwise the
 *  connection-level error (falling back to the last connect error); without a
 *  connection, the last PHP error or a fixed "No database connection" message
 */
public function lastError() {
	if ( !$this->conn ) {
		return $this->getLastPHPError() ?: 'No database connection';
	}

	if ( $this->lastResultHandle ) {
		return pg_result_error( $this->lastResultHandle );
	}

	// Ask about THIS connection explicitly: calling pg_last_error() without an
	// argument is deprecated as of PHP 8.1 and reads the "default" connection,
	// which need not be this handle when several connections are open.
	return pg_last_error( $this->conn ) ?: $this->lastConnectError;
}
316
/**
 * Get the RDBMS-specific error code from the last attempted query statement.
 *
 * @return mixed SQLSTATE field of the last result; '00000' when there is no
 *  result handle or the field lookup returns false
 */
public function lastErrno() {
	$sqlState = false;
	if ( $this->lastResultHandle ) {
		$sqlState = pg_result_error_field( $this->lastResultHandle, PGSQL_DIAG_SQLSTATE );
	}

	return $sqlState !== false ? $sqlState : '00000';
}
327
/**
 * Number of rows affected according to the last result handle.
 *
 * @return int 0 when no result handle has been recorded
 */
protected function fetchAffectedRowCount() {
	return $this->lastResultHandle
		? pg_affected_rows( $this->lastResultHandle )
		: 0;
}
335
/**
 * Estimate rows in dataset.
 *
 * Returns an estimated count based on the "rows=N" figure in the first line
 * of the EXPLAIN output for the equivalent SELECT.
 *
 * @param string|array $table
 * @param string|array $var Field(s); a single named column adds an IS NOT NULL condition
 * @param string|array $conds
 * @param string $fname
 * @param array $options
 * @param array $join_conds
 * @return int Estimated row count, or -1 when no estimate could be extracted
 */
public function estimateRowCount( $table, $var = '*', $conds = '',
	$fname = __METHOD__, $options = [], $join_conds = []
) {
	$conds = $this->platform->normalizeConditions( $conds, $fname );
	$column = $this->platform->extractSingleFieldFromList( $var );
	$isNamedColumn = is_string( $column ) && !in_array( $column, [ '*', '1' ] );
	if ( $isNamedColumn ) {
		$conds[] = "$column IS NOT NULL";
	}

	$options['EXPLAIN'] = true;
	$res = $this->select( $table, $var, $conds, $fname, $options, $join_conds );
	if ( !$res ) {
		return -1;
	}

	$row = $res->fetchRow();
	$found = [];
	if ( !preg_match( '/rows=(\d+)/', $row[0], $found ) ) {
		return -1;
	}

	return (int)$found[1];
}
373
/**
 * Get information about an index into an object.
 *
 * @param string $table Table name as stored in pg_indexes.tablename
 * @param string $index
 * @param string $fname
 * @return \stdClass|false|null The matching pg_indexes row (indexname only),
 *  false when the table has no such index, null when the query gave no result
 */
public function indexInfo( $table, $index, $fname = __METHOD__ ) {
	// Quote the table name as a literal instead of interpolating it raw
	$res = $this->query(
		"SELECT indexname FROM pg_indexes WHERE tablename=" . $this->addQuotes( $table ),
		$fname,
		self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
	);
	if ( !$res ) {
		return null;
	}

	// The expected name does not depend on the row, so resolve it once
	$wanted = $this->indexName( $index );
	foreach ( $res as $row ) {
		if ( $row->indexname == $wanted ) {
			return $row;
		}
	}

	return false;
}
391
/**
 * List the attributes of an index, in index-column order.
 *
 * @param string $index
 * @param string|false $schema Schema to look in; false means the schemas
 *  from getCoreSchemas()
 * @return array[]|null List of [ attname, opcname, amname, option ] tuples
 *  from the first schema whose query produced a result, else null
 */
public function indexAttributes( $index, $schema = false ) {
	$schemas = ( $schema === false ) ? $this->getCoreSchemas() : [ $schema ];

	$eindex = $this->addQuotes( $index );

	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	foreach ( $schemas as $schema ) {
		$eschema = $this->addQuotes( $schema );
		/*
		 * A subquery would be not needed if we didn't care about the order
		 * of attributes, but we do
		 */
		$sql = <<<__INDEXATTR__

	SELECT opcname,
	attname,
	i.indoption[s.g] as option,
	pg_am.amname
	FROM
	(SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
	FROM
	pg_index isub
	JOIN pg_class cis
	ON cis.oid=isub.indexrelid
	JOIN pg_namespace ns
	ON cis.relnamespace = ns.oid
	WHERE cis.relname=$eindex AND ns.nspname=$eschema) AS s,
	pg_attribute,
	pg_opclass opcls,
	pg_am,
	pg_class ci
	JOIN pg_index i
	ON ci.oid=i.indexrelid
	JOIN pg_class ct
	ON ct.oid = i.indrelid
	JOIN pg_namespace n
	ON ci.relnamespace = n.oid
	WHERE
	ci.relname=$eindex AND n.nspname=$eschema
	AND attrelid = ct.oid
	AND i.indkey[s.g] = attnum
	AND i.indclass[s.g] = opcls.oid
	AND pg_am.oid = opcls.opcmethod
__INDEXATTR__;
		$res = $this->query( $sql, __METHOD__, $flags );
		if ( !$res ) {
			continue;
		}

		$attributes = [];
		foreach ( $res as $row ) {
			$attributes[] = [
				$row->attname,
				$row->opcname,
				$row->amname,
				$row->option
			];
		}

		// The first schema with a usable result decides the answer
		return $attributes;
	}

	return null;
}
455
/**
 * Determines if a given index is unique.
 *
 * Looks for a pg_indexes row of the table whose definition starts with
 * "CREATE UNIQUE" and ends with "(<index name>)".
 *
 * @param string $table Table name as stored in pg_indexes.tablename
 * @param string $index
 * @param string $fname
 * @return bool
 */
public function indexUnique( $table, $index, $fname = __METHOD__ ) {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	// Quote the table name as a literal instead of interpolating it raw
	$sql = "SELECT indexname FROM pg_indexes WHERE tablename=" . $this->addQuotes( $table ) .
		" AND indexdef LIKE 'CREATE UNIQUE%(" .
		$this->strencode( $this->indexName( $index ) ) .
		")'";
	$res = $this->query( $sql, $fname, $flags );

	return $res && $res->numRows() > 0;
}
465
/**
 * INSERT SELECT wrapper.
 *
 * With the IGNORE insert option the statement is built here with
 * "ON CONFLICT DO NOTHING"; every other case is delegated to the parent.
 *
 * @param string $destTable
 * @param string|array $srcTable
 * @param array $varMap Map of destination field to source field
 * @param array|string $conds
 * @param string $fname
 * @param array $insertOptions
 * @param array $selectOptions
 * @param array $selectJoinConds
 */
protected function doInsertSelectNative(
	$destTable,
	$srcTable,
	array $varMap,
	$conds,
	$fname,
	array $insertOptions,
	array $selectOptions,
	$selectJoinConds
) {
	if ( !in_array( 'IGNORE', $insertOptions ) ) {
		parent::doInsertSelectNative( $destTable, $srcTable, $varMap, $conds, $fname,
			$insertOptions, $selectOptions, $selectJoinConds );

		return;
	}

	// Use "ON CONFLICT DO" if we have it for IGNORE
	$destTable = $this->tableName( $destTable );

	$selectSql = $this->selectSQLText(
		$srcTable,
		array_values( $varMap ),
		$conds,
		$fname,
		$selectOptions,
		$selectJoinConds
	);

	$destFields = implode( ',', array_keys( $varMap ) );
	$sql = "INSERT INTO $destTable (" . $destFields . ') ' .
		$selectSql . ' ON CONFLICT DO NOTHING';

	$this->query( $sql, $fname, self::QUERY_CHANGE_ROWS );
}
516
/**
 * Resolve a table name using the parent class' tableName() directly.
 *
 * @param string $name
 * @param string $format Passed through unchanged; defaults to 'quoted'
 * @return string
 */
public function realTableName( $name, $format = 'quoted' ) {
	$resolved = parent::tableName( $name, $format );

	return $resolved;
}
525
/**
 * Deprecated method, calls should be removed.
 *
 * @param string $seqName Unused here
 * @return NextSequenceValue Placeholder object (addQuotes() renders it as DEFAULT)
 */
public function nextSequenceValue( $seqName ) {
	return new NextSequenceValue();
}
529
/**
 * Return the current value of a sequence.
 *
 * @param string $seqName Sequence name; single quotes in it are doubled
 * @return mixed First column of the currval() row
 */
public function currentSequenceValue( $seqName ) {
	$quotedName = "'" . str_replace( "'", "''", $seqName ) . "'";
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;

	$res = $this->query( "SELECT currval(" . $quotedName . ")", __METHOD__, $flags );
	$row = $res->fetchRow();

	return $row[0];
}
547
/**
 * Map each column of a table to its udt_name from information_schema.columns.
 *
 * Schemas from getCoreSchemas() are tried in order and the first one that
 * has columns for the table wins.
 *
 * @param string $table
 * @return string[] Map of column name to type name; empty when nothing matched
 */
public function getValueTypesForWithClause( $table ) {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$encTable = $this->addQuotes( $table );

	$typesByColumn = [];
	foreach ( $this->getCoreSchemas() as $schema ) {
		$encSchema = $this->addQuotes( $schema );
		$sql = "SELECT column_name,udt_name " .
			"FROM information_schema.columns " .
			"WHERE table_name = $encTable AND table_schema = $encSchema";
		$res = $this->query( $sql, __METHOD__, $flags );
		if ( !$res->numRows() ) {
			// Table not present in this schema; try the next one
			continue;
		}

		foreach ( $res as $columnRow ) {
			$typesByColumn[$columnRow->column_name] = $columnRow->udt_name;
		}
		break;
	}

	return $typesByColumn;
}
573
/**
 * Returns the size of a text field, or -1 for "unlimited".
 *
 * The size is read from pg_attribute.atttypmod; for varchar columns 4 is
 * subtracted from it.
 *
 * @param string $table
 * @param string $field
 * @return int|string Field size; -1 is also returned when no matching
 *  column row is found, instead of dereferencing a missing row
 */
public function textFieldSize( $table, $field ) {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	// NOTE(review): tableName() may return a quoted identifier, which is then
	// compared against relname as a literal — confirm this matches as intended
	$encTable = $this->tableName( $table );
	// Quote the field name as a literal instead of interpolating it raw
	$encField = $this->addQuotes( $field );
	$sql = "SELECT t.typname as ftype,a.atttypmod as size
		FROM pg_class c, pg_attribute a, pg_type t
		WHERE relname='$encTable' AND a.attrelid=c.oid AND
		a.atttypid=t.oid and a.attname=$encField";
	$res = $this->query( $sql, __METHOD__, $flags );
	$row = $res->fetchObject();
	if ( !$row ) {
		// No such table/column pair: report "no limit known"
		return -1;
	}

	if ( $row->ftype == 'varchar' ) {
		$size = $row->size - 4;
	} else {
		$size = $row->size;
	}

	return $size;
}
591
/**
 * Determines if the last failure was due to a deadlock.
 *
 * @return bool True when lastErrno() is SQLSTATE 40P01
 */
public function wasDeadlock() {
	// https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
	$sqlState = $this->lastErrno();

	return $sqlState === '40P01';
}
596
/**
 * Determines if the last failure was due to a lock timeout.
 *
 * @return bool True when lastErrno() is SQLSTATE 55P03
 */
public function wasLockTimeout() {
	// https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
	$sqlState = $this->lastErrno();

	return $sqlState === '55P03';
}
601
/**
 * Check whether an error code is one of the listed connection-level SQLSTATEs.
 *
 * Do not use this method outside of Database/DBError classes.
 *
 * @param mixed $errno Compared strictly, so only string codes can match
 * @return bool
 */
protected function isConnectionError( $errno ) {
	// https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
	$connectionCodes = [
		'08000', '08003', '08006', '08001', '08004', '57P01', '57P03', '53300'
	];

	return in_array( $errno, $connectionCodes, true );
}
608
/**
 * Checks whether the cause of the error is detected to be a timeout.
 *
 * @param mixed $errno
 * @return bool True only for the string SQLSTATE '57014'
 */
protected function isQueryTimeoutError( $errno ) {
	// https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
	$timeoutCode = '57014';

	return $errno === $timeoutCode;
}
613
/**
 * Whether the error is known to have rolled back only the failed statement.
 *
 * @param mixed $errno Unused
 * @return bool Always false
 */
protected function isKnownStatementRollbackError( $errno ) {
	// transaction has to be rolled-back from error state
	return false;
}
617
/**
 * Creates a new table with structure copied from existing table.
 *
 * The copy is made with "LIKE ... INCLUDING DEFAULTS INCLUDING INDEXES". If
 * the old table has a column whose default starts with "nextval(", a new
 * sequence owned by the copied column is created and set as its default.
 *
 * @param string $oldName
 * @param string $newName
 * @param bool $temporary Whether to create TEMPORARY objects
 * @param string $fname
 * @return mixed Result of the CREATE TABLE query
 */
public function duplicateTableStructure(
	$oldName, $newName, $temporary = false, $fname = __METHOD__
) {
	$newNameE = $this->platform->addIdentifierQuotes( $newName );
	$oldNameE = $this->platform->addIdentifierQuotes( $oldName );

	$tempKeyword = $temporary ? 'TEMPORARY' : '';

	$ret = $this->query(
		"CREATE $tempKeyword TABLE $newNameE " .
			"(LIKE $oldNameE INCLUDING DEFAULTS INCLUDING INDEXES)",
		$fname,
		self::QUERY_PSEUDO_PERMANENT | self::QUERY_CHANGE_SCHEMA
	);
	if ( !$ret ) {
		return $ret;
	}

	// Look for a column of the old table whose default draws from a sequence
	$res = $this->query(
		'SELECT attname FROM pg_class c'
		. ' JOIN pg_namespace n ON (n.oid = c.relnamespace)'
		. ' JOIN pg_attribute a ON (a.attrelid = c.oid)'
		. ' JOIN pg_attrdef d ON (c.oid=d.adrelid and a.attnum=d.adnum)'
		. ' WHERE relkind = \'r\''
		. ' AND nspname = ' . $this->addQuotes( $this->getCoreSchema() )
		. ' AND relname = ' . $this->addQuotes( $oldName )
		. ' AND pg_get_expr(adbin, adrelid) LIKE \'nextval(%\'',
		$fname,
		self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
	);
	$seqColumn = $res->fetchObject();
	if ( !$seqColumn ) {
		return $ret;
	}

	// Only the first such column is handled
	$field = $seqColumn->attname;
	$newSeq = "{$newName}_{$field}_seq";
	$fieldE = $this->platform->addIdentifierQuotes( $field );
	$newSeqE = $this->platform->addIdentifierQuotes( $newSeq );
	$newSeqQ = $this->addQuotes( $newSeq );
	$this->query(
		"CREATE $tempKeyword SEQUENCE $newSeqE OWNED BY $newNameE.$fieldE",
		$fname,
		self::QUERY_CHANGE_SCHEMA
	);
	$this->query(
		"ALTER TABLE $newNameE ALTER COLUMN $fieldE SET DEFAULT nextval({$newSeqQ}::regclass)",
		$fname,
		self::QUERY_CHANGE_SCHEMA
	);

	return $ret;
}
669
/**
 * Truncate the given tables in one statement, restarting their identity sequences.
 *
 * @param string[] $tables
 * @param string $fname
 */
protected function doTruncate( array $tables, $fname ) {
	$tableList = implode( ',', $this->tableNamesN( ...$tables ) );

	$this->query(
		"TRUNCATE TABLE " . $tableList . " RESTART IDENTITY",
		$fname,
		self::QUERY_CHANGE_SCHEMA
	);
}
675
/**
 * List the tables found in the schemas from getCoreSchemas().
 *
 * @param string $prefix Only names starting with this prefix are returned;
 *  '' disables the filter
 * @param string $fname
 * @return string[]
 */
public function listTables( $prefix = '', $fname = __METHOD__ ) {
	$quotedSchemas = [];
	foreach ( $this->getCoreSchemas() as $schemaName ) {
		$quotedSchemas[] = $this->addQuotes( $schemaName );
	}
	$eschemas = implode( ',', $quotedSchemas );

	$result = $this->query(
		"SELECT DISTINCT tablename FROM pg_tables WHERE schemaname IN ($eschemas)",
		$fname,
		self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
	);

	$matches = [];
	foreach ( $result as $rowObj ) {
		// Take the last property of the row object as the table name
		$fields = get_object_vars( $rowObj );
		$tableName = array_pop( $fields );
		if ( $prefix != '' && strpos( $tableName, $prefix ) !== 0 ) {
			continue;
		}
		$matches[] = $tableName;
	}

	return $matches;
}
701
/**
 * Parse a Postgres array given in text form, like {a,b}, into a PHP list.
 *
 * Quoted items have their surrounding double quotes removed and are passed
 * through stripcslashes(); unquoted items are taken verbatim.
 *
 * NOTE(review): the recursive branch assigns this method's return value (the
 * output array) to $offset — confirm nested arrays behave as intended.
 *
 * @param string $text Array text as returned by the server
 * @param string[] &$output Receives the items; reset on the outermost call
 * @param int|false $limit Last offset to scan; false on the outermost call
 * @param int $offset Offset to start scanning at
 * @return string[]
 */
private function pg_array_parse( $text, &$output, $limit = false, $offset = 1 ) {
	if ( $limit === false ) {
		// Outermost call: scan up to the closing brace and start a fresh list
		$limit = strlen( $text ) - 1;
		$output = [];
	}
	if ( $text == '{}' ) {
		return $output;
	}
	do {
		if ( $text[$offset] == '{' ) {
			$offset = $this->pg_array_parse( $text, $output, $limit, $offset + 1 );
			continue;
		}

		preg_match( "/(\\{?\"([^\"\\\\]|\\\\.)*\"|[^,{}]+)+([,}]+)/",
			$text, $parts, 0, $offset );
		$offset += strlen( $parts[0] );

		$item = $parts[1];
		if ( $item[0] == '"' ) {
			$item = stripcslashes( substr( $item, 1, -1 ) );
		}
		$output[] = $item;

		if ( $parts[3] == '},' ) {
			return $output;
		}
	} while ( $limit > $offset );

	return $output;
}
746
/**
 * Returns a wikitext style link to the DB's website.
 *
 * @return string
 */
public function getSoftwareLink() {
	$link = '[{{int:version-db-postgres-url}} PostgreSQL]';

	return $link;
}
750
/**
 * Return current schema (executes SELECT current_schema()).
 *
 * @return mixed First column of the result row
 */
public function getCurrentSchema() {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$row = $this->query( "SELECT current_schema()", __METHOD__, $flags )->fetchRow();

	return $row[0];
}
768
/**
 * Return list of schemas which are accessible without schema name.
 *
 * Runs "SELECT current_schemas(false)" and parses the textual array result.
 *
 * @return string[]
 */
public function getSchemas() {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$res = $this->query( "SELECT current_schemas(false)", __METHOD__, $flags );
	$row = $res->fetchRow();

	/* PHP pgsql support does not support array type, "{a,b}" string is returned */
	$schemas = [];
	$parsed = $this->pg_array_parse( $row[0], $schemas );

	return $parsed;
}
792
/**
 * Return the schema search path as reported by "SHOW search_path".
 *
 * @return string[] The reported value split on commas, otherwise unmodified
 */
public function getSearchPath() {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$row = $this->query( "SHOW search_path", __METHOD__, $flags )->fetchRow();

	/* PostgreSQL returns SHOW values as strings */
	$pathText = $row[0];

	return explode( ",", $pathText );
}
814
/**
 * Update the search_path setting of this session.
 *
 * @param string[] $search_path Entries are joined with ", " as given, so
 *  they must already be quoted where needed
 */
private function setSearchPath( $search_path ) {
	$pathList = implode( ", ", $search_path );

	$this->query( "SET search_path = " . $pathList, __METHOD__, self::QUERY_CHANGE_TRX );
}
829
/**
 * Determine default schema for the current application.
 *
 * If the desired schema exists it becomes the core schema, and it is
 * prepended to the session search path when not already reachable. If it
 * does not exist, the current schema is used instead.
 *
 * @param string|null $desiredSchema
 * @throws DBUnexpectedError If a transaction is currently active
 */
public function determineCoreSchema( $desiredSchema ) {
	if ( $this->trxLevel() ) {
		// We do not want the schema selection to change on ROLLBACK or INSERT SELECT.
		// See https://www.postgresql.org/docs/8.3/sql-set.html
		throw new DBUnexpectedError(
			$this,
			__METHOD__ . ": a transaction is currently active"
		);
	}

	if ( !$this->schemaExists( $desiredSchema ) ) {
		$this->platform->setCoreSchema( $this->getCurrentSchema() );
		$this->logger->debug(
			"Schema \"" . $desiredSchema . "\" not found, using current \"" .
			$this->getCoreSchema() . "\"\n" );

		return;
	}

	$alreadyVisible = in_array( $desiredSchema, $this->getSchemas() );
	if ( !$alreadyVisible ) {
		// Prepend the desired schema to the search path (T17816)
		$search_path = $this->getSearchPath();
		array_unshift( $search_path, $this->platform->addIdentifierQuotes( $desiredSchema ) );
		$this->setSearchPath( $search_path );
	}

	$this->platform->setCoreSchema( $desiredSchema );
	if ( $alreadyVisible ) {
		$this->logger->debug(
			"Schema \"" . $desiredSchema . "\" already in the search path\n" );
	} else {
		$this->logger->debug(
			"Schema \"" . $desiredSchema . "\" added to the search path\n" );
	}
}
875
/**
 * Return schema name for core application tables.
 *
 * @return mixed Whatever the platform object reports as its core schema
 */
public function getCoreSchema() {
	$coreSchema = $this->platform->getCoreSchema();

	return $coreSchema;
}
885
/**
 * Return schema names for temporary tables and core application tables.
 *
 * The temporary schema name is looked up via pg_my_temp_schema() and cached
 * once found.
 *
 * @return array Temporary schema (when known) followed by the core schema
 */
public function getCoreSchemas() {
	if ( !$this->tempSchema ) {
		$res = $this->query(
			"SELECT nspname FROM pg_catalog.pg_namespace n WHERE n.oid = pg_my_temp_schema()",
			__METHOD__,
			self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
		);
		$row = $res->fetchObject();
		if ( !$row ) {
			// No temporary schema for this session (yet)
			return [ $this->getCoreSchema() ];
		}
		$this->tempSchema = $row->nspname;
	}

	return [ $this->tempSchema, $this->getCoreSchema() ];
}
910
/**
 * A string describing the current software version.
 *
 * The value is computed on first use from pg_version() and cached.
 *
 * @return string|false
 */
public function getServerVersion() {
	if ( isset( $this->numericVersion ) ) {
		return $this->numericVersion;
	}

	$conn = $this->getBindingHandle();
	$versionInfo = pg_version( $conn );
	if ( version_compare( $versionInfo['client'], '7.4.0', 'lt' ) ) {
		// Old client, abort install
		$version = '7.3 or earlier';
	} elseif ( isset( $versionInfo['server'] ) ) {
		// Normal client
		$version = $versionInfo['server'];
	} else {
		// T18937: broken pgsql extension from PHP<5.3
		$version = pg_parameter_status( $conn, 'server_version' );
	}
	$this->numericVersion = $version;

	return $this->numericVersion;
}
929
/**
 * Check pg_class for a relation of one of the given kinds.
 *
 * @param string $table Relation name; resolved via realTableName( ..., 'raw' )
 * @param string|string[] $types pg_class.relkind value(s) to accept
 * @param string|false $schema Schema to look in; false means the schemas
 *  from getCoreSchemas()
 * @return bool
 */
private function relationExists( $table, $types, $schema = false ) {
	$relKinds = is_array( $types ) ? $types : [ $types ];
	$schemas = ( $schema === false ) ? $this->getCoreSchemas() : [ $schema ];

	$table = $this->realTableName( $table, 'raw' );
	$etable = $this->addQuotes( $table );
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	foreach ( $schemas as $schema ) {
		$eschema = $this->addQuotes( $schema );
		$sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
			. "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
			. "AND c.relkind IN ('" . implode( "','", $relKinds ) . "')";
		$res = $this->query( $sql, __METHOD__, $flags );
		if ( $res && $res->numRows() ) {
			return true;
		}
	}

	return false;
}
966
/**
 * For backward compatibility, this function checks both tables and views.
 *
 * @param string $table
 * @param string $fname Unused here
 * @param string|false $schema
 * @return bool
 */
public function tableExists( $table, $fname = __METHOD__, $schema = false ) {
	// 'r' and 'v' are the pg_class.relkind values being accepted
	$relKinds = [ 'r', 'v' ];

	return $this->relationExists( $table, $relKinds, $schema );
}
977
/**
 * Check whether a relation of kind 'S' with the given name exists.
 *
 * @param string $sequence
 * @param string|false $schema
 * @return bool
 */
public function sequenceExists( $sequence, $schema = false ) {
	$relKind = 'S';

	return $this->relationExists( $sequence, $relKind, $schema );
}
981
/**
 * Check whether a trigger exists on a table in any schema from getCoreSchemas().
 *
 * @param string $table
 * @param string $trigger
 * @return bool
 */
public function triggerExists( $table, $trigger ) {
	$q = <<<SQL
	SELECT 1 FROM pg_class, pg_namespace, pg_trigger
	WHERE relnamespace=pg_namespace.oid AND relkind='r'
	AND tgrelid=pg_class.oid
	AND nspname=%s AND relname=%s AND tgname=%s
SQL;
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	foreach ( $this->getCoreSchemas() as $schema ) {
		$sql = sprintf(
			$q,
			$this->addQuotes( $schema ),
			$this->addQuotes( $table ),
			$this->addQuotes( $trigger )
		);
		$res = $this->query( $sql, __METHOD__, $flags );
		if ( $res && $res->numRows() ) {
			return true;
		}
	}

	return false;
}
1007
/**
 * Check whether a rule exists on a table in any schema from getCoreSchemas().
 *
 * @param string $table
 * @param string $rule
 * @return bool True when pg_rules yields exactly the requested rule name
 */
public function ruleExists( $table, $rule ) {
	$conds = [
		'rulename' => $rule,
		'tablename' => $table,
		'schemaname' => $this->getCoreSchemas()
	];
	$found = $this->selectField( 'pg_rules', 'rulename', $conds, __METHOD__ );

	return $found === $rule;
}
1020
/**
 * Check whether a named constraint exists on a table in any schema from
 * getCoreSchemas().
 *
 * @param string $table
 * @param string $constraint
 * @return bool
 */
public function constraintExists( $table, $constraint ) {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	foreach ( $this->getCoreSchemas() as $schema ) {
		$sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
			"WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
			$this->addQuotes( $schema ),
			$this->addQuotes( $table ),
			$this->addQuotes( $constraint )
		);
		$res = $this->query( $sql, __METHOD__, $flags );
		$hasRows = $res && $res->numRows();
		if ( $hasRows ) {
			return true;
		}
	}

	return false;
}
1040
/**
 * Query whether a given schema exists.
 *
 * @param string|null $schema
 * @return bool False without querying when $schema is null or ''
 */
public function schemaExists( $schema ) {
	if ( !strlen( $schema ?? '' ) ) {
		return false; // short-circuit
	}

	$sql = "SELECT 1 FROM pg_catalog.pg_namespace " .
		"WHERE nspname = " . $this->addQuotes( $schema ) . " LIMIT 1";
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$res = $this->query( $sql, __METHOD__, $flags );

	return $res->numRows() > 0;
}
1060
/**
 * Returns true if a given role exists in pg_catalog.pg_roles.
 *
 * @param string $roleName
 * @return bool
 */
public function roleExists( $roleName ) {
	$sql = "SELECT 1 FROM pg_catalog.pg_roles " .
		"WHERE rolname = " . $this->addQuotes( $roleName ) . " LIMIT 1";
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$res = $this->query( $sql, __METHOD__, $flags );

	return $res->numRows() > 0;
}
1076
/**
 * Get field information by delegating to PostgresField::fromText().
 *
 * @param string $table
 * @param string $field
 * @return mixed Whatever PostgresField::fromText() returns
 */
public function fieldInfo( $table, $field ) {
	$info = PostgresField::fromText( $this, $table, $field );

	return $info;
}
1085
/**
 * Wrap binary data in a PostgresBlob holding its pg_escape_bytea() form.
 *
 * @param string $b
 * @return PostgresBlob
 */
public function encodeBlob( $b ) {
	$escaped = pg_escape_bytea( $this->getBindingHandle(), $b );

	return new PostgresBlob( $escaped );
}
1091
/**
 * Turn a blob value into its raw data.
 *
 * @param string|Blob $b PostgresBlob contents and plain strings are run
 *  through pg_unescape_bytea(); other Blob objects return their contents as-is
 * @return string
 */
public function decodeBlob( $b ) {
	if ( $b instanceof PostgresBlob ) {
		return pg_unescape_bytea( $b->fetch() );
	}
	if ( $b instanceof Blob ) {
		return $b->fetch();
	}

	return pg_unescape_bytea( $b );
}
1101
/**
 * Escape a string with pg_escape_string() on the current connection.
 *
 * @param mixed $s Cast to string before escaping
 * @return string Escaped text, without surrounding quotes
 */
public function strencode( $s ) {
	// Should not be called by us
	$conn = $this->getBindingHandle();

	return pg_escape_string( $conn, (string)$s );
}
1106
/**
 * Escape and quote a raw value string for use in a SQL query.
 *
 * @param mixed $s null becomes NULL, booleans and integers become bare
 *  numbers, Blob objects become quoted bytea text, NextSequenceValue becomes
 *  DEFAULT; anything else is cast to string, escaped and single-quoted
 * @return string
 */
public function addQuotes( $s ) {
	$conn = $this->getBindingHandle();

	if ( $s === null ) {
		return 'NULL';
	}
	if ( is_bool( $s ) ) {
		return (string)intval( $s );
	}
	if ( is_int( $s ) ) {
		return (string)$s;
	}
	if ( $s instanceof Blob ) {
		// PostgresBlob contents are used as-is; other blobs get bytea-escaped here
		$bytea = ( $s instanceof PostgresBlob )
			? $s->fetch()
			: pg_escape_bytea( $conn, $s->fetch() );

		return "'$bytea'";
	}
	if ( $s instanceof NextSequenceValue ) {
		return 'DEFAULT';
	}

	return "'" . pg_escape_string( $conn, (string)$s ) . "'";
}
1129
/**
 * Called by sourceStream() to check if we've reached a statement end.
 *
 * A line starting with '$mw$' toggles the statement delimiter between ';'
 * and false before the parent check runs.
 *
 * @param string &$sql
 * @param string &$newLine
 * @return mixed Result of the parent implementation
 */
public function streamStatementEnd( &$sql, &$newLine ) {
	# Allow dollar quoting for function declarations
	$isDollarQuoteMarker = substr( $newLine, 0, 4 ) == '$mw$';
	if ( $isDollarQuoteMarker ) {
		$this->delimiter = $this->delimiter ? false : ';';
	}

	return parent::streamStatementEnd( $sql, $newLine );
}
1142
/**
 * Check whether a named lock is currently free.
 *
 * @param string $lockName
 * @param string $method
 * @return bool True when the query's "unlocked" column is 't'
 */
public function doLockIsFree( string $lockName, string $method ) {
	$sql = $this->platform->lockIsFreeSQLText( $lockName );
	$row = $this->query( $sql, $method, self::QUERY_CHANGE_LOCKS )->fetchObject();

	return $row->unlocked === 't';
}
1153
/**
 * Try to acquire a named lock, retrying the lock query through a
 * WaitConditionLoop until it reports success or the timeout passes.
 *
 * @param string $lockName
 * @param string $method
 * @param int $timeout Passed both to the lock SQL and to the wait loop
 * @return float|null The query's "acquired" value as float, or null if the
 *  lock was never acquired
 */
public function doLock( string $lockName, string $method, int $timeout ) {
	$sql = $this->platform->lockSQLText( $lockName, $timeout );

	$acquired = null;
	$attempt = function () use ( $sql, $method, &$acquired ) {
		$row = $this->query( $sql, $method, self::QUERY_CHANGE_LOCKS )->fetchObject();

		if ( $row->acquired === null ) {
			// Not acquired on this attempt; let the loop decide whether to retry
			return WaitConditionLoop::CONDITION_CONTINUE;
		}
		$acquired = (float)$row->acquired;

		return WaitConditionLoop::CONDITION_REACHED;
	};

	$loop = new WaitConditionLoop( $attempt, $timeout );
	$loop->invoke();

	return $acquired;
}
1181
/**
 * Release a named lock.
 *
 * @param string $lockName
 * @param string $method
 * @return bool True when the query's "released" column is 't'
 */
public function doUnlock( string $lockName, string $method ) {
	$sql = $this->platform->unlockSQLText( $lockName );
	$row = $this->query( $sql, $method, self::QUERY_CHANGE_LOCKS )->fetchObject();

	return $row->released === 't';
}
1192
/**
 * Reset the server-side session state for named locks and table locks.
 *
 * Runs pg_advisory_unlock_all() and reports a query error if that fails.
 *
 * @param string $fname Caller name used when reporting the error
 */
protected function doFlushSession( $fname ) {
	// https://www.postgresql.org/docs/9.1/functions-admin.html
	$sql = "SELECT pg_advisory_unlock_all()";
	$flags = self::QUERY_CHANGE_LOCKS | self::QUERY_NO_RETRY;

	$qs = $this->executeQuery( $sql, __METHOD__, $flags, $sql );
	if ( $qs->res !== false ) {
		return;
	}

	$this->reportQueryError( $qs->message, $qs->code, $sql, $fname, true );
}
1203
/**
 * Whether the DB server is marked as read-only server-side.
 *
 * @return bool True when "SHOW default_transaction_read_only" reports 'on'
 *  (case-insensitively); false otherwise or when no row is returned
 */
public function serverIsReadOnly() {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$row = $this->query( "SHOW default_transaction_read_only", __METHOD__, $flags )
		->fetchObject();
	if ( !$row ) {
		return false;
	}

	return strtolower( $row->default_transaction_read_only ) === 'on';
}
1214
/**
 * Static attributes of this driver.
 *
 * @return array Map with ATTR_SCHEMAS_AS_TABLE_GROUPS set to true
 */
public static function getAttributes() {
	$attributes = [];
	$attributes[self::ATTR_SCHEMAS_AS_TABLE_GROUPS] = true;

	return $attributes;
}
1218}
1219
1223class_alias( DatabasePostgres::class, 'DatabasePostgres' );
wfDeprecatedMsg( $msg, $version=false, $component=false, $callerOffset=2)
Log a deprecation warning with arbitrary message text.
Class to handle database/schema/prefix specifications for IDatabase.
Postgres database abstraction layer.
getCoreSchemas()
Return schema names for temporary tables and core application tables.
determineCoreSchema( $desiredSchema)
Determine default schema for the current application. Adjust this session schema search path if desire...
duplicateTableStructure( $oldName, $newName, $temporary=false, $fname=__METHOD__)
Creates a new table with structure copied from existing table.Note that unlike most database abstract...
isQueryTimeoutError( $errno)
Checks whether the cause of the error is detected to be a timeout.
doLock(string $lockName, string $method, int $timeout)
insertId()
Get the inserted value of an auto-increment row.
indexAttributes( $index, $schema=false)
databasesAreIndependent()
Returns true if DBs are assumed to be on potentially different servers.In systems like mysql/mariadb,...
doUnlock(string $lockName, string $method)
doSingleStatementQuery(string $sql)
Run a query and return a QueryStatus instance with the query result information.
streamStatementEnd(&$sql, &$newLine)
Called by sourceStream() to check if we've reached a statement end.
doLockIsFree(string $lockName, string $method)
nextSequenceValue( $seqName)
Deprecated method, calls should be removed.
doSelectDomain(DatabaseDomain $domain)
roleExists( $roleName)
Returns true if a given role (i.e.
getSchemas()
Return list of schemas which are accessible without schema name. This list does not contain magic k...
indexInfo( $table, $index, $fname=__METHOD__)
Get information about an index into an object.
schemaExists( $schema)
Query whether a given schema exists.
estimateRowCount( $table, $var='*', $conds='', $fname=__METHOD__, $options=[], $join_conds=[])
Estimate rows in dataset. Returns estimated count, based on EXPLAIN output. This is not necessarily an ...
lastError()
Get the RDBMS-specific error description from the last attempted query statement.
addQuotes( $s)
Escape and quote a raw value string for use in a SQL query. Returns: string. Stable to override.
sequenceExists( $sequence, $schema=false)
wasDeadlock()
Determines if the last failure was due to a deadlock.Note that during a deadlock, the prior transacti...
currentSequenceValue( $seqName)
Return the current value of a sequence.
doMultiStatementQuery(array $sqls)
Execute a batch of query statements, aborting remaining statements if one fails.
lastErrno()
Get the RDBMS-specific error code from the last attempted query statement.
getCoreSchema()
Return schema name for core application tables.
strencode( $s)
Wrapper for addslashes()
wasLockTimeout()
Determines if the last failure was due to a lock timeout.Note that during a lock wait timeout,...
isConnectionError( $errno)
Do not use this method outside of Database/DBError classes.
indexUnique( $table, $index, $fname=__METHOD__)
Determines if a given index is unique. Returns: bool. Stable to override.
open( $server, $user, $password, $db, $schema, $tablePrefix)
Open a new connection to the database (closing any existing one)
getServerVersion()
A string describing the current software version, like from mysql_get_server_info()
textFieldSize( $table, $field)
Returns the size of a text field, or -1 for "unlimited". Returns: int. Stable to override.
getCurrentSchema()
Return current schema (executes SELECT current_schema()). Needs transaction.
getSearchPath()
Return search path for schemas. This is different from getSchemas() since it contains magic keywords (...
tableExists( $table, $fname=__METHOD__, $schema=false)
For backward compatibility, this function checks both tables and views.
decodeBlob( $b)
Some DBMSs return a special placeholder object representing blob fields in result objects....
encodeBlob( $b)
Some DBMSs have a special format for inserting into blob fields, they don't allow simple quoted strin...
realTableName( $name, $format='quoted')
closeConnection()
Closes underlying database connection.
getType()
Get the RDBMS type of the server (e.g.
doFlushSession( $fname)
Reset the server-side session state for named locks and table locks.
listTables( $prefix='', $fname=__METHOD__)
getSoftwareLink()
Returns a wikitext style link to the DB's website (e.g.
doInsertSelectNative( $destTable, $srcTable, array $varMap, $conds, $fname, array $insertOptions, array $selectOptions, $selectJoinConds)
INSERT SELECT wrapper $varMap must be an associative array of the form [ 'dest1' => 'source1',...
serverIsReadOnly()
Returns bool: whether the DB server is marked as read-only server-side. Since 1.28. Stable to override.
Relational database abstraction object.
Definition Database.php:45
restoreErrorHandler()
Restore the previous error handler and return the last PHP error for this DB.
Definition Database.php:534
object resource null $conn
Database connection.
Definition Database.php:68
newExceptionAfterConnectError( $error)
installErrorHandler()
Set a custom error handler for logging errors during database connection.
Definition Database.php:523
affectedRows()
Get the number of rows affected by the last attempted query statement.
close( $fname=__METHOD__)
Close the database connection.
Definition Database.php:586
query( $sql, $fname=__METHOD__, $flags=0)
Run an SQL query statement and return the result.
Definition Database.php:838
getBindingHandle()
Get the underlying binding connection handle.
getDBname()
Get the current database name; null if there isn't one.
Used by Database::nextSequenceValue() so Database::insert() can detect values coming from the depreca...
return true
Definition router.php:92