#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
fmt, iter,
sync::Arc,
};
use eyeball::{SharedObservable, Subscriber};
#[cfg(not(target_arch = "wasm32"))]
use eyeball_im::{Vector, VectorDiff};
#[cfg(not(target_arch = "wasm32"))]
use futures_util::Stream;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_crypto::{
store::DynCryptoStore, CollectStrategy, EncryptionSettings, EncryptionSyncChanges, OlmError,
OlmMachine, ToDeviceRequest,
};
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
room::{history_visibility::HistoryVisibility, message::MessageType},
SyncMessageLikeEvent,
};
#[cfg(doc)]
use ruma::DeviceId;
use ruma::{
api::client as api,
events::{
ignored_user_list::IgnoredUserListEvent,
push_rules::{PushRulesEvent, PushRulesEventContent},
room::{
member::{MembershipState, RoomMemberEventContent, SyncRoomMemberEvent},
power_levels::{
RoomPowerLevelsEvent, RoomPowerLevelsEventContent, StrippedRoomPowerLevelsEvent,
},
},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent, AnySyncStateEvent,
AnySyncTimelineEvent, GlobalAccountDataEventType, StateEvent, StateEventType,
SyncStateEvent,
},
push::{Action, PushConditionRoomCtx, Ruleset},
serde::Raw,
time::Instant,
OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
};
use tokio::sync::{broadcast, Mutex};
#[cfg(feature = "e2e-encryption")]
use tokio::sync::{RwLock, RwLockReadGuard};
use tracing::{debug, info, instrument, trace, warn};
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
#[cfg(feature = "e2e-encryption")]
use crate::RoomMemberships;
use crate::{
deserialized_responses::{RawAnySyncOrStrippedTimelineEvent, SyncTimelineEvent},
error::{Error, Result},
event_cache_store::DynEventCacheStore,
rooms::{
normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons},
Room, RoomInfo, RoomState,
},
store::{
ambiguity_map::AmbiguityCache, DynStateStore, MemoryStore, Result as StoreResult,
StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, Store, StoreConfig,
},
sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse, Timeline},
RoomStateFilter, SessionMeta,
};
/// Client-side state holder that bundles the state store, the event cache
/// store and, when the `e2e-encryption` feature is enabled, the crypto store
/// and the Olm machine.
#[derive(Clone)]
pub struct BaseClient {
/// State store wrapper; session metadata, rooms and the sync token are
/// read through it.
pub(crate) store: Store,
/// Event cache store, exposed through `event_cache_store()`.
event_cache_store: Arc<DynEventCacheStore>,
/// Crypto store handed to the Olm machine when it is (re)built in
/// `regenerate_olm`.
#[cfg(feature = "e2e-encryption")]
crypto_store: Arc<DynCryptoStore>,
/// The Olm machine; `None` until `regenerate_olm` has stored one.
#[cfg(feature = "e2e-encryption")]
olm_machine: Arc<RwLock<Option<OlmMachine>>>,
/// Observable list of ignored user IDs (as strings); set in
/// `apply_changes` when an ignored-user-list account data event is seen.
pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
/// Sending half of the broadcast channel for notable room info updates;
/// a clone is passed to the store whenever a room is fetched or created.
pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
/// Strategy passed to `EncryptionSettings::new` when sharing a room key.
#[cfg(feature = "e2e-encryption")]
pub room_key_recipient_strategy: CollectStrategy,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for BaseClient {
    /// Formats the client for diagnostics.
    ///
    /// Only the session metadata and the sync token are printed; all other
    /// fields are elided (`finish_non_exhaustive`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Use the real type name as the label so that debug output cannot be
        // mistaken for that of a different type called `Client`.
        f.debug_struct("BaseClient")
            .field("session_meta", &self.store.session_meta())
            .field("sync_token", &self.store.sync_token)
            .finish_non_exhaustive()
    }
}
impl BaseClient {
/// Creates a new client using the default [`StoreConfig`].
pub fn new() -> Self {
BaseClient::with_store_config(StoreConfig::default())
}
pub fn with_store_config(config: StoreConfig) -> Self {
let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
broadcast::channel(100);
BaseClient {
store: Store::new(config.state_store),
event_cache_store: config.event_cache_store,
#[cfg(feature = "e2e-encryption")]
crypto_store: config.crypto_store,
#[cfg(feature = "e2e-encryption")]
olm_machine: Default::default(),
ignore_user_list_changes: Default::default(),
room_info_notable_update_sender,
#[cfg(feature = "e2e-encryption")]
room_key_recipient_strategy: Default::default(),
}
}
/// Builds a fresh client whose state store is a new in-memory store.
///
/// Only the crypto store and the room key recipient strategy are carried
/// over from `self`; everything else comes from a new [`StoreConfig`].
#[cfg(feature = "e2e-encryption")]
pub fn clone_with_in_memory_state_store(&self) -> Self {
    let store_config = StoreConfig::new()
        .state_store(MemoryStore::new())
        .crypto_store(self.crypto_store.clone());

    let mut copy = Self::with_store_config(store_config);
    copy.room_key_recipient_strategy = self.room_key_recipient_strategy.clone();
    copy
}
/// Builds a fresh client whose state store is a new in-memory store.
///
/// Without the `e2e-encryption` feature nothing is carried over from `self`.
#[cfg(not(feature = "e2e-encryption"))]
pub fn clone_with_in_memory_state_store(&self) -> Self {
    Self::with_store_config(StoreConfig::new().state_store(MemoryStore::new()))
}
/// Returns the session metadata held by the store, if any.
pub fn session_meta(&self) -> Option<&SessionMeta> {
self.store.session_meta()
}
/// Returns the rooms reported by the store.
pub fn rooms(&self) -> Vec<Room> {
self.store.rooms()
}
/// Returns the rooms reported by the store for the given state `filter`.
pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
self.store.rooms_filtered(filter)
}
/// Returns the store's room vector together with a stream of batched
/// [`VectorDiff`] updates to it.
///
/// Not compiled on `wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
self.store.rooms_stream()
}
/// Asks the store for the room with the given ID, passing along the
/// requested `room_state` and a clone of the notable-update sender.
///
/// NOTE(review): presumably the store creates the room with `room_state`
/// when it does not exist yet, as the name suggests — confirm in `Store`.
pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
self.store.get_or_create_room(
room_id,
room_state,
self.room_info_notable_update_sender.clone(),
)
}
/// Borrows the underlying state store as a trait object.
#[allow(unknown_lints, clippy::explicit_auto_deref)]
pub fn store(&self) -> &DynStateStore {
&*self.store
}
/// Borrows the event cache store as a trait object.
pub fn event_cache_store(&self) -> &DynEventCacheStore {
&*self.event_cache_store
}
/// Returns `true` when the store holds session metadata.
pub fn logged_in(&self) -> bool {
self.store.session_meta().is_some()
}
/// Stores the session metadata and, with the `e2e-encryption` feature,
/// rebuilds the Olm machine for that session.
///
/// # Arguments
///
/// * `session_meta` - The user and device the session belongs to.
/// * `custom_account` - Optional Olm account forwarded to
///   `regenerate_olm` (only with `e2e-encryption`).
///
/// # Errors
///
/// Returns an error if the store rejects the session metadata or if the
/// Olm machine cannot be rebuilt.
pub async fn set_session_meta(
    &self,
    session_meta: SessionMeta,
    #[cfg(feature = "e2e-encryption")] custom_account: Option<
        crate::crypto::vodozemac::olm::Account,
    >,
) -> Result<()> {
    debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login");

    // `session_meta` is not used after this call, so it is moved into the
    // store instead of being cloned.
    self.store.set_session_meta(session_meta, &self.room_info_notable_update_sender).await?;

    #[cfg(feature = "e2e-encryption")]
    self.regenerate_olm(custom_account).await?;

    Ok(())
}
/// Builds a new Olm machine on top of the crypto store and installs it,
/// replacing any previous one.
///
/// # Errors
///
/// Returns `OlmError::MissingSession` when no session metadata has been
/// set, or the error produced while constructing the machine.
#[cfg(feature = "e2e-encryption")]
pub async fn regenerate_olm(
    &self,
    custom_account: Option<crate::crypto::vodozemac::olm::Account>,
) -> Result<()> {
    tracing::debug!("regenerating OlmMachine");

    let Some(meta) = self.session_meta() else {
        return Err(Error::OlmError(OlmError::MissingSession));
    };

    let crypto_store = self.crypto_store.clone();
    let machine =
        OlmMachine::with_store(&meta.user_id, &meta.device_id, crypto_store, custom_account)
            .await
            .map_err(OlmError::from)?;

    *self.olm_machine.write().await = Some(machine);
    Ok(())
}
/// Returns a copy of the sync token currently held by the store, if any.
pub async fn sync_token(&self) -> Option<String> {
self.store.sync_token.read().await.clone()
}
/// Forwards a verification-related room event to the Olm machine.
///
/// Does nothing (and succeeds) when no Olm machine is available.
#[cfg(feature = "e2e-encryption")]
async fn handle_verification_event(
    &self,
    event: &AnySyncMessageLikeEvent,
    room_id: &RoomId,
) -> Result<()> {
    let guard = self.olm_machine().await;
    let Some(machine) = guard.as_ref() else {
        return Ok(());
    };

    // The machine expects a full event, so attach the room ID to the sync
    // variant first.
    let full_event = event.clone().into_full_event(room_id.to_owned());
    machine.receive_verification_event(&full_event).await?;
    Ok(())
}
/// Decrypts an encrypted room event received through sync.
///
/// Returns `Ok(None)` when no Olm machine is available. If the decrypted
/// event turns out to be a verification event (a verification request
/// message, or any event whose type starts with `m.key.verification`), it
/// is also handed to the verification machinery.
#[cfg(feature = "e2e-encryption")]
async fn decrypt_sync_room_event(
    &self,
    event: &Raw<AnySyncTimelineEvent>,
    room_id: &RoomId,
) -> Result<Option<SyncTimelineEvent>> {
    let guard = self.olm_machine().await;
    let machine = match guard.as_ref() {
        Some(machine) => machine,
        None => return Ok(None),
    };

    let decrypted: SyncTimelineEvent =
        machine.decrypt_room_event(event.cast_ref(), room_id).await?.into();

    if let Ok(AnySyncTimelineEvent::MessageLike(message)) = decrypted.event.deserialize() {
        let is_verification = match &message {
            // An unredacted room message only counts when it is a
            // verification request.
            AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(original)) => {
                matches!(&original.content.msgtype, MessageType::VerificationRequest(_))
            }
            other => other.event_type().to_string().starts_with("m.key.verification"),
        };

        if is_verification {
            self.handle_verification_event(&message, room_id).await?;
        }
    }

    Ok(Some(decrypted))
}
/// Processes the timeline events of one room from a sync update and builds
/// the resulting [`Timeline`].
///
/// For every raw event this:
/// * records state events (and member-specific bookkeeping) in `changes`,
/// * records redactions,
/// * with `e2e-encryption`, tries to decrypt encrypted events and forwards
///   verification events,
/// * evaluates `push_rules` and appends to `notifications` when an action
///   asks for a notification.
///
/// `user_ids` is updated from member events: joined/invited users are
/// inserted, users with any other membership are removed.
///
/// Events that fail to deserialize are logged and still pushed onto the
/// returned timeline.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn handle_timeline(
&self,
room: &Room,
limited: bool,
events: Vec<Raw<AnySyncTimelineEvent>>,
prev_batch: Option<String>,
push_rules: &Ruleset,
user_ids: &mut BTreeSet<OwnedUserId>,
room_info: &mut RoomInfo,
changes: &mut StateChanges,
notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
ambiguity_cache: &mut AmbiguityCache,
) -> Result<Timeline> {
let mut timeline = Timeline::new(limited, prev_batch);
// May be `None` when our own member information is not available yet; it
// is retried after each event below.
let mut push_context = self.get_push_room_context(room, room_info, changes).await?;
for event in events {
let mut event: SyncTimelineEvent = event.into();
match event.event.deserialize() {
Ok(e) => {
#[allow(clippy::single_match)]
match &e {
AnySyncTimelineEvent::State(s) => {
match s {
AnySyncStateEvent::RoomMember(member) => {
Box::pin(ambiguity_cache.handle_event(
changes,
room.room_id(),
member,
))
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => {
user_ids.remove(member.state_key());
}
}
handle_room_member_event_for_profiles(
room.room_id(),
member,
changes,
);
}
// Non-member state events are applied to the room info here; member
// events are not.
_ => {
room_info.handle_state_event(s);
}
}
// Every state event, member or not, is recorded in the changes.
let raw_event: Raw<AnySyncStateEvent> = event.event.clone().cast();
changes.add_state_event(room.room_id(), s.clone(), raw_event);
}
AnySyncTimelineEvent::MessageLike(
AnySyncMessageLikeEvent::RoomRedaction(r),
) => {
// Fall back to room version 1 when the room version is unknown.
let room_version =
room_info.room_version().unwrap_or(&RoomVersionId::V1);
if let Some(redacts) = r.redacts(room_version) {
room_info.handle_redaction(r, event.event.cast_ref());
let raw_event = event.event.clone().cast();
changes.add_redaction(room.room_id(), redacts, raw_event);
}
}
#[cfg(feature = "e2e-encryption")]
AnySyncTimelineEvent::MessageLike(e) => match e {
AnySyncMessageLikeEvent::RoomEncrypted(
SyncMessageLikeEvent::Original(_),
) => {
// Decryption failures are ignored: the event then stays in its encrypted
// form.
if let Ok(Some(e)) = Box::pin(
self.decrypt_sync_room_event(&event.event, room.room_id()),
)
.await
{
event = e;
}
}
AnySyncMessageLikeEvent::RoomMessage(
SyncMessageLikeEvent::Original(original_event),
) => match &original_event.content.msgtype {
MessageType::VerificationRequest(_) => {
Box::pin(self.handle_verification_event(e, room.room_id()))
.await?;
}
_ => (),
},
_ if e.event_type().to_string().starts_with("m.key.verification") => {
Box::pin(self.handle_verification_event(e, room.room_id())).await?;
}
_ => (),
},
#[cfg(not(feature = "e2e-encryption"))]
AnySyncTimelineEvent::MessageLike(_) => (),
}
// Refresh the existing push context with the latest changes, or try again
// to build one if it could not be built so far.
if let Some(context) = &mut push_context {
self.update_push_room_context(
context,
room.own_user_id(),
room_info,
changes,
)
} else {
push_context = self.get_push_room_context(room, room_info, changes).await?;
}
if let Some(context) = &push_context {
let actions = push_rules.get_actions(&event.event, context);
if actions.iter().any(Action::should_notify) {
notifications.entry(room.room_id().to_owned()).or_default().push(
Notification {
actions: actions.to_owned(),
event: RawAnySyncOrStrippedTimelineEvent::Sync(
event.event.clone(),
),
},
);
}
event.push_actions = actions.to_owned();
}
}
Err(e) => {
warn!("Error deserializing event: {e}");
}
}
// Pushed regardless of whether deserialization succeeded.
timeline.events.push(event);
}
Ok(timeline)
}
/// Processes the stripped state events of an invited room.
///
/// Each event that deserializes is applied to `room_info` and collected by
/// event type and state key; the collection is stored in
/// `changes.stripped_state`. Once all events are handled, the push rules
/// are evaluated against every collected event and notifications are
/// appended where an action asks for one.
///
/// Events that fail to deserialize are logged and skipped.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn handle_invited_state(
    &self,
    room: &Room,
    events: &[Raw<AnyStrippedStateEvent>],
    push_rules: &Ruleset,
    room_info: &mut RoomInfo,
    changes: &mut StateChanges,
    notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
) -> Result<()> {
    let mut stripped_by_type = BTreeMap::new();

    for raw in events {
        let parsed = match raw.deserialize() {
            Ok(parsed) => parsed,
            Err(err) => {
                warn!(
                    room_id = ?room_info.room_id,
                    "Couldn't deserialize stripped state event: {err:?}",
                );
                continue;
            }
        };

        room_info.handle_stripped_state_event(&parsed);
        stripped_by_type
            .entry(parsed.event_type())
            .or_insert_with(BTreeMap::new)
            .insert(parsed.state_key().to_owned(), raw.clone());
    }

    changes.stripped_state.insert(room_info.room_id().to_owned(), stripped_by_type.clone());

    // The push context is only built after the stripped state has been
    // recorded in `changes`, so it can take that state into account.
    let Some(push_context) = self.get_push_room_context(room, room_info, changes).await? else {
        return Ok(());
    };

    for raw in stripped_by_type.values().flat_map(|by_state_key| by_state_key.values()) {
        let actions = push_rules.get_actions(raw, &push_context);
        if !actions.iter().any(Action::should_notify) {
            continue;
        }

        notifications.entry(room.room_id().to_owned()).or_default().push(Notification {
            actions: actions.to_owned(),
            event: RawAnySyncOrStrippedTimelineEvent::Stripped(raw.clone()),
        });
    }

    Ok(())
}
/// Processes the state events of one room from a sync update.
///
/// `raw_events` and `events` must be parallel slices: entry `i` of one is
/// the raw form of entry `i` of the other. Every event is applied to
/// `room_info` and recorded, by event type and state key, in
/// `changes.state` for this room. Member events additionally update the
/// ambiguity cache and the profile changes.
///
/// Returns the IDs of the users whose membership is join or invite.
///
/// # Panics
///
/// Panics if the two slices differ in length.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn handle_state(
    &self,
    raw_events: &[Raw<AnySyncStateEvent>],
    events: &[AnySyncStateEvent],
    room_info: &mut RoomInfo,
    changes: &mut StateChanges,
    ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
    assert_eq!(raw_events.len(), events.len());

    let mut active_user_ids = BTreeSet::new();
    let mut raw_by_type = BTreeMap::new();

    for (raw, parsed) in raw_events.iter().zip(events) {
        room_info.handle_state_event(parsed);

        if let AnySyncStateEvent::RoomMember(member) = parsed {
            ambiguity_cache.handle_event(changes, &room_info.room_id, member).await?;

            if matches!(member.membership(), MembershipState::Join | MembershipState::Invite) {
                active_user_ids.insert(member.state_key().to_owned());
            }

            handle_room_member_event_for_profiles(&room_info.room_id, member, changes);
        }

        raw_by_type
            .entry(parsed.event_type())
            .or_insert_with(BTreeMap::new)
            .insert(parsed.state_key().to_owned(), raw.clone());
    }

    let room_id = (*room_info.room_id).to_owned();
    changes.state.insert(room_id, raw_by_type);

    Ok(active_user_ids)
}
/// Processes the account data events of one room.
///
/// Every event that deserializes is recorded in `changes`. Two kinds also
/// update the room info:
/// * marked-unread events set the unread flag and, when the flag actually
///   changes, add `UNREAD_MARKER` to `room_info_notable_updates`;
/// * tag events are passed to `handle_notable_tags`.
///
/// Events that fail to deserialize are logged and skipped.
#[instrument(skip_all, fields(?room_id))]
pub(crate) async fn handle_room_account_data(
    &self,
    room_id: &RoomId,
    events: &[Raw<AnyRoomAccountDataEvent>],
    changes: &mut StateChanges,
    room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
) {
    // Applies `update` to the room info already pending in `changes`, or
    // else to a copy of the stored room's info, which is then added to
    // `changes`. Does nothing when the room is found in neither place.
    fn with_room_info<F>(
        client: &BaseClient,
        room_id: &RoomId,
        changes: &mut StateChanges,
        mut update: F,
    ) where
        F: FnMut(&mut RoomInfo),
    {
        if let Some(pending) = changes.room_infos.get_mut(room_id) {
            update(pending);
            return;
        }

        if let Some(room) = client.store.room(room_id) {
            let mut info = room.clone_info();
            update(&mut info);
            changes.add_room(info);
        }
    }

    for raw in events {
        let parsed = match raw.deserialize() {
            Ok(parsed) => parsed,
            Err(err) => {
                warn!("unable to deserialize account data event: {err}");
                continue;
            }
        };

        changes.add_room_account_data(room_id, parsed.clone(), raw.clone());

        match parsed {
            AnyRoomAccountDataEvent::MarkedUnread(marked) => {
                let unread = marked.content.unread;
                with_room_info(self, room_id, changes, |info| {
                    if info.base_info.is_marked_unread != unread {
                        room_info_notable_updates
                            .entry(room_id.to_owned())
                            .or_default()
                            .insert(RoomInfoNotableUpdateReasons::UNREAD_MARKER);
                    }
                    info.base_info.is_marked_unread = unread;
                });
            }
            AnyRoomAccountDataEvent::Tag(tagged) => {
                with_room_info(self, room_id, changes, |info| {
                    info.base_info.handle_notable_tags(&tagged.content.tags);
                });
            }
            _ => {}
        }
    }
}
/// Processes global account data events.
///
/// Every event that deserializes is stored, keyed by its type, and the
/// resulting map replaces `changes.account_data`. Direct-message events
/// additionally update the DM targets of rooms: rooms whose targets
/// changed get the new set, and rooms that used to have targets but are no
/// longer listed get their targets cleared.
///
/// Events that fail to deserialize are logged and skipped.
#[instrument(skip_all)]
pub(crate) async fn handle_account_data(
    &self,
    events: &[Raw<AnyGlobalAccountDataEvent>],
    changes: &mut StateChanges,
) {
    let mut raw_by_type = BTreeMap::new();

    for raw_event in events {
        let event = match raw_event.deserialize() {
            Ok(event) => event,
            Err(e) => {
                let event_type: Option<String> = raw_event.get_field("type").ok().flatten();
                warn!(event_type, "Failed to deserialize a global account data event: {e}");
                continue;
            }
        };

        if let AnyGlobalAccountDataEvent::Direct(direct) = &event {
            // The event maps users to rooms; invert it to rooms -> users.
            let mut new_dms = HashMap::<&RoomId, HashSet<OwnedUserId>>::new();
            for (user_id, room_ids) in direct.content.iter() {
                for room_id in room_ids {
                    new_dms.entry(room_id).or_default().insert(user_id.clone());
                }
            }

            // Rooms that currently have at least one direct target.
            let known_rooms = self.store.rooms();
            let mut old_dms: HashMap<_, _> = known_rooms
                .iter()
                .filter_map(|room| {
                    let targets = room.direct_targets();
                    if targets.is_empty() {
                        None
                    } else {
                        Some((room.room_id(), targets))
                    }
                })
                .collect();

            for (room_id, new_direct_targets) in new_dms {
                // Removing the entry here leaves only the rooms that are no
                // longer direct in `old_dms` once the loop is done.
                let unchanged = matches!(
                    old_dms.remove(&room_id),
                    Some(previous) if previous == new_direct_targets
                );
                if unchanged {
                    continue;
                }

                trace!(
                    ?room_id, targets = ?new_direct_targets,
                    "Marking room as direct room"
                );

                match changes.room_infos.get_mut(room_id) {
                    Some(pending) => pending.base_info.dm_targets = new_direct_targets,
                    None => {
                        if let Some(room) = self.store.room(room_id) {
                            let mut info = room.clone_info();
                            info.base_info.dm_targets = new_direct_targets;
                            changes.add_room(info);
                        }
                    }
                }
            }

            for &room_id in old_dms.keys() {
                trace!(?room_id, "Unmarking room as direct room");

                match changes.room_infos.get_mut(room_id) {
                    Some(pending) => pending.base_info.dm_targets.clear(),
                    None => {
                        if let Some(room) = self.store.room(room_id) {
                            let mut info = room.clone_info();
                            info.base_info.dm_targets.clear();
                            changes.add_room(info);
                        }
                    }
                }
            }
        }

        raw_by_type.insert(event.event_type(), raw_event.clone());
    }

    changes.account_data = raw_by_type;
}
/// Hands the encryption-related parts of a sync response to the Olm
/// machine and returns the resulting to-device events.
///
/// With the `experimental-sliding-sync` feature, every room for which a
/// room key update was reported gets another attempt at decrypting its
/// latest events; without it, `changes` and `room_info_notable_updates`
/// are unused (hence the underscore-prefixed parameter names).
///
/// When no Olm machine is available, the to-device events are returned
/// as they were passed in.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
pub(crate) async fn preprocess_to_device_events(
&self,
encryption_sync_changes: EncryptionSyncChanges<'_>,
#[cfg(feature = "experimental-sliding-sync")] changes: &mut StateChanges,
#[cfg(not(feature = "experimental-sliding-sync"))] _changes: &mut StateChanges,
#[cfg(feature = "experimental-sliding-sync")] room_info_notable_updates: &mut BTreeMap<
OwnedRoomId,
RoomInfoNotableUpdateReasons,
>,
#[cfg(not(feature = "experimental-sliding-sync"))]
_room_info_notable_updates: &mut BTreeMap<
OwnedRoomId,
RoomInfoNotableUpdateReasons,
>,
) -> Result<Vec<Raw<ruma::events::AnyToDeviceEvent>>> {
if let Some(o) = self.olm_machine().await.as_ref() {
let (events, room_key_updates) =
o.receive_sync_changes(encryption_sync_changes).await?;
#[cfg(feature = "experimental-sliding-sync")]
for room_key_update in room_key_updates {
if let Some(room) = self.get_room(&room_key_update.room_id) {
self.decrypt_latest_events(&room, changes, room_info_notable_updates).await;
}
}
// The `cfg` attribute below applies to the `drop` statement only; it
// consumes `room_key_updates` when the loop above is compiled out.
#[cfg(not(feature = "experimental-sliding-sync"))]
drop(room_key_updates); Ok(events)
} else {
Ok(encryption_sync_changes.to_device_events)
}
}
/// Tries to decrypt one of the room's latest encrypted events and, on
/// success, reports it to the room via `on_latest_event_decrypted`.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
async fn decrypt_latest_events(
    &self,
    room: &Room,
    changes: &mut StateChanges,
    room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
) {
    let Some((latest, index)) = self.decrypt_latest_suitable_event(room).await else {
        return;
    };

    room.on_latest_event_decrypted(latest, index, changes, room_info_notable_updates);
}
/// Walks the room's latest encrypted events from last to first and returns
/// the first one that decrypts into an event suitable as the latest event
/// (room message, poll, call invite, call notify or sticker), together
/// with its index in that list.
///
/// Returns `None` when no such event is found.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
async fn decrypt_latest_suitable_event(
    &self,
    room: &Room,
) -> Option<(Box<LatestEvent>, usize)> {
    let enc_events = room.latest_encrypted_events();

    for (index, encrypted) in enc_events.iter().enumerate().rev() {
        let attempt = Box::pin(self.decrypt_sync_room_event(encrypted, room.room_id()));
        let Ok(Some(decrypted)) = attempt.await else {
            continue;
        };
        let Ok(any_sync_event) = decrypted.event.deserialize() else {
            continue;
        };

        let suitable = matches!(
            is_suitable_for_latest_event(&any_sync_event),
            PossibleLatestEvent::YesRoomMessage(_)
                | PossibleLatestEvent::YesPoll(_)
                | PossibleLatestEvent::YesCallInvite(_)
                | PossibleLatestEvent::YesCallNotify(_)
                | PossibleLatestEvent::YesSticker(_)
        );
        if suitable {
            return Some((Box::new(LatestEvent::new(decrypted)), index));
        }
    }

    None
}
/// Records that the user joined the given room and returns the room.
///
/// If the room is not already in the joined state, its info is marked as
/// joined, partially synced and with members missing; the change is saved
/// to the store under the sync lock and then applied to the room with the
/// `MEMBERSHIP` update reason.
///
/// # Errors
///
/// Returns an error if saving the change to the store fails.
pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
    let notable_update_tx = self.room_info_notable_update_sender.clone();
    let room = self.store.get_or_create_room(room_id, RoomState::Joined, notable_update_tx);

    if room.state() == RoomState::Joined {
        return Ok(room);
    }

    let _sync_lock = self.sync_lock().lock().await;

    let mut joined_info = room.clone_info();
    joined_info.mark_as_joined();
    joined_info.mark_state_partially_synced();
    joined_info.mark_members_missing();

    let mut changes = StateChanges::default();
    changes.add_room(joined_info.clone());
    // Persist first; the in-memory room is only updated once that worked.
    self.store.save_changes(&changes).await?;
    room.set_room_info(joined_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);

    Ok(room)
}
/// Records that the user left the given room.
///
/// If the room is not already in the left state, its info is marked as
/// left, partially synced and with members missing; the change is saved
/// to the store under the sync lock and then applied to the room with the
/// `MEMBERSHIP` update reason.
///
/// # Errors
///
/// Returns an error if saving the change to the store fails.
pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
    let notable_update_tx = self.room_info_notable_update_sender.clone();
    let room = self.store.get_or_create_room(room_id, RoomState::Left, notable_update_tx);

    if room.state() == RoomState::Left {
        return Ok(());
    }

    let _sync_lock = self.sync_lock().lock().await;

    let mut left_info = room.clone_info();
    left_info.mark_as_left();
    left_info.mark_state_partially_synced();
    left_info.mark_members_missing();

    let mut changes = StateChanges::default();
    changes.add_room(left_info.clone());
    // Persist first; the in-memory room is only updated once that worked.
    self.store.save_changes(&changes).await?;
    room.set_room_info(left_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);

    Ok(())
}
/// Returns the store's sync lock.
///
/// Within this file the lock is held while changes are saved to the store
/// and applied to the in-memory rooms.
pub fn sync_lock(&self) -> &Mutex<()> {
self.store.sync_lock()
}
/// Processes a `/sync` response: updates joined, left and invited rooms,
/// account data and presence, saves everything to the store and returns
/// a [`SyncResponse`] describing what changed.
///
/// A response whose `next_batch` equals the stored sync token is treated
/// as a duplicate and yields an empty, default `SyncResponse`.
///
/// # Errors
///
/// Returns an error if to-device preprocessing, state or timeline
/// handling, user tracking or saving the changes fails.
#[instrument(skip_all)]
pub async fn receive_sync_response(
&self,
response: api::sync::sync_events::v3::Response,
) -> Result<SyncResponse> {
if self.store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
info!("Got the same sync response twice");
return Ok(SyncResponse::default());
}
let now = Instant::now();
let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));
#[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
let mut room_info_notable_updates =
BTreeMap::<OwnedRoomId, RoomInfoNotableUpdateReasons>::new();
// With encryption enabled, the crypto machinery sees the to-device events
// and key counts before any room event is processed.
#[cfg(feature = "e2e-encryption")]
let to_device = self
.preprocess_to_device_events(
EncryptionSyncChanges {
to_device_events: response.to_device.events,
changed_devices: &response.device_lists,
one_time_keys_counts: &response.device_one_time_keys_count,
unused_fallback_keys: response.device_unused_fallback_key_types.as_deref(),
next_batch_token: Some(response.next_batch.clone()),
},
&mut changes,
&mut room_info_notable_updates,
)
.await?;
#[cfg(not(feature = "e2e-encryption"))]
let to_device = response.to_device.events;
let mut ambiguity_cache = AmbiguityCache::new(self.store.inner.clone());
// First pass over the global account data, before the push rules are read
// from `changes` just below.
self.handle_account_data(&response.account_data.events, &mut changes).await;
let push_rules = self.get_push_rules(&changes).await?;
let mut new_rooms = RoomUpdates::default();
let mut notifications = Default::default();
// Joined rooms.
for (room_id, new_info) in response.rooms.join {
let room = self.store.get_or_create_room(
&room_id,
RoomState::Joined,
self.room_info_notable_update_sender.clone(),
);
let mut room_info = room.clone_info();
room_info.mark_as_joined();
room_info.update_from_ruma_summary(&new_info.summary);
room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
room_info.mark_state_fully_synced();
let state_events = Self::deserialize_state_events(&new_info.state.events);
let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
state_events.into_iter().unzip();
let mut user_ids = self
.handle_state(
&raw_state_events,
&state_events,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
)
.await?;
// Of the ephemeral events, only receipts are recorded in the changes.
for raw in &new_info.ephemeral.events {
match raw.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
changes.add_receipts(&room_id, event.content);
}
Ok(_) => {}
Err(e) => {
let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
#[rustfmt::skip]
info!(
?room_id, event_id,
"Failed to deserialize ephemeral room event: {e}"
);
}
}
}
if new_info.timeline.limited {
room_info.mark_members_missing();
}
let timeline = self
.handle_timeline(
&room,
new_info.timeline.limited,
new_info.timeline.events,
new_info.timeline.prev_batch,
&push_rules,
&mut user_ids,
&mut room_info,
&mut changes,
&mut notifications,
&mut ambiguity_cache,
)
.await?;
changes.add_room(room_info);
// NOTE(review): the notable-update map passed here is a temporary, so any
// reasons recorded by this call are dropped — confirm this is intended.
self.handle_room_account_data(
&room_id,
&new_info.account_data.events,
&mut changes,
&mut Default::default(),
)
.await;
// Re-read the room info, which the account data handling may have
// modified. The `unwrap` relies on `add_room` above having stored it.
let mut room_info = changes.room_infos.get(&room_id).unwrap().clone();
#[cfg(feature = "e2e-encryption")]
if room_info.is_encrypted() {
if let Some(o) = self.olm_machine().await.as_ref() {
// `room` still reflects the state from before this sync: if it was not
// encrypted then, also track every active member found in the store.
// This `user_ids` shadows the outer one inside this block only.
if !room.is_encrypted() {
let user_ids =
self.store.get_user_ids(&room_id, RoomMemberships::ACTIVE).await?;
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
}
o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
}
}
let notification_count = new_info.unread_notifications.into();
room_info.update_notification_count(notification_count);
let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
new_rooms.join.insert(
room_id,
JoinedRoomUpdate::new(
timeline,
new_info.state.events,
new_info.account_data.events,
new_info.ephemeral.events,
notification_count,
ambiguity_changes,
),
);
changes.add_room(room_info);
}
// Left rooms.
for (room_id, new_info) in response.rooms.leave {
let room = self.store.get_or_create_room(
&room_id,
RoomState::Left,
self.room_info_notable_update_sender.clone(),
);
let mut room_info = room.clone_info();
room_info.mark_as_left();
room_info.mark_state_partially_synced();
let state_events = Self::deserialize_state_events(&new_info.state.events);
let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
state_events.into_iter().unzip();
let mut user_ids = self
.handle_state(
&raw_state_events,
&state_events,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
)
.await?;
let timeline = self
.handle_timeline(
&room,
new_info.timeline.limited,
new_info.timeline.events,
new_info.timeline.prev_batch,
&push_rules,
&mut user_ids,
&mut room_info,
&mut changes,
&mut notifications,
&mut ambiguity_cache,
)
.await?;
changes.add_room(room_info);
self.handle_room_account_data(
&room_id,
&new_info.account_data.events,
&mut changes,
&mut Default::default(),
)
.await;
let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
new_rooms.leave.insert(
room_id,
LeftRoomUpdate::new(
timeline,
new_info.state.events,
new_info.account_data.events,
ambiguity_changes,
),
);
}
// Invited rooms.
for (room_id, new_info) in response.rooms.invite {
let room = self.store.get_or_create_room(
&room_id,
RoomState::Invited,
self.room_info_notable_update_sender.clone(),
);
let mut room_info = room.clone_info();
room_info.mark_as_invited();
room_info.mark_state_fully_synced();
self.handle_invited_state(
&room,
&new_info.invite_state.events,
&push_rules,
&mut room_info,
&mut changes,
&mut notifications,
)
.await?;
changes.add_room(room_info);
new_rooms.invite.insert(room_id, new_info);
}
// Second pass over the same global account data, now that the rooms of
// this response exist. NOTE(review): presumably needed so that direct
// message handling can find those rooms — confirm.
self.handle_account_data(&response.account_data.events, &mut changes).await;
// Presence events that fail to deserialize are dropped silently.
changes.presence = response
.presence
.events
.iter()
.filter_map(|e| {
let event = e.deserialize().ok()?;
Some((event.sender, e.clone()))
})
.collect();
changes.ambiguity_maps = ambiguity_cache.cache;
// Save, advance the sync token and update the in-memory rooms while
// holding the sync lock.
{
let _sync_lock = self.sync_lock().lock().await;
self.store.save_changes(&changes).await?;
*self.store.sync_token.write().await = Some(response.next_batch.clone());
self.apply_changes(&changes, room_info_notable_updates);
}
new_rooms.update_in_memory_caches(&self.store).await;
info!("Processed a sync response in {:?}", now.elapsed());
let response = SyncResponse {
rooms: new_rooms,
presence: response.presence.events,
account_data: response.account_data.events,
to_device,
notifications,
};
Ok(response)
}
pub(crate) fn apply_changes(
&self,
changes: &StateChanges,
room_info_notable_updates: BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
) {
if changes.account_data.contains_key(&GlobalAccountDataEventType::IgnoredUserList) {
if let Some(event) =
changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
{
match event.deserialize_as::<IgnoredUserListEvent>() {
Ok(event) => {
let user_ids: Vec<String> =
event.content.ignored_users.keys().map(|id| id.to_string()).collect();
self.ignore_user_list_changes.set(user_ids);
}
Err(error) => {
warn!("Failed to deserialize ignored user list event: {error}")
}
}
}
}
for (room_id, room_info) in &changes.room_infos {
if let Some(room) = self.store.room(room_id) {
let room_info_notable_update_reasons =
room_info_notable_updates.get(room_id).copied().unwrap_or_default();
room.set_room_info(room_info.clone(), room_info_notable_update_reasons)
}
}
}
/// Processes the response of a request for all member events of a room.
///
/// Each member event that deserializes is recorded in the state changes
/// and the profile changes, and contributes to a fresh display-name
/// ambiguity map for the room. With `e2e-encryption`, joined and invited
/// users of an encrypted room are handed to the Olm machine for tracking.
/// Finally the room is marked as having its members synced, and the
/// changes are saved and applied under the sync lock.
///
/// Member events that fail to deserialize are logged and skipped. If the
/// room is unknown to the store, nothing is done and `Ok(())` is returned.
///
/// # Errors
///
/// Returns `Error::InvalidReceiveMembersParameters` if the request used
/// the `membership`, `not_membership` or `at` filters, and propagates
/// errors from user tracking and from saving the changes.
#[instrument(skip_all, fields(?room_id))]
pub async fn receive_all_members(
    &self,
    room_id: &RoomId,
    request: &api::membership::get_member_events::v3::Request,
    response: &api::membership::get_member_events::v3::Response,
) -> Result<()> {
    // The ambiguity map built below replaces the room's map wholesale, so a
    // filtered (partial) member list must be rejected.
    if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
    {
        return Err(Error::InvalidReceiveMembersParameters);
    }

    let Some(room) = self.store.room(room_id) else {
        return Ok(());
    };

    let mut changes = StateChanges::default();

    #[cfg(feature = "e2e-encryption")]
    let mut user_ids = BTreeSet::new();

    let mut ambiguity_map: BTreeMap<String, BTreeSet<OwnedUserId>> = BTreeMap::new();

    for raw_event in &response.chunk {
        let member = match raw_event.deserialize() {
            Ok(ev) => ev,
            Err(e) => {
                let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
                debug!(event_id, "Failed to deserialize member event: {e}");
                continue;
            }
        };

        #[cfg(feature = "e2e-encryption")]
        match member.membership() {
            MembershipState::Join | MembershipState::Invite => {
                user_ids.insert(member.state_key().to_owned());
            }
            _ => (),
        }

        if let StateEvent::Original(e) = &member {
            if let Some(d) = &e.content.displayname {
                ambiguity_map.entry(d.clone()).or_default().insert(member.state_key().clone());
            }
        }

        // Take what is still needed from the event, then convert it by value:
        // this avoids cloning the whole event, and the deserialized events
        // are no longer collected into a vector that was never read.
        let event_type = member.event_type();
        let state_key = member.state_key().to_string();

        let sync_member: SyncRoomMemberEvent = member.into();
        handle_room_member_event_for_profiles(room_id, &sync_member, &mut changes);

        changes
            .state
            .entry(room_id.to_owned())
            .or_default()
            .entry(event_type)
            .or_default()
            .insert(state_key, raw_event.clone().cast());
    }

    #[cfg(feature = "e2e-encryption")]
    if room.is_encrypted() {
        if let Some(o) = self.olm_machine().await.as_ref() {
            o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
        }
    }

    changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);

    let _sync_lock = self.sync_lock().lock().await;
    let mut room_info = room.clone_info();
    room_info.mark_members_synced();
    changes.add_room(room_info);

    self.store.save_changes(&changes).await?;
    self.apply_changes(&changes, Default::default());

    Ok(())
}
/// Stores the filter ID from a filter upload response in the store's
/// key/value data, under `filter_name`.
///
/// # Errors
///
/// Returns an error if the store fails to save the value.
pub async fn receive_filter_upload(
    &self,
    filter_name: &str,
    response: &api::filter::create_filter::v3::Response,
) -> Result<()> {
    let key = StateStoreDataKey::Filter(filter_name);
    let value = StateStoreDataValue::Filter(response.filter_id.clone());

    self.store.set_kv_data(key, value).await?;
    Ok(())
}
/// Looks up the filter ID stored under `filter_name`, if any.
///
/// # Panics
///
/// Panics if the value stored under the filter key is not a filter.
pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
    let key = StateStoreDataKey::Filter(filter_name);
    let stored = self.store.get_kv_data(key).await?;

    Ok(stored.map(|value| value.into_filter().expect("State store data not a filter")))
}
/// Builds the to-device requests needed to share a room key with the
/// members of the given room.
///
/// When the room's history visibility is `Joined` (also the fallback for
/// a room unknown to the store), only joined members receive the key;
/// otherwise all active members do.
///
/// # Errors
///
/// * `OlmError::MissingSession` if no Olm machine is available. This used
///   to be a panic; other methods of this type already treat a missing
///   machine as non-fatal.
/// * `Error::EncryptionNotEnabled` if the room has no encryption settings.
/// * Any error from the store or from the Olm machine.
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
    let olm = self.olm_machine().await;
    let Some(o) = olm.as_ref() else {
        return Err(Error::OlmError(OlmError::MissingSession));
    };

    let (history_visibility, settings) = self
        .get_room(room_id)
        .map(|r| (r.history_visibility(), r.encryption_settings()))
        .unwrap_or((HistoryVisibility::Joined, None));

    // With `Joined` visibility, members that are not joined are left out.
    let filter = if history_visibility == HistoryVisibility::Joined {
        RoomMemberships::JOIN
    } else {
        RoomMemberships::ACTIVE
    };

    let members = self.store.get_user_ids(room_id, filter).await?;

    let settings = settings.ok_or(Error::EncryptionNotEnabled)?;
    let settings = EncryptionSettings::new(
        settings,
        history_visibility,
        self.room_key_recipient_strategy.clone(),
    );

    Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
}
/// Looks up the room with the given ID in the store.
pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
self.store.room(room_id)
}
/// Acquires a read guard on the optional Olm machine.
///
/// The guarded value is `None` until `regenerate_olm` has stored a machine.
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
self.olm_machine.read().await
}
/// Returns the push rules to use, taken from the first source that has
/// them:
///
/// 1. the push rules account data event in `changes`,
/// 2. the push rules account data event in the store,
/// 3. the server-default rules for the logged-in user,
/// 4. an empty ruleset when nobody is logged in.
///
/// An event that is present but fails to deserialize is treated as absent.
///
/// # Errors
///
/// Returns an error if reading from the store fails.
pub async fn get_push_rules(&self, changes: &StateChanges) -> Result<Ruleset> {
    let from_changes = changes
        .account_data
        .get(&GlobalAccountDataEventType::PushRules)
        .and_then(|raw| raw.deserialize_as::<PushRulesEvent>().ok());
    if let Some(event) = from_changes {
        return Ok(event.content.global);
    }

    let from_store = self
        .store
        .get_account_data_event_static::<PushRulesEventContent>()
        .await?
        .and_then(|raw| raw.deserialize().ok());
    if let Some(event) = from_store {
        return Ok(event.content.global);
    }

    let fallback = match self.store.session_meta() {
        Some(meta) => Ruleset::server_default(&meta.user_id),
        None => Ruleset::new(),
    };
    Ok(fallback)
}
/// Builds the room context needed to evaluate push rules.
///
/// The user's own display name and the room's power levels are looked up
/// in `changes` first (regular state, then stripped state) and only then
/// in the room / store.
///
/// Returns `Ok(None)` when no member information for the own user can be
/// found in any of those places.
///
/// # Errors
///
/// Returns an error if loading the own member or the power levels from
/// the store fails.
pub async fn get_push_room_context(
&self,
room: &Room,
room_info: &RoomInfo,
changes: &StateChanges,
) -> Result<Option<PushConditionRoomCtx>> {
let room_id = room.room_id();
let user_id = room.own_user_id();
let member_count = room_info.active_members_count();
// Own display name: pending state event, then pending stripped state
// event, then the member known to the room. Where the event carries no
// display name, the localpart of the user ID is used instead.
let user_display_name = if let Some(AnySyncStateEvent::RoomMember(member)) =
changes.state.get(room_id).and_then(|events| {
events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
}) {
member
.as_original()
.and_then(|ev| ev.content.displayname.clone())
.unwrap_or_else(|| user_id.localpart().to_owned())
} else if let Some(AnyStrippedStateEvent::RoomMember(member)) =
changes.stripped_state.get(room_id).and_then(|events| {
events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
})
{
member.content.displayname.unwrap_or_else(|| user_id.localpart().to_owned())
} else if let Some(member) = Box::pin(room.get_member(user_id)).await? {
member.name().to_owned()
} else {
trace!("Couldn't get push context because of missing own member information");
return Ok(None);
};
// Power levels are looked up under the empty state key, in the same
// order: pending state, pending stripped state, then the store. They
// stay `None` when not found anywhere.
let power_levels = if let Some(event) = changes.state.get(room_id).and_then(|types| {
types
.get(&StateEventType::RoomPowerLevels)?
.get("")?
.deserialize_as::<RoomPowerLevelsEvent>()
.ok()
}) {
Some(event.power_levels().into())
} else if let Some(event) = changes.stripped_state.get(room_id).and_then(|types| {
types
.get(&StateEventType::RoomPowerLevels)?
.get("")?
.deserialize_as::<StrippedRoomPowerLevelsEvent>()
.ok()
}) {
Some(event.power_levels().into())
} else {
self.store
.get_state_event_static::<RoomPowerLevelsEventContent>(room_id)
.await?
.and_then(|e| e.deserialize().ok())
.map(|event| event.power_levels().into())
};
Ok(Some(PushConditionRoomCtx {
user_id: user_id.to_owned(),
room_id: room_id.to_owned(),
// Falls back to `UInt::MAX` when the count cannot be represented.
member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
user_display_name,
power_levels,
}))
}
pub fn update_push_room_context(
&self,
push_rules: &mut PushConditionRoomCtx,
user_id: &UserId,
room_info: &RoomInfo,
changes: &StateChanges,
) {
let room_id = &*room_info.room_id;
push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap_or(UInt::MAX);
if let Some(AnySyncStateEvent::RoomMember(member)) =
changes.state.get(room_id).and_then(|events| {
events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
})
{
push_rules.user_display_name = member
.as_original()
.and_then(|ev| ev.content.displayname.clone())
.unwrap_or_else(|| user_id.localpart().to_owned())
}
if let Some(AnySyncStateEvent::RoomPowerLevels(event)) =
changes.state.get(room_id).and_then(|types| {
types.get(&StateEventType::RoomPowerLevels)?.get("")?.deserialize().ok()
})
{
push_rules.power_levels = Some(event.power_levels().into());
}
}
/// Subscribe to the client's `ignore_user_list_changes` observable.
///
/// The subscriber yields `Vec<String>` values.
/// NOTE(review): presumably the ids of the currently ignored users — confirm
/// against the code that updates the observable.
pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
self.ignore_user_list_changes.subscribe()
}
/// Deserialize a slice of raw state events, pairing every successfully
/// parsed event with a clone of its raw form.
///
/// Events that fail to deserialize are logged with a warning and left out of
/// the result; the relative order of the remaining events is preserved.
pub(crate) fn deserialize_state_events(
    raw_events: &[Raw<AnySyncStateEvent>],
) -> Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> {
    let mut parsed = Vec::new();

    for raw in raw_events {
        match raw.deserialize() {
            Ok(event) => parsed.push((raw.clone(), event)),
            Err(e) => {
                warn!("Couldn't deserialize state event: {e}");
            }
        }
    }

    parsed
}
/// Create a new receiver on the client's `room_info_notable_update_sender`
/// broadcast channel.
///
/// Each call returns an independent receiver of [`RoomInfoNotableUpdate`]
/// values.
pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
self.room_info_notable_update_sender.subscribe()
}
}
/// The default client is the one produced by [`BaseClient::new`].
impl Default for BaseClient {
// Delegates to `BaseClient::new()` so both constructors stay in sync.
fn default() -> Self {
Self::new()
}
}
/// Record the profile-related consequences of a room member event in
/// `changes`.
///
/// * When the event's state key equals its sender, the event is stored in
///   `changes.profiles` for the room, keyed by the sender.
/// * When the event's membership is `Invite`, the user named by the state key
///   is queued in `changes.profiles_to_delete` for the room.
///
/// Both rules are evaluated independently, so a single event may trigger
/// both.
fn handle_room_member_event_for_profiles(
    room_id: &RoomId,
    event: &SyncStateEvent<RoomMemberEventContent>,
    changes: &mut StateChanges,
) {
    let sender = event.sender();
    let target = event.state_key();

    let is_own_event = target == sender;
    if is_own_event {
        let room_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
        room_profiles.insert(sender.to_owned(), event.into());
    }

    let is_invite = *event.membership() == MembershipState::Invite;
    if is_invite {
        let pending_deletions = changes.profiles_to_delete.entry(room_id.to_owned()).or_default();
        pending_deletions.push(target.clone());
    }
}
#[cfg(test)]
mod tests {
use matrix_sdk_test::{
async_test, ruma_response_from_json, sync_timeline_event, InvitedRoomBuilder,
LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder,
};
use ruma::{api::client as api, room_id, serde::Raw, user_id, UserId};
use serde_json::{json, value::to_raw_value};
use super::BaseClient;
use crate::{
store::StateStoreExt, test_utils::logged_in_base_client, DisplayName, RoomState,
SessionMeta,
};
// A room that shows up in the `leave` section of one sync and in the `invite`
// section of a later sync must end up in the `Invited` state.
#[async_test]
async fn test_invite_after_leaving() {
    let user_id = user_id!("@alice:example.org");
    let room_id = room_id!("!test:example.org");
    let client = logged_in_base_client(Some(user_id)).await;
    let mut sync_builder = SyncResponseBuilder::new();

    // First sync: the room is reported as left, with the user's own leave
    // event in the timeline. `leave` is the membership value defined by the
    // Matrix specification; `left` is not a valid membership.
    let response = sync_builder
        .add_left_room(LeftRoomBuilder::new(room_id).add_timeline_event(sync_timeline_event!({
            "content": {
                "displayname": "Alice",
                "membership": "leave",
            },
            "event_id": "$994173582443PhrSn:example.org",
            "origin_server_ts": 1432135524678u64,
            "sender": user_id,
            "state_key": user_id,
            "type": "m.room.member",
        })))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();
    assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);

