话不多说,上代码
import wx
import wx.lib.delayedresult as delayedresult
class MainFrame(wx.Frame):
def __init__(self, *args, **kwds):
wx.Frame.__init__(self, *args, **kwds)
self.jobID = 0
self.abortEvent = delayedresult.AbortEvent()
pnl = wx.Panel(self)
self.textUseDelayed = wx.StaticText(pnl, -1, "")
self.textUseDelayed.SetLabel("使用delayedresult,非阻塞GUI")
self.buttonGet = wx.Button(pnl, -1, "Get")
self.buttonAbort = wx.Button(pnl, -1, "Abort")
self.slider = wx.Slider(pnl, -1, 0, 0, 10, size=(100, -1),
style=wx.SL_HORIZONTAL | wx.SL_AUTOTICKS)
self.textCtrlResult = wx.TextCtrl(pnl, -1, "", style=wx.TE_READONLY)
self.buttonAbort.Enable(False)
vsizer = wx.BoxSizer(wx.VERTICAL)
vsizer = wx.BoxSizer(wx.VERTICAL)
hsizer = wx.BoxSizer(wx.HORIZONTAL)
vsizer.Add(self.textUseDelayed, 0, wx.ALL, 10)
hsizer.Add(self.buttonGet, 0, wx.ALL, 5)
hsizer.Add(self.buttonAbort, 0, wx.ALL, 5)
hsizer.Add(self.slider, 0, wx.ALL, 5)
hsizer.Add(self.textCtrlResult, 0, wx.ALL, 5)
vsizer.Add(hsizer, 0, wx.ALL, 5)
pnl.SetSizer(vsizer)
vsizer.SetSizeHints(self)
self.Bind(wx.EVT_BUTTON, self.handleGet, self.buttonGet)
self.Bind(wx.EVT_BUTTON, self.handleAbort, self.buttonAbort)
def handleGet(self, event):
"""计算结果在单独的线程,不影响GUI响应。"""
self.buttonGet.Enable(False)
self.buttonAbort.Enable(True)
self.abortEvent.clear()
self.jobID += 1
print("在生产者线程中启动作业%s: GUI保持响应" % self.jobID)
delayedresult.startWorker(self._resultConsumer, self._resultProducer,
wargs=(self.jobID, self.abortEvent), jobID=self.jobID)
# startWorker(consumer, workerFn, cargs=(), ckwargs={}, wargs=(), wkwargs={}, jobID=None,
# group=None, daemon=False, sendReturn=True, senderArg=None)
# 方便的函数,将在单独线程中运行的workerFn产生的数据发送到在主线程中运行的consumer。
# 这个函数仅仅创建了一个SenderCallAfter(或者一个SenderWxEvent,如果consumer是从wx.EvtHandler派生的)和一个Producer,
# 并在启动Producer线程后立即返回。jobID用于Sender,并作为Producer线程的名称。返回创建的线程,以备调用者需要加入/等等。
def handleAbort(self, event):
"""中止结果计算。"""
print("作业%s的中止结果" % self.jobID)
self.buttonGet.Enable(True)
self.buttonAbort.Enable(False)
self.abortEvent.set()
def _resultProducer(self, jobID, abortEvent):
"""假装是一个复杂的工作函数,或者由于网络访问等原因需要很长时间运行的东西。如果没有在单独的线程中调用此方法,则GUI将假死。"""
# 下面是耗时的操作,一直关注abortEvent是否中断当前操作
import time
count = 0
while not abortEvent() and count < 50:
time.sleep(0.1)
count += 1
return jobID
def _resultConsumer(self, delayedResult):
"""接收数据"""
jobID = delayedResult.getJobID()
assert jobID == self.jobID
try:
result = delayedResult.get()
except Exception as exc:
print("作业%s的结果引发异常:%s" % (jobID, exc))
return
print("得到作业%s: %s的结果" % (jobID, result))
self.textCtrlResult.SetValue(str(result))
self.buttonGet.Enable(True)
self.buttonAbort.Enable(False)
if __name__ == "__main__":
app = wx.App()
app.TopWindow = MainFrame(None, title="DelayedResult测试")
app.TopWindow.Show()
app.MainLoop()