Skip to content

Commit

Permalink
Add setpick command
Browse files Browse the repository at this point in the history
  • Loading branch information
BuildTools committed Nov 12, 2023
1 parent d29c351 commit f58e88a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 61 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
aiohttp~=3.8.4
colorama~=0.4.6
cryptography~=41.0.5
discord~=2.3.2
dpytest~=0.6.4
heckbot
Expand Down
157 changes: 96 additions & 61 deletions src/heckbot/cogs/picker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,97 @@
import csv
import os
import random
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import quote

from discord.ext import commands
from discord.ext.commands import Bot
from discord.ext.commands import Context
from dotenv import load_dotenv

from bot import HeckBot
from bot import cursor
from bot import db_conn
from bot import HeckBot
from heckbot.utils.auth import generate_mac, encrypt

load_dotenv(Path(__file__).parent.parent.parent.parent / '.env')

PLAYERS_MAX = 100
PLAYERS_MIN = 1
RESOURCE_DIR = 'resources/'
PICK_SERVER_URL = os.getenv('PICK_SERVER_URL')

owned_games = {}
game_constraints = {}
last_players = []


def load_games():
try:
with open(f'{RESOURCE_DIR}/games.csv') as f:
csv_reader = csv.reader(f)
for line in csv_reader:
if len(line) == 1:
game_constraints[line[0]] = (PLAYERS_MIN, PLAYERS_MAX)
elif len(line) == 2:
game_constraints[line[0]] = (
int(line[1]), PLAYERS_MAX,
)
else:
game_constraints[line[0]] = (
int(line[1]), int(line[2]),
)

for player_file in os.listdir(f'{RESOURCE_DIR}/players'):
player = player_file.rpartition('.')[0]
owned_games[player] = set()
with open(f'{RESOURCE_DIR}/players/' + player_file) as f:
owned_games[player] = {line.strip().lower() for line in f.readlines()}
print('Loaded previous data from disk')
except Exception as ex:
print('Nothing to load from disk')
...


def random_game(players: list[str]):
need_info_players = [
player for player in players if player not in owned_games
]
players = [player for player in players if player in owned_games]
r = ''
options = owned_games[players[0]]
for player in players[1:]:
options = options.intersection(owned_games[player])
options = {
item for item in options
if game_constraints[item][0] <= len(players) <= game_constraints[item][1]
}
if len(options) == 0:
r += "No games available. Y'all are too picky."
else:
game_choice = random.choice(list(options))
picky_person = min(players, key=lambda p: len(owned_games[p]))
r = f'You can play {game_choice.title()}.'
global last_players
if players != last_players:
r += f'\nBtw, the pickiest person here is: {picky_person}'
last_players = players
if len(need_info_players) > 0:
r += (
f'\n(p.s. I don\'t know what games these people have: '
f'{", ".join(need_info_players)})\n'
)
return r


def get_pick_link(user_name: str) -> str:
TTL = 60 * 5 # 5 minutes
expiry = (
datetime.utcnow() + timedelta(seconds=TTL)
).isoformat()
token, iv = encrypt(user_name, expiry)
return PICK_SERVER_URL + f'?token={quote(token)}&iv={quote(iv)}'


class Picker(commands.Cog):
Expand All @@ -33,10 +112,7 @@ def __init__(
self._bot = bot
self._db_conn = db_conn
self._cursor = cursor
self._owned_games = {}
self._game_constraints = {}
self._last_players = []
self.load_games()
load_games()

@commands.command()
async def pick(
Expand All @@ -54,62 +130,21 @@ async def pick(
users_in_channel = [
self._bot.get_user(int(uid)).name for uid in voice_states
]
await ctx.send(self.random_game(users_in_channel))

def load_games(self):
try:
with open(f'{RESOURCE_DIR}/games.csv') as f:
csv_reader = csv.reader(f)
for line in csv_reader:
if len(line) == 1:
self._game_constraints[line[0]] = (PLAYERS_MIN, PLAYERS_MAX)
elif len(line) == 2:
self._game_constraints[line[0]] = (
int(line[1]), PLAYERS_MAX,
)
else:
self._game_constraints[line[0]] = (
int(line[1]), int(line[2]),
)

for player_file in os.listdir(f'{RESOURCE_DIR}/players'):
player = player_file.rpartition('.')[0]
self._owned_games[player] = set()
with open(f'{RESOURCE_DIR}/players/' + player_file) as f:
self._owned_games[player] = {line.strip().lower() for line in f.readlines()}
print('Loaded previous data from disk')
except Exception as ex:
print('Nothing to load from disk')
...

def random_game(self, players: list[str]):
need_info_players = [
player for player in players if player not in self._owned_games
]
players = [player for player in players if player in self._owned_games]
r = ''
options = self._owned_games[players[0]]
for player in players[1:]:
options = options.intersection(self._owned_games[player])
options = {
item for item in options
if self._game_constraints[item][0] <= len(players) <= self._game_constraints[item][1]
}
if len(options) == 0:
r += "No games available. Y'all are too picky."
else:
game_choice = random.choice(list(options))
picky_person = min(players, key=lambda p: len(self._owned_games[p]))
r = f'You can play {game_choice.title()}.'
if players != self._last_players:
r += f'\nBtw, the pickiest person here is: {picky_person}'
self._last_players = players
if len(need_info_players) > 0:
r += (
f'\n(p.s. I don\'t know what games these people have: '
f'{", ".join(need_info_players)})\n'
)
return r
await ctx.send(random_game(users_in_channel))

@commands.command(aliases=['pickadmin'])
@commands.has_permissions(administrator=True)
@commands.bot_has_permissions(administrator=True)
async def edit_activities(self, ctx: Context):
...

@commands.command(aliases=['setpicks'])
async def set_picks(self, ctx: Context):
await ctx.author.send(
"Here's your custom link to edit your picks.\n"
"Don't share this with anyone!\n" +
get_pick_link(self._bot.get_user(ctx.author.id).name)
)


async def setup(
Expand Down
39 changes: 39 additions & 0 deletions src/heckbot/utils/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
from datetime import datetime, timedelta
from typing import Optional

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

SECRET_KEY = os.getenv('HECKBOT_SECRET_KEY')


def encrypt(username: str, expiry: str) -> tuple[bytes, bytes]:
message = f"{username}:{expiry}".encode()
iv = os.urandom(16)
cipher = Cipher(
algorithms.AES(
SECRET_KEY.encode()
), modes.CFB(iv), backend=default_backend()
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(message) + encryptor.finalize()
return ciphertext, iv


def decrypt(ciphertext: str, iv: str) -> Optional[str]:
cipher = Cipher(
algorithms.AES(
SECRET_KEY.encode()
), modes.CFB(iv.encode()), backend=default_backend()
)
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(
ciphertext.encode()
) + decryptor.finalize()
decoded_data = decrypted_data.decode()
username, timestamp = decoded_data.split(':')
timestamp = datetime.fromisoformat(timestamp)
if datetime.utcnow() < timestamp + timedelta(seconds=300):
return username
return None

0 comments on commit f58e88a

Please sign in to comment.