Skip to content

save_mixin

SavePrepareMixin

Bases: RelationMixin, AliasMixin

Used to prepare models to be saved in database

Source code in ormar\models\mixins\save_mixin.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
class SavePrepareMixin(RelationMixin, AliasMixin):
    """
    Used to prepare models to be saved in database
    """

    if TYPE_CHECKING:  # pragma: nocover
        _choices_fields: Optional[Set]
        _skip_ellipsis: Callable
        _json_fields: Set[str]
        _bytes_fields: Set[str]
        __fields__: Dict[str, pydantic.fields.ModelField]

    @classmethod
    def prepare_model_to_save(cls, new_kwargs: dict) -> dict:
        """
        Combines all preparation methods before saving.

        Steps are applied in this order:

        * removes the primary key if it is a nullable or autoincrement pk field
          and it is set to None,
        * removes keys that are not (non pydantic-only) ormar fields,
        * substitutes related models with their primary key values as fk column,
        * populates the default values for fields with default set and no value,
        * converts bytes fields passed as strings back into bytes,
        * translates columns into aliases (db names).

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: Dict[str, str]
        :return: dictionary of model that is about to be saved
        :rtype: Dict[str, str]
        """
        new_kwargs = cls._remove_pk_from_kwargs(new_kwargs)
        new_kwargs = cls._remove_not_ormar_fields(new_kwargs)
        new_kwargs = cls.substitute_models_with_pks(new_kwargs)
        new_kwargs = cls.populate_default_values(new_kwargs)
        new_kwargs = cls.reconvert_str_to_bytes(new_kwargs)
        new_kwargs = cls.translate_columns_to_aliases(new_kwargs)
        return new_kwargs

    @classmethod
    def prepare_model_to_update(cls, new_kwargs: dict) -> dict:
        """
        Combines all preparation methods before updating.

        Steps are applied in this order:

        * converts uuid values according to the column uuid format,
        * substitutes related models with their primary key values as fk column,
        * converts bytes fields passed as strings back into bytes,
        * dumps json fields into strings,
        * translates columns into aliases (db names),
        * replaces Enum members with their names.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: Dict[str, str]
        :return: dictionary of model that is about to be updated
        :rtype: Dict[str, str]
        """
        new_kwargs = cls.parse_non_db_fields(new_kwargs)
        new_kwargs = cls.substitute_models_with_pks(new_kwargs)
        new_kwargs = cls.reconvert_str_to_bytes(new_kwargs)
        new_kwargs = cls.dump_all_json_fields_to_str(new_kwargs)
        new_kwargs = cls.translate_columns_to_aliases(new_kwargs)
        new_kwargs = cls.translate_enum_columns(new_kwargs)
        return new_kwargs

    @classmethod
    def translate_enum_columns(cls, new_kwargs: dict) -> dict:
        """
        Receives dictionary of model that is about to be updated and replaces
        every value that is an Enum member with the name of that member.

        The dictionary is modified in place and also returned.

        :param new_kwargs: dictionary of model that is about to be updated
        :type new_kwargs: Dict
        :return: the same dictionary with Enum members replaced by their names
        :rtype: Dict
        """
        # only existing keys are reassigned, so the dict size does not change
        # while iterating
        for key, value in new_kwargs.items():
            if isinstance(value, Enum):
                new_kwargs[key] = value.name
        return new_kwargs

    @classmethod
    def _remove_not_ormar_fields(cls, new_kwargs: dict) -> dict:
        """
        Removes all keys that are not ormar fields of the model or that
        belong to fields flagged as pydantic_only.

        A new dictionary is returned, the passed one is left untouched.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: Dict[str, str]
        :return: dictionary limited to the non pydantic-only ormar fields
        :rtype: Dict[str, str]
        """
        ormar_fields = {
            k for k, v in cls.Meta.model_fields.items() if not v.pydantic_only
        }
        new_kwargs = {k: v for k, v in new_kwargs.items() if k in ormar_fields}
        return new_kwargs

    @classmethod
    def _remove_pk_from_kwargs(cls, new_kwargs: dict) -> dict:
        """
        Removes the primary key if it is a nullable or autoincrement pk field
        and it is set to None.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: Dict[str, str]
        :return: dictionary of model that is about to be saved
        :rtype: Dict[str, str]
        """
        pkname = cls.Meta.pkname
        pk = cls.Meta.model_fields[pkname]
        # ormar.Undefined as the fallback makes a missing pk key differ from
        # an explicit None, so only an explicit None is removed
        if new_kwargs.get(pkname, ormar.Undefined) is None and (
            pk.nullable or pk.autoincrement
        ):
            del new_kwargs[pkname]
        return new_kwargs

    @classmethod
    def parse_non_db_fields(cls, model_dict: Dict) -> Dict:
        """
        Receives dictionary of model that is about to be saved and changes uuid fields
        to strings in bulk_update.

        The "string" uuid format produces ``str(value)``, the "hex" format
        produces the 32 character hexadecimal form. Any other format leaves the
        value unchanged.

        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: Dict
        :return: dictionary of model that is about to be saved
        :rtype: Dict
        """
        # the parser table does not depend on the field, build it only once
        parsers: Dict[str, Callable[..., Any]] = {
            "string": str,
            "hex": lambda x: "%.32x" % x.int,
        }
        for name, field in cls.Meta.model_fields.items():
            if field.__type__ == uuid.UUID and name in model_dict:
                uuid_format = field.column_type.uuid_format
                parser = parsers.get(uuid_format, lambda x: x)
                model_dict[name] = parser(model_dict[name])
        return model_dict

    @classmethod
    def substitute_models_with_pks(cls, model_dict: Dict) -> Dict:  # noqa  CCR001
        """
        Receives dictionary of model that is about to be saved and changes all related
        models that are stored as foreign keys to their fk value.

        Related values that are None or missing are left as they are. Model
        instances are replaced by their pk value, non empty dicts by the value
        stored under the target pk name and non empty lists by the list of such
        values. Any other value (including empty list/dict) is removed.

        :raises ModelPersistenceError: if related model instance has no pk set
        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: Dict
        :return: dictionary of model that is about to be saved
        :rtype: Dict
        """
        for field in cls.extract_related_names():
            field_value = model_dict.get(field, None)
            if field_value is not None:
                target_field = cls.Meta.model_fields[field]
                target_pkname = target_field.to.Meta.pkname
                if isinstance(field_value, ormar.Model):  # pragma: no cover
                    pk_value = getattr(field_value, target_pkname)
                    if not pk_value:
                        raise ModelPersistenceError(
                            f"You cannot save {field_value.get_name()} "
                            f"model without pk set!"
                        )
                    model_dict[field] = pk_value
                elif isinstance(field_value, (list, dict)) and field_value:
                    if isinstance(field_value, list):
                        model_dict[field] = [
                            target.get(target_pkname) for target in field_value
                        ]
                    else:
                        model_dict[field] = field_value.get(target_pkname)
                else:
                    model_dict.pop(field, None)
        return model_dict

    @classmethod
    def reconvert_str_to_bytes(cls, model_dict: Dict) -> Dict:
        """
        Receives dictionary of model that is about to be saved and changes
        all bytes fields that are represented as strings back into bytes.

        Fields flagged with represent_as_base64_str are base64 decoded,
        all other bytes fields are utf-8 encoded.

        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: Dict
        :return: dictionary of model that is about to be saved
        :rtype: Dict
        """
        bytes_base64_fields = {
            name
            for name, field in cls.Meta.model_fields.items()
            if field.represent_as_base64_str
        }
        for key, value in model_dict.items():
            if key in cls._bytes_fields and isinstance(value, str):
                model_dict[key] = (
                    value.encode("utf-8")
                    if key not in bytes_base64_fields
                    else base64.b64decode(value)
                )
        return model_dict

    @classmethod
    def dump_all_json_fields_to_str(cls, model_dict: Dict) -> Dict:
        """
        Receives dictionary of model that is about to be saved and changes
        all json fields into strings

        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: Dict
        :return: dictionary of model that is about to be saved
        :rtype: Dict
        """
        for key, value in model_dict.items():
            if key in cls._json_fields:
                model_dict[key] = encode_json(value)
        return model_dict

    @classmethod
    def populate_default_values(cls, new_kwargs: Dict) -> Dict:
        """
        Receives dictionary of model that is about to be saved and populates the default
        value on the fields that have the default value set, but no actual value was
        passed by the user.

        Fields with server_default set whose value is missing or None are
        removed from the dictionary.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: Dict
        :return: dictionary of model that is about to be saved
        :rtype: Dict
        """
        for field_name, field in cls.Meta.model_fields.items():
            if (
                field_name not in new_kwargs
                and field.has_default(use_server=False)
                and not field.pydantic_only
            ):
                new_kwargs[field_name] = field.get_default()
            # clear fields with server_default set as None
            if (
                field.server_default is not None
                and new_kwargs.get(field_name, None) is None
            ):
                new_kwargs.pop(field_name, None)
        return new_kwargs

    @classmethod
    def validate_choices(cls, new_kwargs: Dict) -> Dict:
        """
        Receives dictionary of model that is about to be saved and validates the
        fields with choices set to see if the value is allowed.

        :raises ValueError: if a passed value is not in the choices of its field
        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: Dict
        :return: dictionary of model that is about to be saved
        :rtype: Dict
        """
        if not cls._choices_fields:
            return new_kwargs

        fields_to_check = [
            field
            for field in cls.Meta.model_fields.values()
            if field.name in cls._choices_fields and field.name in new_kwargs
        ]
        for field in fields_to_check:
            if new_kwargs[field.name] not in field.choices:
                raise ValueError(
                    f"{field.name}: '{new_kwargs[field.name]}' "
                    f"not in allowed choices set:"
                    f" {field.choices}"
                )
        return new_kwargs

    @staticmethod
    async def _upsert_model(
        instance: "Model",
        save_all: bool,
        previous_model: Optional["Model"],
        relation_field: Optional["ForeignKeyField"],
        update_count: int,
    ) -> int:
        """
        Method upserts given instance if:

        * save_all=True flag is set or
        * instance has no pk or
        * instance is not saved

        and instance is not __pk_only__.

        If relation leading to instance is a ManyToMany also the through model is saved

        :param instance: current model to upsert
        :type instance: Model
        :param save_all: flag if all models should be saved or only not saved ones
        :type save_all: bool
        :param relation_field: field with relation
        :type relation_field: Optional[ForeignKeyField]
        :param previous_model: previous model from which method came
        :type previous_model: Model
        :param update_count: no of updated models
        :type update_count: int
        :return: no of updated models
        :rtype: int
        """
        if (
            save_all or not instance.pk or not instance.saved
        ) and not instance.__pk_only__:
            await instance.upsert(__force_save__=True)
            if relation_field and relation_field.is_multi:
                await instance._upsert_through_model(
                    instance=instance,
                    relation_field=relation_field,
                    previous_model=cast("Model", previous_model),
                )
            update_count += 1
        return update_count

    @staticmethod
    async def _upsert_through_model(
        instance: "Model", previous_model: "Model", relation_field: "ForeignKeyField"
    ) -> None:
        """
        Upsert through model for m2m relation.

        :param instance: current model to upsert
        :type instance: Model
        :param relation_field: field with relation
        :type relation_field: Optional[ForeignKeyField]
        :param previous_model: previous model from which method came
        :type previous_model: Model
        """
        through_name = previous_model.Meta.model_fields[
            relation_field.name
        ].through.get_name()
        through = getattr(instance, through_name)
        if through:
            # related fields of the through model are excluded from the dump
            through_dict = through.dict(exclude=through.extract_related_names())
        else:
            through_dict = {}
        await getattr(
            previous_model, relation_field.name
        ).queryset_proxy.upsert_through_instance(instance, **through_dict)

    async def _update_relation_list(
        self,
        fields_list: Collection["ForeignKeyField"],
        follow: bool,
        save_all: bool,
        relation_map: Dict,
        update_count: int,
    ) -> int:
        """
        Internal method used in save_related to follow deeper from
        related models and update numbers of updated related instances.

        :param save_all: flag if all models should be saved
        :type save_all: bool
        :param fields_list: list of ormar fields to follow and save
        :type fields_list: Collection["ForeignKeyField"]
        :param relation_map: map of relations to follow
        :type relation_map: Dict
        :param follow: flag to trigger deep save -
        by default only directly related models are saved
        with follow=True also related models of related models are saved
        :type follow: bool
        :param update_count: internal parameter for recursive calls -
        number of updated instances
        :type update_count: int
        :return: number of updated instances
        :rtype: int
        """
        for field in fields_list:
            values = self._get_field_values(name=field.name)
            for value in values:
                if follow:
                    update_count = await value.save_related(
                        follow=follow,
                        save_all=save_all,
                        relation_map=self._skip_ellipsis(  # type: ignore
                            relation_map, field.name, default_return={}
                        ),
                        update_count=update_count,
                        previous_model=self,
                        relation_field=field,
                    )
                else:
                    update_count = await value._upsert_model(
                        instance=value,
                        save_all=save_all,
                        previous_model=self,
                        relation_field=field,
                        update_count=update_count,
                    )
        return update_count

    def _get_field_values(self, name: str) -> List:
        """
        Extract field values and ensures it is a list.

        A falsy attribute value results in an empty list, a single non-list
        value is wrapped into a one element list.

        :param name: name of the field
        :type name: str
        :return: list of values
        :rtype: List
        """
        values = getattr(self, name) or []
        if not isinstance(values, list):
            values = [values]
        return values

