diff --git a/requirements.txt b/requirements.txt index 59ae2ab..e12c4cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ aiohttp~=3.8.4 colorama~=0.4.6 +cryptography~=41.0.5 discord~=2.3.2 dpytest~=0.6.4 heckbot diff --git a/src/heckbot/cogs/picker.py b/src/heckbot/cogs/picker.py index d2d9575..49d792b 100644 --- a/src/heckbot/cogs/picker.py +++ b/src/heckbot/cogs/picker.py @@ -3,18 +3,97 @@ import csv import os import random +from datetime import datetime, timedelta +from pathlib import Path +from urllib.parse import quote from discord.ext import commands from discord.ext.commands import Bot from discord.ext.commands import Context +from dotenv import load_dotenv +from bot import HeckBot from bot import cursor from bot import db_conn -from bot import HeckBot +from heckbot.utils.auth import generate_mac, encrypt + +load_dotenv(Path(__file__).parent.parent.parent.parent / '.env') PLAYERS_MAX = 100 PLAYERS_MIN = 1 RESOURCE_DIR = 'resources/' +PICK_SERVER_URL = os.getenv('PICK_SERVER_URL') + +owned_games = {} +game_constraints = {} +last_players = [] + + +def load_games(): + try: + with open(f'{RESOURCE_DIR}/games.csv') as f: + csv_reader = csv.reader(f) + for line in csv_reader: + if len(line) == 1: + game_constraints[line[0]] = (PLAYERS_MIN, PLAYERS_MAX) + elif len(line) == 2: + game_constraints[line[0]] = ( + int(line[1]), PLAYERS_MAX, + ) + else: + game_constraints[line[0]] = ( + int(line[1]), int(line[2]), + ) + + for player_file in os.listdir(f'{RESOURCE_DIR}/players'): + player = player_file.rpartition('.')[0] + owned_games[player] = set() + with open(f'{RESOURCE_DIR}/players/' + player_file) as f: + owned_games[player] = {line.strip().lower() for line in f.readlines()} + print('Loaded previous data from disk') + except Exception as ex: + print('Nothing to load from disk') + ... + + +def random_game(players: list[str]): + need_info_players = [ + player for player in players if player not in owned_games + ] + players = [player for player in players if player in owned_games] + r = '' + options = owned_games[players[0]] + for player in players[1:]: + options = options.intersection(owned_games[player]) + options = { + item for item in options + if game_constraints[item][0] <= len(players) <= game_constraints[item][1] + } + if len(options) == 0: + r += "No games available. Y'all are too picky." + else: + game_choice = random.choice(list(options)) + picky_person = min(players, key=lambda p: len(owned_games[p])) + r = f'You can play {game_choice.title()}.' + global last_players + if players != last_players: + r += f'\nBtw, the pickiest person here is: {picky_person}' + last_players = players + if len(need_info_players) > 0: + r += ( + f'\n(p.s. I don\'t know what games these people have: ' + f'{", ".join(need_info_players)})\n' + ) + return r + + +def get_pick_link(user_name: str) -> str: + TTL = 60 * 5 # 5 minutes + expiry = ( + datetime.utcnow() + timedelta(seconds=TTL) + ).isoformat() + token, iv = encrypt(user_name, expiry) + return PICK_SERVER_URL + f'?token={quote(token)}&iv={quote(iv)}' class Picker(commands.Cog): @@ -33,10 +112,7 @@ def __init__( self._bot = bot self._db_conn = db_conn self._cursor = cursor - self._owned_games = {} - self._game_constraints = {} - self._last_players = [] - self.load_games() + load_games() @commands.command() async def pick( @@ -54,62 +130,21 @@ async def pick( users_in_channel = [ self._bot.get_user(int(uid)).name for uid in voice_states ] - await ctx.send(self.random_game(users_in_channel)) - - def load_games(self): - try: - with open(f'{RESOURCE_DIR}/games.csv') as f: - csv_reader = csv.reader(f) - for line in csv_reader: - if len(line) == 1: - self._game_constraints[line[0]] = (PLAYERS_MIN, PLAYERS_MAX) - elif len(line) == 2: - self._game_constraints[line[0]] = ( - int(line[1]), PLAYERS_MAX, - ) - else: - self._game_constraints[line[0]] = ( - int(line[1]), int(line[2]), - ) - - for player_file in os.listdir(f'{RESOURCE_DIR}/players'): - player = player_file.rpartition('.')[0] - self._owned_games[player] = set() - with open(f'{RESOURCE_DIR}/players/' + player_file) as f: - self._owned_games[player] = {line.strip().lower() for line in f.readlines()} - print('Loaded previous data from disk') - except Exception as ex: - print('Nothing to load from disk') - ... - - def random_game(self, players: list[str]): - need_info_players = [ - player for player in players if player not in self._owned_games - ] - players = [player for player in players if player in self._owned_games] - r = '' - options = self._owned_games[players[0]] - for player in players[1:]: - options = options.intersection(self._owned_games[player]) - options = { - item for item in options - if self._game_constraints[item][0] <= len(players) <= self._game_constraints[item][1] - } - if len(options) == 0: - r += "No games available. Y'all are too picky." - else: - game_choice = random.choice(list(options)) - picky_person = min(players, key=lambda p: len(self._owned_games[p])) - r = f'You can play {game_choice.title()}.' - if players != self._last_players: - r += f'\nBtw, the pickiest person here is: {picky_person}' - self._last_players = players - if len(need_info_players) > 0: - r += ( - f'\n(p.s. I don\'t know what games these people have: ' - f'{", ".join(need_info_players)})\n' - ) - return r + await ctx.send(random_game(users_in_channel)) + + @commands.command(aliases=['pickadmin']) + @commands.has_permissions(administrator=True) + @commands.bot_has_permissions(administrator=True) + async def edit_activities(self, ctx: Context): + ... + + @commands.command(aliases=['setpicks']) + async def set_picks(self, ctx: Context): + await ctx.author.send( + "Here's your custom link to edit your picks.\n" + "Don't share this with anyone!\n" + + get_pick_link(self._bot.get_user(ctx.author.id).name) + ) async def setup( diff --git a/src/heckbot/utils/auth.py b/src/heckbot/utils/auth.py new file mode 100644 index 0000000..874a1e5 --- /dev/null +++ b/src/heckbot/utils/auth.py @@ -0,0 +1,39 @@ +import os +from datetime import datetime, timedelta +from typing import Optional + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +SECRET_KEY = os.getenv('HECKBOT_SECRET_KEY') + + +def encrypt(username: str, expiry: str) -> tuple[bytes, bytes]: + message = f"{username}:{expiry}".encode() + iv = os.urandom(16) + cipher = Cipher( + algorithms.AES( + SECRET_KEY.encode() + ), modes.CFB(iv), backend=default_backend() + ) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(message) + encryptor.finalize() + return ciphertext, iv + + +def decrypt(ciphertext: str, iv: str) -> Optional[str]: + cipher = Cipher( + algorithms.AES( + SECRET_KEY.encode() + ), modes.CFB(iv.encode()), backend=default_backend() + ) + decryptor = cipher.decryptor() + decrypted_data = decryptor.update( + ciphertext.encode() + ) + decryptor.finalize() + decoded_data = decrypted_data.decode() + username, timestamp = decoded_data.split(':') + timestamp = datetime.fromisoformat(timestamp) + if datetime.utcnow() < timestamp + timedelta(seconds=300): + return username + return None