root/usb_console/usb_console.py

Revision 419:dc9530ee0842, 2.8 kB (checked in by Jérôme Flesch <jflesch@…>, 14 months ago)

USB: Changing the way the driver reads the data on the USB port:
- The user app specifies the buffer where the driver must write the read data
- Endpoint 1 is disabled after each read to let the host know that we can't read the data

  • Property exe set to *
Line 
1#!/usr/bin/env python
2
3import sys
4import os.path
5import threading
6import Queue
7import curses
8import curses.wrapper
9import time;
10from nxt.lowlevel import get_device
11
# Interface identifier handed to brick.open() once the device is found.
NXOS_INTERFACE = 0

# Message kinds: the first element of every tuple put on the output queue.
# The output thread uses (kind + 1) as its curses colour-pair number, so
# these must stay 0, 1 and 2 to line up with the pairs it initialises.
COMMAND = 0  # console status lines ("-- ..." messages)
INPUT = 1    # data read from the brick ("<< ..." messages)
OUTPUT = 2   # text typed by the user (">> ..." messages)
17
def usb_thread(brick, command_queue, output_queue):
    """Relay data between the brick and the console queues.

    Runs until a ``None`` sentinel is taken from ``command_queue``; the
    brick is then closed and the function returns.

    brick         -- opened device object providing read(), write(), close()
    command_queue -- queue of strings to send to the brick (None = stop)
    output_queue  -- queue receiving (kind, text) tuples for display
    """
    output_queue.put((COMMAND, "-- USB I/O thread up"))
    while True:
        # Forward anything the brick sent, shown as an escaped string
        # with the surrounding quotes of repr() removed.
        received = brick.read(128)
        if received:
            printable = repr(received)[1:-1]
            output_queue.put((INPUT, "<< %s" % printable))

        # Poll for a pending command without blocking, so the read above
        # keeps being serviced when the user is idle.
        try:
            pending = command_queue.get_nowait()
        except Queue.Empty:
            continue

        if pending is None:
            brick.close()
            return

        # Retry once per second until the brick reports a successful write.
        while not brick.write(pending):
            time.sleep(1)
33
def output_thread(stdscr, output_queue):
    """Draw queued console messages inside a bordered log window.

    Creates a window covering every screen row except the last one (left
    free for user input), then prints each (kind, text) tuple taken from
    ``output_queue`` on successive rows, coloured by kind.  Returns when a
    ``None`` sentinel is received.

    stdscr       -- the curses standard screen
    output_queue -- queue of (kind, text) tuples; None stops the thread
    """
    curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
    curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)
    curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)

    y, x = stdscr.getmaxyx()
    log = curses.newwin(y-1, x, 0, 0)
    log.border()
    log.noutrefresh()
    stdscr.noutrefresh()
    curses.doupdate()

    # The window has y-1 rows (0 .. y-2) and the border occupies row 0 and
    # row y-2, so text may only go on rows 1 .. y-3.  Writing on row y-2
    # would clobber the border and row y-1 lies outside the window, which
    # makes addstr() raise curses.error.
    first_row = 1
    last_row = y - 3
    # Columns 0 and x-1 hold the border; text gets what is in between.
    text_width = max(x - 2, 0)
    logpos = first_row

    output_queue.put((COMMAND, "-- Output thread up"))
    while True:
        data = output_queue.get()
        if data is None:
            return
        # Clip to the interior width so a long line cannot spill over the
        # border, and pad with spaces so that a row being reused after
        # wrap-around does not keep the tail of its previous content.
        text = repr(data[1])[1:-1][:text_width].ljust(text_width)
        log.addstr(logpos, 1, text, curses.color_pair(data[0]+1))
        log.noutrefresh()
        stdscr.noutrefresh()
        curses.doupdate()
        logpos += 1
        if logpos > last_row:
            logpos = first_row
59
def init(stdscr):
    """Locate the brick, open it and start the two worker threads.

    Returns False when get_device() yields nothing usable; otherwise a
    tuple ``(command_queue, output_queue, output_thread, usb_thread)``
    where both threads have already been started.

    stdscr -- the curses standard screen, handed to the output thread
    """
    # NOTE(review): the two hex values are presumably the USB vendor and
    # product IDs expected by get_device() -- confirm against nxt.lowlevel.
    device = get_device(0x0694, 0xff00, timeout=60)
    if not device:
        return False
    device.open(NXOS_INTERFACE)

    # Only one command may be pending at a time; output is unbounded.
    commands = Queue.Queue(1)
    messages = Queue.Queue()

    display_worker = threading.Thread(target=output_thread,
                                      args=[stdscr, messages])
    display_worker.start()

    usb_worker = threading.Thread(target=usb_thread,
                                  args=[device, commands, messages])
    usb_worker.start()

    return (commands, messages, display_worker, usb_worker)
77
def input_loop(stdscr, command_queue, output_queue):
    """Read command lines from the bottom screen row and dispatch them.

    Each stripped line is echoed to ``output_queue`` as an OUTPUT message
    and then put on ``command_queue``.  The loop ends after the line
    'halt' has been queued, or when the user raises KeyboardInterrupt.

    stdscr        -- the curses standard screen used for line input
    command_queue -- queue receiving the typed commands
    output_queue  -- queue receiving (kind, text) tuples for display
    """
    try:
        curses.echo()
        halted = False
        while not halted:
            # Re-query the size on every pass to find the last row.
            rows, _cols = stdscr.getmaxyx()
            line = stdscr.getstr(rows - 1, 0).strip()
            stdscr.clrtoeol()
            output_queue.put((OUTPUT, '>> %s' % line))
            command_queue.put(line)
            halted = (line == 'halt')
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to leave the console.
        pass
91
def ncurses_main(stdscr):
    """Run the console inside an initialised curses session.

    Sets up the device and worker threads, runs the interactive input
    loop, then stops and joins the workers.

    stdscr -- the curses standard screen supplied by curses.wrapper

    Returns 1 when initialisation fails, None otherwise.
    """
    ret = init(stdscr)
    if not ret:
        return 1
    command_queue, output_queue, output, usb = ret

    try:
        input_loop(stdscr, command_queue, output_queue)
    finally:
        # Always post the stop sentinels and wait for the workers, even
        # when the input loop raised: the threads are not daemons, so
        # leaving them running would keep the process alive after the
        # error has been reported.
        command_queue.put(None)
        output_queue.put(None)
        output.join()
        usb.join()
104
def main():
    """Run the console under curses.wrapper and return its exit status.

    curses.wrapper hands back whatever ncurses_main returns, so the
    failure code from ncurses_main reaches sys.exit() instead of being
    dropped (which made the script always exit with status 0).
    """
    return curses.wrapper(ncurses_main)

if __name__ == '__main__':
    sys.exit(main())
Note: See TracBrowser for help on using the browser.