记录,分享
分类: Java
2017-03-16 12:12:59
DelayQueue实现了BlockingQueue接口和PriorityQueue接口,其泛型类型变量必须实现Delayed接口。
Delayed接口继承自Comparable接口,它又一个getDelay(TimeUnit unit) 方法,代表与此Delayed对象相关的剩余延迟时间。
当元素被加入DelayQueue时,保证队列按照compare实现的比较规则有序(内部拥有一个PriorityQueue实例变量q)。通常是按照剩余延迟时间从小到大排序。调用poll时,内部首先调用q的peek,看其getDelay返回值d是否为0或者负,是则说明该元素在队列里的生命周期已经结束了,可以被取出,调用q.poll,否则等待d代表的时间长度。
常用场景:
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
public class DelayQueueSample {
public static void main(String[] args) throws Exception {
Cache
cache.put(1, "aaaa", 5, TimeUnit.SECONDS);
cache.put(2, "bb", 3, TimeUnit.SECONDS);
cache.put(3, "bc", 8, TimeUnit.SECONDS);
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
Thread.sleep(1000 * 10);
}
static
class Pair
public K first;
public V second;
public Pair() {}
public Pair(K first, V second) {
this.first = first;
this.second = second;
}
@Override
public String toString() {
return "Pair [first=" + first + ", second=" + second + "]";
}
}
static
class DelayItem
private static final long NANO_ORIGIN = System.nanoTime();
final static long now() {
return System.nanoTime() - NANO_ORIGIN;
}
private static final AtomicLong sequencer = new AtomicLong(0);
private final long sequenceNumber;
private final long time;
private final T item;
public DelayItem(T submit, long timeout) {
this.time = now() + timeout;
this.item = submit;
this.sequenceNumber = sequencer.getAndIncrement();
}
public T getItem() {
return this.item;
}
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
return d;
}
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof DelayItem) {
DelayItem x = (DelayItem) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
}
static
class Cache
private
ConcurrentMap
private
DelayQueue
private Thread daemonThread;
public Cache() {
Runnable daemonTask = new Runnable() {
public void run() {
daemonCheck();
}
};
daemonThread = new Thread(daemonTask);
daemonThread.setDaemon(true);
daemonThread.setName("Cache Daemon");
daemonThread.start();
}
private void daemonCheck() {
System.out.println("daemonCheck() start");
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
for (;;) {
try {
DelayItem
if (delayItem != null) {
// 超时对象处理
Pair
cacheObjMap.remove(pair.first, pair.second); // compare and remove
String t = sdf.format(new Date());
System.out.printf("[%s] %s - remove %s\n", Thread.currentThread().getName(), t, pair);
}
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
System.out.println("cache service stopped.");
}
// 添加缓存对象
public void put(K key, V value, long time, TimeUnit unit) {
V oldValue = cacheObjMap.put(key, value);
if (oldValue != null)
q.remove(oldValue);
long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
q.put(new
DelayItem
}
public V get(K key) {
return cacheObjMap.get(key);
}
}
}