attoworld.data
This module will contain functions for loading the various data formats used across the labs.
1""" 2This module will contain functions for loading the various data formats used across the labs. 3""" 4 5from .file_io import ( 6 read_dwc, 7 load_waves_from_matfile, 8 load_waveform_from_text, 9 load_spectrum_from_text, 10 read_Trebino_FROG_matrix, 11 read_Trebino_FROG_speck, 12 read_Trebino_FROG_data, 13) 14from .LunaResult import LunaResult 15from .data_structures import ( 16 yaml_io, 17 Waveform, 18 ComplexSpectrum, 19 IntensitySpectrum, 20 ComplexEnvelope, 21 FrogData, 22 Spectrogram, 23) 24 25__all__ = [ 26 "read_dwc", 27 "load_waves_from_matfile", 28 "load_waveform_from_text", 29 "load_spectrum_from_text", 30 "FrogData", 31 "read_Trebino_FROG_matrix", 32 "read_Trebino_FROG_speck", 33 "read_Trebino_FROG_data", 34 "LunaResult", 35 "yaml_io", 36 "Waveform", 37 "ComplexSpectrum", 38 "IntensitySpectrum", 39 "ComplexEnvelope", 40 "Spectrogram", 41]
19def read_dwc(file_path): 20 """ 21 Reads files in the .dwc format produced by many FROG scanners 22 23 Args: 24 file_path: path to the .dwc file 25 26 Returns: 27 Spectrogram: the loaded data 28 """ 29 with open(file_path, "r") as file: 30 lines = file.readlines() 31 delay_increment = float(lines[2].strip().split("=")[1]) 32 33 wavelength_vector = pd.read_csv( 34 file_path, skiprows=5, nrows=1, delimiter="\t", header=None 35 ).values[0] 36 37 data_array = pd.read_csv( 38 file_path, skiprows=8, delimiter="\t", header=None, dtype=float 39 ).values 40 41 freqs, first_spec = wavelength_to_frequency(wavelength_vector, data_array[:, 0]) 42 if freqs is not None: 43 data_array_freq = np.zeros((len(freqs), data_array.shape[1])) 44 data_array_freq[:, 0] = first_spec 45 for _i in range(data_array.shape[1]): 46 _f, _dat = wavelength_to_frequency( 47 wavelength_vector, data_array[:, _i], freqs 48 ) 49 data_array_freq[:, _i] = _dat 50 dt = 1e-15 * delay_increment 51 delays = dt * np.array(range(data_array.shape[1])) 52 delays -= np.mean(delays) 53 return Spectrogram(data=data_array_freq, time=delays, freq=freqs) 54 else: 55 raise Exception("Interpolation failure reading dwc file")
Reads files in the .dwc format produced by many FROG scanners
Arguments:
- file_path: path to the .dwc file
Returns:
Spectrogram: the loaded data
117def load_waves_from_matfile(filename: str, phase: Optional[float] = None): 118 """Load the contents of an attolab scanner file in .mat format 119 120 Args: 121 phase (float): phase to use when interpreting the lock-in data 122 filename (str): path to the mat file 123 Returns: 124 time_delay: array of time delay values 125 signal: signals corresponding to the time delays 126 """ 127 128 datablob = sio.loadmat(filename) 129 stage_position = datablob["xdata"][0, :] 130 time_delay = -2e-3 * stage_position / 2.9979e8 131 lia_x = datablob["x0"] 132 lia_y = datablob["y0"] 133 if phase is None: 134 optimized_phase = np.atan2(np.sum(lia_y[:] ** 2), np.sum(lia_x[:] ** 2)) 135 signal = np.fliplr( 136 lia_x * np.cos(optimized_phase) + lia_y * np.sin(optimized_phase) 137 ) 138 return time_delay, signal 139 else: 140 signal = np.fliplr(lia_x * np.cos(phase) - lia_y * np.sin(phase)) 141 return time_delay, signal
Load the contents of an attolab scanner file in .mat format
Arguments:
- phase (float): phase to use when interpreting the lock-in data
- filename (str): path to the mat file
Returns:
time_delay: array of time delay values signal: signals corresponding to the time delays
88def load_waveform_from_text( 89 filename: str, 90 time_multiplier: float = 1e-15, 91 time_field: str = "delay (fs)", 92 wave_field: str = "field (a.u.)", 93 sep="\t", 94) -> Waveform: 95 """Loads a waveform from a text file 96 97 Args: 98 filename (str): path to the file 99 time_multiplier (float): multiplier needed to convert the time unit in the file to seconds 100 time_field (str): name (header) of the column containing the times 101 wave_field (str): name (header) of the column containing the waveform 102 sep (str): separator used in the file format (tab is default) 103 104 Returns: 105 Waveform: the waveform 106 """ 107 108 data = pd.read_csv(filename, sep=sep) 109 time = time_multiplier * data[time_field].to_numpy() 110 wave = data[wave_field].to_numpy() 111 dt = time[1] - time[0] 112 diff_time = np.diff(time) 113 uniform = bool(np.all(np.isclose(diff_time, diff_time[0]))) 114 return Waveform(wave=wave, time=time, dt=dt, is_uniformly_spaced=uniform)
Loads a waveform from a text file
Arguments:
- filename (str): path to the file
- time_multiplier (float): multiplier needed to convert the time unit in the file to seconds
- time_field (str): name (header) of the column containing the times
- wave_field (str): name (header) of the column containing the waveform
- sep (str): separator used in the file format (tab is default)
Returns:
Waveform: the waveform
58def load_spectrum_from_text( 59 filename: str, 60 wavelength_multiplier: float = 1e-9, 61 wavelength_field: str = "wavelength (nm)", 62 spectrum_field: str = "intensity (a.u.)", 63 sep: str = "\t", 64): 65 """ 66 Load a spectrum contained in a text file. 67 68 Args: 69 filename (str): path to the file 70 wavelength_multiplier (float): multiplier to convert wavelength to m 71 wavelength_field (str): name of the field in the data corresponding to wavelength 72 spectrum_field (str): name of the field in the data corresponding to spectral intensity 73 sep (str): column separator 74 Returns: 75 IntensitySpectrum: the intensity spectrum""" 76 data = pd.read_csv(filename, sep=sep) 77 wavelength = wavelength_multiplier * np.array(data[wavelength_field]) 78 freq = constants.speed_of_light / wavelength 79 spectrum = np.array(data[spectrum_field]) 80 return IntensitySpectrum( 81 spectrum=spectrum, 82 wavelength=wavelength, 83 freq=freq, 84 phase=np.zeros(spectrum.shape, dtype=float), 85 )
Load a spectrum contained in a text file.
Arguments:
- filename (str): path to the file
- wavelength_multiplier (float): multiplier to convert wavelength to m
- wavelength_field (str): name of the field in the data corresponding to wavelength
- spectrum_field (str): name of the field in the data corresponding to spectral intensity
- sep (str): column separator
Returns:
IntensitySpectrum: the intensity spectrum
889@yaml_io 890@dataclass(frozen=True, slots=True) 891class FrogData: 892 """ 893 Stores data from a FROG measurement 894 895 Attributes: 896 spectrum (ComplexSpectrum): the reconstructed complex spectrum 897 pulse (Waveform): time-domain reconstructed field 898 measured_spectrogram (Spectrogram): measured (binned) data 899 reconstructed_spectrogram (Spectrogram): spectrogram resulting from reconstructed field 900 """ 901 902 spectrum: ComplexSpectrum 903 pulse: Waveform 904 measured_spectrogram: Spectrogram 905 reconstructed_spectrogram: Spectrogram 906 raw_reconstruction: np.ndarray 907 f0: float 908 dt: float 909 910 def lock(self): 911 """ 912 Make the data immutable 913 """ 914 self.raw_reconstruction.setflags(write=False) 915 self.spectrum.lock() 916 self.pulse.lock() 917 self.measured_spectrogram.lock() 918 self.reconstructed_spectrogram.lock() 919 920 def save(self, base_filename): 921 """ 922 Save in the Trebino FROG format 923 924 Args: 925 base_filename: base of the file path; 4 files will be made from it: .A.dat, .Arecon.dat, .Ek.dat, and .Speck.dat 926 """ 927 self.measured_spectrogram.save(base_filename + ".A.dat") 928 self.reconstructed_spectrogram.save(base_filename + ".Arecon.dat") 929 930 t = 1e15 * self.dt * np.array(range(len(self.raw_reconstruction))) 931 t -= np.mean(t) 932 f = np.fft.fftshift(np.fft.fftfreq(len(t), d=self.dt)) + self.f0 933 lam = constants.speed_of_light / f 934 raw_spec = np.fft.fftshift(np.fft.fft(self.raw_reconstruction)) 935 936 with open(base_filename + ".Ek.dat", "w") as time_file: 937 for _i in range(len(self.raw_reconstruction)): 938 time_file.write( 939 f"{t[_i]:.15g}\t{np.abs(self.raw_reconstruction[_i]) ** 2:.15g}\t{np.angle(self.raw_reconstruction[_i]):.15g}\t{np.real(self.raw_reconstruction[_i]):.15g}\t{np.imag(self.raw_reconstruction[_i]):.15g}\n" 940 ) 941 942 with open(base_filename + ".Speck.dat", "w") as spec_file: 943 for _i in range(len(self.raw_reconstruction)): 944 spec_file.write( 945 f"{lam[_i]:.15g}\t{np.abs(raw_spec[_i]) ** 2:.15g}\t{np.angle(raw_spec[_i]):.15g}\t{np.real(raw_spec[_i]):.15g}\t{np.imag(raw_spec[_i]):.15g}\n" 946 ) 947 948 def plot_measured_spectrogram(self, ax: Optional[Axes] = None): 949 """ 950 Plot the measured spectrogram. 951 952 Args: 953 ax: optionally plot onto a pre-existing matplotlib Axes 954 """ 955 if ax is None: 956 fig, ax = plt.subplots() 957 else: 958 fig = ax.get_figure() 959 self.measured_spectrogram.plot(ax) 960 ax.set_title("Measured") 961 return fig 962 963 def plot_reconstructed_spectrogram(self, ax: Optional[Axes] = None): 964 """ 965 Plot the reconstructed spectrogram. 966 967 Args: 968 ax: optionally plot onto a pre-existing matplotlib Axes 969 """ 970 if ax is None: 971 fig, ax = plt.subplots() 972 else: 973 fig = ax.get_figure() 974 self.reconstructed_spectrogram.plot(ax) 975 ax.set_title( 976 f"Retrieved (G': {self.get_error():0.2e}; G: {self.get_G_error():0.2e})" 977 ) 978 return fig 979 980 def plot_pulse( 981 self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None 982 ): 983 """ 984 Plot the reconstructed pulse. 985 986 Args: 987 ax: optionally plot onto a pre-existing matplotlib Axes 988 phase_blanking: only show phase information (instantaneous frequency) above this level relative to max intensity 989 xlim: pass arguments to set_xlim() to constrain the x-axis 990 """ 991 return self.pulse.to_complex_envelope().plot(ax, phase_blanking, xlim) 992 993 def plot_spectrum( 994 self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None 995 ): 996 """ 997 Plot the reconstructed spectrum and group delay curve. 998 999 Args: 1000 ax: optionally plot onto a pre-existing matplotlib Axes 1001 phase_blanking: only show phase information (group delay) above this level relative to max intensity 1002 xlim: pass arguments to set_xlim() to constrain the x-axis 1003 """ 1004 return self.spectrum.to_intensity_spectrum().plot_with_group_delay( 1005 ax, phase_blanking, xlim 1006 ) 1007 1008 def plot_all( 1009 self, phase_blanking=0.05, time_xlims=None, wavelength_xlims=None, figsize=None 1010 ): 1011 """ 1012 Produce a 4-panel plot of the FROG results, combining calls to plot_measured_spectrogram(), 1013 plot_reconstructed_spectrogram(), plot_pulse() and plot_spectrum() as subplots, with letter labels. 1014 1015 Args: 1016 phase_blanking: relative intensity at which to show phase information 1017 time_xlim: x-axis limits to pass to the plot of the pulse 1018 wavelength_xlim: x-axis limits to pass to the plot of the spectrum 1019 figsize: custom figure size 1020 """ 1021 if figsize is None: 1022 default_figsize = plt.rcParams["figure.figsize"] 1023 figsize = (default_figsize[0] * 2, default_figsize[1] * 2) 1024 fig, ax = plt.subplots(2, 2, figsize=figsize) 1025 self.plot_measured_spectrogram(ax[0, 0]) 1026 label_letter("a", ax[0, 0]) 1027 self.plot_reconstructed_spectrogram(ax[1, 0]) 1028 label_letter("b", ax[1, 0]) 1029 self.plot_pulse(ax[0, 1], xlim=time_xlims) 1030 label_letter("c", ax[0, 1]) 1031 self.plot_spectrum(ax[1, 1], xlim=wavelength_xlims) 1032 label_letter("d", ax[1, 1]) 1033 return fig 1034 1035 def get_error(self) -> float: 1036 """ 1037 Get the G' error of the reconstruction 1038 """ 1039 norm_measured = np.linalg.norm(self.measured_spectrogram.data) 1040 norm_retrieved = np.linalg.norm(self.reconstructed_spectrogram.data) 1041 return np.sqrt( 1042 np.sum( 1043 ( 1044 self.measured_spectrogram.data[:] / norm_measured 1045 - self.reconstructed_spectrogram.data[:] / norm_retrieved 1046 ) 1047 ** 2 1048 ) 1049 / np.sum((self.measured_spectrogram.data[:] / norm_measured) ** 2) 1050 ) 1051 1052 def get_G_error(self) -> float: 1053 """ 1054 Get the G (note: no apostrophe) error. This one doesn't mean much, but is useful 1055 for comparing reconstructions of the same spectrogram between different programs. 1056 """ 1057 return np.sqrt( 1058 (1.0 / float(len(self.measured_spectrogram.data[:]) ** 2)) 1059 * np.sum( 1060 ( 1061 self.measured_spectrogram.data[:] 1062 / np.max(self.measured_spectrogram.data[:]) 1063 - self.reconstructed_spectrogram.data[:] 1064 / np.max(self.reconstructed_spectrogram.data[:]) 1065 ) 1066 ** 2 1067 ) 1068 ) 1069 1070 def get_fwhm(self) -> float: 1071 """ 1072 Get the full-width-at-half-max value of the reconstructed pulse 1073 """ 1074 return self.pulse.get_envelope_fwhm()
Stores data from a FROG measurement
Attributes:
- spectrum (ComplexSpectrum): the reconstructed complex spectrum
- pulse (Waveform): time-domain reconstructed field
- measured_spectrogram (Spectrogram): measured (binned) data
- reconstructed_spectrogram (Spectrogram): spectrogram resulting from reconstructed field
910 def lock(self): 911 """ 912 Make the data immutable 913 """ 914 self.raw_reconstruction.setflags(write=False) 915 self.spectrum.lock() 916 self.pulse.lock() 917 self.measured_spectrogram.lock() 918 self.reconstructed_spectrogram.lock()
Make the data immutable
920 def save(self, base_filename): 921 """ 922 Save in the Trebino FROG format 923 924 Args: 925 base_filename: base of the file path; 4 files will be made from it: .A.dat, .Arecon.dat, .Ek.dat, and .Speck.dat 926 """ 927 self.measured_spectrogram.save(base_filename + ".A.dat") 928 self.reconstructed_spectrogram.save(base_filename + ".Arecon.dat") 929 930 t = 1e15 * self.dt * np.array(range(len(self.raw_reconstruction))) 931 t -= np.mean(t) 932 f = np.fft.fftshift(np.fft.fftfreq(len(t), d=self.dt)) + self.f0 933 lam = constants.speed_of_light / f 934 raw_spec = np.fft.fftshift(np.fft.fft(self.raw_reconstruction)) 935 936 with open(base_filename + ".Ek.dat", "w") as time_file: 937 for _i in range(len(self.raw_reconstruction)): 938 time_file.write( 939 f"{t[_i]:.15g}\t{np.abs(self.raw_reconstruction[_i]) ** 2:.15g}\t{np.angle(self.raw_reconstruction[_i]):.15g}\t{np.real(self.raw_reconstruction[_i]):.15g}\t{np.imag(self.raw_reconstruction[_i]):.15g}\n" 940 ) 941 942 with open(base_filename + ".Speck.dat", "w") as spec_file: 943 for _i in range(len(self.raw_reconstruction)): 944 spec_file.write( 945 f"{lam[_i]:.15g}\t{np.abs(raw_spec[_i]) ** 2:.15g}\t{np.angle(raw_spec[_i]):.15g}\t{np.real(raw_spec[_i]):.15g}\t{np.imag(raw_spec[_i]):.15g}\n" 946 )
Save in the Trebino FROG format
Arguments:
- base_filename: base of the file path; 4 files will be made from it: .A.dat, .Arecon.dat, .Ek.dat, and .Speck.dat
948 def plot_measured_spectrogram(self, ax: Optional[Axes] = None): 949 """ 950 Plot the measured spectrogram. 951 952 Args: 953 ax: optionally plot onto a pre-existing matplotlib Axes 954 """ 955 if ax is None: 956 fig, ax = plt.subplots() 957 else: 958 fig = ax.get_figure() 959 self.measured_spectrogram.plot(ax) 960 ax.set_title("Measured") 961 return fig
Plot the measured spectrogram.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
963 def plot_reconstructed_spectrogram(self, ax: Optional[Axes] = None): 964 """ 965 Plot the reconstructed spectrogram. 966 967 Args: 968 ax: optionally plot onto a pre-existing matplotlib Axes 969 """ 970 if ax is None: 971 fig, ax = plt.subplots() 972 else: 973 fig = ax.get_figure() 974 self.reconstructed_spectrogram.plot(ax) 975 ax.set_title( 976 f"Retrieved (G': {self.get_error():0.2e}; G: {self.get_G_error():0.2e})" 977 ) 978 return fig
Plot the reconstructed spectrogram.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
980 def plot_pulse( 981 self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None 982 ): 983 """ 984 Plot the reconstructed pulse. 985 986 Args: 987 ax: optionally plot onto a pre-existing matplotlib Axes 988 phase_blanking: only show phase information (instantaneous frequency) above this level relative to max intensity 989 xlim: pass arguments to set_xlim() to constrain the x-axis 990 """ 991 return self.pulse.to_complex_envelope().plot(ax, phase_blanking, xlim)
Plot the reconstructed pulse.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
- phase_blanking: only show phase information (instantaneous frequency) above this level relative to max intensity
- xlim: pass arguments to set_xlim() to constrain the x-axis
993 def plot_spectrum( 994 self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None 995 ): 996 """ 997 Plot the reconstructed spectrum and group delay curve. 998 999 Args: 1000 ax: optionally plot onto a pre-existing matplotlib Axes 1001 phase_blanking: only show phase information (group delay) above this level relative to max intensity 1002 xlim: pass arguments to set_xlim() to constrain the x-axis 1003 """ 1004 return self.spectrum.to_intensity_spectrum().plot_with_group_delay( 1005 ax, phase_blanking, xlim 1006 )
Plot the reconstructed spectrum and group delay curve.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
- phase_blanking: only show phase information (group delay) above this level relative to max intensity
- xlim: pass arguments to set_xlim() to constrain the x-axis
1008 def plot_all( 1009 self, phase_blanking=0.05, time_xlims=None, wavelength_xlims=None, figsize=None 1010 ): 1011 """ 1012 Produce a 4-panel plot of the FROG results, combining calls to plot_measured_spectrogram(), 1013 plot_reconstructed_spectrogram(), plot_pulse() and plot_spectrum() as subplots, with letter labels. 1014 1015 Args: 1016 phase_blanking: relative intensity at which to show phase information 1017 time_xlim: x-axis limits to pass to the plot of the pulse 1018 wavelength_xlim: x-axis limits to pass to the plot of the spectrum 1019 figsize: custom figure size 1020 """ 1021 if figsize is None: 1022 default_figsize = plt.rcParams["figure.figsize"] 1023 figsize = (default_figsize[0] * 2, default_figsize[1] * 2) 1024 fig, ax = plt.subplots(2, 2, figsize=figsize) 1025 self.plot_measured_spectrogram(ax[0, 0]) 1026 label_letter("a", ax[0, 0]) 1027 self.plot_reconstructed_spectrogram(ax[1, 0]) 1028 label_letter("b", ax[1, 0]) 1029 self.plot_pulse(ax[0, 1], xlim=time_xlims) 1030 label_letter("c", ax[0, 1]) 1031 self.plot_spectrum(ax[1, 1], xlim=wavelength_xlims) 1032 label_letter("d", ax[1, 1]) 1033 return fig
Produce a 4-panel plot of the FROG results, combining calls to plot_measured_spectrogram(), plot_reconstructed_spectrogram(), plot_pulse() and plot_spectrum() as subplots, with letter labels.
Arguments:
- phase_blanking: relative intensity at which to show phase information
- time_xlim: x-axis limits to pass to the plot of the pulse
- wavelength_xlim: x-axis limits to pass to the plot of the spectrum
- figsize: custom figure size
1035 def get_error(self) -> float: 1036 """ 1037 Get the G' error of the reconstruction 1038 """ 1039 norm_measured = np.linalg.norm(self.measured_spectrogram.data) 1040 norm_retrieved = np.linalg.norm(self.reconstructed_spectrogram.data) 1041 return np.sqrt( 1042 np.sum( 1043 ( 1044 self.measured_spectrogram.data[:] / norm_measured 1045 - self.reconstructed_spectrogram.data[:] / norm_retrieved 1046 ) 1047 ** 2 1048 ) 1049 / np.sum((self.measured_spectrogram.data[:] / norm_measured) ** 2) 1050 )
Get the G' error of the reconstruction
1052 def get_G_error(self) -> float: 1053 """ 1054 Get the G (note: no apostrophe) error. This one doesn't mean much, but is useful 1055 for comparing reconstructions of the same spectrogram between different programs. 1056 """ 1057 return np.sqrt( 1058 (1.0 / float(len(self.measured_spectrogram.data[:]) ** 2)) 1059 * np.sum( 1060 ( 1061 self.measured_spectrogram.data[:] 1062 / np.max(self.measured_spectrogram.data[:]) 1063 - self.reconstructed_spectrogram.data[:] 1064 / np.max(self.reconstructed_spectrogram.data[:]) 1065 ) 1066 ** 2 1067 ) 1068 )
Get the G (note: no apostrophe) error. This one doesn't mean much, but is useful for comparing reconstructions of the same spectrogram between different programs.
1070 def get_fwhm(self) -> float: 1071 """ 1072 Get the full-width-at-half-max value of the reconstructed pulse 1073 """ 1074 return self.pulse.get_envelope_fwhm()
Get the full-width-at-half-max value of the reconstructed pulse
32 def from_dict(cls, data: dict): 33 """ 34 Takes a dict and makes an instance of the class 35 36 Args: 37 data (dict): the result of a call of .to_dict on the class 38 """ 39 40 def handle_complex_array(serialized_array) -> np.ndarray: 41 """Helper function to deserialize numpy arrays, handling complex types""" 42 if isinstance(serialized_array, list) and all( 43 isinstance(item, dict) and "re" in item and "im" in item 44 for item in serialized_array 45 ): 46 return np.array( 47 [complex(item["re"], item["im"]) for item in serialized_array], 48 dtype=np.complex128, 49 ) 50 return np.array(serialized_array) 51 52 loaded_data = {} 53 for field_name, field_type in cls.__annotations__.items(): 54 if field_type is np.ndarray: 55 loaded_data[field_name] = handle_complex_array(data[field_name]) 56 elif is_dataclass(field_type): 57 loaded_data[field_name] = field_type.from_dict(data[field_name]) 58 else: 59 loaded_data[field_name] = data[field_name] 60 return cls(**loaded_data)
Takes a dict and makes an instance of the class
Arguments:
- data (dict): the result of a call of .to_dict on the class
62 def load_yaml(cls, filename: str): 63 """ 64 load from a yaml file 65 66 Args: 67 filename (str): path to the file 68 """ 69 with open(filename, "r") as file: 70 data = yaml.load(file, yaml.SafeLoader) 71 return cls.from_dict(data)
load from a yaml file
Arguments:
- filename (str): path to the file
84 def to_dict(instance): 85 """ 86 serialize the class into a dict 87 """ 88 data_dict = {} 89 for field_name, field_type in instance.__annotations__.items(): 90 field_value = getattr(instance, field_name) 91 if field_type is np.ndarray: 92 if field_value.dtype == np.complex128: 93 data_dict[field_name] = [ 94 {"re": num.real, "im": num.imag} for num in field_value.tolist() 95 ] 96 else: 97 data_dict[field_name] = field_value.tolist() 98 elif is_dataclass(field_type): 99 data_dict[field_name] = field_value.to_dict() 100 elif field_type is np.float64 or field_type is float: 101 data_dict[field_name] = float(field_value) 102 else: 103 data_dict[field_name] = field_value 104 return data_dict
serialize the class into a dict
73 def save_yaml(instance, filename: str): 74 """ 75 save to a yaml file 76 77 Args: 78 filename (str): path to the file 79 """ 80 data_dict = instance.to_dict() 81 with open(filename, "w") as file: 82 yaml.dump(data_dict, file)
save to a yaml file
Arguments:
- filename (str): path to the file
144def read_Trebino_FROG_matrix(filename: Path | str) -> Spectrogram: 145 """ 146 Read a spectrogram file made by the Trebino FROG code 147 148 Args: 149 filename (Path | str): the name (path) of the file 150 """ 151 with open(filename, "r") as f: 152 line = str(f.readline()) 153 line = line.split() 154 n1 = int(line[0]) 155 n2 = int(line[1]) 156 line = str(f.readline()) 157 line = line.split() 158 measured_data = pd.read_csv(filename, sep="\t", header=None, skiprows=2) 159 measure = [] 160 raw_freq = ( 161 1e9 * constants.speed_of_light / np.array(measured_data[0][0:n2]).squeeze() 162 ) 163 df = np.mean(np.diff(raw_freq)) 164 freq = raw_freq[0] + df * np.array(range(raw_freq.shape[0])) 165 time = 1e-15 * np.array(measured_data[0][n2 : (n2 + n1)]).squeeze() 166 for i in range(n1): 167 measure.append(measured_data[0][(i + 2) * n2 : (i + 3) * n2]) 168 data = np.array(measure) 169 return Spectrogram(data=data, time=time, freq=freq)
Read a spectrogram file made by the Trebino FROG code
Arguments:
- filename (Path | str): the name (path) of the file
172def read_Trebino_FROG_speck(filename: Path | str) -> ComplexSpectrum: 173 """ 174 Read a .Speck file made by the Trebino FROG code 175 176 Args: 177 filename (Path | str): the name (path) of the file 178 """ 179 data = np.array(pd.read_csv(filename, sep="\t", header=None), dtype=float) 180 raw_freq = 1e9 * constants.speed_of_light / data[:, 0] 181 df = np.mean(np.diff(raw_freq)) 182 freq = np.linspace(0.0, raw_freq[-1], int(np.ceil(raw_freq[-1] / df))) 183 spectrum = interpolate(freq, raw_freq, data[:, 3]) + 1j * interpolate( 184 freq, raw_freq, data[:, 4] 185 ) 186 return ComplexSpectrum(spectrum=spectrum, freq=freq)
Read a .Speck file made by the Trebino FROG code
Arguments:
- filename (Path | str): the name (path) of the file
189def read_Trebino_FROG_data(filename: str) -> FrogData: 190 """ 191 Read a set of data produced by the Trebino FROG reconstruction code. 192 193 Args: 194 filename: Base filename of the .bin file; e.g. if the data are mydata.bin.Speck.dat etc., this will be "mydata.bin" """ 195 spectrum = read_Trebino_FROG_speck(filename + ".Speck.dat") 196 pulse = spectrum.to_centered_waveform() 197 measured_spectrogram = read_Trebino_FROG_matrix(filename + ".A.dat") 198 reconstructed_spectrogram = read_Trebino_FROG_matrix(filename + ".Arecon.dat") 199 raw_speck = np.array( 200 pd.read_csv(filename + ".Speck.dat", sep="\t", header=None), dtype=float 201 ) 202 raw_ek = np.array( 203 pd.read_csv(filename + ".Ek.dat", sep="\t", header=None), dtype=float 204 ) 205 f0 = 1e9 * np.mean(constants.speed_of_light / raw_speck[:, 0]) 206 dt = 1e-15 * (raw_ek[1, 0] - raw_ek[0, 0]) 207 raw_reconstruction = raw_ek[:, 3] + 1.0j * raw_ek[:, 4] 208 return FrogData( 209 spectrum=spectrum, 210 pulse=pulse, 211 measured_spectrogram=measured_spectrogram, 212 reconstructed_spectrogram=reconstructed_spectrogram, 213 raw_reconstruction=raw_reconstruction, 214 dt=dt, 215 f0=float(f0), 216 )
Read a set of data produced by the Trebino FROG reconstruction code.
Arguments:
- filename: Base filename of the .bin file; e.g. if the data are mydata.bin.Speck.dat etc., this will be "mydata.bin"
39class LunaResult: 40 """Loads and handles the Luna simulation result. 41 42 The result must be in the HDF5 format using the saving option in the Luna.Interface.prop_capillary() function [filepath="..."]. 43 As opposed to most of the analysis routines here, units are in SI! 44 45 Attributes: 46 z: position along the fiber (for the field data) 47 fieldFT: complex FFT of the field (positive freq axis); shape: (n_z, n_modes, n_freq) [or (n_z, n_freq) after mode selection/averaging] 48 omega: angular frequency axis for the FFT field; shape: (n_z, n_modes, n_freq) [or (n_z, n_freq) after mode selection/averaging] 49 50 stats_z: position along the fiber (for stats data) 51 stats_density: particle density along the fiber (m^-3) 52 stats_electrondensity: electron density along the fiber (m^-3) 53 stats_pressure: gas pressure profile along the fiber (bar) 54 stats_energy: pulse energy along the fiber (J) 55 stats_peakpower: peak power of the pulse along the fiber (W) 56 stats_peakintensity: peak intensity of the pulse along the fiber 57 stats_peak_ionization_rate: peak ionization rate along the fiber 58 """ 59 60 def __init__(self, filename): 61 """Constructor of class LunaResult. 62 63 Args: 64 filename: saved result, file path 65 """ 66 67 self.filename = filename 68 self.fieldFT = None 69 self.omega = None 70 self.z = None 71 72 self.stats_z = None 73 self.stats_energy = None 74 self.stats_electrondensity = None 75 self.stats_density = None 76 self.stats_pressure = None 77 self.stats_peakpower = None 78 self.stats_peakintensity = None 79 self.stats_zdw = None 80 self.stats_peak_ionization_rate = None 81 82 self.open_Luna_result(filename) 83 84 def open_Luna_result(self, filename): 85 """Opens the Luna result file and loads the data""" 86 with h5py.File(filename, "r") as data: 87 # FIELD DATA 88 self.fieldFT = np.array(data["Eω"]) 89 self.omega = np.array(data["grid"]["ω"]) 90 self.z = np.array(data["z"]) 91 92 # STATS 93 self.stats_z = np.array(data["stats"]["z"]) 94 self.stats_energy = np.array(data["stats"]["energy"]) 95 self.stats_electrondensity = np.array(data["stats"]["electrondensity"]) 96 self.stats_density = np.array(data["stats"]["density"]) 97 self.stats_pressure = np.array(data["stats"]["pressure"]) 98 self.stats_peakpower = np.array(data["stats"]["peakpower"]) 99 self.stats_peakintensity = np.array(data["stats"]["peakintensity"]) 100 self.stats_zdw = np.array(data["stats"]["zdw"]) 101 self.stats_peak_ionization_rate = np.array( 102 data["stats"]["peak_ionisation_rate"] 103 ) 104 105 def average_modes(self): 106 """Averages the propagation modes in the Luna result file""" 107 if len(self.fieldFT.shape) == 3: 108 self.fieldFT = np.mean(self.fieldFT, axis=1) 109 self.stats_zdw = None 110 self.stats_peakpower = None 111 self.stats_energy = np.sum(self.stats_energy, axis=1) 112 113 def select_mode(self, mode: int): 114 if len(self.fieldFT.shape) < 3: 115 print("WARNING: No mode to select") 116 elif mode >= self.fieldFT.shape[1] or mode < 0: 117 print("WARNING: mode ", mode, " is out of range") 118 else: 119 self.fieldFT = self.fieldFT[:, mode, :] 120 self.stats_zdw = self.stats_zdw[:, mode] 121 self.stats_peakpower = self.stats_peakpower[:, mode] 122 self.stats_energy = self.stats_energy[:, mode] 123 124 def get_time_field(self, position=None): 125 """Get the electric field in time from the Luna result file. If no mode was previously selected, the method computes the average of all modes. 126 Therefore, after calling get_time_field(), mode selection is not possible any more. 127 128 Args: 129 position (float): position along the fiber in m. If None, the end of the fiber is used. 130 131 Returns: 132 timeV (numpy.ndarray): time axis in seconds 133 fieldV (numpy.ndarray): electric field in V/m 134 """ 135 self.average_modes() 136 if position is None: 137 position = self.z[-1] 138 index = np.argmin(np.abs(self.z - position)) 139 if position > np.max(self.z) or position < np.min(self.z): 140 print("WARNING: position ", position, "m is out of range") 141 check_equal_length(self.fieldFT[index], self.omega) 142 fieldFFT = np.concatenate( 143 (self.fieldFT[index, :], np.conjugate(self.fieldFT[index, :][::-1]) * 0) 144 ) 145 freq = np.concatenate((self.omega, -self.omega[::-1])) / 2 / np.pi 146 timeV, fieldV = inverse_fourier_transform(freq, fieldFFT) 147 return timeV, np.real(fieldV) 148 149 def get_wavelength_spectrum(self, position=None): 150 """Get the spectrum from the Luna result file (|FFT|^2 * (2 * pi * c / λ^2)) 151 152 Args: 153 position (float): position along the fiber in m. If None, the end of the fiber is used. 154 155 Returns: 156 wvl (numpy.ndarray): wavelength axis in m 157 wvlSpectrum (numpy.ndarray): electric field spectrum in V/m 158 """ 159 self.average_modes() 160 if position is None: 161 position = self.z[-1] 162 index = np.argmin(np.abs(self.z - position)) 163 if position > np.max(self.z) or position < np.min(self.z): 164 print("WARNING: position ", position, "m is out of range") 165 wvl = 2 * np.pi * constants.speed_of_light / self.omega[::-1] 166 wvlSpectrum = np.abs(self.fieldFT[index, ::-1]) ** 2 * ( 167 2 * np.pi * constants.speed_of_light / wvl**2 168 ) 169 return wvl, wvlSpectrum 170 171 def get_spectral_phase(self, position=None): 172 """Get the spectral phase from the Luna result file 173 174 Args: 175 position (float): position along the fiber in m. If None, the end of the fiber is used. 176 177 Returns: 178 wvl (numpy.ndarray): wavelength axis in m 179 phase (numpy.ndarray): spectral phase in rad 180 """ 181 self.average_modes() 182 if position is None: 183 position = self.z[-1] 184 index = np.argmin(np.abs(self.z - position)) 185 if position > np.max(self.z) or position < np.min(self.z): 186 print("WARNING: position ", position, "m is out of range") 187 wvl = 2 * np.pi * constants.speed_of_light / self.omega[::-1] 188 phase = np.angle(self.fieldFT[index, ::-1]) 189 return wvl, phase 190 191 def plot_stats(self): 192 """Plots the 'stats_' attribute of the simulation stored in the present object.""" 193 194 fig, axs = plt.subplots(2, 2, figsize=[7, 5]) 195 axs[0, 0].plot(self.stats_z, self.stats_pressure) 196 axs[0, 0].set_xlabel("z (m)") 197 axs[0, 0].set_ylabel("gas pressure (bar)") 198 ax2 = axs[0, 0].twinx() 199 ax2.plot(self.stats_z, self.stats_density, color="r") 200 ax2.set_ylabel("gas particle density ($m^{-3}$)", color="r") 201 ax2.tick_params(axis="y", colors="r") 202 ax2.yaxis.label.set_color("r") 203 axs[0, 1].plot(self.stats_z, self.stats_electrondensity) 204 axs[0, 1].set_xlabel("z (m)") 205 axs[0, 1].set_ylabel("electron density ($m^{-3}$)") 206 ax2 = axs[0, 1].twinx() 207 if len(self.stats_energy.shape) == 2: 208 ax2.plot(self.stats_z, np.sum(self.stats_energy, axis=1) * 1e6, color="r") 209 else: 210 ax2.plot(self.stats_z, self.stats_energy * 1e6, color="r") 211 ax2.set_ylabel(f"pulse energy ({Char.mu}J)", color="r") 212 ax2.tick_params(axis="y", colors="r") 213 ax2.yaxis.label.set_color("r") 214 if self.stats_peakpower is not None: 215 if len(self.stats_peakpower.shape) == 2: 216 axs[1, 0].plot(self.stats_z, self.stats_peakpower[:, 0]) 217 axs[1, 0].set_xlabel("z (m)") 218 axs[1, 0].set_ylabel("peak power (W)") 219 elif len(self.stats_peakpower.shape) == 1: 220 axs[1, 0].plot(self.stats_z, self.stats_peakpower) 221 axs[1, 0].set_xlabel("z (m)") 222 axs[1, 0].set_ylabel("peak power (W)") 223 axs[1, 1].plot(self.stats_z, self.stats_peak_ionization_rate) 224 axs[1, 1].set_xlabel("z (m)") 225 axs[1, 1].set_ylabel("peak ionization rate") 226 plt.tight_layout() 227 228 return fig
Loads and handles the Luna simulation result.
The result must be in the HDF5 format using the saving option in the Luna.Interface.prop_capillary() function [filepath="..."]. As opposed to most of the analysis routines here, units are in SI!
Attributes:
- z: position along the fiber (for the field data)
- fieldFT: complex FFT of the field (positive freq axis); shape: (n_z, n_modes, n_freq) [or (n_z, n_freq) after mode selection/averaging]
- omega: angular frequency axis for the FFT field; shape: (n_z, n_modes, n_freq) [or (n_z, n_freq) after mode selection/averaging]
- stats_z: position along the fiber (for stats data)
- stats_density: particle density along the fiber (m^-3)
- stats_electrondensity: electron density along the fiber (m^-3)
- stats_pressure: gas pressure profile along the fiber (bar)
- stats_energy: pulse energy along the fiber (J)
- stats_peakpower: peak power of the pulse along the fiber (W)
- stats_peakintensity: peak intensity of the pulse along the fiber
- stats_peak_ionization_rate: peak ionization rate along the fiber
60 def __init__(self, filename): 61 """Constructor of class LunaResult. 62 63 Args: 64 filename: saved result, file path 65 """ 66 67 self.filename = filename 68 self.fieldFT = None 69 self.omega = None 70 self.z = None 71 72 self.stats_z = None 73 self.stats_energy = None 74 self.stats_electrondensity = None 75 self.stats_density = None 76 self.stats_pressure = None 77 self.stats_peakpower = None 78 self.stats_peakintensity = None 79 self.stats_zdw = None 80 self.stats_peak_ionization_rate = None 81 82 self.open_Luna_result(filename)
Constructor of class LunaResult.
Arguments:
- filename: saved result, file path
84 def open_Luna_result(self, filename): 85 """Opens the Luna result file and loads the data""" 86 with h5py.File(filename, "r") as data: 87 # FIELD DATA 88 self.fieldFT = np.array(data["Eω"]) 89 self.omega = np.array(data["grid"]["ω"]) 90 self.z = np.array(data["z"]) 91 92 # STATS 93 self.stats_z = np.array(data["stats"]["z"]) 94 self.stats_energy = np.array(data["stats"]["energy"]) 95 self.stats_electrondensity = np.array(data["stats"]["electrondensity"]) 96 self.stats_density = np.array(data["stats"]["density"]) 97 self.stats_pressure = np.array(data["stats"]["pressure"]) 98 self.stats_peakpower = np.array(data["stats"]["peakpower"]) 99 self.stats_peakintensity = np.array(data["stats"]["peakintensity"]) 100 self.stats_zdw = np.array(data["stats"]["zdw"]) 101 self.stats_peak_ionization_rate = np.array( 102 data["stats"]["peak_ionisation_rate"] 103 )
Opens the Luna result file and loads the data
105 def average_modes(self): 106 """Averages the propagation modes in the Luna result file""" 107 if len(self.fieldFT.shape) == 3: 108 self.fieldFT = np.mean(self.fieldFT, axis=1) 109 self.stats_zdw = None 110 self.stats_peakpower = None 111 self.stats_energy = np.sum(self.stats_energy, axis=1)
Averages the propagation modes in the Luna result file
113 def select_mode(self, mode: int): 114 if len(self.fieldFT.shape) < 3: 115 print("WARNING: No mode to select") 116 elif mode >= self.fieldFT.shape[1] or mode < 0: 117 print("WARNING: mode ", mode, " is out of range") 118 else: 119 self.fieldFT = self.fieldFT[:, mode, :] 120 self.stats_zdw = self.stats_zdw[:, mode] 121 self.stats_peakpower = self.stats_peakpower[:, mode] 122 self.stats_energy = self.stats_energy[:, mode]
124 def get_time_field(self, position=None): 125 """Get the electric field in time from the Luna result file. If no mode was previously selected, the method computes the average of all modes. 126 Therefore, after calling get_time_field(), mode selection is not possible any more. 127 128 Args: 129 position (float): position along the fiber in m. If None, the end of the fiber is used. 130 131 Returns: 132 timeV (numpy.ndarray): time axis in seconds 133 fieldV (numpy.ndarray): electric field in V/m 134 """ 135 self.average_modes() 136 if position is None: 137 position = self.z[-1] 138 index = np.argmin(np.abs(self.z - position)) 139 if position > np.max(self.z) or position < np.min(self.z): 140 print("WARNING: position ", position, "m is out of range") 141 check_equal_length(self.fieldFT[index], self.omega) 142 fieldFFT = np.concatenate( 143 (self.fieldFT[index, :], np.conjugate(self.fieldFT[index, :][::-1]) * 0) 144 ) 145 freq = np.concatenate((self.omega, -self.omega[::-1])) / 2 / np.pi 146 timeV, fieldV = inverse_fourier_transform(freq, fieldFFT) 147 return timeV, np.real(fieldV)
Get the electric field in time from the Luna result file. If no mode was previously selected, the method computes the average of all modes. Therefore, after calling get_time_field(), mode selection is not possible any more.
Arguments:
- position (float): position along the fiber in m. If None, the end of the fiber is used.
Returns:
timeV (numpy.ndarray): time axis in seconds fieldV (numpy.ndarray): electric field in V/m
149 def get_wavelength_spectrum(self, position=None): 150 """Get the spectrum from the Luna result file (|FFT|^2 * (2 * pi * c / λ^2)) 151 152 Args: 153 position (float): position along the fiber in m. If None, the end of the fiber is used. 154 155 Returns: 156 wvl (numpy.ndarray): wavelength axis in m 157 wvlSpectrum (numpy.ndarray): electric field spectrum in V/m 158 """ 159 self.average_modes() 160 if position is None: 161 position = self.z[-1] 162 index = np.argmin(np.abs(self.z - position)) 163 if position > np.max(self.z) or position < np.min(self.z): 164 print("WARNING: position ", position, "m is out of range") 165 wvl = 2 * np.pi * constants.speed_of_light / self.omega[::-1] 166 wvlSpectrum = np.abs(self.fieldFT[index, ::-1]) ** 2 * ( 167 2 * np.pi * constants.speed_of_light / wvl**2 168 ) 169 return wvl, wvlSpectrum
Get the spectrum from the Luna result file (|FFT|^2 * (2 * pi * c / λ^2))
Arguments:
- position (float): position along the fiber in m. If None, the end of the fiber is used.
Returns:
wvl (numpy.ndarray): wavelength axis in m wvlSpectrum (numpy.ndarray): electric field spectrum in V/m
171 def get_spectral_phase(self, position=None): 172 """Get the spectral phase from the Luna result file 173 174 Args: 175 position (float): position along the fiber in m. If None, the end of the fiber is used. 176 177 Returns: 178 wvl (numpy.ndarray): wavelength axis in m 179 phase (numpy.ndarray): spectral phase in rad 180 """ 181 self.average_modes() 182 if position is None: 183 position = self.z[-1] 184 index = np.argmin(np.abs(self.z - position)) 185 if position > np.max(self.z) or position < np.min(self.z): 186 print("WARNING: position ", position, "m is out of range") 187 wvl = 2 * np.pi * constants.speed_of_light / self.omega[::-1] 188 phase = np.angle(self.fieldFT[index, ::-1]) 189 return wvl, phase
Get the spectral phase from the Luna result file
Arguments:
- position (float): position along the fiber in m. If None, the end of the fiber is used.
Returns:
wvl (numpy.ndarray): wavelength axis in m phase (numpy.ndarray): spectral phase in rad
191 def plot_stats(self): 192 """Plots the 'stats_' attribute of the simulation stored in the present object.""" 193 194 fig, axs = plt.subplots(2, 2, figsize=[7, 5]) 195 axs[0, 0].plot(self.stats_z, self.stats_pressure) 196 axs[0, 0].set_xlabel("z (m)") 197 axs[0, 0].set_ylabel("gas pressure (bar)") 198 ax2 = axs[0, 0].twinx() 199 ax2.plot(self.stats_z, self.stats_density, color="r") 200 ax2.set_ylabel("gas particle density ($m^{-3}$)", color="r") 201 ax2.tick_params(axis="y", colors="r") 202 ax2.yaxis.label.set_color("r") 203 axs[0, 1].plot(self.stats_z, self.stats_electrondensity) 204 axs[0, 1].set_xlabel("z (m)") 205 axs[0, 1].set_ylabel("electron density ($m^{-3}$)") 206 ax2 = axs[0, 1].twinx() 207 if len(self.stats_energy.shape) == 2: 208 ax2.plot(self.stats_z, np.sum(self.stats_energy, axis=1) * 1e6, color="r") 209 else: 210 ax2.plot(self.stats_z, self.stats_energy * 1e6, color="r") 211 ax2.set_ylabel(f"pulse energy ({Char.mu}J)", color="r") 212 ax2.tick_params(axis="y", colors="r") 213 ax2.yaxis.label.set_color("r") 214 if self.stats_peakpower is not None: 215 if len(self.stats_peakpower.shape) == 2: 216 axs[1, 0].plot(self.stats_z, self.stats_peakpower[:, 0]) 217 axs[1, 0].set_xlabel("z (m)") 218 axs[1, 0].set_ylabel("peak power (W)") 219 elif len(self.stats_peakpower.shape) == 1: 220 axs[1, 0].plot(self.stats_z, self.stats_peakpower) 221 axs[1, 0].set_xlabel("z (m)") 222 axs[1, 0].set_ylabel("peak power (W)") 223 axs[1, 1].plot(self.stats_z, self.stats_peak_ionization_rate) 224 axs[1, 1].set_xlabel("z (m)") 225 axs[1, 1].set_ylabel("peak ionization rate") 226 plt.tight_layout() 227 228 return fig
Plots the 'stats_' attribute of the simulation stored in the present object.
27def yaml_io(cls): 28 """ 29 Adds functions to save and load the dataclass as yaml 30 """ 31 32 def from_dict(cls, data: dict): 33 """ 34 Takes a dict and makes an instance of the class 35 36 Args: 37 data (dict): the result of a call of .to_dict on the class 38 """ 39 40 def handle_complex_array(serialized_array) -> np.ndarray: 41 """Helper function to deserialize numpy arrays, handling complex types""" 42 if isinstance(serialized_array, list) and all( 43 isinstance(item, dict) and "re" in item and "im" in item 44 for item in serialized_array 45 ): 46 return np.array( 47 [complex(item["re"], item["im"]) for item in serialized_array], 48 dtype=np.complex128, 49 ) 50 return np.array(serialized_array) 51 52 loaded_data = {} 53 for field_name, field_type in cls.__annotations__.items(): 54 if field_type is np.ndarray: 55 loaded_data[field_name] = handle_complex_array(data[field_name]) 56 elif is_dataclass(field_type): 57 loaded_data[field_name] = field_type.from_dict(data[field_name]) 58 else: 59 loaded_data[field_name] = data[field_name] 60 return cls(**loaded_data) 61 62 def load_yaml(cls, filename: str): 63 """ 64 load from a yaml file 65 66 Args: 67 filename (str): path to the file 68 """ 69 with open(filename, "r") as file: 70 data = yaml.load(file, yaml.SafeLoader) 71 return cls.from_dict(data) 72 73 def save_yaml(instance, filename: str): 74 """ 75 save to a yaml file 76 77 Args: 78 filename (str): path to the file 79 """ 80 data_dict = instance.to_dict() 81 with open(filename, "w") as file: 82 yaml.dump(data_dict, file) 83 84 def to_dict(instance): 85 """ 86 serialize the class into a dict 87 """ 88 data_dict = {} 89 for field_name, field_type in instance.__annotations__.items(): 90 field_value = getattr(instance, field_name) 91 if field_type is np.ndarray: 92 if field_value.dtype == np.complex128: 93 data_dict[field_name] = [ 94 {"re": num.real, "im": num.imag} for num in field_value.tolist() 95 ] 96 else: 97 data_dict[field_name] = field_value.tolist() 98 elif is_dataclass(field_type): 99 data_dict[field_name] = field_value.to_dict() 100 elif field_type is np.float64 or field_type is float: 101 data_dict[field_name] = float(field_value) 102 else: 103 data_dict[field_name] = field_value 104 return data_dict 105 106 cls.from_dict = classmethod(from_dict) 107 cls.load_yaml = classmethod(load_yaml) 108 cls.to_dict = to_dict 109 cls.save_yaml = save_yaml 110 return cls
Adds functions to save and load the dataclass as yaml
283@yaml_io 284@dataclass(frozen=True, slots=True) 285class Waveform: 286 """ 287 Contains data describing an electric field waveform. 288 In SI units. 289 290 Attributes: 291 wave (np.ndarray): the waveform data 292 time (np.ndarray): time vector corresponding to the wave array 293 dt (float): the time step of time, if it is uniformly spaced 294 is_uniformly_spaced (bool): says whether or not the time steps are uniform 295 """ 296 297 wave: np.ndarray 298 time: np.ndarray 299 dt: float 300 is_uniformly_spaced: bool = False 301 302 def lock(self): 303 """ 304 Make the data immutable 305 """ 306 self.wave.setflags(write=False) 307 self.time.setflags(write=False) 308 309 def copy(self): 310 """ 311 Returns a deep copy 312 """ 313 return copy.deepcopy(self) 314 315 def time_fs(self): 316 if self.time is not None: 317 return 1e15 * self.time 318 else: 319 raise Exception("No data") 320 321 def to_uniformly_spaced(self): 322 """ 323 Returns a version of the struct with uniformly spaced time 324 """ 325 if self.is_uniformly_spaced: 326 return self 327 else: 328 timesteps = np.abs(np.diff(self.time)) 329 new_dt = np.min(timesteps[timesteps > 0.0]) 330 new_time_length = self.time[-1] - self.time[0] 331 new_time = self.time[0] + new_dt * np.array( 332 range(int(new_time_length / new_dt)) 333 ) 334 return Waveform( 335 wave=interpolate(new_time, self.time, self.wave), 336 time=new_time, 337 dt=new_dt, 338 is_uniformly_spaced=True, 339 ) 340 341 def to_windowed(self, window_desc): 342 """ 343 Create a windowed version of the waveform. Output will be uniformly spaced in time, even if current state isn't. 344 345 Args: 346 window_desc: String or tuple describing the desired window in the same format as scipy.signal.windows.get_window. 347 348 Examples: 349 >>> waveform_with_tukey_window = waveform.to_windowed('tukey') 350 >>> waveform_with_supergaussian_window = waveform.to_windowed(('general_gaussian', 2, 500)) 351 """ 352 uniform_self = self.to_uniformly_spaced() 353 new_wave = uniform_self.wave * sig.windows.get_window( 354 window_desc, Nx=uniform_self.wave.shape[0] 355 ) 356 return Waveform( 357 wave=new_wave, 358 time=uniform_self.time, 359 dt=uniform_self.dt, 360 is_uniformly_spaced=True, 361 ) 362 363 def to_bandpassed(self, frequency: float, sigma: float, order: int = 4): 364 r""" 365 Apply a bandpass filter, with the same spec as to_bandpassed method of ComplexSpectrum: 366 $e^{\frac{(f-f_0)^r}{2\sigma^r}}$ 367 where $f_0$ is the frequency argument, $\sigma$ is the sigma argument, and r is the order argument 368 369 Args: 370 frequency: the central frequency (Hz) of the bandpass 371 sigma: the width of the bandpass (Hz) 372 order: the order of the supergaussian 373 """ 374 return ( 375 self.to_complex_spectrum() 376 .to_bandpassed(frequency, sigma, order) 377 .to_waveform() 378 ) 379 380 def to_complex_spectrum(self, padding_factor: int = 1): 381 """ 382 Converts to a ComplexSpectrum class 383 384 Args: 385 padding_factor (int): factor by which to expand the temporal length in the FFT, giving a smoother spectrum 386 """ 387 if self.is_uniformly_spaced: 388 new_spectrum = np.fft.rfft( 389 self.wave, n=self.wave.shape[0] * padding_factor, axis=0 390 ) 391 new_freq = np.fft.rfftfreq(self.wave.shape[0] * padding_factor, d=self.dt) 392 else: 393 uniform_self = self.to_uniformly_spaced() 394 new_spectrum = np.fft.rfft( 395 uniform_self.wave, n=uniform_self.wave.shape[0] * padding_factor, axis=0 396 ) 397 new_freq = np.fft.rfftfreq( 398 uniform_self.wave.shape[0] * padding_factor, d=uniform_self.dt 399 ) 400 401 return ComplexSpectrum(spectrum=new_spectrum, freq=new_freq) 402 403 def to_intensity_spectrum( 404 self, wavelength_scaled: bool = True, padding_factor: int = 1 405 ): 406 """ 407 Converts to an intensity spectrum 408 409 Args: 410 wavelength_scaled (bool): Correct the spectral intensities for plotting on a wavelength scale 411 """ 412 return self.to_complex_spectrum(padding_factor).to_intensity_spectrum( 413 wavelength_scaled 414 ) 415 416 def to_time_derivative(self): 417 """ 418 Return the time-derivative of the waveform 419 """ 420 return self.to_complex_spectrum().to_time_derivative().to_waveform() 421 422 def to_normalized(self): 423 """ 424 Return a normalized version of the waveform 425 """ 426 max_loc, max_val = find_maximum_location( 427 np.abs(np.array(sig.hilbert(self.wave))) 428 ) 429 return Waveform( 430 wave=self.wave / max_val, 431 time=np.array(self.time), 432 dt=self.dt, 433 is_uniformly_spaced=self.is_uniformly_spaced, 434 ) 435 436 def to_complex_envelope(self, f0: float = 0.0): 437 """ 438 Return a ComplexEnvelope class corresponding to the waveform 439 440 Args: 441 f0 (float): central frequency to use when constructing the envelope. E.g. oscillation at this frequency will be cancelled. 442 """ 443 uniform_self = self.to_uniformly_spaced() 444 analytic = np.array( 445 sig.hilbert(uniform_self.wave) 446 * np.exp(-1j * 2 * np.pi * f0 * uniform_self.time) 447 ) 448 return ComplexEnvelope( 449 envelope=analytic, 450 time=uniform_self.time, 451 dt=uniform_self.dt, 452 carrier_frequency=f0, 453 ) 454 455 def get_envelope_fwhm(self) -> float: 456 """ 457 Get the full-width-at-half-maximum of the intensity envelope. 458 459 Returns: 460 float: the FWHM 461 """ 462 return self.to_complex_envelope().get_fwhm() 463 464 def get_field_squared_fwhm(self): 465 """ 466 Get the full-width-at-half-maximum of the square of the field 467 468 Returns: 469 float: the FWHM 470 """ 471 uniform_self = self.to_uniformly_spaced() 472 return fwhm(uniform_self.wave**2, uniform_self.dt)
Contains data describing an electric field waveform. In SI units.
Attributes:
- wave (np.ndarray): the waveform data
- time (np.ndarray): time vector corresponding to the wave array
- dt (float): the time step of time, if it is uniformly spaced
- is_uniformly_spaced (bool): says whether or not the time steps are uniform
302 def lock(self): 303 """ 304 Make the data immutable 305 """ 306 self.wave.setflags(write=False) 307 self.time.setflags(write=False)
Make the data immutable
321 def to_uniformly_spaced(self): 322 """ 323 Returns a version of the struct with uniformly spaced time 324 """ 325 if self.is_uniformly_spaced: 326 return self 327 else: 328 timesteps = np.abs(np.diff(self.time)) 329 new_dt = np.min(timesteps[timesteps > 0.0]) 330 new_time_length = self.time[-1] - self.time[0] 331 new_time = self.time[0] + new_dt * np.array( 332 range(int(new_time_length / new_dt)) 333 ) 334 return Waveform( 335 wave=interpolate(new_time, self.time, self.wave), 336 time=new_time, 337 dt=new_dt, 338 is_uniformly_spaced=True, 339 )
Returns a version of the struct with uniformly spaced time
341 def to_windowed(self, window_desc): 342 """ 343 Create a windowed version of the waveform. Output will be uniformly spaced in time, even if current state isn't. 344 345 Args: 346 window_desc: String or tuple describing the desired window in the same format as scipy.signal.windows.get_window. 347 348 Examples: 349 >>> waveform_with_tukey_window = waveform.to_windowed('tukey') 350 >>> waveform_with_supergaussian_window = waveform.to_windowed(('general_gaussian', 2, 500)) 351 """ 352 uniform_self = self.to_uniformly_spaced() 353 new_wave = uniform_self.wave * sig.windows.get_window( 354 window_desc, Nx=uniform_self.wave.shape[0] 355 ) 356 return Waveform( 357 wave=new_wave, 358 time=uniform_self.time, 359 dt=uniform_self.dt, 360 is_uniformly_spaced=True, 361 )
Create a windowed version of the waveform. Output will be uniformly spaced in time, even if current state isn't.
Arguments:
- window_desc: String or tuple describing the desired window in the same format as scipy.signal.windows.get_window.
Examples:
>>> waveform_with_tukey_window = waveform.to_windowed('tukey') >>> waveform_with_supergaussian_window = waveform.to_windowed(('general_gaussian', 2, 500))
363 def to_bandpassed(self, frequency: float, sigma: float, order: int = 4): 364 r""" 365 Apply a bandpass filter, with the same spec as to_bandpassed method of ComplexSpectrum: 366 $e^{\frac{(f-f_0)^r}{2\sigma^r}}$ 367 where $f_0$ is the frequency argument, $\sigma$ is the sigma argument, and r is the order argument 368 369 Args: 370 frequency: the central frequency (Hz) of the bandpass 371 sigma: the width of the bandpass (Hz) 372 order: the order of the supergaussian 373 """ 374 return ( 375 self.to_complex_spectrum() 376 .to_bandpassed(frequency, sigma, order) 377 .to_waveform() 378 )
Apply a bandpass filter, with the same spec as to_bandpassed method of ComplexSpectrum: $e^{\frac{(f-f_0)^r}{2\sigma^r}}$ where $f_0$ is the frequency argument, $\sigma$ is the sigma argument, and r is the order argument
Arguments:
- frequency: the central frequency (Hz) of the bandpass
- sigma: the width of the bandpass (Hz)
- order: the order of the supergaussian
380 def to_complex_spectrum(self, padding_factor: int = 1): 381 """ 382 Converts to a ComplexSpectrum class 383 384 Args: 385 padding_factor (int): factor by which to expand the temporal length in the FFT, giving a smoother spectrum 386 """ 387 if self.is_uniformly_spaced: 388 new_spectrum = np.fft.rfft( 389 self.wave, n=self.wave.shape[0] * padding_factor, axis=0 390 ) 391 new_freq = np.fft.rfftfreq(self.wave.shape[0] * padding_factor, d=self.dt) 392 else: 393 uniform_self = self.to_uniformly_spaced() 394 new_spectrum = np.fft.rfft( 395 uniform_self.wave, n=uniform_self.wave.shape[0] * padding_factor, axis=0 396 ) 397 new_freq = np.fft.rfftfreq( 398 uniform_self.wave.shape[0] * padding_factor, d=uniform_self.dt 399 ) 400 401 return ComplexSpectrum(spectrum=new_spectrum, freq=new_freq)
Converts to a ComplexSpectrum class
Arguments:
- padding_factor (int): factor by which to expand the temporal length in the FFT, giving a smoother spectrum
403 def to_intensity_spectrum( 404 self, wavelength_scaled: bool = True, padding_factor: int = 1 405 ): 406 """ 407 Converts to an intensity spectrum 408 409 Args: 410 wavelength_scaled (bool): Correct the spectral intensities for plotting on a wavelength scale 411 """ 412 return self.to_complex_spectrum(padding_factor).to_intensity_spectrum( 413 wavelength_scaled 414 )
Converts to an intensity spectrum
Arguments:
- wavelength_scaled (bool): Correct the spectral intensities for plotting on a wavelength scale
416 def to_time_derivative(self): 417 """ 418 Return the time-derivative of the waveform 419 """ 420 return self.to_complex_spectrum().to_time_derivative().to_waveform()
Return the time-derivative of the waveform
422 def to_normalized(self): 423 """ 424 Return a normalized version of the waveform 425 """ 426 max_loc, max_val = find_maximum_location( 427 np.abs(np.array(sig.hilbert(self.wave))) 428 ) 429 return Waveform( 430 wave=self.wave / max_val, 431 time=np.array(self.time), 432 dt=self.dt, 433 is_uniformly_spaced=self.is_uniformly_spaced, 434 )
Return a normalized version of the waveform
436 def to_complex_envelope(self, f0: float = 0.0): 437 """ 438 Return a ComplexEnvelope class corresponding to the waveform 439 440 Args: 441 f0 (float): central frequency to use when constructing the envelope. E.g. oscillation at this frequency will be cancelled. 442 """ 443 uniform_self = self.to_uniformly_spaced() 444 analytic = np.array( 445 sig.hilbert(uniform_self.wave) 446 * np.exp(-1j * 2 * np.pi * f0 * uniform_self.time) 447 ) 448 return ComplexEnvelope( 449 envelope=analytic, 450 time=uniform_self.time, 451 dt=uniform_self.dt, 452 carrier_frequency=f0, 453 )
Return a ComplexEnvelope class corresponding to the waveform
Arguments:
- f0 (float): central frequency to use when constructing the envelope. E.g. oscillation at this frequency will be cancelled.
455 def get_envelope_fwhm(self) -> float: 456 """ 457 Get the full-width-at-half-maximum of the intensity envelope. 458 459 Returns: 460 float: the FWHM 461 """ 462 return self.to_complex_envelope().get_fwhm()
Get the full-width-at-half-maximum of the intensity envelope.
Returns:
float: the FWHM
464 def get_field_squared_fwhm(self): 465 """ 466 Get the full-width-at-half-maximum of the square of the field 467 468 Returns: 469 float: the FWHM 470 """ 471 uniform_self = self.to_uniformly_spaced() 472 return fwhm(uniform_self.wave**2, uniform_self.dt)
Get the full-width-at-half-maximum of the square of the field
Returns:
float: the FWHM
32 def from_dict(cls, data: dict): 33 """ 34 Takes a dict and makes an instance of the class 35 36 Args: 37 data (dict): the result of a call of .to_dict on the class 38 """ 39 40 def handle_complex_array(serialized_array) -> np.ndarray: 41 """Helper function to deserialize numpy arrays, handling complex types""" 42 if isinstance(serialized_array, list) and all( 43 isinstance(item, dict) and "re" in item and "im" in item 44 for item in serialized_array 45 ): 46 return np.array( 47 [complex(item["re"], item["im"]) for item in serialized_array], 48 dtype=np.complex128, 49 ) 50 return np.array(serialized_array) 51 52 loaded_data = {} 53 for field_name, field_type in cls.__annotations__.items(): 54 if field_type is np.ndarray: 55 loaded_data[field_name] = handle_complex_array(data[field_name]) 56 elif is_dataclass(field_type): 57 loaded_data[field_name] = field_type.from_dict(data[field_name]) 58 else: 59 loaded_data[field_name] = data[field_name] 60 return cls(**loaded_data)
Takes a dict and makes an instance of the class
Arguments:
- data (dict): the result of a call of .to_dict on the class
62 def load_yaml(cls, filename: str): 63 """ 64 load from a yaml file 65 66 Args: 67 filename (str): path to the file 68 """ 69 with open(filename, "r") as file: 70 data = yaml.load(file, yaml.SafeLoader) 71 return cls.from_dict(data)
load from a yaml file
Arguments:
- filename (str): path to the file
84 def to_dict(instance): 85 """ 86 serialize the class into a dict 87 """ 88 data_dict = {} 89 for field_name, field_type in instance.__annotations__.items(): 90 field_value = getattr(instance, field_name) 91 if field_type is np.ndarray: 92 if field_value.dtype == np.complex128: 93 data_dict[field_name] = [ 94 {"re": num.real, "im": num.imag} for num in field_value.tolist() 95 ] 96 else: 97 data_dict[field_name] = field_value.tolist() 98 elif is_dataclass(field_type): 99 data_dict[field_name] = field_value.to_dict() 100 elif field_type is np.float64 or field_type is float: 101 data_dict[field_name] = float(field_value) 102 else: 103 data_dict[field_name] = field_value 104 return data_dict
serialize the class into a dict
73 def save_yaml(instance, filename: str): 74 """ 75 save to a yaml file 76 77 Args: 78 filename (str): path to the file 79 """ 80 data_dict = instance.to_dict() 81 with open(filename, "w") as file: 82 yaml.dump(data_dict, file)
save to a yaml file
Arguments:
- filename (str): path to the file
475@yaml_io 476@dataclass(frozen=True, slots=True) 477class ComplexSpectrum: 478 """ 479 Contains a complex spectrum, with spectral weights on a frequency scale 480 481 Attributes: 482 spectrum (np.ndarray): the spectrum in complex128 format 483 freq (np.ndarray): 484 """ 485 486 spectrum: np.ndarray 487 freq: np.ndarray 488 489 def lock(self): 490 """ 491 Make the data immutable 492 """ 493 self.spectrum.setflags(write=False) 494 self.freq.setflags(write=False) 495 496 def copy(self): 497 return copy.deepcopy(self) 498 499 def to_time_derivative(self): 500 r"""Return a ComplexSpectrum corresponding to the time derivative (multiply by $i\omega$)""" 501 d_dt = 1j * 2 * np.pi * self.freq * self.spectrum 502 return ComplexSpectrum(spectrum=d_dt, freq=np.array(self.freq)) 503 504 def to_bandpassed(self, frequency: float, sigma: float, order: int = 4): 505 r""" 506 Return the complex spectrum after applying a supergaussian bandpass filter to the spectrum, of the form 507 $e^{\frac{(f-f_0)^r}{2\sigma^r}}$ 508 where $f_0$ is the frequency argument, $\sigma$ is the sigma argument, and r is the order argument 509 510 Args: 511 frequency: the central frequency (Hz) of the bandpass 512 sigma: the width of the bandpass (Hz) 513 order: the order of the supergaussian 514 """ 515 new_spectrum = self.spectrum * np.exp( 516 -((self.freq - frequency) ** order) / (2 * sigma**order) 517 ) 518 return ComplexSpectrum(spectrum=new_spectrum, freq=np.array(self.freq)) 519 520 def to_waveform(self): 521 """ 522 Create a Waveform based on this complex spectrum. 523 """ 524 wave = np.fft.irfft(self.spectrum, axis=0) 525 dt = 0.5 / (self.freq[-1] - self.freq[0]) 526 time = dt * np.array(range(wave.shape[0])) 527 return Waveform(wave=wave, time=time, dt=dt, is_uniformly_spaced=True) 528 529 def to_centered_waveform(self): 530 """ 531 Create a Waveform based on this complex spectrum and center it in the time window 532 """ 533 wave = np.fft.irfft(self.spectrum, axis=0) 534 dt = 0.5 / (self.freq[-1] - self.freq[0]) 535 time = dt * np.array(range(wave.shape[0])) 536 max_ind, max_val = find_maximum_location(np.abs(np.array(sig.hilbert(wave)))) 537 max_loc = dt * max_ind - 0.5 * time[-1] 538 wave = np.fft.irfft( 539 np.exp(-1j * self.freq * 2 * np.pi * max_loc) * self.spectrum, axis=0 540 ) 541 return Waveform(wave=wave, time=time, dt=dt, is_uniformly_spaced=True) 542 543 def to_intensity_spectrum(self, wavelength_scaled: bool = True): 544 """Create an IntensitySpectrum based on the current ComplexSpectrum 545 546 Args: 547 wavelength_scaled (bool): Apply the wavelength^-2 Jakobian such to correspond to W/nm spectrum""" 548 new_spectrum = np.array(np.abs(self.spectrum[self.freq > 0.0]) ** 2) 549 new_freq = np.array(self.freq[self.freq > 0.0]) 550 new_wavelength = constants.speed_of_light / new_freq 551 if wavelength_scaled: 552 new_spectrum /= new_wavelength**2 553 return IntensitySpectrum( 554 spectrum=new_spectrum, 555 phase=np.array(np.angle(self.spectrum[self.freq > 0.0])), 556 freq=new_freq, 557 wavelength=new_wavelength, 558 is_frequency_scaled=wavelength_scaled, 559 )
Contains a complex spectrum, with spectral weights on a frequency scale
Attributes:
- spectrum (np.ndarray): the spectrum in complex128 format
- freq (np.ndarray):
489 def lock(self): 490 """ 491 Make the data immutable 492 """ 493 self.spectrum.setflags(write=False) 494 self.freq.setflags(write=False)
Make the data immutable
499 def to_time_derivative(self): 500 r"""Return a ComplexSpectrum corresponding to the time derivative (multiply by $i\omega$)""" 501 d_dt = 1j * 2 * np.pi * self.freq * self.spectrum 502 return ComplexSpectrum(spectrum=d_dt, freq=np.array(self.freq))
Return a ComplexSpectrum corresponding to the time derivative (multiply by $i\omega$)
504 def to_bandpassed(self, frequency: float, sigma: float, order: int = 4): 505 r""" 506 Return the complex spectrum after applying a supergaussian bandpass filter to the spectrum, of the form 507 $e^{\frac{(f-f_0)^r}{2\sigma^r}}$ 508 where $f_0$ is the frequency argument, $\sigma$ is the sigma argument, and r is the order argument 509 510 Args: 511 frequency: the central frequency (Hz) of the bandpass 512 sigma: the width of the bandpass (Hz) 513 order: the order of the supergaussian 514 """ 515 new_spectrum = self.spectrum * np.exp( 516 -((self.freq - frequency) ** order) / (2 * sigma**order) 517 ) 518 return ComplexSpectrum(spectrum=new_spectrum, freq=np.array(self.freq))
Return the complex spectrum after applying a supergaussian bandpass filter to the spectrum, of the form $e^{\frac{(f-f_0)^r}{2\sigma^r}}$ where $f_0$ is the frequency argument, $\sigma$ is the sigma argument, and r is the order argument
Arguments:
- frequency: the central frequency (Hz) of the bandpass
- sigma: the width of the bandpass (Hz)
- order: the order of the supergaussian
520 def to_waveform(self): 521 """ 522 Create a Waveform based on this complex spectrum. 523 """ 524 wave = np.fft.irfft(self.spectrum, axis=0) 525 dt = 0.5 / (self.freq[-1] - self.freq[0]) 526 time = dt * np.array(range(wave.shape[0])) 527 return Waveform(wave=wave, time=time, dt=dt, is_uniformly_spaced=True)
Create a Waveform based on this complex spectrum.
529 def to_centered_waveform(self): 530 """ 531 Create a Waveform based on this complex spectrum and center it in the time window 532 """ 533 wave = np.fft.irfft(self.spectrum, axis=0) 534 dt = 0.5 / (self.freq[-1] - self.freq[0]) 535 time = dt * np.array(range(wave.shape[0])) 536 max_ind, max_val = find_maximum_location(np.abs(np.array(sig.hilbert(wave)))) 537 max_loc = dt * max_ind - 0.5 * time[-1] 538 wave = np.fft.irfft( 539 np.exp(-1j * self.freq * 2 * np.pi * max_loc) * self.spectrum, axis=0 540 ) 541 return Waveform(wave=wave, time=time, dt=dt, is_uniformly_spaced=True)
Create a Waveform based on this complex spectrum and center it in the time window
543 def to_intensity_spectrum(self, wavelength_scaled: bool = True): 544 """Create an IntensitySpectrum based on the current ComplexSpectrum 545 546 Args: 547 wavelength_scaled (bool): Apply the wavelength^-2 Jakobian such to correspond to W/nm spectrum""" 548 new_spectrum = np.array(np.abs(self.spectrum[self.freq > 0.0]) ** 2) 549 new_freq = np.array(self.freq[self.freq > 0.0]) 550 new_wavelength = constants.speed_of_light / new_freq 551 if wavelength_scaled: 552 new_spectrum /= new_wavelength**2 553 return IntensitySpectrum( 554 spectrum=new_spectrum, 555 phase=np.array(np.angle(self.spectrum[self.freq > 0.0])), 556 freq=new_freq, 557 wavelength=new_wavelength, 558 is_frequency_scaled=wavelength_scaled, 559 )
Create an IntensitySpectrum based on the current ComplexSpectrum
Arguments:
- wavelength_scaled (bool): Apply the wavelength^-2 Jakobian such to correspond to W/nm spectrum
32 def from_dict(cls, data: dict): 33 """ 34 Takes a dict and makes an instance of the class 35 36 Args: 37 data (dict): the result of a call of .to_dict on the class 38 """ 39 40 def handle_complex_array(serialized_array) -> np.ndarray: 41 """Helper function to deserialize numpy arrays, handling complex types""" 42 if isinstance(serialized_array, list) and all( 43 isinstance(item, dict) and "re" in item and "im" in item 44 for item in serialized_array 45 ): 46 return np.array( 47 [complex(item["re"], item["im"]) for item in serialized_array], 48 dtype=np.complex128, 49 ) 50 return np.array(serialized_array) 51 52 loaded_data = {} 53 for field_name, field_type in cls.__annotations__.items(): 54 if field_type is np.ndarray: 55 loaded_data[field_name] = handle_complex_array(data[field_name]) 56 elif is_dataclass(field_type): 57 loaded_data[field_name] = field_type.from_dict(data[field_name]) 58 else: 59 loaded_data[field_name] = data[field_name] 60 return cls(**loaded_data)
Takes a dict and makes an instance of the class
Arguments:
- data (dict): the result of a call of .to_dict on the class
62 def load_yaml(cls, filename: str): 63 """ 64 load from a yaml file 65 66 Args: 67 filename (str): path to the file 68 """ 69 with open(filename, "r") as file: 70 data = yaml.load(file, yaml.SafeLoader) 71 return cls.from_dict(data)
load from a yaml file
Arguments:
- filename (str): path to the file
84 def to_dict(instance): 85 """ 86 serialize the class into a dict 87 """ 88 data_dict = {} 89 for field_name, field_type in instance.__annotations__.items(): 90 field_value = getattr(instance, field_name) 91 if field_type is np.ndarray: 92 if field_value.dtype == np.complex128: 93 data_dict[field_name] = [ 94 {"re": num.real, "im": num.imag} for num in field_value.tolist() 95 ] 96 else: 97 data_dict[field_name] = field_value.tolist() 98 elif is_dataclass(field_type): 99 data_dict[field_name] = field_value.to_dict() 100 elif field_type is np.float64 or field_type is float: 101 data_dict[field_name] = float(field_value) 102 else: 103 data_dict[field_name] = field_value 104 return data_dict
serialize the class into a dict
73 def save_yaml(instance, filename: str): 74 """ 75 save to a yaml file 76 77 Args: 78 filename (str): path to the file 79 """ 80 data_dict = instance.to_dict() 81 with open(filename, "w") as file: 82 yaml.dump(data_dict, file)
save to a yaml file
Arguments:
- filename (str): path to the file
562@yaml_io 563@dataclass(frozen=True, slots=True) 564class IntensitySpectrum: 565 """ 566 Contains an intensity spectrum - real valued. SI units 567 568 Attributes: 569 spectrum (Optional[np.ndarray]): the spectral intensity 570 phase (Optional[np.ndarray]): the spectral phase 571 freq (Optional[np.ndarray]): the frequencies corresponding to the spectrum 572 wavelength (Optional[np.ndarray]): the wavelengths 573 is_frequency_scaled (bool): has the lamda^-2 Jakobian been applied to Fourier transformed data? 574 """ 575 576 spectrum: np.ndarray 577 phase: Optional[np.ndarray] 578 freq: np.ndarray 579 wavelength: np.ndarray 580 is_frequency_scaled: bool = False 581 582 def lock(self): 583 """ 584 Make the data immutable 585 """ 586 self.spectrum.setflags(write=False) 587 self.freq.setflags(write=False) 588 self.wavelength.setflags(write=False) 589 if self.phase is not None: 590 self.phase.setflags(write=False) 591 592 def copy(self): 593 return copy.deepcopy(self) 594 595 def wavelength_nm(self): 596 """ 597 Returns: 598 Optional[np.ndarray]: the wavelengths in nm 599 """ 600 if self.wavelength is not None: 601 return 1e9 * self.wavelength 602 else: 603 return None 604 605 def wavelength_micron(self): 606 """ 607 Returns: 608 Optional[np.ndarray]: the wavelengths in microns 609 """ 610 if self.wavelength is not None: 611 return 1e6 * self.wavelength 612 else: 613 return None 614 615 def from_spectrometer_spectrum_nanometers( 616 self, wavelengths_nanometers: np.ndarray, spectrum: np.ndarray 617 ): 618 """ 619 Generate an instance based on a spepctrum array and wavelength array in nm (typical spectrometer data) 620 621 Args: 622 wavelengths_nanometers: the wavelengths in nanometers 623 spectrum: the spectral intensity 624 """ 625 wavelengths = 1e-9 * wavelengths_nanometers 626 freqs = constants.speed_of_light / wavelengths 627 phase = np.zeros(freqs.shape, dtype=float) 628 return IntensitySpectrum( 629 spectrum=spectrum, wavelength=wavelengths, freq=freqs, phase=phase 630 ) 631 632 def get_transform_limited_pulse(self, gate_level: Optional[float] = None): 633 """ 634 Returns the transform-limited pulse corresponding to the spectrum. 635 636 Args: 637 gate_level (float): Apply a gate such that only values above gate_level*max(spectrum) are included 638 639 Returns: 640 ComplexEnvelope: the transform-limited pulse 641 """ 642 if self.spectrum is not None: 643 spectrum = np.array(self.spectrum) 644 lam = None 645 if self.wavelength is None and self.freq is not None: 646 lam = constants.speed_of_light / self.freq[self.freq > 0.0] 647 spectrum = spectrum[self.freq > 0.0] 648 elif self.wavelength is not None: 649 lam = self.wavelength 650 if lam is not None: 651 if self.is_frequency_scaled: 652 spectrum /= lam**2 653 t, intensity = transform_limited_pulse_from_spectrometer( 654 1e9 * lam, spectrum, gate_level 655 ) 656 return ComplexEnvelope( 657 envelope=np.sqrt(intensity), time=t, dt=t[1] - t[0] 658 ) 659 raise Exception("Missing data") 660 661 def get_wavelength_spectrum(self): 662 """ 663 Return a wavelength-scaled spectrum, independent of the state of the current instance 664 665 Returns: 666 np.ndarray, np.ndarray: wavelength and spectrum 667 """ 668 if self.is_frequency_scaled: 669 if self.freq is not None and self.spectrum is not None: 670 return frequency_to_wavelength(self.freq, self.spectrum) 671 else: 672 raise Exception("Missing data") 673 else: 674 if self.wavelength is not None and self.spectrum is not None: 675 return self.wavelength, self.spectrum 676 else: 677 raise Exception("Missing data") 678 679 def get_frequency_spectrum(self): 680 """ 681 Return a frequency-scaled spectrum, independent of the state of the current instance 682 683 Returns: 684 np.ndarray, np.ndarray: wavelength and spectrum 685 """ 686 if self.is_frequency_scaled: 687 if self.freq is not None and self.spectrum is not None: 688 return self.freq, self.spectrum 689 else: 690 raise Exception("Missing data") 691 else: 692 if self.wavelength is not None and self.spectrum is not None: 693 return wavelength_to_frequency(1e9 * self.wavelength, self.spectrum) 694 else: 695 raise Exception("Missing data") 696 697 def to_normalized(self): 698 """ 699 Returns a normalized version of the current instance. 700 """ 701 702 normalized_spectrum = self.spectrum / np.max(self.spectrum) 703 704 return IntensitySpectrum( 705 spectrum=normalized_spectrum, 706 phase=np.array(self.phase), 707 freq=np.array(self.freq), 708 wavelength=np.array(self.wavelength), 709 is_frequency_scaled=self.is_frequency_scaled, 710 ) 711 712 def plot_with_group_delay( 713 self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None 714 ): 715 """ 716 Plot the spectrum and group delay curve. 717 718 Args: 719 ax: optionally plot onto a pre-existing matplotlib Axes 720 phase_blanking: only show phase information (group delay) above this level relative to max intensity 721 xlim: pass arguments to set_xlim() to constrain the x-axis 722 """ 723 if ax is None: 724 fig, ax = plt.subplots() 725 else: 726 fig = ax.get_figure() 727 728 start_index = np.argmax(self.spectrum > 0) 729 intensity = self.spectrum[start_index::] 730 freq = self.freq[start_index::] 731 wl = constants.speed_of_light / freq 732 if self.phase is not None: 733 phase = np.unwrap(self.phase[start_index::]) 734 else: 735 phase = np.zeros(intensity.shape, dtype=float) 736 737 intensity /= np.max(intensity) 738 intensity_line = ax.plot(1e9 * wl, intensity, label="Intensity") 739 ax.set_xlabel("Wavelength (nm)") 740 ax.set_ylabel("Intensity (Arb. unit)") 741 ax_phase = plt.twinx(ax) 742 group_delay = (1e15 / (2 * np.pi)) * derivative(phase, 1) / (freq[1] - freq[2]) 743 assert isinstance(ax_phase, Axes) 744 ax_phase.plot([], []) 745 phase_line = ax_phase.plot( 746 1e9 * wl[intensity > phase_blanking], 747 group_delay[intensity > phase_blanking], 748 "--", 749 label="Group delay", 750 ) 751 ax_phase.set_ylabel("Group delay (fs)") 752 if xlim is not None: 753 ax.set_xlim(xlim) 754 ax_phase.set_xlim(xlim) 755 lines = lines = intensity_line + phase_line 756 ax.legend(lines, [str(line.get_label()) for line in lines]) 757 return fig
Contains an intensity spectrum - real valued. SI units
Attributes:
- spectrum (Optional[np.ndarray]): the spectral intensity
- phase (Optional[np.ndarray]): the spectral phase
- freq (Optional[np.ndarray]): the frequencies corresponding to the spectrum
- wavelength (Optional[np.ndarray]): the wavelengths
- is_frequency_scaled (bool): has the lamda^-2 Jakobian been applied to Fourier transformed data?
582 def lock(self): 583 """ 584 Make the data immutable 585 """ 586 self.spectrum.setflags(write=False) 587 self.freq.setflags(write=False) 588 self.wavelength.setflags(write=False) 589 if self.phase is not None: 590 self.phase.setflags(write=False)
Make the data immutable
595 def wavelength_nm(self): 596 """ 597 Returns: 598 Optional[np.ndarray]: the wavelengths in nm 599 """ 600 if self.wavelength is not None: 601 return 1e9 * self.wavelength 602 else: 603 return None
Returns:
Optional[np.ndarray]: the wavelengths in nm
605 def wavelength_micron(self): 606 """ 607 Returns: 608 Optional[np.ndarray]: the wavelengths in microns 609 """ 610 if self.wavelength is not None: 611 return 1e6 * self.wavelength 612 else: 613 return None
Returns:
Optional[np.ndarray]: the wavelengths in microns
615 def from_spectrometer_spectrum_nanometers( 616 self, wavelengths_nanometers: np.ndarray, spectrum: np.ndarray 617 ): 618 """ 619 Generate an instance based on a spepctrum array and wavelength array in nm (typical spectrometer data) 620 621 Args: 622 wavelengths_nanometers: the wavelengths in nanometers 623 spectrum: the spectral intensity 624 """ 625 wavelengths = 1e-9 * wavelengths_nanometers 626 freqs = constants.speed_of_light / wavelengths 627 phase = np.zeros(freqs.shape, dtype=float) 628 return IntensitySpectrum( 629 spectrum=spectrum, wavelength=wavelengths, freq=freqs, phase=phase 630 )
Generate an instance based on a spepctrum array and wavelength array in nm (typical spectrometer data)
Arguments:
- wavelengths_nanometers: the wavelengths in nanometers
- spectrum: the spectral intensity
632 def get_transform_limited_pulse(self, gate_level: Optional[float] = None): 633 """ 634 Returns the transform-limited pulse corresponding to the spectrum. 635 636 Args: 637 gate_level (float): Apply a gate such that only values above gate_level*max(spectrum) are included 638 639 Returns: 640 ComplexEnvelope: the transform-limited pulse 641 """ 642 if self.spectrum is not None: 643 spectrum = np.array(self.spectrum) 644 lam = None 645 if self.wavelength is None and self.freq is not None: 646 lam = constants.speed_of_light / self.freq[self.freq > 0.0] 647 spectrum = spectrum[self.freq > 0.0] 648 elif self.wavelength is not None: 649 lam = self.wavelength 650 if lam is not None: 651 if self.is_frequency_scaled: 652 spectrum /= lam**2 653 t, intensity = transform_limited_pulse_from_spectrometer( 654 1e9 * lam, spectrum, gate_level 655 ) 656 return ComplexEnvelope( 657 envelope=np.sqrt(intensity), time=t, dt=t[1] - t[0] 658 ) 659 raise Exception("Missing data")
Returns the transform-limited pulse corresponding to the spectrum.
Arguments:
- gate_level (float): Apply a gate such that only values above gate_level*max(spectrum) are included
Returns:
ComplexEnvelope: the transform-limited pulse
661 def get_wavelength_spectrum(self): 662 """ 663 Return a wavelength-scaled spectrum, independent of the state of the current instance 664 665 Returns: 666 np.ndarray, np.ndarray: wavelength and spectrum 667 """ 668 if self.is_frequency_scaled: 669 if self.freq is not None and self.spectrum is not None: 670 return frequency_to_wavelength(self.freq, self.spectrum) 671 else: 672 raise Exception("Missing data") 673 else: 674 if self.wavelength is not None and self.spectrum is not None: 675 return self.wavelength, self.spectrum 676 else: 677 raise Exception("Missing data")
Return a wavelength-scaled spectrum, independent of the state of the current instance
Returns:
np.ndarray, np.ndarray: wavelength and spectrum
679 def get_frequency_spectrum(self): 680 """ 681 Return a frequency-scaled spectrum, independent of the state of the current instance 682 683 Returns: 684 np.ndarray, np.ndarray: wavelength and spectrum 685 """ 686 if self.is_frequency_scaled: 687 if self.freq is not None and self.spectrum is not None: 688 return self.freq, self.spectrum 689 else: 690 raise Exception("Missing data") 691 else: 692 if self.wavelength is not None and self.spectrum is not None: 693 return wavelength_to_frequency(1e9 * self.wavelength, self.spectrum) 694 else: 695 raise Exception("Missing data")
Return a frequency-scaled spectrum, independent of the state of the current instance
Returns:
np.ndarray, np.ndarray: wavelength and spectrum
697 def to_normalized(self): 698 """ 699 Returns a normalized version of the current instance. 700 """ 701 702 normalized_spectrum = self.spectrum / np.max(self.spectrum) 703 704 return IntensitySpectrum( 705 spectrum=normalized_spectrum, 706 phase=np.array(self.phase), 707 freq=np.array(self.freq), 708 wavelength=np.array(self.wavelength), 709 is_frequency_scaled=self.is_frequency_scaled, 710 )
Returns a normalized version of the current instance.
712 def plot_with_group_delay( 713 self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None 714 ): 715 """ 716 Plot the spectrum and group delay curve. 717 718 Args: 719 ax: optionally plot onto a pre-existing matplotlib Axes 720 phase_blanking: only show phase information (group delay) above this level relative to max intensity 721 xlim: pass arguments to set_xlim() to constrain the x-axis 722 """ 723 if ax is None: 724 fig, ax = plt.subplots() 725 else: 726 fig = ax.get_figure() 727 728 start_index = np.argmax(self.spectrum > 0) 729 intensity = self.spectrum[start_index::] 730 freq = self.freq[start_index::] 731 wl = constants.speed_of_light / freq 732 if self.phase is not None: 733 phase = np.unwrap(self.phase[start_index::]) 734 else: 735 phase = np.zeros(intensity.shape, dtype=float) 736 737 intensity /= np.max(intensity) 738 intensity_line = ax.plot(1e9 * wl, intensity, label="Intensity") 739 ax.set_xlabel("Wavelength (nm)") 740 ax.set_ylabel("Intensity (Arb. unit)") 741 ax_phase = plt.twinx(ax) 742 group_delay = (1e15 / (2 * np.pi)) * derivative(phase, 1) / (freq[1] - freq[2]) 743 assert isinstance(ax_phase, Axes) 744 ax_phase.plot([], []) 745 phase_line = ax_phase.plot( 746 1e9 * wl[intensity > phase_blanking], 747 group_delay[intensity > phase_blanking], 748 "--", 749 label="Group delay", 750 ) 751 ax_phase.set_ylabel("Group delay (fs)") 752 if xlim is not None: 753 ax.set_xlim(xlim) 754 ax_phase.set_xlim(xlim) 755 lines = lines = intensity_line + phase_line 756 ax.legend(lines, [str(line.get_label()) for line in lines]) 757 return fig
Plot the spectrum and group delay curve.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
- phase_blanking: only show phase information (group delay) above this level relative to max intensity
- xlim: pass arguments to set_xlim() to constrain the x-axis
32 def from_dict(cls, data: dict): 33 """ 34 Takes a dict and makes an instance of the class 35 36 Args: 37 data (dict): the result of a call of .to_dict on the class 38 """ 39 40 def handle_complex_array(serialized_array) -> np.ndarray: 41 """Helper function to deserialize numpy arrays, handling complex types""" 42 if isinstance(serialized_array, list) and all( 43 isinstance(item, dict) and "re" in item and "im" in item 44 for item in serialized_array 45 ): 46 return np.array( 47 [complex(item["re"], item["im"]) for item in serialized_array], 48 dtype=np.complex128, 49 ) 50 return np.array(serialized_array) 51 52 loaded_data = {} 53 for field_name, field_type in cls.__annotations__.items(): 54 if field_type is np.ndarray: 55 loaded_data[field_name] = handle_complex_array(data[field_name]) 56 elif is_dataclass(field_type): 57 loaded_data[field_name] = field_type.from_dict(data[field_name]) 58 else: 59 loaded_data[field_name] = data[field_name] 60 return cls(**loaded_data)
Takes a dict and makes an instance of the class
Arguments:
- data (dict): the result of a call of .to_dict on the class
62 def load_yaml(cls, filename: str): 63 """ 64 load from a yaml file 65 66 Args: 67 filename (str): path to the file 68 """ 69 with open(filename, "r") as file: 70 data = yaml.load(file, yaml.SafeLoader) 71 return cls.from_dict(data)
load from a yaml file
Arguments:
- filename (str): path to the file
84 def to_dict(instance): 85 """ 86 serialize the class into a dict 87 """ 88 data_dict = {} 89 for field_name, field_type in instance.__annotations__.items(): 90 field_value = getattr(instance, field_name) 91 if field_type is np.ndarray: 92 if field_value.dtype == np.complex128: 93 data_dict[field_name] = [ 94 {"re": num.real, "im": num.imag} for num in field_value.tolist() 95 ] 96 else: 97 data_dict[field_name] = field_value.tolist() 98 elif is_dataclass(field_type): 99 data_dict[field_name] = field_value.to_dict() 100 elif field_type is np.float64 or field_type is float: 101 data_dict[field_name] = float(field_value) 102 else: 103 data_dict[field_name] = field_value 104 return data_dict
serialize the class into a dict
73 def save_yaml(instance, filename: str): 74 """ 75 save to a yaml file 76 77 Args: 78 filename (str): path to the file 79 """ 80 data_dict = instance.to_dict() 81 with open(filename, "w") as file: 82 yaml.dump(data_dict, file)
save to a yaml file
Arguments:
- filename (str): path to the file
760@yaml_io 761@dataclass(frozen=True, slots=True) 762class ComplexEnvelope: 763 """ 764 Data corresponding to a complex envelope of a pulse, e.g. from a FROG measurement. 765 766 Attributes: 767 envelope (np.ndarray): the complex envelope 768 time: (np.ndarray): the time array 769 dt (float): the time step 770 carrier_frequency (float): the carrier frequency of the envelope 771 """ 772 773 envelope: np.ndarray 774 time: np.ndarray 775 dt: float 776 carrier_frequency: float = 0.0 777 778 def lock(self): 779 """ 780 Make the data immutable 781 """ 782 self.envelope.setflags(write=False) 783 self.time.setflags(write=False) 784 785 def time_fs(self): 786 return 1e15 * self.time 787 788 def copy(self): 789 return copy.deepcopy(self) 790 791 def get_fwhm(self) -> float: 792 """ 793 Full-width-at-half-maximum value of the envelope 794 Returns: 795 float: the fwhm 796 """ 797 return fwhm(np.abs(self.envelope) ** 2, self.dt) 798 799 def to_complex_spectrum(self, padding_factor: int = 1) -> ComplexSpectrum: 800 """ 801 Returns a ComplexSpectrum based on the data 802 """ 803 804 return ComplexSpectrum( 805 spectrum=np.fft.rfft( 806 self.envelope, self.envelope.shape[0] * padding_factor 807 ), 808 freq=np.fft.rfftfreq(self.envelope.shape[0] * padding_factor, self.dt) 809 + self.carrier_frequency, 810 ) 811 812 def to_waveform( 813 self, interpolation_factor: int = 1, CEP_shift: float = 0.0 814 ) -> Waveform: 815 """ 816 Returns a Waveform based on the data 817 """ 818 output_dt = self.dt / interpolation_factor 819 output_time = self.time[0] + output_dt * np.array( 820 range(self.time.shape[0] * interpolation_factor) 821 ) 822 if interpolation_factor != 1: 823 output_envelope = sig.resample( 824 np.real(self.envelope), output_time.shape[0] 825 ) + 1j * np.array( 826 sig.resample(np.imag(self.envelope), output_time.shape[0]) 827 ) 828 else: 829 output_envelope = self.envelope 830 return Waveform( 831 wave=np.real( 832 np.exp( 833 1j * self.carrier_frequency * 2 * np.pi * output_time 834 - 1j * CEP_shift 835 ) 836 * output_envelope 837 ), 838 time=output_time, 839 dt=output_dt, 840 is_uniformly_spaced=True, 841 ) 842 843 def plot(self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None): 844 """ 845 Plot the pulse. 846 847 Args: 848 ax: optionally plot onto a pre-existing matplotlib Axes 849 phase_blanking: only show phase information (instantaneous frequency) above this level relative to max intensity 850 xlim: pass arguments to set_xlim() to constrain the x-axis 851 """ 852 if ax is None: 853 fig, ax = plt.subplots() 854 else: 855 fig = ax.get_figure() 856 time_ax = self.time_fs() - np.mean(self.time_fs()) 857 intensity = np.abs(self.envelope) ** 2 858 intensity /= np.max(intensity) 859 intensity_line = ax.plot( 860 time_ax, 861 intensity, 862 label=f"Intensity, fwhm {1e15 * self.get_fwhm():0.1f} fs", 863 ) 864 ax.set_xlabel("Time (fs)") 865 ax.set_ylabel("Intensity (Arb. unit)") 866 inst_freq = ( 867 (1e-12 / (2 * np.pi)) 868 * derivative(np.unwrap(np.angle(self.envelope)), 1) 869 / self.dt 870 ) 871 ax_phase = plt.twinx(ax) 872 assert isinstance(ax_phase, Axes) 873 ax_phase.plot([], []) 874 phase_line = ax_phase.plot( 875 time_ax[intensity > phase_blanking], 876 inst_freq[intensity > phase_blanking], 877 "--", 878 label="Inst. frequency", 879 ) 880 ax_phase.set_ylabel("Inst. frequency (THz)") 881 if xlim is not None: 882 ax.set_xlim(xlim) 883 ax_phase.set_xlim(xlim) 884 lines = lines = intensity_line + phase_line 885 ax.legend(lines, [str(line.get_label()) for line in lines]) 886 return fig
Data corresponding to a complex envelope of a pulse, e.g. from a FROG measurement.
Attributes:
- envelope (np.ndarray): the complex envelope
- time: (np.ndarray): the time array
- dt (float): the time step
- carrier_frequency (float): the carrier frequency of the envelope
778 def lock(self): 779 """ 780 Make the data immutable 781 """ 782 self.envelope.setflags(write=False) 783 self.time.setflags(write=False)
Make the data immutable
791 def get_fwhm(self) -> float: 792 """ 793 Full-width-at-half-maximum value of the envelope 794 Returns: 795 float: the fwhm 796 """ 797 return fwhm(np.abs(self.envelope) ** 2, self.dt)
Full-width-at-half-maximum value of the envelope
Returns:
float: the fwhm
799 def to_complex_spectrum(self, padding_factor: int = 1) -> ComplexSpectrum: 800 """ 801 Returns a ComplexSpectrum based on the data 802 """ 803 804 return ComplexSpectrum( 805 spectrum=np.fft.rfft( 806 self.envelope, self.envelope.shape[0] * padding_factor 807 ), 808 freq=np.fft.rfftfreq(self.envelope.shape[0] * padding_factor, self.dt) 809 + self.carrier_frequency, 810 )
Returns a ComplexSpectrum based on the data
812 def to_waveform( 813 self, interpolation_factor: int = 1, CEP_shift: float = 0.0 814 ) -> Waveform: 815 """ 816 Returns a Waveform based on the data 817 """ 818 output_dt = self.dt / interpolation_factor 819 output_time = self.time[0] + output_dt * np.array( 820 range(self.time.shape[0] * interpolation_factor) 821 ) 822 if interpolation_factor != 1: 823 output_envelope = sig.resample( 824 np.real(self.envelope), output_time.shape[0] 825 ) + 1j * np.array( 826 sig.resample(np.imag(self.envelope), output_time.shape[0]) 827 ) 828 else: 829 output_envelope = self.envelope 830 return Waveform( 831 wave=np.real( 832 np.exp( 833 1j * self.carrier_frequency * 2 * np.pi * output_time 834 - 1j * CEP_shift 835 ) 836 * output_envelope 837 ), 838 time=output_time, 839 dt=output_dt, 840 is_uniformly_spaced=True, 841 )
Returns a Waveform based on the data
843 def plot(self, ax: Optional[Axes] = None, phase_blanking: float = 0.05, xlim=None): 844 """ 845 Plot the pulse. 846 847 Args: 848 ax: optionally plot onto a pre-existing matplotlib Axes 849 phase_blanking: only show phase information (instantaneous frequency) above this level relative to max intensity 850 xlim: pass arguments to set_xlim() to constrain the x-axis 851 """ 852 if ax is None: 853 fig, ax = plt.subplots() 854 else: 855 fig = ax.get_figure() 856 time_ax = self.time_fs() - np.mean(self.time_fs()) 857 intensity = np.abs(self.envelope) ** 2 858 intensity /= np.max(intensity) 859 intensity_line = ax.plot( 860 time_ax, 861 intensity, 862 label=f"Intensity, fwhm {1e15 * self.get_fwhm():0.1f} fs", 863 ) 864 ax.set_xlabel("Time (fs)") 865 ax.set_ylabel("Intensity (Arb. unit)") 866 inst_freq = ( 867 (1e-12 / (2 * np.pi)) 868 * derivative(np.unwrap(np.angle(self.envelope)), 1) 869 / self.dt 870 ) 871 ax_phase = plt.twinx(ax) 872 assert isinstance(ax_phase, Axes) 873 ax_phase.plot([], []) 874 phase_line = ax_phase.plot( 875 time_ax[intensity > phase_blanking], 876 inst_freq[intensity > phase_blanking], 877 "--", 878 label="Inst. frequency", 879 ) 880 ax_phase.set_ylabel("Inst. frequency (THz)") 881 if xlim is not None: 882 ax.set_xlim(xlim) 883 ax_phase.set_xlim(xlim) 884 lines = lines = intensity_line + phase_line 885 ax.legend(lines, [str(line.get_label()) for line in lines]) 886 return fig
Plot the pulse.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
- phase_blanking: only show phase information (instantaneous frequency) above this level relative to max intensity
- xlim: pass arguments to set_xlim() to constrain the x-axis
32 def from_dict(cls, data: dict): 33 """ 34 Takes a dict and makes an instance of the class 35 36 Args: 37 data (dict): the result of a call of .to_dict on the class 38 """ 39 40 def handle_complex_array(serialized_array) -> np.ndarray: 41 """Helper function to deserialize numpy arrays, handling complex types""" 42 if isinstance(serialized_array, list) and all( 43 isinstance(item, dict) and "re" in item and "im" in item 44 for item in serialized_array 45 ): 46 return np.array( 47 [complex(item["re"], item["im"]) for item in serialized_array], 48 dtype=np.complex128, 49 ) 50 return np.array(serialized_array) 51 52 loaded_data = {} 53 for field_name, field_type in cls.__annotations__.items(): 54 if field_type is np.ndarray: 55 loaded_data[field_name] = handle_complex_array(data[field_name]) 56 elif is_dataclass(field_type): 57 loaded_data[field_name] = field_type.from_dict(data[field_name]) 58 else: 59 loaded_data[field_name] = data[field_name] 60 return cls(**loaded_data)
Takes a dict and makes an instance of the class
Arguments:
- data (dict): the result of a call of .to_dict on the class
62 def load_yaml(cls, filename: str): 63 """ 64 load from a yaml file 65 66 Args: 67 filename (str): path to the file 68 """ 69 with open(filename, "r") as file: 70 data = yaml.load(file, yaml.SafeLoader) 71 return cls.from_dict(data)
load from a yaml file
Arguments:
- filename (str): path to the file
84 def to_dict(instance): 85 """ 86 serialize the class into a dict 87 """ 88 data_dict = {} 89 for field_name, field_type in instance.__annotations__.items(): 90 field_value = getattr(instance, field_name) 91 if field_type is np.ndarray: 92 if field_value.dtype == np.complex128: 93 data_dict[field_name] = [ 94 {"re": num.real, "im": num.imag} for num in field_value.tolist() 95 ] 96 else: 97 data_dict[field_name] = field_value.tolist() 98 elif is_dataclass(field_type): 99 data_dict[field_name] = field_value.to_dict() 100 elif field_type is np.float64 or field_type is float: 101 data_dict[field_name] = float(field_value) 102 else: 103 data_dict[field_name] = field_value 104 return data_dict
serialize the class into a dict
73 def save_yaml(instance, filename: str): 74 """ 75 save to a yaml file 76 77 Args: 78 filename (str): path to the file 79 """ 80 data_dict = instance.to_dict() 81 with open(filename, "w") as file: 82 yaml.dump(data_dict, file)
save to a yaml file
Arguments:
- filename (str): path to the file
113@yaml_io 114@dataclass(frozen=True, slots=True) 115class Spectrogram: 116 data: np.ndarray 117 time: np.ndarray 118 freq: np.ndarray 119 120 def lock(self): 121 """ 122 Make the data immutable 123 """ 124 self.data.setflags(write=False) 125 self.time.setflags(write=False) 126 self.freq.setflags(write=False) 127 128 def save(self, filename): 129 """ 130 Save in the .A.dat file format used by FROG .etc 131 Args: 132 filename: the file to be saved 133 134 The file is structured like this: 135 [number of wavelengths] [number of times] 136 [minimum value of the trace] [maximum value of the trace] 137 [array of wavelengths] 138 [array of times] 139 [data array as single column] 140 """ 141 with open(filename, "w") as file: 142 file.write(f"{len(self.freq)}\t{len(self.time)}\n") 143 file.write(f"{np.min(self.data[:])}\t{np.max(self.data[:])}\n") 144 for freq in self.freq: 145 wavelength_nm = 1e9 * constants.speed_of_light / freq 146 file.write(f"{wavelength_nm:15.15g}\n") 147 for time in self.time: 148 time_fs = 1e15 * time 149 file.write(f"{time_fs:15.15g}\n") 150 for x in self.data: 151 for y in x: 152 file.write(f"{y:15.15g}\n") 153 154 def to_block_binned(self, freq_bin: int, time_bin: int, method: str = "mean"): 155 """ 156 Apply block-binning to the spectrogram. 157 158 Args: 159 freq_bin (int): block size for averaging in the frequency direction 160 time_bin (int): block size for averaging in the time-direction 161 method (str): can be ```mean``` or ```median``` 162 """ 163 return Spectrogram( 164 data=block_binning_2d(self.data, time_bin, freq_bin, method), 165 freq=block_binning_1d(self.freq, freq_bin, "mean"), 166 time=block_binning_1d(self.time, time_bin, "mean"), 167 ) 168 169 def to_per_frequency_dc_removed(self, extra_offset: float = 0.0): 170 """Perform DC offset removal on a measured spectrogram, on a per-frequency basis 171 172 Args: 173 extra_offset (float): subtract a value from the entire array (negative values are always set to zero) 174 175 Returns: 176 Spectrogram: the spectrogram with offset removed.""" 177 new_data = np.array(self.data) 178 new_data -= extra_offset 179 new_data[new_data < 0.0] = 0.0 180 for _i in range(new_data.shape[0]): 181 new_data[_i, :] -= np.min(new_data[_i, :]) 182 183 return Spectrogram(data=new_data, time=self.time, freq=self.freq) 184 185 def to_symmetrized(self): 186 """ 187 Average the trace with a time-reversed copy. This might be useful for getting a reconstruction of difficult data, but keep in mind that the resulting measured trace will no longer represent the real measurement and should not be published as such. 188 """ 189 return Spectrogram( 190 data=0.5 * (self.data + np.fliplr(self.data)), 191 time=self.time, 192 freq=self.freq, 193 ) 194 195 def to_binned( 196 self, 197 dim: int = 64, 198 dt: float = 5e-15, 199 t0: Optional[float] = None, 200 f0: float = 750e12, 201 ): 202 """Bin a spectrogram to a FFT-appropriate shape 203 204 Args: 205 dim (int): size of each size of the resulting square data 206 dt (float): time step of the data 207 t0: (Optional[float]): time-zero of the data. If not specified, will be calculated by the first moment of the time-distribution of the signal 208 f0: (float): central frequency of the binned array 209 210 Returns: 211 Spectrogram: the binned spectrogram 212 """ 213 _t = np.array(range(dim)) * dt 214 _t -= np.mean(_t) 215 _f = np.fft.fftshift(np.fft.fftfreq(dim, d=dt) + f0) 216 binned_data = np.zeros((dim, self.time.shape[0]), dtype=float) 217 for _i in range(self.time.shape[0]): 218 binned_data[:, _i] = interpolate( 219 _f, self.freq, np.array(self.data[:, _i]), neighbors=2 220 ) 221 binned_data /= np.max(binned_data[:]) 222 if t0 is None: 223 ac = np.sum(binned_data, axis=0) 224 t0 = np.sum(ac * self.time) / np.sum(ac) 225 binned_data_square = np.zeros((dim, dim), dtype=float) 226 for _i in range(dim): 227 binned_data_square[_i, :] = interpolate( 228 _t, self.time - t0, np.array(binned_data[_i, :]), neighbors=2 229 ) 230 return Spectrogram(data=binned_data_square, time=_t, freq=_f) 231 232 def plot(self, ax: Optional[Axes] = None): 233 """ 234 Plot the spectrogram. 235 236 Args: 237 ax: optionally plot onto a pre-existing matplotlib Axes 238 """ 239 if ax is None: 240 fig, ax = plt.subplots() 241 else: 242 fig = ax.get_figure() 243 a = ax.pcolormesh( 244 1e15 * self.time, 245 1e-12 * self.freq, 246 self.data / np.max(self.data[:]), 247 rasterized=True, 248 ) 249 ax.set_xlabel("Time (fs)") 250 ax.set_ylabel("Frequency (THz)") 251 plt.colorbar(a) 252 return fig 253 254 def plot_log(self, ax: Optional[Axes] = None): 255 """ 256 Plot the spectrogram. 257 258 Args: 259 ax: optionally plot onto a pre-existing matplotlib Axes 260 """ 261 if ax is None: 262 fig, ax = plt.subplots() 263 else: 264 fig = ax.get_figure() 265 logdata = np.array(self.data) 266 logdata[self.data > 0.0] = np.log(self.data[self.data > 0.0]) 267 logdata[self.data < 0.0] = 0.0 268 a = ax.pcolormesh( 269 1e15 * self.time, 270 1e-12 * self.freq, 271 logdata, 272 rasterized=True, 273 vmin=-11, 274 vmax=0, 275 ) 276 ax.set_xlabel("Time (fs)") 277 ax.set_ylabel("Frequency (THz)") 278 ax.grid(True, lw=1) 279 plt.colorbar(a) 280 return fig
120 def lock(self): 121 """ 122 Make the data immutable 123 """ 124 self.data.setflags(write=False) 125 self.time.setflags(write=False) 126 self.freq.setflags(write=False)
Make the data immutable
128 def save(self, filename): 129 """ 130 Save in the .A.dat file format used by FROG .etc 131 Args: 132 filename: the file to be saved 133 134 The file is structured like this: 135 [number of wavelengths] [number of times] 136 [minimum value of the trace] [maximum value of the trace] 137 [array of wavelengths] 138 [array of times] 139 [data array as single column] 140 """ 141 with open(filename, "w") as file: 142 file.write(f"{len(self.freq)}\t{len(self.time)}\n") 143 file.write(f"{np.min(self.data[:])}\t{np.max(self.data[:])}\n") 144 for freq in self.freq: 145 wavelength_nm = 1e9 * constants.speed_of_light / freq 146 file.write(f"{wavelength_nm:15.15g}\n") 147 for time in self.time: 148 time_fs = 1e15 * time 149 file.write(f"{time_fs:15.15g}\n") 150 for x in self.data: 151 for y in x: 152 file.write(f"{y:15.15g}\n")
Save in the .A.dat file format used by FROG .etc
Arguments:
- filename: the file to be saved
The file is structured like this: [number of wavelengths] [number of times] [minimum value of the trace] [maximum value of the trace] [array of wavelengths] [array of times] [data array as single column]
154 def to_block_binned(self, freq_bin: int, time_bin: int, method: str = "mean"): 155 """ 156 Apply block-binning to the spectrogram. 157 158 Args: 159 freq_bin (int): block size for averaging in the frequency direction 160 time_bin (int): block size for averaging in the time-direction 161 method (str): can be ```mean``` or ```median``` 162 """ 163 return Spectrogram( 164 data=block_binning_2d(self.data, time_bin, freq_bin, method), 165 freq=block_binning_1d(self.freq, freq_bin, "mean"), 166 time=block_binning_1d(self.time, time_bin, "mean"), 167 )
Apply block-binning to the spectrogram.
Arguments:
- freq_bin (int): block size for averaging in the frequency direction
- time_bin (int): block size for averaging in the time-direction
- method (str): can be
mean
ormedian
169 def to_per_frequency_dc_removed(self, extra_offset: float = 0.0): 170 """Perform DC offset removal on a measured spectrogram, on a per-frequency basis 171 172 Args: 173 extra_offset (float): subtract a value from the entire array (negative values are always set to zero) 174 175 Returns: 176 Spectrogram: the spectrogram with offset removed.""" 177 new_data = np.array(self.data) 178 new_data -= extra_offset 179 new_data[new_data < 0.0] = 0.0 180 for _i in range(new_data.shape[0]): 181 new_data[_i, :] -= np.min(new_data[_i, :]) 182 183 return Spectrogram(data=new_data, time=self.time, freq=self.freq)
Perform DC offset removal on a measured spectrogram, on a per-frequency basis
Arguments:
- extra_offset (float): subtract a value from the entire array (negative values are always set to zero)
Returns:
Spectrogram: the spectrogram with offset removed.
185 def to_symmetrized(self): 186 """ 187 Average the trace with a time-reversed copy. This might be useful for getting a reconstruction of difficult data, but keep in mind that the resulting measured trace will no longer represent the real measurement and should not be published as such. 188 """ 189 return Spectrogram( 190 data=0.5 * (self.data + np.fliplr(self.data)), 191 time=self.time, 192 freq=self.freq, 193 )
Average the trace with a time-reversed copy. This might be useful for getting a reconstruction of difficult data, but keep in mind that the resulting measured trace will no longer represent the real measurement and should not be published as such.
195 def to_binned( 196 self, 197 dim: int = 64, 198 dt: float = 5e-15, 199 t0: Optional[float] = None, 200 f0: float = 750e12, 201 ): 202 """Bin a spectrogram to a FFT-appropriate shape 203 204 Args: 205 dim (int): size of each size of the resulting square data 206 dt (float): time step of the data 207 t0: (Optional[float]): time-zero of the data. If not specified, will be calculated by the first moment of the time-distribution of the signal 208 f0: (float): central frequency of the binned array 209 210 Returns: 211 Spectrogram: the binned spectrogram 212 """ 213 _t = np.array(range(dim)) * dt 214 _t -= np.mean(_t) 215 _f = np.fft.fftshift(np.fft.fftfreq(dim, d=dt) + f0) 216 binned_data = np.zeros((dim, self.time.shape[0]), dtype=float) 217 for _i in range(self.time.shape[0]): 218 binned_data[:, _i] = interpolate( 219 _f, self.freq, np.array(self.data[:, _i]), neighbors=2 220 ) 221 binned_data /= np.max(binned_data[:]) 222 if t0 is None: 223 ac = np.sum(binned_data, axis=0) 224 t0 = np.sum(ac * self.time) / np.sum(ac) 225 binned_data_square = np.zeros((dim, dim), dtype=float) 226 for _i in range(dim): 227 binned_data_square[_i, :] = interpolate( 228 _t, self.time - t0, np.array(binned_data[_i, :]), neighbors=2 229 ) 230 return Spectrogram(data=binned_data_square, time=_t, freq=_f)
Bin a spectrogram to a FFT-appropriate shape
Arguments:
- dim (int): size of each size of the resulting square data
- dt (float): time step of the data
- t0: (Optional[float]): time-zero of the data. If not specified, will be calculated by the first moment of the time-distribution of the signal
- f0: (float): central frequency of the binned array
Returns:
Spectrogram: the binned spectrogram
232 def plot(self, ax: Optional[Axes] = None): 233 """ 234 Plot the spectrogram. 235 236 Args: 237 ax: optionally plot onto a pre-existing matplotlib Axes 238 """ 239 if ax is None: 240 fig, ax = plt.subplots() 241 else: 242 fig = ax.get_figure() 243 a = ax.pcolormesh( 244 1e15 * self.time, 245 1e-12 * self.freq, 246 self.data / np.max(self.data[:]), 247 rasterized=True, 248 ) 249 ax.set_xlabel("Time (fs)") 250 ax.set_ylabel("Frequency (THz)") 251 plt.colorbar(a) 252 return fig
Plot the spectrogram.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
254 def plot_log(self, ax: Optional[Axes] = None): 255 """ 256 Plot the spectrogram. 257 258 Args: 259 ax: optionally plot onto a pre-existing matplotlib Axes 260 """ 261 if ax is None: 262 fig, ax = plt.subplots() 263 else: 264 fig = ax.get_figure() 265 logdata = np.array(self.data) 266 logdata[self.data > 0.0] = np.log(self.data[self.data > 0.0]) 267 logdata[self.data < 0.0] = 0.0 268 a = ax.pcolormesh( 269 1e15 * self.time, 270 1e-12 * self.freq, 271 logdata, 272 rasterized=True, 273 vmin=-11, 274 vmax=0, 275 ) 276 ax.set_xlabel("Time (fs)") 277 ax.set_ylabel("Frequency (THz)") 278 ax.grid(True, lw=1) 279 plt.colorbar(a) 280 return fig
Plot the spectrogram.
Arguments:
- ax: optionally plot onto a pre-existing matplotlib Axes
32 def from_dict(cls, data: dict): 33 """ 34 Takes a dict and makes an instance of the class 35 36 Args: 37 data (dict): the result of a call of .to_dict on the class 38 """ 39 40 def handle_complex_array(serialized_array) -> np.ndarray: 41 """Helper function to deserialize numpy arrays, handling complex types""" 42 if isinstance(serialized_array, list) and all( 43 isinstance(item, dict) and "re" in item and "im" in item 44 for item in serialized_array 45 ): 46 return np.array( 47 [complex(item["re"], item["im"]) for item in serialized_array], 48 dtype=np.complex128, 49 ) 50 return np.array(serialized_array) 51 52 loaded_data = {} 53 for field_name, field_type in cls.__annotations__.items(): 54 if field_type is np.ndarray: 55 loaded_data[field_name] = handle_complex_array(data[field_name]) 56 elif is_dataclass(field_type): 57 loaded_data[field_name] = field_type.from_dict(data[field_name]) 58 else: 59 loaded_data[field_name] = data[field_name] 60 return cls(**loaded_data)
Takes a dict and makes an instance of the class
Arguments:
- data (dict): the result of a call of .to_dict on the class
62 def load_yaml(cls, filename: str): 63 """ 64 load from a yaml file 65 66 Args: 67 filename (str): path to the file 68 """ 69 with open(filename, "r") as file: 70 data = yaml.load(file, yaml.SafeLoader) 71 return cls.from_dict(data)
load from a yaml file
Arguments:
- filename (str): path to the file
84 def to_dict(instance): 85 """ 86 serialize the class into a dict 87 """ 88 data_dict = {} 89 for field_name, field_type in instance.__annotations__.items(): 90 field_value = getattr(instance, field_name) 91 if field_type is np.ndarray: 92 if field_value.dtype == np.complex128: 93 data_dict[field_name] = [ 94 {"re": num.real, "im": num.imag} for num in field_value.tolist() 95 ] 96 else: 97 data_dict[field_name] = field_value.tolist() 98 elif is_dataclass(field_type): 99 data_dict[field_name] = field_value.to_dict() 100 elif field_type is np.float64 or field_type is float: 101 data_dict[field_name] = float(field_value) 102 else: 103 data_dict[field_name] = field_value 104 return data_dict
serialize the class into a dict
73 def save_yaml(instance, filename: str): 74 """ 75 save to a yaml file 76 77 Args: 78 filename (str): path to the file 79 """ 80 data_dict = instance.to_dict() 81 with open(filename, "w") as file: 82 yaml.dump(data_dict, file)
save to a yaml file
Arguments:
- filename (str): path to the file