Source code for desiutil.census

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
===============
desiutil.census
===============

Determine the number of files and size in DESI data file systems.

As of 2022, this module is deprecated, and the command-line interface,
:command:`desi_data_census` has been removed.  It was superseded by NERSC-provided
metadata files that are much faster to parse.

Notes
-----

* Directories to check:

  - Imaging raw & reduced.
  - spectro raw & reduced.
  - Work directories.
  - Non-Footprint image data.

* Check group id, readability.
* Count number of files and size.
* Extract year from mtime. Shift to fiscal year.  FY starts in October.
* Don't record filenames, just high-level directories.
* Treat projecta as the same system; follow symlinks to projecta.
* If a symlink is followed to another filesystem, :func:`os.walk` can't get back
  to the original filesystem.
* Symlinks to another subdirectory should only count as the symlink.  The
  file itself belongs to the other subdirectory.
* Physical directories count toward inode and size total.
"""


class ScannedFile(object):
    """Plain record describing one entry found during a file scan.

    Attributes
    ----------
    filename : :class:`str`
        Name of the file.
    size : :class:`int`
        Size of the file in bytes.
    year : :class:`int`
        Year in which the file was modified.
    islink : :class:`bool`
        ``True`` if the file is a symbolic link.
    isexternal : :class:`bool`
        For a symbolic link, ``True`` if it points outside the tree
        being scanned.
    linkname : :class:`str`
        For a symbolic link, the file it points to.
    linksize : :class:`int`
        For a symbolic link, the size of the link *itself*; in that case
        the `size` attribute is the size of the file it *points to*.
    linkyear : :class:`int`
        Year in which the link *itself* was modified.
    """

    def __init__(self, filename, size, year):
        # Values supplied by the caller.
        self.filename, self.size, self.year = filename, size, year
        # Every new record starts out as "not a link"; the link-related
        # attributes keep these defaults until someone assigns them.
        self.islink = self.isexternal = False
        self.linkname = self.linksize = self.linkyear = None
def get_options(test_args=None):
    """Build the command-line parser and parse the arguments.

    Parameters
    ----------
    test_args : :class:`list`
        Arguments to parse instead of the real command line; intended for
        testing.  When ``None``, the process command line is parsed.

    Returns
    -------
    :class:`argparse.Namespace`
        A simple object containing the parsed options (``config``,
        ``output``, ``raw`` and ``verbose``).
    """
    from sys import argv
    from os.path import basename
    from argparse import ArgumentParser
    from pkg_resources import resource_filename

    parser = ArgumentParser(description="Count number and size of DESI data files.",
                            prog=basename(argv[0]))
    # One row per option: the flag spellings, then the add_argument() keywords.
    option_table = (
        (('-c', '--config-file'),
         {'action': 'store',
          'dest': 'config',
          'metavar': 'FILE',
          'default': resource_filename('desiutil', 'data/census.yaml'),
          'help': "Read configuration from FILE (default %(default)s)."}),
        (('-o', '--output'),
         {'action': 'store',
          'metavar': 'FILE',
          'default': 'desi_data_census.csv',
          'help': "Output CSV file (default ./%(default)s)."}),
        (('-r', '--raw-output'),
         {'action': 'store',
          'dest': 'raw',
          'metavar': 'FILE',
          'default': 'desi_data_census.yml',
          'help': "Dump raw data to FILE (default ./%(default)s)"}),
        (('-v', '--verbose'),
         {'action': 'store_true',
          'help': "Print lots of extra information."}),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    # parse_args(None) reads the process command line, so a single call
    # covers both the normal and the testing case.
    return parser.parse_args(test_args)
def walk_error(e):
    """Report an error raised while :func:`os.walk` traverses a tree.

    The exception is written to the package log at error level and is
    not re-raised.

    Parameters
    ----------
    e : :class:`OSError`
        The exception reported.
    """
    from .log import get_logger
    message = str(e)
    get_logger().error(message)
def year(mtime, fy=True):
    """Map a file modification time onto the year it belongs to.

    Parameters
    ----------
    mtime : :class:`int` or :class:`float`
        File modification time as reported by :func:`os.stat`.
    fy : :class:`bool`, optional
        If ``True`` (the default) return the Fiscal Year (FY) rather than
        the calendar year.  FY is defined to begin 1 October, so October
        through December count toward the following year.

    Returns
    -------
    :class:`int`
        The year to which a file belongs.
    """
    from time import gmtime
    # The timestamp is interpreted in UTC, not local time.
    utc = gmtime(mtime)
    calendar_year, month = utc.tm_year, utc.tm_mon
    if not fy or month < 10:
        return calendar_year
    return calendar_year + 1
def scan_directories(conf, data):
    """Scan every directory tree listed in the configuration.

    For each entry of `data` the tree under ``root`` is walked without
    following symbolic links.  File counts and sizes are accumulated per
    year for the root and for each configured subdirectory.  Links that
    point into an auxiliary filesystem listed in ``conf['filesystems']``
    are attributed to the directory they logically belong to; linked
    directories are walked in a second pass after the main walk.

    Parameters
    ----------
    conf : :class:`dict`
        The configuration that applies to all directories.  The keys
        ``gid`` (group name to group id) and ``filesystems`` (primary
        prefix to auxiliary prefix) are read.
    data : :class:`list`
        The specific directories to scan.  Each item is a mapping with
        ``root``, ``category``, ``description`` and ``group``, and
        optionally ``subdirs`` (items with ``root`` and ``description``).

    Returns
    -------
    :class:`list`
        One :class:`~collections.OrderedDict` per entry of `data`, mapping
        directory name to ``{year: {'number': ..., 'size': ...}}``.
    """
    from collections import OrderedDict
    from os import walk
    from os.path import basename, dirname, isdir, join
    from .log import get_logger
    log = get_logger()

    def add_counts(per_year, when, number, size):
        # Fold one contribution into a {year: {'number', 'size'}} mapping.
        if when in per_year:
            per_year[when]['number'] += number
            per_year[when]['size'] += size
        else:
            per_year[when] = {'number': number, 'size': size}

    summary = list()
    for d in data:
        root = d['root']
        subdirs = list()
        dir_summary = OrderedDict()
        dir_summary[root] = dict()
        log.debug('root = {root}'.format(**d))
        log.debug('category = {category}'.format(**d))
        log.debug('description = {description}'.format(**d))
        log.debug('group = {group}'.format(**d))
        if 'subdirs' in d:
            for sd in d['subdirs']:
                fsd = join(root, sd['root'])
                subdirs.append(fsd)
                log.debug('subdir = {0}'.format(fsd))
                log.debug('description = {description}'.format(**sd))
                dir_summary[fsd] = dict()
        # Subdirectories are tried first, the root itself last.
        candidates = subdirs + [root]
        auxilliary_links = dict()
        for dirpath, dirnames, filenames in walk(root, topdown=True,
                                                 onerror=walk_error,
                                                 followlinks=False):
            gid = conf['gid'][d['group']]
            sum_files, ext = scan_directory(dirpath, dirnames, filenames, gid)
            for y, counts in sum_files.items():
                # Everything counts toward the root ...
                add_counts(dir_summary[root], y,
                           counts['number'], counts['size'])
                # ... and toward each subdirectory that contains dirpath.
                for fsd in subdirs:
                    if in_path(fsd, dirpath):
                        add_counts(dir_summary[fsd], y,
                                   counts['number'], counts['size'])
            for link, target in ext.items():
                log.debug("External link detected: {0} -> {1}.".format(link, target))
                for primary, aux in conf['filesystems'].items():
                    log.debug("primary = {0}; aux = {1}".format(primary, aux))
                    for k in candidates:
                        # Translate the auxiliary path back to the primary
                        # filesystem to decide which directory owns it.
                        if not (target.startswith(aux) and
                                in_path(k, target.replace(aux, primary))):
                            continue
                        log.info("Found link to auxilliary filesystem: {0} -> {1}.".format(link, target))
                        if isdir(link):
                            log.info("Found link to directory on auxilliary filesystem: {0} -> {1}. Data belongs to {2}.".format(link, target, k))
                            auxilliary_links.setdefault(target, []).append(k)
                        else:
                            log.info("Found link to single file on auxilliary filesystem: {0} -> {1}. Data belongs to {2}.".format(link, target, k))
                            f = scan_file(dirname(target), basename(target), gid)
                            add_counts(dir_summary[k], f.year, 1, f.size)
                if target in auxilliary_links:
                    owners = auxilliary_links[target]
                    if len(owners) > 2:
                        log.warning("Extraneous auxilliary links found for {0} -> {1}.".format(link, target))
                    # NOTE(review): indexing [1] assumes at least two owners
                    # (a subdirectory followed by the root).  A linked
                    # directory matched by a single candidate would raise
                    # IndexError here -- confirm whether that can occur.
                    if owners[1] != root:
                        log.warning("Malformed auxilliary link found for {0} -> {1}.".format(link, target))
        #
        # Check auxilliary links *after* the first os.walk() has completed.
        #
        for aux_root, aux_owners in auxilliary_links.items():
            aux_fsd = aux_owners[0]
            for aux_dirpath, aux_dirnames, aux_filenames in walk(aux_root, topdown=True,
                                                                 onerror=walk_error,
                                                                 followlinks=False):
                # External links reported by this second pass are not used.
                sum_files, ext = scan_directory(aux_dirpath, aux_dirnames,
                                                aux_filenames,
                                                conf['gid'][d['group']])
                for aux_y, counts in sum_files.items():
                    add_counts(dir_summary[root], aux_y,
                               counts['number'], counts['size'])
                    add_counts(dir_summary[aux_fsd], aux_y,
                               counts['number'], counts['size'])
        summary.append(dir_summary)
    return summary
def scan_directory(dirpath, dirnames, filenames, gid):
    """Tally the entries of one directory yielded by :func:`os.walk`.

    Every name in `dirnames` and `filenames` is examined once.  Entries
    that are not external links add their size to the year they were
    modified.  External links are recorded separately.  Any symbolic link
    additionally adds the size of the link itself to the year the link
    was modified.

    Parameters
    ----------
    dirpath : :class:`str`
        Current directory, returned by :func:`os.walk`.
    dirnames : :class:`list`
        List of directories in `dirpath`.
    filenames : :class:`list`
        List of files in `dirpath`.
    gid : :class:`int`
        Group ID number that should be associated with this directory.

    Returns
    -------
    :func:`tuple`
        A tuple containing two dictionaries: the summary results organized
        by year (``{year: {'number': ..., 'size': ...}}``), and a mapping
        of external link name to the path it points to.
    """
    from .log import get_logger
    log = get_logger()
    log.debug("dirpath = {0}".format(dirpath))
    summary = dict()
    external = dict()

    def tally(when, nbytes):
        # Add a single entry of nbytes to the bucket for year `when`.
        bucket = summary.setdefault(when, {'number': 0, 'size': 0})
        bucket['number'] += 1
        bucket['size'] += nbytes

    for name in dirnames + filenames:
        entry = scan_file(dirpath, name, gid)
        if entry.isexternal:
            external[entry.filename] = entry.linkname
        else:
            tally(entry.year, entry.size)
        if entry.islink:
            # The link itself is counted whether it is internal or external.
            tally(entry.linkyear, entry.linksize)
    return (summary, external)
def scan_file(dirpath, filename, gid):
    """Collect metadata for a single file or directory.

    The entry is first examined with :func:`os.stat`.  If it is a symbolic
    link, the link itself is then examined with :func:`os.lstat`, its
    target is resolved, and the link is classified as internal or external
    relative to `dirpath`.  A warning is logged whenever the group id does
    not match `gid`.

    Parameters
    ----------
    dirpath : :class:`str`
        Current directory, returned by :func:`os.walk`.
    filename : :class:`str`
        Base name of current file.
    gid : :class:`int`
        Group ID number that should be associated with this directory.

    Returns
    -------
    :class:`ScannedFile`
        A simple object containing the metadata relating to the file.
    """
    from os import lstat, stat
    from os.path import islink, join, realpath
    from .log import get_logger
    log = get_logger()
    full_path = join(dirpath, filename)

    # os.stat() follows symbolic links, so for a link these are the
    # properties of the target; a dangling link raises OSError here.
    log.debug("os.stat('{0}')".format(full_path))
    target_info = stat(full_path)
    if target_info.st_gid != gid:
        log.warning("{0} does not have correct group id!".format(full_path))
    scanned = ScannedFile(full_path, target_info.st_size,
                          year(target_info.st_mtime))
    if not islink(full_path):
        return scanned

    # From here on the entry is a symbolic link: describe the link itself.
    scanned.islink = True
    log.debug("os.lstat('{0}')".format(full_path))
    link_info = lstat(full_path)
    if link_info.st_gid != gid:
        log.warning("{0} does not have correct group id!".format(full_path))
    scanned.linkname = realpath(full_path)
    scanned.linksize = link_info.st_size
    scanned.linkyear = year(link_info.st_mtime)
    if in_path(dirpath, scanned.linkname):
        log.debug("Found internal link {0.filename} -> {0.linkname}.".format(scanned))
    else:
        scanned.isexternal = True
        log.debug("Found external link {0.filename} -> {0.linkname}.".format(scanned))
    return scanned
def in_path(root, path):
    """Check if `path` is in the same directory hierarchy as `root`.

    The comparison is made on whole path components, so ``/a/bc`` is *not*
    considered to be inside ``/a/b``.  `root` itself counts as being in
    `root`.  No filesystem access takes place and symbolic links are not
    resolved.

    Parameters
    ----------
    root : :class:`str`
        Root directory.  A trailing separator, repeated separators or
        ``.`` components are tolerated.
    path : :class:`str`
        Filename, could be a file or a directory.

    Returns
    -------
    :class:`bool`
        ``True`` if `path` is in `root`.

    Raises
    ------
    :class:`ValueError`
        Raised by :func:`os.path.commonpath` if `root` and `path` mix
        absolute and relative path names.
    """
    from os.path import commonpath
    # commonpath() returns a normalized path (no trailing separator, no
    # empty or "." components).  Comparing that against the raw `root`
    # string would fail for e.g. root='/a/b/', so normalize `root` the
    # same way before comparing.
    return commonpath([root, path]) == commonpath([root])
def output_csv(summary, filename):
    """Convert data into CSV file.

    Each directory becomes one row holding *cumulative* totals: the value
    in a given year column is the sum of that year and all earlier years.
    The columns are the years that occur anywhere in `summary`, in
    ascending order, first for the number of files and then for the size.

    Parameters
    ----------
    summary : :class:`list`
        A list of mappings of directory name to
        ``{year: {'number': ..., 'size': ...}}``.
    filename : :class:`str`
        Name of the file to write to.

    Returns
    -------
    :class:`list`
        The data written to the CSV file, as a list of rows.
    """
    import csv
    directories = list()
    years = set()
    for s in summary:
        for root in s:
            directories.append(root)
            years.update(s[root])
    ordered_years = sorted(years)
    number = dict((d, dict.fromkeys(ordered_years, 0)) for d in directories)
    size = dict((d, dict.fromkeys(ordered_years, 0)) for d in directories)
    for s in summary:
        for root in s:
            # Carry a running total from one observed year to the next.
            # Looking up year - 1 instead would reset the total to zero
            # whenever the observed years are not contiguous.
            running_number = 0
            running_size = 0
            for y in ordered_years:
                counts = s[root].get(y, {})
                running_number += counts.get('number', 0)
                running_size += counts.get('size', 0)
                number[root][y] = running_number
                size[root][y] = running_size
    data = [(['Directory'] +
             ['FY{0:d} Number'.format(y) for y in ordered_years] +
             ['FY{0:d} Size'.format(y) for y in ordered_years])]
    for d in directories:
        data.append([d] +
                    [str(number[d][y]) for y in ordered_years] +
                    [str(size[d][y]) for y in ordered_years])
    try:
        csvfile = open(filename, 'w', newline='')
    except TypeError:  # pragma: no cover
        # Python 2
        csvfile = open(filename, 'w')
    # The context manager closes the file even if writing fails.
    with csvfile:
        csv.writer(csvfile).writerows(data)
    return data
def main():
    """Entry point for the :command:`desi_data_census` script.

    Reads the YAML configuration, scans the configured directories, dumps
    the raw summary as YAML and writes the cumulative CSV report.

    Returns
    -------
    :class:`int`
        Exit status that will be passed to :func:`sys.exit`.
    """
    import yaml
    from .log import get_logger, DEBUG, INFO
    options = get_options()
    #
    # Logging.
    #
    log = get_logger(DEBUG) if options.verbose else get_logger()
    if options.verbose:
        log.debug("Verbose logging is set.")
    #
    # Configuration
    #
    log.info("Reading configuration from {0}.".format(options.config))
    with open(options.config) as config_file:
        config = yaml.safe_load(config_file)
    log.debug(repr(config))
    #
    # Scan, then write the raw dump followed by the CSV report.
    #
    summary = scan_directories(config['configuration'], config['data'])
    with open(options.raw, 'w') as raw_file:
        yaml.dump(summary, raw_file, default_flow_style=False)
    output_csv(summary, options.output)
    return 0