sony_bravia_pro_display_moc.../server.py
2021-04-10 18:54:52 +07:00

248 lines
7.5 KiB
Python

import json
import os
import socket, threading, time
HOST = '127.0.0.1' # localhost 127.0.0.1
PORT = 51234
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(4)
class SonyServer(threading.Thread):
def __init__(self, info):
threading.Thread.__init__(self)
socket, address = info
self.socket = socket
self.address = address
self.power_stat = ''
self.input_stat = ''
self.audio_mute_stat = ''
self.audio_volume_stat = ''
self.picture_mute_stat = ''
if not os.path.exists('sony_stats.json'):
self.setDefaults()
else:
self.getDataFromFile()
self.inputs = {
'1': 'hdmi',
'4': 'component',
'5': 'scrn-mirror'
}
## DUMMY COMMANDS
# *SCINPT0000000100000004
# *SCPOWR0000000000000001
# *SCPMUT0000000000000000
# *SCAMUT0000000000000000
# *SCVOLU0000000000000060
## DUMMY ANSWER
# *SATYPE0000000000000000
## DUMMY QUERIES
# *SEINPT################
# *SEPOWR################
# *SEPMUT################
# *SEAMUT################
# *SEVOLU################
def run(self):
first_run = True
while True:
if first_run:
self.printUpdate()
first_run = False
try:
self.socket.send(b'')
except Exception as error:
print("there was an error pinging socket: ", error)
break
data = self.waitRecv()
if not data:
print("ERROR: emergency break - no data received")
break
if len(data) == 24:
data = data.decode('utf-8')
self.routeCommand(data)
else:
print("Length of data was only: ", len(data))
time.sleep(0.1)
self.socket.close()
print('%s:%s disconnected.' % self.address)
return
def waitRecv(self):
# Had other logic in here previously
return self.socket.recv(24)
def routeCommand(self, command=None):
# print("\ncommand received:", command)
type = command[2]
if type == 'C':
res = self.control(command)
if res:
self.updateFile()
self.printUpdate()
elif type == 'E':
self.query(command)
else:
print("\nwrong message format: {}\n".format(type))
return
def control(self, command):
type = command[3:7]
# print("type: ", type)
if type == 'INPT':
inpt = command[14]
setting = command[19:]
self.input_stat = '*SAINPT0000000{}0000{}'.format(inpt, setting)
elif type == 'AMUT':
setting = command[7:]
self.audio_mute_stat = '*SAAMUT{}'.format(setting)
elif type == 'VOLU':
setting = command[7:]
self.audio_volume_stat = '*SAVOLU{}'.format(setting)
elif type == 'PMUT':
setting = command[7:]
self.picture_mute_stat = '*SAPMUT{}'.format(setting)
elif type == 'POWR':
setting = command[7:]
self.power_stat = '*SAPOWR{}'.format(setting)
else:
print("No control type match")
return False
self.respond(type)
return True
def query(self, command=None):
type = command[3:7]
if type == 'INPT':
data = self.input_stat.encode('utf-8')
self.socket.send(data)
# print("Query: ", command, "\n Sent: ", data)
elif type == 'AMUT':
data = self.audio_mute_stat.encode('utf-8')
self.socket.send(data)
# print("Query: ", command, "\n Sent: ", data)
elif type == 'VOLU':
data = self.audio_volume_stat.encode('utf-8')
self.socket.send(data)
# print("Query: ", command, "\n Sent: ", data)
elif type == 'PMUT':
data = self.picture_mute_stat.encode('utf-8')
self.socket.send(data)
# print("Query: ", command, "\n Sent: ", data)
elif type == 'POWR':
data = self.power_stat.encode('utf-8')
self.socket.send(data)
# print("Query: ", command, "\n Sent: ", data)
def respond(self, type=None):
if type is None:
return
new = None
if type == 'INPT':
new = self.input_stat
elif type == 'AMUT':
new = self.audio_mute_stat
elif type == 'VOLU':
new = self.audio_volume_stat
elif type == 'PMUT':
new = self.picture_mute_stat
elif type == 'POWR':
new = self.power_stat
if new:
notify = new[:2] + 'N' + new[3:]
answer = '*SA{}0000000000000000'.format(type)
self.socket.send(answer.encode('utf-8'))
self.socket.send(notify.encode('utf-8'))
# print("sent response\n")
return
print("no notify sent")
def setDefaults(self):
self.power_stat = '*SAPOWR0000000000000000\n'
self.input_stat = '*SAINPT0000000100000003\n'
self.audio_mute_stat = '*SAAMUT0000000000000000\n'
self.audio_volume_stat = '*SAVOLU0000000000000064\n'
self.picture_mute_stat = '*SAPMUT0000000000000000\n'
def updateFile(self):
with open('sony_stats.json', 'w+') as file:
data = {
'power_stat': self.power_stat,
'input_stat': self.input_stat,
'audio_mute_stat': self.audio_mute_stat,
'audio_volume_stat': self.audio_volume_stat,
'picture_mute_stat': self.picture_mute_stat
}
file.write(json.dumps(data))
def getDataFromFile(self):
with open('sony_stats.json', 'r') as file:
temp = file.read()
if temp:
data = json.loads(temp)
self.power_stat = data.get('power_stat', '')
self.input_stat = data.get('input_stat', '')
self.audio_mute_stat = data.get('audio_mute_stat', '')
self.audio_volume_stat = data.get('audio_volume_stat', '')
self.picture_mute_stat = data.get('picture_mute_stat', '')
else:
self.setDefaults()
def printUpdate(self):
print("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n")
print('%s:%s connected.' % self.address)
print("========= SERVER DATA =========")
print("Power: ", "on" if self.power_stat[-2:-1] == '1' else 'off')
print("Input: {} {}".format(self.inputs[self.input_stat[14]], self.input_stat[-2:-1]))
print("Audio Mute: ", 'unmuted' if self.audio_mute_stat[-2:-1] == '0' else 'muted')
print("Audio Volume: ", self.audio_volume_stat[-4:-1])
print("Picture Mute: ", 'unmuted' if self.picture_mute_stat[-2:-1] == '0' else 'muted')
print("===============================")
if __name__ == "__main__":
first_time = True
thread = None
while True: # wait for socket to connect
# send socket to SonyServer and start monitoring
if first_time:
print("waiting for a client connection to HOST: {} on PORT:{}".format(HOST, PORT))
first_time = False
try:
thread = SonyServer(s.accept())
thread.start()
except Exception as error:
print("Issue in thread: ", error)
time.sleep(0.1)