#
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
##
# mPlane Protocol Reference Implementation
# Client SDK API implementation
#
# (c) 2013-2015 mPlane Consortium (http://www.ict-mplane.eu)
# Author: Brian Trammell <brian@trammell.ch>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
import mplane.model
import mplane.utils
from datetime import datetime
import html.parser
import urllib3
# FIXME HACK
# some urllib3 versions let you disable warnings about untrusted CAs,
# which we use a lot in the project demo. Try to disable warnings if we can.
try:
urllib3.disable_warnings()
except:
pass
from threading import Thread
import queue
import tornado.web
import tornado.httpserver
import tornado.ioloop
import logging
CAPABILITY_PATH_ELEM = "capability"
FORGED_DN_HEADER = "Forged-MPlane-Identity"
DEFAULT_IDENTITY = "default"
DEFAULT_PORT = 8889
DEFAULT_HOST = "127.0.0.1"
DEFAULT_REGISTRATION_PATH = "register/capability"
DEFAULT_SPECIFICATION_PATH = "show/specification"
DEFAULT_RESULT_PATH = "register/result"
logger = logging.getLogger(__name__)
[docs]class BaseClient(object):
"""
Core implementation of a generic programmatic client.
Used for common client state management between
HttpInitiatorClient and HttpListenerClient;
use one of these instead.
"""
def __init__(self, tls_state, supervisor=False, exporter=None):
self._tls_state = tls_state
self.reset()
self._ssn = -1
self._supervisor = supervisor
if self._supervisor:
self._exporter = exporter
def reset(self,args=None):
self._capabilities = {}
self._capability_labels = {}
self._capability_identities = {}
self._receipt_identities = {}
self._receipts = {}
self._receipt_labels = {}
self._results = {}
self._result_labels = {}
# structures for capability expiration after timeout
self._capability_timeouts = {}
self._capabilities_by_identity = {}
def _add_capability(self, msg, identity):
"""
Add a capability to internal state. The capability will be recallable
by token, and, if present, by label.
Internal use only; use handle_message instead.
"""
# FIXME retoken on token collision with another identity
token = msg.get_token()
self._capabilities[token] = msg
self._capability_timeouts[token] = datetime.utcnow()
if msg.get_label():
self._capability_labels[msg.get_label()] = msg
if identity:
self._capability_identities[token] = identity
mplane.utils.add_value_to(self._capabilities_by_identity, identity, token)
def _remove_capability(self, token):
"""
Remove a capability from all the structures
"""
if token in self._capabilities:
label = self._capabilities[token].get_label()
del self._capabilities[token]
del self._capability_timeouts[token]
identity = self.identity_for(token)
self._capabilities_by_identity[identity].remove(token)
if label and label in self._capability_labels:
del self._capability_labels[label]
def _withdraw_capability(self, msg, identity):
"""
Process a withdrawal. Match the withdrawal to the capability,
first by token, then by schema. Withdrawals that do not match
any known capabilities are dropped silently.
Internal use only; use handle_message instead.
"""
token = msg.get_token()
# FIXME check identity, exception on mismatch
if token in self._capabilities:
self._remove_capability(token)
else:
# Search all capabilities by schema
for cap in self.capabilities_matching_schema(msg):
self._remove_capability(cap.get_token())
[docs] def capability_for(self, token_or_label):
"""
Retrieve a capability given a token or label.
"""
if token_or_label in self._capability_labels:
return self._capability_labels[token_or_label]
elif token_or_label in self._capabilities:
return self._capabilities[token_or_label]
else:
raise KeyError("no capability for token or label "+token_or_label)
[docs] def identity_for(self, token_or_label, receipt=False):
"""
Retrieve an identity given a capability token or label, or a receipt token.
"""
if not receipt:
if token_or_label in self._capability_identities:
return self._capability_identities[token_or_label]
elif token_or_label in self._capability_labels:
return self._capability_identities[self._capability_labels[token_or_label].get_token()]
else:
raise KeyError("no identity for capability token or label "+token_or_label)
else:
if token_or_label in self._receipt_identities:
return self._receipt_identities[token_or_label]
else:
raise KeyError("no identity for receipt token " + token_or_label)
[docs] def capabilities_matching_schema(self, schema_capability):
"""
Given a capability, return *all* known capabilities matching the
given schema capability. A capability matches a schema capability
if and only if: (1) the capability schemas match and (2) all
constraints in the capability are contained by all constraints
in the schema capability.
Used to programmatically select capabilities matching an
aggregation or other collection operation (e.g. at a supervisor).
"""
# FIXME write this, maybe refactor part back into model.
pass
def _spec_for(self, cap_tol, when, params, relabel=None):
"""
Given a capability token or label, a temporal scope, a dictionary
of parameters, and an optional new label, derive a specification
ready for invocation, and return the capability and specification.
Used internally by derived classes; use invoke_capability instead.
"""
cap = self.capability_for(cap_tol)
spec = mplane.model.Specification(capability=cap)
# set temporal scope
spec.set_when(when)
# fill in parameters
# spec.set_single_values() # this is automatic now
for pname in spec.parameter_names():
if spec.get_parameter_value(pname) is None:
if pname in params:
spec.set_parameter_value(pname, params[pname])
else:
raise KeyError("missing parameter "+pname)
# regenerate token based on parameters and temporal scope
spec.retoken()
# generate label
if relabel:
spec.set_label(relabel)
else:
spec.set_label(cap.get_label() + "-" + str(self._ssn))
self._ssn += 1
return (cap, spec)
def _handle_receipt(self, msg, identity):
self._add_receipt(msg, identity)
def _add_receipt(self, msg, identity):
"""
Add a receipt to internal state. The receipt will be recallable
by token, and, if present, by label.
Internal use only; use handle_message instead.
"""
self._receipt_identities[msg.get_token()] = identity
self._receipts[msg.get_token()] = msg
if msg.get_label():
self._receipt_labels[msg.get_label()] = msg
def _remove_receipt(self, msg):
"""
Remove a receipt from internal state.
Internal use only; use handle_message instead.
"""
token = msg.get_token()
if token in self._receipts:
receipt = self._receipts[token]
del self._receipts[token]
label = receipt.get_label()
if label and label in self._receipt_labels:
del self._receipt_labels[label]
def _handle_result(self, msg, identity):
# FIXME check the result identity against where we sent the specification to
self._add_result(msg, identity)
def _add_result(self, msg, identity=None):
"""
Add a result to internal state. The result will supercede any receipt
stored for the same token, and will be recallable by token, and,
if present, by label.
Internal use only; use handle_message instead.
"""
receipt = None
try:
if isinstance(msg, mplane.model.Envelope):
# if the result is an envelope containing multijob
# results, keep the receipt until the multijob ends
(start, end) = msg.when().datetimes()
if end < datetime.utcnow():
if self._supervisor:
self._exporter.put_nowait([msg, identity])
receipt = self._receipts[msg.get_token()]
self._remove_receipt(receipt)
else:
receipt = self._receipts[msg.get_token()]
self._remove_receipt(receipt)
except KeyError:
pass
self._results[msg.get_token()] = msg
if not isinstance(msg, mplane.model.Exception):
if msg.get_label():
self._result_labels[msg.get_label()] = msg
else:
# Exceptions are only added to result_labels if a receipt existed in receipts -WHY
if receipt is not None:
self._result_labels[receipt.get_label()] = msg
def _remove_result(self, msg):
token = msg.get_token()
if token in self._results:
label = self._results[token].get_label()
del self._results[token]
if label and label in self._result_labels:
del self._result_labels[label]
[docs] def result_for(self, token_or_label):
"""
Return a result for the token if available;
return the receipt for the token otherwise.
"""
# first look in state
if token_or_label in self._receipt_labels:
return self._receipt_labels[token_or_label]
elif token_or_label in self._receipts:
return self._receipts[token_or_label]
elif token_or_label in self._result_labels:
return self._result_labels[token_or_label]
elif token_or_label in self._results:
return self._results[token_or_label]
else:
raise KeyError("no such token or label "+token_or_label)
def _handle_exception(self, msg, identity):
self._add_result(msg)
[docs] def handle_message(self, msg, identity=None):
"""
Handle a message. Used internally to process
mPlane messages received from a component. Can also be used
to inject messages into a client's state.
"""
if (self._supervisor and
not isinstance(msg, mplane.model.Envelope)):
self._exporter.put_nowait([msg, identity])
if isinstance(msg, mplane.model.Capability):
self._add_capability(msg, identity)
elif isinstance(msg, mplane.model.Withdrawal):
self._withdraw_capability(msg, identity)
elif isinstance(msg, mplane.model.Receipt):
self._handle_receipt(msg, identity)
elif isinstance(msg, mplane.model.Result):
self._handle_result(msg, identity)
elif isinstance(msg, mplane.model.Exception):
self._handle_exception(msg, identity)
elif isinstance(msg, mplane.model.Envelope):
if msg.get_token() in self._receipts:
self._handle_result(msg, identity)
else:
for imsg in msg.messages():
self.handle_message(imsg, identity)
else:
raise ValueError("Internal error: unknown message "+repr(msg))
[docs] def forget(self, token_or_label):
"""
forget all receipts and results for the given token or label
"""
if token_or_label in self._result_labels:
result = self._result_labels[token_or_label]
del self._result_labels[token_or_label]
del self._results[result.get_token()]
if token_or_label in self._results:
result = self._results[token_or_label]
del self._results[token_or_label]
if result.get_label():
del self._result_labels[result.get_label()]
if token_or_label in self._receipt_labels:
receipt = self._receipt_labels[token_or_label]
del self._receipt_labels[token_or_label]
del self._receipts[receipt.get_token()]
if token_or_label in self._receipts:
receipt = self._receipts[token_or_label]
del self._receipts[token_or_label]
if receipt.get_label():
del self._receipt_labels[receipt.get_label()]
[docs] def receipt_tokens(self):
"""
list all tokens for outstanding receipts
"""
return tuple(self._receipts.keys())
[docs] def receipt_labels(self):
"""
list all labels for outstanding receipts
"""
return tuple(self._receipt_labels.keys())
[docs] def result_tokens(self):
"""
list all tokens for stored results
"""
return tuple(self._results.keys())
[docs] def result_labels(self):
"""
list all labels for stored results
"""
return tuple(self._result_labels.keys())
[docs] def capability_tokens(self):
"""
list all tokens for stored capabilities
"""
return tuple(self._capabilities.keys())
[docs] def capability_labels(self):
"""
list all labels for stored capabilities
"""
return tuple(self._capability_labels.keys())
[docs]class CrawlParser(html.parser.HTMLParser):
"""
HTML parser class to extract all URLS in a href attributes in
an HTML page. Used to extract links to Capabilities exposed
as link collections.
"""
def __init__(self, **kwargs):
super(CrawlParser, self).__init__(**kwargs)
self.urls = []
def handle_starttag(self, tag, attrs):
attrs = {k: v for (k,v) in attrs}
if tag == "a" and "href" in attrs:
self.urls.append(attrs["href"])
[docs]class HttpInitiatorClient(BaseClient):
"""
Core implementation of an mPlane JSON-over-HTTP(S) client.
Supports client-initiated workflows. Intended for building
client UIs and bots.
"""
def __init__(self, tls_state, default_url=None,
supervisor=False, exporter=None):
"""
initialize a client with a given
default URL an a given TLS state
"""
super().__init__(tls_state, supervisor=supervisor,
exporter=exporter)
self._default_url = default_url
# specification serial number
# used to create labels programmatically
self._ssn = 0
def set_default_url(self, url):
if isinstance(url, str):
self._default_url = urllib3.util.parse_url(url)
else:
self._default_url = url
[docs] def send_message(self, msg, dst_url=None):
"""
send a message, store any result in client state.
"""
# figure out where to send the message
if not dst_url:
dst_url = self._default_url
if isinstance(dst_url, str):
dst_url = urllib3.util.parse_url(dst_url)
pool = self._tls_state.pool_for(dst_url.scheme, dst_url.host, dst_url.port)
headers = {"Content-Type": "application/x-mplane+json"}
if self._tls_state.forged_identity():
headers[FORGED_DN_HEADER] = self._tls_state.forged_identity()
if dst_url.path is not None:
path = dst_url.path
else:
path = "/"
# send message
res = pool.urlopen('POST', path,
body=mplane.model.unparse_json(msg).encode("utf-8"),
headers=headers)
# process reply
if (res.status == 200 and
res.getheader("Content-Type") == "application/x-mplane+json"):
component_identity = self._tls_state.extract_peer_identity(dst_url)
self.handle_message(mplane.model.parse_json(res.data.decode("utf-8")), component_identity)
else:
# Didn't get an mPlane reply. What now?
pass
[docs] def result_for(self, token_or_label):
"""
return a result for the token if available;
attempt to redeem the receipt for the token otherwise;
if not yet redeemable, return the receipt instead.
"""
# go get a raw receipt or result
rr = super().result_for(token_or_label)
# check if it's a Job or Multijob result
if (isinstance(rr, mplane.model.Result) or
isinstance(rr, mplane.model.Envelope)):
# If it's a Multijob result, there may be a receipt
# to retrieve further data.
# In that case, ignore the current result and
# retrieve up-to-date results from the component
if (rr.get_token() not in self.receipt_tokens() and
rr.get_label() not in self.receipt_labels()):
return rr
else:
rr = self._receipts[rr.get_token()]
elif isinstance(rr, mplane.model.Exception):
return rr
# if we're here, we have a receipt. try to redeem it.
self.send_message(mplane.model.Redemption(receipt=rr))
# see if we got a result
if token_or_label in self._result_labels:
return self._result_labels[token_or_label]
elif token_or_label in self._results:
return self._results[token_or_label]
else:
# Nope. Return the receipt.
return rr
[docs] def invoke_capability(self, cap_tol, when, params, relabel=None):
"""
Given a capability token or label, a temporal scope, a dictionary
of parameters, and an optional new label, derive a specification
and send it to the appropriate destination.
"""
(cap, spec) = self._spec_for(cap_tol, when, params, relabel)
spec.validate()
dst_url = cap.get_link()
self.send_message(spec, dst_url)
return spec
def interrupt_capability(self, cap_tol):
# get the receipt
rr = super().result_for(cap_tol)
interrupt = mplane.model.Interrupt(specification=rr)
dst_url = urllib3.util.Url(scheme=self._default_url.scheme,
host=self._default_url.host,
port=self._default_url.port,
path=self._default_url.path)
self.send_message(interrupt, dst_url)
[docs] def retrieve_capabilities(self, url, urlchain=[], pool=None, identity=None):
"""
Connect to the given URL, retrieve and process the
capabilities/withdrawals found there
"""
# detect loops in capability links
if url in urlchain:
return
if not self._default_url:
self.set_default_url(url)
if isinstance(url, str):
url = urllib3.util.parse_url(url)
if identity is None:
identity = self._tls_state.extract_peer_identity(url)
if pool is None:
if url.host is not None:
pool = self._tls_state.pool_for(url.scheme, url.host, url.port)
else:
raise ValueError("HttpInitiatorClient capability retrieval missing connection pool")
if url.path is not None:
path = url.path
else:
path = "/"
res = pool.request('GET', path)
if res.status == 200:
ctype = res.getheader("Content-Type")
if ctype == "application/x-mplane+json":
# Probably an envelope. Process the message.
self.handle_message(
mplane.model.parse_json(res.data.decode("utf-8")), identity)
elif ctype == "text/html":
# Treat as a list of links to capability messages.
parser = CrawlParser()
parser.feed(res.data.decode("utf-8"))
parser.close()
for capurl in parser.urls:
self.retrieve_capabilities(url=capurl,
urlchain=urlchain + [url],
pool=pool, identity=identity)
[docs]class HttpListenerClient(BaseClient):
"""
Core implementation of an mPlane JSON-over-HTTP(S) client.
Supports component-initiated workflows. Intended for building
supervisors.
"""
def __init__(self, config, tls_state=None,
supervisor=False, exporter=None, io_loop=None):
super().__init__(tls_state, supervisor=supervisor,
exporter=exporter)
listen_port = DEFAULT_PORT
self.registration_path = DEFAULT_REGISTRATION_PATH
self.specification_path = DEFAULT_SPECIFICATION_PATH
self.result_path = DEFAULT_RESULT_PATH
ipaddresses = None
self._link = ""
self.config = config
# if config file contains the needed keys: use them. Otherwise use default values
if self.config is not None and "Client" in self.config and "Listener" in self.config["Client"]:
if "port" in self.config["Client"]["Listener"]:
listen_port = int(self.config["Client"]["Listener"]["port"])
if "capability-path" in self.config["Client"]["Listener"]:
self.registration_path = self.config["Client"]["Listener"]["capability-path"]
if "specification-path" in self.config["Client"]["Listener"]:
self.specification_path = self.config["Client"]["Listener"]["specification-path"]
if "result-path" in self.config["Client"]["Listener"]:
self.result_path = self.config["Client"]["Listener"]["result-path"]
if "interfaces" in self.config["Client"]["Listener"] and self.config["Client"]["Listener"]["interfaces"]:
ipaddresses = self.config["Client"]["Listener"]["interfaces"]
# 'link' construction: if there are multiple IPs to listen on, we have no way to determine
# which will be the correct URI for a component. In this case, let's delegate the construction to the
# request handlers (see InteractionsHandler.get())
if len(ipaddresses) != 1:
self._link = ""
else:
if "TLS" in self.config:
self._link = "https://"
else:
self._link = "http://"
self._link = self._link + ipaddresses[0] + ":"
if not self.result_path.startswith("/"):
self._link = self._link + self.config["Client"]["Listener"]["port"] + "/" + self.result_path
else:
self._link = self._link + self.config["Client"]["Listener"]["port"] + self.result_path
if not self.registration_path.startswith("/"):
self.registration_path = "/" + self.registration_path
if not self.specification_path.startswith("/"):
self.specification_path = "/" + self.specification_path
if not self.result_path.startswith("/"):
self.result_path = "/" + self.result_path
# Outgoing messages per component identifier
self._outgoing = {}
# specification serial number
# used to create labels programmatically
self._ssn = 0
# Capability
self._callback_capability = {}
# Create a request handler pointing at this client
self._tornado_application = tornado.web.Application([
(self.registration_path, InteractionsHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(self.registration_path + "/", InteractionsHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(self.specification_path, InteractionsHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(self.specification_path + "/", InteractionsHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(self.result_path, InteractionsHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(self.result_path + "/", InteractionsHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
])
http_server = tornado.httpserver.HTTPServer(self._tornado_application, ssl_options=tls_state.get_ssl_options())
# run the server
if ipaddresses is not None:
for ip in ipaddresses:
http_server.listen(listen_port, ip)
else:
http_server.listen(listen_port)
if io_loop is not None:
cli_t = Thread(target=self.listen_in_background(io_loop))
timeout_callback = tornado.ioloop.PeriodicCallback(self._check_timeouts, 5000, io_loop)
else:
cli_t = Thread(target=self.listen_in_background)
timeout_callback = tornado.ioloop.PeriodicCallback(self._check_timeouts, 5000)
timeout_callback.start()
cli_t.daemon = True
cli_t.start()
[docs] def listen_in_background(self, io_loop=None):
""" The server listens for requests in background """
if io_loop is None:
tornado.ioloop.IOLoop.instance().start()
def _check_timeouts(self):
""" Checks if capabilities are expired, and if so, delete them """
expired_tokens = []
for token in self._capability_timeouts:
interval = datetime.utcnow() - self._capability_timeouts[token]
if interval.total_seconds() >= 10:
expired_tokens.append(token)
for token in expired_tokens:
cap_withdraw = mplane.model.Withdrawal(capability=self._capabilities[token])
self.handle_message(cap_withdraw, self.identity_for(token))
def _push_outgoing(self, identity, msg):
if identity not in self._outgoing:
self._outgoing[identity] = []
self._outgoing[identity].append(msg)
[docs] def invoke_capability(self, cap_tol, when, params, relabel=None, callback_when=None):
"""
Given a capability token or label, a temporal scope, a dictionary
of parameters, and an optional new label, derive a specification
and queue it for retrieval by the appropriate identity (i.e., the
one associated with the capability).
If the identity has indicated it supports callback control,
the optional callback_when parameter queues a callback spec to
schedule the next callback.
"""
logger.info("will invoke " + cap_tol + " with " +repr(params))
# grab cap, spec, and identity
(cap, spec) = self._spec_for(cap_tol, when, params, relabel)
identity = self.identity_for(cap.get_token())
spec.set_link(self._link)
callback_cap = None
if identity in self._callback_capability:
# prepare a callback spec if we need to
callback_cap = self._callback_capability[identity]
if callback_cap and callback_when:
callback_spec = mplane.model.Specification(capability=callback_cap)
callback_spec.set_when(callback_when)
envelope = mplane.model.Envelope()
envelope.append_message(callback_spec)
envelope.append_message(spec)
self._push_outgoing(identity, envelope)
else:
self._push_outgoing(identity, spec)
return spec
def interrupt_capability(self, cap_tol):
# get the receipt
rr = super().result_for(cap_tol)
identity = self.identity_for(rr.get_token(), receipt=True)
interrupt = mplane.model.Interrupt(specification=rr)
self._push_outgoing(identity, interrupt)
def _add_capability(self, msg, identity):
"""
Override Client's add_capability, check for callback control
"""
if msg.verb() == mplane.model.VERB_CALLBACK:
# FIXME this is kind of dodgy; we should do better checks to
# make sure this is a real callback capability
self._callback_capability[identity] = msg
else:
# not a callback control cap, just add the capability
super()._add_capability(msg, identity)
[docs]class MPlaneHandler(tornado.web.RequestHandler):
"""
Abstract tornado RequestHandler that allows a
handler to respond with an mPlane Message.
"""
def _respond_message(self, msg):
"""
Returns an HTTP response containing a JSON message
"""
self.set_status(200)
self.set_header("Content-Type", "application/x-mplane+json")
self.write(mplane.model.unparse_json(msg))
self.finish()
def _respond_plain_text(self, code, text = None):
"""
Returns an HTTP response containing a plain text message
"""
self.set_status(code)
if text is not None:
self.set_header("Content-Type", "text/plain")
self.write(text)
self.finish()
def _respond_json_text(self, code, text = None):
"""
Returns an HTTP response containing a plain text message
"""
self.set_status(code)
if text is not None:
self.set_header("Content-Type", "application/x-mplane+json")
self.write(text)
self.finish()
# FIXME figure out what this class is for
[docs]class InteractionsHandler(MPlaneHandler):
"""
Handles the probes that want to register to this supervisor
Each capability is registered independently
Exposes the specifications, that will be periodically pulled by the
components
Receives results of specifications
"""
def initialize(self, listenerclient, tlsState):
self._listenerclient = listenerclient
self._tls = tlsState
[docs] def post(self):
"""
Receives POST requests that may contain Capabilities, Results, Receipts and Exceptions
"""
# unwrap json message from body
if self.request.headers["Content-Type"] == "application/x-mplane+json":
env = mplane.model.parse_json(self.request.body.decode("utf-8"))
else:
self._respond_plain_text(400, "Invalid format")
return
if self._listenerclient.registration_path != self._listenerclient.result_path:
# registration and result path are different, we need to check if the requests have been
# sent on the correct path
# if is a Result, Receipt, Exception or Envelope (containing results)
if self.request.path == self._listenerclient.result_path:
if isinstance(env, mplane.model.Result) \
or isinstance(env, mplane.model.Receipt) \
or isinstance(env, mplane.model.Exception):
self._listenerclient.handle_message(env, self._tls.extract_peer_identity(self.request))
self._respond_plain_text(200)
elif isinstance(env, mplane.model.Envelope):
for msg in env.messages():
if not isinstance(msg, mplane.model.Result) \
and not isinstance(msg, mplane.model.Receipt) \
and not isinstance(msg, mplane.model.Exception):
self._respond_plain_text(401, "Not a result (or receipt) received on result path")
return
# fall through
self._listenerclient.handle_message(env, self._tls.extract_peer_identity(self.request))
self._respond_plain_text(200)
return
else:
self._respond_plain_text(401, "Not authorized: not a result (or receipt) received on result path")
return
# if is an Envelope (containing capabilities)
elif self.request.path == self._listenerclient.registration_path:
if isinstance(env, mplane.model.Envelope):
self._listenerclient.handle_message(env, self._tls.extract_peer_identity(self.request))
response = self.generate_response(env)
self._respond_json_text(200, response)
return
else:
self._respond_plain_text(400, "Not a capability / Wrong format")
return
else:
# registration and result path are the same, so we need to differentiate the handling of the
# request based on what's in the request body
if isinstance(env, mplane.model.Result) \
or isinstance(env, mplane.model.Receipt) \
or isinstance(env, mplane.model.Exception):
self._listenerclient.handle_message(env, self._tls.extract_peer_identity(self.request))
self._respond_plain_text(200)
return
elif isinstance(env, mplane.model.Envelope):
self._listenerclient.handle_message(env, self._tls.extract_peer_identity(self.request))
for msg in env.messages():
if isinstance(msg, mplane.model.Result) or isinstance(env, mplane.model.Receipt):
self._respond_plain_text(200)
return
elif isinstance(msg, mplane.model.Capability) or isinstance(msg, mplane.model.Withdrawal):
response = self.generate_response(env)
self._respond_json_text(200, response)
return
else:
self._respond_plain_text(401, "Not authorized")
return
# FIXME noncompliant, see issue #4
[docs] def generate_response(self, env):
"""
Generate the response for the request containing the capabilities
"""
response = ""
for msg in env.messages():
if isinstance(msg, mplane.model.Capability):
response += "\"" + msg.get_label() + "\":{\"registered\":\"ok\"},"
elif isinstance(msg, mplane.model.Withdrawal):
response += "\"" + msg.get_label() + "\":{\"registered\":\"no\", \"reason\":\"Withdrawn\"},"
else:
response += "\"" + msg.get_label() + "\":{\"registered\":\"no\", \"reason\":\"Not a capability\"},"
response = "{" + response[:-1].replace("\n", "") + "}"
return response
# FIXME here's where the 428 comes from, noncompliant, see issue #4
[docs] def get(self):
"""
Receives GET specification requests
"""
# FIXME verify this works -- dodgy merge?
# check if the path is correct (since we can receive a GET request also on registration_path or result_path)
if self.request.path == self._listenerclient.specification_path:
identity = self._tls.extract_peer_identity(self.request)
if identity in self._listenerclient._capabilities_by_identity:
# specification requests from a component are used as keep-alive,
# so let's reset timeouts for capabilities from that component when we receive one
for token in self._listenerclient._capabilities_by_identity[identity]:
self._listenerclient._capability_timeouts[token] = datetime.utcnow()
specs = self._listenerclient._outgoing.pop(identity, [])
env = mplane.model.Envelope()
for spec in specs:
# if the 'link' field is empty, compose it using the host requested by the component/supervisor
if not spec.get_link():
if self._listenerclient.config is not None and "TLS" in self._listenerclient.config:
link = "https://"
else:
link = "http://"
link = link + self.request.host + self._listenerclient.result_path
spec.set_link(link)
logger.info("[fixme] AppendSpec %s" % spec)
env.append_message(spec)
if isinstance(spec, mplane.model.Specification):
logger.info("[fixme] Specification " + spec.get_label() + " successfully pulled by " + identity)
else:
logger.info("[fixme] Interrupt " + spec.get_token() + " successfully pulled by " + identity)
self._respond_json_text(200, mplane.model.unparse_json(env))
else:
self._respond_plain_text(428, "not registered")
else:
self._respond_plain_text(401, "Wrong path for specification requests")