Source code for seq.pulseq_reader

"""
@author: José Miguel Algarín Guisado
@date: 2024 May 31
mrilab @ i3M, CSIC, Spain
"""

import os
import sys

#*****************************************************************************
# Get the directory of the current script
main_directory = os.path.dirname(os.path.realpath(__file__))
parent_directory = os.path.dirname(main_directory)
parent_directory = os.path.dirname(parent_directory)

# Define the subdirectories you want to add to sys.path
subdirs = ['MaRGE', 'marcos_client']

# Add the subdirectories to sys.path
for subdir in subdirs:
    full_path = os.path.join(parent_directory, subdir)
    sys.path.append(full_path)
#******************************************************************************
import numpy as np
import seq.mriBlankSeq as blankSeq  # Import the mriBlankSequence for any new sequence.
import configs.units as units
import scipy.signal as sig
import experiment as ex
import configs.hw_config as hw
from marga_pulseq.interpreter import PSInterpreter


[docs] class PulseqReader(blankSeq.MRIBLANKSEQ): def __init__(self): super(PulseqReader, self).__init__() # Input the parameters self.files = None self.output = None self.nScans = None self.shimming = None self.expt = None self.larmorFreq = None self.addParameter(key='seqName', string='PulseqReader', val='PulseqReader') self.addParameter(key='nScans', string='Number of scans', val=1, field='IM') self.addParameter(key='larmorFreq', string='Larmor frequency (MHz)', val=3.066, units=units.MHz, field='IM') self.addParameter(key='shimming', string='Shimming', val=[0, 0, 0], field='IM', units=units.sh) self.addParameter(key='files', string='Files', val="entertainer1.seq", field='IM', tip='List .seq files')
[docs] def sequenceInfo(self): print("Pulseq Reader") print("Author: PhD. J.M. Algarín") print("Contact: josalggui@i3m.upv.es") print("mriLab @ i3M, CSIC, Spain") print("Run a list of .seq files\n")
[docs] def sequenceTime(self): return 0 # minutes, scanTime
[docs] def sequenceAtributes(self): super().sequenceAtributes() # Convert files to a list self.files = self.files.strip('[]').split(',') self.files = [s.strip() for s in self.files]
[docs] def sequenceRun(self, plotSeq=0, demo=False, standalone=False): init_gpa = False self.demo = demo # Step 1: Define the interpreter for FloSeq/PSInterpreter. # The interpreter is responsible for converting the high-level pulse sequence description into low-level # instructions for the scanner hardware. You will typically update the interpreter during scanner calibration. self.flo_interpreter = PSInterpreter( tx_warmup=hw.blkTime, # Transmit chain warm-up time (us) rf_center=hw.larmorFreq * 1e6, # Larmor frequency (Hz) rf_amp_max=hw.b1Efficiency / (2 * np.pi) * 1e6, # Maximum RF amplitude (Hz) gx_max=hw.gFactor[0] * hw.gammaB, # Maximum gradient amplitude for X (Hz/m) gy_max=hw.gFactor[1] * hw.gammaB, # Maximum gradient amplitude for Y (Hz/m) gz_max=hw.gFactor[2] * hw.gammaB, # Maximum gradient amplitude for Z (Hz/m) grad_max=np.max(hw.gFactor) * hw.gammaB, # Maximum gradient amplitude (Hz/m) grad_t=100, # Gradient raster time (us) ) # Function to get the dwell time def get_seq_info(file_path): dwell_time = None with open(file_path, 'r') as file: lines = file.readlines() adc_section = False for line in lines: if line.strip() == '[ADC]': adc_section = True continue if adc_section: # If we reach another section, stop processing ADC section if line.startswith('\n'): break # Split the line into components components = line.split() # Check if the line contains the ADC event data if len(components) >= 4: n_readouts = int(components[1]) # Extract the number of acquired points per Rx window dwell_time = int(components[2]) # Extract the dwell time (3rd component) return n_readouts, dwell_time data_over = [] # To save oversampled data for file in self.files: print("Running " + file + "...") # Get the dwell time and n_readouts n_readouts, dwell = get_seq_info(file) # dwell is in ns # Create experiment if not self.demo: self.expt = ex.Experiment(lo_freq=self.larmorFreq * 1e-6, # MHz rx_t=dwell * 1e-3, # us init_gpa=init_gpa, gpa_fhdo_offset_time=(1 / 0.2 / 3.1), ) dwell = self.expt.get_rx_ts()[0] bw = 1/dwell * 1e9 # Hz self.mapVals['samplingPeriod'] = dwell * 1e-9 # s self.mapVals['bw'] = bw # Hz # Run the interpreter to get the waveforms waveforms, param_dict = self.flo_interpreter.interpret(file) # Get number of Rx windows n_rx_windows = int(np.sum(waveforms['rx0_en'][1][:])) # Convert waveform to mriBlankSeq tools (just do it) self.pypulseq2mriblankseq(waveforms=waveforms, shimming=self.shimming) if not self.demo: if self.floDict2Exp(): print("Sequence waveforms loaded successfully") pass else: print("ERROR: sequence waveforms out of hardware bounds") return False # Run the experiment if not plotSeq: for scan in range(self.nScans): print("Scan %i running..." % (scan + 1)) if not self.demo: rxd, msgs = self.expt.run() rxd['rx0'] = hw.adcFactor * (np.real(rxd['rx0']) - 1j * np.imag(rxd['rx0'])) else: rxd = {'rx0': np.random.randn(n_readouts * n_rx_windows) + 1j * np.random.randn(n_readouts * n_rx_windows)} data_over = np.concatenate((data_over, rxd['rx0']), axis=0) print("Acquired points = %i" % np.size([rxd['rx0']])) print("Expected points = %i" % n_readouts * n_rx_windows) print("Scan %i ready!" % (scan + 1)) self.mapVals['data_over'] = data_over elif plotSeq and standalone: self.sequencePlot(standalone=standalone) return True # Close the experiment if not self.demo: self.expt.__del__() return True
[docs] def sequenceAnalysis(self, mode=None): # Decimate the data data_over = self.mapVals['data_over'] data = sig.decimate(data_over, hw.oversamplingFactor, ftype='fir', zero_phase=True) self.mapVals['data'] = data # create self.out to run in iterative mode self.output = [] # save data once self.output is created self.saveRawData() return self.output
if __name__ == '__main__': seq = PulseqReader() seq.sequenceAtributes() seq.sequenceRun(plotSeq=False, demo=True, standalone=True) seq.sequenceAnalysis(mode='Standalone')