Skip to content

Signatures #75

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
33 changes: 18 additions & 15 deletions nibabel/analyze.py
Original file line number Diff line number Diff line change
@@ -800,6 +800,22 @@ def scaling_from_data(self, data):
out_dtype,
self.has_data_intercept)

def state_stamper(self, caller):
""" Return stamp for current state of `self`

Parameters
----------
caller : callable
May be object from which this method was called. Not used by
analyze headers, but may be used by subclasses

Returns
-------
stamp : object
object unique to this state of `self`
"""
return self.__class__, self.binaryblock

@classmethod
def _get_checks(klass):
''' Return sequence of check functions for this class '''
@@ -899,18 +915,7 @@ class AnalyzeImage(SpatialImage):
files_types = (('image','.img'), ('header','.hdr'))
_compressed_exts = ('.gz', '.bz2')

class ImageArrayProxy(ArrayProxy):
''' Analyze-type implemention of array proxy protocol

The array proxy allows us to freeze the passed fileobj and
header such that it returns the expected data array.
'''
def _read_data(self):
fileobj = allopen(self.file_like)
data = self.header.data_from_fileobj(fileobj)
if isinstance(self.file_like, basestring): # filename
fileobj.close()
return data
ImageArrayProxy = ArrayProxy

def get_header(self):
''' Return header
@@ -941,9 +946,7 @@ def from_file_map(klass, file_map):
img = klass(data, None, header, file_map=file_map)
# set affine from header though
img._affine = header.get_best_affine()
img._load_cache = {'header': hdr_copy,
'affine': img._affine.copy(),
'file_map': copy_file_map(file_map)}
img._stored_state['affine'] = img.stamper(img._affine)
return img

@staticmethod
75 changes: 72 additions & 3 deletions nibabel/arrayproxy.py
Original file line number Diff line number Diff line change
@@ -6,14 +6,48 @@
# copyright and license terms.
#
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
''' Array proxy base class '''
""" Array proxy base class

The API is - at minimum:

* The object has an attribute ``shape``
* that the object returns the data array from ``np.asarray(obj)``
* that modifying no object outside ``obj`` will affect the result of
``np.asarray(obj)``. Specifically, if you pass a header into the the
__init__, then modifying the original header will not affect the result of the
array return.

You might also want to implement ``state_stamper``
"""

from .volumeutils import allopen


class ArrayProxy(object):
"""
The array proxy allows us to freeze the passed fileobj and header such that
it returns the expected data array.

This fairly generic implementation allows us to deal with Analyze and its
variants, including Nifti1, and with the MGH format, apparently.

It requires a ``header`` object with methods:
* copy
* get_data_shape
* data_from_fileobj

Other image types might need to implement their own implementation of this
API. See :mod:`minc` for an example.
"""
def __init__(self, file_like, header):
self.file_like = file_like
self.header = header.copy()
self._data = None
self.shape = header.get_data_shape()
self._shape = header.get_data_shape()

@property
def shape(self):
return self._shape

def __array__(self):
''' Cached read of data from file '''
@@ -22,6 +56,41 @@ def __array__(self):
return self._data

def _read_data(self):
raise NotImplementedError
fileobj = allopen(self.file_like)
data = self.header.data_from_fileobj(fileobj)
if isinstance(self.file_like, basestring): # filename
fileobj.close()
return data

def state_stamper(self, caller):
""" Return stamp for current state of `self`

The result somewhat uniquely identifies the state of the array proxy.
It assumes that the underlying ``self.file_like`` does not get modified.
Specifically, if you open a file-like object, pass into an arrayproxy
(call it ``ap``) and get the stamp (say with ``Stamper()(ap)``, then
this stamp will uniquely identify the result of ``np.asarry(ap)`` only
if the file-like object has not changed.

Parameters
----------
caller : callable
callable object from which this method was called.

Returns
-------
stamp : object
object unique to this state of `self`

Notes
-----
The stamp changes if the array to be returned has been cached
(``_data`` attribute). This is because this makes it possible to change
the array outside the proxy object, because further calls to
``__array__`` returns a refernce to ``self._data``, and the reference
allows the caller to modify the array in-place.
"""
return (self.__class__,
self.file_like,
caller(self.header),
caller(self._data))
25 changes: 24 additions & 1 deletion nibabel/fileholders.py
Original file line number Diff line number Diff line change
@@ -90,6 +90,30 @@ def same_file_as(self, other):
return ((self.filename == other.filename) and
(self.fileobj == other.fileobj))

def state_stamper(self, caller):
""" Get record of state of fileholder

See: :mod:`stampers`

Parameters
----------
caller : object
Passed from stamper object, but not used by us

Returns
-------
stamp : tuple
state stamp

Notes
-----
We can get state stamp for these file objects assuming that the same
filename corresponds to the same file. We can let pass the position of
reading in the file because we are recording the position with
``self.pos``.
"""
return (self.filename, self.fileobj, self.pos)


def copy_file_map(file_map):
''' Copy mapping of fileholders given by `file_map`
@@ -109,4 +133,3 @@ def copy_file_map(file_map):
for key, fh in file_map.items():
fm_copy[key] = copy(fh)
return fm_copy

16 changes: 1 addition & 15 deletions nibabel/freesurfer/mghformat.py
Original file line number Diff line number Diff line change
@@ -429,18 +429,7 @@ class MGHImage(SpatialImage):
files_types = (('image', '.mgh'),)
_compressed_exts = ('.mgz',)

