# Source code for elektronn2.data.knossos_array

# -*- coding: utf-8 -*-
# ELEKTRONN2 Toolkit
# Copyright (c) 2015 Marius Killinger
# All rights reserved

# TODO: Python 3 compatibility


__all__ = ['KnossosArray', 'KnossosArrayMulti']

import glob
import itertools
import re
import os
import sys
import traceback
import ctypes
import multiprocessing as mp
from collections import deque

import numpy as np

from .. import utils

# Hard gate: this module's multiprocessing/buffer handling was only ever
# validated on Python 2.7, so refuse to import on any other interpreter.
if sys.version_info[:2] != (2, 7):
    raise ImportError(
        '\nSorry, this module only supports Python 2.7.'
        '\nYour current Python version is {}\n'.format(sys.version)
    )


def our_glob(s):
    """Glob the pattern *s* and normalise path separators to forward slashes.

    Knossos paths are handled internally with "/" on all platforms, so the
    OS-specific separator (e.g. "\\" on Windows) is replaced in every match.

    :param s: glob pattern (str)
    :return: list of matching paths with "/" separators
    """
    return [g.replace(os.path.sep, "/") for g in glob.glob(s)]

def get_remaining_size(fd):
    """Return the number of bytes between the current position and EOF.

    Uses ``os.fstat`` on the underlying descriptor when available; file-like
    objects without ``fileno()`` fall back to ``os.path.getsize`` on their
    ``name`` attribute.
    """
    pos = fd.tell()
    try:
        descriptor = fd.fileno()
    except AttributeError:
        # Not an OS-level file; derive the size from its path instead.
        return os.path.getsize(fd.name) - pos
    return os.fstat(descriptor).st_size - pos

def load_rawfile(path, out, offset=0):
    """Read raw bytes from *path* directly into the numpy array *out*.

    The file is opened binary and read in place via ``readinto`` (no
    intermediate copy). Uses ``with`` so the descriptor is closed even when
    one of the size checks raises (the original leaked it on error).

    :param path: file to read (str)
    :param out: pre-allocated numpy array, filled in place
    :param offset: number of bytes to skip from the start of the file
    :return: *out*
    :raises ValueError: if the file has fewer bytes left than ``out`` holds
    :raises IOError: if the read came up short
    """
    with open(path, 'rb') as fd:
        if offset > 0:
            # whence=1 (relative seek); position is 0 here, so this skips
            # `offset` bytes from the beginning
            fd.seek(offset, 1)
        size = get_remaining_size(fd)
        nbytes = out.size * out.itemsize
        if nbytes > size:
            raise ValueError(
                    "Not enough bytes left in file for specified shape and type")
        nbytesread = fd.readinto(out.data)
        if nbytesread != nbytes:
            raise IOError("Didn't read as many bytes as expected")
    return out

class LoadProc(mp.Process):
    """Worker process that fills a shared ``mp.Array`` from a raw knossos cube.

    The parent hands *mp_array* over in released state; the worker holds the
    lock for the duration of the load and signals failure via *status*.
    """

    def __init__(self, mp_array, path, dtype, status):
        super(LoadProc, self).__init__()
        self.path = path          # raw cube file to load
        self.mp_array = mp_array  # shared, lock-protected float buffer
        self.status = status      # mp.Event; set() by run() on failure
        # float view onto the shared buffer (no copy)
        self.array = np.frombuffer(mp_array.get_obj(), dtype)

    def run(self):
        try:
            self.mp_array.acquire()
            try:
                # Read the raw uint8 cube, then normalise to [0, 1] floats.
                a_buffer = np.empty(len(self.array), np.uint8)
                load_rawfile(self.path, a_buffer)
                self.array[:] = a_buffer
                self.array /= 255
            finally:
                # Guarantee the lock is handed back exactly once, even when
                # the load raises (the original could skip/double release).
                self.mp_array.release()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still terminate the worker normally.
            traceback.print_exc()
            self.status.set()


