# Source code for pysyncq.tests.benchmark


'''
Measure the message transfer time from a source process to a reader through a
PySyncQ interface. Prints a table giving the average, standard deviation,
standard error of the mean, minimum and maximum of the measured transfer times
in milliseconds, for a range of message body sizes. Runs with 8 processes
connected to one PySyncQ object.
'''


#--- Import block ---#

# Standard library
import gc , time
import          random as rnd
import      statistics as stat
import multiprocessing as mp

# pysyncq
from pysyncq import pysyncq as pq


#--- Globals ---#

# Largest message body size, given as the power of two i.e. 2 ** maxsize bytes.
# The main script benchmarks body sizes 2 ** 1 up to 2 ** maxsize, and also
# uses this value to size the queue's shared memory request.
maxsize = 16

# The total number of processes to create. 1 is parent and sender. The rest
# ( nprocs - 1 ) are child processes that echo the incoming message.
nprocs = 8

# Number of times to send/receive a given message per timing measurement. Set to
# one thousand, so that the duration of one measurement in seconds is
# numerically equal to the average number of milliseconds per transfer.
transfers = 1_000

# Number of samples to measure for the average ± SEM, per message body size.
# Each sample times a run of `transfers` message send/receive cycles.
samples = 30

# Raise this flag to True to print each sample as it is measured. Otherwise,
# keep it False.
samflg = False


#--- Child function ---#

def cfun(q, name):
    '''
    Echo loop executed by each child process.

    Opens the queue under the sender name `name`, then reads messages until a
    message of type 'kill' arrives. A message of type 'gc' triggers a garbage
    collection pass; any other message is appended back to the queue with
    type 'echo'. The queue is closed on the way out, even if the loop raises.

    Parameters
    ----------
    q
        Queue object providing open / close / append, a `scrntype` set, and a
        call interface that yields 3-item messages. Presumably a pq.PySyncQ;
        confirm against the caller.
    name
        Sender name handed to q.open.
    '''

    # Open queue and use given sender name
    q.open(name)

    try:

        # Filter all messages of type 'echo'
        q.scrntype.add('echo')

        # We do not want automatic garbage collection to mess up timing
        gc.disable()

        # Read loop. Wait indefinitely for new messages.
        for _, typ, msg in q(block=True, timer=None):

            # Kill signal, terminate program
            if typ == 'kill':
                break

            # Force garbage collection, at the sender's request
            if typ == 'gc':
                gc.collect()
                continue

            # Echo the message back to the queue
            q.append('echo', msg, block=True, timer=None)

    finally:

        # Release PySyncQ object, whether the loop ended cleanly or not
        q.close()
#--- Message transfer timing function ---#
def transtime(q, msg, N):
    '''
    Time N send/echo round trips of `msg` through the queue `q`.

    Each cycle appends `msg` with type 'origin', pops one message (the echo),
    and then iterates over the queue to clear whatever else is readable.
    Garbage collection is disabled for the timed section and re-enabled
    afterwards, also when an error interrupts the measurement. After timing,
    a 'gc' message is appended so that readers collect garbage as well.

    Parameters
    ----------
    q
        Open queue object providing append, pop and iteration. Presumably a
        pq.PySyncQ; confirm against the caller.
    msg
        Message body to send on every cycle.
    N
        Number of send/echo cycles to time.

    Returns
    -------
    float
        Half of the elapsed time in seconds for all N cycles, i.e. the time
        attributed to the outbound transfers only.
    '''

    # Stop auto garbage collection during time measurement
    gc.disable()

    try:

        # Measure start time. perf_counter is monotonic and uses the highest
        # resolution clock available, unlike the adjustable wall clock.
        tstart = time.perf_counter()

        # Transfers
        for _ in range(N):

            # Send the message
            q.append('origin', msg, block=True, timer=None)

            # Get the echo
            q.pop(block=True, timer=None)

            # Screen all others
            for _ in q:
                pass

        # Measure end time
        tend = time.perf_counter()

        # Trigger garbage collection in all processes
        q.append('gc')
        gc.collect()

    finally:

        # Never leave the collector switched off, even after a failure
        gc.enable()

    # Half the consumed time, subtracting out the echoed message
    dt = (tend - tstart) / 2.0

    # Individual samples
    if samflg:
        print(f'Sample: {dt} seconds')

    # Return sampled time duration
    return dt
#--- MAIN ---#

if __name__ == "__main__":

    # Create new synchronisation queue. Request enough shared memory so that the
    # queue is unlikely to lack space for any write. We are chiefly interested
    # in transfer times under favourable conditions.
    q = pq.PySyncQ(name='transtime', size=10 * nprocs * 2 ** maxsize)

    # Create child process objects. Each with a copy of the queue, and a unique
    # message sender name.
    children = [
        mp.Process(target=cfun, args=(q, f'child-{ i }'))
        for i in range(nprocs - 1)
    ]

    # Start child process execution, so that parent process can ...
    for child in children:
        child.start()

    # ... connect to the queue.
    q.open('parent')

    try:

        # Report
        print('Created queue for benchmarking\n', q, '\n')

        # Brief wait so that child processes can all initialise
        time.sleep(0.1)

        # Parent process will ignore echoed messages from all but one child
        # process, i.e. only echoes from 'child-0' are left unscreened.
        for i in range(1, nprocs - 1):
            q.scrnsend.add(f'child-{ i }')

        # Burn-in timer function
        transtime(q, bytes(10), 100)

        # Table headers
        print('Msg bytes,Avg time (ms),St.Dev.,SEM,Min,Max')

        # Message sizes, as powers of two
        for power in range(1, maxsize + 1):

            # Make message just once
            msg = bytes(2 ** power)

            # Take numerous samples of the unidirectional transfer time
            X = [transtime(q, msg, transfers) for _ in range(samples)]

            # Compute statistics from samples. The mean and SEM transfer time,
            # in ms
            avg = stat.mean(X)
            std = stat.stdev(X)
            sem = std / samples ** 0.5
            xmn = min(X)
            xmx = max(X)

            # Show the result
            print(f'{2**power},{avg:.3},{std:.3},{sem:.3},{xmn:.3},{xmx:.3}')

    finally:

        # Send kill signal. Done in the finally clause so that a failed
        # benchmark does not leave the child processes waiting on the queue.
        q.append('kill')

        # Release queue resources
        q.close()

        # Clean up terminated child processes
        for child in children:
            child.join()