Add audiooutput module

This module allows changing the fallback sink and moving active sink inputs with it
master
Lars Vierbergen 8 years ago
parent abe6990abf
commit a16146ef43
  1. 20
      daemon.py
  2. 161
      modules/audiooutput/__init__.py
  3. 65
      modules/audiooutput/naming_map.py
  4. 24
      modules/audiooutput/sink_filter.py
  5. 20
      modules/audiooutput/sink_input_filter.py
  6. 177
      modules/cycle.py

@ -2,6 +2,8 @@
import logging
import modules
import modules.core
from modules.audiooutput import PulseCtlDefaultSinkCycleAction
from modules.audiooutput import naming_map, sink_filter, sink_input_filter
#logging.basicConfig(level=logging.DEBUG)
@ -16,6 +18,24 @@ modules.Application(
action='pavucontrol',
buttons=modules.core.Button.RIGHT
),
CycleControl(
PulseCtlDefaultSinkCycleAction(
naming_map=naming_map.partial(
naming_map.foldr, [
naming_map.description,
naming_map.partial(
naming_map.drop_kwargs,
naming_map.partial(naming_map.foldr, [
naming_map.partial(naming_map.drop_first_if_eq, 'Built-in Audio '),
naming_map.first_char
])
)
]
),
sink_filter=sink_filter.hardware_only,
sink_input_filter=sink_input_filter.connected_sink
),
)
).run()

@ -0,0 +1,161 @@
from collections import OrderedDict
import pulsectl
import logging
from ..cycle import OrderedDictCycleAction
from .naming_map import description
from .sink_filter import all as sink_filter_all
from .sink_input_filter import all as sink_input_filter_all
from functools import partial
logger = logging.getLogger(__name__)
__all__ = ['PulseCtlDefaultSinkCycleAction']
class PulseProxy:
class PulseServerInfo:
def __init__(self, realobj, proxy):
self.__realobj = realobj
self.__proxy = proxy
@property
def default_sink_name(self):
if self.__proxy._fake_default_sink_name is None:
return self.__realobj.default_sink_name
elif self.__realobj.default_sink_name == self.__proxy._real_default_sink_name:
return self.__proxy._fake_default_sink_name
else:
return self.__realobj.default_sink_name
def __init__(self, name, sink_filter: callable, sink_input_filter: callable):
self.__pulse = pulsectl.Pulse(name)
self.__sink_filter = partial(sink_filter, pulse=self)
self.__sink_input_filter = partial(sink_input_filter, pulse=self)
self._fake_default_sink_name = None
self._real_default_sink_name = None
def server_info(self):
return self.PulseServerInfo(self.__pulse.server_info(), self)
@property
def __real_default_sink(self):
default_sink_name = self.__pulse.server_info().default_sink_name
return self.__find_sink_by(lambda sink: sink.name == default_sink_name)
def sink_default_set(self, value: pulsectl.PulseSinkInfo):
real_default_sink = self.__real_default_sink
if not self.__sink_filter(real_default_sink): # Current default sink is filtered out
self._fake_default_sink_name = value.name # Fake setting of default sink
self._real_default_sink_name = real_default_sink.name # Save real default sink
else: # Current default sink is not filtered out
self.__pulse.sink_default_set(value)
self._fake_default_sink_name = None
self._real_default_sink_name = None
def sink_list(self):
return list(filter(self.__sink_filter, self.__pulse.sink_list()))
def sink_info(self, *a, **k):
return self.__pulse.sink_info(*a, **k)
def sink_input_list(self):
return list(filter(self.__sink_input_filter, self.__pulse.sink_input_list()))
def sink_input_move(self, *a, **k):
return self.__pulse.sink_input_move(*a, **k)
def __find_sink_by(self, filter_: callable):
return next(filter(filter_, self.__pulse.sink_list()))
class PulseCtlDefaultSinkCycleAction(OrderedDictCycleAction):
"""
A cycle action that allows to select the pulseaudio fallback sink,
and also moves active sink inputs to that sink.
"""
def __init__(self, naming_map: callable = description, sink_filter: callable = sink_filter_all,
sink_input_filter: callable = sink_input_filter_all):
"""
:param naming_map: A function that maps a sink object to a visual representation. Must accept an arbitrary number of keyword arguments
:param sink_filter: A filter that is applied on all sinks to select the ones that should be shown.
Must accept an arbitrary number of keyword arguments
The fallback sink will only be changed when the current fallback sink is not filtered out.
:param sink_input_filter: A filter that is applied on all sink inputs to select the ones that should be moved to the new fallback sink.
Must accept an arbitrary number of keyword arguments
Sink inputs that do not match the filter are never moved
"""
self.__od = OrderedDict()
super().__init__(self.__od)
self.__pulse = PulseProxy(self.__class__.__name__, sink_filter=sink_filter,
sink_input_filter=partial(sink_input_filter, sink_filter=sink_filter))
self.__naming_func = partial(naming_map, pulse=self.__pulse)
self.__update_items()
self.current = self.__pulse.server_info().default_sink_name
@OrderedDictCycleAction.current.setter
def current(self, value):
# if self.current == value:
# return
if value not in self.__od:
return
while self.current != value:
super().next()
self.__update_default_sink()
def __update_default_sink(self):
default_sink = self.__od[self.current]
self.__pulse.sink_default_set(default_sink)
logger.debug('%s.__update_default_sink: Set default sink to %r', self.__class__.__name__, default_sink)
for sink_input in self.__pulse.sink_input_list():
try:
self.__pulse.sink_input_move(sink_input.index, default_sink.index)
logger.debug('%s.__move_sink_inputs: Moved sink input %r to sink %r', self.__class__.__name__,
sink_input, default_sink)
except pulsectl.PulseOperationFailed:
logger.exception('Failed moving sink input %d to sink %d', sink_input.index, default_sink.index)
def __update_items(self):
changed = False
sinks = self.__pulse.sink_list()
for sink in sinks:
if sink.name not in self.__od:
logger.debug('%s.__update_items: Added sink %r', self.__class__.__name__, sink)
changed = True
self.__od[sink.name] = sink
to_delete = []
for k, sink in self.__od.items():
if sink not in sinks:
logger.debug('%s.__update_items: Removed sink %r', self.__class__.__name__, sink)
to_delete.append(k)
changed = True
for k in to_delete:
del self.__od[k]
default_name = self.__pulse.server_info().default_sink_name
if self.current != default_name:
self.current = default_name
changed = True
return changed
def periodic(self):
return self.__update_items()
@property
def items(self):
self.__update_items()
return map(self.__naming_func, super().items())
def prev(self):
super().prev()
self.__update_default_sink()
def next(self):
super().next()
self.__update_default_sink()
def __str__(self):
return self.__naming_func(self.__od[self.current])

