#!/usr/bin/env python3
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
##
# mPlane Software Development Kit
# Component framework
#
# (c) 2015 mPlane Consortium (http://www.ict-mplane.eu)
# Author: Stefano Pentassuglia <stefano.pentassuglia@ssbprogetti.it>
# Brian Trammell <brian@trammell.ch>
#
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
import mplane.utils
import mplane.model
import mplane.azn
import mplane.tls
import importlib
import logging
import tornado.web
import tornado.httpserver
from datetime import datetime
import time
from time import sleep
import urllib3
import threading
import socket
import random
# FIXME HACK
# some urllib3 versions let you disable warnings about untrusted CAs,
# which we use a lot in the project demo. Try to disable warnings if we can.
try:
urllib3.disable_warnings()
except:
pass
from threading import Thread
import json
logger = logging.getLogger(__name__)
DEFAULT_MPLANE_PORT = 8890
SLEEP_QUANTUM = 0.250
RETRY_QUANTUM = 5
CAPABILITY_PATH_ELEM = "capability"
SPECIFICATION_PATH_ELEM = "/"
DEFAULT_CAPABILITY_URL = "http://127.0.0.1:8889/register/capability"
DEFAULT_SPECIFICATION_URL = "http://127.0.0.1:8889/show/specification"
DEFAULT_RESULT_URL = "http://127.0.0.1:8889/register/result"
class BaseComponent(object):
def __init__(self, config):
self.config = config
# registry initialization phase (preload + fetch from URI)
registry_uri = None
if config is not None:
if "Registries" in config:
if "preload" in config["Registries"]:
for reg in config["Registries"]["preload"]:
mplane.model.preload_registry(reg)
if "default" in config["Registries"]:
registry_uri = config["Registries"]["default"]
mplane.model.initialize_registry(registry_uri)
self.tls = mplane.tls.TlsState(self.config)
self.scheduler = mplane.scheduler.Scheduler(config)
self._ipaddresses = None # list of IPs to listen on, if the component is Listener
services = self._load_services()
# Iterate over services, fixing up link sections
if config is not None and "Listener" in config["Component"]:
if "interfaces" in config["Component"]["Listener"] and \
config["Component"]["Listener"]["interfaces"]:
self._ipaddresses = config["Component"]["Listener"]["interfaces"]
if len(self._ipaddresses) == 1:
# Only do link fix-up if we have only one IP address. If listening
# on multiple, need to delegate link fix-up to the request handlers
# (see DiscoveryHandler._respond_capability())
if "TLS" in config:
link = "https://"
else:
link = "http://"
link += config["Component"]["Listener"]["interfaces"][0] + ":"
link += config["Component"]["Listener"]["port"] + SPECIFICATION_PATH_ELEM
for service in services:
service.set_capability_link(link)
# Now add all the services to the scheduler
for service in services:
self.scheduler.add_service(service)
def _load_services(self):
services = []
if self.config is not None:
# load all the modules that are present in the 'Modules' section
if "Modules" in self.config["Component"]:
for mod_name in self.config["Component"]["Modules"]:
module = importlib.import_module(mod_name)
kwargs = {}
for arg in self.config["Component"]["Modules"][mod_name]:
kwargs[arg] = self.config["Component"]["Modules"][mod_name][arg]
for service in module.services(**kwargs):
services.append(service)
return services
def remove_capability(self, capability):
for service in self.scheduler.services:
if service.capability().get_token() == capability.get_token():
self.scheduler.remove_service(service)
return
logger.warning("Component: no service to remove for capability "+repr(capability))
class ListenerHttpComponent(BaseComponent):
def __init__(self, config, io_loop=None, as_daemon=False):
self._port = DEFAULT_MPLANE_PORT
if config is not None and "Component" in config and "Listener" in config["Component"]:
if "port" in config["Component"]["Listener"]:
self._port = int(config["Component"]["Listener"]["port"])
self._path = SPECIFICATION_PATH_ELEM
super(ListenerHttpComponent, self).__init__(config)
application = tornado.web.Application([
(r"/", MessagePostHandler, {'scheduler': self.scheduler, 'tlsState': self.tls}),
(r"/" + CAPABILITY_PATH_ELEM, DiscoveryHandler, {'scheduler': self.scheduler,
'tlsState': self.tls,
'config': config}),
(r"/" + CAPABILITY_PATH_ELEM + "/.*", DiscoveryHandler, {'scheduler': self.scheduler,
'tlsState': self.tls,
'config': config}),
])
http_server = tornado.httpserver.HTTPServer(
application,
ssl_options=self.tls.get_ssl_options())
# run the server
if self._ipaddresses is not None:
for ip in self._ipaddresses:
http_server.listen(self._port, ip)
else:
http_server.listen(self._port)
logger.info("ListenerHttpComponent running on port " + str(self._port))
comp_t = Thread(target=self.listen_in_background, args=(io_loop,))
comp_t.setDaemon(as_daemon)
comp_t.start()
def listen_in_background(self, io_loop):
""" The component listens for requests in background """
if io_loop is None:
tornado.ioloop.IOLoop.instance().start()
[docs]class MPlaneHandler(tornado.web.RequestHandler):
"""
Abstract tornado RequestHandler that allows a
handler to respond with an mPlane Message or an Exception.
"""
def _respond_message(self, msg):
self.set_status(200)
self.set_header("Content-Type", "application/x-mplane+json")
self.write(mplane.model.unparse_json(msg))
self.finish()
def _respond_error(self, errmsg=None, exception=None, token=None, status=400):
if exception:
if len(exception.args) == 1:
errmsg = str(exception.args[0])
else:
errmsg = repr(exception.args)
elif errmsg is None:
raise RuntimeError("_respond_error called without message or exception")
mex = mplane.model.Exception(token=token, errmsg=errmsg)
self.set_status(status)
self.set_header("Content-Type", "application/x-mplane+json")
self.write(mplane.model.unparse_json(mex))
self.finish()
[docs]class DiscoveryHandler(MPlaneHandler):
"""
Exposes the capabilities registered with a given scheduler.
URIs ending with "capability" will result in an HTML page
listing links to each capability.
"""
def initialize(self, scheduler, tlsState, config):
self.scheduler = scheduler
self.tls = tlsState
self.config = config
def get(self):
# capabilities
path = self.request.path.split("/")[1:]
if path[0] == CAPABILITY_PATH_ELEM:
if len(path) == 1 or path[1] is None:
self._respond_capability_links()
else:
self._respond_capability(path[1])
else:
self._respond_error(errmsg="I only know how to handle /"+CAPABILITY_PATH_ELEM+" URLs via HTTP GET", status=405)
def _respond_capability_links(self):
self.set_status(200)
self.set_header("Content-Type", "text/html")
self.write("<html><head><title>Capabilities</title></head><body>")
no_caps_exposed = True
for key in self.scheduler.capability_keys():
if (not isinstance(self.scheduler.capability_for_key(key), mplane.model.Withdrawal) and
self.scheduler.azn.check(self.scheduler.capability_for_key(key),
self.tls.extract_peer_identity(self.request))):
no_caps_exposed = False
self.write("<a href='/capability/" + key + "'>" + key + "</a><br/>")
self.write("</body></html>")
if no_caps_exposed is True:
logger.warning("Discovery: no capabilities available to "+
self.tls.extract_peer_identity(self.request)+
", check authorizations")
self.finish()
def _respond_capability(self, key):
cap = self.scheduler.capability_for_key(key)
# if the 'link' field is empty, compose it using the host requested by the client/supervisor
if not cap.get_link():
if self.config is not None and "TLS" in self.config:
link = "https://"
else:
link = "http://"
link = link + self.request.host + SPECIFICATION_PATH_ELEM
cap.set_link(link)
self._respond_message(cap)
[docs]class MessagePostHandler(MPlaneHandler):
"""
Receives mPlane messages POSTed from a client, and passes them to a
scheduler for processing. After waiting for a specified delay to see
if a Result is immediately available, returns a receipt for future
redemption.
"""
def initialize(self, scheduler, tlsState, immediate_ms = 5000):
self.scheduler = scheduler
self.tls = tlsState
self.immediate_ms = immediate_ms
def get(self):
# message
self.set_status(200)
self.set_header("Content-Type", "text/html")
self.write("<html><head><title>mplane.httpsrv</title></head><body>")
self.write("This is a client-initiated mPlane component. POST mPlane messages to this URL to use.<br/>")
self.write("<a href='/"+CAPABILITY_PATH_ELEM+"'>Capabilities</a> provided by this server:<br/>")
for key in self.scheduler.capability_keys():
if (not isinstance(self.scheduler.capability_for_key(key), mplane.model.Withdrawal) and
self.scheduler.azn.check(self.scheduler.capability_for_key(key),
self.tls.extract_peer_identity(self.request))):
self.write("<br/><pre>")
self.write(mplane.model.unparse_json(self.scheduler.capability_for_key(key)))
self.write("</body></html>")
self.finish()
def post(self):
# unwrap json message from body
if (self.request.headers["Content-Type"] == "application/x-mplane+json"):
try:
msg = mplane.model.parse_json(self.request.body.decode("utf-8"))
except Exception as e:
self._respond_error(exception=e)
else:
self._respond_error(errmsg="I only know how to handle mPlane JSON messages via HTTP POST", status="406")
# check if requested capability is withdrawn
is_withdrawn = False
if isinstance(msg, mplane.model.Specification):
for key in self.scheduler.capability_keys():
cap = self.scheduler.capability_for_key(key)
if msg.fulfills(cap) and isinstance(cap, mplane.model.Withdrawal):
is_withdrawn = True
self._respond_message(cap)
if not is_withdrawn:
# hand message to scheduler
reply = self.scheduler.process_message(self.tls.extract_peer_identity(self.request), msg)
# wait for immediate delay
if self.immediate_ms > 0 and \
isinstance(msg, mplane.model.Specification) and \
isinstance(reply, mplane.model.Receipt):
job = self.scheduler.job_for_message(reply)
wait_start = datetime.utcnow()
while (datetime.utcnow() - wait_start).total_seconds() * 1000 < self.immediate_ms:
time.sleep(SLEEP_QUANTUM)
if job.failed() or job.finished():
reply = job.get_reply()
break
# return reply
self._respond_message(reply)
class InitiatorHttpComponent(BaseComponent):
def __init__(self, config, supervisor=False):
self._supervisor = supervisor
self._callback_token = "comp-cb" + str(random.random())
super(InitiatorHttpComponent, self).__init__(config)
# configuration of URLs that will be used for requests
if self.config is not None and "Component" in self.config and "Initiator" in self.config["Component"]:
if ("capability-url" in self.config["Component"]["Initiator"]
and "specification-url" in self.config["Component"]["Initiator"]
and "result-url" in self.config["Component"]["Initiator"]):
self.registration_url = urllib3.util.parse_url(
self.config["Component"]["Initiator"]["capability-url"])
self.specification_url = urllib3.util.parse_url(
self.config["Component"]["Initiator"]["specification-url"])
self.result_url = urllib3.util.parse_url(
self.config["Component"]["Initiator"]["result-url"])
elif "url" in self.config["Component"]["Initiator"]:
self.registration_url = urllib3.util.parse_url(self.config["Component"]["Initiator"]["url"])
self.specification_url = self.registration_url
self.result_url = self.registration_url
else:
raise ValueError("Config file is missing information on URLs in Component.Initiator. "
"See documentation for details")
else:
self.registration_url = urllib3.util.parse_url(DEFAULT_CAPABILITY_URL)
self.specification_url = urllib3.util.parse_url(DEFAULT_SPECIFICATION_URL)
self.result_url = urllib3.util.parse_url(DEFAULT_RESULT_URL)
self.pool = self.tls.pool_for(self.registration_url.scheme,
self.registration_url.host,
self.registration_url.port)
self._result_url = dict()
self.register_to_client()
self._callback_lock = threading.Lock()
# periodically poll the Client/Supervisor for Specifications
t = Thread(target=self.check_for_specs)
t.start()
def register_to_client(self, caps=None):
"""
Sends a list of capabilities to the Client, in order to register them
"""
env = mplane.model.Envelope()
logger.info("Component: registering my capabilities to "+self.registration_url)
# try to register capabilities, if URL is unreachable keep trying every 5 seconds
connected = False
while not connected:
try:
self._client_identity = self.tls.extract_peer_identity(self.registration_url)
connected = True
except:
logger.info("Component: client unreachable, will retry in "+str(RETRY_QUANTUM)+" sec.")
sleep(RETRY_QUANTUM)
# If caps is not None, register them
if caps is not None:
for cap in caps:
if self.scheduler.azn.check(cap, self._client_identity):
env.append_message(cap)
else:
# generate the envelope containing the capability list
no_caps_exposed = True
for key in self.scheduler.capability_keys():
cap = self.scheduler.capability_for_key(key)
if self.scheduler.azn.check(cap, self._client_identity):
env.append_message(cap)
no_caps_exposed = False
if no_caps_exposed is True:
logger.warning("Component: no capabilities available to "+
self._client_identity +", check authorizations")
if not self._supervisor:
exit(0)
# add callback capability to the list
# FIXME NOOO see issue #3
callback_cap = mplane.model.Capability(label="callback",
when = "now ... future", token = self._callback_token)
env.append_message(callback_cap)
# send the envelope to the client
res = self.send_message(self.registration_url, "POST", env)
# handle response message
if res.status == 200:
logger.info("Component: successfully registered to "+self.registration_url)
# FIXME this does not appear to have anything
# to do with the protocol specification, see issue #4
# body = json.loads(res.data.decode("utf-8"))
# print("\nCapability registration outcome:")
# for key in body:
# if body[key]['registered'] == "ok":
# print(key + ": Ok")
# else:
# print(key + ": Failed (" + body[key]['reason'] + ")")
# print("")
else:
logger.critical("Capability registration to "+self.registration_url+" failed:"+
str(res.status) + " - " + res.data.decode("utf-8"))
def check_for_specs(self):
"""
Poll the client for specifications
"""
while True:
# FIXME configurable default idle time.
self.idle_time = 5
# try to send a request for specifications. If URL is unreachable means that the Supervisor (or Client) has
# most probably died, so we need to re-register capabilities
try:
logger.info("Polling for specifications at " + self.specification_url)
res = self.send_message(self.specification_url, "GET")
except Exception as e:
logger.warning("Specification poll at " + self.specification_url + "failed :" + repr(e))
logger.warning("Attempting reregistration")
self.register_to_client()
if res.status == 200:
# specs retrieved: split them if there is more than one
env = mplane.model.parse_json(res.data.decode("utf-8"))
for spec in env.messages():
# handle callbacks
# FIXME NO NO NO see issue #3
if spec.get_label() == "callback":
self.idle_time = spec.when().timer_delays()[1]
break
# hand spec to scheduler, making sure the callback is called after
with self._callback_lock:
reply = self.scheduler.process_message(self._client_identity, spec, callback=self.return_results)
if not isinstance(spec, mplane.model.Interrupt):
self._result_url[spec.get_token()] = spec.get_link()
# send receipt to the Client/Supervisor
res = self.send_message(self._result_url[spec.get_token()], "POST", reply)
# not registered on supervisor, need to re-register
# FIXME what's 428 for? See issue #4
elif res.status == 428:
logger.warning("Specification poll got 428, attempting reregistration")
self.register_to_client()
else:
logger.critical("Specification poll to "+self.specification_url+" failed:"+
str(res.status) + " - " + res.data.decode("utf-8"))
sleep(self.idle_time)
def return_results(self,receipt):
"""
Checks if a job is complete, and in case sends it to the Client/Supervisor
"""
#wait for scheduling process above
with self._callback_lock:
pass
job = self.scheduler.job_for_message(receipt)
reply = job.get_reply()
# check if job is completed
if (job.finished() is not True and
job.failed() is not True):
logger.debug("Component: not returning partial result (%s len: %d, label: %s)"
% (type(reply).__name__, len(reply), reply.get_label()))
return
# send result to the Client/Supervisor
res = self.send_message(self._result_url[reply.get_token()], "POST", reply)
# handle response
label = reply.get_label()
if res.status == 200:
logger.info("posted "+ repr(reply) +
" to "+self._result_url[reply.get_token()])
else:
logger.critical("Result post to "+self._result_url[reply.get_token()]+" failed:"+
str(res.status) + " - " + res.data.decode("utf-8"))
def send_message(self, url_or_str, method, msg=None):
# if the URL is empty (meaning that the 'link' section was empty), use the default url for results
if not url_or_str:
url_or_str = self.result_url
if isinstance(url_or_str, str):
url = urllib3.util.parse_url(url_or_str)
else:
url = url_or_str
# if the URL has a different host from the one used for
# capabilities registration, we need a new connectionPool
if self.pool.is_same_host(mplane.utils.unparse_url(url)):
pool = self.pool
else:
pool = self.tls.pool_for(url.scheme, url.host, url.port)
if method == "POST" and msg is not None:
# post message
res = pool.urlopen('POST', url.path,
body=mplane.model.unparse_json(msg).encode("utf-8"),
headers={"content-type": "application/x-mplane+json"})
elif method == "GET":
# get message
res = pool.request('GET', url.path)
return res
def remove_capability(self, capability):
super(InitiatorHttpComponent, self).remove_capability(capability)
withdrawn_cap = mplane.model.Withdrawal(capability=capability)
self.register_to_client([withdrawn_cap])