@ -59,10 +59,11 @@ static BlockingList<WorkQueue *> *_wq_manager_wqs_list{nullptr};
@@ -59,10 +59,11 @@ static BlockingList<WorkQueue *> *_wq_manager_wqs_list{nullptr};
// queue of WorkQueues to be created (as threads in the wq manager task)
static BlockingQueue < const wq_config_t * , 1 > * _wq_manager_create_queue { nullptr } ;
static px4 : : atomic_bool _wq_manager_should_exit { fals e} ;
static px4 : : atomic_bool _wq_manager_should_exit { tru e} ;
static WorkQueue * FindWorkQueueByName ( const char * name )
static WorkQueue *
FindWorkQueueByName ( const char * name )
{
if ( _wq_manager_wqs_list = = nullptr ) {
PX4_ERR ( " not running " ) ;
@ -81,7 +82,8 @@ static WorkQueue *FindWorkQueueByName(const char *name)
@@ -81,7 +82,8 @@ static WorkQueue *FindWorkQueueByName(const char *name)
return nullptr ;
}
WorkQueue * WorkQueueFindOrCreate ( const wq_config_t & new_wq )
WorkQueue *
WorkQueueFindOrCreate ( const wq_config_t & new_wq )
{
if ( _wq_manager_create_queue = = nullptr ) {
PX4_ERR ( " not running " ) ;
@ -116,7 +118,8 @@ WorkQueue *WorkQueueFindOrCreate(const wq_config_t &new_wq)
@@ -116,7 +118,8 @@ WorkQueue *WorkQueueFindOrCreate(const wq_config_t &new_wq)
return wq ;
}
const wq_config_t & device_bus_to_wq ( uint32_t device_id_int )
const wq_config_t &
device_bus_to_wq ( uint32_t device_id_int )
{
union device : : Device : : DeviceId device_id ;
device_id . devid = device_id_int ;
@ -155,7 +158,8 @@ const wq_config_t &device_bus_to_wq(uint32_t device_id_int)
@@ -155,7 +158,8 @@ const wq_config_t &device_bus_to_wq(uint32_t device_id_int)
return wq_configurations : : hp_default ;
} ;
static void * WorkQueueRunner ( void * context )
static void *
WorkQueueRunner ( void * context )
{
wq_config_t * config = static_cast < wq_config_t * > ( context ) ;
WorkQueue wq ( * config ) ;
@ -171,7 +175,8 @@ static void *WorkQueueRunner(void *context)
@@ -171,7 +175,8 @@ static void *WorkQueueRunner(void *context)
return nullptr ;
}
static int WorkQueueManagerRun ( int , char * * )
static int
WorkQueueManagerRun ( int , char * * )
{
_wq_manager_wqs_list = new BlockingList < WorkQueue * > ( ) ;
_wq_manager_create_queue = new BlockingQueue < const wq_config_t * , 1 > ( ) ;
@ -232,7 +237,7 @@ static int WorkQueueManagerRun(int, char **)
@@ -232,7 +237,7 @@ static int WorkQueueManagerRun(int, char **)
int ret_create = pthread_create ( & thread , & attr , WorkQueueRunner , ( void * ) wq ) ;
if ( ret_create = = 0 ) {
PX4_INFO ( " crea ting: %s, priority: %d, stack: %zu bytes " , wq - > name , param . sched_priority , stacksize ) ;
PX4_DEBUG ( " star ting: %s, priority: %d, stack: %zu bytes " , wq - > name , param . sched_priority , stacksize ) ;
} else {
PX4_ERR ( " failed to create thread for %s (%i): %s " , wq - > name , ret_create , strerror ( ret_create ) ) ;
@ -250,46 +255,113 @@ static int WorkQueueManagerRun(int, char **)
@@ -250,46 +255,113 @@ static int WorkQueueManagerRun(int, char **)
return 0 ;
}
int WorkQueueManagerStart ( )
int
WorkQueueManagerStart ( )
{
int task_id = px4_task_spawn_cmd ( " wq:manager " ,
SCHED_DEFAULT ,
PX4_WQ_HP_BASE ,
1200 ,
( px4_main_t ) & WorkQueueManagerRun ,
nullptr ) ;
if ( task_id < 0 ) {
PX4_ERR ( " task start failed (%i) " , task_id ) ;
return - errno ;
if ( _wq_manager_should_exit . load ( ) & & ( _wq_manager_create_queue = = nullptr ) ) {
_wq_manager_should_exit . store ( false ) ;
int task_id = px4_task_spawn_cmd ( " wq:manager " ,
SCHED_DEFAULT ,
PX4_WQ_HP_BASE ,
1280 ,
( px4_main_t ) & WorkQueueManagerRun ,
nullptr ) ;
if ( task_id < 0 ) {
_wq_manager_should_exit . store ( true ) ;
PX4_ERR ( " task start failed (%i) " , task_id ) ;
return - errno ;
}
} else {
PX4_WARN ( " already running " ) ;
return PX4_ERROR ;
}
return 0 ;
return PX4_OK ;
}
int WorkQueueManagerStop ( )
int
WorkQueueManagerStop ( )
{
if ( _wq_manager_wqs_list ! = nullptr ) {
auto lg = _wq_manager_wqs_list - > getLockGuard ( ) ;
if ( ! _wq_manager_should_exit . load ( ) ) {
// ask all work queues (threads) to stop
// NOTE: not currently safe without all WorkItems stopping first
for ( WorkQueue * wq : * _wq_manager_wqs_list ) {
wq - > request_stop ( ) ;
// error can't shutdown until all WorkItems are removed/stopped
if ( ( _wq_manager_wqs_list ! = nullptr ) & & ( _wq_manager_wqs_list - > size ( ) > 0 ) ) {
PX4_ERR ( " can't shutdown with active WQs " ) ;
WorkQueueManagerStatus ( ) ;
return PX4_ERROR ;
}
delete _wq_manager_wqs_list ;
// first ask all WQs to stop
if ( _wq_manager_wqs_list ! = nullptr ) {
{
auto lg = _wq_manager_wqs_list - > getLockGuard ( ) ;
// ask all work queues (threads) to stop
// NOTE: not currently safe without all WorkItems stopping first
for ( WorkQueue * wq : * _wq_manager_wqs_list ) {
wq - > request_stop ( ) ;
}
}
// wait until they're all stopped (empty list)
while ( _wq_manager_wqs_list - > size ( ) > 0 ) {
px4_usleep ( 1000 ) ;
}
delete _wq_manager_wqs_list ;
}
_wq_manager_should_exit . store ( true ) ;
if ( _wq_manager_create_queue ! = nullptr ) {
// push nullptr to wake the wq manager task
_wq_manager_create_queue - > push ( nullptr ) ;
px4_usleep ( 10000 ) ;
delete _wq_manager_create_queue ;
}
} else {
PX4_WARN ( " not running " ) ;
return PX4_ERROR ;
}
_wq_manager_should_exit . store ( true ) ;
return PX4_OK ;
}
if ( _wq_manager_create_queue ! = nullptr ) {
// push nullptr to wake the wq manager task
_wq_manager_create_queue - > push ( nullptr ) ;
int
WorkQueueManagerStatus ( )
{
if ( ! _wq_manager_should_exit . load ( ) & & ( _wq_manager_wqs_list ! = nullptr ) ) {
const size_t num_wqs = _wq_manager_wqs_list - > size ( ) ;
PX4_INFO_RAW ( " \n Work Queue: %-1zu threads RATE INTERVAL \n " , num_wqs ) ;
auto lg = _wq_manager_wqs_list - > getLockGuard ( ) ;
size_t i = 0 ;
px4_usleep ( 1000 ) ;
for ( WorkQueue * wq : * _wq_manager_wqs_list ) {
i + + ;
const bool last_wq = ! ( i < num_wqs ) ;
if ( ! last_wq ) {
PX4_INFO_RAW ( " |__ %zu) " , i ) ;
} else {
PX4_INFO_RAW ( " \\ __ %zu) " , i ) ;
}
wq - > print_status ( last_wq ) ;
}
delete _wq_manager_create_queue ;
} else {
PX4_INFO ( " not running " ) ;
}
return PX4_OK ;