class ImageArrayProxy(ArrayProxy):
''' Analyze-type implemention of array proxy protocol

The array proxy allows us to freeze the passed fileobj and
header such that it returns the expected data array.
'''
def _read_data(self):
fileobj = allopen(self.file_like)
data = self.header.data_from_fileobj(fileobj)
if isinstance(self.file_like, basestring): # filename
fileobj.close()
return data
ImageArrayProxy = ArrayProxy

def get_header(self):
''' Return the MGH header given the MGHImage'''
@@ -483,9 +472,6 @@ def from_file_map(klass, file_map):
hdr_copy = header.copy()
data = klass.ImageArrayProxy(mghf, hdr_copy)
img = klass(data, affine, header, file_map=file_map)
img._load_cache = {'header': hdr_copy,
'affine': affine.copy(),
'file_map': copy_file_map(file_map)}
return img

def to_file_map(self, file_map=None):
30 changes: 30 additions & 0 deletions nibabel/nifti1.py
Original file line number Diff line number Diff line change
@@ -502,6 +502,21 @@ def from_fileobj(klass, fileobj, size, byteswap):
extensions.append(ext)
return extensions

def state_stamper(self, caller):
""" Return stamp for current state of `self`

Parameters
----------
caller : callable
callable with which we can process our state

Returns
-------
stamp : object
object unique to this state of `self`
"""
return self.__class__, caller(list(self))


class Nifti1Header(SpmAnalyzeHeader):
''' Class for NIFTI1 header
@@ -620,6 +635,21 @@ def default_structarr(klass, endianness=None):
hdr_data['vox_offset'] = 0
return hdr_data

def state_stamper(self, caller):
""" Return stamp for current state of `self`

Parameters
----------
caller : None or callable
May be object from which this method was called.

Returns
-------
stamp : object
object unique to this state of `self`
"""
return self.__class__, self.binaryblock, caller(self.extensions)

def get_qform_quaternion(self):
''' Compute quaternion from b, c, d of quaternion

96 changes: 94 additions & 2 deletions nibabel/spatialimages.py
Original file line number Diff line number Diff line change
@@ -130,6 +130,7 @@

import numpy as np

from .stampers import NdaStamper
from .filename_parser import types_filenames, TypesFilenamesError
from .fileholders import FileHolder
from .volumeutils import shape_zoom_affine
@@ -164,7 +165,7 @@ def from_header(klass, header=None):
if header is None:
return klass()
# I can't do isinstance here because it is not necessarily true
# that a subclass has exactly the same interface as it's parent
# that a subclass has exactly the same interface as its parent
# - for example Nifti1Images inherit from Analyze, but have
# different field names
if type(header) == klass:
@@ -254,6 +255,24 @@ def data_from_fileobj(self, fileobj):
data_bytes = fileobj.read(data_size)
return np.ndarray(shape, dtype, data_bytes, order='F')

def state_stamper(self, caller):
""" Return stamp for current state of `self`

Parameters
----------
caller : callable
May be object from which this method was called.

Returns
-------
stamp : object
object unique to this state of `self`
"""
return (self.__class__,
np.dtype(self._dtype),
tuple(self._shape),
tuple(self._zooms))


class ImageDataError(Exception):
pass
@@ -266,6 +285,8 @@ class ImageFileError(Exception):
class SpatialImage(object):
header_class = Header
files_types = (('image', None),)
# Object with which to get state stamps for components
stamper = NdaStamper()
_compressed_exts = ()

''' Template class for images '''
@@ -319,7 +340,7 @@ def __init__(self, data, affine, header=None,
if file_map is None:
file_map = self.__class__.make_file_map()
self.file_map = file_map
self._load_cache = None
self._stored_state = self.current_state()

def update_header(self):
''' Update header from information in image'''
@@ -558,3 +579,74 @@ def from_image(klass, img):
klass.header_class.from_header(img.get_header()),
extra=img.extra.copy())

def current_state(self, stamper=None):
""" Return dictionary unique to current state of the image

The state of an image is defined by all of:
* data
* affine
* header
* file_map

Note we ignore ``extra`` in defining state.

Parameters
----------
stamper : None or callable
Object with which to create state stamps for components of image.
Defaults to an ndarray-aware stamper

Returns
-------
state : dict
dictionary with key, value pairs for each image component
"""
if stamper is None:
stamper = self.stamper
return dict(data=stamper(self._data),
affine=stamper(self._affine),
header=stamper(self._header),
file_map=stamper(self.file_map))

def reset_changed(self):
""" Reset stored state so that changes are relative to current state

Checkpoints image stored state so that ``maybe_changed`` is relative to
state as of this call. See ``maybe_changed``.
"""
self._stored_state = self.current_state()

def maybe_changed(self):
""" True if image might have changed relative to last checkpoint

We record the image state when you create the image object, and when you
call ``reset_changed`` explicitly. We return True from this method if
the recorded image state may be different from the current image state.
We also return True if the image state is too time-consuming to
calculate.
"""
return self._stored_state != self.current_state()

