add freq
[dumbwifi.git] / ui.py
blobaf6ca1b6b0f72dae55eb2a961ef25bad7de90458
1 #!/usr/bin/env python
3 import curses
4 import os
5 import stat
6 import traceback
8 from conf import config
9 from output import logger
10 import network
11 import output
12 import system
13 import time
14 import sys
def init_routine():
    """Run the start-up steps in order: the root check, then config loading."""
    for step in (as_root_check, init_config):
        step()
def as_root_check():
    """Exit with status 1 unless system.as_root() reports root privileges.

    The error is shown through logger.err with log=False.
    """
    if system.as_root():
        return
    logger.err("Must run as root", log=False)
    sys.exit(1)
def init_config():
    """Verify the config file's permissions, then load the configuration.

    The file named by config.configfile must not grant any permission bit
    to group or others; if it does, an error is reported and the program
    exits with status 1. Any exception raised while stat'ing the file or
    while running config.init_config() is reported through logger.err and
    also ends the program with status 1.
    """
    try:
        # make sure config file is protected from user access
        mode = os.stat(config.configfile)[stat.ST_MODE]
        if mode & (stat.S_IRWXG | stat.S_IRWXO):
            logger.err(
                "Config file %s should only be accessible by root (mode is: %s)"
                % (config.configfile, oct(stat.S_IMODE(mode))))
            # SystemExit is not an Exception subclass, so the handler
            # below does not intercept this exit.
            sys.exit(1)

        config.init_config()
    except Exception as msg:
        logger.err(msg)
        sys.exit(1)
def tools_check():
    """Report which of the tools listed in config.tools were located, then exit.

    For every tool name the matching config attribute (dashes replaced by
    underscores) is looked up; an absolute path counts as found, anything
    else as missing. Always ends the program with status 0.
    """
    logger.display("Checking for missing tools...", log=False)
    settings = vars(config)
    for tool in config.tools:
        location = settings[tool.replace("-", "_")]
        if not os.path.isabs(location):
            logger.display(" [missing] %s" % tool, log=False)
            continue
        logger.display(" [ok] %s" % location, log=False)
    sys.exit(0)
def ip_check():
    """Report which configured interfaces currently have an ip.

    Queries network.has_ip() for every entry of config.interfaces and logs
    the hits as "ip (interface)" separated by spaces.

    Returns True when at least one interface has an ip; otherwise logs
    "none" and returns None.
    """
    # `await` is a reserved word from Python 3.7 on; look the method up by name
    getattr(logger, "await")("Checking for ip")

    # (iface, ip) pairs where ip is set
    ips = []
    for i in config.interfaces:
        ip = network.has_ip(i.interface)
        if ip is not None:
            ips.append((i.interface, ip))

    if ips:
        logger.result(" ".join("%s (%s)" % (ip, name) for (name, ip) in ips))
        return True
    logger.result("none")
def choose_medium():
    """Log the medium of the top-ranked interface from config.interfaces.

    Returns True when that medium is "wired", otherwise None.
    """
    # `await` is a reserved word from Python 3.7 on; look the method up by name
    getattr(logger, "await")("Selecting preferred network medium")
    medium = config.interfaces.get_top().medium
    logger.result(medium)
    return True if medium == "wired" else None
def check_wired_link():
    """Report which wired interfaces have a link.

    Every interface in config.interfaces whose medium is "wired" is probed
    with network.wire_connected(); the truthy results are logged separated
    by spaces.

    Returns True when at least one probe succeeded; otherwise logs
    "none, use wireless" and returns None.
    """
    # `await` is a reserved word from Python 3.7 on; look the method up by name
    getattr(logger, "await")("Checking for wired link")
    ifaces = config.interfaces.get_all(pred=lambda x: x.medium == "wired")

    # probe each interface exactly once and keep the truthy answers
    connected = []
    for i in ifaces:
        link = network.wire_connected(i.interface)
        if link:
            connected.append(link)

    if connected:
        logger.result(" ".join("%s" % link for link in connected))
        return True
    logger.result("none, use wireless")
