use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[cfg(feature = "crossbeam-channel")]
use crossbeam_channel::Receiver;
#[cfg(not(feature = "crossbeam-channel"))]
use std::sync::mpsc::Receiver;
use crate::stream::{OutputStreamHandle, PlayError};
use crate::{queue, source::Done, Sample, Source};
use cpal::FromSample;
/// Handle used to queue sounds and control their playback.
///
/// Created by `Sink::try_new` or `Sink::new_idle`. When dropped without having
/// been detached, the shared `stopped` flag is set (see the `Drop` impl).
pub struct Sink {
/// Input side of the sound queue; `append` pushes wrapped sources into it.
queue_tx: Arc<queue::SourcesQueueInput<f32>>,
/// Receiver returned by the most recent `append_with_signal` call, if it has
/// not been taken yet; `sleep_until_end` takes it and blocks on it.
sleep_until_end: Mutex<Option<Receiver<()>>>,
/// Control state shared with the periodic callback of every appended source.
controls: Arc<Controls>,
/// Counter incremented by `append` and handed to each `Done` wrapper;
/// `len` reports its current value.
sound_count: Arc<AtomicUsize>,
/// Set by `detach`; when true, dropping the sink does not set `stopped`.
detached: bool,
}
/// Playback settings shared between a `Sink` and the periodic callback that
/// `Sink::append` installs on every source.
struct Controls {
/// Pause flag written by `play`/`pause`; the callback forwards it to the
/// source's pausable stage.
pause: AtomicBool,
/// Factor the callback passes to the source's amplify stage.
volume: Mutex<f32>,
/// Stop flag written by `stop` and by `Drop`; a callback that observes it
/// stops its source. `append` resets it to false.
stopped: AtomicBool,
/// Factor the callback passes to the source's speed stage.
speed: Mutex<f32>,
/// Number of pending skip requests; a callback run that sees a non-zero
/// value skips its source and decrements the counter.
to_clear: Mutex<u32>,
}
impl Sink {
    /// Creates a new `Sink` whose queue is played on `stream`.
    ///
    /// The sink is built with [`Sink::new_idle`] and its queue output is handed
    /// to `stream.play_raw`.
    ///
    /// # Errors
    ///
    /// Returns the `PlayError` produced by `play_raw`.
    #[inline]
    pub fn try_new(stream: &OutputStreamHandle) -> Result<Sink, PlayError> {
        let (sink, queue_rx) = Sink::new_idle();
        stream.play_raw(queue_rx)?;
        Ok(sink)
    }

    /// Creates a new `Sink` that is not attached to any stream.
    ///
    /// Returns the sink together with the output side of its queue. The sink
    /// starts unpaused and not stopped, with volume `1.0`, speed `1.0` and no
    /// pending skips.
    #[inline]
    pub fn new_idle() -> (Sink, queue::SourcesQueueOutput<f32>) {
        // NOTE(review): `true` is presumably the keep-alive-if-empty flag that
        // `Drop` later clears via `set_keep_alive_if_empty(false)` — confirm
        // against `queue::queue`.
        let (queue_tx, queue_rx) = queue::queue(true);

        let sink = Sink {
            queue_tx,
            sleep_until_end: Mutex::new(None),
            controls: Arc::new(Controls {
                pause: AtomicBool::new(false),
                volume: Mutex::new(1.0),
                stopped: AtomicBool::new(false),
                speed: Mutex::new(1.0),
                to_clear: Mutex::new(0),
            }),
            sound_count: Arc::new(AtomicUsize::new(0)),
            detached: false,
        };
        (sink, queue_rx)
    }

