248 lines
5.7 KiB
Python
Executable File
248 lines
5.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from cmd import Cmd
|
|
from datetime import datetime, time
|
|
from inspect import signature, Signature
|
|
import os
|
|
from time import sleep
|
|
|
|
|
|
def drive_ir(pulses):
|
|
# Debugging placeholder
|
|
print(','.join([str(x) for x in pulses]))
|
|
|
|
|
|
class EufyRawIR:
|
|
""" Low level IR signal modulation """
|
|
|
|
_IR_PREAMBLE = (3000, 3000)
|
|
_IR_0 = (400, 600)
|
|
_IR_1 = (400, 1600)
|
|
_IR_TRAILER = (400, 20000)
|
|
|
|
def _modulate(self, message):
|
|
signal = list(self._IR_PREAMBLE)
|
|
for b in message:
|
|
for i in range(8):
|
|
if (b << i) & 0x80:
|
|
signal.extend(list(self._IR_1))
|
|
else:
|
|
signal.extend(list(self._IR_0))
|
|
signal.extend(list(self._IR_TRAILER))
|
|
return 3*signal
|
|
|
|
def sendRaw(self, message):
|
|
drive_ir(self._modulate(message))
|
|
|
|
|
|
class EufyIR(EufyRawIR):
|
|
""" High level Eufy commands """
|
|
|
|
_HEADER = 0x68
|
|
|
|
CLEAN_AUTO = 0x5d
|
|
CLEAN_SPOT = 0x8c
|
|
CLEAN_EDGE = 0x9c
|
|
CLEAN_ROOM = 0xad
|
|
STOP = 0x4f
|
|
POWER_MAX = 0x1c
|
|
POWER_BOOSTIQ = 0x1d
|
|
POWER_STANDARD = 0x1e
|
|
MOVE_FORWARD = 0x2f
|
|
MOVE_BACKWARD = 0x7f
|
|
MOVE_CCW = 0x3f
|
|
MOVE_CW = 0x6f
|
|
SET_TIME = 0xbf
|
|
SET_SCHEDULE = 0xcf
|
|
CANCEL_SCHEDULE = 0xdf
|
|
RETURN_BASE = 0xef
|
|
|
|
def __init__(self):
|
|
self._schedule_file = os.path.expanduser('~/.eufy-schedule')
|
|
try:
|
|
with open(os.path.expanduser(self._SCHEDULE_FILE), 'r') as f:
|
|
self._schedule = int(f.read(5))
|
|
except:
|
|
self._schedule = 0xff
|
|
|
|
def _message(self, code):
|
|
now = datetime.now()
|
|
msg = [self._HEADER, code, now.hour, now.minute, self._schedule]
|
|
checksum = sum(msg) & 0xff
|
|
msg.append(checksum)
|
|
return msg
|
|
|
|
def send(self, code):
|
|
self.sendRaw(self._message(code))
|
|
|
|
def cleanAuto(self):
|
|
self.send(self.CLEAN_AUTO)
|
|
|
|
def cleanSpot(self):
|
|
self.send(self.CLEAN_SPOT)
|
|
|
|
def cleanEdge(self):
|
|
self.send(self.CLEAN_EDGE)
|
|
|
|
def cleanRoom(self):
|
|
self.send(self.CLEAN_ROOM)
|
|
|
|
def start(self):
|
|
self.cleanAuto()
|
|
|
|
def stop(self):
|
|
self.send(self.STOP)
|
|
|
|
def powerMax(self):
|
|
self.send(self.POWER_MAX)
|
|
|
|
def powerBoostIQ(self):
|
|
self.send(self.POWER_BOOSTIQ)
|
|
|
|
def powerStandard(self):
|
|
self.send(self.POWER_STANDARD)
|
|
|
|
def moveForward(self):
|
|
self.send(self.MOVE_FORWARD)
|
|
|
|
def moveBackward(self):
|
|
self.send(self.MOVE_BACKWARD)
|
|
|
|
def moveCCW(self):
|
|
self.send(self.MOVE_CCW)
|
|
|
|
def moveLeft(self):
|
|
self.moveCCW()
|
|
|
|
def moveCW(self):
|
|
self.send(self.MOVE_CW)
|
|
|
|
def moveRight(self):
|
|
self.moveCW()
|
|
|
|
def setTime(self):
|
|
self.send(self.SET_TIME)
|
|
|
|
def setSchedule(self, t):
|
|
self._schedule = t.hour * 4 + t.minute // 15
|
|
with open(self._schedule_file, 'w') as f:
|
|
f.write(str(self._schedule))
|
|
self.send(self.SET_SCHEDULE)
|
|
|
|
def cancelSchedule(self):
|
|
self._schedule = 0xff
|
|
try:
|
|
os.remove(self._schedule_file)
|
|
except:
|
|
pass
|
|
self.send(self.CANCEL_SCHEDULE)
|
|
|
|
def returnBase(self):
|
|
self.send(self.RETURN_BASE)
|
|
|
|
|
|
def lexer(f):
|
|
def g(self, args):
|
|
argv = tuple(x.lower() for x in args.split())
|
|
s = signature(f)
|
|
maxargs = len(s.parameters) - 1
|
|
minargs = len([x for _,x in s.parameters.items() if x.default is Signature.empty]) - 1
|
|
if minargs <= len(argv) <= maxargs:
|
|
return f(self, *argv)
|
|
else:
|
|
self.stdout.write("*** Invalid arguments for {}\n".format(f.__name__[3:]))
|
|
return g
|
|
|
|
|
|
class Eufy(Cmd):
|
|
|
|
prompt = 'eufy> '
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(Eufy, self).__init__(*args, **kwargs)
|
|
self.ir = EufyIR()
|
|
|
|
@lexer
|
|
def do_clean(self, mode='auto'):
|
|
{
|
|
'auto': self.ir.cleanAuto,
|
|
'spot': self.ir.cleanSpot,
|
|
'edge': self.ir.cleanEdge,
|
|
'room': self.ir.cleanRoom
|
|
}.get(mode)()
|
|
|
|
@lexer
|
|
def do_start(self):
|
|
self.ir.cleanAuto()
|
|
|
|
@lexer
|
|
def do_stop(self):
|
|
self.ir.stop()
|
|
|
|
@lexer
|
|
def do_power(self, mode):
|
|
{
|
|
'standard': self.ir.powerStandard,
|
|
'boostiq': self.ir.powerBoostIQ,
|
|
'max': self.ir.powerMax
|
|
}.get(mode)()
|
|
|
|
@lexer
|
|
def do_move(self, direction):
|
|
{
|
|
'forward': self.ir.moveForward,
|
|
'backward': self.ir.moveBackward,
|
|
'ccw': self.ir.moveCCW,
|
|
'cw': self.ir.moveCW,
|
|
'left': self.ir.moveCCW,
|
|
'right': self.ir.moveCW
|
|
}.get(direction)()
|
|
|
|
@lexer
|
|
def do_time(self):
|
|
self.ir.setTime()
|
|
|
|
@lexer
|
|
def do_schedule(self, t):
|
|
newtime = datetime.now()
|
|
newtime.strptime(t, "%H:%M")
|
|
self.ir.setSchedule(newtime.time())
|
|
|
|
@lexer
|
|
def do_base(self):
|
|
self.ir.returnBase()
|
|
|
|
@lexer
|
|
def do_pause(self, delay=1):
|
|
sleep(int(delay))
|
|
|
|
@lexer
|
|
def do_quit(self):
|
|
return True
|
|
|
|
@lexer
|
|
def do_EOF(self):
|
|
return True
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from argparse import ArgumentParser, FileType
|
|
|
|
parser = ArgumentParser(description='Eufy RoboVac 11s CLI tool')
|
|
parser.add_argument('-f', '--file', type=FileType('r'), help='read commands from file')
|
|
parser.add_argument('command', nargs='*')
|
|
args = parser.parse_args()
|
|
|
|
eufy = Eufy()
|
|
|
|
if args.file:
|
|
eufy.cmdqueue.extend(args.file.readlines())
|
|
|
|
if args.command:
|
|
eufy.cmdqueue.extend(' '.join(args.command).split(','))
|
|
|
|
if eufy.cmdqueue:
|
|
eufy.cmdqueue.append('quit')
|
|
|
|
eufy.cmdloop()
|