Recently, I’ve needed to add concurrency protections to several HTTP APIs.
One endpoint was for updating a schedule, where incident.io customers would tell us who was on-call and what hours they worked so we could generate a pay report. It would suck if another user was modifying the same schedule – maybe just to add a single person – and pressed save, wiping out all the work you'd just put in.
Another – very different – situation was our terraform provider, which can be used to manage the schema of something in our Catalog (e.g. adding an attribute on Team for their Slack channel) and needs to read-then-write the schema to do so.
In that case, it was possible to:
1. Read the schema with columns A and B
2. Build a new schema with new column C
3. Write the schema with columns A, B and C
And miss that some other process had added column S (for sad) after (1) but before (3).
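To make that race concrete, here's a minimal sketch of the unsafe read-then-write flow. The Schema type and the readSchema/writeSchema helpers are hypothetical stand-ins for the real provider code, just to show the shape of the problem:

package schemaexample

import "context"

// Schema is a stand-in for the catalog type schema the provider manages.
type Schema struct {
  Columns []string
}

// readSchema and writeSchema stand in for the real API calls.
var (
  readSchema  func(ctx context.Context, typeID string) (*Schema, error)
  writeSchema func(ctx context.Context, typeID string, schema *Schema) error
)

func addColumn(ctx context.Context, typeID, column string) error {
  schema, err := readSchema(ctx, typeID) // (1) sees columns A and B
  if err != nil {
    return err
  }

  schema.Columns = append(schema.Columns, column) // (2) builds A, B, C

  // (3) writes A, B and C: anything another process added between (1) and (3)
  // – like column S – is silently overwritten.
  return writeSchema(ctx, typeID, schema)
}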
In both situations, you can implement a compare-and-swap at the HTTP layer to prevent collisions. It’s pretty simple once you get it, and compare-and-swap primitives can be composed to build some pretty cool things (distributed locks, anyone?) so it’s worth having in your toolbox.
Here’s how it works.
Versioning
The concept behind compare-and-swap is really simple: it just means “set my value to be X if it is currently Y”, making it one of the most “what-it-says-on-the-tin” concepts in computer science.
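If you want to see the primitive in its rawest form, Go's sync/atomic package ships one for plain integers. A toy example, nothing to do with the incident.io API yet:

package main

import (
  "fmt"
  "sync/atomic"
)

func main() {
  var value int64 = 3

  // "Set my value to 4 if it is currently 3" – we read 3 earlier, so this succeeds.
  fmt.Println(atomic.CompareAndSwapInt64(&value, 3, 4)) // true, value is now 4

  // Someone else got there first? The swap is refused and value is untouched.
  fmt.Println(atomic.CompareAndSwapInt64(&value, 3, 5)) // false, value is still 4
}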
We can see how this looks in the incident.io API on the “Update catalog type schema” endpoint, which is what powers the terraform provider I mentioned in the previous example.
This endpoint allows setting new schema attributes for a catalog type, say adding a ‘Slack channel’ attribute to a ‘Team’ type.
An example payload would be:
POST /v2/catalog_types/:id/actions/update_type_schema

{
  "version": 3,
  "attributes": [
    {
      "id": "01GW2G3V0S59R238FAHPDS1R66",
      "name": "Linear team",
      "type": "LinearTeam"
    },
    {
      "name": "Slack channel",
      "type": "SlackChannel"
    }
  ]
}
Where we want to update the schema to preserve the existing ‘Linear team’ (which is why we provide the attribute’s ID) and add a new ‘Slack channel’ attribute.
Ignoring the attributes for now, the part that's important to us is version: that's what we're using to provide the concurrency protection. It's obviously numeric, and you can probably guess that it increases monotonically, but what is it?
The idea of version is to help with a read-then-write process, where we expect anyone using this API to have read the resource just before they decide to write to it. When you GET /catalog_types/:id the response will contain the current version of the schema, and the idea is that you provide that version when making a call to update the schema.
The update will only be accepted if the version provided matches the current version of the schema, otherwise you’ll get this back and the update will fail:
{
  "type": "validation_error",
  "status": 422,
  "request_id": "2LWe--BY",
  "errors": [
    {
      "code": "invalid_value",
      "message": "Catalog type schema version mismatch. Provided: 350, Current: 351",
      "source": {
        "pointer": "schema.version"
      }
    }
  ]
}
If you’re building on top of the API, you can periodically check the version to determine if someone has modified the resource your user is working on and warn them. Or you can just error on save, to protect someone from unintentionally undoing other changes.
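As a sketch of what that consumer-side loop can look like – with illustrative paths and payload shapes, not the exact incident.io API types – you read the current version, echo it back on the update, and treat a 422 as "someone got there first":

package catalogclient

import (
  "bytes"
  "encoding/json"
  "errors"
  "net/http"
)

// schemaPayload is an illustrative shape, not the real API type.
type schemaPayload struct {
  Version    int               `json:"version"`
  Attributes []json.RawMessage `json:"attributes"`
}

var ErrVersionMismatch = errors.New("schema version changed underneath us")

func updateSchema(baseURL, typeID string, attributes []json.RawMessage) error {
  // Read the catalog type first: the response includes the current schema version.
  resp, err := http.Get(baseURL + "/v2/catalog_types/" + typeID)
  if err != nil {
    return err
  }
  defer resp.Body.Close()

  var current struct {
    Schema schemaPayload `json:"schema"`
  }
  if err := json.NewDecoder(resp.Body).Decode(&current); err != nil {
    return err
  }

  // Write back, echoing the version we just read.
  body, err := json.Marshal(schemaPayload{Version: current.Schema.Version, Attributes: attributes})
  if err != nil {
    return err
  }
  update, err := http.Post(
    baseURL+"/v2/catalog_types/"+typeID+"/actions/update_type_schema",
    "application/json", bytes.NewReader(body))
  if err != nil {
    return err
  }
  defer update.Body.Close()

  // 422 means the version no longer matches: re-read, show the user what changed,
  // and let them decide whether to retry.
  if update.StatusCode == http.StatusUnprocessableEntity {
    return ErrVersionMismatch
  }
  return nil
}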
But how does it work?
How does it work?
You can implement an API endpoint that just checks the version and errors if it mismatches, but the naive approach won’t be 100% reliable.
An example is:
// This example is only partially correct: look below for a better one!
func UpdateTypeSchema(
  ctx context.Context, db *gorm.DB, identity *rbac.Identity, payload *catalog.UpdateTypeSchemaPayload,
) (*domain.CatalogType, error) {
  // (1)
  catalogType, err := domain.NewQuerier[domain.CatalogType](db).
    Where(domain.CatalogTypeBuilder(
      domain.CatalogTypeBuilder.OrganisationID(identity.Organisation.ID),
      domain.CatalogTypeBuilder.ID(payload.Id),
    )).
    First(ctx)
  if err != nil {
    return nil, errors.Wrap(err, "finding catalog type")
  }

  // (2)
  if catalogType.Schema.Version != payload.Version {
    return nil, errors.InvalidValue(
      "version", errors.New("Catalog type schema version mismatch. Provided: %d, Current: %d",
        payload.Version, catalogType.Schema.Version),
    )
  }

  // Do the update...
}
This will no doubt help: if you have several people trying to change the schema but you process them one after another, the first will succeed and the second will fail.
But if you're expecting many concurrent updates or bursts of simultaneous activity, this code is still racy. What happens if these requests are interleaved is:
- User A: Reads catalog type at (1)
- User B: Reads catalog type at (1)
- User A: Checks version (2): passes!
- User B: Checks version (2): passes!
Both requests have passed the version check, which means both will write to the schema and only the last of those requests is going to have its write persisted. Oops.
To fix this problem, we’re going to lean on our database to lend us the coordination that can make our compare-and-swap safe. In this example we’ll assume Postgres, and you may find it useful to read “What developers find surprising about Postgres transactions” as a primer.
This code will guarantee we enforce our version check, even with many concurrent requests:
func UpdateTypeSchema(
  ctx context.Context, db *gorm.DB, identity *rbac.Identity, payload *catalog.UpdateTypeSchemaPayload,
) (*domain.CatalogType, error) {
  // BEGIN;
  return database.Transaction(ctx, db,
    func(ctx context.Context, db *gorm.DB) (*domain.CatalogType, error) {
      // (1)
      catalogType, err := domain.NewQuerier[domain.CatalogType](db).
        Clauses(clause.Locking{Strength: "UPDATE"}). // lock within transaction
        Where(domain.CatalogTypeBuilder(
          domain.CatalogTypeBuilder.OrganisationID(identity.Organisation.ID),
          domain.CatalogTypeBuilder.ID(payload.Id),
        )).
        First(ctx)
      if err != nil {
        return nil, errors.Wrap(err, "locking catalog type")
      }

      // (2)
      if catalogType.Schema.Version != payload.Version {
        return nil, errors.InvalidValue(
          "version", errors.New("Catalog type schema version mismatch. Provided: %d, Current: %d",
            payload.Version, catalogType.Schema.Version),
        )
      }

      // Do the update...
      return catalogType, nil
    },
  )
}
There are just two differences:
- Before we load the catalog type, we begin a transaction. This is important as we're about to take a row lock and want to hold the lock for the duration of the transaction.
- Then when we load the catalog type, we use a SELECT ... FOR UPDATE.
The FOR UPDATE row lock is designed exactly for this: you apply it when reading a row that you intend – later in the same transaction – to write, and it guarantees you serialise operations with respect to that resource.
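If it's easier to see without the ORM, the same pattern in plain database/sql against Postgres looks roughly like this (table and column names are illustrative, not our real schema):

package catalogstore

import (
  "context"
  "database/sql"
  "fmt"
)

func updateSchemaVersion(ctx context.Context, db *sql.DB, id string, provided int) error {
  tx, err := db.BeginTx(ctx, nil) // BEGIN;
  if err != nil {
    return err
  }
  defer tx.Rollback() // no-op if we commit below

  // FOR UPDATE: any other transaction trying to lock this row now blocks
  // until we commit or roll back.
  var current int
  err = tx.QueryRowContext(ctx,
    `SELECT schema_version FROM catalog_types WHERE id = $1 FOR UPDATE`, id,
  ).Scan(&current)
  if err != nil {
    return err
  }

  if current != provided {
    return fmt.Errorf("schema version mismatch: provided %d, current %d", provided, current)
  }

  // Do the update, bumping the version so the next writer's check works.
  if _, err := tx.ExecContext(ctx,
    `UPDATE catalog_types SET schema_version = schema_version + 1 WHERE id = $1`, id,
  ); err != nil {
    return err
  }

  return tx.Commit() // COMMIT; releases the row lock
}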
Back to our interleaving requests, we have a different outcome now:
- User A: Reads catalog type at (1)
- User B: Attempts to read catalog type at (1) but BLOCKS
- User A: Checks version (2): passes!
- User A: Increments version, writes catalog type, commits transaction
- User B: Unlocked! Reads latest catalog type (with incremented version)
- User B: Checks version (2): fails!
Now even if many requests arrive simultaneously, we've extended the concurrency mechanisms available in our database to those HTTP requests. Pretty neat!
You’ll find this everywhere
Whenever an API exists on a system with access to a consistent store, you can extend that consistency through compare-and-swap to your consumers. Most web applications fit this description, and it’s likely you’ll find uses for this technique in your own work.
It’s cool how much you can build on top of a compare-and-swap, though.
When I was at GoCardless, we ran our HA Postgres clusters using Stolon. HA database systems invariably need some level of consensus and leader election, and Stolon provides this through tools like etcd, which offer leasing out-of-the-box.
But lots of people run Stolon in Kubernetes, and Kubernetes is powered by etcd under the hood. As it turns out, Kubernetes ConfigMaps offer a very similar compare-and-swap mechanism, and you can build more complex abstractions like leader election on top of it, all powered by the underlying etcd consistency.
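Here's roughly what that looks like with client-go (a sketch, assuming a reasonably recent client-go): the ConfigMap's resourceVersion plays the role of our schema version, and the API server refuses the write with a 409 Conflict if it has changed since you read it.

package k8scas

import (
  "context"
  "fmt"

  apierrors "k8s.io/apimachinery/pkg/api/errors"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/kubernetes"
)

func updateLeader(ctx context.Context, clientset kubernetes.Interface, namespace, name, leader string) error {
  // Read: the returned object carries metadata.resourceVersion.
  cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
  if err != nil {
    return err
  }

  if cm.Data == nil {
    cm.Data = map[string]string{}
  }
  cm.Data["leader"] = leader

  // Write: the update is only accepted if resourceVersion still matches what we read.
  _, err = clientset.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{})
  if apierrors.IsConflict(err) {
    // Someone else won the swap: re-read and retry rather than overwrite their change.
    return fmt.Errorf("configmap changed since we read it: %w", err)
  }
  return err
}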
That's why you'll find a store/k8s.go implementation in Stolon that can be used as a stand-in for etcd. Awesome news for those who have Kubernetes but don't want to run an etcd cluster (for very good reasons!).
Once you know it, you’ll see this technique everywhere. It’s likely someone on your team has built a distributed lock with GCS at some point, again powered by Google’s ability to provide strong global consistency across a subset of their APIs.
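With the cloud.google.com/go/storage client, for example, such a lock is just a conditional write – a sketch, assuming one object per lock and ignoring expiry:

package gcslock

import (
  "context"

  "cloud.google.com/go/storage"
)

// acquireLock tries to create the lock object, succeeding only if it doesn't
// already exist – a compare-and-swap against "generation 0".
func acquireLock(ctx context.Context, client *storage.Client, bucket, object string) error {
  w := client.Bucket(bucket).Object(object).
    If(storage.Conditions{DoesNotExist: true}).
    NewWriter(ctx)

  if _, err := w.Write([]byte("locked")); err != nil {
    return err
  }

  // The precondition is checked when the write is finalised: a 412 here means
  // another process already holds the lock.
  return w.Close()
}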
Hopefully you can use this now, too!
If you liked this post and want to see more, follow me at @lawrjones.