Skip to content

queryset

Contains QuerySet and the various Query classes used to construct SQL queries.

FieldAccessor

Helper to access ormar fields directly from the Model class, including attributes of nested models.

Source code in ormar\queryset\field_accessor.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
class FieldAccessor:
    """
    Proxy used to reach ormar fields straight from a Model class, including
    fields that live on nested (related) models.

    Comparison operators and the named helper methods build FilterGroup objects,
    while `asc()` and `desc()` build OrderAction objects.
    """

    def __init__(
        self,
        source_model: Type["Model"],
        field: "BaseField" = None,
        model: Type["Model"] = None,
        access_chain: str = "",
    ) -> None:
        # Double-underscore separated path accumulated while walking attributes.
        self._access_chain = access_chain
        # Model the whole chain started from; passed on to every nested accessor
        # and to the OrderAction built by asc()/desc().
        self._source_model = source_model
        # Only one of the two below is passed when a nested accessor is created
        # in __getattr__: `model` for relations, `field` for plain fields.
        self._model = model
        self._field = field

    def __bool__(self) -> bool:
        """
        Always evaluates to False.

        Hack to avoid pydantic name check from parent model.

        :return: False
        :rtype: bool
        """
        return False

    def __getattr__(self, item: str) -> Any:
        """
        Produce a new accessor for the requested field or nested model.
        Thanks to that operator overloading can be used in filters.

        :param item: attribute name
        :type item: str
        :return: FieldAccessor for field or nested model
        :rtype: ormar.queryset.field_accessor.FieldAccessor
        """
        # object.__getattribute__ is used so a missing attribute raises
        # AttributeError instead of re-entering this method.
        own_field = object.__getattribute__(self, "_field")
        if own_field and item == own_field.name:
            return own_field

        own_model = object.__getattribute__(self, "_model")
        if not (own_model and item in own_model.Meta.model_fields):
            return object.__getattribute__(self, item)  # pragma: no cover

        target = cast("Model", own_model).Meta.model_fields[item]
        nested_chain = f"{self._access_chain}__{item}"
        if target.is_relation:
            # Relations keep walking: the accessor wraps the related model.
            return FieldAccessor(
                source_model=self._source_model,
                model=target.to,
                access_chain=nested_chain,
            )
        # Plain fields end the chain: the accessor wraps the field itself.
        return FieldAccessor(
            source_model=self._source_model,
            field=target,
            access_chain=nested_chain,
        )

    def _check_field(self) -> None:
        """
        Ensure this accessor wraps a field.

        :raises AttributeError: if no field is set on the accessor
        """
        if self._field:
            return
        raise AttributeError("Cannot filter by Model, you need to provide model name")

    def _select_operator(self, op: str, other: Any) -> FilterGroup:
        """
        Build a FilterGroup with a single `<access chain>__<operator>` keyword.

        :param op: key looked up in METHODS_TO_OPERATORS
        :type op: str
        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        self._check_field()
        operator_suffix = METHODS_TO_OPERATORS[op]
        return FilterGroup(**{f"{self._access_chain}__{operator_suffix}": other})

    def __eq__(self, other: Any) -> FilterGroup:  # type: ignore
        """
        overloaded to work as sql `column = <VALUE>`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("__eq__", other)

    def __ge__(self, other: Any) -> FilterGroup:
        """
        overloaded to work as sql `column >= <VALUE>`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("__ge__", other)

    def __gt__(self, other: Any) -> FilterGroup:
        """
        overloaded to work as sql `column > <VALUE>`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("__gt__", other)

    def __le__(self, other: Any) -> FilterGroup:
        """
        overloaded to work as sql `column <= <VALUE>`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("__le__", other)

    def __lt__(self, other: Any) -> FilterGroup:
        """
        overloaded to work as sql `column < <VALUE>`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("__lt__", other)

    def __mod__(self, other: Any) -> FilterGroup:
        """
        overloaded to work as sql `column LIKE '%<VALUE>%'`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("__mod__", other)

    def __lshift__(self, other: Any) -> FilterGroup:
        """
        overloaded to work as sql `column IN (<VALUE1>, <VALUE2>,...)`

        :param other: values to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("in", other)

    def __rshift__(self, other: Any) -> FilterGroup:
        """
        overloaded to work as sql `column IS NULL`

        :param other: not used, the filter value is always True
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("isnull", True)

    def in_(self, other: Any) -> FilterGroup:
        """
        works as sql `column IN (<VALUE1>, <VALUE2>,...)`

        :param other: values to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("in", other)

    def iexact(self, other: Any) -> FilterGroup:
        """
        works as sql `column = <VALUE>` case-insensitive

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("iexact", other)

    def contains(self, other: Any) -> FilterGroup:
        """
        works as sql `column LIKE '%<VALUE>%'`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("contains", other)

    def icontains(self, other: Any) -> FilterGroup:
        """
        works as sql `column LIKE '%<VALUE>%'` case-insensitive

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("icontains", other)

    def startswith(self, other: Any) -> FilterGroup:
        """
        works as sql `column LIKE '<VALUE>%'`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("startswith", other)

    def istartswith(self, other: Any) -> FilterGroup:
        """
        works as sql `column LIKE '<VALUE>%'` case-insensitive

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("istartswith", other)

    def endswith(self, other: Any) -> FilterGroup:
        """
        works as sql `column LIKE '%<VALUE>'`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("endswith", other)

    def iendswith(self, other: Any) -> FilterGroup:
        """
        works as sql `column LIKE '%<VALUE>'` case-insensitive

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("iendswith", other)

    def isnull(self, other: Any) -> FilterGroup:
        """
        works as sql `column IS NULL` or `IS NOT NULL`

        :param other: value to check against operator
        :type other: Any
        :return: FilterGroup for operator
        :rtype: ormar.queryset.clause.FilterGroup
        """
        return self._select_operator("isnull", other)

    def asc(self) -> OrderAction:
        """
        works as sql `column asc`

        :return: OrderAction for the accumulated access chain
        :rtype: OrderAction
        """
        ascending = OrderAction(
            order_str=self._access_chain, model_cls=self._source_model
        )
        return ascending

    def desc(self) -> OrderAction:
        """
        works as sql `column desc`

        :return: OrderAction for the access chain prefixed with "-"
        :rtype: OrderAction
        """
        descending_order = f"-{self._access_chain}"
        return OrderAction(order_str=descending_order, model_cls=self._source_model)

__bool__()

Hack to avoid pydantic name check from parent model, returns false

Returns:

Type Description
bool

False

Source code in ormar\queryset\field_accessor.py
29
30
31
32
33
34
35
36
def __bool__(self) -> bool:
    """
    Always evaluates to False.

    Hack to avoid pydantic name check from parent model.

    :return: False
    :rtype: bool
    """
    return False

__eq__(other)

overloaded to work as sql column = <VALUE>

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
84
85
86
87
88
89
90
91
92
93
def __eq__(self, other: Any) -> FilterGroup:  # type: ignore
    """
    Build an equality filter; works as sql `column = <VALUE>`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    equality_filter = self._select_operator(op="__eq__", other=other)
    return equality_filter

__ge__(other)

overloaded to work as sql column >= <VALUE>

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
 95
 96
 97
 98
 99
100
101
102
103
104
def __ge__(self, other: Any) -> FilterGroup:
    """
    Build a greater-or-equal filter; works as sql `column >= <VALUE>`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    at_least_filter = self._select_operator(op="__ge__", other=other)
    return at_least_filter

__getattr__(item)

Returns a new accessor for each field and nested model. Thanks to that, operator overloading can be used in filters.

Parameters:

Name Type Description Default
item str

attribute name

required

Returns:

Type Description
ormar.queryset.field_accessor.FieldAccessor

FieldAccessor for field or nested model

Source code in ormar\queryset\field_accessor.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __getattr__(self, item: str) -> Any:
    """
    Produce a new accessor for the requested field or nested model.
    Thanks to that operator overloading can be used in filters.

    :param item: attribute name
    :type item: str
    :return: FieldAccessor for field or nested model
    :rtype: ormar.queryset.field_accessor.FieldAccessor
    """
    # object.__getattribute__ is used so a missing attribute raises
    # AttributeError instead of re-entering this method.
    own_field = object.__getattribute__(self, "_field")
    if own_field and item == own_field.name:
        return own_field

    own_model = object.__getattribute__(self, "_model")
    if not (own_model and item in own_model.Meta.model_fields):
        return object.__getattribute__(self, item)  # pragma: no cover

    target = cast("Model", own_model).Meta.model_fields[item]
    nested_chain = f"{self._access_chain}__{item}"
    if target.is_relation:
        # Relations keep walking: the accessor wraps the related model.
        return FieldAccessor(
            source_model=self._source_model,
            model=target.to,
            access_chain=nested_chain,
        )
    # Plain fields end the chain: the accessor wraps the field itself.
    return FieldAccessor(
        source_model=self._source_model,
        field=target,
        access_chain=nested_chain,
    )

__gt__(other)

overloaded to work as sql column > <VALUE>

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
106
107
108
109
110
111
112
113
114
115
def __gt__(self, other: Any) -> FilterGroup:
    """
    Build a greater-than filter; works as sql `column > <VALUE>`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    greater_filter = self._select_operator(op="__gt__", other=other)
    return greater_filter

__le__(other)

overloaded to work as sql column <= <VALUE>

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
117
118
119
120
121
122
123
124
125
126
def __le__(self, other: Any) -> FilterGroup:
    """
    Build a less-or-equal filter; works as sql `column <= <VALUE>`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    at_most_filter = self._select_operator(op="__le__", other=other)
    return at_most_filter

__lshift__(other)

overloaded to work as sql column IN (<VALUE1>, <VALUE2>,...)

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
150
151
152
153
154
155
156
157
158
159
def __lshift__(self, other: Any) -> FilterGroup:
    """
    Build a membership filter; works as sql
    `column IN (<VALUE1>, <VALUE2>,...)`.

    :param other: values to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    membership_filter = self._select_operator(op="in", other=other)
    return membership_filter

__lt__(other)

overloaded to work as sql column < <VALUE>

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
128
129
130
131
132
133
134
135
136
137
def __lt__(self, other: Any) -> FilterGroup:
    """
    Build a less-than filter; works as sql `column < <VALUE>`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    lesser_filter = self._select_operator(op="__lt__", other=other)
    return lesser_filter

__mod__(other)

overloaded to work as sql column LIKE '%<VALUE>%'

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
139
140
141
142
143
144
145
146
147
148
def __mod__(self, other: Any) -> FilterGroup:
    """
    Build a containment filter; works as sql `column LIKE '%<VALUE>%'`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    like_filter = self._select_operator(op="__mod__", other=other)
    return like_filter

__rshift__(other)

overloaded to work as sql column IS NULL

Parameters:

Name Type Description Default
other Any

value to check against operator (ignored: the filter is always built as IS NULL)

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
161
162
163
164
165
166
167
168
169
170
def __rshift__(self, other: Any) -> FilterGroup:
    """
    Build a null-check filter; works as sql `column IS NULL`.

    :param other: not used, the filter value is always True
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    null_filter = self._select_operator(op="isnull", other=True)
    return null_filter

asc()

works as sql column asc

Returns:

Type Description
ormar.queryset.actions.OrderAction

OrderAction for operator

Source code in ormar\queryset\field_accessor.py
271
272
273
274
275
276
277
278
def asc(self) -> OrderAction:
    """
    works as sql `column asc`

    :return: OrderAction for the accumulated access chain
    :rtype: OrderAction
    """
    ascending = OrderAction(
        order_str=self._access_chain, model_cls=self._source_model
    )
    return ascending

contains(other)

works as sql column LIKE '%<VALUE>%'

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
194
195
196
197
198
199
200
201
202
203
def contains(self, other: Any) -> FilterGroup:
    """
    Build a containment filter; works as sql `column LIKE '%<VALUE>%'`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    containment_filter = self._select_operator(op="contains", other=other)
    return containment_filter

desc()

works as sql column desc

Returns:

Type Description
ormar.queryset.actions.OrderAction

OrderAction for operator

Source code in ormar\queryset\field_accessor.py
280
281
282
283
284
285
286
287
288
289
def desc(self) -> OrderAction:
    """
    works as sql `column desc`

    :return: OrderAction for the access chain prefixed with "-"
    :rtype: OrderAction
    """
    descending_order = "-" + self._access_chain
    return OrderAction(order_str=descending_order, model_cls=self._source_model)

endswith(other)

works as sql column LIKE '%<VALUE>'

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
238
239
240
241
242
243
244
245
246
247
def endswith(self, other: Any) -> FilterGroup:
    """
    Build a suffix filter; works as sql `column LIKE '%<VALUE>'`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    suffix_filter = self._select_operator(op="endswith", other=other)
    return suffix_filter

icontains(other)

works as sql column LIKE '%<VALUE>%' case-insensitive

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
205
206
207
208
209
210
211
212
213
214
def icontains(self, other: Any) -> FilterGroup:
    """
    Build a case-insensitive containment filter; works as sql
    `column LIKE '%<VALUE>%'` case-insensitive.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    containment_filter = self._select_operator(op="icontains", other=other)
    return containment_filter

iendswith(other)

works as sql column LIKE '%<VALUE>' case-insensitive

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
249
250
251
252
253
254
255
256
257
258
def iendswith(self, other: Any) -> FilterGroup:
    """
    Build a case-insensitive suffix filter; works as sql
    `column LIKE '%<VALUE>'` case-insensitive.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    suffix_filter = self._select_operator(op="iendswith", other=other)
    return suffix_filter

iexact(other)

works as sql column = <VALUE> case-insensitive

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
183
184
185
186
187
188
189
190
191
192
def iexact(self, other: Any) -> FilterGroup:
    """
    Build a case-insensitive equality filter; works as sql
    `column = <VALUE>` case-insensitive.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    equality_filter = self._select_operator(op="iexact", other=other)
    return equality_filter

in_(other)

works as sql column IN (<VALUE1>, <VALUE2>,...)

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
172
173
174
175
176
177
178
179
180
181
def in_(self, other: Any) -> FilterGroup:
    """
    Build a membership filter; works as sql
    `column IN (<VALUE1>, <VALUE2>,...)`.

    :param other: values to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    membership_filter = self._select_operator(op="in", other=other)
    return membership_filter

isnull(other)

works as sql column IS NULL or IS NOT NULL

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
260
261
262
263
264
265
266
267
268
269
def isnull(self, other: Any) -> FilterGroup:
    """
    Build a null-check filter; works as sql `column IS NULL` or
    `IS NOT NULL`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    null_filter = self._select_operator(op="isnull", other=other)
    return null_filter

istartswith(other)

