Skip to content
Prev Previous commit
Next Next commit
Clean functions
  • Loading branch information
Mihir Thalanki authored and Mihir Thalanki committed Sep 3, 2024
commit 2121dda611e9180f19eb9f3a8141950ccbe41628
5 changes: 3 additions & 2 deletions unravel/soccer/graphs/graph_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

from ...utils.features.node_feature_set import NodeFeatureSet


@dataclass(repr=True)
class GraphConverter:
"""
Expand Down Expand Up @@ -107,7 +108,7 @@ class GraphConverter:
settings: GraphSettings = field(
init=False, repr=False, default_factory=GraphSettings
)

node_features: NodeFeatureSet = field(
init=False, repr=False, default_factory=NodeFeatureSet
)
Expand Down Expand Up @@ -271,7 +272,7 @@ def to_graph_frames(self) -> dict:
label=label,
graph_id=graph_id,
settings=self.settings,
node_features = self.node_features
node_features=self.node_features,
)
if gnn_frame.graph_data:
self.graph_frames.append(gnn_frame)
Expand Down
172 changes: 124 additions & 48 deletions unravel/utils/features/node_feature_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

from .utils import (
normalize_coords,
coord,
unit_vector,
normalize_speed,
normalize_angles,
normalize_distance
normalize_distance,
)


Expand All @@ -24,96 +23,173 @@ def add_x(self, normed: bool = False):
If 'normed=True', the function will normalize the x coordinate
"""
if normed:
self.node_feature_functions.append(('normalize_x', normalize_coords, ['x', 'max_x']))
self.node_feature_functions.append(
("normalize_x", normalize_coords, ["x", "max_x"])
)
else:
self.node_feature_functions.append(('coord_x', lambda x: x, ['x']))
self.node_feature_functions.append(("coord_x", lambda x: x, ["x"]))

return self

def add_y(self, normed: bool = False):
"""
Adds a function to calculate the x coordinate to the node feature set.
If 'normed=True', the function will normalize the x coordinate
"""
if normed:
self.node_feature_functions.append(('normalize_y', normalize_coords, ['y', 'max_y']))
self.node_feature_functions.append(
("normalize_y", normalize_coords, ["y", "max_y"])
)
else:
self.node_feature_functions.append(('coord_y', lambda y: y, ['y']))
self.node_feature_functions.append(("coord_y", lambda y: y, ["y"]))

return self

def add_velocity(self, x: bool = True, y: bool = True, angle: bool = True, normed: bool = False):

def add_velocity(
self, x: bool = True, y: bool = True, angle: bool = True, normed: bool = False
):
if not (x or y):
print("Warning: No velocity component added. Please add either x or y components")
print(
"Warning: No velocity component added. Please add either x or y components"
)
return
if x:
self.node_feature_functions.append(('unit_velocity_x', lambda velocity: unit_vector(velocity)[0], ['velocity']))
self.node_feature_functions.append(
(
"unit_velocity_x",
lambda velocity: unit_vector(velocity)[0],
["velocity"],
)
)
if y:
self.node_feature_functions.append(('unit_velocity_y', lambda velocity: unit_vector(velocity)[1], ['velocity']))

self.node_feature_functions.append(
(
"unit_velocity_y",
lambda velocity: unit_vector(velocity)[1],
["velocity"],
)
)

if angle:
if not (x and y):
print("Warning: Angle cannot be calculated because either x or y component was not computed")
print(
"Warning: Angle cannot be calculated because either x or y component was not computed"
)
else:
if normed:
self.node_feature_functions.append(('normalized_velocity_angle', lambda velocity: normalize_angles(np.arctan2(velocity[1], velocity[0])), ['velocity']))
self.node_feature_functions.append(
(
"normalized_velocity_angle",
lambda velocity: normalize_angles(
np.arctan2(velocity[1], velocity[0])
),
["velocity"],
)
)
else:
self.node_feature_functions.append(('velocity_angle', lambda velocity: np.arctan2(velocity[1], velocity[0]), ['velocity']))
self.node_feature_functions.append(
(
"velocity_angle",
lambda velocity: np.arctan2(velocity[1], velocity[0]),
["velocity"],
)
)
return self