def state_stamper(self, caller):
""" Return stamp for current state of `self`

The state of an image is defined by all of:
* data
* affine
* header
* file_map

Note we ignore ``extra`` in defining state.

Parameters
----------
caller : callable
May be object from which this method was called.

Returns
-------
stamp : object
object unique to this state of `self`
"""
cstate = self.current_state(caller)
return self.__class__, tuple(cstate.items())
2 changes: 2 additions & 0 deletions nibabel/spm99analyze.py
Original file line number Diff line number Diff line change
@@ -279,6 +279,8 @@ def from_file_map(klass, file_map):
to_111 = np.eye(4)
to_111[:3,3] = 1
ret._affine = np.dot(ret._affine, to_111)
# Update stored affine stamp
ret._stored_state['affine'] = ret.stamper(ret._affine)
return ret

def to_file_map(self, file_map=None):
253 changes: 253 additions & 0 deletions nibabel/stampers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
""" State stamps
A state stamp is something that defines the state of an object. Let's say we have
an object ``X`` is in a state $S$.
Let's call the state stamp finder ``get_state_stamp``. This could be the result
of ``get_state_stamp = Stamper()`` below, for example.
The *state stamp* of ``X`` is some value ``g`` such that ``get_state_stamp(X) ==
g`` if and only if ``X`` is in state $S$ - however defined.
``get_state_stamp(Y) == g`` should in general not be true if ``Y`` is a
different class for ``X``.
Thus the state stamp guarantees a particular state of ``X`` - as defined by you,
dear programmer. Conversely, if ``get_state_stamp(X) != g`` this does not
guarantee they are different. It may be that you (dear programmer) don't know
if they are different, and do not want to spend resources on working it out.
For example, if ``X`` is a huge array, you might want to return the
``Unknown()`` state stamp.
The state stamp ``Unknown()`` is the state stamp such that ``get_state_stamp(X) ==
Unknown()`` is always False.
If you have objects you want compared, you can do one of:
* define a ``state_stamp`` method, taking a single argument ``caller`` which is
the callable from which the method has been called. You can then return
something which is unique for the states you want to be able to distinguish.
Don't forget that (usually) stamps from objects of different types should
compare unequal.
* subclass the ``Stamper`` class, and extend the ``__call__`` method to
handle a new object type. The ``NdaStamper`` class below is an example.
It's up to the object how to do the stamping. In general, don't test what the
stamp is, test whether it compares equal in the situations you are expecting, so
that the object can change it's mind about how it will do the stamping without
you having to rewrite the tests.
"""

import hashlib

import numpy as np

from .py3k import bytes, unicode


class Unknown(object):
""" state stamp that never matches
Examples
--------
>>> u = Unknown()
>>> u == u
False
>>> p = Unknown()
>>> u == p
False
Notes
-----
You would think this could be a singleton, but not so, because:
>>> u = Unknown()
>>> (1, u) == (1, u)
True
Why? Because comparisons within sequences in CPython, use
``PyObject_RichCompareBool`` for the elements. See around line 572 in
``objects/tupleobject.c`` and around line 607 in ``objects/object.c`` in
cpython 69528:fecf9e6d7630. This does an identity check equivalent to ``u
is u``; if this passes it does not do a further equality check (``u == u``).
For that reason, if you want to make sure nothing matches ``Unknown()``
within sequences, you need a fresh instances.
"""
_is_unknown = True

def __eq__(self, other):
return False

def __ne__(self, other):
return True

def __repr__(self):
return 'Unknown()'


def is_unknown(obj):
""" Return True if `obj` is an Unknown instance
Examples
--------
>>> is_unknown(Unknown())
True
>>> is_unknown(object())
False
"""
try:
return obj._is_unknown
except AttributeError:
return False


class Stamper(object):
r""" Basic state stamp collector
Instantiate and call on objects to get state stamps
Examples
--------
>>> asker = Stamper()
>>> asker(1) == asker(1)
True
>>> asker(1) == asker(2)
False
>>> asker('a string') == asker('a string')
True
>>> asker(1.0) == asker(1.0)
True
>>> asker(1) == asker(1.0) # different types
False
>>> asker(object()) == asker(object()) # not known -> False
False
List and tuples
>>> L = [1, 2]
>>> asker(L) == asker([1, 2])
True
>>> L[0] = 3
>>> asker(L) == asker([1, 2])
False
>>> T = (1, 2)
>>> asker(T) == asker((1, 2))
True
>>> asker(T) == asker([1, 2])
False
>>> asker([1, object()]) == asker([1, object()])
False
If your object implements ``state_stamper``, you can customized the
behavior.
>>> class D(object):
... def state_stamper(self, cstate):
... return 28
>>> asker(D()) == asker(D())
True
"""
def __init__(self, funcs = None):
""" Initialize stamper with optional functions ``funcs``
Parameters
----------
funcs : sequence of callables, optional
callables that will be called to process otherwise unknown objects.
The signature for the callable is ``f(obj, caller)`` where `obj` is
the object being stamped, and ``caller`` will be the
``Stamper``-like object from which the function will be called.
Examples
--------
>>> st = Stamper()
>>> st((1, object())) == st((1, object()))
False
>>> def func(obj, caller):
... return type(obj), 28
>>> st2 = Stamper((func,))
>>> st2((1, object())) == st2((1, object()))
True
"""
if funcs is None:
funcs = []
self.funcs = list(funcs)
# In case custom objects want an intermediate store
self.call_state = {}

