Skip to content

Commit

Permalink
Merge pull request #36 from uvjustin/more_otp_methods
Browse files Browse the repository at this point in the history
Support OTP via email and sms.
  • Loading branch information
elahd authored Feb 20, 2022
2 parents 53bb65d + 94299a0 commit a064693
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 33 deletions.
58 changes: 51 additions & 7 deletions pyalarmdotcomajax/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ADCGarageDoorCommand,
ADCImageSensorCommand,
ADCLockCommand,
ADCOtpType,
ADCPartitionCommand,
ADCTroubleCondition,
ArmingOption,
Expand All @@ -42,7 +43,7 @@
UnsupportedDevice,
)

__version__ = "0.2.0"
__version__ = "0.2.1"


log = logging.getLogger(__name__)
Expand All @@ -62,6 +63,8 @@ class ADCController:
"{}web/api/twoFactorAuthentication/twoFactorAuthentications/{}"
)
LOGIN_2FA_TRUST_URL_TEMPLATE = "{}web/api/twoFactorAuthentication/twoFactorAuthentications/{}/trustTwoFactorDevice"
LOGIN_2FA_REQUEST_OTP_SMS_URL_TEMPLATE = "{}web/api/twoFactorAuthentication/twoFactorAuthentications/{}/sendTwoFactorAuthenticationCode"
LOGIN_2FA_REQUEST_OTP_EMAIL_URL_TEMPLATE = "{}web/api/twoFactorAuthentication/twoFactorAuthentications/{}/sendTwoFactorAuthenticationCodeViaEmail"

VIEWSTATE_FIELD = "__VIEWSTATE"
VIEWSTATEGENERATOR_FIELD = "__VIEWSTATEGENERATOR"
Expand Down Expand Up @@ -106,9 +109,9 @@ def __init__(
silentarming: ArmingOption = ArmingOption.NEVER,
):
"""Use AIOHTTP to make a request to alarm.com."""
self._username = username
self._password = password
self._websession = websession
self._username: str = username
self._password: str = password
self._websession: aiohttp.ClientSession = websession
self._ajax_headers = {
"Accept": "application/vnd.api+json",
"ajaxrequestuniquekey": None,
Expand All @@ -121,6 +124,7 @@ def __init__(
{"twoFactorAuthenticationId": twofactorcookie} if twofactorcookie else {}
)
self._factor_type_id: int | None = None
self._two_factor_method: ADCOtpType | None = None
self._provider_name: str | None = None
self._user_id: str | None = None
self._user_email: str | None = None
Expand Down Expand Up @@ -176,7 +180,7 @@ def two_factor_cookie(self) -> str | None:
#
#

async def async_login(self) -> AuthResult:
async def async_login(self, request_otp: bool = True) -> AuthResult:
"""Login to Alarm.com."""
log.debug("Attempting to log in to Alarm.com")

Expand All @@ -186,6 +190,13 @@ async def async_login(self) -> AuthResult:

if not self._two_factor_cookie and await self._async_requires_2fa():
log.debug("Two factor authentication code or cookie required.")

if request_otp and self._two_factor_method in [
ADCOtpType.SMS,
ADCOtpType.EMAIL,
]:
await self.async_request_otp()

return AuthResult.OTP_REQUIRED

except (DataFetchFailed, UnexpectedDataStructure) as err:
Expand All @@ -197,6 +208,32 @@ async def async_login(self) -> AuthResult:

return AuthResult.SUCCESS

async def async_request_otp(self) -> str | None:
"""Request SMS/email OTP code from Alarm.com."""

try:

log.debug("Requesting OTP code...")

request_url = (
self.LOGIN_2FA_REQUEST_OTP_EMAIL_URL_TEMPLATE
if self._two_factor_method == ADCOtpType.EMAIL
else self.LOGIN_2FA_REQUEST_OTP_SMS_URL_TEMPLATE
)

async with self._websession.post(
url=request_url.format(self._url_base, self._user_id),
headers=self._ajax_headers,
) as resp:
if resp.status != 200:
raise DataFetchFailed("Failed to request 2FA code.")

except (asyncio.TimeoutError, aiohttp.ClientError) as err:
log.error("Can not load 2FA submission page from Alarm.com")
raise DataFetchFailed from err

return None

async def async_submit_otp(
self, code: str, device_name: str | None = None
) -> str | None:
Expand All @@ -216,7 +253,7 @@ async def async_submit_otp(
self._url_base, self._user_id
),
headers=self._ajax_headers,
json={"code": code, "typeOf2FA": 1},
json={"code": code, "typeOf2FA": self._two_factor_method.value},
) as resp:
json_rsp = await (resp.json())

Expand Down Expand Up @@ -711,7 +748,14 @@ async def _async_requires_2fa(self) -> bool | None:
factor_id := json_rsp.get("data", {}).get("id"), int
):
self._factor_type_id = factor_id
log.debug("Requires 2FA.")
self._two_factor_method = ADCOtpType(
json_rsp.get("data", {})
.get("attributes", {})
.get("twoFactorType")
)
log.debug(
"Requires 2FA. Using method %s", self._two_factor_method
)
return True

log.debug("Does not require 2FA.")
Expand Down
35 changes: 9 additions & 26 deletions pyalarmdotcomajax/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ def get_enum_value(enum_class: Enum) -> Any:
return list(map(get_enum_value, cls))


class ADCOtpType(Enum):
"""Alarm.com two factor authentication type."""

APP = 1
SMS = 2
EMAIL = 4


class ADCTroubleCondition(TypedDict):
"""Alarm.com alert / trouble condition."""

Expand Down Expand Up @@ -103,7 +111,7 @@ class ADCSensorSubtype(IntEnum):
GLASS_BREAK_DETECTOR = 19
PANEL_MOTION_SENSOR = 89


class ADCPartitionCommand(Enum):
"""Commands for ADC partitions."""

Expand Down Expand Up @@ -147,28 +155,3 @@ class ImageData(TypedDict):
image_src: str
description: str
timestamp: datetime


# class DeviceTypeFetchErrors(TypedDict, total=False):
# """Store all errors encountered when fetching devices."""

# systems: DeviceTypeFetchError | None
# partitions: DeviceTypeFetchError | None
# locks: DeviceTypeFetchError | None
# sensors: DeviceTypeFetchError | None
# garageDoors: DeviceTypeFetchError | None


# class DeviceTypeFetchError(TypedDict):
# """Store errors encountered when fetching a particular device type."""

# device_type: ADCDeviceType
# errors: list[ADCError]


# class ADCError(TypedDict):
# """Alarm.com response error format."""

# status: str
# detail: str
# code: str

0 comments on commit a064693

Please sign in to comment.