从话题列表到实际应用:手把手教你用MAVROS读取Pixhawk的IMU和GPS数据(树莓派4B实战)
2026/5/4 22:41:08 网站建设 项目流程

从话题列表到实际应用:手把手教你用MAVROS读取Pixhawk的IMU和GPS数据(树莓派4B实战)

当树莓派4B与Pixhawk通过MAVROS建立通信后,开发者常面临一个现实问题:如何将飞控传感器产生的海量ROS话题数据转化为可操作的开发资源?本文将以/mavros/imu/data/mavros/global_position/global两个核心话题为例,演示从数据订阅到可视化分析的全流程实战。

1. MAVROS话题解析基础

在ROS生态中,MAVROS将Pixhawk的传感器数据转换为标准ROS消息类型。理解消息结构是二次开发的前提:

# IMU消息结构示例 (sensor_msgs/Imu) Header header # 时间戳和坐标系 geometry_msgs/Quaternion orientation # 四元数姿态 float64[9] orientation_covariance # 姿态协方差 geometry_msgs/Vector3 angular_velocity # 角速度(rad/s) float64[9] angular_velocity_covariance geometry_msgs/Vector3 linear_acceleration # 加速度(m/s²) float64[9] linear_acceleration_covariance # GPS消息结构示例 (sensor_msgs/NavSatFix) Header header float64 latitude # 纬度(度) float64 longitude # 经度(度) float64 altitude # 海拔高度(m) uint8 status # 定位状态 float64[9] position_covariance # 位置协方差

注意:实际开发中建议使用rostopic type [话题名]查看完整消息定义,不同固件版本可能存在字段差异。

2. Python订阅与数据处理实战

2.1 搭建基础订阅节点

以下代码演示如何同时订阅IMU和GPS数据:

#!/usr/bin/env python3 import rospy from sensor_msgs.msg import Imu, NavSatFix class SensorSubscriber: def __init__(self): rospy.init_node('mavros_sensor_subscriber', anonymous=True) # 创建数据缓存字典 self.sensor_data = { 'imu': None, 'gps': None } # 订阅IMU数据 rospy.Subscriber( "/mavros/imu/data", Imu, self.imu_callback, queue_size=10 ) # 订阅GPS数据 rospy.Subscriber( "/mavros/global_position/global", NavSatFix, self.gps_callback, queue_size=10 ) def imu_callback(self, msg): """处理IMU数据回调""" self.sensor_data['imu'] = { 'timestamp': msg.header.stamp.to_sec(), 'accel_x': msg.linear_acceleration.x, 'accel_y': msg.linear_acceleration.y, 'accel_z': msg.linear_acceleration.z, 'gyro_x': msg.angular_velocity.x, 'gyro_y': msg.angular_velocity.y, 'gyro_z': msg.angular_velocity.z } def gps_callback(self, msg): """处理GPS数据回调""" self.sensor_data['gps'] = { 'timestamp': msg.header.stamp.to_sec(), 'latitude': msg.latitude, 'longitude': msg.longitude, 'altitude': msg.altitude, 'status': msg.status.status } if __name__ == '__main__': node = SensorSubscriber() rospy.spin()

2.2 数据同步技巧

由于IMU和GPS的发布频率不同(通常IMU 50Hz,GPS 5-10Hz),需要特殊处理时间同步:

from collections import deque import numpy as np class DataSynchronizer: def __init__(self, max_gap=0.1): self.imu_queue = deque(maxlen=100) self.gps_queue = deque(maxlen=10) self.max_time_gap = max_gap # 最大允许时间差(秒) def add_imu(self, data): self.imu_queue.append(data) def add_gps(self, data): self.gps_queue.append(data) def get_synced_data(self): """获取时间对齐的传感器数据""" if not self.imu_queue or not self.gps_queue: return None # 获取最新GPS数据 latest_gps = self.gps_queue[-1] # 在IMU队列中寻找时间最接近的样本 time_diffs = [ abs(imu['timestamp'] - latest_gps['timestamp']) for imu in self.imu_queue ] min_idx = np.argmin(time_diffs) if time_diffs[min_idx] <= self.max_time_gap: return { 'gps': latest_gps, 'imu': self.imu_queue[min_idx], 'time_diff': time_diffs[min_idx] } return None

