需求

由于Redis的单线程模型,我们需要对Redis进行多实例部署,然后采用一致性哈希进行负载均衡。但是一致性哈希只是解决增加节点的影响范围,不能做到实时的主从自动切换。而我们知道Redis是支持主从同步,同时提供了sentinel支持主从自动切换的。能不能在一致性哈希的基础上对每个节点增加主从自动切换呢,并且这一切都对客户端透明呢?

设计

redis-ha

实现

Redis的Java客户端一般是Jedis,不过它不能同时支持Sharded和Sentinel,只能自己封装一个了。

package me.arganzheng.study.server.dataaccess.cache.jedis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Hashing;
import redis.clients.util.Pool;

import me.arganzheng.study.server.dataaccess.cache.configuration.MasterConfiguration;

/**
 * <pre>
 * Jedis不能同时支持Sharded和Sentinel,只能自己封装一个了。
 * 
 * 大部分代码copy至JedisSentinelPool和ShardedJedisPool,把单masterName改成多master,同时把Jedis改成ShardedJedis。
 * </pre>
 * 
 * @author zhengzhibin
 * 
 */
public class ShardedJedisSentinelPool extends Pool<ShardedJedis> {

    // Pool settings handed to initPool(...) every time the sharded pool is (re)built.
    protected GenericObjectPoolConfig poolConfig;

    // Initialised to the Jedis protocol default; not read by the code in this class
    // (shard timeouts are taken from each MasterInfo instead).
    protected int timeout = Protocol.DEFAULT_TIMEOUT;

    // One listener thread per configured sentinel; each is stopped in destroy().
    protected Set<MasterListener> masterListeners = new HashSet<MasterListener>();

    // Master name -> currently known master. NOTE: volatile only covers the reference, not the
    // HashMap's contents, which reinitPool() updates from MasterListener threads.
    private volatile Map<String, MasterInfo> currentHostMastersMap = new HashMap<String, MasterInfo>();

    protected Logger log = Logger.getLogger(getClass().getName());

    /**
     * Resolves the current address of every configured master through the sentinels, starts one listener
     * per sentinel and then builds the sharded connection pool.
     *
     * @param masters    configuration (name, password, timeout, weight) of each master to shard over
     * @param sentinels  sentinel addresses in {@code host:port} form
     * @param poolConfig commons-pool2 settings used for the underlying pool
     */
    public ShardedJedisSentinelPool(List<MasterConfiguration> masters, Set<String> sentinels,
            final GenericObjectPoolConfig poolConfig) {
        this.poolConfig = poolConfig;

        // Order matters: initPool() reads the master map that initSentinels() fills in.
        initSentinels(sentinels, masters);
        initPool();
    }

    /**
     * Stops every sentinel listener and then destroys the underlying pool via the superclass.
     */
    public void destroy() {
        for (MasterListener m : masterListeners) {
            m.shutdown();
        }

        super.destroy();
    }

    /**
     * Returns a snapshot of the masters currently known to this pool.
     *
     * @return a new list holding the values of the master map, in the map's iteration order
     */
    public List<MasterInfo> getCurrentHostMaster() {
        return new ArrayList<MasterInfo>(currentHostMastersMap.values());
    }

    /**
     * Returns the live master-name to master map.
     *
     * <p>NOTE(review): this is the internal mutable map, not a copy; changes made by the caller are
     * visible to the pool.
     *
     * @return the map of master name to currently known master
     */
    public Map<String, MasterInfo> getCurrentHostMastersMap() {
        return currentHostMastersMap;
    }

    /**
     * Builds the sharded pool from every master currently held in the master map.
     */
    private void initPool() {
        log.info("Creating JedisPool for masters: " + currentHostMastersMap);

        final MasterInfo[] knownMasters = getCurrentHostMaster().toArray(new MasterInfo[0]);
        final ShardedJedisFactory factory =
                new ShardedJedisFactory(makeShardInfoList(knownMasters), Hashing.MURMUR_HASH, null);
        initPool(poolConfig, factory);

        log.info("Created JedisPool for masters: " + currentHostMastersMap);
    }

    /**
     * Rebuilds the sharded pool after a sentinel reports that {@code masterName} moved to
     * {@code masterAddr}.
     *
     * <p>Every sentinel publishes the same switch, so this method is invoked once per listener thread.
     * It is synchronized so concurrent notifications cannot interleave, and notifications whose address
     * is already the current one are ignored.
     *
     * <p>The new pool is built from <em>all</em> known masters, not only the one that switched;
     * otherwise the pool would shrink to a single shard after the first failover.
     *
     * @param masterName name of the master that was switched
     * @param masterAddr address of the newly promoted master
     */
    private synchronized void reinitPool(String masterName, HostAndPort masterAddr) {
        MasterInfo oldMaster = currentHostMastersMap.get(masterName);
        if (oldMaster == null) { // new added master, should not happen!
            log.warning("New added master=" + masterName + ", should not happend!");
            return;
        }
        if (oldMaster.getAddress().equals(masterAddr)) {
            // Each sentinel notifies once, so the same switch arrives several times.
            log.info(masterName + " already exist on addrest " + masterAddr + ", ignore reinitPool for it!");
            return;
        }

        // switch master
        log.info("Recreated JedisPool for master=" + masterName + " at " + masterAddr);

        MasterInfo masterInfo = new MasterInfo();
        masterInfo.setAddress(masterAddr);
        masterInfo.setName(masterName);
        masterInfo.setPassword(oldMaster.getPassword());
        masterInfo.setTimeout(oldMaster.getTimeout());
        masterInfo.setWeight(oldMaster.getWeight());

        // Replace the entry in place (same key, so the map's iteration order is untouched) before
        // collecting the shard list. NOTE(review): Jedis appears to place unnamed shards on the hash
        // ring by list position, so keeping the order stable avoids remapping keys - confirm against
        // the Jedis version in use.
        currentHostMastersMap.put(masterName, masterInfo);

        List<JedisShardInfo> shardMasters =
                makeShardInfoList(getCurrentHostMaster().toArray(new MasterInfo[0]));

        initPool(poolConfig, new ShardedJedisFactory(shardMasters, Hashing.MURMUR_HASH, null));

        log.info("masters: " + currentHostMastersMap);
    }

    /**
     * Converts master descriptions into Jedis shard descriptors, preserving the argument order.
     *
     * @param masterInfos masters to convert
     * @return one {@code JedisShardInfo} per master carrying its host, port, timeout, weight and password
     */
    private List<JedisShardInfo> makeShardInfoList(MasterInfo...masterInfos) {
        final List<JedisShardInfo> result = new ArrayList<JedisShardInfo>();
        for (int i = 0; i < masterInfos.length; i++) {
            final MasterInfo current = masterInfos[i];
            final HostAndPort address = current.getAddress();

            final JedisShardInfo shard = new JedisShardInfo(
                    address.getHost(), address.getPort(), current.getTimeout(), current.getWeight());
            shard.setPassword(current.getPassword());

            result.add(shard);
        }
        return result;
    }