class KnossosArray(object):
    """
    Interfaces with knossos cubes, all axes are in zxy order!

    Cubes are loaded asynchronously by ``LoadProc`` workers into a fixed-size
    pool of shared-memory arrays and kept in an in-RAM cache.

    :param path: path to the knossos dataset folder (the one containing
        the mag folders, not .../mag1 itself)
    :param max_ram: cache budget in MB
    :param n_preload: preload distance in cubes (halved along axis 0 / z)
    :param fixed_mag: use exactly this magnification (0 = autodetect)
    """

    def __init__(self, path, max_ram=1000, n_preload=2, fixed_mag=1):
        path = os.path.expanduser(path)
        # --- knossos attributes ---
        # NOTE: np.int / np.float were removed from numpy; the builtins are
        # exact aliases of what the original code meant.
        self._shape = np.zeros(3, dtype=int)           # dataset shape (zyx)
        self._shape_spatial = np.zeros(3, dtype=int)   # spatial shape (zyx)
        self.scale = np.zeros(3, dtype=float)          # voxel scale (zyx)
        self._knossos_path = None
        self._experiment_name = None
        self._cube_boundary = None
        self._cube_length = 128                        # knossos cube edge
        self._cube_sh = [self._cube_length, ] * 3
        self._mag = []
        self._n_f = 1
        # --- slicing/loading attributes ---
        self.dtype = np.dtype(np.float32)
        # preload less far along axis 0 (z)
        _n_preload = np.ones(3, dtype=float) * n_preload
        _n_preload[0] /= 2
        self.n_preload = np.ceil(_n_preload).astype(int)
        max_bytes = max_ram * 2 ** 20
        self.cache_size = int(max_bytes /
                              (self._cube_length ** 3 * self.dtype.itemsize))
        if self.cache_size < np.prod(self.n_preload * 2 + 1):
            ram = np.prod(self.n_preload * 2 + 1) * \
                  (self._cube_length ** 3 * self.dtype.itemsize) // 2 ** 20
            raise ValueError("For %s preload cubes more RAM is required: %i MB"
                             % (n_preload, ram))

        self.mpa_used = {}      # coords -> shared array currently cached
        self.mpa_empty = []     # pool of free (acquired) shared arrays
        self.cubes = {}         # coords -> numpy view onto cached cube
        self.load_q = deque()   # [status, proc, mp_array, coords] in flight
        self.loading = {}       # coords of cubes currently being loaded
        # Pre-allocate the whole shared-memory pool; arrays are handed to
        # workers in released state by _request_cube.
        for i in range(self.cache_size):
            mp_array = mp.Array(ctypes.c_float, int(self._cube_length ** 3),
                                lock=True)
            mp_array.acquire()
            self.mpa_empty.append(mp_array)

        self._initialize_from_knossos_path(path, fixed_mag)
        # self.track = []

    @property
    def shape(self):
        return self._shape

    @property
    def n_f(self):
        return self._n_f

    def __repr__(self):
        s = "<KnossosArray> %s\n" % (self.shape,)
        s += "%i cubes in cache\n" % len(self.cubes)
        s += "%i mpa in use\n" % len(self.mpa_used)
        s += "%i mpa empty\n" % len(self.mpa_empty)
        s += "%i mpa loading\n" % len(self.loading)
        s += "%i loading q\n" % len(self.load_q)
        return s

    def _initialize_from_knossos_path(self, path, fixed_mag=0):
        """
        Initialises the dataset by parsing the knossos.conf in path + "mag1"

        :param path: str
            forward-slash separated path to the dataset folder - not .../mag !
        :param fixed_mag: int
            fixes available mag to one specific value
        :return: nothing
        """
        self._knossos_path = path
        all_mag_folders = our_glob(os.path.join(path, "*mag*"))
        if fixed_mag > 0:
            self._mag.append(fixed_mag)
        else:
            # autodetect available power-of-two magnifications
            for mag_test_nb in range(32):
                for mag_folder in all_mag_folders:
                    if "mag" + str(2 ** mag_test_nb) in mag_folder:
                        self._mag.append(2 ** mag_test_nb)
                        break
        mag_folder = our_glob(os.path.join(path, "*mag*"))[0].split("/")
        if len(mag_folder[-1]) > 1:
            mag_folder = mag_folder[-1]
        else:
            mag_folder = mag_folder[-2]
        # folder prefix without the trailing mag number (e.g. "mag" of "mag1")
        self._name_mag_folder = \
            mag_folder[:-len(re.findall(r"[\d]+", mag_folder)[-1])]
        try:
            p = our_glob(os.path.join(path, "*mag1"))[0] + "/knossos.conf"
            print("Reading %s" % p)
            f = open(p)
            lines = f.readlines()
            f.close()
        except Exception:
            raise NotImplementedError("Could not find/read *mag1/knossos.conf")

        parsed_dict = {}
        for line in lines:
            try:
                match = re.search(r'(?P<key>[A-Za-z _]+)'
                                  r'((?P<numeric_value>[0-9\.]+)'
                                  r'|"(?P<string_value>[A-Za-z0-9._/-]+)");',
                                  line).groupdict()
                if match['string_value']:
                    val = match['string_value']
                elif '.' in match['numeric_value']:
                    val = float(match['numeric_value'])
                elif match['numeric_value']:
                    val = int(match['numeric_value'])
                else:
                    raise Exception('Malformed knossos.conf')
                parsed_dict[match["key"]] = val
            except Exception:
                print("Unreadable line in knossos.conf - ignored.")

        # NB: the trailing space in each key comes from the conf format
        self._shape[2] = parsed_dict['boundary x ']
        self._shape[1] = parsed_dict['boundary y ']
        self._shape[0] = parsed_dict['boundary z ']
        self._shape_spatial[:] = self._shape
        # the boundary is the first cube index for which no cube is available!
        self._cube_boundary = np.ceil(self._shape.astype(float) /
                                      self._cube_length).astype(int)
        self.scale[2] = parsed_dict['scale x ']
        self.scale[1] = parsed_dict['scale y ']
        self.scale[0] = parsed_dict['scale z ']
        self._experiment_name = parsed_dict['experiment name ']
        if self._experiment_name.endswith("mag1"):
            self._experiment_name = self._experiment_name[:-5]

    def _request_cube(self, coords):
        """Start an asynchronous load of the cube at *coords* (zyx index)."""
        assert (coords not in self.cubes) and (coords not in self.loading)
        assert len(self.mpa_empty)
        # self.track.append(coords)
        # knossos names cubes in xyz order, internally we use zyx
        coords_xyz = [coords[2], coords[1], coords[0]]
        mag = self._mag[0]
        path = self._knossos_path + self._name_mag_folder + \
               str(mag) + "/x%04d/y%04d/z%04d/" \
               % (coords_xyz[0], coords_xyz[1], coords_xyz[2]) + \
               self._experiment_name + '_mag' + str(mag) + \
               "_x%04d_y%04d_z%04d.raw" \
               % (coords_xyz[0], coords_xyz[1], coords_xyz[2])
        if not os.path.exists(path):
            raise ValueError("Slice out of bounce, no cube %s "
                             "[Note: not all cubes exist within bounding box!].)"
                             % (coords,))
        mp_array = self.mpa_empty.pop()
        status = mp.Event()  # not set; the worker sets it on failure
        mp_array.release()   # hand the lock over to the worker process
        proc = LoadProc(mp_array, path, self.dtype, status)
        proc.start()
        self.load_q.append([status, proc, mp_array, coords])
        self.loading[coords] = True

    def _clear_q(self, wait_for):
        """
        Move finished loads from the queue into the cube cache.

        :param wait_for: None: consume only already-finished loads;
            'all': block until the whole queue is drained;
            coords tuple: block until that particular cube has arrived.
        """
        if wait_for is None:
            states = [q[1].is_alive() for q in self.load_q]
            try:
                max_i = states.index(True)  # stop at the first busy worker
            except ValueError:  # all have terminated already
                max_i = len(self.load_q)
        else:
            # 'all' or a specific coords tuple: may drain the whole queue
            max_i = len(self.load_q)

        i = 0
        while i < max_i:
            status, proc, mp_array, coords = self.load_q.popleft()
            if proc.is_alive():
                print("WAITING for cube %s" % (coords,))
                proc.join()
            if status.is_set():
                raise Exception("An error happened in one of the loading "
                                "processes")
            mp_array.acquire()  # take the lock back from the finished worker
            np_array = np.frombuffer(mp_array.get_obj(),
                                     self.dtype).reshape(self._cube_sh)
            assert coords not in self.cubes, "%s" % (coords,)
            self.cubes[coords] = np_array
            self.mpa_used[coords] = mp_array
            self.loading.pop(coords)
            # print("cube %s loaded" % (coords,))
            if coords == wait_for:
                break
            i += 1