3. 数据可视化方案

3.1 实时曲线绘制

使用PyQtGraph实现高性能实时可视化:

import pyqtgraph as pg from pyqtgraph.Qt import QtGui class RealTimePlot: def __init__(self): self.app = QtGui.QApplication([]) self.win = pg.GraphicsLayoutWidget(title="Sensor Data") # 创建加速度子图 self.accel_plot = self.win.addPlot(title="Acceleration") self.accel_plot.addLegend() self.accel_curves = { 'x': self.accel_plot.plot(pen='r', name='X'), 'y': self.accel_plot.plot(pen='g', name='Y'), 'z': self.accel_plot.plot(pen='b', name='Z') } # 创建角速度子图 self.win.nextRow() self.gyro_plot = self.win.addPlot(title="Angular Velocity") self.gyro_plot.addLegend() self.gyro_curves = { 'x': self.gyro_plot.plot(pen='r', name='X'), 'y': self.gyro_plot.plot(pen='g', name='Y'), 'z': self.gyro_plot.plot(pen='b', name='Z') } # 数据缓冲区 self.time_data = [] self.accel_data = {'x': [], 'y': [], 'z': []} self.gyro_data = {'x': [], 'y': [], 'z': []} self.max_points = 200 def update(self, sensor_data): """更新绘图数据""" timestamp = sensor_data['imu']['timestamp'] # 维护固定长度的数据队列 if len(self.time_data) >= self.max_points: self.time_data.pop(0) for axis in ['x', 'y', 'z']: self.accel_data[axis].pop(0) self.gyro_data[axis].pop(0) self.time_data.append(timestamp) for axis in ['x', 'y', 'z']: self.accel_data[axis].append( sensor_data['imu'][f'accel_{axis}'] ) self.gyro_data[axis].append( sensor_data['imu'][f'gyro_{axis}'] ) # 更新曲线 for axis, curve in self.accel_curves.items(): curve.setData(self.time_data, self.accel_data[axis]) for axis, curve in self.gyro_curves.items(): curve.setData(self.time_data, self.gyro_data[axis]) def run(self): self.win.show() QtGui.QApplication.instance().exec_()

3.2 地理信息可视化

使用Folium库创建交互式地图:

import folium from folium.plugins import TimestampedGeoJson def create_gps_trajectory(gps_records): """生成带时间戳的轨迹地图""" features = [] for record in gps_records: features.append({ 'type': 'Feature', 'geometry': { 'type': 'Point', 'coordinates': [record['longitude'], record['latitude']] }, 'properties': { 'time': record['timestamp'], 'altitude': record['altitude'], 'style': {'color': 'red'}, 'icon': 'circle', 'iconstyle': { 'fillColor': 'red', 'fillOpacity': 0.8, 'stroke': 'false', 'radius': 5 } } }) m = folium.Map( location=[gps_records[0]['latitude'], gps_records[0]['longitude']], zoom_start=15 ) TimestampedGeoJson( {'type': 'FeatureCollection', 'features': features}, period='PT1M', add_last_point=True, auto_play=False, loop=False, max_speed=1, loop_button=True, date_options='YYYY/MM/DD HH:mm:ss', time_slider_drag_update=True ).add_to(m) return m

4. 数据持久化方案

4.1 ROS Bag高效存储

# 录制指定话题 rosbag record -O sensor_data.bag /mavros/imu/data /mavros/global_position/global # Python代码控制录制 import rosbag with rosbag.Bag('sensor_data.bag', 'w') as bag: def imu_callback(msg): bag.write('/mavros/imu/data', msg) def gps_callback(msg): bag.write('/mavros/global_position/global', msg) rospy.Subscriber("/mavros/imu/data", Imu, imu_callback) rospy.Subscriber("/mavros/global_position/global", NavSatFix, gps_callback) rospy.spin()

4.2 CSV格式存储

