
I'm using SQLAlchemy 1.4.18 (async) and I believe I'm running into a race condition that I can't explain. The underlying database is Postgres, and SQLAlchemy uses asyncpg internally as the driver.

I have the following insert function written with SQLAlchemy Core.

from datetime import datetime, timedelta
from typing import Optional

async def create_device(
    device_id: str,
    device_type: DeviceType,
    account_type: AccountType = AccountType.FREE,
    expires_at: Optional[datetime] = None,
    account_id: Optional[int] = None,
    is_banned: bool = False,
    last_login_at: Optional[datetime] = None,
) -> datetime:
    if expires_at is None:
        expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
            days=7
        )
    async with engine.begin() as conn:
        await conn.execute(
            DeviceTable.insert().values(
                id=device_id,
                type=device_type,
                expires_at=expires_at,
                account_type=account_type,
                account_id=account_id,
                is_banned=is_banned,
                last_login_at=last_login_at,
            ),
        )
        return expires_at

The unit test passes when run on its own. However, when I run all tests in the test class, this test fails every time.

    @pytest.mark.asyncio
    @patch("service.email_service.EmailService.confirm_token")
    async def test_confirm_email_already_confirmed(self, mock_token, client):
        expiry_date = self.get_time_in_future()
        account_id = await crud_account.create_account(
            "[email protected]", "pass1", is_confirmed=True
        )
        await crud_device.create_device(
            "u1", DeviceType.IPHONE, account_id=account_id, expires_at=expiry_date
        )
        # It has already failed at this point.
        mock_token.return_value = "[email protected]"
        result = await client.get("/email/confirm/t1")
        assert result.status_code == 200

Error:

