Source code for irsa.io

import pickle
from collections import OrderedDict

import numpy as np
import pandas as pd
import torch
from jcamp import jcamp_readfile
from scipy.interpolate import interp1d
from scipy.ndimage.filters import gaussian_filter1d
from torch.utils.data import Dataset

from irsa.networks import PairedNeuralNet


[docs] def load_model(path, model=PairedNeuralNet, **kwargs): """ Load model from path. Parameters ---------- path : str Path to saved model state dict. model : :obj:`~torch.Module` Model class to initialize. kwargs Keyword arguments for model initialization. Returns ------- :obj:`~torch.Module` Initialized model. """ map_location = kwargs.pop("map_location", None) m = model(**kwargs) if map_location: m.load_state_dict(torch.load(path, map_location=map_location)) else: m.load_state_dict(torch.load(path)) return m
[docs] def load_experimental(path): """ Load experimental spectra from path using JCAMP. Parameters ---------- path : str Path to DX file. Returns ------- freq : :obj:`~numpy.array` Array of frequencies. intensity : :obj:`~numpy.array` Array of intensities. """ # Load experimental spectra with JCAMP spec = jcamp_readfile(path) # Frequency freq = spec["x"] # Intensity intensity = spec["y"] # Normalize intensity = (intensity - intensity.min()) / (intensity.max() - intensity.min()) intensity = intensity / intensity.sum() # Data frame df = pd.DataFrame({"frequency": freq, "intensity": intensity}).sort_values( by="intensity", ascending=False ) # Drop duplicates df = ( df.drop_duplicates(subset="frequency") .sort_values(by="frequency") .reset_index(drop=True) ) return df["frequency"].values, df["intensity"].values
[docs] def load_predicted(path): """ Load NWChem-predicted spectra from path using ISiCLE. Parameters ---------- path : str Path to ISiCLE file. Returns ------- freq : :obj:`~numpy.array` Array of frequencies. intensity : :obj:`~numpy.array` Array of intensities. """ # Load predicted spectra with open(path, "rb") as f: spec = pickle.load(f) # Check for completion if "frequency" not in spec: raise ValueError("Frequency not detected in input.") # Frequency freq = np.abs(spec["frequency"]["wavenumber"]) # Intensity intensity = np.abs(spec["frequency"]["intensity"]) return freq.astype(np.float32), intensity.astype(np.float32)
[docs] def preprocess_predicted(freq, intensity, exp_freq, sigma=2): """ Process predicted spectra to broaden, normalize, and map to experimental frequencies. Parameters ---------- freq : :obj:`~numpy.array` Array of predicted frequencies. intensity : :obj:`~numpy.array` Array of predicted intensities. exp_freq : :obj:`~numpy.array` Array of experimental frequencies. sigma : int Sigma value to broaden by Gaussian function. Returns ------- freq : :obj:`~numpy.array` Array of experimental frequencies. intensity : :obj:`~numpy.array` Array of processed intensities at experimental frequencies. """ # Index of intersecting frequencies idx = (freq >= exp_freq.min()) & (freq <= exp_freq.max()) # Truncate freq = freq[idx] intensity = intensity[idx] # Ensure both experimental and predicted points are sampled freq = np.concatenate((freq, exp_freq)) intensity = np.concatenate((intensity, np.zeros_like(exp_freq))) # Ensure monotonic frequencies idx = np.argsort(freq) freq = freq[idx] intensity = intensity[idx] # Apply naive broadening function gauss = gaussian_filter1d(intensity, sigma, mode="constant", cval=0) # Fit 1D spline to broadened spectra spl = interp1d(freq, gauss, kind="linear", bounds_error=False, fill_value=0) # Sample spline at same points as experimental y_test = spl(exp_freq) # Normalize y_test = (y_test - y_test.min()) / (y_test.max() - y_test.min()) return exp_freq.copy().astype(np.float32), (y_test / y_test.sum()).astype( np.float32 )
[docs] class PairedExpPredDataset(Dataset): """ Create true positive & true negative pairs of experiment and predicted spectra. Parameters ---------- exp : :obj:`~numpy.array` Array of experiment values. exp_labels : :obj:`~numpy.array` Array of experiment labels. pred : :obj:`~numpy.array` Array of predicted values. pred_labels : :obj:`~numpy.array` Array of predicted labels. deterministic : bool True for `np.random` shuffling only once at initialization. False for `np.random` shuffling each iteration. """ def __init__(self, exp, exp_labels, pred, pred_labels, deterministic=False): self.exp = torch.from_numpy(exp.astype(np.float32)) self.exp_labels = exp_labels self.pred = torch.from_numpy(pred.astype(np.float32)) self.pred_labels = pred_labels self.deterministic = deterministic # Shape magic if len(self.exp.shape) != len(self.pred.shape): raise ValueError("Shape mismatch between predicted and experimental") # (M, ) to (N, M) where N=1 if len(self.exp.shape) == 1: self.exp = torch.unsqueeze(self.exp, 0) self.pred = torch.unsqueeze(self.pred, 0) # (N, M) to (N, K, M) where K=1 if len(self.exp.shape) == 2: self.exp = torch.unsqueeze(self.exp, 1) self.pred = torch.unsqueeze(self.pred, 1) # Check if shape conversion successful if len(self.exp.shape) != 3: raise ValueError("Unable to coerce correct shape") # Boolean pairwise comparison matrix bmat = self.exp_labels[:, None] == self.pred_labels # Indices of nonmatched pairs self.neg_pairs = np.stack(np.where(bmat == False)).T # Shuffle if determinisitic if self.deterministic is True: np.random.shuffle(self.neg_pairs) # Indices of matched pairs self.pos_pairs = np.stack(np.where(bmat == True)).T # Labels self.pos_label = torch.from_numpy(np.array([1.0], dtype=np.float32)) self.neg_label = torch.from_numpy(np.array([0.0], dtype=np.float32)) def __len__(self): """ Length will be twice the number of matching instances, as half will be different by design. """ return 2 * self.pos_pairs.shape[0]
[docs] def _get_same(self, idx): """ Return same-labeled spectra, label=1. """ # Get matching pair indices idx1, idx2 = self.pos_pairs[idx, :] # Release instance return OrderedDict( [ ("exp_spec", self.exp[idx1]), ("exp_label", self.exp_labels[idx1]), ("pred_spec", self.pred[idx2]), ("pred_label", self.pred_labels[idx2]), ("label", self.pos_label), ] )
[docs] def _get_different(self): """ Return different experimental and predicted data, label=0. As this class is larger, indices will be selected randomly. """ # Get non-matching pair indices idx1, idx2 = self.neg_pairs[np.random.randint(0, len(self.neg_pairs)), :] # Release instance return OrderedDict( [ ("exp_spec", self.exp[idx1]), ("exp_label", self.exp_labels[idx1]), ("pred_spec", self.pred[idx2]), ("pred_label", self.pred_labels[idx2]), ("label", self.neg_label), ] )
[docs] def _get_different_deterministic(self, idx): """ Return same-labeled spectra, label=1. """ # Get matching pair indices idx1, idx2 = self.neg_pairs[idx, :] # Release instance return OrderedDict( [ ("exp_spec", self.exp[idx1]), ("exp_label", self.exp_labels[idx1]), ("pred_spec", self.pred[idx2]), ("pred_label", self.pred_labels[idx2]), ("label", self.neg_label), ] )
def __getitem__(self, idx): """ Alternate same, different data. """ if idx % 2 == 0: return self._get_same(idx // 2) if self.deterministic is True: return self._get_different_deterministic(idx // 2) return self._get_different()
[docs] class ExpPredDataset(Dataset): """ Create pairs for M experiment spectra and N predicted spectra. (M x N pairs) Parameters ---------- exp : :obj:`~numpy.array` Array of experiment values. exp_labels : :obj:`~numpy.array` Array of experiment labels. pred : :obj:`~numpy.array` Array of predicted values. pred_labels : :obj:`~numpy.array` Array of predicted labels. """ def __init__(self, exp, exp_labels, pred, pred_labels): self.exp = torch.from_numpy(exp.astype(np.float32)) self.exp_labels = exp_labels self.pred = torch.from_numpy(pred.astype(np.float32)) self.pred_labels = pred_labels # Validate shape compatibility if len(self.exp.shape) == 1: self.exp = torch.unsqueeze(self.exp, 0) # Reshaping for multi-instance comparison if len(self.exp.shape) == 2: self.exp = torch.unsqueeze(self.exp, 1) if len(self.exp.shape) != 3: raise ValueError("Unable to coerce correct shape for experimental data") if self.exp.shape[-1] != self.pred.shape[-1]: raise ValueError( "Feature dimension mismatch between predicted and experimental data" ) def __len__(self): """ Total length is the product of the number of experimental and predicted instances. """ return self.exp.shape[0] * self.pred.shape[0]
[docs] def _create_pair(self, exp_idx, pred_idx): """ Create data pairing for a single experimental instance paired with a predicted instance. """ # Get experimental data and label exp_spec = self.exp[exp_idx] exp_label = self.exp_labels[exp_idx] # Get predicted data and label pred_spec = self.pred[pred_idx] pred_label = self.pred_labels[pred_idx] # Release paired instance return OrderedDict( [ ("exp_spec", exp_spec), ("exp_label", exp_label), ("pred_spec", pred_spec), ("pred_label", pred_label), ] )
def __getitem__(self, idx): """ Retrieve a pairing for multi-instance experimental data with a predicted instance. Use division and modulo operations to access pair indices. """ exp_idx = idx // self.pred.shape[0] pred_idx = idx % self.pred.shape[0] return self._create_pair(exp_idx, pred_idx)