Source code for leavitt.utils

#!/usr/bin/env python
#
# DLNPYUTILS.PY - Utility functions.
#

from __future__ import print_function

__authors__ = 'David Nidever <dnidever@noao.edu>'
__version__ = '20180823'  # yyyymmdd

import re
import logging
import os
import sys
import numpy as np
import warnings
from astropy.io import fits
from astropy.table import Table, Column
from astropy import modeling
from astropy.convolution import Gaussian1DKernel, convolve
from glob import glob
from scipy.signal import medfilt
from scipy.ndimage.filters import median_filter,gaussian_filter1d
from scipy.optimize import curve_fit, least_squares
from scipy.special import erf
from scipy.interpolate import interp1d
from scipy.linalg import svd
#from astropy.utils.exceptions import AstropyWarning
#import socket
#from scipy.signal import convolve2d
#from scipy.ndimage.filters import convolve
import astropy.stats
import matplotlib.pyplot as plt


# Ignore these warnings, it's a bug
warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")

# Size, number of elements
[docs] def size(a=None): """Returns the number of elements""" if a is None: return 0 return np.array(a,ndmin=1).size
# Median Absolute Deviation
[docs] def mad(data, axis=None, func=None, ignore_nan=True): """ Calculate the median absolute deviation.""" if type(data) is not np.ndarray: raise ValueError("data must be a numpy array") return 1.4826 * astropy.stats.median_absolute_deviation(data,axis=axis,func=func,ignore_nan=ignore_nan)
[docs] def minmax(a): """ Return a 2-element array of minimum and maximum.""" if type(a) is not np.ndarray: raise ValueError("a must be a numpy array") return np.array([np.min(a),np.max(a)])
[docs] def stat(a=None,silent=False): """ Returns basic statistics on an array.""" if a is None: raise ValueError("a must be input") if type(a) is not np.ndarray: raise ValueError("a must be a numpy array") # This is what stat returns: # info[0]: Number of Elements # info[1]: Minimum # info[2]: Maximum # info[3]: Range # info[4]: Mean # info[5]: Median # info[6]: Standard Deviation # info[7]: Standard Error # info[8]: Root Mean Square (R.M.S.) # info[9]: MAD estimate of St.Dev. info = np.zeros(10,float) info[0] = len(a) info[1] = np.min(a) info[2] = np.max(a) info[3] = info[2]-info[1] info[4] = np.mean(a) if info[0] > 1: info[5] = np.median(a) info[6] = np.std(a,ddof=1) # std. dev. info[7] = info[6]/np.sqrt(info[0]) # std. err. info[8] = np.sqrt(np.sum(a**2)/info[0]) # RMS info[9] = mad(a) # MAD if silent is not True: print('----------------------') print('elements = %d' % info[0]) print('minimum = %f' % info[1]) print('maximum = %f' % info[2]) print('range = %f' % info[3]) print('mean = %f' % info[4]) print('median = %f' % info[5]) print('st. dev. = %f' % info[6]) print('st. err. = %f' % info[7]) print('r.m.s. = %f' % info[8]) print('mad s.d. = %f' % info[9]) print('----------------------') return
[docs] def where(statement,comp=False): """ Wrapper around numpy.where() to be more like IDL""" # If comp=True then the complement will be returned gd, = np.where(statement) ngd = len(gd) if comp: bd, = np.where(~statement) nbd = len(bd) return gd,ngd,bd,nbd else: return gd,ngd
[docs] def strlen(lst=None): """ Calculate the string lengths of a string array.""" if lst is None: raise ValueError("lst must be input") n = size(lst) out = np.zeros(n,int) for i,a in enumerate(np.array(lst,ndmin=1)): out[i] = len(a) if n==1: out=int(out) return out
[docs] def strip(lst=None,chars=None): """ Strip on a scalar or list.""" if lst is None: raise ValueError("lst must be input") if type(lst) is str: return lst.strip(chars) return [o.strip(chars) for o in np.array(lst,ndmin=1)]
[docs] def strjoin(a=None,b=None,c=None,sep=None): """ Join two string lists/arrays or scalars""" if (a is None) | (b is None): raise ValueError("a and b must be input") na = size(a) nb = size(b) nc = size(c) if sep is None: sep='' n = np.max([na,nb,nc]) len1 = strlen(a) t1 = type(a) len2 = strlen(b) t2 = type(b) if nc>0: len3 = strlen(c) t3 = type(c) else: len3 = 0 t3 = t2 nlen = np.max(len1)+np.max(len2)+np.max(len3)+len(sep) out = np.zeros(n,(np.str,nlen)) for i in range(n): if na>1: a1 = a[i] else: a1 = np.array(a,ndmin=1)[0] arr = tuple(a1) if nb>1: b1 = b[i] else: b1 = np.array(b,ndmin=1)[0] arr += tuple(b1) if nc>0: if nc>1: c1 = c[i] else: c1 = np.array(c,ndmin=1)[0] arr += tuple(c1) out[i] = sep.join(arr) if (n==1) & (t1 is str) & (t2 is str) & (t3 is str): return out[0] # scalar if (t1 is list) | (t2 is list) | (t3 is list): return list(out) return out
[docs] def strsplit(lst=None,delim=None,asarray=False): """ Split a string array.""" if (lst is None): raise ValueError("lst must be input") if size(lst)==1: out = lst.split(delim) else: out = [l.split(delim) for l in lst] if asarray is True: nlst = np.array(lst).size nel = [len(o) for o in out] nlen = np.max(strlen(lst)) outarr = np.zeros((nlst,np.max(nel)),(np.str,nlen)) for i in range(nlst): temp = np.array(out[i]) ntemp = len(temp) outarr[i,0:ntemp] = temp return outarr else: return out
[docs] def pathjoin(indir=None,name=None): """ Join two or more pathname components, inserting '/' as needed Same as os.path.join but also works on arrays/lists.""" if indir is None: raise ValueError("must input indir") if name is None: raise ValueError("must input name") nindir = size(indir) nname = size(name) n = np.max([nindir,nname]) len1 = strlen(indir) len2 = strlen(name) nlen = np.max(len1)+np.max(len2)+1 out = np.zeros(n,(np.str,nlen)) for i in range(n): if nindir>1: indir1 = indir[i] else: indir1 = np.array(indir,ndmin=1)[0] if indir1[-1] != '/': indir1+='/' if nname>1: name1 = nname[i] else: name1 = np.array(name,ndmin=1)[0] out[i] = indir1+name1 if (n==1) & (type(indir) is str) & (type(name) is str): return out[0] # scalar if (type(indir) is list) | (type(name) is list): return list(out) return out
[docs] def first_el(lst): """ Return the first element""" if lst is None: return None if size(lst)>1: return lst[0] if (size(lst)==1) & (type(lst) is list): return lst.pop() if (size(lst)==1) & (type(lst) is np.ndarray): return lst.item() return lst
# Standard grep function that works on string list
[docs] def grep(lines=None,expr=None,index=False): """ Similar to the standard unit "grep" but run on a list of strings. Returns a list of the matching lines unless index=True is set, then it returns the indices. Parameters ---------- lines : list The list of string lines to check. expr : str Scalar string expression to search for. index : bool, optional If this is ``True`` then the indices of matching lines will be returned instead of the actual lines. index is ``False`` by default. Returns ------- out : list The list of matching lines or indices. Examples -------- Search for a string and return the matching lines: .. code-block:: python mlines = grep(lines,"hello") Search for a string and return the indices of the matching lines: .. code-block:: python index = grep(lines,"hello",index=True) """ if lines is None: raise ValueError("lines must be input") if expr is None: raise ValueError("expr must be input") out = [] cnt = 0 for l in np.array(lines,ndmin=1): m = re.search(expr,l) if m != None: if index is False: out.append(l) else: out.append(cnt) cnt = cnt+1 return out
# Create an empty file
[docs] def touch(fname): open(fname, 'a').close()
# Read in all lines of files
[docs] def readlines(fil=None,raw=False): """ Read in all lines of a file. Parameters ---------- file : str The name of the file to load. raw : bool, optional, default is false Do not trim \n off the ends of the lines. Returns ------- lines : list The list of lines from the file Examples -------- .. code-block:: python lines = readlines("file.txt") """ if fil is None: raise ValueError("File not input") f = open(fil,'r') lines = f.readlines() f.close() # Strip newline off if raw is False: lines = [l.rstrip('\n') for l in lines] return lines
# Write all lines to file
[docs] def writelines(filename=None,lines=None,overwrite=True,raw=False): """ Write a list of lines to a file. Parameters ---------- filename : str The filename to write the lines to. lines : list The list of lines to write to a file. overwrite : bool, optional, default is True If the output file already exists, then overwrite it. raw : bool, optional, default is False Do not modify the lines. Write out as is. Returns ------- Nothing is returned. The lines are written to `fil`. Examples -------- .. code-block:: python writelines("file.txt",lines) """ # Not enough inputs if lines is None: raise ValueError("No lines input") if filename is None: raise ValueError("No file name input") # Check if the file exists already if os.path.exists(filename): if overwrite is True: os.remove(filename) else: print(filename+" already exists and overwrite=False") return # Modify the input as needed if raw is False: # List, make sure it ends with \n if type(lines) is list: for i,l in enumerate(lines): if l.endswith('\n') is False: lines[i] += '\n' # Make sure final element does not end in \n n = size(lines) if n>1: if lines[-1].endswith('\n'): lines[-1] = lines[-1][0:-1] else: if lines.endswith('\n'): lines = lines[0:-1] # Convert string to list if (type(lines) is str) | (type(lines) is np.str_): lines=list(lines) # Convert numpy array and numbers to list of strings if type(lines) is not list: if hasattr(lines,'__iter__'): lines = [str(l)+'\n' for l in lines] # Make sure final element does not end in \n if lines[-1].endswith('\n'): lines[-1] = lines[-1][0:-1] else: lines = str(lines) # Write the file f = open(filename,'w') f.writelines(lines) f.close()
# Remove indices from a list
[docs] def remove_indices(lst=None,index=None): """ This will remove elements from a list given their indices. Use numpy.delete() for numpy arrays instead. Parameters ---------- lst : list The list from which to remove elements. index : list or array The list or array of indices to remove. Returns ------- newlst : list The new list with indices removed. Examples -------- Remove indices 1 and 5 from array `arr`. .. code-block:: python index = [1,5] arr = range(10) arr2 = remove_indices(arr,index) print(arr) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] """ if lst is None: raise ValueError("list must be input") if index is None: raise ValueError("index must be input") newlst = [] for i in range(len(lst)): if i not in index: newlst.append(lst[i]) if type(lst) is np.ndarray: newlst = np.array(newlst) return newlst
# Little function used by numlines
[docs] def blocks(files, size=65536): """ This is a small utility function used by numlines() """ while True: b = files.read(size) if not b: break yield b
# Read number of lines in a file
[docs] def numlines(fil=None): """ This function quickly counts the number of lines in a file. Parameters ---------- fil : str The filename to check the number of lines. Returns ------- nlines : int The number of lines in `fil`. Examples -------- .. code-block:: python n = numlines("file.txt") """ if fil is None: raise ValueError("file must be input") with open(fil, "r") as f: return (sum(bl.count("\n") for bl in blocks(f)))
# Could also use this #count=0 #for line in open(fil): count += 1 # Set up basic logging to screen
[docs] def basiclogger(name=None): """ This sets up a basic logger that writes just to the screen. """ if name is None: name = "log" logger = logging.getLogger(name) # Only add a handler if none exists # the logger might already have been created if len(logger.handlers)==0: handler = logging.StreamHandler() formatter = logging.Formatter('%(levelname)-2s %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger
# Delete files
[docs] def remove(files=None,allow=True): """ Delete a list of files.""" if files is None: raise ValueError("No files input") nfiles = np.array(files).size for f in np.array(files,ndmin=1): if os.path.exists(f): os.remove(f) else: if allow is False: raise Exception(f+" does not exist")
# Do files exist
[docs] def exists(files=None): """ Check if a list of files exist.""" if files is None: raise ValueError("No files input") nfiles = np.array(files).size out = np.zeros(nfiles,np.bool)+False for i,f in enumerate(np.array(files,ndmin=1)): if os.path.exists(f): out[i] = True return out
[docs] def lt(x,limit): """Takes the lesser of x or limit""" # np.minimum() also does this if np.array(x).size>1: out = [i if (i<limit) else limit for i in x] else: out = x if (x<limit) else limit if type(x) is np.ndarray: return np.array(out) return out
[docs] def gt(x,limit): """Takes the greater of x or limit""" # np.maximum() also does this if np.array(x).size>1: out = [i if (i>limit) else limit for i in x] else: out = x if (x>limit) else limit if type(x) is np.ndarray: return np.array(out) return out
[docs] def limit(x,llimit,ulimit): """Require x to be within upper and lower limits""" return lt(gt(x,llimit),ulimit)
[docs] def valrange(array): if size(array)==1: return 0.0 else: return np.max(array)-np.min(array)
[docs] def signs(inp): """ Return the sign of input. Return +1.0 for 0.0""" s = np.sign(inp) bad,nbad = where(s== 0) if nbad>0: if size(s)>1: s[bad] = 1 else: s = 1.0 return s
[docs] def scale(arr,oldrange,newrange): """ This function maps an array or image onto a new scale given two points on the old scale and the corresponding points on the new scale. The array is converted to double type. It's similar to BYTSCL.PRO except that you can set the bottom value as well. The ranges can be increasing or decreasing. INPUTS: arr The array of values to be scaled oldrange Two-element array specifiying The original range which will be scaled to newrange. newrange Two-element array specifiying The new range which the oldrange will be scaled to. OUTPUTS: narr The new scaled array USAGE: arr2 = scale(arr,[0,1],[150,2000]) By D.Nidever March 2007 """ if len(newrange)!=2: raise ValueError("newrange must be a 2-element array or list") if len(oldrange)!=2: raise ValueError("oldrange must be a 2-element array or list") # Does it flip around signchange = 1.0 if signs(oldrange[1]-oldrange[0]) != signs(newrange[1]-newrange[0]): signchange = -1.0 # Scale narr = valrange(newrange) * signchange*(np.float64(arr)-oldrange[0])/val.range(oldrange) + newrange[0] return narr
[docs] def scale_vector(vector, minrange, maxrange): """ Scale a vector to minrange and maxrange. """ # Make sure we are working with floating point numbers. minRange = np.float64( minrange ) maxRange = np.float64( maxrange ) # Make sure we have a valid range. if (maxRange == minrange): raise ValueError("Range max and min are coincidental") return vector*0+minrange vectormin = np.float64(np.min(vector)) vectormax = np.float64(np.max(vector)) # Calculate the scaling factors. scaleFactor = [((minrange * vectormax)-(maxrange * vectormin)) / (vectormax - vectormin), (maxrange - minrange) / (vectormax - vectormin)] # Return the scaled vector. return vector * scaleFactor[1] + scaleFactor[0]
[docs] def quadratic_bisector(x,y): """ Calculate the axis of symmetric or bisector of parabola""" #https://www.azdhs.gov/documents/preparedness/state-laboratory/lab-licensure-certification/technical-resources/ # calibration-training/12-quadratic-least-squares-regression-calib.pdf #quadratic regression statistical equation n = len(x) if n<3: return None Sxx = np.sum(x**2) - np.sum(x)**2/n Sxy = np.sum(x*y) - np.sum(x)*np.sum(y)/n Sxx2 = np.sum(x**3) - np.sum(x)*np.sum(x**2)/n Sx2y = np.sum(x**2 * y) - np.sum(x**2)*np.sum(y)/n Sx2x2 = np.sum(x**4) - np.sum(x**2)**2/n #a = ( S(x^2*y)*S(xx)-S(xy)*S(xx^2) ) / ( S(xx)*S(x^2x^2) - S(xx^2)^2 ) #b = ( S(xy)*S(x^2x^2) - S(x^2y)*S(xx^2) ) / ( S(xx)*S(x^2x^2) - S(xx^2)^2 ) denom = Sxx*Sx2x2 - Sxx2**2 if denom==0: return np.nan a = ( Sx2y*Sxx - Sxy*Sxx2 ) / denom b = ( Sxy*Sx2x2 - Sx2y*Sxx2 ) / denom if a==0: return np.nan return -b/(2*a)
[docs] def wtmean(x,sigma,error=False,reweight=False,magnitude=False): """ Calculate weighted mean and error""" n = len(x) wt = 1/sigma**2 # Magnitudes if magnitude: fmn = np.sum( 2.5118864**x * wt) / np.sum(wt) xmn = 2.50*np.log10(fmn) else: xmn = np.sum(wt*x)/np.sum(wt) # Reweight the points based on the residuals # using formula similar to the one given by # Stetson (1996) pg.4 if reweight: if magnitude: resid = x-xmn wt2 = wt/(1+np.abs(resid)**2/np.mean(sigma)) fmn2 = np.sum( 2.5118864**x * wt2) / np.sum(wt2) xmn = 2.50*np.log10(fmn2) else: resid = x-xmn wt2 = wt/(1+np.abs(resid)**2/np.mean(sigma)) xmn = np.sum(wt2*x)/np.sum(wt2) # Include uncertainty if error: if magnitude: xerr = np.sqrt(1.0/np.sum(wt)) else: xerr = np.sqrt( np.sum( ((x-xmn)**2)*wt)*n / ((n-1)*np.sum(wt))) / np.sqrt(n) return xmn,xerr else: return xmn
[docs] def wtslope(x,y,sigma,error=False,reweight=False): """ Calculate weighted slope and error""" n = len(x) wt = 1/sigma**2 totwt = np.sum(wt) mnx = np.sum(wt*x)/totwt mny = np.sum(wt*y)/totwt wtx = (np.sum(wt*x*y)/totwt-mnx*mny)/(np.sum(wt*x**2)/totwt-mnx**2) # Reweight the points based on the residuals # using formula similar to the one given by # Stetson (1996) pg.4 if reweight: resid = y-wtx*x resid -= np.mean(resid) wt2 = wt/(1+np.abs(resid)**2/np.mean(sigma)) totwt2 = np.sum(wt2) mnx2 = np.sum(wt2*x)/totwt2 mny2 = np.sum(wt2*y)/totwt2 wtx = (np.sum(wt2*x*y)/totwt2-mnx2*mny2)/(np.sum(wt2*x**2)/totwt2-mnx2**2) if error: wtxerr = 1.0/np.sqrt( np.sum(wt*x**2)-mnx**2 * np.sum(wt)) return wtx, wtxerr else: return wtx
[docs] def robust_slope(x,y,sigma,limits=None,npt=15,reweight=False): """ Calculate robust weighted slope""" # Maybe add sigma outlier rejection in the future n = len(x) if n==2: return wtslope(x,y,sigma,error=True,reweight=reweight) # Calculate weighted pmx/pmxerr wt_slp,wt_slperr = wtslope(x,y,sigma,error=True,reweight=reweight) wt_y, wt_yerr = wtmean(y,sigma,error=True,reweight=reweight) # Unweighted slope uwt_slp = wtslope(x,y,sigma*0+1,reweight=reweight) # Calculate robust loss metric for range of slope values # chisq = Sum( abs(y-(x*slp-mean(x*slp)))/sigma ) if limits is None: limits = np.array([np.min([0.5*wt_slp,0.5*uwt_slp]), np.max([1.5*wt_slp,1.5*uwt_slp])]) slp_step = (np.max(limits)-np.min(limits))/(npt-1) slp_arr = np.arange(npt)*slp_step + np.min(limits) # Vectorize it resid = np.outer(y,np.ones(npt))-np.outer(x,np.ones(npt))*np.outer(np.ones(n),slp_arr) mnresid = np.mean(resid,axis=0) resid -= np.outer(np.ones(n),mnresid) # remove the mean chisq = np.sum( np.abs(resid) / np.outer(sigma,np.ones(npt)) ,axis=0) bestind = np.argmin(chisq) best_slp = slp_arr[bestind] # Get parabola bisector lo = np.maximum(0,bestind-2) hi = np.maximum(bestind+2,n) quad_slp = quadratic_bisector(slp_arr[lo:hi],chisq[lo:hi]) # Problem with parabola bisector, use best point instead if np.isnan(quad_slp) | (np.abs(quad_slp-best_slp)> slp_step): best_slp = best_slp else: best_slp = quad_slp return best_slp, wt_slperr
[docs] def wtmedian(val,wt): """Weighted median can be computed by sorting the set of numbers and finding the smallest numbers which sums to half the weight of total weight.""" # https://en.wikipedia.org/wiki/Weighted_median si = np.argsort(val.flatten()) totwt = np.cumsum(np.abs(wt).flatten()[si]) ind = totwt.searchsorted(totwt.max()*0.5) return val.flatten()[si[ind-1]]
[docs] def gaussian(x, amp, cen, sig, const=0.0, slp=0.0): """1-D gaussian: gaussian(x, amp, cen, sig)""" #return (amp / (np.sqrt(2*np.pi) * sig)) * np.exp(-(x-cen)**2 / (2*sig**2)) + const return amp * np.exp(-(x-cen)**2 / (2*sig**2)) + const + slp*(x-cen)
[docs] def gaussbin(x, amp, cen, sig, const=0, slp=0.0, dx=1.0): """1-D gaussian with pixel binning This function returns a binned Gaussian par = [height, center, sigma] Parameters ---------- x : array The array of X-values. amp : float The Gaussian height/amplitude. cen : float The central position of the Gaussian. sig : float The Gaussian sigma. const : float, optional, default=0.0 A constant offset. slp : float, optional, default=0.0 A linear slope around cen. dx : float, optional, default=1.0 The width of each "pixel" (scalar). Returns ------- geval : array The binned Gaussian in the pixel """ xcen = np.array(x)-cen # relative to the center x1cen = xcen - 0.5*dx # left side of bin x2cen = xcen + 0.5*dx # right side of bin t1cen = x1cen/(np.sqrt(2.0)*sig) # scale to a unitless Gaussian t2cen = x2cen/(np.sqrt(2.0)*sig) # For each value we need to calculate two integrals # one on the left side and one on the right side # Evaluate each point # ERF = 2/sqrt(pi) * Integral(t=0-z) exp(-t^2) dt # negative for negative z geval_lower = erf(t1cen) geval_upper = erf(t2cen) geval = amp*np.sqrt(2.0)*sig * np.sqrt(np.pi)/2.0 * ( geval_upper - geval_lower ) geval += const + slp(x-cen) # add constant offset and slope return geval
[docs] def gaussfit(x,y,initpar,sigma=None, bounds=None, binned=False): """Fit 1-D Gaussian to X/Y data""" #gmodel = Model(gaussian) #result = gmodel.fit(y, x=x, amp=initpar[0], cen=initpar[1], sig=initpar[2], const=initpar[3]) #return result func = gaussian if binned is True: func=gaussbin return curve_fit(func, x, y, p0=initpar, sigma=sigma, bounds=bounds)
[docs] def voigt(x, height, cen, sigma, gamma, const=0.0, slp=0.0): """ Return the Voigt line shape at x with Lorentzian component HWHM gamma and Gaussian sigma. """ maxy = np.real(wofz((1j*gamma)/sigma/np.sqrt(2))) / sigma\ /np.sqrt(2*np.pi) return (height/maxy) * np.real(wofz(((x-cen) + 1j*gamma)/sigma/np.sqrt(2))) / sigma\ /np.sqrt(2*np.pi) + const + slp*(x-cen)
[docs] def voigtfit(x,y,initpar=None,sigma=None,bounds=(-np.inf,np.inf)): """Fit a Voigt profile to data.""" if initpar is None: initpar = [np.max(y),x[np.argmax(y)],1.0,1.0,np.median(y),0.0] func = voigt return curve_fit(func, x, y, p0=initpar, sigma=sigma, bounds=bounds)
[docs] def voigtarea(pars): """ Compute area of Voigt profile""" sig = np.maximum(pars[2],pars[3]) x = np.linspace(-20*sig,20*sig,1000)+pars[1] dx = x[1]-x[0] v = voigt(x,np.abs(pars[0]),pars[1],pars[2],pars[3]) varea = np.sum(v*dx) return varea
[docs] def poly(x,coef,*args): """ Evaluate a polynomial function of a variable.""" # p(x) = p[0] * x**deg + ... + p[deg] y = np.array(x).copy()*0.0 # concatenate coefficients if len(args)>0: coef = np.hstack((coef,np.array(args))) n = len(coef) for i in range(n): y += coef[i]*x**(n-1-i) return y
[docs] def poly_resid(coef,x,y,sigma=1.0): sig = sigma if sigma is None: sig=1.0 return (poly(x,coef)-y)/sig
[docs] def poly_fit(x,y,nord,robust=False,sigma=None,initpar=None,bounds=(-np.inf,np.inf),error=False,max_nfev=None): if initpar is None: initpar = np.zeros(nord+1) # Normal polynomial fitting #if sigma is None: sigma=np.zeros(len(x))+1 #coef, cov = curve_fit(poly, x, y, p0=initpar, sigma=sigma, bounds=bounds) #perr = np.sqrt(np.diag(cov)) #return coef, perr #weights = None #if sigma is not None: weights=1/sigma**2 #if error: # if len(x)>nord+3: # coef, cov = np.polyfit(x,y,nord,w=weights,cov='unscaled') # perr = np.sqrt(np.diag(cov)) # else: # coef = np.polyfit(x,y,nord,w=weights) # perr = coef.copy()*0.0 # # return coef, perr #else: # coef = np.polyfit(x,y,nord,w=weights) # return coef loss = 'linear' if robust: loss='soft_l1' if sigma is None: sigma=np.zeros(len(x))+1 res = least_squares(poly_resid, initpar, loss=loss, f_scale=0.1, args=(x,y,sigma), max_nfev=max_nfev) if res.success is False: import pdb; pdb.set_trace() raise Exception("Problem with least squares polynomial fitting. Status="+str(res.status)) return initpar+np.nan coef = res.x # Calculate the covariance matrix # this is how scipy.optimize.curve_fit computes the covariance matrix # https://github.com/scipy/scipy/blob/2526df72e5d4ca8bad6e2f4b3cbdfbc33e805865/scipy/optimize/minpack.py#L739 if error: _, s, VT = svd(res.jac, full_matrices=False) threshold = np.finfo(float).eps * max(res.jac.shape) * s[0] s = s[s > threshold] VT = VT[:s.size] pcov = np.dot(VT.T / s**2, VT) # Compute errors on the parameters perr = np.sqrt(np.diag(pcov)) return coef, perr else: return coef
# Derivative or slope of an array
[docs] def slope(array): """Derivative or slope of an array: slp = slope(array)""" n = len(array) return array[1:n]-array[0:n-1]
# Gaussian filter
[docs] def gsmooth(data,fwhm,mask=None,boundary='extend',fill=0.0,truncate=4.0,squared=False): # astropy.convolve automatically ignores NaNs # Create kernel xsize = np.ceil(fwhm/2.35*truncate*2) if xsize % 2 == 0: xsize+=1 # must be odd g = Gaussian1DKernel(stddev=fwhm/2.35,x_size=xsize) if squared is False: return convolve(data, g.array, mask=mask, boundary=boundary, fill_value=fill) #return gaussian_filter1d(data,fwhm/2.35,axis=axis,mode=mode,cval=cval,truncate=truncate) else: return convolve(data, g.array**2, mask=mask, boundary=boundary, fill_value=fill, normalize_kernel=False)
#return gaussian_filter1d(data,fwhm/2.35,axis=axis,mode=mode,cval=cval,truncate=truncate)**2 # Rebin data
[docs] def rebin(arr, new_shape): if arr.ndim>2: raise Exception("Maximum 2D arrays") if arr.ndim==0: raise Exception("Must be an array") if arr.ndim==2: shape = (new_shape[0], arr.shape[0] // new_shape[0], new_shape[1], arr.shape[1] // new_shape[1]) return arr.reshape(shape).mean(-1).mean(1) if arr.ndim==1: shape = (np.array(new_shape,ndmin=1)[0], arr.shape[0] // np.array(new_shape,ndmin=1)[0]) return arr.reshape(shape).mean(-1)
[docs] def roi_cut(xcut,ycut,x,y): """ Use cuts in a 2D plane to select points from arrays. Parameters ---------- xcut : numpy array Array of x-values for the cut ycut : numpy array Array of y-values for the cut x : numpy array or list Array of x-values that should be cut y : numpy array or list Array of y-values that should be cut Returns ------- ind : numpy array The indices of values OUTSIDE the cut cutind : The indices of values INSIDE the cut Example ------- .. code-block:: python ind, cutind = roi_cut(xcut,ycut,x,y) """ from matplotlib.path import Path tupVerts = list(zip(xcut,ycut)) points = np.vstack((x,y)).T p = Path(tupVerts) # make a polygon inside = p.contains_points(points) ind, = np.where(~inside) cutind, = np.where(inside) return ind, cutind
[docs] def create_index(arr): """ Create an index of array values like reverse indices. arr[index['index'][index['lo'][2]:index['hi'][2]+1]] """ narr = size(arr) if narr==0: raise ValueError('arr has no elements') si = np.argsort(arr) sarr = np.array(arr)[si] brklo, = np.where(sarr != np.roll(sarr,1)) nbrk = len(brklo) if nbrk>0: brkhi = np.hstack((brklo[1:nbrk]-1,narr-1)) num = brkhi-brklo+1 index = {'index':np.atleast_1d(si),'value':np.atleast_1d(sarr[brklo]), 'num':np.atleast_1d(num),'lo':np.atleast_1d(brklo),'hi':np.atleast_1d(brkhi)} else: index = {'index':np.atleast_1d(si),'value':np.atleast_1d(arr[0]), 'num':np.atleast_1d(narr),'lo':np.atleast_1d(0),'hi':np.atleast_1d(narr-1)} return index
[docs] def match(a,b,epsilon=0): """ Routine to match values in two vectors. CALLING SEQUENCE: match, a, b, suba, subb, [ COUNT =, /SORT, EPSILON = ] INPUTS: a,b - two vectors to match elements, numeric or string data types OUTPUTS: suba - subscripts of elements in vector a with a match in vector b subb - subscripts of the positions of the elements in vector b with matchs in vector a. suba and subb are ordered such that a[suba] equals b[subb] suba and subb are set to !NULL if there are no matches (or set to -1 if prior to IDL Version 8.0) OPTIONAL INPUT KEYWORD: /SORT - By default, MATCH uses two different algorithm: (1) the /REVERSE_INDICES keyword to HISTOGRAM is used for integer data, while (2) a sorting algorithm is used for non-integer data. The histogram algorithm is usually faster, except when the input vectors are sparse and contain very large numbers, possibly causing memory problems. Use the /SORT keyword to always use the sort algorithm. epsilon - if values are within epsilon, they are considered equal. Used only only for non-integer matching. Note that input vectors should be unique to within epsilon to provide one-to-one mapping. Default=0. OPTIONAL KEYWORD OUTPUT: COUNT - set to the number of matches, integer scalar SIDE EFFECTS: The obsolete system variable !ERR is set to the number of matches; however, the use !ERR is deprecated in favor of the COUNT keyword RESTRICTIONS: The vectors a and b should not have duplicate values within them. You can use rem_dup function to remove duplicate values in a vector EXAMPLE: If a = [3,5,7,9,11] & b = [5,6,7,8,9,10] then IDL> match, a, b, suba, subb, COUNT = count will give suba = [1,2,3], subb = [0,2,4], COUNT = 3 and a[suba] = b[subb] = [5,7,9] METHOD: For non-integer data types, the two input vectors are combined and sorted and the consecutive equal elements are identified. For integer data types, the /REVERSE_INDICES keyword to HISTOGRAM of each array is used to identify where the two arrays have elements in common. HISTORY: D. Lindler Mar. 1986. Fixed "indgen" call for very large arrays W. Landsman Sep 1991 Added COUNT keyword W. Landsman Sep. 1992 Fixed case where single element array supplied W. Landsman Aug 95 Use a HISTOGRAM algorithm for integer vector inputs for improved performance W. Landsman March 2000 Work again for strings W. Landsman April 2000 Use size(/type) W. Landsman December 2002 Work for scalar integer input W. Landsman June 2003 Assume since V5.4, use COMPLEMENT to WHERE() W. Landsman Apr 2006 Added epsilon keyword Kim Tolbert March 14, 2008 Fix bug with Histogram method with all negative values W. Landsman/ R. Gutermuth, return !NULL for no matches November 2017 Added epsilon test in na=1||nb=1 section (missed that when added epsilon in 2008) Kim Tolbert July 10, 2018 """ #da = size(a,/type) & db =size(b,/type) #if keyword_set(sort) then hist = 0b else $ # hist = (( da LE 3 ) || (da GE 12)) && ((db LE 3) || (db GE 12 )) na = size(a) # number of elements in a nb = size(b) # number of elements in b # Check for a single element array if (na==1) | (nb==1): if (nb>1): if epsilon==0.0: subb, = np.where(b==a) nw = len(subb) else: subb, = np.where(np.abs(b-a) < epsilon) nw = len(subb) if (nw>0): suba = np.zeros(nw,int) else: suba = np.array([]) else: if epsilon==0.0: suba, = np.where(a==b) nw = len(suba) else: suba, = np.where(np.abs(a-b) < epsilon) nw = len(suba) if (nw>0): subb, = np.zeros(nw,int) else: subb = np.array([]) count = nw return suba,subb # Conver to numpy.chararray if either of them are strings a1 = first_el(a) b1 = first_el(b) if isinstance(a,np.chararray) | isinstance(a1,str) | isinstance(a1,np.string_) | \ isinstance(b,np.chararray) | isinstance(b1,str) | isinstance(b1,np.string_): atemp = np.char.array(a) btemp = np.char.array(b) # Use the dtype with the largest number of characters if atemp.dtype > btemp.dtype: dtype = atemp.dtype else: dtype = btemp.dtype c = np.zeros(na+nb,dtype=dtype) c[0:na] = atemp c[na:] = btemp c = np.char.array(c) # convert to np.chararray, removes trailing strings del atemp, btemp #c = np.hstack((np.char.array(a),np.char.array(b))) # combined list of a and b else: c = np.hstack((np.array(a),np.array(b))) # combined list of a and b ind = np.hstack((np.arange(na),np.arange(nb))) # combined list of indices vec = np.hstack((np.zeros(na,bool),np.zeros(nb,bool)+True)) # flag of which vector in combined #list False - a True - b # sort combined list sub = np.argsort(c) c = c[sub] ind = ind[sub] vec = vec[sub] # find duplicates in sorted combined list n = na + nb #t otal elements in c if epsilon == 0.0: firstdup, = np.where( (c == np.roll(c,-1)) & (vec != np.roll(vec,-1)) ) count = len(firstdup) else: firstdup, = np.where( (np.abs(c - np.roll(c,-1)) < epsilon) & (vec != np.roll(vec,-1)) ) count = len(firstdup) if count==0: # any found? suba = np.array([]) subb = np.array([]) return suba,subb dup = np.zeros( count*2, int ) # both duplicate values even = np.arange( len(firstdup))*2 # Changed to LINDGEN 6-Sep-1991 dup[even] = firstdup dup[even+1] = firstdup+1 ind = ind[dup] # indices of duplicates vec = vec[dup] # vector id of duplicates vone, = np.where(vec) vzero, = np.where(~vec) subb = ind[vone] # b subscripts suba = ind[vzero] # # Integer calculation using histogram. # else: # # minab = min(a, MAX=maxa) > min(b, MAX=maxb) #Only need intersection of ranges # maxab = maxa < maxb # # #If either set is empty, or their ranges don't intersect: # # result = NULL (which is denoted by integer = -1) # !ERR = -1 # if !VERSION.RELEASE GE '8.0' then begin # suba = !NULL # subb = !NULL # endif else begin # suba = -1 # subb = -1 # endelse # COUNT = 0L # if maxab lt minab then return #No overlap # # ha = histogram([a], MIN=minab, MAX=maxab, reverse_indices=reva) # hb = histogram([b], MIN=minab, MAX=maxab, reverse_indices=revb) # # r = where((ha ne 0) and (hb ne 0), count) # # if count gt 0 then begin # suba = reva[reva[r]] # subb = revb[revb[r]] return suba, subb
# Interpolation with extrapolation
[docs] def interp(x,y,xout,kind='cubic',bounds_error=False,assume_sorted=True,extrapolate=True,exporder=2,fill_value=np.nan): yout = interp1d(x,y,kind=kind,bounds_error=bounds_error,fill_value=(fill_value,fill_value),assume_sorted=assume_sorted)(xout) # Need to extrapolate if ((np.min(xout)<np.min(x)) | (np.max(xout)>np.max(x))) & (extrapolate is True): si = np.argsort(x) npix = len(x) nfit = np.min([10,npix]) # At the beginning if (np.min(xout)<np.min(x)): coef1 = poly_fit(x[0:nfit], y[0:nfit], exporder) bd1, nbd1 = where(xout < np.min(x)) yout[bd1] = poly(xout[bd1],coef1) # At the end if (np.max(xout)>np.max(x)): coef2 = poly_fit(x[npix-nfit:], y[npix-nfit:], exporder) bd2, nbd2 = where(xout > np.max(x)) yout[bd2] = poly(xout[bd2],coef2) return yout
[docs] def concatenate(a,b=None): # Concatenate two or more numpy structured arrays # Can input two numpy structured arrays or a list of them if (b is None and type(a) is not list) | (b is not None and type(a) is list): raise Exception('Must input two numpy structured arrays or a list of them') return if type(a) is not list: a=list(a) if b is not None: a.append(b) # Get dtypes for all of the numpy structured arrays ncat = size(a) dtypearr = [] ncols = [] nrows = [] for a1 in a: dtype1 = a1.dtype dtypearr.append(dtype1) ncols.append(len(dtype1.names)) nrows.append(len(a1)) ncols = np.array(ncols) nrows = np.array(nrows) # Ncols not the same if np.min(ncols) != np.max(ncols): raise Exception('Number of columns are not the same: min='+str(np.min(ncols))+' max='+str(np.max(ncols))) # Checking column names colnames = np.zeros((ncat,ncols[0]),dtype=(np.str,100)) for i in range(ncat): colnames[i,:] = a[i].dtype.names if i>0: if list(colnames[0,:]) != list(colnames[i,:]): raise Exception('Column names are not the same') # Make the final dtype # make sure string columns are same length dtype_list = [] for f in a[0].dtype.names: if a[0].dtype[f].char == 'S': isize = [] for d1 in dtypearr: isize.append(d1[f].itemsize) maxsize = np.max(isize) dtype_list.append((f,'S'+str(maxsize))) else: dtype_list.append((f,a[0].dtype[f].str)) dtype = np.dtype(dtype_list) # Create the final structure and load the data nlstr = np.sum(nrows) lstr = np.zeros(nlstr,dtype=dtype) count = 0 for i in range(ncat): a1 = a[i] n = len(a1) lstr[count:count+n] = a[i] count += n return lstr
[docs] def addcatcols(cat,dt): """ Add new columns to an existing numpty structured array catalog.""" ncat = len(cat) odt = cat.dtype # Concatenate the dtypes dtype_list = [] for f in cat.dtype.names: dtype_list.append((f,cat.dtype[f])) # dtype_list.append((f,cat.dtype[f].str)) for f in dt.names: dtype_list.append((f,dt[f])) # dtype_list.append((f,dt[f].str)) newdtype = np.dtype(dtype_list) # Create the final structure and load the data new = np.zeros(ncat,dtype=newdtype) for n in cat.dtype.names: new[n] = cat[n] return new
[docs] def onclick(event): #print('%s click: button=%d, x=%d, y=%d, xdata=%f, ydata=%f' % # ('double' if event.dblclick else 'single', event.button, # event.x, event.y, event.xdata, event.ydata)) #global ix, iy if event.xdata is None: global cid fig.canvas.mpl_disconnect(cid) print('Done. Coordinates are in global "coords" list') return ix, iy = event.xdata, event.ydata print('x = %d, y = %d'%(ix, iy)) global coords try: dum = len(coords) except: coords = [] coords.append((ix, iy)) return
[docs] def clicker(): # im=fits.getdata('F1-00507803_23.fits.fz') # fig = plt.figure() # plt.imshow(im) # clicker() # then the coordinates will be the global "coords" list print('Click outside the plot to end') global cid cid = fig.canvas.mpl_connect('button_press_event', onclick) coords = []
[docs] def add_elements(cat,num=10000): """ Add more elements to a catalog""" ncat = len(cat) old = cat.copy() num = gt(num,ncat) cat = np.zeros(ncat+num,dtype=old.dtype) cat[0:ncat] = old del old return cat
[docs] def ellipsecoords(pars,npoints=100): """ Create coordinates of an ellipse.""" # [x,y,asemi,bsemi,theta] # copied from ellipsecoords.pro xc = pars[0] yc = pars[1] asemi = pars[2] bsemi = pars[3] pos_ang = pars[4] phi = 2*np.pi*(np.arange(npoints)/(npoints-1)) # Divide circle into Npoints ang = np.deg2rad(pos_ang) # Position angle in radians cosang = np.cos(ang) sinang = np.sin(ang) x = asemi*np.cos(phi) # Parameterized equation of ellipse y = bsemi*np.sin(phi) xprime = xc + x*cosang - y*sinang # Rotate to desired position angle yprime = yc + x*sinang + y*cosang return xprime, yprime
[docs] def closest(array,value): array = np.asarray(array) idx = (np.abs(array - value)).argmin() return array[idx],idx
[docs] def splitfilename(filename): """ Split filename into directory, base and extensions.""" fdir = os.path.dirname(filename) base = os.path.basename(filename) exten = ['.fit','.fits','.fit.gz','.fits.gz','.fit.fz','.fits.fz'] for e in exten: if base[-len(e):]==e: base = base[0:-len(e)] ext = e break return (fdir,base,ext)
[docs] def phase_fold(mjd, period, mjd0=None, centeredzero=False): ''' Given a series of dates and a period, it phase folds them into a number of cycles. Parameters ---------- mjd: array-like Array of dates to phase fold in Mean Julian Date (MJD). period: Quantity Period to use for the phase fold, in days. mjd0: float, optional Initial day to use for the phase fold. Default is the earliest date in mjd. centeredzero: boolean, optional Whether the phase goes from 0 to 1 or -0.5 to 0.5. Default is False (0 to 1). Returns ------- phase: array-like Converted mjd to phase values. ''' if mjd0==None: mjd0 = min(mjd) temp = mjd - mjd0 nonfold_phase = temp/period cycle = np.array(nonfold_phase, dtype=int) phase = nonfold_phase - cycle if centeredzero: phase -= 0.5 return phase
[docs] def plot_periodogram(frequency, power, filename='periodogram.png', units='days'): ''' Plots a periodogram based on the power and frequency given. Parameters ---------- frequency: array-like Frequency as 1/period, where the period is measured in days. power: array-like Corresponding power for each frequency. Must be the same shape as frequency. filename: str, optional Name of the file where to store the plot. units: str, optional Units used for the periodogram plot. Default is days. Returns ------- Saves a file to disk. ''' periods = 1/frequency fig = plt.figure() ax = fig.add_subplot() ax.plot(periods, power, lw=1, color='k') ax.set_ylabel('Power') ax.set_xlabel(f'Period ({units})') ax.set_xscale('log') fig.savefig(filename,bbox_inches = 'tight')
[docs] def plot_phased_lightcurve(phase, mags, mags_errs=None, filters=None, filename='timecurve.png'): ''' Plot a phase folded light curve. If multiple filters are present, plots them with different colors. Parameters ---------- phase: array-like Phases for each exposures. mags: array-like Magnitude of object in each exposure. Must have same shape as phase. mags_errs: array-like Associated errors for magnitudes. Must have the same shape as phase and mags. filters: string, array-like, optional Unique filters present in the data. If not given, all will be assumed to be the same. filename: string Path to save the plot. ''' fig = plt.figure() ax = fig.add_subplot() if filters is not None: for fltr in np.unique(filters): sel = filters==fltr ax.scatter(phase[sel], mags[sel], label=fltr, s=3) if mags_errs is not None: ax.errorbar(phase[sel], mags[sel], yerr=mags_errs[sel], fmt='none', capsize=0, elinewidth=1.5, ecolor='gray', alpha=0.7) ax.legend(markerscale=2) else: ax.scatter(phase, mags, label=fltr, s=3) if mags_errs is not None: ax.errorbar(phase, mags, yerr=mags_errs, fmt='none', capsize=0, elinewidth=1.5, ecolor='gray', alpha=0.7) ax.set_xlabel('Phase') ax.set_ylabel('Magnitude') fig.savefig(filename,bbox_inches = 'tight')
[docs] def most_frequent(array, return_counts = False): ''' Returns the most frequent element in an array. Parameters ---------- array: array-like Input array. return_counts: bool, optional Whether to return the number of counts of the most frequent element. Default is false. Returns ------- most_frequent Most frequent element in array. counts: float ''' unique, counts = np.unique(array, return_counts=True) index = np.argmax(counts) if return_counts: return unique[index], counts[index] else: return unique[index]