0s autopkgtest [16:49:30]: starting date and time: 2025-03-15 16:49:30+0000
0s autopkgtest [16:49:30]: git checkout: 325255d2 Merge branch 'pin-any-arch' into 'ubuntu/production'
0s autopkgtest [16:49:30]: host juju-7f2275-prod-proposed-migration-environment-20; command line: /home/ubuntu/autopkgtest/runner/autopkgtest --output-dir /tmp/autopkgtest-work.w9rsbaz7/out --timeout-copy=6000 --setup-commands /home/ubuntu/autopkgtest-cloud/worker-config-production/setup-canonical.sh --apt-pocket=proposed=src:glibc --apt-upgrade slony1-2 --timeout-short=300 --timeout-copy=20000 --timeout-build=20000 --env=ADT_TEST_TRIGGERS=glibc/2.41-1ubuntu2 -- ssh -s /home/ubuntu/autopkgtest/ssh-setup/nova -- --flavor autopkgtest-s390x --security-groups autopkgtest-juju-7f2275-prod-proposed-migration-environment-20@bos03-s390x-11.secgroup --name adt-plucky-s390x-slony1-2-20250315-164930-juju-7f2275-prod-proposed-migration-environment-20-1994cbae-3f66-41d1-ae44-d38abdc4bfea --image adt/ubuntu-plucky-s390x-server --keyname testbed-juju-7f2275-prod-proposed-migration-environment-20 --net-id=net_prod-proposed-migration-s390x -e TERM=linux -e ''"'"'http_proxy=http://squid.internal:3128'"'"'' -e ''"'"'https_proxy=http://squid.internal:3128'"'"'' -e ''"'"'no_proxy=127.0.0.1,127.0.1.1,login.ubuntu.com,localhost,localdomain,novalocal,internal,archive.ubuntu.com,ports.ubuntu.com,security.ubuntu.com,ddebs.ubuntu.com,changelogs.ubuntu.com,keyserver.ubuntu.com,launchpadlibrarian.net,launchpadcontent.net,launchpad.net,10.24.0.0/24,keystone.ps5.canonical.com,objectstorage.prodstack5.canonical.com,radosgw.ps5.canonical.com'"'"'' --mirror=http://ftpmaster.internal/ubuntu/
145s autopkgtest [16:51:55]: testbed dpkg architecture: s390x
145s autopkgtest [16:51:55]: testbed apt version: 2.9.33
146s autopkgtest [16:51:56]: @@@@@@@@@@@@@@@@@@@@ test bed setup
146s autopkgtest [16:51:56]: testbed release detected to be: None
146s autopkgtest [16:51:56]: updating testbed package index (apt update)
147s Get:1 http://ftpmaster.internal/ubuntu plucky-proposed InRelease [126 kB]
147s Hit:2 http://ftpmaster.internal/ubuntu plucky InRelease
147s Hit:3 http://ftpmaster.internal/ubuntu plucky-updates InRelease
147s Hit:4 http://ftpmaster.internal/ubuntu plucky-security InRelease
147s Get:5 http://ftpmaster.internal/ubuntu plucky-proposed/universe Sources [379 kB]
148s Get:6 http://ftpmaster.internal/ubuntu plucky-proposed/main Sources [99.7 kB]
148s Get:7 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse Sources [15.8 kB]
148s Get:8 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x Packages [113 kB]
148s Get:9 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x c-n-f Metadata [1824 B]
148s Get:10 http://ftpmaster.internal/ubuntu plucky-proposed/restricted s390x c-n-f Metadata [116 B]
148s Get:11 http://ftpmaster.internal/ubuntu plucky-proposed/universe s390x Packages [320 kB]
148s Get:12 http://ftpmaster.internal/ubuntu plucky-proposed/universe s390x c-n-f Metadata [13.4 kB]
148s Get:13 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse s390x Packages [3776 B]
148s Get:14 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse s390x c-n-f Metadata [240 B]
149s Fetched 1073 kB in 2s (604 kB/s)
149s Reading package lists...
150s Reading package lists...
150s Building dependency tree...
150s Reading state information...
150s Calculating upgrade...
150s Calculating upgrade...
150s The following packages were automatically installed and are no longer required:
150s libnsl2 libpython3.12-minimal libpython3.12-stdlib libpython3.12t64
150s linux-headers-6.11.0-8 linux-headers-6.11.0-8-generic
150s linux-modules-6.11.0-8-generic linux-tools-6.11.0-8
150s linux-tools-6.11.0-8-generic
150s Use 'sudo apt autoremove' to remove them.
150s The following packages will be upgraded:
150s pinentry-curses python3-jinja2 strace
151s 3 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
151s Need to get 652 kB of archives.
151s After this operation, 27.6 kB of additional disk space will be used.
151s Get:1 http://ftpmaster.internal/ubuntu plucky/main s390x strace s390x 6.13+ds-1ubuntu1 [500 kB]
151s Get:2 http://ftpmaster.internal/ubuntu plucky/main s390x pinentry-curses s390x 1.3.1-2ubuntu3 [42.9 kB]
152s Get:3 http://ftpmaster.internal/ubuntu plucky/main s390x python3-jinja2 all 3.1.5-2ubuntu1 [109 kB]
152s Fetched 652 kB in 1s (509 kB/s)
152s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81428 files and directories currently installed.)
152s Preparing to unpack .../strace_6.13+ds-1ubuntu1_s390x.deb ...
152s Unpacking strace (6.13+ds-1ubuntu1) over (6.11-0ubuntu1) ...
152s Preparing to unpack .../pinentry-curses_1.3.1-2ubuntu3_s390x.deb ...
152s Unpacking pinentry-curses (1.3.1-2ubuntu3) over (1.3.1-2ubuntu2) ...
152s Preparing to unpack .../python3-jinja2_3.1.5-2ubuntu1_all.deb ...
152s Unpacking python3-jinja2 (3.1.5-2ubuntu1) over (3.1.5-2) ...
152s Setting up pinentry-curses (1.3.1-2ubuntu3) ...
152s Setting up python3-jinja2 (3.1.5-2ubuntu1) ...
152s Setting up strace (6.13+ds-1ubuntu1) ...
152s Processing triggers for man-db (2.13.0-1) ...
153s Reading package lists...
153s Building dependency tree...
153s Reading state information...
153s Solving dependencies...
153s The following packages will be REMOVED:
153s libnsl2* libpython3.12-minimal* libpython3.12-stdlib* libpython3.12t64*
153s linux-headers-6.11.0-8* linux-headers-6.11.0-8-generic*
153s linux-modules-6.11.0-8-generic* linux-tools-6.11.0-8*
153s linux-tools-6.11.0-8-generic*
153s 0 upgraded, 0 newly installed, 9 to remove and 5 not upgraded.
153s After this operation, 167 MB disk space will be freed.
153s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81428 files and directories currently installed.)
153s Removing linux-tools-6.11.0-8-generic (6.11.0-8.8) ...
153s Removing linux-tools-6.11.0-8 (6.11.0-8.8) ...
153s Removing libpython3.12t64:s390x (3.12.9-1) ...
153s Removing libpython3.12-stdlib:s390x (3.12.9-1) ...
153s Removing libnsl2:s390x (1.3.0-3build3) ...
153s Removing libpython3.12-minimal:s390x (3.12.9-1) ...
153s Removing linux-headers-6.11.0-8-generic (6.11.0-8.8) ...
154s Removing linux-headers-6.11.0-8 (6.11.0-8.8) ...
154s Removing linux-modules-6.11.0-8-generic (6.11.0-8.8) ...
154s Processing triggers for libc-bin (2.41-1ubuntu1) ...
154s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 56328 files and directories currently installed.)
154s Purging configuration files for libpython3.12-minimal:s390x (3.12.9-1) ...
154s Purging configuration files for linux-modules-6.11.0-8-generic (6.11.0-8.8) ...
155s autopkgtest [16:52:05]: upgrading testbed (apt dist-upgrade and autopurge)
155s Reading package lists...
155s Building dependency tree...
155s Reading state information...
155s Calculating upgrade...Starting pkgProblemResolver with broken count: 0
155s Starting 2 pkgProblemResolver with broken count: 0
155s Done
155s Entering ResolveByKeep
155s
156s Calculating upgrade...
156s The following packages will be upgraded:
156s libc-bin libc-dev-bin libc6 libc6-dev locales
156s 5 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
156s Need to get 9512 kB of archives.
156s After this operation, 8192 B of additional disk space will be used.
156s Get:1 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x libc6-dev s390x 2.41-1ubuntu2 [1678 kB]
158s Get:2 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x libc-dev-bin s390x 2.41-1ubuntu2 [24.3 kB]
158s Get:3 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x libc6 s390x 2.41-1ubuntu2 [2892 kB]
162s Get:4 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x libc-bin s390x 2.41-1ubuntu2 [671 kB]
162s Get:5 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x locales all 2.41-1ubuntu2 [4246 kB]
168s Preconfiguring packages ...
168s Fetched 9512 kB in 12s (772 kB/s)
168s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 56326 files and directories currently installed.)
168s Preparing to unpack .../libc6-dev_2.41-1ubuntu2_s390x.deb ...
168s Unpacking libc6-dev:s390x (2.41-1ubuntu2) over (2.41-1ubuntu1) ...
168s Preparing to unpack .../libc-dev-bin_2.41-1ubuntu2_s390x.deb ...
168s Unpacking libc-dev-bin (2.41-1ubuntu2) over (2.41-1ubuntu1) ...
168s Preparing to unpack .../libc6_2.41-1ubuntu2_s390x.deb ...
168s Unpacking libc6:s390x (2.41-1ubuntu2) over (2.41-1ubuntu1) ...
169s Setting up libc6:s390x (2.41-1ubuntu2) ...
169s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 56326 files and directories currently installed.)
169s Preparing to unpack .../libc-bin_2.41-1ubuntu2_s390x.deb ...
169s Unpacking libc-bin (2.41-1ubuntu2) over (2.41-1ubuntu1) ...
169s Setting up libc-bin (2.41-1ubuntu2) ...
169s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 56326 files and directories currently installed.)
169s Preparing to unpack .../locales_2.41-1ubuntu2_all.deb ...
169s Unpacking locales (2.41-1ubuntu2) over (2.41-1ubuntu1) ...
169s Setting up locales (2.41-1ubuntu2) ...
170s Generating locales (this might take a while)...
170s en_US.UTF-8... done
170s Generation complete.
170s Setting up libc-dev-bin (2.41-1ubuntu2) ...
170s Setting up libc6-dev:s390x (2.41-1ubuntu2) ...
170s Processing triggers for man-db (2.13.0-1) ...
171s Processing triggers for systemd (257.3-1ubuntu3) ...
172s Reading package lists...
172s Building dependency tree...
172s Reading state information...
172s Starting pkgProblemResolver with broken count: 0
172s Starting 2 pkgProblemResolver with broken count: 0
172s Done
172s Solving dependencies...
172s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
173s autopkgtest [16:52:22]: rebooting testbed after setup commands that affected boot
193s autopkgtest [16:52:43]: testbed running kernel: Linux 6.14.0-10-generic #10-Ubuntu SMP Wed Mar 12 14:53:49 UTC 2025
195s autopkgtest [16:52:45]: @@@@@@@@@@@@@@@@@@@@ apt-source slony1-2
199s Get:1 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (dsc) [2462 B]
199s Get:2 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (tar) [1465 kB]
199s Get:3 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (diff) [17.3 kB]
199s gpgv: Signature made Thu Sep 19 09:07:19 2024 UTC
199s gpgv: using RSA key 5C48FE6157F49179597087C64C5A6BAB12D2A7AE
199s gpgv: Can't check signature: No public key
199s dpkg-source: warning: cannot verify inline signature for ./slony1-2_2.2.11-6.dsc: no acceptable signature found
200s autopkgtest [16:52:50]: testing package slony1-2 version 2.2.11-6
200s autopkgtest [16:52:50]: build not needed
201s autopkgtest [16:52:51]: test load-functions: preparing testbed
201s Reading package lists...
201s Building dependency tree...
201s Reading state information...
201s Starting pkgProblemResolver with broken count: 0
201s Starting 2 pkgProblemResolver with broken count: 0
201s Done
202s The following NEW packages will be installed:
202s libio-pty-perl libipc-run-perl libjson-perl libllvm20 libpq5 libxslt1.1
202s postgresql-17 postgresql-17-slony1-2 postgresql-client-17
202s postgresql-client-common postgresql-common postgresql-common-dev
202s slony1-2-bin slony1-2-doc ssl-cert
202s 0 upgraded, 15 newly installed, 0 to remove and 0 not upgraded.
202s Need to get 50.3 MB of archives.
202s After this operation, 215 MB of additional disk space will be used.
202s Get:1 http://ftpmaster.internal/ubuntu plucky/main s390x libjson-perl all 4.10000-1 [81.9 kB]
202s Get:2 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-client-common all 274 [47.6 kB]
202s Get:3 http://ftpmaster.internal/ubuntu plucky/main s390x libio-pty-perl s390x 1:1.20-1build3 [31.6 kB]
202s Get:4 http://ftpmaster.internal/ubuntu plucky/main s390x libipc-run-perl all 20231003.0-2 [91.5 kB]
202s Get:5 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-common-dev all 274 [73.0 kB]
202s Get:6 http://ftpmaster.internal/ubuntu plucky/main s390x ssl-cert all 1.1.3ubuntu1 [18.7 kB]
202s Get:7 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-common all 274 [101 kB]
203s Get:8 http://ftpmaster.internal/ubuntu plucky/main s390x libllvm20 s390x 1:20.1.0~+rc2-1~exp2ubuntu0.4 [31.3 MB]
239s Get:9 http://ftpmaster.internal/ubuntu plucky/main s390x libpq5 s390x 17.4-1 [147 kB]
239s Get:10 http://ftpmaster.internal/ubuntu plucky/main s390x libxslt1.1 s390x 1.1.39-0exp1ubuntu2 [169 kB]
239s Get:11 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-client-17 s390x 17.4-1 [1367 kB]
240s Get:12 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-17 s390x 17.4-1 [16.3 MB]
258s Get:13 http://ftpmaster.internal/ubuntu plucky/universe s390x postgresql-17-slony1-2 s390x 2.2.11-6 [21.4 kB]
258s Get:14 http://ftpmaster.internal/ubuntu plucky/universe s390x slony1-2-bin s390x 2.2.11-6 [228 kB]
258s Get:15 http://ftpmaster.internal/ubuntu plucky/universe s390x slony1-2-doc all 2.2.11-6 [327 kB]
258s Preconfiguring packages ...
259s Fetched 50.3 MB in 57s (889 kB/s)
259s Selecting previously unselected package libjson-perl.
259s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 56326 files and directories currently installed.)
259s Preparing to unpack .../00-libjson-perl_4.10000-1_all.deb ...
259s Unpacking libjson-perl (4.10000-1) ...
259s Selecting previously unselected package postgresql-client-common.
259s Preparing to unpack .../01-postgresql-client-common_274_all.deb ...
259s Unpacking postgresql-client-common (274) ...
259s Selecting previously unselected package libio-pty-perl.
259s Preparing to unpack .../02-libio-pty-perl_1%3a1.20-1build3_s390x.deb ...
259s Unpacking libio-pty-perl (1:1.20-1build3) ...
259s Selecting previously unselected package libipc-run-perl.
259s Preparing to unpack .../03-libipc-run-perl_20231003.0-2_all.deb ...
259s Unpacking libipc-run-perl (20231003.0-2) ...
259s Selecting previously unselected package postgresql-common-dev.
259s Preparing to unpack .../04-postgresql-common-dev_274_all.deb ...
259s Unpacking postgresql-common-dev (274) ...
259s Selecting previously unselected package ssl-cert.
259s Preparing to unpack .../05-ssl-cert_1.1.3ubuntu1_all.deb ...
259s Unpacking ssl-cert (1.1.3ubuntu1) ...
259s Selecting previously unselected package postgresql-common.
259s Preparing to unpack .../06-postgresql-common_274_all.deb ...
259s Adding 'diversion of /usr/bin/pg_config to /usr/bin/pg_config.libpq-dev by postgresql-common'
259s Unpacking postgresql-common (274) ...
259s Selecting previously unselected package libllvm20:s390x.
259s Preparing to unpack .../07-libllvm20_1%3a20.1.0~+rc2-1~exp2ubuntu0.4_s390x.deb ...
259s Unpacking libllvm20:s390x (1:20.1.0~+rc2-1~exp2ubuntu0.4) ...
259s Selecting previously unselected package libpq5:s390x.
259s Preparing to unpack .../08-libpq5_17.4-1_s390x.deb ...
259s Unpacking libpq5:s390x (17.4-1) ...
259s Selecting previously unselected package libxslt1.1:s390x.
259s Preparing to unpack .../09-libxslt1.1_1.1.39-0exp1ubuntu2_s390x.deb ...
259s Unpacking libxslt1.1:s390x (1.1.39-0exp1ubuntu2) ...
259s Selecting previously unselected package postgresql-client-17.
259s Preparing to unpack .../10-postgresql-client-17_17.4-1_s390x.deb ...
259s Unpacking postgresql-client-17 (17.4-1) ...
259s Selecting previously unselected package postgresql-17.
259s Preparing to unpack .../11-postgresql-17_17.4-1_s390x.deb ...
259s Unpacking postgresql-17 (17.4-1) ...
260s Selecting previously unselected package postgresql-17-slony1-2.
260s Preparing to unpack .../12-postgresql-17-slony1-2_2.2.11-6_s390x.deb ...
260s Unpacking postgresql-17-slony1-2 (2.2.11-6) ...
260s Selecting previously unselected package slony1-2-bin.
260s Preparing to unpack .../13-slony1-2-bin_2.2.11-6_s390x.deb ...
260s Unpacking slony1-2-bin (2.2.11-6) ...
260s Selecting previously unselected package slony1-2-doc.
260s Preparing to unpack .../14-slony1-2-doc_2.2.11-6_all.deb ...
260s Unpacking slony1-2-doc (2.2.11-6) ...
260s Setting up postgresql-client-common (274) ...
260s Setting up libio-pty-perl (1:1.20-1build3) ...
260s Setting up libpq5:s390x (17.4-1) ...
260s Setting up ssl-cert (1.1.3ubuntu1) ...
260s Created symlink '/etc/systemd/system/multi-user.target.wants/ssl-cert.service' → '/usr/lib/systemd/system/ssl-cert.service'.
260s Setting up libllvm20:s390x (1:20.1.0~+rc2-1~exp2ubuntu0.4) ...
260s Setting up libipc-run-perl (20231003.0-2) ...
260s Setting up libjson-perl (4.10000-1) ...
260s Setting up libxslt1.1:s390x (1.1.39-0exp1ubuntu2) ...
260s Setting up slony1-2-doc (2.2.11-6) ...
260s Setting up postgresql-common-dev (274) ...
260s Setting up postgresql-client-17 (17.4-1) ...
261s update-alternatives: using /usr/share/postgresql/17/man/man1/psql.1.gz to provide /usr/share/man/man1/psql.1.gz (psql.1.gz) in auto mode
261s Setting up postgresql-common (274) ...
261s Creating config file /etc/postgresql-common/createcluster.conf with new version
261s Building PostgreSQL dictionaries from installed myspell/hunspell packages...
261s Removing obsolete dictionary files:
261s Created symlink '/etc/systemd/system/multi-user.target.wants/postgresql.service' → '/usr/lib/systemd/system/postgresql.service'.
262s Setting up slony1-2-bin (2.2.11-6) ...
262s Setting up postgresql-17 (17.4-1) ...
262s Creating new PostgreSQL cluster 17/main ...
262s /usr/lib/postgresql/17/bin/initdb -D /var/lib/postgresql/17/main --auth-local peer --auth-host scram-sha-256 --no-instructions 262s The files belonging to this database system will be owned by user "postgres". 262s This user must also own the server process. 262s 262s The database cluster will be initialized with locale "C.UTF-8". 262s The default database encoding has accordingly been set to "UTF8". 262s The default text search configuration will be set to "english". 262s 262s Data page checksums are disabled. 262s 262s fixing permissions on existing directory /var/lib/postgresql/17/main ... ok 262s creating subdirectories ... ok 262s selecting dynamic shared memory implementation ... posix 262s selecting default "max_connections" ... 100 262s selecting default "shared_buffers" ... 128MB 262s selecting default time zone ... Etc/UTC 262s creating configuration files ... ok 263s running bootstrap script ... ok 263s performing post-bootstrap initialization ... ok 263s syncing data to disk ... ok 266s Setting up postgresql-17-slony1-2 (2.2.11-6) ... 266s Processing triggers for man-db (2.13.0-1) ... 266s Processing triggers for libc-bin (2.41-1ubuntu2) ... 268s autopkgtest [16:53:58]: test load-functions: [----------------------- 268s ### PostgreSQL 17 psql ### 268s Creating new PostgreSQL cluster 17/regress ... 271s create table public.sl_node ( 271s no_id int4, 271s no_active bool, 271s no_comment text, 271s no_failed bool, 271s CONSTRAINT "sl_node-pkey" 271s PRIMARY KEY (no_id) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_node is 'Holds the list of nodes associated with this namespace.'; 271s COMMENT 271s comment on column public.sl_node.no_id is 'The unique ID number for the node'; 271s COMMENT 271s comment on column public.sl_node.no_active is 'Is the node active in replication yet?'; 271s COMMENT 271s comment on column public.sl_node.no_comment is 'A human-oriented description of the node'; 271s COMMENT 271s create table public.sl_nodelock ( 271s nl_nodeid int4, 271s nl_conncnt serial, 271s nl_backendpid int4, 271s CONSTRAINT "sl_nodelock-pkey" 271s PRIMARY KEY (nl_nodeid, nl_conncnt) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_nodelock is 'Used to prevent multiple slon instances and to identify the backends to kill in terminateNodeConnections().'; 271s COMMENT 271s comment on column public.sl_nodelock.nl_nodeid is 'Clients node_id'; 271s COMMENT 271s comment on column public.sl_nodelock.nl_conncnt is 'Clients connection number'; 271s COMMENT 271s comment on column public.sl_nodelock.nl_backendpid is 'PID of database backend owning this lock'; 271s COMMENT 271s create table public.sl_set ( 271s set_id int4, 271s set_origin int4, 271s set_locked bigint, 271s set_comment text, 271s CONSTRAINT "sl_set-pkey" 271s PRIMARY KEY (set_id), 271s CONSTRAINT "set_origin-no_id-ref" 271s FOREIGN KEY (set_origin) 271s REFERENCES public.sl_node (no_id) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_set is 'Holds definitions of replication sets.'; 271s COMMENT 271s comment on column public.sl_set.set_id is 'A unique ID number for the set.'; 271s COMMENT 271s comment on column public.sl_set.set_origin is 271s 'The ID number of the source node for the replication set.'; 271s COMMENT 271s comment on column public.sl_set.set_locked is 'Transaction ID where the set was locked.'; 271s COMMENT 271s comment on column public.sl_set.set_comment is 'A human-oriented description of the set.'; 271s COMMENT 271s create table 
public.sl_setsync ( 271s ssy_setid int4, 271s ssy_origin int4, 271s ssy_seqno int8, 271s ssy_snapshot "pg_catalog".txid_snapshot, 271s ssy_action_list text, 271s CONSTRAINT "sl_setsync-pkey" 271s PRIMARY KEY (ssy_setid), 271s CONSTRAINT "ssy_setid-set_id-ref" 271s FOREIGN KEY (ssy_setid) 271s REFERENCES public.sl_set (set_id), 271s CONSTRAINT "ssy_origin-no_id-ref" 271s FOREIGN KEY (ssy_origin) 271s REFERENCES public.sl_node (no_id) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_setsync is 'SYNC information'; 271s COMMENT 271s comment on column public.sl_setsync.ssy_setid is 'ID number of the replication set'; 271s COMMENT 271s comment on column public.sl_setsync.ssy_origin is 'ID number of the node'; 271s COMMENT 271s comment on column public.sl_setsync.ssy_seqno is 'Slony-I sequence number'; 271s COMMENT 271s comment on column public.sl_setsync.ssy_snapshot is 'TXID in provider system seen by the event'; 271s COMMENT 271s comment on column public.sl_setsync.ssy_action_list is 'action list used during the subscription process. At the time a subscriber copies over data from the origin, it sees all tables in a state somewhere between two SYNC events. Therefore this list must contains all log_actionseqs that are visible at that time, whose operations have therefore already been included in the data copied at the time the initial data copy is done. Those actions may therefore be filtered out of the first SYNC done after subscribing.'; 271s COMMENT 271s create table public.sl_table ( 271s tab_id int4, 271s tab_reloid oid UNIQUE NOT NULL, 271s tab_relname name NOT NULL, 271s tab_nspname name NOT NULL, 271s tab_set int4, 271s tab_idxname name NOT NULL, 271s tab_altered boolean NOT NULL, 271s tab_comment text, 271s CONSTRAINT "sl_table-pkey" 271s PRIMARY KEY (tab_id), 271s CONSTRAINT "tab_set-set_id-ref" 271s FOREIGN KEY (tab_set) 271s REFERENCES public.sl_set (set_id) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_table is 'Holds information about the tables being replicated.'; 271s COMMENT 271s comment on column public.sl_table.tab_id is 'Unique key for Slony-I to use to identify the table'; 271s COMMENT 271s comment on column public.sl_table.tab_reloid is 'The OID of the table in pg_catalog.pg_class.oid'; 271s COMMENT 271s comment on column public.sl_table.tab_relname is 'The name of the table in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 271s COMMENT 271s comment on column public.sl_table.tab_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 271s COMMENT 271s comment on column public.sl_table.tab_set is 'ID of the replication set the table is in'; 271s COMMENT 271s comment on column public.sl_table.tab_idxname is 'The name of the primary index of the table'; 271s COMMENT 271s comment on column public.sl_table.tab_altered is 'Has the table been modified for replication?'; 271s COMMENT 271s comment on column public.sl_table.tab_comment is 'Human-oriented description of the table'; 271s COMMENT 271s create table public.sl_sequence ( 271s seq_id int4, 271s seq_reloid oid UNIQUE NOT NULL, 271s seq_relname name NOT NULL, 271s seq_nspname name NOT NULL, 271s seq_set int4, 271s seq_comment text, 271s CONSTRAINT "sl_sequence-pkey" 271s PRIMARY KEY (seq_id), 271s CONSTRAINT "seq_set-set_id-ref" 271s FOREIGN KEY (seq_set) 271s REFERENCES public.sl_set (set_id) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_sequence is 'Similar to sl_table, 
each entry identifies a sequence being replicated.'; 271s COMMENT 271s comment on column public.sl_sequence.seq_id is 'An internally-used ID for Slony-I to use in its sequencing of updates'; 271s COMMENT 271s comment on column public.sl_sequence.seq_reloid is 'The OID of the sequence object'; 271s COMMENT 271s comment on column public.sl_sequence.seq_relname is 'The name of the sequence in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 271s COMMENT 271s comment on column public.sl_sequence.seq_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 271s COMMENT 271s comment on column public.sl_sequence.seq_set is 'Indicates which replication set the object is in'; 271s COMMENT 271s comment on column public.sl_sequence.seq_comment is 'A human-oriented comment'; 271s COMMENT 271s create table public.sl_path ( 271s pa_server int4, 271s pa_client int4, 271s pa_conninfo text NOT NULL, 271s pa_connretry int4, 271s CONSTRAINT "sl_path-pkey" 271s PRIMARY KEY (pa_server, pa_client), 271s CONSTRAINT "pa_server-no_id-ref" 271s FOREIGN KEY (pa_server) 271s REFERENCES public.sl_node (no_id), 271s CONSTRAINT "pa_client-no_id-ref" 271s FOREIGN KEY (pa_client) 271s REFERENCES public.sl_node (no_id) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_path is 'Holds connection information for the paths between nodes, and the synchronisation delay'; 271s COMMENT 271s comment on column public.sl_path.pa_server is 'The Node ID # (from sl_node.no_id) of the data source'; 271s COMMENT 271s comment on column public.sl_path.pa_client is 'The Node ID # (from sl_node.no_id) of the data target'; 271s COMMENT 271s comment on column public.sl_path.pa_conninfo is 'The PostgreSQL connection string used to connect to the source node.'; 271s COMMENT 271s comment on column public.sl_path.pa_connretry is 'The synchronisation delay, in seconds'; 271s COMMENT 271s create table public.sl_listen ( 271s li_origin int4, 271s li_provider int4, 271s li_receiver int4, 271s CONSTRAINT "sl_listen-pkey" 271s PRIMARY KEY (li_origin, li_provider, li_receiver), 271s CONSTRAINT "li_origin-no_id-ref" 271s FOREIGN KEY (li_origin) 271s REFERENCES public.sl_node (no_id), 271s CONSTRAINT "sl_listen-sl_path-ref" 271s FOREIGN KEY (li_provider, li_receiver) 271s REFERENCES public.sl_path (pa_server, pa_client) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_listen is 'Indicates how nodes listen to events from other nodes in the Slony-I network.'; 271s COMMENT 271s comment on column public.sl_listen.li_origin is 'The ID # (from sl_node.no_id) of the node this listener is operating on'; 271s COMMENT 271s comment on column public.sl_listen.li_provider is 'The ID # (from sl_node.no_id) of the source node for this listening event'; 271s COMMENT 271s comment on column public.sl_listen.li_receiver is 'The ID # (from sl_node.no_id) of the target node for this listening event'; 271s COMMENT 271s create table public.sl_subscribe ( 271s sub_set int4, 271s sub_provider int4, 271s sub_receiver int4, 271s sub_forward bool, 271s sub_active bool, 271s CONSTRAINT "sl_subscribe-pkey" 271s PRIMARY KEY (sub_receiver, sub_set), 271s CONSTRAINT "sl_subscribe-sl_path-ref" 271s FOREIGN KEY (sub_provider, sub_receiver) 271s REFERENCES public.sl_path (pa_server, pa_client), 271s CONSTRAINT "sub_set-set_id-ref" 271s FOREIGN KEY (sub_set) 271s REFERENCES public.sl_set (set_id) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table 
public.sl_subscribe is 'Holds a list of subscriptions on sets'; 271s COMMENT 271s comment on column public.sl_subscribe.sub_set is 'ID # (from sl_set) of the set being subscribed to'; 271s COMMENT 271s comment on column public.sl_subscribe.sub_provider is 'ID# (from sl_node) of the node providing data'; 271s COMMENT 271s comment on column public.sl_subscribe.sub_receiver is 'ID# (from sl_node) of the node receiving data from the provider'; 271s COMMENT 271s comment on column public.sl_subscribe.sub_forward is 'Does this provider keep data in sl_log_1/sl_log_2 to allow it to be a provider for other nodes?'; 271s COMMENT 271s comment on column public.sl_subscribe.sub_active is 'Has this subscription been activated? This is not set on the subscriber until AFTER the subscriber has received COPY data from the provider'; 271s COMMENT 271s create table public.sl_event ( 271s ev_origin int4, 271s ev_seqno int8, 271s ev_timestamp timestamptz, 271s ev_snapshot "pg_catalog".txid_snapshot, 271s ev_type text, 271s ev_data1 text, 271s ev_data2 text, 271s ev_data3 text, 271s ev_data4 text, 271s ev_data5 text, 271s ev_data6 text, 271s ev_data7 text, 271s ev_data8 text, 271s CONSTRAINT "sl_event-pkey" 271s PRIMARY KEY (ev_origin, ev_seqno) 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_event is 'Holds information about replication events. After a period of time, Slony removes old confirmed events from both this table and the sl_confirm table.'; 271s COMMENT 271s comment on column public.sl_event.ev_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 271s COMMENT 271s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 271s COMMENT 271s comment on column public.sl_event.ev_timestamp is 'When this event record was created'; 271s COMMENT 271s comment on column public.sl_event.ev_snapshot is 'TXID snapshot on provider node for this event'; 271s COMMENT 271s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 271s COMMENT 271s comment on column public.sl_event.ev_type is 'The type of event this record is for. 
271s SYNC = Synchronise 271s STORE_NODE = 271s ENABLE_NODE = 271s DROP_NODE = 271s STORE_PATH = 271s DROP_PATH = 271s STORE_LISTEN = 271s DROP_LISTEN = 271s STORE_SET = 271s DROP_SET = 271s MERGE_SET = 271s SET_ADD_TABLE = 271s SET_ADD_SEQUENCE = 271s STORE_TRIGGER = 271s DROP_TRIGGER = 271s MOVE_SET = 271s ACCEPT_SET = 271s SET_DROP_TABLE = 271s SET_DROP_SEQUENCE = 271s SET_MOVE_TABLE = 271s SET_MOVE_SEQUENCE = 271s FAILOVER_SET = 271s SUBSCRIBE_SET = 271s ENABLE_SUBSCRIPTION = 271s UNSUBSCRIBE_SET = 271s DDL_SCRIPT = 271s ADJUST_SEQ = 271s RESET_CONFIG = 271s '; 271s COMMENT 271s comment on column public.sl_event.ev_data1 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s comment on column public.sl_event.ev_data2 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s comment on column public.sl_event.ev_data3 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s comment on column public.sl_event.ev_data4 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s comment on column public.sl_event.ev_data5 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s comment on column public.sl_event.ev_data6 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s comment on column public.sl_event.ev_data7 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s comment on column public.sl_event.ev_data8 is 'Data field containing an argument needed to process the event'; 271s COMMENT 271s create table public.sl_confirm ( 271s con_origin int4, 271s con_received int4, 271s con_seqno int8, 271s con_timestamp timestamptz DEFAULT timeofday()::timestamptz 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_confirm is 'Holds confirmation of replication events. 
After a period of time, Slony removes old confirmed events from both this table and the sl_event table.'; 271s COMMENT 271s comment on column public.sl_confirm.con_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 271s COMMENT 271s comment on column public.sl_confirm.con_seqno is 'The ID # for the event'; 271s COMMENT 271s comment on column public.sl_confirm.con_timestamp is 'When this event was confirmed'; 271s COMMENT 271s create index sl_confirm_idx1 on public.sl_confirm 271s (con_origin, con_received, con_seqno); 271s CREATE INDEX 271s create index sl_confirm_idx2 on public.sl_confirm 271s (con_received, con_seqno); 271s CREATE INDEX 271s create table public.sl_seqlog ( 271s seql_seqid int4, 271s seql_origin int4, 271s seql_ev_seqno int8, 271s seql_last_value int8 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_seqlog is 'Log of Sequence updates'; 271s COMMENT 271s comment on column public.sl_seqlog.seql_seqid is 'Sequence ID'; 271s COMMENT 271s comment on column public.sl_seqlog.seql_origin is 'Publisher node at which the sequence originates'; 271s COMMENT 271s comment on column public.sl_seqlog.seql_ev_seqno is 'Slony-I Event with which this sequence update is associated'; 271s COMMENT 271s comment on column public.sl_seqlog.seql_last_value is 'Last value published for this sequence'; 271s COMMENT 271s create index sl_seqlog_idx on public.sl_seqlog 271s (seql_origin, seql_ev_seqno, seql_seqid); 271s CREATE INDEX 271s create function public.sequenceLastValue(p_seqname text) returns int8 271s as $$ 271s declare 271s v_seq_row record; 271s begin 271s for v_seq_row in execute 'select last_value from ' || public.slon_quote_input(p_seqname) 271s loop 271s return v_seq_row.last_value; 271s end loop; 271s 271s -- not reached 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.sequenceLastValue(p_seqname text) is 271s 'sequenceLastValue(p_seqname) 271s 271s Utility function used in sl_seqlastvalue view to compactly get the 271s last value from the requested sequence.'; 271s COMMENT 271s create table public.sl_log_1 ( 271s log_origin int4, 271s log_txid bigint, 271s log_tableid int4, 271s log_actionseq int8, 271s log_tablenspname text, 271s log_tablerelname text, 271s log_cmdtype "char", 271s log_cmdupdncols int4, 271s log_cmdargs text[] 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s create index sl_log_1_idx1 on public.sl_log_1 271s (log_origin, log_txid, log_actionseq); 271s CREATE INDEX 271s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 271s COMMENT 271s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 271s COMMENT 271s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 271s COMMENT 271s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 271s COMMENT 271s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 271s COMMENT 271s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 271s COMMENT 271s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 271s COMMENT 271s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. 
U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 271s COMMENT 271s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 271s COMMENT 271s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 271s COMMENT 271s create table public.sl_log_2 ( 271s log_origin int4, 271s log_txid bigint, 271s log_tableid int4, 271s log_actionseq int8, 271s log_tablenspname text, 271s log_tablerelname text, 271s log_cmdtype "char", 271s log_cmdupdncols int4, 271s log_cmdargs text[] 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s create index sl_log_2_idx1 on public.sl_log_2 271s (log_origin, log_txid, log_actionseq); 271s CREATE INDEX 271s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 271s COMMENT 271s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 271s COMMENT 271s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 271s COMMENT 271s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 271s COMMENT 271s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 271s COMMENT 271s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 271s COMMENT 271s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 271s COMMENT 271s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 271s COMMENT 271s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 271s COMMENT 271s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 271s COMMENT 271s create table public.sl_log_script ( 271s log_origin int4, 271s log_txid bigint, 271s log_actionseq int8, 271s log_cmdtype "char", 271s log_cmdargs text[] 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s create index sl_log_script_idx1 on public.sl_log_script 271s (log_origin, log_txid, log_actionseq); 271s CREATE INDEX 271s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 271s COMMENT 271s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 271s COMMENT 271s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 271s COMMENT 271s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 271s COMMENT 271s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. 
S = Script statement, s = Script complete'; 271s COMMENT 271s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 271s COMMENT 271s create table public.sl_registry ( 271s reg_key text primary key, 271s reg_int4 int4, 271s reg_text text, 271s reg_timestamp timestamptz 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s comment on table public.sl_registry is 'Stores miscellaneous runtime data'; 271s COMMENT 271s comment on column public.sl_registry.reg_key is 'Unique key of the runtime option'; 271s COMMENT 271s comment on column public.sl_registry.reg_int4 is 'Option value if type int4'; 271s COMMENT 271s comment on column public.sl_registry.reg_text is 'Option value if type text'; 271s COMMENT 271s comment on column public.sl_registry.reg_timestamp is 'Option value if type timestamp'; 271s COMMENT 271s create table public.sl_apply_stats ( 271s as_origin int4, 271s as_num_insert int8, 271s as_num_update int8, 271s as_num_delete int8, 271s as_num_truncate int8, 271s as_num_script int8, 271s as_num_total int8, 271s as_duration interval, 271s as_apply_first timestamptz, 271s as_apply_last timestamptz, 271s as_cache_prepare int8, 271s as_cache_hit int8, 271s as_cache_evict int8, 271s as_cache_prepare_max int8 271s ) WITHOUT OIDS; 271s CREATE TABLE 271s create index sl_apply_stats_idx1 on public.sl_apply_stats 271s (as_origin); 271s CREATE INDEX 271s comment on table public.sl_apply_stats is 'Local SYNC apply statistics (running totals)'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_origin is 'Origin of the SYNCs'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_num_insert is 'Number of INSERT operations performed'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_num_update is 'Number of UPDATE operations performed'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_num_delete is 'Number of DELETE operations performed'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_num_truncate is 'Number of TRUNCATE operations performed'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_num_script is 'Number of DDL operations performed'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_num_total is 'Total number of operations'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_duration is 'Processing time'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_apply_first is 'Timestamp of first recorded SYNC'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_apply_last is 'Timestamp of most recent recorded SYNC'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_cache_evict is 'Number of apply query cache evict operations'; 271s COMMENT 271s comment on column public.sl_apply_stats.as_cache_prepare_max is 'Maximum number of apply queries prepared in one SYNC group'; 271s COMMENT 271s create view public.sl_seqlastvalue as 271s select SQ.seq_id, SQ.seq_set, SQ.seq_reloid, 271s S.set_origin as seq_origin, 271s public.sequenceLastValue( 271s "pg_catalog".quote_ident(PGN.nspname) || '.' 
|| 271s "pg_catalog".quote_ident(PGC.relname)) as seq_last_value 271s from public.sl_sequence SQ, public.sl_set S, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where S.set_id = SQ.seq_set 271s and PGC.oid = SQ.seq_reloid and PGN.oid = PGC.relnamespace; 271s CREATE VIEW 271s create view public.sl_failover_targets as 271s select set_id, 271s set_origin as set_origin, 271s sub1.sub_receiver as backup_id 271s FROM 271s public.sl_subscribe sub1 271s ,public.sl_set set1 271s where 271s sub1.sub_set=set_id 271s and sub1.sub_forward=true 271s --exclude candidates where the set_origin 271s --has a path a node but the failover 271s --candidate has no path to that node 271s and sub1.sub_receiver not in 271s (select p1.pa_client from 271s public.sl_path p1 271s left outer join public.sl_path p2 on 271s (p2.pa_client=p1.pa_client 271s and p2.pa_server=sub1.sub_receiver) 271s where p2.pa_client is null 271s and p1.pa_server=set_origin 271s and p1.pa_client<>sub1.sub_receiver 271s ) 271s and sub1.sub_provider=set_origin 271s --exclude any subscribers that are not 271s --direct subscribers of all sets on the 271s --origin 271s and sub1.sub_receiver not in 271s (select direct_recv.sub_receiver 271s from 271s 271s (--all direct receivers of the first set 271s select subs2.sub_receiver 271s from public.sl_subscribe subs2 271s where subs2.sub_provider=set1.set_origin 271s and subs2.sub_set=set1.set_id) as 271s direct_recv 271s inner join 271s (--all other sets from the origin 271s select set_id from public.sl_set set2 271s where set2.set_origin=set1.set_origin 271s and set2.set_id<>sub1.sub_set) 271s as othersets on(true) 271s left outer join public.sl_subscribe subs3 271s on(subs3.sub_set=othersets.set_id 271s and subs3.sub_forward=true 271s and subs3.sub_provider=set1.set_origin 271s and direct_recv.sub_receiver=subs3.sub_receiver) 271s where subs3.sub_receiver is null 271s ); 271s CREATE VIEW 271s create sequence public.sl_local_node_id 271s MINVALUE -1; 271s CREATE SEQUENCE 271s SELECT setval('public.sl_local_node_id', -1); 271s setval 271s -------- 271s -1 271s (1 row) 271s 271s comment on sequence public.sl_local_node_id is 'The local node ID is initialized to -1, meaning that this node is not initialized yet.'; 271s COMMENT 271s create sequence public.sl_event_seq; 271s CREATE SEQUENCE 271s comment on sequence public.sl_event_seq is 'The sequence for numbering events originating from this node.'; 271s COMMENT 271s select setval('public.sl_event_seq', 5000000000); 271s setval 271s ------------ 271s 5000000000 271s (1 row) 271s 271s create sequence public.sl_action_seq; 271s CREATE SEQUENCE 271s comment on sequence public.sl_action_seq is 'The sequence to number statements in the transaction logs, so that the replication engines can figure out the "agreeable" order of statements.'; 271s COMMENT 271s create sequence public.sl_log_status 271s MINVALUE 0 MAXVALUE 3; 271s CREATE SEQUENCE 271s SELECT setval('public.sl_log_status', 0); 271s setval 271s -------- 271s 0 271s (1 row) 271s 271s comment on sequence public.sl_log_status is ' 271s Bit 0x01 determines the currently active log table 271s Bit 0x02 tells if the engine needs to read both logs 271s after switching until the old log is clean and truncated. 271s 271s Possible values: 271s 0 sl_log_1 active, sl_log_2 clean 271s 1 sl_log_2 active, sl_log_1 clean 271s 2 sl_log_1 active, sl_log_2 unknown - cleanup 271s 3 sl_log_2 active, sl_log_1 unknown - cleanup 271s 271s This is not yet in use. 
271s '; 271s COMMENT 271s create table public.sl_config_lock ( 271s dummy integer 271s ); 271s CREATE TABLE 271s comment on table public.sl_config_lock is 'This table exists solely to prevent overlapping execution of configuration change procedures and the resulting possible deadlocks. 271s '; 271s COMMENT 271s comment on column public.sl_config_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 271s COMMENT 271s create table public.sl_event_lock ( 271s dummy integer 271s ); 271s CREATE TABLE 271s comment on table public.sl_event_lock is 'This table exists solely to prevent multiple connections from concurrently creating new events and perhaps getting them out of order.'; 271s COMMENT 271s comment on column public.sl_event_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 271s COMMENT 271s create table public.sl_archive_counter ( 271s ac_num bigint, 271s ac_timestamp timestamptz 271s ) without oids; 271s CREATE TABLE 271s comment on table public.sl_archive_counter is 'Table used to generate the log shipping archive number. 271s '; 271s COMMENT 271s comment on column public.sl_archive_counter.ac_num is 'Counter of SYNC ID used in log shipping as the archive number'; 271s COMMENT 271s comment on column public.sl_archive_counter.ac_timestamp is 'Time at which the archive log was generated on the subscriber'; 271s COMMENT 271s insert into public.sl_archive_counter (ac_num, ac_timestamp) 271s values (0, 'epoch'::timestamptz); 271s INSERT 0 1 271s create table public.sl_components ( 271s co_actor text not null primary key, 271s co_pid integer not null, 271s co_node integer not null, 271s co_connection_pid integer not null, 271s co_activity text, 271s co_starttime timestamptz not null, 271s co_event bigint, 271s co_eventtype text 271s ) without oids; 271s CREATE TABLE 271s comment on table public.sl_components is 'Table used to monitor what various slon/slonik components are doing'; 271s COMMENT 271s comment on column public.sl_components.co_actor is 'which component am I?'; 271s COMMENT 271s comment on column public.sl_components.co_pid is 'my process/thread PID on node where slon runs'; 271s COMMENT 271s comment on column public.sl_components.co_node is 'which node am I servicing?'; 271s COMMENT 271s comment on column public.sl_components.co_connection_pid is 'PID of database connection being used on database server'; 271s COMMENT 271s comment on column public.sl_components.co_activity is 'activity that I am up to'; 271s COMMENT 271s comment on column public.sl_components.co_starttime is 'when did my activity begin? (timestamp reported as per slon process on server running slon)'; 271s COMMENT 271s comment on column public.sl_components.co_eventtype is 'what kind of event am I processing? 
(commonly n/a for event loop main threads)'; 271s COMMENT 271s comment on column public.sl_components.co_event is 'which event have I started processing?'; 271s COMMENT 271s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS 271s $BODY$ 271s DECLARE 271s c_delim text; 271s BEGIN 271s c_delim = ','; 271s IF (txt_before IS NULL or txt_before='') THEN 271s RETURN txt_new; 271s END IF; 271s RETURN txt_before || c_delim || txt_new; 271s END; 271s $BODY$ 271s LANGUAGE plpgsql; 271s CREATE FUNCTION 271s comment on function public.agg_text_sum(text,text) is 271s 'An accumulator function used by the slony string_agg function to 271s aggregate rows into a string'; 271s COMMENT 271s CREATE AGGREGATE public.string_agg(text) ( 271s SFUNC=public.agg_text_sum, 271s STYPE=text, 271s INITCOND='' 271s ); 271s CREATE AGGREGATE 271s grant usage on schema public to public; 271s GRANT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name 
name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) 271s returns bigint 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 271s language C 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) is 271s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 271s 271s Create an sl_event entry'; 271s COMMENT 271s create or replace function public.denyAccess () 271s returns trigger 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__denyAccess' 271s language C 271s security definer; 271s CREATE FUNCTION 271s comment on function public.denyAccess () is 271s 'Trigger function to prevent modifications to a table on a subscriber'; 271s COMMENT 271s grant execute on function public.denyAccess () to public; 271s GRANT 271s create or replace function public.lockedSet () 271s returns trigger 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__lockedSet' 271s language C; 271s CREATE FUNCTION 271s comment on function public.lockedSet () is 271s 'Trigger function to prevent modifications to a table before and after a moveSet()'; 271s COMMENT 271s create or replace function public.getLocalNodeId (p_cluster name) returns int4 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getLocalNodeId' 271s language C 271s security definer; 271s CREATE FUNCTION 271s grant execute on function public.getLocalNodeId (p_cluster 
name) to public; 271s GRANT 271s comment on function public.getLocalNodeId (p_cluster name) is 271s 'Returns the node ID of the node being serviced on the local database'; 271s COMMENT 271s create or replace function public.getModuleVersion () returns text 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getModuleVersion' 271s language C 271s security definer; 271s CREATE FUNCTION 271s grant execute on function public.getModuleVersion () to public; 271s GRANT 271s comment on function public.getModuleVersion () is 271s 'Returns the compiled-in version number of the Slony-I shared object'; 271s COMMENT 271s create or replace function public.resetSession() returns text 271s as '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__resetSession' 271s language C; 271s CREATE FUNCTION 271s create or replace function public.logApply () returns trigger 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApply' 271s language C 271s security definer; 271s CREATE FUNCTION 271s create or replace function public.logApplySetCacheSize (p_size int4) 271s returns int4 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySetCacheSize' 271s language C; 271s CREATE FUNCTION 271s create or replace function public.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval) 271s returns int4 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySaveStats' 271s language C; 271s CREATE FUNCTION 271s create or replace function public.checkmoduleversion () returns text as $$ 271s declare 271s moduleversion text; 271s begin 271s select into moduleversion public.getModuleVersion(); 271s if moduleversion <> '2.2.11' then 271s raise exception 'Slonik version: 2.2.11 != Slony-I version in PG build %', 271s moduleversion; 271s end if; 271s return null; 271s end;$$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.checkmoduleversion () is 271s 'Inline test function that verifies that slonik request for STORE 271s NODE/INIT CLUSTER is being run against a conformant set of 271s schema/functions.'; 271s COMMENT 271s select public.checkmoduleversion(); 271s checkmoduleversion 271s -------------------- 271s 271s (1 row) 271s 271s create or replace function public.decode_tgargs(bytea) returns text[] as 271s '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__slon_decode_tgargs' language C security definer; 271s CREATE FUNCTION 271s comment on function public.decode_tgargs(bytea) is 271s 'Translates the contents of pg_trigger.tgargs to an array of text arguments'; 271s COMMENT 271s grant execute on function public.decode_tgargs(bytea) to public; 271s GRANT 271s create or replace function public.check_namespace_validity () returns boolean as $$ 271s declare 271s c_cluster text; 271s begin 271s c_cluster := 'main'; 271s if c_cluster !~ E'^[[:alpha:]_][[:alnum:]_\$]{0,62}$' then 271s raise exception 'Cluster name % is not a valid SQL symbol!', c_cluster; 271s else 271s raise notice 'checked validity of cluster % namespace - OK!', c_cluster; 271s end if; 271s return 't'; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s select public.check_namespace_validity(); 271s check_namespace_validity 271s -------------------------- 271s t 271s (1 row) 271s 271s drop function public.check_namespace_validity(); 271s DROP FUNCTION 271s create or replace function public.logTrigger () returns trigger 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logTrigger' 271s language C 271s security definer; 271s CREATE FUNCTION 271s comment on function public.logTrigger () is 
271s 'This is the trigger that is executed on the origin node that causes 271s updates to be recorded in sl_log_1/sl_log_2.'; 271s COMMENT 271s grant execute on function public.logTrigger () to public; 271s GRANT 271s create or replace function public.terminateNodeConnections (p_failed_node int4) returns int4 271s as $$ 271s declare 271s v_row record; 271s begin 271s for v_row in select nl_nodeid, nl_conncnt, 271s nl_backendpid from public.sl_nodelock 271s where nl_nodeid = p_failed_node for update 271s loop 271s perform public.killBackend(v_row.nl_backendpid, 'TERM'); 271s delete from public.sl_nodelock 271s where nl_nodeid = v_row.nl_nodeid 271s and nl_conncnt = v_row.nl_conncnt; 271s end loop; 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.terminateNodeConnections (p_failed_node int4) is 271s 'terminates all backends that have registered to be from the given node'; 271s COMMENT 271s create or replace function public.killBackend (p_pid int4, p_signame text) returns int4 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__killBackend' 271s language C; 271s CREATE FUNCTION 271s comment on function public.killBackend(p_pid int4, p_signame text) is 271s 'Send a signal to a postgres process. Requires superuser rights'; 271s COMMENT 271s create or replace function public.seqtrack (p_seqid int4, p_seqval int8) returns int8 271s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__seqtrack' 271s strict language C; 271s CREATE FUNCTION 271s comment on function public.seqtrack(p_seqid int4, p_seqval int8) is 271s 'Returns NULL if seqval has not changed since the last call for seqid'; 271s COMMENT 271s create or replace function public.slon_quote_brute(p_tab_fqname text) returns text 271s as $$ 271s declare 271s v_fqname text default ''; 271s begin 271s v_fqname := '"' || replace(p_tab_fqname,'"','""') || '"'; 271s return v_fqname; 271s end; 271s $$ language plpgsql immutable; 271s CREATE FUNCTION 271s comment on function public.slon_quote_brute(p_tab_fqname text) is 271s 'Brutally quote the given text'; 271s COMMENT 271s create or replace function public.slon_quote_input(p_tab_fqname text) returns text as $$ 271s declare 271s v_nsp_name text; 271s v_tab_name text; 271s v_i integer; 271s v_l integer; 271s v_pq2 integer; 271s begin 271s v_l := length(p_tab_fqname); 271s 271s -- Let us search for the dot 271s if p_tab_fqname like '"%' then 271s -- if the first part of the ident starts with a double quote, search 271s -- for the closing double quote, skipping over double double quotes. 271s v_i := 2; 271s while v_i <= v_l loop 271s if substr(p_tab_fqname, v_i, 1) != '"' then 271s v_i := v_i + 1; 271s else 271s v_i := v_i + 1; 271s if substr(p_tab_fqname, v_i, 1) != '"' then 271s exit; 271s end if; 271s v_i := v_i + 1; 271s end if; 271s end loop; 271s else 271s -- first part of ident is not quoted, search for the dot directly 271s v_i := 1; 271s while v_i <= v_l loop 271s if substr(p_tab_fqname, v_i, 1) = '.' then 271s exit; 271s end if; 271s v_i := v_i + 1; 271s end loop; 271s end if; 271s 271s -- v_i now points at the dot or behind the string. 271s 271s if substr(p_tab_fqname, v_i, 1) = '.' 
then 271s -- There is a dot now, so split the ident into its namespace 271s -- and objname parts and make sure each is quoted 271s v_nsp_name := substr(p_tab_fqname, 1, v_i - 1); 271s v_tab_name := substr(p_tab_fqname, v_i + 1); 271s if v_nsp_name not like '"%' then 271s v_nsp_name := '"' || replace(v_nsp_name, '"', '""') || 271s '"'; 271s end if; 271s if v_tab_name not like '"%' then 271s v_tab_name := '"' || replace(v_tab_name, '"', '""') || 271s '"'; 271s end if; 271s 271s return v_nsp_name || '.' || v_tab_name; 271s else 271s -- No dot ... must be just an ident without schema 271s if p_tab_fqname like '"%' then 271s return p_tab_fqname; 271s else 271s return '"' || replace(p_tab_fqname, '"', '""') || '"'; 271s end if; 271s end if; 271s 271s end;$$ language plpgsql immutable; 271s CREATE FUNCTION 271s comment on function public.slon_quote_input(p_text text) is 271s 'quote all words that aren''t quoted yet'; 271s COMMENT 271s create or replace function public.slonyVersionMajor() 271s returns int4 271s as $$ 271s begin 271s return 2; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.slonyVersionMajor () is 271s 'Returns the major version number of the slony schema'; 271s COMMENT 271s create or replace function public.slonyVersionMinor() 271s returns int4 271s as $$ 271s begin 271s return 2; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.slonyVersionMinor () is 271s 'Returns the minor version number of the slony schema'; 271s COMMENT 271s create or replace function public.slonyVersionPatchlevel() 271s returns int4 271s as $$ 271s begin 271s return 11; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.slonyVersionPatchlevel () is 271s 'Returns the version patch level of the slony schema'; 271s COMMENT 271s create or replace function public.slonyVersion() 271s returns text 271s as $$ 271s begin 271s return public.slonyVersionMajor()::text || '.' || 271s public.slonyVersionMinor()::text || '.' 
|| 271s public.slonyVersionPatchlevel()::text ; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.slonyVersion() is 271s 'Returns the version number of the slony schema'; 271s COMMENT 271s create or replace function public.registry_set_int4(p_key text, p_value int4) 271s returns int4 as $$ 271s BEGIN 271s if p_value is null then 271s delete from public.sl_registry 271s where reg_key = p_key; 271s else 271s lock table public.sl_registry; 271s update public.sl_registry 271s set reg_int4 = p_value 271s where reg_key = p_key; 271s if not found then 271s insert into public.sl_registry (reg_key, reg_int4) 271s values (p_key, p_value); 271s end if; 271s end if; 271s return p_value; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.registry_set_int4(p_key text, p_value int4) is 271s 'registry_set_int4(key, value) 271s 271s Set or delete a registry value'; 271s COMMENT 271s create or replace function public.registry_get_int4(p_key text, p_default int4) 271s returns int4 as $$ 271s DECLARE 271s v_value int4; 271s BEGIN 271s select reg_int4 into v_value from public.sl_registry 271s where reg_key = p_key; 271s if not found then 271s v_value = p_default; 271s if p_default notnull then 271s perform public.registry_set_int4(p_key, p_default); 271s end if; 271s else 271s if v_value is null then 271s raise exception 'Slony-I: registry key % is not an int4 value', 271s p_key; 271s end if; 271s end if; 271s return v_value; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.registry_get_int4(p_key text, p_default int4) is 271s 'registry_get_int4(key, value) 271s 271s Get a registry value. If not present, set and return the default.'; 271s COMMENT 271s create or replace function public.registry_set_text(p_key text, p_value text) 271s returns text as $$ 271s BEGIN 271s if p_value is null then 271s delete from public.sl_registry 271s where reg_key = p_key; 271s else 271s lock table public.sl_registry; 271s update public.sl_registry 271s set reg_text = p_value 271s where reg_key = p_key; 271s if not found then 271s insert into public.sl_registry (reg_key, reg_text) 271s values (p_key, p_value); 271s end if; 271s end if; 271s return p_value; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.registry_set_text(text, text) is 271s 'registry_set_text(key, value) 271s 271s Set or delete a registry value'; 271s COMMENT 271s create or replace function public.registry_get_text(p_key text, p_default text) 271s returns text as $$ 271s DECLARE 271s v_value text; 271s BEGIN 271s select reg_text into v_value from public.sl_registry 271s where reg_key = p_key; 271s if not found then 271s v_value = p_default; 271s if p_default notnull then 271s perform public.registry_set_text(p_key, p_default); 271s end if; 271s else 271s if v_value is null then 271s raise exception 'Slony-I: registry key % is not a text value', 271s p_key; 271s end if; 271s end if; 271s return v_value; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.registry_get_text(p_key text, p_default text) is 271s 'registry_get_text(key, value) 271s 271s Get a registry value. 
If not present, set and return the default.'; 271s COMMENT 271s create or replace function public.registry_set_timestamp(p_key text, p_value timestamptz) 271s returns timestamp as $$ 271s BEGIN 271s if p_value is null then 271s delete from public.sl_registry 271s where reg_key = p_key; 271s else 271s lock table public.sl_registry; 271s update public.sl_registry 271s set reg_timestamp = p_value 271s where reg_key = p_key; 271s if not found then 271s insert into public.sl_registry (reg_key, reg_timestamp) 271s values (p_key, p_value); 271s end if; 271s end if; 271s return p_value; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.registry_set_timestamp(p_key text, p_value timestamptz) is 271s 'registry_set_timestamp(key, value) 271s 271s Set or delete a registry value'; 271s COMMENT 271s create or replace function public.registry_get_timestamp(p_key text, p_default timestamptz) 271s returns timestamp as $$ 271s DECLARE 271s v_value timestamp; 271s BEGIN 271s select reg_timestamp into v_value from public.sl_registry 271s where reg_key = p_key; 271s if not found then 271s v_value = p_default; 271s if p_default notnull then 271s perform public.registry_set_timestamp(p_key, p_default); 271s end if; 271s else 271s if v_value is null then 271s raise exception 'Slony-I: registry key % is not an timestamp value', 271s p_key; 271s end if; 271s end if; 271s return v_value; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.registry_get_timestamp(p_key text, p_default timestamptz) is 271s 'registry_get_timestamp(key, value) 271s 271s Get a registry value. If not present, set and return the default.'; 271s COMMENT 271s create or replace function public.cleanupNodelock () 271s returns int4 271s as $$ 271s declare 271s v_row record; 271s begin 271s for v_row in select nl_nodeid, nl_conncnt, nl_backendpid 271s from public.sl_nodelock 271s for update 271s loop 271s if public.killBackend(v_row.nl_backendpid, 'NULL') < 0 then 271s raise notice 'Slony-I: cleanup stale sl_nodelock entry for pid=%', 271s v_row.nl_backendpid; 271s delete from public.sl_nodelock where 271s nl_nodeid = v_row.nl_nodeid and 271s nl_conncnt = v_row.nl_conncnt; 271s end if; 271s end loop; 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.cleanupNodelock() is 271s 'Clean up stale entries when restarting slon'; 271s COMMENT 271s create or replace function public.registerNodeConnection (p_nodeid int4) 271s returns int4 271s as $$ 271s begin 271s insert into public.sl_nodelock 271s (nl_nodeid, nl_backendpid) 271s values 271s (p_nodeid, pg_backend_pid()); 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.registerNodeConnection (p_nodeid int4) is 271s 'Register (uniquely) the node connection so that only one slon can service the node'; 271s COMMENT 271s create or replace function public.initializeLocalNode (p_local_node_id int4, p_comment text) 271s returns int4 271s as $$ 271s declare 271s v_old_node_id int4; 271s v_first_log_no int4; 271s v_event_seq int8; 271s begin 271s -- ---- 271s -- Make sure this node is uninitialized or got reset 271s -- ---- 271s select last_value::int4 into v_old_node_id from public.sl_local_node_id; 271s if v_old_node_id != -1 then 271s raise exception 'Slony-I: This node is already initialized'; 271s end if; 271s 271s -- ---- 271s -- Set sl_local_node_id to the requested value and add our 271s -- own system to sl_node. 
271s -- ---- 271s perform setval('public.sl_local_node_id', p_local_node_id); 271s perform public.storeNode_int (p_local_node_id, p_comment); 271s 271s if (pg_catalog.current_setting('max_identifier_length')::integer - pg_catalog.length('public')) < 5 then 271s raise notice 'Slony-I: Cluster name length [%] versus system max_identifier_length [%] ', pg_catalog.length('public'), pg_catalog.current_setting('max_identifier_length'); 271s raise notice 'leaves narrow/no room for some Slony-I-generated objects (such as indexes).'; 271s raise notice 'You may run into problems later!'; 271s end if; 271s 271s -- 271s -- Put the apply trigger onto sl_log_1 and sl_log_2 271s -- 271s create trigger apply_trigger 271s before INSERT on public.sl_log_1 271s for each row execute procedure public.logApply('_main'); 271s alter table public.sl_log_1 271s enable replica trigger apply_trigger; 271s create trigger apply_trigger 271s before INSERT on public.sl_log_2 271s for each row execute procedure public.logApply('_main'); 271s alter table public.sl_log_2 271s enable replica trigger apply_trigger; 271s 271s return p_local_node_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.initializeLocalNode (p_local_node_id int4, p_comment text) is 271s 'no_id - Node ID # 271s no_comment - Human-oriented comment 271s 271s Initializes the new node, no_id'; 271s COMMENT 271s create or replace function public.storeNode (p_no_id int4, p_no_comment text) 271s returns bigint 271s as $$ 271s begin 271s perform public.storeNode_int (p_no_id, p_no_comment); 271s return public.createEvent('_main', 'STORE_NODE', 271s p_no_id::text, p_no_comment::text); 271s end; 271s $$ language plpgsql 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.storeNode(p_no_id int4, p_no_comment text) is 271s 'no_id - Node ID # 271s no_comment - Human-oriented comment 271s 271s Generate the STORE_NODE event for node no_id'; 271s COMMENT 271s create or replace function public.storeNode_int (p_no_id int4, p_no_comment text) 271s returns int4 271s as $$ 271s declare 271s v_old_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check if the node exists 271s -- ---- 271s select * into v_old_row 271s from public.sl_node 271s where no_id = p_no_id 271s for update; 271s if found then 271s -- ---- 271s -- Node exists, update the existing row. 271s -- ---- 271s update public.sl_node 271s set no_comment = p_no_comment 271s where no_id = p_no_id; 271s else 271s -- ---- 271s -- New node, insert the sl_node row 271s -- ---- 271s insert into public.sl_node 271s (no_id, no_active, no_comment,no_failed) values 271s (p_no_id, 'f', p_no_comment,false); 271s end if; 271s 271s return p_no_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.storeNode_int(p_no_id int4, p_no_comment text) is 271s 'no_id - Node ID # 271s no_comment - Human-oriented comment 271s 271s Internal function to process the STORE_NODE event for node no_id'; 271s COMMENT 271s create or replace function public.enableNode (p_no_id int4) 271s returns bigint 271s as $$ 271s declare 271s v_local_node_id int4; 271s v_node_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that we are the node to activate and that we are 271s -- currently disabled. 
271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select * into v_node_row 271s from public.sl_node 271s where no_id = p_no_id 271s for update; 271s if not found then 271s raise exception 'Slony-I: node % not found', p_no_id; 271s end if; 271s if v_node_row.no_active then 271s raise exception 'Slony-I: node % is already active', p_no_id; 271s end if; 271s 271s -- ---- 271s -- Activate this node and generate the ENABLE_NODE event 271s -- ---- 271s perform public.enableNode_int (p_no_id); 271s return public.createEvent('_main', 'ENABLE_NODE', 271s p_no_id::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.enableNode(p_no_id int4) is 271s 'no_id - Node ID # 271s 271s Generate the ENABLE_NODE event for node no_id'; 271s COMMENT 271s create or replace function public.enableNode_int (p_no_id int4) 271s returns int4 271s as $$ 271s declare 271s v_local_node_id int4; 271s v_node_row record; 271s v_sub_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that the node is inactive 271s -- ---- 271s select * into v_node_row 271s from public.sl_node 271s where no_id = p_no_id 271s for update; 271s if not found then 271s raise exception 'Slony-I: node % not found', p_no_id; 271s end if; 271s if v_node_row.no_active then 271s return p_no_id; 271s end if; 271s 271s -- ---- 271s -- Activate the node and generate sl_confirm status rows for it. 271s -- ---- 271s update public.sl_node 271s set no_active = 't' 271s where no_id = p_no_id; 271s insert into public.sl_confirm 271s (con_origin, con_received, con_seqno) 271s select no_id, p_no_id, 0 from public.sl_node 271s where no_id != p_no_id 271s and no_active; 271s insert into public.sl_confirm 271s (con_origin, con_received, con_seqno) 271s select p_no_id, no_id, 0 from public.sl_node 271s where no_id != p_no_id 271s and no_active; 271s 271s -- ---- 271s -- Generate ENABLE_SUBSCRIPTION events for all sets that 271s -- origin here and are subscribed by the just enabled node. 
271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s for v_sub_row in select SUB.sub_set, SUB.sub_provider from 271s public.sl_set S, 271s public.sl_subscribe SUB 271s where S.set_origin = v_local_node_id 271s and S.set_id = SUB.sub_set 271s and SUB.sub_receiver = p_no_id 271s for update of S 271s loop 271s perform public.enableSubscription (v_sub_row.sub_set, 271s v_sub_row.sub_provider, p_no_id); 271s end loop; 271s 271s return p_no_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.enableNode_int(p_no_id int4) is 271s 'no_id - Node ID # 271s 271s Internal function to process the ENABLE_NODE event for node no_id'; 271s COMMENT 271s create or replace function public.disableNode (p_no_id int4) 271s returns bigint 271s as $$ 271s begin 271s -- **** TODO **** 271s raise exception 'Slony-I: disableNode() not implemented'; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.disableNode(p_no_id int4) is 271s 'generate DISABLE_NODE event for node no_id'; 271s COMMENT 271s create or replace function public.disableNode_int (p_no_id int4) 271s returns int4 271s as $$ 271s begin 271s -- **** TODO **** 271s raise exception 'Slony-I: disableNode_int() not implemented'; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.disableNode(p_no_id int4) is 271s 'process DISABLE_NODE event for node no_id 271s 271s NOTE: This is not yet implemented!'; 271s COMMENT 271s create or replace function public.dropNode (p_no_ids int4[]) 271s returns bigint 271s as $$ 271s declare 271s v_node_row record; 271s v_idx integer; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that this got called on a different node 271s -- ---- 271s if public.getLocalNodeId('_main') = ANY (p_no_ids) then 271s raise exception 'Slony-I: DROP_NODE cannot initiate on the dropped node'; 271s end if; 271s 271s -- 271s -- if any of the deleted nodes are receivers we drop the sl_subscribe line 271s -- 271s delete from public.sl_subscribe where sub_receiver = ANY (p_no_ids); 271s 271s v_idx:=1; 271s LOOP 271s EXIT WHEN v_idx>array_upper(p_no_ids,1) ; 271s select * into v_node_row from public.sl_node 271s where no_id = p_no_ids[v_idx] 271s for update; 271s if not found then 271s raise exception 'Slony-I: unknown node ID % %', p_no_ids[v_idx],v_idx; 271s end if; 271s -- ---- 271s -- Make sure we do not break other nodes subscriptions with this 271s -- ---- 271s if exists (select true from public.sl_subscribe 271s where sub_provider = p_no_ids[v_idx]) 271s then 271s raise exception 'Slony-I: Node % is still configured as a data provider', 271s p_no_ids[v_idx]; 271s end if; 271s 271s -- ---- 271s -- Make sure no set originates there any more 271s -- ---- 271s if exists (select true from public.sl_set 271s where set_origin = p_no_ids[v_idx]) 271s then 271s raise exception 'Slony-I: Node % is still origin of one or more sets', 271s p_no_ids[v_idx]; 271s end if; 271s 271s -- ---- 271s -- Call the internal drop functionality and generate the event 271s -- ---- 271s perform public.dropNode_int(p_no_ids[v_idx]); 271s v_idx:=v_idx+1; 271s END LOOP; 271s return public.createEvent('_main', 'DROP_NODE', 271s array_to_string(p_no_ids,',')); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropNode(p_no_ids int4[]) is 271s 'generate DROP_NODE event to drop node node_id from replication'; 
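A minimal sketch of how the node-management functions created above could be exercised directly in psql (in normal operation slonik drives these calls rather than a human; node ID 2 and the comment text below are hypothetical, and the calls assume the '_main' cluster name used throughout this schema):

select public.storeNode(2, 'hypothetical subscriber node');  -- emits a STORE_NODE event via createEvent()
select public.enableNode(2);                                  -- activates the node and emits ENABLE_NODE
select public.dropNode(array[2]);                             -- emits DROP_NODE, provided node 2 is no longer a provider or a set origin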
271s COMMENT 271s create or replace function public.dropNode_int (p_no_id int4) 271s returns int4 271s as $$ 271s declare 271s v_tab_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- If the dropped node is a remote node, clean the configuration 271s -- from all traces for it. 271s -- ---- 271s if p_no_id <> public.getLocalNodeId('_main') then 271s delete from public.sl_subscribe 271s where sub_receiver = p_no_id; 271s delete from public.sl_listen 271s where li_origin = p_no_id 271s or li_provider = p_no_id 271s or li_receiver = p_no_id; 271s delete from public.sl_path 271s where pa_server = p_no_id 271s or pa_client = p_no_id; 271s delete from public.sl_confirm 271s where con_origin = p_no_id 271s or con_received = p_no_id; 271s delete from public.sl_event 271s where ev_origin = p_no_id; 271s delete from public.sl_node 271s where no_id = p_no_id; 271s 271s return p_no_id; 271s end if; 271s 271s -- ---- 271s -- This is us ... deactivate the node for now, the daemon 271s -- will call uninstallNode() in a separate transaction. 271s -- ---- 271s update public.sl_node 271s set no_active = false 271s where no_id = p_no_id; 271s 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s return p_no_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropNode_int(p_no_id int4) is 271s 'internal function to process DROP_NODE event to drop node node_id from replication'; 271s COMMENT 271s create or replace function public.preFailover(p_failed_node int4,p_is_candidate boolean) 271s returns int4 271s as $$ 271s declare 271s v_row record; 271s v_row2 record; 271s v_n int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- All consistency checks first 271s 271s if p_is_candidate then 271s -- ---- 271s -- Check all sets originating on the failed node 271s -- ---- 271s for v_row in select set_id 271s from public.sl_set 271s where set_origin = p_failed_node 271s loop 271s -- ---- 271s -- Check that the backup node is subscribed to all sets 271s -- that originate on the failed node 271s -- ---- 271s select into v_row2 sub_forward, sub_active 271s from public.sl_subscribe 271s where sub_set = v_row.set_id 271s and sub_receiver = public.getLocalNodeId('_main'); 271s if not found then 271s raise exception 'Slony-I: cannot failover - node % is not subscribed to set %', 271s public.getLocalNodeId('_main'), v_row.set_id; 271s end if; 271s 271s -- ---- 271s -- Check that the subscription is active 271s -- ---- 271s if not v_row2.sub_active then 271s raise exception 'Slony-I: cannot failover - subscription for set % is not active', 271s v_row.set_id; 271s end if; 271s 271s -- ---- 271s -- If there are other subscribers, the backup node needs to 271s -- be a forwarder too. 
271s -- ---- 271s select into v_n count(*) 271s from public.sl_subscribe 271s where sub_set = v_row.set_id 271s and sub_receiver <> public.getLocalNodeId('_main'); 271s if v_n > 0 and not v_row2.sub_forward then 271s raise exception 'Slony-I: cannot failover - node % is not a forwarder of set %', 271s public.getLocalNodeId('_main'), v_row.set_id; 271s end if; 271s end loop; 271s end if; 271s 271s -- ---- 271s -- Terminate all connections of the failed node the hard way 271s -- ---- 271s perform public.terminateNodeConnections(p_failed_node); 271s 271s update public.sl_path set pa_conninfo='' WHERE 271s pa_server=p_failed_node; 271s notify "_main_Restart"; 271s -- ---- 271s -- That is it - so far. 271s -- ---- 271s return p_failed_node; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.preFailover(p_failed_node int4,is_failover_candidate boolean) is 271s 'Prepare for a failover. This function is called on all candidate nodes. 271s It blanks the paths to the failed node 271s and then restart of all node daemons.'; 271s COMMENT 271s create or replace function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) 271s returns int4 271s as $$ 271s declare 271s v_row record; 271s v_row2 record; 271s v_failed boolean; 271s v_restart_required boolean; 271s begin 271s 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s v_restart_required:=false; 271s -- 271s -- any nodes other than the backup receiving 271s -- ANY subscription from a failed node 271s -- will now get that data from the backup node. 271s update public.sl_subscribe set 271s sub_provider=p_backup_node 271s where sub_provider=p_failed_node 271s and sub_receiver<>p_backup_node 271s and sub_receiver <> ALL (p_failed_nodes); 271s if found then 271s v_restart_required:=true; 271s end if; 271s -- 271s -- if this node is receiving a subscription from the backup node 271s -- with a failed node as the provider we need to fix this. 271s update public.sl_subscribe set 271s sub_provider=p_backup_node 271s from public.sl_set 271s where set_id = sub_set 271s and set_origin=p_failed_node 271s and sub_provider = ANY(p_failed_nodes) 271s and sub_receiver=public.getLocalNodeId('_main'); 271s 271s -- ---- 271s -- Terminate all connections of the failed node the hard way 271s -- ---- 271s perform public.terminateNodeConnections(p_failed_node); 271s 271s -- Clear out the paths for the failed node. 271s -- This ensures that *this* node won't be pulling data from 271s -- the failed node even if it *does* become accessible 271s 271s NOTICE: checked validity of cluster main namespace - OK! 271s NOTICE: function public.clonenodeprepare(int4,int4,text) does not exist, skipping 271s NOTICE: function public.ddlcapture(text,text) does not exist, skipping 271s NOTICE: function public.ddlscript_complete(int4,text,int4) does not exist, skipping 271s NOTICE: function public.ddlscript_complete_int(int4,int4) does not exist, skipping 271s NOTICE: function public.subscribeset_int(int4,int4,int4,bool,bool) does not exist, skipping 271s NOTICE: function public.unsubscribeset(int4,int4,pg_catalog.bool) does not exist, skipping 271s NOTICE: function public.updaterelname(int4,int4) does not exist, skipping 271s NOTICE: function public.updatereloid(int4,int4) does not exist, skipping 271s NOTICE: function public.reshapesubscription(int4,int4,int4) does not exist, skipping
271s update public.sl_path set pa_conninfo='' WHERE 271s pa_server=p_failed_node 271s and pa_conninfo<>''; 271s 271s if found then 271s v_restart_required:=true; 271s end if; 271s 271s v_failed := exists (select 1 from public.sl_node 271s where no_failed=true and no_id=p_failed_node); 271s 271s if not v_failed then 271s 271s update public.sl_node set no_failed=true where no_id = ANY (p_failed_nodes) 271s and no_failed=false; 271s if found then 271s v_restart_required:=true; 271s end if; 271s end if; 271s 271s if v_restart_required then 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s -- ---- 271s -- Make sure the node daemon will restart 271s -- ---- 271s notify "_main_Restart"; 271s end if; 271s 271s 271s -- ---- 271s -- That is it - so far. 271s -- ---- 271s return p_failed_node; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) is 271s 'Initiate failover from failed_node to backup_node.
This function must be called on all nodes, 271s and then waited for the restart of all node daemons.'; 271s COMMENT 271s create or replace function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8, p_failed_nodes integer[]) 271s returns bigint 271s as $$ 271s declare 271s v_row record; 271s v_new_event bigint; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s select * into v_row 271s from public.sl_event 271s where ev_origin = p_failed_node 271s and ev_seqno = p_ev_seqno; 271s if not found then 271s raise exception 'Slony-I: event %,% not found', 271s p_failed_node, p_ev_seqno; 271s end if; 271s 271s update public.sl_node set no_failed=true where no_id = ANY 271s (p_failed_nodes) and no_failed=false; 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s -- ---- 271s -- Make sure the node daemon will restart 271s -- ---- 271s raise notice 'calling restart node %',p_failed_node; 271s 271s notify "_main_Restart"; 271s 271s select public.createEvent('_main','FAILOVER_NODE', 271s p_failed_node::text,p_ev_seqno::text, 271s array_to_string(p_failed_nodes,',')) 271s into v_new_event; 271s 271s 271s return v_new_event; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8,p_failed_nodes integer[] ) is 271s 'FUNCTION failedNode2 (failed_node, backup_node, set_id, ev_seqno, ev_seqfake,p_failed_nodes) 271s 271s On the node that has the highest sequence number of the failed node, 271s fake the FAILOVER_SET event.'; 271s COMMENT 271s create or replace function public.failedNode3 (p_failed_node int4, p_backup_node int4,p_seq_no bigint) 271s returns int4 271s as $$ 271s declare 271s 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s perform public.failoverSet_int(p_failed_node, 271s p_backup_node,p_seq_no); 271s 271s notify "_main_Restart"; 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s create or replace function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_last_seqno bigint) 271s returns int4 271s as $$ 271s declare 271s v_row record; 271s v_last_sync int8; 271s v_set int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s SELECT max(ev_seqno) into v_last_sync FROM public.sl_event where 271s ev_origin=p_failed_node; 271s if v_last_sync > p_last_seqno then 271s -- this node is ahead of the last sequence number from the 271s -- failed node that the backup node has. 271s -- this node must unsubscribe from all sets from the origin. 271s for v_set in select set_id from public.sl_set where 271s set_origin=p_failed_node 271s loop 271s raise warning 'Slony is dropping the subscription of set % found sync %s bigger than %s ' 271s , v_set, v_last_sync::text, p_last_seqno::text; 271s perform public.unsubscribeSet(v_set, 271s public.getLocalNodeId('_main'), 271s true); 271s end loop; 271s delete from public.sl_event where ev_origin=p_failed_node 271s and ev_seqno > p_last_seqno; 271s end if; 271s -- ---- 271s -- Change the origin of the set now to the backup node. 
271s -- On the backup node this includes changing all the 271s -- trigger and protection stuff 271s for v_set in select set_id from public.sl_set where 271s set_origin=p_failed_node 271s loop 271s -- ---- 271s if p_backup_node = public.getLocalNodeId('_main') then 271s delete from public.sl_setsync 271s where ssy_setid = v_set; 271s delete from public.sl_subscribe 271s where sub_set = v_set 271s and sub_receiver = p_backup_node; 271s update public.sl_set 271s set set_origin = p_backup_node 271s where set_id = v_set; 271s update public.sl_subscribe 271s set sub_provider=p_backup_node 271s FROM public.sl_node receive_node 271s where sub_set = v_set 271s and sub_provider=p_failed_node 271s and sub_receiver=receive_node.no_id 271s and receive_node.no_failed=false; 271s 271s for v_row in select * from public.sl_table 271s where tab_set = v_set 271s order by tab_id 271s loop 271s perform public.alterTableConfigureTriggers(v_row.tab_id); 271s end loop; 271s else 271s raise notice 'deleting from sl_subscribe all rows with receiver %', 271s p_backup_node; 271s 271s delete from public.sl_subscribe 271s where sub_set = v_set 271s and sub_receiver = p_backup_node; 271s 271s update public.sl_subscribe 271s set sub_provider=p_backup_node 271s FROM public.sl_node receive_node 271s where sub_set = v_set 271s and sub_provider=p_failed_node 271s and sub_provider=p_failed_node 271s and sub_receiver=receive_node.no_id 271s and receive_node.no_failed=false; 271s update public.sl_set 271s set set_origin = p_backup_node 271s where set_id = v_set; 271s -- ---- 271s -- If we are a subscriber of the set ourself, change our 271s -- setsync status to reflect the new set origin. 271s -- ---- 271s if exists (select true from public.sl_subscribe 271s where sub_set = v_set 271s and sub_receiver = public.getLocalNodeId( 271s '_main')) 271s then 271s delete from public.sl_setsync 271s where ssy_setid = v_set; 271s 271s select coalesce(max(ev_seqno), 0) into v_last_sync 271s from public.sl_event 271s where ev_origin = p_backup_node 271s and ev_type = 'SYNC'; 271s if v_last_sync > 0 then 271s insert into public.sl_setsync 271s (ssy_setid, ssy_origin, ssy_seqno, 271s ssy_snapshot, ssy_action_list) 271s select v_set, p_backup_node, v_last_sync, 271s ev_snapshot, NULL 271s from public.sl_event 271s where ev_origin = p_backup_node 271s and ev_seqno = v_last_sync; 271s else 271s insert into public.sl_setsync 271s (ssy_setid, ssy_origin, ssy_seqno, 271s ssy_snapshot, ssy_action_list) 271s values (v_set, p_backup_node, '0', 271s '1:1:', NULL); 271s end if; 271s end if; 271s end if; 271s end loop; 271s 271s --If there are any subscriptions with 271s --the failed_node being the provider then 271s --we want to redirect those subscriptions 271s --to come from the backup node. 271s -- 271s -- The backup node should be a valid 271s -- provider for all subscriptions served 271s -- by the failed node. (otherwise it 271s -- wouldn't be a allowable backup node). 
271s -- delete from public.sl_subscribe 271s -- where sub_receiver=p_backup_node; 271s 271s update public.sl_subscribe 271s set sub_provider=p_backup_node 271s from public.sl_node 271s where sub_provider=p_failed_node 271s and sl_node.no_id=sub_receiver 271s and sl_node.no_failed=false 271s and sub_receiver<>p_backup_node; 271s 271s update public.sl_subscribe 271s set sub_provider=(select set_origin from 271s public.sl_set where set_id= 271s sub_set) 271s where sub_provider=p_failed_node 271s and sub_receiver=p_backup_node; 271s 271s update public.sl_node 271s set no_active=false WHERE 271s no_id=p_failed_node; 271s 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s 271s return p_failed_node; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_seqno bigint) is 271s 'FUNCTION failoverSet_int (failed_node, backup_node, set_id, wait_seqno) 271s 271s Finish failover for one set.'; 271s COMMENT 271s create or replace function public.uninstallNode () 271s returns int4 271s as $$ 271s declare 271s v_tab_row record; 271s begin 271s raise notice 'Slony-I: Please drop schema "_main"'; 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.uninstallNode() is 271s 'Reset the whole database to standalone by removing the whole 271s replication system.'; 271s COMMENT 271s DROP FUNCTION IF EXISTS public.cloneNodePrepare(int4,int4,text); 271s DROP FUNCTION 271s create or replace function public.cloneNodePrepare (p_no_id int4, p_no_provider int4, p_no_comment text) 271s returns bigint 271s as $$ 271s begin 271s perform public.cloneNodePrepare_int (p_no_id, p_no_provider, p_no_comment); 271s return public.createEvent('_main', 'CLONE_NODE', 271s p_no_id::text, p_no_provider::text, 271s p_no_comment::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.cloneNodePrepare(p_no_id int4, p_no_provider int4, p_no_comment text) is 271s 'Prepare for cloning a node.'; 271s COMMENT 271s create or replace function public.cloneNodePrepare_int (p_no_id int4, p_no_provider int4, p_no_comment text) 271s returns int4 271s as $$ 271s declare 271s v_dummy int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s update public.sl_node set 271s no_active = np.no_active, 271s no_comment = np.no_comment, 271s no_failed = np.no_failed 271s from public.sl_node np 271s where np.no_id = p_no_provider 271s and sl_node.no_id = p_no_id; 271s if not found then 271s insert into public.sl_node 271s (no_id, no_active, no_comment,no_failed) 271s select p_no_id, no_active, p_no_comment, no_failed 271s from public.sl_node 271s where no_id = p_no_provider; 271s end if; 271s 271s insert into public.sl_path 271s (pa_server, pa_client, pa_conninfo, pa_connretry) 271s select pa_server, p_no_id, '', pa_connretry 271s from public.sl_path 271s where pa_client = p_no_provider 271s and (pa_server, p_no_id) not in (select pa_server, pa_client 271s from public.sl_path); 271s 271s insert into public.sl_path 271s (pa_server, pa_client, pa_conninfo, pa_connretry) 271s select p_no_id, pa_client, '', pa_connretry 271s from public.sl_path 271s where pa_server = p_no_provider 271s and (p_no_id, pa_client) not in (select pa_server, pa_client 271s from public.sl_path); 271s 271s insert into public.sl_subscribe 271s (sub_set, sub_provider, sub_receiver, sub_forward, 
sub_active) 271s select sub_set, sub_provider, p_no_id, sub_forward, sub_active 271s from public.sl_subscribe 271s where sub_receiver = p_no_provider; 271s 271s insert into public.sl_confirm 271s (con_origin, con_received, con_seqno, con_timestamp) 271s select con_origin, p_no_id, con_seqno, con_timestamp 271s from public.sl_confirm 271s where con_received = p_no_provider; 271s 271s perform public.RebuildListenEntries(); 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.cloneNodePrepare_int(p_no_id int4, p_no_provider int4, p_no_comment text) is 271s 'Internal part of cloneNodePrepare().'; 271s COMMENT 271s create or replace function public.cloneNodeFinish (p_no_id int4, p_no_provider int4) 271s returns int4 271s as $$ 271s declare 271s v_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s perform "pg_catalog".setval('public.sl_local_node_id', p_no_id); 271s perform public.resetSession(); 271s for v_row in select sub_set from public.sl_subscribe 271s where sub_receiver = p_no_id 271s loop 271s perform public.updateReloid(v_row.sub_set, p_no_id); 271s end loop; 271s 271s perform public.RebuildListenEntries(); 271s 271s delete from public.sl_confirm 271s where con_received = p_no_id; 271s insert into public.sl_confirm 271s (con_origin, con_received, con_seqno, con_timestamp) 271s select con_origin, p_no_id, con_seqno, con_timestamp 271s from public.sl_confirm 271s where con_received = p_no_provider; 271s insert into public.sl_confirm 271s (con_origin, con_received, con_seqno, con_timestamp) 271s select p_no_provider, p_no_id, 271s (select max(ev_seqno) from public.sl_event 271s where ev_origin = p_no_provider), current_timestamp; 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.cloneNodeFinish(p_no_id int4, p_no_provider int4) is 271s 'Internal part of cloneNodePrepare().'; 271s COMMENT 271s create or replace function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 271s returns bigint 271s as $$ 271s begin 271s perform public.storePath_int(p_pa_server, p_pa_client, 271s p_pa_conninfo, p_pa_connretry); 271s return public.createEvent('_main', 'STORE_PATH', 271s p_pa_server::text, p_pa_client::text, 271s p_pa_conninfo::text, p_pa_connretry::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 271s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 271s 271s Generate the STORE_PATH event indicating that node pa_client can 271s access node pa_server using DSN pa_conninfo'; 271s COMMENT 271s create or replace function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 271s returns int4 271s as $$ 271s declare 271s v_dummy int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check if the path already exists 271s -- ---- 271s select 1 into v_dummy 271s from public.sl_path 271s where pa_server = p_pa_server 271s and pa_client = p_pa_client 271s for update; 271s if found then 271s -- ---- 271s -- Path exists, update pa_conninfo 271s -- ---- 271s update public.sl_path 271s set pa_conninfo = p_pa_conninfo, 271s pa_connretry = p_pa_connretry 271s where 
pa_server = p_pa_server 271s and pa_client = p_pa_client; 271s else 271s -- ---- 271s -- New path 271s -- 271s -- In case we receive STORE_PATH events before we know 271s -- about the nodes involved in this, we generate those nodes 271s -- as pending. 271s -- ---- 271s if not exists (select 1 from public.sl_node 271s where no_id = p_pa_server) then 271s perform public.storeNode_int (p_pa_server, ''); 271s end if; 271s if not exists (select 1 from public.sl_node 271s where no_id = p_pa_client) then 271s perform public.storeNode_int (p_pa_client, ''); 271s end if; 271s insert into public.sl_path 271s (pa_server, pa_client, pa_conninfo, pa_connretry) values 271s (p_pa_server, p_pa_client, p_pa_conninfo, p_pa_connretry); 271s end if; 271s 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 271s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 271s 271s Process the STORE_PATH event indicating that node pa_client can 271s access node pa_server using DSN pa_conninfo'; 271s COMMENT 271s create or replace function public.dropPath (p_pa_server int4, p_pa_client int4) 271s returns bigint 271s as $$ 271s declare 271s v_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- There should be no existing subscriptions. Auto unsubscribing 271s -- is considered too dangerous. 271s -- ---- 271s for v_row in select sub_set, sub_provider, sub_receiver 271s from public.sl_subscribe 271s where sub_provider = p_pa_server 271s and sub_receiver = p_pa_client 271s loop 271s raise exception 271s 'Slony-I: Path cannot be dropped, subscription of set % needs it', 271s v_row.sub_set; 271s end loop; 271s 271s -- ---- 271s -- Drop all sl_listen entries that depend on this path 271s -- ---- 271s for v_row in select li_origin, li_provider, li_receiver 271s from public.sl_listen 271s where li_provider = p_pa_server 271s and li_receiver = p_pa_client 271s loop 271s perform public.dropListen( 271s v_row.li_origin, v_row.li_provider, v_row.li_receiver); 271s end loop; 271s 271s -- ---- 271s -- Now drop the path and create the event 271s -- ---- 271s perform public.dropPath_int(p_pa_server, p_pa_client); 271s 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s return public.createEvent ('_main', 'DROP_PATH', 271s p_pa_server::text, p_pa_client::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropPath (p_pa_server int4, p_pa_client int4) is 271s 'Generate DROP_PATH event to drop path from pa_server to pa_client'; 271s COMMENT 271s create or replace function public.dropPath_int (p_pa_server int4, p_pa_client int4) 271s returns int4 271s as $$ 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Remove any dangling sl_listen entries with the server 271s -- as provider and the client as receiver. This must have 271s -- been cleared out before, but obviously was not. 
271s -- ---- 271s delete from public.sl_listen 271s where li_provider = p_pa_server 271s and li_receiver = p_pa_client; 271s 271s delete from public.sl_path 271s where pa_server = p_pa_server 271s and pa_client = p_pa_client; 271s 271s if found then 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s return 1; 271s else 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s return 0; 271s end if; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropPath_int (p_pa_server int4, p_pa_client int4) is 271s 'Process DROP_PATH event to drop path from pa_server to pa_client'; 271s COMMENT 271s create or replace function public.storeListen (p_origin int4, p_provider int4, p_receiver int4) 271s returns bigint 271s as $$ 271s begin 271s perform public.storeListen_int (p_origin, p_provider, p_receiver); 271s return public.createEvent ('_main', 'STORE_LISTEN', 271s p_origin::text, p_provider::text, p_receiver::text); 271s end; 271s $$ language plpgsql 271s called on null input; 271s CREATE FUNCTION 271s comment on function public.storeListen(p_origin int4, p_provider int4, p_receiver int4) is 271s 'FUNCTION storeListen (li_origin, li_provider, li_receiver) 271s 271s generate STORE_LISTEN event, indicating that receiver node li_receiver 271s listens to node li_provider in order to get messages coming from node 271s li_origin.'; 271s COMMENT 271s create or replace function public.storeListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 271s returns int4 271s as $$ 271s declare 271s v_exists int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s select 1 into v_exists 271s from public.sl_listen 271s where li_origin = p_li_origin 271s and li_provider = p_li_provider 271s and li_receiver = p_li_receiver; 271s if not found then 271s -- ---- 271s -- In case we receive STORE_LISTEN events before we know 271s -- about the nodes involved in this, we generate those nodes 271s -- as pending. 
271s -- ---- 271s if not exists (select 1 from public.sl_node 271s where no_id = p_li_origin) then 271s perform public.storeNode_int (p_li_origin, ''); 271s end if; 271s if not exists (select 1 from public.sl_node 271s where no_id = p_li_provider) then 271s perform public.storeNode_int (p_li_provider, ''); 271s end if; 271s if not exists (select 1 from public.sl_node 271s where no_id = p_li_receiver) then 271s perform public.storeNode_int (p_li_receiver, ''); 271s end if; 271s 271s insert into public.sl_listen 271s (li_origin, li_provider, li_receiver) values 271s (p_li_origin, p_li_provider, p_li_receiver); 271s end if; 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.storeListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 271s 'FUNCTION storeListen_int (li_origin, li_provider, li_receiver) 271s 271s Process STORE_LISTEN event, indicating that receiver node li_receiver 271s listens to node li_provider in order to get messages coming from node 271s li_origin.'; 271s COMMENT 271s create or replace function public.dropListen (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 271s returns bigint 271s as $$ 271s begin 271s perform public.dropListen_int(p_li_origin, 271s p_li_provider, p_li_receiver); 271s 271s return public.createEvent ('_main', 'DROP_LISTEN', 271s p_li_origin::text, p_li_provider::text, p_li_receiver::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropListen(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 271s 'dropListen (li_origin, li_provider, li_receiver) 271s 271s Generate the DROP_LISTEN event.'; 271s COMMENT 271s create or replace function public.dropListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 271s returns int4 271s as $$ 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s delete from public.sl_listen 271s where li_origin = p_li_origin 271s and li_provider = p_li_provider 271s and li_receiver = p_li_receiver; 271s if found then 271s return 1; 271s else 271s return 0; 271s end if; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 271s 'dropListen (li_origin, li_provider, li_receiver) 271s 271s Process the DROP_LISTEN event, deleting the sl_listen entry for 271s the indicated (origin,provider,receiver) combination.'; 271s COMMENT 271s create or replace function public.storeSet (p_set_id int4, p_set_comment text) 271s returns bigint 271s as $$ 271s declare 271s v_local_node_id int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s v_local_node_id := public.getLocalNodeId('_main'); 271s 271s insert into public.sl_set 271s (set_id, set_origin, set_comment) values 271s (p_set_id, v_local_node_id, p_set_comment); 271s 271s return public.createEvent('_main', 'STORE_SET', 271s p_set_id::text, v_local_node_id::text, p_set_comment::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.storeSet(p_set_id int4, p_set_comment text) is 271s 'Generate STORE_SET event for set set_id with human readable comment set_comment'; 271s COMMENT 271s create or replace function public.storeSet_int (p_set_id int4, p_set_origin int4, p_set_comment text) 271s returns int4 271s as $$ 271s declare 271s v_dummy int4; 
271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s select 1 into v_dummy 271s from public.sl_set 271s where set_id = p_set_id 271s for update; 271s if found then 271s update public.sl_set 271s set set_comment = p_set_comment 271s where set_id = p_set_id; 271s else 271s if not exists (select 1 from public.sl_node 271s where no_id = p_set_origin) then 271s perform public.storeNode_int (p_set_origin, ''); 271s end if; 271s insert into public.sl_set 271s (set_id, set_origin, set_comment) values 271s (p_set_id, p_set_origin, p_set_comment); 271s end if; 271s 271s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 271s perform public.addPartialLogIndices(); 271s 271s return p_set_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.storeSet_int(p_set_id int4, p_set_origin int4, p_set_comment text) is 271s 'storeSet_int (set_id, set_origin, set_comment) 271s 271s Process the STORE_SET event, indicating the new set with given ID, 271s origin node, and human readable comment.'; 271s COMMENT 271s create or replace function public.lockSet (p_set_id int4) 271s returns int4 271s as $$ 271s declare 271s v_local_node_id int4; 271s v_set_row record; 271s v_tab_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that the set exists and that we are the origin 271s -- and that it is not already locked. 271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select * into v_set_row from public.sl_set 271s where set_id = p_set_id 271s for update; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_set_id; 271s end if; 271s if v_set_row.set_origin <> v_local_node_id then 271s raise exception 'Slony-I: set % does not originate on local node', 271s p_set_id; 271s end if; 271s if v_set_row.set_locked notnull then 271s raise exception 'Slony-I: set % is already locked', p_set_id; 271s end if; 271s 271s -- ---- 271s -- Place the lockedSet trigger on all tables in the set. 271s -- ---- 271s for v_tab_row in select T.tab_id, 271s public.slon_quote_brute(PGN.nspname) || '.' 
|| 271s public.slon_quote_brute(PGC.relname) as tab_fqname 271s from public.sl_table T, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where T.tab_set = p_set_id 271s and T.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid 271s order by tab_id 271s loop 271s execute 'create trigger "_main_lockedset" ' || 271s 'before insert or update or delete on ' || 271s v_tab_row.tab_fqname || ' for each row execute procedure 271s public.lockedSet (''_main'');'; 271s end loop; 271s 271s -- ---- 271s -- Remember our snapshots xmax as for the set locking 271s -- ---- 271s update public.sl_set 271s set set_locked = "pg_catalog".txid_snapshot_xmax("pg_catalog".txid_current_snapshot()) 271s where set_id = p_set_id; 271s 271s return p_set_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.lockSet(p_set_id int4) is 271s 'lockSet(set_id) 271s 271s Add a special trigger to all tables of a set that disables access to 271s it.'; 271s COMMENT 271s create or replace function public.unlockSet (p_set_id int4) 271s returns int4 271s as $$ 271s declare 271s v_local_node_id int4; 271s v_set_row record; 271s v_tab_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that the set exists and that we are the origin 271s -- and that it is not already locked. 271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select * into v_set_row from public.sl_set 271s where set_id = p_set_id 271s for update; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_set_id; 271s end if; 271s if v_set_row.set_origin <> v_local_node_id then 271s raise exception 'Slony-I: set % does not originate on local node', 271s p_set_id; 271s end if; 271s if v_set_row.set_locked isnull then 271s raise exception 'Slony-I: set % is not locked', p_set_id; 271s end if; 271s 271s -- ---- 271s -- Drop the lockedSet trigger from all tables in the set. 271s -- ---- 271s for v_tab_row in select T.tab_id, 271s public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) as tab_fqname 271s from public.sl_table T, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where T.tab_set = p_set_id 271s and T.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid 271s order by tab_id 271s loop 271s execute 'drop trigger "_main_lockedset" ' || 271s 'on ' || v_tab_row.tab_fqname; 271s end loop; 271s 271s -- ---- 271s -- Clear out the set_locked field 271s -- ---- 271s update public.sl_set 271s set set_locked = NULL 271s where set_id = p_set_id; 271s 271s return p_set_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.unlockSet(p_set_id int4) is 271s 'Remove the special trigger from all tables of a set that disables access to it.'; 271s COMMENT 271s create or replace function public.moveSet (p_set_id int4, p_new_origin int4) 271s returns bigint 271s as $$ 271s declare 271s v_local_node_id int4; 271s v_set_row record; 271s v_sub_row record; 271s v_sync_seqno int8; 271s v_lv_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that the set is locked and that this locking 271s -- happened long enough ago. 
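-- Hypothetical call sequence (node and set ids are illustrative): moving a set
-- origin is normally requested on the current origin as
--   select public.lockSet(1);
--   select public.moveSet(1, 2);   -- request new origin = node 2
-- lockSet() must run first, since the check below rejects an unlocked set;
-- moveSet() itself calls unlockSet() once the move can proceed.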
271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select * into v_set_row from public.sl_set 271s where set_id = p_set_id 271s for update; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_set_id; 271s end if; 271s if v_set_row.set_origin <> v_local_node_id then 271s raise exception 'Slony-I: set % does not originate on local node', 271s p_set_id; 271s end if; 271s if v_set_row.set_locked isnull then 271s raise exception 'Slony-I: set % is not locked', p_set_id; 271s end if; 271s if v_set_row.set_locked > "pg_catalog".txid_snapshot_xmin("pg_catalog".txid_current_snapshot()) then 271s raise exception 'Slony-I: cannot move set % yet, transactions < % are still in progress', 271s p_set_id, v_set_row.set_locked; 271s end if; 271s 271s -- ---- 271s -- Unlock the set 271s -- ---- 271s perform public.unlockSet(p_set_id); 271s 271s -- ---- 271s -- Check that the new_origin is an active subscriber of the set 271s -- ---- 271s select * into v_sub_row from public.sl_subscribe 271s where sub_set = p_set_id 271s and sub_receiver = p_new_origin; 271s if not found then 271s raise exception 'Slony-I: set % is not subscribed by node %', 271s p_set_id, p_new_origin; 271s end if; 271s if not v_sub_row.sub_active then 271s raise exception 'Slony-I: subscription of node % for set % is inactive', 271s p_new_origin, p_set_id; 271s end if; 271s 271s -- ---- 271s -- Reconfigure everything 271s -- ---- 271s perform public.moveSet_int(p_set_id, v_local_node_id, 271s p_new_origin, 0); 271s 271s perform public.RebuildListenEntries(); 271s 271s -- ---- 271s -- At this time we hold access exclusive locks for every table 271s -- in the set. But we did move the set to the new origin, so the 271s -- createEvent() we are doing now will not record the sequences. 271s -- ---- 271s v_sync_seqno := public.createEvent('_main', 'SYNC'); 271s insert into public.sl_seqlog 271s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 271s select seq_id, v_local_node_id, v_sync_seqno, seq_last_value 271s from public.sl_seqlastvalue 271s where seq_set = p_set_id; 271s 271s -- ---- 271s -- Finally we generate the real event 271s -- ---- 271s return public.createEvent('_main', 'MOVE_SET', 271s p_set_id::text, v_local_node_id::text, p_new_origin::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.moveSet(p_set_id int4, p_new_origin int4) is 271s 'moveSet(set_id, new_origin) 271s 271s Generate MOVE_SET event to request that the origin for set set_id be moved to node new_origin'; 271s COMMENT 271s create or replace function public.moveSet_int (p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) 271s returns int4 271s as $$ 271s declare 271s v_local_node_id int4; 271s v_tab_row record; 271s v_sub_row record; 271s v_sub_node int4; 271s v_sub_last int4; 271s v_sub_next int4; 271s v_last_sync int8; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Get our local node ID 271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s 271s -- On the new origin, raise an event - ACCEPT_SET 271s if v_local_node_id = p_new_origin then 271s -- Create a SYNC event as well so that the ACCEPT_SET has 271s -- the same snapshot as the last SYNC generated by the new 271s -- origin. This snapshot will be used by other nodes to 271s -- finalize the setsync status.
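-- Illustrative invocation (hypothetical ids): the MOVE_SET request generated by
-- moveSet() leads each node to run roughly
--   perform public.moveSet_int(1, 10, 20, 0);
-- i.e. set 1 moves from old origin 10 to new origin 20; p_wait_seqno is simply
-- passed through into the ACCEPT_SET event raised below.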
271s perform public.createEvent('_main', 'SYNC', NULL); 271s perform public.createEvent('_main', 'ACCEPT_SET', 271s p_set_id::text, p_old_origin::text, 271s p_new_origin::text, p_wait_seqno::text); 271s end if; 271s 271s -- ---- 271s -- Next we have to reverse the subscription path 271s -- ---- 271s v_sub_last = p_new_origin; 271s select sub_provider into v_sub_node 271s from public.sl_subscribe 271s where sub_set = p_set_id 271s and sub_receiver = p_new_origin; 271s if not found then 271s raise exception 'Slony-I: subscription path broken in moveSet_int'; 271s end if; 271s while v_sub_node <> p_old_origin loop 271s -- ---- 271s -- Tracing node by node, the old receiver is now in 271s -- v_sub_last and the old provider is in v_sub_node. 271s -- ---- 271s 271s -- ---- 271s -- Get the current provider of this node as next 271s -- and change the provider to the previous one in 271s -- the reverse chain. 271s -- ---- 271s select sub_provider into v_sub_next 271s from public.sl_subscribe 271s where sub_set = p_set_id 271s and sub_receiver = v_sub_node 271s for update; 271s if not found then 271s raise exception 'Slony-I: subscription path broken in moveSet_int'; 271s end if; 271s update public.sl_subscribe 271s set sub_provider = v_sub_last 271s where sub_set = p_set_id 271s and sub_receiver = v_sub_node 271s and sub_receiver <> v_sub_last; 271s 271s v_sub_last = v_sub_node; 271s v_sub_node = v_sub_next; 271s end loop; 271s 271s -- ---- 271s -- This includes creating a subscription for the old origin 271s -- ---- 271s insert into public.sl_subscribe 271s (sub_set, sub_provider, sub_receiver, 271s sub_forward, sub_active) 271s values (p_set_id, v_sub_last, p_old_origin, true, true); 271s if v_local_node_id = p_old_origin then 271s select coalesce(max(ev_seqno), 0) into v_last_sync 271s from public.sl_event 271s where ev_origin = p_new_origin 271s and ev_type = 'SYNC'; 271s if v_last_sync > 0 then 271s insert into public.sl_setsync 271s (ssy_setid, ssy_origin, ssy_seqno, 271s ssy_snapshot, ssy_action_list) 271s select p_set_id, p_new_origin, v_last_sync, 271s ev_snapshot, NULL 271s from public.sl_event 271s where ev_origin = p_new_origin 271s and ev_seqno = v_last_sync; 271s else 271s insert into public.sl_setsync 271s (ssy_setid, ssy_origin, ssy_seqno, 271s ssy_snapshot, ssy_action_list) 271s values (p_set_id, p_new_origin, '0', 271s '1:1:', NULL); 271s end if; 271s end if; 271s 271s -- ---- 271s -- Now change the ownership of the set. 271s -- ---- 271s update public.sl_set 271s set set_origin = p_new_origin 271s where set_id = p_set_id; 271s 271s -- ---- 271s -- On the new origin, delete the obsolete setsync information 271s -- and the subscription. 271s -- ---- 271s if v_local_node_id = p_new_origin then 271s delete from public.sl_setsync 271s where ssy_setid = p_set_id; 271s else 271s if v_local_node_id <> p_old_origin then 271s -- 271s -- On every other node, change the setsync so that it will 271s -- pick up from the new origins last known sync. 
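-- Worked example of the provider reversal above (hypothetical topology): with
-- old origin 10 and subscription chain 10 -> 20 -> 30, moving the set to new
-- origin 30 rewrites sub_provider so the chain becomes 30 -> 20 -> 10, and the
-- old origin 10 receives the new subscription row inserted above.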
271s -- 271s delete from public.sl_setsync 271s where ssy_setid = p_set_id; 271s select coalesce(max(ev_seqno), 0) into v_last_sync 271s from public.sl_event 271s where ev_origin = p_new_origin 271s and ev_type = 'SYNC'; 271s if v_last_sync > 0 then 271s insert into public.sl_setsync 271s (ssy_setid, ssy_origin, ssy_seqno, 271s ssy_snapshot, ssy_action_list) 271s select p_set_id, p_new_origin, v_last_sync, 271s ev_snapshot, NULL 271s from public.sl_event 271s where ev_origin = p_new_origin 271s and ev_seqno = v_last_sync; 271s else 271s insert into public.sl_setsync 271s (ssy_setid, ssy_origin, ssy_seqno, 271s ssy_snapshot, ssy_action_list) 271s values (p_set_id, p_new_origin, 271s '0', '1:1:', NULL); 271s end if; 271s end if; 271s end if; 271s delete from public.sl_subscribe 271s where sub_set = p_set_id 271s and sub_receiver = p_new_origin; 271s 271s -- Regenerate sl_listen since we revised the subscriptions 271s perform public.RebuildListenEntries(); 271s 271s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 271s perform public.addPartialLogIndices(); 271s 271s -- ---- 271s -- If we are the new or old origin, we have to 271s -- adjust the log and deny access trigger configuration. 271s -- ---- 271s if v_local_node_id = p_old_origin or v_local_node_id = p_new_origin then 271s for v_tab_row in select tab_id from public.sl_table 271s where tab_set = p_set_id 271s order by tab_id 271s loop 271s perform public.alterTableConfigureTriggers(v_tab_row.tab_id); 271s end loop; 271s end if; 271s 271s return p_set_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.moveSet_int(p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) is 271s 'moveSet(set_id, old_origin, new_origin, wait_seqno) 271s 271s Process MOVE_SET event to request that the origin for set set_id be 271s moved from old_origin to node new_origin'; 271s COMMENT 271s create or replace function public.dropSet (p_set_id int4) 271s returns bigint 271s as $$ 271s declare 271s v_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that the set exists and originates here 271s -- ---- 271s select set_origin into v_origin from public.sl_set 271s where set_id = p_set_id; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_set_id; 271s end if; 271s if v_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: set % does not originate on local node', 271s p_set_id; 271s end if; 271s 271s -- ---- 271s -- Call the internal drop set functionality and generate the event 271s -- ---- 271s perform public.dropSet_int(p_set_id); 271s return public.createEvent('_main', 'DROP_SET', 271s p_set_id::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropSet(p_set_id int4) is 271s 'Generate DROP_SET event to drop replication of set set_id'; 271s COMMENT 271s create or replace function public.dropSet_int (p_set_id int4) 271s returns int4 271s as $$ 271s declare 271s v_tab_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Restore all tables original triggers and rules and remove 271s -- our replication stuff. 
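-- Hypothetical usage (set id is illustrative): replication of a whole set is
-- dropped on its origin with
--   select public.dropSet(1);
-- the DROP_SET event then makes every node execute the cleanup steps below.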
271s -- ---- 271s for v_tab_row in select tab_id from public.sl_table 271s where tab_set = p_set_id 271s order by tab_id 271s loop 271s perform public.alterTableDropTriggers(v_tab_row.tab_id); 271s end loop; 271s 271s -- ---- 271s -- Remove all traces of the set configuration 271s -- ---- 271s delete from public.sl_sequence 271s where seq_set = p_set_id; 271s delete from public.sl_table 271s where tab_set = p_set_id; 271s delete from public.sl_subscribe 271s where sub_set = p_set_id; 271s delete from public.sl_setsync 271s where ssy_setid = p_set_id; 271s delete from public.sl_set 271s where set_id = p_set_id; 271s 271s -- Regenerate sl_listen since we revised the subscriptions 271s perform public.RebuildListenEntries(); 271s 271s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 271s perform public.addPartialLogIndices(); 271s 271s return p_set_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.dropSet(p_set_id int4) is 271s 'Process DROP_SET event to drop replication of set set_id. This involves: 271s - Removing log and deny access triggers 271s - Removing all traces of the set configuration, including sequences, tables, subscribers, syncs, and the set itself'; 271s COMMENT 271s create or replace function public.mergeSet (p_set_id int4, p_add_id int4) 271s returns bigint 271s as $$ 271s declare 271s v_origin int4; 271s in_progress boolean; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that both sets exist and originate here 271s -- ---- 271s if p_set_id = p_add_id then 271s raise exception 'Slony-I: merged set ids cannot be identical'; 271s end if; 271s select set_origin into v_origin from public.sl_set 271s where set_id = p_set_id; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_set_id; 271s end if; 271s if v_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: set % does not originate on local node', 271s p_set_id; 271s end if; 271s 271s select set_origin into v_origin from public.sl_set 271s where set_id = p_add_id; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_add_id; 271s end if; 271s if v_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: set % does not originate on local node', 271s p_add_id; 271s end if; 271s 271s -- ---- 271s -- Check that both sets are subscribed by the same set of nodes 271s -- ---- 271s if exists (select true from public.sl_subscribe SUB1 271s where SUB1.sub_set = p_set_id 271s and SUB1.sub_receiver not in (select SUB2.sub_receiver 271s from public.sl_subscribe SUB2 271s where SUB2.sub_set = p_add_id)) 271s then 271s raise exception 'Slony-I: subscriber lists of set % and % are different', 271s p_set_id, p_add_id; 271s end if; 271s 271s if exists (select true from public.sl_subscribe SUB1 271s where SUB1.sub_set = p_add_id 271s and SUB1.sub_receiver not in (select SUB2.sub_receiver 271s from public.sl_subscribe SUB2 271s where SUB2.sub_set = p_set_id)) 271s then 271s raise exception 'Slony-I: subscriber lists of set % and % are different', 271s p_add_id, p_set_id; 271s end if; 271s 271s -- ---- 271s -- Check that all ENABLE_SUBSCRIPTION events for the set are confirmed 271s -- ---- 271s select public.isSubscriptionInProgress(p_add_id) into in_progress ; 271s 271s if in_progress then 271s raise exception 'Slony-I: set % has subscriptions in progress - cannot merge', 271s p_add_id; 271s 
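-- Hypothetical usage (set ids are illustrative): once no subscription for the
-- set being added is still in progress, the merge is requested on the common
-- origin with
--   select public.mergeSet(1, 2);
-- which folds all tables and sequences of set 2 into set 1.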
end if; 271s 271s -- ---- 271s -- Create a SYNC event, merge the sets, create a MERGE_SET event 271s -- ---- 271s perform public.createEvent('_main', 'SYNC', NULL); 271s perform public.mergeSet_int(p_set_id, p_add_id); 271s return public.createEvent('_main', 'MERGE_SET', 271s p_set_id::text, p_add_id::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.mergeSet(p_set_id int4, p_add_id int4) is 271s 'Generate MERGE_SET event to request that sets be merged together. 271s 271s Both sets must exist, and originate on the same node. They must be 271s subscribed by the same set of nodes.'; 271s COMMENT 271s create or replace function public.isSubscriptionInProgress(p_add_id int4) 271s returns boolean 271s as $$ 271s begin 271s if exists (select true from public.sl_event 271s where ev_type = 'ENABLE_SUBSCRIPTION' 271s and ev_data1 = p_add_id::text 271s and ev_seqno > (select max(con_seqno) from public.sl_confirm 271s where con_origin = ev_origin 271s and con_received::text = ev_data3)) 271s then 271s return true; 271s else 271s return false; 271s end if; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.isSubscriptionInProgress(p_add_id int4) is 271s 'Checks to see if a subscription for the indicated set is in progress. 271s Returns true if a subscription is in progress. Otherwise false'; 271s COMMENT 271s create or replace function public.mergeSet_int (p_set_id int4, p_add_id int4) 271s returns int4 271s as $$ 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s update public.sl_sequence 271s set seq_set = p_set_id 271s where seq_set = p_add_id; 271s update public.sl_table 271s set tab_set = p_set_id 271s where tab_set = p_add_id; 271s delete from public.sl_subscribe 271s where sub_set = p_add_id; 271s delete from public.sl_setsync 271s where ssy_setid = p_add_id; 271s delete from public.sl_set 271s where set_id = p_add_id; 271s 271s return p_set_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.mergeSet_int(p_set_id int4, p_add_id int4) is 271s 'mergeSet_int(set_id, add_id) - Perform MERGE_SET event, merging all objects from 271s set add_id into set set_id.'; 271s COMMENT 271s create or replace function public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 271s returns bigint 271s as $$ 271s declare 271s v_set_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that we are the origin of the set 271s -- ---- 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = p_set_id; 271s if not found then 271s raise exception 'Slony-I: setAddTable(): set % not found', p_set_id; 271s end if; 271s if v_set_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: setAddTable(): set % has remote origin', p_set_id; 271s end if; 271s 271s if exists (select true from public.sl_subscribe 271s where sub_set = p_set_id) 271s then 271s raise exception 'Slony-I: cannot add table to currently subscribed set % - must attach to an unsubscribed set', 271s p_set_id; 271s end if; 271s 271s -- ---- 271s -- Add the table to the set and generate the SET_ADD_TABLE event 271s -- ---- 271s perform public.setAddTable_int(p_set_id, p_tab_id, p_fqname, 271s p_tab_idxname, p_tab_comment); 271s return 
public.createEvent('_main', 'SET_ADD_TABLE', 271s p_set_id::text, p_tab_id::text, p_fqname::text, 271s p_tab_idxname::text, p_tab_comment::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 271s 'setAddTable (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 271s 271s Add table tab_fqname to replication set on origin node, and generate 271s SET_ADD_TABLE event to allow this to propagate to other nodes. 271s 271s Note that the table id, tab_id, must be unique ACROSS ALL SETS.'; 271s COMMENT 271s create or replace function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 271s returns int4 271s as $$ 271s declare 271s v_tab_relname name; 271s v_tab_nspname name; 271s v_local_node_id int4; 271s v_set_origin int4; 271s v_sub_provider int4; 271s v_relkind char; 271s v_tab_reloid oid; 271s v_pkcand_nn boolean; 271s v_prec record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- For sets with a remote origin, check that we are subscribed 271s -- to that set. Otherwise we ignore the table because it might 271s -- not even exist in our database. 271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = p_set_id; 271s if not found then 271s raise exception 'Slony-I: setAddTable_int(): set % not found', 271s p_set_id; 271s end if; 271s if v_set_origin != v_local_node_id then 271s select sub_provider into v_sub_provider 271s from public.sl_subscribe 271s where sub_set = p_set_id 271s and sub_receiver = public.getLocalNodeId('_main'); 271s if not found then 271s return 0; 271s end if; 271s end if; 271s 271s -- ---- 271s -- Get the tables OID and check that it is a real table 271s -- ---- 271s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname into v_tab_reloid, v_relkind, v_tab_relname, v_tab_nspname 271s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where PGC.relnamespace = PGN.oid 271s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 271s '.' 
|| public.slon_quote_brute(PGC.relname); 271s if not found then 271s raise exception 'Slony-I: setAddTable_int(): table % not found', 271s p_fqname; 271s end if; 271s if v_relkind != 'r' then 271s raise exception 'Slony-I: setAddTable_int(): % is not a regular table', 271s p_fqname; 271s end if; 271s 271s if not exists (select indexrelid 271s from "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGC 271s where PGX.indrelid = v_tab_reloid 271s and PGX.indexrelid = PGC.oid 271s and PGC.relname = p_tab_idxname) 271s then 271s raise exception 'Slony-I: setAddTable_int(): table % has no index %', 271s p_fqname, p_tab_idxname; 271s end if; 271s 271s -- ---- 271s -- Verify that the columns in the PK (or candidate) are not NULLABLE 271s -- ---- 271s 271s v_pkcand_nn := 'f'; 271s for v_prec in select attname from "pg_catalog".pg_attribute where attrelid = 271s (select oid from "pg_catalog".pg_class where oid = v_tab_reloid) 271s and attname in (select attname from "pg_catalog".pg_attribute where 271s attrelid = (select oid from "pg_catalog".pg_class PGC, 271s "pg_catalog".pg_index PGX where 271s PGC.relname = p_tab_idxname and PGX.indexrelid=PGC.oid and 271s PGX.indrelid = v_tab_reloid)) and attnotnull <> 't' 271s loop 271s raise notice 'Slony-I: setAddTable_int: table % PK column % nullable', p_fqname, v_prec.attname; 271s v_pkcand_nn := 't'; 271s end loop; 271s if v_pkcand_nn then 271s raise exception 'Slony-I: setAddTable_int: table % not replicable!', p_fqname; 271s end if; 271s 271s select * into v_prec from public.sl_table where tab_id = p_tab_id; 271s if not found then 271s v_pkcand_nn := 't'; -- No-op -- All is well 271s else 271s raise exception 'Slony-I: setAddTable_int: table id % has already been assigned!', p_tab_id; 271s end if; 271s 271s -- ---- 271s -- Add the table to sl_table and create the trigger on it. 
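-- Hypothetical usage on the origin node (all names and ids are illustrative):
--   select public.setAddTable(1, 1001, 'public.accounts',
--                             'accounts_pkey', 'accounts table');
-- note that the table id (1001 here) must be unique across all sets.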
271s -- ---- 271s insert into public.sl_table 271s (tab_id, tab_reloid, tab_relname, tab_nspname, 271s tab_set, tab_idxname, tab_altered, tab_comment) 271s values 271s (p_tab_id, v_tab_reloid, v_tab_relname, v_tab_nspname, 271s p_set_id, p_tab_idxname, false, p_tab_comment); 271s perform public.alterTableAddTriggers(p_tab_id); 271s 271s return p_tab_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 271s 'setAddTable_int (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 271s 271s This function processes the SET_ADD_TABLE event on remote nodes, 271s adding a table to replication if the remote node is subscribing to its 271s replication set.'; 271s COMMENT 271s create or replace function public.setDropTable(p_tab_id int4) 271s returns bigint 271s as $$ 271s declare 271s v_set_id int4; 271s v_set_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Determine the set_id 271s -- ---- 271s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 271s 271s -- ---- 271s -- Ensure table exists 271s -- ---- 271s if not found then 271s raise exception 'Slony-I: setDropTable_int(): table % not found', 271s p_tab_id; 271s end if; 271s 271s -- ---- 271s -- Check that we are the origin of the set 271s -- ---- 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = v_set_id; 271s if not found then 271s raise exception 'Slony-I: setDropTable(): set % not found', v_set_id; 271s end if; 271s if v_set_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: setDropTable(): set % has remote origin', v_set_id; 271s end if; 271s 271s -- ---- 271s -- Drop the table from the set and generate the SET_ADD_TABLE event 271s -- ---- 271s perform public.setDropTable_int(p_tab_id); 271s return public.createEvent('_main', 'SET_DROP_TABLE', 271s p_tab_id::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setDropTable(p_tab_id int4) is 271s 'setDropTable (tab_id) 271s 271s Drop table tab_id from set on origin node, and generate SET_DROP_TABLE 271s event to allow this to propagate to other nodes.'; 271s COMMENT 271s create or replace function public.setDropTable_int(p_tab_id int4) 271s returns int4 271s as $$ 271s declare 271s v_set_id int4; 271s v_local_node_id int4; 271s v_set_origin int4; 271s v_sub_provider int4; 271s v_tab_reloid oid; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Determine the set_id 271s -- ---- 271s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 271s 271s -- ---- 271s -- Ensure table exists 271s -- ---- 271s if not found then 271s return 0; 271s end if; 271s 271s -- ---- 271s -- For sets with a remote origin, check that we are subscribed 271s -- to that set. Otherwise we ignore the table because it might 271s -- not even exist in our database. 
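-- Hypothetical usage (table id is illustrative): removing a table from
-- replication is requested on the set origin with
--   select public.setDropTable(1001);
-- subscribers run this _int variant when the SET_DROP_TABLE event arrives.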
271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = v_set_id; 271s if not found then 271s raise exception 'Slony-I: setDropTable_int(): set % not found', 271s v_set_id; 271s end if; 271s if v_set_origin != v_local_node_id then 271s select sub_provider into v_sub_provider 271s from public.sl_subscribe 271s where sub_set = v_set_id 271s and sub_receiver = public.getLocalNodeId('_main'); 271s if not found then 271s return 0; 271s end if; 271s end if; 271s 271s -- ---- 271s -- Drop the table from sl_table and drop trigger from it. 271s -- ---- 271s perform public.alterTableDropTriggers(p_tab_id); 271s delete from public.sl_table where tab_id = p_tab_id; 271s return p_tab_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setDropTable_int(p_tab_id int4) is 271s 'setDropTable_int (tab_id) 271s 271s This function processes the SET_DROP_TABLE event on remote nodes, 271s dropping a table from replication if the remote node is subscribing to 271s its replication set.'; 271s COMMENT 271s create or replace function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 271s returns bigint 271s as $$ 271s declare 271s v_set_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that we are the origin of the set 271s -- ---- 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = p_set_id; 271s if not found then 271s raise exception 'Slony-I: setAddSequence(): set % not found', p_set_id; 271s end if; 271s if v_set_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: setAddSequence(): set % has remote origin - submit to origin node', p_set_id; 271s end if; 271s 271s if exists (select true from public.sl_subscribe 271s where sub_set = p_set_id) 271s then 271s raise exception 'Slony-I: cannot add sequence to currently subscribed set %', 271s p_set_id; 271s end if; 271s 271s -- ---- 271s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 271s -- ---- 271s perform public.setAddSequence_int(p_set_id, p_seq_id, p_fqname, 271s p_seq_comment); 271s return public.createEvent('_main', 'SET_ADD_SEQUENCE', 271s p_set_id::text, p_seq_id::text, 271s p_fqname::text, p_seq_comment::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 271s 'setAddSequence (set_id, seq_id, seq_fqname, seq_comment) 271s 271s On the origin node for set set_id, add sequence seq_fqname to the 271s replication set, and raise SET_ADD_SEQUENCE to cause this to replicate 271s to subscriber nodes.'; 271s COMMENT 271s create or replace function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 271s returns int4 271s as $$ 271s declare 271s v_local_node_id int4; 271s v_set_origin int4; 271s v_sub_provider int4; 271s v_relkind char; 271s v_seq_reloid oid; 271s v_seq_relname name; 271s v_seq_nspname name; 271s v_sync_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- For sets with a remote origin, check that we are subscribed 271s -- to that set. 
Otherwise we ignore the sequence because it might 271s -- not even exist in our database. 271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = p_set_id; 271s if not found then 271s raise exception 'Slony-I: setAddSequence_int(): set % not found', 271s p_set_id; 271s end if; 271s if v_set_origin != v_local_node_id then 271s select sub_provider into v_sub_provider 271s from public.sl_subscribe 271s where sub_set = p_set_id 271s and sub_receiver = public.getLocalNodeId('_main'); 271s if not found then 271s return 0; 271s end if; 271s end if; 271s 271s -- ---- 271s -- Get the sequences OID and check that it is a sequence 271s -- ---- 271s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname 271s into v_seq_reloid, v_relkind, v_seq_relname, v_seq_nspname 271s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where PGC.relnamespace = PGN.oid 271s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 271s '.' || public.slon_quote_brute(PGC.relname); 271s if not found then 271s raise exception 'Slony-I: setAddSequence_int(): sequence % not found', 271s p_fqname; 271s end if; 271s if v_relkind != 'S' then 271s raise exception 'Slony-I: setAddSequence_int(): % is not a sequence', 271s p_fqname; 271s end if; 271s 271s select 1 into v_sync_row from public.sl_sequence where seq_id = p_seq_id; 271s if not found then 271s v_relkind := 'o'; -- all is OK 271s else 271s raise exception 'Slony-I: setAddSequence_int(): sequence ID % has already been assigned', p_seq_id; 271s end if; 271s 271s -- ---- 271s -- Add the sequence to sl_sequence 271s -- ---- 271s insert into public.sl_sequence 271s (seq_id, seq_reloid, seq_relname, seq_nspname, seq_set, seq_comment) 271s values 271s (p_seq_id, v_seq_reloid, v_seq_relname, v_seq_nspname, p_set_id, p_seq_comment); 271s 271s -- ---- 271s -- On the set origin, fake a sl_seqlog row for the last sync event 271s -- ---- 271s if v_set_origin = v_local_node_id then 271s for v_sync_row in select coalesce (max(ev_seqno), 0) as ev_seqno 271s from public.sl_event 271s where ev_origin = v_local_node_id 271s and ev_type = 'SYNC' 271s loop 271s insert into public.sl_seqlog 271s (seql_seqid, seql_origin, seql_ev_seqno, 271s seql_last_value) values 271s (p_seq_id, v_local_node_id, v_sync_row.ev_seqno, 271s public.sequenceLastValue(p_fqname)); 271s end loop; 271s end if; 271s 271s return p_seq_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 271s 'setAddSequence_int (set_id, seq_id, seq_fqname, seq_comment) 271s 271s This processes the SET_ADD_SEQUENCE event. 
On remote nodes that 271s subscribe to set_id, add the sequence to the replication set.'; 271s COMMENT 271s create or replace function public.setDropSequence (p_seq_id int4) 271s returns bigint 271s as $$ 271s declare 271s v_set_id int4; 271s v_set_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Determine set id for this sequence 271s -- ---- 271s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 271s 271s -- ---- 271s -- Ensure sequence exists 271s -- ---- 271s if not found then 271s raise exception 'Slony-I: setDropSequence_int(): sequence % not found', 271s p_seq_id; 271s end if; 271s 271s -- ---- 271s -- Check that we are the origin of the set 271s -- ---- 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = v_set_id; 271s if not found then 271s raise exception 'Slony-I: setDropSequence(): set % not found', v_set_id; 271s end if; 271s if v_set_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: setDropSequence(): set % has origin at another node - submit this to that node', v_set_id; 271s end if; 271s 271s -- ---- 271s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 271s -- ---- 271s perform public.setDropSequence_int(p_seq_id); 271s return public.createEvent('_main', 'SET_DROP_SEQUENCE', 271s p_seq_id::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setDropSequence (p_seq_id int4) is 271s 'setDropSequence (seq_id) 271s 271s On the origin node for the set, drop sequence seq_id from replication 271s set, and raise SET_DROP_SEQUENCE to cause this to replicate to 271s subscriber nodes.'; 271s COMMENT 271s create or replace function public.setDropSequence_int(p_seq_id int4) 271s returns int4 271s as $$ 271s declare 271s v_set_id int4; 271s v_local_node_id int4; 271s v_set_origin int4; 271s v_sub_provider int4; 271s v_relkind char; 271s v_sync_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Determine set id for this sequence 271s -- ---- 271s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 271s 271s -- ---- 271s -- Ensure sequence exists 271s -- ---- 271s if not found then 271s return 0; 271s end if; 271s 271s -- ---- 271s -- For sets with a remote origin, check that we are subscribed 271s -- to that set. Otherwise we ignore the sequence because it might 271s -- not even exist in our database. 
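-- Hypothetical usage on the origin node (ids and names are illustrative):
--   select public.setAddSequence(1, 2001, 'public.accounts_id_seq', 'id sequence');
--   select public.setDropSequence(2001);
-- subscribers apply the corresponding *_int functions when the events arrive.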
271s -- ---- 271s v_local_node_id := public.getLocalNodeId('_main'); 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = v_set_id; 271s if not found then 271s raise exception 'Slony-I: setDropSequence_int(): set % not found', 271s v_set_id; 271s end if; 271s if v_set_origin != v_local_node_id then 271s select sub_provider into v_sub_provider 271s from public.sl_subscribe 271s where sub_set = v_set_id 271s and sub_receiver = public.getLocalNodeId('_main'); 271s if not found then 271s return 0; 271s end if; 271s end if; 271s 271s -- ---- 271s -- drop the sequence from sl_sequence, sl_seqlog 271s -- ---- 271s delete from public.sl_seqlog where seql_seqid = p_seq_id; 271s delete from public.sl_sequence where seq_id = p_seq_id; 271s 271s return p_seq_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setDropSequence_int(p_seq_id int4) is 271s 'setDropSequence_int (seq_id) 271s 271s This processes the SET_DROP_SEQUENCE event. On remote nodes that 271s subscribe to the set containing sequence seq_id, drop the sequence 271s from the replication set.'; 271s COMMENT 271s create or replace function public.setMoveTable (p_tab_id int4, p_new_set_id int4) 271s returns bigint 271s as $$ 271s declare 271s v_old_set_id int4; 271s v_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Get the tables current set 271s -- ---- 271s select tab_set into v_old_set_id from public.sl_table 271s where tab_id = p_tab_id; 271s if not found then 271s raise exception 'Slony-I: table %d not found', p_tab_id; 271s end if; 271s 271s -- ---- 271s -- Check that both sets exist and originate here 271s -- ---- 271s if p_new_set_id = v_old_set_id then 271s raise exception 'Slony-I: set ids cannot be identical'; 271s end if; 271s select set_origin into v_origin from public.sl_set 271s where set_id = p_new_set_id; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_new_set_id; 271s end if; 271s if v_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: set % does not originate on local node', 271s p_new_set_id; 271s end if; 271s 271s select set_origin into v_origin from public.sl_set 271s where set_id = v_old_set_id; 271s if not found then 271s raise exception 'Slony-I: set % not found', v_old_set_id; 271s end if; 271s if v_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: set % does not originate on local node', 271s v_old_set_id; 271s end if; 271s 271s -- ---- 271s -- Check that both sets are subscribed by the same set of nodes 271s -- ---- 271s if exists (select true from public.sl_subscribe SUB1 271s where SUB1.sub_set = p_new_set_id 271s and SUB1.sub_receiver not in (select SUB2.sub_receiver 271s from public.sl_subscribe SUB2 271s where SUB2.sub_set = v_old_set_id)) 271s then 271s raise exception 'Slony-I: subscriber lists of set % and % are different', 271s p_new_set_id, v_old_set_id; 271s end if; 271s 271s if exists (select true from public.sl_subscribe SUB1 271s where SUB1.sub_set = v_old_set_id 271s and SUB1.sub_receiver not in (select SUB2.sub_receiver 271s from public.sl_subscribe SUB2 271s where SUB2.sub_set = p_new_set_id)) 271s then 271s raise exception 'Slony-I: subscriber lists of set % and % are different', 271s v_old_set_id, p_new_set_id; 271s end if; 271s 271s -- ---- 271s -- Change the set the table belongs to 271s -- ---- 271s perform 
public.createEvent('_main', 'SYNC', NULL); 271s perform public.setMoveTable_int(p_tab_id, p_new_set_id); 271s return public.createEvent('_main', 'SET_MOVE_TABLE', 271s p_tab_id::text, p_new_set_id::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 271s 'This generates the SET_MOVE_TABLE event. If the set that the table is 271s in is identically subscribed to the set that the table is to be moved 271s into, then the SET_MOVE_TABLE event is raised.'; 271s COMMENT 271s create or replace function public.setMoveTable_int (p_tab_id int4, p_new_set_id int4) 271s returns int4 271s as $$ 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Move the table to the new set 271s -- ---- 271s update public.sl_table 271s set tab_set = p_new_set_id 271s where tab_id = p_tab_id; 271s 271s return p_tab_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 271s 'This processes the SET_MOVE_TABLE event. The table is moved 271s to the destination set.'; 271s COMMENT 271s create or replace function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) 271s returns bigint 271s as $$ 271s declare 271s v_old_set_id int4; 271s v_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Get the sequences current set 271s -- ---- 271s select seq_set into v_old_set_id from public.sl_sequence 271s where seq_id = p_seq_id; 271s if not found then 271s raise exception 'Slony-I: setMoveSequence(): sequence %d not found', p_seq_id; 271s end if; 271s 271s -- ---- 271s -- Check that both sets exist and originate here 271s -- ---- 271s if p_new_set_id = v_old_set_id then 271s raise exception 'Slony-I: setMoveSequence(): set ids cannot be identical'; 271s end if; 271s select set_origin into v_origin from public.sl_set 271s where set_id = p_new_set_id; 271s if not found then 271s raise exception 'Slony-I: setMoveSequence(): set % not found', p_new_set_id; 271s end if; 271s if v_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: setMoveSequence(): set % does not originate on local node', 271s p_new_set_id; 271s end if; 271s 271s select set_origin into v_origin from public.sl_set 271s where set_id = v_old_set_id; 271s if not found then 271s raise exception 'Slony-I: set % not found', v_old_set_id; 271s end if; 271s if v_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: set % does not originate on local node', 271s v_old_set_id; 271s end if; 271s 271s -- ---- 271s -- Check that both sets are subscribed by the same set of nodes 271s -- ---- 271s if exists (select true from public.sl_subscribe SUB1 271s where SUB1.sub_set = p_new_set_id 271s and SUB1.sub_receiver not in (select SUB2.sub_receiver 271s from public.sl_subscribe SUB2 271s where SUB2.sub_set = v_old_set_id)) 271s then 271s raise exception 'Slony-I: subscriber lists of set % and % are different', 271s p_new_set_id, v_old_set_id; 271s end if; 271s 271s if exists (select true from public.sl_subscribe SUB1 271s where SUB1.sub_set = v_old_set_id 271s and SUB1.sub_receiver not in (select SUB2.sub_receiver 271s from public.sl_subscribe SUB2 271s where SUB2.sub_set = p_new_set_id)) 271s then 271s raise exception 'Slony-I: subscriber lists of set % and % 
are different', 271s v_old_set_id, p_new_set_id; 271s end if; 271s 271s -- ---- 271s -- Change the set the sequence belongs to 271s -- ---- 271s perform public.setMoveSequence_int(p_seq_id, p_new_set_id); 271s return public.createEvent('_main', 'SET_MOVE_SEQUENCE', 271s p_seq_id::text, p_new_set_id::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) is 271s 'setMoveSequence(p_seq_id, p_new_set_id) - This generates the 271s SET_MOVE_SEQUENCE event, after validation, notably that both sets 271s exist, are distinct, and have exactly the same subscription lists'; 271s COMMENT 271s create or replace function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) 271s returns int4 271s as $$ 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Move the sequence to the new set 271s -- ---- 271s update public.sl_sequence 271s set seq_set = p_new_set_id 271s where seq_id = p_seq_id; 271s 271s return p_seq_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) is 271s 'setMoveSequence_int(p_seq_id, p_new_set_id) - processes the 271s SET_MOVE_SEQUENCE event, moving a sequence to another replication 271s set.'; 271s COMMENT 271s create or replace function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) returns int4 271s as $$ 271s declare 271s v_fqname text; 271s v_found integer; 271s begin 271s -- ---- 271s -- Get the sequences fully qualified name 271s -- ---- 271s select public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) into v_fqname 271s from public.sl_sequence SQ, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where SQ.seq_id = p_seq_id 271s and SQ.seq_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid; 271s if not found then 271s if p_ignore_missing then 271s return null; 271s end if; 271s raise exception 'Slony-I: sequenceSetValue(): sequence % not found', p_seq_id; 271s end if; 271s 271s -- ---- 271s -- Update it to the new value 271s -- ---- 271s execute 'select setval(''' || v_fqname || 271s ''', ' || p_last_value::text || ')'; 271s 271s if p_ev_seqno is not null then 271s insert into public.sl_seqlog 271s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 271s values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value); 271s end if; 271s return p_seq_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) is 271s 'sequenceSetValue (seq_id, seq_origin, ev_seqno, last_value,ignore_missing) 271s Set sequence seq_id to have new value last_value. 
271s '; 271s COMMENT 271s drop function if exists public.ddlCapture (p_statement text, p_nodes text); 271s DROP FUNCTION 271s create or replace function public.ddlCapture (p_statement text, p_nodes text) 271s returns bigint 271s as $$ 271s declare 271s c_local_node integer; 271s c_found_origin boolean; 271s c_node text; 271s c_cmdargs text[]; 271s c_nodeargs text; 271s c_delim text; 271s begin 271s c_local_node := public.getLocalNodeId('_main'); 271s 271s c_cmdargs = array_append('{}'::text[], p_statement); 271s c_nodeargs = ''; 271s if p_nodes is not null then 271s c_found_origin := 'f'; 271s -- p_nodes list needs to consist of a list of nodes that exist 271s -- and that include the current node ID 271s for c_node in select trim(node) from 271s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 271s if not exists 271s (select 1 from public.sl_node 271s where no_id = (c_node::integer)) then 271s raise exception 'ddlcapture(%,%) - node % does not exist!', 271s p_statement, p_nodes, c_node; 271s end if; 271s 271s if c_local_node = (c_node::integer) then 271s c_found_origin := 't'; 271s end if; 271s if length(c_nodeargs)>0 then 271s c_nodeargs = c_nodeargs ||','|| c_node; 271s else 271s c_nodeargs=c_node; 271s end if; 271s end loop; 271s 271s if not c_found_origin then 271s raise exception 271s 'ddlcapture(%,%) - origin node % not included in ONLY ON list!', 271s p_statement, p_nodes, c_local_node; 271s end if; 271s end if; 271s c_cmdargs = array_append(c_cmdargs,c_nodeargs); 271s c_delim=','; 271s c_cmdargs = array_append(c_cmdargs, 271s 271s (select public.string_agg( seq_id::text || c_delim 271s || c_local_node || 271s c_delim || seq_last_value) 271s FROM ( 271s select seq_id, 271s seq_last_value from public.sl_seqlastvalue 271s where seq_origin = c_local_node) as FOO 271s where NOT public.seqtrack(seq_id,seq_last_value) is NULL)); 271s insert into public.sl_log_script 271s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 271s values 271s (c_local_node, pg_catalog.txid_current(), 271s nextval('public.sl_action_seq'), 'S', c_cmdargs); 271s execute p_statement; 271s return currval('public.sl_action_seq'); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.ddlCapture (p_statement text, p_nodes text) is 271s 'Capture an SQL statement (usually DDL) that is to be literally replayed on subscribers'; 271s COMMENT 271s drop function if exists public.ddlScript_complete (int4, text, int4); 271s DROP FUNCTION 271s create or replace function public.ddlScript_complete (p_nodes text) 271s returns bigint 271s as $$ 271s declare 271s c_local_node integer; 271s c_found_origin boolean; 271s c_node text; 271s c_cmdargs text[]; 271s begin 271s c_local_node := public.getLocalNodeId('_main'); 271s 271s c_cmdargs = '{}'::text[]; 271s if p_nodes is not null then 271s c_found_origin := 'f'; 271s -- p_nodes list needs to consist of a list of nodes that exist 271s -- and that include the current node ID 271s for c_node in select trim(node) from 271s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 271s if not exists 271s (select 1 from public.sl_node 271s where no_id = (c_node::integer)) then 271s raise exception 'ddlScript_complete(%) - node % does not exist!', 271s p_nodes, c_node; 271s end if; 271s 271s if c_local_node = (c_node::integer) then 271s c_found_origin := 't'; 271s end if; 271s 271s c_cmdargs = array_append(c_cmdargs, c_node); 271s end loop; 271s 271s if not c_found_origin then 271s raise exception 271s 'ddlScript_complete(%)
- origin node % not included in ONLY ON list!', 271s p_nodes, c_local_node; 271s end if; 271s end if; 271s 271s perform public.ddlScript_complete_int(); 271s 271s insert into public.sl_log_script 271s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 271s values 271s (c_local_node, pg_catalog.txid_current(), 271s nextval('public.sl_action_seq'), 's', c_cmdargs); 271s 271s return currval('public.sl_action_seq'); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.ddlScript_complete(p_nodes text) is 271s 'ddlScript_complete(set_id, script, only_on_node) 271s 271s After script has run on origin, this fixes up relnames and 271s log trigger arguments and inserts the "fire ddlScript_complete_int() 271s log row into sl_log_script.'; 271s COMMENT 271s drop function if exists public.ddlScript_complete_int(int4, int4); 271s DROP FUNCTION 271s create or replace function public.ddlScript_complete_int () 271s returns int4 271s as $$ 271s begin 271s perform public.updateRelname(); 271s perform public.repair_log_triggers(true); 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.ddlScript_complete_int() is 271s 'ddlScript_complete_int() 271s 271s Complete processing the DDL_SCRIPT event.'; 271s COMMENT 271s create or replace function public.alterTableAddTriggers (p_tab_id int4) 271s returns int4 271s as $$ 271s declare 271s v_no_id int4; 271s v_tab_row record; 271s v_tab_fqname text; 271s v_tab_attkind text; 271s v_n int4; 271s v_trec record; 271s v_tgbad boolean; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Get our local node ID 271s -- ---- 271s v_no_id := public.getLocalNodeId('_main'); 271s 271s -- ---- 271s -- Get the sl_table row and the current origin of the table. 271s -- ---- 271s select T.tab_reloid, T.tab_set, T.tab_idxname, 271s S.set_origin, PGX.indexrelid, 271s public.slon_quote_brute(PGN.nspname) || '.' 
|| 271s public.slon_quote_brute(PGC.relname) as tab_fqname 271s into v_tab_row 271s from public.sl_table T, public.sl_set S, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 271s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 271s where T.tab_id = p_tab_id 271s and T.tab_set = S.set_id 271s and T.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid 271s and PGX.indrelid = T.tab_reloid 271s and PGX.indexrelid = PGXC.oid 271s and PGXC.relname = T.tab_idxname 271s for update; 271s if not found then 271s raise exception 'Slony-I: alterTableAddTriggers(): Table with id % not found', p_tab_id; 271s end if; 271s v_tab_fqname = v_tab_row.tab_fqname; 271s 271s v_tab_attkind := public.determineAttKindUnique(v_tab_row.tab_fqname, 271s v_tab_row.tab_idxname); 271s 271s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 271s 271s -- ---- 271s -- Create the log and the deny access triggers 271s -- ---- 271s execute 'create trigger "_main_logtrigger"' || 271s ' after insert or update or delete on ' || 271s v_tab_fqname || ' for each row execute procedure public.logTrigger (' || 271s pg_catalog.quote_literal('_main') || ',' || 271s pg_catalog.quote_literal(p_tab_id::text) || ',' || 271s pg_catalog.quote_literal(v_tab_attkind) || ');'; 271s 271s execute 'create trigger "_main_denyaccess" ' || 271s 'before insert or update or delete on ' || 271s v_tab_fqname || ' for each row execute procedure ' || 271s 'public.denyAccess (' || pg_catalog.quote_literal('_main') || ');'; 271s 271s perform public.alterTableAddTruncateTrigger(v_tab_fqname, p_tab_id); 271s 271s perform public.alterTableConfigureTriggers (p_tab_id); 271s return p_tab_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.alterTableAddTriggers(p_tab_id int4) is 271s 'alterTableAddTriggers(tab_id) 271s 271s Adds the log and deny access triggers to a replicated table.'; 271s COMMENT 271s create or replace function public.alterTableDropTriggers (p_tab_id int4) 271s returns int4 271s as $$ 271s declare 271s v_no_id int4; 271s v_tab_row record; 271s v_tab_fqname text; 271s v_n int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Get our local node ID 271s -- ---- 271s v_no_id := public.getLocalNodeId('_main'); 271s 271s -- ---- 271s -- Get the sl_table row and the current tables origin. 271s -- ---- 271s select T.tab_reloid, T.tab_set, 271s S.set_origin, PGX.indexrelid, 271s public.slon_quote_brute(PGN.nspname) || '.' 
|| 271s public.slon_quote_brute(PGC.relname) as tab_fqname 271s into v_tab_row 271s from public.sl_table T, public.sl_set S, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 271s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 271s where T.tab_id = p_tab_id 271s and T.tab_set = S.set_id 271s and T.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid 271s and PGX.indrelid = T.tab_reloid 271s and PGX.indexrelid = PGXC.oid 271s and PGXC.relname = T.tab_idxname 271s for update; 271s if not found then 271s raise exception 'Slony-I: alterTableDropTriggers(): Table with id % not found', p_tab_id; 271s end if; 271s v_tab_fqname = v_tab_row.tab_fqname; 271s 271s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 271s 271s -- ---- 271s -- Drop both triggers 271s -- ---- 271s execute 'drop trigger "_main_logtrigger" on ' || 271s v_tab_fqname; 271s 271s execute 'drop trigger "_main_denyaccess" on ' || 271s v_tab_fqname; 271s 271s perform public.alterTableDropTruncateTrigger(v_tab_fqname, p_tab_id); 271s 271s return p_tab_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.alterTableDropTriggers (p_tab_id int4) is 271s 'alterTableDropTriggers (tab_id) 271s 271s Remove the log and deny access triggers from a table.'; 271s COMMENT 271s create or replace function public.alterTableConfigureTriggers (p_tab_id int4) 271s returns int4 271s as $$ 271s declare 271s v_no_id int4; 271s v_tab_row record; 271s v_tab_fqname text; 271s v_n int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Get our local node ID 271s -- ---- 271s v_no_id := public.getLocalNodeId('_main'); 271s 271s -- ---- 271s -- Get the sl_table row and the current tables origin. 271s -- ---- 271s select T.tab_reloid, T.tab_set, 271s S.set_origin, PGX.indexrelid, 271s public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) as tab_fqname 271s into v_tab_row 271s from public.sl_table T, public.sl_set S, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 271s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 271s where T.tab_id = p_tab_id 271s and T.tab_set = S.set_id 271s and T.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid 271s and PGX.indrelid = T.tab_reloid 271s and PGX.indexrelid = PGXC.oid 271s and PGXC.relname = T.tab_idxname 271s for update; 271s if not found then 271s raise exception 'Slony-I: alterTableConfigureTriggers(): Table with id % not found', p_tab_id; 271s end if; 271s v_tab_fqname = v_tab_row.tab_fqname; 271s 271s -- ---- 271s -- Configuration depends on the origin of the table 271s -- ---- 271s if v_tab_row.set_origin = v_no_id then 271s -- ---- 271s -- On the origin the log trigger is configured like a default 271s -- user trigger and the deny access trigger is disabled. 271s -- ---- 271s execute 'alter table ' || v_tab_fqname || 271s ' enable trigger "_main_logtrigger"'; 271s execute 'alter table ' || v_tab_fqname || 271s ' disable trigger "_main_denyaccess"'; 271s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 271s 'enable', 'disable'); 271s else 271s -- ---- 271s -- On a replica the log trigger is disabled and the 271s -- deny access trigger fires in origin session role. 
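-- Summary (derived from the two branches of this function):
--   origin node:  "_main_logtrigger" enabled,  "_main_denyaccess" disabled
--   replica node: "_main_logtrigger" disabled, "_main_denyaccess" enabled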
271s -- ---- 271s execute 'alter table ' || v_tab_fqname || 271s ' disable trigger "_main_logtrigger"'; 271s execute 'alter table ' || v_tab_fqname || 271s ' enable trigger "_main_denyaccess"'; 271s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 271s 'disable', 'enable'); 271s 271s end if; 271s 271s return p_tab_id; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.alterTableConfigureTriggers (p_tab_id int4) is 271s 'alterTableConfigureTriggers (tab_id) 271s 271s Set the enable/disable configuration for the replication triggers 271s according to the origin of the set.'; 271s COMMENT 271s create or replace function public.resubscribeNode (p_origin int4, 271s p_provider int4, p_receiver int4) 271s returns bigint 271s as $$ 271s declare 271s v_record record; 271s v_missing_sets text; 271s v_ev_seqno bigint; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- 271s -- Check that the receiver exists 271s -- 271s if not exists (select no_id from public.sl_node where no_id= 271s p_receiver) then 271s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_receiver; 271s end if; 271s 271s -- 271s -- Check that the provider exists 271s -- 271s if not exists (select no_id from public.sl_node where no_id= 271s p_provider) then 271s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_provider; 271s end if; 271s 271s 271s -- ---- 271s -- Check that this is called on the origin node 271s -- ---- 271s if p_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: subscribeSet() must be called on origin'; 271s end if; 271s 271s -- --- 271s -- Verify that the provider is either the origin or an active subscriber 271s -- Bug report #1362 271s -- --- 271s if p_origin <> p_provider then 271s for v_record in select sub1.sub_set from 271s public.sl_subscribe sub1 271s left outer join (public.sl_subscribe sub2 271s inner join 271s public.sl_set on ( 271s sl_set.set_id=sub2.sub_set 271s and sub2.sub_set=p_origin) 271s ) 271s ON ( sub1.sub_set = sub2.sub_set and 271s sub1.sub_receiver = p_provider and 271s sub1.sub_forward and sub1.sub_active 271s and sub2.sub_receiver=p_receiver) 271s 271s where sub2.sub_set is null 271s loop 271s v_missing_sets=v_missing_sets || ' ' || v_record.sub_set; 271s end loop; 271s if v_missing_sets is not null then 271s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, v_missing_sets; 271s end if; 271s end if; 271s 271s for v_record in select * from 271s public.sl_subscribe, public.sl_set where 271s sub_set=set_id and 271s sub_receiver=p_receiver 271s and set_origin=p_origin 271s loop 271s -- ---- 271s -- Create the SUBSCRIBE_SET event 271s -- ---- 271s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 271s v_record.sub_set::text, p_provider::text, p_receiver::text, 271s case v_record.sub_forward when true then 't' else 'f' end, 271s 'f' ); 271s 271s -- ---- 271s -- Call the internal procedure to store the subscription 271s -- ---- 271s perform public.subscribeSet_int(v_record.sub_set, 271s p_provider, 271s p_receiver, v_record.sub_forward, false); 271s end loop; 271s 271s return v_ev_seqno; 271s end; 271s $$ 271s language plpgsql; 271s CREATE FUNCTION 271s create or replace function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 271s returns 
bigint 271s as $$ 271s declare 271s v_set_origin int4; 271s v_ev_seqno int8; 271s v_ev_seqno2 int8; 271s v_rec record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- 271s -- Check that the receiver exists 271s -- 271s if not exists (select no_id from public.sl_node where no_id= 271s p_sub_receiver) then 271s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_sub_receiver; 271s end if; 271s 271s -- 271s -- Check that the provider exists 271s -- 271s if not exists (select no_id from public.sl_node where no_id= 271s p_sub_provider) then 271s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_sub_provider; 271s end if; 271s 271s -- ---- 271s -- Check that the origin and provider of the set are remote 271s -- ---- 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = p_sub_set; 271s if not found then 271s raise exception 'Slony-I: subscribeSet(): set % not found', p_sub_set; 271s end if; 271s if v_set_origin = p_sub_receiver then 271s raise exception 271s 'Slony-I: subscribeSet(): set origin and receiver cannot be identical'; 271s end if; 271s if p_sub_receiver = p_sub_provider then 271s raise exception 271s 'Slony-I: subscribeSet(): set provider and receiver cannot be identical'; 271s end if; 271s -- ---- 271s -- Check that this is called on the origin node 271s -- ---- 271s if v_set_origin != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: subscribeSet() must be called on origin'; 271s end if; 271s 271s -- --- 271s -- Verify that the provider is either the origin or an active subscriber 271s -- Bug report #1362 271s -- --- 271s if v_set_origin <> p_sub_provider then 271s if not exists (select 1 from public.sl_subscribe 271s where sub_set = p_sub_set and 271s sub_receiver = p_sub_provider and 271s sub_forward and sub_active) then 271s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, p_sub_set; 271s end if; 271s end if; 271s 271s -- --- 271s -- Enforce that all sets from one origin are subscribed 271s -- using the same data provider per receiver. 271s -- ---- 271s if not exists (select 1 from public.sl_subscribe 271s where sub_set = p_sub_set and sub_receiver = p_sub_receiver) then 271s -- 271s -- New subscription - error out if we have any other subscription 271s -- from that origin with a different data provider. 271s -- 271s for v_rec in select sub_provider from public.sl_subscribe 271s join public.sl_set on set_id = sub_set 271s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 271s loop 271s if v_rec.sub_provider <> p_sub_provider then 271s raise exception 'Slony-I: subscribeSet(): wrong provider % - existing subscription from origin % users provider %', 271s p_sub_provider, v_set_origin, v_rec.sub_provider; 271s end if; 271s end loop; 271s else 271s -- 271s -- Existing subscription - in case the data provider changes and 271s -- there are other subscriptions, warn here. subscribeSet_int() 271s -- will currently change the data provider for those sets as well. 
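For orientation while reading this body: subscribeSet() is the outer entry point and, per the checks above, must be called on the set's origin node. A hedged usage sketch with hypothetical IDs (set 1 originating on node 1, node 2 subscribing directly, no forwarding, full initial copy):
    select public.subscribeSet(1, 1, 2, false, false);
The arguments map to (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy), as described in the function comment further below.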
271s -- 271s for v_rec in select set_id, sub_provider from public.sl_subscribe 271s join public.sl_set on set_id = sub_set 271s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 271s and set_id <> p_sub_set 271s loop 271s if v_rec.sub_provider <> p_sub_provider then 271s raise exception 'Slony-I: subscribeSet(): also data provider for set % use resubscribe instead', 271s v_rec.set_id; 271s end if; 271s end loop; 271s end if; 271s 271s -- ---- 271s -- Create the SUBSCRIBE_SET event 271s -- ---- 271s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 271s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 271s case p_sub_forward when true then 't' else 'f' end, 271s case p_omit_copy when true then 't' else 'f' end 271s ); 271s 271s -- ---- 271s -- Call the internal procedure to store the subscription 271s -- ---- 271s v_ev_seqno2:=public.subscribeSet_int(p_sub_set, p_sub_provider, 271s p_sub_receiver, p_sub_forward, p_omit_copy); 271s 271s if v_ev_seqno2 is not null then 271s v_ev_seqno:=v_ev_seqno2; 271s end if; 271s 271s return v_ev_seqno; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 271s 'subscribeSet (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 271s 271s Makes sure that the receiver is not the provider, then stores the 271s subscription, and publishes the SUBSCRIBE_SET event to other nodes. 271s 271s If omit_copy is true, then no data copy will be done. 271s '; 271s COMMENT 271s DROP FUNCTION IF EXISTS public.subscribeSet_int(int4,int4,int4,bool,bool); 271s DROP FUNCTION 271s create or replace function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 271s returns int4 271s as $$ 271s declare 271s v_set_origin int4; 271s v_sub_row record; 271s v_seq_id bigint; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Lookup the set origin 271s -- ---- 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = p_sub_set; 271s if not found then 271s raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set; 271s end if; 271s 271s -- ---- 271s -- Provider change is only allowed for active sets 271s -- ---- 271s if p_sub_receiver = public.getLocalNodeId('_main') then 271s select sub_active into v_sub_row from public.sl_subscribe 271s where sub_set = p_sub_set 271s and sub_receiver = p_sub_receiver; 271s if found then 271s if not v_sub_row.sub_active then 271s raise exception 'Slony-I: subscribeSet_int(): set % is not active, cannot change provider', 271s p_sub_set; 271s end if; 271s end if; 271s end if; 271s 271s -- ---- 271s -- Try to change provider and/or forward for an existing subscription 271s -- ---- 271s update public.sl_subscribe 271s set sub_provider = p_sub_provider, 271s sub_forward = p_sub_forward 271s where sub_set = p_sub_set 271s and sub_receiver = p_sub_receiver; 271s if found then 271s 271s -- ---- 271s -- This is changing a subscriptoin. Make sure all sets from 271s -- this origin are subscribed using the same data provider. 271s -- For this we first check that the requested data provider 271s -- is subscribed to all the sets, the receiver is subscribed to. 
271s -- ---- 271s for v_sub_row in select set_id from public.sl_set 271s join public.sl_subscribe on set_id = sub_set 271s where set_origin = v_set_origin 271s and sub_receiver = p_sub_receiver 271s and sub_set <> p_sub_set 271s loop 271s if not exists (select 1 from public.sl_subscribe 271s where sub_set = v_sub_row.set_id 271s and sub_receiver = p_sub_provider 271s and sub_active and sub_forward) 271s and not exists (select 1 from public.sl_set 271s where set_id = v_sub_row.set_id 271s and set_origin = p_sub_provider) 271s then 271s raise exception 'Slony-I: subscribeSet_int(): node % is not a forwarding subscriber for set %', 271s p_sub_provider, v_sub_row.set_id; 271s end if; 271s 271s -- ---- 271s -- New data provider offers this set as well, change that 271s -- subscription too. 271s -- ---- 271s update public.sl_subscribe 271s set sub_provider = p_sub_provider 271s where sub_set = v_sub_row.set_id 271s and sub_receiver = p_sub_receiver; 271s end loop; 271s 271s -- ---- 271s -- Rewrite sl_listen table 271s -- ---- 271s perform public.RebuildListenEntries(); 271s 271s return p_sub_set; 271s end if; 271s 271s -- ---- 271s -- Not found, insert a new one 271s -- ---- 271s if not exists (select true from public.sl_path 271s where pa_server = p_sub_provider 271s and pa_client = p_sub_receiver) 271s then 271s insert into public.sl_path 271s (pa_server, pa_client, pa_conninfo, pa_connretry) 271s values 271s (p_sub_provider, p_sub_receiver, 271s '', 10); 271s end if; 271s insert into public.sl_subscribe 271s (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) 271s values (p_sub_set, p_sub_provider, p_sub_receiver, 271s p_sub_forward, false); 271s 271s -- ---- 271s -- If the set origin is here, then enable the subscription 271s -- ---- 271s if v_set_origin = public.getLocalNodeId('_main') then 271s select public.createEvent('_main', 'ENABLE_SUBSCRIPTION', 271s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 271s case p_sub_forward when true then 't' else 'f' end, 271s case p_omit_copy when true then 't' else 'f' end 271s ) into v_seq_id; 271s perform public.enableSubscription(p_sub_set, 271s p_sub_provider, p_sub_receiver); 271s end if; 271s 271s -- ---- 271s -- Rewrite sl_listen table 271s -- ---- 271s perform public.RebuildListenEntries(); 271s 271s return p_sub_set; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 271s 'subscribeSet_int (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 271s 271s Internal actions for subscribing receiver sub_receiver to subscription 271s set sub_set.'; 271s COMMENT 271s drop function IF EXISTS public.unsubscribeSet(int4,int4,boolean); 271s DROP FUNCTION 271s create or replace function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,p_force boolean) 271s returns bigint 271s as $$ 271s declare 271s v_tab_row record; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- Check that this is called on the receiver node 271s -- ---- 271s if p_sub_receiver != public.getLocalNodeId('_main') then 271s raise exception 'Slony-I: unsubscribeSet() must be called on receiver'; 271s end if; 271s 271s 271s 271s -- ---- 271s -- Check that this does not break any chains 271s -- ---- 271s if p_force=false and exists (select true from public.sl_subscribe 271s where sub_set = p_sub_set 
271s and sub_provider = p_sub_receiver) 271s then 271s raise exception 'Slony-I: Cannot unsubscribe set % while being provider', 271s p_sub_set; 271s end if; 271s 271s if exists (select true from public.sl_subscribe 271s where sub_set = p_sub_set 271s and sub_provider = p_sub_receiver) 271s then 271s --delete the receivers of this provider. 271s --unsubscribeSet_int() will generate the event 271s --when it runs on the receiver. 271s delete from public.sl_subscribe 271s where sub_set=p_sub_set 271s and sub_provider=p_sub_receiver; 271s end if; 271s 271s -- ---- 271s -- Remove the replication triggers. 271s -- ---- 271s for v_tab_row in select tab_id from public.sl_table 271s where tab_set = p_sub_set 271s order by tab_id 271s loop 271s perform public.alterTableDropTriggers(v_tab_row.tab_id); 271s end loop; 271s 271s -- ---- 271s -- Remove the setsync status. This will also cause the 271s -- worker thread to ignore the set and stop replicating 271s -- right now. 271s -- ---- 271s delete from public.sl_setsync 271s where ssy_setid = p_sub_set; 271s 271s -- ---- 271s -- Remove all sl_table and sl_sequence entries for this set. 271s -- Should we ever subscribe again, the initial data 271s -- copy process will create new ones. 271s -- ---- 271s delete from public.sl_table 271s where tab_set = p_sub_set; 271s delete from public.sl_sequence 271s where seq_set = p_sub_set; 271s 271s -- ---- 271s -- Call the internal procedure to drop the subscription 271s -- ---- 271s perform public.unsubscribeSet_int(p_sub_set, p_sub_receiver); 271s 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s -- ---- 271s -- Create the UNSUBSCRIBE_SET event 271s -- ---- 271s return public.createEvent('_main', 'UNSUBSCRIBE_SET', 271s p_sub_set::text, p_sub_receiver::text); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,force boolean) is 271s 'unsubscribeSet (sub_set, sub_receiver,force) 271s 271s Unsubscribe node sub_receiver from subscription set sub_set. This is 271s invoked on the receiver node. It verifies that this does not break 271s any chains (e.g. - where sub_receiver is a provider for another node), 271s then restores tables, drops Slony-specific keys, drops table entries 271s for the set, drops the subscription, and generates an UNSUBSCRIBE_SET 271s node to publish that the node is being dropped.'; 271s COMMENT 271s create or replace function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) 271s returns int4 271s as $$ 271s declare 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- All the real work is done before event generation on the 271s -- subscriber. 271s -- ---- 271s 271s --if this event unsubscribes the provider of this node 271s --then this node should unsubscribe itself from the set as well. 
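The outer unsubscribeSet() defined above is run on the receiver; a hedged sketch with hypothetical IDs (node 3 dropping its subscription to set 1 without forcing):
    select public.unsubscribeSet(1, 3, false);
With p_force = false the call raises an exception if node 3 still acts as a provider of set 1 for another node, per the chain check above.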
271s 271s if exists (select true from 271s public.sl_subscribe where 271s sub_set=p_sub_set and sub_provider=p_sub_receiver 271s and sub_receiver=public.getLocalNodeId('_main')) 271s then 271s perform public.unsubscribeSet(p_sub_set,public.getLocalNodeId('_main'),true); 271s end if; 271s 271s 271s delete from public.sl_subscribe 271s where sub_set = p_sub_set 271s and sub_receiver = p_sub_receiver; 271s 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s return p_sub_set; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) is 271s 'unsubscribeSet_int (sub_set, sub_receiver) 271s 271s All the REAL work of removing the subscriber is done before the event 271s is generated, so this function just has to drop the references to the 271s subscription in sl_subscribe.'; 271s COMMENT 271s create or replace function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 271s returns int4 271s as $$ 271s begin 271s return public.enableSubscription_int (p_sub_set, 271s p_sub_provider, p_sub_receiver); 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 271s 'enableSubscription (sub_set, sub_provider, sub_receiver) 271s 271s Indicates that sub_receiver intends subscribing to set sub_set from 271s sub_provider. Work is all done by the internal function 271s enableSubscription_int (sub_set, sub_provider, sub_receiver).'; 271s COMMENT 271s create or replace function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 271s returns int4 271s as $$ 271s declare 271s v_n int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- ---- 271s -- The real work is done in the replication engine. All 271s -- we have to do here is remembering that it happened. 271s -- ---- 271s 271s -- ---- 271s -- Well, not only ... we might be missing an important event here 271s -- ---- 271s if not exists (select true from public.sl_path 271s where pa_server = p_sub_provider 271s and pa_client = p_sub_receiver) 271s then 271s insert into public.sl_path 271s (pa_server, pa_client, pa_conninfo, pa_connretry) 271s values 271s (p_sub_provider, p_sub_receiver, 271s '', 10); 271s end if; 271s 271s update public.sl_subscribe 271s set sub_active = 't' 271s where sub_set = p_sub_set 271s and sub_receiver = p_sub_receiver; 271s get diagnostics v_n = row_count; 271s if v_n = 0 then 271s insert into public.sl_subscribe 271s (sub_set, sub_provider, sub_receiver, 271s sub_forward, sub_active) 271s values 271s (p_sub_set, p_sub_provider, p_sub_receiver, 271s false, true); 271s end if; 271s 271s -- Rewrite sl_listen table 271s perform public.RebuildListenEntries(); 271s 271s return p_sub_set; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 271s 'enableSubscription_int (sub_set, sub_provider, sub_receiver) 271s 271s Internal function to enable subscription of node sub_receiver to set 271s sub_set via node sub_provider. 271s 271s slon does most of the work; all we need do here is to remember that it 271s happened. 
The function updates sl_subscribe, indicating that the 271s subscription has become active.'; 271s COMMENT 271s create or replace function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) 271s returns bigint 271s as $$ 271s declare 271s v_max_seqno bigint; 271s begin 271s select into v_max_seqno coalesce(max(con_seqno), 0) 271s from public.sl_confirm 271s where con_origin = p_con_origin 271s and con_received = p_con_received; 271s if v_max_seqno < p_con_seqno then 271s insert into public.sl_confirm 271s (con_origin, con_received, con_seqno, con_timestamp) 271s values (p_con_origin, p_con_received, p_con_seqno, 271s p_con_timestamp); 271s v_max_seqno = p_con_seqno; 271s end if; 271s 271s return v_max_seqno; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) is 271s 'forwardConfirm (p_con_origin, p_con_received, p_con_seqno, p_con_timestamp) 271s 271s Confirms (recorded in sl_confirm) that items from p_con_origin up to 271s p_con_seqno have been received by node p_con_received as of 271s p_con_timestamp, and raises an event to forward this confirmation.'; 271s COMMENT 271s create or replace function public.cleanupEvent (p_interval interval) 271s returns int4 271s as $$ 271s declare 271s v_max_row record; 271s v_min_row record; 271s v_max_sync int8; 271s v_origin int8; 271s v_seqno int8; 271s v_xmin bigint; 271s v_rc int8; 271s begin 271s -- ---- 271s -- First remove all confirmations where origin/receiver no longer exist 271s -- ---- 271s delete from public.sl_confirm 271s where con_origin not in (select no_id from public.sl_node); 271s delete from public.sl_confirm 271s where con_received not in (select no_id from public.sl_node); 271s -- ---- 271s -- Next remove all but the oldest confirm row per origin,receiver pair. 271s -- Ignore confirmations that are younger than 10 minutes. We currently 271s -- have an not confirmed suspicion that a possibly lost transaction due 271s -- to a server crash might have been visible to another session, and 271s -- that this led to log data that is needed again got removed. 
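cleanupEvent() takes the confirmation retention window as its only argument; a hedged manual invocation using the ten-minute window mentioned in the comment above:
    select public.cleanupEvent('10 minutes'::interval);
Only confirmations older than the interval are used to pick the per-(origin, receiver) high-water mark in the loop that follows.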
271s -- ---- 271s for v_max_row in select con_origin, con_received, max(con_seqno) as con_seqno 271s from public.sl_confirm 271s where con_timestamp < (CURRENT_TIMESTAMP - p_interval) 271s group by con_origin, con_received 271s loop 271s delete from public.sl_confirm 271s where con_origin = v_max_row.con_origin 271s and con_received = v_max_row.con_received 271s and con_seqno < v_max_row.con_seqno; 271s end loop; 271s 271s -- ---- 271s -- Then remove all events that are confirmed by all nodes in the 271s -- whole cluster up to the last SYNC 271s -- ---- 271s for v_min_row in select con_origin, min(con_seqno) as con_seqno 271s from public.sl_confirm 271s group by con_origin 271s loop 271s select coalesce(max(ev_seqno), 0) into v_max_sync 271s from public.sl_event 271s where ev_origin = v_min_row.con_origin 271s and ev_seqno <= v_min_row.con_seqno 271s and ev_type = 'SYNC'; 271s if v_max_sync > 0 then 271s delete from public.sl_event 271s where ev_origin = v_min_row.con_origin 271s and ev_seqno < v_max_sync; 271s end if; 271s end loop; 271s 271s -- ---- 271s -- If cluster has only one node, then remove all events up to 271s -- the last SYNC - Bug #1538 271s -- http://gborg.postgresql.org/project/slony1/bugs/bugupdate.php?1538 271s -- ---- 271s 271s select * into v_min_row from public.sl_node where 271s no_id <> public.getLocalNodeId('_main') limit 1; 271s if not found then 271s select ev_origin, ev_seqno into v_min_row from public.sl_event 271s where ev_origin = public.getLocalNodeId('_main') 271s order by ev_origin desc, ev_seqno desc limit 1; 271s raise notice 'Slony-I: cleanupEvent(): Single node - deleting events < %', v_min_row.ev_seqno; 271s delete from public.sl_event 271s where 271s ev_origin = v_min_row.ev_origin and 271s ev_seqno < v_min_row.ev_seqno; 271s 271s end if; 271s 271s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_seqlog' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 271s execute 'alter table public.sl_seqlog set without oids;'; 271s end if; 271s -- ---- 271s -- Also remove stale entries from the nodelock table. 271s -- ---- 271s perform public.cleanupNodelock(); 271s 271s -- ---- 271s -- Find the eldest event left, for each origin 271s -- ---- 271s for v_origin, v_seqno, v_xmin in 271s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 271s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 271s loop 271s delete from public.sl_seqlog where seql_origin = v_origin and seql_ev_seqno < v_seqno; 271s delete from public.sl_log_script where log_origin = v_origin and log_txid < v_xmin; 271s end loop; 271s 271s v_rc := public.logswitch_finish(); 271s if v_rc = 0 then -- no switch in progress 271s perform public.logswitch_start(); 271s end if; 271s 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.cleanupEvent (p_interval interval) is 271s 'cleaning old data out of sl_confirm, sl_event. 
Removes all but the 271s last sl_confirm row per (origin,receiver), and then removes all events 271s that are confirmed by all nodes in the whole cluster up to the last 271s SYNC.'; 271s COMMENT 271s create or replace function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) returns name 271s as $$ 271s declare 271s v_tab_fqname_quoted text default ''; 271s v_idxrow record; 271s begin 271s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 271s -- 271s -- Ensure that the table exists 271s -- 271s if (select PGC.relname 271s from "pg_catalog".pg_class PGC, 271s "pg_catalog".pg_namespace PGN 271s where public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 271s and PGN.oid = PGC.relnamespace) is null then 271s raise exception 'Slony-I: determineIdxnameUnique(): table % not found', v_tab_fqname_quoted; 271s end if; 271s 271s -- 271s -- Lookup the tables primary key or the specified unique index 271s -- 271s if p_idx_name isnull then 271s select PGXC.relname 271s into v_idxrow 271s from "pg_catalog".pg_class PGC, 271s "pg_catalog".pg_namespace PGN, 271s "pg_catalog".pg_index PGX, 271s "pg_catalog".pg_class PGXC 271s where public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 271s and PGN.oid = PGC.relnamespace 271s and PGX.indrelid = PGC.oid 271s and PGX.indexrelid = PGXC.oid 271s and PGX.indisprimary; 271s if not found then 271s raise exception 'Slony-I: table % has no primary key', 271s v_tab_fqname_quoted; 271s end if; 271s else 271s select PGXC.relname 271s into v_idxrow 271s from "pg_catalog".pg_class PGC, 271s "pg_catalog".pg_namespace PGN, 271s "pg_catalog".pg_index PGX, 271s "pg_catalog".pg_class PGXC 271s where public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 271s and PGN.oid = PGC.relnamespace 271s and PGX.indrelid = PGC.oid 271s and PGX.indexrelid = PGXC.oid 271s and PGX.indisunique 271s and public.slon_quote_brute(PGXC.relname) = public.slon_quote_input(p_idx_name); 271s if not found then 271s raise exception 'Slony-I: table % has no unique index %', 271s v_tab_fqname_quoted, p_idx_name; 271s end if; 271s end if; 271s 271s -- 271s -- Return the found index name 271s -- 271s return v_idxrow.relname; 271s end; 271s $$ language plpgsql called on null input; 271s CREATE FUNCTION 271s comment on function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) is 271s 'FUNCTION determineIdxnameUnique (tab_fqname, indexname) 271s 271s Given a tablename, tab_fqname, check that the unique index, indexname, 271s exists or return the primary key index name for the table. If there 271s is no unique index, it raises an exception.'; 271s COMMENT 271s create or replace function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) returns text 271s as $$ 271s declare 271s v_tab_fqname_quoted text default ''; 271s v_idx_name_quoted text; 271s v_idxrow record; 271s v_attrow record; 271s v_i integer; 271s v_attno int2; 271s v_attkind text default ''; 271s v_attfound bool; 271s begin 271s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 271s v_idx_name_quoted := public.slon_quote_brute(p_idx_name); 271s -- 271s -- Ensure that the table exists 271s -- 271s if (select PGC.relname 271s from "pg_catalog".pg_class PGC, 271s "pg_catalog".pg_namespace PGN 271s where public.slon_quote_brute(PGN.nspname) || '.' 
|| 271s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 271s and PGN.oid = PGC.relnamespace) is null then 271s raise exception 'Slony-I: table % not found', v_tab_fqname_quoted; 271s end if; 271s 271s -- 271s -- Lookup the tables primary key or the specified unique index 271s -- 271s if p_idx_name isnull then 271s raise exception 'Slony-I: index name must be specified'; 271s else 271s select PGXC.relname, PGX.indexrelid, PGX.indkey 271s into v_idxrow 271s from "pg_catalog".pg_class PGC, 271s "pg_catalog".pg_namespace PGN, 271s "pg_catalog".pg_index PGX, 271s "pg_catalog".pg_class PGXC 271s where public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 271s and PGN.oid = PGC.relnamespace 271s and PGX.indrelid = PGC.oid 271s and PGX.indexrelid = PGXC.oid 271s and PGX.indisunique 271s and public.slon_quote_brute(PGXC.relname) = v_idx_name_quoted; 271s if not found then 271s raise exception 'Slony-I: table % has no unique index %', 271s v_tab_fqname_quoted, v_idx_name_quoted; 271s end if; 271s end if; 271s 271s -- 271s -- Loop over the tables attributes and check if they are 271s -- index attributes. If so, add a "k" to the return value, 271s -- otherwise add a "v". 271s -- 271s for v_attrow in select PGA.attnum, PGA.attname 271s from "pg_catalog".pg_class PGC, 271s "pg_catalog".pg_namespace PGN, 271s "pg_catalog".pg_attribute PGA 271s where public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 271s and PGN.oid = PGC.relnamespace 271s and PGA.attrelid = PGC.oid 271s and not PGA.attisdropped 271s and PGA.attnum > 0 271s order by attnum 271s loop 271s v_attfound = 'f'; 271s 271s v_i := 0; 271s loop 271s select indkey[v_i] into v_attno from "pg_catalog".pg_index 271s where indexrelid = v_idxrow.indexrelid; 271s if v_attno isnull or v_attno = 0 then 271s exit; 271s end if; 271s if v_attrow.attnum = v_attno then 271s v_attfound = 't'; 271s exit; 271s end if; 271s v_i := v_i + 1; 271s end loop; 271s 271s if v_attfound then 271s v_attkind := v_attkind || 'k'; 271s else 271s v_attkind := v_attkind || 'v'; 271s end if; 271s end loop; 271s 271s -- Strip off trailing v characters as they are not needed by the logtrigger 271s v_attkind := pg_catalog.rtrim(v_attkind, 'v'); 271s 271s -- 271s -- Return the resulting attkind 271s -- 271s return v_attkind; 271s end; 271s $$ language plpgsql called on null input; 271s CREATE FUNCTION 271s comment on function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) is 271s 'determineAttKindUnique (tab_fqname, indexname) 271s 271s Given a tablename, return the Slony-I specific attkind (used for the 271s log trigger) of the table. Use the specified unique index or the 271s primary key (if indexname is NULL).'; 271s COMMENT 271s create or replace function public.RebuildListenEntries() 271s returns int 271s as $$ 271s declare 271s v_row record; 271s v_cnt integer; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s -- First remove the entire configuration 271s delete from public.sl_listen; 271s 271s -- Second populate the sl_listen configuration with a full 271s -- network of all possible paths. 
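A hand-worked illustration of the seeding and closure loop that follows, assuming three hypothetical nodes where node 2 has a path to node 1, node 3 has a path to node 2, and all paths carry a non-empty pa_conninfo:
    sl_path:   (pa_server=1, pa_client=2), (pa_server=2, pa_client=3)
    seed:      sl_listen (li_origin, li_provider, li_receiver) = (1,1,2), (2,2,3)
    1st pass:  adds (1,2,3), i.e. node 3 hears events from origin 1 via provider 2
The second pass finds nothing new, so the loop exits; the per-pair 1st/2nd-choice rules below then refine these entries.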
271s insert into public.sl_listen 271s (li_origin, li_provider, li_receiver) 271s select pa_server, pa_server, pa_client from public.sl_path; 271s while true loop 271s insert into public.sl_listen 271s (li_origin, li_provider, li_receiver) 271s select distinct li_origin, pa_server, pa_client 271s from public.sl_listen, public.sl_path 271s where li_receiver = pa_server 271s and li_origin <> pa_client 271s and pa_conninfo<>'' 271s except 271s select li_origin, li_provider, li_receiver 271s from public.sl_listen; 271s 271s if not found then 271s exit; 271s end if; 271s end loop; 271s 271s -- We now replace specific event-origin,receiver combinations 271s -- with a configuration that tries to avoid events arriving at 271s -- a node before the data provider actually has the data ready. 271s 271s -- Loop over every possible pair of receiver and event origin 271s for v_row in select N1.no_id as receiver, N2.no_id as origin, 271s N2.no_failed as failed 271s from public.sl_node as N1, public.sl_node as N2 271s where N1.no_id <> N2.no_id 271s loop 271s -- 1st choice: 271s -- If we use the event origin as a data provider for any 271s -- set that originates on that very node, we are a direct 271s -- subscriber to that origin and listen there only. 271s if exists (select true from public.sl_set, public.sl_subscribe , public.sl_node p 271s where set_origin = v_row.origin 271s and sub_set = set_id 271s and sub_provider = v_row.origin 271s and sub_receiver = v_row.receiver 271s and sub_active 271s and p.no_active 271s and p.no_id=sub_provider 271s ) 271s then 271s delete from public.sl_listen 271s where li_origin = v_row.origin 271s and li_receiver = v_row.receiver; 271s insert into public.sl_listen (li_origin, li_provider, li_receiver) 271s values (v_row.origin, v_row.origin, v_row.receiver); 271s 271s -- 2nd choice: 271s -- If we are subscribed to any set originating on this 271s -- event origin, we want to listen on all data providers 271s -- we use for this origin. We are a cascaded subscriber 271s -- for sets from this node. 271s else 271s if exists (select true from public.sl_set, public.sl_subscribe, 271s public.sl_node provider 271s where set_origin = v_row.origin 271s and sub_set = set_id 271s and sub_provider=provider.no_id 271s and provider.no_failed = false 271s and sub_receiver = v_row.receiver 271s and sub_active) 271s then 271s delete from public.sl_listen 271s where li_origin = v_row.origin 271s and li_receiver = v_row.receiver; 271s insert into public.sl_listen (li_origin, li_provider, li_receiver) 271s select distinct set_origin, sub_provider, v_row.receiver 271s from public.sl_set, public.sl_subscribe 271s where set_origin = v_row.origin 271s and sub_set = set_id 271s and sub_receiver = v_row.receiver 271s and sub_active; 271s end if; 271s end if; 271s 271s if v_row.failed then 271s 271s --for every failed node we delete all sl_listen entries 271s --except via providers (listed in sl_subscribe) 271s --or failover candidates (sl_failover_targets) 271s --we do this to prevent a non-failover candidate 271s --that is more ahead of the failover candidate from 271s --sending events to the failover candidate that 271s --are 'too far ahead' 271s 271s --if the failed node is not an origin for any 271s --node then we don't delete all listen paths 271s --for events from it. Instead we leave 271s --the listen network alone. 
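Continuing that illustration: if origin node 1 were marked failed and node 3 received the node-1 sets from provider 2, the pruning below keeps only listen entries whose li_provider is an actual data provider for node-1 sets, so (li_origin=1, li_provider=2, li_receiver=3) survives while any leftover entry routed through another node is removed. Node numbers here are hypothetical; the real providers come from sl_subscribe.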
271s 271s select count(*) into v_cnt from public.sl_subscribe sub, 271s public.sl_set s 271s where s.set_origin=v_row.origin and s.set_id=sub.sub_set; 271s if v_cnt > 0 then 271s delete from public.sl_listen where 271s li_origin=v_row.origin and 271s li_receiver=v_row.receiver 271s and li_provider not in 271s (select sub_provider from 271s public.sl_subscribe, 271s public.sl_set where 271s sub_set=set_id 271s and set_origin=v_row.origin); 271s end if; 271s end if; 271s -- insert into public.sl_listen 271s -- (li_origin,li_provider,li_receiver) 271s -- SELECT v_row.origin, pa_server 271s -- ,v_row.receiver 271s -- FROM public.sl_path where 271s -- pa_client=v_row.receiver 271s -- and (v_row.origin,pa_server,v_row.receiver) not in 271s -- (select li_origin,li_provider,li_receiver 271s -- from public.sl_listen); 271s -- end if; 271s end loop ; 271s 271s return null ; 271s end ; 271s $$ language 'plpgsql'; 271s CREATE FUNCTION 271s comment on function public.RebuildListenEntries() is 271s 'RebuildListenEntries() 271s 271s Invoked by various subscription and path modifying functions, this 271s rewrites the sl_listen entries, adding in all the ones required to 271s allow communications between nodes in the Slony-I cluster.'; 271s COMMENT 271s create or replace function public.generate_sync_event(p_interval interval) 271s returns int4 271s as $$ 271s declare 271s v_node_row record; 271s 271s BEGIN 271s select 1 into v_node_row from public.sl_event 271s where ev_type = 'SYNC' and ev_origin = public.getLocalNodeId('_main') 271s and ev_timestamp > now() - p_interval limit 1; 271s if not found then 271s -- If there has been no SYNC in the last interval, then push one 271s perform public.createEvent('_main', 'SYNC', NULL); 271s return 1; 271s else 271s return 0; 271s end if; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.generate_sync_event(p_interval interval) is 271s 'Generate a sync event if there has not been one in the requested interval, and this is a provider node.'; 271s COMMENT 271s drop function if exists public.updateRelname(int4, int4); 271s DROP FUNCTION 271s create or replace function public.updateRelname () 271s returns int4 271s as $$ 271s declare 271s v_no_id int4; 271s v_set_origin int4; 271s begin 271s -- ---- 271s -- Grab the central configuration lock 271s -- ---- 271s lock table public.sl_config_lock; 271s 271s update public.sl_table set 271s tab_relname = PGC.relname, tab_nspname = PGN.nspname 271s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 271s where public.sl_table.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid and 271s (tab_relname <> PGC.relname or tab_nspname <> PGN.nspname); 271s update public.sl_sequence set 271s seq_relname = PGC.relname, seq_nspname = PGN.nspname 271s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 271s where public.sl_sequence.seq_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid and 271s (seq_relname <> PGC.relname or seq_nspname <> PGN.nspname); 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.updateRelname() is 271s 'updateRelname()'; 271s COMMENT 271s drop function if exists public.updateReloid (int4, int4); 271s DROP FUNCTION 271s create or replace function public.updateReloid (p_set_id int4, p_only_on_node int4) 271s returns bigint 271s as $$ 271s declare 271s v_no_id int4; 271s v_set_origin int4; 271s prec record; 271s begin 271s -- ---- 271s -- Check that we either are the set origin or a current 271s -- 
subscriber of the set. 271s -- ---- 271s v_no_id := public.getLocalNodeId('_main'); 271s select set_origin into v_set_origin 271s from public.sl_set 271s where set_id = p_set_id 271s for update; 271s if not found then 271s raise exception 'Slony-I: set % not found', p_set_id; 271s end if; 271s if v_set_origin <> v_no_id 271s and not exists (select 1 from public.sl_subscribe 271s where sub_set = p_set_id 271s and sub_receiver = v_no_id) 271s then 271s return 0; 271s end if; 271s 271s -- ---- 271s -- If execution on only one node is requested, check that 271s -- we are that node. 271s -- ---- 271s if p_only_on_node > 0 and p_only_on_node <> v_no_id then 271s return 0; 271s end if; 271s 271s -- Update OIDs for tables to values pulled from non-table objects in pg_class 271s -- This ensures that we won't have collisions when repairing the oids 271s for prec in select tab_id from public.sl_table loop 271s update public.sl_table set tab_reloid = (select oid from pg_class pc where relkind <> 'r' and not exists (select 1 from public.sl_table t2 where t2.tab_reloid = pc.oid) limit 1) 271s where tab_id = prec.tab_id; 271s end loop; 271s 271s for prec in select tab_id, tab_relname, tab_nspname from public.sl_table loop 271s update public.sl_table set 271s tab_reloid = (select PGC.oid 271s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 271s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.tab_relname) 271s and PGC.relnamespace = PGN.oid 271s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.tab_nspname)) 271s where tab_id = prec.tab_id; 271s end loop; 271s 271s for prec in select seq_id from public.sl_sequence loop 271s update public.sl_sequence set seq_reloid = (select oid from pg_class pc where relkind <> 'S' and not exists (select 1 from public.sl_sequence t2 where t2.seq_reloid = pc.oid) limit 1) 271s where seq_id = prec.seq_id; 271s end loop; 271s 271s for prec in select seq_id, seq_relname, seq_nspname from public.sl_sequence loop 271s update public.sl_sequence set 271s seq_reloid = (select PGC.oid 271s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 271s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.seq_relname) 271s and PGC.relnamespace = PGN.oid 271s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.seq_nspname)) 271s where seq_id = prec.seq_id; 271s end loop; 271s 271s return 1; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.updateReloid(p_set_id int4, p_only_on_node int4) is 271s 'updateReloid(set_id, only_on_node) 271s 271s Updates the respective reloids in sl_table and sl_seqeunce based on 271s their respective FQN'; 271s COMMENT 271s create or replace function public.logswitch_start() 271s returns int4 as $$ 271s DECLARE 271s v_current_status int4; 271s BEGIN 271s -- ---- 271s -- Get the current log status. 271s -- ---- 271s select last_value into v_current_status from public.sl_log_status; 271s 271s -- ---- 271s -- status = 0: sl_log_1 active, sl_log_2 clean 271s -- Initiate a switch to sl_log_2. 271s -- ---- 271s if v_current_status = 0 then 271s perform "pg_catalog".setval('public.sl_log_status', 3); 271s perform public.registry_set_timestamp( 271s 'logswitch.laststart', now()); 271s raise notice 'Slony-I: Logswitch to sl_log_2 initiated'; 271s return 2; 271s end if; 271s 271s -- ---- 271s -- status = 1: sl_log_2 active, sl_log_1 clean 271s -- Initiate a switch to sl_log_1. 
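For reference, the four sl_log_status values this pair of functions cycles through (taken from the comments in their bodies), plus a hedged manual invocation:
    0: sl_log_1 active, sl_log_2 clean        2: sl_log_1 active, cleanup sl_log_2
    1: sl_log_2 active, sl_log_1 clean        3: sl_log_2 active, cleanup sl_log_1

    select public.logswitch_start();   -- returns 2 (switch to sl_log_2 started) or 1 (switch to sl_log_1 started)
    select public.logswitch_finish();  -- returns -1, 0, 1 or 2, as listed in its comment further below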
271s -- ---- 271s if v_current_status = 1 then 271s perform "pg_catalog".setval('public.sl_log_status', 2); 271s perform public.registry_set_timestamp( 271s 'logswitch.laststart', now()); 271s raise notice 'Slony-I: Logswitch to sl_log_1 initiated'; 271s return 1; 271s end if; 271s 271s raise exception 'Previous logswitch still in progress'; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.logswitch_start() is 271s 'logswitch_start() 271s 271s Initiate a log table switch if none is in progress'; 271s COMMENT 271s create or replace function public.logswitch_finish() 271s returns int4 as $$ 271s DECLARE 271s v_current_status int4; 271s v_dummy record; 271s v_origin int8; 271s v_seqno int8; 271s v_xmin bigint; 271s v_purgeable boolean; 271s BEGIN 271s -- ---- 271s -- Get the current log status. 271s -- ---- 271s select last_value into v_current_status from public.sl_log_status; 271s 271s -- ---- 271s -- status value 0 or 1 means that there is no log switch in progress 271s -- ---- 271s if v_current_status = 0 or v_current_status = 1 then 271s return 0; 271s end if; 271s 271s -- ---- 271s -- status = 2: sl_log_1 active, cleanup sl_log_2 271s -- ---- 271s if v_current_status = 2 then 271s v_purgeable := 'true'; 271s 271s -- ---- 271s -- Attempt to lock sl_log_2 in order to make sure there are no other transactions 271s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 271s -- blocking writers to sl_log_2 while it is waiting for a lock. It also prevents it 271s -- immediately truncating log data generated inside the transaction which was active 271s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 271s -- transaction is committed. 271s -- ---- 271s begin 271s lock table public.sl_log_2 in access exclusive mode nowait; 271s exception when lock_not_available then 271s raise notice 'Slony-I: could not lock sl_log_2 - sl_log_2 not truncated'; 271s return -1; 271s end; 271s 271s -- ---- 271s -- The cleanup thread calls us after it did the delete and 271s -- vacuum of both log tables. If sl_log_2 is empty now, we 271s -- can truncate it and the log switch is done. 271s -- ---- 271s for v_origin, v_seqno, v_xmin in 271s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 271s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 271s loop 271s if exists (select 1 from public.sl_log_2 where log_origin = v_origin and log_txid >= v_xmin limit 1) then 271s v_purgeable := 'false'; 271s end if; 271s end loop; 271s if not v_purgeable then 271s -- ---- 271s -- Found a row ... log switch is still in progress. 271s -- ---- 271s raise notice 'Slony-I: log switch to sl_log_1 still in progress - sl_log_2 not truncated'; 271s return -1; 271s end if; 271s 271s raise notice 'Slony-I: log switch to sl_log_1 complete - truncate sl_log_2'; 271s truncate public.sl_log_2; 271s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_2' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 271s execute 'alter table public.sl_log_2 set without oids;'; 271s end if; 271s perform "pg_catalog".setval('public.sl_log_status', 0); 271s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? 
table 271s perform public.addPartialLogIndices(); 271s 271s return 1; 271s end if; 271s 271s -- ---- 271s -- status = 3: sl_log_2 active, cleanup sl_log_1 271s -- ---- 271s if v_current_status = 3 then 271s v_purgeable := 'true'; 271s 271s -- ---- 271s -- Attempt to lock sl_log_1 in order to make sure there are no other transactions 271s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 271s -- blocking writes to sl_log_1 while it is waiting for a lock. It also prevents it 271s -- immediately truncating log data generated inside the transaction which was active 271s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 271s -- transaction is committed. 271s -- ---- 271s begin 271s lock table public.sl_log_1 in access exclusive mode nowait; 271s exception when lock_not_available then 271s raise notice 'Slony-I: could not lock sl_log_1 - sl_log_1 not truncated'; 271s return -1; 271s end; 271s 271s -- ---- 271s -- The cleanup thread calls us after it did the delete and 271s -- vacuum of both log tables. If sl_log_2 is empty now, we 271s -- can truncate it and the log switch is done. 271s -- ---- 271s for v_origin, v_seqno, v_xmin in 271s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 271s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 271s loop 271s if (exists (select 1 from public.sl_log_1 where log_origin = v_origin and log_txid >= v_xmin limit 1)) then 271s v_purgeable := 'false'; 271s end if; 271s end loop; 271s if not v_purgeable then 271s -- ---- 271s -- Found a row ... log switch is still in progress. 271s -- ---- 271s raise notice 'Slony-I: log switch to sl_log_2 still in progress - sl_log_1 not truncated'; 271s return -1; 271s end if; 271s 271s raise notice 'Slony-I: log switch to sl_log_2 complete - truncate sl_log_1'; 271s truncate public.sl_log_1; 271s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_1' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 271s execute 'alter table public.sl_log_1 set without oids;'; 271s end if; 271s perform "pg_catalog".setval('public.sl_log_status', 1); 271s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 271s perform public.addPartialLogIndices(); 271s return 2; 271s end if; 271s END; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.logswitch_finish() is 271s 'logswitch_finish() 271s 271s Attempt to finalize a log table switch in progress 271s return values: 271s -1 if switch in progress, but not complete 271s 0 if no switch in progress 271s 1 if performed truncate on sl_log_2 271s 2 if performed truncate on sl_log_1 271s '; 271s COMMENT 271s create or replace function public.addPartialLogIndices () returns integer as $$ 271s DECLARE 271s v_current_status int4; 271s v_log int4; 271s v_dummy record; 271s v_dummy2 record; 271s idef text; 271s v_count int4; 271s v_iname text; 271s v_ilen int4; 271s v_maxlen int4; 271s BEGIN 271s v_count := 0; 271s select last_value into v_current_status from public.sl_log_status; 271s 271s -- If status is 2 or 3 --> in process of cleanup --> unsafe to create indices 271s if v_current_status in (2, 3) then 271s return 0; 271s end if; 271s 271s if v_current_status = 0 then -- Which log should get indices? 
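The loop below creates one partial index per set origin on the currently idle log table; for a hypothetical origin node 1 with sl_log_2 out of use, the generated statement has this shape:
    create index "PartInd_main_sl_log_2-node-1"
        on public.sl_log_2 USING btree(log_txid) where (log_origin = 1);
The index name embeds the cluster name (main here) and the origin node id, which is why the function checks its length against max_identifier_length.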
271s v_log := 2; 271s else 271s v_log := 1; 271s end if; 271s -- PartInd_test_db_sl_log_2-node-1 271s -- Add missing indices... 271s for v_dummy in select distinct set_origin from public.sl_set loop 271s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' 271s || v_dummy.set_origin::text; 271s -- raise notice 'Consider adding partial index % on sl_log_%', v_iname, v_log; 271s -- raise notice 'schema: [_main] tablename:[sl_log_%]', v_log; 271s select * into v_dummy2 from pg_catalog.pg_indexes where tablename = 'sl_log_' || v_log::text and indexname = v_iname; 271s if not found then 271s -- raise notice 'index was not found - add it!'; 271s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' || v_dummy.set_origin::text; 271s v_ilen := pg_catalog.length(v_iname); 271s v_maxlen := pg_catalog.current_setting('max_identifier_length'::text)::int4; 271s if v_ilen > v_maxlen then 271s raise exception 'Length of proposed index name [%] > max_identifier_length [%] - cluster name probably too long', v_ilen, v_maxlen; 271s end if; 271s 271s idef := 'create index "' || v_iname || 271s '" on public.sl_log_' || v_log::text || ' USING btree(log_txid) where (log_origin = ' || v_dummy.set_origin::text || ');'; 271s execute idef; 271s v_count := v_count + 1; 271s else 271s -- raise notice 'Index % already present - skipping', v_iname; 271s end if; 271s end loop; 271s 271s -- Remove unneeded indices... 271s for v_dummy in select indexname from pg_catalog.pg_indexes i where i.tablename = 'sl_log_' || v_log::text and 271s i.indexname like ('PartInd_main_sl_log_' || v_log::text || '-node-%') and 271s not exists (select 1 from public.sl_set where 271s i.indexname = 'PartInd_main_sl_log_' || v_log::text || '-node-' || set_origin::text) 271s loop 271s -- raise notice 'Dropping obsolete index %d', v_dummy.indexname; 271s idef := 'drop index public."' || v_dummy.indexname || '";'; 271s execute idef; 271s v_count := v_count - 1; 271s end loop; 271s return v_count; 271s END 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.addPartialLogIndices () is 271s 'Add partial indexes, if possible, to the unused sl_log_? table for 271s all origin nodes, and drop any that are no longer needed. 271s 271s This function presently gets run any time set origins are manipulated 271s (FAILOVER, STORE SET, MOVE SET, DROP SET), as well as each time the 271s system switches between sl_log_1 and sl_log_2.'; 271s COMMENT 271s create or replace function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 271s returns bool as $$ 271s BEGIN 271s return exists ( 271s select 1 from "information_schema".columns 271s where table_schema = p_namespace 271s and table_name = p_table 271s and column_name = p_field 271s ); 271s END;$$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 271s is 'Check if a table has a specific attribute'; 271s COMMENT 271s create or replace function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 271s returns bool as $$ 271s DECLARE 271s v_row record; 271s v_query text; 271s BEGIN 271s if not public.check_table_field_exists(p_namespace, p_table, p_field) then 271s raise notice 'Upgrade table %.% - add field %', p_namespace, p_table, p_field; 271s v_query := 'alter table ' || p_namespace || '.' 
|| p_table || ' add column '; 271s v_query := v_query || p_field || ' ' || p_type || ';'; 271s execute v_query; 271s return 't'; 271s else 271s return 'f'; 271s end if; 271s END;$$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 271s is 'Add a column of a given type to a table if it is missing'; 271s COMMENT 271s create or replace function public.upgradeSchema(p_old text) 271s returns text as $$ 271s declare 271s v_tab_row record; 271s v_query text; 271s v_keepstatus text; 271s begin 271s -- If old version is pre-2.0, then we require a special upgrade process 271s if p_old like '1.%' then 271s raise exception 'Upgrading to Slony-I 2.x requires running slony_upgrade_20'; 271s end if; 271s 271s perform public.upgradeSchemaAddTruncateTriggers(); 271s 271s -- Change all Slony-I-defined columns that are "timestamp without time zone" to "timestamp *WITH* time zone" 271s if exists (select 1 from information_schema.columns c 271s where table_schema = '_main' and data_type = 'timestamp without time zone' 271s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 271s and (c.table_name, c.column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp'))) 271s then 271s 271s -- Preserve sl_status 271s select pg_get_viewdef('public.sl_status') into v_keepstatus; 271s execute 'drop view sl_status'; 271s for v_tab_row in select table_schema, table_name, column_name from information_schema.columns c 271s where table_schema = '_main' and data_type = 'timestamp without time zone' 271s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 271s and (table_name, column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp')) 271s loop 271s raise notice 'Changing Slony-I column [%.%] to timestamp WITH time zone', v_tab_row.table_name, v_tab_row.column_name; 271s v_query := 'alter table ' || public.slon_quote_brute(v_tab_row.table_schema) || 271s '.' 
|| v_tab_row.table_name || ' alter column ' || v_tab_row.column_name || 271s ' type timestamp with time zone;'; 271s execute v_query; 271s end loop; 271s -- restore sl_status 271s execute 'create view sl_status as ' || v_keepstatus; 271s end if; 271s 271s if not exists (select 1 from information_schema.tables where table_schema = '_main' and table_name = 'sl_components') then 271s v_query := ' 271s create table public.sl_components ( 271s co_actor text not null primary key, 271s co_pid integer not null, 271s co_node integer not null, 271s co_connection_pid integer not null, 271s co_activity text, 271s co_starttime timestamptz not null, 271s co_event bigint, 271s co_eventtype text 271s ) without oids; 271s '; 271s execute v_query; 271s end if; 271s 271s 271s 271s 271s 271s if not exists (select 1 from information_schema.tables t where table_schema = '_main' and table_name = 'sl_event_lock') then 271s v_query := 'create table public.sl_event_lock (dummy integer);'; 271s execute v_query; 271s end if; 271s 271s if not exists (select 1 from information_schema.tables t 271s where table_schema = '_main' 271s and table_name = 'sl_apply_stats') then 271s v_query := ' 271s create table public.sl_apply_stats ( 271s as_origin int4, 271s as_num_insert int8, 271s as_num_update int8, 271s as_num_delete int8, 271s as_num_truncate int8, 271s as_num_script int8, 271s as_num_total int8, 271s as_duration interval, 271s as_apply_first timestamptz, 271s as_apply_last timestamptz, 271s as_cache_prepare int8, 271s as_cache_hit int8, 271s as_cache_evict int8, 271s as_cache_prepare_max int8 271s ) WITHOUT OIDS;'; 271s execute v_query; 271s end if; 271s 271s -- 271s -- On the upgrade to 2.2, we change the layout of sl_log_N by 271s -- adding columns log_tablenspname, log_tablerelname, and 271s -- log_cmdupdncols as well as changing log_cmddata into 271s -- log_cmdargs, which is a text array. 
271s -- 271s if not public.check_table_field_exists('_main', 'sl_log_1', 'log_cmdargs') then 271s -- 271s -- Check that the cluster is completely caught up 271s -- 271s if public.check_unconfirmed_log() then 271s raise EXCEPTION 'cannot upgrade to new sl_log_N format due to existing unreplicated data'; 271s end if; 271s 271s -- 271s -- Drop tables sl_log_1 and sl_log_2 271s -- 271s drop table public.sl_log_1; 271s drop table public.sl_log_2; 271s 271s -- 271s -- Create the new sl_log_1 271s -- 271s create table public.sl_log_1 ( 271s log_origin int4, 271s log_txid bigint, 271s log_tableid int4, 271s log_actionseq int8, 271s log_tablenspname text, 271s log_tablerelname text, 271s log_cmdtype "char", 271s log_cmdupdncols int4, 271s log_cmdargs text[] 271s ) without oids; 271s create index sl_log_1_idx1 on public.sl_log_1 271s (log_origin, log_txid, log_actionseq); 271s 271s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 271s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 271s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 271s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 271s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 271s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 271s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 271s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 271s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 271s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 271s 271s -- 271s -- Create the new sl_log_2 271s -- 271s create table public.sl_log_2 ( 271s log_origin int4, 271s log_txid bigint, 271s log_tableid int4, 271s log_actionseq int8, 271s log_tablenspname text, 271s log_tablerelname text, 271s log_cmdtype "char", 271s log_cmdupdncols int4, 271s log_cmdargs text[] 271s ) without oids; 271s create index sl_log_2_idx1 on public.sl_log_2 271s (log_origin, log_txid, log_actionseq); 271s 271s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 271s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 271s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 271s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 271s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 271s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 271s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 271s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. 
U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 271s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 271s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 271s 271s create table public.sl_log_script ( 271s log_origin int4, 271s log_txid bigint, 271s log_actionseq int8, 271s log_cmdtype "char", 271s log_cmdargs text[] 271s ) WITHOUT OIDS; 271s create index sl_log_script_idx1 on public.sl_log_script 271s (log_origin, log_txid, log_actionseq); 271s 271s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 271s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 271s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 271s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 271s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete'; 271s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 271s 271s -- 271s -- Put the log apply triggers back onto sl_log_1/2 271s -- 271s create trigger apply_trigger 271s before INSERT on public.sl_log_1 271s for each row execute procedure public.logApply('_main'); 271s alter table public.sl_log_1 271s enable replica trigger apply_trigger; 271s create trigger apply_trigger 271s before INSERT on public.sl_log_2 271s for each row execute procedure public.logApply('_main'); 271s alter table public.sl_log_2 271s enable replica trigger apply_trigger; 271s end if; 271s if not exists (select 1 from information_schema.routines where routine_schema = '_main' and routine_name = 'string_agg') then 271s CREATE AGGREGATE public.string_agg(text) ( 271s SFUNC=public.agg_text_sum, 271s STYPE=text, 271s INITCOND='' 271s ); 271s end if; 271s if not exists (select 1 from information_schema.views where table_schema='_main' and table_name='sl_failover_targets') then 271s create view public.sl_failover_targets as 271s select set_id, 271s set_origin as set_origin, 271s sub1.sub_receiver as backup_id 271s 271s FROM 271s public.sl_subscribe sub1 271s ,public.sl_set set1 271s where 271s sub1.sub_set=set_id 271s and sub1.sub_forward=true 271s --exclude candidates where the set_origin 271s --has a path a node but the failover 271s --candidate has no path to that node 271s and sub1.sub_receiver not in 271s (select p1.pa_client from 271s public.sl_path p1 271s left outer join public.sl_path p2 on 271s (p2.pa_client=p1.pa_client 271s and p2.pa_server=sub1.sub_receiver) 271s where p2.pa_client is null 271s and p1.pa_server=set_origin 271s and p1.pa_client<>sub1.sub_receiver 271s ) 271s and sub1.sub_provider=set_origin 271s --exclude any subscribers that are not 271s --direct subscribers of all sets on the 271s --origin 271s and sub1.sub_receiver not in 271s (select direct_recv.sub_receiver 271s from 271s 271s (--all direct receivers of the first set 271s select subs2.sub_receiver 271s from public.sl_subscribe subs2 271s where subs2.sub_provider=set1.set_origin 271s and subs2.sub_set=set1.set_id) as 271s direct_recv 271s inner join 271s (--all other sets from the origin 271s select set_id from public.sl_set set2 271s where set2.set_origin=set1.set_origin 271s and set2.set_id<>sub1.sub_set) 271s as othersets on(true) 271s 
left outer join public.sl_subscribe subs3 271s on(subs3.sub_set=othersets.set_id 271s and subs3.sub_forward=true 271s and subs3.sub_provider=set1.set_origin 271s and direct_recv.sub_receiver=subs3.sub_receiver) 271s where subs3.sub_receiver is null 271s ); 271s end if; 271s 271s if not public.check_table_field_exists('_main', 'sl_node', 'no_failed') then 271s alter table public.sl_node add column no_failed bool; 271s update public.sl_node set no_failed=false; 271s end if; 271s return p_old; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s create or replace function public.check_unconfirmed_log () 271s returns bool as $$ 271s declare 271s v_rc bool = false; 271s v_error bool = false; 271s v_origin integer; 271s v_allconf bigint; 271s v_allsnap txid_snapshot; 271s v_count bigint; 271s begin 271s -- 271s -- Loop over all nodes that are the origin of at least one set 271s -- 271s for v_origin in select distinct set_origin as no_id 271s from public.sl_set loop 271s -- 271s -- Per origin, determine which is the highest event seqno 271s -- that is confirmed by all subscribers to any of the 271s -- origin's sets. 271s -- 271s select into v_allconf min(max_seqno) from ( 271s select con_received, max(con_seqno) as max_seqno 271s from public.sl_confirm 271s where con_origin = v_origin 271s and con_received in ( 271s select distinct sub_receiver 271s from public.sl_set as SET, 271s public.sl_subscribe as SUB 271s where SET.set_id = SUB.sub_set 271s and SET.set_origin = v_origin 271s ) 271s group by con_received 271s ) as maxconfirmed; 271s if not found then 271s raise NOTICE 'check_unconfirmed_log(): cannot determine highest ev_seqno for node % confirmed by all subscribers', v_origin; 271s v_error = true; 271s continue; 271s end if; 271s 271s -- 271s -- Get the txid snapshot that corresponds with that event 271s -- 271s select into v_allsnap ev_snapshot 271s from public.sl_event 271s where ev_origin = v_origin 271s and ev_seqno = v_allconf; 271s if not found then 271s raise NOTICE 'check_unconfirmed_log(): cannot find event %,% in sl_event', v_origin, v_allconf; 271s v_error = true; 271s continue; 271s end if; 271s 271s -- 271s -- Count the number of log rows that appeared after that event.
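The counting query that follows relies on two standard PostgreSQL snapshot helpers. As a standalone illustration (the snapshot literal is invented; its text format is 'xmin:xmax:xip_list'):

  select "pg_catalog".txid_snapshot_xmax('10:20:10,14,15'::txid_snapshot);       -- 20
  select * from "pg_catalog".txid_snapshot_xip('10:20:10,14,15'::txid_snapshot); -- 10, 14, 15

A log row whose log_txid is at or above xmax, or is still listed as in progress, was not visible to the snapshot taken at that event, which is why the query below counts it as not yet replicated.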
271s -- 271s select into v_count count(*) from ( 271s select 1 from public.sl_log_1 271s where log_origin = v_origin 271s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 271s union all 271s select 1 from public.sl_log_1 271s where log_origin = v_origin 271s and log_txid in ( 271s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 271s ) 271s union all 271s select 1 from public.sl_log_2 271s where log_origin = v_origin 271s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 271s union all 271s select 1 from public.sl_log_2 271s where log_origin = v_origin 271s and log_txid in ( 271s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 271s ) 271s ) as cnt; 271s 271s if v_count > 0 then 271s raise NOTICE 'check_unconfirmed_log(): origin % has % log rows that have not propagated to all subscribers yet', v_origin, v_count; 271s v_rc = true; 271s end if; 271s end loop; 271s 271s if v_error then 271s raise EXCEPTION 'check_unconfirmed_log(): aborting due to previous inconsistency'; 271s end if; 271s 271s return v_rc; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s set search_path to public 271s ; 271s SET 271s comment on function public.upgradeSchema(p_old text) is 271s 'Called during "update functions" by slonik to perform schema changes'; 271s COMMENT 271s create or replace view public.sl_status as select 271s E.ev_origin as st_origin, 271s C.con_received as st_received, 271s E.ev_seqno as st_last_event, 271s E.ev_timestamp as st_last_event_ts, 271s C.con_seqno as st_last_received, 271s C.con_timestamp as st_last_received_ts, 271s CE.ev_timestamp as st_last_received_event_ts, 271s E.ev_seqno - C.con_seqno as st_lag_num_events, 271s current_timestamp - CE.ev_timestamp as st_lag_time 271s from public.sl_event E, public.sl_confirm C, 271s public.sl_event CE 271s where E.ev_origin = C.con_origin 271s and CE.ev_origin = E.ev_origin 271s and CE.ev_seqno = C.con_seqno 271s and (E.ev_origin, E.ev_seqno) in 271s (select ev_origin, max(ev_seqno) 271s from public.sl_event 271s where ev_origin = public.getLocalNodeId('_main') 271s group by 1 271s ) 271s and (C.con_origin, C.con_received, C.con_seqno) in 271s (select con_origin, con_received, max(con_seqno) 271s from public.sl_confirm 271s where con_origin = public.getLocalNodeId('_main') 271s group by 1, 2 271s ); 271s CREATE VIEW 271s comment on view public.sl_status is 'View showing how far behind remote nodes are.'; 271s COMMENT 271s create or replace function public.copyFields(p_tab_id integer) 271s returns text 271s as $$ 271s declare 271s result text; 271s prefix text; 271s prec record; 271s begin 271s result := ''; 271s prefix := '('; -- Initially, prefix is the opening paren 271s 271s for prec in select public.slon_quote_input(a.attname) as column from public.sl_table t, pg_catalog.pg_attribute a where t.tab_id = p_tab_id and t.tab_reloid = a.attrelid and a.attnum > 0 and a.attisdropped = false order by attnum 271s loop 271s result := result || prefix || prec.column; 271s prefix := ','; -- Subsequently, prepend columns with commas 271s end loop; 271s result := result || ')'; 271s return result; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.copyFields(p_tab_id integer) is 271s 'Return a string consisting of what should be appended to a COPY statement 271s to specify fields for the passed-in tab_id. 
271s 271s In PG versions > 7.3, this looks like (field1,field2,...fieldn)'; 271s COMMENT 271s create or replace function public.prepareTableForCopy(p_tab_id int4) 271s returns int4 271s as $$ 271s declare 271s v_tab_oid oid; 271s v_tab_fqname text; 271s begin 271s -- ---- 271s -- Get the OID and fully qualified name for the table 271s -- --- 271s select PGC.oid, 271s public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) as tab_fqname 271s into v_tab_oid, v_tab_fqname 271s from public.sl_table T, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where T.tab_id = p_tab_id 271s and T.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid; 271s if not found then 271s raise exception 'Table with ID % not found in sl_table', p_tab_id; 271s end if; 271s 271s -- ---- 271s -- Try using truncate to empty the table and fallback to 271s -- delete on error. 271s -- ---- 271s perform public.TruncateOnlyTable(v_tab_fqname); 271s raise notice 'truncate of % succeeded', v_tab_fqname; 271s 271s -- suppress index activity 271s perform public.disable_indexes_on_table(v_tab_oid); 271s 271s return 1; 271s exception when others then 271s raise notice 'truncate of % failed - doing delete', v_tab_fqname; 271s perform public.disable_indexes_on_table(v_tab_oid); 271s execute 'delete from only ' || public.slon_quote_input(v_tab_fqname); 271s return 0; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.prepareTableForCopy(p_tab_id int4) is 271s 'Delete all data and suppress index maintenance'; 271s COMMENT 271s create or replace function public.finishTableAfterCopy(p_tab_id int4) 271s returns int4 271s as $$ 271s declare 271s v_tab_oid oid; 271s v_tab_fqname text; 271s begin 271s -- ---- 271s -- Get the tables OID and fully qualified name 271s -- --- 271s select PGC.oid, 271s public.slon_quote_brute(PGN.nspname) || '.' || 271s public.slon_quote_brute(PGC.relname) as tab_fqname 271s into v_tab_oid, v_tab_fqname 271s from public.sl_table T, 271s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 271s where T.tab_id = p_tab_id 271s and T.tab_reloid = PGC.oid 271s and PGC.relnamespace = PGN.oid; 271s if not found then 271s raise exception 'Table with ID % not found in sl_table', p_tab_id; 271s end if; 271s 271s -- ---- 271s -- Reenable indexes and reindex the table. 
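Taken together, prepareTableForCopy above and the finishTableAfterCopy body below bracket the bulk load performed during subscription. A rough sketch of the presumed call pattern, with an invented tab_id and table name:

  select public.prepareTableForCopy(1);    -- empty the target and suppress index maintenance
  COPY public.mytab FROM STDIN;            -- bulk data load driven by the slon daemon (assumed step)
  select public.finishTableAfterCopy(1);   -- re-enable indexes and reindex, as implemented below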
271s -- ---- 271s perform public.enable_indexes_on_table(v_tab_oid); 271s execute 'reindex table ' || public.slon_quote_input(v_tab_fqname); 271s 271s return 1; 271s end; 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.finishTableAfterCopy(p_tab_id int4) is 271s 'Reenable index maintenance and reindex the table'; 271s COMMENT 271s create or replace function public.setup_vactables_type () returns integer as $$ 271s begin 271s if not exists (select 1 from pg_catalog.pg_type t, pg_catalog.pg_namespace n 271s where n.nspname = '_main' and t.typnamespace = n.oid and 271s t.typname = 'vactables') then 271s execute 'create type public.vactables as (nspname name, relname name);'; 271s end if; 271s return 1; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.setup_vactables_type () is 271s 'Function to be run as part of loading slony1_funcs.sql that creates the vactables type if it is missing'; 271s COMMENT 271s select public.setup_vactables_type(); 271s setup_vactables_type 271s ---------------------- 271s 1 271s (1 row) 271s 271s drop function public.setup_vactables_type (); 271s DROP FUNCTION 271s create or replace function public.TablesToVacuum () returns setof public.vactables as $$ 271s declare 271s prec public.vactables%rowtype; 271s begin 271s prec.nspname := '_main'; 271s prec.relname := 'sl_event'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := '_main'; 271s prec.relname := 'sl_confirm'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := '_main'; 271s prec.relname := 'sl_setsync'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := '_main'; 271s prec.relname := 'sl_seqlog'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := '_main'; 271s prec.relname := 'sl_archive_counter'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := '_main'; 271s prec.relname := 'sl_components'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := '_main'; 271s prec.relname := 'sl_log_script'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := 'pg_catalog'; 271s prec.relname := 'pg_listener'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s prec.nspname := 'pg_catalog'; 271s prec.relname := 'pg_statistic'; 271s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 271s return next prec; 271s end if; 271s 271s return; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.TablesToVacuum () is 271s 'Return a list of tables that require frequent vacuuming. 
The 271s function is used so that the list is not hardcoded into C code.'; 271s COMMENT 271s create or replace function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$ 271s declare 271s 271s prec record; 271s v_origin int4; 271s v_isorigin boolean; 271s v_fqname text; 271s v_query text; 271s v_rows integer; 271s v_idxname text; 271s 271s begin 271s -- Need to validate that the set exists; the set will tell us if this is the origin 271s select set_origin into v_origin from public.sl_set where set_id = p_set_id; 271s if not found then 271s raise exception 'add_empty_table_to_replication: set % not found!', p_set_id; 271s end if; 271s 271s -- Need to be aware of whether or not this node is origin for the set 271s v_isorigin := ( v_origin = public.getLocalNodeId('_main') ); 271s 271s v_fqname := '"' || p_nspname || '"."' || p_tabname || '"'; 271s -- Take out a lock on the table 271s v_query := 'lock ' || v_fqname || ';'; 271s execute v_query; 271s 271s if v_isorigin then 271s -- On the origin, verify that the table is empty, failing if it has any tuples 271s v_query := 'select 1 as tuple from ' || v_fqname || ' limit 1;'; 271s execute v_query into prec; 271s GET DIAGNOSTICS v_rows = ROW_COUNT; 271s if v_rows = 0 then 271s raise notice 'add_empty_table_to_replication: table % empty on origin - OK', v_fqname; 271s else 271s raise exception 'add_empty_table_to_replication: table % contained tuples on origin node %', v_fqname, v_origin; 271s end if; 271s else 271s -- On other nodes, TRUNCATE the table 271s v_query := 'truncate ' || v_fqname || ';'; 271s execute v_query; 271s end if; 271s -- If p_idxname is NULL, then look up the PK index, and RAISE EXCEPTION if one does not exist 271s if p_idxname is NULL then 271s select c2.relname into prec from pg_catalog.pg_index i, pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_namespace n where i.indrelid = c1.oid and i.indexrelid = c2.oid and c1.relname = p_tabname and i.indisprimary and n.nspname = p_nspname and n.oid = c1.relnamespace; 271s if not found then 271s raise exception 'add_empty_table_to_replication: table % has no primary key and no candidate specified!', v_fqname; 271s else 271s v_idxname := prec.relname; 271s end if; 271s else 271s v_idxname := p_idxname; 271s end if; 271s return public.setAddTable_int(p_set_id, p_tab_id, v_fqname, v_idxname, p_comment); 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is 271s 'Verify that a table is empty, and add it to replication. 271s tab_idxname is optional - if NULL, then we use the primary key. 
271s 271s Note that this function is to be run within an EXECUTE SCRIPT script, 271s so it runs at the right place in the transaction stream on all 271s nodes.'; 271s COMMENT 271s create or replace function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$ 271s declare 271s prec record; 271s prec2 record; 271s v_set_id int4; 271s 271s begin 271s -- Look up the parent table; fail if it does not exist 271s select c1.oid into prec from pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_inherits i, pg_catalog.pg_namespace n where c1.oid = i.inhparent and c2.oid = i.inhrelid and n.oid = c2.relnamespace and n.nspname = p_nspname and c2.relname = p_tabname; 271s if not found then 271s raise exception 'replicate_partition: No parent table found for %.%!', p_nspname, p_tabname; 271s end if; 271s 271s -- The parent table tells us what replication set to use 271s select tab_set into prec2 from public.sl_table where tab_reloid = prec.oid; 271s if not found then 271s raise exception 'replicate_partition: Parent table % for new partition %.% is not replicated!', prec.oid, p_nspname, p_tabname; 271s end if; 271s 271s v_set_id := prec2.tab_set; 271s 271s -- Now, we have all the parameters necessary to run add_empty_table_to_replication... 271s return public.add_empty_table_to_replication(v_set_id, p_tab_id, p_nspname, p_tabname, p_idxname, p_comment); 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is 271s 'Add a partition table to replication. 271s tab_idxname is optional - if NULL, then we use the primary key. 271s This function looks up replication configuration via the parent table. 271s 271s Note that this function is to be run within an EXECUTE SCRIPT script, 271s so it runs at the right place in the transaction stream on all 271s nodes.'; 271s COMMENT 271s create or replace function public.disable_indexes_on_table (i_oid oid) 271s returns integer as $$ 271s begin 271s -- Setting pg_class.relhasindex to false will cause copy not to 271s -- maintain any indexes. At the end of the copy we will reenable 271s -- them and reindex the table. This bulk creating of indexes is 271s -- faster. 271s 271s update pg_catalog.pg_class set relhasindex ='f' where oid = i_oid; 271s return 1; 271s end $$ 271s language plpgsql; 271s CREATE FUNCTION 271s comment on function public.disable_indexes_on_table(i_oid oid) is 271s 'disable indexes on the specified table. 271s Used during subscription process to suppress indexes, which allows 271s COPY to go much faster. 271s 271s This may be set as a SECURITY DEFINER in order to eliminate the need 271s for superuser access by Slony-I. 271s '; 271s COMMENT 271s create or replace function public.enable_indexes_on_table (i_oid oid) 271s returns integer as $$ 271s begin 271s update pg_catalog.pg_class set relhasindex ='t' where oid = i_oid; 271s return 1; 271s end $$ 271s language plpgsql 271s security definer; 271s CREATE FUNCTION 271s comment on function public.enable_indexes_on_table(i_oid oid) is 271s 're-enable indexes on the specified table. 271s 271s This may be set as a SECURITY DEFINER in order to eliminate the need 271s for superuser access by Slony-I. 
271s '; 271s COMMENT 271s drop function if exists public.reshapeSubscription(int4,int4,int4); 271s DROP FUNCTION 271s create or replace function public.reshapeSubscription (p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) returns int4 as $$ 271s begin 271s update public.sl_subscribe 271s set sub_provider=p_sub_provider 271s from public.sl_set 271s WHERE sub_set=sl_set.set_id 271s and sl_set.set_origin=p_sub_origin and sub_receiver=p_sub_receiver; 271s if found then 271s perform public.RebuildListenEntries(); 271s notify "_main_Restart"; 271s end if; 271s return 0; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.reshapeSubscription(p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) is 271s 'Run on a receiver/subscriber node when the provider for that 271s subscription is being changed. Slonik will invoke this method 271s before the SUBSCRIBE_SET event propagates to the receiver 271s so listen paths can be updated.'; 271s COMMENT 271s create or replace function public.slon_node_health_check() returns boolean as $$ 271s declare 271s prec record; 271s all_ok boolean; 271s begin 271s all_ok := 't'::boolean; 271s -- validate that all tables in sl_table have: 271s -- sl_table agreeing with pg_class 271s for prec in select tab_id, tab_relname, tab_nspname from 271s public.sl_table t where not exists (select 1 from pg_catalog.pg_class c, pg_catalog.pg_namespace n 271s where c.oid = t.tab_reloid and c.relname = t.tab_relname and c.relnamespace = n.oid and n.nspname = t.tab_nspname) loop 271s all_ok := 'f'::boolean; 271s raise warning 'table [id,nsp,name]=[%,%,%] - sl_table does not match pg_class/pg_namespace', prec.tab_id, prec.tab_relname, prec.tab_nspname; 271s end loop; 271s if not all_ok then 271s raise warning 'Mismatch found between sl_table and pg_class. Slonik command REPAIR CONFIG may be useful to rectify this.'; 271s end if; 271s return all_ok; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.slon_node_health_check() is 'called when slon starts up to validate that there are no problems with node configuration.
Returns t if all is OK, f if there is a problem.'; 271s COMMENT 271s create or replace function public.log_truncate () returns trigger as 271s $$ 271s declare 271s r_role text; 271s c_nspname text; 271s c_relname text; 271s c_log integer; 271s c_node integer; 271s c_tabid integer; 271s begin 271s -- Ignore this call if session_replication_role = 'local' 271s select into r_role setting 271s from pg_catalog.pg_settings where name = 'session_replication_role'; 271s if r_role = 'local' then 271s return NULL; 271s end if; 271s 271s c_tabid := tg_argv[0]; 271s c_node := public.getLocalNodeId('_main'); 271s select tab_nspname, tab_relname into c_nspname, c_relname 271s from public.sl_table where tab_id = c_tabid; 271s select last_value into c_log from public.sl_log_status; 271s if c_log in (0, 2) then 271s insert into public.sl_log_1 ( 271s log_origin, log_txid, log_tableid, 271s log_actionseq, log_tablenspname, 271s log_tablerelname, log_cmdtype, 271s log_cmdupdncols, log_cmdargs 271s ) values ( 271s c_node, pg_catalog.txid_current(), c_tabid, 271s nextval('public.sl_action_seq'), c_nspname, 271s c_relname, 'T', 0, '{}'::text[]); 271s else -- (1, 3) 271s insert into public.sl_log_2 ( 271s log_origin, log_txid, log_tableid, 271s log_actionseq, log_tablenspname, 271s log_tablerelname, log_cmdtype, 271s log_cmdupdncols, log_cmdargs 271s ) values ( 271s c_node, pg_catalog.txid_current(), c_tabid, 271s nextval('public.sl_action_seq'), c_nspname, 271s c_relname, 'T', 0, '{}'::text[]); 271s end if; 271s return NULL; 271s end 271s $$ language plpgsql 271s security definer; 271s CREATE FUNCTION 271s comment on function public.log_truncate () 271s is 'trigger function run when a replicated table receives a TRUNCATE request'; 271s COMMENT 271s create or replace function public.deny_truncate () returns trigger as 271s $$ 271s declare 271s r_role text; 271s begin 271s -- Ignore this call if session_replication_role = 'local' 271s select into r_role setting 271s from pg_catalog.pg_settings where name = 'session_replication_role'; 271s if r_role = 'local' then 271s return NULL; 271s end if; 271s 271s raise exception 'truncation of replicated table forbidden on subscriber node'; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.deny_truncate () 271s is 'trigger function run when a replicated table receives a TRUNCATE request'; 271s COMMENT 271s create or replace function public.store_application_name (i_name text) returns text as $$ 271s declare 271s p_command text; 271s begin 271s if exists (select 1 from pg_catalog.pg_settings where name = 'application_name') then 271s p_command := 'set application_name to '''|| i_name || ''';'; 271s execute p_command; 271s return i_name; 271s end if; 271s return NULL::text; 271s end $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.store_application_name (i_name text) is 271s 'Set application_name GUC, if possible. 
Returns NULL if it fails to work.'; 271s COMMENT 271s create or replace function public.is_node_reachable(origin_node_id integer, 271s receiver_node_id integer) returns boolean as $$ 271s declare 271s listen_row record; 271s reachable boolean; 271s begin 271s reachable:=false; 271s select * into listen_row from public.sl_listen where 271s li_origin=origin_node_id and li_receiver=receiver_node_id; 271s if found then 271s reachable:=true; 271s end if; 271s return reachable; 271s end $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.is_node_reachable(origin_node_id integer, receiver_node_id integer) 271s is 'Is the receiver node reachable from the origin, via any of the listen paths?'; 271s COMMENT 271s create or replace function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) returns integer as $$ 271s begin 271s -- Trim out old state for this component 271s if not exists (select 1 from public.sl_components where co_actor = i_actor) then 271s insert into public.sl_components 271s (co_actor, co_pid, co_node, co_connection_pid, co_activity, co_starttime, co_event, co_eventtype) 271s values 271s (i_actor, i_pid, i_node, i_conn_pid, i_activity, i_starttime, i_event, i_eventtype); 271s else 271s update public.sl_components 271s set 271s co_connection_pid = i_conn_pid, co_activity = i_activity, co_starttime = i_starttime, co_event = i_event, 271s co_eventtype = i_eventtype 271s where co_actor = i_actor 271s and co_starttime < i_starttime; 271s end if; 271s return 1; 271s end $$ 271s language plpgsql; 271s CREATE FUNCTION 271s comment on function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) is 271s 'Store state of a Slony component. Useful for monitoring'; 271s COMMENT 271s create or replace function public.recreate_log_trigger(p_fq_table_name text, 271s p_tab_id oid, p_tab_attkind text) returns integer as $$ 271s begin 271s execute 'drop trigger "_main_logtrigger" on ' || 271s p_fq_table_name ; 271s -- ---- 271s execute 'create trigger "_main_logtrigger"' || 271s ' after insert or update or delete on ' || 271s p_fq_table_name 271s || ' for each row execute procedure public.logTrigger (' || 271s pg_catalog.quote_literal('_main') || ',' || 271s pg_catalog.quote_literal(p_tab_id::text) || ',' || 271s pg_catalog.quote_literal(p_tab_attkind) || ');'; 271s return 0; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s comment on function public.recreate_log_trigger(p_fq_table_name text, 271s p_tab_id oid, p_tab_attkind text) is 271s 'A function that drops and recreates the log trigger on the specified table. 271s It is intended to be used after the primary_key/unique index has changed.'; 271s COMMENT 271s create or replace function public.repair_log_triggers(only_locked boolean) 271s returns integer as $$ 271s declare 271s retval integer; 271s table_row record; 271s begin 271s retval=0; 271s for table_row in 271s select tab_nspname,tab_relname, 271s tab_idxname, tab_id, mode, 271s public.determineAttKindUnique(tab_nspname|| 271s '.'||tab_relname,tab_idxname) as attkind 271s from 271s public.sl_table 271s left join 271s pg_locks on (relation=tab_reloid and pid=pg_backend_pid() 271s and mode='AccessExclusiveLock') 271s ,pg_trigger 271s where tab_reloid=tgrelid and 271s public.determineAttKindUnique(tab_nspname||'.' 
271s ||tab_relname,tab_idxname) 271s !=(public.decode_tgargs(tgargs))[2] 271s and tgname = '_main' 271s || '_logtrigger' 271s LOOP 271s if (only_locked=false) or table_row.mode='AccessExclusiveLock' then 271s perform public.recreate_log_trigger 271s (table_row.tab_nspname||'.'||table_row.tab_relname, 271s table_row.tab_id,table_row.attkind); 271s retval=retval+1; 271s else 271s raise notice '%.% has an invalid configuration on the log trigger. This was not corrected because only_lock is true and the table is not locked.', 271s table_row.tab_nspname,table_row.tab_relname; 271s 271s end if; 271s end loop; 271s return retval; 271s end 271s $$ 271s language plpgsql; 271s CREATE FUNCTION 271s comment on function public.repair_log_triggers(only_locked boolean) 271s is ' 271s repair the log triggers as required. If only_locked is true then only 271s tables that are already exclusively locked by the current transaction are 271s repaired. Otherwise all replicated tables with outdated trigger arguments 271s are recreated.'; 271s COMMENT 271s create or replace function public.unsubscribe_abandoned_sets(p_failed_node int4) returns bigint 271s as $$ 271s declare 271s v_row record; 271s v_seq_id bigint; 271s v_local_node int4; 271s begin 271s 271s select public.getLocalNodeId('_main') into 271s v_local_node; 271s 271s if found then 271s --abandon all subscriptions from this origin. 271s for v_row in select sub_set,sub_receiver from 271s public.sl_subscribe, public.sl_set 271s where sub_set=set_id and set_origin=p_failed_node 271s and sub_receiver=v_local_node 271s loop 271s raise notice 'Slony-I: failover_abandon_set() is abandoning subscription to set % on node % because it is too far ahead', v_row.sub_set, 271s v_local_node; 271s --If this node is a provider for the set 271s --then the receiver needs to be unsubscribed. 271s -- 271s select public.unsubscribeSet(v_row.sub_set, 271s v_local_node,true) 271s into v_seq_id; 271s end loop; 271s end if; 271s 271s return v_seq_id; 271s end 271s $$ language plpgsql; 271s CREATE FUNCTION 271s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS 271s $BODY$ 271s DECLARE 271s c_delim text; 271s BEGIN 271s c_delim = ','; 271s IF (txt_before IS NULL or txt_before='') THEN 271s RETURN txt_new; 271s END IF; 271s RETURN txt_before || c_delim || txt_new; 271s END; 271s $BODY$ 271s LANGUAGE plpgsql; 271s CREATE FUNCTION 271s comment on function public.agg_text_sum(text,text) is 271s 'An accumulator function used by the slony string_agg function to 271s aggregate rows into a string'; 271s COMMENT 271s Dropping cluster 17/regress ... 271s ### End 17 psql ### 272s autopkgtest [16:54:02]: test load-functions: -----------------------] 272s load-functions PASS 272s autopkgtest [16:54:02]: test load-functions: - - - - - - - - - - results - - - - - - - - - - 273s autopkgtest [16:54:03]: @@@@@@@@@@@@@@@@@@@@ summary 273s load-functions PASS 291s nova [W] Using flock in prodstack6-s390x 291s flock: timeout while waiting to get lock 291s Creating nova instance adt-plucky-s390x-slony1-2-20250315-164930-juju-7f2275-prod-proposed-migration-environment-20-1994cbae-3f66-41d1-ae44-d38abdc4bfea from image adt/ubuntu-plucky-s390x-server-20250315.img (UUID 3d3557fa-fd0f-4bba-9b89-8d5964e09f61)... 291s nova [W] Timed out waiting for 000c6004-39a5-4480-a7ac-ede09dbf1564 to get deleted.