Skip to content

join

SqlJoin

Source code in ormar\queryset\join.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
class SqlJoin:
    def __init__(  # noqa:  CFQ002
        self,
        used_aliases: List,
        select_from: sqlalchemy.sql.select,
        columns: List[sqlalchemy.Column],
        excludable: "ExcludableItems",
        order_columns: Optional[List["OrderAction"]],
        sorted_orders: Dict,
        main_model: Type["Model"],
        relation_name: str,
        relation_str: str,
        related_models: Any = None,
        own_alias: str = "",
        source_model: Type["Model"] = None,
        already_sorted: Dict = None,
    ) -> None:
        self.relation_name = relation_name
        self.related_models = related_models or []
        self.select_from = select_from
        self.columns = columns
        self.excludable = excludable

        self.order_columns = order_columns
        self.sorted_orders = sorted_orders
        self.already_sorted = already_sorted or dict()

        self.main_model = main_model
        self.own_alias = own_alias
        self.used_aliases = used_aliases
        self.target_field = self.main_model.Meta.model_fields[self.relation_name]

        self._next_model: Optional[Type["Model"]] = None
        self._next_alias: Optional[str] = None

        self.relation_str = relation_str
        self.source_model = source_model

    @property
    def next_model(self) -> Type["Model"]:
        if not self._next_model:  # pragma: nocover
            raise RelationshipInstanceError(
                "Cannot link to related table if relation.to model is not set."
            )
        return self._next_model

    @next_model.setter
    def next_model(self, value: Type["Model"]) -> None:
        self._next_model = value

    @property
    def next_alias(self) -> str:
        if not self._next_alias:  # pragma: nocover
            raise RelationshipInstanceError("Alias for given relation not found.")
        return self._next_alias

    @next_alias.setter
    def next_alias(self, value: str) -> None:
        self._next_alias = value

    @property
    def alias_manager(self) -> AliasManager:
        """
        Shortcut for ormar's model AliasManager stored on Meta.

        :return: alias manager from model's Meta
        :rtype: AliasManager
        """
        return self.main_model.Meta.alias_manager

    @property
    def to_table(self) -> sqlalchemy.Table:
        """
        Shortcut to table name of the next model
        :return: name of the target table
        :rtype: str
        """
        return self.next_model.Meta.table

    def _on_clause(self, previous_alias: str, from_table_name:str, from_column_name: str, to_table_name: str, to_column_name: str) -> text:
        """
        Receives aliases and names of both ends of the join and combines them
        into one text clause used in joins.

        :param previous_alias: alias of previous table
        :type previous_alias: str
        :param from_table_name: from table name
        :type from_table_name: str
        :param from_column_name: from column name
        :type from_column_name: str
        :param to_table_name: to table name
        :type to_table_name: str
        :param to_column_name: to column name
        :type to_column_name: str
        :return: clause combining all strings
        :rtype: sqlalchemy.text
        """
        dialect = self.main_model.Meta.database._backend._dialect
        quoter = dialect.identifier_preparer.quote
        left_part = f"{quoter(f'{self.next_alias}_{to_table_name}')}.{quoter(to_column_name)}"
        if not previous_alias:
            right_part = f"{quoter(from_table_name)}.{quoter(from_column_name)}"
        else:
            right_part = f"{quoter(f'{previous_alias}_{from_table_name}')}.{from_column_name}"

        return text(f"{left_part}={right_part}")

    def build_join(self) -> Tuple[List, sqlalchemy.sql.select, List, Dict]:
        """
        Main external access point for building a join.
        Splits the join definition, updates fields and exclude_fields if needed,
        handles switching to through models for m2m relations, returns updated lists of
        used_aliases and sort_orders.

        :return: list of used aliases, select from, list of aliased columns, sort orders
        :rtype: Tuple[List[str], Join, List[TextClause], Dict]
        """
        if self.target_field.is_multi:
            self._process_m2m_through_table()

        self.next_model = self.target_field.to
        self._forward_join()

        self._process_following_joins()

        return (self.used_aliases, self.select_from, self.columns, self.sorted_orders)

    def _forward_join(self) -> None:
        """
        Process actual join.
        Registers complex relation join on encountering of the duplicated alias.
        """
        self.next_alias = self.alias_manager.resolve_relation_alias(
            from_model=self.target_field.owner, relation_name=self.relation_name
        )
        if self.next_alias not in self.used_aliases:
            self._process_join()
        else:
            if "__" in self.relation_str and self.source_model:
                relation_key = f"{self.source_model.get_name()}_{self.relation_str}"
                if relation_key not in self.alias_manager:
                    self.next_alias = self.alias_manager.add_alias(
                        alias_key=relation_key
                    )
                else:
                    self.next_alias = self.alias_manager[relation_key]
                self._process_join()

    def _process_following_joins(self) -> None:
        """
        Iterates through nested models to create subsequent joins.
        """
        for related_name in self.related_models:
            remainder = None
            if (
                isinstance(self.related_models, dict)
                and self.related_models[related_name]
            ):
                remainder = self.related_models[related_name]
            self._process_deeper_join(related_name=related_name, remainder=remainder)

    def _process_deeper_join(self, related_name: str, remainder: Any) -> None:
        """
        Creates nested recurrent instance of SqlJoin for each nested join table,
        updating needed return params here as a side effect.

        Updated are:

        * self.used_aliases,
        * self.select_from,
        * self.columns,
        * self.sorted_orders,

        :param related_name: name of the relation to follow
        :type related_name: str
        :param remainder: deeper tables if there are more nested joins
        :type remainder: Any
        """
        sql_join = SqlJoin(
            used_aliases=self.used_aliases,
            select_from=self.select_from,
            columns=self.columns,
            excludable=self.excludable,
            order_columns=self.order_columns,
            sorted_orders=self.sorted_orders,
            main_model=self.next_model,
            relation_name=related_name,
            related_models=remainder,
            relation_str="__".join([self.relation_str, related_name]),
            own_alias=self.next_alias,
            source_model=self.source_model or self.main_model,
            already_sorted=self.already_sorted,
        )
        (
            self.used_aliases,
            self.select_from,
            self.columns,
            self.sorted_orders,
        ) = sql_join.build_join()

    def _process_m2m_through_table(self) -> None:
        """
        Process Through table of the ManyToMany relation so that source table is
        linked to the through table (one additional join)

        Replaces needed parameters like:

        * self.next_model,
        * self.next_alias,
        * self.relation_name,
        * self.own_alias,
        * self.target_field

        To point to through model
        """
        new_part = self._process_m2m_related_name_change()

        self.next_model = self.target_field.through
        self._forward_join()

        self.relation_name = new_part
        self.own_alias = self.next_alias
        self.target_field = self.next_model.Meta.model_fields[self.relation_name]

    def _process_m2m_related_name_change(self, reverse: bool = False) -> str:
        """
        Extracts relation name to link join through the Through model declared on
        relation field.

        Changes the same names in order_by queries if they are present.

        :param reverse: flag if it's on_clause lookup - use reverse fields
        :type reverse: bool
        :return: new relation name switched to through model field
        :rtype: str
        """
        target_field = self.target_field
        is_primary_self_ref = (
            target_field.self_reference
            and self.relation_name == target_field.self_reference_primary
        )
        if (is_primary_self_ref and not reverse) or (
            not is_primary_self_ref and reverse
        ):
            new_part = target_field.default_source_field_name()  # type: ignore
        else:
            new_part = target_field.default_target_field_name()  # type: ignore
        return new_part

    def _process_join(self) -> None:  # noqa: CFQ002
        """
        Resolves to and from column names and table names.

        Produces on_clause.

        Performs actual join updating select_from parameter.

        Adds aliases of required column to list of columns to include in query.

        Updates the used aliases list directly.

        Process order_by causes for non m2m relations.

        """
        to_key, from_key = self._get_to_and_from_keys()

        on_clause = self._on_clause(
            previous_alias=self.own_alias,
            from_table_name=self.target_field.owner.Meta.tablename,
            from_column_name=from_key,
            to_table_name=self.to_table.name,
            to_column_name=to_key,
        )
        target_table = self.alias_manager.prefixed_table_name(
            self.next_alias, self.to_table
        )
        self.select_from = sqlalchemy.sql.outerjoin(
            self.select_from, target_table, on_clause
        )

        self._get_order_bys()

        self_related_fields = self.next_model.own_table_columns(
            model=self.next_model,
            excludable=self.excludable,
            alias=self.next_alias,
            use_alias=True,
        )
        self.columns.extend(
            self.alias_manager.prefixed_columns(
                self.next_alias, target_table, self_related_fields
            )
        )
        self.used_aliases.append(self.next_alias)

    def _set_default_primary_key_order_by(self) -> None:
        for order_by in self.next_model.Meta.orders_by:
            clause = ormar.OrderAction(
                order_str=order_by, model_cls=self.next_model, alias=self.next_alias
            )
            self.sorted_orders[clause] = clause.get_text_clause()

    def _verify_allowed_order_field(self, order_by: str) -> None:
        """
        Verifies if proper field string is used.
        :param order_by: string with order by definition
        :type order_by: str
        """
        parts = order_by.split("__")
        if len(parts) > 2 or parts[0] != self.target_field.through.get_name():
            raise ModelDefinitionError(
                "You can order the relation only " "by related or link table columns!"
            )

    def _get_alias_and_model(self, order_by: str) -> Tuple[str, Type["Model"]]:
        """
        Returns proper model and alias to be applied in the clause.

        :param order_by: string with order by definition
        :type order_by: str
        :return: alias and model to be used in clause
        :rtype: Tuple[str, Type["Model"]]
        """
        if self.target_field.is_multi and "__" in order_by:
            self._verify_allowed_order_field(order_by=order_by)
            alias = self.next_alias
            model = self.target_field.owner
        elif self.target_field.is_multi:
            alias = self.alias_manager.resolve_relation_alias(
                from_model=self.target_field.through,
                relation_name=cast(
                    "ManyToManyField", self.target_field
                ).default_target_field_name(),
            )
            model = self.target_field.to
        else:
            alias = self.alias_manager.resolve_relation_alias(
                from_model=self.target_field.owner, relation_name=self.target_field.name
            )
            model = self.target_field.to

        return alias, model

    def _get_order_bys(self) -> None:  # noqa: CCR001
        """
        Triggers construction of order bys if they are given.
        Otherwise by default each table is sorted by a primary key column asc.
        """
        alias = self.next_alias
        current_table_sorted = False
        if f"{alias}_{self.next_model.get_name()}" in self.already_sorted:
            current_table_sorted = True
        if self.order_columns:
            for condition in self.order_columns:
                if condition.check_if_filter_apply(
                    target_model=self.next_model, alias=alias
                ):
                    current_table_sorted = True
                    self.sorted_orders[condition] = condition.get_text_clause()
                    self.already_sorted[
                        f"{self.next_alias}_{self.next_model.get_name()}"
                    ] = condition
        if self.target_field.orders_by and not current_table_sorted:
            current_table_sorted = True
            for order_by in self.target_field.orders_by:
                alias, model = self._get_alias_and_model(order_by=order_by)
                clause = ormar.OrderAction(
                    order_str=order_by, model_cls=model, alias=alias
                )
                self.sorted_orders[clause] = clause.get_text_clause()
                self.already_sorted[f"{alias}_{model.get_name()}"] = clause

        if not current_table_sorted and not self.target_field.is_multi:
            self._set_default_primary_key_order_by()

    def _get_to_and_from_keys(self) -> Tuple[str, str]:
        """
        Based on the relation type, name of the relation and previous models and parts
        stored in JoinParameters it resolves the current to and from keys, which are
        different for ManyToMany relation, ForeignKey and reverse related of relations.

        :return: to key and from key
        :rtype: Tuple[str, str]
        """
        if self.target_field.is_multi:
            to_key = self._process_m2m_related_name_change(reverse=True)
            from_key = self.main_model.get_column_alias(self.main_model.Meta.pkname)

        elif self.target_field.virtual:
            to_field = self.target_field.get_related_name()
            to_key = self.target_field.to.get_column_alias(to_field)
            from_key = self.main_model.get_column_alias(self.main_model.Meta.pkname)

        else:
            to_key = self.target_field.to.get_column_alias(
                self.target_field.to.Meta.pkname
            )
            from_key = self.main_model.get_column_alias(self.relation_name)

        return to_key, from_key

