import asyncio
|
|
from udsoncan import Response, Request, Dtc, MemoryLocation, services
|
from udsoncan.exceptions import NegativeResponseException, UnexpectedResponseException, ConfigError
|
from udsoncan.configs import default_client_config
|
|
|
class Client(object):
|
|
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, config=default_client_config):
|
self.reader = reader
|
self.writer = writer
|
self.config = dict(config)
|
|
async def send_request(self, request: Request, suppress_positive_response=None, timeout=None):
|
if timeout is None:
|
timeout = self.config['request_timeout']
|
|
payload = request.get_payload(suppress_positive_response)
|
self.writer.write(payload)
|
|
if suppress_positive_response is None:
|
payload = await asyncio.wait_for(self.reader.read(4095), timeout)
|
response = Response.from_payload(payload)
|
|
if not response.positive:
|
raise NegativeResponseException(response)
|
|
return response
|
|
async def change_session(self, newsession, suppress_positive_response=None, timeout=None):
|
request = services.DiagnosticSessionControl.make_request(newsession)
|
|
response = await self.send_request(request, suppress_positive_response, timeout)
|
|
if response is None:
|
return
|
|
services.DiagnosticSessionControl.interpret_response(response)
|
|
if newsession != response.service_data.session_echo:
|
raise UnexpectedResponseException(response, 'Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)' % (
|
response.service_data.session_echo, newsession))
|
|
return response
|
|
async def request_seed(self, level):
|
request = services.SecurityAccess.make_request(
|
level, mode=services.SecurityAccess.Mode.RequestSeed)
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
services.SecurityAccess.interpret_response(
|
response, mode=services.SecurityAccess.Mode.RequestSeed)
|
|
expected_level = services.SecurityAccess.normalize_level(
|
mode=services.SecurityAccess.Mode.RequestSeed, level=level)
|
received_level = response.service_data.security_level_echo
|
|
if expected_level != received_level:
|
raise UnexpectedResponseException(
|
response, 'Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)' % (received_level, expected_level))
|
|
return response
|
|
async def send_key(self, level, key):
|
request = services.SecurityAccess.make_request(
|
level, mode=services.SecurityAccess.Mode.SendKey, key=key)
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
services.SecurityAccess.interpret_response(
|
response, mode=services.SecurityAccess.Mode.SendKey)
|
|
expected_level = services.SecurityAccess.normalize_level(
|
mode=services.SecurityAccess.Mode.SendKey, level=level)
|
received_level = response.service_data.security_level_echo
|
|
if expected_level != received_level:
|
raise UnexpectedResponseException(
|
response, 'Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)' % (received_level, expected_level))
|
|
return response
|
|
async def tester_present(self, suppress_positive_response=None, timeout=None):
|
request = services.TesterPresent.make_request()
|
|
response = await self.send_request(request, suppress_positive_response, timeout)
|
|
if response is None:
|
return
|
|
services.TesterPresent.interpret_response(response)
|
|
if request.subfunction != response.service_data.subfunction_echo:
|
raise UnexpectedResponseException(response, 'Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)' % (
|
response.service_data.subfunction_echo, request.subfunction))
|
|
return response
|
|
async def read_data_by_identifier_first(self, didlist):
|
didlist = services.ReadDataByIdentifier.validate_didlist_input(didlist)
|
response = await self.read_data_by_identifier(didlist)
|
values = response.service_data.values
|
if len(values) > 0 and len(didlist) > 0:
|
return values[didlist[0]]
|
|
async def read_data_by_identifier(self, didlist):
|
didlist = services.ReadDataByIdentifier.validate_didlist_input(didlist)
|
|
request = services.ReadDataByIdentifier.make_request(
|
didlist=didlist, didconfig=self.config['data_identifiers'])
|
|
if 'data_identifiers' not in self.config or not isinstance(self.config['data_identifiers'], dict):
|
raise AttributeError(
|
'Configuration does not contains a valid data identifier description.')
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
params = {
|
'didlist': didlist,
|
'didconfig': self.config['data_identifiers'],
|
'tolerate_zero_padding': self.config['tolerate_zero_padding']
|
}
|
|
try:
|
services.ReadDataByIdentifier.interpret_response(
|
response, **params)
|
except ConfigError as e:
|
if e.key in didlist:
|
raise
|
else:
|
raise UnexpectedResponseException(
|
response, "Server returned values for data identifier 0x%04x that was not requested and no Codec was defined for it. Parsing must be stopped." % (e.key))
|
|
set_request_didlist = set(didlist)
|
set_response_didlist = set(response.service_data.values.keys())
|
extra_did = set_response_didlist - set_request_didlist
|
missing_did = set_request_didlist - set_response_didlist
|
|
if len(extra_did) > 0:
|
raise UnexpectedResponseException(
|
response, "Server returned values for %d data identifier that were not requested. Dids are : %s" % (len(extra_did), extra_did))
|
|
if len(missing_did) > 0:
|
raise UnexpectedResponseException(
|
response, "%d data identifier values are missing from server response. Dids are : %s" % (len(missing_did), missing_did))
|
|
return response
|
|
async def write_data_by_identifier(self, did, value):
|
request = services.WriteDataByIdentifier.make_request(
|
did, value, didconfig=self.config['data_identifiers'])
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
services.WriteDataByIdentifier.interpret_response(response)
|
|
if response.service_data.did_echo != did:
|
raise UnexpectedResponseException(response, 'Server returned a response for data identifier 0x%04x while client requested for did 0x%04x' % (
|
response.service_data.did_echo, did))
|
|
return response
|
|
async def ecu_reset(self, reset_type, suppress_positive_response=None, timeout=None):
|
request = services.ECUReset.make_request(reset_type)
|
|
response = await self.send_request(request, suppress_positive_response, timeout)
|
|
if response is None:
|
return
|
|
services.ECUReset.interpret_response(response)
|
|
if response.service_data.reset_type_echo != reset_type:
|
raise UnexpectedResponseException(response, 'Response subfunction received from server (0x%02x) does not match the requested subfunction (0x%02x)' % (
|
response.service_data.reset_type_echo, reset_type))
|
|
return response
|
|
async def clear_dtc(self, group=0xFFFFFF):
|
request = services.ClearDiagnosticInformation.make_request(group)
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
services.ClearDiagnosticInformation.interpret_response(response)
|
|
return response
|
|
async def start_routine(self, routine_id, data=None):
|
response = await self.routine_control(routine_id, services.RoutineControl.ControlType.startRoutine, data)
|
return response
|
|
async def stop_routine(self, routine_id, data=None):
|
response = await self.routine_control(routine_id, services.RoutineControl.ControlType.stopRoutine, data)
|
return response
|
|
async def routine_control(self, routine_id, control_type, data=None):
|
request = services.RoutineControl.make_request(
|
routine_id, control_type, data=data)
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
services.RoutineControl.interpret_response(response)
|
|
if control_type != response.service_data.control_type_echo:
|
raise UnexpectedResponseException(response, 'Control type of response (0x%02x) does not match request control type (0x%02x)' % (
|
response.service_data.control_type_echo, control_type))
|
|
if routine_id != response.service_data.routine_id_echo:
|
raise UnexpectedResponseException(response, 'Response received from server (ID = 0x%04x) does not match the requested routine ID (0x%04x)' % (
|
response.service_data.routine_id_echo, routine_id))
|
|
return response
|
|
async def communication_control(self, control_type, communication_type, suppress_positive_response=None, timeout=None):
|
request = services.CommunicationControl.make_request(
|
control_type, communication_type)
|
|
response = await self.send_request(request, suppress_positive_response, timeout)
|
|
if response is None:
|
return
|
|
services.CommunicationControl.interpret_response(response)
|
|
if control_type != response.service_data.control_type_echo:
|
raise UnexpectedResponseException(response, 'Control type of response (0x%02x) does not match request control type (0x%02x)' % (
|
response.service_data.control_type_echo, control_type))
|
|
return response
|
|
async def request_download(self, memory_location, dfi=None):
|
response = await self.request_upload_download(services.RequestDownload, memory_location, dfi)
|
return response
|
|
async def request_upload(self, memory_location, dfi=None):
|
response = await self.request_upload_download(services.RequestUpload, memory_location, dfi)
|
return response
|
|
async def request_upload_download(self, service_cls, memory_location, dfi=None):
|
dfi = service_cls.normalize_data_format_identifier(dfi)
|
|
if service_cls not in [services.RequestDownload, services.RequestUpload]:
|
raise ValueError(
|
'Service must either be RequestDownload or RequestUpload')
|
|
if not isinstance(memory_location, MemoryLocation):
|
raise ValueError(
|
'memory_location must be an instance of MemoryLocation')
|
|
if 'server_address_format' in self.config:
|
memory_location.set_format_if_none(
|
address_format=self.config['server_address_format'])
|
|
if 'server_memorysize_format' in self.config:
|
memory_location.set_format_if_none(
|
memorysize_format=self.config['server_memorysize_format'])
|
|
request = service_cls.make_request(
|
memory_location=memory_location, dfi=dfi)
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
service_cls.interpret_response(response)
|
|
return response
|
|
async def transfer_data(self, sequence_number, data=None):
|
request = services.TransferData.make_request(sequence_number, data)
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
services.TransferData.interpret_response(response)
|
|
if sequence_number != response.service_data.sequence_number_echo:
|
raise UnexpectedResponseException(response, 'Block sequence number of response (0x%02x) does not match request block sequence number (0x%02x)' % (
|
response.service_data.sequence_number_echo, sequence_number))
|
|
return response
|
|
async def request_transfer_exit(self, data=None):
|
request = services.RequestTransferExit.make_request(data)
|
|
response = await self.send_request(request)
|
|
if response is None:
|
return
|
|
services.RequestTransferExit.interpret_response(response)
|
|
return response
|
|
async def control_dtc_setting(self, setting_type, data=None, suppress_positive_response=None, timeout=None):
|
request = services.ControlDTCSetting.make_request(
|
setting_type, data=data)
|
|
response = await self.send_request(request, suppress_positive_response, timeout)
|
|
if response is None:
|
return
|
|
services.ControlDTCSetting.interpret_response(response)
|
|
if response.service_data.setting_type_echo != setting_type:
|
raise UnexpectedResponseException(response, 'Setting type of response (0x%02x) does not match request control type (0x%02x)' % (
|
response.service_data.setting_type_echo, setting_type))
|
|
return response
|
|
async def get_dtc_by_status_mask(self, status_mask):
|
pass
|
|
async def get_number_of_dtc_by_status_mask(self, status_mask):
|
pass
|
|
async def get_supported_dtc(self):
|
pass
|
|
async def read_dtc_information(self, subfunction, status_mask=None, severity_mask=None, dtc=None, snapshot_record_number=None, extended_data_record_number=None, extended_data_size=None):
|
pass
|