Source

tracko / tests / test_announce_basic.py

Full commit
import sys, os, time
sys.path.append(os.path.join(os.path.dirname(__file__), '../app'))

import util, logic, models, settings
from bencode.bencode import bdecode
from nose.tools import *


def setup():
    util.clear_datastore()
    assert models.Torrent.all().count() == 0
    assert models.TorrentPeer.all().count() == 0


required_request_param_names = [
    'info_hash', 'peer_id', 'port', 'uploaded', 'downloaded', 'left']

allowed_event_param_values = ['started', 'completed', 'stopped', None]

required_response_key_names_on_success = ['interval', 'peers']

good_start_request_params = dict(
  info_hash='*' * 20,
  peer_id='?' * 20,
  port=str(6681),
  uploaded=str(2**128),
  downloaded=str(2**64),
  left=str(2**16),
  event='started')


def test_reject_request_without_required_param():

  def check_required_param(param_name):
    request_params = good_start_request_params.copy()
    del request_params[param_name]
    response = bdecode(
      logic.process_announce_request(request_params, '0.0.0.0', settings))
    assert_equals(response.keys(), ['failure reason'])
    expected_failure_reason = (
      'Your client didn\'t send required parameter "%s". '
      + 'Please upgrade your client.'
      ) % param_name
    assert_equals(response['failure reason'], expected_failure_reason)

  for param_name in required_request_param_names:
    yield check_required_param, param_name


def test_accept_valid_event_param_value():

  def check_valid_event_param_value(value):
    request_params = good_start_request_params.copy()
    if value is None:
      if 'event' in request_params:
        del request_params['event']
    else:
      request_params['event'] = value
    request_params['compact'] = '0'
    response = bdecode(
      logic.process_announce_request(request_params, '0.0.0.0', settings))
    assert_equals(
      frozenset(response.keys()) & \
        frozenset(required_response_key_names_on_success),
      frozenset(required_response_key_names_on_success))
  
  for value in allowed_event_param_values:
    yield check_valid_event_param_value, value


def test_reject_invalid_event_param_value():

  request_params = good_start_request_params.copy()
  request_params['event'] = 'spam'
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '0.0.0.0', settings))

  assert_equals(response.keys(), ['failure reason'])
  assert_equals(
    response['failure reason'],
    'Your client sent nonstandard value for "event" parameter: "spam".')


@with_setup(util.clear_datastore)
def test_successful_peer_started_request():

  request_params = good_start_request_params.copy()
  request_params['compact'] = '0'
  logic.process_announce_request(request_params, '0.0.0.0', settings)
  t = models.Torrent.get_by_info_hash(request_params['info_hash'])
  assert t is not None

  assert_equals(t.info_hash, request_params['info_hash'])
  assert_equals(len(list(t.peers)), 1)

  p = list(t.peers)[0]
  assert_equals(p.address, '0.0.0.0')
  assert_equals(p.peer_id, request_params['peer_id'])
  assert_equals(p.n_bytes_left, 2 ** 16)
  assert_equals(p.port_number, 6681)


def test_required_response_keys():

  def check_required_key(key_name):
    request_params = good_start_request_params.copy()
    request_params['compact'] = '0'
    response = bdecode(
      logic.process_announce_request(request_params, '0.0.0.0', settings))
    assert key_name in response

  for key_name in required_response_key_names_on_success:
    yield check_required_key, key_name


@with_setup(util.clear_datastore)
def test_required_structure_of_peer_list_response():

  request_params = good_start_request_params.copy()
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '0.0.0.0', settings))
  peers = response['peers']

  assert_equals(len(peers), 1)
  peer_dict = peers[0]
  assert 'peer id' in peer_dict.keys()
  assert 'ip' in peer_dict.keys()
  assert 'port' in peer_dict.keys()


@with_setup(util.clear_datastore)
def test_2_peers():

  # 'started' from peer A

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'a' * 20
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '1.2.3.4', settings))

  assert_equals(len(response['peers']), 1)
  assert_equals(response['peers'][0]['peer id'], 'a' * 20)
  assert_equals(response['peers'][0]['ip'], '1.2.3.4')

  # 'started' from peer B

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'b' * 20
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '5.6.7.8', settings))

  sorted_peers = sorted(response['peers'], key=lambda p: p['peer id'])
  assert_equals(len(sorted_peers), 2)
  assert_equals(sorted_peers[0]['peer id'], 'a' * 20)
  assert_equals(sorted_peers[0]['ip'], '1.2.3.4')
  assert_equals(sorted_peers[1]['peer id'], 'b' * 20)
  assert_equals(sorted_peers[1]['ip'], '5.6.7.8')

  # 'completed' from peer A

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'a' * 20
  request_params['event'] = 'completed'
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '1.2.3.4', settings))

  assert_equals(len(response['peers']), 2)

  # 'stopped' from peer B

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'b' * 20
  request_params['event'] = 'stopped'
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '5.6.7.8', settings))

  assert_equals(len(response['peers']), 1)
  assert_equals(response['peers'][0]['peer id'], 'a' * 20)

  # 'stopped' from peer A

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'a' * 20
  request_params['event'] = 'stopped'
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '1.2.3.4', settings))

  assert_equals(len(response['peers']), 0)


@with_setup(util.clear_datastore)
def test_explicit_ip_from_request_takes_precedence():

  request_params = good_start_request_params.copy()
  request_params['ip'] = '127.0.0.1'
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '1.2.3.4', settings))

  assert_equals(len(response['peers']), 1)
  assert_equals(response['peers'][0]['ip'], '127.0.0.1')


@with_setup(util.clear_datastore)
def test_noncompact_mode_allows_ipv6():

  request_params = good_start_request_params.copy()
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(
      request_params, 'F0:0:0:103:0:FFFF:0A0A:0A05', settings))

  assert_equals(len(response['peers']), 1)
  assert_equals(response['peers'][0]['ip'], 'F0:0:0:103:0:FFFF:0A0A:0A05')


@with_setup(util.clear_datastore)
def test_expired_peers_are_not_reported():

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'a' * 20
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '1.2.3.4', settings))

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'b' * 20
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '2.3.4.5', settings))

  assert_true(len(response['peers']) > 0)

  settings.set_values(
    min_peer_expiration_period=2,
    recommended_announce_interval=1)
  time.sleep(3) # wait for existing peers to expire

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'c' * 20
  request_params['compact'] = '0'
  response = bdecode(
    logic.process_announce_request(request_params, '3.4.5.6', settings))

  assert_equals(
    [], [p for p in response['peers'] if p['peer id'] in ('a' * 20, 'b' * 20)])


@with_setup(util.clear_datastore)
def test_large_byte_counts_are_accepted():

  request_params = good_start_request_params.copy()
  request_params['peer_id'] = 'x' * 20
  request_params['compact'] = '0'
  request_params['uploaded'] = str(2**1024 - 1)
  request_params['downloaded'] = str(3**512 - 1)
  request_params['left'] = str(3**512 - 1)
  logic.process_announce_request(request_params, '1.2.3.4', settings)

  p = models.TorrentPeer.get_by_peer_id('x' * 20)
  assert_equals(p.n_bytes_left, 3**512 - 1)