##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################
"""
pykiso Control Message Protocol
*******************************
:module: message
:synopsis: Message that will be send though the different agents
.. currentmodule:: message
"""
from __future__ import annotations
import enum
import itertools
import logging
import struct
from typing import Dict, Optional, Union
msg_cnt = itertools.cycle(
range(256)
) # Will be used as token. It increases each time a Message is created
log = logging.getLogger(__name__)
[docs]@enum.unique
class MessageType(enum.IntEnum):
"""List of messages allowed."""
COMMAND = 0
REPORT = 1
ACK = 2
LOG = 3
[docs]@enum.unique
class MessageCommandType(enum.IntEnum):
"""List of commands allowed."""
# Ping
PING = 0
# Setups
TEST_SECTION_SETUP = 1
TEST_SUITE_SETUP = 2
TEST_CASE_SETUP = 3
# Runs
TEST_SECTION_RUN = 11
TEST_SUITE_RUN = 12
TEST_CASE_RUN = 13
# Teardowns
TEST_SECTION_TEARDOWN = 21
TEST_SUITE_TEARDOWN = 22
TEST_CASE_TEARDOWN = 23
# Abort
ABORT = 99
[docs]@enum.unique
class MessageReportType(enum.IntEnum):
"""List of possible received messages."""
TEST_PASS = 0
TEST_FAILED = 1
TEST_NOT_IMPLEMENTED = 2
def __str__(self) -> str:
return f"{self.__class__.__name__}.{self.name}"
[docs]@enum.unique
class MessageAckType(enum.IntEnum):
"""List of possible received messages."""
ACK = 0
NACK = 1
[docs]@enum.unique
class MessageLogType(enum.IntEnum):
"""List of possible received log messages."""
RESERVED = 0
# Link types and sub-types
type_sub_type_dict = {
MessageType.COMMAND: MessageCommandType,
MessageType.REPORT: MessageReportType,
MessageType.ACK: MessageAckType,
MessageType.LOG: MessageLogType,
}
[docs]class Message:
"""A message who fit testApp protocol.
The created message is a tlv style message with the following format:
TYPE: msg_type | message_token | sub_type | errorCode |
"""
crc_byte_size = 2
header_size = 8
max_payload_size = 0xFF
max_message_size = header_size + max_payload_size + crc_byte_size
reserved = 0
def __init__(
self,
msg_type: Union[int, MessageType] = 0,
sub_type: int = 0,
error_code: int = 0,
test_suite: int = 0,
test_case: int = 0,
tlv_dict: Optional[Dict[str, Union[int, bytes]]] = None,
):
"""Create a generic message.
:param msg_type: Message type
:type msg_type: MessageType
:param sub_type: Message sub-type
:type sub_type: Message<MessageType>Type
:param error_code: Error value
:type error_code: integer
:param test_suite: Suite value
:type test_suite: integer
:param test_case: Test value
:type test_case: integer
:param tlv_dict: Dictionary containing tlvs elements in the form {'type':'value', ...}
:type tlv_dict: dict
"""
self.msg_type = msg_type
global msg_cnt
self.msg_token = next(msg_cnt)
self.sub_type = sub_type
self.error_code = error_code
self.test_suite = test_suite
self.test_case = test_case
self.tlv_dict = tlv_dict
def __str__(self):
"""String representation of a message object."""
string = "msg_type:{}, message_token:{}, type:{}, error_code:{}, reserved:{}, test_suite ID:{}, test_case ID:{}".format(
self.msg_type,
self.msg_token,
self.sub_type,
self.error_code,
self.reserved,
self.test_suite,
self.test_case,
)
if self.tlv_dict is not None:
# Convert dec to ascii
tlv = {
key: "".join(chr(i) for i in val) for key, val in self.tlv_dict.items()
}
string += f", tlv_dict:{tlv}"
return string
[docs] def serialize(self) -> bytes:
"""Serialize message into raw packet.
Format: | msg_type (1b) | msg_token (1b) | sub_type (1b) | error_code (1b) |
| test_section (1b) | test_suite (1b) | test_case (1b) | payload_length (1b) |
| tlv_type (1b) | tlv_size (1b) | ... | crc_checksum (2b)
:return: bytes representing the Message object
"""
raw_packet = b""
raw_packet += struct.pack(
"BBBBBBB",
((int(self.msg_type) << 4) | (1 << 6)),
self.msg_token,
int(self.sub_type),
self.error_code,
self.reserved,
self.test_suite,
self.test_case,
)
# Calculate and convert the dictionaries tlv elements into bytes
if self.tlv_dict is not None:
payload = b""
for key, value in self.tlv_dict.items():
# Check first if it the dict is conform
parsed_key = b""
if isinstance(key, TlvKnownTags):
parsed_key = struct.pack("B", int(key))
else:
log.internal_warning("{} is not a supported format".format(key))
parsed_value = b""
if isinstance(value, str): # If string given
parsed_value = parsed_value.join(
[struct.pack("B", ord(val)) for val in value]
)
elif isinstance(value, int):
parsed_value = struct.pack(
"H", value
) # TODO check endianness later on
elif isinstance(value, bytes):
parsed_value = value
else:
log.internal_warning("{} is not a supported format".format(value))
# Add the TLV element:
payload += parsed_key
payload += struct.pack("B", len(parsed_value))
payload += parsed_value
# Add the tlvs elements within the raw_packet
raw_packet += struct.pack("B", len(payload))
raw_packet += payload
else:
# Add the payload length to 0
raw_packet += struct.pack("B", 0)
# Add crc to raw_packet
raw_packet += struct.pack("H", self.get_crc(raw_packet, self.crc_byte_size))
return raw_packet
[docs] @classmethod
def parse_packet(cls, raw_packet: bytes) -> Message:
"""Factory function to create a Message object from raw data.
:param raw_packet: array of a received message
:return: itself
"""
msg = cls()
if (not isinstance(raw_packet, bytes)) and (
len(raw_packet) < (msg.header_size + msg.crc_byte_size)
):
log.error("Packet is not understandable")
# Check the CRC
crc = cls.get_crc(raw_packet[: -msg.crc_byte_size], msg.crc_byte_size)
if crc != struct.unpack("H", raw_packet[-msg.crc_byte_size :])[0]:
log.error(
f"CRC check failed {crc} != {struct.unpack('H', raw_packet[-msg.crc_byte_size:])[0]}"
)
unpack_header = struct.unpack("BBBBBBB", raw_packet[:7])
msg.msg_type = (MessageType)(int((unpack_header[0] & 0x30) >> 4))
msg.msg_token = int(unpack_header[1])
# Because the sub-type depend on the type:
msg.sub_type = (type_sub_type_dict[msg.msg_type])(int(unpack_header[2]))
msg.error_code = int(unpack_header[3])
msg.reserved = int(unpack_header[4])
msg.test_suite = int(unpack_header[5])
msg.test_case = int(unpack_header[6])
payload_length = int(raw_packet[7])
# Create payload based on known tlvs
if payload_length != 0:
msg.tlv_dict = {}
for tag, value in cls._parse_tlv(
raw_packet[msg.header_size : -msg.crc_byte_size]
):
msg.tlv_dict[TlvKnownTags(tag)] = value
return msg
@classmethod
def _parse_tlv(cls, tlv_packet: bytes) -> tuple:
"""Generator used to parse TLV formatted bytes array.
:param tlv_packet: raw TLV formatted bytes array
:return: tuple containing the extract tag(int) and value(list)
"""
tlv_iterator = iter(tlv_packet)
while True:
try:
tag = int(next(itertools.islice(tlv_iterator, 1)))
length = int(next(itertools.islice(tlv_iterator, 1)))
value = [val for val in itertools.islice(tlv_iterator, length)]
yield (tag, value)
except StopIteration:
break
[docs] def generate_ack_message(self, ack_type: int) -> Union[Message, None]:
"""Generate acknowledgement to send out.
:param ack_type: ack or nack
:return: filled acknowledge message otherwise None
"""
# Return if wrong parameter given
if not isinstance(ack_type, MessageAckType):
return None
# Create ack message
ack_message = Message(
msg_type=MessageType.ACK,
sub_type=ack_type,
error_code=0,
test_suite=self.test_suite,
test_case=self.test_case,
)
ack_message.msg_token = self.msg_token
# Return the ack message
return ack_message
[docs] def check_if_ack_message_is_matching(self, ack_message: Message) -> bool:
"""Check if the ack message was for this sent message.
:param ack_message: received acknowledge message
:return: True if message type and token are valid otherwise
False
"""
if (
ack_message.msg_type == MessageType.ACK
and ack_message.msg_token == self.msg_token
):
return True
else:
log.internal_info(
"ack_message: {} \ndifferent of \nthis message: {}".format(
str(ack_message), str(self)
)
)
return False
[docs] def get_message_type(self) -> Union[int, MessageType]:
"""Return actual message type."""
return self.msg_type
[docs] def get_message_token(self) -> int:
"""Return actual message token."""
return self.msg_token
[docs] def get_message_sub_type(self) -> int:
"""Return actual message subtype."""
return self.sub_type
[docs] def get_message_tlv_dict(self) -> dict:
"""Return actual message type/length/value dictionary."""
return self.tlv_dict
[docs] @classmethod
def get_crc(cls, serialized_msg: bytes, crc_byte_size: int = 2) -> int:
"""Get the CRC checksum for a bytes message.
:param serialized_msg: message used for the crc calculation
:param crc_byte_size: number of bytes dedicated for the crc
:return: CRC checksum
"""
crc = 0
crc_mask = 255
crc_size = (2 ** (crc_byte_size * 8)) - 1
for _, msg in enumerate(serialized_msg):
crc = ((crc >> 8) | (crc << 8)) & crc_size
crc ^= int(msg)
crc ^= (crc & crc_mask) >> 4
crc ^= (crc << 12) & crc_size
crc ^= ((crc & crc_mask) << 5) & crc_size
return crc