Source code for biobox.measures.path

# Copyright (c) 2014-2017 Matteo Degiacomi
#
# BiobOx is free software ;
# you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation ;
# either version 2 of the License, or (at your option) any later version.
# BiobOx is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY ;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with BiobOx ;
# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
#
# Author : Matteo Degiacomi, matteothomas.degiacomi@gmail.com

import heapq
import scipy.spatial.distance as SD
import numpy as np
from sklearn.cluster import DBSCAN

import biobox.lib.fastmath as FM # cythonized
from biobox.lib.graph import Graph # cythonized

from biobox.classes.structure import Structure
from biobox.classes.convex import Sphere


class PriorityQueue(object):
    '''
    Queue for shortest path algorithms in Graph class.
    '''

    def __init__(self):
        self.elements = []

    def empty(self):
        '''
        clear priority queue
        '''
        return len(self.elements) == 0

    def put(self, item, priority):
        '''
        add element in priority queue"

        :param item: item to add in queue
        :param priority: item's priority
        '''
        heapq.heappush(self.elements, (priority, item))

    def get(self):
        '''
        pop top priority element from queue
        '''
        return heapq.heappop(self.elements)[1]


[docs]class Path(object): ''' Methods for finding shortest paths between points in a point cloud (typically a :func:`Structure <structure.Structure>` object). Two algorithms are implemented: A* and Theta*. In order to deal with lists of links within the same dataset, two methodologies are offered, global or local. * global builds a grid around the whole ensemble of points once, and every subsequent distance measure is performed on the same grid. * local a local grid is displaced to surround the points to be linked. Global take longer to initialize, and is more memory intensive, but subsequent measures are quick. Local takes less memory, but individual measures take more time to be performed. ''' def __init__(self, points): ''' An accessibility graph must be initially created around the provided points cloud. This is a mesh where none of its nodes clashes with any of the provided point in the cloud. After instantiation, :func:`setup_local_search <path.Path.setup_local_search>` or :func:`setup_global_search <path.Path.setup_global_search>` must first be called (depending on whether one wants to generate a single grid encompassing all the protein, or a smaller, moving grid). :param points: points for representing obstacles. ''' self.graph = Graph(points) self.kind = "none"
[docs] def search_path(self, start, end, method="theta", get_path=True, update_grid=True, test_los=True): ''' Find the shortest accessible path between two points. :param start: coordinates of starting point :param end: coordinates of target point :param method: can be theta or astar :param get_path: return full path (not only waypoints) :param update_grid: if True, grid will be recalculated (for local search only) :param test_los: if true, a line of sight postprocessing will be performed to make paths straighter ''' ###INITIALIZE PATH SEARCH### euclidean = np.sqrt(np.dot(start - end, start - end)) # if euclidean path is requested, don't go any futher # not the fastest way (when dealing with spheres, would be better to calculate pairwise distances in distance_matrix) # this said, though, we keep distance method selection packed in the # same function if method == "euclidean": waypoints = np.array([start, end]) if get_path: waypoints = self._get_trails(waypoints) return euclidean, waypoints # if points are too far, skip it if euclidean > self.maxdist: return -1, np.array([]) # specify grid search type (local or global) if self.kind == "local" and update_grid: self.graph.place_local_grid(start, end) elif self.kind != "global" and self.kind != "local": raise Exception( "setup_local_search or setup_global_search must first be called") connect_thresh = self.maxdist + self.graph.step # get indices of closest graph neighbors in graph, corresponding to points to connect # in this case start and end will be picked within the same ensemble of coordinates # first, check if atoms are accessible, exit if likely to be buried dists, idx_3d_start = self.graph.get_closest_nodes(np.array([start])) if dists[0] > connect_thresh: return -2, np.array([]) dists, idx_3d_end = self.graph.get_closest_nodes(np.array([end])) if dists[0] > connect_thresh: return -2, np.array([]) idx_start = self.graph.get_flat_index(np.array(idx_3d_start.T))[0] idx_end = self.graph.get_flat_index(np.array(idx_3d_end.T))[0] # if start and end see each other, do not run shortest path algorithms if self._line_of_sight(idx_3d_start[0], idx_3d_end[0]): waypoints = np.array([start, self.graph.get_points_from_idx_flat(idx_start), self.graph.get_points_from_idx_flat(idx_end), end]) else: ###COMPUTE CURVED DISTANCE### if method == "old_theta": # original theta* algorithm came_from, cost_so_far = self.theta_star(idx_start, idx_end) elif method == "astar": # A* algorithm came_from, cost_so_far = self.a_star(idx_start, idx_end) # lazy theta* algorithm, more lightweight than the original elif method == "lazytheta" or method == "theta": came_from, cost_so_far = self.lazy_theta_star( idx_start, idx_end) else: print("ERROR: search method %s unknown." % method) return -1, np.array([]) # get waypoints using path dictionary, end node index and target # endpoint (prepended to resulting path) waypoints = self._get_waypoints(came_from, idx_start, idx_end, end, start, clean_lineofsight=test_los) if len(waypoints) == 0: # points are disconnected return -1, waypoints # measure path length dist = self._measure_path(waypoints) # on request, get full path if get_path: waypoints = self._get_trails(waypoints) return dist, waypoints
# interpret shortest path algorithm output def _get_waypoints(self, came_from, idx_start, best_end_idx, endpoint, start, clean_lineofsight=True): # build path form search algorithm output and compute cost # flattened indices (start with graph endpoint) pts_idx = [best_end_idx] # points coordinates (start with best position within endpoints) pts_crd = [endpoint] pts_crd.append(self.graph.get_points_from_idx_flat(best_end_idx)) # points coordinates cnt = 0 # safety mechanism, regions of start and end point are disconnected while cnt < np.sum(self.graph.access_grid): # extract previous point, and continue if not null try: pred = came_from[pts_idx[-1]] except Exception as ex: return -1, np.array([]) if pred == idx_start: break # coordinates of previous point predpt = self.graph.get_points_from_idx_flat(pred) # print(predpt) # extend path, and compute distance (store waypoints) pts_crd.append(predpt) pts_idx.append(pred) cnt += 1 if cnt == np.sum(self.graph.access_grid): return np.array([]) pts_idx.append(idx_start) # start points indices pts_crd.append(self.graph.get_points_from_idx_flat(idx_start)) pts_crd.append(start) # start points coordinates # if no waypoint cleaning is required, return obtained path if not clean_lineofsight: return np.array(pts_crd) # clear waypoints located between two points "seeing eachother" test = np.ones(len(pts_crd)).astype(bool) w = [] for p in pts_idx: p3d = self.graph.get_3d_index(p) w.append(p3d) waypoints = np.array(w) # forward i = 1 while i < len(waypoints): if test[i] == 1: # if point is visible for j in range(i+1, len(waypoints)): #for j in range(len(waypoints)-1, i + 1, -1): # test from end to second neighbor if test[j] == 1: v = waypoints[i].copy() w = waypoints[j].copy() if self._line_of_sight(v, w): #print(i, j) test[i + 1:j+1] = False #break i += 1 return np.array(pts_crd)[test] # fill intermediate regions between waypoints with points # points are separated with steps of 1A (or less) def _get_trails(self, waypoints): wpts = [waypoints[0]] for i in range(1, len(waypoints), 1): # add points in interval between old and new point vec = waypoints[i - 1] - waypoints[i] distance = np.sqrt(np.dot(vec, vec)) if distance < 1: #was 0.01A continue vec /= np.linalg.norm(vec) start_here = waypoints[i] for a in np.linspace(distance, 1, int(distance)): pt = start_here + a * vec wpts.append(pt) wpts.append(waypoints[i]) return np.array(wpts) # measure the length of a path, provided as waypoints def _measure_path(self, waypoints): # distance initialized with distance between best starting point and # closes graph node dist = 0 for i in range(1, len(waypoints), 1): dist += np.sqrt(np.dot(waypoints[i - 1] - waypoints[i], waypoints[i - 1] - waypoints[i])) return dist # line-of-sight test. Draw line between two points using Bresenham algorithm. # line-of-sight established if all voxels are true in # self.graph.access_grid def _line_of_sight(self, a, b): return FM.c_line_of_sight(self.graph.access_grid, a, b) # def _heuristic(self, a, b): # return self.graph.heuristic(a, b) #a1 = np.array(self.graph.get_3d_index(a.T)).astype(float) #b1 = np.array(self.graph.get_3d_index(b.T)).astype(float) # return np.dot(b1-a1, b1-a1) #manhattan!
[docs] def a_star(self, start, goal): ''' A* algorithm, find path connecting two points in the graph. :param start: starting point (flattened coordinate of a graph grid point). :param goal: end point (flattened coordinate of a graph grid point). ''' self.frontier = PriorityQueue() self.frontier.put(start, 0) self.came_from = {} self.cost_so_far = {} self.came_from[start] = start self.cost_so_far[start] = 0 while not self.frontier.empty(): self.current = self.frontier.get() if self.current == goal: break for thenext in self.graph.neighbors(self.current, True): new_cost = self.cost_so_far[self.current] + self.graph.cost(self.current, thenext) if thenext not in self.cost_so_far or new_cost < self.cost_so_far[thenext]: self.cost_so_far[thenext] = new_cost priority = new_cost + self.graph.heuristic(goal, thenext) self.frontier.put(thenext, priority) self.came_from[thenext] = self.current return self.came_from, self.cost_so_far
[docs] def theta_star(self, start, goal): ''' Theta* algorithm, find path connecting two points in the accessibility graph. :param start: starting point (flattened coordinate of a graph grid point). :param goal: end point (flattened coordinate of a graph grid point). ''' self.frontier = PriorityQueue() self.frontier.put(start, 0) self.came_from = {} self.cost_so_far = {} self.came_from[start] = start self.cost_so_far[start] = 0 while not self.frontier.empty(): self.current = self.frontier.get() if self.current == goal: break for thenext in self.graph.neighbors(self.current, True): new_cost = self.cost_so_far[self.current] + self.graph.cost(self.current, thenext) # test line-of-sight between predecessor of current, and thenext # node. if self.came_from[self.current] != start: a1 = np.array(self.graph.get_3d_index(self.came_from[self.current])) b1 = np.array(self.graph.get_3d_index(thenext)) visible = self._line_of_sight(a1, b1) else: visible = False # if visible, current node is useless and can be ignored. if visible: newcost = self.cost_so_far[self.came_from[self.current]] + self.graph.cost(self.came_from[self.current], thenext) if thenext not in self.cost_so_far or newcost < self.cost_so_far[thenext]: self.came_from[thenext] = self.came_from[self.current] self.cost_so_far[thenext] = newcost priority = self.cost_so_far[thenext] + self.graph.heuristic(goal, thenext) self.frontier.put(thenext, priority) elif thenext not in self.cost_so_far or new_cost < self.cost_so_far[thenext]: self.cost_so_far[thenext] = new_cost priority = new_cost + self.graph.heuristic(goal, thenext) self.frontier.put(thenext, priority) self.came_from[thenext] = self.current return self.came_from, self.cost_so_far
[docs] def lazy_theta_star(self, start, goal): ''' Lazy Theta* algorithm (better than Theta* in terms of amount of line-of-sight tests), find path connecting two points in the graph. :param start: starting point (flattened coordinate of a graph grid point). :param goal: end point (flattened coordinate of a graph grid point). ''' self.frontier = PriorityQueue() self.frontier.put(start, 0) self.came_from = {} self.cost_so_far = {} self.came_from[start] = start self.cost_so_far[start] = 0 while not self.frontier.empty(): self.current = self.frontier.get() # test line-of-sight between predecessor of current, and current # (test if line_of_sight guess was right) if self.current != start: a1 = np.array(self.graph.get_3d_index(self.came_from[self.current])) b1 = np.array(self.graph.get_3d_index(self.current)) visible = self._line_of_sight(a1, b1) else: visible = True # if not visible, select closest visible neighbor if not visible: min_pos = -1 min_cost = 10000000 for thenext in self.graph.neighbors(self.current, True): if thenext in self.cost_so_far and thenext != self.came_from[self.current]: new_cost = self.cost_so_far[thenext] + self.graph.cost(self.current, thenext) if new_cost < min_cost: min_cost = new_cost min_pos = thenext self.came_from[self.current] = min_pos self.cost_so_far[self.current] = min_cost if self.current == goal: break for thenext in self.graph.neighbors(self.current, True): parentpos = self.came_from[self.current] test_cost = self.cost_so_far[parentpos] + self.graph.cost(parentpos, thenext) if thenext not in self.cost_so_far or test_cost < self.cost_so_far[thenext]: self.came_from[thenext] = self.came_from[self.current] self.cost_so_far[thenext] = test_cost priority = self.cost_so_far[thenext] + self.graph.heuristic(goal, thenext) self.frontier.put(thenext, priority) return self.came_from, self.cost_so_far
[docs] def smooth(self, chain, move_angle_thresh=0.0): ''' Utility method aimed at smoothing a chain produced by A* or Theta*, to make it less angular. :param chain: numpy array containing the list of points composing the path :param move_angle_thresh: if angle between three consecutive points is greater than this threshold, smoothing is performed :returns: smoothed chain (3xN numpy array) ''' # if chain is too short, return if len(chain) <= 1: return 0.0, chain elif len(chain) == 2: return np.sqrt(np.dot(chain[1] - chain[0], chain[1] - chain[0])), chain angles_test = np.zeros(len(chain) - 2) # first test: scan all angles, and pinpoint the ones to check for i in range(1, len(chain) - 1, 1): mod1 = np.linalg.norm(chain[i] - chain[i - 1]) mod2 = np.linalg.norm(chain[i + 1] - chain[i]) if mod1 == 0 or mod2 == 0: angles_test[i - 1] = 1 continue a1 = (chain[i] - chain[i - 1]) / mod1 a2 = (chain[i + 1] - chain[i]) / mod2 if not np.any(a1 != a2): continue dd = np.dot(a1, a2) if dd > 1.0: dd = 1.0 # if an angle is not close to straight, flag it for straightening if np.degrees(np.arccos(dd)) > move_angle_thresh: angles_test[i - 1] = 1 # for every flagged angle, try to straighten while np.any(angles_test == 1): for i in range(0, len(angles_test), 1): # if an angle is not (almost) straight, try to straighten it if angles_test[i] == 1: # angle i involves atoms i, i+1 (center to be displaced) # and i+2 point = (chain[i + 2] + chain[i]) / 2.0 chain[i + 1] = point[:] angles_test[i] = -1 # if angle has been moved, tag for angle check its # neighbors if i >= 1 and angles_test[i - 1] != -1: angles_test[i - 1] = 1 if i < len(angles_test) - 1 and angles_test[i + 1] != -1: angles_test[i + 1] = 1 dist = 0 for i in range(0, len(chain) - 1, 1): dist += np.sqrt(np.dot(chain[i] - chain[i + 1], chain[i] - chain[i + 1])) return dist, chain
[docs] def write_grid(self, filename="grid.pdb"): ''' Write the accessibility graph to a PBB file :param filename: output file name ''' # it is not in graph, to keep cython as clean as possible w = np.array(np.where(self.graph.access_grid)).T.astype(float) pts = self.graph.get_points_from_idx(w) S = Structure(p=pts) S.write_pdb(filename)