    /// Appends a sound to the queue of sounds to play.
    ///
    /// If the sink was stopped, this first waits (via
    /// [`Sink::sleep_until_end`]) while sounds are still counted, then clears
    /// the stop flag. The source is wrapped in speed, pausable, amplify,
    /// skippable and stoppable stages plus a periodic callback that applies
    /// the sink's shared controls to those stages.
    #[inline]
    pub fn append<S>(&self, source: S)
    where
        S: Source + Send + 'static,
        f32: FromSample<S::Item>,
        S::Item: Sample + Send,
    {
        // Wait for the previously stopped sounds to go away before un-stopping;
        // otherwise they would see `stopped == false` again and keep playing.
        if self.controls.stopped.load(Ordering::SeqCst) {
            if self.sound_count.load(Ordering::SeqCst) > 0 {
                self.sleep_until_end();
            }
            self.controls.stopped.store(false, Ordering::SeqCst);
        }

        let controls = self.controls.clone();

        let source = source
            .speed(1.0)
            .pausable(false)
            .amplify(1.0)
            .skippable()
            .stoppable()
            .periodic_access(Duration::from_millis(5), move |src| {
                // `src` is the stoppable stage (outermost wrapper).
                if controls.stopped.load(Ordering::SeqCst) {
                    src.stop();
                }
                {
                    // Consume one pending skip request, if any, on the
                    // skippable stage. The lock is scoped to this block.
                    let mut to_clear = controls.to_clear.lock().unwrap();
                    if *to_clear > 0 {
                        let _ = src.inner_mut().skip();
                        *to_clear -= 1;
                    }
                }
                // Walk inwards: skippable -> amplify -> pausable -> speed.
                let amp = src.inner_mut().inner_mut();
                amp.set_factor(*controls.volume.lock().unwrap());
                amp.inner_mut()
                    .set_paused(controls.pause.load(Ordering::SeqCst));
                amp.inner_mut()
                    .inner_mut()
                    .set_factor(*controls.speed.lock().unwrap());
            })
            .convert_samples();

        self.sound_count.fetch_add(1, Ordering::Relaxed);
        let source = Done::new(source, self.sound_count.clone());

        // Queue the source first, then remember its end-of-sound receiver so
        // `sleep_until_end` waits for the most recently appended sound.
        let end_signal = self.queue_tx.append_with_signal(source);
        *self.sleep_until_end.lock().unwrap() = Some(end_signal);
    }

    /// Returns the current volume factor.
    ///
    /// This is the value most recently stored by [`Sink::set_volume`]
    /// (initially `1.0`).
    #[inline]
    pub fn volume(&self) -> f32 {
        *self.controls.volume.lock().unwrap()
    }

    /// Sets the volume factor.
    ///
    /// The value is passed to each source's amplify stage the next time that
    /// source's periodic callback runs.
    #[inline]
    pub fn set_volume(&self, value: f32) {
        *self.controls.volume.lock().unwrap() = value;
    }

    /// Returns the current speed factor.
    ///
    /// This is the value most recently stored by [`Sink::set_speed`]
    /// (initially `1.0`).
    #[inline]
    pub fn speed(&self) -> f32 {
        *self.controls.speed.lock().unwrap()
    }

    /// Sets the speed factor.
    ///
    /// The value is passed to each source's speed stage the next time that
    /// source's periodic callback runs.
    #[inline]
    pub fn set_speed(&self, value: f32) {
        *self.controls.speed.lock().unwrap() = value;
    }

    /// Clears the pause flag so playback resumes.
    #[inline]
    pub fn play(&self) {
        self.controls.pause.store(false, Ordering::SeqCst);
    }

    /// Sets the pause flag; sources pick it up in their periodic callback.
    pub fn pause(&self) {
        self.controls.pause.store(true, Ordering::SeqCst);
    }

    /// Returns whether the pause flag is currently set.
    pub fn is_paused(&self) -> bool {
        self.controls.pause.load(Ordering::SeqCst)
    }

    /// Requests that every currently counted sound be skipped, waits via
    /// [`Sink::sleep_until_end`], and then pauses the sink.
    pub fn clear(&self) {
        // Saturate instead of wrapping if the count ever exceeds `u32::MAX`.
        let len = self
            .sound_count
            .load(Ordering::SeqCst)
            .min(u32::MAX as usize) as u32;
        *self.controls.to_clear.lock().unwrap() = len;
        self.sleep_until_end();
        self.pause();
    }

    /// Requests that one more sound be skipped.
    ///
    /// Does nothing when the number of pending skips already covers every
    /// counted sound.
    pub fn skip_one(&self) {
        // Saturate instead of wrapping if the count ever exceeds `u32::MAX`.
        let len = self
            .sound_count
            .load(Ordering::SeqCst)
            .min(u32::MAX as usize) as u32;
        let mut to_clear = self.controls.to_clear.lock().unwrap();
        if len > *to_clear {
            *to_clear += 1;
        }
    }

    /// Sets the stop flag; each source stops when its periodic callback
    /// observes it. A later [`Sink::append`] clears the flag again.
    #[inline]
    pub fn stop(&self) {
        self.controls.stopped.store(true, Ordering::SeqCst);
    }

