# test_contract_client.py
"""AllwaysContractClient.substrate_call: reconnect-on-WS-error retry logic.
The wrapper handles transient substrate WebSocket deaths transparently:
on a connection-class exception it invokes the owner-supplied reconnect
callback (which must replace ``client.subtensor`` with a fresh handle)
and retries the call once. Anything else propagates as-is.
"""
from unittest.mock import MagicMock
import pytest
from websockets.exceptions import ConnectionClosed
from allways.contract_client import AllwaysContractClient
def make_subtensor() -> MagicMock:
    """Return a mock subtensor with an explicitly assigned ``substrate`` mock.

    Every call builds a fresh pair of mocks, so two handles produced by this
    helper (and their ``substrate`` attributes) can be told apart by identity.
    """
    sub = MagicMock()
    sub.substrate = MagicMock()
    return sub
class TestSubstrateCall:
    """Reconnect-and-retry behaviour of ``AllwaysContractClient.substrate_call``.

    Each test hands ``substrate_call`` a callable and checks the outcome when
    that callable succeeds or raises, with and without a
    ``reconnect_subtensor`` callback installed on the client.
    """

    def test_returns_value_when_call_succeeds(self):
        """A callable that does not raise has its return value passed through."""
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        result = client.substrate_call(lambda s: 'ok')
        assert result == 'ok'

    def test_no_reconnect_callback_re_raises(self):
        """Without a reconnect callback the connection error reaches the caller."""
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())

        def fn(s):
            raise ConnectionClosed(None, None)

        with pytest.raises(ConnectionClosed):
            client.substrate_call(fn)

    def test_reconnects_and_retries_on_connection_closed(self):
        """After reconnecting, the callable is re-run against the new substrate."""
        first = make_subtensor()
        second = make_subtensor()
        calls = []

        def fn(s):
            calls.append(s)
            # Only the original handle is "dead"; the replacement succeeds.
            if s is first.substrate:
                raise ConnectionClosed(None, None)
            return 'recovered'

        client = AllwaysContractClient(contract_address='5xx', subtensor=first)
        client.reconnect_subtensor = lambda: setattr(client, 'subtensor', second)

        result = client.substrate_call(fn)

        assert result == 'recovered'
        assert calls == [first.substrate, second.substrate]

    @pytest.mark.parametrize(
        'exc',
        [TimeoutError(), ConnectionError()],
        ids=['timeout', 'connection_error'],
    )
    def test_retries_on_timeout_and_connection_error(self, exc):
        """Builtin timeout/connection errors trigger the same single retry.

        Parametrized so that a failure for one exception type does not hide
        the result for the other.
        """
        first = make_subtensor()
        second = make_subtensor()
        attempts = [0]

        def fn(s):
            attempts[0] += 1
            # Fail on the first attempt only.
            if attempts[0] == 1:
                raise exc
            return 'ok'

        client = AllwaysContractClient(contract_address='5xx', subtensor=first)
        client.reconnect_subtensor = lambda: setattr(client, 'subtensor', second)

        assert client.substrate_call(fn) == 'ok'
        assert attempts[0] == 2

    def test_reconnect_callback_failure_re_raises_original(self):
        """If reconnecting itself fails, the original error object is raised."""
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        original = ConnectionClosed(None, None)

        def fn(s):
            raise original

        def failing_reconnect():
            raise RuntimeError('cannot rebuild')

        client.reconnect_subtensor = failing_reconnect

        with pytest.raises(ConnectionClosed) as ei:
            client.substrate_call(fn)
        # Checked after the ``with`` block: statements following the raising
        # call inside it would never execute.
        assert ei.value is original

    def test_does_not_retry_on_non_connection_error(self):
        """Errors outside the connection class propagate without a reconnect."""
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        reconnect = MagicMock()
        client.reconnect_subtensor = reconnect

        def fn(s):
            raise ValueError('bad payload')

        with pytest.raises(ValueError):
            client.substrate_call(fn)
        reconnect.assert_not_called()

    def test_second_attempt_failure_propagates(self):
        """A failure on the retried call is raised rather than retried again."""
        first = make_subtensor()
        second = make_subtensor()
        calls = []

        def fn(s):
            calls.append(s)
            raise ConnectionClosed(None, None)

        client = AllwaysContractClient(contract_address='5xx', subtensor=first)
        client.reconnect_subtensor = lambda: setattr(client, 'subtensor', second)

        with pytest.raises(ConnectionClosed):
            client.substrate_call(fn)
        # The wrapper retries once: the initial attempt plus one more after
        # the reconnect, and nothing beyond that.
        assert len(calls) == 2
