使用Java的观察者以及定期任务实现redis的缓存过期机制

什么是观察者模式

观察者模式,顾名思义,就是一个对象去观察另一个对象,当被观察对象发生改变时,能够被观察者所观察到并作出相应的反应;举个例子:

1
商店的自动门,其中,顾客是可以被观察的对象,而自动门上的红外感应器,可以称作观察者,当感应器捕捉到有顾客靠近门时,通知相应的设备执行一系列操作,从而打开商店的门,让顾客进来。

观察者模式离不开两个重要的对象—-Observable以及Observer

Observable

Observable,表示该对象为可以被观察的对象;因此,我们可以将要被观察的对象继承Observable,其中,Observable对象实现了setChanged()notifyObservers(Object arg)addObserver(Observer o)等方法;利用addObserver(Observer o)方法,可以为被观察对象添加观察者;当被观察者的某一状态被改变时,调用setChanged()标记状态的改变后,接着调用notifyObservers()去将事件广播出去,告诉观察该对象的所有观察者

Observer

Observer,表示观察者对象;我们可以通过继承Observer,实现Observerupdate(Observable o, Object arg)方法,当被观察对象发出setChanged()notifyObservers()后会调用update方法

定期任务

推荐文章: <> 石先

最终代码实现

ExpireMap.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package com.executor.org;

import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

