"""Run a Linglish script against applications on D-Bus.

A script is parsed into a queue of commands (see ``Executer``); each command
is dispatched to a ``do_<cmd>`` method which talks to the session or system
bus.  Run as a program, the script comes from ``-c COMMAND`` or from a file
named on the command line.
"""

import copy
import os
import sys
import traceback
from optparse import OptionParser
from xml.dom import minidom

import dbus
import gobject
from dbus.mainloop.glib import DBusGMainLoop

from grammar import parse

# Make the GLib main loop the default for the bus connections opened below,
# so that signal callbacks (see Executer.do_wait) are delivered by it.
DBusGMainLoop(set_as_default=True)


class LinglishException(Exception):
    """Base class for script errors.

    Subclasses supply ``errorstr``, a %-format template that is filled from
    the constructor's positional arguments.  Falsy arguments (for example an
    object name that was never given, ``None``) are shown as 'unspecified'.
    """

    def __init__(self, *args):
        self.args = tuple(x or 'unspecified' for x in args)

    def __str__(self):
        return self.errorstr % self.args


class BadBus(LinglishException):
    errorstr = "No such bus %s"


class BadApplication(LinglishException):
    errorstr = "Couldn't find application %s"


class InspecificApplication(LinglishException):
    errorstr = "Application %s was not specific enough: matches were %s"


class ObjectNeeded(LinglishException):
    errorstr = "You must specify an object for the %s application: valid objects are %s"


class BadObject(LinglishException):
    errorstr = "Couldn't find object %s in application %s"


class InspecificObject(LinglishException):
    errorstr = "Object %s was not specific enough for application %s: valid objects are %s"


class BadSignal(LinglishException):
    errorstr = "Couldn't find signal %s in application %s"


class InspecificSignal(LinglishException):
    errorstr = "Signal %s was not specific enough for application %s: valid signals are %s"


class BadProperty(LinglishException):
    errorstr = "Couldn't find property %s in application %s"


class InspecificProperty(LinglishException):
    errorstr = "Property %s was not specific enough for application %s: valid properties are %s"


class InterfaceNeeded(LinglishException):
    errorstr = "You must specify an interface for %s's %s object: valid interfaces are %s"


class BadInterface(LinglishException):
    errorstr = "Couldn't find interface %s on object %s in application %s"


class InspecificInterface(LinglishException):
    errorstr = "Interface %s was not specific enough for %s's %s object: valid interfaces are %s"


class BadMethod(LinglishException):
    errorstr = "%s's %s object has no action %s"


class InvalidVariable(LinglishException):
    errorstr = "No such variable '%s'"


class BadReturnCount(LinglishException):
    errorstr = "Expected %s return value(s) but got %s"


class BadReturnTarget(LinglishException):
    errorstr = "Can't store a return value into a %s: only variables can receive values"


def _name_matches(candidate, wanted):
    """Return True if `wanted` occurs in `candidate`, exactly or ignoring case."""
    return wanted in candidate or wanted.lower() in candidate.lower()


def _connect_bus(bus):
    """Return the bus connection named by `bus`.

    None and "Session" select the session bus, "System" the system bus;
    anything else raises BadBus.
    """
    if bus is None or bus == "Session":
        return dbus.SessionBus()
    if bus == "System":
        return dbus.SystemBus()
    raise BadBus(bus)