def __call__(self, obj):
r""" Get state stamp for object `obj`
Parmeters
---------
obj : object
Object for which to extract state stamp
Returns
-------
stamp_state : object
state stamp. This is an object that compares equal to another
object in the same `state`
"""
# Reset call state, in case someone wants to use it
self.call_state = {}
# None passes through
if obj is None:
return None
tobj = type(obj)
# Pass through classes before doing method check on instance
if tobj == type: # class
return type, obj
try:
return obj.state_stamper(self)
except AttributeError:
pass
# Immutable objects are their own state stamps
if tobj in (str, unicode, bytes, int, float):
return tobj, obj
if tobj is dict:
return dict, sorted(self(obj.items()))
# Recurse into known sequence types
if tobj in (list, tuple):
return tobj(self(v) for v in obj)
# Try any additional functions we know about
for func in self.funcs:
res = func(obj, self)
if not res is None and not is_unknown(res):
return res
return Unknown()


class NdaStamper(Stamper):
r""" Collect state stamps, using byte buffers for smallish ndarrays
>>> nda_asker = NdaStamper()
The standard Stamper behavior
>>> nda_asker(1) == nda_asker(1)
True
Can also deal with small arrays by hashing byte contents:
>>> arr = np.zeros((3,), dtype=np.int16)
>>> nda_asker(arr) == nda_asker(arr)
True
Depending on the threshold for the number of bytes:
>>> small_asker = NdaStamper(byte_thresh=5)
>>> small_asker(arr)
Unknown()
"""
def __init__(self, funcs = None, byte_thresh = 2**16):
self.byte_thresh = byte_thresh
if funcs is None:
funcs = []
def _proc_array(obj, cstate):
if type(obj) is np.ndarray and obj.nbytes <= byte_thresh:
return (type(obj),
tuple(obj.shape),
obj.dtype,
hashlib.md5(obj.tostring()).digest())
super(NdaStamper, self).__init__(list(funcs) + [_proc_array])
78 changes: 78 additions & 0 deletions nibabel/tests/test_analyze.py
Original file line number Diff line number Diff line change
@@ -26,6 +26,8 @@
from ..loadsave import read_img_data
from .. import imageglobals
from ..casting import as_int
from ..stampers import Stamper
from ..stampers import Stamper, NdaStamper

from numpy.testing import (assert_array_equal,
assert_array_almost_equal)
@@ -450,6 +452,18 @@ def test_base_affine(self):
[ 0., 0., 1., -3.],
[ 0., 0., 0., 1.]])

def test_state_stamp(self):
# Test state stamp is sensitive to state
klass = self.header_class
hdr1 = klass()
hdr2 = klass()
stamper = Stamper()
assert_equal(stamper(hdr1), stamper(hdr2))
hdr1.set_data_shape((3,5,7))
assert_not_equal(stamper(hdr1), stamper(hdr2))
hdr2.set_data_shape((3,5,7))
assert_equal(stamper(hdr1), stamper(hdr2))


def test_best_affine():
hdr = AnalyzeHeader()
@@ -512,6 +526,48 @@ def test_data_code_error():
class TestAnalyzeImage(tsi.TestSpatialImage):
image_class = AnalyzeImage

def test_state_stamper(self):
# Extend tests of state stamping
super(TestAnalyzeImage, self).test_state_stamper()
# Test modifications of header
stamper = NdaStamper()
img_klass = self.image_class
hdr_klass = self.image_class.header_class
# The first test we have done in the parent, but just for completeness
arr = np.arange(5, dtype=np.int16)
aff = np.eye(4)
hdr = hdr_klass()
hdr.set_data_dtype(arr.dtype)
img1 = img_klass(arr, aff, hdr)
img2 = img_klass(arr, aff, hdr)
assert_equal(img1.current_state(), img2.current_state())
assert_equal(stamper(img1), stamper(img2))
hdr['descrip'] = asbytes('something')
# Doesn't affect original images
assert_equal(img1.current_state(), img2.current_state())
assert_equal(stamper(img1), stamper(img2))
# Does affect new image
img3 = img_klass(arr, aff, hdr)
assert_not_equal(img1.current_state(), img3.current_state())
assert_not_equal(stamper(img1), stamper(img3))

def test_maybe_changed(self):
# Check that changing header signaled in maybe_changed
super(TestAnalyzeImage, self).test_maybe_changed()
# Test modifications of header
img_klass = self.image_class
# The first test we have done in the parent, but just for completeness
arr = np.arange(5, dtype=np.int16)
aff = np.eye(4)
# Get an adapted header
hdr = img_klass(arr, aff).get_header()
img = img_klass(arr, aff, hdr)
assert_false(img.maybe_changed())
# Changing the header in the image signaled in maybe_change
ihdr = img.get_header()
ihdr['descrip'] = asbytes('something')
assert_true(img.maybe_changed())

def test_data_hdr_cache(self):
# test the API for loaded images, such that the data returned
# from img.get_data() is not affected by subsequent changes to
@@ -590,6 +646,28 @@ def test_header_updating(self):
assert_array_equal(img_back.shape, (3, 2, 4))


def test_load_caching(self):
# Check save / load change recording
img_klass = self.image_class
arr = np.arange(5, dtype=np.int16)
aff = np.diag([2.0,3,4,1])
img = img_klass(arr, aff)
for key in img.file_map:
img.file_map[key].fileobj = BytesIO()
img.to_file_map()
img2 = img.from_file_map(img.file_map)
assert_false(img2.maybe_changed())
# Save loads data, so changes image
img2.to_file_map()
assert_true(img2.maybe_changed())
# New loading resets change flag
img3 = img.from_file_map(img2.file_map)
assert_false(img3.maybe_changed())
# Loading data makes change impossible to detect
data = img3.get_data()
assert_true(img3.maybe_changed())


