只有偏执狂才能生存
分类: LINUX
2015-08-10 10:50:44
这篇文章来看下docker中的apiserver的实现。
我们从api := apiserver.New(serverConfig)这个代码开始看。来看下New的实现是什么,这里serverConfig结构体的内容为:
1
2
3
4
5
6
7
|
serverConfig := &apiserver.ServerConfig{
Logging: true,
EnableCors: daemonCfg.EnableCors,
CorsHeaders: daemonCfg.CorsHeaders,
Version: dockerversion.VERSION,
}
serverConfig = setPlatformServerConfig(serverConfig, daemonCfg)
|
New的代码为:
1
2
3
4
5
6
7
8
9
|
func New(cfg *ServerConfig) *Server {
srv := &Server{
cfg: cfg,
start: make(chan struct{}),
}
r := createRouter(srv)
srv.router = r
returnsrv
}
|
这里建立了一个server结构体:
1
2
3
4
5
6
7
|
type Server struct{
daemon *daemon.Daemon
cfg *ServerConfig
router *mux.Router
start chan struct{}
servers []serverCloser
}
|
同时这个server结构体的router被赋值了一个router结构体,这里的router是什么呢?
熟悉OpenStack中使用频率特别高的routes组件的同学应该能猜到了,这里的router就是决定了一个URL最终映射到什么处理函数的一个路由。我们可以看到createRoute的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
// we keep enableCors just for legacy usage, need to be removed in the future
func createRouter(s *Server) *mux.Router {
r := mux.NewRouter()
ifos.Getenv("DEBUG") != ""{
ProfilerSetup(r, "/debug/")
}
m := map[string]map[string]HttpApiFunc{
"GET": {
"/_ping": s.ping,
......
},
"POST": {
"/auth": s.postAuth,
......
},
"DELETE": {
"/containers/{name:.*}": s.deleteContainers,
......
},
"OPTIONS": {
"": s.optionsHandler,
},
}
// If "api-cors-header" is not given, but "api-enable-cors" is true, we set cors to "*"
// otherwise, all head values will be passed to HTTP handler
corsHeaders := s.cfg.CorsHeaders
ifcorsHeaders == ""&& s.cfg.EnableCors {
corsHeaders = "*"
}
formethod, routes := range m {
forroute, fct := range routes {
logrus.Debugf("Registering %s, %s", method, route)
// NOTE: scope issue, make sure the variables are local and won't be changed
localRoute := route
localFct := fct
localMethod := method
// build the handler function
f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))
// add the new route
iflocalRoute == ""{
r.Methods(localMethod).HandlerFunc(f)
} else{
r.Path("/v{version:[0-9.]+}"+ localRoute).Methods(localMethod).HandlerFunc(f)
r.Path(localRoute).Methods(localMethod).HandlerFunc(f)
}
}
}
returnr
}
|
上面的代码首先建立了一个mux.NewRouter()结构体r,接着我们可以看到一个m的map,其中存放了HTTP请求类型、URL地址以及实际的处理函数的映射关系。然后会将这个map注入到r中,来看下面的这段代码:
1
2
3
4
5
6
7
8
9
10
|
// build the handler function
f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))
// add the new route
iflocalRoute == ""{
r.Methods(localMethod).HandlerFunc(f)
} else{
r.Path("/v{version:[0-9.]+}"+ localRoute).Methods(localMethod).HandlerFunc(f)
r.Path(localRoute).Methods(localMethod).HandlerFunc(f)
}
|
makeHttpHandler实质上是对localFct做了一个wrap,也就是说当apiserver收到一个HTTP请求的时候,makeHttpHandler生成的func会先对这个HTTP请求做一些检查,比如client和server的版本号是否ok等等,然后才会调用localFct:
1
2
3
4
|
iferr := handlerFunc(version, w, r, mux.Vars(r)); err != nil {
logrus.Errorf("Handler for %s %s returned error: %s", localMethod, localRoute, err)
httpError(w, err)
}
|
接着代码会判断localRoute是否为空字符串,如果是的话就不绑定带版本号的URL了。至于r.Path(localRoute).Methods(localMethod).HandlerFunc(f)做的就是在r中存放这个信息。熟悉Python中routes或者是熟悉tornado的同学应该很容易理解这个过程。之后在看某个方法的具体实现的时候可以来这里找到URL的mapping,然后查看对应函数的实现。
现在我们有了一个server,看下这个server启动时发生了什么。在代码中可以看到:
1
2
3
4
5
6
7
8
|
go func() {
iferr := api.ServeApi(flHosts); err != nil {
logrus.Errorf("ServeAPI error: %v", err)
serveAPIWait <- err
return
}
serveAPIWait <- nil
}()
|
这里应该是启动我们的server了,看下其实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// ServeApi loops through all of the protocols sent in to docker and spawns
// off a go routine to setup a serving http.Server for each.
func (s *Server) ServeApi(protoAddrs []string) error {
var chErrors = make(chan error, len(protoAddrs))
for_, protoAddr := range protoAddrs {
protoAddrParts := strings.SplitN(protoAddr, "://", 2)
iflen(protoAddrParts) != 2 {
returnfmt.Errorf("bad format, expected PROTO://ADDR")
}
srv, err := s.newServer(protoAddrParts[0], protoAddrParts[1])
iferr != nil {
returnerr
}
s.servers = append(s.servers, srv...)
for_, s := range srv {
logrus.Infof("Listening for HTTP on %s (%s)", protoAddrParts[0], protoAddrParts[1])
go func(s serverCloser) {
iferr := s.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
err = nil
}
chErrors <- err
}(s)
}
}
fori := 0; i < len(protoAddrs); i++ {
err := <-chErrors
iferr != nil {
returnerr
}
}
returnnil
}
|
首先根据监听的地址分别调用s.newServer(protoAddrParts[0], protoAddrParts[1])创建一个干活的server,然后后者调用Serve开始服务。s.newServer会根据监听的协议类型建立套接字,然后启动一个HttpServer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
......
switchproto {
......
case"tcp":
l, err := s.initTcpSocket(addr)
iferr != nil {
returnnil, err
}
ls = append(ls, l)
default:
returnnil, fmt.Errorf("Invalid protocol format: %q", proto)
}
var res []serverCloser
for_, l := range ls {
res = append(res, &HttpServer{
&http.Server{
Addr: addr,
Handler: s.router,
},
l,
})
}
returnres, nil
}
|
这里HttpServer的结构体为:
1
2
3
4
|
type HttpServer struct{
srv *http.Server
l net.Listener
}
|
而http.Server则是go的原生server。
稍微来理一下,我们现在有下面的结构:
* 一个api,这个api是docker实现的server,代表了提供HTTP服务的一个逻辑上的server
* api下面有一个servers数组,每个数组都是实际提供服务的server,之所以有这么多server是因为可能存在多种监听方式或者是监听的地址
* api.servers中的server的最终实现是通过go的http.Server实现的
再来接着看主线代码:
1
2
3
4
5
6
7
8
9
10
11
12
|
registryService := registry.NewService(registryCfg)
d, err := daemon.NewDaemon(daemonCfg, registryService)
iferr != nil {
ifpfile != nil {
iferr := pfile.Remove(); err != nil {
logrus.Error(err)
}
}
logrus.Fatalf("Error starting daemon: %v", err)
}
logrus.Info("Daemon has completed initialization")
|
这里我们看到了一个registryService以及一个daemon.NewDaemon的函数,registryService目前只是一个保存了配置信息的结构体:
1
2
3
4
5
6
7
8
9
10
11
|
// NewService returns a new instance of Service ready to be
// installed no an engine.
func NewService(options *Options) *Service {
return&Service{
Config: NewServiceConfig(options),
}
}
......
type Service struct{
Config *ServiceConfig
}
|
daemon.NewDaemon比较复杂,我们最后再看。先来继续看api的剩余代码:
1
2
3
|
// after the daemon is done setting up we can tell the api to start
// accepting connections with specified daemon
api.AcceptConnections(d)
|
1
2
3
4
5
6
7
8
9
10
11
12
|
func (s *Server) AcceptConnections(d *daemon.Daemon) {
// Tell the init daemon we are accepting requests
s.daemon = d
s.registerSubRouter()
go systemd.SdNotify("READY=1")
// close the lock so the listeners start accepting connections
select {
case<-s.start:
default:
close(s.start)
}
}
|
这里通过channel释放了start的锁。start的锁是在等待呢?上面提到的套接字建立的代码中可以看到类似如下代码:
1
2
3
4
5
|
case"tcp":
l, err := s.initTcpSocket(addr)
......
case"unix":
l, err := sockets.NewUnixSocket(addr, s.cfg.SocketGroup, s.start)
|
initTcpSocket中也会传递s.start。
apiserver基本上就是这么多东西,其实就是一个带了route的http server。接下来有两大块地方要分析,一个是我们的daemon,还有一个就是请求的真正实现。我们先通过一个简单的请求的实现代码来看下daemon的作用:
1
|
"/containers/ps": s.getContainersJSON,
|
s.getContainersJSON的实现为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func (s *Server) getContainersJSON(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
iferr := parseForm(r); err != nil {
returnerr
}
config := &daemon.ContainersConfig{
All: boolValue(r, "all"),
Size: boolValue(r, "size"),
Since: r.Form.Get("since"),
Before: r.Form.Get("before"),
Filters: r.Form.Get("filters"),
}
iftmpLimit := r.Form.Get("limit"); tmpLimit != ""{
limit, err := strconv.Atoi(tmpLimit)
iferr != nil {
returnerr
}
config.Limit = limit
}
containers, err := s.daemon.Containers(config)
iferr != nil {
returnerr
}
returnwriteJSON(w, http.StatusOK, containers)
}
|
可以看到实际上这个请求是由s.daemon完成的。我们上面看到了api其实就是一个路由请求的框架,会把URL路由的s的相关函数上。但是这些函数其实还是空架子,真正干活的是s.daemon。现在对daemon的地位及功能有了大概的认识后,我们接下来的文章会分析daemon的实现以及一些重要请求处理函数的实现。