Source code for cngi.dio.write_vis

#  CASA Next Generation Infrastructure
#  Copyright (C) 2021 AUI, Inc. Washington DC, USA
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
this module will be included in the api
"""


'''
To Do:
1. zarr.consolidate_metadata(outfile) is very slow for a zarr group (datatset) with many chunks (there is a python for loop that checks each file). We might have to implement our own version. This is also important for cngi.dio.append_zarr
'''

[docs]def write_vis(mxds, outfile, chunks_on_disk=None, partition=None, consolidated=True, compressor=None, graph_name='write_zarr'): """ Write xarray dataset to zarr format on disk. When chunks_on_disk is not specified the chunking in the input dataset is used. When chunks_on_disk is specified that dataset is saved using that chunking. Parameters ---------- mxds : xarray.core.dataset.Dataset Dataset of dataset to write to disk outfile : str outfile filename, generally ends in .zarr chunks_on_disk : dict of int A dictionary with the chunk size that will be used when writing to disk. For example {'time': 20, 'chan': 6}. If chunks_on_disk is not specified the chunking of dataset will be used. partition : str or list Name of partition xds to write into outfile (from the mxds attributes section). Overwrites existing partition of same name. Default None writes entire mxds compressor : numcodecs.blosc.Blosc The blosc compressor to use when saving the converted data to disk using zarr. If None the zstd compression algorithm used with compression level 2. graph_name : string The time taken to execute the graph and save the dataset is measured and saved as an attribute in the zarr file. The graph_name is the label for this timing information. Returns ------- """ import xarray as xr import zarr import time from numcodecs import Blosc from itertools import cycle import os import numpy as np if compressor is None: compressor = Blosc(cname='zstd', clevel=2, shuffle=0) if partition is None: partition = list(mxds.attrs.keys()) partition = list(np.atleast_1d(partition)) os.system("rm -fr " + outfile) os.system("mkdir " + outfile) for xds_name in partition: if "xds" in xds_name: xds_outfile = outfile + '/' + xds_name xds_for_disk = mxds.attrs[xds_name] if chunks_on_disk is not None: xds_for_disk = xds_for_disk.chunk(chunks=chunks_on_disk) else: xds_outfile = outfile + '/global/' + xds_name xds_for_disk = mxds.attrs[xds_name] # Create compression encoding for each datavariable encoding = dict(zip(list(xds_for_disk.data_vars), cycle([{'compressor': compressor}]))) start = time.time() # Consolidated is set to False so that the timing information is included in the consolidate metadata. xr.Dataset.to_zarr(xds_for_disk, store=xds_outfile, mode='w', encoding=encoding,consolidated=False) time_to_calc_and_store = time.time() - start print('Time to store and execute graph for ', xds_name, graph_name, time_to_calc_and_store) #Add timing information dataset_group = zarr.open_group(xds_outfile, mode='a') dataset_group.attrs[graph_name+'_time'] = time_to_calc_and_store if consolidated == True: zarr.consolidate_metadata(xds_outfile)