'''Scan.
The Scan class represents an image from a PAD detector. This file contains all
code relating to Scans.
'''
import warnings
import logging
logger = logging.getLogger('axeap')
from PIL import Image
from scipy.ndimage.filters import gaussian_filter
import numpy as np
from pathlib import Path
import pandas as pd
import re
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from .item import DataItem, DataItemSet, Saveable, Loadable
from .signal import Signal
from .conventions import X, Y, PathType
from ..utils import get_trailing_number
[docs]class Scan(DataItem, Saveable, Loadable):
"""
Class representing a scan image taken with a XES detector.
"""
TYPE_NAME = "Scan"
LOAD_FILE_EXTENTIONS = ['tiff','tif','scan']
SAVE_PATH_TYPE = PathType.FILE
SAVE_FILE_EXTENTIONS = ['tif', 'png', 'scan']
SAVE_PATH_TYPE = PathType.FILE
name = "Scan"
[docs] def __init__(self, img, imgspec=None, name=None, meta=None, cache=True):
"""
Parameters
----------
img : :obj:`numpy.ndarray`
2D array of intensity values
name : :obj:`str`
Name for the scan.
cache : :obj:`bool`, :obj:`dict`
Pass :obj:`False` to turn off cacheing of images. An
already-prepared cache of images in the form a dictionary of image
specs to images can also be passed. Do not use unless you know what
you're doing.
"""
DataItem.__init__(self)
self.img = img
self.name = name
self.default_imgspec = imgspec if imgspec is not None else IS.NOCHANGE
if isinstance(cache, dict):
self._imgcache = cache
elif cache is False:
self._imgcache = None
elif cache is True:
self._imgcache = {}
if meta is not None:
self.meta |= meta
def __repr__(self):
"""
Returns
-------
:obj:`str`
Representation of scan consists of name and uuid.
"""
return f"Scan(name:{self.name},uuid:{self.uuid})"
@property
def dims(self):
"""
Dimensions of scan image.
Returns
-------
:obj:`tuple`
Dimensions of scan image in form (x,y).
"""
return self.getImg().shape
[docs] def getImgFromSpecs(self, imgspec):
"""Get image after applying imagespec.
Parameters
----------
imgspec : :obj:`ImageSpec`)
Specifications for image.
Returns
-------
:obj:`numpy.ndarray`
2D array representing image.
"""
if self._imgcache is not None and imgspec in self._imgcache:
return self._imgcache[imgspec]
if imgspec == IS.NOCHANGE:
return self.img
cuts = imgspec['cuts']
scale = imgspec['scale']
blur = imgspec['blur']
roi = imgspec['roi']
if cuts != -1 or blur != -1 or scale != -1:
img = self.img.copy()
else:
img = self.img
if cuts != -1:
img[np.logical_or(img<cuts[0], img>cuts[1])] = 0
if blur!=-1: img = gaussian_filter(img, sigma=blur)
if scale != -1:
img *= scale
if roi!=-1: img = roi._selectArea(img)
img.flags.writeable = False
if self._imgcache is not None: self._imgcache[imgspec] = img
return img
def _getISFromArgs(self, *args, **kwargs):
imgspec = None
if len(args) == 0:
if len(kwargs) == 0:
imgspec = self.default_imgspec # Called without args so pass default
else:
imgspec = IS(self.default_imgspec, **kwargs) # No ImageSpec object passed so create it
elif len(args)==1: # ImageSpec object passed, use it
if isinstance(args[0],IS):
if len(kwargs) != 0:
warnings.warn('If ImageSpec object is passed, keyword args are ignored.')
imgspec = args[0]
elif isinstance(args[0],dict): imgspec = IS(**args[0])
else:
raise ValueError('Only an ImageSpec object can be passed as a positional argument.')
else:
raise ValueError('Only one ImageSpec object can be passed as a positional argument.')
return imgspec
[docs] def getImg(self, *args, **kwargs):
"""
Get 2D array of intensity values associated with scan, optionally
pre-processed.
Parameters
----------
cuts : :obj:`tuple`, optional
Pair of values (l,h) where any pixel values with intensity <l or
>h are set to 0.
scale : :obj:`float`, optional
Number to scale all intensity values in image by.
blur : :obj:`int`, optional
Kernel size of gaussian blur to apply to image.
roi : :obj:`.core.ROI`, optional
Region of interest to which to restrict image. The rest of the image
will be masked off.
Returns
-------
:obj:`numpy.ndarray`
2D array of intensity values representing image.
"""
imgspec = self._getISFromArgs(*args, **kwargs)
return self.getImgFromSpecs(imgspec)
[docs] def mod(self, *args, **kwargs):
"""
Return scan with a different default imgspec.
Parameters
----------
cuts : :obj:`tuple`, optional
Pair of values (l,h) where any pixel values with intensity <l or
>h are set to 0.
scale : :obj:`float`, optional
Number to scale all intensity values in image by.
blur : :obj:`int`, optional
Kernel size of gaussian blur to apply to image.
roi : :obj:`.core.ROI`, optional
Region of interest to which to restrict image. The rest of the image
will be masked off.
Returns
-------
:obj:`Scan`
Scan object with given default imgspec.
"""
imgspec = self._getISFromArgs(*args, **kwargs)
return Scan(self.img, name=self.name, imgspec=imgspec,
meta=self.meta, cache=self._imgcache)
[docs] def loadImageFromFile(fpath):
"""Load an image from a file.
Parameters
----------
fpath : :obj:`pathlib.Path`, :obj:`str`
Path of image file (either TIF or NPY).
Returns:
`numpy.ndarray`: 2D image data array.
"""
fpath = Path(fpath) # Convert to Path in case passed as str
img = None
if fpath.suffix.lower() == '.tif' or fpath.suffix.lower() == '.tiff':
with open(fpath) as f:
img = np.array(Image.open(fpath), dtype=np.float32)
#img = np.array(Image.open(fpath))
img = np.swapaxes(img, 0, 1)
return img
elif fpath.suffix.lower() == '.npy': # Only for loading .npy files exported by Scan
return np.load(fpath)
else:
raise ValueError(f'Cannot load Scan from "{fpath.suffix}" file. Only TIF and NPY supported.')
[docs] def loadFromPath(fpath):
"""
Load scan data from file.
Parameters
----------
fpath : :obj:`pathlib.Path`, :obj:`str`
Path to file containing image data.
Returns:
:obj:`Scan`
Scan object containing data from file.
"""
fpath = Path(fpath) # Convert to Path object in case path is string
s = Scan(Scan.loadImageFromFile(fpath),name=fpath.name)
s.meta['fpath'] = fpath
return s
[docs] def saveToPath(self, fpath):
"""
Save scan to file.
Parameters
----------
fpath : :obj:`pathlib.Path`, :obj:`str`
Path of file to which to save scan.
"""
fpath = Path(fpath) # Convert to Path in case passed as str
if fpath.suffix.lower() == '.tif' or fpath.suffix.lower() == '.tiff':
Image.fromarray(self._img).save(fpath)
elif fpath.suffix.lower() == '.npy':
return np.save(fpath, self._img, allow_pickle=False)
else:
raise ValueError(f'Cannot save Scan to "{fpath.suffix.upper()}" file. Only TIF and NPY supported.')
[docs] def count(self, roi=None):
"""Count total intensity within ROI.
Parameters
----------
roi : :obj:`.core.roi.ROI`, optional
Region of Interest to restrict count to. If not given, region is
considered to be entire scan.
Returns
:obj:`int`
Total intensity summed over region.
"""
if roi:
img = self.getImg(roi=roi)
else:
img = self.getImg()
return np.sum(img)
[docs]class ImageSpec(dict):
"""
Class representing specifications applied to scan image.
"""
NOCUTS = -1 # (0,100000)
NOSCALE = -1
NOBLUR = -1
NOROI = -1
[docs] def __init__(self,
basespec=None,
cuts=None,
scale=None,
blur=None,
roi=None):
"""
Parameters
----------
basespec : :obj:`ImageSpec`
:obj:`ImageSpec` to use as default values.
cuts : :obj:`tuple`, optional
Pair of values (l,h) where any pixel values with intensity <l or
>h are set to 0.
scale : :obj:`float`, optional
Number to scale all intensity values in image by.
blur : :obj:`int`, optional
Kernel size of gaussian blur to apply to image.
roi : :obj:`.core.ROI`, optional
Region of interest to which to restrict image. The rest of the image
will be masked off.
"""
default = basespec if basespec is not None else ImageSpec.NOCHANGE
self['cuts'] = cuts if cuts is not None else default['cuts']
self['scale'] = scale if scale is not None else default['scale']
self['blur'] = blur if blur is not None else default['blur']
self['roi'] = roi if roi is not None else default['roi']
def __hash__(self):
return hash(tuple(sorted(self.items())))
def __eq__(self, other):
if not isinstance(other, ImageSpec) and isinstance(other, dict):
other = ImageSpec(**other)
return hash(self)==hash(other)
[docs] @classmethod
@property
def NOCHANGE(cls):
"""An ImageSpec that signifies no change to be made to an image."""
return ImageSpec({}, cuts=ImageSpec.NOCUTS, blur=ImageSpec.NOBLUR,
scale=ImageSpec.NOSCALE, roi=ImageSpec.NOROI)
IS = ImageSpec
"""Alias for ImageSpec."""
[docs]class ScanSet(DataItemSet, Loadable):
""":obj:`.core.item.DataItemSet` for :obj:`Scan` objects."""
TYPE_NAME = "Scan Set"
LOAD_PATH_TYPE = PathType.DIR
DEFAULT_IMGDIMS = (100,100)
def __init__(self, scans=None, name=None, imgdims=None, selection_default=True):
self.name = name if name is not None else "scanset"
self.imgdims = imgdims
DataItemSet.__init__(self, items=scans, selection_default=selection_default)
@property
def dims(self):
return self.imgdims
[docs] def loadFromPath(dpath, calibinfopath=None):
dpath = Path(dpath)
fpaths = [fpath for fpath in dpath.glob('*')
if fpath.suffix.lstrip('.').lower() in Scan.LOAD_FILE_EXTENTIONS]
fpaths = sorted(fpaths, key=lambda x: int(get_trailing_number(x.stem,default=0)))
scans = [Scan.loadFromPath(fpath) for fpath in fpaths]
ss = ScanSet(scans, name=dpath.stem)
if calibinfopath is not None:
ss.addCalibRunInfo(CalibRunInfo(calibinfopath))
else:
allfiles = dpath.glob("*")
cri = None
for p in allfiles:
ext = p.suffix.lstrip('.')
if ext.isnumeric():
try:
cri = CalibRunInfo(p)
except:
...
if cri is not None:
ss.addCalibRunInfo(cri)
return ss
[docs] def add(self, scan):
"""Add Scan to set.
See :obj:`.core.item.DataItemSet.add`.
"""
if self.imgdims:
if scan.dims != self.imgdims:
raise ValueError(f'All scans in set must have dimensions {imgdims}. Scan has dimensions {s.dims}.')
else:
self.imgdims = scan.dims
DataItemSet.add(self, scan)
[docs] def composite(self, imgkwargs=None):
"""Create scan by adding images from all selected scans in set.
Parameters
----------
imgkwargs : :obj:`ImageSpec`
Specifications to apply to image extracted from each
:obj:`Scan`.
Returns:
:obj:`Scan`
Scan object representing combined image.
"""
selectedscans = self.getSelectedScans()
if len(selectedscans)==0:
logger.debug("No scans selected.")
dims = self.imgdims if self.imgdims is not None else self.DEFAULT_IMGDIMS
return Scan(np.zeros(dims))
else:
if not imgkwargs: imgkwargs = {}
selectedscans = iter(selectedscans)
i = next(selectedscans).getImg(**imgkwargs).copy()
for s in selectedscans:
i += s.getImg(**imgkwargs)
return Scan(i)
[docs] def addCalibRunInfo(self, runinfo):
"""Populate metadata of scans with calibration run information (incident
energy, intensity, etc).
Parameters
----------
runinfo : :obj:`CalibRunInfo`
Calibration run information.
"""
if len(self) != len(runinfo.energies):
raise ValueError("Number of scans in calibration run information" \
f" file ({len(runinfo.energies)})does not match number of scans in" \
f" scan set ({len(self)}).")
else:
for s, e, in zip(self, runinfo.energies):
s.meta['IncidentEnergy'] = e
[docs]class CalibRunInfo:
"""
Class that represents the settings used during the collection of a set of
calibration scans.
Attributes
----------
energies : :obj:`list`
List of energies scans were taken at.
"""
[docs] def __init__(self, fpath):
"""
Parameters
----------
fpath : :obj:`pathlib.Path`, :obj:`str`
Path of file containing information.
"""
self._table = CalibRunInfo._readDataTable(fpath)
self._table.rename(
columns={'Mono Energy (alt) *':'Energy'},
inplace = True)
@property
def energies(self):
"""Get list of energies used to take scans"""
return self._table['Energy'].values
[docs] def getEnergy(self, i):
"""
Get energy used to take scan by scan number.
Parameters
----------
i : :obj:`int`
Index of scan (0 for first scan, 1 for second, etc).
Returns:
:obj:`float`
Energy of scan
"""
return self.energies[i]
[docs] def getIncidentIntensity(self, i):
"""
Get incident intensity that a scan was taken with.
Parameters
----------
i : :obj:`int`, :obj:`float`
Scan number or scan energy.
Returns
-------
:obj:`float`
Incident x-ray intensity scan was taken with.
"""
if i in self.energies:
return self._table.loc[self._table['Energy']==i, 'I0'].values[0]
else:
return self._table['I0'][i]
def _readDataTable(fpath):
"""
Function that reads settings file from run and loads data on each scan into
a Pandas table with appropriate column headers.
Parameters
----------
fpath : :obj:`pathlib.Path`, :obj:`str`
Path of file.
Returns
-------
`pandas.DataFrame`
Table containing data from file.
"""
fpath = Path(fpath) # In case string was passed
lines = fpath.read_text().splitlines()
# Find index of heading block
indices = [i for i in range(len(lines))
if lines[i].endswith('Here is a readable list of column headings:')]
if len(indices) == 0:
raise ValueError("Could not find heading block in calibration info file.")
else:
hblockstart = indices[0]+1
headings = {}
for l in lines[hblockstart:]: # Looking at lines starting at beginning of heading block
l = l.lstrip('#') # Remove comment markers
# Strings are formatted like:
# 1) headingA 3) headingC
# 2) headingB 4) headingD
hnums = [int(s.rstrip(')')) for s in re.findall(r'\d+\)', l)] # Split at the numbers
hs = [s for s in re.split(r'\d+\) ', l) if not s.isspace()]
if len(hs)==0: # Quit if strings no longer formatted like above
break
else:
for n, h in zip(hnums, hs):
headings[n] = h.lstrip().rstrip() # Strip whitespace and associate heading with number
headings = [headings[i] for i in range(1,len(headings)+1)] # Turn into list in-order
# Read table and use extracted heading names
data = pd.read_csv(fpath, delim_whitespace=True, comment='#',
names=headings)
return data