Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 134 additions & 26 deletions sotodlib/io/load_book.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
#: Signal DAC units are rescaled to phase before returning.
SIGNAL_RESCALE = np.pi / 2**15

#: Signal dtype used to cast / create new arrays as needed.
SIGNAL_DTYPE = 'float32'

DEG = np.pi / 180

TMPDIR_VAR = 'SOTODLIB_TOD_TMPDIR'
Expand Down Expand Up @@ -142,7 +145,7 @@ def load_obs_book(db, obs_id, dets=None, prefix=None, samples=None,
signal_buffer = None
if samples[1] is not None and not no_signal:
signal_buffer = np.empty((len(dets_req), samples[1] - samples[0]),
dtype='float32')
dtype=SIGNAL_DTYPE)

ancil = None
timestamps = None
Expand Down Expand Up @@ -314,11 +317,13 @@ def _load_book_detset(files, prefix='', load_ancil=True,
signal_acc = Accumulator2d(
samples=samples,
keys_to_keep=dets,
dtype=SIGNAL_DTYPE,
calibrate=SIGNAL_RESCALE)

if special_channels:
tones_acc = Accumulator2d(
samples=samples,
dtype=SIGNAL_DTYPE,
calibrate=SIGNAL_RESCALE)
else:
tones_acc = None
Expand Down Expand Up @@ -364,7 +369,10 @@ def _load_book_detset(files, prefix='', load_ancil=True,
more_data |= signal_acc.append(frame['signal'], frame_offset)

if 'untracked' in frame and special_channels:
more_data |= tones_acc.append(frame['untracked'], frame_offset)
# The len(names) check here is related to a shape check
# bug in case of empty arrays.
if len(frame['untracked'].names):
more_data |= tones_acc.append(frame['untracked'], frame_offset)

if 'flag_smurfgaps' in frame and 'smurfgaps' in flag_accs:
more_data |= flag_accs['smurfgaps'].append(frame['flag_smurfgaps'])
Expand Down Expand Up @@ -507,7 +515,7 @@ def _concat_filesets(results, ancil=None, timestamps=None,
aman.wrap('signal', signal_buffer,
[(0, 'dets'), (1, 'samps')])
else:
aman.wrap_new('signal', shape=('dets', 'samps'), dtype='float32')
aman.wrap_new('signal', shape=('dets', 'samps'), dtype=SIGNAL_DTYPE)
dets_ofs = 0
for v in results.values():
d = v['signal'].finalize()
Expand All @@ -534,7 +542,7 @@ def _concat_filesets(results, ancil=None, timestamps=None,
aman.tones.wrap('stream_id', ts, axis_map=[(0, 'tdets')])
aman.tones.wrap('band', tb, axis_map=[(0, 'tdets')])
aman.tones.wrap('channel', tc, axis_map=[(0, 'tdets')])
aman.tones.wrap_new('signal', shape=('tdets', 'samps'), dtype='float32')
aman.tones.wrap_new('signal', shape=('tdets', 'samps'), dtype=SIGNAL_DTYPE)
dets_ofs = 0
for v in results.values():
d = v['tones'].data
Expand Down Expand Up @@ -689,8 +697,10 @@ def append(self, data, preconsumed=None):
# self.samples; global sample indices of this block are:
block_samples = [self.consumed, self.consumed + data_count]

# Does this block precede our area of interest?
if block_samples[1] <= self.samples[0]:
# Does this block precede or follow our area of interest?
if (block_samples[1] <= self.samples[0]) \
or (self.samples[1] is not None
and block_samples[0] >= self.samples[1]):
self.consumed += data_count
return True

Expand All @@ -715,11 +725,11 @@ def append(self, data, preconsumed=None):
# Specialization ...
self._extract(data, src_slice, dest_slice)

if over_samples[1] != block_samples[1]:
return False

# It's important to keep this up to date even once finished
# reading.
self.consumed += data_count
return True

return (over_samples[1] == block_samples[1])


class Accumulator1d(Accumulator):
Expand Down Expand Up @@ -765,12 +775,7 @@ def _extract(self, data, src_slice, dest_slice):
if self.data is None:
if self.samples[1] is not None:
self.shape = (self.samples[1] - self.samples[0], )
if hasattr(data, 'names'):
# G3SuperTimestream ...
self.keys = [k for k in data.names]
else:
# G3TimesampleMap ...
self.keys = [k for k in data.keys()]
self.keys = [k for k in data.names]

if self.shape is not None:
# Determinate.
Expand All @@ -793,11 +798,15 @@ def finalize(self):


class AccumulatorTimesampleMap(AccumulatorNamed):
"""Accumulator for unpacking 2-d data (G3TimestampleMap) into
"""Accumulator for unpacking 2-d data (G3TimesampleMap) into
individual (named) 1-d vectors.

"""
def _extract(self, data, src_slice, dest_slice):
# This differs from AccumulatorNamed._extract in how the field
# names are determined, and in how the data vectors are
# extracted from the container.

# On first frame, check if we know the final data shape.
if self.data is None:
if self.samples[1] is not None:
Expand Down Expand Up @@ -825,27 +834,30 @@ class Accumulator2d(Accumulator):

"""
def __init__(self, *args, insert_at=None, keys_to_keep=None,
calibrate=None, **kwargs):
dtype=None, calibrate=None, **kwargs):
super().__init__(*args, **kwargs)
# An optional destination buffer for the data.
self.insert_at = insert_at
self.dtype = dtype
self.keys_to_keep = keys_to_keep
self.insert_at_idx = None
self.extract_at_idx = None
if self.insert_at is not None and self.samples[1] is None:
self.samples[1] = self.insert_at.shape[-1] + self.samples[0]
self.calibrate = calibrate
self._cal_preapplied = False

def _sample_count(self, _data):
return len(_data.times)

def _extract(self, data, src_slice, dest_slice):

if self.calibrate is not None:
if self.calibrate is not None and hasattr(data, 'calibrate'):
# This is a low cost operation if you do it before
# decompression. (Also do it before you use data.dtype,
# in "first frame stuff".)
data.calibrate(np.array([self.calibrate] * len(data.names)))
self._cal_preapplied = True

# First frame stuff ...
if self.data is None:
Expand All @@ -864,7 +876,8 @@ def _extract(self, data, src_slice, dest_slice):
# Set shape, if we know it.
if self.samples[1] is not None:
self.shape = (len(self.keys), self.samples[1] - self.samples[0])
self.data = np.empty(self.shape, data.dtype)
_dtype = self.dtype if self.dtype is not None else data.dtype
self.data = np.empty(self.shape, dtype=_dtype)
else:
self.data = []

Expand Down Expand Up @@ -892,25 +905,30 @@ def _extract(self, data, src_slice, dest_slice):
return

# Store data from this frame.
_data = data.data
del data
if self.calibrate is not None and not self._cal_preapplied:
_data = _data * self.calibrate

if self.insert_at is not None:
# Indexed by name
for i0, i1 in zip(self.insert_at_idx, self.extract_at_idx):
self.insert_at[i0, dest_slice] = data.data[i1, src_slice]
self.insert_at[i0, dest_slice] = _data[i1, src_slice]

elif self.shape is not None:
# Full array to hold data.
if self.extract_at_idx is not None:
for i, j in enumerate(self.extract_at_idx):
self.data[i, dest_slice] = data.data[j, src_slice]
self.data[i, dest_slice] = _data[j, src_slice]
else:
self.data[:, dest_slice] = data.data[:, src_slice]
self.data[:, dest_slice] = _data[:, src_slice]

else:
# List of arrays, to be hstacked later.
if self.extract_at_idx is not None:
self.data.append(data.data[self.extract_at_idx, src_slice])
self.data.append(_data[self.extract_at_idx, src_slice])
else:
self.data.append(data.data[:, src_slice])
self.data.append(_data[:, src_slice])

def finalize(self):
if self.insert_at is not None:
Expand Down Expand Up @@ -965,8 +983,9 @@ def _frames_iterator(files, prefix, samples, smurf_proc=None, use_temp_dir=None)
if smurf_proc is not None and smurf_proc.process(frame):
# We found a dump frame, so stop looking.
smurf_proc = None
if frame.type is not spt3g_core.G3FrameType.Scan:
if frame.type != spt3g_core.G3FrameType.Scan:
continue

yield frame, offset
offset += len(frame['ancil'].times)
# Alternately, use frame['sample_range']
Expand Down Expand Up @@ -1020,5 +1039,94 @@ def get_cal_obsids(ctx, obs_id, cal_type):

return obs_ids

#
# Testing
#

def _timev(t):
return spt3g_core.G3VectorTime(t * spt3g_core.G3Units.s)

if hasattr(so3g, 'G3SuperTimestream'):
def _supertimestream(*a, **kw):
return so3g.G3SuperTimestream(*a, **kw)
else:
def _supertimestream(keys, times, data):
return spt3g_core.G3TimestreamMap(keys, data, times[0], times[-1],
compression_level=5)

def _sim_g3_generator(dets, samps, stream_id=None, frame_size=200):
"""
Simulate book data and yield G3Frames.
"""
prims = ['Counter0', 'Counter2', 'UnixTimestamp']
bias_lines = ['bias%02i' % i for i in range(_TES_BIAS_COUNT)]
if stream_id is None:
stream_id = 'ufm_fako99'
untracked_dets = []#'sch_NONE_3_4']

if isinstance(dets, int):
dets = np.array(['det_%04i' % i for i in range(dets)])
nd = len(dets)
ns = samps

t0 = 1678900000.
f = 200.
times = np.arange(ns) / f + t0

test_det = 32
test_sig = (np.random.uniform(size=ns) * 100).astype('int32')

f = spt3g_core.G3Frame(spt3g_core.G3FrameType.Wiring)
wiring = {
'AMCc.SmurfProcessor.ChannelMapper.NumChannels': nd,
'AMCc.SmurfProcessor.ChannelMapper.Mask':
','.join(['%i' % i for i in range(nd)]),
'AMCc.FpgaTopLevel.AppTop.AppCore.SysgenCryo.Base[0].digitizerFrequency_MHz': 614.4,
'AMCc.FpgaTopLevel.AppTop.AppCore.RtmCryoDet.RampMaxCnt': 76799,
}
f['status'] = spt3g_core.G3String(yaml.dump(wiring))
f['dump'] = True
f['time'] = spt3g_core.G3Time(t0 * spt3g_core.G3Units.s)
yield f

offset = 0
while offset < ns:
_ns = min(frame_size, ns - offset)
_t = times[offset:offset+_ns]
_tv = _timev(_t)
f = spt3g_core.G3Frame(spt3g_core.G3FrameType.Scan)

# Metadata
f['stream_id'] = spt3g_core.G3String(stream_id)
f['sample_range'] = spt3g_core.G3VectorInt([offset, offset+_ns])

# Ancil
ancil = spt3g_core.G3TimesampleMap()
ancil.times = _tv
for k, m, b in [
('az_enc', 1., 270.),
('el_enc', 0., 55.),
('boresight_enc', 0, -45.)]:
ancil[k] = spt3g_core.G3VectorDouble(m * (_t - t0) + b)
f['ancil']= ancil

# Main smurf payloads
f['signal'] = _supertimestream(
dets, _tv, np.zeros((nd, _ns), dtype='int32'))
f['signal'].data[:] = (np.arange(nd)+1000)[:,None]
f['signal'].data[test_det,:] = test_sig[offset:offset+_ns]
f['primary'] = _supertimestream(
prims, _tv, np.zeros((len(prims), _ns), dtype='int64'))
f['tes_biases'] = _supertimestream(
bias_lines, _tv, np.zeros((len(bias_lines), _ns), dtype='int32'))
f['untracked'] = _supertimestream(
untracked_dets, _tv, np.zeros((len(untracked_dets), _ns), dtype='int32'))

# Flags
f['flag_smurfgaps'] = spt3g_core.G3VectorBool([False] * _ns)

yield f
offset += _ns


core.OBSLOADER_REGISTRY['obs-book'] = load_obs_book
55 changes: 55 additions & 0 deletions tests/test_load_book.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import unittest
import tempfile
import os

import numpy as np
from sotodlib import core
from sotodlib.io import load_book

# Import spt3g after so3g for backwards compat.
import so3g
from spt3g import core as spt3g_core


class TestLoadBook(unittest.TestCase):

def test_100_load_file(self):
ndet, nsamp = 100, 2000
with tempfile.TemporaryDirectory() as tempdir:
filenames = []
filename = os.path.join(tempdir, 'test_%03i.g3')
wi, w, wfc = 0, None, -1
for f in load_book._sim_g3_generator(ndet, nsamp):
if (wfc := (wfc + 1) % 5) == 0:
w = spt3g_core.G3Writer(filename % wi)
filenames.append(filename % wi)
wi += 1
w.Process(f)
del w

assert len(filenames) > 1

# Load it all
aman0 = load_book.load_book_file(filenames)
assert aman0.dets.count == ndet
assert aman0.samps.count == nsamp

# Load sample subset
sslice = slice(123, 1435)
aman1 = load_book.load_book_file(
filenames, samples=(sslice.start, sslice.stop))
assert (aman1.signal == aman0.signal[:, sslice]).all()

# Load det subset
sub_idx = [12, 43, 35, 98]
sub_dets = [aman0.dets.vals[i] for i in sub_idx]
aman1 = load_book.load_book_file(
filenames, dets=sub_dets)
for i, j in enumerate(sub_idx):
assert (aman1.signal[i] == aman0.signal[j]).all()

# Load both subset
aman1 = load_book.load_book_file(
filenames, samples=(sslice.start, sslice.stop), dets=sub_dets)
for i, j in enumerate(sub_idx):
assert (aman1.signal[i] == aman0.signal[j, sslice]).all()
Loading