openc2lib.actuators.iptables_manager

Iptables Manager

This module provides a dumb actuator that always answer with a fixed message. Use it for testing only.

  1""" Iptables Manager
  2
  3	This module provides a dumb actuator that always answer with a fixed 
  4	message. Use it for testing only.
  5"""
  6from openc2lib import ArrayOf,ActionTargets, TargetEnum, Nsid, Version,Results, StatusCode, StatusCodeDescription, Actions, Command, Response, IPv4Net, IPv4Connection #, IPv6Net, IPv6Connection
  7import subprocess
  8# A dumb actuator that does not implement any function but can
  9# be used to test the openc2 communication.
 10class IptablesManager:
 11
 12	@staticmethod
 13	def parse_iptables(cmd):
 14		try:
 15		     result = subprocess.run(cmd,
 16		                             shell=True,
 17		                             check=True,
 18		                             stdout=subprocess.PIPE,
 19		                             stderr=subprocess.PIPE)
 20		     if result.returncode == 0:
 21		         print(f"Command executed successfully: {cmd}")
 22		         print(f"Output: {result.stdout.decode('utf-8')}")
 23		         return 200
 24		     else:
 25		         print(f"Command failed: {cmd}")
 26		         print(f"Error: {result.stderr.decode('utf-8')}")
 27		         return 500
 28		except subprocess.CalledProcessError as e:
 29		     print(f"Execution error for command: {cmd}")
 30		     print(f"Exception: {str(e)}")
 31		     return 500
 32
 33	@staticmethod
 34	def insert_rule(target, iptables_target):
 35		print("Starting insert rule", target, iptables_target)
 36		supported_targets = ['ipv4_connection', 'ipv6_connection', 'ipv4_net', 'ipv6_net']
 37		cmd = None
 38		base_cmd = "iptables -A INPUT"
 39
 40		print("type of target: ", type(target))
 41		if isinstance(target, IPv4Connection): # or isinstance(target.IPv6Connection):
 42			src_ip = target.src_addr
 43			dst_ip = target.dst_addr
 44			print("protocol: ", protocol)
 45			src_port = target.src_port
 46			dst_port = target.dst_port
 47			protocol = target.protocol.name
 48			print("protocol: ", protocol)
 49			cmd = f"{base_cmd} -p {protocol}"
 50			if src_ip:
 51				cmd += f" -s {src_ip}"
 52			if dst_ip:
 53				cmd += f" -d {dst_ip}"
 54			if src_port:
 55				cmd += f" --sport {src_port}"
 56			if dst_port:
 57				cmd += f" --dport {dst_port}"
 58			cmd += f" -j {iptables_target}"
 59
 60		elif isinstance(target, IPv4Net): # or isinstance(target.IPv6Net):
 61			ip = target.addr()
 62			cidr = target.prefix()
 63			cmd = f"{base_cmd} -s {ip}"
 64			if cidr:
 65				cmd += f"/{cidr}"
 66			cmd += f" -j {iptables_target}"
 67		else:
 68			return 501
 69
 70		print("cmd: ", cmd)
 71
 72		if cmd:
 73			result = IptablesManager.parse_iptables([cmd])
 74			print("result: ", result)
 75#	result[0]['command'] = cmd
 76			return result, cmd
 77
 78		return 500
 79
 80	@staticmethod
 81	def delete_rule(cmd):
 82#		cmd_parts = additional_cmds[0].split()
 83#		print("I am here", cmd_parts)
 84#		if len(cmd_parts) > 1 and cmd_parts[1].startswith('-'):
 85#			print("split")
 86#			cmd_parts[1] = '-D'
 87#			cmd = ' '.join(cmd_parts)
 88#			print("iptables command: ", cmd)
 89		return IptablesManager.parse_iptables(cmd)
 90
 91	def modify_command_for_deletion(cmd):
 92		cmd_parts = cmd.split()
 93		if "INPUT" in cmd_parts:
 94			input_index = cmd_parts.index("INPUT")
 95			if input_index + 1 < len(cmd_parts) and cmd_parts[input_index + 1].isdigit():
 96				del cmd_parts[input_index + 1]
 97		for i, part in enumerate(cmd_parts):
 98			if part.startswith('-') and not part.startswith('-D'):
 99				cmd_parts[i] = '-D'
