You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
182 lines
6.1 KiB
182 lines
6.1 KiB
#! /usr/bin/env python3 |
|
""" |
|
detectors |
|
""" |
|
from typing import Optional |
|
import numpy as np |
|
from pyulog import ULog |
|
|
|
|
|
class PreconditionError(Exception): |
|
""" |
|
a class for a Precondition Error |
|
""" |
|
|
|
|
|
class Airtime(object): |
|
""" |
|
Airtime struct. |
|
""" |
|
def __init__(self, take_off: float, landing: float): |
|
self.take_off = take_off |
|
self.landing = landing |
|
|
|
|
|
class InAirDetector(object): |
|
""" |
|
handles airtime detection. |
|
""" |
|
|
|
def __init__(self, ulog: ULog, min_flight_time_seconds: float = 0.0, |
|
in_air_margin_seconds: float = 0.0) -> None: |
|
""" |
|
initializes an InAirDetector instance. |
|
:param ulog: |
|
:param min_flight_time_seconds: set this value to return only airtimes that are at least |
|
min_flight_time_seconds long |
|
:param in_air_margin_seconds: removes a margin of in_air_margin_seconds from the airtime |
|
to avoid ground effects. |
|
""" |
|
|
|
self._ulog = ulog |
|
self._min_flight_time_seconds = min_flight_time_seconds |
|
self._in_air_margin_seconds = in_air_margin_seconds |
|
|
|
try: |
|
self._vehicle_land_detected = ulog.get_dataset('vehicle_land_detected').data |
|
self._landed = self._vehicle_land_detected['landed'] |
|
except: |
|
self._in_air = [] |
|
raise PreconditionError( |
|
'InAirDetector: Could not find vehicle land detected message and/or landed field' |
|
' and thus not find any airtime.') |
|
|
|
self._log_start = self._ulog.start_timestamp / 1.0e6 |
|
|
|
self._in_air = self._detect_airtime() |
|
|
|
|
|
def _detect_airtime(self) -> Optional[Airtime]: |
|
""" |
|
detects the airtime take_off and landing of a ulog. |
|
:return: a named tuple of ('Airtime', ['take_off', 'landing']) or None. |
|
""" |
|
|
|
# test whether flight was in air at all |
|
if (self._landed > 0).all(): |
|
print('InAirDetector: always on ground.') |
|
return [] |
|
|
|
# find the indices of all take offs and landings |
|
take_offs = np.where(np.diff(self._landed) < 0)[0].tolist() |
|
landings = np.where(np.diff(self._landed) > 0)[0].tolist() |
|
|
|
# check for start in air. |
|
if len(take_offs) == 0 or ((len(landings) > 0) and (landings[0] < take_offs[0])): |
|
|
|
print('Started in air. Take first timestamp value as start point.') |
|
take_offs = [-1] + take_offs |
|
|
|
# correct for offset: add 1 to take_off list |
|
take_offs = [take_off + 1 for take_off in take_offs] |
|
if len(landings) < len(take_offs): |
|
print('No final landing detected. Assume last timestamp is landing.') |
|
landings += [len(self._landed) - 2] |
|
# correct for offset: add 1 to landing list |
|
landings = [landing + 1 for landing in landings] |
|
|
|
assert len(landings) == len(take_offs), 'InAirDetector: different number of take offs' \ |
|
' and landings.' |
|
|
|
in_air = [] |
|
for take_off, landing in zip(take_offs, landings): |
|
if (self._vehicle_land_detected['timestamp'][landing] / 1e6 - |
|
self._in_air_margin_seconds) - \ |
|
(self._vehicle_land_detected['timestamp'][take_off] / 1e6 + |
|
self._in_air_margin_seconds) >= self._min_flight_time_seconds: |
|
in_air.append(Airtime( |
|
take_off=(self._vehicle_land_detected['timestamp'][take_off] - |
|
self._ulog.start_timestamp) / 1.0e6 + self._in_air_margin_seconds, |
|
landing=(self._vehicle_land_detected['timestamp'][landing] - |
|
self._ulog.start_timestamp) / 1.0e6 - self._in_air_margin_seconds)) |
|
if len(in_air) == 0: |
|
print('InAirDetector: no airtime detected.') |
|
|
|
return in_air |
|
|
|
@property |
|
def airtimes(self): |
|
""" |
|
airtimes |
|
:return: |
|
""" |
|
return self._in_air |
|
|
|
@property |
|
def take_off(self) -> Optional[float]: |
|
""" |
|
first take off |
|
:return: |
|
""" |
|
return self._in_air[0].take_off if self._in_air else None |
|
|
|
@property |
|
def landing(self) -> Optional[float]: |
|
""" |
|
last landing |
|
:return: the last landing of the flight. |
|
""" |
|
return self._in_air[-1].landing if self._in_air else None |
|
|
|
@property |
|
def log_start(self) -> Optional[float]: |
|
""" |
|
log start |
|
:return: the start time of the log. |
|
""" |
|
return self._log_start |
|
|
|
def get_take_off_to_last_landing(self, dataset) -> list: |
|
""" |
|
return all indices of the log file between the first take_off and the |
|
last landing. |
|
:param dataset: |
|
:return: |
|
""" |
|
try: |
|
data = self._ulog.get_dataset(dataset).data |
|
except: |
|
print('InAirDetector: {:s} not found in log.'.format(dataset)) |
|
return [] |
|
|
|
if self._in_air: |
|
airtime = np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >= |
|
self._in_air[0].take_off) & ( |
|
(data['timestamp'] - self._ulog.start_timestamp) / |
|
1.0e6 < self._in_air[-1].landing))[0] |
|
|
|
else: |
|
airtime = [] |
|
|
|
return airtime |
|
|
|
def get_airtime(self, dataset) -> list: |
|
""" |
|
return all indices of the log file that are in air |
|
:param dataset: |
|
:return: |
|
""" |
|
try: |
|
data = self._ulog.get_dataset(dataset).data |
|
except: |
|
raise PreconditionError('InAirDetector: {:s} not found in log.'.format(dataset)) |
|
|
|
airtime = [] |
|
if self._in_air is not None: |
|
for i in range(len(self._in_air)): |
|
airtime.extend(np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >= |
|
self._in_air[i].take_off) & ( |
|
(data['timestamp'] - self._ulog.start_timestamp) / |
|
1.0e6 < self._in_air[i].landing))[0]) |
|
|
|
return airtime
|
|
|