0s autopkgtest [10:05:39]: starting date and time: 2025-01-06 10:05:39+0000
0s autopkgtest [10:05:39]: git checkout: 325255d2 Merge branch 'pin-any-arch' into 'ubuntu/production'
0s autopkgtest [10:05:39]: host juju-7f2275-prod-proposed-migration-environment-2; command line: /home/ubuntu/autopkgtest/runner/autopkgtest --output-dir /tmp/autopkgtest-work._04mi3lk/out --timeout-copy=6000 --setup-commands /home/ubuntu/autopkgtest-cloud/worker-config-production/setup-canonical.sh --apt-pocket=proposed=src:postgresql-17,src:icu --apt-upgrade slony1-2 --timeout-short=300 --timeout-copy=20000 --timeout-build=20000 '--env=ADT_TEST_TRIGGERS=postgresql-17/17.2-1build2 icu/76.1-1ubuntu1' -- ssh -s /home/ubuntu/autopkgtest/ssh-setup/nova -- --flavor autopkgtest-s390x --security-groups autopkgtest-juju-7f2275-prod-proposed-migration-environment-2@bos03-s390x-12.secgroup --name adt-plucky-s390x-slony1-2-20250106-100539-juju-7f2275-prod-proposed-migration-environment-2-57d29276-7e7c-440e-b42d-21beda410862 --image adt/ubuntu-plucky-s390x-server --keyname testbed-juju-7f2275-prod-proposed-migration-environment-2 --net-id=net_prod-proposed-migration-s390x -e TERM=linux -e ''"'"'http_proxy=http://squid.internal:3128'"'"'' -e ''"'"'https_proxy=http://squid.internal:3128'"'"'' -e ''"'"'no_proxy=127.0.0.1,127.0.1.1,login.ubuntu.com,localhost,localdomain,novalocal,internal,archive.ubuntu.com,ports.ubuntu.com,security.ubuntu.com,ddebs.ubuntu.com,changelogs.ubuntu.com,keyserver.ubuntu.com,launchpadlibrarian.net,launchpadcontent.net,launchpad.net,10.24.0.0/24,keystone.ps5.canonical.com,objectstorage.prodstack5.canonical.com,radosgw.ps5.canonical.com'"'"'' --mirror=http://ftpmaster.internal/ubuntu/
48s autopkgtest [10:06:27]: testbed dpkg architecture: s390x
48s autopkgtest [10:06:27]: testbed apt version: 2.9.18
49s autopkgtest [10:06:28]: @@@@@@@@@@@@@@@@@@@@ test bed setup
49s autopkgtest [10:06:28]: testbed release detected to be: None
50s autopkgtest [10:06:29]: updating testbed package index (apt update)
50s Get:1 http://ftpmaster.internal/ubuntu plucky-proposed InRelease [73.9 kB]
50s Hit:2 http://ftpmaster.internal/ubuntu plucky InRelease
50s Hit:3 http://ftpmaster.internal/ubuntu plucky-updates InRelease
50s Hit:4 http://ftpmaster.internal/ubuntu plucky-security InRelease
50s Get:5 http://ftpmaster.internal/ubuntu plucky-proposed/universe Sources [741 kB]
51s Get:6 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse Sources [17.2 kB]
51s Get:7 http://ftpmaster.internal/ubuntu plucky-proposed/restricted Sources [9708 B]
51s Get:8 http://ftpmaster.internal/ubuntu plucky-proposed/main Sources [105 kB]
51s Get:9 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x Packages [173 kB]
51s Get:10 http://ftpmaster.internal/ubuntu plucky-proposed/restricted s390x Packages [756 B]
51s Get:11 http://ftpmaster.internal/ubuntu plucky-proposed/universe s390x Packages [765 kB]
51s Get:12 http://ftpmaster.internal/ubuntu plucky-proposed/multiverse s390x Packages [5928 B]
51s Fetched 1892 kB in 1s (1895 kB/s)
51s Reading package lists...
52s Reading package lists...
52s Building dependency tree...
52s Reading state information...
52s Calculating upgrade...
52s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
53s Reading package lists...
53s Building dependency tree...
53s Reading state information...
53s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
53s autopkgtest [10:06:32]: upgrading testbed (apt dist-upgrade and autopurge)
53s Reading package lists...
53s Building dependency tree...
53s Reading state information...
53s Calculating upgrade...Starting pkgProblemResolver with broken count: 0
53s Starting 2 pkgProblemResolver with broken count: 0
53s Done
54s Entering ResolveByKeep
54s 
54s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
54s Reading package lists...
54s Building dependency tree...
54s Reading state information...
54s Starting pkgProblemResolver with broken count: 0
54s Starting 2 pkgProblemResolver with broken count: 0
54s Done
54s 0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
57s autopkgtest [10:06:36]: testbed running kernel: Linux 6.11.0-8-generic #8-Ubuntu SMP Mon Sep 16 12:49:35 UTC 2024
57s autopkgtest [10:06:36]: @@@@@@@@@@@@@@@@@@@@ apt-source slony1-2
60s Get:1 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-4 (dsc) [2413 B]
60s Get:2 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-4 (tar) [1465 kB]
60s Get:3 http://ftpmaster.internal/ubuntu plucky/universe slony1-2 2.2.11-4 (diff) [17.1 kB]
60s gpgv: Signature made Sun Jul 28 15:46:54 2024 UTC
60s gpgv: using RSA key 5C48FE6157F49179597087C64C5A6BAB12D2A7AE
60s gpgv: Can't check signature: No public key
60s dpkg-source: warning: cannot verify inline signature for ./slony1-2_2.2.11-4.dsc: no acceptable signature found
61s autopkgtest [10:06:40]: testing package slony1-2 version 2.2.11-4
61s autopkgtest [10:06:40]: build not needed
62s autopkgtest [10:06:41]: test load-functions: preparing testbed
62s Reading package lists...
62s Building dependency tree...
62s Reading state information...
62s Starting pkgProblemResolver with broken count: 0
62s Starting 2 pkgProblemResolver with broken count: 0
62s Done
63s The following NEW packages will be installed:
63s   libjson-perl libpq5 libxslt1.1 postgresql-16 postgresql-16-slony1-2
63s   postgresql-client-16 postgresql-client-common postgresql-common slony1-2-bin
63s   slony1-2-doc ssl-cert
63s 0 upgraded, 11 newly installed, 0 to remove and 0 not upgraded.
63s Need to get 18.8 MB of archives.
63s After this operation, 55.5 MB of additional disk space will be used.
63s Get:1 http://ftpmaster.internal/ubuntu plucky/main s390x libjson-perl all 4.10000-1 [81.9 kB]
63s Get:2 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-client-common all 262 [36.7 kB]
63s Get:3 http://ftpmaster.internal/ubuntu plucky/main s390x ssl-cert all 1.1.3ubuntu1 [18.7 kB]
63s Get:4 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-common all 262 [162 kB]
63s Get:5 http://ftpmaster.internal/ubuntu plucky-proposed/main s390x libpq5 s390x 17.2-1build2 [146 kB]
63s Get:6 http://ftpmaster.internal/ubuntu plucky/main s390x libxslt1.1 s390x 1.1.39-0exp1ubuntu2 [169 kB]
63s Get:7 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-client-16 s390x 16.4-3 [1294 kB]
63s Get:8 http://ftpmaster.internal/ubuntu plucky/main s390x postgresql-16 s390x 16.4-3 [16.3 MB]
64s Get:9 http://ftpmaster.internal/ubuntu plucky/universe s390x postgresql-16-slony1-2 s390x 2.2.11-4 [21.3 kB]
64s Get:10 http://ftpmaster.internal/ubuntu plucky/universe s390x slony1-2-bin s390x 2.2.11-4 [229 kB]
64s Get:11 http://ftpmaster.internal/ubuntu plucky/universe s390x slony1-2-doc all 2.2.11-4 [328 kB]
64s Preconfiguring packages ...
64s Fetched 18.8 MB in 1s (13.4 MB/s)
64s Selecting previously unselected package libjson-perl.
64s (Reading database ... 55616 files and directories currently installed.)
64s Preparing to unpack .../00-libjson-perl_4.10000-1_all.deb ...
64s Unpacking libjson-perl (4.10000-1) ...
64s Selecting previously unselected package postgresql-client-common.
64s Preparing to unpack .../01-postgresql-client-common_262_all.deb ...
64s Unpacking postgresql-client-common (262) ...
64s Selecting previously unselected package ssl-cert.
64s Preparing to unpack .../02-ssl-cert_1.1.3ubuntu1_all.deb ...
64s Unpacking ssl-cert (1.1.3ubuntu1) ...
64s Selecting previously unselected package postgresql-common.
64s Preparing to unpack .../03-postgresql-common_262_all.deb ...
64s Adding 'diversion of /usr/bin/pg_config to /usr/bin/pg_config.libpq-dev by postgresql-common'
64s Unpacking postgresql-common (262) ...
65s Selecting previously unselected package libpq5:s390x.
65s Preparing to unpack .../04-libpq5_17.2-1build2_s390x.deb ...
65s Unpacking libpq5:s390x (17.2-1build2) ...
65s Selecting previously unselected package libxslt1.1:s390x.
65s Preparing to unpack .../05-libxslt1.1_1.1.39-0exp1ubuntu2_s390x.deb ...
65s Unpacking libxslt1.1:s390x (1.1.39-0exp1ubuntu2) ...
65s Selecting previously unselected package postgresql-client-16.
65s Preparing to unpack .../06-postgresql-client-16_16.4-3_s390x.deb ...
65s Unpacking postgresql-client-16 (16.4-3) ...
65s Selecting previously unselected package postgresql-16.
65s Preparing to unpack .../07-postgresql-16_16.4-3_s390x.deb ...
65s Unpacking postgresql-16 (16.4-3) ...
65s Selecting previously unselected package postgresql-16-slony1-2.
65s Preparing to unpack .../08-postgresql-16-slony1-2_2.2.11-4_s390x.deb ...
65s Unpacking postgresql-16-slony1-2 (2.2.11-4) ...
65s Selecting previously unselected package slony1-2-bin.
65s Preparing to unpack .../09-slony1-2-bin_2.2.11-4_s390x.deb ...
65s Unpacking slony1-2-bin (2.2.11-4) ...
65s Selecting previously unselected package slony1-2-doc.
65s Preparing to unpack .../10-slony1-2-doc_2.2.11-4_all.deb ...
65s Unpacking slony1-2-doc (2.2.11-4) ...
65s Setting up postgresql-client-common (262) ...
65s Setting up libpq5:s390x (17.2-1build2) ...
65s Setting up ssl-cert (1.1.3ubuntu1) ...
65s Created symlink '/etc/systemd/system/multi-user.target.wants/ssl-cert.service' → '/usr/lib/systemd/system/ssl-cert.service'.
66s Setting up libjson-perl (4.10000-1) ...
66s Setting up libxslt1.1:s390x (1.1.39-0exp1ubuntu2) ...
66s Setting up slony1-2-doc (2.2.11-4) ...
66s Setting up postgresql-client-16 (16.4-3) ...
66s update-alternatives: using /usr/share/postgresql/16/man/man1/psql.1.gz to provide /usr/share/man/man1/psql.1.gz (psql.1.gz) in auto mode
66s Setting up postgresql-common (262) ...
66s Creating config file /etc/postgresql-common/createcluster.conf with new version
66s Building PostgreSQL dictionaries from installed myspell/hunspell packages...
66s Removing obsolete dictionary files:
66s Created symlink '/etc/systemd/system/multi-user.target.wants/postgresql.service' → '/usr/lib/systemd/system/postgresql.service'.
67s Setting up slony1-2-bin (2.2.11-4) ...
67s Setting up postgresql-16 (16.4-3) ...
67s Creating new PostgreSQL cluster 16/main ...
67s /usr/lib/postgresql/16/bin/initdb -D /var/lib/postgresql/16/main --auth-local peer --auth-host scram-sha-256 --no-instructions
67s The files belonging to this database system will be owned by user "postgres".
67s This user must also own the server process.
67s 
67s The database cluster will be initialized with locale "C.UTF-8".
67s The default database encoding has accordingly been set to "UTF8".
67s The default text search configuration will be set to "english".
67s 
67s Data page checksums are disabled.
67s 
67s fixing permissions on existing directory /var/lib/postgresql/16/main ... ok
67s creating subdirectories ... ok
67s selecting dynamic shared memory implementation ... posix
67s selecting default max_connections ... 100
67s selecting default shared_buffers ... 128MB
67s selecting default time zone ... Etc/UTC
67s creating configuration files ... ok
67s running bootstrap script ... ok
68s performing post-bootstrap initialization ... ok
68s syncing data to disk ... ok
71s Setting up postgresql-16-slony1-2 (2.2.11-4) ...
71s Processing triggers for libc-bin (2.40-4ubuntu1) ...
71s Processing triggers for man-db (2.13.0-1) ...
73s autopkgtest [10:06:52]: test load-functions: [-----------------------
74s ### PostgreSQL 16 psql ###
74s Creating new PostgreSQL cluster 16/regress ...
76s create table public.sl_node (
76s     no_id int4,
76s     no_active bool,
76s     no_comment text,
76s     no_failed bool,
76s     CONSTRAINT "sl_node-pkey"
76s     PRIMARY KEY (no_id)
76s ) WITHOUT OIDS;
76s CREATE TABLE
76s comment on table public.sl_node is 'Holds the list of nodes associated with this namespace.';
76s COMMENT
76s comment on column public.sl_node.no_id is 'The unique ID number for the node';
76s COMMENT
76s comment on column public.sl_node.no_active is 'Is the node active in replication yet?';
76s COMMENT
76s comment on column public.sl_node.no_comment is 'A human-oriented description of the node';
76s COMMENT
76s create table public.sl_nodelock (
76s     nl_nodeid int4,
76s     nl_conncnt serial,
76s     nl_backendpid int4,
76s     CONSTRAINT "sl_nodelock-pkey"
76s     PRIMARY KEY (nl_nodeid, nl_conncnt)
76s ) WITHOUT OIDS;
76s CREATE TABLE
76s comment on table public.sl_nodelock is 'Used to prevent multiple slon instances and to identify the backends to kill in terminateNodeConnections().';
76s COMMENT
76s comment on column public.sl_nodelock.nl_nodeid is 'Clients node_id';
76s COMMENT
76s comment on column public.sl_nodelock.nl_conncnt is 'Clients connection number';
76s COMMENT
76s comment on column public.sl_nodelock.nl_backendpid is 'PID of database backend owning this lock';
76s COMMENT
76s create table public.sl_set (
76s     set_id int4,
76s     set_origin int4,
76s     set_locked bigint,
76s     set_comment text,
76s     CONSTRAINT "sl_set-pkey"
76s     PRIMARY KEY (set_id),
76s     CONSTRAINT "set_origin-no_id-ref"
76s     FOREIGN KEY (set_origin)
76s     REFERENCES public.sl_node (no_id)
76s ) WITHOUT OIDS;
76s CREATE TABLE
76s comment on table public.sl_set is 'Holds definitions of replication sets.';
76s COMMENT
76s comment on column public.sl_set.set_id is 'A unique ID number for the set.';
76s COMMENT
76s comment on column public.sl_set.set_origin is
76s 'The ID number of the source node for the replication set.';
76s COMMENT
76s comment on column public.sl_set.set_locked is 'Transaction ID where the set was locked.';
76s COMMENT
76s comment on column public.sl_set.set_comment is 'A human-oriented description of the set.';
76s COMMENT
76s create table public.sl_setsync (
76s     ssy_setid int4,
76s 
ssy_origin int4, 76s ssy_seqno int8, 76s ssy_snapshot "pg_catalog".txid_snapshot, 76s ssy_action_list text, 76s CONSTRAINT "sl_setsync-pkey" 76s PRIMARY KEY (ssy_setid), 76s CONSTRAINT "ssy_setid-set_id-ref" 76s FOREIGN KEY (ssy_setid) 76s REFERENCES public.sl_set (set_id), 76s CONSTRAINT "ssy_origin-no_id-ref" 76s FOREIGN KEY (ssy_origin) 76s REFERENCES public.sl_node (no_id) 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_setsync is 'SYNC information'; 76s COMMENT 76s comment on column public.sl_setsync.ssy_setid is 'ID number of the replication set'; 76s COMMENT 76s comment on column public.sl_setsync.ssy_origin is 'ID number of the node'; 76s COMMENT 76s comment on column public.sl_setsync.ssy_seqno is 'Slony-I sequence number'; 76s COMMENT 76s comment on column public.sl_setsync.ssy_snapshot is 'TXID in provider system seen by the event'; 76s COMMENT 76s comment on column public.sl_setsync.ssy_action_list is 'action list used during the subscription process. At the time a subscriber copies over data from the origin, it sees all tables in a state somewhere between two SYNC events. Therefore this list must contains all log_actionseqs that are visible at that time, whose operations have therefore already been included in the data copied at the time the initial data copy is done. Those actions may therefore be filtered out of the first SYNC done after subscribing.'; 76s COMMENT 76s create table public.sl_table ( 76s tab_id int4, 76s tab_reloid oid UNIQUE NOT NULL, 76s tab_relname name NOT NULL, 76s tab_nspname name NOT NULL, 76s tab_set int4, 76s tab_idxname name NOT NULL, 76s tab_altered boolean NOT NULL, 76s tab_comment text, 76s CONSTRAINT "sl_table-pkey" 76s PRIMARY KEY (tab_id), 76s CONSTRAINT "tab_set-set_id-ref" 76s FOREIGN KEY (tab_set) 76s REFERENCES public.sl_set (set_id) 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_table is 'Holds information about the tables being replicated.'; 76s COMMENT 76s comment on column public.sl_table.tab_id is 'Unique key for Slony-I to use to identify the table'; 76s COMMENT 76s comment on column public.sl_table.tab_reloid is 'The OID of the table in pg_catalog.pg_class.oid'; 76s COMMENT 76s comment on column public.sl_table.tab_relname is 'The name of the table in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 76s COMMENT 76s comment on column public.sl_table.tab_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 76s COMMENT 76s comment on column public.sl_table.tab_set is 'ID of the replication set the table is in'; 76s COMMENT 76s comment on column public.sl_table.tab_idxname is 'The name of the primary index of the table'; 76s COMMENT 76s comment on column public.sl_table.tab_altered is 'Has the table been modified for replication?'; 76s COMMENT 76s comment on column public.sl_table.tab_comment is 'Human-oriented description of the table'; 76s COMMENT 76s create table public.sl_sequence ( 76s seq_id int4, 76s seq_reloid oid UNIQUE NOT NULL, 76s seq_relname name NOT NULL, 76s seq_nspname name NOT NULL, 76s seq_set int4, 76s seq_comment text, 76s CONSTRAINT "sl_sequence-pkey" 76s PRIMARY KEY (seq_id), 76s CONSTRAINT "seq_set-set_id-ref" 76s FOREIGN KEY (seq_set) 76s REFERENCES public.sl_set (set_id) 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_sequence is 'Similar to sl_table, each entry identifies a sequence being replicated.'; 76s COMMENT 76s comment on column public.sl_sequence.seq_id is 'An 
internally-used ID for Slony-I to use in its sequencing of updates'; 76s COMMENT 76s comment on column public.sl_sequence.seq_reloid is 'The OID of the sequence object'; 76s COMMENT 76s comment on column public.sl_sequence.seq_relname is 'The name of the sequence in pg_catalog.pg_class.relname used to recover from a dump/restore cycle'; 76s COMMENT 76s comment on column public.sl_sequence.seq_nspname is 'The name of the schema in pg_catalog.pg_namespace.nspname used to recover from a dump/restore cycle'; 76s COMMENT 76s comment on column public.sl_sequence.seq_set is 'Indicates which replication set the object is in'; 76s COMMENT 76s comment on column public.sl_sequence.seq_comment is 'A human-oriented comment'; 76s COMMENT 76s create table public.sl_path ( 76s pa_server int4, 76s pa_client int4, 76s pa_conninfo text NOT NULL, 76s pa_connretry int4, 76s CONSTRAINT "sl_path-pkey" 76s PRIMARY KEY (pa_server, pa_client), 76s CONSTRAINT "pa_server-no_id-ref" 76s FOREIGN KEY (pa_server) 76s REFERENCES public.sl_node (no_id), 76s CONSTRAINT "pa_client-no_id-ref" 76s FOREIGN KEY (pa_client) 76s REFERENCES public.sl_node (no_id) 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_path is 'Holds connection information for the paths between nodes, and the synchronisation delay'; 76s COMMENT 76s comment on column public.sl_path.pa_server is 'The Node ID # (from sl_node.no_id) of the data source'; 76s COMMENT 76s comment on column public.sl_path.pa_client is 'The Node ID # (from sl_node.no_id) of the data target'; 76s COMMENT 76s comment on column public.sl_path.pa_conninfo is 'The PostgreSQL connection string used to connect to the source node.'; 76s COMMENT 76s comment on column public.sl_path.pa_connretry is 'The synchronisation delay, in seconds'; 76s COMMENT 76s create table public.sl_listen ( 76s li_origin int4, 76s li_provider int4, 76s li_receiver int4, 76s CONSTRAINT "sl_listen-pkey" 76s PRIMARY KEY (li_origin, li_provider, li_receiver), 76s CONSTRAINT "li_origin-no_id-ref" 76s FOREIGN KEY (li_origin) 76s REFERENCES public.sl_node (no_id), 76s CONSTRAINT "sl_listen-sl_path-ref" 76s FOREIGN KEY (li_provider, li_receiver) 76s REFERENCES public.sl_path (pa_server, pa_client) 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_listen is 'Indicates how nodes listen to events from other nodes in the Slony-I network.'; 76s COMMENT 76s comment on column public.sl_listen.li_origin is 'The ID # (from sl_node.no_id) of the node this listener is operating on'; 76s COMMENT 76s comment on column public.sl_listen.li_provider is 'The ID # (from sl_node.no_id) of the source node for this listening event'; 76s COMMENT 76s comment on column public.sl_listen.li_receiver is 'The ID # (from sl_node.no_id) of the target node for this listening event'; 76s COMMENT 76s create table public.sl_subscribe ( 76s sub_set int4, 76s sub_provider int4, 76s sub_receiver int4, 76s sub_forward bool, 76s sub_active bool, 76s CONSTRAINT "sl_subscribe-pkey" 76s PRIMARY KEY (sub_receiver, sub_set), 76s CONSTRAINT "sl_subscribe-sl_path-ref" 76s FOREIGN KEY (sub_provider, sub_receiver) 76s REFERENCES public.sl_path (pa_server, pa_client), 76s CONSTRAINT "sub_set-set_id-ref" 76s FOREIGN KEY (sub_set) 76s REFERENCES public.sl_set (set_id) 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_subscribe is 'Holds a list of subscriptions on sets'; 76s COMMENT 76s comment on column public.sl_subscribe.sub_set is 'ID # (from sl_set) of the set being subscribed to'; 76s COMMENT 76s comment on 
column public.sl_subscribe.sub_provider is 'ID# (from sl_node) of the node providing data'; 76s COMMENT 76s comment on column public.sl_subscribe.sub_receiver is 'ID# (from sl_node) of the node receiving data from the provider'; 76s COMMENT 76s comment on column public.sl_subscribe.sub_forward is 'Does this provider keep data in sl_log_1/sl_log_2 to allow it to be a provider for other nodes?'; 76s COMMENT 76s comment on column public.sl_subscribe.sub_active is 'Has this subscription been activated? This is not set on the subscriber until AFTER the subscriber has received COPY data from the provider'; 76s COMMENT 76s create table public.sl_event ( 76s ev_origin int4, 76s ev_seqno int8, 76s ev_timestamp timestamptz, 76s ev_snapshot "pg_catalog".txid_snapshot, 76s ev_type text, 76s ev_data1 text, 76s ev_data2 text, 76s ev_data3 text, 76s ev_data4 text, 76s ev_data5 text, 76s ev_data6 text, 76s ev_data7 text, 76s ev_data8 text, 76s CONSTRAINT "sl_event-pkey" 76s PRIMARY KEY (ev_origin, ev_seqno) 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_event is 'Holds information about replication events. After a period of time, Slony removes old confirmed events from both this table and the sl_confirm table.'; 76s COMMENT 76s comment on column public.sl_event.ev_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 76s COMMENT 76s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 76s COMMENT 76s comment on column public.sl_event.ev_timestamp is 'When this event record was created'; 76s COMMENT 76s comment on column public.sl_event.ev_snapshot is 'TXID snapshot on provider node for this event'; 76s COMMENT 76s comment on column public.sl_event.ev_seqno is 'The ID # for the event'; 76s COMMENT 76s comment on column public.sl_event.ev_type is 'The type of event this record is for. 
76s SYNC = Synchronise 76s STORE_NODE = 76s ENABLE_NODE = 76s DROP_NODE = 76s STORE_PATH = 76s DROP_PATH = 76s STORE_LISTEN = 76s DROP_LISTEN = 76s STORE_SET = 76s DROP_SET = 76s MERGE_SET = 76s SET_ADD_TABLE = 76s SET_ADD_SEQUENCE = 76s STORE_TRIGGER = 76s DROP_TRIGGER = 76s MOVE_SET = 76s ACCEPT_SET = 76s SET_DROP_TABLE = 76s SET_DROP_SEQUENCE = 76s SET_MOVE_TABLE = 76s SET_MOVE_SEQUENCE = 76s FAILOVER_SET = 76s SUBSCRIBE_SET = 76s ENABLE_SUBSCRIPTION = 76s UNSUBSCRIBE_SET = 76s DDL_SCRIPT = 76s ADJUST_SEQ = 76s RESET_CONFIG = 76s '; 76s COMMENT 76s comment on column public.sl_event.ev_data1 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s comment on column public.sl_event.ev_data2 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s comment on column public.sl_event.ev_data3 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s comment on column public.sl_event.ev_data4 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s comment on column public.sl_event.ev_data5 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s comment on column public.sl_event.ev_data6 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s comment on column public.sl_event.ev_data7 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s comment on column public.sl_event.ev_data8 is 'Data field containing an argument needed to process the event'; 76s COMMENT 76s create table public.sl_confirm ( 76s con_origin int4, 76s con_received int4, 76s con_seqno int8, 76s con_timestamp timestamptz DEFAULT timeofday()::timestamptz 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_confirm is 'Holds confirmation of replication events. 
After a period of time, Slony removes old confirmed events from both this table and the sl_event table.'; 76s COMMENT 76s comment on column public.sl_confirm.con_origin is 'The ID # (from sl_node.no_id) of the source node for this event'; 76s COMMENT 76s comment on column public.sl_confirm.con_seqno is 'The ID # for the event'; 76s COMMENT 76s comment on column public.sl_confirm.con_timestamp is 'When this event was confirmed'; 76s COMMENT 76s create index sl_confirm_idx1 on public.sl_confirm 76s (con_origin, con_received, con_seqno); 76s CREATE INDEX 76s create index sl_confirm_idx2 on public.sl_confirm 76s (con_received, con_seqno); 76s CREATE INDEX 76s create table public.sl_seqlog ( 76s seql_seqid int4, 76s seql_origin int4, 76s seql_ev_seqno int8, 76s seql_last_value int8 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_seqlog is 'Log of Sequence updates'; 76s COMMENT 76s comment on column public.sl_seqlog.seql_seqid is 'Sequence ID'; 76s COMMENT 76s comment on column public.sl_seqlog.seql_origin is 'Publisher node at which the sequence originates'; 76s COMMENT 76s comment on column public.sl_seqlog.seql_ev_seqno is 'Slony-I Event with which this sequence update is associated'; 76s COMMENT 76s comment on column public.sl_seqlog.seql_last_value is 'Last value published for this sequence'; 76s COMMENT 76s create index sl_seqlog_idx on public.sl_seqlog 76s (seql_origin, seql_ev_seqno, seql_seqid); 76s CREATE INDEX 76s create function public.sequenceLastValue(p_seqname text) returns int8 76s as $$ 76s declare 76s v_seq_row record; 76s begin 76s for v_seq_row in execute 'select last_value from ' || public.slon_quote_input(p_seqname) 76s loop 76s return v_seq_row.last_value; 76s end loop; 76s 76s -- not reached 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.sequenceLastValue(p_seqname text) is 76s 'sequenceLastValue(p_seqname) 76s 76s Utility function used in sl_seqlastvalue view to compactly get the 76s last value from the requested sequence.'; 76s COMMENT 76s create table public.sl_log_1 ( 76s log_origin int4, 76s log_txid bigint, 76s log_tableid int4, 76s log_actionseq int8, 76s log_tablenspname text, 76s log_tablerelname text, 76s log_cmdtype "char", 76s log_cmdupdncols int4, 76s log_cmdargs text[] 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s create index sl_log_1_idx1 on public.sl_log_1 76s (log_origin, log_txid, log_actionseq); 76s CREATE INDEX 76s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 76s COMMENT 76s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 76s COMMENT 76s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 76s COMMENT 76s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 76s COMMENT 76s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 76s COMMENT 76s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 76s COMMENT 76s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 76s COMMENT 76s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. 
U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 76s COMMENT 76s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 76s COMMENT 76s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 76s COMMENT 76s create table public.sl_log_2 ( 76s log_origin int4, 76s log_txid bigint, 76s log_tableid int4, 76s log_actionseq int8, 76s log_tablenspname text, 76s log_tablerelname text, 76s log_cmdtype "char", 76s log_cmdupdncols int4, 76s log_cmdargs text[] 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s create index sl_log_2_idx1 on public.sl_log_2 76s (log_origin, log_txid, log_actionseq); 76s CREATE INDEX 76s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 76s COMMENT 76s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 76s COMMENT 76s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 76s COMMENT 76s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 76s COMMENT 76s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 76s COMMENT 76s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 76s COMMENT 76s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 76s COMMENT 76s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 76s COMMENT 76s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 76s COMMENT 76s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 76s COMMENT 76s create table public.sl_log_script ( 76s log_origin int4, 76s log_txid bigint, 76s log_actionseq int8, 76s log_cmdtype "char", 76s log_cmdargs text[] 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s create index sl_log_script_idx1 on public.sl_log_script 76s (log_origin, log_txid, log_actionseq); 76s CREATE INDEX 76s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 76s COMMENT 76s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 76s COMMENT 76s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 76s COMMENT 76s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 76s COMMENT 76s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. 
S = Script statement, s = Script complete'; 76s COMMENT 76s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 76s COMMENT 76s create table public.sl_registry ( 76s reg_key text primary key, 76s reg_int4 int4, 76s reg_text text, 76s reg_timestamp timestamptz 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s comment on table public.sl_registry is 'Stores miscellaneous runtime data'; 76s COMMENT 76s comment on column public.sl_registry.reg_key is 'Unique key of the runtime option'; 76s COMMENT 76s comment on column public.sl_registry.reg_int4 is 'Option value if type int4'; 76s COMMENT 76s comment on column public.sl_registry.reg_text is 'Option value if type text'; 76s COMMENT 76s comment on column public.sl_registry.reg_timestamp is 'Option value if type timestamp'; 76s COMMENT 76s create table public.sl_apply_stats ( 76s as_origin int4, 76s as_num_insert int8, 76s as_num_update int8, 76s as_num_delete int8, 76s as_num_truncate int8, 76s as_num_script int8, 76s as_num_total int8, 76s as_duration interval, 76s as_apply_first timestamptz, 76s as_apply_last timestamptz, 76s as_cache_prepare int8, 76s as_cache_hit int8, 76s as_cache_evict int8, 76s as_cache_prepare_max int8 76s ) WITHOUT OIDS; 76s CREATE TABLE 76s create index sl_apply_stats_idx1 on public.sl_apply_stats 76s (as_origin); 76s CREATE INDEX 76s comment on table public.sl_apply_stats is 'Local SYNC apply statistics (running totals)'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_origin is 'Origin of the SYNCs'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_num_insert is 'Number of INSERT operations performed'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_num_update is 'Number of UPDATE operations performed'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_num_delete is 'Number of DELETE operations performed'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_num_truncate is 'Number of TRUNCATE operations performed'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_num_script is 'Number of DDL operations performed'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_num_total is 'Total number of operations'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_duration is 'Processing time'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_apply_first is 'Timestamp of first recorded SYNC'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_apply_last is 'Timestamp of most recent recorded SYNC'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_cache_evict is 'Number of apply query cache evict operations'; 76s COMMENT 76s comment on column public.sl_apply_stats.as_cache_prepare_max is 'Maximum number of apply queries prepared in one SYNC group'; 76s COMMENT 76s create view public.sl_seqlastvalue as 76s select SQ.seq_id, SQ.seq_set, SQ.seq_reloid, 76s S.set_origin as seq_origin, 76s public.sequenceLastValue( 76s "pg_catalog".quote_ident(PGN.nspname) || '.' 
|| 76s "pg_catalog".quote_ident(PGC.relname)) as seq_last_value 76s from public.sl_sequence SQ, public.sl_set S, 76s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 76s where S.set_id = SQ.seq_set 76s and PGC.oid = SQ.seq_reloid and PGN.oid = PGC.relnamespace; 76s CREATE VIEW 76s create view public.sl_failover_targets as 76s select set_id, 76s set_origin as set_origin, 76s sub1.sub_receiver as backup_id 76s FROM 76s public.sl_subscribe sub1 76s ,public.sl_set set1 76s where 76s sub1.sub_set=set_id 76s and sub1.sub_forward=true 76s --exclude candidates where the set_origin 76s --has a path a node but the failover 76s --candidate has no path to that node 76s and sub1.sub_receiver not in 76s (select p1.pa_client from 76s public.sl_path p1 76s left outer join public.sl_path p2 on 76s (p2.pa_client=p1.pa_client 76s and p2.pa_server=sub1.sub_receiver) 76s where p2.pa_client is null 76s and p1.pa_server=set_origin 76s and p1.pa_client<>sub1.sub_receiver 76s ) 76s and sub1.sub_provider=set_origin 76s --exclude any subscribers that are not 76s --direct subscribers of all sets on the 76s --origin 76s and sub1.sub_receiver not in 76s (select direct_recv.sub_receiver 76s from 76s 76s (--all direct receivers of the first set 76s select subs2.sub_receiver 76s from public.sl_subscribe subs2 76s where subs2.sub_provider=set1.set_origin 76s and subs2.sub_set=set1.set_id) as 76s direct_recv 76s inner join 76s (--all other sets from the origin 76s select set_id from public.sl_set set2 76s where set2.set_origin=set1.set_origin 76s and set2.set_id<>sub1.sub_set) 76s as othersets on(true) 76s left outer join public.sl_subscribe subs3 76s on(subs3.sub_set=othersets.set_id 76s and subs3.sub_forward=true 76s and subs3.sub_provider=set1.set_origin 76s and direct_recv.sub_receiver=subs3.sub_receiver) 76s where subs3.sub_receiver is null 76s ); 76s CREATE VIEW 76s create sequence public.sl_local_node_id 76s MINVALUE -1; 76s CREATE SEQUENCE 76s SELECT setval('public.sl_local_node_id', -1); 76s setval 76s -------- 76s -1 76s (1 row) 76s 76s comment on sequence public.sl_local_node_id is 'The local node ID is initialized to -1, meaning that this node is not initialized yet.'; 76s COMMENT 76s create sequence public.sl_event_seq; 76s CREATE SEQUENCE 76s comment on sequence public.sl_event_seq is 'The sequence for numbering events originating from this node.'; 76s COMMENT 76s select setval('public.sl_event_seq', 5000000000); 76s setval 76s ------------ 76s 5000000000 76s (1 row) 76s 76s create sequence public.sl_action_seq; 76s CREATE SEQUENCE 76s comment on sequence public.sl_action_seq is 'The sequence to number statements in the transaction logs, so that the replication engines can figure out the "agreeable" order of statements.'; 76s COMMENT 76s create sequence public.sl_log_status 76s MINVALUE 0 MAXVALUE 3; 76s CREATE SEQUENCE 76s SELECT setval('public.sl_log_status', 0); 76s setval 76s -------- 76s 0 76s (1 row) 76s 76s comment on sequence public.sl_log_status is ' 76s Bit 0x01 determines the currently active log table 76s Bit 0x02 tells if the engine needs to read both logs 76s after switching until the old log is clean and truncated. 76s 76s Possible values: 76s 0 sl_log_1 active, sl_log_2 clean 76s 1 sl_log_2 active, sl_log_1 clean 76s 2 sl_log_1 active, sl_log_2 unknown - cleanup 76s 3 sl_log_2 active, sl_log_1 unknown - cleanup 76s 76s This is not yet in use. 
76s '; 76s COMMENT 76s create table public.sl_config_lock ( 76s dummy integer 76s ); 76s CREATE TABLE 76s comment on table public.sl_config_lock is 'This table exists solely to prevent overlapping execution of configuration change procedures and the resulting possible deadlocks. 76s '; 76s COMMENT 76s comment on column public.sl_config_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 76s COMMENT 76s create table public.sl_event_lock ( 76s dummy integer 76s ); 76s CREATE TABLE 76s comment on table public.sl_event_lock is 'This table exists solely to prevent multiple connections from concurrently creating new events and perhaps getting them out of order.'; 76s COMMENT 76s comment on column public.sl_event_lock.dummy is 'No data ever goes in this table so the contents never matter. Indeed, this column does not really need to exist.'; 76s COMMENT 76s create table public.sl_archive_counter ( 76s ac_num bigint, 76s ac_timestamp timestamptz 76s ) without oids; 76s CREATE TABLE 76s comment on table public.sl_archive_counter is 'Table used to generate the log shipping archive number. 76s '; 76s COMMENT 76s comment on column public.sl_archive_counter.ac_num is 'Counter of SYNC ID used in log shipping as the archive number'; 76s COMMENT 76s comment on column public.sl_archive_counter.ac_timestamp is 'Time at which the archive log was generated on the subscriber'; 76s COMMENT 76s insert into public.sl_archive_counter (ac_num, ac_timestamp) 76s values (0, 'epoch'::timestamptz); 76s INSERT 0 1 76s create table public.sl_components ( 76s co_actor text not null primary key, 76s co_pid integer not null, 76s co_node integer not null, 76s co_connection_pid integer not null, 76s co_activity text, 76s co_starttime timestamptz not null, 76s co_event bigint, 76s co_eventtype text 76s ) without oids; 76s CREATE TABLE 76s comment on table public.sl_components is 'Table used to monitor what various slon/slonik components are doing'; 76s COMMENT 76s comment on column public.sl_components.co_actor is 'which component am I?'; 76s COMMENT 76s comment on column public.sl_components.co_pid is 'my process/thread PID on node where slon runs'; 76s COMMENT 76s comment on column public.sl_components.co_node is 'which node am I servicing?'; 76s COMMENT 76s comment on column public.sl_components.co_connection_pid is 'PID of database connection being used on database server'; 76s COMMENT 76s comment on column public.sl_components.co_activity is 'activity that I am up to'; 76s COMMENT 76s comment on column public.sl_components.co_starttime is 'when did my activity begin? (timestamp reported as per slon process on server running slon)'; 76s COMMENT 76s comment on column public.sl_components.co_eventtype is 'what kind of event am I processing? 
(commonly n/a for event loop main threads)'; 76s COMMENT 76s comment on column public.sl_components.co_event is 'which event have I started processing?'; 76s COMMENT 76s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS 76s $BODY$ 76s DECLARE 76s c_delim text; 76s BEGIN 76s c_delim = ','; 76s IF (txt_before IS NULL or txt_before='') THEN 76s RETURN txt_new; 76s END IF; 76s RETURN txt_before || c_delim || txt_new; 76s END; 76s $BODY$ 76s LANGUAGE plpgsql; 76s CREATE FUNCTION 76s comment on function public.agg_text_sum(text,text) is 76s 'An accumulator function used by the slony string_agg function to 76s aggregate rows into a string'; 76s COMMENT 76s CREATE AGGREGATE public.string_agg(text) ( 76s SFUNC=public.agg_text_sum, 76s STYPE=text, 76s INITCOND='' 76s ); 76s CREATE AGGREGATE 76s grant usage on schema public to public; 76s GRANT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, 
ev_data5 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) 76s returns bigint 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__createEvent' 76s language C 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.createEvent (p_cluster_name name, p_event_type text, ev_data1 text, ev_data2 text, ev_data3 text, ev_data4 text, ev_data5 text, ev_data6 text, ev_data7 text, ev_data8 text) is 76s 'FUNCTION createEvent (cluster_name, ev_type [, ev_data [...]]) 76s 76s Create an sl_event entry'; 76s COMMENT 76s create or replace function public.denyAccess () 76s returns trigger 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__denyAccess' 76s language C 76s security definer; 76s CREATE FUNCTION 76s comment on function public.denyAccess () is 76s 'Trigger function to prevent modifications to a table on a subscriber'; 76s COMMENT 76s grant execute on function public.denyAccess () to public; 76s GRANT 76s create or replace function public.lockedSet () 76s returns trigger 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__lockedSet' 76s language C; 76s CREATE FUNCTION 76s comment on function public.lockedSet () is 76s 'Trigger function to prevent modifications to a table before and after a moveSet()'; 76s COMMENT 76s create or replace function public.getLocalNodeId (p_cluster name) returns int4 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getLocalNodeId' 76s language C 76s security definer; 76s CREATE FUNCTION 76s grant execute on function public.getLocalNodeId (p_cluster name) to public; 76s GRANT 76s comment on function public.getLocalNodeId (p_cluster name) is 76s 'Returns the node ID of the node being serviced on the 
local database'; 76s COMMENT 76s create or replace function public.getModuleVersion () returns text 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__getModuleVersion' 76s language C 76s security definer; 76s CREATE FUNCTION 76s grant execute on function public.getModuleVersion () to public; 76s GRANT 76s comment on function public.getModuleVersion () is 76s 'Returns the compiled-in version number of the Slony-I shared object'; 76s COMMENT 76s create or replace function public.resetSession() returns text 76s as '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__resetSession' 76s language C; 76s CREATE FUNCTION 76s create or replace function public.logApply () returns trigger 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApply' 76s language C 76s security definer; 76s CREATE FUNCTION 76s create or replace function public.logApplySetCacheSize (p_size int4) 76s returns int4 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySetCacheSize' 76s language C; 76s CREATE FUNCTION 76s create or replace function public.logApplySaveStats (p_cluster name, p_origin int4, p_duration interval) 76s returns int4 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logApplySaveStats' 76s language C; 76s CREATE FUNCTION 76s create or replace function public.checkmoduleversion () returns text as $$ 76s declare 76s moduleversion text; 76s begin 76s select into moduleversion public.getModuleVersion(); 76s if moduleversion <> '2.2.11' then 76s raise exception 'Slonik version: 2.2.11 != Slony-I version in PG build %', 76s moduleversion; 76s end if; 76s return null; 76s end;$$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.checkmoduleversion () is 76s 'Inline test function that verifies that slonik request for STORE 76s NODE/INIT CLUSTER is being run against a conformant set of 76s schema/functions.'; 76s COMMENT 76s select public.checkmoduleversion(); 76s checkmoduleversion 76s -------------------- 76s 76s (1 row) 76s 76s create or replace function public.decode_tgargs(bytea) returns text[] as 76s '$libdir/slony1_funcs.2.2.11','_Slony_I_2_2_11__slon_decode_tgargs' language C security definer; 76s CREATE FUNCTION 76s comment on function public.decode_tgargs(bytea) is 76s 'Translates the contents of pg_trigger.tgargs to an array of text arguments'; 76s COMMENT 76s grant execute on function public.decode_tgargs(bytea) to public; 76s GRANT 76s create or replace function public.check_namespace_validity () returns boolean as $$ 76s declare 76s c_cluster text; 76s begin 76s c_cluster := 'main'; 76s if c_cluster !~ E'^[[:alpha:]_][[:alnum:]_\$]{0,62}$' then 76s raise exception 'Cluster name % is not a valid SQL symbol!', c_cluster; 76s else 76s raise notice 'checked validity of cluster % namespace - OK!', c_cluster; 76s end if; 76s return 't'; 76s end 76s $$ language plpgsql; 76s CREATE FUNCTION 76s select public.check_namespace_validity(); 76s check_namespace_validity 76s -------------------------- 76s t 76s (1 row) 76s 76s drop function public.check_namespace_validity(); 76s DROP FUNCTION 76s create or replace function public.logTrigger () returns trigger 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__logTrigger' 76s language C 76s security definer; 76s CREATE FUNCTION 76s comment on function public.logTrigger () is 76s 'This is the trigger that is executed on the origin node that causes 76s updates to be recorded in sl_log_1/sl_log_2.'; 76s COMMENT 76s grant execute on function public.logTrigger () to public; 76s GRANT 76s create or replace function 
public.terminateNodeConnections (p_failed_node int4) returns int4 76s as $$ 76s declare 76s v_row record; 76s begin 76s for v_row in select nl_nodeid, nl_conncnt, 76s nl_backendpid from public.sl_nodelock 76s where nl_nodeid = p_failed_node for update 76s loop 76s perform public.killBackend(v_row.nl_backendpid, 'TERM'); 76s delete from public.sl_nodelock 76s where nl_nodeid = v_row.nl_nodeid 76s and nl_conncnt = v_row.nl_conncnt; 76s end loop; 76s 76s return 0; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.terminateNodeConnections (p_failed_node int4) is 76s 'terminates all backends that have registered to be from the given node'; 76s COMMENT 76s create or replace function public.killBackend (p_pid int4, p_signame text) returns int4 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__killBackend' 76s language C; 76s CREATE FUNCTION 76s comment on function public.killBackend(p_pid int4, p_signame text) is 76s 'Send a signal to a postgres process. Requires superuser rights'; 76s COMMENT 76s create or replace function public.seqtrack (p_seqid int4, p_seqval int8) returns int8 76s as '$libdir/slony1_funcs.2.2.11', '_Slony_I_2_2_11__seqtrack' 76s strict language C; 76s CREATE FUNCTION 76s comment on function public.seqtrack(p_seqid int4, p_seqval int8) is 76s 'Returns NULL if seqval has not changed since the last call for seqid'; 76s COMMENT 76s create or replace function public.slon_quote_brute(p_tab_fqname text) returns text 76s as $$ 76s declare 76s v_fqname text default ''; 76s begin 76s v_fqname := '"' || replace(p_tab_fqname,'"','""') || '"'; 76s return v_fqname; 76s end; 76s $$ language plpgsql immutable; 76s CREATE FUNCTION 76s comment on function public.slon_quote_brute(p_tab_fqname text) is 76s 'Brutally quote the given text'; 76s COMMENT 76s create or replace function public.slon_quote_input(p_tab_fqname text) returns text as $$ 76s declare 76s v_nsp_name text; 76s v_tab_name text; 76s v_i integer; 76s v_l integer; 76s v_pq2 integer; 76s begin 76s v_l := length(p_tab_fqname); 76s 76s -- Let us search for the dot 76s if p_tab_fqname like '"%' then 76s -- if the first part of the ident starts with a double quote, search 76s -- for the closing double quote, skipping over double double quotes. 76s v_i := 2; 76s while v_i <= v_l loop 76s if substr(p_tab_fqname, v_i, 1) != '"' then 76s v_i := v_i + 1; 76s else 76s v_i := v_i + 1; 76s if substr(p_tab_fqname, v_i, 1) != '"' then 76s exit; 76s end if; 76s v_i := v_i + 1; 76s end if; 76s end loop; 76s else 76s -- first part of ident is not quoted, search for the dot directly 76s v_i := 1; 76s while v_i <= v_l loop 76s if substr(p_tab_fqname, v_i, 1) = '.' then 76s exit; 76s end if; 76s v_i := v_i + 1; 76s end loop; 76s end if; 76s 76s -- v_i now points at the dot or behind the string. 76s 76s if substr(p_tab_fqname, v_i, 1) = '.' then 76s -- There is a dot now, so split the ident into its namespace 76s -- and objname parts and make sure each is quoted 76s v_nsp_name := substr(p_tab_fqname, 1, v_i - 1); 76s v_tab_name := substr(p_tab_fqname, v_i + 1); 76s if v_nsp_name not like '"%' then 76s v_nsp_name := '"' || replace(v_nsp_name, '"', '""') || 76s '"'; 76s end if; 76s if v_tab_name not like '"%' then 76s v_tab_name := '"' || replace(v_tab_name, '"', '""') || 76s '"'; 76s end if; 76s 76s return v_nsp_name || '.' || v_tab_name; 76s else 76s -- No dot ... 
must be just an ident without schema 76s if p_tab_fqname like '"%' then 76s return p_tab_fqname; 76s else 76s return '"' || replace(p_tab_fqname, '"', '""') || '"'; 76s end if; 76s end if; 76s 76s end;$$ language plpgsql immutable; 76s CREATE FUNCTION 76s comment on function public.slon_quote_input(p_text text) is 76s 'quote all words that aren''t quoted yet'; 76s COMMENT 76s create or replace function public.slonyVersionMajor() 76s returns int4 76s as $$ 76s begin 76s return 2; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.slonyVersionMajor () is 76s 'Returns the major version number of the slony schema'; 76s COMMENT 76s create or replace function public.slonyVersionMinor() 76s returns int4 76s as $$ 76s begin 76s return 2; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.slonyVersionMinor () is 76s 'Returns the minor version number of the slony schema'; 76s COMMENT 76s create or replace function public.slonyVersionPatchlevel() 76s returns int4 76s as $$ 76s begin 76s return 11; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.slonyVersionPatchlevel () is 76s 'Returns the version patch level of the slony schema'; 76s COMMENT 76s create or replace function public.slonyVersion() 76s returns text 76s as $$ 76s begin 76s return public.slonyVersionMajor()::text || '.' || 76s public.slonyVersionMinor()::text || '.' || 76s public.slonyVersionPatchlevel()::text ; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.slonyVersion() is 76s 'Returns the version number of the slony schema'; 76s COMMENT 76s create or replace function public.registry_set_int4(p_key text, p_value int4) 76s returns int4 as $$ 76s BEGIN 76s if p_value is null then 76s delete from public.sl_registry 76s where reg_key = p_key; 76s else 76s lock table public.sl_registry; 76s update public.sl_registry 76s set reg_int4 = p_value 76s where reg_key = p_key; 76s if not found then 76s insert into public.sl_registry (reg_key, reg_int4) 76s values (p_key, p_value); 76s end if; 76s end if; 76s return p_value; 76s END; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.registry_set_int4(p_key text, p_value int4) is 76s 'registry_set_int4(key, value) 76s 76s Set or delete a registry value'; 76s COMMENT 76s create or replace function public.registry_get_int4(p_key text, p_default int4) 76s returns int4 as $$ 76s DECLARE 76s v_value int4; 76s BEGIN 76s select reg_int4 into v_value from public.sl_registry 76s where reg_key = p_key; 76s if not found then 76s v_value = p_default; 76s if p_default notnull then 76s perform public.registry_set_int4(p_key, p_default); 76s end if; 76s else 76s if v_value is null then 76s raise exception 'Slony-I: registry key % is not an int4 value', 76s p_key; 76s end if; 76s end if; 76s return v_value; 76s END; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.registry_get_int4(p_key text, p_default int4) is 76s 'registry_get_int4(key, value) 76s 76s Get a registry value. 
If not present, set and return the default.'; 76s COMMENT 76s create or replace function public.registry_set_text(p_key text, p_value text) 76s returns text as $$ 76s BEGIN 76s if p_value is null then 76s delete from public.sl_registry 76s where reg_key = p_key; 76s else 76s lock table public.sl_registry; 76s update public.sl_registry 76s set reg_text = p_value 76s where reg_key = p_key; 76s if not found then 76s insert into public.sl_registry (reg_key, reg_text) 76s values (p_key, p_value); 76s end if; 76s end if; 76s return p_value; 76s END; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.registry_set_text(text, text) is 76s 'registry_set_text(key, value) 76s 76s Set or delete a registry value'; 76s COMMENT 76s create or replace function public.registry_get_text(p_key text, p_default text) 76s returns text as $$ 76s DECLARE 76s v_value text; 76s BEGIN 76s select reg_text into v_value from public.sl_registry 76s where reg_key = p_key; 76s if not found then 76s v_value = p_default; 76s if p_default notnull then 76s perform public.registry_set_text(p_key, p_default); 76s end if; 76s else 76s if v_value is null then 76s raise exception 'Slony-I: registry key % is not a text value', 76s p_key; 76s end if; 76s end if; 76s return v_value; 76s END; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.registry_get_text(p_key text, p_default text) is 76s 'registry_get_text(key, value) 76s 76s Get a registry value. If not present, set and return the default.'; 76s COMMENT 76s create or replace function public.registry_set_timestamp(p_key text, p_value timestamptz) 76s returns timestamp as $$ 76s BEGIN 76s if p_value is null then 76s delete from public.sl_registry 76s where reg_key = p_key; 76s else 76s lock table public.sl_registry; 76s update public.sl_registry 76s set reg_timestamp = p_value 76s where reg_key = p_key; 76s if not found then 76s insert into public.sl_registry (reg_key, reg_timestamp) 76s values (p_key, p_value); 76s end if; 76s end if; 76s return p_value; 76s END; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.registry_set_timestamp(p_key text, p_value timestamptz) is 76s 'registry_set_timestamp(key, value) 76s 76s Set or delete a registry value'; 76s COMMENT 76s create or replace function public.registry_get_timestamp(p_key text, p_default timestamptz) 76s returns timestamp as $$ 76s DECLARE 76s v_value timestamp; 76s BEGIN 76s select reg_timestamp into v_value from public.sl_registry 76s where reg_key = p_key; 76s if not found then 76s v_value = p_default; 76s if p_default notnull then 76s perform public.registry_set_timestamp(p_key, p_default); 76s end if; 76s else 76s if v_value is null then 76s raise exception 'Slony-I: registry key % is not an timestamp value', 76s p_key; 76s end if; 76s end if; 76s return v_value; 76s END; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.registry_get_timestamp(p_key text, p_default timestamptz) is 76s 'registry_get_timestamp(key, value) 76s 76s Get a registry value. 
If not present, set and return the default.'; 76s COMMENT 76s create or replace function public.cleanupNodelock () 76s returns int4 76s as $$ 76s declare 76s v_row record; 76s begin 76s for v_row in select nl_nodeid, nl_conncnt, nl_backendpid 76s from public.sl_nodelock 76s for update 76s loop 76s if public.killBackend(v_row.nl_backendpid, 'NULL') < 0 then 76s raise notice 'Slony-I: cleanup stale sl_nodelock entry for pid=%', 76s v_row.nl_backendpid; 76s delete from public.sl_nodelock where 76s nl_nodeid = v_row.nl_nodeid and 76s nl_conncnt = v_row.nl_conncnt; 76s end if; 76s end loop; 76s 76s return 0; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.cleanupNodelock() is 76s 'Clean up stale entries when restarting slon'; 76s COMMENT 76s create or replace function public.registerNodeConnection (p_nodeid int4) 76s returns int4 76s as $$ 76s begin 76s insert into public.sl_nodelock 76s (nl_nodeid, nl_backendpid) 76s values 76s (p_nodeid, pg_backend_pid()); 76s 76s return 0; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.registerNodeConnection (p_nodeid int4) is 76s 'Register (uniquely) the node connection so that only one slon can service the node'; 76s COMMENT 76s create or replace function public.initializeLocalNode (p_local_node_id int4, p_comment text) 76s returns int4 76s as $$ 76s declare 76s v_old_node_id int4; 76s v_first_log_no int4; 76s v_event_seq int8; 76s begin 76s -- ---- 76s -- Make sure this node is uninitialized or got reset 76s -- ---- 76s select last_value::int4 into v_old_node_id from public.sl_local_node_id; 76s if v_old_node_id != -1 then 76s raise exception 'Slony-I: This node is already initialized'; 76s end if; 76s 76s -- ---- 76s -- Set sl_local_node_id to the requested value and add our 76s -- own system to sl_node. 
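-- What follows: setval() seeds the sl_local_node_id sequence with the requested
-- node id, storeNode_int() inserts this node into sl_node, and a notice is raised
-- if the cluster name leaves too little room under max_identifier_length for
-- Slony-generated object names. The apply_trigger added to sl_log_1 and sl_log_2
-- is enabled as a replica trigger, so it only fires while
-- session_replication_role is set to 'replica'.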
76s -- ---- 76s perform setval('public.sl_local_node_id', p_local_node_id); 76s perform public.storeNode_int (p_local_node_id, p_comment); 76s 76s if (pg_catalog.current_setting('max_identifier_length')::integer - pg_catalog.length('public')) < 5 then 76s raise notice 'Slony-I: Cluster name length [%] versus system max_identifier_length [%] ', pg_catalog.length('public'), pg_catalog.current_setting('max_identifier_length'); 76s raise notice 'leaves narrow/no room for some Slony-I-generated objects (such as indexes).'; 76s raise notice 'You may run into problems later!'; 76s end if; 76s 76s -- 76s -- Put the apply trigger onto sl_log_1 and sl_log_2 76s -- 76s create trigger apply_trigger 76s before INSERT on public.sl_log_1 76s for each row execute procedure public.logApply('_main'); 76s alter table public.sl_log_1 76s enable replica trigger apply_trigger; 76s create trigger apply_trigger 76s before INSERT on public.sl_log_2 76s for each row execute procedure public.logApply('_main'); 76s alter table public.sl_log_2 76s enable replica trigger apply_trigger; 76s 76s return p_local_node_id; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.initializeLocalNode (p_local_node_id int4, p_comment text) is 76s 'no_id - Node ID # 76s no_comment - Human-oriented comment 76s 76s Initializes the new node, no_id'; 76s COMMENT 76s create or replace function public.storeNode (p_no_id int4, p_no_comment text) 76s returns bigint 76s as $$ 76s begin 76s perform public.storeNode_int (p_no_id, p_no_comment); 76s return public.createEvent('_main', 'STORE_NODE', 76s p_no_id::text, p_no_comment::text); 76s end; 76s $$ language plpgsql 76s called on null input; 76s CREATE FUNCTION 76s comment on function public.storeNode(p_no_id int4, p_no_comment text) is 76s 'no_id - Node ID # 76s no_comment - Human-oriented comment 76s 76s Generate the STORE_NODE event for node no_id'; 76s COMMENT 76s create or replace function public.storeNode_int (p_no_id int4, p_no_comment text) 76s returns int4 76s as $$ 76s declare 76s v_old_row record; 76s begin 76s -- ---- 76s -- Grab the central configuration lock 76s -- ---- 76s lock table public.sl_config_lock; 76s 76s -- ---- 76s -- Check if the node exists 76s -- ---- 76s select * into v_old_row 76s from public.sl_node 76s where no_id = p_no_id 76s for update; 76s if found then 76s -- ---- 76s -- Node exists, update the existing row. 76s -- ---- 76s update public.sl_node 76s set no_comment = p_no_comment 76s where no_id = p_no_id; 76s else 76s -- ---- 76s -- New node, insert the sl_node row 76s -- ---- 76s insert into public.sl_node 76s (no_id, no_active, no_comment,no_failed) values 76s (p_no_id, 'f', p_no_comment,false); 76s end if; 76s 76s return p_no_id; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.storeNode_int(p_no_id int4, p_no_comment text) is 76s 'no_id - Node ID # 76s no_comment - Human-oriented comment 76s 76s Internal function to process the STORE_NODE event for node no_id'; 76s COMMENT 76s create or replace function public.enableNode (p_no_id int4) 76s returns bigint 76s as $$ 76s declare 76s v_local_node_id int4; 76s v_node_row record; 76s begin 76s -- ---- 76s -- Grab the central configuration lock 76s -- ---- 76s lock table public.sl_config_lock; 76s 76s -- ---- 76s -- Check that we are the node to activate and that we are 76s -- currently disabled. 
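-- The block below resolves the local node id, locks the target row in sl_node
-- with SELECT ... FOR UPDATE, and raises an exception if the node is unknown or
-- already active; only then is enableNode_int() called and the ENABLE_NODE event
-- created.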
76s -- ---- 76s v_local_node_id := public.getLocalNodeId('_main'); 76s select * into v_node_row 76s from public.sl_node 76s where no_id = p_no_id 76s for update; 76s if not found then 76s raise exception 'Slony-I: node % not found', p_no_id; 76s end if; 76s if v_node_row.no_active then 76s raise exception 'Slony-I: node % is already active', p_no_id; 76s end if; 76s 76s -- ---- 76s -- Activate this node and generate the ENABLE_NODE event 76s -- ---- 76s perform public.enableNode_int (p_no_id); 76s return public.createEvent('_main', 'ENABLE_NODE', 76s p_no_id::text); 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.enableNode(p_no_id int4) is 76s 'no_id - Node ID # 76s 76s Generate the ENABLE_NODE event for node no_id'; 76s COMMENT 76s create or replace function public.enableNode_int (p_no_id int4) 76s returns int4 76s as $$ 76s declare 76s v_local_node_id int4; 76s v_node_row record; 76s v_sub_row record; 76s begin 76s -- ---- 76s -- Grab the central configuration lock 76s -- ---- 76s lock table public.sl_config_lock; 76s 76s -- ---- 76s -- Check that the node is inactive 76s -- ---- 76s select * into v_node_row 76s from public.sl_node 76s where no_id = p_no_id 76s for update; 76s if not found then 76s raise exception 'Slony-I: node % not found', p_no_id; 76s end if; 76s if v_node_row.no_active then 76s return p_no_id; 76s end if; 76s 76s -- ---- 76s -- Activate the node and generate sl_confirm status rows for it. 76s -- ---- 76s update public.sl_node 76s set no_active = 't' 76s where no_id = p_no_id; 76s insert into public.sl_confirm 76s (con_origin, con_received, con_seqno) 76s select no_id, p_no_id, 0 from public.sl_node 76s where no_id != p_no_id 76s and no_active; 76s insert into public.sl_confirm 76s (con_origin, con_received, con_seqno) 76s select p_no_id, no_id, 0 from public.sl_node 76s where no_id != p_no_id 76s and no_active; 76s 76s -- ---- 76s -- Generate ENABLE_SUBSCRIPTION events for all sets that 76s -- origin here and are subscribed by the just enabled node. 
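-- The loop below joins sl_set and sl_subscribe to find every set that originates
-- on the local node and is received by the newly enabled node, and calls
-- enableSubscription() for each (set, provider, receiver) triple.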
76s -- ---- 76s v_local_node_id := public.getLocalNodeId('_main'); 76s for v_sub_row in select SUB.sub_set, SUB.sub_provider from 76s public.sl_set S, 76s public.sl_subscribe SUB 76s where S.set_origin = v_local_node_id 76s and S.set_id = SUB.sub_set 76s and SUB.sub_receiver = p_no_id 76s for update of S 76s loop 76s perform public.enableSubscription (v_sub_row.sub_set, 76s v_sub_row.sub_provider, p_no_id); 76s end loop; 76s 76s return p_no_id; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.enableNode_int(p_no_id int4) is 76s 'no_id - Node ID # 76s 76s Internal function to process the ENABLE_NODE event for node no_id'; 76s COMMENT 76s create or replace function public.disableNode (p_no_id int4) 76s returns bigint 76s as $$ 76s begin 76s -- **** TODO **** 76s raise exception 'Slony-I: disableNode() not implemented'; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.disableNode(p_no_id int4) is 76s 'generate DISABLE_NODE event for node no_id'; 76s COMMENT 76s create or replace function public.disableNode_int (p_no_id int4) 76s returns int4 76s as $$ 76s begin 76s -- **** TODO **** 76s raise exception 'Slony-I: disableNode_int() not implemented'; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.disableNode(p_no_id int4) is 76s 'process DISABLE_NODE event for node no_id 76s 76s NOTE: This is not yet implemented!'; 76s COMMENT 76s create or replace function public.dropNode (p_no_ids int4[]) 76s returns bigint 76s as $$ 76s declare 76s v_node_row record; 76s v_idx integer; 76s begin 76s -- ---- 76s -- Grab the central configuration lock 76s -- ---- 76s lock table public.sl_config_lock; 76s 76s -- ---- 76s -- Check that this got called on a different node 76s -- ---- 76s if public.getLocalNodeId('_main') = ANY (p_no_ids) then 76s raise exception 'Slony-I: DROP_NODE cannot initiate on the dropped node'; 76s end if; 76s 76s -- 76s -- if any of the deleted nodes are receivers we drop the sl_subscribe line 76s -- 76s delete from public.sl_subscribe where sub_receiver = ANY (p_no_ids); 76s 76s v_idx:=1; 76s LOOP 76s EXIT WHEN v_idx>array_upper(p_no_ids,1) ; 76s select * into v_node_row from public.sl_node 76s where no_id = p_no_ids[v_idx] 76s for update; 76s if not found then 76s raise exception 'Slony-I: unknown node ID % %', p_no_ids[v_idx],v_idx; 76s end if; 76s -- ---- 76s -- Make sure we do not break other nodes subscriptions with this 76s -- ---- 76s if exists (select true from public.sl_subscribe 76s where sub_provider = p_no_ids[v_idx]) 76s then 76s raise exception 'Slony-I: Node % is still configured as a data provider', 76s p_no_ids[v_idx]; 76s end if; 76s 76s -- ---- 76s -- Make sure no set originates there any more 76s -- ---- 76s if exists (select true from public.sl_set 76s where set_origin = p_no_ids[v_idx]) 76s then 76s raise exception 'Slony-I: Node % is still origin of one or more sets', 76s p_no_ids[v_idx]; 76s end if; 76s 76s -- ---- 76s -- Call the internal drop functionality and generate the event 76s -- ---- 76s perform public.dropNode_int(p_no_ids[v_idx]); 76s v_idx:=v_idx+1; 76s END LOOP; 76s return public.createEvent('_main', 'DROP_NODE', 76s array_to_string(p_no_ids,',')); 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.dropNode(p_no_ids int4[]) is 76s 'generate DROP_NODE event to drop node node_id from replication'; 76s COMMENT 76s create or replace function public.dropNode_int (p_no_id int4) 76s returns int4 76s as $$ 76s declare 
76s v_tab_row record; 76s begin 76s -- ---- 76s -- Grab the central configuration lock 76s -- ---- 76s lock table public.sl_config_lock; 76s 76s -- ---- 76s -- If the dropped node is a remote node, clean the configuration 76s -- from all traces for it. 76s -- ---- 76s if p_no_id <> public.getLocalNodeId('_main') then 76s delete from public.sl_subscribe 76s where sub_receiver = p_no_id; 76s delete from public.sl_listen 76s where li_origin = p_no_id 76s or li_provider = p_no_id 76s or li_receiver = p_no_id; 76s delete from public.sl_path 76s where pa_server = p_no_id 76s or pa_client = p_no_id; 76s delete from public.sl_confirm 76s where con_origin = p_no_id 76s or con_received = p_no_id; 76s delete from public.sl_event 76s where ev_origin = p_no_id; 76s delete from public.sl_node 76s where no_id = p_no_id; 76s 76s return p_no_id; 76s end if; 76s 76s -- ---- 76s -- This is us ... deactivate the node for now, the daemon 76s -- will call uninstallNode() in a separate transaction. 76s -- ---- 76s update public.sl_node 76s set no_active = false 76s where no_id = p_no_id; 76s 76s -- Rewrite sl_listen table 76s perform public.RebuildListenEntries(); 76s 76s return p_no_id; 76s end; 76s $$ language plpgsql; 76s CREATE FUNCTION 76s comment on function public.dropNode_int(p_no_id int4) is 76s 'internal function to process DROP_NODE event to drop node node_id from replication'; 76s COMMENT 76s create or replace function public.preFailover(p_failed_node int4,p_is_candidate boolean) 76s returns int4 76s as $$ 76s declare 76s v_row record; 76s v_row2 record; 76s v_n int4; 76s begin 76s -- ---- 76s -- Grab the central configuration lock 76s -- ---- 76s lock table public.sl_config_lock; 76s 76s -- ---- 76s -- All consistency checks first 76s 76s if p_is_candidate then 76s -- ---- 76s -- Check all sets originating on the failed node 76s -- ---- 76s for v_row in select set_id 76s from public.sl_set 76s where set_origin = p_failed_node 76s loop 76s -- ---- 76s -- Check that the backup node is subscribed to all sets 76s -- that originate on the failed node 76s -- ---- 76s select into v_row2 sub_forward, sub_active 76s from public.sl_subscribe 76s where sub_set = v_row.set_id 76s and sub_receiver = public.getLocalNodeId('_main'); 76s if not found then 76s raise exception 'Slony-I: cannot failover - node % is not subscribed to set %', 76s public.getLocalNodeId('_main'), v_row.set_id; 76s end if; 76s 76s -- ---- 76s -- Check that the subscription is active 76s -- ---- 76s if not v_row2.sub_active then 77s raise exception 'Slony-I: cannot failover - subscription for set % is not active', 77s v_row.set_id; 77s end if; 77s 77s -- ---- 77s -- If there are other subscribers, the backup node needs to 77s -- be a forwarder too. 77s -- ---- 77s select into v_n count(*) 77s from public.sl_subscribe 77s where sub_set = v_row.set_id 77s and sub_receiver <> public.getLocalNodeId('_main'); 77s if v_n > 0 and not v_row2.sub_forward then 77s raise exception 'Slony-I: cannot failover - node % is not a forwarder of set %', 77s public.getLocalNodeId('_main'), v_row.set_id; 77s end if; 77s end loop; 77s end if; 77s 77s -- ---- 77s -- Terminate all connections of the failed node the hard way 77s -- ---- 77s perform public.terminateNodeConnections(p_failed_node); 77s 77s update public.sl_path set pa_conninfo='' WHERE 77s pa_server=p_failed_node; 77s notify "_main_Restart"; 77s -- ---- 77s -- That is it - so far. 
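-- Illustrative only: a direct call on a surviving candidate node might look like
--   select public.preFailover(1, true);   -- 1 = failed node id (example value)
-- which terminates the failed node's registered backends, blanks its sl_path
-- conninfo entries and notifies "_main_Restart" so the node daemon restarts.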
77s -- ---- 77s return p_failed_node; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.preFailover(p_failed_node int4,is_failover_candidate boolean) is 77s 'Prepare for a failover. This function is called on all candidate nodes. 77s It blanks the paths to the failed node 77s and then restart of all node daemons.'; 77s COMMENT 77s create or replace function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) 77s returns int4 77s as $$ 77s declare 77s v_row record; 77s v_row2 record; 77s v_failed boolean; 77s v_restart_required boolean; 77s begin 77s 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s v_restart_required:=false; 77s -- 77s -- any nodes other than the backup receiving 77s -- ANY subscription from a failed node 77s -- will now get that data from the backup node. 77s update public.sl_subscribe set 77s sub_provider=p_backup_node 77s where sub_provider=p_failed_node 77s and sub_receiver<>p_backup_node 77s and sub_receiver <> ALL (p_failed_nodes); 77s if found then 77s v_restart_required:=true; 77s end if; 77s -- 77s -- if this node is receiving a subscription from the backup node 77s -- with a failed node as the provider we need to fix this. 77s update public.sl_subscribe set 77s sub_provider=p_backup_node 77s from public.sl_set 77s where set_id = sub_set 77s and set_origin=p_failed_node 77s and sub_provider = ANY(p_failed_nodes) 77s and sub_receiver=public.getLocalNodeId('_main'); 77s 77s -- ---- 77s -- Terminate all connections of the failed node the hard way 77s -- ---- 77s perform public.terminateNodeConnections(p_failed_node); 77s 77s -- Clear out the paths for the failed node. 77s -- This ensures that *this* node won't be pulling data from 77s -- the failed node even if it *does* become accessible 77s 77s update public.sl_path set pa_conninfo='' WHERE 77s pa_server=p_failed_node 77s and pa_conninfo<>''; 77s 77s if found then 77s v_restart_required:=true; 77s end if; 77s 77s v_failed := exists (select 1 from public.sl_node 77s where no_failed=true and no_id=p_failed_node); 77s 77s if not v_failed then 77s 77s update public.sl_node set no_failed=true where no_id = ANY (p_failed_nodes) 77s and no_failed=false; 77s if found then 77s v_restart_required:=true; 77s end if; 77s end if; 77s 77s if v_restart_required then 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s -- ---- 77s -- Make sure the node daemon will restart 77s -- ---- 77s notify "_main_Restart"; 77s end if; 77s 77s 77s -- ---- 77s -- That is it - so far. 77s -- ---- 77s return p_failed_node; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.failedNode(p_failed_node int4, p_backup_node int4,p_failed_nodes integer[]) is 77s 'Initiate failover from failed_node to backup_node. 
This function must be called on all nodes, 77s and then waited for the restart of all node daemons.'; 77s COMMENT 77s create or replace function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8, p_failed_nodes integer[]) 77s returns bigint 77s as $$ 77s declare 77s v_row record; 77s v_new_event bigint; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s select * into v_row 77s from public.sl_event 77s where ev_origin = p_failed_node 77s and ev_seqno = p_ev_seqno; 77s if not found then 77s raise exception 'Slony-I: event %,% not found', 77s p_failed_node, p_ev_seqno; 77s end if; 77s 77s update public.sl_node set no_failed=true where no_id = ANY 77s (p_failed_nodes) and no_failed=false; 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s -- ---- 77s -- Make sure the node daemon will restart 77s -- ---- 77s raise notice 'calling restart node %',p_failed_node; 77s 77s notify "_main_Restart"; 77s 77s select public.createEvent('_main','FAILOVER_NODE', 77s p_failed_node::text,p_ev_seqno::text, 77s array_to_string(p_failed_nodes,',')) 77s into v_new_event; 77s 77s 77s return v_new_event; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.failedNode2 (p_failed_node int4, p_backup_node int4, p_ev_seqno int8,p_failed_nodes integer[] ) is 77s 'FUNCTION failedNode2 (failed_node, backup_node, set_id, ev_seqno, ev_seqfake,p_failed_nodes) 77s 77s On the node that has the highest sequence number of the failed node, 77s fake the FAILOVER_SET event.'; 77s COMMENT 77s create or replace function public.failedNode3 (p_failed_node int4, p_backup_node int4,p_seq_no bigint) 77s returns int4 77s as $$ 77s declare 77s 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s perform public.failoverSet_int(p_failed_node, 77s p_backup_node,p_seq_no); 77s 77s notify "_main_Restart"; 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s create or replace function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_last_seqno bigint) 77s returns int4 77s as $$ 77s declare 77s v_row record; 77s v_last_sync int8; 77s v_set int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s SELECT max(ev_seqno) into v_last_sync FROM public.sl_event where 77s ev_origin=p_failed_node; 77s if v_last_sync > p_last_seqno then 77s -- this node is ahead of the last sequence number from the 77s -- failed node that the backup node has. 77s -- this node must unsubscribe from all sets from the origin. 77s for v_set in select set_id from public.sl_set where 77s set_origin=p_failed_node 77s loop 77s raise warning 'Slony is dropping the subscription of set % found sync %s bigger than %s ' 77s , v_set, v_last_sync::text, p_last_seqno::text; 77s perform public.unsubscribeSet(v_set, 77s public.getLocalNodeId('_main'), 77s true); 77s end loop; 77s delete from public.sl_event where ev_origin=p_failed_node 77s and ev_seqno > p_last_seqno; 77s end if; 77s -- ---- 77s -- Change the origin of the set now to the backup node. 
77s -- On the backup node this includes changing all the 77s -- trigger and protection stuff 77s for v_set in select set_id from public.sl_set where 77s set_origin=p_failed_node 77s loop 77s -- ---- 77s if p_backup_node = public.getLocalNodeId('_main') then 77s delete from public.sl_setsync 77s where ssy_setid = v_set; 77s delete from public.sl_subscribe 77s where sub_set = v_set 77s and sub_receiver = p_backup_node; 77s update public.sl_set 77s set set_origin = p_backup_node 77s where set_id = v_set; 77s update public.sl_subscribe 77s set sub_provider=p_backup_node 77s FROM public.sl_node receive_node 77s where sub_set = v_set 77s and sub_provider=p_failed_node 77s and sub_receiver=receive_node.no_id 77s and receive_node.no_failed=false; 77s 77s for v_row in select * from public.sl_table 77s where tab_set = v_set 77s order by tab_id 77s loop 77s perform public.alterTableConfigureTriggers(v_row.tab_id); 77s end loop; 77s else 77s raise notice 'deleting from sl_subscribe all rows with receiver %', 77s p_backup_node; 77s 77s delete from public.sl_subscribe 77s where sub_set = v_set 77s and sub_receiver = p_backup_node; 77s 77s update public.sl_subscribe 77s set sub_provider=p_backup_node 77s FROM public.sl_node receive_node 77s where sub_set = v_set 77s and sub_provider=p_failed_node 77s and sub_provider=p_failed_node 77s and sub_receiver=receive_node.no_id 77s and receive_node.no_failed=false; 77s update public.sl_set 77s set set_origin = p_backup_node 77s where set_id = v_set; 77s -- ---- 77s -- If we are a subscriber of the set ourself, change our 77s -- setsync status to reflect the new set origin. 77s -- ---- 77s if exists (select true from public.sl_subscribe 77s where sub_set = v_set 77s and sub_receiver = public.getLocalNodeId( 77s '_main')) 77s then 77s delete from public.sl_setsync 77s where ssy_setid = v_set; 77s 77s select coalesce(max(ev_seqno), 0) into v_last_sync 77s from public.sl_event 77s where ev_origin = p_backup_node 77s and ev_type = 'SYNC'; 77s if v_last_sync > 0 then 77s insert into public.sl_setsync 77s (ssy_setid, ssy_origin, ssy_seqno, 77s ssy_snapshot, ssy_action_list) 77s select v_set, p_backup_node, v_last_sync, 77s ev_snapshot, NULL 77s from public.sl_event 77s where ev_origin = p_backup_node 77s and ev_seqno = v_last_sync; 77s else 77s insert into public.sl_setsync 77s (ssy_setid, ssy_origin, ssy_seqno, 77s ssy_snapshot, ssy_action_list) 77s values (v_set, p_backup_node, '0', 77s '1:1:', NULL); 77s end if; 77s end if; 77s end if; 77s end loop; 77s 77s --If there are any subscriptions with 77s --the failed_node being the provider then 77s --we want to redirect those subscriptions 77s --to come from the backup node. 77s -- 77s -- The backup node should be a valid 77s -- provider for all subscriptions served 77s -- by the failed node. (otherwise it 77s -- wouldn't be a allowable backup node). 
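-- Leaving the commented-out delete aside, the two updates that follow implement
-- that redirection: subscriptions on surviving receivers that still name the
-- failed node as provider are re-pointed at the backup node, and any subscription
-- the backup node itself received from the failed node is re-pointed at the set's
-- origin. Finally the failed node is marked inactive and sl_listen is rebuilt.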
77s -- delete from public.sl_subscribe 77s -- where sub_receiver=p_backup_node; 77s 77s update public.sl_subscribe 77s set sub_provider=p_backup_node 77s from public.sl_node 77s where sub_provider=p_failed_node 77s and sl_node.no_id=sub_receiver 77s and sl_node.no_failed=false 77s and sub_receiver<>p_backup_node; 77s 77s update public.sl_subscribe 77s set sub_provider=(select set_origin from 77s public.sl_set where set_id= 77s sub_set) 77s where sub_provider=p_failed_node 77s and sub_receiver=p_backup_node; 77s 77s update public.sl_node 77s set no_active=false WHERE 77s no_id=p_failed_node; 77s 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s 77s return p_failed_node; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.failoverSet_int (p_failed_node int4, p_backup_node int4,p_seqno bigint) is 77s 'FUNCTION failoverSet_int (failed_node, backup_node, set_id, wait_seqno) 77s 77s Finish failover for one set.'; 77s COMMENT 77s create or replace function public.uninstallNode () 77s returns int4 77s as $$ 77s declare 77s v_tab_row record; 77s begin 77s raise notice 'Slony-I: Please drop schema "_main"'; 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.uninstallNode() is 77s 'Reset the whole database to standalone by removing the whole 77s replication system.'; 77s COMMENT 77s DROP FUNCTION IF EXISTS public.cloneNodePrepare(int4,int4,text); 77s DROP FUNCTION 77s create or replace function public.cloneNodePrepare (p_no_id int4, p_no_provider int4, p_no_comment text) 77s returns bigint 77s as $$ 77s begin 77s perform public.cloneNodePrepare_int (p_no_id, p_no_provider, p_no_comment); 77s return public.createEvent('_main', 'CLONE_NODE', 77s p_no_id::text, p_no_provider::text, 77s p_no_comment::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.cloneNodePrepare(p_no_id int4, p_no_provider int4, p_no_comment text) is 77s 'Prepare for cloning a node.'; 77s COMMENT 77s create or replace function public.cloneNodePrepare_int (p_no_id int4, p_no_provider int4, p_no_comment text) 77s returns int4 77s as $$ 77s declare 77s v_dummy int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s update public.sl_node set 77s no_active = np.no_active, 77s no_comment = np.no_comment, 77s no_failed = np.no_failed 77s from public.sl_node np 77s where np.no_id = p_no_provider 77s and sl_node.no_id = p_no_id; 77s if not found then 77s insert into public.sl_node 77s (no_id, no_active, no_comment,no_failed) 77s select p_no_id, no_active, p_no_comment, no_failed 77s from public.sl_node 77s where no_id = p_no_provider; 77s end if; 77s 77s insert into public.sl_path 77s (pa_server, pa_client, pa_conninfo, pa_connretry) 77s select pa_server, p_no_id, '', pa_connretry 77s from public.sl_path 77s where pa_client = p_no_provider 77s and (pa_server, p_no_id) not in (select pa_server, pa_client 77s from public.sl_path); 77s 77s insert into public.sl_path 77s (pa_server, pa_client, pa_conninfo, pa_connretry) 77s select p_no_id, pa_client, '', pa_connretry 77s from public.sl_path 77s where pa_server = p_no_provider 77s and (p_no_id, pa_client) not in (select pa_server, pa_client 77s from public.sl_path); 77s 77s insert into public.sl_subscribe 77s (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) 77s select sub_set, sub_provider, p_no_id, sub_forward, sub_active 77s from public.sl_subscribe 77s 
where sub_receiver = p_no_provider; 77s 77s insert into public.sl_confirm 77s (con_origin, con_received, con_seqno, con_timestamp) 77s select con_origin, p_no_id, con_seqno, con_timestamp 77s from public.sl_confirm 77s where con_received = p_no_provider; 77s 77s perform public.RebuildListenEntries(); 77s 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.cloneNodePrepare_int(p_no_id int4, p_no_provider int4, p_no_comment text) is 77s 'Internal part of cloneNodePrepare().'; 77s COMMENT 77s create or replace function public.cloneNodeFinish (p_no_id int4, p_no_provider int4) 77s returns int4 77s as $$ 77s declare 77s v_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s perform "pg_catalog".setval('public.sl_local_node_id', p_no_id); 77s perform public.resetSession(); 77s for v_row in select sub_set from public.sl_subscribe 77s where sub_receiver = p_no_id 77s loop 77s perform public.updateReloid(v_row.sub_set, p_no_id); 77s end loop; 77s 77s perform public.RebuildListenEntries(); 77s 77s delete from public.sl_confirm 77s where con_received = p_no_id; 77s insert into public.sl_confirm 77s (con_origin, con_received, con_seqno, con_timestamp) 77s select con_origin, p_no_id, con_seqno, con_timestamp 77s from public.sl_confirm 77s where con_received = p_no_provider; 77s insert into public.sl_confirm 77s (con_origin, con_received, con_seqno, con_timestamp) 77s select p_no_provider, p_no_id, 77s (select max(ev_seqno) from public.sl_event 77s where ev_origin = p_no_provider), current_timestamp; 77s 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.cloneNodeFinish(p_no_id int4, p_no_provider int4) is 77s 'Internal part of cloneNodePrepare().'; 77s COMMENT 77s create or replace function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 77s returns bigint 77s as $$ 77s begin 77s perform public.storePath_int(p_pa_server, p_pa_client, 77s p_pa_conninfo, p_pa_connretry); 77s return public.createEvent('_main', 'STORE_PATH', 77s p_pa_server::text, p_pa_client::text, 77s p_pa_conninfo::text, p_pa_connretry::text); 77s end; 77s $$ language plpgsql; 77s NOTICE: checked validity of cluster main namespace - OK! 
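A hypothetical invocation of the storePath() function just defined, with every value illustrative rather than taken from this test run:

  select public.storePath(
      1,                                  -- pa_server: node the DSN points at (example id)
      2,                                  -- pa_client: node that will use the DSN (example id)
      'host=db1 dbname=app user=slony',   -- pa_conninfo: made-up DSN
      10);                                -- pa_connretry: retry interval (example value)

This records a STORE_PATH event stating that node 2 can reach node 1 over the given DSN.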
77s NOTICE: function public.clonenodeprepare(int4,int4,text) does not exist, skipping 77s NOTICE: function public.ddlcapture(text,text) does not exist, skipping 77s NOTICE: function public.ddlscript_complete(int4,text,int4) does not exist, skipping 77s NOTICE: function public.ddlscript_complete_int(int4,int4) does not exist, skipping 77s CREATE FUNCTION 77s comment on function public.storePath (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 77s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 77s 77s Generate the STORE_PATH event indicating that node pa_client can 77s access node pa_server using DSN pa_conninfo'; 77s COMMENT 77s create or replace function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) 77s returns int4 77s as $$ 77s declare 77s v_dummy int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check if the path already exists 77s -- ---- 77s select 1 into v_dummy 77s from public.sl_path 77s where pa_server = p_pa_server 77s and pa_client = p_pa_client 77s for update; 77s if found then 77s -- ---- 77s -- Path exists, update pa_conninfo 77s -- ---- 77s update public.sl_path 77s set pa_conninfo = p_pa_conninfo, 77s pa_connretry = p_pa_connretry 77s where pa_server = p_pa_server 77s and pa_client = p_pa_client; 77s else 77s -- ---- 77s -- New path 77s -- 77s -- In case we receive STORE_PATH events before we know 77s -- about the nodes involved in this, we generate those nodes 77s -- as pending. 77s -- ---- 77s if not exists (select 1 from public.sl_node 77s where no_id = p_pa_server) then 77s perform public.storeNode_int (p_pa_server, ''); 77s end if; 77s if not exists (select 1 from public.sl_node 77s where no_id = p_pa_client) then 77s perform public.storeNode_int (p_pa_client, ''); 77s end if; 77s insert into public.sl_path 77s (pa_server, pa_client, pa_conninfo, pa_connretry) values 77s (p_pa_server, p_pa_client, p_pa_conninfo, p_pa_connretry); 77s end if; 77s 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.storePath_int (p_pa_server int4, p_pa_client int4, p_pa_conninfo text, p_pa_connretry int4) is 77s 'FUNCTION storePath (pa_server, pa_client, pa_conninfo, pa_connretry) 77s 77s Process the STORE_PATH event indicating that node pa_client can 77s access node pa_server using DSN pa_conninfo'; 77s COMMENT 77s create or replace function public.dropPath (p_pa_server int4, p_pa_client int4) 77s returns bigint 77s as $$ 77s declare 77s v_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- There should be no existing subscriptions. Auto unsubscribing 77s -- is considered too dangerous. 
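-- The checks below raise an exception if any row in sl_subscribe still uses this
-- (provider, receiver) pair, then drop every sl_listen entry that depends on the
-- path via dropListen(), call dropPath_int(), rebuild sl_listen and finally emit
-- the DROP_PATH event.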
77s -- ---- 77s for v_row in select sub_set, sub_provider, sub_receiver 77s from public.sl_subscribe 77s where sub_provider = p_pa_server 77s and sub_receiver = p_pa_client 77s loop 77s raise exception 77s 'Slony-I: Path cannot be dropped, subscription of set % needs it', 77s v_row.sub_set; 77s end loop; 77s 77s -- ---- 77s -- Drop all sl_listen entries that depend on this path 77s -- ---- 77s for v_row in select li_origin, li_provider, li_receiver 77s from public.sl_listen 77s where li_provider = p_pa_server 77s and li_receiver = p_pa_client 77s loop 77s perform public.dropListen( 77s v_row.li_origin, v_row.li_provider, v_row.li_receiver); 77s end loop; 77s 77s -- ---- 77s -- Now drop the path and create the event 77s -- ---- 77s perform public.dropPath_int(p_pa_server, p_pa_client); 77s 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s return public.createEvent ('_main', 'DROP_PATH', 77s p_pa_server::text, p_pa_client::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.dropPath (p_pa_server int4, p_pa_client int4) is 77s 'Generate DROP_PATH event to drop path from pa_server to pa_client'; 77s COMMENT 77s create or replace function public.dropPath_int (p_pa_server int4, p_pa_client int4) 77s returns int4 77s as $$ 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Remove any dangling sl_listen entries with the server 77s -- as provider and the client as receiver. This must have 77s -- been cleared out before, but obviously was not. 77s -- ---- 77s delete from public.sl_listen 77s where li_provider = p_pa_server 77s and li_receiver = p_pa_client; 77s 77s delete from public.sl_path 77s where pa_server = p_pa_server 77s and pa_client = p_pa_client; 77s 77s if found then 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s return 1; 77s else 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s return 0; 77s end if; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.dropPath_int (p_pa_server int4, p_pa_client int4) is 77s 'Process DROP_PATH event to drop path from pa_server to pa_client'; 77s COMMENT 77s create or replace function public.storeListen (p_origin int4, p_provider int4, p_receiver int4) 77s returns bigint 77s as $$ 77s begin 77s perform public.storeListen_int (p_origin, p_provider, p_receiver); 77s return public.createEvent ('_main', 'STORE_LISTEN', 77s p_origin::text, p_provider::text, p_receiver::text); 77s end; 77s $$ language plpgsql 77s called on null input; 77s CREATE FUNCTION 77s comment on function public.storeListen(p_origin int4, p_provider int4, p_receiver int4) is 77s 'FUNCTION storeListen (li_origin, li_provider, li_receiver) 77s 77s generate STORE_LISTEN event, indicating that receiver node li_receiver 77s listens to node li_provider in order to get messages coming from node 77s li_origin.'; 77s COMMENT 77s create or replace function public.storeListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 77s returns int4 77s as $$ 77s declare 77s v_exists int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s select 1 into v_exists 77s from public.sl_listen 77s where li_origin = p_li_origin 77s and li_provider = p_li_provider 77s and li_receiver = p_li_receiver; 77s if not found then 77s -- ---- 77s -- In case we receive STORE_LISTEN 
events before we know 77s -- about the nodes involved in this, we generate those nodes 77s -- as pending. 77s -- ---- 77s if not exists (select 1 from public.sl_node 77s where no_id = p_li_origin) then 77s perform public.storeNode_int (p_li_origin, ''); 77s end if; 77s if not exists (select 1 from public.sl_node 77s where no_id = p_li_provider) then 77s perform public.storeNode_int (p_li_provider, ''); 77s end if; 77s if not exists (select 1 from public.sl_node 77s where no_id = p_li_receiver) then 77s perform public.storeNode_int (p_li_receiver, ''); 77s end if; 77s 77s insert into public.sl_listen 77s (li_origin, li_provider, li_receiver) values 77s (p_li_origin, p_li_provider, p_li_receiver); 77s end if; 77s 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.storeListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 77s 'FUNCTION storeListen_int (li_origin, li_provider, li_receiver) 77s 77s Process STORE_LISTEN event, indicating that receiver node li_receiver 77s listens to node li_provider in order to get messages coming from node 77s li_origin.'; 77s COMMENT 77s create or replace function public.dropListen (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 77s returns bigint 77s as $$ 77s begin 77s perform public.dropListen_int(p_li_origin, 77s p_li_provider, p_li_receiver); 77s 77s return public.createEvent ('_main', 'DROP_LISTEN', 77s p_li_origin::text, p_li_provider::text, p_li_receiver::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.dropListen(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 77s 'dropListen (li_origin, li_provider, li_receiver) 77s 77s Generate the DROP_LISTEN event.'; 77s COMMENT 77s create or replace function public.dropListen_int (p_li_origin int4, p_li_provider int4, p_li_receiver int4) 77s returns int4 77s as $$ 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s delete from public.sl_listen 77s where li_origin = p_li_origin 77s and li_provider = p_li_provider 77s and li_receiver = p_li_receiver; 77s if found then 77s return 1; 77s else 77s return 0; 77s end if; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.dropListen_int(p_li_origin int4, p_li_provider int4, p_li_receiver int4) is 77s 'dropListen (li_origin, li_provider, li_receiver) 77s 77s Process the DROP_LISTEN event, deleting the sl_listen entry for 77s the indicated (origin,provider,receiver) combination.'; 77s COMMENT 77s create or replace function public.storeSet (p_set_id int4, p_set_comment text) 77s returns bigint 77s as $$ 77s declare 77s v_local_node_id int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s v_local_node_id := public.getLocalNodeId('_main'); 77s 77s insert into public.sl_set 77s (set_id, set_origin, set_comment) values 77s (p_set_id, v_local_node_id, p_set_comment); 77s 77s return public.createEvent('_main', 'STORE_SET', 77s p_set_id::text, v_local_node_id::text, p_set_comment::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.storeSet(p_set_id int4, p_set_comment text) is 77s 'Generate STORE_SET event for set set_id with human readable comment set_comment'; 77s COMMENT 77s create or replace function public.storeSet_int (p_set_id int4, p_set_origin int4, p_set_comment text) 77s returns int4 77s as $$ 77s declare 77s v_dummy int4; 
77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s select 1 into v_dummy 77s from public.sl_set 77s where set_id = p_set_id 77s for update; 77s if found then 77s update public.sl_set 77s set set_comment = p_set_comment 77s where set_id = p_set_id; 77s else 77s if not exists (select 1 from public.sl_node 77s where no_id = p_set_origin) then 77s perform public.storeNode_int (p_set_origin, ''); 77s end if; 77s insert into public.sl_set 77s (set_id, set_origin, set_comment) values 77s (p_set_id, p_set_origin, p_set_comment); 77s end if; 77s 77s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 77s perform public.addPartialLogIndices(); 77s 77s return p_set_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.storeSet_int(p_set_id int4, p_set_origin int4, p_set_comment text) is 77s 'storeSet_int (set_id, set_origin, set_comment) 77s 77s Process the STORE_SET event, indicating the new set with given ID, 77s origin node, and human readable comment.'; 77s COMMENT 77s create or replace function public.lockSet (p_set_id int4) 77s returns int4 77s as $$ 77s declare 77s v_local_node_id int4; 77s v_set_row record; 77s v_tab_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that the set exists and that we are the origin 77s -- and that it is not already locked. 77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s select * into v_set_row from public.sl_set 77s where set_id = p_set_id 77s for update; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_set_id; 77s end if; 77s if v_set_row.set_origin <> v_local_node_id then 77s raise exception 'Slony-I: set % does not originate on local node', 77s p_set_id; 77s end if; 77s if v_set_row.set_locked notnull then 77s raise exception 'Slony-I: set % is already locked', p_set_id; 77s end if; 77s 77s -- ---- 77s -- Place the lockedSet trigger on all tables in the set. 77s -- ---- 77s for v_tab_row in select T.tab_id, 77s public.slon_quote_brute(PGN.nspname) || '.' 
|| 77s public.slon_quote_brute(PGC.relname) as tab_fqname 77s from public.sl_table T, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 77s where T.tab_set = p_set_id 77s and T.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid 77s order by tab_id 77s loop 77s execute 'create trigger "_main_lockedset" ' || 77s 'before insert or update or delete on ' || 77s v_tab_row.tab_fqname || ' for each row execute procedure 77s public.lockedSet (''_main'');'; 77s end loop; 77s 77s -- ---- 77s -- Remember our snapshots xmax as for the set locking 77s -- ---- 77s update public.sl_set 77s set set_locked = "pg_catalog".txid_snapshot_xmax("pg_catalog".txid_current_snapshot()) 77s where set_id = p_set_id; 77s 77s return p_set_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.lockSet(p_set_id int4) is 77s 'lockSet(set_id) 77s 77s Add a special trigger to all tables of a set that disables access to 77s it.'; 77s COMMENT 77s create or replace function public.unlockSet (p_set_id int4) 77s returns int4 77s as $$ 77s declare 77s v_local_node_id int4; 77s v_set_row record; 77s v_tab_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that the set exists and that we are the origin 77s -- and that it is not already locked. 77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s select * into v_set_row from public.sl_set 77s where set_id = p_set_id 77s for update; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_set_id; 77s end if; 77s if v_set_row.set_origin <> v_local_node_id then 77s raise exception 'Slony-I: set % does not originate on local node', 77s p_set_id; 77s end if; 77s if v_set_row.set_locked isnull then 77s raise exception 'Slony-I: set % is not locked', p_set_id; 77s end if; 77s 77s -- ---- 77s -- Drop the lockedSet trigger from all tables in the set. 77s -- ---- 77s for v_tab_row in select T.tab_id, 77s public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) as tab_fqname 77s from public.sl_table T, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 77s where T.tab_set = p_set_id 77s and T.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid 77s order by tab_id 77s loop 77s execute 'drop trigger "_main_lockedset" ' || 77s 'on ' || v_tab_row.tab_fqname; 77s end loop; 77s 77s -- ---- 77s -- Clear out the set_locked field 77s -- ---- 77s update public.sl_set 77s set set_locked = NULL 77s where set_id = p_set_id; 77s 77s return p_set_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.unlockSet(p_set_id int4) is 77s 'Remove the special trigger from all tables of a set that disables access to it.'; 77s COMMENT 77s create or replace function public.moveSet (p_set_id int4, p_new_origin int4) 77s returns bigint 77s as $$ 77s declare 77s v_local_node_id int4; 77s v_set_row record; 77s v_sub_row record; 77s v_sync_seqno int8; 77s v_lv_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that the set is locked and that this locking 77s -- happened long enough ago. 
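-- The checks below verify that the set originates locally, that set_locked was
-- recorded by lockSet(), and that every transaction older than the recorded
-- set_locked txid has completed (set_locked is not greater than the current
-- snapshot's xmin); only then is the set unlocked, the new origin confirmed as an
-- active subscriber, and moveSet_int() plus a SYNC and the MOVE_SET event executed.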
77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s select * into v_set_row from public.sl_set 77s where set_id = p_set_id 77s for update; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_set_id; 77s end if; 77s if v_set_row.set_origin <> v_local_node_id then 77s raise exception 'Slony-I: set % does not originate on local node', 77s p_set_id; 77s end if; 77s if v_set_row.set_locked isnull then 77s raise exception 'Slony-I: set % is not locked', p_set_id; 77s end if; 77s if v_set_row.set_locked > "pg_catalog".txid_snapshot_xmin("pg_catalog".txid_current_snapshot()) then 77s raise exception 'Slony-I: cannot move set % yet, transactions < % are still in progress', 77s p_set_id, v_set_row.set_locked; 77s end if; 77s 77s -- ---- 77s -- Unlock the set 77s -- ---- 77s perform public.unlockSet(p_set_id); 77s 77s -- ---- 77s -- Check that the new_origin is an active subscriber of the set 77s -- ---- 77s select * into v_sub_row from public.sl_subscribe 77s where sub_set = p_set_id 77s and sub_receiver = p_new_origin; 77s if not found then 77s raise exception 'Slony-I: set % is not subscribed by node %', 77s p_set_id, p_new_origin; 77s end if; 77s if not v_sub_row.sub_active then 77s raise exception 'Slony-I: subsctiption of node % for set % is inactive', 77s p_new_origin, p_set_id; 77s end if; 77s 77s -- ---- 77s -- Reconfigure everything 77s -- ---- 77s perform public.moveSet_int(p_set_id, v_local_node_id, 77s p_new_origin, 0); 77s 77s perform public.RebuildListenEntries(); 77s 77s -- ---- 77s -- At this time we hold access exclusive locks for every table 77s -- in the set. But we did move the set to the new origin, so the 77s -- createEvent() we are doing now will not record the sequences. 77s -- ---- 77s v_sync_seqno := public.createEvent('_main', 'SYNC'); 77s insert into public.sl_seqlog 77s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 77s select seq_id, v_local_node_id, v_sync_seqno, seq_last_value 77s from public.sl_seqlastvalue 77s where seq_set = p_set_id; 77s 77s -- ---- 77s -- Finally we generate the real event 77s -- ---- 77s return public.createEvent('_main', 'MOVE_SET', 77s p_set_id::text, v_local_node_id::text, p_new_origin::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.moveSet(p_set_id int4, p_new_origin int4) is 77s 'moveSet(set_id, new_origin) 77s 77s Generate MOVE_SET event to request that the origin for set set_id be moved to node new_origin'; 77s COMMENT 77s create or replace function public.moveSet_int (p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) 77s returns int4 77s as $$ 77s declare 77s v_local_node_id int4; 77s v_tab_row record; 77s v_sub_row record; 77s v_sub_node int4; 77s v_sub_last int4; 77s v_sub_next int4; 77s v_last_sync int8; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Get our local node ID 77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s 77s -- On the new origin, raise an event - ACCEPT_SET 77s if v_local_node_id = p_new_origin then 77s -- Create a SYNC event as well so that the ACCEPT_SET has 77s -- the same snapshot as the last SYNC generated by the new 77s -- origin. This snapshot will be used by other nodes to 77s -- finalize the setsync status. 
77s perform public.createEvent('_main', 'SYNC', NULL); 77s perform public.createEvent('_main', 'ACCEPT_SET', 77s p_set_id::text, p_old_origin::text, 77s p_new_origin::text, p_wait_seqno::text); 77s end if; 77s 77s -- ---- 77s -- Next we have to reverse the subscription path 77s -- ---- 77s v_sub_last = p_new_origin; 77s select sub_provider into v_sub_node 77s from public.sl_subscribe 77s where sub_set = p_set_id 77s and sub_receiver = p_new_origin; 77s if not found then 77s raise exception 'Slony-I: subscription path broken in moveSet_int'; 77s end if; 77s while v_sub_node <> p_old_origin loop 77s -- ---- 77s -- Tracing node by node, the old receiver is now in 77s -- v_sub_last and the old provider is in v_sub_node. 77s -- ---- 77s 77s -- ---- 77s -- Get the current provider of this node as next 77s -- and change the provider to the previous one in 77s -- the reverse chain. 77s -- ---- 77s select sub_provider into v_sub_next 77s from public.sl_subscribe 77s where sub_set = p_set_id 77s and sub_receiver = v_sub_node 77s for update; 77s if not found then 77s raise exception 'Slony-I: subscription path broken in moveSet_int'; 77s end if; 77s update public.sl_subscribe 77s set sub_provider = v_sub_last 77s where sub_set = p_set_id 77s and sub_receiver = v_sub_node 77s and sub_receiver <> v_sub_last; 77s 77s v_sub_last = v_sub_node; 77s v_sub_node = v_sub_next; 77s end loop; 77s 77s -- ---- 77s -- This includes creating a subscription for the old origin 77s -- ---- 77s insert into public.sl_subscribe 77s (sub_set, sub_provider, sub_receiver, 77s sub_forward, sub_active) 77s values (p_set_id, v_sub_last, p_old_origin, true, true); 77s if v_local_node_id = p_old_origin then 77s select coalesce(max(ev_seqno), 0) into v_last_sync 77s from public.sl_event 77s where ev_origin = p_new_origin 77s and ev_type = 'SYNC'; 77s if v_last_sync > 0 then 77s insert into public.sl_setsync 77s (ssy_setid, ssy_origin, ssy_seqno, 77s ssy_snapshot, ssy_action_list) 77s select p_set_id, p_new_origin, v_last_sync, 77s ev_snapshot, NULL 77s from public.sl_event 77s where ev_origin = p_new_origin 77s and ev_seqno = v_last_sync; 77s else 77s insert into public.sl_setsync 77s (ssy_setid, ssy_origin, ssy_seqno, 77s ssy_snapshot, ssy_action_list) 77s values (p_set_id, p_new_origin, '0', 77s '1:1:', NULL); 77s end if; 77s end if; 77s 77s -- ---- 77s -- Now change the ownership of the set. 77s -- ---- 77s update public.sl_set 77s set set_origin = p_new_origin 77s where set_id = p_set_id; 77s 77s -- ---- 77s -- On the new origin, delete the obsolete setsync information 77s -- and the subscription. 77s -- ---- 77s if v_local_node_id = p_new_origin then 77s delete from public.sl_setsync 77s where ssy_setid = p_set_id; 77s else 77s if v_local_node_id <> p_old_origin then 77s -- 77s -- On every other node, change the setsync so that it will 77s -- pick up from the new origins last known sync. 
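-- Concretely: the old sl_setsync row for the set is deleted and replaced by one
-- that points at the new origin's latest SYNC event, or at a synthetic '1:1:'
-- snapshot with seqno 0 if the new origin has not produced a SYNC yet.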
77s -- 77s delete from public.sl_setsync 77s where ssy_setid = p_set_id; 77s select coalesce(max(ev_seqno), 0) into v_last_sync 77s from public.sl_event 77s where ev_origin = p_new_origin 77s and ev_type = 'SYNC'; 77s if v_last_sync > 0 then 77s insert into public.sl_setsync 77s (ssy_setid, ssy_origin, ssy_seqno, 77s ssy_snapshot, ssy_action_list) 77s select p_set_id, p_new_origin, v_last_sync, 77s ev_snapshot, NULL 77s from public.sl_event 77s where ev_origin = p_new_origin 77s and ev_seqno = v_last_sync; 77s else 77s insert into public.sl_setsync 77s (ssy_setid, ssy_origin, ssy_seqno, 77s ssy_snapshot, ssy_action_list) 77s values (p_set_id, p_new_origin, 77s '0', '1:1:', NULL); 77s end if; 77s end if; 77s end if; 77s delete from public.sl_subscribe 77s where sub_set = p_set_id 77s and sub_receiver = p_new_origin; 77s 77s -- Regenerate sl_listen since we revised the subscriptions 77s perform public.RebuildListenEntries(); 77s 77s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 77s perform public.addPartialLogIndices(); 77s 77s -- ---- 77s -- If we are the new or old origin, we have to 77s -- adjust the log and deny access trigger configuration. 77s -- ---- 77s if v_local_node_id = p_old_origin or v_local_node_id = p_new_origin then 77s for v_tab_row in select tab_id from public.sl_table 77s where tab_set = p_set_id 77s order by tab_id 77s loop 77s perform public.alterTableConfigureTriggers(v_tab_row.tab_id); 77s end loop; 77s end if; 77s 77s return p_set_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.moveSet_int(p_set_id int4, p_old_origin int4, p_new_origin int4, p_wait_seqno int8) is 77s 'moveSet(set_id, old_origin, new_origin, wait_seqno) 77s 77s Process MOVE_SET event to request that the origin for set set_id be 77s moved from old_origin to node new_origin'; 77s COMMENT 77s create or replace function public.dropSet (p_set_id int4) 77s returns bigint 77s as $$ 77s declare 77s v_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that the set exists and originates here 77s -- ---- 77s select set_origin into v_origin from public.sl_set 77s where set_id = p_set_id; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_set_id; 77s end if; 77s if v_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: set % does not originate on local node', 77s p_set_id; 77s end if; 77s 77s -- ---- 77s -- Call the internal drop set functionality and generate the event 77s -- ---- 77s perform public.dropSet_int(p_set_id); 77s return public.createEvent('_main', 'DROP_SET', 77s p_set_id::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.dropSet(p_set_id int4) is 77s 'Generate DROP_SET event to drop replication of set set_id'; 77s COMMENT 77s create or replace function public.dropSet_int (p_set_id int4) 77s returns int4 77s as $$ 77s declare 77s v_tab_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Restore all tables original triggers and rules and remove 77s -- our replication stuff. 
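-- The loop below calls alterTableDropTriggers() for every table in the set, then
-- all configuration rows for the set are deleted from sl_sequence, sl_table,
-- sl_subscribe, sl_setsync and sl_set before sl_listen is rebuilt and
-- addPartialLogIndices() is re-run.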
77s -- ---- 77s for v_tab_row in select tab_id from public.sl_table 77s where tab_set = p_set_id 77s order by tab_id 77s loop 77s perform public.alterTableDropTriggers(v_tab_row.tab_id); 77s end loop; 77s 77s -- ---- 77s -- Remove all traces of the set configuration 77s -- ---- 77s delete from public.sl_sequence 77s where seq_set = p_set_id; 77s delete from public.sl_table 77s where tab_set = p_set_id; 77s delete from public.sl_subscribe 77s where sub_set = p_set_id; 77s delete from public.sl_setsync 77s where ssy_setid = p_set_id; 77s delete from public.sl_set 77s where set_id = p_set_id; 77s 77s -- Regenerate sl_listen since we revised the subscriptions 77s perform public.RebuildListenEntries(); 77s 77s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 77s perform public.addPartialLogIndices(); 77s 77s return p_set_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.dropSet(p_set_id int4) is 77s 'Process DROP_SET event to drop replication of set set_id. This involves: 77s - Removing log and deny access triggers 77s - Removing all traces of the set configuration, including sequences, tables, subscribers, syncs, and the set itself'; 77s COMMENT 77s create or replace function public.mergeSet (p_set_id int4, p_add_id int4) 77s returns bigint 77s as $$ 77s declare 77s v_origin int4; 77s in_progress boolean; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that both sets exist and originate here 77s -- ---- 77s if p_set_id = p_add_id then 77s raise exception 'Slony-I: merged set ids cannot be identical'; 77s end if; 77s select set_origin into v_origin from public.sl_set 77s where set_id = p_set_id; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_set_id; 77s end if; 77s if v_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: set % does not originate on local node', 77s p_set_id; 77s end if; 77s 77s select set_origin into v_origin from public.sl_set 77s where set_id = p_add_id; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_add_id; 77s end if; 77s if v_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: set % does not originate on local node', 77s p_add_id; 77s end if; 77s 77s -- ---- 77s -- Check that both sets are subscribed by the same set of nodes 77s -- ---- 77s if exists (select true from public.sl_subscribe SUB1 77s where SUB1.sub_set = p_set_id 77s and SUB1.sub_receiver not in (select SUB2.sub_receiver 77s from public.sl_subscribe SUB2 77s where SUB2.sub_set = p_add_id)) 77s then 77s raise exception 'Slony-I: subscriber lists of set % and % are different', 77s p_set_id, p_add_id; 77s end if; 77s 77s if exists (select true from public.sl_subscribe SUB1 77s where SUB1.sub_set = p_add_id 77s and SUB1.sub_receiver not in (select SUB2.sub_receiver 77s from public.sl_subscribe SUB2 77s where SUB2.sub_set = p_set_id)) 77s then 77s raise exception 'Slony-I: subscriber lists of set % and % are different', 77s p_add_id, p_set_id; 77s end if; 77s 77s -- ---- 77s -- Check that all ENABLE_SUBSCRIPTION events for the set are confirmed 77s -- ---- 77s select public.isSubscriptionInProgress(p_add_id) into in_progress ; 77s 77s if in_progress then 77s raise exception 'Slony-I: set % has subscriptions in progress - cannot merge', 77s p_add_id; 77s end if; 77s 77s -- ---- 77s -- Create a SYNC event, merge the sets, create a MERGE_SET event 77s -- ---- 
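-- Illustrative only, not part of the loaded script: assuming sets 1 and 2
-- both originate on this node and have identical subscriber lists, an
-- administrator could fold set 2 into set 1 with
--   select public.mergeSet(1, 2);
-- the body below first raises a SYNC, rewrites the catalog rows via
-- mergeSet_int(), then raises the MERGE_SET event.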
77s perform public.createEvent('_main', 'SYNC', NULL); 77s perform public.mergeSet_int(p_set_id, p_add_id); 77s return public.createEvent('_main', 'MERGE_SET', 77s p_set_id::text, p_add_id::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.mergeSet(p_set_id int4, p_add_id int4) is 77s 'Generate MERGE_SET event to request that sets be merged together. 77s 77s Both sets must exist, and originate on the same node. They must be 77s subscribed by the same set of nodes.'; 77s COMMENT 77s create or replace function public.isSubscriptionInProgress(p_add_id int4) 77s returns boolean 77s as $$ 77s begin 77s if exists (select true from public.sl_event 77s where ev_type = 'ENABLE_SUBSCRIPTION' 77s and ev_data1 = p_add_id::text 77s and ev_seqno > (select max(con_seqno) from public.sl_confirm 77s where con_origin = ev_origin 77s and con_received::text = ev_data3)) 77s then 77s return true; 77s else 77s return false; 77s end if; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.isSubscriptionInProgress(p_add_id int4) is 77s 'Checks to see if a subscription for the indicated set is in progress. 77s Returns true if a subscription is in progress. Otherwise false'; 77s COMMENT 77s create or replace function public.mergeSet_int (p_set_id int4, p_add_id int4) 77s returns int4 77s as $$ 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s update public.sl_sequence 77s set seq_set = p_set_id 77s where seq_set = p_add_id; 77s update public.sl_table 77s set tab_set = p_set_id 77s where tab_set = p_add_id; 77s delete from public.sl_subscribe 77s where sub_set = p_add_id; 77s delete from public.sl_setsync 77s where ssy_setid = p_add_id; 77s delete from public.sl_set 77s where set_id = p_add_id; 77s 77s return p_set_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.mergeSet_int(p_set_id int4, p_add_id int4) is 77s 'mergeSet_int(set_id, add_id) - Perform MERGE_SET event, merging all objects from 77s set add_id into set set_id.'; 77s COMMENT 77s create or replace function public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 77s returns bigint 77s as $$ 77s declare 77s v_set_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that we are the origin of the set 77s -- ---- 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = p_set_id; 77s if not found then 77s raise exception 'Slony-I: setAddTable(): set % not found', p_set_id; 77s end if; 77s if v_set_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: setAddTable(): set % has remote origin', p_set_id; 77s end if; 77s 77s if exists (select true from public.sl_subscribe 77s where sub_set = p_set_id) 77s then 77s raise exception 'Slony-I: cannot add table to currently subscribed set % - must attach to an unsubscribed set', 77s p_set_id; 77s end if; 77s 77s -- ---- 77s -- Add the table to the set and generate the SET_ADD_TABLE event 77s -- ---- 77s perform public.setAddTable_int(p_set_id, p_tab_id, p_fqname, 77s p_tab_idxname, p_tab_comment); 77s return public.createEvent('_main', 'SET_ADD_TABLE', 77s p_set_id::text, p_tab_id::text, p_fqname::text, 77s p_tab_idxname::text, p_tab_comment::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function 
public.setAddTable(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 77s 'setAddTable (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 77s 77s Add table tab_fqname to replication set on origin node, and generate 77s SET_ADD_TABLE event to allow this to propagate to other nodes. 77s 77s Note that the table id, tab_id, must be unique ACROSS ALL SETS.'; 77s COMMENT 77s create or replace function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) 77s returns int4 77s as $$ 77s declare 77s v_tab_relname name; 77s v_tab_nspname name; 77s v_local_node_id int4; 77s v_set_origin int4; 77s v_sub_provider int4; 77s v_relkind char; 77s v_tab_reloid oid; 77s v_pkcand_nn boolean; 77s v_prec record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- For sets with a remote origin, check that we are subscribed 77s -- to that set. Otherwise we ignore the table because it might 77s -- not even exist in our database. 77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = p_set_id; 77s if not found then 77s raise exception 'Slony-I: setAddTable_int(): set % not found', 77s p_set_id; 77s end if; 77s if v_set_origin != v_local_node_id then 77s select sub_provider into v_sub_provider 77s from public.sl_subscribe 77s where sub_set = p_set_id 77s and sub_receiver = public.getLocalNodeId('_main'); 77s if not found then 77s return 0; 77s end if; 77s end if; 77s 77s -- ---- 77s -- Get the tables OID and check that it is a real table 77s -- ---- 77s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname into v_tab_reloid, v_relkind, v_tab_relname, v_tab_nspname 77s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 77s where PGC.relnamespace = PGN.oid 77s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 77s '.' 
|| public.slon_quote_brute(PGC.relname); 77s if not found then 77s raise exception 'Slony-I: setAddTable_int(): table % not found', 77s p_fqname; 77s end if; 77s if v_relkind != 'r' then 77s raise exception 'Slony-I: setAddTable_int(): % is not a regular table', 77s p_fqname; 77s end if; 77s 77s if not exists (select indexrelid 77s from "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGC 77s where PGX.indrelid = v_tab_reloid 77s and PGX.indexrelid = PGC.oid 77s and PGC.relname = p_tab_idxname) 77s then 77s raise exception 'Slony-I: setAddTable_int(): table % has no index %', 77s p_fqname, p_tab_idxname; 77s end if; 77s 77s -- ---- 77s -- Verify that the columns in the PK (or candidate) are not NULLABLE 77s -- ---- 77s 77s v_pkcand_nn := 'f'; 77s for v_prec in select attname from "pg_catalog".pg_attribute where attrelid = 77s (select oid from "pg_catalog".pg_class where oid = v_tab_reloid) 77s and attname in (select attname from "pg_catalog".pg_attribute where 77s attrelid = (select oid from "pg_catalog".pg_class PGC, 77s "pg_catalog".pg_index PGX where 77s PGC.relname = p_tab_idxname and PGX.indexrelid=PGC.oid and 77s PGX.indrelid = v_tab_reloid)) and attnotnull <> 't' 77s loop 77s raise notice 'Slony-I: setAddTable_int: table % PK column % nullable', p_fqname, v_prec.attname; 77s v_pkcand_nn := 't'; 77s end loop; 77s if v_pkcand_nn then 77s raise exception 'Slony-I: setAddTable_int: table % not replicable!', p_fqname; 77s end if; 77s 77s select * into v_prec from public.sl_table where tab_id = p_tab_id; 77s if not found then 77s v_pkcand_nn := 't'; -- No-op -- All is well 77s else 77s raise exception 'Slony-I: setAddTable_int: table id % has already been assigned!', p_tab_id; 77s end if; 77s 77s -- ---- 77s -- Add the table to sl_table and create the trigger on it. 
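-- Illustrative only, not part of the loaded script: on the origin this code
-- is normally reached through the wrapper shown earlier, for example
--   select public.setAddTable(1, 1, 'public.accounts', 'accounts_pkey', 'accounts table');
-- where the set id, table id, table name and index name are assumed values;
-- the wrapper validates them and raises SET_ADD_TABLE, while this function
-- records the row in sl_table and attaches the replication triggers.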
77s -- ---- 77s insert into public.sl_table 77s (tab_id, tab_reloid, tab_relname, tab_nspname, 77s tab_set, tab_idxname, tab_altered, tab_comment) 77s values 77s (p_tab_id, v_tab_reloid, v_tab_relname, v_tab_nspname, 77s p_set_id, p_tab_idxname, false, p_tab_comment); 77s perform public.alterTableAddTriggers(p_tab_id); 77s 77s return p_tab_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setAddTable_int(p_set_id int4, p_tab_id int4, p_fqname text, p_tab_idxname name, p_tab_comment text) is 77s 'setAddTable_int (set_id, tab_id, tab_fqname, tab_idxname, tab_comment) 77s 77s This function processes the SET_ADD_TABLE event on remote nodes, 77s adding a table to replication if the remote node is subscribing to its 77s replication set.'; 77s COMMENT 77s create or replace function public.setDropTable(p_tab_id int4) 77s returns bigint 77s as $$ 77s declare 77s v_set_id int4; 77s v_set_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Determine the set_id 77s -- ---- 77s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 77s 77s -- ---- 77s -- Ensure table exists 77s -- ---- 77s if not found then 77s raise exception 'Slony-I: setDropTable_int(): table % not found', 77s p_tab_id; 77s end if; 77s 77s -- ---- 77s -- Check that we are the origin of the set 77s -- ---- 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = v_set_id; 77s if not found then 77s raise exception 'Slony-I: setDropTable(): set % not found', v_set_id; 77s end if; 77s if v_set_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: setDropTable(): set % has remote origin', v_set_id; 77s end if; 77s 77s -- ---- 77s -- Drop the table from the set and generate the SET_ADD_TABLE event 77s -- ---- 77s perform public.setDropTable_int(p_tab_id); 77s return public.createEvent('_main', 'SET_DROP_TABLE', 77s p_tab_id::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setDropTable(p_tab_id int4) is 77s 'setDropTable (tab_id) 77s 77s Drop table tab_id from set on origin node, and generate SET_DROP_TABLE 77s event to allow this to propagate to other nodes.'; 77s COMMENT 77s create or replace function public.setDropTable_int(p_tab_id int4) 77s returns int4 77s as $$ 77s declare 77s v_set_id int4; 77s v_local_node_id int4; 77s v_set_origin int4; 77s v_sub_provider int4; 77s v_tab_reloid oid; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Determine the set_id 77s -- ---- 77s select tab_set into v_set_id from public.sl_table where tab_id = p_tab_id; 77s 77s -- ---- 77s -- Ensure table exists 77s -- ---- 77s if not found then 77s return 0; 77s end if; 77s 77s -- ---- 77s -- For sets with a remote origin, check that we are subscribed 77s -- to that set. Otherwise we ignore the table because it might 77s -- not even exist in our database. 
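-- Illustrative note, not part of the loaded script: the remote-origin guard
-- below means a node that neither originates nor subscribes to the set
-- simply returns 0 when it processes SET_DROP_TABLE, e.g.
--   select public.setDropTable_int(1);   -- assumed table id, returns 0 here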
77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = v_set_id; 77s if not found then 77s raise exception 'Slony-I: setDropTable_int(): set % not found', 77s v_set_id; 77s end if; 77s if v_set_origin != v_local_node_id then 77s select sub_provider into v_sub_provider 77s from public.sl_subscribe 77s where sub_set = v_set_id 77s and sub_receiver = public.getLocalNodeId('_main'); 77s if not found then 77s return 0; 77s end if; 77s end if; 77s 77s -- ---- 77s -- Drop the table from sl_table and drop trigger from it. 77s -- ---- 77s perform public.alterTableDropTriggers(p_tab_id); 77s delete from public.sl_table where tab_id = p_tab_id; 77s return p_tab_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setDropTable_int(p_tab_id int4) is 77s 'setDropTable_int (tab_id) 77s 77s This function processes the SET_DROP_TABLE event on remote nodes, 77s dropping a table from replication if the remote node is subscribing to 77s its replication set.'; 77s COMMENT 77s create or replace function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 77s returns bigint 77s as $$ 77s declare 77s v_set_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that we are the origin of the set 77s -- ---- 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = p_set_id; 77s if not found then 77s raise exception 'Slony-I: setAddSequence(): set % not found', p_set_id; 77s end if; 77s if v_set_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: setAddSequence(): set % has remote origin - submit to origin node', p_set_id; 77s end if; 77s 77s if exists (select true from public.sl_subscribe 77s where sub_set = p_set_id) 77s then 77s raise exception 'Slony-I: cannot add sequence to currently subscribed set %', 77s p_set_id; 77s end if; 77s 77s -- ---- 77s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 77s -- ---- 77s perform public.setAddSequence_int(p_set_id, p_seq_id, p_fqname, 77s p_seq_comment); 77s return public.createEvent('_main', 'SET_ADD_SEQUENCE', 77s p_set_id::text, p_seq_id::text, 77s p_fqname::text, p_seq_comment::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setAddSequence (p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 77s 'setAddSequence (set_id, seq_id, seq_fqname, seq_comment) 77s 77s On the origin node for set set_id, add sequence seq_fqname to the 77s replication set, and raise SET_ADD_SEQUENCE to cause this to replicate 77s to subscriber nodes.'; 77s COMMENT 77s create or replace function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) 77s returns int4 77s as $$ 77s declare 77s v_local_node_id int4; 77s v_set_origin int4; 77s v_sub_provider int4; 77s v_relkind char; 77s v_seq_reloid oid; 77s v_seq_relname name; 77s v_seq_nspname name; 77s v_sync_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- For sets with a remote origin, check that we are subscribed 77s -- to that set. Otherwise we ignore the sequence because it might 77s -- not even exist in our database. 
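-- Illustrative only, not part of the loaded script: on the origin this is
-- normally reached through the wrapper shown above, for example
--   select public.setAddSequence(1, 1, 'public.accounts_id_seq', 'id sequence');
-- with assumed set id, sequence id and sequence name; the same
-- remote-origin guard as for tables applies below.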
77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = p_set_id; 77s if not found then 77s raise exception 'Slony-I: setAddSequence_int(): set % not found', 77s p_set_id; 77s end if; 77s if v_set_origin != v_local_node_id then 77s select sub_provider into v_sub_provider 77s from public.sl_subscribe 77s where sub_set = p_set_id 77s and sub_receiver = public.getLocalNodeId('_main'); 77s if not found then 77s return 0; 77s end if; 77s end if; 77s 77s -- ---- 77s -- Get the sequences OID and check that it is a sequence 77s -- ---- 77s select PGC.oid, PGC.relkind, PGC.relname, PGN.nspname 77s into v_seq_reloid, v_relkind, v_seq_relname, v_seq_nspname 77s from "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 77s where PGC.relnamespace = PGN.oid 77s and public.slon_quote_input(p_fqname) = public.slon_quote_brute(PGN.nspname) || 77s '.' || public.slon_quote_brute(PGC.relname); 77s if not found then 77s raise exception 'Slony-I: setAddSequence_int(): sequence % not found', 77s p_fqname; 77s end if; 77s if v_relkind != 'S' then 77s raise exception 'Slony-I: setAddSequence_int(): % is not a sequence', 77s p_fqname; 77s end if; 77s 77s select 1 into v_sync_row from public.sl_sequence where seq_id = p_seq_id; 77s if not found then 77s v_relkind := 'o'; -- all is OK 77s else 77s raise exception 'Slony-I: setAddSequence_int(): sequence ID % has already been assigned', p_seq_id; 77s end if; 77s 77s -- ---- 77s -- Add the sequence to sl_sequence 77s -- ---- 77s insert into public.sl_sequence 77s (seq_id, seq_reloid, seq_relname, seq_nspname, seq_set, seq_comment) 77s values 77s (p_seq_id, v_seq_reloid, v_seq_relname, v_seq_nspname, p_set_id, p_seq_comment); 77s 77s -- ---- 77s -- On the set origin, fake a sl_seqlog row for the last sync event 77s -- ---- 77s if v_set_origin = v_local_node_id then 77s for v_sync_row in select coalesce (max(ev_seqno), 0) as ev_seqno 77s from public.sl_event 77s where ev_origin = v_local_node_id 77s and ev_type = 'SYNC' 77s loop 77s insert into public.sl_seqlog 77s (seql_seqid, seql_origin, seql_ev_seqno, 77s seql_last_value) values 77s (p_seq_id, v_local_node_id, v_sync_row.ev_seqno, 77s public.sequenceLastValue(p_fqname)); 77s end loop; 77s end if; 77s 77s return p_seq_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setAddSequence_int(p_set_id int4, p_seq_id int4, p_fqname text, p_seq_comment text) is 77s 'setAddSequence_int (set_id, seq_id, seq_fqname, seq_comment) 77s 77s This processes the SET_ADD_SEQUENCE event. 
On remote nodes that 77s subscribe to set_id, add the sequence to the replication set.'; 77s COMMENT 77s create or replace function public.setDropSequence (p_seq_id int4) 77s returns bigint 77s as $$ 77s declare 77s v_set_id int4; 77s v_set_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Determine set id for this sequence 77s -- ---- 77s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 77s 77s -- ---- 77s -- Ensure sequence exists 77s -- ---- 77s if not found then 77s raise exception 'Slony-I: setDropSequence_int(): sequence % not found', 77s p_seq_id; 77s end if; 77s 77s -- ---- 77s -- Check that we are the origin of the set 77s -- ---- 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = v_set_id; 77s if not found then 77s raise exception 'Slony-I: setDropSequence(): set % not found', v_set_id; 77s end if; 77s if v_set_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: setDropSequence(): set % has origin at another node - submit this to that node', v_set_id; 77s end if; 77s 77s -- ---- 77s -- Add the sequence to the set and generate the SET_ADD_SEQUENCE event 77s -- ---- 77s perform public.setDropSequence_int(p_seq_id); 77s return public.createEvent('_main', 'SET_DROP_SEQUENCE', 77s p_seq_id::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setDropSequence (p_seq_id int4) is 77s 'setDropSequence (seq_id) 77s 77s On the origin node for the set, drop sequence seq_id from replication 77s set, and raise SET_DROP_SEQUENCE to cause this to replicate to 77s subscriber nodes.'; 77s COMMENT 77s create or replace function public.setDropSequence_int(p_seq_id int4) 77s returns int4 77s as $$ 77s declare 77s v_set_id int4; 77s v_local_node_id int4; 77s v_set_origin int4; 77s v_sub_provider int4; 77s v_relkind char; 77s v_sync_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Determine set id for this sequence 77s -- ---- 77s select seq_set into v_set_id from public.sl_sequence where seq_id = p_seq_id; 77s 77s -- ---- 77s -- Ensure sequence exists 77s -- ---- 77s if not found then 77s return 0; 77s end if; 77s 77s -- ---- 77s -- For sets with a remote origin, check that we are subscribed 77s -- to that set. Otherwise we ignore the sequence because it might 77s -- not even exist in our database. 77s -- ---- 77s v_local_node_id := public.getLocalNodeId('_main'); 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = v_set_id; 77s if not found then 77s raise exception 'Slony-I: setDropSequence_int(): set % not found', 77s v_set_id; 77s end if; 77s if v_set_origin != v_local_node_id then 77s select sub_provider into v_sub_provider 77s from public.sl_subscribe 77s where sub_set = v_set_id 77s and sub_receiver = public.getLocalNodeId('_main'); 77s if not found then 77s return 0; 77s end if; 77s end if; 77s 77s -- ---- 77s -- drop the sequence from sl_sequence, sl_seqlog 77s -- ---- 77s delete from public.sl_seqlog where seql_seqid = p_seq_id; 77s delete from public.sl_sequence where seq_id = p_seq_id; 77s 77s return p_seq_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setDropSequence_int(p_seq_id int4) is 77s 'setDropSequence_int (seq_id) 77s 77s This processes the SET_DROP_SEQUENCE event. 
On remote nodes that 77s subscribe to the set containing sequence seq_id, drop the sequence 77s from the replication set.'; 77s COMMENT 77s create or replace function public.setMoveTable (p_tab_id int4, p_new_set_id int4) 77s returns bigint 77s as $$ 77s declare 77s v_old_set_id int4; 77s v_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Get the tables current set 77s -- ---- 77s select tab_set into v_old_set_id from public.sl_table 77s where tab_id = p_tab_id; 77s if not found then 77s raise exception 'Slony-I: table %d not found', p_tab_id; 77s end if; 77s 77s -- ---- 77s -- Check that both sets exist and originate here 77s -- ---- 77s if p_new_set_id = v_old_set_id then 77s raise exception 'Slony-I: set ids cannot be identical'; 77s end if; 77s select set_origin into v_origin from public.sl_set 77s where set_id = p_new_set_id; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_new_set_id; 77s end if; 77s if v_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: set % does not originate on local node', 77s p_new_set_id; 77s end if; 77s 77s select set_origin into v_origin from public.sl_set 77s where set_id = v_old_set_id; 77s if not found then 77s raise exception 'Slony-I: set % not found', v_old_set_id; 77s end if; 77s if v_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: set % does not originate on local node', 77s v_old_set_id; 77s end if; 77s 77s -- ---- 77s -- Check that both sets are subscribed by the same set of nodes 77s -- ---- 77s if exists (select true from public.sl_subscribe SUB1 77s where SUB1.sub_set = p_new_set_id 77s and SUB1.sub_receiver not in (select SUB2.sub_receiver 77s from public.sl_subscribe SUB2 77s where SUB2.sub_set = v_old_set_id)) 77s then 77s raise exception 'Slony-I: subscriber lists of set % and % are different', 77s p_new_set_id, v_old_set_id; 77s end if; 77s 77s if exists (select true from public.sl_subscribe SUB1 77s where SUB1.sub_set = v_old_set_id 77s and SUB1.sub_receiver not in (select SUB2.sub_receiver 77s from public.sl_subscribe SUB2 77s where SUB2.sub_set = p_new_set_id)) 77s then 77s raise exception 'Slony-I: subscriber lists of set % and % are different', 77s v_old_set_id, p_new_set_id; 77s end if; 77s 77s -- ---- 77s -- Change the set the table belongs to 77s -- ---- 77s perform public.createEvent('_main', 'SYNC', NULL); 77s perform public.setMoveTable_int(p_tab_id, p_new_set_id); 77s return public.createEvent('_main', 'SET_MOVE_TABLE', 77s p_tab_id::text, p_new_set_id::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 77s 'This generates the SET_MOVE_TABLE event. 
If the set that the table is 77s in is identically subscribed to the set that the table is to be moved 77s into, then the SET_MOVE_TABLE event is raised.'; 77s COMMENT 77s create or replace function public.setMoveTable_int (p_tab_id int4, p_new_set_id int4) 77s returns int4 77s as $$ 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Move the table to the new set 77s -- ---- 77s update public.sl_table 77s set tab_set = p_new_set_id 77s where tab_id = p_tab_id; 77s 77s return p_tab_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setMoveTable(p_tab_id int4, p_new_set_id int4) is 77s 'This processes the SET_MOVE_TABLE event. The table is moved 77s to the destination set.'; 77s COMMENT 77s create or replace function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) 77s returns bigint 77s as $$ 77s declare 77s v_old_set_id int4; 77s v_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Get the sequences current set 77s -- ---- 77s select seq_set into v_old_set_id from public.sl_sequence 77s where seq_id = p_seq_id; 77s if not found then 77s raise exception 'Slony-I: setMoveSequence(): sequence %d not found', p_seq_id; 77s end if; 77s 77s -- ---- 77s -- Check that both sets exist and originate here 77s -- ---- 77s if p_new_set_id = v_old_set_id then 77s raise exception 'Slony-I: setMoveSequence(): set ids cannot be identical'; 77s end if; 77s select set_origin into v_origin from public.sl_set 77s where set_id = p_new_set_id; 77s if not found then 77s raise exception 'Slony-I: setMoveSequence(): set % not found', p_new_set_id; 77s end if; 77s if v_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: setMoveSequence(): set % does not originate on local node', 77s p_new_set_id; 77s end if; 77s 77s select set_origin into v_origin from public.sl_set 77s where set_id = v_old_set_id; 77s if not found then 77s raise exception 'Slony-I: set % not found', v_old_set_id; 77s end if; 77s if v_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: set % does not originate on local node', 77s v_old_set_id; 77s end if; 77s 77s -- ---- 77s -- Check that both sets are subscribed by the same set of nodes 77s -- ---- 77s if exists (select true from public.sl_subscribe SUB1 77s where SUB1.sub_set = p_new_set_id 77s and SUB1.sub_receiver not in (select SUB2.sub_receiver 77s from public.sl_subscribe SUB2 77s where SUB2.sub_set = v_old_set_id)) 77s then 77s raise exception 'Slony-I: subscriber lists of set % and % are different', 77s p_new_set_id, v_old_set_id; 77s end if; 77s 77s if exists (select true from public.sl_subscribe SUB1 77s where SUB1.sub_set = v_old_set_id 77s and SUB1.sub_receiver not in (select SUB2.sub_receiver 77s from public.sl_subscribe SUB2 77s where SUB2.sub_set = p_new_set_id)) 77s then 77s raise exception 'Slony-I: subscriber lists of set % and % are different', 77s v_old_set_id, p_new_set_id; 77s end if; 77s 77s -- ---- 77s -- Change the set the sequence belongs to 77s -- ---- 77s perform public.setMoveSequence_int(p_seq_id, p_new_set_id); 77s return public.createEvent('_main', 'SET_MOVE_SEQUENCE', 77s p_seq_id::text, p_new_set_id::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setMoveSequence (p_seq_id int4, p_new_set_id int4) is 77s 'setMoveSequence(p_seq_id, p_new_set_id) - 
This generates the 77s SET_MOVE_SEQUENCE event, after validation, notably that both sets 77s exist, are distinct, and have exactly the same subscription lists'; 77s COMMENT 77s create or replace function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) 77s returns int4 77s as $$ 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Move the sequence to the new set 77s -- ---- 77s update public.sl_sequence 77s set seq_set = p_new_set_id 77s where seq_id = p_seq_id; 77s 77s return p_seq_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setMoveSequence_int (p_seq_id int4, p_new_set_id int4) is 77s 'setMoveSequence_int(p_seq_id, p_new_set_id) - processes the 77s SET_MOVE_SEQUENCE event, moving a sequence to another replication 77s set.'; 77s COMMENT 77s create or replace function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) returns int4 77s as $$ 77s declare 77s v_fqname text; 77s v_found integer; 77s begin 77s -- ---- 77s -- Get the sequences fully qualified name 77s -- ---- 77s select public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) into v_fqname 77s from public.sl_sequence SQ, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 77s where SQ.seq_id = p_seq_id 77s and SQ.seq_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid; 77s if not found then 77s if p_ignore_missing then 77s return null; 77s end if; 77s raise exception 'Slony-I: sequenceSetValue(): sequence % not found', p_seq_id; 77s end if; 77s 77s -- ---- 77s -- Update it to the new value 77s -- ---- 77s execute 'select setval(''' || v_fqname || 77s ''', ' || p_last_value::text || ')'; 77s 77s if p_ev_seqno is not null then 77s insert into public.sl_seqlog 77s (seql_seqid, seql_origin, seql_ev_seqno, seql_last_value) 77s values (p_seq_id, p_seq_origin, p_ev_seqno, p_last_value); 77s end if; 77s return p_seq_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.sequenceSetValue(p_seq_id int4, p_seq_origin int4, p_ev_seqno int8, p_last_value int8,p_ignore_missing bool) is 77s 'sequenceSetValue (seq_id, seq_origin, ev_seqno, last_value,ignore_missing) 77s Set sequence seq_id to have new value last_value. 
77s '; 77s COMMENT 77s drop function if exists public.ddlCapture (p_statement text, p_nodes text); 77s DROP FUNCTION 77s create or replace function public.ddlCapture (p_statement text, p_nodes text) 77s returns bigint 77s as $$ 77s declare 77s c_local_node integer; 77s c_found_origin boolean; 77s c_node text; 77s c_cmdargs text[]; 77s c_nodeargs text; 77s c_delim text; 77s begin 77s c_local_node := public.getLocalNodeId('_main'); 77s 77s c_cmdargs = array_append('{}'::text[], p_statement); 77s c_nodeargs = ''; 77s if p_nodes is not null then 77s c_found_origin := 'f'; 77s -- p_nodes list needs to consist of a list of nodes that exist 77s -- and that include the current node ID 77s for c_node in select trim(node) from 77s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 77s if not exists 77s (select 1 from public.sl_node 77s where no_id = (c_node::integer)) then 77s raise exception 'ddlcapture(%,%) - node % does not exist!', 77s p_statement, p_nodes, c_node; 77s end if; 77s 77s if c_local_node = (c_node::integer) then 77s c_found_origin := 't'; 77s end if; 77s if length(c_nodeargs)>0 then 77s c_nodeargs = c_nodeargs ||','|| c_node; 77s else 77s c_nodeargs=c_node; 77s end if; 77s end loop; 77s 77s if not c_found_origin then 77s raise exception 77s 'ddlcapture(%,%) - origin node % not included in ONLY ON list!', 77s p_statement, p_nodes, c_local_node; 77s end if; 77s end if; 77s c_cmdargs = array_append(c_cmdargs,c_nodeargs); 77s c_delim=','; 77s c_cmdargs = array_append(c_cmdargs, 77s 77s (select public.string_agg( seq_id::text || c_delim 77s || c_local_node || 77s c_delim || seq_last_value) 77s FROM ( 77s select seq_id, 77s seq_last_value from public.sl_seqlastvalue 77s where seq_origin = c_local_node) as FOO 77s where NOT public.seqtrack(seq_id,seq_last_value) is NULL)); 77s insert into public.sl_log_script 77s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 77s values 77s (c_local_node, pg_catalog.txid_current(), 77s nextval('public.sl_action_seq'), 'S', c_cmdargs); 77s execute p_statement; 77s return currval('public.sl_action_seq'); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.ddlCapture (p_statement text, p_nodes text) is 77s 'Capture an SQL statement (usually DDL) that is to be literally replayed on subscribers'; 77s COMMENT 77s drop function if exists public.ddlScript_complete (int4, text, int4); 77s DROP FUNCTION 77s create or replace function public.ddlScript_complete (p_nodes text) 77s returns bigint 77s as $$ 77s declare 77s c_local_node integer; 77s c_found_origin boolean; 77s c_node text; 77s c_cmdargs text[]; 77s begin 77s c_local_node := public.getLocalNodeId('_main'); 77s 77s c_cmdargs = '{}'::text[]; 77s if p_nodes is not null then 77s c_found_origin := 'f'; 77s -- p_nodes list needs to consist of a list of nodes that exist 77s -- and that include the current node ID 77s for c_node in select trim(node) from 77s pg_catalog.regexp_split_to_table(p_nodes, ',') as node loop 77s if not exists 77s (select 1 from public.sl_node 77s where no_id = (c_node::integer)) then 77s raise exception 'ddlScript_complete(%) - node % does not exist!', 77s p_nodes, c_node; 77s end if; 77s 77s if c_local_node = (c_node::integer) then 77s c_found_origin := 't'; 77s end if; 77s 77s c_cmdargs = array_append(c_cmdargs, c_node); 77s end loop; 77s 77s if not c_found_origin then 77s raise exception 77s 'ddlScript_complete(%) - origin node % not included in ONLY ON list!', 77s p_nodes, c_local_node; 77s end if; 77s end if; 77s 77s
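-- Illustrative only, not part of the loaded script: a typical captured-DDL
-- sequence on the origin, with an assumed node list '1,2', might be
--   select public.ddlCapture('alter table public.accounts add column note text', '1,2');
--   select public.ddlScript_complete('1,2');
-- the second call runs ddlScript_complete_int() locally and then queues the
-- matching 's' row in sl_log_script for the subscribers, as done below.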
perform public.ddlScript_complete_int(); 77s 77s insert into public.sl_log_script 77s (log_origin, log_txid, log_actionseq, log_cmdtype, log_cmdargs) 77s values 77s (c_local_node, pg_catalog.txid_current(), 77s nextval('public.sl_action_seq'), 's', c_cmdargs); 77s 77s return currval('public.sl_action_seq'); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.ddlScript_complete(p_nodes text) is 77s 'ddlScript_complete(p_nodes) 77s 77s After the script has run on the origin, this fixes up relnames and 77s log trigger arguments and inserts the "fire ddlScript_complete_int()" 77s log row into sl_log_script.'; 77s COMMENT 77s drop function if exists public.ddlScript_complete_int(int4, int4); 77s DROP FUNCTION 77s create or replace function public.ddlScript_complete_int () 77s returns int4 77s as $$ 77s begin 77s perform public.updateRelname(); 77s perform public.repair_log_triggers(true); 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.ddlScript_complete_int() is 77s 'ddlScript_complete_int() 77s 77s Complete processing the DDL_SCRIPT event.'; 77s COMMENT 77s create or replace function public.alterTableAddTriggers (p_tab_id int4) 77s returns int4 77s as $$ 77s declare 77s v_no_id int4; 77s v_tab_row record; 77s v_tab_fqname text; 77s v_tab_attkind text; 77s v_n int4; 77s v_trec record; 77s v_tgbad boolean; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Get our local node ID 77s -- ---- 77s v_no_id := public.getLocalNodeId('_main'); 77s 77s -- ---- 77s -- Get the sl_table row and the current origin of the table. 77s -- ---- 77s select T.tab_reloid, T.tab_set, T.tab_idxname, 77s S.set_origin, PGX.indexrelid, 77s public.slon_quote_brute(PGN.nspname) || '.'
|| 77s public.slon_quote_brute(PGC.relname) as tab_fqname 77s into v_tab_row 77s from public.sl_table T, public.sl_set S, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 77s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 77s where T.tab_id = p_tab_id 77s and T.tab_set = S.set_id 77s and T.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid 77s and PGX.indrelid = T.tab_reloid 77s and PGX.indexrelid = PGXC.oid 77s and PGXC.relname = T.tab_idxname 77s for update; 77s if not found then 77s raise exception 'Slony-I: alterTableAddTriggers(): Table with id % not found', p_tab_id; 77s end if; 77s v_tab_fqname = v_tab_row.tab_fqname; 77s 77s v_tab_attkind := public.determineAttKindUnique(v_tab_row.tab_fqname, 77s v_tab_row.tab_idxname); 77s 77s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 77s 77s -- ---- 77s -- Create the log and the deny access triggers 77s -- ---- 77s execute 'create trigger "_main_logtrigger"' || 77s ' after insert or update or delete on ' || 77s v_tab_fqname || ' for each row execute procedure public.logTrigger (' || 77s pg_catalog.quote_literal('_main') || ',' || 77s pg_catalog.quote_literal(p_tab_id::text) || ',' || 77s pg_catalog.quote_literal(v_tab_attkind) || ');'; 77s 77s execute 'create trigger "_main_denyaccess" ' || 77s 'before insert or update or delete on ' || 77s v_tab_fqname || ' for each row execute procedure ' || 77s 'public.denyAccess (' || pg_catalog.quote_literal('_main') || ');'; 77s 77s perform public.alterTableAddTruncateTrigger(v_tab_fqname, p_tab_id); 77s 77s perform public.alterTableConfigureTriggers (p_tab_id); 77s return p_tab_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.alterTableAddTriggers(p_tab_id int4) is 77s 'alterTableAddTriggers(tab_id) 77s 77s Adds the log and deny access triggers to a replicated table.'; 77s COMMENT 77s create or replace function public.alterTableDropTriggers (p_tab_id int4) 77s returns int4 77s as $$ 77s declare 77s v_no_id int4; 77s v_tab_row record; 77s v_tab_fqname text; 77s v_n int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Get our local node ID 77s -- ---- 77s v_no_id := public.getLocalNodeId('_main'); 77s 77s -- ---- 77s -- Get the sl_table row and the current tables origin. 77s -- ---- 77s select T.tab_reloid, T.tab_set, 77s S.set_origin, PGX.indexrelid, 77s public.slon_quote_brute(PGN.nspname) || '.' 
|| 77s public.slon_quote_brute(PGC.relname) as tab_fqname 77s into v_tab_row 77s from public.sl_table T, public.sl_set S, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 77s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 77s where T.tab_id = p_tab_id 77s and T.tab_set = S.set_id 77s and T.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid 77s and PGX.indrelid = T.tab_reloid 77s and PGX.indexrelid = PGXC.oid 77s and PGXC.relname = T.tab_idxname 77s for update; 77s if not found then 77s raise exception 'Slony-I: alterTableDropTriggers(): Table with id % not found', p_tab_id; 77s end if; 77s v_tab_fqname = v_tab_row.tab_fqname; 77s 77s execute 'lock table ' || v_tab_fqname || ' in access exclusive mode'; 77s 77s -- ---- 77s -- Drop both triggers 77s -- ---- 77s NOTICE: function public.subscribeset_int(int4,int4,int4,bool,bool) does not exist, skipping 77s NOTICE: function public.unsubscribeset(int4,int4,pg_catalog.bool) does not exist, skipping 77s NOTICE: function public.updaterelname(int4,int4) does not exist, skipping 77s NOTICE: function public.updatereloid(int4,int4) does not exist, skipping 77s execute 'drop trigger "_main_logtrigger" on ' || 77s v_tab_fqname; 77s 77s execute 'drop trigger "_main_denyaccess" on ' || 77s v_tab_fqname; 77s 77s perform public.alterTableDropTruncateTrigger(v_tab_fqname, p_tab_id); 77s 77s return p_tab_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.alterTableDropTriggers (p_tab_id int4) is 77s 'alterTableDropTriggers (tab_id) 77s 77s Remove the log and deny access triggers from a table.'; 77s COMMENT 77s create or replace function public.alterTableConfigureTriggers (p_tab_id int4) 77s returns int4 77s as $$ 77s declare 77s v_no_id int4; 77s v_tab_row record; 77s v_tab_fqname text; 77s v_n int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Get our local node ID 77s -- ---- 77s v_no_id := public.getLocalNodeId('_main'); 77s 77s -- ---- 77s -- Get the sl_table row and the current tables origin. 77s -- ---- 77s select T.tab_reloid, T.tab_set, 77s S.set_origin, PGX.indexrelid, 77s public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) as tab_fqname 77s into v_tab_row 77s from public.sl_table T, public.sl_set S, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN, 77s "pg_catalog".pg_index PGX, "pg_catalog".pg_class PGXC 77s where T.tab_id = p_tab_id 77s and T.tab_set = S.set_id 77s and T.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid 77s and PGX.indrelid = T.tab_reloid 77s and PGX.indexrelid = PGXC.oid 77s and PGXC.relname = T.tab_idxname 77s for update; 77s if not found then 77s raise exception 'Slony-I: alterTableConfigureTriggers(): Table with id % not found', p_tab_id; 77s end if; 77s v_tab_fqname = v_tab_row.tab_fqname; 77s 77s -- ---- 77s -- Configuration depends on the origin of the table 77s -- ---- 77s if v_tab_row.set_origin = v_no_id then 77s -- ---- 77s -- On the origin the log trigger is configured like a default 77s -- user trigger and the deny access trigger is disabled.
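-- Illustrative only, not part of the loaded script: for an assumed table
-- public.accounts the statements built below come out as
--   alter table public.accounts enable trigger "_main_logtrigger";
--   alter table public.accounts disable trigger "_main_denyaccess";
-- with the two settings swapped when the table's set originates on
-- another node.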
77s -- ---- 77s execute 'alter table ' || v_tab_fqname || 77s ' enable trigger "_main_logtrigger"'; 77s execute 'alter table ' || v_tab_fqname || 77s ' disable trigger "_main_denyaccess"'; 77s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 77s 'enable', 'disable'); 77s else 77s -- ---- 77s -- On a replica the log trigger is disabled and the 77s -- deny access trigger fires in origin session role. 77s -- ---- 77s execute 'alter table ' || v_tab_fqname || 77s ' disable trigger "_main_logtrigger"'; 77s execute 'alter table ' || v_tab_fqname || 77s ' enable trigger "_main_denyaccess"'; 77s perform public.alterTableConfigureTruncateTrigger(v_tab_fqname, 77s 'disable', 'enable'); 77s 77s end if; 77s 77s return p_tab_id; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.alterTableConfigureTriggers (p_tab_id int4) is 77s 'alterTableConfigureTriggers (tab_id) 77s 77s Set the enable/disable configuration for the replication triggers 77s according to the origin of the set.'; 77s COMMENT 77s create or replace function public.resubscribeNode (p_origin int4, 77s p_provider int4, p_receiver int4) 77s returns bigint 77s as $$ 77s declare 77s v_record record; 77s v_missing_sets text; 77s v_ev_seqno bigint; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- 77s -- Check that the receiver exists 77s -- 77s if not exists (select no_id from public.sl_node where no_id= 77s p_receiver) then 77s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_receiver; 77s end if; 77s 77s -- 77s -- Check that the provider exists 77s -- 77s if not exists (select no_id from public.sl_node where no_id= 77s p_provider) then 77s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_provider; 77s end if; 77s 77s 77s -- ---- 77s -- Check that this is called on the origin node 77s -- ---- 77s if p_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: subscribeSet() must be called on origin'; 77s end if; 77s 77s -- --- 77s -- Verify that the provider is either the origin or an active subscriber 77s -- Bug report #1362 77s -- --- 77s if p_origin <> p_provider then 77s for v_record in select sub1.sub_set from 77s public.sl_subscribe sub1 77s left outer join (public.sl_subscribe sub2 77s inner join 77s public.sl_set on ( 77s sl_set.set_id=sub2.sub_set 77s and sub2.sub_set=p_origin) 77s ) 77s ON ( sub1.sub_set = sub2.sub_set and 77s sub1.sub_receiver = p_provider and 77s sub1.sub_forward and sub1.sub_active 77s and sub2.sub_receiver=p_receiver) 77s 77s where sub2.sub_set is null 77s loop 77s v_missing_sets=v_missing_sets || ' ' || v_record.sub_set; 77s end loop; 77s if v_missing_sets is not null then 77s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, v_missing_sets; 77s end if; 77s end if; 77s 77s for v_record in select * from 77s public.sl_subscribe, public.sl_set where 77s sub_set=set_id and 77s sub_receiver=p_receiver 77s and set_origin=p_origin 77s loop 77s -- ---- 77s -- Create the SUBSCRIBE_SET event 77s -- ---- 77s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 77s v_record.sub_set::text, p_provider::text, p_receiver::text, 77s case v_record.sub_forward when true then 't' else 'f' end, 77s 'f' ); 77s 77s -- ---- 77s -- Call the internal procedure to store the subscription 77s -- ---- 77s perform public.subscribeSet_int(v_record.sub_set, 77s p_provider, 77s 
p_receiver, v_record.sub_forward, false); 77s end loop; 77s 77s return v_ev_seqno; 77s end; 77s $$ 77s language plpgsql; 77s CREATE FUNCTION 77s create or replace function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 77s returns bigint 77s as $$ 77s declare 77s v_set_origin int4; 77s v_ev_seqno int8; 77s v_ev_seqno2 int8; 77s v_rec record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- 77s -- Check that the receiver exists 77s -- 77s if not exists (select no_id from public.sl_node where no_id= 77s p_sub_receiver) then 77s raise exception 'Slony-I: subscribeSet() receiver % does not exist' , p_sub_receiver; 77s end if; 77s 77s -- 77s -- Check that the provider exists 77s -- 77s if not exists (select no_id from public.sl_node where no_id= 77s p_sub_provider) then 77s raise exception 'Slony-I: subscribeSet() provider % does not exist' , p_sub_provider; 77s end if; 77s 77s -- ---- 77s -- Check that the origin and provider of the set are remote 77s -- ---- 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = p_sub_set; 77s if not found then 77s raise exception 'Slony-I: subscribeSet(): set % not found', p_sub_set; 77s end if; 77s if v_set_origin = p_sub_receiver then 77s raise exception 77s 'Slony-I: subscribeSet(): set origin and receiver cannot be identical'; 77s end if; 77s if p_sub_receiver = p_sub_provider then 77s raise exception 77s 'Slony-I: subscribeSet(): set provider and receiver cannot be identical'; 77s end if; 77s -- ---- 77s -- Check that this is called on the origin node 77s -- ---- 77s if v_set_origin != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: subscribeSet() must be called on origin'; 77s end if; 77s 77s -- --- 77s -- Verify that the provider is either the origin or an active subscriber 77s -- Bug report #1362 77s -- --- 77s if v_set_origin <> p_sub_provider then 77s if not exists (select 1 from public.sl_subscribe 77s where sub_set = p_sub_set and 77s sub_receiver = p_sub_provider and 77s sub_forward and sub_active) then 77s raise exception 'Slony-I: subscribeSet(): provider % is not an active forwarding node for replication set %', p_sub_provider, p_sub_set; 77s end if; 77s end if; 77s 77s -- --- 77s -- Enforce that all sets from one origin are subscribed 77s -- using the same data provider per receiver. 77s -- ---- 77s if not exists (select 1 from public.sl_subscribe 77s where sub_set = p_sub_set and sub_receiver = p_sub_receiver) then 77s -- 77s -- New subscription - error out if we have any other subscription 77s -- from that origin with a different data provider. 77s -- 77s for v_rec in select sub_provider from public.sl_subscribe 77s join public.sl_set on set_id = sub_set 77s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 77s loop 77s if v_rec.sub_provider <> p_sub_provider then 77s raise exception 'Slony-I: subscribeSet(): wrong provider % - existing subscription from origin % users provider %', 77s p_sub_provider, v_set_origin, v_rec.sub_provider; 77s end if; 77s end loop; 77s else 77s -- 77s -- Existing subscription - in case the data provider changes and 77s -- there are other subscriptions, warn here. subscribeSet_int() 77s -- will currently change the data provider for those sets as well. 
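-- Illustrative only, not part of the loaded script: a first subscription is
-- typically requested on the origin with something like
--   select public.subscribeSet(1, 1, 2, true, false);
-- i.e. assumed set 1, provider 1, receiver 2, forwarding enabled and the
-- initial data copy not omitted; the loop below only applies when an
-- existing subscription is being re-pointed at a different provider.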
77s -- 77s for v_rec in select set_id, sub_provider from public.sl_subscribe 77s join public.sl_set on set_id = sub_set 77s where set_origin = v_set_origin and sub_receiver = p_sub_receiver 77s and set_id <> p_sub_set 77s loop 77s if v_rec.sub_provider <> p_sub_provider then 77s raise exception 'Slony-I: subscribeSet(): also data provider for set % use resubscribe instead', 77s v_rec.set_id; 77s end if; 77s end loop; 77s end if; 77s 77s -- ---- 77s -- Create the SUBSCRIBE_SET event 77s -- ---- 77s v_ev_seqno := public.createEvent('_main', 'SUBSCRIBE_SET', 77s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 77s case p_sub_forward when true then 't' else 'f' end, 77s case p_omit_copy when true then 't' else 'f' end 77s ); 77s 77s -- ---- 77s -- Call the internal procedure to store the subscription 77s -- ---- 77s v_ev_seqno2:=public.subscribeSet_int(p_sub_set, p_sub_provider, 77s p_sub_receiver, p_sub_forward, p_omit_copy); 77s 77s if v_ev_seqno2 is not null then 77s v_ev_seqno:=v_ev_seqno2; 77s end if; 77s 77s return v_ev_seqno; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.subscribeSet (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 77s 'subscribeSet (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 77s 77s Makes sure that the receiver is not the provider, then stores the 77s subscription, and publishes the SUBSCRIBE_SET event to other nodes. 77s 77s If omit_copy is true, then no data copy will be done. 77s '; 77s COMMENT 77s DROP FUNCTION IF EXISTS public.subscribeSet_int(int4,int4,int4,bool,bool); 77s DROP FUNCTION 77s create or replace function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) 77s returns int4 77s as $$ 77s declare 77s v_set_origin int4; 77s v_sub_row record; 77s v_seq_id bigint; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Lookup the set origin 77s -- ---- 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = p_sub_set; 77s if not found then 77s raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set; 77s end if; 77s 77s -- ---- 77s -- Provider change is only allowed for active sets 77s -- ---- 77s if p_sub_receiver = public.getLocalNodeId('_main') then 77s select sub_active into v_sub_row from public.sl_subscribe 77s where sub_set = p_sub_set 77s and sub_receiver = p_sub_receiver; 77s if found then 77s if not v_sub_row.sub_active then 77s raise exception 'Slony-I: subscribeSet_int(): set % is not active, cannot change provider', 77s p_sub_set; 77s end if; 77s end if; 77s end if; 77s 77s -- ---- 77s -- Try to change provider and/or forward for an existing subscription 77s -- ---- 77s update public.sl_subscribe 77s set sub_provider = p_sub_provider, 77s sub_forward = p_sub_forward 77s where sub_set = p_sub_set 77s and sub_receiver = p_sub_receiver; 77s if found then 77s 77s -- ---- 77s -- This is changing a subscriptoin. Make sure all sets from 77s -- this origin are subscribed using the same data provider. 77s -- For this we first check that the requested data provider 77s -- is subscribed to all the sets, the receiver is subscribed to. 
77s -- ---- 77s for v_sub_row in select set_id from public.sl_set 77s join public.sl_subscribe on set_id = sub_set 77s where set_origin = v_set_origin 77s and sub_receiver = p_sub_receiver 77s and sub_set <> p_sub_set 77s loop 77s if not exists (select 1 from public.sl_subscribe 77s where sub_set = v_sub_row.set_id 77s and sub_receiver = p_sub_provider 77s and sub_active and sub_forward) 77s and not exists (select 1 from public.sl_set 77s where set_id = v_sub_row.set_id 77s and set_origin = p_sub_provider) 77s then 77s raise exception 'Slony-I: subscribeSet_int(): node % is not a forwarding subscriber for set %', 77s p_sub_provider, v_sub_row.set_id; 77s end if; 77s 77s -- ---- 77s -- New data provider offers this set as well, change that 77s -- subscription too. 77s -- ---- 77s update public.sl_subscribe 77s set sub_provider = p_sub_provider 77s where sub_set = v_sub_row.set_id 77s and sub_receiver = p_sub_receiver; 77s end loop; 77s 77s -- ---- 77s -- Rewrite sl_listen table 77s -- ---- 77s perform public.RebuildListenEntries(); 77s 77s return p_sub_set; 77s end if; 77s 77s -- ---- 77s -- Not found, insert a new one 77s -- ---- 77s if not exists (select true from public.sl_path 77s where pa_server = p_sub_provider 77s and pa_client = p_sub_receiver) 77s then 77s insert into public.sl_path 77s (pa_server, pa_client, pa_conninfo, pa_connretry) 77s values 77s (p_sub_provider, p_sub_receiver, 77s '', 10); 77s end if; 77s insert into public.sl_subscribe 77s (sub_set, sub_provider, sub_receiver, sub_forward, sub_active) 77s values (p_sub_set, p_sub_provider, p_sub_receiver, 77s p_sub_forward, false); 77s 77s -- ---- 77s -- If the set origin is here, then enable the subscription 77s -- ---- 77s if v_set_origin = public.getLocalNodeId('_main') then 77s select public.createEvent('_main', 'ENABLE_SUBSCRIPTION', 77s p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 77s case p_sub_forward when true then 't' else 'f' end, 77s case p_omit_copy when true then 't' else 'f' end 77s ) into v_seq_id; 77s perform public.enableSubscription(p_sub_set, 77s p_sub_provider, p_sub_receiver); 77s end if; 77s 77s -- ---- 77s -- Rewrite sl_listen table 77s -- ---- 77s perform public.RebuildListenEntries(); 77s 77s return p_sub_set; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.subscribeSet_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4, p_sub_forward bool, p_omit_copy bool) is 77s 'subscribeSet_int (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) 77s 77s Internal actions for subscribing receiver sub_receiver to subscription 77s set sub_set.'; 77s COMMENT 77s drop function IF EXISTS public.unsubscribeSet(int4,int4,boolean); 77s DROP FUNCTION 77s create or replace function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,p_force boolean) 77s returns bigint 77s as $$ 77s declare 77s v_tab_row record; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- Check that this is called on the receiver node 77s -- ---- 77s if p_sub_receiver != public.getLocalNodeId('_main') then 77s raise exception 'Slony-I: unsubscribeSet() must be called on receiver'; 77s end if; 77s 77s 77s 77s -- ---- 77s -- Check that this does not break any chains 77s -- ---- 77s if p_force=false and exists (select true from public.sl_subscribe 77s where sub_set = p_sub_set 77s and sub_provider = p_sub_receiver) 77s then 77s raise exception 'Slony-I: Cannot unsubscribe set % while 
being provider', 77s p_sub_set; 77s end if; 77s 77s if exists (select true from public.sl_subscribe 77s where sub_set = p_sub_set 77s and sub_provider = p_sub_receiver) 77s then 77s --delete the receivers of this provider. 77s --unsubscribeSet_int() will generate the event 77s --when it runs on the receiver. 77s delete from public.sl_subscribe 77s where sub_set=p_sub_set 77s and sub_provider=p_sub_receiver; 77s end if; 77s 77s -- ---- 77s -- Remove the replication triggers. 77s -- ---- 77s for v_tab_row in select tab_id from public.sl_table 77s where tab_set = p_sub_set 77s order by tab_id 77s loop 77s perform public.alterTableDropTriggers(v_tab_row.tab_id); 77s end loop; 77s 77s -- ---- 77s -- Remove the setsync status. This will also cause the 77s -- worker thread to ignore the set and stop replicating 77s -- right now. 77s -- ---- 77s delete from public.sl_setsync 77s where ssy_setid = p_sub_set; 77s 77s -- ---- 77s -- Remove all sl_table and sl_sequence entries for this set. 77s -- Should we ever subscribe again, the initial data 77s -- copy process will create new ones. 77s -- ---- 77s delete from public.sl_table 77s where tab_set = p_sub_set; 77s delete from public.sl_sequence 77s where seq_set = p_sub_set; 77s 77s -- ---- 77s -- Call the internal procedure to drop the subscription 77s -- ---- 77s perform public.unsubscribeSet_int(p_sub_set, p_sub_receiver); 77s 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s -- ---- 77s -- Create the UNSUBSCRIBE_SET event 77s -- ---- 77s return public.createEvent('_main', 'UNSUBSCRIBE_SET', 77s p_sub_set::text, p_sub_receiver::text); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.unsubscribeSet (p_sub_set int4, p_sub_receiver int4,force boolean) is 77s 'unsubscribeSet (sub_set, sub_receiver,force) 77s 77s Unsubscribe node sub_receiver from subscription set sub_set. This is 77s invoked on the receiver node. It verifies that this does not break 77s any chains (e.g. - where sub_receiver is a provider for another node), 77s then restores tables, drops Slony-specific keys, drops table entries 77s for the set, drops the subscription, and generates an UNSUBSCRIBE_SET 77s event to publish that the subscription is being dropped.'; 77s COMMENT 77s create or replace function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) 77s returns int4 77s as $$ 77s declare 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- All the real work is done before event generation on the 77s -- subscriber. 77s -- ---- 77s 77s --if this event unsubscribes the provider of this node 77s --then this node should unsubscribe itself from the set as well.
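A similar sketch for unsubscribeSet() as defined above (IDs again illustrative). The call has to run on the receiving node, and with p_force = false it refuses to drop a subscription that would break a chain:

   -- hypothetical: node 2 gives up its subscription to set 1,
   -- erroring out if node 2 is still a data provider for that set
   select public.unsubscribeSet(1, 2, false);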
77s 77s if exists (select true from 77s public.sl_subscribe where 77s sub_set=p_sub_set and sub_provider=p_sub_receiver 77s and sub_receiver=public.getLocalNodeId('_main')) 77s then 77s perform public.unsubscribeSet(p_sub_set,public.getLocalNodeId('_main'),true); 77s end if; 77s 77s 77s delete from public.sl_subscribe 77s where sub_set = p_sub_set 77s and sub_receiver = p_sub_receiver; 77s 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s return p_sub_set; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.unsubscribeSet_int (p_sub_set int4, p_sub_receiver int4) is 77s 'unsubscribeSet_int (sub_set, sub_receiver) 77s 77s All the REAL work of removing the subscriber is done before the event 77s is generated, so this function just has to drop the references to the 77s subscription in sl_subscribe.'; 77s COMMENT 77s create or replace function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 77s returns int4 77s as $$ 77s begin 77s return public.enableSubscription_int (p_sub_set, 77s p_sub_provider, p_sub_receiver); 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.enableSubscription (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 77s 'enableSubscription (sub_set, sub_provider, sub_receiver) 77s 77s Indicates that sub_receiver intends subscribing to set sub_set from 77s sub_provider. Work is all done by the internal function 77s enableSubscription_int (sub_set, sub_provider, sub_receiver).'; 77s COMMENT 77s create or replace function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) 77s returns int4 77s as $$ 77s declare 77s v_n int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- ---- 77s -- The real work is done in the replication engine. All 77s -- we have to do here is remembering that it happened. 77s -- ---- 77s 77s -- ---- 77s -- Well, not only ... we might be missing an important event here 77s -- ---- 77s if not exists (select true from public.sl_path 77s where pa_server = p_sub_provider 77s and pa_client = p_sub_receiver) 77s then 77s insert into public.sl_path 77s (pa_server, pa_client, pa_conninfo, pa_connretry) 77s values 77s (p_sub_provider, p_sub_receiver, 77s '', 10); 77s end if; 77s 77s update public.sl_subscribe 77s set sub_active = 't' 77s where sub_set = p_sub_set 77s and sub_receiver = p_sub_receiver; 77s get diagnostics v_n = row_count; 77s if v_n = 0 then 77s insert into public.sl_subscribe 77s (sub_set, sub_provider, sub_receiver, 77s sub_forward, sub_active) 77s values 77s (p_sub_set, p_sub_provider, p_sub_receiver, 77s false, true); 77s end if; 77s 77s -- Rewrite sl_listen table 77s perform public.RebuildListenEntries(); 77s 77s return p_sub_set; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.enableSubscription_int (p_sub_set int4, p_sub_provider int4, p_sub_receiver int4) is 77s 'enableSubscription_int (sub_set, sub_provider, sub_receiver) 77s 77s Internal function to enable subscription of node sub_receiver to set 77s sub_set via node sub_provider. 77s 77s slon does most of the work; all we need do here is to remember that it 77s happened. 
The function updates sl_subscribe, indicating that the 77s subscription has become active.'; 77s COMMENT 77s create or replace function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) 77s returns bigint 77s as $$ 77s declare 77s v_max_seqno bigint; 77s begin 77s select into v_max_seqno coalesce(max(con_seqno), 0) 77s from public.sl_confirm 77s where con_origin = p_con_origin 77s and con_received = p_con_received; 77s if v_max_seqno < p_con_seqno then 77s insert into public.sl_confirm 77s (con_origin, con_received, con_seqno, con_timestamp) 77s values (p_con_origin, p_con_received, p_con_seqno, 77s p_con_timestamp); 77s v_max_seqno = p_con_seqno; 77s end if; 77s 77s return v_max_seqno; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.forwardConfirm (p_con_origin int4, p_con_received int4, p_con_seqno int8, p_con_timestamp timestamp) is 77s 'forwardConfirm (p_con_origin, p_con_received, p_con_seqno, p_con_timestamp) 77s 77s Confirms (recorded in sl_confirm) that items from p_con_origin up to 77s p_con_seqno have been received by node p_con_received as of 77s p_con_timestamp, and raises an event to forward this confirmation.'; 77s COMMENT 77s create or replace function public.cleanupEvent (p_interval interval) 77s returns int4 77s as $$ 77s declare 77s v_max_row record; 77s v_min_row record; 77s v_max_sync int8; 77s v_origin int8; 77s v_seqno int8; 77s v_xmin bigint; 77s v_rc int8; 77s begin 77s -- ---- 77s -- First remove all confirmations where origin/receiver no longer exist 77s -- ---- 77s delete from public.sl_confirm 77s where con_origin not in (select no_id from public.sl_node); 77s delete from public.sl_confirm 77s where con_received not in (select no_id from public.sl_node); 77s -- ---- 77s -- Next remove all but the newest confirm row per origin,receiver pair. 77s -- Ignore confirmations that are younger than 10 minutes. We currently 77s -- have an unconfirmed suspicion that a possibly lost transaction due 77s -- to a server crash might have been visible to another session, and 77s -- that this led to log data that is still needed being removed.
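A call sketch for forwardConfirm() as defined above, with illustrative values; in practice this is issued by the slon daemon while propagating confirmations rather than by hand:

   -- hypothetical: record that node 2 has confirmed events from node 1 up to seqno 5000
   select public.forwardConfirm(1, 2, 5000, now()::timestamp);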
77s -- ---- 77s for v_max_row in select con_origin, con_received, max(con_seqno) as con_seqno 77s from public.sl_confirm 77s where con_timestamp < (CURRENT_TIMESTAMP - p_interval) 77s group by con_origin, con_received 77s loop 77s delete from public.sl_confirm 77s where con_origin = v_max_row.con_origin 77s and con_received = v_max_row.con_received 77s and con_seqno < v_max_row.con_seqno; 77s end loop; 77s 77s -- ---- 77s -- Then remove all events that are confirmed by all nodes in the 77s -- whole cluster up to the last SYNC 77s -- ---- 77s for v_min_row in select con_origin, min(con_seqno) as con_seqno 77s from public.sl_confirm 77s group by con_origin 77s loop 77s select coalesce(max(ev_seqno), 0) into v_max_sync 77s from public.sl_event 77s where ev_origin = v_min_row.con_origin 77s and ev_seqno <= v_min_row.con_seqno 77s and ev_type = 'SYNC'; 77s if v_max_sync > 0 then 77s delete from public.sl_event 77s where ev_origin = v_min_row.con_origin 77s and ev_seqno < v_max_sync; 77s end if; 77s end loop; 77s 77s -- ---- 77s -- If cluster has only one node, then remove all events up to 77s -- the last SYNC - Bug #1538 77s -- http://gborg.postgresql.org/project/slony1/bugs/bugupdate.php?1538 77s -- ---- 77s 77s select * into v_min_row from public.sl_node where 77s no_id <> public.getLocalNodeId('_main') limit 1; 77s if not found then 77s select ev_origin, ev_seqno into v_min_row from public.sl_event 77s where ev_origin = public.getLocalNodeId('_main') 77s order by ev_origin desc, ev_seqno desc limit 1; 77s raise notice 'Slony-I: cleanupEvent(): Single node - deleting events < %', v_min_row.ev_seqno; 77s delete from public.sl_event 77s where 77s ev_origin = v_min_row.ev_origin and 77s ev_seqno < v_min_row.ev_seqno; 77s 77s end if; 77s 77s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_seqlog' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 77s execute 'alter table public.sl_seqlog set without oids;'; 77s end if; 77s -- ---- 77s -- Also remove stale entries from the nodelock table. 77s -- ---- 77s perform public.cleanupNodelock(); 77s 77s -- ---- 77s -- Find the eldest event left, for each origin 77s -- ---- 77s for v_origin, v_seqno, v_xmin in 77s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 77s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 77s loop 77s delete from public.sl_seqlog where seql_origin = v_origin and seql_ev_seqno < v_seqno; 77s delete from public.sl_log_script where log_origin = v_origin and log_txid < v_xmin; 77s end loop; 77s 77s v_rc := public.logswitch_finish(); 77s if v_rc = 0 then -- no switch in progress 77s perform public.logswitch_start(); 77s end if; 77s 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.cleanupEvent (p_interval interval) is 77s 'cleaning old data out of sl_confirm, sl_event. 
Removes all but the 77s last sl_confirm row per (origin,receiver), and then removes all events 77s that are confirmed by all nodes in the whole cluster up to the last 77s SYNC.'; 77s COMMENT 77s create or replace function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) returns name 77s as $$ 77s declare 77s v_tab_fqname_quoted text default ''; 77s v_idxrow record; 77s begin 77s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 77s -- 77s -- Ensure that the table exists 77s -- 77s if (select PGC.relname 77s from "pg_catalog".pg_class PGC, 77s "pg_catalog".pg_namespace PGN 77s where public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 77s and PGN.oid = PGC.relnamespace) is null then 77s raise exception 'Slony-I: determineIdxnameUnique(): table % not found', v_tab_fqname_quoted; 77s end if; 77s 77s -- 77s -- Lookup the tables primary key or the specified unique index 77s -- 77s if p_idx_name isnull then 77s select PGXC.relname 77s into v_idxrow 77s from "pg_catalog".pg_class PGC, 77s "pg_catalog".pg_namespace PGN, 77s "pg_catalog".pg_index PGX, 77s "pg_catalog".pg_class PGXC 77s where public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 77s and PGN.oid = PGC.relnamespace 77s and PGX.indrelid = PGC.oid 77s and PGX.indexrelid = PGXC.oid 77s and PGX.indisprimary; 77s if not found then 77s raise exception 'Slony-I: table % has no primary key', 77s v_tab_fqname_quoted; 77s end if; 77s else 77s select PGXC.relname 77s into v_idxrow 77s from "pg_catalog".pg_class PGC, 77s "pg_catalog".pg_namespace PGN, 77s "pg_catalog".pg_index PGX, 77s "pg_catalog".pg_class PGXC 77s where public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 77s and PGN.oid = PGC.relnamespace 77s and PGX.indrelid = PGC.oid 77s and PGX.indexrelid = PGXC.oid 77s and PGX.indisunique 77s and public.slon_quote_brute(PGXC.relname) = public.slon_quote_input(p_idx_name); 77s if not found then 77s raise exception 'Slony-I: table % has no unique index %', 77s v_tab_fqname_quoted, p_idx_name; 77s end if; 77s end if; 77s 77s -- 77s -- Return the found index name 77s -- 77s return v_idxrow.relname; 77s end; 77s $$ language plpgsql called on null input; 77s CREATE FUNCTION 77s comment on function public.determineIdxnameUnique(p_tab_fqname text, p_idx_name name) is 77s 'FUNCTION determineIdxnameUnique (tab_fqname, indexname) 77s 77s Given a tablename, tab_fqname, check that the unique index, indexname, 77s exists or return the primary key index name for the table. If there 77s is no unique index, it raises an exception.'; 77s COMMENT 77s create or replace function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) returns text 77s as $$ 77s declare 77s v_tab_fqname_quoted text default ''; 77s v_idx_name_quoted text; 77s v_idxrow record; 77s v_attrow record; 77s v_i integer; 77s v_attno int2; 77s v_attkind text default ''; 77s v_attfound bool; 77s begin 77s v_tab_fqname_quoted := public.slon_quote_input(p_tab_fqname); 77s v_idx_name_quoted := public.slon_quote_brute(p_idx_name); 77s -- 77s -- Ensure that the table exists 77s -- 77s if (select PGC.relname 77s from "pg_catalog".pg_class PGC, 77s "pg_catalog".pg_namespace PGN 77s where public.slon_quote_brute(PGN.nspname) || '.' 
|| 77s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 77s and PGN.oid = PGC.relnamespace) is null then 77s raise exception 'Slony-I: table % not found', v_tab_fqname_quoted; 77s end if; 77s 77s -- 77s -- Lookup the tables primary key or the specified unique index 77s -- 77s if p_idx_name isnull then 77s raise exception 'Slony-I: index name must be specified'; 77s else 77s select PGXC.relname, PGX.indexrelid, PGX.indkey 77s into v_idxrow 77s from "pg_catalog".pg_class PGC, 77s "pg_catalog".pg_namespace PGN, 77s "pg_catalog".pg_index PGX, 77s "pg_catalog".pg_class PGXC 77s where public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 77s and PGN.oid = PGC.relnamespace 77s and PGX.indrelid = PGC.oid 77s and PGX.indexrelid = PGXC.oid 77s and PGX.indisunique 77s and public.slon_quote_brute(PGXC.relname) = v_idx_name_quoted; 77s if not found then 77s raise exception 'Slony-I: table % has no unique index %', 77s v_tab_fqname_quoted, v_idx_name_quoted; 77s end if; 77s end if; 77s 77s -- 77s -- Loop over the tables attributes and check if they are 77s -- index attributes. If so, add a "k" to the return value, 77s -- otherwise add a "v". 77s -- 77s for v_attrow in select PGA.attnum, PGA.attname 77s from "pg_catalog".pg_class PGC, 77s "pg_catalog".pg_namespace PGN, 77s "pg_catalog".pg_attribute PGA 77s where public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) = v_tab_fqname_quoted 77s and PGN.oid = PGC.relnamespace 77s and PGA.attrelid = PGC.oid 77s and not PGA.attisdropped 77s and PGA.attnum > 0 77s order by attnum 77s loop 77s v_attfound = 'f'; 77s 77s v_i := 0; 77s loop 77s select indkey[v_i] into v_attno from "pg_catalog".pg_index 77s where indexrelid = v_idxrow.indexrelid; 77s if v_attno isnull or v_attno = 0 then 77s exit; 77s end if; 77s if v_attrow.attnum = v_attno then 77s v_attfound = 't'; 77s exit; 77s end if; 77s v_i := v_i + 1; 77s end loop; 77s 77s if v_attfound then 77s v_attkind := v_attkind || 'k'; 77s else 77s v_attkind := v_attkind || 'v'; 77s end if; 77s end loop; 77s 77s -- Strip off trailing v characters as they are not needed by the logtrigger 77s v_attkind := pg_catalog.rtrim(v_attkind, 'v'); 77s 77s -- 77s -- Return the resulting attkind 77s -- 77s return v_attkind; 77s end; 77s $$ language plpgsql called on null input; 77s CREATE FUNCTION 77s comment on function public.determineAttkindUnique(p_tab_fqname text, p_idx_name name) is 77s 'determineAttKindUnique (tab_fqname, indexname) 77s 77s Given a tablename, return the Slony-I specific attkind (used for the 77s log trigger) of the table. Use the specified unique index or the 77s primary key (if indexname is NULL).'; 77s COMMENT 77s create or replace function public.RebuildListenEntries() 77s returns int 77s as $$ 77s declare 77s v_row record; 77s v_cnt integer; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s -- First remove the entire configuration 77s delete from public.sl_listen; 77s 77s -- Second populate the sl_listen configuration with a full 77s -- network of all possible paths. 
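To see how the seeding insert and the loop that follow build that full network, consider a hypothetical three-node chain with sl_path rows (pa_server, pa_client) = (1, 2) and (2, 3), both with a non-empty conninfo: the seed pass creates listen entries (1, 1, 2) and (2, 2, 3), the first loop iteration then adds the cascaded entry (1, 2, 3), and the next iteration finds nothing new and exits.

   -- inspecting the result of the hypothetical example above
   select li_origin, li_provider, li_receiver
     from public.sl_listen order by li_origin, li_receiver;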
77s insert into public.sl_listen 77s (li_origin, li_provider, li_receiver) 77s select pa_server, pa_server, pa_client from public.sl_path; 77s while true loop 77s insert into public.sl_listen 77s (li_origin, li_provider, li_receiver) 77s select distinct li_origin, pa_server, pa_client 77s from public.sl_listen, public.sl_path 77s where li_receiver = pa_server 77s and li_origin <> pa_client 77s and pa_conninfo<>'' 77s except 77s select li_origin, li_provider, li_receiver 77s from public.sl_listen; 77s 77s if not found then 77s exit; 77s end if; 77s end loop; 77s 77s -- We now replace specific event-origin,receiver combinations 77s -- with a configuration that tries to avoid events arriving at 77s -- a node before the data provider actually has the data ready. 77s 77s -- Loop over every possible pair of receiver and event origin 77s for v_row in select N1.no_id as receiver, N2.no_id as origin, 77s N2.no_failed as failed 77s from public.sl_node as N1, public.sl_node as N2 77s where N1.no_id <> N2.no_id 77s loop 77s -- 1st choice: 77s -- If we use the event origin as a data provider for any 77s -- set that originates on that very node, we are a direct 77s -- subscriber to that origin and listen there only. 77s if exists (select true from public.sl_set, public.sl_subscribe , public.sl_node p 77s where set_origin = v_row.origin 77s and sub_set = set_id 77s and sub_provider = v_row.origin 77s and sub_receiver = v_row.receiver 77s and sub_active 77s and p.no_active 77s and p.no_id=sub_provider 77s ) 77s then 77s delete from public.sl_listen 77s where li_origin = v_row.origin 77s and li_receiver = v_row.receiver; 77s insert into public.sl_listen (li_origin, li_provider, li_receiver) 77s values (v_row.origin, v_row.origin, v_row.receiver); 77s 77s -- 2nd choice: 77s -- If we are subscribed to any set originating on this 77s -- event origin, we want to listen on all data providers 77s -- we use for this origin. We are a cascaded subscriber 77s -- for sets from this node. 77s else 77s if exists (select true from public.sl_set, public.sl_subscribe, 77s public.sl_node provider 77s where set_origin = v_row.origin 77s and sub_set = set_id 77s and sub_provider=provider.no_id 77s and provider.no_failed = false 77s and sub_receiver = v_row.receiver 77s and sub_active) 77s then 77s delete from public.sl_listen 77s where li_origin = v_row.origin 77s and li_receiver = v_row.receiver; 77s insert into public.sl_listen (li_origin, li_provider, li_receiver) 77s select distinct set_origin, sub_provider, v_row.receiver 77s from public.sl_set, public.sl_subscribe 77s where set_origin = v_row.origin 77s and sub_set = set_id 77s and sub_receiver = v_row.receiver 77s and sub_active; 77s end if; 77s end if; 77s 77s if v_row.failed then 77s 77s --for every failed node we delete all sl_listen entries 77s --except via providers (listed in sl_subscribe) 77s --or failover candidates (sl_failover_targets) 77s --we do this to prevent a non-failover candidate 77s --that is more ahead of the failover candidate from 77s --sending events to the failover candidate that 77s --are 'too far ahead' 77s 77s --if the failed node is not an origin for any 77s --node then we don't delete all listen paths 77s --for events from it. Instead we leave 77s --the listen network alone. 
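To make the '2nd choice' branch above concrete: if node 3 receives set 1 (origin node 1) through data provider node 2, the rewrite drops the direct entry for that origin/receiver pair and effectively ends up producing

   insert into public.sl_listen (li_origin, li_provider, li_receiver)
          values (1, 2, 3);

so that events from the origin travel to node 3 along the same path as the replicated data. All node and set IDs here are hypothetical.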
77s 77s select count(*) into v_cnt from public.sl_subscribe sub, 77s public.sl_set s 77s where s.set_origin=v_row.origin and s.set_id=sub.sub_set; 77s if v_cnt > 0 then 77s delete from public.sl_listen where 77s li_origin=v_row.origin and 77s li_receiver=v_row.receiver 77s and li_provider not in 77s (select sub_provider from 77s public.sl_subscribe, 77s public.sl_set where 77s sub_set=set_id 77s and set_origin=v_row.origin); 77s end if; 77s end if; 77s -- insert into public.sl_listen 77s -- (li_origin,li_provider,li_receiver) 77s -- SELECT v_row.origin, pa_server 77s -- ,v_row.receiver 77s -- FROM public.sl_path where 77s -- pa_client=v_row.receiver 77s -- and (v_row.origin,pa_server,v_row.receiver) not in 77s -- (select li_origin,li_provider,li_receiver 77s -- from public.sl_listen); 77s -- end if; 77s end loop ; 77s 77s return null ; 77s end ; 77s $$ language 'plpgsql'; 77s CREATE FUNCTION 77s comment on function public.RebuildListenEntries() is 77s 'RebuildListenEntries() 77s 77s Invoked by various subscription and path modifying functions, this 77s rewrites the sl_listen entries, adding in all the ones required to 77s allow communications between nodes in the Slony-I cluster.'; 77s COMMENT 77s create or replace function public.generate_sync_event(p_interval interval) 77s returns int4 77s as $$ 77s declare 77s v_node_row record; 77s 77s BEGIN 77s select 1 into v_node_row from public.sl_event 77s where ev_type = 'SYNC' and ev_origin = public.getLocalNodeId('_main') 77s and ev_timestamp > now() - p_interval limit 1; 77s if not found then 77s -- If there has been no SYNC in the last interval, then push one 77s perform public.createEvent('_main', 'SYNC', NULL); 77s return 1; 77s else 77s return 0; 77s end if; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.generate_sync_event(p_interval interval) is 77s 'Generate a sync event if there has not been one in the requested interval, and this is a provider node.'; 77s COMMENT 77s drop function if exists public.updateRelname(int4, int4); 77s DROP FUNCTION 77s create or replace function public.updateRelname () 77s returns int4 77s as $$ 77s declare 77s v_no_id int4; 77s v_set_origin int4; 77s begin 77s -- ---- 77s -- Grab the central configuration lock 77s -- ---- 77s lock table public.sl_config_lock; 77s 77s update public.sl_table set 77s tab_relname = PGC.relname, tab_nspname = PGN.nspname 77s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 77s where public.sl_table.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid and 77s (tab_relname <> PGC.relname or tab_nspname <> PGN.nspname); 77s update public.sl_sequence set 77s seq_relname = PGC.relname, seq_nspname = PGN.nspname 77s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 77s where public.sl_sequence.seq_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid and 77s (seq_relname <> PGC.relname or seq_nspname <> PGN.nspname); 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.updateRelname() is 77s 'updateRelname()'; 77s COMMENT 77s drop function if exists public.updateReloid (int4, int4); 77s DROP FUNCTION 77s create or replace function public.updateReloid (p_set_id int4, p_only_on_node int4) 77s returns bigint 77s as $$ 77s declare 77s v_no_id int4; 77s v_set_origin int4; 77s prec record; 77s begin 77s -- ---- 77s -- Check that we either are the set origin or a current 77s -- subscriber of the set. 
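A call sketch for generate_sync_event() as defined above; the interval is illustrative, and the call is normally made by the slon daemon rather than interactively:

   -- hypothetical: emit a SYNC event unless one was generated within the last 30 seconds
   select public.generate_sync_event('30 seconds'::interval);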
77s -- ---- 77s v_no_id := public.getLocalNodeId('_main'); 77s select set_origin into v_set_origin 77s from public.sl_set 77s where set_id = p_set_id 77s for update; 77s if not found then 77s raise exception 'Slony-I: set % not found', p_set_id; 77s end if; 77s if v_set_origin <> v_no_id 77s and not exists (select 1 from public.sl_subscribe 77s where sub_set = p_set_id 77s and sub_receiver = v_no_id) 77s then 77s return 0; 77s end if; 77s 77s -- ---- 77s -- If execution on only one node is requested, check that 77s -- we are that node. 77s -- ---- 77s if p_only_on_node > 0 and p_only_on_node <> v_no_id then 77s return 0; 77s end if; 77s 77s -- Update OIDs for tables to values pulled from non-table objects in pg_class 77s -- This ensures that we won't have collisions when repairing the oids 77s for prec in select tab_id from public.sl_table loop 77s update public.sl_table set tab_reloid = (select oid from pg_class pc where relkind <> 'r' and not exists (select 1 from public.sl_table t2 where t2.tab_reloid = pc.oid) limit 1) 77s where tab_id = prec.tab_id; 77s end loop; 77s 77s for prec in select tab_id, tab_relname, tab_nspname from public.sl_table loop 77s update public.sl_table set 77s tab_reloid = (select PGC.oid 77s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 77s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.tab_relname) 77s and PGC.relnamespace = PGN.oid 77s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.tab_nspname)) 77s where tab_id = prec.tab_id; 77s end loop; 77s 77s for prec in select seq_id from public.sl_sequence loop 77s update public.sl_sequence set seq_reloid = (select oid from pg_class pc where relkind <> 'S' and not exists (select 1 from public.sl_sequence t2 where t2.seq_reloid = pc.oid) limit 1) 77s where seq_id = prec.seq_id; 77s end loop; 77s 77s for prec in select seq_id, seq_relname, seq_nspname from public.sl_sequence loop 77s update public.sl_sequence set 77s seq_reloid = (select PGC.oid 77s from pg_catalog.pg_class PGC, pg_catalog.pg_namespace PGN 77s where public.slon_quote_brute(PGC.relname) = public.slon_quote_brute(prec.seq_relname) 77s and PGC.relnamespace = PGN.oid 77s and public.slon_quote_brute(PGN.nspname) = public.slon_quote_brute(prec.seq_nspname)) 77s where seq_id = prec.seq_id; 77s end loop; 77s 77s return 1; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.updateReloid(p_set_id int4, p_only_on_node int4) is 77s 'updateReloid(set_id, only_on_node) 77s 77s Updates the respective reloids in sl_table and sl_sequence based on 77s their respective FQNs'; 77s COMMENT 77s create or replace function public.logswitch_start() 77s returns int4 as $$ 77s DECLARE 77s v_current_status int4; 77s BEGIN 77s -- ---- 77s -- Get the current log status. 77s -- ---- 77s select last_value into v_current_status from public.sl_log_status; 77s 77s -- ---- 77s -- status = 0: sl_log_1 active, sl_log_2 clean 77s -- Initiate a switch to sl_log_2. 77s -- ---- 77s if v_current_status = 0 then 77s perform "pg_catalog".setval('public.sl_log_status', 3); 77s perform public.registry_set_timestamp( 77s 'logswitch.laststart', now()); 77s raise notice 'Slony-I: Logswitch to sl_log_2 initiated'; 77s return 2; 77s end if; 77s 77s -- ---- 77s -- status = 1: sl_log_2 active, sl_log_1 clean 77s -- Initiate a switch to sl_log_1.
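For orientation while reading logswitch_start() here and logswitch_finish() further below: the current log state lives in the sl_log_status sequence, and a quick way to inspect it is

   select last_value from public.sl_log_status;
   -- 0: sl_log_1 active, sl_log_2 clean      1: sl_log_2 active, sl_log_1 clean
   -- 2: sl_log_1 active, sl_log_2 being cleaned up
   -- 3: sl_log_2 active, sl_log_1 being cleaned up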
77s -- ---- 77s if v_current_status = 1 then 77s perform "pg_catalog".setval('public.sl_log_status', 2); 77s perform public.registry_set_timestamp( 77s 'logswitch.laststart', now()); 77s raise notice 'Slony-I: Logswitch to sl_log_1 initiated'; 77s return 1; 77s end if; 77s 77s raise exception 'Previous logswitch still in progress'; 77s END; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.logswitch_start() is 77s 'logswitch_start() 77s 77s Initiate a log table switch if none is in progress'; 77s COMMENT 77s create or replace function public.logswitch_finish() 77s returns int4 as $$ 77s DECLARE 77s v_current_status int4; 77s v_dummy record; 77s v_origin int8; 77s v_seqno int8; 77s v_xmin bigint; 77s v_purgeable boolean; 77s BEGIN 77s -- ---- 77s -- Get the current log status. 77s -- ---- 77s select last_value into v_current_status from public.sl_log_status; 77s 77s -- ---- 77s -- status value 0 or 1 means that there is no log switch in progress 77s -- ---- 77s if v_current_status = 0 or v_current_status = 1 then 77s return 0; 77s end if; 77s 77s -- ---- 77s -- status = 2: sl_log_1 active, cleanup sl_log_2 77s -- ---- 77s if v_current_status = 2 then 77s v_purgeable := 'true'; 77s 77s -- ---- 77s -- Attempt to lock sl_log_2 in order to make sure there are no other transactions 77s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 77s -- blocking writers to sl_log_2 while it is waiting for a lock. It also prevents it 77s -- immediately truncating log data generated inside the transaction which was active 77s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 77s -- transaction is committed. 77s -- ---- 77s begin 77s lock table public.sl_log_2 in access exclusive mode nowait; 77s exception when lock_not_available then 77s raise notice 'Slony-I: could not lock sl_log_2 - sl_log_2 not truncated'; 77s return -1; 77s end; 77s 77s -- ---- 77s -- The cleanup thread calls us after it did the delete and 77s -- vacuum of both log tables. If sl_log_2 is empty now, we 77s -- can truncate it and the log switch is done. 77s -- ---- 77s for v_origin, v_seqno, v_xmin in 77s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 77s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 77s loop 77s if exists (select 1 from public.sl_log_2 where log_origin = v_origin and log_txid >= v_xmin limit 1) then 77s v_purgeable := 'false'; 77s end if; 77s end loop; 77s if not v_purgeable then 77s -- ---- 77s -- Found a row ... log switch is still in progress. 77s -- ---- 77s raise notice 'Slony-I: log switch to sl_log_1 still in progress - sl_log_2 not truncated'; 77s return -1; 77s end if; 77s 77s raise notice 'Slony-I: log switch to sl_log_1 complete - truncate sl_log_2'; 77s truncate public.sl_log_2; 77s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_2' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 77s execute 'alter table public.sl_log_2 set without oids;'; 77s end if; 77s perform "pg_catalog".setval('public.sl_log_status', 0); 77s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? 
table 77s perform public.addPartialLogIndices(); 77s 77s return 1; 77s end if; 77s 77s -- ---- 77s -- status = 3: sl_log_2 active, cleanup sl_log_1 77s -- ---- 77s if v_current_status = 3 then 77s v_purgeable := 'true'; 77s 77s -- ---- 77s -- Attempt to lock sl_log_1 in order to make sure there are no other transactions 77s -- currently writing to it. Exit if it is still in use. This prevents TRUNCATE from 77s -- blocking writes to sl_log_1 while it is waiting for a lock. It also prevents it 77s -- immediately truncating log data generated inside the transaction which was active 77s -- when logswitch_finish() was called (and was blocking TRUNCATE) as soon as that 77s -- transaction is committed. 77s -- ---- 77s begin 77s lock table public.sl_log_1 in access exclusive mode nowait; 77s exception when lock_not_available then 77s raise notice 'Slony-I: could not lock sl_log_1 - sl_log_1 not truncated'; 77s return -1; 77s end; 77s 77s -- ---- 77s -- The cleanup thread calls us after it did the delete and 77s -- vacuum of both log tables. If sl_log_2 is empty now, we 77s -- can truncate it and the log switch is done. 77s -- ---- 77s for v_origin, v_seqno, v_xmin in 77s select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from public.sl_event 77s where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from public.sl_event where ev_type = 'SYNC' group by ev_origin) 77s loop 77s if (exists (select 1 from public.sl_log_1 where log_origin = v_origin and log_txid >= v_xmin limit 1)) then 77s v_purgeable := 'false'; 77s end if; 77s end loop; 77s if not v_purgeable then 77s -- ---- 77s -- Found a row ... log switch is still in progress. 77s -- ---- 77s raise notice 'Slony-I: log switch to sl_log_2 still in progress - sl_log_1 not truncated'; 77s return -1; 77s end if; 77s 77s raise notice 'Slony-I: log switch to sl_log_2 complete - truncate sl_log_1'; 77s truncate public.sl_log_1; 77s if exists (select * from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "pg_catalog".pg_attribute a where c.relname = 'sl_log_1' and n.oid = c.relnamespace and a.attrelid = c.oid and a.attname = 'oid') then 77s execute 'alter table public.sl_log_1 set without oids;'; 77s end if; 77s perform "pg_catalog".setval('public.sl_log_status', 1); 77s -- Run addPartialLogIndices() to try to add indices to unused sl_log_? table 77s perform public.addPartialLogIndices(); 77s return 2; 77s end if; 77s END; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.logswitch_finish() is 77s 'logswitch_finish() 77s 77s Attempt to finalize a log table switch in progress 77s return values: 77s -1 if switch in progress, but not complete 77s 0 if no switch in progress 77s 1 if performed truncate on sl_log_2 77s 2 if performed truncate on sl_log_1 77s '; 77s COMMENT 77s create or replace function public.addPartialLogIndices () returns integer as $$ 77s DECLARE 77s v_current_status int4; 77s v_log int4; 77s v_dummy record; 77s v_dummy2 record; 77s idef text; 77s v_count int4; 77s v_iname text; 77s v_ilen int4; 77s v_maxlen int4; 77s BEGIN 77s v_count := 0; 77s select last_value into v_current_status from public.sl_log_status; 77s 77s -- If status is 2 or 3 --> in process of cleanup --> unsafe to create indices 77s if v_current_status in (2, 3) then 77s return 0; 77s end if; 77s 77s if v_current_status = 0 then -- Which log should get indices? 77s v_log := 2; 77s else 77s v_log := 1; 77s end if; 77s -- PartInd_test_db_sl_log_2-node-1 77s -- Add missing indices... 
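The loop below assembles index definitions of the following shape (origin node ID 1 is hypothetical), one per set origin that is missing its partial index on the currently idle log table:

   create index "PartInd_main_sl_log_2-node-1"
       on public.sl_log_2 USING btree(log_txid) where (log_origin = 1);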
77s for v_dummy in select distinct set_origin from public.sl_set loop 77s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' 77s || v_dummy.set_origin::text; 77s -- raise notice 'Consider adding partial index % on sl_log_%', v_iname, v_log; 77s -- raise notice 'schema: [_main] tablename:[sl_log_%]', v_log; 77s select * into v_dummy2 from pg_catalog.pg_indexes where tablename = 'sl_log_' || v_log::text and indexname = v_iname; 77s if not found then 77s -- raise notice 'index was not found - add it!'; 77s v_iname := 'PartInd_main_sl_log_' || v_log::text || '-node-' || v_dummy.set_origin::text; 77s v_ilen := pg_catalog.length(v_iname); 77s v_maxlen := pg_catalog.current_setting('max_identifier_length'::text)::int4; 77s if v_ilen > v_maxlen then 77s raise exception 'Length of proposed index name [%] > max_identifier_length [%] - cluster name probably too long', v_ilen, v_maxlen; 77s end if; 77s 77s idef := 'create index "' || v_iname || 77s '" on public.sl_log_' || v_log::text || ' USING btree(log_txid) where (log_origin = ' || v_dummy.set_origin::text || ');'; 77s execute idef; 77s v_count := v_count + 1; 77s else 77s -- raise notice 'Index % already present - skipping', v_iname; 77s end if; 77s end loop; 77s 77s -- Remove unneeded indices... 77s for v_dummy in select indexname from pg_catalog.pg_indexes i where i.tablename = 'sl_log_' || v_log::text and 77s i.indexname like ('PartInd_main_sl_log_' || v_log::text || '-node-%') and 77s not exists (select 1 from public.sl_set where 77s i.indexname = 'PartInd_main_sl_log_' || v_log::text || '-node-' || set_origin::text) 77s loop 77s -- raise notice 'Dropping obsolete index %d', v_dummy.indexname; 77s idef := 'drop index public."' || v_dummy.indexname || '";'; 77s execute idef; 77s v_count := v_count - 1; 77s end loop; 77s return v_count; 77s END 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.addPartialLogIndices () is 77s 'Add partial indexes, if possible, to the unused sl_log_? table for 77s all origin nodes, and drop any that are no longer needed. 77s 77s This function presently gets run any time set origins are manipulated 77s (FAILOVER, STORE SET, MOVE SET, DROP SET), as well as each time the 77s system switches between sl_log_1 and sl_log_2.'; 77s COMMENT 77s create or replace function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 77s returns bool as $$ 77s BEGIN 77s return exists ( 77s select 1 from "information_schema".columns 77s where table_schema = p_namespace 77s and table_name = p_table 77s and column_name = p_field 77s ); 77s END;$$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.check_table_field_exists (p_namespace text, p_table text, p_field text) 77s is 'Check if a table has a specific attribute'; 77s COMMENT 77s create or replace function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 77s returns bool as $$ 77s DECLARE 77s v_row record; 77s v_query text; 77s BEGIN 77s if not public.check_table_field_exists(p_namespace, p_table, p_field) then 77s raise notice 'Upgrade table %.% - add field %', p_namespace, p_table, p_field; 77s v_query := 'alter table ' || p_namespace || '.' 
|| p_table || ' add column '; 77s v_query := v_query || p_field || ' ' || p_type || ';'; 77s execute v_query; 77s return 't'; 77s else 77s return 'f'; 77s end if; 77s END;$$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.add_missing_table_field (p_namespace text, p_table text, p_field text, p_type text) 77s is 'Add a column of a given type to a table if it is missing'; 77s COMMENT 77s create or replace function public.upgradeSchema(p_old text) 77s returns text as $$ 77s declare 77s v_tab_row record; 77s v_query text; 77s v_keepstatus text; 77s begin 77s -- If old version is pre-2.0, then we require a special upgrade process 77s if p_old like '1.%' then 77s raise exception 'Upgrading to Slony-I 2.x requires running slony_upgrade_20'; 77s end if; 77s 77s perform public.upgradeSchemaAddTruncateTriggers(); 77s 77s -- Change all Slony-I-defined columns that are "timestamp without time zone" to "timestamp *WITH* time zone" 77s if exists (select 1 from information_schema.columns c 77s where table_schema = '_main' and data_type = 'timestamp without time zone' 77s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 77s and (c.table_name, c.column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp'))) 77s then 77s 77s -- Preserve sl_status 77s select pg_get_viewdef('public.sl_status') into v_keepstatus; 77s execute 'drop view sl_status'; 77s for v_tab_row in select table_schema, table_name, column_name from information_schema.columns c 77s where table_schema = '_main' and data_type = 'timestamp without time zone' 77s and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE') 77s and (table_name, column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp')) 77s loop 77s raise notice 'Changing Slony-I column [%.%] to timestamp WITH time zone', v_tab_row.table_name, v_tab_row.column_name; 77s v_query := 'alter table ' || public.slon_quote_brute(v_tab_row.table_schema) || 77s '.' 
|| v_tab_row.table_name || ' alter column ' || v_tab_row.column_name || 77s ' type timestamp with time zone;'; 77s execute v_query; 77s end loop; 77s -- restore sl_status 77s execute 'create view sl_status as ' || v_keepstatus; 77s end if; 77s 77s if not exists (select 1 from information_schema.tables where table_schema = '_main' and table_name = 'sl_components') then 77s v_query := ' 77s create table public.sl_components ( 77s co_actor text not null primary key, 77s co_pid integer not null, 77s co_node integer not null, 77s co_connection_pid integer not null, 77s co_activity text, 77s co_starttime timestamptz not null, 77s co_event bigint, 77s co_eventtype text 77s ) without oids; 77s '; 77s execute v_query; 77s end if; 77s 77s 77s 77s 77s 77s if not exists (select 1 from information_schema.tables t where table_schema = '_main' and table_name = 'sl_event_lock') then 77s v_query := 'create table public.sl_event_lock (dummy integer);'; 77s execute v_query; 77s end if; 77s 77s if not exists (select 1 from information_schema.tables t 77s where table_schema = '_main' 77s and table_name = 'sl_apply_stats') then 77s v_query := ' 77s create table public.sl_apply_stats ( 77s as_origin int4, 77s as_num_insert int8, 77s as_num_update int8, 77s as_num_delete int8, 77s as_num_truncate int8, 77s as_num_script int8, 77s as_num_total int8, 77s as_duration interval, 77s as_apply_first timestamptz, 77s as_apply_last timestamptz, 77s as_cache_prepare int8, 77s as_cache_hit int8, 77s as_cache_evict int8, 77s as_cache_prepare_max int8 77s ) WITHOUT OIDS;'; 77s execute v_query; 77s end if; 77s 77s -- 77s -- On the upgrade to 2.2, we change the layout of sl_log_N by 77s -- adding columns log_tablenspname, log_tablerelname, and 77s -- log_cmdupdncols as well as changing log_cmddata into 77s -- log_cmdargs, which is a text array. 77s -- 77s if not public.check_table_field_exists('_main', 'sl_log_1', 'log_cmdargs') then 77s -- 77s -- Check that the cluster is completely caught up 77s -- 77s if public.check_unconfirmed_log() then 77s raise EXCEPTION 'cannot upgrade to new sl_log_N format due to existing unreplicated data'; 77s end if; 77s 77s -- 77s -- Drop tables sl_log_1 and sl_log_2 77s -- 77s drop table public.sl_log_1; 77s drop table public.sl_log_2; 77s 77s -- 77s -- Create the new sl_log_1 77s -- 77s create table public.sl_log_1 ( 77s log_origin int4, 77s log_txid bigint, 77s log_tableid int4, 77s log_actionseq int8, 77s log_tablenspname text, 77s log_tablerelname text, 77s log_cmdtype "char", 77s log_cmdupdncols int4, 77s log_cmdargs text[] 77s ) without oids; 77s create index sl_log_1_idx1 on public.sl_log_1 77s (log_origin, log_txid, log_actionseq); 77s 77s comment on table public.sl_log_1 is 'Stores each change to be propagated to subscriber nodes'; 77s comment on column public.sl_log_1.log_origin is 'Origin node from which the change came'; 77s comment on column public.sl_log_1.log_txid is 'Transaction ID on the origin node'; 77s comment on column public.sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 77s comment on column public.sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 77s comment on column public.sl_log_1.log_tablenspname is 'The schema name of the table affected'; 77s comment on column public.sl_log_1.log_tablerelname is 'The table name of the table affected'; 77s comment on column public.sl_log_1.log_cmdtype is 'Replication action to take. 
U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 77s comment on column public.sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 77s comment on column public.sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica'; 77s 77s -- 77s -- Create the new sl_log_2 77s -- 77s create table public.sl_log_2 ( 77s log_origin int4, 77s log_txid bigint, 77s log_tableid int4, 77s log_actionseq int8, 77s log_tablenspname text, 77s log_tablerelname text, 77s log_cmdtype "char", 77s log_cmdupdncols int4, 77s log_cmdargs text[] 77s ) without oids; 77s create index sl_log_2_idx1 on public.sl_log_2 77s (log_origin, log_txid, log_actionseq); 77s 77s comment on table public.sl_log_2 is 'Stores each change to be propagated to subscriber nodes'; 77s comment on column public.sl_log_2.log_origin is 'Origin node from which the change came'; 77s comment on column public.sl_log_2.log_txid is 'Transaction ID on the origin node'; 77s comment on column public.sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect'; 77s comment on column public.sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 77s comment on column public.sl_log_2.log_tablenspname is 'The schema name of the table affected'; 77s comment on column public.sl_log_2.log_tablerelname is 'The table name of the table affected'; 77s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE'; 77s comment on column public.sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs'; 77s comment on column public.sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica'; 77s 77s create table public.sl_log_script ( 77s log_origin int4, 77s log_txid bigint, 77s log_actionseq int8, 77s log_cmdtype "char", 77s log_cmdargs text[] 77s ) WITHOUT OIDS; 77s create index sl_log_script_idx1 on public.sl_log_script 77s (log_origin, log_txid, log_actionseq); 77s 77s comment on table public.sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes'; 77s comment on column public.sl_log_script.log_origin is 'Origin name from which the change came'; 77s comment on column public.sl_log_script.log_txid is 'Transaction ID on the origin node'; 77s comment on column public.sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas'; 77s comment on column public.sl_log_2.log_cmdtype is 'Replication action to take. 
S = Script statement, s = Script complete'; 77s comment on column public.sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.'; 77s 77s -- 77s -- Put the log apply triggers back onto sl_log_1/2 77s -- 77s create trigger apply_trigger 77s before INSERT on public.sl_log_1 77s for each row execute procedure public.logApply('_main'); 77s alter table public.sl_log_1 77s enable replica trigger apply_trigger; 77s create trigger apply_trigger 77s before INSERT on public.sl_log_2 77s for each row execute procedure public.logApply('_main'); 77s alter table public.sl_log_2 77s enable replica trigger apply_trigger; 77s end if; 77s if not exists (select 1 from information_schema.routines where routine_schema = '_main' and routine_name = 'string_agg') then 77s CREATE AGGREGATE public.string_agg(text) ( 77s SFUNC=public.agg_text_sum, 77s STYPE=text, 77s INITCOND='' 77s ); 77s end if; 77s if not exists (select 1 from information_schema.views where table_schema='_main' and table_name='sl_failover_targets') then 77s create view public.sl_failover_targets as 77s select set_id, 77s set_origin as set_origin, 77s sub1.sub_receiver as backup_id 77s 77s FROM 77s public.sl_subscribe sub1 77s ,public.sl_set set1 77s where 77s sub1.sub_set=set_id 77s and sub1.sub_forward=true 77s --exclude candidates where the set_origin 77s --has a path a node but the failover 77s --candidate has no path to that node 77s and sub1.sub_receiver not in 77s (select p1.pa_client from 77s public.sl_path p1 77s left outer join public.sl_path p2 on 77s (p2.pa_client=p1.pa_client 77s and p2.pa_server=sub1.sub_receiver) 77s where p2.pa_client is null 77s and p1.pa_server=set_origin 77s and p1.pa_client<>sub1.sub_receiver 77s ) 77s and sub1.sub_provider=set_origin 77s --exclude any subscribers that are not 77s --direct subscribers of all sets on the 77s --origin 77s and sub1.sub_receiver not in 77s (select direct_recv.sub_receiver 77s from 77s 77s (--all direct receivers of the first set 77s select subs2.sub_receiver 77s from public.sl_subscribe subs2 77s where subs2.sub_provider=set1.set_origin 77s and subs2.sub_set=set1.set_id) as 77s direct_recv 77s inner join 77s (--all other sets from the origin 77s select set_id from public.sl_set set2 77s where set2.set_origin=set1.set_origin 77s and set2.set_id<>sub1.sub_set) 77s as othersets on(true) 77s left outer join public.sl_subscribe subs3 77s on(subs3.sub_set=othersets.set_id 77s and subs3.sub_forward=true 77s and subs3.sub_provider=set1.set_origin 77s and direct_recv.sub_receiver=subs3.sub_receiver) 77s where subs3.sub_receiver is null 77s ); 77s end if; 77s 77s if not public.check_table_field_exists('_main', 'sl_node', 'no_failed') then 77s alter table public.sl_node add column no_failed bool; 77s update public.sl_node set no_failed=false; 77s end if; 77s return p_old; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s create or replace function public.check_unconfirmed_log () 77s returns bool as $$ 77s declare 77s v_rc bool = false; 77s v_error bool = false; 77s v_origin integer; 77s v_allconf bigint; 77s v_allsnap txid_snapshot; 77s v_count bigint; 77s begin 77s -- 77s -- Loop over all nodes that are the origin of at least one set 77s -- 77s for v_origin in select distinct set_origin as no_id 77s from public.sl_set loop 77s -- 77s -- Per origin determine which is the highest event seqno 77s -- that is confirmed by all subscribers to any of the 77s -- origins sets. 
77s -- 77s select into v_allconf min(max_seqno) from ( 77s select con_received, max(con_seqno) as max_seqno 77s from public.sl_confirm 77s where con_origin = v_origin 77s and con_received in ( 77s select distinct sub_receiver 77s from public.sl_set as SET, 77s public.sl_subscribe as SUB 77s where SET.set_id = SUB.sub_set 77s and SET.set_origin = v_origin 77s ) 77s group by con_received 77s ) as maxconfirmed; 77s if not found then 77s raise NOTICE 'check_unconfirmed_log(): cannot determine highest ev_seqno for node % confirmed by all subscribers', v_origin; 77s v_error = true; 77s continue; 77s end if; 77s 77s -- 77s -- Get the txid snapshot that corresponds with that event 77s -- 77s select into v_allsnap ev_snapshot 77s from public.sl_event 77s where ev_origin = v_origin 77s and ev_seqno = v_allconf; 77s if not found then 77s raise NOTICE 'check_unconfirmed_log(): cannot find event %,% in sl_event', v_origin, v_allconf; 77s v_error = true; 77s continue; 77s end if; 77s 77s -- 77s -- Count the number of log rows that appeard after that event. 77s -- 77s select into v_count count(*) from ( 77s select 1 from public.sl_log_1 77s where log_origin = v_origin 77s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 77s union all 77s select 1 from public.sl_log_1 77s where log_origin = v_origin 77s and log_txid in ( 77s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 77s ) 77s union all 77s select 1 from public.sl_log_2 77s where log_origin = v_origin 77s and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap) 77s union all 77s select 1 from public.sl_log_2 77s where log_origin = v_origin 77s and log_txid in ( 77s select * from "pg_catalog".txid_snapshot_xip(v_allsnap) 77s ) 77s ) as cnt; 77s 77s if v_count > 0 then 77s raise NOTICE 'check_unconfirmed_log(): origin % has % log rows that have not propagated to all subscribers yet', v_origin, v_count; 77s v_rc = true; 77s end if; 77s end loop; 77s 77s if v_error then 77s raise EXCEPTION 'check_unconfirmed_log(): aborting due to previous inconsistency'; 77s end if; 77s 77s return v_rc; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s set search_path to public 77s ; 77s SET 77s comment on function public.upgradeSchema(p_old text) is 77s 'Called during "update functions" by slonik to perform schema changes'; 77s COMMENT 77s create or replace view public.sl_status as select 77s E.ev_origin as st_origin, 77s C.con_received as st_received, 77s E.ev_seqno as st_last_event, 77s E.ev_timestamp as st_last_event_ts, 77s C.con_seqno as st_last_received, 77s C.con_timestamp as st_last_received_ts, 77s CE.ev_timestamp as st_last_received_event_ts, 77s E.ev_seqno - C.con_seqno as st_lag_num_events, 77s current_timestamp - CE.ev_timestamp as st_lag_time 77s from public.sl_event E, public.sl_confirm C, 77s public.sl_event CE 77s where E.ev_origin = C.con_origin 77s and CE.ev_origin = E.ev_origin 77s and CE.ev_seqno = C.con_seqno 77s and (E.ev_origin, E.ev_seqno) in 77s (select ev_origin, max(ev_seqno) 77s from public.sl_event 77s where ev_origin = public.getLocalNodeId('_main') 77s group by 1 77s ) 77s and (C.con_origin, C.con_received, C.con_seqno) in 77s (select con_origin, con_received, max(con_seqno) 77s from public.sl_confirm 77s where con_origin = public.getLocalNodeId('_main') 77s group by 1, 2 77s ); 77s CREATE VIEW 77s comment on view public.sl_status is 'View showing how far behind remote nodes are.'; 77s COMMENT 77s create or replace function public.copyFields(p_tab_id integer) 77s returns text 77s as $$ 77s declare 77s result 
text; 77s prefix text; 77s prec record; 77s begin 77s result := ''; 77s prefix := '('; -- Initially, prefix is the opening paren 77s 77s for prec in select public.slon_quote_input(a.attname) as column from public.sl_table t, pg_catalog.pg_attribute a where t.tab_id = p_tab_id and t.tab_reloid = a.attrelid and a.attnum > 0 and a.attisdropped = false order by attnum 77s loop 77s result := result || prefix || prec.column; 77s prefix := ','; -- Subsequently, prepend columns with commas 77s end loop; 77s result := result || ')'; 77s return result; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.copyFields(p_tab_id integer) is 77s 'Return a string consisting of what should be appended to a COPY statement 77s to specify fields for the passed-in tab_id. 77s 77s In PG versions > 7.3, this looks like (field1,field2,...fieldn)'; 77s COMMENT 77s create or replace function public.prepareTableForCopy(p_tab_id int4) 77s returns int4 77s as $$ 77s declare 77s v_tab_oid oid; 77s v_tab_fqname text; 77s begin 77s -- ---- 77s -- Get the OID and fully qualified name for the table 77s -- --- 77s select PGC.oid, 77s public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) as tab_fqname 77s into v_tab_oid, v_tab_fqname 77s from public.sl_table T, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 77s where T.tab_id = p_tab_id 77s and T.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid; 77s if not found then 77s raise exception 'Table with ID % not found in sl_table', p_tab_id; 77s end if; 77s 77s -- ---- 77s -- Try using truncate to empty the table and fallback to 77s -- delete on error. 77s -- ---- 77s perform public.TruncateOnlyTable(v_tab_fqname); 77s raise notice 'truncate of % succeeded', v_tab_fqname; 77s 77s -- suppress index activity 77s perform public.disable_indexes_on_table(v_tab_oid); 77s 77s return 1; 77s exception when others then 77s raise notice 'truncate of % failed - doing delete', v_tab_fqname; 77s perform public.disable_indexes_on_table(v_tab_oid); 77s execute 'delete from only ' || public.slon_quote_input(v_tab_fqname); 77s return 0; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.prepareTableForCopy(p_tab_id int4) is 77s 'Delete all data and suppress index maintenance'; 77s COMMENT 77s create or replace function public.finishTableAfterCopy(p_tab_id int4) 77s returns int4 77s as $$ 77s declare 77s v_tab_oid oid; 77s v_tab_fqname text; 77s begin 77s -- ---- 77s -- Get the tables OID and fully qualified name 77s -- --- 77s select PGC.oid, 77s public.slon_quote_brute(PGN.nspname) || '.' || 77s public.slon_quote_brute(PGC.relname) as tab_fqname 77s into v_tab_oid, v_tab_fqname 77s from public.sl_table T, 77s "pg_catalog".pg_class PGC, "pg_catalog".pg_namespace PGN 77s where T.tab_id = p_tab_id 77s and T.tab_reloid = PGC.oid 77s and PGC.relnamespace = PGN.oid; 77s if not found then 77s raise exception 'Table with ID % not found in sl_table', p_tab_id; 77s end if; 77s 77s -- ---- 77s -- Reenable indexes and reindex the table. 
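A sketch of what the copyFields() function defined above produces; the tab_id and column names are hypothetical:

   -- for a replicated table with columns (id, name, created), this returns the
   -- string '(id,name,created)' to be appended to the COPY statement
   select public.copyFields(1);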
77s -- ---- 77s perform public.enable_indexes_on_table(v_tab_oid); 77s execute 'reindex table ' || public.slon_quote_input(v_tab_fqname); 77s 77s return 1; 77s end; 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.finishTableAfterCopy(p_tab_id int4) is 77s 'Reenable index maintenance and reindex the table'; 77s COMMENT 77s create or replace function public.setup_vactables_type () returns integer as $$ 77s begin 77s if not exists (select 1 from pg_catalog.pg_type t, pg_catalog.pg_namespace n 77s where n.nspname = '_main' and t.typnamespace = n.oid and 77s t.typname = 'vactables') then 77s execute 'create type public.vactables as (nspname name, relname name);'; 77s end if; 77s return 1; 77s end 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.setup_vactables_type () is 77s 'Function to be run as part of loading slony1_funcs.sql that creates the vactables type if it is missing'; 77s COMMENT 77s select public.setup_vactables_type(); 77s setup_vactables_type 77s ---------------------- 77s 1 77s (1 row) 77s 77s drop function public.setup_vactables_type (); 77s DROP FUNCTION 77s create or replace function public.TablesToVacuum () returns setof public.vactables as $$ 77s declare 77s prec public.vactables%rowtype; 77s begin 77s prec.nspname := '_main'; 77s prec.relname := 'sl_event'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := '_main'; 77s prec.relname := 'sl_confirm'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := '_main'; 77s prec.relname := 'sl_setsync'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := '_main'; 77s prec.relname := 'sl_seqlog'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := '_main'; 77s prec.relname := 'sl_archive_counter'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := '_main'; 77s prec.relname := 'sl_components'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := '_main'; 77s prec.relname := 'sl_log_script'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := 'pg_catalog'; 77s prec.relname := 'pg_listener'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s prec.nspname := 'pg_catalog'; 77s prec.relname := 'pg_statistic'; 77s if public.ShouldSlonyVacuumTable(prec.nspname, prec.relname) then 77s return next prec; 77s end if; 77s 77s return; 77s end 77s $$ language plpgsql; 77s CREATE FUNCTION 77s comment on function public.TablesToVacuum () is 77s 'Return a list of tables that require frequent vacuuming. 
77s comment on function public.TablesToVacuum () is
77s 'Return a list of tables that require frequent vacuuming. The
77s function is used so that the list is not hardcoded into C code.';
77s COMMENT
77s create or replace function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$
77s declare
77s
77s prec record;
77s v_origin int4;
77s v_isorigin boolean;
77s v_fqname text;
77s v_query text;
77s v_rows integer;
77s v_idxname text;
77s
77s begin
77s -- Need to validate that the set exists; the set will tell us if this is the origin
77s select set_origin into v_origin from public.sl_set where set_id = p_set_id;
77s if not found then
77s raise exception 'add_empty_table_to_replication: set % not found!', p_set_id;
77s end if;
77s
77s -- Need to be aware of whether or not this node is origin for the set
77s v_isorigin := ( v_origin = public.getLocalNodeId('_main') );
77s
77s v_fqname := '"' || p_nspname || '"."' || p_tabname || '"';
77s -- Take out a lock on the table
77s v_query := 'lock ' || v_fqname || ';';
77s execute v_query;
77s
77s if v_isorigin then
77s -- On the origin, verify that the table is empty, failing if it has any tuples
77s v_query := 'select 1 as tuple from ' || v_fqname || ' limit 1;';
77s execute v_query into prec;
77s GET DIAGNOSTICS v_rows = ROW_COUNT;
77s if v_rows = 0 then
77s raise notice 'add_empty_table_to_replication: table % empty on origin - OK', v_fqname;
77s else
77s raise exception 'add_empty_table_to_replication: table % contained tuples on origin node %', v_fqname, v_origin;
77s end if;
77s else
77s -- On other nodes, TRUNCATE the table
77s v_query := 'truncate ' || v_fqname || ';';
77s execute v_query;
77s end if;
77s -- If p_idxname is NULL, then look up the PK index, and RAISE EXCEPTION if one does not exist
77s if p_idxname is NULL then
77s select c2.relname into prec from pg_catalog.pg_index i, pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_namespace n where i.indrelid = c1.oid and i.indexrelid = c2.oid and c1.relname = p_tabname and i.indisprimary and n.nspname = p_nspname and n.oid = c1.relnamespace;
77s if not found then
77s raise exception 'add_empty_table_to_replication: table % has no primary key and no candidate specified!', v_fqname;
77s else
77s v_idxname := prec.relname;
77s end if;
77s else
77s v_idxname := p_idxname;
77s end if;
77s return public.setAddTable_int(p_set_id, p_tab_id, v_fqname, v_idxname, p_comment);
77s end
77s $$ language plpgsql;
77s CREATE FUNCTION
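A hedged example of how the function just created might be invoked from an EXECUTE SCRIPT, where every literal (set 1, table id 10, and the schema/table names) is an illustrative assumption rather than something taken from this test:

   select public.add_empty_table_to_replication(
       1, 10, 'public', 'new_table', NULL, 'newly added empty table');

Passing NULL as the index name makes the function look up the table's primary key and raise an exception if none exists, as the lookup against pg_index above shows.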
77s comment on function public.add_empty_table_to_replication(p_set_id int4, p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is
77s 'Verify that a table is empty, and add it to replication.
77s tab_idxname is optional - if NULL, then we use the primary key.
77s
77s Note that this function is to be run within an EXECUTE SCRIPT script,
77s so it runs at the right place in the transaction stream on all
77s nodes.';
77s COMMENT
77s create or replace function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) returns bigint as $$
77s declare
77s prec record;
77s prec2 record;
77s v_set_id int4;
77s
77s begin
77s -- Look up the parent table; fail if it does not exist
77s select c1.oid into prec from pg_catalog.pg_class c1, pg_catalog.pg_class c2, pg_catalog.pg_inherits i, pg_catalog.pg_namespace n where c1.oid = i.inhparent and c2.oid = i.inhrelid and n.oid = c2.relnamespace and n.nspname = p_nspname and c2.relname = p_tabname;
77s if not found then
77s raise exception 'replicate_partition: No parent table found for %.%!', p_nspname, p_tabname;
77s end if;
77s
77s -- The parent table tells us what replication set to use
77s select tab_set into prec2 from public.sl_table where tab_reloid = prec.oid;
77s if not found then
77s raise exception 'replicate_partition: Parent table % for new partition %.% is not replicated!', prec.oid, p_nspname, p_tabname;
77s end if;
77s
77s v_set_id := prec2.tab_set;
77s
77s -- Now, we have all the parameters necessary to run add_empty_table_to_replication...
77s return public.add_empty_table_to_replication(v_set_id, p_tab_id, p_nspname, p_tabname, p_idxname, p_comment);
77s end
77s $$ language plpgsql;
77s CREATE FUNCTION
77s comment on function public.replicate_partition(p_tab_id int4, p_nspname text, p_tabname text, p_idxname text, p_comment text) is
77s 'Add a partition table to replication.
77s tab_idxname is optional - if NULL, then we use the primary key.
77s This function looks up replication configuration via the parent table.
77s
77s Note that this function is to be run within an EXECUTE SCRIPT script,
77s so it runs at the right place in the transaction stream on all
77s nodes.';
77s COMMENT
77s create or replace function public.disable_indexes_on_table (i_oid oid)
77s returns integer as $$
77s begin
77s -- Setting pg_class.relhasindex to false will cause copy not to
77s -- maintain any indexes. At the end of the copy we will reenable
77s -- them and reindex the table. This bulk creating of indexes is
77s -- faster.
77s
77s update pg_catalog.pg_class set relhasindex ='f' where oid = i_oid;
77s return 1;
77s end $$
77s language plpgsql;
77s CREATE FUNCTION
77s comment on function public.disable_indexes_on_table(i_oid oid) is
77s 'disable indexes on the specified table.
77s Used during subscription process to suppress indexes, which allows
77s COPY to go much faster.
77s
77s This may be set as a SECURITY DEFINER in order to eliminate the need
77s for superuser access by Slony-I.
77s ';
77s COMMENT
77s create or replace function public.enable_indexes_on_table (i_oid oid)
77s returns integer as $$
77s begin
77s update pg_catalog.pg_class set relhasindex ='t' where oid = i_oid;
77s return 1;
77s end $$
77s language plpgsql
77s security definer;
77s CREATE FUNCTION
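For the partition helper defined above, a minimal illustrative call (all identifiers below are hypothetical; only the function itself comes from this log):

   select public.replicate_partition(
       11, 'public', 'measurements_2025_01', NULL, 'January 2025 partition');

No replication set is passed in: the function resolves it from the already-replicated parent table via pg_inherits and then delegates to add_empty_table_to_replication().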
77s comment on function public.enable_indexes_on_table(i_oid oid) is
77s 're-enable indexes on the specified table.
77s
77s This may be set as a SECURITY DEFINER in order to eliminate the need
77s for superuser access by Slony-I.
77s ';
77s COMMENT
77s drop function if exists public.reshapeSubscription(int4,int4,int4);
77s DROP FUNCTION
77s create or replace function public.reshapeSubscription (p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) returns int4 as $$
77s begin
77s update public.sl_subscribe
77s set sub_provider=p_sub_provider
77s from public.sl_set
77s WHERE sub_set=sl_set.set_id
77s and sl_set.set_origin=p_sub_origin and sub_receiver=p_sub_receiver;
77s if found then
77s perform public.RebuildListenEntries();
77s notify "_main_Restart";
77s end if;
77s return 0;
77s end
77s $$ language plpgsql;
77s CREATE FUNCTION
77s comment on function public.reshapeSubscription(p_sub_origin int4, p_sub_provider int4, p_sub_receiver int4) is
77s 'Run on a receiver/subscriber node when the provider for that
77s subscription is being changed. Slonik will invoke this method
77s before the SUBSCRIBE_SET event propogates to the receiver
77s so listen paths can be updated.';
77s COMMENT
77s create or replace function public.slon_node_health_check() returns boolean as $$
77s declare
77s prec record;
77s all_ok boolean;
77s begin
77s all_ok := 't'::boolean;
77s -- validate that all tables in sl_table have:
77s -- sl_table agreeing with pg_class
77s for prec in select tab_id, tab_relname, tab_nspname from
77s public.sl_table t where not exists (select 1 from pg_catalog.pg_class c, pg_catalog.pg_namespace n
77s where c.oid = t.tab_reloid and c.relname = t.tab_relname and c.relnamespace = n.oid and n.nspname = t.tab_nspname) loop
77s all_ok := 'f'::boolean;
77s raise warning 'table [id,nsp,name]=[%,%,%] - sl_table does not match pg_class/pg_namespace', prec.tab_id, prec.tab_relname, prec.tab_nspname;
77s end loop;
77s if not all_ok then
77s raise warning 'Mismatch found between sl_table and pg_class. Slonik command REPAIR CONFIG may be useful to rectify this.';
77s end if;
77s return all_ok;
77s end
77s $$ language plpgsql;
77s CREATE FUNCTION
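A sketch of how these two functions are typically exercised; the node ids are illustrative only (1 = set origin, 2 = this receiver, 3 = the new provider):

   -- Repoint this receiver's subscriptions for sets originating on node 1 at provider node 3
   select public.reshapeSubscription(1, 3, 2);
   -- Check that sl_table still agrees with pg_class/pg_namespace on this node
   select public.slon_node_health_check();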
77s comment on function public.slon_node_health_check() is 'called when slon starts up to validate that there are not problems with node configuration. Returns t if all is OK, f if there is a problem.';
77s COMMENT
77s create or replace function public.log_truncate () returns trigger as
77s $$
77s declare
77s r_role text;
77s c_nspname text;
77s c_relname text;
77s c_log integer;
77s c_node integer;
77s c_tabid integer;
77s begin
77s -- Ignore this call if session_replication_role = 'local'
77s select into r_role setting
77s from pg_catalog.pg_settings where name = 'session_replication_role';
77s if r_role = 'local' then
77s return NULL;
77s end if;
77s
77s c_tabid := tg_argv[0];
77s c_node := public.getLocalNodeId('_main');
77s select tab_nspname, tab_relname into c_nspname, c_relname
77s from public.sl_table where tab_id = c_tabid;
77s select last_value into c_log from public.sl_log_status;
77s if c_log in (0, 2) then
77s insert into public.sl_log_1 (
77s log_origin, log_txid, log_tableid,
77s log_actionseq, log_tablenspname,
77s log_tablerelname, log_cmdtype,
77s log_cmdupdncols, log_cmdargs
77s ) values (
77s c_node, pg_catalog.txid_current(), c_tabid,
77s nextval('public.sl_action_seq'), c_nspname,
77s c_relname, 'T', 0, '{}'::text[]);
77s else -- (1, 3)
77s insert into public.sl_log_2 (
77s log_origin, log_txid, log_tableid,
77s log_actionseq, log_tablenspname,
77s log_tablerelname, log_cmdtype,
77s log_cmdupdncols, log_cmdargs
77s ) values (
77s c_node, pg_catalog.txid_current(), c_tabid,
77s nextval('public.sl_action_seq'), c_nspname,
77s c_relname, 'T', 0, '{}'::text[]);
77s end if;
77s return NULL;
77s end
77s $$ language plpgsql
77s security definer;
77s CREATE FUNCTION
77s comment on function public.log_truncate ()
77s is 'trigger function run when a replicated table receives a TRUNCATE request';
77s COMMENT
77s create or replace function public.deny_truncate () returns trigger as
77s $$
77s declare
77s r_role text;
77s begin
77s -- Ignore this call if session_replication_role = 'local'
77s select into r_role setting
77s from pg_catalog.pg_settings where name = 'session_replication_role';
77s if r_role = 'local' then
77s return NULL;
77s end if;
77s
77s raise exception 'truncation of replicated table forbidden on subscriber node';
77s end
77s $$ language plpgsql;
77s CREATE FUNCTION
77s comment on function public.deny_truncate ()
77s is 'trigger function run when a replicated table receives a TRUNCATE request';
77s COMMENT
77s create or replace function public.store_application_name (i_name text) returns text as $$
77s declare
77s p_command text;
77s begin
77s if exists (select 1 from pg_catalog.pg_settings where name = 'application_name') then
77s p_command := 'set application_name to '''|| i_name || ''';';
77s execute p_command;
77s return i_name;
77s end if;
77s return NULL::text;
77s end $$ language plpgsql;
77s CREATE FUNCTION
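The two trigger functions above are normally wired up by Slony-I itself when a table is added to a set; a hedged sketch of what that wiring could look like on a hypothetical replicated table (the trigger names, table name, and table id 10 are illustrative):

   create trigger "_main_truncatetrigger"
       before truncate on public.my_table
       for each statement execute procedure public.log_truncate(10);
   create trigger "_main_truncatedeny"
       before truncate on public.my_table
       for each statement execute procedure public.deny_truncate();
   -- Label this session in pg_stat_activity, if the server supports application_name
   select public.store_application_name('slon.node1');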
77s comment on function public.store_application_name (i_name text) is
77s 'Set application_name GUC, if possible. Returns NULL if it fails to work.';
77s COMMENT
77s create or replace function public.is_node_reachable(origin_node_id integer,
77s receiver_node_id integer) returns boolean as $$
77s declare
77s listen_row record;
77s reachable boolean;
77s begin
77s reachable:=false;
77s select * into listen_row from public.sl_listen where
77s li_origin=origin_node_id and li_receiver=receiver_node_id;
77s if found then
77s reachable:=true;
77s end if;
77s return reachable;
77s end $$ language plpgsql;
77s CREATE FUNCTION
77s comment on function public.is_node_reachable(origin_node_id integer, receiver_node_id integer)
77s is 'Is the receiver node reachable from the origin, via any of the listen paths?';
77s COMMENT
77s create or replace function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) returns integer as $$
77s begin
77s -- Trim out old state for this component
77s if not exists (select 1 from public.sl_components where co_actor = i_actor) then
77s insert into public.sl_components
77s (co_actor, co_pid, co_node, co_connection_pid, co_activity, co_starttime, co_event, co_eventtype)
77s values
77s (i_actor, i_pid, i_node, i_conn_pid, i_activity, i_starttime, i_event, i_eventtype);
77s else
77s update public.sl_components
77s set
77s co_connection_pid = i_conn_pid, co_activity = i_activity, co_starttime = i_starttime, co_event = i_event,
77s co_eventtype = i_eventtype
77s where co_actor = i_actor
77s and co_starttime < i_starttime;
77s end if;
77s return 1;
77s end $$
77s language plpgsql;
77s CREATE FUNCTION
77s comment on function public.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) is
77s 'Store state of a Slony component. Useful for monitoring';
77s COMMENT
77s create or replace function public.recreate_log_trigger(p_fq_table_name text,
77s p_tab_id oid, p_tab_attkind text) returns integer as $$
77s begin
77s execute 'drop trigger "_main_logtrigger" on ' ||
77s p_fq_table_name ;
77s -- ----
77s execute 'create trigger "_main_logtrigger"' ||
77s ' after insert or update or delete on ' ||
77s p_fq_table_name
77s || ' for each row execute procedure public.logTrigger (' ||
77s pg_catalog.quote_literal('_main') || ',' ||
77s pg_catalog.quote_literal(p_tab_id::text) || ',' ||
77s pg_catalog.quote_literal(p_tab_attkind) || ');';
77s return 0;
77s end
77s $$ language plpgsql;
77s CREATE FUNCTION
77s comment on function public.recreate_log_trigger(p_fq_table_name text,
77s p_tab_id oid, p_tab_attkind text) is
77s 'A function that drops and recreates the log trigger on the specified table.
77s It is intended to be used after the primary_key/unique index has changed.';
77s COMMENT
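A brief sketch of the monitoring-oriented calls just defined (the node ids and the actor name are illustrative assumptions):

   -- Is node 2 reachable from node 1 via any listen path?
   select public.is_node_reachable(1, 2);
   -- Record what a worker is currently doing, for inspection via the sl_components table
   select public.component_state('local_monitor', pg_backend_pid(), 1,
          pg_backend_pid(), 'idle', now(), NULL, NULL);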
77s create or replace function public.repair_log_triggers(only_locked boolean)
77s returns integer as $$
77s declare
77s retval integer;
77s table_row record;
77s begin
77s retval=0;
77s for table_row in
77s select tab_nspname,tab_relname,
77s tab_idxname, tab_id, mode,
77s public.determineAttKindUnique(tab_nspname||
77s '.'||tab_relname,tab_idxname) as attkind
77s from
77s public.sl_table
77s left join
77s pg_locks on (relation=tab_reloid and pid=pg_backend_pid()
77s and mode='AccessExclusiveLock')
77s ,pg_trigger
77s where tab_reloid=tgrelid and
77s public.determineAttKindUnique(tab_nspname||'.'
77s ||tab_relname,tab_idxname)
77s !=(public.decode_tgargs(tgargs))[2]
77s and tgname = '_main'
77s || '_logtrigger'
77s LOOP
77s if (only_locked=false) or table_row.mode='AccessExclusiveLock' then
77s perform public.recreate_log_trigger
77s (table_row.tab_nspname||'.'||table_row.tab_relname,
77s table_row.tab_id,table_row.attkind);
77s retval=retval+1;
77s else
77s raise notice '%.% has an invalid configuration on the log trigger. This was not corrected because only_lock is true and the table is not locked.',
77s table_row.tab_nspname,table_row.tab_relname;
77s
77s end if;
77s end loop;
77s return retval;
77s end
77s $$
77s language plpgsql;
77s CREATE FUNCTION
77s comment on function public.repair_log_triggers(only_locked boolean)
77s is '
77s repair the log triggers as required. If only_locked is true then only
77s tables that are already exclusively locked by the current transaction are
77s repaired. Otherwise all replicated tables with outdated trigger arguments
77s are recreated.';
77s COMMENT
77s create or replace function public.unsubscribe_abandoned_sets(p_failed_node int4) returns bigint
77s as $$
77s declare
77s v_row record;
77s v_seq_id bigint;
77s v_local_node int4;
77s begin
77s
77s select public.getLocalNodeId('_main') into
77s v_local_node;
77s
77s if found then
77s --abandon all subscriptions from this origin.
77s for v_row in select sub_set,sub_receiver from
77s public.sl_subscribe, public.sl_set
77s where sub_set=set_id and set_origin=p_failed_node
77s and sub_receiver=v_local_node
77s loop
77s raise notice 'Slony-I: failover_abandon_set() is abandoning subscription to set % on node % because it is too far ahead', v_row.sub_set,
77s v_local_node;
77s --If this node is a provider for the set
77s --then the receiver needs to be unsubscribed.
77s --
77s select public.unsubscribeSet(v_row.sub_set,
77s v_local_node,true)
77s into v_seq_id;
77s end loop;
77s end if;
77s
77s return v_seq_id;
77s end
77s $$ language plpgsql;
77s CREATE FUNCTION
77s CREATE OR replace function public.agg_text_sum(txt_before TEXT, txt_new TEXT) RETURNS TEXT AS
77s $BODY$
77s DECLARE
77s c_delim text;
77s BEGIN
77s c_delim = ',';
77s IF (txt_before IS NULL or txt_before='') THEN
77s RETURN txt_new;
77s END IF;
77s RETURN txt_before || c_delim || txt_new;
77s END;
77s $BODY$
77s LANGUAGE plpgsql;
77s CREATE FUNCTION
77s comment on function public.agg_text_sum(text,text) is
77s 'An accumulator function used by the slony string_agg function to
77s aggregate rows into a string';
77s COMMENT
77s Dropping cluster 16/regress ...
77s ### End 16 psql ###
77s NOTICE: function public.reshapesubscription(int4,int4,int4) does not exist, skipping
77s autopkgtest [10:06:56]: test load-functions: -----------------------]
77s autopkgtest [10:06:56]: test load-functions: - - - - - - - - - - results - - - - - - - - - -
77s load-functions PASS
78s autopkgtest [10:06:57]: @@@@@@@@@@@@@@@@@@@@ summary
78s load-functions PASS
84s nova [W] Using flock in prodstack6-s390x
84s Creating nova instance adt-plucky-s390x-slony1-2-20250106-100539-juju-7f2275-prod-proposed-migration-environment-2-57d29276-7e7c-440e-b42d-21beda410862 from image adt/ubuntu-plucky-s390x-server-20250106.img (UUID 5b452c36-3901-4afa-abc7-e21db541e94d)...
84s nova [W] Timed out waiting for 6dd85b50-efd8-4246-b101-f1b0e6711d21 to get deleted.