消息队列的好处直接,就是 1. 解耦;2. 削峰;3. 异步化。基本上和缓存一样是居家必备之良药。然而消息队列虽然重要,但是同时其实是蛮重的一个组件。

所以就想能不能先简单的通过Redis来实现消息队列呢?不考虑PubSub、分布式、持久化、事务等复杂的情况。就像JDK的各种Queue一样。答案当然是可以的,因为Redis提供的list数据结构就非常适合做消息队列。

List

List是一个双向链表,支持双向的Pop/Push,江湖规矩一般从左端Push,右端Pop——LPush/RPop,而且还有Blocking的版本BLPop/BRPop,客户端可以阻塞在那直到有消息到来,所有操作都是O(1)的好孩子,可以当Message Queue来用。当多个Client并发阻塞等待,有消息入列时谁先被阻塞谁先被服务。任务队列系统Resque是其典型应用。

还有RPopLPush/ BRPopLPush,弹出来返回给client的同时,把自己又推入另一个list,LLen获取列表的长度。

还有按值进行的操作:LRem(按值删除元素)、LInsert(插在某个值的元素的前后),复杂度是O(N),N是List长度,因为List的值不唯一,所以要遍历全部元素,而Set只要O(log(N))。

按下标进行的操作:下标从0开始,队列从左到右算,下标为负数时则从右到左。

  • LSet ,按下标设置元素值。
  • LIndex,按下标返回元素。
  • LRange,不同于POP直接弹走元素,只是返回列表内一段下标的元素,是分页的最爱。
  • LTrim,限制List的大小,比如只保留最新的20条消息。

复杂度也是O(N),其中LSet的N是List长度,LIndex的N是下标的值,LRange的N是start的值+列出元素的个数,因为是链表而不是数组,所以按下标访问其实要遍历链表,除非下标正好是队头和队尾。LTrim的N是移除元素的个数。

在消息队列中,并没有JMS的ack机制,如果消费者把job给Pop走了又没处理完就死机了怎么办?

  • 解决方法之一是加多一个sorted set,分发的时候同时发到list与sorted set,以分发时间为score,用户把job做完了之后要用ZREM消掉sorted set里的job,并且定时从sorted set中取出超时没有完成的任务,重新放回list。
  • 另一个做法是为每个worker多加一个的list,弹出任务时改用RPopLPush,将job同时放到worker自己的list中,完成时用LREM消掉。如果集群管理(如zookeeper)发现worker已经挂掉,就将worker的list内容重新放回主list。

大概实现如下。

首先定义了一个消息处理接口:

package me.arganzheng.study.message.queue.processor;

/**
 * 消息处理接口。这里不引入MessageConverter的概念,只接收textMessage,一般来说是JSON
 * 
 * @author zhengzhibin
 * 
 */
public interface MessageHandler {
	void onMessage(String message);
}

业务只需要实现这个接口就可以了。

然后定义一个QueueProcessor,处理从队列中获取消息调用业务MessageHandler的骨架逻辑:

package me.arganzheng.study.message.queue.processor;

import java.util.List;
import java.util.concurrent.Executor;

import javax.annotation.PostConstruct;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.util.Assert;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * 消息监听器。
 * 
 * @author zhengzhibin
 * 
 */
public class QueueProcessor {
	public static final Logger logger = Logger.getLogger(QueueProcessor.class);

	private Jedis jedis;

	private JedisPool jedisPool;

	private String host;
	private int port;
	private int timeout;
	private String password;

	private String queueName;

	private MessageHandler messageHandler;

	public void setJedis(Jedis jedis) {
		this.jedis = jedis;
	}

	public void setJedisPool(JedisPool jedisPool) {
		this.jedisPool = jedisPool;
	}

