commit
8a899f53ea
@ -0,0 +1 @@ |
||||
__pycache__/ |
@ -0,0 +1,5 @@ |
||||
#!/bin/bash |
||||
$(echo "$1" > "$2")& |
||||
quit_proc="$!" |
||||
sleep 1 |
||||
kill -9 "$quit_proc" |
@ -0,0 +1,21 @@ |
||||
#!/usr/bin/env python3 |
||||
import logging |
||||
import modules |
||||
import modules.core |
||||
|
||||
#logging.basicConfig(level=logging.DEBUG) |
||||
|
||||
modules.Application( |
||||
modules.GroupedControl( |
||||
modules.CaffeineControl(), |
||||
modules.RedshiftControl(), |
||||
separator='' |
||||
), |
||||
modules.ActionWrapperControl( |
||||
modules.VolumeControl(), |
||||
action='pavucontrol', |
||||
buttons=modules.core.Button.RIGHT |
||||
), |
||||
).run() |
||||
|
||||
|
@ -0,0 +1,5 @@ |
||||
from .application import Application |
||||
from .core import GroupedControl, ActionWrapperControl |
||||
from .caffeine import CaffeineControl |
||||
from .redshift import RedshiftControl |
||||
from .volume import VolumeControl |
@ -0,0 +1,153 @@ |
||||
import argparse |
||||
import pickle |
||||
import signal |
||||
import logging |
||||
|
||||
import time |
||||
import traceback |
||||
|
||||
from .core import GroupedControl |
||||
from .util import QuitControl, ChildReaperControl |
||||
import os |
||||
import stat |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
|
||||
class CreateFileType(argparse.FileType): |
||||
def __init__(self, mode='r', bufsize=-1, encoding=None, errors=None): |
||||
super().__init__(mode, bufsize, encoding, errors) |
||||
self.__mode = mode |
||||
self.__bufsize = bufsize |
||||
self.__encoding = encoding |
||||
self.__errors = errors |
||||
|
||||
def __call__(self, string): |
||||
try: |
||||
return super().__call__(string) |
||||
except argparse.ArgumentTypeError as e: |
||||
return open(string, 'wb' if 'b' in self.__mode else 'w', self.__bufsize, self.__encoding, self.__errors) |
||||
|
||||
|
||||
class PipeFileType(argparse.FileType): |
||||
def __init__(self, *args, lazy=False, **kwargs): |
||||
super().__init__(*args, **kwargs) |
||||
self.lazy = lazy |
||||
|
||||
def __call__(self, string): |
||||
try: |
||||
mode = os.stat(string).st_mode |
||||
if not stat.S_ISFIFO(mode): |
||||
raise argparse.ArgumentTypeError('%s is not a fifo' % strign) |
||||
except FileNotFoundError: |
||||
pass |
||||
if not self.lazy or string == '-': |
||||
return super().__call__(string) |
||||
else: |
||||
return LazyFile(string, self._mode, self._bufsize, self._encoding, self._errors) |
||||
|
||||
class LazyFile: |
||||
def __init__(self, name, *args): |
||||
self.name = name |
||||
self.__args = args |
||||
self.__file = None |
||||
def __open(self): |
||||
if self.__file is None: |
||||
self.__file = open(self.name, *self.__args) |
||||
return self.__file |
||||
def close(self): |
||||
return self.__open().close() |
||||
@property |
||||
def closed(self): |
||||
return self.__open().closed |
||||
def fileno(self): |
||||
return self.__open().fileno() |
||||
def flush(self): |
||||
return self.__open().flush() |
||||
def isatty(self): |
||||
return self.__open().isatty() |
||||
def read(self, *a): |
||||
return self.__open().read(*a) |
||||
def readable(self): |
||||
return self.__open().readable() |
||||
def readline(self): |
||||
return self.__open().readline() |
||||
def readlines(self): |
||||
return self.__open().readlines() |
||||
def seek(self): |
||||
return self.__open().seek() |
||||
def seekable(self): |
||||
return self.__open().seekable() |
||||
def tell(self): |
||||
return self.__open().tell() |
||||
def truncate(self): |
||||
return self.__open().truncate() |
||||
def writable(self): |
||||
return self.__open().writable() |
||||
def write(self, *a): |
||||
return self.__open().write(*a) |
||||
def writelines(self, *a): |
||||
return self.__open().writelines(*a) |
||||
|
||||
|
||||
class Application(GroupedControl): |
||||
def __init__(self, *modules, **kwargs): |
||||
super().__init__(ChildReaperControl(), *modules, **kwargs) |
||||
|
||||
def configure(self, argument_parser): |
||||
argument_parser.add_argument('output_pipe', type=PipeFileType('w', bufsize=1, lazy=True)) |
||||
argument_parser.add_argument('command_pipe', type=PipeFileType('r', bufsize=1, lazy=True)) |
||||
argument_parser.add_argument('--state-file', type=CreateFileType('r+b')) |
||||
super().configure(argument_parser) |
||||
|
||||
def bind_arguments(self, args): |
||||
super().bind_arguments(args) |
||||
if args.state_file is not None and args.state_file.readable(): |
||||
try: |
||||
state = pickle.load(args.state_file) |
||||
logger.info("Loaded state: %r" % state) |
||||
self.load_state_ex(state) |
||||
except: |
||||
traceback.print_exc() |
||||
|
||||
def cleanup(self): |
||||
if self.args.state_file is not None: |
||||
self.args.state_file.seek(0) |
||||
self.args.state_file.truncate() |
||||
state = self.dump_state_ex() |
||||
logger.info("Dumped state: %r" % state) |
||||
pickle.dump(state, self.args.state_file) |
||||
self.args.state_file.close() |
||||
|
||||
super().cleanup() |
||||
|
||||
def respond_to(self, command): |
||||
if command == '': |
||||
return False |
||||
logger.info('Received command %s', command) |
||||
return super().respond_to(command) |
||||
|
||||
def handle_signal(self, signal, tb): |
||||
raise Exception("Received signal %s"%signal) |
||||
|
||||
|
||||
def run(self): |
||||
parser = argparse.ArgumentParser(description='Action manager for xmobar') |
||||
|
||||
self.configure(parser) |
||||
self.bind_arguments(parser.parse_args()) |
||||
|
||||
for sig in {signal.SIGHUP, signal.SIGINT, signal.SIGQUIT, signal.SIGTERM}: |
||||
signal.signal(sig, self.handle_signal) |
||||
|
||||
try: |
||||
self.args.output_pipe.writelines(str(self)+"\n") |
||||
while True: |
||||
if self.respond_to(str.rstrip(self.args.command_pipe.readline())) or self.periodic(): |
||||
self.args.output_pipe.writelines(str(self) + "\n") |
||||
else: |
||||
time.sleep(1) |
||||
except BaseException as e: |
||||
logger.exception('Received exception, shutting down') |
||||
finally: |
||||
self.cleanup() |
@ -0,0 +1,56 @@ |
||||
import subprocess |
||||
import logging |
||||
|
||||
import time |
||||
|
||||
from .core import AbstractControl, action |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
|
||||
class CaffeineControl(AbstractControl): |
||||
def __init__(self): |
||||
super().__init__() |
||||
self.caffeine_enabled = False |
||||
self._activity_proc = None |
||||
self._activity_proc_lastrun = 0 |
||||
|
||||
@property |
||||
def enabled(self): |
||||
return self.args.caffeine_enabled |
||||
|
||||
def configure(self, argument_parser): |
||||
argument_parser.add_argument('--caffeine-enabled', help='Use the caffeine module', action='store_true') |
||||
argument_parser.add_argument('--caffeine-timeout', |
||||
help='Time between user activity reports to xscreensaver (in seconds)', |
||||
type=int, |
||||
default=10) |
||||
|
||||
def respond_to(self, command): |
||||
if command == 'caffeine': |
||||
self.caffeine_enabled = not self.caffeine_enabled |
||||
logger.info("Set caffeine enabled %r", self.caffeine_enabled) |
||||
return True |
||||
|
||||
def __str__(self): |
||||
return action(self.create_pipe_command('caffeine'), 'C' if self.caffeine_enabled else 'c') |
||||
|
||||
def periodic(self): |
||||
if self._activity_proc is not None: |
||||
if self._activity_proc.returncode is None: |
||||
self._activity_proc.poll() |
||||
else: |
||||
logger.debug("Reaped subprocess: %s", self._activity_proc) |
||||
self._activity_proc = None |
||||
|
||||
if self.caffeine_enabled and self._activity_proc is None: |
||||
if self._activity_proc_lastrun + self.args.caffeine_timeout < time.time(): |
||||
logger.debug("Poking screensaver") |
||||
self._activity_proc = subprocess.Popen(['xscreensaver-command', '-deactivate'], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
||||
self._activity_proc_lastrun = time.time() |
||||
|
||||
def dump_state(self): |
||||
return dict(caffeine_enabled=self.caffeine_enabled) |
||||
|
||||
def load_state(self, state): |
||||
self.caffeine_enabled = state['caffeine_enabled'] |
@ -0,0 +1,366 @@ |
||||
import abc |
||||
import os |
||||
import argparse |
||||
|
||||
import sys |
||||
import enum |
||||
import logging |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
|
||||
class AbstractControl(metaclass=abc.ABCMeta): |
||||
""" |
||||
Base class for all modules |
||||
""" |
||||
|
||||
def __init__(self): |
||||
""" |
||||
Creates a new control |
||||
""" |
||||
self.args = None |
||||
|
||||
@property |
||||
def visible(self): |
||||
""" |
||||
Determines if the module is visible in the command bar |
||||
|
||||
:return: bool |
||||
""" |
||||
return self.enabled |
||||
|
||||
@property |
||||
def enabled(self): |
||||
""" |
||||
Determines if the module is enabled. |
||||
|
||||
Disabled modules do not receive any signals. |
||||
:return: bool |
||||
""" |
||||
return True |
||||
|
||||
def configure(self, argument_parser: argparse.ArgumentParser): |
||||
""" |
||||
Configures the argument parser to accept extra arguments handled by this module |
||||
|
||||
:param argument_parser: The argument parser to add arguments to |
||||
:return: void |
||||
""" |
||||
pass |
||||
|
||||
def bind_arguments(self, args: argparse.Namespace): |
||||
""" |
||||
Binds arguments delivered by the argument parser to this module. |
||||
|
||||
When overriding this method, the parent function always has to be called. |
||||
|
||||
:param args: The Namespace object returned by ArgumentParser.parse_args() |
||||
:return: void |
||||
""" |
||||
self.args = args |
||||
|
||||
def periodic(self): |
||||
""" |
||||
Called periodically during the runtime of the daemon. |
||||
|
||||
Actions that are independent of user input should be handled here, |
||||
use respond_to() to handle user input. |
||||
|
||||
Will only be called for modules that report to be enabled |
||||
|
||||
:return: bool Whether the displayed information is changed by the executed operations. |
||||
""" |
||||
return False |
||||
|
||||
def cleanup(self): |
||||
""" |
||||
Called during the shutdown of the daemon to clean up the module's resources. |
||||
|
||||
Will only be called for modules that report to be enabled |
||||
:return: void |
||||
""" |
||||
pass |
||||
|
||||
def respond_to(self, command: str): |
||||
""" |
||||
Responds to a user command |
||||
|
||||
Actions that are dependent on user commands should be handled here, |
||||
use periodic() to handle periodic actions independent of user input |
||||
|
||||
Will only be called for modules that report to be enabled |
||||
:param command: The command received from the user |
||||
:return: bool Whether the displayed information is changed by the executed operations. |
||||
""" |
||||
return False |
||||
|
||||
def __str__(self): |
||||
""" |
||||
Creates the string representation of the module to show on the action bar |
||||
|
||||
Will only be called for modules that report to be visible |
||||
:return: str |
||||
""" |
||||
return super().__str__() |
||||
|
||||
def load_state(self, state): |
||||
""" |
||||
Loads state previously saved by this module |
||||
|
||||
State is loaded once at daemon startup, if there is a statefile present. |
||||
|
||||
Will only be called for modules that report to be enabled |
||||
:param state: Whatever was returned by dump_state() on the previous run |
||||
:return: void |
||||
""" |
||||
pass |
||||
|
||||
def load_state_ex(self, state: dict): |
||||
""" |
||||
Loads all state previously saved |
||||
|
||||
Usually this method filters the state of this module out of the global state to pass to load_state() |
||||
|
||||
Controls that wrap other controls typically want to override this function to pass the whole |
||||
state to their children to avoid putting state in containers named after te wrapper. |
||||
|
||||
:param state: Dictionary containing the saved state |
||||
:return: void |
||||
""" |
||||
if self.__class__.__name__ in state: |
||||
self.load_state(state[self.__class__.__name__]) |
||||
|
||||
def dump_state(self): |
||||
""" |
||||
Dumps current state of this module |
||||
|
||||
State is dumped once at daemon shutdown, if there is a statefile present. |
||||
|
||||
Will only be called for modules that report to be enabled |
||||
:return: any pickleable object representing the state of this module |
||||
""" |
||||
return None |
||||
|
||||
def dump_state_ex(self): |
||||
""" |
||||
Dumps state of this module including unique identifying information for this module |
||||
|
||||
Controls that wrap other controls typically want to override this function to dump the whole |
||||
state of their children to avoid putting state in containers named after te wrapper. |
||||
|
||||
Will only be called for modules that report to be enabled |
||||
:return: dict |
||||
""" |
||||
return {self.__class__.__name__: self.dump_state()} |
||||
|
||||
def create_pipe_command(self, command: str): |
||||
""" |
||||
Creates a shell command that will pass :command to the daemon through the controlpipe |
||||
|
||||
:param command: The command to pass |
||||
:return: str Shell command that will pass the given command through the controlpipe |
||||
""" |
||||
return '{}/command.sh {} {}'.format(os.path.abspath(sys.path[0]), command, |
||||
os.path.abspath(self.args.command_pipe.name)) |
||||
|
||||
|
||||
class GroupedControl(AbstractControl): |
||||
""" |
||||
Groups a set of modules into one module, separated by a given string |
||||
""" |
||||
|
||||
def __init__(self, *modules, separator=' | '): |
||||
""" |
||||
Creates a new grouped control |
||||
|
||||
:param modules: Modules to group in this module, in the order they should be displayed |
||||
:param separator: Separator string used inbetween two modules |
||||
""" |
||||
super().__init__() |
||||
self.__modules = modules |
||||
self.__separator = separator |
||||
|
||||
def bind_arguments(self, args): |
||||
super().bind_arguments(args) |
||||
[m.bind_arguments(args) for m in self.__modules] |
||||
|
||||
@property |
||||
def enabled(self): |
||||
return any([m.enabled for m in self.__modules]) |
||||
|
||||
@property |
||||
def visible(self): |
||||
return any([m.visible for m in self.__modules if m.enabled]) |
||||
|
||||
def configure(self, argument_parser): |
||||
[m.configure(argument_parser) for m in self.__modules] |
||||
|
||||
def load_state_ex(self, state): |
||||
[m.load_state_ex(state) for m in self.__modules if m.enabled] |
||||
|
||||
def cleanup(self): |
||||
[m.cleanup() for m in self.__modules if m.enabled] |
||||
|
||||
def respond_to(self, command): |
||||
return any([m.respond_to(command) for m in self.__modules if m.enabled]) |
||||
|
||||
def periodic(self): |
||||
return any([m.periodic() for m in self.__modules if m.enabled]) |
||||
|
||||
def dump_state_ex(self): |
||||
data = dict() |
||||
for m in self.__modules: |
||||
if m.enabled: |
||||
data.update(m.dump_state_ex()) |
||||
return data |
||||
|
||||
def __passthrough_log(self, fn, s): |
||||
logger.debug('%s.%s(): %s', self.__class__.__name__, fn, s) |
||||
return s |
||||
|
||||
def __str__(self): |
||||
return self.__separator.join([self.__passthrough_log('__str__', str(m)) for m in self.__modules if m.visible]) |
||||
|
||||
|
||||
def action(command, text, **kwargs): |
||||
""" |
||||
Creates an xmobar action tag |
||||
|
||||
:param command: The action (command to execute on click) |
||||
:param text: The text where the action is applied upon |
||||
:param kwargs: Extra parameters for the action tag (known case: button) |
||||
:return: str The xmobar action tag |
||||
""" |
||||
return '<action=`{}`{}>{}</action>'.format(command, ' ' + ( |
||||
' '.join(['{}={}'.format(k, v) for k, v in kwargs.items()])) if len(kwargs) else '', text) |
||||
|
||||
|
||||
class WrappingControl(AbstractControl): |
||||
""" |
||||
Generic wrapper for a module |
||||
|
||||
All method calls are passed through to the child module. |
||||
This wrapper does not affect the state, it is passed through cleanly |
||||
""" |
||||
|
||||
def __init__(self, child_control: AbstractControl) -> None: |
||||
""" |
||||
Creates a new module wrapper |
||||
|
||||
:param child_control: The child module to wrap |
||||
""" |
||||
super().__init__() |
||||
self.child = child_control |
||||
|
||||
def cleanup(self): |
||||
self.child.cleanup() |
||||
|
||||
def dump_state_ex(self): |
||||
return self.child.dump_state_ex() |
||||
|
||||
def load_state(self, state): |
||||
self.child.load_state(state) |
||||
|
||||
def respond_to(self, command): |
||||
return self.child.respond_to(command) |
||||
|
||||
@property |
||||
def enabled(self): |
||||
return self.child.enabled |
||||
|
||||
def dump_state(self): |
||||
self.child.dump_state() |
||||
|
||||
def configure(self, argument_parser): |
||||
self.child.configure(argument_parser) |
||||
|
||||
def bind_arguments(self, args): |
||||
super().bind_arguments(args) |
||||
self.child.bind_arguments(args) |
||||
|
||||
@property |
||||
def visible(self): |
||||
return self.child.visible |
||||
|
||||
def periodic(self): |
||||
self.child.periodic() |
||||
|
||||
def load_state_ex(self, state): |
||||
self.child.load_state_ex(state) |
||||
|
||||
def __str__(self): |
||||
return self.child.__str__() |
||||
|
||||
|
||||
class ActionWrapperControl(WrappingControl): |
||||
""" |
||||
Wraps a module output in an additional action |
||||
""" |
||||
|
||||
def __init__(self, control: AbstractControl, action: str, buttons: str = None) -> None: |
||||
""" |
||||
Creates an action wrapper |
||||
|
||||
:param control: The module to wrap |
||||
:param action: The shell command to execute when :buttons are pressed on the child module |
||||
:param buttons: Optionally, which buttons will trigger the shell command (May be a Button, or a number of OR-ed buttons) |
||||
""" |
||||
super().__init__(control) |
||||
self.__action = action |
||||
self.__buttons = buttons |
||||
|
||||
def __str__(self): |
||||
if self.__buttons: |
||||
return action(self.__action, super().__str__(), button=self.__buttons) |
||||
else: |
||||
return action(self.__action, super().__str__()) |
||||
|
||||
|
||||
@enum.unique |
||||
class Button(enum.Enum): |
||||
""" |
||||
A mouse button |
||||
|
||||
A set of multiple mouse buttons can be constructed by OR-ing buttons together |
||||
e.g.: Button.LEFT|Button.RIGHT |
||||
""" |
||||
LEFT = '1' |
||||
MIDDLE = '2' |
||||
RIGHT = '3' |
||||
SCROLL_UP = '4' |
||||
SCROLL_DOWN = '5' |
||||
|
||||
def __str__(self): |
||||
return self.value |
||||
|
||||
def __or__(self, other): |
||||
""" |
||||
Add multiple buttons together |
||||
|
||||
:param other: Button to add to this one |
||||
:return: An set of multiple buttons |
||||
""" |
||||
if isinstance(other, self.__class__): |
||||
return _Buttons([self, other]) |
||||
return NotImplemented |
||||
|
||||
|
||||
class _Buttons(frozenset): |
||||
""" |
||||
Internal class that represents a set of Button enums and that can be further chained with itself of another Button |
||||
""" |
||||
|
||||
def __str__(self): |
||||
return ''.join([str(b) for b in self]) |
||||
|
||||
def __or__(self, other): |
||||
""" |
||||
Add another Button of _Buttons set to the set |
||||
:param other: Button or _Buttons to add to this set of Buttons |
||||
:return: The superset of this set of buttons and the other set of buttons |
||||
""" |
||||
if isinstance(other, self.__class__): |
||||
return _Buttons(self.union(other)) |
||||
elif isinstance(other, Button): |
||||
return _Buttons(self.union({other})) |
||||
return NotImplemented |
@ -0,0 +1,81 @@ |
||||
import subprocess |
||||
import logging |
||||
|
||||
from .core import AbstractControl, action |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
|
||||
class RedshiftControl(AbstractControl): |
||||
def __init__(self): |
||||
super().__init__() |
||||
self._redshift_proc = None |
||||
|
||||
def configure(self, argument_parser): |
||||
argument_parser.add_argument('--redshift-enabled', help='Use the redshift module', action='store_true') |
||||
argument_parser.add_argument('--redshift-location', help='LAT:LON Your current location', type=str) |
||||
argument_parser.add_argument('--redshift-temperature', |
||||
help='DAY:NIGHT Color temperature to set at daytime/night', type=str) |
||||
|
||||
def bind_arguments(self, args): |
||||
super().bind_arguments(args) |
||||
if self.enabled and not self.redshift_error_message: |
||||
self.redshift_enabled = True |
||||
|
||||
@property |
||||
def enabled(self): |
||||
return self.args.redshift_enabled |
||||
|
||||
@property |
||||
def redshift_enabled(self) -> bool: |
||||
return bool(self._redshift_proc) |
||||
|
||||
@property |
||||
def redshift_error_message(self): |
||||
if self.enabled and not (self.args.redshift_location or self.args.redshift_temperature): |
||||
return "Missing parameter(s) --redshift-location and/or --redshift-temperature" |
||||
if self._redshift_proc is not None and self._redshift_proc.returncode is not None and self._redshift_proc.returncode != 0: |
||||
logger.error("Redshift process died unexpectedly: %s", self._redshift_proc.communicate()) |
||||
return self._redshift_proc.communicate()[1].replace("\n", ' ') |
||||
return None |
||||
|
||||
@redshift_enabled.setter |
||||
def redshift_enabled(self, value: bool) -> None: |
||||
if value == self.redshift_enabled: |
||||
return |
||||
if value: |
||||
logger.info("Starting redshift: -l %s -t %s", self.args.redshift_location, self.args.redshift_temperature) |
||||
self._redshift_proc = subprocess.Popen( |
||||
['redshift', '-l', self.args.redshift_location, '-t', self.args.redshift_temperature], stdin=subprocess.DEVNULL, stderr=subprocess.STDOUT) |
||||
else: |
||||
logger.info("Terminating running redshift process") |
||||
self._redshift_proc.terminate() |
||||
|
||||
def periodic(self): |
||||
if self._redshift_proc: |
||||
self._redshift_proc.poll() |
||||
if self._redshift_proc.returncode is not None: |
||||
self._redshift_proc = None |
||||
return True |
||||
|
||||
def respond_to(self, command): |
||||
if command == 'redshift': |
||||
self.redshift_enabled = not self.redshift_enabled |
||||
return True |
||||
return False |
||||
|
||||
def cleanup(self): |
||||
self.redshift_enabled = False |
||||
if self._redshift_proc: |
||||
self._redshift_proc.wait() |
||||
|
||||
def __str__(self): |
||||
if not self.redshift_error_message: |
||||
return action(self.create_pipe_command('redshift'), 'R' if self.redshift_enabled else 'r') |
||||
return 'E: ' + self.redshift_error_message |
||||
|
||||
def load_state(self, state): |
||||
self.redshift_enabled = state['redshift_enabled'] |
||||
|
||||
def dump_state(self): |
||||
return {'redshift_enabled': self.redshift_enabled} |
@ -0,0 +1,28 @@ |
||||
import sys |
||||
import os.path |
||||
from .core import AbstractControl |
||||
|
||||
|
||||
class QuitControl(AbstractControl): |
||||
@property |
||||
def visible(self): |
||||
return False |
||||
|
||||
def respond_to(self, command): |
||||
if command == 'q': |
||||
sys.exit(0) |
||||
elif command == 'refresh': |
||||
return True |
||||
|
||||
|
||||
class ChildReaperControl(AbstractControl): |
||||
@property |
||||
def visible(self): |
||||
return False |
||||
|
||||
def periodic(self): |
||||
try: |
||||
os.wait3(os.WNOHANG) |
||||
except: |
||||
pass |
||||
|
@ -0,0 +1,125 @@ |
||||
import subprocess |
||||
import logging |
||||
|
||||
import math |
||||
|
||||
from .core import AbstractControl, action, Button |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
|
||||
class VolumeControl(AbstractControl): |
||||
def configure(self, argument_parser): |
||||
argument_parser.add_argument('--volume-enabled', help='Enable volume control', action='store_true') |
||||
|
||||
@property |
||||
def enabled(self): |
||||
return self.args.volume_enabled |
||||
|
||||
@property |
||||
def muted(self): |
||||
return self._muted |
||||
|
||||
@muted.setter |
||||
def muted(self, muted): |
||||
if self._muted != muted: |
||||
logger.info("Setting muted to %s", muted) |
||||
try: |
||||
self._muted = muted |
||||
self._pactl('set-sink-mute', str(int(muted))) |
||||
except subprocess.CalledProcessError as e: |
||||
logger.exception("Error setting mute") |
||||
|
||||
@property |
||||
def volume(self): |
||||
return self._volume |
||||
|
||||
@volume.setter |
||||
def volume(self, volume): |
||||
if volume < 0: |
||||
logger.warning("Cannot set volume to %d, clamping to zero", volume) |
||||
volume = 0 |
||||
if volume > 90000: |
||||
logger.warning("Cannot set volume to %d, clamping to 90000", volume) |
||||
volume = 90000 |
||||
if self.muted: |
||||
self.muted = False |
||||
if self._volume != volume: |
||||
logger.info("Setting volume to %s", volume) |
||||
try: |
||||
self._volume = volume |
||||
self._pactl('set-sink-volume', str(volume)) |
||||
except subprocess.CalledProcessError as e: |
||||
logger.exception("Error setting volume") |
||||
|
||||
def _pa_get_sinks(self): |
||||
return [l.split(b'\t')[0].decode() for l in subprocess.check_output(["pactl", "list", "short", "sinks"], stdin=subprocess.DEVNULL,stderr=subprocess.DEVNULL).split(b'\n') if len(l) > 0] |
||||
|
||||
def _pactl(self, command, arg): |
||||
for i in self._pa_get_sinks(): |
||||
logger.debug("Calling pactl: %s %s %s", command, i, arg) |
||||
subprocess.check_call(["pactl", command, i, arg], stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
||||
|
||||
def __init__(self): |
||||
super().__init__() |
||||
self._muted = False |
||||
self._volume = 0 |
||||
|
||||
def respond_to(self, command): |
||||
if command[0] == '=': |
||||
self.volume = int(command[1:]) * 9000 |
||||
elif command == 'm1': |
||||
self.muted = True |
||||
elif command == 'm0': |
||||
self.muted = False |
||||
elif command == 'mt': |
||||
self.muted = not self.muted |
||||
elif command == '+': |
||||
self.volume += 3000 |
||||
elif command == '-': |
||||
self.volume -= 3000 |
||||
elif command == 'r': |
||||
self.volume = 30000 |
||||
else: |
||||
return False |
||||
return True |
||||
|
||||
def __str__(self): |
||||
return action( |
||||
self.create_pipe_command('+'), |
||||
action( |
||||
self.create_pipe_command('-'), |
||||
self.action_bars(create_bars(self.volume) if not self.muted else ' (mute) '), |
||||
button=Button.SCROLL_DOWN |
||||
), |
||||
button=Button.SCROLL_UP |
||||
) |
||||
|
||||
def action_bars(self, bars): |
||||
return ''.join([action(self.create_pipe_command('=%d' % (i + 1)), c, button=Button.LEFT) for i, c in |
||||
zip(range(len(bars)), bars)]) |
||||
|
||||
def load_state(self, state): |
||||
self.volume = state['volume'] |
||||
self.muted = state['muted'] |
||||
|
||||
def dump_state(self): |
||||
return dict(volume=self.volume, muted=self.muted) |
||||
|
||||
|
||||
def create_bars(volume): |
||||
num_bars = float(volume) / 9000.0 |
||||
return ('/' * math.floor(num_bars)) + partial_bar(num_bars - math.floor(num_bars)) + ( |
||||
' ' * (10 - math.ceil(num_bars))) |
||||
|
||||
|
||||
def partial_bar(bar_size): |
||||
if bar_size == 0.0: |
||||
return '' |
||||
elif bar_size < 0.3: |
||||
return ' ' |
||||
elif bar_size < 0.6: |
||||
return '.' |
||||
elif bar_size < 0.9: |
||||
return '-' |
||||
return '/' |
@ -0,0 +1,29 @@ |
||||
#!/bin/bash |
||||
ARGS="$@" |
||||
while [[ -n "$@" ]]; do |
||||
if [[ "${1:0:1}" != "-" ]]; then |
||||
if [[ -z "$volumepipe" ]]; then |
||||
volumepipe="$1" |
||||
elif [[ -z "$commandpipe" ]]; then |
||||
commandpipe="$1" |
||||
fi |
||||
fi |
||||
shift; |
||||
done |
||||
|
||||
|
||||
|
||||
# volumepipe |
||||
if ! [[ -p "$volumepipe" ]]; then |
||||
rm -rf "$volumepipe" |
||||
mkfifo "$volumepipe" |
||||
fi |
||||
|
||||
# volumecontrol |
||||
if ! [[ -p "$commandpipe" ]]; then |
||||
rm -rf "$commandpipe" |
||||
mkfifo "$commandpipe" |
||||
fi |
||||
|
||||
eval set -- "$ARGS" |
||||
exec "$(dirname "${BASH_SOURCE[0]}")/daemon.py" "$@" |
Loading…
Reference in new issue