[docs] def preload(self, position, start_end=None, sync=False): """ preloads around position preload distance but at least to cover start-end """ position = np.array(position, dtype=np.int) center_cube = np.floor((position)/self._cube_length).astype(np.int) start = np.maximum(center_cube - self.n_preload, 0) end = np.minimum(center_cube + self.n_preload + 1, self._cube_boundary) # preload indices ijk_pre = list(itertools.product(range(start[0], end[0]), range(start[1], end[1]), range(start[2], end[2]))) # requested indices: if start_end is not None: start_r, end_r = start_end if np.any(end_r > self._cube_boundary): print("Warning: Slice at %s might be out of bounce (no cube %s)" %(position, end_r-1)) ijk_req = list(itertools.product(range(start_r[0], end_r[0]), range(start_r[1], end_r[1]), range(start_r[2], end_r[2]))) ijk_req_set = set((ijk_req)) ijk_pre = list(filter(lambda x: x not in ijk_req_set, ijk_pre)) ijk = ijk_req + ijk_pre else: ijk = ijk_pre fetch = [] keep = {} for coords in ijk: if (coords in self.cubes): keep[coords] = True elif (coords not in self.loading): fetch.append(coords) if len(fetch) > len(self.mpa_empty): # need to release arrays first n = len(fetch) - len(self.mpa_empty) if len(ijk) > self.cache_size: f = float(len(ijk)) / self.cache_size raise ValueError("The requested array+preload margin is %.2f x" " too large for the cache limit" %f) self._release_cubes(center_cube, keep, n) for i,j,k in fetch: self._request_cube((i,j,k)) if sync: self._clear_q("all") if start_end is not None: return ijk_req
def _release_cube(self, coords): self.cubes.pop(coords) mp_array = self.mpa_used.pop(coords) self.mpa_empty.append(mp_array) #print("cube %s deleted" %(coords,)) def _release_cubes(self, center_cube, keep, n): release = [] for coords in self.cubes: if coords not in keep: dist = np.linalg.norm((center_cube - coords) * [2,1,1]) release.append((coords, dist)) release.sort(key=lambda x: x[1], reverse=True) if len(release)< n: # not enough cubes available, they are still in loading q for i in range(len(release)): # release the possible cubes first self._release_cube(release[i][0]) n_left = n - len(release) # now clear the q try: # without waiting self._clear_q(None) self._release_cubes(center_cube, keep, n_left) except: # if it doesn't work, wait for all self._clear_q('all') self._release_cubes(center_cube, keep, n_left) return for i in range(n): self._release_cube(release[i][0]) def _fetch_cube(self, coords): """ Fetch a cube that is in cache or in loading q. i.e. this cube must have been preloaded Don't use this to request/preload a cube! """ if coords not in self.cubes: assert coords in self.loading self._clear_q(coords) return self.cubes[coords] #@utils.timeit
[docs] def cut_slice(self, shape, offset, out=None): #tt = utils.Timer() shape = np.array(shape, dtype=np.int) offset = np.array(offset, dtype=np.int) start = np.floor((offset)/self._cube_length).astype(np.int) end = np.floor((offset+shape-1)/self._cube_length).astype(np.int) + 1 position = offset + shape//2 #tt.check("setup", True) ijk = self.preload(position=position, start_end=(start, end), sync=False) #tt.check("preload", True) if out is None: out = np.empty(shape, self.dtype) else: assert out.dtype==self.dtype assert np.allclose(out.shape, shape) l = self._cube_length for i,j,k in ijk: # Flat triple-nested loop values = self._fetch_cube((i,j,k)) #tt.check("fetch", True) xl = max(offset[0]-i*l, 0) xh = min(offset[0]-i*l+shape[0], l) yl = max(offset[1]-j*l, 0) yh = min(offset[1]-j*l+shape[1], l) zl = max(offset[2]-k*l, 0) zh = min(offset[2]-k*l+shape[2], l) cut = values[xl:xh, yl:yh, zl:zh] #tt.check("cut", True) xl1 = max(i*l-offset[0], 0) xh1 = min((i+1)*l-offset[0], shape[0]) yl1 = max(j*l-offset[1], 0) yh1 = min((j+1)*l-offset[1], shape[1]) zl1 = max(k*l-offset[2], 0) zh1 = min((k+1)*l-offset[2], shape[2]) out[xl1:xh1, yl1:yh1, zl1:zh1] = cut #tt.check('write', True) #tt.summary() return out
def _normalise_idx(self, idx): new_idx = [] shape = np.zeros(3, dtype=np.int) offset = np.zeros(3, dtype=np.int) for i,sl in enumerate(idx): if isinstance(sl, int): raise ValueError("Cannot slice single slices, only ranges") start, stop, step = sl.indices(self._shape_spatial[i]) if start < 0 or stop>=self._shape_spatial[i]: raise ValueError("Slice %s out of bounce for knossos " "array of shape %s" %(idx[i], self._shape_spatial[i])) if step != 1: raise NotImplementedError("Subsampled slicing not implemented:" "instructins: use mag(stride) cubes " "and reuse slicing function") new_idx.append(slice(start, stop, step)) shape[i] = (stop - start)//step offset[i] = start return shape, offset, new_idx def __getitem__(self, idx): assert len(idx) in [3,4] add_channel = False if len(idx)==4: assert idx[0].start==idx[0].stop==idx[0].step==None idx = idx[1:] add_channel = True shape, offset, new_idx = self._normalise_idx(idx) out = self.cut_slice(shape, offset) if add_channel: out = out[None] return out
class KnossosArrayMulti(KnossosArray):
    """Stack of several ``KnossosArray`` datasets; the leading axis of the
    4d result indexes the feature channel, spatial axes remain zxy.
    The RAM budget is shared evenly between the sub-arrays.
    """

    def __init__(self, path_prefix, feature_paths, max_ram=3000, n_preload=2,
                 fixed_mag=1):
        self._arrays = []
        self._n_f = len(feature_paths)
        self.dtype = np.dtype(np.float32)
        for feature in feature_paths:
            p = os.path.expanduser(os.path.join(path_prefix, feature))
            self._arrays.append(KnossosArray(p, max_ram // self._n_f,
                                             n_preload, fixed_mag))
        self._shape = (self._n_f,) + tuple(self._arrays[0].shape)
        self._shape_spatial = self._arrays[0].shape

    def preload(self, position, sync=True):
        """Preload all sub-arrays around *position* (channel axis, if any,
        is ignored)."""
        if len(position) == 4:
            position = position[1:]
        for sub in self._arrays:
            sub.preload(position, sync=sync)

    def __getitem__(self, idx):
        """4d slicing: first slice selects feature channels."""
        assert len(idx) == 4
        feature_slice = idx[0]
        shape, offset, new_idx = self._normalise_idx(idx[1:])
        feature_indices = list(range(*feature_slice.indices(self._n_f)))
        out = np.empty((len(feature_indices),) + tuple(shape), self.dtype)
        for i in feature_indices:
            self._arrays[i].cut_slice(shape, offset, out[i])
        return out

    def cut_slice(self, shape, offset, out=None):
        """Cut the same zyx slab from every sub-array into one 4d block."""
        if out is None:
            out = np.empty((self._n_f,) + tuple(shape), self.dtype)
        for i in range(self._n_f):
            self._arrays[i].cut_slice(shape, offset, out[i])
        return out

    def __repr__(self):
        header = "<KnossosArray> %s\nFirst sub-array:\n" % (self.shape,)
        return header + repr(self._arrays[0])
