# Issue 3456: Updated docs/designs/postgresql-state-store.md based on initial exper… #3457

## Status

Proposed

## Context

There are currently three state stores in Sleeper. The DynamoDBStateStore will be deleted soon, as DynamoDB does not
support snapshot isolation and therefore we cannot obtain a consistent view of the file references in a table. The
S3StateStore does not have this problem, but as it requires the entire state to be written to S3 for each update, it
can take several seconds to apply an update. This is too slow for some use cases, which may require a million or more
updates per day.

The transaction log state store stores each change to the state store as a new
item in DynamoDB, with optimistic concurrency control used to add new transactions. To avoid reading the entire
history of transactions when querying or updating the state store, we periodically create snapshots of the state.
To get an up-to-date view of the state store, the latest snapshot is queried and then updated by reading all subsequent
transactions from the DynamoDB transaction log. This state store enforces a sequential ordering on all the updates to
the state store. Due to challenges performing lots of concurrent updates, there is an option to apply updates to the
state store via an SQS FIFO queue which triggers a committer lambda. The table id is used to ensure that only one
lambda can be processing an update for a given table at a time. This works well for a single table, but the update rate
drops when there are multiple tables, as different lambda instances will process updates for the same table, which
means the state must be refreshed from the transaction log. Further testing is needed to determine whether the
transaction log state store performs sufficiently well to enable our test scenarios to run successfully.

The transaction log state store also requires a significant amount of clean-up work, e.g. deleting old transactions
and old snapshots. These operations will have some impact on the underlying DynamoDB table and this needs to be tested
further.

We want to experiment with using a relational database as the state store. This should be simpler than the transaction
log state store and offer us the consistency guarantees we need. The trade-off is that it will not be serverless,
specifically it will not scale down to consume zero computational resources when there is no work to do.

The comments in the rest of this page are based on experiments with a prototype state store that uses the PostgreSQL
compatible variant of Amazon Aurora. That prototype implemented both the FileReferenceStore and the PartitionStore
using PostgreSQL; however, as the load on the partition store is not significant, it will be easier to continue to
use the transaction log store as the partition store.

## Design Summary

Store the file and partitions state in a PostgreSQL database, with a similar structure to the DynamoDB state store.

The database schema may be more normalised than the DynamoDB equivalent. We can consider this during prototyping.

## Consequences

With a relational database, large queries can be made to present a consistent view of the data. This could avoid the
consistency issue we have with DynamoDB, but would come with some costs:

- Transaction management and locking
- Server-based deployment model

### Transaction management and locking

With a relational database, larger transactions involve locking many records. If a larger transaction takes a
significant amount of time, these locks may produce waiting or conflicts. A relational database is similar to DynamoDB
in that each record needs to be updated individually. It's not clear whether this may result in slower performance than
we would like, deadlocks, or other contention issues.

Since PostgreSQL supports larger queries with joins across tables, this should make it possible to produce a consistent
view of large amounts of data, in contrast to DynamoDB.

If we wanted to replicate DynamoDB's conditional updates, one way would be to make a query to check the condition, and
perform an update within the same transaction. This may result in problems with transaction isolation.

PostgreSQL defaults to a read committed isolation level. This means that during one transaction, if you make multiple
queries, the database may change in between those queries. By default, checking state before an update does not produce
a conditional update as in DynamoDB.

With higher levels of transaction isolation, you can produce the same behaviour as a conditional update in DynamoDB.
If a conflicting update occurs at the same time, this will produce a serialization failure. This would require you to
retry the update. There may be other solutions to this problem, but this may push us towards keeping transactions as
small as possible.

> Review comment: It still seems possible this will be a problem. From the manual it sounds like there are cases where
> page locks are acquired even with the serializable isolation level. This could still affect us even when we're
> certain we never update the same records in multiple places. It might be useful to keep a record in the document of
> the logic behind this: at least why we use the serializable isolation level, what we were worried about, and some
> detail of how it affects us.


See the PostgreSQL manual on transaction isolation levels:

https://www.postgresql.org/docs/current/transaction-iso.html
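
To make the retry requirement concrete, here is a minimal sketch (not taken from Sleeper) of a check-then-update
performed at the serializable isolation level, retrying when PostgreSQL reports a serialization failure
(SQLSTATE 40001). The table, columns and the job-assignment operation are hypothetical stand-ins for whatever
conditional updates the state store would need.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

public class ConditionalUpdateSketch {

    // PostgreSQL reports serialization failures with SQLSTATE 40001.
    private static final String SERIALIZATION_FAILURE = "40001";
    private static final int MAX_ATTEMPTS = 10;

    /**
     * Assigns a compaction job to a file reference only if it is not already assigned,
     * emulating a DynamoDB conditional update with a serializable transaction.
     */
    public static void assignJob(DataSource dataSource, String filename, String partitionId, String jobId)
            throws SQLException, InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try (Connection conn = dataSource.getConnection()) {
                conn.setAutoCommit(false);
                conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
                try {
                    // Check the condition: fail if the reference is missing or already assigned.
                    try (PreparedStatement check = conn.prepareStatement(
                            "SELECT job_id FROM file_reference WHERE filename = ? AND partition_id = ?")) {
                        check.setString(1, filename);
                        check.setString(2, partitionId);
                        try (ResultSet rs = check.executeQuery()) {
                            if (!rs.next() || rs.getString("job_id") != null) {
                                conn.rollback();
                                throw new IllegalStateException("Reference missing or already assigned to a job");
                            }
                        }
                    }
                    // Apply the update in the same serializable transaction.
                    try (PreparedStatement update = conn.prepareStatement(
                            "UPDATE file_reference SET job_id = ? WHERE filename = ? AND partition_id = ?")) {
                        update.setString(1, jobId);
                        update.setString(2, filename);
                        update.setString(3, partitionId);
                        update.executeUpdate();
                    }
                    conn.commit();
                    return;
                } catch (SQLException e) {
                    conn.rollback();
                    if (SERIALIZATION_FAILURE.equals(e.getSQLState()) && attempt < MAX_ATTEMPTS) {
                        Thread.sleep(100L * attempt); // Back off before retrying the whole transaction.
                        continue;
                    }
                    throw e;
                }
            }
        }
    }
}
```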

### Deployment model

PostgreSQL operates as a cluster of individual server nodes, so unlike most of Sleeper it does not scale to zero on its
own. We can mitigate this by using a service with automatic scaling.

Aurora Serverless v2 supports automatic scaling up and down between minimum and maximum limits. If we know Sleeper will
be idle for a while, we could stop the database and then only be charged for the storage. We already have a concept of
pausing Sleeper so that the periodic lambdas don't run. With Aurora Serverless this wouldn't be too much different. See
the AWS documentation:

https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/aurora-serverless-v2.how-it-works.html

This has some differences to the rest of Sleeper, which is designed to scale to zero by default. Aurora Serverless v2
does not support scaling to zero. This means there would be some persistent costs unless we explicitly pause the Sleeper
instance and stop the database entirely.

We use the PostgreSQL compatible version of Amazon Aurora as the database service.

There are two main options for how to store the file reference data in PostgreSQL.

- Option 1: For each Sleeper table, there are two PostgreSQL tables. One stores the file references and one stores the
file reference counts. The file references table has a primary key of (filename, partition id). When compacting files,
we remove the relevant file reference entries, add a new one for the output file, and decrement the relevant entries in
the file reference counts table. This option has the potential for contention. Suppose we have a table with 1024
leaf partitions, and we perform 11 ingests, each of which writes only 1 file but creates 1024 file references. A
compaction job is created for each partition. If there are a few hundred compaction tasks running simultaneously, then
many will finish at approximately the same time and they will all decrement the same counts. Experiments showed that
this can lead to failures of compaction jobs. Each state store update can be retried and will eventually succeed, but
there may be a delay. To partially mitigate this issue, a salt field can be added to the file reference counts table.
The salt is the hash of the partition id modulo some number, e.g. 128. This means that the counts are split across
multiple rows, which decreases the chance that multiple compaction jobs that finish at the same time will attempt to
update the same records.

- Option 2: For each Sleeper table, there is one PostgreSQL table containing the file references. We do not explicitly
store the file reference counts. If we just add the file references and delete them when a compaction job finishes, we
will lose track of what files are in the system and will not be able to garbage collect them. We can avoid this problem
as follows. When we add a file and some references, we also add a dummy file reference, i.e. one where the partition id
is "DUMMY". Normal operations on the state store ignore these entries. When all non-dummy references to a file have
been removed, only the dummy reference will remain.

> Review comment: It looks like all we need from the dummy record is to know that that file exists. If we make a
> separate table for which files exist, we could avoid needing to set values for fields that don't have any meaning
> for this record. Is there any downside to having a separate table for that?

To garbage collect files, we can look for files for which there is only one reference (which must be the dummy
reference). We cannot delete the file immediately, as we only delete files once the last update to them was more than
a certain amount of time ago. The garbage collection process now happens in two stages: the first stage looks for files
with only the dummy reference. When it finds one, it sets the update time to the current time (when the dummy reference
is first created, its update time is set to Long.MAX_VALUE). The second stage looks for files with only the dummy
reference and for which the update time is more than N seconds ago. Those can be deleted. This means that the deletion
of files will take at least two calls to the garbage collector, but we do not need to prioritise deleting files as
quickly as possible.
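
As a rough sketch of the two stages, assuming a hypothetical file_reference table as described in option 2 with an
update_time column holding milliseconds since the epoch, the queries could look something like this:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class GarbageCollectionSketch {

    /** Stage 1: stamp the dummy reference of any file whose other references have all been removed. */
    public static void markFilesReadyForGc(Connection conn) throws SQLException {
        String sql = "UPDATE file_reference SET update_time = ? "
                + "WHERE partition_id = 'DUMMY' AND update_time = ? " // Still has the Long.MAX_VALUE sentinel.
                + "AND filename IN ("
                + "  SELECT filename FROM file_reference GROUP BY filename HAVING count(*) = 1)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, System.currentTimeMillis());
            ps.setLong(2, Long.MAX_VALUE);
            ps.executeUpdate();
        }
    }

    /** Stage 2: find files whose only remaining reference is the dummy one, stamped more than maxAgeMillis ago. */
    public static List<String> findFilesToDelete(Connection conn, long maxAgeMillis) throws SQLException {
        String sql = "SELECT filename FROM file_reference "
                + "GROUP BY filename "
                + "HAVING count(*) = 1 AND bool_and(partition_id = 'DUMMY') AND max(update_time) < ?";
        List<String> filenames = new ArrayList<>();
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, System.currentTimeMillis() - maxAgeMillis);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    filenames.add(rs.getString("filename"));
                }
            }
        }
        return filenames;
    }
}
```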

Experiments showed that both of these options may be viable, but it is recommended that option 2 is pursued as that
reduces the contention the most.
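
As an illustration of the write path under option 2, a compaction commit could be a single transaction that deletes
the input file references in the compacted partition and inserts a reference to the output file, leaving the dummy
references in place for the garbage collector. The sketch below uses the same hypothetical file_reference table; in
practice it would run at the serializable isolation level with the retry handling described earlier.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class CompactionCommitSketch {

    /**
     * Atomically replaces the input file references in one partition with a reference to the
     * compaction output file. The dummy references for the input files are left in place so
     * garbage collection can find them later.
     */
    public static void commitCompaction(Connection conn, String partitionId,
            List<String> inputFiles, String outputFile) throws SQLException {
        conn.setAutoCommit(false);
        try {
            try (PreparedStatement delete = conn.prepareStatement(
                    "DELETE FROM file_reference WHERE partition_id = ? AND filename = ?")) {
                for (String inputFile : inputFiles) {
                    delete.setString(1, partitionId);
                    delete.setString(2, inputFile);
                    delete.addBatch();
                }
                delete.executeBatch();
            }
            try (PreparedStatement insert = conn.prepareStatement(
                    "INSERT INTO file_reference (filename, partition_id, update_time) VALUES (?, ?, ?)")) {
                insert.setString(1, outputFile);
                insert.setString(2, partitionId);
                insert.setLong(3, System.currentTimeMillis());
                insert.executeUpdate();
            }
            conn.commit();
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        }
    }
}
```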

## Implementation

### Stack

We will need an optional stack to deploy an Aurora PostgreSQL compatible instance. We can offer the option of a fixed
size instance as well as the serverless option. The Aurora instance can be deployed in the VPC used elsewhere in
Sleeper. We can use AWS Secrets Manager to control access to the database. All lambdas that need access to the state
store will need to run in the VPC. The Aurora instance will run until paused. We can update the pause system
functionality so that it pauses the Aurora instance. We should be aware that paused instances will automatically be
turned on every 7 days for security updates, and they are not automatically turned off again afterwards.

We will need a PostgreSQLStateStore implementation of FileReferenceStore. This will need a connection to the database
instance.

### Managing the number of concurrent connections to the Aurora instance

Each connection to the Aurora instance consumes a small amount of resource on the instance. If we have 500 compaction
tasks running at the same time and they all have a connection to the instance, that will put the instance under load
even if those connections are idle. Each execution of a compaction job should create a PostgreSQLStateStore, do the
checks it needs and then close the connection. We could add a method to the state store implementation that closes
any internal connections, or we could create the state store with a connection supplier that provides a connection
when needed but then shuts that connection down when it has not been used for a while.
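
One possible shape for such a supplier is sketched below. The names are hypothetical, and a real implementation would
also need to deal with reconnection and with closing idle connections without waiting for the next request.

```java
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

/** Supplies a database connection on demand and closes it when idle or no longer needed. */
public class IdleClosingConnectionSupplier implements AutoCloseable {

    private final DataSource dataSource;
    private final long idleTimeoutMillis;
    private Connection connection;
    private long lastUsedTime;

    public IdleClosingConnectionSupplier(DataSource dataSource, long idleTimeoutMillis) {
        this.dataSource = dataSource;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Returns an open connection, creating one if none is held or the held one has been idle too long. */
    public synchronized Connection getConnection() throws SQLException {
        long now = System.currentTimeMillis();
        if (connection != null && now - lastUsedTime > idleTimeoutMillis) {
            close();
        }
        if (connection == null || connection.isClosed()) {
            connection = dataSource.getConnection();
        }
        lastUsedTime = now;
        return connection;
    }

    /** Closes any held connection, e.g. at the end of a compaction job run. */
    @Override
    public synchronized void close() throws SQLException {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}
```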

When we need to make updates to the state store, we can avoid needing to create a connection every time by reusing
the idea of asynchronous commits from the transaction log state store, i.e. an SQS queue that triggers a lambda to
perform the updates. However, in this case we do not want it to be a FIFO queue as we want to be able to make
concurrent updates. We can set the maximum concurrency of the lambda to control the number of simultaneous updates to
the state store.

> Review comment: The maximum concurrency would be shared between all Sleeper tables. If a larger number of tables
> were actively updated, we wouldn't necessarily get this effect of controlling the simultaneous updates to a state
> store, because we'd need to set it high enough for all the tables. It seems unlikely that would cause a problem
> though.
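
A sketch of what that committer lambda could look like, using the standard AWS Lambda Java events library. The
deserialization and the update logic are placeholders, and the maximum concurrency itself would be configured on the
SQS event source mapping rather than in this code.

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

/** Applies state store updates from a standard (non-FIFO) SQS queue. */
public class StateStoreCommitterSketch implements RequestHandler<SQSEvent, Void> {

    @Override
    public Void handleRequest(SQSEvent event, Context context) {
        for (SQSEvent.SQSMessage message : event.getRecords()) {
            // Placeholder: deserialize the requested update from the message body and
            // apply it to the PostgreSQL-backed state store in its own transaction.
            applyUpdate(message.getBody());
        }
        return null;
    }

    private void applyUpdate(String messageBody) {
        // Placeholder for the commit logic exposed by the PostgreSQL state store.
    }
}
```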


### Transactions

We can use the serializable transaction isolation level. Different operations on the state store will require different
types of implementation - some can use prepared statements, some will use PL/pgSQL.

See the PostgreSQL manual on transaction isolation levels: https://www.postgresql.org/docs/current/transaction-iso.html
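
As a sketch of the PL/pgSQL route, the snippet below (again with hypothetical table and function names) creates a
server-side function that adds a file's dummy reference and its real references in one call, then invokes it from a
prepared statement. Whether a given operation is better served by plain prepared statements or a function like this
would be decided per operation.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

public class PlpgsqlSketch {

    /** PL/pgSQL function that inserts the dummy reference and the real references for a new file. */
    private static final String CREATE_ADD_FILE_FUNCTION =
            "CREATE OR REPLACE FUNCTION add_file(p_filename text, p_partition_ids text[], p_update_time bigint) "
            + "RETURNS void LANGUAGE plpgsql AS $$ "
            + "BEGIN "
            // A dummy reference with an update time of Long.MAX_VALUE marks that the file exists.
            + "  INSERT INTO file_reference (filename, partition_id, update_time) "
            + "    VALUES (p_filename, 'DUMMY', 9223372036854775807); "
            + "  INSERT INTO file_reference (filename, partition_id, update_time) "
            + "    SELECT p_filename, unnest(p_partition_ids), p_update_time; "
            + "END; $$";

    public static void createFunction(Connection conn) throws SQLException {
        try (Statement statement = conn.createStatement()) {
            statement.execute(CREATE_ADD_FILE_FUNCTION);
        }
    }

    public static void addFile(Connection conn, String filename, String[] partitionIds) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement("SELECT add_file(?, ?, ?)")) {
            ps.setString(1, filename);
            ps.setArray(2, conn.createArrayOf("text", partitionIds));
            ps.setLong(3, System.currentTimeMillis());
            ps.execute();
        }
    }
}
```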