Skip to content
Snippets Groups Projects
Select Git revision
  • dea7d6cd2d198a7d7432c5ce6006da57905028c3
  • master default protected
  • embray/issue-22
  • documentation/init
  • ci/tag-master
  • V1.0
6 results

functional.py

Blame
  • server.py 8.62 KiB
    """Mock Renewal web server for testing recsystems against."""
    
    import asyncio
    import contextlib
    import logging
    import socket
    import threading
    import time
    
    from quart import Quart, Blueprint, jsonify, g, request, websocket
    from jsonrpcclient.clients.websockets_client import WebSocketsClient
    from jsonrpcclient.response import Response
    
    from ...articles import ArticleCollection
    from ...utils import shutdown
    
    
    class QuartWebSocketsClient(WebSocketsClient):
        """
        Bridge between
        `jsonrpcclient.clients.websockets_client.WebSocketsClient` and the
        websocket object provided by Quart.

        A single RPC call is sent and awaited at a time; nothing more is needed
        for basic testing.
        """

        async def send_message(self, request, response_expected, **kwargs):
            """
            Send ``request`` over the websocket and wrap the reply in a
            `Response`.

            When no response is expected an empty `Response` is returned
            without reading from the socket.
            """

            await self.socket.send(request)

            if not response_expected:
                return Response('')

            return Response(await self.socket.receive())
    
    
    
    # Blueprint collecting the version 1 API routes defined below; it is
    # registered under the ``/api/v1`` URL prefix in `_make_server_task`.
    api_v1 = Blueprint('api.v1', __name__)
    
    
    @api_v1.route('/')
    async def index():
        """Return a JSON object reporting the API version."""

        version_info = {'version': 1}
        return jsonify(version_info)
    
    
    @api_v1.route('/user_assignments')
    async def user_assignments():
        """Return an empty JSON list; the mock server has no assignments."""

        no_assignments = []
        return jsonify(no_assignments)
    
    
    @api_v1.route('/articles')
    async def articles():
        """
        Return a JSON list of articles taken from ``g.articles``.

        Query parameters (all optional, all parsed with `int`):

        * ``limit`` -- maximum number of items returned (default 30; values
          below 1 are raised to 1).
        * ``since_id`` -- start of the slice taken from ``g.articles``.
        * ``max_id`` -- end of the slice taken from ``g.articles``.

        The collection is sliced as ``g.articles[since_id:max_id]`` and the last
        ``limit`` items of that slice are returned.
        """

        query = request.args

        # Never ask for fewer than one article
        limit = max(int(query.get('limit', 30)), 1)

        # A bound that is absent from the query string leaves that side of the
        # slice open
        since_id = int(query.get('since_id')) if 'since_id' in query else None
        max_id = int(query.get('max_id')) if 'max_id' in query else None

        selected = g.articles[since_id:max_id]
        return jsonify(selected[-limit:])
    
    
    @api_v1.websocket('/event_stream')
    async def event_stream():
        """
        Relay queued RPC calls to the connected websocket client.

        Calls are pulled with ``g.get_rpc_call`` and sent through a
        `QuartWebSocketsClient`; for calls of type ``'request'`` the outcome
        (the result, or the exception that was raised) is handed to
        ``g.put_rpc_result``.  Receiving ``None`` from ``g.get_rpc_call`` ends
        the connection.
        """

        await websocket.accept()
        rpc_client = QuartWebSocketsClient(websocket)

        # Without both the getter and the putter there is nothing to relay, so
        # the websocket is dropped right away
        if g.get_rpc_call is None or g.put_rpc_result is None:
            return

        while True:
            call = await g.get_rpc_call()
            if call is None:
                # Poison pill: stop serving and disconnect the websocket
                break

            kind, rpc_method, call_args, call_kwargs = call
            try:
                send = getattr(rpc_client, kind)
                reply = await send(rpc_method, *call_args, **call_kwargs)
                outcome = reply.data.result
            except asyncio.CancelledError:
                # Cancellation must propagate so the handler can be torn down
                raise
            except Exception as err:
                # Any other failure is reported back as the call's outcome
                outcome = err

            if kind != 'request':
                # Only 'request' calls report an outcome to the caller
                continue

            await g.put_rpc_result(outcome)
    
    
    def _make_server_task(port, articles=None, get_rpc_call=None,
                          put_rpc_result=None, quiet=False, **kwargs):
        """
        Create and return a task to run the server on the given port.

        ``articles`` is the collection exposed to the request handlers as
        ``g.articles``; a new empty `ArticleCollection` is used when omitted.

        ``get_rpc_call`` and ``put_rpc_result`` are functions the websocket
        server should use to get RPC calls to send to the websocket client and
        return the results of those calls.

        If ``quiet`` is true the ``quart.server`` logger is disabled.  Any
        additional keyword arguments are passed through to ``app.run_task``,
        whose return value is returned.
        """

        if articles is None:
            articles = ArticleCollection()

        app = Quart(__name__)
        app.register_blueprint(api_v1, url_prefix='/api/v1')

        @app.before_request
        async def before_request():
            # Expose the shared test state to the route handlers through ``g``
            g.articles = articles
            g.get_rpc_call = get_rpc_call
            g.put_rpc_result = put_rpc_result

        @app.before_websocket
        async def before_websocket():
            # NOTE(review): ``g`` is usually scoped to the current application
            # context, so this flag may not survive from one connection to the
            # next and the guard may never trigger -- confirm against the
            # Quart version in use.
            if getattr(g, 'client_connected', False):
                raise RuntimeError(
                    'dummy_server only supports one websocket connection at a '
                    'time')
            g.client_connected = True
            await before_request()

        task = app.run_task(port=port, **kwargs)

        if quiet:
            # Disable the access log output from Quart
            logging.getLogger('quart.server').disabled = True

        return task
    
    
    def dummy_server(port=None, articles=None, quiet=False, **kwargs):
        """
        Run the mock server on the current event loop until it finishes.

        All arguments are forwarded to `_make_server_task`.  Whether the server
        task completes or raises, the loop is shut down via `shutdown` and then
        closed.
        """

        event_loop = asyncio.get_event_loop()
        try:
            event_loop.run_until_complete(
                _make_server_task(port, articles, quiet=quiet, **kwargs))
        finally:
            # Let pending work wind down before the loop is closed for good
            event_loop.run_until_complete(shutdown(event_loop))
            event_loop.close()
    
    
    class ServerThread(threading.Thread):
        """
        Thread running `dummy_server` on its own asyncio event loop.

        RPC calls for the connected websocket client are exchanged with the
        server through two `asyncio.Queue` objects owned by the thread's loop;
        `request` and `notify` may be called from other threads.
        """

        # These are created in `run`, on the server thread; they stay ``None``
        # until the thread has actually started running.
        _rpc_call_queue = None
        _rpc_result_queue = None
        _stop_event = None

        def __init__(self, port, quiet=True, **kwargs):
            """
            Store the server options; the server itself starts in `run`.

            ``port`` and ``quiet`` are merged into ``kwargs``, all of which are
            later passed to `dummy_server`.
            """

            super().__init__()
            self._port = kwargs['port'] = port
            kwargs['quiet'] = quiet
            self._server_kwargs = kwargs
            self._loop = asyncio.new_event_loop()

        def run(self):
            """Create the queues and stop event, then run `dummy_server`."""

            asyncio.set_event_loop(self._loop)
            self._rpc_call_queue = asyncio.Queue()
            self._rpc_result_queue = asyncio.Queue()
            self._stop_event = asyncio.Event()
            dummy_server(get_rpc_call=self._rpc_call_queue.get,
                         put_rpc_result=self._rpc_result_queue.put,
                         shutdown_trigger=self._stop_event.wait,
                         **self._server_kwargs)

        def wait(self):
            """Wait for the server port to be bound."""

            # Unfortunately there is no great way I know of to synchronously block
            # until a port is bound without a busy loop.
            address = ('localhost', self._port)
            while True:
                # A fresh socket is used for every attempt and is always closed
                # again, so no descriptor is leaked and no socket is reused
                # after a failed connect.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    try:
                        sock.connect(address)
                    except ConnectionRefusedError:
                        pass
                    else:
                        return

                time.sleep(0.1)

        def stop(self):
            """
            Shut down the server cleanly and stop the thread.

            Raises `RuntimeError` if the thread has not started running.
            """

            if self._stop_event is None:
                raise RuntimeError(f'{self.__class__.__name__} not running')

            if self._loop.is_running():
                # The event belongs to the server's loop, so it must be set
                # from that loop's thread.
                self._loop.call_soon_threadsafe(self._stop_event.set)

        def request(self, method, *args, timeout=None, **kwargs):
            """
            Queue a ``'request'`` RPC call and return its result.

            Blocks for up to ``timeout`` seconds (forever if ``None``) waiting
            for the result; a result that is an exception is raised.
            """

            return self._rpc_call('request', method, *args, timeout=timeout,
                    **kwargs)

        def notify(self, method, *args, timeout=None, **kwargs):
            """Queue a ``'notify'`` RPC call; returns ``None`` immediately."""

            return self._rpc_call('notify', method, *args, timeout=timeout,
                    **kwargs)

        def _rpc_call(self, type_, method, *args, timeout=None, **kwargs):
            """
            Put an RPC call on the call queue and, for ``'request'`` calls,
            wait for an item from the result queue.

            Raises `RuntimeError` if the thread has not started running.
            """

            if self._rpc_call_queue is None or self._rpc_result_queue is None:
                # Same error as `stop` rather than an AttributeError on None
                raise RuntimeError(f'{self.__class__.__name__} not running')

            asyncio.run_coroutine_threadsafe(
                    self._rpc_call_queue.put((type_, method, args, kwargs)),
                    self._loop)

            if type_ != 'request':
                return None

            fut = asyncio.run_coroutine_threadsafe(
                    self._rpc_result_queue.get(), self._loop)
            result = fut.result(timeout=timeout)
            if isinstance(result, Exception):
                raise result
            return result
    
    
    def dummy_server_thread(port=None, articles=None, quiet=True, **kwargs):
        """
        Run `dummy_server` in a background thread.

        The returned `ServerThread` has already been started as a daemon
        thread.  Its `ServerThread.request` and `ServerThread.notify` methods
        send JSON-RPC calls to the connected websocket client (if any); at most
        one connected client is supported.

        Examples
        --------

        Start the server thread and wait until its port is bound:

        >>> from renewal_recsystem.utils.testing import get_free_port_safe
        >>> from renewal_recsystem.utils.testing.server import dummy_server_thread
        >>> with get_free_port_safe() as port:
        ...     server_thread = dummy_server_thread(port=port)
        ...     server_thread.wait()
        ...

        Next, define a small websocket client that acts as a JSON-RPC server:

        >>> import websockets
        >>> from jsonrpcserver import method, async_dispatch as dispatch
        >>> async def websocket_client(ws):
        ...     @method
        ...     async def hello(name):
        ...         return f'Hello {name}!'
        ...
        ...     while True:
        ...         try:
        ...             response = await dispatch(await ws.recv())
        ...         except websockets.ConnectionClosed:
        ...             break
        ...         if response.wanted:
        ...             await ws.send(str(response))

        Run the client in a thread of its own so that it connects to the server:

        >>> from renewal_recsystem.utils.testing import (
        ...      websocket_test_client_thread)
        >>> client_thread = websocket_test_client_thread(websocket_client,
        ...     uri=f'ws://localhost:{port}/api/v1/event_stream')

        RPC calls can now be sent, and their results received, through
        ``server_thread``:

        >>> server_thread.request('hello', name='Fred', timeout=5)
        'Hello Fred!'

        Always call `ServerThread.stop` before joining the thread; otherwise the
        join blocks forever.  Stopping first lets the server shut down cleanly:

        >>> server_thread.stop()
        >>> server_thread.join()
        >>> client_thread.join()
        """

        server = ServerThread(port=port, articles=articles, quiet=quiet,
                              **kwargs)
        # Daemon thread: the flag has to be set before the thread is started
        server.daemon = True
        server.start()
        return server