Compare commits

...

10 Commits

Author SHA1 Message Date
3e34d0f310 publish prod version with capture 2025-08-13 16:07:32 -03:00
4c76405549 add capture feature, change printing mechanics for all app, bug fixes 2025-08-04 11:34:22 -03:00
c3f9f75f70 fix ai 2025-07-11 22:31:42 -03:00
e2e4c9bfe7 minor bug fixes, add new tags, add bulk using file 2025-05-09 17:44:29 -03:00
582459d6d3 bug fix docker 2025-01-30 17:39:56 -03:00
b1188587fc change to openai 4o mini and improvements to ai code for new model 2024-07-21 18:39:46 -03:00
4d8244a10f Add features:
- New protocols: Docker and Kubectl
 - Add contexts to filter the number of nodes
 - Add option to modify the api using plugins
 - Minor bug fixes
2024-07-15 15:38:01 -03:00
a71d8adcb3 bug fix 2024-07-05 17:49:53 -03:00
3c01d76391 add contexts and api plugins 2024-07-02 16:53:07 -03:00
89e828451c add kubectl and docker support 2024-06-17 15:58:28 -03:00
17 changed files with 2161 additions and 1326 deletions

3
.gitignore vendored
View File

@@ -130,3 +130,6 @@ dmypy.json
#clients
*sync_client*
#App
connpy-completion-helper

143
README.md
View File

