java线程工具走出锤子敲铁皮时代。

来源:百度文库 编辑:神马文学网 时间:2024/07/05 11:27:42

长期以来,我都觉得多线程的使用是重要而又较难掌握的,要用的时候现看,下次用的时候忘记了,又要现看,这可能跟我长期从事的是Application Server中,某种固定架构下的编程,平时并不太需要自己管理线程这一事实有比较大的关系。过年的时候,因为着手策划自己的一个应用程序架构,对线程的问题多看了几眼。

在我看来Java api对线程的封装似乎过于简洁,或者说过于基础,太多事情要自己处理。这与Java api其他某些包事无巨细的包办形成鲜明的对比,每当我使用到线程这一部分,就觉得倒退到了锤子敲铁皮的时代。当然这增加了灵活性,但确实也增加了使用多线程开发并发程序的门槛。

Java api中的三大板斧:synchronized/wait/notify,的确简单有效。但是在某些情况下,需要更加复杂更加高层次的同步工具。找到了Doug Lea 编写的一个优秀的并发实用程序开放源码库 util.concurrent,它包括互斥、信号量、适于并发的集合类以及几个工作队列实现。

util.concurrent 库已经用于 Java? SDK 1.5 。

有了这个包,多线程特性可以更方便的使用了。我认为,如果这个包实现的足够好(bug少少,性能高高),将会为我们的编程带来非常大的方便。加入了这个包,java api 的抽象层次,在各个包之间,才算达到平衡的状态。

下面来看看这个包里面具体有些什么样的好东西。

sync接口:专门负责同步操作,用于替代Java提供的synchronized关键字,以实现更加灵活的代码同步。实现这个接口的类有下面几个。

Semaphore:信号量。和synchronized类似,提供了acquire()方法允许在设定时间内尝试锁定信号量,若超时则返回false。

Mutex:和synchronized类似,与之不同的是,synchronized的同步段只能限制在一个方法内,而Mutex对象可以作为参数在方法间传递,所以可以把同步代码范围扩大到跨方法甚至跨对象。

NullSync:一个比较奇怪的东西,其方法的内部实现都是空的,可能是作者认为如果你在实际中发现某段代码根本可以不用同步,但是又不想过多改动这段代码,那么就可以用NullSync来替代原来的Sync实例。此外,由于NullSync的方法都是synchronized,所以还是保留了“内存壁垒”的特性。

ObservableSync:把sync和observer模式结合起来,当sync的方法被调用时,把消息通知给订阅者,可用于同步性能调试。

TimeoutSync:可以包在Sync的外层,实现上锁超时控制的类,具体上锁的代码靠构造函数传入的sync实例来完成,其自身只负责监测上锁操作是否超时,可与SyncSet合用。

Channel接口:代表一种具备同步控制能力的容器,你可以从中存放/读取对象。不同于api中的Collection接口,可以把Channel看作是连接对象生产者(Producer)和对象消费者(Consumer)之间的一根管道。通过和Sync接口配合,Channel提供了阻塞式的对象存取方法(put/take)以及可设置阻塞等待时间的offer/poll方法。实现Channel接口的类有LinkedQueue,BoundedLinkedQueue,BoundedBuffer,BoundedPriorityQueue,SynchronousChannel,Slot等。

使用Channel我们可以很容易的编写具备消息队列功能的代码,示例如下:

Package org.javaresearch.j2seimproved.thread;

Import EDU.oswego.cs.dl.util.concurrent.*;

public class TestChannel {
final Channel msgQ = new LinkedQueue(); //log信息队列

public static void main(String[] args) {
TestChannel tc = new TestChannel();
For(int i = 0;i < 10;i ++){
Try{
tc.serve();
Thread.sleep(1000);
}catch(InterruptedException ie){
}
}
}

public void serve() throws InterruptedException {
String status = doService();
//把doService()返回状态放入Channel,后台logger线程自动读取之
msgQ.put(status);
}

private String doService() {
// Do service here
return "service completed OK! ";
}

public TestChannel() { // start background thread
Runnable logger = new Runnable() {
public void run() {
try {
for (; ; )
System.out.println("Logger: " + msgQ.take());
}
catch (InterruptedException ie) {}
}
};
new Thread(logger).start();
}
}

Excutor/ThreadFactory接口: 把相关的线程创建/回收/维护/调度等工作封装起来,而让调用者只专心于具体任务的编码工作(即实现Runnable接口),不必显式创建Thread类实例就能异步执行任务。
同时,Excutor的某些实现类实现了线程的“轻量级”使用,包括线程池,任务队列等。具体的实现有: PooledExecutor,ThreadedExecutor,QueuedExecutor,FJTaskRunnerGroup等。
下面给出一段代码,使用PooledExecutor实现一个简单的多线程服务器。


package org.javaresearch.j2seimproved.thread;
import java.net.*;
import EDU.oswego.cs.dl.util.concurrent.*;

public class TestExecutor {
public static void main(String[] args) {
PooledExecutor pool =
new PooledExecutor(new BoundedBuffer(10), 20);
pool.createThreads(4);
try {
ServerSocket socket = new ServerSocket(9999);
for (; ; ) {
final Socket connection = socket.accept();
pool.execute(new Runnable() {
public void run() {
new Handler().process(connection);
}
});
}
}
catch (Exception e) {} // die
}
static class Handler {
void process(Socket s){
}
}
}