File size: 5,127 Bytes
ab250f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from app.backend.models.chats import Chat
from app.settings import settings
from app.backend.models.users import (
    get_user_last_chat,
    find_user_by_id,
    add_new_user,
    User,
)

from fastapi import Response, Request, HTTPException
from datetime import datetime, timedelta, timezone

from uuid import uuid4
import jwt


def extract_user_from_context(request: Request) -> User | None:
    if hasattr(request.state, "current_user"):
        return request.state.current_user
    print("*" * 40, "No attribute 'current_user`", "*" * 40, "\n")
    return None


def create_access_token(user_id: str, expires_delta: timedelta = settings.max_cookie_lifetime) -> str:
    token_payload = {"user_id": user_id}
    token_payload.update({"exp": datetime.now() + expires_delta})

    try:
        encoded_jwt: str = jwt.encode(
            token_payload, settings.secret_pepper, algorithm=settings.jwt_algorithm
        )
    except Exception:
        raise HTTPException(status_code=500, detail="json encoding error")

    print("^" * 40, "New JWT token was created", "^" * 40)
    print(encoded_jwt)
    print("^" * 105, "\n\n")

    return encoded_jwt


def create_user() -> User | None:
    new_user_id = str(uuid4())
    try:
        user = add_new_user(id=new_user_id)
    except Exception as e:
        raise HTTPException(status_code=418, detail=e)

    print("$" * 40, "New User was created", "$" * 40)
    print("Created user - {user.id}")
    print("$" * 100, "\n\n")

    return user


def authorize_user(response: Response, user: User) -> dict:
    print("%" * 40, "START Authorizing User", "%" * 40)
    try:
        access_token: str = create_access_token(user_id=user.id)
        expires = datetime.now(timezone.utc) + settings.max_cookie_lifetime

        response.set_cookie(
            key="access_token",
            value=access_token,
            path="/",
            expires=expires.strftime("%a, %d %b %Y %H:%M:%S GMT"),
            max_age=settings.max_cookie_lifetime,
            httponly=True,
            secure=True,
            samesite='None'
        )

        return {"status": "ok"}
    finally:
        print("%" * 40, "END Authorizing User", "%" * 40)


def get_current_user(request: Request) -> User | None:
    print("-" * 40, "START Getting User", "-" * 40)
    try:
        user = None
        token: str | None = request.cookies.get("access_token")

        print(f"Token -----> {token if token else 'Empty token!'}\n")

        if not token:
            return None

        try:
            user_id = jwt.decode(
                jwt=bytes(token, encoding="utf-8"),
                key=settings.secret_pepper,
                algorithms=[settings.jwt_algorithm],
            ).get("user_id")

            print(f"User id -----> {user_id if user_id else 'Empty user id!'}\n")

            user = find_user_by_id(id=user_id)

            print(f"Found user -----> {user.id if user else 'No user was found!'}")
        except Exception as e:
            raise e

        if not user:
            return None

        return user
    except HTTPException as exception:
        raise exception
    finally:
        print("-" * 40, "END Getting User", "-" * 40, "\n\n")


def check_cookie(request: Request) -> dict:
    result = {"token": "No token is present"}
    token = request.cookies.get("access_token")
    if token:
        result["token"] = token
    return result


def clear_cookie(response: Response) -> dict:
    response.set_cookie(key="access_token", value="", httponly=True)
    return {"status": "ok"}


def get_latest_chat(user: User) -> Chat | None:
    return get_user_last_chat(user)


def refresh_cookie(request: Request, response: Response) -> None:
    print("+" * 40, "START Refreshing cookie", "+" * 40)
    try:
        token: str | None = request.cookies.get("access_token")

        print(f"Token -----> {token if token else 'Empty token!'}\n")

        if token is None:
            return

        try:
            jwt_token = jwt.decode(
                        jwt=bytes(token, encoding="utf-8"),
                        key=settings.secret_pepper,
                        algorithms=[settings.jwt_algorithm],
                    )
            exp_datetime = datetime.fromtimestamp(jwt_token.get("exp"), tz=timezone.utc)
            print(f"Expires -----> {exp_datetime if exp_datetime else 'No expiration date!'}\n")
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="jwt signature has expired")
        except jwt.PyJWTError as e:
            raise HTTPException(status_code=500, detail=e)

        diff = exp_datetime - datetime.now(timezone.utc)
        print(f"Difference -----> {diff if diff else 'No difference in date!'}\n")

        if diff.total_seconds() < 0.2 * settings.max_cookie_lifetime.total_seconds():
            print("<----- Refreshing ----->")
            user = extract_user_from_context(request)
            authorize_user(response, user)
    except HTTPException as exception:
        raise exception
    finally:
        print("+" * 40, "END Refreshing cookie", "+" * 40, "\n\n")