coproapi/main.py

118 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import secrets
from fastapi import FastAPI, HTTPException, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
# JSON files used as the persistent stores for user records and note objects.
DB_FILE = "database.json"
OBJ_FILE = "objects.json"

# Argon2 hasher shared by registration, login and password change.
ph = PasswordHasher()

# In-process session table: session token -> username. Filled in by /login.
SESSIONS = {}

app = FastAPI()

# CORS is left fully open: any origin, any method, any header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
class User(BaseModel):
    """Credentials payload accepted by the /register and /login endpoints."""

    username: str
    password: str
class ChangePasswordModel(BaseModel):
    """Request body for the /change-password endpoint."""

    new_password: str
class NoteObject(BaseModel):
    """Request body for creating a note object via POST /objects."""

    title: str  # a title was added
    text: str
def read_json(filename):
    """Load and return the JSON document stored in *filename*.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded JSON value, or an empty list when the file does not
        exist or does not contain valid JSON.
    """
    # EAFP: open directly instead of checking os.path.exists() first, so the
    # file cannot disappear between the check and the open. The encoding is
    # pinned to UTF-8 so decoding does not depend on the host locale.
    try:
        with open(filename, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
def write_json(filename, data):
    """Serialize *data* as indented JSON into *filename*.

    The document is first written to a sibling ``<filename>.tmp`` file and
    then moved over the target with ``os.replace``. Opening the target with
    mode "w" directly would truncate it before serialization starts, so a
    failure part-way through would leave a corrupt file behind.

    Args:
        filename: Path of the JSON file to (over)write.
        data: JSON-serializable value to store.

    Raises:
        TypeError: If *data* contains a value ``json.dump`` cannot serialize.
        OSError: If the temporary file cannot be written or moved into place.
    """
    tmp_name = f"{filename}.tmp"
    try:
        with open(tmp_name, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        os.replace(tmp_name, filename)
    except BaseException:
        # Do not leave a half-written temp file behind; the previous content
        # of the target file is still intact at this point.
        try:
            os.remove(tmp_name)
        except FileNotFoundError:
            pass
        raise
def get_user_from_token(token: str):
    """Resolve a session token to the username recorded for it in SESSIONS.

    Args:
        token: Session token supplied by the caller; may be None or empty.

    Returns:
        The username stored under *token*.

    Raises:
        HTTPException: 401 when the token is missing, empty, or unknown.
    """
    if token and token in SESSIONS:
        return SESSIONS[token]
    raise HTTPException(status_code=401, detail="Токен невалидный")
@app.post("/register")
def register(user: User):
    """Create a new account and persist it to DB_FILE.

    Args:
        user: Desired username and plaintext password.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 when the username is already present in the store.
    """
    # NOTE(review): no username is reserved here, while the /admin endpoints
    # grant access to whoever is logged in as "root" -- confirm that account
    # is provisioned before the service is exposed.
    accounts = read_json(DB_FILE)
    for account in accounts:
        if account["username"] == user.username:
            raise HTTPException(status_code=400, detail="Такой юзер уже есть")

    # Only the Argon2 hash of the password is stored.
    new_account = {"username": user.username, "password": ph.hash(user.password)}
    accounts.append(new_account)
    write_json(DB_FILE, accounts)
    return {"message": "Регистрация успешна"}
@app.post("/login")
def login(user: User):
    """Verify credentials and open a new session.

    Args:
        user: Username and plaintext password to check.

    Returns:
        A dict with a message, the freshly generated session token and the
        username.

    Raises:
        HTTPException: 401 when the username is not in the store or the
            password does not match the stored hash.
    """
    # First stored record carrying this username, if any.
    record = next(
        (entry for entry in read_json(DB_FILE) if entry["username"] == user.username),
        None,
    )
    if record is None:
        raise HTTPException(status_code=401, detail="Юзер не найден")

    try:
        ph.verify(record["password"], user.password)
    except VerifyMismatchError:
        raise HTTPException(status_code=401, detail="Неверный пароль")

    # Register the new token in the in-process session table.
    token = secrets.token_hex(32)
    SESSIONS[token] = user.username
    return {"message": "Успешный вход", "token": token, "username": user.username}
@app.get("/profile")
def get_profile(x_token: str = Header(None)):
    """Return the stored record of the user owning the session token.

    Args:
        x_token: Session token read from the request headers (None if absent).

    Returns:
        A dict with the username and the stored password hash.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token), 404 when the session's user is no longer
            in the store.
    """
    current = get_user_from_token(x_token)
    for record in read_json(DB_FILE):
        if record["username"] != current:
            continue
        return {"username": record["username"], "hash": record["password"]}
    raise HTTPException(status_code=404, detail="User not found")
# Password-change endpoint: every user changes their own password; root,
# acting from the admin panel, also changes only their own, because the
# session belongs to root.
@app.post("/change-password")
def change_password(data: ChangePasswordModel, x_token: str = Header(None)):
    """Replace the stored password hash of the session's user.

    Args:
        data: Body carrying the new plaintext password.
        x_token: Session token read from the request headers (None if absent).

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token), 404 when the session's user is not in the
            store.
    """
    owner = get_user_from_token(x_token)
    users = read_json(DB_FILE)
    target = next((rec for rec in users if rec["username"] == owner), None)
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Mutate the record in place, then persist the whole list.
    target["password"] = ph.hash(data.new_password)
    write_json(DB_FILE, users)
    return {"message": "Пароль успешно изменен"}
@app.post("/objects")
def create_object(obj: NoteObject, x_token: str = Header(None)):
    """Store a new note object owned by the session's user.

    Args:
        obj: Title and text of the note.
        x_token: Session token read from the request headers (None if absent).

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token).
    """
    owner = get_user_from_token(x_token)
    note = {"username": owner, "title": obj.title, "text": obj.text}
    stored = read_json(OBJ_FILE)
    stored.append(note)
    write_json(OBJ_FILE, stored)
    return {"message": "Объект создан"}
@app.get("/objects")
def get_objects(x_token: str = Header(None)):
    """List the note objects owned by the session's user.

    Args:
        x_token: Session token read from the request headers (None if absent).

    Returns:
        The stored objects whose "username" equals the session's user.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token).
    """
    owner = get_user_from_token(x_token)
    mine = []
    for item in read_json(OBJ_FILE):
        if item["username"] == owner:
            mine.append(item)
    return mine
@app.get("/admin/objects")
def admin_get_objects(x_token: str = Header(None)):
    """Return every stored note object; only the "root" user may call this.

    Args:
        x_token: Session token read from the request headers (None if absent).

    Returns:
        The full content of OBJ_FILE.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token), 403 when the caller is not "root".
    """
    caller = get_user_from_token(x_token)
    if caller == "root":
        return read_json(OBJ_FILE)
    raise HTTPException(status_code=403)
@app.get("/admin/users")
def admin_get_users(x_token: str = Header(None)):
    """Return every stored user record; only the "root" user may call this.

    The records are returned exactly as stored, i.e. including each user's
    password hash.

    Args:
        x_token: Session token read from the request headers (None if absent).

    Returns:
        The full content of DB_FILE.

    Raises:
        HTTPException: 401 for an invalid token (raised by
            get_user_from_token), 403 when the caller is not "root".
    """
    caller = get_user_from_token(x_token)
    is_admin = caller == "root"
    if not is_admin:
        raise HTTPException(status_code=403)
    return read_json(DB_FILE)