0s autopkgtest [16:47:56]: starting date and time: 2025-03-15 16:47:56+0000 0s autopkgtest [16:47:56]: git checkout: 325255d2 Merge branch 'pin-any-arch' into 'ubuntu/production' 0s autopkgtest [16:47:56]: host juju-7f2275-prod-proposed-migration-environment-2; command line: /home/ubuntu/autopkgtest/runner/autopkgtest --output-dir /tmp/autopkgtest-work.1fli4r2e/out --timeout-copy=6000 --setup-commands /home/ubuntu/autopkgtest-cloud/worker-config-production/setup-canonical.sh --apt-pocket=proposed=src:glibc --apt-upgrade slony1-2 --timeout-short=300 --timeout-copy=20000 --timeout-build=20000 --env=ADT_TEST_TRIGGERS=glibc/2.41-1ubuntu2 -- ssh -s /home/ubuntu/autopkgtest/ssh-setup/nova -- --flavor autopkgtest --security-groups autopkgtest-juju-7f2275-prod-proposed-migration-environment-2@bos03-arm64-39.secgroup --name adt-plucky-arm64-slony1-2-20250315-164756-juju-7f2275-prod-proposed-migration-environment-2-529207f4-500e-4f0f-b232-17e8f473a5ca --image adt/ubuntu-plucky-arm64-server --keyname testbed-juju-7f2275-prod-proposed-migration-environment-2 --net-id=net_prod-proposed-migration -e TERM=linux -e ''"'"'http_proxy=http://squid.internal:3128'"'"'' -e ''"'"'https_proxy=http://squid.internal:3128'"'"'' -e ''"'"'no_proxy=127.0.0.1,127.0.1.1,login.ubuntu.com,localhost,localdomain,novalocal,internal,archive.ubuntu.com,ports.ubuntu.com,security.ubuntu.com,ddebs.ubuntu.com,changelogs.ubuntu.com,keyserver.ubuntu.com,launchpadlibrarian.net,launchpadcontent.net,launchpad.net,10.24.0.0/24,keystone.ps5.canonical.com,objectstorage.prodstack5.canonical.com,radosgw.ps5.canonical.com'"'"'' --mirror=http://ftpmaster.internal/ubuntu/ 189s autopkgtest [16:51:05]: testbed dpkg architecture: arm64 189s autopkgtest [16:51:05]: testbed apt version: 2.9.33 189s autopkgtest [16:51:05]: @@@@@@@@@@@@@@@@@@@@ test bed setup 190s autopkgtest [16:51:06]: testbed release detected to be: None 190s autopkgtest [16:51:06]: updating testbed package index (apt update) 191s Get:1 http://ftpmaster.internal/ubuntu plucky-proposed InRelease [126 kB] 191s Hit:2 http://ftpmaster.internal/ubuntu plucky InRelease 191s Hit:3 http://ftpmaster.internal/ubuntu plucky-updates InRelease 191s Hit:4 http://ftpmaster.internal/ubuntu plucky-security InRelease 191s Get:5 http://ftpmaster.internal/ubuntu plucky-proposed/main Sources [99.7 kB] 191s Get:6 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse Sources [15.8 kB] 191s Get:7 http://ftpmaster.internal/ubuntu plucky-proposed/universe Sources [379 kB] 192s Get:8 http://ftpmaster.internal/ubuntu plucky-proposed/main arm64 Packages [111 kB] 192s Get:9 http://ftpmaster.internal/ubuntu plucky-proposed/main arm64 c-n-f Metadata [1856 B] 192s Get:10 http://ftpmaster.internal/ubuntu plucky-proposed/restricted arm64 c-n-f Metadata [116 B] 192s Get:11 http://ftpmaster.internal/ubuntu plucky-proposed/universe arm64 Packages [324 kB] 192s Get:12 http://ftpmaster.internal/ubuntu plucky-proposed/universe arm64 c-n-f Metadata [14.7 kB] 192s Get:13 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse arm64 Packages [4948 B] 192s Get:14 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse arm64 c-n-f Metadata [268 B] 193s Fetched 1078 kB in 2s (663 kB/s) 194s Reading package lists... 194s Reading package lists... 195s Building dependency tree... 195s Reading state information... 195s Calculating upgrade... 195s Calculating upgrade... 196s The following packages will be upgraded: 196s pinentry-curses python3-jinja2 strace 196s 3 upgraded, 0 newly installed, 0 to remove and 0 not upgraded. 196s Need to get 647 kB of archives. 196s After this operation, 11.3 kB of additional disk space will be used. 196s Get:1 http://ftpmaster.internal/ubuntu plucky/main arm64 strace arm64 6.13+ds-1ubuntu1 [499 kB] 197s Get:2 http://ftpmaster.internal/ubuntu plucky/main arm64 pinentry-curses arm64 1.3.1-2ubuntu3 [39.2 kB] 197s Get:3 http://ftpmaster.internal/ubuntu plucky/main arm64 python3-jinja2 all 3.1.5-2ubuntu1 [109 kB] 197s Fetched 647 kB in 1s (560 kB/s) 198s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 117701 files and directories currently installed.) 198s Preparing to unpack .../strace_6.13+ds-1ubuntu1_arm64.deb ... 198s Unpacking strace (6.13+ds-1ubuntu1) over (6.11-0ubuntu1) ... 198s Preparing to unpack .../pinentry-curses_1.3.1-2ubuntu3_arm64.deb ... 198s Unpacking pinentry-curses (1.3.1-2ubuntu3) over (1.3.1-2ubuntu2) ... 198s Preparing to unpack .../python3-jinja2_3.1.5-2ubuntu1_all.deb ... 198s Unpacking python3-jinja2 (3.1.5-2ubuntu1) over (3.1.5-2) ... 198s Setting up pinentry-curses (1.3.1-2ubuntu3) ... 198s Setting up python3-jinja2 (3.1.5-2ubuntu1) ... 199s Setting up strace (6.13+ds-1ubuntu1) ... 199s Processing triggers for man-db (2.13.0-1) ... 199s Reading package lists... 200s Building dependency tree... 200s Reading state information... 200s Solving dependencies... 200s The following packages will be REMOVED: 200s libnsl2* libpython3.12-minimal* libpython3.12-stdlib* libpython3.12t64* 200s libunwind8* linux-headers-6.11.0-8* linux-headers-6.11.0-8-generic* 200s linux-image-6.11.0-8-generic* linux-modules-6.11.0-8-generic* 200s linux-tools-6.11.0-8* linux-tools-6.11.0-8-generic* 201s 0 upgraded, 0 newly installed, 11 to remove and 5 not upgraded. 201s After this operation, 267 MB disk space will be freed. 201s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 117701 files and directories currently installed.) 201s Removing linux-tools-6.11.0-8-generic (6.11.0-8.8) ... 201s Removing linux-tools-6.11.0-8 (6.11.0-8.8) ... 201s Removing libpython3.12t64:arm64 (3.12.9-1) ... 201s Removing libpython3.12-stdlib:arm64 (3.12.9-1) ... 201s Removing libnsl2:arm64 (1.3.0-3build3) ... 201s Removing libpython3.12-minimal:arm64 (3.12.9-1) ... 201s Removing libunwind8:arm64 (1.6.2-3.1) ... 201s Removing linux-headers-6.11.0-8-generic (6.11.0-8.8) ... 202s Removing linux-headers-6.11.0-8 (6.11.0-8.8) ... 203s Removing linux-image-6.11.0-8-generic (6.11.0-8.8) ... 203s I: /boot/vmlinuz.old is now a symlink to vmlinuz-6.14.0-10-generic 203s I: /boot/initrd.img.old is now a symlink to initrd.img-6.14.0-10-generic 203s /etc/kernel/postrm.d/initramfs-tools: 203s update-initramfs: Deleting /boot/initrd.img-6.11.0-8-generic 203s /etc/kernel/postrm.d/zz-flash-kernel: 203s flash-kernel: Kernel 6.11.0-8-generic has been removed. 203s flash-kernel: A higher version (6.14.0-10-generic) is still installed, no reflashing required. 204s /etc/kernel/postrm.d/zz-update-grub: 204s Sourcing file `/etc/default/grub' 204s Sourcing file `/etc/default/grub.d/50-cloudimg-settings.cfg' 204s Generating grub configuration file ... 204s Found linux image: /boot/vmlinuz-6.14.0-10-generic 204s Found initrd image: /boot/initrd.img-6.14.0-10-generic 204s Warning: os-prober will not be executed to detect other bootable partitions. 204s Systems on them will not be added to the GRUB boot configuration. 204s Check GRUB_DISABLE_OS_PROBER documentation entry. 204s Adding boot menu entry for UEFI Firmware Settings ... 204s done 204s Removing linux-modules-6.11.0-8-generic (6.11.0-8.8) ... 204s Processing triggers for libc-bin (2.41-1ubuntu1) ... 205s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81650 files and directories currently installed.) 205s Purging configuration files for linux-image-6.11.0-8-generic (6.11.0-8.8) ... 205s Purging configuration files for libpython3.12-minimal:arm64 (3.12.9-1) ... 205s Purging configuration files for linux-modules-6.11.0-8-generic (6.11.0-8.8) ... 205s autopkgtest [16:51:21]: upgrading testbed (apt dist-upgrade and autopurge) 205s Reading package lists... 205s Building dependency tree... 205s Reading state information... 206s Calculating upgrade...Starting pkgProblemResolver with broken count: 0 206s Starting 2 pkgProblemResolver with broken count: 0 206s Done 207s Entering ResolveByKeep 207s 207s Calculating upgrade... 207s The following packages will be upgraded: 207s libc-bin libc-dev-bin libc6 libc6-dev locales 208s 5 upgraded, 0 newly installed, 0 to remove and 0 not upgraded. 208s Need to get 9530 kB of archives. 208s After this operation, 0 B of additional disk space will be used. 208s Get:1 http://ftpmaster.internal/ubuntu plucky-proposed/main arm64 libc6-dev arm64 2.41-1ubuntu2 [1750 kB] 210s Get:2 http://ftpmaster.internal/ubuntu plucky-proposed/main arm64 libc-dev-bin arm64 2.41-1ubuntu2 [24.0 kB] 210s Get:3 http://ftpmaster.internal/ubuntu plucky-proposed/main arm64 libc6 arm64 2.41-1ubuntu2 [2910 kB] 214s Get:4 http://ftpmaster.internal/ubuntu plucky-proposed/main arm64 libc-bin arm64 2.41-1ubuntu2 [600 kB] 215s Get:5 http://ftpmaster.internal/ubuntu plucky-proposed/main arm64 locales all 2.41-1ubuntu2 [4246 kB] 221s Preconfiguring packages ... 221s Fetched 9530 kB in 13s (712 kB/s) 221s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81647 files and directories currently installed.) 221s Preparing to unpack .../libc6-dev_2.41-1ubuntu2_arm64.deb ... 221s Unpacking libc6-dev:arm64 (2.41-1ubuntu2) over (2.41-1ubuntu1) ... 221s Preparing to unpack .../libc-dev-bin_2.41-1ubuntu2_arm64.deb ... 221s Unpacking libc-dev-bin (2.41-1ubuntu2) over (2.41-1ubuntu1) ... 222s Preparing to unpack .../libc6_2.41-1ubuntu2_arm64.deb ... 222s Unpacking libc6:arm64 (2.41-1ubuntu2) over (2.41-1ubuntu1) ... 222s Setting up libc6:arm64 (2.41-1ubuntu2) ... 222s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81647 files and directories currently installed.) 222s Preparing to unpack .../libc-bin_2.41-1ubuntu2_arm64.deb ... 222s Unpacking libc-bin (2.41-1ubuntu2) over (2.41-1ubuntu1) ... 222s Setting up libc-bin (2.41-1ubuntu2) ... 222s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81647 files and directories currently installed.) 222s Preparing to unpack .../locales_2.41-1ubuntu2_all.deb ... 222s Unpacking locales (2.41-1ubuntu2) over (2.41-1ubuntu1) ... 222s Setting up locales (2.41-1ubuntu2) ... 223s Generating locales (this might take a while)... 225s en_US.UTF-8... done 225s Generation complete. 225s Setting up libc-dev-bin (2.41-1ubuntu2) ... 225s Setting up libc6-dev:arm64 (2.41-1ubuntu2) ... 225s Processing triggers for man-db (2.13.0-1) ... 226s Processing triggers for systemd (257.3-1ubuntu3) ... 227s Reading package lists... 227s Building dependency tree... 227s Reading state information... 227s Starting pkgProblemResolver with broken count: 0 227s Starting 2 pkgProblemResolver with broken count: 0 227s Done 228s Solving dependencies... 228s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded. 228s autopkgtest [16:51:44]: rebooting testbed after setup commands that affected boot 251s autopkgtest [16:52:07]: testbed running kernel: Linux 6.14.0-10-generic #10-Ubuntu SMP PREEMPT_DYNAMIC Wed Mar 12 15:45:31 UTC 2025 254s autopkgtest [16:52:10]: @@@@@@@@@@@@@@@@@@@@ apt-source slony1-2 259s Get:1 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (dsc) [2462 B] 259s Get:2 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (tar) [1465 kB] 259s Get:3 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (diff) [17.3 kB] 259s gpgv: Signature made Thu Sep 19 09:07:19 2024 UTC 259s gpgv: using RSA key 5C48FE6157F49179597087C64C5A6BAB12D2A7AE 259s gpgv: Can't check signature: No public key 259s dpkg-source: warning: cannot verify inline signature for ./slony1-2_2.2.11-6.dsc: no acceptable signature found 260s autopkgtest [16:52:16]: testing package slony1-2 version 2.2.11-6 260s autopkgtest [16:52:16]: build not needed 261s autopkgtest [16:52:17]: test load-functions: preparing testbed 261s Reading package lists... 261s Building dependency tree... 261s Reading state information... 262s Starting pkgProblemResolver with broken count: 0 262s Starting 2 pkgProblemResolver with broken count: 0 262s Done 263s The following NEW packages will be installed: 263s libio-pty-perl libipc-run-perl libjson-perl libllvm20 libpq5 libxslt1.1 263s postgresql-17 postgresql-17-slony1-2 postgresql-client-17 263s postgresql-client-common postgresql-common postgresql-common-dev 263s slony1-2-bin slony1-2-doc ssl-cert 263s 0 upgraded, 15 newly installed, 0 to remove and 0 not upgraded. 263s Need to get 47.4 MB of archives. 263s After this operation, 200 MB of additional disk space will be used. 263s Get:1 http://ftpmaster.internal/ubuntu plucky/main arm64 libjson-perl all 4.10000-1 [81.9 kB] 263s Get:2 http://ftpmaster.internal/ubuntu plucky/main arm64 postgresql-client-common all 274 [47.6 kB] 263s Get:3 http://ftpmaster.internal/ubuntu plucky/main arm64 libio-pty-perl arm64 1:1.20-1build3 [31.3 kB] 263s Get:4 http://ftpmaster.internal/ubuntu plucky/main arm64 libipc-run-perl all 20231003.0-2 [91.5 kB] 263s Get:5 http://ftpmaster.internal/ubuntu plucky/main arm64 postgresql-common-dev all 274 [73.0 kB] 263s Get:6 http://ftpmaster.internal/ubuntu plucky/main arm64 ssl-cert all 1.1.3ubuntu1 [18.7 kB] 263s Get:7 http://ftpmaster.internal/ubuntu plucky/main arm64 postgresql-common all 274 [101 kB] 263s Get:8 http://ftpmaster.internal/ubuntu plucky/main arm64 libllvm20 arm64 1:20.1.0~+rc2-1~exp2ubuntu0.4 [29.1 MB] 295s Get:9 http://ftpmaster.internal/ubuntu plucky/main arm64 libpq5 arm64 17.4-1 [142 kB] 295s Get:10 http://ftpmaster.internal/ubuntu plucky/main arm64 libxslt1.1 arm64 1.1.39-0exp1ubuntu2 [168 kB] 295s Get:11 http://ftpmaster.internal/ubuntu plucky/main arm64 postgresql-client-17 arm64 17.4-1 [1342 kB] 297s Get:12 http://ftpmaster.internal/ubuntu plucky/main arm64 postgresql-17 arm64 17.4-1 [15.6 MB] 313s Get:13 http://ftpmaster.internal/ubuntu plucky/universe arm64 postgresql-17-slony1-2 arm64 2.2.11-6 [20.0 kB] 313s Get:14 http://ftpmaster.internal/ubuntu plucky/universe arm64 slony1-2-bin arm64 2.2.11-6 [221 kB] 313s Get:15 http://ftpmaster.internal/ubuntu plucky/universe arm64 slony1-2-doc all 2.2.11-6 [327 kB] 314s Preconfiguring packages ... 314s Fetched 47.4 MB in 51s (930 kB/s) 314s Selecting previously unselected package libjson-perl. 314s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81647 files and directories currently installed.) 314s Preparing to unpack .../00-libjson-perl_4.10000-1_all.deb ... 314s Unpacking libjson-perl (4.10000-1) ... 314s Selecting previously unselected package postgresql-client-common. 314s Preparing to unpack .../01-postgresql-client-common_274_all.deb ... 314s Unpacking postgresql-client-common (274) ... 314s Selecting previously unselected package libio-pty-perl. 314s Preparing to unpack .../02-libio-pty-perl_1%3a1.20-1build3_arm64.deb ... 314s Unpacking libio-pty-perl (1:1.20-1build3) ... 314s Selecting previously unselected package libipc-run-perl. 314s Preparing to unpack .../03-libipc-run-perl_20231003.0-2_all.deb ... 314s Unpacking libipc-run-perl (20231003.0-2) ... 314s Selecting previously unselected package postgresql-common-dev. 314s Preparing to unpack .../04-postgresql-common-dev_274_all.deb ... 314s Unpacking postgresql-common-dev (274) ... 314s Selecting previously unselected package ssl-cert. 314s Preparing to unpack .../05-ssl-cert_1.1.3ubuntu1_all.deb ... 314s Unpacking ssl-cert (1.1.3ubuntu1) ... 314s Selecting previously unselected package postgresql-common. 314s Preparing to unpack .../06-postgresql-common_274_all.deb ... 314s Adding 'diversion of /usr/bin/pg_config to /usr/bin/pg_config.libpq-dev by postgresql-common' 314s Unpacking postgresql-common (274) ... 314s Selecting previously unselected package libllvm20:arm64. 314s Preparing to unpack .../07-libllvm20_1%3a20.1.0~+rc2-1~exp2ubuntu0.4_arm64.deb ... 314s Unpacking libllvm20:arm64 (1:20.1.0~+rc2-1~exp2ubuntu0.4) ... 315s Selecting previously unselected package libpq5:arm64. 315s Preparing to unpack .../08-libpq5_17.4-1_arm64.deb ... 315s Unpacking libpq5:arm64 (17.4-1) ... 315s Selecting previously unselected package libxslt1.1:arm64. 315s Preparing to unpack .../09-libxslt1.1_1.1.39-0exp1ubuntu2_arm64.deb ... 315s Unpacking libxslt1.1:arm64 (1.1.39-0exp1ubuntu2) ... 315s Selecting previously unselected package postgresql-client-17. 315s Preparing to unpack .../10-postgresql-client-17_17.4-1_arm64.deb ... 315s Unpacking postgresql-client-17 (17.4-1) ... 315s Selecting previously unselected package postgresql-17. 315s Preparing to unpack .../11-postgresql-17_17.4-1_arm64.deb ... 315s Unpacking postgresql-17 (17.4-1) ... 316s Selecting previously unselected package postgresql-17-slony1-2. 316s Preparing to unpack .../12-postgresql-17-slony1-2_2.2.11-6_arm64.deb ... 316s Unpacking postgresql-17-slony1-2 (2.2.11-6) ... 316s Selecting previously unselected package slony1-2-bin. 316s Preparing to unpack .../13-slony1-2-bin_2.2.11-6_arm64.deb ... 316s Unpacking slony1-2-bin (2.2.11-6) ... 316s Selecting previously unselected package slony1-2-doc. 316s Preparing to unpack .../14-slony1-2-doc_2.2.11-6_all.deb ... 316s Unpacking slony1-2-doc (2.2.11-6) ... 316s Setting up postgresql-client-common (274) ... 316s Setting up libio-pty-perl (1:1.20-1build3) ... 316s Setting up libpq5:arm64 (17.4-1) ... 316s Setting up ssl-cert (1.1.3ubuntu1) ... 316s Created symlink '/etc/systemd/system/multi-user.target.wants/ssl-cert.service' → '/usr/lib/systemd/system/ssl-cert.service'. 317s Setting up libllvm20:arm64 (1:20.1.0~+rc2-1~exp2ubuntu0.4) ... 317s Setting up libipc-run-perl (20231003.0-2) ... 317s Setting up libjson-perl (4.10000-1) ... 317s Setting up libxslt1.1:arm64 (1.1.39-0exp1ubuntu2) ... 317s Setting up slony1-2-doc (2.2.11-6) ... 317s Setting up postgresql-common-dev (274) ... 317s Setting up postgresql-client-17 (17.4-1) ... 317s update-alternatives: using /usr/share/postgresql/17/man/man1/psql.1.gz to provide /usr/share/man/man1/psql.1.gz (psql.1.gz) in auto mode 317s Setting up postgresql-common (274) ... 317s Creating config file /etc/postgresql-common/createcluster.conf with new version 318s Building PostgreSQL dictionaries from installed myspell/hunspell packages... 318s Removing obsolete dictionary files: 318s Created symlink '/etc/systemd/system/multi-user.target.wants/postgresql.service' → '/usr/lib/systemd/system/postgresql.service'. 319s Setting up slony1-2-bin (2.2.11-6) ... 319s Setting up postgresql-17 (17.4-1) ... 320s Creating new PostgreSQL cluster 17/main ... 320s /usr/lib/postgresql/17/bin/initdb -D /var/lib/postgresql/17/main --auth-local peer --auth-host scram-sha-256 --no-instructions 320s The files belonging to this database system will be owned by user "postgres". 320s This user must also own the server process. 320s 320s The database cluster will be initialized with locale "C.UTF-8". 320s The default database encoding has accordingly been set to "UTF8". 320s The default text search configuration will be set to "english". 320s 320s Data page checksums are disabled. 320s 320s fixing permissions on existing directory /var/lib/postgresql/17/main ... ok 320s creating subdirectories ... ok 320s selecting dynamic shared memory implementation ... posix 320s selecting default "max_connections" ... 100 320s selecting default "shared_buffers" ... 128MB 320s selecting default time zone ... Etc/UTC 320s creating configuration files ... ok 320s running bootstrap script ... ok 320s performing post-bootstrap initialization ... ok 320s syncing data to disk ... ok 323s Setting up postgresql-17-slony1-2 (2.2.11-6) ... 323s Processing triggers for man-db (2.13.0-1) ... 324s Processing triggers for libc-bin (2.41-1ubuntu2) ... 326s autopkgtest [16:53:22]: test load-functions: [----------------------- 327s ### PostgreSQL 17 psql ### 327s Creating new PostgreSQL cluster 17/regress ... 330s create table public.sl_node ( 330s no_id int4, 330s no_active bool, 330s no_comment text, 330s no_failed bool, 330s CONSTRAINT "sl_node-pkey" 330s PRIMARY KEY (no_id) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_node is 'Holds the list of nodes associated with this namespace.'; 330s COMMENT 330s comment on column public.sl_node.no_id is 'The unique ID number for the node'; 330s COMMENT 330s comment on column public.sl_node.no_active is 'Is the node active in replication yet?'; 330s COMMENT 330s comment on column public.sl_node.no_comment is 'A human-oriented description of the node'; 330s COMMENT 330s create table public.sl_nodelock ( 330s nl_nodeid int4, 330s nl_conncnt serial, 330s nl_backendpid int4, 330s CONSTRAINT "sl_nodelock-pkey" 330s PRIMARY KEY (nl_nodeid, nl_conncnt) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_nodelock is 'Used to prevent multiple slon instances and to identify the backends to kill in terminateNodeConnections().'; 330s COMMENT 330s comment on column public.sl_nodelock.nl_nodeid is 'Clients node_id'; 330s COMMENT 330s comment on column public.sl_nodelock.nl_conncnt is 'Clients connection number'; 330s COMMENT 330s comment on column public.sl_nodelock.nl_backendpid is 'PID of database backend owning this lock'; 330s COMMENT 330s create table public.sl_set ( 330s set_id int4, 330s set_origin int4, 330s set_locked bigint, 330s set_comment text, 330s CONSTRAINT "sl_set-pkey" 330s PRIMARY KEY (set_id), 330s CONSTRAINT "set_origin-no_id-ref" 330s FOREIGN KEY (set_origin) 330s REFERENCES public.sl_node (no_id) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_set is 'Holds definitions of replication sets.'; 330s COMMENT 330s comment on column public.sl_set.set_id is 'A unique ID number for the set.'; 330s COMMENT 330s comment on column public.sl_set.set_origin is 330s 'The ID number of the source node for the replication set.'; 330s COMMENT 330s comment on column public.sl_set.set_locked is 'Transaction ID where the set was locked.'; 330s COMMENT 330s comment on column public.sl_set.set_comment is 'A human-oriented description of the set.'; 330s COMMENT 330s create table public.sl_setsync ( 330s ssy_setid int4, 330s ssy_origin int4, 330s ssy_seqno int8, 330s ssy_snapshot "pg_catalog".txid_snapshot, 330s ssy_action_list text, 330s CONSTRAINT "sl_setsync-pkey" 330s PRIMARY KEY (ssy_setid), 330s CONSTRAINT "ssy_setid-set_id-ref" 330s FOREIGN KEY (ssy_setid) 330s REFERENCES public.sl_set (set_id), 330s CONSTRAINT "ssy_origin-no_id-ref" 330s FOREIGN KEY (ssy_origin) 330s REFERENCES public.sl_node (no_id) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_setsync is 'SYNC information'; 330s COMMENT 330s comment on column public.sl_setsync.ssy_setid is 'ID number of the replication set'; 330s COMMENT 330s comment on column public.sl_setsync.ssy_origin is 'ID number of the node'; 330s COMMENT 330s comment on column public.sl_setsync.ssy_seqno is 'Slony-I sequence number'; 330s COMMENT 330s comment on column public.sl_setsync.ssy_snapshot is 'TXID in provider system seen by the event'; 330s COMMENT 330s comment on column public.sl_setsync.ssy_action_list is 'action list used during the subscription process. At the time a subscriber copies over data from the origin, it sees all tables in a state somewhere between two SYNC events. Therefore this list must contains all log_actionseqs that are visible at that time, whose operations have therefore already been included in the data copied at the time the initial data copy is done. Those actions may therefore be filtered out of the first SYNC done after subscribing.'; 330s COMMENT 330s create table public.sl_table ( 330s tab_id int4, 330s tab_reloid oid UNIQUE NOT NULL, 330s tab_relname name NOT NULL, 330s tab_nspname name NOT NULL, 330s tab_set int4, 330s tab_idxname name NOT NULL, 330s tab_altered boolean NOT NULL, 330s tab_comment text, 330s CONSTRAINT "sl_table-pkey" 330s PRIMARY KEY (tab_id), 330s CONSTRAINT "tab_set-set_id-ref" 330s FOREIGN KEY (tab_set) 330s REFERENCES public.sl_set (set_id) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_table is 'Holds information about the tables being replicated.'; 330s COMMENT 330s comment on column public.sl_table.tab_id is 'Unique key for Slony-I to use to identify the table'; 330s COMMENT 330s comment on column public.sl_table.tab_reloid is 'The OID of the table in pg_catalog.pg_class.oid'; 330s COMMENT 330s comment on column public.sl_table.tab_relname is 'The name of the table in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 330s COMMENT 330s comment on column public.sl_table.tab_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 330s COMMENT 330s comment on column public.sl_table.tab_set is 'ID of the replication set the table is in'; 330s COMMENT 330s comment on column public.sl_table.tab_idxname is 'The name of the primary index of the table'; 330s COMMENT 330s comment on column public.sl_table.tab_altered is 'Has the table been modified for replication?'; 330s COMMENT 330s comment on column public.sl_table.tab_comment is 'Human-oriented description of the table'; 330s COMMENT 330s create table public.sl_sequence ( 330s seq_id int4, 330s seq_reloid oid UNIQUE NOT NULL, 330s seq_relname name NOT NULL, 330s seq_nspname name NOT NULL, 330s seq_set int4, 330s seq_comment text, 330s CONSTRAINT "sl_sequence-pkey" 330s PRIMARY KEY (seq_id), 330s CONSTRAINT "seq_set-set_id-ref" 330s FOREIGN KEY (seq_set) 330s REFERENCES public.sl_set (set_id) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_sequence is 'Similar to sl_table, each entry identifies a sequence being replicated.'; 330s COMMENT 330s comment on column public.sl_sequence.seq_id is 'An internally-used ID for Slony-I to use in its sequencing of updates'; 330s COMMENT 330s comment on column public.sl_sequence.seq_reloid is 'The OID of the sequence object'; 330s COMMENT 330s comment on column public.sl_sequence.seq_relname is 'The name of the sequence in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 330s COMMENT 330s comment on column public.sl_sequence.seq_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 330s COMMENT 330s comment on column public.sl_sequence.seq_set is 'Indicates which replication set the object is in'; 330s COMMENT 330s comment on column public.sl_sequence.seq_comment is 'A human-oriented comment'; 330s COMMENT 330s create table public.sl_path ( 330s pa_server int4, 330s pa_client int4, 330s pa_conninfo text NOT NULL, 330s pa_connretry int4, 330s CONSTRAINT "sl_path-pkey" 330s PRIMARY KEY (pa_server, pa_client), 330s CONSTRAINT "pa_server-no_id-ref" 330s FOREIGN KEY (pa_server) 330s REFERENCES public.sl_node (no_id), 330s CONSTRAINT "pa_client-no_id-ref" 330s FOREIGN KEY (pa_client) 330s REFERENCES public.sl_node (no_id) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_path is 'Holds connection information for the paths between nodes, and the synchronisation delay'; 330s COMMENT 330s comment on column public.sl_path.pa_server is 'The Node ID # (from sl_node.no_id) of the data source'; 330s COMMENT 330s comment on column public.sl_path.pa_client is 'The Node ID # (from sl_node.no_id) of the data target'; 330s COMMENT 330s comment on column public.sl_path.pa_conninfo is 'The PostgreSQL connection string used to connect to the source node.'; 330s COMMENT 330s comment on column public.sl_path.pa_connretry is 'The synchronisation delay, in seconds'; 330s COMMENT 330s create table public.sl_listen ( 330s li_origin int4, 330s li_provider int4, 330s li_receiver int4, 330s CONSTRAINT "sl_listen-pkey" 330s PRIMARY KEY (li_origin, li_provider, li_receiver), 330s CONSTRAINT "li_origin-no_id-ref" 330s FOREIGN KEY (li_origin) 330s REFERENCES public.sl_node (no_id), 330s CONSTRAINT "sl_listen-sl_path-ref" 330s FOREIGN KEY (li_provider, li_receiver) 330s REFERENCES public.sl_path (pa_server, pa_client) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_listen is 'Indicates how nodes listen to events from other nodes in the Slony-I network.'; 330s COMMENT 330s comment on column public.sl_listen.li_origin is 'The ID # (from sl_node.no_id) of the node this listener is operating on'; 330s COMMENT 330s comment on column public.sl_listen.li_provider is 'The ID # (from sl_node.no_id) of the source node for this listening event'; 330s COMMENT 330s comment on column public.sl_listen.li_receiver is 'The ID # (from sl_node.no_id) of the target node for this listening event'; 330s COMMENT 330s create table public.sl_subscribe ( 330s sub_set int4, 330s sub_provider int4, 330s sub_receiver int4, 330s sub_forward bool, 330s sub_active bool, 330s CONSTRAINT "sl_subscribe-pkey" 330s PRIMARY KEY (sub_receiver, sub_set), 330s CONSTRAINT "sl_subscribe-sl_path-ref" 330s FOREIGN KEY (sub_provider, sub_receiver) 330s REFERENCES public.sl_path (pa_server, pa_client), 330s CONSTRAINT "sub_set-set_id-ref" 330s FOREIGN KEY (sub_set) 330s REFERENCES public.sl_set (set_id) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_subscribe is 'Holds a list of subscriptions on sets'; 330s COMMENT 330s comment on column public.sl_subscribe.sub_set is 'ID # (from sl_set) of the set being subscribed to'; 330s COMMENT 330s comment on column public.sl_subscribe.sub_provider is 'ID# (from sl_node) of the node providing data'; 330s COMMENT 330s comment on column public.sl_subscribe.sub_receiver is 'ID# (from sl_node) of the node receiving data from the provider'; 330s COMMENT 330s comment on column public.sl_subscribe.sub_forward is 'Does this provider keep data in sl_log_1/sl_log_2 to allow it to be a provider for other nodes?'; 330s COMMENT 330s comment on column public.sl_subscribe.sub_active is 'Has this subscription been activated? This is not set on the subscriber until AFTER the subscriber has received COPY data from the provider'; 330s COMMENT 330s create table public.sl_event ( 330s ev_origin int4, 330s ev_seqno int8, 330s ev_timestamp timestamptz, 330s ev_snapshot "pg_catalog".txid_snapshot, 330s ev_type text, 330s ev_data1 text, 330s ev_data2 text, 330s ev_data3 text, 330s ev_data4 text, 330s ev_data5 text, 330s ev_data6 text, 330s ev_data7 text, 330s ev_data8 text, 330s CONSTRAINT "sl_event-pkey" 330s PRIMARY KEY (ev_origin, ev_seqno) 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_event is 'Holds information about replication events. After a period of time, Slony removes old confirmed events from both this table and the sl_confirm table.'; 330s COMMENT 330s comment on column public.sl_event.ev_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 330s COMMENT 330s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 330s COMMENT 330s comment on column public.sl_event.ev_timestamp is 'When this event record was created'; 330s COMMENT 330s comment on column public.sl_event.ev_snapshot is 'TXID snapshot on provider node for this event'; 330s COMMENT 330s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 330s COMMENT 330s comment on column public.sl_event.ev_type is 'The type of event this record is for. 330s SYNC = Synchronise 330s STORE_NODE = 330s ENABLE_NODE = 330s DROP_NODE = 330s STORE_PATH = 330s DROP_PATH = 330s STORE_LISTEN = 330s DROP_LISTEN = 330s STORE_SET = 330s DROP_SET = 330s MERGE_SET = 330s SET_ADD_TABLE = 330s SET_ADD_SEQUENCE = 330s STORE_TRIGGER = 330s DROP_TRIGGER = 330s MOVE_SET = 330s ACCEPT_SET = 330s SET_DROP_TABLE = 330s SET_DROP_SEQUENCE = 330s SET_MOVE_TABLE = 330s SET_MOVE_SEQUENCE = 330s FAILOVER_SET = 330s SUBSCRIBE_SET = 330s ENABLE_SUBSCRIPTION = 330s UNSUBSCRIBE_SET = 330s DDL_SCRIPT = 330s ADJUST_SEQ = 330s RESET_CONFIG = 330s '; 330s COMMENT 330s comment on column public.sl_event.ev_data1 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s comment on column public.sl_event.ev_data2 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s comment on column public.sl_event.ev_data3 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s comment on column public.sl_event.ev_data4 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s comment on column public.sl_event.ev_data5 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s comment on column public.sl_event.ev_data6 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s comment on column public.sl_event.ev_data7 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s comment on column public.sl_event.ev_data8 is 'Data field containing an argument needed to process the event'; 330s COMMENT 330s create table public.sl_confirm ( 330s con_origin int4, 330s con_received int4, 330s con_seqno int8, 330s con_timestamp timestamptz DEFAULT timeofday()::timestamptz 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_confirm is 'Holds confirmation of replication events. After a period of time, Slony removes old confirmed events from both this table and the sl_event table.'; 330s COMMENT 330s comment on column public.sl_confirm.con_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 330s COMMENT 330s comment on column public.sl_confirm.con_seqno is 'The ID # for the event'; 330s COMMENT 330s comment on column public.sl_confirm.con_timestamp is 'When this event was confirmed'; 330s COMMENT 330s create index sl_confirm_idx1 on public.sl_confirm 330s (con_origin, con_received, con_seqno); 330s CREATE INDEX 330s create index sl_confirm_idx2 on public.sl_confirm 330s (con_received, con_seqno); 330s CREATE INDEX 330s create table public.sl_seqlog ( 330s seql_seqid int4, 330s seql_origin int4, 330s seql_ev_seqno int8, 330s seql_last_value int8 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_seqlog is 'Log of Sequence updates'; 330s COMMENT 330s comment on column public.sl_seqlog.seql_seqid is 'Sequence ID'; 330s COMMENT 330s comment on column public.sl_seqlog.seql_origin is 'Publisher node at which the sequence originates'; 330s COMMENT 330s comment on column public.sl_seqlog.seql_ev_seqno is 'Slony-I Event with which this sequence update is associated'; 330s COMMENT 330s comment on column public.sl_seqlog.seql_last_value is 'Last value published for this sequence'; 330s COMMENT 330s create index sl_seqlog_idx on public.sl_seqlog 330s (seql_origin, seql_ev_seqno, seql_seqid); 330s CREATE INDEX 330s create function public.sequenceLastValue(p_seqname text) returns int8 330s as $$ 330s declare 330s v_seq_row record; 330s begin 330s for v_seq_row in execute 'select last_value from ' || public.slon_quote_input(p_seqname) 330s loop 330s return v_seq_row.last_value; 330s end loop; 330s 330s -- not reached 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.sequenceLastValue(p_seqname text) is 330s 'sequenceLastValue(p_seqname) 330s 330s Utility function used in sl_seqlastvalue view to compactly get the 330s last value from the requested sequence.'; 330s COMMENT 330s create table public.sl_log_1 ( 330s log_origin int4, 330s log_txid bigint, 330s log_tableid int4, 330s log_actionseq int8, 330s log_tablenspname text, 330s log_tablerelname text, 330s log_cmdtype "char", 330s log_cmdupdncols int4, 330s log_cmdargs text[] 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s create index sl_log_1_idx1 on public.sl_log_1 330s (log_origin, log_txid, log_actionseq); 330s CREATE INDEX 330s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 330s COMMENT 330s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 330s COMMENT 330s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 330s COMMENT 330s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 330s COMMENT 330s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 330s COMMENT 330s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 330s COMMENT 330s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 330s COMMENT 330s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 330s COMMENT 330s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 330s COMMENT 330s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 330s COMMENT 330s create table public.sl_log_2 ( 330s log_origin int4, 330s log_txid bigint, 330s log_tableid int4, 330s log_actionseq int8, 330s log_tablenspname text, 330s log_tablerelname text, 330s log_cmdtype "char", 330s log_cmdupdncols int4, 330s log_cmdargs text[] 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s create index sl_log_2_idx1 on public.sl_log_2 330s (log_origin, log_txid, log_actionseq); 330s CREATE INDEX 330s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 330s COMMENT 330s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 330s COMMENT 330s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 330s COMMENT 330s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 330s COMMENT 330s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 330s COMMENT 330s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 330s COMMENT 330s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 330s COMMENT 330s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 330s COMMENT 330s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 330s COMMENT 330s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 330s COMMENT 330s create table public.sl_log_script ( 330s log_origin int4, 330s log_txid bigint, 330s log_actionseq int8, 330s log_cmdtype "char", 330s log_cmdargs text[] 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s create index sl_log_script_idx1 on public.sl_log_script 330s (log_origin, log_txid, log_actionseq); 330s CREATE INDEX 330s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 330s COMMENT 330s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 330s COMMENT 330s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 330s COMMENT 330s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 330s COMMENT 330s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete'; 330s COMMENT 330s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 330s COMMENT 330s create table public.sl_registry ( 330s reg_key text primary key, 330s reg_int4 int4, 330s reg_text text, 330s reg_timestamp timestamptz 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s comment on table public.sl_registry is 'Stores miscellaneous runtime data'; 330s COMMENT 330s comment on column public.sl_registry.reg_key is 'Unique key of the runtime option'; 330s COMMENT 330s comment on column public.sl_registry.reg_int4 is 'Option value if type int4'; 330s COMMENT 330s comment on column public.sl_registry.reg_text is 'Option value if type text'; 330s COMMENT 330s comment on column public.sl_registry.reg_timestamp is 'Option value if type timestamp'; 330s COMMENT 330s create table public.sl_apply_stats ( 330s as_origin int4, 330s as_num_insert int8, 330s as_num_update int8, 330s as_num_delete int8, 330s as_num_truncate int8, 330s as_num_script int8, 330s as_num_total int8, 330s as_duration interval, 330s as_apply_first timestamptz, 330s as_apply_last timestamptz, 330s as_cache_prepare int8, 330s as_cache_hit int8, 330s as_cache_evict int8, 330s as_cache_prepare_max int8 330s ) WITHOUT OIDS; 330s CREATE TABLE 330s create index sl_apply_stats_idx1 on public.sl_apply_stats 330s (as_origin); 330s CREATE INDEX 330s comment on table public.sl_apply_stats is 'Local SYNC apply statistics (running totals)'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_origin is 'Origin of the SYNCs'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_num_insert is 'Number of INSERT operations performed'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_num_update is 'Number of UPDATE operations performed'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_num_delete is 'Number of DELETE operations performed'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_num_truncate is 'Number of TRUNCATE operations performed'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_num_script is 'Number of DDL operations performed'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_num_total is 'Total number of operations'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_duration is 'Processing time'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_apply_first is 'Timestamp of first recorded SYNC'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_apply_last is 'Timestamp of most recent recorded SYNC'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_cache_evict is 'Number of apply query cache evict operations'; 330s COMMENT 330s comment on column public.sl_apply_stats.as_cache_prepare_max is 'Maximum number of apply queries prepared in one SYNC group'; 330s COMMENT 330s create view public.sl_seqlastvalue as 330s select SQ.seq_id, SQ.seq_set, SQ.seq_reloid, 330s S.set_origin as seq_origin, 330s public.sequenceLastValue( 330s "pg_catalog".quote_ident(PGN.nspname) || '.' || 330s "pg_catalog".quote_ident(PGC.relname)) as seq_last_value 330s from public.sl_sequence SQ, public.sl_set S, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where S.set_id = SQ.seq_set 330s and PGC.oid = SQ.seq_reloid and PGN.oid = PGC.relnamespace; 330s CREATE VIEW 330s create view public.sl_failover_targets as 330s select set_id, 330s set_origin as set_origin, 330s sub1.sub_receiver as backup_id 330s FROM 330s public.sl_subscribe sub1 330s ,public.sl_set set1 330s where 330s sub1.sub_set=set_id 330s and sub1.sub_forward=true 330s --exclude candidates where the set_origin 330s --has a path a node but the failover 330s --candidate has no path to that node 330s and sub1.sub_receiver not in 330s (select p1.pa_client from 330s public.sl_path p1 330s left outer join public.sl_path p2 on 330s (p2.pa_client=p1.pa_client 330s and p2.pa_server=sub1.sub_receiver) 330s where p2.pa_client is null 330s and p1.pa_server=set_origin 330s and p1.pa_client<>sub1.sub_receiver 330s ) 330s and sub1.sub_provider=set_origin 330s --exclude any subscribers that are not 330s --direct subscribers of all sets on the 330s --origin 330s and sub1.sub_receiver not in 330s (select direct_recv.sub_receiver 330s from 330s 330s (--all direct receivers of the first set 330s select subs2.sub_receiver 330s from public.sl_subscribe subs2 330s where subs2.sub_provider=set1.set_origin 330s and subs2.sub_set=set1.set_id) as 330s direct_recv 330s inner join 330s (--all other sets from the origin 330s select set_id from public.sl_set set2 330s where set2.set_origin=set1.set_origin 330s and set2.set_id<>sub1.sub_set) 330s as othersets on(true) 330s left outer join public.sl_subscribe subs3 330s on(subs3.sub_set=othersets.set_id 330s and subs3.sub_forward=true 330s and subs3.sub_provider=set1.set_origin 330s and direct_recv.sub_receiver=subs3.sub_receiver) 330s where subs3.sub_receiver is null 330s ); 330s CREATE VIEW 330s create sequence public.sl_local_node_id 330s MINVALUE -1; 330s CREATE SEQUENCE 330s SELECT setval('public.sl_local_node_id', -1); 330s setval 330s -------- 330s -1 330s (1 row) 330s 330s comment on sequence public.sl_local_node_id is 'The local node ID is initialized to -1, meaning that this node is not initialized yet.'; 330s COMMENT 330s create sequence public.sl_event_seq; 330s CREATE SEQUENCE 330s comment on sequence public.sl_event_seq is 'The sequence for numbering events originating from this node.'; 330s COMMENT 330s select setval('public.sl_event_seq', 5000000000); 330s setval 330s ------------ 330s 5000000000 330s (1 row) 330s 330s create sequence public.sl_action_seq; 330s CREATE SEQUENCE 330s comment on sequence public.sl_action_seq is 'The sequence to number statements in the transaction logs, so that the replication engines can figure out the "agreeable" order of statements.'; 330s COMMENT 330s create sequence public.sl_log_status 330s MINVALUE 0 MAXVALUE 3; 330s CREATE SEQUENCE 330s SELECT setval('public.sl_log_status', 0); 330s setval 330s -------- 330s 0 330s (1 row) 330s 330s comment on sequence public.sl_log_status is ' 330s Bit 0x01 determines the currently active log table 330s Bit 0x02 tells if the engine needs to read both logs 330s after switching until the old log is clean and truncated. 330s 330s Possible values: 330s 0 sl_log_1 active, sl_log_2 clean 330s 1 sl_log_2 active, sl_log_1 clean 330s 2 sl_log_1 active, sl_log_2 unknown - cleanup 330s 3 sl_log_2 active, sl_log_1 unknown - cleanup 330s 330s This is not yet in use. 330s '; 330s COMMENT 330s create table public.sl_config_lock ( 330s dummy integer 330s ); 330s CREATE TABLE 330s comment on table public.sl_config_lock is 'This table exists solely to prevent overlapping execution of configuration change procedures and the resulting possible deadlocks. 330s '; 330s COMMENT 330s comment on column public.sl_config_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 330s COMMENT 330s create table public.sl_event_lock ( 330s dummy integer 330s ); 330s CREATE TABLE 330s comment on table public.sl_event_lock is 'This table exists solely to prevent multiple connections from concurrently creating new events and perhaps getting them out of order.'; 330s COMMENT 330s comment on column public.sl_event_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 330s COMMENT 330s create table public.sl_archive_counter ( 330s ac_num bigint, 330s ac_timestamp timestamptz 330s ) without oids; 330s CREATE TABLE 330s comment on table public.sl_archive_counter is 'Table used to generate the log shipping archive number. 330s '; 330s COMMENT 330s comment on column public.sl_archive_counter.ac_num is 'Counter of SYNC ID used in log shipping as the archive number'; 330s COMMENT 330s comment on column public.sl_archive_counter.ac_timestamp is 'Time at which the archive log was generated on the subscriber'; 330s COMMENT 330s insert into public.sl_archive_counter (ac_num, ac_timestamp) 330s values (0, 'epoch'::timestamptz); 330s INSERT 0 1 330s create table public.sl_components ( 330s co_actor text not null primary key, 330s co_pid integer not null, 330s co_node integer not null, 330s co_connection_pid integer not null, 330s co_activity text, 330s co_starttime timestamptz not null, 330s co_event bigint, 330s co_eventtype text 330s ) without oids; 330s CREATE TABLE 330s comment on table public.sl_components is 'Table used to monitor what various slon/slonik components are doing'; 330s COMMENT 330s comment on column public.sl_components.co_actor is 'which component am I?'; 330s COMMENT 330s comment on column public.sl_components.co_pid is 'my process/thread PID on node where slon runs'; 330s COMMENT 330s comment on column public.sl_components.co_node is 'which node am I servicing?'; 330s COMMENT 330s comment on column public.sl_components.co_connection_pid is 'PID of database connection being used on database server'; 330s COMMENT 330s comment on column public.sl_components.co_activity is 'activity that I am up to'; 330s COMMENT 330s comment on column public.sl_components.co_starttime is 'when did my activity begin? (timestamp reported as per slon process on server running slon)'; 330s COMMENT 330s comment on column public.sl_components.co_eventtype is 'what kind of event am I processing? (commonly n/a for event loop main threads)'; 330s COMMENT 330s comment on column public.sl_components.co_event is 'which event have I started processing?'; 330s COMMENT 330s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS 330s $BODY$ 330s DECLARE 330s c_delim text; 330s BEGIN 330s c_delim = ','; 330s IF (txt_before IS NULL or txt_before='') THEN 330s RETURN txt_new; 330s END IF; 330s RETURN txt_before || c_delim || txt_new; 330s END; 330s $BODY$ 330s LANGUAGE plpgsql; 330s CREATE FUNCTION 330s comment on function public.agg_text_sum(text,text) is 330s 'An accumulator function used by the slony string_agg function to 330s aggregate rows into a string'; 330s COMMENT 330s CREATE AGGREGATE public.string_agg(text) ( 330s SFUNC=public.agg_text_sum, 330s STYPE=text, 330s INITCOND='' 330s ); 330s CREATE AGGREGATE 330s grant usage on schema public to public; 330s GRANT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s NOTICE: checked validity of cluster main namespace - OK! 330s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) 330s returns bigint 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 330s language C 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) is 330s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 330s 330s Create an sl_event entry'; 330s COMMENT 330s create or replace function public.denyAccess () 330s returns trigger 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__denyAccess' 330s language C 330s security definer; 330s CREATE FUNCTION 330s comment on function public.denyAccess () is 330s 'Trigger function to prevent modifications to a table on a subscriber'; 330s COMMENT 330s grant execute on function public.denyAccess () to public; 330s GRANT 330s create or replace function public.lockedSet () 330s returns trigger 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__lockedSet' 330s language C; 330s CREATE FUNCTION 330s comment on function public.lockedSet () is 330s 'Trigger function to prevent modifications to a table before and after a moveSet()'; 330s COMMENT 330s create or replace function public.getLocalNodeId (p_cluster name) returns int4 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getLocalNodeId' 330s language C 330s security definer; 330s CREATE FUNCTION 330s grant execute on function public.getLocalNodeId (p_cluster name) to public; 330s GRANT 330s comment on function public.getLocalNodeId (p_cluster name) is 330s 'Returns the node ID of the node being serviced on the local database'; 330s COMMENT 330s create or replace function public.getModuleVersion () returns text 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getModuleVersion' 330s language C 330s security definer; 330s CREATE FUNCTION 330s grant execute on function public.getModuleVersion () to public; 330s GRANT 330s comment on function public.getModuleVersion () is 330s 'Returns the compiled-in version number of the Slony-I shared object'; 330s COMMENT 330s create or replace function public.resetSession() returns text 330s as '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__resetSession' 330s language C; 330s CREATE FUNCTION 330s create or replace function public.logApply () returns trigger 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApply' 330s language C 330s security definer; 330s CREATE FUNCTION 330s create or replace function public.logApplySetCacheSize (p_size int4) 330s returns int4 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySetCacheSize' 330s language C; 330s CREATE FUNCTION 330s create or replace function public.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval) 330s returns int4 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySaveStats' 330s language C; 330s CREATE FUNCTION 330s create or replace function public.checkmoduleversion () returns text as $$ 330s declare 330s moduleversion text; 330s begin 330s select into moduleversion public.getModuleVersion(); 330s if moduleversion <> '2.2.11' then 330s raise exception 'Slonik version: 2.2.11 != Slony-I version in PG build %', 330s moduleversion; 330s end if; 330s return null; 330s end;$$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.checkmoduleversion () is 330s 'Inline test function that verifies that slonik request for STORE 330s NODE/INIT CLUSTER is being run against a conformant set of 330s schema/functions.'; 330s COMMENT 330s select public.checkmoduleversion(); 330s checkmoduleversion 330s -------------------- 330s 330s (1 row) 330s 330s create or replace function public.decode_tgargs(bytea) returns text[] as 330s '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__slon_decode_tgargs' language C security definer; 330s CREATE FUNCTION 330s comment on function public.decode_tgargs(bytea) is 330s 'Translates the contents of pg_trigger.tgargs to an array of text arguments'; 330s COMMENT 330s grant execute on function public.decode_tgargs(bytea) to public; 330s GRANT 330s create or replace function public.check_namespace_validity () returns boolean as $$ 330s declare 330s c_cluster text; 330s begin 330s c_cluster := 'main'; 330s if c_cluster !~ E'^[[:alpha:]_][[:alnum:]_\$]{0,62}$' then 330s raise exception 'Cluster name % is not a valid SQL symbol!', c_cluster; 330s else 330s raise notice 'checked validity of cluster % namespace - OK!', c_cluster; 330s end if; 330s return 't'; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s select public.check_namespace_validity(); 330s check_namespace_validity 330s -------------------------- 330s t 330s (1 row) 330s 330s drop function public.check_namespace_validity(); 330s DROP FUNCTION 330s create or replace function public.logTrigger () returns trigger 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logTrigger' 330s language C 330s security definer; 330s CREATE FUNCTION 330s comment on function public.logTrigger () is 330s 'This is the trigger that is executed on the origin node that causes 330s updates to be recorded in sl_log_1/sl_log_2.'; 330s COMMENT 330s grant execute on function public.logTrigger () to public; 330s GRANT 330s create or replace function public.terminateNodeConnections (p_failed_node int4) returns int4 330s as $$ 330s declare 330s v_row record; 330s begin 330s for v_row in select nl_nodeid, nl_conncnt, 330s nl_backendpid from public.sl_nodelock 330s where nl_nodeid = p_failed_node for update 330s loop 330s perform public.killBackend(v_row.nl_backendpid, 'TERM'); 330s delete from public.sl_nodelock 330s where nl_nodeid = v_row.nl_nodeid 330s and nl_conncnt = v_row.nl_conncnt; 330s end loop; 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.terminateNodeConnections (p_failed_node int4) is 330s 'terminates all backends that have registered to be from the given node'; 330s COMMENT 330s create or replace function public.killBackend (p_pid int4, p_signame text) returns int4 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__killBackend' 330s language C; 330s CREATE FUNCTION 330s comment on function public.killBackend(p_pid int4, p_signame text) is 330s 'Send a signal to a postgres process. Requires superuser rights'; 330s COMMENT 330s create or replace function public.seqtrack (p_seqid int4, p_seqval int8) returns int8 330s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__seqtrack' 330s strict language C; 330s CREATE FUNCTION 330s comment on function public.seqtrack(p_seqid int4, p_seqval int8) is 330s 'Returns NULL if seqval has not changed since the last call for seqid'; 330s COMMENT 330s create or replace function public.slon_quote_brute(p_tab_fqname text) returns text 330s as $$ 330s declare 330s v_fqname text default ''; 330s begin 330s v_fqname := '"' || replace(p_tab_fqname,'"','""') || '"'; 330s return v_fqname; 330s end; 330s $$ language plpgsql immutable; 330s CREATE FUNCTION 330s comment on function public.slon_quote_brute(p_tab_fqname text) is 330s 'Brutally quote the given text'; 330s COMMENT 330s create or replace function public.slon_quote_input(p_tab_fqname text) returns text as $$ 330s declare 330s v_nsp_name text; 330s v_tab_name text; 330s v_i integer; 330s v_l integer; 330s v_pq2 integer; 330s begin 330s v_l := length(p_tab_fqname); 330s 330s -- Let us search for the dot 330s if p_tab_fqname like '"%' then 330s -- if the first part of the ident starts with a double quote, search 330s -- for the closing double quote, skipping over double double quotes. 330s v_i := 2; 330s while v_i <= v_l loop 330s if substr(p_tab_fqname, v_i, 1) != '"' then 330s v_i := v_i + 1; 330s else 330s v_i := v_i + 1; 330s if substr(p_tab_fqname, v_i, 1) != '"' then 330s exit; 330s end if; 330s v_i := v_i + 1; 330s end if; 330s end loop; 330s else 330s -- first part of ident is not quoted, search for the dot directly 330s v_i := 1; 330s while v_i <= v_l loop 330s if substr(p_tab_fqname, v_i, 1) = '.' then 330s exit; 330s end if; 330s v_i := v_i + 1; 330s end loop; 330s end if; 330s 330s -- v_i now points at the dot or behind the string. 330s 330s if substr(p_tab_fqname, v_i, 1) = '.' then 330s -- There is a dot now, so split the ident into its namespace 330s -- and objname parts and make sure each is quoted 330s v_nsp_name := substr(p_tab_fqname, 1, v_i - 1); 330s v_tab_name := substr(p_tab_fqname, v_i + 1); 330s if v_nsp_name not like '"%' then 330s v_nsp_name := '"' || replace(v_nsp_name, '"', '""') || 330s '"'; 330s end if; 330s if v_tab_name not like '"%' then 330s v_tab_name := '"' || replace(v_tab_name, '"', '""') || 330s '"'; 330s end if; 330s 330s return v_nsp_name || '.' || v_tab_name; 330s else 330s -- No dot ... must be just an ident without schema 330s if p_tab_fqname like '"%' then 330s return p_tab_fqname; 330s else 330s return '"' || replace(p_tab_fqname, '"', '""') || '"'; 330s end if; 330s end if; 330s 330s end;$$ language plpgsql immutable; 330s CREATE FUNCTION 330s comment on function public.slon_quote_input(p_text text) is 330s 'quote all words that aren''t quoted yet'; 330s COMMENT 330s create or replace function public.slonyVersionMajor() 330s returns int4 330s as $$ 330s begin 330s return 2; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.slonyVersionMajor () is 330s 'Returns the major version number of the slony schema'; 330s COMMENT 330s create or replace function public.slonyVersionMinor() 330s returns int4 330s as $$ 330s begin 330s return 2; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.slonyVersionMinor () is 330s 'Returns the minor version number of the slony schema'; 330s COMMENT 330s create or replace function public.slonyVersionPatchlevel() 330s returns int4 330s as $$ 330s begin 330s return 11; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.slonyVersionPatchlevel () is 330s 'Returns the version patch level of the slony schema'; 330s COMMENT 330s create or replace function public.slonyVersion() 330s returns text 330s as $$ 330s begin 330s return public.slonyVersionMajor()::text || '.' || 330s public.slonyVersionMinor()::text || '.' || 330s public.slonyVersionPatchlevel()::text ; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.slonyVersion() is 330s 'Returns the version number of the slony schema'; 330s COMMENT 330s create or replace function public.registry_set_int4(p_key text, p_value int4) 330s returns int4 as $$ 330s BEGIN 330s if p_value is null then 330s delete from public.sl_registry 330s where reg_key = p_key; 330s else 330s lock table public.sl_registry; 330s update public.sl_registry 330s set reg_int4 = p_value 330s where reg_key = p_key; 330s if not found then 330s insert into public.sl_registry (reg_key, reg_int4) 330s values (p_key, p_value); 330s end if; 330s end if; 330s return p_value; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.registry_set_int4(p_key text, p_value int4) is 330s 'registry_set_int4(key, value) 330s 330s Set or delete a registry value'; 330s COMMENT 330s create or replace function public.registry_get_int4(p_key text, p_default int4) 330s returns int4 as $$ 330s DECLARE 330s v_value int4; 330s BEGIN 330s select reg_int4 into v_value from public.sl_registry 330s where reg_key = p_key; 330s if not found then 330s v_value = p_default; 330s if p_default notnull then 330s perform public.registry_set_int4(p_key, p_default); 330s end if; 330s else 330s if v_value is null then 330s raise exception 'Slony-I: registry key % is not an int4 value', 330s p_key; 330s end if; 330s end if; 330s return v_value; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.registry_get_int4(p_key text, p_default int4) is 330s 'registry_get_int4(key, value) 330s 330s Get a registry value. If not present, set and return the default.'; 330s COMMENT 330s create or replace function public.registry_set_text(p_key text, p_value text) 330s returns text as $$ 330s BEGIN 330s if p_value is null then 330s delete from public.sl_registry 330s where reg_key = p_key; 330s else 330s lock table public.sl_registry; 330s update public.sl_registry 330s set reg_text = p_value 330s where reg_key = p_key; 330s if not found then 330s insert into public.sl_registry (reg_key, reg_text) 330s values (p_key, p_value); 330s end if; 330s end if; 330s return p_value; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.registry_set_text(text, text) is 330s 'registry_set_text(key, value) 330s 330s Set or delete a registry value'; 330s COMMENT 330s create or replace function public.registry_get_text(p_key text, p_default text) 330s returns text as $$ 330s DECLARE 330s v_value text; 330s BEGIN 330s select reg_text into v_value from public.sl_registry 330s where reg_key = p_key; 330s if not found then 330s v_value = p_default; 330s if p_default notnull then 330s perform public.registry_set_text(p_key, p_default); 330s end if; 330s else 330s if v_value is null then 330s raise exception 'Slony-I: registry key % is not a text value', 330s p_key; 330s end if; 330s end if; 330s return v_value; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.registry_get_text(p_key text, p_default text) is 330s 'registry_get_text(key, value) 330s 330s Get a registry value. If not present, set and return the default.'; 330s COMMENT 330s create or replace function public.registry_set_timestamp(p_key text, p_value timestamptz) 330s returns timestamp as $$ 330s BEGIN 330s if p_value is null then 330s delete from public.sl_registry 330s where reg_key = p_key; 330s else 330s lock table public.sl_registry; 330s update public.sl_registry 330s set reg_timestamp = p_value 330s where reg_key = p_key; 330s if not found then 330s insert into public.sl_registry (reg_key, reg_timestamp) 330s values (p_key, p_value); 330s end if; 330s end if; 330s return p_value; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.registry_set_timestamp(p_key text, p_value timestamptz) is 330s 'registry_set_timestamp(key, value) 330s 330s Set or delete a registry value'; 330s COMMENT 330s create or replace function public.registry_get_timestamp(p_key text, p_default timestamptz) 330s returns timestamp as $$ 330s DECLARE 330s v_value timestamp; 330s BEGIN 330s select reg_timestamp into v_value from public.sl_registry 330s where reg_key = p_key; 330s if not found then 330s v_value = p_default; 330s if p_default notnull then 330s perform public.registry_set_timestamp(p_key, p_default); 330s end if; 330s else 330s if v_value is null then 330s raise exception 'Slony-I: registry key % is not an timestamp value', 330s p_key; 330s end if; 330s end if; 330s return v_value; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.registry_get_timestamp(p_key text, p_default timestamptz) is 330s 'registry_get_timestamp(key, value) 330s 330s Get a registry value. If not present, set and return the default.'; 330s COMMENT 330s create or replace function public.cleanupNodelock () 330s returns int4 330s as $$ 330s declare 330s v_row record; 330s begin 330s for v_row in select nl_nodeid, nl_conncnt, nl_backendpid 330s from public.sl_nodelock 330s for update 330s loop 330s if public.killBackend(v_row.nl_backendpid, 'NULL') < 0 then 330s raise notice 'Slony-I: cleanup stale sl_nodelock entry for pid=%', 330s v_row.nl_backendpid; 330s delete from public.sl_nodelock where 330s nl_nodeid = v_row.nl_nodeid and 330s nl_conncnt = v_row.nl_conncnt; 330s end if; 330s end loop; 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.cleanupNodelock() is 330s 'Clean up stale entries when restarting slon'; 330s COMMENT 330s create or replace function public.registerNodeConnection (p_nodeid int4) 330s returns int4 330s as $$ 330s begin 330s insert into public.sl_nodelock 330s (nl_nodeid, nl_backendpid) 330s values 330s (p_nodeid, pg_backend_pid()); 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.registerNodeConnection (p_nodeid int4) is 330s 'Register (uniquely) the node connection so that only one slon can service the node'; 330s COMMENT 330s create or replace function public.initializeLocalNode (p_local_node_id int4, p_comment text) 330s returns int4 330s as $$ 330s declare 330s v_old_node_id int4; 330s v_first_log_no int4; 330s v_event_seq int8; 330s begin 330s -- ---- 330s -- Make sure this node is uninitialized or got reset 330s -- ---- 330s select last_value::int4 into v_old_node_id from public.sl_local_node_id; 330s if v_old_node_id != -1 then 330s raise exception 'Slony-I: This node is already initialized'; 330s end if; 330s 330s -- ---- 330s -- Set sl_local_node_id to the requested value and add our 330s -- own system to sl_node. 330s -- ---- 330s perform setval('public.sl_local_node_id', p_local_node_id); 330s perform public.storeNode_int (p_local_node_id, p_comment); 330s 330s if (pg_catalog.current_setting('max_identifier_length')::integer - pg_catalog.length('public')) < 5 then 330s raise notice 'Slony-I: Cluster name length [%] versus system max_identifier_length [%] ', pg_catalog.length('public'), pg_catalog.current_setting('max_identifier_length'); 330s raise notice 'leaves narrow/no room for some Slony-I-generated objects (such as indexes).'; 330s raise notice 'You may run into problems later!'; 330s end if; 330s 330s -- 330s -- Put the apply trigger onto sl_log_1 and sl_log_2 330s -- 330s create trigger apply_trigger 330s before INSERT on public.sl_log_1 330s for each row execute procedure public.logApply('_main'); 330s alter table public.sl_log_1 330s enable replica trigger apply_trigger; 330s create trigger apply_trigger 330s before INSERT on public.sl_log_2 330s for each row execute procedure public.logApply('_main'); 330s alter table public.sl_log_2 330s enable replica trigger apply_trigger; 330s 330s return p_local_node_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.initializeLocalNode (p_local_node_id int4, p_comment text) is 330s 'no_id - Node ID # 330s no_comment - Human-oriented comment 330s 330s Initializes the new node, no_id'; 330s COMMENT 330s create or replace function public.storeNode (p_no_id int4, p_no_comment text) 330s returns bigint 330s as $$ 330s begin 330s perform public.storeNode_int (p_no_id, p_no_comment); 330s return public.createEvent('_main', 'STORE_NODE', 330s p_no_id::text, p_no_comment::text); 330s end; 330s $$ language plpgsql 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.storeNode(p_no_id int4, p_no_comment text) is 330s 'no_id - Node ID # 330s no_comment - Human-oriented comment 330s 330s Generate the STORE_NODE event for node no_id'; 330s COMMENT 330s create or replace function public.storeNode_int (p_no_id int4, p_no_comment text) 330s returns int4 330s as $$ 330s declare 330s v_old_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check if the node exists 330s -- ---- 330s select * into v_old_row 330s from public.sl_node 330s where no_id = p_no_id 330s for update; 330s if found then 330s -- ---- 330s -- Node exists, update the existing row. 330s -- ---- 330s update public.sl_node 330s set no_comment = p_no_comment 330s where no_id = p_no_id; 330s else 330s -- ---- 330s -- New node, insert the sl_node row 330s -- ---- 330s insert into public.sl_node 330s (no_id, no_active, no_comment,no_failed) values 330s (p_no_id, 'f', p_no_comment,false); 330s end if; 330s 330s return p_no_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.storeNode_int(p_no_id int4, p_no_comment text) is 330s 'no_id - Node ID # 330s no_comment - Human-oriented comment 330s 330s Internal function to process the STORE_NODE event for node no_id'; 330s COMMENT 330s create or replace function public.enableNode (p_no_id int4) 330s returns bigint 330s as $$ 330s declare 330s v_local_node_id int4; 330s v_node_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that we are the node to activate and that we are 330s -- currently disabled. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select * into v_node_row 330s from public.sl_node 330s where no_id = p_no_id 330s for update; 330s if not found then 330s raise exception 'Slony-I: node % not found', p_no_id; 330s end if; 330s if v_node_row.no_active then 330s raise exception 'Slony-I: node % is already active', p_no_id; 330s end if; 330s 330s -- ---- 330s -- Activate this node and generate the ENABLE_NODE event 330s -- ---- 330s perform public.enableNode_int (p_no_id); 330s return public.createEvent('_main', 'ENABLE_NODE', 330s p_no_id::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.enableNode(p_no_id int4) is 330s 'no_id - Node ID # 330s 330s Generate the ENABLE_NODE event for node no_id'; 330s COMMENT 330s create or replace function public.enableNode_int (p_no_id int4) 330s returns int4 330s as $$ 330s declare 330s v_local_node_id int4; 330s v_node_row record; 330s v_sub_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that the node is inactive 330s -- ---- 330s select * into v_node_row 330s from public.sl_node 330s where no_id = p_no_id 330s for update; 330s if not found then 330s raise exception 'Slony-I: node % not found', p_no_id; 330s end if; 330s if v_node_row.no_active then 330s return p_no_id; 330s end if; 330s 330s -- ---- 330s -- Activate the node and generate sl_confirm status rows for it. 330s -- ---- 330s update public.sl_node 330s set no_active = 't' 330s where no_id = p_no_id; 330s insert into public.sl_confirm 330s (con_origin, con_received, con_seqno) 330s select no_id, p_no_id, 0 from public.sl_node 330s where no_id != p_no_id 330s and no_active; 330s insert into public.sl_confirm 330s (con_origin, con_received, con_seqno) 330s select p_no_id, no_id, 0 from public.sl_node 330s where no_id != p_no_id 330s and no_active; 330s 330s -- ---- 330s -- Generate ENABLE_SUBSCRIPTION events for all sets that 330s -- origin here and are subscribed by the just enabled node. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s for v_sub_row in select SUB.sub_set, SUB.sub_provider from 330s public.sl_set S, 330s public.sl_subscribe SUB 330s where S.set_origin = v_local_node_id 330s and S.set_id = SUB.sub_set 330s and SUB.sub_receiver = p_no_id 330s for update of S 330s loop 330s perform public.enableSubscription (v_sub_row.sub_set, 330s v_sub_row.sub_provider, p_no_id); 330s end loop; 330s 330s return p_no_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.enableNode_int(p_no_id int4) is 330s 'no_id - Node ID # 330s 330s Internal function to process the ENABLE_NODE event for node no_id'; 330s COMMENT 330s create or replace function public.disableNode (p_no_id int4) 330s returns bigint 330s as $$ 330s begin 330s -- **** TODO **** 330s raise exception 'Slony-I: disableNode() not implemented'; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.disableNode(p_no_id int4) is 330s 'generate DISABLE_NODE event for node no_id'; 330s COMMENT 330s create or replace function public.disableNode_int (p_no_id int4) 330s returns int4 330s as $$ 330s begin 330s -- **** TODO **** 330s raise exception 'Slony-I: disableNode_int() not implemented'; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.disableNode(p_no_id int4) is 330s 'process DISABLE_NODE event for node no_id 330s 330s NOTE: This is not yet implemented!'; 330s COMMENT 330s create or replace function public.dropNode (p_no_ids int4[]) 330s returns bigint 330s as $$ 330s declare 330s v_node_row record; 330s v_idx integer; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that this got called on a different node 330s -- ---- 330s if public.getLocalNodeId('_main') = ANY (p_no_ids) then 330s raise exception 'Slony-I: DROP_NODE cannot initiate on the dropped node'; 330s end if; 330s 330s -- 330s -- if any of the deleted nodes are receivers we drop the sl_subscribe line 330s -- 330s delete from public.sl_subscribe where sub_receiver = ANY (p_no_ids); 330s 330s v_idx:=1; 330s LOOP 330s EXIT WHEN v_idx>array_upper(p_no_ids,1) ; 330s select * into v_node_row from public.sl_node 330s where no_id = p_no_ids[v_idx] 330s for update; 330s if not found then 330s raise exception 'Slony-I: unknown node ID % %', p_no_ids[v_idx],v_idx; 330s end if; 330s -- ---- 330s -- Make sure we do not break other nodes subscriptions with this 330s -- ---- 330s if exists (select true from public.sl_subscribe 330s where sub_provider = p_no_ids[v_idx]) 330s then 330s raise exception 'Slony-I: Node % is still configured as a data provider', 330s p_no_ids[v_idx]; 330s end if; 330s 330s -- ---- 330s -- Make sure no set originates there any more 330s -- ---- 330s if exists (select true from public.sl_set 330s where set_origin = p_no_ids[v_idx]) 330s then 330s raise exception 'Slony-I: Node % is still origin of one or more sets', 330s p_no_ids[v_idx]; 330s end if; 330s 330s -- ---- 330s -- Call the internal drop functionality and generate the event 330s -- ---- 330s perform public.dropNode_int(p_no_ids[v_idx]); 330s v_idx:=v_idx+1; 330s END LOOP; 330s return public.createEvent('_main', 'DROP_NODE', 330s array_to_string(p_no_ids,',')); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropNode(p_no_ids int4[]) is 330s 'generate DROP_NODE event to drop node node_id from replication'; 330s COMMENT 330s create or replace function public.dropNode_int (p_no_id int4) 330s returns int4 330s as $$ 330s declare 330s v_tab_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- If the dropped node is a remote node, clean the configuration 330s -- from all traces for it. 330s -- ---- 330s if p_no_id <> public.getLocalNodeId('_main') then 330s delete from public.sl_subscribe 330s where sub_receiver = p_no_id; 330s delete from public.sl_listen 330s where li_origin = p_no_id 330s or li_provider = p_no_id 330s or li_receiver = p_no_id; 330s delete from public.sl_path 330s where pa_server = p_no_id 330s or pa_client = p_no_id; 330s delete from public.sl_confirm 330s where con_origin = p_no_id 330s or con_received = p_no_id; 330s delete from public.sl_event 330s where ev_origin = p_no_id; 330s delete from public.sl_node 330s where no_id = p_no_id; 330s 330s return p_no_id; 330s end if; 330s 330s -- ---- 330s -- This is us ... deactivate the node for now, the daemon 330s -- will call uninstallNode() in a separate transaction. 330s -- ---- 330s update public.sl_node 330s set no_active = false 330s where no_id = p_no_id; 330s 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s return p_no_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropNode_int(p_no_id int4) is 330s 'internal function to process DROP_NODE event to drop node node_id from replication'; 330s COMMENT 330s create or replace function public.preFailover(p_failed_node int4,p_is_candidate boolean) 330s returns int4 330s as $$ 330s declare 330s v_row record; 330s v_row2 record; 330s v_n int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- All consistency checks first 330s 330s if p_is_candidate then 330s -- ---- 330s -- Check all sets originating on the failed node 330s -- ---- 330s for v_row in select set_id 330s from public.sl_set 330s where set_origin = p_failed_node 330s loop 330s -- ---- 330s -- Check that the backup node is subscribed to all sets 330s -- that originate on the failed node 330s -- ---- 330s select into v_row2 sub_forward, sub_active 330s from public.sl_subscribe 330s where sub_set = v_row.set_id 330s and sub_receiver = public.getLocalNodeId('_main'); 330s if not found then 330s raise exception 'Slony-I: cannot failover - node % is not subscribed to set %', 330s public.getLocalNodeId('_main'), v_row.set_id; 330s end if; 330s 330s -- ---- 330s -- Check that the subscription is active 330s -- ---- 330s if not v_row2.sub_active then 330s raise exception 'Slony-I: cannot failover - subscription for set % is not active', 330s v_row.set_id; 330s end if; 330s 330s -- ---- 330s -- If there are other subscribers, the backup node needs to 330s -- be a forwarder too. 330s -- ---- 330s select into v_n count(*) 330s from public.sl_subscribe 330s where sub_set = v_row.set_id 330s and sub_receiver <> public.getLocalNodeId('_main'); 330s if v_n > 0 and not v_row2.sub_forward then 330s raise exception 'Slony-I: cannot failover - node % is not a forwarder of set %', 330s public.getLocalNodeId('_main'), v_row.set_id; 330s end if; 330s end loop; 330s end if; 330s 330s -- ---- 330s -- Terminate all connections of the failed node the hard way 330s -- ---- 330s perform public.terminateNodeConnections(p_failed_node); 330s 330s update public.sl_path set pa_conninfo='' WHERE 330s pa_server=p_failed_node; 330s notify "_main_Restart"; 330s -- ---- 330s -- That is it - so far. 330s -- ---- 330s return p_failed_node; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.preFailover(p_failed_node int4,is_failover_candidate boolean) is 330s 'Prepare for a failover. This function is called on all candidate nodes. 330s It blanks the paths to the failed node 330s and then restart of all node daemons.'; 330s COMMENT 330s create or replace function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) 330s returns int4 330s as $$ 330s declare 330s v_row record; 330s v_row2 record; 330s v_failed boolean; 330s v_restart_required boolean; 330s begin 330s 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s v_restart_required:=false; 330s -- 330s -- any nodes other than the backup receiving 330s -- ANY subscription from a failed node 330s -- will now get that data from the backup node. 330s update public.sl_subscribe set 330s sub_provider=p_backup_node 330s where sub_provider=p_failed_node 330s and sub_receiver<>p_backup_node 330s and sub_receiver <> ALL (p_failed_nodes); 330s if found then 330s v_restart_required:=true; 330s end if; 330s -- 330s -- if this node is receiving a subscription from the backup node 330s -- with a failed node as the provider we need to fix this. 330s update public.sl_subscribe set 330s sub_provider=p_backup_node 330s from public.sl_set 330s where set_id = sub_set 330s and set_origin=p_failed_node 330s and sub_provider = ANY(p_failed_nodes) 330s and sub_receiver=public.getLocalNodeId('_main'); 330s 330s -- ---- 330s -- Terminate all connections of the failed node the hard way 330s -- ---- 330s perform public.terminateNodeConnections(p_failed_node); 330s 330s -- Clear out the paths for the failed node. 330s -- This ensures that *this* node won't be pulling data from 330s -- the failed node even if it *does* become accessible 330s 330s update public.sl_path set pa_conninfo='' WHERE 330s pa_server=p_failed_node 330s and pa_conninfo<>''; 330s 330s if found then 330s v_restart_required:=true; 330s end if; 330s 330s v_failed := exists (select 1 from public.sl_node 330s where no_failed=true and no_id=p_failed_node); 330s 330s if not v_failed then 330s 330s update public.sl_node set no_failed=true where no_id = ANY (p_failed_nodes) 330s and no_failed=false; 330s if found then 330s v_restart_required:=true; 330s end if; 330s end if; 330s 330s if v_restart_required then 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s -- ---- 330s -- Make sure the node daemon will restart 330s -- ---- 330s notify "_main_Restart"; 330s end if; 330s 330s 330s -- ---- 330s -- That is it - so far. 330s -- ---- 330s return p_failed_node; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) is 330s 'Initiate failover from failed_node to backup_node. This function must be called on all nodes, 330s and then waited for the restart of all node daemons.'; 330s COMMENT 330s create or replace function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8, p_failed_nodes integer[]) 330s returns bigint 330s as $$ 330s declare 330s v_row record; 330s v_new_event bigint; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s select * into v_row 330s from public.sl_event 330s where ev_origin = p_failed_node 330s and ev_seqno = p_ev_seqno; 330s if not found then 330s raise exception 'Slony-I: event %,% not found', 330s p_failed_node, p_ev_seqno; 330s end if; 330s 330s update public.sl_node set no_failed=true where no_id = ANY 330s (p_failed_nodes) and no_failed=false; 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s -- ---- 330s -- Make sure the node daemon will restart 330s -- ---- 330s raise notice 'calling restart node %',p_failed_node; 330s 330s notify "_main_Restart"; 330s 330s select public.createEvent('_main','FAILOVER_NODE', 330s p_failed_node::text,p_ev_seqno::text, 330s array_to_string(p_failed_nodes,',')) 330s into v_new_event; 330s 330s 330s return v_new_event; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8,p_failed_nodes integer[] ) is 330s 'FUNCTION failedNode2 (failed_node, backup_node, set_id, ev_seqno, ev_seqfake,p_failed_nodes) 330s 330s On the node that has the highest sequence number of the failed node, 330s fake the FAILOVER_SET event.'; 330s COMMENT 330s create or replace function public.failedNode3 (p_failed_node int4, p_backup_node int4,p_seq_no bigint) 330s returns int4 330s as $$ 330s declare 330s 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s perform public.failoverSet_int(p_failed_node, 330s p_backup_node,p_seq_no); 330s 330s notify "_main_Restart"; 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s NOTICE: function public.clonenodeprepare(int4,int4,text) does not exist, skipping 330s NOTICE: function public.ddlcapture(text,text) does not exist, skipping 330s NOTICE: function public.ddlscript_complete(int4,text,int4) does not exist, skipping 330s NOTICE: function public.ddlscript_complete_int(int4,int4) does not exist, skipping 330s NOTICE: function public.subscribeset_int(int4,int4,int4,bool,bool) does not exist, skipping 330s NOTICE: function public.unsubscribeset(int4,int4,pg_catalog.bool) does not exist, skipping 330s create or replace function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_last_seqno bigint) 330s returns int4 330s as $$ 330s declare 330s v_row record; 330s v_last_sync int8; 330s v_set int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s SELECT max(ev_seqno) into v_last_sync FROM public.sl_event where 330s ev_origin=p_failed_node; 330s if v_last_sync > p_last_seqno then 330s -- this node is ahead of the last sequence number from the 330s -- failed node that the backup node has. 330s -- this node must unsubscribe from all sets from the origin. 330s for v_set in select set_id from public.sl_set where 330s set_origin=p_failed_node 330s loop 330s raise warning 'Slony is dropping the subscription of set % found sync %s bigger than %s ' 330s , v_set, v_last_sync::text, p_last_seqno::text; 330s perform public.unsubscribeSet(v_set, 330s public.getLocalNodeId('_main'), 330s true); 330s end loop; 330s delete from public.sl_event where ev_origin=p_failed_node 330s and ev_seqno > p_last_seqno; 330s end if; 330s -- ---- 330s -- Change the origin of the set now to the backup node. 330s -- On the backup node this includes changing all the 330s -- trigger and protection stuff 330s for v_set in select set_id from public.sl_set where 330s set_origin=p_failed_node 330s loop 330s -- ---- 330s if p_backup_node = public.getLocalNodeId('_main') then 330s delete from public.sl_setsync 330s where ssy_setid = v_set; 330s delete from public.sl_subscribe 330s where sub_set = v_set 330s and sub_receiver = p_backup_node; 330s update public.sl_set 330s set set_origin = p_backup_node 330s where set_id = v_set; 330s update public.sl_subscribe 330s set sub_provider=p_backup_node 330s FROM public.sl_node receive_node 330s where sub_set = v_set 330s and sub_provider=p_failed_node 330s and sub_receiver=receive_node.no_id 330s and receive_node.no_failed=false; 330s 330s for v_row in select * from public.sl_table 330s where tab_set = v_set 330s order by tab_id 330s loop 330s perform public.alterTableConfigureTriggers(v_row.tab_id); 330s end loop; 330s else 330s raise notice 'deleting from sl_subscribe all rows with receiver %', 330s p_backup_node; 330s 330s delete from public.sl_subscribe 330s where sub_set = v_set 330s and sub_receiver = p_backup_node; 330s 330s update public.sl_subscribe 330s set sub_provider=p_backup_node 330s FROM public.sl_node receive_node 330s where sub_set = v_set 330s and sub_provider=p_failed_node 330s and sub_provider=p_failed_node 330s and sub_receiver=receive_node.no_id 330s and receive_node.no_failed=false; 330s update public.sl_set 330s set set_origin = p_backup_node 330s where set_id = v_set; 330s -- ---- 330s -- If we are a subscriber of the set ourself, change our 330s -- setsync status to reflect the new set origin. 330s -- ---- 330s if exists (select true from public.sl_subscribe 330s where sub_set = v_set 330s and sub_receiver = public.getLocalNodeId( 330s '_main')) 330s then 330s delete from public.sl_setsync 330s where ssy_setid = v_set; 330s 330s select coalesce(max(ev_seqno), 0) into v_last_sync 330s from public.sl_event 330s where ev_origin = p_backup_node 330s and ev_type = 'SYNC'; 330s if v_last_sync > 0 then 330s insert into public.sl_setsync 330s (ssy_setid, ssy_origin, ssy_seqno, 330s ssy_snapshot, ssy_action_list) 330s select v_set, p_backup_node, v_last_sync, 330s ev_snapshot, NULL 330s from public.sl_event 330s where ev_origin = p_backup_node 330s and ev_seqno = v_last_sync; 330s else 330s insert into public.sl_setsync 330s (ssy_setid, ssy_origin, ssy_seqno, 330s ssy_snapshot, ssy_action_list) 330s values (v_set, p_backup_node, '0', 330s '1:1:', NULL); 330s end if; 330s end if; 330s end if; 330s end loop; 330s 330s --If there are any subscriptions with 330s --the failed_node being the provider then 330s --we want to redirect those subscriptions 330s --to come from the backup node. 330s -- 330s -- The backup node should be a valid 330s -- provider for all subscriptions served 330s -- by the failed node. (otherwise it 330s -- wouldn't be a allowable backup node). 330s -- delete from public.sl_subscribe 330s -- where sub_receiver=p_backup_node; 330s 330s update public.sl_subscribe 330s set sub_provider=p_backup_node 330s from public.sl_node 330s where sub_provider=p_failed_node 330s and sl_node.no_id=sub_receiver 330s and sl_node.no_failed=false 330s and sub_receiver<>p_backup_node; 330s 330s update public.sl_subscribe 330s set sub_provider=(select set_origin from 330s public.sl_set where set_id= 330s sub_set) 330s where sub_provider=p_failed_node 330s and sub_receiver=p_backup_node; 330s 330s update public.sl_node 330s set no_active=false WHERE 330s no_id=p_failed_node; 330s 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s 330s return p_failed_node; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_seqno bigint) is 330s 'FUNCTION failoverSet_int (failed_node, backup_node, set_id, wait_seqno) 330s 330s Finish failover for one set.'; 330s COMMENT 330s create or replace function public.uninstallNode () 330s returns int4 330s as $$ 330s declare 330s v_tab_row record; 330s begin 330s raise notice 'Slony-I: Please drop schema "_main"'; 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.uninstallNode() is 330s 'Reset the whole database to standalone by removing the whole 330s replication system.'; 330s COMMENT 330s DROP FUNCTION IF EXISTS public.cloneNodePrepare(int4,int4,text); 330s DROP FUNCTION 330s create or replace function public.cloneNodePrepare (p_no_id int4, p_no_provider int4, p_no_comment text) 330s returns bigint 330s as $$ 330s begin 330s perform public.cloneNodePrepare_int (p_no_id, p_no_provider, p_no_comment); 330s return public.createEvent('_main', 'CLONE_NODE', 330s p_no_id::text, p_no_provider::text, 330s p_no_comment::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.cloneNodePrepare(p_no_id int4, p_no_provider int4, p_no_comment text) is 330s 'Prepare for cloning a node.'; 330s COMMENT 330s create or replace function public.cloneNodePrepare_int (p_no_id int4, p_no_provider int4, p_no_comment text) 330s returns int4 330s as $$ 330s declare 330s v_dummy int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s update public.sl_node set 330s no_active = np.no_active, 330s no_comment = np.no_comment, 330s no_failed = np.no_failed 330s from public.sl_node np 330s where np.no_id = p_no_provider 330s and sl_node.no_id = p_no_id; 330s if not found then 330s insert into public.sl_node 330s (no_id, no_active, no_comment,no_failed) 330s select p_no_id, no_active, p_no_comment, no_failed 330s from public.sl_node 330s where no_id = p_no_provider; 330s end if; 330s 330s insert into public.sl_path 330s (pa_server, pa_client, pa_conninfo, pa_connretry) 330s select pa_server, p_no_id, '', pa_connretry 330s from public.sl_path 330s where pa_client = p_no_provider 330s and (pa_server, p_no_id) not in (select pa_server, pa_client 330s from public.sl_path); 330s 330s insert into public.sl_path 330s (pa_server, pa_client, pa_conninfo, pa_connretry) 330s select p_no_id, pa_client, '', pa_connretry 330s from public.sl_path 330s where pa_server = p_no_provider 330s and (p_no_id, pa_client) not in (select pa_server, pa_client 330s from public.sl_path); 330s 330s insert into public.sl_subscribe 330s (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) 330s select sub_set, sub_provider, p_no_id, sub_forward, sub_active 330s from public.sl_subscribe 330s where sub_receiver = p_no_provider; 330s 330s insert into public.sl_confirm 330s (con_origin, con_received, con_seqno, con_timestamp) 330s select con_origin, p_no_id, con_seqno, con_timestamp 330s from public.sl_confirm 330s where con_received = p_no_provider; 330s 330s perform public.RebuildListenEntries(); 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.cloneNodePrepare_int(p_no_id int4, p_no_provider int4, p_no_comment text) is 330s 'Internal part of cloneNodePrepare().'; 330s COMMENT 330s create or replace function public.cloneNodeFinish (p_no_id int4, p_no_provider int4) 330s returns int4 330s as $$ 330s declare 330s v_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s perform "pg_catalog".setval('public.sl_local_node_id', p_no_id); 330s perform public.resetSession(); 330s for v_row in select sub_set from public.sl_subscribe 330s where sub_receiver = p_no_id 330s loop 330s perform public.updateReloid(v_row.sub_set, p_no_id); 330s end loop; 330s 330s perform public.RebuildListenEntries(); 330s 330s delete from public.sl_confirm 330s where con_received = p_no_id; 330s insert into public.sl_confirm 330s (con_origin, con_received, con_seqno, con_timestamp) 330s select con_origin, p_no_id, con_seqno, con_timestamp 330s from public.sl_confirm 330s where con_received = p_no_provider; 330s insert into public.sl_confirm 330s (con_origin, con_received, con_seqno, con_timestamp) 330s select p_no_provider, p_no_id, 330s (select max(ev_seqno) from public.sl_event 330s where ev_origin = p_no_provider), current_timestamp; 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.cloneNodeFinish(p_no_id int4, p_no_provider int4) is 330s 'Internal part of cloneNodePrepare().'; 330s COMMENT 330s create or replace function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 330s returns bigint 330s as $$ 330s begin 330s perform public.storePath_int(p_pa_server, p_pa_client, 330s p_pa_conninfo, p_pa_connretry); 330s return public.createEvent('_main', 'STORE_PATH', 330s p_pa_server::text, p_pa_client::text, 330s p_pa_conninfo::text, p_pa_connretry::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 330s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 330s 330s Generate the STORE_PATH event indicating that node pa_client can 330s access node pa_server using DSN pa_conninfo'; 330s COMMENT 330s create or replace function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 330s returns int4 330s as $$ 330s declare 330s v_dummy int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check if the path already exists 330s -- ---- 330s select 1 into v_dummy 330s from public.sl_path 330s where pa_server = p_pa_server 330s and pa_client = p_pa_client 330s for update; 330s if found then 330s -- ---- 330s -- Path exists, update pa_conninfo 330s -- ---- 330s update public.sl_path 330s set pa_conninfo = p_pa_conninfo, 330s pa_connretry = p_pa_connretry 330s where pa_server = p_pa_server 330s and pa_client = p_pa_client; 330s else 330s -- ---- 330s -- New path 330s -- 330s -- In case we receive STORE_PATH events before we know 330s -- about the nodes involved in this, we generate those nodes 330s -- as pending. 330s -- ---- 330s if not exists (select 1 from public.sl_node 330s where no_id = p_pa_server) then 330s perform public.storeNode_int (p_pa_server, ''); 330s end if; 330s if not exists (select 1 from public.sl_node 330s where no_id = p_pa_client) then 330s perform public.storeNode_int (p_pa_client, ''); 330s end if; 330s insert into public.sl_path 330s (pa_server, pa_client, pa_conninfo, pa_connretry) values 330s (p_pa_server, p_pa_client, p_pa_conninfo, p_pa_connretry); 330s end if; 330s 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 330s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 330s 330s Process the STORE_PATH event indicating that node pa_client can 330s access node pa_server using DSN pa_conninfo'; 330s COMMENT 330s create or replace function public.dropPath (p_pa_server int4, p_pa_client int4) 330s returns bigint 330s as $$ 330s declare 330s v_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- There should be no existing subscriptions. Auto unsubscribing 330s -- is considered too dangerous. 330s -- ---- 330s for v_row in select sub_set, sub_provider, sub_receiver 330s from public.sl_subscribe 330s where sub_provider = p_pa_server 330s and sub_receiver = p_pa_client 330s loop 330s raise exception 330s 'Slony-I: Path cannot be dropped, subscription of set % needs it', 330s v_row.sub_set; 330s end loop; 330s 330s -- ---- 330s -- Drop all sl_listen entries that depend on this path 330s -- ---- 330s for v_row in select li_origin, li_provider, li_receiver 330s from public.sl_listen 330s where li_provider = p_pa_server 330s and li_receiver = p_pa_client 330s loop 330s perform public.dropListen( 330s v_row.li_origin, v_row.li_provider, v_row.li_receiver); 330s end loop; 330s 330s -- ---- 330s -- Now drop the path and create the event 330s -- ---- 330s perform public.dropPath_int(p_pa_server, p_pa_client); 330s 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s return public.createEvent ('_main', 'DROP_PATH', 330s p_pa_server::text, p_pa_client::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropPath (p_pa_server int4, p_pa_client int4) is 330s 'Generate DROP_PATH event to drop path from pa_server to pa_client'; 330s COMMENT 330s create or replace function public.dropPath_int (p_pa_server int4, p_pa_client int4) 330s returns int4 330s as $$ 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Remove any dangling sl_listen entries with the server 330s -- as provider and the client as receiver. This must have 330s -- been cleared out before, but obviously was not. 330s -- ---- 330s delete from public.sl_listen 330s where li_provider = p_pa_server 330s and li_receiver = p_pa_client; 330s 330s delete from public.sl_path 330s where pa_server = p_pa_server 330s and pa_client = p_pa_client; 330s 330s if found then 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s return 1; 330s else 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s return 0; 330s end if; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropPath_int (p_pa_server int4, p_pa_client int4) is 330s 'Process DROP_PATH event to drop path from pa_server to pa_client'; 330s COMMENT 330s create or replace function public.storeListen (p_origin int4, p_provider int4, p_receiver int4) 330s returns bigint 330s as $$ 330s begin 330s perform public.storeListen_int (p_origin, p_provider, p_receiver); 330s return public.createEvent ('_main', 'STORE_LISTEN', 330s p_origin::text, p_provider::text, p_receiver::text); 330s end; 330s $$ language plpgsql 330s called on null input; 330s CREATE FUNCTION 330s comment on function public.storeListen(p_origin int4, p_provider int4, p_receiver int4) is 330s 'FUNCTION storeListen (li_origin, li_provider, li_receiver) 330s 330s generate STORE_LISTEN event, indicating that receiver node li_receiver 330s listens to node li_provider in order to get messages coming from node 330s li_origin.'; 330s COMMENT 330s create or replace function public.storeListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 330s returns int4 330s as $$ 330s declare 330s v_exists int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s select 1 into v_exists 330s from public.sl_listen 330s where li_origin = p_li_origin 330s and li_provider = p_li_provider 330s and li_receiver = p_li_receiver; 330s if not found then 330s -- ---- 330s -- In case we receive STORE_LISTEN events before we know 330s -- about the nodes involved in this, we generate those nodes 330s -- as pending. 330s -- ---- 330s if not exists (select 1 from public.sl_node 330s where no_id = p_li_origin) then 330s perform public.storeNode_int (p_li_origin, ''); 330s end if; 330s if not exists (select 1 from public.sl_node 330s where no_id = p_li_provider) then 330s perform public.storeNode_int (p_li_provider, ''); 330s end if; 330s if not exists (select 1 from public.sl_node 330s where no_id = p_li_receiver) then 330s perform public.storeNode_int (p_li_receiver, ''); 330s end if; 330s 330s insert into public.sl_listen 330s (li_origin, li_provider, li_receiver) values 330s (p_li_origin, p_li_provider, p_li_receiver); 330s end if; 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.storeListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 330s 'FUNCTION storeListen_int (li_origin, li_provider, li_receiver) 330s 330s Process STORE_LISTEN event, indicating that receiver node li_receiver 330s listens to node li_provider in order to get messages coming from node 330s li_origin.'; 330s COMMENT 330s create or replace function public.dropListen (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 330s returns bigint 330s as $$ 330s begin 330s perform public.dropListen_int(p_li_origin, 330s p_li_provider, p_li_receiver); 330s 330s return public.createEvent ('_main', 'DROP_LISTEN', 330s p_li_origin::text, p_li_provider::text, p_li_receiver::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropListen(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 330s 'dropListen (li_origin, li_provider, li_receiver) 330s 330s Generate the DROP_LISTEN event.'; 330s COMMENT 330s create or replace function public.dropListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 330s returns int4 330s as $$ 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s delete from public.sl_listen 330s where li_origin = p_li_origin 330s and li_provider = p_li_provider 330s and li_receiver = p_li_receiver; 330s if found then 330s return 1; 330s else 330s return 0; 330s end if; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 330s 'dropListen (li_origin, li_provider, li_receiver) 330s 330s Process the DROP_LISTEN event, deleting the sl_listen entry for 330s the indicated (origin,provider,receiver) combination.'; 330s COMMENT 330s create or replace function public.storeSet (p_set_id int4, p_set_comment text) 330s returns bigint 330s as $$ 330s declare 330s v_local_node_id int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s v_local_node_id := public.getLocalNodeId('_main'); 330s 330s insert into public.sl_set 330s (set_id, set_origin, set_comment) values 330s (p_set_id, v_local_node_id, p_set_comment); 330s 330s return public.createEvent('_main', 'STORE_SET', 330s p_set_id::text, v_local_node_id::text, p_set_comment::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.storeSet(p_set_id int4, p_set_comment text) is 330s 'Generate STORE_SET event for set set_id with human readable comment set_comment'; 330s COMMENT 330s create or replace function public.storeSet_int (p_set_id int4, p_set_origin int4, p_set_comment text) 330s returns int4 330s as $$ 330s declare 330s v_dummy int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s select 1 into v_dummy 330s from public.sl_set 330s where set_id = p_set_id 330s for update; 330s if found then 330s update public.sl_set 330s set set_comment = p_set_comment 330s where set_id = p_set_id; 330s else 330s if not exists (select 1 from public.sl_node 330s where no_id = p_set_origin) then 330s perform public.storeNode_int (p_set_origin, ''); 330s end if; 330s insert into public.sl_set 330s (set_id, set_origin, set_comment) values 330s (p_set_id, p_set_origin, p_set_comment); 330s end if; 330s 330s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 330s perform public.addPartialLogIndices(); 330s 330s return p_set_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.storeSet_int(p_set_id int4, p_set_origin int4, p_set_comment text) is 330s 'storeSet_int (set_id, set_origin, set_comment) 330s 330s Process the STORE_SET event, indicating the new set with given ID, 330s origin node, and human readable comment.'; 330s COMMENT 330s create or replace function public.lockSet (p_set_id int4) 330s returns int4 330s as $$ 330s declare 330s v_local_node_id int4; 330s v_set_row record; 330s v_tab_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that the set exists and that we are the origin 330s -- and that it is not already locked. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select * into v_set_row from public.sl_set 330s where set_id = p_set_id 330s for update; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_set_id; 330s end if; 330s if v_set_row.set_origin <> v_local_node_id then 330s raise exception 'Slony-I: set % does not originate on local node', 330s p_set_id; 330s end if; 330s if v_set_row.set_locked notnull then 330s raise exception 'Slony-I: set % is already locked', p_set_id; 330s end if; 330s 330s -- ---- 330s -- Place the lockedSet trigger on all tables in the set. 330s -- ---- 330s for v_tab_row in select T.tab_id, 330s public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) as tab_fqname 330s from public.sl_table T, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where T.tab_set = p_set_id 330s and T.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid 330s order by tab_id 330s loop 330s execute 'create trigger "_main_lockedset" ' || 330s 'before insert or update or delete on ' || 330s v_tab_row.tab_fqname || ' for each row execute procedure 330s public.lockedSet (''_main'');'; 330s end loop; 330s 330s -- ---- 330s -- Remember our snapshots xmax as for the set locking 330s -- ---- 330s update public.sl_set 330s set set_locked = "pg_catalog".txid_snapshot_xmax("pg_catalog".txid_current_snapshot()) 330s where set_id = p_set_id; 330s 330s return p_set_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.lockSet(p_set_id int4) is 330s 'lockSet(set_id) 330s 330s Add a special trigger to all tables of a set that disables access to 330s it.'; 330s COMMENT 330s create or replace function public.unlockSet (p_set_id int4) 330s returns int4 330s as $$ 330s declare 330s v_local_node_id int4; 330s v_set_row record; 330s v_tab_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that the set exists and that we are the origin 330s -- and that it is not already locked. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select * into v_set_row from public.sl_set 330s where set_id = p_set_id 330s for update; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_set_id; 330s end if; 330s if v_set_row.set_origin <> v_local_node_id then 330s raise exception 'Slony-I: set % does not originate on local node', 330s p_set_id; 330s end if; 330s if v_set_row.set_locked isnull then 330s raise exception 'Slony-I: set % is not locked', p_set_id; 330s end if; 330s 330s -- ---- 330s -- Drop the lockedSet trigger from all tables in the set. 330s -- ---- 330s for v_tab_row in select T.tab_id, 330s public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) as tab_fqname 330s from public.sl_table T, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where T.tab_set = p_set_id 330s and T.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid 330s order by tab_id 330s loop 330s execute 'drop trigger "_main_lockedset" ' || 330s 'on ' || v_tab_row.tab_fqname; 330s end loop; 330s 330s -- ---- 330s -- Clear out the set_locked field 330s -- ---- 330s update public.sl_set 330s set set_locked = NULL 330s where set_id = p_set_id; 330s 330s return p_set_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.unlockSet(p_set_id int4) is 330s 'Remove the special trigger from all tables of a set that disables access to it.'; 330s COMMENT 330s create or replace function public.moveSet (p_set_id int4, p_new_origin int4) 330s returns bigint 330s as $$ 330s declare 330s v_local_node_id int4; 330s v_set_row record; 330s v_sub_row record; 330s v_sync_seqno int8; 330s v_lv_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that the set is locked and that this locking 330s -- happened long enough ago. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select * into v_set_row from public.sl_set 330s where set_id = p_set_id 330s for update; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_set_id; 330s end if; 330s if v_set_row.set_origin <> v_local_node_id then 330s raise exception 'Slony-I: set % does not originate on local node', 330s p_set_id; 330s end if; 330s if v_set_row.set_locked isnull then 330s raise exception 'Slony-I: set % is not locked', p_set_id; 330s end if; 330s if v_set_row.set_locked > "pg_catalog".txid_snapshot_xmin("pg_catalog".txid_current_snapshot()) then 330s raise exception 'Slony-I: cannot move set % yet, transactions < % are still in progress', 330s p_set_id, v_set_row.set_locked; 330s end if; 330s 330s -- ---- 330s -- Unlock the set 330s -- ---- 330s perform public.unlockSet(p_set_id); 330s 330s -- ---- 330s -- Check that the new_origin is an active subscriber of the set 330s -- ---- 330s select * into v_sub_row from public.sl_subscribe 330s where sub_set = p_set_id 330s and sub_receiver = p_new_origin; 330s if not found then 330s raise exception 'Slony-I: set % is not subscribed by node %', 330s p_set_id, p_new_origin; 330s end if; 330s if not v_sub_row.sub_active then 330s raise exception 'Slony-I: subsctiption of node % for set % is inactive', 330s p_new_origin, p_set_id; 330s end if; 330s 330s -- ---- 330s -- Reconfigure everything 330s -- ---- 330s perform public.moveSet_int(p_set_id, v_local_node_id, 330s p_new_origin, 0); 330s 330s perform public.RebuildListenEntries(); 330s 330s -- ---- 330s -- At this time we hold access exclusive locks for every table 330s -- in the set. But we did move the set to the new origin, so the 330s -- createEvent() we are doing now will not record the sequences. 330s -- ---- 330s v_sync_seqno := public.createEvent('_main', 'SYNC'); 330s insert into public.sl_seqlog 330s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 330s select seq_id, v_local_node_id, v_sync_seqno, seq_last_value 330s from public.sl_seqlastvalue 330s where seq_set = p_set_id; 330s 330s -- ---- 330s -- Finally we generate the real event 330s -- ---- 330s return public.createEvent('_main', 'MOVE_SET', 330s p_set_id::text, v_local_node_id::text, p_new_origin::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.moveSet(p_set_id int4, p_new_origin int4) is 330s 'moveSet(set_id, new_origin) 330s 330s Generate MOVE_SET event to request that the origin for set set_id be moved to node new_origin'; 330s COMMENT 330s create or replace function public.moveSet_int (p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) 330s returns int4 330s as $$ 330s declare 330s v_local_node_id int4; 330s v_tab_row record; 330s v_sub_row record; 330s v_sub_node int4; 330s v_sub_last int4; 330s v_sub_next int4; 330s v_last_sync int8; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Get our local node ID 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s 330s -- On the new origin, raise an event - ACCEPT_SET 330s if v_local_node_id = p_new_origin then 330s -- Create a SYNC event as well so that the ACCEPT_SET has 330s -- the same snapshot as the last SYNC generated by the new 330s -- origin. This snapshot will be used by other nodes to 330s -- finalize the setsync status. 330s perform public.createEvent('_main', 'SYNC', NULL); 330s perform public.createEvent('_main', 'ACCEPT_SET', 330s p_set_id::text, p_old_origin::text, 330s p_new_origin::text, p_wait_seqno::text); 330s end if; 330s 330s -- ---- 330s -- Next we have to reverse the subscription path 330s -- ---- 330s v_sub_last = p_new_origin; 330s select sub_provider into v_sub_node 330s from public.sl_subscribe 330s where sub_set = p_set_id 330s and sub_receiver = p_new_origin; 330s if not found then 330s raise exception 'Slony-I: subscription path broken in moveSet_int'; 330s end if; 330s while v_sub_node <> p_old_origin loop 330s -- ---- 330s -- Tracing node by node, the old receiver is now in 330s -- v_sub_last and the old provider is in v_sub_node. 330s -- ---- 330s 330s -- ---- 330s -- Get the current provider of this node as next 330s -- and change the provider to the previous one in 330s -- the reverse chain. 330s -- ---- 330s select sub_provider into v_sub_next 330s from public.sl_subscribe 330s where sub_set = p_set_id 330s and sub_receiver = v_sub_node 330s for update; 330s if not found then 330s raise exception 'Slony-I: subscription path broken in moveSet_int'; 330s end if; 330s update public.sl_subscribe 330s set sub_provider = v_sub_last 330s where sub_set = p_set_id 330s and sub_receiver = v_sub_node 330s and sub_receiver <> v_sub_last; 330s 330s v_sub_last = v_sub_node; 330s v_sub_node = v_sub_next; 330s end loop; 330s 330s -- ---- 330s -- This includes creating a subscription for the old origin 330s -- ---- 330s insert into public.sl_subscribe 330s (sub_set, sub_provider, sub_receiver, 330s sub_forward, sub_active) 330s values (p_set_id, v_sub_last, p_old_origin, true, true); 330s if v_local_node_id = p_old_origin then 330s select coalesce(max(ev_seqno), 0) into v_last_sync 330s from public.sl_event 330s where ev_origin = p_new_origin 330s and ev_type = 'SYNC'; 330s if v_last_sync > 0 then 330s insert into public.sl_setsync 330s (ssy_setid, ssy_origin, ssy_seqno, 330s ssy_snapshot, ssy_action_list) 330s select p_set_id, p_new_origin, v_last_sync, 330s ev_snapshot, NULL 330s from public.sl_event 330s where ev_origin = p_new_origin 330s and ev_seqno = v_last_sync; 330s else 330s insert into public.sl_setsync 330s (ssy_setid, ssy_origin, ssy_seqno, 330s ssy_snapshot, ssy_action_list) 330s values (p_set_id, p_new_origin, '0', 330s '1:1:', NULL); 330s end if; 330s end if; 330s 330s -- ---- 330s -- Now change the ownership of the set. 330s -- ---- 330s update public.sl_set 330s set set_origin = p_new_origin 330s where set_id = p_set_id; 330s 330s -- ---- 330s -- On the new origin, delete the obsolete setsync information 330s -- and the subscription. 330s -- ---- 330s if v_local_node_id = p_new_origin then 330s delete from public.sl_setsync 330s where ssy_setid = p_set_id; 330s else 330s if v_local_node_id <> p_old_origin then 330s -- 330s -- On every other node, change the setsync so that it will 330s -- pick up from the new origins last known sync. 330s -- 330s delete from public.sl_setsync 330s where ssy_setid = p_set_id; 330s select coalesce(max(ev_seqno), 0) into v_last_sync 330s from public.sl_event 330s where ev_origin = p_new_origin 330s and ev_type = 'SYNC'; 330s if v_last_sync > 0 then 330s insert into public.sl_setsync 330s (ssy_setid, ssy_origin, ssy_seqno, 330s ssy_snapshot, ssy_action_list) 330s select p_set_id, p_new_origin, v_last_sync, 330s ev_snapshot, NULL 330s from public.sl_event 330s where ev_origin = p_new_origin 330s and ev_seqno = v_last_sync; 330s else 330s insert into public.sl_setsync 330s (ssy_setid, ssy_origin, ssy_seqno, 330s ssy_snapshot, ssy_action_list) 330s values (p_set_id, p_new_origin, 330s '0', '1:1:', NULL); 330s end if; 330s end if; 330s end if; 330s delete from public.sl_subscribe 330s where sub_set = p_set_id 330s and sub_receiver = p_new_origin; 330s 330s -- Regenerate sl_listen since we revised the subscriptions 330s perform public.RebuildListenEntries(); 330s 330s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 330s perform public.addPartialLogIndices(); 330s 330s -- ---- 330s -- If we are the new or old origin, we have to 330s -- adjust the log and deny access trigger configuration. 330s -- ---- 330s if v_local_node_id = p_old_origin or v_local_node_id = p_new_origin then 330s for v_tab_row in select tab_id from public.sl_table 330s where tab_set = p_set_id 330s order by tab_id 330s loop 330s perform public.alterTableConfigureTriggers(v_tab_row.tab_id); 330s end loop; 330s end if; 330s 330s return p_set_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.moveSet_int(p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) is 330s 'moveSet(set_id, old_origin, new_origin, wait_seqno) 330s 330s Process MOVE_SET event to request that the origin for set set_id be 330s moved from old_origin to node new_origin'; 330s COMMENT 330s create or replace function public.dropSet (p_set_id int4) 330s returns bigint 330s as $$ 330s declare 330s v_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that the set exists and originates here 330s -- ---- 330s select set_origin into v_origin from public.sl_set 330s where set_id = p_set_id; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_set_id; 330s end if; 330s if v_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: set % does not originate on local node', 330s p_set_id; 330s end if; 330s 330s -- ---- 330s -- Call the internal drop set functionality and generate the event 330s -- ---- 330s perform public.dropSet_int(p_set_id); 330s return public.createEvent('_main', 'DROP_SET', 330s p_set_id::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropSet(p_set_id int4) is 330s 'Generate DROP_SET event to drop replication of set set_id'; 330s COMMENT 330s create or replace function public.dropSet_int (p_set_id int4) 330s returns int4 330s as $$ 330s declare 330s v_tab_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Restore all tables original triggers and rules and remove 330s -- our replication stuff. 330s -- ---- 330s for v_tab_row in select tab_id from public.sl_table 330s where tab_set = p_set_id 330s order by tab_id 330s loop 330s perform public.alterTableDropTriggers(v_tab_row.tab_id); 330s end loop; 330s 330s -- ---- 330s -- Remove all traces of the set configuration 330s -- ---- 330s delete from public.sl_sequence 330s where seq_set = p_set_id; 330s delete from public.sl_table 330s where tab_set = p_set_id; 330s delete from public.sl_subscribe 330s where sub_set = p_set_id; 330s delete from public.sl_setsync 330s where ssy_setid = p_set_id; 330s delete from public.sl_set 330s where set_id = p_set_id; 330s 330s -- Regenerate sl_listen since we revised the subscriptions 330s perform public.RebuildListenEntries(); 330s 330s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 330s perform public.addPartialLogIndices(); 330s 330s return p_set_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.dropSet(p_set_id int4) is 330s 'Process DROP_SET event to drop replication of set set_id. This involves: 330s - Removing log and deny access triggers 330s - Removing all traces of the set configuration, including sequences, tables, subscribers, syncs, and the set itself'; 330s COMMENT 330s create or replace function public.mergeSet (p_set_id int4, p_add_id int4) 330s returns bigint 330s as $$ 330s declare 330s v_origin int4; 330s in_progress boolean; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that both sets exist and originate here 330s -- ---- 330s if p_set_id = p_add_id then 330s raise exception 'Slony-I: merged set ids cannot be identical'; 330s end if; 330s select set_origin into v_origin from public.sl_set 330s where set_id = p_set_id; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_set_id; 330s end if; 330s if v_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: set % does not originate on local node', 330s p_set_id; 330s end if; 330s 330s select set_origin into v_origin from public.sl_set 330s where set_id = p_add_id; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_add_id; 330s end if; 330s if v_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: set % does not originate on local node', 330s p_add_id; 330s end if; 330s 330s -- ---- 330s -- Check that both sets are subscribed by the same set of nodes 330s -- ---- 330s if exists (select true from public.sl_subscribe SUB1 330s where SUB1.sub_set = p_set_id 330s and SUB1.sub_receiver not in (select SUB2.sub_receiver 330s from public.sl_subscribe SUB2 330s where SUB2.sub_set = p_add_id)) 330s then 330s raise exception 'Slony-I: subscriber lists of set % and % are different', 330s p_set_id, p_add_id; 330s end if; 330s 330s if exists (select true from public.sl_subscribe SUB1 330s where SUB1.sub_set = p_add_id 330s and SUB1.sub_receiver not in (select SUB2.sub_receiver 330s from public.sl_subscribe SUB2 330s where SUB2.sub_set = p_set_id)) 330s then 330s raise exception 'Slony-I: subscriber lists of set % and % are different', 330s p_add_id, p_set_id; 330s end if; 330s 330s -- ---- 330s -- Check that all ENABLE_SUBSCRIPTION events for the set are confirmed 330s -- ---- 330s select public.isSubscriptionInProgress(p_add_id) into in_progress ; 330s 330s if in_progress then 330s raise exception 'Slony-I: set % has subscriptions in progress - cannot merge', 330s p_add_id; 330s end if; 330s 330s -- ---- 330s -- Create a SYNC event, merge the sets, create a MERGE_SET event 330s -- ---- 330s perform public.createEvent('_main', 'SYNC', NULL); 330s perform public.mergeSet_int(p_set_id, p_add_id); 330s return public.createEvent('_main', 'MERGE_SET', 330s p_set_id::text, p_add_id::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.mergeSet(p_set_id int4, p_add_id int4) is 330s 'Generate MERGE_SET event to request that sets be merged together. 330s 330s Both sets must exist, and originate on the same node. They must be 330s subscribed by the same set of nodes.'; 330s COMMENT 330s create or replace function public.isSubscriptionInProgress(p_add_id int4) 330s returns boolean 330s as $$ 330s begin 330s if exists (select true from public.sl_event 330s where ev_type = 'ENABLE_SUBSCRIPTION' 330s and ev_data1 = p_add_id::text 330s and ev_seqno > (select max(con_seqno) from public.sl_confirm 330s where con_origin = ev_origin 330s and con_received::text = ev_data3)) 330s then 330s return true; 330s else 330s return false; 330s end if; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.isSubscriptionInProgress(p_add_id int4) is 330s 'Checks to see if a subscription for the indicated set is in progress. 330s Returns true if a subscription is in progress. Otherwise false'; 330s COMMENT 330s create or replace function public.mergeSet_int (p_set_id int4, p_add_id int4) 330s returns int4 330s as $$ 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s update public.sl_sequence 330s set seq_set = p_set_id 330s where seq_set = p_add_id; 330s update public.sl_table 330s set tab_set = p_set_id 330s where tab_set = p_add_id; 330s delete from public.sl_subscribe 330s where sub_set = p_add_id; 330s delete from public.sl_setsync 330s where ssy_setid = p_add_id; 330s delete from public.sl_set 330s where set_id = p_add_id; 330s 330s return p_set_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.mergeSet_int(p_set_id int4, p_add_id int4) is 330s 'mergeSet_int(set_id, add_id) - Perform MERGE_SET event, merging all objects from 330s set add_id into set set_id.'; 330s COMMENT 330s create or replace function public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 330s returns bigint 330s as $$ 330s declare 330s v_set_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that we are the origin of the set 330s -- ---- 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = p_set_id; 330s if not found then 330s raise exception 'Slony-I: setAddTable(): set % not found', p_set_id; 330s end if; 330s if v_set_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: setAddTable(): set % has remote origin', p_set_id; 330s end if; 330s 330s if exists (select true from public.sl_subscribe 330s where sub_set = p_set_id) 330s then 330s raise exception 'Slony-I: cannot add table to currently subscribed set % - must attach to an unsubscribed set', 330s p_set_id; 330s end if; 330s 330s -- ---- 330s -- Add the table to the set and generate the SET_ADD_TABLE event 330s -- ---- 330s perform public.setAddTable_int(p_set_id, p_tab_id, p_fqname, 330s p_tab_idxname, p_tab_comment); 330s return public.createEvent('_main', 'SET_ADD_TABLE', 330s p_set_id::text, p_tab_id::text, p_fqname::text, 330s p_tab_idxname::text, p_tab_comment::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 330s 'setAddTable (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 330s 330s Add table tab_fqname to replication set on origin node, and generate 330s SET_ADD_TABLE event to allow this to propagate to other nodes. 330s 330s Note that the table id, tab_id, must be unique ACROSS ALL SETS.'; 330s COMMENT 330s create or replace function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 330s returns int4 330s as $$ 330s declare 330s v_tab_relname name; 330s v_tab_nspname name; 330s v_local_node_id int4; 330s v_set_origin int4; 330s v_sub_provider int4; 330s v_relkind char; 330s v_tab_reloid oid; 330s v_pkcand_nn boolean; 330s v_prec record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- For sets with a remote origin, check that we are subscribed 330s -- to that set. Otherwise we ignore the table because it might 330s -- not even exist in our database. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = p_set_id; 330s if not found then 330s raise exception 'Slony-I: setAddTable_int(): set % not found', 330s p_set_id; 330s end if; 330s if v_set_origin != v_local_node_id then 330s select sub_provider into v_sub_provider 330s from public.sl_subscribe 330s where sub_set = p_set_id 330s and sub_receiver = public.getLocalNodeId('_main'); 330s if not found then 330s return 0; 330s end if; 330s end if; 330s 330s -- ---- 330s -- Get the tables OID and check that it is a real table 330s -- ---- 330s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname into v_tab_reloid, v_relkind, v_tab_relname, v_tab_nspname 330s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where PGC.relnamespace = PGN.oid 330s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 330s '.' || public.slon_quote_brute(PGC.relname); 330s if not found then 330s raise exception 'Slony-I: setAddTable_int(): table % not found', 330s p_fqname; 330s end if; 330s if v_relkind != 'r' then 330s raise exception 'Slony-I: setAddTable_int(): % is not a regular table', 330s p_fqname; 330s end if; 330s 330s if not exists (select indexrelid 330s from "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGC 330s where PGX.indrelid = v_tab_reloid 330s and PGX.indexrelid = PGC.oid 330s and PGC.relname = p_tab_idxname) 330s then 330s raise exception 'Slony-I: setAddTable_int(): table % has no index %', 330s p_fqname, p_tab_idxname; 330s end if; 330s 330s -- ---- 330s -- Verify that the columns in the PK (or candidate) are not NULLABLE 330s -- ---- 330s 330s v_pkcand_nn := 'f'; 330s for v_prec in select attname from "pg_catalog".pg_attribute where attrelid = 330s (select oid from "pg_catalog".pg_class where oid = v_tab_reloid) 330s and attname in (select attname from "pg_catalog".pg_attribute where 330s attrelid = (select oid from "pg_catalog".pg_class PGC, 330s "pg_catalog".pg_index PGX where 330s PGC.relname = p_tab_idxname and PGX.indexrelid=PGC.oid and 330s PGX.indrelid = v_tab_reloid)) and attnotnull <> 't' 330s loop 330s raise notice 'Slony-I: setAddTable_int: table % PK column % nullable', p_fqname, v_prec.attname; 330s v_pkcand_nn := 't'; 330s end loop; 330s if v_pkcand_nn then 330s raise exception 'Slony-I: setAddTable_int: table % not replicable!', p_fqname; 330s end if; 330s 330s select * into v_prec from public.sl_table where tab_id = p_tab_id; 330s if not found then 330s v_pkcand_nn := 't'; -- No-op -- All is well 330s else 330s raise exception 'Slony-I: setAddTable_int: table id % has already been assigned!', p_tab_id; 330s end if; 330s 330s -- ---- 330s -- Add the table to sl_table and create the trigger on it. 330s -- ---- 330s insert into public.sl_table 330s (tab_id, tab_reloid, tab_relname, tab_nspname, 330s tab_set, tab_idxname, tab_altered, tab_comment) 330s values 330s (p_tab_id, v_tab_reloid, v_tab_relname, v_tab_nspname, 330s p_set_id, p_tab_idxname, false, p_tab_comment); 330s perform public.alterTableAddTriggers(p_tab_id); 330s 330s return p_tab_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 330s 'setAddTable_int (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 330s 330s This function processes the SET_ADD_TABLE event on remote nodes, 330s adding a table to replication if the remote node is subscribing to its 330s replication set.'; 330s COMMENT 330s create or replace function public.setDropTable(p_tab_id int4) 330s returns bigint 330s as $$ 330s declare 330s v_set_id int4; 330s v_set_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Determine the set_id 330s -- ---- 330s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 330s 330s -- ---- 330s -- Ensure table exists 330s -- ---- 330s if not found then 330s raise exception 'Slony-I: setDropTable_int(): table % not found', 330s p_tab_id; 330s end if; 330s 330s -- ---- 330s -- Check that we are the origin of the set 330s -- ---- 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = v_set_id; 330s if not found then 330s raise exception 'Slony-I: setDropTable(): set % not found', v_set_id; 330s end if; 330s if v_set_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: setDropTable(): set % has remote origin', v_set_id; 330s end if; 330s 330s -- ---- 330s -- Drop the table from the set and generate the SET_ADD_TABLE event 330s -- ---- 330s perform public.setDropTable_int(p_tab_id); 330s return public.createEvent('_main', 'SET_DROP_TABLE', 330s p_tab_id::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setDropTable(p_tab_id int4) is 330s 'setDropTable (tab_id) 330s 330s Drop table tab_id from set on origin node, and generate SET_DROP_TABLE 330s event to allow this to propagate to other nodes.'; 330s COMMENT 330s create or replace function public.setDropTable_int(p_tab_id int4) 330s returns int4 330s as $$ 330s declare 330s v_set_id int4; 330s v_local_node_id int4; 330s v_set_origin int4; 330s v_sub_provider int4; 330s v_tab_reloid oid; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Determine the set_id 330s -- ---- 330s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 330s 330s -- ---- 330s -- Ensure table exists 330s -- ---- 330s if not found then 330s return 0; 330s end if; 330s 330s -- ---- 330s -- For sets with a remote origin, check that we are subscribed 330s -- to that set. Otherwise we ignore the table because it might 330s -- not even exist in our database. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = v_set_id; 330s if not found then 330s raise exception 'Slony-I: setDropTable_int(): set % not found', 330s v_set_id; 330s end if; 330s if v_set_origin != v_local_node_id then 330s select sub_provider into v_sub_provider 330s from public.sl_subscribe 330s where sub_set = v_set_id 330s and sub_receiver = public.getLocalNodeId('_main'); 330s if not found then 330s return 0; 330s end if; 330s end if; 330s 330s -- ---- 330s -- Drop the table from sl_table and drop trigger from it. 330s -- ---- 330s perform public.alterTableDropTriggers(p_tab_id); 330s delete from public.sl_table where tab_id = p_tab_id; 330s return p_tab_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setDropTable_int(p_tab_id int4) is 330s 'setDropTable_int (tab_id) 330s 330s This function processes the SET_DROP_TABLE event on remote nodes, 330s dropping a table from replication if the remote node is subscribing to 330s its replication set.'; 330s COMMENT 330s create or replace function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 330s returns bigint 330s as $$ 330s declare 330s v_set_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that we are the origin of the set 330s -- ---- 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = p_set_id; 330s if not found then 330s raise exception 'Slony-I: setAddSequence(): set % not found', p_set_id; 330s end if; 330s if v_set_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: setAddSequence(): set % has remote origin - submit to origin node', p_set_id; 330s end if; 330s 330s if exists (select true from public.sl_subscribe 330s where sub_set = p_set_id) 330s then 330s raise exception 'Slony-I: cannot add sequence to currently subscribed set %', 330s p_set_id; 330s end if; 330s 330s -- ---- 330s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 330s -- ---- 330s perform public.setAddSequence_int(p_set_id, p_seq_id, p_fqname, 330s p_seq_comment); 330s return public.createEvent('_main', 'SET_ADD_SEQUENCE', 330s p_set_id::text, p_seq_id::text, 330s p_fqname::text, p_seq_comment::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 330s 'setAddSequence (set_id, seq_id, seq_fqname, seq_comment) 330s 330s On the origin node for set set_id, add sequence seq_fqname to the 330s replication set, and raise SET_ADD_SEQUENCE to cause this to replicate 330s to subscriber nodes.'; 330s COMMENT 330s create or replace function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 330s returns int4 330s as $$ 330s declare 330s v_local_node_id int4; 330s v_set_origin int4; 330s v_sub_provider int4; 330s v_relkind char; 330s v_seq_reloid oid; 330s v_seq_relname name; 330s v_seq_nspname name; 330s v_sync_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- For sets with a remote origin, check that we are subscribed 330s -- to that set. Otherwise we ignore the sequence because it might 330s -- not even exist in our database. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = p_set_id; 330s if not found then 330s raise exception 'Slony-I: setAddSequence_int(): set % not found', 330s p_set_id; 330s end if; 330s if v_set_origin != v_local_node_id then 330s select sub_provider into v_sub_provider 330s from public.sl_subscribe 330s where sub_set = p_set_id 330s and sub_receiver = public.getLocalNodeId('_main'); 330s if not found then 330s return 0; 330s end if; 330s end if; 330s 330s -- ---- 330s -- Get the sequences OID and check that it is a sequence 330s -- ---- 330s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname 330s into v_seq_reloid, v_relkind, v_seq_relname, v_seq_nspname 330s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where PGC.relnamespace = PGN.oid 330s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 330s '.' || public.slon_quote_brute(PGC.relname); 330s if not found then 330s raise exception 'Slony-I: setAddSequence_int(): sequence % not found', 330s p_fqname; 330s end if; 330s if v_relkind != 'S' then 330s raise exception 'Slony-I: setAddSequence_int(): % is not a sequence', 330s p_fqname; 330s end if; 330s 330s select 1 into v_sync_row from public.sl_sequence where seq_id = p_seq_id; 330s if not found then 330s v_relkind := 'o'; -- all is OK 330s else 330s raise exception 'Slony-I: setAddSequence_int(): sequence ID % has already been assigned', p_seq_id; 330s end if; 330s 330s -- ---- 330s -- Add the sequence to sl_sequence 330s -- ---- 330s insert into public.sl_sequence 330s (seq_id, seq_reloid, seq_relname, seq_nspname, seq_set, seq_comment) 330s values 330s (p_seq_id, v_seq_reloid, v_seq_relname, v_seq_nspname, p_set_id, p_seq_comment); 330s 330s -- ---- 330s -- On the set origin, fake a sl_seqlog row for the last sync event 330s -- ---- 330s if v_set_origin = v_local_node_id then 330s for v_sync_row in select coalesce (max(ev_seqno), 0) as ev_seqno 330s from public.sl_event 330s where ev_origin = v_local_node_id 330s and ev_type = 'SYNC' 330s loop 330s insert into public.sl_seqlog 330s (seql_seqid, seql_origin, seql_ev_seqno, 330s seql_last_value) values 330s (p_seq_id, v_local_node_id, v_sync_row.ev_seqno, 330s public.sequenceLastValue(p_fqname)); 330s end loop; 330s end if; 330s 330s return p_seq_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 330s 'setAddSequence_int (set_id, seq_id, seq_fqname, seq_comment) 330s 330s This processes the SET_ADD_SEQUENCE event. On remote nodes that 330s subscribe to set_id, add the sequence to the replication set.'; 330s COMMENT 330s create or replace function public.setDropSequence (p_seq_id int4) 330s returns bigint 330s as $$ 330s declare 330s v_set_id int4; 330s v_set_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Determine set id for this sequence 330s -- ---- 330s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 330s 330s -- ---- 330s -- Ensure sequence exists 330s -- ---- 330s if not found then 330s raise exception 'Slony-I: setDropSequence_int(): sequence % not found', 330s p_seq_id; 330s end if; 330s 330s -- ---- 330s -- Check that we are the origin of the set 330s -- ---- 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = v_set_id; 330s if not found then 330s raise exception 'Slony-I: setDropSequence(): set % not found', v_set_id; 330s end if; 330s if v_set_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: setDropSequence(): set % has origin at another node - submit this to that node', v_set_id; 330s end if; 330s 330s -- ---- 330s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 330s -- ---- 330s perform public.setDropSequence_int(p_seq_id); 330s return public.createEvent('_main', 'SET_DROP_SEQUENCE', 330s p_seq_id::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setDropSequence (p_seq_id int4) is 330s 'setDropSequence (seq_id) 330s 330s On the origin node for the set, drop sequence seq_id from replication 330s set, and raise SET_DROP_SEQUENCE to cause this to replicate to 330s subscriber nodes.'; 330s COMMENT 330s create or replace function public.setDropSequence_int(p_seq_id int4) 330s returns int4 330s as $$ 330s declare 330s v_set_id int4; 330s v_local_node_id int4; 330s v_set_origin int4; 330s v_sub_provider int4; 330s v_relkind char; 330s v_sync_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Determine set id for this sequence 330s -- ---- 330s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 330s 330s -- ---- 330s -- Ensure sequence exists 330s -- ---- 330s if not found then 330s return 0; 330s end if; 330s 330s -- ---- 330s -- For sets with a remote origin, check that we are subscribed 330s -- to that set. Otherwise we ignore the sequence because it might 330s -- not even exist in our database. 330s -- ---- 330s v_local_node_id := public.getLocalNodeId('_main'); 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = v_set_id; 330s if not found then 330s raise exception 'Slony-I: setDropSequence_int(): set % not found', 330s v_set_id; 330s end if; 330s if v_set_origin != v_local_node_id then 330s select sub_provider into v_sub_provider 330s from public.sl_subscribe 330s where sub_set = v_set_id 330s and sub_receiver = public.getLocalNodeId('_main'); 330s if not found then 330s return 0; 330s end if; 330s end if; 330s 330s -- ---- 330s -- drop the sequence from sl_sequence, sl_seqlog 330s -- ---- 330s delete from public.sl_seqlog where seql_seqid = p_seq_id; 330s delete from public.sl_sequence where seq_id = p_seq_id; 330s 330s return p_seq_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setDropSequence_int(p_seq_id int4) is 330s 'setDropSequence_int (seq_id) 330s 330s This processes the SET_DROP_SEQUENCE event. On remote nodes that 330s subscribe to the set containing sequence seq_id, drop the sequence 330s from the replication set.'; 330s COMMENT 330s create or replace function public.setMoveTable (p_tab_id int4, p_new_set_id int4) 330s returns bigint 330s as $$ 330s declare 330s v_old_set_id int4; 330s v_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Get the tables current set 330s -- ---- 330s select tab_set into v_old_set_id from public.sl_table 330s where tab_id = p_tab_id; 330s if not found then 330s raise exception 'Slony-I: table %d not found', p_tab_id; 330s end if; 330s 330s -- ---- 330s -- Check that both sets exist and originate here 330s -- ---- 330s if p_new_set_id = v_old_set_id then 330s raise exception 'Slony-I: set ids cannot be identical'; 330s end if; 330s select set_origin into v_origin from public.sl_set 330s where set_id = p_new_set_id; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_new_set_id; 330s end if; 330s if v_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: set % does not originate on local node', 330s p_new_set_id; 330s end if; 330s 330s select set_origin into v_origin from public.sl_set 330s where set_id = v_old_set_id; 330s if not found then 330s raise exception 'Slony-I: set % not found', v_old_set_id; 330s end if; 330s if v_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: set % does not originate on local node', 330s v_old_set_id; 330s end if; 330s 330s -- ---- 330s -- Check that both sets are subscribed by the same set of nodes 330s -- ---- 330s if exists (select true from public.sl_subscribe SUB1 330s where SUB1.sub_set = p_new_set_id 330s and SUB1.sub_receiver not in (select SUB2.sub_receiver 330s from public.sl_subscribe SUB2 330s where SUB2.sub_set = v_old_set_id)) 330s then 330s raise exception 'Slony-I: subscriber lists of set % and % are different', 330s p_new_set_id, v_old_set_id; 330s end if; 330s 330s if exists (select true from public.sl_subscribe SUB1 330s where SUB1.sub_set = v_old_set_id 330s and SUB1.sub_receiver not in (select SUB2.sub_receiver 330s from public.sl_subscribe SUB2 330s where SUB2.sub_set = p_new_set_id)) 330s then 330s raise exception 'Slony-I: subscriber lists of set % and % are different', 330s v_old_set_id, p_new_set_id; 330s end if; 330s 330s -- ---- 330s -- Change the set the table belongs to 330s -- ---- 330s perform public.createEvent('_main', 'SYNC', NULL); 330s perform public.setMoveTable_int(p_tab_id, p_new_set_id); 330s return public.createEvent('_main', 'SET_MOVE_TABLE', 330s p_tab_id::text, p_new_set_id::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 330s 'This generates the SET_MOVE_TABLE event. If the set that the table is 330s in is identically subscribed to the set that the table is to be moved 330s into, then the SET_MOVE_TABLE event is raised.'; 330s COMMENT 330s create or replace function public.setMoveTable_int (p_tab_id int4, p_new_set_id int4) 330s returns int4 330s as $$ 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Move the table to the new set 330s -- ---- 330s update public.sl_table 330s set tab_set = p_new_set_id 330s where tab_id = p_tab_id; 330s 330s return p_tab_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 330s 'This processes the SET_MOVE_TABLE event. The table is moved 330s to the destination set.'; 330s COMMENT 330s create or replace function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) 330s returns bigint 330s as $$ 330s declare 330s v_old_set_id int4; 330s v_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Get the sequences current set 330s -- ---- 330s select seq_set into v_old_set_id from public.sl_sequence 330s where seq_id = p_seq_id; 330s if not found then 330s raise exception 'Slony-I: setMoveSequence(): sequence %d not found', p_seq_id; 330s end if; 330s 330s -- ---- 330s -- Check that both sets exist and originate here 330s -- ---- 330s if p_new_set_id = v_old_set_id then 330s raise exception 'Slony-I: setMoveSequence(): set ids cannot be identical'; 330s end if; 330s select set_origin into v_origin from public.sl_set 330s where set_id = p_new_set_id; 330s if not found then 330s raise exception 'Slony-I: setMoveSequence(): set % not found', p_new_set_id; 330s end if; 330s if v_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: setMoveSequence(): set % does not originate on local node', 330s p_new_set_id; 330s end if; 330s 330s select set_origin into v_origin from public.sl_set 330s where set_id = v_old_set_id; 330s if not found then 330s raise exception 'Slony-I: set % not found', v_old_set_id; 330s end if; 330s if v_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: set % does not originate on local node', 330s v_old_set_id; 330s end if; 330s 330s -- ---- 330s -- Check that both sets are subscribed by the same set of nodes 330s -- ---- 330s if exists (select true from public.sl_subscribe SUB1 330s where SUB1.sub_set = p_new_set_id 330s and SUB1.sub_receiver not in (select SUB2.sub_receiver 330s from public.sl_subscribe SUB2 330s where SUB2.sub_set = v_old_set_id)) 330s then 330s raise exception 'Slony-I: subscriber lists of set % and % are different', 330s p_new_set_id, v_old_set_id; 330s end if; 330s 330s if exists (select true from public.sl_subscribe SUB1 330s where SUB1.sub_set = v_old_set_id 330s and SUB1.sub_receiver not in (select SUB2.sub_receiver 330s from public.sl_subscribe SUB2 330s where SUB2.sub_set = p_new_set_id)) 330s then 330s raise exception 'Slony-I: subscriber lists of set % and % are different', 330s v_old_set_id, p_new_set_id; 330s end if; 330s 330s -- ---- 330s -- Change the set the sequence belongs to 330s -- ---- 330s perform public.setMoveSequence_int(p_seq_id, p_new_set_id); 330s return public.createEvent('_main', 'SET_MOVE_SEQUENCE', 330s p_seq_id::text, p_new_set_id::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) is 330s 'setMoveSequence(p_seq_id, p_new_set_id) - This generates the 330s SET_MOVE_SEQUENCE event, after validation, notably that both sets 330s exist, are distinct, and have exactly the same subscription lists'; 330s COMMENT 330s create or replace function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) 330s returns int4 330s as $$ 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Move the sequence to the new set 330s -- ---- 330s update public.sl_sequence 330s set seq_set = p_new_set_id 330s where seq_id = p_seq_id; 330s 330s return p_seq_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) is 330s 'setMoveSequence_int(p_seq_id, p_new_set_id) - processes the 330s SET_MOVE_SEQUENCE event, moving a sequence to another replication 330s set.'; 330s COMMENT 330s create or replace function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) returns int4 330s as $$ 330s declare 330s v_fqname text; 330s v_found integer; 330s begin 330s -- ---- 330s -- Get the sequences fully qualified name 330s -- ---- 330s select public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) into v_fqname 330s from public.sl_sequence SQ, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where SQ.seq_id = p_seq_id 330s and SQ.seq_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid; 330s if not found then 330s if p_ignore_missing then 330s return null; 330s end if; 330s raise exception 'Slony-I: sequenceSetValue(): sequence % not found', p_seq_id; 330s end if; 330s 330s -- ---- 330s -- Update it to the new value 330s -- ---- 330s execute 'select setval(''' || v_fqname || 330s ''', ' || p_last_value::text || ')'; 330s 330s if p_ev_seqno is not null then 330s insert into public.sl_seqlog 330s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 330s values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value); 330s end if; 330s return p_seq_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) is 330s 'sequenceSetValue (seq_id, seq_origin, ev_seqno, last_value,ignore_missing) 330s Set sequence seq_id to have new value last_value. 330s '; 330s COMMENT 330s drop function if exists public.ddlCapture (p_statement text, p_nodes text); 330s DROP FUNCTION 330s create or replace function public.ddlCapture (p_statement text, p_nodes text) 330s returns bigint 330s as $$ 330s declare 330s c_local_node integer; 330s c_found_origin boolean; 330s c_node text; 330s c_cmdargs text[]; 330s c_nodeargs text; 330s c_delim text; 330s begin 330s c_local_node := public.getLocalNodeId('_main'); 330s 330s c_cmdargs = array_append('{}'::text[], p_statement); 330s c_nodeargs = ''; 330s if p_nodes is not null then 330s c_found_origin := 'f'; 330s -- p_nodes list needs to consist of a list of nodes that exist 330s -- and that include the current node ID 330s for c_node in select trim(node) from 330s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 330s if not exists 330s (select 1 from public.sl_node 330s where no_id = (c_node::integer)) then 330s raise exception 'ddlcapture(%,%) - node % does not exist!', 330s p_statement, p_nodes, c_node; 330s end if; 330s 330s if c_local_node = (c_node::integer) then 330s c_found_origin := 't'; 330s end if; 330s if length(c_nodeargs)>0 then 330s c_nodeargs = c_nodeargs ||','|| c_node; 330s else 330s c_nodeargs=c_node; 330s end if; 330s end loop; 330s 330s if not c_found_origin then 330s raise exception 330s 'ddlcapture(%,%) - origin node % not included in ONLY ON list!', 330s p_statement, p_nodes, c_local_node; 330s end if; 330s end if; 330s c_cmdargs = array_append(c_cmdargs,c_nodeargs); 330s c_delim=','; 330s c_cmdargs = array_append(c_cmdargs, 330s 330s (select public.string_agg( seq_id::text || c_delim 330s || c_local_node || 330s c_delim || seq_last_value) 330s FROM ( 330s select seq_id, 330s seq_last_value from public.sl_seqlastvalue 330s where seq_origin = c_local_node) as FOO 330s where NOT public.seqtrack(seq_id,seq_last_value) is NULL)); 330s insert into public.sl_log_script 330s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 330s values 330s (c_local_node, pg_catalog.txid_current(), 330s nextval('public.sl_action_seq'), 'S', c_cmdargs); 330s execute p_statement; 330s return currval('public.sl_action_seq'); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.ddlCapture (p_statement text, p_nodes text) is 330s 'Capture an SQL statement (usually DDL) that is to be literally replayed on subscribers'; 330s COMMENT 330s drop function if exists public.ddlScript_complete (int4, text, int4); 330s DROP FUNCTION 330s create or replace function public.ddlScript_complete (p_nodes text) 330s returns bigint 330s as $$ 330s declare 330s c_local_node integer; 330s c_found_origin boolean; 330s c_node text; 330s c_cmdargs text[]; 330s begin 330s c_local_node := public.getLocalNodeId('_main'); 330s 330s c_cmdargs = '{}'::text[]; 330s if p_nodes is not null then 330s c_found_origin := 'f'; 330s -- p_nodes list needs to consist o a list of nodes that exist 330s -- and that include the current node ID 330s for c_node in select trim(node) from 330s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 330s if not exists 330s (select 1 from public.sl_node 330s where no_id = (c_node::integer)) then 330s raise exception 'ddlcapture(%,%) - node % does not exist!', 330s p_statement, p_nodes, c_node; 330s end if; 330s 330s if c_local_node = (c_node::integer) then 330s c_found_origin := 't'; 330s end if; 330s 330s c_cmdargs = array_append(c_cmdargs, c_node); 330s end loop; 330s 330s if not c_found_origin then 330s raise exception 330s 'ddlScript_complete(%) - origin node % not included in ONLY ON list!', 330s p_nodes, c_local_node; 330s end if; 330s end if; 330s 330s perform public.ddlScript_complete_int(); 330s 330s insert into public.sl_log_script 330s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 330s values 330s (c_local_node, pg_catalog.txid_current(), 330s nextval('public.sl_action_seq'), 's', c_cmdargs); 330s 330s return currval('public.sl_action_seq'); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.ddlScript_complete(p_nodes text) is 330s 'ddlScript_complete(set_id, script, only_on_node) 330s 330s After script has run on origin, this fixes up relnames and 330s log trigger arguments and inserts the "fire ddlScript_complete_int() 330s log row into sl_log_script.'; 330s COMMENT 330s drop function if exists public.ddlScript_complete_int(int4, int4); 330s DROP FUNCTION 330s create or replace function public.ddlScript_complete_int () 330s returns int4 330s as $$ 330s begin 330s perform public.updateRelname(); 330s perform public.repair_log_triggers(true); 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.ddlScript_complete_int() is 330s 'ddlScript_complete_int() 330s 330s Complete processing the DDL_SCRIPT event.'; 330s COMMENT 330s create or replace function public.alterTableAddTriggers (p_tab_id int4) 330s returns int4 330s as $$ 330s declare 330s v_no_id int4; 330s v_tab_row record; 330s v_tab_fqname text; 330s v_tab_attkind text; 330s v_n int4; 330s v_trec record; 330s v_tgbad boolean; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Get our local node ID 330s -- ---- 330s v_no_id := public.getLocalNodeId('_main'); 330s 330s -- ---- 330s -- Get the sl_table row and the current origin of the table. 330s -- ---- 330s select T.tab_reloid, T.tab_set, T.tab_idxname, 330s S.set_origin, PGX.indexrelid, 330s public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) as tab_fqname 330s into v_tab_row 330s from public.sl_table T, public.sl_set S, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 330s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 330s where T.tab_id = p_tab_id 330s and T.tab_set = S.set_id 330s and T.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid 330s and PGX.indrelid = T.tab_reloid 330s and PGX.indexrelid = PGXC.oid 330s and PGXC.relname = T.tab_idxname 330s for update; 330s if not found then 330s raise exception 'Slony-I: alterTableAddTriggers(): Table with id % not found', p_tab_id; 330s end if; 330s v_tab_fqname = v_tab_row.tab_fqname; 330s 330s v_tab_attkind := public.determineAttKindUnique(v_tab_row.tab_fqname, 330s v_tab_row.tab_idxname); 330s 330s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 330s 330s -- ---- 330s -- Create the log and the deny access triggers 330s -- ---- 330s execute 'create trigger "_main_logtrigger"' || 330s ' after insert or update or delete on ' || 330s v_tab_fqname || ' for each row execute procedure public.logTrigger (' || 330s pg_catalog.quote_literal('_main') || ',' || 330s pg_catalog.quote_literal(p_tab_id::text) || ',' || 330s pg_catalog.quote_literal(v_tab_attkind) || ');'; 330s 330s execute 'create trigger "_main_denyaccess" ' || 330s 'before insert or update or delete on ' || 330s v_tab_fqname || ' for each row execute procedure ' || 330s 'public.denyAccess (' || pg_catalog.quote_literal('_main') || ');'; 330s 330s perform public.alterTableAddTruncateTrigger(v_tab_fqname, p_tab_id); 330s 330s perform public.alterTableConfigureTriggers (p_tab_id); 330s return p_tab_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.alterTableAddTriggers(p_tab_id int4) is 330s 'alterTableAddTriggers(tab_id) 330s 330s Adds the log and deny access triggers to a replicated table.'; 330s COMMENT 330s create or replace function public.alterTableDropTriggers (p_tab_id int4) 330s returns int4 330s as $$ 330s declare 330s v_no_id int4; 330s v_tab_row record; 330s v_tab_fqname text; 330s v_n int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Get our local node ID 330s -- ---- 330s v_no_id := public.getLocalNodeId('_main'); 330s 330s -- ---- 330s -- Get the sl_table row and the current tables origin. 330s -- ---- 330s select T.tab_reloid, T.tab_set, 330s S.set_origin, PGX.indexrelid, 330s public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) as tab_fqname 330s into v_tab_row 330s from public.sl_table T, public.sl_set S, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 330s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 330s where T.tab_id = p_tab_id 330s and T.tab_set = S.set_id 330s and T.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid 330s and PGX.indrelid = T.tab_reloid 330s and PGX.indexrelid = PGXC.oid 330s and PGXC.relname = T.tab_idxname 330s for update; 330s if not found then 330s raise exception 'Slony-I: alterTableDropTriggers(): Table with id % not found', p_tab_id; 330s end if; 330s v_tab_fqname = v_tab_row.tab_fqname; 330s 330s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 330s 330s -- ---- 330s -- Drop both triggers 330s -- ---- 330s execute 'drop trigger "_main_logtrigger" on ' || 330s v_tab_fqname; 330s 330s execute 'drop trigger "_main_denyaccess" on ' || 330s v_tab_fqname; 330s 330s perform public.alterTableDropTruncateTrigger(v_tab_fqname, p_tab_id); 330s 330s return p_tab_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.alterTableDropTriggers (p_tab_id int4) is 330s 'alterTableDropTriggers (tab_id) 330s 330s Remove the log and deny access triggers from a table.'; 330s COMMENT 330s create or replace function public.alterTableConfigureTriggers (p_tab_id int4) 330s returns int4 330s as $$ 330s declare 330s v_no_id int4; 330s v_tab_row record; 330s v_tab_fqname text; 330s v_n int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Get our local node ID 330s -- ---- 330s v_no_id := public.getLocalNodeId('_main'); 330s 330s -- ---- 330s -- Get the sl_table row and the current tables origin. 330s -- ---- 330s select T.tab_reloid, T.tab_set, 330s S.set_origin, PGX.indexrelid, 330s public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) as tab_fqname 330s into v_tab_row 330s from public.sl_table T, public.sl_set S, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 330s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 330s where T.tab_id = p_tab_id 330s and T.tab_set = S.set_id 330s and T.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid 330s and PGX.indrelid = T.tab_reloid 330s and PGX.indexrelid = PGXC.oid 330s and PGXC.relname = T.tab_idxname 330s for update; 330s if not found then 330s raise exception 'Slony-I: alterTableConfigureTriggers(): Table with id % not found', p_tab_id; 330s end if; 330s v_tab_fqname = v_tab_row.tab_fqname; 330s 330s -- ---- 330s -- Configuration depends on the origin of the table 330s -- ---- 330s if v_tab_row.set_origin = v_no_id then 330s -- ---- 330s -- On the origin the log trigger is configured like a default 330s -- user trigger and the deny access trigger is disabled. 330s -- ---- 330s execute 'alter table ' || v_tab_fqname || 330s ' enable trigger "_main_logtrigger"'; 330s execute 'alter table ' || v_tab_fqname || 330s ' disable trigger "_main_denyaccess"'; 330s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 330s 'enable', 'disable'); 330s else 330s -- ---- 330s -- On a replica the log trigger is disabled and the 330s -- deny access trigger fires in origin session role. 330s -- ---- 330s execute 'alter table ' || v_tab_fqname || 330s ' disable trigger "_main_logtrigger"'; 330s execute 'alter table ' || v_tab_fqname || 330s ' enable trigger "_main_denyaccess"'; 330s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 330s 'disable', 'enable'); 330s 330s end if; 330s 330s return p_tab_id; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.alterTableConfigureTriggers (p_tab_id int4) is 330s 'alterTableConfigureTriggers (tab_id) 330s 330s Set the enable/disable configuration for the replication triggers 330s according to the origin of the set.'; 330s COMMENT 330s create or replace function public.resubscribeNode (p_origin int4, 330s p_provider int4, p_receiver int4) 330s returns bigint 330s as $$ 330s declare 330s v_record record; 330s v_missing_sets text; 330s v_ev_seqno bigint; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- 330s -- Check that the receiver exists 330s -- 330s if not exists (select no_id from public.sl_node where no_id= 330s p_receiver) then 330s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_receiver; 330s end if; 330s 330s -- 330s -- Check that the provider exists 330s -- 330s if not exists (select no_id from public.sl_node where no_id= 330s p_provider) then 330s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_provider; 330s end if; 330s 330s 330s -- ---- 330s -- Check that this is called on the origin node 330s -- ---- 330s if p_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: subscribeSet() must be called on origin'; 330s end if; 330s 330s -- --- 330s -- Verify that the provider is either the origin or an active subscriber 330s -- Bug report #1362 330s -- --- 330s if p_origin <> p_provider then 330s for v_record in select sub1.sub_set from 330s public.sl_subscribe sub1 330s left outer join (public.sl_subscribe sub2 330s inner join 330s public.sl_set on ( 330s sl_set.set_id=sub2.sub_set 330s and sub2.sub_set=p_origin) 330s ) 330s ON ( sub1.sub_set = sub2.sub_set and 330s sub1.sub_receiver = p_provider and 330s sub1.sub_forward and sub1.sub_active 330s and sub2.sub_receiver=p_receiver) 330s 330s where sub2.sub_set is null 330s loop 330s v_missing_sets=v_missing_sets || ' ' || v_record.sub_set; 330s end loop; 330s if v_missing_sets is not null then 330s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, v_missing_sets; 330s end if; 330s end if; 330s 330s for v_record in select * from 330s public.sl_subscribe, public.sl_set where 330s sub_set=set_id and 330s sub_receiver=p_receiver 330s and set_origin=p_origin 330s loop 330s -- ---- 330s -- Create the SUBSCRIBE_SET event 330s -- ---- 330s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 330s v_record.sub_set::text, p_provider::text, p_receiver::text, 330s case v_record.sub_forward when true then 't' else 'f' end, 330s 'f' ); 330s 330s -- ---- 330s -- Call the internal procedure to store the subscription 330s -- ---- 330s perform public.subscribeSet_int(v_record.sub_set, 330s p_provider, 330s p_receiver, v_record.sub_forward, false); 330s end loop; 330s 330s return v_ev_seqno; 330s end; 330s $$ 330s language plpgsql; 330s CREATE FUNCTION 330s create or replace function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 330s returns bigint 330s as $$ 330s declare 330s v_set_origin int4; 330s v_ev_seqno int8; 330s v_ev_seqno2 int8; 330s v_rec record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- 330s -- Check that the receiver exists 330s -- 330s if not exists (select no_id from public.sl_node where no_id= 330s p_sub_receiver) then 330s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_sub_receiver; 330s end if; 330s 330s -- 330s -- Check that the provider exists 330s -- 330s if not exists (select no_id from public.sl_node where no_id= 330s p_sub_provider) then 330s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_sub_provider; 330s end if; 330s 330s -- ---- 330s -- Check that the origin and provider of the set are remote 330s -- ---- 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = p_sub_set; 330s if not found then 330s raise exception 'Slony-I: subscribeSet(): set % not found', p_sub_set; 330s end if; 330s if v_set_origin = p_sub_receiver then 330s raise exception 330s 'Slony-I: subscribeSet(): set origin and receiver cannot be identical'; 330s end if; 330s if p_sub_receiver = p_sub_provider then 330s raise exception 330s 'Slony-I: subscribeSet(): set provider and receiver cannot be identical'; 330s end if; 330s -- ---- 330s -- Check that this is called on the origin node 330s -- ---- 330s if v_set_origin != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: subscribeSet() must be called on origin'; 330s end if; 330s 330s -- --- 330s -- Verify that the provider is either the origin or an active subscriber 330s -- Bug report #1362 330s -- --- 330s if v_set_origin <> p_sub_provider then 330s if not exists (select 1 from public.sl_subscribe 330s where sub_set = p_sub_set and 330s sub_receiver = p_sub_provider and 330s sub_forward and sub_active) then 330s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, p_sub_set; 330s end if; 330s end if; 330s 330s -- --- 330s -- Enforce that all sets from one origin are subscribed 330s -- using the same data provider per receiver. 330s -- ---- 330s if not exists (select 1 from public.sl_subscribe 330s where sub_set = p_sub_set and sub_receiver = p_sub_receiver) then 330s -- 330s -- New subscription - error out if we have any other subscription 330s -- from that origin with a different data provider. 330s -- 330s for v_rec in select sub_provider from public.sl_subscribe 330s join public.sl_set on set_id = sub_set 330s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 330s loop 330s if v_rec.sub_provider <> p_sub_provider then 330s raise exception 'Slony-I: subscribeSet(): wrong provider % - existing subscription from origin % users provider %', 330s p_sub_provider, v_set_origin, v_rec.sub_provider; 330s end if; 330s end loop; 330s else 330s -- 330s -- Existing subscription - in case the data provider changes and 330s -- there are other subscriptions, warn here. subscribeSet_int() 330s -- will currently change the data provider for those sets as well. 330s -- 330s for v_rec in select set_id, sub_provider from public.sl_subscribe 330s join public.sl_set on set_id = sub_set 330s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 330s and set_id <> p_sub_set 330s loop 330s if v_rec.sub_provider <> p_sub_provider then 330s raise exception 'Slony-I: subscribeSet(): also data provider for set % use resubscribe instead', 330s v_rec.set_id; 330s end if; 330s end loop; 330s end if; 330s 330s -- ---- 330s -- Create the SUBSCRIBE_SET event 330s -- ---- 330s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 330s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 330s case p_sub_forward when true then 't' else 'f' end, 330s case p_omit_copy when true then 't' else 'f' end 330s ); 330s 330s -- ---- 330s -- Call the internal procedure to store the subscription 330s -- ---- 330s v_ev_seqno2:=public.subscribeSet_int(p_sub_set, p_sub_provider, 330s p_sub_receiver, p_sub_forward, p_omit_copy); 330s 330s if v_ev_seqno2 is not null then 330s v_ev_seqno:=v_ev_seqno2; 330s end if; 330s 330s return v_ev_seqno; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 330s 'subscribeSet (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 330s 330s Makes sure that the receiver is not the provider, then stores the 330s subscription, and publishes the SUBSCRIBE_SET event to other nodes. 330s 330s If omit_copy is true, then no data copy will be done. 330s '; 330s COMMENT 330s DROP FUNCTION IF EXISTS public.subscribeSet_int(int4,int4,int4,bool,bool); 330s DROP FUNCTION 330s create or replace function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 330s returns int4 330s as $$ 330s declare 330s v_set_origin int4; 330s v_sub_row record; 330s v_seq_id bigint; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Lookup the set origin 330s -- ---- 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = p_sub_set; 330s if not found then 330s raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set; 330s end if; 330s 330s -- ---- 330s -- Provider change is only allowed for active sets 330s -- ---- 330s if p_sub_receiver = public.getLocalNodeId('_main') then 330s select sub_active into v_sub_row from public.sl_subscribe 330s where sub_set = p_sub_set 330s and sub_receiver = p_sub_receiver; 330s if found then 330s if not v_sub_row.sub_active then 330s raise exception 'Slony-I: subscribeSet_int(): set % is not active, cannot change provider', 330s p_sub_set; 330s end if; 330s end if; 330s end if; 330s 330s -- ---- 330s -- Try to change provider and/or forward for an existing subscription 330s -- ---- 330s update public.sl_subscribe 330s set sub_provider = p_sub_provider, 330s sub_forward = p_sub_forward 330s where sub_set = p_sub_set 330s and sub_receiver = p_sub_receiver; 330s if found then 330s 330s -- ---- 330s -- This is changing a subscriptoin. Make sure all sets from 330s -- this origin are subscribed using the same data provider. 330s -- For this we first check that the requested data provider 330s -- is subscribed to all the sets, the receiver is subscribed to. 330s -- ---- 330s for v_sub_row in select set_id from public.sl_set 330s join public.sl_subscribe on set_id = sub_set 330s where set_origin = v_set_origin 330s and sub_receiver = p_sub_receiver 330s and sub_set <> p_sub_set 330s loop 330s if not exists (select 1 from public.sl_subscribe 330s where sub_set = v_sub_row.set_id 330s and sub_receiver = p_sub_provider 330s and sub_active and sub_forward) 330s and not exists (select 1 from public.sl_set 330s where set_id = v_sub_row.set_id 330s and set_origin = p_sub_provider) 330s then 330s raise exception 'Slony-I: subscribeSet_int(): node % is not a forwarding subscriber for set %', 330s p_sub_provider, v_sub_row.set_id; 330s end if; 330s 330s -- ---- 330s -- New data provider offers this set as well, change that 330s -- subscription too. 330s -- ---- 330s update public.sl_subscribe 330s set sub_provider = p_sub_provider 330s where sub_set = v_sub_row.set_id 330s and sub_receiver = p_sub_receiver; 330s end loop; 330s 330s -- ---- 330s -- Rewrite sl_listen table 330s -- ---- 330s perform public.RebuildListenEntries(); 330s 330s return p_sub_set; 330s end if; 330s 330s -- ---- 330s -- Not found, insert a new one 330s -- ---- 330s if not exists (select true from public.sl_path 330s where pa_server = p_sub_provider 330s and pa_client = p_sub_receiver) 330s then 330s insert into public.sl_path 330s (pa_server, pa_client, pa_conninfo, pa_connretry) 330s values 330s (p_sub_provider, p_sub_receiver, 330s '', 10); 330s end if; 330s insert into public.sl_subscribe 330s (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) 330s values (p_sub_set, p_sub_provider, p_sub_receiver, 330s p_sub_forward, false); 330s 330s -- ---- 330s -- If the set origin is here, then enable the subscription 330s -- ---- 330s if v_set_origin = public.getLocalNodeId('_main') then 330s select public.createEvent('_main', 'ENABLE_SUBSCRIPTION', 330s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 330s case p_sub_forward when true then 't' else 'f' end, 330s case p_omit_copy when true then 't' else 'f' end 330s ) into v_seq_id; 330s perform public.enableSubscription(p_sub_set, 330s p_sub_provider, p_sub_receiver); 330s end if; 330s 330s -- ---- 330s -- Rewrite sl_listen table 330s -- ---- 330s perform public.RebuildListenEntries(); 330s 330s return p_sub_set; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 330s 'subscribeSet_int (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 330s 330s Internal actions for subscribing receiver sub_receiver to subscription 330s set sub_set.'; 330s COMMENT 330s drop function IF EXISTS public.unsubscribeSet(int4,int4,boolean); 330s DROP FUNCTION 330s create or replace function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,p_force boolean) 330s returns bigint 330s as $$ 330s declare 330s v_tab_row record; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- Check that this is called on the receiver node 330s -- ---- 330s if p_sub_receiver != public.getLocalNodeId('_main') then 330s raise exception 'Slony-I: unsubscribeSet() must be called on receiver'; 330s end if; 330s 330s 330s 330s -- ---- 330s -- Check that this does not break any chains 330s -- ---- 330s if p_force=false and exists (select true from public.sl_subscribe 330s where sub_set = p_sub_set 330s and sub_provider = p_sub_receiver) 330s then 330s raise exception 'Slony-I: Cannot unsubscribe set % while being provider', 330s p_sub_set; 330s end if; 330s 330s if exists (select true from public.sl_subscribe 330s where sub_set = p_sub_set 330s and sub_provider = p_sub_receiver) 330s then 330s --delete the receivers of this provider. 330s --unsubscribeSet_int() will generate the event 330s --when it runs on the receiver. 330s delete from public.sl_subscribe 330s where sub_set=p_sub_set 330s and sub_provider=p_sub_receiver; 330s end if; 330s 330s -- ---- 330s -- Remove the replication triggers. 330s -- ---- 330s for v_tab_row in select tab_id from public.sl_table 330s where tab_set = p_sub_set 330s order by tab_id 330s loop 330s perform public.alterTableDropTriggers(v_tab_row.tab_id); 330s end loop; 330s 330s -- ---- 330s -- Remove the setsync status. This will also cause the 330s -- worker thread to ignore the set and stop replicating 330s -- right now. 330s -- ---- 330s delete from public.sl_setsync 330s where ssy_setid = p_sub_set; 330s 330s -- ---- 330s -- Remove all sl_table and sl_sequence entries for this set. 330s -- Should we ever subscribe again, the initial data 330s -- copy process will create new ones. 330s -- ---- 330s delete from public.sl_table 330s where tab_set = p_sub_set; 330s delete from public.sl_sequence 330s where seq_set = p_sub_set; 330s 330s -- ---- 330s -- Call the internal procedure to drop the subscription 330s -- ---- 330s perform public.unsubscribeSet_int(p_sub_set, p_sub_receiver); 330s 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s -- ---- 330s -- Create the UNSUBSCRIBE_SET event 330s -- ---- 330s return public.createEvent('_main', 'UNSUBSCRIBE_SET', 330s p_sub_set::text, p_sub_receiver::text); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,force boolean) is 330s 'unsubscribeSet (sub_set, sub_receiver,force) 330s 330s Unsubscribe node sub_receiver from subscription set sub_set. This is 330s invoked on the receiver node. It verifies that this does not break 330s any chains (e.g. - where sub_receiver is a provider for another node), 330s then restores tables, drops Slony-specific keys, drops table entries 330s for the set, drops the subscription, and generates an UNSUBSCRIBE_SET 330s node to publish that the node is being dropped.'; 330s COMMENT 330s create or replace function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) 330s returns int4 330s as $$ 330s declare 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- All the real work is done before event generation on the 330s -- subscriber. 330s -- ---- 330s 330s --if this event unsubscribes the provider of this node 330s --then this node should unsubscribe itself from the set as well. 330s 330s if exists (select true from 330s public.sl_subscribe where 330s sub_set=p_sub_set and sub_provider=p_sub_receiver 330s and sub_receiver=public.getLocalNodeId('_main')) 330s then 330s perform public.unsubscribeSet(p_sub_set,public.getLocalNodeId('_main'),true); 330s end if; 330s 330s 330s delete from public.sl_subscribe 330s where sub_set = p_sub_set 330s and sub_receiver = p_sub_receiver; 330s 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s return p_sub_set; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) is 330s 'unsubscribeSet_int (sub_set, sub_receiver) 330s 330s All the REAL work of removing the subscriber is done before the event 330s is generated, so this function just has to drop the references to the 330s subscription in sl_subscribe.'; 330s COMMENT 330s create or replace function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 330s returns int4 330s as $$ 330s begin 330s return public.enableSubscription_int (p_sub_set, 330s p_sub_provider, p_sub_receiver); 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 330s 'enableSubscription (sub_set, sub_provider, sub_receiver) 330s 330s Indicates that sub_receiver intends subscribing to set sub_set from 330s sub_provider. Work is all done by the internal function 330s enableSubscription_int (sub_set, sub_provider, sub_receiver).'; 330s COMMENT 330s create or replace function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 330s returns int4 330s as $$ 330s declare 330s v_n int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- ---- 330s -- The real work is done in the replication engine. All 330s -- we have to do here is remembering that it happened. 330s -- ---- 330s 330s -- ---- 330s -- Well, not only ... we might be missing an important event here 330s -- ---- 330s if not exists (select true from public.sl_path 330s where pa_server = p_sub_provider 330s and pa_client = p_sub_receiver) 330s then 330s insert into public.sl_path 330s (pa_server, pa_client, pa_conninfo, pa_connretry) 330s values 330s (p_sub_provider, p_sub_receiver, 330s '', 10); 330s end if; 330s 330s update public.sl_subscribe 330s set sub_active = 't' 330s where sub_set = p_sub_set 330s and sub_receiver = p_sub_receiver; 330s get diagnostics v_n = row_count; 330s if v_n = 0 then 330s insert into public.sl_subscribe 330s (sub_set, sub_provider, sub_receiver, 330s sub_forward, sub_active) 330s values 330s (p_sub_set, p_sub_provider, p_sub_receiver, 330s false, true); 330s end if; 330s 330s -- Rewrite sl_listen table 330s perform public.RebuildListenEntries(); 330s 330s return p_sub_set; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 330s 'enableSubscription_int (sub_set, sub_provider, sub_receiver) 330s 330s Internal function to enable subscription of node sub_receiver to set 330s sub_set via node sub_provider. 330s 330s slon does most of the work; all we need do here is to remember that it 330s happened. The function updates sl_subscribe, indicating that the 330s subscription has become active.'; 330s COMMENT 330s create or replace function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) 330s returns bigint 330s as $$ 330s declare 330s v_max_seqno bigint; 330s begin 330s select into v_max_seqno coalesce(max(con_seqno), 0) 330s from public.sl_confirm 330s where con_origin = p_con_origin 330s and con_received = p_con_received; 330s if v_max_seqno < p_con_seqno then 330s insert into public.sl_confirm 330s (con_origin, con_received, con_seqno, con_timestamp) 330s values (p_con_origin, p_con_received, p_con_seqno, 330s p_con_timestamp); 330s v_max_seqno = p_con_seqno; 330s end if; 330s 330s return v_max_seqno; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) is 330s 'forwardConfirm (p_con_origin, p_con_received, p_con_seqno, p_con_timestamp) 330s 330s Confirms (recorded in sl_confirm) that items from p_con_origin up to 330s p_con_seqno have been received by node p_con_received as of 330s p_con_timestamp, and raises an event to forward this confirmation.'; 330s COMMENT 330s create or replace function public.cleanupEvent (p_interval interval) 330s returns int4 330s as $$ 330s declare 330s v_max_row record; 330s v_min_row record; 330s v_max_sync int8; 330s v_origin int8; 330s v_seqno int8; 330s v_xmin bigint; 330s v_rc int8; 330s begin 330s -- ---- 330s -- First remove all confirmations where origin/receiver no longer exist 330s -- ---- 330s delete from public.sl_confirm 330s where con_origin not in (select no_id from public.sl_node); 330s delete from public.sl_confirm 330s where con_received not in (select no_id from public.sl_node); 330s -- ---- 330s -- Next remove all but the oldest confirm row per origin,receiver pair. 330s -- Ignore confirmations that are younger than 10 minutes. We currently 330s -- have an not confirmed suspicion that a possibly lost transaction due 330s -- to a server crash might have been visible to another session, and 330s -- that this led to log data that is needed again got removed. 330s -- ---- 330s for v_max_row in select con_origin, con_received, max(con_seqno) as con_seqno 330s from public.sl_confirm 330s where con_timestamp < (CURRENT_TIMESTAMP - p_interval) 330s group by con_origin, con_received 330s loop 330s delete from public.sl_confirm 330s where con_origin = v_max_row.con_origin 330s and con_received = v_max_row.con_received 330s and con_seqno < v_max_row.con_seqno; 330s end loop; 330s 330s -- ---- 330s -- Then remove all events that are confirmed by all nodes in the 330s -- whole cluster up to the last SYNC 330s -- ---- 330s for v_min_row in select con_origin, min(con_seqno) as con_seqno 330s from public.sl_confirm 330s group by con_origin 330s loop 330s select coalesce(max(ev_seqno), 0) into v_max_sync 330s from public.sl_event 330s where ev_origin = v_min_row.con_origin 330s and ev_seqno <= v_min_row.con_seqno 330s and ev_type = 'SYNC'; 330s if v_max_sync > 0 then 330s delete from public.sl_event 330s where ev_origin = v_min_row.con_origin 330s and ev_seqno < v_max_sync; 330s end if; 330s end loop; 330s 330s -- ---- 330s -- If cluster has only one node, then remove all events up to 330s -- the last SYNC - Bug #1538 330s -- http://gborg.postgresql.org/project/slony1/bugs/bugupdate.php?1538 330s -- ---- 330s 330s select * into v_min_row from public.sl_node where 330s no_id <> public.getLocalNodeId('_main') limit 1; 330s if not found then 330s select ev_origin, ev_seqno into v_min_row from public.sl_event 330s where ev_origin = public.getLocalNodeId('_main') 330s order by ev_origin desc, ev_seqno desc limit 1; 330s raise notice 'Slony-I: cleanupEvent(): Single node - deleting events < %', v_min_row.ev_seqno; 330s delete from public.sl_event 330s where 330s ev_origin = v_min_row.ev_origin and 330s ev_seqno < v_min_row.ev_seqno; 330s 330s end if; 330s 330s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_seqlog' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 330s execute 'alter table public.sl_seqlog set without oids;'; 330s end if; 330s -- ---- 330s -- Also remove stale entries from the nodelock table. 330s -- ---- 330s perform public.cleanupNodelock(); 330s 330s -- ---- 330s -- Find the eldest event left, for each origin 330s -- ---- 330s for v_origin, v_seqno, v_xmin in 330s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 330s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 330s loop 330s delete from public.sl_seqlog where seql_origin = v_origin and seql_ev_seqno < v_seqno; 330s delete from public.sl_log_script where log_origin = v_origin and log_txid < v_xmin; 330s end loop; 330s 330s v_rc := public.logswitch_finish(); 330s if v_rc = 0 then -- no switch in progress 330s perform public.logswitch_start(); 330s end if; 330s 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.cleanupEvent (p_interval interval) is 330s 'cleaning old data out of sl_confirm, sl_event. Removes all but the 330s last sl_confirm row per (origin,receiver), and then removes all events 330s that are confirmed by all nodes in the whole cluster up to the last 330s SYNC.'; 330s COMMENT 330s create or replace function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) returns name 330s as $$ 330s declare 330s v_tab_fqname_quoted text default ''; 330s v_idxrow record; 330s begin 330s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 330s -- 330s -- Ensure that the table exists 330s -- 330s if (select PGC.relname 330s from "pg_catalog".pg_class PGC, 330s "pg_catalog".pg_namespace PGN 330s where public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 330s and PGN.oid = PGC.relnamespace) is null then 330s raise exception 'Slony-I: determineIdxnameUnique(): table % not found', v_tab_fqname_quoted; 330s end if; 330s 330s -- 330s -- Lookup the tables primary key or the specified unique index 330s -- 330s if p_idx_name isnull then 330s select PGXC.relname 330s into v_idxrow 330s from "pg_catalog".pg_class PGC, 330s "pg_catalog".pg_namespace PGN, 330s "pg_catalog".pg_index PGX, 330s "pg_catalog".pg_class PGXC 330s where public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 330s and PGN.oid = PGC.relnamespace 330s and PGX.indrelid = PGC.oid 330s and PGX.indexrelid = PGXC.oid 330s and PGX.indisprimary; 330s if not found then 330s raise exception 'Slony-I: table % has no primary key', 330s v_tab_fqname_quoted; 330s end if; 330s else 330s select PGXC.relname 330s into v_idxrow 330s from "pg_catalog".pg_class PGC, 330s "pg_catalog".pg_namespace PGN, 330s "pg_catalog".pg_index PGX, 330s "pg_catalog".pg_class PGXC 330s where public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 330s and PGN.oid = PGC.relnamespace 330s and PGX.indrelid = PGC.oid 330s and PGX.indexrelid = PGXC.oid 330s and PGX.indisunique 330s and public.slon_quote_brute(PGXC.relname) = public.slon_quote_input(p_idx_name); 330s if not found then 330s raise exception 'Slony-I: table % has no unique index %', 330s v_tab_fqname_quoted, p_idx_name; 330s end if; 330s end if; 330s 330s -- 330s -- Return the found index name 330s -- 330s return v_idxrow.relname; 330s end; 330s $$ language plpgsql called on null input; 330s CREATE FUNCTION 330s comment on function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) is 330s 'FUNCTION determineIdxnameUnique (tab_fqname, indexname) 330s 330s Given a tablename, tab_fqname, check that the unique index, indexname, 330s exists or return the primary key index name for the table. If there 330s is no unique index, it raises an exception.'; 330s COMMENT 330s create or replace function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) returns text 330s as $$ 330s declare 330s v_tab_fqname_quoted text default ''; 330s v_idx_name_quoted text; 330s v_idxrow record; 330s v_attrow record; 330s v_i integer; 330s v_attno int2; 330s v_attkind text default ''; 330s v_attfound bool; 330s begin 330s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 330s v_idx_name_quoted := public.slon_quote_brute(p_idx_name); 330s -- 330s -- Ensure that the table exists 330s -- 330s if (select PGC.relname 330s from "pg_catalog".pg_class PGC, 330s "pg_catalog".pg_namespace PGN 330s where public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 330s and PGN.oid = PGC.relnamespace) is null then 330s raise exception 'Slony-I: table % not found', v_tab_fqname_quoted; 330s end if; 330s 330s -- 330s -- Lookup the tables primary key or the specified unique index 330s -- 330s if p_idx_name isnull then 330s raise exception 'Slony-I: index name must be specified'; 330s else 330s select PGXC.relname, PGX.indexrelid, PGX.indkey 330s into v_idxrow 330s from "pg_catalog".pg_class PGC, 330s "pg_catalog".pg_namespace PGN, 330s "pg_catalog".pg_index PGX, 330s "pg_catalog".pg_class PGXC 330s where public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 330s and PGN.oid = PGC.relnamespace 330s and PGX.indrelid = PGC.oid 330s and PGX.indexrelid = PGXC.oid 330s and PGX.indisunique 330s and public.slon_quote_brute(PGXC.relname) = v_idx_name_quoted; 330s if not found then 330s raise exception 'Slony-I: table % has no unique index %', 330s v_tab_fqname_quoted, v_idx_name_quoted; 330s end if; 330s end if; 330s 330s -- 330s -- Loop over the tables attributes and check if they are 330s -- index attributes. If so, add a "k" to the return value, 330s -- otherwise add a "v". 330s -- 330s for v_attrow in select PGA.attnum, PGA.attname 330s from "pg_catalog".pg_class PGC, 330s "pg_catalog".pg_namespace PGN, 330s "pg_catalog".pg_attribute PGA 330s where public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 330s and PGN.oid = PGC.relnamespace 330s and PGA.attrelid = PGC.oid 330s and not PGA.attisdropped 330s and PGA.attnum > 0 330s order by attnum 330s loop 330s v_attfound = 'f'; 330s 330s v_i := 0; 330s loop 330s select indkey[v_i] into v_attno from "pg_catalog".pg_index 330s where indexrelid = v_idxrow.indexrelid; 330s if v_attno isnull or v_attno = 0 then 330s exit; 330s end if; 330s if v_attrow.attnum = v_attno then 330s v_attfound = 't'; 330s exit; 330s end if; 330s v_i := v_i + 1; 330s end loop; 330s 330s if v_attfound then 330s v_attkind := v_attkind || 'k'; 330s else 330s v_attkind := v_attkind || 'v'; 330s end if; 330s end loop; 330s 330s -- Strip off trailing v characters as they are not needed by the logtrigger 330s v_attkind := pg_catalog.rtrim(v_attkind, 'v'); 330s 330s -- 330s -- Return the resulting attkind 330s -- 330s return v_attkind; 330s end; 330s $$ language plpgsql called on null input; 330s CREATE FUNCTION 330s comment on function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) is 330s 'determineAttKindUnique (tab_fqname, indexname) 330s 330s Given a tablename, return the Slony-I specific attkind (used for the 330s log trigger) of the table. Use the specified unique index or the 330s primary key (if indexname is NULL).'; 330s COMMENT 330s NOTICE: function public.updaterelname(int4,int4) does not exist, skipping 330s create or replace function public.RebuildListenEntries() 330s returns int 330s as $$ 330s declare 330s v_row record; 330s v_cnt integer; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s -- First remove the entire configuration 330s delete from public.sl_listen; 330s 330s -- Second populate the sl_listen configuration with a full 330s -- network of all possible paths. 330s insert into public.sl_listen 330s (li_origin, li_provider, li_receiver) 330s select pa_server, pa_server, pa_client from public.sl_path; 330s while true loop 330s insert into public.sl_listen 330s (li_origin, li_provider, li_receiver) 330s select distinct li_origin, pa_server, pa_client 330s from public.sl_listen, public.sl_path 330s where li_receiver = pa_server 330s and li_origin <> pa_client 330s and pa_conninfo<>'' 330s except 330s select li_origin, li_provider, li_receiver 330s from public.sl_listen; 330s 330s if not found then 330s exit; 330s end if; 330s end loop; 330s 330s -- We now replace specific event-origin,receiver combinations 330s -- with a configuration that tries to avoid events arriving at 330s -- a node before the data provider actually has the data ready. 330s 330s -- Loop over every possible pair of receiver and event origin 330s for v_row in select N1.no_id as receiver, N2.no_id as origin, 330s N2.no_failed as failed 330s from public.sl_node as N1, public.sl_node as N2 330s where N1.no_id <> N2.no_id 330s loop 330s -- 1st choice: 330s -- If we use the event origin as a data provider for any 330s -- set that originates on that very node, we are a direct 330s -- subscriber to that origin and listen there only. 330s if exists (select true from public.sl_set, public.sl_subscribe , public.sl_node p 330s where set_origin = v_row.origin 330s and sub_set = set_id 330s and sub_provider = v_row.origin 330s and sub_receiver = v_row.receiver 330s and sub_active 330s and p.no_active 330s and p.no_id=sub_provider 330s ) 330s then 330s delete from public.sl_listen 330s where li_origin = v_row.origin 330s and li_receiver = v_row.receiver; 330s insert into public.sl_listen (li_origin, li_provider, li_receiver) 330s values (v_row.origin, v_row.origin, v_row.receiver); 330s 330s -- 2nd choice: 330s -- If we are subscribed to any set originating on this 330s -- event origin, we want to listen on all data providers 330s -- we use for this origin. We are a cascaded subscriber 330s -- for sets from this node. 330s else 330s if exists (select true from public.sl_set, public.sl_subscribe, 330s public.sl_node provider 330s where set_origin = v_row.origin 330s and sub_set = set_id 330s and sub_provider=provider.no_id 330s and provider.no_failed = false 330s and sub_receiver = v_row.receiver 330s and sub_active) 330s then 330s delete from public.sl_listen 330s where li_origin = v_row.origin 330s and li_receiver = v_row.receiver; 330s insert into public.sl_listen (li_origin, li_provider, li_receiver) 330s select distinct set_origin, sub_provider, v_row.receiver 330s from public.sl_set, public.sl_subscribe 330s where set_origin = v_row.origin 330s and sub_set = set_id 330s and sub_receiver = v_row.receiver 330s and sub_active; 330s end if; 330s end if; 330s 330s if v_row.failed then 330s 330s --for every failed node we delete all sl_listen entries 330s --except via providers (listed in sl_subscribe) 330s --or failover candidates (sl_failover_targets) 330s --we do this to prevent a non-failover candidate 330s --that is more ahead of the failover candidate from 330s --sending events to the failover candidate that 330s --are 'too far ahead' 330s 330s --if the failed node is not an origin for any 330s --node then we don't delete all listen paths 330s --for events from it. Instead we leave 330s --the listen network alone. 330s 330s select count(*) into v_cnt from public.sl_subscribe sub, 330s public.sl_set s 330s where s.set_origin=v_row.origin and s.set_id=sub.sub_set; 330s if v_cnt > 0 then 330s delete from public.sl_listen where 330s li_origin=v_row.origin and 330s li_receiver=v_row.receiver 330s and li_provider not in 330s (select sub_provider from 330s public.sl_subscribe, 330s public.sl_set where 330s sub_set=set_id 330s and set_origin=v_row.origin); 330s end if; 330s end if; 330s -- insert into public.sl_listen 330s -- (li_origin,li_provider,li_receiver) 330s -- SELECT v_row.origin, pa_server 330s -- ,v_row.receiver 330s -- FROM public.sl_path where 330s -- pa_client=v_row.receiver 330s -- and (v_row.origin,pa_server,v_row.receiver) not in 330s -- (select li_origin,li_provider,li_receiver 330s -- from public.sl_listen); 330s -- end if; 330s end loop ; 330s 330s return null ; 330s end ; 330s $$ language 'plpgsql'; 330s CREATE FUNCTION 330s comment on function public.RebuildListenEntries() is 330s 'RebuildListenEntries() 330s 330s Invoked by various subscription and path modifying functions, this 330s rewrites the sl_listen entries, adding in all the ones required to 330s allow communications between nodes in the Slony-I cluster.'; 330s COMMENT 330s create or replace function public.generate_sync_event(p_interval interval) 330s returns int4 330s as $$ 330s declare 330s v_node_row record; 330s 330s BEGIN 330s select 1 into v_node_row from public.sl_event 330s where ev_type = 'SYNC' and ev_origin = public.getLocalNodeId('_main') 330s and ev_timestamp > now() - p_interval limit 1; 330s if not found then 330s -- If there has been no SYNC in the last interval, then push one 330s perform public.createEvent('_main', 'SYNC', NULL); 330s return 1; 330s else 330s return 0; 330s end if; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.generate_sync_event(p_interval interval) is 330s 'Generate a sync event if there has not been one in the requested interval, and this is a provider node.'; 330s COMMENT 330s drop function if exists public.updateRelname(int4, int4); 330s DROP FUNCTION 330s create or replace function public.updateRelname () 330s returns int4 330s as $$ 330s declare 330s v_no_id int4; 330s v_set_origin int4; 330s begin 330s -- ---- 330s -- Grab the central configuration lock 330s -- ---- 330s lock table public.sl_config_lock; 330s 330s update public.sl_table set 330s tab_relname = PGC.relname, tab_nspname = PGN.nspname 330s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 330s where public.sl_table.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid and 330s (tab_relname <> PGC.relname or tab_nspname <> PGN.nspname); 330s update public.sl_sequence set 330s seq_relname = PGC.relname, seq_nspname = PGN.nspname 330s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 330s where public.sl_sequence.seq_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid and 330s (seq_relname <> PGC.relname or seq_nspname <> PGN.nspname); 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.updateRelname() is 330s 'updateRelname()'; 330s COMMENT 330s drop function if exists public.updateReloid (int4, int4); 330s DROP FUNCTION 330s create or replace function public.updateReloid (p_set_id int4, p_only_on_node int4) 330s returns bigint 330s as $$ 330s declare 330s v_no_id int4; 330s v_set_origin int4; 330s prec record; 330s begin 330s -- ---- 330s -- Check that we either are the set origin or a current 330s -- subscriber of the set. 330s -- ---- 330s v_no_id := public.getLocalNodeId('_main'); 330s select set_origin into v_set_origin 330s from public.sl_set 330s where set_id = p_set_id 330s for update; 330s if not found then 330s raise exception 'Slony-I: set % not found', p_set_id; 330s end if; 330s if v_set_origin <> v_no_id 330s and not exists (select 1 from public.sl_subscribe 330s where sub_set = p_set_id 330s and sub_receiver = v_no_id) 330s then 330s return 0; 330s end if; 330s 330s -- ---- 330s -- If execution on only one node is requested, check that 330s -- we are that node. 330s -- ---- 330s if p_only_on_node > 0 and p_only_on_node <> v_no_id then 330s return 0; 330s end if; 330s 330s -- Update OIDs for tables to values pulled from non-table objects in pg_class 330s -- This ensures that we won't have collisions when repairing the oids 330s for prec in select tab_id from public.sl_table loop 330s update public.sl_table set tab_reloid = (select oid from pg_class pc where relkind <> 'r' and not exists (select 1 from public.sl_table t2 where t2.tab_reloid = pc.oid) limit 1) 330s where tab_id = prec.tab_id; 330s end loop; 330s 330s for prec in select tab_id, tab_relname, tab_nspname from public.sl_table loop 330s update public.sl_table set 330s tab_reloid = (select PGC.oid 330s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 330s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.tab_relname) 330s and PGC.relnamespace = PGN.oid 330s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.tab_nspname)) 330s where tab_id = prec.tab_id; 330s end loop; 330s 330s for prec in select seq_id from public.sl_sequence loop 330s update public.sl_sequence set seq_reloid = (select oid from pg_class pc where relkind <> 'S' and not exists (select 1 from public.sl_sequence t2 where t2.seq_reloid = pc.oid) limit 1) 330s where seq_id = prec.seq_id; 330s end loop; 330s 330s for prec in select seq_id, seq_relname, seq_nspname from public.sl_sequence loop 330s update public.sl_sequence set 330s seq_reloid = (select PGC.oid 330s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 330s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.seq_relname) 330s and PGC.relnamespace = PGN.oid 330s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.seq_nspname)) 330s where seq_id = prec.seq_id; 330s end loop; 330s 330s return 1; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.updateReloid(p_set_id int4, p_only_on_node int4) is 330s 'updateReloid(set_id, only_on_node) 330s 330s Updates the respective reloids in sl_table and sl_seqeunce based on 330s their respective FQN'; 330s COMMENT 330s create or replace function public.logswitch_start() 330s returns int4 as $$ 330s DECLARE 330s v_current_status int4; 330s BEGIN 330s -- ---- 330s -- Get the current log status. 330s -- ---- 330s select last_value into v_current_status from public.sl_log_status; 330s 330s -- ---- 330s -- status = 0: sl_log_1 active, sl_log_2 clean 330s -- Initiate a switch to sl_log_2. 330s -- ---- 330s if v_current_status = 0 then 330s perform "pg_catalog".setval('public.sl_log_status', 3); 330s perform public.registry_set_timestamp( 330s 'logswitch.laststart', now()); 330s raise notice 'Slony-I: Logswitch to sl_log_2 initiated'; 330s return 2; 330s end if; 330s 330s -- ---- 330s -- status = 1: sl_log_2 active, sl_log_1 clean 330s -- Initiate a switch to sl_log_1. 330s -- ---- 330s if v_current_status = 1 then 330s perform "pg_catalog".setval('public.sl_log_status', 2); 330s perform public.registry_set_timestamp( 330s 'logswitch.laststart', now()); 330s raise notice 'Slony-I: Logswitch to sl_log_1 initiated'; 330s return 1; 330s end if; 330s 330s raise exception 'Previous logswitch still in progress'; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.logswitch_start() is 330s 'logswitch_start() 330s 330s Initiate a log table switch if none is in progress'; 330s COMMENT 330s NOTICE: function public.updatereloid(int4,int4) does not exist, skipping 330s create or replace function public.logswitch_finish() 330s returns int4 as $$ 330s DECLARE 330s v_current_status int4; 330s v_dummy record; 330s v_origin int8; 330s v_seqno int8; 330s v_xmin bigint; 330s v_purgeable boolean; 330s BEGIN 330s -- ---- 330s -- Get the current log status. 330s -- ---- 330s select last_value into v_current_status from public.sl_log_status; 330s 330s -- ---- 330s -- status value 0 or 1 means that there is no log switch in progress 330s -- ---- 330s if v_current_status = 0 or v_current_status = 1 then 330s return 0; 330s end if; 330s 330s -- ---- 330s -- status = 2: sl_log_1 active, cleanup sl_log_2 330s -- ---- 330s if v_current_status = 2 then 330s v_purgeable := 'true'; 330s 330s -- ---- 330s -- Attempt to lock sl_log_2 in order to make sure there are no other transactions 330s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 330s -- blocking writers to sl_log_2 while it is waiting for a lock. It also prevents it 330s -- immediately truncating log data generated inside the transaction which was active 330s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 330s -- transaction is committed. 330s -- ---- 330s begin 330s lock table public.sl_log_2 in access exclusive mode nowait; 330s exception when lock_not_available then 330s raise notice 'Slony-I: could not lock sl_log_2 - sl_log_2 not truncated'; 330s return -1; 330s end; 330s 330s -- ---- 330s -- The cleanup thread calls us after it did the delete and 330s -- vacuum of both log tables. If sl_log_2 is empty now, we 330s -- can truncate it and the log switch is done. 330s -- ---- 330s for v_origin, v_seqno, v_xmin in 330s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 330s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 330s loop 330s if exists (select 1 from public.sl_log_2 where log_origin = v_origin and log_txid >= v_xmin limit 1) then 330s v_purgeable := 'false'; 330s end if; 330s end loop; 330s if not v_purgeable then 330s -- ---- 330s -- Found a row ... log switch is still in progress. 330s -- ---- 330s raise notice 'Slony-I: log switch to sl_log_1 still in progress - sl_log_2 not truncated'; 330s return -1; 330s end if; 330s 330s raise notice 'Slony-I: log switch to sl_log_1 complete - truncate sl_log_2'; 330s truncate public.sl_log_2; 330s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_2' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 330s execute 'alter table public.sl_log_2 set without oids;'; 330s end if; 330s perform "pg_catalog".setval('public.sl_log_status', 0); 330s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 330s perform public.addPartialLogIndices(); 330s 330s return 1; 330s end if; 330s 330s -- ---- 330s -- status = 3: sl_log_2 active, cleanup sl_log_1 330s -- ---- 330s if v_current_status = 3 then 330s v_purgeable := 'true'; 330s 330s -- ---- 330s -- Attempt to lock sl_log_1 in order to make sure there are no other transactions 330s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 330s -- blocking writes to sl_log_1 while it is waiting for a lock. It also prevents it 330s -- immediately truncating log data generated inside the transaction which was active 330s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 330s -- transaction is committed. 330s -- ---- 330s begin 330s lock table public.sl_log_1 in access exclusive mode nowait; 330s exception when lock_not_available then 330s raise notice 'Slony-I: could not lock sl_log_1 - sl_log_1 not truncated'; 330s return -1; 330s end; 330s 330s -- ---- 330s -- The cleanup thread calls us after it did the delete and 330s -- vacuum of both log tables. If sl_log_2 is empty now, we 330s -- can truncate it and the log switch is done. 330s -- ---- 330s for v_origin, v_seqno, v_xmin in 330s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 330s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 330s loop 330s if (exists (select 1 from public.sl_log_1 where log_origin = v_origin and log_txid >= v_xmin limit 1)) then 330s v_purgeable := 'false'; 330s end if; 330s end loop; 330s if not v_purgeable then 330s -- ---- 330s -- Found a row ... log switch is still in progress. 330s -- ---- 330s raise notice 'Slony-I: log switch to sl_log_2 still in progress - sl_log_1 not truncated'; 330s return -1; 330s end if; 330s 330s raise notice 'Slony-I: log switch to sl_log_2 complete - truncate sl_log_1'; 330s truncate public.sl_log_1; 330s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_1' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 330s execute 'alter table public.sl_log_1 set without oids;'; 330s end if; 330s perform "pg_catalog".setval('public.sl_log_status', 1); 330s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 330s perform public.addPartialLogIndices(); 330s return 2; 330s end if; 330s END; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.logswitch_finish() is 330s 'logswitch_finish() 330s 330s Attempt to finalize a log table switch in progress 330s return values: 330s -1 if switch in progress, but not complete 330s 0 if no switch in progress 330s 1 if performed truncate on sl_log_2 330s 2 if performed truncate on sl_log_1 330s '; 330s COMMENT 330s create or replace function public.addPartialLogIndices () returns integer as $$ 330s DECLARE 330s v_current_status int4; 330s v_log int4; 330s v_dummy record; 330s v_dummy2 record; 330s idef text; 330s v_count int4; 330s v_iname text; 330s v_ilen int4; 330s v_maxlen int4; 330s BEGIN 330s v_count := 0; 330s select last_value into v_current_status from public.sl_log_status; 330s 330s -- If status is 2 or 3 --> in process of cleanup --> unsafe to create indices 330s if v_current_status in (2, 3) then 330s return 0; 330s end if; 330s 330s if v_current_status = 0 then -- Which log should get indices? 330s v_log := 2; 330s else 330s v_log := 1; 330s end if; 330s -- PartInd_test_db_sl_log_2-node-1 330s -- Add missing indices... 330s for v_dummy in select distinct set_origin from public.sl_set loop 330s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' 330s || v_dummy.set_origin::text; 330s -- raise notice 'Consider adding partial index % on sl_log_%', v_iname, v_log; 330s -- raise notice 'schema: [_main] tablename:[sl_log_%]', v_log; 330s select * into v_dummy2 from pg_catalog.pg_indexes where tablename = 'sl_log_' || v_log::text and indexname = v_iname; 330s if not found then 330s -- raise notice 'index was not found - add it!'; 330s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' || v_dummy.set_origin::text; 330s v_ilen := pg_catalog.length(v_iname); 330s v_maxlen := pg_catalog.current_setting('max_identifier_length'::text)::int4; 330s if v_ilen > v_maxlen then 330s raise exception 'Length of proposed index name [%] > max_identifier_length [%] - cluster name probably too long', v_ilen, v_maxlen; 330s end if; 330s 330s idef := 'create index "' || v_iname || 330s '" on public.sl_log_' || v_log::text || ' USING btree(log_txid) where (log_origin = ' || v_dummy.set_origin::text || ');'; 330s execute idef; 330s v_count := v_count + 1; 330s else 330s -- raise notice 'Index % already present - skipping', v_iname; 330s end if; 330s end loop; 330s 330s -- Remove unneeded indices... 330s for v_dummy in select indexname from pg_catalog.pg_indexes i where i.tablename = 'sl_log_' || v_log::text and 330s i.indexname like ('PartInd_main_sl_log_' || v_log::text || '-node-%') and 330s not exists (select 1 from public.sl_set where 330s i.indexname = 'PartInd_main_sl_log_' || v_log::text || '-node-' || set_origin::text) 330s loop 330s -- raise notice 'Dropping obsolete index %d', v_dummy.indexname; 330s idef := 'drop index public."' || v_dummy.indexname || '";'; 330s execute idef; 330s v_count := v_count - 1; 330s end loop; 330s return v_count; 330s END 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.addPartialLogIndices () is 330s 'Add partial indexes, if possible, to the unused sl_log_? table for 330s all origin nodes, and drop any that are no longer needed. 330s 330s This function presently gets run any time set origins are manipulated 330s (FAILOVER, STORE SET, MOVE SET, DROP SET), as well as each time the 330s system switches between sl_log_1 and sl_log_2.'; 330s COMMENT 330s create or replace function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 330s returns bool as $$ 330s BEGIN 330s return exists ( 330s select 1 from "information_schema".columns 330s where table_schema = p_namespace 330s and table_name = p_table 330s and column_name = p_field 330s ); 330s END;$$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 330s is 'Check if a table has a specific attribute'; 330s COMMENT 330s create or replace function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 330s returns bool as $$ 330s DECLARE 330s v_row record; 330s v_query text; 330s BEGIN 330s if not public.check_table_field_exists(p_namespace, p_table, p_field) then 330s raise notice 'Upgrade table %.% - add field %', p_namespace, p_table, p_field; 330s v_query := 'alter table ' || p_namespace || '.' || p_table || ' add column '; 330s v_query := v_query || p_field || ' ' || p_type || ';'; 330s execute v_query; 330s return 't'; 330s else 330s return 'f'; 330s end if; 330s END;$$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 330s is 'Add a column of a given type to a table if it is missing'; 330s COMMENT 330s create or replace function public.upgradeSchema(p_old text) 330s returns text as $$ 330s declare 330s v_tab_row record; 330s v_query text; 330s v_keepstatus text; 330s begin 330s -- If old version is pre-2.0, then we require a special upgrade process 330s if p_old like '1.%' then 330s raise exception 'Upgrading to Slony-I 2.x requires running slony_upgrade_20'; 330s end if; 330s 330s perform public.upgradeSchemaAddTruncateTriggers(); 330s 330s -- Change all Slony-I-defined columns that are "timestamp without time zone" to "timestamp *WITH* time zone" 330s if exists (select 1 from information_schema.columns c 330s where table_schema = '_main' and data_type = 'timestamp without time zone' 330s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 330s and (c.table_name, c.column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp'))) 330s then 330s 330s -- Preserve sl_status 330s select pg_get_viewdef('public.sl_status') into v_keepstatus; 330s execute 'drop view sl_status'; 330s for v_tab_row in select table_schema, table_name, column_name from information_schema.columns c 330s where table_schema = '_main' and data_type = 'timestamp without time zone' 330s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 330s and (table_name, column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp')) 330s loop 330s raise notice 'Changing Slony-I column [%.%] to timestamp WITH time zone', v_tab_row.table_name, v_tab_row.column_name; 330s v_query := 'alter table ' || public.slon_quote_brute(v_tab_row.table_schema) || 330s '.' || v_tab_row.table_name || ' alter column ' || v_tab_row.column_name || 330s ' type timestamp with time zone;'; 330s execute v_query; 330s end loop; 330s -- restore sl_status 330s execute 'create view sl_status as ' || v_keepstatus; 330s end if; 330s 330s if not exists (select 1 from information_schema.tables where table_schema = '_main' and table_name = 'sl_components') then 330s v_query := ' 330s create table public.sl_components ( 330s co_actor text not null primary key, 330s co_pid integer not null, 330s co_node integer not null, 330s co_connection_pid integer not null, 330s co_activity text, 330s co_starttime timestamptz not null, 330s co_event bigint, 330s co_eventtype text 330s ) without oids; 330s '; 330s execute v_query; 330s end if; 330s 330s 330s 330s 330s 330s if not exists (select 1 from information_schema.tables t where table_schema = '_main' and table_name = 'sl_event_lock') then 330s v_query := 'create table public.sl_event_lock (dummy integer);'; 330s execute v_query; 330s end if; 330s 330s if not exists (select 1 from information_schema.tables t 330s where table_schema = '_main' 330s and table_name = 'sl_apply_stats') then 330s v_query := ' 330s create table public.sl_apply_stats ( 330s as_origin int4, 330s as_num_insert int8, 330s as_num_update int8, 330s as_num_delete int8, 330s as_num_truncate int8, 330s as_num_script int8, 330s as_num_total int8, 330s as_duration interval, 330s as_apply_first timestamptz, 330s as_apply_last timestamptz, 330s as_cache_prepare int8, 330s as_cache_hit int8, 330s as_cache_evict int8, 330s as_cache_prepare_max int8 330s ) WITHOUT OIDS;'; 330s execute v_query; 330s end if; 330s 330s -- 330s -- On the upgrade to 2.2, we change the layout of sl_log_N by 330s -- adding columns log_tablenspname, log_tablerelname, and 330s -- log_cmdupdncols as well as changing log_cmddata into 330s -- log_cmdargs, which is a text array. 330s -- 330s if not public.check_table_field_exists('_main', 'sl_log_1', 'log_cmdargs') then 330s -- 330s -- Check that the cluster is completely caught up 330s -- 330s if public.check_unconfirmed_log() then 330s raise EXCEPTION 'cannot upgrade to new sl_log_N format due to existing unreplicated data'; 330s end if; 330s 330s -- 330s -- Drop tables sl_log_1 and sl_log_2 330s -- 330s drop table public.sl_log_1; 330s drop table public.sl_log_2; 330s 330s -- 330s -- Create the new sl_log_1 330s -- 330s create table public.sl_log_1 ( 330s log_origin int4, 330s log_txid bigint, 330s log_tableid int4, 330s log_actionseq int8, 330s log_tablenspname text, 330s log_tablerelname text, 330s log_cmdtype "char", 330s log_cmdupdncols int4, 330s log_cmdargs text[] 330s ) without oids; 330s create index sl_log_1_idx1 on public.sl_log_1 330s (log_origin, log_txid, log_actionseq); 330s 330s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 330s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 330s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 330s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 330s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 330s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 330s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 330s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 330s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 330s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 330s 330s -- 330s -- Create the new sl_log_2 330s -- 330s create table public.sl_log_2 ( 330s log_origin int4, 330s log_txid bigint, 330s log_tableid int4, 330s log_actionseq int8, 330s log_tablenspname text, 330s log_tablerelname text, 330s log_cmdtype "char", 330s log_cmdupdncols int4, 330s log_cmdargs text[] 330s ) without oids; 330s create index sl_log_2_idx1 on public.sl_log_2 330s (log_origin, log_txid, log_actionseq); 330s 330s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 330s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 330s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 330s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 330s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 330s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 330s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 330s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 330s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 330s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 330s 330s create table public.sl_log_script ( 330s log_origin int4, 330s log_txid bigint, 330s log_actionseq int8, 330s log_cmdtype "char", 330s log_cmdargs text[] 330s ) WITHOUT OIDS; 330s create index sl_log_script_idx1 on public.sl_log_script 330s (log_origin, log_txid, log_actionseq); 330s 330s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 330s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 330s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 330s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 330s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete'; 330s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 330s 330s -- 330s -- Put the log apply triggers back onto sl_log_1/2 330s -- 330s create trigger apply_trigger 330s before INSERT on public.sl_log_1 330s for each row execute procedure public.logApply('_main'); 330s alter table public.sl_log_1 330s enable replica trigger apply_trigger; 330s create trigger apply_trigger 330s before INSERT on public.sl_log_2 330s for each row execute procedure public.logApply('_main'); 330s alter table public.sl_log_2 330s enable replica trigger apply_trigger; 330s end if; 330s if not exists (select 1 from information_schema.routines where routine_schema = '_main' and routine_name = 'string_agg') then 330s CREATE AGGREGATE public.string_agg(text) ( 330s SFUNC=public.agg_text_sum, 330s STYPE=text, 330s INITCOND='' 330s ); 330s end if; 330s if not exists (select 1 from information_schema.views where table_schema='_main' and table_name='sl_failover_targets') then 330s create view public.sl_failover_targets as 330s select set_id, 330s set_origin as set_origin, 330s sub1.sub_receiver as backup_id 330s 330s FROM 330s public.sl_subscribe sub1 330s ,public.sl_set set1 330s where 330s sub1.sub_set=set_id 330s and sub1.sub_forward=true 330s --exclude candidates where the set_origin 330s --has a path a node but the failover 330s --candidate has no path to that node 330s and sub1.sub_receiver not in 330s (select p1.pa_client from 330s public.sl_path p1 330s left outer join public.sl_path p2 on 330s (p2.pa_client=p1.pa_client 330s and p2.pa_server=sub1.sub_receiver) 330s where p2.pa_client is null 330s and p1.pa_server=set_origin 330s and p1.pa_client<>sub1.sub_receiver 330s ) 330s and sub1.sub_provider=set_origin 330s --exclude any subscribers that are not 330s --direct subscribers of all sets on the 330s --origin 330s and sub1.sub_receiver not in 330s (select direct_recv.sub_receiver 330s from 330s 330s (--all direct receivers of the first set 330s select subs2.sub_receiver 330s from public.sl_subscribe subs2 330s where subs2.sub_provider=set1.set_origin 330s and subs2.sub_set=set1.set_id) as 330s direct_recv 330s inner join 330s (--all other sets from the origin 330s select set_id from public.sl_set set2 330s where set2.set_origin=set1.set_origin 330s and set2.set_id<>sub1.sub_set) 330s as othersets on(true) 330s left outer join public.sl_subscribe subs3 330s on(subs3.sub_set=othersets.set_id 330s and subs3.sub_forward=true 330s and subs3.sub_provider=set1.set_origin 330s and direct_recv.sub_receiver=subs3.sub_receiver) 330s where subs3.sub_receiver is null 330s ); 330s end if; 330s 330s if not public.check_table_field_exists('_main', 'sl_node', 'no_failed') then 330s alter table public.sl_node add column no_failed bool; 330s update public.sl_node set no_failed=false; 330s end if; 330s return p_old; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s create or replace function public.check_unconfirmed_log () 330s returns bool as $$ 330s declare 330s v_rc bool = false; 330s v_error bool = false; 330s v_origin integer; 330s v_allconf bigint; 330s v_allsnap txid_snapshot; 330s v_count bigint; 330s begin 330s -- 330s -- Loop over all nodes that are the origin of at least one set 330s -- 330s for v_origin in select distinct set_origin as no_id 330s from public.sl_set loop 330s -- 330s -- Per origin determine which is the highest event seqno 330s -- that is confirmed by all subscribers to any of the 330s -- origins sets. 330s -- 330s select into v_allconf min(max_seqno) from ( 330s select con_received, max(con_seqno) as max_seqno 330s from public.sl_confirm 330s where con_origin = v_origin 330s and con_received in ( 330s select distinct sub_receiver 330s from public.sl_set as SET, 330s public.sl_subscribe as SUB 330s where SET.set_id = SUB.sub_set 330s and SET.set_origin = v_origin 330s ) 330s group by con_received 330s ) as maxconfirmed; 330s if not found then 330s raise NOTICE 'check_unconfirmed_log(): cannot determine highest ev_seqno for node % confirmed by all subscribers', v_origin; 330s v_error = true; 330s continue; 330s end if; 330s 330s -- 330s -- Get the txid snapshot that corresponds with that event 330s -- 330s select into v_allsnap ev_snapshot 330s from public.sl_event 330s where ev_origin = v_origin 330s and ev_seqno = v_allconf; 330s if not found then 330s raise NOTICE 'check_unconfirmed_log(): cannot find event %,% in sl_event', v_origin, v_allconf; 330s v_error = true; 330s continue; 330s end if; 330s 330s -- 330s -- Count the number of log rows that appeard after that event. 330s -- 330s select into v_count count(*) from ( 330s select 1 from public.sl_log_1 330s where log_origin = v_origin 330s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 330s union all 330s select 1 from public.sl_log_1 330s where log_origin = v_origin 330s and log_txid in ( 330s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 330s ) 330s union all 330s select 1 from public.sl_log_2 330s where log_origin = v_origin 330s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 330s union all 330s select 1 from public.sl_log_2 330s where log_origin = v_origin 330s and log_txid in ( 330s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 330s ) 330s ) as cnt; 330s 330s if v_count > 0 then 330s raise NOTICE 'check_unconfirmed_log(): origin % has % log rows that have not propagated to all subscribers yet', v_origin, v_count; 330s v_rc = true; 330s end if; 330s end loop; 330s 330s if v_error then 330s raise EXCEPTION 'check_unconfirmed_log(): aborting due to previous inconsistency'; 330s end if; 330s 330s return v_rc; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s set search_path to public 330s ; 330s SET 330s comment on function public.upgradeSchema(p_old text) is 330s 'Called during "update functions" by slonik to perform schema changes'; 330s COMMENT 330s create or replace view public.sl_status as select 330s E.ev_origin as st_origin, 330s C.con_received as st_received, 330s E.ev_seqno as st_last_event, 330s E.ev_timestamp as st_last_event_ts, 330s C.con_seqno as st_last_received, 330s C.con_timestamp as st_last_received_ts, 330s CE.ev_timestamp as st_last_received_event_ts, 330s E.ev_seqno - C.con_seqno as st_lag_num_events, 330s current_timestamp - CE.ev_timestamp as st_lag_time 330s from public.sl_event E, public.sl_confirm C, 330s public.sl_event CE 330s where E.ev_origin = C.con_origin 330s and CE.ev_origin = E.ev_origin 330s and CE.ev_seqno = C.con_seqno 330s and (E.ev_origin, E.ev_seqno) in 330s (select ev_origin, max(ev_seqno) 330s from public.sl_event 330s where ev_origin = public.getLocalNodeId('_main') 330s group by 1 330s ) 330s and (C.con_origin, C.con_received, C.con_seqno) in 330s (select con_origin, con_received, max(con_seqno) 330s from public.sl_confirm 330s where con_origin = public.getLocalNodeId('_main') 330s group by 1, 2 330s ); 330s CREATE VIEW 330s comment on view public.sl_status is 'View showing how far behind remote nodes are.'; 330s COMMENT 330s create or replace function public.copyFields(p_tab_id integer) 330s returns text 330s as $$ 330s declare 330s result text; 330s prefix text; 330s prec record; 330s begin 330s result := ''; 330s prefix := '('; -- Initially, prefix is the opening paren 330s 330s for prec in select public.slon_quote_input(a.attname) as column from public.sl_table t, pg_catalog.pg_attribute a where t.tab_id = p_tab_id and t.tab_reloid = a.attrelid and a.attnum > 0 and a.attisdropped = false order by attnum 330s loop 330s result := result || prefix || prec.column; 330s prefix := ','; -- Subsequently, prepend columns with commas 330s end loop; 330s result := result || ')'; 330s return result; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.copyFields(p_tab_id integer) is 330s 'Return a string consisting of what should be appended to a COPY statement 330s to specify fields for the passed-in tab_id. 330s 330s In PG versions > 7.3, this looks like (field1,field2,...fieldn)'; 330s COMMENT 330s create or replace function public.prepareTableForCopy(p_tab_id int4) 330s returns int4 330s as $$ 330s declare 330s v_tab_oid oid; 330s v_tab_fqname text; 330s begin 330s -- ---- 330s -- Get the OID and fully qualified name for the table 330s -- --- 330s select PGC.oid, 330s public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) as tab_fqname 330s into v_tab_oid, v_tab_fqname 330s from public.sl_table T, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where T.tab_id = p_tab_id 330s and T.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid; 330s if not found then 330s raise exception 'Table with ID % not found in sl_table', p_tab_id; 330s end if; 330s 330s -- ---- 330s -- Try using truncate to empty the table and fallback to 330s -- delete on error. 330s -- ---- 330s perform public.TruncateOnlyTable(v_tab_fqname); 330s raise notice 'truncate of % succeeded', v_tab_fqname; 330s 330s -- suppress index activity 330s perform public.disable_indexes_on_table(v_tab_oid); 330s 330s return 1; 330s exception when others then 330s raise notice 'truncate of % failed - doing delete', v_tab_fqname; 330s perform public.disable_indexes_on_table(v_tab_oid); 330s execute 'delete from only ' || public.slon_quote_input(v_tab_fqname); 330s return 0; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.prepareTableForCopy(p_tab_id int4) is 330s 'Delete all data and suppress index maintenance'; 330s COMMENT 330s create or replace function public.finishTableAfterCopy(p_tab_id int4) 330s returns int4 330s as $$ 330s declare 330s v_tab_oid oid; 330s v_tab_fqname text; 330s begin 330s -- ---- 330s -- Get the tables OID and fully qualified name 330s -- --- 330s select PGC.oid, 330s public.slon_quote_brute(PGN.nspname) || '.' || 330s public.slon_quote_brute(PGC.relname) as tab_fqname 330s into v_tab_oid, v_tab_fqname 330s from public.sl_table T, 330s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 330s where T.tab_id = p_tab_id 330s and T.tab_reloid = PGC.oid 330s and PGC.relnamespace = PGN.oid; 330s if not found then 330s raise exception 'Table with ID % not found in sl_table', p_tab_id; 330s end if; 330s 330s -- ---- 330s -- Reenable indexes and reindex the table. 330s -- ---- 330s perform public.enable_indexes_on_table(v_tab_oid); 330s execute 'reindex table ' || public.slon_quote_input(v_tab_fqname); 330s 330s return 1; 330s end; 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.finishTableAfterCopy(p_tab_id int4) is 330s 'Reenable index maintenance and reindex the table'; 330s COMMENT 330s create or replace function public.setup_vactables_type () returns integer as $$ 330s begin 330s if not exists (select 1 from pg_catalog.pg_type t, pg_catalog.pg_namespace n 330s where n.nspname = '_main' and t.typnamespace = n.oid and 330s t.typname = 'vactables') then 330s execute 'create type public.vactables as (nspname name, relname name);'; 330s end if; 330s return 1; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.setup_vactables_type () is 330s 'Function to be run as part of loading slony1_funcs.sql that creates the vactables type if it is missing'; 330s COMMENT 330s select public.setup_vactables_type(); 330s setup_vactables_type 330s ---------------------- 330s 1 330s (1 row) 330s 330s drop function public.setup_vactables_type (); 330s DROP FUNCTION 330s create or replace function public.TablesToVacuum () returns setof public.vactables as $$ 330s declare 330s prec public.vactables%rowtype; 330s begin 330s prec.nspname := '_main'; 330s prec.relname := 'sl_event'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := '_main'; 330s prec.relname := 'sl_confirm'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := '_main'; 330s prec.relname := 'sl_setsync'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := '_main'; 330s prec.relname := 'sl_seqlog'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := '_main'; 330s prec.relname := 'sl_archive_counter'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := '_main'; 330s prec.relname := 'sl_components'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := '_main'; 330s prec.relname := 'sl_log_script'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := 'pg_catalog'; 330s prec.relname := 'pg_listener'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s prec.nspname := 'pg_catalog'; 330s prec.relname := 'pg_statistic'; 330s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 330s return next prec; 330s end if; 330s 330s return; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.TablesToVacuum () is 330s 'Return a list of tables that require frequent vacuuming. The 330s function is used so that the list is not hardcoded into C code.'; 330s COMMENT 330s create or replace function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$ 330s declare 330s 330s prec record; 330s v_origin int4; 330s v_isorigin boolean; 330s v_fqname text; 330s v_query text; 330s v_rows integer; 330s v_idxname text; 330s 330s begin 330s -- Need to validate that the set exists; the set will tell us if this is the origin 330s select set_origin into v_origin from public.sl_set where set_id = p_set_id; 330s if not found then 330s raise exception 'add_empty_table_to_replication: set % not found!', p_set_id; 330s end if; 330s 330s -- Need to be aware of whether or not this node is origin for the set 330s v_isorigin := ( v_origin = public.getLocalNodeId('_main') ); 330s 330s v_fqname := '"' || p_nspname || '"."' || p_tabname || '"'; 330s -- Take out a lock on the table 330s v_query := 'lock ' || v_fqname || ';'; 330s execute v_query; 330s 330s if v_isorigin then 330s -- On the origin, verify that the table is empty, failing if it has any tuples 330s v_query := 'select 1 as tuple from ' || v_fqname || ' limit 1;'; 330s execute v_query into prec; 330s GET DIAGNOSTICS v_rows = ROW_COUNT; 330s if v_rows = 0 then 330s raise notice 'add_empty_table_to_replication: table % empty on origin - OK', v_fqname; 330s else 330s raise exception 'add_empty_table_to_replication: table % contained tuples on origin node %', v_fqname, v_origin; 330s end if; 330s else 330s -- On other nodes, TRUNCATE the table 330s v_query := 'truncate ' || v_fqname || ';'; 330s execute v_query; 330s end if; 330s -- If p_idxname is NULL, then look up the PK index, and RAISE EXCEPTION if one does not exist 330s if p_idxname is NULL then 330s select c2.relname into prec from pg_catalog.pg_index i, pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_namespace n where i.indrelid = c1.oid and i.indexrelid = c2.oid and c1.relname = p_tabname and i.indisprimary and n.nspname = p_nspname and n.oid = c1.relnamespace; 330s if not found then 330s raise exception 'add_empty_table_to_replication: table % has no primary key and no candidate specified!', v_fqname; 330s else 330s v_idxname := prec.relname; 330s end if; 330s else 330s v_idxname := p_idxname; 330s end if; 330s return public.setAddTable_int(p_set_id, p_tab_id, v_fqname, v_idxname, p_comment); 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is 330s 'Verify that a table is empty, and add it to replication. 330s tab_idxname is optional - if NULL, then we use the primary key. 330s 330s Note that this function is to be run within an EXECUTE SCRIPT script, 330s so it runs at the right place in the transaction stream on all 330s nodes.'; 330s COMMENT 330s create or replace function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$ 330s declare 330s prec record; 330s prec2 record; 330s v_set_id int4; 330s 330s begin 330s -- Look up the parent table; fail if it does not exist 330s select c1.oid into prec from pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_inherits i, pg_catalog.pg_namespace n where c1.oid = i.inhparent and c2.oid = i.inhrelid and n.oid = c2.relnamespace and n.nspname = p_nspname and c2.relname = p_tabname; 330s if not found then 330s raise exception 'replicate_partition: No parent table found for %.%!', p_nspname, p_tabname; 330s end if; 330s 330s -- The parent table tells us what replication set to use 330s select tab_set into prec2 from public.sl_table where tab_reloid = prec.oid; 330s if not found then 330s raise exception 'replicate_partition: Parent table % for new partition %.% is not replicated!', prec.oid, p_nspname, p_tabname; 330s end if; 330s 330s v_set_id := prec2.tab_set; 330s 330s -- Now, we have all the parameters necessary to run add_empty_table_to_replication... 330s return public.add_empty_table_to_replication(v_set_id, p_tab_id, p_nspname, p_tabname, p_idxname, p_comment); 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is 330s 'Add a partition table to replication. 330s tab_idxname is optional - if NULL, then we use the primary key. 330s This function looks up replication configuration via the parent table. 330s 330s Note that this function is to be run within an EXECUTE SCRIPT script, 330s so it runs at the right place in the transaction stream on all 330s nodes.'; 330s COMMENT 330s create or replace function public.disable_indexes_on_table (i_oid oid) 330s returns integer as $$ 330s begin 330s -- Setting pg_class.relhasindex to false will cause copy not to 330s -- maintain any indexes. At the end of the copy we will reenable 330s -- them and reindex the table. This bulk creating of indexes is 330s -- faster. 330s 330s update pg_catalog.pg_class set relhasindex ='f' where oid = i_oid; 330s return 1; 330s end $$ 330s language plpgsql; 330s CREATE FUNCTION 330s comment on function public.disable_indexes_on_table(i_oid oid) is 330s 'disable indexes on the specified table. 330s Used during subscription process to suppress indexes, which allows 330s COPY to go much faster. 330s 330s This may be set as a SECURITY DEFINER in order to eliminate the need 330s for superuser access by Slony-I. 330s '; 330s COMMENT 330s create or replace function public.enable_indexes_on_table (i_oid oid) 330s returns integer as $$ 330s begin 330s update pg_catalog.pg_class set relhasindex ='t' where oid = i_oid; 330s return 1; 330s end $$ 330s language plpgsql 330s security definer; 330s CREATE FUNCTION 330s comment on function public.enable_indexes_on_table(i_oid oid) is 330s 're-enable indexes on the specified table. 330s 330s This may be set as a SECURITY DEFINER in order to eliminate the need 330s for superuser access by Slony-I. 330s '; 330s COMMENT 330s drop function if exists public.reshapeSubscription(int4,int4,int4); 330s DROP FUNCTION 330s create or replace function public.reshapeSubscription (p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) returns int4 as $$ 330s begin 330s update public.sl_subscribe 330s set sub_provider=p_sub_provider 330s from public.sl_set 330s WHERE sub_set=sl_set.set_id 330s and sl_set.set_origin=p_sub_origin and sub_receiver=p_sub_receiver; 330s if found then 330s perform public.RebuildListenEntries(); 330s notify "_main_Restart"; 330s end if; 330s return 0; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.reshapeSubscription(p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) is 330s 'Run on a receiver/subscriber node when the provider for that 330s subscription is being changed. Slonik will invoke this method 330s before the SUBSCRIBE_SET event propogates to the receiver 330s so listen paths can be updated.'; 330s COMMENT 330s create or replace function public.slon_node_health_check() returns boolean as $$ 330s declare 330s prec record; 330s all_ok boolean; 330s begin 330s all_ok := 't'::boolean; 330s -- validate that all tables in sl_table have: 330s -- sl_table agreeing with pg_class 330s for prec in select tab_id, tab_relname, tab_nspname from 330s public.sl_table t where not exists (select 1 from pg_catalog.pg_class c, pg_catalog.pg_namespace n 330s where c.oid = t.tab_reloid and c.relname = t.tab_relname and c.relnamespace = n.oid and n.nspname = t.tab_nspname) loop 330s all_ok := 'f'::boolean; 330s raise warning 'table [id,nsp,name]=[%,%,%] - sl_table does not match pg_class/pg_namespace', prec.tab_id, prec.tab_relname, prec.tab_nspname; 330s end loop; 330s if not all_ok then 330s raise warning 'Mismatch found between sl_table and pg_class. Slonik command REPAIR CONFIG may be useful to rectify this.'; 330s end if; 330s return all_ok; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.slon_node_health_check() is 'called when slon starts up to validate that there are not problems with node configuration. Returns t if all is OK, f if there is a problem.'; 330s COMMENT 330s create or replace function public.log_truncate () returns trigger as 330s $$ 330s declare 330s r_role text; 330s c_nspname text; 330s c_relname text; 330s c_log integer; 330s c_node integer; 330s c_tabid integer; 330s begin 330s -- Ignore this call if session_replication_role = 'local' 330s select into r_role setting 330s from pg_catalog.pg_settings where name = 'session_replication_role'; 330s if r_role = 'local' then 330s return NULL; 330s end if; 330s 330s c_tabid := tg_argv[0]; 330s c_node := public.getLocalNodeId('_main'); 330s select tab_nspname, tab_relname into c_nspname, c_relname 330s from public.sl_table where tab_id = c_tabid; 330s select last_value into c_log from public.sl_log_status; 330s if c_log in (0, 2) then 330s insert into public.sl_log_1 ( 330s log_origin, log_txid, log_tableid, 330s log_actionseq, log_tablenspname, 330s log_tablerelname, log_cmdtype, 330s log_cmdupdncols, log_cmdargs 330s ) values ( 330s c_node, pg_catalog.txid_current(), c_tabid, 330s nextval('public.sl_action_seq'), c_nspname, 330s c_relname, 'T', 0, '{}'::text[]); 330s else -- (1, 3) 330s insert into public.sl_log_2 ( 330s log_origin, log_txid, log_tableid, 330s log_actionseq, log_tablenspname, 330s log_tablerelname, log_cmdtype, 330s log_cmdupdncols, log_cmdargs 330s ) values ( 330s c_node, pg_catalog.txid_current(), c_tabid, 330s nextval('public.sl_action_seq'), c_nspname, 330s c_relname, 'T', 0, '{}'::text[]); 330s end if; 330s return NULL; 330s end 330s $$ language plpgsql 330s security definer; 330s CREATE FUNCTION 330s comment on function public.log_truncate () 330s is 'trigger function run when a replicated table receives a TRUNCATE request'; 330s COMMENT 330s create or replace function public.deny_truncate () returns trigger as 330s $$ 330s declare 330s r_role text; 330s begin 330s -- Ignore this call if session_replication_role = 'local' 330s select into r_role setting 330s from pg_catalog.pg_settings where name = 'session_replication_role'; 330s if r_role = 'local' then 330s return NULL; 330s end if; 330s 330s raise exception 'truncation of replicated table forbidden on subscriber node'; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.deny_truncate () 330s is 'trigger function run when a replicated table receives a TRUNCATE request'; 330s COMMENT 330s create or replace function public.store_application_name (i_name text) returns text as $$ 330s declare 330s p_command text; 330s begin 330s if exists (select 1 from pg_catalog.pg_settings where name = 'application_name') then 330s p_command := 'set application_name to '''|| i_name || ''';'; 330s execute p_command; 330s return i_name; 330s end if; 330s return NULL::text; 330s end $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.store_application_name (i_name text) is 330s 'Set application_name GUC, if possible. Returns NULL if it fails to work.'; 330s COMMENT 330s create or replace function public.is_node_reachable(origin_node_id integer, 330s receiver_node_id integer) returns boolean as $$ 330s declare 330s listen_row record; 330s reachable boolean; 330s begin 330s reachable:=false; 330s select * into listen_row from public.sl_listen where 330s li_origin=origin_node_id and li_receiver=receiver_node_id; 330s if found then 330s reachable:=true; 330s end if; 330s return reachable; 330s end $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.is_node_reachable(origin_node_id integer, receiver_node_id integer) 330s is 'Is the receiver node reachable from the origin, via any of the listen paths?'; 330s COMMENT 330s create or replace function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) returns integer as $$ 330s begin 330s -- Trim out old state for this component 330s if not exists (select 1 from public.sl_components where co_actor = i_actor) then 330s insert into public.sl_components 330s (co_actor, co_pid, co_node, co_connection_pid, co_activity, co_starttime, co_event, co_eventtype) 330s values 330s (i_actor, i_pid, i_node, i_conn_pid, i_activity, i_starttime, i_event, i_eventtype); 330s else 330s update public.sl_components 330s set 330s co_connection_pid = i_conn_pid, co_activity = i_activity, co_starttime = i_starttime, co_event = i_event, 330s co_eventtype = i_eventtype 330s where co_actor = i_actor 330s and co_starttime < i_starttime; 330s end if; 330s return 1; 330s end $$ 330s language plpgsql; 330s CREATE FUNCTION 330s comment on function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) is 330s 'Store state of a Slony component. Useful for monitoring'; 330s COMMENT 330s create or replace function public.recreate_log_trigger(p_fq_table_name text, 330s p_tab_id oid, p_tab_attkind text) returns integer as $$ 330s begin 330s execute 'drop trigger "_main_logtrigger" on ' || 330s p_fq_table_name ; 330s -- ---- 330s execute 'create trigger "_main_logtrigger"' || 330s ' after insert or update or delete on ' || 330s p_fq_table_name 330s || ' for each row execute procedure public.logTrigger (' || 330s pg_catalog.quote_literal('_main') || ',' || 330s pg_catalog.quote_literal(p_tab_id::text) || ',' || 330s pg_catalog.quote_literal(p_tab_attkind) || ');'; 330s return 0; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s comment on function public.recreate_log_trigger(p_fq_table_name text, 330s p_tab_id oid, p_tab_attkind text) is 330s 'A function that drops and recreates the log trigger on the specified table. 330s It is intended to be used after the primary_key/unique index has changed.'; 330s COMMENT 330s create or replace function public.repair_log_triggers(only_locked boolean) 330s returns integer as $$ 330s declare 330s retval integer; 330s table_row record; 330s begin 330s retval=0; 330s for table_row in 330s select tab_nspname,tab_relname, 330s tab_idxname, tab_id, mode, 330s public.determineAttKindUnique(tab_nspname|| 330s '.'||tab_relname,tab_idxname) as attkind 330s from 330s public.sl_table 330s left join 330s pg_locks on (relation=tab_reloid and pid=pg_backend_pid() 330s and mode='AccessExclusiveLock') 330s ,pg_trigger 330s where tab_reloid=tgrelid and 330s public.determineAttKindUnique(tab_nspname||'.' 330s ||tab_relname,tab_idxname) 330s !=(public.decode_tgargs(tgargs))[2] 330s and tgname = '_main' 330s || '_logtrigger' 330s LOOP 330s if (only_locked=false) or table_row.mode='AccessExclusiveLock' then 330s perform public.recreate_log_trigger 330s (table_row.tab_nspname||'.'||table_row.tab_relname, 330s table_row.tab_id,table_row.attkind); 330s retval=retval+1; 330s else 330s raise notice '%.% has an invalid configuration on the log trigger. This was not corrected because only_lock is true and the table is not locked.', 330s table_row.tab_nspname,table_row.tab_relname; 330s 330s end if; 330s end loop; 330s return retval; 330s end 330s $$ 330s language plpgsql; 330s CREATE FUNCTION 330s comment on function public.repair_log_triggers(only_locked boolean) 330s is ' 330s repair the log triggers as required. If only_locked is true then only 330s tables that are already exclusively locked by the current transaction are 330s repaired. Otherwise all replicated tables with outdated trigger arguments 330s are recreated.'; 330s COMMENT 330s create or replace function public.unsubscribe_abandoned_sets(p_failed_node int4) returns bigint 330s as $$ 330s declare 330s v_row record; 330s v_seq_id bigint; 330s v_local_node int4; 330s begin 330s 330s select public.getLocalNodeId('_main') into 330s v_local_node; 330s 330s if found then 330s --abandon all subscriptions from this origin. 330s for v_row in select sub_set,sub_receiver from 330s public.sl_subscribe, public.sl_set 330s where sub_set=set_id and set_origin=p_failed_node 330s and sub_receiver=v_local_node 330s loop 330s raise notice 'Slony-I: failover_abandon_set() is abandoning subscription to set % on node % because it is too far ahead', v_row.sub_set, 330s v_local_node; 330s --If this node is a provider for the set 330s --then the receiver needs to be unsubscribed. 330s -- 330s select public.unsubscribeSet(v_row.sub_set, 330s v_local_node,true) 330s into v_seq_id; 330s end loop; 330s end if; 330s 330s return v_seq_id; 330s end 330s $$ language plpgsql; 330s CREATE FUNCTION 330s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS 330s $BODY$ 330s DECLARE 330s c_delim text; 330s BEGIN 330s c_delim = ','; 330s IF (txt_before IS NULL or txt_before='') THEN 330s RETURN txt_new; 330s END IF; 330s RETURN txt_before || c_delim || txt_new; 330s END; 330s $BODY$ 330s LANGUAGE plpgsql; 330s CREATE FUNCTION 330s comment on function public.agg_text_sum(text,text) is 330s 'An accumulator function used by the slony string_agg function to 330s aggregate rows into a string'; 330s COMMENT 330s Dropping cluster 17/regress ... 330s NOTICE: function public.reshapesubscription(int4,int4,int4) does not exist, skipping 330s ### End 17 psql ### 330s autopkgtest [16:53:26]: test load-functions: -----------------------] 331s load-functions PASS 331s autopkgtest [16:53:27]: test load-functions: - - - - - - - - - - results - - - - - - - - - - 331s autopkgtest [16:53:27]: @@@@@@@@@@@@@@@@@@@@ summary 331s load-functions PASS 349s nova [W] Using flock in prodstack6-arm64 349s Creating nova instance adt-plucky-arm64-slony1-2-20250315-164756-juju-7f2275-prod-proposed-migration-environment-2-529207f4-500e-4f0f-b232-17e8f473a5ca from image adt/ubuntu-plucky-arm64-server-20250315.img (UUID bd6e766c-b51f-4b53-86d6-23aa4d18f524)... 349s nova [W] Timed out waiting for f09476af-7757-4c74-9d8b-e7bfa2ac2d67 to get deleted.