class KnossosArrayMultiMy(KnossosArrayMulti):
    """Project-specific 2-channel view onto a 4-channel multi-array:
    channel 0 is passed through, channel 1 is
    ``max(max(ch1, ch2) - ch3, 0)``.
    """

    def __getitem__(self, idx):
        tmp = super(KnossosArrayMultiMy, self).__getitem__(idx)
        sh = list(tmp.shape)
        sh[0] = 2
        out = np.empty(sh, dtype=np.float32)
        out[0] = tmp[0]
        # combined channel: element-wise max of ch1/ch2 minus ch3, clipped at 0
        out[1] = np.maximum(np.maximum(tmp[1], tmp[2]) - tmp[3], 0)
        return out

    @property
    def n_f(self):
        # BUG FIX: the original returned self.n_f, which recursed infinitely
        # (RuntimeError on access). This view always exposes exactly two
        # feature channels, consistent with `shape` and `__getitem__` above.
        return 2

    @property
    def shape(self):
        sh = list(self._shape)
        sh[0] = 2
        return tuple(sh)


if __name__ == "__main__":
    # Ad-hoc benchmark / visual check; requires a local knossos dataset and
    # the knossos_utils / plotting extras.
    import time
    import sys
    sys.path.append(os.path.expanduser("~/axon/mkilling/devel/"))
    from knossos_utils import KnossosDataset
    from elektronn2.utils.plotting import scroll_plot, scroll_plot2

    path = os.path.expanduser("~/lustre/sdorkenw/j0126_3d_rrbarrier/")
    arr = KnossosArray(path, max_ram=1200, n_preload=2)
    arr.preload([0, 0, 0], sync=True)
    sh = np.array([120, 240, 240])
    off = np.array([0, 0, 0]) + np.random.randint(0, 500, size=3)

    kds = KnossosDataset()
    kds.initialize_from_knossos_path(path)
    get = utils.timeit(kds.from_raw_cubes_to_matrix)

    t = 0
    out = np.empty(sh, dtype=np.float32)
    N = 500
    for i in range(N):
        t0 = time.time()
        a = arr.cut_slice(sh, off, out=out)
        t += time.time() - t0
        time.sleep(0.2)
        off += np.random.randint(0, 10, size=3)
    print("%.3f slices/s" % (float(N) / t))
    arr._clear_q(None)
    fig = scroll_plot(out, 'a')