Source code for cngi.image.reframe

#  CASA Next Generation Infrastructure
#  Copyright (C) 2021 AUI, Inc. Washington DC, USA
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
this module will be included in the api
"""

[docs]def reframe( ixds, outframe=None, reference_time=None, observer_location=None, target_location=None, reference_frequency=None, # velocity=None, ): """ Change the velocity system of an image .. warn:: This function is stil being implemented .. todo: Use refpix attribute (if present) to improve accuracy of target_location assignment .. todo: Account for epoch, distance, and velocity (if present in input attributes) in target_location and observer_location .. todo: decide whether to drop velocity coordinate (if assigned) Parameters ---------- xds : xarray.core.dataset.Dataset input Image outframe : str reference frame to which the input will be converted reference_time : numpy.datetime64 or datetime, optional any valid initializer of astropy.time.TimeBase should work observer_location : astropy.cordinates.SkyCoord, optional position and velocity of observer for frame transformation target_location : astropy.cordinates.SkyCoord, optional position and velocity of source for frame transformation reference_frequency : astropy.units.Quantity, optional value and unit to use when expressing the spectral value as a velocity, input to SpectralCoord doppler_rest parameter Returns ------- xarray.core.dataset.Dataset output Image """ from astropy import units from astropy.coordinates import EarthLocation, SkyCoord, SpectralCoord from astropy.time import Time from datetime import datetime import xarray as xr import numpy as np # create a working object _ixds = ixds # it would be better if the axis attributes were a single dict _ixds.attrs["units_map"] = dict( zip(_ixds.attrs["axisnames"], _ixds.attrs["axisunits"]) ) # Assigning reference_time if reference_time is None: try: reference_time = _ixds["time"].values[0] except: reference_time = datetime.now() finally: print(f"Adopted reference time: {reference_time}") # Assigning observer_location if observer_location is None: try: telpos = [ xx.strip(",[]()") for xx in _ixds.attrs["telescope_position"].split(" ") ] telcoord = [float(xyz.strip("m")) for xyz in telpos[:-1]] if telpos[-1].lower() == "itrf": observer_location = SkyCoord( x=telcoord[0], y=telcoord[1], z=telcoord[2], unit=units.Unit("m"), frame="itrs", obstime=_ixds["time"].values[0], ) else: raise NotImplementedError( "Unsupported frame accompanying telescope_position attribute of input dataset" ) except ValueError: print("Failed to find valid telescope postition from dataset attributes") print("Defaulting to astropy-data telescope location") observer_location = EarthLocation.of_site( _ixds.attrs["telescope"] ).get_itrs(obstime=Time(_ixds["time"].values[0])) finally: print(f"Adopted reference postion: {observer_location}") # Assigning reference_frequency if reference_frequency is None: try: reference_frequency = units.Quantity(_ixds.attrs["rest_frequency"]) except ValueError: print("Failed to assign valid rest frequency from dataset attributes") print("Attempting to parse and convert") try: malformed_quantity = _ixds.attrs["rest_frequency"].split(" ") if isinstance(malformed_quantity[0], str): value = float(malformed_quantity[0]) # this is where pint-xarray would be useful # note that we'll have to extend this logic to handle a variety of inputs, e.g., km/s if malformed_quantity[1].lower() == "hz": unit = "Hz" reference_frequency = units.Quantity(value, unit) except (AttributeError, ValueError): # AttributeError: seems like the attribute is not a string # ValueError: seems like attribute can't be converted to float print("Failed to convert dataset attributes to valid rest frequency") print("Assuming 100 GHz, this may not be desired") value, unit = 100, "GHz" reference_frequency = units.Quantity(value, unit) finally: print(f"Adopted reference frequency: {reference_frequency}") # Assigning target_location if target_location is None: try: target_location = SkyCoord.from_name( _ixds.attrs["object_name"], frame=_ixds.attrs["spectral__reference"] ) except: # NameResolveError # Here is where we could add another try block to check for velocity attributes in the ixds # Also check for refpix attribute in the input instead of having to calculate it # If not available and we couldn't get pointing center in terms of coordinates... print( f"Direction reference '{_ixds.attrs['pointing_center']}' is not a valid SkyCoord initializer" ) target_location = SkyCoord( _ixds["right_ascension"][_ixds.l.size // 2][_ixds.m.size // 2].values * units.Unit(ixds.attrs["units_map"]["Right Ascension"]), _ixds["declination"][_ixds.l.size // 2][_ixds.m.size // 2].values * units.Unit(_ixds.attrs["units_map"]["Declination"]), frame=_ixds.attrs["spectral__reference"], # if target frame velocity reference is available, it would be easiest to assign it here ) # note that we could include telescope dependent conditionals (e.g., FK5 vs ICRS) # these could likely be based on ixds.attrs["direction_reference"] finally: print(f"Adopted target location: {target_location}") if target_location.is_transformable_to(outframe): pass else: print("Input to outframe argument incompatible with target position") print( "Adding velocity coordinate for per-channel assignment to target_location" ) # if velocity information isn't present in the input # (and it won't be because that logic hasn't been encoded due to available test data) # then calculate a velocity for each channel and assign as dataset coordinate, # for selection inside the mapped function (_change_frame) measured_frequencies = _ixds.chan.values * units.Unit( _ixds.attrs["units_map"]["Frequency"] ) radio_equiv = units.doppler_radio(reference_frequency) measured_velocities = measured_frequencies.to( units.km / units.s, equivalencies=radio_equiv ) _ixds = _ixds.assign_coords(velocity=("chan", measured_velocities.value)) _ixds.attrs["units_map"][ "Velocity" ] = measured_velocities.unit.to_string() # Defining a function to apply the transformation across the chan axis def _change_frame(ixds_block, observer, target, frequency, frame): """ This function will be called using xr.map_blocks reference http://xarray.pydata.org/en/stable/generated/xarray.map_blocks.html and https://xarray-contrib.github.io/xarray-tutorial/scipy-tutorial/06_xarray_and_dask.html#map_blocks """ if ("velocity" in ixds_block.coords) and ( "Velocity" in ixds_block.attrs["units_map"] ): # update target to use velo coordinate value for this block # since this is inside map_blocks, the only velo/chan available is the one we want to use chan_vel = ixds_block.velocity.values * units.Unit( ixds_block.attrs["units_map"]["Velocity"] ) # assignment via `SkyCoord(target, radial_velocity=chan_vel)` doesn't seem to work # see # https://docs.astropy.org/en/stable/coordinates/velocities.html#adding-velocities-to-existing-frame-objects # https://docs.astropy.org/en/stable/coordinates/representations.html#attaching-differential-objects-to-representation-objects target = target.data.with_differentials({'s':RadialDifferential(chan_vel)}) for xda in ixds_block.data_vars: # we don't want to transform boolean data arrays # we also don't want to if there is no chan dimension if ixds_block[xda].dtype != bool and "chan" in ixds_block[xda].dims: # ixds_block[xda].values is the computed ndarray for this block sc = SpectralCoord( ixds_block[xda].values, unit=ixds_block.attrs["units_map"]["Frequency"], observer=observer, target=target, doppler_rest=frequency, doppler_convention=ixds_block.attrs["velocity__type"], ) # see https://docs.astropy.org/en/stable/coordinates/spectralcoord.html#specifying-an-observer-and-a-target-explicitly new_sc = sc.with_observer_stationary_relative_to(frame) ixds_block[xda] = new_sc.values return ixds_block # Apply the function to the data # The parallelism happens inside this function try: output_ixds = xr.map_blocks( obj=_ixds, func=_change_frame, args=[], # no positional args kwargs={ "observer": observer_location, "target": target_location, "frequency": reference_frequency, "frame": outframe, }, template=_ixds, ) except: print("Failed to change frame. Returning original dataset.") output_ixds = _ixds # ouptut_ixds.compute() return output_ixds