@ -0,0 +1,65 @@
import pulsectl
from functools import partial, reduce
def description(sink: pulsectl.PulseSinkInfo, **k) -> str:
"""
Uses the description property as naming
"""
return sink.description
def n_chars(n: int, s) -> str:
"""
Takes the first n characters
"""
return s[0:n]
first_char = partial(n_chars, 1)
def first_word(s: str) -> str:
"""
Takes the first word
"""
return s.split(' ')[0]
def drop_first_n_words(n: int, s: str) -> str:
"""
Removes the first word
"""
return ' '.join(s.split(' ')[n:])
drop_first_word = partial(drop_first_n_words, 1)
def drop_first_word_if_eq(drop_if: str, s: str) -> str:
return drop_first_word(s) if drop_if == first_word(s) else s
def apply(*args: [callable], **k):
fns = args[:-1]
data = args[-1]
for fn in fns:
data = fn(data, **k)
k = {}
return data
def drop_first_if_eq(drop_if: str, s: str) -> str:
return s[len(drop_if):] if s[0:len(drop_if)] == drop_if else s
def apply(x, y, **k):
return y(x, **k)
def foldr(fns, data, **kwargs):
return reduce(partial(apply, **kwargs), fns, data)
def drop_kwargs(x, *a, **k):
return x(*a)

@ -0,0 +1,24 @@
import pulsectl
__all__ = ['hardware_only', 'virtual_only', 'all']
def all(sink: pulsectl.PulseSinkInfo, **k) -> bool:
"""
Selects all output devices
"""
return True
def hardware_only(sink: pulsectl.PulseSinkInfo, **k) -> bool:
"""
Selects hardware output devices
"""
return sink.flags & 0x4 == 0x4
def virtual_only(sink: pulsectl.PulseSinkInfo, **k) -> bool:
"""
Selects virtual output devices
"""
return not hardware_only(sink)

