decoder_json /

Filename Size Date modified Message
66 B
99 B
5.3 KB
21.7 KB
1.6 KB

decoder_json, output plugin for Postgres logical replication

This output plugin for Postgres logical replication generates a JSON document for each logical change.

This is experimental code.

USAGE

Install it:

make
make install

Configure Postgres for logical replication. That means lines like this in postgres.conf:

wal_level = logical
max_wal_senders = 1
max_replication_slots = 1

And like this in pg_hba.conf:

local   replication     postgres                                md5

Then restart Postgres and run the included test.sql script:

psql -U postgres -f test.sql

You should see results like this:

 location  | xid |                                                                                                                                                       data                                                                                                                                                        
-----------+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 0/1831740 | 856 | {"table":"public.test_decoder_json","op":"INSERT","data":{"id":1,"json_data":[1, 2, 3, "blerg"],"string_data":"this is a string","string_array":["tag1","tag2","tag3"],"int_array":[4,5,6],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:28:58.777292+00:00"}}
 0/1831918 | 857 | {"table":"public.test_decoder_json","op":"UPDATE","data":{"id":1,"json_data":[1, 2, 3, "blerg"],"string_data":"this is a string","string_array":["tag1","tag2","tag3"],"int_array":[99,98,97],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:28:58.777292+00:00"}}
 0/1831A80 | 858 | {"table":"public.test_decoder_json","op":"INSERT","data":{"id":999999,"json_data":[1, 2, 3, "blerg"],"string_data":"this is a string","string_array":["tag1","tag2","tag3"],"int_array":[4,5,6],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:28:58.77884+00:00"}}
 0/1831C20 | 859 | {"table":"public.test_decoder_json","op":"UPDATE","data":{"id":999999,"json_data":[1, 2, 3, "blerg"],"string_data":"update!","string_array":["tag1","tag2","tag3"],"int_array":[4,5,6],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:28:58.77884+00:00"}}
 0/1831D80 | 860 | {"table":"public.test_decoder_json","op":"DELETE","old_key":{"id":999999}}
 0/1831E08 | 861 | {"table":"public.test_decoder_json","op":"DELETE","old_key":{"id":1}}
(6 rows)

In addition to the SQL-based interface used in test.sql, there is a streaming interface that you can connect to with the pg_recvlogical program included with Postgres:

$ pg_recvlogical --slot=test_decoder_json_streaming --dbname=postgres --user=postgres --create-slot --plugin=decoder_json 
$ pg_recvlogical --slot=test_decoder_json_streaming --dbname=postgres --user=postgres --start -f -

Now in another terminal, run psql -U postgres -f test.sql, and you should see lines of json streaming in your pg_recvlogical terminal, like this:

vagrant@vagrant-ubuntu-trusty-64:/vagrant/decoder_json$ pg_recvlogical --slot=test_decoder_json_streaming --dbname=postgres --user=postgres --start -f -
{"table":"public.test_decoder_json","op":"INSERT","data":{"id":1,"json_data":[1, 2, 3, "blerg"],"string_data":"this is a string","string_array":["tag1","tag2","tag3"],"int_array":[4,5,6],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:38:25.715435+00:00"}}
{"table":"public.test_decoder_json","op":"UPDATE","data":{"id":1,"json_data":[1, 2, 3, "blerg"],"string_data":"this is a string","string_array":["tag1","tag2","tag3"],"int_array":[99,98,97],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:38:25.715435+00:00"}}
{"table":"public.test_decoder_json","op":"INSERT","data":{"id":999999,"json_data":[1, 2, 3, "blerg"],"string_data":"this is a string","string_array":["tag1","tag2","tag3"],"int_array":[4,5,6],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:38:25.717425+00:00"}}
{"table":"public.test_decoder_json","op":"UPDATE","data":{"id":999999,"json_data":[1, 2, 3, "blerg"],"string_data":"update!","string_array":["tag1","tag2","tag3"],"int_array":[4,5,6],"string_array_array":[["1","2"],["3","4"],["5","6"]],"float_data":1.2345,"ts":"2015-01-11T22:38:25.717425+00:00"}}
{"table":"public.test_decoder_json","op":"DELETE","old_key":{"id":999999}}
{"table":"public.test_decoder_json","op":"DELETE","old_key":{"id":1}}

When you're done, be sure to drop the slot:

$ pg_recvlogical --slot=test_decoder_json_streaming --dbname=postgres --user=postgres --drop-slot

TODO

  • Clean up cruft left over from test_decoding code.
  • Provide option for including BEGIN and COMMIT messages.
  • Provide option for including transaction IDs
  • Provide option for including timestamp.
  • Automate tests