    /**
     * Resolves the address of every configured master through the sentinels, records it in the master
     * map, and then starts one {@link MasterListener} per sentinel.
     *
     * <p>For each master the sentinels are polled once per second until one of them reports an address.
     * A master name that is already in the map (for example a duplicate entry in the configuration) is
     * skipped instead of being looked up again.
     *
     * @param sentinels     sentinel addresses in {@code host:port} form
     * @param masterConfigs configuration of each master to resolve
     * @throws JedisConnectionException if the calling thread is interrupted while waiting for a sentinel
     */
    private void initSentinels(Set<String> sentinels, final List<MasterConfiguration> masterConfigs) {
        log.info("Trying to find all shared masters from available Sentinels: " + sentinels);

        List<String> masterNames = new ArrayList<String>();
        for (MasterConfiguration masterConfig : masterConfigs) {
            String masterName = masterConfig.getName();

            MasterInfo known = currentHostMastersMap.get(masterName);
            if (known != null) {
                // Already resolved: looking it up again would never terminate the retry loop.
                log.info("Redis master " + masterName + " aready running at " + known.getAddress());
                continue;
            }

            HostAndPort masterAddress = null;
            while (masterAddress == null) {
                log.info("Trying to find master " + masterName + " from available Sentinels: " + sentinels);

                masterAddress = findMasterAddress(masterName, sentinels);
                if (masterAddress != null) {
                    break;
                }

                log.severe("All sentinels down, cannot determine where is " + masterName
                        + " master is running... sleeping 1000ms.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the flag and give up; retrying would just spin on the pending interrupt.
                    Thread.currentThread().interrupt();
                    throw new JedisConnectionException(
                            "Interrupted while waiting for a Sentinel to report master " + masterName, e);
                }
            }

            MasterInfo masterInfo = new MasterInfo();
            masterInfo.setName(masterName);
            masterInfo.setPassword(masterConfig.getPassword());
            masterInfo.setTimeout(masterConfig.getTimeout());
            masterInfo.setWeight(masterConfig.getWeight());
            masterInfo.setAddress(masterAddress);

            currentHostMastersMap.put(masterName, masterInfo);
            masterNames.add(masterName);
            log.info("Redis master " + masterName + " running at " + masterAddress);
        }

        log.info("Starting Sentinel listeners...");
        for (String sentinel : sentinels) {
            final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
            MasterListener masterListener = new MasterListener(masterNames, hap.getHost(), hap.getPort());
            masterListeners.add(masterListener);
            masterListener.start();
        }

    }

    /**
     * Asks each sentinel in turn for the address of {@code masterName}.
     *
     * @param masterName name of the master to look up
     * @param sentinels  sentinel addresses in {@code host:port} form
     * @return the first address a sentinel reports, or {@code null} when no sentinel could be reached
     *         or none of them returned a host/port pair
     */
    private HostAndPort findMasterAddress(String masterName, Set<String> sentinels) {
        for (String sentinel : sentinels) {
            final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));

            log.fine("Connecting to Sentinel " + hap);

            Jedis jedis = null;
            try {
                jedis = new Jedis(hap.getHost(), hap.getPort());
                List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
                if (masterAddr == null || masterAddr.size() != 2) {
                    log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: "
                            + hap + ".");
                    continue;
                }
                HostAndPort masterAddress = toHostAndPort(masterAddr);
                log.fine("Found Redis master at " + masterAddress);
                return masterAddress;
            } catch (JedisConnectionException e) {
                log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one.");
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
        return null;
    }

    /**
     * Builds a {@code HostAndPort} from a two-element list: host at index 0, decimal port at index 1.
     *
     * @param getMasterAddrByNameResult list holding the host followed by the port
     * @return the parsed address
     */
    private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
        return new HostAndPort(
                getMasterAddrByNameResult.get(0),
                Integer.parseInt(getMasterAddrByNameResult.get(1)));
    }

    /**
     * {@code JedisPubSub} with every callback implemented as a no-op, so subclasses only override the
     * events they care about.
     */
    protected class JedisPubSubAdapter extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
        }

        @Override
        public void onPMessage(String pattern, String channel, String message) {
        }

        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
        }

        @Override
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
        }

        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
        }

        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
        }
    }

    protected class MasterListener extends Thread {

        protected List<String> masterNames;
        protected String host;
        protected int port;
        protected long subscribeRetryWaitTimeMillis = 5000;
        protected Jedis j;
        protected AtomicBoolean running = new AtomicBoolean(false);

        protected MasterListener() {
        }

        public MasterListener(List<String> masterNames, String host, int port) {
            this.masterNames = masterNames;
            this.host = host;
            this.port = port;
        }

        public MasterListener(List<String> masterNames, String host, int port, long subscribeRetryWaitTimeMillis) {
            this(masterNames, host, port);
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
        }

        public void run() {

            running.set(true);

            while (running.get()) {

                j = new Jedis(host, port);

                try {
                    j.subscribe(new JedisPubSubAdapter() {
                        @Override
                        public void onMessage(String channel, String message) {
                            log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");

                            String[] switchMasterMsg = message.split(" ");

                            if (switchMasterMsg.length > 3) {
                                String masterName = switchMasterMsg[0];
                                int index = masterNames.indexOf(masterName);
                                if (index >= 0) {
                                    HostAndPort newHostMaster =
                                            toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]));
                                    reinitPool(masterName, newHostMaster);
                                } else {
                                    log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0]
                                            + ", our master names are " + masterNames);
                                }

                            } else {
                                log.severe("Invalid message received on Sentinel " + host + ":" + port
                                        + " on channel +switch-master: " + message);
                            }
                        }
                    }, "+switch-master");

                } catch (JedisConnectionException e) {

                    if (running.get()) {
                        log.severe("Lost connection to Sentinel at " + host + ":" + port
                                + ". Sleeping 5000ms and retrying.");
                        try {
                            Thread.sleep(subscribeRetryWaitTimeMillis);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }
                    } else {
                        log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
                    }
                }
            }
        }

        public void shutdown() {
            try {
                log.fine("Shutting down listener on " + host + ":" + port);
                running.set(false);
                // This isn't good, the Jedis object is not thread safe
                j.disconnect();
            } catch (Exception e) {
                log.severe("Caught exception while shutting down: " + e.getMessage());
            }
        }
    }

    /**
     * commons-pool2 factory that creates, validates and destroys {@code ShardedJedis} instances for a
     * fixed list of shards.
     */
    private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
        private final List<JedisShardInfo> shards;
        private final Hashing algo;
        private final Pattern keyTagPattern;

        public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
            this.shards = shards;
            this.algo = algo;
            this.keyTagPattern = keyTagPattern;
        }

        /** Creates a new {@code ShardedJedis} over the configured shards. */
        @Override
        public PooledObject<ShardedJedis> makeObject() throws Exception {
            return new DefaultPooledObject<ShardedJedis>(new ShardedJedis(shards, algo, keyTagPattern));
        }

        /** Best-effort QUIT followed by disconnect on every shard; failures are ignored. */
        @Override
        public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws Exception {
            for (Jedis shard : pooledShardedJedis.getObject().getAllShards()) {
                try {
                    shard.quit();
                } catch (Exception e) {
                    // do nothging
                }
                try {
                    shard.disconnect();
                } catch (Exception e) {
                    // do nothging
                }
            }
        }

        /** A pooled instance is valid only if every shard answers PING with PONG. */
        @Override
        public boolean validateObject(PooledObject<ShardedJedis> pooledShardedJedis) {
            boolean allAlive = true;
            try {
                for (Jedis shard : pooledShardedJedis.getObject().getAllShards()) {
                    if (!"PONG".equals(shard.ping())) {
                        allAlive = false;
                        break;
                    }
                }
            } catch (Exception ex) {
                allAlive = false;
            }
            return allAlive;
        }

        /** No activation work is needed. */
        @Override
        public void activateObject(PooledObject<ShardedJedis> p) throws Exception {

        }

        /** No passivation work is needed. */
        @Override
        public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {

        }
    }

}

然后我们封装一个CacheManager,对配置的redis集群进行加载和初始化连接池:

package me.arganzheng.study.server.dataaccess.cache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.io.FileSystemResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.Assert;

import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;

import me.arganzheng.study.server.dataaccess.cache.configuration.CacheConfiguration;
import me.arganzheng.study.server.dataaccess.cache.configuration.MasterListConfiguration;
import me.arganzheng.study.server.dataaccess.cache.configuration.RedisConfiguration;
import me.arganzheng.study.server.dataaccess.cache.configuration.ShardConfiguration;

/**
 * 对所有的Redis实例进行管理。注意每次加载配置文件都会创建所有链接池,所以这个类应该是单例使用的。最简单的做法就是通过Spring配置注入。
 * 
 * @author zhengzhibin
 * 
 */
public class CacheManager {

    protected final Log logger = LogFactory.getLog(getClass());

    // Location used when no explicit config location is supplied.
    public static final String DEFAULT_CONFIG_LOCATION = "redis.xml";

    // Location of the Redis configuration file
    private String configLocation = DEFAULT_CONFIG_LOCATION;

