"""This module defines the DemoParser class that handles the core functionality.
Core functionality is parsing and cleaning a csgo demo file.
Example::
from awpy.parser import DemoParser
# Create parser object
# Set log=True above if you want to produce a logfile for the parser
demo_parser = DemoParser(
demofile = "og-vs-natus-vincere-m1-dust2.dem",
demo_id = "OG-NaVi-BLAST2020",
parse_rate=128,
trade_time=5,
buy_style="hltv"
)
# Parse the demofile, output results to dictionary
data = demo_parser.parse()
https://github.com/pnxenopoulos/awpy/blob/main/examples/00_Parsing_a_CSGO_Demofile.ipynb
"""
import json
import logging
import os
import subprocess
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Literal, Unpack, get_args, overload
import pandas as pd
from pydantic import TypeAdapter, ValidationError
from awpy.types import (
BuyStyle,
FullParserArgs,
Game,
GameActionKey,
GameRound,
ParserArgs,
ParseRate,
PlayerInfo,
RoundReturnType,
)
from awpy.utils import check_go_version
if TYPE_CHECKING:
from pandas.core.arrays.base import ExtensionArray
[docs]class DemoParser:
"""DemoParser can parse, load and clean data from a CSGO demofile.
Can be instantiated without a specified demofile.
Attributes:
demofile (string): A string denoting the path to the demo file,
which ends in .dem. Defaults to ''
outpath (string): Path where to save the outputfile to.
Default is current directory
demo_id (string): A unique demo name/game id.
Default is inferred from demofile name
output_file (str): The output file name. Default is 'demoid'+".json"
parse_rate (int, optional): One of 128, 64, 32, 16, 8, 4, 2, or 1.
The lower the value, the more frames are collected.
Indicates spacing between parsed demo frames in ticks. Default is 128.
parse_frames (bool): Flag if you want to parse frames (trajectory data) or not.
Default is True
parse_kill_frames (bool): Flag if you want to parse frames on kills.
Default is False
trade_time (int, optional): Length of the window for a trade (in seconds).
Default is 5.
dmg_rolled (bool): Boolean if you want damages rolled up.
As multiple damages for a player can happen in 1 tick from the same weapon.
Default is False
parse_chat (bool): Flag if you want to parse chat messages. Default is False
buy_style (string): Buy style string, one of "hltv" or "csgo"
Default is "hltv"
json_indentation (bool): Whether the json file should be pretty printed
with indentation (larger, more readable)
or not (smaller, less human readable)
Default is False
json (dict): Dictionary containing the parsed json file
Raises:
ValueError: Raises a ValueError if the Golang version is lower than 1.18
"""
def __init__(
self,
*,
demofile: str = "",
outpath: str | None = None,
demo_id: str | None = None,
log: bool = False,
debug: bool = False,
**parser_args: Unpack[ParserArgs],
) -> None:
"""Instantiate a DemoParser.
Args:
demofile (string):
A string denoting the path to the demo file,
which ends in .dem. Defaults to ''
outpath (string):
Path where to save the outputfile to.
Default is current directory
demo_id (string):
A unique demo name/game id.
Default is inferred from demofile name
log (bool, optional):
A boolean indicating if the log should print to stdout.
Default is False
debug (bool, optional):
A boolean indicating if debug output should be used.
Default is False
**parser_args (ParserArgs): Further keyword args:
parse_rate (int, optional):
One of 128, 64, 32, 16, 8, 4, 2, or 1.
The lower the value, the more frames are collected.
Indicates spacing between parsed demo frames in ticks.
Default is 128.
parse_frames (bool, optional):
Flag if you want to parse frames (trajectory data) or not.
Default is True
parse_kill_frames (bool, optional):
Flag if you want to parse frames on kills.
Default is False
trade_time (int, optional):
Length of the window for a trade (in seconds).
Default is 5.
dmg_rolled (bool, optional):
Boolean if you want damages rolled up.
Default is False
parse_chat (bool, optional):
Flag if you want to parse chat messages. Default is False
buy_style (str, optional):
Buy style string, one of "hltv" or "csgo"
Default is "hltv"
json_indentation (bool, optional):
Whether the json file should be pretty printed
with indentation (larger, more readable)
or not (smaller, less human readable)
Default is False
"""
# Set up logger
logging.basicConfig(
level=logging.DEBUG if debug else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
self.logger = logging.getLogger("awpy")
self.logger.propagate = log
# Handle demofile and demo_id name.
# Only take the file name and remove the last extension.
self.demofile = os.path.abspath(demofile)
self.logger.info("Initialized awpy DemoParser with demofile %s", self.demofile)
self._set_demo_id(demo_id, demofile)
self.logger.info("Setting demo id to %s", self.demo_id)
if outpath is None:
outpath = os.path.abspath(os.getcwd())
else:
outpath = os.path.abspath(outpath)
self.output_file = os.path.join(outpath, self.demo_id + ".json")
self.parser_args: FullParserArgs = {
"parse_rate": 128,
"trade_time": 5,
"parse_frames": True,
"parse_kill_frames": False,
"dmg_rolled": False,
"parse_chat": False,
"json_indentation": False,
"buy_style": "hltv",
}
self.parser_args.update(parser_args)
self._check_trade_time()
self._check_parse_rate()
self._check_buy_style()
self.log_settings()
# Set parse error to False
self.parse_error = False
# Initialize json attribute as None
self._json: Game | None = None
[docs] def log_settings(self) -> None:
"""Log the settings produced in the constructor."""
self.logger.info("Rollup damages set to %s", str(self.dmg_rolled))
self.logger.info("Parse chat set to %s", str(self.parse_chat))
self.logger.info("Parse frames set to %s", str(self.parse_frames))
self.logger.info("Parse kill frames set to %s", str(self.parse_kill_frames))
self.logger.info(
"Output json indentation set to %s",
str(self.json_indentation),
)
self.logger.info("Setting trade time to %d", self.trade_time)
self.logger.info("Setting buy style to %s", str(self.buy_style))
@property
def json(self) -> Game | None:
"""Json getter.
Returns:
Game: Parsed demo information in json format
"""
return self._json
@json.setter
def json(self, new_json: Game | None) -> None:
"""Validate json shape via pydantic.
Args:
new_json (Game | None): Game dict to use.
"""
if new_json is not None:
try:
TypeAdapter(Game).validate_python(new_json)
except ValidationError as e:
# Do not always want to log the whole exception.
self.logger.error( # noqa: TRY400
"Loaded json file does not have correct fields."
" This may cause issues later."
" Enable debug output to see the differences."
)
self.logger.debug(e)
self._json = new_json
@property
def buy_style(self) -> BuyStyle:
"""buy_style getter.
Returns:
BuyStyle: Current buy_style.
"""
return self.parser_args["buy_style"]
@buy_style.setter
def buy_style(self, buy_style: BuyStyle) -> None:
"""buy_style setter.
Args:
buy_style (BuyStyle): buy_style to use.
"""
self.parser_args["buy_style"] = buy_style
@property
def json_indentation(self) -> bool:
"""json_indentation getter.
Returns:
bool: Current json_indentation.
"""
return self.parser_args["json_indentation"]
@json_indentation.setter
def json_indentation(self, json_indentation: bool) -> None:
"""json_indentation setter.
Args:
json_indentation (bool): json_indentation to use.
"""
self.parser_args["json_indentation"] = json_indentation
@property
def parse_kill_frames(self) -> bool:
"""parse_kill_frames getter.
Returns:
bool: Current parse_kill_frames.
"""
return self.parser_args["parse_kill_frames"]
@parse_kill_frames.setter
def parse_kill_frames(self, parse_kill_frames: bool) -> None:
"""parse_kill_frames setter.
Args:
parse_kill_frames (bool): parse_kill_frames to use.
"""
self.parser_args["parse_kill_frames"] = parse_kill_frames
@property
def parse_frames(self) -> bool:
"""parse_frames getter.
Returns:
bool: Current parse_frames.
"""
return self.parser_args["parse_frames"]
@parse_frames.setter
def parse_frames(self, parse_frames: bool) -> None:
"""parse_frames setter.
Args:
parse_frames (bool): parse_frames to use.
"""
self.parser_args["parse_frames"] = parse_frames
@property
def parse_chat(self) -> bool:
"""parse_chat getter.
Returns:
bool: Current parse_chat.
"""
return self.parser_args["parse_chat"]
@parse_chat.setter
def parse_chat(self, parse_chat: bool) -> None:
"""parse_chat setter.
Args:
parse_chat (bool): parse_chat to use.
"""
self.parser_args["parse_chat"] = parse_chat
@property
def dmg_rolled(self) -> bool:
"""dmg_rolled getter.
Returns:
bool: Current dmg_rolled.
"""
return self.parser_args["dmg_rolled"]
@dmg_rolled.setter
def dmg_rolled(self, dmg_rolled: bool) -> None:
"""dmg_rolled setter.
Args:
dmg_rolled (bool): dmg_rolled to use.
"""
self.parser_args["dmg_rolled"] = dmg_rolled
@property
def trade_time(self) -> int:
"""Trade time getter.
Returns:
int: Current trade time.
"""
return self.parser_args["trade_time"]
@trade_time.setter
def trade_time(self, trade_time: int) -> None:
"""Trade time setter.
Args:
trade_time (int): Trade time to use.
"""
self.parser_args["trade_time"] = trade_time
def _check_trade_time(self) -> None:
"""Check that trade time is positive and not too large."""
trade_time_upper_bound = 7
if self.trade_time <= 0:
# Handle trade time
trade_time_default = 5
self.logger.warning(
"Trade time can't be negative, setting to default value of %d seconds.",
trade_time_default,
)
self.parser_args["trade_time"] = trade_time_default
elif self.trade_time > trade_time_upper_bound:
self.logger.warning(
"Trade time of %d is rather long. Consider a value between 4-%d.",
self.trade_time,
trade_time_upper_bound,
)
@property
def parse_rate(self) -> ParseRate:
"""Parse rate getter.
Returns:
int: Current parse rate.
"""
return self.parser_args["parse_rate"]
@parse_rate.setter
def parse_rate(self, parse_rate: ParseRate) -> None:
"""Parse rate setter.
Args:
parse_rate (int): Parse rate to use.
"""
self.parser_args["parse_rate"] = parse_rate
def _check_parse_rate(self) -> None:
"""Check that parse rate is not too high or low."""
# Handle parse rate. If the parse rate is less than 64, likely to be slow
parse_rate_lower_bound = 64
parse_rate_upper_bound = 256
if not isinstance(self.parse_rate, int) or self.parse_rate < 1:
self.logger.warning(
"Parse rate of %s not acceptable! "
"Parse rate must be an integer greater than 0.",
str(self.parse_rate),
)
self.parser_args["parse_rate"] = 128
elif 1 < self.parse_rate < parse_rate_lower_bound:
self.logger.warning(
"A parse rate lower than %s may be slow depending on the tickrate "
"of the demo, which is usually 64 for MM and 128 for pro demos.",
parse_rate_lower_bound,
)
elif self.parse_rate >= parse_rate_upper_bound:
self.logger.warning(
"A high parse rate means very few frames. "
"Only use for testing purposes."
)
self.logger.info("Setting parse rate to %s", str(self.parse_rate))
def _check_buy_style(self) -> None:
"""Check that buy style is valid."""
# Handle parse rate. If the parse rate is less than 64, likely to be slow
if self.buy_style not in get_args(BuyStyle):
self.logger.warning(
"Buy style specified is not one of %s, "
"will be set to hltv by default",
get_args(BuyStyle),
)
self.parser_args["buy_style"] = "hltv"
self.logger.info("Setting buy style to %s", str(self.buy_style))
def _set_demo_id(self, demo_id: str | None, demofile: str) -> None:
"""Set demo_id.
If a demo_id was passed in that is used directly.
Otherwise the demoid is inferred from the demofile.
Args:
demo_id (str | None): Optionally demo_id passed to __init__
demofile (str): Name of the demofile
"""
if not demo_id:
self.demo_id = os.path.splitext(
os.path.basename(demofile.replace("\\", "/"))
)[0]
else:
self.demo_id = demo_id
[docs] def parse_demo(self) -> None:
"""Parse a demofile using the Go script parse_demo.go.
This function needs the .demofile to be set in the class,
and the file needs to exist.
Returns:
Outputs a JSON file to current working directory.
Raises:
ValueError: Raises a ValueError if the Golang version is lower than 1.18
FileNotFoundError: Raises a FileNotFoundError
if the demofile path does not exist.
"""
# Check if Golang version is compatible
acceptable_go = check_go_version()
if not acceptable_go:
error_message = (
"Error calling Go. "
"Check if Go is installed using 'go version'."
" Need at least v1.18.0."
)
self.logger.error(error_message)
raise ValueError(error_message)
self.logger.info("Go version>=1.18.0")
# Check if demofile exists
if not os.path.exists(os.path.abspath(self.demofile)):
msg = "Demofile path does not exist!"
self.logger.error(msg)
raise FileNotFoundError(msg)
path = os.path.join(os.path.dirname(__file__), "")
self.logger.info("Running Golang parser from %s", path)
self.logger.info("Looking for file at %s", self.demofile)
parser_cmd = [
"go",
"run",
"parse_demo.go",
"-demo",
self.demofile,
"-parserate",
str(self.parse_rate),
"-tradetime",
str(self.trade_time),
"-buystyle",
str(self.buy_style),
"-demoid",
str(self.demo_id),
"-out",
os.path.dirname(self.output_file),
]
if self.dmg_rolled:
parser_cmd.append("--dmgrolled")
if self.parse_frames:
parser_cmd.append("--parseframes")
if self.parse_kill_frames:
parser_cmd.append("--parsekillframes")
if self.json_indentation:
parser_cmd.append("--jsonindentation")
if self.parse_chat:
parser_cmd.append("--parsechat")
self.logger.debug(parser_cmd)
with subprocess.Popen(
parser_cmd, # noqa: S603
stdout=subprocess.PIPE,
cwd=path,
) as proc:
stdout = (
proc.stdout.read().splitlines() if proc.stdout is not None else None
)
if os.path.isfile(self.output_file):
self.logger.info("Wrote demo parse output to %s", self.output_file)
self.parse_error = False
else:
self.parse_error = True
self.logger.error("No file produced, error in calling Golang")
self.logger.error(stdout)
[docs] def read_json(self, json_path: str) -> Game:
"""Reads the JSON file given a JSON path.
Can be used to read in already processed demofiles.
Args:
json_path (string): Path to JSON file
Returns (Game):
JSON in Python dictionary form
Raises:
FileNotFoundError: Raises a FileNotFoundError if the JSON path doesn't exist
"""
# Check if JSON exists
if not os.path.exists(json_path):
self.logger.error("JSON path does not exist!")
msg = "JSON path does not exist!"
raise FileNotFoundError(msg)
# Read in json to .json attribute
with open(json_path, encoding="utf8") as game_data:
demo_data: Game = json.load(game_data)
self.json = demo_data
self.logger.info(
"JSON data loaded, available in the `json` attribute to parser"
)
return demo_data
@overload
def parse(
self, *, return_type: Literal["json"] = "json", clean: bool = ...
) -> Game:
...
@overload
def parse(self, *, return_type: Literal["df"], clean: bool = ...) -> dict[str, Any]:
...
[docs] def parse(
self, *, return_type: RoundReturnType = "json", clean: bool = True
) -> Game | dict[str, Any]:
"""Wrapper for parse_demo() and read_json(). Use to parse a demo.
Args:
return_type (string, optional): Either "json" or "df". Default is "json"
clean (bool, optional): True to run clean_rounds.
Otherwise, uncleaned data is returned. Defaults to True.
Returns:
A dictionary of output which
is parsed to a JSON file in the working directory.
Raises:
ValueError: Raises a ValueError if the return_type is not "json" or "df"
AttributeError: Raises an AttributeError if the .json attribute is None
"""
self.parse_demo()
self.read_json(json_path=self.output_file)
if clean:
self.clean_rounds()
if self.json:
self.logger.info("JSON output found")
if return_type == "json":
return self.json
if return_type == "df":
demo_data = self.parse_json_to_df()
self.logger.info("Returned dataframe output")
return demo_data
msg = "Parse return_type must be either 'json' or 'df'"
self.logger.error(msg)
raise ValueError(msg)
msg = "No JSON parsed! Error in producing JSON."
self.logger.error(msg)
raise AttributeError(msg)
[docs] def parse_json_to_df(self) -> dict[str, Any]:
# sourcery skip: extract-method
"""Returns JSON into dictionary where keys correspond to data frames.
Returns:
A dictionary of output
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
demo_data: dict[str, Any] = {
"matchID": self.json["matchID"],
"clientName": self.json["clientName"],
"mapName": self.json["mapName"],
"tickRate": self.json["tickRate"],
"playbackTicks": self.json["playbackTicks"],
"rounds": self._parse_rounds(),
}
# Kills
demo_data["kills"] = self._parse_action("kills")
# Damages
demo_data["damages"] = self._parse_action("damages")
# Grenades
demo_data["grenades"] = self._parse_action("grenades")
# Flashes
demo_data["flashes"] = self._parse_action("flashes")
# Weapon Fires
demo_data["weaponFires"] = self._parse_action("weaponFires")
# Bomb Events
demo_data["bombEvents"] = self._parse_action("bombEvents")
# Frames
demo_data["frames"] = self._parse_frames()
# Player Frames
demo_data["playerFrames"] = self._parse_player_frames()
self.logger.info("Returned dataframe output")
return demo_data
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
# Can not easily extract due to type checking
def _parse_frames(self) -> pd.DataFrame: # sourcery skip: extract-method
"""Returns frames as a Pandas dataframe.
Returns:
A Pandas dataframe where each row is a frame (game state) in the demo,
which is a discrete point of time.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
frames_dataframes = []
for game_round in self.json["gameRounds"] or []:
for frame in game_round["frames"] or []:
frame_item: dict[str, Any] = {"roundNum": game_round["roundNum"]}
for k in ("tick", "seconds"):
frame_item[k] = frame[k]
frame_item["ctTeamName"] = frame["ct"]["teamName"]
frame_item["ctEqVal"] = frame["ct"]["teamEqVal"]
frame_item["ctAlivePlayers"] = frame["ct"]["alivePlayers"]
frame_item["ctUtility"] = frame["ct"]["totalUtility"]
frame_item["tTeamName"] = frame["t"]["teamName"]
frame_item["tEqVal"] = frame["t"]["teamEqVal"]
frame_item["tAlivePlayers"] = frame["t"]["alivePlayers"]
frame_item["tUtility"] = frame["t"]["totalUtility"]
frames_dataframes.append(frame_item)
frames_df = pd.DataFrame(frames_dataframes)
frames_df["matchID"] = self.json["matchID"]
frames_df["mapName"] = self.json["mapName"]
return pd.DataFrame(frames_dataframes)
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
def _parse_player_frames(self) -> pd.DataFrame:
# Can not easily extract due to type checking
# sourcery skip: extract-method
"""Returns player frames as a Pandas dataframe.
Returns:
A Pandas dataframe where each row is a player's attributes
at a given frame (game state).
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
player_frames = []
for game_round in self.json["gameRounds"] or []:
for frame in game_round["frames"] or []:
for side in ("ct", "t"):
players = frame[side]["players"]
if players is None:
continue
for player in players:
player_item: dict[str, Any] = {
"roundNum": game_round["roundNum"],
"tick": frame["tick"],
"seconds": frame["seconds"],
"side": side,
"teamName": frame[side]["teamName"],
}
self.add_player_specific_information(player_item, player)
player_frames.append(player_item)
player_frames_df = pd.DataFrame(player_frames)
player_frames_df["matchID"] = self.json["matchID"]
player_frames_df["mapName"] = self.json["mapName"]
return pd.DataFrame(player_frames_df)
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
def _parse_rounds(self) -> pd.DataFrame:
"""Returns rounds as a Pandas dataframe.
Returns:
A Pandas dataframe where each row is a round
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
rounds = []
cols = (
"roundNum",
"startTick",
"freezeTimeEndTick",
"endTick",
"endOfficialTick",
"tScore",
"ctScore",
"endTScore",
"endCTScore",
"tTeam",
"ctTeam",
"winningSide",
"winningTeam",
"losingTeam",
"roundEndReason",
"ctFreezeTimeEndEqVal",
"ctRoundStartEqVal",
"ctRoundSpendMoney",
"ctBuyType",
"tFreezeTimeEndEqVal",
"tRoundStartEqVal",
"tRoundSpendMoney",
"tBuyType",
)
for game_round in self.json["gameRounds"] or []:
round_item: dict[str, Any] = {}
for k in cols:
round_item[k] = game_round[k]
round_item["matchID"] = self.json["matchID"]
round_item["mapName"] = self.json["mapName"]
rounds.append(round_item)
return pd.DataFrame(rounds)
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
def _parse_action(self, action: GameActionKey) -> pd.DataFrame:
"""Returns action as a Pandas dataframe.
Args:
action (str): Action dict to convert to dataframe.
Returns:
A Pandas dataframe where each row is a damage event.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
actions: dict[str, list] = defaultdict(list)
for game_round in self.json["gameRounds"] or []:
for game_action in game_round[action] or []:
for key, value in game_action.items():
actions[key].append(value)
actions["roundNum"].append(game_round["roundNum"])
actions["matchID"].append(self.json["matchID"])
actions["mapName"].append(self.json["mapName"])
# pd.array automatically infors nullable ints.
actions_array: dict[str, ExtensionArray] = {
key: pd.array(value_list) for key, value_list in actions.items()
}
return pd.DataFrame(actions_array)
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
@overload
def clean_rounds(
self,
*,
remove_no_frames: bool = ...,
remove_warmups: bool = ...,
remove_knifes: bool = ...,
remove_bad_timings: bool = ...,
remove_excess_players: bool = ...,
remove_excess_kills: bool = ...,
remove_bad_endings: bool = ...,
remove_bad_scoring: bool = ...,
return_type: Literal["json"] = "json",
save_to_json: bool = ...,
) -> Game:
...
@overload
def clean_rounds(
self,
*,
remove_no_frames: bool = ...,
remove_warmups: bool = ...,
remove_knifes: bool = ...,
remove_bad_timings: bool = ...,
remove_excess_players: bool = ...,
remove_excess_kills: bool = ...,
remove_bad_endings: bool = ...,
remove_bad_scoring: bool = ...,
return_type: Literal["df"] = ...,
save_to_json: bool = ...,
) -> dict[str, Any]:
...
[docs] def clean_rounds(
self,
*,
remove_no_frames: bool = True,
remove_warmups: bool = True,
remove_knifes: bool = True,
remove_bad_timings: bool = True,
remove_excess_players: bool = True,
remove_excess_kills: bool = True,
remove_bad_endings: bool = True,
remove_bad_scoring: bool = True,
return_type: RoundReturnType = "json",
save_to_json: bool = True,
) -> Game | dict[str, Any]:
"""Cleans a parsed demofile JSON.
Args:
remove_no_frames (bool, optional): Remove rounds where there are no frames.
Default to True.
remove_warmups (bool, optional): Remove warmup rounds. Defaults to True.
remove_knifes (bool, optional): Remove knife rounds. Defaults to True.
remove_bad_timings (bool, optional): Remove bad timings. Defaults to True.
remove_excess_players (bool, optional):
Remove rounds with more than 5 players. Defaults to True.
remove_excess_kills (bool, optional): Remove rounds with more than 10 kills.
Defaults to True.
remove_bad_endings (bool, optional):
Remove rounds with bad round end reasons. Defaults to True.
remove_bad_scoring (bool, optional): Remove rounds where the scoring is off
Like scores going below the previous round's. Defaults to False.
return_type (str, optional): Return JSON or DataFrame. Defaults to "json".
save_to_json (bool, optional): Whether to write the JSON to a file.
Defaults to True.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
ValueError: Raises a ValueError if the return type is neither
'json' nor 'df'
Returns:
dict: A dictionary of the cleaned demo.
"""
if self.json:
if remove_no_frames:
self.remove_rounds_with_no_frames()
if remove_warmups:
self.remove_warmups()
if remove_knifes:
self.remove_knife_rounds()
if remove_bad_timings:
self.remove_time_rounds()
if remove_excess_players:
self.remove_excess_players()
if remove_excess_kills:
self.remove_excess_kill_rounds()
if remove_bad_endings:
self.remove_end_round()
if remove_bad_scoring:
self.remove_bad_scoring()
self.renumber_rounds()
self.renumber_frames()
# self.rescore_rounds() -- Need to edit to take into account half switches
if save_to_json:
self.write_json()
if return_type == "json":
return self.json
if return_type == "df":
demo_data = self.parse_json_to_df()
self.logger.info("Returned cleaned dataframe output")
return demo_data
msg = f"Invalid return_type of {return_type}. Use 'json' or 'df' instead!"
raise ValueError(msg)
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def write_json(self) -> None:
"""Rewrite the JSON file."""
with open(self.output_file, "w", encoding="utf8") as file_path:
json.dump(
self.json, file_path, indent=(1 if self.json_indentation else None)
)
[docs] def renumber_rounds(self) -> None:
"""Renumbers the rounds.
Raises:
AttributeError: Raises an AttributeError if the .json attribute
has no "gameRounds" key.
"""
if self.json:
if self.json["gameRounds"] is None:
return
for i, _r in enumerate(self.json["gameRounds"]):
self.json["gameRounds"][i]["roundNum"] = i + 1
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def renumber_frames(self) -> None:
"""Renumbers the frames.
Needed since cleaning can remove frames and cause
some indices to be skipped.
Raises:
AttributeError: Raises an AttributeError if the .json attribute
has no "gameRounds" key.
"""
if self.json:
if self.json["gameRounds"] is None:
return
for index, frame in enumerate(
frame
for game_round in self.json["gameRounds"]
for frame in game_round["frames"] or []
):
frame["globalFrameID"] = index
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def rescore_rounds(self) -> None:
"""Rescore the rounds based on round end reason.
Raises:
AttributeError: Raises an AttributeError if the .json attribute
has no "gameRounds" key.
"""
if self.json:
if self.json["gameRounds"] is None:
return
for i, _r in enumerate(self.json["gameRounds"]):
if i == 0:
self.json["gameRounds"][i]["tScore"] = 0
self.json["gameRounds"][i]["ctScore"] = 0
if self.json["gameRounds"][i]["winningSide"] == "ct":
self.json["gameRounds"][i]["endCTScore"] = 1
self.json["gameRounds"][i]["endTScore"] = 0
if self.json["gameRounds"][i]["winningSide"] == "t":
self.json["gameRounds"][i]["endCTScore"] = 0
self.json["gameRounds"][i]["endTScore"] = 1
elif i > 0:
self.json["gameRounds"][i]["tScore"] = self.json["gameRounds"][
i - 1
]["endTScore"]
self.json["gameRounds"][i]["ctScore"] = self.json["gameRounds"][
i - 1
]["endCTScore"]
if self.json["gameRounds"][i]["winningSide"] == "ct":
self.json["gameRounds"][i]["endCTScore"] = (
self.json["gameRounds"][i]["ctScore"] + 1
)
self.json["gameRounds"][i]["endTScore"] = self.json[
"gameRounds"
][i]["tScore"]
if self.json["gameRounds"][i]["winningSide"] == "t":
self.json["gameRounds"][i]["endCTScore"] = self.json[
"gameRounds"
][i]["ctScore"]
self.json["gameRounds"][i]["endTScore"] = (
self.json["gameRounds"][i]["tScore"] + 1
)
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
def _has_winner_and_not_winner(self, game_round: GameRound) -> bool:
tie_score = 15
ot_tie_score = 3
regular_valid_t_win = (game_round["endTScore"] == tie_score + 1) and (
game_round["endCTScore"] < tie_score
)
regular_valid_ct_win = (
game_round["endCTScore"] == tie_score + 1
and game_round["endTScore"] < tie_score
)
# OT draw scores are of the type
# 15 + 3xN with N a natural number(1, 2, 3, ...)
# So 18, 21, 24, 27
# Wins are 1 higher
# So 19, 22, 25, 28
# So if you subtract 15 + 1 from an OT winning round
# the number is divisible by 3
ot_valid_ct_win = (
(game_round["endCTScore"] - tie_score - 1) % ot_tie_score == 0
# Difference of two needed for a win. e.g 19-17
# Difference of one means that it was 18-18 and went to another
# overtime e.g. 19-18 and thus is not a win for one side yet.
and game_round["endTScore"] < (game_round["endCTScore"] - 1)
and game_round["endCTScore"] > tie_score
)
ot_valid_t_win = (
(game_round["endTScore"] - tie_score - 1) % ot_tie_score == 0
and game_round["endCTScore"] < (game_round["endTScore"] - 1)
and game_round["endTScore"] > tie_score
)
return (
regular_valid_ct_win
or regular_valid_t_win
or ot_valid_ct_win
or ot_valid_t_win
)
[docs] def remove_bad_scoring(self) -> None:
"""Removes rounds where the scoring is bad.
We loop through the rounds:
If the round ahead has equal or less score, we do not add the current round.
If the round ahead has +1 score, we add the current round
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
if self.json["gameRounds"] is None:
return
cleaned_rounds = []
for i, game_round in enumerate(self.json["gameRounds"]):
current_round_total = (
game_round["tScore"]
+ game_round["endTScore"]
+ game_round["ctScore"]
+ game_round["endCTScore"]
)
lookback_round = self.json["gameRounds"][i - 1]
lookback_round_total = (
lookback_round["tScore"]
+ lookback_round["endTScore"]
+ lookback_round["ctScore"]
+ lookback_round["endCTScore"]
)
# Last round just have to have a higher score than the previous
if (
i == (len(self.json["gameRounds"]) - 1)
and current_round_total > lookback_round_total
):
cleaned_rounds.append(game_round)
# Other rounds have more criteria
# They need to have a lower score than the next round
# Or when they are the round that causes the game to end in a win
# Then they just need more rounds than the previous.
# Rounds after are likely not real anymore.
elif i < (len(self.json["gameRounds"]) - 1):
lookahead_round = self.json["gameRounds"][i + 1]
lookahead_round_total = (
lookahead_round["tScore"]
+ lookahead_round["endTScore"]
+ lookahead_round["ctScore"]
+ lookahead_round["endCTScore"]
)
if (lookahead_round_total > current_round_total) or (
self._has_winner_and_not_winner(game_round)
and current_round_total > lookback_round_total
):
cleaned_rounds.append(game_round)
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def remove_rounds_with_no_frames(self) -> None:
"""Removes rounds with no frames.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
if not self.parse_frames:
self.logger.warning(
"parse_frames is set to False, "
"must be true for remove_no_frames to work. "
"Skipping remove_no_frames."
)
else:
cleaned_rounds = [
game_round
for game_round in self.json["gameRounds"] or []
if len(game_round["frames"] or []) > 0
]
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def remove_excess_players(self) -> None:
"""Removes rounds where there are more than 5 players on a side.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
if not self.parse_frames:
self.logger.warning(
"parse_frames is set to False, "
"must be true for remove_excess_players to work. "
"Skipping remove_excess_players."
)
else:
cleaned_rounds = []
# Remove rounds where the number of players is too large
n_players = 5
for game_round in self.json["gameRounds"] or []:
if not game_round["frames"]:
continue
game_frame = game_round["frames"][0]
player_lists = (
game_frame["t"]["players"],
game_frame["ct"]["players"],
)
# Remove if any side has > 5 players
# Remove if both sides are None
if all(
len(player_list or []) <= n_players
for player_list in player_lists
) and any(player_list is not None for player_list in player_lists):
cleaned_rounds.append(game_round)
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def remove_warmups(self) -> None:
"""Removes warmup rounds.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
cleaned_rounds = [
game_round
for game_round in self.json["gameRounds"] or []
if not game_round["isWarmup"]
]
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def remove_end_round(self, bad_endings: list[str] | None = None) -> None:
"""Removes rounds with bad end reason.
Args:
bad_endings (list, optional): List of bad round end reasons.
Defaults to ["Draw", "Unknown", ""].
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if bad_endings is None:
bad_endings = ["Draw", "Unknown", ""]
if self.json:
if (game_rounds := self.json["gameRounds"]) is None:
return
cleaned_rounds = [
game_round
for game_round in game_rounds
if game_round["roundEndReason"] not in bad_endings
]
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def remove_knife_rounds(self) -> None:
"""Removes knife rounds.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
if (game_rounds := self.json["gameRounds"]) is None:
return
cleaned_rounds = []
for game_round in game_rounds:
if game_round["isWarmup"]:
continue
if (kill_actions := game_round["kills"]) is None:
continue
total_kills = len(kill_actions)
total_knife_kills = sum(
1 for k in kill_actions if k["weapon"] == "Knife"
)
if (total_knife_kills != total_kills) | (total_knife_kills == 0):
cleaned_rounds.append(game_round)
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def remove_excess_kill_rounds(self) -> None:
"""Removes rounds with more than 10 kills.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
n_total_players = 10
cleaned_rounds = [
game_round
for game_round in self.json["gameRounds"] or []
if (
not game_round["isWarmup"]
and len(game_round["kills"] or []) <= n_total_players
)
]
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)
[docs] def remove_time_rounds(self) -> None:
"""Remove rounds with odd round timings.
Raises:
AttributeError: Raises an AttributeError if the .json attribute is None
"""
if self.json:
cleaned_rounds = [
game_round
for game_round in self.json["gameRounds"] or []
if (
(game_round["startTick"] <= game_round["endTick"])
and (game_round["startTick"] <= game_round["endOfficialTick"])
and (game_round["startTick"] <= game_round["freezeTimeEndTick"])
)
]
self.json["gameRounds"] = cleaned_rounds
else:
msg = "JSON not found. Run .parse() or .read_json() if JSON already exists"
self.logger.error(msg)
raise AttributeError(msg)