Signed-off-by: 吴文峰 <kevin@lmve.net>

This commit is contained in:
2026-03-03 22:16:00 +08:00
parent 7ae6e9e999
commit 88d56e1e9e
1660 changed files with 281430 additions and 0 deletions
@@ -0,0 +1,40 @@
#include "concurrency/BinarySemaphoreFreeRTOS.h"
#include "configuration.h"
#include <assert.h>
#ifdef HAS_FREE_RTOS
namespace concurrency
{
BinarySemaphoreFreeRTOS::BinarySemaphoreFreeRTOS() : semaphore(xSemaphoreCreateBinary())
{
assert(semaphore);
}
BinarySemaphoreFreeRTOS::~BinarySemaphoreFreeRTOS()
{
vSemaphoreDelete(semaphore);
}
/**
* Returns false if we were interrupted
*/
bool BinarySemaphoreFreeRTOS::take(uint32_t msec)
{
return xSemaphoreTake(semaphore, pdMS_TO_TICKS(msec));
}
void BinarySemaphoreFreeRTOS::give()
{
xSemaphoreGive(semaphore);
}
IRAM_ATTR void BinarySemaphoreFreeRTOS::giveFromISR(BaseType_t *pxHigherPriorityTaskWoken)
{
xSemaphoreGiveFromISR(semaphore, pxHigherPriorityTaskWoken);
}
} // namespace concurrency
#endif
@@ -0,0 +1,30 @@
#pragma once
#include "../freertosinc.h"
namespace concurrency
{
#ifdef HAS_FREE_RTOS
class BinarySemaphoreFreeRTOS
{
SemaphoreHandle_t semaphore;
public:
BinarySemaphoreFreeRTOS();
~BinarySemaphoreFreeRTOS();
/**
* Returns false if we timed out
*/
bool take(uint32_t msec);
void give();
void giveFromISR(BaseType_t *pxHigherPriorityTaskWoken);
};
#endif
} // namespace concurrency
@@ -0,0 +1,28 @@
#include "concurrency/BinarySemaphorePosix.h"
#include "configuration.h"
#ifndef HAS_FREE_RTOS
namespace concurrency
{
BinarySemaphorePosix::BinarySemaphorePosix() {}
BinarySemaphorePosix::~BinarySemaphorePosix() {}
/**
* Returns false if we timed out
*/
bool BinarySemaphorePosix::take(uint32_t msec)
{
delay(msec); // FIXME
return false;
}
void BinarySemaphorePosix::give() {}
IRAM_ATTR void BinarySemaphorePosix::giveFromISR(BaseType_t *pxHigherPriorityTaskWoken) {}
} // namespace concurrency
#endif
@@ -0,0 +1,30 @@
#pragma once
#include "../freertosinc.h"
namespace concurrency
{
#ifndef HAS_FREE_RTOS
class BinarySemaphorePosix
{
// SemaphoreHandle_t semaphore;
public:
BinarySemaphorePosix();
~BinarySemaphorePosix();
/**
* Returns false if we timed out
*/
bool take(uint32_t msec);
void give();
void giveFromISR(BaseType_t *pxHigherPriorityTaskWoken);
};
#endif
} // namespace concurrency
@@ -0,0 +1,35 @@
#include "concurrency/InterruptableDelay.h"
#include "configuration.h"
namespace concurrency
{
InterruptableDelay::InterruptableDelay() {}
InterruptableDelay::~InterruptableDelay() {}
/**
* Returns false if we were interrupted
*/
bool InterruptableDelay::delay(uint32_t msec)
{
// LOG_DEBUG("delay %u ", msec);
// sem take will return false if we timed out (i.e. were not interrupted)
bool r = semaphore.take(msec);
// LOG_DEBUG("interrupt=%d", r);
return !r;
}
void InterruptableDelay::interrupt()
{
semaphore.give();
}
IRAM_ATTR void InterruptableDelay::interruptFromISR(BaseType_t *pxHigherPriorityTaskWoken)
{
semaphore.giveFromISR(pxHigherPriorityTaskWoken);
}
} // namespace concurrency
@@ -0,0 +1,41 @@
#pragma once
#include "../freertosinc.h"
#ifdef HAS_FREE_RTOS
#include "concurrency/BinarySemaphoreFreeRTOS.h"
#define BinarySemaphore BinarySemaphoreFreeRTOS
#else
#include "concurrency/BinarySemaphorePosix.h"
#define BinarySemaphore BinarySemaphorePosix
#endif
namespace concurrency
{
/**
* An object that provides delay(msec) like functionality, but can be interrupted by calling interrupt().
*
* Useful for they top level loop() delay call to keep the CPU powered down until our next scheduled event or some external event.
*
* This is implemented for FreeRTOS but should be easy to port to other operating systems.
*/
class InterruptableDelay
{
BinarySemaphore semaphore;
public:
InterruptableDelay();
~InterruptableDelay();
/**
* Returns false if we were interrupted
*/
bool delay(uint32_t msec);
void interrupt();
void interruptFromISR(BaseType_t *pxHigherPriorityTaskWoken);
};
} // namespace concurrency
+38
View File
@@ -0,0 +1,38 @@
#include "Lock.h"
#include "configuration.h"
#include <cassert>
namespace concurrency
{
#ifdef HAS_FREE_RTOS
Lock::Lock() : handle(xSemaphoreCreateBinary())
{
assert(handle);
if (xSemaphoreGive(handle) == false) {
abort();
}
}
void Lock::lock()
{
if (xSemaphoreTake(handle, portMAX_DELAY) == false) {
abort();
}
}
void Lock::unlock()
{
if (xSemaphoreGive(handle) == false) {
abort();
}
}
#else
Lock::Lock() {}
void Lock::lock() {}
void Lock::unlock() {}
#endif
} // namespace concurrency
+35
View File
@@ -0,0 +1,35 @@
#pragma once
#include "../freertosinc.h"
namespace concurrency
{
/**
* @brief Simple wrapper around FreeRTOS API for implementing a mutex lock
*/
class Lock
{
public:
Lock();
Lock(const Lock &) = delete;
Lock &operator=(const Lock &) = delete;
/// Locks the lock.
//
// Must not be called from an ISR.
void lock();
// Unlocks the lock.
//
// Must not be called from an ISR.
void unlock();
private:
#ifdef HAS_FREE_RTOS
SemaphoreHandle_t handle;
#endif
};
} // namespace concurrency
@@ -0,0 +1,17 @@
#include "LockGuard.h"
#include "configuration.h"
namespace concurrency
{
LockGuard::LockGuard(Lock *lock) : lock(lock)
{
lock->lock();
}
LockGuard::~LockGuard()
{
lock->unlock();
}
} // namespace concurrency
+24
View File
@@ -0,0 +1,24 @@
#pragma once
#include "Lock.h"
namespace concurrency
{
/**
* @brief RAII lock guard
*/
class LockGuard
{
public:
explicit LockGuard(Lock *lock);
~LockGuard();
LockGuard(const LockGuard &) = delete;
LockGuard &operator=(const LockGuard &) = delete;
private:
Lock *lock;
};
} // namespace concurrency
@@ -0,0 +1,96 @@
#include "NotifiedWorkerThread.h"
#include "configuration.h"
#include "main.h"
namespace concurrency
{
static bool debugNotification;
/**
* Notify this thread so it can run
*/
bool NotifiedWorkerThread::notify(uint32_t v, bool overwrite)
{
bool r = notifyCommon(v, overwrite);
if (r)
mainDelay.interrupt();
return r;
}
/**
* Notify this thread so it can run
*/
IRAM_ATTR bool NotifiedWorkerThread::notifyCommon(uint32_t v, bool overwrite)
{
if (overwrite || notification == 0) {
enabled = true;
setInterval(0); // Run ASAP
runASAP = true;
notification = v;
if (debugNotification) {
LOG_DEBUG("Set notification %d", v);
}
return true;
} else {
if (debugNotification) {
LOG_DEBUG("Drop notification %d", v);
}
return false;
}
}
/**
* Notify from an ISR
*
* This must be inline or IRAM_ATTR on ESP32
*/
IRAM_ATTR bool NotifiedWorkerThread::notifyFromISR(BaseType_t *highPriWoken, uint32_t v, bool overwrite)
{
bool r = notifyCommon(v, overwrite);
if (r)
mainDelay.interruptFromISR(highPriWoken);
return r;
}
/**
* Schedule a notification to fire in delay msecs
*/
bool NotifiedWorkerThread::notifyLater(uint32_t delay, uint32_t v, bool overwrite)
{
bool didIt = notify(v, overwrite);
if (didIt) { // If we didn't already have something queued, override the delay to be larger
setIntervalFromNow(delay); // a new version of setInterval relative to the current time
if (debugNotification) {
LOG_DEBUG("Delay notification %u", delay);
}
}
return didIt;
}
void NotifiedWorkerThread::checkNotification()
{
// Atomically read and clear. (This avoids a potential race condition where an interrupt handler could set a new notification
// after checkNotification reads but before it clears, which would cause us to miss that notification until the next one comes
// in.)
auto n = notification.exchange(0); // read+clear atomically: like `n = notification; notification = 0;` but interrupt-safe
if (n) {
onNotify(n);
}
}
int32_t NotifiedWorkerThread::runOnce()
{
enabled = false; // Only run once per notification
checkNotification();
return RUN_SAME;
}
} // namespace concurrency
@@ -0,0 +1,57 @@
#pragma once
#include "OSThread.h"
#include <atomic>
namespace concurrency
{
/**
* @brief A worker thread that waits on a freertos notification
*/
class NotifiedWorkerThread : public OSThread
{
/**
* The notification that was most recently used to wake the thread. Read from runOnce()
*/
std::atomic<uint32_t> notification{0};
public:
NotifiedWorkerThread(const char *name) : OSThread(name) {}
/**
* Notify this thread so it can run
*/
bool notify(uint32_t v, bool overwrite);
/**
* Notify from an ISR
*
* This must be inline or IRAM_ATTR on ESP32
*/
bool notifyFromISR(BaseType_t *highPriWoken, uint32_t v, bool overwrite);
/**
* Schedule a notification to fire in delay msecs
*/
bool notifyLater(uint32_t delay, uint32_t v, bool overwrite);
protected:
virtual void onNotify(uint32_t notification) = 0;
/// just calls checkNotification()
virtual int32_t runOnce() override;
/// Sometimes we might want to check notifications independently of when our thread was getting woken up (i.e. if we are about
/// to change radio transmit/receive modes we want to handle any pending interrupts first). You can call this method and if
/// any notifications are currently pending they will be handled immediately.
void checkNotification();
private:
/**
* Notify this thread so it can run
*/
bool notifyCommon(uint32_t v, bool overwrite);
};
} // namespace concurrency
+144
View File
@@ -0,0 +1,144 @@
#include "OSThread.h"
#include "configuration.h"
#include "memGet.h"
#include <assert.h>
namespace concurrency
{
/// Show debugging info for disabled threads
bool OSThread::showDisabled;
/// Show debugging info for threads when we run them
bool OSThread::showRun = false;
/// Show debugging info for threads we decide not to run;
bool OSThread::showWaiting = false;
const OSThread *OSThread::currentThread;
ThreadController mainController, timerController;
InterruptableDelay mainDelay;
void OSThread::setup()
{
mainController.ThreadName = "mainController";
timerController.ThreadName = "timerController";
}
OSThread::OSThread(const char *_name, uint32_t period, ThreadController *_controller)
: Thread(NULL, period), controller(_controller)
{
assertIsSetup();
ThreadName = _name;
if (controller) {
bool added = controller->add(this);
assert(added);
}
}
OSThread::~OSThread()
{
if (controller)
controller->remove(this);
}
/**
* Wait a specified number msecs starting from the current time (rather than the last time we were run)
*/
void OSThread::setIntervalFromNow(unsigned long _interval)
{
// Save interval
interval = _interval;
// Cache the next run based on the last_run
_cached_next_run = millis() + interval;
}
bool OSThread::shouldRun(unsigned long time)
{
bool r = Thread::shouldRun(time);
if (showRun && r) {
LOG_DEBUG("Thread %s: run", ThreadName.c_str());
}
if (showWaiting && enabled && !r) {
LOG_DEBUG("Thread %s: wait %lu", ThreadName.c_str(), interval);
}
if (showDisabled && !enabled) {
LOG_DEBUG("Thread %s: disabled", ThreadName.c_str());
}
return r;
}
void OSThread::run()
{
#ifdef DEBUG_HEAP
auto heap = memGet.getFreeHeap();
#endif
currentThread = this;
auto newDelay = runOnce();
#ifdef DEBUG_HEAP
auto newHeap = memGet.getFreeHeap();
if (newHeap < heap)
LOG_HEAP("------ Thread %s leaked heap %d -> %d (%d) ------", ThreadName.c_str(), heap, newHeap, newHeap - heap);
if (heap < newHeap)
LOG_HEAP("++++++ Thread %s freed heap %d -> %d (%d) ++++++", ThreadName.c_str(), heap, newHeap, newHeap - heap);
#endif
#ifdef DEBUG_LOOP_TIMING
LOG_DEBUG("====== Thread next run in: %d", newDelay);
#endif
runned();
if (newDelay >= 0)
setInterval(newDelay);
currentThread = NULL;
}
int32_t OSThread::disable()
{
enabled = false;
setInterval(INT32_MAX);
return INT32_MAX;
}
/**
* This flag is set **only** when setup() starts, to provide a way for us to check for sloppy static constructor calls.
* Call assertIsSetup() to force a crash if someone tries to create an instance too early.
*
* it is super important to never allocate those object statically. instead, you should explicitly
* new them at a point where you are guaranteed that other objects that this instance
* depends on have already been created.
*
* in particular, for OSThread that means "all instances must be declared via new() in setup() or later" -
* this makes it guaranteed that the global mainController is fully constructed first.
*/
bool hasBeenSetup;
void assertIsSetup()
{
/**
* Dear developer comrade - If this assert fails() that means you need to fix the following:
*
* This flag is set **only** when setup() starts, to provide a way for us to check for sloppy static constructor calls.
* Call assertIsSetup() to force a crash if someone tries to create an instance too early.
*
* it is super important to never allocate those object statically. instead, you should explicitly
* new them at a point where you are guaranteed that other objects that this instance
* depends on have already been created.
*
* in particular, for OSThread that means "all instances must be declared via new() in setup() or later" -
* this makes it guaranteed that the global mainController is fully constructed first.
*/
assert(hasBeenSetup);
}
} // namespace concurrency
+91
View File
@@ -0,0 +1,91 @@
#pragma once
#include <cstdlib>
#include <stdint.h>
#include "Thread.h"
#include "ThreadController.h"
#include "concurrency/InterruptableDelay.h"
namespace concurrency
{
extern ThreadController mainController, timerController;
extern InterruptableDelay mainDelay;
#define RUN_SAME -1
/**
* @brief Base threading
*
* This is a pseudo threading layer that is super easy to port, well suited to our slow network and very ram & power efficient.
*
* TODO FIXME @geeksville
*
* move more things into OSThreads
* remove lock/lockguard
*
* move typedQueue into concurrency
* remove freertos from typedqueue
*/
class OSThread : public Thread
{
ThreadController *controller;
/// Show debugging info for disabled threads
static bool showDisabled;
/// Show debugging info for threads when we run them
static bool showRun;
/// Show debugging info for threads we decide not to run;
static bool showWaiting;
public:
/// For debug printing only (might be null)
static const OSThread *currentThread;
OSThread(const char *name, uint32_t period = 0, ThreadController *controller = &mainController);
virtual ~OSThread();
virtual bool shouldRun(unsigned long time);
static void setup();
virtual int32_t disable();
/**
* Wait a specified number msecs starting from the current time (rather than the last time we were run)
*/
void setIntervalFromNow(unsigned long _interval);
protected:
/**
* The method that will be called each time our thread gets a chance to run
*
* Returns desired period for next invocation (or RUN_SAME for no change)
*/
virtual int32_t runOnce() = 0;
bool sleepOnNextExecution = false;
// Do not override this
virtual void run();
};
/**
* This flag is set **only** when setup() starts, to provide a way for us to check for sloppy static constructor calls.
* Call assertIsSetup() to force a crash if someone tries to create an instance too early.
*
* it is super important to never allocate those object statically. instead, you should explicitly
* new them at a point where you are guaranteed that other objects that this instance
* depends on have already been created.
*
* in particular, for OSThread that means "all instances must be declared via new() in setup() or later" -
* this makes it guaranteed that the global mainController is fully constructed first.
*/
extern bool hasBeenSetup;
void assertIsSetup();
} // namespace concurrency
+29
View File
@@ -0,0 +1,29 @@
#pragma once
#include <functional>
#include <utility>
#include "concurrency/OSThread.h"
namespace concurrency
{
/**
* @brief Periodically invoke a callback.
* Supports both legacy function pointers and modern callables.
*/
class Periodic : public OSThread
{
public:
// callback returns the period for the next callback invocation (or 0 if we should no longer be called)
Periodic(const char *name, int32_t (*cb)()) : OSThread(name), callback(cb) {}
Periodic(const char *name, std::function<int32_t()> cb) : OSThread(name), callback(std::move(cb)) {}
protected:
int32_t runOnce() override { return callback ? callback() : 0; }
private:
std::function<int32_t()> callback;
};
} // namespace concurrency