Async I/O¶
The cysqlite.aio module provides an experimental asyncio interface to
cysqlite. Queries and other blocking methods get sent to a worker thread.
SQLite operates on local disk storage, so queries typically execute extremely
quickly (microseconds / few milliseconds). The cost of dispatching to a
background thread and wrapping in coroutines increases the latency per query.
For every query executed, a closure must be created, a future allocated, a
queue written-to, a loop call_soon_threadsafe() issued, and two context
switches made. This is the case with cysqlite and other drivers like aiosqlite.
If your SQLite workload is heavy enough that avoiding blocking the event-loop is an issue, SQLite may not be a good fit. SQLite only allows one writer at a time, so while using an async wrapper may keep things responsive while waiting to obtain the write lock, writes will not occur “faster”, the bottleneck has merely been moved. Conversely, if you don’t have that much load, the async wrapper adds complexity and overhead for no measurable benefit.
It’s like a super fancy restaurant that has only one table. When using SQLite,
any thread or task that needs to write has to wait in the lobby for the hostess
to seat them at the one available table. The aio implementation here
changes things so that now there are plenty of tables, but only one set of
plates. Everybody can sit down, but they are still waiting, since the plates
can only be at one table at any given time.
Additionally, if multiple coroutines share a single async connection, transaction state can get interleaved between tasks, leading to data corruption. Transactions and savepoints will end up getting interleaved if your connection is used by multiple tasks and you have any concurrency. aiosqlite suffers from this problem as well (since 2018!).
The bottom-line is that the best ways to avoid blowing off your foot:
Use an
AsyncConnectionper task, orUse the
Pooland check-out the single, dedicated writer connection whenever you need a transaction.
Example pool usage:
# Pool serializes access to the writer, so transactions are safe.
pool = Pool('app.db')
async def task_a(pool):
async with pool.writer() as db:
async with db.atomic() as tx:
await db.execute('insert into ...')
await asyncio.sleep(0)
async def task_a(pool):
async with pool.writer() as db:
async with db.atomic() as tx:
await db.execute('insert into ...')
await tx.rollback()
Module¶
- cysqlite.aio.connect(database, **kwargs)¶
Open an
AsyncConnectionto the database. Arbitrary keyword arguments are passed to the underlying synchronouscysqlite.connect().Must be called from within a running asyncio event loop.
- Parameters:
database (str,
pathlib.Path) – database filename or':memory:'.kwargs – passed to
cysqlite.connect(), e.g.timeout,pragmas.
- Returns:
async connection wrapping a synchronous
Connection.- Return type:
Example:
import asyncio from cysqlite.aio import connect async def main(): db = connect(':memory:') await db.execute('create table kv (key text, value text)') await db.execute('insert into kv (key, value) values (?, ?)', ('hello', 'world')) row = await db.execute_one('select * from kv') print(row) # ('hello', 'world') await db.close() asyncio.run(main())
The connection can also be used as an async context-manager:
async with connect('app.db', pragmas={'journal_mode': 'wal'}) as db: await db.execute('create table if not exists kv ("key", "value")') await db.execute('insert into kv values (?, ?)', ('hello', 'asyncio')) # Connection is closed.
AsyncConnection¶
- class cysqlite.aio.AsyncConnection(conn, loop)¶
Async wrapper around synchronous
Connection, created byconnect().- Parameters:
conn (Connection) – synchronous cysqlite connection.
loop – the running asyncio event loop.
Every
AsyncConnectionhas a dedicated backgroundthreading.Threadfrom which it pulls queries or other blocking operations. All SQLite calls for a given connection are serialized through that thread.- conn¶
The underlying synchronous
Connection.
- async execute(sql, params=None)¶
Execute the given sql and return an
AsyncCursor.- Parameters:
sql (str) – SQL query to execute.
params (tuple, list, dict, or
None) – parameters for query (optional).
- Returns:
cursor wrapping the result set.
- Return type:
If the awaiting coroutine is cancelled while the query is pending,
interrupt()is called on the underlying connection to abort the in-progress operation.Example:
curs = await db.execute('select * from users where active = ?', (1,)) for row in await curs.fetchall(): print(row)
- async executemany(sql, seq_of_params)¶
Execute the given sql repeatedly for each parameter group.
Queries executed by
executemany()must not return any result rows, or this will result in anOperationalError.- Parameters:
sql (str) – SQL query to execute.
seq_of_params (sequence of tuple, list, sequence, dict, or
None.) – iterable of parameters.
- Returns:
cursor.
- Return type:
Example:
await db.execute('create table kv ("id" integer primary key, "key", "value")') curs = await db.executemany('insert into kv (key, value) values (?, ?)', [('k1', 'v1'), ('k2', 'v2'), ('k3', 'v3')]) print(curs.lastrowid) # 3. print(curs.rowcount) # 3. curs = await db.executemany('insert into kv (key, value) values (:k, :v)', [{'k': 'k4', 'v': 'v4'}, {'k': 'k5', 'v': 'v5'}]) print(curs.lastrowid) # 5. print(curs.rowcount) # 2.
- async executescript(sql)¶
Execute one or more SQL statements separated by semicolons.
- Parameters:
sql (str) – one or more SQL statements.
- Returns:
cursor.
- Return type:
Example:
await db.executescript(""" begin; create table users ( id integer not null primary key, name text not null, email text not null); create index users_email ON users (email); create table tweets ( id integer not null primary key, content text not null, user_id integer not null references users (id), timestamp integer not null); commit; """)
- async execute_one(sql, params=None)¶
Execute a query and return the first row, or
None.- Parameters:
sql (str) – SQL query.
params – parameters (optional).
- Returns:
first row or
None.- Return type:
tuple,
Row, orNone
- async execute_scalar(sql, params=None)¶
Execute a query and return the first column of the first row, or
None. Useful for aggregates or queries that only return a single value.- Parameters:
sql (str) – SQL query.
params – parameters (optional).
- Returns:
scalar value or
None.
Example:
count = await db.execute_scalar('select count(*) from users')
- async begin(lock=None)¶
Begin a transaction.
If a transaction is already active, raises
OperationalError.- Parameters:
lock (str) – type of SQLite lock to acquire,
DEFERRED(default),IMMEDIATE, orEXCLUSIVE.
- async commit()¶
Commit the current transaction.
If no transaction is active, raises
OperationalError.
- async rollback()¶
Roll back the current transaction.
If no transaction is active, raises
OperationalError.
- async close()¶
Close the underlying database connection and shut down the background thread. Waits up to 5 seconds for the thread to exit.
- Raises:
RuntimeError – if the background thread does not terminate.
After
close(), the connection cannot be used for further operations. Callingclose()on an already-closed connection returnsFalse.
- property in_transaction¶
Whether a transaction is currently active.
- Return type:
bool
- atomic(lock=None)¶
Create an async context-manager which runs queries in a transaction (or savepoint when nested).
Calls to
atomic()can be nested.- Parameters:
lock (str) – lock type:
DEFERRED,IMMEDIATE, orEXCLUSIVE.- Returns:
If you share your
AsyncConnectionacross tasks, transactions and savepoints will end up getting interleaved if you have any concurrency, and this will certainly cause problems. The best way to avoid bugs from interleaved transactions is to:Use an
AsyncConnectionper task, orUse the
Pooland check-out the single, dedicated writer connection whenever you need a transaction.
Example:
async with db.atomic() as txn: await db.execute('insert into users (name) values (?)', ('alice',)) async with db.atomic() as nested: await db.execute('insert into users (name) values (?)', ('bob',)) await nested.rollback() # Only 'bob' is rolled back. await db.execute('insert into users (name) values (?)', ('carl',)) # 'alice' and 'carl' are committed.
Exceptions in nested blocks roll back the savepoint without affecting the outer transaction:
async with db.atomic(): await db.execute('insert into users (name) values (?)', ('alice',)) try: async with db.atomic(): await db.execute('insert into users (name) values (?)', ('alice',)) # IntegrityError — duplicate. Savepoint is rolled back. except IntegrityError: pass # Outer transaction is unaffected. 'alice' is still pending. # 'alice' is committed.
- transaction(lock=None)¶
Create an async context-manager that runs queries in a transaction.
- Parameters:
lock (str) – lock type:
DEFERRED,IMMEDIATE, orEXCLUSIVE.- Returns:
Example:
async with db.transaction() as txn: await db.execute('insert into users (name) values (?)', ('alice',)) await db.execute('insert into users (name) values (?)', ('bob',)) # Both rows committed.
Note
Most applications should prefer
AsyncConnection.atomic(), which automatically uses a transaction at the outermost level and savepoints for nested calls.
- savepoint(sid=None)¶
Create an async context-manager that runs queries in a savepoint. Savepoints can only be used within an active transaction.
Calls to
savepoint()can be nested.- Parameters:
sid (str) – savepoint identifier (optional, auto-generated if omitted).
- Returns:
- async changes()¶
Return the number of rows modified, inserted or deleted by the most recently completed INSERT, UPDATE or DELETE statement on the database connection.
See sqlite3_changes for details on what operations are counted.
- Return type:
int
- async last_insert_rowid()¶
- Returns:
rowid of the most-recently inserted row.
- Return type:
int
- pragma(*args, **kwargs)¶
Execute a PRAGMA statement. Returns an awaitable.
- Parameters:
args – forwarded to
Connection.pragma().kwargs – forwarded to
Connection.pragma().
Example:
journal_mode = await db.pragma('journal_mode') await db.pragma('cache_size', -8000)
- async backup(dest, **kwargs)¶
Perform an online backup to the given destination
AsyncConnection.- Parameters:
dest (AsyncConnection) – database to serve as destination for the backup.
kwargs – forwarded to
Connection.backup().
Example:
source = connect('app.db') dest = connect(':memory:') await source.backup(dest) count = await dest.execute_scalar('select count(*) from users') print(f'Backed up {count} users to in-memory copy.') await dest.close() await source.close()
- async backup_to_file(filename, **kwargs)¶
Perform an online backup to the given destination file.
- Parameters:
filename (str) – database file to serve as destination for the backup.
kwargs – forwarded to
Connection.backup_to_file().
- async checkpoint(**kwargs)¶
Perform a WAL checkpoint.
- Parameters:
kwargs – forwarded to
Connection.checkpoint().- Returns:
tuple of
(wal_size, checkpointed_pages).- Return type:
tuple
AsyncCursor¶
- class cysqlite.aio.AsyncCursor(conn, cursor)¶
Async wrapper around a synchronous
Cursor. Returned byAsyncConnection.execute().- async fetchone()¶
Fetch the next row from the result set.
If no results are available or cursor has been consumed returns
None.- Returns:
next row, or
Noneif exhausted.
- async fetchall()¶
Fetch all remaining rows from the result set.
- Returns:
list of rows.
- Return type:
list
- async fetchmany(size=100, constructor=list)¶
Fetch up to size rows from the result set. Returns fewer than size rows if the result set is exhausted.
- Parameters:
size (int) – maximum number of rows to return.
constructor – callable used to build the result container, defaults to
list.
- Returns:
container of rows.
- async scalar()¶
Fetch the first column of the first row, or
Noneif the result set is empty. Closes the cursor after reading.- Returns:
scalar value or
None.
- property description¶
Column description tuples for the current result set, or
Noneif the last operation did not produce rows. Each tuple contains(name,).This property reads from the synchronous cursor without dispatching.
- property lastrowid¶
The rowid of the most-recently inserted row for this cursor, or
None.This property reads from the synchronous cursor without dispatching.
- property rowcount¶
Return the count of rows modified by the last operation. Returns
-1for queries that do not modify data.This property reads from the synchronous cursor without dispatching.
Transaction Wrappers¶
- class cysqlite.aio.AsyncAtomic(conn, lock=None)¶
Async context-manager for
AsyncConnection.atomic(). Uses a transaction at the outermost level and savepoints for nested calls.- async __aenter__()¶
Begin the transaction or savepoint.
- async __aexit__(exc_type, exc_val, exc_tb)¶
Commit the transaction or savepoint if exiting cleanly. If an unhandled exception occurred, roll back.
- async commit(*args)¶
Explicitly commit the underlying transaction.
- async rollback(*args)¶
Explicitly roll back the underlying transaction.
- class cysqlite.aio.AsyncTransaction(conn, lock=None)¶
Async context-manager for
AsyncConnection.transaction(). Same API asAsyncAtomic.
- class cysqlite.aio.AsyncSavepoint(conn, sid=None)¶
Async context-manager for
AsyncConnection.savepoint(). Same API asAsyncAtomic.
Pool¶
- class cysqlite.aio.Pool(database, readers=4, writer=True, **connect_kwargs)¶
Async implementation of
cysqlite.utils.Pool.- Parameters:
database (str,
pathlib.Path) – database filename.readers (int) – number of read-only connections to create.
writer (bool) – create a dedicated writer connection.
connect_kwargs – arguments for
connect()
Connection pool implementation that provides read-only connections and, optionally, a dedicated writer connection. Ensures that multiple writers are serialized, and that readers cannot lock the database. Requires WAL-mode.
The following default pragmas are applied to all connections opened by the pool:
journal_mode = walcache_size = -64000(64MiB page cache)mmap_size = 256 * 1024 * 1024(256MiB)foreign_keys = 1(enable foreign-key constraint enforcement)
- reader()¶
- Returns:
read-only connection from pool.
- Return type:
- Raises:
InterfaceErrorif pool has been closed.
Context-manager which checks out a read-only async connection from the pool.
Example:
pool = Pool('app.db') async with pool.reader() as conn: curs = await conn.execute('select ...')
- writer()¶
- Returns:
read/write connection from pool.
- Return type:
- Raises:
InterfaceErrorif pool has been closed or if writer connection was disabled.
Context-manager which checks out the async writer connection from the pool. At the end of the wrapped block, if a transaction is active and un-committed the transaction is rolled-back.
Example:
pool = Pool('app.db') async with pool.writer() as conn: async with conn.atomic() as txn: await conn.execute('insert ...')
- async close()¶
Close all connections.