Skip to content
Snippets Groups Projects
Commit 13019e7a authored by E. Madison Bray's avatar E. Madison Bray
Browse files

[testing] add suite of functional tests to perform against recsystems;

particularly against their JSON-RPC interface

while these tests are not useful for testing the recommendations themselves
(see the explanation in test_recommendations--this is something implementers
of individual recsystems are responsible for testing), the suite does provide
a good smoke test as to whether the recsystem has implemented all the JSON-RPC
interfaces correctly

it is designed to in principle be run against any recsystem written in any
language

the test itself is written in Python and requires Python + pytest to run, but
it runs the recsystem itself in a subprocess and can be adapted to any
executable providing the recsystem under test
parent ca68c1a9
No related branches found
No related tags found
No related merge requests found
...@@ -10,6 +10,7 @@ import random ...@@ -10,6 +10,7 @@ import random
import socket import socket
import string import string
import sys import sys
import time
import threading import threading
from datetime import datetime from datetime import datetime
...@@ -418,3 +419,80 @@ def random_user_id(length=28, chars=(string.ascii_letters + string.digits)): ...@@ -418,3 +419,80 @@ def random_user_id(length=28, chars=(string.ascii_letters + string.digits)):
""" """
return ''.join(random.sample(chars, length)) return ''.join(random.sample(chars, length))
def retry(count=0, interval=None, exc_type=BaseException):
    """
    Decorator which retries the wrapped function up to ``count`` times if
    an exception occurs.

    The wrapped function should not require any arguments and is executed
    right away--this is a workaround to the fact that it is not easy to write
    a context manager to re-execute code (it is possible to do this but not
    without massive hacks; see
    https://gist.github.com/embray/f95d9cbe677542ce19a1).

    Because the function is called immediately, the decorated name ends up
    bound to the *return value* of the function, not to the function itself.

    If given, will wait ``interval`` seconds before each retry.

    If ``exc_type`` is given it will only retry if the exception is a subclass
    of the given exception type; any other exception propagates immediately.

    A ``count`` that is zero, negative or `None` means the function is called
    exactly once with no retries.

    Examples
    --------

    >>> from renewal_recsystem.utils.testing import retry
    >>> tries = 0
    >>> def test_func():
    ...     global tries
    ...     tries += 1
    ...     assert tries > 3
    ...
    >>> @retry(count=3, interval=0.1)
    ... def retry_test_func():
    ...     test_func()
    ...
    >>> tries
    4
    >>> tries = 0
    >>> @retry(count=2, interval=0.1)
    ... def retry_test_func():
    ...     test_func()
    ...
    Traceback (most recent call last):
    ...
    AssertionError
    >>> tries
    3

    If the error doesn't match ``exc_type`` then the function won't be retried:

    >>> tries = 0
    >>> @retry(count=10, exc_type=RuntimeError)
    ... def retry_test_func():
    ...     test_func()
    ...
    Traceback (most recent call last):
    ...
    AssertionError
    """

    def decorator(func):
        # Each application of the decorator gets its own retry budget.
        # Decrementing the enclosing ``count`` directly would exhaust it for
        # every later use of the same decorator object.  Non-positive counts
        # are clamped to zero so they cannot cause an endless retry loop.
        remaining = count if count and count > 0 else 0

        while True:
            try:
                # Immediately call the wrapped function--retry starts running
                # as soon as a function is decorated
                return func()
            except exc_type:
                # Exceptions not matching exc_type never reach this handler
                # and therefore propagate without being retried
                if not remaining:
                    raise

                if interval is not None:
                    time.sleep(interval)

                remaining -= 1

    return decorator
