openc2lib.core.consumer

OpenC2 Consumer

The Consumer implements the expected behaviour of an OpenC2 Consumer server that dispatches OpenC2 Commands to the Actuators.

"""OpenC2 Consumer

The `Consumer` implements the expected behaviour of an OpenC2 Consumer server that dispatches OpenC2 Commands
to the Actuators.
"""

import logging

from openc2lib.types.datatypes import DateTime, ResponseType

from openc2lib.core.encoder import Encoder
from openc2lib.core.transfer import Transfer
from openc2lib.core.message import Message, Response
from openc2lib.core.response import StatusCode, StatusCodeDescription

logger = logging.getLogger('openc2')

class Consumer:
	"""OpenC2 Consumer

		The `Consumer` is designed to dispatch OpenC2 `Message`s to the relevant `Actuator`.
		The current implementation receives the configuration at initialization time. It is therefore
		not designed to run as a standalone service, but to be integrated into an external component
		that reads the relevant configuration from a file and passes it to the Consumer.

		The `Consumer` has two main tasks:
		- creating the OpenC2 stack to process Messages (namely the combination of an Encoding format and
			a Transfer protocol);
		- dispatching incoming `Command`s to the relevant `Actuator`.

		Each `Consumer` will only run a single `Transfer` protocol. All registered `Encoder`s can be used,
		and a default `Encoder` is explicitly given that will be used when no other selection is available
		(e.g., to answer Messages that the Consumer does not understand).

	"""
	def __init__(self, consumer: str, actuators: dict = None, encoder: Encoder = None, transfer: Transfer = None):
		""" Create a `Consumer`
			:param consumer: This is a string that identifies the `Consumer` and is used in the `from`
				and `to` fields of the OpenC2 `Message` (see Table 3.1 of the Language Specification).
			:param actuators: This must be a dictionary of available `Actuator`s, keyed by
				`(profile, asset_id)` tuples. Its values are the `Actuator` instances that will be
				used by the `Consumer`.
			:param encoder: This is an instance of the `Encoder` that will be used by default.
			:param transfer: This is the `Transfer` protocol that will be used to send/receive `Message`s.
		"""
		self.consumer = consumer
		self.encoder = encoder
		self.transfer = transfer
		self.actuators = actuators

		# TODO: Read configuration from file

	# TODO: Manage non-blocking implementation of the Transfer.receive() function
	def run(self, encoder: Encoder = None, transfer: Transfer = None):
		"""Runs a `Consumer`

			This is the entry point of the `Consumer`. It must be invoked to start operation of the `Consumer`.
			This method may be blocking, depending on the implementation of the `receive()` method of the
			`Transfer` in use.

			The arguments of this method can be used to create multiple OpenC2 stacks (e.g., using
			different `Encoder`s and `Transfer`s). This feature requires the `Transfer`
			implementation to be non-blocking.

			:param encoder: A different `Encoder` that may be passed to override the one set at initialization time.
			:param transfer: A different `Transfer` that may be passed to override the one set at initialization time.
			:return: None.
		"""
		if not encoder: encoder = self.encoder
		if not transfer: transfer = self.transfer
		if not transfer: raise ValueError('Missing transfer object')

		# Pass the selected encoder, so that an override given as argument is honoured
		transfer.receive(self.dispatch, encoder)


	def dispatch(self, msg):
		""" Dispatches Commands to Actuators

			This method scans the actuator profile carried in the `Command` and selects one or more
			`Actuator`s that will process the `Command`.

			The current implementation is only meant to be used within the
			implementation of `Transfer` protocols as a callback for returning control to the main code.
			This approach is motivated by those Transfer protocols that reply to messages on the same
			TCP connection, so as to avoid errors with NAT and firewalls
			(if a Command were passed back from `Transfer.receive()` and processed within `Consumer.run()`,
			the following `Transfer.send()` would use a different TCP connection).

			:param msg: The full openc2lib `Message` that embeds the `Command` to be processed.
			:return: A `Message` that embeds the `Response` (from the `Actuator`, or generated by the `Consumer` in
				case of errors).
		"""
		#TODO: The logic to select the actuator that matches the request
		# OC2 Architecture, Sec. 2.1:
		# The Profile field, if present, specifies the profile that defines the function
		# to be performed. A Consumer executes the command if it supports the specified
		# profile, otherwise the command is ignored. The Profile field may be omitted and
		# typically will not be included in implementations where the functions of the
		# recipients are unambiguous or when a high-level effects-based command is
		# desired and tactical decisions on how the effect is achieved is left to the
		# recipient. If Profile is omitted and the recipient supports multiple profiles,
		# the command will be executed in the context of each profile that supports the
		# command's combination of action and target.
		try:
			profile = msg.content.actuator.getName()
		except AttributeError:
			# For a packet filter-only consumer, the following may apply:
			# profile = slpf.nsid
			# Default: execute in the context of multiple profiles
			profile = None
			# TODO: how to mix responses from multiple actuators?
			# Workaround: strictly require a profile to be present
			response = Response(status=StatusCode.BADREQUEST, status_text='Missing profile')
			return self.__respmsg(msg, response)

		try:
			asset_id = msg.content.actuator.getObj()['asset_id']
		except KeyError:
			# asset_id = None means the default actuator that implements the required profile
			asset_id = None

		try:
			if profile is None:
				# Select all actuators
				actuator = list(self.actuators.values())
			elif asset_id is None:
				# Select all actuators that implement the specific profile
				# (keys of self.actuators are (profile, asset_id) tuples)
				actuator = [a for key, a in self.actuators.items() if key[0] == profile]
			else:
				# Only one instance is expected to be present in this case
				actuator = [self.actuators[(profile, asset_id)]]
		except KeyError:
			response = Response(status=StatusCode.NOTFOUND, status_text='No actuator available')
			return self.__respmsg(msg, response)

		response_content = None
		if msg.content.args:
			if 'response_requested' in msg.content.args.keys():
				match msg.content.args['response_requested']:
					case ResponseType.none:
						# NOTE: leaving response_content unset falls through to the default
						# branch below, so the Command is run and a Response is still returned
						response_content = None
					case ResponseType.ack:
						response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING])
						# TODO: Spawn a process to run the process offline
						logger.warning("Command: %s not run! -- Missing code", msg.content)
					case ResponseType.status:
						response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING])
						# TODO: Spawn a process to run the process offline
						logger.warning("Command: %s not run! -- Missing code", msg.content)
					case ResponseType.complete:
						response_content = self.__runcmd(msg, actuator)
					case _:
						response_content = Response(status=StatusCode.BADREQUEST, status_text="Invalid response requested")

		if not response_content:
			# Default: ResponseType == complete. Return an answer after the command is executed.
			response_content = self.__runcmd(msg, actuator)

		logger.debug("Actuator %s returned: %s", actuator, response_content)

		# Add the metadata to be returned to the Producer
		return self.__respmsg(msg, response_content)

	def __runcmd(self, msg, actuator):
		# Run the command and collect the response
		# TODO: Define how to manage concurrent execution from more than one actuator
		try:
			# TODO: How to merge multiple responses?
			# for a in actuators.items():
			# Only the first selected actuator is run for now
			response_content = actuator[0].run(msg.content)
		except (IndexError, AttributeError):
			response_content = Response(status=StatusCode.NOTFOUND, status_text='No actuator available')

		return response_content

	def __respmsg(self, msg, response):
		# Wrap the Response in a Message addressed back to the sender of the request
		if response:
			respmsg = Message(response)
			respmsg.from_ = self.consumer
			respmsg.to = [msg.from_]
			respmsg.content_type = msg.content_type
			respmsg.request_id = msg.request_id
			respmsg.created = int(DateTime())
			respmsg.status = response['status']
		else:
			respmsg = None
		logger.debug("Response to be sent: %s", respmsg)

		return respmsg



# TODO: Add main to load configuration from file
logger = <Logger openc2 (WARNING)>
class Consumer:

OpenC2 Consumer

The Consumer is designed to dispatch OpenC2 Messages to the relevant Actuator. The current implementation receives the configuration at initialization time. It is therefore not designed to run as a standalone service, but to be integrated into an external component that reads the relevant configuration from a file and passes it to the Consumer.

The Consumer has two main tasks:

  • creating the OpenC2 stack to process Messages (namely the combination of an Encoding format and a Transfer protocol);
  • dispatching incoming Commands to the relevant Actuator.

Each Consumer will only run a single Transfer protocol. All registered Encoders can be used, and a default Encoder is explicitly given that will be used when no other selection is available (e.g., to answer Messages that the Consumer does not understand).

Consumer( consumer: str, actuators: dict = None, encoder: openc2lib.core.encoder.Encoder = None, transfer: openc2lib.core.transfer.Transfer = None)

Create a Consumer

Parameters
  • consumer: This is a string that identifies the Consumer and is used in the from and to fields of the OpenC2 Message (see Table 3.1 of the Language Specification).
  • actuators: This must be a dictionary of available Actuators, keyed by (profile, asset_id) tuples. Its values are the Actuator instances that will be used by the Consumer.
  • encoder: This is an instance of the Encoder that will be used by default.
  • transfer: This is the Transfer protocol that will be used to send/receive Messages.
consumer
encoder
transfer
actuators
def run( self, encoder: openc2lib.core.encoder.Encoder = None, transfer: openc2lib.core.transfer.Transfer = None):

Runs a Consumer

This is the entry point of the Consumer. It must be invoked to start operation of the Consumer. This method may be blocking, depending on the implementation of the receive() method of the Transfer in use.

The arguments of this method can be used to create multiple OpenC2 stacks (e.g., using different Encoders and Transfers). This feature requires the Transfer implementation to be non-blocking.
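Until a non-blocking Transfer is available, one possible workaround is to give each stack its own thread. The sketch below only illustrates that idea: StubTransfer and StubConsumer are hypothetical stand-ins that mimic the run(encoder, transfer) and receive(callback, encoder) signatures shown on this page, and the "http", "mqtt" and "json" labels are placeholders, not openc2lib objects.

```python
import threading


class StubTransfer:
    """Hypothetical Transfer: receive() blocks until its input is consumed."""

    def __init__(self, name, requests):
        self.name = name
        self.requests = requests

    def receive(self, callback, encoder):
        for request in self.requests:
            callback((self.name, encoder, request))


class StubConsumer:
    """Hypothetical stand-in exposing the same run(encoder, transfer) signature."""

    def __init__(self):
        self.handled = []
        self._lock = threading.Lock()

    def dispatch(self, msg):
        # dispatch may now be called from several threads at once
        with self._lock:
            self.handled.append(msg)

    def run(self, encoder=None, transfer=None):
        if not transfer:
            raise ValueError('Missing transfer object')
        transfer.receive(self.dispatch, encoder)


consumer = StubConsumer()
stacks = [
    ("json", StubTransfer("http", ["cmd-1"])),
    ("json", StubTransfer("mqtt", ["cmd-2"])),
]

# One thread per (encoder, transfer) stack, so a blocking receive()
# in one stack does not stall the others.
workers = [
    threading.Thread(target=consumer.run, kwargs={"encoder": enc, "transfer": tr})
    for enc, tr in stacks
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

print(sorted(consumer.handled))
```

With this approach dispatch() runs concurrently, so any shared state it touches (here, the handled list) needs its own synchronization.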

Parameters
  • encoder: A different Encoder that may be passed to override the one set at initialization time.
  • transfer: A different Transfer that may be passed to override the one set at initialization time.
Returns

None.

def dispatch(self, msg):

Dispatches Commands to Actuators

This method scans the actuator profile carried in the Command and selects one or more Actuators that will process the Command.
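The selection step can be sketched in isolation as follows. This is a simplified illustration rather than the library code: select_actuators is a hypothetical helper, the registry contents are invented, and plain strings stand in for Actuator instances. It assumes, as the lookups in dispatch suggest, that actuators are stored in a dictionary keyed by (profile, asset_id) tuples.

```python
from typing import Optional


def select_actuators(actuators: dict, profile: Optional[str], asset_id: Optional[str]) -> list:
    """Return the actuators matching a profile and, optionally, an asset_id."""
    if profile is None:
        # No profile given: every registered actuator is a candidate.
        return list(actuators.values())
    if asset_id is None:
        # Profile only: every actuator registered under that profile.
        return [a for (p, _), a in actuators.items() if p == profile]
    # Profile and asset_id: a single actuator; KeyError if it is not registered.
    return [actuators[(profile, asset_id)]]


# Hypothetical registry; the keys and values are placeholders.
registry = {
    ("slpf", "fw-1"): "iptables actuator",
    ("slpf", "fw-2"): "nftables actuator",
    ("ctxd", "host-1"): "context discovery actuator",
}

print(select_actuators(registry, "slpf", None))    # ['iptables actuator', 'nftables actuator']
print(select_actuators(registry, "slpf", "fw-2"))  # ['nftables actuator']
```

Note that the current dispatch implementation never reaches the "no profile" branch: a Command without a profile is answered with a "Missing profile" error before selection takes place.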

The current implementation is only meant to be used within the implementation of Transfer protocols as a callback for returning control to the main code. This approach is motivated by those Transfer protocols that reply to messages on the same TCP connection, so as to avoid errors with NAT and firewalls (if a Command were passed back from Transfer.receive() and processed within Consumer.run(), the following Transfer.send() would use a different TCP connection).
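The callback arrangement can be illustrated with a toy Transfer. LoopbackTransfer below is a hypothetical stand-in, not an openc2lib class; it only assumes the receive(callback, encoder) calling convention used by run(), and uses a list index to represent a connection.

```python
class LoopbackTransfer:
    """Hypothetical Transfer that answers each request on the connection it arrived on."""

    def __init__(self, incoming):
        self.incoming = incoming  # pre-canned requests, one per "connection"
        self.sent = []            # (connection id, reply) pairs

    def receive(self, callback, encoder):
        for conn_id, request in enumerate(self.incoming):
            # The callback runs while the connection is still open, so the
            # reply can be written back on the same connection.
            reply = callback(request)
            if reply is not None:
                self.sent.append((conn_id, reply))


def dispatch(msg):
    # Stand-in for Consumer.dispatch: echo the request id with a status.
    return {"status": 200, "request_id": msg["request_id"]}


transfer = LoopbackTransfer([{"request_id": "a"}, {"request_id": "b"}])
transfer.receive(dispatch, encoder=None)
print(transfer.sent)
```

Because the Transfer owns the loop, it decides when and where the reply is written; the Consumer only supplies the function that turns a request into a reply.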

Parameters
  • msg: The full openc2lib Message that embeds the Command to be processed.
Returns

A Message that embeds the Response (from the Actuator, or generated by the Consumer in case of errors).
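The metadata of the returned Message mirrors the request, as the private __respmsg helper in the module source shows. The sketch below restates that mapping with a hypothetical Msg dataclass in place of openc2lib's Message; the "application/openc2" default and the millisecond timestamp are assumptions made for illustration only.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Msg:
    """Hypothetical stand-in for Message; field names follow __respmsg."""
    content: dict
    from_: str = ""
    to: list = field(default_factory=list)
    content_type: str = "application/openc2"  # assumed placeholder value
    request_id: str = ""
    created: int = 0
    status: Optional[int] = None


def build_reply(consumer_id: str, request: Msg, response: Optional[dict]) -> Optional[Msg]:
    """Wrap a response in a message addressed back to the sender of the request."""
    if not response:
        # No response content means nothing is sent back.
        return None
    reply = Msg(content=response)
    reply.from_ = consumer_id                  # the Consumer is now the sender
    reply.to = [request.from_]                 # reply goes to the original sender
    reply.content_type = request.content_type
    reply.request_id = request.request_id      # lets the Producer correlate the reply
    reply.created = int(time.time() * 1000)    # assumed unit: milliseconds
    reply.status = response["status"]
    return reply


request = Msg(content={}, from_="producer.example", request_id="42")
reply = build_reply("consumer.example", request, {"status": 200})
print(reply.to, reply.request_id, reply.status)  # ['producer.example'] 42 200
```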