文章目录 一:NoSQL介绍二、Redis介绍三、Redis五大数据类型1、String(字符串类型)2、List(列表)3、Set(集合)4、Hash(哈希,类似 Java里的Map)5、Zset(sorted set:有序集合) 四、Redis三种特殊数据类型1、Geospatial(地理位置)2、Hyperloglog(基数统计)3、BitMaps(位图) 五、Redis事务1、Redis事务介绍2、Redis事务相关操作3、Redis事务错误4、Watch监控 六、Jedis(Java中使用Redis)七、SpringBoot整合Redis1、lettuce和jedis2、基础使用3、关于对象的保存(重写Redis配置类)4、自定义Redis工具类 八、redis.conf 配置详解九、Redis持久化1、RDB(Redis DataBase)快照2、AOF(Append Only File)RDB和AOF的选择3、虚拟内存4、diskstore 十、Redis发布与订阅十一、Redis主从复制1、主从模式2、哨兵模式 十二、Redis缓存穿透和雪崩1、缓存穿透(查不到)2、缓存击穿(量太大,缓存过期)3、缓存雪崩 一:NoSQL介绍
1、概念
NoSQL最常见的解释是“non-relational”, “Not Only SQL”也被很多人接受。NoSQL仅仅是一个概念,泛指非关系型的数据库,区别于关系数据库,它们不保证关系数据的ACID特性。NoSQL是一项全新的数据库革命性运动,其拥护者们提倡运用非关系型的数据存储,相对于铺天盖地的关系型数据库运用,这一概念无疑是一种全新的思维的注入。
2、特点
易扩展 NoSQL数据库种类繁多,但是一个共同的特点都是去掉关系数据库的关系型特性。数据之间无关系,这样就非常容易扩展。无形之间,在架构的层面上带来了可扩展的能力。大数据量,高性能 NoSQL数据库都具有非常高的读写性能,尤其在大数据量下,同样表现优秀。这得益于它的无关系性,数据库的结构简单。一般MySQL使用Query Cache。NoSQL的Cache是记录级的,是一种细粒度的Cache,所以NoSQL在这个层面上来说性能就要高很多。灵活的数据模型 NoSQL无须事先为要存储的数据建立字段,随时可以存储自定义的数据格式。而在关系数据库里,增删字段是一件非常麻烦的事情。如果是非常大数据量的表,增加字段简直就是——个噩梦。这点在大数据量的Web 2.0时代尤其明显高可用 NoSQL在不太影响性能的情况,就可以方便地实现高可用的架构。比如Cassandra、HBase模型,通过复制模型也能实现高可用。3、分类
二、Redis介绍1、概念
Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
2、Redis能做什么?
内存存储和持久化:redis支持异步将内存中的数据写到硬盘上,同时不影响继续服务取最新N个数据的操作,如:可以将最新的10条评论的ID放在Redis的List集合里面发布、订阅消息系统地图信息分析定时器、计数器…3、常用网站
https://redis.io/ 官网 http://·中文网
4、安装
我是在centos系统下用docker安装的,特别方便(安装教程省略,这里讲启动教程)
查看docker镜像 docker images 查看运行进程 docker ps-a 进入redis容器 docker exec -it redis bash 启动redis客户端 redis-cli5、测试redis性能
我是在阿里云上安装的redis
Redis-benchmark是官方自带的Redis性能测试工具,可以有效的测试Redis服务的性能 redis 性能测试工具可选参数如下所示: 测试命令:redis-benchmark -h localhost -p 6379 -c 100 -n 100000 是指 100个并发连接,100000个请求,检测host为localhost 端口为6379的redis服务器性能
结果
====== SET ====== 100000 requests completed in 3.68 seconds # 对集合写入测试 100 parallel clients # 每次请求有100个并发客户端 3 bytes payload # 每次写入3个字节的数据,有效载荷 keep alive: 1 # 保持一个连接,一台服务器来处理这些请求 host configuration "save": 3600 1 300 100 60 10000 host configuration "appendonly": no multi-thread: no Latency by percentile distribution: 0.000% <= 0.815 milliseconds (cumulative count 1) 50.000% <= 2.231 milliseconds (cumulative count 50076) 75.000% <= 2.919 milliseconds (cumulative count 75252) 87.500% <= 3.295 milliseconds (cumulative count 87597) 93.750% <= 3.535 milliseconds (cumulative count 93859) 96.875% <= 4.039 milliseconds (cumulative count 96903) 98.438% <= 4.527 milliseconds (cumulative count 98441) 99.219% <= 4.727 milliseconds (cumulative count 99232) 99.609% <= 4.831 milliseconds (cumulative count 99618) 99.805% <= 4.927 milliseconds (cumulative count 99810) 99.902% <= 5.255 milliseconds (cumulative count 99903) 99.951% <= 7.367 milliseconds (cumulative count 99952) 99.976% <= 7.975 milliseconds (cumulative count 99976) 99.988% <= 8.303 milliseconds (cumulative count 99988) # 所有请求在 8.303 毫秒内完成 27248.14 requests per second # 每秒处理 53248.14 次请求Redis 基本常识
默认16个数据库,类似数组下标从零开始,初始默认使用零号库
Select命令切换数据库
dbsize查看当前数据库的key的数量
keys * 查看数据库所有的key
set key value 设置键值对
get key 通过key获得value
type key 查看key是什么类型
EXPIRE key second 为key设置生存时间,当key过期时(生存时间为 0 ),它会被自动删 除。
ttl name 查看key还有多少秒过期,-1 表示永不过期,-2 表示已过期
Flushdb 清空当前库
Flushall 清空全部的库
Redis是单线程的!
我们首先要明白,Redis很快!官方表示,因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了。
Redis为什么单线程还这么快? 误区1:高性能的服务器一定是多线程的? 误区2:多线程(CPU上下文会切换)一定比单线程效率高。 核心:Redis是将所有的数据全部放在内存中的,所以说使用单线程操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!!!),对于内存系统来说,如果没有上下文切换效率就是最高的。多次读写都是在一个CPU上,在内存情况下,这个就是最佳的方案。
三、Redis五大数据类型 1、String(字符串类型) String是redis最基本的类型,一个key对应一个value。String类型是二进制安全的,意思是redis的string可以包含任何数据,比如jpg图片或者序列化的对象。String类型是redis最基本的数据类型,一个redis中字符串value最多可以是512M。String常用命令说明
String类似的使用场景:value除了是字符串还可以是数字,用途举例:
计数器统计多单位的数量:uid:123666:follow 0粉丝数对象存储缓存 2、List(列表)Redis列表是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)。 它的底层实际是个链表(双向列表)。可以进行双端操作。
list实际上是一个链表,before Node after , left, right 都可以插入值
如果key不存在,则创建新的链表
如果key存在,新增内容
如果移除了所有值,空链表,也代表不存在
在两边插入或者改动值,效率最高!修改中间元素,效率相对较低
应用场景: 消息排队!消息队列(Lpush Rpop),栈(Lpush Lpop)
3、Set(集合)Redis的Set是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。 Redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。 集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。
在微博应用中,可以将一个用户所有的关注人存在一个集合中,将其所有粉丝存在一个集合。Redis还为集合提供了求交集、并集、差集等操作,可以非常方便的实现如共同关注、共同喜好、二度好友等功能,对上面的所有集合操作,你还可以使用不同的命令选择将结果返回给客户端还是存集到一个新的集合中。
4、Hash(哈希,类似 Java里的Map)Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。 Set就是一种简化的Hash,只变动key,而value使用默认值填充。可以将一个Hash表作为一个对象进行存储,表中存放对象的信息。
Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。 存储部分变更的数据,如用户信息等。
5、Zset(sorted set:有序集合)不同的是每个元素都会关联一个double类型的分数(score)。redis正是通过分数来为集合中的成员进行从小到大的排序。 score相同:按字典顺序排序 有序集合的成员是唯一的,但分数(score)却可以重复。
应用案例:
set排序 存储班级成绩表 工资表排序!普通消息,1.重要消息 2.带权重进行判断排行榜应用实现,取Top N测试 四、Redis三种特殊数据类型 1、Geospatial(地理位置)使用经纬度定位地理坐标并用一个有序集合zset保存,所以zset命令也可以使用
六个常用命令
有效经纬度
有效的经度从-180度到180度。有效的纬度从-85.05112878度到85.05112878度。指定单位的参数 unit 必须是以下单位的其中一个:
m 表示单位为米。
km 表示单位为千米。
mi 表示单位为英里。
ft 表示单位为英尺。
关于GEORADIUS的参数
通过georadius就可以完成 附近的人功能withcoord:带上坐标withdist:带上距离,单位与半径单位相同COUNT n : 只显示前n个(按距离递增排序) ----------------georadius--------------------- 127.0.0.1:6379> GEORADIUS china:city 120 30 500 km withcoord withdist # 查询经纬度(120,30)坐标500km半径内的成员 1) 1) "hangzhou" 2) "29.4151" 3) 1) "120.20000249147415" 2) "30.199999888333501" 2) 1) "shanghai" 2) "205.3611" 3) 1) "121.40000134706497" 2) "31.400000253193539" ------------geohash--------------------------- 127.0.0.1:6379> geohash china:city yichang shanghai # 获取成员经纬坐标的geohash表示 1) "wmrjwbr5250" 2) "wtw6ds0y300"GEO没有提供删除成员的命令,但是因为GEO的底层实现是zset,所以可以借用zrem命令实现对地理位置信息的删除.
2、Hyperloglog(基数统计)Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。 花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。 因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。 其底层使用string数据类型
什么是基数?
数据集中不重复的元素的个数。
应用场景:
假如我要统计网页的UV(浏览用户数量,一天内同一个用户多次访问只能算一次),传统的解决方案是使用Set来保存用户id,然后统计Set中的元素数量来获取页面UV。但这种方案只能承载少量用户,一旦用户数量大起来就需要消耗大量的空间来存储用户id。我的目的是统计用户数量而不是保存用户,这简直是个吃力不讨好的方案!而使用Redis的HyperLogLog最多需要12k就可以统计大量的用户数,尽管它大概有0.81%的错误率,但对于统计UV这种不需要很精确的数据是可以忽略不计的。
如果允许容错,那么一定可以使用Hyperloglog !
如果不允许容错,就使用set或者自己的数据类型即可 !
3、BitMaps(位图)什么是位图?
使用位存储,信息状态只有 0 和 1 Bitmap是一串连续的2进制数字(0或1),每一位所在的位置为偏移(offset),在bitmap上可执行AND,OR,XOR,NOT以及其它位操作。
应用场景
签到统计、状态统计
1、概念
Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。 总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。
Redis事务没有隔离级别的概念: 批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行! Redis不保证原子性: Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。
2、Redis事务执行过程
开始事务(multi)命令入队执行事务(exec) 2、Redis事务相关操作1、相关指令操作
watch key1 key2 ... #监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则 事务被打断 ( 类似乐观锁 ) multi # 标记一个事务块的开始( queued ) exec # 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 ) discard # 取消事务,放弃事务块中的所有命令 unwatch # 取消watch对所有key的监控结果: 正常执行
127.0.0.1:6379> multi # 开启事务 OK 127.0.0.1:6379> set k1 v1 # 命令入队 QUEUED 127.0.0.1:6379> set k2 v2 # .. QUEUED 127.0.0.1:6379> get k1 QUEUED 127.0.0.1:6379> set k3 v3 QUEUED 127.0.0.1:6379> keys * QUEUED 127.0.0.1:6379> exec # 事务执行 1) OK 2) OK 3) "v1" 4) OK 5) 1) "k3" 2) "k2" 3) "k1"取消事务(discard)
127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> DISCARD # 放弃事务 OK 127.0.0.1:6379> EXEC (error) ERR EXEC without MULTI # 当前未开启事务 127.0.0.1:6379> get k1 # 被放弃事务中命令并未执行 (nil) 3、Redis事务错误1、代码语法错误(编译时异常)所有的命令都不执行
127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> error k1 # 这是一条语法错误命令 (error) ERR unknown command `error`, with args beginning with: `k1`, # 会报错但是不影响后续命令入队 127.0.0.1:6379> get k2 QUEUED 127.0.0.1:6379> EXEC (error) EXECABORT Transaction discarded because of previous errors. # 执行报错 127.0.0.1:6379> get k1 (nil) # 其他命令并没有被执行2、代码逻辑错误 (运行时异常) 其他命令可以正常执行 >>> 所以不保证事务原子性
127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> INCR k1 # 这条命令逻辑错误(对字符串进行增量) QUEUED 127.0.0.1:6379> get k2 QUEUED 127.0.0.1:6379> exec 1) OK 2) OK 3) (error) ERR value is not an integer or out of range # 运行时报错 4) "v2" # 其他命令正常执行虽然中间有一条命令报错了,但是后面的指令依旧正常执行成功了。 所以说Redis单条指令保证原子性,但是Redis事务不能保证原子性。
4、Watch监控1、悲观锁
悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿到这个数据就会block直到它拿到锁。传统的关系型数据库里面就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在操作之前先上锁。
2、乐观锁
乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁。但是在更新的时候会判断一下再此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量,乐观锁策略:提交版本必须大于记录当前版本才能执行更新。
在redis中使用watch key监控指定数据,相当于乐观锁加锁。
3、测试
1、正常执行
127.0.0.1:6379> set money 100 # 设置余额:100 OK 127.0.0.1:6379> set use 0 # 支出使用:0 OK 127.0.0.1:6379> watch money # 监视money (上锁) OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> DECRBY money 20 QUEUED 127.0.0.1:6379> INCRBY use 20 QUEUED 127.0.0.1:6379> exec # 监视值没有被中途修改,事务正常执行 1) (integer) 80 2) (integer) 202、测试多线程修改值,使用watch可以当做redis的乐观锁操作(相当于getversion)
我们启动另外一个客户端模拟插队线程。
线程1:
127.0.0.1:6379> watch money # money上锁 OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> DECRBY money 20 QUEUED 127.0.0.1:6379> INCRBY use 20 QUEUED 127.0.0.1:6379> # 此时事务并没有执行模拟线程插队,线程2:
127.0.0.1:6379> INCRBY money 500 # 修改了线程一中监视的money (integer) 600回到线程1,执行事务:
127.0.0.1:6379> EXEC # 执行之前,另一个线程修改了我们的值,这个时候就会导致事务执行失败 (nil) # 没有结果,说明事务执行失败 127.0.0.1:6379> get money # 线程2 修改生效 "600" 127.0.0.1:6379> get use # 线程1事务执行失败,数值没有被修改 "0"注意:每次提交执行exec后都会自动释放锁,不管是否成功
六、Jedis(Java中使用Redis)Jedis是Redis官方推荐的Java连接开发工具。要在Java开发中使用好Redis中间件,必须对Jedis熟悉才能写成漂亮的代码
1、测试连通
1、新建一个普通的Maven项目 2、导入redis的依赖!
<!--导入jredis的包--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> </dependency> <!--fastjson--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.70</version> </dependency>3、编写测试代码
public class TestPing { public static void main(String[] args) { Jedis jedis = new Jedis("192.168.xx.xxx", 6379); String response = jedis.ping(); System.out.println(response); // PONG } }2、常用API
对key操作的命令
public class TestKey { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); System.out.println("清空数据:"+jedis.flushDB()); System.out.println("判断某个键是否存在:"+jedis.exists("username")); System.out.println("新增<'username','kuangshen'>的键值 对:"+jedis.set("username", "kuangshen")); System.out.println("新增<'password','password'>的键值 对:"+jedis.set("password", "password")); System.out.print("系统中所有的键如下:"); Set<String> keys = jedis.keys("*"); System.out.println(keys); System.out.println("删除键password:"+jedis.del("password")); System.out.println("判断键password是否存 在:"+jedis.exists("password")); System.out.println("查看键username所存储的值的类 型:"+jedis.type("username")); System.out.println("随机返回key空间的一个:"+jedis.randomKey()); System.out.println("重命名key:"+jedis.rename("username","name")); System.out.println("取出改后的name:"+jedis.get("name")); System.out.println("按索引查询:"+jedis.select(0)); System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB()); System.out.println("返回当前数据库中key的数目:"+jedis.dbSize()); System.out.println("删除所有数据库中的所有key:"+jedis.flushAll()); } }3、事务
public class TestTX { public static void main(String[] args) { Jedis jedis = new Jedis("39.99.xxx.xx", 6379); JSONObject jsonObject = new JSONObject(); jsonObject.put("hello", "world"); jsonObject.put("name", "kuangshen"); // 开启事务 Transaction multi = jedis.multi(); String result = jsonObject.toJSONString(); // jedis.watch(result) try { multi.set("user1", result); multi.set("user2", result); // 执行事务 multi.exec(); }catch (Exception e){ // 放弃事务 multi.discard(); } finally { // 关闭连接 System.out.println(jedis.get("user1")); System.out.println(jedis.get("user2")); jedis.close(); } } } 七、SpringBoot整合Redis在SpringBoot中一般使用RedisTemplate提供的方法来操作Redis。
1、lettuce和jedisspringboot 2.x后 ,原来使用的 Jedis 被 lettuce 替换。
jedis:采用的直连,多个线程操作的话,是不安全的。如果要避免不安全,使用jedis pool连接池!更像BIO模式。lettuce:采用netty,实例可以在多个线程中共享,不存在线程不安全的情况!可以减少线程数据了,更像NIO模式。Redis配置分析
1、首先去查看Redis的自动配置类org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration, 2、去RedisProperties.class看看可以配置什么 其他可配置字段 3、对比LettuceConnectionConfiguration.class和JedisConnectionConfiguration.class
首先可以看到在SpringBoot2.x中Jedis已经被遗弃。
但是Lecture是正常的
2、基础使用1、导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>2、编写配置文件appliocation.properties
# 配置redis spring.redis.host=39.99.xxx.xx spring.redis.port=63793、测试
@SpringBootTest class Redis02SpringbootApplicationTests { @Autowired private RedisTemplate redisTemplate; @Test void contextLoads() { // redisTemplate 操作不同的数据类型,api和我们的指令是一样的 // opsForValue 操作字符串 类似String // opsForList 操作List 类似List // opsForHah // 除了基本的操作,我们常用的方法都可以直接通过redisTemplate操作,比如事务和基本的CRUD // 获取连接对象 //RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); //connection.flushDb(); //connection.flushAll(); redisTemplate.opsForValue().set("mykey","kuangshen"); System.out.println(redisTemplate.opsForValue().get("mykey")); } } 3、关于对象的保存(重写Redis配置类)1、定义一个User实体类
@Component @AllArgsConstructor @NoArgsConstructor @Data public class User { private String name; private int age; }2、测试
@Test void contextLoads2() throws JsonProcessingException { // 真实的开发一般都是使用json来传递对象 User user = new User("zhanghan", 25); String jsonUser = new ObjectMapper().writeValueAsString(user); System.out.println(jsonUser); redisTemplate.opsForValue().set("user",user);; System.out.println(redisTemplate.opsForValue().get("user")); }3、结果
可以看到是乱码的,那么如何解决了?
我们需要在对User实体进行序列化,并在我们自己写的Reids配置类中定义序列化的配置
1、定义序列化的实体类
@Component @AllArgsConstructor @NoArgsConstructor @Data // 在企业中,我们所有的pojo都会序列化!SpringBoot public class User implements Serializable { private String name; private int age; }2、重写配置类
@Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { // 将template 泛型设置为 <String, Object> RedisTemplate<String, Object> template = new RedisTemplate(); // 连接工厂,不必修改 template.setConnectionFactory(redisConnectionFactory); // 1、定义Json的序列化设置 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 2、定义String的序列化设置 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // 配置具体的序列化方式 // string的 key 采用 String序列化方式 template.setKeySerializer(stringRedisSerializer); // hash的key 采用 String序列化方式 template.setHashKeySerializer(stringRedisSerializer); // string的 value 采用 Jackson 序列化方式 template.setValueSerializer(jackson2JsonRedisSerializer); // hash的value 采用 Jackson 序列化方式 template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; }3、确保引入我们配置类的模板
@Autowired private RedisTemplate redisTemplate;4、测试
@Test void contextLoads2() throws JsonProcessingException { // 真实的开发一般都是使用json来传递对象 User user = new User("zhanghan", 25); String jsonUser = new ObjectMapper().writeValueAsString(user); System.out.println(jsonUser); redisTemplate.opsForValue().set("user",jsonUser); System.out.println(redisTemplate.opsForValue().get("user")); }5、结果
对于User对象的保存,一种是直接保存对象实体redisTemplate.opsForValue().set("user",user);,另外一种是将User实体转为json对象,再进行保存redisTemplate.opsForValue().set("user",jsonUser);,在实际的开发中,一般都是使用json来传递对象。
对于对象实体的保存 对于json对象的保存
4、自定义Redis工具类使用RedisTemplate需要频繁调用.opForxxx然后才能进行对应的操作,这样使用起来代码效率低下,工作中一般不会这样使用,而是将这些常用的公共API抽取出来封装成为一个工具类,然后直接使用工具类来间接操作Redis,不但效率高并且易用。
package com.zhang.redis02springboot.utils; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; /** * Redis工具类 */ @Component public final class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; // =============================common============================ /** * 指定缓存失效时间 * @param key 键 * @param time 时间(秒) * @return */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据key 获取过期时间 * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } /** * 判断key是否存在 * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * @param key 可以传一个值 或多个 */ @SuppressWarnings("unchecked") public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { redisTemplate.delete(key[0]); } else { redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key)); } } } // ============================String============================= /** * 普通缓存获取 * @param key 键 * @return 值 */ public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 递增 * @param key 键 * @param delta 要增加几(大于0) * @return */ public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * 递减 * @param key 键 * @param delta 要减少几(小于0) * @return */ public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } // ================================Map================================= /** * HashGet * @param key 键 不能为null * @param item 项 不能为null * @return 值 */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * @param key 键 * @return 对应的多个键值 */ public Map<Object, Object> hmget(String key) { return redisTemplate.opsForHash().entries(key); } /** * HashSet * @param key 键 * @param map 对应多个键值 * @return true 成功 false 失败 */ public boolean hmset(String key, Map<String, Object> map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map<String, Object> map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除hash表中的值 * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } /** * 判断hash表中是否有该项的值 * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * @param key 键 * @param item 项 * @param by 要增加几(大于0) * @return */ public double hincr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); } /** * hash递减 * @param key 键 * @param item 项 * @param by 要减少记(小于0) * @return */ public double hdecr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); } // ============================set============================= /** * 根据key获取Set中的所有值 * @param key 键 * @return */ public Set<Object> sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0) expire(key, time); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * @param key 键 * @return */ public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } // ===============================list================================= /** * 获取list缓存的内容 * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 * @return */ public List<Object> lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * @param key 键 * @return */ public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 * @return */ public Object lGetIndex(String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } } }使用过程
1、引入工具类
@Autowired private RedisUtil redisUtil;2、测试
@Test void test(){ redisUtil.set("name","zhangsan"); System.out.println(redisUtil.get("name")); }顿时redis操作变的简单了好多
八、redis.conf 配置详解1、常用
2、快照
快照,主要涉及的是redis的RDB持久化相关的配置 我们可以用如下的指令来让数据保存到磁盘上,即控制RDB快照功能: save <seconds> <changes>
例如:Redis 默认配置文件中提供了三个条件:
save 900 1 //表示每15分钟且至少有1个key改变,就触发一次持久化,将数据从内存写到磁盘 save 300 10 //表示每5分钟且至少有10个key改变,就触发一次持久化 save 60 10000 //表示每60秒至少有10000个key改变,就触发一次持久化3、主从复制
主从复制。使用 slaveof 来让一个 redis 实例成为另一个reids 实例的副本。
4、安全
5、限制
内存达到上限的处理策略的处理策略有六种:
volatile-lru:使用LRU算法移除过期集合中的keyallkeys-lru:使用LRU算法移除keyvolatile-random:在过期集合中移除随机的keyallkeys-random:移除随机的keyvolatile-ttl:移除那些TTL值最小的key,即那些最近才过期的key。noeviction:不进行移除。针对写操作,只是返回错误信息。6、追加
7、LUA脚本
8、虚拟内存
九、高级设置
redis是一个支持持久化的内存数据库,也就是说redis需要经常将内存中的数据同步到磁盘来保证持久化。redis支持四种持久化方式,一是 RDB Snapshotting(快照) 也是默认方式;二是 Append-only file(AOF) 的方式;三是虚拟内存方式;四是diskstore方式。
1、RDB(Redis DataBase)快照1、什么是RDB快照?
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
默认情况下, Redis 将数据库快照保存在名字为 dump.rdb的二进制文件中。文件名可以在配置文件中进行自定义
2、过程分析
1、Fork是什么? Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量,环境变量,程序计数器等)数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程。
2、过程 在进行 RDB 的时候,redis 的主线程是不会做 io 操作的,主线程会 fork 一个子线程来完成该操作;
Redis 调用forks。同时拥有父进程和子进程。子进程将数据集写入到一个临时 RDB 文件中。当子进程完成对新 RDB 文件的写入时,Redis 用新 RDB 文件替换原来的 RDB 文件,并删除旧的 RDB 文件。这种工作方式使得 Redis 可以从写时复制(copy-on-write)机制中获益(因为是使用子进程进行写操作,而父进程依然可以接收来自客户端的请求。)
3、触发机制
命令save(在redis.conf中的默认配置)或者是bgsave规则满足时,会自动触发rdb原则
1)save 使用 save 命令,会立刻对当前内存中的数据进行持久化 ,但是会阻塞,也就是不接受其他操作了; 由于 save 命令是同步命令,会占用Redis的主进程。若Redis数据非常多时,save命令执行速度会非常慢,阻塞所有客户端的请求。
2)bgsave bgsave 是异步进行,进行持久化的时候,redis 还可以将继续响应客户端请求 ; 3)save和bgsave的对比
4、RDB的优缺点
优点: 1、适合大规模的数据恢复 2、对数据的完整性要求不高
缺点: 1、在一定间隔时间做一次备份,所以如果redis意外down掉的话,就会丢失最后一次快照后的所有修改 2、Fork的时候,内存中的数据被克隆了一份,会占用一定的内容空间。
2、AOF(Append Only File)1、什么是AOF?
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
2、AOF的配置
appendonly no # 默认是不开启aof模式的, 默认是使用rdb方式持久化的,在大部分情况下,rdb完全够用! Appendfilename "appendonly.aof" # 持久化的文件的名字 # appendfsync always # 每次修改都会sync .消耗性能 appendfsync everysec # 每秒执行一次sync,可能会丢失这1s的数据! # appendfsync no # 不执行sync ,这个时候操作系统自己同步数据,速度最快! No-appendfsync-on-rewrite #重写时是否可以运用Appendfsync,用默认no即可,保证数据安 全性 Auto-aof-rewrite-min-size # 设置重写的基准值 Auto-aof-rewrite-percentage #设置重写的基准值3、AOF的启动/恢复/修复
1)正常恢复
启动:设置Yes,修改默认的appendonly no,改为yes将有数据的aof文件复制一份保存到对应目录(config get dir)恢复:重启redis然后重新加载2)异常恢复:
启动:设置Yes故意破坏 appendonly.aof 文件!修复: redis-check-aof --fix appendonly.aof 进行修复恢复:重启 redis 然后重新加载4、AOF的重写(rewrite)规则
1) 什么是重写 AOF 采用文件追加方式,文件会越来越大,为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis 就会启动AOF 文件的内容压缩,只保留可以恢复数据的最小指令集,可以使用命令 bgrewriteaof !
2)重写原理 AOF 文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧的aof文件,这点和快照有点类似!
3)触发机制: Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的已被且文件大于64M的触发。
5、AOF的优缺点
1)优点
每修改同步:appendfsync always 同步持久化,每次发生数据变更会被立即记录到磁盘,性能较差但数据完整性比较好每秒同步: appendfsync everysec 异步操作,每秒记录 ,如果一秒内宕机,有数据丢失不同步: appendfsync no 从不同步2)缺点
相同数据集的数据而言,aof 文件要远大于 rdb文件,恢复速度慢于 rdb。Aof 运行效率要慢于 rdb,每秒同步策略效率较好,不同步效率和rdb相同。 RDB和AOF的选择1、RDB和AOF的对比
2、如何选择使用哪种持久化方式?
1) 一般来说, 如果想达到足以媲美 PostgreSQL 的数据安全性, 你应该同时使用两种持久化功能。 2)如果是大量数据恢复并在意恢复速度, 但仍然可以承受数分钟以内的数据丢失, 那么你可以只使用 RDB 持久化。 3)不推荐只使用AOF持久化,因为RDB 非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快。 4)只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化。
3、虚拟内存首先说明:在Redis-2.4后虚拟内存功能已经被deprecated了,原因如下:
slow restart(重启太慢)slow saving(保存数据太慢)slow replication(上面两条导致 replication 太慢)complex code(代码过于复杂)主要思路和目的
暂时把不经常访问的数据从内存交换到磁盘中,从而腾出宝贵的内存空间用于其他需要访问的数据 如果我们的存储的数据总是有少部分数据被经常访问,大部分数据很少被访问,对于网站来说确实总是只有少量用户经常活跃。当少量数据被经常访问时,使用vm不但能提高单台redis server数据库的容量,而且也不会对性能造成太多影响。
4、diskstore 十、Redis发布与订阅1、概念
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。Redis 客户端可以订阅任意数量的频道。 订阅/发布消息图: 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系: 当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
2、命令
3、测试
------------订阅端---------------------- 127.0.0.1:6379> SUBSCRIBE sakura # 订阅sakura频道 Reading messages... (press Ctrl-C to quit) # 等待接收消息 1) "subscribe" # 订阅成功的消息 2) "sakura" 3) (integer) 1 1) "message" # 接收到来自sakura频道的消息 "hello world" 2) "sakura" 3) "hello world" 1) "message" # 接收到来自sakura频道的消息 "hello i am sakura" 2) "sakura" 3) "hello i am sakura" --------------消息发布端------------------- 127.0.0.1:6379> PUBLISH sakura "hello world" # 发布消息到sakura频道 (integer) 1 127.0.0.1:6379> PUBLISH sakura "hello i am sakura" # 发布消息 (integer) 1 -----------------查看活跃的频道------------ 127.0.0.1:6379> PUBSUB channels 1) "sakura"4、原理
通过 SUBSCRIBE 命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 channel,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中。 通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的 channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
客户端订阅,就被链接到对应频道的链表的尾部,退订则就是将客户端节点从链表中移除。
5、缺点
如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息。6、应用
消息订阅:公众号订阅,微博关注等等(起始更多是使用消息队列来进行实现)多人在线聊天室。稍微复杂的场景,我们就会使用消息中间件MQ处理。
十一、Redis主从复制 1、主从模式1、概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器.前者称为主节点(master/leader),后者称为从节点(slave/follower); 数据的复制是单向的,只能由主节点到从节点. Master以写为主,Slave以读为主.
默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点.
2、主从复制的作用
数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
3、环境配置
我使用docker配置reids集群的艰辛就不多阐述了见https://blog.csdn.net/qq_38327769 最终屈服了,采用linux主机搭建 结果就是实现了一主二从: 主机:
两个从机:
4、测试
1)读写测试 主机写,从机读,从机不能写 2)宕机测试 主机断开连接,从机依旧连接到主机的, 但是没有写操作, 这个时候, 主机如果回来了,从机依旧可以直接直接获取到主机写的信息!
3)层层链路测试 中间的作为主机的从机,又作为下一个从机的主机,这可以有效减轻主机写的压力(不用将主机的数据同步给多个从机,这会在复制原理里提及) 4)谋权篡位测试 一主二从的情况下,如果主机断了,从机可以使用命令 SLAVEOF NO ONE 将自己改为主机!其他的从机就可以手动连接到最新的这个主节点(手动)! 主机再回来,也只是一个光杆司令了,自己为了正常使用主动成为了新的主机的从机!
5、复制原理
1)Slave 启动成功连接到 master 后会发送一个sync(同步)命令 2)Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行。 3)完毕之后,master将传送整个数据文件到slave,并完成一次完全同步(全量复制)。 4)后续会继续进行增量复制。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。 增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步。
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行
2、哨兵模式1、哨兵模式概述
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例 然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
故障转移(failover)的过程:
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
2、哨兵的作用
通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。3、测试
我们目前的状态是一主二从!
1)配置哨兵的配置文件sentinel.conf
我们只需要在配置文件里加如下配置即可
# sentinel monitor myredis(被监控的名称) 127.0.0.1 6379 1(1代表同意主结点不可用的哨兵数量) sentinel monitor myredis 127.0.0.1 6379 12) 启动哨兵
redis-sentinel conf/sentinel.conf监听中… … … 3)这时候shutdown主机,那么哨兵就会从从机中随机选择一个服务器 4)这时候如果主机回来了,那不好意思了,皇位被抢了,你只能做从机了。
4、哨兵日志(全部)
# Example sentinel.conf # 哨兵sentinel实例运行的端口 默认26379 port 26379 # 哨兵sentinel的工作目录 dir /tmp # 哨兵sentinel监控的redis主节点的 ip port # master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。 # quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了 # sentinel monitor <master-name> <ip> <redis-port> <quorum> sentinel monitor mymaster 127.0.0.1 6379 1 # 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码 # 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码 # sentinel auth-pass <master-name> <password> sentinel auth-pass mymaster MySUPER--secret-0123passw0rd # 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒 # sentinel down-after-milliseconds <master-name> <milliseconds> sentinel down-after-milliseconds mymaster 30000 # 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步, # 这个数字越小,完成failover所需的时间就越长, # 但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。 # 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。 # sentinel parallel-syncs <master-name> <numslaves> sentinel parallel-syncs mymaster 1 # 故障转移的超时时间 failover-timeout 可以用在以下这些方面: #1. 同一个sentinel对同一个master两次failover之间的间隔时间。 #2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。 #3.当想要取消一个正在进行的failover所需要的时间。 #4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了 # 默认三分钟 # sentinel failover-timeout <master-name> <milliseconds> sentinel failover-timeout mymaster 180000 # SCRIPTS EXECUTION #配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。 #对于脚本的运行结果有以下规则: #若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10 #若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。 #如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。 #一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。 #通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本, #这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数, #一个是事件的类型, #一个是事件的描述。 #如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。 #通知脚本 # sentinel notification-script <master-name> <script-path> sentinel notification-script mymaster /var/redis/notify.sh # 客户端重新配置主节点参数脚本 # 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。 # 以下参数将会在调用脚本时传给脚本: # <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> # 目前<state>总是“failover”, # <role>是“leader”或者“observer”中的一个。 # 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的 # 这个脚本应该是通用的,能被多次调用,不是针对性的。 # sentinel client-reconfig-script <master-name> <script-path> sentinel client-reconfig-script mymaster /var/redis/reconfig.sh6、哨兵的优缺点
优点
哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有。主从可以切换,故障可以转移,系统可用性更好。哨兵模式是主从模式的升级,系统更健壮,可用性更高。缺点
Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。实现哨兵模式的配置也不简单,甚至可以说有些繁琐 十二、Redis缓存穿透和雪崩 1、缓存穿透(查不到)1、概念
在默认情况下,用户请求数据时,会先在缓存(Redis)中查找,若没找到即缓存未命中,再在数据库(比如MySQL)中进行查找,数量少可能问题不大,可是一旦大量的请求数据(例如秒杀场景)缓存都没有命中的话,就会全部转移到数据库上,造成数据库极大的压力,就有可能导致数据库崩溃。网络安全中也有人恶意使用这种手段进行攻击被称为洪泛攻击。
2、解决方案
1)解决方案一:布隆过滤器 对所有可能查询的参数以Hash的形式存储,以便快速确定是否存在这个值,在控制层先进行拦截校验,校验不通过直接打回,减轻了存储系统的压力。 2)解决方案二:缓存空对象 当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;
但是这种做法有两个问题:
如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。 2、缓存击穿(量太大,缓存过期)1、概念
相较于缓存穿透,缓存击穿的目的性更强,一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。这就是缓存被击穿,只是针对其中某个key的缓存不可用而导致击穿,但是其他的key依然可以使用缓存响应。
比如热搜排行上,一个热点新闻被同时大量访问就可能导致缓存击穿。
2、解决方案
1)设置热点数据永不过期 这样就不会出现热点数据过期的情况,但是当Redis内存空间满的时候也会清理部分数据,而且此种方案会占用空间,一旦热点数据多了起来,就会占用部分空间。
2)加互斥锁(分布式锁) 在访问key之前,采用SETNX(set if not exists)来设置另一个短期key来锁住当前key的访问,访问结束再删除该短期key。保证同时刻只有一个线程访问。这样对锁的要求就十分高。
3、缓存雪崩1、概念
大量的key设置了相同的过期时间,导致在缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。
2、解决方案
1)redis高可用 这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群
2)限流降级 这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
3)数据预热 数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |
标签: #redis #not #only #SQL也被很多人接受