//! Peer-to-peer networking built on libp2p.
//!
//! Defines the combined swarm [`Behaviour`], the [`NetworkCommand`] / [`NetworkEvent`] types
//! exchanged over flume channels, and [`open_network`], which builds the swarm and drives it.

use async_std::io;
use bevy::ecs::system::Resource;
use flume::{Receiver, Sender};
use futures::{executor::block_on, future::FutureExt, stream::StreamExt};
use libp2p::{
    core::multiaddr::{Multiaddr, Protocol},
    dcutr, gossipsub, identify, identity, noise, ping, relay,
    swarm::{NetworkBehaviour, SwarmEvent},
    tcp, yamux, PeerId,
};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, hash::Hash};
use std::{collections::hash_map::DefaultHasher, hash::Hasher};
use std::{sync::OnceLock, time::Duration};
use yoke::{Yoke, Yokeable};

/// Combined libp2p behaviour driven by the swarm that [`open_network`] builds.
///
/// NOTE(review): the `BehaviourEvent` type matched on elsewhere in this file is not declared
/// here; presumably it is generated by the `NetworkBehaviour` derive — confirm.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
    /// Relay client half, handed to us by the swarm builder's `with_relay_client` step.
    relay_client: relay::client::Behaviour,
    /// Ping behaviour, created with the default `ping::Config`.
    ping: ping::Behaviour,
    /// Identify behaviour; used during start-up to exchange observed addresses with the relay.
    identify: identify::Behaviour,
    /// Gossipsub behaviour used to subscribe to the topic and to publish/receive messages.
    gossipsub: gossipsub::Behaviour,
    /// DCUtR behaviour, constructed from the local peer id.
    dcutr: dcutr::Behaviour,
}

/// Bevy resource bundling the channel endpoints used to talk to the network task.
#[derive(Resource, Clone)]
pub struct NetworkManager {
    /// Receiving end for [`NetworkEvent`]s.
    pub events: Receiver<NetworkEvent>,
    /// Sending end for [`NetworkCommand`]s.
    pub command_sender: Sender<NetworkCommand>,
    /// Local peer id as a string, set at most once.
    /// NOTE(review): presumably filled in when `NetworkEvent::LocalPeerID` arrives — the code
    /// that sets it is not in this part of the file; confirm against the caller.
    pub peer_id: OnceLock<String>,
}

