一、在Node的事件循环中,如果某个事件回调函数容易发生堵塞,它就不应该放在Node内部处理,Node应该只负责快速运算和处理返回的结果

二、发布 - 订阅模型(publish-subscribe pattern)

1. 以一个普通的用户注册流程为例。当用户自己注册时,应用程序会在数据库中保存一条新的记录,并发送邮件给该用户。它也许还会记录下注册过程中的一些统计数据,比如整个过程包括了几个步骤、花费了多少时间。

2. 如果用户刚在你的网页上点击提交按钮,系统就马上处理那么多的操作,其实并没有太大意义。比如,发送邮件的流程也许需要花费几秒钟(如果你运气不佳,要花上几分钟)来完成,数据库调用可以等到用户受到欢迎之后再进行操作,统计数据可以从程序的主逻辑独立出去处理。

3. 可以选择生成一条消息,来通知程序的其他部分有新用户注册了,这样的程序也可能是完全运行在另外一台服务器上的。这就是我们所称的发布 - 订阅模型(publish-subscribe pattern)

三、请求 - 回复模型(request-reply pattern)

假设有一个集群的机器运行了Node.js程序,当一台新机器要加入到集群的时候,它发出一条信息来请求配置信息。配置服务器返回的信息包含了新机器整合到 集群中所需要的配置信息列表,这称为请求 - 回复模型(request-reply pattern)

四、消息队列允许程序员发布事件然后继续其他操作,通过进程间通信频道,提高了并发处理的效率,并实现了更高的扩展性

五、RabbitMQ

1. RabbitMQ是一个消息代理,支持高级消息队列协议(AMQP)

2. 它适用的情景有跨服务器的数据交换和同一台服务器上的跨进程通信

3. RabbitMQ使用Erlang语言编写,能够提供集群的高可用性,并且很容易安装和使用

4. 安装RabbitMQ

http://www.feihu1996.cn/?id=501
# 安装Node的AMQP驱动
npm install amqp

5. 发布与订阅

// RabbitMQ 使用标准的 AMQP 协议进行通信
// AMQP 提供了对厂商中立的抽象规范
// 可以提供通用的(不只针对金融行业的)消息中间件服务
// 并且旨在解决不同类型系统间通信的问题

//  建立一个到RabbitMQ消息代理的连接
//  默认情况(依照AMQP协议)是localhost的5672端口
const connection = require('amqp').createConnection()

connection.on('ready', ()=>{
    console.log('Connected to RabbitMQ')

    //  指定 up-and-running 作为 exchange 的名字
    //  让当前程序与运行在同一台服务器上的其他 exchange 隔离开
    //  exchange 是负责接收消息并把它们传递给绑定的队列的实体
    var e = connection.exchange('up-and-running')

    var q = connection.queue('up-and-running-queue')

    q.on('queueDeclareOk', (args)=>{
        console.log('Queue opened')

        //  队列自己并不会做任何操作
        //  它必须绑定到某个 exchange 之后才能进行其他操作
        //  把名为up-and-running-queue的队列添加到名为up-and-running的exchange上
        //  让 exchange 监听所有传给队列的消息(通过 '#' 参数)
        //  可以把#改为其他关键字来过滤消息
        q.bind(e, '#')

        q.on('queueBindOk', ()=>{
            console.log('Queue bound')

            //  当客户端订阅了此队列之后
            //  AMQP库会触发basicConsumeOk事件
            q.on('basicConsumeOk', ()=>{
                console.log('Consumer has subscribed, publishing message.')

                // AMQP的中心思想是发布者永远不知道哪些订阅者连接了
                // 所以需要有一个作为
                // 路由的关键词备用

                //  通过Node
                //  发布一条
                //  hello world消息以及用来过滤的关键词routingKey
                //  在这里,过滤的关键词是什么并没有关系
                //  队列绑定了所有内容(通过 bind('#') 命令)
                e.publish('routingKey', {hello:'world'})
            })
        })

        //  消息传给 exchange 并通过 queue 传输之后
        //  都会调用回调函数
        q.subscribe(function(msg){
            console.log('Message received:')
            console.log(msg)
            connection.end()
        })
    })
})

6. 工作队列

//  如果长时间运行的任务超出了用户的容忍度
//  比如等待一个网页加载时
//  或者是该任务会堵塞整个程序
//  使用队列就很合适

//  用 AMQP 发布长时间运行任务

const connection = require('amqp').createConnection()
const count = 0

connection.on('ready', ()=>{
    console.log('Connected to RabbitMQ')
    
    var e = connection.exchange('up-and-running')
    var q = connection.queue('up-and-running-queue')

    q.on('queueDeclareOk', (args)=>{
        console.log('Queue opened')
       
        q.bind(e, '#')
       
        q.on('queueBindOk', ()=>{
            console.log('Queue bound')

            setInterval(()=>{
                //  每隔 1000 毫秒
                //  往队列发布一条消息
                console.log('Publishing message #' + ++count)
                e.publish('routingKey', {count:count})
            }, 1000)
        })
    })
})    
// 用 AMQP 处理长时间运行任务

const connection = require('amqp').createConnection()

function sleep(milliseconds){
    var start  = new Date().getTime()
    while(new Date().getTime() < start + milliseconds){}
}

connection.on('ready', ()=>{
    console.log('Connected to RabbitMQ')

    var e = connection.exchange('up-and-running')
    var q = connection.queue('up-and-running-queue')    

    q.on('queueDeclareOk', (args)=>{
        console.log('Queue opened')
        
        q.bind(e, '#')

        //  客户端从队列获取消息
        //  睡眠5秒(模仿耗时操作,相当于花了5秒钟处理这条消息)
        //  然后从队列获取下一条消息
        //  并不断重复

        //  发布者每隔1秒发送一条消息到队列
        //  订阅者要花费5秒才能处理一条消息
        //  订阅者会很快就落后于发布者
        //  此时,可以打开若干个窗口,运行若干个客户端
        //  从而可以进一步分散负载
        //  这种部署就称为工作队列
        
        q.subscribe({
            //  {ack:true}参数的作用是通知AMQP等待用户确认
            //  看该消息是否已经处理完成
            ack: true,
        }, (msg)=>{
            console.log('Message received:')
            console.log(msg.count)

            sleep(5000)

            console.log('Processed. Waiting for next message.')

            //  通知AMQP当前消息已经处理完成
            //  应答之后
            //  消息将从队列中移除
            //  同时将其从服务里拿掉
            //  如果一个工作进程在处理某个消息的过程中没有发送反馈死掉了
            //  RabbitMQ代理会把消息发给下一个可用的客户端            
            q.shift()
        })
    })
})