# -*- coding: utf-8 -*-
"""
Created on Sat Nov 13 13:45:05 2021
@author: José Miguel Algarín Guisado
MRILAB @ I3M
"""
import os
import sys
#*****************************************************************************
# Get the directory of the current script
main_directory = os.path.dirname(os.path.realpath(__file__))
parent_directory = os.path.dirname(main_directory)
parent_directory = os.path.dirname(parent_directory)
# Define the subdirectories you want to add to sys.path
subdirs = ['MaRGE', 'marcos_client']
# Add the subdirectories to sys.path
for subdir in subdirs:
full_path = os.path.join(parent_directory, subdir)
sys.path.append(full_path)
#******************************************************************************
import numpy as np
import experiment as ex
import scipy.signal as sig
import configs.hw_config as hw # Import the scanner hardware config
import seq.mriBlankSeq as blankSeq # Import the mriBlankSequence for any new sequence.
import pyqtgraph as pg
import configs.units as units
from datetime import date
from datetime import datetime
import ismrmrd
import ismrmrd.xsd
import datetime
import ctypes
import matplotlib
import xml.etree.ElementTree as ET
from scipy.io import loadmat
[docs]
class GRE3D(blankSeq.MRIBLANKSEQ):
def __init__(self):
super(GRE3D, self).__init__()
# Input the parameters
self.addParameter(key='seqName', string='GRE3DInfo', val='GRE3D')
self.addParameter(key='nScans', string='Number of scans', val=2, field='IM')
self.addParameter(key='freqOffset', string='Larmor frequency offset (kHz)', val=0.0, units=units.kHz,
field='RF')
self.addParameter(key='rfExFA', string='Excitation flip angle (º)', val=90, field='RF')
self.addParameter(key='rfExTime', string='RF excitation time (us)', val=30.0, units=units.us, field='RF')
self.addParameter(key='echo_time', string='Echo time (ms)', val=4.0, units=units.ms, field='SEQ')
self.addParameter(key='repetition_time', string='Repetition time (ms)', val=500., units=units.ms, field='SEQ')
self.addParameter(key='fov', string='FOV[x,y,z] (cm)', val=[15.0, 15.0, 15.0], units=units.cm, field='IM')
self.addParameter(key='dfov', string='dFOV[x,y,z] (mm)', val=[0.0, 0.0, 0.0], units=units.mm, field='IM',
tip="Position of the gradient isocenter")
self.addParameter(key='nPoints', string='nPoints (rd, ph, sl)', val=[60, 60, 2], field='IM')
self.addParameter(key='angle', string='Angle (º)', val=0.0, field='IM')
self.addParameter(key='rotationAxis', string='Rotation axis', val=[0, 0, 1], field='IM')
self.addParameter(key='acq_time', string='Acquisition time (ms)', val=1.0, units=units.ms, field='SEQ')
self.addParameter(key='axesOrientation', string='Axes[rd,ph,sl]', val=[0, 1, 2], field='IM',
tip="0=x, 1=y, 2=z")
self.addParameter(key='axesEnable', string='Axes enable', val=[1, 1, 0],
tip="Use 0 for directions with matrix size 1, use 1 otherwise.")
self.addParameter(key='rdGradTime', string='Rd gradient time (ms)', val=1.0, units=units.ms, field='OTH')
self.addParameter(key='dephGradTime', string='Rd dephasing time (ms)', val=1.0, units = units.ms, field='OTH')
self.addParameter(key='dummy_pulses', string='Dummy pulses', val=1, field='SEQ')
self.addParameter(key='shimming', string='Shimming (a.u.)', val=[0.0, 0.0, 0.0], units=units.sh, field='OTH')
self.addParameter(key='sl_fraction', string='Partial fourier fraction', val=1.0, field='OTH',
tip="Fraction of k planes aquired in slice direction")
self.addParameter(key='mode', string='Sequence mode', val=0, field='SEQ',
tip='0: normal, 1: rf spoiler, 2: gradient spoiler, 3: rf and gradient spoilre, 4: balanced.')
self.addParameter(key='spoiler_order', string='Gradient spoiler order', val=3, field='SEQ',
tip='Higher orders will require longer repetition times')
# self.addParameter(key='freqCal', string='Calibrate frequency', val=1, field='OTH',
# tip="0 to not calibrate, 1 to calibrate")
self.addParameter(key='unlock_orientation', string='Unlock image orientation', val=0, field='OTH',
tip='0: Images oriented according to standard. 1: Image raw orientation')
self.acq = ismrmrd.Acquisition()
self.img = ismrmrd.Image()
self.header= ismrmrd.xsd.ismrmrdHeader()
# ******************************************************************************************************************
# ******************************************************************************************************************
# ******************************************************************************************************************
[docs]
def sequenceInfo(self):
print("3D GRE sequence")
print("Author: Dr. J.M. Algarín")
print("Contact: josalggui@i3m.upv.es")
print("mriLab @ i3M, CSIC, Spain \n")
[docs]
def sequenceTime(self):
nScans = self.mapVals['nScans']
nPoints = np.array(self.mapVals['nPoints'])
repetition_time = self.mapVals['repetition_time']
return(nPoints[1]*nPoints[2]*repetition_time*1e-3*nScans/60) # minutes, scanTime
[docs]
def sequenceAtributes(self):
super().sequenceAtributes()
# Conversion of variables to non-multiplied units
self.angle = self.angle * np.pi / 180 # rads
# Add rotation, dfov and fov to the history
self.rotation = self.rotationAxis.tolist()
self.rotation.append(self.angle)
self.rotations.append(self.rotation)
self.dfovs.append(self.dfov.tolist())
self.fovs.append(self.fov.tolist())
[docs]
def sequenceRun(self, plotSeq=0, demo=False):
init_gpa = False # Starts the gpa
self.demo = demo
# Set the fov
self.dfov = self.getFovDisplacement()
self.dfov = self.dfov[self.axesOrientation]
self.fov = self.fov[self.axesOrientation]
# Check for used axes
axesEnable = []
for ii in range(3):
if self.nPoints[ii] == 1:
axesEnable.append(0)
else:
axesEnable.append(1)
self.mapVals['axesEnable'] = axesEnable
# Miscellaneous
resolution = self.fov/self.nPoints
rfExAmp = self.rfExFA / (self.rfExTime * 1e6 * hw.b1Efficiency) * np.pi / 180
for ii in range(3):
if self.nPoints[ii]==1: axesEnable[ii] = 0
self.mapVals['resolution'] = resolution
self.mapVals['grad_rise_time'] = hw.grad_rise_time
self.mapVals['addRdPoints'] = hw.addRdPoints
# Matrix size
n_rd = self.nPoints[0]+2*hw.addRdPoints
n_ph = self.nPoints[1]
n_sl = self.nPoints[2]
# par_acq_lines in case par_acq_lines = 0
par_acq_lines = int(int(self.nPoints[2] * self.sl_fraction) - self.nPoints[2] / 2)
self.mapVals['partial_acquisition'] = par_acq_lines
# bw
bw = self.nPoints[0] / self.acq_time * 1e-6 # MHz
bw_ov = bw * hw.oversamplingFactor # MHz
sampling_period = 1 / bw_ov # us
# Check if dephasing grad time is ok
max_deph_grad_time = self.echo_time-0.5*(self.rfExTime+self.rdGradTime)-3*hw.grad_rise_time
if self.dephGradTime==0 or self.dephGradTime>max_deph_grad_time:
self.dephGradTime = max_deph_grad_time*1
# Max gradient amplitude
rd_grad_amplitude = self.nPoints[0]/(hw.gammaB*self.fov[0]*self.acq_time)
rd_deph_amplitude = -rd_grad_amplitude*(self.rdGradTime+hw.grad_rise_time)/(2*(self.dephGradTime+hw.grad_rise_time))
ph_grad_amplitude = n_ph/(2*hw.gammaB*self.fov[1]*(self.dephGradTime+hw.grad_rise_time))*axesEnable[1]
sl_grad_amplitude = n_sl/(2*hw.gammaB*self.fov[2]*(self.dephGradTime+hw.grad_rise_time))*axesEnable[2]
self.mapVals['rd_grad_amplitude'] = rd_grad_amplitude
self.mapVals['rd_deph_amplitude'] = rd_deph_amplitude
self.mapVals['ph_grad_amplitude'] = ph_grad_amplitude
self.mapVals['sl_grad_amplitude'] = sl_grad_amplitude
# Phase and slice gradient vector
ph_gradients = np.linspace(-ph_grad_amplitude,ph_grad_amplitude,num=n_ph,endpoint=False)
sl_gradients = np.linspace(-sl_grad_amplitude,sl_grad_amplitude,num=n_sl,endpoint=False)
self.mapVals['ph_gradients'] = ph_gradients
self.mapVals['sl_gradients'] = sl_gradients
# Now fix the number of slices to partailly acquired k-space
if self.nPoints[2]==1:
n_sl = 1
else:
n_sl = int(self.nPoints[2]/2)+par_acq_lines
n_repetitions = n_ph*n_sl
# Get the rotation matrix
rot = self.getRotationMatrix()
gradAmp = np.array([0.0, 0.0, 0.0])
gradAmp[self.axesOrientation[0]] = 1
gradAmp = np.reshape(gradAmp, (3, 1))
result = np.dot(rot, gradAmp)
print("Readout direction:")
print(np.reshape(result, (1, 3)))
# Initialize k-vectors
k_ph_sl_xyz = np.ones((3, self.nPoints[0] * self.nPoints[1] * n_sl)) * hw.gammaB * (
self.dephGradTime + hw.grad_rise_time)
k_rd_xyz = np.ones((3, self.nPoints[0] * self.nPoints[1] * n_sl)) * hw.gammaB
# Changing time parameters to us
self.rfExTime *= 1e6
self.echo_time *= 1e6
self.repetition_time *= 1e6
grad_rise_time = hw.grad_rise_time*1e6
self.dephGradTime *= 1e6
self.rdGradTime *= 1e6
self.acq_time *= 1e6
self.mapVals['scanTime'] = n_repetitions*self.repetition_time*1e-6
# Create sequence instructions
def createSequence(ph_index=0, sl_index=0, ln_index=0, repe_index_global=0):
repe_index = 0
acq_points = 0
orders = 0
# check in case of dummy pulse filling the cache
if(self.dummy_pulses>0 and n_rd*2>hw.maxRdPoints) or (self.dummy_pulses==0 and n_rd>hw.maxRdPoints):
print('ERROR: Too many acquired points.')
return 0
# Set shimming
self.iniSequence(20, self.shimming)
# Run sequence batch
while acq_points+n_rd<=hw.maxRdPoints and orders<=hw.maxOrders and repe_index_global<n_repetitions:
# Initialize time
t_ex = 20e3+self.repetition_time*repe_index
# First I do a noise measurement
if repe_index==0:
t0 = t_ex-4*self.acq_time
self.rxGate(t0, self.acq_time+2*hw.addRdPoints/bw)
acq_points += n_rd
# Excitation pulse
t0 = t_ex-hw.blkTime-self.rfExTime/2
if self.mode==1 or self.mode==3: # rf spoiling
self.rfRecPulse(t0, self.rfExTime, rfExAmp * np.exp(1j * 117 * np.pi / 180 * repe_index))
elif self.mode==4: # balanced
self.rfRecPulse(t0, self.rfExTime, rfExAmp * np.exp(1j * np.pi/2 * (1 + (-1) ** repe_index)))
elif self.mode==0 or self.mode==2:
self.rfRecPulse(t0, self.rfExTime, rfExAmp)
# Dephasing gradient
if repe_index>=self.dummy_pulses:
grad_amp_deph = np.array([0.0, 0.0, 0.0])
grad_amp_deph[self.axesOrientation[0]] = rd_deph_amplitude
grad_amp_deph[self.axesOrientation[1]] = ph_gradients[ph_index]
grad_amp_deph[self.axesOrientation[2]] = sl_gradients[sl_index]
grad_amp_deph = np.dot(rot, np.reshape(grad_amp_deph, (3, 1)))
t0 = t_ex+self.rfExTime/2-hw.gradDelay
self.gradTrap(t0, grad_rise_time, self.dephGradTime, grad_amp_deph[0], hw.grad_steps, 0, self.shimming)
self.gradTrap(t0, grad_rise_time, self.dephGradTime, grad_amp_deph[1], hw.grad_steps, 1, self.shimming)
self.gradTrap(t0, grad_rise_time, self.dephGradTime, grad_amp_deph[2], hw.grad_steps, 2, self.shimming)
orders = orders+hw.grad_steps*6
# get k-point
k_ph_sl_xyz[:, self.nPoints[0] * ln_index:self.nPoints[0] * (ln_index + 1)] = \
np.diag(np.reshape(grad_amp_deph, -1)) @ \
k_ph_sl_xyz[:, self.nPoints[0] * ln_index:self.nPoints[0] * (ln_index + 1)]
# Rephasing readout gradient
if repe_index>=self.dummy_pulses:
grad_amp_reph = np.array([0.0, 0.0, 0.0])
grad_amp_reph[self.axesOrientation[0]] = rd_grad_amplitude
grad_amp_reph = np.dot(rot, np.reshape(grad_amp_reph, (3, 1)))
t0 = t_ex + self.echo_time - self.rdGradTime / 2 - grad_rise_time - hw.gradDelay
orders = orders + hw.grad_steps * 6
if self.mode==0 or self.mode==1 or self.mode==4: # normal, only rf spoiler, or balanced
rd_grad_time = self.rdGradTime
elif self.mode==2 or self.mode==3: # gradient spoiler
rd_grad_time = 0.5*(self.rdGradTime-grad_rise_time)+self.acq_time*self.spoiler_order
self.gradTrap(t0, grad_rise_time, rd_grad_time, grad_amp_reph[0], hw.grad_steps, 0,
self.shimming)
self.gradTrap(t0, grad_rise_time, rd_grad_time, grad_amp_reph[1], hw.grad_steps, 1,
self.shimming)
self.gradTrap(t0, grad_rise_time, rd_grad_time, grad_amp_reph[2], hw.grad_steps, 2,
self.shimming)
# Rx gate
if repe_index>=self.dummy_pulses:
t0 = t_ex+self.echo_time-self.acq_time/2-hw.addRdPoints/bw
self.rxGate(t0, self.acq_time+2*hw.addRdPoints/bw)
acq_points += n_rd
# get k-point
k_rd_xyz[:, self.nPoints[0] * ln_index:self.nPoints[0] * (ln_index + 1)] = \
np.diag(np.reshape(gradAmp, -1)) @ \
k_rd_xyz[:, self.nPoints[0] * ln_index:self.nPoints[0] * (ln_index + 1)] @ \
np.diag(self.time_vector)
# gradient balance
if repe_index>=self.dummy_pulses and (self.mode==2 or self.mode==3 or self.mode==4):
grad_amp_bala = np.array([0.0, 0.0, 0.0])
if self.mode==4: # bSSFP
grad_amp_bala[self.axesOrientation[0]] = rd_deph_amplitude
grad_amp_bala[self.axesOrientation[1]] = -ph_gradients[ph_index]
grad_amp_bala[self.axesOrientation[2]] = -sl_gradients[sl_index]
grad_amp_bala = np.dot(rot, np.reshape(grad_amp_bala, (3, 1)))
t0 = t_ex + self.echo_time - self.rdGradTime / 2 + rd_grad_time + grad_rise_time - hw.gradDelay
self.gradTrap(t0, grad_rise_time, self.dephGradTime, grad_amp_bala[0], hw.grad_steps, 0,
self.shimming)
self.gradTrap(t0, grad_rise_time, self.dephGradTime, grad_amp_bala[1], hw.grad_steps, 1,
self.shimming)
self.gradTrap(t0, grad_rise_time, self.dephGradTime, grad_amp_bala[2], hw.grad_steps, 2,
self.shimming)
# Update the phase and slice gradient
if repe_index>=self.dummy_pulses:
ln_index += 1
if ph_index == n_ph-1:
ph_index = 0
sl_index += 1
else:
ph_index += 1
if repe_index>=self.dummy_pulses: repe_index_global += 1 # Update the global repe_index
repe_index+=1 # Update the repe_index after the ETL
# Turn off the gradients after the end of the batch
self.endSequence(repe_index*self.repetition_time)
# Return the output variables
return(ph_index, sl_index, ln_index, repe_index_global, acq_points)
# Calibrate frequency
# if self.freqCal and (not plotSeq) and (not self.demo):
# hw.larmorFreq = self.freqCalibration(bw=0.05)
# hw.larmorFreq = self.freqCalibration(bw=0.005)
self.mapVals['larmorFreq'] = hw.larmorFreq
# Initialize the experiment
data_full = []
over_data = []
noise = []
n_batches = 0
repe_index_array = np.array([0])
repe_index_global = repe_index_array[0]
ph_index = 0
sl_index = 0
ln_index = 0
acq_points_per_batch = []
while repe_index_global<n_repetitions:
n_batches += 1
if not demo:
self.expt = ex.Experiment(lo_freq=hw.larmorFreq + self.freqOffset * 1e-6, rx_t=sampling_period,
init_gpa=init_gpa, gpa_fhdo_offset_time=(1 / 0.2 / 3.1))
sampling_period = self.expt.get_rx_ts()[0]
bw = 1/sampling_period/hw.oversamplingFactor # MHz
# Time vector for main points
self.time_vector = np.linspace(-self.nPoints[0] / bw / 2 + 0.5 / bw, self.nPoints[0] / bw / 2 - 0.5 / bw,
self.nPoints[0]) * 1e-6 # s
self. acq_time = self.nPoints[0]/bw # us
self.mapVals['bw'] = bw
ph_index, sl_index, ln_index, repe_index_global, aa = createSequence(ph_index=ph_index,
sl_index=sl_index,
ln_index=ln_index,
repe_index_global=repe_index_global)
# Save instructions into MaRCoS if not a demo
if not self.demo:
if self.floDict2Exp(rewrite=n_batches == 1):
print("Sequence waveforms loaded successfully")
pass
else:
print("ERROR: sequence waveforms out of hardware bounds")
return False
repe_index_array = np.concatenate((repe_index_array, np.array([repe_index_global-1])), axis=0)
acq_points_per_batch.append(aa)
# Execute current batch nScans times
for ii in range(self.nScans):
if not plotSeq:
print("Batch %i, scan %i running..." % (n_batches, ii+1))
if not self.demo:
acq_points = 0
while acq_points != (aa * hw.oversamplingFactor):
rxd, msgs = self.expt.run()
rxd['rx0'] = rxd['rx0'] * hw.adcFactor # Here I normalize to get the result in mV
acq_points = np.size(rxd['rx0'])
print("Acquired points = %i" % acq_points)
print("Expected points = %i" % (aa * hw.oversamplingFactor))
print("Batch %i, scan %i ready!")
else:
rxd = {}
rxd['rx0'] = np.random.randn(aa * hw.oversamplingFactor) + 1j * np.random.randn(
aa * hw.oversamplingFactor)
print("Batch %i, scan %i ready!" % (n_batches, ii + 1))
# Get noise data
noise = np.concatenate((noise, rxd['rx0'][0:n_rd * hw.oversamplingFactor]), axis=0)
rxd['rx0'] = rxd['rx0'][n_rd * hw.oversamplingFactor::]
# Get data
over_data = np.concatenate((over_data, rxd['rx0']), axis=0)
if not demo: self.expt.__del__()
del aa
if not plotSeq:
acq_points_per_batch = (acq_points_per_batch-n_rd)*self.nScans
self.mapVals['noise_data'] = noise
self.mapVals['over_data'] = over_data
# Generate data_full
data_full = sig.decimate(over_data, hw.oversamplingFactor, ftype='fir', zero_phase=True) ##size 4800 = 60*(60+2*addrdpoints)
if n_batches>1:
data_full_a = data_full[0:sum(acq_points_per_batch[0:-1])]
data_full_b = data_full[sum(acq_points_per_batch[0:-1])::]
# Subtract phase in case of rf spoling or balanced
if n_batches>1:
if self.mode==1 or self.mode==3: # rf spoiling
data_full_a = np.reshape(data_full_a, ((n_batches - 1) * self.nScans, -1, n_rd))
for repe_index in range(np.size(data_full_a, 1)):
data_full_a[:, repe_index, :] *= np.exp(-1j * 117 * np.pi / 180 * repe_index)
data_full_a = np.reshape(data_full_a, -1)
data_full_b = np.reshape(data_full_b, (self.nScans, -1, n_rd))
for repe_index in range(np.size(data_full_b, 1)):
data_full_b[:, repe_index, :] *= np.exp(-1j * 117 * np.pi / 180 * repe_index)
data_full_b = np.reshape(data_full_b, -1)
if self.mode==4:
data_full_a = np.reshape(data_full_a, ((n_batches - 1) * self.nScans, -1, n_rd))
for repe_index in range(np.size(data_full_a, 1)):
data_full_a[:, repe_index, :] *= np.exp(-1j * np.pi / 2 *(1 + (-1) ** repe_index))
data_full_a = np.reshape(data_full_a, -1)
data_full_b = np.reshape(data_full_b, ((n_batches - 1) * self.nScans, -1, n_rd))
for repe_index in range(np.size(data_full_b, 1)):
data_full_b[:, repe_index, :] *= np.exp(-1j * np.pi / 2 * (1 + (-1) ** repe_index))
data_full_b = np.reshape(data_full_b, -1)
else:
if self.mode==1 or self.mode==3: # rf spoiling
data_full = np.reshape(data_full, (n_batches * self.nScans, -1, n_rd))
for repe_index in range(np.size(data_full, 1)):
data_full[:, repe_index, :] *= np.exp(-1j * 117 * np.pi / 180 * repe_index)
data_full = np.reshape(data_full, -1)
if self.mode==4:
data_full = np.reshape(data_full, (n_batches * self.nScans, -1, n_rd))
for repe_index in range(np.size(data_full, 1)):
data_full[:, repe_index, :] *= np.exp(-1j * np.pi / 2 * (1 + (-1) ** repe_index))
data_full = np.reshape(data_full, -1)
# Reorganize data_full
data_prov = np.zeros([self.nScans,n_sl*n_ph*n_rd], dtype=complex)
if n_batches>1:
data_full_a = np.reshape(data_full_a, (n_batches-1, self.nScans, -1, n_rd))
data_full_b = np.reshape(data_full_b, (1, self.nScans, -1, n_rd))
else:
data_full = np.reshape(data_full, (n_batches, self.nScans, -1, n_rd))
for scan in range(self.nScans):
if n_batches>1:
data_prov[scan, :] = np.concatenate((np.reshape(data_full_a[:,ii,:,:],-1), np.reshape(data_full_b[:,ii,:,:],-1)), axis=0)
else:
data_prov[scan, :] = np.reshape(data_full[:,scan,:,:],-1)
# Save data_full to save it in .h5
self.data_full_mat=data_full
data_full = np.reshape(data_prov,-1)
# Get index for krd = 0
# Average data
data_prov = np.reshape(data_full, (self.nScans, n_rd*n_ph*n_sl))
data_prov = np.average(data_prov, axis=0)
data_prov = np.reshape(data_prov, (n_sl, n_ph, n_rd))
# Check where is krd = 0
data_prov = data_prov[int(self.nPoints[2]/2), int(n_ph/2), :]
indkrd0 = np.argmax(np.abs(data_prov))
if indkrd0 < n_rd/2-hw.addRdPoints or indkrd0 > n_rd/2+hw.addRdPoints:
indkrd0 = int(n_rd/2)
# Get individual images
data_full = np.reshape(data_full, (self.nScans, n_sl, n_ph, n_rd))
data_full = data_full[:, :, :, indkrd0-int(self.nPoints[0]/2):indkrd0+int(self.nPoints[0]/2)]
img_full = data_full*0
for scan in range(self.nScans):
img_full[scan, :, :, :] = np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(data_full[scan, :, :, :])))
self.mapVals['data_full'] = data_full
self.mapVals['img_full'] = img_full
# Average data
data = np.average(data_full, axis=0)
# Concatenate with k_xyz
k_xyz = k_ph_sl_xyz + k_rd_xyz
self.mapVals['sampled_xyz'] = np.concatenate((k_xyz.T, np.reshape(data, (n_sl * n_ph * self.nPoints[0], 1))),
axis=1)
data = np.reshape(data, (n_sl, n_ph, self.nPoints[0]))
# Do zero padding
dataTemp = np.zeros((self.nPoints[2], self.nPoints[1], self.nPoints[0]), dtype=complex)
dataTemp[0:n_sl, :, :] = data
data = np.reshape(dataTemp, (1, self.nPoints[0]*self.nPoints[1]*self.nPoints[2]))
# Fix the position of the sample according to dfov
kMax = np.array(self.nPoints)/(2*np.array(self.fov))*np.array(axesEnable)
k_rd = self.time_vector*hw.gammaB*rd_grad_amplitude
k_ph = np.linspace(-kMax[1],kMax[1],num=self.nPoints[1],endpoint=False)
k_sl = np.linspace(-kMax[2],kMax[2],num=self.nPoints[2],endpoint=False)
k_ph, k_sl, k_rd = np.meshgrid(k_ph, k_sl, k_rd)
k_rd = np.reshape(k_rd, (1, self.nPoints[0]*self.nPoints[1]*self.nPoints[2]))
k_ph = np.reshape(k_ph, (1, self.nPoints[0]*self.nPoints[1]*self.nPoints[2]))
k_sl = np.reshape(k_sl, (1, self.nPoints[0]*self.nPoints[1]*self.nPoints[2]))
d_phase = np.exp(-2*np.pi*1j*(self.dfov[0]*k_rd+self.dfov[1]*k_ph+self.dfov[2]*k_sl))
data = np.reshape(data*d_phase, (self.nPoints[2], self.nPoints[1], self.nPoints[0]))
self.mapVals['kSpace3D'] = data
img=np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(data)))
self.mapVals['image3D'] = img
data = np.reshape(data, (1, self.nPoints[0]*self.nPoints[1]*self.nPoints[2]))
# Create sampled data
k_rd = np.reshape(k_rd, (self.nPoints[0]*self.nPoints[1]*self.nPoints[2], 1))
k_ph = np.reshape(k_ph, (self.nPoints[0]*self.nPoints[1]*self.nPoints[2], 1))
k_sl = np.reshape(k_sl, (self.nPoints[0]*self.nPoints[1]*self.nPoints[2], 1))
data = np.reshape(data, (self.nPoints[0]*self.nPoints[1]*self.nPoints[2], 1))
self.mapVals['kMax'] = kMax
self.mapVals['sampled'] = np.concatenate((k_rd, k_ph, k_sl, data), axis=1)
self.mapVals['sampledCartesian'] = self.mapVals['sampled']
data = np.reshape(data, (self.nPoints[2], self.nPoints[1], self.nPoints[0]))
return True
[docs]
def sequenceAnalysis(self, mode=None):
self.mode = mode
axesEnable = self.mapVals['axesEnable']
# Get axes in strings
axes_dict = {'x': 0, 'y': 1, 'z': 2}
axes_keys = list(axes_dict.keys())
axes_vals = list(axes_dict.values())
axes_str = ['', '', '']
n = 0
for val in self.axesOrientation:
index = axes_vals.index(val)
axes_str[n] = axes_keys[index]
n += 1
if (axesEnable[1] == 0 and axesEnable[2] == 0):
bw = self.mapVals['bw']*1e-3 # kHz
t_vector = np.linspace(-self.acq_time/2, self.acq_time/2, self.nPoints[0])*1e-3 # ms
s_vector = self.mapVals['sampled'][:, 3]
f_vector = np.linspace(-bw/2, bw/2, self.nPoints[0])
i_vector = np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(s_vector)))
result1 = {'widget': 'curve',
'xData': t_vector,
'yData': [np.abs(s_vector), np.real(s_vector), np.imag(s_vector)],
'xLabel': 'Time (ms)',
'yLabel': "Signal amplitude (mV)",
'title': "Signal",
'legend': ['Magnitude', 'Real', 'Imaginary'],
'row': 0,
'col': 0}
result2 = {'widget': 'curve',
'xData': f_vector,
'yData': [np.abs(i_vector)],
'xLabel': "Frequency (kHz)",
'yLabel': "Amplitude (a.u.)",
'title': "Spectrum",
'legend': ['Spectrum magnitude'],
'row': 1,
'col': 0}
self.output = [result1, result2]
else:
# Plot image
image = np.abs(self.mapVals['image3D'])
image = image / np.max(np.reshape(image, -1)) * 100
if not self.unlock_orientation: # Image orientation
if self.axesOrientation[2] == 2: # Sagittal
title = "Sagittal"
if self.axesOrientation[0] == 0 and self.axesOrientation[1] == 1: # OK
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
xLabel = "(-Y) A | PHASE | P (+Y)"
yLabel = "(-X) I | READOUT | S (+X)"
else:
image = np.transpose(image, (0, 2, 1))
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
xLabel = "(-Y) A | READOUT | P (+Y)"
yLabel = "(-X) I | PHASE | S (+X)"
elif self.axesOrientation[2] == 1: # Coronal
title = "Coronal"
if self.axesOrientation[0] == 0 and self.axesOrientation[1] == 2: # OK
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
image = np.flip(image, axis=0)
xLabel = "(+Z) R | PHASE | L (-Z)"
yLabel = "(-X) I | READOUT | S (+X)"
else:
image = np.transpose(image, (0, 2, 1))
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
image = np.flip(image, axis=0)
xLabel = "(+Z) R | READOUT | L (-Z)"
yLabel = "(-X) I | PHASE | S (+X)"
elif self.axesOrientation[2] == 0: # Transversal
title = "Transversal"
if self.axesOrientation[0] == 1 and self.axesOrientation[1] == 2:
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
xLabel = "(+Z) R | PHASE | L (-Z)"
yLabel = "(+Y) P | READOUT | A (-Y)"
else: # OK
image = np.transpose(image, (0, 2, 1))
image = np.flip(image, axis=2)
image = np.flip(image, axis=1)
xLabel = "(+Z) R | READOUT | L (-Z)"
yLabel = "(+Y) P | PHASE | A (-Y)"
else:
xLabel = "%s axis" % axesStr[1]
yLabel = "%s axis" % axesStr[0]
title = "Image"
result1 = {}
result1['widget'] = 'image'
result1['data'] = image
result1['xLabel'] = xLabel
result1['yLabel'] = yLabel
result1['title'] = title
result1['row'] = 0
result1['col'] = 0
result2 = {}
result2['widget'] = 'image'
if self.sl_fraction == 1:
result2['data'] = np.log10(np.abs(self.mapVals['kSpace3D']))
else:
result2['data'] = np.abs(self.mapVals['kSpace3D'])
result2['xLabel'] = "k%s" % axes_str[1]
result2['yLabel'] = "k%s" % axes_str[0]
result2['title'] = "k-Space"
result2['row'] = 0
result2['col'] = 1
# Reset rotation angle and dfov to zero
self.mapVals['angle'] = 0.0
self.mapVals['dfov'] = [0.0, 0.0, 0.0]
hw.dfov = [0.0, 0.0, 0.0]
# DICOM TAGS
# Image
imageDICOM = np.transpose(image, (0, 2, 1))
# If it is a 3d image
if len(imageDICOM.shape) > 2:
# Obtener dimensiones
slices, rows, columns = imageDICOM.shape
self.meta_data["Columns"] = columns
self.meta_data["Rows"] = rows
self.meta_data["NumberOfSlices"] = slices
self.meta_data["NumberOfFrames"] = slices
# if it is a 2d image
else:
# Obtener dimensiones
rows, columns = imageDICOM.shape
self.meta_data["Columns"] = columns
self.meta_data["Rows"] = rows
self.meta_data["NumberOfSlices"] = 1
self.meta_data["NumberOfFrames"] = 1
imgAbs = np.abs(imageDICOM)
imgFullAbs = np.abs(imageDICOM) * (2 ** 15 - 1) / np.amax(np.abs(imageDICOM))
x2 = np.amax(np.abs(imageDICOM))
imgFullInt = np.int16(np.abs(imgFullAbs))
imgFullInt = np.reshape(imgFullInt, (slices, rows, columns))
arr = np.zeros((slices, rows, columns), dtype=np.int16)
arr = imgFullInt
self.meta_data["PixelData"] = arr.tobytes()
self.meta_data["WindowWidth"] = 26373
self.meta_data["WindowCenter"] = 13194
# Sequence parameters
self.meta_data["RepetitionTime"] = self.mapVals['repetition_time']
self.meta_data["EchoTime"] = self.mapVals['echo_time']
# Add results into the output attribute (result1 must be the image to save in dicom)
self.output = [result1, result2]
# Save results
self.saveRawData()
self.save_ismrmrd()
if self.mode == 'Standalone':
self.plotResults()
return self.output
[docs]
def save_ismrmrd(self):
"""
Save the current instance's data in ISMRMRD format.
This method saves the raw data, header information, and reconstructed images to an HDF5 file
using the ISMRMRD (Image Storage and Reconstruction format for MR Data) format.
Steps performed:
1. Generate a timestamp-based filename and directory path for the output file.
2. Initialize the ISMRMRD dataset with the generated path.
3. Populate the header and write the XML header to the dataset. Informations can be added.
4. Reshape the raw data matrix and iterate over scans, slices, and phases to write each acquisition.
5. Set acquisition flags and properties.
6. Append the acquisition data to the dataset.
7. Reshape and save the reconstructed images.
8. Close the dataset.
Attribute:
- self.data_full_mat (numpy.array): Full matrix of raw data to be reshaped and saved.
Returns:
None. It creates an HDF5 file with the ISMRMRD format.
"""
directory_rmd = self.directory_rmd
name = datetime.datetime.now()
name_string = name.strftime("%Y.%m.%d.%H.%M.%S.%f")[:-3]
self.mapVals['name_string'] = name_string
if hasattr(self, 'raw_data_name'):
file_name = "%s.%s" % (self.raw_data_name, name_string)
else:
self.raw_data_name = self.mapVals['seqName']
file_name = "%s.%s" % (self.mapVals['seqName'], name_string)
path= "%s/%s.h5" % (directory_rmd, file_name)
dset = ismrmrd.Dataset(path, f'/dataset', True) # Create the dataset
addRdPoints = hw.addRdPoints
nScans = self.mapVals['nScans']
nPoints = np.array(self.mapVals['nPoints'])
nRD = self.nPoints[0]
nPH = self.nPoints[1]
nSL = self.nPoints[2]
nRep=nPH
bw = self.mapVals['bw']
axesOrientation = self.axesOrientation
axesOrientation_list = axesOrientation.tolist()
read_dir = [0, 0, 0]
phase_dir = [0, 0, 0]
slice_dir = [0, 0, 0]
read_dir[axesOrientation_list.index(0)] = 1
phase_dir[axesOrientation_list.index(1)] = 1
slice_dir[axesOrientation_list.index(2)] = 1
# Experimental Conditions field
exp = ismrmrd.xsd.experimentalConditionsType()
magneticFieldStrength = hw.larmorFreq*1e6/hw.gammaB
exp.H1resonanceFrequency_Hz = hw.larmorFreq
self.header.experimentalConditions = exp
# Acquisition System Information field
sys = ismrmrd.xsd.acquisitionSystemInformationType()
sys.receiverChannels = 1
self.header.acquisitionSystemInformation = sys
# Encoding field can be filled if needed
encoding = ismrmrd.xsd.encodingType()
encoding.trajectory = ismrmrd.xsd.trajectoryType.CARTESIAN
# # encoded and recon spaces
# efov = ismrmrd.xsd.fieldOfViewMm()
# efov.x =
# efov.y =
# efov.z =
# rfov = ismrmrd.xsd.fieldOfViewMm()
# rfov.x =
# rfov.y =
# rfov.z =
# ematrix = ismrmrd.xsd.matrixSizeType()
# rmatrix = ismrmrd.xsd.matrixSizeType()
# ematrix.x =
# ematrix.y =
# ematrix.z =
# rmatrix.x =
# rmatrix.y =
# rmatrix.z =
# espace = ismrmrd.xsd.encodingSpaceType()
# espace.matrixSize = ematrix
# espace.fieldOfView_mm = efov
# rspace = ismrmrd.xsd.encodingSpaceType()
# rspace.matrixSize = rmatrix
# rspace.fieldOfView_mm = rfov
# encoding.encodedSpace = espace
# encoding.reconSpace = rspace
# Encoding limits field can be filled if needed
# limits = ismrmrd.xsd.encodingLimitsType()
# limits1 = ismrmrd.xsd.limitType()
# limits1.minimum =
# limits1.center =
# limits1.maximum =
# limits.kspace_encoding_step_1 = limits1
# limits_rep = ismrmrd.xsd.limitType()
# limits_rep.minimum =
# limits_rep.center =
# limits_rep.maximum =
# limits.repetition = limits_rep
# limits_rest = ismrmrd.xsd.limitType()
# limits_rest.minimum =
# limits_rest.center =
# limits_rest.maximum =
# limits.kspace_encoding_step_0 =
# limits.slice =
# limits.average =
# limits.contrast =
# limits.kspaceEncodingStep2 =
# limits.phase =
# limits.segment =
# limits.set =
# encoding.encodingLimits = limits
# self.header.encoding.append(encoding)
dset.write_xml_header(self.header.toXML())
self.data_full_mat = np.reshape(self.data_full_mat, (nScans, nSL, nPH, nRD + 2 * addRdPoints))
counter = 0
for average in range(nScans):
repetition=0
for slice_idx in range(nSL):
for phase_idx in range(nPH):
# Extraire la ligne correspondante
line = self.data_full_mat[average, slice_idx, phase_idx, :]
# Préparer les données complexes
line2d = np.reshape(line, (1, nRD+2*addRdPoints))
acq = ismrmrd.Acquisition.from_array(line2d, None)
counter += 1
repetition += 1 ##verifier
acq.idx.repetition = repetition
acq.idx.kspace_encode_step_1 = phase_idx + 1
acq.idx.slice = slice_idx + 1
acq.idx.average = average + 1
acq.clearAllFlags()
if phase_idx == 0:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_PHASE)
elif phase_idx == nPH - 1:
acq.setFlag(ismrmrd.ACQ_LAST_IN_PHASE)
if slice_idx == 0:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_SLICE)
elif slice_idx == nSL - 1:
acq.setFlag(ismrmrd.ACQ_LAST_IN_SLICE)
if repetition == 1:
acq.setFlag(ismrmrd.ACQ_FIRST_IN_AVERAGE)
elif repetition == nPH*nSL:
acq.setFlag(ismrmrd.ACQ_LAST_IN_AVERAGE)
acq.scan_counter = counter
acq.discard_pre = hw.addRdPoints
acq.discard_post = hw.addRdPoints
acq.sample_time_us = 1/bw
self.dfov = np.array(self.dfov)
acq.position = (ctypes.c_float * 3)(*self.dfov.flatten())
acq.read_dir = (ctypes.c_float * 3)(*read_dir)
acq.phase_dir = (ctypes.c_float * 3)(*phase_dir)
acq.slice_dir = (ctypes.c_float * 3)(*slice_dir)
# Ajouter l'acquisition au dataset
dset.append_acquisition(acq)
image=self.mapVals['image3D']
image_reshaped = np.reshape(image, (nSL, nPH, nRD))
#for scan in range (nScans): ## image3d does not have scan dimension
for slice_idx in range (nSL):
image_slice = image_reshaped[slice_idx, :, :]
img = ismrmrd.Image.from_array(image_slice)
img.field_of_view = (ctypes.c_float * 3)(*(self.fov)*10) # mm
img.position = (ctypes.c_float * 3)(*self.dfov)
img.sample_time_us = 1/bw
img.image_type = 5 ## COMPLEX
img.read_dir = (ctypes.c_float * 3)(*read_dir)
img.phase_dir = (ctypes.c_float * 3)(*phase_dir)
img.slice_dir = (ctypes.c_float * 3)(*slice_dir)
dset.append_image(f"image_raw", img)
dset.close()
if __name__ == '__main__':
seq = GRE3D()
seq.sequenceAtributes()
seq.sequenceRun(demo=True, plotSeq=False)
seq.sequencePlot(standalone=True)
seq.sequenceRun(demo=True)
seq.sequenceAnalysis(mode='Standalone')