Skip to content

Commit

Permalink
[Enhancement] Allow pulling config details from env vars (#21)
Browse files Browse the repository at this point in the history
* update(config): introduce environment variables and validation (#20)

* add env vars for api keys

* allow mixing of config file and env var

* change priority of env vars and config

* remove accidental comment

* correct output when loading configuration

* simplify config loading output

* refactor config class

* change config default value to ''

* fix typo

* default bool option to false instead of none as it will get removed from dict anyway

* handle case where INJECT_TORRENTS is unset

* add config value validation at startup

* reformat test

* address review and finish validation

* remove covid async hallucination

* redistribute non-main functions

---------

Co-authored-by: milkers69 <[email protected]>

* Ran ruff

* Re-added dot to errors module import

* Started refactoring config class; added new tests

* Refactored main

* Fixed existing tests

* Reverted some unrelated changes

---------

Co-authored-by: zakary <[email protected]>
Co-authored-by: milkers69 <[email protected]>
  • Loading branch information
3 people authored Sep 9, 2024
1 parent b05a5e3 commit c9d8b6f
Show file tree
Hide file tree
Showing 17 changed files with 416 additions and 120 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,7 @@ tmp/
!tests/support/files/*.torrent
!tests/support/files/*.fastresume
tests/support/files/example.torrent

# Jetbrains
.idea
*.iml
26 changes: 10 additions & 16 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
import os
import sys
import traceback

from colorama import Fore

from src.api import RedAPI, OpsAPI
from src.args import parse_args
from src.config import Config
from src.injection import Injection
from src.scanner import scan_torrent_directory, scan_torrent_file
from src.config_validator import ConfigValidator
from src.webserver import run_webserver
from src.injection import Injection


def cli_entrypoint(args):
try:
# using input_file means this is probably running as a script and extra printing wouldn't be appreciated
should_print = args.input_directory or args.server
config = command_log_wrapper("Reading config file:", should_print, lambda: Config().load(args.config_file))
config_dict = Config.build_config_dict(args.config_file, os.environ)
validator = ConfigValidator(config_dict)
config = command_log_wrapper("Reading configuration:", should_print, lambda: Config(validator.validate()))

if config.inject_torrents:
injector = command_log_wrapper("Connecting to torrent client:", should_print, lambda: Injection(config).setup())
else:
injector = None

red_api, ops_api = command_log_wrapper("Verifying API keys:", should_print, lambda: __verify_api_keys(config))
red_api, ops_api = command_log_wrapper(
"Verifying API keys:", should_print, lambda: validator.verify_api_keys(config)
)

if args.server:
run_webserver(args.input_directory, args.output_directory, red_api, ops_api, injector, port=config.server_port)
Expand All @@ -37,18 +43,6 @@ def cli_entrypoint(args):
exit(1)


def __verify_api_keys(config):
red_api = RedAPI(config.red_key)
ops_api = OpsAPI(config.ops_key)

# This will perform a lookup with the API and raise if there was a failure.
# Also caches the announce URL for future use which is a nice bonus
red_api.announce_url
ops_api.announce_url

return red_api, ops_api


def command_log_wrapper(label, should_print, func):
def maybe_print(str, *args, **kwargs):
if should_print:
Expand Down
6 changes: 3 additions & 3 deletions src/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import json
from math import exp
from time import time, sleep
import json

import requests

from .errors import handle_error, AuthenticationError
from .errors import AuthenticationError, handle_error


class GazelleAPI:
Expand Down Expand Up @@ -87,7 +87,7 @@ def __get_announce_url(self):
try:
account_info = self.get_account_info()
except AuthenticationError as e:
handle_error(description=f"Authentication to {self.sitename} failed", exception_details=e, should_raise=True)
handle_error(description=f"Authentication to {self.sitename} failed", exception_details=str(e), should_raise=True)

passkey = account_info["response"]["passkey"]
return f"{self.tracker_url}/{passkey}/announce"
Expand Down
2 changes: 1 addition & 1 deletion src/args.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
import argparse
import sys


def parse_args(args=None):
Expand Down
4 changes: 2 additions & 2 deletions src/config.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"red_key": "xxxxxxxxxx",
"ops_key": "xxxxxxxxxx"
"red_key": "",
"ops_key": ""
}
70 changes: 37 additions & 33 deletions src/config.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,63 @@
import json
import os

from .errors import ConfigKeyError
from urllib.parse import ParseResult


class Config:
"""
Class for loading and accessing the config file.
Class for working with configuration options
"""

def __init__(self):
self._json = {}

def load(self, config_filepath: str):
if not os.path.exists(config_filepath):
raise FileNotFoundError(f"{config_filepath} does not exist.")

with open(config_filepath, "r", encoding="utf-8") as f:
self._json = json.loads(f.read())

return self
@classmethod
def build_config_dict(cls, config_filepath: str, env_vars: dict):
file_config = {}
if os.path.exists(config_filepath):
with open(config_filepath, "r", encoding="utf-8") as f:
file_config = {key: str(value) for key, value in json.loads(f.read()).items() if value}

formatted_env_vars = {
key: value
for key, value in {
"red_key": env_vars.get("RED_KEY"),
"ops_key": env_vars.get("OPS_KEY"),
"port": env_vars.get("PORT"),
"inject_torrents": True if env_vars.get("INJECT_TORRENTS", "").lower().strip() == "true" else False,
"deluge_rpc_url": env_vars.get("DELUGE_RPC_URL"),
"qbittorrent_url": env_vars.get("QBITTORRENT_URL"),
"injection_link_directory": env_vars.get("INJECTION_LINK_DIRECTORY"),
}.items()
if value
}

return {**formatted_env_vars, **file_config}

def __init__(self, config: dict):
self._config = config

@property
def red_key(self) -> str:
return self.__get_key("red_key")
return self._config["red_key"]

@property
def ops_key(self) -> str:
return self.__get_key("ops_key")
return self._config["ops_key"]

@property
def server_port(self) -> str:
return self.__get_key("port", must_exist=False) or "9713"
return self._config.get("port", "9713")

@property
def deluge_rpc_url(self) -> str | None:
return self.__get_key("deluge_rpc_url", must_exist=False) or None
def deluge_rpc_url(self) -> ParseResult | None:
return self._config.get("deluge_rpc_url")

@property
def qbittorrent_url(self) -> str | None:
return self.__get_key("qbittorrent_url", must_exist=False) or None
def qbittorrent_url(self) -> ParseResult | None:
return self._config.get("qbittorrent_url")

@property
def inject_torrents(self) -> str | bool:
return self.__get_key("inject_torrents", must_exist=False) or False
def inject_torrents(self) -> bool:
return self._config.get("inject_torrents", False)

@property
def injection_link_directory(self) -> str | None:
return self.__get_key("injection_link_directory", must_exist=False) or None

def __get_key(self, key, must_exist=True):
try:
return self._json[key]
except KeyError:
if must_exist:
raise ConfigKeyError(f"Key '{key}' not found in config file.")

return None
return self._config.get("injection_link_directory")
123 changes: 123 additions & 0 deletions src/config_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import re
from urllib.parse import urlparse

from .api import RedAPI, OpsAPI
from .filesystem import assert_path_exists


class ConfigValidator:
REQUIRED_KEYS = ["red_key", "ops_key"]
TORRENT_CLIENT_KEYS = ["deluge_rpc_url", "qbittorrent_url"]

def __init__(self, config_dict):
self.config_dict = config_dict
self.validation_schema = {
"red_key": self.__is_valid_red_key,
"ops_key": self.__is_valid_ops_key,
"port": self.__is_valid_port,
"deluge_rpc_url": self.__is_valid_deluge_url,
"qbittorrent_url": self.__is_valid_qbit_url,
"inject_torrents": self.__is_boolean,
"injection_link_directory": assert_path_exists,
}

@staticmethod
def verify_api_keys(config):
red_api = RedAPI(config.red_key)
ops_api = OpsAPI(config.ops_key)

# This will perform a lookup with the API and raise if there was a failure.
# Also caches the announce URL for future use which is a nice bonus
red_api.announce_url
ops_api.announce_url

return red_api, ops_api

def validate(self):
presence_errors = self.__validate_key_presence()
validation_errors, validated_values = self.__validate_attributes(presence_errors)

if validation_errors:
raise ValueError(self.__format_validation_errors(validation_errors))
return validated_values

def __validate_key_presence(self):
errors = {}

for key in self.REQUIRED_KEYS:
if not self.config_dict.get(key):
errors[key] = "Is required but was not found in the configuration"

if self.__torrent_injection_enabled():
if not any(self.config_dict.get(key) for key in self.TORRENT_CLIENT_KEYS):
errors["torrent_clients"] = 'A torrent client URL is required if "inject_torrents" is enabled'

if not self.config_dict.get("injection_link_directory"):
errors["injection_link_directory"] = 'An injection directory path is required if "inject_torrents" is enabled'

return errors

def __validate_attributes(self, presence_errors):
validated_values = {}
validation_errors = presence_errors.copy()

for key, validator in self.validation_schema.items():
existing_error = presence_errors.get(key, None)
value = self.config_dict.get(key, None)

if existing_error is None and value is not None:
try:
validated_values[key] = validator(str(value))
except Exception as e:
validation_errors[key] = str(e)

return validation_errors, validated_values

def __format_validation_errors(self, errors):
return "\n".join([f'- "{key}": {value}' for key, value in errors.items()])

def __torrent_injection_enabled(self):
return str(self.config_dict.get("inject_torrents", False)).lower() == "true"

@staticmethod
def __is_valid_qbit_url(url):
parsed_url = urlparse(url)
if parsed_url.scheme and parsed_url.netloc:
return parsed_url.geturl() # return the parsed URL
raise ValueError(f'Invalid "qbittorrent_url" provided: {url}')

@staticmethod
def __is_valid_deluge_url(url):
parsed_url = urlparse(url)
if parsed_url.scheme and parsed_url.netloc:
if not parsed_url.password:
raise Exception(
"You need to define a password in the Deluge RPC URL. (e.g. http://:<PASSWORD>@localhost:8112/json)"
)
return parsed_url.geturl() # return the parsed URL
raise ValueError(f'Invalid "deluge_rpc_url" provided: {url}')

@staticmethod
def __is_boolean(value):
coerced = value.lower().strip()
if coerced in ["true", "false"]:
return coerced == "true"
raise ValueError('value is not boolean ("true" or "false")')

@staticmethod
def __is_valid_port(port):
if port.isdigit() and 1 <= int(port) <= 65535:
return int(port) # Return the port number as an integer
raise ValueError(f'Invalid "port" ({port}): Not between 1 and 65535')

@staticmethod
def __is_valid_red_key(key):
if re.fullmatch(r"^[a-z0-9.]{41}$", key):
return key
raise ValueError(f'does not appear to match known API key patterns: "{key}"')

@staticmethod
def __is_valid_ops_key(key):
if re.fullmatch(r"^[A-Za-z0-9+/]{116}$", key):
return key
raise ValueError(f'does not appear to match known API key patterns: "{key}"')
4 changes: 0 additions & 4 deletions src/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ class TorrentAlreadyExistsError(Exception):
pass


class ConfigKeyError(Exception):
pass


class TorrentClientError(Exception):
pass

Expand Down
2 changes: 1 addition & 1 deletion src/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def sane_join(*args: str) -> str:
path_parts = [part.lstrip(os.path.sep) for part in args[1:]]
path_parts.insert(0, args[0])

return os.path.join(*path_parts)
return str(os.path.join(*path_parts))


def mkdir_p(directory_path: str) -> str:
Expand Down
13 changes: 8 additions & 5 deletions src/injection.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import shutil

from .errors import TorrentInjectionError
from .clients.deluge import Deluge
from .clients.qbittorrent import Qbittorrent
from .config import Config
from .errors import TorrentInjectionError
from .parser import calculate_infohash, get_bencoded_data


Expand All @@ -31,7 +31,8 @@ def inject_torrent(self, source_torrent_filepath, new_torrent_filepath, new_trac
save_path_override=output_parent_directory,
)

def __validate_config(self, config: Config):
@staticmethod
def __validate_config(config: Config):
if not config.inject_torrents:
raise TorrentInjectionError("Torrent injection is disabled in the config file.")

Expand All @@ -43,7 +44,8 @@ def __validate_config(self, config: Config):

return config

def __determine_torrent_client(self, config: Config):
@staticmethod
def __determine_torrent_client(config: Config):
if config.deluge_rpc_url:
return Deluge(config.deluge_rpc_url)
elif config.qbittorrent_url:
Expand All @@ -64,7 +66,7 @@ def __determine_source_torrent_data_location(self, torrent_data):
# torrents are stored under that directory.
#
# If a torrent has one file and that file is at the root level of the torrent, the `files` key is absent.
# If a torrent has multiple files OR a single file but it's in a directory, the `files` key is present
# If a torrent has multiple files OR a single file, but it's in a directory, the `files` key is present
# and is an array of dictionaries. Each dictionary has a `path` key that is an array of bytestrings where
# each array member is a part of the path to the file. In other words, if you joined all the bytestrings
# in the `path` array for a given file, you'd get the path to the file relative to the topmost parent
Expand All @@ -88,7 +90,8 @@ def __determine_output_location(self, source_torrent_file_or_dir, new_tracker):

return os.path.join(tracker_output_directory, os.path.basename(source_torrent_file_or_dir))

def __link_files_to_output_location(self, source_torrent_file_or_dir, output_location):
@staticmethod
def __link_files_to_output_location(source_torrent_file_or_dir, output_location):
if os.path.exists(output_location):
raise TorrentInjectionError(f"Cannot link given torrent since it's already been linked: {output_location}")

Expand Down
Loading

0 comments on commit c9d8b6f

Please sign in to comment.