	public void setHost(String host) {
		this.host = host;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public void setTimeout(int timeout) {
		this.timeout = timeout;
	}

	public void setPassword(String password) {
		this.password = password;
	}

	public void setQueueName(String queueName) {
		this.queueName = queueName;
	}

	public void setMessageHandler(MessageHandler messageHandler) {
		this.messageHandler = messageHandler;
	}

	private Executor executor;

	public void setExecutor(Executor executor) {
		this.executor = executor;
	}

	@PostConstruct
	public void init() throws Exception {
		// init不能卡住,影响Spring的启动,需要在新的线程处理。 @see
		// DefaultMessageListenerContainer.AsyncMessageListenerInvoker
		new Thread(new AsyncMessageListener()).start();
	}

	public void destroy() {
		// TODO
	}

	// -------------------------------------------------------------------------
	// Inner classes used as internal adapters
	// -------------------------------------------------------------------------

	/**
	 * Runnable that performs looped.
	 */
	private class AsyncMessageListener implements Runnable {

		@Override
		public void run() {
			while (true) {
				try {
					process();
				} catch (Exception e) {
					logger.error("exception occur while process.", e);
				}
			}
		}
	}

	/**
	 * Default constructor for convenient dependency injection via setters.
	 */
	public QueueProcessor() {
	}

	public QueueProcessor(final String host, final int port, final int timeout,
			final String password, final String queueName,
			MessageHandler messageProcessor) throws Exception {
		Assert.notNull(host);
		Assert.isTrue(port > 0);
		Assert.notNull(queueName);
		Assert.notNull(messageProcessor);

		this.host = host;
		this.port = port;
		this.timeout = timeout;
		this.password = password;
		this.queueName = queueName;

		this.jedis = createJedis();
		this.messageHandler = messageProcessor;
	}

	private Jedis createJedis() throws Exception {
		final Jedis jedis = new Jedis(this.host, this.port, this.timeout);

		jedis.connect();
		if (null != this.password) {
			jedis.auth(this.password);
		}
		return jedis;
	}

	private Jedis getJedis() throws Exception {
		if (jedisPool != null) {
			return jedisPool.getResource();
		} else if (jedis == null) {
			this.jedis = createJedis();
		}
		return jedis;
	}

	public QueueProcessor(final Jedis jedis, final String queueName,
			MessageHandler messageProcessor) {
		Assert.notNull(jedis);
		Assert.notNull(queueName);
		Assert.notNull(messageProcessor);

		this.jedis = jedis;
		this.messageHandler = messageProcessor;
	}

	public QueueProcessor(final JedisPool jedisPool, final String queueName,
			MessageHandler messageProcessor) {
		Assert.notNull(jedisPool);
		Assert.notNull(queueName);
		Assert.notNull(messageProcessor);

		this.jedisPool = jedisPool;
		this.queueName = queueName;
		this.messageHandler = messageProcessor;
	}

	/**
	 * 同步执行。队列监听器与消息处理器是同一个线程。
	 * 
	 * @param executor
	 * @return
	 * @throws Exception
	 */
	public void process() throws Exception {
		logger.info("Start to process..");
		if (this.executor == null) {
			Jedis jedis = getJedis();
			while (true) {
				// 默认是blocking
				List<String> messages = jedis.blpop(this.timeout, queueName);
				String payload = messages.get(1);
				if (StringUtils.isNotBlank(payload)) {
					messageHandler.onMessage(payload);
				}
			}
		} else {
			process(this.executor);
		}

	}

	/**
	 * 异步执行。消息处理器在executor执行。
	 * 
	 * @param executor
	 * @throws Exception
	 */
	public void process(Executor executor) throws Exception {
		logger.info("Start to process with executor..");

		Jedis jedis = getJedis();
		while (true) {
			List<String> messages = jedis.blpop(this.timeout, queueName);
			final String payload = messages.get(1);
			submitTask(executor, payload);
		}
	}

	private void submitTask(Executor executor, final String payload) {
		if (StringUtils.isNotBlank(payload)) {
			executor.execute(new Runnable() {

				@Override
				public void run() {
					messageHandler.onMessage(payload);
				}
			});
		}
	}

}

可以看到其实我们是简单使用了Redis的List数据结构作为消息队列:List<String> messages = jedis.blpop(this.timeout, queueName);

然后业务要使用的话在Spring中配置一下就可以了:

<bean id="jedisPool" class="redis.clients.jedis.JedisPool"
	destroy-method="destroy">
	<constructor-arg index="0" ref="jedisPoolConfig" />
	<constructor-arg index="1" value="${redis.host}" />
	<constructor-arg index="2" value="${redis.port}" />
	<constructor-arg index="3" value="${redis.timeout}" />
	<constructor-arg index="4" value="${redis.password}" />
</bean>

<!-- registration for message push -->
<bean id="subscribeQueueProcessor"
	class="me.arganzheng.study.message.queue.processor.QueueProcessor"
	destroy-method="destroy">
	<property name="jedisPool" ref="jedisPool" />
	<property name="queueName" value="queue:message:subscribe" />
	<property name="messageHandler" ref="subscribeMessageHandler" />
</bean>

<bean id="subscribeMessageHandler"
	class="me.arganzheng.study.message.queue.processor.handler.SubscribeMessageHandler">
</bean>

其中subscribeMessageHandler定义如下:

package me.arganzheng.study.message.queue.processor.handler;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

import me.arganzheng.study.message.model.RegistrationInfo;
import me.arganzheng.study.message.service.RegistrationInfoService;
import me.arganzheng.study.message.queue.processor.MessageHandler;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * 处理订阅消息
 * 
 * @author zhengzhibin
 * 
 */
public class SubscribeMessageHandler implements MessageHandler {

	private static final Logger logger = Logger.getLogger(SubscribeMessageHandler.class);

	@Autowired
	private RegistrationInfoService registrationInfoService;

	@Override
	public void onMessage(String message) {
		try {
			logger.info("Received subscribed message: " + message);
			Gson gson = new GsonBuilder().create();
			RegistrationInfo regInfo = gson.fromJson(message, RegistrationInfo.class);
			boolean result = registrationInfoService.save(regInfo);
			if (!result) {
				logger.error("save registrationInfo failed! regInfo=" + regInfo);
			}
		} catch (Exception e) {
			logger.error("Handle message failed!message=" + message, e);
		}
	}
}

这是消息消费者的封装。在消息生产者那边,只需要往list里面插入消息就可以了:

public static final String QUEUE_MESSAGE_SUBSCRIBE = "queue:message:subscribe";

@Value("${queueSize}")
private long queueSize = 1000;

public boolean pushToQueue(Map<String, String> registrationInfo) {
    Gson gson = new GsonBuilder().create();
    try {
        final String jsonPayload = gson.toJson(registrationInfo);
        redisUtils.doTask(new JedisTemplate<Long>() {
            @Override
            public Long doJedis(Jedis jedis) {
                Long result = jedis.rpush(QUEUE_MESSAGE_SUBSCRIBE, jsonPayload);
                // 为了避免队列内存溢出,只保留1000个消息
                jedis.ltrim(QUEUE_MESSAGE_SUBSCRIBE, -queueSize, -1);
                return result;
            }
        });
        return true;
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return false;
    }
}

整个实现只花了不到一个小时。成本非常低,满足业务的需要,等后面再考虑选择一个合适的消息中间件。

推荐阅读

  1. The Architecture and Design of a Publish & Subscribe Messaging System Tailored for Big Data Collecting and Analytics - Abstraction Builder