100				break
101		modified_cmd = ' '.join(cmd_parts)
102		return modified_cmd
class IptablesManager:
 11class IptablesManager:
 12
 13	@staticmethod
 14	def parse_iptables(cmd):
 15		try:
 16		     result = subprocess.run(cmd,
 17		                             shell=True,
 18		                             check=True,
 19		                             stdout=subprocess.PIPE,
 20		                             stderr=subprocess.PIPE)
 21		     if result.returncode == 0:
 22		         print(f"Command executed successfully: {cmd}")
 23		         print(f"Output: {result.stdout.decode('utf-8')}")
 24		         return 200
 25		     else:
 26		         print(f"Command failed: {cmd}")
 27		         print(f"Error: {result.stderr.decode('utf-8')}")
 28		         return 500
 29		except subprocess.CalledProcessError as e:
 30		     print(f"Execution error for command: {cmd}")
 31		     print(f"Exception: {str(e)}")
 32		     return 500
 33
 34	@staticmethod
 35	def insert_rule(target, iptables_target):
 36		print("Starting insert rule", target, iptables_target)
 37		supported_targets = ['ipv4_connection', 'ipv6_connection', 'ipv4_net', 'ipv6_net']
 38		cmd = None
 39		base_cmd = "iptables -A INPUT"
 40
 41		print("type of target: ", type(target))
 42		if isinstance(target, IPv4Connection): # or isinstance(target.IPv6Connection):
 43			src_ip = target.src_addr
 44			dst_ip = target.dst_addr
 45			print("protocol: ", protocol)
 46			src_port = target.src_port
 47			dst_port = target.dst_port
 48			protocol = target.protocol.name
 49			print("protocol: ", protocol)
 50			cmd = f"{base_cmd} -p {protocol}"
 51			if src_ip:
 52				cmd += f" -s {src_ip}"
 53			if dst_ip:
 54				cmd += f" -d {dst_ip}"
 55			if src_port:
 56				cmd += f" --sport {src_port}"
 57			if dst_port:
 58				cmd += f" --dport {dst_port}"
 59			cmd += f" -j {iptables_target}"
 60
 61		elif isinstance(target, IPv4Net): # or isinstance(target.IPv6Net):
 62			ip = target.addr()
 63			cidr = target.prefix()
 64			cmd = f"{base_cmd} -s {ip}"
 65			if cidr:
 66				cmd += f"/{cidr}"
 67			cmd += f" -j {iptables_target}"
 68		else:
 69			return 501
 70
 71		print("cmd: ", cmd)
 72
 73		if cmd:
 74			result = IptablesManager.parse_iptables([cmd])
 75			print("result: ", result)
 76#	result[0]['command'] = cmd
 77			return result, cmd
 78
 79		return 500
 80
 81	@staticmethod
 82	def delete_rule(cmd):
 83#		cmd_parts = additional_cmds[0].split()
 84#		print("I am here", cmd_parts)
 85#		if len(cmd_parts) > 1 and cmd_parts[1].startswith('-'):
 86#			print("split")
 87#			cmd_parts[1] = '-D'
 88#			cmd = ' '.join(cmd_parts)
 89#			print("iptables command: ", cmd)
 90		return IptablesManager.parse_iptables(cmd)
 91
 92	def modify_command_for_deletion(cmd):
 93		cmd_parts = cmd.split()
 94		if "INPUT" in cmd_parts:
 95			input_index = cmd_parts.index("INPUT")
 96			if input_index + 1 < len(cmd_parts) and cmd_parts[input_index + 1].isdigit():
 97				del cmd_parts[input_index + 1]
 98		for i, part in enumerate(cmd_parts):
 99			if part.startswith('-') and not part.startswith('-D'):
