Skip to content

utils

check_node_not_dict_or_not_last_node(part, is_last, current_level)

Checks if given name is not present in the current level of the structure. Checks if given name is not the last name in the split list of parts. Checks if the given name in current level is not a dictionary.

All those checks verify if there is a need for deeper traversal.

:param part: :type part: str :param is_last: flag to check if last element :type is_last: bool :param current_level: current level of the traversed structure :type current_level: Any :return: result of the check :rtype: bool

Source code in ormar/queryset/utils.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def check_node_not_dict_or_not_last_node(
    part: str, is_last: bool, current_level: Any
) -> bool:
    """
    Tells if the given name requires a fresh nested level for deeper traversal.

    The result is True in two situations:
    * the name is missing from the current level and it is not the last name
      in the split list of parts,
    * the name is present in the current level but the value stored under it
      is not a dictionary.

    :param part: name to look up in the current level
    :type part: str
    :param is_last: flag to check if last element
    :type is_last: bool
    :param current_level: current level of the traversed structure
    :type current_level: Any
    :return: result of the check
    :rtype: bool
    """
    if part in current_level:
        # name already known - only a non-dict value blocks the traversal
        return not isinstance(current_level[part], dict)
    # unknown name - an intermediate part still has to go deeper
    return not is_last

convert_set_to_required_dict(set_to_convert)

Converts set to dictionary of required keys. Required key is Ellipsis.

:param set_to_convert: set to convert to dict :type set_to_convert: set :return: set converted to dict of ellipsis :rtype: Dict

Source code in ormar/queryset/utils.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def convert_set_to_required_dict(set_to_convert: set) -> Dict:
    """
    Builds a dictionary out of a set, using every element as a key.
    Each key is marked as required, which means its value is Ellipsis.

    :param set_to_convert: set to convert to dict
    :type set_to_convert: set
    :return: set converted to dict of ellipsis
    :rtype: Dict
    """
    return dict.fromkeys(set_to_convert, Ellipsis)

get_relationship_alias_model_and_str(source_model, related_parts)

Walks the relation to retrieve the actual model on which the clause should be constructed, extracts alias based on last relation leading to target model. :param related_parts: list of related names extracted from string :type related_parts: Union[List, List[str]] :param source_model: model from which relation starts :type source_model: Type[Model] :return: table prefix, target model, relation string and the is_through flag :rtype: Tuple[str, Type["Model"], str, bool]

Source code in ormar/queryset/utils.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def get_relationship_alias_model_and_str(
    source_model: Type["Model"], related_parts: List
) -> Tuple[str, Type["Model"], str, bool]:
    """
    Walks the relation to retrieve the actual model on which the clause should be
    constructed, extracts alias based on last relation leading to target model.

    When related_parts is empty the loop is skipped, so the result is an empty
    prefix, the source model itself, an empty relation string and False.

    :param related_parts: list of related names extracted from string
    :type related_parts: Union[List, List[str]]
    :param source_model: model from which relation starts
    :type source_model: Type[Model]
    :return: table prefix, target model, relation string and the is_through flag
    :rtype: Tuple[str, Type["Model"], str, bool]
    """
    table_prefix = ""
    is_through = False
    target_model = source_model
    previous_model = target_model
    # models visited so far; passed to _process_through_field below
    previous_models = [target_model]
    manager = target_model.ormar_config.alias_manager
    # Iterate over a copy: the original list is handed to _process_through_field,
    # which presumably may modify it - TODO confirm against that helper.
    for relation in related_parts[:]:
        related_field = target_model.ormar_config.model_fields[relation]

        if related_field.is_through:
            # the helper decides the model and relation name used for the alias
            # lookup and sets the is_through flag returned to the caller
            (previous_model, relation, is_through) = _process_through_field(
                related_parts=related_parts,
                relation=relation,
                related_field=related_field,
                previous_model=previous_model,
                previous_models=previous_models,
            )
        if related_field.is_multi:
            # for multi relations the alias is resolved from the through model
            previous_model = related_field.through
            relation = related_field.default_target_field_name()  # type: ignore
        table_prefix = manager.resolve_relation_alias(
            from_model=previous_model, relation_name=relation
        )
        # step into the related model for the next part of the path
        target_model = related_field.to
        previous_model = target_model
        if not is_through:
            previous_models.append(previous_model)
    # built after the loop, from related_parts as it is at that point
    relation_str = "__".join(related_parts)

    return table_prefix, target_model, relation_str, is_through

subtract_dict(current_dict, updating_dict)

Subtract one dict from another, with regard for nested keys.

That way nested sets are reduced by set difference, nested dicts are subtracted recursively, and any other matching key is removed from the current dict.

:param current_dict: dict to subtract from :type current_dict: Dict[str, ellipsis] :param updating_dict: dict with values to subtract :type updating_dict: Dict :return: current dict with the given values removed :rtype: Dict

