diff --git a/sotodlib/io/load_book.py b/sotodlib/io/load_book.py index 00e2df766..3e8410f92 100644 --- a/sotodlib/io/load_book.py +++ b/sotodlib/io/load_book.py @@ -50,6 +50,9 @@ #: Signal DAC units are rescaled to phase before returning. SIGNAL_RESCALE = np.pi / 2**15 +#: Signal dtype used to cast / create new arrays as needed. +SIGNAL_DTYPE = 'float32' + DEG = np.pi / 180 TMPDIR_VAR = 'SOTODLIB_TOD_TMPDIR' @@ -142,7 +145,7 @@ def load_obs_book(db, obs_id, dets=None, prefix=None, samples=None, signal_buffer = None if samples[1] is not None and not no_signal: signal_buffer = np.empty((len(dets_req), samples[1] - samples[0]), - dtype='float32') + dtype=SIGNAL_DTYPE) ancil = None timestamps = None @@ -314,11 +317,13 @@ def _load_book_detset(files, prefix='', load_ancil=True, signal_acc = Accumulator2d( samples=samples, keys_to_keep=dets, + dtype=SIGNAL_DTYPE, calibrate=SIGNAL_RESCALE) if special_channels: tones_acc = Accumulator2d( samples=samples, + dtype=SIGNAL_DTYPE, calibrate=SIGNAL_RESCALE) else: tones_acc = None @@ -364,7 +369,10 @@ def _load_book_detset(files, prefix='', load_ancil=True, more_data |= signal_acc.append(frame['signal'], frame_offset) if 'untracked' in frame and special_channels: - more_data |= tones_acc.append(frame['untracked'], frame_offset) + # The len(names) check here is related to a shape check + # bug in case of empty arrays. + if len(frame['untracked'].names): + more_data |= tones_acc.append(frame['untracked'], frame_offset) if 'flag_smurfgaps' in frame and 'smurfgaps' in flag_accs: more_data |= flag_accs['smurfgaps'].append(frame['flag_smurfgaps']) @@ -507,7 +515,7 @@ def _concat_filesets(results, ancil=None, timestamps=None, aman.wrap('signal', signal_buffer, [(0, 'dets'), (1, 'samps')]) else: - aman.wrap_new('signal', shape=('dets', 'samps'), dtype='float32') + aman.wrap_new('signal', shape=('dets', 'samps'), dtype=SIGNAL_DTYPE) dets_ofs = 0 for v in results.values(): d = v['signal'].finalize() @@ -534,7 +542,7 @@ def _concat_filesets(results, ancil=None, timestamps=None, aman.tones.wrap('stream_id', ts, axis_map=[(0, 'tdets')]) aman.tones.wrap('band', tb, axis_map=[(0, 'tdets')]) aman.tones.wrap('channel', tc, axis_map=[(0, 'tdets')]) - aman.tones.wrap_new('signal', shape=('tdets', 'samps'), dtype='float32') + aman.tones.wrap_new('signal', shape=('tdets', 'samps'), dtype=SIGNAL_DTYPE) dets_ofs = 0 for v in results.values(): d = v['tones'].data @@ -689,8 +697,10 @@ def append(self, data, preconsumed=None): # self.samples; global sample indices of this block are: block_samples = [self.consumed, self.consumed + data_count] - # Does this block precede our area of interest? - if block_samples[1] <= self.samples[0]: + # Does this block precede or follow our area of interest? + if (block_samples[1] <= self.samples[0]) \ + or (self.samples[1] is not None + and block_samples[0] >= self.samples[1]): self.consumed += data_count return True @@ -715,11 +725,11 @@ def append(self, data, preconsumed=None): # Specialization ... self._extract(data, src_slice, dest_slice) - if over_samples[1] != block_samples[1]: - return False - + # It's important to keep this up to date even once finished + # reading. self.consumed += data_count - return True + + return (over_samples[1] == block_samples[1]) class Accumulator1d(Accumulator): @@ -765,12 +775,7 @@ def _extract(self, data, src_slice, dest_slice): if self.data is None: if self.samples[1] is not None: self.shape = (self.samples[1] - self.samples[0], ) - if hasattr(data, 'names'): - # G3SuperTimestream ... - self.keys = [k for k in data.names] - else: - # G3TimesampleMap ... - self.keys = [k for k in data.keys()] + self.keys = [k for k in data.names] if self.shape is not None: # Determinate. @@ -793,11 +798,15 @@ def finalize(self): class AccumulatorTimesampleMap(AccumulatorNamed): - """Accumulator for unpacking 2-d data (G3TimestampleMap) into + """Accumulator for unpacking 2-d data (G3TimesampleMap) into individual (named) 1-d vectors. """ def _extract(self, data, src_slice, dest_slice): + # This differs from AccumulatorNamed._extract in how the field + # names are determined, and in how the data vectors are + # extracted from the container. + # On first frame, check if we know the final data shape. if self.data is None: if self.samples[1] is not None: @@ -825,27 +834,30 @@ class Accumulator2d(Accumulator): """ def __init__(self, *args, insert_at=None, keys_to_keep=None, - calibrate=None, **kwargs): + dtype=None, calibrate=None, **kwargs): super().__init__(*args, **kwargs) # An optional destination buffer for the data. self.insert_at = insert_at + self.dtype = dtype self.keys_to_keep = keys_to_keep self.insert_at_idx = None self.extract_at_idx = None if self.insert_at is not None and self.samples[1] is None: self.samples[1] = self.insert_at.shape[-1] + self.samples[0] self.calibrate = calibrate + self._cal_preapplied = False def _sample_count(self, _data): return len(_data.times) def _extract(self, data, src_slice, dest_slice): - if self.calibrate is not None: + if self.calibrate is not None and hasattr(data, 'calibrate'): # This is a low cost operation if you do it before # decompression. (Also do it before you use data.dtype, # in "first frame stuff".) data.calibrate(np.array([self.calibrate] * len(data.names))) + self._cal_preapplied = True # First frame stuff ... if self.data is None: @@ -864,7 +876,8 @@ def _extract(self, data, src_slice, dest_slice): # Set shape, if we know it. if self.samples[1] is not None: self.shape = (len(self.keys), self.samples[1] - self.samples[0]) - self.data = np.empty(self.shape, data.dtype) + _dtype = self.dtype if self.dtype is not None else data.dtype + self.data = np.empty(self.shape, dtype=_dtype) else: self.data = [] @@ -892,25 +905,30 @@ def _extract(self, data, src_slice, dest_slice): return # Store data from this frame. + _data = data.data + del data + if self.calibrate is not None and not self._cal_preapplied: + _data = _data * self.calibrate + if self.insert_at is not None: # Indexed by name for i0, i1 in zip(self.insert_at_idx, self.extract_at_idx): - self.insert_at[i0, dest_slice] = data.data[i1, src_slice] + self.insert_at[i0, dest_slice] = _data[i1, src_slice] elif self.shape is not None: # Full array to hold data. if self.extract_at_idx is not None: for i, j in enumerate(self.extract_at_idx): - self.data[i, dest_slice] = data.data[j, src_slice] + self.data[i, dest_slice] = _data[j, src_slice] else: - self.data[:, dest_slice] = data.data[:, src_slice] + self.data[:, dest_slice] = _data[:, src_slice] else: # List of arrays, to be hstacked later. if self.extract_at_idx is not None: - self.data.append(data.data[self.extract_at_idx, src_slice]) + self.data.append(_data[self.extract_at_idx, src_slice]) else: - self.data.append(data.data[:, src_slice]) + self.data.append(_data[:, src_slice]) def finalize(self): if self.insert_at is not None: @@ -965,8 +983,9 @@ def _frames_iterator(files, prefix, samples, smurf_proc=None, use_temp_dir=None) if smurf_proc is not None and smurf_proc.process(frame): # We found a dump frame, so stop looking. smurf_proc = None - if frame.type is not spt3g_core.G3FrameType.Scan: + if frame.type != spt3g_core.G3FrameType.Scan: continue + yield frame, offset offset += len(frame['ancil'].times) # Alternately, use frame['sample_range'] @@ -1020,5 +1039,94 @@ def get_cal_obsids(ctx, obs_id, cal_type): return obs_ids +# +# Testing +# + +def _timev(t): + return spt3g_core.G3VectorTime(t * spt3g_core.G3Units.s) + +if hasattr(so3g, 'G3SuperTimestream'): + def _supertimestream(*a, **kw): + return so3g.G3SuperTimestream(*a, **kw) +else: + def _supertimestream(keys, times, data): + return spt3g_core.G3TimestreamMap(keys, data, times[0], times[-1], + compression_level=5) + +def _sim_g3_generator(dets, samps, stream_id=None, frame_size=200): + """ + Simulate book data and yield G3Frames. + """ + prims = ['Counter0', 'Counter2', 'UnixTimestamp'] + bias_lines = ['bias%02i' % i for i in range(_TES_BIAS_COUNT)] + if stream_id is None: + stream_id = 'ufm_fako99' + untracked_dets = []#'sch_NONE_3_4'] + + if isinstance(dets, int): + dets = np.array(['det_%04i' % i for i in range(dets)]) + nd = len(dets) + ns = samps + + t0 = 1678900000. + f = 200. + times = np.arange(ns) / f + t0 + + test_det = 32 + test_sig = (np.random.uniform(size=ns) * 100).astype('int32') + + f = spt3g_core.G3Frame(spt3g_core.G3FrameType.Wiring) + wiring = { + 'AMCc.SmurfProcessor.ChannelMapper.NumChannels': nd, + 'AMCc.SmurfProcessor.ChannelMapper.Mask': + ','.join(['%i' % i for i in range(nd)]), + 'AMCc.FpgaTopLevel.AppTop.AppCore.SysgenCryo.Base[0].digitizerFrequency_MHz': 614.4, + 'AMCc.FpgaTopLevel.AppTop.AppCore.RtmCryoDet.RampMaxCnt': 76799, + } + f['status'] = spt3g_core.G3String(yaml.dump(wiring)) + f['dump'] = True + f['time'] = spt3g_core.G3Time(t0 * spt3g_core.G3Units.s) + yield f + + offset = 0 + while offset < ns: + _ns = min(frame_size, ns - offset) + _t = times[offset:offset+_ns] + _tv = _timev(_t) + f = spt3g_core.G3Frame(spt3g_core.G3FrameType.Scan) + + # Metadata + f['stream_id'] = spt3g_core.G3String(stream_id) + f['sample_range'] = spt3g_core.G3VectorInt([offset, offset+_ns]) + + # Ancil + ancil = spt3g_core.G3TimesampleMap() + ancil.times = _tv + for k, m, b in [ + ('az_enc', 1., 270.), + ('el_enc', 0., 55.), + ('boresight_enc', 0, -45.)]: + ancil[k] = spt3g_core.G3VectorDouble(m * (_t - t0) + b) + f['ancil']= ancil + + # Main smurf payloads + f['signal'] = _supertimestream( + dets, _tv, np.zeros((nd, _ns), dtype='int32')) + f['signal'].data[:] = (np.arange(nd)+1000)[:,None] + f['signal'].data[test_det,:] = test_sig[offset:offset+_ns] + f['primary'] = _supertimestream( + prims, _tv, np.zeros((len(prims), _ns), dtype='int64')) + f['tes_biases'] = _supertimestream( + bias_lines, _tv, np.zeros((len(bias_lines), _ns), dtype='int32')) + f['untracked'] = _supertimestream( + untracked_dets, _tv, np.zeros((len(untracked_dets), _ns), dtype='int32')) + + # Flags + f['flag_smurfgaps'] = spt3g_core.G3VectorBool([False] * _ns) + + yield f + offset += _ns + core.OBSLOADER_REGISTRY['obs-book'] = load_obs_book diff --git a/tests/test_load_book.py b/tests/test_load_book.py new file mode 100644 index 000000000..daf6cb5e4 --- /dev/null +++ b/tests/test_load_book.py @@ -0,0 +1,55 @@ +import unittest +import tempfile +import os + +import numpy as np +from sotodlib import core +from sotodlib.io import load_book + +# Import spt3g after so3g for backwards compat. +import so3g +from spt3g import core as spt3g_core + + +class TestLoadBook(unittest.TestCase): + + def test_100_load_file(self): + ndet, nsamp = 100, 2000 + with tempfile.TemporaryDirectory() as tempdir: + filenames = [] + filename = os.path.join(tempdir, 'test_%03i.g3') + wi, w, wfc = 0, None, -1 + for f in load_book._sim_g3_generator(ndet, nsamp): + if (wfc := (wfc + 1) % 5) == 0: + w = spt3g_core.G3Writer(filename % wi) + filenames.append(filename % wi) + wi += 1 + w.Process(f) + del w + + assert len(filenames) > 1 + + # Load it all + aman0 = load_book.load_book_file(filenames) + assert aman0.dets.count == ndet + assert aman0.samps.count == nsamp + + # Load sample subset + sslice = slice(123, 1435) + aman1 = load_book.load_book_file( + filenames, samples=(sslice.start, sslice.stop)) + assert (aman1.signal == aman0.signal[:, sslice]).all() + + # Load det subset + sub_idx = [12, 43, 35, 98] + sub_dets = [aman0.dets.vals[i] for i in sub_idx] + aman1 = load_book.load_book_file( + filenames, dets=sub_dets) + for i, j in enumerate(sub_idx): + assert (aman1.signal[i] == aman0.signal[j]).all() + + # Load both subset + aman1 = load_book.load_book_file( + filenames, samples=(sslice.start, sslice.stop), dets=sub_dets) + for i, j in enumerate(sub_idx): + assert (aman1.signal[i] == aman0.signal[j, sslice]).all()