def test_unsupported():
# analyze does not support uint32
data = np.arange(24, dtype=np.int32).reshape((2,3,4))
122 changes: 122 additions & 0 deletions nibabel/tests/test_arrayproxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# emacs: -*- mode: python-mode; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the NiBabel package for the
# copyright and license terms.
#
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
""" Tests for arrayproxy module
"""
from __future__ import with_statement

from copy import deepcopy

from ..py3k import BytesIO, ZEROB, asbytes
from ..tmpdirs import InTemporaryDirectory

import numpy as np

from ..arrayproxy import ArrayProxy
from ..nifti1 import Nifti1Header
from ..stampers import Stamper

from numpy.testing import assert_array_equal, assert_array_almost_equal
from nose.tools import (assert_true, assert_false, assert_equal,
assert_not_equal, assert_raises)


class FunkyHeader(object):
def __init__(self, shape):
self.shape = shape

def copy(self):
return self.__class__(self.shape[:])

def get_data_shape(self):
return self.shape[:]

def data_from_fileobj(self, fileobj):
return np.arange(np.prod(self.shape)).reshape(self.shape)


def test_init():
bio = BytesIO()
shape = [2,3,4]
hdr = FunkyHeader(shape)
ap = ArrayProxy(bio, hdr)
assert_true(ap.file_like is bio)
assert_equal(ap.shape, shape)
# shape should be read only
assert_raises(AttributeError, setattr, ap, 'shape', shape)
# Check there has been a copy of the header
assert_false(ap.header is hdr)
# Check we can modify the original header without changing the ap version
hdr.shape[0] = 6
assert_not_equal(ap.shape, shape)
# Get the data
assert_array_equal(np.asarray(ap), np.arange(24).reshape((2,3,4)))


def write_raw_data(arr, hdr, fileobj):
hdr.set_data_shape(arr.shape)
hdr.set_data_dtype(arr.dtype)
fileobj.write(ZEROB * hdr.get_data_offset())
fileobj.write(arr.tostring(order='F'))
return hdr


def test_nifti1_init():
bio = BytesIO()
shape = (2,3,4)
hdr = Nifti1Header()
arr = np.arange(24, dtype=np.int16).reshape(shape)
write_raw_data(arr, hdr, bio)
hdr.set_slope_inter(2, 10)
ap = ArrayProxy(bio, hdr)
assert_true(ap.file_like == bio)
assert_equal(ap.shape, shape)
# Check there has been a copy of the header
assert_false(ap.header is hdr)
# Get the data
assert_array_equal(np.asarray(ap), arr * 2.0 + 10)
with InTemporaryDirectory():
f = open('test.nii', 'wb')
write_raw_data(arr, hdr, f)
f.close()
ap = ArrayProxy('test.nii', hdr)
assert_true(ap.file_like == 'test.nii')
assert_equal(ap.shape, shape)
assert_array_equal(np.asarray(ap), arr * 2.0 + 10)


def test_state_stamp():
# Stamps
bio = BytesIO()
shape = (2, 3, 4)
hdr = FunkyHeader(shape)
ap = ArrayProxy(bio, hdr)
stamper = Stamper()
# The header is unstampable in this case
assert_not_equal(stamper(ap), stamper(ap))
# Nifti is stampable
hdr = Nifti1Header()
ap1 = ArrayProxy(bio, hdr)
ap2 = ArrayProxy(bio, hdr)
assert_equal(stamper(ap1), stamper(ap2))
ap3 = ArrayProxy('afilename', hdr)
ap4 = ArrayProxy('afilename', hdr)
assert_equal(stamper(ap3), stamper(ap4))
assert_not_equal(stamper(ap1), stamper(ap3))
# write some data to check arr != proxy
arr = np.arange(24, dtype=np.int16).reshape(shape) + 100
new_hdr = write_raw_data(arr, hdr, bio)
ap5 = ArrayProxy(bio, new_hdr)
assert_equal(stamper(ap5), stamper(ArrayProxy(bio, new_hdr)))
# Reading the data makes the arrayproxy unstampable, because the data is now
# modifiable outside the proxy if we modify the returned array in place.
arr_back = np.asanyarray(ap5)
assert_not_equal(stamper(ap1), stamper(ap5))
# Check that the proxy does not seem to be the same as the array
assert_array_equal(arr, arr_back)
assert_not_equal(stamper(arr), stamper(ap5))
65 changes: 50 additions & 15 deletions nibabel/tests/test_fileholders.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
""" Testing fileholders
"""

from StringIO import StringIO

import numpy as np
from ..py3k import BytesIO

from ..fileholders import FileHolder, FileHolderError, copy_file_map
from ..tmpdirs import InTemporaryDirectory
from ..stampers import Stamper

from numpy.testing import (assert_array_almost_equal,
assert_array_equal)

from nose.tools import assert_true, assert_false, assert_equal, assert_raises
from nose.tools import (assert_true, assert_false,
assert_equal, assert_not_equal,
assert_raises)


