Building Robot Communication Networks with SwarmTalk
The Foundation of Robot Teamwork
You’ve learned to plan paths (TrajGenPy) and coordinate tasks (TrajAllocPy), but there’s one crucial piece missing: communication. How do robots actually talk to each other in the field? Meet SwarmTalk - your solution for reliable, low-cost robot-to-robot communication using ESP32 modules and ESP-NOW protocol.
Why Communication Matters
Imagine your robot team in action:
- Robot A discovers an obstacle and needs to update the team
- Robot B finds the target and wants to redirect others
- Robot C has low battery and requests backup
Without communication, each robot operates in isolation. With SwarmTalk, they become a truly coordinated team.
What Makes SwarmTalk Special
Traditional Approaches vs. SwarmTalk
Traditional Radio Systems:
- Expensive ($500-2000 per unit)
- Complex licensing requirements
- Heavy and power-hungry
- Centralized architecture
SwarmTalk:
- Low-cost ESP32 modules ($5-15 each)
- Uses license-free 2.4GHz Wi-Fi spectrum
- Lightweight and efficient
- Fully decentralized (no base station needed)
Key Advantages
- 🔄 Decentralized: No single point of failure
- 📡 Ad-hoc: Robots join/leave dynamically
- 🤖 ROS2 Native: Seamless integration with robot software
- 💰 Affordable: Democratizes swarm robotics
- ⚡ High-speed: Up to 1490 bytes per message
- 🔧 Plug-and-play: Minimal configuration required
System Architecture Overview
SwarmTalk consists of three main components:
[ROS2 Node] ←→ [Serial/USB] ←→ [ESP32 + ESP-NOW] ~~~~ [Other Robots]
- ROS2 Driver Node: Handles message routing and topics
- Serial Communication: USB connection to ESP32
- ESP32 Firmware: Wireless communication using ESP-NOW
Hardware Setup
What You’ll Need
Per Robot:
- ESP32 module (C3, C6, or S3 variants supported)
- USB cable for connection to onboard computer
- Optional: External antenna for extended range
Recommended ESP32 Modules:
- ESP32-S3: Best performance, external antenna support
- ESP32-C6: Good balance of features and cost
- ESP32-C3: Most affordable option
Connection Diagram
Robot Computer (Raspberry Pi/Jetson)
|
USB Cable
|
ESP32 Module
|
ESP-NOW Radio ~~~~ Other ESP32 Modules
Software Installation
Step 1: Install Dependencies
# Install ROS2 Humble (if not already installed)
sudo apt update
sudo apt install ros-humble-desktop
# Install SwarmTalk
git clone https://github.com/kasperg3/swarmtalk.git
cd swarmtalk
# Install Python dependencies
pip install pyserial trajallocpy shapely matplotlib numpy geojson
Step 2: Build ROS2 Packages
# Source ROS2 environment
source /opt/ros/humble/setup.bash
# Build SwarmTalk packages
colcon build --symlink-install --packages-skip swarmtalk_firmware
source install/setup.bash
Step 3: Flash ESP32 Firmware
Option A: Web-based Installer (Easiest)
- Connect ESP32 via USB
- Open SwarmTalk web installer in Chrome/Edge
- Select your ESP32 model and flash
Option B: Command Line
cd firmware
idf.py -p /dev/ttyUSB0 build flash monitor
Basic Communication Setup
Configuration Files
Each robot needs its own configuration. Here’s the setup for a 3-robot team:
Robot 0 Config (config/config_agent0.yaml
):
driver:
ros__parameters:
port: '/dev/ttyACM0' # USB port for ESP32
planner:
ros__parameters:
n_agents: 3 # Total team size
agent_id: 0 # This robot's ID
capacity: 3000 # Task capacity
Robot 1 Config (config/config_agent1.yaml
):
driver:
ros__parameters:
port: '/dev/ttyACM1' # Different USB port
planner:
ros__parameters:
n_agents: 3
agent_id: 1 # Different ID
capacity: 3000
Launching the System
Terminal 1 - Robot 0:
# Launch communication driver
source install/setup.bash
ros2 run communication driver --ros-args --params-file config/config_agent0.yaml
Terminal 2 - Robot 0:
# Launch planning node
source install/setup.bash
ros2 run planning planner --ros-args --params-file config/config_agent0.yaml
Repeat for each robot with their respective config files.
Your First Communication Test
Send a Simple Message
import rclpy
from rclpy.node import Node
from common_interface.msg import ByteArray
class TestSender(Node):
def __init__(self):
super().__init__('test_sender')
self.publisher = self.create_publisher(
ByteArray,
'/communication/broadcast',
10
)
# Send test message every 2 seconds
self.timer = self.create_timer(2.0, self.send_message)
self.counter = 0
def send_message(self):
message_text = f"Hello from Robot! Count: {self.counter}"
message_bytes = message_text.encode('utf-8')
# Convert to ByteArray message format
msg = ByteArray()
msg.bytes = [bytes([b]) for b in message_bytes]
self.publisher.publish(msg)
self.get_logger().info(f'Sent: {message_text}')
self.counter += 1
# Run the sender
rclpy.init()
sender = TestSender()
rclpy.spin(sender)
Receive Messages
import rclpy
from rclpy.node import Node
from common_interface.msg import ByteArray
class TestReceiver(Node):
def __init__(self):
super().__init__('test_receiver')
self.subscription = self.create_subscription(
ByteArray,
'/communication/message',
self.message_callback,
10
)
def message_callback(self, msg):
# Convert ByteArray back to string
message_bytes = b''.join(msg.bytes)
message_text = message_bytes.decode('utf-8')
self.get_logger().info(f'Received: {message_text}')
# Run the receiver
rclpy.init()
receiver = TestReceiver()
rclpy.spin(receiver)
Real-World Communication Patterns
1. Heartbeat System
Keep track of team status:
class HeartbeatManager(Node):
def __init__(self, robot_id):
super().__init__(f'heartbeat_{robot_id}')
self.robot_id = robot_id
self.last_seen = {}
# Send heartbeat every second
self.create_timer(1.0, self.send_heartbeat)
# Check for lost robots every 5 seconds
self.create_timer(5.0, self.check_team_status)
def send_heartbeat(self):
message = f"HEARTBEAT:{self.robot_id}:{time.time()}"
self.broadcast_message(message.encode())
def check_team_status(self):
current_time = time.time()
for robot_id, last_time in self.last_seen.items():
if current_time - last_time > 10: # 10 second timeout
self.get_logger().warn(f'Lost contact with Robot {robot_id}')
2. Task Updates
Share mission progress:
class TaskCoordinator(Node):
def __init__(self, robot_id):
super().__init__(f'coordinator_{robot_id}')
self.robot_id = robot_id
def report_task_completion(self, task_id, success):
message = {
'type': 'TASK_COMPLETE',
'robot_id': self.robot_id,
'task_id': task_id,
'success': success,
'timestamp': time.time()
}
# Convert dictionary to JSON string then to bytes
json_message = json.dumps(message)
self.broadcast_message(json_message.encode())
def request_help(self, location, task_type):
message = {
'type': 'HELP_REQUEST',
'robot_id': self.robot_id,
'location': location,
'task_type': task_type,
'priority': 'HIGH'
}
json_message = json.dumps(message)
self.broadcast_message(json_message.encode())
3. Emergency Protocols
Handle critical situations:
class EmergencyManager(Node):
def __init__(self, robot_id):
super().__init__(f'emergency_{robot_id}')
self.robot_id = robot_id
def declare_emergency(self, emergency_type, details):
message = {
'type': 'EMERGENCY',
'robot_id': self.robot_id,
'emergency_type': emergency_type, # 'LOW_BATTERY', 'COLLISION', 'LOST'
'details': details,
'location': self.get_current_position(),
'timestamp': time.time()
}
# Send emergency message multiple times for reliability
json_message = json.dumps(message)
for _ in range(3):
self.broadcast_message(json_message.encode())
time.sleep(0.1)
Performance Optimization
Message Size Management
ESP-NOW has a 1490-byte limit per message:
def send_large_data(self, data):
"""Split large messages into chunks"""
max_chunk_size = 1400 # Leave room for headers
if len(data) <= max_chunk_size:
self.broadcast_message(data)
else:
# Split into chunks
chunks = [data[i:i+max_chunk_size]
for i in range(0, len(data), max_chunk_size)]
for i, chunk in enumerate(chunks):
header = f"CHUNK:{i}:{len(chunks)}:".encode()
self.broadcast_message(header + chunk)
Network Health Monitoring
class NetworkMonitor(Node):
def __init__(self):
super().__init__('network_monitor')
self.message_stats = {}
def log_message_received(self, sender_id):
if sender_id not in self.message_stats:
self.message_stats[sender_id] = {'count': 0, 'last_seen': 0}
self.message_stats[sender_id]['count'] += 1
self.message_stats[sender_id]['last_seen'] = time.time()
def get_network_health(self):
"""Return network statistics"""
current_time = time.time()
health_report = {}
for robot_id, stats in self.message_stats.items():
time_since_last = current_time - stats['last_seen']
health_report[robot_id] = {
'message_count': stats['count'],
'seconds_since_last': time_since_last,
'status': 'ONLINE' if time_since_last < 5 else 'OFFLINE'
}
return health_report
Docker Deployment
For easy multi-robot deployment:
# Launch entire 3-robot system
docker-compose up --build
# Scale to more robots
docker-compose up --scale communication0=1 --scale planning0=1 \
--scale communication1=1 --scale planning1=1 \
--scale communication2=1 --scale planning2=1
Range and Reliability
Expected Performance
- Range: ~200m line-of-sight
- Throughput: Several hundred messages/second
- Latency: <10ms typical
- Reliability: >95% in good conditions
Improving Range and Reliability
- External Antennas: Increase range to 500m+
- Mesh Networking: Robots relay messages for extended coverage
- Frequency Management: Avoid Wi-Fi congestion
- Error Handling: Implement acknowledgments and retries
Troubleshooting Common Issues
ESP32 Not Detected
# Check USB connection
ls /dev/ttyUSB* /dev/ttyACM*
# Add user to dialout group
sudo usermod -a -G dialout $USER
# (Requires logout/login)
No Messages Received
- Check ESP32 firmware is properly flashed
- Verify all robots on same ESP-NOW channel
- Ensure ESP32 modules are compatible (same variant)
Poor Performance
- Reduce message frequency
- Check for Wi-Fi interference
- Verify power supply to ESP32
Integration with Mission Planning
Combine SwarmTalk with TrajAllocPy for complete coordination:
class IntegratedMissionManager(Node):
def __init__(self, robot_id):
super().__init__(f'mission_manager_{robot_id}')
self.robot_id = robot_id
# Initialize communication
self.comm = SwarmTalkInterface(robot_id)
# Initialize task allocation
self.allocator = TaskAllocator(robot_id)
def run_coordinated_mission(self, tasks):
# 1. Share tasks with team
self.comm.broadcast_tasks(tasks)
# 2. Run CBBA allocation
allocation = self.allocator.solve(tasks)
# 3. Share results
self.comm.broadcast_allocation(allocation)
# 4. Execute assigned tasks
self.execute_mission(allocation[self.robot_id])
Next Steps
Now that you have robots communicating:
- Test range limits in your deployment environment
- Implement error handling for message loss
- Add encryption for sensitive missions
- Explore mesh networking for larger teams
- Learn about datasets for testing (our next topic: SARenv)
In our next post, we’ll dive into SARenv - a comprehensive dataset for testing your robot teams in realistic search and rescue scenarios.
Ready to connect your robot team? Start with two robots and a simple message exchange. Build complexity gradually as you gain confidence with the system!