diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 2e58169..252f6d9 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -7,6 +7,6 @@ resources: endpoint: github jobs: - - template: collection-2021-1.0.yml@templates + - template: 2022-2.3-py39.yml@templates parameters: beamline_acronym: LIX diff --git a/startup/00-base.py b/startup/00-base.py index f598c54..8b75aab 100644 --- a/startup/00-base.py +++ b/startup/00-base.py @@ -60,7 +60,7 @@ def __call__(self, *args, **kwargs): RE = CustomRunEngine() -nslsii.configure_base(get_ipython().user_ns, 'lix', bec=True, pbar=False) +nslsii.configure_base(get_ipython().user_ns, 'lix', bec=True, pbar=False, publish_documents_with_kafka=True) def reload_macros(file='~/.ipython/profile_collection/startup/99-macros.py'): ipy = get_ipython() diff --git a/startup/02-vars.py b/startup/02-vars.py index eeead70..1cde96e 100644 --- a/startup/02-vars.py +++ b/startup/02-vars.py @@ -1,2 +1,22 @@ -current_cycle = '2022-1' -pilatus_data_dir = "/exp_path/hdf" +from enum import Enum + +class data_file_path(Enum): + old_gpfs = '/GPFS/xf16id/exp_path' + lustre_legacy = '/nsls2/data/lix/legacy' + lustre_asset = '/nsls2/data/lix/asset' + lustre_proposals = '/nsls2/data/lix/proposals' + gpfs = '/nsls2/xf16id1/data' + gpfs_experiments = '/nsls2/xf16id1/experiments' + ramdisk = '/exp_path' + +current_cycle = '2022-2' +pilatus_data_dir = data_file_path.lustre_legacy.value +#pilatus_data_dir = f"{data_file_path.ramdisk.value}/hdf" +data_destination = data_file_path.lustre_legacy.value # this is where all IOC data files should eventually go +#proc_destination = data_file_path.lustre_proposals.value +proc_destination = data_file_path.gpfs_experiments.value +procdir_prefix = "pass-" + +bl_comm_proposal = "310032" + + diff --git a/startup/03-security.py b/startup/03-security.py index 0e4b756..59ea374 100644 --- a/startup/03-security.py +++ b/startup/03-security.py @@ -1,16 +1,39 @@ -import os,stat,time,shutil +import os +import re +import shutil +import stat +import subprocess +import time + from IPython import get_ipython +from lixtools.atsas import run as run_cmd username = None proposal_id = None run_id = None data_path = "" -collection_lock_file = "/nsls2/xf16id1/.lock" -okay_to_move_file = "/nsls2/xf16id1/.okay_to_move" +collection_lock_file = f"{data_file_path.lustre_legacy.value}/.lock" login_time = -1 -def login(uname = None, pID = None, rID = None, debug=True, test_only=False, - root_path='/nsls2/xf16id1', replace_froot=pilatus_data_dir, share_with=[]): +def check_access(fn): + if not os.path.exists(fn): + raise Exception(f"{fn} does not exist ...") + if os.access(fn, os.W_OK): + print(f"write access to {fn} verified ...") + return + + # this below may not be necessary + out = run_cmd(["getfacl", "-cn", fn]) + wgrps = [int(t[:-4].lstrip("group:")) for t in re.findall("groups:[0-9]*:rw.", out)] + ugrps = os.getgroups() + if len(set(wgrps) & set(ugrps))==0: + print("groups with write permission: ", wgrps) + print("user group membership: ", ugrps) + raise Exception(f"the current user does not have write access to {fn}") + else: + print(f"write access to {fn} verified ...") + +def login(uname = None, pID = None, rID = None, debug=True, test_only=False): """ Ask the user for his credentials and proposal information for the data collection create_proc_dir: if True, create the directory where h5 files will be saved share_with: list of e-mails to share the proc_path with @@ -20,7 +43,6 @@ def login(uname = None, pID = None, rID = None, debug=True, test_only=False, global proposal_id global run_id global data_path - global o_data_path global proc_path global login_time @@ -35,8 +57,11 @@ def login(uname = None, pID = None, rID = None, debug=True, test_only=False, correct_info = True elif test_only: username = "lix" - proposal_id = "test" - run_id = "test" + proposal_id = bl_comm_proposal + if rID: + run_id = rID + else: + run_id = "test" correct_info = True while not correct_info: @@ -51,28 +76,16 @@ def login(uname = None, pID = None, rID = None, debug=True, test_only=False, RE.md['proposal_id'] = proposal_id RE.md['run_id'] = run_id login_time = time.time() - - if test_only: - path = f"{root_path}/data/" - else: - path = f"{root_path}/data/{current_cycle}/" - rpath = str(proposal_id)+"/"+str(run_id)+"/" - data_path = path + rpath - makedirs(data_path, mode=0o0777) - RE.md['data_path'] = data_path - o_data_path = data_path # for IOCs on xf16id-ioc1 - - if replace_froot is not None: - if replace_froot[-1]!="/": - replace_froot+="/" - data_path = data_path.replace(path, replace_froot) - makedirs(data_path, mode=0o0777) - #input(f"make sure {data_path} exists on the detector conputer. Hit any key to continue ...") - - if test_only: - proc_path = data_path - else: - proc_path = f"{root_path}/experiments/{current_cycle}/{proposal_id}/{run_id}/" + + #rpath = f"{proposal_id}/{run_id}/" + #data_path = f"{data_destination}/{rpath}" + # makedirs(data_path, mode=0o0777) this will be created by the IOC? + data_path = f"{data_destination}/%s/{current_cycle}/{proposal_id}/{run_id}/" + RE.md['data_path'] = data_path # different IOCs will be writing into subdirectories + + proc_path = f"{proc_destination}/{current_cycle}/{procdir_prefix}{proposal_id}/" + check_access(proc_path) + proc_path += f"{run_id}/" RE.md['proc_path'] = proc_path if not os.path.isdir(proc_path): makedirs(proc_path, mode=0o2755) @@ -86,16 +99,13 @@ def login(uname = None, pID = None, rID = None, debug=True, test_only=False, elif os.path.isfile(f"{db[-1].start['proc_path']}/exp.h5"): shutil.copy(f"{db[-1].start['proc_path']}/exp.h5", f"{proc_path}") - if len(share_with)>0: - share_dir(proc_path, share_with) - dw,mo,da,tt,yr = time.asctime().split() if not os.path.isdir(proc_path+"log"): os.mkdir(proc_path+"log") logfile = proc_path+("log/%s." % username)+yr+mo+("%02d_" % int(da))+tt.replace(':', '') ip = get_ipython() - ip.magic("logstop") - ip.magic("logstart -ort %s" % logfile) + ip.run_line_magic("logstop", "") + ip.run_line_magic("logstart", f"-ort {logfile}") ip.logger.log_write(f"**LOGIN** {username} @ {time.asctime()}\n") if debug: @@ -106,7 +116,15 @@ def print_time_callback(name, doc): print("#STARTDOC : {}".format(time.ctime(t1))) RE.subscribe(print_time_callback) - +def get_IOC_datapath(ioc_name, substitute_path=None): + if data_path=="": + print("login first to specify data path:") + login() + if substitute_path: + return data_path.replace(data_destination, substitute_path)%ioc_name + else: + return data_path%ioc_name + def write_log_msg(msg): ip = get_ipython() ip.logger.log_write(msg) @@ -155,12 +173,6 @@ def change_path(): if username==None or proposal_id==None or run_id==None: login() - - # to be safe, need to have some kind of lock - #get_lock() - #if os.path.exists(link_to_data_path): - # os.remove(link_to_data_path) - #os.symlink(data_path, link_to_data_path) def logoff(quiet=False): @@ -189,12 +201,10 @@ def logoff(quiet=False): data_path = "" proc_path = None - del RE.md['owner'] - del RE.md['proposal_id'] - del RE.md['run_id'] - del RE.md['data_path'] - del RE.md['proc_path'] + for k in ['owner', 'proposal_id', 'run_id', 'data_path', 'proc_path']: + if k in RE.md.keys(): + del RE.md[k] ip = get_ipython() - ip.magic("logstop") + ip.run_line_magic("logstop", "") diff --git a/startup/04-sample.py b/startup/04-sample.py index 9b518a4..7831473 100644 --- a/startup/04-sample.py +++ b/startup/04-sample.py @@ -1,14 +1,8 @@ import glob,re -from enum import Enum - -class data_file_path(Enum): - old_gpfs = '/GPFS/xf16id/exp_path' - gpfs = '/nsls2/xf16id1/data' - ramdisk = '/exp_path' current_sample="test" -def check_sample_name(sample_name, sub_dir=None, check_for_duplicate=True, check_dir=False): +def check_sample_name(sample_name, ioc_name="pil1M", sub_dir=None, check_for_duplicate=True, check_dir=False): if len(sample_name)>42: # file name length limit for Pilatus detectors print("Error: the sample name is too long:", len(sample_name)) return False @@ -18,7 +12,7 @@ def check_sample_name(sample_name, sub_dir=None, check_for_duplicate=True, check return False if check_for_duplicate: - f_path = data_path + f_path = data_path%ioc_name if sub_dir is not None: f_path += ('/'+sub_dir+'/') #if PilatusFilePlugin.froot == data_file_path.ramdisk: @@ -47,7 +41,11 @@ def change_sample(sample_name=None, check_sname=True, exception=True): if sample_name is None or sample_name == "": sample_name = "test" elif check_sname: - ret = check_sample_name(sample_name) #, exception) + try: + ioc_name = pil.active_detectors[0].name + except: + ioc_name = "pil1M" + ret = check_sample_name(sample_name, ioc_name=ioc_name) #, exception) if ret==False and exception: raise Exception() diff --git a/startup/10-motors.py b/startup/10-motors.py index 1985d24..b981a43 100644 --- a/startup/10-motors.py +++ b/startup/10-motors.py @@ -214,7 +214,7 @@ def inverse(self, pos): sg1 = SlitsCenterAndGap('XF:16IDC-OP{Slt:G1', name='sg1') ## Guard Slits 2 -sg2 = SlitsCenterAndGap('XF:16IDC-OP{Slt:G2', name='sg2') +sg2 = Blades('XF:16IDC-OP{Slt:G2', name='sg2') ######################################### ## Detector System diff --git a/startup/10-vacuum.py b/startup/10-vacuum.py index 1849631..3b5f6b6 100644 --- a/startup/10-vacuum.py +++ b/startup/10-vacuum.py @@ -49,7 +49,7 @@ def pressure(self): elif Pr==">1.0E+03": P0 = 1.1e3 else: - P0 = np.float(Pr) + P0 = float(Pr) return P0 diff --git a/startup/19-EM_sol_robot.py b/startup/19-EM_sol_robot.py index 4fd8c76..32a7dbc 100644 --- a/startup/19-EM_sol_robot.py +++ b/startup/19-EM_sol_robot.py @@ -38,11 +38,11 @@ def execute(self,*args): def waitReady(timeout=-1): state = PV("SW:State") - start = time.clock() + start = time.perf_counter() while True: if state.get() not in ("Running","Moving","Busy","Initialize"): break - if (timeout>=0) and ((time.clock() - start) > timeout): + if (timeout>=0) and ((time.perf_counter() - start) > timeout): raise Exception("Timeout waiting ready") time.sleep(0.01) diff --git a/startup/20-bpm.py b/startup/20-bpm.py index 7e6a14f..d2bca60 100644 --- a/startup/20-bpm.py +++ b/startup/20-bpm.py @@ -89,7 +89,5 @@ class NSLS_EM1(NSLS_EM): em0.read_attrs = ['x_position', 'y_position'] #tetramm = TetrAMM('XF:16IDC-ES{TETRAMM:1}', name='tetramm') - - bpm = Best('XF:16IDB-CT{Best}', name='bpm') diff --git a/startup/20-detectors.py b/startup/20-detectors.py index 2d8d56e..01e0d40 100644 --- a/startup/20-detectors.py +++ b/startup/20-detectors.py @@ -1,7 +1,3 @@ -#from ophyd import (ProsilicaDetector, SingleTrigger, TIFFPlugin, -# ImagePlugin, DetectorBase, HDF5Plugin, -# AreaDetector, EpicsSignal, EpicsSignalRO, ROIPlugin, -# TransformPlugin, ProcessPlugin) from ophyd.areadetector.detectors import ProsilicaDetector from ophyd.areadetector.plugins import ImagePlugin_V34 as ImagePlugin from ophyd.areadetector.plugins import TIFFPlugin_V34 as TIFFPlugin @@ -26,13 +22,10 @@ def make_filename(self): return fname, read_path, write_path def stage(self): - global o_data_path,proposal_id,run_id,current_cycle,current_sample - # rpath = f"{o_data_path}/tif/" - rpath = f"/nsls2/data/lix/legacy/{self.parent.name}/{current_cycle}/{proposal_id}/{run_id}/{current_sample}" - #makedirs(rpath, mode=0o0777) - #set_and_wait(self.file_path, rpath) + global proposal_id, run_id, current_cycle, current_sample + rpath = f"{get_IOC_datapath(self.parent.name)}/{current_sample}" self.write_path_template = rpath - self.create_directory.put(-5) # create up to 4 levels: ioc, cycle, pid, rid + self.create_directory.put(-6) # create up to 6 levels: ioc, cycle, pid, rid, ???, ?? super().stage() class StandardProsilica(ProsilicaDetector, SingleTrigger): @@ -51,7 +44,7 @@ class StandardProsilica(ProsilicaDetector, SingleTrigger): stats3 = Cpt(StatsPlugin, 'Stats3:') tiff = Cpt(TIFFPluginWithFileStore, suffix='TIFF1:', - write_path_template='/nsls2/xf16id1/data/', # this is updated when the plug in is staged + write_path_template='/nsls2/xf16id1/data/', # this is updated when the plugin is staged ) def __init__(self, *args, detector_id=None, **kwargs): diff --git a/startup/20-pilatus.py b/startup/20-pilatus.py index 3576e9c..3a608a8 100644 --- a/startup/20-pilatus.py +++ b/startup/20-pilatus.py @@ -73,7 +73,7 @@ def stage(self): self.file_number.get() - 1) self._fp = read_path if not self.file_path_exists.get(): - raise IOError("Path %s does not exist on IOC." + raise IOError("Path %s does not exist on IOC server." "" % self.file_path.get()) @@ -100,7 +100,7 @@ def get_frames_per_point(self): def stage(self): super().stage() res_kwargs = {'frame_per_point': self.get_frames_per_point()} - self._generate_resource(res_kwargs) + self._generate_resource(res_kwargs) class LIXhdfPlugin(HDF5Plugin, LiXFileStoreHDF5): @@ -109,7 +109,7 @@ class LIXhdfPlugin(HDF5Plugin, LiXFileStoreHDF5): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.fnbr = 0 + self.fnbr = 0 def make_filename(self): ''' replaces FileStorePluginBase.make_filename() @@ -122,15 +122,15 @@ def make_filename(self): write_path : str Path that the IOC can write to ''' - global data_path,current_sample - + global current_sample + filename = f"{current_sample}_{self.parent.detector_id}" - #write_path = f"/nsls2/data/lix/legacy/{self.parent.name}/{current_cycle}/{proposal_id}/{run_id}/{current_sample}/" - write_path = data_path if self.sub_directory is None else f"{data_path}/{self.sub_directory}" + write_path = get_IOC_datapath(self.parent.name, pilatus_data_dir) + if self.sub_directory: + write_path += f"/{self.sub_directory}" read_path = write_path # might want to handle this differently, this shows up in res/db - #read_path = self.parent.cbf_file_path.get() return filename, read_path, write_path - + #def stage(self): # """ need to set the number of images to collect and file path # """ @@ -140,79 +140,46 @@ def make_filename(self): # filename, read_path, write_path = self.make_filename() # self._fn = self.file_template.get() % (read_path, filename, self.fnbr) # set_and_wait(self.full_file_name, self._fn) - #def unstage(self): # self.fnbr = self.file_number.get() # super().unstage() - + def get_frames_per_point(self): if self.parent.trigger_mode is PilatusTriggerMode.ext: return self.parent.parent._num_images else: return 1 - + class LIXPilatus(PilatusDetector): hdf = Cpt(LIXhdfPlugin, suffix="HDF1:", write_path_template="", root='/') - + cbf_file_path = ADComponent(EpicsSignalWithRBV, 'cam1:FilePath', string=True) cbf_file_name = ADComponent(EpicsSignalWithRBV, 'cam1:FileName', string=True) cbf_file_number = ADComponent(EpicsSignalWithRBV, 'cam1:FileNumber') HeaderString = Cpt(EpicsSignal, "cam1:HeaderString") ThresholdEnergy = Cpt(EpicsSignal, "cam1:ThresholdEnergy") armed = Cpt(EpicsSignal, "cam1:Armed") - flatfield = Cpt(EpicsSignal, "cam1:FlatFieldFile") - ff_minv = Cpt(EpicsSignal, "cam1:MinFlatField") - ff_valid = Cpt(EpicsSignalRO, "cam1:FlatFieldValid") def __init__(self, *args, hostname, detector_id, **kwargs): self.detector_id = detector_id self.hostname = hostname super().__init__(*args, **kwargs) - + self._acquisition_signal = self.cam.acquire self._counter_signal = self.cam.array_counter self.set_cbf_file_default(f"/ramdisk/{self.name}/", "current") - self.ts = [] + self.ts = [] if self.hdf.run_time.get()==0: # first time using the plugin self.hdf.warmup() - + def update_cbf_name(self, cn=None): if cn is None: #ts = time.localtime() #cn = f"{ts.tm_year}-{ts.tm_mon:02d}-{ts.tm_mday:02d}.{ts.tm_hour:02d}{ts.tm_min:02d}{ts.tm_sec:02d}" cn = time.asctime().replace(" ","_").replace(":", "") self.set_cbf_file_default(f"/ramdisk/{self.name}/", cn) - - def set_flatfield(self, fn, minV=100): - """ do some changing first - make sure that the image size is correct and the values are reasonable - - documentation on the PV: - - Name of a file to be used to correct for the flat field. If this record does not point to a valid - flat field file then no flat field correction is performed. The flat field file is simply a TIFF - or CBF file collected by the Pilatus that is used to correct for spatial non-uniformity in the - response of the detector. It should be collected with a spatially uniform intensity on the detector - at roughly the same energy as the measurements being corrected. When the flat field file is read, - the average pixel value (averageFlatField) is computed using all pixels with intensities - >PilatusMinFlatField. All pixels with intensity 0: break time.sleep(0.5) + print("trouble getting group status, retrying ...: ", err,ret) + self._moving = bool(int(ret.split(',')[self.controller.motors[self.motorName]['index']])) - return self._moving @property @@ -385,11 +386,11 @@ def describe_collect(self): '''Describe details for the flyer collect() method''' ret = {} ret[self.traj_par['fast_axis']] = {'dtype': 'number', - 'shape': (1,), + 'shape': (len(self.read_back['fast_axis']),), 'source': 'PVT trajectory readback position'} if self.motor2 is not None: ret[self.traj_par['slow_axis']] = {'dtype': 'number', - 'shape': (1,), + 'shape': (len(self.read_back['slow_axis']),), 'source': 'motor position readback'} for det in pil.active_detectors: ret[f'{det.name}_image'] = det.make_data_key() @@ -425,7 +426,7 @@ def define_traj(self, motor, N, dx, dt, motor2=None, dy=0, Nr=2): self.flying_motor = self.controller.motors[self.motors[motor.name]]['ophyd'] err,ret = self.xps.PositionerMaximumVelocityAndAccelerationGet(self.sID, self.motors[motor.name]) - mvel,macc = np.asarray(ret.split(','), dtype=np.float) + mvel,macc = np.asarray(ret.split(','), dtype=float) midx = self.controller.motors[self.motors[motor.name]]['index'] jj = np.zeros(Nr+N+Nr) diff --git a/startup/33-CBFhandler.py b/startup/33-CBFhandler.py deleted file mode 100644 index f599cf9..0000000 --- a/startup/33-CBFhandler.py +++ /dev/null @@ -1,122 +0,0 @@ -import os -from databroker.assets.handlers_base import HandlerBase -from databroker.assets.base_registry import DuplicateHandler -import fabio - -# for backward compatibility, fpp was always 1 before Jan 2018 -#global pilatus_fpp -#pilatus_fpp = 1 - -# this is used by the CBF file handler -from enum import Enum -class triggerMode(Enum): - software_trigger_single_frame = 1 - software_trigger_multi_frame = 2 - external_trigger = 3 - fly_scan = 4 - #external_trigger_multi_frame = 5 # this is unnecessary, difference is fpp - -#global pilatus_trigger_mode -#global default_data_path_root -#global substitute_data_path_root -#global CBF_replace_data_path - -#pilatus_trigger_mode = triggerMode.software_trigger_single_frame - -# if the cbf files have been moved already -#CBF_replace_data_path = False - -class PilatusCBFHandler(HandlerBase): - specs = {'AD_CBF'} | HandlerBase.specs - froot = data_file_path.gpfs - subdir = None - trigger_mode = triggerMode.software_trigger_single_frame - # assuming that the data files always have names with these extensions - std_image_size = { - 'SAXS': (1043, 981), - 'WAXS1': (619, 487), - 'WAXS2': (1043, 981) # orignal WAXS2 was (619, 487) - } - - def __init__(self, rpath, template, filename, frame_per_point=1, initial_number=1): - print(f'Initializing CBF handler for {self.trigger_mode} ...') - self._template = template - self._fpp = frame_per_point - self._filename = filename - self._initial_number = initial_number - self._image_size = None - self._default_path = os.path.join(rpath, '') - self._path = "" - - for k in self.std_image_size: - if template.find(k)>=0: - self._image_size = self.std_image_size[k] - if self._image_size is None: - raise Exception(f'Unrecognized data file extension in filename template: {template}') - - for fr in data_file_path: - if self._default_path.find(fr.value)==0: - self._dir = self._default_path[len(fr.value):] - return - raise Exception(f"invalid file path: {self._default_path}") - - def update_path(self): - # this is a workaround for data that are save in /exp_path then moved to /nsls2/xf16id1/exp_path - if not self.froot in data_file_path: - raise Exception(f"invalid froot: {self.froot}") - self._path = self.froot.value+self._dir - print(f"updating path, will read data from {self._path} ...") - - def get_data(self, fn): - """ the file may not exist - """ - try: - img = fabio.open(fn) - data = img.data - if data.shape!=self._image_size: - print(f'got incorrect image size from {fn}: {data.shape}') #, return an empty frame instead.') - except: - print(f'could not read {fn}, return an empty frame instead.') - data = np.zeros(self._image_size) - #print(data.shape) - return data - - def __call__(self, point_number): - start = self._initial_number #+ point_number - stop = start + 1 - ret = [] - - tplt = self._template.replace("6.6d", "06d") # some early templates are not correctly formatted - tl = tplt.replace(".", "_").split("_") - # e.g. ['%s%s', '%06d', 'SAXS', 'cbf'], ['%s%s', '%06d', 'SAXS', '%05d', 'cbf'] - # resulting in file names like test_000125_SAXS.cbf vs test_000125_SAXS_00001.cbf - if self.trigger_mode != triggerMode.software_trigger_single_frame and self._fpp>1: - # the template needs to have two number fileds - if len(tl)==4: - tl = tl[:-1]+["%05d"]+tl[-1:] - elif len(tl)==5: - tl = tl[:-2]+tl[-1:] - self._template = "_".join(tl[:-1])+"."+tl[-1] - - print("CBF handler called: start=%d, stop=%d" % (start, stop)) - print(" ", self._initial_number, point_number, self._fpp) - print(" ", self._template, self._path, self._initial_number) - self.update_path() - if self.subdir is not None: - self._path += f"{self.subdir}/" - - if self.trigger_mode == triggerMode.software_trigger_single_frame or self._fpp == 1: - fn = self._template % (self._path, self._filename, self._initial_number+point_number) - ret.append(self.get_data(fn)) - elif self.trigger_mode in [triggerMode.software_trigger_multi_frame, - triggerMode.fly_scan]: - for i in range(self._fpp): - fn = self._template % (self._path, self._filename, start, point_number+i) - ret.append(self.get_data(fn)) - elif self.trigger_mode==triggerMode.external_trigger: - fn = self._template % (self._path, self._filename, start, point_number) - ret.append(self.get_data(fn)) - - return np.array(ret).squeeze() - -db.reg.register_handler('AD_CBF', PilatusCBFHandler, overwrite=True) diff --git a/startup/39-original_suitcase.py b/startup/39-original_suitcase.py index e5d997c..60e6653 100644 --- a/startup/39-original_suitcase.py +++ b/startup/39-original_suitcase.py @@ -6,15 +6,13 @@ # The full license is in the file LICENSE, distributed with this software. #------------------------------------------------------------------------- -import collections +from collections.abc import Mapping import numpy as np import warnings import h5py import json -import copy +import copy, shutil from databroker import Header -import copy -import dask.dataframe as dd def conv_to_list(d): if isinstance(d, float) or isinstance(d, int) or isinstance(d, str): @@ -28,14 +26,45 @@ def conv_to_list(d): return d1 def update_res_path(res_path, replace_res_path={}): - #res_path = res["resource_path"] for rp1,rp2 in replace_res_path.items(): print("updating resource path ...") if rp1 in res_path: res_path = res_path.replace(rp1, rp2) return res_path -def hdf5_export(headers, filename, +def locate_h5_resource(res, replace_res_path, debug=False): + """ this is intended to move h5 file created by Pilatus + these files are originally saved on PPU RAMDISK, but should be moved to the IOC data directory + this function will look for the file at the original location, and relocate the file first if it is there + and return the h5 dataset + """ + fn_orig = res["root"] + res["resource_path"] + fn = update_res_path(fn_orig, replace_res_path) + if debug: + print(f"resource locations: {fn_orig} -> {fn}") + + if not(os.path.exists(fn_orig) or os.path.exists(fn)): + print(f"could not locate the resource at either {fn} or {fn_orig} ...") + raise Exception + if os.path.exists(fn_orig) and os.path.exists(fn) and fn_orig!=fn: + print(f"both {fn} and {fn_orig} exist, resolve the conflict manually first ..." ) + raise Exception + if not os.path.exists(fn): + fdir = os.path.dirname(fn) + if not os.path.exists(fdir): + makedirs(fdir, mode=0o2775) + if debug: + print(f"copying {fn_orig} to {fdir}") + tfn = fn+"_partial" + shutil.copy(fn_orig, tfn) + os.rename(tfn, fn) + os.remove(fn_orig) + + hf5 = h5py.File(fn, "r") + return hf5, hf5["/entry/data/data"] + + +def hdf5_export(headers, filename, debug=False, stream_name=None, fields=None, bulk_h5_res=True, timestamps=True, use_uid=True, db=None, replace_res_path={}): """ @@ -73,6 +102,7 @@ def hdf5_export(headers, filename, headers = [headers] with h5py.File(filename, "w") as f: + #f.swmr_mode = True # Unable to start swmr writing (file superblock version - should be at least 3) for header in headers: try: db = header.db @@ -85,7 +115,9 @@ def hdf5_export(headers, filename, for n,d in header.documents(): if n=="resource": res_docs[d['uid']] = d - + if debug: + print("res_docs:\n", res_docs) + try: descriptors = header.descriptors except KeyError: @@ -105,6 +137,8 @@ def hdf5_export(headers, filename, if descriptor['name'] != stream_name: continue descriptor.pop('_name', None) + if debug: + print(f"processing stream {stream_name}") if use_uid: desc_group = group.create_group(descriptor['uid']) @@ -119,14 +153,19 @@ def hdf5_export(headers, filename, events = list(header.events(stream_name=descriptor['name'], fill=False)) res_dict = {} - if "filled" in events[0].keys(): - for k in list(events[0]['filled'].keys()): + for k, v in list(events[0]['data'].items()): + if not isinstance(v, str): + continue + if v.split('/')[0] in res_docs.keys(): res_dict[k] = [] for ev in events: res_uid = ev['data'][k].split("/")[0] if not res_uid in res_dict[k]: res_dict[k].append(res_uid) - + + if debug: + print("res_dict:\n", res_dict) + event_times = [e['time'] for e in events] desc_group.create_dataset('time', data=event_times, compression='gzip', fletcher32=True) @@ -148,22 +187,25 @@ def hdf5_export(headers, filename, fletcher32=True) if key in list(res_dict.keys()): - res = res_docs[res_dict[key][0]] + res = res_docs[res_dict[key][0]] + print(f"processing resource ...\n", res) + + # pilatus data, change the path from ramdisk to IOC data directory + if key in ["pil1M_image", "pilW2_image"]: + rp = {pilatus_data_dir: data_destination} + if res['spec'] == "AD_HDF5" and bulk_h5_res: rawdata = None N = len(res_dict[key]) + print(f"copying data from source h5 file(s) directly, N={N} ...") if N==1: - res = res_docs[res_dict[key][0]] - hf5 = h5py.File(res["root"]+update_res_path(res["resource_path"], replace_res_path), "r") - data = hf5["/entry/data/data"] + hf5, data = locate_h5_resource(res_docs[res_dict[key][0]], replace_res_path=rp, debug=debug) data_group.copy(data, key) hf5.close() dataset = data_group[key] else: # ideally this should never happen, only 1 hdf5 file/resource per scan for i in range(N): - res = res_docs[res_dict[key][i]] - hf5 = h5py.File(res["root"]+update_res_path(res["resource_path"], replace_res_path), "r") - data = hf5["/entry/data/data"] + hf5, data = locate_h5_resource(res_docs[res_dict[key][i]]) if i==0: dataset = data_group.create_dataset( key, shape=(N, *data.shape), @@ -172,9 +214,11 @@ def hdf5_export(headers, filename, dataset[i,:] = data hf5.close() else: + print(f"getting resource data using handlers ...") rawdata = header.table(stream_name=descriptor['name'], fields=[key], fill=True)[key] # this returns the time stamps as well else: + print(f"compiling resource data from individual events ...") rawdata = [e['data'][key] for e in events] if rawdata is not None: @@ -195,12 +239,12 @@ def hdf5_export(headers, filename, dataset = data_group.create_dataset( key, data=data, compression='gzip') else: - raise ValueError('Array of str with ndim >= 3 can not be saved.') + raise ValueError(f'Array of str with ndim >= 3 can not be saved: {key}') else: # save numerical data - try: + try: if isinstance(rawdata, list): blk = rawdata[0] - else: + else: blk = rawdata[1] if isinstance(blk, np.ndarray): # detector image data = np.vstack(rawdata) @@ -219,7 +263,7 @@ def hdf5_export(headers, filename, data = np.array(conv_to_list(rawdata)) # issue with list of lists chunks = False dataset = data_group.create_dataset( - key, data=data, + key, data=data, compression='gzip', fletcher32=True) except: raise @@ -236,7 +280,7 @@ def _clean_dict(d): d = dict(d) for k, v in list(d.items()): # Store dictionaries as JSON strings. - if isinstance(v, collections.Mapping): + if isinstance(v, Mapping): d[k] = _clean_dict(d[k]) continue try: diff --git a/startup/40-hdf5.py b/startup/40-hdf5.py index 8bb8ee3..57adcdc 100644 --- a/startup/40-hdf5.py +++ b/startup/40-hdf5.py @@ -1,14 +1,9 @@ -#from suitcase import hdf5 #,nexus # available in suitcase 0.6 - import h5py,json,os import threading import numpy as np import epics,socket from collections import deque -#global default_data_path_root -#global substitute_data_path_root -#global DET_replace_data_path global proc_path def lsh5(hd, prefix='', top_only=False): @@ -46,9 +41,33 @@ def pack_h5_with_lock(*args, **kwargs): ret = None pack_h5_lock.release() return ret + + +def compile_replace_res_path(h): + """ protocol prior to May 2022: + md['data_path'] specifies the directories all data files are supposed to go + e.g. /nsls2/xf16id1/data/2022-1/310121/308824 + the original location of the data is recorded in the databroker, but not in the meta data + however, this location should follow the format of the {pilatus_data_dir}/{proposal_id}/{run_id} + protocol since May 2022: + md['data_path'] specifies where all IOC data are supposed to go + e.g. /nsls2/data/lix/legacy/%s/2022-1/310032/test + md['pilatus']['ramdisk'] specifies where the Pilatus data are originally saved + e.g. /exp_path/hdf + """ + md = h.start + ret = {} + dpath = md['data_path'] + try: + ret[md['pilatus']['ramdisk']] = dpath.split("%s")[0] + except: + cycle_id = re.search("20[0-9][0-9]-[0-9]", dpath)[0] + ret[pilatus_data_dir] = dpath.split(cycle_id)[0]+cycle_id + return ret + def pack_h5(uids, dest_dir='', fn=None, fix_sample_name=True, stream_name=None, - attach_uv_file=False, delete_old_file=True, include_motor_pos=True, + attach_uv_file=False, delete_old_file=True, include_motor_pos=True, debug=False, fields=['em2_current1_mean_value', 'em2_current2_mean_value', 'em1_sum_all_mean_value', 'em2_sum_all_mean_value', 'em2_ts_SumAll', 'em1_ts_SumAll', 'xsp3_spectrum_array_data', "pilatus_trigger_time", @@ -81,6 +100,10 @@ def pack_h5(uids, dest_dir='', fn=None, fix_sample_name=True, stream_name=None, fn = header.table(fields=[f])[f][1] headers = [header] + # if replace_res_path is not specified, try to figure out whether it is necessary + if len(replace_res_path)==0: + replace_res_path = compile_replace_res_path(headers[0]) + fds0 = headers[0].fields() # only these fields are considered relevant to be saved in the hdf5 file fds = list(set(fds0) & set(fields)) @@ -103,7 +126,8 @@ def pack_h5(uids, dest_dir='', fn=None, fix_sample_name=True, stream_name=None, pass print(fds) - hdf5_export(headers, fn, fields=fds, stream_name=stream_name, use_uid=False, replace_res_path=replace_res_path) #, mds= db.mds, use_uid=False) + hdf5_export(headers, fn, fields=fds, stream_name=stream_name, use_uid=False, + replace_res_path=replace_res_path, debug=debug) #, mds= db.mds, use_uid=False) # by default the groups in the hdf5 file are named after the scan IDs if fix_sample_name: @@ -196,8 +220,8 @@ def send_to_packing_queue(uid, data_type, froot=data_file_path.gpfs, move_first= print(f"scan {uid} was not successful.") return - PilatusCBFHandler.froot = data_file_path[froot.name] - threading.Thread(target=pack_and_move, args=(data_type,uid,proc_path,move_first,)).start() + threading.Thread(target=pack_and_process, args=(data_type,uid,proc_path,)).start() + #threading.Thread(target=pack_and_process, args=(data_type,uid,proc_path,move_first,)).start() print("processing thread started ...") @@ -216,31 +240,7 @@ def send_to_packing_queue_remote(uid, datatype, froot=data_file_path.gpfs, move_ s.send(msg.encode('ascii')) s.close() - -def move_files_from_RAMDISK(uids, dir_name=None): - for uid in uids: - print(f'moving files for {uid} from RAMDISK to GPFS ...') - h = db[uid] - p1 = h.start['data_path'] - #p2 = p1.replace(default_data_path_root, '/ramdisk/') - p2 = p1.replace(data_file_path.gpfs.value, '/ramdisk') - sample_name = h.start['sample_name'] - if dir_name is not None: - cmd = f"rsync -ahv --remove-source-files det@10.16.0.14:{p2}{dir_name} {p1}" - elif "subdir" in h.start.keys(): - subdir = h.start['subdir'] - cmd = f"rsync -ahv --remove-source-files det@10.16.0.14:{p2}{subdir} {p1}{subdir}" - elif os.system(f"ssh -q det@10.16.0.14 [[ -d {p2}{sample_name} ]]")==0: - # if sample name is a directory on the RAMDISK, move the entire directory - cmd = f"rsync -ahv --remove-source-files det@10.16.0.14:{p2}{sample_name} {p1}" - else: - cmd = f"rsync -ahv --remove-source-files det@10.16.0.14:{p2}{sample_name}_*.* {p1}" - #os.system(f"rsync -ahv --remove-source-files det@10.16.0.14:{p2}{h.start['sample_name']}_*.log {p1}") - print(cmd) - os.system(cmd) - - -def pack_and_move(data_type, uid, dest_dir, move_first=True): +def pack_and_process(data_type, uid, dest_dir): # useful for moving files from RAM disk to GPFS during fly scans # # assume other type of data are saved on RAM disk as well (GPFS not working for WAXS2) @@ -248,7 +248,6 @@ def pack_and_move(data_type, uid, dest_dir, move_first=True): #global pilatus_trigger_mode #,CBF_replace_data_path print(f"packing: {data_type}, {uid}, {dest_dir}") - print(f"data source: {PilatusCBFHandler.froot}") t0 = time.time() # if the dest_dir contains exp.h5, read detectors/qgrid from it try: @@ -257,29 +256,11 @@ def pack_and_move(data_type, uid, dest_dir, move_first=True): dt_exp = None dir_name = None - original_file_path = PilatusCBFHandler.froot - - if PilatusCBFHandler.froot==data_file_path.ramdisk and move_first: - print("move files to GPFS first ...") - if isinstance(uid, str): - uids = [uid] - else: - uids = uid - hdr = db[uids[0]].start - if 'holderName' in list(hdr.keys()): - dir_name = hdr['holderName'] - move_files_from_RAMDISK(uids, dir_name) - PilatusCBFHandler.froot=data_file_path.gpfs if data_type in ["multi", "sol", "mscan", "mfscan"]: uids = uid.split('|') if data_type=="sol": sb_dict = json.loads(uids.pop()) - PilatusCBFHandler.trigger_mode = triggerMode.fly_scan - elif data_type=="mfscan": - PilatusCBFHandler.trigger_mode = triggerMode.fly_scan - else: - PilatusCBFHandler.trigger_mode = triggerMode.external_trigger ## assume that the meta data contains the holderName if 'holderName' not in list(db[uids[0]].start.keys()): print("cannot find holderName from the header, using tmp.h5 as filename ...") @@ -303,24 +284,15 @@ def pack_and_move(data_type, uid, dest_dir, move_first=True): dt.load_data(debug="quiet") dt.fh5.close() del dt,dt_exp - if fh5_name is not "tmp.h5": # temporary fix, for some reason other processes cannot open the packed file + if fh5_name != "tmp.h5": # temporary fix, for some reason other processes cannot open the packed file os.system(f"cd {dest_dir} ; cp tmp.h5 {fh5_name} ; rm tmp.h5") - try: - gen_report(fh5_name) - except: - pass + if data_type == "sol": + try: + gen_report(fh5_name) + except: + pass elif data_type=="HPLC": uids = [uid] - if db[uid].start['plan_name']=="hplc_scan": - # this was software_trigger_single_frame when using the flyer-based hplc_scan - PilatusCBFHandler.trigger_mode = triggerMode.software_trigger_single_frame - else: - # data collected using ct - # could be ct(num=1) (software multiframe trigger), or ct(num=N) (hardware trigger) - if db[uid].start['num_points']==1: - PilatusCBFHandler.trigger_mode = triggerMode.software_trigger_multi_frame - else: - PilatusCBFHandler.trigger_mode = triggerMode.external_trigger fn = pack_h5_with_lock(uid, dest_dir=dest_dir, attach_uv_file=True) if fn is not None and dt_exp is not None: print('procesing ...') @@ -329,10 +301,6 @@ def pack_and_move(data_type, uid, dest_dir, move_first=True): dt.fh5.close() del dt,dt_exp elif data_type=="flyscan" or data_type=="scan": - if data_type=="flyscan": - PilatusCBFHandler.trigger_mode = triggerMode.fly_scan - else: - PilatusCBFHandler.trigger_mode = triggerMode.external_trigger uids = [uid] fn = pack_h5_with_lock(uid, dest_dir=dest_dir) else: @@ -343,11 +311,6 @@ def pack_and_move(data_type, uid, dest_dir, move_first=True): return # packing unsuccessful, print(f"{time.asctime()}: finished packing/processing, total time lapsed: {time.time()-t0:.1f} sec ...") - if PilatusCBFHandler.froot==data_file_path.ramdisk and not move_first: - move_files_from_RAMDISK(uids) - - original_file_path = PilatusCBFHandler.froot - def process_packing_queue(): """ this should only run on xf16idc-gpu1, moved to srv1 Mar 2022 @@ -381,10 +344,8 @@ def process_packing_queue(): if db[uid].stop['exit_status'] != 'success': # the scan actually finished print(f"scan {uid} was not successful.") return - #print(uid) - #print(path) - PilatusCBFHandler.froot = data_file_path[frn] - threading.Thread(target=pack_and_move, args=(data_type,uid,path,move_first,)).start() + + threading.Thread(target=pack_and_process, args=(data_type,uid,path,)).start() print("processing thread started ...") diff --git a/startup/other_profiles/metadata_user_solution.py b/startup/other_profiles/metadata_user_solution.py index 9e41cc2..732519d 100644 --- a/startup/other_profiles/metadata_user_solution.py +++ b/startup/other_profiles/metadata_user_solution.py @@ -2,7 +2,6 @@ 'waxs1': {'x':waxs1.x.position, 'y':waxs1.y.position, 'z':waxs1.z.position}, 'waxs2': {'x':waxs2.x.position, 'y':waxs2.y.position, 'z':waxs2.z.position}, }) -del saxs,waxs1,waxs2 RE.md['optics'] = ({'wbm_y': wbm.y.position, 'wbm_pitch': wbm.pitch.position, @@ -13,7 +12,6 @@ 'vfm_y1': vfm.y1.position, 'vfm_y2': vfm.y2.position, }) -del wbm,hfm,vfm if crl: RE.md['CRL'] = ({'state': crl.state(), @@ -23,13 +21,11 @@ 'y1': crl.y2.position, 'z': crl.z.position, }) - del crl else: RE.md['CRL'] = 'undefined' def update_metadata(): print('updating meta data ...', end='') -# RE.md['BPM']['beam position'] = ({'x': best.x_mean.get(), 'y': best.y_mean.get()}) RE.md['energy'] = ({'mono_bragg': mono.bragg.position, 'energy': pseudoE.energy.position, 'gap': pseudoE.IVUgap.position @@ -41,7 +37,7 @@ def update_metadata(): }) RE.md['BPM'] = ({'stage position': {'x': bpm_pos.x.position, 'y': bpm_pos.y.position}, - 'beam position': {'x': best.x_mean.get(), 'y': best.y_mean.get()} + 'beam position': {'x': bpm.x_mean.get(), 'y': bpm.y_mean.get()} }) - #RE.md['XBPM'] = XBPM_pos() + RE.md['XBPM'] = xbpm.pos() print('Done.')