主动推送的好处

  1. 实时性: 每次数据的变化基本都在毫秒内返回给应用程序
  2. 提升用户体验
  3. 服务端数据过滤:不是轮询拉到数据再进行处理,可以指定推送条件(比如IFTTT)。
  4. 减少服务器压力(开发者轮询)
  5. 易用:只需要一个HTTP请求,就可以不停的接受消息,同时官方提供了SDK,开发者只需要简单实现接口,处理业务数据。

常见应用场景

  1. 订单状态变化
  2. 物流信息
  3. 商品价格/上下架(导购类网站)

示例

  1. 订阅商品变更通知

从polling pull到real time push

在主动推送之前,只能使用client不断polling接口的方式。无法实时获取数据,对服务端也是一个压力。这正是 观察者模式 最适合解决的问题。

实现上有三种方式:

法一: Callback Push(aka WebHooks, user-defined HTTP callbacks)

短链接回调,返回格式我们确定。正如监听者模式中回调接口由通知方确定一样。

  1. fackbook:Realtime Updates
  2. foursquare: Real-Time API
  3. Google calendar: Push Notifications
  4. ebay: eBay ClientAlerts API User Guide
  5. instagram: Real-time Photo Updates

法二: HTTP Streaming

长链接通知,返回格式我们确定。正如监听者模式中回调接口由通知方确定一样。

  1. Twitter: The Streaming APIs
  2. 淘宝: 主动通知

法三:message queue

消息队列,前提是外部用户可以访问。

  1. amazon: Amazon Simple Notification Service。支持多种协议:Amazon SQS, HTTP/S, email, SMS。

说明

1. 关于消息确认与重试

目前几乎所有开放平台的Streaming API都是单向通信的(WebSockets可以支持双向通信,但是只是浏览器。),客户端只负责接收服务端推送的消息。服务端不关心客户端的处理结果,它只关心是不是把消息推送到了客户端(网络原因,不关心业务处理结果),只要客户端成功接收到消息,就认为是下发成功。

对于callback url方式,有些平台会要求消息确认,并且会根据确认消息状态进行一定程度的重试。比如Google calendar Push Notifications

Responding to notifications

To indicate success, you can return any of the following status codes: 200, 201, 202, 204, or 102. If your service returns 500, 502, 503, or 504, the Google Calendar API will retry with exponential backoff.

Every other return status code is considered to be a message failure and the Google Calendar API will not retry this particular notification.

2. 有些平台会进行一定程度消息聚合批量发送,比如 fackbook:Realtime Updates

Facebook aggregates changes and sends batched updates every 5 seconds, or when the number of unsent changes exceeds 1,000. Your endpoint should be set up to handle this level of load, although of course, actual traffic depends on the subscriptions you configure.

3. 很多平台主动推送的只是事件和相应的变更字段,还需要用户使用REST API获取相应的消息。如facebook,只是推送变化的字段:

Here is one example of a callback made for a user object subscription. In it, two users’ field changes are batched together:

{
  "object": "user",
  "entry": [
    {
      "uid": 1335845740,
      "changed_fields": [
        "name",
        "picture"
      ],
      "time": 232323
    }, {
      "uid": 1234,
      "changed_fields": [
        "friends"
      ],
      "time": 232325
    }
  ]
}

Note that this does not include the actual data values (either from before or after the update). To obtain those, your app can request them as normal, subject to the usual privacy restrictions. For data that you have access to at any time, you may wish to query for that data immediately following this callback so that it is ready for when the user returns to your app.

还有ebay Getting Notifications About a Listing

Notes and Next Steps

This section contains notes about the tutorial and suggestions for extending it.

Here are some suggestions for ways you can learn more about the API:

  • Get to know the Platform Notifications chapter in the Trading API Platform Notifications Guide call.
  • Explore other calls and processes: Listing an Item, Buying an Item, or Completing a Sale.
  • Notifications are a great way to get real-time information about events on the eBay site, but you should always use API calls to confirm the data received using notifications. For example, if you have subscribed to AuctionCheckoutComplete notification, you should also set up periodic polling of GetOrders.
  • Your application should respond to notifications with a standard HTTP status of 200 OK. Absent this response to a notification, eBay will attempt redelivery, but after a significant number of unacknowledged notifications, eBay may stop sending notifications to your application.

还有淘宝,很多接口也是需要再次反查的:

3、商品消息格式-2)更新商品消息(changed_filds中的值,会在消息体中保存)

{"notify_item":{
"topic":"item",
"status":"ItemUpdate",
"nick":"sellerNick",
"changed_fields":"title",
"title":"TOP商品测试宝贝"
"num_iid":150345617894,
"user_id":1234455676,
"modified":"2011-08-17 16:01:20"}}

