Saturday, 10 May 2008

Receiving events/notifications from PostgreSQL

At work I constantly see the (anti)pattern of (ab)using the RDBMS as a datasource. The main reason for not using a cache for data lookups seems to be the fear of serving stale data. This is easily solved if all access to the database goes through some kind of service layer; in that case the service layer takes on the added responsibility of updating the cache. If there is no service layer in place, what is needed is a way for the database to send "cache eviction events" whenever a record changes.

Pgmemcache is one solution to this problem, but it is tightly coupled to memcached, and in the future we may want to receive events that are not related to caching. Also, although memcached supports a UDP protocol, pgmemcache seems to require a persistent TCP connection to the memcached servers, which makes me uncomfortable.

After reading the excellent documentation on how to write custom C functions, I came up with this:

#include "postgres.h"
#include "fmgr.h"

#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

PG_FUNCTION_INFO_V1(publish_event);

Datum
publish_event(PG_FUNCTION_ARGS)
{
	int32 port = PG_GETARG_INT32(0); // destination port
	text *payload = PG_GETARG_TEXT_P(1); // message payload

	int ret;
	int fd;
	struct sockaddr_in dst_addr;
	int flag = 1;
	int len = VARSIZE(payload) - VARHDRSZ; // payload length without the varlena header

	fd = socket(PF_INET, SOCK_DGRAM, 0);

	// without a socket there is nothing to send on, so report and bail out
	if ( fd < 0 )
	{
		elog(NOTICE, "publish_event: %s", strerror(errno));
		PG_RETURN_VOID();
	}

	setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &flag, sizeof(int));

	// inet_aton() does not resolve host names, so use the loopback address
	memset(&dst_addr, 0, sizeof(dst_addr));
	dst_addr.sin_family = AF_INET;
	dst_addr.sin_port = htons(port);
	inet_aton("127.0.0.1", &dst_addr.sin_addr);

	ret = sendto(fd, VARDATA(payload), len, 0,
	(struct sockaddr *) &dst_addr, sizeof(struct sockaddr_in));

	// a failed send is only reported, it never aborts the transaction
	if ( ret < 0 )
		elog(NOTICE, "publish_event: %s", strerror(errno));

	closesocket(fd);

	PG_RETURN_VOID();
}

And this is an example of how this function can be used:

CREATE OR REPLACE FUNCTION item_trg_upd()
  RETURNS trigger AS
$BODY$
BEGIN
	IF ROW(OLD.*) IS DISTINCT FROM ROW(NEW.*)  THEN
		PERFORM publish_event(1234, '<id>' || OLD.id || '</id>' );
	END IF;
	RETURN NEW;
END;
$BODY$
  LANGUAGE 'plpgsql' VOLATILE;

CREATE TRIGGER item_trg_upd
  AFTER UPDATE
  ON item
  FOR EACH ROW
  EXECUTE PROCEDURE item_trg_upd();

 

And that's it: whenever a record in the "item" table is updated, a notification is sent. For this to work there must be a local daemon listening on whichever port the message is sent to, something like this:

 

import static java.lang.System.out;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

public class SimpleDaemon
{
	public static void main(String[] args) throws Throwable
	{
		InetAddress inet = InetAddress.getByName("localhost");
		DatagramSocket socket = new DatagramSocket(1234, inet);
	
		// increase the rcv buffer size to minimize packet loss
		socket.setReceiveBufferSize(1024 * 1024);
	
		int mSize = (int) Math.pow(2, 16);
		DatagramPacket packet = new DatagramPacket(new byte[mSize], mSize);

		out.println("listening ...");

		while (true)
		{
			socket.receive(packet);
			byte[] receivedData = packet.getData();
			int len = packet.getLength();
			byte[] messageData = new byte[len];			
			System.arraycopy(receivedData, 0, messageData, 0, len);			
			String message = new String(messageData, "UTF-8");
			out.println(message);
		}
	}
}
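The daemon above only prints what it receives. As a rough sketch of how a received message could be turned into the cache eviction discussed at the start of this post, the snippet below pulls the id out of the `<id>...</id>` payload built by the trigger and removes the matching entry from an in-memory map. The class name `EvictionHandler`, the methods `extractId` and `evict`, and the choice of a `ConcurrentHashMap` keyed by the id as a string are all assumptions made for illustration; they are not part of the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the original post.
public class EvictionHandler
{
	// Matches the "<id>...</id>" payload produced by the trigger.
	private static final Pattern ID = Pattern.compile("<id>(.*?)</id>");

	// Returns the id carried by the message, or null if the message has another shape.
	public static String extractId(String message)
	{
		Matcher m = ID.matcher(message);
		return m.find() ? m.group(1) : null;
	}

	// Removes the cached entry for the id in the message.
	// Returns true only if an entry was actually evicted.
	public static boolean evict(Map<String, ?> cache, String message)
	{
		String id = extractId(message);
		return id != null && cache.remove(id) != null;
	}

	public static void main(String[] args)
	{
		// Assumed cache layout: item id (as text) -> cached value.
		Map<String, String> cache = new ConcurrentHashMap<>();
		cache.put("42", "cached row for item 42");

		evict(cache, "<id>42</id>");
		System.out.println(cache.containsKey("42")); // prints "false"
	}
}
```

In the daemon's receive loop, the call to `out.println(message)` could then be replaced (or followed) by a call such as `EvictionHandler.evict(cache, message)`. Since UDP gives no delivery guarantee, a cache fed this way would probably still want a time-to-live on its entries as a safety net.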

 

 

Fetch the code if you find this useful.
Note: the Java code above increases the receive buffer size to a value that most operating systems do not allow by default; read "UDP Buffer Sizing" for instructions on how to tune your OS.
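One way to see whether the requested buffer size was honoured is to read the value back after setting it. `setReceiveBufferSize` is only a hint to the operating system, and `getReceiveBufferSize` reports what the socket is actually using. The snippet below is a minimal sketch of that check; the class name `BufferCheck` and the method `grantedReceiveBuffer` are made up for this example, and the exact value reported varies by operating system (some report more than was requested).

```java
import java.net.DatagramSocket;
import java.net.SocketException;

// Hypothetical helper, not part of the original post.
public class BufferCheck
{
	// Asks for a receive buffer of the given size and returns the size the OS reports back.
	public static int grantedReceiveBuffer(int requested) throws SocketException
	{
		// Bind to any free port; the port number is irrelevant for this check.
		try (DatagramSocket socket = new DatagramSocket())
		{
			socket.setReceiveBufferSize(requested);
			return socket.getReceiveBufferSize();
		}
	}

	public static void main(String[] args) throws SocketException
	{
		int requested = 1024 * 1024; // same value the daemon asks for
		int granted = grantedReceiveBuffer(requested);

		System.out.println("requested=" + requested + " granted=" + granted);
		if (granted < requested)
			System.out.println("the OS appears to have capped the buffer");
	}
}
```

If the reported size is smaller than the requested one, the OS limit is probably in effect and the tuning described in "UDP Buffer Sizing" applies.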

 

published by luisneves at 19:40