Source code for seq.larmor_pypulseq

"""
@author: José Miguel Algarín Guisado
@modifield: T. Guallart Naval, february 28th 2022
MRILAB @ I3M
"""

import os
import sys

#*****************************************************************************
# Get the directory of the current script
main_directory = os.path.dirname(os.path.realpath(__file__))
parent_directory = os.path.dirname(main_directory)
parent_directory = os.path.dirname(parent_directory)

# Define the subdirectories you want to add to sys.path
subdirs = ['MaRGE', 'marcos_client']

# Add the subdirectories to sys.path
for subdir in subdirs:
    full_path = os.path.join(parent_directory, subdir)
    sys.path.append(full_path)
#******************************************************************************
import numpy as np
import seq.mriBlankSeq as blankSeq  # Import the mriBlankSequence for any new sequence.
import scipy.signal as sig
import configs.hw_config as hw
import configs.units as units
import experiment as ex
from marga_pulseq.interpreter import PSInterpreter
import pypulseq as pp


[docs] class LarmorPyPulseq(blankSeq.MRIBLANKSEQ): def __init__(self): super(LarmorPyPulseq, self).__init__() # Input the parameters self.dF = None self.bw = None self.demo = None self.expt = None self.larmorFreq = None self.repetitionTime = None self.rfReTime = None self.rfReFA = None self.rfExFA = None self.rfExTime = None self.nScans = None self.addParameter(key='seqName', string='LarmorInfo', val='Larmor PyPulseq') self.addParameter(key='nScans', string='Number of scans', val=1, field='SEQ') self.addParameter(key='larmorFreq', string='Larmor frequency (MHz)', val=3.066, units=units.MHz, field='RF') self.addParameter(key='rfExFA', string='Excitation flip angle (º)', val=90.0, field='RF') self.addParameter(key='rfReFA', string='Refocusing flip angle (º)', val=180.0, field='RF') self.addParameter(key='rfExTime', string='RF excitation time (us)', val=30.0, field='RF', units=units.us) self.addParameter(key='rfReTime', string='RF refocusing time (us)', val=60.0, field='RF', units=units.us) self.addParameter(key='repetitionTime', string='Repetition time (ms)', val=1000., field='SEQ', units=units.ms) self.addParameter(key='bw', string='Bandwidth (kHz)', val=50, field='RF', units=units.kHz) self.addParameter(key='dF', string='Frequency resolution (Hz)', val=100, field='RF') self.addParameter(key='shimming', string='Shimming', val=[-12.5, -12.5, 7.5], field='OTH', units=units.sh)
[docs] def sequenceInfo(self): print("Larmor") print("Author: PhD. J.M. Algarín") print("Contact: josalggui@i3m.upv.es") print("mriLab @ i3M, CSIC, Spain") print("This sequence runs a single spin echo to find larmor\n")
[docs] def sequenceTime(self): nScans = self.mapVals['nScans'] repetitionTime = self.mapVals['repetitionTime'] * 1e-3 return (repetitionTime * nScans / 60) # minutes, scanTime
[docs] def sequenceRun(self, plotSeq=0, demo=False, standalone=False): init_gpa = False # Starts the gpa self.demo = demo # Define the interpreter. It should be updated on calibration self.flo_interpreter = PSInterpreter(tx_warmup=hw.blkTime, # us rf_center=hw.larmorFreq * 1e6, # Hz rf_amp_max=hw.b1Efficiency / (2 * np.pi) * 1e6, # Hz gx_max=hw.gFactor[0] * hw.gammaB, # Hz/m gy_max=hw.gFactor[1] * hw.gammaB, # Hz/m gz_max=hw.gFactor[2] * hw.gammaB, # Hz/m grad_max=np.max(hw.gFactor) * hw.gammaB, # Hz/m ) # Define system properties according to hw_config file self.system = pp.Opts( rf_dead_time=hw.blkTime * 1e-6, # s max_grad=hw.max_grad, # mT/m grad_unit='mT/m', max_slew=hw.max_slew_rate, # mT/m/ms slew_unit='mT/m/ms', grad_raster_time=hw.grad_raster_time, # s rise_time=hw.grad_rise_time, # s ) # Set the refocusing time in to twice the excitation time if self.rfReTime == 0: self.rfReTime = 2 * self.rfExTime # Calculate acq_time and echo_time n_points = int(self.bw / self.dF) acq_time = 1 / self.dF # s echo_time = 2 * acq_time # s self.mapVals['nPoints'] = n_points self.mapVals['acqTime'] = acq_time self.mapVals['echoTime'] = echo_time # Initialize the experiment self.bw = n_points / acq_time * hw.oversamplingFactor # Hz sampling_period = 1 / self.bw # s self.mapVals['samplingPeriod'] = sampling_period if not self.demo: self.expt = ex.Experiment(lo_freq=self.larmorFreq * 1e-6, # MHz rx_t=sampling_period * 1e6, # us init_gpa=init_gpa, gpa_fhdo_offset_time=(1 / 0.2 / 3.1), auto_leds=True ) sampling_period = self.expt.get_rx_ts()[0] # us self.bw = 1 / sampling_period / hw.oversamplingFactor # MHz acq_time = n_points / self.bw * 1e-6 # s self.mapVals['bw_true'] = self.bw * 1e3 # kHz else: sampling_period = sampling_period * 1e6 # us self.bw = 1 / sampling_period / hw.oversamplingFactor # MHz acq_time = n_points / self.bw * 1e-6 # s # Create excitation rf event delay_rf_ex = self.repetitionTime-acq_time/2-echo_time-self.rfExTime/2-hw.blkTime*1e-6 event_rf_ex = pp.make_block_pulse(flip_angle=self.rfExFA * np.pi / 180, duration=self.rfExTime, delay=delay_rf_ex, system=self.system, use="excitation",) # Create refocusing rf event delay_rf_re = echo_time/2-self.rfExTime/2-self.rfReTime/2-hw.blkTime*1e-6 event_rf_re = pp.make_block_pulse(flip_angle=self.rfReFA * np.pi / 180, duration=self.rfReTime, delay=delay_rf_re, system=self.system, use="refocusing") # Create ADC event delay_adc = echo_time/2-self.rfReTime/2-acq_time/2 event_adc = pp.make_adc(num_samples=n_points*hw.oversamplingFactor, duration=acq_time, # s delay=delay_adc, # s system=self.system) # Create the sequence here def createSequence(): # Here I will test pypulseq rd_points = 0 # # Shimming # self.iniSequence(t0, self.shimming) # Add excitatoin self.seq.add_block(event_rf_ex) # Add refocusing self.seq.add_block(event_rf_re) # Add ADC self.seq.add_block(event_adc) rd_points += n_points # Create the sequence file self.seq.write('sequence.seq') # Run the interpreter to get the waveforms waveforms, param_dict = self.flo_interpreter.interpret('sequence.seq') # Convert waveform to mriBlankSeq tools (just do it) self.pypulseq2mriblankseq(waveforms=waveforms, shimming=self.shimming) return rd_points # Create the sequence and add instructions to the experiment acq_points = createSequence() if not self.demo: if self.floDict2Exp(): print("Sequence waveforms loaded successfully") pass else: print("ERROR: sequence waveforms out of hardware bounds") return False # Run the experiment data_over = [] # To save oversampled data if not plotSeq: for scan in range(self.nScans): print("Scan %i running..." % (scan + 1)) if not self.demo: rxd, msgs = self.expt.run() else: rxd = {'rx0': np.random.randn(acq_points * hw.oversamplingFactor) + 1j * np.random.randn(acq_points * hw.oversamplingFactor)} data_over = np.concatenate((data_over, rxd['rx0']), axis=0) print("Acquired points = %i" % np.size([rxd['rx0']])) print("Expected points = %i" % ((acq_points + 2 * hw.addRdPoints) * hw.oversamplingFactor)) print("Scan %i ready!" % (scan + 1)) elif plotSeq and standalone: self.sequencePlot(standalone=standalone) return True # Close the experiment if not self.demo: self.expt.__del__() # Process data to be plotted if not plotSeq: data_full = sig.decimate(data_over, hw.oversamplingFactor, ftype='fir', zero_phase=True) self.mapVals['data_full'] = data_full data = np.average(np.reshape(data_full, (self.nScans, -1)), axis=0) self.mapVals['data'] = data # Data to sweep sequence self.mapVals['sampledPoint'] = data[int(n_points / 2)] return True
[docs] def sequenceAnalysis(self, mode=None): self.mode = mode # Load data signal = self.mapVals['data'] acq_time = self.mapVals['acqTime'] * 1e3 # ms n_points = self.mapVals['nPoints'] # Generate time and frequency vectors and calcualte the signal spectrum tVector = np.linspace(-acq_time / 2, acq_time / 2, n_points) fVector = np.linspace(-self.bw / 2, self.bw / 2, n_points) * 1e3 # kHz spectrum = np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(signal))) # Get the central frequency idf = np.argmax(np.abs(spectrum)) fCentral = fVector[idf] * 1e-3 # MHz hw.larmorFreq = self.mapVals['larmorFreq'] + fCentral print('Larmor frequency: %1.5f MHz' % hw.larmorFreq) self.mapVals['larmorFreq'] = hw.larmorFreq self.mapVals['signalVStime'] = [tVector, signal] self.mapVals['spectrum'] = [fVector, spectrum] if mode != 'Standalone': for sequence in self.sequence_list.values(): if 'larmorFreq' in sequence.mapVals: sequence.mapVals['larmorFreq'] = hw.larmorFreq # Add time signal to the layout result1 = {'widget': 'curve', 'xData': tVector, 'yData': [np.abs(signal), np.real(signal), np.imag(signal)], 'xLabel': 'Time (ms)', 'yLabel': 'Signal amplitude (mV)', 'title': 'Echo', 'legend': ['abs', 'real', 'imag'], 'row': 0, 'col': 0} # Add frequency spectrum to the layout result2 = {'widget': 'curve', 'xData': fVector, 'yData': [np.abs(spectrum)], 'xLabel': 'Frequency (kHz)', 'yLabel': 'Spectrum amplitude (a.u.)', 'title': 'Spectrum', 'legend': [''], 'row': 1, 'col': 0} # create self.out to run in iterative mode self.output = [result1, result2] # save data once self.output is created self.saveRawData() # Plot result in standalone execution if self.mode == 'Standalone': self.plotResults() return self.output
if __name__ == '__main__': seq = LarmorPyPulseq() seq.sequenceAtributes() seq.sequenceRun(plotSeq=False, demo=True, standalone=True) # seq.sequencePlot(standalone=True) seq.sequenceAnalysis(mode='Standalone')