diff --git a/.gitignore b/.gitignore index 5456202..5e47088 100644 --- a/.gitignore +++ b/.gitignore @@ -164,6 +164,7 @@ connpy_roadmap.md MULTI_USER_PLAN.md COPILOT_PLAN.md ARCHITECTURAL_DEBT_REFACTOR.md +COPILOT_UI_FEATURES.md #themes nord.yml diff --git a/connpy/ai.py b/connpy/ai.py index e987b21..9e0d20f 100755 --- a/connpy/ai.py +++ b/connpy/ai.py @@ -205,6 +205,7 @@ class ai: - COMPLETE MISSIONS: Execute ALL steps of a mission before reporting back. - DIAGRAM: Use ASCII art or Unicode box-drawing characters directly in your responses to visualize topologies or paths when helpful. - EVIDENCE: Include 'Key Snippets' from tool outputs. Be token-efficient. + - LANGUAGE: You MUST respond in the same language used by the user in their question or instruction. - NO WANDERING: Do not speculate. If stuck, report attempts. - SAFETY: When you use 'run_commands' with configuration commands, the system automatically prompts the user for confirmation. Just execute - don't ask permission first. {architect_instructions} @@ -222,6 +223,7 @@ class ai: - ENGINEER CAPABILITIES: Your Engineer can: * Filter nodes (list_nodes), Run CLI commands (run_commands), Get metadata (get_node_info). - ANALYSIS: Review technical findings to identify patterns or design failures. + - LANGUAGE: You MUST respond in the same language used by the user in their question or instruction. - MEMORY: Update long-term facts ONLY when the user explicitly requests it. CRITICAL - EFFICIENT DELEGATION: @@ -1334,6 +1336,7 @@ Your brief tactical guide in markdown. low 6. Risk level is usually "low" for read-only/no commands. +7. You MUST respond in the same language used by the user in their question. Terminal Context: {terminal_buffer} @@ -1359,6 +1362,7 @@ command 2 low, high, or destructive 6. Risk level: "low" for read-only/no commands, "high" for config changes, "destructive" for potentially dangerous ops. +7. You MUST respond in the same language used by the user in their question. Terminal Context: {terminal_buffer} diff --git a/connpy/cli/terminal_ui.py b/connpy/cli/terminal_ui.py index e80967c..f76f485 100644 --- a/connpy/cli/terminal_ui.py +++ b/connpy/cli/terminal_ui.py @@ -57,9 +57,10 @@ class CopilotInterface: async def run_session(self, raw_bytes: bytes, - cmd_byte_positions: List[tuple], node_info: dict, - on_ai_call: Callable): + on_ai_call: Callable, + cmd_byte_positions: List[tuple] = None, + blocks: List[tuple] = None): """ Runs the interactive Copilot session. on_ai_call: async function(active_buffer, question) -> result_dict @@ -69,9 +70,11 @@ class CopilotInterface: try: # Prepare UI state buffer = log_cleaner(raw_bytes.decode(errors='replace')) - blocks = self.ai_service.build_context_blocks(raw_bytes, cmd_byte_positions, node_info) - last_line = buffer.split('\n')[-1].strip() if buffer.strip() else "(prompt)" - blocks.append((len(raw_bytes), last_line[:80])) + + # Use pre-calculated blocks if provided (remote mode), otherwise calculate locally (local mode) + if blocks is None: + last_line = buffer.split('\n')[-1].strip() if buffer.strip() else "(prompt)" + blocks = self.ai_service.build_context_blocks(raw_bytes, cmd_byte_positions, node_info, last_line=last_line) state = { 'context_cmd': 1, @@ -88,7 +91,7 @@ class CopilotInterface: self.console.print("") # Salto de línea real self.console.print(Rule(title="[bold cyan] AI TERMINAL COPILOT [/bold cyan]", style="cyan")) self.console.print(Panel( - "[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel.\n" + "[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel. Type / for commands.\n" "Tab to change context mode. Ctrl+\u2191/\u2193 to adjust context. \u2191\u2193 for question history.[/dim]", border_style="cyan" )) diff --git a/connpy/core.py b/connpy/core.py index c4dcad2..fe2bc21 100755 --- a/connpy/core.py +++ b/connpy/core.py @@ -211,6 +211,7 @@ class node: self.output = "" self.status = 1 self.result = {} + self.cmd_byte_positions = [(0, None)] @MethodHook def _passtx(self, passwords, *, keyfile=None): @@ -385,9 +386,9 @@ class node: loop = asyncio.get_running_loop() child_reader_queue = asyncio.Queue() - # Track command byte positions for copilot context navigation + # Reset and track command byte positions for copilot context navigation # Each entry is (byte_position, command_text_or_None) - cmd_byte_positions = [(0, None)] + self.cmd_byte_positions = [(self.mylog.tell() if hasattr(self, 'mylog') else 0, None)] def _child_read_ready(): try: @@ -428,7 +429,7 @@ class node: node_info["prompt"] = to_str(self.tags.get("prompt", r'>$|#$|\$$|>.$|#.$|\$.$')) # Invoke copilot (async callback handles UI) - await copilot_handler(self.mylog.getvalue(), node_info, local_stream, child_fd, cmd_byte_positions) + await copilot_handler(self.mylog.getvalue(), node_info, local_stream, child_fd, self.cmd_byte_positions) continue # Remove any stray \x00 bytes and forward normally @@ -436,10 +437,9 @@ class node: if clean_data: # Track command boundaries when user hits Enter if hasattr(self, 'mylog') and (b'\r' in clean_data or b'\n' in clean_data): - cmd_byte_positions.append((self.mylog.tell(), None)) - - try: - os.write(child_fd, clean_data) + self.cmd_byte_positions.append((self.mylog.tell(), None)) + + try: os.write(child_fd, clean_data) except OSError: break self.lastinput = time() @@ -561,6 +561,45 @@ class node: finally: local_stream.teardown() + @MethodHook + async def inject_commands(self, commands, child_fd, on_inject=None): + """ + Inject a list of commands into the node's PTY. + Handles screen_length_command, history tracking and delays. + """ + if not commands: + return + + # 0. Clear line + os.write(child_fd, b'\x15') + await asyncio.sleep(0.1) + + # 1. Prepare list (prepend screen_length if exists) + slc = self.tags.get("screen_length_command") if hasattr(self, 'tags') and isinstance(self.tags, dict) else None + + to_send = list(commands) + if slc and slc not in to_send: # avoid duplicates if already there + to_send.insert(0, slc) + + # 2. Inject one by one + for cmd in to_send: + # Register in node's official history (SKIP if it's the administrative screen length command) + if cmd != slc and hasattr(self, 'cmd_byte_positions') and self.cmd_byte_positions is not None: + log_pos = self.mylog.tell() if hasattr(self, 'mylog') else 0 + self.cmd_byte_positions.append((log_pos, cmd)) + + # Write physically to PTY + os.write(child_fd, (cmd + "\n").encode()) + + # Notify (e.g., for gRPC or logs) - SKIP for administrative SLC + if on_inject and cmd != slc: + if asyncio.iscoroutinefunction(on_inject): + await on_inject(cmd) + else: + on_inject(cmd) + + # Delay to avoid overwhelming the router + await asyncio.sleep(0.8) @MethodHook def interact(self, debug=False, logger=None): @@ -642,7 +681,7 @@ class node: while True: action, commands, custom_cmd = await interface.run_session( raw_bytes=raw_bytes, - cmd_byte_positions=cmd_byte_positions, + cmd_byte_positions=self.cmd_byte_positions, node_info=node_info, on_ai_call=on_ai_call ) @@ -658,20 +697,7 @@ class node: if action in ("send_all", "custom"): cmds_to_send = commands if action == "send_all" else custom_cmd - - if cmds_to_send: - os.write(child_fd, b'\x15') # Ctrl+U - await asyncio.sleep(0.1) - - # Prepend screen length command to avoid pagination - if "screen_length_command" in self.tags: - cmds_to_send.insert(0, self.tags["screen_length_command"]) - - for cmd in cmds_to_send: - if cmd_byte_positions is not None: - cmd_byte_positions.append((self.mylog.tell(), cmd)) - os.write(child_fd, (cmd + "\n").encode()) - await asyncio.sleep(0.8) + await self.inject_commands(cmds_to_send, child_fd) else: os.write(child_fd, b'\x15\r') except Exception as e: diff --git a/connpy/grpc_layer/server.py b/connpy/grpc_layer/server.py index e053f95..1f72b02 100644 --- a/connpy/grpc_layer/server.py +++ b/connpy/grpc_layer/server.py @@ -207,15 +207,34 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): import json import asyncio import os + from ..services.ai_service import AIService + + service = AIService(self.service.config) if node_info is None: node_info = {} + + # Calculate real command blocks from history using the central service + raw_bytes = n.mylog.getvalue() if hasattr(n, 'mylog') else buffer + if not isinstance(raw_bytes, bytes): + raw_bytes = str(raw_bytes).encode() + + from connpy.utils import log_cleaner + last_line = log_cleaner(raw_bytes.decode(errors='replace')).split('\n')[-1].strip() + blocks = service.build_context_blocks(raw_bytes, n.cmd_byte_positions, node_info, last_line=last_line) + node_info["context_blocks"] = blocks node_info_json = json.dumps(node_info) # Convert buffer to string if it's bytes for the preview preview_str = buffer[-200:].decode(errors='replace') if isinstance(buffer, bytes) else str(buffer)[-200:] + # Generate a unique session ID for this copilot interaction to prevent race conditions + import uuid + copilot_session_id = str(uuid.uuid4()) + node_info["session_id"] = copilot_session_id + node_info_json = json.dumps(node_info) + # 1. Send prompt to client response_queue.put(connpy_pb2.InteractResponse( copilot_prompt=True, @@ -224,6 +243,13 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): )) while True: + # 0. Drain the queue of any stale messages before starting a new interaction + while not remote_stream.copilot_queue.empty(): + try: + remote_stream.copilot_queue.get_nowait() + except: + break + # 2. Await the question from client via the copilot_queue import threading def preload_ai_deps(): @@ -236,8 +262,17 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): try: req_data = await asyncio.wait_for(remote_stream.copilot_queue.get(), timeout=120) if not req_data: return - if "question" not in req_data or not req_data["question"] or req_data["question"] == "CANCEL" or req_data.get("action") == "cancel": - os.write(child_fd, b'\x15\r') + + # Validate session ID if provided by client (skip validation if not provided for CLI compatibility) + req_session_id = req_data.get("session_id") + if req_session_id and req_session_id != copilot_session_id: + continue # Ignore stale request from a previous session + + if "question" not in req_data or not req_data["question"] or req_data["question"] == "CANCEL" or req_data.get("action") in ("cancel", "web_cancel"): + if req_data.get("action") == "web_cancel": + os.write(child_fd, b'\x05') + else: + os.write(child_fd, b'\x15\r') return question = req_data["question"] @@ -264,9 +299,6 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): return # 3. Call AI Service with streaming - from ..services.ai_service import AIService - service = AIService(self.service.config) - def chunk_callback(chunk_text): if chunk_text: response_queue.put(connpy_pb2.InteractResponse( @@ -287,8 +319,11 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): if wait_action_task in done: req_data = wait_action_task.result() ai_task.cancel() - if req_data.get("action") == "cancel" or req_data.get("question") == "CANCEL": - os.write(child_fd, b'\x15\r') + if req_data.get("action") in ("cancel", "web_cancel") or req_data.get("question") == "CANCEL": + if req_data.get("action") == "web_cancel": + os.write(child_fd, b'\x05') + else: + os.write(child_fd, b'\x15\r') return continue # Loop back instead of returning to keep session alive else: @@ -312,45 +347,27 @@ class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): if action == "continue": continue # Loop back for next question - if action == "cancel": - os.write(child_fd, b'\x15\r') + if action in ("cancel", "web_cancel"): + if action == "web_cancel": + os.write(child_fd, b'\x05') + else: + os.write(child_fd, b'\x15\r') return except asyncio.TimeoutError: os.write(child_fd, b'\x15\r') return + def on_inject(cmd): + response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=cmd)) + if action == "send_all": commands = result.get("commands", []) - os.write(child_fd, b'\x15') # Ctrl+U to clear line - await asyncio.sleep(0.1) - - # Prepend screen length command to avoid pagination - if "screen_length_command" in n.tags: - os.write(child_fd, (n.tags["screen_length_command"] + "\n").encode()) - response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=n.tags["screen_length_command"])) - await asyncio.sleep(0.8) - - for cmd in commands: - os.write(child_fd, (cmd + "\n").encode()) - response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=cmd)) - await asyncio.sleep(0.8) + await n.inject_commands(commands, child_fd, on_inject=on_inject) return elif action.startswith("custom:"): - custom_cmds = action[7:] - os.write(child_fd, b'\x15') - await asyncio.sleep(0.1) - - # Prepend screen length command to avoid pagination - if "screen_length_command" in n.tags: - os.write(child_fd, (n.tags["screen_length_command"] + "\n").encode()) - response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=n.tags["screen_length_command"])) - await asyncio.sleep(0.8) - - for cmd in custom_cmds.split('\n'): - if cmd.strip(): - os.write(child_fd, (cmd.strip() + "\n").encode()) - response_queue.put(connpy_pb2.InteractResponse(copilot_injected_command=cmd.strip())) - await asyncio.sleep(0.8) + custom_cmds_raw = action[7:] + custom_cmds = [cmd.strip() for cmd in custom_cmds_raw.split('\n') if cmd.strip()] + await n.inject_commands(custom_cmds, child_fd, on_inject=on_inject) return else: os.write(child_fd, b'\x15\r') diff --git a/connpy/grpc_layer/stubs.py b/connpy/grpc_layer/stubs.py index 5250c5e..24d6218 100644 --- a/connpy/grpc_layer/stubs.py +++ b/connpy/grpc_layer/stubs.py @@ -43,7 +43,7 @@ class NodeStub: self.remote_host = remote_host self.config = config - def _handle_remote_copilot(self, res, request_queue, response_queue, client_buffer_bytes, cmd_byte_positions, pause_generator, resume_generator, old_tty): + def _handle_remote_copilot(self, res, request_queue, response_queue, client_buffer_bytes, pause_generator, resume_generator, old_tty): import json, asyncio, termios, sys, tty, queue from ..core import copilot_terminal_mode from . import connpy_pb2 @@ -51,6 +51,10 @@ class NodeStub: pause_generator() termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) + + node_info = json.loads(res.copilot_node_info_json) if res.copilot_node_info_json else {} + blocks = node_info.get("context_blocks", []) + interface = CopilotInterface( self.config, history=getattr(self, 'copilot_history', None), @@ -58,8 +62,6 @@ class NodeStub: ) self.copilot_history = interface.history self.copilot_state = interface.session_state - - node_info = json.loads(res.copilot_node_info_json) if res.copilot_node_info_json else {} async def on_ai_call_remote(active_buffer, question, chunk_callback, merged_node_info): # Send request to server @@ -85,9 +87,9 @@ class NodeStub: while True: action, commands, custom_cmd = await interface.run_session( raw_bytes=bytes(client_buffer_bytes), - cmd_byte_positions=cmd_byte_positions, node_info=node_info, - on_ai_call=on_ai_call_remote + on_ai_call=on_ai_call_remote, + blocks=blocks ) if action == "continue": @@ -124,7 +126,6 @@ class NodeStub: request_queue = queue.Queue() client_buffer_bytes = bytearray() - cmd_byte_positions = [(0, None)] pause_stdin = [False] wake_r, wake_w = os.pipe() @@ -171,8 +172,6 @@ class NodeStub: data = os.read(sys.stdin.fileno(), 1024) if not data: break - if b'\r' in data or b'\n' in data: - cmd_byte_positions.append((len(client_buffer_bytes), None)) yield connpy_pb2.InteractRequest(stdin_data=data) except OSError: break @@ -246,14 +245,11 @@ class NodeStub: if res.copilot_prompt: self._handle_remote_copilot( res, request_queue, response_queue, - client_buffer_bytes, cmd_byte_positions, + client_buffer_bytes, pause_generator, resume_generator, old_tty ) continue - if res.copilot_injected_command: - cmd_byte_positions.append((len(client_buffer_bytes), res.copilot_injected_command)) - if res.stdout_data: os.write(sys.stdout.fileno(), res.stdout_data) client_buffer_bytes.extend(res.stdout_data) @@ -275,7 +271,6 @@ class NodeStub: params_json = json.dumps(connection_params) request_queue = queue.Queue() client_buffer_bytes = bytearray() - cmd_byte_positions = [(0, None)] pause_stdin = [False] wake_r, wake_w = os.pipe() @@ -323,8 +318,6 @@ class NodeStub: data = os.read(sys.stdin.fileno(), 1024) if not data: break - if b'\r' in data or b'\n' in data: - cmd_byte_positions.append((len(client_buffer_bytes), None)) yield connpy_pb2.InteractRequest(stdin_data=data) except OSError: break @@ -397,14 +390,11 @@ class NodeStub: if res.copilot_prompt: self._handle_remote_copilot( res, request_queue, response_queue, - client_buffer_bytes, cmd_byte_positions, + client_buffer_bytes, pause_generator, resume_generator, old_tty ) continue - if res.copilot_injected_command: - cmd_byte_positions.append((len(client_buffer_bytes), res.copilot_injected_command)) - if res.stdout_data: os.write(sys.stdout.fileno(), res.stdout_data) client_buffer_bytes.extend(res.stdout_data) diff --git a/connpy/services/ai_service.py b/connpy/services/ai_service.py index b9eccd2..f30e8a1 100644 --- a/connpy/services/ai_service.py +++ b/connpy/services/ai_service.py @@ -6,10 +6,10 @@ from connpy.utils import log_cleaner class AIService(BaseService): """Business logic for interacting with AI agents and LLM configurations.""" - def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict) -> list: + def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list: """Identifies command blocks in the terminal history.""" blocks = [] - if not (cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes): + if not raw_bytes: return blocks default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$' @@ -20,29 +20,43 @@ class AIService(BaseService): except Exception: prompt_re = re.compile(re.sub(r'(?= 1: + for i in range(1, len(cmd_byte_positions)): + pos, known_cmd = cmd_byte_positions[i] + prev_pos = cmd_byte_positions[i-1][0] - if preview: - match = prompt_re.search(preview) - if match: - cmd_text = preview[match.end():].strip() - if cmd_text: - blocks.append((pos, preview[:80])) + if known_cmd: + prev_chunk = raw_bytes[prev_pos:pos] + prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace')) + prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()] + prompt_text = prev_lines[-1].strip() if prev_lines else "" + preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd + blocks.append((pos, preview[:80])) + else: + chunk = raw_bytes[prev_pos:pos] + cleaned = log_cleaner(chunk.decode(errors='replace')) + lines = [l for l in cleaned.split('\n') if l.strip()] + preview = lines[-1].strip() if lines else "" + + if preview: + match = prompt_re.search(preview) + if match: + cmd_text = preview[match.end():].strip() + if cmd_text: + blocks.append((pos, preview[:80])) + + # Always ensure there is a final block representing the current prompt + # Find the start of the last line in the raw buffer to avoid selecting everything + # when no commands have been executed yet. + last_newline = raw_bytes.rfind(b'\n') + current_prompt_pos = last_newline + 1 if last_newline != -1 else 0 + + if not blocks: + blocks.append((current_prompt_pos, last_line[:80] if last_line else "CURRENT CONTEXT")) + elif blocks[-1][0] < current_prompt_pos: + # If the last command block ends before the current prompt, add the prompt block + blocks.append((current_prompt_pos, last_line[:80] if last_line else "CURRENT CONTEXT")) + return blocks def process_copilot_input(self, input_text: str, session_state: dict) -> dict: diff --git a/docs/connpy/index.html b/docs/connpy/index.html index 7886fa4..1585ac4 100644 --- a/docs/connpy/index.html +++ b/docs/connpy/index.html @@ -84,10 +84,10 @@ cd connpy docker compose build # Run it like a native app (completely silent) -docker compose --log-level ERROR run --rm --remove-orphans connpy-app [command] +docker compose run --rm --remove-orphans connpy-app [command] # Pro Tip: Add this alias for a 100% native experience from any folder -alias conn='docker compose -f /path/to/connpy/docker-compose.yml --log-level ERROR run --rm --remove-orphans connpy-app' +alias conn='docker compose -f /path/to/connpy/docker-compose.yml run --rm --remove-orphans connpy-app'