Commit f9e371c6 authored by Benoit Favre

t push -u origin master

SENSEI processing environment
=============================

This pipeline is a set of annotation services which can be run against the
conversation repository or as standalone REST services.

Install requirements
--------------------

The REST services require bottle.py to be installed.

    pip install --user -r requirements.txt -U

REST services
-------------

Core modules can be integrated as REST services which provide annotations
through a simple HTTP protocol. The backend uses the bottle.py framework and a
server of your choice (the default WSGIRefServer is quite slow; you may install
"paste" instead).

For instance, you can run the rest/test.py script, which reverses character strings.

    rest/test.py

It replies to three URLs. The first, /, returns a short textual help for using
the service.

    curl http://localhost:1234

It also recognizes GET requests of the form /test/<text>, where <text> is the
string you want to reverse.

    curl http://localhost:1234/test/hello

Finally, for large inputs, it recognizes POST queries where the input string is
specified in a "text" form parameter.

    curl --form "text=hello" http://localhost:1234/test
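
The same requests can be issued from Python. The following is a minimal client sketch, assuming Python 3 and a rest/test.py service listening on localhost:1234 as in the curl examples. The helper names (`build_get_request`, `build_post_request`, `call`) are hypothetical and not part of the repository. The POST body is URL-encoded rather than multipart as with `curl --form`; bottle's `request.forms` is expected to accept both.

```python
import json
from urllib.parse import quote, urlencode
from urllib.request import Request, urlopen

BASE_URL = 'http://localhost:1234'  # assumed host and port, as in the curl examples


def build_get_request(text, base_url=BASE_URL):
    """GET /test/<text>: the input is percent-encoded into the URL path."""
    return Request('%s/test/%s' % (base_url, quote(text, safe='')))


def build_post_request(text, base_url=BASE_URL):
    """POST /test: the input travels in the "text" form field."""
    body = urlencode({'text': text}).encode('utf8')
    return Request('%s/test' % base_url, data=body)


def call(request):
    """Send the request and decode the JSON reply (needs a running service)."""
    with urlopen(request) as handler:
        return json.loads(handler.read().decode('utf8'))
```

With the service running, `call(build_get_request('hello'))` and `call(build_post_request('hello'))` return the decoded JSON reply of the service.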

A generic REST service is also available. It runs a custom command, feeds it
the input through stdin, and collects the output from stdout. The following
example echoes its input as output. The --name parameter sets the name of the
service in the URL.

    rest/generic.py --port 1234 --name "cat" --command "cat"
    curl http://localhost:1234/cat/hello
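
The per-request behaviour described above (start the command, write the input to its stdin, return whatever it prints) can be sketched as follows. This assumes Python 3 and a POSIX shell; `run_once` is a hypothetical helper, not the function used in rest/generic.py.

```python
import subprocess


def run_once(command, text, directory='.'):
    """Run a shell command, feed it text on stdin, and return its output."""
    completed = subprocess.run(
        command,
        shell=True,                 # the command is a shell string, e.g. "cat"
        cwd=directory,
        input=text.encode('utf8'),  # sent to the command's stdin
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,   # merge stderr into the returned output
    )
    return completed.stdout.decode('utf8')
```

Because the command string goes through the shell, it has to come from a trusted source; the request text itself only reaches the command through stdin.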

The command can also be made persistent (run in the background), in which case
it is fed line by line through stdin and its output is read line by line from
stdout. Note that for this to work, the command needs to flush stdout after
each input.

    rest/generic.py --port 1234 --command "awk '{x+=1;print x,\$0;fflush()}'" --persistent True
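
A command suitable for persistent mode has to answer each input line with exactly one output line and flush right away. Below is a minimal sketch of such a filter in Python, similar in spirit to the awk one-liner; the function name `number_lines` is hypothetical.

```python
import sys


def number_lines(stdin=sys.stdin, stdout=sys.stdout):
    """Prefix each input line with a running counter, one reply per line."""
    count = 0
    # readline() returns as soon as one line is available; iterating over the
    # file object directly may read ahead in Python 2.
    for line in iter(stdin.readline, ''):
        count += 1
        stdout.write('%d %s\n' % (count, line.rstrip('\n')))
        stdout.flush()  # without this the caller blocks waiting for the reply
```

Saved as a script that ends with a call to `number_lines()`, it could be passed to `--command` together with `--persistent True`.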

WARNING: the generic REST service does not enforce any kind of security.

You can check the rest/test.py implementation to make your own REST services
using the provided framework.

Repository integration
----------------------

Repository-integrated services poll the repository for new documents, process
them and push back annotation sets. To get access to the repository, you can
create a tunnel (provided a proper SSH key is set up):

    util/repository.py --tunnel

The util/repository.py script has many more commands available. It can also be
used as a Python module providing a fully functional repository client.

Once the tunnel is set up, you can try the test annotator, which polls the
repository and adds phony annotations. That script lets you choose the host and
port of the repository.

    repo/test.py

This script gets all documents which don't have the AMU_hasTest feature and puts
generated annotations in the AMU_Test annotation set. Then, it sets the
AMU_hasTest feature to True.
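
The repo/ scripts select documents through a small query string: `_MISSING_` or `_PRESENT_` names the feature to test, and `_MAX_` presumably caps the number of documents returned per poll. A sketch of building such queries follows; the helper names are hypothetical, and the percent-encoding is an added precaution that the original scripts do not apply.

```python
from urllib.parse import quote


def missing_query(feature, max_documents=20):
    """Query for documents which do not have the given feature yet."""
    return '_MISSING_=%s&_MAX_=%d' % (quote(feature, safe=''), max_documents)


def present_query(feature, max_documents=20):
    """Query for documents which already carry the given feature."""
    return '_PRESENT_=%s&_MAX_=%d' % (quote(feature, safe=''), max_documents)
```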

After a few documents are processed, you can kill the script and run the
cleaner, which deletes all annotation sets created by the previous script.

    repo/clean.py --presence_feature AMU_hasTest --annotation_set AMU_Test

The first way to integrate a new processing module into the repository is to
use the generic annotator. It grabs all documents which don't have a given
feature, passes them as JSON through stdin to a custom command, reads the
generated annotations as JSON from the command's stdout, creates a new
annotation set with the result, and marks the document as processed.

Note that for the repository to accept the new annotation, it must conform to
its expectations and contain the right fields. Otherwise, an error is returned.

For example, you can write a script which computes the length of the JSON
representation of a document, and creates a new "checksum" annotation with it.

    cat script.sh
    awk '{print "[{\"type\": \"checksum\", \"features\": {\"value\":"length()"}, \"start\": 0, \"end\": 0}]"}'

    repo/generic.py --command ./script.sh --mark_feature "AMU_hasChecksum" --annotation_set "AMU_Checksum"
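
The same checksum annotator can be written in Python. This sketch assumes, as in the awk version, that each document arrives as one JSON line on stdin and that the repository accepts annotations with `type`, `features`, `start` and `end` fields. The names `checksum_annotations` and `main` are hypothetical.

```python
import json


def checksum_annotations(document_json):
    """Return the annotation list for one JSON-encoded document."""
    return [{
        'type': 'checksum',
        'features': {'value': len(document_json)},
        'start': 0,
        'end': 0,
    }]


def main(stdin, stdout):
    """Answer each input line with one line of JSON annotations."""
    for line in iter(stdin.readline, ''):
        line = line.rstrip('\n')
        stdout.write(json.dumps(checksum_annotations(line)) + '\n')
        stdout.flush()  # required when the command is kept running
```

A script wrapping this would end with `main(sys.stdin, sys.stdout)` and be passed to repo/generic.py through `--command`.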

The command can be run once for each document, or just once in the background.
In the latter case, the command is fed line by line and its output is read line
by line. For this mode to work, the command MUST flush stdout after processing
each input. See the generic REST service example for more details.

The second way to integrate a new processing module into the repository is to
subclass the repository.AnnotationGenerator class in Python. See the
repo/test.py script for an example.

    class Annotator(repository.AnnotationGenerator):
        def __init__(self):
            query = '_MISSING_=MarkingFeature&_MAX_=20'
            super(Annotator, self).__init__(query)
            ... # initialize object

        def process_document(self, client, document):
            doc_id = int(document['id'])
            print(doc_id)
            ... # generate annotation
            client.put_annotation_set(doc_id, 'AnnotationName', ...)
            client.put_features(doc_id, {'MarkingFeature': True})
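
The annotation logic of such a subclass can be exercised without a live repository by handing it a stand-in client. The sketch below makes a few assumptions: `RecordingClient` and `annotate` are hypothetical names, and the two client methods only mirror the calls made in repo/test.py.

```python
class RecordingClient(object):
    """Hypothetical stand-in for repository.Client that only records calls."""

    def __init__(self):
        self.calls = []

    def put_annotation_set(self, doc_id, name, content):
        self.calls.append(('put_annotation_set', doc_id, name, content))

    def put_features(self, doc_id, features):
        self.calls.append(('put_features', doc_id, features))


def annotate(client, document):
    """Same steps as process_document in repo/test.py, as a plain function."""
    content = document['content']
    doc_id = int(document['id'])
    annotation = {'type': 'test', 'features': {'reversed': content[::-1]},
                  'start': 0, 'end': len(content)}
    client.put_annotation_set(doc_id, 'AMU_Test', [annotation])
    client.put_features(doc_id, {'AMU_hasTest': True})
```

After `annotate(RecordingClient(), document)`, the `calls` list of the client shows what would have been sent to the repository.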

class Test:
    def __init__(self):
        pass

    def process(self, text):
        return ''.join(reversed(text))

#!/usr/bin/env python2
from __future__ import print_function
import os, sys

dirname = os.path.dirname(__file__) or '.'
sys.path.append(dirname + '/..')
from util import repository

class AnnotationCleaner(repository.AnnotationGenerator):
    def __init__(self, presence_feature, annotation_set):
        self.feature = presence_feature
        self.annotation = annotation_set
        query = '_PRESENT_=%s&_MAX_=20' % presence_feature
        super(AnnotationCleaner, self).__init__(query)

    def process_document(self, client, document):
        doc_id = int(document['id'])
        client.delete_annotation_set(doc_id, self.annotation)
        client.delete_feature(doc_id, self.feature)
        print(doc_id)

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Clean repository from annotation set', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--host', type=str, help='repository host', default='localhost')
    parser.add_argument('--port', type=int, help='repository port', default=8080)
    parser.add_argument('--presence_feature', type=str, help='feature to select documents (e.g. "AMU_hasTest")', required=True)
    parser.add_argument('--annotation_set', type=str, help='annotation set to remove (e.g. "AMU_Test")', required=True)
    args = parser.parse_args()

    client = repository.Client(host=args.host, port=args.port)
    cleaner = AnnotationCleaner(args.presence_feature, args.annotation_set)
    cleaner.run(client)

#!/usr/bin/env python2
from __future__ import print_function
import os, sys
import subprocess
import json

dirname = os.path.dirname(__file__) or '.'
sys.path.append(dirname + '/..')
from util import repository

# Handle of the background command, reused across calls in persistent mode
pipe = None

def run_command(directory, command, text, persistent):
    global pipe
    if not persistent or pipe is None:
        pipe = subprocess.Popen(command, shell=True, cwd=directory, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0)
    if persistent:
        # one line in, one line out: the command must flush after each input
        if not text.endswith('\n'):
            text += '\n'
        pipe.stdin.write(text)
        pipe.stdin.flush()
        result = pipe.stdout.readline()
    else:
        result = pipe.communicate(input=text)[0]
    return result

class GenericAnnotator(repository.AnnotationGenerator):
    def __init__(self, mark_feature, annotation_set, document_query, command, directory, persistent):
        if document_query is None:
            document_query = mark_feature
        query = '_MISSING_=%s&_MAX_=20' % document_query
        super(GenericAnnotator, self).__init__(query)
        self.mark_feature = mark_feature
        self.annotation_set = annotation_set
        self.command = command
        self.directory = directory
        self.persistent = persistent

    def process_document(self, client, document):
        content = document['content']
        doc_id = int(document['id'])
        print(doc_id)
        annotation = run_command(self.directory, self.command, json.dumps(document), self.persistent)
        client.put_annotation_set(doc_id, self.annotation_set, json.loads(annotation))
        client.put_features(doc_id, {self.mark_feature: True})

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Generic repository annotator', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--host', type=str, help='repository host', default='localhost')
    parser.add_argument('--port', type=int, help='repository port', default=8080)
    parser.add_argument('--command', type=str, help='command to run', default='pwd')
    parser.add_argument('--directory', type=str, help='directory where to run command', default='.')
    # NOTE: parsed as a string, so any non-empty value (even "False") enables persistent mode
    parser.add_argument('--persistent', type=str, help='command is run once, and fed line by line', default=False)
    parser.add_argument('--mark_feature', type=str, help='feature to mark documents (e.g. "AMU_hasTest")', required=True)
    parser.add_argument('--annotation_set', type=str, help='annotation set name (e.g. "AMU_Test")', required=True)
    parser.add_argument('--query', type=str, help='document query (feature1=value1&feature2=value2...), defaults to the mark_feature')
    args = parser.parse_args()

    client = repository.Client(host=args.host, port=args.port)
    annotator = GenericAnnotator(args.mark_feature, args.annotation_set, args.query, args.command, args.directory, args.persistent)
    annotator.run(client)

#!/usr/bin/env python2
from __future__ import print_function
import os, sys

dirname = os.path.dirname(__file__) or '.'
sys.path.append(dirname + '/..')
from util import repository
from core import test

class TestAnnotator(repository.AnnotationGenerator):
    def __init__(self):
        query = '_MISSING_=AMU_hasTest&_MAX_=20'
        super(TestAnnotator, self).__init__(query)
        self.tester = test.Test()

    def process_document(self, client, document):
        content = document['content']
        doc_id = int(document['id'])
        print(doc_id)
        annotation = self.tester.process(content)
        client.put_annotation_set(doc_id, 'AMU_Test', [{'type': 'test', 'features': {'reversed': annotation}, 'start': 0, 'end': len(content)}])
        client.put_features(doc_id, {'AMU_hasTest': True})

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Repository test annotator', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--host', type=str, help='repository host', default='localhost')
    parser.add_argument('--port', type=int, help='repository port', default=8080)
    args = parser.parse_args()

    client = repository.Client(host=args.host, port=args.port)
    tester = TestAnnotator()
    tester.run(client)

#!/usr/bin/env python2
import bottle
import json
import sys
import os
import urllib
import subprocess

# Generic REST service
# Supports /name/<input> GET queries, as well as /name POST queries with input passed in the "text" form field.
# The command is either run for every query, or run in the background with --persistent True.
# The input is sent as stdin, and the output is read from stdout.
# In persistent mode, each input and each output must fit on a single line.
#
# Example use:
# python2 rest/generic.py --port 1234 --command "ls" --directory ..
# python2 rest/generic.py --port 1234 --command "awk '{x+=1;print x,\$0;fflush()}'" --persistent True

# Handle of the background command, reused across requests in persistent mode
pipe = None

def run_command(directory, command, text, persistent):
    global pipe
    if not persistent or pipe is None:
        pipe = subprocess.Popen(command, shell=True, cwd=directory, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0)
    if persistent:
        if not text.endswith('\n'):
            text += '\n'
        pipe.stdin.write(text)
        pipe.stdin.flush()
        result = pipe.stdout.readline()
    else:
        result = pipe.communicate(input=text)[0]
    return result

def setup_routes(name, command, directory, persistent):
    @bottle.get('/')
    def index():
        return '<h2>SENSEI REST service: %s (command="%s", directory="%s", persistent="%s")</h2>' % (name, command, directory, persistent) \
            + '<ul>' \
            + '<li>GET /%s/&lt;input&gt; (process input passed in url)</li>' % name \
            + '<li>POST /%s (process input passed in POST "text" field)</li>' % name \
            + '</ul>'

    @bottle.get('/%s/<text>' % name)
    def process_get(text):
        bottle.response.content_type = 'application/json'
        return run_command(directory, command, text, persistent)

    @bottle.post('/%s' % name)
    def process_post():
        text = bottle.request.forms.get('text')
        bottle.response.content_type = 'application/json'
        return run_command(directory, command, text, persistent)

def run(port):
    bottle.run(host='0.0.0.0', port=port)

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Generic REST service', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--port', type=int, help='server port', default=1234)
    parser.add_argument('--name', type=str, help='service name', default='generic')
    parser.add_argument('--command', type=str, help='command to run', default='pwd')
    parser.add_argument('--directory', type=str, help='directory where to run command', default='.')
    # NOTE: parsed as a string, so any non-empty value (even "False") enables persistent mode
    parser.add_argument('--persistent', type=str, help='command is run once, and fed line by line', default=False)
    args = parser.parse_args()

    setup_routes(args.name, args.command, args.directory, args.persistent)
    run(args.port)

#!/usr/bin/env python2
import bottle
import json
import sys
import os
import urllib

# Simple example of rest service using the "test" core module

dirname = os.path.dirname(__file__) or '.'
sys.path.append('%s/..' % dirname)
from core import test

@bottle.get('/')
def index():
    return '<h2>SENSEI REST service: test</h2>' \
        + '<ul>' \
        + '<li>GET /test/&lt;text&gt; (reverse text passed in url, utf8-encoded)</li>' \
        + '<li>POST /test (reverse text passed in POST "text" field, utf8-encoded)</li>' \
        + '</ul>' \
        + '<p>Example: <a href="/test/hello">/test/hello</a></p>\n'

@bottle.get('/test/<text>')
def process_get(text):
    global tester
    bottle.response.content_type = 'application/json'
    return json.dumps(tester.process(text))

@bottle.post('/test')
def process_post():
    global tester
    text = bottle.request.forms.get('text')
    bottle.response.content_type = 'application/json'
    return json.dumps(tester.process(text))

def run(port):
    global tester
    tester = test.Test()
    bottle.run(host='0.0.0.0', port=port)

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='REST tester', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--port', type=int, help='server port', default=1234)
    args = parser.parse_args()
    run(args.port)

#!/usr/bin/env python2
from __future__ import print_function
import atexit
import json
import os
import shlex
import subprocess
import sys
import time

from six.moves.urllib.error import URLError, HTTPError
from six.moves.urllib.parse import quote
from six.moves.urllib.request import Request, urlopen

# TODO(documentation, distribute)

# Derives from Exception (not BaseException) so that generic handlers can catch it
class RepositoryException(Exception):
    def __init__(self, code, message):
        super(RepositoryException, self).__init__('code=%d, message="%s"' % (code, message))
        self.code = code
        self.message = message

class Client(object):
    def __init__(self, host='127.0.0.1', port=8080):
        self.host = host
        self.port = port
        self.version = '20160107-1526' # up to date with that repository version

    def load(self, method, url, data=None, keep_data=False, content_type='text/plain'):
        url = "http://%s:%d%s" % (self.host, self.port, url)
        if data is None:
            request = Request(url)
        else:
            if method == 'GET':
                # GET: encode data as a query string (dict, list of pairs, or raw string)
                url += '?'
                if isinstance(data, dict):
                    url += '&'.join(['%s=%s' % (quote(str(name)), quote(str(value))) for name, value in data.items()])
                elif isinstance(data, list):
                    url += '&'.join(['%s=%s' % (quote(str(name)), quote(str(value))) for name, value in data])
                elif isinstance(data, str):
                    url += data
                request = Request(url)
            elif keep_data:
                # send the payload verbatim with the given content type
                request = Request(url, data=data.encode('utf8'), headers={'Content-Type': content_type})
            else:
                # default: serialize the payload as JSON
                request = Request(url, data=json.dumps(data).encode('utf8'), headers={'Content-Type': 'application/json'})
        request.get_method = lambda: method
        handler = urlopen(request)
        result = json.loads(handler.read().decode('utf8'))
        if result['success'] is False:
            raise RepositoryException(result['code'], result['result'])
        return result['result']

    def get_documents(self):
        return self.load('GET', '/repository/documents')

    def get_document(self, docid):
        return self.load('GET', '/repository/document/%s' % docid)

    def get_content(self, docid):
        return self.load('GET', '/repository/document/%s/content' % docid)

    def get_features(self, docid, feature_names=None):
        if feature_names is None:
            return self.load('GET', '/repository/document/%s/features' % docid)
        else:
            return self.load('POST', '/repository/document/%s/features' % docid, feature_names)

    def query_features(self, key_value_pairs):
        return self.load('GET', '/repository/documents/features', key_value_pairs)

    def has_feature(self, feature_name, num=10):
        return self.load('GET', '/repository/documents/present/feature/%s/%d' % (feature_name, num))

    def missing_feature(self, feature_name, num=10):
        return self.load('GET', '/repository/documents/missing/feature/%s/%d' % (feature_name, num))

    def false_feature(self, feature_name, num=10):
        return self.load('GET', '/repository/documents/false/feature/%s/%d' % (feature_name, num))

    def get_documents_complex(self, query):
        return self.load('GET', '/repository/doc-feature-query/ids', query)

    def get_documents_complex_full(self, query):
        return self.load('GET', '/repository/doc-feature-query/full', query)

    def delete_feature(self, docid, feature):
        return self.load('DELETE', '/repository/document/%s/feature/%s' % (docid, feature))

    def delete_features(self, docid, features):
        return self.load('DELETE', '/repository/document/%s/features' % docid, features)

    def delete_document(self, docid):
        return self.load('DELETE', '/repository/document/%s' % docid)

    def delete_annotation_set(self, docid, name):
        return self.load('DELETE', '/repository/document/%s/annotation-set/%s' % (docid, name))

    def delete_annotation(self, docid, name, annotation_id):
        return self.load('DELETE', '/repository/document/%s/annotation-set/%s/%s' % (docid, name, annotation_id))

    def post_document(self, content):
        return self.load('POST', '/repository/document', content)

    def post_document_websays(self, content):
        return self.load('POST', '/repository/document/websays', content, keep_data=True, content_type='application/xml')

    def post_document_plaintext(self, content):
        return self.load('POST', '/repository/document/plaintext', content, keep_data=True, content_type='text/plain')

    def test_doc(self):
        return self.load('GET', '/repository/test-doc')

    def test_doc_str(self, content):
        return self.load('GET', '/repository/test-doc/%s' % content)

    def test_doc_post(self, content):
        return self.load('POST', '/repository/test-doc', content, keep_data=True, content_type='text/plain')

    def put_features(self, docid, features):
        return self.load('PUT', '/repository/document/%d/features' % docid, features)

    def put_annotations(self, docid, annotations):
        return self.load('PUT', '/repository/document/%d/annotations' % docid, annotations)

    def put_annotation_set(self, docid, name, content):
        return self.load('PUT', '/repository/document/%d/annotation-set/%s' % (docid, name), content)

class AnnotationGenerator(object):
    def __init__(self, query):
        self.query = query

    def process_document(self, client, document):
        #print(document['id'])
        raise NotImplementedError

    def run(self, client, delay=300):
        # Poll forever: process each batch of matching documents, sleep when none is left
        print('waiting for more documents')
        while True:
            documents = client.get_documents_complex_full(self.query)
            if len(documents) > 0:
                for document in documents:
                    self.process_document(client, document)
                print('waiting for more documents')
            else:
                time.sleep(delay)

def test_repository(client):
    """Run tests on the repository client."""
    print(len(client.get_documents()))
    doc = client.test_doc()
    print(doc)
    print(client.get_document(doc))
    print(client.get_content(doc))
    print(client.get_features(doc))
    print(client.get_features(doc, ['testDocument']))
    print(client.query_features({"provenance": "testDoc"}))
    print(client.has_feature('testDocument', 2))
    print(client.missing_feature('Websays_sourceType', 2))
    print(client.false_feature('Websays_isBestComment', 2))
    print(client.get_documents_complex({"testDocument": True}))
    print(client.get_documents_complex_full({"testDocument": True}))
    print(client.delete_feature(doc, "misc"))
    print(client.delete_features(doc, ["junk"]))
    print(client.delete_annotation_set(doc, 'all'))
    feature = client.get_document(doc)['annotations']['split'][0]['id']
    print(feature)
    print(client.delete_annotation(doc, 'split', feature))
    print(client.put_features(doc, {"x": "y"}))
    print(client.put_annotations(doc, {"w": []}))
    print(client.put_annotation_set(doc, 'z', []))
    print(client.delete_document(doc))
    doc = client.post_document({"name": "x", "content": "x"})
    print(client.delete_document(doc))
    doc = client.post_document_websays('<senseiClipping><text>x</text></senseiClipping>')[0]
    print(client.delete_document(doc))
    doc = client.post_document_plaintext('x')
    print(client.delete_document(doc))

def test_annotator(client):
    annotator = AnnotationGenerator("x")
    annotator.run(client)

tunnel = None

def start_tunnel():
    """Create a tunnel to the SENSEI repository.
    This assumes that you have an SSH key set up to access that tunnel.
    """
    global tunnel
    if tunnel is None:
        print('starting tunnel')