RecordBatch normalization (flattening) #6758
RecordBatch normalization (flattening)
… iterative function for `RecordBatch`. Not sure which one is better currently.
I had some questions regarding the implementation of this, since the one example from PyArrow doesn't clarify the edge cases here. Normalizing the `Schema` seems fairly straightforward to me; I'm just not sure about:
- Whether the iterative or recursive approach is better (or whether I missed something)
- Whether `DataType::Struct` is the only `DataType` that requires flattening. To me, it looks like that's the only one that can contain nested `Field`s. (I'm also not sure if I'm missing something with unwrapping something like a `List<Struct>`.)
Any feedback/help would be appreciated!
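For reference, the two approaches in question can be compared with a minimal, self-contained sketch. It does not use the arrow crate: `ToyType`, `ToyField`, `flatten_recursive`, and `flatten_iterative` are hypothetical stand-ins invented for illustration, not the PR's code. It assumes that only struct fields are flattened and that a `List<Struct>` is left untouched, which is one possible answer to the question above rather than a confirmed design.

```rust
use std::collections::VecDeque;

// Hypothetical, simplified stand-ins for arrow's `DataType` and `Field`.
#[derive(Debug, Clone, PartialEq)]
enum ToyType {
    Int64,
    Struct(Vec<ToyField>),
    List(Box<ToyField>),
}

#[derive(Debug, Clone, PartialEq)]
struct ToyField {
    name: String,
    data_type: ToyType,
}

impl ToyField {
    fn new(name: &str, data_type: ToyType) -> Self {
        Self { name: name.to_string(), data_type }
    }
}

/// Recursive variant: walks depth-first and appends leaf fields in order.
fn flatten_recursive(fields: &[ToyField], prefix: &str, sep: &str, out: &mut Vec<ToyField>) {
    for f in fields {
        let name = if prefix.is_empty() {
            f.name.clone()
        } else {
            format!("{prefix}{sep}{}", f.name)
        };
        match &f.data_type {
            // Only structs are unwrapped; their children inherit the prefixed name.
            ToyType::Struct(children) => flatten_recursive(children, &name, sep, out),
            // Everything else (including List<Struct>) is kept as a leaf. Assumption.
            other => out.push(ToyField::new(&name, other.clone())),
        }
    }
}

/// Iterative variant: an explicit work queue replaces the call stack.
fn flatten_iterative(fields: &[ToyField], sep: &str) -> Vec<ToyField> {
    let mut queue: VecDeque<ToyField> = fields.iter().cloned().collect();
    let mut out = Vec::new();
    while let Some(f) = queue.pop_front() {
        match f.data_type {
            ToyType::Struct(children) => {
                // Push children to the front in reverse so the original order is preserved.
                for child in children.into_iter().rev() {
                    let name = format!("{}{sep}{}", f.name, child.name);
                    queue.push_front(ToyField::new(&name, child.data_type));
                }
            }
            other => out.push(ToyField { name: f.name, data_type: other }),
        }
    }
    out
}
```

Both variants visit fields in the same order, so they should produce identical output; the iterative one mainly avoids deep call stacks on heavily nested schemas, at the cost of cloning fields into the queue.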
@kszlim can you please help review this PR? You requested the feature, and we are currently quite short on review capacity in arrow-rs.
I'll take a look, though please feel free to disregard anything I say and especially defer to the maintainers.
No problem at all, it's the holiday season! Hope everyone's taking a good break. Appreciate the feedback though! I'll get to work on it :)
Sorry for the delays on this one. I made changes based on the feedback and would appreciate another look! Hopefully the new documentation is clearer.
Jefffrey left a comment
Some potential simplifications
…d if statements, simplified the VecDeque fields.
Appreciate the feedback, as always. Changed some bits of the code, added some responses (and some stuff to work on).
Sorry, fell ill there for a good while. Added some additional tests to hopefully cover some more of the edges. I was trying to adapt it over for
```rust
// Initialize schema
let a = Arc::new(Field::new("a", DataType::Int64, true));
let b = Arc::new(Field::new("b", DataType::Int64, false));
let c = Arc::new(Field::new("c", DataType::Int64, true));
let one = Arc::new(Field::new(
    "1",
    DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
    false,
));
let two = Arc::new(Field::new(
    "2",
    DataType::List(Arc::new(Field::new_list_field(
        DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
        true,
    ))),
    false,
));
let exclamation = Arc::new(Field::new(
    "!",
    DataType::Struct(Fields::from(vec![one.clone(), two.clone()])),
    false,
));
let schema = Schema::new(vec![exclamation.clone()]);

// Initialize fields
let a_field = Int64Array::from(vec![Some(0), Some(1)]);
let b_field = Int64Array::from(vec![Some(2), Some(3)]);
let c_field = Int64Array::from(vec![None, Some(4)]);
let one_field = StructArray::from(vec![
    (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
    (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
    (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
]);
let two_field_data = ArrayData::builder(DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])))
    .len(2)
    .add_child_data(Arc::new(a_field.clone()).to_data())
    .add_child_data(Arc::new(b_field.clone()).to_data())
    .add_child_data(Arc::new(c_field.clone()).to_data())
    .build()
    .unwrap();
let two_field = ListArray::from(two_field_data);
let exclamation_field = Arc::new(StructArray::from(vec![
    (one.clone(), Arc::new(one_field) as ArrayRef),
    (two.clone(), Arc::new(two_field) as ArrayRef),
]));

// Normalize all levels
let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field])
    .expect("valid conversion")
    .normalize(".", None)
    .expect("valid normalization");
let expected = RecordBatch::try_from_iter_with_nullable(vec![
    ("!.1.a", Arc::new(a_field.clone()) as ArrayRef, true),
    ("!.1.b", Arc::new(b_field.clone()) as ArrayRef, false),
    ("!.1.c", Arc::new(c_field.clone()) as ArrayRef, true),
    ("!.2.a", Arc::new(a_field.clone()) as ArrayRef, true),
    ("!.2.b", Arc::new(b_field.clone()) as ArrayRef, false),
    ("!.2.c", Arc::new(c_field.clone()) as ArrayRef, true),
])
.expect("valid conversion");
assert_eq!(expected, normalized);
```
Jefffrey left a comment
Some minor comments (which are also applicable for the schema code); otherwise it looks good to me
I'll merge this tomorrow or the day after to leave some time for any last comments.
Thanks @ngli-me
Which issue does this PR close?
Closes #6369.
Rationale for this change
Adds normalization (flattening) for `RecordBatch`, with normalization via `Schema`. Based on pandas/pola-rs.
What changes are included in this PR?
Are there any user-facing changes?