Select Git revision
test_baseline.py
server.py 8.62 KiB
"""Mock Renewal web server for testing recsystems against."""
import asyncio
import contextlib
import logging
import socket
import threading
import time
from quart import Quart, Blueprint, jsonify, g, request, websocket
from jsonrpcclient.clients.websockets_client import WebSocketsClient
from jsonrpcclient.response import Response
from ...articles import ArticleCollection
from ...utils import shutdown
class QuartWebSocketsClient(WebSocketsClient):
"""
Simple adapter of the
`jsonrpcclient.clients.websockets_client.WebsocketsClient` to Quart's
websockets interface.
Only supports sending and waiting on one RPC call at a time, which is
sufficient for basic testing.
"""
async def send_message(self, request, response_expected, **kwargs):
await self.socket.send(request)
if response_expected:
response_text = await self.socket.receive()
return Response(response_text)
return Response('')
api_v1 = Blueprint('api.v1', __name__)
@api_v1.route('/')
async def index():
return jsonify({'version': 1})
@api_v1.route('/user_assignments')
async def user_assignments():
return jsonify([])
@api_v1.route('/articles')
async def articles():
limit = int(request.args.get('limit', 30))
if limit < 1:
limit = 1
if 'since_id' in request.args:
since_id = int(request.args.get('since_id'))
else:
since_id = None
if 'max_id' in request.args:
max_id = int(request.args.get('max_id'))
else:
max_id = None
articles = g.articles[since_id:max_id]
return jsonify(articles[-limit:])
@api_v1.websocket('/event_stream')
async def event_stream():
await websocket.accept()
rpc_client = QuartWebSocketsClient(websocket)
# Until client disconnects, take RPC calls to send it off the
# rpc queue, and put the responses on the rpc_result queue
# (unless the call type is 'notify')
# If neither get/put function is defined then the websocket just
# disconnects immediately
if g.get_rpc_call is None or g.put_rpc_result is None:
return
while True:
command = await g.get_rpc_call()
if command is None:
# None is a poison-pill to disconnect the websocket
return
type_, method, args, kwargs = command
try:
response = await getattr(rpc_client, type_)(method, *args, **kwargs)
result = response.data.result
except asyncio.CancelledError:
raise
except Exception as exc:
result = exc
if type_ == 'request':
await g.put_rpc_result(result)
def _make_server_task(port, articles=None, get_rpc_call=None,
put_rpc_result=None, quiet=False, **kwargs):
"""
Create and return a task to run the server on the given port.
``get_rpc_call`` and ``put_rpc_result`` are functions the websocket
server should use to get RPC calls to send to the websocket client and
return the results of those calls.
"""
if articles is None:
articles = ArticleCollection()
app = Quart(__name__)
app.register_blueprint(api_v1, url_prefix='/api/v1')
@app.before_request
async def before_request():
g.articles = articles
g.get_rpc_call = get_rpc_call
g.put_rpc_result = put_rpc_result
@app.before_websocket
async def before_websocket():
if getattr(g, 'client_connected', False):
raise RuntimeError(
'dummy_server only supports one websocket connection at a '
'time')
g.client_connected = True
await before_request()
task = app.run_task(port=port, **kwargs)
if quiet:
# Disable the access log output from Quart
log = logging.getLogger('quart.server').disabled = True
return task
def dummy_server(port=None, articles=None, quiet=False, **kwargs):
loop = asyncio.get_event_loop()
try:
server_task = _make_server_task(port, articles, quiet=quiet, **kwargs)
loop.run_until_complete(server_task)
finally:
# Shut down the event loop
loop.run_until_complete(shutdown(loop))
loop.close()
class ServerThread(threading.Thread):
_rpc_call_queue = None
_rpc_result_queue = None
_stop_event = None
def __init__(self, port, quiet=True, **kwargs):
super().__init__()
self._port = kwargs['port'] = port
kwargs['quiet'] = quiet
self._server_kwargs = kwargs
self._loop = asyncio.new_event_loop()
def run(self):
asyncio.set_event_loop(self._loop)
self._rpc_call_queue = asyncio.Queue()
self._rpc_result_queue = asyncio.Queue()
self._stop_event = asyncio.Event()
dummy_server(get_rpc_call=self._rpc_call_queue.get,
put_rpc_result=self._rpc_result_queue.put,
shutdown_trigger=self._stop_event.wait,
**self._server_kwargs)
def wait(self):
"""Wait for the server port to be bound."""
# Unfortunately there is no great way I know of to synchronously block
# until a port is bound without a busy loop.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True:
try:
sock.connect(('localhost', self._port))
break
except ConnectionRefusedError:
time.sleep(0.1)
def stop(self):
"""Shut down the server cleanly and stop the thread."""
if self._stop_event is None:
raise RuntimeError(f'{self.__class__.__name__} not running')
if self._loop.is_running():
self._loop.call_soon_threadsafe(self._stop_event.set)
def request(self, method, *args, timeout=None, **kwargs):
return self._rpc_call('request', method, *args, timeout=timeout,
**kwargs)
def notify(self, method, *args, timeout=None, **kwargs):
return self._rpc_call('notify', method, *args, timeout=timeout,
**kwargs)
def _rpc_call(self, type_, method, *args, timeout=None, **kwargs):
asyncio.run_coroutine_threadsafe(
self._rpc_call_queue.put((type_, method, args, kwargs)),
self._loop)
if type_ == 'request':
fut = asyncio.run_coroutine_threadsafe(
self._rpc_result_queue.get(), self._loop)
result = fut.result(timeout=timeout)
if isinstance(result, Exception):
raise result
return result
else:
return
def dummy_server_thread(port=None, articles=None, quiet=True, **kwargs):
"""
Starts `dummy_server` but in its own thread.
Returns a `ServerThread` object which has `ServerThread.request` and
`ServerThread.notify` methods which send JSON-RPC calls to the connected
websocket client (if any). Only one connected client is supported.
Examples
--------
First start up the server thread:
>>> from renewal_recsystem.utils.testing import get_free_port_safe
>>> from renewal_recsystem.utils.testing.server import dummy_server_thread
>>> with get_free_port_safe() as port:
... server_thread = dummy_server_thread(port=port)
... server_thread.wait()
...
Now let's make a simple websocket client acting as a JSON-RPC server:
>>> import websockets
>>> from jsonrpcserver import method, async_dispatch as dispatch
>>> async def websocket_client(ws):
... @method
... async def hello(name):
... return f'Hello {name}!'
...
... while True:
... try:
... response = await dispatch(await ws.recv())
... except websockets.ConnectionClosed:
... break
... if response.wanted:
... await ws.send(str(response))
And spin it up in its own thread. It should connect to the server.
>>> from renewal_recsystem.utils.testing import (
... websocket_test_client_thread)
>>> client_thread = websocket_test_client_thread(websocket_client,
... uri=f'ws://localhost:{port}/api/v1/event_stream')
Now we can use the reference to the server_thread to send RPC calls and
receive results:
>>> server_thread.request('hello', name='Fred', timeout=5)
'Hello Fred!'
Make sure to call `ServerThread.stop` before joining the thread, or else
it will block forever. This ensures that the server shuts down cleanly:
>>> server_thread.stop()
>>> server_thread.join()
>>> client_thread.join()
"""
thread = ServerThread(port=port, articles=articles, quiet=quiet, **kwargs)
thread.daemon = True
thread.start()
return thread