Source code for seq.eddycurrents

import os
import sys

#*****************************************************************************
# Get the directory of the current script
main_directory = os.path.dirname(os.path.realpath(__file__))
parent_directory = os.path.dirname(main_directory)
parent_directory = os.path.dirname(parent_directory)

# Define the subdirectories you want to add to sys.path
subdirs = ['MaRGE', 'marcos_client']

# Add the subdirectories to sys.path
for subdir in subdirs:
    full_path = os.path.join(parent_directory, subdir)
    sys.path.append(full_path)
#******************************************************************************
import controller.experiment_gui as ex
import numpy as np
import seq.mriBlankSeq as blankSeq  # Import the mriBlankSequence for any new sequence.
import configs.hw_config as hw
import configs.units as units

[docs] class EDDYCURRENTS(blankSeq.MRIBLANKSEQ): def __init__(self): super(EDDYCURRENTS, self).__init__() # Input the parameters self.output = None self.larmorFreq = None self.nReadouts = None self.nScans = None self.gAmplitude = None self.acqTime = None self.deadTime = None self.rfExAmp = None self.rfExTime = None self.gAxis = None self.gSteps = None self.gDuration = None self.gRiseTime = None self.repetitionTime = None self.nDelays = None self.tDelayMax = None self.tDelayMin = None self.shimming = None self.addParameter(key='seqName', string='EDDYCURRENTSinfo', val='EDDYCURRENTS') self.addParameter(key='nScans', string='Number of scans', val=1, field='SEQ') self.addParameter(key='larmorFreq', string='Larmor frequency (MHz)', val=3.0, units=units.MHz, field='RF') self.addParameter(key='rfExAmp', string='RF excitation amplitude (a.u.)', val=0.3, field='RF') self.addParameter(key='rfExTime', string='RF excitation time (us)', val=30.0, units=units.us, field='RF') self.addParameter(key='deadTime', string='RF dead time (us)', val=400.0, units=units.us, field='RF') self.addParameter(key='nReadouts', string='Number of points', val=600, field='SEQ') self.addParameter(key='acqTime', string='Acquisition time (ms)', val=3.0, units=units.ms, field='SEQ') self.addParameter(key='repetitionTime', string='Repetition time (ms)', val=20, units=units.ms, field='SEQ') self.addParameter(key='shimming', string='Shimming', val=[0, 0, 0], units=units.sh, field='SEQ') self.addParameter(key='gAxis', string='G axis', val='x', tip="'x', 'y' or 'z'", field='SEQ') self.addParameter(key='gSteps', string='G steps', val=0, field='SEQ', tip="0 if you want to use the default value in the hw_config.py file") self.addParameter(key='gRiseTime', string='G rise time (us)', val=0, units=units.us, field='SEQ', tip="0 if you want to use the default value in the hw_config.py file") self.addParameter(key='gAmplitude', string='G amplitude (mT/m)', val=5.0, units=units.mTm, field='SEQ') self.addParameter(key='gDuration', string='G duration (ms)', val=10, units=units.ms, field='SEQ') self.addParameter(key='tDelayMin', string='Min delay G2RF (ms)', val=1, units=units.ms, field='SEQ') self.addParameter(key='tDelayMax', string='Max delay G2RF (ms)', val=10, units=units.ms, field='SEQ') self.addParameter(key='nDelays', string='Number of delays', val=2, field='SEQ')
[docs] def sequenceInfo(self): print("EDDYCURRENTS") print("Author: Dr. J.M. Algarín") print("Contact: josalggui@i3m.upv.es") print("mriLab @ i3M, CSIC, Spain") print("Sequence to characterize the eddy currents.\n")
[docs] def sequenceTime(self): n_delays = self.mapVals['nDelays'] n_scans = self.mapVals['nScans'] tr = self.mapVals['repetitionTime'] # ms return tr*1e-3*n_scans*n_delays*3/60 # minutes, scanTime
[docs] def sequenceRun(self, plotSeq=0, demo=False, standalone=False): init_gpa = False self.demo = demo # Define delay array delays = np.linspace(self.tDelayMin, self.tDelayMax, self.nDelays)*1e6 #us # Fix gSteps and gRiseTime if self.gSteps == 0: self.gSteps = hw.grad_steps if self.gRiseTime == 0: self.gRiseTime = hw.grad_rise_time*1e-6 # Fix axis if self.gAxis == 'x': self.gAxis = 0 elif self.gAxis == 'y': self.gAxis = 1 elif self.gAxis == 'z': self.gAxis = 2 def createSequence(): rd_points = 0 t_ini = 20 # Shimming self.iniSequence(t_ini, self.shimming) for delay in delays: # Gradient pulse 0 t0 = t_ini t0 += 20 self.gradTrap(t0, self.gRiseTime, self.gDuration, 0, self.gSteps, self.gAxis, self.shimming) # Tx RF pulse t0 = t0 + self.gDuration + 2*self.gRiseTime + delay - hw.blkTime self.rfRecPulse(t0, rfTime=self.rfExTime, rfAmplitude=self.rfExAmp, rfPhase=0) # Rx gate t0 = t0 + hw.blkTime + self.rfExTime + self.deadTime self.rxGateSync(t0, self.acqTime) rd_points += self.nReadouts + 2 * hw.addRdPoints # ********************************************** # Gradient pulse + t0 = t_ini + self.repetitionTime t0 += 20 self.gradTrap(t0, self.gRiseTime, self.gDuration, self.gAmplitude, self.gSteps, self.gAxis, self.shimming) # Tx RF pulse t0 = t0 + self.gDuration + 2 * self.gRiseTime + delay - hw.blkTime self.rfRecPulse(t0, rfTime=self.rfExTime, rfAmplitude=self.rfExAmp, rfPhase=0) # Rx gate t0 = t0 + hw.blkTime + self.rfExTime + self.deadTime self.rxGateSync(t0, self.acqTime) rd_points += self.nReadouts + 2 * hw.addRdPoints # ********************************************** # Gradient pulse - t0 = t_ini + 2 * self.repetitionTime t0 += 20 self.gradTrap(t0, self.gRiseTime, self.gDuration, -self.gAmplitude, self.gSteps, self.gAxis, self.shimming) # Tx RF pulse t0 = t0 + self.gDuration + 2 * self.gRiseTime + delay - hw.blkTime self.rfRecPulse(t0, rfTime=self.rfExTime, rfAmplitude=self.rfExAmp, rfPhase=0) # Rx gate t0 = t0 + hw.blkTime + self.rfExTime + self.deadTime self.rxGateSync(t0, self.acqTime) rd_points += self.nReadouts + 2 * hw.addRdPoints # ********************************************** # Update t_ini t_ini += 3 * self.repetitionTime self.endSequence(self.repetitionTime*3*self.nDelays*self.nScans) return rd_points # Time parameters to us self.gRiseTime *= 1e6 self.gDuration *= 1e6 self.gRiseTime *= 1e6 self.rfExTime *= 1e6 self.acqTime *= 1e6 self.deadTime *= 1e6 self.repetitionTime *= 1e6 # Initialize the experiment bw = self.nReadouts / self.acqTime # MHz sampling_period = 1 / bw # us self.mapVals['samplingPeriod'] = sampling_period if not self.demo: self.expt = ex.Experiment(lo_freq=self.larmorFreq * 1e-6, rx_t=sampling_period, init_gpa=False, gpa_fhdo_offset_time=(1 / 0.2 / 3.1), ) sampling_period = self.expt.getSamplingRate() bw = 1 / sampling_period # MHz self.acqTime = self.nReadouts / bw # us self.mapVals['bw'] = bw * 1e6 # Create the sequence and add instructions to the experiment points_to_measure = createSequence() if not self.demo: if self.floDict2Exp(): print("Sequence waveforms loaded successfully") pass else: print("ERROR: sequence waveforms out of hardware bounds") return False # Run the experiment data_over = [] # To save oversampled data if not plotSeq: for scan in range(self.nScans): print("Scan %i running..." % (scan + 1)) if not self.demo: points_measured = 0 while points_measured != (points_to_measure * hw.oversamplingFactor): rxd, msgs = self.expt.run() points_measured = np.size(rxd['rx0']) print("Acquired points = %i" % points_measured) print("Expected points = %i" % (points_to_measure * hw.oversamplingFactor)) else: rxd = {'rx0': np.random.randn(points_to_measure * hw.oversamplingFactor) + 1j * np.random.randn( points_to_measure * hw.oversamplingFactor)} points_measured = np.size(rxd['rx0']) print("Acquired points = %i" % points_measured) print("Expected points = %i" % (points_to_measure * hw.oversamplingFactor)) data_over = np.concatenate((data_over, rxd['rx0']), axis=0) print("Scan %i ready!" % (scan + 1)) self.mapVals['data_oversampled'] = data_over elif plotSeq and standalone: self.sequencePlot(standalone=standalone) return True # Close the experiment if not self.demo: self.expt.__del__() # Process data to be plotted if not plotSeq: data_full = self.decimate(data_over, 3*self.nScans*self.nDelays, option='Normal') self.mapVals['data_full'] = data_full data = np.average(np.reshape(data_full, (self.nScans, self.nDelays, 3, -1)), axis=0) self.mapVals['data'] = data # Get fft spectrums = np.zeros_like(data, dtype=complex) for delay_idx in range(self.nDelays): for rx_idx in range(3): data_prov = data[delay_idx, rx_idx, :] spectrums[delay_idx, rx_idx, :] = np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(data_prov))) self.mapVals['spectrums'] = spectrums # Data to sweep sequence self.mapVals['sampledPoint'] = data[0, 0, 0] return True
[docs] def sequenceAnalysis(self, mode=None): self.mode = mode # Load data spectrums = self.mapVals['spectrums'] # Get spectrum amplitudes gradZero = np.squeeze(spectrums[:, 0, :]) gradPositive = np.squeeze(spectrums[:, 1, :]) gradNegative = np.squeeze(spectrums[:, 2, :]) gZero = np.max(np.abs(gradZero), axis=1) gPositive = np.max(np.abs(gradPositive), axis=1) gNegative = np.max(np.abs(gradNegative), axis=1) t_vector = np.linspace(self.tDelayMin, self.tDelayMax, self.nDelays)*1e3 # ms # Spectrum maps result1 = {'widget': 'image', 'data': np.abs(np.expand_dims(np.transpose(spectrums, axes=(1, 0, 2))[0, :, :], axis=0)), 'xLabel': 'Frequency (a.u.)', 'yLabel': 'Delay (a.u.)', 'title': 'No gradient', 'row': 0, 'col': 0} # Plot 2 result2 = {'widget': 'image', 'data': np.abs(np.expand_dims(np.transpose(spectrums, axes=(1, 0, 2))[1, :, :], axis=0)), 'xLabel': 'Frequency (a.u.)', 'yLabel': 'Delay (a.u.)', 'title': 'Positive gradient', 'row': 0, 'col': 1} # Plot 3 result3 = {'widget': 'image', 'data': np.abs(np.expand_dims(np.transpose(spectrums, axes=(1, 0, 2))[2, :, :], axis=0)), 'xLabel': 'Frequency (a.u.)', 'yLabel': 'Delay (a.u.)', 'title': 'Negative gradient', 'row': 1, 'col': 0} # Plot 4 result4 = {'widget': 'curve', 'xData': t_vector, 'yData': [gZero, gPositive, gNegative], 'xLabel': 'Delay G2RF Time (ms)', 'yLabel': 'Spectrum amplitude (a.u.)', 'title': 'Echo', 'legend': ['No gradient', 'Positive gradient', 'Negative Gradient'], 'row': 1, 'col': 1 } self.output = [result4] # save data once self.output is created self.saveRawData() # Plot result in standalone execution if self.mode == 'Standalone': self.plotResults() return self.output
# ********************************************************************************************************************** # METHOD TO ASSESS MINIMUM DELAY TO EDDY CURRENTS ATTENUATION FOR A GRADIENT ESTABLISHED IN A ZTE EMBODIMENT # import experiment as ex # import numpy as np # import seq.mriBlankSeq as blankSeq # Import the mriBlankSequence for any new sequence. # import scipy.signal as sig # import configs.hw_config as hw # # class EDDYCURRENTS(blankSeq.MRIBLANKSEQ): # def __init__(self): # super(EDDYCURRENTS, self).__init__() # # Input the parameters # self.addParameter(key='seqName', string='EDDYCURRENTSinfo', val='EDDYCURRENTS') # self.addParameter(key='nScans', string='Number of scans', val=1, field='RF') # self.addParameter(key='larmorFreq', string='Larmor frequency (MHz)', val=8.365, field='RF') # self.addParameter(key='rfExAmp', string='RF excitation amplitude (a.u.)', val=1.0, field='RF') # self.addParameter(key='rfExTime', string='RF excitation time (us)', val=30.0, field='RF') # self.addParameter(key='rfExPhase', string='RF phase', val=60.0, field='RF') # self.addParameter(key='deadTime', string='RF dead time (us)', val=80.0, field='RF') # self.addParameter(key='txChannel', string='Tx channel', val=0, field='RF') # self.addParameter(key='rxChannel', string='Rx channel', val=0, field='RF') # self.addParameter(key='nReadout', string='Number of points', val=600, field='SEQ') # self.addParameter(key='tAdq', string='Acquisition time (ms)', val=3.0, field='SEQ') # self.addParameter(key='repetitionTime', string='Repetition time (ms)', val=1000, field='SEQ') # self.addParameter(key='shimming', string='Shimming (*1e4)', val=[0, 0, 0], field='SEQ') # self.addParameter(key='gAxis', string='G axis', val=2, field='SEQ') # self.addParameter(key='gSteps', string='G steps', val=20, field='SEQ') # self.addParameter(key='gRiseTime', string='G rise time (us)', val=150, field='SEQ') # self.addParameter(key='gAmp', string='G amplitude (mT/m)', val=5.0, field='SEQ') # self.addParameter(key='tDelayInit', string='Delay G-RF initial (ms)', val=1, field='SEQ') # self.addParameter(key='tDelayMiddle', string='Delay G-RF middle (ms)', val=1, field='SEQ') # self.addParameter(key='tDelayFinal', string='Delay G-RF final (ms)', val=1, field='SEQ') # self.addParameter(key='addRdPoints', string='addRdPoints', val=10, field='SEQ') # # def sequenceInfo(self): # # print("EDDYCURRENTS") # print("Author: Dr. J.M. Algarín") # print("Contact: josalggui@i3m.upv.es") # print("mriLab @ i3M, CSIC, Spain") # # def sequenceTime(self): # return(4*self.mapVals['repetitionTime']*1e-3/60) # minutes, scanTime # # def sequenceRun(self, plotSeq=0, demo=False): # init_gpa = False # Starts the gpa # # # Create input parameters # nScans = self.mapVals['nScans'] # larmorFreq = self.mapVals['larmorFreq'] # MHz # rfExAmp = self.mapVals['rfExAmp'] # rfExTime = self.mapVals['rfExTime'] # us # rfExPhase = self.mapVals['rfExPhase'] # deadTime = self.mapVals['deadTime'] # us # nReadout = self.mapVals['nReadout'] # tAdq = self.mapVals['tAdq'] * 1e3 # us # TR = self.mapVals['repetitionTime'] * 1e3 # us # gAxis = self.mapVals['gAxis'] # gSteps = self.mapVals['gSteps'] # gRiseTime = self.mapVals['gRiseTime'] # us # gAmp = self.mapVals['gAmp']*1e-3 #T/m # delayGtoRF_0 = self.mapVals['tDelayInit']*1e3 # us # delayGtoRF_M = self.mapVals['tDelayMiddle'] * 1e3 # us # delayGtoRF_F = self.mapVals['tDelayFinal'] * 1e3 # us # addRdPoints = self.mapVals['addRdPoints'] # txChannel = self.mapVals['txChannel'] # rxChannel = self.mapVals['rxChannel'] # shimming = self.mapVals['shimming'] # # # CONSTANTES # tini = 20 # gAmpMax = 40 # shimming = np.array(shimming) * 1e-4 # # # CONDICIONES PARAMETROS INICIALES. # if gAmp > gAmpMax: # gAmp = gAmpMax # # rfExPhase = rfExPhase * np.pi / 180 # rfExAmp = rfExAmp * np.exp(1j * rfExPhase) # # # CONDICIONES PARAMETROS INICIALES. # # rfExPhase = rfExPhase * np.pi / 180 # rfExAmp = rfExAmp * np.exp(1j * rfExPhase) # # def createSequence(): # # if TR < delayGtoRF_F + rfExTime + deadTime + acqTimeReal: # print("TR so short") # # for scan in range(nScans): # # Gradient pulse # tGup0 = tini # self.gradTrap(tGup0, gRiseTime, TR, gAmp, gSteps, gAxis, shimming) # # # Tx gate 1 # tRF1 = tGup0 + gRiseTime + delayGtoRF_0 - hw.blkTime # self.rfRecPulse(tRF1, rfExTime, rfExAmp, rfExPhase, txChannel=txChannel) # # Rx gate 1 # tRx1 = tRF1 + hw.blkTime + rfExTime + deadTime - addRdPoints / BWreal # self.rxGate(tRx1, acqTimeReal + addRdPoints / BWreal, rxChannel=rxChannel) # # # Tx gate 2 # tRF2 = tGup0 + gRiseTime + delayGtoRF_M - hw.blkTime # self.rfRecPulse(tRF2, rfExTime, rfExAmp, rfExPhase, txChannel=txChannel) # # Rx gate 2 # tRx2 = tRF2 + hw.blkTime + rfExTime + deadTime - addRdPoints / BWreal # self.rxGate(tRx2, acqTimeReal + addRdPoints / BWreal, rxChannel=rxChannel) # # # Tx gate 3 # tRF3 = tGup0 + gRiseTime + delayGtoRF_F - hw.blkTime # self.rfRecPulse(tRF3, rfExTime, rfExAmp, rfExPhase, txChannel=txChannel) # # Rx gate 3 # tRx3 = tRF3 + hw.blkTime + rfExTime + deadTime - addRdPoints / BWreal # self.rxGate(tRx3, acqTimeReal + addRdPoints / BWreal, rxChannel=rxChannel) # # self.endSequence(tini + (TR + 2*gRiseTime)) # # # Initialize the experiment # BWoriginal = nReadout / tAdq # MHz # self.mapVals['BWoriginal'] = BWoriginal # samplingPeriodOriginal = 1 / BWoriginal # us # BWov = BWoriginal * hw.oversamplingFactor # MHz # samplingPeriodov = 1 / BWov # self.expt = ex.Experiment(lo_freq=larmorFreq, rx_t=samplingPeriodov, init_gpa=init_gpa, gpa_fhdo_offset_time=(1 / 0.2 / 3.1)) # samplingPeriodReal = self.expt.get_rx_ts()[0] # BWreal = 1 / samplingPeriodReal / hw.oversamplingFactor # MHz # acqTimeReal = nReadout / BWreal # us # createSequence() # if self.floDict2Exp(): # print("Sequence waveforms loaded successfully") # pass # else: # print("ERROR: sequence waveforms out of hardware bounds") # return False # # if plotSeq == 0: # # Run the experiment and get data # nScans = self.mapVals['nScans'] # rxd, msgs = self.expt.run() # rxd['rx%i' % rxChannel] = np.real(rxd['rx%i'%rxChannel])-1j*np.imag(rxd['rx%i'%rxChannel]) # overData = rxd['rx%i'%rxChannel]*hw.adcFactor # dataFull = sig.decimate(overData, hw.oversamplingFactor, ftype='fir', zero_phase=True) # self.mapVals['overData'] = overData # self.mapVals['dataFull'] = dataFull # data = np.average(np.reshape(dataFull, (nScans, -1)), axis=0) # self.mapVals['data'] = data # signal = np.reshape(data, (3, nReadout+self.mapVals['addRdPoints'])) # signalFiltered = np.delete(signal, np.s_[0:addRdPoints], axis=1) # self.mapVals['signals'] = signalFiltered # self.expt.__del__() # # return True # # def sequenceAnalysis(self, obj=''): # BW = self.mapVals['BWoriginal']*1e3 # FreqVector = np.linspace(-BW/2, BW/2, self.mapVals['nReadout']) # ReadoutVector = np.linspace(0, self.mapVals['tAdq'], self.mapVals['nReadout']) # signals = self.mapVals['signals'] # signal0 = signals[0, :] # signalup = signals[1, :] # signaldown = signals[2, :] # spectrum0 = np.abs(np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(signal0)))) # spectrumup = np.abs(np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(signalup)))) # spectrumdown = np.abs(np.fft.ifftshift(np.fft.ifftn(np.fft.ifftshift(signaldown)))) # # # delayGtoRF_0 = self.mapVals['tDelayInit'] # delayGtoRF_M = self.mapVals['tDelayMiddle'] # delayGtoRF_F =self.mapVals['tDelayFinal'] # # # Add time signal to the layout # result1 = {'widget': 'curve', # 'xData': ReadoutVector, # 'yData': [np.abs(signal0), np.abs(signalup), np.abs(signaldown)], # 'xLabel': 'Time (ms)', # 'yLabel': 'Signal amplitude (mV)', # 'title': 'Signal vs time, Residual Gradient: %1.3f mT/m' % ResidualGradient*1e3, # 'legend': ['G=0', 'G-', 'G+'], # 'row': 0, # 'col': 0} # # # Add frequency spectrum to the layout # result2 = {'widget': 'curve', # 'xData': FreqVector, # 'yData': [np.abs(spectrum0), np.abs(spectrumup), np.abs(spectrumdown)], # 'xLabel': 'Frequency (kHz)', # 'yLabel': 'Signal amplitude (a.u.)', # 'title': 'Signal vs freq', # 'legend': ['G=0', 'G-', 'G+'], # 'row': 1, # 'col': 0} # # self.output = [result1, result2] # # self.saveRawData() # # return self.output if __name__ == '__main__': seq = EDDYCURRENTS() seq.sequenceAtributes() seq.sequenceRun(plotSeq=False, demo=True, standalone=True) # seq.sequencePlot(standalone=True) seq.sequenceAnalysis(mode='Standalone')