Databases

Build Status Coverage Package version

Databases gives you simple asyncio support for a range of databases.

It allows you to make queries using the powerful SQLAlchemy Core expression language, and provides support for PostgreSQL, MySQL, and SQLite.

Databases is suitable for integrating against any async Web framework, such as Starlette, Sanic, Responder, Quart, aiohttp, Tornado, FastAPI, or Bocadillo.

Requirements: Python 3.6+


Installation

$ pip install databases

You can install the required database drivers with:

$ pip install databases[postgresql]
$ pip install databases[mysql]
$ pip install databases[sqlite]

Driver support is providing using one of asyncpg, aiomysql, or aiosqlite.

Getting started

Note: Use ipython to try these example from the console, since it supports await.

Declare your tables using SQLAlchemy:

import sqlalchemy


metadata = sqlalchemy.MetaData()

notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String(length=100)),
    sqlalchemy.Column("completed", sqlalchemy.Boolean),
)

You can use any of the sqlalchemy column types such as sqlalchemy.JSON, or custom column types.

Queries

You can now use any SQLAlchemy core queries (official tutorial).

from databases import Database

database = Database('postgresql://localhost/example')


# Establish the connection pool
await database.connect()

# Execute
query = notes.insert()
values = {"text": "example1", "completed": True}
await database.execute(query=query, values=values)

# Execute many
query = notes.insert()
values = [
    {"text": "example2", "completed": False},
    {"text": "example3", "completed": True},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = notes.select()
rows = await database.fetch_all(query=query)

# Fetch single row
query = notes.select()
row = await database.fetch_one(query=query)

# Fetch single value, defaults to `column=0`.
query = notes.select()
value = await database.fetch_val(query=query)

# Fetch multiple rows without loading them all into memory at once
query = notes.select()
async for row in database.iterate(query=query):
    ...

# Close all connection in the connection pool
await database.disconnect()

Connections are managed as task-local state, with driver implementations transparently using connection pooling behind the scenes.

Raw queries

In addition to SQLAlchemy core queries, you can also perform raw SQL queries:

# Execute
query = "INSERT INTO notes(text, completed) VALUES (:text, :completed)"
values = {"text": "example1", "completed": True}
await database.execute(query=query, values=values)

# Execute many
query = "INSERT INTO notes(text, completed) VALUES (:text, :completed)"
values = [
    {"text": "example2", "completed": False},
    {"text": "example3", "completed": True},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = "SELECT * FROM notes WHERE completed = :completed"
rows = await database.fetch_all(query=query, values={"completed": True})

# Fetch single row
query = "SELECT * FROM notes WHERE id = :id"
result = await database.fetch_one(query=query, values={"id": 1})

Note that query arguments should follow the :query_arg style.

Transactions

Transactions are managed by async context blocks:

async with database.transaction():
    ...

For a lower-level transaction API:

transaction = await database.transaction()
try:
    ...
except:
    transaction.rollback()
else:
    transaction.commit()

You can also use .transaction() as a function decorator on any async function:

@database.transaction()
async def create_users(request):
    ...

Transaction blocks are managed as task-local state. Nested transactions are fully supported, and are implemented using database savepoints.

Connecting and disconnecting

You can control the database connect/disconnect, by using it as a async context manager.

async with Database(DATABASE_URL) as database:
    ...

Or by using explicit connection and disconnection:

database = Database(DATABASE_URL)
await database.connect()
...
await database.disconnect()

If you're integrating against a web framework, then you'll probably want to hook into framework startup or shutdown events. For example, with Starlette you would use the following:

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

Connection options

The PostgreSQL and MySQL backends provide a few connection options for SSL and for configuring the connection pool.

# Use an SSL connection.
database = Database('postgresql://localhost/example?ssl=true')

# Use a connection pool of between 5-20 connections.
database = Database('mysql://localhost/example?min_size=5&max_size=20')

You can also use keyword arguments to pass in any connection options. Available keyword arguments may differ between database backends.

database = Database('postgresql://localhost/example', ssl=True, min_size=5, max_size=20)

Test isolation

For strict test isolation you will always want to rollback the test database to a clean state between each test case:

database = Database(DATABASE_URL, force_rollback=True)

This will ensure that all database connections are run within a transaction that rollbacks once the database is disconnected.

If you're integrating against a web framework you'll typically want to use something like the following pattern:

if TESTING:
    database = Database(TEST_DATABASE_URL, force_rollback=True)
else:
    database = Database(DATABASE_URL)

This will give you test cases that run against a different database to the development database, with strict test isolation so long as you make sure to connect and disconnect to the database between test cases.

For a lower level API you can explicitly create force-rollback transactions:

async with database.transaction(force_rollback=True):
    ...

Migrations

Because databases uses SQLAlchemy core, you can integrate with Alembic for database migration support.

$ pip install alembic
$ alembic init migrations

You'll want to set things up so that Alembic references the configured DATABASE_URL, and uses your table metadata.

In alembic.ini remove the following line:

sqlalchemy.url = driver://user:pass@localhost/dbname

In migrations/env.py, you need to set the 'sqlalchemy.url' configuration key, and the target_metadata variable. You'll want something like this:

# The Alembic Config object.
config = context.config

# Configure Alembic to use our DATABASE_URL and our table definitions.
# These are just examples - the exact setup will depend on whatever
# framework you're integrating against.
from myapp.settings import DATABASE_URL
from myapp.tables import metadata

config.set_main_option('sqlalchemy.url', str(DATABASE_URL))
target_metadata = metadata

...

Note that migrations will use a standard synchronous database driver, rather than using the async drivers that databases provides support for.

This will also be the case if you're using SQLAlchemy's standard tooling, such as using metadata.create_all(engine) to setup the database tables.

Note for MySQL:

For MySQL you'll probably need to explicitly specify the pymysql dialect when using Alembic since the default MySQL dialect does not support Python 3.

If you're using the databases.DatabaseURL datatype, you can obtain this using DATABASE_URL.replace(dialect="pymysql")

— ⭐️ —

::...
免责声明:
当前网页内容, 由 大妈 ZoomQuiet 使用工具: ScrapBook :: Firefox Extension 人工从互联网中收集并分享;
内容版权归原作者所有;
本人对内容的有效性/合法性不承担任何强制性责任.
若有不妥, 欢迎评注提醒:

或是邮件反馈可也:
askdama[AT]googlegroups.com



自怼圈/年番新

DU21.7
关于 ~ DebugUself with DAMA ;-)


关注公众号, 持续获得相关各种嗯哼:
zoomquiet


粤ICP备18025058号-1
公安备案号: 44049002000656 ...::