Callback Push VS HTTP Streaming

两者其实基本一样,不同的地方在于消息通道的建立不同。对于 callback URL,由客户端在我们系统注册一个回调地址,当用户感兴趣的事件发生时,系统对这个callback URL发起一个HTTP请求。就像客户端调用我们API一样。客户端和我们都不需要关系和维护这个链接。而对于streaming API,是我们指定几个endPoint(类似于消息队列),客户端链接上去,服务端维持这个链接,当有客户感兴趣的事件发生时,服务端会将消息发送到这个链接上。

简单来说就是服务端与客户端这个消息推送是采用长链接还是短链接的方式。长连接可以避免不断的进行TCP三次握手和四次挥手,对于数据变动比较频繁,性能要求较高的业务场景有一定的优势。另外,从某种程度上来说,比起callback URL来说,与客户端更加的解耦(不需要知道每个客户端的callbackUrl)。但是相应的引入了一些复杂性(详见后面),对客户端来说也需要适应这种编程模式(类似于TCP流数据处理模式):1. 消息的顺序不能保证(类似于UDP方式)、消息可能重复、连接可能堵塞,需要客户端多线程处理。2. 单个链接在数据量大的情况下消息发送会遇到性能瓶颈。并且客户端只有一个链接也不好做负载均衡。所以淘宝目前是支持多连接的,通过id来标识,同一个appKey+同一个user(卖家)目前允许的最大连接数是10个。3. 为了避免数据堵塞,接收数据的线程和处理数据的线程最好分开,以免影响接收速度。

Callback Push

优点

  1. 简单
  2. 技术风险小,实现成本低
  3. facebook、google大公司都采用
  4. 与MP平台消息推送方式一致
  5. 负载均衡容易实现
  6. 客户端有机会返回处理结果(response)

缺点

  1. 需要开发者(订阅者)提供公网域名(这个对于内部ERP系统不是很合适)
  2. 需要开发者(订阅者)搭建web服务(这个对于CS架构的app不是很合适)
  3. 如果数据变更频繁,需要不断的进行TCP三次握手和四次挥手
  4. 安全性相对比较低

Streaming API

优点

  1. 每个app只有1到N(淘宝对于同一个appKey+同一个user目前允许的最大连接数是10个)个固定长链接,可以避免不断的进行TCP三次握手和四次挥手。
  2. 客户端可以是CS或者BS架构,只要支持HTTP协议
  3. twitter等大公司采用
  4. 与淘宝消息推送方式保持一直

缺点

  1. 为了维护HTTP长链接,对客户端和服务端都有要求,实现成本比较大,技术细节目前不是很清楚,有一定的技术风险
  2. 需要定义一些相关状态码,以区分推送情况已经相应的处理。具体参见淘宝 主动推送
  3. 对客户端的编程方式有要求,需要有流数据处理以及多线程处理思想。提供SDK可以简化这个过程
  4. 单向通信,客户端不能返回response信息

Callback Push

步骤

客户端(开发者)

STEP_1 订阅感兴趣的事件以及配置CallbackUrl和VerifyToken

在服务平台后台管理页面,选择需要订阅的 消息类型(topics String 如 trade;refund;item)和 消息状态(status String 如 all;ItemAdd;ItemUpdate;ItemStockChanged)。然后配置你的callbackUrl和verifyToken,用于系统回调通知和请求签名。

STEP_2 订阅验证

当你提交配置信息(添加或者修改callbackUrl和verifyToken)之后,开放平台会发送一个GET请求到你填写的callbackUrl,并且带上三个参数:

  • hub.mode - The string “subscribe” is passed in this parameter
  • hub.challenge - A random string
  • hub.verify_token - The verify_token value you specified when you created the subscription

The endpoint should first verify the hub.verify_token. This ensures that your server knows the request is being made by Facebook and relates to the subscription you just configured.

It should then echo just the hub.challenge value back, which confirms to Facebook that this server is configured to accept callbacks, and prevents denial-of-service (DDoS) vulnerabilities.

注意:这里facebook采用了非常简单校验码机制,而不是复杂的签名。相反,MP平台验证消息也是采用签名。

STEP_3 指定接收用户

因为数据属于用户,必须获取用户(卖家)的授权允许才能接收用户的数据。所以首先要获取卖家授权即取得accessToken,再调用主动通知API告诉系统你需要这个用户的消息并且已经得到这个用户的授权。(facebook等平台默认已经授权的用户可以接收消息)

STEP_4 接收callback POST请求

成功订阅之后,当你感兴趣的事件发生时,开发平台会发送一个POST请求到你配置的callbackUrl。消息格式是application/json

