use std::sync::Arc;
use eyeball_im::{ObservableVectorTransaction, ObservableVectorTransactionEntry};
use indexmap::IndexMap;
use matrix_sdk::{
crypto::types::events::UtdCause, deserialized_responses::EncryptionInfo, send_queue::SendHandle,
};
use ruma::{
events::{
poll::{
unstable_end::UnstablePollEndEventContent,
unstable_response::UnstablePollResponseEventContent,
unstable_start::{
NewUnstablePollStartEventContent, NewUnstablePollStartEventContentWithoutRelation,
UnstablePollStartEventContent,
},
},
reaction::ReactionEventContent,
receipt::Receipt,
relation::Replacement,
room::{
member::RoomMemberEventContent,
message::{self, RoomMessageEventContent, RoomMessageEventContentWithoutRelation},
},
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
AnySyncTimelineEvent, BundledMessageLikeRelations, EventContent, FullStateEventContent,
MessageLikeEventType, StateEventType, SyncStateEvent,
},
html::RemoveReplyFallback,
serde::Raw,
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId,
};
use tracing::{debug, error, field::debug, info, instrument, trace, warn};
use super::{
controller::{TimelineMetadata, TimelineStateTransaction},
day_dividers::DayDividerAdjuster,
event_item::{
AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
LocalEventTimelineItem, Profile, ReactionsByKeyBySender, RemoteEventOrigin,
RemoteEventTimelineItem, TimelineEventItemId,
},
polls::PollState,
reactions::FullReactionKey,
util::{rfind_event_by_id, rfind_event_item},
EventTimelineItem, InReplyToDetails, Message, OtherState, Sticker, TimelineDetails,
TimelineItem, TimelineItemContent,
};
use crate::{
events::SyncTimelineEventWithoutContent,
timeline::{
event_item::{ReactionInfo, ReactionStatus},
reactions::PendingReaction,
},
DEFAULT_SANITIZER_MODE,
};
/// Origin of the event being handled, together with the data that only
/// exists for that origin.
pub(super) enum Flow {
/// The event is handled as a local item: `add_item` turns it into a
/// `LocalEventTimelineItem` in the `NotSentYet` send state.
Local {
/// Transaction ID identifying the local event.
txn_id: OwnedTransactionId,
/// Optional send-queue handle; it is copied onto the local timeline item
/// and onto local reactions.
send_handle: Option<SendHandle>,
},
/// The event is handled as a remote item: `add_item` turns it into a
/// `RemoteEventTimelineItem`.
Remote {
/// ID of the event.
event_id: OwnedEventId,
/// Transaction ID attached to the event, if any. When present it is used
/// to find an existing timeline item (e.g. a local echo) to replace.
txn_id: Option<OwnedTransactionId>,
/// Raw JSON of the event; stored as the item's original JSON, or as the
/// latest edit JSON when the event is an edit.
raw_event: Raw<AnySyncTimelineEvent>,
/// Where the resulting item should go in the timeline.
position: TimelineItemPosition,
/// Encryption information of the event, if any.
encryption_info: Option<EncryptionInfo>,
},
}
/// Everything the handler needs to know about an event besides its content.
pub(super) struct TimelineEventContext {
/// User who sent the event.
pub(super) sender: OwnedUserId,
/// Profile of the sender, if already known; used as the initial value of
/// the item's sender profile details.
pub(super) sender_profile: Option<Profile>,
/// Timestamp attached to the new item, to reactions and to poll updates.
pub(super) timestamp: MilliSecondsSinceUnixEpoch,
/// Copied into `RemoteEventTimelineItem::is_own` for remote events.
pub(super) is_own_event: bool,
/// Read receipts attached to the item created for a remote event.
pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
/// Copied into `RemoteEventTimelineItem::is_highlighted` for remote events.
pub(super) is_highlighted: bool,
/// Whether the event is local or remote, plus origin-specific data.
pub(super) flow: Flow,
/// When `false`, most event kinds do not create a new timeline item;
/// updates to existing items (edits, reactions, redactions, ...) still apply.
pub(super) should_add_new_items: bool,
}
/// The kind of event being handled, reduced to what the timeline needs.
#[derive(Clone, Debug)]
pub(super) enum TimelineEventKind {
/// A message-like event with its original (non-redacted) content.
Message {
/// The event content.
content: AnyMessageLikeEventContent,
/// Relations bundled with the event.
relations: BundledMessageLikeRelations<AnySyncMessageLikeEvent>,
},
/// A message-like event without original content (i.e. redacted).
RedactedMessage { event_type: MessageLikeEventType },
/// A redaction of the event with ID `redacts`.
Redaction { redacts: OwnedEventId },
/// A room membership state event.
RoomMember {
/// State key of the event, i.e. the user whose membership is described.
user_id: OwnedUserId,
/// Original content (with previous content) or redacted content.
content: FullStateEventContent<RoomMemberEventContent>,
/// User who sent the membership event.
sender: OwnedUserId,
},
/// Any state event other than a room membership event.
OtherState { state_key: String, content: AnyOtherFullStateEventContent },
/// A message-like event whose content could not be deserialized.
FailedToParseMessageLike { event_type: MessageLikeEventType, error: Arc<serde_json::Error> },
/// A state event whose content could not be deserialized.
FailedToParseState {
event_type: StateEventType,
state_key: String,
error: Arc<serde_json::Error>,
},
}
impl TimelineEventKind {
    /// Classifies a deserialized sync timeline event.
    ///
    /// * A redaction event becomes [`Self::Redaction`] when its target can be
    ///   resolved for `room_version`, and [`Self::RedactedMessage`] otherwise.
    /// * Any other message-like event becomes [`Self::Message`] when it still
    ///   has its original content, and [`Self::RedactedMessage`] otherwise.
    /// * Room membership state events become [`Self::RoomMember`]; every other
    ///   state event becomes [`Self::OtherState`].
    pub fn from_event(event: AnySyncTimelineEvent, room_version: &RoomVersionId) -> Self {
        match event {
            AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            )) => match redaction.redacts(room_version).map(ToOwned::to_owned) {
                Some(redacts) => Self::Redaction { redacts },
                None => Self::RedactedMessage { event_type: redaction.event_type() },
            },

