Skip to content

Commit

Permalink
Align RecordBatch buffers retrieved via IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Oct 1, 2024
1 parent b6316c0 commit 77cc70a
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 1 deletion.
15 changes: 15 additions & 0 deletions cpp/src/arrow/array/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <utility>
#include <vector>
#include <arrow/util/range.h>

#include "arrow/array/statistics.h"
#include "arrow/buffer.h"
Expand Down Expand Up @@ -232,6 +233,20 @@ struct ARROW_EXPORT ArrayData {
return null_count.load() != length;
}

Status AlignBuffers() {
// align buffers according to their data type's layout
for (auto&& [buffer, layout] : internal::Zip(buffers, type->layout().buffers)) {
if (layout.kind == DataTypeLayout::FIXED_WIDTH && buffer->address() % layout.byte_width) {
RETURN_NOT_OK(Buffer::Copy(buffer, buffer->memory_manager()).Value(&buffer));
}
}
// align children data recursively
for (unsigned int i=0; i<child_data.size(); i++) {
RETURN_NOT_OK(child_data[i]->AlignBuffers());
}
return Status::OK();
}

// Access a buffer's data as a typed C pointer
template <typename T>
inline const T* GetValues(int i, int64_t absolute_offset) const {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ struct ARROW_EXPORT IpcReadOptions {
/// RecordBatchStreamReader and StreamDecoder classes.
bool ensure_native_endian = true;

/// \brief Whether to align incoming data if mis-aligned
///
/// Received mis-aligned data is copied to aligned memory locations.
bool ensure_memory_alignment = true;

/// \brief Options to control caching behavior when pre-buffering is requested
///
/// The lazy property will always be reset to true to deliver the expected behavior
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
return Status::IOError("Array length did not match record batch length");
}
columns[i] = std::move(column);
if (context.options.ensure_memory_alignment) {
RETURN_NOT_OK(columns[i]->AlignBuffers());
}
if (inclusion_mask) {
filtered_columns.push_back(columns[i]);
filtered_fields.push_back(schema->field(i));
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
vector[int] included_fields
c_bool use_threads
c_bool ensure_native_endian
c_bool ensure_memory_alignment

@staticmethod
CIpcReadOptions Defaults()
Expand Down
12 changes: 12 additions & 0 deletions python/pyarrow/ipc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ cdef class IpcReadOptions(_Weakrefable):
----------
ensure_native_endian : bool, default True
Whether to convert incoming data to platform-native endianness.
ensure_memory_alignment : bool, default True
Whether to align incoming data if mis-aligned.
use_threads : bool
Whether to use the global CPU thread pool to parallelize any
computational tasks like decompression
Expand All @@ -133,9 +135,11 @@ cdef class IpcReadOptions(_Weakrefable):
# cdef block is in lib.pxd

def __init__(self, *, bint ensure_native_endian=True,
bint ensure_memory_alignment=True,
bint use_threads=True, list included_fields=None):
self.c_options = CIpcReadOptions.Defaults()
self.ensure_native_endian = ensure_native_endian
self.ensure_memory_alignment = ensure_memory_alignment
self.use_threads = use_threads
if included_fields is not None:
self.included_fields = included_fields
Expand All @@ -148,6 +152,14 @@ cdef class IpcReadOptions(_Weakrefable):
def ensure_native_endian(self, bint value):
self.c_options.ensure_native_endian = value

@property
def ensure_memory_alignment(self):
return self.c_options.ensure_memory_alignment

@ensure_memory_alignment.setter
def ensure_memory_alignment(self, bint value):
self.c_options.ensure_memory_alignment = value

@property
def use_threads(self):
return self.c_options.use_threads
Expand Down
8 changes: 7 additions & 1 deletion python/pyarrow/tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,11 +548,16 @@ def test_read_options():
options = pa.ipc.IpcReadOptions()
assert options.use_threads is True
assert options.ensure_native_endian is True
assert options.ensure_memory_alignment is True
assert options.ens is True
assert options.included_fields == []

options.ensure_native_endian = False
assert options.ensure_native_endian is False

options.ensure_memory_alignment = False
assert options.ensure_memory_alignment is False

options.use_threads = False
assert options.use_threads is False

Expand All @@ -564,10 +569,11 @@ def test_read_options():

options = pa.ipc.IpcReadOptions(
use_threads=False, ensure_native_endian=False,
included_fields=[1]
ensure_memory_alignment=False, included_fields=[1]
)
assert options.use_threads is False
assert options.ensure_native_endian is False
assert options.ensure_memory_alignment is False
assert options.included_fields == [1]


Expand Down

0 comments on commit 77cc70a

Please sign in to comment.