
FastCRUD API Reference

FastCRUD is a base class for CRUD operations on a model, utilizing Pydantic schemas for data validation and serialization.

Class Definition

Bases: Generic[ModelType, CreateSchemaType, UpdateSchemaType, UpdateSchemaInternalType, DeleteSchemaType]

Base class for CRUD operations on a model.

This class provides a set of methods for create, read, update, and delete operations on a given SQLAlchemy model, utilizing Pydantic schemas for data validation and serialization.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model | type[ModelType] | The SQLAlchemy model type. | required |
| is_deleted_column | str | Optional column name to use for indicating a soft delete. Defaults to "is_deleted". | 'is_deleted' |
| deleted_at_column | str | Optional column name to use for storing the timestamp of a soft delete. Defaults to "deleted_at". | 'deleted_at' |
| updated_at_column | str | Optional column name to use for storing the timestamp of an update. Defaults to "updated_at". | 'updated_at' |

Methods:

| Name | Description |
|---|---|
| create | Creates a new record in the database from the provided Pydantic schema. |
| select | Generates a SQLAlchemy Select statement with optional filtering and sorting. |
| get | Retrieves a single record based on filters. Supports advanced filtering through comparison operators like '__gt', '__lt', etc. |
| exists | Checks if a record exists based on the provided filters. |
| count | Counts the number of records matching the provided filters. |
| get_multi | Fetches multiple records with optional sorting, pagination, and model conversion. |
| get_joined | Performs a join operation with another model, supporting custom join conditions and selection of specific columns. |
| get_multi_joined | Fetches multiple records with a join on another model, offering pagination and sorting for the joined tables. |
| get_multi_by_cursor | Implements cursor-based pagination for fetching records, ideal for large datasets and infinite scrolling features. |
| update | Updates an existing record or multiple records based on specified filters. |
| db_delete | Hard deletes a record or multiple records from the database based on provided filters. |
| delete | Soft deletes a record if it has an "is_deleted" attribute; otherwise, performs a hard delete. |
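
The comparison-operator suffixes accepted by the filtering methods ('__gt', '__lt', '__gte', '__lte', '__ne', '__in', '__not_in') can be illustrated without a database. The following is a minimal sketch, not FastCRUD's implementation: it applies the same suffix convention to plain dictionaries instead of SQLAlchemy columns, and the names `parse_filters` and `matches` are hypothetical helpers introduced only for this illustration.

```python
import operator

# Suffix -> comparison, mirroring the operator names listed for FastCRUD filters.
_OPS = {
    "gt": operator.gt,
    "lt": operator.lt,
    "gte": operator.ge,
    "lte": operator.le,
    "ne": operator.ne,
    "in": lambda value, options: value in options,
    "not_in": lambda value, options: value not in options,
}


def parse_filters(**kwargs):
    """Turn keyword filters into (field, predicate) pairs (hypothetical helper)."""
    parsed = []
    for key, expected in kwargs.items():
        if "__" in key:
            # Split on the last "__" so the suffix is the operator name.
            field, op = key.rsplit("__", 1)
            compare = _OPS[op]  # unknown suffixes raise KeyError in this sketch
        else:
            # A bare field name means an equality check.
            field, compare = key, operator.eq
        parsed.append((field, lambda actual, c=compare, e=expected: c(actual, e)))
    return parsed


def matches(row, **kwargs):
    """Return True when a plain dict satisfies every filter (hypothetical helper)."""
    return all(pred(row[field]) for field, pred in parse_filters(**kwargs))


rows = [
    {"name": "Alice", "age": 34, "role": "admin"},
    {"name": "Bob", "age": 17, "role": "user"},
]
adults = [r["name"] for r in rows if matches(r, age__gt=18)]
non_admins = [r["name"] for r in rows if matches(r, role__not_in=("admin",))]
```

In FastCRUD itself the same keyword arguments are passed to methods such as `get`, `get_multi`, and `count`, where they are translated into SQLAlchemy expressions rather than Python predicates.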

Examples:

Example 1: Basic Usage

Create a FastCRUD instance for a User model and perform basic CRUD operations.

# Assuming you have a User model (either SQLAlchemy or SQLModel),
# Pydantic schemas for creation, update, and deletion, and an async session `db`
CRUDUser = FastCRUD[User, UserCreateInternal, UserUpdate, UserUpdateInternal, UserDelete]
user_crud = CRUDUser(User)

# If you don't need the type parameters, you can skip the CRUDUser alias
# and instantiate FastCRUD directly
user_crud = FastCRUD(User)

# Create a new user
new_user = await user_crud.create(db, UserCreateInternal(name="Alice"))
# Read a user
user = await user_crud.get(db, id=new_user.id)
# Update a user
await user_crud.update(db, UserUpdate(email="alice@example.com"), id=new_user.id)
# Delete a user
await user_crud.delete(db, id=new_user.id)

Example 2: Advanced Filtering and Pagination

Use advanced filtering, sorting, and pagination for fetching records.

product_crud = FastCRUD(Product)
products = await product_crud.get_multi(
    db,
    offset=0,
    limit=10,
    sort_columns=['price'],
    sort_orders=['asc'],
)
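
The way `sort_columns` and `sort_orders` are paired can be sketched in isolation. The snippet below is a simplified illustration of the normalization rules described for sorting (a single string is accepted for either argument, a single order is applied to every column, and the default is ascending); `normalize_sorting` is a hypothetical name, and the real method applies the result to a SQLAlchemy statement instead of returning pairs.

```python
def normalize_sorting(sort_columns, sort_orders=None):
    """Pair each sort column with 'asc' or 'desc' (hypothetical helper)."""
    if sort_orders and not sort_columns:
        raise ValueError("Sort orders provided without corresponding sort columns.")
    if not sort_columns:
        return []

    # Accept a single column name as well as a list.
    if not isinstance(sort_columns, list):
        sort_columns = [sort_columns]

    if not sort_orders:
        # No orders given: every column is sorted ascending.
        sort_orders = ["asc"] * len(sort_columns)
    elif not isinstance(sort_orders, list):
        # A single order is applied to every column.
        sort_orders = [sort_orders] * len(sort_columns)

    if len(sort_columns) != len(sort_orders):
        raise ValueError("The length of sort_columns and sort_orders must match.")
    for order in sort_orders:
        if order not in ("asc", "desc"):
            raise ValueError(f"Invalid sort order: {order}. Only 'asc' or 'desc' are allowed.")

    return list(zip(sort_columns, sort_orders))


by_price = normalize_sorting("price")
by_name_then_age = normalize_sorting(["name", "age"], "desc")
```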

Example 3: Join Operations with Custom Schemas

Perform join operations between two models using custom schemas for selection.

order_crud = FastCRUD(Order)
orders = await order_crud.get_multi_joined(
    db,
    offset=0,
    limit=5,
    join_model=Product,
    join_prefix="product_",
    schema_to_select=OrderReadSchema,
    join_schema_to_select=ProductReadSchema,
)

Example 4: Cursor Pagination

Implement cursor-based pagination for efficient data retrieval in large datasets.

comment_crud = FastCRUD(Comment)

first_page = await comment_crud.get_multi_by_cursor(db, limit=10)
next_cursor = first_page['next_cursor']
second_page = await comment_crud.get_multi_by_cursor(db, cursor=next_cursor, limit=10)
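
To show the idea behind cursor pagination, here is a small in-memory sketch. It is an assumption-laden illustration rather than FastCRUD's code: `page_by_cursor` is a hypothetical function, the rows are plain dictionaries sorted by an `id` field, and the `data` key in the result is assumed for the example (only `next_cursor` appears in the usage above).

```python
def page_by_cursor(rows, cursor=None, limit=10, sort_column="id"):
    """Return one page of rows that come after `cursor` (hypothetical helper)."""
    ordered = sorted(rows, key=lambda r: r[sort_column])
    if cursor is not None:
        # Keep only rows strictly after the last value seen by the caller.
        ordered = [r for r in ordered if r[sort_column] > cursor]
    data = ordered[:limit]
    # A full page suggests there may be more rows; a short page ends the scan.
    next_cursor = data[-1][sort_column] if len(data) == limit else None
    return {"data": data, "next_cursor": next_cursor}


comments = [{"id": i, "text": f"comment {i}"} for i in range(1, 26)]

first = page_by_cursor(comments, limit=10)
second = page_by_cursor(comments, cursor=first["next_cursor"], limit=10)
third = page_by_cursor(comments, cursor=second["next_cursor"], limit=10)
```

Unlike offset pagination, each request filters on the last seen value, so rows inserted or deleted before the cursor do not shift later pages.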

Example 5: Dynamic Filtering and Counting

Dynamically filter records based on various criteria and count the results.

task_crud = FastCRUD(Task)
completed_tasks = await task_crud.get_multi(
    db,
    status='completed'
)
high_priority_task_count = await task_crud.count(
    db,
    priority='high'
)

Example 6: Using Custom Column Names for Soft Delete

If your model uses different column names for indicating a soft delete and its timestamp, you can specify these when creating the FastCRUD instance.

custom_user_crud = FastCRUD(
    User,
    is_deleted_column="archived",
    deleted_at_column="archived_at"
)
# Now 'archived' and 'archived_at' will be used for soft delete operations.
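
The soft-delete rule (flag the record when the model has the soft-delete column, otherwise remove it) can be sketched with plain Python objects. This is only an illustration under stated assumptions: the `ArchivableUser` and `PlainTag` classes, the list used as a stand-in for a table, and the standalone `delete` function are all hypothetical and do not touch a database.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ArchivableUser:
    id: int
    archived: bool = False
    archived_at: Optional[datetime] = None


@dataclass
class PlainTag:
    id: int


def delete(store, record, is_deleted_column="is_deleted", deleted_at_column="deleted_at"):
    """Soft delete when the flag column exists, otherwise hard delete (sketch)."""
    if hasattr(record, is_deleted_column):
        # Soft delete: keep the record, mark it, and stamp the time if possible.
        setattr(record, is_deleted_column, True)
        if hasattr(record, deleted_at_column):
            setattr(record, deleted_at_column, datetime.now(timezone.utc))
        return "soft"
    # Hard delete: the record has no soft-delete flag, so drop it from the store.
    store.remove(record)
    return "hard"


users = [ArchivableUser(id=1)]
tags = [PlainTag(id=1)]

user_result = delete(users, users[0], is_deleted_column="archived", deleted_at_column="archived_at")
tag_result = delete(tags, tags[0])
```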

Source code in fastcrud/crud/fast_crud.py
class FastCRUD(
    Generic[
        ModelType,
        CreateSchemaType,
        UpdateSchemaType,
        UpdateSchemaInternalType,
        DeleteSchemaType,
    ]
):
    """
    Base class for CRUD operations on a model.

    This class provides a set of methods for create, read, update, and delete operations on a given SQLAlchemy model,
    utilizing Pydantic schemas for data validation and serialization.

    Args:
        model: The SQLAlchemy model type.
        is_deleted_column: Optional column name to use for indicating a soft delete. Defaults to "is_deleted".
        deleted_at_column: Optional column name to use for storing the timestamp of a soft delete. Defaults to "deleted_at".
        updated_at_column: Optional column name to use for storing the timestamp of an update. Defaults to "updated_at".

    Methods:
        create:
            Creates a new record in the database from the provided Pydantic schema.

        select:
            Generates a SQLAlchemy `Select` statement with optional filtering and sorting.

        get:
            Retrieves a single record based on filters. Supports advanced filtering through comparison operators like '__gt', '__lt', etc.

        exists:
            Checks if a record exists based on the provided filters.

        count:
            Counts the number of records matching the provided filters.

        get_multi:
            Fetches multiple records with optional sorting, pagination, and model conversion.

        get_joined:
            Performs a join operation with another model, supporting custom join conditions and selection of specific columns.

        get_multi_joined:
            Fetches multiple records with a join on another model, offering pagination and sorting for the joined tables.

        get_multi_by_cursor:
            Implements cursor-based pagination for fetching records, ideal for large datasets and infinite scrolling features.

        update:
            Updates an existing record or multiple records based on specified filters.

        db_delete:
            Hard deletes a record or multiple records from the database based on provided filters.

        delete:
            Soft deletes a record if it has an "is_deleted" attribute; otherwise, performs a hard delete.

    Examples:
        Example 1: Basic Usage
        ----------------------
        Create a FastCRUD instance for a User model and perform basic CRUD operations.
        ```python
        # Assuming you have a User model (either SQLAlchemy or SQLModel),
        # Pydantic schemas for creation, update, and deletion, and an async session `db`
        CRUDUser = FastCRUD[User, UserCreateInternal, UserUpdate, UserUpdateInternal, UserDelete]
        user_crud = CRUDUser(User)

        # If you don't need the type parameters, you can skip the CRUDUser alias
        # and instantiate FastCRUD directly
        user_crud = FastCRUD(User)

        # Create a new user
        new_user = await user_crud.create(db, UserCreateInternal(name="Alice"))
        # Read a user
        user = await user_crud.get(db, id=new_user.id)
        # Update a user
        await user_crud.update(db, UserUpdate(email="alice@example.com"), id=new_user.id)
        # Delete a user
        await user_crud.delete(db, id=new_user.id)
        ```

        Example 2: Advanced Filtering and Pagination
        --------------------------------------------
        Use advanced filtering, sorting, and pagination for fetching records.
        ```python
        product_crud = FastCRUD(Product)
        products = await product_crud.get_multi(
            db,
            offset=0,
            limit=10,
            sort_columns=['price'],
            sort_orders=['asc'],
        )
        ```

        Example 3: Join Operations with Custom Schemas
        ----------------------------------------------
        Perform join operations between two models using custom schemas for selection.
        ```python
        order_crud = FastCRUD(Order)
        orders = await order_crud.get_multi_joined(
            db,
            offset=0,
            limit=5,
            join_model=Product,
            join_prefix="product_",
            schema_to_select=OrderReadSchema,
            join_schema_to_select=ProductReadSchema,
        )
        ```

        Example 4: Cursor Pagination
        ----------------------------
        Implement cursor-based pagination for efficient data retrieval in large datasets.
        ```python
        comment_crud = FastCRUD(Comment)

        first_page = await comment_crud.get_multi_by_cursor(db, limit=10)
        next_cursor = first_page['next_cursor']
        second_page = await comment_crud.get_multi_by_cursor(db, cursor=next_cursor, limit=10)
        ```

        Example 5: Dynamic Filtering and Counting
        -----------------------------------------
        Dynamically filter records based on various criteria and count the results.
        ```python
        task_crud = FastCRUD(Task)
        completed_tasks = await task_crud.get_multi(
            db,
            status='completed'
        )
        high_priority_task_count = await task_crud.count(
            db,
            priority='high'
        )
        ```

        Example 6: Using Custom Column Names for Soft Delete
        ----------------------------------------------------
        If your model uses different column names for indicating a soft delete and its timestamp, you can specify these when creating the FastCRUD instance.
        ```python
        custom_user_crud = FastCRUD(
            User,
            is_deleted_column="archived",
            deleted_at_column="archived_at"
        )
        # Now 'archived' and 'archived_at' will be used for soft delete operations.
        ```
    """

    def __init__(
        self,
        model: type[ModelType],
        is_deleted_column: str = "is_deleted",
        deleted_at_column: str = "deleted_at",
        updated_at_column: str = "updated_at",
    ) -> None:
        self.model = model
        self.model_col_names = [col.key for col in model.__table__.columns]
        self.is_deleted_column = is_deleted_column
        self.deleted_at_column = deleted_at_column
        self.updated_at_column = updated_at_column
        self._primary_keys = _get_primary_keys(self.model)

    def _parse_filters(
        self, model: Optional[Union[type[ModelType], AliasedClass]] = None, **kwargs
    ) -> list[BinaryExpression]:
        model = model or self.model
        filters = []
        for key, value in kwargs.items():
            if "__" in key:
                field_name, op = key.rsplit("__", 1)
                column = getattr(model, field_name, None)
                if column is None:
                    raise ValueError(f"Invalid filter column: {field_name}")

                if op == "gt":
                    filters.append(column > value)
                elif op == "lt":
                    filters.append(column < value)
                elif op == "gte":
                    filters.append(column >= value)
                elif op == "lte":
                    filters.append(column <= value)
                elif op == "ne":
                    filters.append(column != value)
                elif op == "in":
                    if not isinstance(value, (tuple, list, set)):
                        raise ValueError("in filter must be tuple, list or set")
                    filters.append(column.in_(value))
                elif op == "not_in":
                    if not isinstance(value, (tuple, list, set)):
                        raise ValueError("not_in filter must be tuple, list or set")
                    filters.append(column.not_in(value))
            else:
                column = getattr(model, key, None)
                if column is not None:
                    filters.append(column == value)

        return filters

    def _apply_sorting(
        self,
        stmt: Select,
        sort_columns: Union[str, list[str]],
        sort_orders: Optional[Union[str, list[str]]] = None,
    ) -> Select:
        """
        Apply sorting to a SQLAlchemy query based on specified column names and sort orders.

        Args:
            stmt: The SQLAlchemy Select statement to which sorting will be applied.
            sort_columns: A single column name or a list of column names on which to apply sorting.
            sort_orders: A single sort order ('asc' or 'desc') or a list of sort orders corresponding
                to the columns in sort_columns. If not provided, defaults to 'asc' for each column.

        Raises:
            ValueError: Raised if sort orders are provided without corresponding sort columns,
                or if an invalid sort order is provided (not 'asc' or 'desc').
            ArgumentError: Raised if an invalid column name is provided that does not exist in the model.

        Returns:
            The modified Select statement with sorting applied.

        Examples:
            Applying ascending sort on a single column:
            >>> stmt = _apply_sorting(stmt, 'name')

            Applying descending sort on a single column:
            >>> stmt = _apply_sorting(stmt, 'age', 'desc')

            Applying mixed sort orders on multiple columns:
            >>> stmt = _apply_sorting(stmt, ['name', 'age'], ['asc', 'desc'])

            Applying ascending sort on multiple columns:
            >>> stmt = _apply_sorting(stmt, ['name', 'age'])

        Note:
            SQLAlchemy `Select` objects are generative, so this method does not change the passed statement in place;
            it returns a new statement with the `order_by` clause applied.
        """
        if sort_orders and not sort_columns:
            raise ValueError("Sort orders provided without corresponding sort columns.")

        if sort_columns:
            if not isinstance(sort_columns, list):
                sort_columns = [sort_columns]

            if sort_orders:
                if not isinstance(sort_orders, list):
                    sort_orders = [sort_orders] * len(sort_columns)
                if len(sort_columns) != len(sort_orders):
                    raise ValueError(
                        "The length of sort_columns and sort_orders must match."
                    )

                for order in sort_orders:
                    if order not in ["asc", "desc"]:
                        raise ValueError(
                            f"Invalid sort order: {order}. Only 'asc' or 'desc' are allowed."
                        )

            validated_sort_orders = (
                ["asc"] * len(sort_columns) if not sort_orders else sort_orders
            )

            for idx, column_name in enumerate(sort_columns):
                column = getattr(self.model, column_name, None)
                if not column:
                    raise ArgumentError(f"Invalid column name: {column_name}")

                order = validated_sort_orders[idx]
                stmt = stmt.order_by(asc(column) if order == "asc" else desc(column))

        return stmt

    def _prepare_and_apply_joins(
        self,
        stmt: Select,
        joins_config: list[JoinConfig],
        use_temporary_prefix: bool = False,
    ):
        """
        Applies joins to the given SQL statement based on a list of JoinConfig objects.

        Args:
            stmt: The initial SQL statement.
            joins_config: Configurations for all joins.
            use_temporary_prefix: Whether to use an additional prefix for joins. Defaults to False.

        Returns:
            Select: The modified SQL statement with joins applied.
        """
        for join in joins_config:
            model = join.alias or join.model
            join_select = _extract_matching_columns_from_schema(
                model,
                join.schema_to_select,
                join.join_prefix,
                join.alias,
                use_temporary_prefix,
            )
            joined_model_filters = self._parse_filters(
                model=model, **(join.filters or {})
            )

            if join.join_type == "left":
                stmt = stmt.outerjoin(model, join.join_on).add_columns(*join_select)
            elif join.join_type == "inner":
                stmt = stmt.join(model, join.join_on).add_columns(*join_select)
            else:  # pragma: no cover
                raise ValueError(f"Unsupported join type: {join.join_type}.")
            if joined_model_filters:
                stmt = stmt.filter(*joined_model_filters)

        return stmt

    async def create(
        self, db: AsyncSession, object: CreateSchemaType, commit: bool = True
    ) -> ModelType:
        """
        Create a new record in the database.

        Args:
            db: The SQLAlchemy async session.
            object: The Pydantic schema containing the data to be saved.
            commit: If True, commits the transaction immediately. Default is True.

        Returns:
            The created database object.
        """
        object_dict = object.model_dump()
        db_object: ModelType = self.model(**object_dict)
        db.add(db_object)
        if commit:
            await db.commit()
        return db_object

    async def select(
        self,
        schema_to_select: Optional[type[BaseModel]] = None,
        sort_columns: Optional[Union[str, list[str]]] = None,
        sort_orders: Optional[Union[str, list[str]]] = None,
        **kwargs,
    ) -> Select:
        """
        Constructs a SQLAlchemy `Select` statement with optional column selection, filtering, and sorting.
        This method allows for advanced filtering through comparison operators, enabling queries to be refined beyond simple equality checks.
        Supported operators include:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            schema_to_select: Pydantic schema to determine which columns to include in the selection. If not provided, selects all columns of the model.
            sort_columns: A single column name or list of column names to sort the query results by. Can be combined with sort_orders to control the direction.
            sort_orders: A single sort order ('asc' or 'desc') or a list of sort orders, corresponding to each column in sort_columns. If not specified, defaults to ascending order for all sort_columns.
            **kwargs: Filters to apply to the query, using field names for direct matches or appending comparison operators for advanced queries.

        Returns:
            Select: A SQLAlchemy `Select` statement object that can be executed or further modified.

        Examples:
            Selecting specific columns with filtering and sorting:
            ```python
            stmt = await crud.select(
                schema_to_select=UserReadSchema,
                sort_columns=['age', 'name'],
                sort_orders=['asc', 'desc'],
                age__gt=18
            )
            ```

            Creating a statement to select all users without any filters:
            ```python
            stmt = await crud.select()
            ```

            Selecting users with a specific role, ordered by name:
            ```python
            stmt = await crud.select(
                schema_to_select=UserReadSchema,
                sort_columns='name',
                role='admin'
            )
            ```
        Note:
            This method does not execute the generated SQL statement.
            Use `db.execute(stmt)` to run the query and fetch results.
        """
        to_select = _extract_matching_columns_from_schema(
            model=self.model, schema=schema_to_select
        )
        filters = self._parse_filters(**kwargs)
        stmt = select(*to_select).filter(*filters)

        if sort_columns:
            stmt = self._apply_sorting(stmt, sort_columns, sort_orders)
        return stmt

    async def get(
        self,
        db: AsyncSession,
        schema_to_select: Optional[type[BaseModel]] = None,
        return_as_model: bool = False,
        one_or_none: bool = False,
        **kwargs: Any,
    ) -> Optional[Union[dict, BaseModel]]:
        """
        Fetches a single record based on specified filters.
        This method allows for advanced filtering through comparison operators, enabling queries to be refined beyond simple equality checks.
        Supported operators include:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The database session to use for the operation.
            schema_to_select: Optional Pydantic schema for selecting specific columns.
            return_as_model: If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.
            one_or_none: Flag to get strictly one or no result. Multiple results are not allowed.
            **kwargs: Filters to apply to the query, using field names for direct matches or appending comparison operators for advanced queries.

        Raises:
            ValueError: If return_as_model is True but schema_to_select is not provided.

        Returns:
            A dictionary or a Pydantic model instance of the fetched database row, or None if no match is found.

        Examples:
            Fetch a user by ID:
            ```python
            user = await crud.get(db, id=1)
            ```

            Fetch a user with an age greater than 30:
            ```python
            user = await crud.get(db, age__gt=30)
            ```

            Fetch a user with a registration date before Jan 1, 2020:
            ```python
            user = await crud.get(db, registration_date__lt=datetime(2020, 1, 1))
            ```

            Fetch a user not equal to a specific username:
            ```python
            user = await crud.get(db, username__ne='admin')
            ```
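
            Plain-Python sketch of how a suffixed keyword such as `age__gt` can be read as a field name plus an operator. This is only an illustration against in-memory dicts; `split_filter` and `matches` are hypothetical helpers, not FastCRUD's internal filter parser:
            ```python
            import operator

            # Suffixes listed in this docstring, mapped to plain-Python comparisons.
            OPERATORS = {
                "gt": operator.gt,
                "lt": operator.lt,
                "gte": operator.ge,
                "lte": operator.le,
                "ne": operator.ne,
                "in": lambda value, options: value in options,
                "not_in": lambda value, options: value not in options,
            }


            def split_filter(key):
                # 'age__gt' -> ('age', 'gt'); a bare 'id' means equality.
                field, sep, suffix = key.rpartition("__")
                if sep and suffix in OPERATORS:
                    return field, suffix
                return key, "eq"


            def matches(row, **filters):
                # True when the row satisfies every filter.
                for key, expected in filters.items():
                    field, op = split_filter(key)
                    compare = operator.eq if op == "eq" else OPERATORS[op]
                    if not compare(row[field], expected):
                        return False
                return True


            user = {"id": 1, "age": 42, "username": "jane"}
            assert matches(user, id=1, age__gt=30, username__ne="admin")
            assert not matches(user, age__lte=30)
            assert matches(user, username__in=("jane", "john"))
            ```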
        """
        stmt = await self.select(schema_to_select=schema_to_select, **kwargs)

        db_row = await db.execute(stmt)
        result: Optional[Row] = db_row.one_or_none() if one_or_none else db_row.first()
        if result is None:
            return None
        out: dict = dict(result._mapping)
        if not return_as_model:
            return out
        if not schema_to_select:
            raise ValueError(
                "schema_to_select must be provided when return_as_model is True."
            )
        return schema_to_select(**out)

    def _get_pk_dict(self, instance):
        return {pk.name: getattr(instance, pk.name) for pk in self._primary_keys}

    async def upsert(
        self,
        db: AsyncSession,
        instance: Union[UpdateSchemaType, CreateSchemaType],
        schema_to_select: Optional[type[BaseModel]] = None,
        return_as_model: bool = False,
    ) -> Union[BaseModel, Dict[str, Any], None]:
        """Update the instance or create it if it doesn't exist.
        Note: This method will perform two transactions to the database (get and create or update).

        Args:
            db (AsyncSession): The database session to use for the operation.
            instance (Union[UpdateSchemaType, CreateSchemaType]): A Pydantic schema representing the instance.
            schema_to_select (Optional[type[BaseModel]], optional): Optional Pydantic schema for selecting specific columns. Defaults to None.
            return_as_model (bool, optional): If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.

        Returns:
            BaseModel: the created or updated instance
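
        Examples:
            Plain-Python sketch of the get-then-create-or-update flow noted above, using an in-memory dict in place of a table. `upsert_row` and `store` are hypothetical names for illustration only; the real method goes through `get`, `create` and `update` on the session:
            ```python
            def upsert_row(store, row, pk="id"):
                # Look the row up by primary key first (the 'get' step).
                existing = store.get(row[pk])
                if existing is None:
                    store[row[pk]] = dict(row)   # 'create' step
                else:
                    existing.update(row)         # 'update' step
                return store[row[pk]]


            store = {}
            created = upsert_row(store, {"id": 1, "name": "Jane"})
            updated = upsert_row(store, {"id": 1, "name": "Jane Doe"})
            assert created is updated
            assert store == {1: {"id": 1, "name": "Jane Doe"}}
            ```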
        """
        _pks = self._get_pk_dict(instance)
        schema_to_select = schema_to_select or type(instance)
        db_instance = await self.get(
            db,
            schema_to_select=schema_to_select,
            return_as_model=return_as_model,
            **_pks,
        )
        if db_instance is None:
            db_instance = await self.create(db, instance)  # type: ignore
            db_instance = schema_to_select.model_validate(
                db_instance, from_attributes=True
            )
        else:
            await self.update(db, instance)  # type: ignore
            db_instance = await self.get(
                db,
                schema_to_select=schema_to_select,
                return_as_model=return_as_model,
                **_pks,
            )

        return db_instance

    async def exists(self, db: AsyncSession, **kwargs: Any) -> bool:
        """
        Checks if any records exist that match the given filter conditions.
        This method supports advanced filtering with comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The database session to use for the operation.
            **kwargs: Filters to apply to the query, supporting both direct matches and advanced comparison operators for refined search criteria.

        Returns:
            True if at least one record matches the filter conditions, False otherwise.

        Examples:
            Check if a user with a specific ID exists:
            ```python
            exists = await crud.exists(db, id=1)
            ```

            Check if any user is older than 30:
            ```python
            exists = await crud.exists(db, age__gt=30)
            ```

            Check if any user registered before Jan 1, 2020:
            ```python
            exists = await crud.exists(db, registration_date__lt=datetime(2020, 1, 1))
            ```

            Check if a username other than 'admin' exists:
            ```python
            exists = await crud.exists(db, username__ne='admin')
            ```
        """
        filters = self._parse_filters(**kwargs)
        stmt = select(self.model).filter(*filters).limit(1)

        result = await db.execute(stmt)
        return result.first() is not None

    async def count(
        self,
        db: AsyncSession,
        joins_config: Optional[list[JoinConfig]] = None,
        **kwargs: Any,
    ) -> int:
        """
        Counts records that match specified filters, supporting advanced filtering through comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).
        Can also count records based on a configuration of joins, useful for complex queries involving relationships.

        Args:
            db: The database session to use for the operation.
            joins_config: Optional configuration for applying joins in the count query.
            **kwargs: Filters to apply for the count, including field names for equality checks or with comparison operators for advanced queries.

        Returns:
            The total number of records matching the filter conditions.

        Examples:
            Count users by ID:
            ```python
            count = await crud.count(db, id=1)
            ```

            Count users older than 30:
            ```python
            count = await crud.count(db, age__gt=30)
            ```

            Count users with a username other than 'admin':
            ```python
            count = await crud.count(db, username__ne='admin')
            ```

            Count projects with at least one participant (many-to-many relationship):
            ```python
            joins_config = [
                JoinConfig(
                    model=ProjectsParticipantsAssociation,
                    join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                    join_type="inner"
                ),
                JoinConfig(
                    model=Participant,
                    join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                    join_type="inner"
                )
            ]
            count = await crud.count(db, joins_config=joins_config)
            ```

            Count projects by a specific participant name (filter applied on a joined model):
            ```python
            joins_config = [
                JoinConfig(
                    model=ProjectsParticipantsAssociation,
                    join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                    join_type="inner"
                ),
                JoinConfig(
                    model=Participant,
                    join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                    join_type="inner",
                    filters={'name': 'Jane Doe'}
                )
            ]
            count = await crud.count(db, joins_config=joins_config)
            ```
        """
        primary_filters = self._parse_filters(**kwargs)

        if joins_config is not None:
            primary_keys = [p.name for p in _get_primary_keys(self.model)]
            if not any(primary_keys):  # pragma: no cover
                raise ValueError(
                    f"The model '{self.model.__name__}' does not have a primary key defined, which is required for counting with joins."
                )
            to_select = [
                getattr(self.model, pk).label(f"distinct_{pk}") for pk in primary_keys
            ]
            base_query = select(*to_select)

            for join in joins_config:
                join_model = join.alias or join.model
                join_filters = (
                    self._parse_filters(model=join_model, **join.filters)
                    if join.filters
                    else []
                )

                if join.join_type == "inner":
                    base_query = base_query.join(join_model, join.join_on)
                else:
                    base_query = base_query.outerjoin(join_model, join.join_on)

                if join_filters:
                    base_query = base_query.where(*join_filters)

            if primary_filters:
                base_query = base_query.where(*primary_filters)

            subquery = base_query.subquery()
            count_query = select(func.count()).select_from(subquery)
        else:
            count_query = select(func.count()).select_from(self.model)
            if primary_filters:
                count_query = count_query.where(*primary_filters)

        total_count: Optional[int] = await db.scalar(count_query)
        if total_count is None:
            raise ValueError("Could not find the count.")

        return total_count

    async def get_multi(
        self,
        db: AsyncSession,
        offset: int = 0,
        limit: Optional[int] = 100,
        schema_to_select: Optional[type[BaseModel]] = None,
        sort_columns: Optional[Union[str, list[str]]] = None,
        sort_orders: Optional[Union[str, list[str]]] = None,
        return_as_model: bool = False,
        return_total_count: bool = True,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Fetches multiple records based on filters, supporting sorting, pagination, and advanced filtering with comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The database session to use for the operation.
            offset: Starting index for records to fetch, useful for pagination.
            limit: Maximum number of records to fetch in one call. Use `None` for "no limit", fetching all matching rows. Note that `limit=None` requires a custom endpoint, which you should only provide if you genuinely want to let users fetch all the data at once.
            schema_to_select: Optional Pydantic schema for selecting specific columns. Required if `return_as_model` is True.
            sort_columns: Column names to sort the results by.
            sort_orders: Corresponding sort orders ('asc', 'desc') for each column in sort_columns.
            return_as_model: If True, returns data as instances of the specified Pydantic model.
            return_total_count: If True, also returns the total count of rows with the selected filters. Useful for pagination.
            **kwargs: Filters to apply to the query, including advanced comparison operators for more detailed querying.

        Returns:
            A dictionary containing 'data' with the fetched records and, when `return_total_count` is True, 'total_count' with the total number of records matching the filters.

        Raises:
            ValueError: If limit or offset is negative, or if schema_to_select is required but not provided or invalid.

        Examples:
            Fetch the first 10 users:
            ```python
            users = await crud.get_multi(db, 0, 10)
            ```

            Fetch the next 10 users, sorted by username in descending order:
            ```python
            users = await crud.get_multi(db, 10, 10, sort_columns='username', sort_orders='desc')
            ```

            Fetch 10 users older than 30, sorted by age in descending order:
            ```python
            users = await crud.get_multi(db, offset=0, limit=10, age__gt=30, sort_columns='age', sort_orders='desc')
            ```

            Fetch 10 users with a registration date before Jan 1, 2020:
            ```python
            users = await crud.get_multi(db, offset=0, limit=10, registration_date__lt=datetime(2020, 1, 1))
            ```

            Fetch 10 users with a username other than 'admin', returning as model instances (ensure appropriate schema is passed):
            ```python
            users = await crud.get_multi(db, offset=0, limit=10, username__ne='admin', schema_to_select=UserSchema, return_as_model=True)
            ```

            Fetch users with filtering and multiple column sorting:
            ```python
            users = await crud.get_multi(db, 0, 10, is_active=True, sort_columns=['username', 'email'], sort_orders=['asc', 'desc'])
            ```
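
            Walk through every page using `total_count` (plain-Python sketch; `fetch_page` is a hypothetical stand-in that returns the same `{"data": ..., "total_count": ...}` shape described above, so no database is needed):
            ```python
            import math

            ROWS = [{"id": i} for i in range(1, 26)]  # 25 fake rows


            def fetch_page(offset, limit):
                # Stand-in for `await crud.get_multi(db, offset=offset, limit=limit)`.
                return {"data": ROWS[offset:offset + limit], "total_count": len(ROWS)}


            limit = 10
            first = fetch_page(0, limit)
            # Number of pages needed to cover every matching row.
            total_pages = math.ceil(first["total_count"] / limit)

            pages = [first] + [fetch_page(page * limit, limit) for page in range(1, total_pages)]
            page_sizes = [len(page["data"]) for page in pages]
            assert total_pages == 3
            assert page_sizes == [10, 10, 5]
            ```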
        """
        if (limit is not None and limit < 0) or offset < 0:
            raise ValueError("Limit and offset must be non-negative.")

        stmt = await self.select(
            schema_to_select=schema_to_select,
            sort_columns=sort_columns,
            sort_orders=sort_orders,
            **kwargs,
        )

        if offset:
            stmt = stmt.offset(offset)
        if limit is not None:
            stmt = stmt.limit(limit)

        result = await db.execute(stmt)
        data = [dict(row) for row in result.mappings()]

        response: dict[str, Any] = {"data": data}

        if return_total_count:
            total_count = await self.count(db=db, **kwargs)
            response["total_count"] = total_count

        if return_as_model:
            if not schema_to_select:
                raise ValueError(
                    "schema_to_select must be provided when return_as_model is True."
                )
            try:
                model_data = [schema_to_select(**row) for row in data]
                response["data"] = model_data
            except ValidationError as e:
                raise ValueError(
                    f"Data validation error for schema {schema_to_select.__name__}: {e}"
                ) from e

        return response

    async def get_joined(
        self,
        db: AsyncSession,
        schema_to_select: Optional[type[BaseModel]] = None,
        join_model: Optional[type[DeclarativeBase]] = None,
        join_on: Optional[Union[Join, BinaryExpression]] = None,
        join_prefix: Optional[str] = None,
        join_schema_to_select: Optional[type[BaseModel]] = None,
        join_type: str = "left",
        alias: Optional[AliasedClass] = None,
        join_filters: Optional[dict] = None,
        joins_config: Optional[list[JoinConfig]] = None,
        nest_joins: bool = False,
        relationship_type: Optional[str] = None,
        **kwargs: Any,
    ) -> Optional[dict[str, Any]]:
        """
        Fetches a single record with one or multiple joins on other models. If 'join_on' is not provided, the method attempts
        to automatically detect the join condition using foreign key relationships. For multiple joins, use 'joins_config' to
        specify each join configuration. Advanced filters supported:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The SQLAlchemy async session.
            schema_to_select: Pydantic schema for selecting specific columns from the primary model. Required if `return_as_model` is True.
            join_model: The model to join with.
            join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
            join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
            join_schema_to_select: Pydantic schema for selecting specific columns from the joined model.
            join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.
            alias: An instance of `AliasedClass` for the join model, useful for self-joins or multiple joins on the same model. Result of `aliased(join_model)`.
            join_filters: Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.
            joins_config: A list of JoinConfig instances, each specifying a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and the type of join. This parameter enables support for multiple joins.
            nest_joins: If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.
            relationship_type: Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.
            **kwargs: Filters to apply to the primary model query, supporting advanced comparison operators for refined searching.

        Returns:
            A dictionary representing the joined record, or None if no record matches the criteria.

        Raises:
            ValueError: If both single join parameters and 'joins_config' are used simultaneously, or if neither 'join_model' nor 'joins_config' is provided.
            ArgumentError: If any provided model in 'joins_config' is not recognized or invalid.

        Examples:
            Simple example: Joining User and Tier models without explicitly providing join_on
            ```python
            result = await crud_user.get_joined(
                db=session,
                join_model=Tier,
                schema_to_select=UserSchema,
                join_schema_to_select=TierSchema
            )
            ```

            Fetch a user and their associated tier, filtering by user ID:
            ```python
            result = await crud_user.get_joined(db, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, id=1)
            ```

            Fetch a user and their associated tier, where the user's age is greater than 30:
            ```python
            result = await crud_user.get_joined(db, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, age__gt=30)
            ```

            Fetch a user and their associated tier, excluding users with the 'admin' username:
            ```python
            result = await crud_user.get_joined(db, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, username__ne='admin')
            ```

            Complex example: Joining with a custom join condition, additional filter parameters, and a prefix
            ```python
            from sqlalchemy import and_
            result = await crud_user.get_joined(
                db=session,
                join_model=Tier,
                join_prefix="tier_",
                join_on=and_(User.tier_id == Tier.id, User.is_superuser == True),
                schema_to_select=UserSchema,
                join_schema_to_select=TierSchema,
                username="john_doe"
            )
            ```

            Example of using 'joins_config' for multiple joins:
            ```python
            from fastcrud import JoinConfig

            result = await crud_user.get_joined(
                db=session,
                schema_to_select=UserSchema,
                joins_config=[
                    JoinConfig(
                        model=Tier,
                        join_on=User.tier_id == Tier.id,
                        join_prefix="tier_",
                        schema_to_select=TierSchema,
                        join_type="left",
                    ),
                    JoinConfig(
                        model=Department,
                        join_on=User.department_id == Department.id,
                        join_prefix="dept_",
                        schema_to_select=DepartmentSchema,
                        join_type="inner",
                    )
                ]
            )
            ```

            Using `alias` for joining the same model multiple times:
            ```python
            from fastcrud import aliased

            owner_alias = aliased(ModelTest, name="owner")
            user_alias = aliased(ModelTest, name="user")

            result = await crud.get_joined(
                db=session,
                schema_to_select=BookingSchema,
                joins_config=[
                    JoinConfig(
                        model=ModelTest,
                        join_on=BookingModel.owner_id == owner_alias.id,
                        join_prefix="owner_",
                        alias=owner_alias,
                        schema_to_select=UserSchema
                    ),
                    JoinConfig(
                        model=ModelTest,
                        join_on=BookingModel.user_id == user_alias.id,
                        join_prefix="user_",
                        alias=user_alias,
                        schema_to_select=UserSchema
                    )
                ],
                id=1
            )
            ```

            Fetching a single project and its associated participants where a participant has a specific role:
            ```python
            joins_config = [
                JoinConfig(
                    model=ProjectsParticipantsAssociation,
                    join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                    join_type="inner"
                ),
                JoinConfig(
                    model=Participant,
                    join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                    join_type="inner",
                    filters={'role': 'Designer'}
                )
            ]
            project = await crud.get_joined(
                db=session,
                schema_to_select=ProjectSchema,
                joins_config=joins_config
            )
            ```

            Example of using 'joins_config' for multiple joins with nested joins enabled:
            ```python
            from fastcrud import JoinConfig

            result = await crud_user.get_joined(
                db=session,
                schema_to_select=UserSchema,
                joins_config=[
                    JoinConfig(
                        model=Tier,
                        join_on=User.tier_id == Tier.id,
                        join_prefix="tier_",
                        schema_to_select=TierSchema,
                        join_type="left",
                    ),
                    JoinConfig(
                        model=Department,
                        join_on=User.department_id == Department.id,
                        join_prefix="dept_",
                        schema_to_select=DepartmentSchema,
                        join_type="inner",
                    )
                ],
                nest_joins=True
            )
            # Expect 'result' to have 'tier' and 'dept' as nested dictionaries
            ```
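
            Plain-Python sketch of the flat versus nested shapes. `nest_by_prefix` is a hypothetical helper written for illustration, not the library's internal nesting routine, and it assumes the nested key is the prefix without its trailing underscore:
            ```python
            def nest_by_prefix(row, prefixes):
                # Move every 'tier_name'-style key under a 'tier' sub-dictionary.
                nested = {}
                for key, value in row.items():
                    for prefix in prefixes:
                        if key.startswith(prefix):
                            group = nested.setdefault(prefix.rstrip("_"), {})
                            group[key[len(prefix):]] = value
                            break
                    else:
                        # No prefix matched: the column belongs to the primary model.
                        nested[key] = value
                return nested


            flat = {"id": 1, "username": "jane", "tier_name": "Pro", "dept_name": "R&D"}
            nested = nest_by_prefix(flat, ["tier_", "dept_"])
            assert nested == {
                "id": 1,
                "username": "jane",
                "tier": {"name": "Pro"},
                "dept": {"name": "R&D"},
            }
            ```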

            Example using one-to-one relationship:
            ```python
            result = await crud_user.get_joined(
                db=session,
                join_model=Profile,
                join_on=User.profile_id == Profile.id,
                schema_to_select=UserSchema,
                join_schema_to_select=ProfileSchema,
                relationship_type='one-to-one',  # note that this is the default behavior
                nest_joins=True
            )
            # Expect 'result' to have 'profile' as a nested dictionary
            ```

            Example using one-to-many relationship:
            ```python
            result = await crud_user.get_joined(
                db=session,
                join_model=Post,
                join_on=User.id == Post.user_id,
                schema_to_select=UserSchema,
                join_schema_to_select=PostSchema,
                relationship_type='one-to-many',
                nest_joins=True
            )
            # Expect 'result' to have 'posts' as a nested list of dictionaries
            ```
        """
        if joins_config and (
            join_model or join_prefix or join_on or join_schema_to_select or alias
        ):
            raise ValueError(
                "Cannot use both single join parameters and joins_config simultaneously."
            )
        elif not joins_config and not join_model:
            raise ValueError("You need one of join_model or joins_config.")

        primary_select = _extract_matching_columns_from_schema(
            model=self.model,
            schema=schema_to_select,
        )
        stmt: Select = select(*primary_select).select_from(self.model)

        join_definitions = joins_config if joins_config else []
        if join_model:
            join_definitions.append(
                JoinConfig(
                    model=join_model,
                    join_on=join_on,
                    join_prefix=join_prefix,
                    schema_to_select=join_schema_to_select,
                    join_type=join_type,
                    alias=alias,
                    filters=join_filters,
                    relationship_type=relationship_type,
                )
            )

        stmt = self._prepare_and_apply_joins(
            stmt=stmt, joins_config=join_definitions, use_temporary_prefix=nest_joins
        )
        primary_filters = self._parse_filters(**kwargs)
        if primary_filters:
            stmt = stmt.filter(*primary_filters)

        db_rows = await db.execute(stmt)
        if any(join.relationship_type == "one-to-many" for join in join_definitions):
            if nest_joins is False:  # pragma: no cover
                raise ValueError(
                    "Cannot use one-to-many relationship with nest_joins=False"
                )
            results = db_rows.fetchall()
            data_list = [dict(row._mapping) for row in results]
        else:
            result = db_rows.first()
            if result is not None:
                data_list = [dict(result._mapping)]
            else:
                data_list = []

        if data_list:
            if nest_joins:
                nested_data: dict = {}
                for data in data_list:
                    nested_data = _nest_join_data(
                        data,
                        join_definitions,
                        nested_data=nested_data,
                    )
                return nested_data
            return data_list[0]

        return None

    async def get_multi_joined(
        self,
        db: AsyncSession,
        schema_to_select: Optional[type[BaseModel]] = None,
        join_model: Optional[type[ModelType]] = None,
        join_on: Optional[Any] = None,
        join_prefix: Optional[str] = None,
        join_schema_to_select: Optional[type[BaseModel]] = None,
        join_type: str = "left",
        alias: Optional[AliasedClass[Any]] = None,
        join_filters: Optional[dict] = None,
        nest_joins: bool = False,
        offset: int = 0,
        limit: Optional[int] = 100,
        sort_columns: Optional[Union[str, list[str]]] = None,
        sort_orders: Optional[Union[str, list[str]]] = None,
        return_as_model: bool = False,
        joins_config: Optional[list[JoinConfig]] = None,
        return_total_count: bool = True,
        relationship_type: Optional[str] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Fetch multiple records with a join on another model, allowing for pagination, optional sorting, and model conversion,
        supporting advanced filtering with comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The SQLAlchemy async session.
            schema_to_select: Pydantic schema for selecting specific columns from the primary model. Required if `return_as_model` is True.
            join_model: The model to join with.
            join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
            join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
            join_schema_to_select: Pydantic schema for selecting specific columns from the joined model.
            join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.
            alias: An instance of `AliasedClass` for the join model, useful for self-joins or multiple joins on the same model. Result of `aliased(join_model)`.
            join_filters: Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.
            nest_joins: If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.
            offset: The offset (number of records to skip) for pagination.
            limit: Maximum number of records to fetch in one call. Use `None` for "no limit", fetching all matching rows. Note that `limit=None` requires a custom endpoint, which you should only provide if you genuinely want to let users fetch all the data at once.
            sort_columns: A single column name or a list of column names on which to apply sorting.
            sort_orders: A single sort order ('asc' or 'desc') or a list of sort orders corresponding to the columns in sort_columns. If not provided, defaults to 'asc' for each column.
            return_as_model: If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.
            joins_config: List of JoinConfig instances for specifying multiple joins. Each instance defines a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and join type.
            return_total_count: If True, also returns the total count of rows with the selected filters. Useful for pagination.
            relationship_type: Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.
            **kwargs: Filters to apply to the primary query, including advanced comparison operators for refined searching.

        Returns:
            A dictionary containing the fetched rows under 'data' key and total count under 'total_count'.

        Raises:
            ValueError: If limit or offset is negative, or if schema_to_select is required but not provided or invalid.
                        Also raised if both 'joins_config' and any of the single join parameters are provided, or if neither 'joins_config' nor 'join_model' is provided.

        Examples:
            Fetching multiple User records joined with Tier records, using left join, returning raw data:
            ```python
            users = await crud_user.get_multi_joined(
                db=session,
                join_model=Tier,
                join_prefix="tier_",
                schema_to_select=UserSchema,
                join_schema_to_select=TierSchema,
                offset=0,
                limit=10
            )
            ```

            Fetch users joined with their tiers, sorted by username, where user's age is greater than 30:
            ```python
            users = await crud_user.get_multi_joined(
                db,
                join_model=Tier,
                schema_to_select=UserSchema,
                join_schema_to_select=TierSchema,
                age__gt=30,
                sort_columns='username',
                sort_orders='asc'
            )
            ```

            Fetch users joined with their tiers, excluding users with 'admin' username, returning as model instances:
            ```python
            users = await crud_user.get_multi_joined(
                db=session,
                join_model=Tier,
                schema_to_select=UserSchema,
                join_schema_to_select=TierSchema,
                username__ne='admin',
                return_as_model=True
            )
            ```

            Fetching and sorting by username in descending order, returning as Pydantic model:
            ```python
            users = await crud_user.get_multi_joined(
                db=session,
                join_model=Tier,
                join_prefix="tier_",
                schema_to_select=UserSchema,
                join_schema_to_select=TierSchema,
                offset=0,
                limit=10,
                sort_columns=['username'],
                sort_orders=['desc'],
                return_as_model=True
            )
            ```

            Fetching with complex conditions and custom join, returning as Pydantic model:
            ```python
            users = await crud_user.get_multi_joined(
                db=session,
                join_model=Tier,
                join_prefix="tier_",
                join_on=User.tier_id == Tier.id,
                schema_to_select=UserSchema,
                join_schema_to_select=TierSchema,
                offset=0,
                limit=10,
                is_active=True,
                return_as_model=True
            )
            ```

            Example using 'joins_config' for multiple joins:
            ```python
            from fastcrud import JoinConfig

            users = await crud_user.get_multi_joined(
                db=session,
                schema_to_select=UserSchema,
                joins_config=[
                    JoinConfig(
                        model=Tier,
                        join_on=User.tier_id == Tier.id,
                        join_prefix="tier_",
                        schema_to_select=TierSchema,
                        join_type="left",
                    ),
                    JoinConfig(
                        model=Department,
                        join_on=User.department_id == Department.id,
                        join_prefix="dept_",
                        schema_to_select=DepartmentSchema,
                        join_type="inner",
                    )
                ],
                offset=0,
                limit=10,
                sort_columns='username',
                sort_orders='asc'
            )
            ```

            Example using `alias` for multiple joins, with pagination, sorting, and model conversion:
            ```python
            from fastcrud import JoinConfig, FastCRUD, aliased

            # Aliasing for self-joins or multiple joins on the same table
            owner_alias = aliased(ModelTest, name="owner")
            user_alias = aliased(ModelTest, name="user")

            # Initialize your FastCRUD instance for BookingModel
            crud = FastCRUD(BookingModel)

            result = await crud.get_multi_joined(
                db=session,
                schema_to_select=BookingSchema,  # Primary model schema
                joins_config=[
                    JoinConfig(
                        model=ModelTest,
                        join_on=BookingModel.owner_id == owner_alias.id,
                        join_prefix="owner_",
                        alias=owner_alias,
                        schema_to_select=UserSchema  # Schema for the joined model
                    ),
                    JoinConfig(
                        model=ModelTest,
                        join_on=BookingModel.user_id == user_alias.id,
                        join_prefix="user_",
                        alias=user_alias,
                        schema_to_select=UserSchema
                    )
                ],
                offset=10,  # Skip the first 10 records
                limit=5,  # Fetch up to 5 records
                sort_columns=['booking_date'],  # Sort by booking_date
                sort_orders=['desc']  # In descending order
            )
            ```

            Fetching multiple project records and their associated participants where participants have a specific role:
            ```python
            joins_config = [
                JoinConfig(
                    model=ProjectsParticipantsAssociation,
                    join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                    join_type="inner"
                ),
                JoinConfig(
                    model=Participant,
                    join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                    join_type="inner",
                    filters={'role': 'Developer'}
                )
            ]
            projects = await crud.get_multi_joined(
                db=session,
                schema_to_select=ProjectSchema,
                joins_config=joins_config,
                limit=10
            )
            ```

            Fetching a list of projects, each with nested details of associated tasks and task creators, using nested joins:
            ```python
            projects = await crud.get_multi_joined(
                db=session,
                schema_to_select=ProjectSchema,
                joins_config=[
                    JoinConfig(
                        model=Task,
                        join_on=Project.id == Task.project_id,
                        join_prefix="task_",
                        schema_to_select=TaskSchema,
                        join_type="left",
                    ),
                    JoinConfig(
                        model=User,
                        join_on=Task.creator_id == User.id,
                        join_prefix="creator_",
                        schema_to_select=UserSchema,
                        join_type="left",
                        alias=aliased(User, name="task_creator")
                    )
                ],
                nest_joins=True,
                offset=0,
                limit=5,
                sort_columns='project_name',
                sort_orders='asc'
            )
            ```

            Example using one-to-one relationship:
            ```python
            users = await crud_user.get_multi_joined(
                db=session,
                join_model=Profile,
                join_on=User.profile_id == Profile.id,
                schema_to_select=UserSchema,
                join_schema_to_select=ProfileSchema,
                relationship_type='one-to-one', # note that this is the default behavior
                offset=0,
                limit=10
            )
            # Expect 'profile' to be nested as a dictionary under each user
            ```

            Example using one-to-many relationship:
            ```python
            users = await crud_user.get_multi_joined(
                db=session,
                join_model=Post,
                join_on=User.id == Post.user_id,
                schema_to_select=UserSchema,
                join_schema_to_select=PostSchema,
                relationship_type='one-to-many',
                nest_joins=True,
                offset=0,
                limit=10
            )
            # Expect 'posts' to be nested as a list of dictionaries under each user
            ```
        """
        if joins_config and (
            join_model
            or join_prefix
            or join_on
            or join_schema_to_select
            or alias
            or relationship_type
        ):
            raise ValueError(
                "Cannot use both single join parameters and joins_config simultaneously."
            )
        elif not joins_config and not join_model:
            raise ValueError("You need one of join_model or joins_config.")

        if (limit is not None and limit < 0) or offset < 0:
            raise ValueError("Limit and offset must be non-negative.")

        primary_select = _extract_matching_columns_from_schema(
            model=self.model, schema=schema_to_select
        )
        stmt: Select = select(*primary_select)

        join_definitions = joins_config if joins_config else []
        if join_model:
            join_definitions.append(
                JoinConfig(
                    model=join_model,
                    join_on=join_on
                    or _auto_detect_join_condition(self.model, join_model),
                    join_prefix=join_prefix,
                    schema_to_select=join_schema_to_select,
                    join_type=join_type,
                    alias=alias,
                    filters=join_filters,
                    relationship_type=relationship_type,
                )
            )

        stmt = self._prepare_and_apply_joins(
            stmt=stmt, joins_config=join_definitions, use_temporary_prefix=nest_joins
        )

        primary_filters = self._parse_filters(**kwargs)
        if primary_filters:
            stmt = stmt.filter(*primary_filters)

        if sort_columns:
            stmt = self._apply_sorting(stmt, sort_columns, sort_orders)

        if offset:
            stmt = stmt.offset(offset)
        if limit is not None:
            stmt = stmt.limit(limit)

        result = await db.execute(stmt)
        data: list[Union[dict, BaseModel]] = []

        for row in result.mappings().all():
            row_dict = dict(row)

            if nest_joins:
                row_dict = _nest_join_data(
                    data=row_dict,
                    join_definitions=join_definitions,
                )

            if return_as_model:
                if schema_to_select is None:
                    raise ValueError(
                        "schema_to_select must be provided when return_as_model is True."
                    )
                try:
                    model_instance = schema_to_select(**row_dict)
                    data.append(model_instance)
                except ValidationError as e:
                    raise ValueError(
                        f"Data validation error for schema {schema_to_select.__name__}: {e}"
                    )
            else:
                data.append(row_dict)

        if nest_joins and any(
            join.relationship_type == "one-to-many" for join in join_definitions
        ):
            nested_data = _nest_multi_join_data(
                base_primary_key=self._primary_keys[0].name,
                data=data,
                joins_config=join_definitions,
                return_as_model=return_as_model,
                schema_to_select=schema_to_select if return_as_model else None,
                nested_schema_to_select={
                    (
                        join.join_prefix.rstrip("_")
                        if join.join_prefix
                        else join.model.__name__
                    ): join.schema_to_select
                    for join in join_definitions
                    if join.schema_to_select
                },
            )
        else:
            nested_data = data

        response: dict[str, Any] = {"data": nested_data}

        if return_total_count:
            total_count: int = await self.count(
                db=db, joins_config=joins_config, **kwargs
            )
            response["total_count"] = total_count

        return response

    async def get_multi_by_cursor(
        self,
        db: AsyncSession,
        cursor: Any = None,
        limit: int = 100,
        schema_to_select: Optional[type[BaseModel]] = None,
        sort_column: str = "id",
        sort_order: str = "asc",
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Implements cursor-based pagination for fetching records. This method is designed for efficient data retrieval in large datasets and is ideal for features like infinite scrolling.
        It supports advanced filtering with comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The SQLAlchemy async session.
            cursor: The cursor value to start fetching records from. Defaults to None.
            limit: Maximum number of rows to fetch.
            schema_to_select: Pydantic schema for selecting specific columns.
            sort_column: Column name to use for sorting and cursor pagination.
            sort_order: Sorting direction, either 'asc' or 'desc'.
            **kwargs: Filters to apply to the query, including advanced comparison operators for detailed querying.

        Returns:
            A dictionary containing the fetched rows under 'data' key and the next cursor value under 'next_cursor'.

        Examples:
            Fetch the first set of records (e.g., the first page in an infinite scrolling scenario):
            ```python
            first_page = await crud.get_multi_by_cursor(db, limit=10, sort_column='created_at', sort_order='desc')

            # Fetch the next set of records using the cursor from the first page
            next_cursor = first_page['next_cursor']
            second_page = await crud.get_multi_by_cursor(db, cursor=next_cursor, limit=10, sort_column='created_at', sort_order='desc')
            ```

            Fetch records with age greater than 30 using cursor-based pagination:
            ```python
            page = await crud.get_multi_by_cursor(db, limit=10, sort_column='age', sort_order='asc', age__gt=30)
            ```

            Fetch records excluding a specific username using cursor-based pagination:
            ```python
            page = await crud.get_multi_by_cursor(db, limit=10, sort_column='username', sort_order='asc', username__ne='admin')
            ```

        Note:
            Make sure the column used for cursor pagination is indexed for performance.
            This method assumes that your records can be ordered by a unique, sequential column (such as `id` or `created_at`);
            with duplicate sort values, rows that share the cursor value may be skipped at page boundaries.
        """
        if limit == 0:
            return {"data": [], "next_cursor": None}

        stmt = await self.select(
            schema_to_select=schema_to_select,
            **kwargs,
        )

        if cursor:
            if sort_order == "asc":
                stmt = stmt.filter(getattr(self.model, sort_column) > cursor)
            else:
                stmt = stmt.filter(getattr(self.model, sort_column) < cursor)

        stmt = stmt.order_by(
            asc(getattr(self.model, sort_column))
            if sort_order == "asc"
            else desc(getattr(self.model, sort_column))
        )
        stmt = stmt.limit(limit)

        result = await db.execute(stmt)
        data = [dict(row) for row in result.mappings()]

        next_cursor = None
        if len(data) == limit:
            # Rows are already ordered by sort_column, so the last row is the
            # resume point for both ascending and descending order.
            next_cursor = data[-1][sort_column]

        return {"data": data, "next_cursor": next_cursor}

    async def update(
        self,
        db: AsyncSession,
        object: Union[UpdateSchemaType, dict[str, Any]],
        allow_multiple: bool = False,
        commit: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Updates an existing record or multiple records in the database based on specified filters. This method allows for precise targeting of records to update.
        It supports advanced filtering through comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The database session to use for the operation.
            object: A Pydantic schema or dictionary containing the update data.
            allow_multiple: If True, allows updating multiple records that match the filters. If False, raises an error if more than one record matches the filters.
            commit: If True, commits the transaction immediately. Default is True.
            **kwargs: Filters to identify the record(s) to update, supporting advanced comparison operators for refined querying.

        Returns:
            None

        Raises:
            MultipleResultsFound: If `allow_multiple` is False and more than one record matches the filters.
            ValueError: If extra fields not present in the model are provided in the update data.

        Examples:
            Update a user's email based on their ID:
            ```python
            await crud.update(db, {'email': 'new_email@example.com'}, id=1)
            ```

            Update users' statuses to 'inactive' where age is greater than 30 and allow updates to multiple records:
            ```python
            await crud.update(db, {'status': 'inactive'}, allow_multiple=True, age__gt=30)
            ```

            Update a user's username excluding specific user ID and prevent multiple updates:
            ```python
            await crud.update(db, {'username': 'new_username'}, id__ne=1, allow_multiple=False)
            ```
        """
        if not allow_multiple and (total_count := await self.count(db, **kwargs)) > 1:
            raise MultipleResultsFound(
                f"Expected exactly one record to update, found {total_count}."
            )

        if isinstance(object, dict):
            update_data = object
        else:
            update_data = object.model_dump(exclude_unset=True)

        updated_at_col = getattr(self.model, self.updated_at_column, None)
        if updated_at_col:
            update_data[self.updated_at_column] = datetime.now(timezone.utc)

        update_data_keys = set(update_data.keys())
        model_columns = {column.name for column in inspect(self.model).c}
        extra_fields = update_data_keys - model_columns
        if extra_fields:
            raise ValueError(f"Extra fields provided: {extra_fields}")

        filters = self._parse_filters(**kwargs)
        stmt = update(self.model).filter(*filters).values(update_data)

        await db.execute(stmt)
        if commit:
            await db.commit()

    async def db_delete(
        self,
        db: AsyncSession,
        allow_multiple: bool = False,
        commit: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Deletes a record or multiple records from the database based on specified filters, with support for advanced filtering through comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The database session to use for the operation.
            allow_multiple: If True, allows deleting multiple records that match the filters. If False, raises an error if more than one record matches the filters.
            commit: If True, commits the transaction immediately. Default is True.
            **kwargs: Filters to identify the record(s) to delete, including advanced comparison operators for detailed querying.

        Returns:
            None

        Raises:
            MultipleResultsFound: If `allow_multiple` is False and more than one record matches the filters.

        Examples:
            Delete a user based on their ID:
            ```python
            await crud.db_delete(db, id=1)
            ```

            Delete users older than 30 years and allow deletion of multiple records:
            ```python
            await crud.db_delete(db, allow_multiple=True, age__gt=30)
            ```

            Delete a user with a specific username, ensuring only one record is deleted:
            ```python
            await crud.db_delete(db, username='unique_username', allow_multiple=False)
            ```
        """
        if not allow_multiple and (total_count := await self.count(db, **kwargs)) > 1:
            raise MultipleResultsFound(
                f"Expected exactly one record to delete, found {total_count}."
            )

        filters = self._parse_filters(**kwargs)
        stmt = delete(self.model).filter(*filters)
        await db.execute(stmt)
        if commit:
            await db.commit()

    async def delete(
        self,
        db: AsyncSession,
        db_row: Optional[Row] = None,
        allow_multiple: bool = False,
        commit: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Soft deletes one record, or optionally several, when the model has the soft-delete column ("is_deleted" by default); otherwise performs a hard delete. Records are selected by the specified filters.
        Supports advanced filtering through comparison operators:
            '__gt' (greater than),
            '__lt' (less than),
            '__gte' (greater than or equal to),
            '__lte' (less than or equal to),
            '__ne' (not equal),
            '__in' (included in [tuple, list or set]),
            '__not_in' (not included in [tuple, list or set]).

        Args:
            db: The database session to use for the operation.
            db_row: Optional existing database row to delete. If provided, the method will attempt to delete this specific row, ignoring other filters.
            allow_multiple: If True, allows deleting multiple records that match the filters. If False, raises an error if more than one record matches the filters.
            commit: If True, commits the transaction immediately. Default is True.
            **kwargs: Filters to identify the record(s) to delete, supporting advanced comparison operators for refined querying.

        Raises:
            MultipleResultsFound: If `allow_multiple` is False and more than one record matches the filters.
            NoResultFound: If no record matches the filters.

        Returns:
            None

        Examples:
            Soft delete a specific user by ID:
            ```python
            await crud.delete(db, id=1)
            ```

            Delete users with account creation dates before 2020, allowing deletion of multiple records (a soft delete if the model has the soft-delete column, a hard delete otherwise):
            ```python
            await crud.delete(db, allow_multiple=True, creation_date__lt=datetime(2020, 1, 1))
            ```

            Soft delete a user with a specific email, ensuring only one record is deleted:
            ```python
            await crud.delete(db, email='unique@example.com', allow_multiple=False)
            ```
        """
        filters = self._parse_filters(**kwargs)
        if db_row:
            if hasattr(db_row, self.is_deleted_column) and hasattr(
                db_row, self.deleted_at_column
            ):
                setattr(db_row, self.is_deleted_column, True)
                setattr(db_row, self.deleted_at_column, datetime.now(timezone.utc))
            else:
                await db.delete(db_row)
            if commit:
                await db.commit()
            return

        total_count = await self.count(db, **kwargs)
        if total_count == 0:
            raise NoResultFound("No record found to delete.")
        if not allow_multiple and total_count > 1:
            raise MultipleResultsFound(
                f"Expected exactly one record to delete, found {total_count}."
            )

        if self.is_deleted_column in self.model_col_names:
            soft_delete_values = {
                self.is_deleted_column: True,
                self.deleted_at_column: datetime.now(timezone.utc),
            }
            update_stmt = update(self.model).filter(*filters).values(soft_delete_values)
            await db.execute(update_stmt)
        else:
            delete_stmt = delete(self.model).filter(*filters)
            await db.execute(delete_stmt)

        if commit:
            await db.commit()
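
The cursor logic in `get_multi_by_cursor` may be easier to follow outside the database. The following is a minimal in-memory sketch, not the library's implementation: `paginate_by_cursor` is a hypothetical helper that sorts a list of dictionaries, keeps rows strictly past the cursor, and reports the next cursor only when a full page was returned. It assumes the last row of a page is the resume point in both directions.

```python
from typing import Any, Optional


def paginate_by_cursor(
    rows: list[dict[str, Any]],
    cursor: Optional[Any] = None,
    limit: int = 100,
    sort_column: str = "id",
    sort_order: str = "asc",
) -> dict[str, Any]:
    """Return one page of rows plus the cursor for the following page."""
    if limit == 0:
        return {"data": [], "next_cursor": None}

    descending = sort_order != "asc"
    ordered = sorted(rows, key=lambda r: r[sort_column], reverse=descending)

    # Keep only rows strictly past the cursor in the direction of travel.
    if cursor is not None:
        if descending:
            ordered = [r for r in ordered if r[sort_column] < cursor]
        else:
            ordered = [r for r in ordered if r[sort_column] > cursor]

    data = ordered[:limit]

    # A full page suggests more rows may follow; the last row's sort value
    # is the resume point (an assumption of this sketch).
    next_cursor = data[-1][sort_column] if len(data) == limit else None
    return {"data": data, "next_cursor": next_cursor}


rows = [{"id": i} for i in range(1, 8)]  # ids 1..7

first = paginate_by_cursor(rows, limit=3, sort_order="desc")
second = paginate_by_cursor(rows, cursor=first["next_cursor"], limit=3, sort_order="desc")
third = paginate_by_cursor(rows, cursor=second["next_cursor"], limit=3, sort_order="desc")
```

Because the comparison is strict, two rows with the same sort value on either side of a page boundary would not both be returned, which is why a unique sort column is recommended.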

count(db, joins_config=None, **kwargs) async

Counts records that match specified filters, supporting advanced filtering through comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]). Can also count records based on a configuration of joins, useful for complex queries involving relationships.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| db | AsyncSession | The database session to use for the operation. | required |
| joins_config | Optional[list[JoinConfig]] | Optional configuration for applying joins in the count query. | None |
| `**kwargs` | Any | Filters to apply for the count, including field names for equality checks or with comparison operators for advanced queries. | {} |

Returns:

| Type | Description |
| --- | --- |
| int | The total number of records matching the filter conditions. |

Examples:

Count users by ID:

count = await crud.count(db, id=1)

Count users older than 30:

count = await crud.count(db, age__gt=30)

Count users with a username other than 'admin':

count = await crud.count(db, username__ne='admin')

Count projects with at least one participant (many-to-many relationship):

joins_config = [
    JoinConfig(
        model=ProjectsParticipantsAssociation,
        join_on=Project.id == ProjectsParticipantsAssociation.project_id,
        join_type="inner"
    ),
    JoinConfig(
        model=Participant,
        join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
        join_type="inner"
    )
]
count = await crud.count(db, joins_config=joins_config)

Count projects by a specific participant name (filter applied on a joined model):

joins_config = [
    JoinConfig(
        model=ProjectsParticipantsAssociation,
        join_on=Project.id == ProjectsParticipantsAssociation.project_id,
        join_type="inner"
    ),
    JoinConfig(
        model=Participant,
        join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
        join_type="inner",
        filters={'name': 'Jane Doe'}
    )
]
count = await crud.count(db, joins_config=joins_config)
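
To make the double-underscore filter suffixes concrete, here is a small in-memory sketch of how keyword filters such as `age__gt=30` could be interpreted. It is an illustration only: `OPERATORS`, `matches`, and `count_matching` are hypothetical names, and the real library translates filters into SQL conditions rather than evaluating them in Python.

```python
import operator
from typing import Any, Callable

# Suffix -> comparison, following the operators listed for `count`.
OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "gt": operator.gt,
    "lt": operator.lt,
    "gte": operator.ge,
    "lte": operator.le,
    "ne": operator.ne,
    "in": lambda value, options: value in options,
    "not_in": lambda value, options: value not in options,
}


def matches(record: dict[str, Any], **filters: Any) -> bool:
    """Return True if `record` satisfies every filter (filters are ANDed)."""
    for key, expected in filters.items():
        field, sep, suffix = key.partition("__")
        # A bare field name means an equality check.
        compare = OPERATORS[suffix] if sep else operator.eq
        if not compare(record[field], expected):
            return False
    return True


def count_matching(records: list[dict[str, Any]], **filters: Any) -> int:
    """Count the records that satisfy all filters."""
    return sum(1 for record in records if matches(record, **filters))


users = [
    {"id": 1, "username": "admin", "age": 45},
    {"id": 2, "username": "bob", "age": 31},
    {"id": 3, "username": "carol", "age": 28},
]

older_than_30 = count_matching(users, age__gt=30)
not_admin = count_matching(users, username__ne="admin")
young_in_set = count_matching(users, id__in=[1, 3], age__lt=40)
```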

Source code in fastcrud/crud/fast_crud.py
async def count(
    self,
    db: AsyncSession,
    joins_config: Optional[list[JoinConfig]] = None,
    **kwargs: Any,
) -> int:
    """
    Counts records that match specified filters, supporting advanced filtering through comparison operators:
        '__gt' (greater than), '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to), '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).
    Can also count records based on a configuration of joins, useful for complex queries involving relationships.

    Args:
        db: The database session to use for the operation.
        joins_config: Optional configuration for applying joins in the count query.
        **kwargs: Filters to apply for the count, including field names for equality checks or with comparison operators for advanced queries.

    Returns:
        The total number of records matching the filter conditions.

    Examples:
        Count users by ID:
        ```python
        count = await crud.count(db, id=1)
        ```

        Count users older than 30:
        ```python
        count = await crud.count(db, age__gt=30)
        ```

        Count users with a username other than 'admin':
        ```python
        count = await crud.count(db, username__ne='admin')
        ```

        Count projects with at least one participant (many-to-many relationship):
        ```python
        joins_config = [
            JoinConfig(
                model=ProjectsParticipantsAssociation,
                join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                join_type="inner"
            ),
            JoinConfig(
                model=Participant,
                join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                join_type="inner"
            )
        ]
        count = await crud.count(db, joins_config=joins_config)
        ```

        Count projects by a specific participant name (filter applied on a joined model):
        ```python
        joins_config = [
            JoinConfig(
                model=ProjectsParticipantsAssociation,
                join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                join_type="inner"
            ),
            JoinConfig(
                model=Participant,
                join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                join_type="inner",
                filters={'name': 'Jane Doe'}
            )
        ]
        count = await crud.count(db, joins_config=joins_config)
        ```
    """
    primary_filters = self._parse_filters(**kwargs)

    if joins_config is not None:
        primary_keys = [p.name for p in _get_primary_keys(self.model)]
        if not any(primary_keys):  # pragma: no cover
            raise ValueError(
                f"The model '{self.model.__name__}' does not have a primary key defined, which is required for counting with joins."
            )
        to_select = [
            getattr(self.model, pk).label(f"distinct_{pk}") for pk in primary_keys
        ]
        base_query = select(*to_select)

        for join in joins_config:
            join_model = join.alias or join.model
            join_filters = (
                self._parse_filters(model=join_model, **join.filters)
                if join.filters
                else []
            )

            if join.join_type == "inner":
                base_query = base_query.join(join_model, join.join_on)
            else:
                base_query = base_query.outerjoin(join_model, join.join_on)

            if join_filters:
                base_query = base_query.where(*join_filters)

        if primary_filters:
            base_query = base_query.where(*primary_filters)

        subquery = base_query.subquery()
        count_query = select(func.count()).select_from(subquery)
    else:
        count_query = select(func.count()).select_from(self.model)
        if primary_filters:
            count_query = count_query.where(*primary_filters)

    total_count: Optional[int] = await db.scalar(count_query)
    if total_count is None:
        raise ValueError("Could not find the count.")

    return total_count
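
When `joins_config` is given, the listing above counts the rows of a subquery that selects the primary key across the joins. The sketch below reproduces that query shape with the standard-library `sqlite3` module so the row arithmetic can be checked by hand. The table names, the sample data, and the `count_rows` helper are assumptions made for illustration. Note that a join yields one row per matching pair, so a project with two participants contributes two rows unless `DISTINCT` is applied; the listing above does not call `.distinct()`, so it may be worth verifying the result against your own data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE project (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE participant (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE projects_participants (project_id INTEGER, participant_id INTEGER);

    INSERT INTO project VALUES (1, 'Alpha'), (2, 'Beta'), (3, 'Gamma');
    INSERT INTO participant VALUES (1, 'Jane Doe'), (2, 'John Roe');
    -- Alpha has two participants, Beta has one, Gamma has none.
    INSERT INTO projects_participants VALUES (1, 1), (1, 2), (2, 1);
    """
)

# Inner query: primary keys of the base table across two inner joins.
JOINED_KEYS = """
    SELECT {distinct} project.id AS distinct_id
    FROM project
    JOIN projects_participants AS pp ON project.id = pp.project_id
    JOIN participant ON pp.participant_id = participant.id
    {where}
"""


def count_rows(distinct: bool = False, where: str = "", params: tuple = ()) -> int:
    """Count the rows of the joined subquery, optionally de-duplicated."""
    inner = JOINED_KEYS.format(distinct="DISTINCT" if distinct else "", where=where)
    (total,) = conn.execute(f"SELECT count(*) FROM ({inner}) AS sub", params).fetchone()
    return total


joined_rows = count_rows()                     # one row per project/participant pair
distinct_projects = count_rows(distinct=True)  # one row per project
janes_projects = count_rows(where="WHERE participant.name = ?", params=("Jane Doe",))
```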

create(db, object, commit=True) async

Create a new record in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| db | AsyncSession | The SQLAlchemy async session. | required |
| object | CreateSchemaType | The Pydantic schema containing the data to be saved. | required |
| commit | bool | If True, commits the transaction immediately. Default is True. | True |

Returns:

| Type | Description |
| --- | --- |
| ModelType | The created database object. |

Source code in fastcrud/crud/fast_crud.py
async def create(
    self, db: AsyncSession, object: CreateSchemaType, commit: bool = True
) -> ModelType:
    """
    Create a new record in the database.

    Args:
        db: The SQLAlchemy async session.
        object: The Pydantic schema containing the data to be saved.
        commit: If True, commits the transaction immediately. Default is True.

    Returns:
        The created database object.
    """
    object_dict = object.model_dump()
    db_object: ModelType = self.model(**object_dict)
    db.add(db_object)
    if commit:
        await db.commit()
    return db_object
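
One use of the `commit` flag is to defer the commit so that several records are saved together. The sketch below mirrors the three steps of `create` (dump the schema, build the model, add it to the session) with standard-library stand-ins so it runs without a database. `UserCreate`, `User`, and `RecordingSession` are hypothetical stubs, not FastCRUD, Pydantic, or SQLAlchemy classes.

```python
import asyncio
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class UserCreate:
    """Hypothetical stand-in for a Pydantic create schema."""
    username: str
    email: str


@dataclass
class User:
    """Hypothetical stand-in for a SQLAlchemy model."""
    username: str
    email: str


class RecordingSession:
    """Hypothetical stub that records what a real session would be asked to do."""

    def __init__(self) -> None:
        self.pending: list[Any] = []
        self.commits = 0

    def add(self, obj: Any) -> None:
        self.pending.append(obj)

    async def commit(self) -> None:
        self.commits += 1


async def create(db: RecordingSession, obj: UserCreate, commit: bool = True) -> User:
    # Same steps as the listing: dump the schema, build the model, add it.
    db_object = User(**asdict(obj))
    db.add(db_object)
    if commit:
        await db.commit()
    return db_object


async def main() -> RecordingSession:
    db = RecordingSession()
    # Defer the commit so both rows are committed together.
    await create(db, UserCreate("alice", "alice@example.com"), commit=False)
    await create(db, UserCreate("bob", "bob@example.com"), commit=False)
    await db.commit()
    return db


session = asyncio.run(main())
```

With a real `AsyncSession`, the same pattern would be two `create(..., commit=False)` calls followed by a single `await db.commit()`.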

db_delete(db, allow_multiple=False, commit=True, **kwargs) async

Deletes a record or multiple records from the database based on specified filters, with support for advanced filtering through comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The database session to use for the operation.

required
allow_multiple bool

If True, allows deleting multiple records that match the filters. If False, raises an error if more than one record matches the filters.

False
commit bool

If True, commits the transaction immediately. Default is True.

True
**kwargs Any

Filters to identify the record(s) to delete, including advanced comparison operators for detailed querying.

{}

Returns:

Type Description
None

None

Raises:

Type Description
MultipleResultsFound

If allow_multiple is False and more than one record matches the filters.

Examples:

Delete a user based on their ID:

await crud.db_delete(db, id=1)

Delete users older than 30 years and allow deletion of multiple records:

await crud.db_delete(db, allow_multiple=True, age__gt=30)

Delete a user with a specific username, ensuring only one record is deleted:

await crud.db_delete(db, username='unique_username', allow_multiple=False)

Source code in fastcrud/crud/fast_crud.py
async def db_delete(
    self,
    db: AsyncSession,
    allow_multiple: bool = False,
    commit: bool = True,
    **kwargs: Any,
) -> None:
    """
    Deletes a record or multiple records from the database based on specified filters, with support for advanced filtering through comparison operators:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The database session to use for the operation.
        allow_multiple: If True, allows deleting multiple records that match the filters. If False, raises an error if more than one record matches the filters.
        commit: If True, commits the transaction immediately. Default is True.
        **kwargs: Filters to identify the record(s) to delete, including advanced comparison operators for detailed querying.

    Returns:
        None

    Raises:
        MultipleResultsFound: If `allow_multiple` is False and more than one record matches the filters.

    Examples:
        Delete a user based on their ID:
        ```python
        await crud.db_delete(db, id=1)
        ```

        Delete users older than 30 years and allow deletion of multiple records:
        ```python
        await crud.db_delete(db, allow_multiple=True, age__gt=30)
        ```

        Delete a user with a specific username, ensuring only one record is deleted:
        ```python
        await crud.db_delete(db, username='unique_username', allow_multiple=False)
        ```
    """
    if not allow_multiple and (total_count := await self.count(db, **kwargs)) > 1:
        raise MultipleResultsFound(
            f"Expected exactly one record to delete, found {total_count}."
        )

    filters = self._parse_filters(**kwargs)
    stmt = delete(self.model).filter(*filters)
    await db.execute(stmt)
    if commit:
        await db.commit()

delete(db, db_row=None, allow_multiple=False, commit=True, **kwargs) async

Soft deletes a record, or optionally multiple records, if the model has an "is_deleted" column; otherwise performs a hard delete. Records are selected by the specified filters. Supports advanced filtering through comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The database session to use for the operation.

required
db_row Optional[Row]

Optional existing database row to delete. If provided, the method will attempt to delete this specific row, ignoring other filters.

None
allow_multiple bool

If True, allows deleting multiple records that match the filters. If False, raises an error if more than one record matches the filters.

False
commit bool

If True, commits the transaction immediately. Default is True.

True
**kwargs Any

Filters to identify the record(s) to delete, supporting advanced comparison operators for refined querying.

{}

Raises:

Type Description
MultipleResultsFound

If allow_multiple is False and more than one record matches the filters.

NoResultFound

If no record matches the filters.

Returns:

Type Description
None

None

Examples:

Soft delete a specific user by ID:

await crud.delete(db, id=1)

Delete users with account creation dates before 2020, allowing deletion of multiple records (this is a hard delete only if the model has no "is_deleted" column):

await crud.delete(db, allow_multiple=True, creation_date__lt=datetime(2020, 1, 1))

Soft delete a user with a specific email, ensuring only one record is deleted:

await crud.delete(db, email='unique@example.com', allow_multiple=False)
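
The choice between a soft and a hard delete can be sketched on an in-memory table. This is an illustration of the decision described above, not FastCRUD's code: `delete_rows` is a hypothetical helper, rows are plain dicts, and the built-in `LookupError` and `ValueError` stand in for SQLAlchemy's `NoResultFound` and `MultipleResultsFound`.

```python
from datetime import datetime, timezone
from typing import Any, Callable

Row = dict[str, Any]


def delete_rows(
    table: list[Row],
    predicate: Callable[[Row], bool],
    allow_multiple: bool = False,
    is_deleted_column: str = "is_deleted",
    deleted_at_column: str = "deleted_at",
) -> str:
    """Soft delete matching rows if the table has the flag column, else remove them."""
    matched = [row for row in table if predicate(row)]
    if not matched:
        # Stand-in for NoResultFound.
        raise LookupError("No record found to delete.")
    if not allow_multiple and len(matched) > 1:
        # Stand-in for MultipleResultsFound.
        raise ValueError(f"Expected exactly one record to delete, found {len(matched)}.")

    if is_deleted_column in table[0]:
        # Soft delete: keep the rows but flag them and record the time.
        for row in matched:
            row[is_deleted_column] = True
            row[deleted_at_column] = datetime.now(timezone.utc)
        return "soft"

    # Hard delete: drop the matching rows from the table in place.
    table[:] = [row for row in table if not predicate(row)]
    return "hard"


users = [
    {"id": 1, "is_deleted": False, "deleted_at": None},
    {"id": 2, "is_deleted": False, "deleted_at": None},
]
logs = [{"id": 1}, {"id": 2}]

users_mode = delete_rows(users, lambda r: r["id"] == 1)
logs_mode = delete_rows(logs, lambda r: r["id"] == 1)
print(users_mode, len(users), logs_mode, len(logs))
```

The `users` table keeps both rows, with the first one flagged, while the `logs` table, which has no flag column, loses the matching row.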

Source code in fastcrud/crud/fast_crud.py
async def delete(
    self,
    db: AsyncSession,
    db_row: Optional[Row] = None,
    allow_multiple: bool = False,
    commit: bool = True,
    **kwargs: Any,
) -> None:
    """
    Soft deletes a record, or optionally multiple records, if the model has an "is_deleted" column; otherwise performs a hard delete. Records are selected by the specified filters.
    Supports advanced filtering through comparison operators:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The database session to use for the operation.
        db_row: Optional existing database row to delete. If provided, the method will attempt to delete this specific row, ignoring other filters.
        allow_multiple: If True, allows deleting multiple records that match the filters. If False, raises an error if more than one record matches the filters.
        commit: If True, commits the transaction immediately. Default is True.
        **kwargs: Filters to identify the record(s) to delete, supporting advanced comparison operators for refined querying.

    Raises:
        MultipleResultsFound: If `allow_multiple` is False and more than one record matches the filters.
        NoResultFound: If no record matches the filters.

    Returns:
        None

    Examples:
        Soft delete a specific user by ID:
        ```python
        await crud.delete(db, id=1)
        ```

        Delete users with account creation dates before 2020, allowing deletion of multiple records (this is a hard delete only if the model has no "is_deleted" column):
        ```python
        await crud.delete(db, allow_multiple=True, creation_date__lt=datetime(2020, 1, 1))
        ```

        Soft delete a user with a specific email, ensuring only one record is deleted:
        ```python
        await crud.delete(db, email='unique@example.com', allow_multiple=False)
        ```
    """
    filters = self._parse_filters(**kwargs)
    if db_row:
        if hasattr(db_row, self.is_deleted_column) and hasattr(
            db_row, self.deleted_at_column
        ):
            setattr(db_row, self.is_deleted_column, True)
            setattr(db_row, self.deleted_at_column, datetime.now(timezone.utc))
        else:
            await db.delete(db_row)
        # Commit once, whichever branch ran.
        if commit:
            await db.commit()
        return

    total_count = await self.count(db, **kwargs)
    if total_count == 0:
        raise NoResultFound("No record found to delete.")
    if not allow_multiple and total_count > 1:
        raise MultipleResultsFound(
            f"Expected exactly one record to delete, found {total_count}."
        )

    if self.is_deleted_column in self.model_col_names:
        update_stmt = (
            update(self.model)
            .filter(*filters)
            # Use the configured column names rather than hard-coded ones.
            .values(
                {
                    self.is_deleted_column: True,
                    self.deleted_at_column: datetime.now(timezone.utc),
                }
            )
        )
        await db.execute(update_stmt)
    else:
        delete_stmt = delete(self.model).filter(*filters)
        await db.execute(delete_stmt)

    if commit:
        await db.commit()

exists(db, **kwargs) async

Checks if any records exist that match the given filter conditions. This method supports advanced filtering with comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The database session to use for the operation.

required
**kwargs Any

Filters to apply to the query, supporting both direct matches and advanced comparison operators for refined search criteria.

{}

Returns:

Type Description
bool

True if at least one record matches the filter conditions, False otherwise.

Examples:

Check if a user with a specific ID exists:

exists = await crud.exists(db, id=1)

Check if any user is older than 30:

exists = await crud.exists(db, age__gt=30)

Check if any user registered before Jan 1, 2020:

exists = await crud.exists(db, registration_date__lt=datetime(2020, 1, 1))

Check if a username other than 'admin' exists:

exists = await crud.exists(db, username__ne='admin')

Source code in fastcrud/crud/fast_crud.py
async def exists(self, db: AsyncSession, **kwargs: Any) -> bool:
    """
    Checks if any records exist that match the given filter conditions.
    This method supports advanced filtering with comparison operators:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The database session to use for the operation.
        **kwargs: Filters to apply to the query, supporting both direct matches and advanced comparison operators for refined search criteria.

    Returns:
        True if at least one record matches the filter conditions, False otherwise.

    Examples:
        Check if a user with a specific ID exists:
        ```python
        exists = await crud.exists(db, id=1)
        ```

        Check if any user is older than 30:
        ```python
        exists = await crud.exists(db, age__gt=30)
        ```

        Check if any user registered before Jan 1, 2020:
        ```python
        exists = await crud.exists(db, registration_date__lt=datetime(2020, 1, 1))
        ```

        Check if a username other than 'admin' exists:
        ```python
        exists = await crud.exists(db, username__ne='admin')
        ```
    """
    filters = self._parse_filters(**kwargs)
    stmt = select(self.model).filter(*filters).limit(1)

    result = await db.execute(stmt)
    return result.first() is not None

get(db, schema_to_select=None, return_as_model=False, one_or_none=False, **kwargs) async

Fetches a single record based on specified filters. This method allows for advanced filtering through comparison operators, enabling queries to be refined beyond simple equality checks. Supported operators include: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The database session to use for the operation.

required
schema_to_select Optional[type[BaseModel]]

Optional Pydantic schema for selecting specific columns.

None
return_as_model bool

If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.

False
one_or_none bool

If True, returns the single matching record or None, and raises MultipleResultsFound if more than one record matches. If False, returns the first matching record.

False
**kwargs Any

Filters to apply to the query, using field names for direct matches or appending comparison operators for advanced queries.

{}

Raises:

Type Description
ValueError

If return_as_model is True but schema_to_select is not provided.

Returns:

Type Description
Optional[Union[dict, BaseModel]]

A dictionary or a Pydantic model instance of the fetched database row, or None if no match is found.

Examples:

Fetch a user by ID:

user = await crud.get(db, id=1)

Fetch a user with an age greater than 30:

user = await crud.get(db, age__gt=30)

Fetch a user with a registration date before Jan 1, 2020:

user = await crud.get(db, registration_date__lt=datetime(2020, 1, 1))

Fetch a user whose username is not 'admin':

user = await crud.get(db, username__ne='admin')
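
The `one_or_none` flag switches between two ways of reading the result, which behave differently when several rows match. The sketch below imitates that difference on a plain list; `first` and `one_or_none` here are hypothetical helpers written for illustration, and `ValueError` stands in for SQLAlchemy's `MultipleResultsFound`.

```python
from typing import Optional, Sequence


def first(rows: Sequence[dict]) -> Optional[dict]:
    """Return the first row, silently ignoring any further matches."""
    return rows[0] if rows else None


def one_or_none(rows: Sequence[dict]) -> Optional[dict]:
    """Return the only row, None for no rows, and fail on more than one."""
    if len(rows) > 1:
        # Stand-in for MultipleResultsFound.
        raise ValueError(f"Expected at most one row, found {len(rows)}.")
    return rows[0] if rows else None


matches = [{"id": 2, "age": 31}, {"id": 3, "age": 45}]

lenient = first(matches)
try:
    strict = one_or_none(matches)
except ValueError as exc:
    strict = None
    error = str(exc)
print(lenient, error)
```

Filters such as `age__gt=30` can match many rows, so the default behaviour returns an arbitrary first match unless `one_or_none=True` is passed.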

Source code in fastcrud/crud/fast_crud.py
async def get(
    self,
    db: AsyncSession,
    schema_to_select: Optional[type[BaseModel]] = None,
    return_as_model: bool = False,
    one_or_none: bool = False,
    **kwargs: Any,
) -> Optional[Union[dict, BaseModel]]:
    """
    Fetches a single record based on specified filters.
    This method allows for advanced filtering through comparison operators, enabling queries to be refined beyond simple equality checks.
    Supported operators include:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The database session to use for the operation.
        schema_to_select: Optional Pydantic schema for selecting specific columns.
        return_as_model: If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.
        one_or_none: If True, returns the single matching record or None, and raises MultipleResultsFound if more than one record matches. If False, returns the first matching record.
        **kwargs: Filters to apply to the query, using field names for direct matches or appending comparison operators for advanced queries.

    Raises:
        ValueError: If return_as_model is True but schema_to_select is not provided.

    Returns:
        A dictionary or a Pydantic model instance of the fetched database row, or None if no match is found.

    Examples:
        Fetch a user by ID:
        ```python
        user = await crud.get(db, id=1)
        ```

        Fetch a user with an age greater than 30:
        ```python
        user = await crud.get(db, age__gt=30)
        ```

        Fetch a user with a registration date before Jan 1, 2020:
        ```python
        user = await crud.get(db, registration_date__lt=datetime(2020, 1, 1))
        ```

        Fetch a user whose username is not 'admin':
        ```python
        user = await crud.get(db, username__ne='admin')
        ```
    """
    stmt = await self.select(schema_to_select=schema_to_select, **kwargs)

    db_row = await db.execute(stmt)
    result: Optional[Row] = db_row.one_or_none() if one_or_none else db_row.first()
    if result is None:
        return None
    out: dict = dict(result._mapping)
    if not return_as_model:
        return out
    if not schema_to_select:
        raise ValueError(
            "schema_to_select must be provided when return_as_model is True."
        )
    return schema_to_select(**out)

get_joined(db, schema_to_select=None, join_model=None, join_on=None, join_prefix=None, join_schema_to_select=None, join_type='left', alias=None, join_filters=None, joins_config=None, nest_joins=False, relationship_type=None, **kwargs) async

Fetches a single record with one or multiple joins on other models. If 'join_on' is not provided, the method attempts to automatically detect the join condition using foreign key relationships. For multiple joins, use 'joins_config' to specify each join configuration. Advanced filters supported: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The SQLAlchemy async session.

required
schema_to_select Optional[type[BaseModel]]

Pydantic schema for selecting specific columns from the primary model.

None
join_model Optional[type[DeclarativeBase]]

The model to join with.

None
join_on Optional[Union[Join, BinaryExpression]]

SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.

None
join_prefix Optional[str]

Optional prefix to be added to all columns of the joined model. If None, no prefix is added.

None
join_schema_to_select Optional[type[BaseModel]]

Pydantic schema for selecting specific columns from the joined model.

None
join_type str

Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.

'left'
alias Optional[AliasedClass]

An instance of AliasedClass for the join model, useful for self-joins or multiple joins on the same model. Result of aliased(join_model).

None
join_filters Optional[dict]

Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.

None
joins_config Optional[list[JoinConfig]]

A list of JoinConfig instances, each specifying a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and the type of join. This parameter enables support for multiple joins.

None
nest_joins bool

If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.

False
relationship_type Optional[str]

Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.

None
**kwargs Any

Filters to apply to the primary model query, supporting advanced comparison operators for refined searching.

{}

Returns:

Type Description
Optional[dict[str, Any]]

A dictionary representing the joined record, or None if no record matches the criteria.

Raises:

Type Description
ValueError

If both single join parameters and 'joins_config' are used simultaneously.

ArgumentError

If any provided model in 'joins_config' is not recognized or invalid.

NoResultFound

If no record matches the criteria with the provided filters.

Examples:

Simple example: Joining User and Tier models without explicitly providing join_on

result = await crud_user.get_joined(
    db=session,
    join_model=Tier,
    schema_to_select=UserSchema,
    join_schema_to_select=TierSchema
)

Fetch a user and their associated tier, filtering by user ID:

result = await crud_user.get_joined(db=session, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, id=1)

Fetch a user and their associated tier, where the user's age is greater than 30:

result = await crud_user.get_joined(db=session, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, age__gt=30)

Fetch a user and their associated tier, excluding users with the 'admin' username:

result = await crud_user.get_joined(db=session, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, username__ne='admin')

Complex example: Joining with a custom join condition, additional filter parameters, and a prefix

from sqlalchemy import and_
result = await crud_user.get_joined(
    db=session,
    join_model=Tier,
    join_prefix="tier_",
    join_on=and_(User.tier_id == Tier.id, User.is_superuser == True),
    schema_to_select=UserSchema,
    join_schema_to_select=TierSchema,
    username="john_doe"
)

Example of using 'joins_config' for multiple joins:

from fastcrud import JoinConfig

result = await crud_user.get_joined(
    db=session,
    schema_to_select=UserSchema,
    joins_config=[
        JoinConfig(
            model=Tier,
            join_on=User.tier_id == Tier.id,
            join_prefix="tier_",
            schema_to_select=TierSchema,
            join_type="left",
        ),
        JoinConfig(
            model=Department,
            join_on=User.department_id == Department.id,
            join_prefix="dept_",
            schema_to_select=DepartmentSchema,
            join_type="inner",
        )
    ]
)

Using alias for joining the same model multiple times:

from fastcrud import aliased

owner_alias = aliased(ModelTest, name="owner")
user_alias = aliased(ModelTest, name="user")

result = await crud.get_joined(
    db=session,
    schema_to_select=BookingSchema,
    joins_config=[
        JoinConfig(
            model=ModelTest,
            join_on=BookingModel.owner_id == owner_alias.id,
            join_prefix="owner_",
            alias=owner_alias,
            schema_to_select=UserSchema
        ),
        JoinConfig(
            model=ModelTest,
            join_on=BookingModel.user_id == user_alias.id,
            join_prefix="user_",
            alias=user_alias,
            schema_to_select=UserSchema
        )
    ],
    id=1
)

Fetching a single project and its associated participants where a participant has a specific role:

joins_config = [
    JoinConfig(
        model=ProjectsParticipantsAssociation,
        join_on=Project.id == ProjectsParticipantsAssociation.project_id,
        join_type="inner"
    ),
    JoinConfig(
        model=Participant,
        join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
        join_type="inner",
        filters={'role': 'Designer'}
    )
]
project = await crud.get_joined(
    db=session,
    schema_to_select=ProjectSchema,
    joins_config=joins_config
)

Example of using 'joins_config' for multiple joins with nested joins enabled:

from fastcrud import JoinConfig

result = await crud_user.get_joined(
    db=session,
    schema_to_select=UserSchema,
    joins_config=[
        JoinConfig(
            model=Tier,
            join_on=User.tier_id == Tier.id,
            join_prefix="tier_",
            schema_to_select=TierSchema,
            join_type="left",
        ),
        JoinConfig(
            model=Department,
            join_on=User.department_id == Department.id,
            join_prefix="dept_",
            schema_to_select=DepartmentSchema,
            join_type="inner",
        )
    ],
    nest_joins=True
)
# Expect 'result' to have 'tier' and 'dept' as nested dictionaries
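
What `nest_joins=True` does to the shape of the result can be sketched on a flat dictionary. This is only an approximation of the idea, not FastCRUD's implementation: `nest_by_prefix` is a hypothetical helper, and it assumes the nested key is the join prefix with its trailing underscore removed, as the `'tier'` and `'dept'` keys in the comment above suggest.

```python
from typing import Any


def nest_by_prefix(flat: dict[str, Any], prefixes: list[str]) -> dict[str, Any]:
    """Move keys such as 'tier_name' into a nested dict under 'tier'."""
    nested: dict[str, Any] = {}
    for key, value in flat.items():
        for prefix in prefixes:
            if key.startswith(prefix):
                group = prefix.rstrip("_")
                nested.setdefault(group, {})[key[len(prefix):]] = value
                break
        else:
            # No prefix matched, so the column belongs to the primary model.
            nested[key] = value
    return nested


flat_row = {
    "id": 1,
    "username": "john_doe",
    "tier_name": "Premium",
    "dept_name": "Design",
}
result = nest_by_prefix(flat_row, ["tier_", "dept_"])
print(result)
```

This naive version would also capture a primary-model column whose name happens to start with a prefix, such as `tier_id`, so a real implementation has to tell the two apart.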

Example using one-to-one relationship:

result = await crud_user.get_joined(
    db=session,
    join_model=Profile,
    join_on=User.profile_id == Profile.id,
    schema_to_select=UserSchema,
    join_schema_to_select=ProfileSchema,
    relationship_type='one-to-one' # note that this is the default behavior
)
# Expect 'result' to have 'profile' as a nested dictionary

Example using one-to-many relationship:

result = await crud_user.get_joined(
    db=session,
    join_model=Post,
    join_on=User.id == Post.user_id,
    schema_to_select=UserSchema,
    join_schema_to_select=PostSchema,
    relationship_type='one-to-many',
    nest_joins=True
)
# Expect 'result' to have 'posts' as a nested list of dictionaries
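
A one-to-many join returns one row per child, so the rows have to be collapsed into a parent with a list of children. The sketch below shows one way to do that grouping on plain dictionaries. `group_one_to_many`, the `post_` prefix, and the `posts` key are assumptions made for this illustration, and FastCRUD's own nesting logic may differ.

```python
from typing import Any


def group_one_to_many(
    rows: list[dict[str, Any]], key: str, prefix: str, nested_name: str
) -> list[dict[str, Any]]:
    """Collapse joined rows sharing a primary key into one parent with a list of children."""
    parents: dict[Any, dict[str, Any]] = {}
    for row in rows:
        if row[key] not in parents:
            # First time this parent is seen: copy its own columns and start an empty list.
            parent = {k: v for k, v in row.items() if not k.startswith(prefix)}
            parent[nested_name] = []
            parents[row[key]] = parent
        child = {k[len(prefix):]: v for k, v in row.items() if k.startswith(prefix)}
        # A left join yields all-None child columns when the parent has no children.
        if any(v is not None for v in child.values()):
            parents[row[key]][nested_name].append(child)
    return list(parents.values())


joined_rows = [
    {"id": 1, "username": "john_doe", "post_id": 10, "post_title": "Hello"},
    {"id": 1, "username": "john_doe", "post_id": 11, "post_title": "World"},
    {"id": 2, "username": "jane", "post_id": None, "post_title": None},
]
users = group_one_to_many(joined_rows, key="id", prefix="post_", nested_name="posts")
print(users)
```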

Source code in fastcrud/crud/fast_crud.py
async def get_joined(
    self,
    db: AsyncSession,
    schema_to_select: Optional[type[BaseModel]] = None,
    join_model: Optional[type[DeclarativeBase]] = None,
    join_on: Optional[Union[Join, BinaryExpression]] = None,
    join_prefix: Optional[str] = None,
    join_schema_to_select: Optional[type[BaseModel]] = None,
    join_type: str = "left",
    alias: Optional[AliasedClass] = None,
    join_filters: Optional[dict] = None,
    joins_config: Optional[list[JoinConfig]] = None,
    nest_joins: bool = False,
    relationship_type: Optional[str] = None,
    **kwargs: Any,
) -> Optional[dict[str, Any]]:
    """
    Fetches a single record with one or multiple joins on other models. If 'join_on' is not provided, the method attempts
    to automatically detect the join condition using foreign key relationships. For multiple joins, use 'joins_config' to
    specify each join configuration. Advanced filters supported:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The SQLAlchemy async session.
        schema_to_select: Pydantic schema for selecting specific columns from the primary model.
        join_model: The model to join with.
        join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
        join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
        join_schema_to_select: Pydantic schema for selecting specific columns from the joined model.
        join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.
        alias: An instance of `AliasedClass` for the join model, useful for self-joins or multiple joins on the same model. Result of `aliased(join_model)`.
        join_filters: Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.
        joins_config: A list of JoinConfig instances, each specifying a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and the type of join. This parameter enables support for multiple joins.
        nest_joins: If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.
        relationship_type: Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.
        **kwargs: Filters to apply to the primary model query, supporting advanced comparison operators for refined searching.

    Returns:
        A dictionary representing the joined record, or None if no record matches the criteria.

    Raises:
        ValueError: If both single join parameters and 'joins_config' are used simultaneously.
        ArgumentError: If any provided model in 'joins_config' is not recognized or invalid.
        NoResultFound: If no record matches the criteria with the provided filters.

    Examples:
        Simple example: Joining User and Tier models without explicitly providing join_on
        ```python
        result = await crud_user.get_joined(
            db=session,
            join_model=Tier,
            schema_to_select=UserSchema,
            join_schema_to_select=TierSchema
        )
        ```

        Fetch a user and their associated tier, filtering by user ID:
        ```python
        result = await crud_user.get_joined(db=session, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, id=1)
        ```

        Fetch a user and their associated tier, where the user's age is greater than 30:
        ```python
        result = await crud_user.get_joined(db=session, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, age__gt=30)
        ```

        Fetch a user and their associated tier, excluding users with the 'admin' username:
        ```python
        result = await crud_user.get_joined(db=session, join_model=Tier, schema_to_select=UserSchema, join_schema_to_select=TierSchema, username__ne='admin')
        ```

        Complex example: Joining with a custom join condition, additional filter parameters, and a prefix
        ```python
        from sqlalchemy import and_
        result = await crud_user.get_joined(
            db=session,
            join_model=Tier,
            join_prefix="tier_",
            join_on=and_(User.tier_id == Tier.id, User.is_superuser == True),
            schema_to_select=UserSchema,
            join_schema_to_select=TierSchema,
            username="john_doe"
        )
        ```

        Example of using 'joins_config' for multiple joins:
        ```python
        from fastcrud import JoinConfig

        result = await crud_user.get_joined(
            db=session,
            schema_to_select=UserSchema,
            joins_config=[
                JoinConfig(
                    model=Tier,
                    join_on=User.tier_id == Tier.id,
                    join_prefix="tier_",
                    schema_to_select=TierSchema,
                    join_type="left",
                ),
                JoinConfig(
                    model=Department,
                    join_on=User.department_id == Department.id,
                    join_prefix="dept_",
                    schema_to_select=DepartmentSchema,
                    join_type="inner",
                )
            ]
        )
        ```

        Using `alias` for joining the same model multiple times:
        ```python
        from fastcrud import aliased

        owner_alias = aliased(ModelTest, name="owner")
        user_alias = aliased(ModelTest, name="user")

        result = await crud.get_joined(
            db=session,
            schema_to_select=BookingSchema,
            joins_config=[
                JoinConfig(
                    model=ModelTest,
                    join_on=BookingModel.owner_id == owner_alias.id,
                    join_prefix="owner_",
                    alias=owner_alias,
                    schema_to_select=UserSchema
                ),
                JoinConfig(
                    model=ModelTest,
                    join_on=BookingModel.user_id == user_alias.id,
                    join_prefix="user_",
                    alias=user_alias,
                    schema_to_select=UserSchema
                )
            ],
            id=1
        )
        ```

        Fetching a single project and its associated participants where a participant has a specific role:
        ```python
        joins_config = [
            JoinConfig(
                model=ProjectsParticipantsAssociation,
                join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                join_type="inner"
            ),
            JoinConfig(
                model=Participant,
                join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                join_type="inner",
                filters={'role': 'Designer'}
            )
        ]
        project = await crud.get_joined(
            db=session,
            schema_to_select=ProjectSchema,
            joins_config=joins_config
        )
        ```

        Example of using 'joins_config' for multiple joins with nested joins enabled:
        ```python
        from fastcrud import JoinConfig

        result = await crud_user.get_joined(
            db=session,
            schema_to_select=UserSchema,
            joins_config=[
                JoinConfig(
                    model=Tier,
                    join_on=User.tier_id == Tier.id,
                    join_prefix="tier_",
                    schema_to_select=TierSchema,
                    join_type="left",
                ),
                JoinConfig(
                    model=Department,
                    join_on=User.department_id == Department.id,
                    join_prefix="dept_",
                    schema_to_select=DepartmentSchema,
                    join_type="inner",
                )
            ],
            nest_joins=True
        )
        # Expect 'result' to have 'tier' and 'dept' as nested dictionaries
        ```

        Example using one-to-one relationship:
        ```python
        result = await crud_user.get_joined(
            db=session,
            join_model=Profile,
            join_on=User.profile_id == Profile.id,
            schema_to_select=UserSchema,
            join_schema_to_select=ProfileSchema,
            relationship_type='one-to-one' # note that this is the default behavior
        )
        # Expect 'result' to have 'profile' as a nested dictionary
        ```

        Example using one-to-many relationship:
        ```python
        result = await crud_user.get_joined(
            db=session,
            join_model=Post,
            join_on=User.id == Post.user_id,
            schema_to_select=UserSchema,
            join_schema_to_select=PostSchema,
            relationship_type='one-to-many',
            nest_joins=True
        )
        # Expect 'result' to have 'posts' as a nested list of dictionaries
        ```
    """
    if joins_config and (
        join_model or join_prefix or join_on or join_schema_to_select or alias
    ):
        raise ValueError(
            "Cannot use both single join parameters and joins_config simultaneously."
        )
    elif not joins_config and not join_model:
        raise ValueError("You need one of join_model or joins_config.")

    primary_select = _extract_matching_columns_from_schema(
        model=self.model,
        schema=schema_to_select,
    )
    stmt: Select = select(*primary_select).select_from(self.model)

    join_definitions = joins_config if joins_config else []
    if join_model:
        join_definitions.append(
            JoinConfig(
                model=join_model,
                join_on=join_on,
                join_prefix=join_prefix,
                schema_to_select=join_schema_to_select,
                join_type=join_type,
                alias=alias,
                filters=join_filters,
                relationship_type=relationship_type,
            )
        )

    stmt = self._prepare_and_apply_joins(
        stmt=stmt, joins_config=join_definitions, use_temporary_prefix=nest_joins
    )
    primary_filters = self._parse_filters(**kwargs)
    if primary_filters:
        stmt = stmt.filter(*primary_filters)

    db_rows = await db.execute(stmt)
    if any(join.relationship_type == "one-to-many" for join in join_definitions):
        if nest_joins is False:  # pragma: no cover
            raise ValueError(
                "Cannot use one-to-many relationship with nest_joins=False"
            )
        results = db_rows.fetchall()
        data_list = [dict(row._mapping) for row in results]
    else:
        result = db_rows.first()
        if result is not None:
            data_list = [dict(result._mapping)]
        else:
            data_list = []

    if data_list:
        if nest_joins:
            nested_data: dict = {}
            for data in data_list:
                nested_data = _nest_join_data(
                    data,
                    join_definitions,
                    nested_data=nested_data,
                )
            return nested_data
        return data_list[0]

    return None
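
The nesting step at the end of `get_joined` can be pictured with a small standalone sketch. It assumes each joined column arrives as a flat key carrying its `join_prefix`, and that the nested key is the prefix without its trailing underscore, as the 'tier' and 'dept' example in the docstring suggests. `nest_by_prefix` is a hypothetical helper written for illustration; it is not FastCRUD's `_nest_join_data`.

```python
from typing import Any


def nest_by_prefix(row: dict[str, Any], prefixes: list[str]) -> dict[str, Any]:
    """Move keys that start with a join prefix into a nested dict.

    Hypothetical helper for illustration only; not part of FastCRUD.
    """
    nested: dict[str, Any] = {}
    for key, value in row.items():
        for prefix in prefixes:
            if key.startswith(prefix):
                # "tier_name" with prefix "tier_" becomes nested["tier"]["name"].
                group = nested.setdefault(prefix.rstrip("_"), {})
                group[key[len(prefix):]] = value
                break
        else:
            # No prefix matched, so the key belongs to the primary model.
            nested[key] = value
    return nested


flat_row = {"id": 1, "username": "ann", "tier_name": "Pro", "dept_name": "R&D"}
nested_row = nest_by_prefix(flat_row, ["tier_", "dept_"])
```

Here `nested_row` keeps `id` and `username` at the top level and groups the remaining keys under `tier` and `dept`.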

get_multi(db, offset=0, limit=100, schema_to_select=None, sort_columns=None, sort_orders=None, return_as_model=False, return_total_count=True, **kwargs) async

Fetches multiple records based on filters, supporting sorting, pagination, and advanced filtering with comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The database session to use for the operation.

required
offset int

Starting index for records to fetch, useful for pagination.

0
limit Optional[int]

Maximum number of records to fetch in one call. Use None for "no limit", fetching all matching rows. Using limit=None requires a custom endpoint, which you should provide only if you really want to let users fetch all the data at once.

100
schema_to_select Optional[type[BaseModel]]

Optional Pydantic schema for selecting specific columns. Required if return_as_model is True.

None
sort_columns Optional[Union[str, list[str]]]

Column names to sort the results by.

None
sort_orders Optional[Union[str, list[str]]]

Corresponding sort orders ('asc', 'desc') for each column in sort_columns.

None
return_as_model bool

If True, returns data as instances of the specified Pydantic model.

False
return_total_count bool

If True, also returns the total count of rows with the selected filters. Useful for pagination.

True
**kwargs Any

Filters to apply to the query, including advanced comparison operators for more detailed querying.

{}

Returns:

Type Description
dict[str, Any]

A dictionary containing 'data' with the fetched records and, when return_total_count is True, 'total_count' with the total number of records matching the filters.

Raises:

Type Description
ValueError

If limit or offset is negative, or if schema_to_select is required but not provided or invalid.

Examples:

Fetch the first 10 users:

users = await crud.get_multi(db, 0, 10)

Fetch the next 10 users, sorted by username in descending order:

users = await crud.get_multi(db, 10, 10, sort_columns='username', sort_orders='desc')

Fetch 10 users older than 30, sorted by age in descending order:

users = await crud.get_multi(db, offset=0, limit=10, age__gt=30, sort_columns='age', sort_orders='desc')

Fetch 10 users with a registration date before Jan 1, 2020:

users = await crud.get_multi(db, offset=0, limit=10, registration_date__lt=datetime(2020, 1, 1))

Fetch 10 users with a username other than 'admin', returning as model instances (ensure appropriate schema is passed):

users = await crud.get_multi(db, offset=0, limit=10, username__ne='admin', schema_to_select=UserSchema, return_as_model=True)

Fetch users with filtering and multiple column sorting:

users = await crud.get_multi(db, 0, 10, is_active=True, sort_columns=['username', 'email'], sort_orders=['asc', 'desc'])
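
The double-underscore suffixes used in these examples follow a simple `field__operator` naming pattern. The sketch below applies the same pattern to plain dictionaries so the semantics of each suffix are easy to check. FastCRUD itself turns these keyword arguments into SQL conditions; `OPERATORS` and `matches` are hypothetical names introduced here and are not part of the library.

```python
import operator
from typing import Any, Callable

# Suffix-to-comparison table mirroring the operators listed for get_multi.
OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "gt": operator.gt,
    "lt": operator.lt,
    "gte": operator.ge,
    "lte": operator.le,
    "ne": operator.ne,
    "in": lambda value, options: value in options,
    "not_in": lambda value, options: value not in options,
}


def matches(record: dict[str, Any], **filters: Any) -> bool:
    """Return True if `record` satisfies every keyword filter."""
    for key, expected in filters.items():
        # "age__gt" splits into field "age" and suffix "gt";
        # a key without "__" is treated as a plain equality check.
        field, separator, suffix = key.partition("__")
        if separator:
            if not OPERATORS[suffix](record[field], expected):
                return False
        elif record[field] != expected:
            return False
    return True


users = [
    {"username": "admin", "age": 45},
    {"username": "bo", "age": 31},
    {"username": "cy", "age": 22},
]
older_non_admins = [
    u["username"] for u in users if matches(u, age__gt=30, username__ne="admin")
]
```

Multiple filters combine with AND in this sketch, so only records passing every condition are kept.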

Source code in fastcrud/crud/fast_crud.py
async def get_multi(
    self,
    db: AsyncSession,
    offset: int = 0,
    limit: Optional[int] = 100,
    schema_to_select: Optional[type[BaseModel]] = None,
    sort_columns: Optional[Union[str, list[str]]] = None,
    sort_orders: Optional[Union[str, list[str]]] = None,
    return_as_model: bool = False,
    return_total_count: bool = True,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Fetches multiple records based on filters, supporting sorting, pagination, and advanced filtering with comparison operators:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The database session to use for the operation.
        offset: Starting index for records to fetch, useful for pagination.
        limit: Maximum number of records to fetch in one call. Use `None` for "no limit", fetching all matching rows. Using `limit=None` requires a custom endpoint, which you should provide only if you really want to let users fetch all the data at once.
        schema_to_select: Optional Pydantic schema for selecting specific columns. Required if `return_as_model` is True.
        sort_columns: Column names to sort the results by.
        sort_orders: Corresponding sort orders ('asc', 'desc') for each column in sort_columns.
        return_as_model: If True, returns data as instances of the specified Pydantic model.
        return_total_count: If True, also returns the total count of rows with the selected filters. Useful for pagination.
        **kwargs: Filters to apply to the query, including advanced comparison operators for more detailed querying.

    Returns:
        A dictionary containing 'data' with the fetched records and, when `return_total_count` is True, 'total_count' with the total number of records matching the filters.

    Raises:
        ValueError: If limit or offset is negative, or if schema_to_select is required but not provided or invalid.

    Examples:
        Fetch the first 10 users:
        ```python
        users = await crud.get_multi(db, 0, 10)
        ```

        Fetch the next 10 users, sorted by username in descending order:
        ```python
        users = await crud.get_multi(db, 10, 10, sort_columns='username', sort_orders='desc')
        ```

        Fetch 10 users older than 30, sorted by age in descending order:
        ```python
        users = await crud.get_multi(db, offset=0, limit=10, age__gt=30, sort_columns='age', sort_orders='desc')
        ```

        Fetch 10 users with a registration date before Jan 1, 2020:
        ```python
        users = await crud.get_multi(db, offset=0, limit=10, registration_date__lt=datetime(2020, 1, 1))
        ```

        Fetch 10 users with a username other than 'admin', returning as model instances (ensure appropriate schema is passed):
        ```python
        users = await crud.get_multi(db, offset=0, limit=10, username__ne='admin', schema_to_select=UserSchema, return_as_model=True)
        ```

        Fetch users with filtering and multiple column sorting:
        ```python
        users = await crud.get_multi(db, 0, 10, is_active=True, sort_columns=['username', 'email'], sort_orders=['asc', 'desc'])
        ```
    """
    if (limit is not None and limit < 0) or offset < 0:
        raise ValueError("Limit and offset must be non-negative.")

    stmt = await self.select(
        schema_to_select=schema_to_select,
        sort_columns=sort_columns,
        sort_orders=sort_orders,
        **kwargs,
    )

    if offset:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    result = await db.execute(stmt)
    data = [dict(row) for row in result.mappings()]

    response: dict[str, Any] = {"data": data}

    if return_total_count:
        total_count = await self.count(db=db, **kwargs)
        response["total_count"] = total_count

    if return_as_model:
        if not schema_to_select:
            raise ValueError(
                "schema_to_select must be provided when return_as_model is True."
            )
        try:
            model_data = [schema_to_select(**row) for row in data]
            response["data"] = model_data
        except ValidationError as e:
            raise ValueError(
                f"Data validation error for schema {schema_to_select.__name__}: {e}"
            )

    return response
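
The `offset`, `limit`, and `total_count` values handled above are enough to drive a simple page loop. The following sketch walks every page of a result set. To stay runnable without a database it uses `fake_get_multi`, a hypothetical stand-in that only imitates the return shape of `get_multi`; with a real FastCRUD instance the call would be `await crud.get_multi(db, offset=..., limit=...)`.

```python
import asyncio
from typing import Any

ROWS = [{"id": i, "username": f"user{i}"} for i in range(1, 26)]  # 25 fake rows


async def fake_get_multi(offset: int = 0, limit: int = 100) -> dict[str, Any]:
    """Hypothetical stand-in returning the same shape as get_multi."""
    return {"data": ROWS[offset : offset + limit], "total_count": len(ROWS)}


async def fetch_all(page_size: int = 10) -> list[dict[str, Any]]:
    """Collect every row by advancing offset until total_count is reached."""
    collected: list[dict[str, Any]] = []
    offset = 0
    while True:
        page = await fake_get_multi(offset=offset, limit=page_size)
        collected.extend(page["data"])
        offset += page_size
        # Stop once the offset passes the reported total or a page is empty.
        if offset >= page["total_count"] or not page["data"]:
            return collected


all_rows = asyncio.run(fetch_all())
```

Offset pagination like this re-counts and re-skips rows on every call, which is one reason a cursor-based method is offered for large tables.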

get_multi_by_cursor(db, cursor=None, limit=100, schema_to_select=None, sort_column='id', sort_order='asc', **kwargs) async

Implements cursor-based pagination for fetching records. This method is designed for efficient data retrieval in large datasets and is ideal for features like infinite scrolling. It supports advanced filtering with comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The SQLAlchemy async session.

required
cursor Any

The cursor value to start fetching records from. Defaults to None.

None
limit int

Maximum number of rows to fetch.

100
schema_to_select Optional[type[BaseModel]]

Pydantic schema for selecting specific columns.

None
sort_column str

Column name to use for sorting and cursor pagination.

'id'
sort_order str

Sorting direction, either 'asc' or 'desc'.

'asc'
**kwargs Any

Filters to apply to the query, including advanced comparison operators for detailed querying.

{}

Returns:

Type Description
dict[str, Any]

A dictionary containing the fetched rows under 'data' key and the next cursor value under 'next_cursor'.

Examples:

Fetch the first set of records (e.g., the first page in an infinite scrolling scenario):

first_page = await crud.get_multi_by_cursor(db, limit=10, sort_column='created_at', sort_order='desc')

# Fetch the next set of records using the cursor from the first page
next_cursor = first_page['next_cursor']
second_page = await crud.get_multi_by_cursor(db, cursor=next_cursor, limit=10, sort_column='created_at', sort_order='desc')

Fetch records with age greater than 30 using cursor-based pagination:

page = await crud.get_multi_by_cursor(db, limit=10, sort_column='age', sort_order='asc', age__gt=30)

Fetch records excluding a specific username using cursor-based pagination:

page = await crud.get_multi_by_cursor(db, limit=10, sort_column='username', sort_order='asc', username__ne='admin')

Note

This method is designed for efficient pagination in large datasets and is ideal for infinite scrolling features. Make sure the column used for cursor pagination is indexed for performance. This method assumes that your records can be ordered by a unique, sequential field (like id or created_at).
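
A client typically consumes this method in a loop, feeding each page's 'next_cursor' back in until it comes back as None. The sketch below shows that loop in ascending order. `fake_get_multi_by_cursor` is a hypothetical in-memory stand-in that only imitates the documented return shape; with a real FastCRUD instance the call would be `await crud.get_multi_by_cursor(db, cursor=..., limit=...)`.

```python
import asyncio
from typing import Any, Optional

ROWS = [{"id": i} for i in range(1, 8)]  # 7 fake rows with ids 1..7


async def fake_get_multi_by_cursor(
    cursor: Optional[int] = None, limit: int = 100
) -> dict[str, Any]:
    """Hypothetical stand-in: ascending by id, same return shape."""
    remaining = [row for row in ROWS if cursor is None or row["id"] > cursor]
    data = remaining[:limit]
    # A full page hints that more rows may follow, so expose a cursor.
    next_cursor = data[-1]["id"] if data and len(data) == limit else None
    return {"data": data, "next_cursor": next_cursor}


async def scroll(limit: int = 3) -> list[list[int]]:
    """Follow next_cursor until it is None, collecting ids page by page."""
    pages: list[list[int]] = []
    cursor: Optional[int] = None
    while True:
        page = await fake_get_multi_by_cursor(cursor=cursor, limit=limit)
        if page["data"]:
            pages.append([row["id"] for row in page["data"]])
        cursor = page["next_cursor"]
        if cursor is None:
            return pages


pages = asyncio.run(scroll())
```

When the row count is an exact multiple of `limit`, the last full page still carries a cursor, so the loop makes one extra call that returns no data; the `if page["data"]` guard covers that case.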

Source code in fastcrud/crud/fast_crud.py
async def get_multi_by_cursor(
    self,
    db: AsyncSession,
    cursor: Any = None,
    limit: int = 100,
    schema_to_select: Optional[type[BaseModel]] = None,
    sort_column: str = "id",
    sort_order: str = "asc",
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Implements cursor-based pagination for fetching records. This method is designed for efficient data retrieval in large datasets and is ideal for features like infinite scrolling.
    It supports advanced filtering with comparison operators:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The SQLAlchemy async session.
        cursor: The cursor value to start fetching records from. Defaults to None.
        limit: Maximum number of rows to fetch.
        schema_to_select: Pydantic schema for selecting specific columns.
        sort_column: Column name to use for sorting and cursor pagination.
        sort_order: Sorting direction, either 'asc' or 'desc'.
        **kwargs: Filters to apply to the query, including advanced comparison operators for detailed querying.

    Returns:
        A dictionary containing the fetched rows under 'data' key and the next cursor value under 'next_cursor'.

    Examples:
        Fetch the first set of records (e.g., the first page in an infinite scrolling scenario):
        ```python
        first_page = await crud.get_multi_by_cursor(db, limit=10, sort_column='created_at', sort_order='desc')

        # Fetch the next set of records using the cursor from the first page
        next_cursor = first_page['next_cursor']
        second_page = await crud.get_multi_by_cursor(db, cursor=next_cursor, limit=10, sort_column='created_at', sort_order='desc')
        ```

        Fetch records with age greater than 30 using cursor-based pagination:
        ```python
        page = await crud.get_multi_by_cursor(db, limit=10, sort_column='age', sort_order='asc', age__gt=30)
        ```

        Fetch records excluding a specific username using cursor-based pagination:
        ```python
        page = await crud.get_multi_by_cursor(db, limit=10, sort_column='username', sort_order='asc', username__ne='admin')
        ```

    Note:
        This method is designed for efficient pagination in large datasets and is ideal for infinite scrolling features.
        Make sure the column used for cursor pagination is indexed for performance.
        This method assumes that your records can be ordered by a unique, sequential field (like `id` or `created_at`).
    """
    if limit == 0:
        return {"data": [], "next_cursor": None}

    stmt = await self.select(
        schema_to_select=schema_to_select,
        **kwargs,
    )

    if cursor:
        if sort_order == "asc":
            stmt = stmt.filter(getattr(self.model, sort_column) > cursor)
        else:
            stmt = stmt.filter(getattr(self.model, sort_column) < cursor)

    stmt = stmt.order_by(
        asc(getattr(self.model, sort_column))
        if sort_order == "asc"
        else desc(getattr(self.model, sort_column))
    )
    stmt = stmt.limit(limit)

    result = await db.execute(stmt)
    data = [dict(row) for row in result.mappings()]

    next_cursor = None
    if len(data) == limit:
        # Rows are already ordered by sort_column, so the last row marks
        # where the next page starts for both 'asc' and 'desc'.
        next_cursor = data[-1][sort_column]

    return {"data": data, "next_cursor": next_cursor}
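
In SQL terms, the ascending case of the statement built above amounts to a keyset query: filter on the sort column, order by it, and cap the row count. The sketch below shows that query shape with the standard-library `sqlite3` module. The `users` table and the `page_after` helper are hypothetical and exist only for illustration; FastCRUD builds its statement through SQLAlchemy rather than raw SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (id, username) VALUES (?, ?)",
    [(i, f"user{i}") for i in range(1, 6)],
)


def page_after(cursor_value, limit):
    """Ascending keyset page: ids strictly after the cursor, in id order."""
    if cursor_value is None:
        sql = "SELECT id FROM users ORDER BY id ASC LIMIT ?"
        params = (limit,)
    else:
        sql = "SELECT id FROM users WHERE id > ? ORDER BY id ASC LIMIT ?"
        params = (cursor_value, limit)
    return [row[0] for row in conn.execute(sql, params)]


first = page_after(None, 2)
second = page_after(first[-1], 2)
```

Because the filter is on an indexed, unique column, the database can seek straight to the cursor position instead of skipping rows as it would with an offset.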

get_multi_joined(db, schema_to_select=None, join_model=None, join_on=None, join_prefix=None, join_schema_to_select=None, join_type='left', alias=None, join_filters=None, nest_joins=False, offset=0, limit=100, sort_columns=None, sort_orders=None, return_as_model=False, joins_config=None, return_total_count=True, relationship_type=None, **kwargs) async

Fetch multiple records with a join on another model, allowing for pagination, optional sorting, and model conversion, supporting advanced filtering with comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

Name Type Description Default
db AsyncSession

The SQLAlchemy async session.

required
schema_to_select Optional[type[BaseModel]]

Pydantic schema for selecting specific columns from the primary model. Required if return_as_model is True.

None
join_model Optional[type[ModelType]]

The model to join with.

None
join_on Optional[Any]

SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.

None
join_prefix Optional[str]

Optional prefix to be added to all columns of the joined model. If None, no prefix is added.

None
join_schema_to_select Optional[type[BaseModel]]

Pydantic schema for selecting specific columns from the joined model.

None
join_type str

Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.

'left'
alias Optional[AliasedClass[Any]]

An instance of AliasedClass for the join model, useful for self-joins or multiple joins on the same model. Result of aliased(join_model).

None
join_filters Optional[dict]

Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.

None
nest_joins bool

If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.

False
offset int

The offset (number of records to skip) for pagination.

0
limit Optional[int]

Maximum number of records to fetch in one call. Use None for "no limit", fetching all matching rows. Using limit=None requires a custom endpoint, which you should provide only if you really want to let users fetch all the data at once.

100
sort_columns Optional[Union[str, list[str]]]

A single column name or a list of column names on which to apply sorting.

None
sort_orders Optional[Union[str, list[str]]]

A single sort order ('asc' or 'desc') or a list of sort orders corresponding to the columns in sort_columns. If not provided, defaults to 'asc' for each column.

None
return_as_model bool

If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.

False
joins_config Optional[list[JoinConfig]]

List of JoinConfig instances for specifying multiple joins. Each instance defines a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and join type.

None
return_total_count bool

If True, also returns the total count of rows with the selected filters. Useful for pagination.

True
relationship_type Optional[str]

Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.

None
**kwargs Any

Filters to apply to the primary query, including advanced comparison operators for refined searching.

{}

Returns:

Type Description
dict[str, Any]

A dictionary containing the fetched rows under 'data' key and total count under 'total_count'.

Raises:

Type Description
ValueError

If limit or offset is negative, or if schema_to_select is required but not provided or invalid. Also raised if 'joins_config' is combined with any of the single join parameters, or if neither 'joins_config' nor 'join_model' is provided.

Examples:

Fetching multiple User records joined with Tier records, using left join, returning raw data:

users = await crud_user.get_multi_joined(
    db=session,
    join_model=Tier,
    join_prefix="tier_",
    schema_to_select=UserSchema,
    join_schema_to_select=TierSchema,
    offset=0,
    limit=10
)

Fetch users joined with their tiers, sorted by username, where user's age is greater than 30:

users = await crud_user.get_multi_joined(
    db=session,
    join_model=Tier,
    schema_to_select=UserSchema,
    join_schema_to_select=TierSchema,
    age__gt=30,
    sort_columns='username',
    sort_orders='asc'
)

Fetch users joined with their tiers, excluding users with 'admin' username, returning as model instances:

users = await crud_user.get_multi_joined(
    db=session,
    join_model=Tier,
    schema_to_select=UserSchema,
    join_schema_to_select=TierSchema,
    username__ne='admin',
    return_as_model=True
)

Fetching and sorting by username in descending order, returning as Pydantic model:

users = await crud_user.get_multi_joined(
    db=session,
    join_model=Tier,
    join_prefix="tier_",
    schema_to_select=UserSchema,
    join_schema_to_select=TierSchema,
    offset=0,
    limit=10,
    sort_columns=['username'],
    sort_orders=['desc'],
    return_as_model=True
)

Fetching with complex conditions and custom join, returning as Pydantic model:

users = await crud_user.get_multi_joined(
    db=session,
    join_model=Tier,
    join_prefix="tier_",
    join_on=User.tier_id == Tier.id,
    schema_to_select=UserSchema,
    join_schema_to_select=TierSchema,
    offset=0,
    limit=10,
    is_active=True,
    return_as_model=True
)

Example using 'joins_config' for multiple joins:

from fastcrud import JoinConfig

users = await crud_user.get_multi_joined(
    db=session,
    schema_to_select=UserSchema,
    joins_config=[
        JoinConfig(
            model=Tier,
            join_on=User.tier_id == Tier.id,
            join_prefix="tier_",
            schema_to_select=TierSchema,
            join_type="left",
        ),
        JoinConfig(
            model=Department,
            join_on=User.department_id == Department.id,
            join_prefix="dept_",
            schema_to_select=DepartmentSchema,
            join_type="inner",
        )
    ],
    offset=0,
    limit=10,
    sort_columns='username',
    sort_orders='asc'
)

Example using alias for multiple joins, with pagination, sorting, and model conversion:

from fastcrud import JoinConfig, FastCRUD, aliased

# Aliasing for self-joins or multiple joins on the same table
owner_alias = aliased(ModelTest, name="owner")
user_alias = aliased(ModelTest, name="user")

# Initialize your FastCRUD instance for BookingModel
crud = FastCRUD(BookingModel)

result = await crud.get_multi_joined(
    db=session,
    schema_to_select=BookingSchema,  # Primary model schema
    joins_config=[
        JoinConfig(
            model=ModelTest,
            join_on=BookingModel.owner_id == owner_alias.id,
            join_prefix="owner_",
            alias=owner_alias,
            schema_to_select=UserSchema  # Schema for the joined model
        ),
        JoinConfig(
            model=ModelTest,
            join_on=BookingModel.user_id == user_alias.id,
            join_prefix="user_",
            alias=user_alias,
            schema_to_select=UserSchema
        )
    ],
    offset=10,  # Skip the first 10 records
    limit=5,  # Fetch up to 5 records
    sort_columns=['booking_date'],  # Sort by booking_date
    sort_orders=['desc']  # In descending order
)

Fetching multiple project records and their associated participants where participants have a specific role:

joins_config = [
    JoinConfig(
        model=ProjectsParticipantsAssociation,
        join_on=Project.id == ProjectsParticipantsAssociation.project_id,
        join_type="inner"
    ),
    JoinConfig(
        model=Participant,
        join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
        join_type="inner",
        filters={'role': 'Developer'}
    )
]
projects = await crud.get_multi_joined(
    db=session,
    schema_to_select=ProjectSchema,
    joins_config=joins_config,
    limit=10
)

Fetching a list of projects, each with nested details of associated tasks and task creators, using nested joins:

projects = await crud.get_multi_joined(
    db=session,
    schema_to_select=ProjectSchema,
    joins_config=[
        JoinConfig(
            model=Task,
            join_on=Project.id == Task.project_id,
            join_prefix="task_",
            schema_to_select=TaskSchema,
            join_type="left",
        ),
        JoinConfig(
            model=User,
            join_on=Task.creator_id == User.id,
            join_prefix="creator_",
            schema_to_select=UserSchema,
            join_type="left",
            alias=aliased(User, name="task_creator")
        )
    ],
    nest_joins=True,
    offset=0,
    limit=5,
    sort_columns='project_name',
    sort_orders='asc'
)

Example using one-to-one relationship:

users = await crud_user.get_multi_joined(
    db=session,
    join_model=Profile,
    join_on=User.profile_id == Profile.id,
    schema_to_select=UserSchema,
    join_schema_to_select=ProfileSchema,
    relationship_type='one-to-one', # note that this is the default behavior
    offset=0,
    limit=10
)
# Expect 'profile' to be nested as a dictionary under each user

Example using one-to-many relationship:

users = await crud_user.get_multi_joined(
    db=session,
    join_model=Post,
    join_on=User.id == Post.user_id,
    schema_to_select=UserSchema,
    join_schema_to_select=PostSchema,
    relationship_type='one-to-many',
    nest_joins=True,
    offset=0,
    limit=10
)
# Expect 'posts' to be nested as a list of dictionaries under each user
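
The one-to-many shape described in the last example can be illustrated without a database. A join returns one flat row per (user, post) pair, and nesting collapses those rows into one dictionary per user with a list of posts. The sketch assumes joined columns carry a "posts_" prefix and that the nested key is 'posts'; `group_posts` is a hypothetical helper for illustration and does not reproduce FastCRUD's internal nesting code.

```python
from typing import Any


def group_posts(
    rows: list[dict[str, Any]], prefix: str = "posts_"
) -> list[dict[str, Any]]:
    """Collapse one flat row per (user, post) pair into one dict per user."""
    users: dict[Any, dict[str, Any]] = {}
    list_key = prefix.rstrip("_")
    for row in rows:
        if row["id"] not in users:
            # First time this user is seen: copy the primary-model columns.
            parent = {k: v for k, v in row.items() if not k.startswith(prefix)}
            parent[list_key] = []
            users[row["id"]] = parent
        child = {k[len(prefix):]: v for k, v in row.items() if k.startswith(prefix)}
        # A left join yields all-None child columns for users without posts.
        if any(value is not None for value in child.values()):
            users[row["id"]][list_key].append(child)
    return list(users.values())


flat_rows = [
    {"id": 1, "name": "ann", "posts_id": 10, "posts_title": "Hello"},
    {"id": 1, "name": "ann", "posts_id": 11, "posts_title": "Again"},
    {"id": 2, "name": "bo", "posts_id": None, "posts_title": None},
]
nested_users = group_posts(flat_rows)
```

Three flat rows become two user dictionaries here: the first with two posts, the second with an empty list.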

Source code in fastcrud/crud/fast_crud.py
async def get_multi_joined(
    self,
    db: AsyncSession,
    schema_to_select: Optional[type[BaseModel]] = None,
    join_model: Optional[type[ModelType]] = None,
    join_on: Optional[Any] = None,
    join_prefix: Optional[str] = None,
    join_schema_to_select: Optional[type[BaseModel]] = None,
    join_type: str = "left",
    alias: Optional[AliasedClass[Any]] = None,
    join_filters: Optional[dict] = None,
    nest_joins: bool = False,
    offset: int = 0,
    limit: Optional[int] = 100,
    sort_columns: Optional[Union[str, list[str]]] = None,
    sort_orders: Optional[Union[str, list[str]]] = None,
    return_as_model: bool = False,
    joins_config: Optional[list[JoinConfig]] = None,
    return_total_count: bool = True,
    relationship_type: Optional[str] = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Fetch multiple records with a join on another model, allowing for pagination, optional sorting, and model conversion,
    supporting advanced filtering with comparison operators:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The SQLAlchemy async session.
        schema_to_select: Pydantic schema for selecting specific columns from the primary model. Required if `return_as_model` is True.
        join_model: The model to join with.
        join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
        join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
        join_schema_to_select: Pydantic schema for selecting specific columns from the joined model.
        join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.
        alias: An instance of `AliasedClass` for the join model, useful for self-joins or multiple joins on the same model. Result of `aliased(join_model)`.
        join_filters: Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.
        nest_joins: If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.
        offset: The offset (number of records to skip) for pagination.
        limit: Maximum number of records to fetch in one call. Use `None` for "no limit", fetching all matching rows. Using `limit=None` requires a custom endpoint, which you should provide only if you really want to let users fetch all the data at once.
        sort_columns: A single column name or a list of column names on which to apply sorting.
        sort_orders: A single sort order ('asc' or 'desc') or a list of sort orders corresponding to the columns in sort_columns. If not provided, defaults to 'asc' for each column.
        return_as_model: If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.
        joins_config: List of JoinConfig instances for specifying multiple joins. Each instance defines a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and join type.
        return_total_count: If True, also returns the total count of rows with the selected filters. Useful for pagination.
        relationship_type: Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.
        **kwargs: Filters to apply to the primary query, including advanced comparison operators for refined searching.

    Returns:
        A dictionary containing the fetched rows under 'data' key and total count under 'total_count'.

    Raises:
        ValueError: If limit or offset is negative, or if schema_to_select is required but not provided or invalid.
                    Also raised if 'joins_config' is combined with any of the single join parameters, or if neither 'joins_config' nor 'join_model' is provided.

    Examples:
        Fetching multiple User records joined with Tier records, using left join, returning raw data:
        ```python
        users = await crud_user.get_multi_joined(
            db=session,
            join_model=Tier,
            join_prefix="tier_",
            schema_to_select=UserSchema,
            join_schema_to_select=TierSchema,
            offset=0,
            limit=10
        )
        ```

        Fetch users joined with their tiers, sorted by username, where user's age is greater than 30:
        ```python
        users = await crud_user.get_multi_joined(
            db=session,
            join_model=Tier,
            schema_to_select=UserSchema,
            join_schema_to_select=TierSchema,
            age__gt=30,
            sort_columns='username',
            sort_orders='asc'
        )
        ```

        Fetch users joined with their tiers, excluding users with 'admin' username, returning as model instances:
        ```python
        users = await crud_user.get_multi_joined(
            db=session,
            join_model=Tier,
            schema_to_select=UserSchema,
            join_schema_to_select=TierSchema,
            username__ne='admin',
            return_as_model=True
        )
        ```

        Fetching and sorting by username in descending order, returning as Pydantic model:
        ```python
        users = await crud_user.get_multi_joined(
            db=session,
            join_model=Tier,
            join_prefix="tier_",
            schema_to_select=UserSchema,
            join_schema_to_select=TierSchema,
            offset=0,
            limit=10,
            sort_columns=['username'],
            sort_orders=['desc'],
            return_as_model=True
        )
        ```

        Fetching with complex conditions and custom join, returning as Pydantic model:
        ```python
        users = await crud_user.get_multi_joined(
            db=session,
            join_model=Tier,
            join_prefix="tier_",
            join_on=User.tier_id == Tier.id,
            schema_to_select=UserSchema,
            join_schema_to_select=TierSchema,
            offset=0,
            limit=10,
            is_active=True,
            return_as_model=True
        )
        ```

        Example using 'joins_config' for multiple joins:
        ```python
        from fastcrud import JoinConfig

        users = await crud_user.get_multi_joined(
            db=session,
            schema_to_select=UserSchema,
            joins_config=[
                JoinConfig(
                    model=Tier,
                    join_on=User.tier_id == Tier.id,
                    join_prefix="tier_",
                    schema_to_select=TierSchema,
                    join_type="left",
                ),
                JoinConfig(
                    model=Department,
                    join_on=User.department_id == Department.id,
                    join_prefix="dept_",
                    schema_to_select=DepartmentSchema,
                    join_type="inner",
                )
            ],
            offset=0,
            limit=10,
            sort_columns='username',
            sort_orders='asc'
        )
        ```

        Example using `alias` for multiple joins, with pagination, sorting, and model conversion:
        ```python
        from fastcrud import JoinConfig, FastCRUD, aliased

        # Aliasing for self-joins or multiple joins on the same table
        owner_alias = aliased(ModelTest, name="owner")
        user_alias = aliased(ModelTest, name="user")

        # Initialize your FastCRUD instance for BookingModel
        crud = FastCRUD(BookingModel)

        result = await crud.get_multi_joined(
            db=session,
            schema_to_select=BookingSchema,  # Primary model schema
            joins_config=[
                JoinConfig(
                    model=ModelTest,
                    join_on=BookingModel.owner_id == owner_alias.id,
                    join_prefix="owner_",
                    alias=owner_alias,
                    schema_to_select=UserSchema  # Schema for the joined model
                ),
                JoinConfig(
                    model=ModelTest,
                    join_on=BookingModel.user_id == user_alias.id,
                    join_prefix="user_",
                    alias=user_alias,
                    schema_to_select=UserSchema
                )
            ],
            offset=10,  # Skip the first 10 records
            limit=5,  # Fetch up to 5 records
            sort_columns=['booking_date'],  # Sort by booking_date
            sort_orders=['desc']  # In descending order
        )
        ```

        Fetching multiple project records and their associated participants where participants have a specific role:
        ```python
        joins_config = [
            JoinConfig(
                model=ProjectsParticipantsAssociation,
                join_on=Project.id == ProjectsParticipantsAssociation.project_id,
                join_type="inner"
            ),
            JoinConfig(
                model=Participant,
                join_on=ProjectsParticipantsAssociation.participant_id == Participant.id,
                join_type="inner",
                filters={'role': 'Developer'}
            )
        ]
        projects = await crud.get_multi_joined(
            db=session,
            schema_to_select=ProjectSchema,
            joins_config=joins_config,
            limit=10
        )
        ```

        Fetching a list of projects, each with nested details of associated tasks and task creators, using nested joins:
        ```python
        projects = await crud.get_multi_joined(
            db=session,
            schema_to_select=ProjectSchema,
            joins_config=[
                JoinConfig(
                    model=Task,
                    join_on=Project.id == Task.project_id,
                    join_prefix="task_",
                    schema_to_select=TaskSchema,
                    join_type="left",
                ),
                JoinConfig(
                    model=User,
                    join_on=Task.creator_id == User.id,
                    join_prefix="creator_",
                    schema_to_select=UserSchema,
                    join_type="left",
                    alias=aliased(User, name="task_creator")
                )
            ],
            nest_joins=True,
            offset=0,
            limit=5,
            sort_columns='project_name',
            sort_orders='asc'
        )
        ```

        Example using one-to-one relationship:
        ```python
        users = await crud_user.get_multi_joined(
            db=session,
            join_model=Profile,
            join_on=User.profile_id == Profile.id,
            schema_to_select=UserSchema,
            join_schema_to_select=ProfileSchema,
            relationship_type='one-to-one', # note that this is the default behavior
            offset=0,
            limit=10
        )
        # Expect 'profile' to be nested as a dictionary under each user
        ```

        Example using one-to-many relationship:
        ```python
        users = await crud_user.get_multi_joined(
            db=session,
            join_model=Post,
            join_on=User.id == Post.user_id,
            schema_to_select=UserSchema,
            join_schema_to_select=PostSchema,
            relationship_type='one-to-many',
            nest_joins=True,
            offset=0,
            limit=10
        )
        # Expect 'posts' to be nested as a list of dictionaries under each user
        ```
    """
    if joins_config and (
        join_model
        or join_prefix
        or join_on
        or join_schema_to_select
        or alias
        or relationship_type
    ):
        raise ValueError(
            "Cannot use both single join parameters and joins_config simultaneously."
        )
    elif not joins_config and not join_model:
        raise ValueError("You need one of join_model or joins_config.")

    if (limit is not None and limit < 0) or offset < 0:
        raise ValueError("Limit and offset must be non-negative.")

    primary_select = _extract_matching_columns_from_schema(
        model=self.model, schema=schema_to_select
    )
    stmt: Select = select(*primary_select)

    join_definitions = joins_config if joins_config else []
    if join_model:
        join_definitions.append(
            JoinConfig(
                model=join_model,
                join_on=join_on
                or _auto_detect_join_condition(self.model, join_model),
                join_prefix=join_prefix,
                schema_to_select=join_schema_to_select,
                join_type=join_type,
                alias=alias,
                filters=join_filters,
                relationship_type=relationship_type,
            )
        )

    stmt = self._prepare_and_apply_joins(
        stmt=stmt, joins_config=join_definitions, use_temporary_prefix=nest_joins
    )

    primary_filters = self._parse_filters(**kwargs)
    if primary_filters:
        stmt = stmt.filter(*primary_filters)

    if sort_columns:
        stmt = self._apply_sorting(stmt, sort_columns, sort_orders)

    if offset:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    result = await db.execute(stmt)
    data: list[Union[dict, BaseModel]] = []

    for row in result.mappings().all():
        row_dict = dict(row)

        if nest_joins:
            row_dict = _nest_join_data(
                data=row_dict,
                join_definitions=join_definitions,
            )

        if return_as_model:
            if schema_to_select is None:
                raise ValueError(
                    "schema_to_select must be provided when return_as_model is True."
                )
            try:
                model_instance = schema_to_select(**row_dict)
                data.append(model_instance)
            except ValidationError as e:
                raise ValueError(
                    f"Data validation error for schema {schema_to_select.__name__}: {e}"
                )
        else:
            data.append(row_dict)

    if nest_joins and any(
        join.relationship_type == "one-to-many" for join in join_definitions
    ):
        nested_data = _nest_multi_join_data(
            base_primary_key=self._primary_keys[0].name,
            data=data,
            joins_config=join_definitions,
            return_as_model=return_as_model,
            schema_to_select=schema_to_select if return_as_model else None,
            nested_schema_to_select={
                (
                    join.join_prefix.rstrip("_")
                    if join.join_prefix
                    else join.model.__name__
                ): join.schema_to_select
                for join in join_definitions
                if join.schema_to_select
            },
        )
    else:
        nested_data = data

    response: dict[str, Any] = {"data": nested_data}

    if return_total_count:
        total_count: int = await self.count(
            db=db, joins_config=joins_config, **kwargs
        )
        response["total_count"] = total_count

    return response
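
The `nest_joins` option described in the docstring above turns flat, prefixed columns into nested dictionaries. The following is a minimal sketch of that idea in plain Python. The helper `nest_by_prefix` and the sample row are hypothetical illustrations, not FastCRUD's internal `_nest_join_data`, and the sketch assumes no primary-model column name starts with a join prefix.

```python
def nest_by_prefix(row, prefixes):
    """Move keys that start with a join prefix into a nested dictionary.

    `row` is one flat result row; `prefixes` lists the join_prefix values.
    The nested key is the prefix without its trailing underscore.
    (Hypothetical helper for illustration only.)
    """
    nested = {}
    for key, value in row.items():
        for prefix in prefixes:
            if key.startswith(prefix):
                group = prefix.rstrip("_")
                # Strip the prefix from the column name inside the group.
                nested.setdefault(group, {})[key[len(prefix):]] = value
                break
        else:
            # No prefix matched, so the column belongs to the primary model.
            nested[key] = value
    return nested


flat_row = {"id": 1, "username": "alice", "tier_name": "gold", "tier_level": 3}
nested_row = nest_by_prefix(flat_row, ["tier_"])
print(nested_row)
```

In this sketch a primary column such as `tier_id` would be misread as a joined column, which is one reason a real implementation needs a less ambiguous way to tell the two apart.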

select(schema_to_select=None, sort_columns=None, sort_orders=None, **kwargs) async

Constructs a SQLAlchemy Select statement with optional column selection, filtering, and sorting. This method allows for advanced filtering through comparison operators, enabling queries to be refined beyond simple equality checks. Supported operators include: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).
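
To make the suffix convention concrete, here is a rough sketch of how a keyword such as `age__gt` can be split into a field name and an operator. It evaluates the filters against a plain dictionary purely for illustration. The names `OPERATORS`, `split_filter_key`, and `matches` are hypothetical, and FastCRUD itself builds SQLAlchemy expressions rather than comparing values in Python.

```python
import operator

# Hypothetical mapping from filter suffix to a Python comparison.
OPERATORS = {
    "gt": operator.gt,
    "lt": operator.lt,
    "gte": operator.ge,
    "lte": operator.le,
    "ne": operator.ne,
    "in": lambda value, options: value in options,
    "not_in": lambda value, options: value not in options,
}


def split_filter_key(key):
    """Split 'age__gt' into ('age', 'gt'); a bare key means equality."""
    field, sep, suffix = key.rpartition("__")
    if sep and suffix in OPERATORS:
        return field, suffix
    return key, "eq"


def matches(record, **filters):
    """Return True if the dict `record` satisfies every filter."""
    for key, expected in filters.items():
        field, op = split_filter_key(key)
        actual = record[field]
        if op == "eq":
            if actual != expected:
                return False
        elif not OPERATORS[op](actual, expected):
            return False
    return True


user = {"id": 3, "age": 21, "role": "admin"}
print(matches(user, age__gt=18, role="admin"))
```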

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| schema_to_select | Optional[type[BaseModel]] | Pydantic schema to determine which columns to include in the selection. If not provided, selects all columns of the model. | None |
| sort_columns | Optional[Union[str, list[str]]] | A single column name or list of column names to sort the query results by. Can be combined with sort_orders to set the direction of each column. | None |
| sort_orders | Optional[Union[str, list[str]]] | A single sort order ('asc' or 'desc') or a list of sort orders, corresponding to each column in sort_columns. If not specified, defaults to ascending order for all sort_columns. | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Selectable | Select | A SQLAlchemy Select statement object that can be executed or further modified. |

Examples:

Selecting specific columns with filtering and sorting:

stmt = await crud.select(
    schema_to_select=UserReadSchema,
    sort_columns=['age', 'name'],
    sort_orders=['asc', 'desc'],
    age__gt=18
)

Creating a statement to select all users without any filters:

stmt = await crud.select()

Selecting users with a specific role, ordered by name:

stmt = await crud.select(
    schema_to_select=UserReadSchema,
    sort_columns='name',
    role='admin'
)

Note: This method does not execute the generated SQL statement. Use db.execute(stmt) to run the query and fetch results.

Source code in fastcrud/crud/fast_crud.py
async def select(
    self,
    schema_to_select: Optional[type[BaseModel]] = None,
    sort_columns: Optional[Union[str, list[str]]] = None,
    sort_orders: Optional[Union[str, list[str]]] = None,
    **kwargs,
) -> Select:
    """
    Constructs a SQLAlchemy `Select` statement with optional column selection, filtering, and sorting.
    This method allows for advanced filtering through comparison operators, enabling queries to be refined beyond simple equality checks.
    Supported operators include:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        schema_to_select: Pydantic schema to determine which columns to include in the selection. If not provided, selects all columns of the model.
        sort_columns: A single column name or list of column names to sort the query results by. Can be combined with sort_orders to set the direction of each column.
        sort_orders: A single sort order ('asc' or 'desc') or a list of sort orders, corresponding to each column in sort_columns. If not specified, defaults to ascending order for all sort_columns.

    Returns:
        Selectable: A SQLAlchemy `Select` statement object that can be executed or further modified.

    Examples:
        Selecting specific columns with filtering and sorting:
        ```python
        stmt = await crud.select(
            schema_to_select=UserReadSchema,
            sort_columns=['age', 'name'],
            sort_orders=['asc', 'desc'],
            age__gt=18
        )
        ```

        Creating a statement to select all users without any filters:
        ```python
        stmt = await crud.select()
        ```

        Selecting users with a specific role, ordered by name:
        ```python
        stmt = await crud.select(
            schema_to_select=UserReadSchema,
            sort_columns='name',
            role='admin'
        )
        ```
    Note:
        This method does not execute the generated SQL statement.
        Use `db.execute(stmt)` to run the query and fetch results.
    """
    to_select = _extract_matching_columns_from_schema(
        model=self.model, schema=schema_to_select
    )
    filters = self._parse_filters(**kwargs)
    stmt = select(*to_select).filter(*filters)

    if sort_columns:
        stmt = self._apply_sorting(stmt, sort_columns, sort_orders)
    return stmt

update(db, object, allow_multiple=False, commit=True, **kwargs) async

Updates an existing record or multiple records in the database based on specified filters. This method allows for precise targeting of records to update. It supports advanced filtering through comparison operators: '__gt' (greater than), '__lt' (less than), '__gte' (greater than or equal to), '__lte' (less than or equal to), '__ne' (not equal), '__in' (included in [tuple, list or set]), '__not_in' (not included in [tuple, list or set]).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| db | AsyncSession | The database session to use for the operation. | required |
| object | Union[UpdateSchemaType, dict[str, Any]] | A Pydantic schema or dictionary containing the update data. | required |
| allow_multiple | bool | If True, allows updating multiple records that match the filters. If False, raises an error if more than one record matches the filters. | False |
| commit | bool | If True, commits the transaction immediately. Default is True. | True |
| **kwargs | Any | Filters to identify the record(s) to update, supporting advanced comparison operators for refined querying. | {} |

Returns:

| Type | Description |
| --- | --- |
| None | None |

Raises:

| Type | Description |
| --- | --- |
| MultipleResultsFound | If allow_multiple is False and more than one record matches the filters. |
| ValueError | If extra fields not present in the model are provided in the update data. |

Examples:

Update a user's email based on their ID:

await crud_user.update(db, {'email': 'new_email@example.com'}, id=1)

Update users' statuses to 'inactive' where age is greater than 30 and allow updates to multiple records:

await crud_user.update(db, {'status': 'inactive'}, allow_multiple=True, age__gt=30)

Update a user's username, excluding a specific user ID and disallowing multiple updates:

await crud_user.update(db, {'username': 'new_username'}, id__ne=1, allow_multiple=False)
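
The two error conditions listed under Raises can be pictured with a small, database-free sketch. The function `check_update` and the exception `MultipleMatchesError` are hypothetical stand-ins (the latter for SQLAlchemy's `MultipleResultsFound`), and the number of matching rows is passed in directly instead of being counted with a query.

```python
class MultipleMatchesError(Exception):
    """Stand-in for SQLAlchemy's MultipleResultsFound in this sketch."""


def check_update(update_data, model_columns, matched_count, allow_multiple=False):
    """Apply the two guards described above to plain Python values."""
    # Guard 1: refuse to touch several rows unless explicitly allowed.
    if not allow_multiple and matched_count > 1:
        raise MultipleMatchesError(
            f"Expected exactly one record to update, found {matched_count}."
        )
    # Guard 2: every key in the update data must be a known column.
    extra_fields = set(update_data) - set(model_columns)
    if extra_fields:
        raise ValueError(f"Extra fields provided: {extra_fields}")


columns = {"id", "email", "status"}

# A single matching row and known columns: no exception is raised.
check_update({"email": "new_email@example.com"}, columns, matched_count=1)

# Several matching rows are accepted only with allow_multiple=True.
check_update({"status": "inactive"}, columns, matched_count=5, allow_multiple=True)
```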

Source code in fastcrud/crud/fast_crud.py
async def update(
    self,
    db: AsyncSession,
    object: Union[UpdateSchemaType, dict[str, Any]],
    allow_multiple: bool = False,
    commit: bool = True,
    **kwargs: Any,
) -> None:
    """
    Updates an existing record or multiple records in the database based on specified filters. This method allows for precise targeting of records to update.
    It supports advanced filtering through comparison operators:
        '__gt' (greater than),
        '__lt' (less than),
        '__gte' (greater than or equal to),
        '__lte' (less than or equal to),
        '__ne' (not equal),
        '__in' (included in [tuple, list or set]),
        '__not_in' (not included in [tuple, list or set]).

    Args:
        db: The database session to use for the operation.
        object: A Pydantic schema or dictionary containing the update data.
        allow_multiple: If True, allows updating multiple records that match the filters. If False, raises an error if more than one record matches the filters.
        commit: If True, commits the transaction immediately. Default is True.
        **kwargs: Filters to identify the record(s) to update, supporting advanced comparison operators for refined querying.

    Returns:
        None

    Raises:
        MultipleResultsFound: If `allow_multiple` is False and more than one record matches the filters.
        ValueError: If extra fields not present in the model are provided in the update data.

    Examples:
        Update a user's email based on their ID:
        ```python
        await crud_user.update(db, {'email': 'new_email@example.com'}, id=1)
        ```

        Update users' statuses to 'inactive' where age is greater than 30 and allow updates to multiple records:
        ```python
        await crud_user.update(db, {'status': 'inactive'}, allow_multiple=True, age__gt=30)
        ```

        Update a user's username, excluding a specific user ID and disallowing multiple updates:
        ```python
        await crud_user.update(db, {'username': 'new_username'}, id__ne=1, allow_multiple=False)
        ```
    """
    if not allow_multiple and (total_count := await self.count(db, **kwargs)) > 1:
        raise MultipleResultsFound(
            f"Expected exactly one record to update, found {total_count}."
        )

    if isinstance(object, dict):
        update_data = object
    else:
        update_data = object.model_dump(exclude_unset=True)

    updated_at_col = getattr(self.model, self.updated_at_column, None)
    if updated_at_col:
        update_data[self.updated_at_column] = datetime.now(timezone.utc)

    update_data_keys = set(update_data.keys())
    model_columns = {column.name for column in inspect(self.model).c}
    extra_fields = update_data_keys - model_columns
    if extra_fields:
        raise ValueError(f"Extra fields provided: {extra_fields}")

    filters = self._parse_filters(**kwargs)
    stmt = update(self.model).filter(*filters).values(update_data)

    await db.execute(stmt)
    if commit:
        await db.commit()

upsert(db, instance, schema_to_select=None, return_as_model=False) async

Update the instance or create it if it doesn't exist. Note: This method performs two database operations (a get, followed by a create or an update).
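
The get-then-write flow described above can be sketched with an in-memory dictionary standing in for the table. The `upsert` function below and its `store` argument are hypothetical and only illustrate the control flow. They are not FastCRUD's implementation, which works through an `AsyncSession`.

```python
def upsert(store, record, pk="id"):
    """Look up by primary key, then update the match or create a new entry.

    `store` maps primary-key values to row dictionaries.
    Returns the stored row and which branch was taken.
    """
    key = record[pk]
    existing = store.get(key)  # first operation: the lookup
    if existing is None:
        store[key] = dict(record)  # second operation: create
        return store[key], "created"
    existing.update(record)  # second operation: update
    return existing, "updated"


table = {}
first = upsert(table, {"id": 1, "name": "alice"})
second = upsert(table, {"id": 1, "name": "alice b."})
print(first[1], second[1], table)
```

Because the lookup and the write are separate steps, a pattern like this is generally not atomic, so two concurrent callers could both take the create branch unless the database enforces uniqueness.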

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| db | AsyncSession | The database session to use for the operation. | required |
| instance | Union[UpdateSchemaType, CreateSchemaType] | A Pydantic schema representing the instance. | required |
| schema_to_select | Optional[type[BaseModel]] | Optional Pydantic schema for selecting specific columns. Defaults to None. | None |
| return_as_model | bool | If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False. | False |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| BaseModel | Union[BaseModel, Dict[str, Any], None] | The created or updated instance. |

Source code in fastcrud/crud/fast_crud.py
async def upsert(
    self,
    db: AsyncSession,
    instance: Union[UpdateSchemaType, CreateSchemaType],
    schema_to_select: Optional[type[BaseModel]] = None,
    return_as_model: bool = False,
) -> Union[BaseModel, Dict[str, Any], None]:
    """Update the instance or create it if it doesn't exist.
    Note: This method performs two database operations (a get, followed by a create or an update).

    Args:
        db (AsyncSession): The database session to use for the operation.
        instance (Union[UpdateSchemaType, CreateSchemaType]): A Pydantic schema representing the instance.
        schema_to_select (Optional[type[BaseModel]], optional): Optional Pydantic schema for selecting specific columns. Defaults to None.
        return_as_model (bool, optional): If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.

    Returns:
        BaseModel: the created or updated instance
    """
    _pks = self._get_pk_dict(instance)
    schema_to_select = schema_to_select or type(instance)
    db_instance = await self.get(
        db,
        schema_to_select=schema_to_select,
        return_as_model=return_as_model,
        **_pks,
    )
    if db_instance is None:
        db_instance = await self.create(db, instance)  # type: ignore
        db_instance = schema_to_select.model_validate(
            db_instance, from_attributes=True
        )
    else:
        await self.update(db, instance)  # type: ignore
        db_instance = await self.get(
            db,
            schema_to_select=schema_to_select,
            return_as_model=return_as_model,
            **_pks,
        )

    return db_instance