class TestPendingExtensionDecode:
    """SCALE decode of get_pending_*_extension Option<PendingExtension> payloads."""

    @staticmethod
    def _encode_some(submitter_bytes: bytes, target: int, proposed: int) -> bytes:
        """Build a ``Some`` payload for the decoder under test.

        Layout: the ``0x01`` discriminant byte, then ``submitter_bytes``
        verbatim, then ``target`` and ``proposed`` each as a 4-byte
        little-endian unsigned integer.
        """
        return (
            b'\x01'
            + submitter_bytes
            + target.to_bytes(4, 'little')
            + proposed.to_bytes(4, 'little')
        )

    def test_decodes_some(self):
        """A well-formed ``Some`` payload yields the encoded block numbers."""
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        submitter = bytes(range(32))
        payload = self._encode_some(submitter, target=12345, proposed=12000)

        result = client._decode_pending_extension(payload)

        assert result is not None
        assert result.target_block == 12345
        assert result.proposed_at == 12000
        # NOTE(review): the decoded submitter is not asserted here; consider
        # adding a check once the result's field name/format is confirmed.

    def test_decodes_none_discriminant(self):
        """The ``0x00`` discriminant decodes to ``None``."""
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        assert client._decode_pending_extension(b'\x00') is None

    def test_empty_payload_returns_none(self):
        """An empty payload decodes to ``None``."""
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        assert client._decode_pending_extension(b'') is None

    def test_unexpected_discriminant_returns_none(self):
        """A discriminant other than 0x00/0x01 decodes to ``None``."""
        # Anything other than 0x00/0x01 is malformed; treat as None rather than
        # raising — matches the existing get_reservation_data shape.
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        assert client._decode_pending_extension(b'\x02junk') is None
class TestExtensionSelectorWiring:
    """Selectors and arg schemas are wired up for every new method."""

    # Indexes into CONTRACT_ERROR_VARIANTS that the extension error variants
    # are expected to occupy (27 through 33 inclusive).
    _NEW_ERROR_VARIANT_INDEXES = range(27, 34)

    @pytest.mark.parametrize(
        'method,expected_selector',
        [
            ('propose_extend_reservation', '9c9a8e8e'),
            ('challenge_extend_reservation', '40b77e21'),
            ('finalize_extend_reservation', 'baf47953'),
            ('propose_extend_timeout', '94c87a1d'),
            ('challenge_extend_timeout', '682cf8eb'),
            ('finalize_extend_timeout', 'b23b4d80'),
            ('get_pending_reservation_extension', 'd79424b8'),
            ('get_pending_timeout_extension', '6bd06828'),
        ],
    )
    def test_selector_matches_metadata(self, method, expected_selector):
        """The selector registered for ``method`` has the expected hex form."""
        from allways.contract_client import CONTRACT_SELECTORS

        assert CONTRACT_SELECTORS[method].hex() == expected_selector

    def test_propose_reservation_args_encode(self):
        """``encode_args`` produces a 68-byte payload ending in the target block."""
        # Spot-check that encode_args runs end-to-end for the three-field
        # propose call (the most complex new signature).
        client = AllwaysContractClient(contract_address='5xx', subtensor=make_subtensor())
        encoded = client.encode_args(
            'propose_extend_reservation',
            {
                'miner': bytes(32),
                'from_tx_hash': bytes(32),
                'target_block': 100,
            },
        )
        # 32 (miner) + 32 (hash) + 4 (u32 LE) = 68 bytes
        assert len(encoded) == 68
        assert encoded[-4:] == (100).to_bytes(4, 'little')

    def test_new_error_variants_present(self):
        """The expected index range holds exactly the new error variant names."""
        from allways.contract_client import CONTRACT_ERROR_VARIANTS

        # The first element of each variant entry is compared as its name.
        names = {CONTRACT_ERROR_VARIANTS[i][0] for i in self._NEW_ERROR_VARIANT_INDEXES}
        assert names == {
            'ProposalAlreadyPending',
            'ChallengeWindowOpen',
            'ChallengeWindowClosed',
            'NoProposal',
            'ExtensionTooLong',
            'TargetNotForward',
            'InvalidTarget',
        }