PostgreSQL Listen/Notification proces

In PostgreSQL it is possible to send a notification to a channel based on a trigger on a table. This notification can then be send to a GO program. This language is specifically meant to respond to messages coming from a channel…

Notification process

The notification process is a process that sends a message from PostgreSQL to an internal queue. There is 8000 byte limit on the size of the message, so the amount of data should be limited. This mechanism should not be used for a big message queueing system, but it’s an easy way to process change notifications to other systems. This article is about the combination of this mechanism and the Go language as a receiving client for a notification.

Let’s demonstrate this by putting a message on a queue. As an example we create a table TEST, create a trigger on that table and execute a stored procedure to respond to transactions on this table.

create table TEST( ID     serial
                 , TIJD   timestamp
                 , ACTION varchar( 100 ) )

The code we’re going to execute is a stored procedure that creates a JSON message and uses this message as the payload for a notification.

create or replace function notify_message()
returns trigger as $$
declare
  payload JSON;
begin
   payload = json_build_object( 'action', TG_OP
                              , 'data', row_to_json( NEW ) );


  perform pg_notify( 'event_channel', payload::text );
  
  return null;
end;
$$ language plpgsql;

The next step is to create a trigger on our TEST table to response to DML changes on this table.

create trigger taiu_worklog
after insert or update
on TEST
for each row
execute procedure notify_message();

Listen

The next step is to listen to a message on this channel. In PgPLSQL this is done so by giving the command listen event_channel.Once a message is send to the channel, this PgPLSQL session will respond to this message:

Session A
postgres=# insert into TEST( TIJD, ACTION ) values ( now(), 'Test 1' );
Session B
postgres=# listen event_channel;
LISTEN
postgres=# notify event_channel, 'touch';
NOTIFY
Asynchronous notification "event_channel" with payload "{"action" : "INSERT", "data" : {"id":10,"tijd":"2019-05-17T20:51:46.809745","action":"Test 1"}}" received from server process with PID 15807.
Asynchronous notification "event_channel" with payload "touch" received from server process with PID 16129.

Go

The next step is to have a client application that can respond to a message on a channel. Go is designed to wait for a message to present itself on a channel. This is more efficient than a client that keeps polling a channel to see if there are messages available,

The latest version of Go can be obtained from Google: https://golang.org/dl/A very short summary of the installation (on CentOS 7) is:

wget https://dl.google.com/go/go1.12.5.linux-amd64.tar.gz
tar xzvf ./go1.12.5.linux-amd64.tar.gz
mv go /usr/local/
cd /bin
ln -s /usr/local/go/bin/go

yum install git

go get -u github.com/lib/pq
cp -pR ./go /usr/local/

The next step is to create a go program to listen to messages:

go_postgres.go
package main

import (
  "bytes"
  "database/sql"
  "encoding/json"
  "fmt"
  "time"
  "github.com/lib/pq"
)

func waitForNotification(l *pq.Listener) {
  for {
    select {
    case n := <-l.Notify:
      fmt.Println("Received data from channel [", n.Channel, "] :")
      // Prepare notification payload for pretty print
      var prettyJSON bytes.Buffer
      err := json.Indent(&prettyJSON, []byte(n.Extra), "", "\t")
      if err != nil {
        fmt.Println("Error processing JSON: ", err)
        return
      }
      fmt.Println(string(prettyJSON.Bytes()))
      return
    case <-time.After(90 * time.Second):
      fmt.Println("Received no events for 90 seconds, checking connection")
      go func() {
        l.Ping()
      }()
      return
    }
  }
}

func main() {
  var conninfo string = "dbname=postgres user=postgres sslmode=disable"

  _, err := sql.Open("postgres", conninfo)
  if err != nil {
    panic(err)
  }

  reportProblem := func(ev pq.ListenerEventType, err error) {
    if err != nil {
      fmt.Println(err.Error())
    }
  }

  listener := pq.NewListener(conninfo, 10*time.Second, time.Minute, reportProblem)
  err = listener.Listen("event_channel")
  if err != nil {
    panic(err)
  }

  fmt.Println("Start monitoring PostgreSQL...")
  for {
    waitForNotification(listener)
  }
}

The listener can now be started and when it’s running we can insert a record in another session in the table:

postgres@vlhvmpg02 ~ $ go run go_postgres.go
Start monitoring PostgreSQL...
Received data from channel [ event_channel ] :
{
	"action": "INSERT",
	"data": {
		"id": 11,
		"tijd": "2019-05-17T22:02:06.155694",
		"action": "Test 1"
	}
}

https://www.postgresql.org/docs/current/sql-notify.html

https://tapoueh.org/blog/2018/07/postgresql-listen/notify/

http://coussej.github.io/2015/09/15/Listening-to-generic-JSON-notifications-from-PostgreSQL-in-Go/

 

One thought on “PostgreSQL Listen/Notification proces

Geef een reactie

Het e-mailadres wordt niet gepubliceerd. Vereiste velden zijn gemarkeerd met *