/**
* 带有过期清除功能的Map
*
* @author tensor
*/
public class ExpireMap<K, V, T extends Long> extends Observable {

/**
* 存储真正的数据信息
*/
protected Hashtable expireTable = new Hashtable<>();
/**
* 默认过期时间为 6 Minutes
*/
private static long DEFAULT_EXPIRE_TIME = 1000 * 60 * 6;
/**
* 缓存过期时间
*/
private HashMap<String, Long> cacheMap = new HashMap<>();

/**
* 任务线程服务
*/
private ScheduledExecutorService scanService;

private boolean openExpire;
private boolean openNotify;

/**
* 根据参数构造对象
* @param openExpire
* @param openNotify
* @param observer
*/
private ExpireMap(boolean openExpire, boolean openNotify, Observer observer) {
this.openExpire = openExpire;
this.openNotify = openNotify;
if (openExpire) {
startExpireScan();
}
if (this.openNotify && observer != null) {
this.addObserver(observer);
}
}

/**
* 默认函数构造方法
* @return
*/
public static ExpireMap newExpireMap() {
return new ExpireMap<>(true, false, null);
}

/**
* 自定义过期时间的构造方法
*
* @param defaultExpireTime {单位为毫秒}
* @return
*/
public static ExpireMap newExpireMap(long defaultExpireTime) {
DEFAULT_EXPIRE_TIME = defaultExpireTime;
return new ExpireMap<>(true, false, null);
}

/**
* 自行决定是否开启定时扫描线程实现过期扫描任务,若设置为true,
* 则{@link ExpireMap.ExpireKeyScan#isExpire(String)}会
* 覆盖外部类{@link ExpireMap#isExpire(Object)}
*
* @param defaultExpireTime {单位为毫秒}
* @param openExpire
* @return
*/
public static ExpireMap newExpireMap(long defaultExpireTime, boolean openExpire) {
DEFAULT_EXPIRE_TIME = defaultExpireTime;
return new ExpireMap<>(openExpire, false, null);
}

/**
* 自行决定是否开启定时扫描线程实现过期扫描任务,若设置为true,
* 则{@link ExpireMap.ExpireKeyScan#isExpire(String)}会
* 覆盖外部类{@link ExpireMap#isExpire(Object)};同时设置观察者对象{@link Observer observer}
*
* @param defaultExpireTime {单位为毫秒}
* @param openExpire
* @param observer
* @return
*/
public static ExpireMap newExpireMap(long defaultExpireTime, boolean openExpire, Observer observer) {
DEFAULT_EXPIRE_TIME = defaultExpireTime;
return new ExpireMap<>(openExpire, true, observer);
}

/**
* 开启单线程执行过期扫描任务
*/
public void startExpireScan() {
scanService = newSingleThreadScheduledExecutor();
scanService.scheduleAtFixedRate(new ExpireMap.ExpireKeyScan(), DEFAULT_EXPIRE_TIME, DEFAULT_EXPIRE_TIME + 10,
TimeUnit.MILLISECONDS);
}

/**
* 采用预设过期时间存储
*
* @param k
* @param v
*/
public void put(K k, V v) {
if (k instanceof String) {
System.out.println("K is :[" + k + "], V is [" + v + "]");
this.expireTable.put(k, v);
this.cacheMap.put((String) k, System.currentTimeMillis() + DEFAULT_EXPIRE_TIME);
} else {
throw new RuntimeException("Key must be java.lang.String");
}
}

/**
* 采取自定义的缓存时间存储{key-value}数据, 时间数据 {@link Long t}
*
* @param k
* @param v
* @param t
*/
public void put(K k, V v, T t) {
if (k instanceof String) {
System.out.println("K is :[" + k + "], V is [" + v + "], T is [" + t + "]");
this.expireTable.put(k, v);
this.cacheMap.put((String) k, System.currentTimeMillis() + (long) t);
} else {
throw new RuntimeException("Key must be java.lang.String");
}
}

/**
* 根据Key获取对应的值,如果Key已过期或者Key在cacheMap中不存在,则抛出过期异常
*
* @param k
* @return
*/
public V get(K k) {
if (isExpire(k) || this.cacheMap.get(k) == null) {
throw new RuntimeException("This key had expire");
}
return (V) expireTable.get(k);
}

public String log() {
return this.expireTable.toString();
}

/**
* 如果{@link boolean openExpire}为true,则采用线程定时扫描的方式覆盖该方法
*
* @param k
* @return
*/
public boolean isExpire(K k) {
if (!this.openExpire) {
if (this.cacheMap.get(k) < System.currentTimeMillis()) {
this.cacheMap.remove(k);
this.expireTable.remove(k);
if (openNotify) {
this.setChanged();
this.notifyObservers(k);
}
return true;
}
return false;
}
throw new RuntimeException("This function had coverd by ExpireMap.ExpireKeyScan");
}

public static long GET_DEFAULT_EXPIRE_TIME() {
return DEFAULT_EXPIRE_TIME;
}

private class ExpireKeyScan implements Runnable {

private boolean isExpire(String k) {
if (cacheMap.get(k) < System.currentTimeMillis()) {
if (openNotify) {
setChanged();
notifyObservers(k);
}
return true;
}
return false;
}

/**
* 开启一个定时扫描线程,定期扫描{@link HashMap
* cacheMap},利用函数{@link ExpireKeyScan#isExpire(String)}扫描出
* 已过期的Key去remove在{@link java.util.Hashtable expireTable}、{@link HashMap
* cacheMap}对应的值
*
* @return {@link Long cout} 过期的key的数目
*/
@Override
public void run() {
for (Iterator<Map.Entry<String, Long>> iterator = cacheMap.entrySet().iterator(); iterator.hasNext(); iterator.next()) {
Map.Entry<String, Long> entry = iterator.next();
if (isExpire(entry.getKey())) {
iterator.remove();
}
}
}
}

}

AbstractExpireNotify.java

1
2
3
4
5
6
7
8
9
10
package com.executor.org;

import java.util.Observer;

/**
* @author tensor
*/
public abstract class AbstractExpireNotify implements Observer {

}

ExpireNotifyService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.executor.org;

import java.util.Observable;

/**
* @author tensor
*/
public class ExpireNotifyService extends AbstractExpireNotify {

@Override
public void update(Observable o, Object arg) {
System.out.println("过期Key为 : [" + arg + "]");
}

}

视频演示

BiliBili java-web-9 使用 Java 的观察者以及定期任务实现 redis 的缓存过期机制