How Does a Ceph OSD Handle a Request from a Client?

Tracing this code path is a good starting point for understanding how Ceph works.

This page will be updated soon with a more complete explanation based on further analysis.

/*
 * Operation started.
 */
#./src/osd/OSD.cc
5676 bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
5677 {
           ...
5700   // client ops
5701   case CEPH_MSG_OSD_OP:
5702     handle_op(op, osdmap);
           ...
/*
 * The OSD starts handling the request from the client:
 */
7948 void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
7949 {
           ...
8093   if (pg) {
8094     op->send_map_update = share_map.should_send;
8095     op->sent_epoch = m->get_map_epoch();
8096     enqueue_op(pg, op);
           ...
8160 void OSD::enqueue_op(PG *pg, OpRequestRef& op)
8161 {
           ...
8167   pg->queue_op(op);

#./src/osd/PG.cc
1835 void PG::queue_op(OpRequestRef& op)
1836 {
...
/*
 * The op, paired with a reference to this PG, is pushed onto the OSD's op work queue (op_wq); it may first have to wait for the OSD map.
 */
1847   osd->op_wq.queue(make_pair(PGRef(this), op));

/*
 * A worker thread dequeues the op and hands it to the PG, which will read the data.
 */
#./src/osd/OSD.cc
8303 /*
8304  * NOTE: dequeue called in worker thread, with pg lock
8305  */
8306 void OSD::dequeue_op(
8307   PGRef pg, OpRequestRef op,
8308   ThreadPool::TPHandle &handle)
8309 {
...
8343   pg->do_request(op, handle);

/*
 * An op context (OpContext) is created, and the transaction is prepared on the basis of that context.
 */
#./src/osd/ReplicatedPG.cc
 1238 void ReplicatedPG::do_request(
 1239   OpRequestRef& op,
 1240   ThreadPool::TPHandle &handle)
 1241 {
  ...
 1293     do_op(op); // do it now
 1294     break;
  ...
 1361 /** do_op - do an op
 1362  * pg lock will be held (if multithreaded)
 1363  * osd_lock NOT held.
 1364  */
 1365 void ReplicatedPG::do_op(OpRequestRef& op)
 1366 {
  ...

 1697   OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);
  ...
 1760   execute_ctx(ctx);
  ...
 2218 void ReplicatedPG::execute_ctx(OpContext *ctx)
 2219 {
  ...
 2289   int result = prepare_transaction(ctx);
  ...
 5668 int ReplicatedPG::prepare_transaction(OpContext *ctx)
 5669 {
  ...
 5680   // prepare the actual mutation
 5681   int result = do_osd_ops(ctx, ctx->ops);
  ...
 3348 int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 3349 {
  ...
 3463         } else if (pool.info.require_rollback()) {
 3464           ctx->pending_async_reads.push_back(
 3465             make_pair(
 3466               boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
 3467               make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length))));
 3468           dout(10) << " async_read noted for " << soid << dendl;
  ...
 2094   ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
  ...
 2345   // read or error?
 2346   if (ctx->op_t->empty() || result < 0) {
  ...
 2348     if (result == 0)
  ...
 2351     if (ctx->pending_async_reads.empty()) {
  ...
 2353     } else {
  ...

/*

 * Read operation starts.

 */
 2355       ctx->start_async_reads(this);
  ...
 107 // OpContext
 108 void ReplicatedPG::OpContext::start_async_reads(ReplicatedPG *pg)
 109 {
 110   inflightreads = 1;
 111   pg->pgbackend->objects_read_async(

#./src/osd/ReplicatedBackend.cc
 277 void ReplicatedBackend::objects_read_async(
 278   const hobject_t &hoid,
 279   const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
 280                   pair<bufferlist*, Context*> > > &to_read,
 281   Context *on_complete)
 282 {
  ...
 302       new AsyncReadCallback(r, on_complete)));
  ...
 114     new OnReadComplete(pg, this));
  ...
 93 struct OnReadComplete : public Context {
 94   ReplicatedPG *pg;
 95   ReplicatedPG::OpContext *opcontext;
 96   OnReadComplete(
 97     ReplicatedPG *pg,
 98     ReplicatedPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
 99   void finish(int r) {
 100     if (r < 0)
 101       opcontext->async_read_result = r;
 102     opcontext->finish_read(pg);
 103   }
 104   ~OnReadComplete() {}
 105 };
  ...
 117 void ReplicatedPG::OpContext::finish_read(ReplicatedPG *pg)
 118 {
 119   assert(inflightreads > 0);
 120   --inflightreads;
 121   if (async_reads_complete()) {
 122     assert(pg->in_progress_async_reads.size());
 123     assert(pg->in_progress_async_reads.front().second == this);
 124     pg->in_progress_async_reads.pop_front();
 125     pg->complete_read_ctx(async_read_result, this);
 126   }
 127 }
  ...
 5911 void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
 5912 {
 5913   MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
 5914   assert(ctx->async_reads_complete());
  ...
 5942   reply->set_result(result);
 5943   reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
 5944   osd->send_message_osd_client(reply, m->get_connection());
 5945   close_op_ctx(ctx, 0);
 5946 }