166 lines
4.4 KiB
Python
166 lines
4.4 KiB
Python
#!/usr/bin/env python
|
|
|
|
"""
|
|
(c) 2015 - Copyright Red Hat Inc
|
|
|
|
Authors:
|
|
Pierre-Yves Chibon <pingou@pingoured.fr>
|
|
|
|
|
|
This server listens to message sent via redis and send the corresponding
|
|
web-hook request.
|
|
|
|
Using this mechanism, we no longer block the main application if the
|
|
receiving end is offline or so.
|
|
|
|
"""
|
|
|
|
import datetime
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import os
|
|
import requests
|
|
import time
|
|
import uuid
|
|
|
|
import six
|
|
import trollius
|
|
import trollius_redis
|
|
|
|
from kitchen.text.converters import to_bytes
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
if 'PAGURE_CONFIG' not in os.environ \
|
|
and os.path.exists('/etc/pagure/pagure.cfg'):
|
|
print 'Using configuration file `/etc/pagure/pagure.cfg`'
|
|
os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
|
|
|
|
|
|
import pagure
|
|
import pagure.lib
|
|
from pagure.exceptions import PagureEvException
|
|
|
|
_i = 0
|
|
|
|
|
|
def call_web_hooks(project, topic, msg):
|
|
''' Sends the web-hook notification. '''
|
|
log.info(
|
|
"Processing project: %s - topic: %s", project.fullname, topic)
|
|
log.debug('msg: %s', msg)
|
|
|
|
# Send web-hooks notification
|
|
global _i
|
|
_i += 1
|
|
year = datetime.datetime.now().year
|
|
if isinstance(topic, six.text_type):
|
|
topic = to_bytes(topic, encoding='utf8', nonstring="passthru")
|
|
msg = dict(
|
|
topic=topic.decode('utf-8'),
|
|
msg=msg,
|
|
timestamp=int(time.time()),
|
|
msg_id=str(year) + '-' + str(uuid.uuid4()),
|
|
i=_i,
|
|
)
|
|
|
|
content = json.dumps(msg)
|
|
hashhex = hmac.new(
|
|
str(project.hook_token), content, hashlib.sha1).hexdigest()
|
|
headers = {
|
|
'X-Pagure-Topic': topic,
|
|
'X-Pagure-Signature': hashhex
|
|
}
|
|
msg = json.dumps(msg)
|
|
for url in project.settings.get('Web-hooks').split('\n'):
|
|
url = url.strip()
|
|
log.info('Calling url %s' % url)
|
|
try:
|
|
req = requests.post(
|
|
url,
|
|
headers=headers,
|
|
data={'payload': msg}
|
|
)
|
|
if not req:
|
|
log.info(
|
|
'An error occured while querying: %s - '
|
|
'Error code: %s' % (url, req.status_code))
|
|
except (requests.exceptions.RequestException, Exception) as err:
|
|
log.info(
|
|
'An error occured while querying: %s - Error: %s' % (
|
|
url, err))
|
|
|
|
|
|
@trollius.coroutine
|
|
def handle_messages():
|
|
connection = yield trollius.From(trollius_redis.Connection.create(
|
|
host='0.0.0.0', port=6379, db=0))
|
|
|
|
# Create subscriber.
|
|
subscriber = yield trollius.From(connection.start_subscribe())
|
|
|
|
# Subscribe to channel.
|
|
yield trollius.From(subscriber.subscribe(['pagure.hook']))
|
|
|
|
# Inside a while loop, wait for incoming events.
|
|
while True:
|
|
reply = yield trollius.From(subscriber.next_published())
|
|
log.info(
|
|
'Received: %s on channel: %s',
|
|
repr(reply.value), reply.channel)
|
|
data = json.loads(reply.value)
|
|
username = None
|
|
if data['project'].startswith('forks'):
|
|
username, projectname = data['project'].split('/', 1)
|
|
else:
|
|
projectname = data['project']
|
|
|
|
namespace = None
|
|
if '/' in projectname:
|
|
namespace, projectname = projectname.split('/', 1)
|
|
|
|
project = pagure.lib.get_project(
|
|
session=pagure.SESSION, name=projectname, user=username)
|
|
log.info('Got the project, going to the webhooks')
|
|
call_web_hooks(project, data['topic'], data['msg'])
|
|
|
|
|
|
def main():
|
|
server = None
|
|
try:
|
|
loop = trollius.get_event_loop()
|
|
tasks = [
|
|
trollius.async(handle_messages()),
|
|
]
|
|
loop.run_until_complete(trollius.wait(tasks))
|
|
loop.run_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except trollius.ConnectionResetError:
|
|
pass
|
|
|
|
log.info("End Connection")
|
|
loop.close()
|
|
log.info("End")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
log = logging.getLogger("")
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
|
|
|
|
# setup console logging
|
|
log.setLevel(logging.DEBUG)
|
|
ch = logging.StreamHandler()
|
|
ch.setLevel(logging.DEBUG)
|
|
|
|
aslog = logging.getLogger("asyncio")
|
|
aslog.setLevel(logging.DEBUG)
|
|
|
|
ch.setFormatter(formatter)
|
|
log.addHandler(ch)
|
|
main()
|