    /// Consumes the sink without setting the stop flag.
    ///
    /// Marks the sink as detached before it is dropped, so the `Drop` impl
    /// skips the `stopped` store.
    #[inline]
    pub fn detach(mut self) {
        self.detached = true;
    }

    /// Blocks the current thread until the most recently appended sound
    /// signals its end.
    ///
    /// Returns immediately when no receiver is pending (nothing was appended
    /// since the last wait). A `recv` error is ignored.
    #[inline]
    pub fn sleep_until_end(&self) {
        // NOTE(review): the mutex guard created in the `if let` scrutinee
        // lives until the end of this block, so the lock stays held while
        // `recv()` blocks. Concurrent `append`/`sleep_until_end` callers
        // therefore wait on the mutex until the sound ends; `append` after
        // `stop` appears to rely on that, so the scope is left unchanged.
        if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
            let _ = sleep_until_end.recv();
        }
    }

    /// Returns true when [`Sink::len`] is zero.
    #[inline]
    pub fn empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the current value of the sound counter.
    ///
    /// The counter is incremented by [`Sink::append`]; it is shared with the
    /// `Done` wrapper, which presumably decrements it when a sound ends —
    /// confirm against `source::Done`.
    #[inline]
    pub fn len(&self) -> usize {
        self.sound_count.load(Ordering::Relaxed)
    }
}
impl Drop for Sink {
    /// Releases the queue's keep-alive and, unless the sink was detached,
    /// raises the stop flag so appended sources stop.
    #[inline]
    fn drop(&mut self) {
        // Always let the queue finish once it runs empty.
        self.queue_tx.set_keep_alive_if_empty(false);

        // A detached sink leaves its sounds playing.
        if self.detached {
            return;
        }
        self.controls.stopped.store(true, Ordering::Relaxed);
    }
}
#[cfg(test)]
mod tests {
    use crate::buffer::SamplesBuffer;
    use crate::{Sink, Source};
    use std::sync::atomic::Ordering;

    /// Pausing yields a `0.0` sample, resuming continues where the source
    /// left off, and stopping leaves the sink empty.
    #[test]
    fn test_pause_and_stop() {
        let (sink, mut queue_rx) = Sink::new_idle();

        let v = vec![10i16, -10, 20, -20, 30, -30];

        // Sample rate of 1 with a single channel.
        sink.append(SamplesBuffer::new(1, 1, v.clone()));
        let mut src = SamplesBuffer::new(1, 1, v).convert_samples();

        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());

        sink.pause();
        assert_eq!(queue_rx.next(), Some(0.0));

        sink.play();
        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());

        sink.stop();
        assert_eq!(queue_rx.next(), Some(0.0));

        assert!(sink.empty());
    }

    /// Appending after `stop` clears the stop flag and the new sound plays
    /// from its first sample.
    #[test]
    fn test_stop_and_start() {
        let (sink, mut queue_rx) = Sink::new_idle();

        let v = vec![10i16, -10, 20, -20, 30, -30];

        sink.append(SamplesBuffer::new(1, 1, v.clone()));
        let mut src = SamplesBuffer::new(1, 1, v.clone()).convert_samples();

        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());

        sink.stop();

        assert!(sink.controls.stopped.load(Ordering::SeqCst));
        assert_eq!(queue_rx.next(), Some(0.0));

        src = SamplesBuffer::new(1, 1, v.clone()).convert_samples();
        sink.append(SamplesBuffer::new(1, 1, v));

        assert!(!sink.controls.stopped.load(Ordering::SeqCst));
        // Skip any `0.0` samples emitted before the new sound starts.
        let mut queue_rx = queue_rx.skip_while(|v| *v == 0.0);

        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());
    }

    /// The sink's volume factor matches amplifying the source by the same
    /// factor.
    #[test]
    fn test_volume() {
        let (sink, mut queue_rx) = Sink::new_idle();

        let v = vec![10i16, -10, 20, -20, 30, -30];

        // 44.1 kHz with two channels.
        sink.append(SamplesBuffer::new(2, 44100, v.clone()));
        let src = SamplesBuffer::new(2, 44100, v.clone()).convert_samples();

        let mut src = src.amplify(0.5);
        sink.set_volume(0.5);

        for _ in 0..v.len() {
            assert_eq!(queue_rx.next(), src.next());
        }
    }
}