服务端(开放平台)

  1. 发送GET请求验证客户订阅配置

  2. 产生消息:目前TOP的消息种类有:商品、退款、交易、酒店类型的消息。消息产生的机制是 AOP 相应的API或者页面操作(两者有稍微的区别)。 这个需要后台业务系统支持。

    1. 事件埋点:可能的是实现方式:AOP,日志分析
    2. 通知开放平台:可能的实现方式是:回调接口,消息队列。
  3. 推送消息

当某个事件发生时,开发平台会发送一个POST请求到感兴趣的用户配置的callbackUrl。消息格式是application/json。使用该应用的secretKey对POST参数进行签名。(verifyToken真的只是一个订阅时候的验证码)

Streaming API

淘宝Streaming API接入步骤

客户端(开发者)

STEP_1 订阅感兴趣的事件

在服务平台后台管理页面,选择需要订阅的 消息类型(topics String 如 trade;refund;item)和 消息状态(status String 如 all;ItemAdd;ItemUpdate;ItemStockChanged)。相比之下,facebook则直接暴露的是领域模型的变更订阅,用户指定要监听的对象(Object)和字段(Fields)。只有它关心的对象的相关字段变更了才会通知用户。

STEP_2 指定接收用户

因为数据属于用户,必须获取用户(卖家)的授权允许才能接收用户的数据。所以首先要获取卖家授权即取得accessToken,再调用主动通知API告诉系统你需要这个用户的消息并且已经得到这个用户的授权。

STEP_3 发起请求

  1. 与服务端建立长链接

客户端发起一个HTTP POST请求到服务端指定的End Point (服务端可以指定多个,用于不同的用途。Connecting to a streaming endpoint),TOP服务端将保持这个请求24小时。如果在连接过程中出现请求参数错误、服务端处理错误、网络异常,服务端会断开链接。错误信息可以从HTTP的header头中获取。

  1. 处理流数据 Processing streaming data

主动推送返回的结果是json字符串。每个消息之间的分隔符(消息边界)是\r\n。

服务端(开放平台)

  1. 接收用户长连接请求,返回响应结果。(TOP是通过HTTP status code来表示)。接受后将保持24小时。

  2. 定时发送心跳包,防止连接被断开。app需要设置读取超时时间为1分钟左右,如果超过1分钟都没有收到数据,说明网络可能有问题了,app需要重新发起连接。

  3. 产生消息:目前TOP的消息种类有:商品、退款、交易、酒店类型的消息。消息产生的机制是 AOP 相应的API或者页面操作(两者有稍微的区别)。 这个需要后台业务系统支持。

    1. 事件埋点:可能的是实现方式:AOP,日志分析
    2. 通知开放平台:可能的实现方式是:回调接口,消息队列。
  4. 推送消息

    1. 发送的消息会保留24小时
    2. 丢失的消息会发送203状态的消息(丢弃通知消息)给应用
    3. 收到203状态的应用,说明应用断开时,有丢失消息。此时需要通过TOP提供的增量接口 taobao.increment.(items/trades/refund).get查询相应的消息是否存在,并获取之。

TOP Streaming API的一些问题

笔者找了TOP的ISV了解了一下,总体上来说对TOP的Streaming API还是很赞的,不过还有如下一些问题:

  1. 心跳不是很稳定,在通知量不大的时候,有时候收不到服务端心跳信息,不知道是连接挂了还是在正常运行,这时候必须重连(会导致客户端readTimeOut)。按照TOP文档,服务端在1分钟内一定会发送一个消息包,不管是业务包还是心跳包给client。
  2. 目前不支持用户筛选,即配置推送条件,比如当价格超过多少的时候才推送。需要客户端进行过滤,对流量是个伤害。
  3. 淘宝主动通知还有一个BUG,就是A订购了应用,然后我们开启A的主动通知,然后A取消SessionKey的授权,这时候主动通知仍然会推送。但是用户主动取消授权,应用并没有收到通知,原SessionKey的 Deadline还未变,这时候去根据消息调用相应REST API就会出错了。
  4. 另外,淘宝对于入塔的ISV,提供了另一套主动数据推送服务: 天猫数据存储服务。ISV在自己购买的RDS上建立一个数据库,淘宝会把数据推送到指定的表中,表的数据格式也是淘宝规定好的。据说实时性不是很好,但是开发成本小很多,而且不会丢消息。

技术难点

  1. 服务端如何在HTTP上创建长链接 comet
  2. 服务端如何维护这么多的长链接 NIO
  3. 如何简化客户端代码:SDK
  4. 后台业务系统支持(调用API的我们可以控制,但是通过管理页面的不好布点,所以最好统一做在底层IDL中。)

如何维持长链接

问题

