Skip to content
Snippets Groups Projects
Commit 180a32ee authored by E. Madison Bray's avatar E. Madison Bray
Browse files

[testing] add several unit tests for the baseline recsystem itself

parent 7549264c
No related branches found
No related tags found
No related merge requests found
...@@ -3,10 +3,14 @@ ...@@ -3,10 +3,14 @@
import asyncio import asyncio
import contextlib import contextlib
import itertools
import json import json
import logging import logging
import random
import string
import sys import sys
import threading import threading
from datetime import datetime
import websockets import websockets
...@@ -15,6 +19,7 @@ from . import shutdown ...@@ -15,6 +19,7 @@ from . import shutdown
try: try:
from jsonrpcclient.clients.websockets_client import WebSocketsClient from jsonrpcclient.clients.websockets_client import WebSocketsClient
from jsonrpcclient.response import Response from jsonrpcclient.response import Response
import lorem
except ImportError: except ImportError:
raise ImportError(__name__, raise ImportError(__name__,
'renewal_recsystem[tests] must be installed to use this module') 'renewal_recsystem[tests] must be installed to use this module')
...@@ -253,3 +258,96 @@ class WebSocketsMultiClient(WebSocketsClient): ...@@ -253,3 +258,96 @@ class WebSocketsMultiClient(WebSocketsClient):
for handler in log.handlers: for handler in log.handlers:
if isinstance(handler, logging.StreamHandler): if isinstance(handler, logging.StreamHandler):
handler.setStream(sys.stdout) handler.setStream(sys.stdout)
def generate_articles(start_id=1):
"""
Iterator which returns random articles with increasing article_ids.
Examples
--------
>>> from renewal_recsystem.utils.testing import generate_articles
>>> from pprint import pprint
>>> import random
>>> random.seed(0)
>>> articles = generate_articles()
>>> pprint(next(articles))
{'article_id': 1,
'authors': ['Occaecat Mollit'],
'date': '...-...-...T...:...:...',
'image_url': 'https://example.com/1/top_image.png',
'keywords': ['proident', 'id', 'duis'],
'lang': 'en',
'metrics': {'bookmarks': 0, 'clicks': 0, 'dislikes': 0, 'likes': 0},
'site': {'icon_url': 'https://localhost/api/v1/images/icons/0123456789abcdef',
'name': 'Example',
'url': 'example.com'},
'summary': 'Ad aliquip ullamco pariatur...'
...
'fugiat commodo nostrud non incididunt nisi.',
'text': 'Eiusmod fugiat cupidatat elit esse ipsum do velit...'
...
'aliqua occaecat sed laborum.',
'title': 'Ad Nisi Ut Pariatur Voluptate',
'url': 'https://example.com/1'}
>>> pprint(next(articles))
{'article_id': 2,
...
'url': 'https://example.com/2'}
"""
# NOTE: This should be kept in sync with the backend API implementation
template = {
'article_id': 0,
'authors': [],
'date': datetime.utcnow().isoformat(timespec='seconds'),
'image_url': 'https://example.com/{article_id}/top_image.png',
'keywords': [],
'lang': 'en',
'metrics': {'bookmarks': 0, 'clicks': 0, 'dislikes': 0, 'likes': 0},
'site': {
'icon_url':
'https://localhost/api/v1/images/icons/0123456789abcdef',
'name': 'Example',
'url': 'example.com'
},
'summary': '',
'text': '',
'title': '',
'url': 'https://example.com/{article_id}'
}
counter = itertools.count(start_id)
while True:
article = template.copy()
article_id = next(counter)
article.update({
'article_id': article_id,
'authors': [lorem.get_word(2, func='capitalize')],
'image_url': article['image_url'].format(article_id=article_id),
'keywords': lorem.get_word(3).split(),
'summary': lorem.get_paragraph(),
'text': lorem.get_paragraph(),
'title': lorem.get_word(5, func='capitalize'),
'url': article['url'].format(article_id=article_id)
})
yield article
def random_user_id(length=28, chars=(string.ascii_letters + string.digits)):
"""
Generate a random user ID in the format used by Firebase.
Example
-------
>>> from renewal_recsystem.utils.testing import random_user_id
>>> import random
>>> random.seed(0)
>>> random_user_id()
'2yW4AcqGFzYtEwLn9isSgN3IjZPe'
"""
return ''.join(random.sample(chars, length))
...@@ -20,7 +20,10 @@ install_requires = ...@@ -20,7 +20,10 @@ install_requires =
[options.extras_require] [options.extras_require]
tests = tests =
asyncmock;python_version<'3.8'
pytest pytest
pytest-asyncio
python-lorem
jsonrpcclient[websockets]>=3.3.0 jsonrpcclient[websockets]>=3.3.0
[tool:pytest] [tool:pytest]
...@@ -30,3 +33,6 @@ doctest_optionflags = ELLIPSIS NORMALIZE_WHITESPACE ...@@ -30,3 +33,6 @@ doctest_optionflags = ELLIPSIS NORMALIZE_WHITESPACE
filterwarnings = filterwarnings =
# ignore warning from pkg_resources' vendored copy of pyparsing # ignore warning from pkg_resources' vendored copy of pyparsing
ignore:Using or importing the ABCs from 'collections' ignore:Using or importing the ABCs from 'collections'
[coverage:run]
source = renewal_recsystem
"""Tests of `BaselineRecsystem`."""
import asyncio
import itertools
from urllib import parse as urlparse
try:
# Python 3.8+ only
from unittest.mock import AsyncMock
except ImportError:
try:
from asyncmock import AsyncMock
except ImportError:
raise ImportError(__name__,
'renewal_recsystem[tests] must be installed to run these tests')
import pytest
from renewal_recsystem.articles import ArticleCollection
from renewal_recsystem.baseline import BaselineRecsystem as _BaselineRecsystem
from renewal_recsystem.utils.testing import generate_articles, random_user_id
# It is not possible to use the monkeypatch fixture in a class-scoped fixture;
# see https://github.com/pytest-dev/pytest/issues/363 so we have to implement
# our own class-scoped monkeypatch
@pytest.fixture(scope='class')
def monkeyclass(request):
from _pytest.monkeypatch import MonkeyPatch
mpatch = MonkeyPatch()
yield mpatch
mpatch.undo()
class BaselineRecsystem(_BaselineRecsystem):
"""
Subclass of the real class modified for testing.
The only difference is that it uses a lower number of initial articles.
"""
INITIAL_ARTICLES = 100
DEFAULT_ARTICLES_LIMIT = 30
N_ASSIGNED_USERS = 100
@pytest.fixture(scope='class')
def baseline(monkeyclass):
"""
Test fixture to construct a `BaselineRecsystem` instance shared by all
tests in the class.
The `BaselineRecsystem.recommendation_mode` defaults to ``'random'`` but
can be changed by individual tests by using the monkeypatch fixture.
"""
recsystem = BaselineRecsystem(api_base_uri='http://localhost/api/v1',
token='fake.token')
# mock the initial backend API requests
def get(self, url, *args, params={}, **kwargs):
if url.endswith('/articles'):
# For the purposes of this test max_id and since_id are currently
# ignored since they are not used by the code under test
limit = params.pop('limit', DEFAULT_ARTICLES_LIMIT)
json = list(itertools.islice(generate_articles(), limit))
elif url.endswith('/user_assignments'):
json = [random_user_id() for _ in range(N_ASSIGNED_USERS)]
else:
json = None
response = AsyncMock()
response.__aenter__ = AsyncMock(return_value=response)
response.json = AsyncMock(return_value=json)
return response
monkeyclass.setattr('aiohttp.ClientSession.get', get)
loop = asyncio.get_event_loop()
loop.run_until_complete(recsystem.initialize_if_needed())
return recsystem
class TestBaselineRecsystem:
"""
Unit tests of the `BaselineRecsystem`'s initialization and RPC methods.
For the purpose of these test we don't actually run the main client loop
(`BaselineRecsystem.run`) as its functionality is tested by tests of the
lower-level server code.
This just tests the results of calling the individual RPC methods directly
as though they were called by an RPC client.
"""
def run(self, coro):
"""
Run an async function call on the event loop and return the results.
"""
return asyncio.get_event_loop().run_until_complete(coro)
def test_initialize(self, baseline):
assert isinstance(baseline.articles, ArticleCollection)
assert len(baseline.articles) == baseline.INITIAL_ARTICLES
assert isinstance(baseline.users, set)
assert len(baseline.users) == N_ASSIGNED_USERS
def test_ping(self, baseline):
"""Test the ping RPC endpoint."""
assert self.run(baseline.ping()) == 'pong'
def test_new_article(self, baseline):
"""Test the new_article RPC endpoint."""
# generate a few new articles with an IDs a little bit higher than the
# heighest article ID currently in the collection:
last_id = baseline.articles.article_ids[-1]
last_len = len(baseline.articles)
article_generator = generate_articles(last_id + 1)
new_articles = list(itertools.islice(article_generator, 5))
# push the new article with the highest ID; this will will create a
# "gap" in the article_ids of stored articles
self.run(baseline.new_article(new_articles[-1]))
assert len(baseline.articles) == last_len + 1
# take a slice
slc = baseline.articles[last_id:]
assert len(slc) == 2
assert slc[0] == baseline.articles[last_id]
assert slc[1] == baseline.articles[new_articles[-1]['article_id']]
# insert an article with a lower article_id and ensure it gets sorted
# into the "gap" (e.g. testing new articles delivered out of order from
# their article_id
self.run(baseline.new_article(new_articles[2]))
assert len(baseline.articles) == last_len + 2
# take a slice
slc = baseline.articles[last_id:]
assert len(slc) == 3
assert slc[0] == baseline.articles[last_id]
assert slc[1] == baseline.articles[new_articles[2]['article_id']]
assert slc[2] == baseline.articles[new_articles[-1]['article_id']]
def test_article_interaction(self, baseline):
"""Test the article_interaction RPC endpoint."""
# Test a basic sequence of interactions on a single article
article_id = baseline.articles.article_ids[-1]
# Template article_interaction object
template = {'user_id': random_user_id(), 'article_id': article_id}
# Shortcut method to generate an interaction object and pass it to
# the article_interaction RPC
def interaction(**kwargs):
interaction = template.copy()
interaction.update(kwargs)
self.run(baseline.article_interaction(interaction))
# Each article_interaction will update the tally of article_metrics
# in-place, so we just need to make sure it is updated properly for
# each interaction type
metrics = baseline.articles[article_id]['metrics']
# The default we should be starting from
assert metrics == {'likes': 0, 'dislikes': 0, 'clicks': 0,
'bookmarks': 0}
interaction(rating=1, prev_rating=0)
assert metrics['likes'] == 1 and metrics['dislikes'] == 0
interaction(rating=1, prev_rating=0)
assert metrics['likes'] == 2 and metrics['dislikes'] == 0
interaction(rating=-1, prev_rating=1)
assert metrics['likes'] == 1 and metrics['dislikes'] == 1
interaction(rating=0, prev_rating=-1)
assert metrics['likes'] == 1 and metrics['dislikes'] == 0
interaction(rating=-1, prev_rating=0)
assert metrics['likes'] == 1 and metrics['dislikes'] == 1
interaction(bookmarked=True)
assert metrics['bookmarks'] == 1
interaction(bookmarked=True)
assert metrics['bookmarks'] == 2
interaction(bookmarked=False)
assert metrics['bookmarks'] == 1
interaction(clicked=True)
assert metrics['clicks'] == 1
@pytest.mark.parametrize('mode', ['random', 'popular'])
def test_recommend(self, mode, baseline, monkeypatch):
"""Test the recommend RPC endpoint."""
# TODO: This test is bare-bones right now. It needs to be fleshed out
# to ensure
# 1) The returned recommendations are according to the pattern expected
# by the active recommendation mode.
# 2) A specific user is not returned the same recommendations more than
# once.
user_id = random_user_id()
monkeypatch.setattr(baseline, 'recommendation_mode', mode)
recs = self.run(baseline.recommend(user_id))
assert len(recs) <= baseline.RECOMMEND_DEFAULT_LIMIT
assert len(recs) == len(set(recs))
assert sorted(recs, reverse=True) == recs
for rec in recs:
assert rec in baseline.articles
# The following two tests are pretty trivial
def test_assigned_user(self, baseline):
"""Test the assigned_user RPC endpoint."""
user_id = random_user_id()
self.run(baseline.assigned_user(user_id))
assert user_id in baseline.users
def test_unassigned_user(self, baseline):
"""Test the unassigned_user RPC endpoint."""
user_id = random_user_id()
self.run(baseline.assigned_user(user_id))
assert user_id in baseline.users
self.run(baseline.unassigned_user(user_id))
assert user_id not in baseline.users
# Try to unassign the user again--this results in a logged
# warning but should not otherwise be a problem:
self.run(baseline.unassigned_user(user_id))
assert user_id not in baseline.users
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment