Source code for seq.spds

Created on Tue October 1st, 2024
@author: [Student's Name]
@Summary: [Sequence Name] class template

import os
import sys

# Get the directory of the current script
main_directory = os.path.dirname(os.path.realpath(__file__))
parent_directory = os.path.dirname(main_directory)
parent_directory = os.path.dirname(parent_directory)

# Define the subdirectories you want to add to sys.path
subdirs = ['MaRGE', 'marcos_client']

# Add the subdirectories to sys.path
for subdir in subdirs:
    full_path = os.path.join(parent_directory, subdir)
import numpy as np
import controller.experiment_gui as ex
import configs.hw_config as hw  # Import the scanner hardware config
import configs.units as units
import seq.mriBlankSeq as blankSeq  # Import the mriBlankSequence for any new sequence.
from marga_pulseq.interpreter import PSInterpreter  # Import the marga_pulseq interpreter
import pypulseq as pp  # Import PyPulseq

# Template Class for MRI Sequences
[docs] class spds(blankSeq.MRIBLANKSEQ): """ Executes the SPDS (Single Point Double Shot) sequence, designed to estimate the B₀ map by acquiring data with two distinct acquisition windows. """ def __init__(self): """ Defines the parameters for the sequence. """ super(spds, self).__init__() self.mask = None self.axesOrientation = None self.rfExFA = None self.standalone = None self.dummyPulses = None self.repetitionTime = None self.rfExTime = None self.deadTime = None self.nPoints = None self.fov = None self.dfov = None = None self.plotSeq = None self.addParameter(key='seqName', string='Sequence Name', val='SPDS', tip="The identifier name for the sequence.") self.addParameter(key='nScans', string='Number of scans', val=1, field='IM', tip='Number of repetitions of the full scan.') self.addParameter(key='FreqOffset', string='Frequency offset (kHz)', val=0, units=units.kHz, field='RF', tip='Frequency offset respect Larmor.') self.addParameter(key='rfExFA', string='Excitation Flip Angle (degrees)', val=45.0, field='RF', tip="Flip angle of the excitation RF pulse in degrees") self.addParameter(key='rfExTime', string='Excitation time (us)', val=50.0,, field='RF', tip="Duration of the RF excitation pulse in microseconds (us).") self.addParameter(key='nPoints', string='Matrix size [rd, ph, sl]', val=[10, 10, 10], field='IM', tip='Matrix size for the acquired images.') self.addParameter(key='fov', string='Field of View (cm)', val=[15.0, 15.0, 15.0],, field='IM', tip='Field of View (cm).') self.addParameter(key='axesOrientation', string='Axes[rd,ph,sl]', val=[2, 1, 0], field='IM', tip="0=x, 1=y, 2=z") self.addParameter(key='repetitionTime', string='Repetition Time (ms)', val=50.0,, field='SEQ', tip="The time between successive excitation pulses, in milliseconds (ms).") self.addParameter(key='deadTime', string='Dead times (us)', val=[500.0, 550.0],, field='SEQ', tip='Dead time for the two acquisitions in microseconds (us).') self.addParameter(key='dummyPulses', string='Number of dummy pulses', val=1, field='SEQ', tip='Number of dummy pulses at the beginning of each batch.') self.addParameter(key='shimming', string='Shimming', val=[0.0, 0.0, 0.0],, field='OTH', tip='Shimming parameter to compensate B0 linear inhomogeneity.') self.addParameter(key='bw', string='Bandwidth (kHz)', val=50.0, units=units.kHz, field='IMG', tip='Set acquisition bandwidth in kilohertz (kHz).')
[docs] def sequenceInfo(self): """ Description of the sequence. """ print("SPDS") print("Contributor: PhD. J.M. Algarín") print("Contributor: PhD. J. Borreguero") print("Contact:") print("mriLab @ i3M, CSIC, Spain \n") print("Single Point Double Shot protocol to measure B0 map")
[docs] def sequenceTime(self): """ Calculate the sequence time based on its parameters. Output: ------- - time (float): sequence time in minutes """ nPoints = self.mapVals['nPoints'] kx = np.linspace(start=-1, stop=1, endpoint=False, num=nPoints[0]) ky = np.linspace(start=-1, stop=1, endpoint=False, num=nPoints[1]) kz = np.linspace(start=-1, stop=1, endpoint=False, num=nPoints[2]) ky, kz, kx = np.meshgrid(ky, kz, kx) k_norm = np.zeros(shape=(np.size(kx), 3)) k_norm[:, 0] = np.reshape(kx, -1) k_norm[:, 1] = np.reshape(ky, -1) k_norm[:, 2] = np.reshape(kz, -1) distance = np.sqrt(np.sum(k_norm ** 2, axis=1)) self.mask = distance <= 1 n = np.sum(self.mask) tr = self.mapVals['repetitionTime'] * 1e-3 # s time = tr * n / 60 * 2 * self.mapVals['nScans'] # minutes return time # minutes
[docs] def sequenceAtributes(self): """ Assign input parameters as attributes for the sequence. This method is called by the GUI before invoking the `sequenceRun` method. It ensures that any input parameters defined using methods like `self.addParameter(key='param', string='Parameter', val=1)` are assigned as class attributes, allowing them to be accessed directly using `self.param`. Example: If you define an input parameter as: `self.addParameter(key='param', string='Parameter', val=1, ...)` You can access its value later in the sequence as: `self.param` """ super().sequenceAtributes()
[docs] def sequenceRun(self, plotSeq=False, demo=False, standalone=False): """ This method orchestrates the definition, preparation, and execution of the sequence. Parameters: ----------- plotSeq : bool, optional If True, the sequence is plotted for visualization without execution (default: False). demo : bool, optional If True, runs the sequence in demonstration mode without hardware communication (default: False). standalone : bool, optional If True, runs the sequence independently, without external triggering or batch processing (default: False). Output: ------- - Oversampled and decimated data stored in `self.mapVals['data_over']` and `self.mapVals['data_decimated']`. - Generated sequence files for verification and debugging. Notes: ------ - This method is tailored for SPDS sequences, which are particularly useful for estimating B₀ inhomogeneities in low-field MRI systems under high inhomogeneities. """ self.demo = demo self.plotSeq = plotSeq self.standalone = standalone ''' Step 1: Define the interpreter for FloSeq/PSInterpreter. The interpreter is responsible for converting the high-level pulse sequence description into low-level instructions for the scanner hardware. ''' flo_interpreter = PSInterpreter( tx_warmup=hw.blkTime, # Transmit chain warm-up time (us) rf_center=hw.larmorFreq * 1e6, # Larmor frequency (Hz) rf_amp_max=hw.b1Efficiency / (2 * np.pi) * 1e6, # Maximum RF amplitude (Hz) gx_max=hw.gFactor[0] * hw.gammaB, # Maximum gradient amplitude for X (Hz/m) gy_max=hw.gFactor[1] * hw.gammaB, # Maximum gradient amplitude for Y (Hz/m) gz_max=hw.gFactor[2] * hw.gammaB, # Maximum gradient amplitude for Z (Hz/m) grad_max=np.max(hw.gFactor) * hw.gammaB, # Maximum gradient amplitude (Hz/m) grad_t=hw.grad_raster_time * 1e6, # Gradient raster time (us) ) ''' Step 2: Define system properties using PyPulseq (pp.Opts). These properties define the hardware capabilities of the MRI scanner, such as maximum gradient strengths, slew rates, and dead times. They are typically set based on the hardware configuration file (`hw_config`). ''' system = pp.Opts( rf_dead_time=hw.blkTime * 1e-6, # Dead time between RF pulses (s) max_grad=np.max(hw.gFactor) * 1e3, # Maximum gradient strength (mT/m) grad_unit='mT/m', # Units of gradient strength max_slew=hw.max_slew_rate, # Maximum gradient slew rate (mT/m/ms) slew_unit='mT/m/ms', # Units of gradient slew rate grad_raster_time=1e-6, # Gradient raster time (s) rise_time=hw.grad_rise_time, # Gradient rise time (s) rf_raster_time=1e-6, block_duration_raster=1e-6 ) ''' Step 3: Perform any calculations required for the sequence. In this step, students can implement the necessary calculations, such as timing calculations, RF amplitudes, and gradient strengths, before defining the sequence blocks. ''' # Get k-space info dk = 1 / self.fov # m^-1 k_max = self.nPoints / (2 * self.fov) # m^-1 self.mapVals['dk'] = dk self.mapVals['k_max'] = k_max # Get bandwidths and acquisition window bw_a = # Hz bw_b = # Hz n_rd = 1 + 2 * hw.addRdPoints # Get timing parameters time_acq_a = n_rd / bw_a # s time_acq_b = n_rd / bw_b # s time_delay0_a = self.rfExTime / 2 + self.deadTime[0] + time_acq_a / 2 # s time_delay0_b = self.rfExTime / 2 + self.deadTime[1] + time_acq_b / 2 # s time_grad_a = self.repetitionTime - time_delay0_a - hw.grad_rise_time - self.rfExTime / 2 # s time_grad_b = self.repetitionTime - time_delay0_b - hw.grad_rise_time - self.rfExTime / 2 # s # Get cartesian points kx = np.linspace(start=-1, stop=1, endpoint=False, num=self.nPoints[0]) ky = np.linspace(start=-1, stop=1, endpoint=False, num=self.nPoints[1]) kz = np.linspace(start=-1, stop=1, endpoint=False, num=self.nPoints[2]) ky, kz, kx = np.meshgrid(ky, kz, kx) k_norm = np.zeros(shape=(np.size(kx), 3)) k_norm[:, 0] = np.reshape(kx, -1) k_norm[:, 1] = np.reshape(ky, -1) k_norm[:, 2] = np.reshape(kz, -1) distance = np.sqrt(np.sum(k_norm ** 2, axis=1)) k_cartesian = np.zeros_like(k_norm) k_cartesian[:, 0] = k_norm[:, 0] * k_max[0] # m^-1 k_cartesian[:, 1] = k_norm[:, 1] * k_max[1] # m^-1 k_cartesian[:, 2] = k_norm[:, 2] * k_max[2] # m^-1 self.mask = distance <= 1 self.mapVals['k_cartesian'] = k_cartesian # Get gradients gradients_a = k_cartesian / (hw.gammaB * self.deadTime[0]) # T/m gradients_b = k_cartesian / (hw.gammaB * self.deadTime[1]) # T/m gradients_a = gradients_a[self.mask] gradients_b = gradients_b[self.mask] self.mapVals['gradients_a'] = gradients_a self.mapVals['gradients_b'] = gradients_b gradients_a = np.vstack([[0.0, 0.0, 0.0], gradients_a]) gradients_b = np.vstack([[0.0, 0.0, 0.0], gradients_b]) # Map the axis to "x", "y", and "z" according ot axesOrientation axes_map = {0: "x", 1: "y", 2: "z"} rd_channel = axes_map.get(self.axesOrientation[0], "") ph_channel = axes_map.get(self.axesOrientation[1], "") sl_channel = axes_map.get(self.axesOrientation[2], "") ''' Step 4: Define the experiment to get the true bandwidth In this step, student needs to get the real bandwidth used in the experiment. To get this bandwidth, an experiment must be defined and the sampling period should be obtained using get_ ''' # Experiment A if not demo: expt = ex.Experiment( lo_freq=hw.larmorFreq, # Larmor frequency in MHz rx_t=1e6 / bw_a, # Sampling time in us init_gpa=False, # Whether to initialize GPA board (False for True) gpa_fhdo_offset_time=(1 / 0.2 / 3.1), # GPA offset time calculation auto_leds=True # Automatic control of LEDs (False or True) ) sampling_period = expt.get_sampling_period() # us bw_a = 1 / sampling_period # MHz print("Acquisition bandwidth A fixed to: %0.3f kHz" % (bw_a * 1e3)) expt.__del__() else: bw_a *= 1e-6 # MHz self.mapVals['bw_a_kHz'] = bw_a * 1e3 # Experiment B if not demo: expt = ex.Experiment( lo_freq=hw.larmorFreq, # Larmor frequency in MHz rx_t=1e6 / bw_b, # Sampling time in us init_gpa=False, # Whether to initialize GPA board (False for True) gpa_fhdo_offset_time=(1 / 0.2 / 3.1), # GPA offset time calculation auto_leds=True # Automatic control of LEDs (False or True) ) sampling_period = expt.get_sampling_period() # us bw_b = 1 / sampling_period # MHz print("Acquisition bandwidth B fixed to: %0.3f kHz" % (bw_b * 1e3)) expt.__del__() else: bw_b *= 1e-6 # MHz self.mapVals['bw_b_kHz'] = bw_b * 1e3 ''' Step 5: Define sequence blocks. In this step, you will define the building blocks of the MRI sequence, including the RF pulses, gradient pulses, and ADC blocks. ''' # First delay, sequence will start after 1 repetition time, this ensure gradient and ADC latency is not an issue. delay_first_a = pp.make_delay(time_delay0_a) delay_first_b = pp.make_delay(time_delay0_b) ## Excitation pulse # Define the RF excitation pulse using PyPulseq. The flip angle is typically in radians. flip_ex = self.rfExFA * np.pi / 180 # Convert flip angle from degrees to radians block_rf_a = pp.make_block_pulse( flip_angle=flip_ex, # Set the flip angle for the RF pulse system=system, # Use the system properties defined earlier duration=self.rfExTime, # Set the RF pulse duration delay=hw.grad_rise_time + time_grad_a, # Delay before the RF pulse (if any) phase_offset=0.0, # Set the phase offset for the pulse (0 by default) ) block_rf_b = pp.make_block_pulse( flip_angle=flip_ex, # Set the flip angle for the RF pulse system=system, # Use the system properties defined earlier duration=self.rfExTime, # Set the RF pulse duration delay=hw.grad_rise_time + time_grad_b, # Delay before the RF pulse (if any) phase_offset=0.0, # Set the phase offset for the pulse (0 by default) ) ## ADC block # Define the ADC block using PyPulseq. You need to specify number of samples and delay. block_adc_a = pp.make_adc( num_samples=n_rd, dwell=1 / bw_a * 1e-6, delay=self.repetitionTime - n_rd / bw_a * 1e-6, ) block_adc_b = pp.make_adc( num_samples=n_rd, dwell=1 / bw_b * 1e-6, delay=self.repetitionTime - n_rd / bw_b * 1e-6, ) ''' Step 6: Define your initializeBatch according to your sequence. In this step, you will create the initializeBatch method to create dummy pulses that will be initialized for each new batch. ''' def initialize_batch(case='a'): # Instantiate pypulseq sequence object batch = pp.Sequence(system) n_rd_points = 0 n_adc = 0 # Add dummy pulses if case == 'a': batch.add_block(delay_first_a) for _ in range(self.dummyPulses): batch.add_block(block_rf_a, pp.make_delay(self.repetitionTime)) elif case == 'b': batch.add_block(delay_first_b) for _ in range(self.dummyPulses): batch.add_block(block_rf_b, pp.make_delay(self.repetitionTime)) return batch, n_rd_points, n_adc ''' Step 7: Define your createBatches method. In this step you will populate the batches adding the blocks previously defined in step 4, and accounting for number of acquired points to check if a new batch is required. ''' def create_batches(case='a'): batches = {} # Dictionary to save batches PyPulseq sequences waveforms = {} # Dictionary to store generated waveforms per each batch n_rd_points_dict = {} # Dictionary to track readout points for each batch n_adc = 0 # To account for number of adc windows batch_name = f'sequence_{case}' # Initialize new batch batches[batch_name], n_rd_points, n_adc_0 = initialize_batch(case=case) n_adc += n_adc_0 # Populate the sequence for ii in range(1, np.size(gradients_a, 0)): if case == 'a': g_rd_amp = np.array([gradients_a[ii - 1, 0], gradients_a[ii, 0], gradients_a[ii, 0]]) * hw.gammaB g_ph_amp = np.array([gradients_a[ii - 1, 1], gradients_a[ii, 1], gradients_a[ii, 1]]) * hw.gammaB g_sl_amp = np.array([gradients_a[ii - 1, 2], gradients_a[ii, 2], gradients_a[ii, 2]]) * hw.gammaB g_time = np.array([0.0, hw.grad_rise_time, self.repetitionTime]) batches[batch_name].add_block( pp.make_extended_trapezoid(channel=rd_channel, amplitudes=g_rd_amp, times=g_time), pp.make_extended_trapezoid(channel=ph_channel, amplitudes=g_ph_amp, times=g_time), pp.make_extended_trapezoid(channel=sl_channel, amplitudes=g_sl_amp, times=g_time), block_rf_a, block_adc_a ) n_adc += 1 n_rd_points += n_rd elif case == 'b': g_rd_amp = np.array([gradients_b[ii - 1, 0], gradients_b[ii, 0], gradients_b[ii, 0]]) * hw.gammaB g_ph_amp = np.array([gradients_b[ii - 1, 1], gradients_b[ii, 1], gradients_b[ii, 1]]) * hw.gammaB g_sl_amp = np.array([gradients_b[ii - 1, 2], gradients_b[ii, 2], gradients_b[ii, 2]]) * hw.gammaB g_time = np.array([0.0, hw.grad_rise_time, self.repetitionTime]) batches[batch_name].add_block( pp.make_extended_trapezoid(channel=rd_channel, amplitudes=g_rd_amp, times=g_time), pp.make_extended_trapezoid(channel=ph_channel, amplitudes=g_ph_amp, times=g_time), pp.make_extended_trapezoid(channel=sl_channel, amplitudes=g_sl_amp, times=g_time), block_rf_b, block_adc_b ) n_adc += 1 n_rd_points += n_rd # Write sequence file and interpret the sequence to get the waveform batches[batch_name].write(batch_name + ".seq") waveforms[batch_name], param_dict = flo_interpreter.interpret(batch_name + ".seq") print(f"{batch_name}.seq ready!") n_rd_points_dict[batch_name] = n_rd_points return waveforms, n_rd_points_dict, n_adc ''' Step 8: Run the batches This step will handle the different batches, run it and get the resulting data. This should not be modified. Oversampled data will be available in self.mapVals['data_over'] Decimated data will be available in self.mapVals['data_decimated'] The decimated data is shifted to account for CIC delay, so data is synchronized with real-time signal ''' # Create batches waveforms_a, n_readouts_a, n_adc_a = create_batches(case='a') waveforms_b, n_readouts_b, n_adc_b = create_batches(case='b') # Run sequence a if self.runBatches(waveforms=waveforms_a, n_readouts=n_readouts_a, n_adc=n_adc_a, frequency=hw.larmorFreq, # MHz bandwidth=bw_a, # MHz decimate='Normal', hardware=True, output='a' ): pass else: return False # Run sequence b return self.runBatches(waveforms=waveforms_b, n_readouts=n_readouts_b, n_adc=n_adc_b, frequency=hw.larmorFreq, # MHz bandwidth=bw_b, # MHz decimate='Normal', hardware=True, output='b' )
[docs] def sequenceAnalysis(self, mode=None): """ Analyzes the data acquired from the SPDS (Single Point Double Shot) sequence to estimate the B₀ map, generate k-space and spatial domain images, and prepare the outputs for visualization. Parameters: ----------- mode : str, optional Execution mode of the analysis. If set to 'Standalone', the results are plotted immediately after analysis (default: None). Outputs: -------- - `output` (list): A list of dictionaries defining the data and parameters for visualization. Includes: - Spatial domain magnitude images for channels A and B. - B₀ field map. - k-space magnitude images for channels A and B. - Updates `self.mapVals` with intermediate results, including k-space, spatial images, and the B₀ field map. - If `mode == 'Standalone'`, plots the results. Notes: ------ - Assumes that the k-space mask and orientation settings are correctly preconfigured. """ # Pass mode to the self, it will be required by the mriBlankSeq self.mode = mode # Load data data_a = self.mapVals['data_decimated_a'] data_b = self.mapVals['data_decimated_b'] k_points = self.mapVals['k_cartesian'] mask = self.mask # Delete the addRdPoints data_a = np.reshape(data_a, (-1, 1 + 2 * hw.addRdPoints)) data_b = np.reshape(data_b, (-1, 1 + 2 * hw.addRdPoints)) data_a = data_a[:, hw.addRdPoints] data_b = data_b[:, hw.addRdPoints] # Fill k_space k_data_a = np.zeros(np.size(k_points, 0), dtype=complex) k_data_b = np.zeros(np.size(k_points, 0), dtype=complex) jj = 0 for ii in range(np.size(mask)): if mask[ii]: k_data_a[ii] = data_a[jj] k_data_b[ii] = data_b[jj] jj += 1 # Get images k_data_a = np.reshape(k_data_a, (self.nPoints[2], self.nPoints[1], self.nPoints[0])) k_data_b = np.reshape(k_data_b, (self.nPoints[2], self.nPoints[1], self.nPoints[0])) i_data_a = self.runIFFT(k_data_a) i_data_b = self.runIFFT(k_data_b) self.mapVals['space_k_a'] = k_data_a self.mapVals['space_k_b'] = k_data_b self.mapVals['space_i_a'] = i_data_a self.mapVals['space_i_b'] = i_data_b # Generate mask p_max = np.max(np.abs(i_data_a)) mask = np.abs(i_data_a) < p_max/3 # Get phase i_phase_a = np.angle(i_data_a) i_phase_b = np.angle(i_data_b) # Get magnetic field b_field = (i_phase_b - i_phase_a) / (2 * np.pi * hw.gammaB * (self.deadTime[1] - self.deadTime[0])) b_field[mask] = 0 self.mapVals['b_field'] = b_field axes_map = {0: "x", 1: "y", 2: "z"} rd_channel = axes_map.get(self.axesOrientation[0], "") ph_channel = axes_map.get(self.axesOrientation[1], "") sl_channel = axes_map.get(self.axesOrientation[2], "") # Create the outputs to be plotted output_0 = self.fix_image_orientation(b_field, axes=self.axesOrientation) output_0['row'] = 0 output_0['col'] = 0 # Create the outputs to be plotted output_1 = self.fix_image_orientation(np.abs(i_data_a), axes=self.axesOrientation) output_1['row'] = 0 output_1['col'] = 1 # Create the outputs to be plotted output_2 = self.fix_image_orientation(np.abs(i_data_b), axes=self.axesOrientation) output_2['row'] = 0 output_2['col'] = 2 output_3 = { 'widget': 'image', 'data': np.abs(k_data_a), 'xLabel': ph_channel, 'yLabel': rd_channel, 'title': 'k-space A', 'row': 1, 'col': 1, } output_4 = { 'widget': 'image', 'data': np.abs(k_data_b), 'xLabel': ph_channel, 'yLabel': rd_channel, 'title': 'k-space B', 'row': 1, 'col': 2, } # create self.out to run in iterative mode self.output = [output_0, output_1, output_2, output_3, output_4] # save data once self.output is created self.saveRawData() # Plot result in standalone execution if self.mode == 'Standalone': self.plotResults() return self.output
if __name__ == "__main__": seq = spds() seq.sequenceAtributes() seq.sequenceRun(plotSeq=True, demo=True, standalone=True) # seq.sequenceAnalysis(mode='Standalone')