    // Second sync: the same room is now reported as an invite.
    let response = sync_builder
        .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
            StrippedStateTestEvent::Custom(json!({
                "content": {
                    "displayname": "Alice",
                    "membership": "invite",
                },
                "event_id": "$143273582443PhrSn:example.org",
                "origin_server_ts": 1432735824653u64,
                "sender": "@example:example.org",
                "state_key": user_id,
                "type": "m.room.member",
            })),
        ))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();
    assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
}
// An invited room whose stripped state contains the inviter's member event
// must get its display name calculated from that member ("Kyra").
#[async_test]
async fn test_invite_displayname() {
let user_id = user_id!("@alice:example.org");
let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
let client = logged_in_base_client(Some(user_id)).await;
// Sync response with a single invited room. Its invite state carries the
// create, join rules, encryption and two member events (the inviter's join
// and the invite for the own user); no `m.room.name` event is included.
let response = ruma_response_from_json(&json!({
"next_batch": "asdkl;fjasdkl;fj;asdkl;f",
"device_one_time_keys_count": {
"signed_curve25519": 50u64
},
"device_unused_fallback_key_types": [
"signed_curve25519"
],
"rooms": {
"invite": {
"!ithpyNKDtmhneaTQja:example.org": {
"invite_state": {
"events": [
{
"content": {
"creator": "@test:example.org",
"room_version": "9"
},
"sender": "@test:example.org",
"state_key": "",
"type": "m.room.create"
},
{
"content": {
"join_rule": "invite"
},
"sender": "@test:example.org",
"state_key": "",
"type": "m.room.join_rules"
},
{
"content": {
"algorithm": "m.megolm.v1.aes-sha2"
},
"sender": "@test:example.org",
"state_key": "",
"type": "m.room.encryption"
},
{
"content": {
"avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
"displayname": "Kyra",
"membership": "join"
},
"sender": "@test:example.org",
"state_key": "@test:example.org",
"type": "m.room.member"
},
{
"content": {
"avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
"displayname": "alice",
"is_direct": true,
"membership": "invite"
},
"origin_server_ts": 1650878657984u64,
"sender": "@test:example.org",
"state_key": "@alice:example.org",
"type": "m.room.member",
"unsigned": {
"age": 14u64
},
"event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
}
]
}
}
}
}
}));
client.receive_sync_response(response).await.unwrap();
let room = client.get_room(room_id).expect("Room not found");
// The room is known as an invite...
assert_eq!(room.state(), RoomState::Invited);
// ...and its computed name is the display name of the other member.
assert_eq!(
room.compute_display_name().await.expect("fetching display name failed"),
DisplayName::Calculated("Kyra".to_owned())
);
}
// Running `decrypt_latest_events` on a room without any latest encrypted
// events must leave the room, the state changes and the notable updates
// untouched.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
#[async_test]
async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
    use std::collections::BTreeMap;

    use crate::{rooms::normal::RoomInfoNotableUpdateReasons, StateChanges};

    // Given a freshly joined room with no latest (encrypted) events
    let own_user = user_id!("@u:u.to");
    let room_id = room_id!("!r:u.to");
    let client = logged_in_base_client(Some(own_user)).await;
    let room = process_room_join_test_helper(&client, room_id, "$1", own_user).await;
    assert!(room.latest_encrypted_events().is_empty());
    assert!(room.latest_event().is_none());

    // When we try to decrypt its latest events
    let mut state_changes = StateChanges::default();
    let mut notable_updates = BTreeMap::new();
    client.decrypt_latest_events(&room, &mut state_changes, &mut notable_updates).await;

    // Then nothing changed
    assert!(room.latest_encrypted_events().is_empty());
    assert!(room.latest_event().is_none());
    assert!(state_changes.room_infos.is_empty());

    let reasons = notable_updates.get(room_id).copied().unwrap_or_default();
    assert!(!reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
}
// Test helper: feed the client a sync response in which `user_id` joins
// `room_id` (the member event uses `event_id`), then return the resulting
// room.
#[cfg(feature = "e2e-encryption")]
async fn process_room_join_test_helper(
    client: &BaseClient,
    room_id: &ruma::RoomId,
    event_id: &str,
    user_id: &UserId,
) -> crate::Room {
    let join_event = sync_timeline_event!({
        "content": {
            "displayname": "Alice",
            "membership": "join",
        },
        "event_id": event_id,
        "origin_server_ts": 1432135524678u64,
        "sender": user_id,
        "state_key": user_id,
        "type": "m.room.member",
    });
    let joined_room =
        matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_timeline_event(join_event);

    let mut sync_builder = SyncResponseBuilder::new();
    let response = sync_builder.add_joined_room(joined_room).build_sync_response();
    client.receive_sync_response(response).await.unwrap();

    client.get_room(room_id).expect("Just-created room not found!")
}
// A malformed state event in a sync response must not prevent the valid
// events next to it from being processed and stored.
#[async_test]
async fn test_deserialization_failure() {
let user_id = user_id!("@alice:example.org");
let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
let client = BaseClient::new();
client
.set_session_meta(
SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
#[cfg(feature = "e2e-encryption")]
None,
)
.await
.unwrap();
// The joined room's state contains one object that is not a valid event,
// followed by a well-formed `m.room.name` event.
let response = ruma_response_from_json(&json!({
"next_batch": "asdkl;fjasdkl;fj;asdkl;f",
"rooms": {
"join": {
"!ithpyNKDtmhneaTQja:example.org": {
"state": {
"events": [
{
"invalid": "invalid",
},
{
"content": {
"name": "The room name"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653u64,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 1234
}
},
]
}
}
}
}
}));
// Processing the response succeeds despite the invalid event.
client.receive_sync_response(response).await.unwrap();
// The valid room name event is in the store and deserializes.
client
.store()
.get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
.await
.expect("Failed to fetch state event")
.expect("State event not found")
.deserialize()
.expect("Failed to deserialize state event");
}
// A member event with `invite` membership received through the member-events
// response must produce a retrievable member with its profile.
#[async_test]
async fn test_invited_members_arent_ignored() {
let user_id = user_id!("@alice:example.org");
let inviter_user_id = user_id!("@bob:example.org");
let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
let client = BaseClient::new();
client
.set_session_meta(
SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
#[cfg(feature = "e2e-encryption")]
None,
)
.await
.unwrap();
// Make the room known to the client through a sync without any events.
let mut sync_builder = SyncResponseBuilder::new();
let response = sync_builder
.add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();
// Feed a member-events response containing a single invite for `user_id`.
let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
let raw_member_event = json!({
"content": {
"avatar_url": "mxc://localhost/fewjilfewjil42",
"displayname": "Invited Alice",
"membership": "invite"
},
"event_id": "$151800140517rfvjc:localhost",
"origin_server_ts": 151800140,
"room_id": room_id,
"sender": inviter_user_id,
"state_key": user_id,
"type": "m.room.member",
"unsigned": {
"age": 13374242,
}
});
let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
to_raw_value(&raw_member_event).unwrap(),
)]);
client.receive_all_members(room_id, &request, &response).await.unwrap();
let room = client.get_room(room_id).unwrap();
// The invited member exists and exposes the profile from the invite event.
let member = room.get_member(user_id).await.expect("ok").expect("exists");
assert_eq!(member.user_id(), user_id);
assert_eq!(member.display_name().unwrap(), "Invited Alice");
assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
}
// A user whose `leave` event carried a null profile and who is later invited
// again must expose the profile from the invite event.
#[async_test]
async fn test_reinvited_members_get_a_display_name() {
let user_id = user_id!("@alice:example.org");
let inviter_user_id = user_id!("@bob:example.org");
let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
let client = BaseClient::new();
client
.set_session_meta(
SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
#[cfg(feature = "e2e-encryption")]
None,
)
.await
.unwrap();
// First, sync a `leave` member event for `user_id` whose display name and
// avatar are both null.
let mut sync_builder = SyncResponseBuilder::new();
let response = sync_builder
.add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
StateTestEvent::Custom(json!({
"content": {
"avatar_url": null,
"displayname": null,
"membership": "leave"
},
"event_id": "$151803140217rkvjc:localhost",
"origin_server_ts": 151800139,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
"type": "m.room.member",
})),
))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();
// Then feed a member-events response in which the same user is invited,
// this time with a display name and an avatar.
let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
let raw_member_event = json!({
"content": {
"avatar_url": "mxc://localhost/fewjilfewjil42",
"displayname": "Invited Alice",
"membership": "invite"
},
"event_id": "$151800140517rfvjc:localhost",
"origin_server_ts": 151800140,
"room_id": room_id,
"sender": inviter_user_id,
"state_key": user_id,
"type": "m.room.member",
"unsigned": {
"age": 13374242,
}
});
let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
to_raw_value(&raw_member_event).unwrap(),
)]);
client.receive_all_members(room_id, &request, &response).await.unwrap();
let room = client.get_room(room_id).unwrap();
// The member's profile comes from the invite, not from the empty leave event.
let member = room.get_member(user_id).await.expect("ok").expect("exists");
assert_eq!(member.user_id(), user_id);
assert_eq!(member.display_name().unwrap(), "Invited Alice");
assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
}
}