Skip to content

Commit

Permalink
Complete overhaul of interaction between KaitaiStream and KaitaiStruct
Browse files Browse the repository at this point in the history
  • Loading branch information
KOLANICH committed Jun 11, 2020
1 parent 4b97420 commit 6b30257
Showing 1 changed file with 166 additions and 25 deletions.
191 changes: 166 additions & 25 deletions kaitaistruct.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import typing
import itertools
import sys
import struct
from io import open, BytesIO, SEEK_CUR, SEEK_END # noqa
from io import IOBase
import mmap
from pathlib import Path
from abc import ABC, abstractmethod

PY2 = sys.version_info[0] == 2

Expand All @@ -15,51 +20,187 @@
__version__ = '0.9'


class KaitaiStruct(object):
def __init__(self, stream):
self._io = stream
class _NonClosingNonParsingKaitaiStruct:
__slots__ = ("_io", "_parent", "_root")
def __init__(self, _io:"KaitaiStream", _parent:typing.Optional["_NonClosingNonParsingKaitaiStruct"]=None, _root:typing.Optional["_NonClosingNonParsingKaitaiStruct"]=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self


class NonClosingKaitaiStruct(_NonClosingNonParsingKaitaiStruct, ABC):
__slots__ = ()

@abstractmethod
def _read(self):
raise NotImplementedError()


class KaitaiStruct(NonClosingKaitaiStruct):
__slots__ = ("_shouldExit",)
def __init__(self, io: typing.Union["KaitaiStream", Path, bytes, str]):
if not isinstance(io, KaitaiStream):
io = KaitaiStream(io)
super.__init__(io)
self._shouldExit = False

def __enter__(self):
self._shouldExit = not stream.is_entered
if self._shouldExit:
self._io.__enter__()
return self

def __exit__(self, *args, **kwargs):
self.close()
if self.shouldExit:
self._io.__exit__(*args, **kwargs)

def close(self):
self._io.close()
@classmethod
def from_any(cls, o: typing.Union[Path, str]) -> "KaitaiStruct":
with KaitaiStream(o) as io:
s = cls(io)
s._read()
return s

@classmethod
def from_file(cls, filename):
f = open(filename, 'rb')
try:
return cls(KaitaiStream(f))
except Exception:
# close file descriptor, then reraise the exception
f.close()
raise
def from_file(cls, file: typing.Union[Path, str, _io._BufferedIOBase], use_mmap:bool=True) -> "KaitaiStruct":
return cls.from_any(file, use_mmap=use_mmap)

@classmethod
def from_bytes(cls, buf):
return cls(KaitaiStream(BytesIO(buf)))
def from_bytes(cls, data: bytes) -> "KaitaiStruct":
return cls.from_any(data)

@classmethod
def from_io(cls, io):
return cls(KaitaiStream(io))
def from_io(cls, io: IOBase) -> "KaitaiStruct":
return cls.from_any(io)


class KaitaiStream(object):
def __init__(self, io):
self._io = io
self.align_to_byte()
class IKaitaiDownStream(ABC):
__slots__ =("_io",)

def __init__(self, _io: typing.Any):
self._io = _io

@abstractmethod
@property
def is_entered(self):
raise NotImplementedError

@abstractmethod
def __enter__(self):
raise NotImplementedError()

def __exit__(self, *args, **kwargs):
if self.is_entered:
self._io.__exit__(*args, **kwargs)
self._io = None


class KaitaiIODownStream(IKaitaiDownStream):
__slots__ = ()
def __init__(self, data: typing.Any):
super().__init__(data)

@property
def is_entered(self):
return isinstance(self._io, IOBase)

def __enter__(self):
if not self.is_entered
self._io = open(self._io).__enter__()
return self


class KaitaiBytesDownStream(KaitaiIODownStream):
__slots__ = ()
def __init__(self, data: bytes):
super().__init__(data)


class KaitaiFileSyscallDownStream(KaitaiIODownStream):
__slots__ = ()
def __init__(self, io: typing.Union[Path, str, IOBase]):
if isinstance(io, str):
io = Path(io)
super().__init__(io)


class KaitaiFileMapDownStream(IKaitaiFileDownStream):
__slots__ = ("file",)
def __init__(self, io: typing.Union[Path, str, IOBase]):
super().__init__(None)
self.file = KaitaiFileSyscallDownStream(io)

@property
def is_entered(self):
return isinstance(self._io, mmap.mmap)

def __enter__(self):
self.file = self.file.__enter__()
self._io = mmap.mmap(self.file.file.fileno(), 0, access=mmap.ACCESS_READ).__enter__()
return self

def __exit__(self, *args, **kwargs):
self.close()
super().__exit__(*args, **kwargs)
if self.file is not None:
self.file.__exit__(*args, **kwargs)
self.file = None


def get_file_down_stream(path: Path, *args, use_mmap: bool=True, **kwargs) -> IKaitaiDownStream:
if use_mmap:
cls = KaitaiFileMapDownStream
else:
cls = KaitaiFileSyscallDownStream
return cls(path, *args, **kwargs)


downstreamMapping = {
bytes: KaitaiBytesDownStream,
BytesIO: KaitaiBytesDownStream,
str: get_file_down_stream,
Path: get_file_down_stream,
_io._BufferedIOBase: get_file_down_stream,
}


def get_downstream_ctor(x) -> typing.Type[IKaitaiDownStream]:
ctor = downstreamMapping.get(t, None)
if ctor:
return ctor
types = t.mro()
for t1 in types[1:]:
ctor = downstreamMapping.get(t1, None)
if ctor:
downstreamMapping[t] = ctor
return ctor
raise TypeError("Unsupported type", t, types)


def get_downstream(x: typing.Union[bytes, str, Path], *args, **kwargs) -> IKaitaiDownStream:
return get_downstream_ctor(type(x))(x, *args, **kwargs)


class KaitaiStream():
def __init__(self, o: typing.Union[bytes, str, Path, IKaitaiDownStream]):
if not isinstance(o, IKaitaiDownStream):
o = get_downstream(o)
self._downstream = o
self.align_to_byte()

@property
def _io(self):
return self._downstream._io

def __enter__(self):
self._downstream.__enter__()
return self

def close(self):
self._io.close()
@property
def is_entered(self):
return self._downstream is not None and self._downstream.is_entered

def __exit__(self, *args, **kwargs):
self._downstream.__exit__(*args, **kwargs)

# ========================================================================
# Stream positioning
Expand Down

0 comments on commit 6b30257

Please sign in to comment.