Source code for awpy.visibility

"""Module for calculating visibility.

Reference: https://github.com/AtomicBool/cs2-map-parser
"""

from __future__ import annotations

import pathlib
import struct
from dataclasses import dataclass
from typing import Literal, overload

from loguru import logger

import awpy.vector


@dataclass
class Triangle:
    """A triangle in 3D space defined by three vertices.

    Attributes:
        p1: First vertex of the triangle.
        p2: Second vertex of the triangle.
        p3: Third vertex of the triangle.
    """

    p1: awpy.vector.Vector3
    p2: awpy.vector.Vector3
    p3: awpy.vector.Vector3

    def get_centroid(self) -> awpy.vector.Vector3:
        """Calculate the centroid of the triangle.

        Returns:
            awpy.vector.Vector3: Centroid of the triangle.
        """
        return awpy.vector.Vector3(
            (self.p1.x + self.p2.x + self.p3.x) / 3,
            (self.p1.y + self.p2.y + self.p3.y) / 3,
            (self.p1.z + self.p2.z + self.p3.z) / 3,
        )


@dataclass
class Edge:
    """An edge in a triangulated mesh.

    Attributes:
        next: Index of the next edge in the face.
        twin: Index of the twin edge in the adjacent face.
        origin: Index of the vertex where this edge starts.
        face: Index of the face this edge belongs to.
    """

    next: int
    twin: int
    origin: int
    face: int