HTTP的特点

  1. stateless 服务端没有记录客户端的链接信息
  2. request-response 请求-响应的服务模式,客户端请求一个资源(URL),服务端返回资源数据。服务端不能主动链接客户端,主动发送消息。
  3. HTTP1.1 后才支持keepAlive (allow more than one request per connection),但是由于WebServer的连接数以及实际请求情况,不能也不需要维持太久。Enable keep alive。KeepAlive原来要解决的问题是浏览器请求到HTML页面之后,还需要请求相应的js/CSS/img文件。但是服务端还是不会主动发送数据给客户端。

Enable keep-alive in Apache

#
# KeepAlive: Whether or not to allow persistent connections (more than
# one request per connection). Set to "Off" to deactivate.
#
KeepAlive On
  
#
# MaxKeepAliveRequests: The maximum number of requests to allow
# during a persistent connection. Set to 0 to allow an unlimited amount.
# We recommend you leave this number high, for maximum performance.
#
MaxKeepAliveRequests 100


#
# KeepAliveTimeout: Number of seconds to wait for the next request from the
# same client on the same connection.
#  
KeepAliveTimeout 100

解决方案

  1. Client Side Polling:客户端不断的发送心跳信息给服务端
  2. Comet: requires that the server is able to pause a request and resume or complete it after a potentially long delay(Jetty includes an asynchronous API called Jetty Continuations, which allows a request to be paused and resumed later. tomcat has an equivalent of Jetty’s Continuations called Advanced I/O to support Comet. Advanced I/O is much more of a low-level wrapper around NIO than a good API to facilitate Comet usage. It is poorly documented, and there are few examples of applications using this API.)
  3. WebSockets: HTML5引入技术,双向通讯,事件驱动。但是运行在浏览器,而且目前并不是所有浏览器都支持。另外目前很多服务服务端并不支持(web server规范会说明,比如jetty7就声明支持Java Servlet Specification 3.0和WebSockets, 而tomcat就不支持WebSockets)。一般使用更高层的JS库(Socket.IO)

Streaming API 对服务端的要求(web server选型)

1. thread-per-connection

传统的web server一般是thread-per-connection ,即对于每个请求的HTTP连接,分配一个线程进行处理。但是对于长链接来说,这种模式并不适合。因为需要维持大量的链接,而且这些链接并不是时刻在传输数据(事件推送)。这样会导致大量的空闲线程。我们知道线程的创建和上下文切换是很耗CPU的,而且在没有数据推送的情况下,这些线程就空闲在那里,一样消耗内存和CPU,因为操作系统需要进行线程调度和切换。

2. thread-per-request

现在的web server大多少都使用NIO(non-blocking I/O)来处理大量链接。底层调用了select/epool,可以用固定数量的线程处理不断增加的HTTP连接。当某个链接上有事件触发时(request),该监听线程会被唤醒去处理。在java平台,封装了selector API,有开源的三方库像mina和jetty进行了更高层的封装。Jetty服务器是Java中这类服务器的代表。

  1. tomcat: 支持NIO,支持Servlet3.0,支持Comet,不支持WebSockets
  2. jetty: 支持NIO,支持Servlet3.0,支持Comet,支持WebSockets
  3. Grizzly and Glassfish: 支持NIO,支持Servlet3.0,支持Comet,支持WebSockets
  4. Jboss: 支持NIO,支持Servlet3.0,支持Comet,不支持WebSockets
  5. WebSphere: 支持NIO,支持Servlet3.0,支持Comet,不支持WebSockets

说明 其实支持NIO和Servlet3就是支持Comet,像Jetty Continuation包其实与jetty容器无关,它会判断如果应用是跑在jetty容器,那么使用Native Jetty API,否则,使用Servlet3.0 API。

Streaming API的实现

Widely, the streaming APIs are implemented using following transport protocols/technologies:

  1. Comet[1]
  2. Websockets[2]

Protocols used:

  1. XMPP[3]
  2. Bayeux[4]
  3. STOMP[5]

More here in a presentation in qcon-sf detailing to how to build a streaming API[6].

References:

[1]. Comet (programming) [2]. WebSocket [3]. XMPP [4]. Bayeux | cometd.org [5]. STOMP [6]. Going Real-time: How to Build a Streaming API

HTTP长链接其他应用

  1. quora 使用http长链接实现 页面 部分/即时 更新 (http://www.tornadoweb.org/en/stable/)

参考文章

  1. 淘宝开放平台技术历程
  2. The Streaming APIs
  3. Reverse Ajax
  4. Known Issues and Best Practices for the Use of Long Polling and Streaming in Bidirectional HTTP
  5. Real-time Web and Streaming APIs
  6. Going Real-time: How to Build a Streaming API
  7. Twitter Streaming API Architecture
  8. Real-time Data Delivery: HTTP Streaming Versus PubSubHubbub