VOXL OpenVINS Server 1.0
Visual Inertial Odometry Server for VOXL Platform
Loading...
Searching...
No Matches
CameraQueueFusion.cpp
Go to the documentation of this file.
1/**
2 * @file CameraQueueFusion.cpp
3 * @brief Camera queue fusion system implementation for VOXL OpenVINS
4 * @author Zauberflote
5 * @date 2025
6 * @version 1.0
7 *
8 * This file implements the camera queue fusion system that synchronizes
9 * and combines data from multiple cameras. It provides temporal alignment
10 * and batch processing capabilities for multi-camera VIO systems.
11 *
12 * The implementation provides:
13 * - Multi-camera synchronization using bit masks
14 * - Temporal alignment of camera frames
15 * - Thread-safe queue management
16 * - Event-driven processing with condition variables
17 * - Batch processing capabilities
18 * - Background fusion thread management
19 */
20
21#include "CameraQueueFusion.h"
22#include "CameraManager.h"
23
24/**
25 * @brief Get singleton instance
26 *
27 * Returns the single instance of the CameraQueueFusion, creating it
28 * if it doesn't exist (lazy initialization). This ensures that only
29 * one fusion system exists throughout the application lifecycle.
30 *
31 * @return Reference to the singleton CameraQueueFusion instance
32 */
34{
35 static CameraQueueFusion instance;
36 return instance;
37}
38
39/**
40 * @brief Start the fusion system
41 *
42 * Initializes the fusion system with the specified number of cameras
43 * and starts the background fusion thread.
44 *
45 * The initialization process includes:
46 * - Validating the number of cameras (must be > 0 and <= MAX_CAMERA_COUNT)
47 * - Setting up the expected mask for camera readiness tracking
48 * - Starting the background fusion thread
49 * - Setting the running flag
50 *
51 * The expected mask is calculated as (1 << num_cams) - 1, which creates
52 * a bit mask with the lowest num_cams bits set to 1.
53 *
54 * @param num_cams Number of cameras to synchronize
55 */
56void CameraQueueFusion::start(size_t num_cams)
57{
58 if (running_.exchange(true))
59 return;
60
61 // Validate the number of cameras passed in
62 if (num_cams == 0 || num_cams > MAX_CAMERA_COUNT)
63 {
64 std::cerr << "Invalid number of cameras: " << num_cams << " (max: " << MAX_CAMERA_COUNT << ")" << std::endl;
65 running_.store(false, std::memory_order_release);
66 return;
67 }
68
69 num_cams_ = num_cams;
70 // Expect contiguous camera IDs 0..num_cams-1
71 expected_mask_ = (1u << num_cams_) -1;
72
73 fusion_thread_ = std::thread(&CameraQueueFusion::fusionLoop, this);
74 fusion_thread_.detach();
75}
76
77/**
78 * @brief Mark a camera as ready with new data
79 *
80 * This method is called when a camera has new data available.
81 * It updates the camera ready mask and may trigger fusion processing.
82 *
83 * The method performs the following operations:
84 * - Validates the camera ID to prevent buffer overflow
85 * - Sets the corresponding bit in the camera ready mask
86 * - Notifies the fusion thread that data is available
87 *
88 * The camera ready mask uses a bit field where each bit represents
89 * whether a specific camera has new data available.
90 *
91 * @param cam_id Camera identifier (0-based)
92 */
94{
95 // Bounds check to avoid overflow in the ready mask
96 if (cam_id >= MAX_CAMERA_COUNT)
97 {
98 std::cerr << "Camera ID out of range: " << cam_id
99 << " (max: " << MAX_CAMERA_COUNT - 1 << ")" << std::endl;
100 return;
101 }
102
103 camera_ready_mask_.fetch_or(1u << cam_id);
104
105 // NOW NOTIFYING THE FUSION THREAD
106 cv_.notify_one();
107}
108
109/**
110 * @brief Get sorted batch of camera data
111 *
112 * Retrieves a batch of synchronized camera data that is sorted
113 * by timestamp and filtered by the specified cutoff time.
114 *
115 * The method performs the following operations:
116 * - Locks the fusion mutex for thread-safe access
117 * - Checks if fused frames are available
118 * - Finds frames with timestamps greater than the cutoff
119 * - Moves qualifying frames to the output vector
120 * - Removes processed frames from the internal queue
121 *
122 * @param timestamp_cutoff Timestamp cutoff for data inclusion
123 * @param out Output vector to store the sorted camera data
124 * @return true if data was retrieved, false if no data available
125 */
126bool CameraQueueFusion::getSortedBatch(double timestamp_cutoff, std::vector<ov_core::CameraData> &out)
127{
128 std::lock_guard<std::mutex> lock(fusion_mutex_);
129
130 if (fused_frames_.empty())
131 {
132 return false;
133 }
134
135 // Find the first frame that has a timestamp greater than the cutoff
136 auto it = std::find_if(fused_frames_.begin(), fused_frames_.end(),
137 [timestamp_cutoff](const auto &frame)
138 {
139 return frame.timestamp > timestamp_cutoff;
140 });
141
142 // If all frames are before the cutoff, return false
143 if (it == fused_frames_.begin())
144 {
145 return false;
146 }
147
148 // Move all frames before the cutoff to the output vector
149 out.insert(out.end(),
150 std::make_move_iterator(fused_frames_.begin()),
151 std::make_move_iterator(it));
152
153 // Remove the moved frames from the queue
154 fused_frames_.erase(fused_frames_.begin(), it);
155
156 return !out.empty();
157}
158
159/**
160 * @brief Main fusion loop
161 *
162 * Background thread function that continuously processes camera data
163 * and performs temporal synchronization and fusion.
164 *
165 * The fusion loop performs the following operations:
166 * - Waits for CameraManager initialization
167 * - Waits for all cameras to have data available (with timeout)
168 * - Collects data from all cameras
169 * - Sorts data by timestamp
170 * - Merges data with identical timestamps
171 * - Stores fused data in the internal queue
172 * - Resets the camera ready mask
173 *
174 * The loop runs at approximately 50Hz (20ms period) and uses
175 * condition variables for efficient waiting and notification.
176 */
177void CameraQueueFusion::fusionLoop()
178{
179 using namespace std::chrono;
180 const auto timeout = 1us;
181 const auto init_check_interval = 50us;
182 const auto kMinLoopPeriod = std::chrono::duration<double, std::milli>(fusion_rate_dt_ms); // try to run at 50Hz could be 34ms for 30Hz
183
184 do
185 {
186 // First check if CameraManager is initialized
187 if (!voxl::CameraManager::getInstance().isInitialized())
188 {
189 if (en_debug)
190 {
191 std::cerr << "Camera fusion waiting for CameraManager initialization..." << std::endl;
192 }
193 std::this_thread::sleep_for(init_check_interval);
194 continue;
195 }
196 // auto start = steady_clock::now(); // Unused variable - commented out
197
198 // while ((camera_ready_mask_.load() & expected_mask_) != expected_mask_) {
199 // if (steady_clock::now() - start >= timeout) break;
200 // std::this_thread::yield();
201 // }
202
203 // WAIT UNTIL ALL CAMERAS HAVE PRODUCED DATA OR WE HIT THE TIMEOUT RATE BASED ON THE LOOP PERIOD
204 std::unique_lock<std::mutex> lk(cv_mtx_);
205 bool all_cameras_ready = cv_.wait_for(lk, kMinLoopPeriod, [this]
206 { return ((camera_ready_mask_.load() & expected_mask_) == expected_mask_) || !main_running; });
207 lk.unlock();
208
209 // If duration has elapsed, run the thread regardless of expected_mask
210 // The wait_for returns false if timeout expired, true if predicate became true
211 if (!all_cameras_ready && main_running)
212 {
213 // Timeout occurred - process whatever cameras are ready
214 if (en_debug)
215 {
216 std::cerr << "Camera fusion timeout - processing available cameras (ready mask: 0x"
217 << std::hex << camera_ready_mask_.load()
218 << ", expected: 0x" << expected_mask_ << std::dec << ")" << std::endl;
219 }
220 }
221
222 std::vector<ov_core::CameraData> batch;
223
224 for (const auto &cam : voxl::CameraManager::getInstance().getAllCameras())
225 {
226 ov_core::CameraData msg;
227 while (cam->popCameraData(msg))
228 {
229 batch.push_back(std::move(msg));
230 }
231 }
232
233 if (!batch.empty())
234 {
235 // Sort by timestamp first --> could use OPENVINS operator< instead
236 std::sort(batch.begin(), batch.end(),
237 [](const auto &a, const auto &b)
238 {
239 return a.timestamp < b.timestamp;
240 // return a < b; // uses CameraData::operator<
241 });
242
243 // Merge entries that share the same timestamp
244 std::vector<ov_core::CameraData> merged;
245 merged.reserve(batch.size());
246
247 for (auto &msg : batch)
248 {
249 if (!merged.empty() && msg.timestamp == merged.back().timestamp)
250 {
251 // Append sensor ids, images, and masks
252 merged.back().sensor_ids.insert(merged.back().sensor_ids.end(),
253 msg.sensor_ids.begin(), msg.sensor_ids.end());
254 merged.back().images.insert(merged.back().images.end(),
255 msg.images.begin(), msg.images.end());
256 merged.back().masks.insert(merged.back().masks.end(),
257 msg.masks.begin(), msg.masks.end());
258 merged.back().img_frames.insert(merged.back().img_frames.end(),
259 msg.img_frames.begin(), msg.img_frames.end());
260 }
261 else
262 {
263 merged.push_back(std::move(msg));
264 }
265 }
266
267 std::lock_guard<std::mutex> lock(fusion_mutex_);
268 fused_frames_.insert(fused_frames_.end(),
269 std::make_move_iterator(merged.begin()),
270 std::make_move_iterator(merged.end()));
271 }
272
273 camera_ready_mask_.store(0, std::memory_order_release);
274 } while (main_running);
275}
Camera management system for VOXL OpenVINS.
Camera queue fusion system for VOXL OpenVINS.
constexpr size_t MAX_CAMERA_COUNT
Maximum number of cameras supported.
float fusion_rate_dt_ms
Fusion rate in milliseconds.
Definition VoxlVars.cpp:217
volatile int main_running
Main process running flag.
Definition VoxlVars.cpp:38
int en_debug
Enable debug output.
Definition VoxlVars.cpp:148
Camera queue fusion system for multi-camera synchronization.
static CameraQueueFusion & getInstance()
Get singleton instance.
void start(size_t num_cams)
Start the fusion system.
void markCameraReady(size_t cam_id)
Mark a camera as ready with new data.
bool getSortedBatch(double timestamp_cutoff, std::vector< ov_core::CameraData > &out)
Get sorted batch of camera data.
static CameraManager & getInstance()
Get the singleton instance of the CameraManager.
Main namespace for VOXL OpenVINS server components.