legoEv3/save/home/robot/motorIR.py
from datetime import datetime, timedelta
print(str(datetime.now()), __file__, "begin import")
import time
import threading
from queue import Queue, Empty
import sys
from select import select
running = True
class Log(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.que = Queue()
self.log("__init__")
def run(self):
self.f = open("/home/robot/log.log", "wt", buffering=1)
self.f.write(str(datetime.now()) + ' ' + __file__ + ' begin\n')
while True:
w = self.que.get()
if w == 'end':
break
self.f.write(w.replace("\n", "\n\t") + "\n")
self.f.write(str(datetime.now()) + ' ' + __file__ + ' end\n')
self.f.close()
def log(self, *args):
t = str(datetime.now())
for a in args:
t += ' ' + str(a)
print(t)
self.que.put(t)
def end(self):
self.que.put("end")
lg = Log()
lg.start()
lg.log('first log',3, datetime.now())
from ev3dev2.led import Leds
leds = Leds()
leds.set_color("LEFT", "ORANGE", pct=25)
leds.set_color("RIGHT", "YELLOW", pct=25)
lg.log("after led colors", leds.led_colors)
from ev3dev2.button import *
from ev3dev2.sensor.lego import InfraredSensor
from ev3dev2.motor import *
# from ev3dev2._platform.ev3 import *
# print('EVDEV_DEVICE_NAME', EVDEV_DEVICE_NAME)
# for name, mod in sys.modules.items():
# print(name, '-->', mod)
lg.log("start t1.py ... ")
lg.log("sys.version", sys.version, "time=" + str(datetime.now()))
def readLoopTimeout(dev, to):
'''Enter an endless ``select()`` loop that yields input events and timeouts
adapted from evdev/device.py InputDevice.read_loop'''
while True:
r, w, x = select.select([dev.fd], [], [], to)
if len(r) <= 0:
yield None
else:
for event in dev.read():
yield event
class ButtonThr(threading.Thread):
""" thread ev3 brick buttons """
def __init__(self):
threading.Thread.__init__(self)
self.but = Button()
self.button_pressed = []
Button.evdev_device_name = 'EV3 Brick Buttons' # Constant in Button has wrong case, still aber updgrade ... !!!!
def run(self):
global running
for event in readLoopTimeout(self.but.evdev_device, 0.5):
if not running:
break
if event != None:
self.buttons_pressed = self.but.buttons_pressed
lg.log("button", self.buttons_pressed, 'event', repr(event))
if 'backspace' in self.buttons_pressed:
running = False
break
lg.log("button stop", self.but.buttons_pressed)
class IRThr(threading.Thread):
""" thread ev3 brick buttons """
def __init__(self):
threading.Thread.__init__(self)
self.ir = InfraredSensor()
self.ir.mode = InfraredSensor.MODE_IR_REMOTE
self.info()
def info(self):
lg.log("InfraredSensor " + self.ir.address,
"\nmode=" + str(self.ir.mode) + " modes=" + repr(self.ir.modes),
"\ncommands=" + str(self.ir.commands),
"\nBUTTONS=", self.ir._BUTTONS,
"\n__dict__=", repr(self.ir.__dict__),
"\n__slots__=", repr(type(self.ir).__slots__))
def run(self):
global running
self.buttons = None
while running:
n = self.ir.buttons_pressed(1)
if n != self.buttons:
lg.log("ir buttons ch1", n, "running=", running)
self.buttons = n
if "top_left" in self.buttons:
mA.que.put(lambda t, m, i: (lg.log(i, "half rotate gripper plus"),
m.on_for_rotations(SpeedPercent(75), 1.5, block=False) ) ) # half rotation on gripper
if "bottom_left" in self.buttons:
mA.que.put(lambda t, m, i: (lg.log(i, "half rotate gripper minus"),
m.on_for_rotations(SpeedPercent(75), -1.5, block=False) ) ) # half rotation on gripper
if "top_right" in self.buttons:
mD.que.put(lambda t, m, i: (lg.log(i, "rotate large plus"),
m.on_for_rotations(SpeedPercent(75), 5, block=False) ) ) # half rotation on large
if "bottom_right" in self.buttons:
mD.que.put(lambda t, m, i: (lg.log(i, "rotate large minus"),
m.on_for_rotations(SpeedPercent(75), -5, block=False) ) ) # half rotation on large
if "beacon" in self.buttons:
lg.log("beacon stop")
running = False
time.sleep(0.001)
class MotorThr(threading.Thread):
""" thread for this motor - handle all operations in this thread
print the motor state, whenever it changes """
def __init__(self, mo):
threading.Thread.__init__(self)
self.mot = mo
self.que = Queue()
def run(self):
global running
self.address = self.mot.address
self.count_per_rot = self.mot.count_per_rot
self.commands = self.mot.commands
nxTi = datetime.now()
lg.log(nxTi, "motor " + self.address, "begin count_per_rot=",
self.count_per_rot, "commands=", self.mot.commands)
i = 0
j = 0
oSt = ''
while running:
i = i + 1
try:
# if we can get an element from the queue (before timeout) execute it
tiOu = (nxTi - datetime.now()).total_seconds()
w = self.que.get(True, tiOu) if tiOu > 0 else self.que.get(False)
w(self, self.mot, i)
except Empty:
# Timeout, no input in given time
self.state = self.mot.state
self.position = self.mot.position
# self.speed = self.mot.speed
aSt = " state=" + repr(self.state) + " position="+str(self.position) # + " speed="+str(self.speed)
if aSt != oSt:
j = j+1
lg.log("motor " + self.address + aSt + " cnt="+str(i)+'/'+str(j))
nxTi = datetime.now() + timedelta(seconds=0.001)
oSt = aSt
lg.log("motor ", self.address, 'end')
lg.log(__file__, "begin, platform", get_current_platform())
# print("devices", repr(evdev.list_devices()))
# print("dev 2", repr([evdev.InputDevice(fn).name for fn in evdev.list_devices()]))
but = ButtonThr()
ir = IRThr()
mA = MotorThr(MediumMotor(OUTPUT_A))
mD = MotorThr(LargeMotor(OUTPUT_D))
but.start()
ir.start()
mA.start()
mD.start()
leds.set_color("LEFT", "GREEN", pct=25)
leds.set_color("RIGHT", "GREEN", pct=25)
lg.log(str(datetime.now()), __file__, "started")
but.join()
ir.join()
mA.join()
leds.set_color("LEFT", "BLACK", pct=0)
leds.set_color("RIGHT", "BLACK", pct=0)
lg.end()
lg.join()
print(str(datetime.now()), __file__, "after lg.join")