old_gravity/server.py

192 lines
6.0 KiB
Python
Raw Normal View History

2020-01-27 16:21:31 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jan 26 17:50:25 2020
@author: suwako
"""
import sys, multiprocessing, socket, time
if sys.platform == 'win32':
import multiprocessing.reduction
from colorama import Fore, Style
import numpy as np
2020-02-03 00:31:31 +01:00
import json
import gzip
2020-01-27 16:21:31 +01:00
def debug(*s): print("[D ] : " +
str(', '.join(map(str, s))) +
Style.RESET_ALL)
def info(*s): print(Fore.GREEN +
Fore.LIGHTGREEN_EX +
"[I -] : " +
Style.RESET_ALL +
str(', '.join(map(str, s))) +
Style.RESET_ALL)
def warn(*s): print(Fore.MAGENTA +
"[W ~] : " +
Style.RESET_ALL +
str(', '.join(map(str, s))) +
Style.RESET_ALL)
def error(*s): print(Fore.RED +
Fore.LIGHTRED_EX +
"[E !] : " +
Style.RESET_ALL +
str(', '.join(map(str, s))) +
Style.RESET_ALL)
2020-02-03 00:31:31 +01:00
level = 0
BUFFER_SIZE = 4096
CONTINUE = b'\x00'
END = b'\xFF'
2020-01-27 16:21:31 +01:00
for i in range(level):
exec("debug info warn error".split()[i] + "= lambda *x: None")
2020-02-03 00:31:31 +01:00
def lookahead(iterable):
it = iter(iterable)
last = next(it)
for val in it:
yield last, CONTINUE
last = val
yield last, END
def chunked(size, source):
for i in range(0, len(source), size):
yield source[i:i+size]
2020-01-27 16:21:31 +01:00
2020-02-03 00:31:31 +01:00
def handle(conn, address, queues):
2020-01-27 16:21:31 +01:00
try:
2020-02-03 00:31:31 +01:00
info("Inbound Connection at " + ':'.join(map(str,address)))
data = bytes()
2020-01-27 16:21:31 +01:00
while True:
2020-02-03 00:31:31 +01:00
in_data = conn.recv(BUFFER_SIZE)
if in_data == b"":
2020-01-27 16:21:31 +01:00
info("Socket closed remotely at " + ':'.join(map(str,address)))
break
2020-02-03 00:31:31 +01:00
if in_data[0] == CONTINUE[0]:
data += in_data[1:]
elif in_data[0] == END[0]:
decompressed = gzip.decompress(data+in_data[1:]).decode('utf8')
queues['in'].put(json.loads(decompressed))
else :
warn("Invalid chunk received !!")
send(conn, {'error':True, 'content':"INV_CHUNK"})
break
2020-01-27 16:21:31 +01:00
finally:
2020-02-03 00:31:31 +01:00
info("Closing socket at " + ':'.join(map(str,address)))
conn.close()
def send(conn, data_obj):
data = gzip.compress(bytes(json.dumps(data_obj), encoding='utf8'))
for chunk, info in lookahead(chunked(BUFFER_SIZE - 1, data)):
conn.sendall(info + chunk.ljust(BUFFER_SIZE - 1, b'\x00'))
2020-01-27 16:21:31 +01:00
class Server(object):
def __init__(self, hostname, port):
self.hostname = hostname
self.port = port
2020-02-03 00:31:31 +01:00
self.run = False
self.connections = []
2020-01-27 16:21:31 +01:00
def start(self):
2020-02-03 00:31:31 +01:00
self.run = True
info(f"Listening on {self.hostname}:{self.port}.")
2020-01-27 16:21:31 +01:00
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2020-02-03 00:31:31 +01:00
try:
self.socket.bind((self.hostname, self.port))
self.socket.listen(1)
while self.run:
conn, address = self.socket.accept()
queues = {'in':multiprocessing.Queue(),
'out':multiprocessing.Queue()}
p = multiprocessing.Process(target=handle, args=(conn,
address,
queues))
p.start()
self.connections.append({'conn':conn, 'address':address,
'queues':queues, 'process':p})
finally:
for process in multiprocessing.active_children():
info("Shutting down process {process}")
process.terminate()
process.join()
self.socket.close()
def stop(self):
info(f"Shutting down the listenner.")
self.run = False
2020-01-27 16:21:31 +01:00
if __name__ == "__main__":
2020-02-03 00:31:31 +01:00
server = Server("0.0.0.0", 9001)
server.start()
if False :
def handle(connection, address):
try:
info("Connected at "+ ':'.join(map(str,address)))
while True:
data = connection.recv(1024)
if data == b"":
info("Socket closed remotely at " + ':'.join(map(str,address)))
break
debug("Received data %r" % data)
connection.sendall(data)
debug("Sent data")
except:
warn("Problem handling request")
connection.close()
raise
finally:
debug("Closing socket with " + ':'.join(map(str,address)))
connection.close()
class Server(object):
def __init__(self, hostname, port):
self.hostname = hostname
self.port = port
self.connections = []
def start(self):
debug("listening")
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((self.hostname, self.port))
self.socket.listen(1)
while True:
conn, address = self.socket.accept()
process = multiprocessing.Process(target=handle,
args=(conn, address))
process.daemon = True
process.start()
if __name__ == "__main__":
server = Server("0.0.0.0", 9000)
try:
info("Listening")
server.start()
except:
error("Unexpected exception")
for process in multiprocessing.active_children():
info("Shutting down process %r", process)
process.terminate()
process.join()
raise
finally:
info("Shutting down")
for process in multiprocessing.active_children():
info("Shutting down process %r", process)
process.terminate()
process.join()
info("All done")