0s autopkgtest [06:55:27]: starting date and time: 2025-02-22 06:55:27+0000 0s autopkgtest [06:55:27]: git checkout: 325255d2 Merge branch 'pin-any-arch' into 'ubuntu/production' 0s autopkgtest [06:55:27]: host juju-7f2275-prod-proposed-migration-environment-20; command line: /home/ubuntu/autopkgtest/runner/autopkgtest --output-dir /tmp/autopkgtest-work.gms3bac6/out --timeout-copy=6000 --setup-commands /home/ubuntu/autopkgtest-cloud/worker-config-production/setup-canonical.sh --apt-pocket=proposed=src:postgresql-17 --apt-upgrade slony1-2 --timeout-short=300 --timeout-copy=20000 --timeout-build=20000 --env=ADT_TEST_TRIGGERS=postgresql-17/17.4-1 -- ssh -s /home/ubuntu/autopkgtest/ssh-setup/nova -- --flavor autopkgtest-s390x --security-groups autopkgtest-juju-7f2275-prod-proposed-migration-environment-20@bos03-s390x-1.secgroup --name adt-plucky-s390x-slony1-2-20250222-065527-juju-7f2275-prod-proposed-migration-environment-20-7094059e-3c68-4721-83c7-b5dbde5d1c5e --image adt/ubuntu-plucky-s390x-server --keyname testbed-juju-7f2275-prod-proposed-migration-environment-20 --net-id=net_prod-proposed-migration-s390x -e TERM=linux -e ''"'"'http_proxy=http://squid.internal:3128'"'"'' -e ''"'"'https_proxy=http://squid.internal:3128'"'"'' -e ''"'"'no_proxy=127.0.0.1,127.0.1.1,login.ubuntu.com,localhost,localdomain,novalocal,internal,archive.ubuntu.com,ports.ubuntu.com,security.ubuntu.com,ddebs.ubuntu.com,changelogs.ubuntu.com,keyserver.ubuntu.com,launchpadlibrarian.net,launchpadcontent.net,launchpad.net,10.24.0.0/24,keystone.ps5.canonical.com,objectstorage.prodstack5.canonical.com,radosgw.ps5.canonical.com'"'"'' --mirror=http://ftpmaster.internal/ubuntu/ 114s autopkgtest [06:57:21]: testbed dpkg architecture: s390x 115s autopkgtest [06:57:22]: testbed apt version: 2.9.30ubuntu1 115s autopkgtest [06:57:22]: @@@@@@@@@@@@@@@@@@@@ test bed setup 115s autopkgtest [06:57:22]: testbed release detected to be: None 116s autopkgtest [06:57:23]: updating testbed package index (apt update) 116s Get:1 http://ftpmaster.internal/ubuntu plucky-proposed InRelease [110 kB] 116s Hit:2 http://ftpmaster.internal/ubuntu plucky InRelease 116s Hit:3 http://ftpmaster.internal/ubuntu plucky-updates InRelease 116s Hit:4 http://ftpmaster.internal/ubuntu plucky-security InRelease 117s Get:5 http://ftpmaster.internal/ubuntu plucky-proposed/main Sources [80.9 kB] 117s Get:6 http://ftpmaster.internal/ubuntu plucky-proposed/universe Sources [504 kB] 117s Get:7 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse Sources [13.5 kB] 117s Get:8 http://ftpmaster.internal/ubuntu plucky-proposed/restricted Sources [3120 B] 117s Get:9 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x Packages [125 kB] 117s Get:10 http://ftpmaster.internal/ubuntu plucky-proposed/restricted s390x Packages [760 B] 117s Get:11 http://ftpmaster.internal/ubuntu plucky-proposed/universe s390x Packages [424 kB] 117s Get:12 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse s390x Packages [2816 B] 117s Fetched 1264 kB in 1s (1510 kB/s) 117s Reading package lists... 118s Reading package lists... 118s Building dependency tree... 118s Reading state information... 118s Calculating upgrade... 118s Calculating upgrade... 119s The following packages were automatically installed and are no longer required: 119s libnsl2 libpython3.12-minimal libpython3.12-stdlib libpython3.12t64 119s linux-headers-6.11.0-8 linux-headers-6.11.0-8-generic 119s linux-modules-6.11.0-8-generic linux-tools-6.11.0-8 119s linux-tools-6.11.0-8-generic 119s Use 'sudo apt autoremove' to remove them. 119s The following packages will be upgraded: 119s curl libcurl3t64-gnutls libcurl4t64 119s 3 upgraded, 0 newly installed, 0 to remove and 0 not upgraded. 119s Need to get 1041 kB of archives. 119s After this operation, 7168 B disk space will be freed. 119s Get:1 http://ftpmaster.internal/ubuntu plucky/main s390x curl s390x 8.12.1-2ubuntu1 [251 kB] 119s Get:2 http://ftpmaster.internal/ubuntu plucky/main s390x libcurl4t64 s390x 8.12.1-2ubuntu1 [397 kB] 119s Get:3 http://ftpmaster.internal/ubuntu plucky/main s390x libcurl3t64-gnutls s390x 8.12.1-2ubuntu1 [393 kB] 119s Fetched 1041 kB in 1s (1893 kB/s) 120s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81030 files and directories currently installed.) 120s Preparing to unpack .../curl_8.12.1-2ubuntu1_s390x.deb ... 120s Unpacking curl (8.12.1-2ubuntu1) over (8.12.0+git20250209.89ed161+ds-1ubuntu1) ... 120s Preparing to unpack .../libcurl4t64_8.12.1-2ubuntu1_s390x.deb ... 120s Unpacking libcurl4t64:s390x (8.12.1-2ubuntu1) over (8.12.0+git20250209.89ed161+ds-1ubuntu1) ... 120s Preparing to unpack .../libcurl3t64-gnutls_8.12.1-2ubuntu1_s390x.deb ... 120s Unpacking libcurl3t64-gnutls:s390x (8.12.1-2ubuntu1) over (8.12.0+git20250209.89ed161+ds-1ubuntu1) ... 120s Setting up libcurl4t64:s390x (8.12.1-2ubuntu1) ... 120s Setting up libcurl3t64-gnutls:s390x (8.12.1-2ubuntu1) ... 120s Setting up curl (8.12.1-2ubuntu1) ... 120s Processing triggers for man-db (2.13.0-1) ... 120s Processing triggers for libc-bin (2.40-4ubuntu1) ... 120s Reading package lists... 121s Building dependency tree... 121s Reading state information... 121s Solving dependencies... 121s The following packages will be REMOVED: 121s libnsl2* libpython3.12-minimal* libpython3.12-stdlib* libpython3.12t64* 121s linux-headers-6.11.0-8* linux-headers-6.11.0-8-generic* 121s linux-modules-6.11.0-8-generic* linux-tools-6.11.0-8* 121s linux-tools-6.11.0-8-generic* 121s 0 upgraded, 0 newly installed, 9 to remove and 0 not upgraded. 121s After this operation, 167 MB disk space will be freed. 121s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 81030 files and directories currently installed.) 121s Removing linux-tools-6.11.0-8-generic (6.11.0-8.8) ... 121s Removing linux-tools-6.11.0-8 (6.11.0-8.8) ... 121s Removing libpython3.12t64:s390x (3.12.9-1) ... 121s Removing libpython3.12-stdlib:s390x (3.12.9-1) ... 121s Removing libnsl2:s390x (1.3.0-3build3) ... 121s Removing libpython3.12-minimal:s390x (3.12.9-1) ... 121s Removing linux-headers-6.11.0-8-generic (6.11.0-8.8) ... 121s Removing linux-headers-6.11.0-8 (6.11.0-8.8) ... 122s Removing linux-modules-6.11.0-8-generic (6.11.0-8.8) ... 122s Processing triggers for libc-bin (2.40-4ubuntu1) ... 122s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 55930 files and directories currently installed.) 122s Purging configuration files for libpython3.12-minimal:s390x (3.12.9-1) ... 122s Purging configuration files for linux-modules-6.11.0-8-generic (6.11.0-8.8) ... 123s autopkgtest [06:57:30]: upgrading testbed (apt dist-upgrade and autopurge) 123s Reading package lists... 123s Building dependency tree... 123s Reading state information... 123s Calculating upgrade...Starting pkgProblemResolver with broken count: 0 123s Starting 2 pkgProblemResolver with broken count: 0 123s Done 123s Entering ResolveByKeep 124s 124s Calculating upgrade... 124s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded. 124s Reading package lists... 124s Building dependency tree... 124s Reading state information... 125s Starting pkgProblemResolver with broken count: 0 125s Starting 2 pkgProblemResolver with broken count: 0 125s Done 125s Solving dependencies... 125s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded. 125s autopkgtest [06:57:32]: rebooting testbed after setup commands that affected boot 134s autopkgtest-virt-ssh: WARNING: ssh connection failed. Retrying in 3 seconds... 144s autopkgtest [06:57:51]: testbed running kernel: Linux 6.12.0-15-generic #15-Ubuntu SMP Tue Feb 4 15:05:57 UTC 2025 147s autopkgtest [06:57:54]: @@@@@@@@@@@@@@@@@@@@ apt-source slony1-2 149s Get:1 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (dsc) [2462 B] 149s Get:2 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (tar) [1465 kB] 149s Get:3 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-6 (diff) [17.3 kB] 149s gpgv: Signature made Thu Sep 19 09:07:19 2024 UTC 149s gpgv: using RSA key 5C48FE6157F49179597087C64C5A6BAB12D2A7AE 149s gpgv: Can't check signature: No public key 149s dpkg-source: warning: cannot verify inline signature for ./slony1-2_2.2.11-6.dsc: no acceptable signature found 150s autopkgtest [06:57:57]: testing package slony1-2 version 2.2.11-6 150s autopkgtest [06:57:57]: build not needed 151s autopkgtest [06:57:58]: test load-functions: preparing testbed 151s Reading package lists... 151s Building dependency tree... 151s Reading state information... 151s Starting pkgProblemResolver with broken count: 0 151s Starting 2 pkgProblemResolver with broken count: 0 151s Done 151s The following NEW packages will be installed: 151s libio-pty-perl libipc-run-perl libjson-perl libllvm20 libpq5 libxslt1.1 151s postgresql-17 postgresql-17-slony1-2 postgresql-client-17 151s postgresql-client-common postgresql-common postgresql-common-dev 151s slony1-2-bin slony1-2-doc ssl-cert 152s 0 upgraded, 15 newly installed, 0 to remove and 0 not upgraded. 152s Need to get 50.3 MB of archives. 152s After this operation, 215 MB of additional disk space will be used. 152s Get:1 http://ftpmaster.internal/ubuntu plucky/main s390x libjson-perl all 4.10000-1 [81.9 kB] 152s Get:2 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-client-common all 273 [47.5 kB] 152s Get:3 http://ftpmaster.internal/ubuntu plucky/main s390x libio-pty-perl s390x 1:1.20-1build3 [31.6 kB] 152s Get:4 http://ftpmaster.internal/ubuntu plucky/main s390x libipc-run-perl all 20231003.0-2 [91.5 kB] 152s Get:5 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-common-dev all 273 [72.9 kB] 152s Get:6 http://ftpmaster.internal/ubuntu plucky/main s390x ssl-cert all 1.1.3ubuntu1 [18.7 kB] 152s Get:7 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-common all 273 [101 kB] 152s Get:8 http://ftpmaster.internal/ubuntu plucky/main s390x libllvm20 s390x 1:20.1.0~+rc2-1~exp2ubuntu0.4 [31.3 MB] 153s Get:9 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x libpq5 s390x 17.4-1 [147 kB] 153s Get:10 http://ftpmaster.internal/ubuntu plucky/main s390x libxslt1.1 s390x 1.1.39-0exp1ubuntu2 [169 kB] 153s Get:11 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x postgresql-client-17 s390x 17.4-1 [1367 kB] 153s Get:12 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x postgresql-17 s390x 17.4-1 [16.3 MB] 153s Get:13 http://ftpmaster.internal/ubuntu plucky/universe s390x postgresql-17-slony1-2 s390x 2.2.11-6 [21.4 kB] 153s Get:14 http://ftpmaster.internal/ubuntu plucky/universe s390x slony1-2-bin s390x 2.2.11-6 [228 kB] 153s Get:15 http://ftpmaster.internal/ubuntu plucky/universe s390x slony1-2-doc all 2.2.11-6 [327 kB] 154s Preconfiguring packages ... 154s Fetched 50.3 MB in 2s (27.0 MB/s) 154s Selecting previously unselected package libjson-perl. 154s (Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 55928 files and directories currently installed.) 154s Preparing to unpack .../00-libjson-perl_4.10000-1_all.deb ... 154s Unpacking libjson-perl (4.10000-1) ... 154s Selecting previously unselected package postgresql-client-common. 154s Preparing to unpack .../01-postgresql-client-common_273_all.deb ... 154s Unpacking postgresql-client-common (273) ... 154s Selecting previously unselected package libio-pty-perl. 154s Preparing to unpack .../02-libio-pty-perl_1%3a1.20-1build3_s390x.deb ... 154s Unpacking libio-pty-perl (1:1.20-1build3) ... 154s Selecting previously unselected package libipc-run-perl. 154s Preparing to unpack .../03-libipc-run-perl_20231003.0-2_all.deb ... 154s Unpacking libipc-run-perl (20231003.0-2) ... 154s Selecting previously unselected package postgresql-common-dev. 154s Preparing to unpack .../04-postgresql-common-dev_273_all.deb ... 154s Unpacking postgresql-common-dev (273) ... 154s Selecting previously unselected package ssl-cert. 154s Preparing to unpack .../05-ssl-cert_1.1.3ubuntu1_all.deb ... 154s Unpacking ssl-cert (1.1.3ubuntu1) ... 154s Selecting previously unselected package postgresql-common. 154s Preparing to unpack .../06-postgresql-common_273_all.deb ... 154s Adding 'diversion of /usr/bin/pg_config to /usr/bin/pg_config.libpq-dev by postgresql-common' 154s Unpacking postgresql-common (273) ... 154s Selecting previously unselected package libllvm20:s390x. 154s Preparing to unpack .../07-libllvm20_1%3a20.1.0~+rc2-1~exp2ubuntu0.4_s390x.deb ... 154s Unpacking libllvm20:s390x (1:20.1.0~+rc2-1~exp2ubuntu0.4) ... 155s Selecting previously unselected package libpq5:s390x. 155s Preparing to unpack .../08-libpq5_17.4-1_s390x.deb ... 155s Unpacking libpq5:s390x (17.4-1) ... 155s Selecting previously unselected package libxslt1.1:s390x. 155s Preparing to unpack .../09-libxslt1.1_1.1.39-0exp1ubuntu2_s390x.deb ... 155s Unpacking libxslt1.1:s390x (1.1.39-0exp1ubuntu2) ... 155s Selecting previously unselected package postgresql-client-17. 155s Preparing to unpack .../10-postgresql-client-17_17.4-1_s390x.deb ... 155s Unpacking postgresql-client-17 (17.4-1) ... 155s Selecting previously unselected package postgresql-17. 155s Preparing to unpack .../11-postgresql-17_17.4-1_s390x.deb ... 155s Unpacking postgresql-17 (17.4-1) ... 155s Selecting previously unselected package postgresql-17-slony1-2. 155s Preparing to unpack .../12-postgresql-17-slony1-2_2.2.11-6_s390x.deb ... 155s Unpacking postgresql-17-slony1-2 (2.2.11-6) ... 155s Selecting previously unselected package slony1-2-bin. 155s Preparing to unpack .../13-slony1-2-bin_2.2.11-6_s390x.deb ... 155s Unpacking slony1-2-bin (2.2.11-6) ... 155s Selecting previously unselected package slony1-2-doc. 155s Preparing to unpack .../14-slony1-2-doc_2.2.11-6_all.deb ... 155s Unpacking slony1-2-doc (2.2.11-6) ... 155s Setting up postgresql-client-common (273) ... 155s Setting up libio-pty-perl (1:1.20-1build3) ... 155s Setting up libpq5:s390x (17.4-1) ... 155s Setting up ssl-cert (1.1.3ubuntu1) ... 155s Created symlink '/etc/systemd/system/multi-user.target.wants/ssl-cert.service' → '/usr/lib/systemd/system/ssl-cert.service'. 156s Setting up libllvm20:s390x (1:20.1.0~+rc2-1~exp2ubuntu0.4) ... 156s Setting up libipc-run-perl (20231003.0-2) ... 156s Setting up libjson-perl (4.10000-1) ... 156s Setting up libxslt1.1:s390x (1.1.39-0exp1ubuntu2) ... 156s Setting up slony1-2-doc (2.2.11-6) ... 156s Setting up postgresql-common-dev (273) ... 156s Setting up postgresql-client-17 (17.4-1) ... 156s update-alternatives: using /usr/share/postgresql/17/man/man1/psql.1.gz to provide /usr/share/man/man1/psql.1.gz (psql.1.gz) in auto mode 156s Setting up postgresql-common (273) ... 156s Creating config file /etc/postgresql-common/createcluster.conf with new version 156s Building PostgreSQL dictionaries from installed myspell/hunspell packages... 156s Removing obsolete dictionary files: 157s Created symlink '/etc/systemd/system/multi-user.target.wants/postgresql.service' → '/usr/lib/systemd/system/postgresql.service'. 157s Setting up slony1-2-bin (2.2.11-6) ... 157s Setting up postgresql-17 (17.4-1) ... 158s Creating new PostgreSQL cluster 17/main ... 158s /usr/lib/postgresql/17/bin/initdb -D /var/lib/postgresql/17/main --auth-local peer --auth-host scram-sha-256 --no-instructions 158s The files belonging to this database system will be owned by user "postgres". 158s This user must also own the server process. 158s 158s The database cluster will be initialized with locale "C.UTF-8". 158s The default database encoding has accordingly been set to "UTF8". 158s The default text search configuration will be set to "english". 158s 158s Data page checksums are disabled. 158s 158s fixing permissions on existing directory /var/lib/postgresql/17/main ... ok 158s creating subdirectories ... ok 158s selecting dynamic shared memory implementation ... posix 158s selecting default "max_connections" ... 100 158s selecting default "shared_buffers" ... 128MB 158s selecting default time zone ... Etc/UTC 158s creating configuration files ... ok 158s running bootstrap script ... ok 158s performing post-bootstrap initialization ... ok 158s syncing data to disk ... ok 161s Setting up postgresql-17-slony1-2 (2.2.11-6) ... 161s Processing triggers for man-db (2.13.0-1) ... 162s Processing triggers for libc-bin (2.40-4ubuntu1) ... 164s autopkgtest [06:58:11]: test load-functions: [----------------------- 164s ### PostgreSQL 17 psql ### 164s Creating new PostgreSQL cluster 17/regress ... 167s create table public.sl_node ( 167s no_id int4, 167s no_active bool, 167s no_comment text, 167s no_failed bool, 167s CONSTRAINT "sl_node-pkey" 167s PRIMARY KEY (no_id) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_node is 'Holds the list of nodes associated with this namespace.'; 167s COMMENT 167s comment on column public.sl_node.no_id is 'The unique ID number for the node'; 167s COMMENT 167s comment on column public.sl_node.no_active is 'Is the node active in replication yet?'; 167s COMMENT 167s comment on column public.sl_node.no_comment is 'A human-oriented description of the node'; 167s COMMENT 167s create table public.sl_nodelock ( 167s nl_nodeid int4, 167s nl_conncnt serial, 167s nl_backendpid int4, 167s CONSTRAINT "sl_nodelock-pkey" 167s PRIMARY KEY (nl_nodeid, nl_conncnt) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_nodelock is 'Used to prevent multiple slon instances and to identify the backends to kill in terminateNodeConnections().'; 167s COMMENT 167s comment on column public.sl_nodelock.nl_nodeid is 'Clients node_id'; 167s COMMENT 167s comment on column public.sl_nodelock.nl_conncnt is 'Clients connection number'; 167s COMMENT 167s comment on column public.sl_nodelock.nl_backendpid is 'PID of database backend owning this lock'; 167s COMMENT 167s create table public.sl_set ( 167s set_id int4, 167s set_origin int4, 167s set_locked bigint, 167s set_comment text, 167s CONSTRAINT "sl_set-pkey" 167s PRIMARY KEY (set_id), 167s CONSTRAINT "set_origin-no_id-ref" 167s FOREIGN KEY (set_origin) 167s REFERENCES public.sl_node (no_id) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_set is 'Holds definitions of replication sets.'; 167s COMMENT 167s comment on column public.sl_set.set_id is 'A unique ID number for the set.'; 167s COMMENT 167s comment on column public.sl_set.set_origin is 167s 'The ID number of the source node for the replication set.'; 167s COMMENT 167s comment on column public.sl_set.set_locked is 'Transaction ID where the set was locked.'; 167s COMMENT 167s comment on column public.sl_set.set_comment is 'A human-oriented description of the set.'; 167s COMMENT 167s create table public.sl_setsync ( 167s ssy_setid int4, 167s ssy_origin int4, 167s ssy_seqno int8, 167s ssy_snapshot "pg_catalog".txid_snapshot, 167s ssy_action_list text, 167s CONSTRAINT "sl_setsync-pkey" 167s PRIMARY KEY (ssy_setid), 167s CONSTRAINT "ssy_setid-set_id-ref" 167s FOREIGN KEY (ssy_setid) 167s REFERENCES public.sl_set (set_id), 167s CONSTRAINT "ssy_origin-no_id-ref" 167s FOREIGN KEY (ssy_origin) 167s REFERENCES public.sl_node (no_id) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_setsync is 'SYNC information'; 167s COMMENT 167s comment on column public.sl_setsync.ssy_setid is 'ID number of the replication set'; 167s COMMENT 167s comment on column public.sl_setsync.ssy_origin is 'ID number of the node'; 167s COMMENT 167s comment on column public.sl_setsync.ssy_seqno is 'Slony-I sequence number'; 167s COMMENT 167s comment on column public.sl_setsync.ssy_snapshot is 'TXID in provider system seen by the event'; 167s COMMENT 167s comment on column public.sl_setsync.ssy_action_list is 'action list used during the subscription process. At the time a subscriber copies over data from the origin, it sees all tables in a state somewhere between two SYNC events. Therefore this list must contains all log_actionseqs that are visible at that time, whose operations have therefore already been included in the data copied at the time the initial data copy is done. Those actions may therefore be filtered out of the first SYNC done after subscribing.'; 167s COMMENT 167s create table public.sl_table ( 167s tab_id int4, 167s tab_reloid oid UNIQUE NOT NULL, 167s tab_relname name NOT NULL, 167s tab_nspname name NOT NULL, 167s tab_set int4, 167s tab_idxname name NOT NULL, 167s tab_altered boolean NOT NULL, 167s tab_comment text, 167s CONSTRAINT "sl_table-pkey" 167s PRIMARY KEY (tab_id), 167s CONSTRAINT "tab_set-set_id-ref" 167s FOREIGN KEY (tab_set) 167s REFERENCES public.sl_set (set_id) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_table is 'Holds information about the tables being replicated.'; 167s COMMENT 167s comment on column public.sl_table.tab_id is 'Unique key for Slony-I to use to identify the table'; 167s COMMENT 167s comment on column public.sl_table.tab_reloid is 'The OID of the table in pg_catalog.pg_class.oid'; 167s COMMENT 167s comment on column public.sl_table.tab_relname is 'The name of the table in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 167s COMMENT 167s comment on column public.sl_table.tab_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 167s COMMENT 167s comment on column public.sl_table.tab_set is 'ID of the replication set the table is in'; 167s COMMENT 167s comment on column public.sl_table.tab_idxname is 'The name of the primary index of the table'; 167s COMMENT 167s comment on column public.sl_table.tab_altered is 'Has the table been modified for replication?'; 167s COMMENT 167s comment on column public.sl_table.tab_comment is 'Human-oriented description of the table'; 167s COMMENT 167s create table public.sl_sequence ( 167s seq_id int4, 167s seq_reloid oid UNIQUE NOT NULL, 167s seq_relname name NOT NULL, 167s seq_nspname name NOT NULL, 167s seq_set int4, 167s seq_comment text, 167s CONSTRAINT "sl_sequence-pkey" 167s PRIMARY KEY (seq_id), 167s CONSTRAINT "seq_set-set_id-ref" 167s FOREIGN KEY (seq_set) 167s REFERENCES public.sl_set (set_id) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_sequence is 'Similar to sl_table, each entry identifies a sequence being replicated.'; 167s COMMENT 167s comment on column public.sl_sequence.seq_id is 'An internally-used ID for Slony-I to use in its sequencing of updates'; 167s COMMENT 167s comment on column public.sl_sequence.seq_reloid is 'The OID of the sequence object'; 167s COMMENT 167s comment on column public.sl_sequence.seq_relname is 'The name of the sequence in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 167s COMMENT 167s comment on column public.sl_sequence.seq_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 167s COMMENT 167s comment on column public.sl_sequence.seq_set is 'Indicates which replication set the object is in'; 167s COMMENT 167s comment on column public.sl_sequence.seq_comment is 'A human-oriented comment'; 167s COMMENT 167s create table public.sl_path ( 167s pa_server int4, 167s pa_client int4, 167s pa_conninfo text NOT NULL, 167s pa_connretry int4, 167s CONSTRAINT "sl_path-pkey" 167s PRIMARY KEY (pa_server, pa_client), 167s CONSTRAINT "pa_server-no_id-ref" 167s FOREIGN KEY (pa_server) 167s REFERENCES public.sl_node (no_id), 167s CONSTRAINT "pa_client-no_id-ref" 167s FOREIGN KEY (pa_client) 167s REFERENCES public.sl_node (no_id) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_path is 'Holds connection information for the paths between nodes, and the synchronisation delay'; 167s COMMENT 167s comment on column public.sl_path.pa_server is 'The Node ID # (from sl_node.no_id) of the data source'; 167s COMMENT 167s comment on column public.sl_path.pa_client is 'The Node ID # (from sl_node.no_id) of the data target'; 167s COMMENT 167s comment on column public.sl_path.pa_conninfo is 'The PostgreSQL connection string used to connect to the source node.'; 167s COMMENT 167s comment on column public.sl_path.pa_connretry is 'The synchronisation delay, in seconds'; 167s COMMENT 167s create table public.sl_listen ( 167s li_origin int4, 167s li_provider int4, 167s li_receiver int4, 167s CONSTRAINT "sl_listen-pkey" 167s PRIMARY KEY (li_origin, li_provider, li_receiver), 167s CONSTRAINT "li_origin-no_id-ref" 167s FOREIGN KEY (li_origin) 167s REFERENCES public.sl_node (no_id), 167s CONSTRAINT "sl_listen-sl_path-ref" 167s FOREIGN KEY (li_provider, li_receiver) 167s REFERENCES public.sl_path (pa_server, pa_client) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_listen is 'Indicates how nodes listen to events from other nodes in the Slony-I network.'; 167s COMMENT 167s comment on column public.sl_listen.li_origin is 'The ID # (from sl_node.no_id) of the node this listener is operating on'; 167s COMMENT 167s comment on column public.sl_listen.li_provider is 'The ID # (from sl_node.no_id) of the source node for this listening event'; 167s COMMENT 167s comment on column public.sl_listen.li_receiver is 'The ID # (from sl_node.no_id) of the target node for this listening event'; 167s COMMENT 167s create table public.sl_subscribe ( 167s sub_set int4, 167s sub_provider int4, 167s sub_receiver int4, 167s sub_forward bool, 167s sub_active bool, 167s CONSTRAINT "sl_subscribe-pkey" 167s PRIMARY KEY (sub_receiver, sub_set), 167s CONSTRAINT "sl_subscribe-sl_path-ref" 167s FOREIGN KEY (sub_provider, sub_receiver) 167s REFERENCES public.sl_path (pa_server, pa_client), 167s CONSTRAINT "sub_set-set_id-ref" 167s FOREIGN KEY (sub_set) 167s REFERENCES public.sl_set (set_id) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_subscribe is 'Holds a list of subscriptions on sets'; 167s COMMENT 167s comment on column public.sl_subscribe.sub_set is 'ID # (from sl_set) of the set being subscribed to'; 167s COMMENT 167s comment on column public.sl_subscribe.sub_provider is 'ID# (from sl_node) of the node providing data'; 167s COMMENT 167s comment on column public.sl_subscribe.sub_receiver is 'ID# (from sl_node) of the node receiving data from the provider'; 167s COMMENT 167s comment on column public.sl_subscribe.sub_forward is 'Does this provider keep data in sl_log_1/sl_log_2 to allow it to be a provider for other nodes?'; 167s COMMENT 167s comment on column public.sl_subscribe.sub_active is 'Has this subscription been activated? This is not set on the subscriber until AFTER the subscriber has received COPY data from the provider'; 167s COMMENT 167s create table public.sl_event ( 167s ev_origin int4, 167s ev_seqno int8, 167s ev_timestamp timestamptz, 167s ev_snapshot "pg_catalog".txid_snapshot, 167s ev_type text, 167s ev_data1 text, 167s ev_data2 text, 167s ev_data3 text, 167s ev_data4 text, 167s ev_data5 text, 167s ev_data6 text, 167s ev_data7 text, 167s ev_data8 text, 167s CONSTRAINT "sl_event-pkey" 167s PRIMARY KEY (ev_origin, ev_seqno) 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_event is 'Holds information about replication events. After a period of time, Slony removes old confirmed events from both this table and the sl_confirm table.'; 167s COMMENT 167s comment on column public.sl_event.ev_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 167s COMMENT 167s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 167s COMMENT 167s comment on column public.sl_event.ev_timestamp is 'When this event record was created'; 167s COMMENT 167s comment on column public.sl_event.ev_snapshot is 'TXID snapshot on provider node for this event'; 167s COMMENT 167s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 167s COMMENT 167s comment on column public.sl_event.ev_type is 'The type of event this record is for. 167s SYNC = Synchronise 167s STORE_NODE = 167s ENABLE_NODE = 167s DROP_NODE = 167s STORE_PATH = 167s DROP_PATH = 167s STORE_LISTEN = 167s DROP_LISTEN = 167s STORE_SET = 167s DROP_SET = 167s MERGE_SET = 167s SET_ADD_TABLE = 167s SET_ADD_SEQUENCE = 167s STORE_TRIGGER = 167s DROP_TRIGGER = 167s MOVE_SET = 167s ACCEPT_SET = 167s SET_DROP_TABLE = 167s SET_DROP_SEQUENCE = 167s SET_MOVE_TABLE = 167s SET_MOVE_SEQUENCE = 167s FAILOVER_SET = 167s SUBSCRIBE_SET = 167s ENABLE_SUBSCRIPTION = 167s UNSUBSCRIBE_SET = 167s DDL_SCRIPT = 167s ADJUST_SEQ = 167s RESET_CONFIG = 167s '; 167s COMMENT 167s comment on column public.sl_event.ev_data1 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s comment on column public.sl_event.ev_data2 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s comment on column public.sl_event.ev_data3 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s comment on column public.sl_event.ev_data4 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s comment on column public.sl_event.ev_data5 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s comment on column public.sl_event.ev_data6 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s comment on column public.sl_event.ev_data7 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s comment on column public.sl_event.ev_data8 is 'Data field containing an argument needed to process the event'; 167s COMMENT 167s create table public.sl_confirm ( 167s con_origin int4, 167s con_received int4, 167s con_seqno int8, 167s con_timestamp timestamptz DEFAULT timeofday()::timestamptz 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_confirm is 'Holds confirmation of replication events. After a period of time, Slony removes old confirmed events from both this table and the sl_event table.'; 167s COMMENT 167s comment on column public.sl_confirm.con_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 167s COMMENT 167s comment on column public.sl_confirm.con_seqno is 'The ID # for the event'; 167s COMMENT 167s comment on column public.sl_confirm.con_timestamp is 'When this event was confirmed'; 167s COMMENT 167s create index sl_confirm_idx1 on public.sl_confirm 167s (con_origin, con_received, con_seqno); 167s CREATE INDEX 167s create index sl_confirm_idx2 on public.sl_confirm 167s (con_received, con_seqno); 167s CREATE INDEX 167s create table public.sl_seqlog ( 167s seql_seqid int4, 167s seql_origin int4, 167s seql_ev_seqno int8, 167s seql_last_value int8 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_seqlog is 'Log of Sequence updates'; 167s COMMENT 167s comment on column public.sl_seqlog.seql_seqid is 'Sequence ID'; 167s COMMENT 167s comment on column public.sl_seqlog.seql_origin is 'Publisher node at which the sequence originates'; 167s COMMENT 167s comment on column public.sl_seqlog.seql_ev_seqno is 'Slony-I Event with which this sequence update is associated'; 167s COMMENT 167s comment on column public.sl_seqlog.seql_last_value is 'Last value published for this sequence'; 167s COMMENT 167s create index sl_seqlog_idx on public.sl_seqlog 167s (seql_origin, seql_ev_seqno, seql_seqid); 167s CREATE INDEX 167s create function public.sequenceLastValue(p_seqname text) returns int8 167s as $$ 167s declare 167s v_seq_row record; 167s begin 167s for v_seq_row in execute 'select last_value from ' || public.slon_quote_input(p_seqname) 167s loop 167s return v_seq_row.last_value; 167s end loop; 167s 167s -- not reached 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.sequenceLastValue(p_seqname text) is 167s 'sequenceLastValue(p_seqname) 167s 167s Utility function used in sl_seqlastvalue view to compactly get the 167s last value from the requested sequence.'; 167s COMMENT 167s create table public.sl_log_1 ( 167s log_origin int4, 167s log_txid bigint, 167s log_tableid int4, 167s log_actionseq int8, 167s log_tablenspname text, 167s log_tablerelname text, 167s log_cmdtype "char", 167s log_cmdupdncols int4, 167s log_cmdargs text[] 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s create index sl_log_1_idx1 on public.sl_log_1 167s (log_origin, log_txid, log_actionseq); 167s CREATE INDEX 167s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 167s COMMENT 167s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 167s COMMENT 167s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 167s COMMENT 167s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 167s COMMENT 167s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 167s COMMENT 167s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 167s COMMENT 167s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 167s COMMENT 167s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 167s COMMENT 167s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 167s COMMENT 167s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 167s COMMENT 167s create table public.sl_log_2 ( 167s log_origin int4, 167s log_txid bigint, 167s log_tableid int4, 167s log_actionseq int8, 167s log_tablenspname text, 167s log_tablerelname text, 167s log_cmdtype "char", 167s log_cmdupdncols int4, 167s log_cmdargs text[] 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s create index sl_log_2_idx1 on public.sl_log_2 167s (log_origin, log_txid, log_actionseq); 167s CREATE INDEX 167s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 167s COMMENT 167s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 167s COMMENT 167s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 167s COMMENT 167s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 167s COMMENT 167s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 167s COMMENT 167s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 167s COMMENT 167s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 167s COMMENT 167s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 167s COMMENT 167s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 167s COMMENT 167s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 167s COMMENT 167s create table public.sl_log_script ( 167s log_origin int4, 167s log_txid bigint, 167s log_actionseq int8, 167s log_cmdtype "char", 167s log_cmdargs text[] 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s create index sl_log_script_idx1 on public.sl_log_script 167s (log_origin, log_txid, log_actionseq); 167s CREATE INDEX 167s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 167s COMMENT 167s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 167s COMMENT 167s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 167s COMMENT 167s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 167s COMMENT 167s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete'; 167s COMMENT 167s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 167s COMMENT 167s create table public.sl_registry ( 167s reg_key text primary key, 167s reg_int4 int4, 167s reg_text text, 167s reg_timestamp timestamptz 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s comment on table public.sl_registry is 'Stores miscellaneous runtime data'; 167s COMMENT 167s comment on column public.sl_registry.reg_key is 'Unique key of the runtime option'; 167s COMMENT 167s comment on column public.sl_registry.reg_int4 is 'Option value if type int4'; 167s COMMENT 167s comment on column public.sl_registry.reg_text is 'Option value if type text'; 167s COMMENT 167s comment on column public.sl_registry.reg_timestamp is 'Option value if type timestamp'; 167s COMMENT 167s create table public.sl_apply_stats ( 167s as_origin int4, 167s as_num_insert int8, 167s as_num_update int8, 167s as_num_delete int8, 167s as_num_truncate int8, 167s as_num_script int8, 167s as_num_total int8, 167s as_duration interval, 167s as_apply_first timestamptz, 167s as_apply_last timestamptz, 167s as_cache_prepare int8, 167s as_cache_hit int8, 167s as_cache_evict int8, 167s as_cache_prepare_max int8 167s ) WITHOUT OIDS; 167s CREATE TABLE 167s create index sl_apply_stats_idx1 on public.sl_apply_stats 167s (as_origin); 167s CREATE INDEX 167s comment on table public.sl_apply_stats is 'Local SYNC apply statistics (running totals)'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_origin is 'Origin of the SYNCs'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_num_insert is 'Number of INSERT operations performed'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_num_update is 'Number of UPDATE operations performed'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_num_delete is 'Number of DELETE operations performed'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_num_truncate is 'Number of TRUNCATE operations performed'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_num_script is 'Number of DDL operations performed'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_num_total is 'Total number of operations'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_duration is 'Processing time'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_apply_first is 'Timestamp of first recorded SYNC'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_apply_last is 'Timestamp of most recent recorded SYNC'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_cache_evict is 'Number of apply query cache evict operations'; 167s COMMENT 167s comment on column public.sl_apply_stats.as_cache_prepare_max is 'Maximum number of apply queries prepared in one SYNC group'; 167s COMMENT 167s create view public.sl_seqlastvalue as 167s select SQ.seq_id, SQ.seq_set, SQ.seq_reloid, 167s S.set_origin as seq_origin, 167s public.sequenceLastValue( 167s "pg_catalog".quote_ident(PGN.nspname) || '.' || 167s "pg_catalog".quote_ident(PGC.relname)) as seq_last_value 167s from public.sl_sequence SQ, public.sl_set S, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where S.set_id = SQ.seq_set 167s and PGC.oid = SQ.seq_reloid and PGN.oid = PGC.relnamespace; 167s CREATE VIEW 167s create view public.sl_failover_targets as 167s select set_id, 167s set_origin as set_origin, 167s sub1.sub_receiver as backup_id 167s FROM 167s public.sl_subscribe sub1 167s ,public.sl_set set1 167s where 167s sub1.sub_set=set_id 167s and sub1.sub_forward=true 167s --exclude candidates where the set_origin 167s --has a path a node but the failover 167s --candidate has no path to that node 167s and sub1.sub_receiver not in 167s (select p1.pa_client from 167s public.sl_path p1 167s left outer join public.sl_path p2 on 167s (p2.pa_client=p1.pa_client 167s and p2.pa_server=sub1.sub_receiver) 167s where p2.pa_client is null 167s and p1.pa_server=set_origin 167s and p1.pa_client<>sub1.sub_receiver 167s ) 167s and sub1.sub_provider=set_origin 167s --exclude any subscribers that are not 167s --direct subscribers of all sets on the 167s --origin 167s and sub1.sub_receiver not in 167s (select direct_recv.sub_receiver 167s from 167s 167s (--all direct receivers of the first set 167s select subs2.sub_receiver 167s from public.sl_subscribe subs2 167s where subs2.sub_provider=set1.set_origin 167s and subs2.sub_set=set1.set_id) as 167s direct_recv 167s inner join 167s (--all other sets from the origin 167s select set_id from public.sl_set set2 167s where set2.set_origin=set1.set_origin 167s and set2.set_id<>sub1.sub_set) 167s as othersets on(true) 167s left outer join public.sl_subscribe subs3 167s on(subs3.sub_set=othersets.set_id 167s and subs3.sub_forward=true 167s and subs3.sub_provider=set1.set_origin 167s and direct_recv.sub_receiver=subs3.sub_receiver) 167s where subs3.sub_receiver is null 167s ); 167s CREATE VIEW 167s create sequence public.sl_local_node_id 167s MINVALUE -1; 167s CREATE SEQUENCE 167s SELECT setval('public.sl_local_node_id', -1); 167s setval 167s -------- 167s -1 167s (1 row) 167s 167s comment on sequence public.sl_local_node_id is 'The local node ID is initialized to -1, meaning that this node is not initialized yet.'; 167s COMMENT 167s create sequence public.sl_event_seq; 167s CREATE SEQUENCE 167s comment on sequence public.sl_event_seq is 'The sequence for numbering events originating from this node.'; 167s COMMENT 167s select setval('public.sl_event_seq', 5000000000); 167s setval 167s ------------ 167s 5000000000 167s (1 row) 167s 167s create sequence public.sl_action_seq; 167s CREATE SEQUENCE 167s comment on sequence public.sl_action_seq is 'The sequence to number statements in the transaction logs, so that the replication engines can figure out the "agreeable" order of statements.'; 167s COMMENT 167s create sequence public.sl_log_status 167s MINVALUE 0 MAXVALUE 3; 167s CREATE SEQUENCE 167s SELECT setval('public.sl_log_status', 0); 167s setval 167s -------- 167s 0 167s (1 row) 167s 167s comment on sequence public.sl_log_status is ' 167s Bit 0x01 determines the currently active log table 167s Bit 0x02 tells if the engine needs to read both logs 167s after switching until the old log is clean and truncated. 167s 167s Possible values: 167s 0 sl_log_1 active, sl_log_2 clean 167s 1 sl_log_2 active, sl_log_1 clean 167s 2 sl_log_1 active, sl_log_2 unknown - cleanup 167s 3 sl_log_2 active, sl_log_1 unknown - cleanup 167s 167s This is not yet in use. 167s '; 167s COMMENT 167s create table public.sl_config_lock ( 167s dummy integer 167s ); 167s CREATE TABLE 167s comment on table public.sl_config_lock is 'This table exists solely to prevent overlapping execution of configuration change procedures and the resulting possible deadlocks. 167s '; 167s COMMENT 167s comment on column public.sl_config_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 167s COMMENT 167s create table public.sl_event_lock ( 167s dummy integer 167s ); 167s CREATE TABLE 167s comment on table public.sl_event_lock is 'This table exists solely to prevent multiple connections from concurrently creating new events and perhaps getting them out of order.'; 167s COMMENT 167s comment on column public.sl_event_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 167s COMMENT 167s create table public.sl_archive_counter ( 167s ac_num bigint, 167s ac_timestamp timestamptz 167s ) without oids; 167s CREATE TABLE 167s comment on table public.sl_archive_counter is 'Table used to generate the log shipping archive number. 167s '; 167s COMMENT 167s comment on column public.sl_archive_counter.ac_num is 'Counter of SYNC ID used in log shipping as the archive number'; 167s COMMENT 167s comment on column public.sl_archive_counter.ac_timestamp is 'Time at which the archive log was generated on the subscriber'; 167s COMMENT 167s insert into public.sl_archive_counter (ac_num, ac_timestamp) 167s values (0, 'epoch'::timestamptz); 167s INSERT 0 1 167s create table public.sl_components ( 167s co_actor text not null primary key, 167s co_pid integer not null, 167s co_node integer not null, 167s co_connection_pid integer not null, 167s co_activity text, 167s co_starttime timestamptz not null, 167s co_event bigint, 167s co_eventtype text 167s ) without oids; 167s CREATE TABLE 167s comment on table public.sl_components is 'Table used to monitor what various slon/slonik components are doing'; 167s COMMENT 167s comment on column public.sl_components.co_actor is 'which component am I?'; 167s COMMENT 167s comment on column public.sl_components.co_pid is 'my process/thread PID on node where slon runs'; 167s COMMENT 167s comment on column public.sl_components.co_node is 'which node am I servicing?'; 167s COMMENT 167s comment on column public.sl_components.co_connection_pid is 'PID of database connection being used on database server'; 167s COMMENT 167s comment on column public.sl_components.co_activity is 'activity that I am up to'; 167s COMMENT 167s comment on column public.sl_components.co_starttime is 'when did my activity begin? (timestamp reported as per slon process on server running slon)'; 167s COMMENT 167s comment on column public.sl_components.co_eventtype is 'what kind of event am I processing? (commonly n/a for event loop main threads)'; 167s COMMENT 167s comment on column public.sl_components.co_event is 'which event have I started processing?'; 167s COMMENT 167s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS 167s $BODY$ 167s DECLARE 167s c_delim text; 167s BEGIN 167s c_delim = ','; 167s IF (txt_before IS NULL or txt_before='') THEN 167s RETURN txt_new; 167s END IF; 167s RETURN txt_before || c_delim || txt_new; 167s END; 167s $BODY$ 167s LANGUAGE plpgsql; 167s CREATE FUNCTION 167s comment on function public.agg_text_sum(text,text) is 167s 'An accumulator function used by the slony string_agg function to 167s aggregate rows into a string'; 167s COMMENT 167s CREATE AGGREGATE public.string_agg(text) ( 167s SFUNC=public.agg_text_sum, 167s STYPE=text, 167s INITCOND='' 167s ); 167s CREATE AGGREGATE 167s grant usage on schema public to public; 167s GRANT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s NOTICE: checked validity of cluster main namespace - OK! 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) 167s returns bigint 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 167s language C 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) is 167s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 167s 167s Create an sl_event entry'; 167s COMMENT 167s create or replace function public.denyAccess () 167s returns trigger 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__denyAccess' 167s language C 167s security definer; 167s CREATE FUNCTION 167s comment on function public.denyAccess () is 167s 'Trigger function to prevent modifications to a table on a subscriber'; 167s COMMENT 167s grant execute on function public.denyAccess () to public; 167s GRANT 167s create or replace function public.lockedSet () 167s returns trigger 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__lockedSet' 167s language C; 167s CREATE FUNCTION 167s comment on function public.lockedSet () is 167s 'Trigger function to prevent modifications to a table before and after a moveSet()'; 167s COMMENT 167s create or replace function public.getLocalNodeId (p_cluster name) returns int4 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getLocalNodeId' 167s language C 167s security definer; 167s CREATE FUNCTION 167s grant execute on function public.getLocalNodeId (p_cluster name) to public; 167s GRANT 167s comment on function public.getLocalNodeId (p_cluster name) is 167s 'Returns the node ID of the node being serviced on the local database'; 167s COMMENT 167s create or replace function public.getModuleVersion () returns text 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getModuleVersion' 167s language C 167s security definer; 167s CREATE FUNCTION 167s grant execute on function public.getModuleVersion () to public; 167s GRANT 167s comment on function public.getModuleVersion () is 167s 'Returns the compiled-in version number of the Slony-I shared object'; 167s COMMENT 167s create or replace function public.resetSession() returns text 167s as '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__resetSession' 167s language C; 167s CREATE FUNCTION 167s create or replace function public.logApply () returns trigger 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApply' 167s language C 167s security definer; 167s CREATE FUNCTION 167s create or replace function public.logApplySetCacheSize (p_size int4) 167s returns int4 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySetCacheSize' 167s language C; 167s CREATE FUNCTION 167s create or replace function public.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval) 167s returns int4 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySaveStats' 167s language C; 167s CREATE FUNCTION 167s create or replace function public.checkmoduleversion () returns text as $$ 167s declare 167s moduleversion text; 167s begin 167s select into moduleversion public.getModuleVersion(); 167s if moduleversion <> '2.2.11' then 167s raise exception 'Slonik version: 2.2.11 != Slony-I version in PG build %', 167s moduleversion; 167s end if; 167s return null; 167s end;$$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.checkmoduleversion () is 167s 'Inline test function that verifies that slonik request for STORE 167s NODE/INIT CLUSTER is being run against a conformant set of 167s schema/functions.'; 167s COMMENT 167s select public.checkmoduleversion(); 167s checkmoduleversion 167s -------------------- 167s 167s (1 row) 167s 167s create or replace function public.decode_tgargs(bytea) returns text[] as 167s '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__slon_decode_tgargs' language C security definer; 167s CREATE FUNCTION 167s comment on function public.decode_tgargs(bytea) is 167s 'Translates the contents of pg_trigger.tgargs to an array of text arguments'; 167s COMMENT 167s grant execute on function public.decode_tgargs(bytea) to public; 167s GRANT 167s create or replace function public.check_namespace_validity () returns boolean as $$ 167s declare 167s c_cluster text; 167s begin 167s c_cluster := 'main'; 167s if c_cluster !~ E'^[[:alpha:]_][[:alnum:]_\$]{0,62}$' then 167s raise exception 'Cluster name % is not a valid SQL symbol!', c_cluster; 167s else 167s raise notice 'checked validity of cluster % namespace - OK!', c_cluster; 167s end if; 167s return 't'; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s select public.check_namespace_validity(); 167s check_namespace_validity 167s -------------------------- 167s t 167s (1 row) 167s 167s drop function public.check_namespace_validity(); 167s DROP FUNCTION 167s create or replace function public.logTrigger () returns trigger 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logTrigger' 167s language C 167s security definer; 167s CREATE FUNCTION 167s comment on function public.logTrigger () is 167s 'This is the trigger that is executed on the origin node that causes 167s updates to be recorded in sl_log_1/sl_log_2.'; 167s COMMENT 167s grant execute on function public.logTrigger () to public; 167s GRANT 167s create or replace function public.terminateNodeConnections (p_failed_node int4) returns int4 167s as $$ 167s declare 167s v_row record; 167s begin 167s for v_row in select nl_nodeid, nl_conncnt, 167s nl_backendpid from public.sl_nodelock 167s where nl_nodeid = p_failed_node for update 167s loop 167s perform public.killBackend(v_row.nl_backendpid, 'TERM'); 167s delete from public.sl_nodelock 167s where nl_nodeid = v_row.nl_nodeid 167s and nl_conncnt = v_row.nl_conncnt; 167s end loop; 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.terminateNodeConnections (p_failed_node int4) is 167s 'terminates all backends that have registered to be from the given node'; 167s COMMENT 167s create or replace function public.killBackend (p_pid int4, p_signame text) returns int4 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__killBackend' 167s language C; 167s CREATE FUNCTION 167s comment on function public.killBackend(p_pid int4, p_signame text) is 167s 'Send a signal to a postgres process. Requires superuser rights'; 167s COMMENT 167s create or replace function public.seqtrack (p_seqid int4, p_seqval int8) returns int8 167s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__seqtrack' 167s strict language C; 167s CREATE FUNCTION 167s comment on function public.seqtrack(p_seqid int4, p_seqval int8) is 167s 'Returns NULL if seqval has not changed since the last call for seqid'; 167s COMMENT 167s create or replace function public.slon_quote_brute(p_tab_fqname text) returns text 167s as $$ 167s declare 167s v_fqname text default ''; 167s begin 167s v_fqname := '"' || replace(p_tab_fqname,'"','""') || '"'; 167s return v_fqname; 167s end; 167s $$ language plpgsql immutable; 167s CREATE FUNCTION 167s comment on function public.slon_quote_brute(p_tab_fqname text) is 167s 'Brutally quote the given text'; 167s COMMENT 167s create or replace function public.slon_quote_input(p_tab_fqname text) returns text as $$ 167s declare 167s v_nsp_name text; 167s v_tab_name text; 167s v_i integer; 167s v_l integer; 167s v_pq2 integer; 167s begin 167s v_l := length(p_tab_fqname); 167s 167s -- Let us search for the dot 167s if p_tab_fqname like '"%' then 167s -- if the first part of the ident starts with a double quote, search 167s -- for the closing double quote, skipping over double double quotes. 167s v_i := 2; 167s while v_i <= v_l loop 167s if substr(p_tab_fqname, v_i, 1) != '"' then 167s v_i := v_i + 1; 167s else 167s v_i := v_i + 1; 167s if substr(p_tab_fqname, v_i, 1) != '"' then 167s exit; 167s end if; 167s v_i := v_i + 1; 167s end if; 167s end loop; 167s else 167s -- first part of ident is not quoted, search for the dot directly 167s v_i := 1; 167s while v_i <= v_l loop 167s if substr(p_tab_fqname, v_i, 1) = '.' then 167s exit; 167s end if; 167s v_i := v_i + 1; 167s end loop; 167s end if; 167s 167s -- v_i now points at the dot or behind the string. 167s 167s if substr(p_tab_fqname, v_i, 1) = '.' then 167s -- There is a dot now, so split the ident into its namespace 167s -- and objname parts and make sure each is quoted 167s v_nsp_name := substr(p_tab_fqname, 1, v_i - 1); 167s v_tab_name := substr(p_tab_fqname, v_i + 1); 167s if v_nsp_name not like '"%' then 167s v_nsp_name := '"' || replace(v_nsp_name, '"', '""') || 167s '"'; 167s end if; 167s if v_tab_name not like '"%' then 167s v_tab_name := '"' || replace(v_tab_name, '"', '""') || 167s '"'; 167s end if; 167s 167s return v_nsp_name || '.' || v_tab_name; 167s else 167s -- No dot ... must be just an ident without schema 167s if p_tab_fqname like '"%' then 167s return p_tab_fqname; 167s else 167s return '"' || replace(p_tab_fqname, '"', '""') || '"'; 167s end if; 167s end if; 167s 167s end;$$ language plpgsql immutable; 167s CREATE FUNCTION 167s comment on function public.slon_quote_input(p_text text) is 167s 'quote all words that aren''t quoted yet'; 167s COMMENT 167s create or replace function public.slonyVersionMajor() 167s returns int4 167s as $$ 167s begin 167s return 2; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.slonyVersionMajor () is 167s 'Returns the major version number of the slony schema'; 167s COMMENT 167s create or replace function public.slonyVersionMinor() 167s returns int4 167s as $$ 167s begin 167s return 2; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.slonyVersionMinor () is 167s 'Returns the minor version number of the slony schema'; 167s COMMENT 167s create or replace function public.slonyVersionPatchlevel() 167s returns int4 167s as $$ 167s begin 167s return 11; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.slonyVersionPatchlevel () is 167s 'Returns the version patch level of the slony schema'; 167s COMMENT 167s create or replace function public.slonyVersion() 167s returns text 167s as $$ 167s begin 167s return public.slonyVersionMajor()::text || '.' || 167s public.slonyVersionMinor()::text || '.' || 167s public.slonyVersionPatchlevel()::text ; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.slonyVersion() is 167s 'Returns the version number of the slony schema'; 167s COMMENT 167s create or replace function public.registry_set_int4(p_key text, p_value int4) 167s returns int4 as $$ 167s BEGIN 167s if p_value is null then 167s delete from public.sl_registry 167s where reg_key = p_key; 167s else 167s lock table public.sl_registry; 167s update public.sl_registry 167s set reg_int4 = p_value 167s where reg_key = p_key; 167s if not found then 167s insert into public.sl_registry (reg_key, reg_int4) 167s values (p_key, p_value); 167s end if; 167s end if; 167s return p_value; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.registry_set_int4(p_key text, p_value int4) is 167s 'registry_set_int4(key, value) 167s 167s Set or delete a registry value'; 167s COMMENT 167s create or replace function public.registry_get_int4(p_key text, p_default int4) 167s returns int4 as $$ 167s DECLARE 167s v_value int4; 167s BEGIN 167s select reg_int4 into v_value from public.sl_registry 167s where reg_key = p_key; 167s if not found then 167s v_value = p_default; 167s if p_default notnull then 167s perform public.registry_set_int4(p_key, p_default); 167s end if; 167s else 167s if v_value is null then 167s raise exception 'Slony-I: registry key % is not an int4 value', 167s p_key; 167s end if; 167s end if; 167s return v_value; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.registry_get_int4(p_key text, p_default int4) is 167s 'registry_get_int4(key, value) 167s 167s Get a registry value. If not present, set and return the default.'; 167s COMMENT 167s create or replace function public.registry_set_text(p_key text, p_value text) 167s returns text as $$ 167s BEGIN 167s if p_value is null then 167s delete from public.sl_registry 167s where reg_key = p_key; 167s else 167s lock table public.sl_registry; 167s update public.sl_registry 167s set reg_text = p_value 167s where reg_key = p_key; 167s if not found then 167s insert into public.sl_registry (reg_key, reg_text) 167s values (p_key, p_value); 167s end if; 167s end if; 167s return p_value; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.registry_set_text(text, text) is 167s 'registry_set_text(key, value) 167s 167s Set or delete a registry value'; 167s COMMENT 167s create or replace function public.registry_get_text(p_key text, p_default text) 167s returns text as $$ 167s DECLARE 167s v_value text; 167s BEGIN 167s select reg_text into v_value from public.sl_registry 167s where reg_key = p_key; 167s if not found then 167s v_value = p_default; 167s if p_default notnull then 167s perform public.registry_set_text(p_key, p_default); 167s end if; 167s else 167s if v_value is null then 167s raise exception 'Slony-I: registry key % is not a text value', 167s p_key; 167s end if; 167s end if; 167s return v_value; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.registry_get_text(p_key text, p_default text) is 167s 'registry_get_text(key, value) 167s 167s Get a registry value. If not present, set and return the default.'; 167s COMMENT 167s create or replace function public.registry_set_timestamp(p_key text, p_value timestamptz) 167s returns timestamp as $$ 167s BEGIN 167s if p_value is null then 167s delete from public.sl_registry 167s where reg_key = p_key; 167s else 167s lock table public.sl_registry; 167s update public.sl_registry 167s set reg_timestamp = p_value 167s where reg_key = p_key; 167s if not found then 167s insert into public.sl_registry (reg_key, reg_timestamp) 167s values (p_key, p_value); 167s end if; 167s end if; 167s return p_value; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.registry_set_timestamp(p_key text, p_value timestamptz) is 167s 'registry_set_timestamp(key, value) 167s 167s Set or delete a registry value'; 167s COMMENT 167s create or replace function public.registry_get_timestamp(p_key text, p_default timestamptz) 167s returns timestamp as $$ 167s DECLARE 167s v_value timestamp; 167s BEGIN 167s select reg_timestamp into v_value from public.sl_registry 167s where reg_key = p_key; 167s if not found then 167s v_value = p_default; 167s if p_default notnull then 167s perform public.registry_set_timestamp(p_key, p_default); 167s end if; 167s else 167s if v_value is null then 167s raise exception 'Slony-I: registry key % is not an timestamp value', 167s p_key; 167s end if; 167s end if; 167s return v_value; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.registry_get_timestamp(p_key text, p_default timestamptz) is 167s 'registry_get_timestamp(key, value) 167s 167s Get a registry value. If not present, set and return the default.'; 167s COMMENT 167s create or replace function public.cleanupNodelock () 167s returns int4 167s as $$ 167s declare 167s v_row record; 167s begin 167s for v_row in select nl_nodeid, nl_conncnt, nl_backendpid 167s from public.sl_nodelock 167s for update 167s loop 167s if public.killBackend(v_row.nl_backendpid, 'NULL') < 0 then 167s raise notice 'Slony-I: cleanup stale sl_nodelock entry for pid=%', 167s v_row.nl_backendpid; 167s delete from public.sl_nodelock where 167s nl_nodeid = v_row.nl_nodeid and 167s nl_conncnt = v_row.nl_conncnt; 167s end if; 167s end loop; 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.cleanupNodelock() is 167s 'Clean up stale entries when restarting slon'; 167s COMMENT 167s create or replace function public.registerNodeConnection (p_nodeid int4) 167s returns int4 167s as $$ 167s begin 167s insert into public.sl_nodelock 167s (nl_nodeid, nl_backendpid) 167s values 167s (p_nodeid, pg_backend_pid()); 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.registerNodeConnection (p_nodeid int4) is 167s 'Register (uniquely) the node connection so that only one slon can service the node'; 167s COMMENT 167s create or replace function public.initializeLocalNode (p_local_node_id int4, p_comment text) 167s returns int4 167s as $$ 167s declare 167s v_old_node_id int4; 167s v_first_log_no int4; 167s v_event_seq int8; 167s begin 167s -- ---- 167s -- Make sure this node is uninitialized or got reset 167s -- ---- 167s select last_value::int4 into v_old_node_id from public.sl_local_node_id; 167s if v_old_node_id != -1 then 167s raise exception 'Slony-I: This node is already initialized'; 167s end if; 167s 167s -- ---- 167s -- Set sl_local_node_id to the requested value and add our 167s -- own system to sl_node. 167s -- ---- 167s perform setval('public.sl_local_node_id', p_local_node_id); 167s perform public.storeNode_int (p_local_node_id, p_comment); 167s 167s if (pg_catalog.current_setting('max_identifier_length')::integer - pg_catalog.length('public')) < 5 then 167s raise notice 'Slony-I: Cluster name length [%] versus system max_identifier_length [%] ', pg_catalog.length('public'), pg_catalog.current_setting('max_identifier_length'); 167s raise notice 'leaves narrow/no room for some Slony-I-generated objects (such as indexes).'; 167s raise notice 'You may run into problems later!'; 167s end if; 167s 167s -- 167s -- Put the apply trigger onto sl_log_1 and sl_log_2 167s -- 167s create trigger apply_trigger 167s before INSERT on public.sl_log_1 167s for each row execute procedure public.logApply('_main'); 167s alter table public.sl_log_1 167s enable replica trigger apply_trigger; 167s create trigger apply_trigger 167s before INSERT on public.sl_log_2 167s for each row execute procedure public.logApply('_main'); 167s alter table public.sl_log_2 167s enable replica trigger apply_trigger; 167s 167s return p_local_node_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.initializeLocalNode (p_local_node_id int4, p_comment text) is 167s 'no_id - Node ID # 167s no_comment - Human-oriented comment 167s 167s Initializes the new node, no_id'; 167s COMMENT 167s create or replace function public.storeNode (p_no_id int4, p_no_comment text) 167s returns bigint 167s as $$ 167s begin 167s perform public.storeNode_int (p_no_id, p_no_comment); 167s return public.createEvent('_main', 'STORE_NODE', 167s p_no_id::text, p_no_comment::text); 167s end; 167s $$ language plpgsql 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.storeNode(p_no_id int4, p_no_comment text) is 167s 'no_id - Node ID # 167s no_comment - Human-oriented comment 167s 167s Generate the STORE_NODE event for node no_id'; 167s COMMENT 167s create or replace function public.storeNode_int (p_no_id int4, p_no_comment text) 167s returns int4 167s as $$ 167s declare 167s v_old_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check if the node exists 167s -- ---- 167s select * into v_old_row 167s from public.sl_node 167s where no_id = p_no_id 167s for update; 167s if found then 167s -- ---- 167s -- Node exists, update the existing row. 167s -- ---- 167s update public.sl_node 167s set no_comment = p_no_comment 167s where no_id = p_no_id; 167s else 167s -- ---- 167s -- New node, insert the sl_node row 167s -- ---- 167s insert into public.sl_node 167s (no_id, no_active, no_comment,no_failed) values 167s (p_no_id, 'f', p_no_comment,false); 167s end if; 167s 167s return p_no_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.storeNode_int(p_no_id int4, p_no_comment text) is 167s 'no_id - Node ID # 167s no_comment - Human-oriented comment 167s 167s Internal function to process the STORE_NODE event for node no_id'; 167s COMMENT 167s create or replace function public.enableNode (p_no_id int4) 167s returns bigint 167s as $$ 167s declare 167s v_local_node_id int4; 167s v_node_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that we are the node to activate and that we are 167s -- currently disabled. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select * into v_node_row 167s from public.sl_node 167s where no_id = p_no_id 167s for update; 167s if not found then 167s raise exception 'Slony-I: node % not found', p_no_id; 167s end if; 167s if v_node_row.no_active then 167s raise exception 'Slony-I: node % is already active', p_no_id; 167s end if; 167s 167s -- ---- 167s -- Activate this node and generate the ENABLE_NODE event 167s -- ---- 167s perform public.enableNode_int (p_no_id); 167s return public.createEvent('_main', 'ENABLE_NODE', 167s p_no_id::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.enableNode(p_no_id int4) is 167s 'no_id - Node ID # 167s 167s Generate the ENABLE_NODE event for node no_id'; 167s COMMENT 167s create or replace function public.enableNode_int (p_no_id int4) 167s returns int4 167s as $$ 167s declare 167s v_local_node_id int4; 167s v_node_row record; 167s v_sub_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that the node is inactive 167s -- ---- 167s select * into v_node_row 167s from public.sl_node 167s where no_id = p_no_id 167s for update; 167s if not found then 167s raise exception 'Slony-I: node % not found', p_no_id; 167s end if; 167s if v_node_row.no_active then 167s return p_no_id; 167s end if; 167s 167s -- ---- 167s -- Activate the node and generate sl_confirm status rows for it. 167s -- ---- 167s update public.sl_node 167s set no_active = 't' 167s where no_id = p_no_id; 167s insert into public.sl_confirm 167s (con_origin, con_received, con_seqno) 167s select no_id, p_no_id, 0 from public.sl_node 167s where no_id != p_no_id 167s and no_active; 167s insert into public.sl_confirm 167s (con_origin, con_received, con_seqno) 167s select p_no_id, no_id, 0 from public.sl_node 167s where no_id != p_no_id 167s and no_active; 167s 167s -- ---- 167s -- Generate ENABLE_SUBSCRIPTION events for all sets that 167s -- origin here and are subscribed by the just enabled node. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s for v_sub_row in select SUB.sub_set, SUB.sub_provider from 167s public.sl_set S, 167s public.sl_subscribe SUB 167s where S.set_origin = v_local_node_id 167s and S.set_id = SUB.sub_set 167s and SUB.sub_receiver = p_no_id 167s for update of S 167s loop 167s perform public.enableSubscription (v_sub_row.sub_set, 167s v_sub_row.sub_provider, p_no_id); 167s end loop; 167s 167s return p_no_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.enableNode_int(p_no_id int4) is 167s 'no_id - Node ID # 167s 167s Internal function to process the ENABLE_NODE event for node no_id'; 167s COMMENT 167s create or replace function public.disableNode (p_no_id int4) 167s returns bigint 167s as $$ 167s begin 167s -- **** TODO **** 167s raise exception 'Slony-I: disableNode() not implemented'; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.disableNode(p_no_id int4) is 167s 'generate DISABLE_NODE event for node no_id'; 167s COMMENT 167s create or replace function public.disableNode_int (p_no_id int4) 167s returns int4 167s as $$ 167s begin 167s -- **** TODO **** 167s raise exception 'Slony-I: disableNode_int() not implemented'; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.disableNode(p_no_id int4) is 167s 'process DISABLE_NODE event for node no_id 167s 167s NOTE: This is not yet implemented!'; 167s COMMENT 167s create or replace function public.dropNode (p_no_ids int4[]) 167s returns bigint 167s as $$ 167s declare 167s v_node_row record; 167s v_idx integer; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that this got called on a different node 167s -- ---- 167s if public.getLocalNodeId('_main') = ANY (p_no_ids) then 167s raise exception 'Slony-I: DROP_NODE cannot initiate on the dropped node'; 167s end if; 167s 167s -- 167s -- if any of the deleted nodes are receivers we drop the sl_subscribe line 167s -- 167s delete from public.sl_subscribe where sub_receiver = ANY (p_no_ids); 167s 167s v_idx:=1; 167s LOOP 167s EXIT WHEN v_idx>array_upper(p_no_ids,1) ; 167s select * into v_node_row from public.sl_node 167s where no_id = p_no_ids[v_idx] 167s for update; 167s if not found then 167s raise exception 'Slony-I: unknown node ID % %', p_no_ids[v_idx],v_idx; 167s end if; 167s -- ---- 167s -- Make sure we do not break other nodes subscriptions with this 167s -- ---- 167s if exists (select true from public.sl_subscribe 167s where sub_provider = p_no_ids[v_idx]) 167s then 167s raise exception 'Slony-I: Node % is still configured as a data provider', 167s p_no_ids[v_idx]; 167s end if; 167s 167s -- ---- 167s -- Make sure no set originates there any more 167s -- ---- 167s if exists (select true from public.sl_set 167s where set_origin = p_no_ids[v_idx]) 167s then 167s raise exception 'Slony-I: Node % is still origin of one or more sets', 167s p_no_ids[v_idx]; 167s end if; 167s 167s -- ---- 167s -- Call the internal drop functionality and generate the event 167s -- ---- 167s perform public.dropNode_int(p_no_ids[v_idx]); 167s v_idx:=v_idx+1; 167s END LOOP; 167s return public.createEvent('_main', 'DROP_NODE', 167s array_to_string(p_no_ids,',')); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropNode(p_no_ids int4[]) is 167s 'generate DROP_NODE event to drop node node_id from replication'; 167s COMMENT 167s create or replace function public.dropNode_int (p_no_id int4) 167s returns int4 167s as $$ 167s declare 167s v_tab_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- If the dropped node is a remote node, clean the configuration 167s -- from all traces for it. 167s -- ---- 167s if p_no_id <> public.getLocalNodeId('_main') then 167s delete from public.sl_subscribe 167s where sub_receiver = p_no_id; 167s delete from public.sl_listen 167s where li_origin = p_no_id 167s or li_provider = p_no_id 167s or li_receiver = p_no_id; 167s delete from public.sl_path 167s where pa_server = p_no_id 167s or pa_client = p_no_id; 167s delete from public.sl_confirm 167s where con_origin = p_no_id 167s or con_received = p_no_id; 167s delete from public.sl_event 167s where ev_origin = p_no_id; 167s delete from public.sl_node 167s where no_id = p_no_id; 167s 167s return p_no_id; 167s end if; 167s 167s -- ---- 167s -- This is us ... deactivate the node for now, the daemon 167s -- will call uninstallNode() in a separate transaction. 167s -- ---- 167s update public.sl_node 167s set no_active = false 167s where no_id = p_no_id; 167s 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s return p_no_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropNode_int(p_no_id int4) is 167s 'internal function to process DROP_NODE event to drop node node_id from replication'; 167s COMMENT 167s create or replace function public.preFailover(p_failed_node int4,p_is_candidate boolean) 167s returns int4 167s as $$ 167s declare 167s v_row record; 167s v_row2 record; 167s v_n int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- All consistency checks first 167s 167s if p_is_candidate then 167s -- ---- 167s -- Check all sets originating on the failed node 167s -- ---- 167s for v_row in select set_id 167s from public.sl_set 167s where set_origin = p_failed_node 167s loop 167s -- ---- 167s -- Check that the backup node is subscribed to all sets 167s -- that originate on the failed node 167s -- ---- 167s select into v_row2 sub_forward, sub_active 167s from public.sl_subscribe 167s where sub_set = v_row.set_id 167s and sub_receiver = public.getLocalNodeId('_main'); 167s if not found then 167s raise exception 'Slony-I: cannot failover - node % is not subscribed to set %', 167s public.getLocalNodeId('_main'), v_row.set_id; 167s end if; 167s 167s -- ---- 167s -- Check that the subscription is active 167s -- ---- 167s if not v_row2.sub_active then 167s raise exception 'Slony-I: cannot failover - subscription for set % is not active', 167s v_row.set_id; 167s end if; 167s 167s -- ---- 167s -- If there are other subscribers, the backup node needs to 167s -- be a forwarder too. 167s -- ---- 167s select into v_n count(*) 167s from public.sl_subscribe 167s where sub_set = v_row.set_id 167s and sub_receiver <> public.getLocalNodeId('_main'); 167s if v_n > 0 and not v_row2.sub_forward then 167s raise exception 'Slony-I: cannot failover - node % is not a forwarder of set %', 167s public.getLocalNodeId('_main'), v_row.set_id; 167s end if; 167s end loop; 167s end if; 167s 167s -- ---- 167s -- Terminate all connections of the failed node the hard way 167s -- ---- 167s perform public.terminateNodeConnections(p_failed_node); 167s 167s update public.sl_path set pa_conninfo='' WHERE 167s pa_server=p_failed_node; 167s notify "_main_Restart"; 167s -- ---- 167s -- That is it - so far. 167s -- ---- 167s return p_failed_node; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.preFailover(p_failed_node int4,is_failover_candidate boolean) is 167s 'Prepare for a failover. This function is called on all candidate nodes. 167s It blanks the paths to the failed node 167s and then restart of all node daemons.'; 167s COMMENT 167s create or replace function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) 167s returns int4 167s as $$ 167s declare 167s v_row record; 167s v_row2 record; 167s v_failed boolean; 167s v_restart_required boolean; 167s begin 167s 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s v_restart_required:=false; 167s -- 167s -- any nodes other than the backup receiving 167s -- ANY subscription from a failed node 167s -- will now get that data from the backup node. 167s update public.sl_subscribe set 167s sub_provider=p_backup_node 167s where sub_provider=p_failed_node 167s and sub_receiver<>p_backup_node 167s and sub_receiver <> ALL (p_failed_nodes); 167s if found then 167s v_restart_required:=true; 167s end if; 167s -- 167s -- if this node is receiving a subscription from the backup node 167s -- with a failed node as the provider we need to fix this. 167s update public.sl_subscribe set 167s sub_provider=p_backup_node 167s from public.sl_set 167s where set_id = sub_set 167s and set_origin=p_failed_node 167s and sub_provider = ANY(p_failed_nodes) 167s and sub_receiver=public.getLocalNodeId('_main'); 167s 167s -- ---- 167s -- Terminate all connections of the failed node the hard way 167s -- ---- 167s perform public.terminateNodeConnections(p_failed_node); 167s 167s -- Clear out the paths for the failed node. 167s -- This ensures that *this* node won't be pulling data from 167s -- the failed node even if it *does* become accessible 167s 167s update public.sl_path set pa_conninfo='' WHERE 167s pa_server=p_failed_node 167s and pa_conninfo<>''; 167s 167s if found then 167s v_restart_required:=true; 167s end if; 167s 167s v_failed := exists (select 1 from public.sl_node 167s where no_failed=true and no_id=p_failed_node); 167s 167s if not v_failed then 167s 167s update public.sl_node set no_failed=true where no_id = ANY (p_failed_nodes) 167s and no_failed=false; 167s if found then 167s v_restart_required:=true; 167s end if; 167s end if; 167s 167s if v_restart_required then 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s -- ---- 167s -- Make sure the node daemon will restart 167s -- ---- 167s notify "_main_Restart"; 167s end if; 167s 167s 167s -- ---- 167s -- That is it - so far. 167s -- ---- 167s return p_failed_node; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) is 167s 'Initiate failover from failed_node to backup_node. This function must be called on all nodes, 167s and then waited for the restart of all node daemons.'; 167s COMMENT 167s create or replace function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8, p_failed_nodes integer[]) 167s returns bigint 167s as $$ 167s declare 167s v_row record; 167s v_new_event bigint; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s select * into v_row 167s from public.sl_event 167s where ev_origin = p_failed_node 167s and ev_seqno = p_ev_seqno; 167s if not found then 167s raise exception 'Slony-I: event %,% not found', 167s p_failed_node, p_ev_seqno; 167s end if; 167s 167s update public.sl_node set no_failed=true where no_id = ANY 167s (p_failed_nodes) and no_failed=false; 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s -- ---- 167s -- Make sure the node daemon will restart 167s -- ---- 167s raise notice 'calling restart node %',p_failed_node; 167s 167s notify "_main_Restart"; 167s 167s select public.createEvent('_main','FAILOVER_NODE', 167s p_failed_node::text,p_ev_seqno::text, 167s array_to_string(p_failed_nodes,',')) 167s into v_new_event; 167s 167s 167s return v_new_event; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8,p_failed_nodes integer[] ) is 167s 'FUNCTION failedNode2 (failed_node, backup_node, set_id, ev_seqno, ev_seqfake,p_failed_nodes) 167s 167s On the node that has the highest sequence number of the failed node, 167s fake the FAILOVER_SET event.'; 167s COMMENT 167s create or replace function public.failedNode3 (p_failed_node int4, p_backup_node int4,p_seq_no bigint) 167s returns int4 167s as $$ 167s declare 167s 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s perform public.failoverSet_int(p_failed_node, 167s p_backup_node,p_seq_no); 167s 167s notify "_main_Restart"; 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s create or replace function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_last_seqno bigint) 167s returns int4 167s as $$ 167s declare 167s v_row record; 167s v_last_sync int8; 167s v_set int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s SELECT max(ev_seqno) into v_last_sync FROM public.sl_event where 167s ev_origin=p_failed_node; 167s if v_last_sync > p_last_seqno then 167s -- this node is ahead of the last sequence number from the 167s -- failed node that the backup node has. 167s -- this node must unsubscribe from all sets from the origin. 167s for v_set in select set_id from public.sl_set where 167s set_origin=p_failed_node 167s loop 167s raise warning 'Slony is dropping the subscription of set % found sync %s bigger than %s ' 167s , v_set, v_last_sync::text, p_last_seqno::text; 167s perform public.unsubscribeSet(v_set, 167s public.getLocalNodeId('_main'), 167s true); 167s end loop; 167s delete from public.sl_event where ev_origin=p_failed_node 167s and ev_seqno > p_last_seqno; 167s end if; 167s -- ---- 167s -- Change the origin of the set now to the backup node. 167s -- On the backup node this includes changing all the 167s -- trigger and protection stuff 167s for v_set in select set_id from public.sl_set where 167s set_origin=p_failed_node 167s loop 167s -- ---- 167s if p_backup_node = public.getLocalNodeId('_main') then 167s delete from public.sl_setsync 167s where ssy_setid = v_set; 167s delete from public.sl_subscribe 167s where sub_set = v_set 167s and sub_receiver = p_backup_node; 167s update public.sl_set 167s set set_origin = p_backup_node 167s where set_id = v_set; 167s update public.sl_subscribe 167s set sub_provider=p_backup_node 167s FROM public.sl_node receive_node 167s where sub_set = v_set 167s and sub_provider=p_failed_node 167s and sub_receiver=receive_node.no_id 167s and receive_node.no_failed=false; 167s 167s for v_row in select * from public.sl_table 167s where tab_set = v_set 167s order by tab_id 167s loop 167s perform public.alterTableConfigureTriggers(v_row.tab_id); 167s end loop; 167s else 167s raise notice 'deleting from sl_subscribe all rows with receiver %', 167s p_backup_node; 167s 167s delete from public.sl_subscribe 167s where sub_set = v_set 167s and sub_receiver = p_backup_node; 167s 167s update public.sl_subscribe 167s set sub_provider=p_backup_node 167s FROM public.sl_node receive_node 167s where sub_set = v_set 167s and sub_provider=p_failed_node 167s and sub_provider=p_failed_node 167s and sub_receiver=receive_node.no_id 167s and receive_node.no_failed=false; 167s update public.sl_set 167s set set_origin = p_backup_node 167s where set_id = v_set; 167s -- ---- 167s -- If we are a subscriber of the set ourself, change our 167s -- setsync status to reflect the new set origin. 167s -- ---- 167s if exists (select true from public.sl_subscribe 167s where sub_set = v_set 167s and sub_receiver = public.getLocalNodeId( 167s '_main')) 167s then 167s delete from public.sl_setsync 167s where ssy_setid = v_set; 167s 167s select coalesce(max(ev_seqno), 0) into v_last_sync 167s from public.sl_event 167s where ev_origin = p_backup_node 167s and ev_type = 'SYNC'; 167s if v_last_sync > 0 then 167s insert into public.sl_setsync 167s (ssy_setid, ssy_origin, ssy_seqno, 167s ssy_snapshot, ssy_action_list) 167s select v_set, p_backup_node, v_last_sync, 167s ev_snapshot, NULL 167s from public.sl_event 167s where ev_origin = p_backup_node 167s and ev_seqno = v_last_sync; 167s else 167s insert into public.sl_setsync 167s (ssy_setid, ssy_origin, ssy_seqno, 167s ssy_snapshot, ssy_action_list) 167s values (v_set, p_backup_node, '0', 167s '1:1:', NULL); 167s end if; 167s end if; 167s end if; 167s end loop; 167s 167s --If there are any subscriptions with 167s --the failed_node being the provider then 167s --we want to redirect those subscriptions 167s --to come from the backup node. 167s -- 167s -- The backup node should be a valid 167s -- provider for all subscriptions served 167s -- by the failed node. (otherwise it 167s -- wouldn't be a allowable backup node). 167s -- delete from public.sl_subscribe 167s -- where sub_receiver=p_backup_node; 167s 167s update public.sl_subscribe 167s set sub_provider=p_backup_node 167s from public.sl_node 167s where sub_provider=p_failed_node 167s and sl_node.no_id=sub_receiver 167s and sl_node.no_failed=false 167s and sub_receiver<>p_backup_node; 167s 167s update public.sl_subscribe 167s set sub_provider=(select set_origin from 167s public.sl_set where set_id= 167s sub_set) 167s where sub_provider=p_failed_node 167s and sub_receiver=p_backup_node; 167s 167s update public.sl_node 167s set no_active=false WHERE 167s no_id=p_failed_node; 167s 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s 167s return p_failed_node; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_seqno bigint) is 167s 'FUNCTION failoverSet_int (failed_node, backup_node, set_id, wait_seqno) 167s 167s Finish failover for one set.'; 167s COMMENT 167s create or replace function public.uninstallNode () 167s returns int4 167s as $$ 167s declare 167s v_tab_row record; 167s begin 167s raise notice 'Slony-I: Please drop schema "_main"'; 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.uninstallNode() is 167s 'Reset the whole database to standalone by removing the whole 167s replication system.'; 167s COMMENT 167s DROP FUNCTION IF EXISTS public.cloneNodePrepare(int4,int4,text); 167s DROP FUNCTION 167s create or replace function public.cloneNodePrepare (p_no_id int4, p_no_provider int4, p_no_comment text) 167s returns bigint 167s as $$ 167s begin 167s perform public.cloneNodePrepare_int (p_no_id, p_no_provider, p_no_comment); 167s return public.createEvent('_main', 'CLONE_NODE', 167s p_no_id::text, p_no_provider::text, 167s p_no_comment::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.cloneNodePrepare(p_no_id int4, p_no_provider int4, p_no_comment text) is 167s 'Prepare for cloning a node.'; 167s COMMENT 167s create or replace function public.cloneNodePrepare_int (p_no_id int4, p_no_provider int4, p_no_comment text) 167s returns int4 167s as $$ 167s declare 167s v_dummy int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s update public.sl_node set 167s no_active = np.no_active, 167s no_comment = np.no_comment, 167s no_failed = np.no_failed 167s from public.sl_node np 167s where np.no_id = p_no_provider 167s and sl_node.no_id = p_no_id; 167s if not found then 167s insert into public.sl_node 167s (no_id, no_active, no_comment,no_failed) 167s select p_no_id, no_active, p_no_comment, no_failed 167s from public.sl_node 167s where no_id = p_no_provider; 167s end if; 167s 167s insert into public.sl_path 167s (pa_server, pa_client, pa_conninfo, pa_connretry) 167s select pa_server, p_no_id, '', pa_connretry 167s from public.sl_path 167s where pa_client = p_no_provider 167s and (pa_server, p_no_id) not in (select pa_server, pa_client 167s from public.sl_path); 167s 167s insert into public.sl_path 167s (pa_server, pa_client, pa_conninfo, pa_connretry) 167s select p_no_id, pa_client, '', pa_connretry 167s from public.sl_path 167s where pa_server = p_no_provider 167s and (p_no_id, pa_client) not in (select pa_server, pa_client 167s from public.sl_path); 167s 167s insert into public.sl_subscribe 167s (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) 167s select sub_set, sub_provider, p_no_id, sub_forward, sub_active 167s from public.sl_subscribe 167s where sub_receiver = p_no_provider; 167s 167s insert into public.sl_confirm 167s (con_origin, con_received, con_seqno, con_timestamp) 167s select con_origin, p_no_id, con_seqno, con_timestamp 167s from public.sl_confirm 167s where con_received = p_no_provider; 167s 167s perform public.RebuildListenEntries(); 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.cloneNodePrepare_int(p_no_id int4, p_no_provider int4, p_no_comment text) is 167s 'Internal part of cloneNodePrepare().'; 167s COMMENT 167s create or replace function public.cloneNodeFinish (p_no_id int4, p_no_provider int4) 167s returns int4 167s as $$ 167s declare 167s v_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s perform "pg_catalog".setval('public.sl_local_node_id', p_no_id); 167s perform public.resetSession(); 167s for v_row in select sub_set from public.sl_subscribe 167s where sub_receiver = p_no_id 167s loop 167s perform public.updateReloid(v_row.sub_set, p_no_id); 167s end loop; 167s 167s perform public.RebuildListenEntries(); 167s 167s delete from public.sl_confirm 167s where con_received = p_no_id; 167s insert into public.sl_confirm 167s (con_origin, con_received, con_seqno, con_timestamp) 167s select con_origin, p_no_id, con_seqno, con_timestamp 167s from public.sl_confirm 167s where con_received = p_no_provider; 167s insert into public.sl_confirm 167s (con_origin, con_received, con_seqno, con_timestamp) 167s select p_no_provider, p_no_id, 167s (select max(ev_seqno) from public.sl_event 167s where ev_origin = p_no_provider), current_timestamp; 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.cloneNodeFinish(p_no_id int4, p_no_provider int4) is 167s 'Internal part of cloneNodePrepare().'; 167s COMMENT 167s create or replace function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 167s returns bigint 167s as $$ 167s begin 167s perform public.storePath_int(p_pa_server, p_pa_client, 167s p_pa_conninfo, p_pa_connretry); 167s return public.createEvent('_main', 'STORE_PATH', 167s p_pa_server::text, p_pa_client::text, 167s p_pa_conninfo::text, p_pa_connretry::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 167s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 167s 167s Generate the STORE_PATH event indicating that node pa_client can 167s access node pa_server using DSN pa_conninfo'; 167s COMMENT 167s create or replace function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 167s returns int4 167s as $$ 167s declare 167s v_dummy int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check if the path already exists 167s -- ---- 167s select 1 into v_dummy 167s from public.sl_path 167s where pa_server = p_pa_server 167s and pa_client = p_pa_client 167s for update; 167s if found then 167s -- ---- 167s -- Path exists, update pa_conninfo 167s -- ---- 167s update public.sl_path 167s set pa_conninfo = p_pa_conninfo, 167s pa_connretry = p_pa_connretry 167s where pa_server = p_pa_server 167s and pa_client = p_pa_client; 167s else 167s -- ---- 167s -- New path 167s -- 167s -- In case we receive STORE_PATH events before we know 167s -- about the nodes involved in this, we generate those nodes 167s -- as pending. 167s -- ---- 167s if not exists (select 1 from public.sl_node 167s where no_id = p_pa_server) then 167s perform public.storeNode_int (p_pa_server, ''); 167s end if; 167s if not exists (select 1 from public.sl_node 167s where no_id = p_pa_client) then 167s perform public.storeNode_int (p_pa_client, ''); 167s end if; 167s insert into public.sl_path 167s (pa_server, pa_client, pa_conninfo, pa_connretry) values 167s (p_pa_server, p_pa_client, p_pa_conninfo, p_pa_connretry); 167s end if; 167s 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 167s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 167s 167s Process the STORE_PATH event indicating that node pa_client can 167s access node pa_server using DSN pa_conninfo'; 167s COMMENT 167s create or replace function public.dropPath (p_pa_server int4, p_pa_client int4) 167s returns bigint 167s as $$ 167s declare 167s v_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- There should be no existing subscriptions. Auto unsubscribing 167s -- is considered too dangerous. 167s -- ---- 167s for v_row in select sub_set, sub_provider, sub_receiver 167s from public.sl_subscribe 167s where sub_provider = p_pa_server 167s and sub_receiver = p_pa_client 167s loop 167s raise exception 167s 'Slony-I: Path cannot be dropped, subscription of set % needs it', 167s v_row.sub_set; 167s end loop; 167s 167s -- ---- 167s -- Drop all sl_listen entries that depend on this path 167s -- ---- 167s for v_row in select li_origin, li_provider, li_receiver 167s from public.sl_listen 167s where li_provider = p_pa_server 167s and li_receiver = p_pa_client 167s loop 167s perform public.dropListen( 167s v_row.li_origin, v_row.li_provider, v_row.li_receiver); 167s end loop; 167s 167s -- ---- 167s -- Now drop the path and create the event 167s -- ---- 167s perform public.dropPath_int(p_pa_server, p_pa_client); 167s 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s return public.createEvent ('_main', 'DROP_PATH', 167s p_pa_server::text, p_pa_client::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropPath (p_pa_server int4, p_pa_client int4) is 167s 'Generate DROP_PATH event to drop path from pa_server to pa_client'; 167s COMMENT 167s create or replace function public.dropPath_int (p_pa_server int4, p_pa_client int4) 167s returns int4 167s as $$ 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Remove any dangling sl_listen entries with the server 167s -- as provider and the client as receiver. This must have 167s -- been cleared out before, but obviously was not. 167s -- ---- 167s delete from public.sl_listen 167s where li_provider = p_pa_server 167s and li_receiver = p_pa_client; 167s 167s delete from public.sl_path 167s where pa_server = p_pa_server 167s and pa_client = p_pa_client; 167s 167s if found then 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s return 1; 167s else 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s return 0; 167s end if; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropPath_int (p_pa_server int4, p_pa_client int4) is 167s 'Process DROP_PATH event to drop path from pa_server to pa_client'; 167s COMMENT 167s create or replace function public.storeListen (p_origin int4, p_provider int4, p_receiver int4) 167s returns bigint 167s as $$ 167s begin 167s perform public.storeListen_int (p_origin, p_provider, p_receiver); 167s return public.createEvent ('_main', 'STORE_LISTEN', 167s p_origin::text, p_provider::text, p_receiver::text); 167s end; 167s $$ language plpgsql 167s called on null input; 167s CREATE FUNCTION 167s comment on function public.storeListen(p_origin int4, p_provider int4, p_receiver int4) is 167s 'FUNCTION storeListen (li_origin, li_provider, li_receiver) 167s 167s generate STORE_LISTEN event, indicating that receiver node li_receiver 167s listens to node li_provider in order to get messages coming from node 167s li_origin.'; 167s COMMENT 167s create or replace function public.storeListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 167s returns int4 167s as $$ 167s declare 167s v_exists int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s select 1 into v_exists 167s from public.sl_listen 167s where li_origin = p_li_origin 167s and li_provider = p_li_provider 167s and li_receiver = p_li_receiver; 167s if not found then 167s -- ---- 167s -- In case we receive STORE_LISTEN events before we know 167s -- about the nodes involved in this, we generate those nodes 167s -- as pending. 167s -- ---- 167s if not exists (select 1 from public.sl_node 167s where no_id = p_li_origin) then 167s perform public.storeNode_int (p_li_origin, ''); 167s end if; 167s if not exists (select 1 from public.sl_node 167s where no_id = p_li_provider) then 167s perform public.storeNode_int (p_li_provider, ''); 167s end if; 167s if not exists (select 1 from public.sl_node 167s where no_id = p_li_receiver) then 167s perform public.storeNode_int (p_li_receiver, ''); 167s end if; 167s 167s insert into public.sl_listen 167s (li_origin, li_provider, li_receiver) values 167s (p_li_origin, p_li_provider, p_li_receiver); 167s end if; 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.storeListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 167s 'FUNCTION storeListen_int (li_origin, li_provider, li_receiver) 167s 167s Process STORE_LISTEN event, indicating that receiver node li_receiver 167s listens to node li_provider in order to get messages coming from node 167s li_origin.'; 167s COMMENT 167s create or replace function public.dropListen (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 167s returns bigint 167s as $$ 167s begin 167s perform public.dropListen_int(p_li_origin, 167s p_li_provider, p_li_receiver); 167s 167s return public.createEvent ('_main', 'DROP_LISTEN', 167s p_li_origin::text, p_li_provider::text, p_li_receiver::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropListen(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 167s 'dropListen (li_origin, li_provider, li_receiver) 167s 167s Generate the DROP_LISTEN event.'; 167s COMMENT 167s create or replace function public.dropListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 167s returns int4 167s as $$ 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s delete from public.sl_listen 167s where li_origin = p_li_origin 167s and li_provider = p_li_provider 167s and li_receiver = p_li_receiver; 167s if found then 167s return 1; 167s else 167s return 0; 167s end if; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 167s 'dropListen (li_origin, li_provider, li_receiver) 167s 167s Process the DROP_LISTEN event, deleting the sl_listen entry for 167s the indicated (origin,provider,receiver) combination.'; 167s COMMENT 167s create or replace function public.storeSet (p_set_id int4, p_set_comment text) 167s returns bigint 167s as $$ 167s declare 167s v_local_node_id int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s v_local_node_id := public.getLocalNodeId('_main'); 167s 167s insert into public.sl_set 167s (set_id, set_origin, set_comment) values 167s (p_set_id, v_local_node_id, p_set_comment); 167s 167s return public.createEvent('_main', 'STORE_SET', 167s p_set_id::text, v_local_node_id::text, p_set_comment::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.storeSet(p_set_id int4, p_set_comment text) is 167s 'Generate STORE_SET event for set set_id with human readable comment set_comment'; 167s COMMENT 167s create or replace function public.storeSet_int (p_set_id int4, p_set_origin int4, p_set_comment text) 167s returns int4 167s as $$ 167s declare 167s v_dummy int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s select 1 into v_dummy 167s from public.sl_set 167s where set_id = p_set_id 167s for update; 167s if found then 167s update public.sl_set 167s set set_comment = p_set_comment 167s where set_id = p_set_id; 167s else 167s if not exists (select 1 from public.sl_node 167s where no_id = p_set_origin) then 167s perform public.storeNode_int (p_set_origin, ''); 167s end if; 167s insert into public.sl_set 167s (set_id, set_origin, set_comment) values 167s (p_set_id, p_set_origin, p_set_comment); 167s end if; 167s 167s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 167s perform public.addPartialLogIndices(); 167s 167s return p_set_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.storeSet_int(p_set_id int4, p_set_origin int4, p_set_comment text) is 167s 'storeSet_int (set_id, set_origin, set_comment) 167s 167s Process the STORE_SET event, indicating the new set with given ID, 167s origin node, and human readable comment.'; 167s COMMENT 167s NOTICE: function public.clonenodeprepare(int4,int4,text) does not exist, skipping 167s NOTICE: function public.ddlcapture(text,text) does not exist, skipping 167s NOTICE: function public.ddlscript_complete(int4,text,int4) does not exist, skipping 167s NOTICE: function public.ddlscript_complete_int(int4,int4) does not exist, skipping 167s NOTICE: function public.subscribeset_int(int4,int4,int4,bool,bool) does not exist, skipping 167s NOTICE: function public.unsubscribeset(int4,int4,pg_catalog.bool) does not exist, skipping 167s create or replace function public.lockSet (p_set_id int4) 167s returns int4 167s as $$ 167s declare 167s v_local_node_id int4; 167s v_set_row record; 167s v_tab_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that the set exists and that we are the origin 167s -- and that it is not already locked. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select * into v_set_row from public.sl_set 167s where set_id = p_set_id 167s for update; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_set_id; 167s end if; 167s if v_set_row.set_origin <> v_local_node_id then 167s raise exception 'Slony-I: set % does not originate on local node', 167s p_set_id; 167s end if; 167s if v_set_row.set_locked notnull then 167s raise exception 'Slony-I: set % is already locked', p_set_id; 167s end if; 167s 167s -- ---- 167s -- Place the lockedSet trigger on all tables in the set. 167s -- ---- 167s for v_tab_row in select T.tab_id, 167s public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) as tab_fqname 167s from public.sl_table T, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where T.tab_set = p_set_id 167s and T.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid 167s order by tab_id 167s loop 167s execute 'create trigger "_main_lockedset" ' || 167s 'before insert or update or delete on ' || 167s v_tab_row.tab_fqname || ' for each row execute procedure 167s public.lockedSet (''_main'');'; 167s end loop; 167s 167s -- ---- 167s -- Remember our snapshots xmax as for the set locking 167s -- ---- 167s update public.sl_set 167s set set_locked = "pg_catalog".txid_snapshot_xmax("pg_catalog".txid_current_snapshot()) 167s where set_id = p_set_id; 167s 167s return p_set_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.lockSet(p_set_id int4) is 167s 'lockSet(set_id) 167s 167s Add a special trigger to all tables of a set that disables access to 167s it.'; 167s COMMENT 167s create or replace function public.unlockSet (p_set_id int4) 167s returns int4 167s as $$ 167s declare 167s v_local_node_id int4; 167s v_set_row record; 167s v_tab_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that the set exists and that we are the origin 167s -- and that it is not already locked. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select * into v_set_row from public.sl_set 167s where set_id = p_set_id 167s for update; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_set_id; 167s end if; 167s if v_set_row.set_origin <> v_local_node_id then 167s raise exception 'Slony-I: set % does not originate on local node', 167s p_set_id; 167s end if; 167s if v_set_row.set_locked isnull then 167s raise exception 'Slony-I: set % is not locked', p_set_id; 167s end if; 167s 167s -- ---- 167s -- Drop the lockedSet trigger from all tables in the set. 167s -- ---- 167s for v_tab_row in select T.tab_id, 167s public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) as tab_fqname 167s from public.sl_table T, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where T.tab_set = p_set_id 167s and T.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid 167s order by tab_id 167s loop 167s execute 'drop trigger "_main_lockedset" ' || 167s 'on ' || v_tab_row.tab_fqname; 167s end loop; 167s 167s -- ---- 167s -- Clear out the set_locked field 167s -- ---- 167s update public.sl_set 167s set set_locked = NULL 167s where set_id = p_set_id; 167s 167s return p_set_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.unlockSet(p_set_id int4) is 167s 'Remove the special trigger from all tables of a set that disables access to it.'; 167s COMMENT 167s create or replace function public.moveSet (p_set_id int4, p_new_origin int4) 167s returns bigint 167s as $$ 167s declare 167s v_local_node_id int4; 167s v_set_row record; 167s v_sub_row record; 167s v_sync_seqno int8; 167s v_lv_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that the set is locked and that this locking 167s -- happened long enough ago. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select * into v_set_row from public.sl_set 167s where set_id = p_set_id 167s for update; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_set_id; 167s end if; 167s if v_set_row.set_origin <> v_local_node_id then 167s raise exception 'Slony-I: set % does not originate on local node', 167s p_set_id; 167s end if; 167s if v_set_row.set_locked isnull then 167s raise exception 'Slony-I: set % is not locked', p_set_id; 167s end if; 167s if v_set_row.set_locked > "pg_catalog".txid_snapshot_xmin("pg_catalog".txid_current_snapshot()) then 167s raise exception 'Slony-I: cannot move set % yet, transactions < % are still in progress', 167s p_set_id, v_set_row.set_locked; 167s end if; 167s 167s -- ---- 167s -- Unlock the set 167s -- ---- 167s perform public.unlockSet(p_set_id); 167s 167s -- ---- 167s -- Check that the new_origin is an active subscriber of the set 167s -- ---- 167s select * into v_sub_row from public.sl_subscribe 167s where sub_set = p_set_id 167s and sub_receiver = p_new_origin; 167s if not found then 167s raise exception 'Slony-I: set % is not subscribed by node %', 167s p_set_id, p_new_origin; 167s end if; 167s if not v_sub_row.sub_active then 167s raise exception 'Slony-I: subsctiption of node % for set % is inactive', 167s p_new_origin, p_set_id; 167s end if; 167s 167s -- ---- 167s -- Reconfigure everything 167s -- ---- 167s perform public.moveSet_int(p_set_id, v_local_node_id, 167s p_new_origin, 0); 167s 167s perform public.RebuildListenEntries(); 167s 167s -- ---- 167s -- At this time we hold access exclusive locks for every table 167s -- in the set. But we did move the set to the new origin, so the 167s -- createEvent() we are doing now will not record the sequences. 167s -- ---- 167s v_sync_seqno := public.createEvent('_main', 'SYNC'); 167s insert into public.sl_seqlog 167s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 167s select seq_id, v_local_node_id, v_sync_seqno, seq_last_value 167s from public.sl_seqlastvalue 167s where seq_set = p_set_id; 167s 167s -- ---- 167s -- Finally we generate the real event 167s -- ---- 167s return public.createEvent('_main', 'MOVE_SET', 167s p_set_id::text, v_local_node_id::text, p_new_origin::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.moveSet(p_set_id int4, p_new_origin int4) is 167s 'moveSet(set_id, new_origin) 167s 167s Generate MOVE_SET event to request that the origin for set set_id be moved to node new_origin'; 167s COMMENT 167s create or replace function public.moveSet_int (p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) 167s returns int4 167s as $$ 167s declare 167s v_local_node_id int4; 167s v_tab_row record; 167s v_sub_row record; 167s v_sub_node int4; 167s v_sub_last int4; 167s v_sub_next int4; 167s v_last_sync int8; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Get our local node ID 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s 167s -- On the new origin, raise an event - ACCEPT_SET 167s if v_local_node_id = p_new_origin then 167s -- Create a SYNC event as well so that the ACCEPT_SET has 167s -- the same snapshot as the last SYNC generated by the new 167s -- origin. This snapshot will be used by other nodes to 167s -- finalize the setsync status. 167s perform public.createEvent('_main', 'SYNC', NULL); 167s perform public.createEvent('_main', 'ACCEPT_SET', 167s p_set_id::text, p_old_origin::text, 167s p_new_origin::text, p_wait_seqno::text); 167s end if; 167s 167s -- ---- 167s -- Next we have to reverse the subscription path 167s -- ---- 167s v_sub_last = p_new_origin; 167s select sub_provider into v_sub_node 167s from public.sl_subscribe 167s where sub_set = p_set_id 167s and sub_receiver = p_new_origin; 167s if not found then 167s raise exception 'Slony-I: subscription path broken in moveSet_int'; 167s end if; 167s while v_sub_node <> p_old_origin loop 167s -- ---- 167s -- Tracing node by node, the old receiver is now in 167s -- v_sub_last and the old provider is in v_sub_node. 167s -- ---- 167s 167s -- ---- 167s -- Get the current provider of this node as next 167s -- and change the provider to the previous one in 167s -- the reverse chain. 167s -- ---- 167s select sub_provider into v_sub_next 167s from public.sl_subscribe 167s where sub_set = p_set_id 167s and sub_receiver = v_sub_node 167s for update; 167s if not found then 167s raise exception 'Slony-I: subscription path broken in moveSet_int'; 167s end if; 167s update public.sl_subscribe 167s set sub_provider = v_sub_last 167s where sub_set = p_set_id 167s and sub_receiver = v_sub_node 167s and sub_receiver <> v_sub_last; 167s 167s v_sub_last = v_sub_node; 167s v_sub_node = v_sub_next; 167s end loop; 167s 167s -- ---- 167s -- This includes creating a subscription for the old origin 167s -- ---- 167s insert into public.sl_subscribe 167s (sub_set, sub_provider, sub_receiver, 167s sub_forward, sub_active) 167s values (p_set_id, v_sub_last, p_old_origin, true, true); 167s if v_local_node_id = p_old_origin then 167s select coalesce(max(ev_seqno), 0) into v_last_sync 167s from public.sl_event 167s where ev_origin = p_new_origin 167s and ev_type = 'SYNC'; 167s if v_last_sync > 0 then 167s insert into public.sl_setsync 167s (ssy_setid, ssy_origin, ssy_seqno, 167s ssy_snapshot, ssy_action_list) 167s select p_set_id, p_new_origin, v_last_sync, 167s ev_snapshot, NULL 167s from public.sl_event 167s where ev_origin = p_new_origin 167s and ev_seqno = v_last_sync; 167s else 167s insert into public.sl_setsync 167s (ssy_setid, ssy_origin, ssy_seqno, 167s ssy_snapshot, ssy_action_list) 167s values (p_set_id, p_new_origin, '0', 167s '1:1:', NULL); 167s end if; 167s end if; 167s 167s -- ---- 167s -- Now change the ownership of the set. 167s -- ---- 167s update public.sl_set 167s set set_origin = p_new_origin 167s where set_id = p_set_id; 167s 167s -- ---- 167s -- On the new origin, delete the obsolete setsync information 167s -- and the subscription. 167s -- ---- 167s if v_local_node_id = p_new_origin then 167s delete from public.sl_setsync 167s where ssy_setid = p_set_id; 167s else 167s if v_local_node_id <> p_old_origin then 167s -- 167s -- On every other node, change the setsync so that it will 167s -- pick up from the new origins last known sync. 167s -- 167s delete from public.sl_setsync 167s where ssy_setid = p_set_id; 167s select coalesce(max(ev_seqno), 0) into v_last_sync 167s from public.sl_event 167s where ev_origin = p_new_origin 167s and ev_type = 'SYNC'; 167s if v_last_sync > 0 then 167s insert into public.sl_setsync 167s (ssy_setid, ssy_origin, ssy_seqno, 167s ssy_snapshot, ssy_action_list) 167s select p_set_id, p_new_origin, v_last_sync, 167s ev_snapshot, NULL 167s from public.sl_event 167s where ev_origin = p_new_origin 167s and ev_seqno = v_last_sync; 167s else 167s insert into public.sl_setsync 167s (ssy_setid, ssy_origin, ssy_seqno, 167s ssy_snapshot, ssy_action_list) 167s values (p_set_id, p_new_origin, 167s '0', '1:1:', NULL); 167s end if; 167s end if; 167s end if; 167s delete from public.sl_subscribe 167s where sub_set = p_set_id 167s and sub_receiver = p_new_origin; 167s 167s -- Regenerate sl_listen since we revised the subscriptions 167s perform public.RebuildListenEntries(); 167s 167s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 167s perform public.addPartialLogIndices(); 167s 167s -- ---- 167s -- If we are the new or old origin, we have to 167s -- adjust the log and deny access trigger configuration. 167s -- ---- 167s if v_local_node_id = p_old_origin or v_local_node_id = p_new_origin then 167s for v_tab_row in select tab_id from public.sl_table 167s where tab_set = p_set_id 167s order by tab_id 167s loop 167s perform public.alterTableConfigureTriggers(v_tab_row.tab_id); 167s end loop; 167s end if; 167s 167s return p_set_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.moveSet_int(p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) is 167s 'moveSet(set_id, old_origin, new_origin, wait_seqno) 167s 167s Process MOVE_SET event to request that the origin for set set_id be 167s moved from old_origin to node new_origin'; 167s COMMENT 167s create or replace function public.dropSet (p_set_id int4) 167s returns bigint 167s as $$ 167s declare 167s v_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that the set exists and originates here 167s -- ---- 167s select set_origin into v_origin from public.sl_set 167s where set_id = p_set_id; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_set_id; 167s end if; 167s if v_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: set % does not originate on local node', 167s p_set_id; 167s end if; 167s 167s -- ---- 167s -- Call the internal drop set functionality and generate the event 167s -- ---- 167s perform public.dropSet_int(p_set_id); 167s return public.createEvent('_main', 'DROP_SET', 167s p_set_id::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropSet(p_set_id int4) is 167s 'Generate DROP_SET event to drop replication of set set_id'; 167s COMMENT 167s create or replace function public.dropSet_int (p_set_id int4) 167s returns int4 167s as $$ 167s declare 167s v_tab_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Restore all tables original triggers and rules and remove 167s -- our replication stuff. 167s -- ---- 167s for v_tab_row in select tab_id from public.sl_table 167s where tab_set = p_set_id 167s order by tab_id 167s loop 167s perform public.alterTableDropTriggers(v_tab_row.tab_id); 167s end loop; 167s 167s -- ---- 167s -- Remove all traces of the set configuration 167s -- ---- 167s delete from public.sl_sequence 167s where seq_set = p_set_id; 167s delete from public.sl_table 167s where tab_set = p_set_id; 167s delete from public.sl_subscribe 167s where sub_set = p_set_id; 167s delete from public.sl_setsync 167s where ssy_setid = p_set_id; 167s delete from public.sl_set 167s where set_id = p_set_id; 167s 167s -- Regenerate sl_listen since we revised the subscriptions 167s perform public.RebuildListenEntries(); 167s 167s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 167s perform public.addPartialLogIndices(); 167s 167s return p_set_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.dropSet(p_set_id int4) is 167s 'Process DROP_SET event to drop replication of set set_id. This involves: 167s - Removing log and deny access triggers 167s - Removing all traces of the set configuration, including sequences, tables, subscribers, syncs, and the set itself'; 167s COMMENT 167s create or replace function public.mergeSet (p_set_id int4, p_add_id int4) 167s returns bigint 167s as $$ 167s declare 167s v_origin int4; 167s in_progress boolean; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that both sets exist and originate here 167s -- ---- 167s if p_set_id = p_add_id then 167s raise exception 'Slony-I: merged set ids cannot be identical'; 167s end if; 167s select set_origin into v_origin from public.sl_set 167s where set_id = p_set_id; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_set_id; 167s end if; 167s if v_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: set % does not originate on local node', 167s p_set_id; 167s end if; 167s 167s select set_origin into v_origin from public.sl_set 167s where set_id = p_add_id; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_add_id; 167s end if; 167s if v_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: set % does not originate on local node', 167s p_add_id; 167s end if; 167s 167s -- ---- 167s -- Check that both sets are subscribed by the same set of nodes 167s -- ---- 167s if exists (select true from public.sl_subscribe SUB1 167s where SUB1.sub_set = p_set_id 167s and SUB1.sub_receiver not in (select SUB2.sub_receiver 167s from public.sl_subscribe SUB2 167s where SUB2.sub_set = p_add_id)) 167s then 167s raise exception 'Slony-I: subscriber lists of set % and % are different', 167s p_set_id, p_add_id; 167s end if; 167s 167s if exists (select true from public.sl_subscribe SUB1 167s where SUB1.sub_set = p_add_id 167s and SUB1.sub_receiver not in (select SUB2.sub_receiver 167s from public.sl_subscribe SUB2 167s where SUB2.sub_set = p_set_id)) 167s then 167s raise exception 'Slony-I: subscriber lists of set % and % are different', 167s p_add_id, p_set_id; 167s end if; 167s 167s -- ---- 167s -- Check that all ENABLE_SUBSCRIPTION events for the set are confirmed 167s -- ---- 167s select public.isSubscriptionInProgress(p_add_id) into in_progress ; 167s 167s if in_progress then 167s raise exception 'Slony-I: set % has subscriptions in progress - cannot merge', 167s p_add_id; 167s end if; 167s 167s -- ---- 167s -- Create a SYNC event, merge the sets, create a MERGE_SET event 167s -- ---- 167s perform public.createEvent('_main', 'SYNC', NULL); 167s perform public.mergeSet_int(p_set_id, p_add_id); 167s return public.createEvent('_main', 'MERGE_SET', 167s p_set_id::text, p_add_id::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.mergeSet(p_set_id int4, p_add_id int4) is 167s 'Generate MERGE_SET event to request that sets be merged together. 167s 167s Both sets must exist, and originate on the same node. They must be 167s subscribed by the same set of nodes.'; 167s COMMENT 167s create or replace function public.isSubscriptionInProgress(p_add_id int4) 167s returns boolean 167s as $$ 167s begin 167s if exists (select true from public.sl_event 167s where ev_type = 'ENABLE_SUBSCRIPTION' 167s and ev_data1 = p_add_id::text 167s and ev_seqno > (select max(con_seqno) from public.sl_confirm 167s where con_origin = ev_origin 167s and con_received::text = ev_data3)) 167s then 167s return true; 167s else 167s return false; 167s end if; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.isSubscriptionInProgress(p_add_id int4) is 167s 'Checks to see if a subscription for the indicated set is in progress. 167s Returns true if a subscription is in progress. Otherwise false'; 167s COMMENT 167s create or replace function public.mergeSet_int (p_set_id int4, p_add_id int4) 167s returns int4 167s as $$ 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s update public.sl_sequence 167s set seq_set = p_set_id 167s where seq_set = p_add_id; 167s update public.sl_table 167s set tab_set = p_set_id 167s where tab_set = p_add_id; 167s delete from public.sl_subscribe 167s where sub_set = p_add_id; 167s delete from public.sl_setsync 167s where ssy_setid = p_add_id; 167s delete from public.sl_set 167s where set_id = p_add_id; 167s 167s return p_set_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.mergeSet_int(p_set_id int4, p_add_id int4) is 167s 'mergeSet_int(set_id, add_id) - Perform MERGE_SET event, merging all objects from 167s set add_id into set set_id.'; 167s COMMENT 167s create or replace function public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 167s returns bigint 167s as $$ 167s declare 167s v_set_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that we are the origin of the set 167s -- ---- 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = p_set_id; 167s if not found then 167s raise exception 'Slony-I: setAddTable(): set % not found', p_set_id; 167s end if; 167s if v_set_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: setAddTable(): set % has remote origin', p_set_id; 167s end if; 167s 167s if exists (select true from public.sl_subscribe 167s where sub_set = p_set_id) 167s then 167s raise exception 'Slony-I: cannot add table to currently subscribed set % - must attach to an unsubscribed set', 167s p_set_id; 167s end if; 167s 167s -- ---- 167s -- Add the table to the set and generate the SET_ADD_TABLE event 167s -- ---- 167s perform public.setAddTable_int(p_set_id, p_tab_id, p_fqname, 167s p_tab_idxname, p_tab_comment); 167s return public.createEvent('_main', 'SET_ADD_TABLE', 167s p_set_id::text, p_tab_id::text, p_fqname::text, 167s p_tab_idxname::text, p_tab_comment::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 167s 'setAddTable (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 167s 167s Add table tab_fqname to replication set on origin node, and generate 167s SET_ADD_TABLE event to allow this to propagate to other nodes. 167s 167s Note that the table id, tab_id, must be unique ACROSS ALL SETS.'; 167s COMMENT 167s create or replace function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 167s returns int4 167s as $$ 167s declare 167s v_tab_relname name; 167s v_tab_nspname name; 167s v_local_node_id int4; 167s v_set_origin int4; 167s v_sub_provider int4; 167s v_relkind char; 167s v_tab_reloid oid; 167s v_pkcand_nn boolean; 167s v_prec record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- For sets with a remote origin, check that we are subscribed 167s -- to that set. Otherwise we ignore the table because it might 167s -- not even exist in our database. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = p_set_id; 167s if not found then 167s raise exception 'Slony-I: setAddTable_int(): set % not found', 167s p_set_id; 167s end if; 167s if v_set_origin != v_local_node_id then 167s select sub_provider into v_sub_provider 167s from public.sl_subscribe 167s where sub_set = p_set_id 167s and sub_receiver = public.getLocalNodeId('_main'); 167s if not found then 167s return 0; 167s end if; 167s end if; 167s 167s -- ---- 167s -- Get the tables OID and check that it is a real table 167s -- ---- 167s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname into v_tab_reloid, v_relkind, v_tab_relname, v_tab_nspname 167s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where PGC.relnamespace = PGN.oid 167s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 167s '.' || public.slon_quote_brute(PGC.relname); 167s if not found then 167s raise exception 'Slony-I: setAddTable_int(): table % not found', 167s p_fqname; 167s end if; 167s if v_relkind != 'r' then 167s raise exception 'Slony-I: setAddTable_int(): % is not a regular table', 167s p_fqname; 167s end if; 167s 167s if not exists (select indexrelid 167s from "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGC 167s where PGX.indrelid = v_tab_reloid 167s and PGX.indexrelid = PGC.oid 167s and PGC.relname = p_tab_idxname) 167s then 167s raise exception 'Slony-I: setAddTable_int(): table % has no index %', 167s p_fqname, p_tab_idxname; 167s end if; 167s 167s -- ---- 167s -- Verify that the columns in the PK (or candidate) are not NULLABLE 167s -- ---- 167s 167s v_pkcand_nn := 'f'; 167s for v_prec in select attname from "pg_catalog".pg_attribute where attrelid = 167s (select oid from "pg_catalog".pg_class where oid = v_tab_reloid) 167s and attname in (select attname from "pg_catalog".pg_attribute where 167s attrelid = (select oid from "pg_catalog".pg_class PGC, 167s "pg_catalog".pg_index PGX where 167s PGC.relname = p_tab_idxname and PGX.indexrelid=PGC.oid and 167s PGX.indrelid = v_tab_reloid)) and attnotnull <> 't' 167s loop 167s raise notice 'Slony-I: setAddTable_int: table % PK column % nullable', p_fqname, v_prec.attname; 167s v_pkcand_nn := 't'; 167s end loop; 167s if v_pkcand_nn then 167s raise exception 'Slony-I: setAddTable_int: table % not replicable!', p_fqname; 167s end if; 167s 167s select * into v_prec from public.sl_table where tab_id = p_tab_id; 167s if not found then 167s v_pkcand_nn := 't'; -- No-op -- All is well 167s else 167s raise exception 'Slony-I: setAddTable_int: table id % has already been assigned!', p_tab_id; 167s end if; 167s 167s -- ---- 167s -- Add the table to sl_table and create the trigger on it. 167s -- ---- 167s insert into public.sl_table 167s (tab_id, tab_reloid, tab_relname, tab_nspname, 167s tab_set, tab_idxname, tab_altered, tab_comment) 167s values 167s (p_tab_id, v_tab_reloid, v_tab_relname, v_tab_nspname, 167s p_set_id, p_tab_idxname, false, p_tab_comment); 167s perform public.alterTableAddTriggers(p_tab_id); 167s 167s return p_tab_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 167s 'setAddTable_int (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 167s 167s This function processes the SET_ADD_TABLE event on remote nodes, 167s adding a table to replication if the remote node is subscribing to its 167s replication set.'; 167s COMMENT 167s create or replace function public.setDropTable(p_tab_id int4) 167s returns bigint 167s as $$ 167s declare 167s v_set_id int4; 167s v_set_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Determine the set_id 167s -- ---- 167s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 167s 167s -- ---- 167s -- Ensure table exists 167s -- ---- 167s if not found then 167s raise exception 'Slony-I: setDropTable_int(): table % not found', 167s p_tab_id; 167s end if; 167s 167s -- ---- 167s -- Check that we are the origin of the set 167s -- ---- 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = v_set_id; 167s if not found then 167s raise exception 'Slony-I: setDropTable(): set % not found', v_set_id; 167s end if; 167s if v_set_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: setDropTable(): set % has remote origin', v_set_id; 167s end if; 167s 167s -- ---- 167s -- Drop the table from the set and generate the SET_ADD_TABLE event 167s -- ---- 167s perform public.setDropTable_int(p_tab_id); 167s return public.createEvent('_main', 'SET_DROP_TABLE', 167s p_tab_id::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setDropTable(p_tab_id int4) is 167s 'setDropTable (tab_id) 167s 167s Drop table tab_id from set on origin node, and generate SET_DROP_TABLE 167s event to allow this to propagate to other nodes.'; 167s COMMENT 167s create or replace function public.setDropTable_int(p_tab_id int4) 167s returns int4 167s as $$ 167s declare 167s v_set_id int4; 167s v_local_node_id int4; 167s v_set_origin int4; 167s v_sub_provider int4; 167s v_tab_reloid oid; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Determine the set_id 167s -- ---- 167s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 167s 167s -- ---- 167s -- Ensure table exists 167s -- ---- 167s if not found then 167s return 0; 167s end if; 167s 167s -- ---- 167s -- For sets with a remote origin, check that we are subscribed 167s -- to that set. Otherwise we ignore the table because it might 167s -- not even exist in our database. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = v_set_id; 167s if not found then 167s raise exception 'Slony-I: setDropTable_int(): set % not found', 167s v_set_id; 167s end if; 167s if v_set_origin != v_local_node_id then 167s select sub_provider into v_sub_provider 167s from public.sl_subscribe 167s where sub_set = v_set_id 167s and sub_receiver = public.getLocalNodeId('_main'); 167s if not found then 167s return 0; 167s end if; 167s end if; 167s 167s -- ---- 167s -- Drop the table from sl_table and drop trigger from it. 167s -- ---- 167s perform public.alterTableDropTriggers(p_tab_id); 167s delete from public.sl_table where tab_id = p_tab_id; 167s return p_tab_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setDropTable_int(p_tab_id int4) is 167s 'setDropTable_int (tab_id) 167s 167s This function processes the SET_DROP_TABLE event on remote nodes, 167s dropping a table from replication if the remote node is subscribing to 167s its replication set.'; 167s COMMENT 167s create or replace function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 167s returns bigint 167s as $$ 167s declare 167s v_set_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that we are the origin of the set 167s -- ---- 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = p_set_id; 167s if not found then 167s raise exception 'Slony-I: setAddSequence(): set % not found', p_set_id; 167s end if; 167s if v_set_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: setAddSequence(): set % has remote origin - submit to origin node', p_set_id; 167s end if; 167s 167s if exists (select true from public.sl_subscribe 167s where sub_set = p_set_id) 167s then 167s raise exception 'Slony-I: cannot add sequence to currently subscribed set %', 167s p_set_id; 167s end if; 167s 167s -- ---- 167s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 167s -- ---- 167s perform public.setAddSequence_int(p_set_id, p_seq_id, p_fqname, 167s p_seq_comment); 167s return public.createEvent('_main', 'SET_ADD_SEQUENCE', 167s p_set_id::text, p_seq_id::text, 167s p_fqname::text, p_seq_comment::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 167s 'setAddSequence (set_id, seq_id, seq_fqname, seq_comment) 167s 167s On the origin node for set set_id, add sequence seq_fqname to the 167s replication set, and raise SET_ADD_SEQUENCE to cause this to replicate 167s to subscriber nodes.'; 167s COMMENT 167s create or replace function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 167s returns int4 167s as $$ 167s declare 167s v_local_node_id int4; 167s v_set_origin int4; 167s v_sub_provider int4; 167s v_relkind char; 167s v_seq_reloid oid; 167s v_seq_relname name; 167s v_seq_nspname name; 167s v_sync_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- For sets with a remote origin, check that we are subscribed 167s -- to that set. Otherwise we ignore the sequence because it might 167s -- not even exist in our database. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = p_set_id; 167s if not found then 167s raise exception 'Slony-I: setAddSequence_int(): set % not found', 167s p_set_id; 167s end if; 167s if v_set_origin != v_local_node_id then 167s select sub_provider into v_sub_provider 167s from public.sl_subscribe 167s where sub_set = p_set_id 167s and sub_receiver = public.getLocalNodeId('_main'); 167s if not found then 167s return 0; 167s end if; 167s end if; 167s 167s -- ---- 167s -- Get the sequences OID and check that it is a sequence 167s -- ---- 167s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname 167s into v_seq_reloid, v_relkind, v_seq_relname, v_seq_nspname 167s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where PGC.relnamespace = PGN.oid 167s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 167s '.' || public.slon_quote_brute(PGC.relname); 167s if not found then 167s raise exception 'Slony-I: setAddSequence_int(): sequence % not found', 167s p_fqname; 167s end if; 167s if v_relkind != 'S' then 167s raise exception 'Slony-I: setAddSequence_int(): % is not a sequence', 167s p_fqname; 167s end if; 167s 167s select 1 into v_sync_row from public.sl_sequence where seq_id = p_seq_id; 167s if not found then 167s v_relkind := 'o'; -- all is OK 167s else 167s raise exception 'Slony-I: setAddSequence_int(): sequence ID % has already been assigned', p_seq_id; 167s end if; 167s 167s -- ---- 167s -- Add the sequence to sl_sequence 167s -- ---- 167s insert into public.sl_sequence 167s (seq_id, seq_reloid, seq_relname, seq_nspname, seq_set, seq_comment) 167s values 167s (p_seq_id, v_seq_reloid, v_seq_relname, v_seq_nspname, p_set_id, p_seq_comment); 167s 167s -- ---- 167s -- On the set origin, fake a sl_seqlog row for the last sync event 167s -- ---- 167s if v_set_origin = v_local_node_id then 167s for v_sync_row in select coalesce (max(ev_seqno), 0) as ev_seqno 167s from public.sl_event 167s where ev_origin = v_local_node_id 167s and ev_type = 'SYNC' 167s loop 167s insert into public.sl_seqlog 167s (seql_seqid, seql_origin, seql_ev_seqno, 167s seql_last_value) values 167s (p_seq_id, v_local_node_id, v_sync_row.ev_seqno, 167s public.sequenceLastValue(p_fqname)); 167s end loop; 167s end if; 167s 167s return p_seq_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 167s 'setAddSequence_int (set_id, seq_id, seq_fqname, seq_comment) 167s 167s This processes the SET_ADD_SEQUENCE event. On remote nodes that 167s subscribe to set_id, add the sequence to the replication set.'; 167s COMMENT 167s create or replace function public.setDropSequence (p_seq_id int4) 167s returns bigint 167s as $$ 167s declare 167s v_set_id int4; 167s v_set_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Determine set id for this sequence 167s -- ---- 167s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 167s 167s -- ---- 167s -- Ensure sequence exists 167s -- ---- 167s if not found then 167s raise exception 'Slony-I: setDropSequence_int(): sequence % not found', 167s p_seq_id; 167s end if; 167s 167s -- ---- 167s -- Check that we are the origin of the set 167s -- ---- 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = v_set_id; 167s if not found then 167s raise exception 'Slony-I: setDropSequence(): set % not found', v_set_id; 167s end if; 167s if v_set_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: setDropSequence(): set % has origin at another node - submit this to that node', v_set_id; 167s end if; 167s 167s -- ---- 167s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 167s -- ---- 167s perform public.setDropSequence_int(p_seq_id); 167s return public.createEvent('_main', 'SET_DROP_SEQUENCE', 167s p_seq_id::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setDropSequence (p_seq_id int4) is 167s 'setDropSequence (seq_id) 167s 167s On the origin node for the set, drop sequence seq_id from replication 167s set, and raise SET_DROP_SEQUENCE to cause this to replicate to 167s subscriber nodes.'; 167s COMMENT 167s create or replace function public.setDropSequence_int(p_seq_id int4) 167s returns int4 167s as $$ 167s declare 167s v_set_id int4; 167s v_local_node_id int4; 167s v_set_origin int4; 167s v_sub_provider int4; 167s v_relkind char; 167s v_sync_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Determine set id for this sequence 167s -- ---- 167s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 167s 167s -- ---- 167s -- Ensure sequence exists 167s -- ---- 167s if not found then 167s return 0; 167s end if; 167s 167s -- ---- 167s -- For sets with a remote origin, check that we are subscribed 167s -- to that set. Otherwise we ignore the sequence because it might 167s -- not even exist in our database. 167s -- ---- 167s v_local_node_id := public.getLocalNodeId('_main'); 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = v_set_id; 167s if not found then 167s raise exception 'Slony-I: setDropSequence_int(): set % not found', 167s v_set_id; 167s end if; 167s if v_set_origin != v_local_node_id then 167s select sub_provider into v_sub_provider 167s from public.sl_subscribe 167s where sub_set = v_set_id 167s and sub_receiver = public.getLocalNodeId('_main'); 167s if not found then 167s return 0; 167s end if; 167s end if; 167s 167s -- ---- 167s -- drop the sequence from sl_sequence, sl_seqlog 167s -- ---- 167s delete from public.sl_seqlog where seql_seqid = p_seq_id; 167s delete from public.sl_sequence where seq_id = p_seq_id; 167s 167s return p_seq_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setDropSequence_int(p_seq_id int4) is 167s 'setDropSequence_int (seq_id) 167s 167s This processes the SET_DROP_SEQUENCE event. On remote nodes that 167s subscribe to the set containing sequence seq_id, drop the sequence 167s from the replication set.'; 167s COMMENT 167s create or replace function public.setMoveTable (p_tab_id int4, p_new_set_id int4) 167s returns bigint 167s as $$ 167s declare 167s v_old_set_id int4; 167s v_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Get the tables current set 167s -- ---- 167s select tab_set into v_old_set_id from public.sl_table 167s where tab_id = p_tab_id; 167s if not found then 167s raise exception 'Slony-I: table %d not found', p_tab_id; 167s end if; 167s 167s -- ---- 167s -- Check that both sets exist and originate here 167s -- ---- 167s if p_new_set_id = v_old_set_id then 167s raise exception 'Slony-I: set ids cannot be identical'; 167s end if; 167s select set_origin into v_origin from public.sl_set 167s where set_id = p_new_set_id; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_new_set_id; 167s end if; 167s if v_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: set % does not originate on local node', 167s p_new_set_id; 167s end if; 167s 167s select set_origin into v_origin from public.sl_set 167s where set_id = v_old_set_id; 167s if not found then 167s raise exception 'Slony-I: set % not found', v_old_set_id; 167s end if; 167s if v_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: set % does not originate on local node', 167s v_old_set_id; 167s end if; 167s 167s -- ---- 167s -- Check that both sets are subscribed by the same set of nodes 167s -- ---- 167s if exists (select true from public.sl_subscribe SUB1 167s where SUB1.sub_set = p_new_set_id 167s and SUB1.sub_receiver not in (select SUB2.sub_receiver 167s from public.sl_subscribe SUB2 167s where SUB2.sub_set = v_old_set_id)) 167s then 167s raise exception 'Slony-I: subscriber lists of set % and % are different', 167s p_new_set_id, v_old_set_id; 167s end if; 167s 167s if exists (select true from public.sl_subscribe SUB1 167s where SUB1.sub_set = v_old_set_id 167s and SUB1.sub_receiver not in (select SUB2.sub_receiver 167s from public.sl_subscribe SUB2 167s where SUB2.sub_set = p_new_set_id)) 167s then 167s raise exception 'Slony-I: subscriber lists of set % and % are different', 167s v_old_set_id, p_new_set_id; 167s end if; 167s 167s -- ---- 167s -- Change the set the table belongs to 167s -- ---- 167s perform public.createEvent('_main', 'SYNC', NULL); 167s perform public.setMoveTable_int(p_tab_id, p_new_set_id); 167s return public.createEvent('_main', 'SET_MOVE_TABLE', 167s p_tab_id::text, p_new_set_id::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 167s 'This generates the SET_MOVE_TABLE event. If the set that the table is 167s in is identically subscribed to the set that the table is to be moved 167s into, then the SET_MOVE_TABLE event is raised.'; 167s COMMENT 167s create or replace function public.setMoveTable_int (p_tab_id int4, p_new_set_id int4) 167s returns int4 167s as $$ 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Move the table to the new set 167s -- ---- 167s update public.sl_table 167s set tab_set = p_new_set_id 167s where tab_id = p_tab_id; 167s 167s return p_tab_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 167s 'This processes the SET_MOVE_TABLE event. The table is moved 167s to the destination set.'; 167s COMMENT 167s create or replace function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) 167s returns bigint 167s as $$ 167s declare 167s v_old_set_id int4; 167s v_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Get the sequences current set 167s -- ---- 167s select seq_set into v_old_set_id from public.sl_sequence 167s where seq_id = p_seq_id; 167s if not found then 167s raise exception 'Slony-I: setMoveSequence(): sequence %d not found', p_seq_id; 167s end if; 167s 167s -- ---- 167s -- Check that both sets exist and originate here 167s -- ---- 167s if p_new_set_id = v_old_set_id then 167s raise exception 'Slony-I: setMoveSequence(): set ids cannot be identical'; 167s end if; 167s select set_origin into v_origin from public.sl_set 167s where set_id = p_new_set_id; 167s if not found then 167s raise exception 'Slony-I: setMoveSequence(): set % not found', p_new_set_id; 167s end if; 167s if v_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: setMoveSequence(): set % does not originate on local node', 167s p_new_set_id; 167s end if; 167s 167s select set_origin into v_origin from public.sl_set 167s where set_id = v_old_set_id; 167s if not found then 167s raise exception 'Slony-I: set % not found', v_old_set_id; 167s end if; 167s if v_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: set % does not originate on local node', 167s v_old_set_id; 167s end if; 167s 167s -- ---- 167s -- Check that both sets are subscribed by the same set of nodes 167s -- ---- 167s if exists (select true from public.sl_subscribe SUB1 167s where SUB1.sub_set = p_new_set_id 167s and SUB1.sub_receiver not in (select SUB2.sub_receiver 167s from public.sl_subscribe SUB2 167s where SUB2.sub_set = v_old_set_id)) 167s then 167s raise exception 'Slony-I: subscriber lists of set % and % are different', 167s p_new_set_id, v_old_set_id; 167s end if; 167s 167s if exists (select true from public.sl_subscribe SUB1 167s where SUB1.sub_set = v_old_set_id 167s and SUB1.sub_receiver not in (select SUB2.sub_receiver 167s from public.sl_subscribe SUB2 167s where SUB2.sub_set = p_new_set_id)) 167s then 167s raise exception 'Slony-I: subscriber lists of set % and % are different', 167s v_old_set_id, p_new_set_id; 167s end if; 167s 167s -- ---- 167s -- Change the set the sequence belongs to 167s -- ---- 167s perform public.setMoveSequence_int(p_seq_id, p_new_set_id); 167s return public.createEvent('_main', 'SET_MOVE_SEQUENCE', 167s p_seq_id::text, p_new_set_id::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) is 167s 'setMoveSequence(p_seq_id, p_new_set_id) - This generates the 167s SET_MOVE_SEQUENCE event, after validation, notably that both sets 167s exist, are distinct, and have exactly the same subscription lists'; 167s COMMENT 167s create or replace function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) 167s returns int4 167s as $$ 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Move the sequence to the new set 167s -- ---- 167s update public.sl_sequence 167s set seq_set = p_new_set_id 167s where seq_id = p_seq_id; 167s 167s return p_seq_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) is 167s 'setMoveSequence_int(p_seq_id, p_new_set_id) - processes the 167s SET_MOVE_SEQUENCE event, moving a sequence to another replication 167s set.'; 167s COMMENT 167s create or replace function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) returns int4 167s as $$ 167s declare 167s v_fqname text; 167s v_found integer; 167s begin 167s -- ---- 167s -- Get the sequences fully qualified name 167s -- ---- 167s select public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) into v_fqname 167s from public.sl_sequence SQ, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where SQ.seq_id = p_seq_id 167s and SQ.seq_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid; 167s if not found then 167s if p_ignore_missing then 167s return null; 167s end if; 167s raise exception 'Slony-I: sequenceSetValue(): sequence % not found', p_seq_id; 167s end if; 167s 167s -- ---- 167s -- Update it to the new value 167s -- ---- 167s execute 'select setval(''' || v_fqname || 167s ''', ' || p_last_value::text || ')'; 167s 167s if p_ev_seqno is not null then 167s insert into public.sl_seqlog 167s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 167s values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value); 167s end if; 167s return p_seq_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) is 167s 'sequenceSetValue (seq_id, seq_origin, ev_seqno, last_value,ignore_missing) 167s Set sequence seq_id to have new value last_value. 167s '; 167s COMMENT 167s drop function if exists public.ddlCapture (p_statement text, p_nodes text); 167s DROP FUNCTION 167s create or replace function public.ddlCapture (p_statement text, p_nodes text) 167s returns bigint 167s as $$ 167s declare 167s c_local_node integer; 167s c_found_origin boolean; 167s c_node text; 167s c_cmdargs text[]; 167s c_nodeargs text; 167s c_delim text; 167s begin 167s c_local_node := public.getLocalNodeId('_main'); 167s 167s c_cmdargs = array_append('{}'::text[], p_statement); 167s c_nodeargs = ''; 167s if p_nodes is not null then 167s c_found_origin := 'f'; 167s -- p_nodes list needs to consist of a list of nodes that exist 167s -- and that include the current node ID 167s for c_node in select trim(node) from 167s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 167s if not exists 167s (select 1 from public.sl_node 167s where no_id = (c_node::integer)) then 167s raise exception 'ddlcapture(%,%) - node % does not exist!', 167s p_statement, p_nodes, c_node; 167s end if; 167s 167s if c_local_node = (c_node::integer) then 167s c_found_origin := 't'; 167s end if; 167s if length(c_nodeargs)>0 then 167s c_nodeargs = c_nodeargs ||','|| c_node; 167s else 167s c_nodeargs=c_node; 167s end if; 167s end loop; 167s 167s if not c_found_origin then 167s raise exception 167s 'ddlcapture(%,%) - origin node % not included in ONLY ON list!', 167s p_statement, p_nodes, c_local_node; 167s end if; 167s end if; 167s c_cmdargs = array_append(c_cmdargs,c_nodeargs); 167s c_delim=','; 167s c_cmdargs = array_append(c_cmdargs, 167s 167s (select public.string_agg( seq_id::text || c_delim 167s || c_local_node || 167s c_delim || seq_last_value) 167s FROM ( 167s select seq_id, 167s seq_last_value from public.sl_seqlastvalue 167s where seq_origin = c_local_node) as FOO 167s where NOT public.seqtrack(seq_id,seq_last_value) is NULL)); 167s insert into public.sl_log_script 167s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 167s values 167s (c_local_node, pg_catalog.txid_current(), 167s nextval('public.sl_action_seq'), 'S', c_cmdargs); 167s execute p_statement; 167s return currval('public.sl_action_seq'); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.ddlCapture (p_statement text, p_nodes text) is 167s 'Capture an SQL statement (usually DDL) that is to be literally replayed on subscribers'; 167s COMMENT 167s drop function if exists public.ddlScript_complete (int4, text, int4); 167s DROP FUNCTION 167s create or replace function public.ddlScript_complete (p_nodes text) 167s returns bigint 167s as $$ 167s declare 167s c_local_node integer; 167s c_found_origin boolean; 167s c_node text; 167s c_cmdargs text[]; 167s begin 167s c_local_node := public.getLocalNodeId('_main'); 167s 167s c_cmdargs = '{}'::text[]; 167s if p_nodes is not null then 167s c_found_origin := 'f'; 167s -- p_nodes list needs to consist o a list of nodes that exist 167s -- and that include the current node ID 167s for c_node in select trim(node) from 167s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 167s if not exists 167s (select 1 from public.sl_node 167s where no_id = (c_node::integer)) then 167s raise exception 'ddlcapture(%,%) - node % does not exist!', 167s p_statement, p_nodes, c_node; 167s end if; 167s 167s if c_local_node = (c_node::integer) then 167s c_found_origin := 't'; 167s end if; 167s 167s c_cmdargs = array_append(c_cmdargs, c_node); 167s end loop; 167s 167s if not c_found_origin then 167s raise exception 167s 'ddlScript_complete(%) - origin node % not included in ONLY ON list!', 167s p_nodes, c_local_node; 167s end if; 167s end if; 167s 167s perform public.ddlScript_complete_int(); 167s 167s insert into public.sl_log_script 167s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 167s values 167s (c_local_node, pg_catalog.txid_current(), 167s nextval('public.sl_action_seq'), 's', c_cmdargs); 167s 167s return currval('public.sl_action_seq'); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.ddlScript_complete(p_nodes text) is 167s 'ddlScript_complete(set_id, script, only_on_node) 167s 167s After script has run on origin, this fixes up relnames and 167s log trigger arguments and inserts the "fire ddlScript_complete_int() 167s log row into sl_log_script.'; 167s COMMENT 167s drop function if exists public.ddlScript_complete_int(int4, int4); 167s DROP FUNCTION 167s create or replace function public.ddlScript_complete_int () 167s returns int4 167s as $$ 167s begin 167s perform public.updateRelname(); 167s perform public.repair_log_triggers(true); 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.ddlScript_complete_int() is 167s 'ddlScript_complete_int() 167s 167s Complete processing the DDL_SCRIPT event.'; 167s COMMENT 167s create or replace function public.alterTableAddTriggers (p_tab_id int4) 167s returns int4 167s as $$ 167s declare 167s v_no_id int4; 167s v_tab_row record; 167s v_tab_fqname text; 167s v_tab_attkind text; 167s v_n int4; 167s v_trec record; 167s v_tgbad boolean; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Get our local node ID 167s -- ---- 167s v_no_id := public.getLocalNodeId('_main'); 167s 167s -- ---- 167s -- Get the sl_table row and the current origin of the table. 167s -- ---- 167s select T.tab_reloid, T.tab_set, T.tab_idxname, 167s S.set_origin, PGX.indexrelid, 167s public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) as tab_fqname 167s into v_tab_row 167s from public.sl_table T, public.sl_set S, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 167s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 167s where T.tab_id = p_tab_id 167s and T.tab_set = S.set_id 167s and T.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid 167s and PGX.indrelid = T.tab_reloid 167s and PGX.indexrelid = PGXC.oid 167s and PGXC.relname = T.tab_idxname 167s for update; 167s if not found then 167s raise exception 'Slony-I: alterTableAddTriggers(): Table with id % not found', p_tab_id; 167s end if; 167s v_tab_fqname = v_tab_row.tab_fqname; 167s 167s v_tab_attkind := public.determineAttKindUnique(v_tab_row.tab_fqname, 167s v_tab_row.tab_idxname); 167s 167s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 167s 167s -- ---- 167s -- Create the log and the deny access triggers 167s -- ---- 167s execute 'create trigger "_main_logtrigger"' || 167s ' after insert or update or delete on ' || 167s v_tab_fqname || ' for each row execute procedure public.logTrigger (' || 167s pg_catalog.quote_literal('_main') || ',' || 167s pg_catalog.quote_literal(p_tab_id::text) || ',' || 167s pg_catalog.quote_literal(v_tab_attkind) || ');'; 167s 167s execute 'create trigger "_main_denyaccess" ' || 167s 'before insert or update or delete on ' || 167s v_tab_fqname || ' for each row execute procedure ' || 167s 'public.denyAccess (' || pg_catalog.quote_literal('_main') || ');'; 167s 167s perform public.alterTableAddTruncateTrigger(v_tab_fqname, p_tab_id); 167s 167s perform public.alterTableConfigureTriggers (p_tab_id); 167s return p_tab_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.alterTableAddTriggers(p_tab_id int4) is 167s 'alterTableAddTriggers(tab_id) 167s 167s Adds the log and deny access triggers to a replicated table.'; 167s COMMENT 167s create or replace function public.alterTableDropTriggers (p_tab_id int4) 167s returns int4 167s as $$ 167s declare 167s v_no_id int4; 167s v_tab_row record; 167s v_tab_fqname text; 167s v_n int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Get our local node ID 167s -- ---- 167s v_no_id := public.getLocalNodeId('_main'); 167s 167s -- ---- 167s -- Get the sl_table row and the current tables origin. 167s -- ---- 167s select T.tab_reloid, T.tab_set, 167s S.set_origin, PGX.indexrelid, 167s public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) as tab_fqname 167s into v_tab_row 167s from public.sl_table T, public.sl_set S, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 167s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 167s where T.tab_id = p_tab_id 167s and T.tab_set = S.set_id 167s and T.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid 167s and PGX.indrelid = T.tab_reloid 167s and PGX.indexrelid = PGXC.oid 167s and PGXC.relname = T.tab_idxname 167s for update; 167s if not found then 167s raise exception 'Slony-I: alterTableDropTriggers(): Table with id % not found', p_tab_id; 167s end if; 167s v_tab_fqname = v_tab_row.tab_fqname; 167s 167s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 167s 167s -- ---- 167s -- Drop both triggers 167s -- ---- 167s execute 'drop trigger "_main_logtrigger" on ' || 167s v_tab_fqname; 167s 167s execute 'drop trigger "_main_denyaccess" on ' || 167s v_tab_fqname; 167s 167s perform public.alterTableDropTruncateTrigger(v_tab_fqname, p_tab_id); 167s 167s return p_tab_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.alterTableDropTriggers (p_tab_id int4) is 167s 'alterTableDropTriggers (tab_id) 167s 167s Remove the log and deny access triggers from a table.'; 167s COMMENT 167s create or replace function public.alterTableConfigureTriggers (p_tab_id int4) 167s returns int4 167s as $$ 167s declare 167s v_no_id int4; 167s v_tab_row record; 167s v_tab_fqname text; 167s v_n int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Get our local node ID 167s -- ---- 167s v_no_id := public.getLocalNodeId('_main'); 167s 167s -- ---- 167s -- Get the sl_table row and the current tables origin. 167s -- ---- 167s select T.tab_reloid, T.tab_set, 167s S.set_origin, PGX.indexrelid, 167s public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) as tab_fqname 167s into v_tab_row 167s from public.sl_table T, public.sl_set S, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 167s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 167s where T.tab_id = p_tab_id 167s and T.tab_set = S.set_id 167s and T.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid 167s and PGX.indrelid = T.tab_reloid 167s and PGX.indexrelid = PGXC.oid 167s and PGXC.relname = T.tab_idxname 167s for update; 167s if not found then 167s raise exception 'Slony-I: alterTableConfigureTriggers(): Table with id % not found', p_tab_id; 167s end if; 167s v_tab_fqname = v_tab_row.tab_fqname; 167s 167s -- ---- 167s -- Configuration depends on the origin of the table 167s -- ---- 167s if v_tab_row.set_origin = v_no_id then 167s -- ---- 167s -- On the origin the log trigger is configured like a default 167s -- user trigger and the deny access trigger is disabled. 167s -- ---- 167s execute 'alter table ' || v_tab_fqname || 167s ' enable trigger "_main_logtrigger"'; 167s execute 'alter table ' || v_tab_fqname || 167s ' disable trigger "_main_denyaccess"'; 167s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 167s 'enable', 'disable'); 167s else 167s -- ---- 167s -- On a replica the log trigger is disabled and the 167s -- deny access trigger fires in origin session role. 167s -- ---- 167s execute 'alter table ' || v_tab_fqname || 167s ' disable trigger "_main_logtrigger"'; 167s execute 'alter table ' || v_tab_fqname || 167s ' enable trigger "_main_denyaccess"'; 167s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 167s 'disable', 'enable'); 167s 167s end if; 167s 167s return p_tab_id; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.alterTableConfigureTriggers (p_tab_id int4) is 167s 'alterTableConfigureTriggers (tab_id) 167s 167s Set the enable/disable configuration for the replication triggers 167s according to the origin of the set.'; 167s COMMENT 167s create or replace function public.resubscribeNode (p_origin int4, 167s p_provider int4, p_receiver int4) 167s returns bigint 167s as $$ 167s declare 167s v_record record; 167s v_missing_sets text; 167s v_ev_seqno bigint; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- 167s -- Check that the receiver exists 167s -- 167s if not exists (select no_id from public.sl_node where no_id= 167s p_receiver) then 167s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_receiver; 167s end if; 167s 167s -- 167s -- Check that the provider exists 167s -- 167s if not exists (select no_id from public.sl_node where no_id= 167s p_provider) then 167s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_provider; 167s end if; 167s 167s 167s -- ---- 167s -- Check that this is called on the origin node 167s -- ---- 167s if p_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: subscribeSet() must be called on origin'; 167s end if; 167s 167s -- --- 167s -- Verify that the provider is either the origin or an active subscriber 167s -- Bug report #1362 167s -- --- 167s if p_origin <> p_provider then 167s for v_record in select sub1.sub_set from 167s public.sl_subscribe sub1 167s left outer join (public.sl_subscribe sub2 167s inner join 167s public.sl_set on ( 167s sl_set.set_id=sub2.sub_set 167s and sub2.sub_set=p_origin) 167s ) 167s ON ( sub1.sub_set = sub2.sub_set and 167s sub1.sub_receiver = p_provider and 167s sub1.sub_forward and sub1.sub_active 167s and sub2.sub_receiver=p_receiver) 167s 167s where sub2.sub_set is null 167s loop 167s v_missing_sets=v_missing_sets || ' ' || v_record.sub_set; 167s end loop; 167s if v_missing_sets is not null then 167s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, v_missing_sets; 167s end if; 167s end if; 167s 167s for v_record in select * from 167s public.sl_subscribe, public.sl_set where 167s sub_set=set_id and 167s sub_receiver=p_receiver 167s and set_origin=p_origin 167s loop 167s -- ---- 167s -- Create the SUBSCRIBE_SET event 167s -- ---- 167s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 167s v_record.sub_set::text, p_provider::text, p_receiver::text, 167s case v_record.sub_forward when true then 't' else 'f' end, 167s 'f' ); 167s 167s -- ---- 167s -- Call the internal procedure to store the subscription 167s -- ---- 167s perform public.subscribeSet_int(v_record.sub_set, 167s p_provider, 167s p_receiver, v_record.sub_forward, false); 167s end loop; 167s 167s return v_ev_seqno; 167s end; 167s $$ 167s language plpgsql; 167s CREATE FUNCTION 167s create or replace function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 167s returns bigint 167s as $$ 167s declare 167s v_set_origin int4; 167s v_ev_seqno int8; 167s v_ev_seqno2 int8; 167s v_rec record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- 167s -- Check that the receiver exists 167s -- 167s if not exists (select no_id from public.sl_node where no_id= 167s p_sub_receiver) then 167s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_sub_receiver; 167s end if; 167s 167s -- 167s -- Check that the provider exists 167s -- 167s if not exists (select no_id from public.sl_node where no_id= 167s p_sub_provider) then 167s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_sub_provider; 167s end if; 167s 167s -- ---- 167s -- Check that the origin and provider of the set are remote 167s -- ---- 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = p_sub_set; 167s if not found then 167s raise exception 'Slony-I: subscribeSet(): set % not found', p_sub_set; 167s end if; 167s if v_set_origin = p_sub_receiver then 167s raise exception 167s 'Slony-I: subscribeSet(): set origin and receiver cannot be identical'; 167s end if; 167s if p_sub_receiver = p_sub_provider then 167s raise exception 167s 'Slony-I: subscribeSet(): set provider and receiver cannot be identical'; 167s end if; 167s -- ---- 167s -- Check that this is called on the origin node 167s -- ---- 167s if v_set_origin != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: subscribeSet() must be called on origin'; 167s end if; 167s 167s -- --- 167s -- Verify that the provider is either the origin or an active subscriber 167s -- Bug report #1362 167s -- --- 167s if v_set_origin <> p_sub_provider then 167s if not exists (select 1 from public.sl_subscribe 167s where sub_set = p_sub_set and 167s sub_receiver = p_sub_provider and 167s sub_forward and sub_active) then 167s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, p_sub_set; 167s end if; 167s end if; 167s 167s -- --- 167s -- Enforce that all sets from one origin are subscribed 167s -- using the same data provider per receiver. 167s -- ---- 167s if not exists (select 1 from public.sl_subscribe 167s where sub_set = p_sub_set and sub_receiver = p_sub_receiver) then 167s -- 167s -- New subscription - error out if we have any other subscription 167s -- from that origin with a different data provider. 167s -- 167s for v_rec in select sub_provider from public.sl_subscribe 167s join public.sl_set on set_id = sub_set 167s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 167s loop 167s if v_rec.sub_provider <> p_sub_provider then 167s raise exception 'Slony-I: subscribeSet(): wrong provider % - existing subscription from origin % users provider %', 167s p_sub_provider, v_set_origin, v_rec.sub_provider; 167s end if; 167s end loop; 167s else 167s -- 167s -- Existing subscription - in case the data provider changes and 167s -- there are other subscriptions, warn here. subscribeSet_int() 167s -- will currently change the data provider for those sets as well. 167s -- 167s for v_rec in select set_id, sub_provider from public.sl_subscribe 167s join public.sl_set on set_id = sub_set 167s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 167s and set_id <> p_sub_set 167s loop 167s if v_rec.sub_provider <> p_sub_provider then 167s raise exception 'Slony-I: subscribeSet(): also data provider for set % use resubscribe instead', 167s v_rec.set_id; 167s end if; 167s end loop; 167s end if; 167s 167s -- ---- 167s -- Create the SUBSCRIBE_SET event 167s -- ---- 167s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 167s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 167s case p_sub_forward when true then 't' else 'f' end, 167s case p_omit_copy when true then 't' else 'f' end 167s ); 167s 167s -- ---- 167s -- Call the internal procedure to store the subscription 167s -- ---- 167s v_ev_seqno2:=public.subscribeSet_int(p_sub_set, p_sub_provider, 167s p_sub_receiver, p_sub_forward, p_omit_copy); 167s 167s if v_ev_seqno2 is not null then 167s v_ev_seqno:=v_ev_seqno2; 167s end if; 167s 167s return v_ev_seqno; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 167s 'subscribeSet (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 167s 167s Makes sure that the receiver is not the provider, then stores the 167s subscription, and publishes the SUBSCRIBE_SET event to other nodes. 167s 167s If omit_copy is true, then no data copy will be done. 167s '; 167s COMMENT 167s DROP FUNCTION IF EXISTS public.subscribeSet_int(int4,int4,int4,bool,bool); 167s DROP FUNCTION 167s create or replace function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 167s returns int4 167s as $$ 167s declare 167s v_set_origin int4; 167s v_sub_row record; 167s v_seq_id bigint; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Lookup the set origin 167s -- ---- 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = p_sub_set; 167s if not found then 167s raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set; 167s end if; 167s 167s -- ---- 167s -- Provider change is only allowed for active sets 167s -- ---- 167s if p_sub_receiver = public.getLocalNodeId('_main') then 167s select sub_active into v_sub_row from public.sl_subscribe 167s where sub_set = p_sub_set 167s and sub_receiver = p_sub_receiver; 167s if found then 167s if not v_sub_row.sub_active then 167s raise exception 'Slony-I: subscribeSet_int(): set % is not active, cannot change provider', 167s p_sub_set; 167s end if; 167s end if; 167s end if; 167s 167s -- ---- 167s -- Try to change provider and/or forward for an existing subscription 167s -- ---- 167s update public.sl_subscribe 167s set sub_provider = p_sub_provider, 167s sub_forward = p_sub_forward 167s where sub_set = p_sub_set 167s and sub_receiver = p_sub_receiver; 167s if found then 167s 167s -- ---- 167s -- This is changing a subscriptoin. Make sure all sets from 167s -- this origin are subscribed using the same data provider. 167s -- For this we first check that the requested data provider 167s -- is subscribed to all the sets, the receiver is subscribed to. 167s -- ---- 167s for v_sub_row in select set_id from public.sl_set 167s join public.sl_subscribe on set_id = sub_set 167s where set_origin = v_set_origin 167s and sub_receiver = p_sub_receiver 167s and sub_set <> p_sub_set 167s loop 167s if not exists (select 1 from public.sl_subscribe 167s where sub_set = v_sub_row.set_id 167s and sub_receiver = p_sub_provider 167s and sub_active and sub_forward) 167s and not exists (select 1 from public.sl_set 167s where set_id = v_sub_row.set_id 167s and set_origin = p_sub_provider) 167s then 167s raise exception 'Slony-I: subscribeSet_int(): node % is not a forwarding subscriber for set %', 167s p_sub_provider, v_sub_row.set_id; 167s end if; 167s 167s -- ---- 167s -- New data provider offers this set as well, change that 167s -- subscription too. 167s -- ---- 167s update public.sl_subscribe 167s set sub_provider = p_sub_provider 167s where sub_set = v_sub_row.set_id 167s and sub_receiver = p_sub_receiver; 167s end loop; 167s 167s -- ---- 167s -- Rewrite sl_listen table 167s -- ---- 167s perform public.RebuildListenEntries(); 167s 167s return p_sub_set; 167s end if; 167s 167s -- ---- 167s -- Not found, insert a new one 167s -- ---- 167s if not exists (select true from public.sl_path 167s where pa_server = p_sub_provider 167s and pa_client = p_sub_receiver) 167s then 167s insert into public.sl_path 167s (pa_server, pa_client, pa_conninfo, pa_connretry) 167s values 167s (p_sub_provider, p_sub_receiver, 167s '', 10); 167s end if; 167s insert into public.sl_subscribe 167s (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) 167s values (p_sub_set, p_sub_provider, p_sub_receiver, 167s p_sub_forward, false); 167s 167s -- ---- 167s -- If the set origin is here, then enable the subscription 167s -- ---- 167s if v_set_origin = public.getLocalNodeId('_main') then 167s select public.createEvent('_main', 'ENABLE_SUBSCRIPTION', 167s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 167s case p_sub_forward when true then 't' else 'f' end, 167s case p_omit_copy when true then 't' else 'f' end 167s ) into v_seq_id; 167s perform public.enableSubscription(p_sub_set, 167s p_sub_provider, p_sub_receiver); 167s end if; 167s 167s -- ---- 167s -- Rewrite sl_listen table 167s -- ---- 167s perform public.RebuildListenEntries(); 167s 167s return p_sub_set; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 167s 'subscribeSet_int (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 167s 167s Internal actions for subscribing receiver sub_receiver to subscription 167s set sub_set.'; 167s COMMENT 167s drop function IF EXISTS public.unsubscribeSet(int4,int4,boolean); 167s DROP FUNCTION 167s create or replace function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,p_force boolean) 167s returns bigint 167s as $$ 167s declare 167s v_tab_row record; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- Check that this is called on the receiver node 167s -- ---- 167s if p_sub_receiver != public.getLocalNodeId('_main') then 167s raise exception 'Slony-I: unsubscribeSet() must be called on receiver'; 167s end if; 167s 167s 167s 167s -- ---- 167s -- Check that this does not break any chains 167s -- ---- 167s if p_force=false and exists (select true from public.sl_subscribe 167s where sub_set = p_sub_set 167s and sub_provider = p_sub_receiver) 167s then 167s raise exception 'Slony-I: Cannot unsubscribe set % while being provider', 167s p_sub_set; 167s end if; 167s 167s if exists (select true from public.sl_subscribe 167s where sub_set = p_sub_set 167s and sub_provider = p_sub_receiver) 167s then 167s --delete the receivers of this provider. 167s --unsubscribeSet_int() will generate the event 167s --when it runs on the receiver. 167s delete from public.sl_subscribe 167s where sub_set=p_sub_set 167s and sub_provider=p_sub_receiver; 167s end if; 167s 167s -- ---- 167s -- Remove the replication triggers. 167s -- ---- 167s for v_tab_row in select tab_id from public.sl_table 167s where tab_set = p_sub_set 167s order by tab_id 167s loop 167s perform public.alterTableDropTriggers(v_tab_row.tab_id); 167s end loop; 167s 167s -- ---- 167s -- Remove the setsync status. This will also cause the 167s -- worker thread to ignore the set and stop replicating 167s -- right now. 167s -- ---- 167s delete from public.sl_setsync 167s where ssy_setid = p_sub_set; 167s 167s -- ---- 167s -- Remove all sl_table and sl_sequence entries for this set. 167s -- Should we ever subscribe again, the initial data 167s -- copy process will create new ones. 167s -- ---- 167s delete from public.sl_table 167s where tab_set = p_sub_set; 167s delete from public.sl_sequence 167s where seq_set = p_sub_set; 167s 167s -- ---- 167s -- Call the internal procedure to drop the subscription 167s -- ---- 167s perform public.unsubscribeSet_int(p_sub_set, p_sub_receiver); 167s 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s -- ---- 167s -- Create the UNSUBSCRIBE_SET event 167s -- ---- 167s return public.createEvent('_main', 'UNSUBSCRIBE_SET', 167s p_sub_set::text, p_sub_receiver::text); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,force boolean) is 167s 'unsubscribeSet (sub_set, sub_receiver,force) 167s 167s Unsubscribe node sub_receiver from subscription set sub_set. This is 167s invoked on the receiver node. It verifies that this does not break 167s any chains (e.g. - where sub_receiver is a provider for another node), 167s then restores tables, drops Slony-specific keys, drops table entries 167s for the set, drops the subscription, and generates an UNSUBSCRIBE_SET 167s node to publish that the node is being dropped.'; 167s COMMENT 167s create or replace function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) 167s returns int4 167s as $$ 167s declare 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- All the real work is done before event generation on the 167s -- subscriber. 167s -- ---- 167s 167s --if this event unsubscribes the provider of this node 167s --then this node should unsubscribe itself from the set as well. 167s 167s if exists (select true from 167s public.sl_subscribe where 167s sub_set=p_sub_set and sub_provider=p_sub_receiver 167s and sub_receiver=public.getLocalNodeId('_main')) 167s then 167s perform public.unsubscribeSet(p_sub_set,public.getLocalNodeId('_main'),true); 167s end if; 167s 167s 167s delete from public.sl_subscribe 167s where sub_set = p_sub_set 167s and sub_receiver = p_sub_receiver; 167s 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s return p_sub_set; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) is 167s 'unsubscribeSet_int (sub_set, sub_receiver) 167s 167s All the REAL work of removing the subscriber is done before the event 167s is generated, so this function just has to drop the references to the 167s subscription in sl_subscribe.'; 167s COMMENT 167s create or replace function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 167s returns int4 167s as $$ 167s begin 167s return public.enableSubscription_int (p_sub_set, 167s p_sub_provider, p_sub_receiver); 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 167s 'enableSubscription (sub_set, sub_provider, sub_receiver) 167s 167s Indicates that sub_receiver intends subscribing to set sub_set from 167s sub_provider. Work is all done by the internal function 167s enableSubscription_int (sub_set, sub_provider, sub_receiver).'; 167s COMMENT 167s create or replace function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 167s returns int4 167s as $$ 167s declare 167s v_n int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- ---- 167s -- The real work is done in the replication engine. All 167s -- we have to do here is remembering that it happened. 167s -- ---- 167s 167s -- ---- 167s -- Well, not only ... we might be missing an important event here 167s -- ---- 167s if not exists (select true from public.sl_path 167s where pa_server = p_sub_provider 167s and pa_client = p_sub_receiver) 167s then 167s insert into public.sl_path 167s (pa_server, pa_client, pa_conninfo, pa_connretry) 167s values 167s (p_sub_provider, p_sub_receiver, 167s '', 10); 167s end if; 167s 167s update public.sl_subscribe 167s set sub_active = 't' 167s where sub_set = p_sub_set 167s and sub_receiver = p_sub_receiver; 167s get diagnostics v_n = row_count; 167s if v_n = 0 then 167s insert into public.sl_subscribe 167s (sub_set, sub_provider, sub_receiver, 167s sub_forward, sub_active) 167s values 167s (p_sub_set, p_sub_provider, p_sub_receiver, 167s false, true); 167s end if; 167s 167s -- Rewrite sl_listen table 167s perform public.RebuildListenEntries(); 167s 167s return p_sub_set; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 167s 'enableSubscription_int (sub_set, sub_provider, sub_receiver) 167s 167s Internal function to enable subscription of node sub_receiver to set 167s sub_set via node sub_provider. 167s 167s slon does most of the work; all we need do here is to remember that it 167s happened. The function updates sl_subscribe, indicating that the 167s subscription has become active.'; 167s COMMENT 167s create or replace function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) 167s returns bigint 167s as $$ 167s declare 167s v_max_seqno bigint; 167s begin 167s select into v_max_seqno coalesce(max(con_seqno), 0) 167s from public.sl_confirm 167s where con_origin = p_con_origin 167s and con_received = p_con_received; 167s if v_max_seqno < p_con_seqno then 167s insert into public.sl_confirm 167s (con_origin, con_received, con_seqno, con_timestamp) 167s values (p_con_origin, p_con_received, p_con_seqno, 167s p_con_timestamp); 167s v_max_seqno = p_con_seqno; 167s end if; 167s 167s return v_max_seqno; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) is 167s 'forwardConfirm (p_con_origin, p_con_received, p_con_seqno, p_con_timestamp) 167s 167s Confirms (recorded in sl_confirm) that items from p_con_origin up to 167s p_con_seqno have been received by node p_con_received as of 167s p_con_timestamp, and raises an event to forward this confirmation.'; 167s COMMENT 167s create or replace function public.cleanupEvent (p_interval interval) 167s returns int4 167s as $$ 167s declare 167s v_max_row record; 167s v_min_row record; 167s v_max_sync int8; 167s v_origin int8; 167s v_seqno int8; 167s v_xmin bigint; 167s v_rc int8; 167s begin 167s -- ---- 167s -- First remove all confirmations where origin/receiver no longer exist 167s -- ---- 167s delete from public.sl_confirm 167s where con_origin not in (select no_id from public.sl_node); 167s delete from public.sl_confirm 167s where con_received not in (select no_id from public.sl_node); 167s -- ---- 167s -- Next remove all but the oldest confirm row per origin,receiver pair. 167s -- Ignore confirmations that are younger than 10 minutes. We currently 167s -- have an not confirmed suspicion that a possibly lost transaction due 167s -- to a server crash might have been visible to another session, and 167s -- that this led to log data that is needed again got removed. 167s -- ---- 167s for v_max_row in select con_origin, con_received, max(con_seqno) as con_seqno 167s from public.sl_confirm 167s where con_timestamp < (CURRENT_TIMESTAMP - p_interval) 167s group by con_origin, con_received 167s loop 167s delete from public.sl_confirm 167s where con_origin = v_max_row.con_origin 167s and con_received = v_max_row.con_received 167s and con_seqno < v_max_row.con_seqno; 167s end loop; 167s 167s -- ---- 167s -- Then remove all events that are confirmed by all nodes in the 167s -- whole cluster up to the last SYNC 167s -- ---- 167s for v_min_row in select con_origin, min(con_seqno) as con_seqno 167s from public.sl_confirm 167s group by con_origin 167s loop 167s select coalesce(max(ev_seqno), 0) into v_max_sync 167s from public.sl_event 167s where ev_origin = v_min_row.con_origin 167s and ev_seqno <= v_min_row.con_seqno 167s and ev_type = 'SYNC'; 167s if v_max_sync > 0 then 167s delete from public.sl_event 167s where ev_origin = v_min_row.con_origin 167s and ev_seqno < v_max_sync; 167s end if; 167s end loop; 167s 167s -- ---- 167s -- If cluster has only one node, then remove all events up to 167s -- the last SYNC - Bug #1538 167s -- http://gborg.postgresql.org/project/slony1/bugs/bugupdate.php?1538 167s -- ---- 167s 167s select * into v_min_row from public.sl_node where 167s no_id <> public.getLocalNodeId('_main') limit 1; 167s if not found then 167s select ev_origin, ev_seqno into v_min_row from public.sl_event 167s where ev_origin = public.getLocalNodeId('_main') 167s order by ev_origin desc, ev_seqno desc limit 1; 167s raise notice 'Slony-I: cleanupEvent(): Single node - deleting events < %', v_min_row.ev_seqno; 167s delete from public.sl_event 167s where 167s ev_origin = v_min_row.ev_origin and 167s ev_seqno < v_min_row.ev_seqno; 167s 167s end if; 167s 167s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_seqlog' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 167s execute 'alter table public.sl_seqlog set without oids;'; 167s end if; 167s -- ---- 167s -- Also remove stale entries from the nodelock table. 167s -- ---- 167s perform public.cleanupNodelock(); 167s 167s -- ---- 167s -- Find the eldest event left, for each origin 167s -- ---- 167s for v_origin, v_seqno, v_xmin in 167s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 167s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 167s loop 167s delete from public.sl_seqlog where seql_origin = v_origin and seql_ev_seqno < v_seqno; 167s delete from public.sl_log_script where log_origin = v_origin and log_txid < v_xmin; 167s end loop; 167s 167s v_rc := public.logswitch_finish(); 167s if v_rc = 0 then -- no switch in progress 167s perform public.logswitch_start(); 167s end if; 167s 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.cleanupEvent (p_interval interval) is 167s 'cleaning old data out of sl_confirm, sl_event. Removes all but the 167s last sl_confirm row per (origin,receiver), and then removes all events 167s that are confirmed by all nodes in the whole cluster up to the last 167s SYNC.'; 167s COMMENT 167s create or replace function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) returns name 167s as $$ 167s declare 167s v_tab_fqname_quoted text default ''; 167s v_idxrow record; 167s begin 167s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 167s -- 167s -- Ensure that the table exists 167s -- 167s if (select PGC.relname 167s from "pg_catalog".pg_class PGC, 167s "pg_catalog".pg_namespace PGN 167s where public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 167s and PGN.oid = PGC.relnamespace) is null then 167s raise exception 'Slony-I: determineIdxnameUnique(): table % not found', v_tab_fqname_quoted; 167s end if; 167s 167s -- 167s -- Lookup the tables primary key or the specified unique index 167s -- 167s if p_idx_name isnull then 167s select PGXC.relname 167s into v_idxrow 167s from "pg_catalog".pg_class PGC, 167s "pg_catalog".pg_namespace PGN, 167s "pg_catalog".pg_index PGX, 167s "pg_catalog".pg_class PGXC 167s where public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 167s and PGN.oid = PGC.relnamespace 167s and PGX.indrelid = PGC.oid 167s and PGX.indexrelid = PGXC.oid 167s and PGX.indisprimary; 167s if not found then 167s raise exception 'Slony-I: table % has no primary key', 167s v_tab_fqname_quoted; 167s end if; 167s else 167s select PGXC.relname 167s into v_idxrow 167s from "pg_catalog".pg_class PGC, 167s "pg_catalog".pg_namespace PGN, 167s "pg_catalog".pg_index PGX, 167s "pg_catalog".pg_class PGXC 167s where public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 167s and PGN.oid = PGC.relnamespace 167s and PGX.indrelid = PGC.oid 167s and PGX.indexrelid = PGXC.oid 167s and PGX.indisunique 167s and public.slon_quote_brute(PGXC.relname) = public.slon_quote_input(p_idx_name); 167s if not found then 167s raise exception 'Slony-I: table % has no unique index %', 167s v_tab_fqname_quoted, p_idx_name; 167s end if; 167s end if; 167s 167s -- 167s -- Return the found index name 167s -- 167s return v_idxrow.relname; 167s end; 167s $$ language plpgsql called on null input; 167s CREATE FUNCTION 167s comment on function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) is 167s 'FUNCTION determineIdxnameUnique (tab_fqname, indexname) 167s 167s Given a tablename, tab_fqname, check that the unique index, indexname, 167s exists or return the primary key index name for the table. If there 167s is no unique index, it raises an exception.'; 167s COMMENT 167s create or replace function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) returns text 167s as $$ 167s declare 167s v_tab_fqname_quoted text default ''; 167s v_idx_name_quoted text; 167s v_idxrow record; 167s v_attrow record; 167s v_i integer; 167s v_attno int2; 167s v_attkind text default ''; 167s v_attfound bool; 167s begin 167s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 167s v_idx_name_quoted := public.slon_quote_brute(p_idx_name); 167s -- 167s -- Ensure that the table exists 167s -- 167s if (select PGC.relname 167s from "pg_catalog".pg_class PGC, 167s "pg_catalog".pg_namespace PGN 167s where public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 167s and PGN.oid = PGC.relnamespace) is null then 167s raise exception 'Slony-I: table % not found', v_tab_fqname_quoted; 167s end if; 167s 167s -- 167s -- Lookup the tables primary key or the specified unique index 167s -- 167s if p_idx_name isnull then 167s raise exception 'Slony-I: index name must be specified'; 167s else 167s select PGXC.relname, PGX.indexrelid, PGX.indkey 167s into v_idxrow 167s from "pg_catalog".pg_class PGC, 167s "pg_catalog".pg_namespace PGN, 167s "pg_catalog".pg_index PGX, 167s "pg_catalog".pg_class PGXC 167s where public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 167s and PGN.oid = PGC.relnamespace 167s and PGX.indrelid = PGC.oid 167s and PGX.indexrelid = PGXC.oid 167s and PGX.indisunique 167s and public.slon_quote_brute(PGXC.relname) = v_idx_name_quoted; 167s if not found then 167s raise exception 'Slony-I: table % has no unique index %', 167s v_tab_fqname_quoted, v_idx_name_quoted; 167s end if; 167s end if; 167s 167s -- 167s -- Loop over the tables attributes and check if they are 167s -- index attributes. If so, add a "k" to the return value, 167s -- otherwise add a "v". 167s -- 167s for v_attrow in select PGA.attnum, PGA.attname 167s from "pg_catalog".pg_class PGC, 167s "pg_catalog".pg_namespace PGN, 167s "pg_catalog".pg_attribute PGA 167s where public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 167s and PGN.oid = PGC.relnamespace 167s and PGA.attrelid = PGC.oid 167s and not PGA.attisdropped 167s and PGA.attnum > 0 167s order by attnum 167s loop 167s v_attfound = 'f'; 167s 167s v_i := 0; 167s loop 167s select indkey[v_i] into v_attno from "pg_catalog".pg_index 167s where indexrelid = v_idxrow.indexrelid; 167s if v_attno isnull or v_attno = 0 then 167s exit; 167s end if; 167s if v_attrow.attnum = v_attno then 167s v_attfound = 't'; 167s exit; 167s end if; 167s v_i := v_i + 1; 167s end loop; 167s 167s if v_attfound then 167s v_attkind := v_attkind || 'k'; 167s else 167s v_attkind := v_attkind || 'v'; 167s end if; 167s end loop; 167s 167s -- Strip off trailing v characters as they are not needed by the logtrigger 167s v_attkind := pg_catalog.rtrim(v_attkind, 'v'); 167s 167s -- 167s -- Return the resulting attkind 167s -- 167s return v_attkind; 167s end; 167s $$ language plpgsql called on null input; 167s CREATE FUNCTION 167s comment on function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) is 167s 'determineAttKindUnique (tab_fqname, indexname) 167s 167s Given a tablename, return the Slony-I specific attkind (used for the 167s log trigger) of the table. Use the specified unique index or the 167s primary key (if indexname is NULL).'; 167s COMMENT 167s NOTICE: function public.updaterelname(int4,int4) does not exist, skipping 167s NOTICE: function public.updatereloid(int4,int4) does not exist, skipping 167s create or replace function public.RebuildListenEntries() 167s returns int 167s as $$ 167s declare 167s v_row record; 167s v_cnt integer; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s -- First remove the entire configuration 167s delete from public.sl_listen; 167s 167s -- Second populate the sl_listen configuration with a full 167s -- network of all possible paths. 167s insert into public.sl_listen 167s (li_origin, li_provider, li_receiver) 167s select pa_server, pa_server, pa_client from public.sl_path; 167s while true loop 167s insert into public.sl_listen 167s (li_origin, li_provider, li_receiver) 167s select distinct li_origin, pa_server, pa_client 167s from public.sl_listen, public.sl_path 167s where li_receiver = pa_server 167s and li_origin <> pa_client 167s and pa_conninfo<>'' 167s except 167s select li_origin, li_provider, li_receiver 167s from public.sl_listen; 167s 167s if not found then 167s exit; 167s end if; 167s end loop; 167s 167s -- We now replace specific event-origin,receiver combinations 167s -- with a configuration that tries to avoid events arriving at 167s -- a node before the data provider actually has the data ready. 167s 167s -- Loop over every possible pair of receiver and event origin 167s for v_row in select N1.no_id as receiver, N2.no_id as origin, 167s N2.no_failed as failed 167s from public.sl_node as N1, public.sl_node as N2 167s where N1.no_id <> N2.no_id 167s loop 167s -- 1st choice: 167s -- If we use the event origin as a data provider for any 167s -- set that originates on that very node, we are a direct 167s -- subscriber to that origin and listen there only. 167s if exists (select true from public.sl_set, public.sl_subscribe , public.sl_node p 167s where set_origin = v_row.origin 167s and sub_set = set_id 167s and sub_provider = v_row.origin 167s and sub_receiver = v_row.receiver 167s and sub_active 167s and p.no_active 167s and p.no_id=sub_provider 167s ) 167s then 167s delete from public.sl_listen 167s where li_origin = v_row.origin 167s and li_receiver = v_row.receiver; 167s insert into public.sl_listen (li_origin, li_provider, li_receiver) 167s values (v_row.origin, v_row.origin, v_row.receiver); 167s 167s -- 2nd choice: 167s -- If we are subscribed to any set originating on this 167s -- event origin, we want to listen on all data providers 167s -- we use for this origin. We are a cascaded subscriber 167s -- for sets from this node. 167s else 167s if exists (select true from public.sl_set, public.sl_subscribe, 167s public.sl_node provider 167s where set_origin = v_row.origin 167s and sub_set = set_id 167s and sub_provider=provider.no_id 167s and provider.no_failed = false 167s and sub_receiver = v_row.receiver 167s and sub_active) 167s then 167s delete from public.sl_listen 167s where li_origin = v_row.origin 167s and li_receiver = v_row.receiver; 167s insert into public.sl_listen (li_origin, li_provider, li_receiver) 167s select distinct set_origin, sub_provider, v_row.receiver 167s from public.sl_set, public.sl_subscribe 167s where set_origin = v_row.origin 167s and sub_set = set_id 167s and sub_receiver = v_row.receiver 167s and sub_active; 167s end if; 167s end if; 167s 167s if v_row.failed then 167s 167s --for every failed node we delete all sl_listen entries 167s --except via providers (listed in sl_subscribe) 167s --or failover candidates (sl_failover_targets) 167s --we do this to prevent a non-failover candidate 167s --that is more ahead of the failover candidate from 167s --sending events to the failover candidate that 167s --are 'too far ahead' 167s 167s --if the failed node is not an origin for any 167s --node then we don't delete all listen paths 167s --for events from it. Instead we leave 167s --the listen network alone. 167s 167s select count(*) into v_cnt from public.sl_subscribe sub, 167s public.sl_set s 167s where s.set_origin=v_row.origin and s.set_id=sub.sub_set; 167s if v_cnt > 0 then 167s delete from public.sl_listen where 167s li_origin=v_row.origin and 167s li_receiver=v_row.receiver 167s and li_provider not in 167s (select sub_provider from 167s public.sl_subscribe, 167s public.sl_set where 167s sub_set=set_id 167s and set_origin=v_row.origin); 167s end if; 167s end if; 167s -- insert into public.sl_listen 167s -- (li_origin,li_provider,li_receiver) 167s -- SELECT v_row.origin, pa_server 167s -- ,v_row.receiver 167s -- FROM public.sl_path where 167s -- pa_client=v_row.receiver 167s -- and (v_row.origin,pa_server,v_row.receiver) not in 167s -- (select li_origin,li_provider,li_receiver 167s -- from public.sl_listen); 167s -- end if; 167s end loop ; 167s 167s return null ; 167s end ; 167s $$ language 'plpgsql'; 167s CREATE FUNCTION 167s comment on function public.RebuildListenEntries() is 167s 'RebuildListenEntries() 167s 167s Invoked by various subscription and path modifying functions, this 167s rewrites the sl_listen entries, adding in all the ones required to 167s allow communications between nodes in the Slony-I cluster.'; 167s COMMENT 167s create or replace function public.generate_sync_event(p_interval interval) 167s returns int4 167s as $$ 167s declare 167s v_node_row record; 167s 167s BEGIN 167s select 1 into v_node_row from public.sl_event 167s where ev_type = 'SYNC' and ev_origin = public.getLocalNodeId('_main') 167s and ev_timestamp > now() - p_interval limit 1; 167s if not found then 167s -- If there has been no SYNC in the last interval, then push one 167s perform public.createEvent('_main', 'SYNC', NULL); 167s return 1; 167s else 167s return 0; 167s end if; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.generate_sync_event(p_interval interval) is 167s 'Generate a sync event if there has not been one in the requested interval, and this is a provider node.'; 167s COMMENT 167s drop function if exists public.updateRelname(int4, int4); 167s DROP FUNCTION 167s create or replace function public.updateRelname () 167s returns int4 167s as $$ 167s declare 167s v_no_id int4; 167s v_set_origin int4; 167s begin 167s -- ---- 167s -- Grab the central configuration lock 167s -- ---- 167s lock table public.sl_config_lock; 167s 167s update public.sl_table set 167s tab_relname = PGC.relname, tab_nspname = PGN.nspname 167s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 167s where public.sl_table.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid and 167s (tab_relname <> PGC.relname or tab_nspname <> PGN.nspname); 167s update public.sl_sequence set 167s seq_relname = PGC.relname, seq_nspname = PGN.nspname 167s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 167s where public.sl_sequence.seq_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid and 167s (seq_relname <> PGC.relname or seq_nspname <> PGN.nspname); 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.updateRelname() is 167s 'updateRelname()'; 167s COMMENT 167s drop function if exists public.updateReloid (int4, int4); 167s DROP FUNCTION 167s create or replace function public.updateReloid (p_set_id int4, p_only_on_node int4) 167s returns bigint 167s as $$ 167s declare 167s v_no_id int4; 167s v_set_origin int4; 167s prec record; 167s begin 167s -- ---- 167s -- Check that we either are the set origin or a current 167s -- subscriber of the set. 167s -- ---- 167s v_no_id := public.getLocalNodeId('_main'); 167s select set_origin into v_set_origin 167s from public.sl_set 167s where set_id = p_set_id 167s for update; 167s if not found then 167s raise exception 'Slony-I: set % not found', p_set_id; 167s end if; 167s if v_set_origin <> v_no_id 167s and not exists (select 1 from public.sl_subscribe 167s where sub_set = p_set_id 167s and sub_receiver = v_no_id) 167s then 167s return 0; 167s end if; 167s 167s -- ---- 167s -- If execution on only one node is requested, check that 167s -- we are that node. 167s -- ---- 167s if p_only_on_node > 0 and p_only_on_node <> v_no_id then 167s return 0; 167s end if; 167s 167s -- Update OIDs for tables to values pulled from non-table objects in pg_class 167s -- This ensures that we won't have collisions when repairing the oids 167s for prec in select tab_id from public.sl_table loop 167s update public.sl_table set tab_reloid = (select oid from pg_class pc where relkind <> 'r' and not exists (select 1 from public.sl_table t2 where t2.tab_reloid = pc.oid) limit 1) 167s where tab_id = prec.tab_id; 167s end loop; 167s 167s for prec in select tab_id, tab_relname, tab_nspname from public.sl_table loop 167s update public.sl_table set 167s tab_reloid = (select PGC.oid 167s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 167s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.tab_relname) 167s and PGC.relnamespace = PGN.oid 167s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.tab_nspname)) 167s where tab_id = prec.tab_id; 167s end loop; 167s 167s for prec in select seq_id from public.sl_sequence loop 167s update public.sl_sequence set seq_reloid = (select oid from pg_class pc where relkind <> 'S' and not exists (select 1 from public.sl_sequence t2 where t2.seq_reloid = pc.oid) limit 1) 167s where seq_id = prec.seq_id; 167s end loop; 167s 167s for prec in select seq_id, seq_relname, seq_nspname from public.sl_sequence loop 167s update public.sl_sequence set 167s seq_reloid = (select PGC.oid 167s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 167s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.seq_relname) 167s and PGC.relnamespace = PGN.oid 167s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.seq_nspname)) 167s where seq_id = prec.seq_id; 167s end loop; 167s 167s return 1; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.updateReloid(p_set_id int4, p_only_on_node int4) is 167s 'updateReloid(set_id, only_on_node) 167s 167s Updates the respective reloids in sl_table and sl_seqeunce based on 167s their respective FQN'; 167s COMMENT 167s create or replace function public.logswitch_start() 167s returns int4 as $$ 167s DECLARE 167s v_current_status int4; 167s BEGIN 167s -- ---- 167s -- Get the current log status. 167s -- ---- 167s select last_value into v_current_status from public.sl_log_status; 167s 167s -- ---- 167s -- status = 0: sl_log_1 active, sl_log_2 clean 167s -- Initiate a switch to sl_log_2. 167s -- ---- 167s if v_current_status = 0 then 167s perform "pg_catalog".setval('public.sl_log_status', 3); 167s perform public.registry_set_timestamp( 167s 'logswitch.laststart', now()); 167s raise notice 'Slony-I: Logswitch to sl_log_2 initiated'; 167s return 2; 167s end if; 167s 167s -- ---- 167s -- status = 1: sl_log_2 active, sl_log_1 clean 167s -- Initiate a switch to sl_log_1. 167s -- ---- 167s if v_current_status = 1 then 167s perform "pg_catalog".setval('public.sl_log_status', 2); 167s perform public.registry_set_timestamp( 167s 'logswitch.laststart', now()); 167s raise notice 'Slony-I: Logswitch to sl_log_1 initiated'; 167s return 1; 167s end if; 167s 167s raise exception 'Previous logswitch still in progress'; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.logswitch_start() is 167s 'logswitch_start() 167s 167s Initiate a log table switch if none is in progress'; 167s COMMENT 167s create or replace function public.logswitch_finish() 167s returns int4 as $$ 167s DECLARE 167s v_current_status int4; 167s v_dummy record; 167s v_origin int8; 167s v_seqno int8; 167s v_xmin bigint; 167s v_purgeable boolean; 167s BEGIN 167s -- ---- 167s -- Get the current log status. 167s -- ---- 167s select last_value into v_current_status from public.sl_log_status; 167s 167s -- ---- 167s -- status value 0 or 1 means that there is no log switch in progress 167s -- ---- 167s if v_current_status = 0 or v_current_status = 1 then 167s return 0; 167s end if; 167s 167s -- ---- 167s -- status = 2: sl_log_1 active, cleanup sl_log_2 167s -- ---- 167s if v_current_status = 2 then 167s v_purgeable := 'true'; 167s 167s -- ---- 167s -- Attempt to lock sl_log_2 in order to make sure there are no other transactions 167s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 167s -- blocking writers to sl_log_2 while it is waiting for a lock. It also prevents it 167s -- immediately truncating log data generated inside the transaction which was active 167s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 167s -- transaction is committed. 167s -- ---- 167s begin 167s lock table public.sl_log_2 in access exclusive mode nowait; 167s exception when lock_not_available then 167s raise notice 'Slony-I: could not lock sl_log_2 - sl_log_2 not truncated'; 167s return -1; 167s end; 167s 167s -- ---- 167s -- The cleanup thread calls us after it did the delete and 167s -- vacuum of both log tables. If sl_log_2 is empty now, we 167s -- can truncate it and the log switch is done. 167s -- ---- 167s for v_origin, v_seqno, v_xmin in 167s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 167s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 167s loop 167s if exists (select 1 from public.sl_log_2 where log_origin = v_origin and log_txid >= v_xmin limit 1) then 167s v_purgeable := 'false'; 167s end if; 167s end loop; 167s if not v_purgeable then 167s -- ---- 167s -- Found a row ... log switch is still in progress. 167s -- ---- 167s raise notice 'Slony-I: log switch to sl_log_1 still in progress - sl_log_2 not truncated'; 167s return -1; 167s end if; 167s 167s raise notice 'Slony-I: log switch to sl_log_1 complete - truncate sl_log_2'; 167s truncate public.sl_log_2; 167s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_2' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 167s execute 'alter table public.sl_log_2 set without oids;'; 167s end if; 167s perform "pg_catalog".setval('public.sl_log_status', 0); 167s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 167s perform public.addPartialLogIndices(); 167s 167s return 1; 167s end if; 167s 167s -- ---- 167s -- status = 3: sl_log_2 active, cleanup sl_log_1 167s -- ---- 167s if v_current_status = 3 then 167s v_purgeable := 'true'; 167s 167s -- ---- 167s -- Attempt to lock sl_log_1 in order to make sure there are no other transactions 167s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 167s -- blocking writes to sl_log_1 while it is waiting for a lock. It also prevents it 167s -- immediately truncating log data generated inside the transaction which was active 167s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 167s -- transaction is committed. 167s -- ---- 167s begin 167s lock table public.sl_log_1 in access exclusive mode nowait; 167s exception when lock_not_available then 167s raise notice 'Slony-I: could not lock sl_log_1 - sl_log_1 not truncated'; 167s return -1; 167s end; 167s 167s -- ---- 167s -- The cleanup thread calls us after it did the delete and 167s -- vacuum of both log tables. If sl_log_2 is empty now, we 167s -- can truncate it and the log switch is done. 167s -- ---- 167s for v_origin, v_seqno, v_xmin in 167s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 167s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 167s loop 167s if (exists (select 1 from public.sl_log_1 where log_origin = v_origin and log_txid >= v_xmin limit 1)) then 167s v_purgeable := 'false'; 167s end if; 167s end loop; 167s if not v_purgeable then 167s -- ---- 167s -- Found a row ... log switch is still in progress. 167s -- ---- 167s raise notice 'Slony-I: log switch to sl_log_2 still in progress - sl_log_1 not truncated'; 167s return -1; 167s end if; 167s 167s raise notice 'Slony-I: log switch to sl_log_2 complete - truncate sl_log_1'; 167s truncate public.sl_log_1; 167s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_1' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 167s execute 'alter table public.sl_log_1 set without oids;'; 167s end if; 167s perform "pg_catalog".setval('public.sl_log_status', 1); 167s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 167s perform public.addPartialLogIndices(); 167s return 2; 167s end if; 167s END; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.logswitch_finish() is 167s 'logswitch_finish() 167s 167s Attempt to finalize a log table switch in progress 167s return values: 167s -1 if switch in progress, but not complete 167s 0 if no switch in progress 167s 1 if performed truncate on sl_log_2 167s 2 if performed truncate on sl_log_1 167s '; 167s COMMENT 167s create or replace function public.addPartialLogIndices () returns integer as $$ 167s DECLARE 167s v_current_status int4; 167s v_log int4; 167s v_dummy record; 167s v_dummy2 record; 167s idef text; 167s v_count int4; 167s v_iname text; 167s v_ilen int4; 167s v_maxlen int4; 167s BEGIN 167s v_count := 0; 167s select last_value into v_current_status from public.sl_log_status; 167s 167s -- If status is 2 or 3 --> in process of cleanup --> unsafe to create indices 167s if v_current_status in (2, 3) then 167s return 0; 167s end if; 167s 167s if v_current_status = 0 then -- Which log should get indices? 167s v_log := 2; 167s else 167s v_log := 1; 167s end if; 167s -- PartInd_test_db_sl_log_2-node-1 167s -- Add missing indices... 167s for v_dummy in select distinct set_origin from public.sl_set loop 167s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' 167s || v_dummy.set_origin::text; 167s -- raise notice 'Consider adding partial index % on sl_log_%', v_iname, v_log; 167s -- raise notice 'schema: [_main] tablename:[sl_log_%]', v_log; 167s select * into v_dummy2 from pg_catalog.pg_indexes where tablename = 'sl_log_' || v_log::text and indexname = v_iname; 167s if not found then 167s -- raise notice 'index was not found - add it!'; 167s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' || v_dummy.set_origin::text; 167s v_ilen := pg_catalog.length(v_iname); 167s v_maxlen := pg_catalog.current_setting('max_identifier_length'::text)::int4; 167s if v_ilen > v_maxlen then 167s raise exception 'Length of proposed index name [%] > max_identifier_length [%] - cluster name probably too long', v_ilen, v_maxlen; 167s end if; 167s 167s idef := 'create index "' || v_iname || 167s '" on public.sl_log_' || v_log::text || ' USING btree(log_txid) where (log_origin = ' || v_dummy.set_origin::text || ');'; 167s execute idef; 167s v_count := v_count + 1; 167s else 167s -- raise notice 'Index % already present - skipping', v_iname; 167s end if; 167s end loop; 167s 167s -- Remove unneeded indices... 167s for v_dummy in select indexname from pg_catalog.pg_indexes i where i.tablename = 'sl_log_' || v_log::text and 167s i.indexname like ('PartInd_main_sl_log_' || v_log::text || '-node-%') and 167s not exists (select 1 from public.sl_set where 167s i.indexname = 'PartInd_main_sl_log_' || v_log::text || '-node-' || set_origin::text) 167s loop 167s -- raise notice 'Dropping obsolete index %d', v_dummy.indexname; 167s idef := 'drop index public."' || v_dummy.indexname || '";'; 167s execute idef; 167s v_count := v_count - 1; 167s end loop; 167s return v_count; 167s END 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.addPartialLogIndices () is 167s 'Add partial indexes, if possible, to the unused sl_log_? table for 167s all origin nodes, and drop any that are no longer needed. 167s 167s This function presently gets run any time set origins are manipulated 167s (FAILOVER, STORE SET, MOVE SET, DROP SET), as well as each time the 167s system switches between sl_log_1 and sl_log_2.'; 167s COMMENT 167s create or replace function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 167s returns bool as $$ 167s BEGIN 167s return exists ( 167s select 1 from "information_schema".columns 167s where table_schema = p_namespace 167s and table_name = p_table 167s and column_name = p_field 167s ); 167s END;$$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 167s is 'Check if a table has a specific attribute'; 167s COMMENT 167s create or replace function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 167s returns bool as $$ 167s DECLARE 167s v_row record; 167s v_query text; 167s BEGIN 167s if not public.check_table_field_exists(p_namespace, p_table, p_field) then 167s raise notice 'Upgrade table %.% - add field %', p_namespace, p_table, p_field; 167s v_query := 'alter table ' || p_namespace || '.' || p_table || ' add column '; 167s v_query := v_query || p_field || ' ' || p_type || ';'; 167s execute v_query; 167s return 't'; 167s else 167s return 'f'; 167s end if; 167s END;$$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 167s is 'Add a column of a given type to a table if it is missing'; 167s COMMENT 167s create or replace function public.upgradeSchema(p_old text) 167s returns text as $$ 167s declare 167s v_tab_row record; 167s v_query text; 167s v_keepstatus text; 167s begin 167s -- If old version is pre-2.0, then we require a special upgrade process 167s if p_old like '1.%' then 167s raise exception 'Upgrading to Slony-I 2.x requires running slony_upgrade_20'; 167s end if; 167s 167s perform public.upgradeSchemaAddTruncateTriggers(); 167s 167s -- Change all Slony-I-defined columns that are "timestamp without time zone" to "timestamp *WITH* time zone" 167s if exists (select 1 from information_schema.columns c 167s where table_schema = '_main' and data_type = 'timestamp without time zone' 167s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 167s and (c.table_name, c.column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp'))) 167s then 167s 167s -- Preserve sl_status 167s select pg_get_viewdef('public.sl_status') into v_keepstatus; 167s execute 'drop view sl_status'; 167s for v_tab_row in select table_schema, table_name, column_name from information_schema.columns c 167s where table_schema = '_main' and data_type = 'timestamp without time zone' 167s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 167s and (table_name, column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp')) 167s loop 167s raise notice 'Changing Slony-I column [%.%] to timestamp WITH time zone', v_tab_row.table_name, v_tab_row.column_name; 167s v_query := 'alter table ' || public.slon_quote_brute(v_tab_row.table_schema) || 167s '.' || v_tab_row.table_name || ' alter column ' || v_tab_row.column_name || 167s ' type timestamp with time zone;'; 167s execute v_query; 167s end loop; 167s -- restore sl_status 167s execute 'create view sl_status as ' || v_keepstatus; 167s end if; 167s 167s if not exists (select 1 from information_schema.tables where table_schema = '_main' and table_name = 'sl_components') then 167s v_query := ' 167s create table public.sl_components ( 167s co_actor text not null primary key, 167s co_pid integer not null, 167s co_node integer not null, 167s co_connection_pid integer not null, 167s co_activity text, 167s co_starttime timestamptz not null, 167s co_event bigint, 167s co_eventtype text 167s ) without oids; 167s '; 167s execute v_query; 167s end if; 167s 167s 167s 167s 167s 167s if not exists (select 1 from information_schema.tables t where table_schema = '_main' and table_name = 'sl_event_lock') then 167s v_query := 'create table public.sl_event_lock (dummy integer);'; 167s execute v_query; 167s end if; 167s 167s if not exists (select 1 from information_schema.tables t 167s where table_schema = '_main' 167s and table_name = 'sl_apply_stats') then 167s v_query := ' 167s create table public.sl_apply_stats ( 167s as_origin int4, 167s as_num_insert int8, 167s as_num_update int8, 167s as_num_delete int8, 167s as_num_truncate int8, 167s as_num_script int8, 167s as_num_total int8, 167s as_duration interval, 167s as_apply_first timestamptz, 167s as_apply_last timestamptz, 167s as_cache_prepare int8, 167s as_cache_hit int8, 167s as_cache_evict int8, 167s as_cache_prepare_max int8 167s ) WITHOUT OIDS;'; 167s execute v_query; 167s end if; 167s 167s -- 167s -- On the upgrade to 2.2, we change the layout of sl_log_N by 167s -- adding columns log_tablenspname, log_tablerelname, and 167s -- log_cmdupdncols as well as changing log_cmddata into 167s -- log_cmdargs, which is a text array. 167s -- 167s if not public.check_table_field_exists('_main', 'sl_log_1', 'log_cmdargs') then 167s -- 167s -- Check that the cluster is completely caught up 167s -- 167s if public.check_unconfirmed_log() then 167s raise EXCEPTION 'cannot upgrade to new sl_log_N format due to existing unreplicated data'; 167s end if; 167s 167s -- 167s -- Drop tables sl_log_1 and sl_log_2 167s -- 167s drop table public.sl_log_1; 167s drop table public.sl_log_2; 167s 167s -- 167s -- Create the new sl_log_1 167s -- 167s create table public.sl_log_1 ( 167s log_origin int4, 167s log_txid bigint, 167s log_tableid int4, 167s log_actionseq int8, 167s log_tablenspname text, 167s log_tablerelname text, 167s log_cmdtype "char", 167s log_cmdupdncols int4, 167s log_cmdargs text[] 167s ) without oids; 167s create index sl_log_1_idx1 on public.sl_log_1 167s (log_origin, log_txid, log_actionseq); 167s 167s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 167s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 167s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 167s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 167s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 167s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 167s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 167s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 167s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 167s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 167s 167s -- 167s -- Create the new sl_log_2 167s -- 167s create table public.sl_log_2 ( 167s log_origin int4, 167s log_txid bigint, 167s log_tableid int4, 167s log_actionseq int8, 167s log_tablenspname text, 167s log_tablerelname text, 167s log_cmdtype "char", 167s log_cmdupdncols int4, 167s log_cmdargs text[] 167s ) without oids; 167s create index sl_log_2_idx1 on public.sl_log_2 167s (log_origin, log_txid, log_actionseq); 167s 167s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 167s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 167s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 167s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 167s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 167s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 167s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 167s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 167s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 167s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 167s 167s create table public.sl_log_script ( 167s log_origin int4, 167s log_txid bigint, 167s log_actionseq int8, 167s log_cmdtype "char", 167s log_cmdargs text[] 167s ) WITHOUT OIDS; 167s create index sl_log_script_idx1 on public.sl_log_script 167s (log_origin, log_txid, log_actionseq); 167s 167s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 167s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 167s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 167s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 167s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete'; 167s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 167s 167s -- 167s -- Put the log apply triggers back onto sl_log_1/2 167s -- 167s create trigger apply_trigger 167s before INSERT on public.sl_log_1 167s for each row execute procedure public.logApply('_main'); 167s alter table public.sl_log_1 167s enable replica trigger apply_trigger; 167s create trigger apply_trigger 167s before INSERT on public.sl_log_2 167s for each row execute procedure public.logApply('_main'); 167s alter table public.sl_log_2 167s enable replica trigger apply_trigger; 167s end if; 167s if not exists (select 1 from information_schema.routines where routine_schema = '_main' and routine_name = 'string_agg') then 167s CREATE AGGREGATE public.string_agg(text) ( 167s SFUNC=public.agg_text_sum, 167s STYPE=text, 167s INITCOND='' 167s ); 167s end if; 167s if not exists (select 1 from information_schema.views where table_schema='_main' and table_name='sl_failover_targets') then 167s create view public.sl_failover_targets as 167s select set_id, 167s set_origin as set_origin, 167s sub1.sub_receiver as backup_id 167s 167s FROM 167s public.sl_subscribe sub1 167s ,public.sl_set set1 167s where 167s sub1.sub_set=set_id 167s and sub1.sub_forward=true 167s --exclude candidates where the set_origin 167s --has a path a node but the failover 167s --candidate has no path to that node 167s and sub1.sub_receiver not in 167s (select p1.pa_client from 167s public.sl_path p1 167s left outer join public.sl_path p2 on 167s (p2.pa_client=p1.pa_client 167s and p2.pa_server=sub1.sub_receiver) 167s where p2.pa_client is null 167s and p1.pa_server=set_origin 167s and p1.pa_client<>sub1.sub_receiver 167s ) 167s and sub1.sub_provider=set_origin 167s --exclude any subscribers that are not 167s --direct subscribers of all sets on the 167s --origin 167s and sub1.sub_receiver not in 167s (select direct_recv.sub_receiver 167s from 167s 167s (--all direct receivers of the first set 167s select subs2.sub_receiver 167s from public.sl_subscribe subs2 167s where subs2.sub_provider=set1.set_origin 167s and subs2.sub_set=set1.set_id) as 167s direct_recv 167s inner join 167s (--all other sets from the origin 167s select set_id from public.sl_set set2 167s where set2.set_origin=set1.set_origin 167s and set2.set_id<>sub1.sub_set) 167s as othersets on(true) 167s left outer join public.sl_subscribe subs3 167s on(subs3.sub_set=othersets.set_id 167s and subs3.sub_forward=true 167s and subs3.sub_provider=set1.set_origin 167s and direct_recv.sub_receiver=subs3.sub_receiver) 167s where subs3.sub_receiver is null 167s ); 167s end if; 167s 167s if not public.check_table_field_exists('_main', 'sl_node', 'no_failed') then 167s alter table public.sl_node add column no_failed bool; 167s update public.sl_node set no_failed=false; 167s end if; 167s return p_old; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s create or replace function public.check_unconfirmed_log () 167s returns bool as $$ 167s declare 167s v_rc bool = false; 167s v_error bool = false; 167s v_origin integer; 167s v_allconf bigint; 167s v_allsnap txid_snapshot; 167s v_count bigint; 167s begin 167s -- 167s -- Loop over all nodes that are the origin of at least one set 167s -- 167s for v_origin in select distinct set_origin as no_id 167s from public.sl_set loop 167s -- 167s -- Per origin determine which is the highest event seqno 167s -- that is confirmed by all subscribers to any of the 167s -- origins sets. 167s -- 167s select into v_allconf min(max_seqno) from ( 167s select con_received, max(con_seqno) as max_seqno 167s from public.sl_confirm 167s where con_origin = v_origin 167s and con_received in ( 167s select distinct sub_receiver 167s from public.sl_set as SET, 167s public.sl_subscribe as SUB 167s where SET.set_id = SUB.sub_set 167s and SET.set_origin = v_origin 167s ) 167s group by con_received 167s ) as maxconfirmed; 167s if not found then 167s raise NOTICE 'check_unconfirmed_log(): cannot determine highest ev_seqno for node % confirmed by all subscribers', v_origin; 167s v_error = true; 167s continue; 167s end if; 167s 167s -- 167s -- Get the txid snapshot that corresponds with that event 167s -- 167s select into v_allsnap ev_snapshot 167s from public.sl_event 167s where ev_origin = v_origin 167s and ev_seqno = v_allconf; 167s if not found then 167s raise NOTICE 'check_unconfirmed_log(): cannot find event %,% in sl_event', v_origin, v_allconf; 167s v_error = true; 167s continue; 167s end if; 167s 167s -- 167s -- Count the number of log rows that appeard after that event. 167s -- 167s select into v_count count(*) from ( 167s select 1 from public.sl_log_1 167s where log_origin = v_origin 167s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 167s union all 167s select 1 from public.sl_log_1 167s where log_origin = v_origin 167s and log_txid in ( 167s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 167s ) 167s union all 167s select 1 from public.sl_log_2 167s where log_origin = v_origin 167s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 167s union all 167s select 1 from public.sl_log_2 167s where log_origin = v_origin 167s and log_txid in ( 167s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 167s ) 167s ) as cnt; 167s 167s if v_count > 0 then 167s raise NOTICE 'check_unconfirmed_log(): origin % has % log rows that have not propagated to all subscribers yet', v_origin, v_count; 167s v_rc = true; 167s end if; 167s end loop; 167s 167s if v_error then 167s raise EXCEPTION 'check_unconfirmed_log(): aborting due to previous inconsistency'; 167s end if; 167s 167s return v_rc; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s set search_path to public 167s ; 167s SET 167s comment on function public.upgradeSchema(p_old text) is 167s 'Called during "update functions" by slonik to perform schema changes'; 167s COMMENT 167s create or replace view public.sl_status as select 167s E.ev_origin as st_origin, 167s C.con_received as st_received, 167s E.ev_seqno as st_last_event, 167s E.ev_timestamp as st_last_event_ts, 167s C.con_seqno as st_last_received, 167s C.con_timestamp as st_last_received_ts, 167s CE.ev_timestamp as st_last_received_event_ts, 167s E.ev_seqno - C.con_seqno as st_lag_num_events, 167s current_timestamp - CE.ev_timestamp as st_lag_time 167s from public.sl_event E, public.sl_confirm C, 167s public.sl_event CE 167s where E.ev_origin = C.con_origin 167s and CE.ev_origin = E.ev_origin 167s and CE.ev_seqno = C.con_seqno 167s and (E.ev_origin, E.ev_seqno) in 167s (select ev_origin, max(ev_seqno) 167s from public.sl_event 167s where ev_origin = public.getLocalNodeId('_main') 167s group by 1 167s ) 167s and (C.con_origin, C.con_received, C.con_seqno) in 167s (select con_origin, con_received, max(con_seqno) 167s from public.sl_confirm 167s where con_origin = public.getLocalNodeId('_main') 167s group by 1, 2 167s ); 167s CREATE VIEW 167s comment on view public.sl_status is 'View showing how far behind remote nodes are.'; 167s COMMENT 167s create or replace function public.copyFields(p_tab_id integer) 167s returns text 167s as $$ 167s declare 167s result text; 167s prefix text; 167s prec record; 167s begin 167s result := ''; 167s prefix := '('; -- Initially, prefix is the opening paren 167s 167s for prec in select public.slon_quote_input(a.attname) as column from public.sl_table t, pg_catalog.pg_attribute a where t.tab_id = p_tab_id and t.tab_reloid = a.attrelid and a.attnum > 0 and a.attisdropped = false order by attnum 167s loop 167s result := result || prefix || prec.column; 167s prefix := ','; -- Subsequently, prepend columns with commas 167s end loop; 167s result := result || ')'; 167s return result; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.copyFields(p_tab_id integer) is 167s 'Return a string consisting of what should be appended to a COPY statement 167s to specify fields for the passed-in tab_id. 167s 167s In PG versions > 7.3, this looks like (field1,field2,...fieldn)'; 167s COMMENT 167s create or replace function public.prepareTableForCopy(p_tab_id int4) 167s returns int4 167s as $$ 167s declare 167s v_tab_oid oid; 167s v_tab_fqname text; 167s begin 167s -- ---- 167s -- Get the OID and fully qualified name for the table 167s -- --- 167s select PGC.oid, 167s public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) as tab_fqname 167s into v_tab_oid, v_tab_fqname 167s from public.sl_table T, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where T.tab_id = p_tab_id 167s and T.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid; 167s if not found then 167s raise exception 'Table with ID % not found in sl_table', p_tab_id; 167s end if; 167s 167s -- ---- 167s -- Try using truncate to empty the table and fallback to 167s -- delete on error. 167s -- ---- 167s perform public.TruncateOnlyTable(v_tab_fqname); 167s raise notice 'truncate of % succeeded', v_tab_fqname; 167s 167s -- suppress index activity 167s perform public.disable_indexes_on_table(v_tab_oid); 167s 167s return 1; 167s exception when others then 167s raise notice 'truncate of % failed - doing delete', v_tab_fqname; 167s perform public.disable_indexes_on_table(v_tab_oid); 167s execute 'delete from only ' || public.slon_quote_input(v_tab_fqname); 167s return 0; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.prepareTableForCopy(p_tab_id int4) is 167s 'Delete all data and suppress index maintenance'; 167s COMMENT 167s create or replace function public.finishTableAfterCopy(p_tab_id int4) 167s returns int4 167s as $$ 167s declare 167s v_tab_oid oid; 167s v_tab_fqname text; 167s begin 167s -- ---- 167s -- Get the tables OID and fully qualified name 167s -- --- 167s select PGC.oid, 167s public.slon_quote_brute(PGN.nspname) || '.' || 167s public.slon_quote_brute(PGC.relname) as tab_fqname 167s into v_tab_oid, v_tab_fqname 167s from public.sl_table T, 167s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 167s where T.tab_id = p_tab_id 167s and T.tab_reloid = PGC.oid 167s and PGC.relnamespace = PGN.oid; 167s if not found then 167s raise exception 'Table with ID % not found in sl_table', p_tab_id; 167s end if; 167s 167s -- ---- 167s -- Reenable indexes and reindex the table. 167s -- ---- 167s perform public.enable_indexes_on_table(v_tab_oid); 167s execute 'reindex table ' || public.slon_quote_input(v_tab_fqname); 167s 167s return 1; 167s end; 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.finishTableAfterCopy(p_tab_id int4) is 167s 'Reenable index maintenance and reindex the table'; 167s COMMENT 167s create or replace function public.setup_vactables_type () returns integer as $$ 167s begin 167s if not exists (select 1 from pg_catalog.pg_type t, pg_catalog.pg_namespace n 167s where n.nspname = '_main' and t.typnamespace = n.oid and 167s t.typname = 'vactables') then 167s execute 'create type public.vactables as (nspname name, relname name);'; 167s end if; 167s return 1; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.setup_vactables_type () is 167s 'Function to be run as part of loading slony1_funcs.sql that creates the vactables type if it is missing'; 167s COMMENT 167s select public.setup_vactables_type(); 167s setup_vactables_type 167s ---------------------- 167s 1 167s (1 row) 167s 167s drop function public.setup_vactables_type (); 167s DROP FUNCTION 167s create or replace function public.TablesToVacuum () returns setof public.vactables as $$ 167s declare 167s prec public.vactables%rowtype; 167s begin 167s prec.nspname := '_main'; 167s prec.relname := 'sl_event'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := '_main'; 167s prec.relname := 'sl_confirm'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := '_main'; 167s prec.relname := 'sl_setsync'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := '_main'; 167s prec.relname := 'sl_seqlog'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := '_main'; 167s prec.relname := 'sl_archive_counter'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := '_main'; 167s prec.relname := 'sl_components'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := '_main'; 167s prec.relname := 'sl_log_script'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := 'pg_catalog'; 167s prec.relname := 'pg_listener'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s prec.nspname := 'pg_catalog'; 167s prec.relname := 'pg_statistic'; 167s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 167s return next prec; 167s end if; 167s 167s return; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.TablesToVacuum () is 167s 'Return a list of tables that require frequent vacuuming. The 167s function is used so that the list is not hardcoded into C code.'; 167s COMMENT 167s create or replace function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$ 167s declare 167s 167s prec record; 167s v_origin int4; 167s v_isorigin boolean; 167s v_fqname text; 167s v_query text; 167s v_rows integer; 167s v_idxname text; 167s 167s begin 167s -- Need to validate that the set exists; the set will tell us if this is the origin 167s select set_origin into v_origin from public.sl_set where set_id = p_set_id; 167s if not found then 167s raise exception 'add_empty_table_to_replication: set % not found!', p_set_id; 167s end if; 167s 167s -- Need to be aware of whether or not this node is origin for the set 167s v_isorigin := ( v_origin = public.getLocalNodeId('_main') ); 167s 167s v_fqname := '"' || p_nspname || '"."' || p_tabname || '"'; 167s -- Take out a lock on the table 167s v_query := 'lock ' || v_fqname || ';'; 167s execute v_query; 167s 167s if v_isorigin then 167s -- On the origin, verify that the table is empty, failing if it has any tuples 167s v_query := 'select 1 as tuple from ' || v_fqname || ' limit 1;'; 167s execute v_query into prec; 167s GET DIAGNOSTICS v_rows = ROW_COUNT; 167s if v_rows = 0 then 167s raise notice 'add_empty_table_to_replication: table % empty on origin - OK', v_fqname; 167s else 167s raise exception 'add_empty_table_to_replication: table % contained tuples on origin node %', v_fqname, v_origin; 167s end if; 167s else 167s -- On other nodes, TRUNCATE the table 167s v_query := 'truncate ' || v_fqname || ';'; 167s execute v_query; 167s end if; 167s -- If p_idxname is NULL, then look up the PK index, and RAISE EXCEPTION if one does not exist 167s if p_idxname is NULL then 167s select c2.relname into prec from pg_catalog.pg_index i, pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_namespace n where i.indrelid = c1.oid and i.indexrelid = c2.oid and c1.relname = p_tabname and i.indisprimary and n.nspname = p_nspname and n.oid = c1.relnamespace; 167s if not found then 167s raise exception 'add_empty_table_to_replication: table % has no primary key and no candidate specified!', v_fqname; 167s else 167s v_idxname := prec.relname; 167s end if; 167s else 167s v_idxname := p_idxname; 167s end if; 167s return public.setAddTable_int(p_set_id, p_tab_id, v_fqname, v_idxname, p_comment); 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is 167s 'Verify that a table is empty, and add it to replication. 167s tab_idxname is optional - if NULL, then we use the primary key. 167s 167s Note that this function is to be run within an EXECUTE SCRIPT script, 167s so it runs at the right place in the transaction stream on all 167s nodes.'; 167s COMMENT 167s create or replace function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$ 167s declare 167s prec record; 167s prec2 record; 167s v_set_id int4; 167s 167s begin 167s -- Look up the parent table; fail if it does not exist 167s select c1.oid into prec from pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_inherits i, pg_catalog.pg_namespace n where c1.oid = i.inhparent and c2.oid = i.inhrelid and n.oid = c2.relnamespace and n.nspname = p_nspname and c2.relname = p_tabname; 167s if not found then 167s raise exception 'replicate_partition: No parent table found for %.%!', p_nspname, p_tabname; 167s end if; 167s 167s -- The parent table tells us what replication set to use 167s select tab_set into prec2 from public.sl_table where tab_reloid = prec.oid; 167s if not found then 167s raise exception 'replicate_partition: Parent table % for new partition %.% is not replicated!', prec.oid, p_nspname, p_tabname; 167s end if; 167s 167s v_set_id := prec2.tab_set; 167s 167s -- Now, we have all the parameters necessary to run add_empty_table_to_replication... 167s return public.add_empty_table_to_replication(v_set_id, p_tab_id, p_nspname, p_tabname, p_idxname, p_comment); 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is 167s 'Add a partition table to replication. 167s tab_idxname is optional - if NULL, then we use the primary key. 167s This function looks up replication configuration via the parent table. 167s 167s Note that this function is to be run within an EXECUTE SCRIPT script, 167s so it runs at the right place in the transaction stream on all 167s nodes.'; 167s COMMENT 167s create or replace function public.disable_indexes_on_table (i_oid oid) 167s returns integer as $$ 167s begin 167s -- Setting pg_class.relhasindex to false will cause copy not to 167s -- maintain any indexes. At the end of the copy we will reenable 167s -- them and reindex the table. This bulk creating of indexes is 167s -- faster. 167s 167s update pg_catalog.pg_class set relhasindex ='f' where oid = i_oid; 167s return 1; 167s end $$ 167s language plpgsql; 167s CREATE FUNCTION 167s comment on function public.disable_indexes_on_table(i_oid oid) is 167s 'disable indexes on the specified table. 167s Used during subscription process to suppress indexes, which allows 167s COPY to go much faster. 167s 167s This may be set as a SECURITY DEFINER in order to eliminate the need 167s for superuser access by Slony-I. 167s '; 167s COMMENT 167s create or replace function public.enable_indexes_on_table (i_oid oid) 167s returns integer as $$ 167s begin 167s update pg_catalog.pg_class set relhasindex ='t' where oid = i_oid; 167s return 1; 167s end $$ 167s language plpgsql 167s security definer; 167s CREATE FUNCTION 167s comment on function public.enable_indexes_on_table(i_oid oid) is 167s 're-enable indexes on the specified table. 167s 167s This may be set as a SECURITY DEFINER in order to eliminate the need 167s for superuser access by Slony-I. 167s '; 167s COMMENT 167s drop function if exists public.reshapeSubscription(int4,int4,int4); 167s DROP FUNCTION 167s create or replace function public.reshapeSubscription (p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) returns int4 as $$ 167s begin 167s update public.sl_subscribe 167s set sub_provider=p_sub_provider 167s from public.sl_set 167s WHERE sub_set=sl_set.set_id 167s and sl_set.set_origin=p_sub_origin and sub_receiver=p_sub_receiver; 167s if found then 167s perform public.RebuildListenEntries(); 167s notify "_main_Restart"; 167s end if; 167s return 0; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.reshapeSubscription(p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) is 167s 'Run on a receiver/subscriber node when the provider for that 167s subscription is being changed. Slonik will invoke this method 167s before the SUBSCRIBE_SET event propogates to the receiver 167s so listen paths can be updated.'; 167s COMMENT 167s create or replace function public.slon_node_health_check() returns boolean as $$ 167s declare 167s prec record; 167s all_ok boolean; 167s begin 167s all_ok := 't'::boolean; 167s -- validate that all tables in sl_table have: 167s -- sl_table agreeing with pg_class 167s for prec in select tab_id, tab_relname, tab_nspname from 167s public.sl_table t where not exists (select 1 from pg_catalog.pg_class c, pg_catalog.pg_namespace n 167s where c.oid = t.tab_reloid and c.relname = t.tab_relname and c.relnamespace = n.oid and n.nspname = t.tab_nspname) loop 167s all_ok := 'f'::boolean; 167s raise warning 'table [id,nsp,name]=[%,%,%] - sl_table does not match pg_class/pg_namespace', prec.tab_id, prec.tab_relname, prec.tab_nspname; 167s end loop; 167s if not all_ok then 167s raise warning 'Mismatch found between sl_table and pg_class. Slonik command REPAIR CONFIG may be useful to rectify this.'; 167s end if; 167s return all_ok; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.slon_node_health_check() is 'called when slon starts up to validate that there are not problems with node configuration. Returns t if all is OK, f if there is a problem.'; 167s COMMENT 167s create or replace function public.log_truncate () returns trigger as 167s $$ 167s declare 167s r_role text; 167s c_nspname text; 167s c_relname text; 167s c_log integer; 167s c_node integer; 167s c_tabid integer; 167s begin 167s -- Ignore this call if session_replication_role = 'local' 167s select into r_role setting 167s from pg_catalog.pg_settings where name = 'session_replication_role'; 167s if r_role = 'local' then 167s return NULL; 167s end if; 167s 167s c_tabid := tg_argv[0]; 167s c_node := public.getLocalNodeId('_main'); 167s select tab_nspname, tab_relname into c_nspname, c_relname 167s from public.sl_table where tab_id = c_tabid; 167s select last_value into c_log from public.sl_log_status; 167s if c_log in (0, 2) then 167s insert into public.sl_log_1 ( 167s log_origin, log_txid, log_tableid, 167s log_actionseq, log_tablenspname, 167s log_tablerelname, log_cmdtype, 167s log_cmdupdncols, log_cmdargs 167s ) values ( 167s c_node, pg_catalog.txid_current(), c_tabid, 167s nextval('public.sl_action_seq'), c_nspname, 167s c_relname, 'T', 0, '{}'::text[]); 167s else -- (1, 3) 167s insert into public.sl_log_2 ( 167s log_origin, log_txid, log_tableid, 167s log_actionseq, log_tablenspname, 167s log_tablerelname, log_cmdtype, 167s log_cmdupdncols, log_cmdargs 167s ) values ( 167s c_node, pg_catalog.txid_current(), c_tabid, 167s nextval('public.sl_action_seq'), c_nspname, 167s c_relname, 'T', 0, '{}'::text[]); 167s end if; 167s return NULL; 167s end 167s $$ language plpgsql 167s security definer; 167s CREATE FUNCTION 167s comment on function public.log_truncate () 167s is 'trigger function run when a replicated table receives a TRUNCATE request'; 167s COMMENT 167s create or replace function public.deny_truncate () returns trigger as 167s $$ 167s declare 167s r_role text; 167s begin 167s -- Ignore this call if session_replication_role = 'local' 167s select into r_role setting 167s from pg_catalog.pg_settings where name = 'session_replication_role'; 167s if r_role = 'local' then 167s return NULL; 167s end if; 167s 167s raise exception 'truncation of replicated table forbidden on subscriber node'; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.deny_truncate () 167s is 'trigger function run when a replicated table receives a TRUNCATE request'; 167s COMMENT 167s create or replace function public.store_application_name (i_name text) returns text as $$ 167s declare 167s p_command text; 167s begin 167s if exists (select 1 from pg_catalog.pg_settings where name = 'application_name') then 167s p_command := 'set application_name to '''|| i_name || ''';'; 167s execute p_command; 167s return i_name; 167s end if; 167s return NULL::text; 167s end $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.store_application_name (i_name text) is 167s 'Set application_name GUC, if possible. Returns NULL if it fails to work.'; 167s COMMENT 167s create or replace function public.is_node_reachable(origin_node_id integer, 167s receiver_node_id integer) returns boolean as $$ 167s declare 167s listen_row record; 167s reachable boolean; 167s begin 167s reachable:=false; 167s select * into listen_row from public.sl_listen where 167s li_origin=origin_node_id and li_receiver=receiver_node_id; 167s if found then 167s reachable:=true; 167s end if; 167s return reachable; 167s end $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.is_node_reachable(origin_node_id integer, receiver_node_id integer) 167s is 'Is the receiver node reachable from the origin, via any of the listen paths?'; 167s COMMENT 167s create or replace function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) returns integer as $$ 167s begin 167s -- Trim out old state for this component 167s if not exists (select 1 from public.sl_components where co_actor = i_actor) then 167s insert into public.sl_components 167s (co_actor, co_pid, co_node, co_connection_pid, co_activity, co_starttime, co_event, co_eventtype) 167s values 167s (i_actor, i_pid, i_node, i_conn_pid, i_activity, i_starttime, i_event, i_eventtype); 167s else 167s update public.sl_components 167s set 167s co_connection_pid = i_conn_pid, co_activity = i_activity, co_starttime = i_starttime, co_event = i_event, 167s co_eventtype = i_eventtype 167s where co_actor = i_actor 167s and co_starttime < i_starttime; 167s end if; 167s return 1; 167s end $$ 167s language plpgsql; 167s CREATE FUNCTION 167s comment on function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) is 167s 'Store state of a Slony component. Useful for monitoring'; 167s COMMENT 167s create or replace function public.recreate_log_trigger(p_fq_table_name text, 167s p_tab_id oid, p_tab_attkind text) returns integer as $$ 167s begin 167s execute 'drop trigger "_main_logtrigger" on ' || 167s p_fq_table_name ; 167s -- ---- 167s execute 'create trigger "_main_logtrigger"' || 167s ' after insert or update or delete on ' || 167s p_fq_table_name 167s || ' for each row execute procedure public.logTrigger (' || 167s pg_catalog.quote_literal('_main') || ',' || 167s pg_catalog.quote_literal(p_tab_id::text) || ',' || 167s pg_catalog.quote_literal(p_tab_attkind) || ');'; 167s return 0; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s comment on function public.recreate_log_trigger(p_fq_table_name text, 167s p_tab_id oid, p_tab_attkind text) is 167s 'A function that drops and recreates the log trigger on the specified table. 167s It is intended to be used after the primary_key/unique index has changed.'; 167s COMMENT 167s create or replace function public.repair_log_triggers(only_locked boolean) 167s returns integer as $$ 167s declare 167s retval integer; 167s table_row record; 167s begin 167s retval=0; 167s for table_row in 167s select tab_nspname,tab_relname, 167s tab_idxname, tab_id, mode, 167s public.determineAttKindUnique(tab_nspname|| 167s '.'||tab_relname,tab_idxname) as attkind 167s from 167s public.sl_table 167s left join 167s pg_locks on (relation=tab_reloid and pid=pg_backend_pid() 167s and mode='AccessExclusiveLock') 167s ,pg_trigger 167s where tab_reloid=tgrelid and 167s public.determineAttKindUnique(tab_nspname||'.' 167s ||tab_relname,tab_idxname) 167s !=(public.decode_tgargs(tgargs))[2] 167s and tgname = '_main' 167s || '_logtrigger' 167s LOOP 167s if (only_locked=false) or table_row.mode='AccessExclusiveLock' then 167s perform public.recreate_log_trigger 167s (table_row.tab_nspname||'.'||table_row.tab_relname, 167s table_row.tab_id,table_row.attkind); 167s retval=retval+1; 167s else 167s raise notice '%.% has an invalid configuration on the log trigger. This was not corrected because only_lock is true and the table is not locked.', 167s table_row.tab_nspname,table_row.tab_relname; 167s 167s end if; 167s end loop; 167s return retval; 167s end 167s $$ 167s language plpgsql; 167s CREATE FUNCTION 167s comment on function public.repair_log_triggers(only_locked boolean) 167s is ' 167s repair the log triggers as required. If only_locked is true then only 167s tables that are already exclusively locked by the current transaction are 167s repaired. Otherwise all replicated tables with outdated trigger arguments 167s are recreated.'; 167s COMMENT 167s create or replace function public.unsubscribe_abandoned_sets(p_failed_node int4) returns bigint 167s as $$ 167s declare 167s v_row record; 167s v_seq_id bigint; 167s v_local_node int4; 167s begin 167s 167s select public.getLocalNodeId('_main') into 167s v_local_node; 167s 167s if found then 167s --abandon all subscriptions from this origin. 167s for v_row in select sub_set,sub_receiver from 167s public.sl_subscribe, public.sl_set 167s where sub_set=set_id and set_origin=p_failed_node 167s and sub_receiver=v_local_node 167s loop 167s raise notice 'Slony-I: failover_abandon_set() is abandoning subscription to set % on node % because it is too far ahead', v_row.sub_set, 167s v_local_node; 167s --If this node is a provider for the set 167s --then the receiver needs to be unsubscribed. 167s -- 167s select public.unsubscribeSet(v_row.sub_set, 167s v_local_node,true) 167s into v_seq_id; 167s end loop; 167s end if; 167s 167s return v_seq_id; 167s end 167s $$ language plpgsql; 167s CREATE FUNCTION 167s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS 167s $BODY$ 167s DECLARE 167s c_delim text; 167s BEGIN 167s c_delim = ','; 167s IF (txt_before IS NULL or txt_before='') THEN 167s RETURN txt_new; 167s END IF; 167s RETURN txt_before || c_delim || txt_new; 167s END; 167s $BODY$ 167s LANGUAGE plpgsql; 167s CREATE FUNCTION 167s comment on function public.agg_text_sum(text,text) is 167s 'An accumulator function used by the slony string_agg function to 167s aggregate rows into a string'; 167s COMMENT 167s Dropping cluster 17/regress ... 167s ### End 17 psql ### 167s NOTICE: function public.reshapesubscription(int4,int4,int4) does not exist, skipping 167s autopkgtest [06:58:14]: test load-functions: -----------------------] 168s load-functions PASS 168s autopkgtest [06:58:15]: test load-functions: - - - - - - - - - - results - - - - - - - - - - 168s autopkgtest [06:58:15]: @@@@@@@@@@@@@@@@@@@@ summary 168s load-functions PASS 184s nova [W] Using flock in prodstack6-s390x 184s flock: timeout while waiting to get lock 184s Creating nova instance adt-plucky-s390x-slony1-2-20250222-065527-juju-7f2275-prod-proposed-migration-environment-20-7094059e-3c68-4721-83c7-b5dbde5d1c5e from image adt/ubuntu-plucky-s390x-server-20250222.img (UUID ae633cda-0dee-481c-9805-d0ef3f3b5166)... 184s nova [W] Timed out waiting for c4150888-ccfe-49d8-a358-8e07345f8673 to get deleted.