    // Cache name -> client; the LinkedHashSet keeps the names in the order they were loaded.
    private final ConcurrentMap<String, CacheClient> redisClientMap = new ConcurrentHashMap<String, CacheClient>(16);
    private Set<String> redisClientNames = new LinkedHashSet<String>(16);

    // Set once afterPropertiesSet() has loaded the clients; makes later calls a no-op.
    private boolean initialized = false;

    /**
     * Creates a manager and immediately loads every cache client from the given configuration file.
     *
     * @param configLocation location of the Redis XML configuration
     */
    public CacheManager(String configLocation) {
        this.setConfigLocation(configLocation);
        // NOTE(review): afterPropertiesSet() is overridable and is invoked from the constructor.
        afterPropertiesSet();
    }

    /**
     * Creates a manager and immediately loads every cache client from {@link #DEFAULT_CONFIG_LOCATION}.
     */
    public CacheManager() {
        afterPropertiesSet();
    }

    /**
     * Looks up a loaded client by cache name.
     *
     * @param name cache name as declared in the configuration
     * @return the client, or {@code null} if no cache with that name was loaded
     */
    public CacheClient getCacheClient(String name) {
        return redisClientMap.get(name);
    }

    /**
     * @return an unmodifiable view of the loaded cache names, in load order
     */
    public Collection<String> getCacheClientNames() {
        return Collections.unmodifiableSet(this.redisClientNames);
    }

    /**
     * Loads every cache client from the configuration and registers it by name. Only the first call
     * does any work; later calls return immediately.
     */
    public synchronized void afterPropertiesSet() {
        if (!initialized) {
            final Collection<CacheClient> loaded = loadRedisClient();

            // Start from a clean registry, then register in load order so the name set keeps
            // the order of the configuration.
            this.redisClientMap.clear();
            this.redisClientNames.clear();
            for (CacheClient each : loaded) {
                addCacheClient(each);
            }

            initialized = true;

            logger.info("success initialized! Success loaded cache client name is " + this.getCacheClientNames());
        }
    }

    /**
     * Reads the configuration file and builds one initialised {@code CacheClient} per configured cache.
     *
     * <p>The mode of each client is chosen from the configuration: a non-empty shard list gives a
     * sharded client, otherwise a non-empty sentinel list gives a sentinel client (masters required),
     * otherwise a single host is required.
     *
     * @return the initialised clients, in configuration order
     */
    protected Collection<CacheClient> loadRedisClient() {
        logger.info("loading CacheClient..");

        final RedisConfiguration configuration = loadConfiguration();
        final Collection<CacheClient> clients = new ArrayList<CacheClient>();

        for (CacheConfiguration cacheConfig : configuration.getCache()) {
            final CacheClient client = new CacheClient();
            client.setName(cacheConfig.getName());
            client.setJedisPoolConfig(cacheConfig.getPool());

            final boolean hasShards = cacheConfig.getShards() != null
                    && CollectionUtils.isNotEmpty(cacheConfig.getShards().getShard());

            if (hasShards) {
                // sharded
                final List<JedisShardInfo> shardInfos = new ArrayList<JedisShardInfo>();
                for (ShardConfiguration shard : cacheConfig.getShards().getShard()) {
                    final JedisShardInfo info = new JedisShardInfo(
                            shard.getHost(), shard.getPort(), shard.getTimeout(), shard.getWeight());
                    info.setPassword(shard.getPassword());
                    shardInfos.add(info);
                }
                client.setShardInfos(shardInfos);
            } else if (CollectionUtils.isNotEmpty(cacheConfig.getSentinels())) {
                // sharded sentinel
                final MasterListConfiguration masterList = cacheConfig.getMasters();
                final boolean hasMasters =
                        masterList != null && CollectionUtils.isNotEmpty(masterList.getMaster());
                Assert.isTrue(hasMasters, "masters can not be empty while sentinel!");
                client.setMasters(masterList.getMaster());
                client.setSentinels(cacheConfig.getSentinels());
            } else {
                // single host
                Assert.isTrue(StringUtils.isNotEmpty(cacheConfig.getHost()),
                        "host can not be null for no sharded pool!");

                client.setHost(cacheConfig.getHost());
                // Port and timeout only override the client's defaults when positive.
                if (cacheConfig.getPort() > 0) {
                    client.setPort(cacheConfig.getPort());
                }
                client.setPassword(cacheConfig.getPassword());
                if (cacheConfig.getTimeout() > 0) {
                    client.setTimeout(cacheConfig.getTimeout());
                }
            }

            client.afterPropertiesSet();
            clients.add(client);
        }

        logger.info("success load CacheClient from " + configLocation);

        return clients;
    }

