Geometry Nodes: make CSV parser more reliable and faster

This reimplements the CSV parser used by the (still experimental) Import CSV
node.

Reliability is improved by:
* Properly handling quoted fields.
* Adding unit tests.
* Generalizing the parser to handle customized delimiter, quote and escape
  characters (these are not exposed in the node yet; see the sketch below).
* Detecting column types more accurately by taking all values of a column into
  account instead of only the first row. For example, a column containing `1`
  and `2.5` is now detected as a float column instead of an integer column.
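
A minimal sketch of the generalized options (hypothetical usage; all of these
fields exist in `CsvParseOptions` in `BLI_csv_parse.hh` below, the values here
are made up):

  /* Sketch: configure the parser for semicolon-separated, single-quoted fields. */
  blender::csv_parse::CsvParseOptions options;
  options.delimiter = ';';
  options.quote = '\'';
  /* Characters that may escape the quote character. */
  options.quote_escape_chars = blender::Span<char>(blender::StringRef("'\\"));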

Performance is improved by designing the parser in a way that supports
multi-threaded parsing. I'm measuring about a 5x performance improvement, which
mainly comes from the multi-threading. Some files I wanted to use for
benchmarking didn't load with the version that's in `main` but do load fine
with this new version.

The implementation is now split up into two parts:
1. A general CSV parser in `blenlib` that manages splitting a buffer into
   records and their fields.
2. Application-specific parsing of fields into e.g. floats and integers, which
   remains in `io/csv/importer`.

This separation simplifies unit testing and makes the core code more reusable.
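
As a rough usage sketch of that separation (hypothetical caller code;
`parse_all_floats` and the single-column assumption are made up for
illustration):

  #include <cstdlib>
  #include <string>

  #include "BLI_csv_parse.hh"

  /* The blenlib layer splits the buffer into records and fields; the caller
   * interprets the raw bytes of each field. Chunks may be processed in
   * parallel, but their results are returned in the correct order. */
  static std::optional<blender::Vector<blender::Vector<float>>> parse_all_floats(
      const blender::Span<char> buffer)
  {
    using namespace blender;
    csv_parse::CsvParseOptions options;
    return csv_parse::parse_csv_in_chunks<Vector<float>>(
        buffer,
        options,
        [&](const csv_parse::CsvRecord & /*header*/) { /* Column names could be read here. */ },
        [&](const csv_parse::CsvRecords &records) {
          Vector<float> values;
          for (const int64_t i : records.index_range()) {
            /* Application-specific: interpret the first field of every record as a float. */
            values.append(float(std::atof(std::string(records.record(i).field_str(0)).c_str())));
          }
          return values;
        });
  }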

Pull Request: https://projects.blender.org/blender/blender/pulls/134715
Commit: ac2cd6c1ef (parent: 4ec5d600cd)
Author: Jacques Lucke
Date: 2025-02-19 11:10:59 +01:00
5 changed files with 1061 additions and 212 deletions

File: source/blender/blenlib/BLI_csv_parse.hh (new file)

@@ -0,0 +1,231 @@
/* SPDX-FileCopyrightText: 2025 Blender Authors
 *
 * SPDX-License-Identifier: GPL-2.0-or-later */

#pragma once

#include "BLI_any.hh"
#include "BLI_function_ref.hh"
#include "BLI_offset_indices.hh"
#include "BLI_string_ref.hh"
#include "BLI_vector.hh"

namespace blender::csv_parse {
/**
 * Contains the fields of a single record of a .csv file. Usually that corresponds to a single
 * line.
 */
class CsvRecord {
 private:
  Span<Span<char>> fields_;

 public:
  CsvRecord(Span<Span<char>> fields);

  /** Number of fields in the record. */
  int64_t size() const;
  IndexRange index_range() const;

  /** Get the field at the given index. Empty data is returned if the index is too large. */
  Span<char> field(const int64_t index) const;
  StringRef field_str(const int64_t index) const;
};

/**
 * Contains the fields of multiple records.
 */
class CsvRecords {
 private:
  OffsetIndices<int64_t> offsets_;
  Span<Span<char>> fields_;

 public:
  CsvRecords(OffsetIndices<int64_t> offsets, Span<Span<char>> fields);

  /** Number of records (rows). */
  int64_t size() const;
  IndexRange index_range() const;

  /** Get the record at the given index. */
  CsvRecord record(const int64_t index) const;
};

struct CsvParseOptions {
  /** The character that separates fields within a row. */
  char delimiter = ',';
  /**
   * The character that can be used to enclose fields which contain the delimiter or span
   * multiple lines.
   */
  char quote = '"';
  /**
   * Characters that can be used to escape the quote character. By default, `""` and `\"` both
   * represent an escaped quote.
   */
  Span<char> quote_escape_chars = Span<char>(StringRef("\"\\"));
  /** Approximate number of bytes per chunk that the input is split into. */
  int64_t chunk_size_bytes = 64 * 1024;
};
/**
 * Parses a .csv file. There are two important aspects to the way this interface is designed:
 * 1. It allows the file to be split into chunks that can be parsed in parallel.
 * 2. Splitting the file into individual records and fields is separated from parsing the actual
 *    content into e.g. floats. This simplifies the implementation of both parts because the
 *    logical parsing does not have to worry about e.g. the delimiter or quote characters. It
 *    also simplifies unit testing.
 *
 * \param buffer: The buffer containing the .csv file.
 * \param options: Options that control how the file is parsed.
 * \param process_header: A function that is called at most once and receives the fields of the
 *   first row/record.
 * \param process_records: A function that is called potentially many times in parallel and that
 *   processes a chunk of parsed records. Typically this function parses raw byte fields into
 *   e.g. ints or floats. The result of the parsing process has to be returned. Note that under
 *   specific circumstances, this function may be called twice for the same records. That can
 *   happen when the .csv file contains multi-line fields which were split incorrectly at first.
 * \return A vector containing the return values of the `process_records` function in the correct
 *   order. Nullopt is returned if the file was malformed, e.g. if it has a quoted field that is
 *   not closed.
 */
std::optional<Vector<Any<>>> parse_csv_in_chunks(
    const Span<char> buffer,
    const CsvParseOptions &options,
    FunctionRef<void(const CsvRecord &record)> process_header,
    FunctionRef<Any<>(const CsvRecords &records)> process_records);

/**
 * Same as above, but uses a templated chunk type instead of #Any, which can be more convenient.
 */
template<typename ChunkT>
inline std::optional<Vector<ChunkT>> parse_csv_in_chunks(
    const Span<char> buffer,
    const CsvParseOptions &options,
    FunctionRef<void(const CsvRecord &record)> process_header,
    FunctionRef<ChunkT(const CsvRecords &records)> process_records)
{
  std::optional<Vector<Any<>>> result = parse_csv_in_chunks(
      buffer, options, process_header, [&](const CsvRecords &records) {
        return Any<>(process_records(records));
      });
  if (!result.has_value()) {
    return std::nullopt;
  }
  Vector<ChunkT> result_chunks;
  result_chunks.reserve(result->size());
  for (Any<> &value : *result) {
    result_chunks.append(std::move(value.get<ChunkT>()));
  }
  return result_chunks;
}
/* -------------------------------------------------------------------- */
/** \name #CsvRecord inline functions.
 * \{ */

inline CsvRecord::CsvRecord(Span<Span<char>> fields) : fields_(fields) {}

inline int64_t CsvRecord::size() const
{
  return fields_.size();
}

inline IndexRange CsvRecord::index_range() const
{
  return fields_.index_range();
}

inline Span<char> CsvRecord::field(const int64_t index) const
{
  BLI_assert(index >= 0);
  if (index >= fields_.size()) {
    return {};
  }
  return fields_[index];
}

inline StringRef CsvRecord::field_str(const int64_t index) const
{
  const Span<char> value = this->field(index);
  return StringRef(value.data(), value.size());
}

/** \} */

/* -------------------------------------------------------------------- */
/** \name #CsvRecords inline functions.
 * \{ */

inline CsvRecords::CsvRecords(const OffsetIndices<int64_t> offsets, const Span<Span<char>> fields)
    : offsets_(offsets), fields_(fields)
{
}

inline int64_t CsvRecords::size() const
{
  return offsets_.size();
}

inline IndexRange CsvRecords::index_range() const
{
  return offsets_.index_range();
}

inline CsvRecord CsvRecords::record(const int64_t index) const
{
  return CsvRecord(fields_.slice(offsets_[index]));
}

/** \} */
/* -------------------------------------------------------------------- */
/** \name Internal functions exposed for testing.
 * \{ */

namespace detail {

/**
 * Find the index that ends the current field, i.e. the index of the next delimiter or newline.
 *
 * \param start: The index of the first character in the field. This may also be the end of the
 *   field already if it is empty.
 * \param delimiter: The character that ends the field.
 * \return Index of the next delimiter, a newline character or the end of the buffer.
 */
int64_t find_end_of_simple_field(Span<char> buffer, int64_t start, char delimiter);

/**
 * Find the index of the quote that ends the current field.
 *
 * \param start: The index after the opening quote.
 * \param quote: The quote character that ends the field.
 * \param escape_chars: The characters that may be used to escape the quote character.
 * \return Index of the quote character that ends the field, or std::nullopt if the field is
 *   malformed and does not have an end.
 */
std::optional<int64_t> find_end_of_quoted_field(Span<char> buffer,
                                                int64_t start,
                                                char quote,
                                                Span<char> escape_chars);

/**
 * Finds all fields for the record starting at the given index. Typically, the record ends with a
 * newline, but quoted multi-line records are supported as well.
 *
 * \return Index of the start of the next record or the end of the buffer. Nullopt is returned if
 *   the buffer has a malformed record at the end, i.e. a quoted field that is not closed.
 */
std::optional<int64_t> parse_record_fields(const Span<char> buffer,
                                           const int64_t start,
                                           const char delimiter,
                                           const char quote,
                                           const Span<char> quote_escape_chars,
                                           Vector<Span<char>> &r_fields);

}  // namespace detail

/** \} */

}  // namespace blender::csv_parse

File: source/blender/blenlib/CMakeLists.txt

@@ -58,6 +58,7 @@ set(SRC
   intern/convexhull_2d.cc
   intern/cpp_type.cc
   intern/cpp_types.cc
+  intern/csv_parse.cc
   intern/delaunay_2d.cc
   intern/dot_export.cc
   intern/easing.cc
@@ -215,6 +216,7 @@ set(SRC
   BLI_cpp_type_make.hh
   BLI_cpp_types.hh
   BLI_cpp_types_make.hh
+  BLI_csv_parse.hh
   BLI_delaunay_2d.hh
   BLI_devirtualize_parameters.hh
   BLI_dial_2d.h
@@ -527,6 +529,7 @@ if(WITH_GTESTS)
     tests/BLI_color_test.cc
     tests/BLI_convexhull_2d_test.cc
     tests/BLI_cpp_type_test.cc
+    tests/BLI_csv_parse_test.cc
     tests/BLI_delaunay_2d_test.cc
     tests/BLI_disjoint_set_test.cc
     tests/BLI_expr_pylike_eval_test.cc

File: source/blender/blenlib/intern/csv_parse.cc (new file)

@@ -0,0 +1,286 @@
/* SPDX-FileCopyrightText: 2025 Blender Authors
 *
 * SPDX-License-Identifier: GPL-2.0-or-later */

#include "BLI_csv_parse.hh"
#include "BLI_enumerable_thread_specific.hh"
#include "BLI_task.hh"

namespace blender::csv_parse {
/**
 * Returns a guess for the start of the next record. Note that this could split up quoted fields.
 * This case needs to be detected at a higher level.
 */
static int64_t guess_next_record_start(const Span<char> buffer, const int64_t start)
{
  int64_t i = start;
  while (i < buffer.size()) {
    const char c = buffer[i];
    if (c == '\n') {
      return i + 1;
    }
    i++;
  }
  return buffer.size();
}

/**
 * Split the buffer into chunks of approximately the given size. The function attempts to align
 * the chunks so that records are not split. This works in the majority of cases, but can fail
 * with multi-line fields. This has to be detected at a higher level.
 */
static Vector<Span<char>> split_into_aligned_chunks(const Span<char> buffer,
                                                    int64_t approximate_chunk_size)
{
  approximate_chunk_size = std::max<int64_t>(approximate_chunk_size, 1);
  Vector<Span<char>> chunks;
  int64_t start = 0;
  while (start < buffer.size()) {
    int64_t end = std::min(start + approximate_chunk_size, buffer.size());
    end = guess_next_record_start(buffer, end);
    chunks.append(buffer.slice(IndexRange::from_begin_end(start, end)));
    start = end;
  }
  return chunks;
}
/**
 * Parses the given buffer into records and their fields.
 *
 * `r_data_offsets` and `r_data_fields` are passed in to be able to reuse their memory.
 */
static std::optional<CsvRecords> parse_records(const Span<char> buffer,
                                               const CsvParseOptions &options,
                                               Vector<int64_t> &r_data_offsets,
                                               Vector<Span<char>> &r_data_fields)
{
  using namespace detail;
  /* Clear the data that may still be in there, but do not free the memory. */
  r_data_offsets.clear();
  r_data_fields.clear();
  r_data_offsets.append(0);
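  /* The offsets encode which fields belong to which record: record i owns the field range
   * [r_data_offsets[i], r_data_offsets[i + 1]), which is also why the vector has to start with
   * an explicit zero. */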
  int64_t start = 0;
  while (start < buffer.size()) {
    const std::optional<int64_t> next_record_start = parse_record_fields(
        buffer,
        start,
        options.delimiter,
        options.quote,
        options.quote_escape_chars,
        r_data_fields);
    if (!next_record_start.has_value()) {
      return std::nullopt;
    }
    r_data_offsets.append(r_data_fields.size());
    start = *next_record_start;
  }
  return CsvRecords(OffsetIndices<int64_t>(r_data_offsets), r_data_fields);
}
std::optional<Vector<Any<>>> parse_csv_in_chunks(
    const Span<char> buffer,
    const CsvParseOptions &options,
    FunctionRef<void(const CsvRecord &record)> process_header,
    FunctionRef<Any<>(const CsvRecords &records)> process_records)
{
  using namespace detail;
  /* First parse the first row to get the column names. */
  Vector<Span<char>> header_fields;
  const std::optional<int64_t> first_data_record_start = parse_record_fields(
      buffer, 0, options.delimiter, options.quote, options.quote_escape_chars, header_fields);
  if (!first_data_record_start.has_value()) {
    return std::nullopt;
  }
  /* Call this before starting to process the remaining data. This allows the caller to do some
   * preprocessing that is used during chunk parsing. */
  process_header(CsvRecord(header_fields));
  /* This buffer contains only the data records, without the header. */
  const Span<char> data_buffer = buffer.drop_front(*first_data_record_start);
  /* Split the buffer into chunks that can be processed in parallel. */
  const Vector<Span<char>> data_buffer_chunks = split_into_aligned_chunks(
      data_buffer, options.chunk_size_bytes);
  /* It's not common, but it can happen that .csv files contain quoted multi-line values. In the
   * unlucky case that we split the buffer in the middle of such a multi-line field, there will
   * be malformed chunks. In this case we fall back to parsing the whole buffer with a single
   * thread. If this case becomes more common, we could try to avoid splitting into malformed
   * chunks by making the splitting logic a bit smarter. */
  std::atomic<bool> found_malformed_chunk = false;
  Vector<std::optional<Any<>>> chunk_results(data_buffer_chunks.size());
  struct TLS {
    Vector<int64_t> data_offsets;
    Vector<Span<char>> data_fields;
  };
  threading::EnumerableThreadSpecific<TLS> all_tls;
  threading::parallel_for(chunk_results.index_range(), 1, [&](const IndexRange range) {
    TLS &tls = all_tls.local();
    for (const int64_t i : range) {
      if (found_malformed_chunk.load(std::memory_order_relaxed)) {
        /* All work is cancelled when there was a malformed chunk. */
        return;
      }
      const Span<char> chunk_buffer = data_buffer_chunks[i];
      const std::optional<CsvRecords> records = parse_records(
          chunk_buffer, options, tls.data_offsets, tls.data_fields);
      if (!records.has_value()) {
        found_malformed_chunk.store(true, std::memory_order_relaxed);
        return;
      }
      chunk_results[i] = process_records(*records);
    }
  });
  /* If there was a malformed chunk, process the data again in a single thread without splitting
   * the input into chunks. This should happen quite rarely but is important for overall
   * correctness. */
  if (found_malformed_chunk) {
    chunk_results.clear();
    TLS &tls = all_tls.local();
    const std::optional<CsvRecords> records = parse_records(
        data_buffer, options, tls.data_offsets, tls.data_fields);
    if (!records.has_value()) {
      return std::nullopt;
    }
    chunk_results.append(process_records(*records));
  }
  /* Prepare the return value. */
  Vector<Any<>> results;
  for (std::optional<Any<>> &result : chunk_results) {
    BLI_assert(result.has_value());
    results.append(std::move(result.value()));
  }
  return results;
}
namespace detail {

std::optional<int64_t> parse_record_fields(const Span<char> buffer,
                                           const int64_t start,
                                           const char delimiter,
                                           const char quote,
                                           const Span<char> quote_escape_chars,
                                           Vector<Span<char>> &r_fields)
{
  using namespace detail;
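  /* A trailing delimiter implies a final empty field: a record like "a," has the two fields
   * "a" and "". The loops below append fields only when they encounter field characters, so an
   * empty field has to be appended explicitly when a delimiter is directly followed by a
   * newline, a carriage return or the end of the buffer. */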
  const auto handle_potentially_trailing_delimiter = [&](const int64_t i) {
    if (i <= buffer.size()) {
      if (i < buffer.size()) {
        if (ELEM(buffer[i], '\n', '\r')) {
          r_fields.append({});
        }
      }
      else {
        r_fields.append({});
      }
    }
  };
  int64_t i = start;
  while (i < buffer.size()) {
    const char c = buffer[i];
    if (c == '\n') {
      return i + 1;
    }
    if (c == '\r') {
      i++;
      continue;
    }
    if (c == delimiter) {
      r_fields.append({});
      i++;
      handle_potentially_trailing_delimiter(i);
      continue;
    }
    if (c == quote) {
      i++;
      const std::optional<int64_t> end_of_field = find_end_of_quoted_field(
          buffer, i, quote, quote_escape_chars);
      if (!end_of_field.has_value()) {
        return std::nullopt;
      }
      r_fields.append(buffer.slice(IndexRange::from_begin_end(i, *end_of_field)));
      i = *end_of_field;
      while (i < buffer.size()) {
        const char inner_c = buffer[i];
        if (inner_c == quote) {
          i++;
          continue;
        }
        if (inner_c == delimiter) {
          i++;
          handle_potentially_trailing_delimiter(i);
          break;
        }
        if (ELEM(inner_c, '\n', '\r')) {
          break;
        }
        i++;
      }
      continue;
    }
    const int64_t end_of_field = find_end_of_simple_field(buffer, i, delimiter);
    r_fields.append(buffer.slice(IndexRange::from_begin_end(i, end_of_field)));
    i = end_of_field;
    while (i < buffer.size()) {
      const char inner_c = buffer[i];
      if (inner_c == delimiter) {
        i++;
        handle_potentially_trailing_delimiter(i);
        break;
      }
      if (ELEM(inner_c, '\n', '\r')) {
        break;
      }
      BLI_assert_unreachable();
    }
  }
  return buffer.size();
}
int64_t find_end_of_simple_field(const Span<char> buffer,
                                 const int64_t start,
                                 const char delimiter)
{
  int64_t i = start;
  while (i < buffer.size()) {
    const char c = buffer[i];
    if (ELEM(c, delimiter, '\n', '\r')) {
      return i;
    }
    i++;
  }
  return buffer.size();
}
std::optional<int64_t> find_end_of_quoted_field(const Span<char> buffer,
                                                const int64_t start,
                                                const char quote,
                                                const Span<char> escape_chars)
{
  int64_t i = start;
  while (i < buffer.size()) {
    const char c = buffer[i];
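    /* An escape character only counts when it is directly followed by the quote character. With
     * the default options this skips both `""` and `\"`, because '"' itself is one of the
     * escape characters. */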
    if (escape_chars.contains(c)) {
      if (i + 1 < buffer.size() && buffer[i + 1] == quote) {
        i += 2;
        continue;
      }
    }
    if (c == quote) {
      return i;
    }
    i++;
  }
  return std::nullopt;
}

}  // namespace detail

}  // namespace blender::csv_parse

File: source/blender/blenlib/tests/BLI_csv_parse_test.cc (new file)

@@ -0,0 +1,259 @@
/* SPDX-FileCopyrightText: 2025 Blender Authors
 *
 * SPDX-License-Identifier: Apache-2.0 */

#include "testing/testing.h"

#include "BLI_csv_parse.hh"
#include "BLI_string_ref.hh"

namespace blender::csv_parse::tests {
static std::optional<int64_t> find_end_of_simple_field(const StringRef buffer,
                                                       const int64_t start,
                                                       const char delimiter = ',')
{
  return detail::find_end_of_simple_field(Span<char>(buffer), start, delimiter);
}

static std::optional<int64_t> find_end_of_quoted_field(
    const StringRef buffer,
    const int64_t start,
    const char quote = '"',
    const Span<char> escape_chars = Span<char>(StringRef("\"\\")))
{
  return detail::find_end_of_quoted_field(Span<char>(buffer), start, quote, escape_chars);
}

static std::optional<Vector<std::string>> parse_record_fields(
    const StringRef buffer,
    const int64_t start = 0,
    const char delimiter = ',',
    const char quote = '"',
    const Span<char> quote_escape_chars = Span<char>{'"', '\\'})
{
  Vector<Span<char>> fields;
  const std::optional<int64_t> end_of_record = detail::parse_record_fields(
      Span<char>(buffer), start, delimiter, quote, quote_escape_chars, fields);
  if (!end_of_record.has_value()) {
    return std::nullopt;
  }
  Vector<std::string> result;
  for (const Span<char> field : fields) {
    result.append(std::string(field.begin(), field.end()));
  }
  return result;
}

struct StrParseResult {
  bool success = false;
  Vector<std::string> column_names;
  Vector<Vector<std::string>> records;
};

static StrParseResult parse_csv_fields(const StringRef str, const CsvParseOptions &options)
{
  struct Chunk {
    Vector<Vector<std::string>> fields;
  };
  StrParseResult result;
  const std::optional<Vector<Chunk>> chunks = parse_csv_in_chunks<Chunk>(
      Span<char>(str),
      options,
      [&](const CsvRecord &record) {
        for (const int64_t i : record.index_range()) {
          result.column_names.append(record.field_str(i));
        }
      },
      [&](const CsvRecords &records) {
        Chunk result;
        for (const int64_t record_i : records.index_range()) {
          const CsvRecord record = records.record(record_i);
          Vector<std::string> fields;
          for (const int64_t column_i : record.index_range()) {
            fields.append(record.field_str(column_i));
          }
          result.fields.append(std::move(fields));
        }
        return result;
      });
  if (!chunks.has_value()) {
    result.success = false;
    return result;
  }
  result.success = true;
  for (const Chunk &chunk : *chunks) {
    result.records.extend(std::move(chunk.fields));
  }
  return result;
}
TEST(csv_parse, FindEndOfSimpleField)
{
  EXPECT_EQ(find_end_of_simple_field("123", 0), 3);
  EXPECT_EQ(find_end_of_simple_field("123", 1), 3);
  EXPECT_EQ(find_end_of_simple_field("123", 2), 3);
  EXPECT_EQ(find_end_of_simple_field("123", 3), 3);
  EXPECT_EQ(find_end_of_simple_field("1'3", 3), 3);
  EXPECT_EQ(find_end_of_simple_field("123,", 0), 3);
  EXPECT_EQ(find_end_of_simple_field("123,456", 0), 3);
  EXPECT_EQ(find_end_of_simple_field("123,456,789", 0), 3);
  EXPECT_EQ(find_end_of_simple_field(" 23", 0), 3);
  EXPECT_EQ(find_end_of_simple_field("", 0), 0);
  EXPECT_EQ(find_end_of_simple_field("\n", 0), 0);
  EXPECT_EQ(find_end_of_simple_field("12\n", 0), 2);
  EXPECT_EQ(find_end_of_simple_field("0,12\n", 0), 1);
  EXPECT_EQ(find_end_of_simple_field("0,12\n", 2), 4);
  EXPECT_EQ(find_end_of_simple_field("\r\n", 0), 0);
  EXPECT_EQ(find_end_of_simple_field("12\r\n", 0), 2);
  EXPECT_EQ(find_end_of_simple_field("0,12\r\n", 0), 1);
  EXPECT_EQ(find_end_of_simple_field("0,12\r\n", 2), 4);
  EXPECT_EQ(find_end_of_simple_field("0,\t12\r\n", 2), 5);
  EXPECT_EQ(find_end_of_simple_field("0,\t12\r\n", 2, '\t'), 2);
}

TEST(csv_parse, FindEndOfQuotedField)
{
  EXPECT_EQ(find_end_of_quoted_field("", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("123", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("123\n", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("123\r\n", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("123\"", 0), 3);
  EXPECT_EQ(find_end_of_quoted_field("\"", 0), 0);
  EXPECT_EQ(find_end_of_quoted_field("\"\"", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("\"\"\"", 0), 2);
  EXPECT_EQ(find_end_of_quoted_field("123\"\"", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("123\"\"\"", 0), 5);
  EXPECT_EQ(find_end_of_quoted_field("123\"\"\"\"", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("123\"\"\"\"\"", 0), 7);
  EXPECT_EQ(find_end_of_quoted_field("123\"\"0\"\"\"", 0), 8);
  EXPECT_EQ(find_end_of_quoted_field(",", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field(",\"", 0), 1);
  EXPECT_EQ(find_end_of_quoted_field("0,1\"", 0), 3);
  EXPECT_EQ(find_end_of_quoted_field("0,1\n", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("0,1\"\"", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("0,1\"\"\"", 0), 5);
  EXPECT_EQ(find_end_of_quoted_field("0\n1\n\"", 0), 4);
  EXPECT_EQ(find_end_of_quoted_field("\n\"", 0), 1);
  EXPECT_EQ(find_end_of_quoted_field("\\\"", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("\\\"\"", 0), 2);
  EXPECT_EQ(find_end_of_quoted_field("\\\"\"\"", 0), std::nullopt);
  EXPECT_EQ(find_end_of_quoted_field("\\\"\"\"\"", 0), 4);
}

TEST(csv_parse, ParseRecordFields)
{
  using StrVec = Vector<std::string>;
  EXPECT_EQ(parse_record_fields(""), StrVec());
  EXPECT_EQ(parse_record_fields("1"), StrVec{"1"});
  EXPECT_EQ(parse_record_fields("1,2"), StrVec({"1", "2"}));
  EXPECT_EQ(parse_record_fields("1,2,3"), StrVec({"1", "2", "3"}));
  EXPECT_EQ(parse_record_fields("1\n,2,3"), StrVec({"1"}));
  EXPECT_EQ(parse_record_fields("1, 2\n,3"), StrVec({"1", " 2"}));
  EXPECT_EQ(parse_record_fields("1, 2\r\n,3"), StrVec({"1", " 2"}));
  EXPECT_EQ(parse_record_fields("\"1,2,3\""), StrVec({"1,2,3"}));
  EXPECT_EQ(parse_record_fields("\"1,2,3"), std::nullopt);
  EXPECT_EQ(parse_record_fields("\"1,\n2\t\r\n,3\""), StrVec({"1,\n2\t\r\n,3"}));
  EXPECT_EQ(parse_record_fields("\"1,2,3\",\"4,5\""), StrVec({"1,2,3", "4,5"}));
  EXPECT_EQ(parse_record_fields(","), StrVec({"", ""}));
  EXPECT_EQ(parse_record_fields(",,"), StrVec({"", "", ""}));
  EXPECT_EQ(parse_record_fields(",,\n"), StrVec({"", "", ""}));
  EXPECT_EQ(parse_record_fields("\r\n,,"), StrVec());
  EXPECT_EQ(parse_record_fields("\"a\"\"b\""), StrVec({"a\"\"b"}));
  EXPECT_EQ(parse_record_fields("\"a\\\"b\""), StrVec({"a\\\"b"}));
  EXPECT_EQ(parse_record_fields("\"a\"\nb"), StrVec({"a"}));
  EXPECT_EQ(parse_record_fields("\"a\" \nb"), StrVec({"a"}));
}

TEST(csv_parse, ParseCsvBasic)
{
  CsvParseOptions options;
  options.chunk_size_bytes = 1;
  StrParseResult result = parse_csv_fields("a,b,c\n1,2,3,4\n4\n77,88,99\n", options);
  EXPECT_TRUE(result.success);
  EXPECT_EQ(result.column_names.size(), 3);
  EXPECT_EQ(result.column_names[0], "a");
  EXPECT_EQ(result.column_names[1], "b");
  EXPECT_EQ(result.column_names[2], "c");
  EXPECT_EQ(result.records.size(), 3);
  EXPECT_EQ(result.records[0].size(), 4);
  EXPECT_EQ(result.records[1].size(), 1);
  EXPECT_EQ(result.records[2].size(), 3);
  EXPECT_EQ(result.records[0][0], "1");
  EXPECT_EQ(result.records[0][1], "2");
  EXPECT_EQ(result.records[0][2], "3");
  EXPECT_EQ(result.records[0][3], "4");
  EXPECT_EQ(result.records[1][0], "4");
  EXPECT_EQ(result.records[2][0], "77");
  EXPECT_EQ(result.records[2][1], "88");
  EXPECT_EQ(result.records[2][2], "99");
}

TEST(csv_parse, ParseCsvMissingEnd)
{
  CsvParseOptions options;
  options.chunk_size_bytes = 1;
  StrParseResult result = parse_csv_fields("a,b,c\n1,\"2", options);
  EXPECT_FALSE(result.success);
}

TEST(csv_parse, ParseCsvMultiLine)
{
  CsvParseOptions options;
  options.chunk_size_bytes = 1;
  StrParseResult result = parse_csv_fields("a,b,c\n1,\"2\n\n\",3,4", options);
  EXPECT_TRUE(result.success);
  EXPECT_EQ(result.records.size(), 1);
  EXPECT_EQ(result.records[0].size(), 4);
  EXPECT_EQ(result.records[0][0], "1");
  EXPECT_EQ(result.records[0][1], "2\n\n");
  EXPECT_EQ(result.records[0][2], "3");
  EXPECT_EQ(result.records[0][3], "4");
}

TEST(csv_parse, ParseCsvEmpty)
{
  CsvParseOptions options;
  options.chunk_size_bytes = 1;
  StrParseResult result = parse_csv_fields("", options);
  EXPECT_TRUE(result.success);
  EXPECT_EQ(result.column_names.size(), 0);
  EXPECT_EQ(result.records.size(), 0);
}

TEST(csv_parse, ParseCsvTitlesOnly)
{
  CsvParseOptions options;
  options.chunk_size_bytes = 1;
  StrParseResult result = parse_csv_fields("a,b,c", options);
  EXPECT_TRUE(result.success);
  EXPECT_EQ(result.column_names.size(), 3);
  EXPECT_EQ(result.column_names[0], "a");
  EXPECT_EQ(result.column_names[1], "b");
  EXPECT_EQ(result.column_names[2], "c");
  EXPECT_TRUE(result.records.is_empty());
}

TEST(csv_parse, ParseCsvTrailingNewline)
{
  CsvParseOptions options;
  options.chunk_size_bytes = 1;
  StrParseResult result = parse_csv_fields("a\n1\n2\n", options);
  EXPECT_TRUE(result.success);
  EXPECT_EQ(result.column_names.size(), 1);
  EXPECT_EQ(result.column_names[0], "a");
  EXPECT_EQ(result.records.size(), 2);
  EXPECT_EQ(result.records[0].size(), 1);
  EXPECT_EQ(result.records[0][0], "1");
  EXPECT_EQ(result.records[1].size(), 1);
  EXPECT_EQ(result.records[1][0], "2");
}

}  // namespace blender::csv_parse::tests

File: CSV importer in source/blender/io/csv/importer

@@ -6,207 +6,257 @@
  * \ingroup csv
  */
+#include <atomic>
+#include <charconv>
 #include <optional>
+#include <variant>
+#include "fast_float.h"
+#include "BKE_anonymous_attribute_id.hh"
 #include "BKE_attribute.hh"
 #include "BKE_pointcloud.hh"
 #include "BKE_report.hh"
+#include "BLI_csv_parse.hh"
 #include "BLI_fileops.hh"
 #include "BLI_generic_span.hh"
+#include "BLI_implicit_sharing.hh"
 #include "BLI_vector.hh"
 #include "IO_csv.hh"
-#include "IO_string_utils.hh"
 namespace blender::io::csv {
-static Vector<StringRef> parse_column_names(const StringRef line)
-{
-  Vector<StringRef> columns;
-  const char delim = ',';
-  const char *start = line.begin(), *end = line.end();
-  const char *cell_start = start, *cell_end = start;
-  int64_t delim_index = line.find_first_of(delim);
-  while (delim_index != StringRef::not_found) {
-    cell_end = start + delim_index;
-    columns.append_as(cell_start, cell_end);
-    cell_start = cell_end + 1;
-    delim_index = line.find_first_of(delim, delim_index + 1);
-  }
-  columns.append_as(cell_start, end);
-  return columns;
-}
-
-static std::optional<eCustomDataType> get_column_type(const char *start, const char *end)
-{
-  bool success = false;
-  int _val_int = 0;
-  try_parse_int(start, end, 0, success, _val_int);
-  if (success) {
-    return CD_PROP_INT32;
-  }
-  float _val_float = 0.0f;
-  try_parse_float(start, end, 0.0f, success, _val_float);
-  if (success) {
-    return CD_PROP_FLOAT;
-  }
-  return std::nullopt;
-}
-
-static bool get_column_types(const StringRef line, Vector<eCustomDataType> &column_types)
-{
-  const char delim = ',';
-  const char *start = line.begin(), *end = line.end();
-  const char *cell_start = start, *cell_end = start;
-  int64_t delim_index = line.find_first_of(delim);
-  while (delim_index != StringRef::not_found) {
-    cell_end = start + delim_index;
-    std::optional<eCustomDataType> column_type = get_column_type(cell_start, cell_end);
-    if (!column_type.has_value()) {
-      return false;
-    }
-    column_types.append(column_type.value());
-    cell_start = cell_end + 1;
-    delim_index = line.find_first_of(delim, delim_index + 1);
-  }
-  std::optional<eCustomDataType> column_type = get_column_type(cell_start, end);
-  if (!column_type.has_value()) {
-    return false;
-  }
-  column_types.append(column_type.value());
-  return true;
-}
-
-static int64_t get_row_count(StringRef buffer)
-{
-  int64_t row_count = 1;
-  while (!buffer.is_empty()) {
-    read_next_line(buffer);
-    row_count++;
-  }
-  return row_count;
-}
-
-static void parse_csv_cell(const Span<GMutableSpan> data,
-                           const Span<eCustomDataType> types,
-                           const Span<StringRef> column_names,
-                           const int64_t row_index,
-                           const int64_t col_index,
-                           const char *start,
-                           const char *end,
-                           const CSVImportParams &import_params)
-{
-  bool success = false;
-  switch (types[col_index]) {
-    case CD_PROP_INT32: {
-      int value = 0;
-      try_parse_int(start, end, 0, success, value);
-      data[col_index].typed<int>()[row_index] = value;
-      if (!success) {
-        StringRef column_name = column_names[col_index];
-        BKE_reportf(import_params.reports,
-                    RPT_ERROR,
-                    "CSV Import: file '%s' has an unexpected value at row %d for column %s of "
-                    "type Integer",
-                    import_params.filepath,
-                    int(row_index),
-                    std::string(column_name).c_str());
-      }
-      break;
-    }
-    case CD_PROP_FLOAT: {
-      float value = 0.0f;
-      try_parse_float(start, end, 0.0f, success, value);
-      data[col_index].typed<float>()[row_index] = value;
-      if (!success) {
-        StringRef column_name = column_names[col_index];
-        BKE_reportf(import_params.reports,
-                    RPT_ERROR,
-                    "CSV Import: file '%s' has an unexpected value at row %d for column %s of "
-                    "type Float",
-                    import_params.filepath,
-                    int(row_index),
-                    std::string(column_name).c_str());
-      }
-      break;
-    }
-    default: {
-      StringRef column_name = column_names[col_index];
-      BKE_reportf(import_params.reports,
-                  RPT_ERROR,
-                  "CSV Import: file '%s' has an unsupported value at row %d for column %s",
-                  import_params.filepath,
-                  int(row_index),
-                  std::string(column_name).c_str());
-      break;
-    }
-  }
-}
-
-static void parse_csv_line(const Span<GMutableSpan> data,
-                           const Span<eCustomDataType> types,
-                           const Span<StringRef> column_names,
-                           int64_t row_index,
-                           const StringRef line,
-                           const CSVImportParams &import_params)
-{
-  const char delim = ',';
-  const char *start = line.begin(), *end = line.end();
-  const char *cell_start = start, *cell_end = start;
-  int64_t col_index = 0;
-  int64_t delim_index = line.find_first_of(delim);
-  while (delim_index != StringRef::not_found) {
-    cell_end = start + delim_index;
-    parse_csv_cell(
-        data, types, column_names, row_index, col_index, cell_start, cell_end, import_params);
-    col_index++;
-    cell_start = cell_end + 1;
-    delim_index = line.find_first_of(delim, delim_index + 1);
-  }
-  parse_csv_cell(data, types, column_names, row_index, col_index, cell_start, end, import_params);
-}
-
-static void parse_csv_data(const Span<GMutableSpan> data,
-                           const Span<eCustomDataType> types,
-                           const Span<StringRef> column_names,
-                           StringRef buffer,
-                           const CSVImportParams &import_params)
-{
-  int64_t row_index = 0;
-  while (!buffer.is_empty()) {
-    const StringRef line = read_next_line(buffer);
-    parse_csv_line(data, types, column_names, row_index, line, import_params);
-    row_index++;
-  }
-}
+struct ColumnInfo {
+  StringRef name;
+  bool has_invalid_name = false;
+  std::atomic<bool> found_invalid = false;
+  std::atomic<bool> found_int = false;
+  std::atomic<bool> found_float = false;
+};
+
+using ColumnData = std::variant<std::monostate, Vector<float>, Vector<int>>;
+
+struct ChunkResult {
+  int rows_num;
+  Vector<ColumnData> columns;
+};
+
+struct ParseFloatColumnResult {
+  Vector<float> data;
+  bool found_invalid = false;
+};
+
+struct ParseIntColumnResult {
+  Vector<int> data;
+  bool found_invalid = false;
+  bool found_float = false;
+};
+
+static ParseFloatColumnResult parse_column_as_floats(const csv_parse::CsvRecords &records,
+                                                     const int column_i)
+{
+  ParseFloatColumnResult result;
+  result.data.reserve(records.size());
+  for (const int row_i : records.index_range()) {
+    const Span<char> value_span = records.record(row_i).field(column_i);
+    const char *value_begin = value_span.begin();
+    const char *value_end = value_span.end();
+    /* Skip leading whitespace and plus sign. */
+    while (value_begin < value_end && ELEM(*value_begin, ' ', '+')) {
+      value_begin++;
+    }
+    float value;
+    fast_float::from_chars_result res = fast_float::from_chars(value_begin, value_end, value);
+    if (res.ec != std::errc()) {
+      result.found_invalid = true;
+      return result;
+    }
+    if (res.ptr < value_end) {
+      /* Allow trailing whitespace in the value. */
+      while (res.ptr < value_end && res.ptr[0] == ' ') {
+        res.ptr++;
+      }
+      if (res.ptr < value_end) {
+        result.found_invalid = true;
+        return result;
+      }
+    }
+    result.data.append(value);
+  }
+  return result;
+}
+
+static ParseIntColumnResult parse_column_as_ints(const csv_parse::CsvRecords &records,
+                                                 const int column_i)
+{
+  ParseIntColumnResult result;
+  result.data.reserve(records.size());
+  for (const int row_i : records.index_range()) {
+    const Span<char> value_span = records.record(row_i).field(column_i);
+    const char *value_begin = value_span.begin();
+    const char *value_end = value_span.end();
+    /* Skip leading whitespace and plus sign. */
+    while (value_begin < value_end && ELEM(*value_begin, ' ', '+')) {
+      value_begin++;
+    }
+    int value;
+    std::from_chars_result res = std::from_chars(value_begin, value_end, value);
+    if (res.ec != std::errc()) {
+      result.found_invalid = true;
+      return result;
+    }
+    if (res.ptr < value_end) {
+      /* If the next character after the value is a dot, it should be parsed again as float. */
+      if (res.ptr[0] == '.') {
+        result.found_float = true;
+        return result;
+      }
+      /* Allow trailing whitespace in the value. */
+      while (res.ptr < value_end && res.ptr[0] == ' ') {
+        res.ptr++;
+      }
+      if (res.ptr < value_end) {
+        result.found_invalid = true;
+        return result;
+      }
+    }
+    result.data.append(value);
+  }
+  return result;
+}
+
+static ChunkResult parse_records_chunk(const csv_parse::CsvRecords &records,
+                                       MutableSpan<ColumnInfo> columns_info)
+{
+  const int columns_num = columns_info.size();
+  ChunkResult chunk_result;
+  chunk_result.rows_num = records.size();
+  chunk_result.columns.resize(columns_num);
+  for (const int column_i : IndexRange(columns_num)) {
+    ColumnInfo &column_info = columns_info[column_i];
+    if (column_info.has_invalid_name) {
+      /* Column can be ignored. */
+      continue;
+    }
+    if (column_info.found_invalid.load(std::memory_order_relaxed)) {
+      /* Invalid values have been found in this column already, skip it. */
+      continue;
+    }
+    /* A float was found in this column already, so parse everything as floats. */
+    const bool found_float = column_info.found_float.load(std::memory_order_relaxed);
+    if (found_float) {
+      ParseFloatColumnResult float_column_result = parse_column_as_floats(records, column_i);
+      if (float_column_result.found_invalid) {
+        column_info.found_invalid.store(true, std::memory_order_relaxed);
+        continue;
+      }
+      chunk_result.columns[column_i] = std::move(float_column_result.data);
+      continue;
+    }
+    /* No float was found so far in this column, so attempt to parse it as integers. */
+    ParseIntColumnResult int_column_result = parse_column_as_ints(records, column_i);
+    if (int_column_result.found_invalid) {
+      column_info.found_invalid.store(true, std::memory_order_relaxed);
+      continue;
+    }
+    if (!int_column_result.found_float) {
+      chunk_result.columns[column_i] = std::move(int_column_result.data);
+      column_info.found_int.store(true, std::memory_order_relaxed);
+      continue;
+    }
+    /* While parsing it as integers, floats were detected. So parse it as floats again. */
+    column_info.found_float.store(true, std::memory_order_relaxed);
+    ParseFloatColumnResult float_column_result = parse_column_as_floats(records, column_i);
+    if (float_column_result.found_invalid) {
+      column_info.found_invalid.store(true, std::memory_order_relaxed);
+      continue;
+    }
+    chunk_result.columns[column_i] = std::move(float_column_result.data);
+  }
+  return chunk_result;
+}
+
+/**
+ * So far, the parsed data is still split into many chunks. This function flattens the chunks
+ * into contiguous buffers that can be used as attributes.
+ */
+static Array<std::optional<GArray<>>> flatten_valid_attribute_chunks(
+    const Span<ColumnInfo> columns_info,
+    OffsetIndices<int> chunk_offsets,
+    MutableSpan<ChunkResult> chunks)
+{
+  const int points_num = chunk_offsets.total_size();
+  Array<std::optional<GArray<>>> flattened_attributes(columns_info.size());
+  threading::parallel_for(columns_info.index_range(), 1, [&](const IndexRange columns_range) {
+    for (const int column_i : columns_range) {
+      const ColumnInfo &column_info = columns_info[column_i];
+      if (column_info.has_invalid_name || column_info.found_invalid) {
+        /* Column can be ignored. */
+        continue;
+      }
+      if (column_info.found_float) {
+        /* Should read column as floats. */
+        GArray<> attribute(CPPType::get<float>(), points_num);
+        float *attribute_buffer = static_cast<float *>(attribute.data());
+        threading::parallel_for(chunks.index_range(), 1, [&](const IndexRange chunks_range) {
+          for (const int chunk_i : chunks_range) {
+            const IndexRange dst_range = chunk_offsets[chunk_i];
+            ChunkResult &chunk = chunks[chunk_i];
+            ColumnData &column_data = chunk.columns[column_i];
+            if (const auto *float_vec = std::get_if<Vector<float>>(&column_data)) {
+              BLI_assert(float_vec->size() == dst_range.size());
+              uninitialized_copy_n(
+                  float_vec->data(), dst_range.size(), attribute_buffer + dst_range.first());
+            }
+            else if (const auto *int_vec = std::get_if<Vector<int>>(&column_data)) {
+              /* This chunk was read entirely as integers, so it still has to be converted to
+               * floats. */
+              BLI_assert(int_vec->size() == dst_range.size());
+              uninitialized_convert_n(
+                  int_vec->data(), dst_range.size(), attribute_buffer + dst_range.first());
+            }
+            else {
+              /* Expected data to be available, because the `found_invalid` flag was not
+               * set. */
+              BLI_assert_unreachable();
+            }
+            /* Free data for chunk. */
+            column_data = std::monostate{};
+          }
+        });
+        flattened_attributes[column_i] = std::move(attribute);
+        continue;
+      }
+      if (column_info.found_int) {
+        /* Should read column as ints. */
+        GArray<> attribute(CPPType::get<int>(), points_num);
+        int *attribute_buffer = static_cast<int *>(attribute.data());
+        threading::parallel_for(chunks.index_range(), 1, [&](const IndexRange chunks_range) {
+          for (const int chunk_i : chunks_range) {
+            const IndexRange dst_range = chunk_offsets[chunk_i];
+            ChunkResult &chunk = chunks[chunk_i];
+            ColumnData &column_data = chunk.columns[column_i];
+            if (const auto *int_vec = std::get_if<Vector<int>>(&column_data)) {
+              BLI_assert(int_vec->size() == dst_range.size());
+              uninitialized_copy_n(
+                  int_vec->data(), dst_range.size(), attribute_buffer + dst_range.first());
+            }
+            else {
+              /* Expected data to be available, because the `found_invalid` and
+               * `found_float` flags were not set. */
+              BLI_assert_unreachable();
+            }
+            /* Free data for chunk. */
+            column_data = std::monostate{};
+          }
+        });
+        flattened_attributes[column_i] = std::move(attribute);
+        continue;
+      }
+    }
+  });
+  return flattened_attributes;
+}
 
 PointCloud *import_csv_as_point_cloud(const CSVImportParams &import_params)
@@ -220,62 +270,82 @@ PointCloud *import_csv_as_point_cloud(const CSVImportParams &import_params)
                 import_params.filepath);
     return nullptr;
   }
   BLI_SCOPED_DEFER([&]() { MEM_freeN(buffer); });
-  StringRef buffer_str{static_cast<char *>(buffer), int64_t(buffer_len)};
-  if (buffer_str.is_empty()) {
+  if (buffer_len == 0) {
     BKE_reportf(
         import_params.reports, RPT_ERROR, "CSV Import: empty file '%s'", import_params.filepath);
     return nullptr;
   }
-  const StringRef header = read_next_line(buffer_str);
-  const Vector<StringRef> names = parse_column_names(header);
-  if (buffer_str.is_empty()) {
-    BKE_reportf(import_params.reports,
-                RPT_ERROR,
-                "CSV Import: no rows in file '%s'",
-                import_params.filepath);
-    return nullptr;
-  }
-  /* Shallow copy buffer to preserve pointers from first row for parsing */
-  const StringRef data_buffer(buffer_str.begin(), buffer_str.end());
-  const StringRef first_row = read_next_line(buffer_str);
-  Vector<eCustomDataType> column_types;
-  if (!get_column_types(first_row, column_types)) {
-    std::string column_name = names[column_types.size()];
-    BKE_reportf(import_params.reports,
-                RPT_ERROR,
-                "CSV Import: file '%s', Column %s is of unsupported data type",
-                import_params.filepath,
-                column_name.c_str());
-    return nullptr;
-  }
-  const int64_t rows_num = get_row_count(buffer_str);
-  PointCloud *pointcloud = BKE_pointcloud_new_nomain(rows_num);
-  pointcloud->positions_for_write().fill(float3(0));
-  Array<bke::GSpanAttributeWriter> attribute_writers(names.size());
-  Array<GMutableSpan> attribute_data(names.size());
-  bke::MutableAttributeAccessor attributes = pointcloud->attributes_for_write();
-  for (const int i : names.index_range()) {
-    attribute_writers[i] = attributes.lookup_or_add_for_write_span(
-        names[i], bke::AttrDomain::Point, column_types[i]);
-    attribute_data[i] = attribute_writers[i].span;
-  }
-  parse_csv_data(attribute_data, column_types, names, data_buffer, import_params);
-  for (bke::GSpanAttributeWriter &attr : attribute_writers) {
-    attr.finish();
-  }
+  Array<ColumnInfo> columns_info;
+  const auto parse_header = [&](const csv_parse::CsvRecord &record) {
+    columns_info.reinitialize(record.size());
+    for (const int i : record.index_range()) {
+      ColumnInfo &column_info = columns_info[i];
+      const StringRef name = record.field_str(i);
+      column_info.name = name;
+      if (!bke::allow_procedural_attribute_access(name) ||
+          bke::attribute_name_is_anonymous(name) || name.is_empty())
+      {
+        column_info.has_invalid_name = true;
+        continue;
+      }
+    }
+  };
+  const auto parse_data_chunk = [&](const csv_parse::CsvRecords &records) {
+    return parse_records_chunk(records, columns_info);
+  };
+  const Span<char> buffer_span{static_cast<char *>(buffer), int64_t(buffer_len)};
+  csv_parse::CsvParseOptions parse_options;
+  std::optional<Vector<ChunkResult>> parsed_chunks = csv_parse::parse_csv_in_chunks<ChunkResult>(
+      buffer_span, parse_options, parse_header, parse_data_chunk);
+  if (!parsed_chunks.has_value()) {
+    BKE_reportf(import_params.reports,
+                RPT_ERROR,
+                "CSV import: failed to parse file '%s'",
+                import_params.filepath);
+    return nullptr;
+  }
+  /* Count the total number of records and compute the offset of each chunk which is used when
+   * flattening the parsed data. */
+  Vector<int> chunk_offsets_vec;
+  chunk_offsets_vec.append(0);
+  for (const ChunkResult &chunk : *parsed_chunks) {
+    chunk_offsets_vec.append(chunk_offsets_vec.last() + chunk.rows_num);
+  }
+  const OffsetIndices<int> chunk_offsets(chunk_offsets_vec);
+  const int points_num = chunk_offsets_vec.last();
+  PointCloud *pointcloud = BKE_pointcloud_new_nomain(points_num);
+  Array<std::optional<GArray<>>> flattened_attributes;
+  threading::memory_bandwidth_bound_task(points_num * 16, [&]() {
+    threading::parallel_invoke([&]() { pointcloud->positions_for_write().fill(float3(0)); },
+                               [&]() {
+                                 flattened_attributes = flatten_valid_attribute_chunks(
+                                     columns_info, chunk_offsets, *parsed_chunks);
+                               });
+  });
+  /* Add all valid attributes to the pointcloud. */
+  bke::MutableAttributeAccessor attributes = pointcloud->attributes_for_write();
+  for (const int column_i : columns_info.index_range()) {
+    const std::optional<GArray<>> &attribute = flattened_attributes[column_i];
+    if (!attribute.has_value()) {
+      continue;
+    }
+    const auto *data = new ImplicitSharedValue<GArray<>>(std::move(*attribute));
+    const eCustomDataType type = bke::cpp_type_to_custom_data_type(attribute->type());
+    const ColumnInfo &column_info = columns_info[column_i];
+    attributes.add(column_info.name,
+                   bke::AttrDomain::Point,
+                   type,
+                   bke::AttributeInitShared{data->data.data(), *data});
+    data->remove_user_and_delete_if_last();
+  }
   return pointcloud;