11 June 2014

Ceph is great. It is a purely distributed storage system that runs on commodity hardware. Its advantages over existing storage systems, as I see them, are summarized below:

  • Scales out with a (theoretically) linear performance increase across a large number of nodes.
  • The CRUSH map is highly customizable with respect to object distribution and cluster layout.
  • The Monitor (which uses Paxos internally) makes the cluster autonomous (automatic recovery from node failures, adding/removing nodes, etc.).
  • Converges filesystem storage, object storage and block/volume storage into one system.
  • Ceph is highly available, strongly consistent, linearly scalable and partition tolerant. Forget CAP.
  • It is almost the only open-source, free, large-scale, stable, full-featured and easy-to-use distributed filesystem, object and block storage.
  • Highly (almost natively) integrated with OpenStack (and Kubernetes, and other cloud platforms). The open-source community loves Ceph.
  • Ceph has good papers. Read them and you’ll understand Ceph.

Next I will do a deep dive into the Ceph code. The version is Firefly v0.80 (I don’t remember the exact version, but it should be OK).

Ceph Monitor

Raw deep-dive notes below. I will put them into proper format and language when I have time.

1. The MonMap structure is simple. It is just
	an epoch (an integer)
	the name and IP address of each monitor
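
As a rough illustration of the description above, a monitor map can be pictured as an epoch plus a name-to-address table. The names below (`MonMapSketch`, `mons`, `add`) are hypothetical simplifications and not Ceph's actual `MonMap` class; bumping the epoch inside `add()` is also an assumption made to keep the sketch small.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical, simplified picture of a monitor map: an epoch plus
// the name and address of every monitor. Not Ceph's real MonMap.
struct MonMapSketch {
  uint32_t epoch = 0;                       // version of this map
  std::map<std::string, std::string> mons;  // monitor name -> "ip:port"

  // Assumption for illustration: every change advances the epoch.
  void add(const std::string& name, const std::string& addr) {
    mons[name] = addr;
    ++epoch;
  }

  std::size_t size() const { return mons.size(); }
};
```

A peer that sees a map with a higher epoch than its own knows its copy is stale, which is what the probe handling further down relies on.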

2. Things worth studying
	1. lockdep, used to detect lock cycles
	2. The encode mechanism, which has a dedicated file encoding.h and uses ceph::buffer::list

------------------------

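1. KeyValueStore
	In LevelDBStore, the prefix is actually glued to the key and stored together with it
	A transaction is really a LevelDBStore batch operation
	compact means merging SSTables, the standard LSM-tree operation
	leveldb::DB is opened in LevelDBStore::do_open()
	MonitorDBStore uses LevelDBStore underneath; it is constructed from a file path passed into the constructor.

1.1. Mon makes heavy use of MonitorDBStore, i.e. state information is stored in leveldb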
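
The "prefix glued to the key" idea can be sketched as follows. The function names `combine_key` and `split_key` are hypothetical, and the NUL separator byte is an assumption for illustration rather than something taken from these notes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Join a prefix and a key into one flat key, separated by a NUL byte
// (the separator choice is an assumption for illustration).
std::string combine_key(const std::string& prefix, const std::string& key) {
  std::string out = prefix;
  out.push_back('\0');
  out.append(key);
  return out;
}

// Split a flat key back into (prefix, key) at the first NUL byte.
std::pair<std::string, std::string> split_key(const std::string& flat) {
  const std::size_t pos = flat.find('\0');
  assert(pos != std::string::npos);
  return {flat.substr(0, pos), flat.substr(pos + 1)};
}
```

Because the flat keys sort by prefix first, all entries of one service sit next to each other in the underlying sorted store, so a prefix scan is a simple range scan.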

2. Messenger
	A Dispatcher is whoever receives the messages

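3. Paxos
	Paxos.h covers the Paxos algorithm model plus message sending; the data is just bytes
	PaxosService is Paxos with typed data, and provides methods specific to the data type, e.g. monmap
	Although hard to follow, Paxos is arguably the most extensively commented code here
	Is the dispatch() function usually the core of the processing flow?

   PaxosService->dispatch()->propose_pending()->encode_pending()

	The key to PGMonitor is pending_inc; look in encode_pending() to find the data Paxos has to synchronize
	OSDMonitor holds the crushmap
		tick() checks OSD state and calls do_propose

	What is MonSession?
	LogMonitor looks fairly simple and is a good one to learn from
	Tip: you can study the code in Monitor.cc by tracing how the state variable changes
	The PaxosService family of classes does not seem to talk to the messenger directly. Their dispatch() functions are called by Monitor.cc, in Monitor::dispatch().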

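4. Paxos rules
	ref: http://duanple.blog.163.com/blog/static/709717672011440267333/
	0. Roles and terms
		Proposer: the one that can put forward a proposal
		Proposal: put forward by a Proposer. A proposal is a pair of a number and a value; the number keeps proposals distinguishable, and the value is the content of the proposal itself.
		
		Acceptor: the one that receives proposals and has the right to decide whether it accepts a given proposal
		Choose: a proposal is chosen. Here, a proposal is considered chosen once more than half of the Acceptors have accepted it
		
		Learner: those who need to learn which proposal was chosen

	1. P1: An acceptor must accept the first proposal it receives.
	   P1a: An acceptor can accept a proposal numbered n as long as it has not responded to any prepare request numbered greater than n.

	2. P2: If a proposal with value v is chosen, then every higher-numbered proposal that is chosen must also have value v.
	   P2c: For any n and v, if a proposal with number n and value v is issued, then there is a set S consisting of a majority of acceptors such that either a) or b) holds:
	   a) No acceptor in S has accepted any proposal numbered less than n.
	   b) v is the value of the highest-numbered proposal among all proposals numbered less than n accepted by the acceptors in S.
	   
	   P2c determines how a proposer generates a proposal
	
	3. How a proposer generates a proposal:

		1. The proposer picks a new proposal number n and sends a request to each member of some set of acceptors, asking each to respond with:
			(a) a promise never again to accept a proposal numbered less than n
			(b) the highest-numbered proposal numbered less than n that it has accepted so far, if any

		2. If the proposer receives responses from a majority of the acceptors, it can issue a proposal with number n and value v, where v is the value of the highest-numbered proposal among the responses, or any value the proposer chooses if the responses contain no proposals.

		Such a request is called a prepare request with number n.

		The proposer issues a proposal by sending, to some set of acceptors, a request that the proposal be accepted (this set need not be the same set of acceptors that responded to the prepare request). This request is called an accept request.

	4. How does an acceptor respond to the algorithm above?

	   An acceptor can ignore any request without compromising the safety of the algorithm.
	   An acceptor must remember its state (the highest-numbered proposal it has accepted and the highest-numbered prepare request it has responded to) even across failures and restarts.
	   A proposer can always abandon a proposal and forget everything about it, as long as it never issues another proposal with the same number.
	
	5.  Putting the proposer and acceptor together gives the following two-phase algorithm:

		Phase 1. (a) A proposer picks a proposal number n and sends a prepare request numbered n to a majority of the acceptors.

		(b) If an acceptor receives a prepare request numbered n, and n is greater than the number of every prepare request it has already responded to, it promises not to accept any more proposals numbered less than n and responds with the highest-numbered proposal it has accepted, if any {!? this implies that the highest-numbered accepted proposal must be numbered less than n}.

		Phase 2. (a) If the proposer receives responses to its prepare request (numbered n) from a majority of the acceptors, it sends an accept request to the acceptors for a proposal with number n and value v, where v is the value of the highest-numbered proposal among the responses, or any value if the responses contain no proposals.

		(b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request numbered greater than n.

	6. It is easy to construct a scenario in which two proposers keep issuing a sequence of proposals with increasing numbers, none of which is ever chosen.
	   To guarantee progress, a distinguished proposer must be selected as the only one that issues proposals.

	   If enough of the system (proposer, acceptors and the communication network) is working properly, liveness can be achieved by electing a single distinguished proposer. The famous FLP result implies that a reliable algorithm for electing a proposer must use either randomness or real time, for example timeouts. However, safety is guaranteed whether or not the election succeeds. {! i.e. even if two or more proposers exist at the same time, the algorithm still guarantees correctness}

	7. Different proposers choose their numbers from disjoint sets of numbers, so no two proposers ever issue proposals with the same number.

	8. On the leader election algorithm: http://csrd.aliapp.com/?p=162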
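
The acceptor side of the two phases above (Phase 1(b) and Phase 2(b)) is small enough to sketch directly. This is a minimal single-decree acceptor under simplifying assumptions: the names `Proposal`, `Acceptor`, `on_prepare` and `on_accept` are hypothetical, state is kept in memory rather than persisted, and none of it is taken from Ceph's Paxos.cc.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// A proposal is a (number, value) pair.
struct Proposal {
  uint64_t n;
  std::string value;
};

// Hypothetical single-decree acceptor following Phase 1(b) and 2(b).
struct Acceptor {
  uint64_t promised = 0;      // highest prepare number answered so far
  bool has_accepted = false;  // whether any proposal has been accepted
  Proposal accepted{0, ""};   // highest-numbered accepted proposal

  // Phase 1(b): promise only if n exceeds every prepare answered so far.
  // On success the caller reads has_accepted / accepted as the reply.
  bool on_prepare(uint64_t n) {
    if (n <= promised) return false;  // ignoring a request is always safe
    promised = n;
    return true;
  }

  // Phase 2(b): accept unless a higher-numbered prepare was answered.
  bool on_accept(const Proposal& p) {
    if (p.n < promised) return false;
    promised = p.n;
    has_accepted = true;
    accepted = p;
    return true;
  }
};
```

A real acceptor would write `promised` and `accepted` to stable storage before replying, since rule 4 requires that this state survive restarts.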

-------------------------------------------------

1. The monitor's Paxos
	1. All data is stored in MonitorDBStore, which is actually leveldb
	2. Paxos <- PaxosService <- MonmapService, OSDService ... <- Monitor

2. Monitor walkthrough
	[Mon0]
	init()
		bootstrap()
			state = STATE_PROBING
			send_message(new MMonProbe(OP_PROBE..)..)
	
	[Mon2]
	dispatch()
		handle_probe_probe()
			send_message(new MMonProbe(OP_REPLY..)..)
					

	[Mon0]
	dispatch()
		handle_probe_reply()
			if newer monmap
				use new monmap
				bootstrap()
			if ...
				bootstrap()
			if paxos->get_version() < m->paxos_first_version && m->paxos_first_version > 1 // my paxos version is too low
				sync_start()
			if paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version
				sync_start()
			if I'm part of cluster
				start_election()
			if outside_quorum.size() >= monmap->size() / 2 + 1
				start_election()
				
----------------------------------------------------------
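	[election process] the mon with the smaller number wins (entity_name_t._num, which is also mon->rank)
		1. Every Elector sends a proposal to everyone else, proposing itself as leader
		2. Every Elector that receives a proposal compares leader_acked, m->get_source().num() and its own mon->rank, and defers to whichever is smallest.
		   defer() sends an OP_ACK message
		3. An Elector that receives an ACK checks whether acked_me.size() == mon->monmap->size(); if so, it calls victory()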
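
The "lowest rank wins" rule can be checked with a tiny simulation. This is only a sketch under assumptions: `defer_target` and `run_election` are hypothetical names, every monitor is assumed reachable, and epochs, timeouts and feature checks are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <set>

// Decide whom a monitor with rank `me` supports when it sees a
// proposal from rank `from`: the lower rank wins.
int defer_target(int me, int from) {
  return from < me ? from : me;
}

// Simulate one round where every monitor proposes to every other one
// and count the acks each candidate gets. Returns the rank that
// everyone (including itself) supports, or -1 if there is none.
int run_election(const std::set<int>& ranks) {
  for (int candidate : ranks) {
    std::size_t acked = 0;
    for (int voter : ranks) {
      if (defer_target(voter, candidate) == candidate) ++acked;
    }
    if (acked == ranks.size()) return candidate;  // like acked_me.size() == monmap->size()
  }
  return -1;
}
```

With all monitors up, only the lowest rank can collect an ack from every member, so exactly one candidate reaches the victory condition.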

	start_election()
		elector.call_election()
			 if (epoch % 2 == 0) 
			    bump_epoch(epoch+1)
			electing_me = true;
			broadcast to all
				send_message(new MMonElection(OP_PROPOSE, epoch, mon->monmap))

	Monitor.dispatch()
		case MSG_MON_ELECTION:
			elector.dispatch(m)
				if (peermap->epoch > mon->monmap->epoch)
					mon->monmap->decode(em->monmap_bl)
					mon->bootstrap()
				 switch (em->op)
					case MMonElection::OP_PROPOSE:
						handle_propose(em)
							if ignoring propose without required features
								nak_old_peer()
								return
							if (m->epoch > epoch)
								bump_epoch()
									mon->join_election()
							if (m->epoch < epoch) // got an "old" propose
								...
								return
							if (mon->rank < from) // i would win over them.
								...
							else 
								defer(from)
									send_message(new MMonElection(OP_ACK, epoch, mon->monmap), from)

	Elector.dispatch()
		case MMonElection::OP_ACK:
			handle_ack(em)
				if (m->epoch > epoch)
					bump_epoch(m->epoch);
    					start()
					return
				if (electing_me) // thanks
					 if (acked_me.size() == mon->monmap->size())
      						victory()
							change cmd set
							for each one
								send_message(new MMonElection(OP_VICTORY, epoch, mon->monmap), mon->monmap->get_inst(*p))
							mon->win_election()
								state = STATE_LEADER
								paxos->leader_init()
								monmon()->election_finished()
								
	Elector.dispatch()
		case MMonElection::OP_VICTORY:
			handle_victory()
				mon->lose_election()
				stash leader's commands


---------------------

1. Mon sync_start() 
/* What is synchronized is paxos->get_version(), the whole

*/

[mon0]
sync_start()
	state = STATE_SYNCHRONIZING
	sync_provider = other
	send_message(new MMonSync(sync_full?OP_GET_COOKIE_FULL:OP_GET_COOKIE_RECENT), sync_provider)

[mon1]
dispatch()
	handle_sync_get_cookie()
		MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
  		reply->last_committed = sp.last_committed;
  		messenger->send_message(reply, m->get_connection());

[mon0]
dispatch()
	handle_sync()
		handle_sync_cookie()
			sync_cookie = m->cookie;
  			sync_start_version = m->last_committed;
  			MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
  			messenger->send_message(r, sync_provider);

[mon1]
dispatch()
	handle_sync()
		handle_sync_get_chunk()
			MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
			
			MonitorDBStore::Transaction tx;
			tx.put(paxos->get_name(), sp.last_committed, bl);
			sp.synchronizer->get_chunk_tx(tx, left);	// copy the whole MonitorDBStore
			::encode(tx, reply->chunk_bl);
			
			if no next chunk
				reply->op = MMonSync::OP_LAST_CHUNK;

			messenger->send_message(reply, m->get_connection());

[mon0]
dispatch()
	handle_sync()
		handle_sync_chunk()
			MonitorDBStore::Transaction tx;
			tx.append_from_encoded(m->chunk_bl);
			store->apply_transaction(tx);

			if OP_LAST_CHUNK
				sync_finish(m->last_committed);
					init_paxos();
					bootstrap();

---------------------------------------------------------

1. Paxos & PaxosService

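	1. PaxosService::propose_pending() calls Paxos::propose_new_value(); this is referred to as a commit.
	   MonmapService and the like all commit through propose_pending() and do not need to call propose_new_value() directly.

	   propose_pending() calls encode_pending().
	   PaxosService::encode_pending() is an abstract function overridden by subclasses. Looking at it shows what kind of data each subclass is responsible for.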

	2. Monitor::preinit() calls
			paxos->init();
			for (int i = 0; i < PAXOS_NUM; ++i) {
				paxos_service[i]->init();
			}

  		Monitor::_reset() calls
  			paxos->restart();
  			for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
    			(*p)->restart();

  		Monitor::win_election() calls
  			paxos->leader_init()
  			monmon()->election_finished();
			for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
				if (*p != monmon())
					(*p)->election_finished();
			}

  		Monitor::lose_election() calls
  			paxos->peon_init()
  			for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
    			(*p)->election_finished();

1.5. Paxos leader collect
	leader_init()
		...
		collect(0);

	[mon0]
	collect(0)   //leader
		state = STATE_RECOVERING;
		
		// look for uncommitted value
  		if (get_store()->exists(get_name(), last_committed+1)) {
  			version_t v = get_store()->get(get_name(), "pending_v");
    		version_t pn = get_store()->get(get_name(), "pending_pn");
    		uncommitted_pn = pn;
    		uncommitted_v = last_committed+1;
    		get_store()->get(get_name(), last_committed+1, uncommitted_value);
    	}

    	// pick new pn
  		accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));

  		// send collect
  		for (set<int>::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); ++p) {
		    if (*p == mon->rank) continue;
		    
		    MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, ceph_clock_now(g_ceph_context));
		    collect->last_committed = last_committed;
		    collect->first_committed = first_committed;
		    collect->pn = accepted_pn;
		    mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
		}

	[mon1]
	handle_collect()	//peon
		state = STATE_RECOVERING

		MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, ceph_clock_now(g_ceph_context));
  		last->last_committed = last_committed;
  		last->first_committed = first_committed;

  		// can we accept this pn?
  		if (collect->pn > accepted_pn) {
  			accepted_pn = collect->pn;
  			MonitorDBStore::Transaction t;
  			t.put(get_name(), "accepted_pn", accepted_pn);
  		}

  		// share whatever committed values we have
  		if (collect->last_committed < last_committed)
    		share_state(last, collect->first_committed, collect->last_committed)	// puts my past commits into last
    			for ( ; v <= last_committed; v++) {
					if (get_store()->exists(get_name(), v)) {
						get_store()->get(get_name(), v, m->values[v]);
					}
				}
    			m->last_committed = last_committed;

    	// do we have an accepted but uncommitted value?
  		//  (it'll be at last_committed+1)	
  		if (collect->last_committed <= last_committed && get_store()->exists(get_name(), last_committed+1)) {
  			get_store()->get(get_name(), last_committed+1, bl);
  			last->values[last_committed+1] = bl;
  			version_t v = get_store()->get(get_name(), "pending_v");
    		version_t pn = get_store()->get(get_name(), "pending_pn");
    		last->uncommitted_pn = pn;
  		}

  		// send reply
  		mon->messenger->send_message(last, collect->get_source_inst());

  	[mon0]
  	handle_last() 	// leader
  		// store any committed values if any are specified in the message
  		need_refresh = store_state(last);

  		// do they accept your pn?
  		if (last->pn > accepted_pn) {
  			// no, try again
  			collect(last->pn);
  		} else if (last->pn == accepted_pn) {
  			// yes, they do. great!
  			num_last++;

  			// did this person send back an accepted but uncommitted value?
  			if (last->uncommitted_pn) {
		    if (last->uncommitted_pn >= uncommitted_pn && last->last_committed >= last_committed && last->last_committed + 1 >= uncommitted_v) {
		    	// we learned an uncommitted value
				uncommitted_v = last->last_committed+1;
				uncommitted_pn = last->uncommitted_pn;
				uncommitted_value = last->values[uncommitted_v];
		      }
		    }

		    // is that everyone?
		    if (num_last == mon->get_quorum().size()) {
		    	// share committed values?
				for (map<int,version_t>::iterator p = peer_last_committed.begin(); p != peer_last_committed.end(); ++p) {
					if (p->second < last_committed) {
						// share committed values
					MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, ceph_clock_now(g_ceph_context));
					share_state(commit, peer_first_committed[p->first], p->second);
					mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
				}
		    }

		    // did we learn an old value?
      		if (uncommitted_v == last_committed+1 && uncommitted_value.length()) {
				state = STATE_UPDATING_PREVIOUS;
				begin(uncommitted_value);
			} else{
				finish_round();
					state = STATE_ACTIVE
			}

  		} else {
  			// this is an old message, discard
  		}
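
The "pick new pn" step in collect() has to respect rule 7 of the Paxos notes: different proposers must draw from disjoint sets of numbers. One scheme that satisfies this is to round up to the next multiple of 100 and add the monitor's rank. The function below is a sketch of that scheme under the assumption that ranks are unique and below 100; the name `next_proposal_number` is hypothetical and the code is not quoted from Ceph's get_new_proposal_number().

```cpp
#include <cassert>
#include <cstdint>

// Pick a proposal number above `last_pn` that no other monitor can
// generate: round up to the next multiple of 100, then add the rank.
// Assumes ranks are unique and below 100.
uint64_t next_proposal_number(uint64_t last_pn, unsigned rank) {
  assert(rank < 100);
  uint64_t pn = last_pn / 100;  // drop the rank part of the old number
  ++pn;                         // move to the next "round"
  pn *= 100;
  pn += rank;                   // make the number unique to this monitor
  return pn;
}
```

Two monitors with different ranks can never produce the same number, and each call returns a value strictly greater than `last_pn`.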

2. Paxos proposal
	PaxosService::dispatch(m)
		preprocess_query(PaxosServiceMessage* m)
		if (!mon->is_leader()) {
			mon->forward_request_leader(m);
			return true;
		}
		prepare_update(m)
		if (should_propose(delay)) {
      		if (delay == 0.0) {
				propose_pending();
      	}

    [mon0]
	PaxosService::propose_pending()
		Paxos::propose_new_value()
			queue_proposal(bl, onfinished);
			propose_queued()
				C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
				proposal->proposed = true;
				state = STATE_UPDATING;
				begin(proposal->bl);	//leader
					// accept it ourselves
  					accepted.clear();
  					accepted.insert(mon->rank);
  					new_value = v;

  					// store the proposed value in the store.
  					MonitorDBStore::Transaction t;
  					t.put(get_name(), last_committed+1, new_value);
  					t.put(get_name(), "pending_v", last_committed + 1);
  					t.put(get_name(), "pending_pn", accepted_pn);
  					get_store()->apply_transaction(t);

  					// ask others to accept it too!
					for (set<int>::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); ++p) {
						if (*p == mon->rank) continue;
						
						MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, ceph_clock_now(g_ceph_context));
						begin->values[last_committed+1] = new_value;
						begin->last_committed = last_committed;
						begin->pn = accepted_pn;
						
						mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
					}

					// set timeout event
  					accept_timeout_event = new C_AcceptTimeout(this);
  					mon->timer.add_event_after(g_conf->mon_accept_timeout, accept_timeout_event); // if accept takes too long to complete, accept_timeout fires

	[mon1..n]
	handle_begin()	//peon
		if (begin->pn < accepted_pn) {return;}
		state = STATE_UPDATING;

		version_t v = last_committed+1;
		MonitorDBStore::Transaction t;
		t.put(get_name(), v, begin->values[v]);
		t.put(get_name(), "pending_v", v);
  		t.put(get_name(), "pending_pn", accepted_pn);
  		get_store()->apply_transaction(t);

  		MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
				    ceph_clock_now(g_ceph_context));
	  	accept->pn = accepted_pn;
	  	accept->last_committed = last_committed;
	  	mon->messenger->send_message(accept, begin->get_source_inst());


	[mon0]
	handle_accept()	//leader
		accepted.insert(from);
		// new majority?
		if (accepted.size() == (unsigned)mon->monmap->size()/2+1) {
			commit();
				MonitorDBStore::Transaction t;
				// commit locally
  				last_committed++;
  				last_commit_time = ceph_clock_now(g_ceph_context);
  				t.put(get_name(), "last_committed", last_committed);

  				for (set<int>::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); ++p) {
					if (*p == mon->rank) continue;

					MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, ceph_clock_now(g_ceph_context));
					commit->values[last_committed] = new_value;
					commit->pn = accepted_pn;
					commit->last_committed = last_committed;
					mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
				}

			do_refresh()  // to notify PaxosService subclasses 
				...
			commit_proposal()
				C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
				proposals.pop_front();
				proposal->complete(0);

		// done?
  		if (accepted == mon->get_quorum()) {
  			extend_lease();
  				lease_expire = ceph_clock_now(g_ceph_context);
  				lease_expire += g_conf->mon_lease;
  				acked_lease.clear();
  				acked_lease.insert(mon->rank);

				for (set<int>::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); ++p) {
					if (*p == mon->rank) continue;
					
					MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, ceph_clock_now(g_ceph_context));
					lease->last_committed = last_committed;
					lease->lease_timestamp = lease_expire;
					lease->first_committed = first_committed;
					mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
				}

  			finish_round();
  				state = STATE_ACTIVE;
  		}

  	[mon1..n]
  	handle_commit(MMonPaxos *commit)
  		store_state(commit)
  			start, end = ... // we want to write the range [last_committed, m->last_committed] only.
  			for (it = start; it != end; ++it) {
				t.put(get_name(), it->first, it->second);
				decode_append_transaction(t, it->second);
		    }
		    get_store()->apply_transaction(t);

  		do_refresh()

  	/*
  	I guess
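  		last_committed identifies the Paxos algorithm instance
  		version_t is the proposal number within one algorithm instance

  		If accept takes too long to complete, accept_timeout fires
  		If the peons fail to reach agreement on accept for a long time, extend_lease() is never run for them, and they hit lease_timeout

  		The Paxos used by the Monitor seems to be a modified Paxos.
  			First it guarantees there is one and only one leader.
  			Then phase 1 only has to run once, when the leader initializes.
  			After that, each propose only needs phase 2.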
  	*/

  	------------------ OP_LEASE process ----------------
  	[mon0]
  	extend_lease();	// extend lease of other mon

  	[mon1..n]
  	handle_lease()
  		lease_expire = lease->lease_timestamp;
  		state = STATE_ACTIVE;

		// ack
		MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, ceph_clock_now(g_ceph_context));
		ack->last_committed = last_committed;
		ack->first_committed = first_committed;
		ack->lease_timestamp = ceph_clock_now(g_ceph_context);
		mon->messenger->send_message(ack, lease->get_source_inst());

		// (re)set timeout event.
  		reset_lease_timeout();

  	[mon0]
  	handle_lease_ack()
  		if (acked_lease == mon->get_quorum()) {
      		mon->timer.cancel_event(lease_ack_timeout_event);
      		lease_ack_timeout_event = 0;
      	}

    ---------------- OP_ACCEPT timeout ----------------

    void Paxos::accept_timeout()
		mon->bootstrap();

    -----------------How paxos value is read -----------------

    Paxos::handle_last() or handle_accept() or handle_commit() in the end
    	Paxos::do_refresh()
	    	mon->refresh_from_paxos(&need_bootstrap);
				for (int i = 0; i < PAXOS_NUM; ++i) {
					paxos_service[i]->refresh(need_bootstrap);
						// update cached versions
	  					cached_first_committed = mon->store->get(get_service_name(), first_committed_name);
	  					cached_last_committed = mon->store->get(get_service_name(), last_committed_name);

	  					update_from_paxos(need_bootstrap)			// implemented by subclasses, below use code of MonmapMonitor
	  						version_t version = get_last_committed();
	  						int ret = get_version(version, monmap_bl);	
	  						mon->monmap->decode(monmap_bl);
				}
				for (int i = 0; i < PAXOS_NUM; ++i) {
					paxos_service[i]->post_paxos_update()		// implemented by subclasses, below use code of MonmapMonitor
						// nothing implemented
				}


	/*
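		What if the commit was not MonmapMonitor's, but MonmapMonitor gets refreshed anyway?
			The put_version() that matches get_version() in update_from_paxos() is in encode_pending().
			get_version() does not read directly from Paxos; it reads via get(get_service_name(), ver, bl), i.e. from the service's own get_service_name() prefix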
	*/

	----------------- MonClient how to get ----------------

	MonClient::get_monmap()
		_sub_want("monmap", 0, 0);

		 while (want_monmap)
    		map_cond.Wait(monc_lock);

    [MonClient]
    MonClient::_reopen_session()
    	if (!sub_have.empty())
    		_renew_subs();
    			MMonSubscribe *m = new MMonSubscribe;
   				m->what = sub_have;
    			_send_mon_message(m);

    [Monitor]
    dispatch()
    	handle_subscribe()
    		for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin(); p != m->what.end(); ++p){
    			session_map.add_update_sub(s, p->first, p->second.start, p->second.flags & CEPH_SUBSCRIBE_ONETIME, m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
    		}

    [OSDMonitor]
    OSDMonitor::update_from_paxos()
    	check_subs()
    		check_sub()
    			send_incremental(sub->next, sub->session->inst, sub->incremental_onetime);

    [MDSMonitor]
    Same as OSDMonitor

-------------------------

    [MonClient]
    MonClient::get_monmap_privately()
    	messenger->send_message(new MMonGetMap, cur_con)

    [Monitor]
    dispatch()
    	case CEPH_MSG_MON_GET_MAP:
      		handle_mon_get_map(static_cast<MMonGetMap*>(m));
      			send_latest_monmap(m->get_connection().get());
      				messenger->send_message(new MMonMap(bl), con);

    [MonClient]
    ms_dispatch()
    	case CEPH_MSG_MON_MAP:
    		handle_monmap(static_cast<MMonMap*>(m));
    			::decode(monmap, p);
    			map_cond.Signal();

The large state charts

Ceph Monitor States

Ceph OSD

Raw deep-dive notes below. I will put them into proper format and language when I have time.

1. Context is a callback function
	The C_Contexts class aggregates a group of Contexts into a single callback
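
A minimal sketch of the callback pattern described above, assuming a much simpler shape than Ceph's real classes. `ContextSketch` and `ContextsSketch` are hypothetical names, and `std::function` stands in for the virtual finish() method.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// A callback object: complete(r) runs the stored function with the
// result code. Hypothetical simplification of a completion callback.
struct ContextSketch {
  std::function<void(int)> fn;
  void complete(int r) { if (fn) fn(r); }
};

// Aggregates several callbacks into one that fires them all once.
struct ContextsSketch {
  std::vector<ContextSketch> subs;
  void add(ContextSketch c) { subs.push_back(std::move(c)); }
  void complete(int r) {
    for (auto& c : subs) c.complete(r);
    subs.clear();  // each callback fires at most once
  }
};
```

The aggregate lets a caller pass a single completion to an API that accepts only one callback, while several parties still get notified.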

2. coll_t really represents a directory, and the directory holds a set of objects
	ref: http://www.cnblogs.com/D-Tec/archive/2013/03/01/2939254.html

3. questions
	1. what does sync do?
	2. why does each store class implement its own Transaction?
	3. what is omap?

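4. MemStore
	1. queue_transaction() initiates an operation
	2. The operation is carried out by _do_transaction()
	3. MemStore defines its own Collection and Object types. The data structure is a coll_t->Collection map stored in MemStore, and each Collection holds a ghobject_t->Object map.
	4. The parent class ObjectStore only has things like coll_t and ghobject_t, which act as IDs; it has no actual data structures for collections or objects, and object_t does not appear.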
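
The two-level map described in point 3 can be sketched as below. Plain strings stand in for coll_t and ghobject_t, and all type and method names (`MemStoreSketch`, `write`, `lookup`) are hypothetical; the real MemStore also handles attributes, omap and locking, which are omitted here.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-ins: in Ceph the keys are coll_t and ghobject_t;
// plain strings keep the sketch self-contained.
using CollId = std::string;
using ObjId = std::string;

struct ObjectSketch { std::string data; };
struct CollectionSketch { std::map<ObjId, ObjectSketch> objects; };

struct MemStoreSketch {
  std::map<CollId, CollectionSketch> colls;

  // Write `bytes` at `offset`, growing the object when needed.
  void write(const CollId& c, const ObjId& o, std::size_t offset,
             const std::string& bytes) {
    std::string& d = colls[c].objects[o].data;
    if (d.size() < offset + bytes.size()) d.resize(offset + bytes.size(), '\0');
    d.replace(offset, bytes.size(), bytes);
  }

  // Return a pointer to the object, or nullptr if it does not exist.
  const ObjectSketch* lookup(const CollId& c, const ObjId& o) const {
    auto ci = colls.find(c);
    if (ci == colls.end()) return nullptr;
    auto oi = ci->second.objects.find(o);
    return oi == ci->second.objects.end() ? nullptr : &oi->second;
  }
};
```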

5. ghobject_t
	1. Defined in hobject.h
	2. ghobject_t -> hobject_t -> object_t -> string name, which may simply be a file path

6. KeyValueStore
	Component: StripObjectMap -> GenericObjectMap->KeyValueDB
			   Inheritance tree: KeyValueDB
				    ->LevelDBStore
			   MonitorDBStore uses LevelDBStore internally
			   The KeyValueDB in StripObjectMap is passed in through the constructor
		
	In KeyValueStore, StripObjectMap is called the backend
		GenericObjectMap seems to revolve around the header
		The header has a parent attribute
	
	The throttle mechanism, KeyValueStore.cc line:1018, is worth learning from
	
	queue_transactions()->op_queue_reserve_throttle()
	->queue_op()
	->the op is put into an OpSequencer, ->the OpSequencer is wrapped by OpWQ->OpWQ._enqueue() puts the OpSequencer into the ThreadPool
	->OpWQ._process()->KeyValueStore._do_op()->osr->peek_queue() takes out the op->_do_transactions(o->tls, o->op, &handle::TPHandle)
	->creates a BufferTransaction->calls _do_transaction(), passing the transaction rather than tls
	->finally we see the switch case on op type

	->look at the _write() operation
	->BufferTransaction does lookup_cached_header->_generic_write()
	->file_to_extents() splits the range into strips
	-> ...
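
The strip-splitting step at the end of the write path can be illustrated with a small sketch: a byte range is cut into pieces that each fall inside one fixed-size strip. The names `Extent` and `split_into_strips` and the `strip_size` parameter are hypothetical; this is not the actual file_to_extents() implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// One piece of a write that falls inside a single fixed-size strip.
struct Extent {
  uint64_t strip_no;  // index of the strip
  uint64_t offset;    // offset inside that strip
  uint64_t len;       // bytes covered in that strip
};

// Split the byte range [offset, offset+len) into per-strip extents.
// `strip_size` is a hypothetical parameter; it must be non-zero.
std::vector<Extent> split_into_strips(uint64_t offset, uint64_t len,
                                      uint64_t strip_size) {
  assert(strip_size > 0);
  std::vector<Extent> out;
  while (len > 0) {
    const uint64_t in_strip = offset % strip_size;
    uint64_t n = strip_size - in_strip;  // room left in this strip
    if (n > len) n = len;
    out.push_back(Extent{offset / strip_size, in_strip, n});
    offset += n;
    len -= n;
  }
  return out;
}
```

For example, a 6000-byte write at offset 3000 with 4096-byte strips touches three strips: the tail of strip 0, all of strip 1, and the head of strip 2. Each extent then maps to one key in the key-value backend.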

7. OSDMap contains the crush map

8. questions
	1. how does replication work?
	2. how is strong sync achieved?
	3. how do snapshots work?
	4. osd gossip?
	5. osd auto recovery?
	6. what is the IO path?
	7. recovery and backfill process.
	8. snapshot process.
	9. what is OSD's superblock?
	10. what does OSDService do?
	11. what does PG's upgrade mean? seen in OSD::load_pgs()
	12. how exactly does a PG handle CephPeeringEvent?
	13. how does the OSD implement incremental writes and thin provisioning?
	14. split?
	15. what does pg's parent mean?
	16. what is the object's ondisk file/kv structure, including snap?
	    ref: https://ceph.com/docs/master/dev/osd_internals/snaps/
	17. what is an object's generation?

9. good points to learn
	1. how OSD/PG heartbeat is monitored
	2. how OSD health is monitored
	3. there are many checks and asserts embedded
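	4. waiting lists are used a lot, such as OSD::waiting_for_osdmap, OSD::waiting_for_pg, OSD::pending_splits
	5. OSD::7531 -> op->mark_reached_pg(); this traces down to TrackedOp::mark_event(). It ends up writing a log entry, which makes it easy to trace the path of an op.
	   We can borrow the idea: not necessarily writing a log, but tracking the execution path of each op
	6. The Messenger throttle mechanism
	   This is a good pattern. In a distributed storage system, if recovery, scrubbing, replication and rebalance traffic is not throttled, a significant performance regression is likely
	7. The reservation mechanism.
	   https://github.com/ceph/ceph/blob/master/doc/dev/osd_internals/backfill_reservation.rst
	   backfill reservation: if all backfills ran at the same time, they would overwhelm the target machine. Reservation limits the number of backfills in progress at once.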
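
The reservation idea in point 7 boils down to a bounded number of slots plus a wait queue. The class below is a minimal single-threaded sketch under that assumption; `ReserverSketch` and its methods are hypothetical names, job ids are assumed non-negative, and priorities and cancellation are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical reserver: at most `max_allowed` jobs hold a slot at once;
// the rest wait in FIFO order until a slot is released.
class ReserverSketch {
 public:
  explicit ReserverSketch(std::size_t max_allowed) : max_(max_allowed) {}

  // Returns true if the job got a slot now, false if it was queued.
  bool request(int job) {
    if (in_progress_ < max_) { ++in_progress_; return true; }
    waiting_.push_back(job);
    return false;
  }

  // Release one slot. Returns the waiting job that takes it over,
  // or -1 if nobody was waiting and the slot becomes free.
  int release() {
    assert(in_progress_ > 0);
    if (waiting_.empty()) { --in_progress_; return -1; }
    int next = waiting_.front();
    waiting_.pop_front();
    return next;  // the slot passes directly to the next waiter
  }

  std::size_t in_progress() const { return in_progress_; }

 private:
  std::size_t max_;
  std::size_t in_progress_ = 0;
  std::deque<int> waiting_;
};
```

The same shape works for any background traffic that must not starve client I/O: the limit caps concurrency, and the queue keeps the remaining work ordered instead of dropping it.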

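10. Replicated pgs
	1. A pg belongs to one pool
	2. A pg identified by pg_id / pg_t actually refers to a group of pgs with 3 copies
	   Each individual pg among the 3 copies is represented by the spg_t type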

11. Things worth studying
	1. SafeTimer

12. material
	1. placement group states:
		http://ceph.com/docs/master/rados/operations/pg-states/

--------------------------------------------------------
[OSD Flow Tracing]

1. OSD msg dispatching

OSD::ms_dispatch()
	do_waiters()	// while !finished.empty(), do dispatch_op(next)
	_dispatch(m)
		// -- don't need lock --
		case CEPH_MSG_PING:
			break;
	
		// -- don't need OSDMap --
		case CEPH_MSG_OSD_MAP:
			handle_osd_map(static_cast<MOSDMap*>(m))
				to 2.1
		case CEPH_MSG_SHUTDOWN:
			shutdown();
		case MSG_PGSTATSACK:
			handle_pg_stats_ack(MPGStatsAck *ack)
				to 3.1
		case MSG_MON_COMMAND:
    		handle_command(static_cast<MMonCommand*>(m));
    			to 4
	  	case MSG_COMMAND:
    		handle_command(static_cast<MCommand*>(m));
    			to 4
    	case MSG_OSD_SCRUB:
			handle_scrub(static_cast<MOSDScrub*>(m));
			   	to 5.1
  		case MSG_OSD_REP_SCRUB:
    		handle_rep_scrub(static_cast<MOSDRepScrub*>(m));
    			to 5.4

		// -- need OSDMap --
		default:
			dispatch_op(op);

2. OSD OSDMap updating process

	2.1. when receive osd map message

		handle_osd_map(static_cast<MOSDMap*>(m))
			if (first > osdmap->get_epoch() + 1)	// missing some epoch of OSDMap
				osdmap_subscribe(..)				

			// store new maps: queue for disk and put in the osdmap cache
			for (epoch_t e = start; e <= last; e++)
				t.write(coll_t::META_COLL, fulloid, 0, bl.length(), bl);

			// update superblock
			superblock.oldest_map = first;
			superblock.newest_map = last;

			// advance through the new maps
			for (epoch_t cur = start; cur <= superblock.newest_map; cur++)
				// start blacklisting messages sent to peers that go down.
			service.pre_publish_map(newmap);

			// kill connections to newly down osds
			...

			osdmap = newmap;

			advance_map(t, fin);	// since crushmap is different now, I need update my pg
				ceph::unordered_map<spg_t, create_pg_info>::iterator n = creating_pgs.begin();
				while (n != creating_pgs.end())
					// am i still primary?
					if (primary != whoami)
						creating_pgs.erase(p);

				// scan for waiting_for_pg
				map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
					while (p != waiting_for_pg.end())
						int role = osdmap->calc_pg_role(whoami, acting, nrep);		// my osd rank in the pg's acting osds
						...

			// the only place to change state = ACTIVE
			if (is_booting())
				state = ACTIVE

			// check am I in osdmap? 
			if state == ACTIVE
				if (!osdmap->exists(whoami))
					do_shutdown = true
				else if (!osdmap->is_up(whoami) || addr wrong) 		// if something wrong
					... stop, or start_waiting_for_healthy()

			// superblock and commit
			write_superblock(t);
			store->queue_transaction(0, _t, new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()), 0, fin);
			service.publish_superblock(superblock);

			// yay!
			consume_map();
				// scan pg's, to count num_pg_primary, num_pg_replica, num_pg_stray, and find which pg need to be removed
				for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin(); it != pg_map.end()
					...
				// remove pg
				for (..)
					_remove_pg(&**i);
				// scan pg's
				for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin(); it != pg_map.end()
					pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());


			if (!is_active()) {
				peering_wq.drain();
			} else {
				activate_map();
					wake_all_pg_waiters();
					// norecover?
					...
					service.activate_map();
			}

			// end
			if (m->newest_map && m->newest_map > last) {
				osdmap_subscribe(osdmap->get_epoch()+1, true);
			} else if (is_booting()) {
				start_boot();  // retry
			} else if (do_restart)
				start_boot();

			if (do_shutdown)
				shutdown();

	2.2. where osdmap is sent out 

		handle_replica_op() or handle_op()
			_share_map_incoming(..)
				send_incremental_map(epoch, con)

		handle_osd_ping() or do_notifies() or do_queries() or do_infos() or handle_pg_query()
			_share_map_outgoing(..)
				send_incremental_map(pe, con)				
	
3. send pg status to mon, and receive pg status ack on osd

	3.1. when MSG_PGSTATSACK comes

		handle_pg_stats_ack(MPGStatsAck *ack)
				if (!require_mon_peer(ack))
					return;		// the other end of msg must be mon
				xlist<PG*>::iterator p = pg_stat_queue.begin();
  				while (!p.end())
  					pg->stat_queue_item.remove_myself();

  	3.2. where MSG_PGSTATSACK sends out

  		in OSD.cc
  		flush_pg_stats() or do_mon_report() or ms_handle_connect()
  		send_pg_stats(..)
  			MPGStats *m = new MPGStats(monc->get_fsid(), osdmap->get_epoch(), had_for);
  			xlist<PG*>::iterator p = pg_stat_queue.begin();
  			while (!p.end())
  				if (pg->pg_stats_publish_valid)
  					m->pg_stat[pg->info.pgid.pgid] = pg->pg_stats_publish;

  			monc->send_mon_message(m);

  		in PGMonitor.cc
  		prepare_update(PaxosServiceMessage *m)
  			case MSG_PGSTATS:
  				return prepare_pg_stats((MPGStats*)m);
  					MPGStatsAck *ack = new MPGStatsAck;
  					...
  					mon->send_reply(stats, ack);

  			case MSG_MON_COMMAND:
  				...

4. handle command 

	handle_command(static_cast<MMonCommand*>(m)) or handle_command(static_cast<MCommand*>(m));
		command_wq.queue(c);

	OSD::CommandWQ
		void _process(Command *c)
			osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
				... 	// handle cli commands
				MCommandReply *reply = new MCommandReply(r, rs);
				client_messenger->send_message(reply, con);

5.  scrubbing process

	5.1. handle MSG_OSD_SCRUB
		handle_scrub(MOSDScrub *m)
			if (!require_mon_peer(m))		// must be sent from mon
	    		return;
	    	
	    	if (m->scrub_pgs.empty())
	    		for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin(); p != pg_map.end(); ++p)
	    			if (pg->is_primary())
	    				pg->unreg_next_scrub();
	    				pg->scrubber.must_scrub = true;
	    				pg->reg_next_scrub();
	    					osd->reg_last_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);
	    						last_scrub_pg.insert(pair<utime_t,spg_t>(t, pgid));
	    	else
	    		for (vector<pg_t>::iterator p = m->scrub_pgs.begin(); p != m->scrub_pgs.end(); ++p)
	    			if (osdmap->get_primary_shard(*p, &pcand) && pg_map.count(pcand)
	    				PG *pg = pg_map[pcand];			// to get primary pg
	    			if (pg->is_primary())
	    				pg->unreg_next_scrub();
	    				pg->scrubber.must_scrub = true;
	    				pg->reg_next_scrub();

	5.2. when will last_scrub_pg be cancelled

		OSD::handle_scrub(MOSDScrub *m) or ReplicatedPG::on_shutdown()
			PG::unreg_next_scrub()
				osd->unreg_last_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);		// OSDService::unreg_last_pg_scrub(spg_t pgid, utime_t t)
					last_scrub_pg.erase(it);

	5.3. when will scrubbing happen

		OSD::sched_scrub()
			pg->sched_scrub()
				queue_scrub();
					state_set(PG_STATE_SCRUBBING);
					osd->queue_for_scrub(this);
						scrub_wq.queue(pg);

		OSD::ScrubWQ
			void _process(..)
				pg->scrub(handle);
					if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
					    state_clear(PG_STATE_SCRUBBING);
					    state_clear(PG_STATE_REPAIR);
					    state_clear(PG_STATE_DEEP_SCRUB);
					    publish_stats_to_osd();
					    return;
					}

					// when we're starting a scrub, we need to determine which type of scrub to do
					if (!scrubber.active)
						scrubber.is_chunky = true;
						if (!con->has_feature(CEPH_FEATURE_CHUNKY_SCRUB))
							scrubber.is_chunky = false;

					// do scrubbing
					if (scrubber.is_chunky) {
						chunky_scrub(handle);
					} else {
						classic_scrub(handle);
					}

		/*
			the next scrubbing process is handled by PG::Scrubber
			the process is a bit complex
		*/
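
The registration in 5.1/5.2 keeps (timestamp, pgid) pairs in an ordered container, so the scheduler can look at the oldest entry first. A minimal sketch of that idea follows, assuming integer timestamps and string PG ids; `ScrubRegistry`, `next_due()` and the `min_interval` parameter are hypothetical and not taken from the Ceph source.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

// Hypothetical simplified types: a timestamp in seconds and a PG name.
using Stamp = long;
using PgId = std::string;

class ScrubRegistry {
public:
  // Counterparts of reg_last_pg_scrub / unreg_last_pg_scrub in the notes.
  void reg(const PgId& pg, Stamp last_scrub) {
    by_stamp_.insert(std::make_pair(last_scrub, pg));
  }
  void unreg(const PgId& pg, Stamp last_scrub) {
    by_stamp_.erase(std::make_pair(last_scrub, pg));
  }

  // Return true and fill *out with the PG whose registered stamp is oldest,
  // provided at least min_interval seconds have passed since that stamp.
  bool next_due(Stamp now, Stamp min_interval, PgId* out) const {
    if (by_stamp_.empty()) return false;
    const std::pair<Stamp, PgId>& oldest = *by_stamp_.begin();
    if (oldest.first + min_interval > now) return false;
    *out = oldest.second;
    return true;
  }

private:
  std::set<std::pair<Stamp, PgId> > by_stamp_;  // ordered by stamp, then PG
};
```

Because the set is ordered by stamp, re-registering a PG after a scrub (unreg with the old stamp, reg with the new one) moves it to the back of the line.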

	5.4. handle MSG_OSD_REP_SCRUB
		handle_rep_scrub(static_cast<MOSDRepScrub*>(m))		//rep means replica
		 	rep_scrub_wq.queue(m);

		 OSD::RepScrubWQ
		 	void _process()
		 		PG *pg = osd->_lookup_lock_pg(msg->pgid);
		 		pg->replica_scrub(msg, handle);

		/*
		  Guess:
		  	MOSDRepScrub (MSG_OSD_REP_SCRUB) is sent during the scrubbing process to ask a replica to scrub
		*/

	5.5. where MOSDScrub msg comes from

		OSDMonitor::preprocess_command()		// handle cli commands
			...
			else if ((prefix == "osd scrub" || prefix == "osd deep-scrub" || prefix == "osd repair"))
				mon->try_send_message(new MOSDScrub(osdmap.get_fsid(), pvec.back() == "repair", pvec.back() == "deep-scrub"), osdmap.get_inst(osd));

		PGMonitor::preprocess_command(MMonCommand *m)		// handle cli commands
			...
			else if (prefix == "pg scrub" || prefix == "pg repair" || prefix == "pg deep-scrub") 
	     		mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs, scrubop == "repair", scrubop == "deep-scrub"), mon->osdmon()->osdmap.get_inst(osd));

	    /*
	    	They come from user cli commands
	    */

6. OSD startup

	6.1. OSD booting

		[osd]
		OSD::start_boot()
			C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
			monc->get_version("osdmap", &c->newest, &c->oldest, c);
				...
				C_OSD_GetVersion::finish()
					osd->_maybe_boot(oldest, newest);
						if (osdmap->test_flag(CEPH_OSDMAP_NOUP))
							log ...
						else if (is_waiting_for_healthy() || !_is_healthy())
							if (!is_waiting_for_healthy())
	      						start_waiting_for_healthy();
	      					heartbeat_kick();
	      				else if osdmap->get_epoch() >= oldest - 1 && osdmap->get_epoch() + cct->_conf->osd_map_message_max > newest
	      					_send_boot();
	      						MOSDBoot *mboot = new MOSDBoot(superblock, boot_epoch, hb_back_addr, hb_front_addr, cluster_addr);
	      						_collect_metadata(&mboot->metadata);
	      						monc->send_mon_message(mboot);
	      					return

	      				// get all the latest maps
	      				// after subscribing, the OSD receives MOSDMap messages from the Monitor. Both Monitors and OSDs can send MOSDMap. MOSDMap is handled in handle_osd_map(), which triggers start_boot() again
						if (osdmap->get_epoch() > oldest)
							osdmap_subscribe(osdmap->get_epoch() + 1, true);
						else
							osdmap_subscribe(oldest - 1, true);
		/*
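			In start_boot():
				if the OSDMap is recent enough:
					monc->send_mon_message(MOSDBoot mboot)
					the Monitor replies with an MOSDMap message containing the latest OSDMap
				otherwise:
					osdmap_subscribe(...)
					the Monitor sends the MOSDMap messages the OSD subscribed to
					handle_osd_map(MOSDMap m)
						...
						start_boot()
					which loops back into start_boot()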
		*/
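
The branch structure of _maybe_boot() traced above can be condensed into one pure function. This is a sketch under assumptions: `maybe_boot()`, `BootAction` and `BootDecision` are hypothetical names, the NOUP flag and health checks are reduced to two booleans, and the arithmetic is done in 64 bits so that `oldest - 1` cannot wrap when `oldest` is 0.

```cpp
#include <cassert>
#include <cstdint>

using epoch_t = std::uint32_t;

enum class BootAction { SendBoot, Subscribe };

struct BootDecision {
  BootAction action;
  epoch_t subscribe_from;  // meaningful only when action == Subscribe
};

// current: epoch of the OSD's own map; oldest/newest: range the monitor holds;
// map_message_max: stands in for the osd_map_message_max config value.
inline BootDecision maybe_boot(epoch_t current, epoch_t oldest, epoch_t newest,
                               epoch_t map_message_max, bool noup,
                               bool healthy) {
  const std::int64_t cur = current, old = oldest, nw = newest;
  // Boot only if allowed, healthy, and our map is within recent history.
  if (!noup && healthy && cur >= old - 1 && cur + map_message_max > nw) {
    BootDecision d = {BootAction::SendBoot, 0};
    return d;
  }
  // Otherwise fetch newer maps first; handle_osd_map() will retry the boot.
  BootDecision d = {BootAction::Subscribe, 0};
  if (cur > old) {
    d.subscribe_from = current + 1;
  } else {
    d.subscribe_from = oldest > 0 ? oldest - 1 : 0;
  }
  return d;
}
```

Written this way it is easy to see that every non-boot branch ends in a subscription, which is what eventually loops back into start_boot().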

		// after monc->send_mon_message(MOSDBoot mboot)
		[mon]
		PaxosService::dispatch()
			OSDMonitor::preprocess_query()
				preprocess_boot()
					// already booted?
					if (osdmap.is_up(from) && osdmap.get_inst(from) == m->get_orig_source_inst())
						_booted(m, false)
							send_latest(m, m->sb.current_epoch+1);
								if (start == 0)
									send_full(m);
										mon->send_reply(MOSDMap *m)
								else
									send_incremental(m, start);
										mon->send_reply(MOSDMap *m)
						return true
					// noup?
					if (!can_mark_up(from))
						send_latest(m, m->sb.current_epoch+1);
						return true

		// finally the Monitor sends back an MOSDMap message
		[osd]
		OSD::handle_osd_map(..)
			...
			state = STATE_ACTIVE
			...

	6.2. init processes

		// no resources are allocated here
		OSD::pre_init()
			cct->_conf->add_observer(this);

		OSD::init()
			store->mount();
			read_superblock();
				store->read(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, 0, bl);
				::decode(superblock, p);
			
			// make sure info object exists
			if (!store->exists(coll_t::META_COLL, service.infos_oid))
				t.touch(coll_t::META_COLL, service.infos_oid);
				r = store->apply_transaction(t);

			// make sure snap mapper object exists
			...

			// lookup "current" osdmap
			osdmap = get_map(superblock.current_epoch);
				return service.get_map(e);
					OSDMapRef ret(try_get_map(e));
						OSDService::try_get_map(epoch_t epoch)
							OSDMapRef retval = map_cache.lookup(epoch);
							if (retval)
								return retval
							OSDMap *map = new OSDMap;
							_get_map_bl(epoch, bl)
								 store->read(coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
							map->decode(bl);
							return _add_map(map);
								OSDMapRef l = map_cache.add(e, o);

			// load up pgs (as they previously existed)
  			load_pgs();
  				set<spg_t> head_pgs;
  				map<spg_t, interval_set<snapid_t> > pgs;
  				for (vector<coll_t>::iterator it = ls.begin(); it != ls.end(); ++it)
  					pgs[pgid].insert(snap);
  					head_pgs.insert(pgid);

  				bool has_upgraded = false;
  				for (map<spg_t, interval_set<snapid_t> >::iterator i = pgs.begin(); i != pgs.end(); ++i)
  					spg_t pgid(i->first);

  					epoch_t map_epoch = PG::peek_map_epoch(store, coll_t(pgid), service.infos_oid, &bl);
  					PG *pg = _open_lock_pg(map_epoch == 0 ? osdmap : service.get_map(map_epoch), pgid);
  						PG* pg = _make_pg(createmap, pgid);
  							PGPool pool = _get_pool(pgid.pool(), createmap);
  							pg = new ReplicatedPG(&service, createmap, pool, pgid, logoid, infooid);
  						pg->lock(no_lockdep_check);

  					// read pg state, log
    				pg->read_state(store, bl);

    				if (pg->must_upgrade())
    					has_upgraded = true;
    					pg->upgrade(store, i->second);

    				service.init_splits_between(pg->info.pgid, pg->get_osdmap(), osdmap);
    				pg->reg_next_scrub();

    				pg->get_osdmap()->pg_to_up_acting_osds(pgid.pgid, &up, &up_primary, &acting, &primary); 
			        pg->init_primary_up_acting(up, acting, up_primary, primary);

			        int role = OSDMap::calc_pg_role(whoami, pg->acting);
    				pg->set_role(role);

    				PG::RecoveryCtx rctx(0, 0, 0, 0, 0, 0);
    				pg->handle_loaded(&rctx);

    			build_past_intervals_parallel();

    		// i'm ready!
    		client_messenger->add_dispatcher_head(this);
  			...

  			service.init();		// OSDService::init()
			service.publish_map(osdmap);
			service.publish_superblock(superblock);

			consume_map();

			state = STATE_BOOTING;
  			start_boot();
  				to 6.1

  		OSD::final_init()
  			AdminSocket *admin_socket = cct->get_admin_socket();
  			test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);

  			r = admin_socket->register_command(..)
  			... // init admin sockets
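
The try_get_map() path inside OSD::init() above is a classic read-through cache: look in the cache, otherwise read the encoded map from the store, decode it, and remember it. A minimal sketch follows, assuming an unbounded std::map as the cache and a caller-supplied loader in place of store->read(); `MapService`, `OsdMapLite` and `Loader` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

using epoch_t = unsigned;

// Hypothetical stand-in for a decoded OSDMap.
struct OsdMapLite {
  epoch_t epoch;
  std::string payload;
};
using MapRef = std::shared_ptr<const OsdMapLite>;

class MapService {
public:
  // The loader plays the role of the store read: it returns true and fills
  // *bl when the epoch exists on disk.
  using Loader = std::function<bool(epoch_t, std::string*)>;

  explicit MapService(Loader loader) : loader_(std::move(loader)) {}

  MapRef try_get_map(epoch_t e) {
    std::map<epoch_t, MapRef>::iterator it = cache_.find(e);
    if (it != cache_.end()) return it->second;  // cache hit
    std::string bl;
    if (!loader_(e, &bl)) return MapRef();      // not on disk
    MapRef m = std::make_shared<OsdMapLite>(OsdMapLite{e, bl});  // "decode"
    cache_[e] = m;                              // add to cache
    return m;
  }

private:
  Loader loader_;
  std::map<epoch_t, MapRef> cache_;  // unbounded here; a real cache evicts
};
```

Returning a shared reference means several PGs can hold the same decoded map for an epoch without re-reading it.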

7. ObjectStore

	7.1. How ObjectStore finally calls LevelDB

		KeyValueStore::do_transactions(list<Transaction*> &tls, uint64_t op_seq)
			return _do_transactions(tls, op_seq, 0);
				for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p, trans_num++)
					r = _do_transaction(**p, bt, spos, handle);
						...	// a lot of transaction op
				r = bt.submit_transaction();	// KeyValueStore::BufferTransaction::submit_transaction()
					r = store->backend->save_strip_header(header, spos, t);
					return store->backend->submit_transaction(t);	// store->backend is StripObjectMap
						return db->submit_transaction(t);	// GenericObjectMap::submit_transaction()
							// KeyValueDB::submit_transaction(), actually LevelDB::submit_transaction()
							leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat)); 
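
The chain in 7.1 ends in a single batched write: ops are collected in a transaction and handed to the key-value store in one call. A minimal sketch of that shape follows, using std::map instead of LevelDB. `TinyKV` and `Batch` are hypothetical, and gluing prefix and key with a '\0' separator is an assumption made for illustration.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

class TinyKV {
public:
  // Glue prefix and key into one flat key (separator is an assumption).
  static std::string combine(const std::string& prefix,
                             const std::string& key) {
    std::string out = prefix;
    out.push_back('\0');
    out += key;
    return out;
  }

  // Collects ops; nothing touches the store until submit().
  class Batch {
  public:
    void set(const std::string& prefix, const std::string& key,
             const std::string& value) {
      ops_.push_back(Op{true, TinyKV::combine(prefix, key), value});
    }
    void rmkey(const std::string& prefix, const std::string& key) {
      ops_.push_back(Op{false, TinyKV::combine(prefix, key), std::string()});
    }

  private:
    friend class TinyKV;
    struct Op {
      bool is_set;
      std::string key;
      std::string value;
    };
    std::vector<Op> ops_;
  };

  // Apply every op of the batch in order, in one call.
  void submit(const Batch& b) {
    for (const Batch::Op& op : b.ops_) {
      if (op.is_set) data_[op.key] = op.value;
      else data_.erase(op.key);
    }
  }

  bool get(const std::string& prefix, const std::string& key,
           std::string* out) const {
    std::map<std::string, std::string>::const_iterator it =
        data_.find(combine(prefix, key));
    if (it == data_.end()) return false;
    *out = it->second;
    return true;
  }

private:
  std::map<std::string, std::string> data_;
};
```

The sketch is single-threaded, so it does not demonstrate atomicity; it only shows why the upper layers can build up many ops and pay for one write.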

8. MSG_OSD_PG_CREATE

	8.1 where it comes from

		/*
			This function is called frequently inside PGMonitor, basically whenever paxos state changes
		*/
		PGMonitor::send_pg_creates()
			for (map<int, set<pg_t> >::iterator p = pg_map.creating_pgs_by_osd.begin()
			PGMonitor::send_pg_creates(int osd, Connection *con)
				...
				/* m.mkpg carries the PGs to create; it comes from PGMonitor::pg_map.creating_pgs_by_osd.find(osd) */
				MOSDPGCreate *m = new MOSDPGCreate(mon->osdmon()->osdmap.get_epoch());
				...
	
	8.2 Handling PG creation
		OSD::handle_pg_create()
			MOSDPGCreate *m = (MOSDPGCreate*)op->get_req();
			for (map<pg_t,pg_create_t>::iterator p = m->mkpg.begin(); p != m->mkpg.end(); ++p)
				pg_t on = p->first;
				spg_t pgid;
				bool mapped = osdmap->get_primary_shard(on, &pgid);
				
				// register.
				creating_pgs[pgid].history = history;
				creating_pgs[pgid].parent = parent;
				creating_pgs[pgid].acting.swap(acting);
				
				PG *pg = NULL;
				if (can_create_pg(pgid))
					pg = _create_lock_pg(osdmap, pgid, true, false, false, 0, creating_pgs[pgid].acting, whoami, creating_pgs[pgid].acting, whoami,	history, pi, *rctx.transaction);
						PG *pg = _open_lock_pg(createmap, pgid, true, hold_map_lock);
							PG* pg = _make_pg(createmap, pgid);
								pg = new ReplicatedPG(&service, createmap, pool, pgid, logoid, infooid);
						service.init_splits_between(pgid, pg->get_osdmap(), service.get_osdmap());	
						pg->init(role, up, up_primary, acting, acting_primary, history, pi, backfill, &t);
						return pg;
					pg->handle_create(&rctx);
					pg->publish_stats_to_osd();
						// a lot of updating this.info
						...
						if (is_primary())
							osd->pg_stat_queue_enqueue(this);
				dispatch_context(rctx, pg, osdmap);
					do_notifies(*ctx.notify_list, curmap);
					do_queries(*ctx.query_map, curmap);
					do_infos(*ctx.info_map, curmap);
				
			maybe_update_heartbeat_peers();			
	
	8.3 PG::upgrade(..)
		PG::upgrade(..)
			for (interval_set<snapid_t>::const_iterator i = snapcolls.begin(); i != snapcolls.end(); ++i)
				for (snapid_t next_dir = i.get_start(); next_dir != i.get_start() + i.get_len(); ++next_dir)
					coll_t cid(info.pgid, next_dir);
					int r = get_pgbackend()->objects_list_partial(cur, store->get_ideal_list_min(), store->get_ideal_list_max(), 0, &objects, &cur);
					for (vector<hobject_t>::iterator j = objects.begin(); j != objects.end(); ++j)
						t.remove(cid, *j);
			
			while (1)
				/* to repair snap? */
				for (vector<hobject_t>::iterator j = objects.begin(); j != objects.end(); ++j)
					...
					snap_mapper.get_snaps(*j, &cur_snaps);
					...
					snap_mapper.add_oid(*j, oi_snaps, &_t);
			
9. MSG_OSD_PG_NOTIFY
	9.1 handle_pg_notify
		/** PGNotify
		 * from non-primary to primary
		 * includes pg_info_t.
		 * NOTE: called with opqueue active.
		 */
		 OSD::handle_pg_notify(..)
			handle_pg_peering_evt(..)
			/*
			 * look up a pg.  if we have it, great.  if not, consider creating it IF the pg mapping
			 * hasn't changed since the given epoch and we are the primary.
			 */
					PG *pg = _create_lock_pg(..)
	
	9.2. where it comes from
		
			OSD::handle_pg_query(OpRequestRef op) || OSD::dispatch_context(..)
				/** do_notifies
				 * Send an MOSDPGNotify to a primary, with a list of PGs that I have
				 * content for, and they are primary for.
				 */
				OSD::do_notifies(..)
					 MOSDPGNotify *m = new MOSDPGNotify(..)
					 
10. MSG_OSD_PG_QUERY
	10.1 handle_pg_query
		/** PGQuery
		 * from primary to replica | stray
		 * NOTE: called with opqueue active.
		 */
		OSD::handle_pg_query(..)
			pg->queue_query(..)
				...
				peering_queue.push_back(evt);
				to next
			...
			MOSDPGLog *mlog = new MOSDPGLog();
			_share_map_outgoing(from, con.get(), osdmap);
			cluster_messenger->send_message(mlog, con.get());
			...
			do_notifies(notify_list, osdmap);
		
		[after pg->queue_query]
			... // seems to be related to recovery process
	
	10.2 where PGQuery comes from
		OSD::dispatch_context
			OSD::do_queries(..)
				...
				
11. MSG_OSD_PG_LOG
	
	11.1 handle
		OSD::handle_pg_log(OpRequestRef op)
			handle_pg_peering_evt(..);
				...
				pg->queue_peering_event(evt);
				...
				
	11.2. where it comes from
		OSD::handle_pg_query(OpRequestRef op)
			...
		PG::activate(..)
			...
		PG::share_pg_log()
			...
		PG::fulfill_log(..)
			...
	
12. MSG_OSD_PG_INFO
		OSD::handle_pg_info(OpRequestRef op)
			handle_pg_peering_evt(..)
		
		/* where it comes from */
		OSD::dispatch_context(..)
			do_infos(..)
		PG::share_pg_info(..)
			...
		PG::_activate_committed(epoch_t e)
			...

13. MSG_OSD_PG_SCAN
	
	13.1 where it comes from
		ReplicatedPG::do_scan(..)
		ReplicatedPG::recover_backfill(..)

	13.2. handle_pg_scan
		OSD::handle_pg_scan(..)
			pg = _lookup_pg(m->pgid);
			enqueue_op(pg, op);
				pg->queue_op(op);
					osd->op_wq.queue(make_pair(PGRef(this), op));

		[OSD::OpWQ]
		OSD::OpWQ::_process(..)
			osd->dequeue_op(pg, op, handle);
				op->mark_reached_pg();
				pg->do_request(op, handle);
					ReplicatedPG::do_request(..)
						case MSG_OSD_PG_SCAN:
    						do_scan(op, handle);
    							...
    							MOSDPGScan *reply = new MOSDPGScan(..)
    							...
    							/*
    								Seems heavily related to Backfill. From http://ceph.com/docs/master/rados/operations/pg-states/ see:
    								Backfill
										Ceph is scanning and synchronizing the entire contents of a placement group instead of inferring what contents need to be synchronized from the logs of recent operations. Backfill is a special case of recovery.
    							*/

14. what is CephPeeringEvt?
	14.1. PG.h::class CephPeeringEvt{
			epoch_sent, epoch_requested, 
			evt,
		}
	     no subclasses

	14.2. PG::queue_peering_event(CephPeeringEvtRef evt)
			peering_queue.push_back(evt);
			OSDService::queue_for_peering(PG *pg=this)
				peering_wq.queue(pg);

		  OSD::peering_wq::_process(..)
		  	osd->process_peering_events(pgs, handle);
		  		for (list<PG*>::const_iterator i = pgs.begin(); i != pgs.end(); ++i) {
		  			advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs);

		  			PG::CephPeeringEvtRef evt = pg->peering_queue.front();
		  			pg->handle_peering_event(evt, &rctx);
		  				recovery_state.handle_event(evt, rctx);

		  			pg->write_if_dirty(*rctx.transaction);
		  		}
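
The flow in 14.2 has two queues: each PG owns a queue of events, and a shared work queue holds the PGs that have something to do. A minimal single-threaded sketch follows; `PeeringEvt`, `Pg`, `PeeringWQ` and `process_peering_events()` here are simplified stand-ins with assumed fields, and map advancing and locking are left out.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical simplified event: the epoch it was sent in plus a name.
struct PeeringEvt {
  unsigned epoch_sent;
  std::string name;
};

struct Pg;

// Shared queue of PGs that have pending peering events.
struct PeeringWQ {
  std::deque<Pg*> pgs;
  void queue(Pg* pg) { pgs.push_back(pg); }
};

struct Pg {
  std::string id;
  std::deque<PeeringEvt> peering_queue;
  std::vector<std::string> handled;  // records what the state machine saw

  void queue_peering_event(const PeeringEvt& evt, PeeringWQ& wq) {
    peering_queue.push_back(evt);
    wq.queue(this);
  }
  void handle_peering_event(const PeeringEvt& evt) {
    handled.push_back(evt.name);  // real code feeds the recovery state machine
  }
};

// One worker pass: every queued PG entry handles exactly one event.
inline void process_peering_events(PeeringWQ& wq) {
  std::deque<Pg*> batch;
  batch.swap(wq.pgs);
  for (Pg* pg : batch) {
    if (pg->peering_queue.empty()) continue;
    PeeringEvt evt = pg->peering_queue.front();
    pg->peering_queue.pop_front();
    pg->handle_peering_event(evt);
  }
}
```

A PG that was queued twice appears twice in the batch, so its events are still handled one at a time and in order.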

15. CephPeeringEvent and Recovery
    Ceph peering model: http://lists.ceph.com/pipermail/ceph-users-ceph.com/attachments/20130415/39f25f5a/attachment-0001.png

	// general pg op process
	enqueue
	_process
	dequeue
	op->mark_reached_pg();
	pg->do_request(op, handle);

	// pg scan
	PGBackend::objects_list_partial(..)
			ObjectStore::collection_list_partial()
				KeyValueStore
					StripObjectMap
						GenericObjectMap::list_objects(..)
							KeyValueDB::get_iterator()

	// ----------------

	// queue_peering_event(CephPeeringEvent..)

	PG::queue_peering_event(CephPeeringEvtRef evt)
		peering_queue.push_back(evt);
		osd->queue_for_peering(this);

	OSD::peering_wq::_process()
		osd->process_peering_events(pgs, handle);
			advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs);
				pg->handle_advance_map(nextmap, lastmap, newup, up_primary, newacting, acting_primary, rctx);
					recovery_state.handle_event(evt, rctx);
				if (parent.is_split(..))
					.. // handle splits
			
			PG::CephPeeringEvtRef evt = pg->peering_queue.front();
			pg->peering_queue.pop_front();
			pg->handle_peering_event(evt, &rctx);
				recovery_state.handle_event(evt, rctx);
			
			pg->write_if_dirty(*rctx.transaction);
			
	// PG recover states

	Initial 
		Load -> Reset
		MNotifyRec -> Primary
		MInfoRec -> Stray
		MLogRec -> Stray
		
	Started
		AdvMap -> (up or acting affected)?Reset:Nothing

	Reset
		AdvMap -> 
			 pg->start_peering_interval(..)
				 init_primary_up_acting(..)
				 // did acting, up, primary|acker change?
				 ...
				 // did primary change?
				 ...
		ActMap -> Started
		
	Primary
		NeedActingChange -> WaitActingChange
		
	Peering
		pg->state_set(PG_STATE_PEERING);
		
		Activate -> Active
		AdvMap -> Reset
		
	Backfilling
		pg->osd->queue_for_recovery(pg);

		RemoteReservationRejected 
			pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
			pg->state_set(PG_STATE_BACKFILL_TOOFULL);
			-> NotBackfilling
		Backfilled -> Recovered
		
	WaitRemoteBackfillReserved
		RemoteReservationRejected
		AllBackfillsReserved -> Backfilling
		RemoteBackfillReserved -> 
			if(){
				if(){
					pg->osd->send_message_osd_cluster(new MBackfillReserve(..)..)
				}else{
					post_event(RemoteBackfillReserved());
				}
			}else{
				post_event(AllBackfillsReserved());
			}

	WaitLocalBackfillReserved
		LocalBackfillReserved -> WaitRemoteBackfillReserved

	NotBackfilling & Activating
		RequestBackfill -> WaitLocalBackfillReserved
		

	Active
		AdvMap -> 
		ActMap ->
		MNotifyRec -> 
		MInfoRec -> 
		MLogRec -> 
		AllReplicasActivated -> 

	Activating
		AllReplicasRecovered -> Recovered
		DoRecovery -> WaitLocalRecoveryReserved
		RequestBackfill -> WaitLocalBackfillReserved

	ReplicaActive

	Stray

	Recovering
		pg->osd->queue_for_recovery(pg);

		AllReplicasRecovered -> Recovered
		RequestBackfill -> WaitRemoteBackfillReserved

	WaitRemoteRecoveryReserved
		RemoteRecoveryReserved -> 
		AllRemotesReserved -> Recovering

	WaitLocalRecoveryReserved
		LocalRecoveryReserved -> WaitRemoteRecoveryReserved

	Clean & Activating
		DoRecovery -> WaitLocalRecoveryReserved
		
	Clean
		DoRecovery -> WaitLocalRecoveryReserved
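
The recovery and backfill transitions listed above can be written as a lookup table, which makes it easy to replay an event sequence by hand. Ceph itself expresses these states with boost::statechart; the sketch below is a plain table-driven stand-in covering only the transitions noted in this section. `step()` and `run()` are hypothetical helpers, and treating an unlisted event as "stay in the same state" is a simplification.

```cpp
#include <cassert>
#include <map>
#include <utility>
#include <vector>

enum class State {
  Activating, Clean, WaitLocalRecoveryReserved, WaitRemoteRecoveryReserved,
  Recovering, WaitLocalBackfillReserved, WaitRemoteBackfillReserved,
  Backfilling, NotBackfilling, Recovered
};

enum class Event {
  DoRecovery, RequestBackfill, LocalRecoveryReserved, AllRemotesReserved,
  AllReplicasRecovered, LocalBackfillReserved, AllBackfillsReserved,
  Backfilled, RemoteReservationRejected
};

typedef std::pair<State, Event> Key;

// (state, event) -> next state, copied from the notes above.
inline const std::map<Key, State>& transitions() {
  static const std::map<Key, State> table = {
      {Key(State::Clean, Event::DoRecovery), State::WaitLocalRecoveryReserved},
      {Key(State::Activating, Event::DoRecovery), State::WaitLocalRecoveryReserved},
      {Key(State::Activating, Event::RequestBackfill), State::WaitLocalBackfillReserved},
      {Key(State::Activating, Event::AllReplicasRecovered), State::Recovered},
      {Key(State::WaitLocalRecoveryReserved, Event::LocalRecoveryReserved), State::WaitRemoteRecoveryReserved},
      {Key(State::WaitRemoteRecoveryReserved, Event::AllRemotesReserved), State::Recovering},
      {Key(State::Recovering, Event::AllReplicasRecovered), State::Recovered},
      {Key(State::Recovering, Event::RequestBackfill), State::WaitRemoteBackfillReserved},
      {Key(State::NotBackfilling, Event::RequestBackfill), State::WaitLocalBackfillReserved},
      {Key(State::WaitLocalBackfillReserved, Event::LocalBackfillReserved), State::WaitRemoteBackfillReserved},
      {Key(State::WaitRemoteBackfillReserved, Event::AllBackfillsReserved), State::Backfilling},
      {Key(State::Backfilling, Event::Backfilled), State::Recovered},
      {Key(State::Backfilling, Event::RemoteReservationRejected), State::NotBackfilling},
  };
  return table;
}

// Events with no listed transition leave the state unchanged (simplification).
inline State step(State s, Event e) {
  std::map<Key, State>::const_iterator it = transitions().find(Key(s, e));
  return it == transitions().end() ? s : it->second;
}

inline State run(State s, const std::vector<Event>& events) {
  for (Event e : events) s = step(s, e);
  return s;
}
```

Reading the table top to bottom also shows the reservation order: local first, then remote, then the actual recovery or backfill.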
		
	-------
	pg->osd->queue_for_recovery(pg);
		OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
			bool more = pg->start_recovery_ops(max, &rctx, handle, &started);
				ReplicatedPG::start_recovery_ops(..)
					recover_replicas(..)
						started += prep_object_replica_pushes(soid, r->second.need, h);
						pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority)

					recover_primary(..)
						pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
					
					recover_backfill(..)

					if started < max
						RequestBackfill()   // so backfill is a special case of recovery, used when normal recovery cannot succeed
						
	------

	/*
		according to https://github.com/ceph/ceph/blob/master/doc/dev/osd_internals/recovery_reservation.rst
		there are two kinds of recovery: log-based recovery and backfill
		recover_replicas() & recover_primary() try to do log-based recovery
		recover_backfill() does backfill recovery.
		backfill copies all objects instead of replaying the log
	*/
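
To make the difference between the two kinds of recovery concrete, here is a minimal sketch. It assumes a simple criterion that is not spelled out in these notes: if the primary's retained log still covers everything the replica missed, push only the objects named in the missing log entries; otherwise fall back to copying every object. `plan_recovery()`, `LogEntry` and `RecoveryPlan` are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <vector>

typedef std::uint64_t version_t;

struct LogEntry {
  version_t version;
  std::string oid;
};

struct RecoveryPlan {
  bool backfill;                  // true: copy every object
  std::set<std::string> objects;  // objects to push to the replica
};

// log: the primary's retained entries, ascending by version.
// log_tail: the version just before the oldest retained entry.
inline RecoveryPlan plan_recovery(const std::vector<LogEntry>& log,
                                  version_t log_tail,
                                  version_t replica_last_update,
                                  const std::set<std::string>& all_objects) {
  RecoveryPlan plan;
  if (replica_last_update < log_tail) {
    // The log was trimmed past what the replica has seen: copy everything.
    plan.backfill = true;
    plan.objects = all_objects;
    return plan;
  }
  // Log-based: only objects touched after the replica's last update.
  plan.backfill = false;
  for (const LogEntry& e : log) {
    if (e.version > replica_last_update) plan.objects.insert(e.oid);
  }
  return plan;
}
```

In the log-based case the work is proportional to what changed; in the backfill case it is proportional to the size of the PG.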

	//TODO read PG::activate() to understand recovery/backfill

	//TODO how snap works, snapmanager
					
16. OSD Scrubbing
    "Silent data corruption caused by hardware can be a big issue on a large data store. RADOS offers a scrubbing feature" 
    
    "Regular scrubbing is lighter and checks that the object is correctly replicated among the nodes. It also checks the object’s metadata and attributes. Deep scrubbing is heavier and expands the check to the actual data."

    "It ensures data integrity by reading the data and computing checksums."
    	-- https://www.usenix.org/system/files/login/articles/02_giannakos.pdf

   	In Swift, the same job is done by the "Auditor"

    	// register scrubbing
    	pg->reg_next_scrub();
    		osd->reg_last_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);

    	// schedule scrubbing
    	OSD::tick()
    		OSD::sched_scrub();
    			PG::sched_scrub()
    				queue_scrub();
    					state_set(PG_STATE_SCRUBBING);
    					osd->queue_for_scrub(this)

    	// do scrubbing
    	OSD::ScrubWQ::_process()
			pg->scrub(handle);
				if (scrubber.is_chunky) {
					chunky_scrub(handle);
						while (!done) {
							switch (scrubber.state) {
      							case PG::Scrubber::INACTIVE:
      								scrubber.state = PG::Scrubber::NEW_CHUNK;
      							case PG::Scrubber::NEW_CHUNK:
      							...
      							...
      							case PG::Scrubber::COMPARE_MAPS:
      							case PG::Scrubber::FINISH:
      						}
						}
				} else {
					classic_scrub(handle);
						...
				}
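
The COMPARE_MAPS step boils down to comparing per-replica summaries of the same objects. A minimal sketch follows, assuming each replica reports object size and a data digest, and that the first map (the primary's) is treated as authoritative. That last point is a simplification, and `ScrubEntry`, `ScrubMap` and `compare_maps()` are hypothetical simplified versions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

struct ScrubEntry {
  std::uint64_t size;
  std::uint32_t data_digest;  // only meaningful for deep scrub
};
typedef std::map<std::string, ScrubEntry> ScrubMap;

// Compare every replica map against maps[0]; return inconsistent object names.
// Regular scrub checks presence and size; deep scrub also checks the digest.
inline std::set<std::string> compare_maps(const std::vector<ScrubMap>& maps,
                                          bool deep) {
  std::set<std::string> bad;
  if (maps.empty()) return bad;
  const ScrubMap& auth = maps[0];
  for (std::size_t i = 1; i < maps.size(); ++i) {
    for (const auto& kv : auth) {
      ScrubMap::const_iterator it = maps[i].find(kv.first);
      if (it == maps[i].end() || it->second.size != kv.second.size ||
          (deep && it->second.data_digest != kv.second.data_digest)) {
        bad.insert(kv.first);  // missing or different on this replica
      }
    }
    for (const auto& kv : maps[i]) {
      if (auth.count(kv.first) == 0) bad.insert(kv.first);  // extra object
    }
  }
  return bad;
}
```

This also shows why deep scrub is heavier: building the digest requires reading the object data, while size and presence come from metadata.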

17. PGBackend::objects_list_partial()
		ObjectStore::collection_list_partial()
    PGBackend::objects_list_range()
    	ObjectStore::collection_list_range()

    ObjectStore::collection_list_partial()
    ObjectStore::collection_list_range()

    ObjectStore::get_ideal_list_min()
    ObjectStore::get_ideal_list_max()

    	@see ObjectStore.h, detailed comments
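
The "partial" listing calls are cursor-based pagination: each call returns a bounded page plus the position to resume from. A minimal sketch over a sorted std::set follows. `list_partial()` and `list_all()` are hypothetical, and only an upper bound per page is modeled (the real interface also takes a minimum).

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Fill *out with up to max names >= start. On return, *done tells whether the
// listing is finished; if not, *next is the name to pass as start next time.
inline void list_partial(const std::set<std::string>& coll,
                         const std::string& start, std::size_t max,
                         std::vector<std::string>* out, std::string* next,
                         bool* done) {
  out->clear();
  std::set<std::string>::const_iterator it = coll.lower_bound(start);
  while (it != coll.end() && out->size() < max) {
    out->push_back(*it);
    ++it;
  }
  *done = (it == coll.end());
  *next = *done ? std::string() : *it;
}

// Walk the whole collection page by page.
inline std::vector<std::string> list_all(const std::set<std::string>& coll,
                                         std::size_t max) {
  assert(max > 0);  // a zero page size would never make progress
  std::vector<std::string> all, page;
  std::string cur, next;
  bool done = false;
  while (!done) {
    list_partial(coll, cur, max, &page, &next, &done);
    all.insert(all.end(), page.begin(), page.end());
    cur = next;
  }
  return all;
}
```

Bounding the page size keeps any single call short, which matters when the caller holds a PG lock while listing.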

18. PG startup process
		PG::PG() 
			... // do nothing

		PG::init()
			... // set role, acting set
		
		PG::activate()
			state_set(PG_STATE_ACTIVE);
			... // log, recovery, backfill stuff

19. snap mechanism & io path
    http://ceph.com/docs/master/dev/osd_internals/snaps/
    http://www.wzxue.com/%E8%A7%A3%E6%9E%90ceph-snapshot/
    http://blog.sina.com.cn/s/blog_c2e1a9c7010151xb.html
	
	/* client op received */
	OSD::dispatch_op()
		case CEPH_MSG_OSD_OP:
			handle_op(op);
				MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
				/*
					In MOSDOp,
						snapid_t snapid;
						snapid_t snap_seq;
					are related to snapshots
				*/
				enqueue_op(pg, op);
					PG::queue_op(OpRequestRef op)
						osd->op_wq.queue(make_pair(PGRef(this), op));
	
	/* processing begins */
	OSD::OpWQ::_process()
		osd->dequeue_op(pg, op, handle);
			pg->do_request(op, handle);
				ReplicatedPG::do_request(OpRequestRef op, ThreadPool::TPHandle &handle)
					case CEPH_MSG_OSD_OP:
						ReplicatedPG::do_op(op)
							if (op->includes_pg_op()) {
								return do_pg_op(op);
							}
							
							hobject_t head(m->get_oid(), m->get_object_locator().key,
								CEPH_NOSNAP, m->get_pg().ps(),
								info.pgid.pool(), m->get_object_locator().nspace);
							hobject_t snapdir(m->get_oid(), m->get_object_locator().key,
								CEPH_SNAPDIR, m->get_pg().ps(), info.pgid.pool(),
								m->get_object_locator().nspace);
							
							ObjectContextRef obc;		
							hobject_t oid(m->get_oid(),
								m->get_object_locator().key,
								m->get_snapid(),
								m->get_pg().ps(),
								m->get_object_locator().get_pool(),
								m->get_object_locator().nspace);
							r = find_object_context(oid, &obc, can_create, &missing_oid);
							
							map<hobject_t,ObjectContextRef> src_obc;
							for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); ++p) {
								hobject_t src_oid(osd_op.soid, src_oloc.key, m->get_pg().ps(),
									info.pgid.pool(), m->get_object_locator().nspace);
								r = find_object_context(src_oid, &sobc, false, &wait_oid)
								src_obc[src_oid] = sobc;
							}
							
							 // any SNAPDIR op needs to have all clones present.
							if (m->get_snapid() == CEPH_SNAPDIR){
								for (vector<snapid_t>::iterator p = obc->ssc->snapset.clones.begin(); p != obc->ssc->snapset.clones.end(); ++p) {
									hobject_t clone_oid = obc->obs.oi.soid;
									clone_oid.snap = *p;
									if (!src_obc.count(clone_oid)){
										ObjectContextRef sobc;
										r = find_object_context(clone_oid, &sobc, false, &wait_oid);
										src_obc[clone_oid] = sobc;
									}
								}
							}
							
							OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
											&obc->obs, obc->ssc, 
											this);
							ctx->op_t = pgbackend->get_transaction();
							ctx->obc = obc;
							
							ctx->src_obc = src_obc;
							execute_ctx(ctx);
								/* this method must be idempotent since we may call it several times
  								   before we finally apply the resulting transaction. */
								if (op->may_write() || op->may_cache()) {
									 // snap
									if (pool.info.is_pool_snaps_mode()) {
										// use pool's snapc
										ctx->snapc = pool.snapc;
									}else{
										// client specified snapc
										ctx->snapc.seq = m->get_snap_seq();
										ctx->snapc.snaps = m->get_snaps();
									}
								}
								
								int result = prepare_transaction(ctx);
									const hobject_t& soid = ctx->obs->oi.soid;
									int result = do_osd_ops(ctx, ctx->ops);
										PGBackend::PGTransaction* t = ctx->op_t;
										for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
											// the great switch-case where each op is handled
											switch (op.op) {
												case CEPH_OSD_OP_WATCH:
												...
												case CEPH_OSD_OP_READ:
													int r = pgbackend->objects_read_sync(
															soid, op.extent.offset, op.extent.length, &osd_op.outdata);
												case CEPH_OSD_OP_LIST_SNAPS:
													obj_list_snap_response_t resp;
													resp.clones.reserve(clonecount);
													resp.clones.push_back(ci);
													resp.encode(osd_op.outdata);
												case CEPH_OSD_OP_WRITE:
													t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
												...
												case ...
												...
												case ...
												...
												case ...								
												...
												case ...
												...
												...





												/* 
													the great switch-case 
												*/





												case ...
												...
											}
										}
										
									do_osd_op_effects(ctx);
										... // notify/callbacks
									
									// clone, if necessary
									if (soid.snap == CEPH_NOSNAP)
										make_writeable(ctx);
											const hobject_t& soid = ctx->obs->oi.soid;
											PGBackend::PGTransaction *t = pgbackend->get_transaction();
											hobject_t coid = soid;
											coid.snap = snapc.seq;
											
											ctx->clone_obc = object_contexts.lookup_or_create(static_snap_oi.soid);
											ctx->clone_obc->destructor_callback = new C_PG_ObjectContext(this, ctx->clone_obc.get());
											ctx->clone_obc->obs.oi = static_snap_oi;
											ctx->clone_obc->obs.exists = true;
											
											_make_clone(ctx, t, ctx->clone_obc, soid, coid, snap_oi);
												t->clone(head, coid);
												
											// prepend transaction to op_t
											t->append(ctx->op_t);
											delete ctx->op_t;
											ctx->op_t = t;
									
									finish_ctx(ctx,	ctx->new_obs.exists ? pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE);
										// snapset
										if (soid.snap == CEPH_NOSNAP) {
											hobject_t snapoid(soid.oid, soid.get_key(), CEPH_SNAPDIR, soid.hash, info.pgid.pool(), soid.get_namespace());

											ctx->op_t->stash(snapoid, ctx->at_version.version);
										}

										if (ctx->new_obs.exists) {
											bufferlist bv(sizeof(ctx->new_obs.oi));
										    ::encode(ctx->new_obs.oi, bv);
										    setattr_maybe_cache(ctx->obc, ctx, ctx->op_t, OI_ATTR, bv);
										}

								bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
								
								// issue replica writes
								RepGather *repop = new_repop(ctx, obc, rep_tid);
								repop->src_obc.swap(src_obc);
								
								issue_repop(repop, now);
									pgbackend->submit_transaction(
									    soid,
									    repop->ctx->at_version,
									    repop->ctx->op_t,
									    pg_trim_to,
									    repop->ctx->log,
									    onapplied_sync,
									    on_all_applied,
									    on_all_commit,
									    repop->rep_tid,
									    repop->ctx->reqid,
									    repop->ctx->op);
									    	RPGTransaction *t = dynamic_cast<RPGTransaction*>(_t);
									    	
									    	issue_op(      // ReplicatedBackend::issue_op
											    soid,
											    at_version,
											    tid,
											    reqid,
											    trim_to,
											    t->get_temp_added().size() ? *(t->get_temp_added().begin()) : hobject_t(),
											    t->get_temp_cleared().size() ?
											      *(t->get_temp_cleared().begin()) :hobject_t(),
											    log_entries,
											    &op,
											    op_t);

											    /* this is where the data is sent to each replica */
											    for (set<pg_shard_t>::const_iterator i = parent->get_actingbackfill_shards().begin(); i != parent->get_actingbackfill_shards().end(); ++i) {
												    // forward the write/update/whatever
												    MOSDSubOp *wr = new MOSDSubOp(
												      reqid, parent->whoami_shard(),
												      spg_t(get_info().pgid.pgid, i->shard),
												      soid,
												      false, acks_wanted,
												      get_osdmap()->get_epoch(),
												      tid, at_version);

												    // ship resulting transaction, log entries, and pg_stats
												    ::encode(*op_t, wr->get_data());
												    get_parent()->send_message_osd_cluster(peer.osd, wr, get_osdmap()->get_epoch());
												}
											
											/* this is where the op is written locally */
											parent->queue_transaction(op_t, op.op);
												ReplicatedPG::queue_transaction(ObjectStore::Transaction *t, OpRequestRef op)
												osd->store->queue_transaction(osr.get(), t, 0, 0, 0, op);
								
								/* mainly: send ack */
								eval_repop(repop);
									MOSDOp *m = NULL;
									m = static_cast<MOSDOp *>(repop->ctx->op->get_req());

									// send commit.
									MOSDOpReply *reply = repop->ctx->reply;
									osd->send_message_osd_client(reply, m->get_connection());

									// applied?
									for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin(); i != waiting_for_ack[repop->v].end(); ++i) {
										MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
										osd->send_message_osd_client(reply, m->get_connection());
									}

									// done.
									if (repop->all_applied && repop->all_committed) {
										repop_queue.pop_front();
    									remove_repop(repop);
									}
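
The eval_repop() logic above is a gather: the primary tracks which shards still owe an "applied" and a "committed" acknowledgement, replies to the client as each set empties, and retires the op once both are empty. A minimal sketch follows; `RepGatherLite` and its members are hypothetical simplifications, and the replies are reduced to two flags.

```cpp
#include <cassert>
#include <set>

struct RepGatherLite {
  std::set<int> waiting_for_applied;  // shards that have not applied yet
  std::set<int> waiting_for_commit;   // shards that have not committed yet
  bool sent_ack;                      // "applied" reply sent to the client
  bool sent_commit;                   // "committed" reply sent to the client

  explicit RepGatherLite(const std::set<int>& shards)
      : waiting_for_applied(shards),
        waiting_for_commit(shards),
        sent_ack(false),
        sent_commit(false) {}

  void on_applied(int osd) { waiting_for_applied.erase(osd); }
  void on_commit(int osd) { waiting_for_commit.erase(osd); }

  bool all_applied() const { return waiting_for_applied.empty(); }
  bool all_committed() const { return waiting_for_commit.empty(); }

  // Re-evaluated after every acknowledgement. Returns true once the op can be
  // removed from the repop queue.
  bool eval() {
    if (all_committed()) sent_commit = true;
    if (all_applied()) sent_ack = true;
    return all_applied() && all_committed();
  }
};
```

Because eval() is idempotent, it can be called after any acknowledgement arrives without tracking which one triggered it.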

	/* replica receives MOSDSubOp */
	OSD::dispatch_op(..)
		case CEPH_MSG_OSD_OP:
    		handle_op(op);
    			enqueue_op(pg, op);
    				 pg->queue_op(op);
    				 	osd->op_wq.queue(make_pair(PGRef(this), op));

    OSD::OpWQ::_process(..)
    	osd->dequeue_op(pg, op, handle);
    		pg->do_request(op, handle);
    			ReplicatedPG::do_request(..)
    				pgbackend->handle_message(op)
    					case MSG_OSD_SUBOP:
    						OSDOp *first = &m->ops[0];
							switch (first->op.op) {
								case CEPH_OSD_OP_PULL:
									sub_op_pull(op);
									return true;
								case CEPH_OSD_OP_PUSH:
									sub_op_push(op);
										handle_push(m->from, pop, &resp, t);
											submit_push_data(pop.recovery_info,
											   first,
											   complete,
											   pop.data_included,
											   data,
											   pop.omap_header,
											   pop.attrset,
											   pop.omap_entries,
											   t);
											   		... // many operations are written into t
											   		submit_push_complete(recovery_info, t);
										get_parent()->queue_transaction(t);
									return true
							}
							sub_op_modify(op);
								RepModifyRef rm(new RepModify);
								...
								parent->queue_transaction(&(rm->localt), op);

    				case MSG_OSD_SUBOP:
    					do_sub_op(op);
    						... // this one is used by the scrubber
    					break;
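
The make_writeable() step in the primary write path above is copy-on-write for snapshots: before the first write after a new snapshot, the head object is cloned so the old contents stay readable. A minimal sketch of that idea follows. It assumes a much simpler rule than the real code (clone when the write's snap context sequence is newer than the one last seen) and ignores objects created after a snapshot; `SnapObject` is hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

typedef std::uint64_t snapid_t;

struct SnapObject {
  bool exists;
  std::string head;                        // current, writeable contents
  snapid_t snapset_seq;                    // newest snap seq seen by a write
  std::map<snapid_t, std::string> clones;  // clone id -> frozen contents

  SnapObject() : exists(false), snapset_seq(0) {}

  // Write under a snap context whose newest sequence is snapc_seq.
  void write(snapid_t snapc_seq, const std::string& data) {
    if (exists && snapc_seq > snapset_seq) {
      clones[snapc_seq] = head;  // preserve the pre-write contents
    }
    if (snapc_seq > snapset_seq) snapset_seq = snapc_seq;
    head = data;
    exists = true;
  }

  // Read as of snapshot snap: the oldest clone with id >= snap, else head.
  const std::string& read(snapid_t snap) const {
    std::map<snapid_t, std::string>::const_iterator it =
        clones.lower_bound(snap);
    return it != clones.end() ? it->second : head;
  }
};
```

Only the first write after each snapshot pays for a clone; later writes under the same sequence modify the head in place.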


19.5 ondisk object structure
	1. object_t
	   snapid_t

	   sobject_t = object_t + snapid_t (snapped object)
	   hobject_t = object_t + snapid_t + hash (hashed object)
	   ghobject_t = hobject_t + gen_t + shard_t (generationed object)
	   		gen_t = version_t
	   		shard_t = uint8_t

	   coll_t = str (collection)
	   
	   pg_t = m_pool + m_seed + m_preferred
	   spg_t = shard_id_t + pg_t
	   pg_t => spg_t : OSDMap::get_primary_shard(pg_t, spg_t){
						*out = spg_t(pgid);}		// NO_SHARD
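
The composition rules in 19.5 read naturally as nested structs. The sketch below mirrors them with simplified, hypothetical `*_lite` types. The field widths follow the notes, while the `NO_GEN_LITE` and `NO_SHARD_LITE` sentinel values are assumptions chosen for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// object_t: just a name.
struct object_lite {
  std::string name;
};

// hobject_t = object_t + snapid_t + hash
struct hobject_lite {
  object_lite oid;
  std::uint64_t snap;
  std::uint32_t hash;
};

// Assumed sentinels meaning "no generation" / "no shard".
const std::uint64_t NO_GEN_LITE = UINT64_MAX;
const std::uint8_t NO_SHARD_LITE = UINT8_MAX;

// ghobject_t = hobject_t + gen_t + shard_t
struct ghobject_lite {
  hobject_lite hobj;
  std::uint64_t generation;
  std::uint8_t shard;
};

// Wrap a plain hobject with no generation and no shard.
inline ghobject_lite make_ghobject(const hobject_lite& h) {
  ghobject_lite g = {h, NO_GEN_LITE, NO_SHARD_LITE};
  return g;
}

// pg_t (pool + seed only here) and spg_t = shard + pg_t.
struct pg_lite {
  std::uint64_t pool;
  std::uint32_t seed;
};
struct spg_lite {
  pg_lite pgid;
  std::uint8_t shard;
};

// Counterpart of the get_primary_shard() note above: the result carries
// the "no shard" marker.
inline spg_lite primary_shard_replicated(const pg_lite& pg) {
  spg_lite s = {pg, NO_SHARD_LITE};
  return s;
}
```

Each layer only adds fields, so code that works on the inner type keeps working when handed the outer one's member.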

Ceph OSD structure chart

[Figure: Ceph OSD Structure]

OSD PG (Placement Group) peering states chart

[Figure: PG Peering States]


