Skip to content

Commit

Permalink
Merge pull request #491 from y-yosuke/mod-with-switchbot-api-1.1
Browse files Browse the repository at this point in the history
Mod for switchbot api 1.1
  • Loading branch information
k-okada authored Nov 16, 2023
2 parents d7a974e + d535017 commit 56747c3
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 25 deletions.
2 changes: 2 additions & 0 deletions switchbot_ros/launch/switchbot.launch
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
<launch>
<arg name="token" />
<arg name="secret" default="''" />
<arg name="respawn" default="true" />

<node name="switchbot_ros" pkg="switchbot_ros" type="switchbot_ros_server.py"
respawn="$(arg respawn)" output="screen">
<rosparam subst_value="true">
token: $(arg token)
secret: $(arg secret)
</rosparam>
</node>
</launch>
15 changes: 15 additions & 0 deletions switchbot_ros/scripts/control_switchbot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env python

import rospy
from switchbot_ros.switchbot_ros_client import SwitchBotROSClient

rospy.init_node('controler_node')
client = SwitchBotROSClient()

devices = client.get_devices()
print(devices)

client.control_device('pendant-light', 'turnOff')

client.control_device('bot74a', 'turnOn')

83 changes: 74 additions & 9 deletions switchbot_ros/scripts/switchbot_ros_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ def __init__(self):
self.token = f.read().replace('\n', '')
else:
self.token = token

# Switchbot API v1.1 needs secret key
secret = rospy.get_param('~secret', None )
if secret is not None and os.path.exists(secret):
with open(secret, 'r', encoding='utf-8') as f:
self.secret = f.read().replace('\n', '')
else:
self.secret = secret

# Initialize switchbot client
self.bots = self.get_switchbot_client()
self.print_apiversion()
self.print_devices()
self.print_scenes()
# Actionlib
self._as = actionlib.SimpleActionServer(
'~switch', SwitchBotCommandAction,
Expand All @@ -37,10 +48,11 @@ def __init__(self):
# Topic
self.pub = rospy.Publisher('~devices', DeviceArray, queue_size=1, latch=True)
self.published = False
rospy.loginfo('Ready.')

def get_switchbot_client(self):
try:
client = SwitchBotAPIClient(token=self.token)
client = SwitchBotAPIClient(token=self.token, secret=self.secret)
rospy.loginfo('Switchbot API Client initialized.')
return client
except ConnectionError: # If the machine is not connected to the internet
Expand All @@ -57,31 +69,84 @@ def spin(self):
self.publish_devices()
self.published = True

def print_apiversion(self):
if self.bots is None:
return

apiversion_str = 'Using SwitchBot API ';
apiversion_str += self.bots.api_version;
rospy.loginfo(apiversion_str)


def print_devices(self):
if self.bots is None:
return
device_list_str = 'Switchbot device list:\n'

device_list_str = 'Switchbot Device List:\n'
device_list = sorted(
self.bots.device_list,
key=lambda device: str(device['deviceName']))
key=lambda device: str(device.get('deviceName')))
device_list_str += str(len(device_list)) + ' Item(s)\n'
for device in device_list:
device_list_str += 'Name: ' + str(device['deviceName'])
device_list_str += ', Type: ' + str(device['deviceType'])
device_list_str += 'deviceName: ' + str(device.get('deviceName'))
device_list_str += ', deviceID: ' + str(device.get('deviceId'))
device_list_str += ', deviceType: ' + str(device.get('deviceType'))
device_list_str += '\n'
rospy.loginfo(device_list_str)

remote_list_str = 'Switchbot Remote List:\n'
infrared_remote_list = sorted(
self.bots.infrared_remote_list,
key=lambda infrared_remote: str(infrared_remote.get('deviceName')))
remote_list_str += str(len(infrared_remote_list)) + ' Item(s)\n'
for infrared_remote in infrared_remote_list:
remote_list_str += 'deviceName: ' + str(infrared_remote.get('deviceName'))
remote_list_str += ', deviceID: ' + str(infrared_remote.get('deviceId'))
remote_list_str += ', remoteType: ' + str(infrared_remote.get('remoteType'))
remote_list_str += '\n'
rospy.loginfo(remote_list_str)


def print_scenes(self):
if self.bots is None:
return

scene_list_str = 'Switchbot Scene List:\n'
scene_list = sorted(
self.bots.scene_list,
key=lambda scene: str(scene.get('sceneName')))
scene_list_str += str(len(scene_list)) + ' Item(s)\n'
for scene in scene_list:
scene_list_str += 'sceneName: ' + str(scene.get('sceneName'))
scene_list_str += ', sceneID: ' + str(scene.get('sceneId'))
scene_list_str += '\n'
rospy.loginfo(scene_list_str)


def publish_devices(self):
if self.bots is None:
return

msg = DeviceArray()

device_list = sorted(
self.bots.device_list,
key=lambda device: str(device['deviceName']))
key=lambda device: str(device.get('deviceName')))
for device in device_list:
msg_device = Device()
msg_device.name = str(device['deviceName'])
msg_device.type = str(device['deviceType'])
msg_device.name = str(device.get('deviceName'))
msg_device.type = str(device.get('deviceType'))
msg.devices.append(msg_device)

