Tutorial¶
This tutorial involves taking a simple application and
making it work with Tk, first without rjgtoys.tkthread and
then introducing two different ways of using the rjgtoys.tkthread
components.
As far as I’m aware it will only run on Linux.
The examples show the full code for the four versions of the script, and therefore are a little lengthy, but I felt it was better to show all the code in context than to try to show fragments that might have been unclear.
Step 1: A simple pyudev application¶
The starting point is a simple command-line application that listens for udev events, such as plugging in, or unplugging, a USB stick.
The pyudev package offers a pyudev.MonitorObserver class that
will make callbacks when an event is detected. This script uses that to
print some details of each event.
"""
Monitor udev, without tkinter
This is a basic example of the logic.
"""
import time
import pyudev
class UdevTracer:
def __init__(self):
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
observer = pyudev.MonitorObserver(monitor, callback=self.notify, name='udev_notk')
observer.start()
def notify(self, device):
print("Event: {0}".format(device))
def main():
trace = UdevTracer()
while True:
print("Wait, or hit CTRL/C")
time.sleep(5)
# Or hit CTRL/C
if __name__ == "__main__":
main()
Step 2: A simple application using Tk¶
The next version introduces Tk. Instead of printing events, it adds them
to a tkinter.Text widget. In order to do that safely, because
the callbacks from the pyudev.MonitorObserver are executed in
a thread that is not main thread running the Tk main loop, the events
generated by udev are sent into a queue.Queue and pulled from
there by a Tk timer event handler.
The timer handler runs every five seconds. As a result there can be a delay
of up to five seconds between
an event being detected, and it being reported. The delay can be reduced by
decreasing the interval between timer events, or it can be eliminated altogether
by replacing the timer event handler with an after_idle handler, but as the
delay reduces, the CPU utilisation increases, and using an after_idle handler
will leave one CPU of your machine permanently busy, which is wasteful.
"""
Monitor udev, with tkinter but without tkthread
This one will spin a CPU, because it is polling a queue, in an idle handler.
"""
import time
import tkinter as tk
import queue
import pyudev
class UdevTracer:
def __init__(self, widget):
self.widget = widget
self.queue = queue.Queue()
self.cleanup()
self.process()
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
observer = pyudev.MonitorObserver(monitor, callback=self.notify, name='udev_hot')
observer.start()
def notify(self, device):
msg = "Event: {0}".format(device)
self.queue.put(msg)
def cleanup(self):
self.reset()
self.widget.update()
self.widget.after(5000, self.cleanup)
def reset(self):
self.widget.delete('1.0','end')
self.widget.insert('1.0','Wait, or kill the window\n')
def add_message(self, msg):
self.widget.insert('end',msg)
self.widget.insert('end', '\n')
def process(self):
"""Process queued messages."""
# This is the expensive bit; we can't afford to wait, here,
# because we'll make the rest of the UI unresponsive if we do.
while True:
try:
msg = self.queue.get(block=False)
except queue.Empty:
break
self.add_message(msg)
self.queue.task_done()
self.widget.update()
self.widget.after_idle(self.process)
def main():
root = tk.Tk()
app = tk.Text(root)
w = UdevTracer(app)
app.pack()
app.mainloop()
if __name__ == "__main__":
main()
Step 3: Efficient event handling with a rjgtoys.tkthread.EventQueue¶
One way to avoid the delay or polling overhead of the previous example is to
use an rjgtoys.tkthread.EventQueue.
This encapsulates the queueing that is needed to help pass events from the source thread to the main loop thread, and the creation of a Tk event handler that executes in the same thread as the Tk main event loop and consumes the queued events.
"""
Monitor udev, with tkinter and tkthread.EventQueue
This one will not spin a CPU, because EventQueue avoids tight polling.
"""
import os
import time
import tkinter as tk
import queue
import pyudev
from rjgtoys.tkthread import EventQueue
class UdevTracer:
def __init__(self, widget):
self.widget = widget
self.queue = EventQueue(handler=self.add_message, widget=widget)
self.cleanup()
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
observer = pyudev.MonitorObserver(monitor, callback=self.notify, name='udev_cold')
observer.start()
def notify(self, device):
msg = "Event: {0}".format(device)
self.queue.put(msg)
def cleanup(self):
self.reset()
self.widget.update()
self.widget.after(5000, self.cleanup)
def reset(self):
self.widget.delete('1.0','end')
self.widget.insert('1.0','Wait, or kill the window\n')
def add_message(self, msg):
self.widget.insert('end',msg)
self.widget.insert('end', '\n')
def main():
root = tk.Tk()
w = tk.Text(root)
tracer = UdevTracer(w)
w.pack()
w.mainloop()
if __name__ == "__main__":
main()
Step 4: Generic event collection with rjgtoys.tkthread.EventGenerator¶
The callback interface provided by pyudev.MonitorObserver is very
nice to use, but it’s a bit unusual; in many cases the source of events will be
a function that will deliver the next event, when it arrives - and will block
until then. That’s a simple way to do file or network I/O, for example.
When the source of events can be framed as a generator, you can use
rjgtoys.tkthread.EventGenerator
to do the job of consuming events from the generator and delivering them into
an EventQueue.
As a result, you no longer have to write a separate thread to do the event collection.
"""
Monitor udev, with tkinter and tkthread.EventGenerator
"""
import os
import time
import tkinter as tk
import queue
import pyudev
from rjgtoys.tkthread import EventGenerator
class UdevTracer:
def __init__(self, widget):
self.widget = widget
self.cleanup()
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
self.events = EventGenerator(
widget=self.widget,
generator=iter(monitor.poll, None),
handler=self.add_message
)
def cleanup(self):
self.reset()
self.widget.update()
self.widget.after(5000, self.cleanup)
def reset(self):
self.widget.delete('1.0','end')
self.widget.insert('1.0','Wait, or kill the window\n')
def add_message(self, msg):
self.widget.insert('end',msg)
self.widget.insert('end', '\n')
def main():
root = tk.Tk()
w = tk.Text(root)
tracer = UdevTracer(w)
w.pack()
w.mainloop()
if __name__ == "__main__":
main()