Chinaunix首页 | 论坛 | 博客
  • 博客访问: 315770
  • 博文数量: 82
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 490
  • 用 户 组: 普通用户
  • 注册时间: 2016-06-13 10:58
文章分类

全部博文(82)

文章存档

2018年(2)

2017年(9)

2016年(71)

我的朋友

分类: 嵌入式

2017-03-30 15:28:37

1.服务端,server.lua。功能有直接转发并执行命令,还有每日定时任务功能和普通定时任务功能,module如下:
提供的接口:
server.addrPort(addr, port)
server.runLoop()
  1. #!/usr/bin/lua

  2. local socket = require "socket"
  3. local json = require "cjson.safe"
  4. local log = require "log"

  -- Module state shared by every function of the server module.
  local M = {
      address = "0.0.0.0",   -- listen address, overridden by M.addrPort
      port = 9201,           -- listen port, overridden by M.addrPort
      headLength = 10,       -- width of the decimal length header in front of each packet
      timer = {},            -- list of registered timer events
      io = {},               -- list of registered io events (listening socket and clients)
  }

  -- The table is also published as the global `server`.
  server = M

  -- Public API

  --- Set the address and port the listening socket will be bound to.
  -- The values are read when M.runLoop() creates the server socket.
  -- @param addr listen address
  -- @param port listen port
  function M.addrPort(addr, port)
      M.address, M.port = addr, port
  end

  --- Create the listening socket, then run the event loop.
  -- @return nil when the server could not be created; otherwise control
  --         stays inside M.doEvents(), which loops forever.
  function M.runLoop()
      local started = M.createServerThread()
      if started == false then
          log.error("run loop exit")
          return nil
      end
      M.doEvents()
  end

  -- Internal implementation

  --- Append a new io event record to M.io.
  -- @param _fd    socket the event belongs to
  -- @param _entry coroutine that drives this socket
  -- @param _func  handler stored on the record
  -- @param _event current state string (the code uses "read", "write", "pending", "closed")
  -- @param _data  payload attached to the event
  function M.createNewIoEvent(_fd, _entry, _func, _event, _data)
      log.debug("create new io event, fd (%s), entry (%s), func (%s), event (%s), data (%s)",
                _fd, _entry, _func, _event, _data)

      local events = M.io
      events[#events + 1] = {
          _fd = _fd,
          _entry = _entry,
          _func = _func,
          _event = _event,
          _data = _data,
      }
  end

  --- Append a new timer event record to M.timer.
  -- @param _func        function called when the timer fires
  -- @param _at          absolute time (socket.gettime() scale) of the next run
  -- @param _everyday_at seconds since midnight for daily tasks, negative for interval tasks
  -- @param _interval    seconds between runs of an interval task
  -- @param _repeat      remaining number of runs of an interval task
  -- @param _data        argument passed to _func; also the key used by M.removeTimerTask
  function M.createNewTimerEvent(_func, _at, _everyday_at, _interval, _repeat, _data)
      log.info("create new timer event, func (%s), at (%s), _everyday_at (%s), interval (%s), repeat (%s), data (%s)",
               _func, _at, _everyday_at, _interval, _repeat, _data)

      local tasks = M.timer
      tasks[#tasks + 1] = {
          _data = _data,
          _repeat = _repeat,
          _interval = _interval,
          _everyday_at = _everyday_at,
          _at = _at,
          _func = _func,
      }
  end

  --- Run a shell command and return everything it wrote to stdout.
  --
  -- SECURITY NOTE(review): cmd is handed to the shell unmodified, and in this
  -- module it originates from network clients (see M.clientCmdDispatchAndExecute).
  -- Anyone who can reach the listening port can run arbitrary commands; restrict
  -- the listen address or add authentication before exposing this service.
  --
  -- @param cmd command line string
  -- @return the command's output; "" when cmd is not a string or the process
  --         could not be started (the failure is logged instead of raised, so
  --         one bad command cannot take down the event loop)
  function M.executeCommand(cmd)
      if (type(cmd) ~= "string") then
          log.error("refuse to execute command, not a string (%s)", cmd)
          return ""
      end

      log.info("execute command (%s)", cmd)
      local file, err = io.popen(cmd, 'r')
      if (not file) then
          log.error("failed to execute command (%s), err (%s)", cmd, err)
          return ""
      end

      local data = file:read('*all')
      file:close()
      return data or ""
  end

  --- Number of seconds elapsed since local midnight.
  -- The clock is sampled once; reading hour, minute and second through
  -- separate os.date() calls can straddle a rollover and produce a value that
  -- is off by up to an hour.
  -- @return integer in the range 0..86399 (plus leap seconds, if the platform reports them)
  function M.getEverydaySeconds()
      local now = os.date("*t")
      return now.hour * 3600 + now.min * 60 + now.sec
  end

  --- Register a timer task.
  -- A task is a daily task when _everyday_at is >= 0 (seconds since local
  -- midnight, so 0 means midnight); otherwise it is an interval task that first
  -- runs _after seconds from now. The ">= 0" test matches M.doTimerEvent, which
  -- treats "_everyday_at < 0" as the interval case.
  -- Non-numeric arguments (for example fields missing from the decoded JSON
  -- request) fall back to -1, or to 0 for _after, instead of raising.
  -- @param _func        function called with _data when the timer fires
  -- @param _after       delay in seconds before the first run of an interval task
  -- @param _everyday_at seconds since midnight for a daily task, negative otherwise
  -- @param _interval    seconds between runs of an interval task
  -- @param _repeat      number of runs of an interval task
  -- @param _data        argument for _func and key for M.removeTimerTask
  -- @return "" (the response payload sent back to the client)
  function M.addTimerTask(_func, _after, _everyday_at, _interval, _repeat, _data)
      local _at
      local now = socket.gettime()

      _everyday_at = tonumber(_everyday_at) or -1
      _interval = tonumber(_interval) or -1
      _repeat = tonumber(_repeat) or -1

      if (_everyday_at >= 0) then
          -- Sample the time of day once so both branches use the same value.
          local elapsed = M.getEverydaySeconds()
          _at = now - elapsed + _everyday_at
          if (_everyday_at < elapsed) then
              -- Today's slot has already passed: schedule for tomorrow.
              _at = _at + 86400
          end
          log.debug("add new everyday timer task. everyday at (%s)", _at)
      else
          _at = now + (tonumber(_after) or 0)
          log.debug("add new interval timer task. at (%s)", _at)
      end

      M.createNewTimerEvent(_func, _at, _everyday_at, _interval, _repeat, _data)
      return ""
  end

  --- Remove every timer task whose _data equals the given value.
  -- The list is walked from the end so removals do not shift unvisited entries.
  -- @param _data value compared against each task's _data
  -- @return "" (the response payload sent back to the client)
  function M.removeTimerTask(_data)
      local tasks = M.timer
      local idx = #tasks
      while idx >= 1 do
          if tasks[idx]._data == _data then
              log.info("remove timer task, data (%s)", _data)
              table.remove(tasks, idx)
          end
          idx = idx - 1
      end
      return ""
  end

  --- Prefix a string with its length as a zero-padded decimal header.
  -- The header is M.headLength digits wide.
  -- @param s payload string
  -- @return header .. s, or nil when the payload is 2^32 bytes or longer
  function M.packetStringData(s)
      if #s >= 2^32 then
          return nil
      end

      local fmt = "%0." .. tostring(M.headLength) .. "u"
      return string.format(fmt, #s) .. s
  end

  --- Wrap a result in a response packet and send it on a non-blocking socket.
  -- The response is the JSON object {cmd = 0, data = _data} with a length
  -- header. Must run inside a coroutine: when the socket would block, the
  -- function yields and carries on after being resumed.
  -- @param _data response payload
  -- @param fd    client socket
  -- @return "read" when the whole packet was sent, "closed" when the packet
  --         could not be built or the connection failed
  function M.packetAndSendData(_data, fd)
      local encoded = json.encode({ cmd = 0, data = _data })
      local data = encoded and M.packetStringData(encoded)
      if (not data) then
          -- Nothing can be sent; do not fall through to fd:send(nil).
          log.error("failed to packet data, data is too large or not encodable. data (%s)", _data)
          return "closed"
      end
      log.info("packet data successfully, start to send. fd (%s), data (%s)", fd, data)

      local event = "closed"
      while true do
          fd:settimeout(0)
          -- On failure LuaSocket returns nil, the error, and the index of the
          -- last byte that was sent before the failure.
          local count, status, sent = fd:send(data)

          if (count ~= nil) then
              if (count >= #data) then
                  log.info("send data completed, contine to read other command, fd (%s)", fd)
                  event = "read"
                  break
              end
              data = string.sub(data, count + 1, -1)
              log.debug("send total %s, remain data (%s), fd (%s)", count, data, fd)
          else
              if (sent and sent > 0) then
                  -- Drop the bytes already delivered so the retry does not
                  -- send them a second time.
                  data = string.sub(data, sent + 1, -1)
                  log.debug("send total %s, remain data (%s), fd (%s)", sent, data, fd)
              end

              if (status == "timeout") then
                  log.info("send data timeout, now yield. fd (%s)", fd)
                  coroutine.yield(fd)
              else
                  -- "closed" or any other socket error ends the connection;
                  -- retrying would spin forever.
                  log.info("closed by peer, fd (%s), status (%s)", fd, status)
                  event = "closed"
                  break
              end
          end
      end

      return event
  end

  --- Read one length-prefixed packet from a non-blocking socket.
  -- Must run inside a coroutine: when no data is available the function yields
  -- and carries on after being resumed. The header is M.headLength decimal
  -- digits giving the payload length.
  -- @param fd client socket
  -- @return "pending", payload  when a complete packet was read
  -- @return "closed", ""        when the peer closed, the socket failed, or the
  --                             header is not a positive decimal length
  function M.receiveAndParseData(fd)
      local event
      local headlen = M.headLength
      local total          -- payload length; nil until the header has been parsed
      local data = ""

      log.info("start to receive data, fd (%s)", fd)
      while true do
          -- Ask only for what is still missing from the header or the payload.
          local rest = (total or headlen) - #data

          fd:settimeout(0)
          local s, status, partial = fd:receive(rest)
          local receive = s or partial or ""
          data = data .. receive

          log.debug("new receive (%s), fd (%s)", receive, fd)

          if (#receive == 0) then
              if (status == "timeout") then
                  log.info("receive data timeout, now yield. fd (%s)", fd)
                  coroutine.yield(fd)
              elseif (status ~= nil) then
                  -- "closed" or any other socket error: give up instead of spinning.
                  log.info("closed by peer, fd (%s), status (%s)", fd, status)
                  event = "closed"
                  data = ""
                  break
              end
          end

          if (total == nil) and (#data >= headlen) then
              local head = string.sub(data, 1, headlen)
              data = string.sub(data, headlen + 1, -1)
              -- Accept plain decimal digits only; tonumber() alone would also
              -- accept hex, signs and surrounding whitespace.
              total = string.match(head, "^%d+$") and tonumber(head, 10)
              if (not total) or (total <= 0) then
                  log.error("invalid packet head (%s), fd (%s)", head, fd)
                  event = "closed"
                  data = ""
                  break
              end
              log.info("receive head completed, len (%s), data len (%s)", headlen, total)
          end

          if (total ~= nil) and (#data == total) then
              log.info("receive data completed, len (%s), data (%s)", #data, data)
              event = "pending"
              break
          end
      end

      return event, data
  end

  --- Look up the io event record registered for a socket.
  -- @param fd socket to look for
  -- @return the first record in M.io whose _fd is fd, or nil when there is none
  function M.findInfoNode(fd)
      for _, node in ipairs(M.io) do
          if node._fd == fd then
              return node
          end
      end
      return nil
  end

  --- Decode a client request and carry it out.
  -- Request commands: 1 runs cmdt.data as a shell command, 2 registers a timer
  -- task, 3 removes the timer tasks whose data equals cmdt.data. Any other
  -- command echoes cmdt.data back unchanged.
  --
  -- SECURITY NOTE(review): commands 1 and 2 run client-supplied text through
  -- M.executeCommand; see the note there before exposing the port.
  --
  -- @param data JSON request text
  -- @return "write", response payload   for a decodable request
  -- @return "closed", ""                when the request is not a JSON object
  function M.clientCmdDispatchAndExecute(data)
      log.info("dispatch and execute command. data (%s)", data)

      -- cjson.safe returns nil plus an error message instead of raising.
      local cmdt, err = json.decode(data)
      if (type(cmdt) ~= "table") then
          log.error("failed to decode command, err (%s), data (%s)", err, data)
          return "closed", ""
      end

      if (cmdt.cmd == 1) then -- execute one script
          cmdt.data = M.executeCommand(cmdt.data)
      elseif (cmdt.cmd == 2) then -- add one timer task
          -- Missing or non-numeric fields fall back to -1, the "not set"
          -- value the client uses.
          cmdt.data = M.addTimerTask(M.executeCommand,
                                     tonumber(cmdt.after) or -1,
                                     tonumber(cmdt.everyday_at) or -1,
                                     tonumber(cmdt.interval) or -1,
                                     tonumber(cmdt["repeat"]) or -1,
                                     cmdt.data)
      elseif (cmdt.cmd == 3) then -- remove one timer task
          cmdt.data = M.removeTimerTask(cmdt.data)
      end

      return "write", cmdt.data
  end

  --- Coroutine body serving one client connection.
  -- Cycles through receive ("read"), execute ("pending") and reply ("write")
  -- until the event state becomes "closed", then closes the socket so the
  -- descriptor is released immediately rather than whenever it is collected.
  -- @param client accepted client socket
  function M.doClient(client)
      local io_ev = M.findInfoNode(client)
      if (not io_ev) then
          log.error("no io event registered for client, fd (%s)", client)
          client:close()
          return
      end

      while (io_ev._event ~= "closed") do
          -- receive data
          if (io_ev._event == "read") then
              io_ev._event, io_ev._data = M.receiveAndParseData(client)
          end

          -- do real task
          if (io_ev._event == "pending") then
              io_ev._event, io_ev._data = io_ev._func(io_ev._data)
          end

          -- send data
          if (io_ev._event == "write") then
              io_ev._event = M.packetAndSendData(io_ev._data, client)
          end
      end

      client:close()
  end

  --- Try to accept one pending connection without blocking.
  -- On success a coroutine running M.doClient is created for the new socket and
  -- registered as an io event in the "read" state. When nothing is pending the
  -- calling coroutine yields (true, true).
  -- @param server listening socket
  function M.acceptNewClient(server)
      server:settimeout(0)
      local newClient, status = server:accept()

      if newClient == nil then
          log.debug("no need to accept new client, now yield. status (%s)", status)
          coroutine.yield(true, true)
          return
      end

      local function serve()
          M.doClient(newClient)
      end
      local entry = coroutine.create(serve)
      log.info("accept new client successfully, newClient (%s)", newClient)
      M.createNewIoEvent(newClient, entry, M.clientCmdDispatchAndExecute, "read", "")
  end

  --- Coroutine body for the listening socket.
  -- Calls the handler stored on the socket's io event record over and over;
  -- the handler is responsible for yielding.
  -- @param server listening socket
  function M.doServer(server)
      log.debug("do server task")
      local node = M.findInfoNode(server)
      repeat
          node._func(server)
      until false
  end

  --- Build the socket lists for socket.select and drop closed events.
  -- Events in state "read" go to readfd, events in state "write" go to writefd,
  -- and events in state "closed" are removed from M.io.
  -- @return table with the fields readfd and writefd (arrays of sockets)
  function M.getSelectfd()
      local readfd, writefd = {}, {}
      local events = M.io

      -- Walk backwards so removing an entry does not skip its neighbour.
      for idx = #events, 1, -1 do
          local node = events[idx]
          local state = node._event
          if state == "read" then
              log.debug("readfd (%s)", node._fd)
              readfd[#readfd + 1] = node._fd
          elseif state == "write" then
              log.debug("writefd (%s)", node._fd)
              writefd[#writefd + 1] = node._fd
          elseif state == "closed" then
              log.info("remove io event because of closed. fd (%s)", node._fd)
              table.remove(events, idx)
          end
      end

      return { readfd = readfd, writefd = writefd }
  end

  --- Work out how long socket.select may wait before the next timer is due.
  -- @return nil when no timer is registered, 0 when the earliest timer is
  --         already due, otherwise the number of seconds until it is due
  function M.calculateSelectTime()
      local tasks = M.timer
      if #tasks == 0 then
          return nil
      end

      -- Find the earliest deadline.
      local at = tasks[1]._at
      for i = 2, #tasks do
          local candidate = tasks[i]._at
          if at > candidate then
              at = candidate
          end
      end

      local now = socket.gettime()
      log.debug("now (%s), at (%s)", now, at)

      if now >= at then
          return 0
      end
      return at - now
  end

  --- Wait for socket activity (or the next timer deadline) and resume every
  --- registered coroutine once.
  -- An event is removed when its coroutine finished (resume returned no value)
  -- or raised an error. A failed coroutine is dead and can never be resumed
  -- again, so its socket is closed and its record dropped; previously the error
  -- message counted as a truthy result and the dead entry was kept forever.
  function M.doIoEvent()
      local io_ev = M.io
      local dftime = M.calculateSelectTime()
      local fd = M.getSelectfd()

      socket.select(fd.readfd, fd.writefd, dftime)
      -- Walk backwards: coroutines may append new events while running, and
      -- removals must not shift entries that are still to be visited.
      for i=#io_ev, 1, -1 do
          local node = io_ev[i]
          log.debug("coroutine resume, fd (%s)", node._fd)
          local status, res = coroutine.resume(node._entry)
          log.debug("resume response, status (%s), res (%s)", status, res)

          if (not status) then
              log.error("failed to resume coroutine, fd (%s), err (%s)", node._fd, res)
              node._fd:close()
              table.remove(io_ev, i)
          elseif (not res) then
              log.info("io thread exit, fd (%s)", node._fd)
              table.remove(io_ev, i)
          end
      end
  end

  --- Run every timer task that is due and reschedule or remove it.
  -- Interval tasks (_everyday_at < 0) are rescheduled _interval seconds ahead
  -- and removed once a positive _repeat counter reaches 0. Daily tasks are
  -- rescheduled for the same time of day tomorrow.
  -- A task that raises is logged and otherwise treated like a normal run, so
  -- one failing task cannot stop the event loop.
  function M.doTimerEvent()
      local timer = M.timer
      local now = socket.gettime()

      -- Walk backwards so removals do not shift unvisited entries.
      for i=#timer, 1, -1 do
          local task = timer[i]
          if (task._at <= now) then
              log.info("execute timer event, data (%s), everyday_at (%s), interval (%s), repeat (%s), now(%s)",
                       task._data, task._everyday_at, task._interval, task._repeat, now)
              local ok, err = pcall(task._func, task._data)
              if (not ok) then
                  log.error("timer event failed, data (%s), err (%s)", task._data, err)
              end

              if (task._everyday_at < 0) then
                  local expired = false
                  if (task._repeat > 0) then
                      task._repeat = task._repeat - 1
                      expired = (task._repeat == 0)
                  elseif (task._interval <= 0) then
                      -- No repeat limit and no positive interval: the task
                      -- would be due again on every loop iteration forever,
                      -- so it is treated as a one-shot task.
                      expired = true
                  end

                  if expired then
                      log.info("remove timer event, data (%s), interval (%s), repeat (%s)",
                               task._data, task._interval, task._repeat)
                      table.remove(timer, i)
                  else
                      task._at = now + task._interval
                  end
              else
                  task._at = socket.gettime() - M.getEverydaySeconds()
                             + 86400 + task._everyday_at
              end
          end
      end
  end

  --- Main loop: alternate between io processing and timer processing, forever.
  function M.doEvents()
      repeat
          M.doIoEvent()
          M.doTimerEvent()
      until false
  end

  --- Create a listening TCP socket.
  -- The socket gets the "reuseaddr" option, is bound to srv.address:srv.port
  -- and listens with a backlog of 100. If any step after creation fails the
  -- socket is closed before returning, so no descriptor is leaked.
  -- @param srv table with the fields address and port
  -- @return the listening socket, or nil plus an error message
  function M.createServer(srv)
      -- err is declared local; it used to leak into the global environment.
      local server, ret, err

      server, err = socket.tcp()
      if (nil == server) then
          log.error("failed to create tcp socket, err (%s)", err)
          return nil, err
      end

      ret, err = server:setoption("reuseaddr", true)
      if (nil == ret) then
          log.error("failed to set option, err (%s)", err)
          server:close()
          return nil, err
      end

      ret, err = server:bind(srv.address, srv.port)
      if (nil == ret) then
          log.error("failed to bind server, err (%s)", err)
          server:close()
          return nil, err
      end

      ret, err = server:listen(100)
      if (nil == ret) then
          log.error("failed to create server listen, err (%s)", err)
          server:close()
          return nil, err
      end

      log.info("create tcp server successfully, fd (%s), addr (%s), port (%s)",
               server, srv.address, srv.port)
      return server
  end

  --- Create the listening socket and register the coroutine that serves it.
  -- The io event is registered in the "read" state with M.acceptNewClient as
  -- its handler.
  -- @return true on success, false when the socket could not be created
  function M.createServerThread()
      local listener = M.createServer(M)
      if listener == nil then
          log.error("failed to create new server. addr (%s), port (%s)",
                    M.address, M.port)
          return false
      end

      local function serve()
          M.doServer(listener)
      end
      M.createNewIoEvent(listener, coroutine.create(serve), M.acceptNewClient, "read", "")

      return true
  end

  406. return server

2.客户端,client.lua。功能主要是发送相应的命令给服务端执行,module如下:
提供的接口:
client.server(addr,port)
client.socketTimeout(sendTimeout, receiveTimeout)
client.send(_cmd, _data, _everyday_at, _after, _interval, _repeat)

  1. #!/usr/bin/lua

  2. local socket = require "socket"
  3. local json = require "cjson.safe"
  4. local log = require "log"

  -- Private module state of the client.
  local M = {
      serverAddress = "127.0.0.1",   -- server ip address
      serverPort = 9201,             -- server port
      headLength = 10,               -- width of the packet length header
      sendTimeOut = 1,               -- socket send timeout
      receiveTimeOut = 1,            -- socket receive timeout
      data = {                       -- request that is sent to the server
          -- Command id: 0 - success, 1 - direct execution command,
          -- 2 - register timing task
          cmd = 1,
          data = "",                 -- script
          everyday_at = -1,          -- everyday execute once at this time
          after = -1,                -- start timer task after seconds
          interval = -1,             -- timer task interval
          ["repeat"] = -1,           -- execution times
      },
  }

  -- The table is also published as the global `client`.
  client = M

  -- Public API

  --- Set the server address and port used by subsequent M.send() calls.
  -- @param addr server ip address
  -- @param port server port
  function M.server(addr, port)
      M.serverAddress, M.serverPort = addr, port
  end

  --- Set the socket timeouts, in seconds, used when sending and receiving.
  -- @param sendTimeOut    timeout applied before each send
  -- @param receiveTimeOut timeout applied before each receive
  function M.socketTimeout(sendTimeOut, receiveTimeOut)
      M.sendTimeOut, M.receiveTimeOut = sendTimeOut, receiveTimeOut
  end

  --- Build a request, store it in M.data and send it to the server.
  -- @param _cmd         command id
  -- @param _data        command line the server should run
  -- @param _everyday_at time of day for a daily task, -1 when unused
  -- @param _after       delay in seconds before an interval task starts, -1 when unused
  -- @param _interval    seconds between runs of an interval task, -1 when unused
  -- @param _repeat      number of runs of an interval task
  -- @return whatever M.doClient() returns
  function M.send(_cmd, _data, _everyday_at, _after, _interval, _repeat)
      M.data = {
          ["repeat"] = _repeat,
          interval = _interval,
          after = _after,
          everyday_at = _everyday_at,
          data = _data,
          cmd = _cmd,
      }
      return M.doClient()
  end

  -- Internal implementation

  --- Prefix a string with its length as a zero-padded decimal header.
  -- The header width comes from M.headLength, the same setting the receive path
  -- uses, instead of a hard-coded 10.
  -- @param s payload string; nil is accepted (for example a failed JSON encode)
  -- @return header .. s, or nil when s is not a string, is 2^32 bytes or
  --         longer, or its length does not fit into the header
  function M.packetStringData(s)
      if (type(s) ~= "string") then
          return nil
      end

      local length = #s
      if (length >= 2^32) or (length >= 10^M.headLength) then
          return nil
      end

      local head = string.format("%0" .. tostring(M.headLength) .. "d", length)
      return head .. s
  end

  --- Send a complete string on a socket, using M.sendTimeOut per attempt.
  -- @param fd connected socket
  -- @param s  data to send
  -- @return nil on success, "timeout" when a send timed out, "closed" when the
  --         peer closed the connection or any other socket error occurred
  --         (other errors are reported as "closed" because the caller only
  --         recognises these two values, and retrying them would loop forever)
  function M.sendData(fd, s)
      local data = s
      local ret
      log.debug("start to send data. data (%s), timeout (%s)", s, M.sendTimeOut)

      while #data > 0 do
          fd:settimeout(M.sendTimeOut)
          local count, status = fd:send(data)

          if (count == nil) then
              if (status == "timeout") then
                  log.info("send data timeout")
                  ret = "timeout"
              elseif (status == "closed") then
                  log.info("closed by peer")
                  ret = "closed"
              else
                  log.error("send data failed, err (%s)", status)
                  ret = "closed"
              end
              break
          end

          if (count >= #data) then
              log.info("send data completed")
              break
          end

          data = string.sub(data, count+1, -1)
          log.debug("send total %s, remain data (%s)", count, data)
      end

      return ret
  end

  --- Read one length-prefixed packet, using M.receiveTimeOut per attempt.
  -- The header is M.headLength decimal digits giving the payload length.
  -- @param fd connected socket
  -- @return nil, payload   when a complete packet was read
  -- @return "timeout", ""  when a receive timed out without delivering any data
  -- @return "closed", ""   when the peer closed, the socket failed, or the
  --                        header is not a decimal length
  function M.receiveData(fd)
      local ret
      local headlen = M.headLength
      local total          -- payload length; nil until the header has been parsed
      local data = ""

      log.debug("start to receive data. timout (%s)", M.receiveTimeOut)
      while true do
          -- Ask only for what is still missing from the header or the payload.
          local rest = (total or headlen) - #data

          fd:settimeout(M.receiveTimeOut)
          local s, status, partial = fd:receive(rest)
          local receive = s or partial or ""
          data = data .. receive

          log.debug("new receive data (%s)", data)

          if (#receive == 0) then
              if (status == "timeout") then
                  log.info("receive data timeout.")
                  ret = "timeout"
                  data = ""
                  break
              elseif (status ~= nil) then
                  -- "closed" or any other socket error: give up instead of looping.
                  log.info("closed by peer.")
                  ret = "closed"
                  data = ""
                  break
              end
          end

          if (total == nil) and (#data >= headlen) then
              local head = string.sub(data, 1, headlen)
              data = string.sub(data, headlen + 1, -1)
              -- Accept plain decimal digits only.
              total = string.match(head, "^%d+$") and tonumber(head, 10)
              if (not total) then
                  log.error("invalid packet head (%s)", head)
                  ret = "closed"
                  data = ""
                  break
              end
              log.info("receive head completed, len (%s), data len (%s)", headlen, total)
          end

          if (total ~= nil) and (#data == total) then
              log.info("receive data completed, len (%s), data (%s)", #data, data)
              break
          end
      end

      return ret, data
  end

  --- Connect to the server, send the request stored in M.data and return the
  --- raw response payload.
  -- The socket is closed on every path, including success. A connection failure
  -- raises, through assert, as before.
  -- @return response payload string, or nil when packing, sending or receiving failed
  function M.doClient()
      local err, ret
      local client = assert(socket.connect(M.serverAddress, M.serverPort))

      -- cjson.safe returns nil on failure; keep that away from packetStringData.
      local encoded = json.encode(M.data)
      local data = encoded and M.packetStringData(encoded)

      if (data == nil) then
          -- Log the request only: M itself holds functions, which cannot be
          -- encoded as JSON.
          log.error("failed to packet data, client (%s)", encoded)
          client:close()
          return nil
      end

      err = M.sendData(client, data)
      if (err ~= nil) then
          log.error("fail to send data, err (%s), client (%s)", err, encoded)
          client:close()
          return nil
      end

      err, ret = M.receiveData(client)
      client:close()
      if (err ~= nil) then
          log.error("fail to receive data, err (%s), client (%s)", err, encoded)
          return nil
      end

      log.info("do client successfully. ret (%s)", ret)
      return ret
  end

  164. return client

3.测试代码:
  服务端,test_server.lua
  1. #!/usr/bin/lua

  2. local server = require "server"
  3. local log = require "log"

  4. log.configure(4, "stdout", "./log")

  5. server.addrPort("0.0.0.0", 9201)     -- 配置监听地址
  6. server.runLoop()              -- 开启服务
  客户端,test_client.lua
  1. #!/usr/bin/lua

  2. local client = require "client"
  3. local log = require "log"

  log.configure(4, "stdout", "./log") -- log level 4, printed to stdout

  client.server("127.0.0.1", 9201)  -- server ip and port
  client.socketTimeout(1, 1)    -- send and receive timeouts

  -- Requests sent in order: { cmd, data, everyday_at, after, interval, repeat }
  local requests = {
      { 1, "/bin/ls -a", -1, -1, -1, -1 },        -- run the command once, right away
      { 2, "/bin/ls -l", 3 * 3600, -1, -1, -1 },  -- daily task, every day at 03:00
      { 3, "/bin/ls -l", -1, -1, -1, -1 },        -- cancel the daily task
      { 2, "/bin/ls -a", -1, 2, 3, 4 },           -- start after 2s, every 3s, 4 runs in total
      { 3, "/bin/ls -a", -1, -1, -1, -1 },        -- cancel the interval task
  }

  for _, req in ipairs(requests) do
      local ret = client.send(req[1], req[2], req[3], req[4], req[5], req[6])
      log.warn("%s", ret)
  end
在两个不同的终端分别执行test_server.lua和test_client.lua就行了,可看到通信过程的打印和返回的数据。这个server.lua只能使用io.popen()执行命令,并不能执行用户定义的函数。没有实现自己注册命令的功能,这里做得比较失败了,有时间可以再改改

4.注意事项
1) 首先用到了lua的cjson库,可以直接到官网下载,使用和下载地址:
2) 其次是luasocket库,使用和下载地址:
    这里主要要注意socket.receive()函数,这个函数接收数据的方式有多种,一定要小心,主要是它的第二个函数参数。
     '*a' : 设置这个参数之后,socket被closed之后才会返回,如果设置了阻塞方式,那么在socket关闭之前,这里将一直阻塞着,不会返回任何东西。
     '*l' : 设置这个参数之后,当收到数据中的字符有回车换行时才会返回,且回车换行在返回的数据中会被抹掉,如果收到的数据中没有回车换行,将永远不会返回。
     number : 这个表示缓冲中大于等于number个字符时,函数才返回,不然会一直阻塞。
    只要上边三种情况导致阻塞,就只能在socket关闭的情况下才能返回。这时候缓冲中剩余的数据也会全部返回,并存放在返回的第三个参数partial中,而返回的第一个参数为nil。解决阻塞的办法就是在调用receive函数之前调用socket:settimeout()函数,设置阻塞的时间。超时过后,receive函数就会返回,这时如果缓冲中有数据,那么会一起返回,在partial中。
3) 关于用到的log函数则是上一篇博客介绍的打印module。    
阅读(16723) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~