diff --git a/docs/connpy/cli/terminal_ui.html b/docs/connpy/cli/terminal_ui.html index cebfcc5..aff4dcb 100644 --- a/docs/connpy/cli/terminal_ui.html +++ b/docs/connpy/cli/terminal_ui.html @@ -90,9 +90,10 @@ el.replaceWith(d); async def run_session(self, raw_bytes: bytes, - cmd_byte_positions: List[tuple], node_info: dict, - on_ai_call: Callable): + on_ai_call: Callable, + cmd_byte_positions: List[tuple] = None, + blocks: List[tuple] = None): """ Runs the interactive Copilot session. on_ai_call: async function(active_buffer, question) -> result_dict @@ -102,9 +103,11 @@ el.replaceWith(d); try: # Prepare UI state buffer = log_cleaner(raw_bytes.decode(errors='replace')) - blocks = self.ai_service.build_context_blocks(raw_bytes, cmd_byte_positions, node_info) - last_line = buffer.split('\n')[-1].strip() if buffer.strip() else "(prompt)" - blocks.append((len(raw_bytes), last_line[:80])) + + # Use pre-calculated blocks if provided (remote mode), otherwise calculate locally (local mode) + if blocks is None: + last_line = buffer.split('\n')[-1].strip() if buffer.strip() else "(prompt)" + blocks = self.ai_service.build_context_blocks(raw_bytes, cmd_byte_positions, node_info, last_line=last_line) state = { 'context_cmd': 1, @@ -121,7 +124,7 @@ el.replaceWith(d); self.console.print("") # Salto de línea real self.console.print(Rule(title="[bold cyan] AI TERMINAL COPILOT [/bold cyan]", style="cyan")) self.console.print(Panel( - "[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel.\n" + "[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel. Type / for commands.\n" "Tab to change context mode. Ctrl+\u2191/\u2193 to adjust context. \u2191\u2193 for question history.[/dim]", border_style="cyan" )) @@ -161,9 +164,8 @@ el.replaceWith(d); if state['context_mode'] == self.mode_lines: return '\n'.join(buffer.split('\n')[-state['context_lines']:]) idx = max(0, state['total_cmds'] - state['context_cmd']) - start, preview = blocks[idx] - if state['context_mode'] == self.mode_single and idx + 1 < state['total_cmds']: - end = blocks[idx + 1][0] + start, end, preview = blocks[idx] + if state['context_mode'] == self.mode_single: active_raw = raw_bytes[start:end] else: active_raw = raw_bytes[start:] @@ -205,7 +207,40 @@ el.replaceWith(d); base_str = f'\u25b6 Ctrl+\u2191/\u2193 adjusts by 50 lines [Tab: {m_label}]' else: idx = max(0, state['total_cmds'] - state['context_cmd']) - desc = blocks[idx][1] + import re + + def clean_preview(text): + # Limpia saltos de línea y el prompt inicial (todo hasta #, > o $) para que quede solo el comando + original = text.strip().replace('\r', '').replace('\n', ' ') + cleaned = re.sub(r'^.*?[#>\$]\s*', '', original) + # Si limpiar el prompt nos deja con un string vacío (ej: era solo "iol#"), devolvemos el original + return cleaned if cleaned else original + + if state['context_mode'] == self.mode_range: + range_blocks = blocks[idx:] + # Si hay más de un bloque, el último es siempre el prompt vacío/actual. Lo omitimos visualmente. + if len(range_blocks) > 1: + range_blocks = range_blocks[:-1] + + # Limpiar y truncar comandos muy largos para que no rompan la UI + previews = [] + for b in range_blocks: + p = clean_preview(b[2]) + if p: + # Truncar comandos individuales largos + if len(p) > 25: p = p[:22] + "..." + previews.append(p) + + if not previews: + desc = clean_preview(blocks[idx][2]) + elif len(previews) <= 3: + desc = " + ".join(previews) + else: + desc = f"{previews[0]} + {previews[1]} + {previews[2]} ... (+{len(previews)-3})" + else: + # Modo SINGLE original + desc = clean_preview(blocks[idx][2]) + base_str = f'\u25b6 {desc} [Tab: {m_label}]' # Wrap base_str in a style to maintain consistency and avoid glitches @@ -332,39 +367,67 @@ el.replaceWith(d); # Use persona from overrides (one-shot) or from session state active_persona = merged_node_info.get('persona', self.session_state.get('persona', 'engineer')) persona_color = self._get_theme_color(active_persona, fallback="cyan") + persona_title = "Network Architect" if active_persona == "architect" else "Network Engineer" active_buffer = get_active_buffer() - live_text = "Thinking..." - panel = Panel(live_text, title=f"[bold {persona_color}]Copilot Guide[/bold {persona_color}]", border_style=persona_color) + live_text = "" + first_chunk = True + + import sys + from rich.rule import Rule + from rich.status import Status + from connpy.printer import IncrementalMarkdownParser + + md_parser = IncrementalMarkdownParser(console=self.console) + + status_spinner = Status( + f"[bold {persona_color}]{persona_title}:[/bold {persona_color}] [dim]Thinking...[/dim]", + console=self.console, + spinner="dots" + ) + status_spinner.start() def on_chunk(text): - nonlocal live_text - if live_text == "Thinking...": live_text = "" + nonlocal live_text, first_chunk + if first_chunk: + status_spinner.stop() + # Print header rule before first chunk arrives + self.console.print(Rule( + f"[bold {persona_color}]{persona_title}[/bold {persona_color}]", + style=persona_color + )) + first_chunk = False live_text += text + md_parser.feed(text) - with Live(panel, console=self.console, refresh_per_second=10) as live: - def update_live(t): - live.update(Panel(Markdown(t), title=f"[bold {persona_color}]Copilot Guide[/bold {persona_color}]", border_style=persona_color)) - - wrapped_chunk = lambda t: (on_chunk(t), update_live(live_text)) - - # Check for interruption during AI call - ai_task = asyncio.create_task(on_ai_call(active_buffer, clean_question, wrapped_chunk, merged_node_info)) - - try: - while not ai_task.done(): - await asyncio.sleep(0.05) - result = await ai_task - except asyncio.CancelledError: - return "cancel", None, None + # Check for interruption during AI call + ai_task = asyncio.create_task(on_ai_call(active_buffer, clean_question, on_chunk, merged_node_info)) + + try: + while not ai_task.done(): + await asyncio.sleep(0.05) + result = await ai_task + except asyncio.CancelledError: + status_spinner.stop() + return "cancel", None, None + + # Ensure spinner is stopped if no chunks arrived + if first_chunk: + status_spinner.stop() + # Close the streamed output with a Rule + if not first_chunk: + md_parser.flush() + self.console.print(Rule(style=persona_color)) + if not result or result.get("error"): - if result and result.get("error"): self.console.print(f"[red]Error: {result['error']}[/red]") + if first_chunk and result and result.get("error"): + self.console.print(f"[red]Error: {result['error']}[/red]") return "cancel", None, None - # 4. Handle result - if live_text == "Thinking..." and result.get("guide"): - self.console.print(Panel(Markdown(result["guide"]), title=f"[bold {persona_color}]Copilot Guide[/bold {persona_color}]", border_style=persona_color)) + # If no chunks were streamed but we have a guide, print it as a panel + if first_chunk and result and result.get("guide"): + self.console.print(Panel(Markdown(result["guide"]), title=f"[bold {persona_color}]{persona_title}[/bold {persona_color}]", border_style=persona_color)) commands = result.get("commands", []) if not commands: @@ -466,14 +529,13 @@ el.replaceWith(d); return "cancel", None, None finally: - state['cancelled'] = True - self.console.print("[dim]Returning to session...[/dim]") + state['cancelled'] = True
-async def run_session(self,
raw_bytes: bytes,
cmd_byte_positions: List[tuple],
node_info: dict,
on_ai_call: Callable)
+async def run_session(self,
raw_bytes: bytes,
node_info: dict,
on_ai_call: Callable,
cmd_byte_positions: List[tuple] = None,
blocks: List[tuple] = None)
async def run_session(self,
raw_bytes: bytes,
- cmd_byte_positions: List[tuple],
node_info: dict,
- on_ai_call: Callable):
+ on_ai_call: Callable,
+ cmd_byte_positions: List[tuple] = None,
+ blocks: List[tuple] = None):
"""
Runs the interactive Copilot session.
on_ai_call: async function(active_buffer, question) -> result_dict
@@ -494,9 +557,11 @@ el.replaceWith(d);
try:
# Prepare UI state
buffer = log_cleaner(raw_bytes.decode(errors='replace'))
- blocks = self.ai_service.build_context_blocks(raw_bytes, cmd_byte_positions, node_info)
- last_line = buffer.split('\n')[-1].strip() if buffer.strip() else "(prompt)"
- blocks.append((len(raw_bytes), last_line[:80]))
+
+ # Use pre-calculated blocks if provided (remote mode), otherwise calculate locally (local mode)
+ if blocks is None:
+ last_line = buffer.split('\n')[-1].strip() if buffer.strip() else "(prompt)"
+ blocks = self.ai_service.build_context_blocks(raw_bytes, cmd_byte_positions, node_info, last_line=last_line)
state = {
'context_cmd': 1,
@@ -513,7 +578,7 @@ el.replaceWith(d);
self.console.print("") # Salto de línea real
self.console.print(Rule(title="[bold cyan] AI TERMINAL COPILOT [/bold cyan]", style="cyan"))
self.console.print(Panel(
- "[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel.\n"
+ "[dim]Type your question. Enter to send, Escape/Ctrl+C to cancel. Type / for commands.\n"
"Tab to change context mode. Ctrl+\u2191/\u2193 to adjust context. \u2191\u2193 for question history.[/dim]",
border_style="cyan"
))
@@ -553,9 +618,8 @@ el.replaceWith(d);
if state['context_mode'] == self.mode_lines:
return '\n'.join(buffer.split('\n')[-state['context_lines']:])
idx = max(0, state['total_cmds'] - state['context_cmd'])
- start, preview = blocks[idx]
- if state['context_mode'] == self.mode_single and idx + 1 < state['total_cmds']:
- end = blocks[idx + 1][0]
+ start, end, preview = blocks[idx]
+ if state['context_mode'] == self.mode_single:
active_raw = raw_bytes[start:end]
else:
active_raw = raw_bytes[start:]
@@ -597,7 +661,40 @@ el.replaceWith(d);
base_str = f'\u25b6 Ctrl+\u2191/\u2193 adjusts by 50 lines [Tab: {m_label}]'
else:
idx = max(0, state['total_cmds'] - state['context_cmd'])
- desc = blocks[idx][1]
+ import re
+
+ def clean_preview(text):
+ # Limpia saltos de línea y el prompt inicial (todo hasta #, > o $) para que quede solo el comando
+ original = text.strip().replace('\r', '').replace('\n', ' ')
+ cleaned = re.sub(r'^.*?[#>\$]\s*', '', original)
+ # Si limpiar el prompt nos deja con un string vacío (ej: era solo "iol#"), devolvemos el original
+ return cleaned if cleaned else original
+
+ if state['context_mode'] == self.mode_range:
+ range_blocks = blocks[idx:]
+ # Si hay más de un bloque, el último es siempre el prompt vacío/actual. Lo omitimos visualmente.
+ if len(range_blocks) > 1:
+ range_blocks = range_blocks[:-1]
+
+ # Limpiar y truncar comandos muy largos para que no rompan la UI
+ previews = []
+ for b in range_blocks:
+ p = clean_preview(b[2])
+ if p:
+ # Truncar comandos individuales largos
+ if len(p) > 25: p = p[:22] + "..."
+ previews.append(p)
+
+ if not previews:
+ desc = clean_preview(blocks[idx][2])
+ elif len(previews) <= 3:
+ desc = " + ".join(previews)
+ else:
+ desc = f"{previews[0]} + {previews[1]} + {previews[2]} ... (+{len(previews)-3})"
+ else:
+ # Modo SINGLE original
+ desc = clean_preview(blocks[idx][2])
+
base_str = f'\u25b6 {desc} [Tab: {m_label}]'
# Wrap base_str in a style to maintain consistency and avoid glitches
@@ -724,39 +821,67 @@ el.replaceWith(d);
# Use persona from overrides (one-shot) or from session state
active_persona = merged_node_info.get('persona', self.session_state.get('persona', 'engineer'))
persona_color = self._get_theme_color(active_persona, fallback="cyan")
+ persona_title = "Network Architect" if active_persona == "architect" else "Network Engineer"
active_buffer = get_active_buffer()
- live_text = "Thinking..."
- panel = Panel(live_text, title=f"[bold {persona_color}]Copilot Guide[/bold {persona_color}]", border_style=persona_color)
+ live_text = ""
+ first_chunk = True
+
+ import sys
+ from rich.rule import Rule
+ from rich.status import Status
+ from connpy.printer import IncrementalMarkdownParser
+
+ md_parser = IncrementalMarkdownParser(console=self.console)
+
+ status_spinner = Status(
+ f"[bold {persona_color}]{persona_title}:[/bold {persona_color}] [dim]Thinking...[/dim]",
+ console=self.console,
+ spinner="dots"
+ )
+ status_spinner.start()
def on_chunk(text):
- nonlocal live_text
- if live_text == "Thinking...": live_text = ""
+ nonlocal live_text, first_chunk
+ if first_chunk:
+ status_spinner.stop()
+ # Print header rule before first chunk arrives
+ self.console.print(Rule(
+ f"[bold {persona_color}]{persona_title}[/bold {persona_color}]",
+ style=persona_color
+ ))
+ first_chunk = False
live_text += text
+ md_parser.feed(text)
- with Live(panel, console=self.console, refresh_per_second=10) as live:
- def update_live(t):
- live.update(Panel(Markdown(t), title=f"[bold {persona_color}]Copilot Guide[/bold {persona_color}]", border_style=persona_color))
-
- wrapped_chunk = lambda t: (on_chunk(t), update_live(live_text))
-
- # Check for interruption during AI call
- ai_task = asyncio.create_task(on_ai_call(active_buffer, clean_question, wrapped_chunk, merged_node_info))
-
- try:
- while not ai_task.done():
- await asyncio.sleep(0.05)
- result = await ai_task
- except asyncio.CancelledError:
- return "cancel", None, None
+ # Check for interruption during AI call
+ ai_task = asyncio.create_task(on_ai_call(active_buffer, clean_question, on_chunk, merged_node_info))
+
+ try:
+ while not ai_task.done():
+ await asyncio.sleep(0.05)
+ result = await ai_task
+ except asyncio.CancelledError:
+ status_spinner.stop()
+ return "cancel", None, None
+
+ # Ensure spinner is stopped if no chunks arrived
+ if first_chunk:
+ status_spinner.stop()
+ # Close the streamed output with a Rule
+ if not first_chunk:
+ md_parser.flush()
+ self.console.print(Rule(style=persona_color))
+
if not result or result.get("error"):
- if result and result.get("error"): self.console.print(f"[red]Error: {result['error']}[/red]")
+ if first_chunk and result and result.get("error"):
+ self.console.print(f"[red]Error: {result['error']}[/red]")
return "cancel", None, None
- # 4. Handle result
- if live_text == "Thinking..." and result.get("guide"):
- self.console.print(Panel(Markdown(result["guide"]), title=f"[bold {persona_color}]Copilot Guide[/bold {persona_color}]", border_style=persona_color))
+ # If no chunks were streamed but we have a guide, print it as a panel
+ if first_chunk and result and result.get("guide"):
+ self.console.print(Panel(Markdown(result["guide"]), title=f"[bold {persona_color}]{persona_title}[/bold {persona_color}]", border_style=persona_color))
commands = result.get("commands", [])
if not commands:
@@ -858,8 +983,7 @@ el.replaceWith(d);
return "cancel", None, None
finally:
- state['cancelled'] = True
- self.console.print("[dim]Returning to session...[/dim]")
+ state['cancelled'] = True
Runs the interactive Copilot session. on_ai_call: async function(active_buffer, question) -> result_dict
+async def inject_commands(self, commands, child_fd, on_inject=None)
+@MethodHook
+async def inject_commands(self, commands, child_fd, on_inject=None):
+ """
+ Inject a list of commands into the node's PTY.
+ Handles screen_length_command, history tracking and delays.
+ """
+ if not commands:
+ return
+
+ # 0. Clear line
+ os.write(child_fd, b'\x15')
+ await asyncio.sleep(0.1)
+
+ # 1. Prepare list (prepend screen_length if exists)
+ slc = self.tags.get("screen_length_command") if hasattr(self, 'tags') and isinstance(self.tags, dict) else None
+
+ to_send = list(commands)
+ if slc and slc not in to_send: # avoid duplicates if already there
+ to_send.insert(0, slc)
+
+ # 2. Inject one by one
+ for cmd in to_send:
+ # Register in node's official history (SKIP if it's the administrative screen length command)
+ if cmd != slc and hasattr(self, 'cmd_byte_positions') and self.cmd_byte_positions is not None:
+ log_pos = self.mylog.tell() if hasattr(self, 'mylog') else 0
+ self.cmd_byte_positions.append((log_pos, cmd))
+
+ # Write physically to PTY
+ os.write(child_fd, (cmd + "\n").encode())
+
+ # Notify (e.g., for gRPC or logs) - SKIP for administrative SLC
+ if on_inject and cmd != slc:
+ if asyncio.iscoroutinefunction(on_inject):
+ await on_inject(cmd)
+ else:
+ on_inject(cmd)
+
+ # Delay to avoid overwhelming the router
+ await asyncio.sleep(0.8)
+Inject a list of commands into the node's PTY. +Handles screen_length_command, history tracking and delays.
def interact(self, debug=False, logger=None)
nodeinject_commandsinteractruntestclass AIService(BaseService):
"""Business logic for interacting with AI agents and LLM configurations."""
- def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict) -> list:
+ def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
"""Identifies command blocks in the terminal history."""
blocks = []
- if not (cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes):
+ if not raw_bytes:
return blocks
default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
@@ -72,29 +72,63 @@ el.replaceWith(d);
except Exception:
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
- for i in range(1, len(cmd_byte_positions)):
- pos, known_cmd = cmd_byte_positions[i]
- prev_pos = cmd_byte_positions[i-1][0]
-
- if known_cmd:
- prev_chunk = raw_bytes[prev_pos:pos]
- prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
- prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
- prompt_text = prev_lines[-1].strip() if prev_lines else ""
- preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
- blocks.append((pos, preview[:80]))
- else:
- chunk = raw_bytes[prev_pos:pos]
- cleaned = log_cleaner(chunk.decode(errors='replace'))
- lines = [l for l in cleaned.split('\n') if l.strip()]
- preview = lines[-1].strip() if lines else ""
+ parsed_positions = []
+ if cmd_byte_positions and len(cmd_byte_positions) >= 1:
+ for i in range(1, len(cmd_byte_positions)):
+ pos, known_cmd = cmd_byte_positions[i]
+ prev_pos = cmd_byte_positions[i-1][0]
- if preview:
- match = prompt_re.search(preview)
- if match:
- cmd_text = preview[match.end():].strip()
- if cmd_text:
- blocks.append((pos, preview[:80]))
+ if known_cmd:
+ prev_chunk = raw_bytes[prev_pos:pos]
+ prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
+ prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
+ prompt_text = prev_lines[-1].strip() if prev_lines else ""
+ preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ chunk = raw_bytes[prev_pos:pos]
+ cleaned = log_cleaner(chunk.decode(errors='replace'))
+ lines = [l for l in cleaned.split('\n') if l.strip()]
+ preview = lines[-1].strip() if lines else ""
+
+ if preview:
+ match = prompt_re.search(preview)
+ if match:
+ cmd_text = preview[match.end():].strip()
+ if cmd_text:
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+
+ last_newline = raw_bytes.rfind(b'\n')
+ current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
+ current_end = len(raw_bytes)
+
+ for i, item in enumerate(parsed_positions):
+ if item["type"] == "VALID_CMD":
+ start_pos = item["pos"]
+ preview = item["preview"]
+
+ # Find the end position: next VALID_CMD or EMPTY_PROMPT
+ end_pos = current_prompt_pos
+ for j in range(i + 1, len(parsed_positions)):
+ next_item = parsed_positions[j]
+ if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT"):
+ end_pos = next_item["pos"]
+ break
+
+ blocks.append((start_pos, end_pos, preview))
+
+ # Always ensure there is a final block representing the current prompt
+ if not blocks:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+ elif blocks[-1][0] < current_prompt_pos:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+
return blocks
def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
@@ -317,17 +351,17 @@ el.replaceWith(d);
Ask the AI copilot for terminal assistance.
-def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict) ‑> list
+def build_context_blocks(self,
raw_bytes: bytes,
cmd_byte_positions: list,
node_info: dict,
last_line: str = '') ‑> list
def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict) -> list:
+def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
"""Identifies command blocks in the terminal history."""
blocks = []
- if not (cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes):
+ if not raw_bytes:
return blocks
default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
@@ -338,29 +372,63 @@ el.replaceWith(d);
except Exception:
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
- for i in range(1, len(cmd_byte_positions)):
- pos, known_cmd = cmd_byte_positions[i]
- prev_pos = cmd_byte_positions[i-1][0]
-
- if known_cmd:
- prev_chunk = raw_bytes[prev_pos:pos]
- prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
- prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
- prompt_text = prev_lines[-1].strip() if prev_lines else ""
- preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
- blocks.append((pos, preview[:80]))
- else:
- chunk = raw_bytes[prev_pos:pos]
- cleaned = log_cleaner(chunk.decode(errors='replace'))
- lines = [l for l in cleaned.split('\n') if l.strip()]
- preview = lines[-1].strip() if lines else ""
+ parsed_positions = []
+ if cmd_byte_positions and len(cmd_byte_positions) >= 1:
+ for i in range(1, len(cmd_byte_positions)):
+ pos, known_cmd = cmd_byte_positions[i]
+ prev_pos = cmd_byte_positions[i-1][0]
- if preview:
- match = prompt_re.search(preview)
- if match:
- cmd_text = preview[match.end():].strip()
- if cmd_text:
- blocks.append((pos, preview[:80]))
+ if known_cmd:
+ prev_chunk = raw_bytes[prev_pos:pos]
+ prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
+ prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
+ prompt_text = prev_lines[-1].strip() if prev_lines else ""
+ preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ chunk = raw_bytes[prev_pos:pos]
+ cleaned = log_cleaner(chunk.decode(errors='replace'))
+ lines = [l for l in cleaned.split('\n') if l.strip()]
+ preview = lines[-1].strip() if lines else ""
+
+ if preview:
+ match = prompt_re.search(preview)
+ if match:
+ cmd_text = preview[match.end():].strip()
+ if cmd_text:
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+
+ last_newline = raw_bytes.rfind(b'\n')
+ current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
+ current_end = len(raw_bytes)
+
+ for i, item in enumerate(parsed_positions):
+ if item["type"] == "VALID_CMD":
+ start_pos = item["pos"]
+ preview = item["preview"]
+
+ # Find the end position: next VALID_CMD or EMPTY_PROMPT
+ end_pos = current_prompt_pos
+ for j in range(i + 1, len(parsed_positions)):
+ next_item = parsed_positions[j]
+ if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT"):
+ end_pos = next_item["pos"]
+ break
+
+ blocks.append((start_pos, end_pos, preview))
+
+ # Always ensure there is a final block representing the current prompt
+ if not blocks:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+ elif blocks[-1][0] < current_prompt_pos:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+
return blocks
Identifies command blocks in the terminal history.
class AIService(BaseService):
"""Business logic for interacting with AI agents and LLM configurations."""
- def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict) -> list:
+ def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
"""Identifies command blocks in the terminal history."""
blocks = []
- if not (cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes):
+ if not raw_bytes:
return blocks
default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
@@ -127,29 +127,63 @@ el.replaceWith(d);
except Exception:
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
- for i in range(1, len(cmd_byte_positions)):
- pos, known_cmd = cmd_byte_positions[i]
- prev_pos = cmd_byte_positions[i-1][0]
-
- if known_cmd:
- prev_chunk = raw_bytes[prev_pos:pos]
- prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
- prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
- prompt_text = prev_lines[-1].strip() if prev_lines else ""
- preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
- blocks.append((pos, preview[:80]))
- else:
- chunk = raw_bytes[prev_pos:pos]
- cleaned = log_cleaner(chunk.decode(errors='replace'))
- lines = [l for l in cleaned.split('\n') if l.strip()]
- preview = lines[-1].strip() if lines else ""
+ parsed_positions = []
+ if cmd_byte_positions and len(cmd_byte_positions) >= 1:
+ for i in range(1, len(cmd_byte_positions)):
+ pos, known_cmd = cmd_byte_positions[i]
+ prev_pos = cmd_byte_positions[i-1][0]
- if preview:
- match = prompt_re.search(preview)
- if match:
- cmd_text = preview[match.end():].strip()
- if cmd_text:
- blocks.append((pos, preview[:80]))
+ if known_cmd:
+ prev_chunk = raw_bytes[prev_pos:pos]
+ prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
+ prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
+ prompt_text = prev_lines[-1].strip() if prev_lines else ""
+ preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ chunk = raw_bytes[prev_pos:pos]
+ cleaned = log_cleaner(chunk.decode(errors='replace'))
+ lines = [l for l in cleaned.split('\n') if l.strip()]
+ preview = lines[-1].strip() if lines else ""
+
+ if preview:
+ match = prompt_re.search(preview)
+ if match:
+ cmd_text = preview[match.end():].strip()
+ if cmd_text:
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+
+ last_newline = raw_bytes.rfind(b'\n')
+ current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
+ current_end = len(raw_bytes)
+
+ for i, item in enumerate(parsed_positions):
+ if item["type"] == "VALID_CMD":
+ start_pos = item["pos"]
+ preview = item["preview"]
+
+ # Find the end position: next VALID_CMD or EMPTY_PROMPT
+ end_pos = current_prompt_pos
+ for j in range(i + 1, len(parsed_positions)):
+ next_item = parsed_positions[j]
+ if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT"):
+ end_pos = next_item["pos"]
+ break
+
+ blocks.append((start_pos, end_pos, preview))
+
+ # Always ensure there is a final block representing the current prompt
+ if not blocks:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+ elif blocks[-1][0] < current_prompt_pos:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+
return blocks
def process_copilot_input(self, input_text: str, session_state: dict) -> dict:
@@ -372,17 +406,17 @@ el.replaceWith(d);
Ask the AI copilot for terminal assistance.
-def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict) ‑> list
+def build_context_blocks(self,
raw_bytes: bytes,
cmd_byte_positions: list,
node_info: dict,
last_line: str = '') ‑> list
def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict) -> list:
+def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list:
"""Identifies command blocks in the terminal history."""
blocks = []
- if not (cmd_byte_positions and len(cmd_byte_positions) >= 2 and raw_bytes):
+ if not raw_bytes:
return blocks
default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
@@ -393,29 +427,63 @@ el.replaceWith(d);
except Exception:
prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt))
- for i in range(1, len(cmd_byte_positions)):
- pos, known_cmd = cmd_byte_positions[i]
- prev_pos = cmd_byte_positions[i-1][0]
-
- if known_cmd:
- prev_chunk = raw_bytes[prev_pos:pos]
- prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
- prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
- prompt_text = prev_lines[-1].strip() if prev_lines else ""
- preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
- blocks.append((pos, preview[:80]))
- else:
- chunk = raw_bytes[prev_pos:pos]
- cleaned = log_cleaner(chunk.decode(errors='replace'))
- lines = [l for l in cleaned.split('\n') if l.strip()]
- preview = lines[-1].strip() if lines else ""
+ parsed_positions = []
+ if cmd_byte_positions and len(cmd_byte_positions) >= 1:
+ for i in range(1, len(cmd_byte_positions)):
+ pos, known_cmd = cmd_byte_positions[i]
+ prev_pos = cmd_byte_positions[i-1][0]
- if preview:
- match = prompt_re.search(preview)
- if match:
- cmd_text = preview[match.end():].strip()
- if cmd_text:
- blocks.append((pos, preview[:80]))
+ if known_cmd:
+ prev_chunk = raw_bytes[prev_pos:pos]
+ prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace'))
+ prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()]
+ prompt_text = prev_lines[-1].strip() if prev_lines else ""
+ preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ chunk = raw_bytes[prev_pos:pos]
+ cleaned = log_cleaner(chunk.decode(errors='replace'))
+ lines = [l for l in cleaned.split('\n') if l.strip()]
+ preview = lines[-1].strip() if lines else ""
+
+ if preview:
+ match = prompt_re.search(preview)
+ if match:
+ cmd_text = preview[match.end():].strip()
+ if cmd_text:
+ parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]})
+ else:
+ parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+ else:
+ parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""})
+
+ last_newline = raw_bytes.rfind(b'\n')
+ current_prompt_pos = last_newline + 1 if last_newline != -1 else 0
+ current_end = len(raw_bytes)
+
+ for i, item in enumerate(parsed_positions):
+ if item["type"] == "VALID_CMD":
+ start_pos = item["pos"]
+ preview = item["preview"]
+
+ # Find the end position: next VALID_CMD or EMPTY_PROMPT
+ end_pos = current_prompt_pos
+ for j in range(i + 1, len(parsed_positions)):
+ next_item = parsed_positions[j]
+ if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT"):
+ end_pos = next_item["pos"]
+ break
+
+ blocks.append((start_pos, end_pos, preview))
+
+ # Always ensure there is a final block representing the current prompt
+ if not blocks:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+ elif blocks[-1][0] < current_prompt_pos:
+ blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT"))
+
return blocks
Identifies command blocks in the terminal history.