Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClockEventLoop class with fixture and test (close #95) #96

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pytest_asyncio/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def pytest_runtest_setup(item):
# to marked test functions
_markers_2_fixtures = {
'asyncio': 'event_loop',
'asyncio_clock': 'clock_event_loop',
}


Expand All @@ -204,6 +205,56 @@ def event_loop(request):
loop.close()


def _clock_event_loop_class():
"""
Create a new class for ClockEventLoop based on the current
class-type produced by `asyncio.new_event_loop()`. This is important
for instances in which the enent-loop-policy has been changed.
"""
class ClockEventLoop(asyncio.new_event_loop().__class__):
"""
A custom event loop that explicitly advances time when requested. Otherwise,
this event loop advances time as expected.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._offset = 0

def time(self):
"""
Return the time according the event loop's clock.

This time is adjusted by the stored offset that allows for advancement
with `advance_time`.
"""
return super().time() + self._offset

def advance_time(self, seconds):
'''
Advance time by a given offset in seconds. Returns an awaitable
that will complete after all tasks scheduled for after advancement
of time are proceeding.
'''
if seconds > 0:
# advance the clock by the given offset
self._offset += seconds

# Once the clock is adjusted, new tasks may have just been
# scheduled for running in the next pass through the event loop
return self.create_task(asyncio.sleep(0))

return ClockEventLoop


@pytest.yield_fixture
def clock_event_loop(request):
"""Create an instance of the default event loop for each test case."""
loop = _clock_event_loop_class()()
asyncio.get_event_loop_policy().set_event_loop(loop)
yield loop
loop.close()


def _unused_tcp_port():
"""Find an unused localhost TCP port from 1024-65535 and return it."""
with contextlib.closing(socket.socket()) as sock:
Expand Down
38 changes: 38 additions & 0 deletions tests/test_simple_35.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,41 @@ async def test_asyncio_marker_method(self, event_loop):
def test_async_close_loop(event_loop):
event_loop.close()
return 'ok'


@pytest.mark.asyncio_clock
async def test_mark_asyncio_clock():
"""
Test that coroutines marked with asyncio_clock are run with a ClockEventLoop
"""
assert hasattr(asyncio.get_event_loop(), 'advance_time')


def test_clock_loop_loop_fixture(clock_event_loop):
"""
Test that the clock_event_loop fixture returns a proper instance of the loop
"""
assert hasattr(asyncio.get_event_loop(), 'advance_time')
clock_event_loop.close()
return 'ok'


@pytest.mark.asyncio_clock
async def test_clock_loop_advance_time(clock_event_loop):
"""
Test the sliding time event loop fixture
"""
# A task is created that will sleep some number of seconds
SLEEP_TIME = 10

# create the task
task = clock_event_loop.create_task(asyncio.sleep(SLEEP_TIME))
assert not task.done()

# start the task
await clock_event_loop.advance_time(0)
assert not task.done()

# process the timeout
await clock_event_loop.advance_time(SLEEP_TIME)
assert task.done()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make test reliable I tend to await explicitly:

TIMEOUT=1    #define earlier in the file

await asyncio.wait_for(task, TIMEOUT)

(and change short_nap/advance to 10)

I do this because though advance_time marks task's sleep as finished, but the exact number of event loop iterations between this and the actual closing of the task is implementation-dependent.