Module src.utils.repositories.userRepo
Classes
class UserRepo (dbAnandaTrackerWrapper)-
Repository class for managing user-related interactions with the database.
This class wraps direct interaction with the user part of the database, including validating user credentials, retrieving user data by ID or name, and creating new users.
Attributes
dbWrapper- The database wrapper that provides database connection and cursor.
Initializes the UserRepo with a database wrapper.
Args
dbAnandaTrackerWrapper- The database wrapper object used to interact with the database.
Expand source code
class UserRepo: """ Repository class for managing user-related interactions with the database. This class wraps direct interaction with the user part of the database, including validating user credentials, retrieving user data by ID or name, and creating new users. Attributes: dbWrapper: The database wrapper that provides database connection and cursor. """ def __init__(self, dbAnandaTrackerWrapper): """ Initializes the UserRepo with a database wrapper. Args: dbAnandaTrackerWrapper: The database wrapper object used to interact with the database. """ self.dbWrapper = dbAnandaTrackerWrapper def isUserPasswordCorrect(self, credentialsItem) -> bool or str or None: """ Validates the user's password. Args: credentialsItem: The credentialsItem object containing the user's token, username, and hashed password. Returns: bool or str or None: - True if the password is correct, - "invalid password" if the password is incorrect, - False if the token is invalid, - None if the user does not exist. """ if not self.dbWrapper.isTokenValid(credentialsItem.token): print("invalid token") return False user = self.getUserByName(credentialsItem.userName) if user is None: return None if user["hashedPassword"] == credentialsItem.hashedPassword: return True return "invalid password" def getUserByID(self, userID: int, alreadyAttemptedToUpdateOwnClassVars: bool = False) -> dict or None: """ Retrieves a user by their ID from the database. Args: userID (int): The ID of the user. alreadyAttemptedToUpdateOwnClassVars (bool): Flag to prevent multiple updates in case of error. Returns: dict or None: A dictionary containing the user details if found, otherwise None. """ try: query = """ SELECT ID, AES_DECRYPT(name_encr, %s) as name, hashedPassword FROM users WHERE ID=%s """ val = (str(self.dbWrapper.encryptionKey), userID) self.dbWrapper.dbCursor.execute(query, val) myresult = self.dbWrapper.dbCursor.fetchone() if myresult: return { 'ID': myresult[0], 'name': '' if myresult[1] is None else myresult[1].decode(), 'hashedPassword': myresult[2], } return None except Exception as e: if alreadyAttemptedToUpdateOwnClassVars: return None self.dbWrapper.updateOwnClassVars() return self.dbWrapper.getUserRepo().getUserByID(userID, True) def getUserByName(self, userName: str, alreadyAttemptedToUpdateOwnClassVars: bool = False) -> dict or None: """ Retrieves a user by their name from the database. Args: userName (str): The name of the user. alreadyAttemptedToUpdateOwnClassVars (bool): Flag to prevent multiple updates in case of error. Returns: dict or None: A dictionary containing the user details if found, otherwise None. """ try: query = """ SELECT ID FROM users WHERE AES_DECRYPT(name_encr, %s) = %s """ val = (str(self.dbWrapper.encryptionKey), userName) self.dbWrapper.dbCursor.execute(query, val) myresult = self.dbWrapper.dbCursor.fetchone() if myresult: return self.getUserByID(myresult[0]) return None except Exception as e: if alreadyAttemptedToUpdateOwnClassVars: return None self.dbWrapper.updateOwnClassVars() return self.dbWrapper.getUserRepo().getUserByName(userName, True) def getUserByCredentialsItem(self, credentialsItem) -> dict or None: """ Retrieves a user by their credentials. Args: credentialsItem: The credentialsItem object containing the user's token, username, and hashed password. Returns: dict or None: A dictionary containing the user details if found, otherwise None. """ return self.getUserByName(credentialsItem.userName) def getUserIDByCredentialsItem(self, credentialsItem) -> int or None: """ Retrieves the user ID by their credentials. Args: credentialsItem: The credentialsItem object containing the user's token, username, and hashed password. Returns: int or None: The user ID if found, otherwise None. """ user = self.getUserByName(credentialsItem.userName) return user["ID"] if user else None def getAllUserIDs(self, alreadyAttemptedToUpdateOwnClassVars: bool = False) -> list or None: """ Retrieves all user IDs from the database. Args: alreadyAttemptedToUpdateOwnClassVars (bool): Flag to prevent multiple updates in case of error. Returns: list or None: A list of user IDs if found, otherwise None. """ try: query = "SELECT ID FROM users" self.dbWrapper.dbCursor.execute(query) myresults = self.dbWrapper.dbCursor.fetchall() return [result[0] for result in myresults] if myresults else [] except Exception as e: if alreadyAttemptedToUpdateOwnClassVars: return None self.dbWrapper.updateOwnClassVars() return self.dbWrapper.getUserRepo().getAllUserIDs(True) def createNewUser(self, name: str, hashedPassword: str, alreadyAttemptedToUpdateOwnClassVars: bool = False) -> dict or None: """ Creates a new user in the database. Args: name (str): The name of the user. hashedPassword (str): The hashed password of the user. alreadyAttemptedToUpdateOwnClassVars (bool): Flag to prevent multiple updates in case of error. Returns: dict or None: A dictionary containing the newly created user details, or None if the user already exists. """ try: user = self.getUserByName(name) if user is None: query = """ INSERT INTO users (name_encr, hashedPassword) VALUES (AES_ENCRYPT(%s, %s), %s) """ val = (name, str(self.dbWrapper.encryptionKey), hashedPassword) self.dbWrapper.dbCursor.execute(query, val) self.dbWrapper.dbConnection.commit() return self.getUserByID(self.dbWrapper.dbCursor.lastrowid) return None except Exception as e: if alreadyAttemptedToUpdateOwnClassVars: return None self.dbWrapper.updateOwnClassVars() return self.dbWrapper.getUserRepo().createNewUser(name, hashedPassword, True) def createNewUser_fromCredentialsItem(self, credentialsItem) -> dict or None: """ Creates a new user in the database from a credentialsItem object. Args: credentialsItem: The credentialsItem object containing the user's token, username, and hashed password. Returns: dict or None: A dictionary containing the newly created user details, or None if the user already exists. """ return self.createNewUser(credentialsItem.userName, credentialsItem.hashedPassword)Methods
def createNewUser(self, name: str, hashedPassword: str, alreadyAttemptedToUpdateOwnClassVars: bool = False) ‑> dict-
Creates a new user in the database.
Args
name:str- The name of the user.
hashedPassword:str- The hashed password of the user.
alreadyAttemptedToUpdateOwnClassVars:bool- Flag to prevent multiple updates in case of error.
Returns
dictorNone- A dictionary containing the newly created user details, or None if the user already exists.
def createNewUser_fromCredentialsItem(self, credentialsItem) ‑> dict-
Creates a new user in the database from a credentialsItem object.
Args
credentialsItem- The credentialsItem object containing the user's token, username, and hashed password.
Returns
dictorNone- A dictionary containing the newly created user details, or None if the user already exists.
def getAllUserIDs(self, alreadyAttemptedToUpdateOwnClassVars: bool = False) ‑> list-
Retrieves all user IDs from the database.
Args
alreadyAttemptedToUpdateOwnClassVars:bool- Flag to prevent multiple updates in case of error.
Returns
listorNone- A list of user IDs if found, otherwise None.
def getUserByCredentialsItem(self, credentialsItem) ‑> dict-
Retrieves a user by their credentials.
Args
credentialsItem- The credentialsItem object containing the user's token, username, and hashed password.
Returns
dictorNone- A dictionary containing the user details if found, otherwise None.
def getUserByID(self, userID: int, alreadyAttemptedToUpdateOwnClassVars: bool = False) ‑> dict-
Retrieves a user by their ID from the database.
Args
userID:int- The ID of the user.
alreadyAttemptedToUpdateOwnClassVars:bool- Flag to prevent multiple updates in case of error.
Returns
dictorNone- A dictionary containing the user details if found, otherwise None.
def getUserByName(self, userName: str, alreadyAttemptedToUpdateOwnClassVars: bool = False) ‑> dict-
Retrieves a user by their name from the database.
Args
userName:str- The name of the user.
alreadyAttemptedToUpdateOwnClassVars:bool- Flag to prevent multiple updates in case of error.
Returns
dictorNone- A dictionary containing the user details if found, otherwise None.
def getUserIDByCredentialsItem(self, credentialsItem) ‑> int-
Retrieves the user ID by their credentials.
Args
credentialsItem- The credentialsItem object containing the user's token, username, and hashed password.
Returns
intorNone- The user ID if found, otherwise None.
def isUserPasswordCorrect(self, credentialsItem) ‑> bool-
Validates the user's password.
Args
credentialsItem- The credentialsItem object containing the user's token, username, and hashed password.
Returns
boolorstrorNone-
- True if the password is correct,
- "invalid password" if the password is incorrect,
- False if the token is invalid,
- None if the user does not exist.