Skip to content

query_executor

QueryExecutor module - executes database queries using SQLAlchemy async API.

QueryExecutor

Executes database queries using SQLAlchemy async API. Provides a databases-compatible interface.

Source code in ormar/databases/query_executor.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class QueryExecutor:
    """
    Executes database queries using SQLAlchemy async API.
    Provides a databases-compatible interface.

    Every method runs its statement on the single connection handed to the
    constructor; the executor itself never opens, commits or closes that
    connection.
    """

    def __init__(self, connection: AsyncConnection) -> None:
        """
        Initialize query executor.

        :param connection: SQLAlchemy async connection
        """
        self._connection = connection

    async def fetch_all(self, query: Executable) -> List[Any]:
        """
        Execute a query and fetch all rows.

        :param query: SQLAlchemy query expression
        :return: List of row mappings (column name -> value), one per row
        """
        result: CursorResult[Any] = await self._connection.execute(query)
        return list(result.mappings().all())

    async def fetch_one(self, query: Executable) -> Optional[RowMapping]:
        """
        Execute a query and fetch one row.

        :param query: SQLAlchemy query expression
        :return: First row as a mapping, or None when the query has no rows
        """
        result: CursorResult[Any] = await self._connection.execute(query)
        return result.mappings().first()

    async def fetch_val(self, query: Executable, column: int = 0) -> Optional[Any]:
        """
        Execute a query and fetch a single scalar value.

        :param query: SQLAlchemy query expression
        :param column: Column index to fetch (default 0)
        :return: Value of ``column`` in the first row, or None if no rows
        """
        result: CursorResult[Any] = await self._connection.execute(query)
        # ``scalar()`` can only ever return the first column, which would
        # silently ignore ``column``; index into the first row instead.
        row = result.first()
        if row is None:
            return None
        return row[column]

    async def execute(self, query: Executable) -> Any:
        """
        Execute a query (INSERT, UPDATE, DELETE).

        :param query: SQLAlchemy query expression
        :return: For INSERT, returns last row id; for UPDATE/DELETE, returns row count
        """
        result: CursorResult[Any] = await self._connection.execute(query)

        # For INSERT queries, try to get the inserted primary key
        # PostgreSQL/MySQL use inserted_primary_key, SQLite uses lastrowid
        if result.context and result.context.isinsert:  # pragma: no cover
            if result.inserted_primary_key:
                pk_value = result.inserted_primary_key[0]
                if pk_value is not None:
                    return pk_value

            if hasattr(result, "lastrowid") and result.lastrowid:  # pragma: no cover
                return result.lastrowid

        # Not an INSERT, or no usable key was reported: fall back to the
        # affected-row count, normalising a missing count to 0.
        return result.rowcount if result.rowcount is not None else 0

    async def execute_many(
        self, query: Union[Executable, str], values: Sequence[Mapping[str, Any]]
    ) -> None:
        """
        Execute a query multiple times with different parameter sets.

        An empty ``values`` sequence is a no-op.

        :param query: SQLAlchemy query expression or SQL string
        :param values: Sequence of parameter mappings
        """
        # Zero parameter sets means zero executions. Handing an empty list to
        # SQLAlchemy would instead run the statement once without parameters.
        if not values:
            return
        exec_query = text(query) if isinstance(query, str) else query
        await self._connection.execute(exec_query, values)

    async def iterate(self, query: Executable) -> AsyncIterator[Any]:
        """
        Execute a query and iterate over results.

        :param query: SQLAlchemy query expression
        :return: Async iterator of row mappings
        """
        # The stream stays open only for the lifetime of this generator body.
        async with self._connection.stream(query) as result:
            async for row in result.mappings():
                yield row

__init__(connection)

Initialize query executor.

:param connection: SQLAlchemy async connection

Source code in ormar/databases/query_executor.py
19
20
21
22
23
24
25
def __init__(self, connection: AsyncConnection) -> None:
    """
    Initialize query executor.

    The connection is stored as-is on ``self._connection``; nothing is
    opened, validated or executed here.

    :param connection: SQLAlchemy async connection
    """
    self._connection = connection

execute(query) async

Execute a query (INSERT, UPDATE, DELETE).

:param query: SQLAlchemy query expression
:return: For INSERT, returns last row id; for UPDATE/DELETE, returns row count

