pgstream - Postgres replication with DDL changes
pgstream is an open source CDC command-line tool and library that offers Postgres replication support with DDL changes to any provided target.
Features
Schema change tracking and replication of DDL changes
Support for multiple out-of-the-box targets: Elasticsearch/OpenSearch, webhooks, PostgreSQL
Initial and on demand PostgreSQL snapshots (for when you don't need continuous replication)
Column value transformations (anonymise your data on the go!)
Modular deployment configuration, only requires Postgres
Kafka support with schema based partitioning
Extendable support for custom targets
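As an illustration of what schema-based partitioning can look like, here is a minimal Go sketch that hashes the schema name to pick a partition, so that all events for one schema go to the same partition. The `partitionForSchema` helper and the choice of FNV-1a are assumptions made for this example; they are not taken from pgstream's implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForSchema maps a schema name to a partition index in
// [0, numPartitions). Hypothetical helper for illustration only; it is not
// part of the pgstream API.
func partitionForSchema(schema string, numPartitions uint32) uint32 {
	if numPartitions == 0 {
		return 0 // avoid a division by zero on a misconfigured topic
	}
	h := fnv.New32a()
	h.Write([]byte(schema)) // hash.Hash.Write never returns an error
	return h.Sum32() % numPartitions
}

func main() {
	// The same schema name always maps to the same partition.
	for _, schema := range []string{"public", "billing", "public"} {
		fmt.Printf("schema=%s partition=%d\n", schema, partitionForSchema(schema, 6))
	}
}
```

With a Kafka client, a similar effect is usually obtained by using the schema name as the message key and letting the client's default partitioner do the hashing.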
Usage
pgstream can be used via the readily available CLI or as a library.
CLI Installation
Binaries
Binaries are available for Linux, macOS and Windows; check our Releases.
From source
To install pgstream from the source, run the following command:
go install github.com/xataio/pgstream@latest
From package manager - Homebrew
To install pgstream with homebrew, run the following command:
# macOS or Linux
brew tap xataio/pgstream
brew install pgstream
Environment setup
If you have an environment available, with at least Postgres and whichever module resources you're planning on running, then you can skip this step. Otherwise, a docker setup is available in this repository that starts Postgres, Kafka and OpenSearch (as well as OpenSearch dashboards for easy visualisation).
docker-compose -f build/docker/docker-compose.yml up
The docker-compose file has profiles that can be used to bring up only the relevant containers. For example, if you only want to run PostgreSQL to PostgreSQL replication, you can use the pg2pg profile as follows:
docker-compose -f build/docker/docker-compose.yml --profile pg2pg up
You can also run multiple profiles. For example, to start two PostgreSQL instances and Kafka:
docker-compose -f build/docker/docker-compose.yml --profile pg2pg --profile kafka up
List of supported docker profiles:
pg2pg
pg2os
pg2webhook
kafka
Configuration
Pgstream source and target need to be configured appropriately before the commands can be run. This can be done:
Using the relevant CLI flags for each command
Using a yaml configuration file
Using environment variables (.env file supported)
Check the documentation for more information about the configuration options, or check the help on the CLI for details on the available flags. Additionally, at the root of this repository you can find sample files for both .env and .yaml.
If you want to configure column transformations, leveraging greenmask, neosync and go-masker open source integrations, as well as custom transformers, check the transformation rules configuration for more details, along with the list of available transformers.
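To make the idea of a column transformation concrete, here is a minimal Go sketch of a deterministic email masker applied to a row. The `maskEmail` and `transformRow` names, and the row-as-map representation, are hypothetical and chosen for this example only; they do not reproduce the greenmask, neosync or go-masker transformers or pgstream's transformer interface.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// maskEmail replaces the local part of an email address with a short,
// deterministic digest and keeps the domain. Hypothetical example.
func maskEmail(email string) string {
	at := strings.LastIndex(email, "@")
	if at < 0 {
		// Not an email address: hash the whole value rather than leak it.
		at = len(email)
	}
	sum := sha256.Sum256([]byte(email[:at]))
	return hex.EncodeToString(sum[:4]) + email[at:]
}

// transformRow applies per-column rules to string values and copies every
// other value unchanged. The input row is not modified.
func transformRow(row map[string]any, rules map[string]func(string) string) map[string]any {
	out := make(map[string]any, len(row))
	for col, val := range row {
		s, isString := val.(string)
		if fn, ok := rules[col]; ok && isString {
			out[col] = fn(s)
			continue
		}
		out[col] = val
	}
	return out
}

func main() {
	row := map[string]any{"id": 1, "email": "jane@example.com"}
	rules := map[string]func(string) string{"email": maskEmail}
	fmt.Println(transformRow(row, rules))
}
```

Because the digest is deterministic, the same input always produces the same masked value, which keeps joins on the masked column working. An unsalted hash of a short value can be guessed by brute force, so a real setup would likely want a keyed hash or one of the dedicated transformers.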
Prepare the database
The init command creates the pgstream schema in the configured Postgres database, along with the tables/functions/triggers required to keep track of the schema changes. See the Tracking schema changes section for more details. It also creates a replication slot for the configured database, which will be used by the pgstream service. If no replication slot name is provided, it uses a default one with the format pgstream_<dbname>_slot. This step can be skipped by passing --init as an option to the run command, which does the same preparation right before starting the replication.
# with CLI flags
pgstream init --postgres-url "postgres://postgres:postgres@localhost?sslmode=disable" --replication-slot test

# with yaml configuration file
pgstream init -c pg2pg.yaml

# with environment configuration file
pgstream init -c pg2pg.env
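If you need to predict the default replication slot name (for monitoring or cleanup scripts, say), the following Go sketch derives it from a connection URL. It assumes the default follows the pattern pgstream_<dbname>_slot, which matches the pgstream_postgres_slot name shown in the status example for the postgres database. The `defaultSlotName` helper is hypothetical, and the fallback to the user name when the URL has no database path mirrors the libpq convention rather than anything confirmed about pgstream.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// defaultSlotName guesses the default slot name for a Postgres URL.
// Hypothetical helper: the "pgstream_<dbname>_slot" pattern and the
// user-name fallback are assumptions, not pgstream's documented behaviour.
func defaultSlotName(postgresURL string) (string, error) {
	u, err := url.Parse(postgresURL)
	if err != nil {
		return "", err
	}
	db := strings.TrimPrefix(u.Path, "/")
	if db == "" && u.User != nil {
		// libpq defaults the database name to the user name.
		db = u.User.Username()
	}
	if db == "" {
		return "", fmt.Errorf("no database name in %q", postgresURL)
	}
	return "pgstream_" + db + "_slot", nil
}

func main() {
	name, err := defaultSlotName("postgres://postgres:postgres@localhost?sslmode=disable")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(name)
}
```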
The status of the initialisation and the configuration can be checked by using the status command.
pgstream status -c pg2pg.yaml
SUCCESS pgstream status check encountered no issues
Initialisation status:
 - Pgstream schema exists: true
 - Pgstream schema_log table exists: true
 - Migration current version: 7
 - Migration status: success
 - Replication slot name: pgstream_postgres_slot
 - Replication slot plugin: wal2json
 - Replication slot database: postgres
Config status:
 - Valid: true
Transformation rules status:
 - Valid: true
Source status:
 - Reachable: true
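For scripted checks (in CI, for example), output like the one above can be scanned for failing entries. The Go sketch below assumes the layout shown in the example, with section headers ending in a colon and "- key: value" entries, and assumes that a failing check is reported with the literal value "false". Both are assumptions drawn from the sample output, and `failedChecks` is a hypothetical helper.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// failedChecks scans status-style output and returns every "section / key"
// whose value is the literal "false". Assumes the layout of the sample
// output; other layouts are ignored rather than rejected.
func failedChecks(output string) []string {
	var failed []string
	section := ""
	sc := bufio.NewScanner(strings.NewReader(output))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		switch {
		case strings.HasPrefix(line, "- "):
			key, value, ok := strings.Cut(strings.TrimPrefix(line, "- "), ": ")
			if ok && strings.TrimSpace(value) == "false" {
				failed = append(failed, section+" / "+key)
			}
		case strings.HasSuffix(line, ":"):
			section = strings.TrimSuffix(line, ":")
		}
	}
	return failed
}

func main() {
	// Hypothetical sample modelled on the example output, with one check flipped.
	sample := "Config status:\n - Valid: true\nSource status:\n - Reachable: false\n"
	for _, f := range failedChecks(sample) {
		fmt.Println("failed:", f)
	}
}
```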
If there are any issues or if you want to revert the pgstream setup, you can use the destroy command to clean up all pgstream state.
# with CLI flags
pgstream destroy --postgres-url "postgres://postgres:postgres@localhost?sslmode=disable" --replication-slot test

# with yaml configuration file
pgstream destroy -c pg2pg.yaml

# with environment configuration file
pgstream destroy -c pg2pg.env
Run pgstream
Replication mode
Run will start streaming data from the configured source into the configured target.
Example running pgstream replication from Postgres -> OpenSearch:
# using the environment configuration file
pgstream run -c pg2os.env --log-level trace

# using the yaml configuration file
pgstream run -c pg2os.yaml --log-level info

# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target opensearch --target-url "http://admin:admin@localhost:9200"

Example running pgstream with Postgres -> Kafka, and in a separate terminal, Kafka -> OpenSearch:
# using the environment configuration file
pgstream run -c pg2kafka.env --log-level trace

# using the yaml configuration file
pgstream run -c pg2kafka.yaml --log-level info

# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target kafka --target-url "localhost:9092"

# using the environment configuration file
pgstream run -c kafka2os.env --log-level trace

# using the yaml configuration file
pgstream run -c kafka2os.yaml --log-level info

# using the CLI flags
pgstream run --source kafka --source-url "localhost:9092" --target opensearch --target-url "http://admin:admin@localhost:9200"
Example running pgstream with PostgreSQL -> PostgreSQL with initial snapshot enabled:
# using the environment configuration file
pgstream run -c pg2pg.env --log-level trace

# using the yaml configuration file
pgstream run -c pg2pg.yaml --log-level info

# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target postgres --target-url "postgres://postgres:postgres@localhost:7654?sslmode=disable" --snapshot-tables test
Snapshot mode
Example running pgstream to perform a snapshot from PostgreSQL -> PostgreSQL:
# using the environment configuration file
pgstream snapshot -c snapshot2pg.env --log-level trace

# using the yaml configuration file
pgstream snapshot -c snapshot2pg.yaml --log-level info

# using the CLI flags
pgstream snapshot --postgres-url="postgres://postgres:postgres@localhost:5432?sslmode=disable" --target=postgres --target-url="postgres://postgres:postgres@localhost:7654?sslmode=disable" --tables="test" --reset
Pgstream will parse the configuration provided and initialise the relevant modules. It requires at least one source (listener) and one target (processor).
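The requirement of at least one source and one target can be pictured as a simple pre-flight check. The Go sketch below uses a hypothetical `pipeline` struct with two string fields; it is not pgstream's configuration type, and it only checks presence, not whether a given source or target name is supported.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// pipeline is a hypothetical, simplified stand-in for a parsed configuration.
type pipeline struct {
	Source string // e.g. "postgres" or "kafka"
	Target string // e.g. "opensearch", "kafka" or "postgres"
}

// validate reports every missing side at once instead of stopping at the first.
func (p pipeline) validate() error {
	var errs []error
	if strings.TrimSpace(p.Source) == "" {
		errs = append(errs, errors.New("no source (listener) configured"))
	}
	if strings.TrimSpace(p.Target) == "" {
		errs = append(errs, errors.New("no target (processor) configured"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	for _, p := range []pipeline{
		{Source: "postgres", Target: "opensearch"},
		{Source: "postgres"},
	} {
		if err := p.validate(); err != nil {
			fmt.Printf("%+v is invalid: %v\n", p, err)
			continue
		}
		fmt.Printf("%+v is valid\n", p)
	}
}
```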
Documentation
For more advanced usage, implementation details, and detailed configuration settings, please refer to the full Documentation.
Benchmarks
Snapshots
Datasets used: IMDB database, MusicBrainz database, Firenibble database.
All benchmarks were run using the same setup, with pgstream v0.7.2, pg_dump/pg_restore (PostgreSQL) 17.4 and PostgreSQL 17.4, using identical resources to ensure a fair comparison.
For more details on performance benchmarking for snapshots to PostgreSQL with pgstream, check out this blogpost.
Limitations
Some of the limitations of the initial release include:
Single Kafka topic support
Postgres plugin support limited to wal2json
No row level filtering support
Primary key/unique not null column required for replication
Kafka serialisation support limited to JSON
Contributing
We welcome contributions from the community! If you'd like to contribute to pgstream, please follow these guidelines and adhere to our code of conduct.
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Support
If you have any questions, encounter issues, or need assistance, open an issue in this repository or join our Discord, and our community will be happy to help.
Made with 💜 by Xata 🦋