举个简单的例子:
有些人喜欢的某个公众号,然后去关注这个公众号,哪天这个公众号发布了篇新的文章,没多久订阅者就会在微信里收到这个公众号推送的新消息,如果感兴趣就打开来阅读。

公众号例子

事件驱动模型可以理解为上面的例子,是设计模式中观察者模式的一种典型应用。除了订阅公众号外,如你关注某人的微博,关注某人的简书,当被关注者发了个新状态或者新文章,你会收到他们新的消息,这些都可以理解为事件驱动模型。

实际上,世间万物各种属性的变化,我们都可以抽象为事件,最直观的是图形界面应用里,如常见的点击、双击、拖动操作,又或者是游戏里的英雄升级了,怪物死亡了等等,都可以视为一个事件发生了。而发送事件的事物称为事件源,对这个事件感兴趣的事物为监听者,事件发生后监听者会收到这个消息,然后做相应的反应。

例如上面公众号例子可以翻译为,监听器(订阅者)监听了(关注了)事件源(公众号),当事件源的发送事件时(公众号发布文章),所有监听该事件的监听器(订阅者)都会接收到消息并作出响应(阅读文章)。

1.公众号为事件源
2.订阅者为事件监听器
3.订阅者关注公众号,相当于监听器监听了事件源
4.公众号发布文章这个动作为发送事件
5.订阅者收到事件后,做出阅读文章的响应动作

第一个公众号发文章的例子 两个py文件

eventengine.py

# encoding: UTF-8
# 系统模块
from queue import Queue, Empty
from threading import *


########################################################################
class EventManager:
    # ----------------------------------------------------------------------
    def __init__(self):
        """初始化事件管理器"""
        # 事件对象列表
        self.__eventQueue = Queue()
        # 事件管理器开关
        self.__active = False
        # 事件处理线程
        self.__thread = Thread(target=self.__Run)

        # 这里的__handlers是一个字典,用来保存对应的事件的响应函数
        # 其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多
        self.__handlers = {}

    # ----------------------------------------------------------------------
    def __Run(self):
        """引擎运行"""
        while self.__active == True:
            try:
                # 获取事件的阻塞时间设为1秒
                event = self.__eventQueue.get(block=True, timeout=1)
                self.__EventProcess(event)
            except Empty:
                pass

    # ----------------------------------------------------------------------
    def __EventProcess(self, event):
        """处理事件"""
        # 检查是否存在对该事件进行监听的处理函数
        if event.type_ in self.__handlers:
            # 若存在,则按顺序将事件传递给处理函数执行
            for handler in self.__handlers[event.type_]:
                handler(event)

    # ----------------------------------------------------------------------
    def Start(self):
        """启动"""
        # 将事件管理器设为启动
        self.__active = True
        # 启动事件处理线程
        self.__thread.start()

    # ----------------------------------------------------------------------
    def Stop(self):
        """停止"""
        # 将事件管理器设为停止
        self.__active = False
        # 等待事件处理线程退出
        self.__thread.join()

    # ----------------------------------------------------------------------
    def AddEventListener(self, type_, handler):
        """绑定事件和监听器处理函数"""
        # 尝试获取该事件类型对应的处理函数列表,若无则创建
        try:
            handlerList = self.__handlers[type_]
        except KeyError:
            handlerList = []

        self.__handlers[type_] = handlerList
        # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
        if handler not in handlerList:
            handlerList.append(handler)

    # ----------------------------------------------------------------------
    def RemoveEventListener(self, type_, handler):
        """移除监听器的处理函数"""
        # 读者自己试着实现

    # ----------------------------------------------------------------------
    def SendEvent(self, event):
        """发送事件,向事件队列中存入事件"""
        self.__eventQueue.put(event)


########################################################################
"""事件对象"""


class Event:
    def __init__(self, type_=None):
        self.type_ = type_  # 事件类型
        self.dict = {}  # 字典用于保存具体的事件数据

wechatdemo.py

# -------------------------------------------------------------------
# encoding: UTF-8
import sys
from datetime import datetime
from threading import *
from EventManager import *

# 事件名称  新文章
EVENT_ARTICAL = "Event_Artical"


# 事件源 公众号
class PublicAccounts:
    def __init__(self, eventManager):
        self.__eventManager = eventManager

    def WriteNewArtical(self):
        # 事件对象,写了新文章
        event = Event(type_=EVENT_ARTICAL)
        event.dict["artical"] = u'如何写出更优雅的代码\n'
        # 发送事件
        self.__eventManager.SendEvent(event)
        print
        u'公众号发送新文章\n'


# 监听器 订阅者
class Listener:
    def __init__(self, username):
        self.__username = username

    # 监听器的处理函数 读文章
    def ReadArtical(self, event):
        print(u'%s 收到新文章' % self.__username)
        print(u'正在阅读新文章内容:%s' % event.dict["artical"])


"""测试函数"""


# --------------------------------------------------------------------
def test():
    listner1 = Listener("thinkroom")  # 订阅者1
    listner2 = Listener("steve")  # 订阅者2

    eventManager = EventManager()

    # 绑定事件和监听器响应函数(新文章)
    eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
    eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
    eventManager.Start()

    publicAcc = PublicAccounts(eventManager)
    timer = Timer(2, publicAcc.WriteNewArtical)
    timer.start()


if __name__ == '__main__':
    test()

第二个获取价格的例子


from queue import Queue, Empty
from threading import Thread
from collections import defaultdict
import random
import time

class Event(object):
    def __init__(self, _type: str, data=None):
        self.type = _type
        self.data = data

class EventEngine(object):
    def __init__(self):
        self._queue = Queue()
        self._thread = Thread(target=self._run)
        self._active = False  # 开关,控制线程是否启动.
        self._handlers = defaultdict(list)  # {'click': []}

    def _run(self):
        while self._active:
            try:
                event = self._queue.get(block=True, timeout=1)
                # 处理
                print(event)
                self._process(event)
            except Empty:
                print("事件引擎的队列为空。。。")


    def start(self):
        """
        开启事件引擎
        :return:
        """
        self._active = True
        self._thread.start()

    def stop(self):
        """
        停止事件引擎.
        """
        self._active = False
        self._thread.join()

    def put(self, event: Event):
        """
        把事件放进队列.
        :param event:
        :return:
        """
        self._queue.put(event)

    def _process(self, event):
        if event.type in self._handlers:
            for handler in self._handlers[event.type]:
                handler(event)  #

            # [handler(event) for handler in self._handlers[event.type]]

    def register(self, type, handler):

        """
        # handler 是一个方法.
        注册事件引擎,如果有新的事件,会调用这个方法,然后执行你的逻辑.
        """
        handler_list = self._handlers[type]
        if handler not in handler_list:
            handler_list.append(handler)

    def unregister(self, type: str, handler):
        """
        取消注册事件
        """
        handler_list = self._handlers[type]

        if handler in handler_list:
            handler_list.remove(handler)

        if not handler_list:
            self._handlers.pop(type)

def on_tick(event):
    print("hello tick event ")
    print(event.type)
    print(event.data)

def on_ticker1(event):
    print(f'on ticker 方法。{event.data}')

if __name__ == '__main__':
    event_engine = EventEngine()  #初始化操作系统.
    event_engine.start() # #启动化操作系统.
    event_engine.register('tick', on_tick)
    event_engine.register('tick', on_ticker1)

    while True:
        # 循环生产事件.
        tick_price = random.randrange(7150, 7210, 1)
        ticker = {"bdi": tick_price, "ask": tick_price + 0.1}
        event = Event(_type='tick',data=ticker)
        event_engine.put(event)
        time.sleep(0.25)