import csv from datetime import datetime class CSVSaver: def __init__(self, filename_prefix='sensor_data'): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.imu_file = f"{filename_prefix}_imu_{timestamp}.csv" self.gps_file = f"{filename_prefix}_gps_{timestamp}.csv" # 初始化IMU CSV文件 with open(self.imu_file, 'w') as f: writer = csv.writer(f) writer.writerow([ 'timestamp', 'accel_x', 'accel_y', 'accel_z', 'gyro_x', 'gyro_y', 'gyro_z' ]) # 初始化GPS CSV文件 with open(self.gps_file, 'w') as f: writer = csv.writer(f) writer.writerow([ 'timestamp', 'latitude', 'longitude', 'altitude', 'status' ]) def save_imu(self, data): with open(self.imu_file, 'a') as f: writer = csv.writer(f) writer.writerow([ data['timestamp'], data['accel_x'], data['accel_y'], data['accel_z'], data['gyro_x'], data['gyro_y'], data['gyro_z'] ]) def save_gps(self, data): with open(self.gps_file, 'a') as f: writer = csv.writer(f) writer.writerow([ data['timestamp'], data['latitude'], data['longitude'], data['altitude'], data['status'] ])

5. 典型应用场景实现

5.1 异常运动检测

class MotionAnalyzer: def __init__(self, accel_threshold=15.0, gyro_threshold=5.0): self.accel_threshold = accel_threshold # m/s² self.gyro_threshold = gyro_threshold # rad/s self.alert_count = 0 def check_abnormal(self, imu_data): # 计算加速度幅值 accel_magnitude = np.sqrt( imu_data['accel_x']**2 + imu_data['accel_y']**2 + imu_data['accel_z']**2 ) # 计算角速度幅值 gyro_magnitude = np.sqrt( imu_data['gyro_x']**2 + imu_data['gyro_y']**2 + imu_data['gyro_z']**2 ) if (accel_magnitude > self.accel_threshold or gyro_magnitude > self.gyro_threshold): self.alert_count += 1 return True return False def generate_report(self): severity = "Warning" if self.alert_count < 5 else "Critical" return { "alert_count": self.alert_count, "severity": severity, "last_check": time.time() }

5.2 轨迹回放系统

import pygame from pygame.locals import * class TrajectoryPlayer: def __init__(self, gps_records, map_image=None): pygame.init() self.screen = pygame.display.set_mode((800, 600)) self.clock = pygame.time.Clock() self.gps_records = sorted(gps_records, key=lambda x: x['timestamp']) self.current_index = 0 self.playing = False self.font = pygame.font.SysFont('Arial', 20) # 坐标归一化 lats = [r['latitude'] for r in self.gps_records] lons = [r['longitude'] for r in self.gps_records] self.min_lat, self.max_lat = min(lats), max(lats) self.min_lon, self.max_lon = min(lons), max(lons) def latlon_to_screen(self, lat, lon): """将经纬度坐标转换为屏幕坐标""" x = int(800 * (lon - self.min_lon) / (self.max_lon - self.min_lon)) y = int(600 * (1 - (lat - self.min_lat) / (self.max_lat - self.min_lat))) return x, y def run(self): running = True while running: for event in pygame.event.get(): if event.type == QUIT: running = False elif event.type == KEYDOWN: if event.key == K_SPACE: self.playing = not self.playing self.screen.fill((255, 255, 255)) # 绘制轨迹线 points = [ self.latlon_to_screen(r['latitude'], r['longitude']) for r in self.gps_records[:self.current_index+1] ] if len(points) > 1: pygame.draw.lines(self.screen, (0, 0, 255), False, points, 2) # 绘制当前位置 if self.current_index < len(self.gps_records): current_pos = self.latlon_to_screen( self.gps_records[self.current_index]['latitude'], self.gps_records[self.current_index]['longitude'] ) pygame.draw.circle(self.screen, (255, 0, 0), current_pos, 5) # 显示信息 info = self.font.render( f"Time: {self.gps_records[self.current_index]['timestamp']} | " f"Alt: {self.gps_records[self.current_index]['altitude']:.1f}m", True, (0, 0, 0) ) self.screen.blit(info, (10, 10)) if self.playing and self.current_index < len(self.gps_records)-1: self.current_index += 1 pygame.display.flip() self.clock.tick(30) pygame.quit()

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询