每一个大型的项目,都会必须要设计log,log是重要的调试手段,也是很好的学习入口。跟踪log可以让一个新手快速的理解代码,分析log可以帮助工程师很好的定位问题。
下面通过跟踪ceph-mon这个可执行文件,了解ceph中的log实现。
ceph_mon 初始化中 会调用global_init, global_init一开始就会调用 common_preinit函数
创建一个重要的数据结构CephContext cct。
-
CephContext::CephContext(uint32_t module_type_)
-
: nref(1),
-
_conf(new md_config_t()),
-
_log(NULL),
-
_module_type(module_type_),
-
_service_thread(NULL),
-
_log_obs(NULL),
-
_admin_socket(NULL),
-
_perf_counters_collection(NULL),
-
_perf_counters_conf_obs(NULL),
-
_heartbeat_map(NULL),
-
_crypto_none(NULL),
-
_crypto_aes(NULL)
-
{
-
pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);
-
-
_log = new ceph::log::Log(&_conf->subsys);
-
_log->start();
-
-
_log_obs = new LogObs(_log);
-
_conf->add_observer(_log_obs);
-
-
_perf_counters_collection = new PerfCountersCollection(this);
-
_admin_socket = new AdminSocket(this);
-
_heartbeat_map = new HeartbeatMap(this);
-
-
_admin_hook = new CephContextHook(this);
-
_admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
-
_admin_socket->register_command("1", "1", _admin_hook, "");
-
_admin_socket->register_command("perf dump", "perf dump", _admin_hook, "dump perfcounters value");
-
_admin_socket->register_command("perfcounters_schema", "perfcounters_schema", _admin_hook, "");
-
_admin_socket->register_command("2", "2", _admin_hook, "");
-
_admin_socket->register_command("perf schema", "perf schema", _admin_hook, "dump perfcounters schema");
-
_admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");
-
_admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set [ ...]: set a config variable");
-
_admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get : get the config value");
-
_admin_socket->register_command("log flush", "log flush", _admin_hook, "flush log entries to log file");
-
_admin_socket->register_command("log dump", "log dump", _admin_hook, "dump recent log entries to log file");
-
_admin_socket->register_command("log reopen", "log reopen", _admin_hook, "reopen log file");
-
-
_crypto_none = new CryptoNone;
-
_crypto_aes = new CryptoAES;
-
}
这个函数并不长,但是绝不简单,本文就是介绍Log线程
-
_log = new ceph::log::Log(&_conf->subsys);
-
_log->start();
第一句 new 是创建了Log实例:基本就是初始化
1 初始化自旋锁 m_lock
2 初始化互斥量 m_flush_mutex 和m_queue_mutex
3 初始化条件变量 m_con_logger 和m_cond_flusher
-
Log::Log(SubsystemMap *s)
-
: m_indirect_this(NULL),
-
m_subs(s),
-
m_new(), m_recent(),
-
m_fd(-1),
-
m_syslog_log(-2), m_syslog_crash(-2),
-
m_stderr_log(1), m_stderr_crash(-1),
-
m_stop(false),
-
m_max_new(DEFAULT_MAX_NEW),
-
m_max_recent(DEFAULT_MAX_RECENT)
-
{
-
int ret;
-
-
ret = pthread_spin_init(&m_lock, PTHREAD_PROCESS_SHARED);
-
assert(ret == 0);
-
-
ret = pthread_mutex_init(&m_flush_mutex, NULL);
-
assert(ret == 0);
-
-
ret = pthread_mutex_init(&m_queue_mutex, NULL);
-
assert(ret == 0);
-
-
ret = pthread_cond_init(&m_cond_loggers, NULL);
-
assert(ret == 0);
-
-
ret = pthread_cond_init(&m_cond_flusher, NULL);
-
assert(ret == 0);
-
-
// kludge for prealloc testing
-
if (false)
-
for (int i=0; i < PREALLOC; i++)
-
m_recent.enqueue(new Entry);
-
}
接下来启动了log线程:
-
void Log::start()
-
{
-
assert(!is_started());
-
pthread_mutex_lock(&m_queue_mutex);
-
m_stop = false;
-
pthread_mutex_unlock(&m_queue_mutex);
-
create();
-
}
关键一句是create,这个create从哪里冒出来的?
这个调用的是 Thread里面的create 方法:
-
int Thread::try_create(size_t stacksize)
-
{
-
pthread_attr_t *thread_attr = NULL;
-
stacksize &= CEPH_PAGE_MASK; // must be multiple of page
-
if (stacksize) {
-
thread_attr = (pthread_attr_t*) malloc(sizeof(pthread_attr_t));
-
if (!thread_attr)
-
return -ENOMEM;
-
pthread_attr_init(thread_attr);
-
pthread_attr_setstacksize(thread_attr, stacksize);
-
}
-
int r;
-
// The child thread will inherit our signal mask. Set our signal mask to
-
// the set of signals we want to block. (It's ok to block signals more
-
// signals than usual for a little while-- they will just be delivered to
-
// another thread or delieverd to this thread later.)
-
sigset_t old_sigset;
-
if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
-
block_signals(NULL, &old_sigset);
-
}
-
else {
-
int to_block[] = { SIGPIPE , 0 };
-
block_signals(to_block, &old_sigset);
-
}
-
r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
-
restore_sigset(&old_sigset);
-
if (thread_attr)
-
free(thread_attr);
-
return r;
-
}
-
void Thread::create(size_t stacksize)
-
{
-
int ret = try_create(stacksize);
-
if (ret != 0) {
-
char buf[256];
-
snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
-
"failed with error %d", ret);
-
dout_emergency(buf);
-
assert(ret == 0);
-
}
-
}
创建线程,该方法做了下面的事情:
1 可以设置线程栈的size
2 对于daemon进程,会创建线程前会阻塞SIGPIPE
3 调用pthread_create函数创建线程
等一下,线程执行什么函数? _entry_func到底是什么?
-
void *Thread::_entry_func(void *arg) {
-
void *r = ((Thread*)arg)->entry();
-
return r;
-
}
也就是说线程执行的函数,记录在pthread_create的第四个变量的里面:
-
namespace ceph {
-
namespace log {
-
class Log : private Thread
-
{
-
Log **m_indirect_this;
-
SubsystemMap *m_subs;
-
-
pthread_spinlock_t m_lock;
-
pthread_mutex_t m_queue_mutex;
-
pthread_mutex_t m_flush_mutex;
-
pthread_cond_t m_cond_loggers;
-
pthread_cond_t m_cond_flusher;
-
EntryQueue m_new; ///< new entries
-
EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail
-
std::string m_log_file;
-
int m_fd;
-
int m_syslog_log, m_syslog_crash;
-
int m_stderr_log, m_stderr_crash;
-
bool m_stop;
-
int m_max_new, m_max_recent;
-
void *entry();
-
void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);
-
void _log_message(const char *s, bool crash);
-
public:
-
Log(SubsystemMap *s);
-
virtual ~Log();
-
void set_flush_on_exit();
-
void set_max_new(int n);
-
void set_max_recent(int n);
-
void set_log_file(std::string fn);
-
void reopen_log_file();
-
void flush();
-
void dump_recent();
-
void set_syslog_level(int log, int crash);
-
void set_stderr_level(int log, int crash);
-
Entry *create_entry(int level, int subsys);
-
void submit_entry(Entry *e);
-
void start();
-
void stop();
-
};
-
}
-
}
即Log的类中,有成员函数 entry,即线程应该执行的函数:
-
void *Log::entry()
-
{
-
pthread_mutex_lock(&m_queue_mutex);
-
while (!m_stop) {
-
if (!m_new.empty()) {
-
pthread_mutex_unlock(&m_queue_mutex);
-
flush();
-
pthread_mutex_lock(&m_queue_mutex);
-
continue;
-
}
-
pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
-
}
-
pthread_mutex_unlock(&m_queue_mutex);
-
flush();
-
return NULL;
-
}
这就是log线程执行的动作。
1 m_stop来控制线程是否终止
2 如果m_new 这个队列不空,就调用flush,负责写入log
3 如果队列空了,条件等待,有新的log出现在队列上,会通知到这个线程
暂且不管如果控制线程,谁来通知log线程有新的log,直接看下线程的主要工作,flush主要工作如下
-
void Log::flush()
-
{
-
pthread_mutex_lock(&m_flush_mutex);
-
pthread_mutex_lock(&m_queue_mutex);
-
EntryQueue t;
-
t.swap(m_new);
-
pthread_cond_broadcast(&m_cond_loggers);
-
pthread_mutex_unlock(&m_queue_mutex);
-
_flush(&t, &m_recent, false);
-
// trim
-
//m_recent有一个最大值,超出了最大值,就从队列中删除最老的log,内存也就释放了
-
while (m_recent.m_len > m_max_recent) {
-
delete m_recent.dequeue();
-
}
-
pthread_mutex_unlock(&m_flush_mutex);
-
}
首先,创建一个临时队列,t,执行swap将m_new里面的log都接了过去。所谓接过去,不过是将指针指向队列内容的事情交给临时队列,m_new 头指针和尾指针置成NULL
做了这件事之后,m_new又变成了空的队列,好心地给其他线程发了广播之后,就可以解锁m_queue_mutex互斥量了。
swap操作比较简单,就是交换指针的彼此指向:
-
void swap(EntryQueue& other) {
-
int len = m_len;
-
struct Entry *h = m_head, *t = m_tail;
-
m_len = other.m_len;
-
m_head = other.m_head;
-
m_tail = other.m_tail;
-
other.m_len = len;
-
other.m_head = h;
-
other.m_tail = t;
-
}
然后将主要的写日志的工作就委托给了__flush函数。
可以看出,有的log需要写入日志文件,有的需要写入syslog,有些日志需要写入stderr
同时,日志还有一个优先级的概念,因此会根据should_log 来控制。
-
void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
-
{
-
Entry *e;
-
char buf[80];
-
while ((e = t->dequeue()) != NULL) {
-
unsigned sub = e->m_subsys;
-
bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;
-
bool do_fd = m_fd >= 0 && should_log;
-
bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
-
bool do_stderr = m_stderr_crash >= e->m_prio && should_log;
-
if (do_fd || do_syslog || do_stderr) {
-
int buflen = 0;
-
if (crash)
-
buflen += snprintf(buf, sizeof(buf), "%6d> ", -t->m_len);
-
buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
-
buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
-
(unsigned long)e->m_thread, e->m_prio);
-
// FIXME: this is slow
-
string s = e->get_str();
-
if (do_fd) {
-
int r = safe_write(m_fd, buf, buflen);
-
if (r >= 0)
-
r = safe_write(m_fd, s.data(), s.size());
-
if (r >= 0)
-
r = write(m_fd, "\n", 1);
-
if (r < 0)
-
cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
-
}
-
if (do_syslog) {
-
syslog(LOG_USER, "%s%s", buf, s.c_str());
-
}
-
if (do_stderr) {
-
cerr << buf << s << std::endl;
-
}
-
}
-
requeue->enqueue(e);
-
}
-
}
接下来是common字段,日志总免不了要记录消息的发生时间。:
-
buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
这个m_stamp是utime_t类型,他的sprintf实现如下:
-
int sprintf(char *out, int outlen) const {
-
struct tm bdt;
-
time_t tt = sec();
-
localtime_r(&tt, &bdt);
-
-
return snprintf(out, outlen,
-
"%04d-%02d-%02d %02d:%02d:%02d.%06ld",
-
bdt.tm_year + 1900, bdt.tm_mon + 1, bdt.tm_mday,
-
bdt.tm_hour, bdt.tm_min, bdt.tm_sec, usec());
-
}
-
因此log的前缀是
-
2015-05-29 10:20:45.076105
接下来的字段是,线程的线程ID以及 消息的优先级,所谓线程ID并不是调度意义上的ID,而是pthread_self的返回值那个ID
-
buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
-
(unsigned long)e->m_thread, e->m_prio);
ceph-mon是多线程的程序:
-
root@test3:/var/log/ceph# pidof ceph--mon
-
root@test3:/var/log/ceph# pidof ceph-mon
-
138812
-
root@test3:/var/log/ceph#
-
root@test3:/var/log/ceph# ll /proc/138812/task
-
total 0
-
dr-xr-xr-x 23 root root 0 May 29 16:17 ./
-
dr-xr-xr-x 9 root root 0 May 29 16:11 ../
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138812/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138813/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138814/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138815/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138816/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138817/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138818/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138819/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138820/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138821/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138822/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138823/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 138824/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 139643/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 139895/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 139896/
-
dr-xr-xr-x 6 root root 0 May 29 16:17 143413/
-
dr-xr-xr-x 6 root root 0 May 30 16:47 78042/
-
dr-xr-xr-x 6 root root 0 May 30 16:47 78044/
-
dr-xr-xr-x 6 root root 0 May 30 17:08 90496/
-
dr-xr-xr-x 6 root root 0 May 30 17:08 90497/
因此,我们可以取来一条log查看下ceph log的前缀:
-
2015-05-29 10:20:45.076105 7fa1c6a1a700 0 mon.jnqfg@2(peon).data_health(30) update_stats avail 92% total 95990980 used 2183384 avail 88908400
值得一提的是,ceph维护了一个m_recent队列,所有的消息都会存放到该队列中去,哪怕优先级比较低,不会打印到日志文件中去。
这就是_flush 函数中的 enqueue做的事情:
当然了,队列是有大小的限制,否则队列就会膨胀,导致内存耗尽。这个默认的限制是10000.
-
while (m_recent.m_len > m_max_recent) {
-
delete m_recent.dequeue();
-
}
ceph提供了方法来查看最近的log,这就是log dump方法。
-
ceph daemon /var/run/ceph/ceph-mon.*asok log dump
-
{}
-
表面看啥也没输出,实际上将log dump到了日志文件中,比如我们例子,在log文件中出现:
-
v2 ==== 42+0+0 (3105867923 0 0) 0x48d6bc0 con 0x3d5c9a0
-
-13> 2015-05-30 17:23:06.772212 7f5856cac700 10 mon.jnqfg@2(peon) e3 handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
-
-12> 2015-05-30 17:23:06.772217 7f5856cac700 10 mon.jnqfg@2(peon) e3 check_sub monmap next 4 have 3
-
-11> 2015-05-30 17:23:06.772222 7f5856cac700 10 mon.jnqfg@2(peon).osd e260 check_sub 0x631bdc0 next 0 (onetime)
-
-10> 2015-05-30 17:23:06.773223 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260 src has 1..260) v3 -- ?+0 0x4230b40
-
-9> 2015-05-30 17:23:06.773239 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663 10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0 0x48d6680
-
-8> 2015-05-30 17:23:06.773586 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663 10.16.20.181:0/1138587 4 ==== mon_subscribe({monmap=4+,osdmap=0}) v2 ==== 42+0+0 (3105867923 0 0) 0x48d5180 con 0x3d5c9a0
-
-7> 2015-05-30 17:23:06.773607 7f5856cac700 10 mon.jnqfg@2(peon) e3 handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
-
-6> 2015-05-30 17:23:06.773613 7f5856cac700 10 mon.jnqfg@2(peon) e3 check_sub monmap next 4 have 3
-
-5> 2015-05-30 17:23:06.773620 7f5856cac700 10 mon.jnqfg@2(peon).osd e260 check_sub 0x631a7c0 next 0 (onetime)
-
-4> 2015-05-30 17:23:06.774626 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260 src has 1..260) v3 -- ?+0 0x4232ac0
-
-3> 2015-05-30 17:23:06.774641 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663 10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0 0x48d6bc0
-
-2> 2015-05-30 17:23:06.775345 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663 10.16.20.181:0/1138587 5 ==== mon_command({"prefix": "get_command_descriptions"} v 0) v1 ==== 80+0+0 (3374501561 0 0) 0x477f0e0 con 0x3d5c9a0
-
-1> 2015-05-30 17:23:06.776789 7f5856cac700 1 -- 10.16.20.183:6789/0 --> 10.16.20.181:0/1138587 -- mon_command_ack([{"prefix": "get_command_descriptions"}]=0 v0) v1 -- ?+24689 0x3e62760 con 0x3d5c9a0
-
0> 2015-05-30 17:23:06.821389 7f5859662700 1 do_command 'log dump'
很有意思的是,ceph如何做到 ceph daemon /var/run/ceph/ceph-mon.*asok log dump 就把log dump到日志文件的。 这就牵扯到admin socket机制了。
admin socket 机制并不是一个非常新的机制,这是一个很commmon的设计。
很多进程,设计的时候,需要设计一个手段,能够运行时,接收用户的指令,不能将程序设计成一个黑匣子,相反,要提供一些手段,使是用户或者可以干预进程的运行(如改变配置项),或者可以获取到进程运行的状态信息。
admin socket机制,那是下一篇的任务。
阅读(1054) | 评论(0) | 转发(0) |