[docs] class KV3Parser: """Parser for KV3 format files used in Source 2 engine. This class provides functionality to parse KV3 files, which are used to store various game data including physics collision meshes. Attributes: content: Raw content of the KV3 file. index: Current parsing position in the content. parsed_data: Resulting parsed data structure. """ def __init__(self) -> None: """Initialize a new KV3Parser instance.""" self.content = "" self.index = 0 self.parsed_data = None
[docs] def parse(self, content: str) -> None: """Parse the given KV3 content string. Args: content: String containing KV3 formatted data. """ self.content = content self.index = 0 self._skip_until_first_bracket() self.parsed_data = self._parse_value()
[docs] def get_value(self, path: str) -> str: """Get a value from the parsed data using a dot-separated path. Args: path: Dot-separated path to the desired value, e.g., "section.subsection[0].value" Returns: String value at the specified path, or empty string if not found. """ if not self.parsed_data: return "" current = self.parsed_data for segment in path.split("."): key = segment array_index = None if "[" in segment: key = segment[: segment.find("[")] array_index = int(segment[segment.find("[") + 1 : segment.find("]")]) if isinstance(current, dict) and key in current: current = current[key] else: return "" if array_index is not None: if isinstance(current, list) and array_index < len(current): current = current[array_index] else: return "" return current if isinstance(current, str) else ""
def _skip_until_first_bracket(self) -> None: """Skip content until the first opening bracket is found.""" while self.index < len(self.content) and self.content[self.index] != "{": self.index = self.content.find("\n", self.index) + 1 def _skip_whitespace(self) -> None: """Skip all whitespace characters at the current position.""" while self.index < len(self.content) and self.content[self.index].isspace(): self.index += 1 def _parse_value(self) -> dict | list | str | None: """Parse a value from the current position. Returns: Parsed value which can be a dictionary, list, or string, or None if parsing fails. """ self._skip_whitespace() if self.index >= len(self.content): return None char = self.content[self.index] if char == "{": return self._parse_object() if char == "[": return self._parse_array() if char == "#" and self.index + 1 < len(self.content) and self.content[self.index + 1] == "[": self.index += 1 return self._parse_byte_array() return self._parse_string() def _parse_object(self) -> dict: """Parse a KV3 object starting at the current position. Returns: Dictionary containing the parsed key-value pairs. """ self.index += 1 # Skip { obj = {} while self.index < len(self.content): self._skip_whitespace() if self.content[self.index] == "}": self.index += 1 return obj key = self._parse_string() self._skip_whitespace() if self.content[self.index] == "=": self.index += 1 value = self._parse_value() if key and value is not None: obj[key] = value self._skip_whitespace() if self.content[self.index] == ",": self.index += 1 return obj def _parse_array(self) -> list: """Parse a KV3 array starting at the current position. Returns: List containing the parsed values. """ self.index += 1 # Skip [ arr = [] while self.index < len(self.content): self._skip_whitespace() if self.content[self.index] == "]": self.index += 1 return arr value = self._parse_value() if value is not None: arr.append(value) self._skip_whitespace() if self.content[self.index] == ",": self.index += 1 return arr def _parse_byte_array(self) -> str: """Parse a KV3 byte array starting at the current position. Returns: Space-separated string of byte values. """ self.index += 1 # Skip [ start = self.index while self.index < len(self.content) and self.content[self.index] != "]": self.index += 1 byte_str = self.content[start : self.index].strip() self.index += 1 # Skip ] return " ".join(byte_str.split()) def _parse_string(self) -> str: """Parse a string value at the current position. Returns: Parsed string value. """ start = self.index while self.index < len(self.content): char = self.content[self.index] if char in "={}[], \n": break self.index += 1 return self.content[start : self.index].strip()
[docs] class VphysParser: """Parser for VPhys collision files. This class extracts and processes collision geometry data from VPhys files, converting it into a set of triangles. Attributes: vphys_file (Path): Path to the VPhys file. triangles (list[Triangle]): List of parsed triangles from the VPhys file. kv3_parser (KV3Parser): Helper parser for extracting key-value data from the .vphys file. """ def __init__(self, vphys_file: str | pathlib.Path) -> None: """Initializes the parser with the path to a VPhys file. Args: vphys_file (str | pathlib.Path): Path to the VPhys file to parse. """ self.vphys_file = pathlib.Path(vphys_file) self.triangles: list[Triangle] = [] self.kv3_parser = KV3Parser() self.parse() @overload @staticmethod def bytes_to_vec(byte_str: str, element_type: Literal["uint8", "int32"]) -> list[int]: ... @overload @staticmethod def bytes_to_vec(byte_str: str, element_type: Literal["float"]) -> list[float]: ...
[docs] @staticmethod def bytes_to_vec(byte_str: str, element_type: Literal["uint8", "int32", "float"]) -> list[int] | list[float]: """Converts a space-separated string of byte values into a list of numbers. Args: byte_str (str): Space-separated string of hexadecimal byte values. element_type (int): Types represented by the bytes (uint8, int32, float). Returns: list[int | float]: List of converted values (integers for uint8, floats for size 4). """ bytes_list = [int(b, 16) for b in byte_str.split()] result = [] if element_type == "uint8": return bytes_list element_size = 4 # For int and float # Convert bytes to appropriate type based on size for i in range(0, len(bytes_list), element_size): chunk = bytes(bytes_list[i : i + element_size]) if element_type == "float": # float val = struct.unpack("f", chunk)[0] result.append(val) else: # int32 val = struct.unpack("i", chunk)[0] result.append(val) return result
[docs] def get_collision_attribute_indices_for_default_group(self) -> list[str]: """Get collision attribute indices for the default group. Returns: list[int]: List of collision attribute indices for the default group. """ collision_attribute_indices = [] idx = 0 while True: collision_group_string = self.kv3_parser.get_value(f"m_collisionAttributes[{idx}].m_CollisionGroupString") if not collision_group_string: break if collision_group_string.lower() == '"default"': collision_attribute_indices.append(str(idx)) idx += 1 return collision_attribute_indices
[docs] def parse(self) -> None: """Parses the VPhys file and extracts collision geometry. Processes hulls and meshes in the VPhys file to generate a list of triangles. """ if len(self.triangles) > 0: logger.debug(f"VPhys data already parsed, got {len(self.triangles)} triangles.") return logger.debug(f"Parsing vphys file: {self.vphys_file}") # Read file with open(self.vphys_file) as f: data = f.read() # Parse VPhys data self.kv3_parser.parse(data) collision_attribute_indices = self.get_collision_attribute_indices_for_default_group() logger.debug(f"Extracted collision attribute indices: {collision_attribute_indices}") # Process hulls hull_idx = 0 hull_count = 0 while True: if hull_idx % 1000 == 0: logger.debug(f"Processing hull {hull_idx}...") collision_idx = self.kv3_parser.get_value( f"m_parts[0].m_rnShape.m_hulls[{hull_idx}].m_nCollisionAttributeIndex" ) if not collision_idx: break if collision_idx in collision_attribute_indices: # Get vertices vertex_str = self.kv3_parser.get_value( f"m_parts[0].m_rnShape.m_hulls[{hull_idx}].m_Hull.m_VertexPositions" ) if not vertex_str: vertex_str = self.kv3_parser.get_value( f"m_parts[0].m_rnShape.m_hulls[{hull_idx}].m_Hull.m_Vertices" ) vertex_data = self.bytes_to_vec(vertex_str, "float") vertices = [ awpy.vector.Vector3(vertex_data[i], vertex_data[i + 1], vertex_data[i + 2]) for i in range(0, len(vertex_data), 3) ] # Get faces and edges faces = self.bytes_to_vec( self.kv3_parser.get_value(f"m_parts[0].m_rnShape.m_hulls[{hull_idx}].m_Hull.m_Faces"), "uint8", ) edge_data = self.bytes_to_vec( self.kv3_parser.get_value(f"m_parts[0].m_rnShape.m_hulls[{hull_idx}].m_Hull.m_Edges"), "uint8", ) edges = [ Edge( edge_data[i], edge_data[i + 1], edge_data[i + 2], edge_data[i + 3], ) for i in range(0, len(edge_data), 4) ] # Process triangles for start_edge in faces: edge = edges[start_edge].next while edge != start_edge: next_edge = edges[edge].next self.triangles.append( Triangle( vertices[edges[start_edge].origin], vertices[edges[edge].origin], vertices[edges[next_edge].origin], ) ) edge = next_edge hull_count += 1 hull_idx += 1 # Process meshes mesh_idx = 0 mesh_count = 0 while True: logger.debug(f"Processing mesh {mesh_idx}...") collision_idx = self.kv3_parser.get_value( f"m_parts[0].m_rnShape.m_meshes[{mesh_idx}].m_nCollisionAttributeIndex" ) if not collision_idx: break if collision_idx in collision_attribute_indices: # Get triangles and vertices tri_data = self.bytes_to_vec( self.kv3_parser.get_value(f"m_parts[0].m_rnShape.m_meshes[{mesh_idx}].m_Mesh.m_Triangles"), "int32", ) vertex_data = self.bytes_to_vec( self.kv3_parser.get_value(f"m_parts[0].m_rnShape.m_meshes[{mesh_idx}].m_Mesh.m_Vertices"), "float", ) vertices = [ awpy.vector.Vector3(vertex_data[i], vertex_data[i + 1], vertex_data[i + 2]) for i in range(0, len(vertex_data), 3) ] for i in range(0, len(tri_data), 3): self.triangles.append( Triangle( vertices[int(tri_data[i])], vertices[int(tri_data[i + 1])], vertices[int(tri_data[i + 2])], ) ) mesh_count += 1 mesh_idx += 1
[docs] def to_tri(self, path: str | pathlib.Path | None) -> None: """Export parsed triangles to a .tri file. Args: path: Path to the output .tri file. """ if not path: path = self.vphys_file.with_suffix(".tri") outpath = pathlib.Path(path) logger.debug(f"Exporting {len(self.triangles)} triangles to {outpath}") with open(outpath, "wb") as f: for triangle in self.triangles: # Write all awpy.vector.Vector3 components as float32 f.write(struct.pack("f", triangle.p1.x)) f.write(struct.pack("f", triangle.p1.y)) f.write(struct.pack("f", triangle.p1.z)) f.write(struct.pack("f", triangle.p2.x)) f.write(struct.pack("f", triangle.p2.y)) f.write(struct.pack("f", triangle.p2.z)) f.write(struct.pack("f", triangle.p3.x)) f.write(struct.pack("f", triangle.p3.y)) f.write(struct.pack("f", triangle.p3.z)) logger.success(f"Processed {len(self.triangles)} triangles from {self.vphys_file} -> {outpath}")
[docs] class AABB: """Axis-Aligned Bounding Box for efficient collision detection.""" def __init__(self, min_point: awpy.vector.Vector3, max_point: awpy.vector.Vector3) -> None: """Initialize the AABB with minimum and maximum points. Args: min_point (awpy.vector.Vector3): Minimum point of the AABB. max_point (awpy.vector.Vector3): Maximum point of the AABB. """ self.min_point = min_point self.max_point = max_point
[docs] @classmethod def from_triangle(cls, triangle: Triangle) -> AABB: """Create an AABB from a triangle. Args: triangle (Triangle): Triangle to create the AABB from. Returns: AABB: Axis-Aligned Bounding Box encompassing the triangle. """ min_point = awpy.vector.Vector3( min(triangle.p1.x, triangle.p2.x, triangle.p3.x), min(triangle.p1.y, triangle.p2.y, triangle.p3.y), min(triangle.p1.z, triangle.p2.z, triangle.p3.z), ) max_point = awpy.vector.Vector3( max(triangle.p1.x, triangle.p2.x, triangle.p3.x), max(triangle.p1.y, triangle.p2.y, triangle.p3.y), max(triangle.p1.z, triangle.p2.z, triangle.p3.z), ) return cls(min_point, max_point)
[docs] def intersects_ray(self, ray_origin: awpy.vector.Vector3, ray_direction: awpy.vector.Vector3) -> bool: """Check if a ray intersects with the AABB. Args: ray_origin (awpy.vector.Vector3): Ray origin point. ray_direction (awpy.vector.Vector3): Ray direction vector. Returns: bool: True if the ray intersects with the AABB, False otherwise. """ epsilon = 1e-6 def check_axis(origin: float, direction: float, min_val: float, max_val: float) -> tuple[float, float]: if abs(direction) < epsilon: if origin < min_val or origin > max_val: return float("inf"), float("-inf") return float("-inf"), float("inf") t1 = (min_val - origin) / direction t2 = (max_val - origin) / direction return (min(t1, t2), max(t1, t2)) tx_min, tx_max = check_axis(ray_origin.x, ray_direction.x, self.min_point.x, self.max_point.x) ty_min, ty_max = check_axis(ray_origin.y, ray_direction.y, self.min_point.y, self.max_point.y) tz_min, tz_max = check_axis(ray_origin.z, ray_direction.z, self.min_point.z, self.max_point.z) t_enter = max(tx_min, ty_min, tz_min) t_exit = min(tx_max, ty_max, tz_max) return t_enter <= t_exit and t_exit >= 0
[docs] class BVHNode: """Node in the Bounding Volume Hierarchy tree.""" def __init__( self, aabb: AABB, triangle: Triangle | None = None, left: BVHNode | None = None, right: BVHNode | None = None, ) -> None: """Initialize a BVHNode with an AABB and optional triangle and children. Args: aabb (AABB): Axis-Aligned Bounding Box of the node. triangle (Triangle | None, optional): Triangle contained in the node. Defaults to None. left (BVHNode | None, optional): Left child node. Defaults to None. right (BVHNode | None, optional): Right child node. Defaults to None. """ self.aabb = aabb self.triangle = triangle self.left = left self.right = right
[docs] class VisibilityChecker: """Class for visibility checking in 3D space using a BVH structure.""" def __init__(self, path: pathlib.Path | None = None, triangles: list[Triangle] | None = None) -> None: """Initialize the visibility checker with a list of triangles. Args: path (pathlib.Path | None, optional): Path to a .tri file to read triangles from. triangles (list[Triangle] | None, optional): List of triangles to build the BVH from. """ if path is not None: triangles = self.read_tri_file(path) self.n_triangles = len(triangles) self.root = self._build_bvh(triangles) def __repr__(self) -> str: """Return a string representation of the VisibilityChecker.""" return f"VisibilityChecker(n_triangles={self.n_triangles})" def _build_bvh(self, triangles: list[Triangle]) -> BVHNode: """Build a BVH tree from a list of triangles. Args: triangles (list[Triangle]): List of triangles to build the BVH from. Returns: BVHNode: Root node of the BVH tree. """ if len(triangles) == 1: return BVHNode(AABB.from_triangle(triangles[0]), triangle=triangles[0]) # Calculate centroids and find split axis centroids = [t.get_centroid() for t in triangles] # Find the axis with the largest spread min_x = min(c.x for c in centroids) max_x = max(c.x for c in centroids) min_y = min(c.y for c in centroids) max_y = max(c.y for c in centroids) min_z = min(c.z for c in centroids) max_z = max(c.z for c in centroids) x_spread = max_x - min_x y_spread = max_y - min_y z_spread = max_z - min_z # Choose split axis if x_spread >= y_spread and x_spread >= z_spread: axis = 0 # x-axis elif y_spread >= z_spread: axis = 1 # y-axis else: axis = 2 # z-axis # Sort triangles based on centroid position triangles = sorted( triangles, key=lambda t: ( t.get_centroid().x if axis == 0 else t.get_centroid().y if axis == 1 else t.get_centroid().z ), ) # Split triangles into two groups mid = len(triangles) // 2 left = self._build_bvh(triangles[:mid]) right = self._build_bvh(triangles[mid:]) # Create encompassing AABB min_point = awpy.vector.Vector3( min(left.aabb.min_point.x, right.aabb.min_point.x), min(left.aabb.min_point.y, right.aabb.min_point.y), min(left.aabb.min_point.z, right.aabb.min_point.z), ) max_point = awpy.vector.Vector3( max(left.aabb.max_point.x, right.aabb.max_point.x), max(left.aabb.max_point.y, right.aabb.max_point.y), max(left.aabb.max_point.z, right.aabb.max_point.z), ) return BVHNode(AABB(min_point, max_point), left=left, right=right) def _ray_triangle_intersection( self, ray_origin: awpy.vector.Vector3, ray_direction: awpy.vector.Vector3, triangle: Triangle, ) -> float | None: """Check if a ray intersects with a triangle. Args: ray_origin (awpy.vector.Vector3): Ray origin point. ray_direction (awpy.vector.Vector3): Ray direction vector. triangle (Triangle): Triangle to check intersection with. Returns: float | None: Distance to the intersection point, or None if no intersection. """ epsilon = 1e-6 edge1 = triangle.p2 - triangle.p1 edge2 = triangle.p3 - triangle.p1 h = ray_direction.cross(edge2) a = edge1.dot(h) if -epsilon < a < epsilon: return None f = 1.0 / a s = ray_origin - triangle.p1 u = f * s.dot(h) if u < 0.0 or u > 1.0: return None q = s.cross(edge1) v = f * ray_direction.dot(q) if v < 0.0 or u + v > 1.0: return None t = f * edge2.dot(q) if t > epsilon: return t return None def _traverse_bvh( self, node: BVHNode, ray_origin: awpy.vector.Vector3, ray_direction: awpy.vector.Vector3, max_distance: float, ) -> bool: """Traverse the BVH tree to check for ray-triangle intersections. Args: node (BVHNode): Current node in the BVH tree. ray_origin (awpy.vector.Vector3): Ray origin point. ray_direction (awpy.vector.Vector3): Ray direction vector. max_distance (float): Maximum distance to check for intersections. Returns: bool: True if an intersection is found, False otherwise. """ if not node.aabb.intersects_ray(ray_origin, ray_direction): return False # Leaf node - check triangle intersection if node.triangle: t = self._ray_triangle_intersection(ray_origin, ray_direction, node.triangle) return bool(t is not None and t <= max_distance) # Internal node - recurse through children return self._traverse_bvh(node.left, ray_origin, ray_direction, max_distance) or self._traverse_bvh( node.right, ray_origin, ray_direction, max_distance )
[docs] def is_visible( self, start: awpy.vector.Vector3 | tuple | list, end: awpy.vector.Vector3 | tuple | list, ) -> bool: """Check if a line segment is visible in the 3D space. Args: start (awpy.vector.Vector3 | tuple | list): Start point of the line segment. end (awpy.vector.Vector3 | tuple | list): End point of the line segment. Returns: bool: True if the line segment is visible, False otherwise. """ start_vec = awpy.vector.Vector3.from_input(start) end_vec = awpy.vector.Vector3.from_input(end) # Calculate ray direction and length direction = end_vec - start_vec distance = direction.length() if distance < 1e-6: return True direction = direction.normalize() # Check for intersections return not self._traverse_bvh(self.root, start_vec, direction, distance)
[docs] @staticmethod def read_tri_file(tri_file: str | pathlib.Path, buffer_size: int = 1000) -> list[Triangle]: """Read triangles from a .tri file.""" tri_file = pathlib.Path(tri_file) triangles: list[Triangle] = [] with open(tri_file, "rb") as f: chunk_size = buffer_size * 9 * 4 while True: data = f.read(chunk_size) if not data: break num_floats = len(data) // 4 num_complete_triangles = num_floats // 9 for i in range(num_complete_triangles): offset = i * 36 values = struct.unpack("9f", data[offset : offset + 36]) triangles.append( Triangle( awpy.vector.Vector3(values[0], values[1], values[2]), awpy.vector.Vector3(values[3], values[4], values[5]), awpy.vector.Vector3(values[6], values[7], values[8]), ) ) return triangles