25 use Wikimedia\Timestamp\ConvertibleTimestamp;
26 use Wikimedia\WaitConditionLoop;
27 use Wikimedia\AtEase\AtEase;
# Read the driver-specific settings from $params before handing $params to the
# parent constructor. A missing 'port' goes through intval( null ) and so
# becomes 0; elsewhere in this file the port is only acted on when it is > 0.
54 $this->port = intval( $params[
'port'] ??
null );
# A missing 'keywordTableMap' defaults to an empty array. Later code in this
# file looks a name up in this map and falls back to the name itself.
55 $this->keywordTableMap = $params[
'keywordTableMap'] ?? [];
57 parent::__construct( $params );
70 $sql =
"SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
71 "WHERE c.connamespace = n.oid AND conname = " .
72 $this->
addQuotes( $name ) .
" AND n.nspname = " .
83 if ( !function_exists(
'pg_connect' ) ) {
85 "Postgres functions missing, have you compiled PHP with the --with-pgsql\n" .
86 "option? (Note: if you recently installed PHP, you may need to restart your\n" .
87 "webserver and database)"
100 'dbname' => strlen( $dbName ) ? $dbName :
'postgres',
105 $connectVars[
'host'] =
$server;
107 if ( $this->port > 0 ) {
111 $connectVars[
'sslmode'] =
'require';
117 $this->conn = pg_connect( $connectString, PGSQL_CONNECT_FORCE_NEW ) ?:
null;
118 }
catch ( Exception $e ) {
124 if ( !$this->conn ) {
133 'client_encoding' =>
'UTF8',
134 'datestyle' =>
'ISO, YMD',
136 'standard_conforming_strings' =>
'on',
137 'bytea_output' =>
'escape',
138 'client_min_messages' =>
'ERROR'
140 foreach ( $variables as $var => $val ) {
144 self::QUERY_IGNORE_DBO_TRX | self::QUERY_NO_RETRY
148 $this->currentDomain =
new DatabaseDomain( $dbName, $schema, $tablePrefix );
149 }
catch ( Exception $e ) {
155 if ( $this->coreSchema === $this->currentDomain->getSchema() ) {
160 return parent::relationSchemaQualifier();
180 $this->currentDomain = $domain;
192 foreach ( $vars as $name => $value ) {
# Append each pair as name='value' followed by one space. Single quotes inside
# the value are escaped with a backslash.
# NOTE(review): a backslash already present in $value is not escaped here;
# confirm whether any connection value can contain one.
193 $s .=
"$name='" . str_replace(
"'",
"\\'", $value ) .
"' ";
200 return $this->conn ? pg_close( $this->conn ) :
true;
# A query counts as transactable only if the parent says so AND it does not
# start with "SELECT pg_advisory_...(" or "SELECT pg_try_advisory_...(".
# The pattern is anchored at the start of $sql and is case-sensitive.
204 return parent::isTransactableQuery( $sql ) &&
205 !preg_match(
'/^SELECT\s+pg_(try_|)advisory_\w+\(/', $sql );
215 $sql = mb_convert_encoding( $sql,
'UTF-8' );
217 while (
$res = pg_get_result(
$conn ) ) {
218 pg_free_result(
$res );
220 if ( pg_send_query(
$conn, $sql ) ===
false ) {
221 throw new DBUnexpectedError( $this,
"Unable to post new query to PostgreSQL\n" );
223 $this->lastResultHandle = pg_get_result(
$conn );
224 if ( pg_result_error( $this->lastResultHandle ) ) {
235 PGSQL_DIAG_MESSAGE_PRIMARY,
236 PGSQL_DIAG_MESSAGE_DETAIL,
237 PGSQL_DIAG_MESSAGE_HINT,
238 PGSQL_DIAG_STATEMENT_POSITION,
239 PGSQL_DIAG_INTERNAL_POSITION,
240 PGSQL_DIAG_INTERNAL_QUERY,
242 PGSQL_DIAG_SOURCE_FILE,
243 PGSQL_DIAG_SOURCE_LINE,
244 PGSQL_DIAG_SOURCE_FUNCTION
246 foreach ( $diags as $d ) {
247 $this->queryLogger->debug( sprintf(
"PgSQL ERROR(%d): %s\n",
248 $d, pg_result_error_field( $this->lastResultHandle, $d ) ) );
253 AtEase::suppressWarnings();
255 AtEase::restoreWarnings();
262 AtEase::suppressWarnings();
264 AtEase::restoreWarnings();
265 # @todo FIXME: HACK HACK HACK HACK debug
267 # @todo hashar: not sure if the following test really trigger if the object
270 if ( pg_last_error(
$conn ) ) {
273 'SQL error: ' . htmlspecialchars( pg_last_error(
$conn ) )
281 AtEase::suppressWarnings();
283 AtEase::restoreWarnings();
286 if ( pg_last_error(
$conn ) ) {
289 'SQL error: ' . htmlspecialchars( pg_last_error(
$conn ) )
297 if (
$res ===
false ) {
301 AtEase::suppressWarnings();
303 AtEase::restoreWarnings();
306 if ( pg_last_error(
$conn ) ) {
309 'SQL error: ' . htmlspecialchars( pg_last_error(
$conn ) )
325 $res = $this->
query(
"SELECT lastval()" );
327 return is_null( $row[0] ) ? null : (int)$row[0];
336 if ( $this->lastResultHandle ) {
337 return pg_result_error( $this->lastResultHandle );
339 return pg_last_error();
347 if ( $this->lastResultHandle ) {
348 return pg_result_error_field( $this->lastResultHandle, PGSQL_DIAG_SQLSTATE );
355 if ( !$this->lastResultHandle ) {
359 return pg_affected_rows( $this->lastResultHandle );
378 $fname = __METHOD__, $options = [], $join_conds = []
# When a specific column is given (a string other than '*' or '1'), only rows
# where that column is non-NULL are of interest, so add that as a condition.
382 if ( is_string( $column ) && !in_array( $column, [
'*',
'1' ] ) ) {
383 $conds[] =
"$column IS NOT NULL";
# Run the SELECT with the 'EXPLAIN' option set instead of fetching the rows.
386 $options[
'EXPLAIN'] =
true;
387 $res = $this->
select( $table, $var, $conds, $fname, $options, $join_conds );
# Take the estimate from the first "rows=<digits>" found in $row[0].
# NOTE(review): $row is fetched on a line not shown here; presumably it is the
# first line of the plan output - confirm.
392 if ( preg_match(
'/rows=(\d+)/', $row[0], $count ) ) {
393 $rows = (int)$count[1];
400 public function indexInfo( $table, $index, $fname = __METHOD__ ) {
401 $sql =
"SELECT indexname FROM pg_indexes WHERE tablename='$table'";
406 foreach (
$res as $row ) {
407 if ( $row->indexname == $this->indexName( $index ) ) {
416 if ( $schema ===
false ) {
419 $schemas = [ $schema ];
424 foreach ( $schemas as $schema ) {
430 $sql = <<<__INDEXATTR__
434 i.indoption[s.g] as option,
437 (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
441 ON cis.oid=isub.indexrelid
443 ON cis.relnamespace = ns.oid
444 WHERE cis.relname=$eindex AND ns.nspname=$eschema) AS s,
450 ON ci.oid=i.indexrelid
452 ON ct.oid = i.indrelid
454 ON ci.relnamespace =
n.oid
456 ci.relname=$eindex AND
n.nspname=$eschema
457 AND attrelid = ct.oid
458 AND i.indkey[s.g] = attnum
459 AND i.indclass[s.g] = opcls.oid
460 AND pg_am.oid = opcls.opcmethod
465 foreach (
$res as $row ) {
478 public function indexUnique( $table, $index, $fname = __METHOD__ ) {
479 $sql =
"SELECT indexname FROM pg_indexes WHERE tablename='{$table}'" .
480 " AND indexdef LIKE 'CREATE UNIQUE%(" .
488 return $res->numRows() > 0;
492 $table, $vars, $conds =
'', $fname = __METHOD__, $options = [], $join_conds = []
494 if ( is_string( $options ) ) {
495 $options = [ $options ];
505 if ( is_array( $options ) ) {
506 $forUpdateKey = array_search(
'FOR UPDATE', $options,
true );
507 if ( $forUpdateKey !==
false && $join_conds ) {
508 unset( $options[$forUpdateKey] );
509 $options[
'FOR UPDATE'] = [];
514 $alias = key( $toCheck );
515 $name = $toCheck[$alias];
516 unset( $toCheck[$alias] );
518 $hasAlias = !is_numeric( $alias );
519 if ( !$hasAlias && is_string( $name ) ) {
523 if ( !isset( $join_conds[$alias] ) ||
524 !preg_match(
'/^(?:LEFT|RIGHT|FULL)(?: OUTER)? JOIN$/i', $join_conds[$alias][0] )
526 if ( is_array( $name ) ) {
528 $toCheck = array_merge( $toCheck, $name );
# Drop an 'ORDER BY' option whose value is 'NULL' so it never reaches the
# generated SQL. The comparison is a loose ==.
# NOTE(review): presumably this strips the "ORDER BY NULL" idiom used with
# other backends - confirm.
537 if ( isset( $options[
'ORDER BY'] ) && $options[
'ORDER BY'] ==
'NULL' ) {
538 unset( $options[
'ORDER BY'] );
542 return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );
546 public function insert( $table,
$args, $fname = __METHOD__, $options = [] ) {
547 if ( !count(
$args ) ) {
552 if ( !isset( $this->numericVersion ) ) {
556 if ( !is_array( $options ) ) {
557 $options = [ $options ];
560 if ( isset(
$args[0] ) && is_array(
$args[0] ) ) {
568 $ignore = in_array(
'IGNORE', $options );
# Common prefix of the statement: target table and column list.
570 $sql =
"INSERT INTO $table (" . implode(
',',
$keys ) .
') VALUES ';
# Single-statement path: used when IGNORE was not requested, or when the server
# is at least 9.5 (the release that introduced ON CONFLICT DO NOTHING).
# NOTE(review): numericVersion is assigned version strings elsewhere in this
# file, and this is a loose comparison against a float. Confirm it behaves as
# intended for strings with a suffix after the number.
572 if ( $this->numericVersion >= 9.5 || !$ignore ) {
575 foreach ( $rows as $row ) {
# One parenthesised value tuple per row.
581 $sql .=
'(' . $this->
makeList( $row ) .
')';
584 $sql .=
' ON CONFLICT DO NOTHING';
586 $this->
query( $sql, $fname );
590 $numrowsinserted = 0;
592 $tok = $this->
startAtomic(
"$fname (outer)", self::ATOMIC_CANCELABLE );
594 foreach ( $rows as $row ) {
596 $tempsql .=
'(' . $this->
makeList( $row ) .
')';
598 $this->
startAtomic(
"$fname (inner)", self::ATOMIC_CANCELABLE );
600 $this->
query( $tempsql, $fname );
607 if ( $e->errno !==
'23505' ) {
612 }
catch ( Exception $e ) {
619 $this->affectedRowCount = $numrowsinserted;
626 if ( !is_array( $options ) ) {
627 $options = [ $options ];
632 $options = array_diff( $options, [
'IGNORE' ] );
634 return parent::makeUpdateOptionsArray( $options );
656 $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__,
657 $insertOptions = [], $selectOptions = [], $selectJoinConds = []
659 if ( !is_array( $insertOptions ) ) {
660 $insertOptions = [ $insertOptions ];
663 if ( in_array(
'IGNORE', $insertOptions ) ) {
666 $destTable = $this->
tableName( $destTable );
670 array_values( $varMap ),
677 $sql =
"INSERT INTO $destTable (" . implode(
',', array_keys( $varMap ) ) .
') ' .
678 $selectSql .
' ON CONFLICT DO NOTHING';
680 $this->
query( $sql, $fname );
684 $destTable, $srcTable, $varMap, $conds, $fname,
685 $insertOptions, $selectOptions, $selectJoinConds
689 parent::nativeInsertSelect( $destTable, $srcTable, $varMap, $conds, $fname,
690 $insertOptions, $selectOptions, $selectJoinConds );
694 public function tableName( $name, $format =
'quoted' ) {
698 return parent::tableName( $name, $format );
706 return $this->keywordTableMap[$name] ?? $name;
715 return parent::tableName( $name, $format );
729 $safeseq = str_replace(
"'",
"''", $seqName );
730 $res = $this->
query(
"SELECT currval('$safeseq')" );
739 $sql =
"SELECT t.typname as ftype,a.atttypmod as size
740 FROM pg_class c, pg_attribute a, pg_type t
741 WHERE relname='$table' AND a.attrelid=c.oid AND
742 a.atttypid=t.oid and a.attname='$field'";
745 if ( $row->ftype ==
'varchar' ) {
746 $size = $row->size - 4;
755 return "$sql LIMIT $limit " . ( is_numeric( $offset ) ?
" OFFSET {$offset} " :
'' );
# SQLSTATE values treated as connection errors (names as listed in the
# PostgreSQL error-code appendix):
#   08000 connection_exception
#   08003 connection_does_not_exist
#   08006 connection_failure
#   08001 sqlclient_unable_to_establish_sqlconnection
#   08004 sqlserver_rejected_establishment_of_sqlconnection
#   57P01 admin_shutdown
#   57P03 cannot_connect_now
#   53300 too_many_connections
770 static $codes = [
'08000',
'08003',
'08006',
'08001',
'08004',
'57P01',
'57P03',
'53300' ];
# Strict comparison: $errno must be the exact string, not a numeric look-alike.
772 return in_array( $errno, $codes,
true );
780 $oldName, $newName, $temporary =
false, $fname = __METHOD__
785 $temporary = $temporary ?
'TEMPORARY' :
'';
788 "CREATE $temporary TABLE $newNameE " .
789 "(LIKE $oldNameE INCLUDING DEFAULTS INCLUDING INDEXES)",
791 $this::QUERY_PSEUDO_PERMANENT
797 $res = $this->
query(
'SELECT attname FROM pg_class c'
798 .
' JOIN pg_namespace n ON (n.oid = c.relnamespace)'
799 .
' JOIN pg_attribute a ON (a.attrelid = c.oid)'
800 .
' JOIN pg_attrdef d ON (c.oid=d.adrelid and a.attnum=d.adnum)'
801 .
' WHERE relkind = \'r\''
803 .
' AND relname = ' . $this->
addQuotes( $oldName )
804 .
' AND pg_get_expr(adbin, adrelid) LIKE \'nextval(%\'',
809 $field = $row->attname;
810 $newSeq =
"{$newName}_{$field}_seq";
815 "CREATE $temporary SEQUENCE $newSeqE OWNED BY $newNameE.$fieldE",
819 "ALTER TABLE $newNameE ALTER COLUMN $fieldE SET DEFAULT nextval({$newSeqQ}::regclass)",
828 $table = $this->
tableName( $table,
'raw' );
831 'SELECT c.oid FROM pg_class c JOIN pg_namespace n ON (n.oid = c.relnamespace)'
832 .
' WHERE relkind = \'r\''
833 .
' AND nspname = ' . $this->
addQuotes( $schema )
834 .
' AND relname = ' . $this->
addQuotes( $table ),
842 $res = $this->
query(
'SELECT pg_get_expr(adbin, adrelid) AS adsrc FROM pg_attribute a'
843 .
' JOIN pg_attrdef d ON (a.attrelid=d.adrelid and a.attnum=d.adnum)'
844 .
" WHERE a.attrelid = $oid"
845 .
' AND pg_get_expr(adbin, adrelid) LIKE \'nextval(%\'',
851 'SELECT ' . preg_replace(
'/^nextval\((.+)\)$/',
'setval($1,1,false)', $row->adsrc ),
868 public function listTables( $prefix =
'', $fname = __METHOD__ ) {
869 $eschemas = implode(
',', array_map( [ $this,
'addQuotes' ], $this->
getCoreSchemas() ) );
870 $result = $this->
query(
871 "SELECT DISTINCT tablename FROM pg_tables WHERE schemaname IN ($eschemas)", $fname );
874 foreach ( $result as $table ) {
875 $vars = get_object_vars( $table );
876 $table = array_pop( $vars );
877 if ( $prefix ==
'' || strpos( $table, $prefix ) === 0 ) {
878 $endArray[] = $table;
886 $ct =
new ConvertibleTimestamp( $ts );
888 return $ct->getTimestamp( TS_POSTGRES );
910 if ( $limit ===
false ) {
911 $limit = strlen( $text ) - 1;
914 if ( $text ==
'{}' ) {
918 if ( $text[$offset] !=
'{' ) {
919 preg_match(
"/(\\{?\"([^\"\\\\]|\\\\.)*\"|[^,{}]+)+([,}]+)/",
920 $text, $match, 0, $offset );
921 $offset += strlen( $match[0] );
922 $output[] = ( $match[1][0] !=
'"'
924 : stripcslashes( substr( $match[1], 1, -1 ) ) );
925 if ( $match[3] ==
'},' ) {
931 }
while ( $limit > $offset );
941 return '[{{int:version-db-postgres-url}} PostgreSQL]';
952 $res = $this->
query(
"SELECT current_schema()", __METHOD__, self::QUERY_IGNORE_DBO_TRX );
970 "SELECT current_schemas(false)",
972 self::QUERY_IGNORE_DBO_TRX
992 $res = $this->
query(
"SHOW search_path", __METHOD__, self::QUERY_IGNORE_DBO_TRX );
997 return explode(
",", $row[0] );
1009 "SET search_path = " . implode(
", ", $search_path ),
1011 self::QUERY_IGNORE_DBO_TRX
1035 __METHOD__ .
": a transaction is currently active"
1040 if ( in_array( $desiredSchema, $this->
getSchemas() ) ) {
1041 $this->coreSchema = $desiredSchema;
1042 $this->queryLogger->debug(
1043 "Schema \"" . $desiredSchema .
"\" already in the search path\n" );
1049 $this->coreSchema = $desiredSchema;
1050 $this->queryLogger->debug(
1051 "Schema \"" . $desiredSchema .
"\" added to the search path\n" );
1055 $this->queryLogger->debug(
1056 "Schema \"" . $desiredSchema .
"\" not found, using current \"" .
1057 $this->coreSchema .
"\"\n" );
1078 if ( $this->tempSchema ) {
1083 "SELECT nspname FROM pg_catalog.pg_namespace n WHERE n.oid = pg_my_temp_schema()", __METHOD__
1087 $this->tempSchema = $row->nspname;
# The version is determined once and kept in $this->numericVersion.
1095 if ( !isset( $this->numericVersion ) ) {
1097 $versionInfo = pg_version(
$conn );
# Client library older than 7.4.0: store a fixed descriptive string instead of
# a real version number.
1098 if ( version_compare( $versionInfo[
'client'],
'7.4.0',
'lt' ) ) {
1100 $this->numericVersion =
'7.3 or earlier';
1101 } elseif ( isset( $versionInfo[
'server'] ) ) {
# pg_version() reported the server version directly; use it as-is.
1103 $this->numericVersion = $versionInfo[
'server'];
# NOTE(review): presumably the final else branch - ask the connection for its
# 'server_version' parameter when pg_version() has no 'server' entry.
1106 $this->numericVersion = pg_parameter_status(
$conn,
'server_version' );
1122 if ( !is_array( $types ) ) {
1123 $types = [ $types ];
1125 if ( $schema ===
false ) {
1128 $schemas = [ $schema ];
1132 foreach ( $schemas as $schema ) {
1134 $sql =
"SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
1135 .
"WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
1136 .
"AND c.relkind IN ('" . implode(
"','", $types ) .
"')";
1153 public function tableExists( $table, $fname = __METHOD__, $schema =
false ) {
1163 SELECT 1 FROM pg_class, pg_namespace, pg_trigger
1164 WHERE relnamespace=pg_namespace.oid AND relkind=
'r'
1165 AND tgrelid=pg_class.oid
1166 AND nspname=%s AND relname=%s AND tgname=%s
1186 $exists = $this->
selectField(
'pg_rules',
'rulename',
1188 'rulename' => $rule,
1189 'tablename' => $table,
1194 return $exists === $rule;
1199 $sql = sprintf(
"SELECT 1 FROM information_schema.table_constraints " .
1200 "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
1219 if ( !strlen( $schema ) ) {
1224 "SELECT 1 FROM pg_catalog.pg_namespace " .
1225 "WHERE nspname = " . $this->
addQuotes( $schema ) .
" LIMIT 1",
1227 self::QUERY_IGNORE_DBO_TRX
1239 $exists = $this->
selectField(
'"pg_catalog"."pg_roles"', 1,
1240 [
'rolname' => $roleName ], __METHOD__ );
1242 return (
bool)$exists;
1271 } elseif ( $b instanceof
Blob ) {
1275 return pg_unescape_bytea( $b );
1286 if ( is_null(
$s ) ) {
1288 } elseif ( is_bool(
$s ) ) {
1289 return intval(
$s );
1290 } elseif (
$s instanceof
Blob ) {
1294 $s = pg_escape_bytea(
$conn,
$s->fetch() );
1301 return "'" . pg_escape_string(
$conn, (
string)
$s ) .
"'";
1305 $preLimitTail = $postLimitTail =
'';
1306 $startOpts = $useIndex = $ignoreIndex =
'';
1309 foreach ( $options as $key => $option ) {
1310 if ( is_numeric( $key ) ) {
1311 $noKeyOptions[$option] =
true;
1319 if ( isset( $options[
'FOR UPDATE'] ) ) {
1320 $postLimitTail .=
' FOR UPDATE OF ' .
1321 implode(
', ', array_map( [ $this,
'tableName' ], $options[
'FOR UPDATE'] ) );
1322 } elseif ( isset( $noKeyOptions[
'FOR UPDATE'] ) ) {
1323 $postLimitTail .=
' FOR UPDATE';
1326 if ( isset( $noKeyOptions[
'DISTINCT'] ) || isset( $noKeyOptions[
'DISTINCTROW'] ) ) {
1327 $startOpts .=
'DISTINCT';
1330 return [ $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ];
1334 return implode(
' || ', $stringList );
1338 $delimiter, $table, $field, $conds =
'', $options = [], $join_conds = []
1342 return '(' . $this->
selectSQLText( $table, $fld, $conds,
null, [], $join_conds ) .
')';
1346 return $field .
'::text';
1350 # Allow dollar quoting for function declarations
# A line whose first four characters are '$mw$' toggles the statement delimiter.
1351 if ( substr( $newLine, 0, 4 ) ==
'$mw$' ) {
1352 if ( $this->delimiter ) {
# Delimiter currently set: clear it.
1353 $this->delimiter =
false;
# NOTE(review): presumably the else branch - put the ';' delimiter back.
1355 $this->delimiter =
';';
# The actual end-of-statement test is left to the parent implementation.
1359 return parent::streamStatementEnd( $sql, $newLine );
1364 foreach ( $write as $table ) {
1365 $tablesWrite[] = $this->
tableName( $table );
1368 foreach ( $read as $table ) {
1369 $tablesRead[] = $this->
tableName( $table );
1373 if ( $tablesWrite ) {
1375 'LOCK TABLE ONLY ' . implode(
',', $tablesWrite ) .
' IN EXCLUSIVE MODE',
1379 if ( $tablesRead ) {
1381 'LOCK TABLE ONLY ' . implode(
',', $tablesRead ) .
' IN SHARE MODE',
# First ask the parent; the database is only consulted when the parent does not
# already report the lock as taken.
1390 if ( !parent::lockIsFree( $lockName, $method ) ) {
# Probe in one query: try to take the advisory lock. If that yields 'f' the
# result is 'f'; otherwise the lock was just acquired, so release it again and
# return the result of the unlock call.
1395 $result = $this->
query(
"SELECT (CASE(pg_try_advisory_lock($key))
1396 WHEN 'f' THEN 'f' ELSE pg_advisory_unlock($key) END) AS lockstatus", $method );
# Free only when the probe reported exactly 't'.
1399 return ( $row->lockstatus ===
't' );
1402 public function lock( $lockName, $method, $timeout = 5 ) {
1405 $loop =
new WaitConditionLoop(
1406 function () use ( $lockName, $key, $timeout, $method ) {
1407 $res = $this->
query(
"SELECT pg_try_advisory_lock($key) AS lockstatus", $method );
1409 if ( $row->lockstatus ===
't' ) {
1410 parent::lock( $lockName, $method, $timeout );
1414 return WaitConditionLoop::CONDITION_CONTINUE;
1419 return ( $loop->invoke() === $loop::CONDITION_REACHED );
1422 public function unlock( $lockName, $method ) {
1425 $result = $this->
query(
"SELECT pg_advisory_unlock($key) as lockstatus", $method );
1428 if ( $row->lockstatus ===
't' ) {
1429 parent::unlock( $lockName, $method );
1433 $this->queryLogger->debug( __METHOD__ .
" failed to release lock\n" );
1439 $res = $this->
query(
"SHOW default_transaction_read_only", __METHOD__ );
1442 return $row ? ( strtolower( $row->default_transaction_read_only ) ===
'on' ) :
false;
1446 return [ self::ATTR_SCHEMAS_AS_TABLE_GROUPS =>
true ];
# Derive a numeric key from the lock name: take the first 15 hex digits
# (60 bits) of its SHA-1 and convert them from base 16 to base 10.
# NOTE(review): 60 bits fits in a signed 64-bit integer, which is presumably why
# 15 digits are used (PostgreSQL advisory-lock keys are bigint) - confirm.
1454 return \Wikimedia\base_convert( substr( sha1( $lockName ), 0, 15 ), 16, 10 );
# Also register the class under the plain global name 'DatabasePostgres'.
# NOTE(review): presumably kept so code that still uses the un-namespaced name
# continues to work - confirm whether this alias is considered deprecated.
1461 class_alias( DatabasePostgres::class,
'DatabasePostgres' );