Source code for detectree.utils

"""detectree general utility functions."""

import datetime as dt
import itertools
import logging as lg
import os
import sys
import unicodedata
from os import path

import numpy as np
import rasterio as rio
from rasterio import windows
from tqdm import tqdm

from . import settings

__all__ = [
    "split_into_tiles",
    "img_rgb_from_filepath",
    "get_img_filepaths",
    "log",
    "get_logger",
]


# See https://bit.ly/2KkELpI
def split_into_tiles(
    input_filepath,
    output_dir,
    *,
    tile_width=None,
    tile_height=None,
    output_filename=None,
    only_full_tiles=False,
    keep_empty_tiles=False,
    custom_meta=None,
):
    """
    Split the image of `input_filepath` into tiles.

    Parameters
    ----------
    input_filepath : str, file object or pathlib.Path object
        Path to a file, URI, file object opened in binary ('rb') mode, or a Path
        object representing the image to be split into tiles. The value will be
        passed to `rasterio.open`.
    output_dir : str or pathlib.Path object
        Path to the directory where the tiles are to be dumped.
    tile_width : int, optional
        Tile width in pixels. If no value is provided, the value set in
        `settings.TILE_WIDTH` is used.
    tile_height : int, optional
        Tile height in pixels. If no value is provided, the value set in
        `settings.TILE_HEIGHT` is used.
    output_filename : str, optional
        Template to be string-formatted in order to name the output tiles. If no
        value is provided, the value set in `settings.TILE_OUTPUT_FILENAME` is
        used.
    only_full_tiles : bool, optional (default False)
        Whether only full tiles (of size `tile_width`x`tile_height`) should be
        dumped.
    keep_empty_tiles : bool, optional (default False)
        Whether tiles containing only no-data pixels should be dumped.
    custom_meta : dict, optional
        Custom meta data for the output tiles.

    Returns
    -------
    output_filepaths : list
        List of the file paths of the dumped tiles.
    """
    if tile_width is None:
        tile_width = settings.TILE_WIDTH
    if tile_height is None:
        tile_height = settings.TILE_HEIGHT
    if output_filename is None:
        output_filename = settings.TILE_OUTPUT_FILENAME

    output_filepaths = []
    with rio.open(input_filepath) as src:
        meta = src.meta.copy()
        if custom_meta is not None:
            meta.update(custom_meta)

        def _get_window_transform(width, height):
            num_rows, num_cols = src.meta["height"], src.meta["width"]
            offsets = itertools.product(
                range(0, num_cols, width), range(0, num_rows, height)
            )
            big_window = windows.Window(
                col_off=0, row_off=0, width=num_cols, height=num_rows
            )
            for col_off, row_off in offsets:
                window = windows.Window(
                    col_off=col_off,
                    row_off=row_off,
                    width=width,
                    height=height,
                ).intersection(big_window)
                transform = windows.transform(window, src.transform)
                yield window, transform

        iterator = _get_window_transform(tile_width, tile_height)
        if tqdm is not None:
            iterator = tqdm(iterator)

        # tests whether a given tile should be dumped or not. Since there are two
        # possible tests that depend on the arguments provided by the user, we will
        # use a list of tests and then check whether any test must be applied. This
        # mechanism avoids having to check whether tests must be applied at each
        # iteration (see the if/else at the end of this function).
        tests = []
        if only_full_tiles:

            def test_full_tile(window):
                return window.width == tile_width and window.height == tile_height

            tests.append(test_full_tile)
        if not keep_empty_tiles:

            def test_empty_tile(window):
                return np.any(src.dataset_mask(window=window))

            tests.append(test_empty_tile)

        def inner_loop(window, transform):
            meta["transform"] = transform
            meta["width"], meta["height"] = window.width, window.height
            output_filepath = path.join(
                output_dir,
                output_filename.format(int(window.col_off), int(window.row_off)),
            )
            with rio.open(output_filepath, "w", **meta) as dst:
                dst.write(src.read(window=window))
            log(f"Dumped tile to {output_filepath}")
            output_filepaths.append(output_filepath)

        if tests:
            for window, transform in iterator:
                if all(test(window) for test in tests):
                    inner_loop(window, transform)
        else:
            for window, transform in iterator:
                inner_loop(window, transform)

    return output_filepaths
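
# Example (a sketch; the file paths and tile size below are placeholders, and
# `only_full_tiles=True` drops the partial tiles at the right/bottom edges):
#
#     output_filepaths = split_into_tiles(
#         "mosaic.tif",
#         "tiles",
#         tile_width=512,
#         tile_height=512,
#         only_full_tiles=True,
#     )
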

def img_rgb_from_filepath(img_filepath):
    """
    Read an RGB image file into a 3-D array.

    See the `background <https://bit.ly/2KlCICO>`_ example notebook for more
    details.

    Parameters
    ----------
    img_filepath : str, file object or pathlib.Path object
        Path to a file, URI, file object opened in binary ('rb') mode, or a Path
        object representing the image to be read. The value will be passed to
        `rasterio.open`.

    Returns
    -------
    img_rgb : numpy.ndarray
        3-D array with the RGB image.
    """
    with rio.open(img_filepath) as src:
        arr = src.read()

    return np.rollaxis(arr[:3], 0, 3)


# non-image utils
def get_img_filepaths(split_df, img_cluster, train):
    """
    Get image filepaths from a train/test split data frame.

    Parameters
    ----------
    split_df : pandas DataFrame
        Data frame with the train/test split.
    img_cluster : int
        The label of the cluster of tiles.
    train : bool
        Whether the list of training (True) or testing (False) tiles must be
        returned.

    Returns
    -------
    img_filepaths : pandas Series
        List of paths to image files.
    """
    if train:
        train_cond = split_df["train"]
    else:
        train_cond = ~split_df["train"]
    try:
        return split_df[train_cond & (split_df["img_cluster"] == img_cluster)][
            "img_filepath"
        ]
    except KeyError:
        raise ValueError(
            "If `method` is 'cluster-II', `split_df` must have an "
            "'img_cluster' column"
        )
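
# Example (a sketch; it assumes `split_df` is a train/test split data frame
# with "img_filepath", "train" and "img_cluster" columns, as expected by
# `get_img_filepaths`):
#
#     for img_filepath in get_img_filepaths(split_df, img_cluster=0, train=True):
#         img_rgb = img_rgb_from_filepath(img_filepath)  # (height, width, 3) array
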

# logging (from https://github.com/gboeing/osmnx/blob/master/osmnx/utils.py)
def log(message, *, level=None, name=None, filename=None):
    """
    Write a message to the log file and/or print to the console.

    Parameters
    ----------
    message : string
        the content of the message to log.
    level : int
        one of the logger.level constants.
    name : string
        name of the logger.
    filename : string
        name of the log file.
    """
    if level is None:
        level = settings.log_level
    if name is None:
        name = settings.log_name
    if filename is None:
        filename = settings.log_filename

    # if logging to file is turned on
    if settings.log_file:
        # get the current logger (or create a new one, if none), then log message
        # at requested level
        logger = get_logger(level=level, name=name, filename=filename)
        if level == lg.DEBUG:
            logger.debug(message)
        elif level == lg.INFO:
            logger.info(message)
        elif level == lg.WARNING:
            logger.warning(message)
        elif level == lg.ERROR:
            logger.error(message)

    # if logging to console is turned on, convert message to ascii and print to
    # the console
    if settings.log_console:
        # capture current stdout, then switch it to the console, print the
        # message, then switch back to what had been the stdout. this prevents
        # logging to notebook - instead, it goes to console
        standard_out = sys.stdout
        sys.stdout = sys.__stdout__

        # convert message to ascii for console display so it doesn't break
        # windows terminals
        message = (
            unicodedata.normalize("NFKD", str(message))
            .encode("ascii", errors="replace")
            .decode()
        )
        print(message)
        sys.stdout = standard_out


def get_logger(*, level=None, name=None, filename=None):
    """
    Create a logger or return the current one if already instantiated.

    Parameters
    ----------
    level : int
        one of the logger.level constants.
    name : string
        name of the logger.
    filename : string
        name of the log file.

    Returns
    -------
    logger : logging.Logger
    """
    if level is None:
        level = settings.log_level
    if name is None:
        name = settings.log_name
    if filename is None:
        filename = settings.log_filename

    logger = lg.getLogger(name)

    # if a logger with this name is not already set up
    if not getattr(logger, "handler_set", None):
        # get today's date and construct a log filename
        todays_date = dt.datetime.today().strftime("%Y_%m_%d")
        log_filename = path.join(
            settings.logs_folder, "{}_{}.log".format(filename, todays_date)
        )

        # if the logs folder does not already exist, create it
        if not path.exists(settings.logs_folder):
            os.makedirs(settings.logs_folder)

        # create file handler and log formatter and set them up
        handler = lg.FileHandler(log_filename, encoding="utf-8")
        formatter = lg.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(level)
        logger.handler_set = True

    return logger
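
# Example (a sketch; the `settings` flags below are the module-level attributes
# read by `log` and `get_logger`, and the message text is a placeholder):
#
#     import logging
#
#     from detectree import settings, utils
#
#     settings.log_console = True  # print messages to the console
#     settings.log_file = True  # also append them to a dated file in `settings.logs_folder`
#     utils.log("Splitting mosaic into tiles", level=logging.INFO)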