Ping and Pong

Send a ping message every 5 seconds, and handle incoming Ping and Pong messages.
This commit is contained in:
Erwin Boskma 2022-04-19 20:28:04 +02:00
parent 9a4c243369
commit 9c21ddd35b
Signed by: erwin
GPG key ID: 270B20D17394F7E5

View file

@ -1,4 +1,4 @@
use std::{fmt::Display, io::Write}; use std::{fmt::Display, io::Write, time::Duration};
use async_tungstenite::{tokio::connect_async, tungstenite}; use async_tungstenite::{tokio::connect_async, tungstenite};
use color_eyre::{eyre::bail, Result}; use color_eyre::{eyre::bail, Result};
@ -60,13 +60,15 @@ impl HomeAssistantBuilder {
} }
pub(crate) fn build(self) -> HomeAssistant { pub(crate) fn build(self) -> HomeAssistant {
HomeAssistant::new( HomeAssistant {
self.host.unwrap(), host: self.host.unwrap(),
self.entity.unwrap(), entity: self.entity.unwrap(),
self.insecure, insecure: self.insecure,
self.token.unwrap(), token: self.token.unwrap(),
self.format, format: self.format,
) id: 1,
auth_complete: false,
}
} }
} }
@ -77,6 +79,7 @@ pub(crate) struct HomeAssistant {
host: String, host: String,
id: u64, id: u64,
format: OutputFormat, format: OutputFormat,
auth_complete: bool,
} }
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@ -209,6 +212,18 @@ impl Message {
msg msg
} }
fn ping() -> Self {
let mut msg = Self::default();
msg.message_type = MessageType::Ping;
msg
}
fn pong() -> Self {
let mut msg = Self::default();
msg.message_type = MessageType::Pong;
msg
}
fn to_json(self) -> String { fn to_json(self) -> String {
serde_json::to_string(&self).unwrap() serde_json::to_string(&self).unwrap()
} }
@ -219,22 +234,23 @@ impl Message {
} }
impl HomeAssistant { impl HomeAssistant {
pub(crate) fn new( // pub(crate) fn new(
host: String, // host: String,
entity: String, // entity: String,
insecure: bool, // insecure: bool,
token: String, // token: String,
format: OutputFormat, // format: OutputFormat,
) -> Self { // ) -> Self {
Self { // Self {
token, // token,
entity, // entity,
insecure, // insecure,
host, // host,
format, // format,
id: 1, // id: 1,
} // auth_complete: false,
} // }
// }
pub(crate) fn builder() -> HomeAssistantBuilder { pub(crate) fn builder() -> HomeAssistantBuilder {
HomeAssistantBuilder::new() HomeAssistantBuilder::new()
@ -257,28 +273,48 @@ impl HomeAssistant {
let (mut ws_stream, _) = connect_async(&api_url).await?; let (mut ws_stream, _) = connect_async(&api_url).await?;
while let Some(msg) = ws_stream.next().await { let mut ping_interval = tokio::time::interval(Duration::from_millis(5000));
let msg = msg?;
match self.handle_message(msg).await { loop {
Ok(response_messages) => { tokio::select! {
for response in response_messages { msg = ws_stream.next() => match msg {
ws_stream.send(response).await? Some(msg) => {
} let msg = msg?;
}
Err(e) => { match self.handle_message(msg).await {
if let Some(err) = e.downcast_ref::<HomeAssistantError>() { Ok(response_messages) => {
match err { for response in response_messages {
HomeAssistantError::EmptyMessage => { ws_stream.send(response).await?
debug!("Received empty message, ignoring...") }
} }
HomeAssistantError::UnhandledMessage(message_type) => debug!( Err(e) => {
if let Some(err) = e.downcast_ref::<HomeAssistantError>() {
match err {
HomeAssistantError::EmptyMessage => {
debug!("Received empty message, ignoring...")
}
HomeAssistantError::UnhandledMessage(message_type) => {
debug!(
"Received '{message_type}', we're currently not handling that." "Received '{message_type}', we're currently not handling that."
), )
HomeAssistantError::JsonError(e) => error!("{e}"), }
HomeAssistantError::JsonError(e) => error!("{e}"),
}
} else {
error!("{e}");
}
}
} }
} else { }
error!("{e}"); None => break,
},
_ = ping_interval.tick() => {
if self.auth_complete {
let mut ping_message = Message::ping();
ping_message.id = Some(self.incrementing_id());
debug!("[S] Ping!");
ws_stream.send(ping_message.to_message()).await?;
} }
} }
} }
@ -315,6 +351,8 @@ impl HomeAssistant {
let mut get_states = Message::get_states(); let mut get_states = Message::get_states();
get_states.id = Some(self.incrementing_id()); get_states.id = Some(self.incrementing_id());
self.auth_complete = true;
vec![subscribe.to_message(), get_states.to_message()] vec![subscribe.to_message(), get_states.to_message()]
} }
MessageType::Event => { MessageType::Event => {
@ -351,6 +389,18 @@ impl HomeAssistant {
vec![] vec![]
} }
MessageType::Ping => {
debug!("[R] Ping!");
let mut pong = Message::pong();
pong.id = Some(self.incrementing_id());
vec![pong.to_message()]
}
MessageType::Pong => {
debug!("[R] Pong!");
vec![]
}
_ => bail!(HomeAssistantError::UnhandledMessage( _ => bail!(HomeAssistantError::UnhandledMessage(
message.message_type.to_string() message.message_type.to_string()
)), )),
@ -367,7 +417,7 @@ impl HomeAssistant {
let is_muted = attributes["is_volume_muted"].as_bool().unwrap_or(false); let is_muted = attributes["is_volume_muted"].as_bool().unwrap_or(false);
let volume = if is_muted { let volume = if is_muted {
String::from("[ ]") String::from("[ \u{fc5d} ]")
} else if volume_raw >= 0. { } else if volume_raw >= 0. {
format!("[{:3.0}%] ", volume_raw * 100.) format!("[{:3.0}%] ", volume_raw * 100.)
} else { } else {