@ -0,0 +1,20 @@
import pulsectl
import logging
__all__ = ['all', 'connected_sink']
logger = logging.getLogger(__name__)
def all(sink_input: pulsectl.PulseSinkInputInfo, **k) -> bool:
"""
Selects all output devices
"""
return True
def connected_sink(sink_input: pulsectl.PulseSinkInputInfo, sink_filter: callable, pulse: pulsectl.Pulse, **k) -> bool:
"""
Selects all output devices that are attached to a sink matching a sink filter
"""
sink = pulse.sink_info(sink_input.sink)
return sink_filter(sink)

@ -0,0 +1,177 @@
import abc
from collections import OrderedDict
from .core import AbstractControl, WrappingControl, action, Button
import logging
__all__ = ['AbstractCycleAction', 'OrderedDictCycleAction', 'CycleControl', 'ExpandedCycleControlAction']
logger = logging.getLogger(__name__)
class AbstractCycleAction(AbstractControl, metaclass=abc.ABCMeta):
"""
Base class for all cycle actions
"""
@abc.abstractmethod
def next(self):
"""
Go to the next item in the cycle
"""
pass
@abc.abstractmethod
def prev(self):
"""
Go to the previous item in the cycle
"""
pass
@abc.abstractproperty
def current(self) -> object:
"""
:return: An unique identifier for the current item in the cycle
"""
return None
@abc.abstractproperty
def items(self):
"""
:return: An iterator over all items in the cycler, as tuples of unique identifier to string representation
"""
return iter([])
def load_state(self, state):
if 'current' in state:
prev_current = self.current
while self.current != state['current']:
self.next()
if prev_current == state['current']:
logger.warning('%s.load_state: Saved item is not present in cycle.', self.__class__.__name__)
break
def dump_state(self):
return {'current': self.current}
def __str__(self):
"""
:return: The visual representation of the current item in the cycle
"""
return self.current
class OrderedDictCycleAction(AbstractCycleAction):
"""
A cycle action that cycles through an ordered dictionary.
Dictionary keys are used as unique identifiers, dictionary values are their visual represenation
"""
def __init__(self, items: OrderedDict = None):
super().__init__()
self.__items = items if items is not None else OrderedDict()
def prev(self):
# prev: a b c -> c a b
# ^ ^
# Move last item to front
for k in reversed(self.__items.keys()):
self.__items.move_to_end(k, last=False)
break
def next(self):
# next: a b c -> b c a
# ^ ^
# Move first item to back
for k in self.__items.keys():
self.__items.move_to_end(k)
break
@property
def items(self):
return iter(self.__items.items())
@property
def current(self):
for k in self.__items.keys():
return k
@property
def visible(self):
return len(self.__items) > 1
def __str__(self):
return self.__items[self.current]
class CycleControl(WrappingControl):
"""
Implements a simple cycle control.
When clicked or scrolled over it changes to the previous/next value in a circular fashion.
"""
def __init__(self, cycle_action: AbstractCycleAction):
super().__init__(cycle_action)
def respond_to(self, command: str):
if command == ':next':
self.child.next()
return True
elif command == ':prev':
self.child.prev()
return True
def __str__(self):
return action(
command=self.create_pipe_command(':next'),
button=Button.LEFT | Button.SCROLL_UP,
text=action(
command=self.create_pipe_command(':prev'),
button=Button.RIGHT | Button.SCROLL_DOWN,
text=str(self.child)
)
)
class ExpandedCycleControlAction(WrappingControl, AbstractCycleAction):
"""
An expanded cycler that always shows all items in its child cycler, separated by a separator.
Every item is clickable separately and will force a jump to that state.
"""
def __init__(self, child_action: AbstractCycleAction, separator: str = ''):
super().__init__(child_action)
self.__separator = separator
def next(self):
self.child.next()
def prev(self):
self.child.prev()
@property
def items(self):
return self.child.items
@property
def current(self):
return self.child.current
def __get_control_command(self, item_key):
return ':set:%s' % item_key
def respond_to(self, command: str):
for item_key, _ in self.items:
if command == self.__get_control_command(item_key):
prev_current = self.current
while self.current != item_key:
self.next()
if self.current == prev_current:
logger.error('%s.respond_to: Item %s not found in cycle.', self.__class__.__name__, item_key)
return True
def __str__(self):
return self.__separator.join([action(
command=self.create_pipe_command(self.__get_control_command(item_key)),
button=Button.LEFT,
text=item_value
) for item_key, item_value in self.items])
Loading…
Cancel
Save