Initial commit; first semi-usable version

This commit is contained in:
Tomáš Mládek 2019-02-10 13:33:37 +01:00
commit f93a807175
7 changed files with 309 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
__pycache__

3
requirements.in Normal file
View file

@ -0,0 +1,3 @@
launchpad_py
pygame
python-osc

9
requirements.txt Normal file
View file

@ -0,0 +1,9 @@
#
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile --output-file requirements.txt requirements.in
#
launchpad-py==0.8.1
pygame==1.9.4
python-osc==1.7.0

15
run.py Normal file
View file

@ -0,0 +1,15 @@
import logging
from src.controller import Controller
from src.resolume import Resolume
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.DEBUG)
controller = Controller()
resolume = Resolume()
controller.set_resolume(resolume)
resolume.set_controller(controller)
resolume.start()
controller.run()

8
src/common.py Normal file
View file

@ -0,0 +1,8 @@
from enum import Enum
class ClipState(Enum):
NO_CLIP = 0,
NOT_RUNNING = 1,
MOVED = 2,
RUNNING = 3

172
src/controller.py Normal file
View file

@ -0,0 +1,172 @@
import math
from enum import Enum, auto
from logging import getLogger
from time import sleep
import launchpad_py as launchpad
class ControlButtons(Enum):
UP_ARROW = (0, 0)
DOWN_ARROW = (1, 0)
LEFT_ARROW = (2, 0)
RIGHT_ARROW = (3, 0)
LAUNCH_BUTTON = (4, 0)
MIXER_BUTTON = (7, 0)
class Mode(Enum):
LAUNCH = auto()
MIXER = auto()
class Controller:
POLLING_DELAY = 0.1
mode = None
launch_x_offset = 0
launch_deck = 0
layers_bypassed = [False for _ in range(8)]
resolume = None
def __init__(self):
self.lp = launchpad.Launchpad()
self.lp.Open()
self.lp.Check()
self.lp.Reset()
def set_resolume(self, resolume):
self.resolume = resolume
def run(self):
self._switch_mode(Mode.LAUNCH)
while True:
while True:
event = self.lp.ButtonStateXY()
if len(event) > 0:
x, y, down = event
getLogger('controller').debug("EV: {}, {} {}".format(x, y, "DOWN" if down else "UP"))
self._handle(x, y, down)
else:
break
sleep(self.POLLING_DELAY)
def stop(self):
self.lp.Reset()
self.lp.Close()
def unset_clip(self, layer, clip):
if self.mode == Mode.LAUNCH and self.launch_x_offset < clip <= self.launch_x_offset + 8:
self.lp.LedCtrlXY(clip - 1 - self.launch_x_offset, 8 - (layer - 1), 0, 0)
def reset_clip(self, layer, clip):
if self.mode == Mode.LAUNCH and self.launch_x_offset < clip <= self.launch_x_offset + 8:
self.lp.LedCtrlXY(clip - 1 - self.launch_x_offset, 8 - (layer - 1), 0, 3)
def arm_clip(self, layer, clip):
if self.mode == Mode.LAUNCH and self.launch_x_offset < clip <= self.launch_x_offset + 8:
self.lp.LedCtrlXY(clip - 1 - self.launch_x_offset, 8 - (layer - 1), 3, 3)
def set_layer_clear(self, layer, state):
if self.mode == Mode.LAUNCH:
self.lp.LedCtrlXY(8, 8 - (layer - 1), *((1, 0) if state else (3, 0)))
def set_layer_opacity(self, layer, opacity):
if self.mode == Mode.MIXER:
level = math.floor(8 * opacity)
for x in range(0, level):
self.lp.LedCtrlXY(x, 8 - (layer - 1), 0, 3)
for x in range(level, 8):
self.lp.LedCtrlXY(x, 8 - (layer - 1), 0, 0)
def set_layer_bypass(self, layer, state):
self.layers_bypassed[layer - 1] = state
if self.mode == Mode.MIXER:
self.lp.LedCtrlXY(8, 8 - (layer - 1), *((3, 0) if state else (1, 0)))
def _switch_mode(self, mode):
if mode == self.mode:
return
self.mode = mode
self._reset()
if self.mode == Mode.LAUNCH:
self.resolume.poll_for_launch_state(self.launch_x_offset)
elif self.mode == Mode.MIXER:
self.resolume.poll_for_mixer_state()
def _reset(self):
self.lp.Reset()
for button in [ControlButtons.LAUNCH_BUTTON, ControlButtons.MIXER_BUTTON]:
self.lp.LedCtrlXY(*button.value, 0, 3)
if self.mode == Mode.LAUNCH:
self.lp.LedCtrlXY(*ControlButtons.LAUNCH_BUTTON.value, 3, 3)
self._update_launch_arrows()
elif self.mode == Mode.MIXER:
self.lp.LedCtrlXY(*ControlButtons.MIXER_BUTTON.value, 3, 3)
def _handle(self, x, y, down):
if (x, y) == ControlButtons.LAUNCH_BUTTON.value and down:
self._switch_mode(Mode.LAUNCH)
elif (x, y) == ControlButtons.MIXER_BUTTON.value and down:
self._switch_mode(Mode.MIXER)
elif self.mode == Mode.LAUNCH:
self._handle_launch(x, y, down)
elif self.mode == Mode.MIXER:
self._handle_mixer(x, y, down)
def _handle_launch(self, x, y, down):
if y == 0:
if not down:
return
if (x, y) == ControlButtons.LEFT_ARROW.value:
if self.launch_x_offset > 0:
self.launch_x_offset -= 1
self.resolume.poll_for_launch_state(self.launch_x_offset)
elif (x, y) == ControlButtons.RIGHT_ARROW.value:
self.launch_x_offset += 1
self.resolume.poll_for_launch_state(self.launch_x_offset)
# elif (x, y) == ControlButtons.DOWN_ARROW.value:
# if self.launch_deck > 0:
# self.launch_deck -= 1
# self._reset()
# self.resolume.select_deck(self.launch_deck + 1)
# elif (x, y) == ControlButtons.UP_ARROW.value:
# self.launch_deck += 1
# self._reset()
# self.resolume.select_deck(self.launch_deck + 1)
self._update_launch_arrows()
else:
if x < 8:
if not down:
return
layer, column = 8 - (y - 1), x + 1 + self.launch_x_offset
self.resolume.launch_clip(layer, column)
else:
self.resolume.clear_layer(8 - (y - 1), down)
def _update_launch_arrows(self):
# self.lp.LedCtrlXY(*ControlButtons.UP_ARROW.value, 0, 3)
# if self.launch_deck > 0:
# self.lp.LedCtrlXY(*ControlButtons.DOWN_ARROW.value, 0, 3)
# else:
# self.lp.LedCtrlXY(*ControlButtons.DOWN_ARROW.value, 0, 1)
self.lp.LedCtrlXY(*ControlButtons.RIGHT_ARROW.value, 0, 3)
if self.launch_x_offset > 0:
self.lp.LedCtrlXY(*ControlButtons.LEFT_ARROW.value, 0, 3)
else:
self.lp.LedCtrlXY(*ControlButtons.LEFT_ARROW.value, 0, 1)
def _handle_mixer(self, x, y, down):
if not down:
return
if x == 8:
layer = 8 - (y - 1)
self.resolume.set_layer_bypassed(layer, not self.layers_bypassed[layer - 1])
else:
level = (x + 1) / 8
self.resolume.set_layer_opacity(8 - (y - 1), level)

