Source code for pynta.model.experiment.publisher

# -*- coding: utf-8 -*-
"""
Publisher
=========

Publishers are responsible for broadcasting the message over the ZMQ PUB/SUB architecture. The publisher runs
continuously on a separated process and grabs elements from a queue, which in turn are sent through a socket to any
other processes listening. 

.. TODO:: In the current implementation, data is serialized for being added to a Queue, then deserialized by the
    publisher and serialized again to be sent. These three steps could be simplify into one if, for example, one assumes
    that objects where pickled. There is also a possibility of assuming numpy arrays and using a zero-copy strategy.

:copyright:  Aquiles Carattino <aquiles@uetke.com>
:license: GPLv3, see LICENSE for more details
"""

import logging
from multiprocessing import Queue, Event, Process
from time import sleep
import zmq

from pynta import general_stop_event
from pynta.model.experiment import config
from pynta.util.log import get_logger


[docs]class Publisher: """ Publisher class in which the queue for publishing messages is defined and also a separated process is started. It is important to have a new process, since the serialization/deserialization of messages from the QUEUE may be a bottleneck for performance. """ def __init__(self, port=None): self.logger = get_logger(name=__name__) if not port: self._port = config.zmq_port else: self._port = port self._queue = Queue() # The publisher will grab and broadcast the messages from this queue self._event = Event() # This event is used to stop the process self._process = None self.logger.info('Initialized publisher on port {}'.format(self._port))
[docs] def start(self): """ Start a new process that will be responsible for broadcasting the messages. .. TODO:: Find a way to start the publisher on a different port if the one specified is in use. """ self._event.clear() try: self._process = Process(target=publisher, args=[self._queue, self._event, self._port]) self._process.start() return True except zmq.ZMQError: self._port += 1 config.zmq_port = self._port return self.start()
# sleep(1) # This forces the start to block until the publisher is ready
[docs] def stop(self): self._event.set() self.empty_queue()
[docs] def empty_queue(self): """ If the publisher stops before broadcasting all the messages, the Queue may still be using some memory. This method is simply getting all the elements in order to free memory. Can be useful for garbage collection or better control of the downstream program. """ self.logger.info('Emptying the queue of the publisher') self.logger.debug('Queue length: {}'.format(self._queue.qsize())) self._queue.close()
[docs] def publish(self, topic, data): """ Adapts the data to make it faster to broadcast :param str topic: Topic in which to publish the data :param data: Data to be published :return: None """ self.logger.debug('Adding data of type {} to topic {}'.format(type(data), topic)) try: self._queue.put({'topic': topic, 'data': data}) except AssertionError: # This means the queue has been closed already pass
@property def port(self): return self._port @port.setter def port(self, new_port): if new_port != self._port: self._port = new_port self.logger.warning('Changing the port requires restarting the publisher process') self.logger.debug('Setting the new publisher port to {}'.format(new_port)) self.stop() self._process.join() self._process = Process(target=publisher, args=[self._queue, self._event, self._port]) self.start() else: self.logger.warning('New port {} is the same as the old port'.format(new_port))
[docs] def join(self, timeout=0): if self._process.is_alive(): self.logger.debug('Waiting for Publisher process to finish') self._process.join(timeout)
[docs]def publisher(queue, event, port): """ Simple method that starts a publisher on the port 5555. :param multiprocessing.Queue queue: Queue of messages to be broadcasted :param multiprocessing.Event event: Event to stop the publisher :param int port: port in which to broadcast data .. TODO:: The publisher's port should be determined in a configuration file. .. deprecated:: 0.1.0 """ logger = get_logger(name=__name__) port_pub = port logger.debug(f'Binding publisher on port {port}') context = zmq.Context() socket = context.socket(zmq.PUB) print(port_pub) socket.bind("tcp://*:%s" % port_pub) sleep(1) # It takes a time for subscribers to propagate to the publisher. # Without this sleep the first packages may be lost logger.info('Bound socket on {}'.format(port_pub)) while not event.is_set(): while not queue.empty(): data = queue.get() # Should be a dictionary {'topic': topic, 'data': data} logger.debug('Sending {} on {}'.format(data['data'], data['topic'])) socket.send_string(data['topic'], zmq.SNDMORE) socket.send_pyobj(data['data']) if general_stop_event.is_set(): break sleep(0.05) # Sleeps 5 milliseconds to be polite with the CPU sleep(1) # Gives enough time to the subscribers to update their status socket.close() logger.info('Stopped the publisher')
if __name__ == "__main__": logger = get_logger(name=__name__) # 'nanoparticle_tracking.model.experiment.nanoparticle_tracking.saver' logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) p = Publisher() p.start() p.publish('Testing', [1, 2, 3, 4]) sleep(1) p.stop() p.join()