def request_ip(iface, net=None, tries=3):
    """Try to obtain an ip on an interface, optionally joining a network first.

    iface -- object whose .interface is handed to the network helpers
    net   -- optional network; when given, network.setup_wifi() is run first
             and its .essid is used in the log messages
    tries -- maximum number of network.request_ip() attempts

    Returns True as soon as an attempt yields an ip. Returns None when the
    association fails or when every attempt fails.
    """
    label = iface.interface
    if net:
        label = "%s (from %s)" % (label, net.essid)
        associated = network.setup_wifi(iface.interface, net)
        if not associated:
            logger.err("Failed to associate with access point for network %s"
                       % net.essid)
            return

    # `await` is a reserved word from Python 3.7 on; look the method up by name
    announce = getattr(logger, "await")
    for attempt in range(1, tries + 1):
        announce("(%d/%d) Request ip on %s" % (attempt, tries, label))

        began = time.time()
        ip = network.request_ip(iface.interface)
        elapsed = time.time() - began

        if not ip:
            logger.result("failed (%ds)" % elapsed)
            continue
        logger.result("%s (%ds)" % (ip, elapsed))
        return True
def scan_wifi_networks(iface, wardrive=None):
    """Scan for wireless networks and return the ones worth trying.

    The scan result for iface.interface is merged with config.networks on
    'essid', sorted by 'quality' and filtered:

    * default: networks that have both a priority and a signal
    * wardrive: networks with a signal but no priority, no `encrypted`
      value and an essid other than "<hidden>"

    The essids of the first three matches are logged. Returns the filtered
    collection when it is non-empty; otherwise logs "none" and returns None.
    """
    # `await` is a reserved word from Python 3.7 on; look the method up by name
    getattr(logger, "await")("Scan for wireless networks")
    nets = network.read_scan(network.normal_scan(iface.interface))
    nets = config.networks.merge(nets, 'essid')

    if wardrive:
        def wanted(x):
            return (x.signal is not None and x.priority is None
                    and x.encrypted is None and x.essid != "<hidden>")
    else:
        def wanted(x):
            return x.priority is not None and x.signal is not None

    nets = nets.sort(sort_key='quality').get_all(pred=wanted)
    if nets:
        logger.result(" ".join("%s" % x.essid for x in nets[:3]))
        return nets
    logger.result("none")
def display_live_networks(nets, field=None, column=None):
    """Render scanned networks as a fixed-width text table.

    nets   -- collection whose sort(sort_key=...) returns the records to
              show; each record must support .get(key). A falsy value
              yields the string "No networks detected".
    field  -- sort key to use; takes precedence over `column`
    column -- 1-based index into the displayed columns to sort by

    Without a usable field/column the table is sorted by 'signal'.

    Returns the table as one string: a header line, one line per network
    and a trailing "<n> network(s) found" line. Missing values are rendered
    as blank cells.
    """
    if not nets:
        return "No networks detected"

    keys = ['bssid', 'essid', 'channel', 'bitrate', 'sec', 'signal', 'seen', 'freq']
    width = 19  # widest a column may become
    mw = 4      # narrowest a column may become
    sp = 2      # padding that separates two columns

    if field:
        nets = nets.sort(sort_key=field)
    elif column and 0 < column <= len(keys):
        nets = nets.sort(sort_key=keys[column-1])
    else:
        nets = nets.sort(sort_key='signal')

    # Cells of these columns are reformatted before display, so their width
    # is derived from a fixed label rather than from the raw values.
    fixed = {
        'bitrate': len('bitra') + sp,
        'seen': len('seen') + sp,
        'freq': len('freq') + sp,
    }

    def col_len(dcts, key):
        "figure out column length to assign to fields based on content length"
        if key in fixed:
            return [fixed[key] for _ in dcts]
        return [len(dct.get(key) or "") + sp for dct in dcts]

    def cell(net, key):
        "text shown for one field of one network; empty when there is no value"
        val = net.get(key)
        if val is None:
            # nothing to reformat; previously this crashed for the
            # special-cased columns below
            return ""
        if key == 'bitrate':
            val = val.replace('/s', '')
        elif key == 'seen':
            val = "%s" % output.format_timespan(time.time()-val)
        elif key == 'freq':
            val = "%s%%" % int(100 * int(val) / config.scan_count)
        return val or ""

    ws = [max(min(max(col_len(nets, k)), width), mw) for k in keys]

    def row(texts):
        "lay out one line and drop the padding after the last column"
        return "".join(t[:w-sp].ljust(w) for (t, w) in zip(texts, ws))[:-sp]

    lines = [row(keys)]
    for net in nets:
        lines.append(row([cell(net, key) for key in keys]))
    lines.append("%d network(s) found" % len(nets))
    return "\n".join(lines)