"""Base test suite for functional tests against recsystems."""
import abc
import copy
import os
import os.path as pth
import random
import shutil
import subprocess
import sys
import tempfile
import pytest
from . import get_free_port_safe, generate_articles, random_user_id, retry
from .server import test_server_thread
class RecsystemFunctionalTest(metaclass=abc.ABCMeta):
    """
    Base class for running a suite of functional tests against a recsystem's
    implementation of the JSON-RPC API.

    Subclasses should implement the ``start_recsystem`` method, which is passed
    the URL of the dummy server and runs whatever commands necessary to start
    up the recsystem connecting to that port (on localhost).

    Once the recsystem is running it should attach to the websocket interface
    on the test server (``ws://localhost:<port>/api/v1/event_stream``) and it
    will be sent various RPC calls to which it must respond with an
    appropriate response.  See the docstrings of the individual test methods
    for what the expected response is to the test.  Tests are run in the order
    they appear in the class definition.  The ``stats()`` RPC call is used to
    check many test results, so as the tests are run the ``stats()`` returned
    by the recsystem should evolve according to the tests in a predictable
    fashion.

    Recsystem implementers may wish to run their recsystem in a "testing" mode
    separate from their production deployment, as the recsystem will be sent
    fake data during testing that it should not poison its production
    deployment with (e.g., if it is using a database to store articles).

    A "testing" mode may also perform additional tests internal to the
    recsystem in response to each RPC call (e.g., making tests of its internal
    state as expected by each RPC call).  If any of the recsystem's internal
    tests fail it should exit; each test method also checks whether the
    recsystem is still running.  This also tests that the recsystem does not
    crash unexpectedly during the tests.

    .. note::

        The test changes the current working directory to a temporary
        directory; subclasses may safely write anything into `os.getcwd`.
        The temporary directory is removed again when the test class is torn
        down.

    .. note::

        The test server does not currently test authentication, so while under
        test the recsystem may use a dummy authentication token or none at all.

    .. note::

        The test server implements dummy versions of most of the REST API
        endpoints, most of which return empty results.  The recsystem may make
        calls to these APIs, but all user assignments and articles that the
        recsystem is tested against are received via the JSON-RPC API.
    """

    API_BASE_URI_TEMPLATE = 'http://localhost:{port}/api/v1/'
    """
    The URI that recsystems under testing will connect to, with the port left
    as a template variable since it will be determined at runtime.
    """

    TIMEOUT = 10
    """Default timeout to set on RPC requests."""

    RETRY_INTERVAL = 1
    """Default interval on which to retry stats calls that fail."""

    @classmethod
    def setup_class(cls):
        """
        Start the test server and the recsystem under test.

        The working directory is switched to a fresh temporary directory
        first.  If anything fails part-way through, `teardown_class` is called
        explicitly, since pytest does not run it when ``setup_class`` raises.
        """

        cls.prev_cwd = os.getcwd()
        # Remember the directory so that teardown_class can delete it
        cls.tmpdir = tempfile.mkdtemp()

        try:
            os.chdir(cls.tmpdir)

            # start up the test server
            # NOTE(review): the extent of this ``with`` block was
            # reconstructed; it is assumed the port only needs to stay
            # reserved until the server is up--confirm against
            # get_free_port_safe
            with get_free_port_safe() as port:
                cls.server = test_server_thread(port)
                cls.server.wait()

            # start up the recsystem using the given port
            uri = cls.API_BASE_URI_TEMPLATE.format(port=port)
            cls.recsystem = cls.start_recsystem(uri)
            assert isinstance(cls.recsystem, subprocess.Popen)
            assert cls.recsystem.poll() is None

            cls.seen_articles = set()
            cls.assigned_users = set()
        except BaseException:
            # Don't leave a changed cwd, a running server thread or a
            # stray subprocess behind when setup fails
            cls.teardown_class()
            raise

    @classmethod
    def teardown_class(cls):
        """
        Stop the recsystem and the test server, restore the previous working
        directory, remove the temporary directory and drop all class-level
        state created by `setup_class`.

        Every step tolerates the corresponding attribute being absent so that
        this can also clean up after a partially completed `setup_class`.
        """

        # Try to shut down the recsystem; give it up to a minute to
        # terminate cleanly, then try SIGKILL
        try:
            if hasattr(cls, 'recsystem'):
                cls.recsystem.terminate()
                try:
                    cls.recsystem.wait(timeout=60)
                except subprocess.TimeoutExpired:
                    cls.recsystem.kill()
                    # Reap the killed process so it does not linger as a
                    # zombie
                    cls.recsystem.wait()

            if hasattr(cls, 'server'):
                cls.server.stop()
                cls.server.join()
        finally:
            if hasattr(cls, 'prev_cwd'):
                os.chdir(cls.prev_cwd)

            # Only remove the temporary directory after leaving it
            tmpdir = getattr(cls, 'tmpdir', None)
            if tmpdir is not None:
                shutil.rmtree(tmpdir, ignore_errors=True)

            for attr in ['server', 'recsystem', 'seen_articles',
                         'assigned_users', 'prev_cwd', 'tmpdir']:
                try:
                    delattr(cls, attr)
                except AttributeError:
                    pass

    @classmethod
    @abc.abstractmethod
    def start_recsystem(cls, uri):
        """
        Should return a `subprocess.Popen` object running the recsystem in a
        subprocess.
        """

    @staticmethod
    def compare_stats(got, want):
        """
        Return True if two stats dicts are equal.

        If any of the values in the ``want`` dict are sets, the corresponding
        values in the ``got`` dict are converted to sets.

        Any values in the ``got`` dict that are not in the ``want`` dict are
        ignored (recsystems may return additional stats if they wish).
        """

        assert isinstance(got, dict)
        # Work on a copy so the caller's dict is never modified
        got = copy.deepcopy(got)

        for k, v in want.items():
            if isinstance(v, set) and k in got:
                try:
                    got[k] = set(got[k])
                except TypeError:
                    # If the value is not convertable to a set we will deal
                    # with that later when we compare the dict values
                    pass

        for k in list(got):
            if k not in want:
                del got[k]

        return got == want

    def assert_stats_equal(self, want, retry_count=0, retry_interval=None):
        """
        Request the recsystem's stats and compare them to the desired stats
        using `compare_stats`.

        If ``retry_count > 0`` this check will be repeated until the check
        succeeds.  This is useful because sometimes the recsystem can take a
        while to catch up, especially to ``notify()`` RPC calls, which return
        immediately on the client side, but may take the JSON-RPC server-side
        additional time to process.

        ``retry_interval`` is waited between retries, with a generous default
        of 1 second.
        """

        if retry_interval is None:
            retry_interval = self.RETRY_INTERVAL

        # NOTE: the retry decorator calls the function immediately, so
        # merely defining it here performs the (possibly repeated) check
        @retry(count=retry_count, interval=retry_interval,
               exc_type=AssertionError)
        def retry_assert_stats_equal():
            got = self.request('stats')
            assert self.compare_stats(got, want)

    def request(self, method, *args, **kwargs):
        """
        Shortcut for self.server.request with the default timeout
        applied.
        """

        return self.server.request(method, *args, timeout=self.TIMEOUT,
                                   **kwargs)

    def notify(self, method, *args, **kwargs):
        """
        Shortcut for self.server.notify with the default timeout
        applied.
        """

        return self.server.notify(method, *args, timeout=self.TIMEOUT,
                                  **kwargs)

    @pytest.fixture(autouse=True)
    def poll_recsystem(self):
        """
        Check that the recsystem has not exited before and after each test.

        If we don't check before the test it may hang waiting for the recsystem
        to respond.
        """

        assert self.recsystem.poll() is None
        yield
        assert self.recsystem.poll() is None

    def test_stats(self):
        """Test that the stats are empty/zero upon recsystem initialization."""

        self.assert_stats_equal({
            'ping_count': 0,
            'seen_articles': set(),
            'assigned_users': set()
        })

    def test_ping(self):
        """
        Ping the recsystem a random number of times between two and ten; the
        recsystem stats must reflect this.

        The ping responses must be correct.
        """

        ping_count = random.randint(2, 10)
        for _ in range(ping_count):
            assert self.request('ping') == 'pong'

        # NOTE: compare_stats ignores any keys returned by the recsystem
        # that are not in the expected dict, so we can use this shorthand
        # to test only specific stats
        self.assert_stats_equal({'ping_count': ping_count})

    def test_new_article(self):
        """
        Test that new articles are received by the recsystem.

        Between two and ten random articles are sent, and the recsystem status
        must reflect that it saw new articles with the given article_ids.
        """

        article_id_start = random.randint(1, 10000)
        articles = generate_articles(article_id_start)
        for _ in range(random.randint(2, 10)):
            article = next(articles)
            self.notify('new_article', article)
            self.seen_articles.add(article['article_id'])

        # After a sequence of notify commands it is necessary to give the
        # recsystem some time to update, since notify calls return immediately
        # regardless how long the recsystem takes to handle them on their end
        self.assert_stats_equal({'seen_articles': self.seen_articles},
                                retry_count=len(self.seen_articles))

    def test_article_interaction(self):
        """
        Currently just tests that the ``article_interaction`` notification
        succeeds against different interaction types.

        Article interactions are not currently tracked in the stats, so there
        is nothing meaningful to test in how the recsystem responds to article
        interactions.
        """

        for interaction in [
                {'prev_rating': 0, 'rating': 1},
                {'prev_rating': 0, 'rating': -1},
                {'prev_rating': -1, 'rating': 0},
                {'prev_rating': -1, 'rating': 1},
                {'prev_rating': 1, 'rating': 0},
                {'prev_rating': 1, 'rating': -1},
                {'bookmarked': True},
                {'bookmarked': False},
                {'clicked': True}
        ]:
            interaction.update({
                'user_id': random_user_id(),
                'article_id': random.randint(1, 10000)
            })
            self.notify('article_interaction', interaction)

    def test_assigned_user(self):
        """
        Test assigning users to the recsystem.

        Between two and ten random users are assigned to the recsystem.  The
        recsystem stats must reflect these assignments exactly.
        """

        for _ in range(random.randint(2, 10)):
            user_id = random_user_id()
            self.notify('assigned_user', user_id)
            self.assigned_users.add(user_id)

        self.assert_stats_equal({'assigned_users': self.assigned_users},
                                retry_count=len(self.assigned_users))

    def test_unassigned_user(self):
        """
        Test unassigning users from the recsystem.

        A random number (at least one) of the previously assigned users are
        unassigned from the recsystem, always leaving at least one user
        assigned.  The recsystem stats must reflect the new assignments
        exactly.
        """

        assert len(self.assigned_users) >= 2

        # unassign a random number of users, leaving at least one assigned
        # user; random.sample requires a sequence (sets are rejected since
        # Python 3.11) so the set is converted to a list first
        unassigned_users = random.sample(
            sorted(self.assigned_users),
            random.randint(1, len(self.assigned_users) - 1))

        for user_id in unassigned_users:
            self.notify('unassigned_user', user_id)
            self.assigned_users.remove(user_id)

        # sanity check that at least one assigned user was left
        assert len(self.assigned_users) >= 1

        self.assert_stats_equal({'assigned_users': self.assigned_users},
                                retry_count=len(unassigned_users))

    def test_recommend(self):
        """
        Test the ``recommend`` RPC call.

        This test is a little loose because it makes no requirements as to how
        recsystems decide what articles to recommend, and a recsystem on cold
        start with a small number of articles may not make any
        recommendations.

        The only requirements are that:

        * The returned value is a list (it may be an empty list)

        * If the list is non-empty it must contain only integers

        * If the list is non-empty all values must be article_ids of articles
          seen by the recsystem since the start of the test

        * If the ``limit`` argument is sent, the number of articles returned
          must be no greater than the limit

        * If the ``since_id`` argument is sent all article_ids (if any) must
          be greater than ``since_id``

        * If the ``max_id`` argument is sent, all article_ids (if any) must be
          less than ``max_id``

        Each recommendation request will be made for one of the users assigned
        to the recsystem by the `test_assigned_user` test.
        """

        assert len(self.assigned_users) >= 1

        # Send a few more random articles
        start_id = 10000 + random.randint(1, 10000)
        end_id = start_id + random.randint(5, 100)
        articles = generate_articles(start_id)
        for _ in range(start_id, end_id):
            article = next(articles)
            self.notify('new_article', article)
            self.seen_articles.add(article['article_id'])

        self.assert_stats_equal({'seen_articles': self.seen_articles},
                                retry_count=len(self.seen_articles))

        def assert_valid_recommendations(recs, limit=None, since_id=None,
                                         max_id=None):
            assert isinstance(recs, list)
            for rec in recs:
                # Enforce the documented "only integers" requirement
                assert isinstance(rec, int)
                assert rec in self.seen_articles

            if limit is not None:
                assert len(recs) <= limit

            if since_id is not None:
                assert all(rec > since_id for rec in recs)

            if max_id is not None:
                assert all(rec < max_id for rec in recs)

        # random.choice needs a sequence, so convert the set first
        user_id = random.choice(sorted(self.assigned_users))

        recs = self.request('recommend', user_id=user_id)
        assert_valid_recommendations(recs)

        recs = self.request('recommend', user_id=user_id, limit=10)
        assert_valid_recommendations(recs, limit=10)

        since_id = start_id + random.randint(2, 4)
        recs = self.request('recommend', user_id=user_id, limit=10,
                            since_id=since_id)
        assert_valid_recommendations(recs, limit=10, since_id=since_id)

        max_id = end_id - random.randint(2, 4)
        recs = self.request('recommend', user_id=user_id, limit=10,
                            max_id=max_id)
        assert_valid_recommendations(recs, limit=10, max_id=max_id)
