sample: review-driven cleanup, fix streaming bernoulli header bug, dead retry loop, cluster pre-alloc#3774
Merged
Merged
Conversation
…ad retry loop, cluster pre-alloc P0: - run() Bernoulli probability validation now happens before the URL/streaming dispatch, so `--bernoulli 1.5 https://…` no longer reaches random_bool() and panics. Also added is_finite() guard against NaN/Inf sample sizes. - stream_bernoulli_sampling rewrite: single buffer with manual header split on first newline (no more lost data records co-located with the header chunk), fixed inverted has_headers flag in the data loop, lazy RNG creation (only the chosen kind, was eagerly building all three including slow-init HC128), honors --max-size like the download path. - do_weighted_sampling: removed dead outer while-loop. The inner for-loop consumes the &mut Iterator, so retries were impossible — single clear pass now, dropped unused HashSet/attempts/records_exhausted bookkeeping. - sample_cluster: pre-size unique_clusters/all_clusters with cardinality (the total number of unique clusters), not requested_clusters (the sample size). Also dropped a per-record to_vec() on the second pass via Borrow<[u8]>. P1: - parse_timestamp: disambiguate Unix sec vs ms by magnitude (10^11 cutoff) so millisecond timestamps don't silently parse as far-future dates. - do_reservoir_sampling: bound writes against reservoir.len().min(sample_size) rather than just sample_size, defensible against an undersized slice; fixed misleading "constant memory" comment (it's O(k)). - check_stats_cache: dropped the redundant chars().all(char::is_numeric) pre-check (Unicode-numeric foot-gun); just parse::<usize>().ok().or_else. P2: - get_column_index: direct &[u8] byte comparison; no more from_utf8_lossy allocation per field. - check_stats_cache: dead flag_force write into SchemaArgs (we already returned early when set) replaced with literal false + comment. - sample_stratified: dropped the parallel strata_counts HashMap whose count was never read; reservoirs map built in one pass. - do_stratified_sampling: lookup via Borrow<[u8]> first, only allocate the Vec<u8> stratum key once per stratum instead of per record. - Removed unnecessary unsafe get_unchecked_mut blocks in do_reservoir_sampling, do_stratified_sampling, sample_indices. - aggregate_records: takes a precomputed &[bool] numeric mask; sample_timeseries scans the dataset once after sorting (with early exit once every column is ruled out) instead of repeating the per-column "are all values numeric?" probe inside every interval. Also pass refs into aggregate_records to avoid per-interval ByteRecord clones. - with_rng! macro: collapses the recurring three-arm dispatch (Standard/Faster/Cryptosecure) into a single macro invocation across sample_systematic, sample_stratified, sample_weighted, sample_cluster, sample_timeseries, and the default INDEXED path in run(). Tests: 73/73 sample tests pass; clippy -D warnings clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…fer flush, i64::MIN abs, stratified clarity
Medium:
- stream_bernoulli_sampling: header was split on the first raw `\n`, which
mis-handles RFC 4180 headers whose fields contain a quoted newline (the
csv reader gets a slice that ends inside the quote and either errors or
truncates). Replaced with csv-driven boundary detection: parse one record
via csv::Reader and use position().byte() to drain exactly that many
bytes. The same bug existed in the data loop's rposition('\n') split, so
the data path now uses the same approach — only commit records whose
terminator lies INSIDE the buffer; hold back trailing partials until more
data arrives or the stream closes naturally.
Low:
- stream_bernoulli_sampling: with --max-size truncation, the post-loop
flush could emit one malformed half-record at the cap boundary. The new
loop tracks `parser_eof = stream_done && !size_capped` and only treats
trailing bytes as a real record on a NATURAL EOS.
- parse_timestamp: replaced `ts_val.abs() < SEC_LIMIT` with a range form
(`(-SEC_LIMIT..SEC_LIMIT).contains(&ts_val)`) so an input of `i64::MIN`
no longer panics in debug / wraps in release.
- do_stratified_sampling: replaced the awkward two-lookup
records_seen.get_mut().or-insert pattern by unifying the per-stratum
state into a single HashMap<Vec<u8>, (Vec<ByteRecord>, usize)>. Halves
per-record HashMap lookups and reads more naturally.
- check_stats_cache: dropped the `flag_force: false` literal in favor of
passing `args.flag_force` plus a `debug_assert!(!args.flag_force)`,
keeping the early-return guard and the SchemaArgs construction in sync
if the guard ever moves.
Tests: 73/73 sample tests pass; clippy -D warnings clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Up to standards ✅🟢 Issues
|
| Metric | Results |
|---|---|
| Complexity | 106 |
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
to avoid typos lint error
Contributor
There was a problem hiding this comment.
Pull request overview
This PR refactors and tightens qsv sample by removing duplicated RNG dispatch code, improving performance in several sampling methods, and fixing multiple correctness issues—most notably around streaming Bernoulli sampling over remote URLs.
Changes:
- Fixes streaming Bernoulli sampling correctness (quoted-newline-safe record boundaries,
--no-headershandling, early probability validation,--max-sizeenforcement) and reduces RNG init overhead. - Simplifies sampling implementations (removes dead retry loop in weighted sampling, removes several
unsafeindexing sites, bounds reservoir writes defensively). - Performance/maintainability cleanups (byte-based column lookup, prealloc tweaks for cluster sampling, precomputed numeric-column mask for time-series aggregation,
with_rng!macro to unify RNG branching).
…ror status) stream_bernoulli_sampling: - Honor --delimiter for header AND data probes. Previously it always used default_delim derived from QSV_DEFAULT_DELIMITER (or comma), so a user-supplied --delimiter was silently ignored when sampling a remote CSV. Resolved once at function entry: --delimiter > QSV_DEFAULT_DELIMITER > comma. - Fail fast on non-2xx HTTP responses by chaining .error_for_status() onto .send().await — reqwest's send() does not error on HTTP error status, so without this a 404/500 HTML body would have been streamed straight into the csv parser and surfaced as confusing record errors. Tests: 73/73 sample tests pass; clippy -D warnings clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
4 tasks
jqnatividad
added a commit
that referenced
this pull request
Apr 28, 2026
…3775) * test/sample: add integration tests for streaming Bernoulli URL path Closes the test-coverage gap flagged in PR #3774. Stands up a local actix-web fixture (port 8082, distinct from test_fetch's 8081) and exercises the boundary detection and validation guards added there: - sample_bernoulli_url_quoted_newline_header: header field 0 contains a literal `\n` inside an RFC-4180 quote. Asserts the header arrives intact (3 fields, embedded newline preserved) and that every emitted data row also has 3 fields. Old code would have split on the raw byte and corrupted every following record. - sample_bernoulli_url_max_size_truncation: serves a ~1.2 MiB CSV with fixed 100-byte records so `--max-size 1` cuts deterministically inside record 10486. Asserts max id <= 10485 (no half-record at the cap) and that every emitted row is well-formed. - sample_bernoulli_url_404_fails_fast: hits an unmapped path on the fixture server. Asserts qsv exits with error instead of feeding the HTML 404 body into the csv parser (regression for the missing `error_for_status()`). - sample_bernoulli_url_custom_delimiter: serves a TSV and passes `--delimiter '\t'`. Reads raw stdout and splits on tab (the writer also honors --delimiter, so read_stdout's comma parser would collapse rows). Asserts header and data rows split into 3 fields. Tests use #[serial] so they don't race on the port. 77/77 sample tests pass; clippy --bin qsv clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * typo: mis-split->split incorrectly * test/sample: address review feedback on streaming Bernoulli tests - (#1 #2) Replace SAMPLE_TEST_PORT + SAMPLE_TEST_HOST (which duplicated the port and could drift) with a single SAMPLE_TEST_PORT + SAMPLE_TEST_BIND_HOST literal. URL-builder and bind() both derive from the same source — no more brittle .split(':').next().unwrap() that would also panic on IPv6 hosts. - (#3) Wrap the ServerHandle in a SampleWebServer RAII guard. The server now stops in Drop, so a panic inside read_stdout / stdout doesn't leak the port and cascade into "Address already in use" on the next #[serial] test. - (#4) Call wrk.assert_success(&mut cmd) before reading stdout in the success-path tests, so a regression that makes qsv exit non-zero surfaces qsv's stderr instead of a generic CSV-parse error. 77/77 sample tests pass; clippy --bin qsv clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test/sample: address Copilot review on PR #3775 — single-run cmd, server start timeout - Replace the assert_success-then-read_stdout double-run pattern with a single capture-and-parse helper. The previous shape ran qsv twice per test, doubling fixture-server requests (and the ~1.2 MiB max-size download) and meaning the parsed stdout came from a different execution than the one whose status was asserted. - Added run_and_assert_success(): runs once, asserts status, returns Output (with stderr surfaced on failure). - Added parse_csv_stdout(): mirrors wrk.read_stdout's Vec<Vec<String>> shape but reads from a captured buffer. - All three success-path tests (quoted newline header, max-size truncation, custom delimiter) now use these helpers. - Switch the SampleWebServer startup channel to send Result<ServerHandle, String> and use recv_timeout(10s) instead of recv(). A failed bind (e.g., port already in use) used to leave start() blocked forever; it now panics fast with the bind error surfaced from the server thread. 77/77 sample tests pass; clippy --bin qsv clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Review-driven cleanup of
src/cmd/sample.rs(net -44 lines, 8 sampling methods touched). Two commits:98693bdff— initial review passd6ba2ea2a— addresses roborev build(deps): bump simple-expand-tilde from 0.1.4 to 0.1.5 #1767 follow-up reviewBugs fixed
\n, so an RFC 4180 header with a quoted newline truncated the headers and shifted every record. Fixed by csv-driven boundary detection (Reader::position().byte()); same fix applied to the data loop'srposition('\n'). Trailing partial records are held back until either more data arrives or the stream ends naturally.--no-headerswas passed straight intohas_headers(...)(inverted), discarding the first data row. Fixed.--bernoulli 1.5 https://…reachedrandom_bool()and panicked because validation lived after the URL dispatch. Validation hoisted to the top ofrun()(also rejects NaN/Inf sample sizes).--max-sizewas silently ignored on the streaming path. Now honored; capped buffers do not flush a final partial record.whileloop indo_weighted_samplingwas dead code (the innerforconsumes the iterator, no retry possible). Removed; single clear pass with the existing under-fill warning.unique_clusters/all_clusterswith the requested sample size instead of the cardinality, causing the HashMap to rehash its way up. Now uses cardinality (and falls back to the estimate when cardinality is unknown).parse_timestamp: a millisecond Unix timestamp like1_704_067_200_000parsed as seconds (year ~55,899). Disambiguated by magnitude (10^11 cutoff). Range-form check ((-SEC_LIMIT..SEC_LIMIT).contains(&ts_val)) soi64::MINdoesn't panic.do_reservoir_sampling: tightened bounds (reservoir.len().min(sample_size)) so an undersized reservoir slice is defensible. Removed unnecessaryunsafe { get_unchecked_mut(...) }blocks here, indo_stratified_sampling, and insample_indices.Cleanups
get_column_index: direct&[u8]byte comparison; no moreString::from_utf8_lossyallocation per field.check_stats_cache: dropped thechars().all(char::is_numeric)Unicode-numeric pre-check — justparse::<usize>().ok().or_else(...).flag_forcesynced viadebug_assert!instead of the misleadingfalseliteral.sample_stratified: collapsed the parallelstrata_countsHashMap (its count was never read) and the parallelrecords_seenmap into a singleHashMap<Vec<u8>, (Vec<ByteRecord>, usize)>. Halves per-record lookups in the hot loop and removes the awkward "lookup-or-insert" dance.aggregate_records: numeric-column detection hoisted out of the per-interval loop.sample_timeseriesdoes one O(records × cols) scan after sorting (with early exit once every column is ruled out).aggregate_recordsnow takes a precomputed&[bool]mask and&[&ByteRecord](no per-interval ByteRecord clones).with_rng!macro: replaces the recurring three-armmatch rng_kind { Standard => …, Faster => …, Cryptosecure => … }blocks acrosssample_systematic,sample_stratified,sample_weighted,sample_cluster,sample_timeseries(random-offset branch), and the defaultINDEXEDpath inrun(). Net: ~80 LOC deleted, "did I update all three branches?" trip-hazards removed, log lines unified to{:?} <METHOD> sampling….stream_bernoulli_sampling: lazy RNG creation (only the chosen kind, was eagerly building all three including slow-init HC128).do_reservoir_sampling("constant memory" → O(k)).Test plan
cargo build --locked --bin qsv -F all_features— cleancargo t sample -F all_features— 73 / 73 passcargo clippy --locked --bin qsv -F all_features -- -D warnings— clean--max-sizemid-record truncationqsv statscache integration on stratified / weighted / cluster paths🤖 Generated with Claude Code