    /**
     * Loads {@code configLocation} through Spring's file-system resource loader and unmarshals it with
     * JAXB into a {@code RedisConfiguration}.
     *
     * @return the parsed configuration
     * @throws RuntimeException wrapping any failure to locate, read or parse the file
     */
    private RedisConfiguration loadConfiguration() {
        try {
            // load config with Spring's ResourceLoader
            ResourceLoader resourceLoader = new FileSystemResourceLoader();
            Resource config = resourceLoader.getResource(configLocation);

            // parse xml config file with JAXB
            JAXBContext jaxbContext = JAXBContext.newInstance(RedisConfiguration.class);
            Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();

            // The stream returned by the resource must be closed by the caller.
            java.io.InputStream in = config.getInputStream();
            try {
                return (RedisConfiguration) jaxbUnmarshaller.unmarshal(in);
            } finally {
                try {
                    in.close();
                } catch (java.io.IOException closeError) {
                    // A failed close must not hide the parse result or the original error.
                    logger.warn("failed to close config stream! ConfigLocation=" + configLocation, closeError);
                }
            }
        } catch (Exception e) {
            logger.error("load Configuration error! ConfigLocation=" + configLocation, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Registers a client under its own name, replacing any client previously registered with it.
     *
     * @param redisClient client to register
     */
    protected final void addCacheClient(CacheClient redisClient) {
        this.redisClientMap.put(redisClient.getName(), redisClient);
        this.redisClientNames.add(redisClient.getName());
    }

    /**
     * @return the location the Redis configuration is loaded from
     */
    public String getConfigLocation() {
        return configLocation;
    }

    /**
     * Sets the configuration location. It is read when the clients are loaded, so it has no effect
     * once afterPropertiesSet() has already run.
     *
     * @param configLocation location of the Redis XML configuration
     */
    public void setConfigLocation(String configLocation) {
        this.configLocation = configLocation;
    }
}

CacheManager加载的配置文件定义如下:

<redis>
	<cache name="datastore">
		<host>hk01-xxx-mob24.hk01</host>
		<port>6666</port>
		<password>xxx</password>
		<timeout>5000</timeout>
		<pool>
			<maxTotal>500</maxTotal>
			<maxIdle>50</maxIdle>
			<maxWaitMillis>1000</maxWaitMillis>
			<testOnBorrow>true</testOnBorrow>
			<testOnReturn>false</testOnReturn>
			<testWhileIdle>true</testWhileIdle>
		</pool>
	</cache>
	<cache name="content">
		<host>hk01-xxx-mob24.hk01</host>
		<port>6666</port>
		<password>xxx</password>
		<timeout>5000</timeout>
		<pool>
			<maxTotal>500</maxTotal>
			<maxIdle>50</maxIdle>
			<maxWaitMillis>1000</maxWaitMillis>
			<testOnBorrow>true</testOnBorrow>
			<testOnReturn>false</testOnReturn>
			<testWhileIdle>true</testWhileIdle>
		</pool>
	</cache>
	<cache name="user">
		<shards>
			<shard>
				<host>hk01-xxx-mob24.hk01</host>
				<port>6666</port>
				<password>xxx</password>
				<timeout>5000</timeout>
			</shard>
			<shard>
				<host>hk01-xxx-mob34.hk01</host>
				<port>6666</port>
				<password>xxx</password>
				<timeout>5000</timeout>
			</shard>
		</shards>
		<pool>
			<maxTotal>500</maxTotal>
			<maxIdle>50</maxIdle>
			<maxWaitMillis>1000</maxWaitMillis>
			<testOnBorrow>true</testOnBorrow>
			<testOnReturn>false</testOnReturn>
			<testWhileIdle>true</testWhileIdle>
		</pool>
	</cache>
</redis>

可以看到一个CacheManager可以加载多个Cache,每个Cache可以有多个分片(shards)。注意到主从配置和切换并没有在这里体现,因为这个是服务端的实现(使用sentinel),对客户端是透明的。

然后对每个Cache的操作进行了封装:

package me.arganzheng.study.server.dataaccess.cache;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.Pool;

import me.arganzheng.study.server.dataaccess.cache.configuration.MasterConfiguration;
import me.arganzheng.study.server.dataaccess.cache.jedis.ShardedJedisSentinelPool;
import me.arganzheng.study.server.dataaccess.cache.serializer.RedisSerializer;
import me.arganzheng.study.server.dataaccess.cache.serializer.StringRedisSerializer;

/**
 * <pre>
 * 对Jedis进行 如下增强:
 * 
 * 1. 链接池的管理(支持分片,对客户端透明)
 * 2. 序列化和反序列化
 * 3. 接口适配(比如增加超时时间)
 * 4. 监听sentinel,做主从自动切换
 * 
 * 	注意:
 * 	
 * 	1. 这里不支持事务。如果要支持事务,需要保证MULTI和EXEC之间的命令都是通过同一个连接发起。对于连接池方式来说比较麻烦。
 * 		我们的业务对一致性要求不高,如果确实需要事务,那么考虑使用 脚本 方式。
 * 	2. Redis3.0会支持集群,可能就不需要客户端一致性hash了。
 * </pre>
 * 
 * @author zhengzhibin
 * 
 */
public class CacheClient {
    private static final Log log = LogFactory.getLog(CacheClient.class);

    public static final int DEFAULT_PORT = 6379;
    public static final int DEFAULT_TIMEOUT = 2000;

    // Name: identifies one Redis instance. Currently mainly: default, content, recommendation, push.
    private String name = "default";

    private String host;
    private int port = DEFAULT_PORT;
    private int timeout = DEFAULT_TIMEOUT;
    private String password;

    private GenericObjectPoolConfig jedisPoolConfig;

    // Mode flags; they are switched on as a side effect of setShardInfos / setMasters / setSentinels.
    private boolean sharded = false;
    private boolean sentinel = false;

    // When non-empty, the client is sharded
    private List<JedisShardInfo> shardInfos;
    // When non-empty, the client uses sentinel
    private Set<String> sentinels;

    // A single master means sentinel && !sharded
    private List<MasterConfiguration> masters;

    // 1==> !sharded && !sentinel
    private JedisPool jedisPool;

    // 2==> sharded && !sentinel
    private ShardedJedisPool shardedJedisPool;

    // 3==> sentinel && !sharded
    private JedisSentinelPool jedisSentinelPool;

    // 4=> sentinel && sharded
    private ShardedJedisSentinelPool shardedJedisSentinelPool;

    // Set at the end of afterPropertiesSet(); execute() refuses to run before that.
    private boolean initialized = false;
    private boolean enableDefaultSerializer = true;

    private RedisSerializer<String> stringSerializer = new StringRedisSerializer();
    private RedisSerializer<?> defaultSerializer = new StringRedisSerializer();
    private RedisSerializer<?> valueSerializer = null;

    /**
     * Creates an unconfigured client named {@code "default"}; set its properties and then call
     * afterPropertiesSet() (or init()) before use.
     */
    public CacheClient() {
    }

    /**
     * Creates an unconfigured client with the given name.
     *
     * @param name name identifying this client
     */
    public CacheClient(String name) {
        this.name = name;
    }

    /** @return the name identifying this client */
    public String getName() {
        return name;
    }

    /** @param name the name identifying this client */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the shard descriptors, or {@code null} if none were set */
    public List<JedisShardInfo> getShardInfos() {
        return shardInfos;
    }

    /**
     * Sets the shard descriptors and, as a side effect, switches the client into sharded mode.
     *
     * @param shardInfos shards to distribute keys over
     */
    public void setShardInfos(List<JedisShardInfo> shardInfos) {
        this.shardInfos = shardInfos;
        sharded = true;
    }

    /** @return the pool used in sharded, non-sentinel mode; {@code null} in other modes */
    public ShardedJedisPool getShardedJedisPool() {
        return shardedJedisPool;
    }

    /** @param shardedJedisPool pool to use in sharded, non-sentinel mode */
    public void setShardedJedisPool(ShardedJedisPool shardedJedisPool) {
        this.shardedJedisPool = shardedJedisPool;
    }

    /** @return the configured sentinel masters, or {@code null} if none were set */
    public List<MasterConfiguration> getMasters() {
        return masters;
    }

    /**
     * Sets the sentinel masters. More than one master also switches the client into sharded mode;
     * the flag is never switched back off here.
     *
     * @param masters masters monitored by the sentinels; must not be null
     */
    public void setMasters(List<MasterConfiguration> masters) {
        this.masters = masters;
        if (masters.size() > 1) {
            sharded = true;
        }
    }

    /** @return the sentinel addresses, or {@code null} if none were set */
    public Set<String> getSentinels() {
        return sentinels;
    }

    /**
     * Sets the sentinel addresses and, as a side effect, switches the client into sentinel mode.
     *
     * @param sentinels sentinel addresses
     */
    public void setSentinels(Set<String> sentinels) {
        this.sentinels = sentinels;
        sentinel = true;
    }

    /** @return the pool used in sentinel + sharded mode; {@code null} in other modes */
    public ShardedJedisSentinelPool getShardedJedisSentinelPool() {
        return shardedJedisSentinelPool;
    }

    /** @param shardedJedisSentinelPool pool to use in sentinel + sharded mode */
    public void setShardedJedisSentinelPool(ShardedJedisSentinelPool shardedJedisSentinelPool) {
        this.shardedJedisSentinelPool = shardedJedisSentinelPool;
    }

    /** @return the Redis host used in single-instance mode */
    public String getHost() {
        return host;
    }

    /** @param host the Redis host used in single-instance mode */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the Redis port; defaults to {@link #DEFAULT_PORT} */
    public int getPort() {
        return port;
    }

    /** @param port the Redis port used in single-instance mode */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the timeout passed to the single-instance pool; defaults to {@link #DEFAULT_TIMEOUT} */
    public int getTimeout() {
        return timeout;
    }

    /** @param timeout the timeout passed to the single-instance pool */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /** @return the password used in single-instance mode, possibly {@code null} */
    public String getPassword() {
        return password;
    }

    /** @param password the password used in single-instance mode */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the pool configuration handed to whichever pool afterPropertiesSet() creates */
    public GenericObjectPoolConfig getJedisPoolConfig() {
        return jedisPoolConfig;
    }

    /** @param jedisPoolConfig pool configuration to use when the pool is created */
    public void setJedisPoolConfig(GenericObjectPoolConfig jedisPoolConfig) {
        this.jedisPoolConfig = jedisPoolConfig;
    }

    /** @return whether a missing value serializer is replaced by the default one on initialisation */
    public boolean isEnableDefaultSerializer() {
        return enableDefaultSerializer;
    }

    /** @param enableDefaultSerializer whether to fall back to the default serializer */
    public void setEnableDefaultSerializer(boolean enableDefaultSerializer) {
        this.enableDefaultSerializer = enableDefaultSerializer;
    }

    /** @return the value serializer, or {@code null} if none has been set or defaulted yet */
    public RedisSerializer<?> getValueSerializer() {
        return valueSerializer;
    }

    /** @param valueSerializer serializer to use for values */
    public void setValueSerializer(RedisSerializer<?> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    /**
     * Alias for {@link #afterPropertiesSet()}.
     */
    public void init() {
        afterPropertiesSet();
    }

    /**
     * Finishes configuration: falls back to the default value serializer when enabled and none was
     * set, then creates the one connection pool that matches the sentinel/sharded flags and marks
     * the client as initialised.
     */
    public void afterPropertiesSet() {
        if (enableDefaultSerializer && valueSerializer == null) {
            valueSerializer = defaultSerializer;
            Assert.notNull(defaultSerializer, "default serializer null and not all serializers initialized");
        }

        if (!sentinel && !sharded) {
            // 1: plain single instance
            jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password);
        } else if (!sentinel) {
            // 2: client-side sharding without sentinel
            Assert.isTrue(CollectionUtils.isNotEmpty(shardInfos), "ShardInfos can not be empty while sharding!");
            shardedJedisPool = new ShardedJedisPool(jedisPoolConfig, shardInfos);
        } else {
            Assert.isTrue(CollectionUtils.isNotEmpty(sentinels), "sentinels can not be empty while sentinel!");
            Assert.isTrue(CollectionUtils.isNotEmpty(masters), "masters can not be empty while sentinel!");
            if (!sharded) {
                // 3: single master behind sentinel
                final MasterConfiguration onlyMaster = masters.get(0);
                jedisSentinelPool = new JedisSentinelPool(onlyMaster.getName(), sentinels, jedisPoolConfig,
                        onlyMaster.getTimeout(), onlyMaster.getPassword());
            } else {
                // 4: several masters behind sentinel
                shardedJedisSentinelPool = new ShardedJedisSentinelPool(masters, sentinels, jedisPoolConfig);
            }
        }

        initialized = true;
    }

    /**
     * Borrows a connection from whichever pool this client was configured with, runs {@code action}
     * on it and returns the connection to the pool.
     *
     * <p>A {@link JedisConnectionException} raised while running the action is logged and swallowed:
     * the connection is returned as broken and {@code null} is returned. A {@link JedisDataException}
     * is logged and rethrown.
     *
     * @param <T>    result type of the callback
     * @param action callback to run against the selected connection; must not be null
     * @param key    key used to pick the shard; may be empty only when at most one shard is configured
     * @return the callback's result, or {@code null} if the connection failed
     * @throws NotSupportedException if {@code key} is empty and more than one shard is configured
     */
    protected <T> T execute(final RedisAction<T> action, final String key) {
        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");

        final boolean keyMissing = StringUtils.isEmpty(key);

        // Sharding needs a key. Either list may still be null (each is only assigned through its
        // own setter), so both are null-guarded instead of being dereferenced unconditionally.
        final boolean multipleShards = (shardInfos != null && shardInfos.size() > 1)
                || (masters != null && masters.size() > 1);
        if (keyMissing && sharded && multipleShards) {
            throw new NotSupportedException(
              "This operation is not supported in sharded cluster! Key is required for sharded with sharded size > 1");
        }

        // Pick the single pool that matches the configured mode.
        final Pool<?> pool;
        if (sentinel) {
            if (sharded) { // sentinel & sharded
                pool = shardedJedisSentinelPool;
            } else {
                pool = jedisSentinelPool;
            }
        } else if (sharded) {
            pool = shardedJedisPool;
        } else {
            pool = jedisPool;
        }

        final Object resource = pool.getResource();
        if (resource == null) {
            throw new RuntimeException("can't get Jedis from pool");
        }

        boolean isBroken = false;
        try {
            // Shard selection happens inside the try so the borrowed resource is always returned.
            Jedis jedis = null;
            if (!sharded) {
                jedis = (Jedis) resource;
            } else if (keyMissing) {
                // No key to hash on: use the first shard (only reachable with a single shard).
                for (Jedis shard : ((ShardedJedis) resource).getAllShards()) {
                    jedis = shard;
                    break;
                }
            } else {
                jedis = ((ShardedJedis) resource).getShard(key);
            }

            if (jedis == null) {
                throw new RuntimeException("can't get Jedis from pool");
            }

            return action.doInRedis(jedis);
        } catch (JedisConnectionException e) {
            log.error("ConnectionException occur while doInRedis!", e);
            isBroken = true;
            return null;
        } catch (JedisDataException de) {
            log.error("JedisDataException occur while doInRedis!", de);
            throw de;
        } finally {
            returnResource(pool, resource, isBroken);
        }
    }

    /**
     * Hands a borrowed resource back to {@code pool}.
     *
     * <p>A resource flagged as broken goes through {@code returnBrokenResource}. Otherwise a normal
     * return is attempted first, and if that throws the resource is returned as broken instead.
     *
     * @param pool pool the resource was borrowed from
     * @param jedis the borrowed resource
     * @param isBroken whether the resource must be treated as broken
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void returnResource(Pool pool, Object jedis, boolean isBroken) {
        if (isBroken) {
            pool.returnBrokenResource(jedis);
            return;
        }

        try {
            pool.returnResource(jedis);
        } catch (Exception returnFailure) {
            // A failed normal return falls back to the broken path.
            pool.returnBrokenResource(jedis);
        }
    }

    /**
     * Stores a string value under {@code key} by delegating to
     * {@code set(key, value, 0)}, i.e. with an expiration argument of zero.
     *
     * <p>The method used to declare a type parameter that appeared nowhere in its signature; it has
     * been dropped. The erased signature is unchanged.
     *
     * @param key key to write
     * @param value string value to store
     */
    public void set(final String key, final String value) {
        set(key, value, 0);
    }

    /**
     * Stores a string value under {@code key}, serializing it with {@code stringSerializer}.
     *
     * @param key key to write
     * @param value string value to store
     * @param expiration forwarded unchanged to the serializer-based {@code set} overload
     */
    public void set(final String key, final String value, final int expiration) {
        set(key, value, expiration, stringSerializer);
    }

    /**
     * Stores {@code value} under {@code key} using the given serializer, with an expiration argument of zero.
     *
     * @param key key to write
     * @param value value to store
     * @param serializer serializer used to turn {@code value} into bytes
     */
    public <T> void set(final String key, final T value, RedisSerializer<T> serializer) {
        set(key, value, 0, serializer);
    }

    /**
     * Serializes {@code key} and {@code value} and writes them through {@code execute}.
     *
     * <p>When {@code expiration} is greater than zero the write uses {@code Jedis#setex} with that
     * expiration; otherwise it uses {@code Jedis#set}.
     *
     * @param key key to write; also used by {@code execute} to pick the shard
     * @param value value to store
     * @param expiration expiration passed to {@code setex}; values of zero or less mean a plain {@code set}
     * @param serializer serializer used to turn {@code value} into bytes
     */
    public <T> void set(final String key, final T value, final int expiration, RedisSerializer<T> serializer) {
        final byte[] rawKey = serializerKey(key);
        final byte[] rawValue = serializer.serialize(value);

        final RedisAction<Object> store = new RedisAction<Object>() {
            @Override
            public Object doInRedis(Jedis jedis) {
                if (expiration <= 0) {
                    jedis.set(rawKey, rawValue);
                } else {
                    jedis.setex(rawKey, expiration, rawValue);
                }
                return null;
            }
        };
        execute(store, key);
    }

    /**
     * Sets {@code key} only if it is absent, delegating to {@code setIfAbsent(key, value, 0)}.
     *
     * @param key key to write
     * @param value value to store
     * @return {@code true} if this call set the key, {@code false} otherwise
     */
    public boolean setIfAbsent(final String key, final String value) {
        return setIfAbsent(key, value, 0);
    }

    /**
     * Sets {@code key} only if it is absent, using {@code Jedis#set} with the {@code "NX"} flag.
     *
     * <p>When {@code time} is greater than zero the call also passes {@code "EX"} and {@code time}.
     *
     * @param key key to write; also used by {@code execute} to pick the shard
     * @param value value to store
     * @param time expiration passed together with {@code "EX"} when greater than zero
     * @return {@code true} if the reply was {@code "OK"} (this call set the key), {@code false} otherwise
     */
    public boolean setIfAbsent(final String key, final String value, final int time) {
        final RedisAction<String> setWhenMissing = new RedisAction<String>() {
            @Override
            public String doInRedis(Jedis jedis) {
                return time > 0 ? jedis.set(key, value, "NX", "EX", time) : jedis.set(key, value, "NX");
            }
        };
        final String reply = execute(setWhenMissing, key);
        return "OK".equals(reply);
    }

    /**
     * Forwards {@code keysvalues} to {@code Jedis#mset}.
     *
     * <p>No routing key is passed to {@code execute}, which rejects the call when more than one shard
     * is configured.
     *
     * @param keysvalues arguments handed to {@code Jedis#mset} as-is
     */
    public void mset(final String...keysvalues) {
        final RedisAction<String> command = new RedisAction<String>() {
            @Override
            public String doInRedis(Jedis jedis) {
                return jedis.mset(keysvalues);
            }
        };
        execute(command, null);
    }

    /**
     * Forwards {@code keys} to {@code Jedis#mget}.
     *
     * <p>No routing key is passed to {@code execute}, which rejects the call when more than one shard
     * is configured.
     *
     * @param keys keys handed to {@code Jedis#mget}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public List<String> mget(final String...keys) {
        final RedisAction<List<String>> command = new RedisAction<List<String>>() {
            @Override
            public List<String> doInRedis(Jedis jedis) {
                return jedis.mget(keys);
            }
        };
        return execute(command, null);
    }

    /**
     * Calls {@code Jedis#del} for a single key on the connection selected for that key.
     *
     * @param key key to delete; also used by {@code execute} to pick the shard
     */
    public void del(final String key) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.del(key);
            }
        };
        execute(command, key);
    }

    /**
     * Forwards several keys to {@code Jedis#del} in one call.
     *
     * <p>No routing key is passed to {@code execute}, which rejects the call when more than one shard
     * is configured.
     *
     * @param keys keys handed to {@code Jedis#del}
     */
    public void del(final String...keys) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.del(keys);
            }
        };
        execute(command, null);
    }

    /**
     * Calls {@code Jedis#expire} for {@code key}.
     *
     * <p>The previous implementation returned {@code false} when the reply was positive (i.e. the
     * result was inverted) and threw a {@code NullPointerException} when {@code execute} returned
     * {@code null} after a swallowed connection failure. Both are fixed here.
     *
     * @param key key whose timeout is set; also used by {@code execute} to pick the shard
     * @param timeout timeout value passed to {@code Jedis#expire}
     * @return {@code true} if the reply was greater than zero; {@code false} otherwise, including when
     *         no reply was obtained
     */
    public boolean expire(final String key, final int timeout) {
        final Long reply = execute(new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.expire(key, timeout);
            }
        }, key);
        // reply is null when execute() swallowed a connection failure.
        return reply != null && reply > 0;
    }

    /**
     * Reads the raw bytes stored under {@code key} and decodes them with {@code stringSerializer}.
     *
     * @param key key to read
     * @return the decoded string, or {@code null} when {@code getAsRaw} returned {@code null}
     */
    public String get(final String key) {
        final byte[] raw = getAsRaw(key);
        return raw == null ? null : stringSerializer.deserialize(raw);
    }

    /**
     * Reads the raw bytes stored under {@code key} and decodes them with {@code serializer}.
     *
     * <p>As in {@code get(String)}, a {@code null} raw value is returned as {@code null} and is
     * not passed to the serializer.
     *
     * @param key key to read
     * @param serializer serializer used to decode the stored bytes
     * @return the decoded value, or {@code null} when {@code getAsRaw} returned {@code null}
     */
    public <T> T get(String key, RedisSerializer<T> serializer) {
        byte[] bs = getAsRaw(key);
        if (bs == null) {
            return null;
        }
        return serializer.deserialize(bs);
    }

    /**
     * Reads the string stored under {@code key} and parses it with {@code Integer.parseInt}.
     *
     * @param key key to read
     * @return the parsed value, or {@code null} when {@code get(key)} returned {@code null} or the
     *         text is not a valid integer
     */
    public Integer getInt(final String key) {
        final String text = get(key);
        if (text == null) {
            return null;
        }

        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException notANumber) {
            return null;
        }
    }

    /**
     * Calls {@code Jedis#getSet} for {@code key} with {@code newValue}.
     *
     * @param key key to update; also used by {@code execute} to pick the shard
     * @param newValue value passed to {@code Jedis#getSet}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public String getAndSet(final String key, final String newValue) {
        final RedisAction<String> command = new RedisAction<String>() {
            @Override
            public String doInRedis(Jedis jedis) {
                return jedis.getSet(key, newValue);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#sadd} for {@code key} with the given members; the reply is discarded.
     *
     * @param key set key; also used by {@code execute} to pick the shard
     * @param members members passed to {@code Jedis#sadd}
     */
    public void sadd(final String key, final String...members) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.sadd(key, members);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#srem} for {@code key} with the given members; the reply is discarded.
     *
     * @param key set key; also used by {@code execute} to pick the shard
     * @param members members passed to {@code Jedis#srem}
     */
    public void srem(final String key, final String...members) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.srem(key, members);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#incr} for {@code key}.
     *
     * @param key counter key; also used by {@code execute} to pick the shard
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long incr(final String key) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.incr(key);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#hincrBy} for the given hash field; the reply is discarded.
     *
     * @param key hash key; also used by {@code execute} to pick the shard
     * @param field hash field passed to {@code Jedis#hincrBy}
     * @param increment amount passed to {@code Jedis#hincrBy}
     */
    public void hincrBy(final String key, final String field, final long increment) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.hincrBy(key, field, increment);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#hset} for the given hash field; the reply is discarded.
     *
     * @param key hash key; also used by {@code execute} to pick the shard
     * @param field hash field to write
     * @param value value to store in the field
     */
    public void hset(final String key, final String field, final String value) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.hset(key, field, value);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#hmset} for {@code key} with the given field map.
     *
     * <p>A {@code null} or empty map is a no-op: nothing is sent to Redis.
     *
     * @param key hash key; also used by {@code execute} to pick the shard
     * @param hash field/value pairs passed to {@code Jedis#hmset}
     */
    public void hmset(final String key, final Map<String, String> hash) {
        final boolean nothingToWrite = hash == null || hash.isEmpty();
        if (nothingToWrite) {
            return;
        }

        final RedisAction<String> command = new RedisAction<String>() {
            @Override
            public String doInRedis(Jedis jedis) {
                return jedis.hmset(key, hash);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#decr} for {@code key}; the reply is discarded.
     *
     * @param key counter key; also used by {@code execute} to pick the shard
     */
    public void decr(final String key) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.decr(key);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#keys} with {@code pattern}.
     *
     * <p>No routing key is passed to {@code execute}, which rejects the call when more than one shard
     * is configured.
     *
     * @param pattern pattern handed to {@code Jedis#keys}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> keys(final String pattern) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.keys(pattern);
            }
        };
        return execute(command, null);
    }

    /**
     * Calls {@code Jedis#sinter} with the single argument {@code pattern}.
     *
     * <p>No routing key is passed to {@code execute}, which rejects the call when more than one shard
     * is configured.
     *
     * @param pattern argument handed to {@code Jedis#sinter} as-is
     *                (NOTE(review): despite the name this looks like a set key, not a pattern; confirm)
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> sinter(final String pattern) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.sinter(pattern);
            }
        };
        return execute(command, null);
    }

    /**
     * Calls {@code Jedis#rpush} for {@code key} with the given values.
     *
     * @param key list key; also used by {@code execute} to pick the shard
     * @param strings values passed to {@code Jedis#rpush}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long rpush(final String key, final String...strings) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.rpush(key, strings);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#ltrim} for {@code key}; the reply is discarded.
     *
     * @param key list key; also used by {@code execute} to pick the shard
     * @param start start index passed to {@code Jedis#ltrim}
     * @param end end index passed to {@code Jedis#ltrim}
     */
    public void ltrim(final String key, final long start, final long end) {
        final RedisAction<String> command = new RedisAction<String>() {
            @Override
            public String doInRedis(Jedis jedis) {
                return jedis.ltrim(key, start, end);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#hget} for the given hash field.
     *
     * @param key hash key; also used by {@code execute} to pick the shard
     * @param field hash field to read
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public String hget(final String key, final String field) {
        final RedisAction<String> command = new RedisAction<String>() {
            @Override
            public String doInRedis(Jedis jedis) {
                return jedis.hget(key, field);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#hgetAll} for {@code key}.
     *
     * @param key hash key; also used by {@code execute} to pick the shard
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Map<String, String> hgetAll(final String key) {
        final RedisAction<Map<String, String>> command = new RedisAction<Map<String, String>>() {
            @Override
            public Map<String, String> doInRedis(Jedis jedis) {
                return jedis.hgetAll(key);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zadd} for {@code key} with a map of entries; the reply is discarded.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param scoreMembers map passed to {@code Jedis#zadd} as-is
     */
    public void zadd(final String key, final Map<String, Double> scoreMembers) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zadd(key, scoreMembers);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#zadd} for a single member; the reply is discarded.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param score score passed to {@code Jedis#zadd}
     * @param member member passed to {@code Jedis#zadd}
     */
    public void zadd(final String key, final double score, final String member) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zadd(key, score, member);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#zscore} for {@code member} in {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param member member to look up
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Double zscore(final String key, final String member) {
        final RedisAction<Double> command = new RedisAction<Double>() {
            @Override
            public Double doInRedis(Jedis jedis) {
                return jedis.zscore(key, member);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zcount} for {@code key} with string bounds.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param min lower bound passed to {@code Jedis#zcount}
     * @param max upper bound passed to {@code Jedis#zcount}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long zcount(final String key, final String min, final String max) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zcount(key, min, max);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrem} for {@code key} with the given members; the reply is discarded.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param members members passed to {@code Jedis#zrem}
     */
    public void zrem(final String key, final String...members) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zrem(key, members);
            }
        };
        execute(command, key);
    }

    /**
     * Calls {@code Jedis#zremrangeByRank} for {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param start start rank passed to Jedis
     * @param end end rank passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long zremrangeByRank(final String key, final long start, final long end) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zremrangeByRank(key, start, end);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zremrangeByScore} for {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param start first score bound passed to Jedis
     * @param end second score bound passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long zremrangeByScore(final String key, final double start, final double end) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zremrangeByScore(key, start, end);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrevrangeByScore} with string bounds plus offset and count.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param max upper bound passed to Jedis
     * @param min lower bound passed to Jedis
     * @param offset offset passed to Jedis
     * @param count count passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> zrevrangeByScore(final String key, final String max, final String min, final int offset,
            final int count) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.zrevrangeByScore(key, max, min, offset, count);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrevrangeByScore} with numeric bounds plus offset and count.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param max upper bound passed to Jedis
     * @param min lower bound passed to Jedis
     * @param offset offset passed to Jedis
     * @param count count passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> zrevrangeByScore(final String key, final double max, final double min, final int offset,
            final int count) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.zrevrangeByScore(key, max, min, offset, count);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrangeByScore} with string bounds plus offset and count.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param min lower bound passed to Jedis
     * @param max upper bound passed to Jedis
     * @param offset offset passed to Jedis
     * @param count count passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> zrangeByScore(final String key, final String min, final String max, final int offset,
            final int count) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.zrangeByScore(key, min, max, offset, count);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrangeByScore} with numeric bounds plus offset and count.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param min lower bound passed to Jedis
     * @param max upper bound passed to Jedis
     * @param offset offset passed to Jedis
     * @param count count passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> zrangeByScore(final String key, final double min, final double max, final int offset,
            final int count) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.zrangeByScore(key, min, max, offset, count);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrange} for {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param start start index passed to Jedis
     * @param end end index passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> zrange(final String key, final long start, final long end) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.zrange(key, start, end);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrevrange} for {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param start start index passed to Jedis
     * @param end end index passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> zrevrange(final String key, final long start, final long end) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.zrevrange(key, start, end);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrank} for {@code member} in {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param member member to look up
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long zrank(final String key, final String member) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zrank(key, member);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zrevrank} for {@code member} in {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param member member to look up
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long zrevrank(final String key, final String member) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.zrevrank(key, member);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#smembers} for {@code key}.
     *
     * @param key set key; also used by {@code execute} to pick the shard
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Set<String> smembers(final String key) {
        final RedisAction<Set<String>> command = new RedisAction<Set<String>>() {
            @Override
            public Set<String> doInRedis(Jedis jedis) {
                return jedis.smembers(key);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#sismember} for {@code member} in {@code setKey}.
     *
     * <p>{@code execute} returns {@code null} after a swallowed connection failure; that case is
     * reported as {@code false} instead of failing with a {@code NullPointerException} on unboxing.
     *
     * @param setKey set key; also used by {@code execute} to pick the shard
     * @param member member to test
     * @return {@code true} only if Jedis replied {@code true}
     */
    public boolean sismember(final String setKey, final String member) {
        final Boolean reply = execute(new RedisAction<Boolean>() {
            @Override
            public Boolean doInRedis(Jedis jedis) {
                return jedis.sismember(setKey, member);
            }

        }, setKey);
        // Null-safe unboxing: a missing reply counts as "not a member".
        return Boolean.TRUE.equals(reply);
    }

    /**
     * Calls {@code Jedis#exists} for {@code key}.
     *
     * <p>{@code execute} returns {@code null} after a swallowed connection failure; that case is
     * reported as {@code false} instead of failing with a {@code NullPointerException} on unboxing.
     *
     * @param key key to test; also used by {@code execute} to pick the shard
     * @return {@code true} only if Jedis replied {@code true}
     */
    public boolean exists(final String key) {
        final Boolean reply = execute(new RedisAction<Boolean>() {
            @Override
            public Boolean doInRedis(Jedis jedis) {
                return jedis.exists(key);
            }
        }, key);
        // Null-safe unboxing: a missing reply counts as "does not exist".
        return Boolean.TRUE.equals(reply);
    }

    /**
     * Calls {@code Jedis#publish} with the given channel and message.
     *
     * @param channel channel to publish to; also passed to {@code execute} as the routing key
     * @param message message to publish
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long publish(final String channel, final String message) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.publish(channel, message);
            }
        };
        return execute(command, channel);
    }

    /**
     * Calls {@code Jedis#lpush} for {@code key} with the given values.
     *
     * @param key list key; also used by {@code execute} to pick the shard
     * @param strings values passed to {@code Jedis#lpush}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Long lpush(final String key, final String...strings) {
        final RedisAction<Long> command = new RedisAction<Long>() {
            @Override
            public Long doInRedis(Jedis jedis) {
                return jedis.lpush(key, strings);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#zincrby} for {@code member} in {@code key}.
     *
     * @param key sorted-set key; also used by {@code execute} to pick the shard
     * @param score amount passed to {@code Jedis#zincrby}
     * @param member member passed to {@code Jedis#zincrby}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public Double zincrby(final String key, final double score, final String member) {
        final RedisAction<Double> command = new RedisAction<Double>() {
            @Override
            public Double doInRedis(Jedis jedis) {
                return jedis.zincrby(key, score, member);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#srandmember} for {@code key} with {@code count}.
     *
     * @param key set key; also used by {@code execute} to pick the shard
     * @param count count passed to {@code Jedis#srandmember}
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public List<String> srandmember(final String key, final int count) {
        final RedisAction<List<String>> command = new RedisAction<List<String>>() {
            @Override
            public List<String> doInRedis(Jedis jedis) {
                return jedis.srandmember(key, count);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#rpop} for {@code key}.
     *
     * @param key list key; also used by {@code execute} to pick the shard
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public String rpop(final String key) {
        final RedisAction<String> command = new RedisAction<String>() {
            @Override
            public String doInRedis(Jedis jedis) {
                return jedis.rpop(key);
            }
        };
        return execute(command, key);
    }

    /**
     * Calls {@code Jedis#subscribe} with {@code listener} on the given channels.
     *
     * <p>No routing key is passed to {@code execute}, which rejects the call when more than one shard
     * is configured.
     *
     * @param listener listener handed to {@code Jedis#subscribe}
     * @param channels channels handed to {@code Jedis#subscribe}
     */
    public void subscribe(final JedisPubSub listener, final String...channels) {
        final RedisAction<Object> subscription = new RedisAction<Object>() {
            @Override
            public Object doInRedis(Jedis jedis) {
                jedis.subscribe(listener, channels);
                return null;
            }
        };
        execute(subscription, null);
    }

    /**
     * Calls {@code Jedis#lrange} for {@code key}.
     *
     * @param key list key; also used by {@code execute} to pick the shard
     * @param start start index passed to Jedis
     * @param end end index passed to Jedis
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public List<String> lrange(final String key, final long start, final long end) {
        final RedisAction<List<String>> command = new RedisAction<List<String>>() {
            @Override
            public List<String> doInRedis(Jedis jedis) {
                return jedis.lrange(key, start, end);
            }
        };
        return execute(command, key);
    }

    /****************** private method *****************/
    /**
     * Reads the raw bytes stored under {@code key}; the key is serialized with {@code serializerKey}
     * inside the callback before calling {@code Jedis#get}.
     *
     * @param key key to read; also used by {@code execute} to pick the shard
     * @return the reply from Jedis, or {@code null} if {@code execute} swallowed a connection failure
     */
    public byte[] getAsRaw(final String key) {
        final RedisAction<byte[]> fetch = new RedisAction<byte[]>() {
            @Override
            public byte[] doInRedis(Jedis jedis) {
                return jedis.get(serializerKey(key));
            }
        };
        return execute(fetch, key);
    }

    /**
     * Converts a key to bytes using {@code stringSerializer}.
     *
     * @param key key to serialize
     * @return the bytes produced by {@code stringSerializer.serialize(key)}
     */
    private byte[] serializerKey(String key) {
        return stringSerializer.serialize(key);
    }

    /** Returns the {@code sentinel} flag, which {@code execute} uses to choose a sentinel-backed pool. */
    public boolean isSentinel() {
        return sentinel;
    }

    /**
     * Sets the {@code sentinel} flag.
     *
     * @param sentinel new value of the flag
     */
    public void setSentinel(boolean sentinel) {
        this.sentinel = sentinel;
    }

    /** Returns the pool used by {@code execute} in sentinel, non-sharded mode; may be unset in other modes. */
    public JedisSentinelPool getJedisSentinelPool() {
        return jedisSentinelPool;
    }

    /**
     * Replaces the pool used by {@code execute} in sentinel, non-sharded mode.
     *
     * @param jedisSentinelPool pool to use
     */
    public void setJedisSentinelPool(JedisSentinelPool jedisSentinelPool) {
        this.jedisSentinelPool = jedisSentinelPool;
    }

    /** Returns the {@code sharded} flag, which {@code execute} uses to choose a sharded pool. */
    public boolean isSharded() {
        return sharded;
    }

    /**
     * Sets the {@code sharded} flag.
     *
     * @param sharded new value of the flag
     */
    public void setSharded(boolean sharded) {
        this.sharded = sharded;
    }

}

其中RedisAction是个接口:

package me.arganzheng.study.server.dataaccess.cache;

import redis.clients.jedis.Jedis;

/**
 * Callback that runs one unit of work against a {@link Jedis} connection and yields a result.
 *
 * @param <T> type of the value produced by the callback
 */
public interface RedisAction<T> {

	/**
	 * Performs the work on the supplied connection.
	 *
	 * @param jedis connection to operate on
	 * @return the result of the operation
	 */
	T doInRedis(Jedis jedis);
}

基本上核心的类就这几个。其他的都是一些辅助类,比如配置文件、序列化等。

当然,再次强调一下,sentinel的配置需要自己在Redis集群进行部署,然后在redis.xml中配置sentinel的地址和master-group-name就可以了,sentinel提供了 sentinelGetMasterAddrByName(masterName) 获取当前的master实例地址的方法,拿到之后就可以连接实际的master了。我们封装的Sentinel连接池会启动一个后台线程,订阅sentinel发布的消息,进行自动切换。 关于sentinel的配置,可以参考官方文档。

客户端使用

首先要配置一下redis.xml:

<redis>
	<cache name="dataStore">
		<host>hkg02-global-mob02.hkg02</host>
		<port>6699</port>
		<password>xxxx</password>
		<timeout>5000</timeout>
		<pool>
			<maxTotal>500</maxTotal>
			<maxIdle>50</maxIdle>
			<maxWaitMillis>1000</maxWaitMillis>
			<testOnBorrow>true</testOnBorrow>
			<testOnReturn>false</testOnReturn>
			<testWhileIdle>true</testWhileIdle>
		</pool>
	</cache>
	<cache name="content">
		<pool>
			<maxTotal>500</maxTotal>
			<maxIdle>50</maxIdle>
			<maxWaitMillis>1000</maxWaitMillis>
			<testOnBorrow>true</testOnBorrow>
			<testOnReturn>false</testOnReturn>
			<testWhileIdle>true</testWhileIdle>
		</pool>
		<shards>
			<shard>
				<host>hkg02-global-mob02.hkg02</host>
				<port>6666</port>
				<password>xxxx</password>
				<timeout>5000</timeout>
			</shard>
			<shard>
				<host>hkg02-global-mob02.hkg02</host>
				<port>6667</port>
				<password>xxxx</password>
				<timeout>5000</timeout>
			</shard>
			<shard>
				<host>hkg02-global-mob02.hkg02</host>
				<port>6668</port>
				<password>xxxx</password>
				<timeout>5000</timeout>
			</shard>
			<shard>
				<host>hkg02-global-mob02.hkg02</host>
				<port>6669</port>
				<password>xxxx</password>
				<timeout>5000</timeout>
			</shard>
		</shards>
	</cache>
	<cache name="ecommerce">
		<pool>
			<maxTotal>500</maxTotal>
			<maxIdle>50</maxIdle>
			<maxWaitMillis>1000</maxWaitMillis>
			<testOnBorrow>true</testOnBorrow>
			<testOnReturn>false</testOnReturn>
			<testWhileIdle>true</testWhileIdle>
		</pool>
		<masters>
			<master>
				<name>ecommerce</name>
				<password>xxxx</password>
				<timeout>5000</timeout>
			</master>
		</masters>
		<sentinels>
			<sentinel>hk01-hao123-mob25.hk01:26379</sentinel>
			<sentinel>hk01-hao123-mob34.hk01:26379</sentinel>
			<sentinel>hk01-hao123-mob37.hk01:26379</sentinel>
		</sentinels>
	</cache>
</redis>

配置好redis.xml之后,客户端的使用就非常简单了:

package me.arganzheng.study.server.dataaccess.cache;

import java.util.Collection;

import me.arganzheng.study.server.dataaccess.cache.serializer.JsonRedisSerializer;

/**
 * Minimal usage demo: loads redis.xml, obtains a cache client by name and round-trips a string and a
 * JSON-serialized {@link Person}.
 */
public class CacheManagerTest {
	public static void main(String[] args) {

		CacheManager cacheManager = new CacheManager("file:D:\\redis.xml");

		// The cache name must match a <cache name="..."> entry in redis.xml. The sample configuration
		// defines "dataStore", "content" and "ecommerce"; it has no "user" cache.
		CacheClient client = cacheManager.getCacheClient("dataStore");

		client.set("test", "hello world", 10);
		String value = client.get("test");
		System.out.println(value);

		Person argan = new Person();
		argan.setName("argan");
		argan.setAge(10);
		argan.setSex(true);

		// One serializer instance serves both the write and the read.
		JsonRedisSerializer<Person> personSerializer = new JsonRedisSerializer<Person>(Person.class);
		client.set("argan", argan, personSerializer);

		Person p = client.get("argan", personSerializer);
		System.out.println(p);
	}

	/** Simple bean used to demonstrate JSON serialization. */
	static class Person {

		private String name;

		private int age;

		private boolean sex;

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public int getAge() {
			return age;
		}

		public void setAge(int age) {
			this.age = age;
		}

		public boolean isSex() {
			return sex;
		}

		public void setSex(boolean sex) {
			this.sex = sex;
		}

		/** Prints the field values so the demo output shows the round-tripped data. */
		@Override
		public String toString() {
			return "Person{name=" + name + ", age=" + age + ", sex=" + sex + "}";
		}

	}

}

可以看到使用起来还是非常简单透明的。

推荐阅读

  1. 某分布式应用实践一致性哈希的一些问题
  2. 分布式缓存的一起问题
  3. Redis Sentinel Documentation
  4. Redis几个认识误区
  5. 新兵训练营系列课程——分布式缓存介绍