Source code for seq.mse_pp_jma

"""
Created on Wen April 10 2024
@author: J.M. Algarín, MRILab, i3M, CSIC, Valencia
@Summary: mse sequence class
"""

import os
import sys
#*****************************************************************************
# Get the directory of the current script
main_directory = os.path.dirname(os.path.realpath(__file__))
parent_directory = os.path.dirname(main_directory)
parent_directory = os.path.dirname(parent_directory)

# Define the subdirectories you want to add to sys.path
subdirs = ['MaRGE', 'marcos_client']

# Add the subdirectories to sys.path
for subdir in subdirs:
    full_path = os.path.join(parent_directory, subdir)
    sys.path.append(full_path)
#******************************************************************************
import numpy as np
import experiment as ex
import scipy.signal as sig
from scipy.stats import linregress
import configs.hw_config as hw # Import the scanner hardware config
import configs.units as units
import seq.mriBlankSeq as blankSeq  # Import the mriBlankSequence for any new sequence.
from scipy.optimize import curve_fit
from marga_pulseq.interpreter import PSInterpreter
import pypulseq as pp

#*********************************************************************************
#*********************************************************************************
#*********************************************************************************

[docs] class MSE(blankSeq.MRIBLANKSEQ): def __init__(self): super(MSE, self).__init__() # Input the parameters self.addParameter(key='seqName', string='MSEInfo', val='MSE_jma') self.addParameter(key='nScans', string='Number of scans', val=2, field='IM') self.addParameter(key='freqOffset', string='Larmor frequency offset (kHz)', val=0.0, units=units.kHz, field='RF') self.addParameter(key='rfExFA', string='Excitation flip angle (º)', val=90, field='RF') self.addParameter(key='rfReFA', string='Refocusing flip angle (º)', val=180, field='RF') self.addParameter(key='rfExTime', string='RF excitation time (us)', val=60.0, units=units.us, field='RF') self.addParameter(key='rfReTime', string='RF refocusing time (us)', val=120.0, units=units.us, field='RF') self.addParameter(key='echoSpacing', string='Echo spacing (ms)', val=10.0, units=units.ms, field='SEQ') self.addParameter(key='preExTime', string='Preexitation time (ms)', val=0.0, units=units.ms, field='SEQ') self.addParameter(key='inversionTime', string='Inversion time (ms)', val=0.0, units=units.ms, field='SEQ', tip="0 to ommit this pulse") self.addParameter(key='repetitionTime', string='Repetition time (ms)', val=300., units=units.ms, field='SEQ', tip="0 to ommit this pulse") self.addParameter(key='fov', string='FOV[x,y,z] (cm)', val=[15.0, 15.0, 15.0], units=units.cm, field='IM') self.addParameter(key='dfov', string='dFOV[x,y,z] (mm)', val=[0.0, 0.0, 0.0], units=units.mm, field='IM', tip="Position of the gradient isocenter") self.addParameter(key='nPoints', string='nPoints[rd, ph, sl]', val=[50, 50, 10], field='IM') self.addParameter(key='angle', string='Angle (º)', val=0.0, field='IM') self.addParameter(key='rotationAxis', string='Rotation axis', val=[0, 0, 1], field='IM') self.addParameter(key='etl', string='Echo train length', val=10, field='SEQ') self.addParameter(key='acqTime', string='Acquisition time (ms)', val=2.0, units=units.ms, field='SEQ') self.addParameter(key='axesOrientation', string='Axes[rd,ph,sl]', val=[2, 1, 0], field='IM', tip="0=x, 1=y, 2=z") self.addParameter(key='axesEnable', string='Axes enable', val=[1, 1, 1], field='IM', tip="Use 0 for directions with matrix size 1, use 1 otherwise.") self.addParameter(key='sweepMode', string='Sweep mode', val=0, field='SEQ', tip="0: sweep from -kmax to kmax. 1: sweep from 0 to kmax. 2: sweep from kmax to 0") self.addParameter(key='rdGradTime', string='Rd gradient time (ms)', val=3.0, units=units.ms, field='OTH') self.addParameter(key='rdDephTime', string='Rd dephasing time (ms)', val=0.5, units=units.ms, field='OTH') self.addParameter(key='phGradTime', string='Ph gradient time (ms)', val=0.5, units=units.ms, field='OTH') self.addParameter(key='rdPreemphasis', string='Rd preemphasis', val=1.0, field='OTH') self.addParameter(key='rfPhase', string='RF phase (º)', val=0.0, field='OTH') self.addParameter(key='dummyPulses', string='Dummy pulses', val=1, field='SEQ', tip="Use last dummy pulse to calibrate k = 0") self.addParameter(key='shimming', string='Shimming (*1e4)', val=[0.0, 0.0, 0.0], units=units.sh, field='OTH') self.addParameter(key='parFourierFraction', string='Partial fourier fraction', val=1.0, field='OTH', tip="Fraction of k planes aquired in slice direction") self.addParameter(key='echo_shift', string='Echo time shift', val=0.0, units=units.us, field='OTH', tip='Shift the gradient echo time respect to the spin echo time.') self.addParameter(key='unlock_orientation', string='Unlock image orientation', val=0, field='OTH', tip='0: Images oriented according to standard. 1: Image raw orientation') # self.addParameter(key='calculateMap', string='Calculate T2 Map', val=1, field='OTH', tip='0: Do not calculate. 1: Calculate') self.addParameter(key='rfMode', string='RF mode', val=0, field='OTH', tip='0: CPMG. 1: APCP. 2:APCPMG. 3:CP')
[docs] def sequenceInfo(self): print("3D MSE sequence") print("Author: Dr. J.M. Algarín") print("mriLab @ i3M, CSIC, Spain \n")
[docs] def sequenceTime(self): nScans = self.mapVals['nScans'] nPoints = np.array(self.mapVals['nPoints']) etl = self.mapVals['etl'] repetitionTime = self.mapVals['repetitionTime'] parFourierFraction = self.mapVals['parFourierFraction'] # check if rf amplitude is too high rfExFA = self.mapVals['rfExFA'] / 180 * np.pi # rads rfReFA = self.mapVals['rfReFA'] / 180 * np.pi # rads rfExTime = self.mapVals['rfExTime'] # us rfReTime = self.mapVals['rfReTime'] # us rfExAmp = rfExFA / (rfExTime * hw.b1Efficiency) rfReAmp = rfReFA / (rfReTime * hw.b1Efficiency) if rfExAmp>1 or rfReAmp>1: print("RF amplitude is too high, try with longer RF pulse time.") return(0) seqTime = nPoints[1]*nPoints[2]*repetitionTime*1e-3*nScans*parFourierFraction/60 seqTime = np.round(seqTime, decimals=1) return(seqTime) # minutes, scanTime
# TODO: check for min and max values for all fields
[docs] def sequenceAtributes(self): super().sequenceAtributes() # Conversion of variables to non-multiplied units self.angle = self.angle * np.pi / 180 # rads # Add rotation, dfov and fov to the history self.rotation = self.rotationAxis.tolist() self.rotation.append(self.angle) self.rotations.append(self.rotation) # self.dfovs.append(self.dfov.tolist()) self.fovs.append(self.fov.tolist())
[docs] def sequenceRun(self, plotSeq=False, demo=False, standalone=False): print('MSE run') init_gpa=False # Starts the gpa self.demo = demo # Step 1: Define the interpreter for FloSeq/PSInterpreter. # The interpreter is responsible for converting the high-level pulse sequence description into low-level # instructions for the scanner hardware. You will typically update the interpreter during scanner calibration. self.flo_interpreter = PSInterpreter( tx_warmup=hw.blkTime, # Transmit chain warm-up time (us) rf_center=hw.larmorFreq * 1e6, # Larmor frequency (Hz) rf_amp_max=hw.b1Efficiency / (2 * np.pi) * 1e6, # Maximum RF amplitude (Hz) gx_max=hw.gFactor[0] * hw.gammaB, # Maximum gradient amplitude for X (Hz/m) gy_max=hw.gFactor[1] * hw.gammaB, # Maximum gradient amplitude for Y (Hz/m) gz_max=hw.gFactor[2] * hw.gammaB, # Maximum gradient amplitude for Z (Hz/m) grad_max=np.max(hw.gFactor) * hw.gammaB, # Maximum gradient amplitude (Hz/m) grad_t=hw.grad_raster_time * 1e6, # Gradient raster time (us) ) # Step 2: Define system properties using PyPulseq (pp.Opts). # These properties define the hardware capabilities of the MRI scanner, such as maximum gradient strengths, # slew rates, and dead times. They are typically set based on the hardware configuration file (`hw_config`). self.system = pp.Opts( rf_dead_time=hw.blkTime * 1e-6, # Dead time between RF pulses (s) max_grad=np.max(hw.gFactor) * 1e3, # Maximum gradient strength (mT/m) grad_unit='mT/m', # Units of gradient strength max_slew=hw.max_slew_rate, # Maximum gradient slew rate (mT/m/ms) slew_unit='mT/m/ms', # Units of gradient slew rate grad_raster_time=hw.grad_raster_time, # Gradient raster time (s) rise_time=hw.grad_rise_time, # Gradient rise time (s) rf_raster_time=1e-6, block_duration_raster=1e-6 ) # Set the fov self.dfov = self.getFovDisplacement() self.dfov = self.dfov[self.axesOrientation] self.fov = self.fov[self.axesOrientation] # Miscellaneous self.freqOffset = self.freqOffset*1e-6 # MHz gradRiseTime = hw.grad_rise_time gSteps = hw.grad_steps addRdPoints = hw.addRdPoints # Initial rd points to avoid artifact at the begining of rd randFactor = 0e-3 # Random amplitude to add to the phase gradients resolution = self.fov/self.nPoints rfExAmp = self.rfExFA/(self.rfExTime*1e6*hw.b1Efficiency)*np.pi/180 rfReAmp = self.rfReFA/(self.rfReTime*1e6*hw.b1Efficiency)*np.pi/180 self.mapVals['rfExAmp'] = rfExAmp self.mapVals['rfReAmp'] = rfReAmp self.mapVals['resolution'] = resolution self.mapVals['gradRiseTime'] = gradRiseTime self.mapVals['randFactor'] = randFactor self.mapVals['addRdPoints'] = addRdPoints self.mapVals['larmorFreq'] = hw.larmorFreq + self.freqOffset if rfExAmp>1 or rfReAmp>1: print("RF amplitude is too high, try with longer RF pulse time.") return(0) # Matrix size nRD = self.nPoints[0]+2*addRdPoints nPH = self.nPoints[1] nSL = self.nPoints[2] n_rd_points_per_train = nRD * self.etl # ETL if etl>nPH if self.etl>nPH: self.etl = nPH # parAcqLines in case parAcqLines = 0 parAcqLines = int(int(self.nPoints[2]*self.parFourierFraction)-self.nPoints[2]/2) self.mapVals['partialAcquisition'] = parAcqLines # BW BW = self.nPoints[0]/self.acqTime*1e-6 # MHz BWov = BW*hw.oversamplingFactor # MHz samplingPeriod = 1/BWov # us # Readout gradient time if self.rdGradTime<self.acqTime: self.rdGradTime = self.acqTime self.mapVals['rdGradTime'] = self.rdGradTime * 1e3 # ms # Phase and slice de- and re-phasing time if self.phGradTime==0 or self.phGradTime>self.echoSpacing/2-self.rfExTime/2-self.rfReTime/2-2*gradRiseTime: self.phGradTime = self.echoSpacing/2-self.rfExTime/2-self.rfReTime/2-2*gradRiseTime self.mapVals['phGradTime'] = self.phGradTime*1e3 # ms # Max gradient amplitude rdGradAmplitude = self.nPoints[0]/(hw.gammaB*self.fov[0]*self.acqTime)*self.axesEnable[0] phGradAmplitude = nPH/(2*hw.gammaB*self.fov[1]*(self.phGradTime+gradRiseTime))*self.axesEnable[1] slGradAmplitude = nSL/(2*hw.gammaB*self.fov[2]*(self.phGradTime+gradRiseTime))*self.axesEnable[2] self.mapVals['rdGradAmplitude'] = rdGradAmplitude self.mapVals['phGradAmplitude'] = phGradAmplitude self.mapVals['slGradAmplitude'] = slGradAmplitude # Readout dephasing amplitude rdDephAmplitude = 0.5*rdGradAmplitude*(gradRiseTime+self.rdGradTime)/(gradRiseTime+self.rdDephTime) self.mapVals['rdDephAmplitude'] = rdDephAmplitude # Phase and slice gradient vector phGradients = np.linspace(-phGradAmplitude,phGradAmplitude,num=nPH,endpoint=False) slGradients = np.linspace(-slGradAmplitude,slGradAmplitude,num=nSL,endpoint=False) # Now fix the number of slices to partailly acquired k-space nSL = (int(self.nPoints[2]/2)+parAcqLines)*self.axesEnable[2]+(1-self.axesEnable[2]) # Add random displacemnt to phase encoding lines for ii in range(nPH): if ii<np.ceil(nPH/2-nPH/20) or ii>np.ceil(nPH/2+nPH/20): phGradients[ii] = phGradients[ii]+randFactor*np.random.randn() kPH = hw.gammaB*phGradients*(gradRiseTime+self.phGradTime) self.mapVals['phGradients'] = phGradients self.mapVals['slGradients'] = slGradients # Set phase vector to given sweep mode ind = self.getIndex(self.etl, nPH, self.sweepMode) self.mapVals['sweepOrder'] = ind phGradients = phGradients[ind] # Get the rotation matrix rot = self.getRotationMatrix() gradAmp = np.array([0.0, 0.0, 0.0]) gradAmp[self.axesOrientation[0]] = 1 gradAmp = np.reshape(gradAmp, (3, 1)) result = np.dot(rot, gradAmp) # Initialize k-vectors k_ph_sl_xyz = np.ones((3, self.nPoints[0]*self.nPoints[1]*nSL))*hw.gammaB*(self.phGradTime+hw.grad_rise_time) k_rd_xyz = np.ones((3, self.nPoints[0]*self.nPoints[1]*nSL))*hw.gammaB # Map the axis to "x", "y", and "z" according ot axesOrientation axes_map = {0: "x", 1: "y", 2: "z"} rd_channel = axes_map.get(self.axesOrientation[0], "") ph_channel = axes_map.get(self.axesOrientation[1], "") sl_channel = axes_map.get(self.axesOrientation[2], "") ############################ # EVENTS ################### ############################ delay_first = pp.make_delay(20e-3 - self.system.rf_dead_time - self.rfExTime/2) # Excitation pulse flip_ex = self.rfExFA * np.pi / 180 rf_ex = pp.make_block_pulse( flip_angle=flip_ex, system=self.system, duration=self.rfExTime, delay=0, phase_offset=0.0, ) # Dephasing gradient gr_preph = pp.make_trapezoid( channel=rd_channel, system=self.system, amplitude=rdDephAmplitude*hw.gammaB, flat_time=self.rdDephTime, delay=0, rise_time=hw.grad_rise_time, ) delay_preph = pp.make_delay(self.echoSpacing / 2 - self.rfExTime / 2 - self.rfReTime / 2 - self.system.rf_dead_time) # Refocusing pulse flip_re = self.rfReFA * np.pi / 180 rf_ref = pp.make_block_pulse( flip_angle=flip_re, system=self.system, duration=self.rfReTime, delay=0, phase_offset=np.pi/2, ) delay_reph = pp.make_delay(self.echoSpacing) # Phase gradient dephasing gr_phase_d = pp.make_trapezoid( channel=ph_channel, system=self.system, amplitude=phGradAmplitude * hw.gammaB, flat_time=self.phGradTime, delay=self.rfReTime + self.system.rf_dead_time, rise_time=hw.grad_rise_time, ) # Phase gradient dephasing gr_slice_d = pp.make_trapezoid( channel=sl_channel, system=self.system, amplitude=slGradAmplitude * hw.gammaB, flat_time=self.phGradTime, delay=self.rfReTime + self.system.rf_dead_time, rise_time=hw.grad_rise_time, ) # Readout gradient delay = self.rfReTime / 2 + self.echoSpacing /2 - self.rdGradTime / 2 - hw.grad_rise_time + \ self.system.rf_dead_time gr_readout = pp.make_trapezoid( channel=rd_channel, system=self.system, amplitude=rdGradAmplitude * hw.gammaB, flat_time=self.rdGradTime, delay=delay, rise_time=hw.grad_rise_time, ) # ADC delay = self.rfReTime / 2 + self.echoSpacing / 2 - (nRD / 2 / BW) * 1e-6 + self.system.rf_dead_time adc = pp.make_adc(num_samples=nRD * hw.oversamplingFactor, dwell = samplingPeriod * 1e-6, delay=delay) # Phase gradient rephasing delay = -self.echoSpacing / 2 + (nRD / 2 / BW) * 1e-6 + self.rfReTime / 2 + self.system.rf_dead_time gr_phase_r = pp.make_trapezoid( channel=ph_channel, system=self.system, amplitude=phGradAmplitude * hw.gammaB, flat_time=self.phGradTime, delay=delay, rise_time=hw.grad_rise_time, ) # Phase gradient rephasing delay = -self.echoSpacing / 2 + (nRD / 2 / BW) * 1e-6 + self.rfReTime / 2 + self.system.rf_dead_time gr_slice_r = pp.make_trapezoid( channel=sl_channel, system=self.system, amplitude=slGradAmplitude * hw.gammaB, flat_time=self.phGradTime, delay=delay, rise_time=hw.grad_rise_time, ) # Delay to complete repetition delay_tr = pp.make_delay(self.repetitionTime - self.rfExTime / 2 - self.echoSpacing / 2 + self.rfReTime / 2 - self.echoSpacing * self.etl) batches = {} def initializeBatch(name="pp_1"): # Instantiate pypulseq sequence object and save it into the batches dictionarly batches[name] = pp.Sequence(self.system) # Set slice and phase gradients to 0 gs_d = pp.scale_grad(gr_slice_d, 0.0) gp_d = pp.scale_grad(gr_phase_d, 0.0) gs_r = pp.scale_grad(gr_slice_r, 0.0) gp_r = pp.scale_grad(gr_phase_r, 0.0) batches[name].add_block(delay_first) # Create dummy pulses for dummy in range(self.dummyPulses): # Add excitation pulse and readout de-phasing gradient batches[name].add_block(rf_ex) batches[name].add_block(gr_preph, delay_preph) # Add echo train for k_echo in range(self.etl): batches[name].add_block(rf_ref, delay_reph, gp_d, gs_d, gr_readout) batches[name].add_block(gs_r, gp_r) # Add time delay to next repetition batches[name].add_block(delay_tr) def createBatches(): n_rd_points = 0 n_rd_points_dict = {} seq_idx = 0 seq_num = "batch_0" # Slice sweep for Cz in range(nSL): # Get slice gradient amplitude Nph_range = range(nPH) # Phase sweep for Cy in Nph_range: # Initialize new sequence with corresponding dummy pulses if seq_idx == 0 or n_rd_points + n_rd_points_per_train > hw.maxRdPoints: seq_idx += 1 n_rd_points_dict[seq_num] = n_rd_points seq_num = "batch_%i" % seq_idx initializeBatch(seq_num) n_rd_points = 0 # Fix the phase and slice amplitude sl_scale = (Cz - nSL / 2) / nSL * 2 pe_scale = (Cy - nPH / 2) / nPH * 2 gs_d = pp.scale_grad(gr_slice_d, sl_scale) gp_d = pp.scale_grad(gr_phase_d, pe_scale) gs_r = pp.scale_grad(gr_slice_r, -sl_scale) gp_r = pp.scale_grad(gr_phase_r, -pe_scale) # Add excitation pulse and readout de-phasing gradient batches[seq_num].add_block(rf_ex) batches[seq_num].add_block(gr_preph, delay_preph) # Add the echo train for k_echo in range(self.etl): # Add refocusing pulse batches[seq_num].add_block(rf_ref, delay_reph, gp_d, gs_d, gr_readout, adc) batches[seq_num].add_block(gs_r, gp_r) n_rd_points += nRD # Add time delay to next repetition batches[seq_num].add_block(delay_tr) # Get the rd point list n_rd_points_dict.pop('batch_0') n_rd_points_dict[seq_num] = n_rd_points # Check whether the timing of the sequence is correct (ok, error_report) = batches[seq_num].check_timing() if ok: print("Timing check passed successfully") else: print("Timing check failed. Error listing follows:") [print(e) for e in error_report] # Write the sequence files waveforms = {} for seq_num in batches.keys(): batches[seq_num].write(seq_num + ".seq") waveforms[seq_num], param_dict = self.flo_interpreter.interpret(seq_num + ".seq") return waveforms, n_rd_points_dict # Create the batches waveforms, n_readouts = createBatches() self.mapVals['n_readouts'] = list(n_readouts.values()) self.mapVals['n_batches'] = len(n_readouts.values()) scan_time = (nPH * nSL + self.mapVals['n_batches'] * self.dummyPulses) * self.repetitionTime * self.nScans self.mapVals['Scan time (s)'] = scan_time # Execute the batches data_over = [] # To save oversampled data for seq_num in waveforms.keys(): # Initialize the experiment if not self.demo: self.expt = ex.Experiment(lo_freq=hw.larmorFreq + self.freqOffset * 1e-6, # MHz rx_t=samplingPeriod, # us init_gpa=init_gpa, gpa_fhdo_offset_time=(1 / 0.2 / 3.1), auto_leds=True ) sampling_period = self.expt.get_rx_ts()[0] # us bw = 1 / sampling_period / hw.oversamplingFactor # MHz sampling_time = sampling_period * nRD * 1e-6 # s print("Acquisition bandwidth fixed to: %0.3f kHz" % (bw * 1e3)) else: # To be fixed in demo case sampling_period = samplingPeriod * 1e6 # us bw = 1 / sampling_period / hw.oversamplingFactor # MHz sampling_time = nRD / bw * 1e-6 # s self.mapVals['Bandwidth (Hz)'] = bw * 1e6 # Hz self.mapVals['Sampling time (s)'] = sampling_time self.mapVals['Larmor Frequency (Hz)'] = hw.larmorFreq # Save the waveforms into the mriBlankSeq dictionaries self.pypulseq2mriblankseq(waveforms=waveforms[seq_num], shimming=self.shimming) # Load the waveforms into the red pitaya if not self.demo: if self.floDict2Exp(): print("Sequence waveforms loaded successfully") pass else: print("ERROR: sequence waveforms out of hardware bounds") return False # Run the experiment or plot the sequence if not plotSeq: for scan in range(self.nScans): print("Scan %i, batch %s/%i running..." % ((scan + 1), seq_num[-1], len(n_readouts.values()))) acq_points = 0 nn = n_readouts[seq_num] * hw.oversamplingFactor while acq_points != n_readouts[seq_num] * hw.oversamplingFactor: if not self.demo: rxd, msgs = self.expt.run() else: rxd = {'rx0': np.random.randn(nn) + 1j * np.random.randn(nn)} acq_points = np.size([rxd['rx0']]) data_over = np.concatenate((data_over, rxd['rx0']), axis=0) print("Acquired points = %i" % acq_points) print("Expected points = %i" % (n_readouts[seq_num] * hw.oversamplingFactor)) print("Scan %i ready!" % (scan + 1)) elif plotSeq and standalone: self.sequencePlot(standalone=standalone) return True # Close the experiment if not self.demo: self.expt.__del__() # Process data to be plotted if not plotSeq: self.mapVals['data_over'] = data_over data_full = sig.decimate(data_over, hw.oversamplingFactor, ftype='fir', zero_phase=True) self.mapVals['data_full'] = data_full return True
[docs] def sequenceAnalysis(self, mode=None): self.mode = mode # Get data data_full = self.mapVals['data_full'] nRD, nPH, nSL = self.nPoints nRD = nRD + 2 * hw.addRdPoints n_batches = self.mapVals['n_batches'] # Reorganize data_full data_prov = np.zeros([self.nScans, nRD * nPH * nSL * self.etl], dtype=complex) if n_batches > 1: n_rds = self.mapVals['n_readouts'] data_full_a = data_full[0:sum(n_rds[0:-1]) * self.nScans] data_full_b = data_full[sum(n_rds[0:-1]) * self.nScans:] data_full_a = np.reshape(data_full_a, newshape=(n_batches - 1, self.nScans, -1, nRD)) data_full_b = np.reshape(data_full_b, newshape=(1, self.nScans, -1, nRD)) for scan in range(self.nScans): data_scan_a = np.reshape(data_full_a[:, scan, :, :], -1) data_scan_b = np.reshape(data_full_b[:, scan, :, :], -1) data_prov[scan, :] = np.concatenate((data_scan_a, data_scan_b), axis=0) else: data_full = np.reshape(data_full, (1, self.nScans, -1, nRD)) for scan in range(self.nScans): data_prov[scan, :] = np.reshape(data_full[:, scan, :, :], -1) data_full = np.reshape(data_prov, -1) # Average data data_full = np.reshape(data_full, newshape=(self.nScans, -1)) data = np.average(data_full, axis=0) self.mapVals['data'] = data # Generate different k-space data data_ind = np.zeros(shape=(self.etl, nSL, nPH, nRD), dtype=complex) data = np.reshape(data, newshape=(nSL, nPH, self.etl, nRD)) for echo in range(self.etl): data_ind[echo, :, :, :] = np.squeeze(data[:, :, echo, :]) # Remove added data in readout direction data_ind = data_ind[:, :, :, hw.addRdPoints: nRD - hw.addRdPoints] self.mapVals['kSpace'] = data_ind # Get images image_ind = np.zeros_like(data_ind) for echo in range(self.etl): image_ind[echo] = np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(data_ind[echo]))) self.mapVals['iSpace'] = image_ind # Prepare data to plot (plot central slice) axes_dict = {'x': 0, 'y': 1, 'z': 2} axes_keys = list(axes_dict.keys()) axes_vals = list(axes_dict.values()) axes_str = ['', '', ''] n = 0 for val in self.axesOrientation: index = axes_vals.index(val) axes_str[n] = axes_keys[index] n += 1 # Normalize image k_space = np.zeros((self.etl * nSL, nPH, nRD - 2 * hw.addRdPoints)) image = np.zeros((self.etl * nSL, nPH, nRD - 2 * hw.addRdPoints)) n = 0 for slice in range(nSL): for echo in range(self.etl): k_space[n, :, :] = np.abs(data_ind[echo, slice, :, :]) image[n, :, :] = np.abs(image_ind[echo, slice, :, :]) n += 1 image = image / np.max(image) * 100 imageOrientation_dicom = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0] if not self.unlock_orientation: # Image orientation pass if self.axesOrientation[2] == 2: # Sagittal title = "Sagittal" if self.axesOrientation[0] == 0 and self.axesOrientation[1] == 1: # OK image = np.flip(image, axis=2) image = np.flip(image, axis=1) k_space = np.flip(k_space, axis=2) k_space = np.flip(k_space, axis=1) x_label = "(-Y) A | PHASE | P (+Y)" y_label = "(-X) I | READOUT | S (+X)" imageOrientation_dicom = [0.0, 1.0, 0.0, 0.0, 0.0, -1.0] else: image = np.transpose(image, (0, 2, 1)) image = np.flip(image, axis=2) image = np.flip(image, axis=1) k_space = np.transpose(k_space, (0, 2, 1)) k_space = np.flip(k_space, axis=2) k_space = np.flip(k_space, axis=1) x_label = "(-Y) A | READOUT | P (+Y)" y_label = "(-X) I | PHASE | S (+X)" imageOrientation_dicom = [0.0, 1.0, 0.0, 0.0, 0.0, -1.0] elif self.axesOrientation[2] == 1: # Coronal title = "Coronal" if self.axesOrientation[0] == 0 and self.axesOrientation[1] == 2: # OK image = np.flip(image, axis=2) image = np.flip(image, axis=1) image = np.flip(image, axis=0) k_space = np.flip(k_space, axis=2) k_space = np.flip(k_space, axis=1) k_space = np.flip(k_space, axis=0) x_label = "(+Z) R | PHASE | L (-Z)" y_label = "(-X) I | READOUT | S (+X)" imageOrientation_dicom = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0] else: image = np.transpose(image, (0, 2, 1)) image = np.flip(image, axis=2) image = np.flip(image, axis=1) image = np.flip(image, axis=0) k_space = np.transpose(k_space, (0, 2, 1)) k_space = np.flip(k_space, axis=2) k_space = np.flip(k_space, axis=1) k_space = np.flip(k_space, axis=0) x_label = "(+Z) R | READOUT | L (-Z)" y_label = "(-X) I | PHASE | S (+X)" imageOrientation_dicom = [1.0, 0.0, 0.0, 0.0, 0.0, -1.0] elif self.axesOrientation[2] == 0: # Transversal title = "Transversal" if self.axesOrientation[0] == 1 and self.axesOrientation[1] == 2: image = np.flip(image, axis=2) image = np.flip(image, axis=1) k_space = np.flip(k_space, axis=2) k_space = np.flip(k_space, axis=1) x_label = "(+Z) R | PHASE | L (-Z)" y_label = "(+Y) P | READOUT | A (-Y)" imageOrientation_dicom = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0] else: # OK image = np.transpose(image, (0, 2, 1)) image = np.flip(image, axis=2) image = np.flip(image, axis=1) k_space = np.transpose(k_space, (0, 2, 1)) k_space = np.flip(k_space, axis=2) k_space = np.flip(k_space, axis=1) x_label = "(+Z) R | READOUT | L (-Z)" y_label = "(+Y) P | PHASE | A (-Y)" imageOrientation_dicom = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0] else: x_label = "%s axis" % axes_str[1] y_label = "%s axis" % axes_str[0] title = "Image" result1 = {'widget': 'image', 'data': image, 'xLabel': x_label, 'yLabel': y_label, 'title': title, 'row': 0, 'col': 0} result2 = {'widget': 'image', 'data': np.log10(k_space), 'xLabel': x_label, 'yLabel': y_label, 'title': "k_space", 'row': 0, 'col': 1} # Dicom tags image_DICOM = np.transpose(image, (0, 2, 1)) slices, rows, columns = image_DICOM.shape self.meta_data["Columns"] = columns self.meta_data["Rows"] = rows self.meta_data["NumberOfSlices"] = slices self.meta_data["NumberOfFrames"] = slices img_full_abs = np.abs(image_DICOM) * (2 ** 15 - 1) / np.amax(np.abs(image_DICOM)) img_full_int = np.int16(np.abs(img_full_abs)) img_full_int = np.reshape(img_full_int, newshape=(slices, rows, columns)) arr = img_full_int self.meta_data["PixelData"] = arr.tobytes() self.meta_data["WindowWidth"] = 26373 self.meta_data["WindowCenter"] = 13194 self.meta_data["ImageOrientationPatient"] = imageOrientation_dicom resolution = self.mapVals['resolution'] * 1e3 self.meta_data["PixelSpacing"] = [resolution[0], resolution[1]] self.meta_data["SliceThickness"] = resolution[2] # Sequence parameters self.meta_data["RepetitionTime"] = self.mapVals['repetitionTime'] self.meta_data["EchoTime"] = self.mapVals['echoSpacing'] self.meta_data["EchoTrainLength"] = self.mapVals['etl'] # create self.out to run in iterative mode self.output = [result1, result2] # save data once self.output is created self.saveRawData() # Plot result in standalone execution if self.mode == 'Standalone': self.plotResults() return self.output
if __name__ == "__main__": # main(plot=True, write_seq=True) seq = MSE() seq.sequenceAtributes() seq.sequenceRun(plotSeq=False, demo=True, standalone=True) seq.sequenceAnalysis(mode='standalone') seq.plotResults()