-
Benoit Favre authoredBenoit Favre authored
main.py 9.13 KiB
#!/usr/bin/env python2
import sys
import os
import glob, re
# set to location of libgstkaldionline2.so
directory = os.path.dirname(__file__) or '.'
os.environ['GST_PLUGIN_PATH'] = directory + '/asr/gst'
os.environ['GTK_THEME'] = 'light'
print 'gst plugin path =', os.environ['GST_PLUGIN_PATH']
# import gtk stuff
from threading import Thread
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, Gtk, Gdk
GObject.threads_init()
Gdk.threads_init()
Gst.init(None)
# make sure ctrl-c works
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
# import local stuff
import confirm, asr, actions, xmlview
import levenstein, slu, osc
class ScriptedASR(Gtk.Window):
def __init__(self, asr_config_file, osc_host, osc_port, slu_type):
super(ScriptedASR, self).__init__()
import config
config_dict = config.read(asr_config_file)
xml_filename = config_dict['xml_filename']
self.connect("destroy", self.quit)
self.set_default_size(1024, 768)
self.set_border_width(10)
self.set_title('ASR Transcript [xml=%s asr=%s osc=%s:%s slu=%d]' % (xml_filename, asr_config_file, osc_host, osc_port, slu_type))
vbox = Gtk.VBox()
self.xmlview = xmlview.XmlView(xml_filename)
vbox.pack_start(self.xmlview, True, True, 5)
self.confirmer = confirm.ConfirmationBox()
vbox.pack_start(self.confirmer, False, True, 5)
# transcript view
if slu_type == 1:
self.asr = asr.ASR(asr_config_file, self.hyp_changed2)
else:
self.asr = asr.ASR(asr_config_file, self.hyp_changed)
vbox.pack_start(self.asr, False, True, 5)
# slu
#prefix = 'slu/automate/homeostasis_25nov_%s'
#library = 'slu/src.new/librocio_slu.so'
#prefix = 'tools/model/automate/simple-example_%s'
#library = 'tools/slu/src/librocio_slu.so'
slu_prefix = config_dict['slu_prefix']
slu_actions = config_dict['slu_actions']
slu_library = '%s/tools/slu/src/librocio_slu.so' % directory
self.slu = {}
for section_fst in glob.glob(slu_prefix % 'section*.fst'):
found = re.search('section(\d+)\.fst$', section_fst)
if found:
section_id = int(found.group(1))
self.slu[section_id - 1] = slu.SLU(slu_prefix % 'dico_word.txt', slu_prefix % 'dico_action.txt', section_fst, slu_prefix % 'clean_tail.fst', slu_actions, library=slu_library)
if slu_type == 0:
for keyword in self.xmlview.keywords:
keyword.add_listener(self.set_slu_history)
import alternate_slu
self.slu2 = alternate_slu.SLU(xml_filename)
self.slu2_expected = 0
self.slu2_performed = set()
self.slu2_last_section = -1
self.slu2_history = []
self.add(vbox)
self.show_all()
self.confirmer.hide()
# load css style
style_provider = Gtk.CssProvider()
style_provider.load_from_data(open('data/style.css', 'rb').read())
Gtk.StyleContext.add_provider_for_screen( Gdk.Screen.get_default(), style_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
# setup singletons
osc.setup(osc_host, osc_port)
actions.setup(self.confirmer, self.xmlview)
self.current_section = 0
self.current_section_history = ['']
self.slu_output = ''
self.kept_history = ''
self.kept_actions = []
self.connect('key-press-event', self.global_keybindings)
def global_keybindings(self, widget, event):
if event.keyval == Gdk.KEY_1:
self.set_section(0)
elif event.keyval == Gdk.KEY_2:
self.set_section(1)
elif event.keyval == Gdk.KEY_3:
self.set_section(2)
elif event.keyval == Gdk.KEY_4:
self.set_section(3)
elif event.keyval == Gdk.KEY_5:
self.set_section(4)
elif event.keyval == Gdk.KEY_6:
self.set_section(5)
elif event.keyval == Gdk.KEY_7:
self.set_section(6)
elif event.keyval == Gdk.KEY_8:
self.set_section(7)
elif event.keyval == Gdk.KEY_9:
self.set_section(8)
elif event.keyval == Gdk.KEY_y:
self.confirmer.click_yes()
elif event.keyval == Gdk.KEY_n:
self.confirmer.click_no()
elif event.keyval == Gdk.KEY_space:
if not self.asr.button.has_focus():
self.asr.button_clicked(None)
else:
return True
return False
def set_section(self, section_id):
self.xmlview.set_section(section_id)
def slu_finished(self, model, slu_output):
#slu_output = str(slu_output)
#print slu_output
self.slu_actions = model.get_actions().split()
self.slu_output = slu_output
print 'SLU output: "%s", actions: "%s"' % (self.slu_output, ' '.join(self.slu_actions))
#for action_id in range(0, len(self.slu_actions)):
for action_id in range(len(self.kept_actions), len(self.slu_actions)):
print 'NEW ACTION:', action_id, self.slu_actions[action_id]
action = actions.parse_slu_action(self.slu_actions[action_id])
if action.text.startswith('#ENDSEQUENCE('):
pass
elif action.text.startswith('#ENDSECTION('):
new_section = self.xmlview.get_section() + 1
self.confirmer.confirm('Go to section %d?' % (new_section + 1), 3, lambda: self.set_section(new_section))
else:
actions.perform_action(action, False)
self.kept_history = self.slu_output
self.kept_actions = self.slu_actions
def hyp_changed(self, hyp):
#hyp = ' '.join(hyp).replace('[noise]', ' ').split()
if len(hyp) > len(self.current_section_history):
self.current_section_history.append('')
#self.kept_history = self.slu_output
#self.kept_actions = self.slu_actions
words = hyp[-1].strip().replace('_', ' ').split()
if len(words) == 0:
return
section_id = self.xmlview.get_section()
if self.current_section != section_id:
self.previous_actions = 0
self.current_section = section_id
self.current_section_history = ['']
self.slu_output = ''
self.slu_actions = []
self.kept_history = ''
self.kept_actions = []
if section_id in self.slu:
self.slu[section_id].reset_slu()
#print section_id
if section_id in self.slu:
model = self.slu[section_id]
#self.previous_actions = model.num_actions()
print 'SLU input: history="%s", words="%s"' % (self.kept_history, ' '.join(words))
model.process(words, self.kept_history, self.slu_finished, False)
self.current_section_history[-1] = ' '.join(words)
def set_slu_history(self, keyword, uri, num):
for keyword in self.xmlview.keywords[:num + 1]:
keyword.highlight()
for keyword in self.xmlview.keywords[num + 1:]:
keyword.highlight(False)
self.kept_actions = [x.action for x in self.xmlview.keywords[:num + 1]]
self.kept_history = self.slu[self.current_section].get_action_history(num)
print 'SLU set history: history="%s", actions="%s"' % (self.kept_history, ' '.join(self.kept_actions))
self.slu_output = ''
self.slu_actions = []
def hyp_changed2(self, hyp):
section_id = self.xmlview.get_section()
if self.slu2_last_section != section_id:
self.slu2_last_section = section_id
self.slu2_performed = set()
self.slu2_history = []
self.slu2_expected = self.slu2.expected_at_section_start(section_id)
for word in hyp[-1].strip().replace('_', ' ').split():
#self.slu2_history.extend(hyp[-1].strip().replace('_', ' ').split())
self.slu2_history.append(word)
found = self.slu2.process(self.slu2_history[:-1], self.slu2_history[-1], self.slu2_expected, section_id)
if found and found.num not in self.slu2_performed and found.word == self.slu2_history[-1]:
self.slu2_performed.add(found.num)
print found.section, found.action, found.history, found.word
found.keyword = self.xmlview.keywords[found.num]
self.xmlview.highlight(found)
self.slu2_expected = found.num + 1
if self.slu2.last_in_section(found):
new_section = self.xmlview.get_section() + 1
self.confirmer.confirm('Go to section %d?' % (new_section + 1), 3, lambda: self.set_section(new_section))
def quit(self, window):
for slu in self.slu.values():
slu.shutdown()
Gtk.main_quit()
if __name__ == '__main__':
import selector
asr_config_file = 'asr/custom.cfg'
asr_config_file, osc_host, osc_port, slu_type = selector.ModelSelector(asr_config_file).run()
if asr_config_file == None or osc_host == None or osc_port == None or slu_type == None:
sys.exit(0)
app = ScriptedASR(asr_config_file, osc_host, osc_port, slu_type)
Gtk.main()