"""MIDAS frontend that emits a fixed-size bank of simulated (zero) data."""

import random
import time

import midas
import midas.event
import midas.frontend
import numpy as np


class DataSimulatorEquipment(midas.frontend.EquipmentBase):
    """Polled equipment that emits one "CR00" bank per accepted poll.

    Polling state (interval, counters, timestamps) and the payload buffer
    live on the owning frontend object, which is passed in at construction.
    """

    def __init__(self, client, frontend):
        """Configure the equipment defaults and register with the base class.

        Args:
            client: MIDAS client object forwarded to ``EquipmentBase``.
            frontend: Owning frontend; must expose ``zero_buffer``,
                ``poll_time``, ``last_poll_time``, ``poll_count`` and
                ``poll_timestamps``.
        """
        equip_name = "Python Data Simulator"

        default_common = midas.frontend.InitialEquipmentCommon()
        default_common.equip_type = midas.EQ_POLLED
        default_common.buffer_name = "SYSTEM"
        default_common.trigger_mask = 0
        default_common.event_id = 2
        default_common.period_ms = 100
        default_common.read_when = midas.RO_RUNNING
        default_common.log_history = 1

        super().__init__(client, equip_name, default_common)

        print("Initialization complete")
        self.set_status("Initialized")
        self.frontend = frontend

    def readout_func(self):
        """Build and return an event holding the frontend's zero buffer."""
        event = midas.event.Event()

        # Only the zero buffer is sent. The values loaded into
        # `self.frontend.data` are currently not part of the event; to send
        # them instead, pass `list(self.frontend.data)` as the bank payload.
        event.create_bank("CR00", midas.TID_SHORT, self.frontend.zero_buffer)

        return event

    def poll_func(self):
        """Report whether an event is due.

        Returns:
            True when at least ``frontend.poll_time`` seconds have elapsed
            since the last accepted poll (the poll is then counted and
            timestamped on the frontend); False otherwise.
        """
        frontend = self.frontend
        current_time = time.time()

        if current_time - frontend.last_poll_time < frontend.poll_time:
            return False  # No event available yet

        frontend.last_poll_time = current_time
        frontend.poll_count += 1
        frontend.poll_timestamps.append(current_time)
        return True  # Indicate that an event is available


class DataSimulatorFrontend(midas.frontend.FrontendBase):
    """Frontend owning the simulated data, zero buffer and poll statistics."""

    def __init__(self):
        """Load the simulated data, build the zero buffer, add the equipment."""
        super().__init__("DataSimulator-Python")

        # Data and zero buffer initialization
        self.data = []
        self.zero_buffer = []
        self.generator = random.Random()
        self.total_data_size = 1250000

        self.load_data_from_file("fake_data.txt")
        self.init_zero_buffer()

        # Polling variables
        self.poll_time = 0.001  # Poll time in seconds
        self.last_poll_time = time.time()
        self.poll_count = 0
        self.poll_timestamps = []

        self.add_equipment(DataSimulatorEquipment(self.client, self))

    def load_data_from_file(self, filename):
        """Append comma-separated integers from ``filename`` to ``self.data``.

        Blank lines and empty fields (for example from a trailing comma or a
        trailing newline at end of file) are skipped rather than aborting the
        load. A failure to open or read the file is reported on stdout and
        leaves ``self.data`` with whatever was read so far.

        Args:
            filename: Path of the text file to read.

        Raises:
            ValueError: If a non-empty field is not a valid integer.
        """
        try:
            with open(filename, 'r') as file:
                for line in file:
                    # int() tolerates surrounding whitespace, but not an
                    # empty string, so drop empty fields before converting.
                    self.data.extend(
                        int(token)
                        for token in line.split(',')
                        if token.strip()
                    )
                print(f"Loaded data from {filename}: {self.data[:10]}...")  # Display the first few values for verification
        except IOError as e:
            print(f"Error opening file: {e}")

    def init_zero_buffer(self):
        """Fill ``self.zero_buffer`` with ``self.total_data_size`` zeros."""
        self.zero_buffer = [0] * self.total_data_size
        print(f"Initialized zero buffer with {self.total_data_size } zeros.")

    def begin_of_run(self, run_number):
        """Mark the equipment as running and start fresh poll statistics.

        Args:
            run_number: Number of the run that is starting.

        Returns:
            The MIDAS "SUCCESS" status code.
        """
        # Statistics are printed at end of run, so start each run from zero.
        # Otherwise counts accumulate across runs, the idle gap between runs
        # is counted as a poll interval, and the timestamp list grows for the
        # lifetime of the process.
        self.poll_count = 0
        self.poll_timestamps = []

        self.set_all_equipment_status("Running", "greenLight")
        self.client.msg(f"Frontend has started run number {run_number}")
        return midas.status_codes["SUCCESS"]

    def end_of_run(self, run_number):
        """Mark the equipment as finished and print the run's poll statistics.

        Args:
            run_number: Number of the run that is ending.

        Returns:
            The MIDAS "SUCCESS" status code.
        """
        self.set_all_equipment_status("Finished", "greenLight")
        self.client.msg(f"Frontend has ended run number {run_number}")

        # Print poll function statistics at the end of the run
        self.print_poll_stats()

        return midas.status_codes["SUCCESS"]

    def frontend_exit(self):
        """Announce on stdout that the frontend is shutting down."""
        print("Frontend is exiting.")

    def print_poll_stats(self):
        """Print the accepted-poll count and the mean interval between them.

        The mean is only reported when at least two timestamps were recorded.
        """
        timestamps = self.poll_timestamps

        if len(timestamps) > 1:
            # Pair each timestamp with its successor to get the intervals.
            intervals = [
                later - earlier
                for earlier, later in zip(timestamps, timestamps[1:])
            ]
            avg_interval = sum(intervals) / len(intervals)
            print(f"Poll function was called {self.poll_count} times.")
            print(f"Average interval between poll calls: {avg_interval:.6f} seconds")
        else:
            print(f"Poll function was called {self.poll_count} times. Not enough data for interval calculation.")


if __name__ == "__main__":
    with DataSimulatorFrontend() as my_fe:
        my_fe.run()