123 lines
4.3 KiB
Python
123 lines
4.3 KiB
Python
import json
|
|
import os
|
|
import secrets
|
|
from fastapi import FastAPI, HTTPException, Header
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError
|
|
|
|
# ASGI application object; every route in this module is registered on it.
app = FastAPI()

# CORS is left fully open: any origin, any HTTP method and any request
# header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
# JSON files used as storage: user accounts and per-user note objects.
DB_FILE = "database.json"
OBJ_FILE = "objects.json"

# Argon2 hasher (library default parameters) for hashing/verifying passwords.
ph = PasswordHasher()

# Session table mapping token -> username. It lives only in this process's
# memory, so issued tokens do not survive a restart.
SESSIONS = {}
|
|
|
|
class User(BaseModel):
    """Request body carrying the credentials for /register and /login."""

    # Account name; used as the lookup key in the user store.
    username: str
    # Plaintext password as sent by the client; only its Argon2 hash is stored.
    password: str
|
|
|
|
class ChangePasswordModel(BaseModel):
    """Request body for /change-password."""

    # Replacement password in plaintext; it is hashed before being stored.
    new_password: str
|
|
|
|
class NoteObject(BaseModel):
    """Request body for creating a note object via POST /objects."""

    # Note heading.
    title: str
    # Note body.
    text: str