class RenewalRecsystemFunctionalTest(RecsystemFunctionalTest):
    """
    Implements `RecsystemFunctionalTest` for subclasses of
    `renewal_recsystem.recsystem.RenewalRecsystem`.

    It assumes that the recsystem has the default argument signature of
    ``RenewalRecsystem``, that is it accepts at least ``--api-base-uri`` and
    ``--token``.

    Additional CLI arguments can be added by overriding the
    `RenewalRecsystemFunctionalTest.get_cmd` method.
    """

    # abc.abstractproperty is deprecated; stacking property on top of
    # abstractmethod is the documented replacement and keeps the class
    # abstract until CMD is overridden
    @property
    @abc.abstractmethod
    def CMD(self):
        """
        Base command for running the recsystem as a list; additional
        command-line arguments are added in the
        `RenewalRecsystemFunctionalTest.get_cmd` method.
        """

    @classmethod
    def get_cmd(cls, uri):
        """
        Return the full command line, as a list, used to start the recsystem.

        A dummy token file is written into the current working directory and
        passed via ``--token``; ``uri`` is passed via ``--api-base-uri``.
        """

        token_file = pth.join(os.getcwd(), 'dummy-token.jwt')
        with open(token_file, 'w') as fobj:
            fobj.write('dummy-token')

        # list() allows CMD to be given as any sequence (e.g. a tuple) and
        # guarantees the class attribute itself is never modified
        return list(cls.CMD) + ['--api-base-uri', uri, '--token', token_file]

    @classmethod
    def start_recsystem(cls, uri):
        """
        Launch the recsystem via the command from `get_cmd` and return the
        `subprocess.Popen` object, with its output forwarded to this
        process's stdout/stderr.
        """

        cmd = cls.get_cmd(uri)
        return subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
