python事件驱动
举个简单的例子:
有些人喜欢的某个公众号,然后去关注这个公众号,哪天这个公众号发布了篇新的文章,没多久订阅者就会在微信里收到这个公众号推送的新消息,如果感兴趣就打开来阅读。
公众号例子
事件驱动模型可以理解为上面的例子,是设计模式中观察者模式的一种典型应用。除了订阅公众号外,如你关注某人的微博,关注某人的简书,当被关注者发了个新状态或者新文章,你会收到他们新的消息,这些都可以理解为事件驱动模型。
实际上,世间万物各种属性的变化,我们都可以抽象为事件,最直观的是图形界面应用里,如常见的点击、双击、拖动操作,又或者是游戏里的英雄升级了,怪物死亡了等等,都可以视为一个事件发生了。而发送事件的事物称为事件源,对这个事件感兴趣的事物为监听者,事件发生后监听者会收到这个消息,然后做相应的反应。
例如上面公众号例子可以翻译为,监听器(订阅者)监听了(关注了)事件源(公众号),当事件源的发送事件时(公众号发布文章),所有监听该事件的监听器(订阅者)都会接收到消息并作出响应(阅读文章)。
1.公众号为事件源
2.订阅者为事件监听器
3.订阅者关注公众号,相当于监听器监听了事件源
4.公众号发布文章这个动作为发送事件
5.订阅者收到事件后,做出阅读文章的响应动作
第一个公众号发文章的例子 两个py文件
eventengine.py
# encoding: UTF-8
# 系统模块
from queue import Queue, Empty
from threading import *
########################################################################
class EventManager:
# ----------------------------------------------------------------------
def __init__(self):
"""初始化事件管理器"""
# 事件对象列表
self.__eventQueue = Queue()
# 事件管理器开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target=self.__Run)
# 这里的__handlers是一个字典,用来保存对应的事件的响应函数
# 其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多
self.__handlers = {}
# ----------------------------------------------------------------------
def __Run(self):
"""引擎运行"""
while self.__active == True:
try:
# 获取事件的阻塞时间设为1秒
event = self.__eventQueue.get(block=True, timeout=1)
self.__EventProcess(event)
except Empty:
pass
# ----------------------------------------------------------------------
def __EventProcess(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
# 若存在,则按顺序将事件传递给处理函数执行
for handler in self.__handlers[event.type_]:
handler(event)
# ----------------------------------------------------------------------
def Start(self):
"""启动"""
# 将事件管理器设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start()
# ----------------------------------------------------------------------
def Stop(self):
"""停止"""
# 将事件管理器设为停止
self.__active = False
# 等待事件处理线程退出
self.__thread.join()
# ----------------------------------------------------------------------
def AddEventListener(self, type_, handler):
"""绑定事件和监听器处理函数"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type_]
except KeyError:
handlerList = []
self.__handlers[type_] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
# ----------------------------------------------------------------------
def RemoveEventListener(self, type_, handler):
"""移除监听器的处理函数"""
# 读者自己试着实现
# ----------------------------------------------------------------------
def SendEvent(self, event):
"""发送事件,向事件队列中存入事件"""
self.__eventQueue.put(event)
########################################################################
"""事件对象"""
class Event:
def __init__(self, type_=None):
self.type_ = type_ # 事件类型
self.dict = {} # 字典用于保存具体的事件数据
wechatdemo.py
# -------------------------------------------------------------------
# encoding: UTF-8
import sys
from datetime import datetime
from threading import *
from EventManager import *
# 事件名称 新文章
EVENT_ARTICAL = "Event_Artical"
# 事件源 公众号
class PublicAccounts:
def __init__(self, eventManager):
self.__eventManager = eventManager
def WriteNewArtical(self):
# 事件对象,写了新文章
event = Event(type_=EVENT_ARTICAL)
event.dict["artical"] = u'如何写出更优雅的代码\n'
# 发送事件
self.__eventManager.SendEvent(event)
print
u'公众号发送新文章\n'
# 监听器 订阅者
class Listener:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章' % self.__username)
print(u'正在阅读新文章内容:%s' % event.dict["artical"])
"""测试函数"""
# --------------------------------------------------------------------
def test():
listner1 = Listener("thinkroom") # 订阅者1
listner2 = Listener("steve") # 订阅者2
eventManager = EventManager()
# 绑定事件和监听器响应函数(新文章)
eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
eventManager.Start()
publicAcc = PublicAccounts(eventManager)
timer = Timer(2, publicAcc.WriteNewArtical)
timer.start()
if __name__ == '__main__':
test()
第二个获取价格的例子
from queue import Queue, Empty
from threading import Thread
from collections import defaultdict
import random
import time
class Event(object):
def __init__(self, _type: str, data=None):
self.type = _type
self.data = data
class EventEngine(object):
def __init__(self):
self._queue = Queue()
self._thread = Thread(target=self._run)
self._active = False # 开关,控制线程是否启动.
self._handlers = defaultdict(list) # {'click': []}
def _run(self):
while self._active:
try:
event = self._queue.get(block=True, timeout=1)
# 处理
print(event)
self._process(event)
except Empty:
print("事件引擎的队列为空。。。")
def start(self):
"""
开启事件引擎
:return:
"""
self._active = True
self._thread.start()
def stop(self):
"""
停止事件引擎.
"""
self._active = False
self._thread.join()
def put(self, event: Event):
"""
把事件放进队列.
:param event:
:return:
"""
self._queue.put(event)
def _process(self, event):
if event.type in self._handlers:
for handler in self._handlers[event.type]:
handler(event) #
# [handler(event) for handler in self._handlers[event.type]]
def register(self, type, handler):
"""
# handler 是一个方法.
注册事件引擎,如果有新的事件,会调用这个方法,然后执行你的逻辑.
"""
handler_list = self._handlers[type]
if handler not in handler_list:
handler_list.append(handler)
def unregister(self, type: str, handler):
"""
取消注册事件
"""
handler_list = self._handlers[type]
if handler in handler_list:
handler_list.remove(handler)
if not handler_list:
self._handlers.pop(type)
def on_tick(event):
print("hello tick event ")
print(event.type)
print(event.data)
def on_ticker1(event):
print(f'on ticker 方法。{event.data}')
if __name__ == '__main__':
event_engine = EventEngine() #初始化操作系统.
event_engine.start() # #启动化操作系统.
event_engine.register('tick', on_tick)
event_engine.register('tick', on_ticker1)
while True:
# 循环生产事件.
tick_price = random.randrange(7150, 7210, 1)
ticker = {"bdi": tick_price, "ask": tick_price + 0.1}
event = Event(_type='tick',data=ticker)
event_engine.put(event)
time.sleep(0.25)
