Fixed reconnecting for realsies now. I hope.

This commit is contained in:
Erwin Boskma 2023-10-14 16:44:23 +02:00
parent 9052d3c14b
commit ccaad6eda2
Signed by: erwin
SSH key fingerprint: SHA256:9LmFDe1C6jSrEyqxxvX8NtJBmcbB105XoqyUZF092bg

View file

@ -6,6 +6,7 @@ use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use thiserror::Error; use thiserror::Error;
use tokio::time::timeout;
use tracing::{debug, error, trace}; use tracing::{debug, error, trace};
use crate::output::{i3blocks::I3Blocks, waybar::Waybar, OutputFormat}; use crate::output::{i3blocks::I3Blocks, waybar::Waybar, OutputFormat};
@ -215,20 +216,6 @@ impl Message {
} }
} }
fn ping() -> Self {
Self {
message_type: MessageType::Ping,
..Default::default()
}
}
fn pong() -> Self {
Self {
message_type: MessageType::Pong,
..Default::default()
}
}
fn to_json(&self) -> String { fn to_json(&self) -> String {
serde_json::to_string(&self).unwrap() serde_json::to_string(&self).unwrap()
} }
@ -260,14 +247,11 @@ impl HomeAssistant {
let (mut ws_stream, _) = connect_async(&api_url).await?; let (mut ws_stream, _) = connect_async(&api_url).await?;
let mut ping_interval = tokio::time::interval(Duration::from_millis(5000));
loop { loop {
tokio::select! { match timeout(Duration::from_secs(5), ws_stream.next()).await {
msg = ws_stream.next() => match msg { Ok(msg) => match msg {
Some(msg) => match msg { Some(msg) => match msg {
Ok(msg) => { Ok(msg) => match self.handle_message(msg).await {
match self.handle_message(msg).await {
Ok(response_messages) => { Ok(response_messages) => {
for response in response_messages { for response in response_messages {
ws_stream.send(response).await? ws_stream.send(response).await?
@ -284,35 +268,32 @@ impl HomeAssistant {
"Received '{message_type}', we're currently not handling that." "Received '{message_type}', we're currently not handling that."
) )
} }
HomeAssistantError::JsonError(e) => error!("Error deserializing JSON: {e}"), HomeAssistantError::JsonError(e) => {
error!("Error deserializing JSON: {e}")
}
} }
} else { } else {
error!("{e}"); error!("{e}");
} }
} }
}
}, },
Err(e) => { Err(e) => {
error!("Error in receiving message: {e}"); error!("Error in receiving message: {e}");
// break; // break;
} }
} },
None => { None => {
// I recon we should reconnect here. Probably. // I recon we should reconnect here. Probably.
error!("Received 'None' from handle_message"); error!("Received 'None' from handle_message");
break; break;
},
},
_ = ping_interval.tick() => {
if self.auth_complete {
let mut ping_message = Message::ping();
ping_message.id = Some(self.incrementing_id());
debug!("[S] Ping!");
ws_stream.send(ping_message.to_message()).await?;
} }
}, },
Err(e) => {
error!("Timeout reading message: {e}");
// return Err(HomeAssistantError::EmptyMessage);
bail!(e);
} }
};
} }
Ok(()) Ok(())
@ -384,18 +365,6 @@ impl HomeAssistant {
vec![] vec![]
} }
MessageType::Ping => {
debug!("[R] Ping!");
let mut pong = Message::pong();
pong.id = Some(self.incrementing_id());
vec![pong.to_message()]
}
MessageType::Pong => {
debug!("[R] Pong!");
vec![]
}
_ => bail!(HomeAssistantError::UnhandledMessage( _ => bail!(HomeAssistantError::UnhandledMessage(
message.message_type.to_string() message.message_type.to_string()
)), )),