Fixed the trafotron
[cerebrum.git] / tools / trafotron.py
blob81a28214007984fbc7ec7bbac5b2f93d6509ee0c
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 import time
5 from threading import Thread
6 import copy
7 import json
8 import requests
9 from http.server import HTTPServer, BaseHTTPRequestHandler
10 from pylibcerebrum.serial_mux import SerialMux
12 CBEAM = 'http://c-beam.cbrp3.c-base.org:4254/rpc/'
13 HYPERBLAST = 'http://10.0.1.23:1337/'
14 CLOCC = 'http://c-lab-lock.cbrp3.c-base.org:1337/'
15 PORT = '/dev/serial/by-id/usb-Arduino__www.arduino.cc__0043_7523230313535161C072-if00'
16 BAUDRATE = 115200
17 AVG_SAMPLES = 128
18 SEND_THRESHOLD = 3
20 s = SerialMux(PORT, BAUDRATE)
21 print('discovering cerebrum devices')
22 results = []
23 while not results:
24 results = s.discover()
25 print(results)
26 print('opening first device')
27 g = s.open(0)
28 print('initializing device')
29 print(dir(g))
30 #bar status outputs
31 for io in [g.digital3, g.digital5, g.digital6, g.digital9, g.digital10, g.digital11]:
32 io.direction = 1
33 io.pwm_enabled = 1
34 print('starting event loop')
36 oldre, oldge, oldgn = None,None,None
37 def ampel(re,ge,gn):
38 global oldre, oldge, oldgn
39 if re and ge and gn:
40 gn = False
41 # Ensure no more than two lights are on at the same time, even for very short periods of time
42 if oldre != re:
43 g.ampelrot.state = re
44 if oldge != ge:
45 g.ampelgelb.state = ge
46 if oldgn != gn:
47 g.ampelgrün.state = gn
48 oldre,oldge,oldgn = re,ge,gn
50 #HACK ctrl-c ctrl-p -ed from barstatus.py
51 barstatus = 'closed'
52 ampelstate = ((0,0,0), (0,0,0))
53 lastchange = time.time() - 180
54 def animate():
55 global barstatus, lastchange, ampelstate
56 while True:
57 lookup = barstatus
58 if time.time() - lastchange < 180:
59 if barstatus == 'open':
60 lookup = 'opened'
61 elif barstatus == 'closed':
62 lookup = 'lastcall'
63 l1, r1 = {'open': ((10, 255, 10), (128, 128, 128)),
64 'opened': ((10, 128, 255), (128, 128, 128)),
65 'closed': ((128, 128, 128), (255, 4, 4)),
66 'lastcall': ((10, 255, 10), (128, 128, 128))}.get(lookup)
67 l2, r2 = {'open': ((10, 255, 10), (128, 128, 128)),
68 'opened': ((10, 255, 10), (128, 128, 128)),
69 'closed': ((128, 128, 128), (255, 4, 4)),
70 'lastcall': ((10, 255, 10), (255, 255, 10))}.get(lookup)
71 (g.digital3.pwm, g.digital5.pwm, g.digital6.pwm), (g.digital9.pwm, g.digital10.pwm, g.digital11.pwm) = l1, r1
72 ampel(*ampelstate[0])
73 time.sleep(0.33)
74 (g.digital3.pwm, g.digital5.pwm, g.digital6.pwm), (g.digital9.pwm, g.digital10.pwm, g.digital11.pwm) = l2, r2
75 ampel(*ampelstate[1])
76 time.sleep(0.66)
78 animator = Thread(target=animate)
79 animator.daemon = True
80 animator.start()
83 def sendstate(value):
84 print('SENDING', value)
85 try:
86 requests.post(CBEAM, data=json.dumps({'method': 'trafotron', 'params': [value], 'id': 0}))
87 except requests.exceptions.ConnectionError:
88 pass
90 class AmpelHandler(BaseHTTPRequestHandler):
91 def do_POST(self):
92 global ampelstate
93 self.send_response(200)
94 self.end_headers()
95 postlen = int(self.headers['Content-Length'])
96 postdata = str(self.rfile.read(postlen), 'utf-8')
97 data = json.loads(postdata)
98 method = data.get('method')
99 if method == 'ampel':
100 p = data.get('params')
101 if type(p[0]) is list:
102 (r1,y1,g1),(r2,y2,g2) = p
103 r1,y1,g1 = bool(r1), bool(y1), bool(g1)
104 r2,y2,g2 = bool(r2), bool(y2), bool(g2)
105 ampelstate = ((r1,y1,g1),(r2,y2,g2))
106 elif type(p[0]) is int and len(p) == 1:
107 a,b = (bool(p[0]&32), bool(p[0]&16), bool(p[0]&8)), (bool(p[0]&4), bool(p[0]&2), bool(p[0]&1))
108 ampelstate = a,b
109 else:
110 r,y,g = p
111 r,y,g = bool(r), bool(y), bool(g)
112 ampelstate = (r,y,g), (r,y,g)
114 HOST, PORT = '', 1337
115 server = HTTPServer((HOST, PORT), AmpelHandler)
116 t = Thread(target=server.serve_forever)
117 t.daemon = True
118 t.start()
120 time.sleep(2)
122 # Enable pull-up on Arduino analog pin 4
123 g.analog4.state = 1
124 oldval = -2*SEND_THRESHOLD
125 oldbarstate = None
126 newbarstate = None
127 while True:
128 val = sum([ g.analog5.analog for i in range(AVG_SAMPLES)])/AVG_SAMPLES
129 if abs(val-oldval) > SEND_THRESHOLD:
130 oldval = val
131 sendstate(int(val))
132 if g.analog4.state:
133 newbarstate = 'closed'
134 else:
135 newbarstate = 'open'
136 if newbarstate != oldbarstate:
137 oldbarstate = newbarstate
139 #comm with animation thread
140 barstatus = newbarstate
141 lastchange = time.time()
143 try:
144 requests.post(HYPERBLAST, data=json.dumps({'method': 'barstatus', 'params': [newbarstate], 'id': 0}))
145 except requests.exceptions.ConnectionError:
146 pass
147 try:
148 requests.post(CBEAM, data=json.dumps({'method': 'barstatus', 'params': [newbarstate], 'id': 0}))
149 except requests.exceptions.ConnectionError:
150 pass
151 try:
152 requests.post(CLOCC, data=json.dumps({'method': 'barstatus', 'params': [newbarstate], 'id': 0}))
153 except requests.exceptions.ConnectionError:
154 pass