import asyncio import asyncio
import itertools import itertools
import sys
from urllib import parse as urlparse from urllib import parse as urlparse
try: try:
...@@ -19,6 +20,8 @@ import pytest ...@@ -19,6 +20,8 @@ import pytest
from renewal_recsystem.articles import ArticleCollection from renewal_recsystem.articles import ArticleCollection
from renewal_recsystem.baseline import BaselineRecsystem as _BaselineRecsystem from renewal_recsystem.baseline import BaselineRecsystem as _BaselineRecsystem
from renewal_recsystem.utils.testing import generate_articles, random_user_id from renewal_recsystem.utils.testing import generate_articles, random_user_id
from renewal_recsystem.utils.testing.functional import (
RenewalRecsystemFunctionalTest)
# It is not possible to use the monkeypatch fixture in a class-scoped fixture; # It is not possible to use the monkeypatch fixture in a class-scoped fixture;
...@@ -241,3 +244,12 @@ class TestBaselineRecsystem: ...@@ -241,3 +244,12 @@ class TestBaselineRecsystem:
# warning but should not otherwise be a problem: # warning but should not otherwise be a problem:
self.run(baseline.unassigned_user(user_id)) self.run(baseline.unassigned_user(user_id))
assert user_id not in baseline.users assert user_id not in baseline.users
class TestBaselineRecsystemFunctional(RenewalRecsystemFunctionalTest):
    """
    Functional test suite for the baseline recsystem.

    The baseline recsystem is launched as a module (``-m``) using the same
    Python interpreter that is running the tests.
    """

    CMD = [sys.executable, '-m', 'renewal_recsystem.baseline']
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment