需求
由于Redis的单线程模型,我们需要对Redis进行多实例部署,然后采用一致性哈希进行负载均衡。但是一致性哈希只是解决增加节点的影响范围,不能做到实时的主从自动切换。而我们知道Redis是支持主从同步,同时提供了sentinel支持主从自动切换的。能不能在一致性哈希的基础上对每个节点增加主从自动切换呢,并且这一切都对客户端透明呢?
设计
实现
Redis的Java客户端一般是Jedis,不过它不能同时支持Shareded和Sentinel,只能自己封装一个了。
package me.arganzheng.study.server.dataaccess.cache.jedis;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Hashing;
import redis.clients.util.Pool;
import me.arganzheng.study.server.dataaccess.cache.configuration.MasterConfiguration;
/**
* <pre>
* Jedis不能同时支持Shareded和Sentinel,只能自己封装一个了。
*
* 大部分代码copy至JedisSentinelPool和ShardedJedisPool,把单masterName改成多master,同时把Jedis改成ShardedJedis。
* </pre>
*
* @author zhengzhibin
*
*/
public class ShardedJedisSentinelPool extends Pool<ShardedJedis> {
protected GenericObjectPoolConfig poolConfig;
protected int timeout = Protocol.DEFAULT_TIMEOUT;
protected Set<MasterListener> masterListeners = new HashSet<MasterListener>();
private volatile Map<String, MasterInfo> currentHostMastersMap = new HashMap<String, MasterInfo>();
protected Logger log = Logger.getLogger(getClass().getName());
public ShardedJedisSentinelPool(List<MasterConfiguration> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig) {
this.poolConfig = poolConfig;
initSentinels(sentinels, masters);
initPool();
}
public void destroy() {
for (MasterListener m : masterListeners) {
m.shutdown();
}
super.destroy();
}
public List<MasterInfo> getCurrentHostMaster() {
List<MasterInfo> ml = new ArrayList<MasterInfo>();
for (MasterInfo m : currentHostMastersMap.values()) {
ml.add(m);
}
return ml;
}
public Map<String, MasterInfo> getCurrentHostMastersMap() {
return currentHostMastersMap;
}
private void initPool() {
log.info("Creating JedisPool for masters: " + currentHostMastersMap);
List<JedisShardInfo> shardMasters = makeShardInfoList(getCurrentHostMaster().toArray(new MasterInfo[0]));
initPool(poolConfig, new ShardedJedisFactory(shardMasters, Hashing.MURMUR_HASH, null));
log.info("Created JedisPool for masters: " + currentHostMastersMap);
}
/**
* 对变化的master重新初始化pool。
*
* @param masterAddr
*/
private void reinitPool(String masterName, HostAndPort masterAddr) {
MasterInfo oldMaster = currentHostMastersMap.get(masterName);
if (oldMaster == null) { // new added master, should not happen!
log.warning("New added master=" + masterName + ", should not happend!");
} else if (oldMaster.getAddress().equals(masterAddr)) { // 注意必须判断这个,因为每个sentinel都会通知一次,所以这个函数会被调用多次。
log.info(masterName + " already exist on addrest " + masterAddr + ", ignore reinitPool for it!");
} else { // swith master
log.info("Recreated JedisPool for master=" + masterName + " at " + masterAddr);
MasterInfo masterInfo = new MasterInfo();
masterInfo.setAddress(masterAddr);
masterInfo.setName(masterName);
masterInfo.setPassword(oldMaster.getPassword());
masterInfo.setTimeout(oldMaster.getTimeout());
masterInfo.setWeight(oldMaster.getWeight());
List<JedisShardInfo> shardMasters = makeShardInfoList(masterInfo);
initPool(poolConfig, new ShardedJedisFactory(shardMasters, Hashing.MURMUR_HASH, null));
currentHostMastersMap.put(masterName, masterInfo);
log.info("masters: " + currentHostMastersMap);
}
}
private List<JedisShardInfo> makeShardInfoList(MasterInfo...masterInfos) {
List<JedisShardInfo> shardMasters = new ArrayList<JedisShardInfo>();
for (MasterInfo master : masterInfos) {
HostAndPort masterAddr = master.getAddress();
JedisShardInfo shardInfo =
new JedisShardInfo(masterAddr.getHost(), masterAddr.getPort(), master.getTimeout(),
master.getWeight());
shardInfo.setPassword(master.getPassword());
shardMasters.add(shardInfo);
}
return shardMasters;
}
/**
* 初始化所有的shardedMasters
*
* @param sentinels
* @param masterNames
* @return
*/
private void initSentinels(Set<String> sentinels, final List<MasterConfiguration> masterConfigs) {
log.info("Trying to find all shared masters from available Sentinels: " + sentinels);
List<String> masterNames = new ArrayList<String>();
for (MasterConfiguration masterConfig : masterConfigs) {
String masterName = masterConfig.getName();
HostAndPort masterAddress = null;
boolean running = true;
outer: while (running) {
log.info("Trying to find master " + masterName + " from available Sentinels: " + sentinels);
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
log.fine("Connecting to Sentinel " + hap);
Jedis jedis = null;
try {
jedis = new Jedis(hap.getHost(), hap.getPort());
MasterInfo masterInfo = currentHostMastersMap.get(masterName);
if (masterInfo == null) { // not yes
List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: "
+ hap + ".");
continue;
}
masterAddress = toHostAndPort(masterAddr);
log.fine("Found Redis master at " + masterAddress);
masterInfo = new MasterInfo();
masterInfo.setName(masterName);
masterInfo.setPassword(masterConfig.getPassword());
masterInfo.setTimeout(masterConfig.getTimeout());
masterInfo.setWeight(masterConfig.getWeight());
masterInfo.setAddress(masterAddress);
currentHostMastersMap.put(masterName, masterInfo);
masterNames.add(masterName);
log.info("Redis master " + masterName + " running at " + masterAddress);
break outer;
} else {
log.info("Redis master " + masterName + " aready running at " + masterInfo.getAddress());
}
} catch (JedisConnectionException e) {
log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one.");
} finally {
if (jedis != null) {
jedis.close();
}
}
}
try {
log.severe("All sentinels down, cannot determine where is " + masterName
+ " master is running... sleeping 1000ms.");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
log.info("Starting Sentinel listeners...");
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
MasterListener masterListener = new MasterListener(masterNames, hap.getHost(), hap.getPort());
masterListeners.add(masterListener);
masterListener.start();
}
}
private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
String host = getMasterAddrByNameResult.get(0);
int port = Integer.parseInt(getMasterAddrByNameResult.get(1));
return new HostAndPort(host, port);
}
protected class JedisPubSubAdapter extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
}
@Override
public void onPMessage(String pattern, String channel, String message) {
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
}
protected class MasterListener extends Thread {
protected List<String> masterNames;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);
protected MasterListener() {
}
public MasterListener(List<String> masterNames, String host, int port) {
this.masterNames = masterNames;
this.host = host;
this.port = port;
}
public MasterListener(List<String> masterNames, String host, int port, long subscribeRetryWaitTimeMillis) {
this(masterNames, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}
public void run() {
running.set(true);
while (running.get()) {
j = new Jedis(host, port);
try {
j.subscribe(new JedisPubSubAdapter() {
@Override
public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3) {
String masterName = switchMasterMsg[0];
int index = masterNames.indexOf(masterName);
if (index >= 0) {
HostAndPort newHostMaster =
toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]));
reinitPool(masterName, newHostMaster);
} else {
log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0]
+ ", our master names are " + masterNames);
}
} else {
log.severe("Invalid message received on Sentinel " + host + ":" + port
+ " on channel +switch-master: " + message);
}
}
}, "+switch-master");
} catch (JedisConnectionException e) {
if (running.get()) {
log.severe("Lost connection to Sentinel at " + host + ":" + port
+ ". Sleeping 5000ms and retrying.");
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
} else {
log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
}
}
}
}
public void shutdown() {
try {
log.fine("Shutting down listener on " + host + ":" + port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
j.disconnect();
} catch (Exception e) {
log.severe("Caught exception while shutting down: " + e.getMessage());
}
}
}
/**
* PoolableObjectFactory custom impl.
*/
private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
private List<JedisShardInfo> shards;
private Hashing algo;
private Pattern keyTagPattern;
public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
this.shards = shards;
this.algo = algo;
this.keyTagPattern = keyTagPattern;
}
@Override
public PooledObject<ShardedJedis> makeObject() throws Exception {
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
return new DefaultPooledObject<ShardedJedis>(jedis);
}
@Override
public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws Exception {
final ShardedJedis shardedJedis = pooledShardedJedis.getObject();
for (Jedis jedis : shardedJedis.getAllShards()) {
try {
try {
jedis.quit();
} catch (Exception e) {
// do nothging
}
jedis.disconnect();
} catch (Exception e) {
// do nothging
}
}
}
@Override
public boolean validateObject(PooledObject<ShardedJedis> pooledShardedJedis) {
try {
ShardedJedis jedis = pooledShardedJedis.getObject();
for (Jedis shard : jedis.getAllShards()) {
if (!shard.ping().equals("PONG")) {
return false;
}
}
return true;
} catch (Exception ex) {
return false;
}
}
@Override
public void activateObject(PooledObject<ShardedJedis> p) throws Exception {
}
@Override
public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {
}
}
}
然后我们封装一个CacheManager,对配置的redis集群进行加载和初始化连接池:
package me.arganzheng.study.server.dataaccess.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.io.FileSystemResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.Assert;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import me.arganzheng.study.server.dataaccess.cache.configuration.CacheConfiguration;
import me.arganzheng.study.server.dataaccess.cache.configuration.MasterListConfiguration;
import me.arganzheng.study.server.dataaccess.cache.configuration.RedisConfiguration;
import me.arganzheng.study.server.dataaccess.cache.configuration.ShardConfiguration;
/**
* 对所有的Redis实例进行管理。注意每次加载配置文件都会创建所有链接池,所以这个类应该是单例使用的。最简单的做法就是通过Spring配置注入。
*
* @author zhengzhibin
*
*/
public class CacheManager {
protected final Log logger = LogFactory.getLog(getClass());
public static final String DEFAULT_CONFIG_LOCATION = "redis.xml";
// Redis配置文件信息
private String configLocation = DEFAULT_CONFIG_LOCATION;
private final ConcurrentMap<String, CacheClient> redisClientMap = new ConcurrentHashMap<String, CacheClient>(16);
private Set<String> redisClientNames = new LinkedHashSet<String>(16);
private boolean initialized = false;
public CacheManager(String configLocation) {
this.setConfigLocation(configLocation);
afterPropertiesSet();
}
public CacheManager() {
afterPropertiesSet();
}
public CacheClient getCacheClient(String name) {
return redisClientMap.get(name);
}
public Collection<String> getCacheClientNames() {
return Collections.unmodifiableSet(this.redisClientNames);
}
public synchronized void afterPropertiesSet() {
if (initialized) {
return;
}
Collection<CacheClient> redisClients = loadRedisClient();
// preserve the initial order of the cache names
this.redisClientMap.clear();
this.redisClientNames.clear();
for (CacheClient redisClient : redisClients) {
this.redisClientMap.put(redisClient.getName(), redisClient);
this.redisClientNames.add(redisClient.getName());
}
initialized = true;
logger.info("success initialized! Success loaded cache client name is " + this.getCacheClientNames());
}
/**
* <pre>
* 从配置文件中加载和构建RedisClient。
*
* </pre>
*
* @return
*/
protected Collection<CacheClient> loadRedisClient() {
logger.info("loading CacheClient..");
RedisConfiguration redisConfiguration = loadConfiguration();
Collection<CacheClient> redisClients = new ArrayList<CacheClient>();
for (CacheConfiguration cache : redisConfiguration.getCache()) {
// create redisClient from cache config
JedisPoolConfig poolConfig = cache.getPool();
CacheClient client = new CacheClient();
client.setName(cache.getName());
client.setJedisPoolConfig(poolConfig);
if (cache.getShards() != null // sharded
&& CollectionUtils.isNotEmpty(cache.getShards().getShard())) {
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
for (ShardConfiguration shardConfig : cache.getShards().getShard()) {
JedisShardInfo shardInfo =
new JedisShardInfo(shardConfig.getHost(), shardConfig.getPort(), shardConfig.getTimeout(),
shardConfig.getWeight());
shardInfo.setPassword(shardConfig.getPassword());
shards.add(shardInfo);
}
client.setShardInfos(shards);
} else if (CollectionUtils.isNotEmpty(cache.getSentinels())) { // shardedSentinel
MasterListConfiguration masters = cache.getMasters();
Assert.isTrue(masters != null && CollectionUtils.isNotEmpty(masters.getMaster()),
"masters can not be empty while sentinel!");
client.setMasters(masters.getMaster());
client.setSentinels(cache.getSentinels());
} else {
Assert.isTrue(StringUtils.isNotEmpty(cache.getHost()), "host can not be null for no sharded pool!");
client.setHost(cache.getHost());
if (cache.getPort() > 0) {
client.setPort(cache.getPort());
}
client.setPassword(cache.getPassword());
if (cache.getTimeout() > 0) {
client.setTimeout(cache.getTimeout());
}
}
client.afterPropertiesSet();
redisClients.add(client);
}
logger.info("success load CacheClient from " + configLocation);
return redisClients;
}
private RedisConfiguration loadConfiguration() {
try {
// load config with Spring's ResourceLoader
ResourceLoader resourceLoader = new FileSystemResourceLoader();
Resource config = resourceLoader.getResource(configLocation);
// parse xml config file with JAXB
JAXBContext jaxbContext = JAXBContext.newInstance(RedisConfiguration.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
RedisConfiguration redisConfiguration =
(RedisConfiguration) jaxbUnmarshaller.unmarshal(config.getInputStream());
return redisConfiguration;
} catch (Exception e) {
logger.error("load Configuration error! ConfigLocation=" + configLocation, e);
throw new RuntimeException(e);
}
}
protected final void addCacheClient(CacheClient redisClient) {
this.redisClientMap.put(redisClient.getName(), redisClient);
this.redisClientNames.add(redisClient.getName());
}
public String getConfigLocation() {
return configLocation;
}
public void setConfigLocation(String configLocation) {
this.configLocation = configLocation;
}
}
CacheManager加载的配置文件定义如下:
<redis>
<cache name="datastore">
<host>hk01-xxx-mob24.hk01</host>
<port>6666</port>
<password>xxx</password>
<timeout>5000</timeout>
<pool>
<maxTotal>500</maxTotal>
<maxIdle>50</maxIdle>
<maxWaitMillis>1000</maxWaitMillis>
<testOnBorrow>true</testOnBorrow>
<testOnReturn>false</testOnReturn>
<testWhileIdle>true</testWhileIdle>
</pool>
</cache>
<cache name="content">
<host>hk01-xxx-mob24.hk01</host>
<port>6666</port>
<password>xxx</password>
<timeout>5000</timeout>
<pool>
<maxTotal>500</maxTotal>
<maxIdle>50</maxIdle>
<maxWaitMillis>1000</maxWaitMillis>
<testOnBorrow>true</testOnBorrow>
<testOnReturn>false</testOnReturn>
<testWhileIdle>true</testWhileIdle>
</pool>
</cache>
<cache name="user">
<shards>
<shard>
<host>hk01-xxx-mob24.hk01</host>
<port>6666</port>
<password>xxx</password>
<timeout>5000</timeout>
</shard>
<shard>
<host>hk01-xxx-mob34.hk01</host>
<port>6666</port>
<password>xxx</password>
<timeout>5000</timeout>
</shard>
</shards>
<pool>
<maxTotal>500</maxTotal>
<maxIdle>50</maxIdle>
<maxWaitMillis>1000</maxWaitMillis>
<testOnBorrow>true</testOnBorrow>
<testOnReturn>false</testOnReturn>
<testWhileIdle>true</testWhileIdle>
</pool>
</cache>
</redis>
可以看到一个CacheManager可以加载多个Cache,每个Cache可以有多个分片(shards)。注意到主从配置和切换并没有在这里体现,因为这个是服务端的实现(使用sentinel),对客户端是透明的。
然后对没给Cache的操作进行了封装:
package me.arganzheng.study.server.dataaccess.cache;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.Pool;
import me.arganzheng.study.server.dataaccess.cache.configuration.MasterConfiguration;
import me.arganzheng.study.server.dataaccess.cache.jedis.ShardedJedisSentinelPool;
import me.arganzheng.study.server.dataaccess.cache.serializer.RedisSerializer;
import me.arganzheng.study.server.dataaccess.cache.serializer.StringRedisSerializer;
/**
* <pre>
* 对Jedis进行 如下增强:
*
* 1. 链接池的管理(支持分片,对客户端透明)
* 2. 序列化和反序列化
* 3. 接口适配(比如增加超时时间)
* 4. 监听sentinel,做主从自动切换
*
* 注意:
*
* 1. 这里不支持事务。如果要支持事务,需要保证MUTL和EXCE之间的命令都是通过同一个链接发起。对于链接池方式来说比较麻烦。
* 我们的业务对一致性要求不高,如果确实需要事务,那么考虑使用 脚本 方式。
* 2. Redis3.0会支持集群,可能就不需要客户端一致性hash了。
* </pre>
*
* @author zhengzhibin
*
*/
public class CacheClient {
private static final Log log = LogFactory.getLog(CacheClient.class);
public static final int DEFAULT_PORT = 6379;
public static final int DEFAULT_TIMEOUT = 2000;
// 名称:对应一个Redis实例。目前主要有default, 内容,推荐,推送。
private String name = "default";
private String host;
private int port = DEFAULT_PORT;
private int timeout = DEFAULT_TIMEOUT;
private String password;
private GenericObjectPoolConfig jedisPoolConfig;
private boolean sharded = false;
private boolean sentinel = false;
// 如果不为空,则表示是sharded
private List<JedisShardInfo> shardInfos;
// 如果不为空,则表示是sentinel
private Set<String> sentinels;
// 如果masters个数为1,表示 sentinel && !shareded
private List<MasterConfiguration> masters;
// 1==> !sharded && !sentinel
private JedisPool jedisPool;
// 2==> sharded && !sentinel
private ShardedJedisPool shardedJedisPool;
// 3==> sentinel && !sharded
private JedisSentinelPool jedisSentinelPool;
// 4=> sentinel && sharded
private ShardedJedisSentinelPool shardedJedisSentinelPool;
private boolean initialized = false;
private boolean enableDefaultSerializer = true;
private RedisSerializer<String> stringSerializer = new StringRedisSerializer();
private RedisSerializer<?> defaultSerializer = new StringRedisSerializer();
private RedisSerializer<?> valueSerializer = null;
public CacheClient() {
}
public CacheClient(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<JedisShardInfo> getShardInfos() {
return shardInfos;
}
public void setShardInfos(List<JedisShardInfo> shardInfos) {
this.shardInfos = shardInfos;
sharded = true;
}
public ShardedJedisPool getShardedJedisPool() {
return shardedJedisPool;
}
public void setShardedJedisPool(ShardedJedisPool shardedJedisPool) {
this.shardedJedisPool = shardedJedisPool;
}
public List<MasterConfiguration> getMasters() {
return masters;
}
public void setMasters(List<MasterConfiguration> masters) {
this.masters = masters;
if (masters.size() > 1) {
sharded = true;
}
}
public Set<String> getSentinels() {
return sentinels;
}
public void setSentinels(Set<String> sentinels) {
this.sentinels = sentinels;
sentinel = true;
}
public ShardedJedisSentinelPool getShardedJedisSentinelPool() {
return shardedJedisSentinelPool;
}
public void setShardedJedisSentinelPool(ShardedJedisSentinelPool shardedJedisSentinelPool) {
this.shardedJedisSentinelPool = shardedJedisSentinelPool;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public GenericObjectPoolConfig getJedisPoolConfig() {
return jedisPoolConfig;
}
public void setJedisPoolConfig(GenericObjectPoolConfig jedisPoolConfig) {
this.jedisPoolConfig = jedisPoolConfig;
}
public boolean isEnableDefaultSerializer() {
return enableDefaultSerializer;
}
public void setEnableDefaultSerializer(boolean enableDefaultSerializer) {
this.enableDefaultSerializer = enableDefaultSerializer;
}
public RedisSerializer<?> getValueSerializer() {
return valueSerializer;
}
public void setValueSerializer(RedisSerializer<?> valueSerializer) {
this.valueSerializer = valueSerializer;
}
public void init() {
afterPropertiesSet();
}
public void afterPropertiesSet() {
boolean defaultUsed = false;
if (enableDefaultSerializer) {
if (valueSerializer == null) {
valueSerializer = defaultSerializer;
defaultUsed = true;
}
}
if (enableDefaultSerializer && defaultUsed) {
Assert.notNull(defaultSerializer, "default serializer null and not all serializers initialized");
}
if (sentinel) {
Assert.isTrue(CollectionUtils.isNotEmpty(sentinels), "sentinels can not be empty while sentinel!");
Assert.isTrue(CollectionUtils.isNotEmpty(masters), "masters can not be empty while sentinel!");
if (sharded) {
shardedJedisSentinelPool = new ShardedJedisSentinelPool(masters, sentinels, jedisPoolConfig);
} else {
MasterConfiguration master = masters.get(0);
jedisSentinelPool =
new JedisSentinelPool(master.getName(), sentinels, jedisPoolConfig, master.getTimeout(),
master.getPassword());
}
} else if (sharded) {
Assert.isTrue(CollectionUtils.isNotEmpty(shardInfos), "ShardInfos can not be empty while sharding!");
shardedJedisPool = new ShardedJedisPool(jedisPoolConfig, shardInfos);
} else {
jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password);
}
initialized = true;
}
protected <T> T execute(final RedisAction<T> action, final String key) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
// sharding要求必须有key。
if (StringUtils.isEmpty(key) && sharded && (shardInfos.size() > 1 | masters.size() > 1)) {
throw new NotSupportedException(
"This operation is not supported in sharded cluster! Key is required for sharded with sharded size > 1");
}
Jedis jedis = null;
ShardedJedis shardedJedis = null;
if (sentinel) {
if (sharded) { // sentinel & sharded
shardedJedis = shardedJedisSentinelPool.getResource();
jedis = shardedJedis.getShard(key);
} else {
jedis = jedisSentinelPool.getResource();
}
} else if (sharded) {
shardedJedis = shardedJedisPool.getResource();
if (StringUtils.isEmpty(key)) {
Collection<Jedis> c = shardedJedis.getAllShards();
for (Jedis e : c) {
jedis = e;
break;
}
} else {
jedis = shardedJedis.getShard(key);
}
} else {
jedis = jedisPool.getResource();
}
if (jedis == null) {
throw new RuntimeException("can't get Jedis from pool");
}
T result = null;
boolean isBroken = false;
try {
result = action.doInRedis(jedis);
} catch (JedisConnectionException e) {
log.error("ConnectionException occur while doInRedis!", e);
isBroken = true;
} catch (JedisDataException de) {
log.error("JedisDataException occur while doInRedis!", de);
throw de;
} finally {
if (sentinel) {
if (sharded) {
returnResource(shardedJedisSentinelPool, shardedJedis, isBroken);
} else {
returnResource(jedisSentinelPool, jedis, isBroken);
}
} else if (sharded) {
returnResource(shardedJedisPool, shardedJedis, isBroken);
} else {
returnResource(jedisPool, jedis, isBroken);
}
}
return result;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void returnResource(Pool pool, Object jedis, boolean isBroken) {
if (isBroken) {
pool.returnBrokenResource(jedis);
} else {
try {
pool.returnResource(jedis);
} catch (Exception ex) {
pool.returnBrokenResource(jedis);
}
}
}
public <T> void set(final String key, final String value) {
set(key, value, 0);
}
public void set(final String key, final String value, final int expiration) {
set(key, value, expiration, stringSerializer);
}
public <T> void set(final String key, final T value, RedisSerializer<T> serializer) {
set(key, value, 0, serializer);
}
public <T> void set(final String key, final T value, final int expiration, RedisSerializer<T> serializer) {
final byte[] keyBytes = serializerKey(key);
final byte[] valueBytes = serializer.serialize(value);
execute(new RedisAction<Object>() {
public Object doInRedis(Jedis jedis) {
if (expiration > 0) {
jedis.setex(keyBytes, expiration, valueBytes);
} else {
jedis.set(keyBytes, valueBytes);
}
return null;
}
}, key);
}
/**
* 返回true如果是自己设置的key,否则返回false
*/
public boolean setIfAbsent(final String key, final String value) {
return setIfAbsent(key, value, 0);
}
/**
* 返回true如果是自己设置的key,否则返回false
*/
public boolean setIfAbsent(final String key, final String value, final int time) {
String ret = execute(new RedisAction<String>() {
public String doInRedis(Jedis jedis) {
if (time > 0) {
return jedis.set(key, value, "NX", "EX", time);
} else {
return jedis.set(key, value, "NX");
}
}
}, key);
if ("OK".equals(ret)) {
return true;
}
return false;
}
public void mset(final String...keysvalues) {
execute(new RedisAction<String>() {
public String doInRedis(Jedis jedis) {
return jedis.mset(keysvalues);
}
}, null);
}
public List<String> mget(final String...keys) {
return execute(new RedisAction<List<String>>() {
public List<String> doInRedis(Jedis jedis) {
return jedis.mget(keys);
}
}, null);
}
public void del(final String key) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.del(key);
}
}, key);
}
public void del(final String...keys) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.del(keys);
}
}, null);
}
public boolean expire(final String key, final int timeout) {
Long ret = execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.expire(key, timeout);
}
}, key);
if (ret > 0) {
return false;
}
return true;
}
public String get(final String key) {
byte[] bs = getAsRaw(key);
if (bs == null) {
return null;
} else {
return stringSerializer.deserialize(bs);
}
}
public <T> T get(String key, RedisSerializer<T> serializer) {
byte[] bs = getAsRaw(key);
return serializer.deserialize(bs);
}
public Integer getInt(final String key) {
String s = get(key);
if (s == null) {
return null;
} else {
try {
return Integer.parseInt(s);
} catch (NumberFormatException e) {
return null;
}
}
}
public String getAndSet(final String key, final String newValue) {
return execute(new RedisAction<String>() {
public String doInRedis(Jedis jedis) {
return jedis.getSet(key, newValue);
}
}, key);
}
public void sadd(final String key, final String...members) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.sadd(key, members);
}
}, key);
}
public void srem(final String key, final String...members) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.srem(key, members);
}
}, key);
}
public Long incr(final String key) {
return execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.incr(key);
}
}, key);
}
public void hincrBy(final String key, final String field, final long increment) {
execute(new RedisAction<Long>() {
@Override
public Long doInRedis(Jedis jedis) {
return jedis.hincrBy(key, field, increment);
}
}, key);
}
public void hset(final String key, final String field, final String value) {
execute(new RedisAction<Long>() {
@Override
public Long doInRedis(Jedis jedis) {
return jedis.hset(key, field, value);
}
}, key);
}
public void hmset(final String key, final Map<String, String> hash) {
if (hash == null || hash.isEmpty()) {
return;
}
execute(new RedisAction<String>() {
@Override
public String doInRedis(Jedis jedis) {
return jedis.hmset(key, hash);
}
}, key);
}
public void decr(final String key) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.decr(key);
}
}, key);
}
public Set<String> keys(final String pattern) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.keys(pattern);
}
}, null);
}
public Set<String> sinter(final String pattern) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.sinter(pattern);
}
}, null);
}
public Long rpush(final String key, final String...strings) {
return execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.rpush(key, strings);
}
}, key);
}
public void ltrim(final String key, final long start, final long end) {
execute(new RedisAction<String>() {
public String doInRedis(Jedis jedis) {
return jedis.ltrim(key, start, end);
}
}, key);
}
public String hget(final String key, final String field) {
return execute(new RedisAction<String>() {
public String doInRedis(Jedis jedis) {
return jedis.hget(key, field);
}
}, key);
}
public Map<String, String> hgetAll(final String key) {
return execute(new RedisAction<Map<String, String>>() {
public Map<String, String> doInRedis(Jedis jedis) {
return jedis.hgetAll(key);
}
}, key);
}
public void zadd(final String key, final Map<String, Double> scoreMembers) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zadd(key, scoreMembers);
}
}, key);
}
public void zadd(final String key, final double score, final String member) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zadd(key, score, member);
}
}, key);
}
public Double zscore(final String key, final String member) {
return execute(new RedisAction<Double>() {
public Double doInRedis(Jedis jedis) {
return jedis.zscore(key, member);
}
}, key);
}
public Long zcount(final String key, final String min, final String max) {
return execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zcount(key, min, max);
}
}, key);
}
public void zrem(final String key, final String...members) {
execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zrem(key, members);
}
}, key);
}
public Long zremrangeByRank(final String key, final long start, final long end) {
return execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zremrangeByRank(key, start, end);
}
}, key);
}
public Long zremrangeByScore(final String key, final double start, final double end) {
return execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zremrangeByScore(key, start, end);
}
}, key);
}
public Set<String> zrevrangeByScore(final String key, final String max, final String min, final int offset,
final int count) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.zrevrangeByScore(key, max, min, offset, count);
}
}, key);
}
public Set<String> zrevrangeByScore(final String key, final double max, final double min, final int offset,
final int count) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.zrevrangeByScore(key, max, min, offset, count);
}
}, key);
}
public Set<String> zrangeByScore(final String key, final String min, final String max, final int offset,
final int count) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.zrangeByScore(key, min, max, offset, count);
}
}, key);
}
public Set<String> zrangeByScore(final String key, final double min, final double max, final int offset,
final int count) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.zrangeByScore(key, min, max, offset, count);
}
}, key);
}
public Set<String> zrange(final String key, final long start, final long end) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.zrange(key, start, end);
}
}, key);
}
public Set<String> zrevrange(final String key, final long start, final long end) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.zrevrange(key, start, end);
}
}, key);
}
public Long zrank(final String key, final String member) {
return execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zrank(key, member);
}
}, key);
}
public Long zrevrank(final String key, final String member) {
return execute(new RedisAction<Long>() {
public Long doInRedis(Jedis jedis) {
return jedis.zrevrank(key, member);
}
}, key);
}
public Set<String> smembers(final String key) {
return execute(new RedisAction<Set<String>>() {
public Set<String> doInRedis(Jedis jedis) {
return jedis.smembers(key);
}
}, key);
}
public boolean sismember(final String setKey, final String member) {
return execute(new RedisAction<Boolean>() {
@Override
public Boolean doInRedis(Jedis jedis) {
return jedis.sismember(setKey, member);
}
}, setKey);
}
public boolean exists(final String key) {
return execute(new RedisAction<Boolean>() {
public Boolean doInRedis(Jedis jedis) {
return jedis.exists(key);
}
}, key);
}
public Long publish(final String channel, final String message) {
return execute(new RedisAction<Long>() {
@Override
public Long doInRedis(Jedis jedis) {
return jedis.publish(channel, message);
}
}, channel);
}
public Long lpush(final String key, final String...strings) {
return execute(new RedisAction<Long>() {
@Override
public Long doInRedis(Jedis jedis) {
return jedis.lpush(key, strings);
}
}, key);
}
public Double zincrby(final String key, final double score, final String member) {
return execute(new RedisAction<Double>() {
@Override
public Double doInRedis(Jedis jedis) {
return jedis.zincrby(key, score, member);
}
}, key);
}
public List<String> srandmember(final String key, final int count) {
return execute(new RedisAction<List<String>>() {
@Override
public List<String> doInRedis(Jedis jedis) {
return jedis.srandmember(key, count);
}
}, key);
}
public String rpop(final String key) {
return execute(new RedisAction<String>() {
@Override
public String doInRedis(Jedis jedis) {
return jedis.rpop(key);
}
}, key);
}
public void subscribe(final JedisPubSub listener, final String...channels) {
execute(new RedisAction<Object>() {
public Object doInRedis(Jedis jedis) {
jedis.subscribe(listener, channels);
return null;
}
}, null);
}
public List<String> lrange(final String key, final long start, final long end) {
return execute(new RedisAction<List<String>>() {
@Override
public List<String> doInRedis(Jedis jedis) {
return jedis.lrange(key, start, end);
}
}, key);
}
/****************** private method *****************/
public byte[] getAsRaw(final String key) {
return (byte[]) execute(new RedisAction<byte[]>() {
public byte[] doInRedis(Jedis jedis) {
return jedis.get(serializerKey(key));
}
}, key);
}
private byte[] serializerKey(String key) {
return stringSerializer.serialize(key);
}
public boolean isSentinel() {
return sentinel;
}
public void setSentinel(boolean sentinel) {
this.sentinel = sentinel;
}
public JedisSentinelPool getJedisSentinelPool() {
return jedisSentinelPool;
}
public void setJedisSentinelPool(JedisSentinelPool jedisSentinelPool) {
this.jedisSentinelPool = jedisSentinelPool;
}
public boolean isSharded() {
return sharded;
}
public void setSharded(boolean sharded) {
this.sharded = sharded;
}
}
其中RedisAction是个接口:
package me.arganzheng.study.server.dataaccess.cache;
import redis.clients.jedis.Jedis;
public interface RedisAction<T> {
T doInRedis(Jedis jedis);
}
基本上核心的类就这几个。其他的都是一些辅组类,比如配置文件、序列化等。
当然,再次强调一下,sentinel的配置需要自己在Redis集群进行部署,然后在redis.xml中配置sentinel的地址和master-group-name就可以了,sentinel提供了 sentinelGetMasterAddrByName(masterName) 获取当前的master实例地址的方法,拿到之后就可以连接实际的master了。我们封装的Sentinel连接池会启动一个后台线程,订阅sentinel发布的消息,进行自动切换。 关于sentinel的配置,可以参考官方文档。
客户端使用
首先要配置一下redis.xml:
<redis>
<cache name="dataStore">
<host>hkg02-global-mob02.hkg02</host>
<port>6699</port>
<password>xxxx</password>
<timeout>5000</timeout>
<pool>
<maxTotal>500</maxTotal>
<maxIdle>50</maxIdle>
<maxWaitMillis>1000</maxWaitMillis>
<testOnBorrow>true</testOnBorrow>
<testOnReturn>false</testOnReturn>
<testWhileIdle>true</testWhileIdle>
</pool>
</cache>
<cache name="content">
<pool>
<maxTotal>500</maxTotal>
<maxIdle>50</maxIdle>
<maxWaitMillis>1000</maxWaitMillis>
<testOnBorrow>true</testOnBorrow>
<testOnReturn>false</testOnReturn>
<testWhileIdle>true</testWhileIdle>
</pool>
<shards>
<shard>
<host>hkg02-global-mob02.hkg02</host>
<port>6666</port>
<password>xxxx</password>
<timeout>5000</timeout>
</shard>
<shard>
<host>hkg02-global-mob02.hkg02</host>
<port>6667</port>
<password>xxxx</password>
<timeout>5000</timeout>
</shard>
<shard>
<host>hkg02-global-mob02.hkg02</host>
<port>6668</port>
<password>xxxx</password>
<timeout>5000</timeout>
</shard>
<shard>
<host>hkg02-global-mob02.hkg02</host>
<port>6669</port>
<password>xxxx</password>
<timeout>5000</timeout>
</shard>
</shards>
</cache>
<cache name="ecommerce">
<pool>
<maxTotal>500</maxTotal>
<maxIdle>50</maxIdle>
<maxWaitMillis>1000</maxWaitMillis>
<testOnBorrow>true</testOnBorrow>
<testOnReturn>false</testOnReturn>
<testWhileIdle>true</testWhileIdle>
</pool>
<masters>
<master>
<name>ecommerce</name>
<password>xxxx</password>
<timeout>5000</timeout>
</master>
</masters>
<sentinels>
<sentinel>hk01-hao123-mob25.hk01:26379</sentinel>
<sentinel>hk01-hao123-mob34.hk01:26379</sentinel>
<sentinel>hk01-hao123-mob37.hk01:26379</sentinel>
</sentinels>
</cache>
</redis>
配置好redis.xml之后,客户端的使用就非常简单了:
package me.arganzheng.study.server.dataaccess.cache;
import java.util.Collection;
import me.arganzheng.study.server.dataaccess.cache.serializer.JsonRedisSerializer;
public class CacheManagerTest {
public static void main(String[] args) {
CacheManager cacheManager = new CacheManager("file:D:\\redis.xml");
CacheClient client = cacheManager.getCacheClient("user");
client.set("test", "hello world", 10);
String value = client.get("test");
System.out.println(value);
Person argan = new Person();
argan.setName("argan");
argan.setAge(10);
argan.setSex(true);
client.set("argan", argan, new JsonRedisSerializer<Person>(Person.class));
Person p = client.get("argan", new JsonRedisSerializer<Person>(Person.class));
System.out.println(p);
}
static class Person {
private String name;
private int age;
private boolean sex;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public boolean isSex() {
return sex;
}
public void setSex(boolean sex) {
this.sex = sex;
}
}
}
可以看到使用起来还是非常简单透明的。