How Does a Ceph OSD Handle a Request from a Client?
Tracing this request path end to end is a good starting point for a complete understanding of how Ceph works.
This page will be updated soon with a more thorough explanation based on further analysis.
/*
* The operation starts: OSD::dispatch_op_fast() routes client ops
* (CEPH_MSG_OSD_OP) to OSD::handle_op().
*/
#./src/osd/OSD.cc
5676 bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
5677 {
...
5700 // client ops
5701 case CEPH_MSG_OSD_OP:
5702 handle_op(op, osdmap);
...
/*
* The OSD starts handling the client request: handle_op() locates the target
* PG and hands the op to it via enqueue_op() -> PG::queue_op():
*/
7948 void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
7949 {
...
8093 if (pg) {
8094 op->send_map_update = share_map.should_send;
8095 op->sent_epoch = m->get_map_epoch();
8096 enqueue_op(pg, op);
...
8160 void OSD::enqueue_op(PG *pg, OpRequestRef& op)
8161 {
...
8167 pg->queue_op(op);
#./src/osd/PG.cc
1835 void PG::queue_op(OpRequestRef& op)
1836 {
...
/*
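* The op may first have to wait for a newer OSD map (checked in the elided
* code above); otherwise the PG queues it, paired with a reference to itself,
* on the OSD's op work queue (op_wq).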
*/
1847 osd->op_wq.queue(make_pair(PGRef(this), op));
/*
* A worker thread dequeues the op (holding the PG lock) and passes it to the
* PG that owns the data, which processes it in do_request().
*/
#./src/osd/OSD.cc
8303 /*
8304 * NOTE: dequeue called in worker thread, with pg lock
8305 */
8306 void OSD::dequeue_op(
8307 PGRef pg, OpRequestRef op,
8308 ThreadPool::TPHandle &handle)
8309 {
...
8343 pg->do_request(op, handle);
/*
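* The PG creates an OpContext for the op and builds a transaction from it:
* do_request() -> do_op() -> execute_ctx() -> prepare_transaction() -> do_osd_ops().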
*/
#./src/osd/ReplicatedPG.cc
1238 void ReplicatedPG::do_request(
1239 OpRequestRef& op,
1240 ThreadPool::TPHandle &handle)
1241 {
...
1293 do_op(op); // do it now
1294 break;
...
1361 /** do_op - do an op
1362 * pg lock will be held (if multithreaded)
1363 * osd_lock NOT held.
1364 */
1365 void ReplicatedPG::do_op(OpRequestRef& op)
1366 {
...
1697 OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);
...
1760 execute_ctx(ctx);
...
2218 void ReplicatedPG::execute_ctx(OpContext *ctx)
2219 {
...
2289 int result = prepare_transaction(ctx);
...
5668 int ReplicatedPG::prepare_transaction(OpContext *ctx)
5669 {
...
5680 // prepare the actual mutation
5681 int result = do_osd_ops(ctx, ctx->ops);
...
3348 int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
3349 {
...
3463 } else if (pool.info.require_rollback()) {
3464 ctx->pending_async_reads.push_back(
3465 make_pair(
3466 boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
3467 make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length))));
3468 dout(10) << " async_read noted for " << soid << dendl;
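...
/*
* Back in ReplicatedPG::execute_ctx(), after prepare_transaction() returns.
* NOTE: the line number 2094 below is out of sequence (execute_ctx() starts at
* line 2218); it is probably 2294 -- verify against the source.
*/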
2094 ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
...
2345 // read or error?
2346 if (ctx->op_t->empty() || result < 0) {
...
2348 if (result == 0)
...
2351 if (ctx->pending_async_reads.empty()) {
...
2353 } else {
...
/*
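* The read starts: the OpContext issues its pending asynchronous reads to the
* PG backend (objects_read_async()), with OnReadComplete as the completion callback.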
*/
2355 ctx->start_async_reads(this);
...
107 // OpContext
108 void ReplicatedPG::OpContext::start_async_reads(ReplicatedPG *pg)
109 {
110 inflightreads = 1;
111 pg->pgbackend->objects_read_async(
#./src/osd/ReplicatedBackend.cc
277 void ReplicatedBackend::objects_read_async(
278 const hobject_t &hoid,
279 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
280 pair<bufferlist*, Context*> > > &to_read,
281 Context *on_complete)
282 {
...
302 new AsyncReadCallback(r, on_complete)));
...
#./src/osd/ReplicatedPG.cc
114 new OnReadComplete(pg, this));
...
93 struct OnReadComplete : public Context {
94 ReplicatedPG *pg;
95 ReplicatedPG::OpContext *opcontext;
96 OnReadComplete(
97 ReplicatedPG *pg,
98 ReplicatedPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
99 void finish(int r) {
100 if (r < 0)
101 opcontext->async_read_result = r;
102 opcontext->finish_read(pg);
103 }
104 ~OnReadComplete() {}
105 };
...
117 void ReplicatedPG::OpContext::finish_read(ReplicatedPG *pg)
118 {
119 assert(inflightreads > 0);
120 --inflightreads;
121 if (async_reads_complete()) {
122 assert(pg->in_progress_async_reads.size());
123 assert(pg->in_progress_async_reads.front().second == this);
124 pg->in_progress_async_reads.pop_front();
125 pg->complete_read_ctx(async_read_result, this);
126 }
127 }
...
5911 void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
5912 {
5913 MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
5914 assert(ctx->async_reads_complete());
...
5942 reply->set_result(result);
5943 reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
5944 osd->send_message_osd_client(reply, m->get_connection());
5945 close_op_ctx(ctx, 0);
5946 }