openc2lib.actuators.iptables_manager
Iptables Manager
This module provides a dumb actuator that always answers with a fixed message. Use it for testing only.
"""Iptables Manager

Helpers that turn OpenC2 ``IPv4Connection`` / ``IPv4Net`` targets into
``iptables`` command lines for the INPUT chain and execute them.
"""
from openc2lib import ArrayOf,ActionTargets, TargetEnum, Nsid, Version,Results, StatusCode, StatusCodeDescription, Actions, Command, Response, IPv4Net, IPv4Connection #, IPv6Net, IPv6Connection
import shlex
import subprocess


class IptablesManager:
    """Namespace of static helpers that build and run iptables commands."""

    @staticmethod
    def parse_iptables(cmd):
        """Run an iptables command line and map the outcome to a status code.

        The command is tokenised with ``shlex`` and executed directly,
        without a shell, so shell metacharacters inside *cmd* (``;``, ``|``,
        ``$()``) reach the program as plain arguments instead of being
        interpreted.

        Args:
            cmd: A command-line string, a one-element sequence holding such
                a string, or a ready-made argv sequence.

        Returns:
            200 if the process exits with status 0, 500 otherwise (non-zero
            exit, program not found, empty or unparsable command).
        """
        try:
            if isinstance(cmd, str):
                argv = shlex.split(cmd)
            elif len(cmd) == 1:
                # A single-element list carries the whole command line.
                argv = shlex.split(str(cmd[0]))
            else:
                argv = [str(part) for part in cmd]
        except ValueError as e:
            # shlex raises ValueError on unbalanced quotes.
            print(f"Execution error for command: {cmd}")
            print(f"Exception: {str(e)}")
            return 500

        if not argv:
            print(f"Command failed: {cmd}")
            print("Error: empty command")
            return 500

        try:
            # check=False: the exit status is inspected below so that stderr
            # can be reported on failure.
            result = subprocess.run(argv,
                                    check=False,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
        except OSError as e:
            # Raised when the executable is missing or not runnable.
            print(f"Execution error for command: {cmd}")
            print(f"Exception: {str(e)}")
            return 500

        if result.returncode == 0:
            print(f"Command executed successfully: {cmd}")
            print(f"Output: {result.stdout.decode('utf-8', errors='replace')}")
            return 200

        print(f"Command failed: {cmd}")
        print(f"Error: {result.stderr.decode('utf-8', errors='replace')}")
        return 500

    @staticmethod
    def insert_rule(target, iptables_target):
        """Append a rule matching *target* to the INPUT chain and run it.

        Args:
            target: An ``IPv4Connection`` or ``IPv4Net`` instance.
            iptables_target: Value placed after ``-j`` (the rule's jump
                target).

        Returns:
            ``(status, cmd)`` where *status* is the code returned by
            ``parse_iptables`` and *cmd* is the command string that was run.
            For any other target type the bare integer ``501`` is returned.
            NOTE(review): the two return shapes differ; kept as-is because
            callers are not visible from here.
        """

        def quoted(value):
            # Render the value exactly as an f-string would, then escape it
            # so it stays a single argument. Ordinary addresses, ports and
            # target names come out unchanged.
            return shlex.quote(f"{value}")

        print("Starting insert rule", target, iptables_target)
        base_cmd = "iptables -A INPUT"

        print("type of target: ", type(target))
        if isinstance(target, IPv4Connection):  # or isinstance(target.IPv6Connection):
            parts = [base_cmd]
            # presumably protocol may be unset on a connection -- TODO confirm;
            # the guard avoids an AttributeError on None in that case.
            protocol = target.protocol.name if target.protocol is not None else None
            print("protocol: ", protocol)
            if protocol:
                parts.append(f"-p {quoted(protocol)}")
            options = (
                ("-s", target.src_addr),
                ("-d", target.dst_addr),
                ("--sport", target.src_port),
                ("--dport", target.dst_port),
            )
            for flag, value in options:
                if value:
                    parts.append(f"{flag} {quoted(value)}")
        elif isinstance(target, IPv4Net):  # or isinstance(target.IPv6Net):
            ip = target.addr()
            cidr = target.prefix()
            source = f"{ip}/{cidr}" if cidr else f"{ip}"
            parts = [base_cmd, f"-s {quoted(source)}"]
        else:
            return 501

        parts.append(f"-j {quoted(iptables_target)}")
        cmd = " ".join(parts)
        print("cmd: ", cmd)

        result = IptablesManager.parse_iptables(cmd)
        print("result: ", result)
        return result, cmd

    @staticmethod
    def delete_rule(cmd):
        """Execute *cmd* via ``parse_iptables`` and return its status code.

        The command is run exactly as given; it is not rewritten here.
        """
        return IptablesManager.parse_iptables(cmd)

    @staticmethod
    def modify_command_for_deletion(cmd):
        """Rewrite an append/insert iptables command into its ``-D`` form.

        The first ``-A``/``-I`` (or ``--append``/``--insert``) token is
        replaced by ``-D``. A numeric rule position following the chain name
        (as in ``-I INPUT 1``) is dropped so the rule is deleted by
        specification rather than by position. A command that already
        contains ``-D``/``--delete`` before any append/insert flag, or that
        contains no such flag at all, is returned with only its whitespace
        normalised.

        Args:
            cmd: The iptables command line as a single string.

        Returns:
            The rewritten command, tokens joined by single spaces.
        """
        tokens = cmd.split()
        for idx, token in enumerate(tokens):
            if token in ("-D", "--delete"):
                break  # already a deletion; do not touch later options
            if token in ("-A", "-I", "--append", "--insert"):
                tokens[idx] = "-D"
                # Layout is: <flag> <chain> [rulenum] <rule-spec...>
                if idx + 2 < len(tokens) and tokens[idx + 2].isdigit():
                    del tokens[idx + 2]
                break
        return " ".join(tokens)
class
IptablesManager:
class IptablesManager:
    """Namespace of static helpers that build and run iptables commands."""

    @staticmethod
    def parse_iptables(cmd):
        """Run an iptables command line and map the outcome to a status code.

        The command is tokenised with ``shlex`` and executed directly,
        without a shell, so shell metacharacters inside *cmd* (``;``, ``|``,
        ``$()``) reach the program as plain arguments instead of being
        interpreted.

        Args:
            cmd: A command-line string, a one-element sequence holding such
                a string, or a ready-made argv sequence.

        Returns:
            200 if the process exits with status 0, 500 otherwise (non-zero
            exit, program not found, empty or unparsable command).
        """
        import shlex  # local import keeps the class self-contained

        try:
            if isinstance(cmd, str):
                argv = shlex.split(cmd)
            elif len(cmd) == 1:
                # A single-element list carries the whole command line.
                argv = shlex.split(str(cmd[0]))
            else:
                argv = [str(part) for part in cmd]
        except ValueError as e:
            # shlex raises ValueError on unbalanced quotes.
            print(f"Execution error for command: {cmd}")
            print(f"Exception: {str(e)}")
            return 500

        if not argv:
            print(f"Command failed: {cmd}")
            print("Error: empty command")
            return 500

        try:
            # check=False: the exit status is inspected below so that stderr
            # can be reported on failure.
            result = subprocess.run(argv,
                                    check=False,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
        except OSError as e:
            # Raised when the executable is missing or not runnable.
            print(f"Execution error for command: {cmd}")
            print(f"Exception: {str(e)}")
            return 500

        if result.returncode == 0:
            print(f"Command executed successfully: {cmd}")
            print(f"Output: {result.stdout.decode('utf-8', errors='replace')}")
            return 200

        print(f"Command failed: {cmd}")
        print(f"Error: {result.stderr.decode('utf-8', errors='replace')}")
        return 500

    @staticmethod
    def insert_rule(target, iptables_target):
        """Append a rule matching *target* to the INPUT chain and run it.

        Args:
            target: An ``IPv4Connection`` or ``IPv4Net`` instance.
            iptables_target: Value placed after ``-j`` (the rule's jump
                target).

        Returns:
            ``(status, cmd)`` where *status* is the code returned by
            ``parse_iptables`` and *cmd* is the command string that was run.
            For any other target type the bare integer ``501`` is returned.
            NOTE(review): the two return shapes differ; kept as-is because
            callers are not visible from here.
        """
        import shlex  # local import keeps the class self-contained

        def quoted(value):
            # Render the value exactly as an f-string would, then escape it
            # so it stays a single argument. Ordinary addresses, ports and
            # target names come out unchanged.
            return shlex.quote(f"{value}")

        print("Starting insert rule", target, iptables_target)
        base_cmd = "iptables -A INPUT"

        print("type of target: ", type(target))
        if isinstance(target, IPv4Connection):  # or isinstance(target.IPv6Connection):
            parts = [base_cmd]
            # presumably protocol may be unset on a connection -- TODO confirm;
            # the guard avoids an AttributeError on None in that case.
            protocol = target.protocol.name if target.protocol is not None else None
            print("protocol: ", protocol)
            if protocol:
                parts.append(f"-p {quoted(protocol)}")
            options = (
                ("-s", target.src_addr),
                ("-d", target.dst_addr),
                ("--sport", target.src_port),
                ("--dport", target.dst_port),
            )
            for flag, value in options:
                if value:
                    parts.append(f"{flag} {quoted(value)}")
        elif isinstance(target, IPv4Net):  # or isinstance(target.IPv6Net):
            ip = target.addr()
            cidr = target.prefix()
            source = f"{ip}/{cidr}" if cidr else f"{ip}"
            parts = [base_cmd, f"-s {quoted(source)}"]
        else:
            return 501

        parts.append(f"-j {quoted(iptables_target)}")
        cmd = " ".join(parts)
        print("cmd: ", cmd)

        result = IptablesManager.parse_iptables(cmd)
        print("result: ", result)
        return result, cmd

    @staticmethod
    def delete_rule(cmd):
        """Execute *cmd* via ``parse_iptables`` and return its status code.

        The command is run exactly as given; it is not rewritten here.
        """
        return IptablesManager.parse_iptables(cmd)

    @staticmethod
    def modify_command_for_deletion(cmd):
        """Rewrite an append/insert iptables command into its ``-D`` form.

        The first ``-A``/``-I`` (or ``--append``/``--insert``) token is
        replaced by ``-D``. A numeric rule position following the chain name
        (as in ``-I INPUT 1``) is dropped so the rule is deleted by
        specification rather than by position. A command that already
        contains ``-D``/``--delete`` before any append/insert flag, or that
        contains no such flag at all, is returned with only its whitespace
        normalised.

        Args:
            cmd: The iptables command line as a single string.

        Returns:
            The rewritten command, tokens joined by single spaces.
        """
        tokens = cmd.split()
        for idx, token in enumerate(tokens):
            if token in ("-D", "--delete"):
                break  # already a deletion; do not touch later options
            if token in ("-A", "-I", "--append", "--insert"):
                tokens[idx] = "-D"
                # Layout is: <flag> <chain> [rulenum] <rule-spec...>
                if idx + 2 < len(tokens) and tokens[idx + 2].isdigit():
                    del tokens[idx + 2]
                break
        return " ".join(tokens)
@staticmethod
def
parse_iptables(cmd):
@staticmethod
def parse_iptables(cmd):
    """Run an iptables command line and map the outcome to a status code.

    The command is tokenised with ``shlex`` and executed directly, without
    a shell, so shell metacharacters inside *cmd* (``;``, ``|``, ``$()``)
    reach the program as plain arguments instead of being interpreted.

    Args:
        cmd: A command-line string, a one-element sequence holding such a
            string (the form ``insert_rule`` passes), or a ready-made argv
            sequence.

    Returns:
        200 if the process exits with status 0, 500 otherwise (non-zero
        exit, program not found, empty or unparsable command).
    """
    import shlex  # local import: not part of the file's existing imports

    try:
        if isinstance(cmd, str):
            argv = shlex.split(cmd)
        elif len(cmd) == 1:
            # A single-element list carries the whole command line.
            argv = shlex.split(str(cmd[0]))
        else:
            argv = [str(part) for part in cmd]
    except ValueError as e:
        # shlex raises ValueError on unbalanced quotes.
        print(f"Execution error for command: {cmd}")
        print(f"Exception: {str(e)}")
        return 500

    if not argv:
        print(f"Command failed: {cmd}")
        print("Error: empty command")
        return 500

    try:
        # check=False: the exit status is inspected below so that stderr can
        # be reported on failure (with check=True that branch never ran).
        result = subprocess.run(argv,
                                check=False,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    except OSError as e:
        # Raised when the executable is missing or not runnable.
        print(f"Execution error for command: {cmd}")
        print(f"Exception: {str(e)}")
        return 500

    if result.returncode == 0:
        print(f"Command executed successfully: {cmd}")
        print(f"Output: {result.stdout.decode('utf-8', errors='replace')}")
        return 200

    print(f"Command failed: {cmd}")
    print(f"Error: {result.stderr.decode('utf-8', errors='replace')}")
    return 500
@staticmethod
def
insert_rule(target, iptables_target):
@staticmethod
def insert_rule(target, iptables_target):
    """Append a rule matching *target* to the INPUT chain and run it.

    Args:
        target: An ``IPv4Connection`` or ``IPv4Net`` instance.
        iptables_target: Value placed after ``-j`` (the rule's jump target).

    Returns:
        ``(status, cmd)`` where *status* is the code returned by
        ``IptablesManager.parse_iptables`` and *cmd* is the command string
        that was run. For any other target type the bare integer ``501`` is
        returned.
        NOTE(review): the two return shapes differ; kept as-is because
        callers are not visible from here.
    """
    import shlex  # local import: not part of the file's existing imports

    def quoted(value):
        # Render the value exactly as an f-string would, then escape it so
        # it stays a single argument. Ordinary addresses, ports and target
        # names come out unchanged.
        return shlex.quote(f"{value}")

    print("Starting insert rule", target, iptables_target)
    base_cmd = "iptables -A INPUT"

    print("type of target: ", type(target))
    if isinstance(target, IPv4Connection):  # or isinstance(target.IPv6Connection):
        parts = [base_cmd]
        # The protocol is read before it is printed; printing it first
        # raised UnboundLocalError for every connection target.
        # presumably protocol may be unset on a connection -- TODO confirm;
        # the guard avoids an AttributeError on None in that case.
        protocol = target.protocol.name if target.protocol is not None else None
        print("protocol: ", protocol)
        if protocol:
            parts.append(f"-p {quoted(protocol)}")
        options = (
            ("-s", target.src_addr),
            ("-d", target.dst_addr),
            ("--sport", target.src_port),
            ("--dport", target.dst_port),
        )
        for flag, value in options:
            if value:
                parts.append(f"{flag} {quoted(value)}")
    elif isinstance(target, IPv4Net):  # or isinstance(target.IPv6Net):
        ip = target.addr()
        cidr = target.prefix()
        source = f"{ip}/{cidr}" if cidr else f"{ip}"
        parts = [base_cmd, f"-s {quoted(source)}"]
    else:
        return 501

    parts.append(f"-j {quoted(iptables_target)}")
    cmd = " ".join(parts)
    print("cmd: ", cmd)

    result = IptablesManager.parse_iptables(cmd)
    print("result: ", result)
    return result, cmd
@staticmethod
def
delete_rule(cmd):
@staticmethod
def delete_rule(cmd):
    """Execute *cmd* via ``IptablesManager.parse_iptables``.

    The command is forwarded exactly as received; it is not rewritten
    here.

    Args:
        cmd: The command to execute, in whatever form ``parse_iptables``
            accepts.

    Returns:
        The status code produced by ``parse_iptables``.
    """
    status = IptablesManager.parse_iptables(cmd)
    return status
def
modify_command_for_deletion(cmd):
@staticmethod
def modify_command_for_deletion(cmd):
    """Rewrite an append/insert iptables command into its ``-D`` form.

    The first ``-A``/``-I`` (or ``--append``/``--insert``) token is replaced
    by ``-D``. A numeric rule position following the chain name (as in
    ``-I INPUT 1``) is dropped so the rule is deleted by specification
    rather than by position. A command that already contains
    ``-D``/``--delete`` before any append/insert flag, or that contains no
    such flag at all, is returned with only its whitespace normalised.

    Declared as a static method so it can be called on the class or on an
    instance.

    Args:
        cmd: The iptables command line as a single string.

    Returns:
        The rewritten command, tokens joined by single spaces.
    """
    tokens = cmd.split()
    for idx, token in enumerate(tokens):
        if token in ("-D", "--delete"):
            # Already a deletion: stop here so that the following option
            # (e.g. -s or -p) is not overwritten with -D.
            break
        if token in ("-A", "-I", "--append", "--insert"):
            tokens[idx] = "-D"
            # Layout is: <flag> <chain> [rulenum] <rule-spec...>
            if idx + 2 < len(tokens) and tokens[idx + 2].isdigit():
                del tokens[idx + 2]
            break
    return " ".join(tokens)