Let's start with the diagram. A request originates from nova-api, goes to nova-conductor, and then on to the scheduler for placement. Once the scheduler picks a host, an RPC request is sent to that host to run the instance-creation routine. Along the way the compute node contacts glance to fetch the image and build the disk files, and contacts neutron for network information; finally it calls libvirt to spawn the VM. Below we walk through each step in the source code.
nova-api
Here we analyze VM creation from the Nova side. An HTTP request carrying the parameters reaches nova-api.
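For reference, the request is a POST against the servers API. Judging from the create method shown next, this (modified) deployment expects a 'servers' key in the body; a minimal sketch with made-up names and IDs:

    body = {
        "servers": [{
            "name": "demo-vm",                                    # instance name
            "imageRef": "70a599e0-31e7-49b7-b260-868f441e862b",   # glance image UUID
            "flavorRef": "2",                                     # flavor id
            "networks": [{"uuid": "ff608d40-75e9-48cb-b745-77bb55b5eaf2"}],
        }]
    }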
nova/api/openstack/compute/servers.py
    def create(self, req, body):
        if body and 'servers' in body:
            context = req.environ['nova.context']
            servers = body['servers']
            return self.create_servers(context, req, servers)
Following create_servers down, we reach the _create method, which extracts the parameters (instance name, flavor, and so on), performs some basic validation, and calls compute_api's create method:
    def _create(self, context, body, password):
        return self.compute_api.create(context, ...
Here compute_api refers to nova.compute.api.API.
nova/compute/api.py
    def create(self, context, instance_type,
               image_href, kernel_id=None, ramdisk_id=None,
               min_count=None, max_count=None,
               display_name=None, display_description=None,
               key_name=None, key_data=None, security_group=None,
               availability_zone=None, user_data=None, metadata=None,
               injected_files=None, admin_password=None,
               block_device_mapping=None, access_ip_v4=None,
               access_ip_v6=None, requested_networks=None, config_drive=None,
               auto_disk_config=None, scheduler_hints=None, legacy_bdm=True):
        ...
        return self._create_instance(
            context, instance_type,
            image_href, kernel_id, ramdisk_id,
            min_count, max_count,
            display_name, display_description,
            key_name, key_data, security_group,
            availability_zone, user_data, metadata,
            injected_files, admin_password,
            access_ip_v4, access_ip_v6,
            requested_networks, config_drive,
            block_device_mapping, auto_disk_config,
            scheduler_hints=scheduler_hints,
            legacy_bdm=legacy_bdm)
Following on into _create_instance: it runs a series of parameter validations and packaging steps, inserts the instance record into the database, and then issues the RPC request:
    def _create_instance(self, context, instance_type,
                         image_href, kernel_id, ramdisk_id,
                         min_count, max_count,
                         display_name, display_description,
                         key_name, key_data, security_groups,
                         availability_zone, user_data, metadata,
                         injected_files, admin_password,
                         access_ip_v4, access_ip_v6,
                         requested_networks, config_drive,
                         block_device_mapping, auto_disk_config,
                         reservation_id=None, scheduler_hints=None,
                         legacy_bdm=True):
        ...
        for instance in instances:
            self._record_action_start(context, instance,
                                      instance_actions.CREATE)

        self.compute_task_api.build_instances(...)
This request lands in nova/conductor/rpcapi.py:
    def build_instances(self, context, instances, image, filter_properties,
                        admin_password, injected_files, requested_networks,
                        security_groups, block_device_mapping, legacy_bdm=True):
        image_p = jsonutils.to_primitive(image)
        cctxt = self.client.prepare(version='1.5')
        cctxt.cast(context, 'build_instances',
                   instances=instances, image=image_p,
                   filter_properties=filter_properties,
                   admin_password=admin_password,
                   injected_files=injected_files,
                   requested_networks=requested_networks,
                   security_groups=security_groups,
                   block_device_mapping=block_device_mapping,
                   legacy_bdm=legacy_bdm)
At this point an RPC request has been sent; our deployment uses zmq point-to-point messaging, so it goes straight to the conductor node. To see how the client behind cctxt.cast is set up, look at the __init__ method in the same nova/conductor/rpcapi.py:
    def __init__(self):
        super(ComputeTaskAPI, self).__init__()
        target = messaging.Target(topic=CONF.conductor.topic,
                                  namespace='compute_task',
                                  version='1.0')
        serializer = objects_base.NovaObjectSerializer()
        self.client = rpc.get_client(target, serializer=serializer)
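rpc.get_client here wraps oslo.messaging. Conceptually, the cast issued above boils down to something like the following sketch (package and helper names vary slightly across releases; cast is fire-and-forget, unlike call, which blocks for a return value):

    import oslo_messaging as messaging
    from oslo_config import cfg

    transport = messaging.get_transport(cfg.CONF)   # driver comes from config, e.g. zmq
    target = messaging.Target(topic='conductor',
                              namespace='compute_task', version='1.0')
    client = messaging.RPCClient(transport, target)

    ctxt = {}  # request context, serialized alongside the message
    cctxt = client.prepare(version='1.5')
    cctxt.cast(ctxt, 'build_instances', instances=[])  # no reply expected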
nova-conductor
We arrive at the build_instances method in nova/conductor/manager.py:
    def build_instances(self, context, instances, image, filter_properties,
                        admin_password, injected_files, requested_networks,
                        security_groups, block_device_mapping, legacy_bdm=True):
        ...
        self.scheduler_rpcapi.new_run_instance(context,
            request_spec=request_spec, admin_password=admin_password,
            injected_files=injected_files,
            requested_networks=requested_networks, is_first_time=True,
            filter_properties=filter_properties,
            legacy_bdm_in_spec=legacy_bdm)
We modified this part to call new_run_instance directly. Stepping in, look at nova/scheduler/rpcapi.py:
    def new_run_instance(self, ctxt, request_spec, admin_password,
                         injected_files, requested_networks, is_first_time,
                         filter_properties, legacy_bdm_in_spec=True):
        msg_kwargs = {'request_spec': request_spec,
                      'admin_password': admin_password,
                      'injected_files': injected_files,
                      'requested_networks': requested_networks,
                      'is_first_time': is_first_time,
                      'filter_properties': filter_properties,
                      'legacy_bdm_in_spec': legacy_bdm_in_spec}
        cctxt = self.client.prepare()
        cctxt.cast(ctxt, 'new_run_instance', **msg_kwargs)
At this point a zmq request has been sent to the scheduler; for the details of how it is sent, this class's __init__ method tells the whole story.
nova-scheduler
The call arrives in nova/scheduler/manager.py; look at the new_run_instance method of the SchedulerManager class:
    def new_run_instance(self, context, request_spec, admin_password,
                         injected_files, requested_networks, is_first_time,
                         filter_properties, legacy_bdm_in_spec=True):
        ...
        return self.driver.new_schedule_run_instance(context,
            request_spec, admin_password, injected_files,
            requested_networks, is_first_time, filter_properties,
            legacy_bdm_in_spec)
This uses a driver: the scheduler driver you configured, which decides placement by filtering hosts on resources such as memory, CPU, or disk. The driver we chose is nova.scheduler.filter_scheduler.FilterScheduler. Let's step into it and look at the new_schedule_run_instance method, in nova/scheduler/filter_scheduler.py, class FilterScheduler:
    def new_schedule_run_instance(self, context, request_spec,
                                  admin_password, injected_files,
                                  requested_networks, is_first_time,
                                  filter_properties, legacy_bdm_in_spec):
        ...
        try:
            self._new_schedule_run_instance(context, request_spec,
                                            admin_password, injected_files,
                                            requested_networks, is_first_time,
                                            filter_properties, legacy_bdm_in_spec)
        ...
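As for how the driver gets picked: stock nova-scheduler of this vintage resolves the class named by the scheduler_driver option via oslo's import utilities. A rough sketch (the exact option and helper module differ between releases):

    from oslo_utils import importutils

    # nova.conf: scheduler_driver = nova.scheduler.filter_scheduler.FilterScheduler
    driver = importutils.import_object(
        'nova.scheduler.filter_scheduler.FilterScheduler')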
A word about host_queue: it is rebuilt by a periodic task, every 10 seconds by default, in nova/scheduler/manager.py:
    @periodic_task.periodic_task(spacing=CONF.new_scheduler_build_queue_period,
                                 run_immediately=True)
    def build_queue(self, context):
        current = host_queue.QueueManager()
        current.init_host_queue(context)
        current.build_queue()
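The spacing value comes from the custom new_scheduler_build_queue_period option (10 seconds by default, per the above). Declaring such an option follows the usual oslo.config pattern; a sketch, not the exact stanza from this codebase:

    from oslo_config import cfg

    opts = [
        cfg.IntOpt('new_scheduler_build_queue_period',
                   default=10,
                   help='Seconds between host queue rebuilds'),
    ]
    cfg.CONF.register_opts(opts)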
Now look at the actual implementation of build_queue, in nova/scheduler/host_queue.py:
    def build_queue(self):
        ...
        # Read the compute nodes from the database
        self.compute_nodes = db.compute_node_get_all(elevated)
        for compute in self.compute_nodes:
            # Fetch the extra_resources info
            extra_resources = compute.get('extra_resources')
            # Fetch the hostname
            hostname = compute.get('hypervisor_hostname')
            # Fetch the queue name; the default is "kvm"
            queue_name = extra_resources.get('queue_name')
            new_queue = []
            if not queue_name:
                queue_name = CONF.default_queue
            ...
            # Skip hosts whose compute service is disabled
            if service['disabled']:
                LOG.warn("Compute service disabled %s", hostname)
                continue
            ...
            # Fetch the disk, CPU and RAM overcommit ratios. Each compute node
            # reports these values from its configuration into the database via
            # a periodic task, namely the resource_tracker.
            disk_allocation_ratio = extra_resources.get('disk_allocation_ratio', 1.0)
            cpu_allocation_ratio = extra_resources.get('cpu_allocation_ratio', 1.0)
            ram_allocation_ratio = extra_resources.get('ram_allocation_ratio', 1.0)
            ...
            # CPU totals, usage and free amount
            vcpus = compute['vcpus'] * cpu_allocation_ratio
            vcpus_used = compute['vcpus_used']
            free_vcpus = vcpus - compute['vcpus_used']
            limits['vcpu'] = vcpus

            local_gb = compute['local_gb'] * disk_allocation_ratio
            free_local_gb = local_gb - \
                (compute['local_gb'] - compute['free_disk_gb'])
            limits['disk_gb'] = local_gb

            # memory_mb
            memory_mb = compute['memory_mb'] * ram_allocation_ratio
            free_memory_mb = memory_mb - \
                (compute['memory_mb'] - compute['free_ram_mb'])
            limits['memory_mb'] = memory_mb
            ...
            # Build the per-host object and store it in QueueManager.host_info
            QueueManager.host_info[hostname] = BaseQueue(
                hostname=hostname,
                vcpus=vcpus, vcpus_used=vcpus_used, free_vcpus=free_vcpus,
                memory_mb=memory_mb,
                free_memory_mb=free_memory_mb, local_gb=local_gb,
                free_local_gb=free_local_gb, net_bandwidth=net_bandwidth,
                net_bandwidth_used=net_bandwidth_used,
                free_net_bandwidth=free_net_bandwidth,
                disk_bandwidth=disk_bandwidth,
                disk_bandwidth_used=disk_bandwidth_used,
                free_disk_bandwidth=free_disk_bandwidth,
                multi_disk_info=multi_disk_info,
                updated_at=updated_at, queue_name=queue_name,
                limits=limits)
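To make the overcommit arithmetic concrete, a small worked example with made-up numbers:

    # 16 physical cores, 4x CPU overcommit, 20 vCPUs already handed out
    cpu_allocation_ratio = 4.0
    vcpus = 16 * cpu_allocation_ratio      # 64 schedulable vCPUs
    free_vcpus = vcpus - 20                # 44 still available

    # Disk: free space is the overcommitted total minus what is already
    # consumed, where consumption is derived from the hypervisor's report.
    local_gb, free_disk_gb, disk_allocation_ratio = 1000, 400, 1.0
    free_local_gb = local_gb * disk_allocation_ratio - (local_gb - free_disk_gb)  # 400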
Back to the scheduling path: now that host_queue is in place, let's keep going. nova/scheduler/filter_scheduler.py:
    def _new_schedule_run_instance(self, context, request_spec,
                                   admin_password, injected_files,
                                   requested_networks, is_first_time,
                                   filter_properties, legacy_bdm_in_spec):
        # Extract the parameters
        ...
        # If scheduler_host was passed in, schedule the instance straight
        # onto that physical host.
        if scheduler_host:
            self.schedule_instance_to_assigned_host(context, request_spec,
                admin_password, injected_files,
                requested_networks, is_first_time,
                filter_properties, legacy_bdm_in_spec,
                scheduler_host, disk_shares,
                instance_uuids, scheduler_hints)
            return
        ...
        # The default queue_name is "kvm". Fetch the hosts under that queue
        # name; the queue was built when host_queue was initialized.
        host_queue = self.get_host_queue(queue_name)

        # If set, this is a regex match: keep only the hosts whose names
        # contain the scheduler_host_match value.
        if scheduler_host_match:
            host_queue = self._get_matched_host_queue(host_queue, scheduler_host_match)
            LOG.debug("matched host queue (%s): %s length is: %d", scheduler_host_match,
                      queue_name, len(host_queue))
        ...
        # requested_disk is the size of the VM's root partition, plus the
        # ephemeral (user) partition, plus the swap space; it is used in the
        # resource comparison later on.
        req_res['requested_disk'] = 1024 * (instance_type['root_gb'] +
                                            instance_type['ephemeral_gb']) + \
                                    instance_type['swap']
        # This call performs the actual scheduling and returns the host that
        # matches the requested parameters; it is covered in the next method.
        host = self._new_schedule(context, host_queue,
                                  req_res, request_spec,
                                  copy_filter_properties,
                                  instance_uuid, retry,
                                  different_host_flag,
                                  different_host, disk_shares,
                                  try_different_host, sign, boundary_host)

        # Now that a host has been picked, send a point-to-point request to
        # that host to create the virtual machine.
        self.pool.spawn(self.compute_rpcapi.new_run_instance,
                        context, instance_uuid, host.hostname,
                        request_spec, copy_filter_properties,
                        requested_networks, injected_files,
                        admin_password, is_first_time,
                        host.hostname, legacy_bdm_in_spec, self._disk_info)
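To make the requested_disk formula concrete: for a hypothetical flavor with root_gb=20, ephemeral_gb=10 and swap=512 (flavor swap is already in MB), the request works out to 1024 * (20 + 10) + 512 = 31232 MB:

    instance_type = {'root_gb': 20, 'ephemeral_gb': 10, 'swap': 512}
    requested_disk = 1024 * (instance_type['root_gb'] +
                             instance_type['ephemeral_gb']) + instance_type['swap']
    assert requested_disk == 31232  # MB, compared against free disk below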
Let's continue with _new_schedule, still in the same file:
    def _new_schedule(self, context, host_queue, req_res,
                      request_spec, filter_properties,
                      instance_uuid, retry=None,
                      different_host_flag=None,
                      different_host=None,
                      disk_shares=None,
                      try_different_host=None,
                      sign=1,
                      boundary_host=None):
        ...
        # If different_host is set to true, the instances must be scheduled
        # onto different physical hosts. This is implemented by
        # check_host_different_from_uuids: each selected host is recorded in
        # a list, and the next candidate host is checked against that list.
        if different_host:
            LOG.debug('instance %s different_host: %s', instance_uuid,
                      different_host)
            if not self.check_host_different_from_uuids(context,
                    instance_uuid, host, different_host):
                self._find_pos = self._find_pos + sign * 1
                continue
        # Check whether the host has enough resources
        resource_check = self.check_host_resource(context,
                                                  host=host,
                                                  req_res=req_res,
                                                  disk_shares=disk_shares)
        # If it passes, return the host
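The different_host bookkeeping described above can be pictured like this (an illustrative sketch; the real check_host_different_from_uuids takes a context and instance UUIDs, this is just the core idea):

    def host_differs(candidate_host, anti_affinity_uuids, uuid_to_host):
        """Return True if candidate_host does not already run any of the
        instances we must be kept apart from."""
        used_hosts = {uuid_to_host[u] for u in anti_affinity_uuids
                      if u in uuid_to_host}
        return candidate_host not in used_hosts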
Going a level deeper, here is what check_host_resource does (still in the same file):
    def check_host_resource(self, context, host, req_res,
                            disk_shares=0):
        ...
        # Check whether the requested disk space exceeds the free disk on the
        # physical host; if it does, return False to fail the check.
        usable_disk_mb = host.free_local_gb * 1024
        if not usable_disk_mb >= req_res['requested_disk']:
            return False

        # Check memory
        if req_res['requested_ram'] > 0:
            usable_ram = host.free_memory_mb
            if not usable_ram >= req_res['requested_ram']:
                return False

        # Check vcpus
        if req_res['requested_vcpus'] > 0:
            if host.free_vcpus < req_res['requested_vcpus']:
                return False
        return True
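Putting the pieces together: with a host entry from the queue and the req_res computed earlier, the check reduces to three comparisons (hypothetical numbers):

    class Host(object):  # stand-in for a BaseQueue entry
        free_local_gb = 100      # GB
        free_memory_mb = 8192    # MB
        free_vcpus = 4

    req_res = {'requested_disk': 31232,   # MB, from the flavor example above
               'requested_ram': 4096,     # MB
               'requested_vcpus': 2}

    fits = (Host.free_local_gb * 1024 >= req_res['requested_disk'] and
            Host.free_memory_mb >= req_res['requested_ram'] and
            Host.free_vcpus >= req_res['requested_vcpus'])
    # 102400 >= 31232, 8192 >= 4096, 4 >= 2  ->  fits is True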
nova-compute
The RPC call reaches the chosen host, which executes its new_run_instance method:
    def new_run_instance(self, context, instance_uuid, request_spec,
                         filter_properties, requested_networks,
                         injected_files, admin_password,
                         is_first_time, node, legacy_bdm_in_spec,
                         disk_info=None):

        # Update the instance's state in the database and, at the same time,
        # record the resource usage.
        if disk_info:
            instance = self._instance_update(
                context, instance_uuid,
                disk_shares=disk_info['disk_shares'],
                selected_dir=disk_info['selected_dir'])
        else:
            instance = self._instance_update(context,
                                             instance_uuid)

        self.run_instance(context, instance, request_spec,
                          filter_properties, requested_networks, injected_files,
                          admin_password, is_first_time, node,
                          legacy_bdm_in_spec)
Continuing down through run_instance, we arrive at _run_instance:
    def _run_instance(self, context, request_spec,
                      filter_properties, requested_networks, injected_files,
                      admin_password, is_first_time, node, instance,
                      legacy_bdm_in_spec):

        # First check that the instance name exists, then update the database
        # record to the "building" state.
        self._prebuild_instance(context, instance)

        ...
        instance, network_info = self._build_instance(context,
            request_spec, filter_properties, requested_networks,
            injected_files, admin_password, is_first_time, node,
            instance, image_meta, legacy_bdm_in_spec)
Inside this method, nova obtains the MAC and IP information from the neutron service (we won't go into the details here); straight to the code:
    def _build_instance(self, context, request_spec, filter_properties,
            requested_networks, injected_files, admin_password, is_first_time,
            node, instance, image_meta, legacy_bdm_in_spec):
        ...
        # Query how many block devices are attached to this instance
        bdms = block_device_obj.BlockDeviceMappingList.get_by_instance_uuid(
            context, instance['uuid'])
        ...
        # Claim the resource usage: CPU, memory and disk
        with rt.instance_claim(context, instance, limits):
            ...
        # neutron allocates the MAC and IP for the VM
        network_info = self._allocate_network(context, instance,
            requested_networks, macs, security_groups,
            dhcp_options)

        instance = self._spawn(context, instance, image_meta,
                               network_info, block_device_info,
                               injected_files, admin_password,
                               set_access_ip=set_access_ip)
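Behind _allocate_network, Nova's network API ultimately asks neutron to create a port for the instance. With python-neutronclient the equivalent call looks roughly like this (a sketch; credentials and IDs are placeholders):

    from neutronclient.v2_0 import client as neutron_client

    neutron = neutron_client.Client(username='nova', password='secret',
                                    tenant_name='service',
                                    auth_url='http://keystone:5000/v2.0')
    port = neutron.create_port({'port': {'network_id': 'NET_UUID',
                                         'device_id': 'INSTANCE_UUID'}})
    # The returned port carries the MAC and fixed IPs that feed network_info.
    print(port['port']['mac_address'], port['port']['fixed_ips'])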
The spawn method is fairly low-level; it deals with the image and actually creates the VM:
    def spawn(self, context, instance, image_meta, injected_files,
              admin_password, network_info=None, block_device_info=None):
        disk_info = blockinfo.get_disk_info(CONF.libvirt.virt_type,
                                            instance,
                                            block_device_info,
                                            image_meta)
        # The file contents to inject are eventually written into
        # injected_files as strings.
        if CONF.libvirt.inject_nifcfg_file:
            self._mk_inject_files(image_meta, network_info, injected_files)

        # Create the disk image files (disk, disk.local, etc.)
        self._create_image(context, instance,
                           disk_info['mapping'],
                           network_info=network_info,
                           block_device_info=block_device_info,
                           files=injected_files,
                           admin_pass=admin_password)
        # Generate the libvirt.xml file
        xml = self.to_xml(context, instance, network_info,
                          disk_info, image_meta,
                          block_device_info=block_device_info,
                          write_to_disk=True)

        # Create the actual VM instance, i.e. the domain
        self._create_domain_and_network(context, xml, instance, network_info,
                                        block_device_info)

        LOG.debug(_("Instance is running"), instance=instance)

        # Poll the state until it is OK, then return
        def _wait_for_boot():
            """Called at an interval until the VM is running."""
            state = self.get_info(instance)['state']

            if state == power_state.RUNNING:
                LOG.info(_("Instance spawned successfully."),
                         instance=instance)
                raise loopingcall.LoopingCallDone()

        timer = loopingcall.FixedIntervalLoopingCall(_wait_for_boot)
        timer.start(interval=0.5).wait()
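For completeness: _create_domain_and_network ultimately hands the generated XML to libvirt. With the libvirt Python bindings, the core of that step is roughly (a sketch; the connection URI is an assumption):

    import libvirt

    xml = open('libvirt.xml').read()   # the document produced by to_xml() above
    conn = libvirt.open('qemu:///system')
    domain = conn.defineXML(xml)       # make the domain persistent
    domain.create()                    # boot it; get_info() then reports RUNNING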
That completes the whole flow. Many fine-grained details were skipped; they will be covered in other source-analysis chapters.