#!/usr/bin/env python

"""
 (c) 2015 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon

Streaming server for pagure's eventsource feature
This server takes messages sent to redis and publishes them at the
specified endpoint

To test, run this script and in another terminal
nc localhost 8080
  GET /test/issue/26?foo=bar HTTP/1.1

"""

import datetime
import logging
import os
import urlparse

import trollius
import trollius_redis

log = logging.getLogger(__name__)

# The configuration location must be exported *before* pagure is imported,
# which is why the pagure imports below are not grouped with the others.
if 'PAGURE_CONFIG' not in os.environ \
        and os.path.exists('/etc/pagure/pagure.cfg'):
    print('Using configuration file `/etc/pagure/pagure.cfg`')
    os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'

import pagure
import pagure.lib
from pagure.exceptions import PagureEvException

# Main event-source server object; set by main() and read by stats().
SERVER = None


def get_obj_from_path(path):
    """ Return the Ticket or Request object based on the path provided.

    Accepted layouts are ``/<repo>/<obj>/<id>`` and
    ``/fork/<user>/<repo>/<obj>/<id>`` where ``<obj>`` is either ``issue``
    or ``pull-request``.

    :arg path: the path component of the URL requested by the client.
    :raises PagureEvException: if the path is malformed, the project or
        object cannot be found, the corresponding tracker is disabled or
        the issue is private.
    :return: the issue or pull-request object returned by ``pagure.lib``.
    """
    username = None
    try:
        if path.startswith('/fork'):
            username, repo_name, obj, objid = path.split('/')[2:6]
        else:
            repo_name, obj, objid = path.split('/')[1:4]
    except ValueError:
        # Not enough path segments to unpack.
        raise PagureEvException("Invalid URL: %s" % path)

    repo = pagure.lib.get_project(pagure.SESSION, repo_name, user=username)

    if repo is None:
        # Report the requested name, not the (None) lookup result.
        raise PagureEvException("Project '%s' not found" % repo_name)

    output = None
    if obj == 'issue':
        if not repo.settings.get('issue_tracker', True):
            raise PagureEvException("No issue tracker found for this project")

        output = pagure.lib.search_issues(
            pagure.SESSION, repo, issueid=objid)

        if output is None or output.project != repo:
            raise PagureEvException("Issue '%s' not found" % objid)

        if output.private:
            # TODO: find a way to do auth
            raise PagureEvException(
                "This issue is private and you are not allowed to view it")
    elif obj == 'pull-request':
        if not repo.settings.get('pull_requests', True):
            raise PagureEvException(
                "No pull-request tracker found for this project")

        output = pagure.lib.search_pull_requests(
            pagure.SESSION, project_id=repo.id, requestid=objid)

        if output is None or output.project != repo:
            raise PagureEvException("Pull-Request '%s' not found" % objid)
    else:
        raise PagureEvException("Invalid object provided: '%s'" % obj)

    return output


