From 75b621a09f2540a3687dc9197da65facfeac9072 Mon Sep 17 00:00:00 2001 From: yaapu Date: Sat, 21 Mar 2020 01:24:35 +0100 Subject: [PATCH] Tools: autotest added frsky serial_protocol 4 and 10 testing against mavlink telemetry --- Tools/autotest/common.py | 426 +++++++++++++++++++++++++++++++++++---- 1 file changed, 389 insertions(+), 37 deletions(-) diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index d43bd84fae..e81d020127 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -505,7 +505,7 @@ class FRSkySPort(FRSky): self.SENSOR_ID_FAS = 0x22 # Sensor ID 2 self.SENSOR_ID_GPS = 0x83 # Sensor ID 3 self.SENSOR_ID_SP2UR = 0xC6 # Sensor ID 6 - self.SENSOR_ID_28 = 0x1B # Sensor ID 28 + self.SENSOR_ID_27 = 0x1B # Sensor ID 27 self.state = self.state_WANT_FRAME_TYPE @@ -527,13 +527,19 @@ class FRSkySPort(FRSky): 0x5007: "parameters", # SPort non-passthrough: + 0x01: "GPS_ALT_BP", 0x02: "Temp1", 0x04: "Fuel", 0x05: "Temp2", - 0x10: "Baro Alt BP", + 0x09: "GPS_ALT_AP", + 0x10: "BARO_ALT_BP", + 0x11: "GPS_ALT_AP", + 0x14: "HDG", + 0x19: "GPS_SPEED_AP", 0x21: "BARO_ALT_AP", + 0x28: "CURR", 0x30: "VARIO", - 0x39: "VFAS", + 0x39: "VFAS" # 0x800: "GPS", ## comments as duplicated dictrionary key } @@ -686,7 +692,7 @@ class FRSkyPassThrough(FRSkySPort): def __init__(self, destination_address): super(FRSkyPassThrough, self).__init__(destination_address) - self.sensors_to_poll = [self.SENSOR_ID_28] + self.sensors_to_poll = [self.SENSOR_ID_27] def progress(self, message): print("FRSkyPassthrough: %s" % message) @@ -6053,22 +6059,188 @@ switch value''' if m3.state != 0: raise NotAchievedException("Didn't get expected mask back in message (mask=0 state=%u" % (m3.state)) - def tfp_validate_vel_and_yaw(self, value): + def compare_number_percent(self, num1, num2, percent): + if num1 == 0 and num2 == 0: + return True + if abs(num1-num2)/max(abs(num1),abs(num2)) <= percent*0.01: + return True + return False + + def bit_extract(self,number,offset,length): + mask = 0 + for i in range(offset,offset+length): + mask |= 1 << i + return (number & mask) >> offset + + def tf_encode_gps_latitude(self,lat): + value = 0 + if lat < 0: + value = ((abs(lat)/100)*6) | 0x40000000 + else: + value = ((abs(lat)/100)*6) + return value + + def tf_validate_gps(self, value): # shared by proto 4 and proto 10 + self.progress("validating gps (0x%02x)" % value) + lat = value + gpi = self.mav.recv_match( + type='GLOBAL_POSITION_INT', + blocking=True, + timeout=1 + ) + if gpi is None: + raise NotAchievedException("Did not get GLOBAL_POSITION_INT message") + gpi_lat = self.tf_encode_gps_latitude(gpi.lat) + self.progress("GLOBAL_POSITION_INT lat==%f frsky==%f" % (gpi_lat, lat)) + if gpi_lat == lat: + return True + return False + def tfp_prep_number(self,number,digits,power): + res = 0 + abs_number = abs(number) + if digits == 2 and power == 1: # number encoded on 8 bits: 7 bits for digits + 1 for 10^power + if abs_number < 100: + res = abs_number<<1 + elif abs_number < 1270: + res = (round(abs_number * 0.1)<<1)|0x1 + else: # transmit max possible value (0x7F x 10^1 = 1270) + res = 0xFF + if number < 0: # if number is negative, add sign bit in front + res |= 0x1<<8 + elif digits == 2 and power == 2: # number encoded on 9 bits: 7 bits for digits + 2 for 10^power + if abs_number < 100: + res = abs_number<<2 + elif abs_number < 1000: + res = (round(abs_number * 0.1)<<2)|0x1 + elif abs_number < 10000: + res = (round(abs_number * 0.01)<<2)|0x2 + elif abs_number < 127000: + res = (round(abs_number * 0.001)<<2)|0x3 + else: # transmit max possible value (0x7F x 10^3 = 127000) + res = 0x1FF + if number < 0: # if number is negative, add sign bit in front + res |= 0x1<<9 + elif digits == 3 and power == 1: # number encoded on 11 bits: 10 bits for digits + 1 for 10^power + if abs_number < 1000: + res = abs_number<<1 + elif abs_number < 10240: + res = (round(abs_number * 0.1)<<1)|0x1 + else: # transmit max possible value (0x3FF x 10^1 = 10240) + res = 0x7FF + if number < 0: # if number is negative, add sign bit in front + res |= 0x1<<11 + elif digits == 3 and power == 2: # number encoded on 12 bits: 10 bits for digits + 2 for 10^power + if abs_number < 1000: + res = abs_number<<2 + elif abs_number < 10000: + res = (round(abs_number * 0.1)<<2)|0x1 + elif abs_number < 100000: + res = (round(abs_number * 0.01)<<2)|0x2 + elif abs_number < 1024000: + res = (round(abs_number * 0.001)<<2)|0x3 + else: # transmit max possible value (0x3FF x 10^3 = 127000) + res = 0xFFF + if number < 0: # if number is negative, add sign bit in front + res |= 0x1<<12 + return res + + def tfp_validate_ap_status(self, value): # 0x5001 + self.progress("validating ap_status(0x%02x)" % value) + flight_mode = self.bit_extract(value,0,5) - 1 # first mode is 1 not 0 :-) + simple_mode = self.bit_extract(value,5,2) + is_flying = not self.bit_extract(value,7,1) + status_armed = self.bit_extract(value,8,1) + batt_failsafe = self.bit_extract(value,9,1) + ekf_failsafe = self.bit_extract(value,10,2) + imu_temp = self.bit_extract(value,26,6) + 19 # IMU temperature: 0 means temp =< 19, 63 means temp => 82 + heartbeat = self.mav.recv_match( + type='HEARTBEAT', + blocking=True, + timeout=1 + ) + if heartbeat is None: + raise NotAchievedException("Did not get HEARTBEAT message") + mav_flight_mode = heartbeat.custom_mode + self.progress(" mode=%u heartbeat=%u" % (flight_mode, mav_flight_mode)) + if mav_flight_mode == flight_mode: + self.progress("flight mode match") + return True + # FIXME: need to check other values as well + return False + + def tfp_validate_attitude(self, value): + self.progress("validating attitude(0x%02x)" % value) + roll = (min(self.bit_extract(value,0,11),1800) - 900) * 0.2 # roll [0,1800] ==> [-180,180] + pitch = (min(self.bit_extract(value,11,10),900) - 450) * 0.2 # pitch [0,900] ==> [-90,90] + rng_cm = self.bit_extract(value,22,10) * (10^self.bit_extract(value,21,1)) # cm + atti = self.mav.recv_match( + type='ATTITUDE', + blocking=True, + timeout=1 + ) + if atti is None: + raise NotAchievedException("Did not get ATTITUDE message") + atti_roll = round(atti.roll) + self.progress("ATTITUDE roll==%f frsky==%f" % (atti_roll, roll)) + if abs(atti_roll - roll) < 5: + return True + # FIXME: need to check other values as well + return False + + def tfp_validate_home_status(self, value): + self.progress("validating home status(0x%02x)" % value) + home_dist_m = self.bit_extract(value,2,10) * (10^self.bit_extract(value,0,2)) + home_alt_m = self.bit_extract(value,14,10) * (10^self.bit_extract(value,12,2)) * 0.1 * (self.bit_extract(value,24,1) == 1 and -1 or 1) + home_angle_d = self.bit_extract(value, 25, 7) * 3 + gpi = self.mav.recv_match( + type='GLOBAL_POSITION_INT', + blocking=True, + timeout=1 + ) + if gpi is None: + raise NotAchievedException("Did not get GLOBAL_POSITION_INT message") + gpi_relative_alt = gpi.relative_alt + self.progress("GLOBAL_POSITION_INT rel_alt==%f frsky==%f" % (gpi_relative_alt, home_alt_m)) + if abs(gpi_relative_alt - home_alt_m) < 1: + return True + # FIXME: need to check other values as well + return False + + def tfp_validate_gps_status(self, value): + self.progress("validating gps status(0x%02x)" % value) + num_sats = self.bit_extract(value,0,4) + gps_status = self.bit_extract(value,4,2) + self.bit_extract(value,14,2) + gps_hdop = self.bit_extract(value,7,7) * (10^self.bit_extract(value,6,1)) # dm + gps_alt = self.bit_extract(value,24,7) * (10^self.bit_extract(value,22,2)) * (self.bit_extract(value,31,1) == 1 and -1 or 1) # dm + gri = self.mav.recv_match( + type='GPS_RAW_INT', + blocking=True, + timeout=1 + ) + if gri is None: + raise NotAchievedException("Did not get GPS_RAW_INT message") + gri_status = gri.fix_type + self.progress("GPS_RAW_INT fix_type==%f frsky==%f" % (gri_status, gps_status)) + if gps_status == gri_status: + return True + # FIXME: need to check other values as well + return False + def tfp_validate_vel_and_yaw(self, value): # 0x5005 self.progress("validating vel_and_yaw(0x%02x)" % value) - VELANDYAW_XYVEL_OFFSET = 9 - VELANDYAW_YAW_LIMIT = 0x7FF - VELANDYAW_YAW_OFFSET = 16 - yaw = value >> VELANDYAW_YAW_OFFSET - xy_vel = value >> VELANDYAW_XYVEL_OFFSET & 0xFF - z_vel_dm_per_second = value & 0xFFFF - - gpi = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) + z_vel_dm_per_second = self.bit_extract(value,1,7) * (10^self.bit_extract(value,0,1)) * (self.bit_extract(value,8,1) == 1 and -1 or 1) + xy_vel = self.bit_extract(value,10,7) * (10^self.bit_extract(value,9,1)) + yaw = self.bit_extract(value,17,11) * 0.2 + gpi = self.mav.recv_match( + type='GLOBAL_POSITION_INT', + blocking=True, + timeout=1 + ) if gpi is None: return - self.progress(" yaw=%u gpi=%u" % (yaw, gpi.hdg)) + self.progress(" yaw=%u gpi=%u" % (yaw, gpi.hdg*0.01)) self.progress(" xy_vel=%u" % xy_vel) self.progress(" z_vel_dm_per_second=%u" % z_vel_dm_per_second) - if int(round(yaw/10.0)) == int(round(gpi.hdg/100.0)): + if self.compare_number_percent(gpi.hdg*0.01,yaw,10): self.progress("Yaw match") return True # FIXME: need to be under way to check the velocities, really.... @@ -6076,11 +6248,10 @@ switch value''' def tfp_validate_battery1(self, value): self.progress("validating battery1 (0x%02x)" % value) - BATT_VOLTAGE_LIMIT = 0x1FF - BATT_CURRENT_OFFSET = 9 - BATT_TOTALMAH_LIMIT = 0x7FFF - BATT_TOTALMAH_OFFSET = 17 - voltage = (value & BATT_VOLTAGE_LIMIT)/10.0 + voltage = self.bit_extract(value,0,9) #dV + current = self.bit_extract(value,10,7) * (10^self.bit_extract(value,9,1)) + mah = self.bit_extract(value,17,15) + voltage = value * 0.1 batt = self.mav.recv_match( type='BATTERY_STATUS', blocking=True, @@ -6090,10 +6261,31 @@ switch value''' if batt is None: raise NotAchievedException("Did not get BATTERY_STATUS message") battery_status_value = batt.voltages[0]/1000.0 - self.progress("BATTERY_STATUS==%f frsky==%f" % (battery_status_value, voltage)) + self.progress("BATTERY_STATUS voltage==%f frsky==%f" % (battery_status_value, voltage)) if abs(battery_status_value - voltage) > 0.1: return False + # FIXME: need to check other values as well return True + def tfp_validate_params(self, value): + self.progress("validating params (0x%02x)" % value) + param_id = self.bit_extract(value,24,4) + param_value = self.bit_extract(value,0,24) + if param_id != 1: + return False + frame_type = param_value + hb = self.mav.recv_match( + type='HEARTBEAT', + blocking=True, + timeout=1, + ) + if hb is None: + raise NotAchievedException("Did not get HEARTBEAT message") + hb_type = hb.type + self.progress("HEARTBEAT type==%f frsky==%f" % (hb_type, frame_type)) + if hb_type == frame_type: + return True + # FIXME: need to check other values as well + return False def test_frsky_passthrough(self): self.set_parameter("SERIAL5_PROTOCOL", 10) # serial5 is FRSky passthrough @@ -6170,15 +6362,15 @@ switch value''' # message. These are ordered according to the wfq scheduler wants = { 0x5000: lambda xx : True, - 0x5006: lambda xx : True, - 0x800: lambda xx : True, + 0x5006: self.tfp_validate_attitude, + 0x800: self.tf_validate_gps, 0x5005: self.tfp_validate_vel_and_yaw, - 0x5001: lambda xx : True, - 0x5002: lambda xx : True, - 0x5004: lambda xx : True, - # 0x5008: lambda x : True, # no second battery, so this doesn't arrive + 0x5001: self.tfp_validate_ap_status, + 0x5002: self.tfp_validate_gps_status, + 0x5004: self.tfp_validate_home_status, + #0x5008: lambda x : True, # no second battery, so this doesn't arrive 0x5003: self.tfp_validate_battery1, - 0x5007: lambda xx : True, + 0x5007: self.tfp_validate_params, } tstart = self.get_sim_time_cached() while len(wants): @@ -6197,6 +6389,160 @@ switch value''' self.progress(" Fulfilled") del wants[want] + def tfs_validate_gps_alt(self, value): + self.progress("validating gps altitude integer part (0x%02x)" % value) + alt = value + gpi = self.mav.recv_match( + type='GLOBAL_POSITION_INT', + blocking=True, + timeout=1 + ) + if gpi is None: + raise NotAchievedException("Did not get GLOBAL_POSITION_INT message") + gpi_alt = round(gpi.alt*0.001) + self.progress("GLOBAL_POSITION_INT alt==%f frsky==%f" % (gpi_alt, alt)) + if self.compare_number_percent(gpi_alt,alt,10): + return True + return False + def tfs_validate_baro_alt(self, value): + self.progress("validating baro altitude integer part (0x%02x)" % value) + alt = value + gpi = self.mav.recv_match( + type='GLOBAL_POSITION_INT', + blocking=True, + timeout=1 + ) + if gpi is None: + raise NotAchievedException("Did not get GLOBAL_POSITION_INT message") + gpi_alt = round(gpi.relative_alt) + self.progress("GLOBAL_POSITION_INT relative_alt==%f frsky==%f" % (gpi_alt, alt)) + if abs(gpi_alt -alt) < 10: + return True + return False + def tfs_validate_gps_speed(self, value): + self.progress("validating gps speed integer part (0x%02x)" % value) + speed = value + vfr_hud = self.mav.recv_match( + type='VFR_HUD', + blocking=True, + timeout=1 + ) + if vfr_hud is None: + raise NotAchievedException("Did not get VFR_HUD message") + vfr_hud_speed = round(vfr_hud.groundspeed) + self.progress("VFR_HUD groundspeed==%f frsky==%f" % (vfr_hud_speed, speed)) + if self.compare_number_percent(vfr_hud_speed,speed,10): + return True + return False + def tfs_validate_yaw(self, value): + self.progress("validating yaw (0x%02x)" % value) + yaw = value + vfr_hud = self.mav.recv_match( + type='VFR_HUD', + blocking=True, + timeout=1 + ) + if vfr_hud is None: + raise NotAchievedException("Did not get VFR_HUD message") + vfr_hud_yaw = round(vfr_hud.heading) + self.progress("VFR_HUD heading==%f frsky==%f" % (vfr_hud_yaw, yaw)) + if self.compare_number_percent(vfr_hud_yaw,yaw,10): + return True + return False + def tfs_validate_vspeed(self, value): + self.progress("validating vspeed (0x%02x)" % value) + vspeed = value + vfr_hud = self.mav.recv_match( + type='VFR_HUD', + blocking=True, + timeout=1 + ) + if vfr_hud is None: + raise NotAchievedException("Did not get VFR_HUD message") + vfr_hud_vspeed = round(vfr_hud.climb) + self.progress("VFR_HUD climb==%f frsky==%f" % (vfr_hud_vspeed, vspeed)) + if self.compare_number_percent(vfr_hud_vspeed,vspeed,10): + return True + return False + def tfs_validate_battery1(self, value): + self.progress("validating battery1 (0x%02x)" % value) + voltage = value/10.0 + batt = self.mav.recv_match( + type='BATTERY_STATUS', + blocking=True, + timeout=5, + condition="BATTERY_STATUS.id==0" + ) + if batt is None: + raise NotAchievedException("Did not get BATTERY_STATUS message") + battery_status_value = batt.voltages[0]/1000.0 + self.progress("BATTERY_STATUS volatge==%f frsky==%f" % (battery_status_value, voltage)) + if self.compare_number_percent(battery_status_value,voltage,10): + return True + return False + def tfs_validate_current1(self, value): + self.progress("validating battery1 (0x%02x)" % value) + current = value + batt = self.mav.recv_match( + type='BATTERY_STATUS', + blocking=True, + timeout=5, + condition="BATTERY_STATUS.id==0" + ) + if batt is None: + raise NotAchievedException("Did not get BATTERY_STATUS message") + battery_status_current = batt.current_battery/100.0 + self.progress("BATTERY_STATUS current==%f frsky==%f" % (battery_status_current, current)) + if self.compare_number_percent(battery_status_current,current,10): + return True + return False + def tfs_validate_fuel(self, value): + self.progress("validating fuel (0x%02x)" % value) + fuel = value + batt = self.mav.recv_match( + type='BATTERY_STATUS', + blocking=True, + timeout=5, + condition="BATTERY_STATUS.id==0" + ) + if batt is None: + raise NotAchievedException("Did not get BATTERY_STATUS message") + battery_status_fuel = batt.battery_remaining + self.progress("BATTERY_STATUS fuel==%f frsky==%f" % (battery_status_fuel, fuel)) + if self.compare_number_percent(battery_status_fuel,fuel,10): + return True + return False + + def tfs_validate_tmp1(self, value): + self.progress("validating tmp1 (0x%02x)" % value) + tmp1 = value + heartbeat = self.mav.recv_match( + type='HEARTBEAT', + blocking=True, + timeout=1 + ) + if heartbeat is None: + raise NotAchievedException("Did not get HEARTBEAT message") + heartbeat_tmp1 = heartbeat.custom_mode + self.progress("GLOBAL_POSITION_INT custom_mode==%f frsky==%f" % (heartbeat_tmp1, tmp1)) + if heartbeat_tmp1 == tmp1: + return True + return False + def tfs_validate_tmp2(self, value): + self.progress("validating tmp2 (0x%02x)" % value) + tmp2 = value + gps_raw = self.mav.recv_match( + type='GPS_RAW_INT', + blocking=True, + timeout=1 + ) + if gps_raw is None: + raise NotAchievedException("Did not get GPS_RAW_INT message") + gps_raw_tmp2 = gps_raw.satellites_visible*10 + gps_raw.fix_type + self.progress("GPS_RAW_INT tmp2==%f frsky==%f" % (gps_raw_tmp2, tmp2)) + if gps_raw_tmp2 == tmp2: + return True + return False def test_frsky_sport(self): self.set_parameter("SERIAL5_PROTOCOL", 4) # serial5 is FRSky sport self.customise_SITL_commandline([ @@ -6209,14 +6555,20 @@ switch value''' # This, at least makes sure we're getting some of each # message. wants = { - 0x02: lambda x : True, - 0x04: lambda x : True, - 0x05: lambda x : True, - 0x10: lambda x : True, - 0x21: lambda x : True, - 0x30: lambda x : True, - 0x39: lambda x : True, - 0x800: lambda x : True, + 0x01: self.tfs_validate_gps_alt, # gps altitude integer m + 0x02: self.tfs_validate_tmp1, # Tmp1 + 0x04: self.tfs_validate_fuel, # fuel + 0x05: self.tfs_validate_tmp2, # Tmp2 + 0x09: lambda x : True, # gps altitude decimal cm + 0x10: self.tfs_validate_baro_alt, # baro alt integer m + 0x11: self.tfs_validate_gps_speed, # gps speed integer m/s + 0x14: self.tfs_validate_yaw, # yaw in degrees + 0x19: lambda x : True, # gps speed decimal cm/s + 0x21: lambda x : True, # altitude decimal m + 0x28: self.tfs_validate_current1, # current A + 0x30: self.tfs_validate_vspeed, # vertical speed m/s + 0x39: self.tfs_validate_battery1, # battery 1 voltage + 0x800: self.tf_validate_gps, # gps lat/lon } tstart = self.get_sim_time_cached() last_wanting_print = 0 @@ -6226,7 +6578,7 @@ switch value''' self.progress("Still wanting (%s)" % ",".join([ ("0x%02x" % x) for x in wants.keys()])) last_wanting_print = now wants_copy = copy.copy(wants) - if now - tstart > 10: + if now - tstart > 60: raise AutoTestTimeoutException("Failed to get frsky data") frsky.update() for want in wants_copy: