I'm using SqlAlchemy 1.4.18 (async) and I believe I'm running into a race condition that I can't explain. The underlying database is Postgres and asyncpg is used internally by SqlAlchemy.
I have the following insert function in SQL Alchemy Core.
async def create_device(
device_id: str,
device_type: DeviceType,
account_type: AccountType = AccountType.FREE,
expires_at: Optional[datetime] = None,
account_id: Optional[int] = None,
is_banned: bool = False,
last_login_at: Optional[datetime] = None,
) -> datetime:
if expires_at is None:
expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
days=7
)
async with engine.begin() as conn:
await conn.execute(
DeviceTable.insert().values(
id=device_id,
type=device_type,
expires_at=expires_at,
account_type=account_type,
account_id=account_id,
is_banned=is_banned,
last_login_at=last_login_at,
),
)
return expires_at
The unit test runs successfully on its own. However when I run all tests in the test class, then this test will fail every time.
@pytest.mark.asyncio
@patch("service.email_service.EmailService.confirm_token")
async def test_confirm_email_already_confirmed(self, mock_token, client):
expiry_date = self.get_time_in_future()
account_id = await crud_account.create_account(
"[email protected]", "pass1", is_confirmed=True
)
await crud_device.create_device(
"u1", DeviceType.IPHONE, account_id=account_id, expires_at=expiry_date
)
## It has already failed at this point.
mock_token.return_value = "[email protected]"
result = await client.get("/email/confirm/t1")
assert result.status_code == 200
Error:
../app/database/crud_device.py:26: in create_device
await conn.execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/engine.py:405: in execute
result = await greenlet_spawn(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:125: in greenlet_spawn
result = context.throw(*sys.exc_info())
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1582: in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:324: in _execute_on_connection
return connection._execute_clauseelement(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1451: in _execute_clauseelement
ret = self._execute_context(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1813: in _execute_context
self._handle_dbapi_exception(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1994: in _handle_dbapi_exception
util.raise_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
raise exception
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1770: in _execute_context
self.dialect.do_execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py:717: in do_execute
cursor.execute(statement, parameters)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:449: in execute
self._adapt_connection.await_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:67: in await_only
return current.driver.switch(awaitable)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:120: in greenlet_spawn
value = await result
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:424: in _prepare_and_execute
self._handle_exception(error)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:358: in _handle_exception
self._adapt_connection._handle_exception(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_connection object at 0x115007340>
error = InternalServerError('cache lookup failed for type 3912040')
def _handle_exception(self, error):
if self._connection.is_closed():
self._transaction = None
self._started = False
if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
exception_mapping = self.dbapi._asyncpg_error_translate
for super_ in type(error).__mro__:
if super_ in exception_mapping:
translated_error = exception_mapping[super_](
"%s: %s" % (type(error), error)
)
translated_error.pgcode = (
translated_error.sqlstate
) = getattr(error, "sqlstate", None)
> raise translated_error from error
E sqlalchemy.exc.InternalError: (sqlalchemy.dialects.postgresql.asyncpg.InternalServerError) <class 'asyncpg.exceptions.InternalServerError'>: cache lookup failed for type 3912040
E [SQL: INSERT INTO main.device (id, type, created_at, last_login_at, expires_at, account_type, is_banned, account_id) VALUES (%s, %s, now(), NULL, %s, %s, %s, %s)]
E [parameters: ('u1', 'IPHONE', datetime.datetime(2021, 6, 16, 10, 24), 'FREE', False, 1)]
E (Background on this error at: http://sqlalche.me/e/14/2j85)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:652: InternalError
What is this translated_error? Many thanks
cache lookup failed for type 3912940had something to do with the SQLAlchemy SQLCompilation caching, see reference. I set the query size increate_async_enginein accordance with the docs, but that did not change the error behavior.