11import logging
22import os
33import time
4+ from collections .abc import Callable
45from threading import Lock
5- from typing import Any , Callable , Dict , Optional
6+ from typing import Any
67
78from preswald .engine .runner import ScriptRunner
89from preswald .engine .utils import (
@@ -28,19 +29,19 @@ class BasePreswaldService:
2829 _not_initialized_msg = "Base service not initialized."
2930
3031 def __init__ (self ):
31- self ._component_states : Dict [str , Any ] = {}
32+ self ._component_states : dict [str , Any ] = {}
3233 self ._lock = Lock ()
3334
3435 # Data management
35- self .data_manager : Optional [ DataManager ] = None # set during server creation
36+ self .data_manager : DataManager | None = None # set during server creation
3637
3738 # Initialize service state
38- self ._script_path : Optional [ str ] = None
39+ self ._script_path : str | None = None
3940 self ._is_shutting_down : bool = False
4041 self ._render_buffer = RenderBuffer ()
4142
4243 # Initialize session tracking
43- self .script_runners : Dict [str , ScriptRunner ] = {}
44+ self .script_runners : dict [str , ScriptRunner ] = {}
4445
4546 # Layout management
4647 self ._layout_manager = LayoutManager ()
@@ -61,7 +62,7 @@ def initialize(cls, script_path=None):
6162 return cls ._instance
6263
6364 @property
64- def script_path (self ) -> Optional [ str ] :
65+ def script_path (self ) -> str | None :
6566 return self ._script_path
6667
6768 @script_path .setter
@@ -76,9 +77,10 @@ def script_path(self, path: str):
7677 def append_component (self , component ):
7778 """Add a component to the layout manager"""
7879 try :
79-
8080 if isinstance (component , dict ):
81- logger .info (f"[APPEND] Appending component: { component .get ('id' )} , type: { component .get ('type' )} " )
81+ logger .info (
82+ f"[APPEND] Appending component: { component .get ('id' )} , type: { component .get ('type' )} "
83+ )
8284 # Clean any NaN values in the component
8385 clean_start = time .time ()
8486 cleaned_component = clean_nan_values (component )
@@ -101,7 +103,9 @@ def append_component(self, component):
101103 )
102104 self ._layout_manager .add_component (cleaned_component )
103105 if logger .isEnabledFor (logging .DEBUG ):
104- logger .debug (f"Added component with state: { cleaned_component } " )
106+ logger .debug (
107+ f"Added component with state: { cleaned_component } "
108+ )
105109 else :
106110 # Components without IDs are added as-is
107111 self ._layout_manager .add_component (cleaned_component )
@@ -137,7 +141,7 @@ def get_rendered_components(self):
137141 rows = self ._layout_manager .get_layout ()
138142 return {"rows" : rows }
139143
140- async def handle_client_message (self , client_id : str , message : Dict [str , Any ]):
144+ async def handle_client_message (self , client_id : str , message : dict [str , Any ]):
141145 """Process incoming messages from clients"""
142146 start_time = time .time ()
143147 try :
@@ -196,7 +200,7 @@ async def unregister_client(self, client_id: str):
196200 def _create_send_callback (self , websocket : Any ) -> Callable :
197201 """Create a message sending callback for a specific websocket"""
198202
199- async def send_message (msg : Dict [str , Any ]):
203+ async def send_message (msg : dict [str , Any ]):
200204 if not self ._is_shutting_down :
201205 try :
202206 await websocket .send_json (msg )
@@ -206,7 +210,7 @@ async def send_message(msg: Dict[str, Any]):
206210 return send_message
207211
208212 async def _broadcast_state_updates (
209- self , states : Dict [str , Any ], exclude_client : Optional [ str ] = None
213+ self , states : dict [str , Any ], exclude_client : str | None = None
210214 ):
211215 """Broadcast state updates to all clients except the sender"""
212216
@@ -231,18 +235,15 @@ async def _broadcast_state_updates(
231235 except Exception as e :
232236 logger .error (f"Error broadcasting to { client_id } : { e } " )
233237
234- async def _handle_component_update (self , client_id : str , message : Dict [str , Any ]):
238+ async def _handle_component_update (self , client_id : str , message : dict [str , Any ]):
235239 """Handle component state update messages"""
236240 states = message .get ("states" , {})
237241 if not states :
238242 await self ._send_error (client_id , "Component update missing states" )
239243 raise ValueError ("Component update missing states" )
240244
241245 # Only rerun if any state actually changed
242- changed_states = {
243- k : v for k , v in states .items ()
244- if self .should_render (k , v )
245- }
246+ changed_states = {k : v for k , v in states .items () if self .should_render (k , v )}
246247
247248 if not changed_states :
248249 logger .debug ("[STATE] No actual state changes detected. Skipping rerun." )
@@ -260,6 +261,10 @@ async def _handle_component_update(self, client_id: str, message: Dict[str, Any]
260261 # Broadcast updates to other clients
261262 await self ._broadcast_state_updates (changed_states , exclude_client = client_id )
262263
264+ def connect_data_manager (self ):
265+ """Connect the data manager"""
266+ self .data_manager .connect ()
267+
263268 def _initialize_data_manager (self , script_path : str ) -> None :
264269 script_dir = os .path .dirname (script_path )
265270 preswald_path = os .path .join (script_dir , "preswald.toml" )
@@ -269,7 +274,9 @@ def _initialize_data_manager(self, script_path: str) -> None:
269274 preswald_path = preswald_path , secrets_path = secrets_path
270275 )
271276
272- async def _register_common_client_setup (self , client_id : str , websocket : Any ) -> ScriptRunner :
277+ async def _register_common_client_setup (
278+ self , client_id : str , websocket : Any
279+ ) -> ScriptRunner :
273280 logger .info (f"Registering client: { client_id } " )
274281
275282 self .websocket_connections [client_id ] = websocket
@@ -309,7 +316,7 @@ async def _send_initial_states(self, websocket: Any):
309316 except Exception as e :
310317 logger .error (f"Error sending initial states: { e } " )
311318
312- def _update_component_states (self , states : Dict [str , Any ]):
319+ def _update_component_states (self , states : dict [str , Any ]):
313320 """Update internal state dictionary with cleaned component values."""
314321 with self ._lock :
315322 logger .debug ("[STATE] Updating states" )
0 commit comments