#!/usr/bin/python # vim: set fileencoding=utf-8 : # Alexander Grothe 2011 - 2012 # # This script requires python-uinput V 0.6.1. or higher. Additional required packages are libudev0 and libudev-dev. # ### Fetch the code for python-uinput from git: ### # # git clone git://github.com/tuomasjjrasanen/python-uinput.git # cd python-uinput # git clone git://github.com/tuomasjjrasanen/libsuinput.git # sudo python setup.py install # ### # # This script must be run as superuser or with sufficent rights to create an uinput device and exspects a lircd socket using pid from /var/run/lirc/lircd.pid under /var/run/lirc/lircd. if none is given by --lircd-socket /PATH/TO/LIRCD_SOCKET # lircd must not be startet with --uinput, but may be started with --release="_up" to prevent ghosting events if necessary. import syslog import string import socket import gobject import sys import uinput import datetime import time from threading import Timer from optparse import OptionParser class Lirc2uinput: """Sends keystrokes to a virtual uinput device after applying a repeat-filter""" def __init__(self, uinput_name="lircd", options=None, debug=None): self.lastkey = None self.wait_repeats = options.wait_repeats self.max_gap = options.max_gap self.min_gap = options.min_gap self.acceleration = options.acceleration self.lircd_socket = options.lircd_socket self.xbmc = options.xbmc self.gap_delta = (self.max_gap - self.min_gap)*self.acceleration self.current_gap = self.max_gap self.repeat_num = 0 self.debug = debug self.timestamp = datetime.datetime.now() self.timeout_value = 1 self.events = [] self.timeout = Timer(self.timeout_value, uinput.KEY_COFFEE) # add all defined KEY_.* to supported key events for element in dir(uinput): if element.startswith("KEY_"): self.events.append(eval("uinput.%s"%element)) self.debug.log("uinput.%s"%element) # create uinput device self.specialkeys = [uinput.KEY_VOLUMEUP, uinput.KEY_VOLUMEDOWN] self.device = uinput.Device(self.events, uinput_name) #self.specialkeys = [(1, 114),(1, 115)] # KEY_VOLUMEDOWN and KEY_VOLUMEUP - a "real" repeat behaviour is used. #print "init" def get_gap(self,repeat_num): if self.current_gap > self.min_gap: self.current_gap = self.current_gap - self.gap_delta else: self.debug.log("minimum gap reached") pass return self.current_gap def getKeyname(self, key): try: if key[0].islower(): keycmd = eval('uinput.%s'%(key.upper())) k_upper = False else: keycmd = eval('uinput.%s'%(key.replace('_up',''))) k_upper = True # '_up' ist a suffix added by lircd to keynames optionally to signal key release except: keycmd = uinput.KEY_COFFEE k_upper = True self.debug.log("Key %s is not supported by your input.h, get a coffee ;)"%key) #print "%s mapped to %s"%(key,keycmd) return keycmd, k_upper def send_key_r(self,key): keycmd, k_upper = self.getKeyname(key) self.debug.log(keycmd) now = datetime.datetime.now() # repeated keypress if self.lastkey == keycmd and (now - self.timestamp).microseconds < self.current_gap: self.debug.log("Passing keypress %s... too early"%(unicode(keycmd))) pass elif self.lastkey == keycmd: self.debug.log("Repeated keypress %s" %(unicode(keycmd))) if self.repeat_num >= self.wait_repeats: self.current_gap = self.get_gap(self.repeat_num) else: pass if self.repeat_num > 0: self.keypress(keycmd, 2) self.timestamp = datetime.datetime.now() self.repeat_num += 1 else: self.keypress(keycmd, 1) self.repeat_num += 1 self.lastkey=keycmd return keycmd def send_key(self,key): keycmd, k_upper = self.getKeyname(key) self.debug.log(keycmd) if self.xbmc and keycmd not in self.specialkeys: self.keypress(keycmd, 1) self.keypress(keycmd, 0) else: if self.lastkey == keycmd and self.repeat_num > 0: self.keypress(keycmd, 2) else: self.keypress(keycmd, 1) self.repeat_num += 1 self.lastkey=keycmd return keycmd def keypress(self, key, value): self.device.emit(key, value) class main: """Listens to LIRC's domain socket and calls a method each time an IR command is received.""" def __init__(self): parser = Options() self.options = parser.get_opts() self.Dbg = Debug(self.options.debug) self.timeout = self.options.timeout if (self.timeout * 1000) < self.options.min_gap: self.timeout = self.options.min_gap + 10000 self.Dbg.log("Warning: timeout < --min-gap: setting timeout 10ms greater than --min-gap") self.repeatfilter = self.options.repeatfilter self.xbmc = self.options.xbmc self.syslog_init() self.timer = None if not self.options.lircd_socket: # use /var/run/lirc/lircd. as socket try: with open("/var/run/lirc/lircd.pid", 'r') as pidfile: pid = int(pidfile.read().strip("\n")) self.socket_path = "/var/run/lirc/lircd.%s"%(pid) except IOError: self.Dbg.log(syslog.LOG_ERR, 'Error reading PID for lircd, I will sleep 1 second and exit this script') time.sleep(1) sys.exit("socket not found") finally: self.Dbg.log('lircd_socket = %s'%(self.socket_path)) else: self.socket_path = self.options.lircd_socket self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.connect(self.socket_path) self.uinputdev = Lirc2uinput(options=self.options, debug=self.Dbg) if self.repeatfilter: self.command = "send_key_r" gobject.io_add_watch(self.sock, gobject.IO_IN, self.handler) else: self.command = "send_key" gobject.io_add_watch(self.sock, gobject.IO_IN, self.handler) def handler(self, sock, *args): try: gobject.source_remove(self.timer) #print "removed timer" except: pass buf = self.sock.recv(1024) lines = string.split(buf, "\n") for line in lines[:-1]: code,count,cmd,device = string.split(line, " ") keycmd = eval("self.uinputdev.%s(cmd)"%(self.command)) self.timer = gobject.timeout_add(self.timeout, self.release_key,keycmd) return True def release_key(self, keycmd): self.uinputdev.keypress(keycmd, 0) self.uinputdev.repeat_num = 0 self.uinputdev.lastkey = None self.uinputdev.current_gap = self.uinputdev.max_gap #print 'released key', keycmd return 0 def syslog_init(self): self.Dbg.log('Started lircd2uinput with these options:') self.Dbg.log('wait_repeats = %s'%(self.options.wait_repeats)) self.Dbg.log('max_gap = %s'%(self.options.max_gap)) self.Dbg.log('min_gap = %s'%(self.options.min_gap)) self.Dbg.log('acceleration = %s'%(self.options.acceleration)) class Options: def __init__(self): self.parser = OptionParser() self.parser.add_option("-f", "--repeat-filter", dest="repeatfilter", action="store_true", help=u'enable repeat-filter') self.parser.add_option("-s", "--lircd-socket", dest="lircd_socket", default=None, help=u"choose lircd socket to listen on", metavar="LIRCD_SOCKET") self.parser.add_option("-d", "--debug", dest="debug", action="store_true", help=u'enable debug mode') self.parser.add_option("-x", "--xbmc", dest="xbmc", action="store_true", help=u'enable xbmc single keypress mode') self.parser.add_option("-l", "--min-gap", dest="min_gap", default=150000, type="int", help=u"set minimum gap between repeated keystrokes (default 150000 µs) - needs active repeat-filter", metavar="MIN_GAP") self.parser.add_option("-u", "--max-gap", dest="max_gap", default=300000, type="int", help=u"set maximum gap between repeated keystrokes (default 300000 µs) - needs active repeat-filter", metavar="MAX_GAP") self.parser.add_option("-r", "--min-repeats", dest="wait_repeats", default=2, type="int", help=u"number of repeats before using accelerated keypresses (default = 2) - needs active repeat-filter", metavar="WAIT_REPEATS") self.parser.add_option("-t", "--timeout", dest="timeout", default=200, type="int", help=u"release key after x ms no following key is received (default = 200 ms)", metavar="TIMEOUT") self.parser.add_option("-a", "--acceleration", dest="acceleration", default=0.25, type="float", help=u"acceleration to get from MAX_GAP to MIN_GAP. default value of 0.25 equals 4 repeated keystrokes to reach maximum speed - needs active repeat-filter", metavar="ACCELERATION") def get_opts(self): (options, args) = self.parser.parse_args() return options class Debug(): def __init__(self,isactive=False): self.active = isactive def log(self, message): if self.active: syslog.syslog(unicode(message)) if __name__ == "__main__": vlirc = main() gobject.MainLoop().run()