Audaspace: MixingThreadDevice

Fixes #135897 by introducing a mixing thread to coreaudio, making the callback lock free. At the same time these changes unify all devices that use a mixing thread (now that's jack, pulse, pipewire and coreaudio).

Pull Request: https://projects.blender.org/blender/blender/pulls/136711
This commit is contained in:
Jörg Müller
2025-04-17 14:30:34 +02:00
committed by Sebastian Parborg
parent 13d6ba1f62
commit e4792cb4e7
14 changed files with 428 additions and 622 deletions

View File

@@ -38,8 +38,8 @@ endif()
set(SRC
src/devices/DeviceManager.cpp
src/devices/MixingThreadDevice.cpp
src/devices/NULLDevice.cpp
src/devices/OpenCloseDevice.cpp
src/devices/ReadDevice.cpp
src/devices/SoftwareDevice.cpp
src/devices/ThreadedDevice.cpp
@@ -147,8 +147,8 @@ set(PUBLIC_HDR
include/devices/IDeviceFactory.h
include/devices/IDevice.h
include/devices/IHandle.h
include/devices/MixingThreadDevice.h
include/devices/NULLDevice.h
include/devices/OpenCloseDevice.h
include/devices/ReadDevice.h
include/devices/SoftwareDevice.h
include/devices/ThreadedDevice.h

View File

@@ -0,0 +1,131 @@
/*******************************************************************************
* Copyright 2009-2016 Jörg Müller
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
#pragma once
/**
* @file MixingThreadDevice.h
* @ingroup device
* The MixingThreadDevice class.
*/
#include <condition_variable>
#include <thread>
#include "devices/SoftwareDevice.h"
#include "util/RingBuffer.h"
AUD_NAMESPACE_BEGIN
/**
* This device extends the SoftwareDevice with code for running mixing in a separate thread.
*/
class AUD_PLUGIN_API MixingThreadDevice : public SoftwareDevice
{
private:
/**
* Whether there is currently playback.
*/
volatile bool m_playback{false};
/**
* The deinterleaving buffer.
*/
Buffer m_mixingBuffer;
/**
* The mixing ring buffer.
*/
RingBuffer m_ringBuffer;
/**
* Whether the device is valid.
*/
bool m_valid{false};
/**
* The mixing thread.
*/
std::thread m_mixingThread;
/**
* Mutex for mixing.
*/
std::mutex m_mixingLock;
/**
* Condition for mixing.
*/
std::condition_variable m_mixingCondition;
/**
* Updates the ring buffer.
*/
AUD_LOCAL void updateRingBuffer();
// delete copy constructor and operator=
MixingThreadDevice(const MixingThreadDevice&) = delete;
MixingThreadDevice& operator=(const MixingThreadDevice&) = delete;
protected:
/**
* Starts the streaming thread.
* @param buffersize Size of the ring buffer in bytes.
*/
void startMixingThread(size_t buffersize);
/**
* Notify the mixing thread.
*/
void notifyMixingThread();
/**
* Get ring buffer for reading.
*/
inline RingBuffer& getRingBuffer()
{
return m_ringBuffer;
}
/**
* Returns whether the thread is running or not.
*/
inline bool isMixingThreadRunning()
{
return m_valid;
}
virtual void playing(bool playing);
/**
* Called every iteration in the mixing thread before mixing.
*/
virtual void preMixingWork(bool playing);
/**
* Empty default constructor. To setup the device call the function create()
* and to uninitialize call destroy().
*/
MixingThreadDevice();
/**
* Stops all playback and notifies the mixing thread to stop.
* \warning The device has to be unlocked to not run into a deadlock.
*/
void stopMixingThread();
};
AUD_NAMESPACE_END

View File

@@ -1,116 +0,0 @@
/*******************************************************************************
* Copyright 2009-2024 Jörg Müller
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
#pragma once
/**
* @file OpenCloseDevice.h
* @ingroup devices
* The OpenCloseDevice class.
*/
#include <chrono>
#include <condition_variable>
#include <thread>
#include "devices/SoftwareDevice.h"
AUD_NAMESPACE_BEGIN
/**
* This device extends the SoftwareDevice with code for running mixing in a separate thread.
*/
class AUD_PLUGIN_API OpenCloseDevice : public SoftwareDevice
{
private:
/**
* Whether the device is opened.
*/
bool m_device_opened{false};
/**
* Whether there is currently playback.
*/
bool m_playing{false};
/**
* Whether thread released the device.
*/
bool m_delayed_close_running{false};
/**
* Thread used to release the device after time delay.
*/
std::thread m_delayed_close_thread;
/**
* Mutex to protect members accessed by multiple threads.
*/
std::mutex m_delayed_close_mutex;
/**
* Condition to close immediately. Used when object is destructed.
*/
std::condition_variable m_immediate_close_condition;
/**
* How long to wait until closing the device..
*/
std::chrono::milliseconds m_device_close_delay{10000};
/**
* Time when playback has stopped.
*/
std::chrono::time_point<std::chrono::steady_clock> m_playback_stopped_time;
/**
* Releases the device after time delay.
*/
void closeAfterDelay();
/**
* Starts the playback.
*/
AUD_LOCAL virtual void start() = 0;
/**
* Stops the playbsck.
*/
AUD_LOCAL virtual void stop() = 0;
/**
* Acquires the device.
*/
AUD_LOCAL virtual void open() = 0;
/**
* Releases the device.
*/
AUD_LOCAL virtual void close() = 0;
// delete copy constructor and operator=
OpenCloseDevice(const OpenCloseDevice&) = delete;
OpenCloseDevice& operator=(const OpenCloseDevice&) = delete;
protected:
OpenCloseDevice() = default;
void closeNow();
virtual void playing(bool playing);
};
AUD_NAMESPACE_END

View File

@@ -26,28 +26,53 @@ OSStatus CoreAudioDevice::CoreAudio_mix(void* data, AudioUnitRenderActionFlags*
{
CoreAudioDevice* device = (CoreAudioDevice*)data;
size_t sample_size = AUD_DEVICE_SAMPLE_SIZE(device->m_specs);
for(int i = 0; i < buffer_list->mNumberBuffers; i++)
{
auto& buffer = buffer_list->mBuffers[i];
device->mix((data_t*)buffer.mData, buffer.mDataByteSize / AUD_DEVICE_SAMPLE_SIZE(device->m_specs));
size_t readsamples = device->getRingBuffer().getReadSize();
size_t num_bytes = size_t(buffer.mDataByteSize);
readsamples = std::min(readsamples, num_bytes) / sample_size;
device->getRingBuffer().read((data_t*) buffer.mData, readsamples * sample_size);
if(readsamples * sample_size < num_bytes)
std::memset((data_t*) buffer.mData + readsamples * sample_size, 0, num_bytes - readsamples * sample_size);
device->notifyMixingThread();
}
return noErr;
}
void CoreAudioDevice::start()
void CoreAudioDevice::playing(bool playing)
{
AudioOutputUnitStart(m_audio_unit);
if(m_playback != playing)
{
if(playing)
AudioOutputUnitStart(m_audio_unit);
else
AudioOutputUnitStop(m_audio_unit);
}
m_playback = playing;
}
void CoreAudioDevice::stop()
CoreAudioDevice::CoreAudioDevice(DeviceSpecs specs, int buffersize) : m_buffersize(uint32_t(buffersize)), m_playback(false), m_audio_unit(nullptr)
{
AudioOutputUnitStop(m_audio_unit);
}
if(specs.channels == CHANNELS_INVALID)
specs.channels = CHANNELS_STEREO;
if(specs.format == FORMAT_INVALID)
specs.format = FORMAT_FLOAT32;
if(specs.rate == RATE_INVALID)
specs.rate = RATE_48000;
m_specs = specs;
void CoreAudioDevice::open()
{
AudioComponentDescription component_description = {};
component_description.componentType = kAudioUnitType_Output;
@@ -126,6 +151,14 @@ void CoreAudioDevice::open()
AUD_THROW(DeviceException, "The audio device couldn't be opened with CoreAudio.");
}
status = AudioUnitSetProperty(m_audio_unit, kAudioDevicePropertyBufferFrameSize, kAudioUnitScope_Input, 0, &m_buffersize, sizeof(m_buffersize));
if(status != noErr)
{
AudioComponentInstanceDispose(m_audio_unit);
AUD_THROW(DeviceException, "Could not set the buffer size for the audio device.");
}
status = AudioUnitInitialize(m_audio_unit);
if(status != noErr)
@@ -174,10 +207,24 @@ void CoreAudioDevice::open()
AudioComponentInstanceDispose(m_audio_unit);
throw;
}
/* Workaround CoreAudio quirk that makes the Clock (m_clock_ref) be in an invalid state
* after we try to re-init the device. (It is fine the first time we init the device...)
* We have to do a start/stop toggle to get it into a valid state again. */
AudioOutputUnitStart(m_audio_unit);
AudioOutputUnitStop(m_audio_unit);
create();
startMixingThread(buffersize * 2 * AUD_DEVICE_SAMPLE_SIZE(specs));
}
void CoreAudioDevice::close()
CoreAudioDevice::~CoreAudioDevice()
{
stopMixingThread();
destroy();
// NOTE: Keep the device open for buggy MacOS versions (see blender issue #121911).
if(__builtin_available(macOS 15.2, *))
{
@@ -188,29 +235,6 @@ void CoreAudioDevice::close()
}
}
CoreAudioDevice::CoreAudioDevice(DeviceSpecs specs, int buffersize) :
m_playback(false),
m_audio_unit(nullptr)
{
if(specs.channels == CHANNELS_INVALID)
specs.channels = CHANNELS_STEREO;
if(specs.format == FORMAT_INVALID)
specs.format = FORMAT_FLOAT32;
if(specs.rate == RATE_INVALID)
specs.rate = RATE_48000;
m_specs = specs;
open();
close();
create();
}
CoreAudioDevice::~CoreAudioDevice()
{
destroy();
closeNow();
}
void CoreAudioDevice::seekSynchronizer(double time)
{
if(isSynchronizerPlaying())

View File

@@ -28,19 +28,22 @@
#include <memory>
#include <AudioToolbox/AudioToolbox.h>
#include <AudioToolbox/CoreAudioClock.h>
#include <AudioUnit/AudioUnit.h>
#include "devices/OpenCloseDevice.h"
#include "devices/MixingThreadDevice.h"
AUD_NAMESPACE_BEGIN
/**
* This device plays back through CoreAudio, the Apple audio API.
*/
class AUD_PLUGIN_API CoreAudioDevice : public OpenCloseDevice
class AUD_PLUGIN_API CoreAudioDevice : public MixingThreadDevice
{
private:
uint32_t m_buffersize;
/**
* Whether there is currently playback.
*/
@@ -65,15 +68,12 @@ private:
*/
AUD_LOCAL static OSStatus CoreAudio_mix(void* data, AudioUnitRenderActionFlags* flags, const AudioTimeStamp* time_stamp, UInt32 bus_number, UInt32 number_frames, AudioBufferList* buffer_list);
AUD_LOCAL void start();
AUD_LOCAL void stop();
AUD_LOCAL void open();
AUD_LOCAL void close();
// delete copy constructor and operator=
CoreAudioDevice(const CoreAudioDevice&) = delete;
CoreAudioDevice& operator=(const CoreAudioDevice&) = delete;
void playing(bool playing) override;
public:
/**
* Opens the CoreAudio audio device for playback.

View File

@@ -25,76 +25,11 @@
AUD_NAMESPACE_BEGIN
void JackDevice::updateRingBuffers()
{
size_t size, temp;
unsigned int samplesize = AUD_SAMPLE_SIZE(m_specs);
unsigned int i, j;
unsigned int channels = m_specs.channels;
sample_t* buffer = m_buffer.getBuffer();
float* deinterleave = m_deinterleavebuf.getBuffer();
jack_transport_state_t state;
jack_position_t position;
std::unique_lock<std::mutex> lock(m_mixingLock);
while(m_valid)
{
state = AUD_jack_transport_query(m_client, &position);
// we sync either when:
// - there was a jack sync callback that requests a playing sync (either start playback or seek during playback) - caused by a m_syncCallRevision change in jack_sync
// - the jack transport state changed to stop from not stopped (i.e. external stopping) - checked here
// - the sync time changes when seeking during the stopped state - caused by a m_syncCallRevision change in jack_mix
if((m_syncCallRevision != m_lastSyncCallRevision) || (state == JackTransportStopped && m_lastState != JackTransportStopped))
{
int syncRevision = m_syncCallRevision;
float syncTime = m_syncTime;
if(m_syncFunc)
m_syncFunc(m_syncFuncData, state != JackTransportStopped, syncTime);
// we reset the ring buffers when we sync to start from the correct position
for(i = 0; i < channels; i++)
AUD_jack_ringbuffer_reset(m_ringbuffers[i]);
m_lastSyncCallRevision = syncRevision;
}
m_lastState = state;
size = AUD_jack_ringbuffer_write_space(m_ringbuffers[0]);
for(i = 1; i < channels; i++)
if((temp = AUD_jack_ringbuffer_write_space(m_ringbuffers[i])) < size)
size = temp;
while(size > samplesize)
{
size /= samplesize;
mix((data_t*)buffer, size);
for(i = 0; i < channels; i++)
{
for(j = 0; j < size; j++)
deinterleave[i * size + j] = buffer[i + j * channels];
AUD_jack_ringbuffer_write(m_ringbuffers[i], (char*)(deinterleave + i * size), size * sizeof(float));
}
size = AUD_jack_ringbuffer_write_space(m_ringbuffers[0]);
for(i = 1; i < channels; i++)
if((temp = AUD_jack_ringbuffer_write_space(m_ringbuffers[i])) < size)
size = temp;
}
m_mixingCondition.wait(lock);
}
}
int JackDevice::jack_mix(jack_nframes_t length, void* data)
{
JackDevice* device = (JackDevice*)data;
unsigned int i;
JackDevice* device = (JackDevice*) data;
int count = device->m_specs.channels;
char* buffer;
float* buffer;
jack_position_t position;
jack_transport_state_t state = AUD_jack_transport_query(device->m_client, &position);
@@ -102,7 +37,7 @@ int JackDevice::jack_mix(jack_nframes_t length, void* data)
if(state == JackTransportStarting)
{
// play silence while syncing
for(unsigned int i = 0; i < count; i++)
for(int i = 0; i < count; i++)
std::memset(AUD_jack_port_get_buffer(device->m_ports[i], length), 0, length * sizeof(float));
}
else
@@ -111,20 +46,25 @@ int JackDevice::jack_mix(jack_nframes_t length, void* data)
if((state == JackTransportRolling) && (device->m_lastMixState != JackTransportRolling))
++device->m_rollingSyncRevision;
size_t temp;
size_t readsamples = AUD_jack_ringbuffer_read_space(device->m_ringbuffers[0]);
for(i = 1; i < count; i++)
if((temp = AUD_jack_ringbuffer_read_space(device->m_ringbuffers[i])) < readsamples)
readsamples = temp;
size_t sample_size = AUD_DEVICE_SAMPLE_SIZE(device->m_specs);
readsamples = std::min(readsamples / sizeof(float), size_t(length));
size_t readsamples = device->getRingBuffer().getReadSize();
for(unsigned int i = 0; i < count; i++)
readsamples = std::min(readsamples / sample_size, static_cast<size_t>(length));
data_t* deinterleave_buffer = reinterpret_cast<data_t*>(device->m_deinterleavebuf.getBuffer());
device->getRingBuffer().read(deinterleave_buffer, readsamples * sample_size);
if(readsamples < length)
std::memset(deinterleave_buffer + readsamples * sample_size, 0, (length - readsamples) * sample_size);
for(int i = 0; i < count; i++)
{
buffer = (char*)AUD_jack_port_get_buffer(device->m_ports[i], length);
AUD_jack_ringbuffer_read(device->m_ringbuffers[i], buffer, readsamples * sizeof(float));
if(readsamples < length)
std::memset(buffer + readsamples * sizeof(float), 0, (length - readsamples) * sizeof(float));
buffer = reinterpret_cast<float*>(AUD_jack_port_get_buffer(device->m_ports[i], length));
for(int j = 0; j < length; j++)
buffer[j] = reinterpret_cast<float*>(deinterleave_buffer)[i + j * count];
}
// if we are stopped and the jack transport position changes, we need to notify the mixing thread to call the sync callback
@@ -139,7 +79,7 @@ int JackDevice::jack_mix(jack_nframes_t length, void* data)
}
}
device->m_mixingCondition.notify_all();
device->notifyMixingThread();
}
device->m_lastMixState = state;
@@ -165,7 +105,7 @@ int JackDevice::jack_sync(jack_transport_state_t state, jack_position_t* pos, vo
{
device->m_syncTime = syncTime;
++device->m_syncCallRevision;
device->m_mixingCondition.notify_all();
device->notifyMixingThread();
device->m_lastRollingSyncRevision = device->m_rollingSyncRevision;
return 0;
}
@@ -173,10 +113,38 @@ int JackDevice::jack_sync(jack_transport_state_t state, jack_position_t* pos, vo
return device->m_syncCallRevision == device->m_lastSyncCallRevision;
}
void JackDevice::preMixingWork([[maybe_unused]] bool playing)
{
jack_transport_state_t state;
jack_position_t position;
state = AUD_jack_transport_query(m_client, &position);
// we sync either when:
// - there was a jack sync callback that requests a playing sync (either start playback or seek during playback) - caused by a m_syncCallRevision change in jack_sync
// - the jack transport state changed to stop from not stopped (i.e. external stopping) - checked here
// - the sync time changes when seeking during the stopped state - caused by a m_syncCallRevision change in jack_mix
if((m_syncCallRevision != m_lastSyncCallRevision) || (state == JackTransportStopped && m_lastState != JackTransportStopped))
{
int syncRevision = m_syncCallRevision;
float syncTime = m_syncTime;
if(m_syncFunc)
m_syncFunc(m_syncFuncData, state != JackTransportStopped, syncTime);
// we reset the ring buffer when we sync to start from the correct position
getRingBuffer().reset();
m_lastSyncCallRevision = syncRevision;
}
m_lastState = state;
}
void JackDevice::jack_shutdown(void* data)
{
JackDevice* device = (JackDevice*)data;
device->m_valid = false;
device->stopMixingThread();
}
JackDevice::JackDevice(const std::string& name, DeviceSpecs specs, int buffersize)
@@ -224,19 +192,16 @@ JackDevice::JackDevice(const std::string& name, DeviceSpecs specs, int buffersiz
m_specs.rate = (SampleRate)AUD_jack_get_sample_rate(m_client);
buffersize *= sizeof(sample_t);
m_ringbuffers = new jack_ringbuffer_t*[specs.channels];
for(unsigned int i = 0; i < specs.channels; i++)
m_ringbuffers[i] = AUD_jack_ringbuffer_create(buffersize);
buffersize *= specs.channels;
if(buffersize < 0)
buffersize = AUD_jack_get_buffer_size(m_client) * 2;
buffersize *= AUD_SAMPLE_SIZE(m_specs);
m_deinterleavebuf.resize(buffersize);
m_buffer.resize(buffersize);
create();
m_lastState = JackTransportStopped;
m_lastMixState = JackTransportStopped;
m_valid = true;
m_syncFunc = nullptr;
m_syncTime = 0;
m_syncCallRevision = 0;
@@ -249,9 +214,6 @@ JackDevice::JackDevice(const std::string& name, DeviceSpecs specs, int buffersiz
{
AUD_jack_client_close(m_client);
delete[] m_ports;
for(unsigned int i = 0; i < specs.channels; i++)
AUD_jack_ringbuffer_free(m_ringbuffers[i]);
delete[] m_ringbuffers;
destroy();
AUD_THROW(DeviceException, "Client activation with JACK failed.");
@@ -267,31 +229,25 @@ JackDevice::JackDevice(const std::string& name, DeviceSpecs specs, int buffersiz
AUD_jack_free(ports);
}
m_mixingThread = std::thread(&JackDevice::updateRingBuffers, this);
startMixingThread(buffersize);
}
JackDevice::~JackDevice()
{
if(m_valid)
if(isMixingThreadRunning())
{
stopMixingThread();
AUD_jack_client_close(m_client);
m_valid = false;
}
delete[] m_ports;
m_mixingCondition.notify_all();
m_mixingThread.join();
for(unsigned int i = 0; i < m_specs.channels; i++)
AUD_jack_ringbuffer_free(m_ringbuffers[i]);
delete[] m_ringbuffers;
destroy();
}
void JackDevice::playing(bool playing)
{
// Do nothing.
MixingThreadDevice::playing(playing);
}
void JackDevice::playSynchronizer()
@@ -343,9 +299,7 @@ private:
std::string m_name;
public:
JackDeviceFactory() :
m_buffersize(AUD_DEFAULT_BUFFER_SIZE),
m_name("Audaspace")
JackDeviceFactory() : m_buffersize(-1), m_name("Audaspace")
{
m_specs.format = FORMAT_FLOAT32;
m_specs.channels = CHANNELS_STEREO;

View File

@@ -30,12 +30,10 @@
#include <condition_variable>
#include <string>
#include <thread>
#include <vector>
#include <jack/jack.h>
#include <jack/ringbuffer.h>
#include "devices/SoftwareDevice.h"
#include "devices/MixingThreadDevice.h"
#include "util/Buffer.h"
AUD_NAMESPACE_BEGIN
@@ -43,7 +41,7 @@ AUD_NAMESPACE_BEGIN
/**
* This device plays back through JACK.
*/
class AUD_PLUGIN_API JackDevice : public SoftwareDevice
class AUD_PLUGIN_API JackDevice : public MixingThreadDevice
{
private:
/**
@@ -56,23 +54,11 @@ private:
*/
jack_client_t* m_client;
/**
* The output buffer.
*/
Buffer m_buffer;
/**
* The deinterleaving buffer.
*/
Buffer m_deinterleavebuf;
jack_ringbuffer_t** m_ringbuffers;
/**
* Whether the device is valid.
*/
bool m_valid;
/**
* Invalidates the jack device.
* \param data The jack device that gets invalidet by jack.
@@ -136,25 +122,7 @@ private:
*/
void* m_syncFuncData;
/**
* The mixing thread.
*/
std::thread m_mixingThread;
/**
* Mutex for mixing.
*/
std::mutex m_mixingLock;
/**
* Condition for mixing.
*/
std::condition_variable m_mixingCondition;
/**
* Updates the ring buffers.
*/
AUD_LOCAL void updateRingBuffers();
AUD_LOCAL void preMixingWork(bool playing) override;
// delete copy constructor and operator=
JackDevice(const JackDevice&) = delete;

View File

@@ -39,6 +39,7 @@ JACK_SYMBOL(jack_on_shutdown);
JACK_SYMBOL(jack_port_register);
JACK_SYMBOL(jack_client_close);
JACK_SYMBOL(jack_get_sample_rate);
JACK_SYMBOL(jack_get_buffer_size);
JACK_SYMBOL(jack_activate);
JACK_SYMBOL(jack_get_ports);
JACK_SYMBOL(jack_port_name);

View File

@@ -29,49 +29,13 @@ AUD_NAMESPACE_BEGIN
void PipeWireDevice::handleStateChanged(void* device_ptr, enum pw_stream_state old, enum pw_stream_state state, const char* error)
{
PipeWireDevice* device = (PipeWireDevice*) device_ptr;
//fprintf(stderr, "stream state: \"%s\"\n", pw_stream_state_as_string(state));
if (state == PW_STREAM_STATE_PAUSED)
// fprintf(stderr, "stream state: \"%s\"\n", pw_stream_state_as_string(state));
if(state == PW_STREAM_STATE_PAUSED)
{
AUD_pw_stream_flush(device->m_stream, false);
}
}
void PipeWireDevice::updateRingBuffers()
{
uint32_t samplesize = AUD_DEVICE_SAMPLE_SIZE(m_specs);
sample_t* rb_data = m_ringbuffer_data.getBuffer();
uint32_t rb_size = m_ringbuffer_data.getSize();
uint32_t rb_index;
Buffer mix_buffer = Buffer(rb_size);
sample_t* mix_buffer_data = mix_buffer.getBuffer();
std::unique_lock<std::mutex> lock(m_mixingLock);
while (m_run_mixing_thread)
{
/* Get the amount of bytes available for writing. */
int32_t rb_avail = rb_size - spa_ringbuffer_get_write_index(&m_ringbuffer, &rb_index);
if (m_fill_ringbuffer && rb_avail > 0) {
/* As we allocated the ring buffer ourselves, we assume that the samplesize and
* the available bytes to read is evenly divisable.
*/
int32_t sample_count = rb_avail / samplesize;
mix(reinterpret_cast<data_t*>(mix_buffer_data), sample_count);
spa_ringbuffer_write_data(&m_ringbuffer, rb_data, rb_size, rb_index % rb_size, mix_buffer_data, rb_avail);
rb_index += rb_avail;
spa_ringbuffer_write_update(&m_ringbuffer, rb_index);
}
if (!m_fill_ringbuffer) {
/* Clear the ringbuffer when we are not playing back to make sure we don't
* keep any outdated data.
*/
spa_ringbuffer_read_update(&m_ringbuffer, rb_index);
}
m_mixingCondition.wait(lock);
}
}
void PipeWireDevice::mixAudioBuffer(void* device_ptr)
{
PipeWireDevice* device = (PipeWireDevice*) device_ptr;
@@ -104,55 +68,50 @@ void PipeWireDevice::mixAudioBuffer(void* device_ptr)
{
n_frames = SPA_MIN(pw_buf->requested, n_frames);
}
size_t readsamples = device->getRingBuffer().getReadSize() / chunk->stride;
if(readsamples < n_frames)
n_frames = readsamples;
chunk->size = n_frames * chunk->stride;
if(!device->m_fill_ringbuffer)
{
/* Queue up silence if we are not queuing up any samples.
* If we don't give Pipewire any buffers, it will think we encountered an error.
*/
memset(spa_data.data, 0, AUD_FORMAT_SIZE(device->m_specs.format) * chunk->size);
AUD_pw_stream_queue_buffer(device->m_stream, pw_buf);
return;
}
uint32_t rb_index;
spa_ringbuffer* ringbuffer = &device->m_ringbuffer;
device->getRingBuffer().read(reinterpret_cast<data_t*>(spa_data.data), chunk->size);
int32_t rb_avail = spa_ringbuffer_get_read_index(ringbuffer, &rb_index);
if (!rb_avail)
{
/* Nothing to read from the ring buffer. */
device->m_mixingCondition.notify_all();
memset(spa_data.data, 0, AUD_FORMAT_SIZE(device->m_specs.format) * chunk->size);
AUD_pw_stream_queue_buffer(device->m_stream, pw_buf);
return;
}
/* Here we assume that, if we have available space to read, that the read
* buffer size is always enough to fill the output buffer.
* This is because the PW_KEY_NODE_LATENCY property that we set should guarantee
* that pipewire can't request any bigger buffer sizes than we requested.
* (But they can be smaller)
*/
uint32_t rb_size = device->m_ringbuffer_data.getSize();
sample_t* rb_data = device->m_ringbuffer_data.getBuffer();
spa_ringbuffer_read_data(ringbuffer, rb_data, rb_size, rb_index % rb_size, spa_data.data, chunk->size);
spa_ringbuffer_read_update(ringbuffer, rb_index + chunk->size);
device->m_mixingCondition.notify_all();
device->notifyMixingThread();
AUD_pw_stream_queue_buffer(device->m_stream, pw_buf);
}
void PipeWireDevice::preMixingWork(bool playing)
{
if(!playing)
{
if((getRingBuffer().getReadSize() == 0) && m_active)
{
AUD_pw_thread_loop_lock(m_thread);
AUD_pw_stream_set_active(m_stream, false);
AUD_pw_thread_loop_unlock(m_thread);
m_active = false;
}
}
}
void PipeWireDevice::playing(bool playing)
{
AUD_pw_thread_loop_lock(m_thread);
AUD_pw_stream_set_active(m_stream, playing);
AUD_pw_thread_loop_unlock(m_thread);
m_fill_ringbuffer = playing;
/* Poke the mixing thread to ensure that it reacts to the m_fill_ringbuffer change. */
m_mixingCondition.notify_all();
std::lock_guard<ILockable> lock(*this);
MixingThreadDevice::playing(playing);
if(playing)
{
AUD_pw_thread_loop_lock(m_thread);
AUD_pw_stream_set_active(m_stream, playing);
AUD_pw_thread_loop_unlock(m_thread);
m_active = true;
}
}
PipeWireDevice::PipeWireDevice(const std::string& name, DeviceSpecs specs, int buffersize) : m_fill_ringbuffer(false), m_run_mixing_thread(true)
PipeWireDevice::PipeWireDevice(const std::string& name, DeviceSpecs specs, int buffersize)
{
if(specs.channels == CHANNELS_INVALID)
specs.channels = CHANNELS_STEREO;
@@ -200,22 +159,13 @@ PipeWireDevice::PipeWireDevice(const std::string& name, DeviceSpecs specs, int b
m_events->state_changed = PipeWireDevice::handleStateChanged;
m_events->process = PipeWireDevice::mixAudioBuffer;
pw_properties *stream_props = AUD_pw_properties_new(
PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, "Playback",
PW_KEY_MEDIA_ROLE, "Production",
NULL);
pw_properties* stream_props = AUD_pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Playback", PW_KEY_MEDIA_ROLE, "Production", NULL);
/* Set the requested sample rate and latency. */
AUD_pw_properties_setf(stream_props, PW_KEY_NODE_RATE, "1/%u", uint(m_specs.rate));
AUD_pw_properties_setf(stream_props, PW_KEY_NODE_LATENCY, "%u/%u", buffersize, uint(m_specs.rate));
m_stream = AUD_pw_stream_new_simple(
AUD_pw_thread_loop_get_loop(m_thread),
name.c_str(),
stream_props,
m_events.get(),
this);
m_stream = AUD_pw_stream_new_simple(AUD_pw_thread_loop_get_loop(m_thread), name.c_str(), stream_props, m_events.get(), this);
if(!m_stream)
{
AUD_pw_thread_loop_destroy(m_thread);
@@ -229,26 +179,21 @@ PipeWireDevice::PipeWireDevice(const std::string& name, DeviceSpecs specs, int b
uint8_t buffer[1024];
spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
const spa_pod *param = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info);
const spa_pod* param = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info);
AUD_pw_stream_connect(m_stream,
PW_DIRECTION_OUTPUT,
PW_ID_ANY,
static_cast<pw_stream_flags>(PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_INACTIVE |
PW_STREAM_FLAG_RT_PROCESS),
&param, 1);
AUD_pw_stream_connect(m_stream, PW_DIRECTION_OUTPUT, PW_ID_ANY,
static_cast<pw_stream_flags>(PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_INACTIVE | PW_STREAM_FLAG_RT_PROCESS), &param, 1);
AUD_pw_thread_loop_start(m_thread);
create();
spa_ringbuffer_init(&m_ringbuffer);
m_ringbuffer_data.resize(buffersize * AUD_DEVICE_SAMPLE_SIZE(m_specs));
m_mixingThread = std::thread(&PipeWireDevice::updateRingBuffers, this);
startMixingThread(buffersize * 2 * AUD_DEVICE_SAMPLE_SIZE(m_specs));
}
PipeWireDevice::~PipeWireDevice()
{
stopMixingThread();
/* Ensure that we are not playing back anything anymore. */
destroy();
@@ -257,14 +202,6 @@ PipeWireDevice::~PipeWireDevice()
AUD_pw_stream_destroy(m_stream);
AUD_pw_thread_loop_destroy(m_thread);
AUD_pw_deinit();
{
/* Ensure that the mixing thread exits. */
std::unique_lock<std::mutex> lock(m_mixingLock);
m_run_mixing_thread = false;
m_mixingCondition.notify_all();
}
m_mixingThread.join();
}
void PipeWireDevice::seekSynchronizer(double time)
@@ -343,7 +280,7 @@ public:
m_buffersize = buffersize;
}
virtual void setName(const std::string &name)
virtual void setName(const std::string& name)
{
m_name = name;
}

View File

@@ -26,47 +26,22 @@
* The PipeWireDevice class.
*/
#include <condition_variable>
#include <thread>
#include <pipewire/pipewire.h>
#include <spa/utils/ringbuffer.h>
#include "devices/SoftwareDevice.h"
#include "devices/MixingThreadDevice.h"
AUD_NAMESPACE_BEGIN
/**
* This device plays back through PipeWire, the simple direct media layer.
*/
class AUD_PLUGIN_API PipeWireDevice : public SoftwareDevice
class AUD_PLUGIN_API PipeWireDevice : public MixingThreadDevice
{
private:
/**
* Whether we should start filling our ringbuffer with audio.
*/
bool m_fill_ringbuffer;
pw_stream* m_stream;
pw_thread_loop* m_thread;
std::unique_ptr<pw_stream_events> m_events;
/**
* The mixing thread.
*/
std::thread m_mixingThread;
bool m_run_mixing_thread;
/**
* Mutex for mixing.
*/
std::mutex m_mixingLock;
/**
* The mixing ringbuffer and mixing data
*/
spa_ringbuffer m_ringbuffer;
Buffer m_ringbuffer_data;
std::condition_variable m_mixingCondition;
bool m_active{false};
/// Synchronizer.
bool m_getSynchronizerStartTime{false};
@@ -75,11 +50,6 @@ private:
AUD_LOCAL static void handleStateChanged(void* device_ptr, enum pw_stream_state old, enum pw_stream_state state, const char* error);
/**
* Updates the ring buffers.
*/
AUD_LOCAL void updateRingBuffers();
/**
* Mixes the next bytes into the buffer.
* \param data The PipeWire device.
@@ -91,6 +61,7 @@ private:
PipeWireDevice& operator=(const PipeWireDevice&) = delete;
protected:
void preMixingWork(bool playing);
virtual void playing(bool playing);
public:

View File

@@ -17,7 +17,6 @@
#include "PulseAudioDevice.h"
#include "Exception.h"
#include "IReader.h"
#include "PulseAudioLibrary.h"
#include "devices/DeviceManager.h"
@@ -25,50 +24,18 @@
AUD_NAMESPACE_BEGIN
void PulseAudioDevice::updateRingBuffer()
void PulseAudioDevice::preMixingWork(bool playing)
{
unsigned int samplesize = AUD_DEVICE_SAMPLE_SIZE(m_specs);
std::unique_lock<std::mutex> lock(m_mixingLock);
Buffer buffer;
while(m_valid)
if(!playing)
{
if(getRingBuffer().getReadSize() == 0 && !m_corked)
{
std::lock_guard<ILockable> device_lock(*this);
if(m_playback)
{
size_t size = m_ring_buffer.getWriteSize();
size_t sample_count = size / samplesize;
if(sample_count > 0)
{
size = sample_count * samplesize;
buffer.assureSize(size);
mix(reinterpret_cast<data_t*>(buffer.getBuffer()), sample_count);
m_ring_buffer.write(reinterpret_cast<data_t*>(buffer.getBuffer()), size);
}
}
else
{
if(m_ring_buffer.getReadSize() == 0 && !m_corked)
{
AUD_pa_threaded_mainloop_lock(m_mainloop);
AUD_pa_stream_cork(m_stream, 1, nullptr, nullptr);
AUD_pa_stream_flush(m_stream, nullptr, nullptr);
AUD_pa_threaded_mainloop_unlock(m_mainloop);
m_corked = true;
}
}
AUD_pa_threaded_mainloop_lock(m_mainloop);
AUD_pa_stream_cork(m_stream, 1, nullptr, nullptr);
AUD_pa_stream_flush(m_stream, nullptr, nullptr);
AUD_pa_threaded_mainloop_unlock(m_mainloop);
m_corked = true;
}
m_mixingCondition.wait(lock);
}
}
@@ -95,20 +62,16 @@ void PulseAudioDevice::PulseAudio_request(pa_stream* stream, size_t total_bytes,
AUD_pa_stream_begin_write(stream, reinterpret_cast<void**>(&buffer), &num_bytes);
size_t readsamples = device->m_ring_buffer.getReadSize();
size_t readsamples = device->getRingBuffer().getReadSize();
readsamples = std::min(readsamples, size_t(num_bytes)) / sample_size;
device->m_ring_buffer.read(buffer, readsamples * sample_size);
device->getRingBuffer().read(buffer, readsamples * sample_size);
if(readsamples * sample_size < num_bytes)
std::memset(buffer + readsamples * sample_size, 0, num_bytes - readsamples * sample_size);
if(device->m_mixingLock.try_lock())
{
device->m_mixingCondition.notify_all();
device->m_mixingLock.unlock();
}
device->notifyMixingThread();
AUD_pa_stream_write(stream, reinterpret_cast<void*>(buffer), num_bytes, nullptr, 0, PA_SEEK_RELATIVE);
@@ -120,7 +83,7 @@ void PulseAudioDevice::playing(bool playing)
{
std::lock_guard<ILockable> lock(*this);
m_playback = playing;
MixingThreadDevice::playing(playing);
if(playing)
{
@@ -131,8 +94,7 @@ void PulseAudioDevice::playing(bool playing)
}
}
PulseAudioDevice::PulseAudioDevice(const std::string& name, DeviceSpecs specs, int buffersize) :
m_playback(false), m_corked(true), m_state(PA_CONTEXT_UNCONNECTED), m_valid(true), m_underflows(0)
PulseAudioDevice::PulseAudioDevice(const std::string& name, DeviceSpecs specs, int buffersize) : m_corked(true), m_state(PA_CONTEXT_UNCONNECTED), m_underflows(0)
{
m_mainloop = AUD_pa_threaded_mainloop_new();
@@ -243,8 +205,6 @@ PulseAudioDevice::PulseAudioDevice(const std::string& name, DeviceSpecs specs, i
buffer_attr.prebuf = -1U;
buffer_attr.tlength = buffersize;
m_ring_buffer.resize(buffersize);
if(AUD_pa_stream_connect_playback(m_stream, nullptr, &buffer_attr, static_cast<pa_stream_flags_t>(PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE), nullptr, nullptr) < 0)
{
AUD_pa_threaded_mainloop_unlock(m_mainloop);
@@ -262,18 +222,12 @@ PulseAudioDevice::PulseAudioDevice(const std::string& name, DeviceSpecs specs, i
create();
m_mixingThread = std::thread(&PulseAudioDevice::updateRingBuffer, this);
startMixingThread(buffersize);
}
PulseAudioDevice::~PulseAudioDevice()
{
m_valid = false;
m_mixingLock.lock();
m_mixingCondition.notify_all();
m_mixingLock.unlock();
m_mixingThread.join();
stopMixingThread();
AUD_pa_threaded_mainloop_stop(m_mainloop);

View File

@@ -26,27 +26,18 @@
* The PulseAudioDevice class.
*/
#include "devices/SoftwareDevice.h"
#include "util/RingBuffer.h"
#include <condition_variable>
#include <thread>
#include <pulse/pulseaudio.h>
#include "devices/MixingThreadDevice.h"
AUD_NAMESPACE_BEGIN
/**
* This device plays back through PulseAudio, the simple direct media layer.
*/
class AUD_PLUGIN_API PulseAudioDevice : public SoftwareDevice
class AUD_PLUGIN_API PulseAudioDevice : public MixingThreadDevice
{
private:
/**
* Whether there is currently playback.
*/
volatile bool m_playback;
bool m_corked;
pa_threaded_mainloop* m_mainloop;
@@ -54,42 +45,14 @@ private:
pa_stream* m_stream;
pa_context_state_t m_state;
/**
* The mixing ring buffer.
*/
RingBuffer m_ring_buffer;
/**
* Whether the device is valid.
*/
bool m_valid;
int m_buffersize;
uint32_t m_underflows;
/**
* The mixing thread.
*/
std::thread m_mixingThread;
/**
* Mutex for mixing.
*/
std::mutex m_mixingLock;
/**
* Condition for mixing.
*/
std::condition_variable m_mixingCondition;
/// Synchronizer.
pa_usec_t m_synchronizerStartTime{0};
double m_synchronizerStartPosition{0.0};
/**
* Updates the ring buffer.
*/
AUD_LOCAL void updateRingBuffer();
AUD_LOCAL void preMixingWork(bool playing) override;
/**
* Reports the state of the PulseAudio server connection.

View File

@@ -0,0 +1,99 @@
/*******************************************************************************
* Copyright 2009-2016 Jörg Müller
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
#include "devices/MixingThreadDevice.h"
AUD_NAMESPACE_BEGIN
void MixingThreadDevice::updateRingBuffer()
{
unsigned int samplesize = AUD_DEVICE_SAMPLE_SIZE(m_specs);
std::unique_lock<std::mutex> lock(m_mixingLock);
while(m_valid)
{
{
std::lock_guard<ILockable> device_lock(*this);
preMixingWork(m_playback);
if(m_playback)
{
size_t size = m_ringBuffer.getWriteSize();
size_t sample_count = size / samplesize;
while(sample_count > 0)
{
size = sample_count * samplesize;
mix(reinterpret_cast<data_t*>(m_mixingBuffer.getBuffer()), sample_count);
m_ringBuffer.write(reinterpret_cast<data_t*>(m_mixingBuffer.getBuffer()), size);
sample_count = m_ringBuffer.getWriteSize() / samplesize;
}
}
}
m_mixingCondition.wait(lock);
}
}
void MixingThreadDevice::startMixingThread(size_t buffersize)
{
m_mixingBuffer.resize(buffersize);
m_ringBuffer.resize(buffersize);
m_valid = true;
m_mixingThread = std::thread(&MixingThreadDevice::updateRingBuffer, this);
}
void MixingThreadDevice::notifyMixingThread()
{
m_mixingCondition.notify_all();
}
void MixingThreadDevice::playing(bool playing)
{
std::lock_guard<ILockable> lock(*this);
m_playback = playing;
if(playing)
notifyMixingThread();
}
void MixingThreadDevice::preMixingWork(bool playing)
{
}
MixingThreadDevice::MixingThreadDevice()
{
}
void aud::MixingThreadDevice::stopMixingThread()
{
m_valid = false;
m_mixingCondition.notify_all();
m_mixingThread.join();
}
AUD_NAMESPACE_END

View File

@@ -1,80 +0,0 @@
/*******************************************************************************
* Copyright 2009-2024 Jörg Müller
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
#include "devices/OpenCloseDevice.h"
AUD_NAMESPACE_BEGIN
void OpenCloseDevice::closeAfterDelay()
{
std::unique_lock<std::mutex> lock(m_delayed_close_mutex);
m_immediate_close_condition.wait_until(lock, m_playback_stopped_time + m_device_close_delay);
m_delayed_close_running = false;
if(m_playing)
return;
close();
m_device_opened = false;
}
void OpenCloseDevice::closeNow()
{
if(m_delayed_close_thread.joinable())
{
m_immediate_close_condition.notify_all();
m_delayed_close_thread.join();
}
}
void OpenCloseDevice::playing(bool playing)
{
std::lock_guard<std::mutex> lock(m_delayed_close_mutex);
if(m_playing != playing)
{
m_playing = playing;
if(playing)
{
if(!m_device_opened)
{
open();
m_device_opened = true;
}
start();
}
else
{
stop();
m_playback_stopped_time = std::chrono::steady_clock::now();
if(m_device_opened && !m_delayed_close_running)
{
if(m_delayed_close_thread.joinable())
m_delayed_close_thread.join();
m_delayed_close_running = true;
m_delayed_close_thread = std::thread(&OpenCloseDevice::closeAfterDelay, this);
}
}
}
}
AUD_NAMESPACE_END