# vim: set fileencoding=utf-8 import unittest from bemani.protocol import EAmuseProtocol, Node class TestProtocol(unittest.TestCase): # Define a function that just encrypts/decrypts and encode/decodes, verify # that we can get the same thing back. def assertLoopback(self, root: Node) -> None: proto = EAmuseProtocol() for encoding in [ EAmuseProtocol.BINARY, EAmuseProtocol.BINARY_DECOMPRESSED, EAmuseProtocol.XML, ]: if encoding == EAmuseProtocol.BINARY: loop_name = "binary" elif encoding == EAmuseProtocol.BINARY_DECOMPRESSED: loop_name = "decompressed binary" elif encoding == EAmuseProtocol.XML: loop_name = "xml" else: raise Exception("Logic error!") binary = proto.encode( None, None, root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding, ) newroot = proto.decode(None, None, binary) self.assertEqual( newroot, root, f"Round trip with {loop_name} and no encryption/compression doesn't match!", ) binary = proto.encode( None, "1-abcdef-0123", root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding, ) newroot = proto.decode(None, "1-abcdef-0123", binary) self.assertEqual( newroot, root, f"Round trip with {loop_name}, encryption and no compression doesn't match!", ) binary = proto.encode( "none", None, root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding, ) newroot = proto.decode("none", None, binary) self.assertEqual( newroot, root, f"Round trip with {loop_name}, encryption and no compression doesn't match!", ) binary = proto.encode( "lz77", None, root, text_encoding=EAmuseProtocol.SHIFT_JIS, packet_encoding=encoding, ) newroot = proto.decode("lz77", None, binary) self.assertEqual( newroot, root, f"Round trip with {loop_name}, no encryption and lz77 compression doesn't match!", ) def test_game_packet1(self) -> None: root = Node.void("call") root.set_attribute("model", "M39:J:B:A:2014061900") root.set_attribute("srcid", "012010000000DEADBEEF") root.set_attribute("tag", "1d0cbcd5") pcbevent = Node.void("pcbevent") root.add_child(pcbevent) pcbevent.set_attribute("method", "put") pcbevent.add_child(Node.time("time", 1438375918)) pcbevent.add_child(Node.u32("seq", value=0)) item = Node.void("item") pcbevent.add_child(item) item.add_child(Node.string("name", "boot")) item.add_child(Node.s32("value", 1)) item.add_child(Node.time("time", 1438375959)) self.assertLoopback(root) def test_game_packet2(self) -> None: root = Node.void("call") root.set_attribute("model", "LDJ:A:A:A:2015060700") root.set_attribute("srcid", "012010000000DEADBEEF") root.set_attribute("tag", "9yU+HH4q") eacoin = Node.void("eacoin") root.add_child(eacoin) eacoin.set_attribute("esdate", "2015-08-01T02:09:23") eacoin.set_attribute( "esid", "177baae4bdf0085f1f3da9b6fed02223ee9b482f62b83a28af704a9c7893a370" ) eacoin.set_attribute("method", "consume") eacoin.add_child(Node.string("sessid", "5666-5524")) eacoin.add_child(Node.s16("sequence", 0)) eacoin.add_child(Node.s32("payment", 420)) eacoin.add_child(Node.s32("service", 0)) eacoin.add_child(Node.string("itemtype", "0")) eacoin.add_child(Node.string("detail", "/eacoin/premium_free_1p_3")) self.assertLoopback(root) def test_game_packet3(self) -> None: root = Node.void("response") root.add_child(Node.void("music")) self.assertLoopback(root) def test_game_packet4(self) -> None: root = Node.void("response") game = Node.void("game") root.add_child(game) game.set_attribute("image_no", "1") game.set_attribute("no", "1") game.add_child(Node.s32("ir_phase", 0)) game.add_child(Node.s32("personal_event_phase", 10)) game.add_child(Node.s32("shop_event_phase", 6)) game.add_child(Node.s32("netvs_phase", 0)) game.add_child(Node.s32("card_phase", 9)) game.add_child(Node.s32("other_phase", 9)) game.add_child(Node.s32("music_open_phase", 8)) game.add_child(Node.s32("collabo_phase", 8)) game.add_child(Node.s32("local_matching_enable", 1)) game.add_child(Node.s32("n_maching_sec", 60)) game.add_child(Node.s32("l_matching_sec", 60)) game.add_child(Node.s32("is_check_cpu", 0)) game.add_child(Node.s32("week_no", 0)) game.add_child(Node.s16_array("sel_ranking", [-1, -1, -1, -1, -1])) game.add_child(Node.s16_array("up_ranking", [-1, -1, -1, -1, -1])) self.assertLoopback(root) def test_game_packet5(self) -> None: root = Node.void("call") root.set_attribute("model", "LDJ:A:A:A:2015060700") root.set_attribute("srcid", "012010000000DEADBEEF") root.set_attribute("tag", "9yU+HH4q") iidx22pc = Node.void("IIDX22pc") root.add_child(iidx22pc) iidx22pc.set_attribute("bookkeep", "0") iidx22pc.set_attribute("cltype", "0") iidx22pc.set_attribute("d_achi", "0") iidx22pc.set_attribute("d_disp_judge", "0") iidx22pc.set_attribute("d_exscore", "0") iidx22pc.set_attribute("d_gno", "0") iidx22pc.set_attribute("d_gtype", "0") iidx22pc.set_attribute("d_hispeed", "0.000000") iidx22pc.set_attribute("d_judge", "0") iidx22pc.set_attribute("d_judgeAdj", "-3") iidx22pc.set_attribute("d_largejudge", "0") iidx22pc.set_attribute("d_lift", "0") iidx22pc.set_attribute("d_notes", "0.000000") iidx22pc.set_attribute("d_opstyle", "0") iidx22pc.set_attribute("d_pace", "0") iidx22pc.set_attribute("d_sdlen", "0") iidx22pc.set_attribute("d_sdtype", "0") iidx22pc.set_attribute("d_sorttype", "0") iidx22pc.set_attribute("d_timing", "0") iidx22pc.set_attribute("d_tune", "0") iidx22pc.set_attribute("dp_opt", "0") iidx22pc.set_attribute("dp_opt2", "0") iidx22pc.set_attribute("gpos", "0") iidx22pc.set_attribute("iidxid", "56665524") iidx22pc.set_attribute("lid", "US-3") iidx22pc.set_attribute("method", "save") iidx22pc.set_attribute("mode", "6") iidx22pc.set_attribute("pmode", "0") iidx22pc.set_attribute("rtype", "0") iidx22pc.set_attribute("s_achi", "4428") iidx22pc.set_attribute("s_disp_judge", "1") iidx22pc.set_attribute("s_exscore", "0") iidx22pc.set_attribute("s_gno", "1") iidx22pc.set_attribute("s_gtype", "2") iidx22pc.set_attribute("s_hispeed", "2.302647") iidx22pc.set_attribute("s_judge", "0") iidx22pc.set_attribute("s_judgeAdj", "-3") iidx22pc.set_attribute("s_largejudge", "0") iidx22pc.set_attribute("s_lift", "60") iidx22pc.set_attribute("s_notes", "29.483595") iidx22pc.set_attribute("s_opstyle", "1") iidx22pc.set_attribute("s_pace", "0") iidx22pc.set_attribute("s_sdlen", "203") iidx22pc.set_attribute("s_sdtype", "1") iidx22pc.set_attribute("s_sorttype", "0") iidx22pc.set_attribute("s_timing", "2") iidx22pc.set_attribute("s_tune", "5") iidx22pc.set_attribute("sp_opt", "8194") pyramid = Node.void("pyramid") pyramid.set_attribute("point", "408") iidx22pc.add_child(pyramid) achievements = Node.void("achievements") iidx22pc.add_child(achievements) achievements.set_attribute("last_weekly", "0") achievements.set_attribute("pack_comp", "0") achievements.set_attribute("pack_flg", "0") achievements.set_attribute("pack_id", "349") achievements.set_attribute("play_pack", "0") achievements.set_attribute("visit_flg", "1125899906842624") achievements.set_attribute("weekly_num", "0") achievements.add_child( Node.s64_array( "trophy", [ 648333697107365824, 120628451491823, 281475567654912, 0, 1069547520, 0, 0, 0, 0, 0, 0, 1125899906842624, 4294967296, 60348585478096, 1498943592322, 0, 256, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -4294967296, 0, 0, 4294967704, 858608469, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ], ) ) deller = Node.void("deller") iidx22pc.add_child(deller) deller.set_attribute("deller", "450") self.assertLoopback(root) def test_game_packet6(self) -> None: root = Node.void("response") facility = Node.void("facility") root.add_child(facility) location = Node.void("location") facility.add_child(location) location.add_child(Node.string("id", "US-6")) location.add_child(Node.string("country", "US")) location.add_child(Node.string("region", ".")) location.add_child(Node.string("name", "")) location.add_child(Node.u8("type", 0)) line = Node.void("line") facility.add_child(line) line.add_child(Node.string("id", ".")) line.add_child(Node.u8("class", 0)) portfw = Node.void("portfw") facility.add_child(portfw) portfw.add_child(Node.ipv4("globalip", "10.0.0.1")) portfw.add_child(Node.u16("globalport", 20000)) portfw.add_child(Node.u16("privateport", 20000)) public = Node.void("public") facility.add_child(public) public.add_child(Node.u8("flag", 1)) public.add_child(Node.string("name", ".")) public.add_child(Node.string("latitude", "0")) public.add_child(Node.string("longitude", "0")) share = Node.void("share") facility.add_child(share) eacoin = Node.void("eacoin") share.add_child(eacoin) eacoin.add_child(Node.s32("notchamount", 0)) eacoin.add_child(Node.s32("notchcount", 0)) eacoin.add_child(Node.s32("supplylimit", 1000000)) url = Node.void("url") share.add_child(url) url.add_child(Node.string("eapass", "http://some.dummy.net/")) url.add_child(Node.string("arcadefan", "http://some.dummy.net/")) url.add_child(Node.string("konaminetdx", "http://some.dummy.net/")) url.add_child(Node.string("konamiid", "http://some.dummy.net/")) url.add_child(Node.string("eagate", "http://some.dummy.net/")) self.assertLoopback(root) def test_packet1(self) -> None: root = Node.void("test") root.set_attribute("test", "test string value") # Regular nodes root.add_child(Node.void("void_node")) root.add_child(Node.s8("s8_node", -1)) root.add_child(Node.u8("u8_node", 245)) root.add_child(Node.s16("s16_node", -8000)) root.add_child(Node.u16("u16_node", 65000)) root.add_child(Node.s32("s32_node", -2000000000)) root.add_child(Node.u32("u32_node", 4000000000)) root.add_child(Node.s64("s64_node", -1234567890000)) root.add_child(Node.u64("u64_node", 1234567890000)) root.add_child(Node.binary("bin_node", b"DEADBEEF")) root.add_child(Node.string("str_node", "this is a string!")) root.add_child(Node.ipv4("ip4_node", "192.168.1.24")) root.add_child(Node.time("time_node", 1234567890)) root.add_child(Node.float("float_node", 2.5)) root.add_child(Node.fouru8("4u8_node", [0x20, 0x21, 0x22, 0x23])) root.add_child(Node.bool("bool_true_node", True)) root.add_child(Node.bool("bool_false_node", False)) # Array nodes root.add_child(Node.s8_array("s8_array_node", [-1, -2, 3, 4, -5])) root.add_child(Node.u8_array("u8_array_node", [245, 2, 0, 255, 1])) root.add_child(Node.s16_array("s16_array_node", [-8000, 8000])) root.add_child(Node.u16_array("u16_array_node", [65000, 1, 2, 65535])) root.add_child(Node.s32_array("s32_array_node", [-2000000000, -1])) root.add_child(Node.u32_array("u32_array_node", [4000000000, 0, 1, 2])) root.add_child(Node.s64_array("s64_array_node", [-1234567890000, -1, 1, 1337])) root.add_child( Node.u64_array("u64_array_node", [1234567890000, 123, 456, 7890]) ) root.add_child(Node.time_array("time_array_node", [1234567890, 98765432])) root.add_child(Node.float_array("float_array_node", [2.5, 0.0, 5.0, 20.5])) root.add_child(Node.bool_array("bool_array_node", [False, True, True, False])) # XML escaping escape = Node.string( "escape_test", "\r\n & 'thing' \"thing\" \r\nthing on new line\r\n ", ) escape.set_attribute("test", " & 'thing' \"thing\" \r\n thing") root.add_child(escape) # Unicode unicode_node = Node.string("unicode", "今日は") unicode_node.set_attribute("unicode_attr", "わたし") root.add_child(unicode_node) self.assertLoopback(root)