Perform atomic operations without leaking transactions into your business logic
Writing a repository that performs database operations inside a transaction without leaking its implementation details into my business logic has been a recent obsession of mine.
In Go projects that follow Hexagonal or Clean Architecture, you’ll often see concerns separated along package boundaries. Code that powers the business logic is decoupled from code responsible for data storage through the use of interfaces.
Imagine an application that enrolls students in courses. The business logic might represent the association of courses and students as classes, and validate certain constraints on these classes, for example by ensuring they’re not oversubscribed.
To do this, it needs access to the course and student data, but it shouldn’t care whether that data source is an SQL database, an in-memory cache, or an intern typing in data at a terminal. The business logic cares only that any data source it’s given responds to the methods it needs to enroll students.
Usually, this takes the form of a Repository interface. I’ve omitted the definitions of types such as Class, Course, and Students for brevity.
// Business logic
package class

type Repository interface {
	GetClassByCourseID(ctx context.Context, courseID int64) (Class, error)
	EnrollStudents(ctx context.Context, c Course, s Students) (Class, error)
}
This interface is satisfied by concrete types in a repository package:
// Data storage logic
package repository

// Imports omitted

type ClassRepository struct {
	db *sqlx.DB
}

// Verify that ClassRepository satisfies class.Repository.
var _ class.Repository = (*ClassRepository)(nil)

func (cr *ClassRepository) GetClassByCourseID(ctx context.Context, courseID int64) (class.Class, error) {
	// Code to construct the class object from the course and students tables.
}

func (cr *ClassRepository) EnrollStudents(ctx context.Context, c class.Course, s class.Students) (class.Class, error) {
	// Code to enroll students, inserting into the courses table, students table and a join table.
}
The ClassRepository concrete type knows how to interact with an SQL database. It implements the class.Repository interface so that the business logic can use it without knowing that there’s a relational database under the hood.
At application start-up, we can instantiate a new class Service to handle our business needs, passing it any concrete type that satisfies the Repository interface.
// Business logic
package class

type Repository interface {
	GetClassByCourseID(ctx context.Context, courseID int64) (Class, error)
	EnrollStudents(ctx context.Context, c Course, s Students) (Class, error)
}

type Service struct {
	repo Repository
}

func NewService(repo Repository) *Service {
	return &Service{repo: repo}
}

func (s *Service) Enroll(ctx context.Context, req EnrollmentRequest) error {
	// Business logic calls repository methods when it needs data from the store.
	class, err := s.repo.GetClassByCourseID(ctx, req.CourseID)
	if err != nil {
		return err
	}
	// ...
}
This is a clean way to handle simple interactions with a data store.
This pattern breaks down when you need to perform a sequence of actions atomically. That is, when you need multiple repository operations to happen inside a single transaction.
Let’s say you want to validate that a course has capacity for incoming students before enrolling them. Using this pattern, you would retrieve the course and student data via the repository in the form of a Class object, check its capacity against the number of students already enrolled plus the number in the enrollment request, and return an error if there aren’t enough spaces in the class.
But what if a concurrent enrollment request completes after you’ve retrieved the class data from the store, but before you enroll the new students? If this second request takes the class to capacity, and our first goroutine enrolls more students, our course ends up silently oversubscribed. Our initial enrollment request has no way of knowing that the number of students enrolled in the course has changed, and no way of preventing this from happening while it performs its own enrollment.
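The interleaving can be replayed step by step, without goroutines, so the outcome is deterministic. This is a hypothetical sketch: the store and request types and the capacity numbers are made up, and read and write stand in for the repository’s get and enroll calls.

```go
package main

import "fmt"

// store is a hypothetical, non-transactional data store holding one course.
type store struct {
	capacity int
	enrolled int
}

// request models one enrollment request's view of the class.
type request struct {
	name     string
	students int
	seen     int // enrolled count read from the store
}

// read is the "check" half: it snapshots the enrolled count.
func (r *request) read(s *store) { r.seen = s.enrolled }

// write is the "act" half: it validates against the possibly stale snapshot, then enrolls.
func (r *request) write(s *store) error {
	if r.seen+r.students > s.capacity {
		return fmt.Errorf("%s: not enough spaces", r.name)
	}
	s.enrolled += r.students
	return nil
}

func main() {
	s := &store{capacity: 30, enrolled: 25}
	a := &request{name: "A", students: 4}
	b := &request{name: "B", students: 4}

	// Both requests read before either writes.
	a.read(s)
	b.read(s)
	fmt.Println(b.write(s))                   // <nil>
	fmt.Println(a.write(s))                   // <nil>, because A still believes 25 are enrolled
	fmt.Println(s.enrolled, "of", s.capacity) // 33 of 30
}
```

Each request’s validation is correct in isolation; the bug is that nothing stops the data from changing between the read and the write.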
We need to perform these steps atomically. Getting the class data, validating that it has capacity, and persisting the enrollments must happen inside a single transaction.
That’s a problem. Only the repository layer knows how to manage transactions, and we don’t want to move our business logic into the repository. We could define the repository interface so that its methods return and accept transactions, but this would leak the implementation details of the data store into the business logic.
// Business logic
package class

type Repository interface {
	Begin(ctx context.Context) (*sql.Tx, error) // Uh oh - the business logic shouldn't know anything about SQL.
	GetClassByCourseID(ctx context.Context, tx *sql.Tx, courseID int64) (Class, error)
	EnrollStudents(ctx context.Context, tx *sql.Tx, c Course, s Students) (Class, error)
}
On the other hand, the requirement that these operations happen atomically is a business requirement. The database doesn’t care if a course is oversubscribed. It’s a dumb store of bytes, and even if it weren’t, we don’t want to end up in a situation where we have to implement new enrollment validation for every repository we might use in the future. That defeats the point of our Repository interface.
How do we keep our validations in the business logic while ensuring that all enrollments happen atomically?
How about defining validations in the business layer, and injecting them into the repository with a single method call, which then runs all the enrollment operations atomically?
// Business logic
package class

type Validator func(Class) error

type Repository interface {
	Enroll(ctx context.Context, req EnrollmentRequest, validators []Validator) error
}

func ValidateCourseHasCapacity(class Class) error {
	if class.Course.Capacity < len(class.Students) {
		return errors.New("class is oversubscribed")
	}
	return nil
}

// Service definition and constructor omitted.

func (s *Service) Enroll(ctx context.Context, req EnrollmentRequest) error {
	// Business logic injects validations into the repository.
	if err := s.repo.Enroll(ctx, req, []Validator{ValidateCourseHasCapacity}); err != nil {
		return err
	}
	return nil
}
This looks neat and tidy, but we’re leaking the business domain into the repository layer by expecting the repository to know when to call the validations.
As a result, there are some confusing ambiguities in the validation code. For example, how do we know whether len(class.Students) in ValidateCourseHasCapacity refers to the number of students already in the class, or the number after the enrolling students have been added?
Onwards.
We’ve established that transactionality is a business concern, so let’s bring transactions into our business layer in a way that doesn’t expose the implementation details of the transactions. The way to do this is with an interface.
// Business logic
package class

type Tx interface {
	Commit() error
	// Implementors must guarantee that Rollback leaves the repository in its clean,
	// pre-transaction state even in the event of an error.
	// Calling Rollback after Commit returns an error.
	Rollback() error
}

type Repository interface {
	Begin(ctx context.Context) (Tx, error)
	GetClassByCourseID(ctx context.Context, tx Tx, courseID int64) (Class, error)
	EnrollStudents(ctx context.Context, tx Tx, c Course, s Students) (Class, error)
}

// Service definition and constructor omitted.

func (s *Service) Enroll(ctx context.Context, req EnrollmentRequest) error {
	// Business logic starts and controls a transaction of interface type Tx.
	tx, err := s.repo.Begin(ctx)
	if err != nil {
		return err
	}
	// Roll back on any early return. After Commit, Rollback just returns
	// an error, which we discard.
	defer func() { _ = tx.Rollback() }()

	class, err := s.repo.GetClassByCourseID(ctx, tx, req.CourseID)
	if err != nil {
		return err
	}
	// ...

	if err := tx.Commit(); err != nil {
		return err
	}
	return nil
}
Now we’re getting somewhere. It still feels odd that our business logic has to manipulate a Tx object, but we’ve decoupled the use of the transaction from its implementation, so we no longer care if it’s a SQL database transaction or even something that’s just pretending to be a transaction. We can now create mock implementations of Tx for unit testing. This is a great improvement.
The story on the repository side isn’t quite so happy:
// Data storage logic
package repository

// Imports omitted

type ClassRepository struct {
	db *sqlx.DB
}

// Verify that ClassRepository satisfies class.Repository.
var _ class.Repository = (*ClassRepository)(nil)

func (cr *ClassRepository) Begin(ctx context.Context) (class.Tx, error) {
	// BeginTxx is sqlx's context-aware way to start a transaction.
	tx, err := cr.db.BeginTxx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// *sqlx.Tx implements Commit and Rollback,
	// so it already satisfies class.Tx.
	return tx, nil
}

func (cr *ClassRepository) GetClassByCourseID(ctx context.Context, tx class.Tx, courseID int64) (class.Class, error) {
	// Type assertion.
	sqlTx, ok := tx.(*sqlx.Tx)
	if !ok {
		return class.Class{}, errors.New("class.Tx must have concrete type *sqlx.Tx")
	}
	// ...
}

// ...
As a result of requiring our repository methods to accept the interface type class.Tx, we must perform a type assertion on the interface to get the concrete transaction type back before doing any work on the database tables.
This isn’t unreasonable. The type assertion will only fail if we do something monumentally stupid, like mixing up the transactions of two separate repositories with different underlying data stores. It is ugly, though. By making our business logic handle transaction objects, we create a lot of boilerplate. We can do better.
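That failure mode is easy to reproduce with two hypothetical Tx implementations standing in for transactions from two different data stores. sqlTx, memTx, and getClass are illustrative names; getClass plays the role of GetClassByCourseID with its comma-ok type assertion.

```go
package main

import (
	"errors"
	"fmt"
)

type Tx interface {
	Commit() error
	Rollback() error
}

// sqlTx and memTx stand in for transactions from two different stores.
type sqlTx struct{}

func (*sqlTx) Commit() error   { return nil }
func (*sqlTx) Rollback() error { return nil }

type memTx struct{}

func (*memTx) Commit() error   { return nil }
func (*memTx) Rollback() error { return nil }

// getClass needs its own concrete transaction type back before it can query.
func getClass(tx Tx) error {
	if _, ok := tx.(*sqlTx); !ok {
		return errors.New("class.Tx must have concrete type *sqlTx")
	}
	return nil // query using the concrete transaction here
}

func main() {
	fmt.Println(getClass(&sqlTx{})) // <nil>
	fmt.Println(getClass(&memTx{})) // class.Tx must have concrete type *sqlTx
}
```

Both values satisfy Tx, so the compiler accepts the mix-up; it only surfaces at runtime, in every repository method that repeats the assertion.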
Telling a repository that it should run its operations atomically is a business concern, but we don’t want the awkward type assertions that come with managing a transaction from the business layer.
Let’s define a repository that can manage its own transactions, starting with its interface in the business layer:
// Business logic
package class

// Transactor represents an object that creates and manages a transaction.
type Transactor interface {
	Begin(ctx context.Context) error
	Commit() error
	// Implementors must guarantee that Rollback leaves the repository in its clean,
	// pre-transaction state even in the event of an error.
	// Calling Rollback after Commit returns an error.
	Rollback() error
}

type AtomicRepository interface {
	Transactor
	GetClassByCourseID(ctx context.Context, courseID int64) (Class, error)
	EnrollStudents(ctx context.Context, c Course, s Students) (Class, error)
}

// Service definition and constructor omitted.

func (s *Service) Enroll(ctx context.Context, req EnrollmentRequest) error {
	// Business logic triggers a transaction, but doesn't manage it.
	if err := s.repo.Begin(ctx); err != nil {
		return err
	}
	defer func() { _ = s.repo.Rollback() }()

	// Repo operations run against the internal transaction.
	class, err := s.repo.GetClassByCourseID(ctx, req.CourseID)
	if err != nil {
		return err
	}
	// ...

	if err := s.repo.Commit(); err != nil {
		return err
	}
	return nil
}
AtomicRepository embeds a Transactor, which is some object that can begin, commit and roll back transactions. Notice that no transaction is returned or accepted by these methods. The management of the transaction itself is an implementation detail. This allows the business logic to call AtomicRepository.Begin followed by any other repo methods it likes, safe in the knowledge that the repository’s operations are now atomic. When done, the business logic calls Commit.
The method naming (Begin, Commit, Rollback) is the terminology we associate with SQL databases, but if you want to emphasise that this is an interface representing any store of data, you could use Start, Save, Undo, or Open, Write, Cancel. It doesn’t matter. The naming declares an action, not an implementation.
The result is that the business logic no longer has to wrangle a transaction object, and the repository no longer has to perform type assertions because it’s not accepting transaction interfaces at call sites. Instead, it retains the transaction internally. Let’s look at how that’s implemented:
package repository

var (
	// ErrTransactionInProgress is returned when more than one concurrent
	// transaction is attempted.
	ErrTransactionInProgress = errors.New("transaction already in progress")

	// ErrTransactionNotStarted is returned when an atomic operation is
	// requested but the repository hasn't begun a transaction.
	ErrTransactionNotStarted = errors.New("transaction not started")
)

// transactor manages the lifecycle of a transaction.
// Satisfies class.Transactor.
type transactor struct {
	db *sqlx.DB
	tx *sqlx.Tx
}

var _ class.Transactor = (*transactor)(nil)

// Begin starts a new transaction and stores it on the transactor.
func (t *transactor) Begin(ctx context.Context) error {
	if t.tx != nil {
		return ErrTransactionInProgress
	}
	tx, err := t.db.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	t.tx = tx
	return nil
}

// Commit and Rollback implementations omitted.

type AtomicClassRepository struct {
	*transactor
}

// Verify that AtomicClassRepository satisfies class.AtomicRepository.
var _ class.AtomicRepository = (*AtomicClassRepository)(nil)

func (acr *AtomicClassRepository) GetClassByCourseID(ctx context.Context, courseID int64) (class.Class, error) {
	if acr.tx == nil {
		return class.Class{}, ErrTransactionNotStarted
	}
	// Use internal transaction to perform DB operations.
}

// ...
We define the unexported type transactor, which has both database and transaction fields, and implements all the methods required by class.Transactor. When we call (*transactor).Begin, it creates a new transaction from its database and stores it on the struct.
AtomicClassRepository embeds a *transactor, so when a repository method is called, it first checks the embedded transaction field for an active transaction and, if one exists, performs the desired database operations using that transaction.
Thus, our business logic has control over when to start and end a transaction, but doesn’t have to manage either concrete or interface transaction types, which is the responsibility of the repository layer.
Looks good? Well, no. There’s a catastrophic issue.
As you know, our class.Service struct, which defines our business methods like Enroll, has a repo field of type AtomicRepository that provides access to all the methods our Service needs to interact with a data store.
class.Service will be passed around our program as a pointer, *class.Service. A single service is shared by all goroutines handling enrollment requests.
Which means that more than one simultaneous enrollment request will result in concurrent calls to the same repository.
Which contains a single, unprotected transaction.
When this happens, and it will happen, all bets are off. You might get an amalgamation of enrollment requests written to the database in a single transaction. You might get ErrTransactionInProgress errors in goroutines that haven’t started a transaction of their own. You will certainly trash your database.
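The amalgamation case can be replayed step by step, without goroutines, to make it deterministic. Begin is a check-then-set, so two requests can both see an empty transaction field; the second assignment wins, and both requests’ writes land in one transaction. The fakeTx and repo types and interleavedBegin are hypothetical stand-ins for *sqlx.Tx, transactor, and two racing Begin calls. Under real concurrency it is worse still, because unsynchronised access to the field is also a data race.

```go
package main

import "fmt"

// fakeTx is a hypothetical stand-in for *sqlx.Tx that records its writes.
type fakeTx struct {
	id     string
	writes []string
}

// repo mimics transactor: one unprotected transaction field, shared by all requests.
type repo struct {
	tx *fakeTx
}

// interleavedBegin replays two Begin calls whose check and set steps interleave.
func interleavedBegin(r *repo) {
	aSawNil := r.tx == nil // request A: "no transaction in progress"
	bSawNil := r.tx == nil // request B checks before A has stored anything
	if aSawNil {
		r.tx = &fakeTx{id: "txA"}
	}
	if bSawNil {
		r.tx = &fakeTx{id: "txB"} // overwrites txA, which is now orphaned
	}
}

func main() {
	shared := &repo{}
	interleavedBegin(shared)

	// Both requests now write through the same field.
	shared.tx.writes = append(shared.tx.writes, "A: enroll Ada")
	shared.tx.writes = append(shared.tx.writes, "B: enroll Bo")

	fmt.Println(shared.tx.id, shared.tx.writes) // txB [A: enroll Ada B: enroll Bo]
}
```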
Let’s take stock. We don’t want our business logic to leak into the repository. We don’t want the repository’s implementation details leaking into the business logic. We must run business operations atomically, but we don’t want to handle a transaction object in the business layer, and we don’t want to perform type assertions on transaction interfaces in the repository layer. More than any of this, we want our reads and writes to the repository to be thread-safe.
We’re 90% of the way there. There’s one more trick to reveal.
Traditionally, repositories are long-lived objects that get created at program start and cleaned up when the application exits.
In this paradigm, a repository can’t hold an unprotected reference to a transaction, because concurrent access from the program’s many request handlers will clobber it. Locking the transaction reference with a mutex is futile, because it would effectively reduce our pool of database connections to 1, and our program would chug as requests queue up waiting to acquire the lock.
But if we start thinking about a repository as a mayfly that briefly lives, fulfils its purpose, then dies, these problems go away.
Instead of one long-lived repository, we can instantiate a new repository for every enrollment request. This repository manages a single transaction in a single goroutine. When we’re done with the transaction, we throw away the whole repository.
From the business perspective:
// Business logic
package class

type Transactor interface {
	Begin(ctx context.Context) error
	Commit() error
	// Implementors must guarantee that Rollback leaves the repository in its clean,
	// pre-transaction state even in the event of an error.
	// Calling Rollback after Commit returns an error.
	Rollback() error
}

type AtomicRepository interface {
	Transactor
	GetClassByCourseID(ctx context.Context, courseID int64) (Class, error)
	EnrollStudents(ctx context.Context, c Course, s Students) (Class, error)
}

type AtomicRepositoryFactory func() AtomicRepository

type Service struct {
	newRepo AtomicRepositoryFactory
}

func NewService(repoFactory AtomicRepositoryFactory) *Service {
	return &Service{newRepo: repoFactory}
}

func (s *Service) Enroll(ctx context.Context, req EnrollmentRequest) error {
	// Create a new AtomicRepository scoped to Enroll.
	repo := s.newRepo()

	// Business logic triggers a transaction but doesn't manage it.
	if err := repo.Begin(ctx); err != nil {
		return err
	}
	defer func() { _ = repo.Rollback() }()

	// Repo operations run against the internal transaction.
	class, err := repo.GetClassByCourseID(ctx, req.CourseID)
	if err != nil {
		return err
	}
	// ...

	if err := repo.Commit(); err != nil {
		return err
	}
	// When Enroll returns, repo goes out of scope and is thrown away.
	return nil
}
Instead of holding an AtomicRepository on the Service, we now store an AtomicRepositoryFactory. When called, AtomicRepositoryFactory returns a new AtomicRepository which is scoped to the current function. The transaction that we begin on this repository instance is protected from all outside influence, and other goroutines are free to instantiate their own repositories, making use of the full pool of database connections.
Let’s implement AtomicRepository and its factory func in the repository layer:
package repository

// transactor manages the lifecycle of a transaction.
// Satisfies class.Transactor.
type transactor struct {
	db *sqlx.DB
	tx *sqlx.Tx
}

var _ class.Transactor = (*transactor)(nil)

// Begin, Commit and Rollback implementations omitted.

type AtomicClassRepository struct {
	*transactor
}

// Verify that AtomicClassRepository satisfies class.AtomicRepository.
var _ class.AtomicRepository = (*AtomicClassRepository)(nil)

// NewAtomicClassRepositoryFactory closes over the shared, long-lived db.
func NewAtomicClassRepositoryFactory(db *sqlx.DB) class.AtomicRepositoryFactory {
	return func() class.AtomicRepository {
		// Each call gets a fresh transactor with no live transaction.
		return &AtomicClassRepository{
			transactor: &transactor{db: db},
		}
	}
}

// class.AtomicRepository methods...
NewAtomicClassRepositoryFactory returns the AtomicRepositoryFactory that our class service will use to instantiate a new repository for each request. The repository itself is featherweight: creating one allocates just two small structs, an AtomicClassRepository and its transactor, so unless you’re working on a highly performance-sensitive application, instantiating short-lived repositories is unlikely to be a bottleneck.
Notice that the AtomicRepositoryFactory we return from NewAtomicClassRepositoryFactory is a closure around a pointer to the underlying database, which is a long-lived, thread-safe object. The closure allows us to pass a single database reference at application start and then forget about the database for the rest of the program’s runtime. There’s no need to pass a new reference to the database every time we create a new repository using the factory.
Hooking this all up in main.go is simple:
package main

import (
	"log"

	"yourapp/class"
	"yourapp/repository"
)

func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

func run() error {
	db, err := repository.NewDB()
	if err != nil {
		return err
	}
	defer func() { _ = db.Close() }()

	repoFactory := repository.NewAtomicClassRepositoryFactory(db)
	classService := class.NewService(repoFactory)
	// ... Hook up classService to transports, etc.

	return nil
}
Using a factory function to create single-use repositories gives us even more power to abstract transactions away from the business layer.
For example, you might decide that every new repository should start life with an active transaction, eliminating the need for business code to call Begin on the repo.
I opted against this approach, because it’s easy to imagine this behaviour surprising other developers. Starting a transaction also takes a database connection from the pool. An unsuspecting colleague might instantiate a repo and save it for later, unaware that they’re leaking a connection.
Beware of making transaction management too implicit. You shouldn’t need to understand a repository’s internals to use it safely.
EDIT: This final suggestion comes from my colleagues at Qonto. I’d already published the piece, but it’s too good not to include.
Now that we’re comfortable with the idea of repositories that we instantiate to perform a single task, let’s think about how we can push the Transactor interface out of our business logic altogether. Although our current approach of manually beginning and ending transactions in the business layer is data store-agnostic, we would eliminate noise if the repository did it for us.
To pull this off, we need two repository interfaces and a function type.
// Business logic
package class

type AtomicOperation func(context.Context, Repository) error

type AtomicRepository interface {
	Execute(context.Context, AtomicOperation) error
}

type Repository interface {
	GetClassByCourseCode(ctx context.Context, courseCode string) (Class, error)
	GetStudentsByEmail(ctx context.Context, emails []primitive.EmailAddress) (Students, error)
	EnrollStudents(ctx context.Context, c Course, s Students) (Class, error)
}

// Service definition omitted.
First, there’s a new type: AtomicOperation, a function type that takes a context and a non-atomic Repository and returns an error.
Next, we define AtomicRepository, which has a single method: Execute. Execute accepts an AtomicOperation and, you guessed it, executes it atomically.
Finally, Repository is our familiar repository interface. Something of this type gets passed into an AtomicOperation when the operation is called.
Here’s how we use these types:
// Business logic
package class

// Repository definitions omitted.

type Service struct {
	atomicRepo AtomicRepository
}

func NewService(atomicRepo AtomicRepository) *Service {
	return &Service{atomicRepo: atomicRepo}
}

func (s *Service) Enroll(ctx context.Context, req EnrollmentRequest) error {
	// enroll is an AtomicOperation: it only ever sees a plain Repository.
	enroll := func(ctx context.Context, repo Repository) error {
		class, err := repo.GetClassByCourseCode(ctx, req.CourseCode)
		if err != nil {
			return fmt.Errorf("enroll: %w", err)
		}
		// Perform other business logic, validations, etc.
		return nil
	}

	if err := s.atomicRepo.Execute(ctx, enroll); err != nil {
		return err
	}
	return nil
}
Service is now instantiated with a long-lived AtomicRepository, but we don’t encounter our vanilla Repository until we define a function inside Enroll. This function is an AtomicOperation: it takes a context.Context and a Repository as arguments and returns an error. Inside this function, we behave as if we’re running inside a transaction. There’s no mention of Begin or Commit; we call Repository methods as we please.
After defining this function, we pass it to the Service’s AtomicRepository to execute. This is the equivalent of saying “run this function inside a transaction”. It’s then up to the AtomicRepository to make that happen.
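To see the whole flow end to end without a database, here is a minimal in-memory sketch of the same shape. It is a stand-in, not the Qonto implementation: memAtomicRepo, memRepo, the seat-count data model, and the copy-then-swap strategy (run the operation against a copy and keep the copy only if the operation succeeds) are illustrative assumptions. A mutex serialises Execute calls, which is stricter than most database isolation levels but keeps the sketch simple. The operation enrolls first and validates afterwards, so "the count" unambiguously means the count after enrolling.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Repository is the non-atomic, business-facing interface.
type Repository interface {
	EnrolledCount(ctx context.Context, course string) (int, error)
	Enroll(ctx context.Context, course string, n int) error
}

type AtomicOperation func(context.Context, Repository) error

type AtomicRepository interface {
	Execute(context.Context, AtomicOperation) error
}

// memRepo is a hypothetical Repository over a plain map.
type memRepo struct{ enrolled map[string]int }

func (m *memRepo) EnrolledCount(_ context.Context, course string) (int, error) {
	return m.enrolled[course], nil
}

func (m *memRepo) Enroll(_ context.Context, course string, n int) error {
	m.enrolled[course] += n
	return nil
}

// memAtomicRepo runs each operation against a copy and swaps it in on success.
type memAtomicRepo struct {
	mu       sync.Mutex
	enrolled map[string]int
}

func (a *memAtomicRepo) Execute(ctx context.Context, op AtomicOperation) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	draft := make(map[string]int, len(a.enrolled)) // "begin"
	for k, v := range a.enrolled {
		draft[k] = v
	}
	if err := op(ctx, &memRepo{enrolled: draft}); err != nil {
		return err // "rollback": the draft is discarded
	}
	a.enrolled = draft // "commit"
	return nil
}

// enroll is business logic: it never sees Begin, Commit or Rollback.
func enroll(ar AtomicRepository, course string, n, capacity int) error {
	return ar.Execute(context.Background(), func(ctx context.Context, repo Repository) error {
		if err := repo.Enroll(ctx, course, n); err != nil {
			return err
		}
		count, err := repo.EnrolledCount(ctx, course)
		if err != nil {
			return err
		}
		if count > capacity {
			return errors.New("class is oversubscribed")
		}
		return nil
	})
}

func main() {
	repo := &memAtomicRepo{enrolled: map[string]int{"go101": 28}}
	fmt.Println(enroll(repo, "go101", 2, 30)) // <nil>
	fmt.Println(enroll(repo, "go101", 1, 30)) // class is oversubscribed
	fmt.Println(repo.enrolled["go101"])       // 30
}
```

The failed second call still ran its write, but because the operation returned an error, the draft was discarded and the stored count stayed at 30.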
package repository

// tableOperator represents the methods required to interact with database
// tables. Typically this is satisfied by both databases and transactions.
type tableOperator interface {
	Execer
	Queryer
}

// beginner represents something that can begin a transaction, usually a
// database.
type beginner interface {
	Begin(ctx context.Context) (*sqlx.Tx, error)
	// Note: *sqlx.Tx satisfies tableOperator. To improve testability,
	// you could define Begin to return a custom transaction interface
	// type that also satisfies tableOperator.
	// *sqlx.DB doesn't satisfy beginner as written: its context-aware
	// method is BeginTxx(ctx, opts). Wrap it in a small adapter whose
	// Begin calls db.BeginTxx(ctx, nil).
}

// AtomicClassRepository satisfies class.AtomicRepository and is capable
// of beginning transactions.
type AtomicClassRepository struct {
	beginner beginner
}

var _ class.AtomicRepository = (*AtomicClassRepository)(nil)

// NewAtomic instantiates a new AtomicClassRepository using the beginner provided.
func NewAtomic(b beginner) *AtomicClassRepository {
	return &AtomicClassRepository{beginner: b}
}

// ClassRepository satisfies class.Repository. It is agnostic as to whether
// its tableOperator is a database or transaction.
type ClassRepository struct {
	operator tableOperator
}

var _ class.Repository = (*ClassRepository)(nil)

func (cr *ClassRepository) GetClassByCourseCode(
	ctx context.Context,
	courseCode string,
) (class.Class, error) {
	// Query courses and students tables using cr.operator.
}

// Other class.Repository method implementations omitted.
There’s a lot going on here, so I’ll step you through it.
tableOperator is an interface that describes any object capable of interacting with a database table. Execer and Queryer follow a naming convention found in the standard library’s database/sql/driver package (and in sqlx), so I’ve omitted the definitions of these simple interfaces for brevity. In practice, a tableOperator may be either a database or a transaction. You’ll see why this is useful shortly.
beginner is any object that can begin a transaction. Begin returns a concrete *sqlx.Tx, which implements tableOperator out of the box, but you can improve the testability of this package by implementing your own tx interface type which also satisfies tableOperator.
AtomicClassRepository satisfies class.AtomicRepository. We’ll see what its Execute method looks like in a second. It’s instantiated with a beginner: an object that can start transactions.
ClassRepository is an implementation of class.Repository backed by a tableOperator. That means we can instantiate a ClassRepository with either a transaction or a database, and provided that it implements tableOperator, the repository won’t know the difference.
Here’s the missing puzzle piece: let’s connect our AtomicClassRepository, which knows about transactions, to our ClassRepository, which doesn’t.
package repository

// Execute decorates the given AtomicOperation with a transaction. If the
// AtomicOperation returns an error, the transaction is rolled back. Otherwise,
// the transaction is committed.
func (ar *AtomicClassRepository) Execute(
	ctx context.Context,
	op class.AtomicOperation,
) error {
	tx, err := ar.beginner.Begin(ctx)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }()

	// Create a new single-use ClassRepository backed by the transaction.
	classRepoWithTransaction := ClassRepository{operator: tx}

	// Perform the AtomicOperation using the ClassRepository.
	if err := op(ctx, &classRepoWithTransaction); err != nil {
		return err
	}

	if err := tx.Commit(); err != nil {
		return err
	}
	return nil
}
Execute begins a transaction and defers a rollback in case anything goes wrong. Rolling back a committed transaction has no effect: with database/sql (which sqlx wraps), it simply returns sql.ErrTxDone, which the deferred call discards.
It then instantiates a short-lived ClassRepository, passing it the newly created transaction, which satisfies tableOperator. If we want to make non-atomic class-related database queries elsewhere in our code, we can skip AtomicClassRepository altogether and pass ClassRepository a database that implements tableOperator instead of a transaction. ClassRepository has no knowledge of whether it’s acting atomically or not.
We then call the class.AtomicOperation that Execute received from the business logic, passing in the ClassRepository instance containing the live transaction. The AtomicOperation that we defined in Service.Enroll makes calls to this repository without needing to know anything about the transaction.
When the AtomicOperation returns, Execute either commits or rolls back the transaction, and the ClassRepository is thrown away, just like our “mayfly” repositories in Attempt 4.
This approach hides the transactional complexity from the business layer, at the expense of a little more interface complexity in the repository layer.
There you have it. Two clean, thread-safe ways to handle atomic database operations while respecting the separation of concerns between business logic and repositories.
The takeaways are: