19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246 | class Query:
def __init__( # noqa CFQ002
self,
model_cls: Type["Model"],
filter_clauses: List[FilterAction],
exclude_clauses: List[FilterAction],
select_related: List,
limit_count: Optional[int],
offset: Optional[int],
excludable: "ExcludableItems",
order_bys: Optional[List["OrderAction"]],
limit_raw_sql: bool,
) -> None:
self.query_offset = offset
self.limit_count = limit_count
self._select_related = select_related[:]
self.filter_clauses = filter_clauses[:]
self.exclude_clauses = exclude_clauses[:]
self.excludable = excludable
self.model_cls = model_cls
self.table = self.model_cls.Meta.table
self.used_aliases: List[str] = []
self.select_from: Union[Join, Table, List[str]] = []
self.columns = [sqlalchemy.Column]
self.order_columns = order_bys
self.sorted_orders: Dict[OrderAction, text] = {}
self._init_sorted_orders()
self.limit_raw_sql = limit_raw_sql
def _init_sorted_orders(self) -> None:
"""
Initialize empty order_by dict to be populated later during the query call
"""
if self.order_columns:
for clause in self.order_columns:
self.sorted_orders[clause] = None
def apply_order_bys_for_primary_model(self) -> None: # noqa: CCR001
"""
Applies order_by queries on main model when it's used as a subquery.
That way the subquery with limit and offset only on main model has proper
sorting applied and correct models are fetched.
"""
current_table_sorted = False
if self.order_columns:
for clause in self.order_columns:
if clause.is_source_model_order:
current_table_sorted = True
self.sorted_orders[clause] = clause.get_text_clause()
if not current_table_sorted:
self._apply_default_model_sorting()
def _apply_default_model_sorting(self) -> None:
"""
Applies orders_by from model Meta class (if provided), if it was not provided
it was filled by metaclass so it's always there and falls back to pk column
"""
for order_by in self.model_cls.Meta.orders_by:
clause = ormar.OrderAction(order_str=order_by, model_cls=self.model_cls)
self.sorted_orders[clause] = clause.get_text_clause()
def _pagination_query_required(self) -> bool:
"""
Checks if limit or offset are set, the flag limit_sql_raw is not set
and query has select_related applied. Otherwise we can limit/offset normally
at the end of whole query.
:return: result of the check
:rtype: bool
"""
return bool(
(self.limit_count or self.query_offset)
and not self.limit_raw_sql
and self._select_related
)
def build_select_expression(self) -> Tuple[sqlalchemy.sql.select, List[str]]:
"""
Main entry point from outside (after proper initialization).
Extracts columns list to fetch,
construct all required joins for select related,
then applies all conditional and sort clauses.
Returns ready to run query with all joins and clauses.
:return: ready to run query with all joins and clauses.
:rtype: sqlalchemy.sql.selectable.Select
"""
self_related_fields = self.model_cls.own_table_columns(
model=self.model_cls, excludable=self.excludable, use_alias=True
)
self.columns = self.model_cls.Meta.alias_manager.prefixed_columns(
"", self.table, self_related_fields
)
self.apply_order_bys_for_primary_model()
self.select_from = self.table
related_models = group_related_list(self._select_related)
for related in related_models:
remainder = None
if isinstance(related_models, dict) and related_models[related]:
remainder = related_models[related]
sql_join = SqlJoin(
used_aliases=self.used_aliases,
select_from=self.select_from,
columns=self.columns,
excludable=self.excludable,
order_columns=self.order_columns,
sorted_orders=self.sorted_orders,
main_model=self.model_cls,
relation_name=related,
relation_str=related,
related_models=remainder,
)
(
self.used_aliases,
self.select_from,
self.columns,
self.sorted_orders,
) = sql_join.build_join()
if self._pagination_query_required():
limit_qry, on_clause = self._build_pagination_condition()
self.select_from = sqlalchemy.sql.join(
self.select_from, limit_qry, on_clause
)
expr = sqlalchemy.sql.select(self.columns)
expr = expr.select_from(self.select_from)
expr = self._apply_expression_modifiers(expr)
# print("\n", expr.compile(compile_kwargs={"literal_binds": True}))
self._reset_query_parameters()
return expr
def _build_pagination_condition(
self,
) -> Tuple[
sqlalchemy.sql.expression.TextClause, sqlalchemy.sql.expression.TextClause
]:
"""
In order to apply limit and offset on main table in join only
(otherwise you can get only partially constructed main model
if number of children exceeds the applied limit and select_related is used)
Used also to change first and get() without argument behaviour.
Needed only if limit or offset are set, the flag limit_sql_raw is not set
and query has select_related applied. Otherwise we can limit/offset normally
at the end of whole query.
The condition is added to filters to filter out desired number of main model
primary key values. Whole query is used to determine the values.
"""
pk_alias = self.model_cls.get_column_alias(self.model_cls.Meta.pkname)
pk_aliased_name = f"{self.table.name}.{pk_alias}"
qry_text = sqlalchemy.text(f"{pk_aliased_name}")
maxes = {}
for order in list(self.sorted_orders.keys()):
if order is not None and order.get_field_name_text() != pk_aliased_name:
aliased_col = order.get_field_name_text()
# maxes[aliased_col] = order.get_text_clause()
maxes[aliased_col] = order.get_min_or_max()
elif order.get_field_name_text() == pk_aliased_name:
maxes[pk_aliased_name] = order.get_text_clause()
limit_qry = sqlalchemy.sql.select([qry_text])
limit_qry = limit_qry.select_from(self.select_from)
limit_qry = FilterQuery(filter_clauses=self.filter_clauses).apply(limit_qry)
limit_qry = FilterQuery(
filter_clauses=self.exclude_clauses, exclude=True
).apply(limit_qry)
limit_qry = limit_qry.group_by(qry_text)
for order_by in maxes.values():
limit_qry = limit_qry.order_by(order_by)
limit_qry = LimitQuery(limit_count=self.limit_count).apply(limit_qry)
limit_qry = OffsetQuery(query_offset=self.query_offset).apply(limit_qry)
limit_qry = limit_qry.alias("limit_query")
on_clause = sqlalchemy.text(
f"limit_query.{pk_alias}={self.table.name}.{pk_alias}"
)
return limit_qry, on_clause
def _apply_expression_modifiers(
self, expr: sqlalchemy.sql.select
) -> sqlalchemy.sql.select:
"""
Receives the select query (might be join) and applies:
* Filter clauses
* Exclude filter clauses
* Limit clauses
* Offset clauses
* Order by clauses
Returns complete ready to run query.
:param expr: select expression before clauses
:type expr: sqlalchemy.sql.selectable.Select
:return: expression with all present clauses applied
:rtype: sqlalchemy.sql.selectable.Select
"""
expr = FilterQuery(filter_clauses=self.filter_clauses).apply(expr)
expr = FilterQuery(filter_clauses=self.exclude_clauses, exclude=True).apply(
expr
)
if not self._pagination_query_required():
expr = LimitQuery(limit_count=self.limit_count).apply(expr)
expr = OffsetQuery(query_offset=self.query_offset).apply(expr)
expr = OrderQuery(sorted_orders=self.sorted_orders).apply(expr)
return expr
def _reset_query_parameters(self) -> None:
"""
Although it should be created each time before the call we reset the key params
anyway.
"""
self.select_from = []
self.columns = []
self.used_aliases = []
|