pgstream - Postgres replication with DDL changes

pgstream is an open source CDC command-line tool and library that offers Postgres replication support with DDL changes to any provided target.

Features

- Schema change tracking and replication of DDL changes
- Support for multiple out of the box targets
  - Elasticsearch/OpenSearch
  - Webhooks
  - PostgreSQL
- Initial and on demand PostgreSQL snapshots (for when you don't need continuous replication)
- Column value transformations (anonymise your data on the go!)
- Modular deployment configuration, only requires Postgres
- Kafka support with schema based partitioning
- Extendable support for custom targets

Usage

pgstream can be used via the readily available CLI or as a library.

CLI

Installation

Binaries

Binaries are available for Linux, macOS and Windows; check our Releases.

From source

To install pgstream from source, run the following command:

    go install github.com/xataio/pgstream@latest

From package manager - Homebrew

To install pgstream with Homebrew, run the following commands:

    # macOS or Linux
    brew tap xataio/pgstream
    brew install pgstream

Environment setup

If you already have an environment available, with at least Postgres and whichever module resources you're planning on running, you can skip this step.

Otherwise, a docker setup is available in this repository that starts Postgres, Kafka and OpenSearch (as well as OpenSearch dashboards for easy visualisation).

    docker-compose -f build/docker/docker-compose.yml up

The docker-compose file has profiles that can be used to bring up only the relevant containers. For example, if you only want to run PostgreSQL to PostgreSQL pgstream replication, you can use the pg2pg profile as follows:

    docker-compose -f build/docker/docker-compose.yml --profile pg2pg up

You can also run multiple profiles.
For example, to start two PostgreSQL instances and Kafka:

    docker-compose -f build/docker/docker-compose.yml --profile pg2pg --profile kafka up

List of supported docker profiles:

- pg2pg
- pg2os
- pg2webhook
- kafka

Configuration

The pgstream source and target need to be configured appropriately before the commands can be run. This can be done:

- Using the relevant CLI flags for each command
- Using a yaml configuration file
- Using environment variables (.env file supported)

Check the documentation for more information about the configuration options, or check the help on the CLI for details on the available flags. Additionally, at the root of this repository you can find sample files for both .env and .yaml.

If you want to configure column transformations, leveraging the greenmask, neosync and go-masker open source integrations as well as custom transformers, check the transformation rules configuration for more details, along with the list of available transformers.

Prepare the database

The init command creates the pgstream schema in the configured Postgres database, along with the tables/functions/triggers required to keep track of the schema changes. See the Tracking schema changes section for more details. It also creates a replication slot for the configured database, which will be used by the pgstream service. If no replication slot name is provided, it uses a default one with the format pgstream_<dbname>_slot.

This step can be skipped by passing --init as an option to the run command, which performs the same preparation right before starting the replication.

    # with CLI flags
    pgstream init --postgres-url "postgres://postgres:postgres@localhost?sslmode=disable" --replication-slot test

    # with yaml configuration file
    pgstream init -c pg2pg.yaml

    # with environment configuration file
    pgstream init -c pg2pg.env

The status of the initialisation and the configuration can be checked by using the status command.
    pgstream status -c pg2pg.yaml

    SUCCESS  pgstream status check encountered no issues
    Initialisation status:
     - Pgstream schema exists: true
     - Pgstream schema_log table exists: true
     - Migration current version: 7
     - Migration status: success
     - Replication slot name: pgstream_postgres_slot
     - Replication slot plugin: wal2json
     - Replication slot database: postgres
    Config status:
     - Valid: true
    Transformation rules status:
     - Valid: true
    Source status:
     - Reachable: true

If there are any issues, or if you want to revert the pgstream setup, you can use the destroy command to clean up all pgstream state.

    # with CLI flags
    pgstream destroy --postgres-url "postgres://postgres:postgres@localhost?sslmode=disable" --replication-slot test

    # with yaml configuration file
    pgstream destroy -c pg2pg.yaml

    # with environment configuration file
    pgstream destroy -c pg2pg.env

Run pgstream

Replication mode

The run command starts streaming data from the configured source into the configured target.

Example running pgstream replication from Postgres -> OpenSearch:

    # using the environment configuration file
    pgstream run -c pg2os.env --log-level trace

    # using the yaml configuration file
    pgstream run -c pg2os.yaml --log-level info

    # using the CLI flags
    pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target opensearch --target-url "http://admin:admin@localhost:9200"

Example running pgstream with Postgres -> Kafka:

    # using the environment configuration file
    pgstream run -c pg2kafka.env --log-level trace

    # using the yaml configuration file
    pgstream run -c pg2kafka.yaml --log-level info

    # using the CLI flags
    pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target kafka --target-url "localhost:9092"

And, in a separate terminal, Kafka -> OpenSearch:

    # using the environment configuration file
    pgstream run -c kafka2os.env --log-level trace

    # using the yaml configuration file
    pgstream run -c kafka2os.yaml --log-level info

    # using the CLI flags
    pgstream run --source kafka --source-url "localhost:9092" --target opensearch --target-url "http://admin:admin@localhost:9200"

Example running pgstream with PostgreSQL -> PostgreSQL with initial snapshot enabled:

    # using the environment configuration file
    pgstream run -c pg2pg.env --log-level trace

    # using the yaml configuration file
    pgstream run -c pg2pg.yaml --log-level info

    # using the CLI flags
    pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target postgres --target-url "postgres://postgres:postgres@localhost:7654?sslmode=disable" --snapshot-tables test

Snapshot mode

Example running pgstream to perform a snapshot from PostgreSQL -> PostgreSQL:

    # using the environment configuration file
    pgstream snapshot -c snapshot2pg.env --log-level trace

    # using the yaml configuration file
    pgstream snapshot -c snapshot2pg.yaml --log-level info

    # using the CLI flags
    pgstream snapshot --postgres-url="postgres://postgres:postgres@localhost:5432?sslmode=disable" --target=postgres --target-url="postgres://postgres:postgres@localhost:7654?sslmode=disable" --tables="test" --reset

pgstream will parse the configuration provided and initialise the relevant modules. It requires at least one source (listener) and one target (processor).

Tutorials

Documentation

For more advanced usage, implementation details, and detailed configuration settings, please refer to the full Documentation.

Benchmarks

Snapshots

Datasets used: IMDB database, MusicBrainz database, Firenibble database.

All benchmarks were run using the same setup, with pgstream v0.7.2, pg_dump/pg_restore (PostgreSQL) 17.4 and PostgreSQL 17.4, using identical resources to ensure a fair comparison.

For more details on performance benchmarking for snapshots to PostgreSQL with pgstream, check out this blogpost.
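If you want a rough idea of how long a snapshot takes against your own data, a small timing wrapper can help. The sketch below is an illustration only: it is not the harness behind the published benchmark numbers, and `time_command` is a hypothetical helper, not part of pgstream. It runs any command, then prints the elapsed whole seconds and the exit status.

```shell
#!/bin/sh
# Hypothetical helper (not part of pgstream): run any command and report
# its wall-clock duration in whole seconds together with its exit status.
time_command() {
  tc_start=$(date +%s)   # seconds since the epoch (GNU and BSD date)
  "$@"                   # run the wrapped command with its arguments
  tc_rc=$?               # remember the wrapped command's exit status
  tc_end=$(date +%s)
  echo "elapsed_seconds=$((tc_end - tc_start)) exit_status=$tc_rc"
  return "$tc_rc"        # propagate the exit status to the caller
}

# Example invocation, assuming pgstream is installed and snapshot2pg.yaml
# (the config file name used in the snapshot example above) exists:
#   time_command pgstream snapshot -c snapshot2pg.yaml --log-level info
```

Because the resolution is one second, this is only meaningful for snapshots that run for a while. For a fair comparison between tools, keep the dataset, the hardware and the PostgreSQL version identical across runs, as described above.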
Limitations

Some of the limitations of the initial release include:

- Single Kafka topic support
- Postgres plugin support limited to wal2json
- No row level filtering support
- Primary key/unique not null column required for replication
- Kafka serialisation support limited to JSON

Contributing

We welcome contributions from the community! If you'd like to contribute to pgstream, please follow these guidelines and adhere to our code of conduct.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Support

If you have any questions, encounter issues, or need assistance, open an issue in this repository or join our Discord, and our community will be happy to help.

Made with 💜 by Xata 🦋