Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1092555
  • 博文数量: 282
  • 博客积分: 10865
  • 博客等级: 上将
  • 技术积分: 2480
  • 用 户 组: 普通用户
  • 注册时间: 2006-05-12 12:35
文章存档

2017年(1)

2016年(3)

2015年(10)

2014年(12)

2013年(5)

2012年(10)

2011年(29)

2010年(3)

2008年(13)

2007年(92)

2006年(104)

我的朋友

分类: WINDOWS

2008-10-20 09:10:30

{/****************下面给出完成端口工作线程的主要代码有兴趣的朋友可以参考研究******************/}

{ 报警控制TCP服务器工作线程的定义 }
TAlarmWorkThread = class(TThread)
private
FThreadPool : TThreadPool; //线程池指针
FCompletePort : THandle; //完成端口
FAlarmSocketLists: TThreadList; //存放客户Socket相关信息的线程安全列表
FTimeOutCounter : TTimeOutCounter;
private
function FCloseConnAndFree(PHandleData:LPPerHandelData):Integer;//关闭连接并释放相关资源
function FGetClientID(APhandle:LPPerHandelData):integer; //取得ClientID
function FGetClientIDByAddr(Addr:String):Integer; //通过目标地址名取得ClientID
function FGetClientIODataByAddr(Addr:String):LPPerHandelData; //取得Client的IOdata
function FCheckPacketHeader(var WimpHeader;Len:Byte):Integer; //处理包头
function FGetLenOfBodyByHeader(Cmd:Word):Integer; //根据包头取得包体的长度
function FReceiveData(PHandleData: LPPerHandelData;Block: PBlock = nil):Boolean; //接收包头
function FMergePacked(var WimpPacket:TWimpMsg;const Cmd:Word;Var buf):ShortInt; //把缓冲区中的数据合并为一个完整协议包
function FSendPacket(PHandleData: LPPerHandelData; var HeadBuf,BodyBuf; HeadLen, BodyLen: Integer):Integer; //发送一个协议包
function FHandleRecPacket(var WimpPacket:TWimpMsg; const CMD : Word; PHandleData: LPPerHandelData):ShortInt; //处理已经接收完成的协议包
function FRecAndMakePac(PHandleData: LPPerHandelData;Block: PBlock; Transfered: DWORD):Integer; //接收并拼凑协议包
procedure FPostFEListAndParmToRC(PHandleData: LPPerHandelData; RCAddr: String); //提交RC需要的FE设备列表和参数
procedure FPostFEInfoAndParm(PHandleData:LPPerHandelData;Addr:String;LoginType:Byte); //提交FE的配置参数
function FCheckisCompletePac(PHandleData: LPPerHandelData):Integer;
public
procedure close();
constructor Create(ACompletePort:Pointer;APool:TThreadPool;ASocketLists:TThreadList;ATimeOutCounter:TTimeOutCounter);
procedure Execute; override;
destructor Destroy; override;
end;



procedure TAlarmWorkThread.close;
begin
//给完成端口发消息,关闭工作线程
PostQueuedCompletionStatus(FCompletePort,0,0,nil);
Terminate;
end;

constructor TAlarmWorkThread.Create(ACompletePort:Pointer;APool:TThreadPool;ASocketLists:TThreadList;ATimeOutCounter:TTimeOutCounter);
begin
FreeOnTerminate := True;
FTimeOutCounter := ATimeOutCounter;
FCompletePort := THANDLE(ACompletePort);
FAlarmSocketLists := ASocketLists;
FThreadPool := APool;
inherited Create(False);
end;

destructor TAlarmWorkThread.Destroy;
begin
inherited;
end;

procedure TAlarmWorkThread.Execute;
var
BytesTransferred: DWORD ;
PHandleData: LPPerHandelData;
Block: PBlock;
Flags: DWORD ;
BodySize,BytesSend,ErrCode : Integer;
Cmd : Word;
begin
while (not self.Terminated) do
begin
//查询IOCP状态(数据读写操作是否完成)
if (GetQueuedCompletionStatus(FCompletePort, BytesTransferred, DWORD(PHandleData), POverlapped(Block), INFINITE) = true) then
begin
//FThreadPool.PShowCommGauge();
//工作线程被Post消息提示退出
if (PHandleData = nil) then
begin
FThreadPool.PShowRealMsg(APP_MSGTYPE_THREAD,'控制信息','报警控制TCP服务器释放相关资源,成功关闭服务!');
Exit;
end;

//如果链路断开,释放相关资源
if (BytesTransferred = 0) then
begin
FThreadPool.PShowRealMsg(APP_MSGTYPE_THREAD,'控制信息','客户端【Addr=' + PHandleData.PSrcAddr + '】'+ PHandleData.PSrcName + '断开,TCP服务器释放相关资源!');
//清除该客户端相关资源
FCloseConnAndFree(PHandleData);
Continue;
end;

with Block^.Data do
try
case Block^.Data.OperType of
APP_OPERTYPE_RECV:
begin
FRecAndMakePac(PHandleData,Block,BytesTransferred);
Block.Data.IsUse := False;
Block.IsUse := False;
Block := nil;
FReceiveData(PHandleData,Block);
Continue;
end;

APP_OPERTYPE_SEND:
begin
Dec(wsaBuffer.len, BytesTransferred);
if wsaBuffer.len <= 0 then
begin
{ 发送完成,将Block置空,返回到FBlock的可使用的缓区中 }