def test_init():
fh = FileHolder('a_fname')
assert_equal(fh.filename, 'a_fname')
assert_true(fh.fileobj is None)
assert_equal(fh.pos, 0)
sio0 = StringIO()
fh = FileHolder('a_test', sio0)
bio = BytesIO()
fh = FileHolder('a_test', bio)
assert_equal(fh.filename, 'a_test')
assert_true(fh.fileobj is sio0)
assert_true(fh.fileobj is bio)
assert_equal(fh.pos, 0)
fh = FileHolder('a_test_2', sio0, 3)
fh = FileHolder('a_test_2', bio, 3)
assert_equal(fh.filename, 'a_test_2')
assert_true(fh.fileobj is sio0)
assert_true(fh.fileobj is bio)
assert_equal(fh.pos, 3)


@@ -35,19 +36,53 @@ def test_same_file_as():
assert_true(fh.same_file_as(fh))
fh2 = FileHolder('a_test')
assert_false(fh.same_file_as(fh2))
sio0 = StringIO()
fh3 = FileHolder('a_fname', sio0)
fh4 = FileHolder('a_fname', sio0)
bio = BytesIO()
fh3 = FileHolder('a_fname', bio)
fh4 = FileHolder('a_fname', bio)
assert_true(fh3.same_file_as(fh4))
assert_false(fh3.same_file_as(fh))
fh5 = FileHolder(fileobj=sio0)
fh6 = FileHolder(fileobj=sio0)
fh5 = FileHolder(fileobj=bio)
fh6 = FileHolder(fileobj=bio)
assert_true(fh5.same_file_as(fh6))
# Not if the filename is the same
assert_false(fh5.same_file_as(fh3))
# pos doesn't matter
fh4_again = FileHolder('a_fname', sio0, pos=4)
fh4_again = FileHolder('a_fname', bio, pos=4)
assert_true(fh3.same_file_as(fh4_again))


def test_stamping():
# Test stamping works as expected
stamper = Stamper()
fh1 = FileHolder('a_fname')
fh2 = FileHolder('a_fname')
assert_equal(stamper(fh1), stamper(fh2))
fh3 = FileHolder('a_test')
assert_not_equal(stamper(fh1), stamper(fh3))
bio = BytesIO()
fh4 = FileHolder('a_fname', bio)
fh5 = FileHolder('a_fname', bio)
assert_equal(stamper(fh4), stamper(fh5))
fh6 = FileHolder('a_fname2', bio)
assert_not_equal(stamper(fh4), stamper(fh6))
assert_equal((fh4.pos, fh5.pos), (0, 0))
fh5.pos = 1
assert_not_equal(stamper(fh4), stamper(fh5))
fh4 = FileHolder(fileobj=bio)
fh5 = FileHolder(fileobj=bio)
assert_equal(stamper(fh4), stamper(fh5))
assert_equal((fh4.pos, fh5.pos), (0, 0))
fh5.pos = 1
assert_not_equal(stamper(fh4), stamper(fh5))


def test_copy_file_map():
# Test copy of fileholder using stamping
bio = BytesIO()
fm = dict(one=FileHolder('a_fname', bio), two=FileHolder('a_fname2'))
fm2 = copy_file_map(fm)
stamper = Stamper()
assert_equal(stamper(fm), stamper(fm2))
# Check you can modify the copies independently
fm['one'].pos = 2
assert_not_equal(stamper(fm), stamper(fm2))
39 changes: 38 additions & 1 deletion nibabel/tests/test_nifti1.py
Original file line number Diff line number Diff line change
@@ -20,10 +20,11 @@
from ..nifti1 import (load, Nifti1Header, Nifti1PairHeader, Nifti1Image,
Nifti1Pair, Nifti1Extension, Nifti1Extensions,
data_type_codes, extension_codes, slice_order_codes)
from ..stampers import Stamper

from numpy.testing import assert_array_equal, assert_array_almost_equal
from nose.tools import (assert_true, assert_false, assert_equal,
assert_raises)
assert_not_equal, assert_raises)
from nose import SkipTest

from ..testing import data_path
@@ -60,6 +61,21 @@ def test_from_eg_file(self):
assert_equal(hdr['magic'], asbytes('ni1'))
assert_equal(hdr['sizeof_hdr'], 348)

def test_state_stamp(self):
super(TestNifti1PairHeader, self).test_state_stamp()
# Check that extensions alter state
hdr1 = self.header_class(extensions = Nifti1Extensions())
hdr2 = self.header_class(extensions = Nifti1Extensions())
stamper = Stamper()
assert_equal(stamper(hdr1), stamper(hdr2))
ext = Nifti1Extension('comment', '123')
hdr3 = self.header_class(extensions = Nifti1Extensions((ext,)))
assert_not_equal(stamper(hdr1), stamper(hdr3))
# No extensions stamp at the moment, so any extensions render the stamps
# unequal
hdr4 = self.header_class(extensions = Nifti1Extensions((ext,)))
assert_not_equal(stamper(hdr3), stamper(hdr4))

def test_nifti_log_checks(self):
# in addition to analyze header checks
HC = self.header_class
@@ -172,6 +188,14 @@ def test_binblock_is_file(self):
hdr.write_to(str_io)
assert_equal(str_io.getvalue(), hdr.binaryblock + ZEROB * 4)

def test_state_stamp(self):
# Check that this (single) header differs in stamp from pair
super(TestNifti1PairHeader, self).test_state_stamp()
hdr = self.header_class()
super_hdr = Nifti1PairHeader()
stamper = Stamper()
assert_not_equal(stamper(hdr), stamper(super_hdr))


