Chinaunix首页 | 论坛 | 博客
  • 博客访问: 65877
  • 博文数量: 43
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 420
  • 用 户 组: 普通用户
  • 注册时间: 2014-06-27 15:04
个人简介

记录,分享

文章分类

全部博文(43)

文章存档

2017年(24)

2015年(1)

2014年(18)

我的朋友

分类: Java

2017-03-16 12:12:59

DelayQueue实现了BlockingQueue接口和PriorityQueue接口,其泛型类型变量必须实现Delayed接口。

Delayed接口继承自Comparable接口,它又一个getDelay(TimeUnit unit) 方法,代表与此Delayed对象相关的剩余延迟时间。

当元素被加入DelayQueue时,保证队列按照compare实现的比较规则有序(内部拥有一个PriorityQueue实例变量q)。通常是按照剩余延迟时间从小到大排序。调用poll时,内部首先调用qpeek,看其getDelay返回值d是否为0或者负,是则说明该元素在队列里的生命周期已经结束了,可以被取出,调用q.poll,否则等待d代表的时间长度。

常用场景:

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

public class DelayQueueSample {

public static void main(String[] args) throws Exception {

Cache cache = new Cache();

cache.put(1, "aaaa", 5, TimeUnit.SECONDS);

cache.put(2, "bb", 3, TimeUnit.SECONDS);

cache.put(3, "bc", 8, TimeUnit.SECONDS);

Thread.sleep(1000 * 2);

{

String str = cache.get(1);

System.out.println(str);

}

Thread.sleep(1000 * 2);

{

String str = cache.get(1);

System.out.println(str);

}

Thread.sleep(1000 * 10);

}

static class Pair {

public K first;

public V second;

public Pair() {}

public Pair(K first, V second) {

this.first = first;

this.second = second;

}

@Override

public String toString() {

return "Pair [first=" + first + ", second=" + second + "]";

}

}

static class DelayItem implements Delayed {

private static final long NANO_ORIGIN = System.nanoTime();

final static long now() {

return System.nanoTime() - NANO_ORIGIN;

}

private static final AtomicLong sequencer = new AtomicLong(0);

private final long sequenceNumber;

private final long time;

private final T item;

public DelayItem(T submit, long timeout) {

this.time = now() + timeout;

this.item = submit;

this.sequenceNumber = sequencer.getAndIncrement();

}

public T getItem() {

return this.item;

}

public long getDelay(TimeUnit unit) {

long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);

return d;

}

public int compareTo(Delayed other) {

if (other == this) // compare zero ONLY if same object

return 0;

if (other instanceof DelayItem) {

DelayItem x = (DelayItem) other;

long diff = time - x.time;

if (diff < 0)

return -1;

else if (diff > 0)

return 1;

else if (sequenceNumber < x.sequenceNumber)

return -1;

else

return 1;

}

long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));

return (d == 0) ? 0 : ((d < 0) ? -1 : 1);

}

}

static class Cache {

private ConcurrentMap cacheObjMap = new ConcurrentHashMap();

private DelayQueue>> q = new DelayQueue>>();

private Thread daemonThread;

public Cache() {

Runnable daemonTask = new Runnable() {

public void run() {

daemonCheck();

}

};

daemonThread = new Thread(daemonTask);

daemonThread.setDaemon(true);

daemonThread.setName("Cache Daemon");

daemonThread.start();

}

private void daemonCheck() {

System.out.println("daemonCheck() start");

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

for (;;) {

try {

DelayItem> delayItem = q.take();

if (delayItem != null) {

// 超时对象处理

Pair pair = delayItem.getItem();

cacheObjMap.remove(pair.first, pair.second); // compare and remove

String t = sdf.format(new Date());

System.out.printf("[%s] %s - remove %s\n", Thread.currentThread().getName(), t, pair);

}

} catch (InterruptedException e) {

e.printStackTrace();

break;

}

}

System.out.println("cache service stopped.");

}

// 添加缓存对象

public void put(K key, V value, long time, TimeUnit unit) {

V oldValue = cacheObjMap.put(key, value);

if (oldValue != null)

q.remove(oldValue);

long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);

q.put(new DelayItem>(new Pair(key, value), nanoTime));

}

public V get(K key) {

return cacheObjMap.get(key);

}

}

}

阅读(327) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~