Async I/O

The cysqlite.aio module provides an experimental asyncio interface to cysqlite. Queries and other blocking operations are dispatched to a background worker thread.

SQLite operates on local disk storage, so queries typically execute very quickly (microseconds to a few milliseconds). Dispatching each query to a background thread and wrapping it in a coroutine adds latency on top of that. For every query executed, a closure must be created, a future allocated, an item written to a queue, a loop.call_soon_threadsafe() call issued, and two context switches made. This is true of cysqlite and of other drivers such as aiosqlite.
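To make that per-query overhead concrete, here is a minimal sketch of the dispatch pattern, written against the standard-library sqlite3 module rather than cysqlite. ThreadedConnection and its methods are hypothetical names, and this is a simplified model of the technique, not cysqlite's actual implementation.

```python
import asyncio
import queue
import sqlite3
import threading


def _resolve(future, result, exc):
    # Runs on the event-loop thread, scheduled via call_soon_threadsafe().
    if future.cancelled():
        return
    if exc is not None:
        future.set_exception(exc)
    else:
        future.set_result(result)


class ThreadedConnection:
    """Toy async wrapper: a dedicated worker thread owns the sqlite3 connection."""

    def __init__(self, database):
        self._queue = queue.SimpleQueue()
        self._thread = threading.Thread(
            target=self._run, args=(database,), daemon=True)
        self._thread.start()

    def _run(self, database):
        conn = sqlite3.connect(database)  # Created and used only on this thread.
        while True:
            item = self._queue.get()
            if item is None:  # Shutdown sentinel.
                break
            loop, future, fn = item
            try:
                result, exc = fn(conn), None
            except Exception as e:
                result, exc = None, e
            loop.call_soon_threadsafe(_resolve, future, result, exc)
        conn.close()

    async def execute(self, sql, params=()):
        loop = asyncio.get_running_loop()
        future = loop.create_future()        # 1. allocate a future

        def fn(conn):                        # 2. create a closure over sql/params
            return conn.execute(sql, params).fetchall()

        self._queue.put((loop, future, fn))  # 3. write to the queue
        # 4. the worker resolves the future via call_soon_threadsafe(),
        #    waking the event loop again.
        return await future

    async def close(self, timeout=5):
        self._queue.put(None)
        await asyncio.to_thread(self._thread.join, timeout)
        if self._thread.is_alive():
            raise RuntimeError('worker thread did not exit')


async def main():
    db = ThreadedConnection(':memory:')
    await db.execute('create table kv (key text, value text)')
    await db.execute('insert into kv values (?, ?)', ('hello', 'world'))
    rows = await db.execute('select * from kv')
    await db.close()
    return rows


rows = asyncio.run(main())
print(rows)  # [('hello', 'world')]
```

Every await pays for a hop to the worker thread and back, which is why this overhead can rival the cost of a fast local query.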

If your SQLite workload is heavy enough that blocking the event loop is a real concern, SQLite may not be a good fit. SQLite allows only one writer at a time, so while an async wrapper may keep the application responsive while it waits to obtain the write lock, writes will not happen any “faster”; the bottleneck has merely moved. Conversely, if your load is light, the async wrapper adds complexity and overhead for no measurable benefit.

It’s like a super fancy restaurant that has only one table. When using SQLite, any thread or task that needs to write has to wait in the lobby for the hostess to seat them at the one available table. The aio implementation here changes things so that now there are plenty of tables, but only one set of plates. Everybody can sit down, but they are still waiting, since the plates can only be at one table at any given time.

Additionally, if multiple coroutines share a single async connection, transaction state gets interleaved between tasks. With any concurrency at all, one task's transactions and savepoints end up wrapping another task's queries, which can lead to lost writes or data corruption. aiosqlite suffers from this problem as well (since 2018!).
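The interleaving hazard does not depend on the worker thread at all. This sketch reproduces it with a single standard-library sqlite3 connection shared by two coroutines, using asyncio.sleep(0) to stand in for the await points an async driver introduces; task_a and task_b are illustrative names.

```python
import asyncio
import sqlite3

# One connection shared by two tasks. isolation_level=None disables the
# sqlite3 module's implicit transactions, so BEGIN/ROLLBACK are explicit.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('create table log (msg text)')


async def task_a():
    conn.execute('begin')
    conn.execute("insert into log values ('a')")
    await asyncio.sleep(0)    # Yield: task_b runs while A's transaction is open.
    conn.execute('rollback')  # A aborts what it believes is only its own work.


async def task_b():
    # B thinks it is writing outside any transaction, but its row lands
    # inside A's open transaction.
    conn.execute("insert into log values ('b')")


async def main():
    await asyncio.gather(task_a(), task_b())


asyncio.run(main())
print(conn.execute('select msg from log').fetchall())  # []
```

Task B never issued a rollback, yet its row is gone, because it was written inside task A's transaction.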

The bottom line: the best ways to avoid blowing your foot off are to:

  • Use an AsyncConnection per task, or

  • Use the Pool and check out the single, dedicated writer connection whenever you need a transaction.

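Example pool usage:

import asyncio
from cysqlite.aio import Pool

async def task_a(pool):
    async with pool.writer() as db:
        async with db.atomic() as tx:
            await db.execute('insert into ...')
            await asyncio.sleep(0)  # Other tasks run, but cannot use the writer.

async def task_b(pool):
    async with pool.writer() as db:
        async with db.atomic() as tx:
            await db.execute('insert into ...')
            await tx.rollback()

async def main():
    # Pool serializes access to the writer, so transactions are safe.
    pool = Pool('app.db')
    await asyncio.gather(task_a(pool), task_b(pool))
    await pool.close()

asyncio.run(main())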
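The comment in this example says the Pool serializes access to the writer. One plausible way to model that is an asyncio.Lock around a single shared connection. The sketch below uses the stdlib sqlite3 module and hypothetical names (WriterGate, worker); it is not cysqlite.aio.Pool's actual implementation.

```python
import asyncio
import contextlib
import sqlite3


class WriterGate:
    """Hypothetical model: one shared writer connection guarded by a lock."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = asyncio.Lock()

    @contextlib.asynccontextmanager
    async def writer(self):
        async with self._lock:  # Only one task holds the writer at a time.
            yield self._conn


async def worker(gate, name, log):
    async with gate.writer() as conn:
        conn.execute('begin')
        log.append(f'{name}:begin')
        await asyncio.sleep(0)  # Other tasks may run, but cannot get the writer.
        conn.execute('insert into t values (?)', (name,))
        conn.execute('commit')
        log.append(f'{name}:commit')


async def main():
    conn = sqlite3.connect(':memory:', isolation_level=None)
    conn.execute('create table t (name text)')
    gate = WriterGate(conn)
    log = []
    await asyncio.gather(worker(gate, 'a', log), worker(gate, 'b', log))
    return log, conn.execute('select count(*) from t').fetchone()[0]


log, count = asyncio.run(main())
print(log)    # ['a:begin', 'a:commit', 'b:begin', 'b:commit']
print(count)  # 2
```

Even though task a yields in the middle of its transaction, task b cannot begin until a has committed and released the writer.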

Module

cysqlite.aio.connect(database, **kwargs)

Open an AsyncConnection to the database. Arbitrary keyword arguments are passed to the underlying synchronous cysqlite.connect().

Must be called from within a running asyncio event loop.

Parameters:
  • database (str, pathlib.Path) – database filename or ':memory:'.

  • kwargs – passed to cysqlite.connect(), e.g. timeout, pragmas.

Returns:

async connection wrapping a synchronous Connection.

Return type:

AsyncConnection

Example:

import asyncio
from cysqlite.aio import connect

async def main():
    db = connect(':memory:')

    await db.execute('create table kv (key text, value text)')
    await db.execute('insert into kv (key, value) values (?, ?)',
                     ('hello', 'world'))

    row = await db.execute_one('select * from kv')
    print(row)  # ('hello', 'world')

    await db.close()

asyncio.run(main())

The connection can also be used as an async context-manager:

import asyncio
from cysqlite.aio import connect

async def main():
    async with connect('app.db', pragmas={'journal_mode': 'wal'}) as db:
        await db.execute('create table if not exists kv ("key", "value")')
        await db.execute('insert into kv values (?, ?)', ('hello', 'asyncio'))

    # Connection is closed.

asyncio.run(main())

AsyncConnection

class cysqlite.aio.AsyncConnection(conn, loop)

Async wrapper around synchronous Connection, created by connect().

Parameters:
  • conn (Connection) – synchronous cysqlite connection.

  • loop – the running asyncio event loop.

Every AsyncConnection has a dedicated background threading.Thread that pulls queries and other blocking operations off a queue and runs them. All SQLite calls for a given connection are serialized through that thread.

conn

The underlying synchronous Connection.

async execute(sql, params=None)

Execute the given sql and return an AsyncCursor.

Parameters:
  • sql (str) – SQL query to execute.

  • params (tuple, list, dict, or None) – parameters for query (optional).

Returns:

cursor wrapping the result set.

Return type:

AsyncCursor

If the awaiting coroutine is cancelled while the query is pending, interrupt() is called on the underlying connection to abort the in-progress operation.
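The cancellation behaviour can be illustrated with the standard-library sqlite3 module, whose Connection.interrupt() aborts a running statement from another thread. This is a sketch under assumptions: execute_cancellable is a hypothetical helper, and it runs queries on the loop's default executor rather than on a dedicated per-connection thread as AsyncConnection does.

```python
import asyncio
import sqlite3

# Recursive CTE that counts to one billion: slow enough to cancel mid-query.
SLOW_SQL = '''
    with recursive c(x) as (select 1 union all select x + 1 from c where x < 1000000000)
    select count(*) from c
'''


async def execute_cancellable(conn, sql):
    """Hypothetical helper: run a query off the loop; interrupt it on cancel."""
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, lambda: conn.execute(sql).fetchall())
    try:
        return await future
    except asyncio.CancelledError:
        # Cancelling the future does not stop the worker thread, so abort the
        # statement itself; the worker's execute() then raises OperationalError.
        conn.interrupt()
        raise


async def main():
    # check_same_thread=False: the query runs on an executor thread.
    conn = sqlite3.connect(':memory:', check_same_thread=False)
    task = asyncio.create_task(execute_cancellable(conn, SLOW_SQL))
    await asyncio.sleep(0.2)  # Give the query time to start.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return 'cancelled'
    return 'finished'


outcome = asyncio.run(main())
print(outcome)  # cancelled
```

Without the interrupt() call, the coroutine would be cancelled but the thread would keep running the query to completion.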

Example:

curs = await db.execute('select * from users where active = ?', (1,))
for row in await curs.fetchall():
    print(row)

async executemany(sql, seq_of_params)

Execute the given sql repeatedly for each parameter group.

Queries executed by executemany() must not return any result rows, or this will result in an OperationalError.

Parameters:
  • sql (str) – SQL query to execute.

  • seq_of_params (sequence of tuple, list, other sequence, dict, or None) – iterable of parameter groups.

Returns:

cursor.

Return type:

AsyncCursor

Example:

await db.execute('create table kv ("id" integer primary key, "key", "value")')

curs = await db.executemany('insert into kv (key, value) values (?, ?)',
                            [('k1', 'v1'), ('k2', 'v2'), ('k3', 'v3')])
print(curs.lastrowid)  # 3.
print(curs.rowcount)  # 3.

curs = await db.executemany('insert into kv (key, value) values (:k, :v)',
                            [{'k': 'k4', 'v': 'v4'}, {'k': 'k5', 'v': 'v5'}])
print(curs.lastrowid)  # 5.
print(curs.rowcount)  # 2.

async executescript(sql)

Execute one or more SQL statements separated by semicolons.

Parameters:

sql (str) – one or more SQL statements.

Returns:

cursor.

Return type:

AsyncCursor

Example:

await db.executescript("""
    begin;
    create table users (
       id integer not null primary key,
       name text not null,
       email text not null);
    create index users_email ON users (email);

    create table tweets (
       id integer not null primary key,
       content text not null,
       user_id integer not null references users (id),
       timestamp integer not null);

    commit;
""")
async execute_one(sql, params=None)

Execute a query and return the first row, or None.

Parameters:
  • sql (str) – SQL query.

  • params – parameters (optional).

Returns:

first row or None.

Return type:

tuple, Row, or None

async execute_scalar(sql, params=None)

Execute a query and return the first column of the first row, or None. Useful for aggregates or queries that only return a single value.

Parameters:
  • sql (str) – SQL query.

  • params – parameters (optional).

Returns:

scalar value or None.

Example:

count = await db.execute_scalar('select count(*) from users')

async begin(lock=None)

Begin a transaction.

If a transaction is already active, raises OperationalError.

Parameters:

lock (str) – type of SQLite lock to acquire, DEFERRED (default), IMMEDIATE, or EXCLUSIVE.

async commit()

Commit the current transaction.

If no transaction is active, raises OperationalError.

async rollback()

Roll back the current transaction.

If no transaction is active, raises OperationalError.

async close()

Close the underlying database connection and shut down the background thread. Waits up to 5 seconds for the thread to exit.

Raises:

RuntimeError – if the background thread does not terminate.

After close(), the connection cannot be used for further operations. Calling close() on an already-closed connection returns False.

property in_transaction

Whether a transaction is currently active.

Return type:

bool

atomic(lock=None)

Create an async context-manager which runs queries in a transaction (or savepoint when nested).

Calls to atomic() can be nested.

Parameters:

lock (str) – lock type: DEFERRED, IMMEDIATE, or EXCLUSIVE.

Returns:

AsyncAtomic

If you share an AsyncConnection across tasks and there is any concurrency, transactions and savepoints will end up interleaved: one task's commit or rollback can apply to another task's queries. The best way to avoid bugs from interleaved transactions is to:

  • Use an AsyncConnection per task, or

  • Use the Pool and check out the single, dedicated writer connection whenever you need a transaction.

Example:

async with db.atomic() as txn:
    await db.execute('insert into users (name) values (?)', ('alice',))

    async with db.atomic() as nested:
        await db.execute('insert into users (name) values (?)', ('bob',))
        await nested.rollback()  # Only 'bob' is rolled back.

        await db.execute('insert into users (name) values (?)', ('carl',))

# 'alice' and 'carl' are committed.

Exceptions in nested blocks roll back the savepoint without affecting the outer transaction (this example assumes users.name has a UNIQUE constraint):

async with db.atomic():
    await db.execute('insert into users (name) values (?)', ('alice',))

    try:
        async with db.atomic():
            await db.execute('insert into users (name) values (?)', ('alice',))
            # IntegrityError: duplicate name. Savepoint is rolled back.
    except IntegrityError:
        pass

    # Outer transaction is unaffected. 'alice' is still pending.

# 'alice' is committed.
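To show what "a transaction at the outermost level and savepoints for nested calls" means in SQL terms, here is a synchronous sketch using the stdlib sqlite3 module. The atomic() function below is hypothetical; it infers nesting from Connection.in_transaction, which may not be how cysqlite's AsyncAtomic tracks depth. It declares users.name UNIQUE so that the duplicate insert fails.

```python
import contextlib
import itertools
import sqlite3

_savepoint_ids = itertools.count(1)


@contextlib.contextmanager
def atomic(conn):
    """Hypothetical: BEGIN at the outermost level, SAVEPOINT when nested."""
    if not conn.in_transaction:
        conn.execute('begin')
        try:
            yield
        except BaseException:
            if conn.in_transaction:
                conn.execute('rollback')
            raise
        conn.execute('commit')
    else:
        sid = f'sp{next(_savepoint_ids)}'
        conn.execute(f'savepoint {sid}')
        try:
            yield
        except BaseException:
            # Undo only this savepoint's work; the outer transaction continues.
            conn.execute(f'rollback to {sid}')
            conn.execute(f'release {sid}')
            raise
        conn.execute(f'release {sid}')


# isolation_level=None: the sqlite3 module issues no implicit BEGIN.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('create table users (name text unique)')

with atomic(conn):
    conn.execute("insert into users values ('alice')")
    try:
        with atomic(conn):
            conn.execute("insert into users values ('alice')")  # Duplicate.
    except sqlite3.IntegrityError:
        pass  # Savepoint rolled back; the outer transaction is still open.

print(conn.execute('select name from users').fetchall())  # [('alice',)]
```

ROLLBACK TO undoes the savepoint's changes but leaves the savepoint on the stack, which is why the sketch follows it with RELEASE.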
transaction(lock=None)

Create an async context-manager that runs queries in a transaction.

Parameters:

lock (str) – lock type: DEFERRED, IMMEDIATE, or EXCLUSIVE.

Returns:

AsyncTransaction

Example:

async with db.transaction() as txn:
    await db.execute('insert into users (name) values (?)', ('alice',))
    await db.execute('insert into users (name) values (?)', ('bob',))

# Both rows committed.

Note

Most applications should prefer AsyncConnection.atomic(), which automatically uses a transaction at the outermost level and savepoints for nested calls.

savepoint(sid=None)

Create an async context-manager that runs queries in a savepoint. Savepoints can only be used within an active transaction.

Calls to savepoint() can be nested.

Parameters:

sid (str) – savepoint identifier (optional, auto-generated if omitted).

Returns:

AsyncSavepoint

async changes()

Return the number of rows modified, inserted or deleted by the most recently completed INSERT, UPDATE or DELETE statement on the database connection.

See sqlite3_changes for details on what operations are counted.

Return type:

int

async last_insert_rowid()
Returns:

rowid of the most-recently inserted row.

Return type:

int

pragma(*args, **kwargs)

Execute a PRAGMA statement. Returns an awaitable.

Parameters:

Example:

journal_mode = await db.pragma('journal_mode')
await db.pragma('cache_size', -8000)

async backup(dest, **kwargs)

Perform an online backup to the given destination AsyncConnection.

Parameters:

Example:

source = connect('app.db')
dest = connect(':memory:')

await source.backup(dest)

count = await dest.execute_scalar('select count(*) from users')
print(f'Backed up {count} users to in-memory copy.')

await dest.close()
await source.close()
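The same idea can be sketched with the standard library: sqlite3.Connection.backup() performs the online backup, and asyncio.to_thread() keeps it off the event loop. backup_async is a hypothetical helper, not part of cysqlite.

```python
import asyncio
import sqlite3


async def backup_async(source, dest):
    # Connection.backup() copies the database page by page; running it in a
    # worker thread lets other tasks keep running meanwhile.
    await asyncio.to_thread(source.backup, dest)


async def main():
    # check_same_thread=False: backup() runs on a thread-pool thread.
    source = sqlite3.connect(':memory:', check_same_thread=False)
    source.execute('create table users (name text)')
    source.executemany('insert into users values (?)', [('alice',), ('bob',)])
    source.commit()

    dest = sqlite3.connect(':memory:', check_same_thread=False)
    await backup_async(source, dest)
    count = dest.execute('select count(*) from users').fetchone()[0]
    dest.close()
    source.close()
    return count


count = asyncio.run(main())
print(f'Backed up {count} users to in-memory copy.')  # Backed up 2 users to in-memory copy.
```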
async backup_to_file(filename, **kwargs)

Perform an online backup to the given destination file.

Parameters:

async checkpoint(**kwargs)

Perform a WAL checkpoint.

Parameters:

kwargs – forwarded to Connection.checkpoint().

Returns:

tuple of (wal_size, checkpointed_pages).

Return type:

tuple
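For comparison, SQLite's own PRAGMA wal_checkpoint reports comparable figures, counted in WAL frames, plus a busy flag; this can be seen with the stdlib sqlite3 module. This is only a sketch: which checkpoint mode cysqlite uses by default, and how its kwargs map onto SQLite's modes, is not stated here.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'app.db')
    # isolation_level=None: every statement commits on its own.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute('pragma journal_mode = wal')
    conn.execute('create table t (x)')
    for i in range(10):
        conn.execute('insert into t values (?)', (i,))  # Each commit appends WAL frames.

    # PASSIVE checkpoint; the row is (busy, frames in WAL, frames checkpointed).
    busy, wal_frames, checkpointed = conn.execute('pragma wal_checkpoint').fetchone()
    conn.close()

print(busy, wal_frames, checkpointed)
```

With a single connection and no active readers, a passive checkpoint can copy every frame, so the last two numbers match.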

async __aenter__()
async __aexit__(exc_type, exc_val, exc_tb)

Use the connection as an async context-manager. On exit, the connection is closed.

async with connect('app.db') as db:
    await db.execute('select 1')

# db is closed.

AsyncCursor

class cysqlite.aio.AsyncCursor(conn, cursor)

Async wrapper around a synchronous Cursor. Returned by AsyncConnection.execute().

async fetchone()

Fetch the next row from the result set.

If no results are available, or the cursor has been consumed, returns None.

Returns:

next row, or None if exhausted.

async fetchall()

Fetch all remaining rows from the result set.

Returns:

list of rows.

Return type:

list

async fetchmany(size=100, constructor=list)

Fetch up to size rows from the result set. Returns fewer than size rows if the result set is exhausted.

Parameters:
  • size (int) – maximum number of rows to return.

  • constructor – callable used to build the result container, defaults to list.

Returns:

container of rows.

async scalar()

Fetch the first column of the first row, or None if the result set is empty. Closes the cursor after reading.

Returns:

scalar value or None.

property description

Column description tuples for the current result set, or None if the last operation did not produce rows. Each tuple contains (name,).

This property reads from the synchronous cursor without dispatching.

property lastrowid

The rowid of the most-recently inserted row for this cursor, or None.

This property reads from the synchronous cursor without dispatching.

property rowcount

Return the count of rows modified by the last operation. Returns -1 for queries that do not modify data.

This property reads from the synchronous cursor without dispatching.

async __aiter__()
async __anext__()

Async iteration over the result set. Rows are fetched in batches of 100 internally to amortize the cost of dispatching to the background thread.

Example:

curs = await db.execute('select * from events order by ts')
async for row in curs:
    process(row)
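Batched iteration can be sketched as one trip to a worker thread per fetchmany() call, with rows handed out one at a time from a local buffer. BatchedAsyncCursor is a hypothetical class built on the stdlib sqlite3 cursor and asyncio.to_thread(), not cysqlite's AsyncCursor; the batch size of 100 mirrors the description above.

```python
import asyncio
import collections
import sqlite3


class BatchedAsyncCursor:
    """Hypothetical sketch: async iteration that fetches rows in batches."""

    def __init__(self, cursor, batch_size=100):
        self._cursor = cursor
        self._batch_size = batch_size
        self._buffer = collections.deque()
        self._exhausted = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._buffer and not self._exhausted:
            # One trip to a worker thread per batch instead of per row.
            batch = await asyncio.to_thread(self._cursor.fetchmany, self._batch_size)
            self._exhausted = len(batch) < self._batch_size
            self._buffer.extend(batch)
        if not self._buffer:
            raise StopAsyncIteration
        return self._buffer.popleft()


async def main():
    # check_same_thread=False: fetchmany() runs on a thread-pool thread.
    conn = sqlite3.connect(':memory:', check_same_thread=False)
    conn.execute('create table events (ts integer)')
    conn.executemany('insert into events values (?)', [(i,) for i in range(250)])
    cursor = conn.execute('select ts from events order by ts')
    rows = [row async for row in BatchedAsyncCursor(cursor)]
    conn.close()
    return rows


rows = asyncio.run(main())
print(len(rows))  # 250
```

Here 250 rows cost three dispatches (100, 100 and 50 rows) rather than 250.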

Transaction Wrappers

class cysqlite.aio.AsyncAtomic(conn, lock=None)

Async context-manager for AsyncConnection.atomic(). Uses a transaction at the outermost level and savepoints for nested calls.

async __aenter__()

Begin the transaction or savepoint.

async __aexit__(exc_type, exc_val, exc_tb)

Commit the transaction or savepoint if exiting cleanly. If an unhandled exception occurred, roll back.

async commit(*args)

Explicitly commit the underlying transaction.

async rollback(*args)

Explicitly roll back the underlying transaction.

class cysqlite.aio.AsyncTransaction(conn, lock=None)

Async context-manager for AsyncConnection.transaction(). Same API as AsyncAtomic.

class cysqlite.aio.AsyncSavepoint(conn, sid=None)

Async context-manager for AsyncConnection.savepoint(). Same API as AsyncAtomic.

Pool

class cysqlite.aio.Pool(database, readers=4, writer=True, **connect_kwargs)

Async implementation of cysqlite.utils.Pool.

Parameters:
  • database (str, pathlib.Path) – database filename.

  • readers (int) – number of read-only connections to create.

  • writer (bool) – create a dedicated writer connection.

  • connect_kwargs – arguments for connect()

Connection pool implementation that provides read-only connections and, optionally, a dedicated writer connection. Ensures that multiple writers are serialized and that readers cannot lock the database. Requires WAL mode.

The following default pragmas are applied to all connections opened by the pool:

  • journal_mode = wal

  • cache_size = -64000 (64000 KiB, roughly 64MB page cache)

  • mmap_size = 256 * 1024 * 1024 (256MiB)

  • foreign_keys = 1 (enable foreign-key constraint enforcement)
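Here is a sketch of how such pool connections might be opened with the stdlib sqlite3 module. Readers use SQLite's mode=ro URI parameter so they cannot write, and the default pragmas listed above are applied to each connection. open_connection is a hypothetical helper; how cysqlite actually opens its readers is an assumption here.

```python
import os
import pathlib
import sqlite3
import tempfile

# The default pragmas listed above.
DEFAULT_PRAGMAS = {
    'journal_mode': 'wal',
    'cache_size': -64000,
    'mmap_size': 256 * 1024 * 1024,
    'foreign_keys': 1,
}


def open_connection(path, read_only=False):
    """Hypothetical helper: open a pool connection and apply default pragmas."""
    if read_only:
        # mode=ro: SQLite rejects any write made through this connection.
        uri = pathlib.Path(path).resolve().as_uri() + '?mode=ro'
        conn = sqlite3.connect(uri, uri=True)
    else:
        conn = sqlite3.connect(path)
    for name, value in DEFAULT_PRAGMAS.items():
        # WAL mode is stored in the database file, so the writer sets it once.
        if name == 'journal_mode' and read_only:
            continue
        conn.execute(f'pragma {name} = {value}')
    return conn


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'app.db')
    writer = open_connection(path)  # Opened first so WAL mode is enabled.
    writer.execute('create table t (x)')
    writer.commit()
    mode = writer.execute('pragma journal_mode').fetchone()[0]

    reader = open_connection(path, read_only=True)
    try:
        reader.execute('insert into t values (1)')
        reader_can_write = True
    except sqlite3.OperationalError:  # attempt to write a readonly database
        reader_can_write = False
    reader.close()
    writer.close()

print(mode, reader_can_write)  # wal False
```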

reader()
Returns:

read-only connection from pool.

Return type:

AsyncConnection

Raises:

InterfaceError if pool has been closed.

Context-manager which checks out a read-only async connection from the pool.

Example:

pool = Pool('app.db')

async with pool.reader() as conn:
    curs = await conn.execute('select ...')

writer()
Returns:

read/write connection from pool.

Return type:

AsyncConnection

Raises:

InterfaceError if pool has been closed or if writer connection was disabled.

Context-manager which checks out the async writer connection from the pool. At the end of the wrapped block, if a transaction is still active (uncommitted), it is rolled back.

Example:

pool = Pool('app.db')

async with pool.writer() as conn:
    async with conn.atomic() as txn:
        await conn.execute('insert ...')
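The rollback-on-exit behaviour of writer() can be modelled with an async context manager that checks Connection.in_transaction when the block ends. checkout_writer is a hypothetical helper using the stdlib sqlite3 module; the real Pool may implement this differently.

```python
import asyncio
import contextlib
import sqlite3


@contextlib.asynccontextmanager
async def checkout_writer(conn, lock):
    """Hypothetical sketch of writer checkout with rollback-on-exit."""
    async with lock:
        try:
            yield conn
        finally:
            # Anything the block left uncommitted is discarded, so the next
            # task to check out the writer starts from a clean state.
            if conn.in_transaction:
                conn.rollback()


async def main():
    conn = sqlite3.connect(':memory:', isolation_level=None)
    conn.execute('create table t (x)')
    lock = asyncio.Lock()

    async with checkout_writer(conn, lock) as db:
        db.execute('begin')
        db.execute('insert into t values (1)')
        # No commit: the insert is rolled back when the block exits.

    async with checkout_writer(conn, lock) as db:
        return db.execute('select count(*) from t').fetchone()[0]


count = asyncio.run(main())
print(count)  # 0
```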
async close()

Close all connections.