coproapi/main.py

123 lines
4.3 KiB
Python

import json
import os
import secrets
from fastapi import FastAPI, HTTPException, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
DB_FILE = "database.json"
OBJ_FILE = "objects.json"
ph = PasswordHasher()
SESSIONS = {}
class User(BaseModel):
username: str
password: str
class ChangePasswordModel(BaseModel):
new_password: str
class NoteObject(BaseModel):
title: str
text: str
def read_json(filename):
if not os.path.exists(filename): return []
with open(filename, "r") as f:
try: return json.load(f)
except json.JSONDecodeError: return []
def write_json(filename, data):
with open(filename, "w") as f: json.dump(data, f, indent=4)
def get_user_from_token(token: str):
if not token or token not in SESSIONS:
raise HTTPException(status_code=401, detail="Токен невалидный")
return SESSIONS[token]
@app.post("/register")
def register(user: User):
db = read_json(DB_FILE)
if any(u["username"] == user.username for u in db):
raise HTTPException(status_code=400, detail="Такой юзер уже есть")
# Валидация длины пароля на бэке
if len(user.password) < 6:
raise HTTPException(status_code=400, detail="Пароль должен быть не менее 6 символов")
db.append({"username": user.username, "password": ph.hash(user.password)})
write_json(DB_FILE, db)
return {"message": "Регистрация успешна"}
@app.post("/login")
def login(user: User):
db = read_json(DB_FILE)
for u in db:
if u["username"] == user.username:
try:
ph.verify(u["password"], user.password)
session_token = secrets.token_hex(32)
SESSIONS[session_token] = user.username
return {"message": "Успешный вход", "token": session_token, "username": user.username}
except VerifyMismatchError:
raise HTTPException(status_code=401, detail="Неверный пароль")
raise HTTPException(status_code=401, detail="Юзер не найден")
@app.get("/profile")
def get_profile(x_token: str = Header(None)):
username = get_user_from_token(x_token)
db = read_json(DB_FILE)
for u in db:
if u["username"] == username:
return {"username": u["username"], "hash": u["password"]}
raise HTTPException(status_code=404, detail="User not found")
@app.post("/change-password")
def change_password(data: ChangePasswordModel, x_token: str = Header(None)):
username = get_user_from_token(x_token)
db = read_json(DB_FILE)
for u in db:
if u["username"] == username:
if len(data.new_password) < 6:
raise HTTPException(status_code=400, detail="Пароль должен быть не менее 6 символов")
u["password"] = ph.hash(data.new_password)
write_json(DB_FILE, db)
return {"message": "Пароль успешно изменен"}
raise HTTPException(status_code=404, detail="User not found")
@app.post("/objects")
def create_object(obj: NoteObject, x_token: str = Header(None)):
username = get_user_from_token(x_token)
objects = read_json(OBJ_FILE)
objects.append({"username": username, "title": obj.title, "text": obj.text})
write_json(OBJ_FILE, objects)
return {"message": "Объект создан"}
@app.get("/objects")
def get_objects(x_token: str = Header(None)):
username = get_user_from_token(x_token)
objects = read_json(OBJ_FILE)
return [o for o in objects if o["username"] == username]
@app.get("/admin/objects")
def admin_get_objects(x_token: str = Header(None)):
username = get_user_from_token(x_token)
if username != "root": raise HTTPException(status_code=403)
return read_json(OBJ_FILE)
@app.get("/admin/users")
def admin_get_users(x_token: str = Header(None)):
username = get_user_from_token(x_token)
if username != "root": raise HTTPException(status_code=403)
return read_json(DB_FILE)