1
0
mirror of synced 2024-11-15 02:17:36 +01:00
bemaniutils/bemani/utils/bemanishark.py

96 lines
3.7 KiB
Python
Raw Normal View History

from typing import Optional
import argparse
from bemani.sniff import Sniffer
from bemani.protocol import EAmuseProtocol, EAmuseException
from bemani.common import HTTP
def mainloop(address: Optional[str]=None, port: int=80, verbose: bool=False) -> None:
"""
Main loop of BEMANIShark. Starts an instance of Sniffer and EAmuseProtocol and does a
lazy job of banging them together with the above HTTP.parse. Will loop trying to decode
packets forever.
Arguments:
address - A string representing an IP of interest
port - An integer representing a port of interest
"""
sniffer = Sniffer(address=address, port=port)
parser = EAmuseProtocol()
while True:
packets = sniffer.recv_stream()
inbound = HTTP.parse(packets['inbound'], request=True)
outbound = HTTP.parse(packets['outbound'], response=True)
if inbound is not None:
if inbound['data'] is None:
in_req = None
else:
try:
in_req = parser.decode(
inbound['headers'].get('x-compress'),
inbound['headers'].get('x-eamuse-info'),
inbound['data']
)
except EAmuseException:
in_req = None
print("Inbound request (from {}:{} to {}:{}):".format(
packets['source_address'],
packets['source_port'],
packets['destination_address'],
packets['destination_port'],
))
if verbose:
print("HTTP {} request for URI {}".format(inbound['method'], inbound['uri']))
print("Compression is {}".format(inbound['headers'].get('x-compress', 'none')))
print("Encryption key is {}".format(inbound['headers'].get('x-eamuse-info', 'none')))
if in_req is None:
print("Inbound request was not parseable")
else:
print(in_req)
if outbound is not None:
if outbound['data'] is None:
out_req = None
else:
try:
out_req = parser.decode(
outbound['headers'].get('x-compress'),
outbound['headers'].get('x-eamuse-info'),
outbound['data']
)
except EAmuseException:
out_req = None
print("Outbound response (from {}:{} to {}:{}):".format(
packets['destination_address'],
packets['destination_port'],
packets['source_address'],
packets['source_port'],
))
if verbose:
print("Compression is {}".format(outbound['headers'].get('x-compress', 'none')))
print("Encryption key is {}".format(outbound['headers'].get('x-eamuse-info', 'none')))
if out_req is None:
print("Outbound response was not parseable")
else:
print(out_req)
def main() -> None:
parser = argparse.ArgumentParser(description="A utility to sniff packets and decode them as eAmusement packets. Should probably be run as root.")
parser.add_argument("-p", "--port", help="Port to sniff on. Defaults to 80", type=int, default=80)
parser.add_argument("-a", "--address", help="Address to sniff on. Defaults to all addresses", type=str, default=None)
parser.add_argument("-v", "--verbose", help="Show extra packet information", action='store_true')
args = parser.parse_args()
mainloop(address=args.address, port=args.port, verbose=args.verbose)
if __name__ == '__main__':
main()