use std::{fmt::Formatter, num::NonZeroUsize, sync::Arc};
use futures_util::{future::join_all, FutureExt as _};
use matrix_sdk::{
config::RequestConfig, event_cache::paginator::PaginatorError, BoxFuture, Room,
SendOutsideWasm, SyncOutsideWasm,
};
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
use thiserror::Error;
use tracing::{debug, warn};
const MAX_CONCURRENT_REQUESTS: usize = 10;
pub struct PinnedEventsLoader {
room: Arc<dyn PinnedEventsRoom>,
max_events_to_load: usize,
}
impl PinnedEventsLoader {
pub fn new(room: Arc<dyn PinnedEventsRoom>, max_events_to_load: usize) -> Self {
Self { room, max_events_to_load }
}
pub async fn load_events(&self) -> Result<Vec<SyncTimelineEvent>, PinnedEventsLoaderError> {
let pinned_event_ids: Vec<OwnedEventId> = self
.room
.pinned_event_ids()
.into_iter()
.rev()
.take(self.max_events_to_load)
.rev()
.collect();
if pinned_event_ids.is_empty() {
return Ok(Vec::new());
}
let request_config = Some(
RequestConfig::default()
.retry_limit(3)
.max_concurrent_requests(NonZeroUsize::new(MAX_CONCURRENT_REQUESTS)),
);
let new_events = join_all(pinned_event_ids.into_iter().map(|event_id| {
let provider = self.room.clone();
async move {
match provider.load_event_with_relations(&event_id, request_config).await {
Ok((event, related_events)) => {
let mut events = vec![event];
events.extend(related_events);
Some(events)
}
Err(err) => {
warn!("error when loading pinned event: {err}");
None
}
}
}
}))
.await;
let mut loaded_events = new_events
.into_iter()
.flatten()
.flatten()
.collect::<Vec<SyncTimelineEvent>>();
if loaded_events.is_empty() {
return Err(PinnedEventsLoaderError::TimelineReloadFailed);
}
loaded_events.sort_by_key(|item| {
item.event
.deserialize()
.map(|e| e.origin_server_ts())
.unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
});
Ok(loaded_events)
}
}
pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec<SyncTimelineEvent>), PaginatorError>>;
fn pinned_event_ids(&self) -> Vec<OwnedEventId>;
fn is_pinned_event(&self, event_id: &EventId) -> bool;
}
impl PinnedEventsRoom for Room {
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
) -> BoxFuture<'a, Result<(SyncTimelineEvent, Vec<SyncTimelineEvent>), PaginatorError>> {
async move {
if let Ok((cache, _handles)) = self.event_cache().await {
if let Some(ret) = cache.event_with_relations(event_id).await {
debug!("Loaded pinned event {event_id} and related events from cache");
return Ok(ret);
}
}
debug!("Loading pinned event {event_id} from HS");
self.event(event_id, request_config)
.await
.map(|e| (e.into(), Vec::new()))
.map_err(|err| PaginatorError::SdkError(Box::new(err)))
}
.boxed()
}
fn pinned_event_ids(&self) -> Vec<OwnedEventId> {
self.clone_info().pinned_event_ids()
}
fn is_pinned_event(&self, event_id: &EventId) -> bool {
self.clone_info().is_pinned_event(event_id)
}
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for PinnedEventsLoader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PinnedEventsLoader")
.field("max_events_to_load", &self.max_events_to_load)
.finish()
}
}
#[derive(Error, Debug)]
pub enum PinnedEventsLoaderError {
#[error("No event found for the given event id.")]
EventNotFound(OwnedEventId),
#[error("Timeline focus is not pinned events.")]
TimelineFocusNotPinnedEvents,
#[error("Could not load pinned events.")]
TimelineReloadFailed,
}