1
0
mirror of synced 2024-11-24 22:40:12 +01:00
bemaniutils/bemani/tests/test_protocol.py

377 lines
14 KiB
Python
Raw Normal View History

# vim: set fileencoding=utf-8
import unittest
from typing import Optional
from bemani.protocol import EAmuseProtocol, Node
class TestProtocol(unittest.TestCase):
# Define a function that just encrypts/decrypts and encode/decodes, verify
# that we can get the same thing back.
def assertLoopback(self, root: Node) -> Optional[int]:
proto = EAmuseProtocol()
for encoding in [EAmuseProtocol.BINARY, EAmuseProtocol.XML]:
if encoding == EAmuseProtocol.BINARY:
loop_name = "binary"
elif encoding == EAmuseProtocol.XML:
loop_name = "xml"
else:
raise Exception("Logic error!")
binary = proto.encode(None, None, root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding)
newroot = proto.decode(None, None, binary)
self.assertEqual(newroot, root, f"Round trip with {loop_name} and no encryption/compression doesn't match!")
binary = proto.encode(None, '1-abcdef-0123', root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding)
newroot = proto.decode(None, '1-abcdef-0123', binary)
self.assertEqual(newroot, root, f"Round trip with {loop_name}, encryption and no compression doesn't match!")
binary = proto.encode('none', None, root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding)
newroot = proto.decode('none', None, binary)
self.assertEqual(newroot, root, f"Round trip with {loop_name}, encryption and no compression doesn't match!")
binary = proto.encode('lz77', None, root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding)
newroot = proto.decode('lz77', None, binary)
self.assertEqual(newroot, root, f"Round trip with {loop_name}, no encryption and lz77 compression doesn't match!")
def test_game_packet1(self) -> Node:
root = Node.void('call')
root.set_attribute('model', 'M39:J:B:A:2014061900')
root.set_attribute('srcid', '012010000000DEADBEEF')
root.set_attribute('tag', '1d0cbcd5')
pcbevent = Node.void('pcbevent')
root.add_child(pcbevent)
pcbevent.set_attribute('method', 'put')
pcbevent.add_child(Node.time('time', 1438375918))
pcbevent.add_child(Node.u32('seq', value=0))
item = Node.void('item')
pcbevent.add_child(item)
item.add_child(Node.string('name', 'boot'))
item.add_child(Node.s32('value', 1))
item.add_child(Node.time('time', 1438375959))
self.assertLoopback(root)
def test_game_packet2(self) -> Node:
root = Node.void('call')
root.set_attribute('model', 'LDJ:A:A:A:2015060700')
root.set_attribute('srcid', '012010000000DEADBEEF')
root.set_attribute('tag', '9yU+HH4q')
eacoin = Node.void('eacoin')
root.add_child(eacoin)
eacoin.set_attribute('esdate', '2015-08-01T02:09:23')
eacoin.set_attribute('esid', '177baae4bdf0085f1f3da9b6fed02223ee9b482f62b83a28af704a9c7893a370')
eacoin.set_attribute('method', 'consume')
eacoin.add_child(Node.string('sessid', '5666-5524'))
eacoin.add_child(Node.s16('sequence', 0))
eacoin.add_child(Node.s32('payment', 420))
eacoin.add_child(Node.s32('service', 0))
eacoin.add_child(Node.string('itemtype', '0'))
eacoin.add_child(Node.string('detail', '/eacoin/premium_free_1p_3'))
self.assertLoopback(root)
def test_game_packet3(self) -> Node:
root = Node.void('response')
root.add_child(Node.void('music'))
self.assertLoopback(root)
def test_game_packet4(self) -> Node:
root = Node.void('response')
game = Node.void('game')
root.add_child(game)
game.set_attribute('image_no', '1')
game.set_attribute('no', '1')
game.add_child(Node.s32('ir_phase', 0))
game.add_child(Node.s32('personal_event_phase', 10))
game.add_child(Node.s32('shop_event_phase', 6))
game.add_child(Node.s32('netvs_phase', 0))
game.add_child(Node.s32('card_phase', 9))
game.add_child(Node.s32('other_phase', 9))
game.add_child(Node.s32('music_open_phase', 8))
game.add_child(Node.s32('collabo_phase', 8))
game.add_child(Node.s32('local_matching_enable', 1))
game.add_child(Node.s32('n_maching_sec', 60))
game.add_child(Node.s32('l_matching_sec', 60))
game.add_child(Node.s32('is_check_cpu', 0))
game.add_child(Node.s32('week_no', 0))
game.add_child(Node.s16_array('sel_ranking', [-1, -1, -1, -1, -1]))
game.add_child(Node.s16_array('up_ranking', [-1, -1, -1, -1, -1]))
self.assertLoopback(root)
def test_game_packet5(self) -> Node:
root = Node.void('call')
root.set_attribute('model', 'LDJ:A:A:A:2015060700')
root.set_attribute('srcid', '012010000000DEADBEEF')
root.set_attribute('tag', '9yU+HH4q')
iidx22pc = Node.void('IIDX22pc')
root.add_child(iidx22pc)
iidx22pc.set_attribute('bookkeep', '0')
iidx22pc.set_attribute('cltype', '0')
iidx22pc.set_attribute('d_achi', '0')
iidx22pc.set_attribute('d_disp_judge', '0')
iidx22pc.set_attribute('d_exscore', '0')
iidx22pc.set_attribute('d_gno', '0')
iidx22pc.set_attribute('d_gtype', '0')
iidx22pc.set_attribute('d_hispeed', '0.000000')
iidx22pc.set_attribute('d_judge', '0')
iidx22pc.set_attribute('d_judgeAdj', '-3')
iidx22pc.set_attribute('d_largejudge', '0')
iidx22pc.set_attribute('d_lift', '0')
iidx22pc.set_attribute('d_notes', '0.000000')
iidx22pc.set_attribute('d_opstyle', '0')
iidx22pc.set_attribute('d_pace', '0')
iidx22pc.set_attribute('d_sdlen', '0')
iidx22pc.set_attribute('d_sdtype', '0')
iidx22pc.set_attribute('d_sorttype', '0')
iidx22pc.set_attribute('d_timing', '0')
iidx22pc.set_attribute('d_tune', '0')
iidx22pc.set_attribute('dp_opt', '0')
iidx22pc.set_attribute('dp_opt2', '0')
iidx22pc.set_attribute('gpos', '0')
iidx22pc.set_attribute('iidxid', '56665524')
iidx22pc.set_attribute('lid', 'US-3')
iidx22pc.set_attribute('method', 'save')
iidx22pc.set_attribute('mode', '6')
iidx22pc.set_attribute('pmode', '0')
iidx22pc.set_attribute('rtype', '0')
iidx22pc.set_attribute('s_achi', '4428')
iidx22pc.set_attribute('s_disp_judge', '1')
iidx22pc.set_attribute('s_exscore', '0')
iidx22pc.set_attribute('s_gno', '1')
iidx22pc.set_attribute('s_gtype', '2')
iidx22pc.set_attribute('s_hispeed', '2.302647')
iidx22pc.set_attribute('s_judge', '0')
iidx22pc.set_attribute('s_judgeAdj', '-3')
iidx22pc.set_attribute('s_largejudge', '0')
iidx22pc.set_attribute('s_lift', '60')
iidx22pc.set_attribute('s_notes', '29.483595')
iidx22pc.set_attribute('s_opstyle', '1')
iidx22pc.set_attribute('s_pace', '0')
iidx22pc.set_attribute('s_sdlen', '203')
iidx22pc.set_attribute('s_sdtype', '1')
iidx22pc.set_attribute('s_sorttype', '0')
iidx22pc.set_attribute('s_timing', '2')
iidx22pc.set_attribute('s_tune', '5')
iidx22pc.set_attribute('sp_opt', '8194')
pyramid = Node.void('pyramid')
pyramid.set_attribute('point', '408')
iidx22pc.add_child(pyramid)
achievements = Node.void('achievements')
iidx22pc.add_child(achievements)
achievements.set_attribute('last_weekly', '0')
achievements.set_attribute('pack_comp', '0')
achievements.set_attribute('pack_flg', '0')
achievements.set_attribute('pack_id', '349')
achievements.set_attribute('play_pack', '0')
achievements.set_attribute('visit_flg', '1125899906842624')
achievements.set_attribute('weekly_num', '0')
achievements.add_child(Node.s64_array('trophy', [
648333697107365824,
120628451491823,
281475567654912,
0,
1069547520,
0,
0,
0,
0,
0,
0,
1125899906842624,
4294967296,
60348585478096,
1498943592322,
0,
256,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
-4294967296,
0,
0,
4294967704,
858608469,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
5,
2,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
]))
deller = Node.void('deller')
iidx22pc.add_child(deller)
deller.set_attribute('deller', '450')
self.assertLoopback(root)
def test_game_packet6(self) -> Node:
root = Node.void('response')
facility = Node.void('facility')
root.add_child(facility)
location = Node.void('location')
facility.add_child(location)
location.add_child(Node.string('id', 'US-6'))
location.add_child(Node.string('country', 'US'))
location.add_child(Node.string('region', '.'))
location.add_child(Node.string('name', ''))
location.add_child(Node.u8('type', 0))
line = Node.void('line')
facility.add_child(line)
line.add_child(Node.string('id', '.'))
line.add_child(Node.u8('class', 0))
portfw = Node.void('portfw')
facility.add_child(portfw)
portfw.add_child(Node.ipv4('globalip', '10.0.0.1'))
portfw.add_child(Node.u16('globalport', 20000))
portfw.add_child(Node.u16('privateport', 20000))
public = Node.void('public')
facility.add_child(public)
public.add_child(Node.u8('flag', 1))
public.add_child(Node.string('name', '.'))
public.add_child(Node.string('latitude', '0'))
public.add_child(Node.string('longitude', '0'))
share = Node.void('share')
facility.add_child(share)
eacoin = Node.void('eacoin')
share.add_child(eacoin)
eacoin.add_child(Node.s32('notchamount', 0))
eacoin.add_child(Node.s32('notchcount', 0))
eacoin.add_child(Node.s32('supplylimit', 1000000))
url = Node.void('url')
share.add_child(url)
url.add_child(Node.string('eapass', 'http://some.dummy.net/'))
url.add_child(Node.string('arcadefan', 'http://some.dummy.net/'))
url.add_child(Node.string('konaminetdx', 'http://some.dummy.net/'))
url.add_child(Node.string('konamiid', 'http://some.dummy.net/'))
url.add_child(Node.string('eagate', 'http://some.dummy.net/'))
self.assertLoopback(root)
def test_packet1(self) -> Node:
root = Node.void('test')
root.set_attribute('test', 'test string value')
# Regular nodes
root.add_child(Node.void('void_node'))
root.add_child(Node.s8('s8_node', -1))
root.add_child(Node.u8('u8_node', 245))
root.add_child(Node.s16('s16_node', -8000))
root.add_child(Node.u16('u16_node', 65000))
root.add_child(Node.s32('s32_node', -2000000000))
root.add_child(Node.u32('u32_node', 4000000000))
root.add_child(Node.s64('s64_node', -1234567890000))
root.add_child(Node.u64('u64_node', 1234567890000))
root.add_child(Node.binary('bin_node', b'DEADBEEF'))
root.add_child(Node.string('str_node', 'this is a string!'))
root.add_child(Node.ipv4('ip4_node', '192.168.1.24'))
root.add_child(Node.time('time_node', 1234567890))
root.add_child(Node.float('float_node', 2.5))
root.add_child(Node.fouru8('4u8_node', [0x20, 0x21, 0x22, 0x23]))
root.add_child(Node.bool('bool_true_node', True))
root.add_child(Node.bool('bool_false_node', False))
# Array nodes
root.add_child(Node.s8_array('s8_array_node', [-1, -2, 3, 4, -5]))
root.add_child(Node.u8_array('u8_array_node', [245, 2, 0, 255, 1]))
root.add_child(Node.s16_array('s16_array_node', [-8000, 8000]))
root.add_child(Node.u16_array('u16_array_node', [65000, 1, 2, 65535]))
root.add_child(Node.s32_array('s32_array_node', [-2000000000, -1]))
root.add_child(Node.u32_array('u32_array_node', [4000000000, 0, 1, 2]))
root.add_child(Node.s64_array('s64_array_node', [-1234567890000, -1, 1, 1337]))
root.add_child(Node.u64_array('u64_array_node', [1234567890000, 123, 456, 7890]))
root.add_child(Node.time_array('time_array_node', [1234567890, 98765432]))
root.add_child(Node.float_array('float_array_node', [2.5, 0.0, 5.0, 20.5]))
root.add_child(Node.bool_array('bool_array_node', [False, True, True, False]))
# XML escaping
escape = Node.string('escape_test', '\r\n<testing> & \'thing\' "thing" \r\nthing on new line\r\n ')
escape.set_attribute('test', '<testing> & \'thing\' "thing" \r\n thing')
root.add_child(escape)
# Unicode
unicode_node = Node.string('unicode', '今日は')
unicode_node.set_attribute('unicode_attr', 'わたし')
root.add_child(unicode_node)
self.assertLoopback(root)