1
0
mirror of synced 2024-11-15 02:17:36 +01:00
bemaniutils/bemani/client/base.py

410 lines
15 KiB
Python

import time
from typing import Optional, Dict, List, Tuple, Any
from typing_extensions import Final
from bemani.client.common import random_hex_string
from bemani.client.protocol import ClientProtocol
from bemani.protocol import Node
class BaseClient:
"""
The base client that all client emulators subclass from. This includes
a lot of functionality to create cards, exchange packets, verify responses
and verify some basic packets that are always expected to work.
"""
CARD_OK: Final[int] = 0
CARD_NEW: Final[int] = 112
CARD_BAD_PIN: Final[int] = 116
CARD_NOT_ALLOWED: Final[int] = 110
CORRECT_PASSWORD: Final[str] = '1234'
WRONG_PASSWORD: Final[str] = '4321'
def __init__(self, proto: ClientProtocol, pcbid: str, config: Dict[str, Any]) -> None:
self.__proto = proto
self.pcbid = pcbid
self.config = config
def random_card(self) -> str:
return "E004" + random_hex_string(12, caps=True)
def call_node(self) -> Node:
call = Node.void('call')
call.set_attribute('model', self.config['model'])
call.set_attribute('srcid', self.pcbid)
call.set_attribute('tag', random_hex_string(8))
return call
def exchange(self, path: str, tree: Node) -> Node:
module = tree.children[0].name
method = tree.children[0].attribute('method')
return self.__proto.exchange(
f'{path}?model={self.config["model"]}&module={module}&method={method}',
tree,
)
def __assert_path(self, root: Node, path: str) -> bool:
parts = path.split('/')
children = [root]
node: Optional[Node] = None
for part in parts:
if part[0] == '@':
# Verify attribute, should be last part in chain so
# assume its the first node
if node is None:
return False
if part[1:] not in node.attributes:
return False
else:
return True
else:
# Verify node name, might be last in chain
found = False
for child in children:
if child.name == part:
# This is a valid node, set to children and keep going
children = child.children
node = child
found = True
break
if not found:
# Didn't find a noce named this
return False
# Traversed whole chain
return True
def assert_path(self, root: Node, path: str) -> None:
"""
Given a root node and a path string such as a/b/node or a/b/@attr,
validate that the root node has decendents that match the path.
As a convenience, you can check an attribute on a node with @attr
format, where <attr> is the string name of the attribute.
"""
if not self.__assert_path(root, path):
raise Exception(f'Path \'{path}\' not found in root node:\n{root}')
def verify_services_get(self, expected_services: List[str]=[]) -> None:
call = self.call_node()
# Construct node
services = Node.void('services')
call.add_child(services)
services.set_attribute('method', 'get')
if self.config['avs'] is not None:
# Some older games don't include this info
info = Node.void('info')
services.add_child(info)
info.add_child(Node.string('AVS2', self.config['avs']))
# Swap with server
resp = self.exchange('core/services', call)
# Verify that response is correct
self.assert_path(resp, "response/services")
items = resp.child('services').children
returned_services = []
for item in items:
# Make sure it is an item with a url component
self.assert_path(item, 'item/@url')
# Get list of services provided
returned_services.append(item.attribute('name'))
for service in expected_services:
if service not in returned_services:
raise Exception(f'Service \'{service}\' expected but not returned')
def verify_pcbtracker_alive(self, ecflag: int = 1) -> bool:
call = self.call_node()
# Construct node
pcbtracker = Node.void('pcbtracker')
call.add_child(pcbtracker)
pcbtracker.set_attribute('accountid', self.pcbid)
pcbtracker.set_attribute('ecflag', str(ecflag))
pcbtracker.set_attribute('hardid', '01000027584F6D3A')
pcbtracker.set_attribute('method', 'alive')
pcbtracker.set_attribute('softid', '00010203040506070809')
# Swap with server
resp = self.exchange('core/pcbtracker', call)
# Verify that response is correct
self.assert_path(resp, "response/pcbtracker/@ecenable")
# Print out setting
enable = int(resp.child('pcbtracker').attribute('ecenable'))
if enable != 0:
return True
return False
def verify_message_get(self) -> None:
call = self.call_node()
# Construct node
message = Node.void('message')
call.add_child(message)
message.set_attribute('method', 'get')
# Swap with server
resp = self.exchange('core/message', call)
# Verify that response is correct
self.assert_path(resp, "response/message/@status")
def verify_dlstatus_progress(self) -> None:
call = self.call_node()
# Construct node
dlstatus = Node.void('dlstatus')
call.add_child(dlstatus)
dlstatus.set_attribute('method', 'progress')
dlstatus.add_child(Node.s32('progress', 0))
# Swap with server
resp = self.exchange('core/dlstatus', call)
# Verify that response is correct
self.assert_path(resp, "response/dlstatus/@status")
def verify_package_list(self) -> None:
call = self.call_node()
# Construct node
package = Node.void('package')
call.add_child(package)
package.set_attribute('method', 'list')
package.set_attribute('pkgtype', 'all')
# Swap with server
resp = self.exchange('core/package', call)
# Verify that response is correct
self.assert_path(resp, "response/package")
def verify_facility_get(self, encoding: str='SHIFT_JIS') -> str:
call = self.call_node()
# Construct node
facility = Node.void('facility')
call.add_child(facility)
facility.set_attribute('encoding', encoding)
facility.set_attribute('method', 'get')
# Swap with server
resp = self.exchange('core/facility', call)
# Verify that response is correct
self.assert_path(resp, "response/facility/location/id")
self.assert_path(resp, "response/facility/line")
self.assert_path(resp, "response/facility/portfw")
self.assert_path(resp, "response/facility/public")
self.assert_path(resp, "response/facility/share")
return resp.child_value('facility/location/id')
def verify_pcbevent_put(self) -> None:
call = self.call_node()
# Construct node
pcbevent = Node.void('pcbevent')
call.add_child(pcbevent)
pcbevent.set_attribute('method', 'put')
pcbevent.add_child(Node.time('time', int(time.time())))
pcbevent.add_child(Node.u32('seq', 0))
item = Node.void('item')
pcbevent.add_child(item)
item.add_child(Node.string('name', 'boot'))
item.add_child(Node.s32('value', 1))
item.add_child(Node.time('time', int(time.time())))
# Swap with server
resp = self.exchange('core/pcbevent', call)
# Verify that response is correct
self.assert_path(resp, "response/pcbevent")
def verify_cardmng_inquire(self, card_id: str, msg_type: str, paseli_enabled: bool) -> Optional[str]:
call = self.call_node()
# Construct node
cardmng = Node.void('cardmng')
call.add_child(cardmng)
cardmng.set_attribute('cardid', card_id)
cardmng.set_attribute('cardtype', '1')
cardmng.set_attribute('method', 'inquire')
cardmng.set_attribute('update', '0')
if msg_type == 'new' and 'old_profile_model' in self.config:
cardmng.set_attribute('model', self.config['old_profile_model'])
# Swap with server
resp = self.exchange('core/cardmng', call)
if msg_type == 'unregistered':
# Verify that response is correct
self.assert_path(resp, "response/cardmng/@status")
# Verify that we weren't found
status = int(resp.child('cardmng').attribute('status'))
if status != self.CARD_NEW:
raise Exception(f'Card \'{card_id}\' returned invalid status \'{status}\'')
# Nothing to return
return None
elif msg_type == 'new':
# Verify that response is correct
self.assert_path(resp, "response/cardmng/@refid")
self.assert_path(resp, "response/cardmng/@binded")
self.assert_path(resp, "response/cardmng/@newflag")
self.assert_path(resp, "response/cardmng/@ecflag")
binded = int(resp.child('cardmng').attribute('binded'))
newflag = int(resp.child('cardmng').attribute('newflag'))
ecflag = int(resp.child('cardmng').attribute('ecflag'))
if binded != 0:
raise Exception(f'Card \'{card_id}\' returned invalid binded value \'{binded}\'')
if newflag != 1:
raise Exception(f'Card \'{card_id}\' returned invalid newflag value \'{newflag}\'')
if ecflag != (1 if paseli_enabled else 0):
raise Exception(f'Card \'{card_id}\' returned invalid ecflag value \'{newflag}\'')
# Return the refid
return resp.child('cardmng').attribute('refid')
elif msg_type == 'query':
# Verify that response is correct
self.assert_path(resp, "response/cardmng/@refid")
self.assert_path(resp, "response/cardmng/@binded")
self.assert_path(resp, "response/cardmng/@newflag")
self.assert_path(resp, "response/cardmng/@ecflag")
binded = int(resp.child('cardmng').attribute('binded'))
newflag = int(resp.child('cardmng').attribute('newflag'))
ecflag = int(resp.child('cardmng').attribute('ecflag'))
if binded != 1:
raise Exception(f'Card \'{card_id}\' returned invalid binded value \'{binded}\'')
if newflag != 0:
raise Exception(f'Card \'{card_id}\' returned invalid newflag value \'{newflag}\'')
if ecflag != (1 if paseli_enabled else 0):
raise Exception(f'Card \'{card_id}\' returned invalid ecflag value \'{newflag}\'')
# Return the refid
return resp.child('cardmng').attribute('refid')
else:
raise Exception(f'Unrecognized message type \'{msg_type}\'')
def verify_cardmng_getrefid(self, card_id: str) -> str:
call = self.call_node()
# Construct node
cardmng = Node.void('cardmng')
call.add_child(cardmng)
cardmng.set_attribute('cardid', card_id)
cardmng.set_attribute('cardtype', '1')
cardmng.set_attribute('method', 'getrefid')
cardmng.set_attribute('newflag', '0')
cardmng.set_attribute('passwd', self.CORRECT_PASSWORD)
# Swap with server
resp = self.exchange('core/cardmng', call)
# Verify that response is correct
self.assert_path(resp, "response/cardmng/@refid")
return resp.child('cardmng').attribute('refid')
def verify_cardmng_authpass(self, ref_id: str, correct: bool) -> None:
call = self.call_node()
# Construct node
cardmng = Node.void('cardmng')
call.add_child(cardmng)
cardmng.set_attribute('method', 'authpass')
cardmng.set_attribute('pass', self.CORRECT_PASSWORD if correct else self.CORRECT_PASSWORD[::-1])
cardmng.set_attribute('refid', ref_id)
# Swap with server
resp = self.exchange('core/cardmng', call)
# Verify that response is correct
self.assert_path(resp, "response/cardmng/@status")
status = int(resp.child('cardmng').attribute('status'))
if status != (self.CARD_OK if correct else self.CARD_BAD_PIN):
raise Exception(f'Ref ID \'{ref_id}\' returned invalid status \'{status}\'')
def verify_eacoin_checkin(self, card_id: str) -> Tuple[str, int]:
call = self.call_node()
# Construct node
eacoin = Node.void('eacoin')
call.add_child(eacoin)
eacoin.set_attribute('method', 'checkin')
eacoin.add_child(Node.string('cardtype', '1'))
eacoin.add_child(Node.string('cardid', card_id))
eacoin.add_child(Node.string('passwd', self.CORRECT_PASSWORD))
eacoin.add_child(Node.string('ectype', '1'))
# Swap with server
resp = self.exchange('core/eacoin', call)
# Verify that response is correct
self.assert_path(resp, "response/eacoin/sessid")
self.assert_path(resp, "response/eacoin/balance")
return (resp.child('eacoin').child_value('sessid'), resp.child('eacoin').child_value('balance'))
def verify_eacoin_consume(self, sessid: str, balance: int, amount: int) -> None:
call = self.call_node()
# Construct node
eacoin = Node.void('eacoin')
call.add_child(eacoin)
eacoin.set_attribute('method', 'consume')
eacoin.add_child(Node.string('sessid', sessid))
eacoin.add_child(Node.s16('sequence', 0))
eacoin.add_child(Node.s32('payment', amount))
eacoin.add_child(Node.s32('service', 0))
eacoin.add_child(Node.string('itemtype', '0'))
eacoin.add_child(Node.string('detail', '/eacoin/start_pt1'))
# Swap with server
resp = self.exchange('core/eacoin', call)
# Verify that response is correct
self.assert_path(resp, "response/eacoin/balance")
newbalance = resp.child('eacoin').child_value('balance')
if balance - amount != newbalance:
raise Exception(f"Expected to get back balance {balance - amount} but got {newbalance}")
def verify_eacoin_checkout(self, session: str) -> None:
call = self.call_node()
# Construct node
eacoin = Node.void('eacoin')
call.add_child(eacoin)
eacoin.set_attribute('method', 'checkout')
eacoin.add_child(Node.string('sessid', session))
# Swap with server
resp = self.exchange('core/eacoin', call)
# Verify that response is correct
self.assert_path(resp, "response/eacoin/@status")
def verify(self, cardid: Optional[str]) -> None:
raise Exception('Override in subclass!')