infrared_remote_list = sorted(
self.bots.infrared_remote_list,
key=lambda infrared_remote: str(infrared_remote.get('deviceName')))
for infrared_remote in infrared_remote_list:
msg_device = Device()
msg_device.name = str(infrared_remote.get('deviceName'))
msg_device.type = str(infrared_remote.get('remoteType'))
msg.devices.append(msg_device)

self.pub.publish(msg)

def execute_cb(self, goal):
Expand All @@ -97,7 +162,7 @@ def execute_cb(self, goal):
command_type = 'command'
try:
if not self.bots:
self.bots = SwitchBotAPIClient(token=self.token)
self.bots = SwitchBotAPIClient(token=self.token, secret=self.secret)
feedback.status = str(
self.bots.control_device(
command=goal.command,
Expand Down
80 changes: 64 additions & 16 deletions switchbot_ros/src/switchbot_ros/switchbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,28 @@
import os.path
import requests

import sys
import os
import time
import hashlib
import hmac
import base64
import uuid


class SwitchBotAPIClient(object):
"""
For Using SwitchBot via official API.
Please see https://github.com/OpenWonderLabs/SwitchBotAPI for details.
"""
def __init__(self, token):
self._host_domain = "https://api.switch-bot.com/v1.0/"
def __init__(self, token, secret=""):
if not secret:
self.api_version = "v1.0"
else:
self.api_version = "v1.1"
self._host_domain = "https://api.switch-bot.com/" + self.api_version + "/"
self.token = token
self.secret = secret # SwitchBot API v1.1
self.device_list = None
self.infrared_remote_list = None
self.scene_list = None
Expand All @@ -21,6 +34,41 @@ def __init__(self, token):
self.update_device_list()
self.update_scene_list()

def make_sign(self, token, secret):
"""
Make Sign from token and secret
"""
nonce = uuid.uuid4()
t = int(round(time.time() * 1000))
string_to_sign = '{}{}{}'.format(token, t, nonce)

if sys.version_info[0] > 2:
string_to_sign = bytes(string_to_sign, 'utf-8')
secret = bytes(secret, 'utf-8')
else:
string_to_sign = bytes(string_to_sign)
secret = bytes(secret)

sign = base64.b64encode(hmac.new(secret, msg=string_to_sign, digestmod=hashlib.sha256).digest())

if sys.version_info[0] > 2:
sign = sign.decode('utf-8')

return sign, str(t), nonce

def make_request_header(self, token, secret):
"""
Make Request Header
"""
sign, t, nonce = self.make_sign(token, secret)
headers={
"Authorization": token,
"sign": str(sign),
"t": str(t),
"nonce": str(nonce),
"Content-Type": "application/json; charset=utf8"
}
return headers

def request(self, method='GET', devices_or_scenes='devices', service_id='', service='', json_body=None):
"""
Expand All @@ -30,20 +78,20 @@ def request(self, method='GET', devices_or_scenes='devices', service_id='', serv
raise ValueError('Please set devices_or_scenes variable devices or scenes')

url = os.path.join(self._host_domain, devices_or_scenes, service_id, service)

headers = self.make_request_header(self.token, self.secret)

if method == 'GET':
response = requests.get(
url,
headers={'Authorization': self.token}
headers=headers
)
elif method == 'POST':
response = requests.post(
url,
json_body,
headers={
'Content-Type': 'application/json; charset=utf8',
'Authorization': self.token
})
json=json_body,
headers=headers
)
else:
raise ValueError('Got unexpected http request method. Please use GET or POST.')

Expand Down Expand Up @@ -87,8 +135,8 @@ def update_device_list(self):
self.infrared_remote_list = res['body']['infraredRemoteList']
for device in self.device_list:
self.device_name_id[device['deviceName']] = device['deviceId']
for infrated_remote in self.infrared_remote_list:
self.device_name_id[device['deviceName']] = device['deviceId']
for infrared_remote in self.infrared_remote_list:
self.device_name_id[infrared_remote['deviceName']] = infrared_remote['deviceId']

return self.device_list, self.infrared_remote_list

Expand All @@ -99,7 +147,7 @@ def update_scene_list(self):
"""
self.scene_list = self.request(devices_or_scenes='scenes')['body']
for scene in self.scene_list:
self.scene_name_id[scene['sceneName']] = device['sceneId']
self.scene_name_id[scene['sceneName']] = scene['sceneId']

return self.scene_list

Expand All @@ -125,11 +173,11 @@ def control_device(self, command, parameter='default', command_type='command', d
"""
Send Command to the device. Please see https://github.com/OpenWonderLabs/SwitchBotAPI#send-device-control-commands for command options.
"""
json_body = json.dumps({
"command": command,
"parameter": parameter,
"commandType": command_type
})
json_body = {
"command": str(command),
"parameter": str(parameter),
"commandType": str(command_type)
}
if device_id:
pass
elif device_name:
Expand Down

0 comments on commit 56747c3

Please sign in to comment.