101
src/resolume.py Normal file
View file

@ -0,0 +1,101 @@
import re
from logging import getLogger
from threading import Thread
from pythonosc import osc_server, dispatcher, udp_client
class Resolume:
controller = None
def __init__(self, host="127.0.0.1", port_out=7000, port_in=7001, debug=False):
self._dispatcher = dispatcher.Dispatcher()
if debug:
self._dispatcher.map("/composition/*", print)
else:
self._dispatcher.map("/composition/layers/*/clips/*/connected", self._handle_connected)
self._dispatcher.map("/composition/layers/*/clear", self._handle_clear)
self._dispatcher.map("/composition/layers/*/video/opacity", self._handle_opacity)
self._dispatcher.map("/composition/layers/*/bypassed", self._handle_bypassed)
self.server = osc_server.ThreadingOSCUDPServer((host, port_in), self._dispatcher)
self.client = udp_client.SimpleUDPClient(host, port_out)
# batch_lock = threading.Lock()
# batch_done = threading.Event()
# batch_timer = None
# batch_result = []
def start(self):
thread = Thread(target=self.server.serve_forever)
thread.start()
getLogger('resolume').info("Started OSC server @ {}:{}".format(*self.server.server_address))
def set_controller(self, controller):
self.controller = controller
def set_layer_opacity(self, layer, opacity):
self._osc_send(f"/composition/layers/{layer}/video/opacity", opacity)
def set_layer_bypassed(self, layer, state):
self._osc_send(f"/composition/layers/{layer}/bypassed", 1 if state else 0)
def launch_clip(self, layer, column):
self._osc_send(f"/composition/layers/{layer}/clips/{column}/connect", 1)
def clear_layer(self, layer, state):
self._osc_send(f"/composition/layers/{layer}/clear", 1 if state else 0)
def select_deck(self, deck):
self._osc_send(f"/composition/decks/{deck}/select", 1)
def poll_for_launch_state(self, column_start, width=8):
for layer in range(1, 8):
self._osc_send(f"/composition/layers/{layer}/clear", "?")
for column in range(column_start, column_start + width + 1):
self._osc_send(f"/composition/layers/{layer}/clips/{column}/connected", "?")
def poll_for_mixer_state(self):
for layer in range(1, 8):
self._osc_send(f"/composition/layers/{layer}/bypassed", "?")
self._osc_send(f"/composition/layers/{layer}/video/opacity", "?")
def _handle_connected(self, address, value):
getLogger('resolume').debug("OSC RECV: %s: '%s'", address, value)
layer, clip = [int(number) for _, number in re.findall(r'(layers|clips)/([0-9]+)', address)]
if value == 0 or value == 2:
self.controller.unset_clip(layer, clip)
elif value == 1:
self.controller.reset_clip(layer, clip)
elif value == 3:
self.controller.arm_clip(layer, clip)
def _handle_clear(self, address, value):
getLogger('resolume').debug("OSC RECV: %s: '%s'", address, value)
layer = int(re.search(r'layers/([0-9]+)', address).group(1))
self.controller.set_layer_clear(layer, bool(value))
def _handle_opacity(self, address, value):
getLogger('resolume').debug("OSC RECV: %s: '%s'", address, value)
layer = int(re.search(r'layers/([0-9]+)', address).group(1))
self.controller.set_layer_opacity(layer, value)
def _handle_bypassed(self, address, value):
getLogger('resolume').debug("OSC RECV: %s: '%s'", address, value)
layer = int(re.search(r'layers/([0-9]+)', address).group(1))
self.controller.set_layer_bypass(layer, bool(value))
def _osc_send(self, address, value):
getLogger('resolume').debug("OSC SEND: %s: '%s'", address, value)
self.client.send_message(address, value)
def debug(self):
for layer in range(1, 4):
self._osc_send(f"/composition/layers/{layer}/playmode", "?")
#
# resolume = Resolume(debug=True)
# resolume.start()
#
# while True:
# resolume.debug()
# sleep(30)