|
|
|
|
def read_json(filename):
    """Load the JSON list stored in *filename*.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded list. An empty list is returned when the file does not
        exist, does not contain valid JSON, or holds a top-level value that
        is not a list.
    """
    # EAFP: open directly instead of checking for existence first, so the
    # file cannot vanish between the check and the open call.
    try:
        # Explicit UTF-8 keeps decoding independent of the platform locale.
        with open(filename, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []
    except json.JSONDecodeError:
        # Best-effort behaviour: an unreadable store is treated as empty.
        return []

    # Callers iterate over and append to the result, so anything other than
    # a list (e.g. ``null`` or an object) is treated like an empty store.
    return data if isinstance(data, list) else []
|
|
|
|
def write_json(filename, data):
    """Serialize *data* as indented JSON and store it in *filename*.

    The JSON is first written to a sibling ``<filename>.tmp`` file and then
    moved over the target with ``os.replace``. If serialization or writing
    fails, the previous content of *filename* is left untouched instead of
    being truncated.

    Args:
        filename: Path of the JSON file to (over)write.
        data: JSON-serializable value to store.

    Raises:
        TypeError: If *data* contains a value ``json`` cannot serialize.
        OSError: If the temporary file cannot be written or moved.
    """
    tmp_name = f"{filename}.tmp"
    try:
        with open(tmp_name, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        # Swap the finished file into place in a single step.
        os.replace(tmp_name, filename)
    except Exception:
        # Do not leave a half-written temp file behind; re-raise unchanged.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
|
|
|
|
def get_user_from_token(token: str):
    """Resolve a session token to the username stored for it in SESSIONS.

    Args:
        token: Session token supplied by the client; may be None or empty.

    Returns:
        The username registered for *token*.

    Raises:
        HTTPException: 401 when the token is missing, empty or unknown.
    """
    if token:
        try:
            return SESSIONS[token]
        except KeyError:
            # Unknown token: fall through to the shared 401 below.
            pass
    raise HTTPException(status_code=401, detail="Токен невалидный")
|
|
|
|
@app.post("/register")
def register(user: User):
    """Create a new account and persist it to DB_FILE.

    Args:
        user: Requested username and plaintext password.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 when the username is already taken, or when the
            password is shorter than 6 characters.

    NOTE(review): the username "root" is not reserved here, although the
    /admin/* routes grant access purely on that username — confirm how the
    root account is provisioned.
    """
    accounts = read_json(DB_FILE)

    # Usernames must be unique; this check runs before password validation.
    for account in accounts:
        if account["username"] == user.username:
            raise HTTPException(status_code=400, detail="Такой юзер уже есть")

    # Server-side validation of the password length.
    if not len(user.password) >= 6:
        raise HTTPException(status_code=400, detail="Пароль должен быть не менее 6 символов")

    # Only the Argon2 hash is stored, never the plaintext password.
    hashed_password = ph.hash(user.password)
    accounts.append({"username": user.username, "password": hashed_password})
    write_json(DB_FILE, accounts)
    return {"message": "Регистрация успешна"}
|
|
|
|
@app.post("/login")
def login(user: User):
    """Verify credentials and open a new in-memory session.

    Args:
        user: Username and plaintext password to check.

    Returns:
        A dict with a message, the freshly generated session token and the
        username.

    Raises:
        HTTPException: 401 when the username is unknown or the password
            does not match the stored hash.
    """
    accounts = read_json(DB_FILE)

    # Only the first record with a matching username is considered.
    record = next(
        (entry for entry in accounts if entry["username"] == user.username),
        None,
    )
    if record is None:
        raise HTTPException(status_code=401, detail="Юзер не найден")

    try:
        ph.verify(record["password"], user.password)
    except VerifyMismatchError:
        raise HTTPException(status_code=401, detail="Неверный пароль")

    # 32 random bytes rendered as 64 hex characters.
    session_token = secrets.token_hex(32)
    SESSIONS[session_token] = user.username
    return {
        "message": "Успешный вход",
        "token": session_token,
        "username": user.username,
    }
|
|
|
|
@app.get("/profile")
def get_profile(x_token: str = Header(None)):
    """Return the stored profile of the user owning the session token.

    Args:
        x_token: Session token read from the request header (``X-Token``
            under FastAPI's default underscore-to-hyphen conversion).

    Returns:
        A dict with the username and the stored password hash.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token); 404 when the session's user is not in
            DB_FILE.

    NOTE(review): the response exposes the Argon2 password hash to the
    client — confirm this is intended before relying on it.
    """
    current_user = get_user_from_token(x_token)
    accounts = read_json(DB_FILE)

    record = next(
        (entry for entry in accounts if entry["username"] == current_user),
        None,
    )
    if record is None:
        raise HTTPException(status_code=404, detail="User not found")

    return {"username": record["username"], "hash": record["password"]}
|
|
|
|
@app.post("/change-password")
def change_password(data: ChangePasswordModel, x_token: str = Header(None)):
    """Replace the password of the user owning the session token.

    After the new hash is persisted, every other session of the same user
    is revoked so that tokens issued under the old password stop working.
    The session that made the request stays valid.

    Args:
        data: Payload carrying the new plaintext password.
        x_token: Session token read from the request header (``X-Token``
            under FastAPI's default underscore-to-hyphen conversion).

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token); 400 when the new password is shorter than
            6 characters; 404 when the session's user is not in DB_FILE.
    """
    username = get_user_from_token(x_token)
    db = read_json(DB_FILE)
    for u in db:
        if u["username"] == username:
            if len(data.new_password) < 6:
                raise HTTPException(status_code=400, detail="Пароль должен быть не менее 6 символов")
            u["password"] = ph.hash(data.new_password)
            write_json(DB_FILE, db)

            # Revoke the user's other sessions. Snapshot the items first so
            # the dict is not mutated while it is being iterated.
            stale_tokens = [
                token
                for token, owner in list(SESSIONS.items())
                if owner == username and token != x_token
            ]
            for token in stale_tokens:
                SESSIONS.pop(token, None)

            return {"message": "Пароль успешно изменен"}
    raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
@app.post("/objects")
def create_object(obj: NoteObject, x_token: str = Header(None)):
    """Store a new note object owned by the authenticated user.

    Args:
        obj: Title and text of the note.
        x_token: Session token read from the request header (``X-Token``
            under FastAPI's default underscore-to-hyphen conversion).

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token).
    """
    owner = get_user_from_token(x_token)
    stored = read_json(OBJ_FILE)

    # Each record carries its owner's username so reads can filter by user.
    record = {"username": owner, "title": obj.title, "text": obj.text}
    stored.append(record)

    write_json(OBJ_FILE, stored)
    return {"message": "Объект создан"}
|
|
|
|
@app.get("/objects")
def get_objects(x_token: str = Header(None)):
    """List the note objects that belong to the authenticated user.

    Args:
        x_token: Session token read from the request header (``X-Token``
            under FastAPI's default underscore-to-hyphen conversion).

    Returns:
        A list of the stored records whose "username" equals the caller's.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token).
    """
    owner = get_user_from_token(x_token)

    own_records = []
    for record in read_json(OBJ_FILE):
        if record["username"] == owner:
            own_records.append(record)
    return own_records
|
|
|
|
@app.get("/admin/objects")
def admin_get_objects(x_token: str = Header(None)):
    """Return every stored note object; restricted to the "root" user.

    Args:
        x_token: Session token read from the request header (``X-Token``
            under FastAPI's default underscore-to-hyphen conversion).

    Returns:
        The full content of OBJ_FILE.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token); 403 when the caller is not "root".
    """
    caller = get_user_from_token(x_token)

    # Admin access is decided solely by the session's username.
    if caller == "root":
        return read_json(OBJ_FILE)
    raise HTTPException(status_code=403)
|
|
|
|
@app.get("/admin/users")
def admin_get_users(x_token: str = Header(None)):
    """Return every stored user record; restricted to the "root" user.

    Args:
        x_token: Session token read from the request header (``X-Token``
            under FastAPI's default underscore-to-hyphen conversion).

    Returns:
        The full content of DB_FILE.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token); 403 when the caller is not "root".

    NOTE(review): the records are returned as stored, i.e. including each
    user's password hash — confirm this is intended.
    """
    caller = get_user_from_token(x_token)

    # Admin access is decided solely by the session's username.
    if caller == "root":
        return read_json(DB_FILE)
    raise HTTPException(status_code=403)