openc2lib.core.consumer
OpenC2 Consumer
The Consumer implements the expected behaviour of an OpenC2 Consumer server that dispatches OpenC2 Commands
to the Actuators.
1"""OpenC2 Consumer 2 3The `Consumer` implements the expected behaviour of an OpenC2 Consumer server that dispatches OpenC2 Commands 4to the Actuators. 5""" 6 7import logging 8 9from openc2lib.types.datatypes import DateTime, ResponseType 10 11from openc2lib.core.encoder import Encoder 12from openc2lib.core.transfer import Transfer 13from openc2lib.core.message import Message, Response 14from openc2lib.core.response import StatusCode, StatusCodeDescription 15 16logger = logging.getLogger('openc2') 17 18class Consumer: 19 """OpenC2 Consumer 20 21 The `Consumer` is designed to dispatch OpenC2 `Message`s to the relevant `Actuator`. 22 The current implementation receives the configuration at initialization time. It is therefore 23 not conceived to be runned itself as a service, but to be integrated in an external component 24 that reads the relevant configuration from file and passes it to the Consumer. 25 26 The `Consumer` has two main tasks: 27 - creating the OpenC2 stack to process Messages (namely the combination of an Encoding format and 28 a Transfer protocol); 29 - dispatching incoming `Command`s to the relevant `Actuator`. 30 31 Each `Consumer` will only run a single `Transfer` protocol. All registered `Encoder`s can be used, 32 and a default `Encoder` is explicitely given that will be used when no other selection is available 33 (e.g., to answer Messages that the Consumer does not understand). 34 35 """ 36 def __init__(self, consumer: str, actuators: [] =None, encoder: Encoder = None, transfer: Transfer = None): 37 """ Create a `Consumer` 38 :param consumer: This is a string that identifies the `Consumer` and is used in `from` 39 and `to` fields of the OpenC2 `Message` (see Table 3.1 of the Language Specification. 40 :param actuators: This must be a list of available `Actuator`s. The list contains the 41 `Actuator` instances that will be used by the `Consumer`. 42 :param encoder: This is an instance of the `Encoder` that will be used by default. 43 :param transfer: This is the `Transfer` protocol that will be used to send/receive `Message`s. 44 """ 45 self.consumer = consumer 46 self.encoder = encoder 47 self.transfer = transfer 48 self.actuators = actuators 49 50 # TODO: Read configuration from file 51 52 # TODO: Manage non-blocking implementation of the Transfer.receive() function 53 def run(self, encoder: Encoder = None, transfer: Transfer = None): 54 """Runs a `Consumer` 55 56 This is the entry point of the `Consumer`. It must be invoked to start operation of the `Consumer`. 57 This method may be blocking, depending on the implementation of the `receive()` method of the 58 used `Transfer`. 59 60 The arguments of this method can be used to create multiple OpenC2 stacks (e.g., using 61 different `Encoder`s and `Transfer`s). This feature clearly requires the `Transfer` 62 implementation to be non-blocking. 63 64 :param encoder: A different `Encoder` that might be passed to overwrite what set at initialization time. 65 :param transfer: A different `Transfer` that might be passed to overwrite what set at initialization time. 66 :return: None. 67 """ 68 if not encoder: encoder = self.encoder 69 if not transfer: transfer = self.transfer 70 if not transfer: raise ValueError('Missing transfer object') 71 72 transfer.receive(self.dispatch, self.encoder) 73 74 75 def dispatch(self, msg): 76 """ Dispatches Commands to Actuators 77 78 This method scans the actuator profile carried in the `Command` and select one or more 79 `Actuator`s that will process the `Command`. 80 81 The current implementation is only meant to be used within the 82 implementation of `Transfer` protocols as a callback for returning control to the main code. 83 This approach is motivated by those Transfer protocols that replies to messages on the same 84 TCP connection, so to avoid errors with NAT and firewalls 85 (if a Command were passed back from the `Transfer.receive()` and processed within the `Consumer.run()`, 86 the following `Transfer.send() would use a different TCP connection). 87 88 :param msg: The full openc2lib `Message` that embeds the `Command` to be processed. 89 :return: A `Message` that embeds the `Response` (from the `Actuator` or elaborated by the `Consumer` in 90 case of errors). 91 """ 92 #TODO: The logic to select the actuator that matches the request 93 # OC2 Architecture, Sec. 2.1: 94 # The Profile field, if present, specifies the profile that defines the function 95 # to be performed. A Consumer executes the command if it supports the specified 96 # profile, otherwise the command is ignored. The Profile field may be omitted and 97 # typically will not be included in implementations where the functions of the 98 # recipients are unambiguous or when a high- level effects-based command is 99 # desired and tactical decisions on how the effect is achieved is left to the 100 # recipient. If Profile is omitted and the recipient supports multiple profiles, 101 # the command will be executed in the context of each profile that supports the 102 # command's combination of action and target. 103 try: 104 profile = msg.content.actuator.getName() 105 except AttributeError: 106 # For a packet filter-only consumer, the following may apply: 107 # profile = slpf.nsid 108 # Default: execute in the context of multiple profiles 109 profile = None 110 # TODO: how to mix responses from multiple actuators? 111 # Workaround: strictly require a profile to be present 112 response = Response(status=StatusCode.BADREQUEST, status_text='Missing profile') 113 return self.__respmsg(msg, response) 114 115 try: 116 asset_id = msg.content.actuator.getObj()['asset_id'] 117 except KeyError: 118 # assed_id = None means the default actuator that implements the required profile 119 asset_id = None 120 121 try: 122 if profile == None: 123 # Select all actuators 124 actuator = list(self.actuators.values()) 125 elif asset_id == None: 126 # Select all actuators that implement the specific profile 127 actuator = list(dict(filter(lambda p: p[0][0]==profile, self.actuators.items())).values()) 128 else: 129 # Only one instance is expected to be present in this case 130 actuator = [self.actuators[(profile,asset_id)]] 131 except KeyError: 132 response = Response(status=StatusCode.NOTFOUND, status_text='No actuator available') 133 return self.__respmsg(msg, response) 134 135 response_content = None 136 if msg.content.args: 137 if 'response_requested' in msg.content.args.keys(): 138 match msg.content.args['response_requested']: 139 case ResponseType.none: 140 response_content = None 141 case ResponseType.ack: 142 response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING]) 143 # TODO: Spawn a process to run the process offline 144 logger.warn("Command: %s not run! -- Missing code") 145 case ResponseType.status: 146 response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING]) 147 # TODO: Spawn a process to run the process offline 148 logger.warn("Command: %s not run! -- Missing code") 149 case ResponseType.complete: 150 response_content = self.__runcmd(msg, actuator) 151 case _: 152 response_content = Response(status=StatusCode.BADREQUEST, status_text="Invalid response requested") 153 154 if not response_content: 155 # Default: ResponseType == complete. Return an answer after the command is executed. 156 response_content = self.__runcmd(msg, actuator) 157 158 logger.debug("Actuator %s returned: %s", actuator, response_content) 159 160 # Add the metadata to be returned to the Producer 161 return self.__respmsg(msg, response_content) 162 163 def __runcmd(self, msg, actuator): 164 # Run the command and collect the response 165 # TODO: Define how to manage concurrent execution from more than one actuator 166 try: 167 # TODO: How to merge multiple responses? 168 # for a in actuators.items(): 169 response_content = actuator[0].run(msg.content) 170 except (IndexError,AttributeError): 171 response_content = Response(status=StatusCode.NOTFOUND, status_text='No actuator available') 172 173 return response_content 174 175 def __respmsg(self, msg, response): 176 if response: 177 respmsg = Message(response) 178 respmsg.from_=self.consumer 179 respmsg.to=[msg.from_] 180 respmsg.content_type=msg.content_type 181 respmsg.request_id=msg.request_id 182 respmsg.created=int(DateTime()) 183 respmsg.status=response['status'] 184 else: 185 respmsg = None 186 logger.debug("Response to be sent: %s", respmsg) 187 188 return respmsg 189 190 191 192# TODO: Add main to load configuration from file
19class Consumer: 20 """OpenC2 Consumer 21 22 The `Consumer` is designed to dispatch OpenC2 `Message`s to the relevant `Actuator`. 23 The current implementation receives the configuration at initialization time. It is therefore 24 not conceived to be runned itself as a service, but to be integrated in an external component 25 that reads the relevant configuration from file and passes it to the Consumer. 26 27 The `Consumer` has two main tasks: 28 - creating the OpenC2 stack to process Messages (namely the combination of an Encoding format and 29 a Transfer protocol); 30 - dispatching incoming `Command`s to the relevant `Actuator`. 31 32 Each `Consumer` will only run a single `Transfer` protocol. All registered `Encoder`s can be used, 33 and a default `Encoder` is explicitely given that will be used when no other selection is available 34 (e.g., to answer Messages that the Consumer does not understand). 35 36 """ 37 def __init__(self, consumer: str, actuators: [] =None, encoder: Encoder = None, transfer: Transfer = None): 38 """ Create a `Consumer` 39 :param consumer: This is a string that identifies the `Consumer` and is used in `from` 40 and `to` fields of the OpenC2 `Message` (see Table 3.1 of the Language Specification. 41 :param actuators: This must be a list of available `Actuator`s. The list contains the 42 `Actuator` instances that will be used by the `Consumer`. 43 :param encoder: This is an instance of the `Encoder` that will be used by default. 44 :param transfer: This is the `Transfer` protocol that will be used to send/receive `Message`s. 45 """ 46 self.consumer = consumer 47 self.encoder = encoder 48 self.transfer = transfer 49 self.actuators = actuators 50 51 # TODO: Read configuration from file 52 53 # TODO: Manage non-blocking implementation of the Transfer.receive() function 54 def run(self, encoder: Encoder = None, transfer: Transfer = None): 55 """Runs a `Consumer` 56 57 This is the entry point of the `Consumer`. It must be invoked to start operation of the `Consumer`. 58 This method may be blocking, depending on the implementation of the `receive()` method of the 59 used `Transfer`. 60 61 The arguments of this method can be used to create multiple OpenC2 stacks (e.g., using 62 different `Encoder`s and `Transfer`s). This feature clearly requires the `Transfer` 63 implementation to be non-blocking. 64 65 :param encoder: A different `Encoder` that might be passed to overwrite what set at initialization time. 66 :param transfer: A different `Transfer` that might be passed to overwrite what set at initialization time. 67 :return: None. 68 """ 69 if not encoder: encoder = self.encoder 70 if not transfer: transfer = self.transfer 71 if not transfer: raise ValueError('Missing transfer object') 72 73 transfer.receive(self.dispatch, self.encoder) 74 75 76 def dispatch(self, msg): 77 """ Dispatches Commands to Actuators 78 79 This method scans the actuator profile carried in the `Command` and select one or more 80 `Actuator`s that will process the `Command`. 81 82 The current implementation is only meant to be used within the 83 implementation of `Transfer` protocols as a callback for returning control to the main code. 84 This approach is motivated by those Transfer protocols that replies to messages on the same 85 TCP connection, so to avoid errors with NAT and firewalls 86 (if a Command were passed back from the `Transfer.receive()` and processed within the `Consumer.run()`, 87 the following `Transfer.send() would use a different TCP connection). 88 89 :param msg: The full openc2lib `Message` that embeds the `Command` to be processed. 90 :return: A `Message` that embeds the `Response` (from the `Actuator` or elaborated by the `Consumer` in 91 case of errors). 92 """ 93 #TODO: The logic to select the actuator that matches the request 94 # OC2 Architecture, Sec. 2.1: 95 # The Profile field, if present, specifies the profile that defines the function 96 # to be performed. A Consumer executes the command if it supports the specified 97 # profile, otherwise the command is ignored. The Profile field may be omitted and 98 # typically will not be included in implementations where the functions of the 99 # recipients are unambiguous or when a high- level effects-based command is 100 # desired and tactical decisions on how the effect is achieved is left to the 101 # recipient. If Profile is omitted and the recipient supports multiple profiles, 102 # the command will be executed in the context of each profile that supports the 103 # command's combination of action and target. 104 try: 105 profile = msg.content.actuator.getName() 106 except AttributeError: 107 # For a packet filter-only consumer, the following may apply: 108 # profile = slpf.nsid 109 # Default: execute in the context of multiple profiles 110 profile = None 111 # TODO: how to mix responses from multiple actuators? 112 # Workaround: strictly require a profile to be present 113 response = Response(status=StatusCode.BADREQUEST, status_text='Missing profile') 114 return self.__respmsg(msg, response) 115 116 try: 117 asset_id = msg.content.actuator.getObj()['asset_id'] 118 except KeyError: 119 # assed_id = None means the default actuator that implements the required profile 120 asset_id = None 121 122 try: 123 if profile == None: 124 # Select all actuators 125 actuator = list(self.actuators.values()) 126 elif asset_id == None: 127 # Select all actuators that implement the specific profile 128 actuator = list(dict(filter(lambda p: p[0][0]==profile, self.actuators.items())).values()) 129 else: 130 # Only one instance is expected to be present in this case 131 actuator = [self.actuators[(profile,asset_id)]] 132 except KeyError: 133 response = Response(status=StatusCode.NOTFOUND, status_text='No actuator available') 134 return self.__respmsg(msg, response) 135 136 response_content = None 137 if msg.content.args: 138 if 'response_requested' in msg.content.args.keys(): 139 match msg.content.args['response_requested']: 140 case ResponseType.none: 141 response_content = None 142 case ResponseType.ack: 143 response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING]) 144 # TODO: Spawn a process to run the process offline 145 logger.warn("Command: %s not run! -- Missing code") 146 case ResponseType.status: 147 response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING]) 148 # TODO: Spawn a process to run the process offline 149 logger.warn("Command: %s not run! -- Missing code") 150 case ResponseType.complete: 151 response_content = self.__runcmd(msg, actuator) 152 case _: 153 response_content = Response(status=StatusCode.BADREQUEST, status_text="Invalid response requested") 154 155 if not response_content: 156 # Default: ResponseType == complete. Return an answer after the command is executed. 157 response_content = self.__runcmd(msg, actuator) 158 159 logger.debug("Actuator %s returned: %s", actuator, response_content) 160 161 # Add the metadata to be returned to the Producer 162 return self.__respmsg(msg, response_content) 163 164 def __runcmd(self, msg, actuator): 165 # Run the command and collect the response 166 # TODO: Define how to manage concurrent execution from more than one actuator 167 try: 168 # TODO: How to merge multiple responses? 169 # for a in actuators.items(): 170 response_content = actuator[0].run(msg.content) 171 except (IndexError,AttributeError): 172 response_content = Response(status=StatusCode.NOTFOUND, status_text='No actuator available') 173 174 return response_content 175 176 def __respmsg(self, msg, response): 177 if response: 178 respmsg = Message(response) 179 respmsg.from_=self.consumer 180 respmsg.to=[msg.from_] 181 respmsg.content_type=msg.content_type 182 respmsg.request_id=msg.request_id 183 respmsg.created=int(DateTime()) 184 respmsg.status=response['status'] 185 else: 186 respmsg = None 187 logger.debug("Response to be sent: %s", respmsg) 188 189 return respmsg
OpenC2 Consumer
The Consumer is designed to dispatch OpenC2 Messages to the relevant Actuator.
The current implementation receives the configuration at initialization time. It is therefore
not conceived to be runned itself as a service, but to be integrated in an external component
that reads the relevant configuration from file and passes it to the Consumer.
The Consumer has two main tasks:
- creating the OpenC2 stack to process Messages (namely the combination of an Encoding format and a Transfer protocol);
- dispatching incoming
Commands to the relevantActuator.
Each Consumer will only run a single Transfer protocol. All registered Encoders can be used,
and a default Encoder is explicitely given that will be used when no other selection is available
(e.g., to answer Messages that the Consumer does not understand).
37 def __init__(self, consumer: str, actuators: [] =None, encoder: Encoder = None, transfer: Transfer = None): 38 """ Create a `Consumer` 39 :param consumer: This is a string that identifies the `Consumer` and is used in `from` 40 and `to` fields of the OpenC2 `Message` (see Table 3.1 of the Language Specification. 41 :param actuators: This must be a list of available `Actuator`s. The list contains the 42 `Actuator` instances that will be used by the `Consumer`. 43 :param encoder: This is an instance of the `Encoder` that will be used by default. 44 :param transfer: This is the `Transfer` protocol that will be used to send/receive `Message`s. 45 """ 46 self.consumer = consumer 47 self.encoder = encoder 48 self.transfer = transfer 49 self.actuators = actuators 50 51 # TODO: Read configuration from file
Create a Consumer
Parameters
- consumer: This is a string that identifies the
Consumerand is used infromandtofields of the OpenC2Message(see Table 3.1 of the Language Specification. - actuators: This must be a list of available
Actuators. The list contains theActuatorinstances that will be used by theConsumer. - encoder: This is an instance of the
Encoderthat will be used by default. - transfer: This is the
Transferprotocol that will be used to send/receiveMessages.
54 def run(self, encoder: Encoder = None, transfer: Transfer = None): 55 """Runs a `Consumer` 56 57 This is the entry point of the `Consumer`. It must be invoked to start operation of the `Consumer`. 58 This method may be blocking, depending on the implementation of the `receive()` method of the 59 used `Transfer`. 60 61 The arguments of this method can be used to create multiple OpenC2 stacks (e.g., using 62 different `Encoder`s and `Transfer`s). This feature clearly requires the `Transfer` 63 implementation to be non-blocking. 64 65 :param encoder: A different `Encoder` that might be passed to overwrite what set at initialization time. 66 :param transfer: A different `Transfer` that might be passed to overwrite what set at initialization time. 67 :return: None. 68 """ 69 if not encoder: encoder = self.encoder 70 if not transfer: transfer = self.transfer 71 if not transfer: raise ValueError('Missing transfer object') 72 73 transfer.receive(self.dispatch, self.encoder)
Runs a Consumer
This is the entry point of the Consumer. It must be invoked to start operation of the Consumer.
This method may be blocking, depending on the implementation of the receive() method of the
used Transfer.
The arguments of this method can be used to create multiple OpenC2 stacks (e.g., using
different Encoders and Transfers). This feature clearly requires the Transfer
implementation to be non-blocking.
Parameters
- encoder: A different
Encoderthat might be passed to overwrite what set at initialization time. - transfer: A different
Transferthat might be passed to overwrite what set at initialization time.
Returns
None.
76 def dispatch(self, msg): 77 """ Dispatches Commands to Actuators 78 79 This method scans the actuator profile carried in the `Command` and select one or more 80 `Actuator`s that will process the `Command`. 81 82 The current implementation is only meant to be used within the 83 implementation of `Transfer` protocols as a callback for returning control to the main code. 84 This approach is motivated by those Transfer protocols that replies to messages on the same 85 TCP connection, so to avoid errors with NAT and firewalls 86 (if a Command were passed back from the `Transfer.receive()` and processed within the `Consumer.run()`, 87 the following `Transfer.send() would use a different TCP connection). 88 89 :param msg: The full openc2lib `Message` that embeds the `Command` to be processed. 90 :return: A `Message` that embeds the `Response` (from the `Actuator` or elaborated by the `Consumer` in 91 case of errors). 92 """ 93 #TODO: The logic to select the actuator that matches the request 94 # OC2 Architecture, Sec. 2.1: 95 # The Profile field, if present, specifies the profile that defines the function 96 # to be performed. A Consumer executes the command if it supports the specified 97 # profile, otherwise the command is ignored. The Profile field may be omitted and 98 # typically will not be included in implementations where the functions of the 99 # recipients are unambiguous or when a high- level effects-based command is 100 # desired and tactical decisions on how the effect is achieved is left to the 101 # recipient. If Profile is omitted and the recipient supports multiple profiles, 102 # the command will be executed in the context of each profile that supports the 103 # command's combination of action and target. 104 try: 105 profile = msg.content.actuator.getName() 106 except AttributeError: 107 # For a packet filter-only consumer, the following may apply: 108 # profile = slpf.nsid 109 # Default: execute in the context of multiple profiles 110 profile = None 111 # TODO: how to mix responses from multiple actuators? 112 # Workaround: strictly require a profile to be present 113 response = Response(status=StatusCode.BADREQUEST, status_text='Missing profile') 114 return self.__respmsg(msg, response) 115 116 try: 117 asset_id = msg.content.actuator.getObj()['asset_id'] 118 except KeyError: 119 # assed_id = None means the default actuator that implements the required profile 120 asset_id = None 121 122 try: 123 if profile == None: 124 # Select all actuators 125 actuator = list(self.actuators.values()) 126 elif asset_id == None: 127 # Select all actuators that implement the specific profile 128 actuator = list(dict(filter(lambda p: p[0][0]==profile, self.actuators.items())).values()) 129 else: 130 # Only one instance is expected to be present in this case 131 actuator = [self.actuators[(profile,asset_id)]] 132 except KeyError: 133 response = Response(status=StatusCode.NOTFOUND, status_text='No actuator available') 134 return self.__respmsg(msg, response) 135 136 response_content = None 137 if msg.content.args: 138 if 'response_requested' in msg.content.args.keys(): 139 match msg.content.args['response_requested']: 140 case ResponseType.none: 141 response_content = None 142 case ResponseType.ack: 143 response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING]) 144 # TODO: Spawn a process to run the process offline 145 logger.warn("Command: %s not run! -- Missing code") 146 case ResponseType.status: 147 response_content = Response(status=StatusCode.PROCESSING, status_text=StatusCodeDescription[StatusCode.PROCESSING]) 148 # TODO: Spawn a process to run the process offline 149 logger.warn("Command: %s not run! -- Missing code") 150 case ResponseType.complete: 151 response_content = self.__runcmd(msg, actuator) 152 case _: 153 response_content = Response(status=StatusCode.BADREQUEST, status_text="Invalid response requested") 154 155 if not response_content: 156 # Default: ResponseType == complete. Return an answer after the command is executed. 157 response_content = self.__runcmd(msg, actuator) 158 159 logger.debug("Actuator %s returned: %s", actuator, response_content) 160 161 # Add the metadata to be returned to the Producer 162 return self.__respmsg(msg, response_content)
Dispatches Commands to Actuators
This method scans the actuator profile carried in the Command and select one or more
Actuators that will process the Command.
The current implementation is only meant to be used within the
implementation of Transfer protocols as a callback for returning control to the main code.
This approach is motivated by those Transfer protocols that replies to messages on the same
TCP connection, so to avoid errors with NAT and firewalls
(if a Command were passed back from the Transfer.receive() and processed within the Consumer.run(),
the following `Transfer.send() would use a different TCP connection).
Parameters
- msg: The full openc2lib
Messagethat embeds theCommandto be processed.
Returns
A
Messagethat embeds theResponse(from theActuatoror elaborated by theConsumerin case of errors).