use async_std::io;
use bevy::ecs::system::Resource;
use flume::{Receiver, Sender};
use futures::{executor::block_on, future::FutureExt, stream::StreamExt};
use libp2p::{
core::multiaddr::{Multiaddr, Protocol},
dcutr, gossipsub, identify, identity, noise, ping, relay,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, PeerId,
};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, hash::Hash};
use std::{collections::hash_map::DefaultHasher, hash::Hasher};
use std::{sync::OnceLock, time::Duration};
use yoke::{Yoke, Yokeable};
/// Composite libp2p behaviour driven by the swarm that `open_network` builds.
///
/// `#[derive(NetworkBehaviour)]` generates the `BehaviourEvent` enum that
/// `open_network` matches on. Field order is left as-is because it is an
/// input to the derive.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
/// Relay client behaviour; `open_network` fills it with the value the swarm
/// builder passes to the `with_behaviour` closure.
relay_client: relay::client::Behaviour,
/// Ping behaviour, created with `ping::Config::new()`.
ping: ping::Behaviour,
/// Identify behaviour; `open_network` waits for its `Sent` and `Received`
/// events after dialing the relay.
identify: identify::Behaviour,
/// Gossipsub behaviour used to publish and receive serialized `Message`s.
gossipsub: gossipsub::Behaviour,
/// DCUtR behaviour, created from the local peer id.
dcutr: dcutr::Behaviour,
}
/// Bevy resource holding the channel endpoints used to talk to the
/// networking task.
///
/// NOTE(review): the type derives `Clone` and contains a `OnceLock`; a clone
/// taken before `peer_id` is set will not observe a later `set` on the
/// original. Confirm that callers only ever use a single instance.
#[derive(Resource, Clone)]
pub struct NetworkManager {
/// Receiving end of the channel carrying `NetworkEvent`s.
pub events: Receiver<NetworkEvent>,
/// Sending end of the channel carrying `NetworkCommand`s.
pub command_sender: Sender<NetworkCommand>,
/// Write-once string slot; presumably the local peer id reported via
/// `NetworkEvent::LocalPeerID` (the code that sets it is not in this file
/// section, so verify against the caller).
pub peer_id: OnceLock<String>,
}
/// Commands consumed by the main loop of `open_network`.
pub enum NetworkCommand {
/// Dial the given peer through the relay, i.e. at
/// `<RELAY_ID>/p2p-circuit/p2p/<peer id>`.
Dial(PeerId),
/// Serialize the message with postcard, prefixed by a random `u32`, and
/// publish it on the gossipsub topic.
Publish(Message<'static>),
}
/// Events sent by `open_network` over its event channel.
pub enum NetworkEvent {
/// A swarm event that the main loop of `open_network` did not handle itself.
Event(SwarmEvent<BehaviourEvent>),
/// A decoded gossipsub message: the source peer and the
/// `(u32, Message)` pair deserialized from the payload. The `Yoke` keeps
/// the payload bytes (`Vec<u8>`) alive as the cart the value is attached to.
Message(PeerId, Yoke<(u32, Message<'static>), Vec<u8>>),
/// The local peer id, sent once after the relay circuit listener is
/// requested.
LocalPeerID(PeerId),
}
/// Application-level message exchanged over gossipsub.
///
/// This is a wire type (serialized with postcard in `open_network`), so
/// changing or reordering variants may affect compatibility with peers
/// running other builds; verify before editing.
#[derive(Serialize, Deserialize, Yokeable, Debug)]
pub enum Message<'a> {
/// A chat message; `content` is a `Cow`, so it may be borrowed or owned.
Chat { content: Cow<'a, str> },
}
/// Multiaddress of the relay server that `open_network` dials and listens
/// through (DNS name, TCP port and relay peer id).
const RELAY_ID: &str =
    "/dns4/nations.lol/tcp/4001/p2p/12D3KooWMh5tkGf7NDKD2FbKypGBbMyyv4TVmkXRAbWwvyftixU6";
/// Runs the peer-to-peer networking task.
///
/// Phases, in order:
/// 1. Build the swarm and subscribe to the `test-net` gossipsub topic.
/// 2. Listen on `/ip4/0.0.0.0/tcp/0` and log listen addresses for one second.
/// 3. Dial the relay at `RELAY_ID` and wait until identify information has
///    been both sent and received.
/// 4. Listen on the relay circuit address and report the local peer id as
///    [`NetworkEvent::LocalPeerID`].
/// 5. Loop forever, executing [`NetworkCommand`]s from `command_rx` and
///    forwarding swarm activity to `event_tx`.
///
/// Every phase is awaited directly instead of being wrapped in
/// `futures::executor::block_on`, so the task yields to the executor that
/// polls it rather than blocking one of its threads.
///
/// The function returns once `event_tx` has no receiver left. Failures in
/// the main loop (undecodable messages, failed dials, failed publishes) are
/// logged and skipped so that neither a remote peer nor a transient error
/// can take the task down.
///
/// # Panics
///
/// Panics when setup cannot complete: `RELAY_ID` is not a valid multiaddr,
/// the transport or behaviour cannot be built, subscribing or listening
/// fails, the relay cannot be dialed, or an event other than a new listen
/// address arrives during phase 2.
pub async fn open_network(
    command_rx: flume::Receiver<NetworkCommand>,
    event_tx: flume::Sender<NetworkEvent>,
) {
    let relay_address: Multiaddr = RELAY_ID
        .parse()
        .expect("RELAY_ID must be a valid multiaddr");
    let mut command_rx = command_rx.into_stream();
    let mut swarm = build_swarm().await;

    let topic = gossipsub::IdentTopic::new("test-net");
    swarm
        .behaviour_mut()
        .gossipsub
        .subscribe(&topic)
        .expect("failed to subscribe to the gossipsub topic");

    swarm
        .listen_on(
            "/ip4/0.0.0.0/tcp/0"
                .parse()
                .expect("the listen address literal must be a valid multiaddr"),
        )
        .expect("failed to listen on the local TCP address");
    wait_for_listen_addresses(&mut swarm).await;

    swarm
        .dial(relay_address.clone())
        .expect("failed to start dialing the relay");
    exchange_identify_with_relay(&mut swarm).await;

    swarm
        .listen_on(relay_address.clone().with(Protocol::P2pCircuit))
        .expect("failed to listen on the relay circuit address");

    // `PeerId` is `Copy`, so a dereference is enough.
    if event_tx
        .send(NetworkEvent::LocalPeerID(*swarm.local_peer_id()))
        .is_err()
    {
        tracing::info!("Event receiver dropped; stopping network task");
        return;
    }

    loop {
        futures::select! {
            command = command_rx.select_next_some() => match command {
                NetworkCommand::Dial(peer_id) => {
                    let circuit_address = relay_address
                        .clone()
                        .with(Protocol::P2pCircuit)
                        .with(Protocol::P2p(peer_id));
                    if let Err(error) = swarm.dial(circuit_address) {
                        tracing::warn!(peer=%peer_id, ?error, "Failed to dial peer through relay");
                    }
                }
                NetworkCommand::Publish(data) => {
                    // Message ids are a hash of the payload (see `build_swarm`);
                    // presumably the random prefix keeps repeated identical
                    // messages from sharing an id.
                    let nonce = rand::thread_rng().gen::<u32>();
                    match postcard::to_stdvec(&(nonce, data)) {
                        Ok(payload) => {
                            let published = swarm
                                .behaviour_mut()
                                .gossipsub
                                .publish(topic.clone(), payload);
                            if let Err(error) = published {
                                tracing::warn!(?error, "Failed to publish message");
                            }
                        }
                        Err(error) => {
                            tracing::error!(?error, "Failed to serialize outgoing message");
                        }
                    }
                }
            },
            event = swarm.select_next_some() => match event {
                SwarmEvent::ConnectionEstablished {
                    peer_id, endpoint, ..
                } => {
                    tracing::info!(peer=%peer_id, ?endpoint, "Established new connection");
                    swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
                }
                SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message {
                    message,
                    ..
                })) => {
                    let Some(source) = message.source else {
                        tracing::warn!("Dropping gossipsub message without a source peer");
                        continue;
                    };
                    // The payload comes from a remote peer: never panic on it.
                    let decoded = Yoke::<(u32, Message<'static>), Vec<u8>>::try_attach_to_cart(
                        message.data,
                        |bytes| postcard::from_bytes(bytes),
                    );
                    match decoded {
                        Ok(decoded) => {
                            if event_tx.send(NetworkEvent::Message(source, decoded)).is_err() {
                                tracing::info!("Event receiver dropped; stopping network task");
                                return;
                            }
                        }
                        Err(error) => {
                            tracing::warn!(peer=%source, ?error, "Dropping undecodable gossipsub message");
                        }
                    }
                }
                event => {
                    if event_tx.send(NetworkEvent::Event(event)).is_err() {
                        tracing::info!("Event receiver dropped; stopping network task");
                        return;
                    }
                }
            }
        }
    }
}

/// Builds the swarm: TCP and DNS transport secured with Noise and multiplexed
/// with Yamux, a relay client, and the combined [`Behaviour`].
async fn build_swarm() -> libp2p::Swarm<Behaviour> {
    libp2p::SwarmBuilder::with_existing_identity(generate_ed25519(1))
        .with_async_std()
        .with_tcp(
            tcp::Config::default().port_reuse(true).nodelay(true),
            noise::Config::new,
            yamux::Config::default,
        )
        .expect("failed to configure the TCP transport")
        .with_dns()
        .await
        .expect("failed to configure DNS resolution")
        .with_relay_client(noise::Config::new, yamux::Config::default)
        .expect("failed to configure the relay client transport")
        .with_behaviour(|keypair, relay_behaviour| {
            // Gossipsub message ids are a hash of the message payload.
            let message_id_fn = |message: &gossipsub::Message| {
                let mut hasher = DefaultHasher::new();
                message.data.hash(&mut hasher);
                gossipsub::MessageId::from(hasher.finish().to_string())
            };
            let gossipsub_config = gossipsub::ConfigBuilder::default()
                .heartbeat_interval(Duration::from_secs(10))
                .validation_mode(gossipsub::ValidationMode::Strict)
                .message_id_fn(message_id_fn)
                .build()
                .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?;
            let gossipsub = gossipsub::Behaviour::new(
                gossipsub::MessageAuthenticity::Signed(keypair.clone()),
                gossipsub_config,
            )?;
            Ok(Behaviour {
                relay_client: relay_behaviour,
                ping: ping::Behaviour::new(ping::Config::new()),
                identify: identify::Behaviour::new(identify::Config::new(
                    "/TODO/0.0.1".to_string(),
                    keypair.public(),
                )),
                gossipsub,
                dcutr: dcutr::Behaviour::new(keypair.public().to_peer_id()),
            })
        })
        .expect("failed to construct the network behaviour")
        .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
        .build()
}

/// Polls the swarm for one second, logging every new listen address.
///
/// Panics on any other event: nothing has been dialed yet, so only listener
/// events are expected at this point.
async fn wait_for_listen_addresses(swarm: &mut libp2p::Swarm<Behaviour>) {
    let mut delay = futures_timer::Delay::new(Duration::from_secs(1)).fuse();
    loop {
        futures::select! {
            event = swarm.select_next_some() => match event {
                SwarmEvent::NewListenAddr { address, .. } => {
                    tracing::info!(%address, "Listening on address");
                }
                event => panic!("unexpected swarm event while collecting listen addresses: {event:?}"),
            },
            _ = delay => break,
        }
    }
}

/// Drives the swarm until identify information has been both sent and
/// received. All other events are ignored while waiting.
async fn exchange_identify_with_relay(swarm: &mut libp2p::Swarm<Behaviour>) {
    let mut learned_observed_addr = false;
    let mut told_relay_observed_addr = false;
    while !(learned_observed_addr && told_relay_observed_addr) {
        match swarm.select_next_some().await {
            SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Sent { .. })) => {
                tracing::info!("Told relay its public address");
                told_relay_observed_addr = true;
            }
            SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received {
                info: identify::Info { observed_addr, .. },
                ..
            })) => {
                tracing::info!(address=%observed_addr, "Relay told us our observed address");
                learned_observed_addr = true;
            }
            _ => {}
        }
    }
}
fn generate_ed25519(key_id: u8) -> identity::Keypair {
match std::fs::read(format!("key-{}.protobuf", key_id)) {
Ok(key) => identity::Keypair::from_protobuf_encoding(&key).unwrap(),
Err(_) => {
let key = identity::Keypair::generate_ed25519();
std::fs::write(
format!("key-{}.protobuf", key_id),
key.to_protobuf_encoding().unwrap(),
)
.unwrap();
key
}
}
}