Skip to content

Conversation

@masenf
Copy link
Collaborator

@masenf masenf commented Oct 14, 2025

  • New env var: REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS (default 2.0)
  • If the debounce is non-zero, then state manager will queue the disk write
  • Queued writes will be processed in order of set time after they exceed the debounce timeout
  • New StateManager.close method standardized in base class
  • Close app.state_manager when the server is going down
  • Flush all queued writes when the StateManagerDisk closes
  • Add memory cache state expiration to StateManagerDisk
  • Use run_in_thread for the actual disk write and purging of expired states to avoid blocking the event loop while writing to disk
  • Update test cases to always call state_manager.close()

* New env var: REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS (default 2.0)
* If the debounce is non-zero, then state manager will queue the disk write
* Queued writes will be processed in order of set time after they exceed the debounce timeout
* New StateManager.close method standardized in base class
* Close app.state_manager when the server is going down
* Flush all queued writes when the StateManagerDisk closes
* Update test cases to always call `state_manager.close()`
@linear
Copy link

linear bot commented Oct 14, 2025

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Overview

Summary

Implements a deferred write queue for StateManagerDisk to reduce disk I/O overhead by batching state writes. The debounce period is configurable via REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS (default 2.0 seconds). A background task processes queued writes after the debounce timeout and handles state expiration cleanup.

Major changes:

  • New QueueItem dataclass to track pending writes with timestamps
  • Background _process_write_queue() task that processes writes older than debounce period
  • Memory cache expiration tracking via _token_last_touched dictionary
  • Standardized close() method in base StateManager class to flush pending writes on shutdown
  • Integration with app lifespan to ensure clean shutdown
  • Updated tests to always call state_manager.close()

Issues found:

  • Critical: Line 295 only adds to queue if token not present, causing subsequent updates to be silently dropped
  • Critical: In-memory cache self.states never updated in set_state(), causing stale data to be served while waiting for debounced writes

Confidence Score: 1/5

  • This PR contains critical bugs that will cause data loss and stale state issues in production
  • Two critical logical errors in the core state management flow: (1) subsequent state updates are silently dropped when debouncing is enabled due to conditional queue insertion, and (2) the in-memory cache is never updated during set_state, causing stale data to be served to clients. These bugs will manifest as lost state updates and inconsistent application state.
  • reflex/istate/manager/disk.py requires immediate attention - the set_state() method has critical bugs at lines 285-305

Important Files Changed

File Analysis

Filename Score Overview
reflex/istate/manager/disk.py 1/5 Adds deferred write queue with debouncing and expiration handling; critical bug where subsequent updates to same token are dropped, and memory cache not updated causing stale data
reflex/istate/manager/init.py 5/5 Adds close() method to base StateManager class with empty default implementation
reflex/environment.py 5/5 Adds interpret_float_env() helper and REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS environment variable with 2.0 default
reflex/app_mixins/lifespan.py 5/5 Calls state_manager.close() during app shutdown to flush pending writes

Sequence Diagram

sequenceDiagram
    participant Client
    participant App
    participant StateManagerDisk
    participant WriteQueue
    participant ProcessTask
    participant Disk

    Note over StateManagerDisk: Initialization
    StateManagerDisk->>StateManagerDisk: __post_init__()
    StateManagerDisk->>Disk: _purge_expired_states()

    Note over Client,Disk: State Update Flow (with debounce)
    Client->>App: User action triggers state change
    App->>StateManagerDisk: modify_state(token)
    StateManagerDisk->>StateManagerDisk: get_state(token)
    StateManagerDisk-->>App: yield state
    App->>App: Modify state
    App->>StateManagerDisk: set_state(token, state)
    StateManagerDisk->>WriteQueue: Add/Update QueueItem(token, state, timestamp)
    StateManagerDisk->>StateManagerDisk: _schedule_process_write_queue()
    StateManagerDisk->>ProcessTask: create_task() if not running
    StateManagerDisk-->>App: Return (state not yet on disk)

    Note over ProcessTask,Disk: Background Write Processing
    loop Every debounce period
        ProcessTask->>ProcessTask: Check queue for items older than debounce
        ProcessTask->>WriteQueue: Pop items ready to write
        ProcessTask->>Disk: set_state_for_substate() via run_in_thread
        ProcessTask->>ProcessTask: Check for expired tokens
        ProcessTask->>Disk: _purge_expired_states() via run_in_thread
        ProcessTask->>ProcessTask: _process_write_queue_delay()
    end

    Note over App,Disk: App Shutdown
    App->>StateManagerDisk: close()
    StateManagerDisk->>ProcessTask: cancel()
    ProcessTask->>WriteQueue: Flush all remaining items
    ProcessTask->>Disk: Write all items to disk
    StateManagerDisk-->>App: Shutdown complete
Loading

Additional Comments (1)

  1. reflex/istate/manager/disk.py, line 285-305 (link)

    logic: in-memory cache is never updated during set_state, only during get_state. this causes stale data to be served from memory cache while waiting for debounced writes. add memory cache update before queueing the write

6 files reviewed, 3 comments

Edit Code Review Agent Settings | Greptile

@codspeed-hq
Copy link

codspeed-hq bot commented Oct 14, 2025

CodSpeed Performance Report

Merging #5883 will not alter performance

Comparing masenf/disk-state-throttler (d1359cb) with main (ade1254)1

Summary

✅ 8 untouched

Footnotes

  1. No successful run was found on main (1e2a8c9) during the generation of this report, so ade1254 was used instead as the comparison base. There might be some changes unrelated to this pull request in this report.

@adhami3310 adhami3310 merged commit 2cc6884 into main Oct 16, 2025
45 of 47 checks passed
@adhami3310 adhami3310 deleted the masenf/disk-state-throttler branch October 16, 2025 00:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants