-
Notifications
You must be signed in to change notification settings - Fork 0
/
Source.py
46 lines (32 loc) · 1.11 KB
/
Source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from dataclasses import dataclass
import os
from pathlib import Path
from data.ccy import CcyPair
import Quote.FX.Utils as FxUtils
class Source:
    """Marker base class shared by the concrete source types in this module."""
@dataclass(frozen=True)
class CSV(Source):
    """Source backed by a location on the local filesystem.

    The path is validated when the instance is created.

    Raises:
        ValueError: If ``filepath`` does not exist.
    """

    # Location of the CSV input; must exist at construction time.
    filepath: Path

    def __post_init__(self) -> None:
        """Check that ``filepath`` exists; run automatically by the dataclass."""
        # The hook must be spelled ``__post_init__`` (trailing underscores):
        # any other spelling is never invoked by the generated ``__init__``.
        if not self.filepath.exists():
            raise ValueError(
                "Input directory {} is invalid".format(self.filepath))
@dataclass(frozen=True)
class Quandl(Source):
    """Source that carries a Quandl ``authtoken``.

    Also exposes static helpers that build Quandl database names and
    dataset codes for the supported asset types.
    """

    authtoken: str

    @staticmethod
    def quandl_db(cls: type) -> str:
        """Return the name of the Quandl database associated with an asset type.

        See the 'Quandl Code' section of
        https://www.quandl.com/blog/getting-started-with-the-quandl-api

        Args:
            cls: Asset class to look up; only ``CcyPair`` is known.

        Returns:
            The Quandl database name (``'CUR'`` for ``CcyPair``).

        Raises:
            ValueError: If no Quandl database is known for ``cls``.
        """
        if cls is CcyPair:
            return 'CUR'
        raise ValueError(f'no known Quandl DB for: {cls}')

    @staticmethod
    def quandl_code(obj: object) -> str:
        """Return the Quandl dataset code (``'<db>/<code>'``) for ``obj``.

        Args:
            obj: Asset to encode; only ``CcyPair`` instances are supported.

        Returns:
            The database name from ``quandl_db`` joined by ``/`` with the
            non-USD leg of the pair.

        Raises:
            ValueError: If ``obj`` is not a ``CcyPair``, or if the pair is
                not a USD cross.
        """
        if not isinstance(obj, CcyPair):
            # Fail loudly rather than returning None from a ``-> str`` function.
            raise ValueError(f'no known Quandl code for: {obj}')

        # NOTE(review): ``Ccy`` is not imported at module level; it is assumed
        # to live next to ``CcyPair`` in ``data.ccy`` -- confirm.
        from data.ccy import Ccy

        lhs, rhs = obj
        if not obj.is_USD_cross():
            raise ValueError(
                f'Quandl only accepts USD crosses, got {lhs}/{rhs}')
        non_usd_ccy = rhs if lhs is Ccy.USD else lhs
        # A staticmethod has no implicit class scope, so the sibling helper
        # must be reached through the class name.
        return f'{Quandl.quandl_db(CcyPair)}/{non_usd_ccy}'