class TestNifti1Image(tana.TestAnalyzeImage):
# Run analyze-flavor spatialimage tests
@@ -493,6 +517,19 @@ def test_extension_list():
assert_true(ext_c0 == ext_c1)


def test_extension_stamping():
# Test we can stamp extension lists
ext_c0 = Nifti1Extensions()
ext_c1 = Nifti1Extensions()
stamper = Stamper()
assert_equal(stamper(ext_c0), stamper(ext_c1))
ext = Nifti1Extension('comment', '123')
ext_c1.append(ext)
assert_not_equal(stamper(ext_c0), stamper(ext_c1))
ext_c1.remove(ext)
assert_equal(stamper(ext_c0), stamper(ext_c1))


def test_nifti_extensions():
nim = load(image_file)
# basic checks of the available extensions
96 changes: 96 additions & 0 deletions nibabel/tests/test_spatialimages.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@

import numpy as np

from ..stampers import Stamper, NdaStamper
from ..spatialimages import (Header, SpatialImage, HeaderDataError,
ImageDataError)

@@ -166,6 +167,22 @@ def test_read_data():
assert_array_equal(data, data2)


def test_hdr_state_stamper():
# State stamping for template header class
stamper = Stamper()
hdr1 = Header(np.int32, shape=(1,2,3), zooms=(3.0, 2.0, 1.0))
hdr2 = Header(np.int32, shape=(1,2,3), zooms=(3.0, 2.0, 1.0))
assert_equal(stamper(hdr1), stamper(hdr2))
hdr3 = Header('i4', shape=[1,2,3], zooms=[3.0, 2.0, 1.0])
assert_equal(stamper(hdr1), stamper(hdr3))
hdr4 = Header('i2', shape=[1,2,3], zooms=[3.0, 2.0, 1.0])
assert_not_equal(stamper(hdr1), stamper(hdr4))
hdr5 = Header('i4', shape=[6,2,3], zooms=[3.0, 2.0, 1.0])
assert_not_equal(stamper(hdr1), stamper(hdr5))
hdr6 = Header('i4', shape=[1,2,3], zooms=[3.1, 2.0, 1.0])
assert_not_equal(stamper(hdr1), stamper(hdr6))


class DataLike(object):
# Minimal class implementing 'data' API
shape = (3,)
@@ -258,3 +275,82 @@ def test_get_shape(self):
assert_equal(img.get_shape(), (1,))
img = img_klass(np.zeros((2,3,4), np.int16), np.eye(4))
assert_equal(img.get_shape(), (2,3,4))

def test_state_stamper(self):
img_klass = self.image_class
hdr_klass = self.image_class.header_class
stamper = NdaStamper()
# Assumes all possible images support int16
# See https://github.com/nipy/nibabel/issues/58
arr = np.arange(5, dtype=np.int16)
aff = np.eye(4)
img1 = img_klass(arr, aff)
img2 = img_klass(arr, aff)
# The test depends on the imput array being small enough to stamp
assert_equal(img1.current_state(), img2.current_state())
assert_equal(img1.current_state(stamper),
img2.current_state(stamper))
assert_equal(stamper(img1), stamper(img2))
img3 = img_klass(arr + 1, aff)
assert_not_equal(img1.current_state(), img3.current_state())
assert_not_equal(stamper(img1), stamper(img3))
img4 = img_klass(arr, np.diag([1,1,2,1]))
assert_not_equal(img1.current_state(), img4.current_state())
assert_not_equal(stamper(img1), stamper(img4))
# passing a default header should be the same as passing no header
hdr = hdr_klass()
hdr.set_data_dtype(arr.dtype)
img5 = img_klass(arr, aff, hdr)
assert_equal(img1.current_state(), img5.current_state())
assert_equal(stamper(img1), stamper(img5))
# Modifying the filemap makes the images unequal
fm_key = list(img_klass.make_file_map().keys())[0]
old_filename = img5.file_map[fm_key].filename
img5.file_map[fm_key].filename = 'test.img'
assert_not_equal(img1.current_state(), img5.current_state())
assert_not_equal(stamper(img1), stamper(img5))
img5.file_map[fm_key].filename = old_filename
assert_equal(img1.current_state(), img5.current_state())
assert_equal(stamper(img1), stamper(img5))

def test_maybe_changed(self):
# Mechanism for checking whether image has changed since initialization
img_klass = self.image_class
arr = np.arange(5, dtype=np.int16)
aff = np.eye(4)
# All image types need to implement int16
img = img_klass(arr, aff)
# Get header back that has been customized to this array
hdr = img.get_header()
# Pass back into image expecting no modifications this time
img = img_klass(arr, aff, hdr)
assert_false(img.maybe_changed())
# Changes to affine or header used in init do not change img
aff[0,0] = 1.1
assert_false(img.maybe_changed())
hdr.set_zooms((2,))
assert_false(img.maybe_changed())
# Changing the affine, header in the image does cause change
iaff = img.get_affine()
ihdr = img.get_header()
iaff[0,0] = 1.2
assert_true(img.maybe_changed())
# we can reset
img.reset_changed()
assert_false(img.maybe_changed())
ihdr.set_zooms((3,))
assert_true(img.maybe_changed())
# we can reset
img.reset_changed()
assert_false(img.maybe_changed())
# Data changes always result in image changes
arr[0] = 99
assert_true(img.maybe_changed())
img.reset_changed()
# Filemap changes change too
fm_key = list(img_klass.make_file_map().keys())[0]
old_filename = img.file_map[fm_key].filename
img.file_map[fm_key].filename = 'test.img'
assert_true(img.maybe_changed())
img.file_map[fm_key].filename = old_filename
assert_false(img.maybe_changed())
122 changes: 122 additions & 0 deletions nibabel/tests/test_stampers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
""" Testing state stamping
"""

