#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 22 16:05:52 2024
@author: ixakalabadie
"""
from astropy.io import fits
from astropy.coordinates import SkyCoord
import anywidget
import traitlets
import os
from . import misc, writers
from . import np
from . import u
[docs]
class Cube:
"""
Class to store relevant data to create the visualisation.
Parameters
----------
l_cubes : list
List of 3D arrays with the data. Even if it is only one cube it must be a list.
For overlay the arrays must have the same shape.
coords : array
Array with the coordinates of the cube. It must be a sorted 3x2 array with the minimum and maximum
values of the spatial and spectral axes.
l_isolevels : list
List of arrays with the isosurface levels for each cube.
cmaps : list
Matplotlib colormap to use for each cube.
Must have the same length as l_isolevels.
units : len 4 array
Array of strings with the units of the data cube. The first element is the unit of the data.
mags : len 4 array
Array of strings with the magnitudes of the data cube. E.g. ['I', 'RA', 'DEC', 'V'].
rms : float, optional
RMS of the data cube.
name : str, optional
Name of the object. Default is ''.
delta : len 3 array, optional
Array with the pixel size in each axis. Default is None.
resol : float, optional
Step size for the marching cubes algorithm. Higher value means lower resolution. Default is 1.
galaxies : dict, optional
Dictionary with galaxies to include in the model. Keys are names of galaxies and it must
have two more dictionaries inside with the keys 'coords' and 'col'. 'coords' must have the
coordinates of the galaxy (transformed to units of the cube between -1000 and 1000) and 'col' the color of each galaxy as a string. E.g. {'NGC 1234': {'coords': [-560, 790, 134], 'col': '255 0 0'}}.
'coord' is calculated with '[RA - mean(cubeRA)]/abs(deltaRA)*2000/nPixRA'. The whole dictionary
can be obtained with misc.get_galaxies().
image2d : tuple, optional
Tuple with the name of the survey and the data to plot a 2D image (str, (hex, shape)).
The data is the image in hexadecimal format and the shape of the image.
It can be calculated with the function misc.get_imcol.
If ('blank', (None,None)) a blank plane will be created.
If ('', None) no image will be created.
lines : dict, optional
Dictionary with the names of spectral lines as keys and a tuple with the centre and
the full width of the line as value. The centre must be an astropy quantity and the width
can be a quantity or an integer representing the number of pixels.
E.g. {'NII': (6583*u.Angstrom, 50 [pixels])), 'Halpha': (6562*u.Angstrom, 10*u.Angstrom)}.
interface : str, optional
Format of the web interface. 'full' gives a fully interactive web page and 'minimal' just
the 3D model. Default is 'full'.
"""
def __init__(self, l_cubes, coords, l_isolevels, cmaps, units, mags, delta, rms=None, name='',
resol=1, galaxies=None, image2d=(None, None), lines=None, interface='full'):
self.l_cubes = l_cubes
self.name = name
self.coords = coords
self.units = units
self.mags = mags
self.cmaps = cmaps
self.rms = rms
self.l_isolevels = l_isolevels
self.resol = resol
self.iso_split = None
self.galaxies = galaxies
self.image2d = image2d
self.lines = lines
self.interface = interface
self.delta = delta
def __str__(self):
cen = np.mean(self.coords, axis=1)
de = np.array([self.delta[0], self.delta[1], self.delta[2]])
if self.rms is None:
rms = ''
else:
rms = f'{self.rms:.5g}'
s = f'Name: {self.name},\nCenter: [{cen[0]:.5f}{cen[1]:.5f}, {cen[2]:.9g}],\nLimits:\n{self.coords},\n' + \
f'RMS: {rms},\nMagnitudes: {self.mags},\nUnits: {self.units},\n' + \
f'Delta: [{de[0]:.3e}, {de[1]:.3e}, {de[2]:.3g}],\nResolution: {self.resol},\n2D image: {self.image2d[0]}'
return s
[docs]
def prep_one(cube, header=None, lims=None, unit=None, isolevels=None, colormap='CMRmap_r',
image2d=None, im2dcolor='Greys', galaxies=None):
"""
Prepare the Cube class for a cube with a single spectral line.
Parameters
----------
cube : str or np.ndarray
Path to the FITS file or to the data cube.
header : astropy.io.fits.header.Header, optional
Header of the FITS file.
lims : array-like, optional
3x2 array with the minimum and maximum limits of the cube. If None the whole cube is used.
It can contain limits in astropy quantities or in pixels.
E.g. [[ramin, ramax], [decmin, decmax], [vmin, vmax]]. Default is None.
unit : str, optional
Unit to convert the data cube. If None the original unit is used.
If 'rms' data is divided by the RMS of the cube.
If 'percent' data is divided by the maximum value.
isolevels : array-like, optional
Array with the isosurface levels in the given units. If None it is calculated automatically.
colormap : str, optional
Name of the colormap to use. Default is 'CMRmap_r'.
image2d : str or tuple or 2D or 3D array, optional
- If None, no image is created.
- If 'blank', a blank plane is used.
- Name of the survey (str) to query a 2D image. See survey options in https://astroquery.readthedocs.io/en/latest/skyview/skyview.html.
- Tuple with the name of the survey (str) and the number of pixels (int) to set the resolution of the image (default is 1000).
- Tuple can be given with a name for the image (str) and the image data (2D or 3D array).
If 3D array, the image data in RGB format between 0 and 1 (3D). The RGB column must be last.
If 2D, the image will be converted automatically.
The image will cover the full FoV of the created cube model (after applying limits).
im2dcolor : str, optional
Name of the colormap to use for the 2D image. Default is 'Greys'.
galaxies : list or str, optional
List with the names of galaxies to include in the model. If 'query' a NED query is made within the limits of the cube. If None no galaxies are included.
Returns
-------
Cube
An object of the Cube class for a single spectral line model.
"""
if isinstance(cube, str):
with fits.open(cube) as hdul:
for l in range(len(hdul)):
cube = hdul[l].data # this is pol, v, dec, ra
header = hdul[l].header
if cube is not None and len(cube.shape) >= 3:
break
elif header is None:
raise AttributeError('No header provided')
if len(cube.shape) < 3:
raise ValueError('Not enough axes')
if len(cube.shape) >= 4:
print('Warning: Cube with polarization axis. Using first element.')
cube = cube[0]
cube = np.squeeze(cube)
try:
delta = np.array([header['CDELT1'], header['CDELT2'], header['CDELT3']])
except KeyError:
delta = np.array([header['CD1_1'], header['CD2_2'], header['CD3_3']])
cubeunits = np.array(['','','',''], dtype='<U40')
cubemags = np.array(['unknown','unknown','unknown','unknown'], dtype='<U20')
for i in range(4):
if i == 0:
try:
cubeunits[i] = header['BUNIT']
cubemags[i] = header['BTYPE']
except KeyError:
print('Warning: BUNIT or BTYPE not in header')
continue
else:
try:
cubemags[i] = header[f'CTYPE{i}']
cubeunits[i] = header[f'CUNIT{i}']
except KeyError:
print(f'Warning: CUNIT{i} or CTYPE{i} not in header')
if i < 3:
cubeunits[i] = 'deg'
print(f'Warning: Using "deg" as unit for axis {i}')
elif i == 3:
if header[f'CTYPE{i}'] == 'VOPT-F2W':
cubeunits[i] = 'm/s'
print(f'Warning: Using "m/s" as unit for axis {i}')
continue
unitfactor = None
if cubeunits[0] == 'JY/BEAM':
cubeunits[0] = 'Jy/beam'
elif "10**" in cubeunits[0]:
i = cubeunits[0][4:].find('*')
unitfactor = cubeunits[0][:4+i]
cubeunits[0] = cubeunits[0][4+i+1:]
if unitfactor is not None:
cube = cube * eval(unitfactor)
origunit = cubeunits[0]
nz, ny, nx = cube.shape
if '' in cubeunits and (isinstance(lims, list) and isinstance(lims[0][0], u.quantity.Quantity)):
raise KeyError('Limits given with units but units not in file header. Give in pixels instead or update header.')
if lims is None:
lims = np.array([[0,nx], [0,ny], [0,nz]], dtype=int)
if isinstance(lims, list) and isinstance(lims[0][0], u.quantity.Quantity):
for (i,l) in enumerate(lims):
for (j,_) in enumerate(l):
lims[i][j] = lims[i][j].to(u.Unit(cubeunits[i+1]))
lims[i][j] = lims[i][j].to_value()
cubecoords = np.array(lims)
lims = np.array([
[np.round((cubecoords[0][0]-header['CRVAL1'])/delta[0] + header['CRPIX1']),
np.round((cubecoords[0][1]-header['CRVAL1'])/delta[0] + header['CRPIX1'])],
[np.round((cubecoords[1][0]-header['CRVAL2'])/delta[1] + header['CRPIX2']),
np.round((cubecoords[1][1]-header['CRVAL2'])/delta[1] + header['CRPIX2'])],
[np.round((cubecoords[2][0]-header['CRVAL3'])/delta[2] + header['CRPIX3']),
np.round((cubecoords[2][1]-header['CRVAL3'])/delta[2] + header['CRPIX3'])]
], dtype=int)
else:
cubecoords = np.array([
[header['CRVAL1']+delta[0]*(lims[0][0]-header['CRPIX1']),
header['CRVAL1']+delta[0]*(lims[0][1]-header['CRPIX1'])],
[header['CRVAL2']+delta[1]*(lims[1][0]-header['CRPIX2']),
header['CRVAL2']+delta[1]*(lims[1][1]-header['CRPIX2'])],
[header['CRVAL3']+delta[2]*(lims[2][0]-header['CRPIX3']),
header['CRVAL3']+delta[2]*(lims[2][1]-header['CRPIX3'])],
])
for i in range(3):
if lims[i][0] < 0:
raise ValueError('lims out of range')
if lims[i][1] > cube.shape[2-i]:
raise ValueError('lims out of range')
cubecoords = np.sort(cubecoords)
lims = np.sort(lims)
cube = cube[lims[2][0]:lims[2][1],lims[1][0]:lims[1][1],lims[0][0]:lims[0][1]]
cube = misc.transpose(cube, delta)
cube[np.isnan(cube)] = 0
rms = misc.get_rms(cube)
if unit == 'rms':
cube = cube/rms #rms is in units of cubeunits[0]
cubeunits[0] = 'RMS'
elif unit == 'percent':
cube = cube/np.nanmax(cube)*100
cubeunits[0] = '%'
elif unit is None:
pass
elif unit is not cubeunits[0]:
cube = cube*u.Unit(cubeunits[0]).to(unit)
cubeunits[0] = unit
if isolevels is None:
isolevels = misc.calc_isolevels(cube, unit=unit)
else:
if np.min(isolevels) < np.min(cube):
raise ValueError(f'Isolevels out of range. Min is {np.min(cube)}')
if np.max(isolevels) > np.max(cube):
raise ValueError(f'Isolevels out of range. Max is {np.max(cube)}')
imname = ''
if image2d is None:
pass
elif isinstance(image2d, str) and image2d == 'blank':
image2d = None, None
imname = 'blank'
elif isinstance(image2d, str):
imname = image2d
pixels = 1000
co = SkyCoord(ra=np.mean(cubecoords[0])*u.Unit(cubeunits[1]), dec=np.mean(cubecoords[1])*u.Unit(cubeunits[2]))
co = co.to_string('hmsdms')
imcol, img_shape, _ = misc.get_imcol(position=co, survey=image2d, pixels=pixels,
coordinates='J2000', width=np.diff(cubecoords[0])[0]*u.Unit(cubeunits[1]),
height=np.diff(cubecoords[1])[0]*u.Unit(cubeunits[2]), cmap=im2dcolor)
image2d = imcol, img_shape
else:
if isinstance(image2d[1], int):
imname = image2d[0]
pixels = image2d[1]
co = SkyCoord(ra=np.mean(cubecoords[0])*u.Unit(cubeunits[1]), dec=np.mean(cubecoords[1])*u.Unit(cubeunits[2]))
co = co.to_string('hmsdms')
imcol, img_shape, _ = misc.get_imcol(position=co, survey=imname, pixels=pixels,
coordinates='J2000', width=np.diff(cubecoords[0])[0]*u.Unit(cubeunits[1]),
height=np.diff(cubecoords[1])[0]*u.Unit(cubeunits[2]), cmap=im2dcolor)
image2d = imcol, img_shape
else:
imname = image2d[0]
imcol, img_shape, _ = misc.get_imcol(image = image2d[1], cmap=im2dcolor)
image2d = imcol, img_shape
if galaxies is not None:
nx, ny, nz = cube.shape
trans = (2000/nx, 2000/ny, 2000/nz)
galdict = misc.get_galaxies(galaxies, cubecoords, cubeunits, delta, trans)
else:
galdict = None
if 'OBJECT' in header:
obj = header['OBJECT']
else:
obj = ''
return Cube(l_cubes=[cube], name=obj, coords=cubecoords, units=cubeunits,
mags=cubemags, delta=delta, cmaps=[colormap], rms=rms*u.Unit(origunit), image2d=(imname,image2d),
galaxies=galdict, l_isolevels=[isolevels])
[docs]
def prep_mult(cube, spectral_lims, header=None, spatial_lims=None, l_isolevels=None, unit=None,
colormap=None, image2d=None, im2dcolor='Greys', lines=None):
"""
Prepare the Cube class for a cube with multiple spectral lines.
Makes a subcube for each spectral line.
Parameters
----------
cube : str or np.ndarray
Path to the FITS file or the data cube.
If data it must be in the form [spectral axis, DEC, RA].
spectral_lims : list
List of lists with the limits (minimum and maximum) of the spectral axis.
Can be quatities or pixels.
E.g. [[vmin1, vmax1], [vmin2, vmax2]].
header : astropy.io.fits.header.Header, optional
Header of the FITS file.
spatial_lims : list, optional
List of lists with the limits (minimum and maximum) of the spatial axes.
E.g. [[[ramin1, ramax1], [decmin1, decmax1]], [[ramin2, ramax2], [decmin2, decmax2]]].
Each list will correspond to the subcubes defined in spectral_lims in the same order.
If only one set of limits is provided it will be used for all the subcubes.
If None the whole cube is used. Default is None.
l_isolevels : list, optional
List of arrays with the isosurface levels for each cube. If None it is calculated automatically.
unit : str, optional
Unit to convert the data cube. If None the original unit is used.
If 'rms' data is divided by the RMS of the cube.
If 'percent' data is divided by the maximum value.
colormap : str, optional
Name of the colormaps to use. Must be the same length as the number of spectral lines to model.
If None and the number of lines is <7 they are be chosen automatically. Default is None.
image2d : str or 2D or 3D array, optional
Name of the survey to query a 2D image. See survey options in https://astroquery.readthedocs.io/en/latest/skyview/skyview.html.
If 'blank' a blank plane is used. If None no image is created.
If 2D or 3D array, the mage data in RGB format between 0 and 1 (3D). The RGB column must be last.
If 2D, the image will be converted automatically.
The image will cover the full FoV of the created cube model (after applying limits).
im2dcolor : str, optional
Name of the colormap to use for the 2D image. Default is 'Greys'.
lines : array-like, optional
List with the names of each subcube to use as labels in the web page. Must have same length
as spectral_lims. Default is None.
Returns
-------
Cube
An object of the Cube class for a model with multiple spectral lines.
"""
if isinstance(cube, str):
with fits.open(cube) as hdul:
for l in range(len(hdul)):
cube = hdul[l].data # this is pol, v, dec, ra
header = hdul[l].header
if cube is not None and len(cube.shape) >= 3:
break
elif header is None:
raise AttributeError('No header provided')
if len(cube.shape) < 3:
raise ValueError('Not enough axes')
if len(cube.shape) >= 4:
print('Warning: Cube with polarization axis. Using first element.')
cube = cube[0]
cube = np.squeeze(cube)
try:
delta = np.array([header['CDELT1'], header['CDELT2'], header['CDELT3']])
except KeyError:
delta = np.array([header['CD1_1'], header['CD2_2'], header['CD3_3']])
cubeunits = np.array(['','','',''], dtype='<U40')
cubemags = np.array(['unknown','unknown','unknown','unknown'], dtype='<U20')
for i in range(4):
if i == 0:
try:
cubeunits[i] = header['BUNIT']
cubemags[i] = header['BTYPE']
except KeyError:
print('Warning: BUNIT or BTYPE not in header')
continue
else:
try:
cubemags[i] = header[f'CTYPE{i}']
cubeunits[i] = header[f'CUNIT{i}']
except KeyError:
print(f'Warning: CUNIT{i} or CTYPE{i} not in header')
if i < 3:
cubeunits[i] = 'deg'
print(f'Warning: Using "deg" as unit for axis {i}')
elif i == 3:
if header[f'CTYPE{i}'] == 'VOPT-F2W':
cubeunits[i] = 'm/s'
print(f'Warning: Using "m/s" as unit for axis {i}')
continue
unitfactor = None
if cubeunits[0] == 'JY/BEAM':
cubeunits[0] = 'Jy/beam'
elif "10**" in cubeunits[0]:
i = cubeunits[0][4:].find('*')
unitfactor = cubeunits[0][:4+i]
cubeunits[0] = cubeunits[0][4+i+1:]
if unitfactor is not None:
cube = cube * eval(unitfactor)
origunit = cubeunits[0]
if '' in cubeunits and (spatial_lims is not None and isinstance(spatial_lims[0][0][0], u.quantity.Quantity)):
raise KeyError('Spatial limits given with units but units not in file header. Give in pixels instead or update header.')
if '' in cubeunits and isinstance(spectral_lims[0][0], u.quantity.Quantity):
raise KeyError('Spectral limits given with units but units not in file header. Give in pixels instead or update header.')
nz, ny, nx = cube.shape
lims = np.array([[0,nx], [0,ny], [0,nz]], dtype=int)
cubecoords = np.array([
[header['CRVAL1']+delta[0]*(lims[0][0]-header['CRPIX1']),
header['CRVAL1']+delta[0]*(lims[0][1]-header['CRPIX1'])],
[header['CRVAL2']+delta[1]*(lims[1][0]-header['CRPIX2']),
header['CRVAL2']+delta[1]*(lims[1][1]-header['CRPIX2'])],
[header['CRVAL3']+delta[2]*(lims[2][0]-header['CRPIX3']),
header['CRVAL3']+delta[2]*(lims[2][1]-header['CRPIX3'])]
])
cubecoords = np.sort(cubecoords)
lims = [[],[]]
coords = [[],[]]
if spatial_lims is None:
lims[0] = [np.array([[0,nx], [0,ny]], dtype=int)] # lims[0] are spatial axes
coords[0] = [cubecoords[:2]]
if spatial_lims is not None and isinstance(spatial_lims[0][0][0], u.quantity.Quantity):
for i in range(len(spatial_lims)):
for j in range(2):
spatial_lims[i][j][0] = spatial_lims[i][j][0].to(u.Unit(cubeunits[1])).to_value()
spatial_lims[i][j][1] = spatial_lims[i][j][1].to(u.Unit(cubeunits[2])).to_value()
coords[0].append(np.array(spatial_lims[i]))
lims[0].append(np.array([
[np.round((coords[0][i][0][0]-header['CRVAL1'])/delta[0] + header['CRPIX1']),
np.round((coords[0][i][0][1]-header['CRVAL1'])/delta[0] + header['CRPIX1'])],
[np.round((coords[0][i][1][0]-header['CRVAL2'])/delta[1] + header['CRPIX2']),
np.round((coords[0][i][1][1]-header['CRVAL2'])/delta[1] + header['CRPIX2'])]
], dtype=int))
elif spatial_lims is not None:
lims[0] = np.array(spatial_lims)
for i in range(len(spatial_lims)):
coords[0].append(np.array([
[header['CRVAL1']+delta[0]*(spatial_lims[i][0][0]-header['CRPIX1']),
header['CRVAL1']+delta[0]*(spatial_lims[i][0][1]-header['CRPIX1'])],
[header['CRVAL2']+delta[1]*(spatial_lims[i][1][0]-header['CRPIX2']),
header['CRVAL2']+delta[1]*(spatial_lims[i][1][1]-header['CRPIX2'])]
]))
if len(spectral_lims) == 1:
raise AttributeError('No or single limits for spectral axis. Use prep_one instead.')
elif len(spectral_lims) < len(lims[0]) :
raise AttributeError('Not enough spectral limits for the number of spatial limits.')
elif len(lims[0]) > 1 and len(spectral_lims) != len(lims[0]):
raise AttributeError('Different number of spectral and spatial limits.')
if isinstance(spectral_lims[0][0], u.quantity.Quantity):
for i in range(len(spectral_lims)):
spectral_lims[i][0] = spectral_lims[i][0].to(u.Unit(cubeunits[3]))
spectral_lims[i][1] = spectral_lims[i][1].to(u.Unit(cubeunits[3]))
coords[1].append([spectral_lims[i][0].value, spectral_lims[i][1].value])
lims[1].append(np.array(
[np.round((coords[1][i][0]-header['CRVAL3'])/delta[2] + header['CRPIX3']),
np.round((coords[1][i][1]-header['CRVAL3'])/delta[2] + header['CRPIX3'])],
dtype=int
))
else:
for i in range(len(spectral_lims)):
lims[1].append(np.array(spectral_lims[i]))
coords[1].append(np.array([
[header['CRVAL3']+delta[2]*(spectral_lims[i][0]-header['CRPIX3']),
header['CRVAL3']+delta[2]*(spectral_lims[i][1]-header['CRPIX3'])]
]))
coords[0] = np.sort(coords[0])
coords[1] = np.sort(coords[1])
lims[0] = np.sort(lims[0])
lims[1] = np.sort(lims[1])
# take min and max of subcube limits for the whole cube
cubecoords[0] = [np.min(np.array(coords[0])[:,0]), np.max(np.array(coords[0])[:,0])]
cubecoords[1] = [np.min(np.array(coords[0])[:,1]), np.max(np.array(coords[0])[:,1])]
cubecoords[2] = [np.min(np.array(coords[1])), np.max(np.array(coords[1]))]
if np.min(lims[0]) < 0:
raise ValueError('Spatial lims out of range')
for i in range(len(lims[0])):
if lims[0][i][0][1] > cube.shape[2] or lims[0][i][1][1] > cube.shape[1]:
raise ValueError('Spatial lims out of range')
if np.min(lims[1]) < 0:
raise ValueError('Spectral lims out of range')
if np.max(lims[1]) > cube.shape[0]:
raise ValueError('Spectral lims out of range')
shape = np.array([
[np.min(np.array(lims[1])[:,0]), np.max(np.array(lims[1])[:,1])],
[np.min(np.array(lims[0])[:,0]), np.max(np.array(lims[0])[:,1])],
[np.min(np.array(lims[0])[:,1]), np.max(np.array(lims[0])[:,1])]
], dtype=int)
l_cubes = []
for i in range(len(coords[1])):
l_cubes.append(np.zeros(cube.shape))
if len(coords[0]) == 1:
j = 0
else:
j = i
l_cubes[i][lims[1][i][0]:lims[1][i][1], lims[0][j][1][0]:lims[0][j][1][1],
lims[0][j][0][0]:lims[0][j][0][1]] = \
cube[lims[1][i][0]:lims[1][i][1], lims[0][j][1][0]:lims[0][j][1][1],
lims[0][j][0][0]:lims[0][j][0][1]]
l_cubes[i] = l_cubes[i][shape[0][0]:shape[0][1], shape[1][0]:shape[1][1], shape[2][0]:shape[2][1]]
l_cubes[i][np.isnan(l_cubes[i])] = 0
l_cubes[i] = misc.transpose(l_cubes[i], delta)
# calculate rms with negative pixels
rms = misc.get_rms(cube)
del cube
for i in range(len(l_cubes)):
if unit == 'rms':
l_cubes[i] = l_cubes[i] / rms #rms is in units of cubeunits[0]
cubeunits[0] = 'RMS'
elif unit == 'percent':
l_cubes[i] = l_cubes[i] / np.nanmax(l_cubes[i])*100
cubeunits[0] = '%'
elif unit is None:
pass
elif unit is not cubeunits[0]:
l_cubes[i] = l_cubes[i] * u.Unit(cubeunits[0]).to(unit)
cubeunits[0] = unit
if l_isolevels is None:
l_isolevels = []
for c in l_cubes:
l_isolevels.append(misc.calc_isolevels(c))
else:
if len(l_isolevels) != len(l_cubes):
raise AttributeError('Different number of isolevels and cubes')
for i in range(len(l_cubes)):
if np.min(l_isolevels[i]) < np.min(l_cubes[i]):
raise ValueError('isolevels out of range')
if np.max(l_isolevels[i]) > np.max(l_cubes[i]):
raise ValueError('isolevels out of range')
if colormap is None and len(l_cubes) < 7:
colormap = ['Blues', 'Reds', 'Greens', 'Purples', 'Oranges', 'Greys']
elif colormap is None and len(l_cubes) >= 7:
raise AttributeError('Too many l_cubes for default colormap. Must set colormaps manually.')
elif len(colormap) != len(l_cubes):
raise AttributeError('Different number of colormaps and l_cubes')
imname = ''
if image2d is None:
pass
elif isinstance(image2d, str) and image2d == 'blank':
image2d = None, None
imname = 'blank'
elif isinstance(image2d, str):
imname = image2d
pixels = 1000
co = SkyCoord(ra=np.mean(cubecoords[0])*u.Unit(cubeunits[1]), dec=np.mean(cubecoords[1])*u.Unit(cubeunits[2]))
co = co.to_string('hmsdms')
imcol, img_shape, _ = misc.get_imcol(position=co, survey=image2d, pixels=pixels,
coordinates='J2000', width=np.diff(cubecoords[0])[0]*u.Unit(cubeunits[1]),
height=np.diff(cubecoords[1])[0]*u.Unit(cubeunits[2]), cmap=im2dcolor)
image2d = imcol, img_shape
else:
if isinstance(image2d[1], int):
imname = image2d[0]
pixels = image2d[1]
co = SkyCoord(ra=np.mean(cubecoords[0])*u.Unit(cubeunits[1]), dec=np.mean(cubecoords[1])*u.Unit(cubeunits[2]))
co = co.to_string('hmsdms')
imcol, img_shape, _ = misc.get_imcol(position=co, survey=imname, pixels=pixels,
coordinates='J2000', width=np.diff(cubecoords[0])[0]*u.Unit(cubeunits[1]),
height=np.diff(cubecoords[1])[0]*u.Unit(cubeunits[2]), cmap=im2dcolor)
image2d = imcol, img_shape
else:
imname = image2d[0]
imcol, img_shape, _ = misc.get_imcol(image = image2d[1], cmap=im2dcolor)
image2d = imcol, img_shape
if 'OBJECT' in header:
obj = header['OBJECT']
else:
obj = ''
return Cube(l_cubes=l_cubes, name=obj, coords=cubecoords, units=cubeunits,
mags=cubemags, cmaps=colormap, rms=rms*u.Unit(origunit), image2d=(imname,image2d), galaxies=None,
l_isolevels=l_isolevels, lines=lines, delta=delta)
[docs]
def prep_overlay(cube, header=None, spectral_lims=None, lines=None, spatial_lims=None,
l_isolevels=None, unit=None, colormap=None, image2d=None, im2dcolor='Greys'):
"""
Prepare the Cube class for an overlay of spectral lines. Makes a subcube for each spectral line
and overlays the centre of each line in the same point.
Parameters
----------
cube : str or np.ndarray
Path to the FITS file or the data cube.
header : astropy.io.fits.header.Header, optional
Header of the FITS file.
spectral_lims : list, optional
List of lists with the limits (minimum and maximum) of the spectral axis.
E.g. [[vmin1, vmax1], [vmin2, vmax2]].
lines : dict, optional
Dictionary with the names of spectral lines as keys and a tuple with the centre and
the full width of the line as value. The centre must be an astropy quantity and the width
can be a quantity or an integer representing the number of the pixel.
E.g. {'NII': (6583*u.Angstrom, 50 [pixels])), 'Halpha': (6562*u.Angstrom, 10*u.Angstrom)}.
If None the centres will be the centres of the subcubes.
spatial_lims : list, optional
List of lists with the limits (minimum and maximum) of the spatial axes.
E.g. [[[ramin1, ramax1], [decmin1, decmax1]], [[ramin2, ramax2], [decmin2, decmax2]]].
Each list will correspond to the subcubes defined in spectral_lims in the same order.
If only one set of limits is provided it will be used for all the subcubes.
If None the whole cube is used. Default is None.
l_isolevels : list, optional
List of arrays with the isosurface levels for each cube. If None it is calculated automatically.
unit : str, optional
Unit to convert the data cube. If None the original unit is used.
If 'rms' data is divided by the RMS of the cube.
If 'percent' data is divided by the maximum value.
colormap : str, optional
Name of the colormaps to use. Must be the same length as the number of spectral lines to model.
If None and the number of lines is <7 they are be chosen automatically. Default is None.
image2d : str or 2D or 3D array, optional
Name of the survey to query a 2D image. See survey options in https://astroquery.readthedocs.io/en/latest/skyview/skyview.html.
If 'blank' a blank plane is used. If None no image is created.
If 2D or 3D array, the mage data in RGB format between 0 and 1 (3D). The RGB column must be last.
If 2D, the image will be converted automatically.
The image will cover the full FoV of the created cube model (after applying limits).
im2dcolor : str, optional
Name of the colormap to use for the 2D image. Default is 'Greys'.
Returns
-------
Cube
An object of the Cube class for an overlay of spectral lines.
"""
if isinstance(cube, str):
with fits.open(cube) as hdul:
for l in range(len(hdul)):
cube = hdul[l].data # this is pol, v, dec, ra
header = hdul[l].header
if cube is not None and len(cube.shape) >= 3:
break
elif header is None:
raise AttributeError('No header provided')
if len(cube.shape) < 3:
raise ValueError('Not enough axes')
if len(cube.shape) >= 4:
print('Warning: Cube with polarization axis. Using first element.')
cube = cube[0]
cube = np.squeeze(cube)
try:
delta = np.array([header['CDELT1'], header['CDELT2'], header['CDELT3']])
except KeyError:
delta = np.array([header['CD1_1'], header['CD2_2'], header['CD3_3']])
cubeunits = np.array(['','','',''], dtype='<U40')
cubemags = np.array(['unknown','unknown','unknown','unknown'], dtype='<U20')
for i in range(4):
if i == 0:
try:
cubeunits[i] = header['BUNIT']
cubemags[i] = header['BTYPE']
except KeyError:
print('Warning: BUNIT or BTYPE not in header')
continue
else:
try:
cubemags[i] = header[f'CTYPE{i}']
cubeunits[i] = header[f'CUNIT{i}']
except KeyError:
print(f'Warning: CUNIT{i} or CTYPE{i} not in header')
if i < 3:
cubeunits[i] = 'deg'
print(f'Warning: Using "deg" as unit for axis {i}')
elif i == 3:
if header[f'CTYPE{i}'] == 'VOPT-F2W':
cubeunits[i] = 'm/s'
print(f'Warning: Using "m/s" as unit for axis {i}')
continue
unitfactor = None
if cubeunits[0] == 'JY/BEAM':
cubeunits[0] = 'Jy/beam'
elif "10**" in cubeunits[0]:
i = cubeunits[0][4:].find('*')
unitfactor = cubeunits[0][:4+i]
cubeunits[0] = cubeunits[0][4+i+1:]
if unitfactor is not None:
cube = cube * eval(unitfactor)
origunit = cubeunits[0]
if '' in cubeunits[1:2] and (spatial_lims is not None and isinstance(spatial_lims[0][0][0], u.quantity.Quantity)):
raise KeyError('Spatial limits given with units but units not in file header. Give in pixels instead or update header.')
if cubeunits[3] == '' and (spectral_lims is not None and isinstance(spectral_lims[0][0], u.quantity.Quantity)):
raise KeyError('Spectral limits given with units but units not in file header. Give in pixels instead or update header.')
nz, ny, nx = cube.shape
lims = np.array([[0,nx], [0,ny], [0,nz]], dtype=int)
cubecoords = np.array([
[header['CRVAL1']+delta[0]*(lims[0][0]-header['CRPIX1']),
header['CRVAL1']+delta[0]*(lims[0][1]-header['CRPIX1'])],
[header['CRVAL2']+delta[1]*(lims[1][0]-header['CRPIX2']),
header['CRVAL2']+delta[1]*(lims[1][1]-header['CRPIX2'])],
[header['CRVAL3']+delta[2]*(lims[2][0]-header['CRPIX3']),
header['CRVAL3']+delta[2]*(lims[2][1]-header['CRPIX3'])]
])
cubecoords = np.sort(cubecoords)
lims = [[],[]]
coords = [[],[]]
if spatial_lims is None:
lims[0] = [np.array([[0,nx], [0,ny]], dtype=int)] # lims[0] are spatial axes
coords[0] = [cubecoords[:2]]
if spatial_lims is not None and isinstance(spatial_lims[0][0][0], u.quantity.Quantity):
for i in range(len(spatial_lims)):
for j in range(2):
spatial_lims[i][j][0] = spatial_lims[i][j][0].to(u.Unit(cubeunits[1])).to_value()
spatial_lims[i][j][1] = spatial_lims[i][j][1].to(u.Unit(cubeunits[2])).to_value()
coords[0].append(np.array(spatial_lims[i]))
lims[0].append(np.array([
[np.round((coords[0][i][0][0]-header['CRVAL1'])/delta[0] + header['CRPIX1']),
np.round((coords[0][i][0][1]-header['CRVAL1'])/delta[0] + header['CRPIX1'])],
[np.round((coords[0][i][1][0]-header['CRVAL2'])/delta[1] + header['CRPIX2']),
np.round((coords[0][i][1][1]-header['CRVAL2'])/delta[1] + header['CRPIX2'])]
], dtype=int))
elif spatial_lims is not None:
lims[0] = np.array(spatial_lims)
for i in range(len(spatial_lims)):
coords[0].append(np.array([
[header['CRVAL1']+delta[0]*(spatial_lims[i][0][0]-header['CRPIX1']),
header['CRVAL1']+delta[0]*(spatial_lims[i][0][1]-header['CRPIX1'])],
[header['CRVAL2']+delta[1]*(spatial_lims[i][1][0]-header['CRPIX2']),
header['CRVAL2']+delta[1]*(spatial_lims[i][1][1]-header['CRPIX2'])]
]))
if lines is not None:
if len(lines) < 2:
raise AttributeError('Not enough lines for overlay. Use prep_one instead.')
elif spatial_lims is not None:
if len(spatial_lims) != 1 and len(lines) != len(spatial_lims):
raise AttributeError('Different number of lines and spatial limits.')
for i, (center, width) in enumerate(lines.values()):
if isinstance(center, u.quantity.Quantity):
center = center.to(u.Unit(cubeunits[3]))
else:
center = (center - header["CRPIX3"]) * delta[2] + header["CRVAL3"]
center = center * u.Unit(cubeunits[3])
if isinstance(width, u.quantity.Quantity):
width = width.to(u.Unit(cubeunits[3]))
coords[1].append(np.array([center.value-width.value/2, center.value+width.value/2]))
lims[1].append(np.array(
[np.round((coords[1][i][0]-header['CRVAL3'])/delta[2] + header['CRPIX3']),
np.round((coords[1][i][1]-header['CRVAL3'])/delta[2] + header['CRPIX3'])],
dtype=int
))
else:
cent_pix = np.round((center.value-header['CRVAL3'])/delta[2] + header['CRPIX3'])
lims[1].append(np.array([cent_pix-width/2, cent_pix+width/2]))
coords[1].append(np.array(
[header['CRVAL3']+delta[2]*(lims[1][i][0]-header['CRPIX3']),
header['CRVAL3']+delta[2]*(lims[1][i][1]-header['CRPIX3'])]
))
else:
lines=True
if spectral_lims is not None:
if len(spectral_lims) < 2:
raise AttributeError('Not enough spectral limits for overlay. Use prep_one instead.')
elif len(spatial_lims) != 1 and len(spectral_lims) != len(spatial_lims):
raise AttributeError('Different number of spectral and spatial limits.')
if isinstance(spectral_lims[0][0], u.quantity.Quantity):
for i in range(len(spectral_lims)):
spectral_lims[i][0] = spectral_lims[i][0].to(u.Unit(cubeunits[3]))
spectral_lims[i][1] = spectral_lims[i][1].to(u.Unit(cubeunits[3]))
coords[1].append([spectral_lims[i][0].value, spectral_lims[i][1].value])
lims[1].append(np.array(
[np.round((coords[1][i][0]-header['CRVAL3'])/delta[2] + header['CRPIX3']),
np.round((coords[1][i][1]-header['CRVAL3'])/delta[2] + header['CRPIX3'])],
dtype=int
))
else:
for i in range(len(spectral_lims)):
lims[1].append(np.array(spectral_lims[i], dtype=int))
coords[1].append(np.array([
[header['CRVAL3']+delta[2]*(spectral_lims[i][0]-header['CRPIX3']),
header['CRVAL3']+delta[2]*(spectral_lims[i][1]-header['CRPIX3'])]
]))
coords[0] = np.sort(coords[0])
coords[1] = np.sort(coords[1])
lims[0] = np.array(np.sort(lims[0]), dtype=int)
lims[1] = np.array(np.sort(lims[1]), dtype=int)
if np.min(lims[0]) < 0:
raise ValueError('Spatial lims out of range')
for i in range(len(lims[0])):
if lims[0][i][0][1] > cube.shape[2] or lims[0][i][1][1] > cube.shape[1]:
raise ValueError('Spatial lims out of range')
if np.min(lims[1]) < 0:
raise ValueError('Spectral lims out of range')
if np.max(lims[1]) > cube.shape[0]:
raise ValueError('Spectral lims out of range')
shape = np.array([
np.max(np.diff(lims[1]),axis=None),
np.squeeze(np.max(np.diff(lims[0]),axis=0)[1]),
np.squeeze(np.max(np.diff(lims[0]),axis=0)[0])
], dtype=int)
l_cubes = []
for i in range(len(lims[1])):
l_cubes.append(np.zeros(shape))
if len(coords[0]) == 1:
j = 0
else:
j = i
l_cubes[i] = misc.insert_3darray(l_cubes[i],
cube[lims[1][i][0]:lims[1][i][1],lims[0][j][1][0]:lims[0][j][1][1],
lims[0][j][0][0]:lims[0][j][0][1]])
l_cubes[i][np.isnan(l_cubes[i])] = 0
l_cubes[i] = misc.transpose(l_cubes[i], delta)
# calculate rms with negative pixels
rms = misc.get_rms(cube)
del cube
for i in range(len(l_cubes)):
if unit == 'rms':
l_cubes[i] = l_cubes[i] / rms #rms is in units of cubeunits[0]
cubeunits[0] = 'RMS'
elif unit == 'percent':
l_cubes[i] = l_cubes[i] / np.nanmax(l_cubes[i])*100
cubeunits[0] = '%'
elif unit is None:
pass
elif unit is not cubeunits[0]:
l_cubes[i] = l_cubes[i] * u.Unit(cubeunits[0]).to(unit)
cubeunits[0] = unit
if l_isolevels is None:
l_isolevels = []
for c in l_cubes:
l_isolevels.append(misc.calc_isolevels(c))
else:
if len(l_isolevels) != len(l_cubes):
raise AttributeError('Different number of isolevels and cubes')
for i in range(len(l_cubes)):
if np.min(l_isolevels[i]) < np.min(l_cubes[i]):
raise ValueError('isolevels out of range')
if np.max(l_isolevels[i]) > np.max(l_cubes[i]):
raise ValueError('isolevels out of range')
if colormap is None and len(l_cubes) < 7:
colormap = ['Blues', 'Reds', 'Greens', 'Purples', 'Oranges', 'Greys']
elif colormap is None and len(l_cubes) >= 7:
raise AttributeError('Too many l_cubes for default colormap. Set colormaps manually.')
elif len(colormap) != len(l_cubes):
raise AttributeError('Different number of colormaps and l_cubes')
imname = ''
if image2d is None:
pass
elif isinstance(image2d, str) and image2d == 'blank':
image2d = None, None
imname = 'blank'
elif isinstance(image2d, str):
imname = image2d
pixels = 1000
co = SkyCoord(ra=np.mean(cubecoords[0])*u.Unit(cubeunits[1]), dec=np.mean(cubecoords[1])*u.Unit(cubeunits[2]))
co = co.to_string('hmsdms')
imcol, img_shape, _ = misc.get_imcol(position=co, survey=image2d, pixels=pixels,
coordinates='J2000', width=np.diff(cubecoords[0])[0]*u.Unit(cubeunits[1]),
height=np.diff(cubecoords[1])[0]*u.Unit(cubeunits[2]), cmap=im2dcolor)
image2d = imcol, img_shape
else:
if isinstance(image2d[1], int):
imname = image2d[0]
pixels = image2d[1]
co = SkyCoord(ra=np.mean(cubecoords[0])*u.Unit(cubeunits[1]), dec=np.mean(cubecoords[1])*u.Unit(cubeunits[2]))
co = co.to_string('hmsdms')
imcol, img_shape, _ = misc.get_imcol(position=co, survey=imname, pixels=pixels,
coordinates='J2000', width=np.diff(cubecoords[0])[0]*u.Unit(cubeunits[1]),
height=np.diff(cubecoords[1])[0]*u.Unit(cubeunits[2]), cmap=im2dcolor)
image2d = imcol, img_shape
else:
imname = image2d[0]
imcol, img_shape, _ = misc.get_imcol(image = image2d[1], cmap=im2dcolor)
image2d = imcol, img_shape
if 'OBJECT' in header:
obj = header['OBJECT']
else:
obj = ''
return Cube(l_cubes=l_cubes, name=obj, coords=cubecoords, units=cubeunits,
mags=cubemags, cmaps=colormap, rms=rms*u.Unit(origunit), image2d=(imname,image2d), galaxies=None,
l_isolevels=l_isolevels, lines=lines, delta=delta)
[docs]
def createX3D(cube, filename, shifts=None):
"""
Create X3D file from a Cube object.
Parameters
----------
cube : Cube
Object of the Cube class.
filename : str
Name of the X3D file including the extension (.x3d).
shift : list, optional
A list with a arrays of 3D vectors giving the shift in RA, DEC and spectral axis in
the same units given to the cube. Similar to l_cube or l_isolevels.
"""
file = writers.WriteX3D(filename, cube)
file.make_layers(shifts=shifts)
file.make_outline()
if cube.galaxies is not None:
file.make_galaxies()
if cube.image2d[1] is not None:
file.make_image2d()
file.make_ticklines()
file.make_animation()
file.make_labels()
file.close()
[docs]
def createHTML(cube, filename, description=None, pagetitle=None):
"""
Create HTML file from a Cube object.
Parameters
----------
cube : Cube
Object of the Cube class.
filename : str
Name of the HTML file including the extension (.html).
description : str, optional
A description for the web page.
pagetitle : str, optional
The title of the web page.
"""
file = writers.WriteHTML(filename, cube, description, pagetitle)
if cube.interface == 'minimal':
file.start_x3d()
file.viewpoints()
file.close_x3d(filename.split("/")[-1])
file.close_html()
else:
file.func_layers()
if cube.galaxies is not None:
file.func_galaxies()
file.func_grids()
file.func_axes()
# file.func_pick
file.func_animation()
file.start_x3d()
file.viewpoints()
file.close_x3d(filename.split("/")[-1])
file.buttons(centrot=False)
# mandatory after buttons
if cube.galaxies is not None:
file.func_galsize()
if cube.image2d[1] is not None:
file.func_image2d()
file.func_move2dimage()
file.func_scalev()
file.func_markers()
# html.func_setCenterOfRotation(centers)
file.func_background()
file.func_colormaps()
file.close_html()
print('If the path leads to a local server, the visualisation can be accessed in a ' +\
f'web browser with "localhost/path/from/DocumentRoot/{filename.split("/")[-1]}"')
[docs]
def createVis(cube, filename=None, description=None, pagetitle=None, shifts=None, notebook=False):
"""
Create an HTML file with a 3D visualisation of a datacube (without an X3D file).
Parameters
----------
cube : Cube
Object of the Cube class.
filename : str, optional
Name of the HTML file including the extension (.html). Optional is notebook=True.
description : str, optional
A description for the web page.
pagetitle : str, optional
The title of the web page.
notebook : bool
Display the visualisation in the jupyter notebook. If True, the file will not be saved.
Default is False.
"""
if filename == None:
filename = 'tmp'
file = writers.WriteVis(filename, cube, description, pagetitle)
file.start_x3d()
file.make_layers(shifts=shifts)
file.make_outline()
if cube.galaxies is not None:
file.make_galaxies()
if cube.image2d[1] is not None:
file.make_image2d()
file.make_ticklines()
file.make_animation()
file.make_labels()
file.close_x3d()
if cube.interface != 'minimal':
file.func_layers()
if cube.galaxies is not None:
file.func_galaxies()
file.func_grids()
file.func_axes()
# file.func_pick
file.func_animation()
file.buttons(centrot=False)
# mandatory after buttons
if cube.galaxies is not None:
file.func_galsize()
if cube.image2d[1] is not None:
file.func_image2d()
file.func_move2dimage()
file.func_scalev()
file.func_markers()
# html.func_setCenterOfRotation(centers)
file.func_background()
file.func_colormaps()
file.close_html()
if notebook:
displayVis(filename+'.html')
os.remove(filename+'.html')
try:
os.remove(filename+'.x3d')
except:
pass
[docs]
def displayVis(filename):
"""
Disply a ViSL3D visualisation in a jupyter notebook.
Parameters
----------
filename : str
Name of the file to visualise, including the '.html' extension.
"""
# print('WARNING: the first time might fail, run again if not working after a while. Take into account large visualisations will load slowly.')
with open(filename, "r") as file:
html = file.read()
# modify features for better display in notebooks
html = html.replace("width:100vw", "width:100%")
html = html.replace("height:100vh", "height:85%")
html = html.replace("width:95vw", "width:100%")
html = html.replace("height:75vh", "height:85%")
nav = html.find('<!--A table'), html.find('Left.<p>')
html = html.replace(html[nav[0]:nav[1]+len('Left.<p>')], '')
html = html.replace('<center>', '')
html = html.replace('</center>', '')
html = html.replace('if (showalertopa) {\nalert("The opacity feature does not work alongside the 2D image. Hide the 2D image to change the opacity.")\nshowalertopa = false;\n}', '')
widget = X3DomWidget(html=html)
display(widget)