works as sql column LIKE '<VALUE>%' case-insensitive

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
227
228
229
230
231
232
233
234
235
236
def istartswith(self, other: Any) -> FilterGroup:
    """
    Build a case-insensitive prefix filter; works as sql
    `column LIKE '<VALUE>%'` case-insensitive.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    prefix_filter = self._select_operator(op="istartswith", other=other)
    return prefix_filter

startswith(other)

works as sql column LIKE '<VALUE>%'

Parameters:

Name Type Description Default
other Any

value to check against operator

required

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup for operator

Source code in ormar\queryset\field_accessor.py
216
217
218
219
220
221
222
223
224
225
def startswith(self, other: Any) -> FilterGroup:
    """
    Build a prefix filter; works as sql `column LIKE '<VALUE>%'`.

    :param other: value to check against operator
    :type other: Any
    :return: FilterGroup for operator
    :rtype: ormar.queryset.clause.FilterGroup
    """
    prefix_filter = self._select_operator(op="startswith", other=other)
    return prefix_filter

FilterAction

Bases: QueryAction

Filter Actions is populated by queryset when filter() is called.

All required params are extracted but kept raw until actual filter clause value is required -> then the action is converted into text() clause.

Extracted in order to easily change table prefixes on complex relations.

Source code in ormar\queryset\actions\filter_action.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class FilterAction(QueryAction):
    """
    Filter Actions is populated by queryset when filter() is called.

    All required params are extracted but kept raw until the actual filter clause
    is required -> then the action is converted into a sqlalchemy clause.

    Extracted in order to easily change table prefixes on complex relations.
    """

    def __init__(self, filter_str: str, value: Any, model_cls: Type["Model"]) -> None:
        # NOTE(review): `self.operator` is read by the escaping step below, so the
        # parent initializer presumably calls `_split_value_into_parts` - confirm.
        super().__init__(query_str=filter_str, model_cls=model_cls)
        self.filter_value = value
        self._escape_characters_in_clause()

    def has_escaped_characters(self) -> bool:
        """Check if value is a string that contains characters to escape"""
        value = self.filter_value
        return isinstance(value, str) and any(
            char in value for char in ESCAPE_CHARACTERS
        )

    def _split_value_into_parts(self, query_str: str) -> None:
        """
        Splits the double-underscore separated query string into related parts,
        field name and operator (defaults to `exact` if no known operator is
        found at the end of the string).

        :param query_str: filter string, e.g. `relation__field__operator`
        :type query_str: str
        """
        parts = query_str.split("__")
        if parts[-1] in FILTER_OPERATORS:
            self.operator = parts[-1]
            self.field_name = parts[-2]
            self.related_parts = parts[:-2]
        else:
            self.operator = "exact"
            self.field_name = parts[-1]
            self.related_parts = parts[:-1]

    def _escape_characters_in_clause(self) -> None:
        """
        Escapes the special characters ["%", "_"] if needed.
        Adds `%` for `like` queries.

        The escaped value is stored back in `filter_value` and the
        `has_escaped_character` flag records if escaping took place.

        :raises QueryDefinitionError: if one of the like-type operators is used
        with ormar model instance
        """
        self.has_escaped_character = False
        if self.operator in (
            "contains",
            "icontains",
            "startswith",
            "istartswith",
            "endswith",
            "iendswith",
        ):
            if isinstance(self.filter_value, ormar.Model):
                raise QueryDefinitionError(
                    "You cannot use contains and icontains with instance of the Model"
                )
            self.has_escaped_character = self.has_escaped_characters()
            if self.has_escaped_character:
                self._escape_chars()
            self._prefix_suffix_quote()

    def _escape_chars(self) -> None:
        """
        Actually replaces chars to escape in value.

        Backslash is the escape character set on the clause, so backslashes
        already present in the value are doubled first - otherwise they would
        be treated as an escape prefix instead of a literal character.
        """
        value = self.filter_value
        if "\\" not in ESCAPE_CHARACTERS:
            # Must happen before the loop so the backslashes added below
            # are not doubled themselves.
            value = value.replace("\\", "\\\\")
        for char in ESCAPE_CHARACTERS:
            value = value.replace(char, f"\\{char}")
        self.filter_value = value

    def _prefix_suffix_quote(self) -> None:
        """
        Adds % to the beginning of the value if operator checks for containment and not
        starts with.

        Adds % to the end of the value if operator checks for containment and not
        end with.
        """
        prefix = "%" if "start" not in self.operator else ""
        suffix = "%" if "end" not in self.operator else ""
        self.filter_value = f"{prefix}{self.filter_value}{suffix}"

    def get_text_clause(self) -> sqlalchemy.sql.expression.BinaryExpression:
        """
        Substitutes values of the models if value is an ormar Model with its pk value.
        Builds the clause on the (optionally prefixed) column and marks the escape
        character if the value was escaped.

        :return: compiled and escaped clause
        :rtype: sqlalchemy.sql.expression.BinaryExpression
        """
        if isinstance(self.filter_value, ormar.Model):
            self.filter_value = self.filter_value.pk

        op_attr = FILTER_OPERATORS[self.operator]
        if self.operator == "isnull":
            # Truthy value -> IS NULL, falsy value -> IS NOT NULL.
            op_attr = "is_" if self.filter_value else "isnot"
            filter_value = None
        else:
            filter_value = self.filter_value
        if self.table_prefix:
            aliased_table = self.source_model.Meta.alias_manager.prefixed_table_name(
                self.table_prefix, self.column.table
            )
            aliased_column = getattr(aliased_table.c, self.column.name)
        else:
            aliased_column = self.column
        clause = getattr(aliased_column, op_attr)(filter_value)
        if self.has_escaped_character:
            clause.modifiers["escape"] = "\\"
        return clause

get_text_clause()

Escapes characters if it's required. Substitutes values of the models if value is an ormar Model with its pk value. Compiles the clause.

Returns:

Type Description
sqlalchemy.sql.elements.TextClause

compiled and escaped clause

Source code in ormar\queryset\actions\filter_action.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def get_text_clause(self) -> sqlalchemy.sql.expression.BinaryExpression:
    """
    Substitutes values of the models if value is an ormar Model with its pk value.
    Builds the clause on the (optionally prefixed) column and marks the escape
    character if the value was escaped.

    :return: compiled and escaped clause
    :rtype: sqlalchemy.sql.expression.BinaryExpression
    """
    if isinstance(self.filter_value, ormar.Model):
        self.filter_value = self.filter_value.pk

    method_name = FILTER_OPERATORS[self.operator]
    compared_to = self.filter_value
    if self.operator == "isnull":
        # Truthy value -> IS NULL, falsy value -> IS NOT NULL.
        method_name = "is_" if compared_to else "isnot"
        compared_to = None

    if not self.table_prefix:
        target_column = self.column
    else:
        alias_manager = self.source_model.Meta.alias_manager
        prefixed_table = alias_manager.prefixed_table_name(
            self.table_prefix, self.column.table
        )
        target_column = getattr(prefixed_table.c, self.column.name)

    build_clause = getattr(target_column, method_name)
    result = build_clause(compared_to)
    if self.has_escaped_character:
        result.modifiers["escape"] = "\\"
    return result

has_escaped_characters()

Check if value is a string that contains characters to escape

Source code in ormar\queryset\actions\filter_action.py
63
64
65
66
67
def has_escaped_characters(self) -> bool:
    """Tell if the filter value is a string holding any character to escape"""
    candidate = self.filter_value
    if not isinstance(candidate, str):
        return False
    return any(symbol for symbol in ESCAPE_CHARACTERS if symbol in candidate)

FilterQuery

Modifies the select query with given list of where/filter clauses.

Source code in ormar\queryset\queries\filter_query.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class FilterQuery:
    """
    Applies a list of filter actions to a select query as one where condition,
    negating the condition when used for exclusion.
    """

    def __init__(
        self, filter_clauses: List[FilterAction], exclude: bool = False
    ) -> None:
        self.filter_clauses = filter_clauses
        self.exclude = exclude

    def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
        """
        Adds the where condition built from the filter clauses, if there are any.

        A single clause is used as is, several clauses are joined with ``and_``;
        the result is wrapped in ``not_`` when ``exclude`` is set.

        :param expr: query to modify
        :type expr: sqlalchemy.sql.selectable.Select
        :return: modified query (the same query if there are no clauses)
        :rtype: sqlalchemy.sql.selectable.Select
        """
        if not self.filter_clauses:
            return expr

        conditions = [action.get_text_clause() for action in self.filter_clauses]
        if len(conditions) == 1:
            combined = conditions[0]
        else:
            combined = sqlalchemy.sql.and_(*conditions)
        if self.exclude:
            combined = sqlalchemy.sql.not_(combined)
        return expr.where(combined)

apply(expr)

Applies all filter clauses if set.

Parameters:

Name Type Description Default
expr sqlalchemy.sql.select

query to modify

required

Returns:

Type Description
sqlalchemy.sql.selectable.Select

modified query

Source code in ormar\queryset\queries\filter_query.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
    """
    Adds the where condition built from the filter clauses, if there are any.

    :param expr: query to modify
    :type expr: sqlalchemy.sql.selectable.Select
    :return: modified query (the same query if there are no clauses)
    :rtype: sqlalchemy.sql.selectable.Select
    """
    actions = self.filter_clauses
    if not actions:
        return expr

    if len(actions) > 1:
        # several clauses are combined with a logical AND
        where_clause = sqlalchemy.sql.and_(
            *(action.get_text_clause() for action in actions)
        )
    else:
        where_clause = actions[0].get_text_clause()

    if self.exclude:
        # exclusion negates the whole combined condition
        where_clause = sqlalchemy.sql.not_(where_clause)
    return expr.where(where_clause)

LimitQuery

Modifies the select query with limit clause.

Source code in ormar\queryset\queries\limit_query.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class LimitQuery:
    """
    Adds a limit clause to the select query when a limit count is given.
    """

    def __init__(self, limit_count: Optional[int]) -> None:
        self.limit_count = limit_count

    def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
        """
        Limits the query unless the stored limit count is None.

        :param expr: query to modify
        :type expr: sqlalchemy.sql.selectable.Select
        :return: modified query
        :rtype: sqlalchemy.sql.selectable.Select
        """
        # Only None disables the limit; a count of 0 is still applied.
        if self.limit_count is None:
            return expr
        return expr.limit(self.limit_count)

apply(expr)

Applies the limit clause.

Parameters:

Name Type Description Default
expr sqlalchemy.sql.select

query to modify

required

Returns:

Type Description
sqlalchemy.sql.selectable.Select

modified query

Source code in ormar\queryset\queries\limit_query.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
    """
    Limits the query unless the stored limit count is None.

    :param expr: query to modify
    :type expr: sqlalchemy.sql.selectable.Select
    :return: modified query
    :rtype: sqlalchemy.sql.selectable.Select
    """
    count = self.limit_count
    # A count of 0 is a valid limit, hence the explicit None comparison.
    return expr if count is None else expr.limit(count)

OffsetQuery

Modifies the select query with offset if set

Source code in ormar\queryset\queries\offset_query.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class OffsetQuery:
    """
    Adds an offset clause to the select query when an offset is set.
    """

    def __init__(self, query_offset: Optional[int]) -> None:
        self.query_offset = query_offset

    def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
        """
        Offsets the query when the stored offset is truthy.

        :param expr: query to modify
        :type expr: sqlalchemy.sql.selectable.Select
        :return: modified query
        :rtype: sqlalchemy.sql.selectable.Select
        """
        # Both None and 0 leave the query untouched.
        if not self.query_offset:
            return expr
        return expr.offset(self.query_offset)

apply(expr)

Applies the offset clause.

Parameters:

Name Type Description Default
expr sqlalchemy.sql.select

query to modify

required

Returns:

Type Description
sqlalchemy.sql.selectable.Select

modified query

Source code in ormar\queryset\queries\offset_query.py
14
15
16
17
18
19
20
21
22
23
24
25
def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
    """
    Offsets the query when the stored offset is truthy.

    :param expr: query to modify
    :type expr: sqlalchemy.sql.selectable.Select
    :return: modified query
    :rtype: sqlalchemy.sql.selectable.Select
    """
    # None and 0 are both falsy, so neither adds an offset clause.
    return expr.offset(self.query_offset) if self.query_offset else expr

OrderAction

Bases: QueryAction

An OrderAction is populated by the queryset when order_by() is called.

All required params are extracted but kept raw until the actual order clause is required; only then is the action converted into a text() clause.

Extracted in order to easily change table prefixes on complex relations.

Source code in ormar\queryset\actions\order_action.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class OrderAction(QueryAction):
    """
    Order action created by the queryset when order_by() is called.

    The parsed parts of the order string are kept raw and only turned into
    a text() clause when the clause itself is requested.

    Kept as a separate object so that table prefixes can be changed easily
    on complex relations.
    """

    def __init__(
        self, order_str: str, model_cls: Type["Model"], alias: str = None
    ) -> None:
        # direction is assigned before the parent initializer runs
        # (presumably the parent calls _split_value_into_parts, which may
        # overwrite it - verify against QueryAction)
        self.direction: str = ""
        super().__init__(query_str=order_str, model_cls=model_cls)
        self.is_source_model_order = False
        if alias:
            self.table_prefix = alias
        same_model = self.source_model == self.target_model
        if same_model and "__" not in self.related_str:
            self.is_source_model_order = True

    @property
    def field_alias(self) -> str:
        """Column alias of the ordered field as resolved by the target model."""
        model = self.target_model
        return model.get_column_alias(self.field_name)

    @property
    def is_postgres_bool(self) -> bool:
        """True when the dialect is postgresql and the ordered field type is bool."""
        meta = self.target_model.Meta
        dialect_name = meta.database._backend._dialect.name
        python_type = meta.model_fields[self.field_name].__type__
        return dialect_name == "postgresql" and python_type == bool

    def get_field_name_text(self) -> str:
        """
        Builds the table-qualified column name, with the table prefix
        prepended when one is set.

        :return: "<prefix>_<table>.<column alias>" or "<table>.<column alias>"
        :rtype: str
        """
        prefix = f"{self.table_prefix}_" if self.table_prefix else ""
        return f"{prefix}{self.table}.{self.field_alias}"

    def get_min_or_max(self) -> sqlalchemy.sql.expression.TextClause:
        """
        Used in limit sub queries where you need to use aggregated functions
        in order to order by columns not included in group by. For a postgres
        bool field the bool_or function is used in both directions.

        :return: min or max function to order
        :rtype: sqlalchemy.sql.elements.TextClause
        """
        prefix = f"{self.table_prefix}_" if self.table_prefix else ""
        ascending = self.direction == ""
        if self.is_postgres_bool:
            function = "bool_or"
        else:
            function = "min" if ascending else "max"
        suffix = "" if ascending else " desc"
        return text(f"{function}({prefix}{self.table}.{self.field_alias}){suffix}")

    def get_text_clause(self) -> sqlalchemy.sql.expression.TextClause:
        """
        Builds the order clause: the (optionally prefixed) table name and the
        column alias are quoted with the dialect's identifier preparer and
        followed by the direction.

        :return: text clause in the form "<table>.<column> <direction>"
        :rtype: sqlalchemy.sql.elements.TextClause
        """
        dialect = self.target_model.Meta.database._backend._dialect
        quote = dialect.identifier_preparer.quote
        prefix = f"{self.table_prefix}_" if self.table_prefix else ""
        raw_table = self.table.name
        raw_field = self.field_alias
        # without a prefix the original name object is quoted untouched
        qualified_table = f"{prefix}{raw_table}" if prefix else raw_table
        quoted_table = quote(qualified_table)
        quoted_field = quote(raw_field)
        return text(f"{quoted_table}.{quoted_field} {self.direction}")

    def _split_value_into_parts(self, order_str: str) -> None:
        """Parse the order string into direction, field name and relation parts."""
        # a leading minus marks descending order and is stripped from the name
        if order_str.startswith("-"):
            self.direction = "desc"
            order_str = order_str[1:]
        *relation_parts, last_part = order_str.split("__")
        self.field_name = last_part
        self.related_parts = relation_parts

    def check_if_filter_apply(self, target_model: Type["Model"], alias: str) -> bool:
        """
        Checks if this action applies to the currently processed join.

        :param target_model: model which is now processed
        :type target_model: Type["Model"]
        :param alias: prefix of the relation
        :type alias: str
        :return: True if both the model and the prefix match this action
        :rtype: bool
        """
        same_model = target_model == self.target_model
        return same_model and alias == self.table_prefix

check_if_filter_apply(target_model, alias)

Checks filter conditions to find if they apply to current join.

Parameters:

Name Type Description Default
target_model Type[Model]

model which is now processed

required
alias str

prefix of the relation

required

Returns:

Type Description
bool

result of the check

Source code in ormar\queryset\actions\order_action.py
101
102
103
104
105
106
107
108
109
110
111
112
def check_if_filter_apply(self, target_model: Type["Model"], alias: str) -> bool:
    """
    Checks if this action applies to the currently processed join.

    :param target_model: model which is now processed
    :type target_model: Type["Model"]
    :param alias: prefix of the relation
    :type alias: str
    :return: True if both the model and the prefix match this action
    :rtype: bool
    """
    same_model = target_model == self.target_model
    return same_model and alias == self.table_prefix

get_field_name_text()

Builds the table-qualified column name used for ordering, prepending the table prefix (followed by an underscore) when one is set.

Returns:

Type Description
str

table name (with optional prefix) and column alias joined by a dot

Source code in ormar\queryset\actions\order_action.py
43
44
45
46
47
48
49
50
51
52
53
def get_field_name_text(self) -> str:
    """
    Builds the table-qualified column name, with the table prefix
    prepended when one is set.

    :return: "<prefix>_<table>.<column alias>" or "<table>.<column alias>"
    :rtype: str
    """
    pieces = []
    if self.table_prefix:
        pieces.append(f"{self.table_prefix}_")
    pieces.append(f"{self.table}.{self.field_alias}")
    return "".join(pieces)

get_min_or_max()

Used in limit sub queries where you need to use aggregated functions in order to order by columns not included in group by. For postgres bool field it's using bool_or function as aggregates does not work with this type of columns.

Returns:

Type Description
sqlalchemy.sql.elements.TextClause

min or max function to order

Source code in ormar\queryset\actions\order_action.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def get_min_or_max(self) -> sqlalchemy.sql.expression.TextClause:
    """
    Used in limit sub queries where you need to use aggregated functions
    in order to order by columns not included in group by. For a postgres
    bool field the bool_or function is used in both directions.

    :return: min or max function to order
    :rtype: sqlalchemy.sql.elements.TextClause
    """
    prefix = f"{self.table_prefix}_" if self.table_prefix else ""
    descending = self.direction != ""
    if self.is_postgres_bool:
        aggregate = "bool_or"
    elif descending:
        aggregate = "max"
    else:
        aggregate = "min"
    column_ref = f"{prefix}{self.table}.{self.field_alias}"
    # only the descending variant carries an explicit direction keyword
    ordering = " desc" if descending else ""
    return text(f"{aggregate}({column_ref}){ordering}")

get_text_clause()

Quotes the table name (including the table prefix when one is set) and the column alias with the dialect's identifier preparer, then builds the order clause with the direction appended.

Returns:

Type Description
sqlalchemy.sql.elements.TextClause

compiled and escaped clause

Source code in ormar\queryset\actions\order_action.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def get_text_clause(self) -> sqlalchemy.sql.expression.TextClause:
    """
    Builds the order clause: the (optionally prefixed) table name and the
    column alias are quoted with the dialect's identifier preparer and
    followed by the direction.

    :return: text clause in the form "<table>.<column> <direction>"
    :rtype: sqlalchemy.sql.elements.TextClause
    """
    dialect = self.target_model.Meta.database._backend._dialect
    quote = dialect.identifier_preparer.quote
    prefix = f"{self.table_prefix}_" if self.table_prefix else ""
    raw_table = self.table.name
    raw_field = self.field_alias
    # without a prefix the original name object is quoted untouched
    qualified_table = f"{prefix}{raw_table}" if prefix else raw_table
    quoted_table = quote(qualified_table)
    quoted_field = quote(raw_field)
    return text(f"{quoted_table}.{quoted_field} {self.direction}")

OrderQuery

Modifies the select query with given list of order_by clauses.

Source code in ormar\queryset\queries\order_query.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class OrderQuery:
    """
    Adds the stored order_by clauses to the select query.
    """

    def __init__(self, sorted_orders: Dict) -> None:
        self.sorted_orders = sorted_orders

    def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
        """
        Applies every non-None value of ``sorted_orders`` as an order_by
        clause, in the mapping's iteration order.

        :param expr: query to modify
        :type expr: sqlalchemy.sql.selectable.Select
        :return: modified query
        :rtype: sqlalchemy.sql.selectable.Select
        """
        if not self.sorted_orders:
            return expr
        for clause in self.sorted_orders.values():
            # entries without a clause are skipped
            if clause is None:
                continue
            expr = expr.order_by(clause)
        return expr

apply(expr)

Applies all order_by clauses if set.

Parameters:

Name Type Description Default
expr sqlalchemy.sql.select

query to modify

required

Returns:

Type Description
sqlalchemy.sql.selectable.Select

modified query

Source code in ormar\queryset\queries\order_query.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def apply(self, expr: sqlalchemy.sql.select) -> sqlalchemy.sql.select:
    """
    Applies every non-None value of ``sorted_orders`` as an order_by
    clause, in the mapping's iteration order.

    :param expr: query to modify
    :type expr: sqlalchemy.sql.selectable.Select
    :return: modified query
    :rtype: sqlalchemy.sql.selectable.Select
    """
    if not self.sorted_orders:
        return expr
    # drop the empty entries first, then chain the remaining clauses
    pending = [
        clause for clause in self.sorted_orders.values() if clause is not None
    ]
    for clause in pending:
        expr = expr.order_by(clause)
    return expr

QuerySet

Bases: Generic[T]

Main class to perform database queries, exposed on each model as objects attribute.

Source code in ormar\queryset\queryset.py
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
class QuerySet(Generic[T]):
    """
    Main class to perform database queries, exposed on each model as objects attribute.
    """

    def __init__(  # noqa CFQ002
        self,
        model_cls: Optional[Type["T"]] = None,
        filter_clauses: List = None,
        exclude_clauses: List = None,
        select_related: List = None,
        limit_count: int = None,
        offset: int = None,
        excludable: "ExcludableItems" = None,
        order_bys: List = None,
        prefetch_related: List = None,
        limit_raw_sql: bool = False,
        proxy_source_model: Optional[Type["Model"]] = None,
    ) -> None:
        self.proxy_source_model = proxy_source_model
        self.model_cls = model_cls
        self.filter_clauses = [] if filter_clauses is None else filter_clauses
        self.exclude_clauses = [] if exclude_clauses is None else exclude_clauses
        self._select_related = [] if select_related is None else select_related
        self._prefetch_related = [] if prefetch_related is None else prefetch_related
        self.limit_count = limit_count
        self.query_offset = offset
        self._excludable = excludable or ormar.ExcludableItems()
        self.order_bys = order_bys or []
        self.limit_sql_raw = limit_raw_sql

    @property
    def model_meta(self) -> "ModelMeta":
        """
        Shortcut to model class Meta set on QuerySet model.

        :return: Meta class of the model
        :rtype: model Meta class
        """
        if not self.model_cls:  # pragma nocover
            raise ValueError("Model class of QuerySet is not initialized")
        return self.model_cls.Meta

    @property
    def model(self) -> Type["T"]:
        """
        Shortcut to model class set on QuerySet.

        :return: model class
        :rtype: Type[Model]
        """
        if not self.model_cls:  # pragma nocover
            raise ValueError("Model class of QuerySet is not initialized")
        return self.model_cls

    def rebuild_self(  # noqa: CFQ002
        self,
        filter_clauses: List = None,
        exclude_clauses: List = None,
        select_related: List = None,
        limit_count: int = None,
        offset: int = None,
        excludable: "ExcludableItems" = None,
        order_bys: List = None,
        prefetch_related: List = None,
        limit_raw_sql: bool = None,
        proxy_source_model: Optional[Type["Model"]] = None,
    ) -> "QuerySet":
        """
        Method that returns new instance of queryset based on passed params,
        all not passed params are taken from current values.
        """
        overwrites = {
            "select_related": "_select_related",
            "offset": "query_offset",
            "excludable": "_excludable",
            "prefetch_related": "_prefetch_related",
            "limit_raw_sql": "limit_sql_raw",
        }
        passed_args = locals()

        def replace_if_none(arg_name: str) -> Any:
            if passed_args.get(arg_name) is None:
                return getattr(self, overwrites.get(arg_name, arg_name))
            return passed_args.get(arg_name)

        return self.__class__(
            model_cls=self.model_cls,
            filter_clauses=replace_if_none("filter_clauses"),
            exclude_clauses=replace_if_none("exclude_clauses"),
            select_related=replace_if_none("select_related"),
            limit_count=replace_if_none("limit_count"),
            offset=replace_if_none("offset"),
            excludable=replace_if_none("excludable"),
            order_bys=replace_if_none("order_bys"),
            prefetch_related=replace_if_none("prefetch_related"),
            limit_raw_sql=replace_if_none("limit_raw_sql"),
            proxy_source_model=replace_if_none("proxy_source_model"),
        )

    async def _prefetch_related_models(
        self, models: List["T"], rows: List
    ) -> List["T"]:
        """
        Builds a PrefetchQuery from the QuerySet settings and runs its
        prefetch for the already loaded models.

        :param models: list of already parsed main Models from main query
        :type models: List[Model]
        :param rows: database rows from main query
        :type rows: List[sqlalchemy.engine.result.RowProxy]
        :return: list of models with prefetch models populated
        :rtype: List[Model]
        """
        prefetcher = PrefetchQuery(
            model_cls=self.model,
            excludable=self._excludable,
            prefetch_related=self._prefetch_related,
            select_related=self._select_related,
            orders_by=self.order_bys,
        )
        populated = await prefetcher.prefetch_related(models=models, rows=rows)
        return populated  # type: ignore

    async def _process_query_result_rows(self, rows: List) -> List["T"]:
        """
        Process database rows and initialize ormar Model from each of the rows.

        :param rows: list of database rows from query result
        :type rows: List[sqlalchemy.engine.result.RowProxy]
        :return: list of models
        :rtype: List[Model]
        """
        result_rows = []
        for row in rows:
            result_rows.append(
                self.model.from_row(
                    row=row,
                    select_related=self._select_related,
                    excludable=self._excludable,
                    source_model=self.model,
                    proxy_source_model=self.proxy_source_model,
                )
            )
            await asyncio.sleep(0)

        if result_rows:
            return self.model.merge_instances_list(result_rows)  # type: ignore
        return cast(List["T"], result_rows)

    def _resolve_filter_groups(
        self, groups: Any
    ) -> Tuple[List[FilterGroup], List[str]]:
        """
        Resolves filter groups to populate FilterAction params in group tree.

        :param groups: tuple of FilterGroups
        :type groups: Any
        :return: list of resolver groups
        :rtype: Tuple[List[FilterGroup], List[str]]
        """
        filter_groups = []
        select_related = self._select_related
        if groups:
            for group in groups:
                if not isinstance(group, FilterGroup):
                    raise QueryDefinitionError(
                        "Only ormar.and_ and ormar.or_ "
                        "can be passed as filter positional"
                        " arguments,"
                        "other values need to be passed by"
                        "keyword arguments"
                    )
                _, select_related = group.resolve(
                    model_cls=self.model,
                    select_related=self._select_related,
                    filter_clauses=self.filter_clauses,
                )
                filter_groups.append(group)
        return filter_groups, select_related

    @staticmethod
    def check_single_result_rows_count(rows: Sequence[Optional["T"]]) -> None:
        """
        Verifies if the result has one and only one row.

        :param rows: one element list of Models
        :type rows: List[Model]
        """
        if not rows or rows[0] is None:
            raise NoMatch()
        if len(rows) > 1:
            raise MultipleMatches()

    @property
    def database(self) -> databases.Database:
        """
        Shortcut to models database from Meta class.

        :return: database
        :rtype: databases.Database
        """
        return self.model_meta.database

    @property
    def table(self) -> sqlalchemy.Table:
        """
        Shortcut to models table from Meta class.

        :return: database table
        :rtype: sqlalchemy.Table
        """
        return self.model_meta.table

    def build_select_expression(
        self, limit: int = None, offset: int = None, order_bys: List = None
    ) -> sqlalchemy.sql.select:
        """
        Constructs the actual database query used in the QuerySet.
        If any of the params is not passed the QuerySet own value is used.

        An explicitly passed `limit` or `offset` of 0 is respected, only None
        falls back to the QuerySet value. For `order_bys` both None and an
        empty list fall back to the QuerySet ordering.

        :param limit: number to limit the query
        :type limit: int
        :param offset: number to offset by
        :type offset: int
        :param order_bys: list of order-by fields names
        :type order_bys: List
        :return: built sqlalchemy select expression
        :rtype: sqlalchemy.sql.selectable.Select
        """
        qry = Query(
            model_cls=self.model,
            select_related=self._select_related,
            filter_clauses=self.filter_clauses,
            exclude_clauses=self.exclude_clauses,
            # compared with None (like limit below) so that offset=0 is not
            # silently replaced by the QuerySet's own offset
            offset=offset if offset is not None else self.query_offset,
            excludable=self._excludable,
            order_bys=order_bys or self.order_bys,
            limit_raw_sql=self.limit_sql_raw,
            limit_count=limit if limit is not None else self.limit_count,
        )
        return qry.build_select_expression()

    def filter(  # noqa: A003
        self, *args: Any, _exclude: bool = False, **kwargs: Any
    ) -> "QuerySet[T]":
        """
        Narrows the query by any `Model` attribute/field, including fields
        reached across an FK relationship.

        The operand is selected with a suffix added to the field name:

        *  exact - `album__name__exact='Malibu'` (exact match)
        *  iexact - `album__name__iexact='malibu'` (exact match case insensitive)
        *  contains - `album__name__contains='Mal'` (sql like)
        *  icontains - `album__name__icontains='mal'` (sql like case insensitive)
        *  in - `album__name__in=['Malibu', 'Barclay']` (sql in)
        *  isnull - `album__name__isnull=True` (sql is null),
           `album__name__isnull=False` (sql is not null)
        *  gt - `position__gt=3` (sql >)
        *  gte - `position__gte=3` (sql >=)
        *  lt - `position__lt=3` (sql <)
        *  lte - `position__lte=3` (sql <=)
        *  startswith - `album__name__startswith='Mal'` (exact start match)
        *  istartswith - `album__name__istartswith='mal'` (case insensitive)
        *  endswith - `album__name__endswith='ibu'` (exact end match)
        *  iendswith - `album__name__iendswith='IBU'` (case insensitive)

        Python style filters are supported too - check the docs!

        :param args: filter groups (`ormar.and_` / `ormar.or_`)
        :type args: Any
        :param _exclude: flag if it should be exclude or filter
        :type _exclude: bool
        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: filtered QuerySet
        :rtype: QuerySet
        """
        groups, related = self._resolve_filter_groups(groups=args)
        clause_builder = QueryClause(
            model_cls=self.model,
            select_related=related,
            filter_clauses=self.filter_clauses,
        )
        resolved, related = clause_builder.prepare_filter(**kwargs)
        resolved = resolved + groups  # type: ignore

        if _exclude:
            # resolved clauses become the exclude part, filters stay untouched
            return self.rebuild_self(
                filter_clauses=self.filter_clauses,
                exclude_clauses=resolved,
                select_related=related,
            )
        return self.rebuild_self(
            filter_clauses=resolved,
            exclude_clauses=self.exclude_clauses,
            select_related=related,
        )

    def exclude(self, *args: Any, **kwargs: Any) -> "QuerySet[T]":  # noqa: A003
        """
        Works exactly the same as filter and all modifiers (suffixes) are the same,
        but returns a *not* condition.

        So if you use `filter(name='John')` which is `where name = 'John'` in SQL,
        the `exclude(name='John')` equals to `where name <> 'John'`

        Note that all conditions are joined so if you pass multiple values it
        becomes a union of conditions.

        `exclude(name='John', age>=35)` will become
        `where not (name='John' and age>=35)`

        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: filtered QuerySet
        :rtype: QuerySet
        """
        return self.filter(_exclude=True, *args, **kwargs)

    def select_related(self, related: Union[List, str, FieldAccessor]) -> "QuerySet[T]":
        """
        Marks related models to be loaded together with the main model.

        **With `select_related` always only one query is run against the database**,
        meaning that one (sometimes complicated) join is generated and later nested
        models are processed in python.

        Related models are selected by their `ForeignKey` names, nested
        relations are chained with double underscores between the names.

        The new names are added to the already selected ones, duplicates are
        dropped and the result is kept sorted.

        :param related: list of relation field names, can be linked by '__' to nest
        :type related: Union[List, str, FieldAccessor]
        :return: QuerySet
        :rtype: QuerySet
        """
        requested = related if isinstance(related, list) else [related]
        chains = [
            entry._access_chain if isinstance(entry, FieldAccessor) else entry
            for entry in requested
        ]
        combined = set(list(self._select_related) + chains)
        return self.rebuild_self(select_related=sorted(combined))

    def select_all(self, follow: bool = False) -> "QuerySet[T]":
        """
        By default adds only directly related models.

        If follow=True is set it adds also related models of related models.

        To not get stuck in an infinite loop as related models also keep a relation
        to parent model visited models set is kept.

        That way already visited models that are nested are loaded, but the load do not
        follow them inside. So Model A -> Model B -> Model C -> Model A -> Model X
        will load second Model A but will never follow into Model X.
        Nested relations of those kind need to be loaded manually.

        :param follow: flag to trigger deep save -
        by default only directly related models are saved
        with follow=True also related models of related models are saved
        :type follow: bool
        :return: reloaded Model
        :rtype: Model
        """
        relations = list(self.model.extract_related_names())
        if follow:
            relations = self.model._iterate_related_models()
        return self.rebuild_self(select_related=relations)

    def prefetch_related(
        self, related: Union[List, str, FieldAccessor]
    ) -> "QuerySet[T]":
        """
        Marks related models to be loaded during the query - but opposite to
        `select_related` each subsequent model is fetched in a separate database query.

        **With `prefetch_related` always one query per Model is run against the
        database**, meaning that you will have multiple queries executed one
        after another.

        Related models are selected by their `ForeignKey` names, nested
        relations are chained with double underscores between the names.

        The new names are added to the already selected ones and duplicates
        are dropped.

        :param related: list of relation field names, can be linked by '__' to nest
        :type related: Union[List, str, FieldAccessor]
        :return: QuerySet
        :rtype: QuerySet
        """
        requested = related if isinstance(related, list) else [related]
        chains = [
            entry._access_chain if isinstance(entry, FieldAccessor) else entry
            for entry in requested
        ]
        combined = set(list(self._prefetch_related) + chains)
        return self.rebuild_self(prefetch_related=list(combined))

    def fields(
        self, columns: Union[List, str, Set, Dict], _is_exclude: bool = False
    ) -> "QuerySet[T]":
        """
        Limits the data load to a subset of the model columns.

        `fields()` and `exclude_fields()` work for main models
        (on normal queries like `get`, `all` etc.) as well as for
        `select_related` and `prefetch_related` models (with nested notation).

        Columns can be passed as `str, List[str], Set[str] or dict` with
        nested definition. Related models columns use the notation
        `{related_name}__{column}[__{optional_next} etc.]`.

        `fields()` can be called several times, building up the columns to select.

        Related models included in `select_related()` without any columns
        specified in fields are loaded with all their fields.

        Mandatory fields cannot be excluded as it will raise `ValidationError`,
        to exclude a field it has to be nullable.

        Pk column cannot be excluded - it's always auto added even if
        not explicitly included.

        With dictionaries and sets:

        * a field is marked as included by using its name as key and
          ellipsis as value,
        * nested models are traversed with nested dictionaries,
        * at the last level a set can be used instead of a nested dictionary,
        * a whole nested model is included with the related field name
          and ellipsis.

        :param _is_exclude: flag if it's exclude or include operation
        :type _is_exclude: bool
        :param columns: columns to include
        :type columns: Union[List, str, Set, Dict]
        :return: QuerySet
        :rtype: QuerySet
        """
        # work on a new ExcludableItems built from the current one
        updated_items = ormar.ExcludableItems.from_excludable(self._excludable)
        updated_items.build(
            items=columns,
            model_cls=self.model_cls,  # type: ignore
            is_exclude=_is_exclude,
        )
        return self.rebuild_self(excludable=updated_items)

    def exclude_fields(self, columns: Union[List, str, Set, Dict]) -> "QuerySet[T]":
        """
        With `exclude_fields()` you can select subset of model columns that will
        be excluded to limit the data load.

        It's the opposite of `fields()` method so check documentation above
        to see what options are available.

        Especially check above how you can pass also nested dictionaries
        and sets as a mask to exclude fields from whole hierarchy.

        Note that `fields()` and `exclude_fields()` works both for main models
        (on normal queries like `get`, `all` etc.)
        as well as `select_related` and `prefetch_related` models
        (with nested notation).

        Mandatory fields cannot be excluded as it will raise `ValidationError`,
        to exclude a field it has to be nullable.

        Pk column cannot be excluded - it's always auto added even
        if explicitly excluded.

        :param columns: columns to exclude
        :type columns: Union[List, str, Set, Dict]
        :return: QuerySet
        :rtype: QuerySet
        """
        return self.fields(columns=columns, _is_exclude=True)

    def order_by(self, columns: Union[List, str, OrderAction]) -> "QuerySet[T]":
        """
        Orders the results from database by the chosen fields.

        A single field name or a list of field names can be passed, ordering
        in sql is applied in the order of the names provided.

        By default if you do not provide ordering `ormar` explicitly orders by
        all primary keys.

        If sorting by nested models causes the result rows to be unsorted by
        the main model, `ormar` combines those children rows into one main
        model - the main model will never duplicate in the result.

        * to order by main model field just provide a field name,
        * to sort on nested models separate field names with dunder '__',
        * to sort in descending order provide a hyphen in front of the field name.

        Sorting works across all relation types -> `ForeignKey`,
        reverse virtual FK and `ManyToMany` fields.

        Orders already present on the QuerySet are kept and are not added
        again.

        :param columns: columns by which models should be sorted
        :type columns: Union[List, str, OrderAction]
        :return: QuerySet
        :rtype: QuerySet
        """
        requested = columns if isinstance(columns, list) else [columns]

        actions = [
            entry
            if isinstance(entry, OrderAction)
            else OrderAction(order_str=entry, model_cls=self.model_cls)  # type: ignore
            for entry in requested
        ]

        current = self.order_bys
        added = [action for action in actions if action not in current]
        return self.rebuild_self(order_bys=current + added)

    async def values(
        self,
        fields: Union[List, str, Set, Dict] = None,
        exclude_through: bool = False,
        _as_dict: bool = True,
        _flatten: bool = False,
    ) -> List:
        """
        Return a list of dictionaries with column values in order of the fields
        passed or all fields from queried models.

        To filter for given row use filter/exclude methods before values,
        to limit number of rows use limit/offset or paginate before values.

        Note that it always return a list even for one row from database.

        :param exclude_through: flag if through models should be excluded
        :type exclude_through: bool
        :param _flatten: internal parameter to flatten one element tuples
        :type _flatten: bool
        :param _as_dict: internal parameter if return dict or tuples
        :type _as_dict: bool
        :param fields: field name or list of field names to extract from db
        :type fields:  Union[List, str, Set, Dict]
        """
        if fields:
            return await self.fields(columns=fields).values(
                _as_dict=_as_dict, _flatten=_flatten, exclude_through=exclude_through
            )
        expr = self.build_select_expression()
        rows = await self.database.fetch_all(expr)
        if not rows:
            return []
        alias_resolver = ReverseAliasResolver(
            select_related=self._select_related,
            excludable=self._excludable,
            model_cls=self.model_cls,  # type: ignore
            exclude_through=exclude_through,
        )
        column_map = alias_resolver.resolve_columns(
            columns_names=list(cast(LegacyRow, rows[0]).keys())
        )
        result = [
            {column_map.get(k): v for k, v in dict(x).items() if k in column_map}
            for x in rows
        ]
        if _as_dict:
            return result
        if _flatten and self._excludable.include_entry_count() != 1:
            raise QueryDefinitionError(
                "You cannot flatten values_list if more than one field is selected!"
            )
        tuple_result = [tuple(x.values()) for x in result]
        return tuple_result if not _flatten else [x[0] for x in tuple_result]

    async def values_list(
        self,
        fields: Union[List, str, Set, Dict] = None,
        flatten: bool = False,
        exclude_through: bool = False,
    ) -> List:
        """
        Return a list of tuples with column values in order of the fields passed or
        all fields from queried models.

        When one field is passed you can flatten the list of tuples into list of values
        of that single field.

        To filter for given row use filter/exclude methods before values,
        to limit number of rows use limit/offset or paginate before values.

        Note that it always return a list even for one row from database.

        :param exclude_through: flag if through models should be excluded
        :type exclude_through: bool
        :param fields: field name or list of field names to extract from db
        :type fields: Union[str, List[str]]
        :param flatten: when one field is passed you can flatten the list of tuples
        :type flatten: bool
        """
        return await self.values(
            fields=fields,
            exclude_through=exclude_through,
            _as_dict=False,
            _flatten=flatten,
        )

    async def exists(self) -> bool:
        """
        Checks if there are any rows matching the given criteria
        (applied with `filter` and `exclude` if set).

        :return: result of the check
        :rtype: bool
        """
        select_expr = self.build_select_expression()
        exists_query = sqlalchemy.exists(select_expr).select()
        return await self.database.fetch_val(exists_query)

    async def count(self, distinct: bool = True) -> int:
        """
        Counts rows matching the given criteria
        (applied with `filter` and `exclude` if set before).

        With `distinct` set to `True` (the default) the number of primary
        rows selected is returned. With `False` the total number of rows
        returned by the query is counted (including extra rows for
        `one-to-many` or `many-to-many` left `select_related` table joins).
        `False` is the legacy (buggy) behavior for workflows that depend on it.

        :param distinct: flag if the primary table rows should be distinct or not
        :type distinct: bool
        :return: number of rows
        :rtype: int
        """
        counted = self.build_select_expression().alias("subquery_for_count")
        count_query = sqlalchemy.func.count().select().select_from(counted)
        if distinct:
            # group by the pk column alias and count the resulting groups
            pk_alias = self.model.get_column_alias(self.model_meta.pkname)
            grouped = count_query.group_by(pk_alias).alias("subquery_for_group")
            count_query = sqlalchemy.func.count().select().select_from(grouped)
        return await self.database.fetch_val(count_query)

    async def _query_aggr_function(self, func_name: str, columns: List) -> Any:
        """
        Applies the sql function with the given name to each of the columns,
        selecting from the QuerySet select expression used as a subquery.

        :raises QueryDefinitionError: if sum or avg is requested and any of
        the columns is not numeric
        :param func_name: name of the function taken from sqlalchemy.func
        :type func_name: str
        :param columns: columns to which the function is applied
        :type columns: List
        :return: single value if the fetched row has one element, otherwise
        the fetched row converted to a dict
        :rtype: Any
        """
        func = getattr(sqlalchemy.func, func_name)
        select_actions = [
            SelectAction(select_str=column, model_cls=self.model) for column in columns
        ]
        if func_name in ["sum", "avg"] and any(
            not x.is_numeric for x in select_actions
        ):
            raise QueryDefinitionError(
                "You can use sum and avg only with numeric types of columns"
            )
        select_columns = [x.apply_func(func, use_label=True) for x in select_actions]
        expr = self.build_select_expression().alias(f"subquery_for_{func_name}")
        expr = sqlalchemy.select(select_columns).select_from(expr)
        result = await self.database.fetch_one(expr)
        return dict(result) if len(result) > 1 else result[0]  # type: ignore

    async def max(self, columns: Union[str, List[str]]) -> Any:  # noqa: A003
        """
        Returns max value of columns for rows matching the given criteria
        (applied with `filter` and `exclude` if set before).

        :return: max value of column(s)
        :rtype: Any
        """
        if not isinstance(columns, list):
            columns = [columns]
        return await self._query_aggr_function(func_name="max", columns=columns)

    async def min(self, columns: Union[str, List[str]]) -> Any:  # noqa: A003
        """
        Returns min value of columns for rows matching the given criteria
        (applied with `filter` and `exclude` if set before).

        :return: min value of column(s)
        :rtype: Any
        """
        if not isinstance(columns, list):
            columns = [columns]
        return await self._query_aggr_function(func_name="min", columns=columns)

    async def sum(self, columns: Union[str, List[str]]) -> Any:  # noqa: A003
        """
        Returns sum value of columns for rows matching the given criteria
        (applied with `filter` and `exclude` if set before).

        :return: sum value of columns
        :rtype: int
        """
        if not isinstance(columns, list):
            columns = [columns]
        return await self._query_aggr_function(func_name="sum", columns=columns)

    async def avg(self, columns: Union[str, List[str]]) -> Any:
        """
        Returns avg value of columns for rows matching the given criteria
        (applied with `filter` and `exclude` if set before).

        :return: avg value of columns
        :rtype: Union[int, float, List]
        """
        if not isinstance(columns, list):
            columns = [columns]
        return await self._query_aggr_function(func_name="avg", columns=columns)

    async def update(self, each: bool = False, **kwargs: Any) -> int:
        """
        Updates rows of the model table selected by the filter and exclude
        clauses of the QuerySet with the values passed in kwargs.

        The QuerySet has to be narrowed down with a filter first, otherwise
        each=True has to be passed explicitly to affect whole table.

        Only kwargs named like the model's own db fields or its relations are
        used, other keys are skipped.

        :raises QueryDefinitionError: if there is no filter and each is not set
        :param each: flag if whole table should be affected if no filter is passed
        :type each: bool
        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: number of updated rows
        :rtype: int
        """
        if not (each or self.filter_clauses or self.exclude_clauses):
            raise QueryDefinitionError(
                "You cannot update without filtering the queryset first. "
                "If you want to update all rows use update(each=True, **kwargs)"
            )

        model_cls = self.model
        allowed_names = model_cls.extract_db_own_fields().union(
            model_cls.extract_related_names()
        )
        new_values = {
            name: value for name, value in kwargs.items() if name in allowed_names
        }
        new_values = model_cls.validate_choices(new_values)
        new_values = model_cls.translate_columns_to_aliases(new_values)

        include_query = FilterQuery(filter_clauses=self.filter_clauses)
        statement = include_query.apply(self.table.update().values(**new_values))
        exclude_query = FilterQuery(filter_clauses=self.exclude_clauses, exclude=True)
        statement = exclude_query.apply(statement)
        return await self.database.execute(statement)

    async def delete(self, *args: Any, each: bool = False, **kwargs: Any) -> int:
        """
        Deletes from the model table after applying the filters from kwargs.

        You have to either pass a filter to narrow down a query or explicitly pass
        each=True flag to affect whole table.

        :param each: flag if whole table should be affected if no filter is passed
        :type each: bool
        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: number of deleted rows
        :rtype:int
        """
        if kwargs or args:
            return await self.filter(*args, **kwargs).delete()
        if not each and not (self.filter_clauses or self.exclude_clauses):
            raise QueryDefinitionError(
                "You cannot delete without filtering the queryset first. "
                "If you want to delete all rows use delete(each=True)"
            )
        expr = FilterQuery(filter_clauses=self.filter_clauses).apply(
            self.table.delete()
        )
        expr = FilterQuery(filter_clauses=self.exclude_clauses, exclude=True).apply(
            expr
        )
        return await self.database.execute(expr)

    def paginate(self, page: int, page_size: int = 20) -> "QuerySet[T]":
        """
        You can paginate the result which is a combination of offset and limit clauses.
        Limit is set to page size and offset is set to (page-1) * page_size.

        :param page_size: numbers of items per page
        :type page_size: int
        :param page: page number
        :type page: int
        :return: QuerySet
        :rtype: QuerySet
        """
        if page < 1 or page_size < 1:
            raise QueryDefinitionError("Page size and page have to be greater than 0.")

        limit_count = page_size
        query_offset = (page - 1) * page_size
        return self.rebuild_self(limit_count=limit_count, offset=query_offset)

    def limit(self, limit_count: int, limit_raw_sql: bool = None) -> "QuerySet[T]":
        """
        Limits the results to the desired number of parent models.

        Set `limit_raw_sql` to `True` to limit the actual number of database
        query rows instead of the number of main models. When the flag is left
        as `None` the value already stored on the queryset (`limit_sql_raw`)
        is kept.

        :param limit_count: number of models to limit
        :type limit_count: int
        :param limit_raw_sql: flag if raw sql should be limited
        :type limit_raw_sql: bool
        :return: QuerySet
        :rtype: QuerySet
        """
        if limit_raw_sql is None:
            # no explicit choice - reuse the flag currently set on the queryset
            limit_raw_sql = self.limit_sql_raw
        return self.rebuild_self(limit_count=limit_count, limit_raw_sql=limit_raw_sql)

    def offset(self, offset: int, limit_raw_sql: bool = None) -> "QuerySet[T]":
        """
        Offsets the results by the desired number of main models.

        Set `limit_raw_sql` to `True` to offset the actual number of database
        query rows instead of the number of main models. When the flag is left
        as `None` the value already stored on the queryset (`limit_sql_raw`)
        is kept.

        :param offset: number of models to offset
        :type offset: int
        :param limit_raw_sql: flag if raw sql should be offset
        :type limit_raw_sql: bool
        :return: QuerySet
        :rtype: QuerySet
        """
        if limit_raw_sql is None:
            # no explicit choice - reuse the flag currently set on the queryset
            limit_raw_sql = self.limit_sql_raw
        return self.rebuild_self(offset=offset, limit_raw_sql=limit_raw_sql)

    async def first(self, *args: Any, **kwargs: Any) -> "T":
        """
        Gets the first row from the db, by default ordered by primary key ascending.

        Passing args and/or kwargs is a shortcut for
        `filter(*args, **kwargs).first()`.

        The primary key ordering is prepended only when none of the order_bys
        already set on the queryset is flagged as `is_source_model_order`.

        :raises NoMatch: if no rows are returned
        :raises MultipleMatches: if more than 1 row is returned.
        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: returned model
        :rtype: Model
        """
        if args or kwargs:
            return await self.filter(*args, **kwargs).first()

        # default to ascending pk order unless the main model is already ordered
        pk_ordering = []
        if not any(order.is_source_model_order for order in self.order_bys):
            pk_ordering.append(
                OrderAction(
                    order_str=f"{self.model.Meta.pkname}",
                    model_cls=self.model_cls,  # type: ignore
                )
            )

        query = self.build_select_expression(
            limit=1, order_bys=pk_ordering + self.order_bys
        )
        raw_rows = await self.database.fetch_all(query)
        models = await self._process_query_result_rows(raw_rows)
        if self._prefetch_related and models:
            models = await self._prefetch_related_models(models, raw_rows)
        self.check_single_result_rows_count(models)
        return models[0]  # type: ignore

    async def get_or_none(self, *args: Any, **kwargs: Any) -> Optional["T"]:
        """
        Same as `get`, but returns `None` instead of raising when nothing matches.

        All args and kwargs are forwarded to `get(*args, **kwargs)`, so the
        same filtering rules apply. Only `ormar.NoMatch` is swallowed; any
        other exception raised by `get` propagates.

        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: returned model or None if there is no match
        :rtype: Optional[Model]
        """
        try:
            found = await self.get(*args, **kwargs)
        except ormar.NoMatch:
            found = None
        return found

    async def get(self, *args: Any, **kwargs: Any) -> "T":  # noqa: CCR001
        """
        Gets a single row from the db meeting the criteria set by kwargs.

        Passing args and/or kwargs is a shortcut for
        `filter(*args, **kwargs).get()`.

        When the queryset has no filter clauses the query is limited to one
        row ordered by primary key descending, so the last row by pk is
        returned. That pk ordering is skipped when one of the order_bys
        already set is flagged as `is_source_model_order`.

        :raises NoMatch: if no rows are returned
        :raises MultipleMatches: if more than 1 row is returned.
        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: returned model
        :rtype: Model
        """
        if args or kwargs:
            return await self.filter(*args, **kwargs).get()

        if self.filter_clauses:
            query = self.build_select_expression()
        else:
            # unfiltered query: pick the last row by pk unless ordering on the
            # main model was already requested
            pk_ordering = []
            if not any(order.is_source_model_order for order in self.order_bys):
                pk_ordering.append(
                    OrderAction(
                        order_str=f"-{self.model.Meta.pkname}",
                        model_cls=self.model_cls,  # type: ignore
                    )
                )
            query = self.build_select_expression(
                limit=1, order_bys=pk_ordering + self.order_bys
            )

        raw_rows = await self.database.fetch_all(query)
        models = await self._process_query_result_rows(raw_rows)
        if self._prefetch_related and models:
            models = await self._prefetch_related_models(models, raw_rows)
        self.check_single_result_rows_count(models)
        return models[0]  # type: ignore

    async def get_or_create(
        self,
        _defaults: Optional[Dict[str, Any]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> Tuple["T", bool]:
        """
        Combination of create and get methods.

        First calls `get(*args, **kwargs)`. If that raises `NoMatch`, a new
        row is created from kwargs merged with `_defaults` (values from
        `_defaults` win on duplicate keys).

        :param _defaults: default values for creating object
        :type _defaults: Optional[Dict[str, Any]]
        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: model instance and a flag telling if it was created
        :rtype: Tuple("T", bool)
        """
        try:
            existing = await self.get(*args, **kwargs)
        except NoMatch:
            # nothing found - build the new row from the lookup values + defaults
            creation_values = {**kwargs, **(_defaults or {})}
            return await self.create(**creation_values), True
        return existing, False

    async def update_or_create(self, **kwargs: Any) -> "T":
        """
        Updates the model identified by primary key, or creates a new one.

        A `pk` keyword is accepted as an alias of the real primary key field
        name. When no primary key value is passed (missing or `None`) a new
        row is created from kwargs. Otherwise the row is loaded with
        `get(pk=...)` and updated with kwargs.

        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: updated or created model
        :rtype: Model
        """
        pk_field = self.model_meta.pkname
        if "pk" in kwargs:
            # store the generic `pk` alias under the real primary key name
            kwargs[pk_field] = kwargs.pop("pk")

        pk_value = kwargs.get(pk_field)
        if pk_value is None:
            # no usable primary key, so there is nothing to look up
            return await self.create(**kwargs)

        existing = await self.get(pk=pk_value)
        return await existing.update(**kwargs)

    async def all(self, *args: Any, **kwargs: Any) -> List["T"]:  # noqa: A003
        """
        Returns all rows from the database for the model, honoring set filters.

        Passing args and/or kwargs is a shortcut for
        `filter(*args, **kwargs).all()`.

        An empty list is returned when no rows meet the criteria. Prefetch
        related models are loaded only when prefetching was requested and at
        least one row came back.

        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: list of returned models
        :rtype: List[Model]
        """
        if args or kwargs:
            return await self.filter(*args, **kwargs).all()

        query = self.build_select_expression()
        raw_rows = await self.database.fetch_all(query)
        models = await self._process_query_result_rows(raw_rows)
        if not (self._prefetch_related and models):
            return models
        return await self._prefetch_related_models(models, raw_rows)

    async def iterate(  # noqa: A003
        self,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncGenerator["T", None]:
        """
        Returns an async generator over all rows from the database for the model.

        Passing args and/or kwargs is a shortcut for
        `filter(*args, **kwargs).iterate()`.

        Consecutive database rows that share the same primary key value are
        buffered and processed together into one model, which is then yielded.
        Nothing is yielded when no rows meet the criteria.

        :raises QueryDefinitionError: if prefetch related was set on the queryset
        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: asynchronous iterable generator of returned models
        :rtype: AsyncGenerator[Model]
        """

        if self._prefetch_related:
            raise QueryDefinitionError(
                "Prefetch related queries are not supported in iterators"
            )

        if args or kwargs:
            async for model in self.filter(*args, **kwargs).iterate():
                yield model
            return

        query = self.build_select_expression()

        buffered: list = []
        previous_pk = None
        pk_column = self.model.get_column_alias(self.model_meta.pkname)

        async for record in self.database.iterate(query=query):
            record_pk = record[pk_column]
            same_model = previous_pk is None or previous_pk == record_pk
            if not same_model:
                # primary key changed: the buffered rows form a complete model
                yield (await self._process_query_result_rows(buffered))[0]
                buffered = []
            previous_pk = record_pk
            buffered.append(record)

        # emit the model built from whatever is left in the buffer
        if buffered:
            yield (await self._process_query_result_rows(buffered))[0]

    async def create(self, **kwargs: Any) -> "T":
        """
        Builds a model instance from kwargs, saves it and returns the saved model.

        The instance is constructed with `self.model(**kwargs)` and the value
        returned by its `save()` call is what gets returned (with pk populated
        if not passed and autoincrement is set).

        The allowed kwargs are `Model` fields names and proper value types.

        :param kwargs: fields names and proper value types
        :type kwargs: Any
        :return: created model
        :rtype: Model
        """
        new_model = self.model(**kwargs)
        return await new_model.save()

    async def bulk_create(self, objects: List["T"]) -> None:
        """
        Inserts many already initialized models with a single insert statement.

        Every object is converted with `prepare_model_to_save(obj.dict())`,
        all resulting rows are sent in one multi-row insert and afterwards
        each object is marked as saved.

        Bulk operations do not send signals.

        :raises ModelListEmptyError: if `objects` is empty
        :param objects: list of ormar models already initialized and ready to save.
        :type objects: List[Model]
        """

        if not objects:
            raise ModelListEmptyError("Bulk create objects are empty!")

        rows_to_insert = []
        for model in objects:
            row = model.prepare_model_to_save(model.dict())
            rows_to_insert.append(row)
            # hand control back to the event loop so long lists do not block it
            await asyncio.sleep(0)

        # one multi-row insert on purpose: execute_many in databases runs the
        # statement in a loop instead of using execute_many from drivers
        insert_statement = self.table.insert().values(rows_to_insert)
        await self.database.execute(insert_statement)

        for model in objects:
            model.set_save_status(True)

    async def bulk_update(  # noqa:  CCR001
        self, objects: List["T"], columns: List[str] = None
    ) -> None:
        """
        Performs bulk update in one database session to speed up the process.

        Allows you to update multiple instances at once.

        All `Models` passed need to have primary key column populated.

        You can also select which fields to update by passing `columns` list
        as a list of string names. The passed list is not modified. When it is
        empty or not given, all own db fields and relation fields are updated.

        No per-instance signals are sent. After the update a single
        `post_bulk_update` signal is sent with all updated instances.

        :raises ModelListEmptyError: if `objects` is empty
        :raises ModelPersistenceError: if any object has no primary key value
        :param objects: list of ormar models
        :type objects: List[Model]
        :param columns: list of columns to update
        :type columns: List[str]
        """
        if not objects:
            raise ModelListEmptyError("Bulk update objects are empty!")

        pk_name = self.model_meta.pkname
        if columns:
            # copy, so that adding the pk below never mutates the caller's list
            field_names = list(columns)
        else:
            field_names = list(
                self.model.extract_db_own_fields().union(
                    self.model.extract_related_names()
                )
            )

        # the pk is always needed as it is used in the where clause
        if pk_name not in field_names:
            field_names.append(pk_name)

        column_aliases = [self.model.get_column_alias(k) for k in field_names]
        wanted_aliases = set(column_aliases)

        ready_objects = []
        for obj in objects:
            new_kwargs = obj.dict()
            if new_kwargs.get(pk_name) is None:
                raise ModelPersistenceError(
                    "You cannot update unsaved objects. "
                    f"{self.model.__name__} has to have {pk_name} filled."
                )
            new_kwargs = obj.prepare_model_to_update(new_kwargs)
            # keys get a "new_" prefix to match the bind parameter names below
            ready_objects.append(
                {"new_" + k: v for k, v in new_kwargs.items() if k in wanted_aliases}
            )
            # hand control back to the event loop so long lists do not block it
            await asyncio.sleep(0)

        pk_column_name = self.model.get_column_alias(pk_name)
        pk_column = self.model_meta.table.c.get(pk_column_name)
        table_columns = {c.name for c in self.model_meta.table.c}
        expr = self.table.update().where(
            pk_column == bindparam("new_" + pk_column_name)
        )
        expr = expr.values(
            **{
                k: bindparam("new_" + k)
                for k in column_aliases
                if k != pk_column_name and k in table_columns
            }
        )
        # databases bind params only where query is passed as string
        # otherwise it just passes all data to values and results in unconsumed columns
        query = str(expr)
        await self.database.execute_many(query, ready_objects)

        for obj in objects:
            obj.set_save_status(True)

        await cast(Type["Model"], self.model_cls).Meta.signals.post_bulk_update.send(
            sender=self.model_cls, instances=objects  # type: ignore
        )

database: databases.Database property

Shortcut to models database from Meta class.

Returns:

Type Description
databases.Database

database

model: Type[T] property

Shortcut to model class set on QuerySet.

Returns:

Type Description
Type[Model]

model class

model_meta: ModelMeta property

Shortcut to model class Meta set on QuerySet model.

Returns:

Type Description
model Meta class

Meta class of the model

table: sqlalchemy.Table property

Shortcut to models table from Meta class.

Returns:

Type Description
sqlalchemy.Table

database table

all(*args, **kwargs) async

Returns all rows from a database for given model for set filter options.

Passing args and/or kwargs is a shortcut and equals to calling filter(*args, **kwargs).all().

If there are no rows meeting the criteria an empty list is returned.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
List[Model]

list of returned models

Source code in ormar\queryset\queryset.py
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
async def all(self, *args: Any, **kwargs: Any) -> List["T"]:  # noqa: A003
    """
    Returns all rows from the database for the model, honoring set filters.

    Passing args and/or kwargs is a shortcut for
    `filter(*args, **kwargs).all()`.

    An empty list is returned when no rows meet the criteria. Prefetch
    related models are loaded only when prefetching was requested and at
    least one row came back.

    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: list of returned models
    :rtype: List[Model]
    """
    if args or kwargs:
        return await self.filter(*args, **kwargs).all()

    query = self.build_select_expression()
    raw_rows = await self.database.fetch_all(query)
    models = await self._process_query_result_rows(raw_rows)
    if not (self._prefetch_related and models):
        return models
    return await self._prefetch_related_models(models, raw_rows)

avg(columns) async

Returns avg value of columns for rows matching the given criteria (applied with filter and exclude if set before).

Returns:

Type Description
Union[int, float, List]

avg value of columns

Source code in ormar\queryset\queryset.py
763
764
765
766
767
768
769
770
771
772
773
async def avg(self, columns: Union[str, List[str]]) -> Any:
    """
    Returns avg value of columns for rows matching the given criteria
    (applied with `filter` and `exclude` if set before).

    A single column name is wrapped into a one-element list before it is
    handed to `_query_aggr_function`.

    :param columns: column name or list of column names to aggregate
    :type columns: Union[str, List[str]]
    :return: avg value of columns
    :rtype: Union[int, float, List]
    """
    column_list = columns if isinstance(columns, list) else [columns]
    return await self._query_aggr_function(func_name="avg", columns=column_list)

build_select_expression(limit=None, offset=None, order_bys=None)

Constructs the actual database query used in the QuerySet. If any of the params is not passed the QuerySet own value is used.

Parameters:

Name Type Description Default
limit int

number to limit the query

None
offset int

number to offset by

None
order_bys List

list of order-by fields names

None

Returns:

Type Description
sqlalchemy.sql.selectable.Select

built sqlalchemy select expression

Source code in ormar\queryset\queryset.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def build_select_expression(
    self, limit: int = None, offset: int = None, order_bys: List = None
) -> sqlalchemy.sql.select:
    """
    Constructs the actual database query used in the QuerySet.
    If any of the params is not passed the QuerySet own value is used.

    `limit` and `offset` fall back to the QuerySet values only when they are
    `None`, so an explicit `0` is honored. `order_bys` falls back when it is
    `None` or empty.

    :param limit: number to limit the query
    :type limit: int
    :param offset: number to offset by
    :type offset: int
    :param order_bys: list of order-by fields names
    :type order_bys: List
    :return: built sqlalchemy select expression
    :rtype: sqlalchemy.sql.selectable.Select
    """
    qry = Query(
        model_cls=self.model,
        select_related=self._select_related,
        filter_clauses=self.filter_clauses,
        exclude_clauses=self.exclude_clauses,
        # `is not None` (not `or`) so that offset=0 overrides the stored offset
        offset=offset if offset is not None else self.query_offset,
        excludable=self._excludable,
        order_bys=order_bys or self.order_bys,
        limit_raw_sql=self.limit_sql_raw,
        limit_count=limit if limit is not None else self.limit_count,
    )
    return qry.build_select_expression()

bulk_create(objects) async

Performs a bulk create in one database session to speed up the process.

Allows you to create multiple objects at once.

A valid list of Model objects needs to be passed.

Bulk operations do not send signals.

Parameters:

Name Type Description Default
objects List[T]

list of ormar models already initialized and ready to save.

required
Source code in ormar\queryset\queryset.py
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
async def bulk_create(self, objects: List["T"]) -> None:
    """
    Inserts many already initialized models with a single insert statement.

    Every object is converted with `prepare_model_to_save(obj.dict())`,
    all resulting rows are sent in one multi-row insert and afterwards
    each object is marked as saved.

    Bulk operations do not send signals.

    :raises ModelListEmptyError: if `objects` is empty
    :param objects: list of ormar models already initialized and ready to save.
    :type objects: List[Model]
    """

    if not objects:
        raise ModelListEmptyError("Bulk create objects are empty!")

    rows_to_insert = []
    for model in objects:
        row = model.prepare_model_to_save(model.dict())
        rows_to_insert.append(row)
        # hand control back to the event loop so long lists do not block it
        await asyncio.sleep(0)

    # one multi-row insert on purpose: execute_many in databases runs the
    # statement in a loop instead of using execute_many from drivers
    insert_statement = self.table.insert().values(rows_to_insert)
    await self.database.execute(insert_statement)

    for model in objects:
        model.set_save_status(True)

bulk_update(objects, columns=None) async

Performs bulk update in one database session to speed up the process.

Allows you to update multiple instances at once.

All Models passed need to have primary key column populated.

You can also select which fields to update by passing columns list as a list of string names.

Bulk operations do not send per-instance signals; after the update a single post_bulk_update signal is sent with all updated instances.

Parameters:

Name Type Description Default
objects List[T]

list of ormar models

required
columns List[str]

list of columns to update

None
Source code in ormar\queryset\queryset.py
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
async def bulk_update(  # noqa:  CCR001
    self, objects: List["T"], columns: List[str] = None
) -> None:
    """
    Performs bulk update in one database session to speed up the process.

    Allows you to update multiple instances at once.

    All `Models` passed need to have primary key column populated.

    You can also select which fields to update by passing `columns` list
    as a list of string names. The passed list is not modified. When it is
    empty or not given, all own db fields and relation fields are updated.

    No per-instance signals are sent. After the update a single
    `post_bulk_update` signal is sent with all updated instances.

    :raises ModelListEmptyError: if `objects` is empty
    :raises ModelPersistenceError: if any object has no primary key value
    :param objects: list of ormar models
    :type objects: List[Model]
    :param columns: list of columns to update
    :type columns: List[str]
    """
    if not objects:
        raise ModelListEmptyError("Bulk update objects are empty!")

    pk_name = self.model_meta.pkname
    if columns:
        # copy, so that adding the pk below never mutates the caller's list
        field_names = list(columns)
    else:
        field_names = list(
            self.model.extract_db_own_fields().union(
                self.model.extract_related_names()
            )
        )

    # the pk is always needed as it is used in the where clause
    if pk_name not in field_names:
        field_names.append(pk_name)

    column_aliases = [self.model.get_column_alias(k) for k in field_names]
    wanted_aliases = set(column_aliases)

    ready_objects = []
    for obj in objects:
        new_kwargs = obj.dict()
        if new_kwargs.get(pk_name) is None:
            raise ModelPersistenceError(
                "You cannot update unsaved objects. "
                f"{self.model.__name__} has to have {pk_name} filled."
            )
        new_kwargs = obj.prepare_model_to_update(new_kwargs)
        # keys get a "new_" prefix to match the bind parameter names below
        ready_objects.append(
            {"new_" + k: v for k, v in new_kwargs.items() if k in wanted_aliases}
        )
        # hand control back to the event loop so long lists do not block it
        await asyncio.sleep(0)

    pk_column_name = self.model.get_column_alias(pk_name)
    pk_column = self.model_meta.table.c.get(pk_column_name)
    table_columns = {c.name for c in self.model_meta.table.c}
    expr = self.table.update().where(
        pk_column == bindparam("new_" + pk_column_name)
    )
    expr = expr.values(
        **{
            k: bindparam("new_" + k)
            for k in column_aliases
            if k != pk_column_name and k in table_columns
        }
    )
    # databases bind params only where query is passed as string
    # otherwise it just passes all data to values and results in unconsumed columns
    query = str(expr)
    await self.database.execute_many(query, ready_objects)

    for obj in objects:
        obj.set_save_status(True)

    await cast(Type["Model"], self.model_cls).Meta.signals.post_bulk_update.send(
        sender=self.model_cls, instances=objects  # type: ignore
    )

check_single_result_rows_count(rows) staticmethod

Verifies if the result has one and only one row.

Parameters:

Name Type Description Default
rows Sequence[Optional[T]]

one element list of Models

required
Source code in ormar\queryset\queryset.py
234
235
236
237
238
239
240
241
242
243
244
245
@staticmethod
def check_single_result_rows_count(rows: Sequence[Optional["T"]]) -> None:
    """
    Makes sure the result consists of exactly one, non-empty row.

    :raises NoMatch: if `rows` is empty or its first element is None
    :raises MultipleMatches: if `rows` has more than one element
    :param rows: one element list of Models
    :type rows: List[Model]
    """
    has_match = bool(rows) and rows[0] is not None
    if not has_match:
        raise NoMatch()
    if len(rows) > 1:
        raise MultipleMatches()

count(distinct=True) async

Returns number of rows matching the given criteria (applied with filter and exclude if set before). If distinct is True (the default), this will return the number of primary rows selected. If False, the count will be the total number of rows returned (including extra rows for one-to-many or many-to-many left select_related table joins). False is the legacy (buggy) behavior for workflows that depend on it.

Parameters:

Name Type Description Default
distinct bool

flag if the primary table rows should be distinct or not

True

Returns:

Type Description
int

number of rows

Source code in ormar\queryset\queryset.py
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
async def count(self, distinct: bool = True) -> int:
    """
    Returns number of rows matching the given criteria
    (applied with `filter` and `exclude` if set before).

    With `distinct=True` (the default) the rows are grouped by the primary
    key column and the groups are counted, which gives the number of primary
    rows selected. With `distinct=False` the total number of rows returned
    is counted (including extra rows for `one-to-many` or `many-to-many`
    left `select_related` table joins).
    `False` is the legacy (buggy) behavior for workflows that depend on it.

    :param distinct: flag if the primary table rows should be distinct or not
    :type distinct: bool
    :return: number of rows
    :rtype: int
    """
    selected_rows = self.build_select_expression().alias("subquery_for_count")
    count_query = sqlalchemy.func.count().select().select_from(selected_rows)
    if distinct:
        pk_alias = self.model.get_column_alias(self.model_meta.pkname)
        # one group per primary key, then count the groups
        per_pk = count_query.group_by(pk_alias).alias("subquery_for_group")
        count_query = sqlalchemy.func.count().select().select_from(per_pk)
    return await self.database.fetch_val(count_query)

create(**kwargs) async

Creates the model instance, saves it in a database and returns the updated model (with pk populated if not passed and autoincrement is set).

The allowed kwargs are Model fields names and proper value types.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
Model

created model

Source code in ormar\queryset\queryset.py
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
async def create(self, **kwargs: Any) -> "T":
    """
    Builds a model instance from kwargs, saves it and returns the saved model.

    The instance is constructed with `self.model(**kwargs)` and the value
    returned by its `save()` call is what gets returned (with pk populated
    if not passed and autoincrement is set).

    The allowed kwargs are `Model` fields names and proper value types.

    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: created model
    :rtype: Model
    """
    new_model = self.model(**kwargs)
    return await new_model.save()

delete(*args, each=False, **kwargs) async

Deletes from the model table after applying the filters from kwargs.

You have to either pass a filter to narrow down a query or explicitly pass each=True flag to affect whole table.

Parameters:

Name Type Description Default
each bool

flag if whole table should be affected if no filter is passed

False
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
int

number of deleted rows

Source code in ormar\queryset\queryset.py
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
async def delete(self, *args: Any, each: bool = False, **kwargs: Any) -> int:
    """
    Deletes rows from the model table, honoring filter and exclude clauses.

    Passing args and/or kwargs is a shortcut for
    `filter(*args, **kwargs).delete()`.

    You have to either narrow down the query with a filter/exclude or
    explicitly pass each=True flag to affect the whole table.

    :raises QueryDefinitionError: if no clauses are set and `each` is False
    :param each: flag if whole table should be affected if no filter is passed
    :type each: bool
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: number of deleted rows
    :rtype:int
    """
    if args or kwargs:
        return await self.filter(*args, **kwargs).delete()

    # guard against wiping the whole table by accident
    if not (each or self.filter_clauses or self.exclude_clauses):
        raise QueryDefinitionError(
            "You cannot delete without filtering the queryset first. "
            "If you want to delete all rows use delete(each=True)"
        )

    include_filter = FilterQuery(filter_clauses=self.filter_clauses)
    statement = include_filter.apply(self.table.delete())
    exclude_filter = FilterQuery(filter_clauses=self.exclude_clauses, exclude=True)
    statement = exclude_filter.apply(statement)
    return await self.database.execute(statement)

exclude(*args, **kwargs)

Works exactly the same as filter and all modifiers (suffixes) are the same, but returns a not condition.

So if you use filter(name='John') which is where name = 'John' in SQL, the exclude(name='John') equals to where name <> 'John'

Note that all conditions are joined so if you pass multiple values it becomes a union of conditions.

exclude(name='John', age__gte=35) will become where not (name='John' and age>=35)

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
QuerySet

filtered QuerySet

Source code in ormar\queryset\queryset.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def exclude(self, *args: Any, **kwargs: Any) -> "QuerySet[T]":  # noqa: A003
    """
    Negated counterpart of `filter`: accepts the same arguments and modifiers
    (suffixes), but the resulting condition is wrapped in a *not*.

    It simply calls `filter` with the `_exclude=True` flag.

    So if `filter(name='John')` is `where name = 'John'` in SQL,
    then `exclude(name='John')` equals to `where name <> 'John'`.

    All conditions passed in one call are joined together before negating:
    `exclude(name='John', age__gte=35)` will become
    `where not (name='John' and age>=35)`

    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: filtered QuerySet
    :rtype: QuerySet
    """
    return self.filter(*args, _exclude=True, **kwargs)

exclude_fields(columns)

With exclude_fields() you can select subset of model columns that will be excluded to limit the data load.

It's the opposite of fields() method so check documentation above to see what options are available.

Especially check above how you can pass also nested dictionaries and sets as a mask to exclude fields from whole hierarchy.

Note that fields() and exclude_fields() work both for main models (on normal queries like get, all etc.) as well as select_related and prefetch_related models (with nested notation).

Mandatory fields cannot be excluded as it will raise ValidationError, to exclude a field it has to be nullable.

Pk column cannot be excluded - it's always auto added even if explicitly excluded.

Parameters:

Name Type Description Default
columns Union[List, str, Set, Dict]

columns to exclude

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
def exclude_fields(self, columns: Union[List, str, Set, Dict]) -> "QuerySet[T]":
    """
    Selects a subset of model columns that should be left out of the query
    to limit the data load.

    This is the opposite of `fields()` and is implemented as a call to
    `fields()` with the `_is_exclude=True` flag, so the same ways of passing
    columns apply (including nested dictionaries and sets used as a mask
    to exclude fields from the whole hierarchy).

    Note that `fields()` and `exclude_fields()` work both for main models
    (on normal queries like `get`, `all` etc.)
    as well as `select_related` and `prefetch_related` models
    (with nested notation).

    Mandatory fields cannot be excluded as it will raise `ValidationError`,
    to exclude a field it has to be nullable.

    Pk column cannot be excluded - it's always auto added even
    if explicitly excluded.

    :param columns: columns to exclude
    :type columns: Union[List, str, Set, Dict]
    :return: QuerySet
    :rtype: QuerySet
    """
    return self.fields(_is_exclude=True, columns=columns)

exists() async

Returns a bool value to confirm if there are rows matching the given criteria (applied with filter and exclude if set).

Returns:

Type Description
bool

result of the check

Source code in ormar\queryset\queryset.py
674
675
676
677
678
679
680
681
682
683
684
async def exists(self) -> bool:
    """
    Check whether any row matches the current criteria
    (those applied with `filter` and `exclude`, if any).

    The current select expression is wrapped in a sql `EXISTS` and the single
    resulting value is fetched from the database.

    :return: result of the check
    :rtype: bool
    """
    select_expr = self.build_select_expression()
    exists_query = sqlalchemy.exists(select_expr).select()
    return await self.database.fetch_val(exists_query)

fields(columns, _is_exclude=False)

With fields() you can select subset of model columns to limit the data load.

Note that fields() and exclude_fields() work both for main models (on normal queries like get, all etc.) as well as for select_related and prefetch_related models (with nested notation).

You can select specified fields by passing a str, List[str], Set[str] or dict with nested definition.

To include related models use notation {related_name}__{column}[__{optional_next} etc.].

fields() can be called several times, building up the columns to select.

If you include related models into select_related() call but you won't specify columns for those models in fields - implies a list of all fields for those nested models.

Mandatory fields cannot be excluded as it will raise ValidationError, to exclude a field it has to be nullable.

Pk column cannot be excluded - it's always auto added even if not explicitly included.

You can also pass fields to include as dictionary or set.

To mark a field as included in a dictionary use its name as key and ellipsis as value.

To traverse nested models use nested dictionaries.

To include fields at last level instead of nested dictionary a set can be used.

To include whole nested model specify model related field name and ellipsis.

Parameters:

Name Type Description Default
_is_exclude bool

flag if it's exclude or include operation

False
columns Union[List, str, Set, Dict]

columns to include

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
def fields(
    self, columns: Union[List, str, Set, Dict], _is_exclude: bool = False
) -> "QuerySet[T]":
    """
    Select a subset of model columns in order to limit the data load.

    `fields()` and `exclude_fields()` work for main models
    (on normal queries like `get`, `all` etc.) and also for
    `select_related` and `prefetch_related` models (with nested notation).

    The columns can be given as a `str, List[str], Set[str] or dict`
    with nested definition.

    Related models are addressed with the notation
    `{related_name}__{column}[__{optional_next} etc.]`.

    `fields()` can be called several times, building up the columns to select.

    When a related model is part of a `select_related()` call but no columns
    are listed for it here, all fields of that nested model are implied.

    A mandatory field cannot be excluded, as that raises `ValidationError`;
    only nullable fields can be left out.

    The pk column is always added, even if it is not explicitly included.

    Dictionary / set form:

    *  mark a field as included by using its name as key and ellipsis as value,
    *  traverse nested models with nested dictionaries,
    *  at the last level a set can be used instead of a nested dictionary,
    *  include a whole nested model with its relation field name and ellipsis.

    :param _is_exclude: flag if it's exclude or include operation
    :type _is_exclude: bool
    :param columns: columns to include
    :type columns: Union[List, str, Set, Dict]
    :return: QuerySet
    :rtype: QuerySet
    """
    # Start from the current include/exclude definition and add the new
    # items to it; the result is handed to the rebuilt queryset.
    updated_items = ormar.ExcludableItems.from_excludable(self._excludable)
    updated_items.build(
        items=columns,
        model_cls=self.model_cls,  # type: ignore
        is_exclude=_is_exclude,
    )
    return self.rebuild_self(excludable=updated_items)

filter(*args, _exclude=False, **kwargs)

Allows you to filter by any Model attribute/field as well as to fetch instances, with a filter across an FK relationship.

You can use special filter suffix to change the filter operands:

  • exact - like album__name__exact='Malibu' (exact match)
  • iexact - like album__name__iexact='malibu' (exact match case insensitive)
  • contains - like album__name__contains='Mal' (sql like)
  • icontains - like album__name__icontains='mal' (sql like case insensitive)
  • in - like album__name__in=['Malibu', 'Barclay'] (sql in)
  • isnull - like album__name__isnull=True (sql is null) (isnotnull album__name__isnull=False (sql is not null))
  • gt - like position__gt=3 (sql >)
  • gte - like position__gte=3 (sql >=)
  • lt - like position__lt=3 (sql <)
  • lte - like position__lte=3 (sql <=)
  • startswith - like album__name__startswith='Mal' (exact start match)
  • istartswith - like album__name__istartswith='mal' (case insensitive)
  • endswith - like album__name__endswith='ibu' (exact end match)
  • iendswith - like album__name__iendswith='IBU' (case insensitive)

Note that you can also use python style filters - check the docs!

Parameters:

Name Type Description Default
_exclude bool

flag if it should be exclude or filter

False
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
QuerySet

filtered QuerySet

Source code in ormar\queryset\queryset.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def filter(  # noqa: A003
    self, *args: Any, _exclude: bool = False, **kwargs: Any
) -> "QuerySet[T]":
    """
    Narrow the query by any `Model` attribute/field, including fields reached
    across an FK relationship.

    A suffix on the keyword changes the comparison that is used:

    *  exact - like `album__name__exact='Malibu'` (exact match)
    *  iexact - like `album__name__iexact='malibu'` (exact match case insensitive)
    *  contains - like `album__name__contains='Mal'` (sql like)
    *  icontains - like `album__name__icontains='mal'` (sql like case insensitive)
    *  in - like `album__name__in=['Malibu', 'Barclay']` (sql in)
    *  isnull - like `album__name__isnull=True` (sql is null)
       (isnotnull `album__name__isnull=False` (sql is not null))
    *  gt - like `position__gt=3` (sql >)
    *  gte - like `position__gte=3` (sql >=)
    *  lt - like `position__lt=3` (sql <)
    *  lte - like `position__lte=3` (sql <=)
    *  startswith - like `album__name__startswith='Mal'` (exact start match)
    *  istartswith - like `album__name__istartswith='mal'` (case insensitive)
    *  endswith - like `album__name__endswith='ibu'` (exact end match)
    *  iendswith - like `album__name__iendswith='IBU'` (case insensitive)

    Python style filters can be used as well - check the docs!

    :param _exclude: flag if it should be exclude or filter
    :type _exclude: bool
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: filtered QuerySet
    :rtype: QuerySet
    """
    # Positional arguments are resolved into filter groups first; the
    # relations found there are handed to the clause builder for kwargs.
    group_clauses, related = self._resolve_filter_groups(groups=args)
    clause_builder = QueryClause(
        model_cls=self.model,
        select_related=related,
        filter_clauses=self.filter_clauses,
    )
    kwarg_clauses, related = clause_builder.prepare_filter(**kwargs)
    combined_clauses = kwarg_clauses + group_clauses  # type: ignore

    if _exclude:
        # exclude mode: the combined clauses become the exclude clauses and
        # the current filter clauses are kept as they are
        return self.rebuild_self(
            filter_clauses=self.filter_clauses,
            exclude_clauses=combined_clauses,
            select_related=related,
        )

    # filter mode: the combined clauses become the filter clauses and the
    # current exclude clauses are kept as they are
    return self.rebuild_self(
        filter_clauses=combined_clauses,
        exclude_clauses=self.exclude_clauses,
        select_related=related,
    )

first(*args, **kwargs) async

Gets the first row from the db ordered by primary key column ascending.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
Model

returned model

Raises:

Type Description
NoMatch

if no rows are returned

MultipleMatches

if more than 1 row is returned.

Source code in ormar\queryset\queryset.py
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
async def first(self, *args: Any, **kwargs: Any) -> "T":
    """
    Fetch the first row from the db, ordered by primary key column ascending.

    When args or kwargs are given they are applied with `filter()` first and
    `first()` is then called on the filtered queryset.

    The ascending pk ordering is put in front of the existing orderings,
    unless one of them already orders by the source model.

    :raises NoMatch: if no rows are returned
    :raises MultipleMatches: if more than 1 row is returned.
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: returned model
    :rtype: Model
    """
    if args or kwargs:
        return await self.filter(*args, **kwargs).first()

    source_order_flags = [order.is_source_model_order for order in self.order_bys]
    if any(source_order_flags):
        pk_ordering = []
    else:
        pk_ordering = [
            OrderAction(
                order_str=f"{self.model.Meta.pkname}",
                model_cls=self.model_cls,  # type: ignore
            )
        ]

    expr = self.build_select_expression(
        limit=1,
        order_bys=pk_ordering + self.order_bys,
    )
    raw_rows = await self.database.fetch_all(expr)
    models = await self._process_query_result_rows(raw_rows)
    # prefetching is only worth running when there is something to attach to
    if self._prefetch_related and models:
        models = await self._prefetch_related_models(models, raw_rows)
    self.check_single_result_rows_count(models)
    return models[0]  # type: ignore

get(*args, **kwargs) async

Gets the first row from the db meeting the criteria set by kwargs.

If no criteria set it will return the last row in db sorted by pk.

Passing criteria is actually calling the filter(*args, **kwargs) method described below.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
Model

returned model

Raises:

Type Description
NoMatch

if no rows are returned

MultipleMatches

if more than 1 row is returned.

Source code in ormar\queryset\queryset.py
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
async def get(self, *args: Any, **kwargs: Any) -> "T":  # noqa: CCR001
    """
    Fetch the single row from the db that meets the criteria set by kwargs.

    When no filter clauses are set, the query is limited to one row and
    ordered by pk descending, so the last row in db sorted by pk is returned
    (the pk ordering is skipped if an existing ordering already orders by
    the source model).

    Passing criteria is the same as calling filter(*args, **kwargs) first and
    `get()` on the result.

    :raises NoMatch: if no rows are returned
    :raises MultipleMatches: if more than 1 row is returned.
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: returned model
    :rtype: Model
    """
    if args or kwargs:
        return await self.filter(*args, **kwargs).get()

    if self.filter_clauses:
        # with filters in place the query is not limited
        expr = self.build_select_expression()
    else:
        source_order_flags = [
            order.is_source_model_order for order in self.order_bys
        ]
        if any(source_order_flags):
            pk_ordering = []
        else:
            pk_ordering = [
                OrderAction(
                    order_str=f"-{self.model.Meta.pkname}",
                    model_cls=self.model_cls,  # type: ignore
                )
            ]
        expr = self.build_select_expression(
            limit=1,
            order_bys=pk_ordering + self.order_bys,
        )

    raw_rows = await self.database.fetch_all(expr)
    models = await self._process_query_result_rows(raw_rows)
    if self._prefetch_related and models:
        models = await self._prefetch_related_models(models, raw_rows)
    self.check_single_result_rows_count(models)
    return models[0]  # type: ignore

get_or_create(_defaults=None, *args, **kwargs) async

Combination of create and get methods.

Tries to get a row meeting the criteria for kwargs and if NoMatch exception is raised it creates a new one with given kwargs and _defaults.

Passing criteria is actually calling the filter(*args, **kwargs) method described below.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}
_defaults Optional[Dict[str, Any]]

default values for creating object

None

Returns:

Type Description
Tuple("T", bool)

model instance and a boolean

Source code in ormar\queryset\queryset.py
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
async def get_or_create(
    self,
    _defaults: Optional[Dict[str, Any]] = None,
    *args: Any,
    **kwargs: Any,
) -> Tuple["T", bool]:
    """
    Combination of create and get methods.

    First `get(*args, **kwargs)` is attempted. If it raises `NoMatch`,
    a new row is created from kwargs merged with `_defaults`
    (values from `_defaults` win when a key appears in both).

    Passing criteria is the same as calling filter(*args, **kwargs) first.

    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :param _defaults: default values for creating object
    :type _defaults: Optional[Dict[str, Any]]
    :return: model instance and a boolean, True when the row was created
    :rtype: Tuple("T", bool)
    """
    try:
        found = await self.get(*args, **kwargs)
    except NoMatch:
        create_values = {**kwargs, **(_defaults or {})}
        created = await self.create(**create_values)
        return created, True
    return found, False

get_or_none(*args, **kwargs) async

Gets the first row from the db meeting the criteria set by kwargs.

If no criteria set it will return the last row in db sorted by pk.

Passing criteria is actually calling the filter(*args, **kwargs) method described below.

If no match is found, None will be returned.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
Model

returned model

Source code in ormar\queryset\queryset.py
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
async def get_or_none(self, *args: Any, **kwargs: Any) -> Optional["T"]:
    """
    Same as `get(*args, **kwargs)`, except that a missing row is reported
    with `None` instead of an exception.

    The call is delegated to `get()`; if it raises `ormar.NoMatch`
    then `None` is returned. Any other exception is not handled here.

    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: returned model or None if no match is found
    :rtype: Model
    """
    try:
        found = await self.get(*args, **kwargs)
    except ormar.NoMatch:
        return None
    return found

iterate(*args, **kwargs) async

Return async iterable generator for all rows from a database for given model.

Passing args and/or kwargs is a shortcut and equals to calling filter(*args, **kwargs).iterate().

If there are no rows meeting the criteria an empty async generator is returned.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
AsyncGenerator[Model]

asynchronous iterable generator of returned models

Source code in ormar\queryset\queryset.py
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
async def iterate(  # noqa: A003
    self,
    *args: Any,
    **kwargs: Any,
) -> AsyncGenerator["T", None]:
    """
    Asynchronously yield models for the rows returned from the database.

    Passing args and/or kwargs is a shortcut and equals to calling
    `filter(*args, **kwargs).iterate()`.

    If there are no rows meeting the criteria nothing is yielded.

    Consecutive database rows that share the same primary key value are
    collected and processed together, and the first processed model is
    yielded for every such run of rows.

    :raises QueryDefinitionError: if prefetch related was set on the queryset
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: asynchronous iterable generator of returned models
    :rtype: AsyncGenerator[Model]
    """

    if self._prefetch_related:
        raise QueryDefinitionError(
            "Prefetch related queries are not supported in iterators"
        )

    if args or kwargs:
        async for result in self.filter(*args, **kwargs).iterate():
            yield result
        return

    expr = self.build_select_expression()

    pending_rows: list = []
    previous_pk = None
    pk_alias = self.model.get_column_alias(self.model_meta.pkname)

    async for row in self.database.iterate(query=expr):
        row_pk = row[pk_alias]
        # a change of primary key closes the run collected so far
        if previous_pk is not None and previous_pk != row_pk:
            yield (await self._process_query_result_rows(pending_rows))[0]
            pending_rows = []
        previous_pk = row_pk
        pending_rows.append(row)

    # the last run is still pending once the database iterator is exhausted
    if pending_rows:
        yield (await self._process_query_result_rows(pending_rows))[0]

limit(limit_count, limit_raw_sql=None)

You can limit the results to desired number of parent models.

To limit the actual number of database query rows instead of number of main models use the limit_raw_sql parameter flag, and set it to True.

Parameters:

Name Type Description Default
limit_raw_sql bool

flag if raw sql should be limited

None
limit_count int

number of models to limit

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
def limit(self, limit_count: int, limit_raw_sql: bool = None) -> "QuerySet[T]":
    """
    Limit the results to the desired number of parent models.

    To limit the actual number of database query rows instead of number of main
    models use the `limit_raw_sql` parameter flag, and set it to `True`.
    When the flag is left as `None` the current setting of the queryset
    is kept.

    :param limit_raw_sql: flag if raw sql should be limited
    :type limit_raw_sql: bool
    :param limit_count: number of models to limit
    :type limit_count: int
    :return: QuerySet
    :rtype: QuerySet
    """
    if limit_raw_sql is None:
        limit_raw_sql = self.limit_sql_raw
    return self.rebuild_self(limit_count=limit_count, limit_raw_sql=limit_raw_sql)

max(columns) async

Returns max value of columns for rows matching the given criteria (applied with filter and exclude if set before).

Returns:

Type Description
Any

max value of column(s)

Source code in ormar\queryset\queryset.py
727
728
729
730
731
732
733
734
735
736
737
async def max(self, columns: Union[str, List[str]]) -> Any:  # noqa: A003
    """
    Return the max value of columns for rows matching the given criteria
    (applied with `filter` and `exclude` if set before).

    :param columns: column name or list of column names; a single name
    is wrapped into a one element list
    :type columns: Union[str, List[str]]
    :return: max value of column(s)
    :rtype: Any
    """
    column_list = columns if isinstance(columns, list) else [columns]
    return await self._query_aggr_function(func_name="max", columns=column_list)

min(columns) async

Returns min value of columns for rows matching the given criteria (applied with filter and exclude if set before).

Returns:

Type Description
Any

min value of column(s)

Source code in ormar\queryset\queryset.py
739
740
741
742
743
744
745
746
747
748
749
async def min(self, columns: Union[str, List[str]]) -> Any:  # noqa: A003
    """
    Return the min value of columns for rows matching the given criteria
    (applied with `filter` and `exclude` if set before).

    :param columns: column name or list of column names; a single name
    is wrapped into a one element list
    :type columns: Union[str, List[str]]
    :return: min value of column(s)
    :rtype: Any
    """
    column_list = columns if isinstance(columns, list) else [columns]
    return await self._query_aggr_function(func_name="min", columns=column_list)

offset(offset, limit_raw_sql=None)

You can also offset the results by desired number of main models.

To offset the actual number of database query rows instead of number of main models use the limit_raw_sql parameter flag, and set it to True.

Parameters:

Name Type Description Default
limit_raw_sql bool

flag if raw sql should be offset

None
offset int

numbers of models to offset

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
def offset(self, offset: int, limit_raw_sql: bool = None) -> "QuerySet[T]":
    """
    Offset the results by the desired number of main models.

    To offset the actual number of database query rows instead of number of main
    models use the `limit_raw_sql` parameter flag, and set it to `True`.
    When the flag is left as `None` the current setting of the queryset
    is kept.

    :param limit_raw_sql: flag if raw sql should be offset
    :type limit_raw_sql: bool
    :param offset: numbers of models to offset
    :type offset: int
    :return: QuerySet
    :rtype: QuerySet
    """
    if limit_raw_sql is None:
        limit_raw_sql = self.limit_sql_raw
    return self.rebuild_self(offset=offset, limit_raw_sql=limit_raw_sql)

order_by(columns)

With order_by() you can order the results from database based on your choice of fields.

You can provide a string with field name or list of strings with fields names.

Ordering in sql will be applied in order of names you provide in order_by.

By default if you do not provide ordering ormar explicitly orders by all primary keys

If sorting by nested models leaves the result rows unsorted by the main model, ormar will still combine those child rows into one main model.

The main model will never duplicate in the result

To order by main model field just provide a field name

To sort on nested models separate field names with dunder '__'.

You can sort this way across all relation types -> ForeignKey, reverse virtual FK and ManyToMany fields.

To sort in descending order provide a hyphen in front of the field name

Parameters:

Name Type Description Default
columns Union[List, str, OrderAction]

columns by which models should be sorted

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
def order_by(self, columns: Union[List, str, OrderAction]) -> "QuerySet[T]":
    """
    Order the results from database by the fields of your choice.

    A single field name, a single `OrderAction` or a list of those can be
    passed. Plain names are converted into `OrderAction` instances for the
    queryset model, `OrderAction` instances are used as they are.

    Ordering in sql will be applied in order of names you provide in order_by.
    New orderings are appended after the ones already set on the queryset,
    and an ordering that is already present there is not added again.

    By default if you do not provide ordering `ormar` explicitly orders by
    all primary keys.

    If sorting by nested models leaves the result rows unsorted by the main
    model, `ormar` will still combine those children rows into one main model.
    The main model will never duplicate in the result.

    To order by main model field just provide a field name.

    To sort on nested models separate field names with dunder '__'.
    You can sort this way across all relation types -> `ForeignKey`,
    reverse virtual FK and `ManyToMany` fields.

    To sort in descending order provide a hyphen in front of the field name.

    :param columns: columns by which models should be sorted
    :type columns: Union[List, str, OrderAction]
    :return: QuerySet
    :rtype: QuerySet
    """
    requested = columns if isinstance(columns, list) else [columns]

    new_actions = []
    for item in requested:
        if isinstance(item, OrderAction):
            new_actions.append(item)
        else:
            new_actions.append(
                OrderAction(order_str=item, model_cls=self.model_cls)  # type: ignore
            )

    # keep the current orderings first and append only the ones not set yet
    not_yet_set = [action for action in new_actions if action not in self.order_bys]
    return self.rebuild_self(order_bys=self.order_bys + not_yet_set)

paginate(page, page_size=20)

You can paginate the result which is a combination of offset and limit clauses. Limit is set to page size and offset is set to (page-1) * page_size.

Parameters:

Name Type Description Default
page_size int

numbers of items per page

20
page int

page number

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
def paginate(self, page: int, page_size: int = 20) -> "QuerySet[T]":
    """
    Paginate the result, which is a combination of offset and limit clauses.

    The limit is set to `page_size` and the offset to `(page - 1) * page_size`,
    so pages are numbered starting from 1.

    :raises QueryDefinitionError: if page or page_size is lower than 1
    :param page_size: numbers of items per page
    :type page_size: int
    :param page: page number
    :type page: int
    :return: QuerySet
    :rtype: QuerySet
    """
    if page < 1 or page_size < 1:
        raise QueryDefinitionError("Page size and page have to be greater than 0.")

    rows_to_skip = (page - 1) * page_size
    return self.rebuild_self(limit_count=page_size, offset=rows_to_skip)

prefetch_related(related)

Allows you to prefetch related models during a query - but, unlike select_related, each subsequent model is fetched in a separate database query.

With prefetch_related always one query per Model is run against the database, meaning that you will have multiple queries executed one after another.

To fetch related model use ForeignKey names.

To chain related Models relation use double underscores between names.

Parameters:

Name Type Description Default
related Union[List, str, FieldAccessor]

list of relation field names, can be linked by '__' to nest

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
def prefetch_related(
    self, related: Union[List, str, FieldAccessor]
) -> "QuerySet[T]":
    """
    Allows to prefetch related models during query - but opposite to
    `select_related` each subsequent model is fetched in a separate database query.

    **With `prefetch_related` always one query per Model is run against the
    database**, meaning that you will have multiple queries executed one
    after another.

    To fetch related model use `ForeignKey` names.

    To chain related `Models` relation use double underscores between names.

    A single relation (name or `FieldAccessor`) or a list of them can be passed.
    The new relations are merged with the ones already set on the queryset,
    duplicates are dropped and the result is stored in sorted order.

    :param related: list of relation field names, can be linked by '__' to nest
    :type related: Union[List, str, FieldAccessor]
    :return: QuerySet
    :rtype: QuerySet
    """
    if not isinstance(related, list):
        related = [related]
    # field accessors are replaced by their access chain
    related = [
        rel._access_chain if isinstance(rel, FieldAccessor) else rel
        for rel in related
    ]

    # Sorted, as in `select_related`, so the stored list does not depend on
    # the arbitrary iteration order of a set.
    related = sorted(set(list(self._prefetch_related) + related))
    return self.rebuild_self(prefetch_related=related)

rebuild_self(filter_clauses=None, exclude_clauses=None, select_related=None, limit_count=None, offset=None, excludable=None, order_bys=None, prefetch_related=None, limit_raw_sql=None, proxy_source_model=None)

Method that returns new instance of queryset based on passed params, all not passed params are taken from current values.

Source code in ormar\queryset\queryset.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def rebuild_self(  # noqa: CFQ002
    self,
    filter_clauses: List = None,
    exclude_clauses: List = None,
    select_related: List = None,
    limit_count: int = None,
    offset: int = None,
    excludable: "ExcludableItems" = None,
    order_bys: List = None,
    prefetch_related: List = None,
    limit_raw_sql: bool = None,
    proxy_source_model: Optional[Type["Model"]] = None,
) -> "QuerySet":
    """
    Build a new instance of the queryset class from the passed params.

    Every param that is left as `None` is taken from the current queryset
    instead. Falsy values other than `None` (empty list, 0, False) count
    as passed and are used as given.

    The model class is always copied from the current queryset.
    """
    # constructor keyword -> attribute holding the current value;
    # only listed where the two names differ
    attribute_for = {
        "select_related": "_select_related",
        "offset": "query_offset",
        "excludable": "_excludable",
        "prefetch_related": "_prefetch_related",
        "limit_raw_sql": "limit_sql_raw",
    }
    supplied = {
        "filter_clauses": filter_clauses,
        "exclude_clauses": exclude_clauses,
        "select_related": select_related,
        "limit_count": limit_count,
        "offset": offset,
        "excludable": excludable,
        "order_bys": order_bys,
        "prefetch_related": prefetch_related,
        "limit_raw_sql": limit_raw_sql,
        "proxy_source_model": proxy_source_model,
    }

    constructor_kwargs = {"model_cls": self.model_cls}
    for keyword, value in supplied.items():
        if value is None:
            value = getattr(self, attribute_for.get(keyword, keyword))
        constructor_kwargs[keyword] = value

    return self.__class__(**constructor_kwargs)

select_all(follow=False)

By default adds only directly related models.

If follow=True is set it adds also related models of related models.

To not get stuck in an infinite loop as related models also keep a relation to parent model visited models set is kept.

That way already visited models that are nested are loaded, but the load does not follow them inside. So Model A -> Model B -> Model C -> Model A -> Model X will load second Model A but will never follow into Model X. Nested relations of this kind need to be loaded manually.

Parameters:

Name Type Description Default
follow bool

flag to follow nested relations - by default only directly related models are selected; with follow=True related models of related models are selected as well

False

Returns:

Type Description
QuerySet

QuerySet with the related models added to select_related

Source code in ormar\queryset\queryset.py
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def select_all(self, follow: bool = False) -> "QuerySet[T]":
    """
    Return a QuerySet with related models of the model set as select related.

    By default only the directly related models are used
    (the names returned by `extract_related_names()` of the model).

    If follow=True is set, the relations returned by
    `_iterate_related_models()` of the model are used instead, which adds
    also related models of related models.

    To not get stuck in an infinite loop as related models also keep a relation
    to parent model visited models set is kept.

    That way already visited models that are nested are loaded, but the load
    does not follow them inside. So Model A -> Model B -> Model C -> Model A
    -> Model X will load second Model A but will never follow into Model X.
    Nested relations of this kind need to be loaded manually.

    :param follow: flag to follow nested relations -
    by default only directly related models are selected
    with follow=True also related models of related models are selected
    :type follow: bool
    :return: QuerySet
    :rtype: QuerySet
    """
    direct_relations = list(self.model.extract_related_names())
    relations_to_select = (
        self.model._iterate_related_models() if follow else direct_relations
    )
    return self.rebuild_self(select_related=relations_to_select)

select_related(related)

Allows you to prefetch related models during the same query.

With select_related always only one query is run against the database, meaning that one (sometimes complicated) join is generated and later nested models are processed in python.

To fetch related model use ForeignKey names.

To chain related Models relation use double underscores between names.

Parameters:

Name Type Description Default
related Union[List, str, FieldAccessor]

list of relation field names, can be linked by '__' to nest

required

Returns:

Type Description
QuerySet

QuerySet

Source code in ormar\queryset\queryset.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
def select_related(self, related: Union[List, str, FieldAccessor]) -> "QuerySet[T]":
    """
    Adds relations that should be fetched in the same query as the main model.

    **With `select_related` always only one query is run against the database**,
    meaning that one (sometimes complicated) join is generated and later nested
    models are processed in python.

    Use `ForeignKey` names to point at related models and join the names with
    double underscores to reach nested relations. A `FieldAccessor` is
    replaced by its `_access_chain` string.

    The new names are merged with the ones already selected, de-duplicated
    and sorted.

    :param related: relation name, FieldAccessor or list of those,
    names can be linked by '__' to nest
    :type related: Union[List, str, FieldAccessor]
    :return: QuerySet rebuilt with the merged select_related list
    :rtype: QuerySet
    """
    candidates = related if isinstance(related, list) else [related]
    merged = set(self._select_related)
    for candidate in candidates:
        if isinstance(candidate, FieldAccessor):
            merged.add(candidate._access_chain)
        else:
            merged.add(candidate)
    return self.rebuild_self(select_related=sorted(merged))

sum(columns) async

Returns sum value of columns for rows matching the given criteria (applied with filter and exclude if set before).

Returns:

Type Description
int

sum value of columns

Source code in ormar\queryset\queryset.py
751
752
753
754
755
756
757
758
759
760
761
async def sum(self, columns: Union[str, List[str]]) -> Any:  # noqa: A003
    """
    Returns sum value of columns for rows matching the given criteria
    (applied with `filter` and `exclude` if set before).

    A single column name is wrapped into a one element list before it is
    handed over to the aggregate query helper.

    :param columns: column name or list of column names to sum
    :type columns: Union[str, List[str]]
    :return: sum value of columns
    :rtype: int
    """
    column_list = columns if isinstance(columns, list) else [columns]
    return await self._query_aggr_function(func_name="sum", columns=column_list)

update(each=False, **kwargs) async

Updates the model table after applying the filters from kwargs.

You have to either pass a filter to narrow down a query or explicitly pass each=True flag to affect whole table.

Parameters:

Name Type Description Default
each bool

flag if whole table should be affected if no filter is passed

False
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
int

number of updated rows

Source code in ormar\queryset\queryset.py
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
async def update(self, each: bool = False, **kwargs: Any) -> int:
    """
    Issues an UPDATE against the model table, limited by the filter and
    exclude clauses already set on the queryset.

    Either the queryset has to be narrowed down beforehand or `each=True`
    has to be passed explicitly to affect the whole table.

    Only kwargs that name the model's own db fields or its relations are
    used; the remaining ones are dropped before the statement is built.

    :param each: flag if whole table should be affected if no filter is passed
    :type each: bool
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :raises QueryDefinitionError: if no filter/exclude is set and each is False
    :return: number of updated rows
    :rtype: int
    """
    if not (each or self.filter_clauses or self.exclude_clauses):
        raise QueryDefinitionError(
            "You cannot update without filtering the queryset first. "
            "If you want to update all rows use update(each=True, **kwargs)"
        )

    own_fields = self.model.extract_db_own_fields()
    allowed_names = own_fields.union(self.model.extract_related_names())
    updates = {
        name: value for name, value in kwargs.items() if name in allowed_names
    }
    updates = self.model.validate_choices(updates)
    updates = self.model.translate_columns_to_aliases(updates)

    # Filters are applied first, excludes on top of the filtered statement.
    include_query = FilterQuery(filter_clauses=self.filter_clauses)
    statement = include_query.apply(self.table.update().values(**updates))
    exclude_query = FilterQuery(filter_clauses=self.exclude_clauses, exclude=True)
    statement = exclude_query.apply(statement)
    return await self.database.execute(statement)

update_or_create(**kwargs) async

Updates the model, or in case there is no match in database creates a new one.

Parameters:

Name Type Description Default
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
Model

updated or created model

Source code in ormar\queryset\queryset.py
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
async def update_or_create(self, **kwargs: Any) -> "T":
    """
    Updates the model identified by the primary key passed in kwargs, or
    creates a new one when no primary key value is given.

    The generic `pk` keyword is translated into the real primary key name.
    If the primary key is missing or None a new row is created from kwargs,
    otherwise the row is fetched by primary key and updated with kwargs.

    NOTE(review): when a primary key is given the row is fetched with
    `get()`; whether a non-existing key ends in a create depends on how
    `get()` behaves when nothing matches - confirm.

    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: updated or created model
    :rtype: Model
    """
    pk_name = self.model_meta.pkname
    if "pk" in kwargs:
        kwargs[pk_name] = kwargs.pop("pk")
    pk_value = kwargs.get(pk_name)
    if pk_value is None:
        return await self.create(**kwargs)
    existing = await self.get(pk=pk_value)
    return await existing.update(**kwargs)

values(fields=None, exclude_through=False, _as_dict=True, _flatten=False) async

Return a list of dictionaries with column values in order of the fields passed or all fields from queried models.

To filter for given row use filter/exclude methods before values, to limit number of rows use limit/offset or paginate before values.

Note that it always returns a list, even for a single row from the database.

Parameters:

Name Type Description Default
exclude_through bool

flag if through models should be excluded

False
_flatten bool

internal parameter to flatten one element tuples

False
_as_dict bool

internal parameter if return dict or tuples

True
fields Union[List, str, Set, Dict]

field name or list of field names to extract from db

None
Source code in ormar\queryset\queryset.py
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
async def values(
    self,
    fields: Union[List, str, Set, Dict] = None,
    exclude_through: bool = False,
    _as_dict: bool = True,
    _flatten: bool = False,
) -> List:
    """
    Fetches rows and returns them as a list of dictionaries keyed by the
    resolved column names, in order of the fields passed or with all fields
    from queried models.

    To filter for given row use filter/exclude methods before values,
    to limit number of rows use limit/offset or paginate before values.

    A list is always returned, also for a single row or no rows at all.

    When `fields` is passed the queryset is first narrowed with `fields()`
    and the call is repeated on the narrowed queryset.

    :param fields: field name or list of field names to extract from db
    :type fields:  Union[List, str, Set, Dict]
    :param exclude_through: flag if through models should be excluded
    :type exclude_through: bool
    :param _as_dict: internal parameter if return dict or tuples
    :type _as_dict: bool
    :param _flatten: internal parameter to flatten one element tuples
    :type _flatten: bool
    :raises QueryDefinitionError: if flattening is requested with more
    than one field selected
    :return: list of dicts, tuples or single values
    :rtype: List
    """
    if fields:
        narrowed = self.fields(columns=fields)
        return await narrowed.values(
            _as_dict=_as_dict, _flatten=_flatten, exclude_through=exclude_through
        )

    query = self.build_select_expression()
    rows = await self.database.fetch_all(query)
    if not rows:
        return []

    resolver = ReverseAliasResolver(
        select_related=self._select_related,
        excludable=self._excludable,
        model_cls=self.model_cls,  # type: ignore
        exclude_through=exclude_through,
    )
    # Column names are taken from the first row and resolved once for all rows.
    first_row = cast(LegacyRow, rows[0])
    column_map = resolver.resolve_columns(columns_names=list(first_row.keys()))

    records = []
    for row in rows:
        record = {}
        for db_name, value in dict(row).items():
            # Columns unknown to the resolver are left out of the result.
            if db_name in column_map:
                record[column_map.get(db_name)] = value
        records.append(record)

    if _as_dict:
        return records
    if _flatten and self._excludable.include_entry_count() != 1:
        raise QueryDefinitionError(
            "You cannot flatten values_list if more than one field is selected!"
        )
    as_tuples = [tuple(record.values()) for record in records]
    if _flatten:
        return [item[0] for item in as_tuples]
    return as_tuples

values_list(fields=None, flatten=False, exclude_through=False) async

Return a list of tuples with column values in order of the fields passed or all fields from queried models.

When one field is passed you can flatten the list of tuples into list of values of that single field.

To filter for given row use filter/exclude methods before values, to limit number of rows use limit/offset or paginate before values.

Note that it always returns a list, even for a single row from the database.

Parameters:

Name Type Description Default
exclude_through bool

flag if through models should be excluded

False
fields Union[List, str, Set, Dict]

field name or list of field names to extract from db

None
flatten bool

when one field is passed you can flatten the list of tuples

False
Source code in ormar\queryset\queryset.py
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
async def values_list(
    self,
    fields: Union[List, str, Set, Dict] = None,
    flatten: bool = False,
    exclude_through: bool = False,
) -> List:
    """
    Tuple flavour of `values`: returns a list of tuples with column values in
    order of the fields passed or all fields from queried models.

    When one field is passed you can flatten the list of tuples into list of
    values of that single field.

    To filter for given row use filter/exclude methods before values,
    to limit number of rows use limit/offset or paginate before values.

    A list is always returned, also for a single row from database.

    :param fields: field name or list of field names to extract from db
    :type fields: Union[List, str, Set, Dict]
    :param flatten: when one field is passed you can flatten the list of tuples
    :type flatten: bool
    :param exclude_through: flag if through models should be excluded
    :type exclude_through: bool
    :return: list of tuples or, when flattened, list of single values
    :rtype: List
    """
    # Delegates to values() with the dict output switched off.
    options = {
        "fields": fields,
        "exclude_through": exclude_through,
        "_as_dict": False,
        "_flatten": flatten,
    }
    rows = await self.values(**options)
    return rows

SelectAction

Bases: QueryAction

Select actions are populated by the queryset when an aggregate function (e.g. sum()) is called.

All required params are extracted but kept raw until actual filter clause value is required -> then the action is converted into text() clause.

Extracted in order to easily change table prefixes on complex relations.

Source code in ormar\queryset\actions\select_action.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class SelectAction(QueryAction):
    """
    Describes a single selected column, optionally reached through related
    models, that can be wrapped in a sql function.

    The select string is split on double underscores: the last part is the
    field name, the preceding parts are the relation path. Values are kept
    raw until a clause is actually needed -> then the action is converted
    into text() clause.

    Extracted in order to easily change table prefixes on complex relations.
    """

    def __init__(
        self, select_str: str, model_cls: Type["Model"], alias: str = None
    ) -> None:
        """
        :param select_str: field name, optionally prefixed with relation names
        joined by '__'
        :type select_str: str
        :param model_cls: model class the select string starts from
        :type model_cls: Type[Model]
        :param alias: optional table prefix that overrides the one set by
        the base class
        :type alias: str
        """
        super().__init__(query_str=select_str, model_cls=model_cls)
        if alias:  # pragma: no cover
            self.table_prefix = alias

    def _split_value_into_parts(self, order_str: str) -> None:
        """
        Splits the string on '__' into the field name (last part) and the
        list of relation names leading to it (all preceding parts).

        :param order_str: raw select string
        :type order_str: str
        """
        *relation_path, column_name = order_str.split("__")
        self.field_name = column_name
        self.related_parts = relation_path

    @property
    def is_numeric(self) -> bool:
        """
        :return: True if the target field type is int, float or Decimal
        :rtype: bool
        """
        numeric_types = (int, float, decimal.Decimal)
        return self.get_target_field_type() in numeric_types

    def get_target_field_type(self) -> Any:
        """
        :return: `__type__` of the target model field named by this action
        :rtype: Any
        """
        target_field = self.target_model.Meta.model_fields[self.field_name]
        return target_field.__type__

    def get_text_clause(self) -> sqlalchemy.sql.expression.TextClause:
        """
        Builds the text clause for the column, with the table prefix and an
        underscore in front of the field name when a prefix is set.

        :return: text clause with the (prefixed) column name
        :rtype: sqlalchemy.sql.expression.TextClause
        """
        if self.table_prefix:
            column = f"{self.table_prefix}_{self.field_name}"
        else:
            column = f"{self.field_name}"
        return sqlalchemy.text(column)

    def apply_func(
        self, func: Callable, use_label: bool = True
    ) -> sqlalchemy.sql.expression.TextClause:
        """
        Calls func on the column text clause and optionally labels the result
        with the relation string and field name joined by '__'.

        :param func: callable applied to the text clause
        :type func: Callable
        :param use_label: flag if the result should be labeled
        :type use_label: bool
        :return: result of func, labeled if requested
        :rtype: sqlalchemy.sql.expression.TextClause
        """
        wrapped = func(self.get_text_clause())
        if not use_label:
            return wrapped
        if self.related_str:
            label = f"{self.related_str}__{self.field_name}"
        else:
            label = f"{self.field_name}"
        return wrapped.label(label)

and_(*args, **kwargs)

Construct an AND filter from nested groups and keyword arguments

Parameters:

Name Type Description Default
args FilterGroup

nested filter groups

()
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup ready to be resolved

Source code in ormar\queryset\clause.py
147
148
149
150
151
152
153
154
155
156
157
158
def and_(*args: FilterGroup, **kwargs: Any) -> FilterGroup:
    """
    Builds a FilterGroup of AND type out of nested filter groups and
    keyword arguments.

    :param args: nested filter groups
    :type args: Tuple[FilterGroup]
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: FilterGroup ready to be resolved
    :rtype: ormar.queryset.clause.FilterGroup
    """
    # Positional groups go first, the filter type is always passed by keyword.
    return FilterGroup(*args, _filter_type=FilterType.AND, **kwargs)

or_(*args, **kwargs)

Construct an OR filter from nested groups and keyword arguments

Parameters:

Name Type Description Default
args FilterGroup

nested filter groups

()
kwargs Any

fields names and proper value types

{}

Returns:

Type Description
ormar.queryset.clause.FilterGroup

FilterGroup ready to be resolved

Source code in ormar\queryset\clause.py
133
134
135
136
137
138
139
140
141
142
143
144
def or_(*args: FilterGroup, **kwargs: Any) -> FilterGroup:
    """
    Builds a FilterGroup of OR type out of nested filter groups and
    keyword arguments.

    :param args: nested filter groups
    :type args: Tuple[FilterGroup]
    :param kwargs: fields names and proper value types
    :type kwargs: Any
    :return: FilterGroup ready to be resolved
    :rtype: ormar.queryset.clause.FilterGroup
    """
    # Positional groups go first, the filter type is always passed by keyword.
    return FilterGroup(*args, _filter_type=FilterType.OR, **kwargs)