Source code for pynta.tests.test_zmq

"""
    Test the limits of the pyZMQ library
    =====================================

    Example on how to test the bandwidth limits for pyZMQ. We create a Queue with a given number of frames and an empty
    queue. The first is fed to the publisher, while the second is used by the subscriber to append the data. In this way
    we can check whether all the frames arrived in order and the bandwidth limitation for ZMQ.

    If any attempts at improve the architecture of publisher/subscriber are tried, this test is going to be useful to
    determine whether there is a real improvement in performance.

    .. note:: Replacing send_pyobj by the nocopy option is faster, however it also requires to reshape the data, which
        adds an overhead and eventually reaches equivalent bandwidths.

    .. warning:: This script consumes a lot of memory because it allocates a queue with thousands of elements in it. If
        you have a more limited system, consider lowering the amount of elements or changing the data type.

"""
from time import time

import numpy as np
from multiprocessing import Process, Queue

from pynta.model.experiment.publisher import publisher
from pynta.model.experiment.subscriber import subscriber
from pynta.util import get_logger


[docs]def test_func(data, queue): logger = get_logger(name=__name__) logger.info('Putting data to queue') queue.put(data)
[docs]def main(num_data_frames = 1000, dtype=np.uint16): data = np.random.randint(0, high=2**8, size=(1000, 1000), dtype=dtype) recv_q = Queue() # Queue where data will be received send_q = Queue() for i in range(num_data_frames): send_q.put({'topic': 'test', 'data': data}) send_q.put({'topic': 'test', 'data': 'stop'}) send_q.put({'topic': '', 'stop_pub': 'stop_pub'}) sub1 = Process(target=subscriber, args=[test_func, 'test', recv_q]) sub1.start() t0 = time() pub = Process(target=publisher, args=[send_q]) pub.start() pub.join() t1 = time() - 2 - t0 # The publishers sleeps for 2 seconds, 1 at the beginning, 1 at the end bandwidth = num_data_frames * data.nbytes / t1 i = 0 while not recv_q.empty() or recv_q.qsize() > 0: d = recv_q.get() i += 1 return bandwidth, t1, i
if __name__ == '__main__': from argparse import ArgumentParser parser = ArgumentParser(description='Test the limits of ZMQ on this system') parser.add_argument("--data-frame", dest="number_frames", default=1000, help="Number of data frames to send") parser.add_argument("--data-type", dest="dtype", default="uint8", help="Data type to use, uint8, uint16, uint32") args = parser.parse_args() if args.dtype == 'uint8': dtype = np.uint8 elif args.dtype == 'uint16': dtype = np.uint16 elif args.dtype == 'uint32': dtype = np.uint32 else: raise ValueError('Dtype must be either uint8, 16 or 32') num_frames = int(args.number_frames) print('Starting the test. It may take a couple of seconds to complete...') bandwidth, t, rcv_num_frames = main(num_data_frames=num_frames, dtype=dtype) print('Recevied {} frames out of {} sent'.format(num_frames, rcv_num_frames)) print('Total execution time: {:3.1f}s'.format(t)) print('Estimated bandwidth: {:3.1f}MB/s'.format(bandwidth/1024/1024)) # # print('Total ellapsed time: {:2.1f}s'.format(t1-t0)) # # print('Bandwidth: {:4.1f}MB/s'.format(bandwidth/1024/1024)) # # # print('Total received points: {}/{}'.format(i, num_data_frames))