#!/usr/bin/env python3 from cmd import Cmd from datetime import datetime, time from inspect import signature, Signature import os from time import sleep def ir_debug(pulses): # Debugging placeholder print(','.join([str(x) for x in pulses])) def ir_android(pulses): plyer.irblaster.transmit(38000, pulses, 'microseconds') def ir_termux(pulses): pattern = ','.join([str(x) for x in pulses]) os.system('termux-infrared-transmit -f 38000 '+pattern) default_driver = ir_debug if 'ANDROID_ROOT' in os.environ: if 'com.termux' in os.environ.get('SHELL', ''): default_driver = ir_termux else: try: import plyer default_driver = ir_android except: pass class EufyRawIR: """ Low level IR signal modulation """ _IR_PREAMBLE = (3000, 3000) _IR_0 = (400, 600) _IR_1 = (400, 1600) _IR_TRAILER = (400, 20000) def __init__(self, driver=ir_debug): self._driver = driver def _modulate(self, message): signal = list(self._IR_PREAMBLE) for b in message: for i in range(8): if (b << i) & 0x80: signal.extend(list(self._IR_1)) else: signal.extend(list(self._IR_0)) signal.extend(list(self._IR_TRAILER)) return 3*signal def sendRaw(self, message): self._driver(self._modulate(message)) class EufyIR(EufyRawIR): """ High level Eufy commands """ _HEADER = 0x68 CLEAN_AUTO = 0x5d CLEAN_SPOT = 0x8c CLEAN_EDGE = 0x9c CLEAN_ROOM = 0xad STOP = 0x4f POWER_MAX = 0x1c POWER_BOOSTIQ = 0x1d POWER_STANDARD = 0x1e MOVE_FORWARD = 0x2f MOVE_BACKWARD = 0x7f MOVE_CCW = 0x3f MOVE_CW = 0x6f SET_TIME = 0xbf SET_SCHEDULE = 0xcf CANCEL_SCHEDULE = 0xdf RETURN_BASE = 0xef def __init__(self, driver=ir_debug): super(EufyIR, self).__init__(driver) self._schedule_file = os.path.expanduser('~/.eufy-schedule') try: with open(os.path.expanduser(self._SCHEDULE_FILE), 'r') as f: self._schedule = int(f.read(5)) except: self._schedule = 0xff def _message(self, code): now = datetime.now() msg = [self._HEADER, code, now.hour, now.minute, self._schedule] checksum = sum(msg) & 0xff msg.append(checksum) return msg def send(self, code): self.sendRaw(self._message(code)) def cleanAuto(self): self.send(self.CLEAN_AUTO) def cleanSpot(self): self.send(self.CLEAN_SPOT) def cleanEdge(self): self.send(self.CLEAN_EDGE) def cleanRoom(self): self.send(self.CLEAN_ROOM) def start(self): self.cleanAuto() def stop(self): self.send(self.STOP) def powerMax(self): self.send(self.POWER_MAX) def powerBoostIQ(self): self.send(self.POWER_BOOSTIQ) def powerStandard(self): self.send(self.POWER_STANDARD) def moveForward(self): self.send(self.MOVE_FORWARD) def moveBackward(self): self.send(self.MOVE_BACKWARD) def moveCCW(self): self.send(self.MOVE_CCW) def moveLeft(self): self.moveCCW() def moveCW(self): self.send(self.MOVE_CW) def moveRight(self): self.moveCW() def setTime(self): self.send(self.SET_TIME) def setSchedule(self, t): self._schedule = t.hour * 4 + t.minute // 15 with open(self._schedule_file, 'w') as f: f.write(str(self._schedule)) self.send(self.SET_SCHEDULE) def cancelSchedule(self): self._schedule = 0xff try: os.remove(self._schedule_file) except: pass self.send(self.CANCEL_SCHEDULE) def returnBase(self): self.send(self.RETURN_BASE) def lexer(f): def g(self, args): argv = tuple(x.lower() for x in args.split()) s = signature(f) maxargs = len(s.parameters) - 1 minargs = len([x for _,x in s.parameters.items() if x.default is Signature.empty]) - 1 if minargs <= len(argv) <= maxargs: return f(self, *argv) else: self.stdout.write("*** Invalid arguments for {}\n".format(f.__name__[3:])) return g class Eufy(Cmd): prompt = 'eufy> ' def __init__(self, *args, **kwargs): super(Eufy, self).__init__(*args, **kwargs) self.ir = EufyIR(default_driver) @lexer def do_clean(self, mode='auto'): { 'auto': self.ir.cleanAuto, 'spot': self.ir.cleanSpot, 'edge': self.ir.cleanEdge, 'room': self.ir.cleanRoom }.get(mode)() @lexer def do_start(self): self.ir.cleanAuto() @lexer def do_stop(self): self.ir.stop() @lexer def do_power(self, mode): { 'standard': self.ir.powerStandard, 'boostiq': self.ir.powerBoostIQ, 'max': self.ir.powerMax }.get(mode)() @lexer def do_move(self, direction): { 'forward': self.ir.moveForward, 'backward': self.ir.moveBackward, 'ccw': self.ir.moveCCW, 'cw': self.ir.moveCW, 'left': self.ir.moveCCW, 'right': self.ir.moveCW }.get(direction)() @lexer def do_time(self): self.ir.setTime() @lexer def do_schedule(self, t): if t == 'cancel': self.ir.cancelSchedule() else: newtime = datetime.now() newtime.strptime(t, "%H:%M") self.ir.setSchedule(newtime.time()) @lexer def do_base(self): self.ir.returnBase() @lexer def do_pause(self, delay=1): sleep(int(delay)) @lexer def do_quit(self): return True @lexer def do_EOF(self): return True if __name__ == '__main__': from argparse import ArgumentParser, FileType parser = ArgumentParser(description='Eufy RoboVac 11s CLI tool') parser.add_argument('-f', '--file', type=FileType('r'), help='read commands from file') parser.add_argument('command', nargs='*') args = parser.parse_args() eufy = Eufy() if args.file: eufy.cmdqueue.extend(args.file.readlines()) if args.command: eufy.cmdqueue.extend(' '.join(args.command).split(',')) if eufy.cmdqueue: eufy.cmdqueue.append('quit') eufy.cmdloop()