Conversation
substrate/rpc/src/chain/mod.rs
Outdated
| #[pubsub(name = "chain_newHead")] { | ||
| /// New head subscription | ||
| #[rpc(name = "subscribe_newHead")] | ||
| #[rpc(name = "subscribe_newHead", alias = ["chain_subscribeNewHead", ])] |
There was a problem hiding this comment.
Great. Maybe it is better to have the "old" as the alias and the "new" one as the name?
(e.g. chain_subscribeNewHead is actually probably the preferred one to match since it aligns with other RPCs, my gut tells me the preferred one should be the "default")
dvdplm
left a comment
There was a problem hiding this comment.
Good stuff. I can't say I understand it all, but overall very readable and interesting.
substrate/client/src/client.rs
Outdated
|
|
||
| /// Get storage changes event stream. | ||
| /// | ||
| /// Passing `None` as keys subscribes to all possible keys |
There was a problem hiding this comment.
typo: …as keys should be …as key
There was a problem hiding this comment.
Rephrased the whole sentence, hope it's clearer now.
substrate/client/src/client.rs
Outdated
| if let Some(storage_update) = storage_update { | ||
| if let Some((storage_update, changes)) = storage_update { | ||
| transaction.update_storage(storage_update)?; | ||
| // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? |
There was a problem hiding this comment.
Couldn't we emit a re-org event with some meta data about what changed and let interested subscribers re-fetch?
There was a problem hiding this comment.
Yes, that's possible, although it's not easy to get storage changes for blocks that are already imported/executed. Re-fetching changes would mean to re-execute the blocks, but I suppose based on the filter_keys we could just return all the storage values in the re-orged blocks.
| subscribers.extend(listeners.iter()); | ||
| } | ||
|
|
||
| if has_wildcard || listeners.is_some() { |
There was a problem hiding this comment.
Couldn't you check for !susbscribers.is_empty() here? Or is it faster to do it this way?
There was a problem hiding this comment.
I'm actually interested only in subscribers for that particular key. So if there is a set of changes:
[(1, Some(2)), (2, None), (3, Some(4))]
but we have no wildcard_listeners and only listener for key=1, changes vector will only contain [(StorageKey(1), Some(StorageData(2))]
| filter: filter.clone(), | ||
| })).is_err() | ||
| }, | ||
| None => false, |
There was a problem hiding this comment.
If I read this right, if the subscriber is gone when we get here, then we assume they've been removed properly already, so we're returning false to avoid calling remove_subscriber() again for them? It's a bit unclear to me how they can still be in the subscribers collection though, can you elaborate on that?
There was a problem hiding this comment.
Indeed, I could actually .expect() here, since if the structure is consistent the subscribers should always be in self.sinks. The check here is superfluous.
I could refactor to:
let &(ref sink, ref filter) = self.sinks.get(&subscriber).expect("subscribers returned from self.listeners are always in self.sinks; qed");
let result = sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
filter: filter.clone(),
}));
if result.is_err() {
self.remove_subscriber(subscriber);
}or
if let Some(&(ref sink, ref filter)) = match self.sinks.get(&subscriber) {
let result = sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
filter: filter.clone(),
}));
if result.is_err() {
self.remove_subscriber(subscriber);
}
}Which one do you prefer?
There was a problem hiding this comment.
Oh, actually I can't since .get() borrows immutably, so remove_subscriber has to be outside of the scope.
| assert_eq!(notifications.listeners.len(), 2); | ||
| assert_eq!(notifications.wildcard_listeners.len(), 1); | ||
| } | ||
|
|
There was a problem hiding this comment.
The channels are closed here, correct?
There was a problem hiding this comment.
Yes and since the receiving end is dropped, sending to such channel will trigger an error.
| sink | ||
| .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) | ||
| .send_all(stream) | ||
| // we ignore the resulting Stream (if the first stream is over we are unsubscribed) |
There was a problem hiding this comment.
…is over…? Do you mean …is closed?
There was a problem hiding this comment.
is over as in is finished/is done. Which means that the stream will not emit any more items.
| /// Drain committed changes to an iterator. | ||
| /// | ||
| /// Panics: | ||
| /// Will panic if there are any uncommitted prospective changes. |
There was a problem hiding this comment.
Is "prospective" sort of like "pending"?
There was a problem hiding this comment.
Yes, those are changes that can still be easily discarded. You can see example of usage inside block builder:
- We run a transaction
- It produces a set of prospective changes
- If we detect that it's somehow invalid we discard the prospective changes
- If we accept the transaction we commit prospective changes.
substrate/client/src/client.rs
Outdated
| if let Some((storage_update, changes)) = storage_update { | ||
| transaction.update_storage(storage_update)?; | ||
| // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? | ||
| self.storage_notifications.lock() |
There was a problem hiding this comment.
Are you sure that it should be called before transaction is committed?
There was a problem hiding this comment.
Good point. Moved the notification after commit and also guarded by the same if as block import notification.
gavofyork
left a comment
There was a problem hiding this comment.
Aside from the minor comment
| @@ -0,0 +1,267 @@ | |||
| // Copyright 2017 Parity Technologies (UK) Ltd. | |||
| // This file is part of Polkadot. | |||
3d6c0d9 to
e5e7c4c
Compare
* master: Collator for the "adder" (formerly basic-add) parachain and various small fixes (#438) Storage changes subscription (#464) Wasm execution optimizations (#466) Fix the --key generation (#475) Fix typo in service.rs (#472) Fix session phase in early-exit (#453) Make ping unidirectional (#458) Update README.adoc
* Initial implementation of storage events. * Attaching storage events. * Expose storage modification stream over RPC. * Use FNV for hashing small keys. * Fix and add tests. * Swap alias and RPC name. * Fix demo. * Addressing review grumbles. * Fix comment.
Skip plot if it is too small
* Bump release version to v0.18.0 Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Update changelog Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Update dependency version to v0.18.0 Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Modify changelog Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Move changelog entries from added to changed
CC @jacogr