| categories | date | draft | preview | tags | title | type | url |
|---|---|---|---|---|---|---|---|
| | 2023-08-14 11:57:25 | false | /social/84f2e17dfb3984df186f2bdbe534d08b16e0741817363b2b9d6bec1e035e2983.png | | Prod-Ready Airbyte Sync | posts | /2023/08/14/stable-airbyte-sync/ |
Airbyte is a tool that allows you to periodically extract data from one database and then load and transform it into another. It provides a performant way to clone data between databases and gives you the flexibility to dictate what gets shared at field level (for example, you can copy the users table but omit name, address, phone number and so on). There are a bunch of use cases where this kind of thing is useful. For example, say you have a data science team who want to generate reports on how many sales your e-shop made this month and train predictive models for next month’s revenue. You wouldn’t want to give your data team direct access to your e-shop database because:
- there might be sensitive personal information (SPI) in there (think names, addresses, bank details, links to what individual users ordered)
- running this kind of analysis might impact the performance of your shop database and customers might start churning.
Instead, we can use a tool such as Airbyte to regularly make copies of a subset of the production database, minus the SPI, and load it into an external analytics database. The data team can then use this external database to run complex queries all day long without affecting application performance. This pattern is called Extract Load Transform, or ELT.
In this post I’ll summarise some strategies for using Airbyte in a production environment and share some tips for navigating its “rough edges”, based on my own experience of setting up this tool as a pilot project for some of my clients in my day job.
General Requirements
- We need to securely copy data from our “live” application database to our “analytics” database
- We need to do this without impacting the performance of the application
- We need to be able to do this daily so that the analytics db contains the latest information
- We need to be able to control which tables and columns get sent
- We need to limit who is able to configure airbyte and incorporate checks to prevent accidental data leakage or loss
- We want to be able to do this relatively cheaply
Database Technologies
Airbyte has a large number of mature data “Source” plugins for a bunch of standard SQL databases like MySQL/MariaDB, PostgreSQL, MSSQL and so on. Its Source plugins for data warehouse systems like BigQuery are still in alpha. The reverse appears to be true for “Destination” plugins: Airbyte provides mature destination implementations for data warehouse products like Google BigQuery and Snowflake, but immature alpha and beta support for syncing to a traditional RDBMS.
The full list of supported source and destination connectors is available here. We are able to use the MySQL source to pull data from the core application and store our data in Google BigQuery.
Picking an Edition of Airbyte
Airbyte comes in two “flavours”:
- A hosted cloud version where you pay Airbyte to host an instance of their application on your behalf and they copy all your data around
- A self-hosted version where you install Airbyte yourself in your own infrastructure and debug it yourself when you inevitably mess up the installation instructions.
I have a risk-averse client base with strong data protection requirements, so we opted for self-hosted instances that sit in the same Google Cloud region and virtual network as our application. The target database is Google BigQuery in the same data centre region, and the clients involved were OK with that.
Picking a Virtual Machine Spec
If you opt for the self-hosted version like we did, you’ll need to pick a VM that has enough resources to run Airbyte. We went for Google’s `n2-standard-4` machine spec, which has 4 CPU cores and 16GB of RAM. This was actually our second choice after picking an `e2-standard-2`, which only had 8GB of RAM - that was not enough to run Airbyte optimally and caused thrashing/spiking issues.
Although all the data does pass through the VM, it’s done in buffered chunks so your VM doesn’t need a lot of storage space - 50GiB was sufficient for our setup.
Setting up Airbyte
We deployed Airbyte using docker-compose by following their quick start guide. We locked down access to the machine over HTTPS so that it only accepts requests from inside our corporate VPN.
We were able to create a database connection to the MySQL database via its internal Google Cloud IP address, which means that no production data is routed outside of the virtual network during the first leg of the extraction (everything is encrypted with TLS anyway).
CDC versus Full Refresh
When you configure a MySQL source (or indeed a bunch of other compatible sources), you can turn on either full refresh or incremental sync via change data capture (CDC). The latter makes use of the database server’s logs to replay all of the changes that have been made since the last sync, reducing the amount of data that is transferred. If you have a large database (of the order of 10s to 100s of GiB), this may be worthwhile as it is likely to accelerate the sync process significantly after the first run.
However, the current implementation of CDC/incremental sync for MySQL appears to get stuck and continue to run if you have ongoing changes being made to the system. For example, if you have a high availability application that is always in use or if you have automated processes making changes to the data around the clock, the sync driver will continue to pick up these new changes and append them on an ongoing basis unless you’re able to turn off the workers or create an isolated copy of the source database (as described below).
We opted to stick with basic “full sync” as we’re only copying around 10GiB of data on a daily basis and our Airbyte setup is able to do this in about 30-45 mins under normal circumstances.
When to Sync
Ideally you want to run the sync when the system is not in use or when it is least likely to impact the ongoing operation of your primary business application. Since most of our clients operate between 9am-6pm GMT we have a nice big “out of hours” window during which we can run this sync.
If you don’t have that luxury because you have a high-availability application, Google Cloud SQL has the ability to take disk snapshots that don’t appear to significantly impact database performance. We did sketch out a potential workflow (roughly illustrated in the snippet after the list below) that would involve using the Google Cloud SQL Admin API to:
- Create a new disk dump of the client db
- “Restore” the disk dump to a secondary database replica
- Run the Airbyte sync, targeting the secondary replica as the source
- Turn off the replica db
- Rinse & repeat
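For the curious, here is a rough, untested sketch of how the clone and shut-down steps might be driven from Python with the google-api-python-client library. We never put this into production; the project and instance names are placeholders, a point-in-time clone stands in for the dump/restore pair, and it assumes application default credentials are available:

```python
# Rough sketch (not what we ended up running) of driving the Cloud SQL Admin API
# with google-api-python-client. Project/instance names are placeholders.
import time

from googleapiclient import discovery

PROJECT = "my-gcp-project"           # placeholder
SOURCE_INSTANCE = "prod-mysql"       # placeholder
REPLICA_INSTANCE = "analytics-copy"  # placeholder

sqladmin = discovery.build("sqladmin", "v1beta4")


def wait_for(operation_name: str):
    """Poll a long-running Cloud SQL operation until it finishes."""
    while True:
        op = sqladmin.operations().get(project=PROJECT, operation=operation_name).execute()
        if op.get("status") == "DONE":
            return op
        time.sleep(30)


# Steps 1 & 2: clone the production instance to a throwaway copy
# (a point-in-time clone rather than a literal dump/restore).
clone_op = sqladmin.instances().clone(
    project=PROJECT,
    instance=SOURCE_INSTANCE,
    body={"cloneContext": {"destinationInstanceName": REPLICA_INSTANCE}},
).execute()
wait_for(clone_op["name"])

# Step 3: trigger the Airbyte sync against the clone (via the Airbyte UI/API)...

# Step 4: shut the clone down once the sync has finished.
stop_op = sqladmin.instances().patch(
    project=PROJECT,
    instance=REPLICA_INSTANCE,
    body={"settings": {"activationPolicy": "NEVER"}},
).execute()
wait_for(stop_op["name"])
```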
However, we were able to get full sync (without incremental CDC) working on a nightly basis without adding these extra complications, which would also have required us to use an external orchestrator like Airflow (which has Airbyte operators) to execute the process. Thankfully, we were able to use the built-in cron scheduler to have Airbyte run the sync nightly before our application maintenance window.
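If you do end up needing an orchestrator, a minimal sketch of an Airflow DAG that triggers an Airbyte sync might look something like the following. This assumes the apache-airflow-providers-airbyte package is installed; the DAG id, schedule and connection identifiers are placeholders:

```python
# Hypothetical sketch: we did not need this, but an Airflow DAG that triggers an
# Airbyte connection sync could look roughly like this.
import pendulum
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

with DAG(
    dag_id="nightly_airbyte_sync",
    schedule="0 2 * * *",  # 2am nightly, outside business hours (Airflow 2.4+ style)
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False,
) as dag:
    trigger_sync = AirbyteTriggerSyncOperator(
        task_id="trigger_analytics_sync",
        airbyte_conn_id="airbyte_default",                     # Airflow connection pointing at the Airbyte server
        connection_id="00000000-0000-0000-0000-000000000000",  # placeholder Airbyte connection UUID
        asynchronous=False,  # block until the sync finishes
        timeout=3600,
    )
```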
Securing the Process
As outlined above, it can be important to make sure that certain fields are not shared via Airbyte. Airbyte does provide a UI for toggling fields, and indeed whole tables, on and off as part of the sync, but for schemas with large numbers of tables and columns this is time-consuming, unwieldy and likely to lead to human error.
Octavia CLI
Airbyte’s answer to this problem is a command line tool named Octavia. Octavia communicates with Airbyte via a REST API and generates YAML configuration files that contain all tables, fields and configurations.
This is a game changer as it means we can script the fields that get synchronized and we can version control and quality control both the script and the config file itself. We can also gate changes to the sync config via CI tooling and “lint” the config for potential errors (e.g. by providing a script that compares the config against a blacklist of columns that are never allowed).
Below is an excerpt from one such script that might give readers some inspiration for implementing a similar process themselves:
```python
import yaml
import click

# NOTE: in the full script, `cli` is the top-level click group and BLOCKED is a
# longer list of table names that must never be synced; minimal placeholders are
# included here so the excerpt runs on its own.
BLOCKED = ["users_personal_data"]  # placeholder blocklist


@click.group()
def cli():
    pass


@cli.command()
@click.option("--connection-file", "-c",
              type=click.Path(exists=True, file_okay=True, dir_okay=False,
                              readable=True, writable=True),
              required=True)
@click.option("--fix/--no-fix", type=bool, default=False)
def lint_connection(connection_file, fix):
    # Load the Octavia-generated connection config
    with open(connection_file, 'r') as f:
        config = yaml.safe_load(f)

    print(f"Reading config file for {config['resource_name']}")
    print(f"Checking Connection Resource: {config['resource_name']}")

    streams = config['configuration']['sync_catalog']['streams']
    print(f"Found {len(streams)} configured streams (tables) from source")

    errors = []
    for i, stream in enumerate(streams):
        name = stream['stream']['name']
        # Admin tables and anything on the blocklist must never be selected for sync
        if ("admin" in name) or (name in BLOCKED):
            print(f"Checking {name} is not enabled for sync...")
            if stream['config']['selected']:
                err_string = f"ERROR: {name} must not be synced but is being synced"
                errors.append(err_string)
                if fix:
                    streams[i]['config']['selected'] = False

    print(f"Found {len(errors)} problems")
    for error in errors:
        print(f"\t - {error}")

    if len(errors) > 0:
        if fix:
            config['configuration']['sync_catalog']['streams'] = streams
            print("Fix=True, writing updated config")
            with open(connection_file, 'w') as f:
                yaml.safe_dump(config, f)
        else:
            print("Fix=False, please manually fix any problems or rerun with --fix to resolve automatically")
    else:
        print("No problems found! 😊")
```
The idea here is that we run the script as part of CI and have the pipeline fail to deploy if it catches any errors, with a `--fix` mode that our maintainer can run locally to automatically rectify any problems.
Octavia “Bug”
When I started using Octavia, I noticed that it would bug out and print error messages when I started to change which fields are selected for sync. I found a bug ticket about this issue and eventually realised that the Airbyte documentation for Octavia is quite out of date: by default it installs an old version of Octavia that is not compatible with the current version of the Airbyte server itself. In order to make it work, I simply changed my `.zshrc` file (or `.bashrc` file for some) to use the latest version of the tool - which at the time of writing is `0.50.7`:
```bash
OCTAVIA_ENV_FILE=/home/james/.octavia
export OCTAVIA_ENABLE_TELEMETRY=True
alias octavia="docker run -i --rm -v \$(pwd):/home/octavia-project --network host --env-file \${OCTAVIA_ENV_FILE} --user \$(id -u):\$(id -g) airbyte/octavia-cli:0.50.7"
```
Octavia and SSL
Unfortunately I couldn’t find an easy way to get Octavia to play nicely with self-signed SSL certificates, which meant we had to load in an externally signed SSL cert. Octavia is written in Python and uses requests to interact with Airbyte, so you could theoretically configure it to trust your own certificate authority (as per this stackoverflow post).
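As a general illustration of the requests mechanism (not something we ended up wiring into Octavia), a custom CA bundle can be supplied per call or via the REQUESTS_CA_BUNDLE environment variable; the URL and certificate path below are placeholders:

```python
# Generic requests example: trusting an internal CA bundle.
import requests

AIRBYTE_URL = "https://airbyte.internal.example.com/api/v1/health"  # placeholder URL
INTERNAL_CA_BUNDLE = "/etc/ssl/certs/internal-ca.pem"               # placeholder path

# Option 1: pass the CA bundle explicitly for a single request.
resp = requests.get(AIRBYTE_URL, verify=INTERNAL_CA_BUNDLE)
print(resp.status_code)

# Option 2: export REQUESTS_CA_BUNDLE=/etc/ssl/certs/internal-ca.pem in the
# environment (e.g. via the container's --env-file) so every requests call
# picks it up without code changes.
```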
Keeping an Eye on Things
Once you have your sync up and running you probably want to make sure it keeps running regularly. Airbyte has a Slack webhook integration, which means it’s easy enough to have it automatically notify you when a sync has succeeded or failed.
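If you want to sanity-check the webhook itself before handing it to Airbyte’s notification settings, posting a test message to a Slack incoming webhook is a couple of lines of Python (the webhook URL is a placeholder):

```python
# Quick test of a Slack incoming webhook before pointing Airbyte at it.
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

resp = requests.post(SLACK_WEBHOOK_URL, json={"text": "Airbyte sync notifications are wired up 🎉"})
resp.raise_for_status()
```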
Conclusion
There are a lot of variables and moving parts to consider when using Airbyte, especially when data security and privacy are so important. In this post I outlined some hints and tips for using Airbyte successfully, based on my own experience of setting up the tool. Hopefully some of my observations were useful or interesting to readers who are thinking about picking up Airbyte.