Source code in ormar/queryset/utils.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def subtract_dict(current_dict: Any, updating_dict: Any) -> Dict:  # noqa: CCR001
    """
    Removes from one dict the entries listed in another one,
    with regard for nested keys.

    Keys that are missing from the current dict (or hold an empty value there)
    are skipped. Nested sets are reduced by set difference, nested dicts (or a
    mix of sets and dicts) are subtracted recursively. A key is removed from
    the current dict when nothing is left under it after the subtraction or
    when the values are of any other type.

    The current dict is modified in place.

    :param current_dict: dict to subtract from
    :type current_dict: Dict[str, ellipsis]
    :param updating_dict: dict with values to subtract
    :type updating_dict: Dict
    :return: current dict with the given values removed
    :rtype: Dict
    """
    mapping_type = collections.abc.Mapping
    for key, to_remove in updating_dict.items():
        existing = current_dict.get(key, {})
        if not existing:
            # nothing stored under this key - nothing to subtract from
            continue

        remainder: Optional[Union[Dict, Set]] = None
        if isinstance(to_remove, set) and isinstance(existing, set):
            remainder = existing.difference(to_remove)
        elif isinstance(to_remove, (set, mapping_type)) and isinstance(
            existing, (set, mapping_type)
        ):
            # bring both sides to dicts so they can be subtracted recursively
            if not isinstance(to_remove, mapping_type):
                to_remove = convert_set_to_required_dict(to_remove)
            if not isinstance(existing, mapping_type):
                existing = convert_set_to_required_dict(existing)
            remainder = subtract_dict(existing, to_remove)

        if remainder:
            current_dict[key] = remainder
        else:
            # empty leftover or non-container values - drop the whole key
            current_dict.pop(key, None)
    return current_dict

translate_list_to_dict(list_to_trans, default=...)

Splits the list of strings by '__' and converts them to dictionary with nested models grouped by parent model. That way each model appears only once in the whole dictionary and children are grouped under parent name.

The default required key is Ellipsis, like in pydantic.

:param list_to_trans: input list :type list_to_trans: Union[List, Set] :param default: value to use as a default value :type default: Any :param is_order: flag if change affects order_by clauses are they require special default value with sort order. :type is_order: bool :return: converted to dictionary input list :rtype: Dict

Source code in ormar/queryset/utils.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def translate_list_to_dict(  # noqa: CCR001
    list_to_trans: Union[List, Set], default: Any = ...
) -> Dict:
    """
    Turns a collection of '__' separated paths into one nested dictionary.

    Each path is split by '__' and every part becomes a key nested under the
    previous part, so a given name appears only once on its level and all of
    its children are grouped under it.

    The last part of each path receives a deep copy of the default value,
    which is Ellipsis unless stated otherwise (like in pydantic).

    :param list_to_trans: input list
    :type list_to_trans: Union[List, Set]
    :param default: value to use as a default value
    :type default: Any
    :return: converted to dictionary input list
    :rtype: Dict
    """
    result: Dict = {}
    for path in list_to_trans:
        node = result
        names = path.split("__")
        leaf_value: Any = copy.deepcopy(default)
        last_index = len(names) - 1
        for index, name in enumerate(names):
            needs_new_level = check_node_not_dict_or_not_last_node(
                part=name, is_last=index == last_index, current_level=node
            )
            if needs_new_level:
                node[name] = {}
            elif name not in node:
                node[name] = leaf_value
            # descend so the next part is nested under this one
            node = node[name]
    return result

update(current_dict, updating_dict)

Update one dict with another but with regard for nested keys.

That way nested sets are unionised, dicts updated and only other values are overwritten.

:param current_dict: dict to update :type current_dict: Dict[str, ellipsis] :param updating_dict: dict with values to update :type updating_dict: Dict :return: combination of both dicts :rtype: Dict

Source code in ormar/queryset/utils.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def update(current_dict: Any, updating_dict: Any) -> Dict:  # noqa: CCR001
    """
    Merges one dict into another with regard for nested keys.

    Nested dicts are merged recursively, nested sets are unionised and
    values of any other kind simply replace the previous ones.
    An Ellipsis passed as the current dict is treated as an empty dict.

    :param current_dict: dict to update
    :type current_dict: Dict[str, ellipsis]
    :param updating_dict: dict with values to update
    :type updating_dict: Dict
    :return: combination of both dicts
    :rtype: Dict
    """
    if current_dict is Ellipsis:
        current_dict = {}
    for key, incoming in updating_dict.items():
        if isinstance(incoming, collections.abc.Mapping):
            existing = current_dict.get(key, {})
            if isinstance(existing, set):
                # a set cannot be merged with a mapping - turn it into a dict
                existing = convert_set_to_required_dict(existing)
            current_dict[key] = update(existing, incoming)
            continue

        existing = current_dict.get(key)
        if isinstance(incoming, set) and isinstance(existing, set):
            current_dict[key] = existing.union(incoming)
        else:
            current_dict[key] = incoming
    return current_dict

update_dict_from_list(curr_dict, list_to_update)

Converts the list into dictionary and later performs special update, where nested keys that are sets or dicts are combined and not overwritten.

:param curr_dict: dict to update :type curr_dict: Dict :param list_to_update: list with values to update the dict :type list_to_update: List[str] :return: updated dict :rtype: Dict

Source code in ormar/queryset/utils.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def update_dict_from_list(curr_dict: Dict, list_to_update: Union[List, Set]) -> Dict:
    """
    Converts the list into dictionary and later performs special update, where
    nested keys that are sets or dicts are combined and not overwritten.

    The passed dict is left untouched - the update is applied to its deep copy.

    :param curr_dict: dict to update
    :type curr_dict: Dict
    :param list_to_update: list with values to update the dict
    :type list_to_update: List[str]
    :return: updated dict
    :rtype: Dict
    """
    # A deep copy is required here: update() merges nested dicts in place, so
    # with a shallow copy the nested values would still be shared with
    # curr_dict and the caller's dict would be modified as a side effect.
    updated_dict = copy.deepcopy(curr_dict)
    dict_to_update = translate_list_to_dict(list_to_update)
    update(updated_dict, dict_to_update)
    return updated_dict