Skip to content

transaction

Transaction module - provides transaction management with savepoint support.

Transaction

Transaction context manager with support for nested transactions via savepoints.

Source code in ormar/databases/transaction.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class Transaction:
    """
    Transaction context manager with support for nested transactions via savepoints.

    The outermost ``async with`` block opens a connection on the database
    engine and begins a transaction on it. Blocks entered while the
    transaction depth is already above zero reuse the connection registered
    on the database and call ``begin_nested()`` on it instead.
    """

    def __init__(
        self,
        database: "DatabaseConnection",
        force_rollback: bool = False,
    ) -> None:
        """
        Initialize transaction.

        No connection is opened here; that happens in ``__aenter__``.

        :param database: DatabaseConnection instance
        :param force_rollback: If True, always rollback (used for testing)
        """
        self._database = database
        self._force_rollback = force_rollback
        # Filled in by __aenter__.
        self._connection: Optional[AsyncConnection] = None
        self._transaction: Optional[AsyncTransaction] = None
        # Depth observed when this block was entered (0 == outermost).
        self._depth: int = 0

    async def __aenter__(self) -> "Transaction":
        """
        Enter transaction context.

        :return: this Transaction instance
        :raises BaseException: whatever the connection raises while starting
            the outermost transaction; the connection is closed and
            unregistered from the database before the error propagates
        """
        self._depth = _transaction_depth.get()

        # If this is the outermost transaction, get a new connection
        if self._depth == 0:
            self._connection = await self._database.engine.connect().__aenter__()
            try:
                self._database.set_transaction_connection(self._connection)
                self._transaction = await self._connection.begin()
                # SQLite requires an explicit BEGIN before SAVEPOINTs to prevent
                # RELEASE SAVEPOINT from auto-committing when no outer transaction
                # exists. Issue after conn.begin() to avoid conflicting with
                # SQLAlchemy's autobegin.
                if self._database.engine.dialect.name == "sqlite":  # pragma: nocover
                    await self._connection.exec_driver_sql("BEGIN")
            except BaseException:
                # __aexit__ is not called when __aenter__ raises, so the
                # connection has to be released here or it would stay open and
                # remain registered as the active transaction connection.
                self._database.set_transaction_connection(None)
                await self._connection.close()
                self._connection = None
                self._transaction = None
                raise
        else:
            # Nested transaction - use savepoint
            self._connection = self._database.get_transaction_connection()
            assert self._connection is not None
            self._transaction = await self._connection.begin_nested()

        # Only advance the depth once the transaction has really started.
        _transaction_depth.set(self._depth + 1)

        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """
        Exit transaction context.

        Rolls back when the block raised or ``force_rollback`` is set,
        otherwise commits. The outermost block also unregisters and closes
        the connection, even if commit/rollback itself fails.

        :param exc_type: type of the exception raised in the block, if any
        :param exc_value: exception instance raised in the block, if any
        :param traceback: traceback of that exception, if any
        """
        try:
            # Restore the depth that was current when this block was entered.
            _transaction_depth.set(self._depth)

            # Handle transaction completion
            if self._transaction is not None:
                if exc_type is not None or self._force_rollback:
                    await self._transaction.rollback()
                else:
                    await self._transaction.commit()
        finally:
            # Only the outermost block owns the connection.
            if self._depth == 0:
                self._database.set_transaction_connection(None)
                if self._connection is not None:
                    await self._connection.close()

__aenter__() async

Enter transaction context.

Source code in ormar/databases/transaction.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def __aenter__(self) -> "Transaction":
    """
    Enter transaction context.

    At depth 0 a new connection is opened, registered on the database and a
    transaction is begun on it. At any deeper level the registered connection
    is reused and ``begin_nested()`` is called on it.

    :return: this Transaction instance
    :raises BaseException: whatever the connection raises while starting the
        outermost transaction; the connection is closed and unregistered
        from the database before the error propagates
    """
    self._depth = _transaction_depth.get()

    # If this is the outermost transaction, get a new connection
    if self._depth == 0:
        self._connection = await self._database.engine.connect().__aenter__()
        try:
            self._database.set_transaction_connection(self._connection)
            self._transaction = await self._connection.begin()
            # SQLite requires an explicit BEGIN before SAVEPOINTs to prevent
            # RELEASE SAVEPOINT from auto-committing when no outer transaction
            # exists. Issue after conn.begin() to avoid conflicting with
            # SQLAlchemy's autobegin.
            if self._database.engine.dialect.name == "sqlite":  # pragma: nocover
                await self._connection.exec_driver_sql("BEGIN")
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the
            # connection has to be released here or it would stay open and
            # remain registered as the active transaction connection.
            self._database.set_transaction_connection(None)
            await self._connection.close()
            self._connection = None
            self._transaction = None
            raise
    else:
        # Nested transaction - use savepoint
        self._connection = self._database.get_transaction_connection()
        assert self._connection is not None
        self._transaction = await self._connection.begin_nested()

    # Only advance the depth once the transaction has really started.
    _transaction_depth.set(self._depth + 1)

    return self

__aexit__(exc_type=None, exc_value=None, traceback=None) async

Exit transaction context.

Source code in ormar/databases/transaction.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_value: Optional[BaseException] = None,
    traceback: Optional[TracebackType] = None,
) -> None:
    """
    Leave the transaction context, finishing the transaction.

    The transaction is rolled back when the block raised or when
    ``force_rollback`` is set; otherwise it is committed. When this is the
    outermost block (depth 0) the connection is unregistered from the
    database and closed, whether or not finishing the transaction succeeded.

    :param exc_type: type of the exception raised in the block, if any
    :param exc_value: exception instance raised in the block, if any
    :param traceback: traceback of that exception, if any
    """
    try:
        # Put the depth counter back to the value stored on this instance.
        _transaction_depth.set(self._depth)

        active = self._transaction
        if active is not None:
            must_rollback = exc_type is not None or self._force_rollback
            finish = active.rollback if must_rollback else active.commit
            await finish()
    finally:
        # Nested blocks share the outer connection and must leave it open.
        if self._depth == 0:
            self._database.set_transaction_connection(None)
            connection = self._connection
            if connection is not None:
                await connection.close()

__init__(database, force_rollback=False)

Initialize transaction.

Parameters: `database` is the DatabaseConnection instance; `force_rollback`, if True, makes the transaction always roll back (used for testing).

Source code in ormar/databases/transaction.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(
    self,
    database: "DatabaseConnection",
    force_rollback: bool = False,
) -> None:
    """
    Set up the transaction's state without opening anything.

    :param database: DatabaseConnection instance the transaction runs on
    :param force_rollback: If True, always rollback (used for testing)
    """
    # Runtime state: starts empty and at depth zero.
    self._depth: int = 0
    self._transaction: Optional[AsyncTransaction] = None
    self._connection: Optional[AsyncConnection] = None

    # Configuration supplied by the caller.
    self._force_rollback = force_rollback
    self._database = database