ENG-7948: StateManagerDisk deferred write queue #5883
* New env var: REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS (default 2.0)
* If the debounce is non-zero, then state manager will queue the disk write
* Queued writes will be processed in order of set time after they exceed the debounce timeout
* New StateManager.close method standardized in base class
* Close app.state_manager when the server is going down
* Flush all queued writes when the StateManagerDisk closes
* Update test cases to always call `state_manager.close()`
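The queueing behaviour described in the list above can be sketched roughly as follows. This is an illustration under assumptions, not the code in this PR: `PendingWrite`, `DebouncedWriter`, the `write` callback, and the injectable `clock` are hypothetical names chosen for the example.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class PendingWrite:
    """Hypothetical queue entry: newest state for a token and when it was first queued."""

    token: str
    state: Any
    queued_at: float


@dataclass
class DebouncedWriter:
    """Illustrative debounced writer; not the StateManagerDisk implementation."""

    write: Callable[[str, Any], None]
    debounce_seconds: float = 2.0
    clock: Callable[[], float] = time.monotonic
    _queue: dict[str, PendingWrite] = field(default_factory=dict, init=False)

    def set_state(self, token: str, state: Any) -> None:
        if self.debounce_seconds <= 0:
            # Debounce disabled: write through immediately.
            self.write(token, state)
            return
        pending = self._queue.get(token)
        if pending is None:
            self._queue[token] = PendingWrite(token, state, self.clock())
        else:
            # Keep the original queue time, but always keep the newest state.
            pending.state = state

    def process_due(self) -> int:
        """Write every entry older than the debounce period, oldest first."""
        now = self.clock()
        # Dicts preserve insertion order, which here equals queue-time order.
        due = [
            token
            for token, item in self._queue.items()
            if now - item.queued_at >= self.debounce_seconds
        ]
        for token in due:
            item = self._queue.pop(token)
            self.write(item.token, item.state)
        return len(due)

    def close(self) -> None:
        """Flush everything that is still queued, regardless of age."""
        for token in list(self._queue):
            item = self._queue.pop(token)
            self.write(item.token, item.state)
```

Passing a fake `clock` keeps the debounce logic testable without real sleeps; a real manager would presumably drive `process_due()` from a background task.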
Greptile Overview
Summary
Implements a deferred write queue for StateManagerDisk to reduce disk I/O overhead by batching state writes. The debounce period is configurable via REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS (default 2.0 seconds). A background task processes queued writes after the debounce timeout and handles state expiration cleanup.
Major changes:
- New `QueueItem` dataclass to track pending writes with timestamps
- Background `_process_write_queue()` task that processes writes older than debounce period
- Memory cache expiration tracking via `_token_last_touched` dictionary
- Standardized `close()` method in base `StateManager` class to flush pending writes on shutdown
- Integration with app lifespan to ensure clean shutdown
- Updated tests to always call `state_manager.close()`
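One way to picture the lifespan integration is an async context manager that closes the state manager on the way out. The sketch below is an assumption about the shape of that hook, not the contents of `reflex/app_mixins/lifespan.py`; `RecordingStateManager` and `state_manager_lifespan` are hypothetical names, and `close()` is assumed to be a coroutine.

```python
import asyncio
import contextlib


class RecordingStateManager:
    """Hypothetical stand-in for a state manager with an async close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # A real manager would flush queued writes here.
        self.closed = True


@contextlib.asynccontextmanager
async def state_manager_lifespan(state_manager):
    """Yield while the app runs; always close the manager on shutdown."""
    try:
        yield state_manager
    finally:
        # Runs on normal shutdown and when the body raises.
        await state_manager.close()
```

The same `try`/`finally` shape is what calling `state_manager.close()` at the end of every test case achieves by hand.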
Issues found:
- Critical: Line 295 only adds to queue if token not present, causing subsequent updates to be silently dropped
- Critical: In-memory cache `self.states` never updated in `set_state()`, causing stale data to be served while waiting for debounced writes
Confidence Score: 1/5
- This PR contains critical bugs that will cause data loss and stale state issues in production
- Two critical logical errors in the core state management flow: (1) subsequent state updates are silently dropped when debouncing is enabled due to conditional queue insertion, and (2) the in-memory cache is never updated during set_state, causing stale data to be served to clients. These bugs will manifest as lost state updates and inconsistent application state.
- reflex/istate/manager/disk.py requires immediate attention - the set_state() method has critical bugs at lines 285-305
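To make the two reported issues concrete, here is a minimal reconstruction based only on the review's description; the actual code at lines 285-305 is not shown on this page, so both functions and their plain-dict arguments are hypothetical.

```python
def set_state_buggy(states: dict, queue: dict, token: str, state) -> None:
    """Mimics the reported behaviour: only the first update per token is queued."""
    if token not in queue:
        queue[token] = state
    # The in-memory cache `states` is never touched here.


def set_state_fixed(states: dict, queue: dict, token: str, state) -> None:
    """One possible fix: refresh the cache and always keep the newest queued state."""
    states[token] = state  # readers see the new state before it reaches disk
    queue[token] = state   # a later update replaces the pending one


# Two updates to the same token before the debounce period elapses.
states, queue = {"tok": "v0"}, {}
set_state_buggy(states, queue, "tok", "v1")
set_state_buggy(states, queue, "tok", "v2")
buggy_result = (states["tok"], queue["tok"])

states, queue = {"tok": "v0"}, {}
set_state_fixed(states, queue, "tok", "v1")
set_state_fixed(states, queue, "tok", "v2")
fixed_result = (states["tok"], queue["tok"])
```

With the buggy variant the cache still holds `v0` and the queue holds `v1`, so `v2` is lost on both paths; the fixed variant ends with `v2` in both.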
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| reflex/istate/manager/disk.py | 1/5 | Adds deferred write queue with debouncing and expiration handling; critical bug where subsequent updates to same token are dropped, and memory cache not updated causing stale data |
| reflex/istate/manager/__init__.py | 5/5 | Adds close() method to base StateManager class with empty default implementation |
| reflex/environment.py | 5/5 | Adds interpret_float_env() helper and REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS environment variable with 2.0 default |
| reflex/app_mixins/lifespan.py | 5/5 | Calls state_manager.close() during app shutdown to flush pending writes |
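For readers unfamiliar with float-valued environment settings, the sketch below shows one way such a value could be parsed. It is not the `interpret_float_env()` helper from `reflex/environment.py`, whose signature is not shown here; `read_float_env` and its `environ` parameter are hypothetical.

```python
import os
from typing import Mapping, Optional


def read_float_env(
    name: str,
    default: float,
    environ: Optional[Mapping[str, str]] = None,
) -> float:
    """Return the float value of an environment variable, or `default` if unset."""
    env = os.environ if environ is None else environ
    raw = env.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        return float(raw)
    except ValueError as exc:
        raise ValueError(f"{name} must be a number, got {raw!r}") from exc


# Default of 2.0 seconds, as stated in the PR description.
debounce = read_float_env("REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS", 2.0, environ={})
```

Per the PR description, a value of `0` would disable queueing so that writes go straight to disk.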
Sequence Diagram
sequenceDiagram
participant Client
participant App
participant StateManagerDisk
participant WriteQueue
participant ProcessTask
participant Disk
Note over StateManagerDisk: Initialization
StateManagerDisk->>StateManagerDisk: __post_init__()
StateManagerDisk->>Disk: _purge_expired_states()
Note over Client,Disk: State Update Flow (with debounce)
Client->>App: User action triggers state change
App->>StateManagerDisk: modify_state(token)
StateManagerDisk->>StateManagerDisk: get_state(token)
StateManagerDisk-->>App: yield state
App->>App: Modify state
App->>StateManagerDisk: set_state(token, state)
StateManagerDisk->>WriteQueue: Add/Update QueueItem(token, state, timestamp)
StateManagerDisk->>StateManagerDisk: _schedule_process_write_queue()
StateManagerDisk->>ProcessTask: create_task() if not running
StateManagerDisk-->>App: Return (state not yet on disk)
Note over ProcessTask,Disk: Background Write Processing
loop Every debounce period
ProcessTask->>ProcessTask: Check queue for items older than debounce
ProcessTask->>WriteQueue: Pop items ready to write
ProcessTask->>Disk: set_state_for_substate() via run_in_thread
ProcessTask->>ProcessTask: Check for expired tokens
ProcessTask->>Disk: _purge_expired_states() via run_in_thread
ProcessTask->>ProcessTask: _process_write_queue_delay()
end
Note over App,Disk: App Shutdown
App->>StateManagerDisk: close()
StateManagerDisk->>ProcessTask: cancel()
ProcessTask->>WriteQueue: Flush all remaining items
ProcessTask->>Disk: Write all items to disk
StateManagerDisk-->>App: Shutdown complete
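The background-task and shutdown portions of the diagram can be approximated with plain `asyncio`. The class below is a sketch under assumptions: `BackgroundFlusher`, its `interval`, and the `write` callback are hypothetical, and the per-item debounce check is omitted to keep the cancel-and-flush path visible.

```python
import asyncio
import contextlib


class BackgroundFlusher:
    """Illustrative background writer; not the StateManagerDisk implementation."""

    def __init__(self, write, interval: float = 2.0) -> None:
        self._write = write
        self._interval = interval
        self._queue: dict = {}
        self._task = None

    def enqueue(self, token, state) -> None:
        self._queue[token] = state
        # Start the processing task only if one is not already running.
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(self._run())

    def _flush(self) -> None:
        # Write remaining items in insertion order.
        for token in list(self._queue):
            self._write(token, self._queue.pop(token))

    async def _run(self) -> None:
        try:
            while self._queue:
                await asyncio.sleep(self._interval)
                self._flush()
        except asyncio.CancelledError:
            # Cancelled during shutdown: write what is left, then re-raise.
            self._flush()
            raise

    async def close(self) -> None:
        if self._task is not None and not self._task.done():
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
        # Covers the case where the task was cancelled before it ever ran.
        self._flush()
```

The final `_flush()` in `close()` matters because a task cancelled before its first step never reaches its `except` block.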
Additional Comments (1)
- reflex/istate/manager/disk.py, lines 285-305. logic: The in-memory cache is never updated during `set_state`, only during `get_state`. This causes stale data to be served from the memory cache while waiting for debounced writes. Add a memory cache update before queueing the write.
6 files reviewed, 3 comments
CodSpeed Performance Report
Merging #5883 will not alter performance
… state would expire conserve resources by pausing the _process_write_queue for the amount of time of the oldest known token to expire.
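The pause length mentioned here could be computed along these lines. This is a guess at the arithmetic only: the function name and the shape of its arguments (a mapping of token to last-touched timestamp, plus an expiration in seconds) are assumptions loosely based on the `_token_last_touched` dictionary named in the review.

```python
from typing import Mapping, Optional


def seconds_until_next_expiration(
    token_last_touched: Mapping[str, float],
    token_expiration: float,
    now: float,
) -> Optional[float]:
    """Return how long a background loop could sleep before the oldest token expires.

    Returns None when no tokens are tracked, meaning there is nothing to wait for.
    """
    if not token_last_touched:
        return None
    oldest_touch = min(token_last_touched.values())
    # Never return a negative delay if the oldest token is already overdue.
    return max(0.0, oldest_touch + token_expiration - now)
```

For example, with tokens last touched at 100.0 and 130.0, an expiration of 60 seconds, and a current time of 120.0, the loop could sleep for 40 seconds.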
Avoid interference with _schedule_process_write_queue
run_in_thread for the actual disk write and purging of expired states to avoid blocking the event loop while writing to disk
state_manager.close()
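The idea of moving blocking file I/O off the event loop can be shown with the standard library alone. The sketch below uses `asyncio.to_thread` rather than the project's `run_in_thread` helper, whose signature is not shown here; `write_state_file` and `flush_to_disk` are hypothetical names.

```python
import asyncio
from pathlib import Path


def write_state_file(path: Path, payload: bytes) -> None:
    """Blocking write; safe to call from a worker thread."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(payload)


async def flush_to_disk(path: Path, payload: bytes) -> None:
    """Run the blocking write in a worker thread so the event loop stays responsive."""
    await asyncio.to_thread(write_state_file, path, payload)
```

While the write runs in the thread, other coroutines such as request handlers can keep making progress on the loop.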