class Executer(object):
    """Executes a parsed script, one command at a time, inside a main loop.

    Each command is a dict whose "cmd" entry names the handler
    (``do_<cmd>``) and whose remaining entries are passed to that handler as
    keyword arguments.  ``script_vars`` maps script variable names to values;
    "true" and "false" are predefined and handlers store their latest result
    under "it" and "the result".
    """

    def __init__(self, mainloop):
        self.mainloop = mainloop
        self.script_vars = {"true": True, "false": False}
        # Commands from the first "wait" onwards; they are re-queued while
        # that wait still has repetitions left (see process_command_queue).
        self.commands_waited_for = []

    def quit(self):
        """Stop the main loop."""
        self.mainloop.quit()

    def execute(self, script):
        """Parse and run `script`; meant to be called from the main loop.

        A failure while parsing or starting the script is reported and stops
        the main loop, the same way a failing command does.
        """
        try:
            self.actually_execute(script)
        except Exception:
            traceback.print_exc()
            self.quit()

    def actually_execute(self, script):
        """Parse `script` into the command queue and start processing it."""
        self.commands = parse("script", script)
        if not self.commands:
            # Nothing to run: stop the loop rather than idling forever.
            self.quit()
            return
        self.process_command_queue()

    def process_command_queue(self):
        """Run queued commands until one suspends the queue or all are done.

        A handler returning a true value means "carry on"; a false value
        means the handler will restart the queue itself later (do_wait does
        this through waiter), so the loop is left without stopping the main
        loop.  When the queue runs dry and a wait is pending, the wait's
        "wait_count" is decremented and, unless it reached zero, every
        waited-for command is queued again.
        """
        broke_loop = False
        while True:
            try:
                next_command = self.commands.pop(0)
            except IndexError:
                if not self.commands_waited_for:
                    break
                # There are waited-for commands; the first of them must be
                # the wait itself, otherwise something has gone very wrong.
                first_waited = self.commands_waited_for[0]
                if first_waited["cmd"] != "wait":
                    print("First command in the wait queue is not a wait")
                    sys.exit(0)
                first_waited["wait_count"] = first_waited["wait_count"] - 1
                if first_waited["wait_count"] == 0:
                    # we've executed this command enough
                    break
                # Put the wait queue back on the real queue and carry on
                # with its first command.
                while self.commands_waited_for:
                    self.commands.append(self.commands_waited_for.pop(0))
                next_command = self.commands.pop(0)

            fn = getattr(self, "do_%s" % next_command["cmd"], None)
            if not fn:
                print("Command %s not recognised" % next_command["cmd"])
                # Nothing will restart the queue, so stop the main loop too.
                self.quit()
                return

            # A wait, and every command after a wait, is remembered so it
            # can be re-added to the main queue later.
            if next_command["cmd"] == "wait" or self.commands_waited_for:
                self.commands_waited_for.append(next_command)

            # Pass everything but "cmd" to the handler, as a copy so the
            # handler cannot alter the queued command.
            command_params = copy.deepcopy(next_command)
            del command_params["cmd"]
            try:
                cont = fn(**command_params)
            except Exception:
                traceback.print_exc()
                self.quit()
                return
            if not cont:
                # The handler will restart the queue itself.
                broke_loop = True
                break

        if not broke_loop:
            self.quit()

    def get_matching_application(self, act_bus, requested):
        """Return the one bus name on `act_bus` that contains `requested`.

        Running and activatable names are both considered.  If several names
        match but exactly one of them matches with the given case, that one
        wins.  Raises BadApplication when nothing matches and
        InspecificApplication when the choice is ambiguous.
        """
        dbus_proxy = act_bus.get_object('org.freedesktop.DBus', '/')
        busnames = set(str(x) for x in dbus_proxy.ListNames())
        busnames.update(str(x) for x in dbus_proxy.ListActivatableNames())

        # (busname, matched_with_exact_case) pairs; sorted so that error
        # messages list the candidates in a stable order.
        possible_apps = []
        for busname in sorted(busnames):
            if requested in busname:
                possible_apps.append((busname, True))
            elif requested.lower() in busname.lower():
                possible_apps.append((busname, False))

        if not possible_apps:
            raise BadApplication(requested)
        if len(possible_apps) > 1:
            exact_case_apps = [x for x in possible_apps if x[1]]
            if len(exact_case_apps) != 1:
                raise InspecificApplication(
                    requested, ", ".join([x[0] for x in possible_apps]))
            possible_apps = exact_case_apps
        return possible_apps[0][0]

    def do_run(self, runnee):
        """Run the command line `runnee` through the shell and wait for it."""
        os.system(runnee)
        # return True so the main handler runs the next command
        return True

    def do_wait(self, application, signal, wait_count, bus, retvals, required_value):
        """Suspend the queue until `application` emits a matching signal.

        `signal` is matched as a substring against every signal name found
        by introspecting the application; exactly one must match, otherwise
        BadSignal or InspecificSignal is raised.  `wait_count` is not used
        here; process_command_queue reads it from the queued command.
        `retvals` and `required_value` are kept for waiter.  Returns False:
        the queue is restarted by waiter when the signal arrives.
        """
        act_bus = _connect_bus(bus)
        act_wait_app = self.get_matching_application(act_bus, application)
        all_objects = self.get_objects(act_bus, act_wait_app)

        possible_signals = []
        for some_object in all_objects.values():
            for interface_info in some_object["interfaces"].values():
                for some_signal in interface_info["signals"]:
                    if _name_matches(some_signal, signal):
                        possible_signals.append(some_signal)
        if not possible_signals:
            raise BadSignal(signal, application)
        if len(possible_signals) > 1:
            raise InspecificSignal(
                signal, application, ", ".join(possible_signals))

        act_signal = possible_signals[0]
        print("Waiting for %s" % act_signal)
        self.waiter_retvals = retvals
        self.waiter_required_value = required_value
        self.current_signal_receiver = act_bus.add_signal_receiver(
            handler_function=self.waiter,
            signal_name=act_signal,
            bus_name=act_wait_app)
        # return False because we're calling the next command (from self.waiter)
        return False

    def waiter(self, *args):
        """Signal callback for do_wait: record the signal and resume the queue.

        The signal's arguments become "it"/"the result" and are copied into
        the wait's output variables, if any.  If the wait named a required
        value and the signal did not carry it, the wait's "wait_count" is
        raised by one so that the wait is run again.  An error while handling
        the signal is reported and stops the main loop.
        """
        # stop listening for this signal
        self.current_signal_receiver.remove()
        if args:
            self.script_vars["it"] = args
            self.script_vars["the result"] = args

        try:
            if self.waiter_retvals:
                self._store_return_values(args, self.waiter_retvals)
            matches_required_value = True
            if self.waiter_required_value:
                required_value = self._resolve_value(self.waiter_required_value)
        except Exception:
            traceback.print_exc()
            self.quit()
            return

        if self.waiter_required_value:
            # The value may be given as the whole argument tuple or, for a
            # one-argument signal, as that single argument.
            matches_required_value = (
                args == required_value
                or (len(args) == 1 and args[0] == required_value))
            if not matches_required_value:
                failstr = "Got the signal but its value was %r and you are waiting for %r"
                if len(args) == 1:
                    print(failstr % (args[0], required_value))
                else:
                    print(failstr % (args, required_value))

        if not matches_required_value:
            # We waited, but didn't get the result we wanted, so *add* one
            # to the wait count of the command most recently put on the
            # wait queue, so it runs again.
            self.commands_waited_for[-1]["wait_count"] += 1
        self.process_command_queue()

    def do_ask(self, application, obj, bus, prop):
        """Fetch a property of `application` by calling its ``Get<Prop>`` method.

        `prop` is matched as a substring against every property name found
        by introspecting the application; exactly one must match, otherwise
        BadProperty or InspecificProperty is raised.  `obj` is accepted but
        not used to narrow the search.  The value is stored in the script
        variables "it", "the result" and `prop`.
        """
        act_bus = _connect_bus(bus)
        act_ask_app = self.get_matching_application(act_bus, application)
        all_objects = self.get_objects(act_bus, act_ask_app)

        # look for a property that matches
        possible_props = []
        for some_object in all_objects.values():
            for some_interface in some_object["interfaces"]:
                for some_prop in some_object["interfaces"][some_interface]["properties"]:
                    if _name_matches(some_prop, prop):
                        possible_props.append((some_object, some_interface, some_prop))
        if not possible_props:
            raise BadProperty(prop, application)
        if len(possible_props) > 1:
            raise InspecificProperty(
                prop, application, ", ".join([x[2] for x in possible_props]))

        act_obj, act_interface, act_prop = possible_props[0]
        method = "Get%s" % act_prop

        # finally, call it
        proxy_obj = act_bus.get_object(act_ask_app, act_obj["path"])
        dbus_method = getattr(proxy_obj, method)
        retval = dbus_method(dbus_interface=act_interface)
        self.script_vars["it"] = retval
        self.script_vars["the result"] = retval
        self.script_vars[prop] = retval
        # return True so the main handler runs the next command
        return True

    def do_tell(self, application, obj, interface, bus, action, params, retvals):
        """Call method `action` on an object of `application`.

        `obj` and `interface` are substring matches and may be None when the
        application has exactly one object / the object has exactly one
        interface outside ``org.freedesktop.DBus``; `action` must equal the
        method name ignoring case.  `params` are resolved against the script
        variables.  The result is stored in "it" and "the result" and, when
        `retvals` is given, spread over those output variables.

        Raises the matching Bad*/Inspecific*/*Needed exception when a name
        cannot be resolved, InvalidVariable for an unknown variable, and
        BadReturnCount / BadReturnTarget when `retvals` does not fit the
        result.
        """
        act_bus = _connect_bus(bus)
        act_application = self.get_matching_application(act_bus, application)

        # get object paths from this application and see which match
        all_objects = self.get_objects(act_bus, act_application)
        if obj is None:
            # didn't specify an object; assume there is only one
            if len(all_objects) != 1:
                raise ObjectNeeded(application, ", ".join(sorted(all_objects)))
            act_obj = list(all_objects.values())[0]
        else:
            possible_objects = [path for path in all_objects
                                if _name_matches(path, obj)]
            if not possible_objects:
                raise BadObject(obj, application)
            if len(possible_objects) > 1:
                raise InspecificObject(
                    obj, application, ", ".join(possible_objects))
            act_obj = all_objects[possible_objects[0]]

        # now the interface
        all_interfaces = [x for x in act_obj["interfaces"]
                          if not x.startswith("org.freedesktop.DBus")]
        if interface is None:
            if len(all_interfaces) != 1:
                raise InterfaceNeeded(act_application, obj, ", ".join(all_interfaces))
            act_interface = all_interfaces[0]
        else:
            possible_interfaces = [ifname for ifname in all_interfaces
                                   if _name_matches(ifname, interface)]
            if not possible_interfaces:
                raise BadInterface(interface, obj, application)
            if len(possible_interfaces) > 1:
                raise InspecificInterface(
                    interface, application, obj, ", ".join(possible_interfaces))
            act_interface = possible_interfaces[0]

        # and now the method, the name of which must be right (but
        # case-insensitive); if several qualify the last one listed is used
        method = None
        for m in act_obj["interfaces"][act_interface]["methods"]:
            if m.lower() == action.lower():
                method = m
        if not method:
            raise BadMethod(application, obj, action)

        # now, substitute vars into the params list; params of any other
        # type are left out of the call
        act_params = []
        for param in params or []:
            if param["type"] in ("string", "integer", "var"):
                act_params.append(self._resolve_value(param))

        # finally, call it
        proxy_obj = act_bus.get_object(act_application, act_obj["path"])
        dbus_method = getattr(proxy_obj, method)
        retval = dbus_method(dbus_interface=act_interface, *act_params)
        self.script_vars["it"] = retval
        self.script_vars["the result"] = retval

        # retvals are special magic: they come through as a list of vars,
        # but those vars are *not* substituted; instead they are OUTPUT vars
        # into which the values are placed
        if retvals:
            if retval is None:
                values = []
            elif isinstance(retval, dbus.String) or not hasattr(retval, "__len__"):
                # a string is one value, not a list of characters; so is
                # anything that has no length at all (numbers, booleans)
                values = [retval]
            else:
                values = retval
            self._store_return_values(values, retvals)
        # return True because the main handler calls the next command for us
        return True

    def _resolve_value(self, item):
        """Return the Python value of a parsed "string", "integer" or "var" item.

        A "var" is looked up in the script variables and raises
        InvalidVariable when unknown.  Any other type yields None.
        """
        kind = item["type"]
        if kind == "string":
            return item["value"]
        if kind == "integer":
            return int(item["value"])
        if kind == "var":
            varname = item["name"]
            if varname not in self.script_vars:
                raise InvalidVariable(varname)
            return self.script_vars[varname]
        return None

    def _store_return_values(self, values, targets):
        """Assign each of `values` to the script variable in `targets` at the same position."""
        if len(values) != len(targets):
            raise BadReturnCount(str(len(targets)), str(len(values)))
        for target, value in zip(targets, values):
            if target["type"] != "var":
                raise BadReturnTarget(target["type"])
            self.script_vars[target["name"]] = value

    def get_objects(self, bus, app, cum_path=""):
        """Introspect `app` recursively, starting at `cum_path` ("" is the root).

        Returns a dict mapping each object path that exposes at least one
        interface to ``{"path": path, "interfaces": {name: {"methods": [...],
        "signals": [...], "properties": [...]}}}``.
        """
        path = cum_path or "/"
        proxy = bus.get_object(app, path)
        xml = proxy.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
        dom = minidom.parseString(xml)

        found_objects = {}
        interfaces = {}
        for element in dom.documentElement.childNodes:
            if element.nodeType != minidom.Node.ELEMENT_NODE:
                continue
            if element.nodeName == "node":
                # a child object: collect whatever lives below it
                subpath = cum_path + "/" + element.getAttribute("name")
                found_objects.update(self.get_objects(bus, app, subpath))
            elif element.nodeName == "interface":
                members = {"methods": [], "signals": [], "properties": []}
                for subel in element.childNodes:
                    if subel.nodeName == "method":
                        members["methods"].append(subel.getAttribute("name"))
                    elif subel.nodeName == "signal":
                        members["signals"].append(subel.getAttribute("name"))
                    elif subel.nodeName == "property":
                        members["properties"].append(subel.getAttribute("name"))
                interfaces[element.getAttribute("name")] = members

        if interfaces:
            found_objects[path] = {"path": path, "interfaces": interfaces}
        return found_objects


def main():
    """Command-line entry point: run the script given by -c or by file name."""
    parser = OptionParser()
    parser.add_option("-c", "--command", dest="command",
                      help="Specify the command to execute")
    (options, args) = parser.parse_args()

    data = None
    if options.command:
        data = options.command
    elif args:
        fn = args[0]
        if os.path.exists(fn):
            fp = open(fn)
            try:
                data = fp.read()
            finally:
                fp.close()
        else:
            print("File %s does not exist" % fn)
    else:
        print("Syntax: linglish [-c COMMAND | SCRIPT_FILE]")

    if data:
        mainloop = gobject.MainLoop()
        executer = Executer(mainloop)
        # Start the script from inside the loop so its handlers can stop it.
        gobject.idle_add(executer.execute, data)
        mainloop.run()
        final_result = executer.script_vars.get("the result", "")
        if final_result is not None:
            print(final_result)


if __name__ == "__main__":
    main()