Source code for desiutil.io

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
===========
desiutil.io
===========

Module for I/O related code.
"""
import os
import stat
import warnings
from contextlib import contextmanager
import numpy as np
from astropy.table import Table


[docs]def combine_dicts(dict1, dict2): """Combine two :class:`dict` objects into one, respecting common keys. If `dict1` and `dict2` both have key ``a``, then ``dict1[a]`` and ``dict2[a]`` must both be dictionaries to recursively merge. Parameters ---------- dict1 : :class:`dict` First dictionary. dict2 : :class:`dict` Second dictionary. Returns ------- :class:`dict` The combined dictionary. Raises ------ ValueError If the values for a common key are not both :class:`dict`. """ output = {} cdict2 = dict2.copy() for item, value in dict1.items(): if item in cdict2: if (not isinstance(cdict2[item], dict)) or (not isinstance(dict1[item], dict)): raise ValueError("Overlapping leafs must both be dicts") try: output[item] = combine_dicts(value, cdict2.pop(item)) except AttributeError: raise AttributeError("Cannot mix dicts with scalar and dict on the same key") else: output[item] = value for item, value in cdict2.items(): output[item] = value return output
[docs]def yamlify(obj, debug=False): """Recursively process an object so it can be serialised for yaml. Based on jsonify in `linetools <https://pypi.python.org/pypi/linetools>`_. Note: All string-like keys in :class:`dict` s are converted to :class:`str`. Parameters ---------- obj : :class:`object` Any object. debug : :class:`bool`, optional Print extra information if requested. Returns ------- :class:`object` An object suitable for yaml serialization. For example :class:`numpy.ndarray` is converted to :class:`list`, :class:`numpy.int64` is converted to :class:`int`, etc. """ if isinstance(obj, (np.float64, np.float32)): obj = float(obj) elif isinstance(obj, (np.int32, np.int64, np.int16)): obj = int(obj) elif isinstance(obj, np.bool_): obj = bool(obj) elif isinstance(obj, (np.string_, str)): obj = str(obj) # elif isinstance(obj, Quantity): # obj = dict(value=obj.value, unit=obj.unit.to_string()) elif isinstance(obj, np.ndarray): # Must come after Quantity obj = obj.tolist() elif isinstance(obj, dict): # First convert keys nobj = {} for key, value in obj.items(): nobj[str(key)] = value # Now recursive obj = nobj for key, value in obj.items(): obj[key] = yamlify(value, debug=debug) elif isinstance(obj, list): for i, item in enumerate(obj): obj[i] = yamlify(item, debug=debug) elif isinstance(obj, tuple): obj = list(obj) for i, item in enumerate(obj): obj[i] = yamlify(item, debug=debug) obj = tuple(obj) # elif isinstance(obj, Unit): # obj = obj.name # elif obj is u.dimensionless_unscaled: # obj = 'dimensionless_unit' if debug: print(type(obj)) return obj
[docs]def _dtype_size(dtype): '''Parse `dtype` to find its size. For example, ``<U14`` returns 14. Parameters ---------- dtype : :class:`numpy.dtype` Dtype object. Returns ------- :class:`int` The size of the type. Notes ----- This is different from ``dtype.itemsize``, which is number of bytes. ''' i = dtype.str.find(dtype.kind) return int(dtype.str[i+1:])
[docs]def _pick_encoding(table, encoding): '''Pick which encoding to use; giving warning if options are in conflict. Parameters ---------- table : :class:`astropy.table.Table` Table object. encoding : :class:`str` Encoding to use. If ``None``, use ``table.meta['ENCODING']``. Returns ------- :class:`str` The chosen encoding. Raises ------ UnicodeError If no enoding could be found at all. Notes ----- `encoding` trumps ```table.meta['ENCODING']``. ''' if encoding is None: if 'ENCODING' in table.meta: encoding = table.meta['ENCODING'] else: raise UnicodeError('No encoding given as argument or in table metadata') elif 'ENCODING' in table.meta and table.meta['ENCODING'] != encoding: message = """data.metadata['ENCODING']=='{}' does not match option '{}'; use encoding=None to use data.metadata['ENCODING'] instead""".format(table.meta['ENCODING'], encoding) warnings.warn(message) return encoding
[docs]def encode_table(data, encoding='ascii'): '''Encode unicode strings in a table into bytes using ``numpy.char.encode``. Parameters ---------- data : numpy structured array or :class:`~astropy.table.Table` Data for conversion. encoding : :class:`str`, optional Encoding to use for converting unicode to bytes; default 'ascii' (FITS and HDF5 friendly); if ``None``, try ``ENCODING`` keyword in `data` instead. Returns ------- :class:`~astropy.table.Table` Table with unicode columns converted to bytes. Raises ------ UnicodeEncodeError If any input strings cannot be encoded using the specified encoding. UnicodeError If no encoding is given as argument or in table metadata. Notes ----- `encoding` option overides ``data.meta['ENCODING']``; use ``encoding=None`` to use ``data.meta['ENCODING']`` instead. ''' try: table = Table(data, copy=False) except ValueError: # https://github.com/astropy/astropy/issues/5298 table = Table(data, copy=True) encoding = _pick_encoding(table, encoding) for col in table.colnames: dtype = table[col].dtype if dtype.kind == 'U': Sn = 'S{}'.format(_dtype_size(dtype)) table.replace_column(col, np.char.encode(table[col], encoding=encoding).astype(Sn)) table.meta['ENCODING'] = encoding return table
[docs]def decode_table(data, encoding='ascii', native=True): '''Decode byte strings in a table into unicode strings. Parameters ---------- data : numpy structured array or :class:`~astropy.table.Table` Data for conversion. encoding : :class:`str`, optional Encoding to use for converting bytes into unicode; default 'ascii'; if ``None``, try ``ENCODING`` keyword in `data` instead. native : :class:`bool`, optional If `True` (default), only decode if native str type is unicode (*i.e.* python3 but not python2) Returns ------- :class:`~astropy.table.Table` Decoded data. Notes ----- `encoding` option overides ``data.meta['ENCODING']``; use ``encoding=None`` to use ``data.meta['ENCODING']`` instead. ''' try: table = Table(data, copy=False) except ValueError: # https://github.com/astropy/astropy/issues/5298 table = Table(data, copy=True) # Check if native str type is bytes if native and np.str_('a').dtype.kind == 'S': return table encoding = _pick_encoding(table, encoding) for col in table.colnames: dtype = table[col].dtype if dtype.kind == 'S': Un = 'U{}'.format(_dtype_size(dtype)) table.replace_column(col, np.char.decode(table[col], encoding=encoding).astype(Un)) table.meta['ENCODING'] = encoding return table
[docs]@contextmanager def unlock_file(*args, **kwargs): """Unlock a read-only file, return a file-like object, and restore the read-only state when done. Arguments are the same as :func:`open`. Returns ------- file-like A file-like object, as returned by :func:`open`. Notes ----- * This assumes that the user of this function is also the owner of the file. :func:`os.chmod` would not be expected to work in any other circumstance. * Technically, this restores the *original* permissions of the file, it does not care what the original permissions were. * If the named file does not exist, this function effectively does not attempt to guess what the final permissions of the file would be. In other words, it just does whatever :func:`open` would do. In this case it is the user's responsibilty to change permissions as needed after creating the file. Examples -------- >>> with unlock_file('read-only.txt', 'w') as f: ... f.write(new_data) """ w = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH # # Get original permissions, unlock permissions # # uid = os.getuid() old_mode = None if os.path.exists(args[0]): old_mode = stat.S_IMODE(os.stat(args[0]).st_mode) os.chmod(args[0], old_mode | stat.S_IWUSR) f = open(*args, **kwargs) try: yield f finally: # # Restore permissions to read-only state. # f.close() if old_mode is None: old_mode = stat.S_IMODE(os.stat(args[0]).st_mode) os.chmod(args[0], old_mode & ~w)