alias_manager: AliasManager property

Shortcut for ormar's model AliasManager stored on Meta.

Returns:

Type Description
AliasManager

alias manager from model's Meta

to_table: sqlalchemy.Table property

Shortcut to table name of the next model

Returns:

Type Description
str

name of the target table

build_join()

Main external access point for building a join. Splits the join definition, updates fields and exclude_fields if needed, handles switching to through models for m2m relations, returns updated lists of used_aliases and sort_orders.

Returns:

Type Description
Tuple[List[str], Join, List[TextClause], Dict]

list of used aliases, select from, list of aliased columns, sort orders

Source code in ormar\queryset\join.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def build_join(self) -> Tuple[List, sqlalchemy.sql.select, List, Dict]:
    """
    Main external access point for building a join.
    Splits the join definition, updates fields and exclude_fields if needed,
    handles switching to through models for m2m relations, returns updated lists of
    used_aliases and sort_orders.

    :return: list of used aliases, select from, list of aliased columns, sort orders
    :rtype: Tuple[List[str], Join, List[TextClause], Dict]
    """
    if self.target_field.is_multi:
        self._process_m2m_through_table()

    self.next_model = self.target_field.to
    self._forward_join()

    self._process_following_joins()

    return (self.used_aliases, self.select_from, self.columns, self.sorted_orders)