@@ -9,7 +9,8 @@
[![](https://img.shields.io/pypi/l/connpy.svg?style=flat-square)](https://github.com/fluzzi/connpy/blob/main/LICENSE)
[![](https://img.shields.io/pypi/dm/connpy.svg?style=flat-square)](https://pypi.org/pypi/connpy/)
Connpy is a ssh and telnet connection manager and automation module for Linux, Mac and Docker
Connpy is a SSH, SFTP, Telnet, kubectl, and Docker pod connection manager and automation module for Linux, Mac, and Docker.
## Installation
@@ -43,33 +44,34 @@ Connpy integrates with Google services for backup purposes:
For more detailed information, please read our [Privacy Policy](https://connpy.gederico.dynu.net/fluzzi32/connpy/src/branch/main/PRIVATE_POLICY.md).
### Features
- You can generate profiles and reference them from nodes using @profilename so you dont
need to edit multiple nodes when changing password or other information.
- Nodes can be stored on @folder or @subfolder@folder to organize your devices. Then can
be referenced using node@subfolder@folder or node@folder
- If you have too many nodes. Get completion script using: conn config --completion.
Or use fzf installing pyfzf and running conn config --fzf true
- Create in bulk, copy, move, export and import nodes for easy management.
- Run automation scripts in network devices.
- use GPT AI to help you manage your devices.
- Manage connections using SSH, SFTP, Telnet, kubectl, and Docker exec.
- Set contexts to manage specific nodes from specific contexts (work/home/clients/etc).
- You can generate profiles and reference them from nodes using @profilename so you don't
need to edit multiple nodes when changing passwords or other information.
- Nodes can be stored on @folder or @subfolder@folder to organize your devices. They can
be referenced using node@subfolder@folder or node@folder.
- If you have too many nodes, get a completion script using: conn config --completion.
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- Use GPT AI to help you manage your devices.
- Add plugins with your own scripts.
- Much more!
### Usage:
```
usage: conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]
conn {profile,move,mv,copy,cp,list,ls,bulk,export,import,ai,run,api,plugin,config} ...
conn {profile,move,mv,copy,cp,list,ls,bulk,export,import,ai,run,api,plugin,config,sync,context} ...
positional arguments:
node|folder node[@subfolder][@folder]
Connect to specific node or show all matching nodes
[@subfolder][@folder]
Show all available connections globaly or in specified path
```
Show all available connections globally or in specified path
### Options:
```
options:
-h, --help show this help message and exit
-v, --version Show version
-a, --add Add new node[@subfolder][@folder] or [@subfolder]@folder
@@ -78,10 +80,8 @@ positional arguments:
-s, --show Show node[@subfolder][@folder]
-d, --debug Display all conections steps
-t, --sftp Connects using sftp instead of ssh
```
### Commands:
```
Commands:
profile Manage profiles
move(mv) Move node
copy(cp) Copy node
@@ -95,6 +95,7 @@ positional arguments:
plugin Manage plugins
config Manage app config
sync Sync config with Google
context Manage contexts with regex matching
```
### Manage profiles:
@@ -115,14 +116,26 @@ options:
### Examples:
```
#Add new profile
conn profile --add office-user
#Add new folder
conn --add @office
#Add new subfolder
conn --add @datacenter@office
#Add node to subfolder
conn --add server@datacenter@office
#Add node to folder
conn --add pc@office
#Show node information
conn --show server@datacenter@office
#Connect to nodes
conn pc@office
conn server
#Create and set new context
conn context -a office .*@office
conn context --set office
#Run a command in a node
conn run server ls -la
```
## Plugin Requirements for Connpy
@@ -141,9 +154,8 @@ options:
- **Purpose**: Handles parsing of command-line arguments.
- **Requirements**:
- Must contain only one method: `__init__`.
- The `__init__` method must initialize at least two attributes:
- The `__init__` method must initialize at least one attribute:
- `self.parser`: An instance of `argparse.ArgumentParser`.
- `self.description`: A string containing the description of the parser.
2. **Class `Entrypoint`**:
- **Purpose**: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
- **Requirements**:
@@ -240,6 +252,97 @@ There are 2 methods that allows you to define custom logic to be executed before
- `if __name__ == "__main__":`
- This block allows the plugin to be run as a standalone script for testing or independent use.
### Command Completion Support
Plugins can provide intelligent **tab completion** by defining a function called `_connpy_completion` in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.
#### Function Signature
```
def _connpy_completion(wordsnumber, words, info=None):
...
```
#### Parameters
| Parameter | Description |
|----------------|-------------|
| `wordsnumber` | Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., `connpy <plugin> ...`). |
| `words` | A list of tokens (words) already typed. `words[0]` is always the name of the plugin, followed by any subcommands or arguments. |
| `info` | A dictionary of structured context data provided by Connpy to help with suggestions. |
#### Contents of `info`
The `info` dictionary contains helpful context to generate completions:
```
info = {
"config": config_dict, # The full loaded configuration
"nodes": node_list, # List of all known node names
"folders": folder_list, # List of all defined folder names
"profiles": profile_list, # List of all profile names
"plugins": plugin_list # List of all plugin names
}
```
You can use this data to generate suggestions based on the current input.
#### Return Value
The function must return a list of suggestion strings to be presented to the user.
#### Example
```
def _connpy_completion(wordsnumber, words, info=None):
if wordsnumber == 3:
return ["--help", "--verbose", "start", "stop"]
elif wordsnumber == 4 and words[2] == "start":
return info["nodes"] # Suggest node names
return []
```
> In this example, if the user types `connpy myplugin start ` and presses Tab, it will suggest node names.
### Handling Unknown Arguments
Plugins can choose to accept and process unknown arguments that are **not explicitly defined** in the parser. To enable this behavior, the plugin must define the following hidden argument in its `Parser` class:
```
self.parser.add_argument(
"--unknown-args",
action="store_true",
default=True,
help=argparse.SUPPRESS
)
```
#### Behavior:
- When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
- These unknown arguments will be passed to the plugin as `args.unknown_args` inside the `Entrypoint`.
- If the user does not pass any unknown arguments, `args.unknown_args` will contain the default value (`True`, unless overridden).
#### Example:
If a plugin accepts unknown tcpdump flags like this:
```
connpy myplugin -nn -s0
```
And defines the hidden `--unknown-args` flag as shown above, then:
- `args.unknown_args` inside `Entrypoint.__init__()` will be: `['-nn', '-s0']`
> This allows the plugin to receive and process arguments intended for external tools (e.g., `tcpdump`) without argparse raising an error.
#### Note:
If a plugin does **not** define `--unknown-args`, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
### Script Verification
- The `verify_script` method in `plugins.py` is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.

View File

@@ -2,32 +2,35 @@
'''
## Connection manager
Connpy is a connection manager that allows you to store nodes to connect them fast and password free.
Connpy is a SSH, SFTP, Telnet, kubectl, and Docker pod connection manager and automation module for Linux, Mac, and Docker.
### Features
- You can generate profiles and reference them from nodes using @profilename so you dont
need to edit multiple nodes when changing password or other information.
- Nodes can be stored on @folder or @subfolder@folder to organize your devices. Then can
be referenced using node@subfolder@folder or node@folder
- If you have too many nodes. Get completion script using: conn config --completion.
Or use fzf installing pyfzf and running conn config --fzf true
- Create in bulk, copy, move, export and import nodes for easy management.
- Run automation scripts in network devices.
- use GPT AI to help you manage your devices.
- Manage connections using SSH, SFTP, Telnet, kubectl, and Docker exec.
- Set contexts to manage specific nodes from specific contexts (work/home/clients/etc).
- You can generate profiles and reference them from nodes using @profilename so you don't
need to edit multiple nodes when changing passwords or other information.
- Nodes can be stored on @folder or @subfolder@folder to organize your devices. They can
be referenced using node@subfolder@folder or node@folder.
- If you have too many nodes, get a completion script using: conn config --completion.
Or use fzf by installing pyfzf and running conn config --fzf true.
- Create in bulk, copy, move, export, and import nodes for easy management.
- Run automation scripts on network devices.
- Use GPT AI to help you manage your devices.
- Add plugins with your own scripts.
- Much more!
### Usage
```
usage: conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]
conn {profile,move,mv,copy,cp,list,ls,bulk,export,import,ai,run,api,plugin,config} ...
conn {profile,move,mv,copy,cp,list,ls,bulk,export,import,ai,run,api,plugin,config,sync,context} ...
positional arguments:
node|folder node[@subfolder][@folder]
Connect to specific node or show all matching nodes
[@subfolder][@folder]
Show all available connections globaly or in specified path
Options:
Show all available connections globally or in specified path
options:
-h, --help show this help message and exit
-v, --version Show version
-a, --add Add new node[@subfolder][@folder] or [@subfolder]@folder
@@ -51,6 +54,7 @@ Commands:
plugin Manage plugins
config Manage app config
sync Sync config with Google
context Manage contexts with regex matching
```
### Manage profiles
@@ -71,14 +75,26 @@ options:
### Examples
```
#Add new profile
conn profile --add office-user
#Add new folder
conn --add @office
#Add new subfolder
conn --add @datacenter@office
#Add node to subfolder
conn --add server@datacenter@office
#Add node to folder
conn --add pc@office
#Show node information
conn --show server@datacenter@office
#Connect to nodes
conn pc@office
conn server
#Create and set new context
conn context -a office .*@office
conn context --set office
#Run a command in a node
conn run server ls -la
```
## Plugin Requirements for Connpy
### General Structure
@@ -96,9 +112,8 @@ options:
- **Purpose**: Handles parsing of command-line arguments.
- **Requirements**:
- Must contain only one method: `__init__`.
- The `__init__` method must initialize at least two attributes:
- The `__init__` method must initialize at least one attribute:
- `self.parser`: An instance of `argparse.ArgumentParser`.
- `self.description`: A string containing the description of the parser.
2. **Class `Entrypoint`**:
- **Purpose**: Acts as the entry point for plugin execution, utilizing parsed arguments and integrating with the main application.
- **Requirements**:
@@ -194,6 +209,97 @@ There are 2 methods that allows you to define custom logic to be executed before
- `if __name__ == "__main__":`
- This block allows the plugin to be run as a standalone script for testing or independent use.
### Command Completion Support
Plugins can provide intelligent **tab completion** by defining a function called `_connpy_completion` in the plugin script. This function will be called by Connpy to assist with command-line completion when the user types partial input.
#### Function Signature
```
def _connpy_completion(wordsnumber, words, info=None):
...
```
#### Parameters
| Parameter | Description |
|----------------|-------------|
| `wordsnumber` | Integer indicating the number of words (space-separated tokens) currently on the command line. For plugins, this typically starts at 3 (e.g., `connpy <plugin> ...`). |
| `words` | A list of tokens (words) already typed. `words[0]` is always the name of the plugin, followed by any subcommands or arguments. |
| `info` | A dictionary of structured context data provided by Connpy to help with suggestions. |
#### Contents of `info`
The `info` dictionary contains helpful context to generate completions:
```
info = {
"config": config_dict, # The full loaded configuration
"nodes": node_list, # List of all known node names
"folders": folder_list, # List of all defined folder names
"profiles": profile_list, # List of all profile names
"plugins": plugin_list # List of all plugin names
}
```
You can use this data to generate suggestions based on the current input.
#### Return Value
The function must return a list of suggestion strings to be presented to the user.
#### Example
```
def _connpy_completion(wordsnumber, words, info=None):
if wordsnumber == 3:
return ["--help", "--verbose", "start", "stop"]
elif wordsnumber == 4 and words[2] == "start":
return info["nodes"] # Suggest node names
return []
```
> In this example, if the user types `connpy myplugin start ` and presses Tab, it will suggest node names.
### Handling Unknown Arguments
Plugins can choose to accept and process unknown arguments that are **not explicitly defined** in the parser. To enable this behavior, the plugin must define the following hidden argument in its `Parser` class:
```
self.parser.add_argument(
"--unknown-args",
action="store_true",
default=True,
help=argparse.SUPPRESS
)
```
#### Behavior:
- When this argument is present, Connpy will parse the known arguments and capture any extra (unknown) ones.
- These unknown arguments will be passed to the plugin as `args.unknown_args` inside the `Entrypoint`.
- If the user does not pass any unknown arguments, `args.unknown_args` will contain the default value (`True`, unless overridden).
#### Example:
If a plugin accepts unknown tcpdump flags like this:
```
connpy myplugin -nn -s0
```
And defines the hidden `--unknown-args` flag as shown above, then:
- `args.unknown_args` inside `Entrypoint.__init__()` will be: `['-nn', '-s0']`
> This allows the plugin to receive and process arguments intended for external tools (e.g., `tcpdump`) without argparse raising an error.
#### Note:
If a plugin does **not** define `--unknown-args`, any extra arguments passed will cause argparse to fail with an unrecognized arguments error.
### Script Verification
- The `verify_script` method in `plugins.py` is used to check the plugin script's compliance with these standards.
- Non-compliant scripts will be rejected to ensure consistency and proper functionality within the plugin system.
@@ -406,8 +512,9 @@ from .ai import ai
from .plugins import Plugins
from ._version import __version__
from pkg_resources import get_distribution
from . import printer
__all__ = ["node", "nodes", "configfile", "connapp", "ai", "Plugins"]
__all__ = ["node", "nodes", "configfile", "connapp", "ai", "Plugins", "printer"]
__author__ = "Federico Luzzi"
__pdoc__ = {
'core': False,
@@ -422,5 +529,6 @@ __pdoc__ = {
'node.deferred_class_hooks': False,
'nodes.deferred_class_hooks': False,
'connapp': False,
'connapp.encrypt': True
'connapp.encrypt': True,
'printer': False
}

View File

@@ -1,2 +1,2 @@
__version__ = "4.0.3"
__version__ = "4.2"

View File

@@ -1,4 +1,4 @@
import openai
from openai import OpenAI
import time
import json
import re
@@ -14,7 +14,7 @@ class ai:
### Attributes:
- model (str): Model of GPT api to use. Default is gpt-3.5-turbo.
- model (str): Model of GPT api to use. Default is gpt-4o-mini.
- temp (float): Value between 0 and 1 that control the randomness
of generated text, with higher values increasing
@@ -22,7 +22,7 @@ class ai:
'''
def __init__(self, config, org = None, api_key = None, model = None, temp = 0.7):
def __init__(self, config, org = None, api_key = None, model = None):
'''
### Parameters:
@@ -39,7 +39,7 @@ class ai:
- api_key (str): A unique authentication token required to access
and interact with the API.
- model (str): Model of GPT api to use. Default is gpt-3.5-turbo.
- model (str): Model of GPT api to use. Default is gpt-4o-mini.
- temp (float): Value between 0 and 1 that control the randomness
of generated text, with higher values increasing
@@ -48,28 +48,24 @@ class ai:
'''
self.config = config
if org:
openai.organization = org
else:
try:
openai.organization = self.config.config["openai"]["organization"]
except:
raise ValueError("Missing openai organization")
if api_key:
openai.api_key = api_key
else:
try:
openai.api_key = self.config.config["openai"]["api_key"]
except:
final_api_key = api_key if api_key else self.config.config["openai"]["api_key"]
except Exception:
raise ValueError("Missing openai api_key")
try:
final_org = org if org else self.config.config["openai"]["organization"]
except Exception:
raise ValueError("Missing openai organization")
self.client = OpenAI(api_key=final_api_key, organization=final_org)
if model:
self.model = model
else:
try:
self.model = self.config.config["openai"]["model"]
except:
self.model = "gpt-3.5-turbo"
self.temp = temp
self.model = "gpt-5-nano"
self.__prompt = {}
self.__prompt["original_system"] = """
You are the AI chatbot and assistant of a network connection manager and automation app called connpy. When provided with user input analyze the input and extract the following information. If user wants to chat just reply and don't call a function:
@@ -128,7 +124,7 @@ Categorize the user's request based on the operation they want to perform on the
self.__prompt["original_function"]["parameters"]["required"] = ["type", "filter"]
self.__prompt["command_system"] = """
For each OS listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server).
The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting.
The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. This includes access configuration mode and commiting if required.
If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
Note: Preserving the integrity of user-provided commands is of utmost importance. If a user has provided a specific command to run, include that command exactly as it was given, even if it's not recognized or understood. Under no circumstances should you modify or alter user-provided commands.
"""
@@ -143,7 +139,7 @@ Categorize the user's request based on the operation they want to perform on the
self.__prompt["command_function"]["name"] = "get_commands"
self.__prompt["command_function"]["descriptions"] = """
For each OS listed below, provide the command(s) needed to perform the specified action, depending on the device OS (e.g., Cisco IOSXR router, Linux server).
The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting.
The application knows how to connect to devices via SSH, so you only need to provide the command(s) to run after connecting. This includes access configuration mode and commiting if required.
If the commands needed are not for the specific OS type, just send an empty list (e.g., []).
"""
self.__prompt["command_function"]["parameters"] = {}
@@ -196,7 +192,7 @@ Categorize the user's request based on the operation they want to perform on the
@MethodHook
def _clean_command_response(self, raw_response, node_list):
#Parse response for command request to openAI GPT.
# Parse response for command request to openAI GPT.
info_dict = {}
info_dict["commands"] = []
info_dict["variables"] = {}
@@ -204,14 +200,24 @@ Categorize the user's request based on the operation they want to perform on the
for key, value in node_list.items():
newvalue = {}
commands = raw_response[value]
for i,e in enumerate(commands, start=1):
newvalue[f"command{i}"] = e
# Ensure commands is a list
if isinstance(commands, str):
commands = [commands]
# Determine the number of digits required for zero-padding
num_commands = len(commands)
num_digits = len(str(num_commands))
for i, e in enumerate(commands, start=1):
# Zero-pad the command number
command_num = f"command{str(i).zfill(num_digits)}"
newvalue[command_num] = e
if f"{{command{i}}}" not in info_dict["commands"]:
info_dict["commands"].append(f"{{command{i}}}")
info_dict["variables"]["__global__"][f"command{i}"] = ""
info_dict["commands"].append(f"{{{command_num}}}")
info_dict["variables"]["__global__"][command_num] = ""
info_dict["variables"][key] = newvalue
return info_dict
@MethodHook
def _get_commands(self, user_input, nodes):
#Send the request for commands for each device to openAI GPT.
@@ -239,17 +245,22 @@ Categorize the user's request based on the operation they want to perform on the
message.append({"role": "assistant", "content": None, "function_call": self.__prompt["command_assistant"]})
message.append({"role": "user", "content": command_input})
functions = [command_function]
response = openai.ChatCompletion.create(
response = self.client.chat.completions.create(
model=self.model,
messages=message,
functions=functions,
function_call={"name": "get_commands"},
temperature=self.temp
)
output = {}
result = response["choices"][0]["message"].to_dict()
json_result = json.loads(result["function_call"]["arguments"])
msg = response.choices[0].message # Es un objeto ChatCompletionMessage
# Puede que function_call sea None. Verificá primero.
if msg.function_call and msg.function_call.arguments:
json_result = json.loads(msg.function_call.arguments)
output["response"] = self._clean_command_response(json_result, node_list)
else:
# Manejo de error o fallback, según tu lógica
output["response"] = None
return output
@MethodHook
@@ -264,32 +275,45 @@ Categorize the user's request based on the operation they want to perform on the
chat_history = []
chat_history.append({"role": "user", "content": user_input})
message.extend(chat_history)
response = openai.ChatCompletion.create(
response = self.client.chat.completions.create(
model=self.model,
messages=message,
functions=functions,
function_call="auto",
temperature=self.temp,
top_p=1
)
def extract_quoted_strings(text):
pattern = r'["\'](.*?)["\']'
matches = re.findall(pattern, text)
return matches
expected = extract_quoted_strings(user_input)
output = {}
result = response["choices"][0]["message"].to_dict()
if result["content"]:
msg = response.choices[0].message # Objeto ChatCompletionMessage
if msg.content: # Si hay texto libre del modelo (caso "no app-related")
output["app_related"] = False
chat_history.append({"role": "assistant", "content": result["content"]})
output["response"] = result["content"]
chat_history.append({"role": "assistant", "content": msg.content})
output["response"] = msg.content
else:
json_result = json.loads(result["function_call"]["arguments"])
# Si hay function_call, es app-related
if msg.function_call and msg.function_call.arguments:
json_result = json.loads(msg.function_call.arguments)
output["app_related"] = True
output["filter"] = json_result["filter"]
output["type"] = json_result["type"]
chat_history.append({"role": "assistant", "content": result["content"], "function_call": {"name": result["function_call"]["name"], "arguments": json.dumps(json_result)}})
chat_history.append({
"role": "assistant",
"content": msg.content,
"function_call": {
"name": msg.function_call.name,
"arguments": json.dumps(json_result)
}
})
else:
# Fallback defensivo si no hay nada
output["app_related"] = False
output["response"] = None
output["expected"] = expected
output["chat_history"] = chat_history
return output
@@ -300,23 +324,27 @@ Categorize the user's request based on the operation they want to perform on the
message = []
message.append({"role": "user", "content": user_input})
functions = [self.__prompt["confirmation_function"]]
response = openai.ChatCompletion.create(
response = self.client.chat.completions.create(
model=self.model,
messages=message,
functions=functions,
function_call={"name": "get_confirmation"},
temperature=self.temp,
top_p=1
)
result = response["choices"][0]["message"].to_dict()
json_result = json.loads(result["function_call"]["arguments"])
msg = response.choices[0].message # Es un objeto ChatCompletionMessage
output = {}
if msg.function_call and msg.function_call.arguments:
json_result = json.loads(msg.function_call.arguments)
if json_result["result"] == "true":
output["result"] = True
elif json_result["result"] == "false":
output["result"] = False
elif json_result["result"] == "none":
output["result"] = json_result["response"]
output["result"] = json_result.get("response") # .get para evitar KeyError si falta
else:
output["result"] = None # O el valor que tenga sentido para tu caso
return output
@MethodHook

View File

@@ -1,11 +1,13 @@
from flask import Flask, request, jsonify
from connpy import configfile, node, nodes, hooks
from flask_cors import CORS
from connpy import configfile, node, nodes, hooks, printer
from connpy.ai import ai as myai
from waitress import serve
import os
import signal
app = Flask(__name__)
CORS(app)
conf = configfile()
PID_FILE1 = "/run/connpy.pid"
@@ -141,7 +143,7 @@ def stop_api():
port = int(f.readline().strip())
PID_FILE=PID_FILE2
except:
print("Connpy api server is not running.")
printer.warning("Connpy API server is not running.")
return
# Send a SIGTERM signal to the process
try:
@@ -150,7 +152,7 @@ def stop_api():
pass
# Delete the PID file
os.remove(PID_FILE)
print(f"Server with process ID {pid} stopped.")
printer.info(f"Server with process ID {pid} stopped.")
return port
@hooks.MethodHook
@@ -166,7 +168,7 @@ def start_server(port=8048):
@hooks.MethodHook
def start_api(port=8048):
if os.path.exists(PID_FILE1) or os.path.exists(PID_FILE2):
print("Connpy server is already running.")
printer.warning("Connpy server is already running.")
return
pid = os.fork()
if pid == 0:
@@ -180,7 +182,7 @@ def start_api(port=8048):
with open(PID_FILE2, "w") as f:
f.write(str(pid) + "\n" + str(port))
except:
print("Cound't create PID file")
return
print(f'Server is running with process ID {pid} in port {port}')
printer.error("Couldn't create PID file.")
exit(1)
printer.start(f"Server is running with process ID {pid} on port {port}")

View File

@@ -8,7 +8,8 @@ import sys
import inquirer
from .core import node,nodes
from ._version import __version__
from .api import start_api,stop_api,debug_api
from . import printer
from .api import start_api,stop_api,debug_api,app
from .ai import ai
from .plugins import Plugins
import yaml
@@ -17,8 +18,13 @@ class NoAliasDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True
import ast
from rich import print as mdprint
from rich.markdown import Markdown
from rich.console import Console, Group
from rich.panel import Panel
from rich.text import Text
from rich.rule import Rule
from rich.style import Style
mdprint = Console().print
try:
from pyfzf.pyfzf import FzfPrompt
except:
@@ -42,6 +48,7 @@ class connapp:
the config file.
'''
self.app = app
self.node = node
self.nodes = nodes
self.start_api = start_api
@@ -69,7 +76,7 @@ class connapp:
'''
#DEFAULTPARSER
defaultparser = argparse.ArgumentParser(prog = "conn", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
defaultparser = argparse.ArgumentParser(prog = "connpy", description = "SSH and Telnet connection manager", formatter_class=argparse.RawTextHelpFormatter)
subparsers = defaultparser.add_subparsers(title="Commands", dest="subcommand")
#NODEPARSER
nodeparser = subparsers.add_parser("node", formatter_class=argparse.RawTextHelpFormatter)
@@ -109,6 +116,7 @@ class connapp:
#BULKPARSER
bulkparser = subparsers.add_parser("bulk", description="Add nodes in bulk")
bulkparser.add_argument("bulk", const="bulk", nargs=0, action=self._store_type, help="Add nodes in bulk")
bulkparser.add_argument("-f", "--file", nargs=1, help="Import nodes from a file. First line nodes, second line hosts")
bulkparser.set_defaults(func=self._func_others)
# EXPORTPARSER
exportparser = subparsers.add_parser("export", description="Export connection folder to Yaml file")
@@ -188,6 +196,10 @@ class connapp:
argv[0] = "profile"
if len(argv) < 1 or argv[0] not in self.commands:
argv.insert(0,"node")
args, unknown_args = defaultparser.parse_known_args(argv)
if hasattr(args, "unknown_args"):
args.unknown_args = unknown_args
else:
args = defaultparser.parse_args(argv)
if args.subcommand in self.plugins.plugins:
self.plugins.plugins[args.subcommand].Entrypoint(args, self.plugins.plugin_parsers[args.subcommand].parser, self)
@@ -209,14 +221,14 @@ class connapp:
return actions.get(args.action)(args)
def _version(self, args):
print(__version__)
printer.info(f"Connpy {__version__}")
def _connect(self, args):
if args.data == None:
matches = self.nodes_list
if len(matches) == 0:
print("There are no nodes created")
print("try: conn --help")
printer.warning("There are no nodes created")
printer.info("try: connpy --help")
exit(9)
else:
if args.data.startswith("@"):
@@ -224,7 +236,7 @@ class connapp:
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
@@ -241,16 +253,16 @@ class connapp:
def _del(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
matches = list(filter(lambda k: k == args.data, self.folders))
else:
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
print("Removing: {}".format(matches))
printer.info("Removing: {}".format(matches))
question = [inquirer.Confirm("delete", message="Are you sure you want to continue?")]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -265,14 +277,14 @@ class connapp:
self.config._connections_del(**nodeuniques)
self.config._saveconfig(self.config.file)
if len(matches) == 1:
print("{} deleted succesfully".format(matches[0]))
printer.success("{} deleted successfully".format(matches[0]))
else:
print(f"{len(matches)} nodes deleted succesfully")
printer.success(f"{len(matches)} nodes deleted successfully")
def _add(self, args):
args.data = self._type_node(args.data)
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
elif args.data.startswith("@"):
type = "folder"
@@ -283,83 +295,78 @@ class connapp:
matches = list(filter(lambda k: k == args.data, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + args.data, self.folders))
if len(matches) > 0:
print("{} already exist".format(matches[0]))
printer.error("{} already exist".format(matches[0]))
exit(4)
if len(reversematches) > 0:
print("{} already exist".format(reversematches[0]))
printer.error("{} already exist".format(reversematches[0]))
exit(4)
else:
if type == "folder":
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid folder {}".format(args.data))
printer.error("Invalid folder {}".format(args.data))
exit(5)
if "subfolder" in uniques.keys():
parent = "@" + uniques["folder"]
if parent not in self.folders:
print("Folder {} not found".format(uniques["folder"]))
printer.error("Folder {} not found".format(uniques["folder"]))
exit(2)
self.config._folder_add(**uniques)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
printer.success("{} added successfully".format(args.data))
if type == "node":
nodefolder = args.data.partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print(nodefolder + " not found")
printer.error(nodefolder + " not found")
exit(2)
uniques = self.config._explode_unique(args.data)
if uniques == False:
print("Invalid node {}".format(args.data))
printer.error("Invalid node {}".format(args.data))
exit(5)
print("You can use the configured setting in a profile using @profilename.")
print("You can also leave empty any value except hostname/IP.")
print("You can pass 1 or more passwords using comma separated @profiles")
print("You can use this variables on logging file name: ${id} ${unique} ${host} ${port} ${user} ${protocol}")
print("Some useful tags to set for automation are 'os', 'screen_length_command', and 'prompt'.")
self._print_instructions()
newnode = self._questions_nodes(args.data, uniques)
if newnode == False:
exit(7)
self.config._connections_add(**newnode)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data))
printer.success("{} added successfully".format(args.data))
def _show(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
matches = list(filter(lambda k: k == args.data, self.nodes_list))
if args.data.startswith("@"):
matches = list(filter(lambda k: args.data in k, self.nodes_list))
else:
matches = list(filter(lambda k: k.startswith(args.data), self.nodes_list))
if len(matches) == 0:
print("{} not found".format(args.data))
printer.error("{} not found".format(args.data))
exit(2)
elif len(matches) > 1:
matches[0] = self._choose(matches,"node", "connect")
if matches[0] == None:
exit(7)
node = self.config.getitem(matches[0])
for k, v in node.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + d)
yaml_output = yaml.dump(node, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
def _mod(self, args):
if args.data == None:
print("Missing argument node")
printer.error("Missing argument node")
exit(3)
matches = self.config._getallnodes(args.data)
if len(matches) == 0:
print("No connection found with filter: {}".format(args.data))
printer.error("No connection found with filter: {}".format(args.data))
exit(2)
elif len(matches) == 1:
uniques = self.config._explode_unique(args.data)
uniques = self.config._explode_unique(matches[0])
unique = matches[0]
else:
uniques = {"id": None, "folder": None}
unique = None
print("Editing: {}".format(matches))
printer.info("Editing: {}".format(matches))
node = {}
for i in matches:
node[i] = self.config.getitem(i)
@@ -373,12 +380,12 @@ class connapp:
uniques.update(node[matches[0]])
uniques["type"] = "connection"
if sorted(updatenode.items()) == sorted(uniques.items()):
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatenode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data))
printer.success("{} edited successfully".format(args.data))
else:
for k in node:
updatednode = self.config._explode_unique(k)
@@ -390,12 +397,12 @@ class connapp:
editcount += 1
updatednode[key] = updatenode[key]
if not editcount:
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._connections_add(**updatednode)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(matches))
printer.success("{} edited successfully".format(matches))
return
@@ -409,57 +416,48 @@ class connapp:
def _profile_del(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
if matches[0] == "default":
print("Can't delete default profile")
printer.error("Can't delete default profile")
exit(6)
usedprofile = self.config._profileused(matches[0])
if len(usedprofile) > 0:
print("Profile {} used in the following nodes:".format(matches[0]))
print(", ".join(usedprofile))
printer.error(f"Profile {matches[0]} used in the following nodes:\n{', '.join(usedprofile)}")
exit(8)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {}?".format(matches[0]))]
confirm = inquirer.prompt(question)
if confirm["delete"]:
self.config._profiles_del(id = matches[0])
self.config._saveconfig(self.config.file)
print("{} deleted succesfully".format(matches[0]))
printer.success("{} deleted successfully".format(matches[0]))
def _profile_show(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
for k, v in profile.items():
if isinstance(v, str):
print(k + ": " + v)
elif isinstance(v, list):
print(k + ":")
for i in v:
print(" - " + i)
elif isinstance(v, dict):
print(k + ":")
for i,d in v.items():
print(" - " + i + ": " + d)
yaml_output = yaml.dump(profile, sort_keys=False, default_flow_style=False)
printer.custom(matches[0],"")
print(yaml_output)
def _profile_add(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) > 0:
print("Profile {} Already exist".format(matches[0]))
printer.error("Profile {} Already exist".format(matches[0]))
exit(4)
newprofile = self._questions_profiles(args.data[0])
if newprofile == False:
exit(7)
self.config._profiles_add(**newprofile)
self.config._saveconfig(self.config.file)
print("{} added succesfully".format(args.data[0]))
printer.success("{} added successfully".format(args.data[0]))
def _profile_mod(self, args):
matches = list(filter(lambda k: k == args.data[0], self.profiles))
if len(matches) == 0:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
profile = self.config.profiles[matches[0]]
oldprofile = {"id": matches[0]}
@@ -471,12 +469,12 @@ class connapp:
if not updateprofile:
exit(7)
if sorted(updateprofile.items()) == sorted(oldprofile.items()):
print("Nothing to do here")
printer.info("Nothing to do here")
return
else:
self.config._profiles_add(**updateprofile)
self.config._saveconfig(self.config.file)
print("{} edited succesfully".format(args.data[0]))
printer.success("{} edited successfully".format(args.data[0]))
def _func_others(self, args):
#Function called when using other commands
@@ -511,7 +509,9 @@ class connapp:
formated[upper_key] = upper_value
newitems.append(args.format[0].format(**formated))
items = newitems
print(*items, sep="\n")
yaml_output = yaml.dump(items, sort_keys=False, default_flow_style=False)
printer.custom(args.data,"")
print(yaml_output)
def _mvcp(self, args):
if not self.case:
@@ -520,20 +520,20 @@ class connapp:
source = list(filter(lambda k: k == args.data[0], self.nodes_list))
dest = list(filter(lambda k: k == args.data[1], self.nodes_list))
if len(source) != 1:
print("{} not found".format(args.data[0]))
printer.error("{} not found".format(args.data[0]))
exit(2)
if len(dest) > 0:
print("Node {} Already exist".format(args.data[1]))
printer.error("Node {} Already exist".format(args.data[1]))
exit(4)
nodefolder = args.data[1].partition("@")
nodefolder = "@" + nodefolder[2]
if nodefolder not in self.folders and nodefolder != "@":
print("{} not found".format(nodefolder))
printer.error("{} not found".format(nodefolder))
exit(2)
olduniques = self.config._explode_unique(args.data[0])
newuniques = self.config._explode_unique(args.data[1])
if newuniques == False:
print("Invalid node {}".format(args.data[1]))
printer.error("Invalid node {}".format(args.data[1]))
exit(5)
node = self.config.getitem(source[0])
newnode = {**newuniques, **node}
@@ -542,9 +542,23 @@ class connapp:
self.config._connections_del(**olduniques)
self.config._saveconfig(self.config.file)
action = "moved" if args.command == "move" else "copied"
print("{} {} succesfully to {}".format(args.data[0],action, args.data[1]))
printer.success("{} {} successfully to {}".format(args.data[0],action, args.data[1]))
def _bulk(self, args):
if args.file and os.path.isfile(args.file[0]):
with open(args.file[0], 'r') as f:
lines = f.readlines()
# Expecting exactly 2 lines
if len(lines) < 2:
printer.error("The file must contain at least two lines: one for nodes, one for hosts.")
exit(11)
nodes = lines[0].strip()
hosts = lines[1].strip()
newnodes = self._questions_bulk(nodes, hosts)
else:
newnodes = self._questions_bulk()
if newnodes == False:
exit(7)
@@ -559,10 +573,10 @@ class connapp:
matches = list(filter(lambda k: k == unique, self.nodes_list))
reversematches = list(filter(lambda k: k == "@" + unique, self.folders))
if len(matches) > 0:
print("Node {} already exist, ignoring it".format(unique))
printer.info("Node {} already exist, ignoring it".format(unique))
continue
if len(reversematches) > 0:
print("Folder with name {} already exist, ignoring it".format(unique))
printer.info("Folder with name {} already exist, ignoring it".format(unique))
continue
newnode = {"id": n}
if newnodes["location"] != "":
@@ -586,9 +600,9 @@ class connapp:
self.nodes_list = self.config._getallnodes()
if count > 0:
self.config._saveconfig(self.config.file)
print("Succesfully added {} nodes".format(count))
printer.success("Successfully added {} nodes".format(count))
else:
print("0 nodes added")
printer.info("0 nodes added")
def _completion(self, args):
if args.data[0] == "bash":
@@ -623,7 +637,7 @@ class connapp:
folder = os.path.abspath(args.data[0]).rstrip('/')
with open(pathfile, "w") as f:
f.write(str(folder))
print("Config saved")
printer.success("Config saved")
def _openai(self, args):
if "openai" in self.config.config:
@@ -637,37 +651,37 @@ class connapp:
def _change_settings(self, name, value):
self.config.config[name] = value
self.config._saveconfig(self.config.file)
print("Config saved")
printer.success("Config saved")
def _func_plugin(self, args):
if args.add:
if not os.path.exists(args.add[1]):
print("File {} dosn't exists.".format(args.add[1]))
printer.error("File {} dosn't exists.".format(args.add[1]))
exit(14)
if args.add[0].isalpha() and args.add[0].islower() and len(args.add[0]) <= 15:
disabled_dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py.bkp")
if args.add[0] in self.commands or os.path.exists(disabled_dest_file):
print("Plugin name can't be the same as other commands.")
printer.error("Plugin name can't be the same as other commands.")
exit(15)
else:
check_bad_script = self.plugins.verify_script(args.add[1])
if check_bad_script:
print(check_bad_script)
printer.error(check_bad_script)
exit(16)
else:
try:
dest_file = os.path.join(self.config.defaultdir + "/plugins", args.add[0] + ".py")
shutil.copy2(args.add[1], dest_file)
print(f"Plugin {args.add[0]} added succesfully.")
printer.success(f"Plugin {args.add[0]} added successfully.")
except Exception as e:
print(f"Failed importing plugin file. {e}")
printer.error(f"Failed importing plugin file. {e}")
exit(17)
else:
print("Plugin name should be lowercase letters up to 15 characters.")
printer.error("Plugin name should be lowercase letters up to 15 characters.")
exit(15)
elif args.update:
if not os.path.exists(args.update[1]):
print("File {} dosn't exists.".format(args.update[1]))
printer.error("File {} dosn't exists.".format(args.update[1]))
exit(14)
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.update[0] + ".py.bkp")
@@ -676,7 +690,7 @@ class connapp:
if plugin_exist or disabled_plugin_exist:
check_bad_script = self.plugins.verify_script(args.update[1])
if check_bad_script:
print(check_bad_script)
printer.error(check_bad_script)
exit(16)
else:
try:
@@ -686,13 +700,13 @@ class connapp:
shutil.copy2(args.update[1], disabled_dest_file)
else:
shutil.copy2(args.update[1], dest_file)
print(f"Plugin {args.update[0]} updated succesfully.")
printer.success(f"Plugin {args.update[0]} updated successfully.")
except Exception as e:
print(f"Failed updating plugin file. {e}")
printer.error(f"Failed updating plugin file. {e}")
exit(17)
else:
print("Plugin {} dosn't exist.".format(args.update[0]))
printer.error("Plugin {} dosn't exist.".format(args.update[0]))
exit(14)
elif args.delete:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.delete[0] + ".py")
@@ -700,7 +714,7 @@ class connapp:
plugin_exist = os.path.exists(plugin_file)
disabled_plugin_exist = os.path.exists(disabled_plugin_file)
if not plugin_exist and not disabled_plugin_exist:
print("Plugin {} dosn't exist.".format(args.delete[0]))
printer.error("Plugin {} dosn't exist.".format(args.delete[0]))
exit(14)
question = [inquirer.Confirm("delete", message="Are you sure you want to delete {} plugin?".format(args.delete[0]))]
confirm = inquirer.prompt(question)
@@ -712,33 +726,33 @@ class connapp:
os.remove(plugin_file)
elif disabled_plugin_exist:
os.remove(disabled_plugin_file)
print(f"plugin {args.delete[0]} deleted succesfully.")
printer.success(f"plugin {args.delete[0]} deleted successfully.")
except Exception as e:
print(f"Failed deleting plugin file. {e}")
printer.error(f"Failed deleting plugin file. {e}")
exit(17)
elif args.disable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.disable[0] + ".py.bkp")
if not os.path.exists(plugin_file) or os.path.exists(disabled_plugin_file):
print("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
printer.error("Plugin {} dosn't exist or it's disabled.".format(args.disable[0]))
exit(14)
try:
os.rename(plugin_file, disabled_plugin_file)
print(f"plugin {args.disable[0]} disabled succesfully.")
printer.success(f"plugin {args.disable[0]} disabled successfully.")
except Exception as e:
print(f"Failed disabling plugin file. {e}")
printer.error(f"Failed disabling plugin file. {e}")
exit(17)
elif args.enable:
plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py")
disabled_plugin_file = os.path.join(self.config.defaultdir + "/plugins", args.enable[0] + ".py.bkp")
if os.path.exists(plugin_file) or not os.path.exists(disabled_plugin_file):
print("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
printer.error("Plugin {} dosn't exist or it's enabled.".format(args.enable[0]))
exit(14)
try:
os.rename(disabled_plugin_file, plugin_file)
print(f"plugin {args.enable[0]} enabled succesfully.")
printer.success(f"plugin {args.enable[0]} enabled successfully.")
except Exception as e:
print(f"Failed enabling plugin file. {e}")
printer.error(f"Failed enabling plugin file. {e}")
exit(17)
elif args.list:
enabled_files = []
@@ -758,18 +772,19 @@ class connapp:
if disabled_files:
plugins["Disabled"] = disabled_files
if plugins:
printer.custom("plugins","")
print(yaml.dump(plugins, sort_keys=False))
else:
print("There are no plugins added.")
printer.warning("There are no plugins added.")
def _func_import(self, args):
if not os.path.exists(args.data[0]):
print("File {} dosn't exist".format(args.data[0]))
printer.error("File {} dosn't exist".format(args.data[0]))
exit(14)
print("This could overwrite your current configuration!")
printer.warning("This could overwrite your current configuration!")
question = [inquirer.Confirm("import", message="Are you sure you want to import {} file?".format(args.data[0]))]
confirm = inquirer.prompt(question)
if confirm == None:
@@ -779,7 +794,7 @@ class connapp:
with open(args.data[0]) as file:
imported = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for k,v in imported.items():
uniques = self.config._explode_unique(k)
@@ -798,12 +813,12 @@ class connapp:
uniques.update(v)
self.config._connections_add(**uniques)
self.config._saveconfig(self.config.file)
print("File {} imported succesfully".format(args.data[0]))
printer.success("File {} imported successfully".format(args.data[0]))
return
def _func_export(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
printer.error("File {} already exists".format(args.data[0]))
exit(14)
if len(args.data[1:]) == 0:
foldercons = self.config._getallnodesfull(extract = False)
@@ -811,13 +826,13 @@ class connapp:
for folder in args.data[1:]:
matches = list(filter(lambda k: k == folder, self.folders))
if len(matches) == 0 and folder != "@":
print("{} folder not found".format(folder))
printer.error("{} folder not found".format(folder))
exit(2)
foldercons = self.config._getallnodesfull(args.data[1:], extract = False)
with open(args.data[0], "w") as file:
yaml.dump(foldercons, file, Dumper=NoAliasDumper, default_flow_style=False)
file.close()
print("File {} generated succesfully".format(args.data[0]))
printer.success("File {} generated successfully".format(args.data[0]))
exit()
return
@@ -967,13 +982,13 @@ class connapp:
def _yaml_generate(self, args):
if os.path.exists(args.data[0]):
print("File {} already exists".format(args.data[0]))
printer.error("File {} already exists".format(args.data[0]))
exit(14)
else:
with open(args.data[0], "w") as file:
file.write(self._help("generate"))
file.close()
print("File {} generated succesfully".format(args.data[0]))
printer.success("File {} generated successfully".format(args.data[0]))
exit()
def _yaml_run(self, args):
@@ -981,7 +996,7 @@ class connapp:
with open(args.data[0]) as file:
scripts = yaml.load(file, Loader=yaml.FullLoader)
except:
print("failed reading file {}".format(args.data[0]))
printer.error("failed reading file {}".format(args.data[0]))
exit(10)
for script in scripts["tasks"]:
self._cli_run(script)
@@ -997,11 +1012,11 @@ class connapp:
if action == "test":
args["expected"] = script["expected"]
except KeyError as e:
print("'{}' is mandatory".format(e.args[0]))
printer.error("'{}' is mandatory".format(e.args[0]))
exit(11)
nodes = self.config._getallnodes(nodelist)
if len(nodes) == 0:
print("{} don't match any node".format(nodelist))
printer.error("{} don't match any node".format(nodelist))
exit(2)
nodes = self.nodes(self.config.getitems(nodes), config = self.config)
stdout = False
@@ -1027,32 +1042,47 @@ class connapp:
columns = int(p.group(1))
except:
columns = 80
PANEL_WIDTH = columns
if action == "run":
nodes.run(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if stdout:
for line in nodes.output[i].splitlines():
print(" " + line)
header = f"{script['name'].upper()}"
elif action == "test":
nodes.test(**args)
print(script["name"].upper() + "-" * (columns - len(script["name"])))
for i in nodes.status.keys():
print(" " + i + " " + "-" * (columns - len(i) - 13) + (" PASS(0)" if nodes.status[i] == 0 else " FAIL({})".format(nodes.status[i])))
if nodes.status[i] == 0:
max_length = max(len(s) for s in nodes.result[i].keys())
for k,v in nodes.result[i].items():
print(" TEST for '{}'".format(k) + " "*(max_length - len(k) + 1) + "--> " + str(v).upper())
if stdout:
if nodes.status[i] == 0:
print(" " + "-" * (max_length + 21))
for line in nodes.output[i].splitlines():
print(" " + line)
header = f"{script['name'].upper()}"
else:
print("Wrong action '{}'".format(action))
printer.error(f"Wrong action '{action}'")
exit(13)
mdprint(Rule(header, style="white"))
for node in nodes.status:
status_str = "[✓] PASS(0)" if nodes.status[node] == 0 else f"[x] FAIL({nodes.status[node]})"
title_line = f"{node}{status_str}"
test_output = Text()
if action == "test" and nodes.status[node] == 0:
results = nodes.result[node]
test_output.append("TEST RESULTS:\n")
max_key_len = max(len(k) for k in results.keys())
for k, v in results.items():
status = "[✓]" if str(v).upper() == "TRUE" else "[x]"
test_output.append(f" {k.ljust(max_key_len)} {status}\n")
output = nodes.output[node].strip()
code_block = Text()
if stdout and output:
code_block = Text(output + "\n")
if action == "test" and nodes.status[node] == 0:
highlight_words = [k for k, v in nodes.result[node].items() if str(v).upper() == "TRUE"]
code_block.highlight_words(highlight_words, style=Style(color="green", bold=True, underline=True))
panel_content = Group(test_output, Text(""), code_block)
mdprint(Panel(panel_content, title=title_line, width=PANEL_WIDTH, border_style="white"))
def _choose(self, list, name, action):
#Generates an inquirer list to pick
if FzfPrompt and self.fzf:
@@ -1081,16 +1111,16 @@ class connapp:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
return True
def _profile_protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$)"):
def _profile_protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^kubectl$|^docker$|^$)"):
#Validate protocol in inquirer when managing profiles
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet or leave empty")
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, kubectl, docker or leave empty")
return True
def _protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^$|^@.+$)"):
def _protocol_validation(self, answers, current, regex = "(^ssh$|^telnet$|^kubectl$|^docker$|^$|^@.+$)"):
#Validate protocol in inquirer when managing nodes
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, leave empty or @profile")
raise inquirer.errors.ValidationError("", reason="Pick between ssh, telnet, kubectl, docker leave empty or @profile")
if current.startswith("@"):
if current[1:] not in self.profiles:
raise inquirer.errors.ValidationError("", reason="Profile {} don't exist".format(current))
@@ -1111,7 +1141,7 @@ class connapp:
def _port_validation(self, answers, current, regex = "(^[0-9]*$|^@.+$)"):
#Validate port in inquirer when managing nodes
if not re.match(regex, current):
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-65535, @profile or leave empty")
raise inquirer.errors.ValidationError("", reason="Pick a port between 1-6553/app5, @profile or leave empty")
try:
port = int(current)
except:
@@ -1217,7 +1247,7 @@ class connapp:
#Inquirer questions when editing nodes or profiles
questions = []
questions.append(inquirer.Confirm("host", message="Edit Hostname/IP?"))
questions.append(inquirer.Confirm("protocol", message="Edit Protocol?"))
questions.append(inquirer.Confirm("protocol", message="Edit Protocol/app?"))
questions.append(inquirer.Confirm("port", message="Edit Port?"))
questions.append(inquirer.Confirm("options", message="Edit Options?"))
questions.append(inquirer.Confirm("logs", message="Edit logging path/file?"))
@@ -1247,7 +1277,7 @@ class connapp:
else:
node["host"] = defaults["host"]
if edit["protocol"]:
questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation, default=defaults["protocol"]))
questions.append(inquirer.Text("protocol", message="Select Protocol/app", validate=self._protocol_validation, default=defaults["protocol"]))
else:
node["protocol"] = defaults["protocol"]
if edit["port"]:
@@ -1255,7 +1285,7 @@ class connapp:
else:
node["port"] = defaults["port"]
if edit["options"]:
questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation, default=defaults["options"]))
questions.append(inquirer.Text("options", message="Pass extra options to protocol/app", validate=self._default_validation, default=defaults["options"]))
else:
node["options"] = defaults["options"]
if edit["logs"]:
@@ -1321,7 +1351,7 @@ class connapp:
else:
profile["host"] = defaults["host"]
if edit["protocol"]:
questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._profile_protocol_validation, default=defaults["protocol"]))
questions.append(inquirer.Text("protocol", message="Select Protocol/app", validate=self._profile_protocol_validation, default=defaults["protocol"]))
else:
profile["protocol"] = defaults["protocol"]
if edit["port"]:
@@ -1329,7 +1359,7 @@ class connapp:
else:
profile["port"] = defaults["port"]
if edit["options"]:
questions.append(inquirer.Text("options", message="Pass extra options to protocol", default=defaults["options"]))
questions.append(inquirer.Text("options", message="Pass extra options to protocol/app", default=defaults["options"]))
else:
profile["options"] = defaults["options"]
if edit["logs"]:
@@ -1364,15 +1394,15 @@ class connapp:
result["id"] = unique
return result
def _questions_bulk(self):
def _questions_bulk(self, nodes="", hosts=""):
#Questions when using bulk command
questions = []
questions.append(inquirer.Text("ids", message="add a comma separated list of nodes to add", validate=self._bulk_node_validation))
questions.append(inquirer.Text("ids", message="add a comma separated list of nodes to add", default=nodes, validate=self._bulk_node_validation))
questions.append(inquirer.Text("location", message="Add a @folder, @subfolder@folder or leave empty", validate=self._bulk_folder_validation))
questions.append(inquirer.Text("host", message="Add comma separated list of Hostnames or IPs", validate=self._bulk_host_validation))
questions.append(inquirer.Text("protocol", message="Select Protocol", validate=self._protocol_validation))
questions.append(inquirer.Text("host", message="Add comma separated list of Hostnames or IPs", default=hosts, validate=self._bulk_host_validation))
questions.append(inquirer.Text("protocol", message="Select Protocol/app", validate=self._protocol_validation))
questions.append(inquirer.Text("port", message="Select Port Number", validate=self._port_validation))
questions.append(inquirer.Text("options", message="Pass extra options to protocol", validate=self._default_validation))
questions.append(inquirer.Text("options", message="Pass extra options to protocol/app", validate=self._default_validation))
questions.append(inquirer.Text("logs", message="Pick logging path/file ", validate=self._default_validation))
questions.append(inquirer.Text("tags", message="Add tags dictionary", validate=self._tags_validation))
questions.append(inquirer.Text("jumphost", message="Add Jumphost node", validate=self._jumphost_validation))
@@ -1419,7 +1449,7 @@ class connapp:
if subparser.description != None:
commands.append(subcommand)
commands = ",".join(commands)
usage_help = f"conn [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n conn {{{commands}}} ..."
usage_help = f"connpy [-h] [--add | --del | --mod | --show | --debug] [node|folder] [--sftp]\n connpy {{{commands}}} ..."
return usage_help
if type == "end":
help_dict = {}
@@ -1552,3 +1582,44 @@ tasks:
output: null
...'''
def _print_instructions(self):
instructions = """
Welcome to Connpy node Addition Wizard!
Here are some important instructions and tips for configuring your new node:
1. **Profiles**:
- You can use the configured settings in a profile using `@profilename`.
2. **Available Protocols and Apps**:
- ssh
- telnet
- kubectl (`kubectl exec`)
- docker (`docker exec`)
3. **Optional Values**:
- You can leave any value empty except for the hostname/IP.
4. **Passwords**:
- You can pass one or more passwords using comma-separated `@profiles`.
5. **Logging**:
- You can use the following variables in the logging file name:
- `${id}`
- `${unique}`
- `${host}`
- `${port}`
- `${user}`
- `${protocol}`
6. **Well-Known Tags**:
- `os`: Identified by AI to generate commands based on the operating system.
- `screen_length_command`: Used by automation to avoid pagination on different devices (e.g., `terminal length 0` for Cisco devices).
- `prompt`: Replaces default app prompt to identify the end of output or where the user can start inputting commands.
- `kube_command`: Replaces the default command (`/bin/bash`) for `kubectl exec`.
- `docker_command`: Replaces the default command for `docker exec`.
Please follow these instructions carefully to ensure proper configuration of your new node.
"""
mdprint(Markdown(instructions))

View File

@@ -13,6 +13,7 @@ import threading
from pathlib import Path
from copy import deepcopy
from .hooks import ClassHook, MethodHook
from . import printer
import io
#functions and classes
@@ -28,7 +29,7 @@ class node:
- result(bool): True if expected value is found after running
the commands using test method.
- status (int): 0 if the method run or test run succesfully.
- status (int): 0 if the method run or test run successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.
@@ -57,7 +58,7 @@ class node:
- port (str): Port to connect to node, default 22 for ssh and 23
for telnet.
- protocol (str): Select ssh or telnet. Default is ssh.
- protocol (str): Select ssh, telnet, kubectl or docker. Default is ssh.
- user (str): Username to of the node.
@@ -254,7 +255,7 @@ class node:
if connect == True:
size = re.search('columns=([0-9]+).*lines=([0-9]+)',str(os.get_terminal_size()))
self.child.setwinsize(int(size.group(2)),int(size.group(1)))
print("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
printer.success("Connected to " + self.unique + " at " + self.host + (":" if self.port != '' else '') + self.port + " via: " + self.protocol)
if 'logfile' in dir(self):
# Initialize self.mylog
if not 'mylog' in dir(self):
@@ -279,7 +280,7 @@ class node:
f.write(self._logclean(self.mylog.getvalue().decode(), True))
else:
print(connect)
printer.error(connect)
exit(1)
@MethodHook
@@ -326,6 +327,14 @@ class node:
connect = self._connect(timeout = timeout)
now = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
if connect == True:
# Attempt to set the terminal size
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
@@ -413,6 +422,14 @@ class node:
'''
connect = self._connect(timeout = timeout)
if connect == True:
# Attempt to set the terminal size
try:
self.child.setwinsize(65535, 65535)
except Exception:
try:
self.child.setwinsize(10000, 10000)
except Exception:
pass
if "prompt" in self.tags:
prompt = self.tags["prompt"]
expects = [prompt, pexpect.EOF, pexpect.TIMEOUT]
@@ -468,104 +485,169 @@ class node:
return connect
@MethodHook
def _connect(self, debug = False, timeout = 10, max_attempts = 3):
# Method to connect to the node, it parse all the information, create the ssh/telnet command and login to the node.
if self.protocol in ["ssh", "sftp"]:
def _generate_ssh_sftp_cmd(self):
cmd = self.protocol
if self.idletime > 0:
cmd = cmd + " -o ServerAliveInterval=" + str(self.idletime)
if self.port != '':
cmd += " -o ServerAliveInterval=" + str(self.idletime)
if self.port:
if self.protocol == "ssh":
cmd = cmd + " -p " + self.port
cmd += " -p " + self.port
elif self.protocol == "sftp":
cmd = cmd + " -P " + self.port
if self.options != '':
cmd = cmd + " " + self.options
if self.logs != '':
self.logfile = self._logfile()
if self.jumphost != '':
cmd = cmd + " " + self.jumphost
if self.password[0] != '':
passwords = self._passtx(self.password)
else:
passwords = []
if self.user == '':
cmd = cmd + " {}".format(self.host)
else:
cmd = cmd + " {}".format("@".join([self.user,self.host]))
expects = ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: (ssh|sftp)', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', '[p|P]assword:|[u|U]sername:', r'>$|#$|\$$|>.$|#.$|\$.$', 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"]
cmd += " -P " + self.port
if self.options:
cmd += " " + self.options
if self.jumphost:
cmd += " " + self.jumphost
user_host = f"{self.user}@{self.host}" if self.user else self.host
cmd += f" {user_host}"
return cmd
@MethodHook
def _generate_telnet_cmd(self):
cmd = f"telnet {self.host}"
if self.port:
cmd += f" {self.port}"
if self.options:
cmd += f" {self.options}"
return cmd
@MethodHook
def _generate_kube_cmd(self):
cmd = f"kubectl exec {self.options} {self.host} -it --"
kube_command = self.tags.get("kube_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash"
cmd += f" {kube_command}"
return cmd
@MethodHook
def _generate_docker_cmd(self):
cmd = f"docker {self.options} exec -it {self.host}"
docker_command = self.tags.get("docker_command", "/bin/bash") if isinstance(self.tags, dict) else "/bin/bash"
cmd += f" {docker_command}"
return cmd
@MethodHook
def _get_cmd(self):
if self.protocol in ["ssh", "sftp"]:
return self._generate_ssh_sftp_cmd()
elif self.protocol == "telnet":
cmd = "telnet " + self.host
if self.port != '':
cmd = cmd + " " + self.port
if self.options != '':
cmd = cmd + " " + self.options
return self._generate_telnet_cmd()
elif self.protocol == "kubectl":
return self._generate_kube_cmd()
elif self.protocol == "docker":
return self._generate_docker_cmd()
else:
raise ValueError(f"Invalid protocol: {self.protocol}")
@MethodHook
def _connect(self, debug=False, timeout=10, max_attempts=3):
cmd = self._get_cmd()
passwords = self._passtx(self.password) if self.password[0] else []
if self.logs != '':
self.logfile = self._logfile()
if self.password[0] != '':
passwords = self._passtx(self.password)
else:
passwords = []
expects = ['[u|U]sername:', 'refused', 'supported', 'invalid option', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', '[p|P]assword:', r'>$|#$|\$$|>.$|#.$|\$.$', 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"]
else:
raise ValueError("Invalid protocol: " + self.protocol)
default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$'
prompt = self.tags.get("prompt", default_prompt) if isinstance(self.tags, dict) else default_prompt
password_prompt = '[p|P]assword:|[u|U]sername:' if self.protocol != 'telnet' else '[p|P]assword:'
expects = {
"ssh": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: ssh', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
"sftp": ['yes/no', 'refused', 'supported', 'Invalid|[u|U]sage: sftp', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
"telnet": ['[u|U]sername:', 'refused', 'supported', 'invalid|unrecognized option', 'ssh-keygen.*\"', 'timeout|timed.out', 'unavailable', 'closed', password_prompt, prompt, 'suspend', pexpect.EOF, pexpect.TIMEOUT, "No route to host", "resolve hostname", "no matching", "[b|B]ad (owner|permissions)"],
"kubectl": ['[u|U]sername:', '[r|R]efused', '[E|e]rror', 'DEPRECATED', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF, "expired|invalid"],
"docker": ['[u|U]sername:', 'Cannot', '[E|e]rror', 'failed', 'not a docker command', 'unknown', 'unable to resolve', pexpect.TIMEOUT, password_prompt, prompt, pexpect.EOF]
}
error_indices = {
"ssh": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
"sftp": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
"telnet": [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16],
"kubectl": [1, 2, 3, 4, 8], # Define error indices for kube
"docker": [1, 2, 3, 4, 5, 6, 7] # Define error indices for docker
}
eof_indices = {
"ssh": [8, 9, 10, 11],
"sftp": [8, 9, 10, 11],
"telnet": [8, 9, 10, 11],
"kubectl": [5, 6, 7], # Define eof indices for kube
"docker": [8, 9, 10] # Define eof indices for docker
}
initial_indices = {
"ssh": [0],
"sftp": [0],
"telnet": [0],
"kubectl": [0], # Define special indices for kube
"docker": [0] # Define special indices for docker
}
attempts = 1
while attempts <= max_attempts:
child = pexpect.spawn(cmd)
if isinstance(self.tags, dict) and self.tags.get("console"):
child.sendline()
if debug:
print(cmd)
printer.debug(f"Command:\n{cmd}")
self.mylog = io.BytesIO()
child.logfile_read = self.mylog
if len(passwords) > 0:
loops = len(passwords)
else:
loops = 1
endloop = False
for i in range(0, loops):
for i in range(len(passwords) if passwords else 1):
while True:
results = child.expect(expects, timeout=timeout)
if results == 0:
results = child.expect(expects[self.protocol], timeout=timeout)
results_value = expects[self.protocol][results]
if results in initial_indices[self.protocol]:
if self.protocol in ["ssh", "sftp"]:
child.sendline('yes')
elif self.protocol == "telnet":
if self.user != '':
elif self.protocol in ["telnet", "kubectl", "docker"]:
if self.user:
child.sendline(self.user)
else:
self.missingtext = True
break
if results in [1, 2, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16]:
elif results in error_indices[self.protocol]:
child.terminate()
if results == 12 and attempts != max_attempts:
if results_value == pexpect.TIMEOUT and attempts != max_attempts:
attempts += 1
endloop = True
break
else:
if results == 12:
after = "Connection timeout"
else:
after = child.after.decode()
return ("Connection failed code:" + str(results) + "\n" + child.before.decode().lstrip() + after + child.readline().decode()).rstrip()
if results == 8:
if len(passwords) > 0:
after = "Connection timeout" if results_value == pexpect.TIMEOUT else child.after.decode()
return f"Connection failed code: {results}\n{child.before.decode().lstrip()}{after}{child.readline().decode()}".rstrip()
elif results in eof_indices[self.protocol]:
if results_value == password_prompt:
if passwords:
child.sendline(passwords[i])
else:
self.missingtext = True
break
if results in [9, 11]:
elif results_value == "suspend":
child.sendline("\r")
sleep(2)
else:
endloop = True
child.sendline()
break
if results == 10:
child.sendline("\r")
sleep(2)
if endloop:
break
if results == 12:
if results_value == pexpect.TIMEOUT:
continue
else:
break
if isinstance(self.tags, dict) and self.tags.get("post_connect_commands"):
cmds = self.tags.get("post_connect_commands")
commands = [cmds] if isinstance(cmds, str) else cmds
for command in commands:
child.sendline(command)
sleep(1)
child.readline(0)
self.child = child
from pexpect import fdpexpect
self.raw_child = fdpexpect.fdspawn(self.child.child_fd)
return True
@ClassHook
@@ -587,7 +669,7 @@ class nodes:
Created after running method test.
- status (dict): Dictionary formed by nodes unique as keys, value:
0 if method run or test ended succesfully.
0 if method run or test ended successfully.
1 if connection failed.
2 if expect timeouts without prompt or EOF.

View File

@@ -0,0 +1,399 @@
import argparse
import sys
import subprocess
import random
import socket
import time
import threading
from pexpect import TIMEOUT
from connpy import printer
class RemoteCapture:
def __init__(self, connapp, node_name, interface, namespace=None, use_wireshark=False, tcpdump_filter=None, tcpdump_args=None):
self.connapp = connapp
self.node_name = node_name
self.interface = interface
self.namespace = namespace
self.use_wireshark = use_wireshark
self.tcpdump_filter = tcpdump_filter or []
self.tcpdump_args = tcpdump_args if isinstance(tcpdump_args, list) else []
if node_name.startswith("@"): # fuzzy match
matches = [k for k in connapp.nodes_list if node_name in k]
else:
matches = [k for k in connapp.nodes_list if k.startswith(node_name)]
if not matches:
printer.error(f"Node '{node_name}' not found.")
sys.exit(2)
elif len(matches) > 1:
matches[0] = connapp._choose(matches, "node", "capture")
if matches[0] is None:
sys.exit(7)
node_data = connapp.config.getitem(matches[0])
self.node = connapp.node(matches[0], **node_data, config=connapp.config)
if self.node.protocol != "ssh":
printer.error(f"Node '{self.node.unique}' must be an SSH connection.")
sys.exit(2)
self.wireshark_path = connapp.config.config.get("wireshark_path")
def _start_local_listener(self, port, ws_proc=None):
self.fake_connection = False
self.listener_active = True
self.listener_conn = None
self.listener_connected = threading.Event()
def listen():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("localhost", port))
s.listen(1)
printer.start(f"Listening on localhost:{port}")
conn, addr = s.accept()
self.listener_conn = conn
if not self.fake_connection:
printer.start(f"Connection from {addr}")
self.listener_connected.set()
try:
while self.listener_active:
data = conn.recv(4096)
if not data:
break
if self.use_wireshark and ws_proc:
try:
ws_proc.stdin.write(data)
ws_proc.stdin.flush()
except BrokenPipeError:
printer.info("Wireshark closed the pipe.")
break
else:
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
except Exception as e:
if isinstance(e, BrokenPipeError):
printer.info("Listener closed due to broken pipe.")
else:
printer.error(f"Listener error: {e}")
finally:
conn.close()
self.listener_conn = None
self.listener_thread = threading.Thread(target=listen)
self.listener_thread.daemon = True
self.listener_thread.start()
def _is_port_in_use(self, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) == 0
def _find_free_port(self, start=20000, end=30000):
for _ in range(10):
port = random.randint(start, end)
if not self._is_port_in_use(port):
return port
raise RuntimeError("No free port found for SSH tunnel.")
def _monitor_wireshark(self, ws_proc):
try:
while True:
try:
ws_proc.wait(timeout=1)
self.listener_active = False
if self.listener_conn:
printer.info("Wireshark exited, stopping listener.")
try:
self.listener_conn.shutdown(socket.SHUT_RDWR)
self.listener_conn.close()
except Exception:
pass
break
except subprocess.TimeoutExpired:
if not self.listener_active:
break
time.sleep(0.2)
except Exception as e:
printer.warning(f"Error in monitor_wireshark: {e}")
def _detect_sudo_requirement(self):
base_cmd = f"tcpdump -i {self.interface} -w - -U -c 1"
if self.namespace:
base_cmd = f"ip netns exec {self.namespace} {base_cmd}"
cmds = [base_cmd, f"sudo {base_cmd}"]
printer.info(f"Verifying sudo requirement")
for cmd in cmds:
try:
self.node.child.sendline(cmd)
start_time = time.time()
while time.time() - start_time < 3:
try:
index = self.node.child.expect([
r'listening on',
r'permission denied',
r'cannot',
r'No such file or directory',
], timeout=1)
if index == 0:
self.node.child.send("\x03")
return "sudo" in cmd
else:
break
except Exception:
continue
self.node.child.send("\x03")
time.sleep(0.5)
try:
self.node.child.read_nonblocking(size=1024, timeout=0.5)
except Exception:
pass
except Exception as e:
printer.warning(f"Error during sudo detection: {e}")
continue
printer.error(f"Failed to run tcpdump on remote node '{self.node.unique}'")
sys.exit(4)
def _monitor_capture_output(self):
try:
index = self.node.child.expect([
r'Broken pipe',
r'packet[s]? captured'
], timeout=None)
if index == 0:
printer.error("Tcpdump failed: Broken pipe.")
else:
printer.success("Tcpdump finished capturing packets.")
self.listener_active = False
except:
pass
def _sendline_until_connected(self, cmd, retries=5, interval=2):
for attempt in range(1, retries + 1):
printer.info(f"Attempt {attempt}/{retries} to connect listener...")
self.node.child.sendline(cmd)
try:
index = self.node.child.expect([
r'listening on',
TIMEOUT,
r'permission',
r'not permitted',
r'invalid',
r'unrecognized',
r'Unable',
r'No such',
r'illegal',
r'not found',
r'non-ether',
r'syntax error'
], timeout=5)
if index == 0:
self.monitor_end = threading.Thread(target=self._monitor_capture_output)
self.monitor_end.daemon = True
self.monitor_end.start()
if self.listener_connected.wait(timeout=interval):
printer.success("Listener successfully received a connection.")
return True
else:
printer.warning("No connection yet. Retrying...")
elif index == 1:
error = f"tcpdump did not respond within the expected time.\n" \
f"Command used:\n{cmd}\n" \
f"→ Please verify the command syntax."
return f"{error}"
else:
before_last_line = self.node.child.before.decode().splitlines()[-1]
error = f"Tcpdump error detected: " \
f"{before_last_line}{self.node.child.after.decode()}{self.node.child.readline().decode()}".rstrip()
return f"{error}"
except Exception as e:
printer.warning(f"Unexpected error during tcpdump startup: {e}")
return False
return False
def _build_tcpdump_command(self):
base = f"tcpdump -i {self.interface}"
if self.use_wireshark:
base += " -w - -U"
else:
base += " -l"
if self.namespace:
base = f"ip netns exec {self.namespace} {base}"
if self.requires_sudo:
base = f"sudo {base}"
if self.tcpdump_args:
base += " " + " ".join(self.tcpdump_args)
if self.tcpdump_filter:
base += " " + " ".join(self.tcpdump_filter)
base += f" | nc localhost {self.local_port}"
return base
def run(self):
if self.use_wireshark:
if not self.wireshark_path:
printer.error("Wireshark path not set in config.\nUse '--set-wireshark-path /full/path/to/wireshark' to configure it.")
sys.exit(1)
self.local_port = self._find_free_port()
self.node.options += f" -o ExitOnForwardFailure=yes -R {self.local_port}:localhost:{self.local_port}"
connection = self.node._connect()
if connection is not True:
printer.error(f"Could not connect to {self.node.unique}\n{connection}")
sys.exit(1)
self.requires_sudo = self._detect_sudo_requirement()
tcpdump_cmd = self._build_tcpdump_command()
ws_proc = None
monitor_thread = None
if self.use_wireshark:
printer.info(f"Live capture from {self.node.unique}:{self.interface}, launching Wireshark...")
try:
ws_proc = subprocess.Popen(
[self.wireshark_path, "-k", "-i", "-"],
stdin=subprocess.PIPE,
stderr=subprocess.PIPE
)
except Exception as e:
printer.error(f"Failed to launch Wireshark: {e}\nMake sure the path is correct and Wireshark is installed.")
exit(1)
monitor_thread = threading.Thread(target=self._monitor_wireshark, args=(ws_proc,))
monitor_thread.daemon = True
monitor_thread.start()
else:
printer.info(f"Live text capture from {self.node.unique}:{self.interface}")
printer.info("Press Ctrl+C to stop.\n")
try:
self._start_local_listener(self.local_port, ws_proc=ws_proc)
time.sleep(1) # small delay before retry attempts
result = self._sendline_until_connected(tcpdump_cmd, retries=5, interval=2)
if result is not True:
if isinstance(result, str):
printer.error(f"{result}")
else:
printer.error("Listener connection failed after all retries.")
printer.debug(f"Command used:\n{tcpdump_cmd}")
if not self.listener_conn:
try:
self.fake_connection = True
socket.create_connection(("localhost", self.local_port), timeout=1).close()
except:
pass
self.listener_active = False
return
while self.listener_active:
time.sleep(0.5)
except KeyboardInterrupt:
print("")
printer.warning("Capture interrupted by user.")
self.listener_active = False
finally:
if self.listener_conn:
try:
self.listener_conn.shutdown(socket.SHUT_RDWR)
self.listener_conn.close()
except:
pass
if hasattr(self.node, "child"):
self.node.child.close(force=True)
if self.listener_thread.is_alive():
self.listener_thread.join()
if monitor_thread and monitor_thread.is_alive():
monitor_thread.join()
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Capture packets remotely using a saved SSH node", epilog="All unknown arguments will be passed to tcpdump.")
self.parser.add_argument("node", nargs='?', help="Name of the saved node (must use SSH)")
self.parser.add_argument("interface", nargs='?', help="Network interface to capture on")
self.parser.add_argument("--ns", "--namespace", dest="namespace", help="Optional network namespace")
self.parser.add_argument("-w","--wireshark", action="store_true", help="Open live capture in Wireshark")
self.parser.add_argument("--set-wireshark-path", metavar="PATH", help="Set the default path to Wireshark binary")
self.parser.add_argument(
"-f", "--filter",
dest="tcpdump_filter",
metavar="ARG",
nargs="*",
default=["not", "port", "22"],
help="tcpdump filter expression (e.g., -f port 443 and udp). Default: not port 22"
)
self.parser.add_argument(
"--unknown-args",
action="store_true",
default=True,
help=argparse.SUPPRESS
)
class Entrypoint:
def __init__(self, args, parser, connapp):
if "--" in args.unknown_args:
args.unknown_args.remove("--")
if args.set_wireshark_path:
connapp._change_settings("wireshark_path", args.set_wireshark_path)
return
if not args.node or not args.interface:
parser.error("node and interface are required unless --set-wireshark-path is used")
capture = RemoteCapture(
connapp=connapp,
node_name=args.node,
interface=args.interface,
namespace=args.namespace,
use_wireshark=args.wireshark,
tcpdump_filter=args.tcpdump_filter,
tcpdump_args=args.unknown_args
)
capture.run()
def _connpy_completion(wordsnumber, words, info = None):
if wordsnumber == 3:
result = ["--help", "--set-wireshark-path"]
result.extend(info["nodes"])
elif wordsnumber == 5 and words[1] in info["nodes"]:
result = ['--wireshark', '--namespace', '--filter', '--help']
elif wordsnumber == 6 and words[3] in ["-w", "--wireshark"]:
result = ['--namespace', '--filter', '--help']
elif wordsnumber == 7 and words[3] in ["-n", "--namespace"]:
result = ['--wireshark', '--filter', '--help']
elif wordsnumber == 8:
if any(w in words for w in ["-w", "--wireshark"]) and any(w in words for w in ["-n", "--namespace"]):
result = ['--filter', '--help']
else:
result = []
return result

View File

@@ -0,0 +1,181 @@
import argparse
import yaml
import re
from connpy import printer
class context_manager:
def __init__(self, connapp):
self.connapp = connapp
self.config = connapp.config
self.contexts = self.config.config["contexts"]
self.current_context = self.config.config["current_context"]
self.regex = [re.compile(regex) for regex in self.contexts[self.current_context]]
def add_context(self, context, regex):
if not context.isalnum():
printer.error("Context name has to be alphanumeric.")
exit(1)
elif context in self.contexts:
printer.error(f"Context {context} already exists.")
exit(2)
else:
self.contexts[context] = regex
self.connapp._change_settings("contexts", self.contexts)
def modify_context(self, context, regex):
if context == "all":
printer.error("Can't modify default context: all")
exit(3)
elif context not in self.contexts:
printer.error(f"Context {context} doesn't exist.")
exit(4)
else:
self.contexts[context] = regex
self.connapp._change_settings("contexts", self.contexts)
def delete_context(self, context):
if context == "all":
printer.error("Can't delete default context: all")
exit(3)
elif context not in self.contexts:
printer.error(f"Context {context} doesn't exist.")
exit(4)
if context == self.current_context:
printer.error(f"Can't delete current context: {self.current_context}")
exit(5)
else:
self.contexts.pop(context)
self.connapp._change_settings("contexts", self.contexts)
def list_contexts(self):
for key in self.contexts.keys():
if key == self.current_context:
printer.success(f"{key} (active)")
else:
printer.custom(" ",key)
def set_context(self, context):
if context not in self.contexts:
printer.error(f"Context {context} doesn't exist.")
exit(4)
elif context == self.current_context:
printer.info(f"Context {context} already set")
exit(0)
else:
self.connapp._change_settings("current_context", context)
def show_context(self, context):
if context not in self.contexts:
printer.error(f"Context {context} doesn't exist.")
exit(4)
else:
yaml_output = yaml.dump(self.contexts[context], sort_keys=False, default_flow_style=False)
printer.custom(context,"")
print(yaml_output)
@staticmethod
def add_default_context(config):
config_modified = False
if "contexts" not in config.config:
config.config["contexts"] = {}
config.config["contexts"]["all"] = [".*"]
config_modified = True
if "current_context" not in config.config:
config.config["current_context"] = "all"
config_modified = True
if config_modified:
config._saveconfig(config.file)
def match_any_regex(self, node, regex_list):
return any(regex.match(node) for regex in regex_list)
def modify_node_list(self, *args, **kwargs):
filtered_nodes = [node for node in kwargs["result"] if self.match_any_regex(node, self.regex)]
return filtered_nodes
def modify_node_dict(self, *args, **kwargs):
filtered_nodes = {key: value for key, value in kwargs["result"].items() if self.match_any_regex(key, self.regex)}
return filtered_nodes
class Preload:
def __init__(self, connapp):
#define contexts if doesn't exist
connapp.config.modify(context_manager.add_default_context)
#filter nodes using context
cm = context_manager(connapp)
connapp.nodes_list = [node for node in connapp.nodes_list if cm.match_any_regex(node, cm.regex)]
connapp.folders = [node for node in connapp.folders if cm.match_any_regex(node, cm.regex)]
connapp.config._getallnodes.register_post_hook(cm.modify_node_list)
connapp.config._getallfolders.register_post_hook(cm.modify_node_list)
connapp.config._getallnodesfull.register_post_hook(cm.modify_node_dict)
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Manage contexts with regex matching", formatter_class=argparse.RawTextHelpFormatter)
# Define the context name as a positional argument
self.parser.add_argument("context_name", help="Name of the context", nargs='?')
group = self.parser.add_mutually_exclusive_group(required=True)
group.add_argument("-a", "--add", nargs='+', help='Add a new context with regex values.\nUsage: context -a name "regex1" "regex2"')
group.add_argument("-r", "--rm", "--del", action='store_true', help="Delete a context.\nUsage: context -d name")
group.add_argument("--ls", action='store_true', help="List all contexts.\nUsage: context --ls")
group.add_argument("--set", action='store_true', help="Set the used context.\nUsage: context --set name")
group.add_argument("-s", "--show", action='store_true', help="Show the defined regex of a context.\nUsage: context --show name")
group.add_argument("-e", "--edit", "--mod", nargs='+', help='Modify an existing context.\nUsage: context --mod name "regex1" "regex2"')
class Entrypoint:
def __init__(self, args, parser, connapp):
if args.add and len(args.add) < 2:
parser.error("--add requires at least 2 arguments: name and at least one regex")
if args.edit and len(args.edit) < 2:
parser.error("--edit requires at least 2 arguments: name and at least one regex")
if args.ls and args.context_name is not None:
parser.error("--ls does not require a context name")
if args.rm and not args.context_name:
parser.error("--rm require a context name")
if args.set and not args.context_name:
parser.error("--set require a context name")
if args.show and not args.context_name:
parser.error("--show require a context name")
cm = context_manager(connapp)
if args.add:
cm.add_context(args.add[0], args.add[1:])
elif args.rm:
cm.delete_context(args.context_name)
elif args.ls:
cm.list_contexts()
elif args.edit:
cm.modify_context(args.edit[0], args.edit[1:])
elif args.set:
cm.set_context(args.context_name)
elif args.show:
cm.show_context(args.context_name)
def _connpy_completion(wordsnumber, words, info=None):
if wordsnumber == 3:
result = ["--help", "--add", "--del", "--rm", "--ls", "--set", "--show", "--edit", "--mod"]
elif wordsnumber == 4 and words[1] in ["--del", "-r", "--rm", "--set", "--edit", "--mod", "-e", "--show", "-s"]:
contexts = info["config"]["config"]["contexts"].keys()
current_context = info["config"]["config"]["current_context"]
default_context = "all"
if words[1] in ["--del", "-r", "--rm"]:
# Filter out default context and current context
result = [context for context in contexts if context not in [default_context, current_context]]
elif words[1] == "--set":
# Filter out current context
result = [context for context in contexts if context != current_context]
elif words[1] in ["--edit", "--mod", "-e"]:
# Filter out default context
result = [context for context in contexts if context != default_context]
elif words[1] in ["--show", "-s"]:
# No filter for show
result = list(contexts)
return result

View File

@@ -7,6 +7,7 @@ import tempfile
import io
import yaml
import threading
from connpy import printer
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
@@ -50,33 +51,33 @@ class sync:
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
print("Logged in successfully.")
printer.success("Logged in successfully.")
except RefreshError as e:
# If refresh fails, delete the invalid token file and start a new login flow
if os.path.exists(self.token_file):
os.remove(self.token_file)
print("Existing token was invalid and has been removed. Please log in again.")
printer.warning("Existing token was invalid and has been removed. Please log in again.")
flow = InstalledAppFlow.from_client_secrets_file(
self.google_client, self.scopes)
creds = flow.run_local_server(port=0, access_type='offline')
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
print("Logged in successfully after re-authentication.")
printer.success("Logged in successfully after re-authentication.")
def logout(self):
if os.path.exists(self.token_file):
os.remove(self.token_file)
print("Logged out successfully.")
printer.success("Logged out successfully.")
else:
print("No credentials file found. Already logged out.")
printer.info("No credentials file found. Already logged out.")
def get_credentials(self):
# Load credentials from token.json
if os.path.exists(self.token_file):
creds = Credentials.from_authorized_user_file(self.token_file, self.scopes)
else:
print("Credentials file not found.")
printer.error("Credentials file not found.")
return 0
# If there are no valid credentials available, ask the user to log in again
@@ -85,10 +86,10 @@ class sync:
try:
creds.refresh(Request())
except RefreshError:
print("Could not refresh access token. Please log in again.")
printer.warning("Could not refresh access token. Please log in again.")
return 0
else:
print("Credentials are missing or invalid. Please log in.")
printer.warning("Credentials are missing or invalid. Please log in.")
return 0
return creds
@@ -114,8 +115,8 @@ class sync:
return False
def status(self):
print(f"Login: {self.check_login_status()}")
print(f"Sync: {self.sync}")
printer.info(f"Login: {self.check_login_status()}")
printer.info(f"Sync: {self.sync}")
def get_appdata_files(self):
@@ -151,17 +152,18 @@ class sync:
return files_info
except HttpError as error:
print(f"An error occurred: {error}")
printer.error(f"An error occurred: {error}")
return 0
def dump_appdata_files_yaml(self):
files_info = self.get_appdata_files()
if not files_info:
print("Failed to retrieve files or no files found.")
printer.error("Failed to retrieve files or no files found.")
return
# Pretty print as YAML
yaml_output = yaml.dump(files_info, sort_keys=False, default_flow_style=False)
printer.custom("backups","")
print(yaml_output)
@@ -233,16 +235,16 @@ class sync:
oldest_file = min(app_data_files, key=lambda x: x['timestamp'])
delete_old = self.delete_file_by_id(oldest_file['id'])
if delete_old:
print(delete_old)
printer.error(delete_old)
return 1
# Upload the new file
upload_new = self.backup_file_to_drive(zip_path, timestamp)
if upload_new:
print(upload_new)
printer.error(upload_new)
return 1
print("Backup to google uploaded successfully.")
printer.success("Backup to google uploaded successfully.")
return 0
def decompress_zip(self, zip_path):
@@ -253,7 +255,7 @@ class sync:
zipf.extract(".osk", os.path.dirname(self.key))
return 0
except Exception as e:
print(f"An error occurred: {e}")
printer.error(f"An error occurred: {e}")
return 1
def download_file_by_id(self, file_id, destination_path):
@@ -282,14 +284,14 @@ class sync:
# Get the files in the app data folder
app_data_files = self.get_appdata_files()
if not app_data_files:
print("No files found in app data folder.")
printer.error("No files found in app data folder.")
return 1
# Check if a specific file_id was provided and if it exists in the list
if file_id:
selected_file = next((f for f in app_data_files if f['id'] == file_id), None)
if not selected_file:
print(f"No file found with ID: {file_id}")
printer.error(f"No file found with ID: {file_id}")
return 1
else:
# Find the latest file based on timestamp
@@ -302,10 +304,10 @@ class sync:
# Unzip the downloaded file to the destination folder
if self.decompress_zip(temp_download_path):
print("Failed to decompress the file.")
printer.error("Failed to decompress the file.")
return 1
print(f"Backup from Google Drive restored successfully: {selected_file['name']}")
printer.success(f"Backup from Google Drive restored successfully: {selected_file['name']}")
return 0
def config_listener_post(self, args, kwargs):
@@ -314,7 +316,7 @@ class sync:
if not kwargs["result"]:
self.compress_and_upload()
else:
print("Sync cannot be performed. Please check your login status.")
printer.warning("Sync cannot be performed. Please check your login status.")
return kwargs["result"]
def config_listener_pre(self, *args, **kwargs):
@@ -337,7 +339,6 @@ class Preload:
class Parser:
def __init__(self):
self.parser = argparse.ArgumentParser(description="Sync config with Google")
self.description = "Sync config with Google"
subparsers = self.parser.add_subparsers(title="Commands", dest='command',metavar="")
login_parser = subparsers.add_parser("login", help="Login to Google to enable synchronization")
logout_parser = subparsers.add_parser("logout", help="Logout from Google")

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
#Imports
from functools import wraps, partial, update_wrapper
from . import printer
#functions and classes
@@ -19,7 +20,7 @@ class MethodHook:
try:
args, kwargs = hook(*args, **kwargs)
except Exception as e:
print(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
printer.error(f"{self.func.__name__} Pre-hook {hook.__name__} raised an exception: {e}")
try:
result = self.func(*args, **kwargs)
@@ -30,7 +31,7 @@ class MethodHook:
try:
result = hook(*args, **kwargs, result=result) # Pass result to hooks
except Exception as e:
print(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
printer.error(f"{self.func.__name__} Post-hook {hook.__name__} raised an exception: {e}")
return result

View File

@@ -4,6 +4,7 @@ import importlib.util
import sys
import argparse
import os
from connpy import printer
class Plugins:
def __init__(self):
@@ -30,8 +31,7 @@ class Plugins:
### Verifications:
- The presence of only allowed top-level elements.
- The existence of two specific classes: 'Parser' and 'Entrypoint'. and/or specific class: Preload.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'
and 'self.description'.
- 'Parser' class must only have an '__init__' method and must assign 'self.parser'.
- 'Entrypoint' class must have an '__init__' method accepting specific arguments.
If any of these checks fail, the function returns an error message indicating
@@ -77,11 +77,12 @@ class Plugins:
if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
return "Parser class should only have __init__ method"
# Check if 'self.parser' and 'self.description' are assigned in __init__ method
# Check if 'self.parser' is assigned in __init__ method
init_method = node.body[0]
assigned_attrs = [target.attr for expr in init_method.body if isinstance(expr, ast.Assign) for target in expr.targets if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self']
if 'parser' not in assigned_attrs or 'description' not in assigned_attrs:
return "Parser class should set self.parser and self.description" # 'self.parser' or 'self.description' not assigned in __init__
if 'parser' not in assigned_attrs:
return "Parser class should set self.parser"
elif node.name == 'Entrypoint':
has_entrypoint = True
@@ -124,13 +125,14 @@ class Plugins:
filepath = os.path.join(directory, filename)
check_file = self.verify_script(filepath)
if check_file:
print(f"Failed to load plugin: {filename}. Reason: {check_file}")
printer.error(f"Failed to load plugin: {filename}. Reason: {check_file}")
continue
else:
self.plugins[root_filename] = self._import_from_path(filepath)
if hasattr(self.plugins[root_filename], "Parser"):
self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
plugin = self.plugin_parsers[root_filename]
subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, usage=plugin.parser.usage, description=plugin.parser.description, epilog=plugin.parser.epilog, formatter_class=plugin.parser.formatter_class)
if hasattr(self.plugins[root_filename], "Preload"):
self.preloads[root_filename] = self.plugins[root_filename]

33
connpy/printer.py Normal file
View File

@@ -0,0 +1,33 @@
import sys
def _format_multiline(tag, message):
lines = message.splitlines()
if not lines:
return f"[{tag}]"
formatted = [f"[{tag}] {lines[0]}"]
indent = " " * (len(tag) + 3)
for line in lines[1:]:
formatted.append(f"{indent}{line}")
return "\n".join(formatted)
def info(message):
print(_format_multiline("i", message))
def success(message):
print(_format_multiline("", message))
def start(message):
print(_format_multiline("+", message))
def warning(message):
print(_format_multiline("!", message))
def error(message):
print(_format_multiline("", message), file=sys.stderr)
def debug(message):
print(_format_multiline("d", message))
def custom(tag, message):
print(_format_multiline(tag, message))

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +1,11 @@
Flask>=2.3.2
Flask_Cors>=4.0.1
google_api_python_client>=2.125.0
google_auth_oauthlib>=1.2.0
inquirer>=3.2.4
openai>=0.27.8
inquirer>=3.3.0
openai>=1.98.0
pexpect>=4.8.0
protobuf>=5.26.1
protobuf>=5.27.2
pycryptodome>=3.18.0
pyfzf>=0.3.1
PyYAML>=6.0.1

View File

@@ -4,7 +4,7 @@ version = attr: connpy._version.__version__
description = Connpy is a SSH/Telnet connection manager and automation module
long_description = file: README.md
long_description_content_type = text/markdown
keywords = networking, automation, ssh, telnet, connection manager
keywords = networking, automation, docker, kubernetes, ssh, telnet, connection manager
author = Federico Luzzi
author_email = fluzzi@gmail.com
url = https://github.com/fluzzi/connpy
@@ -29,6 +29,7 @@ install_requires =
pexpect
pycryptodome
Flask
Flask_Cors
pyfzf
waitress
PyYAML