Source code for concepts.hw_interface.robot_state_visualizer.visualizer_multiproc
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : visualizer.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 11/20/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
"""Robot state visualizer written using Dash and ZMQ for inter-process communication."""
import pickle
import threading
import time
import zmq
from concepts.hw_interface.robot_state_visualizer.visualizer import RobotStateVisualizer
[docs]
class RobotStateVisualizerMultiproc(RobotStateVisualizer):
    INIT_PORT = 5557
    SUB_PORT = 5556
[docs]
    def listener_thread(self):
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        socket.connect(f'tcp://localhost:{self.SUB_PORT}')
        socket.setsockopt_string(zmq.SUBSCRIBE, '')
        while True:
            try:
                message = socket.recv()
                message = pickle.loads(message)
                if message['type'] == 'update':
                    payload = message['data']
                    with self.mutex:
                        for key, data in payload.items():
                            timestamp, value = data
                            if isinstance(key, tuple):
                                tab, name = key
                                self.update_queue_with_mutex(name, timestamp, value, tab=tab)
                            else:
                                self.update_queue_with_mutex(key, timestamp, value)
                else:
                    print(f'Unknown message type: {message["type"]}')
            except Exception as e:
                print(f'Error: {e}')
                import traceback
                traceback.print_exc() 
[docs]
    def start(self):
        # Create a REP socket to receive initialization message.
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        print(f'Binding to port {self.INIT_PORT}')
        socket.bind(f'tcp://*:{self.INIT_PORT}')
        print(f'Visualizer server started at port {self.INIT_PORT}.')
        message = socket.recv()
        message = pickle.loads(message)
        self.reset(queues=message['data'])
        self.initialized = True
        socket.send(b'OK')
        socket.close()
        print('Visualizer initialized.')
        threading.Thread(target=self.listener_thread, daemon=True).start()
        super().start()
        while True:
            time.sleep(1) 
 
[docs]
class RobotStateVisualizerPublisher(object):
    PUB_PORT = 5556
[docs]
    def __init__(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.PUB)
        self.socket.bind(f'tcp://*:{self.PUB_PORT}')
        self.reset_message_sent = False 
[docs]
    def publish(self, message):
        if not self.reset_message_sent:
            raise ValueError('Reset message not sent before update message.')
        message = {'type': 'update', 'data': message}
        self.socket.send(pickle.dumps(message)) 
[docs]
    def reset(self, queues):
        if self.reset_message_sent:
            raise ValueError('Reset message already sent.')
        message = {'type': 'reset', 'data': queues}
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        socket.connect(f'tcp://localhost:{RobotStateVisualizerMultiproc.INIT_PORT}')
        socket.send(pickle.dumps(message))
        socket.recv()
        socket.close()
        self.reset_message_sent = True