eupy/eufy.py

237 lines
5.2 KiB
Python
Executable File

#!/usr/bin/env python3
from cmd import Cmd
from datetime import datetime, time
from inspect import getargspec
from shlex import shlex
from time import sleep
def drive_ir(pulses):
# Debugging placeholder
print(','.join([str(x) for x in pulses]))
class EufyRawIR:
""" Low level IR signal modulation """
_IR_PREAMBLE = (3000, 3000)
_IR_0 = (400, 600)
_IR_1 = (400, 1600)
_IR_TRAILER = (400, 20000)
def _modulate(self, message):
signal = list(self._IR_PREAMBLE)
for b in message:
for i in range(8):
if (b << i) & 0x80:
signal.extend(list(self._IR_1))
else:
signal.extend(list(self._IR_0))
signal.extend(list(self._IR_TRAILER))
return 3*signal
def sendRaw(self, message):
drive_ir(self._modulate(message))
class EufyIR(EufyRawIR):
""" High level Eufy commands """
_HEADER = 0x68
CLEAN_AUTO = 0x5d
CLEAN_SPOT = 0x8c
CLEAN_EDGE = 0x9c
CLEAN_ROOM = 0xad
STOP = 0x4f
POWER_MAX = 0x1c
POWER_BOOSTIQ = 0x1d
POWER_STANDARD = 0x1e
MOVE_FORWARD = 0x2f
MOVE_BACKWARD = 0x7f
MOVE_CCW = 0x3f
MOVE_CW = 0x6f
SET_TIME = 0xbf
SET_SCHEDULE = 0xcf
CANCEL_SCHEDULE = 0xdf
RETURN_BASE = 0xef
def __init__(self):
self._schedule = 0xff
def _message(self, code):
now = datetime.now()
msg = [self._HEADER, code, now.hour, now.minute, self._schedule]
checksum = sum(msg) & 0xff
msg.append(checksum)
return msg
def send(self, code):
self.sendRaw(self._message(code))
def cleanAuto(self):
self.send(self.CLEAN_AUTO)
def cleanSpot(self):
self.send(self.CLEAN_SPOT)
def cleanEdge(self):
self.send(self.CLEAN_EDGE)
def cleanRoom(self):
self.send(self.CLEAN_ROOM)
def start(self):
self.cleanAuto()
def stop(self):
self.send(self.STOP)
def powerMax(self):
self.send(self.POWER_MAX)
def powerBoostIQ(self):
self.send(self.POWER_BOOSTIQ)
def powerStandard(self):
self.send(self.POWER_STANDARD)
def moveForward(self):
self.send(self.MOVE_FORWARD)
def moveBackward(self):
self.send(self.MOVE_BACKWARD)
def moveCCW(self):
self.send(self.MOVE_CCW)
def moveLeft(self):
self.moveCCW()
def moveCW(self):
self.send(self.MOVE_CW)
def moveRight(self):
self.moveCW()
def setTime(self):
self.send(self.SET_TIME)
def setSchedule(self, t):
# FIXME: properly store schedule
self.send(self.SET_SCHEDULE, t)
def cancelSchedule(self):
# FIXME: properly store schedule
self.send(self.CANCEL_SCHEDULE)
def returnBase(self):
self.send(self.RETURN_BASE)
def lexer(f):
def g(self, args):
argv = tuple(x.lower() for x in shlex(args))
s = getargspec(f)
maxargs = len(s.args or [])
minargs = maxargs - len(s.defaults or [])
if minargs <= len(argv) + 1 <= maxargs:
return f(self, *argv)
else:
self.stdout.write("*** Invalid arguments for {}\n".format(f.__name__[3:]))
return g
class Eufy(Cmd):
prompt = 'eufy> '
def __init__(self, *args, **kwargs):
super(Eufy, self).__init__(*args, **kwargs)
self.ir = EufyIR()
@lexer
def do_clean(self, mode='auto'):
{
'auto': self.ir.cleanAuto,
'spot': self.ir.cleanSpot,
'edge': self.ir.cleanEdge,
'room': self.ir.cleanRoom
}.get(mode)()
@lexer
def do_start(self):
self.ir.cleanAuto()
@lexer
def do_stop(self):
self.ir.stop()
@lexer
def do_power(self, mode):
{
'standard': self.ir.powerStandard,
'boostiq': self.ir.powerBoostIQ,
'max': self.ir.powerMax
}.get(mode)()
@lexer
def do_move(self, direction):
{
'forward': self.ir.moveForward,
'backward': self.ir.moveBackward,
'ccw': self.ir.moveCCW,
'cw': self.ir.moveCW,
'left': self.ir.moveCCW,
'right': self.ir.moveCW
}.get(direction)()
@lexer
def do_time(self):
self.ir.setTime()
@lexer
def do_schedule(self, t):
newtime = datetime.now()
newtime.strptime(t, "%H:%M")
self.ir.setSchedule(newtime.time())
@lexer
def do_base(self):
self.ir.returnBase()
@lexer
def do_pause(self, delay=1):
sleep(int(delay))
@lexer
def do_quit(self):
return True
@lexer
def do_EOF(self):
return True
if __name__ == '__main__':
from argparse import ArgumentParser, FileType
parser = ArgumentParser(description='Eufy RoboVac 11s CLI tool')
parser.add_argument('-f', '--file', type=FileType('r'), help='read commands from file')
parser.add_argument('command', nargs='*')
args = parser.parse_args()
eufy = Eufy()
if args.file:
eufy.cmdqueue.extend(args.file.readlines())
if args.command:
eufy.cmdqueue.extend(' '.join(args.command).split(','))
if eufy.cmdqueue:
eufy.cmdqueue.append('quit')
eufy.cmdloop()