Overview

Sam is a library providing synchronization primitives for asio-based applications.

The primitives are modeled on common concepts, as listed below:

Table 1. List of Primitives

Sam                   C++ Standard
Mutex                 std::mutex
Condition Variable    std::condition_variable
Semaphore             std::counting_semaphore
Barrier               std::barrier[1]

Any

Note
This library is not meant as the default method of synchronization of asio-based apps, but to be used by library developers.

Config

Sam is by default considered a part of Boost, thus it will use boost/asio.hpp. It is, however, possible to use it standalone by defining BOOST_SAM_STANDALONE. In standalone mode the namespace will be sam and it will include standalone asio.

Sam is compiled by default, but a header only mode can be switched on by defining BOOST_SAM_HEADER_ONLY.

The user can include boost/sam/src.hpp as an alternative to linking.

Tutorial

When should I use this?

asio itself brings synchronization mechanisms, mainly strands, which ensure that two async operations do not overlap.

This however only works if the entire operation is to be locked, as it does not give the user finer tools to have exclusive access only for some time during the operation.

This is the gap this library fills, i.e. giving a user tools to synchronize parts of composed operations where this fine-tuning is needed.

For example, a connection pool needs to lock only while the connection object is acquired, not for the entire operation.
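
The connection pool idea can be sketched without asio at all. The following is a minimal synchronous model, assuming a hypothetical connection type and using std::mutex as a stand-in for an asynchronous mutex; the names connection, connection_pool and use_one are illustrative and not part of Sam. The point is only that the lock covers taking and returning a connection, never the work done with it.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical connection type, for illustration only.
struct connection { std::string name; };

class connection_pool {
public:
    explicit connection_pool(std::vector<connection> conns) : free_(std::move(conns)) {}

    // The lock is held only while a connection is taken out of the pool.
    std::optional<connection> acquire() {
        std::lock_guard<std::mutex> lock(mtx_);
        if (free_.empty()) return std::nullopt;
        connection c = std::move(free_.back());
        free_.pop_back();
        return c;
    }

    // The lock is held only while the connection is handed back.
    void release(connection c) {
        std::lock_guard<std::mutex> lock(mtx_);
        free_.push_back(std::move(c));
    }

    std::size_t available() {
        std::lock_guard<std::mutex> lock(mtx_);
        return free_.size();
    }

private:
    std::mutex mtx_;  // stand-in for an asynchronous mutex
    std::vector<connection> free_;
};

// Takes a connection, uses it without holding the lock, then returns it.
std::size_t use_one(connection_pool& pool) {
    std::optional<connection> c = pool.acquire();
    assert(c.has_value());
    std::size_t while_in_use = pool.available();  // the pool is not locked here
    pool.release(std::move(*c));
    return while_in_use;
}
```

In an asio application the two short critical sections would be asynchronous lock operations, so a coroutine waiting for the pool would suspend instead of blocking a thread.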

Another use case is for synchronizing completions.

Semaphore example

The semaphore is the simplest synchronization method; it allows notifying workers that are trying to acquire a resource.

asio::awaitable<void> work(sam::semaphore & sem)
{
    co_await sem.async_acquire(asio::use_awaitable);
    co_await do_work();
}

Threading Mode

Sam deduces the threading mode from the executor following the concurrency_hint by asio, unless explicitly set.

The objects allow setting the concurrency_hint manually, as well.

See the asio reference for details.

In the documentation we’ll refer to the mode as single-threaded and multi-threaded mode.

Reference

Barrier

/// An asio based barrier modeled on `std::barrier`.
template<typename Executor = net::any_io_executor>
struct basic_barrier
{
    /// The executor type.
    using executor_type = Executor;

    /// The executor to be used by the barrier.
    explicit basic_barrier(executor_type exec, std::ptrdiff_t init_count,
                           int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// A constructor.
    /// ctx The execution context used by the barrier.
    template<typename ExecutionContext>
    explicit basic_barrier(ExecutionContext & ctx, std::size_t init_count,
                           int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// Rebind a barrier to a new executor - this cancels all outstanding operations.
    template<typename OtherExecutor>
    basic_barrier(basic_barrier<OtherExecutor> && sem);

    /// Arrive at a barrier and wait for all other strands to arrive. (1)
    template < net::completion_token_for<void(error_code)> CompletionToken >
    auto async_arrive(CompletionToken &&token = net::default_token<executor_type>);

    /// Move assign a barrier.
    basic_barrier& operator=(basic_barrier&&) noexcept = default;

    /// Move assign a barrier with a different executor.
    template<typename Executor_>
    basic_barrier & operator=(basic_barrier<Executor_> && sem);

    /// Delete copy assignment
    basic_barrier& operator=(const basic_barrier&) = delete;

    /// Try to arrive - that is arrive immediately if we're the last thread.
    bool try_arrive();

    /// Arrive synchronously. This may fail depending on the implementation. (2)
    void arrive(error_code & ec);
    void arrive();

    /// Rebinds the barrier type to another executor.
    template <typename Executor1>
    struct rebind_executor
    {
        /// The barrier type when rebound to the specified executor.
        typedef basic_barrier<Executor1> other;
    };

    /// return the default executor.
    executor_type get_executor() const noexcept;
};

// basic_barrier with default executor.
using barrier = basic_barrier<>;

async_arrive

The async_arrive function follows the asio completion token pattern, including default tokens.

This function will not block, but return immediately.

arrive

In single-threaded mode this will generate an error of net::error::in_progress if it cannot complete immediately.

In multi-threaded mode this function will block until other threads arrive. Note that this may lead to deadlocks.

Warning
You must never use the synchronous functions from within an asio event-queue.
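
To make the difference between try_arrive and async_arrive concrete, here is a toy single-threaded model of a barrier. It is a sketch under stated assumptions, not Sam's implementation: toy_barrier and completed_after_arrivals are hypothetical names, handlers are plain std::function objects, and they are invoked inline where an asio-based barrier would hand them to an executor.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Toy single-threaded model of an asynchronous barrier; not Sam's implementation.
class toy_barrier {
public:
    explicit toy_barrier(std::size_t count) : expected_(count) { assert(count > 0); }

    // Succeeds only if the caller is the last participant to arrive.
    bool try_arrive() {
        if (waiting_.size() + 1 < expected_) return false;
        complete_all();
        return true;
    }

    // Never blocks: registers the handler and returns immediately.
    void async_arrive(std::function<void()> handler) {
        waiting_.push_back(std::move(handler));
        if (waiting_.size() == expected_) complete_all();
    }

private:
    void complete_all() {
        std::vector<std::function<void()>> ready;
        ready.swap(waiting_);  // reset the barrier for the next phase
        for (auto& h : ready) h();
    }

    std::size_t expected_;
    std::vector<std::function<void()>> waiting_;
};

// With three participants, nothing completes until the third one arrives.
int completed_after_arrivals(int arrivals) {
    toy_barrier b(3);
    int completed = 0;
    for (int i = 0; i < arrivals; ++i) b.async_arrive([&completed] { ++completed; });
    return completed;
}
```

In this model a fourth arrival starts a new phase and stays pending, which mirrors the reusable nature of std::barrier.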

Condition Variable

/// An asio based condition variable modeled on `std::condition_variable`.
template<typename Executor = net::any_io_executor>
struct basic_condition_variable
{
    /// The executor type.
    using executor_type = Executor;

    /// A constructor. @param exec The executor to be used by the condition variable
    explicit basic_condition_variable(executor_type exec,
                           int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// Construct from an execution context, which is to be used by the condition variable.
    template<typename ExecutionContext>
    explicit basic_condition_variable(ExecutionContext & ctx,
                           int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// Rebind a condition_variable to a new executor.
    template<typename Executor_>
    basic_condition_variable(basic_condition_variable<Executor_> && sem);

    /// Wait for the condition_variable to become notified. (1)
    template < net::completion_token_for<void(error_code)> CompletionToken >
    auto async_wait(CompletionToken &&token = net::default_token<executor_type>);

    /// Wait for the condition_variable to become notified & the predicate to return true. (2)
    template < typename Predicate,
               net::completion_token_for<void(error_code)> CompletionToken >
    auto async_wait(Predicate && predicate,
                    CompletionToken &&token = net::default_token<executor_type>);

    /// Move assign a condition_variable.
    basic_condition_variable& operator=(basic_condition_variable&&) noexcept = default;

    /// Move assign a condition_variable with a different executor.
    template<typename Executor_>
    basic_condition_variable& operator=(basic_condition_variable<Executor_> && sem);

    /// Delete copy assignment
    basic_condition_variable& operator=(const basic_condition_variable&) = delete;

    /// Notify/wake up one waiting operation.
    void notify_one();
    /// Notify/wake up all waiting operations.
    void notify_all();
    /// Rebinds the condition_variable type to another executor.
    template <typename Executor1>
    struct rebind_executor
    {
        /// The condition_variable type when rebound to the specified executor.
        typedef basic_condition_variable<Executor1> other;
    };

    /// return the default executor.
    executor_type get_executor() const noexcept;
};

/// basic_condition_variable with default executor.
using condition_variable = basic_condition_variable<>;

wait for notification

You can wait for a notification with async_wait(token). This operation may complete on a call to notify_one and will complete on a call to notify_all.

wait with predicate

Waiting with a predicate will complete when the condition variable is notified and the predicate returns true. The predicate will be invoked from within the executor provided to the condition_variable.
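
The predicate rule is easiest to see in a toy model. The sketch below is single-threaded and is not Sam's implementation: toy_condition_variable and demo_predicate_wait are hypothetical names, only notify_all is modeled, and predicates and handlers run inline rather than on an executor. It assumes that a waiter whose predicate is still false simply keeps waiting for the next notification.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Toy single-threaded model of waiting with a predicate; not Sam's implementation.
class toy_condition_variable {
public:
    using predicate = std::function<bool()>;
    using handler = std::function<void()>;

    // Registers a waiter; it completes on a notification for which pred() is true.
    void async_wait(predicate pred, handler h) {
        waiters_.push_back({std::move(pred), std::move(h)});
    }

    // Wakes every waiter; those whose predicate is still false keep waiting.
    void notify_all() {
        std::vector<waiter> pending;
        pending.swap(waiters_);
        for (auto& w : pending) {
            if (w.pred()) w.done();
            else waiters_.push_back(std::move(w));
        }
    }

    std::size_t waiting() const { return waiters_.size(); }

private:
    struct waiter { predicate pred; handler done; };
    std::vector<waiter> waiters_;
};

// A waiter for `ready` is woken twice but completes only once `ready` is set.
int demo_predicate_wait() {
    toy_condition_variable cv;
    bool ready = false;
    int completions = 0;
    cv.async_wait([&] { return ready; }, [&] { ++completions; });
    cv.notify_all();  // predicate is false: the waiter stays queued
    assert(completions == 0 && cv.waiting() == 1);
    ready = true;
    cv.notify_all();  // predicate is true: the waiter completes
    return completions;
}
```

This matches the familiar std::condition_variable idiom of re-checking a condition after every wake-up, expressed with callbacks instead of a blocking loop.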

Mutex

/// An asio based mutex modeled on `std::mutex`.
template<typename Executor = net::any_io_executor>
struct basic_mutex
{
    /// The executor type.
    using executor_type = Executor;

    /// Construct from an executor to be used by the mutex.
    explicit basic_mutex(executor_type exec,
                         int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// Construct a mutex from an execution context to be used by the mutex.
    template<typename ExecutionContext>
    explicit basic_mutex(ExecutionContext & ctx,
                         int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// Rebind a mutex to a new executor.
    template<typename Executor_>
    basic_mutex(basic_mutex<Executor_> && sem);

    /// Wait for the mutex to become lockable & lock it. (1)
    template < net::completion_token_for<void(error_code)> CompletionToken >
    auto async_lock(CompletionToken &&token = net::default_token<executor_type>);

    /// Move assign a mutex.
    basic_mutex& operator=(basic_mutex&&) noexcept = default;

    /// Move assign a mutex with a different executor.
    template<typename Executor_>
    basic_mutex & operator=(basic_mutex<Executor_> && sem);

    /// Lock synchronously. This may fail depending on the implementation. (2)
    void lock(error_code & ec);
    void lock();
    /// Unlock the mutex, and complete one pending lock if pending.
    void unlock();

    ///  Try to lock the mutex.
    bool try_lock();
    /// Rebinds the mutex type to another executor.
    template <typename Executor1>
    struct rebind_executor
    {
        /// The mutex type when rebound to the specified executor.
        typedef basic_mutex<Executor1> other;
    };

    /// return the default executor.
    executor_type get_executor() const noexcept;
};

/// basic_mutex with default executor.
using mutex = basic_mutex<>;

  1. See async_lock

  2. See lock

async_lock

The async_lock function follows the asio completion token pattern, including default tokens.

This function will not block, but return immediately.

lock

In single-threaded mode this will generate an error of net::error::in_progress if it cannot complete immediately.

In multi-threaded mode this function will block until unlocked. Note that this may lead to deadlocks.

Warning
You must never use the synchronous functions from within an asio event-queue.
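
The single-threaded behaviour of lock can be modeled in a few lines. This is a sketch, not Sam's implementation: toy_mutex and second_lock_fails are hypothetical names, and std::errc::operation_in_progress is used here as a standard-library stand-in for net::error::in_progress, which is an assumption made for the sake of a self-contained example.

```cpp
#include <cassert>
#include <system_error>

// Toy model of the single-threaded behaviour described above; not Sam's implementation.
class toy_mutex {
public:
    bool try_lock() {
        if (locked_) return false;
        locked_ = true;
        return true;
    }

    // Non-blocking lock: reports an error instead of waiting.
    void lock(std::error_code& ec) {
        if (try_lock()) ec.clear();
        else ec = std::make_error_code(std::errc::operation_in_progress);
    }

    void unlock() { locked_ = false; }

private:
    bool locked_ = false;
};

// Locks twice in a row; the second attempt cannot complete immediately.
bool second_lock_fails() {
    toy_mutex m;
    std::error_code ec;
    m.lock(ec);
    assert(!ec);
    m.lock(ec);  // already locked, so an error is reported instead of blocking
    return ec == std::errc::operation_in_progress;
}
```

Reporting an error is the only sensible outcome in this mode: with a single thread there is nobody else who could unlock the mutex while the caller blocks.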

Semaphore

/// An asio based semaphore.
template < class Executor = net::any_io_executor >
struct basic_semaphore
{
    /// The type of the default executor.
    using executor_type = Executor;

    /// Rebinds the semaphore type to another executor.
    template < typename Executor1 >
    struct rebind_executor
    {
        /// The semaphore type when rebound to the specified executor.
        typedef basic_semaphore< Executor1 > other;
    };

    /// Construct a semaphore
    /// `exec` is the default executor associated with the semaphore
    /// `initial_count` is the initial value of the internal counter. (0 <= initial_count <= INT_MAX)
    basic_semaphore(executor_type exec, int initial_count = 1,
                    int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// Rebind a semaphore to a new executor.
    template<typename Executor_>
    basic_semaphore(basic_semaphore<Executor_> && sem);

    /// Move assign a semaphore.
    basic_semaphore& operator=(basic_semaphore&&) noexcept = default;

    /// Move assign a semaphore with a different executor.
    template<typename Executor_>
    auto operator=(basic_semaphore<Executor_> && sem);

    /// Construct a semaphore from an execution context.
    template<typename ExecutionContext>
    explicit basic_semaphore(
            ExecutionContext & ctx,
            int initial_count = 1,
            int concurrency_hint = BOOST_SAM_CONCURRENCY_HINT_DEFAULT);

    /// return the default executor.
    executor_type get_executor() const noexcept;

    /// Initiate an asynchronous acquire of the semaphore (1)
    template < net::completion_token_for<void(error_code)> CompletionHandler >
    auto async_acquire(CompletionHandler &&token = net::default_token<executor_type>);

    /// Acquire synchronously. This may fail depending on the implementation. (2)
    void acquire(error_code & ec);
    void acquire();

    /// This function attempts to acquire the semaphore without blocking or initiating an asynchronous operation.
    /// returns true if the semaphore was acquired, false otherwise
    bool try_acquire();

    /// Release the semaphore.
    /// This function immediately releases the semaphore. If there are
    /// pending async_acquire operations, then the least recent operation will commence completion.
    void release();

    /// The current value of the semaphore
    int value() const noexcept;
};

/// basic_semaphore with default executor.
using semaphore = basic_semaphore<>;

async_acquire

Multiple asynchronous acquire operations may be in progress at the same time. However, the caller must ensure that this function is not invoked from two threads simultaneously. When the semaphore’s internal count is above zero, async acquire operations will complete in strict FIFO order.

If the semaphore object is destroyed while an async_acquire is outstanding, the operation’s completion handler will be invoked with the error_code set to error::operation_aborted.

If the async_acquire operation is cancelled before completion, the completion handler will be invoked with the error_code set to error::operation_aborted.

Successful acquisition of the semaphore is signalled to the caller when the completion handler is invoked with no error.
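
The ordering and cancellation rules above can be illustrated with a toy single-threaded model. It is a sketch, not Sam's implementation: toy_semaphore and completion_order are hypothetical names, handlers are invoked inline instead of being posted to an executor, and std::errc::operation_canceled stands in for error::operation_aborted.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <system_error>
#include <utility>
#include <vector>

// Toy single-threaded model of an asynchronous semaphore; not Sam's implementation.
class toy_semaphore {
public:
    using handler = std::function<void(std::error_code)>;

    explicit toy_semaphore(int initial_count) : count_(initial_count) {}

    // Outstanding waiters are completed with an "aborted" error on destruction.
    ~toy_semaphore() {
        for (auto& h : waiters_) h(std::make_error_code(std::errc::operation_canceled));
    }

    // Completes at once if a unit is available, otherwise queues the handler.
    void async_acquire(handler h) {
        if (count_ > 0) { --count_; h(std::error_code{}); }
        else waiters_.push_back(std::move(h));
    }

    // Hands the unit to the oldest waiter, or returns it to the counter.
    void release() {
        if (waiters_.empty()) { ++count_; return; }
        handler h = std::move(waiters_.front());
        waiters_.pop_front();
        h(std::error_code{});
    }

    int value() const noexcept { return count_; }

private:
    int count_;
    std::deque<handler> waiters_;
};

// Queues three waiters on an empty semaphore and records the order of success.
std::vector<int> completion_order() {
    std::vector<int> order;
    toy_semaphore sem(0);
    for (int id = 1; id <= 3; ++id)
        sem.async_acquire([&order, id](std::error_code ec) { if (!ec) order.push_back(id); });
    sem.release();
    sem.release();
    assert(sem.value() == 0);
    return order;  // the third waiter is aborted when sem is destroyed
}
```

The queue is a std::deque so that release always serves the oldest waiter, which is what gives the FIFO guarantee in this model.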

CompletionHandler

CompletionHandler represents a completion token or handler that is invocable with the signature void(error_code).

Note
The completion handler will be invoked as if by post to the handler’s associated executor. If no executor is associated with the completion handler, the handler will be invoked as if by post to the semaphore’s associated default executor.

acquire

In single-threaded mode this will generate an error of net::error::in_progress if the semaphore cannot be acquired immediately.

In multi-threaded mode this function will block until another thread releases the semaphore. Note that this may lead to deadlocks.

Warning
You must never use the synchronous functions from within an asio event-queue.

guarded

The guarded functions run an async operation while automatically holding a synchronization primitive (i.e. a Semaphore or Mutex).

semaphore

Function to run OPs only when the Semaphore can be acquired. That way the number of operations running in parallel can be capped at the semaphore's count.

Type Parameters

  • Executor The executor of the semaphore.

  • CompletionToken The completion token

Parameters

  • sm The semaphore that guards the operation.

  • op The operation to guard.

  • token The completion token to use for the async completion.

template<typename Executor, typename Op,
         net::completion_token_for<net::completion_signature_of_t<Op>> CompletionToken>
auto guarded(basic_semaphore<Executor> & sm, Op && op,
             CompletionToken && token = net::default_token<Executor>);
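
How a guarded call caps parallelism can be shown with a callback-based toy model. Everything here is an assumption made for illustration and not Sam's API: mini_semaphore, operation, guarded_sketch and demo_limit are hypothetical names, an operation is modeled as a function that receives a "done" callback, and all callbacks run inline on one thread.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Minimal callback-based semaphore used only to drive the sketch (hypothetical).
struct mini_semaphore {
    int count;
    std::deque<std::function<void()>> waiters;

    void async_acquire(std::function<void()> h) {
        if (count > 0) { --count; h(); }
        else waiters.push_back(std::move(h));
    }
    void release() {
        if (waiters.empty()) { ++count; return; }
        auto h = std::move(waiters.front());
        waiters.pop_front();
        h();
    }
};

// An operation receives a "done" callback that it invokes when it has finished.
using operation = std::function<void(std::function<void()> done)>;

// Acquire, start the operation, and release as soon as the operation completes.
void guarded_sketch(mini_semaphore& sem, operation op, std::function<void()> on_complete) {
    sem.async_acquire([&sem, op = std::move(op), on_complete = std::move(on_complete)]() mutable {
        op([&sem, on_complete = std::move(on_complete)] {
            sem.release();
            on_complete();
        });
    });
}

// Starts three operations with two permits; returns how many had started
// before any finished and after the first one finished.
std::pair<int, int> demo_limit() {
    mini_semaphore sem{2, {}};
    int started = 0;
    std::vector<std::function<void()>> finish;  // pending "done" callbacks
    for (int i = 0; i < 3; ++i)
        guarded_sketch(sem,
                       [&](std::function<void()> done) { ++started; finish.push_back(std::move(done)); },
                       [] {});
    assert(started <= 2);  // only two permits, so the third operation is queued
    int before = started;
    auto first = std::move(finish[0]);  // move out first: finishing may grow `finish`
    first();                            // releases a permit, which starts the third operation
    return {before, started};
}
```

The release happens inside the completion path of the operation, so a caller cannot forget it; that is the main convenience a guarded helper offers over calling acquire and release by hand.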

mutex

Function to run OPs only when the Mutex can be locked. Unlocks the mutex on completion.

Type Parameters

  • Executor The executor of the mutex.

  • CompletionToken The completion token

Parameters

  • mtx The mutex that guards the operation.

  • op The operation to guard.

  • token The completion token to use for the async completion.

template<typename Executor, typename Op,
         net::completion_token_for<net::completion_signature_of_t<Op>> CompletionToken>
auto guarded(basic_mutex<Executor> & mtx, Op && op,
             CompletionToken && token = net::default_token<Executor>);

lock_guard

The lock_guard can be used similarly to std::lock_guard, but it additionally type-erases the executor of the mutex, so the guard's type does not depend on it.

/// A lock-guard used as an RAII object that automatically unlocks on destruction
struct lock_guard
{
    /// Construct an empty lock_guard.
    lock_guard() = default;
    /// Move constructor.
    lock_guard(lock_guard &&lhs);
    /// Move assignable (unlike std::lock_guard)
    lock_guard & operator=(lock_guard &&lhs);
    /// Unlock the underlying mutex.
    ~lock_guard();
    // Adopt an already locked mutex
    template<typename Executor>
    lock_guard(basic_mutex<Executor> & mtx, const std::adopt_lock_t &);
};
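
One way to picture a movable guard that does not mention the mutex type is to store only an "unlock" callable. The sketch below is not Sam's implementation: erased_lock_guard, flag_mutex and demo_guard are hypothetical names, and std::function is just one possible erasure mechanism, chosen here for brevity.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

// Sketch of a movable guard that forgets the mutex type; not Sam's implementation.
class erased_lock_guard {
public:
    erased_lock_guard() = default;

    // Adopts a mutex that the caller has already locked.
    template <typename Mutex>
    erased_lock_guard(Mutex& mtx, std::adopt_lock_t) : unlock_([&mtx] { mtx.unlock(); }) {}

    erased_lock_guard(erased_lock_guard&& other) noexcept
        : unlock_(std::exchange(other.unlock_, nullptr)) {}

    erased_lock_guard& operator=(erased_lock_guard&& other) noexcept {
        if (this != &other) {
            reset();  // release whatever this guard currently holds
            unlock_ = std::exchange(other.unlock_, nullptr);
        }
        return *this;
    }

    ~erased_lock_guard() { reset(); }

private:
    void reset() {
        if (unlock_) std::exchange(unlock_, nullptr)();
    }
    std::function<void()> unlock_;  // the only trace left of the mutex type
};

// Hypothetical stand-in mutex that just records its state.
struct flag_mutex {
    bool locked = false;
    void lock() { assert(!locked); locked = true; }
    void unlock() { assert(locked); locked = false; }
};

// Returns true if the mutex is held while the guard lives and free afterwards.
bool demo_guard() {
    flag_mutex mtx;
    bool held_inside = false;
    {
        mtx.lock();
        erased_lock_guard g(mtx, std::adopt_lock);
        erased_lock_guard moved = std::move(g);  // ownership transfers, no unlock yet
        held_inside = mtx.locked;
    }  // `moved` unlocks here; the moved-from `g` does nothing
    return held_inside && !mtx.locked;
}
```

Because the guard's type is independent of the mutex, it can be returned from functions or passed to completion handlers without turning them into templates.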

lock

Acquire a lock_guard synchronously.

  • mtx The mutex to lock.

  • ec The error_code that receives the error if locking fails (non-throwing overload only).

returns: The lock_guard. It might be default constructed if locking wasn’t possible.

template<typename Executor>
lock_guard lock(basic_mutex<Executor> & mtx, error_code & ec);

// throwing overload
template<typename Executor>
lock_guard lock(basic_mutex<Executor> & mtx);

async_lock

Acquire a lock_guard asynchronously.

Type parameters:

  • Executor The executor type of the mutex

  • CompletionToken The completion token.

Parameters:

  • mtx The mutex to lock.

  • token The Completion Token.

Returns:

  • The async_result deduced from the token.

template<typename Executor,
         net::completion_token_for<void(error_code, lock_guard)> CompletionToken >
auto async_lock(basic_mutex<Executor> &mtx,
                CompletionToken && token = net::default_token<Executor> );

Example:

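net::awaitable<std::string> protected_read(sam::mutex & mtx, tcp::socket & sock)
{
    std::string buf;
    // The lock is held until `l` goes out of scope at the end of the coroutine.
    auto l = co_await async_lock(mtx, net::use_awaitable);
    co_await net::async_read(sock, net::dynamic_buffer(buf), net::use_awaitable);
    co_return buf;
}

Note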
Consider using guarded instead.

This documentation is copyright 2023 Klemens D. Morgenstern and is distributed under the Boost Software License, Version 1.0.

Acknowledgements

Thanks to Richard Hodges, for getting this started in asio-experiments.


1. Without a completion function