../app/database/crud_device.py:26: in create_device
    await conn.execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/engine.py:405: in execute
    result = await greenlet_spawn(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:125: in greenlet_spawn
    result = context.throw(*sys.exc_info())
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1582: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:324: in _execute_on_connection
    return connection._execute_clauseelement(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1451: in _execute_clauseelement
    ret = self._execute_context(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1813: in _execute_context
    self._handle_dbapi_exception(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1994: in _handle_dbapi_exception
    util.raise_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
    raise exception
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1770: in _execute_context
    self.dialect.do_execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py:717: in do_execute
    cursor.execute(statement, parameters)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:449: in execute
    self._adapt_connection.await_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:67: in await_only
    return current.driver.switch(awaitable)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:120: in greenlet_spawn
    value = await result
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:424: in _prepare_and_execute
    self._handle_exception(error)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:358: in _handle_exception
    self._adapt_connection._handle_exception(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_connection object at 0x115007340>
error = InternalServerError('cache lookup failed for type 3912040')

    def _handle_exception(self, error):
        if self._connection.is_closed():
            self._transaction = None
            self._started = False
    
        if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
            exception_mapping = self.dbapi._asyncpg_error_translate
    
            for super_ in type(error).__mro__:
                if super_ in exception_mapping:
                    translated_error = exception_mapping[super_](
                        "%s: %s" % (type(error), error)
                    )
                    translated_error.pgcode = (
                        translated_error.sqlstate
                    ) = getattr(error, "sqlstate", None)
>                   raise translated_error from error
E                   sqlalchemy.exc.InternalError: (sqlalchemy.dialects.postgresql.asyncpg.InternalServerError) <class 'asyncpg.exceptions.InternalServerError'>: cache lookup failed for type 3912040
E                   [SQL: INSERT INTO main.device (id, type, created_at, last_login_at, expires_at, account_type, is_banned, account_id) VALUES (%s, %s, now(), NULL, %s, %s, %s, %s)]
E                   [parameters: ('u1', 'IPHONE', datetime.datetime(2021, 6, 16, 10, 24), 'FREE', False, 1)]
E                   (Background on this error at: http://sqlalche.me/e/14/2j85)

../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:652: InternalError

What is this translated_error? Many thanks

  • I am experiencing the same issue. I have noticed that my unit tests will pass individually, but only every other test passes when run as a group. Commented Jun 16, 2021 at 13:12
  • I was thinking perhaps the cache lookup failed for type 3912040 had something to do with the SQLAlchemy SQL compilation caching, see reference. I set the query cache size in create_async_engine in accordance with the docs, but that did not change the error behavior. Commented Jun 16, 2021 at 13:29
  • Correct, the failure is random and unrelated to the unit test itself. For many hours I tried to use explicit primary keys in my tests to ensure there is no collision between the inserts, but that is not the issue. I have opened a bug report and will keep you updated if I find anything. Please let me know if you come across a solution. Thanks Commented Jun 16, 2021 at 13:36

2 Answers


I had help with this from the SQLAlchemy maintainers.

As Nadir suggested in the other answer, one way to resolve this is to disable the caching; however, that comes with a performance hit.

The cleaner way to solve it is to dispose of the async engine in your unit test teardown. This assumes you are using a separate sync engine in your test_base.py.

database.py

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

test_base.py

import asyncio

from sqlalchemy import create_engine

from database.database import metadata, engine

engine_sync = create_engine(
    f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

class TestBase:
    async def dispose(self):
        await engine.dispose()

    def teardown(self):
        metadata.drop_all(engine_sync)
        asyncio.run(self.dispose())

6 Comments

Does this have the effect of creating a new engine for each unit test? If so, is that desired and consistent with how the application will operate? In my case, I'm using the FastAPI framework and the engine gets instantiated once when the app is instantiated. Then, whether for testing or in service, the application maintains the same engine for all requests. I understand the issue to be that SQLAlchemy uses a connection pool with asyncpg, but it sounds like that conflicts with the asyncpg cache approach. Thus, subsequent requests to the same engine potentially fail.
Ah, nvm. I looked at the docs for AsyncEngine.dispose() and it is exactly for disposing of the connection pool. Thank you for sharing!
You're welcome. In my example the production app uses a single async engine. The unit tests, on the other hand, use the second sync engine just for create_all and drop_all (to keep it simpler). You brought up an interesting point. I assume you worked with the Databases wrapper before you switched to SQLAlchemy 1.4, correct? With that you had to initiate a connection in onStartup(). This doesn't seem possible with SQLAlchemy 1.4, but every connection you open comes directly from the pool. It seems faster, but I need to test more. How did you set up your environment? Thanks
I'm using FastAPI's dependency injection, see docs here to call a generator that yields my db AsyncSession then closes it for each endpoint. I copied the pattern from the FastAPI full-stack example backend, see this repo, and then modified it for the async SQLAlchemy.
For completeness, the SQLAlchemy docs do have a warning about the asyncpg cache and going through different engines. The warning here describes the root cause of the issue.

I was able to solve the problem. It appears to be an issue with the asyncpg driver's prepared statement cache. This seems to be a common issue, as it is mentioned in the asyncpg FAQ here

I first attempted to turn off this caching behavior by setting query_cache_size=0 in the SQLAlchemy create_async_engine call, as mentioned in the SQLAlchemy documentation:

async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}",
    echo=True,
    query_cache_size=0
)

However, the SQL logged by echo showed that a cache was still being used. I had misunderstood: asyncpg and SQLAlchemy each implement a statement cache of their own, and the SQLAlchemy parameter doesn't affect asyncpg's behavior.

I then found some discussion about the asyncpg prepared statement cache on SQLAlchemy GitHub Issue 6467.

Based upon a comment in that thread, I was able to solve the problem by passing prepared_statement_cache_size=0 as a query param directly in the PostgreSQL URI. The new, working create_async_engine call looks like:

async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}?prepared_statement_cache_size=0",
    echo=True
)

NOTE: The query_cache_size SQLAlchemy param didn't affect this issue, so I removed it and let SQLAlchemy cache its compiled SQL statements with the 1.4 default behavior.

1 Comment

Thanks Nadir. I have discussed it with the maintainers. Please see my answer; you may find it useful, since it lets you keep the cache enabled.