def add_speed(self, normed: bool = True):
if normed:
self.node_feature_functions.append(('normalized_speed', normalize_speed, ['speed', 'max_speed'])) #Have to round this
self.node_feature_functions.append(
("normalized_speed", normalize_speed, ["speed", "max_speed"])
) # Have to round this
else:
self.node_feature_functions.append(('speed', lambda s: s, ['speed']))
self.node_feature_functions.append(("speed", lambda s: s, ["speed"]))
return self

def add_goal_distance(self, normed: bool = True):
if normed:
self.node_feature_functions.append(('normalized_goal_distance',
lambda position, goal_mouth_position, max_dist_to_goal: normalize_distance(
np.linalg.norm(position - goal_mouth_position),
max_dist_to_goal)
, ['position', 'goal_mouth_position', 'max_dist_to_goal']))
self.node_feature_functions.append(
(
"normalized_goal_distance",
lambda position, goal_mouth_position, max_dist_to_goal: normalize_distance(
np.linalg.norm(position - goal_mouth_position), max_dist_to_goal
),
["position", "goal_mouth_position", "max_dist_to_goal"],
)
)
else:
self.node_feature_functions.append(('goal_distance', lambda position, goal_mouth_position: np.linalg.norm(position - goal_mouth_position), ['position', 'goal_mouth_position']))
self.node_feature_functions.append(
(
"goal_distance",
lambda position, goal_mouth_position: np.linalg.norm(
position - goal_mouth_position
),
["position", "goal_mouth_position"],
)
)
return self

def add_goal_angle(self, normed: bool = True):
if normed:
self.node_feature_functions.append(('normed_goal_angle', normalize_angles, ['goal_angle']))
self.node_feature_functions.append(
("normed_goal_angle", normalize_angles, ["goal_angle"])
)
else:
self.node_feature_functions.append(('goal_angle', lambda a: a, ['goal_angle']))

return self

self.node_feature_functions.append(
("goal_angle", lambda a: a, ["goal_angle"])
)

return self

def add_ball_distance(self, normed: bool = True):
if normed:
self.node_feature_functions.append(('normalized_ball_distance',
lambda position, ball_position, max_dist_to_player: normalize_distance(
np.linalg.norm(position - ball_position),
max_dist_to_player)
, ['position', 'ball_position', 'max_dist_to_player']))
self.node_feature_functions.append(
(
"normalized_ball_distance",
lambda position, ball_position, max_dist_to_player: normalize_distance(
np.linalg.norm(position - ball_position), max_dist_to_player
),
["position", "ball_position", "max_dist_to_player"],
)
)
else:
self.node_feature_functions.append(('ball_distance', lambda position, ball_position: np.linalg.norm(position - ball_position), ['position', 'ball_position']))
self.node_feature_functions.append(
(
"ball_distance",
lambda position, ball_position: np.linalg.norm(
position - ball_position
),
["position", "ball_position"],
)
)
return self

def add_ball_angle(self, normed: bool = True):
if normed:
self.node_feature_functions.append(('normed_ball_angle', normalize_angles, ['ball_angle']))
self.node_feature_functions.append(
("normed_ball_angle", normalize_angles, ["ball_angle"])
)
else:
self.node_feature_functions.append(('ball_angle', lambda a: a, ['ball_angle']))

return self

self.node_feature_functions.append(
("ball_angle", lambda a: a, ["ball_angle"])
)

return self

def add_team(self):
self.node_feature_functions.append(('team', lambda t: t, ['team']))
self.node_feature_functions.append(("team", lambda t: t, ["team"]))
return self

def add_potential_reciever(self):
self.node_feature_functions.append(('potential_reciever', lambda potential_receiver, non_potential_receiver_node_value : 1.0 if potential_receiver else non_potential_receiver_node_value, ['potential_receiver', 'non_potential_receiver_node_value']))
self.node_feature_functions.append(
(
"potential_reciever",
lambda potential_receiver, non_potential_receiver_node_value: (
1.0 if potential_receiver else non_potential_receiver_node_value
),
["potential_receiver", "non_potential_receiver_node_value"],
)
)
return self

def get_features(self):
return self.node_feature_functions

Loading