test: implement ser/unser of I2P addresses in functional tests

This commit is contained in:
Vasil Dimov 2021-06-11 15:31:54 +02:00
parent 86742811ce
commit 33e211d2a4
No known key found for this signature in database
GPG Key ID: 54DF06F64B55CBBF

View File

@ -18,6 +18,7 @@ ser_*, deser_*: functions that handle serialization/deserialization.
Classes use __slots__ to ensure extraneous attributes aren't accidentally added Classes use __slots__ to ensure extraneous attributes aren't accidentally added
by tests, compromising their intended effect. by tests, compromising their intended effect.
""" """
from base64 import b32decode, b32encode
from codecs import encode from codecs import encode
import copy import copy
import hashlib import hashlib
@ -207,15 +208,20 @@ class CAddress:
# see https://github.com/bitcoin/bips/blob/master/bip-0155.mediawiki # see https://github.com/bitcoin/bips/blob/master/bip-0155.mediawiki
NET_IPV4 = 1 NET_IPV4 = 1
NET_I2P = 5
ADDRV2_NET_NAME = { ADDRV2_NET_NAME = {
NET_IPV4: "IPv4" NET_IPV4: "IPv4",
NET_I2P: "I2P"
} }
ADDRV2_ADDRESS_LENGTH = { ADDRV2_ADDRESS_LENGTH = {
NET_IPV4: 4 NET_IPV4: 4,
NET_I2P: 32
} }
I2P_PAD = "===="
def __init__(self): def __init__(self):
self.time = 0 self.time = 0
self.nServices = 1 self.nServices = 1
@ -255,24 +261,33 @@ class CAddress:
self.nServices = deser_compact_size(f) self.nServices = deser_compact_size(f)
self.net = struct.unpack("B", f.read(1))[0] self.net = struct.unpack("B", f.read(1))[0]
assert self.net == self.NET_IPV4 assert self.net in (self.NET_IPV4, self.NET_I2P)
address_length = deser_compact_size(f) address_length = deser_compact_size(f)
assert address_length == self.ADDRV2_ADDRESS_LENGTH[self.net] assert address_length == self.ADDRV2_ADDRESS_LENGTH[self.net]
self.ip = socket.inet_ntoa(f.read(4)) addr_bytes = f.read(address_length)
if self.net == self.NET_IPV4:
self.ip = socket.inet_ntoa(addr_bytes)
else:
self.ip = b32encode(addr_bytes)[0:-len(self.I2P_PAD)].decode("ascii").lower() + ".b32.i2p"
self.port = struct.unpack(">H", f.read(2))[0] self.port = struct.unpack(">H", f.read(2))[0]
def serialize_v2(self): def serialize_v2(self):
"""Serialize in addrv2 format (BIP155)""" """Serialize in addrv2 format (BIP155)"""
assert self.net == self.NET_IPV4 assert self.net in (self.NET_IPV4, self.NET_I2P)
r = b"" r = b""
r += struct.pack("<I", self.time) r += struct.pack("<I", self.time)
r += ser_compact_size(self.nServices) r += ser_compact_size(self.nServices)
r += struct.pack("B", self.net) r += struct.pack("B", self.net)
r += ser_compact_size(self.ADDRV2_ADDRESS_LENGTH[self.net]) r += ser_compact_size(self.ADDRV2_ADDRESS_LENGTH[self.net])
r += socket.inet_aton(self.ip) if self.net == self.NET_IPV4:
r += socket.inet_aton(self.ip)
else:
sfx = ".b32.i2p"
assert self.ip.endswith(sfx)
r += b32decode(self.ip[0:-len(sfx)] + self.I2P_PAD, True)
r += struct.pack(">H", self.port) r += struct.pack(">H", self.port)
return r return r