"""
Created on Thu June 2 2022
@author: J.M. Algarín, MRILab, i3M, CSIC, Valencia
@email: josalggui@i3m.upv.es
@Summary: rare sequence class
"""
import os
import sys
#*****************************************************************************
# Get the directory of the current script
main_directory = os.path.dirname(os.path.realpath(__file__))
parent_directory = os.path.dirname(main_directory)
parent_directory = os.path.dirname(parent_directory)
# Define the subdirectories you want to add to sys.path
subdirs = ['MaRGE', 'marcos_client']
# Add the subdirectories to sys.path
for subdir in subdirs:
full_path = os.path.join(parent_directory, subdir)
sys.path.append(full_path)
#******************************************************************************
import numpy as np
import controller.experiment_gui as ex
import scipy.signal as sig
from scipy.stats import linregress
import configs.hw_config as hw # Import the scanner hardware config
import configs.units as units
import seq.mriBlankSeq as blankSeq # Import the mriBlankSequence for any new sequence.
from datetime import datetime
import ismrmrd
import ismrmrd.xsd
import datetime
import ctypes
from marga_pulseq.interpreter import PSInterpreter
import pypulseq as pp
#*********************************************************************************
#*********************************************************************************
#*********************************************************************************
[docs]
class RarePyPulseq(blankSeq.MRIBLANKSEQ):
def __init__(self):
super(RarePyPulseq, self).__init__()
# Input the parameters
self.sequence_list = None
self.unlock_orientation = None
self.rdDephTime = None
self.dummyPulses = None
self.nScans = None
self.standalone = None
self.repetitionTime = None
self.inversionTime = None
self.preExTime = None
self.sweepMode = None
self.expt = None
self.echoSpacing = None
self.phGradTime = None
self.rdGradTime = None
self.parFourierFraction = None
self.etl = None
self.rfReFA = None
self.rfExFA = None
self.rfReTime = None
self.rfExTime = None
self.acqTime = None
self.freqOffset = None
self.nPoints = None
self.dfov = None
self.fov = None
self.system = None
self.rotationAxis = None
self.rotation = None
self.angle = None
self.axesOrientation = None
self.addParameter(key='seqName', string='RAREInfo', val='RarePyPulseq')
self.addParameter(key='nScans', string='Number of scans', val=1, field='IM') ## number of scans
self.addParameter(key='freqOffset', string='Larmor frequency offset (kHz)', val=0.0, units=units.kHz, field='RF')
self.addParameter(key='rfExFA', string='Excitation flip angle (º)', val=90, field='RF')
self.addParameter(key='rfReFA', string='Refocusing flip angle (º)', val=180, field='RF')
self.addParameter(key='rfExTime', string='RF excitation time (us)', val=50.0, units=units.us, field='RF')
self.addParameter(key='rfReTime', string='RF refocusing time (us)', val=100.0, units=units.us, field='RF')
self.addParameter(key='echoSpacing', string='Echo spacing (ms)', val=10.0, units=units.ms, field='SEQ')
self.addParameter(key='preExTime', string='Preexitation time (ms)', val=0.0, units=units.ms, field='SEQ')
self.addParameter(key='inversionTime', string='Inversion time (ms)', val=0.0, units=units.ms, field='SEQ', tip="0 to ommit this pulse")
self.addParameter(key='repetitionTime', string='Repetition time (ms)', val=300., units=units.ms, field='SEQ', tip="0 to ommit this pulse")
self.addParameter(key='fov', string='FOV[x,y,z] (cm)', val=[12.0, 12.0, 12.0], units=units.cm, field='IM')
self.addParameter(key='dfov', string='dFOV[x,y,z] (mm)', val=[0.0, 0.0, 0.0], units=units.mm, field='IM', tip="Position of the gradient isocenter")
self.addParameter(key='nPoints', string='nPoints[rd, ph, sl]', val=[120, 120, 10], field='IM')
self.addParameter(key='angle', string='Angle (º)', val=0.0, field='IM')
self.addParameter(key='rotationAxis', string='Rotation axis', val=[0, 0, 1], field='IM')
self.addParameter(key='etl', string='Echo train length', val=6, field='SEQ') ## nm of peaks in 1 repetition
self.addParameter(key='acqTime', string='Acquisition time (ms)', val=4.0, units=units.ms, field='SEQ')
self.addParameter(key='axesOrientation', string='Axes[rd,ph,sl]', val=[2, 1, 0], field='IM', tip="0=x, 1=y, 2=z")
self.addParameter(key='axesEnable', string='Axes enable', val=[1, 1, 1], tip="Use 0 for directions with matrix size 1, use 1 otherwise.")
self.addParameter(key='sweepMode', string='Sweep mode', val=1, field='SEQ', tip="0: sweep from -kmax to kmax. 1: sweep from 0 to kmax. 2: sweep from kmax to 0")
self.addParameter(key='rdGradTime', string='Rd gradient time (ms)', val=5.0, units=units.ms, field='OTH')
self.addParameter(key='rdDephTime', string='Rd dephasing time (ms)', val=1.0, units=units.ms, field='OTH')
self.addParameter(key='phGradTime', string='Ph gradient time (ms)', val=1.0, units=units.ms, field='OTH')
self.addParameter(key='rdPreemphasis', string='Rd preemphasis', val=1.0, field='OTH')
self.addParameter(key='rfPhase', string='RF phase (º)', val=0.0, field='OTH')
self.addParameter(key='dummyPulses', string='Dummy pulses', val=1, field='SEQ', tip="Use last dummy pulse to calibrate k = 0")
self.addParameter(key='shimming', string='Shimming (*1e4)', val=[0.0, 0.0, 0.0], units=units.sh, field='OTH')
self.addParameter(key='parFourierFraction', string='Partial fourier fraction', val=1.0, field='OTH', tip="Fraction of k planes aquired in slice direction")
self.addParameter(key='echo_shift', string='Echo time shift', val=0.0, units=units.us, field='OTH', tip='Shift the gradient echo time respect to the spin echo time.')
self.addParameter(key='unlock_orientation', string='Unlock image orientation', val=0, field='OTH', tip='0: Images oriented according to standard. 1: Image raw orientation')
self.acq = ismrmrd.Acquisition()
self.img = ismrmrd.Image()
self.header = ismrmrd.xsd.ismrmrdHeader()
[docs]
def sequenceInfo(self):
print("3D RARE sequence powered by PyPulseq")
print("Author: Dr. J.M. Algarín")
print("Contact: josalggui@i3m.upv.es")
print("mriLab @ i3M, CSIC, Spain \n")
[docs]
def sequenceTime(self):
n_scans = self.mapVals['nScans']
n_points = np.array(self.mapVals['nPoints'])
etl = self.mapVals['etl']
repetition_time = self.mapVals['repetitionTime']
par_fourier_fraction = self.mapVals['parFourierFraction']
# check if rf amplitude is too high
rf_ex_fa = self.mapVals['rfExFA'] / 180 * np.pi # rads
rf_re_fa = self.mapVals['rfReFA'] / 180 * np.pi # rads
rf_ex_time = self.mapVals['rfExTime'] # us
rf_re_time = self.mapVals['rfReTime'] # us
rf_ex_amp = rf_ex_fa / (rf_ex_time * hw.b1Efficiency)
rf_re_amp = rf_re_fa / (rf_re_time * hw.b1Efficiency)
if rf_ex_amp>1 or rf_re_amp>1:
print("ERROR: RF amplitude is too high, try with longer RF pulse time.")
return 0
seq_time = n_points[1]/etl*n_points[2]*repetition_time*1e-3*n_scans*par_fourier_fraction/60
seq_time = np.round(seq_time, decimals=1)
return seq_time # minutes, scanTime
# TODO: check for min and max values for all fields
[docs]
def sequenceAtributes(self):
super().sequenceAtributes()
# Conversion of variables to non-multiplied units
self.angle = self.angle * np.pi / 180 # rads
# Add rotation, dfov and fov to the history
self.rotation = self.rotationAxis.tolist()
self.rotation.append(self.angle)
self.rotations.append(self.rotation)
self.dfovs.append(self.dfov.tolist())
self.fovs.append(self.fov.tolist())
[docs]
def sequenceRun(self, plot_seq=False, demo=False, standalone=False):
self.demo = demo
self.plotSeq = plot_seq
self.standalone = standalone
print('RARE run...')
'''
Step 1: Define the interpreter for FloSeq/PSInterpreter.
The interpreter is responsible for converting the high-level pulse sequence description into low-level
instructions for the scanner hardware.
'''
flo_interpreter = PSInterpreter(
tx_warmup=hw.blkTime, # Transmit chain warm-up time (us)
rf_center=hw.larmorFreq * 1e6, # Larmor frequency (Hz)
rf_amp_max=hw.b1Efficiency / (2 * np.pi) * 1e6, # Maximum RF amplitude (Hz)
gx_max=hw.gFactor[0] * hw.gammaB, # Maximum gradient amplitude for X (Hz/m)
gy_max=hw.gFactor[1] * hw.gammaB, # Maximum gradient amplitude for Y (Hz/m)
gz_max=hw.gFactor[2] * hw.gammaB, # Maximum gradient amplitude for Z (Hz/m)
grad_max=np.max(hw.gFactor) * hw.gammaB, # Maximum gradient amplitude (Hz/m)
grad_t=hw.grad_raster_time * 1e6, # Gradient raster time (us)
)
'''
Step 2: Define system properties using PyPulseq (pp.Opts).
These properties define the hardware capabilities of the MRI scanner, such as maximum gradient strengths,
slew rates, and dead times. They are typically set based on the hardware configuration file (`hw_config`).
'''
system = pp.Opts(
rf_dead_time=hw.blkTime * 1e-6, # Dead time between RF pulses (s)
max_grad=np.max(hw.gFactor) * 1e3, # Maximum gradient strength (mT/m)
grad_unit='mT/m', # Units of gradient strength
max_slew=hw.max_slew_rate, # Maximum gradient slew rate (mT/m/ms)
slew_unit='mT/m/ms', # Units of gradient slew rate
grad_raster_time=hw.grad_raster_time, # Gradient raster time (s)
rise_time=hw.grad_rise_time, # Gradient rise time (s)
rf_raster_time=1e-6,
block_duration_raster=1e-6
)
'''
Step 3: Perform any calculations required for the sequence.
In this step, students can implement the necessary calculations, such as timing calculations, RF amplitudes, and
gradient strengths, before defining the sequence blocks.
'''
# Set the fov
self.dfov = self.getFovDisplacement()
self.dfov = self.dfov[self.axesOrientation]
self.fov = self.fov[self.axesOrientation]
# Check for used axes
axes_enable = []
for ii in range(3):
if self.nPoints[ii] == 1:
axes_enable.append(0)
else:
axes_enable.append(1)
self.mapVals['axes_enable'] = axes_enable
# Miscellaneous
self.freqOffset = self.freqOffset*1e6 # MHz
resolution = self.fov/self.nPoints
rf_ex_amp = self.rfExFA/(self.rfExTime*1e6*hw.b1Efficiency)*np.pi/180
rf_re_amp = self.rfReFA/(self.rfReTime*1e6*hw.b1Efficiency)*np.pi/180
self.mapVals['rf_ex_amp'] = rf_ex_amp
self.mapVals['rf_re_amp'] = rf_re_amp
self.mapVals['resolution'] = resolution
self.mapVals['grad_rise_time'] = hw.grad_rise_time
self.mapVals['addRdPoints'] = hw.addRdPoints
self.mapVals['larmorFreq'] = hw.larmorFreq + self.freqOffset
if rf_ex_amp > 1 or rf_re_amp > 1:
print("ERROR: RF amplitude is too high, try with longer RF pulse time.")
return 0
# Matrix size
n_rd = self.nPoints[0] + 2 * hw.addRdPoints
n_ph = self.nPoints[1]
n_sl = self.nPoints[2]
# ETL if etl>n_ph
if self.etl>n_ph:
self.etl = n_ph
# Miscellaneous
n_rd_points_per_train = self.etl * n_rd
# par_acq_lines in case par_acq_lines = 0
par_acq_lines = int(int(self.nPoints[2]*self.parFourierFraction)-self.nPoints[2]/2)
self.mapVals['partialAcquisition'] = par_acq_lines
# BW
bw = self.nPoints[0] / self.acqTime * 1e-6 # MHz
sampling_period = 1 / bw # us
# Readout gradient time
if self.rdGradTime<self.acqTime:
self.rdGradTime = self.acqTime
print("Readout gradient time set to %0.1f ms" % (self.rdGradTime * 1e3))
self.mapVals['rdGradTime'] = self.rdGradTime * 1e3 # ms
# Phase and slice de- and re-phasing time
if self.phGradTime == 0 or self.phGradTime > self.echoSpacing/2-self.rfExTime/2-self.rfReTime/2-2*hw.grad_rise_time:
self.phGradTime = self.echoSpacing/2-self.rfExTime/2-self.rfReTime/2-2*hw.grad_rise_time
print("Phase and slice gradient time set to %0.1f ms" % (self.phGradTime * 1e3))
self.mapVals['phGradTime'] = self.phGradTime*1e3 # ms
# Max gradient amplitude
rd_grad_amplitude = self.nPoints[0]/(hw.gammaB*self.fov[0]*self.acqTime)*axes_enable[0]
ph_grad_amplitude = n_ph/(2*hw.gammaB*self.fov[1]*(self.phGradTime+hw.grad_rise_time))*axes_enable[1]
sl_grad_amplitude = n_sl/(2*hw.gammaB*self.fov[2]*(self.phGradTime+hw.grad_rise_time))*axes_enable[2]
self.mapVals['rd_grad_amplitude'] = rd_grad_amplitude
self.mapVals['ph_grad_amplitude'] = ph_grad_amplitude
self.mapVals['sl_grad_amplitude'] = sl_grad_amplitude
# Readout dephasing amplitude
rd_deph_amplitude = 0.5*rd_grad_amplitude*(hw.grad_rise_time+self.rdGradTime)/(hw.grad_rise_time+self.rdDephTime)
self.mapVals['rd_deph_amplitude'] = rd_deph_amplitude
print("Max rd gradient amplitude: %0.1f mT/m" % (max(rd_grad_amplitude, rd_deph_amplitude) * 1e3))
print("Max ph gradient amplitude: %0.1f mT/m" % (ph_grad_amplitude * 1e3))
print("Max sl gradient amplitude: %0.1f mT/m" % (sl_grad_amplitude * 1e3))
# Phase and slice gradient vector
ph_gradients = np.linspace(-ph_grad_amplitude,ph_grad_amplitude,num=n_ph,endpoint=False)
sl_gradients = np.linspace(-sl_grad_amplitude,sl_grad_amplitude,num=n_sl,endpoint=False)
# Now fix the number of slices to partially acquired k-space
n_sl = (int(self.nPoints[2]/2)+par_acq_lines)*axes_enable[2]+(1-axes_enable[2])
print("Number of acquired slices: %i" % n_sl)
# Set phase vector to given sweep mode
ind = self.getIndex(self.etl, n_ph, self.sweepMode)
self.mapVals['sweepOrder'] = ind
ph_gradients = ph_gradients[ind]
self.mapVals['ph_gradients'] = ph_gradients.copy()
self.mapVals['sl_gradients'] = sl_gradients.copy()
# Normalize gradient list
if ph_grad_amplitude != 0:
ph_gradients /= ph_grad_amplitude
if sl_grad_amplitude != 0:
sl_gradients /= sl_grad_amplitude
# Get the rotation matrix
rot = self.getRotationMatrix()
grad_amp = np.array([0.0, 0.0, 0.0])
grad_amp[self.axesOrientation[0]] = 1
grad_amp = np.reshape(grad_amp, (3, 1))
result = np.dot(rot, grad_amp)
# Map the axis to "x", "y", and "z" according ot axesOrientation
axes_map = {0: "x", 1: "y", 2: "z"}
rd_channel = axes_map.get(self.axesOrientation[0], "")
ph_channel = axes_map.get(self.axesOrientation[1], "")
sl_channel = axes_map.get(self.axesOrientation[2], "")
'''
# Step 4: Define the experiment to get the true bandwidth
# In this step, student need to get the real bandwidth used in the experiment. To get this bandwidth, an
# experiment must be defined and the sampling period should be obtained using get_sampling_period()
# Note: experiment must be passed as a class property named self.expt
'''
if not self.demo:
self.expt = ex.Experiment(lo_freq=hw.larmorFreq + self.freqOffset * 1e-6, # MHz
rx_t=sampling_period, # us
init_gpa=False,
gpa_fhdo_offset_time=(1 / 0.2 / 3.1),
auto_leds=True)
sampling_period = self.expt.get_sampling_period() # us
bw = 1 / sampling_period # MHz
sampling_time = sampling_period * n_rd * 1e-6 # s
print("Acquisition bandwidth fixed to: %0.3f kHz" % (bw * 1e3))
self.expt.__del__()
else:
sampling_time = sampling_period * n_rd * 1e-6 # s
self.mapVals['bw_MHz'] = bw
self.mapVals['sampling_period_us'] = sampling_period
self.mapVals['sampling_time_s'] = sampling_time
'''
# Step 5: Define sequence blocks.
# In this step, you will define the building blocks of the MRI sequence, including the RF pulses and gradient pulses.
'''
# First delay, sequence will start after 1 repetition time, this ensure gradient and ADC latency is not an issue.
if self.inversionTime==0 and self.preExTime==0:
delay = self.repetitionTime - self.rfExTime / 2 - system.rf_dead_time
elif self.inversionTime>0 and self.preExTime==0:
delay = self.repetitionTime - self.inversionTime - self.rfReTime / 2 - system.rf_dead_time
elif self.inversionTime==0 and self.preExTime>0:
delay = self.repetitionTime - self.preExTime - self.rfExTime / 2 - system.rf_dead_time
else:
delay = self.repetitionTime - self.preExTime - self.inversionTime - self.rfExTime / 2 - system.rf_dead_time
delay_first = pp.make_delay(delay)
# ADC to get noise
delay = 100e-6
block_adc_noise = pp.make_adc(
num_samples=n_rd,
dwell=sampling_period * 1e-6,
delay=delay,
)
# Pre-excitation pulse
if self.preExTime>0:
flip_pre = self.rfExFA * np.pi / 180
delay = 0
block_rf_pre_excitation = pp.make_block_pulse(
flip_angle=flip_pre,
system=system,
duration=self.rfExTime,
phase_offset=0.0,
delay=0,
)
if self.inversionTime==0:
delay = self.preExTime
else:
delay = self.rfExTime / 2 - self.rfReTime / 2 + self.preExTime
delay_pre_excitation = pp.make_delay(delay)
# Inversion pulse
if self.inversionTime>0:
flip_inv = self.rfReFA * np.pi / 180
block_rf_inversion = pp.make_block_pulse(
flip_angle=flip_inv,
system=system,
duration=self.rfReTime,
phase_offset=0.0,
delay=0,
)
delay = self.rfReTime / 2 - self.rfExTime / 2 + self.inversionTime
delay_inversion = pp.make_delay(delay)
# Excitation pulse
flip_ex = self.rfExFA * np.pi / 180
block_rf_excitation = pp.make_block_pulse(
flip_angle=flip_ex,
system=system,
duration=self.rfExTime,
phase_offset=0.0,
delay=0.0,
use = 'excitation'
)
# De-phasing gradient
delay = system.rf_dead_time + self.rfExTime
block_gr_rd_preph = pp.make_trapezoid(
channel=rd_channel,
system=system,
amplitude=rd_deph_amplitude * hw.gammaB,
flat_time=self.rdDephTime,
rise_time=hw.grad_rise_time,
delay=delay,
)
# Delay to re-focusing pulse
delay_preph = pp.make_delay(self.echoSpacing / 2 + self.rfExTime / 2 - self.rfReTime / 2)
# Refocusing pulse
flip_re = self.rfReFA * np.pi / 180
block_rf_refocusing = pp.make_block_pulse(
flip_angle=flip_re,
system=system,
duration=self.rfReTime,
phase_offset=np.pi / 2,
delay=0,
use='refocusing'
)
# Delay to next refocusing pulse
delay_reph = pp.make_delay(self.echoSpacing)
# Phase gradient de-phasing
delay = system.rf_dead_time + self.rfReTime
block_gr_ph_deph = pp.make_trapezoid(
channel=ph_channel,
system=system,
amplitude=ph_grad_amplitude * hw.gammaB + float(ph_grad_amplitude==0),
flat_time=self.phGradTime,
rise_time=hw.grad_rise_time,
delay=delay,
)
# Slice gradient de-phasing
delay = system.rf_dead_time + self.rfReTime
block_gr_sl_deph = pp.make_trapezoid(
channel=sl_channel,
system=system,
amplitude=sl_grad_amplitude * hw.gammaB + float(sl_grad_amplitude==0),
flat_time=self.phGradTime,
delay=delay,
rise_time=hw.grad_rise_time,
)
# Readout gradient
delay = system.rf_dead_time + self.rfReTime / 2 + self.echoSpacing / 2 - self.rdGradTime / 2 - \
hw.grad_rise_time
block_gr_rd_reph = pp.make_trapezoid(
channel=rd_channel,
system=system,
amplitude=rd_grad_amplitude * hw.gammaB,
flat_time=self.rdGradTime,
rise_time=hw.grad_rise_time,
delay=delay,
)
# ADC to get the signal
delay = system.rf_dead_time + self.rfReTime / 2 + self.echoSpacing / 2 - sampling_time / 2
block_adc_signal = pp.make_adc(
num_samples=n_rd,
dwell=sampling_period * 1e-6,
delay=delay,
)
# Phase gradient re-phasing
delay = system.rf_dead_time + self.rfReTime / 2 - self.echoSpacing / 2 + sampling_time / 2
block_gr_ph_reph = pp.make_trapezoid(
channel=ph_channel,
system=system,
amplitude=ph_grad_amplitude * hw.gammaB + float(ph_grad_amplitude==0),
flat_time=self.phGradTime,
rise_time=hw.grad_rise_time,
delay=delay,
)
# Slice gradient re-phasing
delay = system.rf_dead_time + self.rfReTime / 2 - self.echoSpacing / 2 + sampling_time / 2
block_gr_sl_reph = pp.make_trapezoid(
channel=sl_channel,
system=system,
amplitude=sl_grad_amplitude * hw.gammaB + float(sl_grad_amplitude==0),
flat_time=self.phGradTime,
rise_time=hw.grad_rise_time,
delay=delay,
)
# Delay TR
delay = self.repetitionTime + self.rfReTime / 2 - self.rfExTime / 2 - (self.etl + 0.5) * self.echoSpacing - \
self.inversionTime - self.preExTime
if self.inversionTime > 0 and self.preExTime == 0:
delay -= self.rfExTime / 2
delay_tr = pp.make_delay(delay)
'''
# Step 6: Define your initializeBatch according to your sequence.
# In this step, you will create the initializeBatch method to create dummy pulses that will be initialized for
# each new batch.
'''
def initialize_batch():
"""
Initializes a batch of MRI sequence blocks using PyPulseq for a given experimental configuration.
Returns:
--------
tuple
- `batch` (pp.Sequence): A PyPulseq sequence object containing the configured sequence blocks.
- `n_rd_points` (int): Total number of readout points in the batch.
- `n_adc` (int): Total number of ADC acquisitions in the batch.
Workflow:
---------
1. **Create PyPulseq Sequence Object**:
- Instantiates a new PyPulseq sequence object (`pp.Sequence`) and initializes counters for
readout points (`n_rd_points`) and ADC events (`n_adc`).
2. **Set Gradients to Zero**:
- Initializes slice and phase gradients (`gr_ph_deph`, `gr_sl_deph`, `gr_ph_reph`, `gr_sl_reph`) to zero
by scaling predefined gradient blocks with a factor of 0.
3. **Add Initial Delay and Noise Measurement**:
- Adds an initial delay block (`delay_first`) and a noise measurement ADC block (`block_adc_noise`)
to the sequence.
4. **Generate Dummy Pulses**:
- Creates a specified number of dummy pulses (`self.dummyPulses`) to prepare the system for data acquisition:
- **Pre-excitation Pulse**:
- If `self.preExTime > 0`, adds a pre-excitation pulse with a readout pre-phasing gradient.
- **Inversion Pulse**:
- If `self.inversionTime > 0`, adds an inversion pulse with a scaled readout pre-phasing gradient.
- **Excitation Pulse**:
- Adds an excitation pulse followed by a readout de-phasing gradient (`block_gr_rd_preph`).
- For each dummy pulse:
- **Echo Train**:
- For the last dummy pulse, appends an echo train that includes:
- A refocusing pulse.
- Gradients for readout re-phasing, phase de-phasing, and slice de-phasing.
- ADC signal acquisition block (`block_adc_signal`).
- Gradients for phase and slice re-phasing.
- For other dummy pulses, excludes the ADC signal acquisition.
- **Repetition Time Delay**:
- Adds a delay (`delay_tr`) to separate repetitions.
5. **Return Results**:
- Returns the configured sequence (`batch`), total readout points (`n_rd_points`), and number of ADC events (`n_adc`).
"""
# Instantiate pypulseq sequence object
batch = pp.Sequence(system)
n_rd_points = 0
n_adc = 0
# Set slice and phase gradients to 0
gr_ph_deph = pp.scale_grad(block_gr_ph_deph, scale=0.0)
gr_sl_deph = pp.scale_grad(block_gr_sl_deph, scale=0.0)
gr_ph_reph = pp.scale_grad(block_gr_ph_reph, scale=0.0)
gr_sl_reph = pp.scale_grad(block_gr_sl_reph, scale=0.0)
# Add first delay and first noise measurement
batch.add_block(delay_first, block_adc_noise)
n_rd_points += n_rd
n_adc += 1
# Create dummy pulses
for dummy in range(self.dummyPulses):
# Pre-excitation pulse
if self.preExTime>0:
gr_rd_preex = pp.scale_grad(block_gr_rd_preph, scale=1.0)
batch.add_block(block_rf_pre_excitation,
gr_rd_preex,
delay_pre_excitation)
# Inversion pulse
if self.inversionTime>0:
gr_rd_inv = pp.scale_grad(block_gr_rd_preph, scale=-1.0)
batch.add_block(block_rf_inversion,
gr_rd_inv,
delay_inversion)
# Add excitation pulse and readout de-phasing gradient
batch.add_block(block_gr_rd_preph,
block_rf_excitation,
delay_preph)
# Add echo train
for echo in range(self.etl):
if dummy == self.dummyPulses-1:
batch.add_block(block_rf_refocusing,
block_gr_rd_reph,
gr_ph_deph,
gr_sl_deph,
block_adc_signal,
delay_reph)
batch.add_block(gr_ph_reph,
gr_sl_reph)
n_rd_points += n_rd
n_adc += 1
else:
batch.add_block(block_rf_refocusing,
block_gr_rd_reph,
gr_ph_deph,
gr_sl_deph,
delay_reph)
batch.add_block(gr_ph_reph,
gr_sl_reph)
# Add time delay to next repetition
batch.add_block(delay_tr)
return batch, n_rd_points, n_adc
'''
Step 7: Define your createBatches method.
In this step you will populate the batches adding the blocks previously defined in step 4, and accounting for
number of acquired points to check if a new batch is required.
'''
def create_batches():
"""
Creates and processes multiple batches of MRI sequence blocks for slice and phase encoding sweeps.
Returns:
--------
tuple
- `waveforms` (dict): Dictionary of interpreted waveform data for each batch.
- `n_rd_points_dict` (dict): Dictionary mapping each batch to its total number of readout points.
- `n_adc` (int): Total number of ADC acquisitions across all batches.
Workflow:
---------
1. **Initialization**:
- Initializes dictionaries to store batches (`batches`), waveforms (`waveforms`), and readout points (`n_rd_points_dict`).
- Tracks the current readout point count (`n_rd_points`), ADC window count (`n_adc`), and batch index (`seq_idx`).
2. **Slice and Phase Sweep**:
- Iterates over slices (`n_sl`) and phases (`n_ph`) to build and organize sequence blocks:
- **Batch Management**:
- Creates a new batch if no batch exists or the current batch exceeds the hardware limit (`hw.maxRdPoints`).
- Writes the previous batch to disk, interprets it using `flo_interpreter`, and updates dictionaries.
- Initializes the next batch with `initializeBatch()`.
- **Pre-Excitation and Inversion Pulses**:
- Optionally adds a pre-excitation pulse (`block_rf_pre_excitation`) and an inversion pulse (`block_rf_inversion`), if respective times (`self.preExTime`, `self.inversionTime`) are greater than zero.
- **Excitation and Echo Train**:
- Adds an excitation pulse followed by an echo train for phase and slice gradients:
- Gradients are scaled based on the current slice (`sl_idx`) and phase (`ph_idx`) indices.
- Includes ADC acquisition blocks (`block_adc_signal`) and refocusing pulses for each echo.
- **Repetition Time Delay**:
- Adds a delay (`delay_tr`) between repetitions.
3. **Final Batch Processing**:
- Writes and interprets the last batch after completing all slices and phases.
- Updates the total readout points for the final batch.
Returns:
--------
- `waveforms`: Interpreted waveforms for each batch, generated using the `flo_interpreter`.
- `n_rd_points_dict`: Maps batch names to the total readout points per batch.
- `n_adc`: Total number of ADC acquisition windows across all batches.
"""
batches = {} # Dictionary to save batches PyPulseq sequences
waveforms = {} # Dictionary to store generated waveforms per each batch
n_rd_points_dict = {} # Dictionary to track readout points for each batch
n_rd_points = 0 # To account for number of acquired rd points
seq_idx = 0 # Sequence batch index
n_adc = 0 # To account for number of adc windows
batch_num = "batch_0" # Initial batch name
# Slice sweep
for sl_idx in range(n_sl):
ph_idx = 0
# Phase sweep
while ph_idx < n_ph:
# Check if a new batch is needed (either first batch or exceeding readout points limit)
if seq_idx == 0 or n_rd_points + n_rd_points_per_train > hw.maxRdPoints:
# If a previous batch exists, write and interpret it
if seq_idx > 0:
batches[batch_num].write(batch_num + ".seq")
waveforms[batch_num], param_dict = flo_interpreter.interpret(batch_num + ".seq")
print(f"{batch_num}.seq ready!")
# Update to the next batch
seq_idx += 1
n_rd_points_dict[batch_num] = n_rd_points # Save readout points count
n_rd_points = 0
batch_num = f"batch_{seq_idx}"
batches[batch_num], n_rd_points, n_adc_0 = initialize_batch() # Initialize new batch
n_adc += n_adc_0
print(f"Creating {batch_num}.seq...")
# Pre-excitation pulse
if self.preExTime > 0:
gr_rd_preex = pp.scale_grad(block_gr_rd_preph, scale=+1.0)
batches[batch_num].add_block(block_rf_pre_excitation,
gr_rd_preex,
delay_pre_excitation)
# Inversion pulse
if self.inversionTime > 0:
gr_rd_inv = pp.scale_grad(block_gr_rd_preph, scale=-1.0)
batches[batch_num].add_block(block_rf_inversion,
gr_rd_inv,
delay_inversion)
# Add excitation pulse and readout de-phasing gradient
batches[batch_num].add_block(block_gr_rd_preph,
block_rf_excitation,
delay_preph)
# Add echo train
for echo in range(self.etl):
# Fix the phase and slice amplitude
gr_ph_deph = pp.scale_grad(block_gr_ph_deph, ph_gradients[ph_idx])
gr_sl_deph = pp.scale_grad(block_gr_sl_deph, sl_gradients[sl_idx])
gr_ph_reph = pp.scale_grad(block_gr_ph_reph, - ph_gradients[ph_idx])
gr_sl_reph = pp.scale_grad(block_gr_sl_reph, - sl_gradients[sl_idx])
# Add blocks
batches[batch_num].add_block(block_rf_refocusing,
block_gr_rd_reph,
gr_ph_deph,
gr_sl_deph,
block_adc_signal,
delay_reph)
batches[batch_num].add_block(gr_ph_reph,
gr_sl_reph)
n_rd_points += n_rd
n_adc += 1
ph_idx += 1
# Add time delay to next repetition
batches[batch_num].add_block(delay_tr)
# After final repetition, save and interpret the last batch
batches[batch_num].write(batch_num + ".seq")
waveforms[batch_num], param_dict = flo_interpreter.interpret(batch_num + ".seq")
print(f"{batch_num}.seq ready!")
print(f"{len(batches)} batches created. Sequence ready!")
# Update the number of acquired points in the last batch
n_rd_points_dict.pop('batch_0')
n_rd_points_dict[batch_num] = n_rd_points
return waveforms, n_rd_points_dict, n_adc
'''
Step 8: Run the batches
This step will handle the different batches, run it and get the resulting data. This should not be modified.
Oversampled data will be available in self.mapVals['data_over']
Decimated data will be available in self.mapVals['data_decimated']
'''
waveforms, n_readouts, n_adc = create_batches()
return self.runBatches(waveforms=waveforms,
n_readouts=n_readouts,
n_adc=n_adc,
frequency=hw.larmorFreq + self.freqOffset * 1e-6, # MHz
bandwidth=bw, # MHz
decimate='Normal',
)
[docs]
def sequenceAnalysis(self, mode=None):
self.mode = mode
# Get data
data_over = self.mapVals['data_over']
data_decimated = self.mapVals['data_decimated']
n_rd, n_ph, n_sl = self.nPoints
n_rd = n_rd + 2 * hw.addRdPoints
n_batches = self.mapVals['n_batches']
n_readouts = self.mapVals['n_readouts']
ind = self.getParameter('sweepOrder')
# Get noise data, dummy data and signal data
data_noise = []
data_dummy = []
data_signal = []
points_per_rd = n_rd
points_per_train = points_per_rd * self.etl
idx_0 = 0
idx_1 = 0
for batch in range(n_batches):
n_rds = n_readouts[batch]
for scan in range(self.nScans):
idx_1 += n_rds
data_prov = data_decimated[idx_0:idx_1]
data_noise = np.concatenate((data_noise, data_prov[0:points_per_rd]), axis=0)
if self.dummyPulses > 0:
data_dummy = np.concatenate((data_dummy, data_prov[points_per_rd:points_per_rd+points_per_train]), axis=0)
data_signal = np.concatenate((data_signal, data_prov[points_per_rd+points_per_train::]), axis=0)
idx_0 = idx_1
n_readouts[batch] += -n_rd - n_rd * self.etl
self.mapVals['data_noise'] = data_noise
self.mapVals['data_dummy'] = data_dummy
self.mapVals['data_signal'] = data_signal
# Decimate data to get signal in desired bandwidth
data_full = data_signal
# Reorganize data_full
data_prov = np.zeros(shape=[self.nScans, n_sl * n_ph * n_rd], dtype=complex)
if n_batches > 1:
data_full_a = data_full[0:sum(n_readouts[0:-1]) * self.nScans]
data_full_b = data_full[sum(n_readouts[0:-1]) * self.nScans:]
data_full_a = np.reshape(data_full_a, newshape=(n_batches - 1, self.nScans, -1, n_rd))
data_full_b = np.reshape(data_full_b, newshape=(1, self.nScans, -1, n_rd))
for scan in range(self.nScans):
data_scan_a = np.reshape(data_full_a[:, scan, :, :], -1)
data_scan_b = np.reshape(data_full_b[:, scan, :, :], -1)
data_prov[scan, :] = np.concatenate((data_scan_a, data_scan_b), axis=0)
else:
data_full = np.reshape(data_full, newshape=(1, self.nScans, -1, n_rd))
for scan in range(self.nScans):
data_prov[scan, :] = np.reshape(data_full[:, scan, :, :], -1)
data_full = np.reshape(data_prov, -1)
# Save data_full to save it in .h5
self.data_fullmat = data_full
# Get index for krd = 0
# Average data
data_prov = np.reshape(data_full, newshape=(self.nScans, n_rd * n_ph * n_sl))
data_prov = np.average(data_prov, axis=0)
# Reorganize the data according to sweep mode
data_prov = np.reshape(data_prov, newshape=(n_sl, n_ph, n_rd))
data_temp = np.zeros_like(data_prov)
for ii in range(n_ph):
data_temp[:, ind[ii], :] = data_prov[:, ii, :]
data_prov = data_temp
# Get central line
data_prov = data_prov[int(self.nPoints[2] / 2), int(n_ph / 2), :]
ind_krd_0 = np.argmax(np.abs(data_prov))
if ind_krd_0 < n_rd / 2 - hw.addRdPoints or ind_krd_0 > n_rd / 2 + hw.addRdPoints:
ind_krd_0 = int(n_rd / 2)
# Get individual images
data_full = np.reshape(data_full, newshape=(self.nScans, n_sl, n_ph, n_rd))
data_full = data_full[:, :, :, ind_krd_0 - int(self.nPoints[0] / 2):ind_krd_0 + int(self.nPoints[0] / 2)]
data_temp = np.zeros_like(data_full)
for ii in range(n_ph):
data_temp[:, :, ind[ii], :] = data_full[:, :, ii, :]
data_full = data_temp
self.mapVals['data_full'] = data_full
# Average data
data = np.average(data_full, axis=0)
# Do zero padding
data_temp = np.zeros(shape=(self.nPoints[2], self.nPoints[1], self.nPoints[0]), dtype=complex)
data_temp[0:n_sl, :, :] = data
data = np.reshape(data_temp, newshape=(1, self.nPoints[0] * self.nPoints[1] * self.nPoints[2]))
# Fix the position of the sample according to dfov
bw = self.getParameter('bw_MHz')
time_vector = np.linspace(-self.nPoints[0] / bw / 2 + 0.5 / bw, self.nPoints[0] / bw / 2 - 0.5 / bw,
self.nPoints[0]) * 1e-6 # s
kMax = np.array(self.nPoints) / (2 * np.array(self.fov)) * np.array(self.mapVals['axes_enable'])
kRD = time_vector * hw.gammaB * self.getParameter('rd_grad_amplitude')
kPH = np.linspace(-kMax[1], kMax[1], num=self.nPoints[1], endpoint=False)
kSL = np.linspace(-kMax[2], kMax[2], num=self.nPoints[2], endpoint=False)
kPH, kSL, kRD = np.meshgrid(kPH, kSL, kRD)
kRD = np.reshape(kRD, newshape=(1, self.nPoints[0] * self.nPoints[1] * self.nPoints[2]))
kPH = np.reshape(kPH, newshape=(1, self.nPoints[0] * self.nPoints[1] * self.nPoints[2]))
kSL = np.reshape(kSL, newshape=(1, self.nPoints[0] * self.nPoints[1] * self.nPoints[2]))
dPhase = np.exp(-2 * np.pi * 1j * (self.dfov[0] * kRD + self.dfov[1] * kPH + self.dfov[2] * kSL))
data = np.reshape(data * dPhase, newshape=(self.nPoints[2], self.nPoints[1], self.nPoints[0]))
self.mapVals['kSpace3D'] = data
img = np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(data)))
self.mapVals['image3D'] = img
data = np.reshape(data, newshape=(1, self.nPoints[0] * self.nPoints[1] * self.nPoints[2]))
# Create sampled data
kRD = np.reshape(kRD, newshape=(self.nPoints[0] * self.nPoints[1] * self.nPoints[2], 1))
kPH = np.reshape(kPH, newshape=(self.nPoints[0] * self.nPoints[1] * self.nPoints[2], 1))
kSL = np.reshape(kSL, newshape=(self.nPoints[0] * self.nPoints[1] * self.nPoints[2], 1))
data = np.reshape(data, newshape=(self.nPoints[0] * self.nPoints[1] * self.nPoints[2], 1))
self.mapVals['kMax_1/m'] = kMax
self.mapVals['sampled'] = np.concatenate((kRD, kPH, kSL, data), axis=1)
self.mapVals['sampledCartesian'] = self.mapVals['sampled'] # To sweep
data = np.reshape(data, newshape=(self.nPoints[2], self.nPoints[1], self.nPoints[0]))
axes_enable = self.mapVals['axes_enable']
# Get axes in strings
axes = self.mapVals['axesOrientation']
axesDict = {'x':0, 'y':1, 'z':2}
axesKeys = list(axesDict.keys())
axesVals = list(axesDict.values())
axesStr = ['','','']
n = 0
for val in axes:
index = axesVals.index(val)
axesStr[n] = axesKeys[index]
n += 1
if (axes_enable[1] == 0 and axes_enable[2] == 0):
bw = self.mapVals['bw']*1e-3 # kHz
acqTime = self.mapVals['acqTime'] # ms
tVector = np.linspace(-acqTime/2, acqTime/2, self.nPoints[0])
sVector = self.mapVals['sampled'][:, 3]
fVector = np.linspace(-bw/2, bw/2, self.nPoints[0])
iVector = np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(sVector)))
# Plots to show into the GUI
result1 = {}
result1['widget'] = 'curve'
result1['xData'] = tVector
result1['yData'] = [np.abs(sVector), np.real(sVector), np.imag(sVector)]
result1['xLabel'] = 'Time (ms)'
result1['yLabel'] = 'Signal amplitude (mV)'
result1['title'] = "Signal"
result1['legend'] = ['Magnitude', 'Real', 'Imaginary']
result1['row'] = 0
result1['col'] = 0
result2 = {}
result2['widget'] = 'curve'
result2['xData'] = fVector
result2['yData'] = [np.abs(iVector)]
result2['xLabel'] = 'Frequency (kHz)'
result2['yLabel'] = "Amplitude (a.u.)"
result2['title'] = "Spectrum"
result2['legend'] = ['Spectrum magnitude']
result2['row'] = 1
result2['col'] = 0
self.output = [result1, result2]
else:
# Plot image
image = np.abs(self.mapVals['image3D'])
image = image/np.max(np.reshape(image,-1))*100
# Image orientation
image_orientation_dicom = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0]
x_label = "%s axis" % axesStr[1]
y_label = "%s axis" % axesStr[0]
title = "Image"
if not self.unlock_orientation: # Image orientation
if self.axesOrientation[2] == 2: # Sagittal
title = "Sagittal"
if self.axesOrientation[0] == 0 and self.axesOrientation[1] == 1: #OK
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
x_label = "(-Y) A | PHASE | P (+Y)"
y_label = "(-X) I | READOUT | S (+X)"
image_orientation_dicom = [0.0, 1.0, 0.0, 0.0, 0.0, -1.0]
else:
image = np.transpose(image, (0, 2, 1))
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
x_label = "(-Y) A | READOUT | P (+Y)"
y_label = "(-X) I | PHASE | S (+X)"
image_orientation_dicom = [0.0, 1.0, 0.0, 0.0, 0.0, -1.0]
elif self.axesOrientation[2] == 1: # Coronal
title = "Coronal"
if self.axesOrientation[0] == 0 and self.axesOrientation[1] == 2: #OK
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
image = np.flip(image, axis=0)
x_label = "(+Z) R | PHASE | L (-Z)"
y_label = "(-X) I | READOUT | S (+X)"
image_orientation_dicom = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0]
else:
image = np.transpose(image, (0, 2, 1))
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
image = np.flip(image, axis=0)
x_label = "(+Z) R | READOUT | L (-Z)"
y_label = "(-X) I | PHASE | S (+X)"
image_orientation_dicom = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0]
elif self.axesOrientation[2] == 0: # Transversal
title = "Transversal"
if self.axesOrientation[0] == 1 and self.axesOrientation[1] == 2:
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
x_label = "(+Z) R | PHASE | L (-Z)"
y_label = "(+Y) P | READOUT | A (-Y)"
image_orientation_dicom = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0]
else: #OK
image = np.transpose(image, (0, 2, 1))
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
x_label = "(+Z) R | READOUT | L (-Z)"
y_label = "(+Y) P | PHASE | A (-Y)"
image_orientation_dicom = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0]
result1 = {'widget': 'image',
'data': image,
'xLabel': x_label,
'yLabel': y_label,
'title': title,
'row': 0,
'col': 0}
result2 = {'widget': 'image'}
if self.parFourierFraction==1:
result2['data'] = np.log10(np.abs(self.mapVals['kSpace3D']))
else:
result2['data'] = np.abs(self.mapVals['kSpace3D'])
result2['xLabel'] = "k%s"%axesStr[1]
result2['yLabel'] = "k%s"%axesStr[0]
result2['title'] = "k-Space"
result2['row'] = 0
result2['col'] = 1
# DICOM TAGS
# Image
image_dicom = np.transpose(image, (0, 2, 1))
# If it is a 3d image
if len(image_dicom.shape) > 2:
# Obtener dimensiones
slices, rows, columns = image_dicom.shape
self.meta_data["Columns"] = columns
self.meta_data["Rows"] = rows
self.meta_data["NumberOfSlices"] = slices
self.meta_data["NumberOfFrames"] = slices
# if it is a 2d image
else:
# Obtener dimensiones
rows, columns = image_dicom.shape
slices = 1
self.meta_data["Columns"] = columns
self.meta_data["Rows"] = rows
self.meta_data["NumberOfSlices"] = 1
self.meta_data["NumberOfFrames"] = 1
img_full_abs = np.abs(image_dicom) * (2 ** 15 - 1) / np.amax(np.abs(image_dicom))
img_full_int = np.int16(np.abs(img_full_abs))
img_full_int = np.reshape(img_full_int, (slices, rows, columns))
arr = img_full_int
self.meta_data["PixelData"] = arr.tobytes()
self.meta_data["WindowWidth"] = 26373
self.meta_data["WindowCenter"] = 13194
self.meta_data["ImageOrientationPatient"] = image_orientation_dicom
resolution = self.mapVals['resolution'] * 1e3
self.meta_data["PixelSpacing"] = [resolution[0], resolution[1]]
self.meta_data["SliceThickness"] = resolution[2]
# Sequence parameters
self.meta_data["RepetitionTime"] = self.mapVals['repetitionTime']
self.meta_data["EchoTime"] = self.mapVals['echoSpacing']
self.meta_data["EchoTrainLength"] = self.mapVals['etl']
# Add results into the output attribute (result1 must be the image to save in dicom)
self.output = [result1, result2]
# Reset rotation angle and dfov to zero
self.mapVals['angle'] = self.angle
self.mapVals['dfov'] = np.array(self.mapVals['dfov'])
self.mapVals['dfov'][self.axesOrientation] = self.dfov.reshape(-1)
self.mapVals['dfov'] = list(self.mapVals['dfov'])
# Save results
self.saveRawData()
self.save_ismrmrd()
self.mapVals['angle'] = 0.0
self.mapVals['dfov'] = [0.0, 0.0, 0.0]
try:
self.sequence_list['RARE'].mapVals['angle'] = 0.0
self.sequence_list['RARE'].mapVals['dfov'] = [0.0, 0.0, 0.0]
except:
pass
hw.dfov = [0.0, 0.0, 0.0]
if self.mode == 'Standalone':
self.plotResults()
return self.output
[docs]
def save_ismrmrd(self):
"""
Save the current instance's data in ISMRMRD format.
This method saves the raw data, header information, and reconstructed images to an HDF5 file
using the ISMRMRD (Image Storage and Reconstruction format for MR Data) format.
Steps performed:
1. Generate a timestamp-based filename and directory path for the output file.
2. Initialize the ISMRMRD dataset with the generated path.
3. Populate the header and write the XML header to the dataset. Informations can be added.
4. Reshape the raw data matrix and iterate over scans, slices, and phases to write each acquisition. WARNING : RARE sequence follows ind order to fill the k-space.
5. Set acquisition flags and properties.
6. Append the acquisition data to the dataset.
7. Reshape and save the reconstructed images.
8. Close the dataset.
Attribute:
- self.data_full_mat (numpy.array): Full matrix of raw data to be reshaped and saved.
Returns:
None. It creates an HDF5 file with the ISMRMRD format.
"""
directory_rmd = self.directory_rmd
name = datetime.datetime.now()
name_string = name.strftime("%Y.%m.%d.%H.%M.%S.%f")[:-3]
self.mapVals['name_string'] = name_string
if hasattr(self, 'raw_data_name'):
file_name = "%s.%s" % (self.raw_data_name, name_string)
else:
self.raw_data_name = self.mapVals['seqName']
file_name = "%s.%s" % (self.mapVals['seqName'], name_string)
path= "%s/%s.h5" % (directory_rmd, file_name)
dset = ismrmrd.Dataset(path, f'/dataset', True) # Create the dataset
etl = self.mapVals['etl']
n_rd = self.nPoints[0]
n_ph = self.nPoints[1]
n_sl = self.nPoints[2]
ind = self.getIndex(self.etl, n_ph, self.sweepMode)
nRep = (n_ph//etl)*n_sl
bw = self.mapVals['bw_MHz']
axesOrientation = self.axesOrientation
axesOrientation_list = axesOrientation.tolist()
read_dir = [0, 0, 0]
phase_dir = [0, 0, 0]
slice_dir = [0, 0, 0]
read_dir[axesOrientation_list.index(0)] = 1
phase_dir[axesOrientation_list.index(1)] = 1
slice_dir[axesOrientation_list.index(2)] = 1
# Experimental Conditions field
exp = ismrmrd.xsd.experimentalConditionsType()
magneticFieldStrength = hw.larmorFreq*1e6/hw.gammaB
exp.H1resonanceFrequency_Hz = hw.larmorFreq
self.header.experimentalConditions = exp
# Acquisition System Information field
sys = ismrmrd.xsd.acquisitionSystemInformationType()
sys.receiverChannels = 1
self.header.acquisitionSystemInformation = sys
# Encoding field can be filled if needed
encoding = ismrmrd.xsd.encodingType()
encoding.trajectory = ismrmrd.xsd.trajectoryType.CARTESIAN
#encoding.trajectory =ismrmrd.xsd.trajectoryType[data.processing.trajectory.upper()]
dset.write_xml_header(self.header.toXML()) # Write the header to the dataset
new_data = np.zeros((n_ph * n_sl * self.nScans, n_rd + 2*hw.addRdPoints))
new_data = np.reshape(self.data_fullmat, (self.nScans, n_sl, n_ph, n_rd+ 2*hw.addRdPoints))
counter=0
for scan in range(self.nScans):
for slice_idx in range(n_sl):
for phase_idx in range(n_ph):
line = new_data[scan, slice_idx, phase_idx, :]
line2d = np.reshape(line, (1, n_rd+2*hw.addRdPoints))
acq = ismrmrd.Acquisition.from_array(line2d, None)
index_in_repetition = phase_idx % etl
current_repetition = (phase_idx // etl) + (slice_idx * (n_ph // etl))
acq.clearAllFlags()
if index_in_repetition == 0:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_CONTRAST)
elif index_in_repetition == etl - 1:
acq.setFlag(ismrmrd.ACQ_LAST_IN_CONTRAST)
if ind[phase_idx]== 0:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_PHASE)
elif ind[phase_idx] == n_ph - 1:
acq.setFlag(ismrmrd.ACQ_LAST_IN_PHASE)
if slice_idx == 0:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_SLICE)
elif slice_idx == n_sl - 1:
acq.setFlag(ismrmrd.ACQ_LAST_IN_SLICE)
if int(current_repetition) == 0:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_REPETITION)
elif int(current_repetition) == nRep - 1:
acq.setFlag(ismrmrd.ACQ_LAST_IN_REPETITION)
if scan == 0:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_AVERAGE)
elif scan == self.nScans-1:
acq.setFlag(ismrmrd.ACQ_LAST_IN_AVERAGE)
counter += 1
# +1 to start at 1 instead of 0
acq.idx.repetition = int(current_repetition + 1)
acq.idx.kspace_encode_step_1 = ind[phase_idx]+1 # phase
acq.idx.slice = slice_idx + 1
acq.idx.contrast = index_in_repetition + 1
acq.idx.average = scan + 1 # scan
acq.scan_counter = counter
acq.discard_pre = hw.addRdPoints
acq.discard_post = hw.addRdPoints
acq.sample_time_us = 1/bw
self.dfov = np.array(self.dfov)
acq.position = (ctypes.c_float * 3)(*self.dfov.flatten())
acq.read_dir = (ctypes.c_float * 3)(*read_dir)
acq.phase_dir = (ctypes.c_float * 3)(*phase_dir)
acq.slice_dir = (ctypes.c_float * 3)(*slice_dir)
dset.append_acquisition(acq) # Append the acquisition to the dataset
image=self.mapVals['image3D']
image_reshaped = np.reshape(image, (n_sl, n_ph, n_rd))
for slice_idx in range (n_sl): ## image3d does not have scan dimension
image_slice = image_reshaped[slice_idx, :, :]
img = ismrmrd.Image.from_array(image_slice)
img.transpose = False
img.field_of_view = (ctypes.c_float * 3)(*(self.fov)*10) # mm
img.position = (ctypes.c_float * 3)(*self.dfov)
# img.data_type= 8 ## COMPLEX FLOAT
img.image_type = 5 ## COMPLEX
img.read_dir = (ctypes.c_float * 3)(*read_dir)
img.phase_dir = (ctypes.c_float * 3)(*phase_dir)
img.slice_dir = (ctypes.c_float * 3)(*slice_dir)
dset.append_image(f"image_raw", img) # Append the image to the dataset
dset.close()
if __name__ == '__main__':
seq = RarePyPulseq()
seq.sequenceAtributes()
seq.sequenceRun(plot_seq=False, demo=True, standalone=True)
seq.sequenceAnalysis(mode='Standalone')