Source code in ormar/databases/query_executor.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def execute(self, query: Executable) -> Any:
    """
    Run a data-modifying statement (INSERT, UPDATE, DELETE).

    :param query: SQLAlchemy query expression
    :return: For an INSERT, the new row's key when one is reported;
        otherwise the affected-row count (0 when no count is available)
    """
    outcome: CursorResult[Any] = await self._connection.execute(query)

    ctx = outcome.context
    # Key lookup only applies to INSERTs; first choice is
    # inserted_primary_key (PostgreSQL/MySQL), then lastrowid (SQLite).
    if ctx and ctx.isinsert:  # pragma: no cover
        primary_key = outcome.inserted_primary_key
        if primary_key and primary_key[0] is not None:
            return primary_key[0]

        last_id = getattr(outcome, "lastrowid", None)
        if last_id:  # pragma: no cover
            return last_id

    affected = outcome.rowcount
    return 0 if affected is None else affected

execute_many(query, values) async

Execute a query multiple times with different parameter sets.

:param query: SQLAlchemy query expression or SQL string
:param values: Sequence of parameter mappings

Source code in ormar/databases/query_executor.py
81
82
83
84
85
86
87
88
89
90
91
async def execute_many(
    self, query: Union[Executable, str], values: Sequence[Mapping[str, Any]]
) -> None:
    """
    Execute a query multiple times with different parameter sets.

    An empty ``values`` sequence is a no-op.

    :param query: SQLAlchemy query expression or SQL string
    :param values: Sequence of parameter mappings
    """
    # Zero parameter sets means zero executions. Handing an empty list to
    # SQLAlchemy would instead run the statement once without parameters.
    if not values:
        return
    # Plain SQL strings must be wrapped in text() before execution.
    exec_query = text(query) if isinstance(query, str) else query
    await self._connection.execute(exec_query, values)

fetch_all(query) async

Execute a query and fetch all rows.

:param query: SQLAlchemy query expression
:return: List of row mappings, one per result row

Source code in ormar/databases/query_executor.py
27
28
29
30
31
32
33
34
35
async def fetch_all(self, query: Executable) -> List[Any]:
    """
    Run a query and return every resulting row.

    :param query: SQLAlchemy query expression
    :return: A new list holding one mapping per result row
    """
    cursor: CursorResult[Any] = await self._connection.execute(query)
    mapped_rows = cursor.mappings()
    # Copy into a plain list so callers always receive a list.
    return [*mapped_rows.all()]

fetch_one(query) async

Execute a query and fetch one row.

:param query: SQLAlchemy query expression
:return: The first row as a mapping, or None if the query returned no rows

Source code in ormar/databases/query_executor.py
37
38
39
40
41
42
43
44
45
46
async def fetch_one(self, query: Executable) -> Optional[RowMapping]:
    """
    Run a query and return only its first row.

    :param query: SQLAlchemy query expression
    :return: The first row as a mapping, or None when there are no rows
    """
    cursor: CursorResult[Any] = await self._connection.execute(query)
    return cursor.mappings().first()

fetch_val(query, column=0) async

Execute a query and fetch a single scalar value.

:param query: SQLAlchemy query expression
:param column: Column index to fetch (default 0)
:return: Scalar value or None

Source code in ormar/databases/query_executor.py
48
49
50
51
52
53
54
55
56
57
async def fetch_val(self, query: Executable, column: int = 0) -> Optional[Any]:
    """
    Execute a query and fetch a single scalar value.

    :param query: SQLAlchemy query expression
    :param column: Column index to fetch (default 0)
    :return: Value of ``column`` in the first row, or None if no rows
    """
    result: CursorResult[Any] = await self._connection.execute(query)
    # ``scalar()`` can only ever return the first column, which would
    # silently ignore ``column``; index into the first row instead.
    row = result.first()
    if row is None:
        return None
    return row[column]

iterate(query) async

Execute a query and iterate over results.

:param query: SQLAlchemy query expression
:return: Async iterator of row mappings

Source code in ormar/databases/query_executor.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def iterate(self, query: Executable) -> AsyncIterator[Any]:
    """
    Run a query and yield its rows one at a time.

    :param query: SQLAlchemy query expression
    :return: Async iterator producing one mapping per result row
    """
    # The stream is held open only while this generator is being consumed.
    async with self._connection.stream(query) as streamed:
        mapped_rows = streamed.mappings()
        async for record in mapped_rows:
            yield record