100				cmd_parts[i] = '-D'
101				break
102		modified_cmd = ' '.join(cmd_parts)
103		return modified_cmd
@staticmethod
def parse_iptables(cmd):
13	@staticmethod
14	def parse_iptables(cmd):
15		try:
16		     result = subprocess.run(cmd,
17		                             shell=True,
18		                             check=True,
19		                             stdout=subprocess.PIPE,
20		                             stderr=subprocess.PIPE)
21		     if result.returncode == 0:
22		         print(f"Command executed successfully: {cmd}")
23		         print(f"Output: {result.stdout.decode('utf-8')}")
24		         return 200
25		     else:
26		         print(f"Command failed: {cmd}")
27		         print(f"Error: {result.stderr.decode('utf-8')}")
28		         return 500
29		except subprocess.CalledProcessError as e:
30		     print(f"Execution error for command: {cmd}")
31		     print(f"Exception: {str(e)}")
32		     return 500
@staticmethod
def insert_rule(target, iptables_target):
34	@staticmethod
35	def insert_rule(target, iptables_target):
36		print("Starting insert rule", target, iptables_target)
37		supported_targets = ['ipv4_connection', 'ipv6_connection', 'ipv4_net', 'ipv6_net']
38		cmd = None
39		base_cmd = "iptables -A INPUT"
40
41		print("type of target: ", type(target))
42		if isinstance(target, IPv4Connection): # or isinstance(target.IPv6Connection):
43			src_ip = target.src_addr
44			dst_ip = target.dst_addr
45			print("protocol: ", protocol)
46			src_port = target.src_port
47			dst_port = target.dst_port
48			protocol = target.protocol.name
49			print("protocol: ", protocol)
50			cmd = f"{base_cmd} -p {protocol}"
51			if src_ip:
52				cmd += f" -s {src_ip}"
53			if dst_ip:
54				cmd += f" -d {dst_ip}"
55			if src_port:
56				cmd += f" --sport {src_port}"
57			if dst_port:
58				cmd += f" --dport {dst_port}"
59			cmd += f" -j {iptables_target}"
60
61		elif isinstance(target, IPv4Net): # or isinstance(target.IPv6Net):
62			ip = target.addr()
63			cidr = target.prefix()
64			cmd = f"{base_cmd} -s {ip}"
65			if cidr:
66				cmd += f"/{cidr}"
67			cmd += f" -j {iptables_target}"
68		else:
69			return 501
70
71		print("cmd: ", cmd)
72
73		if cmd:
74			result = IptablesManager.parse_iptables([cmd])
75			print("result: ", result)
76#	result[0]['command'] = cmd
77			return result, cmd
78
79		return 500
@staticmethod
def delete_rule(cmd):
81	@staticmethod
82	def delete_rule(cmd):
83#		cmd_parts = additional_cmds[0].split()
84#		print("I am here", cmd_parts)
85#		if len(cmd_parts) > 1 and cmd_parts[1].startswith('-'):
86#			print("split")
87#			cmd_parts[1] = '-D'
88#			cmd = ' '.join(cmd_parts)
89#			print("iptables command: ", cmd)
90		return IptablesManager.parse_iptables(cmd)
def modify_command_for_deletion(cmd):
 92	def modify_command_for_deletion(cmd):
 93		cmd_parts = cmd.split()
 94		if "INPUT" in cmd_parts:
 95			input_index = cmd_parts.index("INPUT")
 96			if input_index + 1 < len(cmd_parts) and cmd_parts[input_index + 1].isdigit():
 97				del cmd_parts[input_index + 1]
 98		for i, part in enumerate(cmd_parts):
 99			if part.startswith('-') and not part.startswith('-D'):
100				cmd_parts[i] = '-D'
101				break
102		modified_cmd = ' '.join(cmd_parts)
103		return modified_cmd