248 lines
7.1 KiB
Python
248 lines
7.1 KiB
Python
#!/usr/bin/env python
|
|
|
|
"""
|
|
(c) 2015 - Copyright Red Hat Inc
|
|
|
|
Authors:
|
|
Pierre-Yves Chibon <pingou@pingoured.fr>
|
|
|
|
|
|
Streaming server for pagure's eventsource feature
|
|
This server takes messages sent to redis and publish them at the specified
|
|
endpoint
|
|
|
|
To test, run this script and in another terminal
|
|
nc localhost 8080
|
|
HELLO
|
|
|
|
GET /test/issue/26?foo=bar HTTP/1.1
|
|
|
|
"""
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import urlparse
|
|
|
|
import trollius
|
|
import trollius_redis
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
if 'PAGURE_CONFIG' not in os.environ \
|
|
and os.path.exists('/etc/pagure/pagure.cfg'):
|
|
print 'Using configuration file `/etc/pagure/pagure.cfg`'
|
|
os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
|
|
|
|
|
|
import pagure
|
|
import pagure.lib
|
|
from pagure.exceptions import PagureEvException
|
|
|
|
SERVER = None
|
|
|
|
def get_obj_from_path(path):
|
|
""" Return the Ticket or Request object based on the path provided.
|
|
"""
|
|
username = None
|
|
try:
|
|
if path.startswith('/fork'):
|
|
username, repo, obj, objid = path.split('/')[2:6]
|
|
else:
|
|
repo, obj, objid = path.split('/')[1:4]
|
|
except:
|
|
raise PagureEvException("Invalid URL: %s" % path)
|
|
|
|
repo = pagure.lib.get_project(pagure.SESSION, repo, user=username)
|
|
|
|
if repo is None:
|
|
raise PagureEvException("Project '%s' not found" % repo)
|
|
|
|
output = None
|
|
if obj == 'issue':
|
|
if not repo.settings.get('issue_tracker', True):
|
|
raise PagureEvException("No issue tracker found for this project")
|
|
|
|
output = pagure.lib.search_issues(
|
|
pagure.SESSION, repo, issueid=objid)
|
|
|
|
if output is None or output.project != repo:
|
|
raise PagureEvException("Issue '%s' not found" % objid)
|
|
|
|
if output.private:
|
|
# TODO: find a way to do auth
|
|
raise PagureEvException(
|
|
"This issue is private and you are not allowed to view it")
|
|
elif obj == 'pull-request':
|
|
if not repo.settings.get('pull_requests', True):
|
|
raise PagureEvException(
|
|
"No pull-request tracker found for this project")
|
|
|
|
output = pagure.lib.search_pull_requests(
|
|
pagure.SESSION, project_id=repo.id, requestid=objid)
|
|
|
|
if output is None or output.project != repo:
|
|
raise PagureEvException("Pull-Request '%s' not found" % objid)
|
|
|
|
else:
|
|
raise PagureEvException("Invalid object provided: '%s'" % obj)
|
|
|
|
return output
|
|
|
|
|
|
@trollius.coroutine
|
|
def handle_client(client_reader, client_writer):
|
|
data = None
|
|
while True:
|
|
# give client a chance to respond, timeout after 10 seconds
|
|
line = yield trollius.From(trollius.wait_for(
|
|
client_reader.readline(),
|
|
timeout=10.0))
|
|
if not line.decode().strip():
|
|
break
|
|
line = line.decode().rstrip()
|
|
if data is None:
|
|
data = line
|
|
|
|
if data is None:
|
|
log.warning("Expected ticket uid, received None")
|
|
return
|
|
|
|
data = data.decode().rstrip().split()
|
|
log.info("Received %s", data)
|
|
if not data:
|
|
log.warning("No URL provided: %s" % data)
|
|
return
|
|
|
|
if not '/' in data[1]:
|
|
log.warning("Invalid URL provided: %s" % data[1])
|
|
return
|
|
|
|
url = urlparse.urlsplit(data[1])
|
|
|
|
try:
|
|
obj = get_obj_from_path(url.path)
|
|
except PagureEvException as err:
|
|
log.warning(err.message)
|
|
return
|
|
|
|
origin = pagure.APP.config.get('APP_URL')
|
|
if origin.endswith('/'):
|
|
origin = origin[:-1]
|
|
|
|
client_writer.write((
|
|
"HTTP/1.0 200 OK\n"
|
|
"Content-Type: text/event-stream\n"
|
|
"Cache: nocache\n"
|
|
"Connection: keep-alive\n"
|
|
"Access-Control-Allow-Origin: %s\n\n" % origin
|
|
).encode())
|
|
|
|
connection = yield trollius.From(trollius_redis.Connection.create(
|
|
host=pagure.APP.config['REDIS_HOST'],
|
|
port=pagure.APP.config['REDIS_PORT'],
|
|
db=pagure.APP.config['REDIS_DB']))
|
|
|
|
try:
|
|
|
|
# Create subscriber.
|
|
subscriber = yield trollius.From(connection.start_subscribe())
|
|
|
|
# Subscribe to channel.
|
|
yield trollius.From(subscriber.subscribe(['pagure.%s' % obj.uid]))
|
|
|
|
# Inside a while loop, wait for incoming events.
|
|
while True:
|
|
reply = yield trollius.From(subscriber.next_published())
|
|
#print(u'Received: ', repr(reply.value), u'on channel', reply.channel)
|
|
log.info(reply)
|
|
log.info("Sending %s", reply.value)
|
|
client_writer.write(('data: %s\n\n' % reply.value).encode())
|
|
yield trollius.From(client_writer.drain())
|
|
|
|
except trollius.ConnectionResetError as err:
|
|
log.exception("ERROR: ConnectionResetError in handle_client")
|
|
except Exception as err:
|
|
log.exception("ERROR: Exception in handle_client")
|
|
finally:
|
|
# Wathever happens, close the connection.
|
|
connection.close()
|
|
client_writer.close()
|
|
|
|
|
|
@trollius.coroutine
|
|
def stats(client_reader, client_writer):
|
|
|
|
try:
|
|
log.info('Clients: %s', SERVER.active_count)
|
|
client_writer.write((
|
|
"HTTP/1.0 200 OK\n"
|
|
"Cache: nocache\n\n"
|
|
).encode())
|
|
client_writer.write(('data: %s\n\n' % SERVER.active_count).encode())
|
|
yield trollius.From(client_writer.drain())
|
|
|
|
except trollius.ConnectionResetError as err:
|
|
log.info(err)
|
|
pass
|
|
finally:
|
|
client_writer.close()
|
|
return
|
|
|
|
|
|
def main():
|
|
global SERVER
|
|
|
|
try:
|
|
loop = trollius.get_event_loop()
|
|
coro = trollius.start_server(
|
|
handle_client,
|
|
host=None,
|
|
port=pagure.APP.config['EVENTSOURCE_PORT'],
|
|
loop=loop)
|
|
SERVER = loop.run_until_complete(coro)
|
|
log.info('Serving server at {}'.format(SERVER.sockets[0].getsockname()))
|
|
if pagure.APP.config.get('EV_STATS_PORT'):
|
|
stats_coro = trollius.start_server(
|
|
stats,
|
|
host=None,
|
|
port=pagure.APP.config.get('EV_STATS_PORT'),
|
|
loop=loop)
|
|
stats_server = loop.run_until_complete(stats_coro)
|
|
log.info('Serving stats at {}'.format(
|
|
stats_server.sockets[0].getsockname()))
|
|
loop.run_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except trollius.ConnectionResetError as err:
|
|
log.exception("ERROR: ConnectionResetError in main")
|
|
except Exception as err:
|
|
log.exception("ERROR: Exception in main")
|
|
finally:
|
|
# Close the server
|
|
SERVER.close()
|
|
if pagure.APP.config.get('EV_STATS_PORT'):
|
|
stats_server.close()
|
|
log.info("End Connection")
|
|
loop.run_until_complete(SERVER.wait_closed())
|
|
loop.close()
|
|
log.info("End")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
log = logging.getLogger("")
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
|
|
|
|
# setup console logging
|
|
log.setLevel(logging.DEBUG)
|
|
ch = logging.StreamHandler()
|
|
ch.setLevel(logging.DEBUG)
|
|
|
|
aslog = logging.getLogger("asyncio")
|
|
aslog.setLevel(logging.DEBUG)
|
|
|
|
ch.setFormatter(formatter)
|
|
log.addHandler(ch)
|
|
main()
|