@trollius.coroutine
def handle_client(client_reader, client_writer):
    """ Serve one event-source client.

    Reads the request headers (only the first line is used), resolves the
    requested issue or pull-request, answers with an event-stream header
    and then forwards every message published on the redis channel
    ``pagure.<uid>`` to the client until the connection drops.

    The client connection is always closed on exit, including when the
    request is rejected or the client never finishes sending its headers.

    :arg client_reader: stream reader for the client connection.
    :arg client_writer: stream writer for the client connection.
    """
    try:
        request_line = None
        while True:
            # give client a chance to respond, timeout after 10 seconds
            try:
                raw = yield trollius.From(trollius.wait_for(
                    client_reader.readline(), timeout=10.0))
            except trollius.TimeoutError:
                log.warning("Timed out waiting for the client's request")
                return
            # Client input is untrusted: never let a stray byte blow up
            # the decoding.
            text = raw.decode('utf-8', 'replace').strip()
            if not text:
                # Blank line (or EOF): end of the request headers.
                break
            if request_line is None:
                request_line = text

        if request_line is None:
            log.warning("Expected ticket uid, received None")
            return

        data = request_line.split()
        log.info("Received %s", data)

        # Expected shape: "<METHOD> <URL> [<PROTOCOL>]"
        if len(data) < 2:
            log.warning("No URL provided: %s", data)
            return

        if '/' not in data[1]:
            log.warning("Invalid URL provided: %s", data[1])
            return

        url = urlparse.urlsplit(data[1])

        try:
            obj = get_obj_from_path(url.path)
        except PagureEvException as err:
            log.warning(err.message)
            return

        origin = pagure.APP.config.get('APP_URL')
        if origin.endswith('/'):
            origin = origin[:-1]

        client_writer.write((
            "HTTP/1.0 200 OK\n"
            "Content-Type: text/event-stream\n"
            "Cache: nocache\n"
            "Connection: keep-alive\n"
            "Access-Control-Allow-Origin: %s\n\n" % origin
        ).encode())

        connection = yield trollius.From(trollius_redis.Connection.create(
            host=pagure.APP.config['REDIS_HOST'],
            port=pagure.APP.config['REDIS_PORT'],
            db=pagure.APP.config['REDIS_DB']))

        try:
            # Create subscriber.
            subscriber = yield trollius.From(connection.start_subscribe())

            # Subscribe to channel.
            yield trollius.From(
                subscriber.subscribe(['pagure.%s' % obj.uid]))

            # Inside a while loop, wait for incoming events.
            while True:
                reply = yield trollius.From(subscriber.next_published())
                log.info(reply)
                log.info("Sending %s", reply.value)
                client_writer.write(
                    ('data: %s\n\n' % reply.value).encode())
                yield trollius.From(client_writer.drain())

        except trollius.ConnectionResetError:
            log.exception("ERROR: ConnectionResetError in handle_client")
        except Exception:
            log.exception("ERROR: Exception in handle_client")
        finally:
            # Whatever happens, close the redis connection.
            connection.close()
    finally:
        # Whatever happens, release the client socket.
        client_writer.close()


@trollius.coroutine
def stats(client_reader, client_writer):
    """ Report the number of active clients of the main server.

    Writes a minimal HTTP response whose body is ``data: <count>`` and
    closes the connection.

    :arg client_reader: stream reader for the client connection (unused).
    :arg client_writer: stream writer for the client connection.
    """
    try:
        log.info('Clients: %s', SERVER.active_count)
        client_writer.write((
            "HTTP/1.0 200 OK\n"
            "Cache: nocache\n\n"
        ).encode())
        client_writer.write(('data: %s\n\n' % SERVER.active_count).encode())
        yield trollius.From(client_writer.drain())

    except trollius.ConnectionResetError as err:
        log.info(err)
    finally:
        client_writer.close()


def main():
    """ Start the event-source server (and the optional stats server) and
    run the event loop until interrupted.

    The listening ports are read from the pagure configuration keys
    ``EVENTSOURCE_PORT`` and, if set, ``EV_STATS_PORT``.
    """
    global SERVER

    # Bound up-front so the cleanup below is safe even when start-up fails
    # half-way through.
    loop = None
    stats_server = None

    try:
        loop = trollius.get_event_loop()
        coro = trollius.start_server(
            handle_client,
            host=None,
            port=pagure.APP.config['EVENTSOURCE_PORT'],
            loop=loop)
        SERVER = loop.run_until_complete(coro)
        log.info(
            'Serving server at {}'.format(SERVER.sockets[0].getsockname()))

        if pagure.APP.config.get('EV_STATS_PORT'):
            stats_coro = trollius.start_server(
                stats,
                host=None,
                port=pagure.APP.config.get('EV_STATS_PORT'),
                loop=loop)
            stats_server = loop.run_until_complete(stats_coro)
            log.info('Serving stats at {}'.format(
                stats_server.sockets[0].getsockname()))

        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except trollius.ConnectionResetError:
        log.exception("ERROR: ConnectionResetError in main")
    except Exception:
        log.exception("ERROR: Exception in main")
    finally:
        # Close the server(s); only touch what was actually created so a
        # start-up failure is not masked by an error raised here.
        if SERVER is not None:
            SERVER.close()
        if stats_server is not None:
            stats_server.close()
        log.info("End Connection")
        if loop is not None:
            if SERVER is not None:
                loop.run_until_complete(SERVER.wait_closed())
            if stats_server is not None:
                loop.run_until_complete(stats_server.wait_closed())
            loop.close()
        log.info("End")


if __name__ == '__main__':
    # When run as a script, log through the root logger so that messages
    # from every module end up on the console.
    log = logging.getLogger("")
    formatter = logging.Formatter(
        "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")

    # setup console logging
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    aslog = logging.getLogger("asyncio")
    aslog.setLevel(logging.DEBUG)

    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()