import numpy as np

from ..py3k import ZEROB, asbytes

from ..stampers import Unknown, is_unknown, Stamper, NdaStamper

from numpy.testing import (assert_array_almost_equal,
assert_array_equal)

from nose.tools import (assert_true, assert_false, assert_equal,
assert_not_equal, assert_raises)


def test_uknown():
# Unknown singleton-like
u = Unknown()
assert_equal(repr(u), 'Unknown()')
assert_equal(str(u), 'Unknown()')
assert_false(u == u)
assert_true(u != u)
assert_not_equal(u, u)
p = Unknown()
assert_not_equal(u, p)
assert_true(is_unknown(u))
assert_false(is_unknown(1))
# Note - this _is_ equal
# assert_not_equal((1, u), (1, u))


def test_stamper():
# state_stamp can can from
# * state_stamper() method
# Some immutables -> themselves
# Otherwise the signature is Unknown()
class D(object): # Class for testing get_signature
def state_stamper(self, cstate):
return self.__class__, 28
for ster in (Stamper(), NdaStamper()):
assert_equal(ster(None), ster(None))
assert_not_equal(ster(None), ster(1))
assert_equal(ster(1), ster(1))
assert_not_equal(ster(1), ster(2))
assert_equal(ster('a string'), ster('a string'))
assert_not_equal(ster('a string'), ster(1))
bs = asbytes('some bytes')
assert_equal(ster(bs), ster(bs))
assert_equal(ster(1.0), ster(1.0))
assert_not_equal(ster(1.0), ster(1))
# an anonymous object, usually not stampable
ob = object()
assert_not_equal(ster(ob), ster(ob))
L = [1, 2]
T = (1, 2)
assert_equal(ster(L), ster(L[:]))
assert_equal(ster(T), ster(T[:]))
assert_not_equal(ster(L), ster(T))
assert_not_equal(ster((1, ob)), ster((1, ob)))
d1 = D()
d2 = D()
assert_equal(ster(d1), ster(d2))
# Dictionaries
di1 = dict(a = 1, b = 2)
# Entry order does not matter - but this is difficult to test because
# key order is not defined
di2 = dict(b = 2, a = 1)
assert_equal(ster(di1), ster(di2))
# Dictionaries different
assert_not_equal(ster(di1), ster(dict(b = 2, a = 2)))
assert_not_equal(ster(di1), ster(dict(b = 2, c = 1)))
# They are not just defined by their items, but by their type
assert_not_equal(ster(di1), ster(di2.items()))
# Inherited types don't work because they might have more state
class MyList(list): pass
class MyTuple(tuple): pass
class MyDict(dict): pass
assert_not_equal(ster(MyList((1,2))), ster(MyList((1,2))))
assert_not_equal(ster(MyTuple((1,2))), ster(MyTuple((1,2))))
assert_not_equal(ster(MyDict(a=1, b=2)), ster(MyDict(a=1, b=2)))
# Classes pass through, even if they have state_stamper methods
assert_equal(ster(D), ster(D))


def test_nda_stamper():
# Arrays work if they are small
nda_ster = NdaStamper()
arr1 = np.zeros((3,), dtype=np.int16)
arr2 = np.zeros((3,), dtype=np.int16)
assert_equal(nda_ster(arr1), nda_ster(arr2))
# The data has to be the same
arr2p1 = arr2.copy()
arr2p1[0] = 1
assert_not_equal(nda_ster(arr1), nda_ster(arr2p1))
# Comparison depends on the byte threshold
nda_ster5 = NdaStamper(byte_thresh = 5)
assert_not_equal(nda_ster5(arr1), nda_ster5(arr1))
# Byte thresh gets passed down to iterations of lists
assert_equal(nda_ster([1, arr1]), nda_ster([1, arr2]))
assert_not_equal(nda_ster5([1, arr1]), nda_ster5([1, arr1]))
# Arrays in dicts
d1 = dict(a = 1, b = arr1)
d2 = dict(a = 1, b = arr2)
assert_equal(nda_ster(d1), nda_ster(d2))
# Byte thresh gets passed down to iterations of dicts
assert_not_equal(nda_ster5(d1), nda_ster5(d1))
# Make sure strings distinguished from arrays
bs = asbytes('byte string')
sarr = np.array(bs, dtype = 'S')
assert_equal(nda_ster(sarr), nda_ster(sarr.copy()))
assert_not_equal(nda_ster(sarr), nda_ster(bs))
# shape and dtype also distinguished
arr3 = arr2.reshape((1,3))
assert_not_equal(nda_ster(arr1), nda_ster(arr3))
arr4 = arr3.reshape((3,))
assert_equal(nda_ster(arr1), nda_ster(arr4))
arr5 = arr1.newbyteorder('s')
assert_array_equal(arr1, arr5)
assert_not_equal(nda_ster(arr1), nda_ster(arr5))
arr6 = arr5.newbyteorder('s')
assert_equal(nda_ster(arr1), nda_ster(arr6))