            AnySyncTimelineEvent::MessageLike(message_like) => {
                if let Some(content) = message_like.original_content() {
                    Self::Message { content, relations: message_like.relations() }
                } else {
                    Self::RedactedMessage { event_type: message_like.event_type() }
                }
            }

            AnySyncTimelineEvent::State(AnySyncStateEvent::RoomMember(
                SyncStateEvent::Original(member),
            )) => Self::RoomMember {
                user_id: member.state_key,
                content: FullStateEventContent::Original {
                    content: member.content,
                    prev_content: member.unsigned.prev_content,
                },
                sender: member.sender,
            },

            AnySyncTimelineEvent::State(AnySyncStateEvent::RoomMember(
                SyncStateEvent::Redacted(member),
            )) => Self::RoomMember {
                user_id: member.state_key,
                content: FullStateEventContent::Redacted(member.content),
                sender: member.sender,
            },

            AnySyncTimelineEvent::State(other_state) => Self::OtherState {
                state_key: other_state.state_key().to_owned(),
                content: AnyOtherFullStateEventContent::with_event_content(other_state.content()),
            },
        }
    }

    /// Builds the "failed to parse" kind for an event whose content could not
    /// be deserialized, keeping the event type (and the state key for state
    /// events) together with the deserialization `error`.
    pub(super) fn failed_to_parse(
        event: SyncTimelineEventWithoutContent,
        error: serde_json::Error,
    ) -> Self {
        // The error is shared behind an `Arc`, as the variants require.
        let error = Arc::new(error);

        match event {
            SyncTimelineEventWithoutContent::OriginalMessageLike(original) => {
                let event_type = original.content.event_type;
                Self::FailedToParseMessageLike { event_type, error }
            }
            SyncTimelineEventWithoutContent::RedactedMessageLike(redacted) => {
                let event_type = redacted.content.event_type;
                Self::FailedToParseMessageLike { event_type, error }
            }
            SyncTimelineEventWithoutContent::OriginalState(original) => {
                let event_type = original.content.event_type;
                let state_key = original.state_key;
                Self::FailedToParseState { event_type, state_key, error }
            }
            SyncTimelineEventWithoutContent::RedactedState(redacted) => {
                let event_type = redacted.content.event_type;
                let state_key = redacted.state_key;
                Self::FailedToParseState { event_type, state_key, error }
            }
        }
    }
}
/// Where the item created for a remote event goes in the timeline.
#[derive(Clone, Copy, Debug)]
pub(super) enum TimelineItemPosition {
/// Push the item at the front of the timeline, unless an item with the same
/// event ID is already present.
Start { origin: RemoteEventOrigin },
/// Add the item after the last item that is not a local echo, replacing an
/// existing item with the same transaction or event ID if there is one.
End { origin: RemoteEventOrigin },
/// Replace the item at the given index (used when retrying decryption of
/// an unable-to-decrypt item). If handling the event adds no item, the item
/// at that index is removed instead.
#[cfg(feature = "e2e-encryption")]
Update(usize),
}
/// Summary of what handling one event did to the timeline.
#[derive(Default)]
pub(super) struct HandleEventResult {
/// Set as soon as `add_item` is called for the event.
///
/// NOTE(review): `add_item` sets this before its duplicate check for
/// back-paginated events, so it can be `true` even though that event was
/// skipped — confirm callers are fine with that.
pub(super) item_added: bool,
/// Set when the item at `TimelineItemPosition::Update(idx)` was removed
/// because handling the event did not add an item.
#[cfg(feature = "e2e-encryption")]
pub(super) item_removed: bool,
/// Number of existing items replaced by edits, reactions, poll updates and
/// redactions of the event itself.
pub(super) items_updated: u16,
}
/// Applies a single event to the timeline items of an ongoing transaction.
///
/// A handler is created per event with [`TimelineEventHandler::new`] and
/// consumed by `handle_event`.
pub(super) struct TimelineEventHandler<'a, 'o> {
/// The timeline items, modified through an observable-vector transaction.
items: &'a mut ObservableVectorTransaction<'o, Arc<TimelineItem>>,
/// Timeline metadata (pending reactions and poll events, room version,
/// read-marker state, ...).
meta: &'a mut TimelineMetadata,
/// Context of the event being handled.
ctx: TimelineEventContext,
/// Result accumulated while handling the event.
result: HandleEventResult,
}
impl<'a, 'o> TimelineEventHandler<'a, 'o> {
/// Creates a handler that works on the items and metadata of `state` for
/// the event described by `ctx`, starting from an empty result.
pub(super) fn new(
    state: &'a mut TimelineStateTransaction<'o>,
    ctx: TimelineEventContext,
) -> Self {
    Self {
        items: &mut state.items,
        meta: &mut state.meta,
        ctx,
        result: Default::default(),
    }
}
/// Handles one event and applies its effect to the timeline.
///
/// Depending on `event_kind`, this adds a new item, updates an existing one
/// (edit, reaction, poll response/end, redaction) or ignores the event.
///
/// * `day_divider_adjuster` - marked as used as soon as handling starts.
/// * `event_kind` - the already-classified event.
/// * `raw_event` - raw JSON of the event, if available; only used to
///   determine the cause of an unable-to-decrypt event.
///
/// Consumes the handler and returns what was done as a [`HandleEventResult`].
#[instrument(skip_all, fields(txn_id, event_id, position))]
pub(super) async fn handle_event(
mut self,
day_divider_adjuster: &mut DayDividerAdjuster,
event_kind: TimelineEventKind,
raw_event: Option<&Raw<AnySyncTimelineEvent>>,
) -> HandleEventResult {
let span = tracing::Span::current();
day_divider_adjuster.mark_used();
// Fill in the span fields declared in the `#[instrument]` attribute.
match &self.ctx.flow {
Flow::Local { txn_id, .. } => {
span.record("txn_id", debug(txn_id));
debug!("Handling local event");
}
Flow::Remote { event_id, txn_id, position, .. } => {
span.record("event_id", debug(event_id));
span.record("position", debug(position));
if let Some(txn_id) = txn_id {
span.record("txn_id", debug(txn_id));
}
trace!("Handling remote event");
}
};
// Gates the creation of new items below; updates of existing items are
// applied regardless.
let should_add = self.ctx.should_add_new_items;
match event_kind {
TimelineEventKind::Message { content, relations } => match content {
AnyMessageLikeEventContent::Reaction(c) => {
self.handle_reaction(c);
}
// A room message that replaces another one is an edit. This arm must
// stay before the generic `RoomMessage` arm.
AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
relates_to: Some(message::Relation::Replacement(re)),
..
}) => {
self.handle_room_message_edit(re);
}
AnyMessageLikeEventContent::RoomMessage(c) => {
if should_add {
self.add_item(TimelineItemContent::message(c, relations, self.items));
}
}
// NOTE(review): unlike the other item-creating arms, this one does not
// check `should_add` — confirm this is intended.
AnyMessageLikeEventContent::RoomEncrypted(c) => {
let cause = UtdCause::determine(raw_event);
self.add_item(TimelineItemContent::unable_to_decrypt(c, cause));
// Notify the unable-to-decrypt hook, if one is set; only remote events
// carry an event ID to report.
if let Some(hook) = self.meta.unable_to_decrypt_hook.as_ref() {
if let Flow::Remote { event_id, .. } = &self.ctx.flow {
hook.on_utd(event_id, cause).await;
}
}
}
AnyMessageLikeEventContent::Sticker(content) => {
if should_add {
self.add_item(TimelineItemContent::Sticker(Sticker { content }));
}
}
AnyMessageLikeEventContent::UnstablePollStart(
UnstablePollStartEventContent::Replacement(c),
) => self.handle_poll_start_edit(c.relates_to),
AnyMessageLikeEventContent::UnstablePollStart(
UnstablePollStartEventContent::New(c),
) => self.handle_poll_start(c, should_add),
AnyMessageLikeEventContent::UnstablePollResponse(c) => self.handle_poll_response(c),
AnyMessageLikeEventContent::UnstablePollEnd(c) => self.handle_poll_end(c),
AnyMessageLikeEventContent::CallInvite(_) => {
if should_add {
self.add_item(TimelineItemContent::CallInvite);
}
}
AnyMessageLikeEventContent::CallNotify(_) => {
if should_add {
self.add_item(TimelineItemContent::CallNotify)
}
}
_ => {
debug!(
"Ignoring message-like event of type `{}`, not supported (yet)",
content.event_type()
);
}
},
TimelineEventKind::RedactedMessage { event_type } => {
// Redacted reactions never get an item of their own.
if event_type != MessageLikeEventType::Reaction && should_add {
self.add_item(TimelineItemContent::RedactedMessage);
}
}
TimelineEventKind::Redaction { redacts } => {
self.handle_redaction(redacts);
}
TimelineEventKind::RoomMember { user_id, content, sender } => {
if should_add {
self.add_item(TimelineItemContent::room_member(user_id, content, sender));
}
}
TimelineEventKind::OtherState { state_key, content } => {
if should_add {
self.add_item(TimelineItemContent::OtherState(OtherState {
state_key,
content,
}));
}
}
TimelineEventKind::FailedToParseMessageLike { event_type, error } => {
if should_add {
self.add_item(TimelineItemContent::FailedToParseMessageLike {
event_type,
error,
});
}
}
TimelineEventKind::FailedToParseState { event_type, state_key, error } => {
if should_add {
self.add_item(TimelineItemContent::FailedToParseState {
event_type,
state_key,
error,
});
}
}
}
if !self.result.item_added {
trace!("No new item added");
// `add_item` was never called, so the event being updated in place does
// not produce a visible item: remove the item it was supposed to replace.
#[cfg(feature = "e2e-encryption")]
if let Flow::Remote { position: TimelineItemPosition::Update(idx), .. } = self.ctx.flow
{
trace!("Removing UTD that was successfully retried");
self.items.remove(idx);
self.result.item_removed = true;
}
}
self.result
}
/// Applies an edit of a room message to the timeline item it replaces.
///
/// The edit is discarded when the target item is not in the timeline, when
/// it was sent by a different user than the edit, or when it is not a
/// message. Otherwise the item is replaced in place with the new content
/// (keeping the reply and thread details of the previous message) and
/// `items_updated` is incremented.
#[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))]
fn handle_room_message_edit(
    &mut self,
    replacement: Replacement<RoomMessageEventContentWithoutRelation>,
) {
    let (position, original) = match rfind_event_by_id(self.items, &replacement.event_id) {
        Some(found) => found,
        None => {
            debug!("Timeline item not found, discarding edit");
            return;
        }
    };

    // Only the original sender may edit an event.
    if self.ctx.sender != original.sender() {
        info!(
            original_sender = ?original.sender(), edit_sender = ?self.ctx.sender,
            "Edit event applies to another user's timeline item, discarding"
        );
        return;
    }

    let previous = match original.content() {
        TimelineItemContent::Message(message) => message,
        other => {
            info!(
                "Edit of message event applies to {:?}, discarding",
                other.debug_string(),
            );
            return;
        }
    };

    let mut msgtype = replacement.new_content.msgtype;
    msgtype.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::No);

    let edited_message = Message {
        msgtype,
        in_reply_to: previous.in_reply_to.clone(),
        thread_root: previous.thread_root.clone(),
        edited: true,
        mentions: replacement.new_content.mentions,
    };

    // Only remote edits carry raw JSON to remember as the latest edit.
    let edit_json = if let Flow::Remote { raw_event, .. } = &self.ctx.flow {
        Some(raw_event.clone())
    } else {
        None
    };

    let mut updated =
        original.with_content(TimelineItemContent::Message(edited_message), edit_json);

    // A remote edit of a remote item also replaces the item's encryption info
    // with the one of the edit.
    if let (EventTimelineItemKind::Remote(remote_event), Flow::Remote { encryption_info, .. }) =
        (&original.kind, &self.ctx.flow)
    {
        updated = updated.with_kind(EventTimelineItemKind::Remote(
            remote_event.with_encryption_info(encryption_info.clone()),
        ));
    }

    trace!("Applying edit");
    let internal_id = original.internal_id.to_owned();
    self.items.set(position, TimelineItem::new(updated, internal_id));
    self.result.items_updated += 1;
}
/// Applies a reaction to the timeline.
///
/// If the reacted-to event is in the timeline, the reaction is added to that
/// item's reactions (reactions on redacted items are ignored). Otherwise a
/// remote reaction is stored in the pending list, to be attached when the
/// reacted-to event is added later; a local reaction to an absent event is
/// an error and is dropped.
///
/// Unless one of the early returns is taken, the reaction is also recorded
/// in the reaction map, replacing the entry of its local echo if there was
/// one.
#[instrument(skip_all, fields(relates_to_event_id = ?c.relates_to.event_id))]
fn handle_reaction(&mut self, c: ReactionEventContent) {
let reacted_to_event_id = &c.relates_to.event_id;
// `reaction_id` identifies the reaction itself; `old_txn_id` is the
// transaction ID of the local echo a remote reaction may supersede.
let (reaction_id, send_handle, old_txn_id) = match &self.ctx.flow {
Flow::Local { txn_id, send_handle, .. } => {
(TimelineEventItemId::TransactionId(txn_id.clone()), send_handle.clone(), None)
}
Flow::Remote { event_id, txn_id, .. } => {
(TimelineEventItemId::EventId(event_id.clone()), None, txn_id.as_ref())
}
};
if let Some((idx, event_item)) = rfind_event_by_id(self.items, reacted_to_event_id) {
if let TimelineItemContent::RedactedMessage = event_item.content() {
debug!("Ignoring reaction on redacted event");
return;
}
trace!("Added reaction");
// Work on a copy of the item's reactions, then replace the item.
let mut reactions = event_item.reactions.clone();
reactions.entry(c.relates_to.key.clone()).or_default().insert(
self.ctx.sender.clone(),
ReactionInfo {
timestamp: self.ctx.timestamp,
status: match &reaction_id {
TimelineEventItemId::TransactionId(_txn_id) => {
ReactionStatus::LocalToRemote(send_handle)
}
TimelineEventItemId::EventId(event_id) => {
ReactionStatus::RemoteToRemote(event_id.clone())
}
},
},
);
self.items.set(idx, event_item.with_reactions(reactions));
self.result.items_updated += 1;
} else {
trace!("Timeline item not found, adding reaction to the pending list");
// Only remote reactions (those with an event ID) can be kept as pending.
let TimelineEventItemId::EventId(reaction_event_id) = reaction_id.clone() else {
error!("Adding local reaction echo to event absent from the timeline");
return;
};
self.meta.reactions.pending.entry(reacted_to_event_id.to_owned()).or_default().insert(
reaction_event_id,
PendingReaction {
key: c.relates_to.key.clone(),
sender_id: self.ctx.sender.clone(),
timestamp: self.ctx.timestamp,
},
);
}
// Drop the map entry of the local echo this remote reaction replaces.
if let Some(txn_id) = old_txn_id {
self.meta.reactions.map.remove(&TimelineEventItemId::TransactionId(txn_id.clone()));
}
// Remember which item, sender and key this reaction belongs to, so that a
// later redaction of the reaction can find it.
self.meta.reactions.map.insert(
reaction_id,
FullReactionKey {
item: TimelineEventItemId::EventId(c.relates_to.event_id),
sender: self.ctx.sender.clone(),
key: c.relates_to.key,
},
);
}
/// Applies an edit of a poll start event to the poll item it replaces.
///
/// The edit is discarded when the target item is not in the timeline, when
/// it was sent by a different user than the edit, when it is not a poll, or
/// when the poll state rejects the edit. Otherwise the item is replaced in
/// place and `items_updated` is incremented.
#[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))]
fn handle_poll_start_edit(
    &mut self,
    replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation>,
) {
    let (position, original) = match rfind_event_by_id(self.items, &replacement.event_id) {
        Some(found) => found,
        None => {
            debug!("Timeline item not found, discarding poll edit");
            return;
        }
    };

    // Only the original sender may edit an event.
    if self.ctx.sender != original.sender() {
        info!(
            original_sender = ?original.sender(), edit_sender = ?self.ctx.sender,
            "Edit event applies to another user's timeline item, discarding"
        );
        return;
    }

    let current_poll = match original.content() {
        TimelineItemContent::Poll(state) => state,
        other => {
            info!("Edit of poll event applies to {}, discarding", other.debug_string(),);
            return;
        }
    };

    let edited_poll = match current_poll.edit(&replacement.new_content) {
        Ok(edited_poll_state) => edited_poll_state,
        Err(e) => {
            info!("Failed to apply poll edit: {e:?}");
            return;
        }
    };

    // Only remote edits carry raw JSON to remember as the latest edit.
    let edit_json = if let Flow::Remote { raw_event, .. } = &self.ctx.flow {
        Some(raw_event.clone())
    } else {
        None
    };

    trace!("Applying poll start edit.");
    let updated = original.with_content(TimelineItemContent::Poll(edited_poll), edit_json);
    let internal_id = original.internal_id.to_owned();
    self.items.set(position, TimelineItem::new(updated, internal_id));
    self.result.items_updated += 1;
}
/// Handles a new poll start event.
///
/// For remote events, poll events that were kept as pending for this poll
/// are applied to the fresh poll state first. The poll item is only added
/// when `should_add` is `true`.
fn handle_poll_start(&mut self, c: NewUnstablePollStartEventContent, should_add: bool) {
    let mut state = PollState::new(c);

    match &self.ctx.flow {
        Flow::Remote { event_id, .. } => {
            self.meta.poll_pending_events.apply(event_id, &mut state);
        }
        // Local polls have no event ID yet, so nothing can be pending for them.
        Flow::Local { .. } => {}
    }

    if !should_add {
        return;
    }
    self.add_item(TimelineItemContent::Poll(state));
}
/// Applies a poll response to the poll it relates to.
///
/// When the poll is not in the timeline, the response is kept as pending.
/// When the related item is not a poll, the response is ignored. Otherwise
/// the poll item is replaced in place and `items_updated` is incremented.
fn handle_poll_response(&mut self, c: UnstablePollResponseEventContent) {
    let (position, poll_item) = match rfind_event_by_id(self.items, &c.relates_to.event_id) {
        Some(found) => found,
        None => {
            self.meta.poll_pending_events.add_response(
                &c.relates_to.event_id,
                &self.ctx.sender,
                self.ctx.timestamp,
                &c,
            );
            return;
        }
    };

    let poll_state = match poll_item.content() {
        TimelineItemContent::Poll(state) => state,
        _ => return,
    };

    let with_response = poll_state.add_response(&self.ctx.sender, self.ctx.timestamp, &c);
    let updated = poll_item.with_content(TimelineItemContent::Poll(with_response), None);

    trace!("Adding poll response.");
    let internal_id = poll_item.internal_id.to_owned();
    self.items.set(position, TimelineItem::new(updated, internal_id));
    self.result.items_updated += 1;
}
/// Applies a poll end event to the poll it relates to.
///
/// When the poll is not in the timeline, the end is kept as pending. When
/// the related item is not a poll, the event is ignored. When the poll
/// state refuses to end, the event is discarded. Otherwise the poll item is
/// replaced in place and `items_updated` is incremented.
fn handle_poll_end(&mut self, c: UnstablePollEndEventContent) {
    let (position, poll_item) = match rfind_event_by_id(self.items, &c.relates_to.event_id) {
        Some(found) => found,
        None => {
            self.meta.poll_pending_events.add_end(&c.relates_to.event_id, self.ctx.timestamp);
            return;
        }
    };

    let poll_state = match poll_item.content() {
        TimelineItemContent::Poll(state) => state,
        _ => return,
    };

    let Ok(ended_poll) = poll_state.end(self.ctx.timestamp) else {
        info!("Got multiple poll end events, discarding");
        return;
    };

    let updated = poll_item.with_content(TimelineItemContent::Poll(ended_poll), None);
    trace!("Ending poll.");
    let internal_id = poll_item.internal_id.to_owned();
    self.items.set(position, TimelineItem::new(updated, internal_id));
    self.result.items_updated += 1;
}
/// Applies the redaction of the event with ID `redacted`.
///
/// A redacted reaction is removed from the item it was attached to.
/// Otherwise the matching timeline item, if any, is replaced by its redacted
/// form, and every message replying to the redacted event gets its embedded
/// replied-to event redacted as well.
#[instrument(skip_all, fields(redacts_event_id = ?redacted))]
fn handle_redaction(&mut self, redacted: OwnedEventId) {
// First try to treat the redacted event as a reaction; stop if one was
// removed from an item.
if self.handle_reaction_redaction(TimelineEventItemId::EventId(redacted.clone())) {
return;
}
// General path: redact the timeline item of the event itself.
if let Some((idx, item)) = rfind_event_by_id(self.items, &redacted) {
if item.as_remote().is_some() {
if let TimelineItemContent::RedactedMessage = &item.content {
debug!("event item is already redacted");
} else {
let new_item = item.redact(&self.meta.room_version);
self.items.set(idx, TimelineItem::new(new_item, item.internal_id.to_owned()));
self.result.items_updated += 1;
}
} else {
error!("inconsistent state: redaction received on a non-remote event item");
}
} else {
debug!("Timeline item not found, discarding redaction");
};
// Redact the replied-to event embedded in any message that replies to the
// redacted event. Only replies whose details are `Ready` hold such a copy.
// These updates are not counted in `items_updated`.
self.items.for_each(|mut entry| {
let Some(event_item) = entry.as_event() else { return };
let Some(message) = event_item.content.as_message() else { return };
let Some(in_reply_to) = message.in_reply_to() else { return };
let TimelineDetails::Ready(replied_to_event) = &in_reply_to.event else { return };
if redacted == in_reply_to.event_id {
let replied_to_event = replied_to_event.redact(&self.meta.room_version);
let in_reply_to = InReplyToDetails {
event_id: in_reply_to.event_id.clone(),
event: TimelineDetails::Ready(Box::new(replied_to_event)),
};
let content = TimelineItemContent::Message(message.with_in_reply_to(in_reply_to));
let new_item = entry.with_kind(event_item.with_content(content, None));
ObservableVectorTransactionEntry::set(&mut entry, new_item);
}
});
}
/// Removes a reaction from the timeline if the redacted event is a known
/// reaction.
///
/// `reaction_id` identifies the event being redacted. It is looked up in
/// (and removed from) the reaction map:
///
/// * If it is not in the map, the redacted event is not a known reaction and
///   nothing else happens. This is the normal case for redactions of regular
///   events, so it is only traced.
/// * If the reacted-to event is not in the timeline, the reaction is removed
///   from the pending reactions of that event, if it was there.
/// * Otherwise the reaction is removed from the reacted-to item.
///
/// Returns `true` only when a reaction was actually removed from a timeline
/// item; the caller then skips the generic redaction path.
#[instrument(skip_all, fields(redacts = ?reaction_id))]
fn handle_reaction_redaction(&mut self, reaction_id: TimelineEventItemId) -> bool {
    let (reacted_to_event_id, key, sender) = match self.meta.reactions.map.remove(&reaction_id)
    {
        Some(FullReactionKey {
            item: TimelineEventItemId::EventId(reacted_to_event_id),
            key,
            sender,
        }) => (reacted_to_event_id, key, sender),

        // A known reaction whose target is not identified by an event ID cannot
        // be resolved here.
        Some(_) => {
            warn!("Reaction to redact was missing from the reaction or user map");
            return false;
        }

        // This method is tried for every redaction, so not finding the event in
        // the reaction map is expected for regular events and must not warn.
        None => {
            trace!("Redacted event is not a known reaction");
            return false;
        }
    };

    let Some((item_pos, item)) = rfind_event_by_id(self.items, &reacted_to_event_id) else {
        // The reacted-to event isn't in the timeline: make sure the reaction
        // doesn't stay in its list of pending reactions.
        if let TimelineEventItemId::EventId(event_id) = reaction_id {
            if let Some(reactions) = self.meta.reactions.pending.get_mut(&reacted_to_event_id) {
                reactions.swap_remove(&event_id);
            }
        }

        // No reaction was removed from a timeline item.
        return false;
    };

    let mut reactions = item.reactions.clone();
    if reactions.remove_reaction(&sender, &key).is_some() {
        trace!("Removing reaction");
        self.items.set(item_pos, item.with_reactions(reactions));
        self.result.items_updated += 1;
        return true;
    }

    // The reaction map knew the reaction, but the item did not carry it.
    warn!("Reaction to redact was missing from the reaction or user map");
    false
}
/// Creates a timeline item with the given `content` for the event being
/// handled and puts it into the timeline.
///
/// Local events are pushed at the back. Remote events are placed according
/// to their `TimelineItemPosition`: pushed at the front (`Start`), added
/// after the last item that isn't a local echo, replacing a matching local
/// echo or duplicate (`End`), or written over an existing item (`Update`).
///
/// NOTE(review): `item_added` is set up front, so it stays `true` even when
/// a back-paginated event is skipped as a duplicate below — confirm callers
/// expect that.
fn add_item(&mut self, content: TimelineItemContent) {
self.result.item_added = true;
let sender = self.ctx.sender.to_owned();
let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
let timestamp = self.ctx.timestamp;
// Take over reactions that were received before this event, if any.
let reactions = self.pending_reactions(&content).unwrap_or_default();
let kind: EventTimelineItemKind = match &self.ctx.flow {
Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
send_state: EventSendState::NotSentYet,
transaction_id: txn_id.to_owned(),
send_handle: send_handle.clone(),
}
.into(),
Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
let origin = match *position {
TimelineItemPosition::Start { origin }
| TimelineItemPosition::End { origin } => origin,
// For in-place updates, reuse the origin of the item being replaced.
#[cfg(feature = "e2e-encryption")]
TimelineItemPosition::Update(idx) => self.items[idx]
.as_event()
.and_then(|ev| Some(ev.as_remote()?.origin))
.unwrap_or_else(|| {
error!("Decryption retried on a local event");
RemoteEventOrigin::Unknown
}),
};
RemoteEventTimelineItem {
event_id: event_id.clone(),
transaction_id: txn_id.clone(),
read_receipts: self.ctx.read_receipts.clone(),
is_own: self.ctx.is_own_event,
is_highlighted: self.ctx.is_highlighted,
encryption_info: encryption_info.clone(),
original_json: Some(raw_event.clone()),
latest_edit_json: None,
origin,
}
.into()
}
};
let is_room_encrypted = self.meta.is_room_encrypted;
let mut item = EventTimelineItem::new(
sender,
sender_profile,
timestamp,
content,
kind,
reactions,
is_room_encrypted,
);
match &self.ctx.flow {
Flow::Local { .. } => {
trace!("Adding new local timeline item");
let item = self.meta.new_timeline_item(item);
self.items.push_back(item);
}
Flow::Remote { position: TimelineItemPosition::Start { .. }, event_id, .. } => {
// Don't add an event a second time if an item with the same event ID
// is already in the timeline.
if self
.items
.iter()
.filter_map(|ev| ev.as_event()?.event_id())
.any(|id| id == event_id)
{
trace!("Skipping back-paginated event that has already been seen");
return;
}
trace!("Adding new remote timeline item at the start");
let item = self.meta.new_timeline_item(item);
self.items.push_front(item);
}
Flow::Remote {
position: TimelineItemPosition::End { .. }, txn_id, event_id, ..
} => {
// Look for an existing item for this event: one with the same
// transaction ID (when the event has one) or with the same event ID.
let result = rfind_event_item(self.items, |it| {
txn_id.is_some() && it.transaction_id() == txn_id.as_deref()
|| it.event_id() == Some(event_id)
});
let mut removed_event_item_id = None;
if let Some((idx, old_item)) = result {
if old_item.as_remote().is_some() {
trace!(?item, old_item = ?*old_item, "Received duplicate event");
// Never "un-redact" an item: keep the new item redacted too.
if old_item.content.is_redacted() && !item.content.is_redacted() {
warn!("Got original form of an event that was previously redacted");
item.content = item.content.redact(&self.meta.room_version);
item.reactions.clear();
}
}
transfer_details(&mut item, &old_item);
let old_item_id = old_item.internal_id;
// The old item is the last one: replace it in place, keeping its
// internal ID, and stop here.
if idx == self.items.len() - 1 {
trace!(idx, "Replacing existing event");
self.items.set(idx, TimelineItem::new(item, old_item_id.to_owned()));
return;
}
// Otherwise remove the old item; the new one is re-added below under
// the same internal ID.
trace!("Removing local echo or duplicate timeline item");
removed_event_item_id = Some(self.items.remove(idx).internal_id.clone());
}
// Find the last item that is an event and not a local echo; the new
// item goes right after it, or at index 0 if there is none.
let latest_event_idx = self
.items
.iter()
.enumerate()
.rev()
.find_map(|(idx, item)| (!item.as_event()?.is_local_echo()).then_some(idx));
let insert_idx = latest_event_idx.map_or(0, |idx| idx + 1);
trace!("Adding new remote timeline item after all non-pending events");
let new_item = match removed_event_item_id {
Some(id) => TimelineItem::new(item, id),
None => self.meta.new_timeline_item(item),
};
// Use push operations when inserting at either end of the list.
if insert_idx == self.items.len() {
self.items.push_back(new_item);
} else if insert_idx == 0 {
self.items.push_front(new_item);
} else {
self.items.insert(insert_idx, new_item);
}
}
#[cfg(feature = "e2e-encryption")]
Flow::Remote { position: TimelineItemPosition::Update(idx), .. } => {
trace!("Updating timeline item at position {idx}");
// Replace the item in place, keeping its internal ID.
let id = self.items[*idx].internal_id.clone();
self.items.set(*idx, TimelineItem::new(item, id));
}
}
// The read marker item may need updating now that an item was added.
if !self.meta.has_up_to_date_read_marker_item {
self.meta.update_read_marker(self.items);
}
}
/// Takes the reactions that were kept as pending for the event being
/// handled and groups them by reaction key, then by sender.
///
/// Returns `None` when the content is a redacted message (in which case the
/// pending reactions are left untouched), when the event is local, or when
/// nothing is pending for the event.
fn pending_reactions(
    &mut self,
    content: &TimelineItemContent,
) -> Option<ReactionsByKeyBySender> {
    if matches!(content, TimelineItemContent::RedactedMessage) {
        return None;
    }

    // Only remote events have an event ID that reactions could be pending for.
    let Flow::Remote { event_id, .. } = &self.ctx.flow else {
        return None;
    };

    let pending = self.meta.reactions.pending.remove(event_id)?;

    let mut by_key = ReactionsByKeyBySender::default();
    for (reaction_event_id, pending_reaction) in pending {
        let info = ReactionInfo {
            timestamp: pending_reaction.timestamp,
            status: ReactionStatus::RemoteToRemote(reaction_event_id),
        };
        by_key.entry(pending_reaction.key).or_default().insert(pending_reaction.sender_id, info);
    }

    Some(by_key)
}
}
/// Carries over the replied-to event details from `old_item` to `item`.
///
/// Only applies when both items are messages with reply details and the
/// replied-to event of `item` is `Unavailable`; in that case it is replaced
/// by a clone of the one from `old_item`.
fn transfer_details(item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
    if let (TimelineItemContent::Message(new_msg), TimelineItemContent::Message(prev_msg)) =
        (&mut item.content, &old_item.content)
    {
        if let (Some(new_reply), Some(prev_reply)) =
            (&mut new_msg.in_reply_to, &prev_msg.in_reply_to)
        {
            if let TimelineDetails::Unavailable = &new_reply.event {
                new_reply.event = prev_reply.event.clone();
            }
        }
    }
}