//FSendHandlePacByCmd(PHandleData,Block,BytesTransferred);
Block.Data.IsUse := False;
Block.IsUse := False;
Block := nil;
end else
begin
{ 数据还没发送完成,继续发送 }
Flags := 0;
Inc(Pbyte(wsaBuffer.buf), BytesTransferred);
FillChar(Overlapped, SizeOf(Overlapped), 0);
if SOCKET_ERROR = WSASend( PHandleData.Socket, @wsaBuffer, 1, @BytesSend,Flags, @Overlapped, nil) then
begin
ErrCode := WSAGetLastError;
if ErrCode <> ERROR_IO_PENDING then
begin
FThreadPool.PShowRealMsg(APP_MSGTYPE_THREAD,'控制信息','客户端【Addr=' + PHandleData.PSrcAddr + '】异常断开,TCP服务器释放相关资源!');
FCloseConnAndFree(PHandleData);//清除该客户端相关资源
Continue;
end;
end;
end;
end;
end;
except
end;
end else begin
if (PHandleData <> nil ) then
begin
FThreadPool.PShowRealMsg(APP_MSGTYPE_THREAD,'视频服务','客户端【Addr=' + PHandleData.PSrcAddr + '】异常断开,TCP服务器释放相关资源!');
FCloseConnAndFree(PHandleData);//清除该客户端相关资源
end;
end;
end;
end;


function TAlarmWorkThread.FRecAndMakePac(PHandleData: LPPerHandelData;Block: PBlock; Transfered: DWORD): Integer;
begin
PHandleData.RingBuffer.WriteBuffer(Block.Data.Buffer,Transfered);
FCheckisCompletePac(PHandleData);
end;

function TAlarmWorkThread.FCheckisCompletePac(PHandleData: LPPerHandelData): Integer;
var
Tmp:string;
PackedLen,BodySize: Word;
PackedHead: TNetAlarmHead;
dataSize : LongInt;
RevBuffer: array [0..MAX_BUFFER_LEN] of Char;
WimpPacket: TWimpMsg;
cmd:Word;
begin
dataSize := PHandleData.RingBuffer.GetDataCount();
if dataSize < SizeOf(TNetAlarmHead) then Exit; //如果不够包头长就退出
PHandleData.RingBuffer.Copybuffer(PackedHead,SizeOf(TNetAlarmHead));
cmd := ntohs(PackedHead.Command_Id);
PackedLen := FGetLenOfBodyByHeader(cmd); //包体长度
if PackedLen < 0 then
begin
PHandleData.RingBuffer.Readbuffer(PackedHead,SizeOf(TNetAlarmHead));
FCheckisCompletePac(PHandleData); //嵌套执行
end else begin
if dataSize >= (PackedLen + SizeOf(TNetAlarmHead)) then
begin
PHandleData.RingBuffer.Readbuffer(PackedHead,SizeOf(TNetAlarmHead));
PHandleData.RingBuffer.Readbuffer(RevBuffer,PackedLen);
FMergePacked(WimpPacket,cmd,RevBuffer);
FHandleRecPacket(WimpPacket,cmd,PHandleData);
FCheckisCompletePac(PHandleData); //嵌套执行
end;
end;
end;


function TAlarmWorkThread.FSendPacket(PHandleData: LPPerHandelData; var HeadBuf,BodyBuf; HeadLen, BodyLen: Integer): Integer;
var
ErrCode: Integer;
Flags, Transfer: Cardinal;
Block: PBlock;
begin
Result := 0;
Block := PHandleData.MemoryBuffer.AllocBlock;
with Block^.Data do
begin
Flags := 0;
OperType := APP_OPERTYPE_SEND;
FillChar(Buffer, SizeOf(Buffer), 0);
Move(HeadBuf,Buffer,HeadLen);
Move(BodyBuf,Buffer[HeadLen],BodyLen);
FillChar(Overlapped, SizeOf(Overlapped), 0);
wsaBuffer.buf := @buffer;
wsaBuffer.len := HeadLen + BodyLen;
if SOCKET_ERROR = WSASend(PHandleData.Socket, @wsaBuffer, 1, @Transfer, Flags, @Overlapped, nil) then
begin
ErrCode := WSAGetLastError;
if ErrCode <> ERROR_IO_PENDING then
begin
Result := SOCKET_ERROR;
end;
end;
end;
end;



function TAlarmWorkThread.FCloseConnAndFree(PHandleData:LPPerHandelData): Integer;
var PIoDataID : Integer;
Llt : TList;
begin
Result := 0;
PIoDataID := FGetClientID(PHandleData);
FThreadPool.PUpdateOnLine(PHandleData.PSrcType,PHandleData.PSrcAddr,False);
FThreadPool.PShowRealMsg(APP_MSGTYPE_THREAD,'控制信息','客户端【登录名=' + PHandleData.PSrcAddr + '】'+ PHandleData.PSrcName + '断开,TCP服务器释放相关资源!');
if PIoDataID >=0 then
begin
FThreadPool.PUpdateCount(PHandleData.PSrcType,APP_UPDATECOUNT_DELETE);
Llt := FAlarmSocketLists.LockList();
Llt.Delete(PIoDataID);
FAlarmSocketLists.UnlockList;
shutdown(PHandleData.Socket,FD_CLOSE);
closesocket(PHandleData.Socket);
FreeAndNil(PHandleData.MemoryBuffer);
FreeAndNil(PHandleData.RingBuffer);
GlobalFree(DWORD(PHandleData));
end;
end;

function TAlarmWorkThread.FGetClientID(APhandle: LPPerHandelData): integer;
var i:Integer;
Llt:TList;
begin
Result := -1;
try
Llt := FAlarmSocketLists.LockList();
for i:=0 to Llt.Count - 1 Do
begin
if LPPerHandelData(Llt.Items[i]) = APhandle then
begin
Result := i;
Break;
end;
end;
finally
FAlarmSocketLists.UnlockList;
end;
end;
 
转载:
阅读(1456) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~