dump_all_json_fields_to_str(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes all json fields into strings

Parameters:

Name Type Description Default
model_dict Dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict

dictionary of model that is about to be saved

Source code in ormar\models\mixins\save_mixin.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@classmethod
def dump_all_json_fields_to_str(cls, model_dict: Dict) -> Dict:
    """
    Serialize every json field present in the passed dictionary into a string.

    The dictionary is modified in place and also returned.

    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: Dict
    :return: the same dictionary with json fields encoded
    :rtype: Dict
    """
    # only existing keys are overwritten, so iterating the dict stays safe
    for column in model_dict:
        if column not in cls._json_fields:
            continue
        model_dict[column] = encode_json(model_dict[column])
    return model_dict

parse_non_db_fields(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes uuid fields to strings in bulk_update.

Parameters:

Name Type Description Default
model_dict Dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict

dictionary of model that is about to be saved

Source code in ormar\models\mixins\save_mixin.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@classmethod
def parse_non_db_fields(cls, model_dict: Dict) -> Dict:
    """
    Convert uuid values of the passed dictionary according to the uuid format
    of their column (used in bulk_update).

    Format "string" yields ``str(value)``, format "hex" yields the 32 character
    hexadecimal form, any other format leaves the value untouched.
    The dictionary is modified in place and also returned.

    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: Dict
    :return: the same dictionary with uuid values converted
    :rtype: Dict
    """
    converters = {
        "string": lambda x: str(x),
        "hex": lambda x: "%.32x" % x.int,
    }
    for column, definition in cls.Meta.model_fields.items():
        if definition.__type__ != uuid.UUID or column not in model_dict:
            continue
        convert: Callable[..., Any] = converters.get(
            definition.column_type.uuid_format, lambda x: x
        )
        model_dict[column] = convert(model_dict[column])
    return model_dict

populate_default_values(new_kwargs) classmethod

Receives dictionary of model that is about to be saved and populates the default value on the fields that have the default value set, but no actual value was passed by the user.

Parameters:

Name Type Description Default
new_kwargs Dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict

dictionary of model that is about to be saved

Source code in ormar\models\mixins\save_mixin.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@classmethod
def populate_default_values(cls, new_kwargs: Dict) -> Dict:
    """
    Fill in defaults for fields that have a (non server) default and for which
    no value was passed, then drop fields with a server default whose value
    is missing or None.

    The dictionary is modified in place and also returned.

    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: Dict
    :return: the same dictionary with defaults applied
    :rtype: Dict
    """
    for column, definition in cls.Meta.model_fields.items():
        needs_default = (
            column not in new_kwargs
            and definition.has_default(use_server=False)
            and not definition.pydantic_only
        )
        if needs_default:
            new_kwargs[column] = definition.get_default()

        # nothing more to do without a server side default
        if definition.server_default is None:
            continue
        # a None value is removed from the dictionary for such fields
        if new_kwargs.get(column, None) is None:
            new_kwargs.pop(column, None)
    return new_kwargs

prepare_model_to_save(new_kwargs) classmethod

Combines all preparation methods before saving. Removes the primary key if it is a nullable or autoincrement pk field and it is set to None. Substitutes related models with their primary key values as the fk column. Populates the default values for fields with a default set and no value. Translates columns into aliases (db names).

Parameters:

Name Type Description Default
new_kwargs dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict[str, str]

dictionary of model that is about to be saved

Source code in ormar\models\mixins\save_mixin.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@classmethod
def prepare_model_to_save(cls, new_kwargs: dict) -> dict:
    """
    Run the passed dictionary through all preparation steps needed before
    an insert.

    In order: remove a None primary key (nullable/autoincrement pk), remove
    keys that are not ormar fields, substitute related models with their pk
    values, populate defaults, convert string bytes fields back to bytes and
    translate column names into aliases (db names).

    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: Dict[str, str]
    :return: dictionary of model that is about to be saved
    :rtype: Dict[str, str]
    """
    # order matters - each step receives the output of the previous one
    pipeline = (
        cls._remove_pk_from_kwargs,
        cls._remove_not_ormar_fields,
        cls.substitute_models_with_pks,
        cls.populate_default_values,
        cls.reconvert_str_to_bytes,
        cls.translate_columns_to_aliases,
    )
    for step in pipeline:
        new_kwargs = step(new_kwargs)
    return new_kwargs

prepare_model_to_update(new_kwargs) classmethod

Combines all preparation methods before updating.

Parameters:

Name Type Description Default
new_kwargs dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict[str, str]

dictionary of model that is about to be updated

Source code in ormar\models\mixins\save_mixin.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@classmethod
def prepare_model_to_update(cls, new_kwargs: dict) -> dict:
    """
    Run the passed dictionary through all preparation steps needed before
    an update.

    In order: convert uuid values, substitute related models with their pk
    values, convert string bytes fields back to bytes, dump json fields to
    strings, translate column names into aliases (db names) and replace
    Enum members.

    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: Dict[str, str]
    :return: dictionary of model that is about to be updated
    :rtype: Dict[str, str]
    """
    # order matters - each step receives the output of the previous one
    pipeline = (
        cls.parse_non_db_fields,
        cls.substitute_models_with_pks,
        cls.reconvert_str_to_bytes,
        cls.dump_all_json_fields_to_str,
        cls.translate_columns_to_aliases,
        cls.translate_enum_columns,
    )
    for step in pipeline:
        new_kwargs = step(new_kwargs)
    return new_kwargs

reconvert_str_to_bytes(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes all bytes fields that are represented as strings back into bytes.

Parameters:

Name Type Description Default
model_dict Dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict

dictionary of model that is about to be saved

Source code in ormar\models\mixins\save_mixin.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@classmethod
def reconvert_str_to_bytes(cls, model_dict: Dict) -> Dict:
    """
    Turn bytes fields that arrived as strings back into bytes.

    Fields flagged with represent_as_base64_str are base64 decoded, all other
    bytes fields are utf-8 encoded. Values that are not strings are left as
    they are. The dictionary is modified in place and also returned.

    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: Dict
    :return: the same dictionary with bytes fields converted
    :rtype: Dict
    """
    base64_columns = set()
    for name, field in cls.Meta.model_fields.items():
        if field.represent_as_base64_str:
            base64_columns.add(name)

    for column, raw in model_dict.items():
        if column not in cls._bytes_fields or not isinstance(raw, str):
            continue
        if column in base64_columns:
            model_dict[column] = base64.b64decode(raw)
        else:
            model_dict[column] = raw.encode("utf-8")
    return model_dict

substitute_models_with_pks(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes all related models that are stored as foreign keys to their fk value.

Parameters:

Name Type Description Default
model_dict Dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict

dictionary of model that is about to be saved

Source code in ormar\models\mixins\save_mixin.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@classmethod
def substitute_models_with_pks(cls, model_dict: Dict) -> Dict:  # noqa  CCR001
    """
    Replace related models stored under relation names with their pk values.

    Missing or None relation values are left alone. A model instance is
    replaced by its pk, a non empty dict by the value under the target pk name
    and a non empty list by the list of such values. Anything else (including
    an empty list or dict) is removed from the dictionary.
    The dictionary is modified in place and also returned.

    :raises ModelPersistenceError: if a related model instance has no pk set
    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: Dict
    :return: the same dictionary with relations substituted
    :rtype: Dict
    """
    for relation in cls.extract_related_names():
        related = model_dict.get(relation, None)
        if related is None:
            continue

        pk_name = cls.Meta.model_fields[relation].to.Meta.pkname
        if isinstance(related, ormar.Model):  # pragma: no cover
            pk_value = getattr(related, pk_name)
            if not pk_value:
                raise ModelPersistenceError(
                    f"You cannot save {related.get_name()} "
                    f"model without pk set!"
                )
            model_dict[relation] = pk_value
        elif isinstance(related, list) and related:
            model_dict[relation] = [child.get(pk_name) for child in related]
        elif isinstance(related, dict) and related:
            model_dict[relation] = related.get(pk_name)
        else:
            model_dict.pop(relation, None)
    return model_dict

validate_choices(new_kwargs) classmethod

Receives dictionary of model that is about to be saved and validates the fields with choices set to see if the value is allowed.

Parameters:

Name Type Description Default
new_kwargs Dict

dictionary of model that is about to be saved

required

Returns:

Type Description
Dict

dictionary of model that is about to be saved

Source code in ormar\models\mixins\save_mixin.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
@classmethod
def validate_choices(cls, new_kwargs: Dict) -> Dict:
    """
    Check that every passed value of a field with choices is one of the
    allowed choices of that field.

    Returns the dictionary unchanged when the model has no choices fields.

    :raises ValueError: if a value is not in the allowed choices of its field
    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: Dict
    :return: dictionary of model that is about to be saved
    :rtype: Dict
    """
    if cls._choices_fields:
        for field in cls.Meta.model_fields.values():
            # skip fields without choices and fields with no value passed
            if field.name not in cls._choices_fields:
                continue
            if field.name not in new_kwargs:
                continue
            if new_kwargs[field.name] in field.choices:
                continue
            raise ValueError(
                f"{field.name}: '{new_kwargs[field.name]}' "
                f"not in allowed choices set:"
                f" {field.choices}"
            )
    return new_kwargs