def curse(iface, timeout):
    """Show a continuously refreshed list of wireless networks in curses.

    iface   -- interface handed to network.single_scan()
    timeout -- seconds to keep scanning; a value <= 0 scans until Ctrl+C

    The logger is muted while the curses screen is active. When scanning
    ends (timeout, Ctrl+C or an error) the terminal is restored and the
    last rendered list is printed to stdout. Unexpected errors are printed
    as a traceback instead of being propagated.
    """
    def curses_init():
        "mute the logger and switch the terminal into curses mode"
        logger.mute(quiet=True)

        scr = curses.initscr()
        curses.noecho()
        curses.cbreak()
        scr.keypad(1)
        curses.curs_set(0)
        return scr

    def curses_shutdown(scr):
        "restore the terminal and unmute the logger; scr may be None"
        try:
            if scr is not None:
                curses.nocbreak()
                scr.keypad(0)
                curses.echo()
            # Still attempted without a screen object: initscr() may have
            # succeeded before a later set-up call failed.
            curses.endwin()
        except curses.error:
            # curses was never initialised, nothing to restore
            pass
        finally:
            logger.unmute(quiet=True)

    def display_list(scr, header, status, list_s):
        "redraw the screen with the header, optional status and the table"
        if list_s:
            ss = list_s.split("\n")
            scr.clear()
            try:
                scr.addstr(header + " / Ctrl+C to exit\n")
                if status:
                    scr.addstr(status + "\n")
                scr.addstr("\n"+ss[0]+"\n", curses.A_REVERSE)

                # add one by one until possible exception is thrown
                # when the list is too long
                for s in ss[1:]:
                    scr.addstr(s+"\n")
            except curses.error:
                # ran past the bottom of the window; show what fits
                pass
            scr.refresh()

    def get_status(timeout, t):
        "status line with the configured and the remaining scan time"
        if timeout > 0:
            return "-> scanning for %ds (-%ds)" % (timeout, t-time.time())
        return ""

    list_s = ""
    t = time.time()+timeout  # moment at which a timed scan stops
    header = "%s scanning for wireless networks (%s)" % \
        (config.program_name, iface)
    status = get_status(timeout, t)

    # Bound before the try block so the handlers can always reference it,
    # even when curses_init() itself fails.
    scr = None
    try:
        scr = curses_init()
        display_list(scr, header, status, display_live_networks(None))

        while 1:
            status = get_status(timeout, t)
            if timeout > 0:
                if t < time.time():
                    break

            scan_data = network.single_scan(iface)
            nets = network.read_scan(scan_data)
            list_s = display_live_networks(nets)

            display_list(scr, header, status, list_s)

            curses.napms(500)

    except KeyboardInterrupt:
        curses_shutdown(scr)
    except:
        # Deliberately catches everything: the terminal has to leave curses
        # mode before anything can be reported.
        curses_shutdown(scr)
        traceback.print_stack()
        traceback.print_exc()
    else:
        curses_shutdown(scr)

    print(list_s)