"""Minimal FastAPI service backed by a single SQLAlchemy Core table.

Importing this module has side effects: it loads ``.env``, creates the
database named by ``DATABASE_URL`` if it does not exist, creates the
``test`` table, and prints the table's current rows to stdout.
"""

import os
from typing import Union

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from sqlalchemy import (
    create_engine,
    select,
    Table,
    Column,
    MetaData,
    Integer,
    Identity,
    String,
)
from sqlalchemy_utils import database_exists, create_database, drop_database

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    # Fail early with a clear message instead of handing None to create_engine.
    raise RuntimeError("DATABASE_URL environment variable is not set")

engine = create_engine(
    DATABASE_URL,
    # isolation_level="AUTOCOMMIT",
)

if not database_exists(engine.url):
    create_database(engine.url)
# else:
#     drop_database(engine.url)
#     create_database(engine.url)

if database_exists(engine.url):
    print("DB OK")

metadata = MetaData()

test = Table(
    "test",
    metadata,
    Column("id", Integer, Identity(start=1, cycle=True), primary_key=True),
    Column("testString", String),
)

metadata.create_all(engine)

# with engine.connect() as conn:
#     conn.execution_options(isolation_level="AUTOCOMMIT")
#     trans = conn.begin()
#     insert = test.insert().values(testString="ca")
#     conn.execute(insert)
#     rows = conn.execute(select(test))
#     trans.commit()
#     for row in rows:
#         print(row)

app = FastAPI()


@app.get("/")
async def read_root():
    """Return a static greeting payload."""
    return {"Hello": "World"}


# Plain ``def`` (not ``async def``): the handler performs blocking SQLAlchemy
# calls, so FastAPI should run it in its threadpool rather than on the event
# loop.
@app.get("/test/{test_id}")
def read_item(test_id: int):
    """Return the ``testString`` stored for the row whose id is *test_id*.

    Raises:
        HTTPException: 404 when the query yields ``None`` (no matching row,
            or the stored ``testString`` is NULL).
    """
    query = select(test.c.testString).where(test.c.id == test_id)
    with engine.connect() as conn:
        result = conn.execute(query).scalar_one_or_none()
    if result is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"test_string": result}


# Plain ``def`` for the same reason as ``read_item``: blocking database I/O.
@app.put("/test/add")
def add_item(test_string: str):
    """Insert *test_string* as a new row, then print every row to stdout.

    Raises:
        HTTPException: 404 when *test_string* is empty.
    """
    if test_string == "":
        # NOTE(review): 404 "Item not found" is a surprising response for an
        # empty input; kept as-is because clients may depend on it.
        raise HTTPException(status_code=404, detail="Item not found")

    with engine.connect() as conn:
        # The context manager commits on success and rolls back on error.
        with conn.begin():
            conn.execute(test.insert().values(testString=test_string))
        # Debug dump of the whole table after the insert has been committed.
        for row in conn.execute(select(test)):
            print(row)
    return None


# Import-time debug dump of the table contents.
with engine.connect() as conn:
    for row in conn.execute(select(test)):
        print(row)

# @app.put("/items/{item_id}")
# async def update_item(item_id: int, item: Item):
#     return {"item_name": item.name, "item_id": item_id}