You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
182 lines
6.1 KiB
182 lines
6.1 KiB
6 years ago
|
#! /usr/bin/env python3
|
||
|
"""
|
||
|
Detectors operating on ULog data, e.g. in-air (airtime) detection.
|
||
|
"""
|
||
|
from typing import Optional
|
||
|
import numpy as np
|
||
|
from pyulog import ULog
|
||
|
|
||
|
|
||
|
class PreconditionError(Exception):
    """
    Raised when data a detector requires is not available in the log.
    """
|
||
|
|
||
|
|
||
|
class Airtime(object):
    """
    Airtime struct: one in-air interval described by its two end points.

    :param take_off: time of the take off.
    :param landing: time of the landing.
    """

    def __init__(self, take_off: float, landing: float):
        self.take_off = take_off
        self.landing = landing

    def __repr__(self) -> str:
        return 'Airtime(take_off={!r}, landing={!r})'.format(self.take_off, self.landing)
|
||
|
|
||
|
|
||
|
class InAirDetector(object):
    """
    handles airtime detection.

    The detector reads the ``landed`` field of the ``vehicle_land_detected``
    dataset of a ulog and derives from it a list of in-air intervals
    (take off / landing pairs), expressed in seconds relative to the start
    timestamp of the log.
    """

    def __init__(self, ulog: 'ULog', min_flight_time_seconds: float = 0.0,
                 in_air_margin_seconds: float = 0.0) -> None:
        """
        initializes an InAirDetector instance.

        :param ulog: the log to analyse; must provide ``get_dataset`` and
            ``start_timestamp``.
        :param min_flight_time_seconds: set this value to return only airtimes that are at least
            min_flight_time_seconds long
        :param in_air_margin_seconds: removes a margin of in_air_margin_seconds from the airtime
            to avoid ground effects.
        :raises PreconditionError: if the vehicle_land_detected dataset or its
            landed field cannot be read from the log.
        """
        self._ulog = ulog
        self._min_flight_time_seconds = min_flight_time_seconds
        self._in_air_margin_seconds = in_air_margin_seconds

        try:
            self._vehicle_land_detected = ulog.get_dataset('vehicle_land_detected').data
            self._landed = self._vehicle_land_detected['landed']
        except Exception as err:
            # Only ordinary errors are translated; KeyboardInterrupt and
            # SystemExit are deliberately left to propagate.
            self._in_air = []
            raise PreconditionError(
                'InAirDetector: Could not find vehicle land detected message and/or landed field'
                ' and thus not find any airtime.') from err

        # start_timestamp is divided by 1e6 to obtain seconds.
        self._log_start = self._ulog.start_timestamp / 1.0e6

        self._in_air = self._detect_airtime()

    def _detect_airtime(self) -> list:
        """
        detects the airtime take_off and landing of a ulog.

        :return: a list of Airtime objects (take_off and landing in seconds
            relative to the log start); empty if no airtime was detected.
        """
        # test whether flight was in air at all
        if (self._landed > 0).all():
            print('InAirDetector: always on ground.')
            return []

        # find the indices of all take offs and landings: a falling edge of
        # the landed signal is a take off, a rising edge is a landing.
        landed_changes = np.diff(self._landed)
        take_offs = np.where(landed_changes < 0)[0].tolist()
        landings = np.where(landed_changes > 0)[0].tolist()

        # check for start in air.
        if not take_offs or (landings and landings[0] < take_offs[0]):
            print('Started in air. Take first timestamp value as start point.')
            # -1 becomes index 0 after the offset correction below.
            take_offs = [-1] + take_offs

        # correct for offset: np.diff index i describes the change between
        # samples i and i + 1, so add 1 to take_off list
        take_offs = [take_off + 1 for take_off in take_offs]

        if len(landings) < len(take_offs):
            print('No final landing detected. Assume last timestamp is landing.')
            # len - 2 becomes the last sample index after the offset correction.
            landings.append(len(self._landed) - 2)
        # correct for offset: add 1 to landing list
        landings = [landing + 1 for landing in landings]

        # Explicit raise (instead of an assert statement) so that the check
        # is not stripped when Python runs with -O.
        if len(landings) != len(take_offs):
            raise AssertionError('InAirDetector: different number of take offs'
                                 ' and landings.')

        timestamps = self._vehicle_land_detected['timestamp']
        start_timestamp = self._ulog.start_timestamp
        margin = self._in_air_margin_seconds

        in_air = []
        for take_off, landing in zip(take_offs, landings):
            # duration of the interval after the margin has been removed
            # from both of its ends.
            duration = (timestamps[landing] / 1e6 - margin) - \
                (timestamps[take_off] / 1e6 + margin)
            if duration >= self._min_flight_time_seconds:
                in_air.append(Airtime(
                    take_off=(timestamps[take_off] - start_timestamp) / 1.0e6 + margin,
                    landing=(timestamps[landing] - start_timestamp) / 1.0e6 - margin))

        if not in_air:
            print('InAirDetector: no airtime detected.')

        return in_air

    @property
    def airtimes(self):
        """
        airtimes
        :return: the list of all detected airtimes.
        """
        return self._in_air

    @property
    def take_off(self) -> Optional[float]:
        """
        first take off
        :return: the take off of the first airtime, or None if there is no airtime.
        """
        return self._in_air[0].take_off if self._in_air else None

    @property
    def landing(self) -> Optional[float]:
        """
        last landing
        :return: the last landing of the flight, or None if there is no airtime.
        """
        return self._in_air[-1].landing if self._in_air else None

    @property
    def log_start(self) -> Optional[float]:
        """
        log start
        :return: the start time of the log.
        """
        return self._log_start

    def get_take_off_to_last_landing(self, dataset) -> list:
        """
        return all indices of the log file between the first take_off and the
        last landing.
        :param dataset: name of the dataset to look up in the log.
        :return: the indices of all samples of the dataset whose time lies in
            [first take_off, last landing); an empty list if the dataset is
            not found in the log or if there is no airtime.
        """
        try:
            data = self._ulog.get_dataset(dataset).data
        except Exception:
            print('InAirDetector: {:s} not found in log.'.format(dataset))
            return []

        if not self._in_air:
            return []

        # sample times in seconds relative to the log start
        relative_time = (data['timestamp'] - self._ulog.start_timestamp) / 1.0e6
        return np.where((relative_time >= self._in_air[0].take_off) &
                        (relative_time < self._in_air[-1].landing))[0]

    def get_airtime(self, dataset) -> list:
        """
        return all indices of the log file that are in air
        :param dataset: name of the dataset to look up in the log.
        :return: the indices of all samples of the dataset whose time lies
            within one of the detected airtimes.
        :raises PreconditionError: if the dataset is not found in the log.
        """
        try:
            data = self._ulog.get_dataset(dataset).data
        except Exception as err:
            raise PreconditionError(
                'InAirDetector: {:s} not found in log.'.format(dataset)) from err

        airtime = []
        if self._in_air:
            # sample times in seconds relative to the log start; computed
            # once because they are the same for every interval.
            relative_time = (data['timestamp'] - self._ulog.start_timestamp) / 1.0e6
            for interval in self._in_air:
                airtime.extend(np.where((relative_time >= interval.take_off) &
                                        (relative_time < interval.landing))[0])

        return airtime
|