/// Commands accepted by [`open_network`] on its command channel.
pub enum NetworkCommand {
    /// Dial the given peer through the relay's circuit address.
    Dial(PeerId),
    /// Serialize the message with postcard (prefixed by a random `u32`) and publish it on the
    /// gossipsub topic.
    Publish(Message<'static>),
}

/// Events emitted by [`open_network`] on its event channel.
pub enum NetworkEvent {
    /// A swarm event that the main loop of `open_network` does not handle itself and forwards
    /// unchanged.
    Event(SwarmEvent<BehaviourEvent>),
    /// A decoded gossipsub message: the source peer plus the `(u32, Message)` payload, yoked to
    /// the raw bytes it was deserialized from.
    Message(PeerId, Yoke<(u32, Message<'static>), Vec<u8>>),
    /// The local peer id, sent once before the main loop starts.
    LocalPeerID(PeerId),
}

/// Application-level payload exchanged between peers.
///
/// The `'a` lifetime lets string data be held as a [`Cow`], i.e. either borrowed or owned.
#[derive(Serialize, Deserialize, Yokeable, Debug)]
pub enum Message<'a> {
    /// A chat message carrying its text in `content`.
    Chat { content: Cow<'a, str> },
}

/// Multiaddress of the relay server (DNS name, TCP port and the relay's peer id).
///
/// Parsed into a `Multiaddr` by `open_network`; it is dialed directly and also used as the
/// prefix of circuit addresses.
const RELAY_ID: &str =
    "/dns4/nations.lol/tcp/4001/p2p/12D3KooWMh5tkGf7NDKD2FbKypGBbMyyv4TVmkXRAbWwvyftixU6";

pub async fn open_network(
    command_rx: flume::Receiver<NetworkCommand>,
    event_tx: flume::Sender<NetworkEvent>,
) {
    let relay_address: Multiaddr = RELAY_ID.parse().unwrap();
    let mut command_rx = command_rx.into_stream();
    let mut swarm = libp2p::SwarmBuilder::with_existing_identity(generate_ed25519(1))
        .with_async_std()
        .with_tcp(
            tcp::Config::default().port_reuse(true).nodelay(true),
            noise::Config::new,
            yamux::Config::default,
        )
        .unwrap()
        .with_dns()
        .await
        .unwrap()
        .with_relay_client(noise::Config::new, yamux::Config::default)
        .unwrap()
        .with_behaviour(|keypair, relay_behaviour| {
            // To content-address message, we can take the hash of message and use it as an ID.
            let message_id_fn = |message: &gossipsub::Message| {
                let mut s = DefaultHasher::new();
                message.data.hash(&mut s);
                gossipsub::MessageId::from(s.finish().to_string())
            };
            // Set a custom gossipsub configuration
            let gossipsub_config = gossipsub::ConfigBuilder::default()
                .heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
                .validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
                .message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
                .build()
                .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; // Temporary hack because `build` does not return a proper `std::error::Error`.

            // build a gossipsub network behaviour
            let gossipsub = gossipsub::Behaviour::new(
                gossipsub::MessageAuthenticity::Signed(keypair.clone()),
                gossipsub_config,
            )?;

            Ok(Behaviour {
                relay_client: relay_behaviour,
                ping: ping::Behaviour::new(ping::Config::new()),
                identify: identify::Behaviour::new(identify::Config::new(
                    "/TODO/0.0.1".to_string(),
                    keypair.public(),
                )),
                gossipsub,
                dcutr: dcutr::Behaviour::new(keypair.public().to_peer_id()),
            })
        })
        .unwrap()
        .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
        .build();

    // Create a Gossipsub topic
    let topic = gossipsub::IdentTopic::new("test-net");
    // subscribes to our topic
    swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();

    swarm
        .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
        .unwrap();

    // Wait to listen on all interfaces.
    block_on(async {
        let mut delay = futures_timer::Delay::new(std::time::Duration::from_secs(1)).fuse();
        loop {
            futures::select! {
                event = swarm.next() => {
                    match event.unwrap() {
                        SwarmEvent::NewListenAddr { address, .. } => {
                            tracing::info!(%address, "Listening on address");
                        }
                        event => panic!("{event:?}"),
                    }
                }
                _ = delay => {
                    // Likely listening on all interfaces now, thus continuing by breaking the loop.
                    break;
                }
            }
        }
    });

    // Connect to the relay server. Not for the reservation or relayed connection, but to (a) learn
    // our local public address and (b) enable a freshly started relay to learn its public address.
    swarm.dial(relay_address.clone()).unwrap();
    block_on(async {
        let mut learned_observed_addr = false;
        let mut told_relay_observed_addr = false;

        loop {
            match swarm.next().await.unwrap() {
                SwarmEvent::NewListenAddr { .. } => {}
                SwarmEvent::Dialing { .. } => {}
                SwarmEvent::ConnectionEstablished { .. } => {}
                SwarmEvent::Behaviour(BehaviourEvent::Ping(_)) => {}
                SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Sent {
                    ..
                })) => {
                    tracing::info!("Told relay its public address");
                    told_relay_observed_addr = true;
                }
                SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received {
                    info: identify::Info { observed_addr, .. },
                    ..
                })) => {
                    tracing::info!(address=%observed_addr, "Relay told us our observed address");
                    learned_observed_addr = true;
                }
                // event => panic!("{event:?}"),
                _ => {}
            }

            if learned_observed_addr && told_relay_observed_addr {
                break;
            }
        }
    });

    swarm
        .listen_on(relay_address.clone().with(Protocol::P2pCircuit))
        .unwrap();
    // match opts.mode {
    //     Mode::Dial => {
    //         swarm
    //             .dial(
    //                 opts.relay_address
    //                     .with(Protocol::P2pCircuit)
    //                     .with(Protocol::P2p(opts.remote_peer_id.unwrap())),
    //             )
    //             .unwrap();
    //     }
    //     Mode::Listen => {
    //     }
    // }

    event_tx
        .send(NetworkEvent::LocalPeerID(swarm.local_peer_id().clone()))
        .unwrap();

    block_on(async {
        loop {
            futures::select! {
                command = command_rx.select_next_some() => match command {
                    NetworkCommand::Dial(addr) => swarm.dial(
                        relay_address.clone()
                            .with(Protocol::P2pCircuit)
                            .with(Protocol::P2p(addr)),
                    ).unwrap(),
                    NetworkCommand::Publish(data) => swarm.behaviour_mut().gossipsub.publish(topic.clone(), postcard::to_stdvec(&(rand::thread_rng().gen::<u32>(), data)).unwrap()).map(|_| ()).unwrap(),
                },
                event = swarm.select_next_some() => match event {
                    SwarmEvent::ConnectionEstablished {
                        peer_id, endpoint, ..
                    } => {
                        tracing::info!(peer=%peer_id, ?endpoint, "Established new connection");
                        swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
                    }
                    SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. })) => {
                        let peer_id = message.source.unwrap();
                        let message: Yoke<(u32, Message<'static>), Vec<u8>> = Yoke::try_attach_to_cart(message.data, |c| postcard::from_bytes(c)).unwrap();
                        event_tx.send(NetworkEvent::Message(peer_id, message)).unwrap();
                    }
                    event => event_tx.send(NetworkEvent::Event(event)).unwrap()
                }
            }
        }
    })
}

/// Returns the node identity stored in `key-<key_id>.protobuf`, creating it on first use.
///
/// If the file can be read, its contents are decoded as a protobuf-encoded keypair. If the
/// read fails for any reason, a fresh Ed25519 keypair is generated, written to that file and
/// returned.
///
/// # Panics
///
/// Panics if an existing file cannot be decoded, or if the new keypair cannot be encoded or
/// written to disk.
fn generate_ed25519(key_id: u8) -> identity::Keypair {
    let path = format!("key-{key_id}.protobuf");

    // Fast path: a key file from an earlier run is readable.
    if let Ok(encoded) = std::fs::read(&path) {
        return identity::Keypair::from_protobuf_encoding(&encoded).unwrap();
    }

    // Otherwise mint a new identity and persist it for subsequent runs.
    let keypair = identity::Keypair::generate_ed25519();
    let encoded = keypair.to_protobuf_encoding().unwrap();
    std::fs::write(&path, encoded).unwrap();
    keypair
}