librados 源码分析：RadosClient 与 OSDC
时间：2019-03-23　来源：OSCHINA
// RadosClient.h
// Client-side handle to a RADOS cluster: owns the MonClient, the Messenger
// and the Objecter, and exposes pool, command and IoCtx management.
class librados::RadosClient : public Dispatcher // inherits from Dispatcher (the message-dispatch interface)
{
public:
  using Dispatcher::cct;
  md_config_t *conf; // configuration
private:
  enum {
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
  } state; // connection state to the cluster

  MonClient monclient; // monitor client ("monc")
  Messenger *messenger; // network messaging layer

  uint64_t instance_id;

  // Dispatcher overrides used to receive and route messages
  bool _dispatch(Message *m);
  bool ms_dispatch(Message *m);

  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
  void ms_handle_connect(Connection *con);
  bool ms_handle_reset(Connection *con);
  void ms_handle_remote_reset(Connection *con);

  Objecter *objecter; // from the osdc module; sends the assembled OP messages to OSDs

  Mutex lock;
  Cond cond;
  SafeTimer timer; // timer
  int refcnt;

  version_t log_last_version;
  rados_log_callback_t log_cb;
  void *log_cb_arg;
  string log_watch;

  int wait_for_osdmap();

public:
  Finisher finisher; // runs completion callbacks

  explicit RadosClient(CephContext *cct_);
  ~RadosClient();
  int ping_monitor(string mon_id, string *result);
  int connect(); // connect to the cluster
  void shutdown();

  int watch_flush();
  int async_watch_flush(AioCompletionImpl *c);

  uint64_t get_instance_id();

  int wait_for_latest_osdmap();

  // create an IoCtx from a pool name or a pool id
  int create_ioctx(const char *name, IoCtxImpl **io);
  int create_ioctx(int64_t, IoCtxImpl **io);

  int get_fsid(std::string *s);
  // pool-related operations
  int64_t lookup_pool(const char *name);
  bool pool_requires_alignment(int64_t pool_id);
  int pool_requires_alignment2(int64_t pool_id, bool *requires);
  uint64_t pool_required_alignment(int64_t pool_id);
  int pool_required_alignment2(int64_t pool_id, uint64_t *alignment);
  int pool_get_auid(uint64_t pool_id, unsigned long long *auid);
  // NOTE(review): the out-parameter is named `auid` but the function name
  // says it returns the pool *name*; the parameter name looks like a copy/paste slip.
  int pool_get_name(uint64_t pool_id, std::string *auid);

  int pool_list(std::list<std::pair<int64_t, string> >& ls);
  int get_pool_stats(std::list<string>& ls, map<string,::pool_stat_t>& result);
  int get_fs_stats(ceph_statfs& result);

  /*
  crush_rule defaults to -1, in which case the monitor picks the crush rule
  in this order:
    a) osd pool default crush replicated ruleset
    b) the first ruleset in crush ruleset
    c) error out if no value is found
  */
  // synchronous and asynchronous pool creation
  int pool_create(string& name, unsigned long long auid=0, int16_t crush_rule=-1);
  int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0,
                        int16_t crush_rule=-1);
  int pool_get_base_tier(int64_t pool_id, int64_t* base_tier);
  // synchronous and asynchronous pool deletion
  int pool_delete(const char *name);

  int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);

  int blacklist_add(const string& client_address, uint32_t expire_seconds);

  // Monitor commands: sent to a monitor via monclient.start_mon_command
  int mon_command(const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);
  int mon_command(int rank,
                  const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);
  int mon_command(string name,
                  const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);

  // OSD commands: sent to an OSD via objecter->osd_command
  int osd_command(int osd, vector<string>& cmd, const bufferlist& inbl,
                  bufferlist *poutbl, string *prs);

  // PG commands: sent via objecter->pg_command
  int pg_command(pg_t pgid, vector<string>& cmd, const bufferlist& inbl,
                 bufferlist *poutbl, string *prs);

  void handle_log(MLog *m);
  int monitor_log(const string& level, rados_log_callback_t cb, void *arg);

  void get();
  bool put();
  void blacklist_self(bool set);
};
// connect(): connect to the cluster
/**
 * Move the client from DISCONNECTED to CONNECTED.
 *
 * Steps shown: build the initial monmap, create the messenger and the
 * Objecter, register both as dispatchers, start the messenger, initialise
 * and authenticate the MonClient, start the Objecter, the timer and the
 * Finisher, then record the state and the instance id.
 *
 * Returns -EINPROGRESS if a connect is already in progress and -EISCONN if
 * the client is already connected. Failure paths jump to `out` with `err`
 * set.
 *
 * NOTE(review): this is an excerpt. The trailing `...` is the article's
 * elision marker, not C++. The `out:` label targeted by the gotos, the
 * release of `lock` acquired below, and the final return are not shown;
 * confirm against upstream.
 */
int librados::RadosClient::connect()
{
  common_init_finish(cct);

  int err;

  // already connected?
  if (state == CONNECTING)
    return -EINPROGRESS;
  if (state == CONNECTED)
    return -EISCONN;
  state = CONNECTING;

  // get monmap
  err = monclient.build_initial_monmap(); // build the initial monitor map from the configuration
  if (err < 0)
    goto out;

  err = -ENOMEM;
  messenger = Messenger::create_client_messenger(cct, "radosclient"); // create the messaging layer
  if (!messenger)
    goto out;

  // require OSDREPLYMUX feature.  this means we will fail to talk to
  // old servers.  this is necessary because otherwise we won't know
  // how to decompose the reply data into its consituent pieces.
  messenger->set_default_policy(Messenger::Policy::lossy_client(0, CEPH_FEATURE_OSDREPLYMUX));

  ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;

  ldout(cct, 1) << "starting objecter" << dendl;

  // create the Objecter (nothrow: a failed allocation yields nullptr and err stays -ENOMEM)
  objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
                                         &finisher,
                                         cct->_conf->rados_mon_op_timeout,
                                         cct->_conf->rados_osd_op_timeout);
  if (!objecter)
    goto out;
  objecter->set_balanced_budget();

  // give the MonClient the messenger
  monclient.set_messenger(messenger);

  // initialise the Objecter
  objecter->init();
  // register dispatchers on the messenger: the Objecter first, then this client
  messenger->add_dispatcher_tail(objecter);
  messenger->add_dispatcher_tail(this);

  // start the messenger
  messenger->start();

  ldout(cct, 1) << "setting wanted keys" << dendl;
  monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
  ldout(cct, 1) << "calling monclient init" << dendl;
  // initialise the MonClient
  err = monclient.init();
  if (err) {
    ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
    shutdown();
    goto out;
  }

  err = monclient.authenticate(conf->client_mount_timeout);
  if (err) {
    ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
    shutdown();
    goto out;
  }
  messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));

  objecter->set_client_incarnation(0);
  // start the Objecter
  objecter->start();
  lock.Lock();

  // initialise the timer
  timer.init();

  monclient.renew_subs();

  // start the Finisher that runs completion callbacks
  finisher.start();

  // mark the client as connected
  state = CONNECTED;
  instance_id = monclient.get_global_id();

  ...
}
// create_ioctx: create an IoCtx for a pool
/**
 * Create an IoCtxImpl for the pool called `name` and store it in `*io`.
 * The new context is bound to this client, its Objecter, the resolved pool
 * id, and CEPH_NOSNAP as its snapshot id.
 *
 * NOTE(review): excerpt. The `...` is the article's elision marker;
 * presumably it handles a failed lookup_pool() result. Confirm against upstream.
 */
int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
{
  // resolve the pool name to a pool id
  int64_t poolid = lookup_pool(name);
  ...
  // allocate the IoCtxImpl
  *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
  return 0;
}
// Mon / OSD / PG command operations
// NOTE(review): the three bodies below are abbreviated excerpts. The locals
// they use (mylock, cond, done, rval/ret, tid) are not declared here, and none
// of them has a return statement as shown. Confirm against upstream.

/// Send a command to a monitor through MonClient::start_mon_command.
/// The completion context is a C_SafeCond built from mylock/cond/done/rval
/// (presumably signalled when the reply arrives; confirm).
int librados::RadosClient::mon_command(const vector<string>& cmd,
                                       const bufferlist &inbl,
                                       bufferlist *outbl, string *outs)
{
  // hand the command to the MonClient, which sends it to a monitor
  monclient.start_mon_command(cmd, inbl, outbl, outs,
                              new C_SafeCond(&mylock, &cond, &done, &rval));
}

/// Send a command to OSD `osd` through Objecter::osd_command.
int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
                                       const bufferlist& inbl,
                                       bufferlist *poutbl, string *prs)
{
  // send to the OSD
  int r = objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
                                new C_SafeCond(&mylock, &cond, &done, &ret));
}

/// Send a command addressed to placement group `pgid` through Objecter::pg_command.
int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
                                      const bufferlist& inbl,
                                      bufferlist *poutbl, string *prs)
{
  // send to the PG
  int r = objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
                               new C_SafeCond(&mylock, &cond, &done, &ret));
}
librados::IoCtx 的实现类 IoCtxImpl 先把请求封装成 ObjectOperation（osdc 中定义的类），再把相关的 pool 信息添加进去，封装成 Objecter::Op 对象；然后调用 objecter->op_submit 将其发送给相应的 OSD。操作完成后，调用相应的回调函数。
如rados_write extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, size_t len, uint64_t off) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); bufferlist bl; bl.append(buf, len); int retval = ctx->write(oid, bl, len, off); }
// rados_write then calls IoCtxImpl::write:
/**
 * Write the first `len` bytes of `bl` into object `oid` at offset `off`.
 * Builds an ::ObjectOperation (assert ops first, then a WRITE of those
 * bytes) and executes it through operate().
 */
int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
                               size_t len, uint64_t off)
{
  ::ObjectOperation op;
  prepare_assert_ops(&op); // assert ops
  bufferlist mybl;
  mybl.substr_of(bl, 0, len);  // take the first len bytes of bl
  op.write(off, mybl);  // queue the write via ObjectOperation::write (Objecter.h)
  return operate(oid, &op, NULL);  // IoCtxImpl::operate (flags presumably defaulted in its declaration)
}

/**
 * Turn an ObjectOperation into an Objecter::Op with prepare_mutate_op()
 * and submit it with Objecter::op_submit().
 *
 * NOTE(review): abbreviated excerpt. `oloc`, `snapc`, `ut`, `oncommit` and
 * `ver` are not declared here (members or elided locals), `op` is unused as
 * shown, and there is no return statement. `o->ops[0]` also assumes at
 * least one sub-op was queued. Confirm against upstream.
 */
int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
                                 ceph::real_time *pmtime, int flags)
{
  int op = o->ops[0].op.op;  // opcode of the first sub-op
  Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc, *o, snapc, ut, flags, NULL, oncommit, &ver);
  objecter->op_submit(objecter_op);
}
ObjectOperation struct ObjectOperation { vector<OSDOp> ops; // ops集合 int flags; int priority; vector<bufferlist*> out_bl; // 输出bufferlist vector<Context*> out_handler; // 回调函数 vector<int*> out_rval; // 返回码集合 size_t size() { // op个数 return ops.size(); } /** * This is a more limited form of C_Contexts, but that requires * a ceph_context which we don't have here. */ // 用户添加回调函数 class C_TwoContexts : public Context { Context *first; Context *second; }; /** * Add a callback to run when this operation completes, * after any other callbacks for it. */ // 添加回调函数 void add_handler(Context *extra) { size_t last = out_handler.size() - 1; Context *orig = out_handler[last]; if (orig) { Context *wrapper = new C_TwoContexts(orig, extra); out_handler[last] = wrapper; } else { out_handler[last] = extra; } } // 添加操作 OSDOp& add_op(int op) { int s = ops.size(); ops.resize(s+1); ops[s].op.op = op; out_bl.resize(s+1); out_bl[s] = NULL; out_handler.resize(s+1); out_handler[s] = NULL; out_rval.resize(s+1); out_rval[s] = NULL; return ops[s]; } // 添加data void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) { OSDOp& osd_op = add_op(op); osd_op.op.extent.offset = off; osd_op.op.extent.length = len; osd_op.indata.claim_append(bl); } void add_clone_range(int op, uint64_t off, uint64_t len, const object_t& srcoid, uint64_t srcoff, snapid_t srcsnapid) {} void add_xattr(int op, const char *name, const bufferlist& data) {} void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& data) {} // 添加call method void add_call(int op, const char *cname, const char *method, bufferlist &indata, bufferlist *outbl, Context *ctx, int *prval) { OSDOp& osd_op = add_op(op); unsigned p = ops.size() - 1; out_handler[p] = ctx; out_bl[p] = outbl; out_rval[p] = prval; osd_op.op.cls.class_len = strlen(cname); osd_op.op.cls.method_len = strlen(method); osd_op.op.cls.indata_len = indata.length(); osd_op.indata.append(cname, osd_op.op.cls.class_len); 
osd_op.indata.append(method, osd_op.op.cls.method_len); osd_op.indata.append(indata); } void add_pgls(int op, uint64_t count, collection_list_handle_t cookie, epoch_t start_epoch) {} void add_pgls_filter(int op, uint64_t count, const bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) {} void add_alloc_hint(int op, uint64_t expected_object_size, uint64_t expected_write_size) {} // ------ // pg 操作 void pg_ls(uint64_t count, bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) {} void pg_nls(uint64_t count, const bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) {} // 创建 操作 void create(bool excl) { OSDOp& o = add_op(CEPH_OSD_OP_CREATE); o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0); } // 状态 struct C_ObjectOperation_stat : public Context { bufferlist bl; uint64_t *psize; ceph::real_time *pmtime; time_t *ptime; struct timespec *pts; int *prval; // 完成大小,时间等 void finish(int r) {} } }; // 查看状态,获取C_ObjectOperation_stat void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {} void stat(uint64_t *psize, time_t *ptime, int *prval) {} void stat(uint64_t *psize, struct timespec *pts, int *prval) {} // object data // 读操作 void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval, Context* ctx) { bufferlist bl; add_data(CEPH_OSD_OP_READ, off, len, bl); unsigned p = ops.size() - 1; out_bl[p] = pbl; out_rval[p] = prval; out_handler[p] = ctx; } void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m, bufferlist *data_bl, int *prval) {} // 写操作 void write(uint64_t off, bufferlist& bl, uint64_t truncate_size, uint32_t truncate_seq) { add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); // 添加data, 将WRITE存入ops,将数据放在op中 OSDOp& o = *ops.rbegin(); o.op.extent.truncate_size = truncate_size; o.op.extent.truncate_seq = truncate_seq; } void write(uint64_t off, bufferlist& bl) {} void write_full(bufferlist& bl) {} void append(bufferlist& bl) {} void zero(uint64_t off, uint64_t len) {} void 
truncate(uint64_t off) {} void remove() {} void mapext(uint64_t off, uint64_t len) {} void sparse_read(uint64_t off, uint64_t len) {} void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len, uint64_t dst_offset) {} // object attrs // 属性操作 void getxattr(const char *name, bufferlist *pbl, int *prval) {} void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {} void setxattr(const char *name, const bufferlist& bl) {} void setxattr(const char *name, const string& s) {} void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& bl) {} void rmxattr(const char *name) {} void setxattrs(map<string, bufferlist>& attrs) {} void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {} // trivialmap void tmap_update(bufferlist& bl) {} void tmap_put(bufferlist& bl) {} void tmap_get(bufferlist *pbl, int *prval) {} void tmap_get() {} void tmap_to_omap(bool nullok=false) {} // objectmap void omap_get_keys(const string &start_after, uint64_t max_to_get, std::set<std::string> *out_set, int *prval) { OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS); bufferlist bl; ::encode(start_after, bl); ::encode(max_to_get, bl); op.op.extent.offset = 0; op.op.extent.length = bl.length(); op.indata.claim_append(bl); if (prval || out_set) { unsigned p = ops.size() - 1; C_ObjectOperation_decodekeys *h = new C_ObjectOperation_decodekeys(out_set, prval); out_handler[p] = h; out_bl[p] = &h->bl; out_rval[p] = prval; } } void omap_get_vals(const string &start_after, const string &filter_prefix, uint64_t max_to_get, std::map<std::string, bufferlist> *out_set, int *prval) {} void omap_get_vals_by_keys(const std::set<std::string> &to_get, std::map<std::string, bufferlist> *out_set, int *prval) {} void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions, int *prval) {} void copy_get(object_copy_cursor_t *cursor, uint64_t max, uint64_t *out_size, ceph::real_time *out_mtime, std::map<std::string,bufferlist> *out_attrs, 
bufferlist *out_data, bufferlist *out_omap_header, bufferlist *out_omap_data, vector<snapid_t> *out_snaps, snapid_t *out_snap_seq, uint32_t *out_flags, uint32_t *out_data_digest, uint32_t *out_omap_digest, vector<pair<osd_reqid_t, version_t> > *out_reqids, uint64_t *truncate_seq, uint64_t *truncate_size, int *prval) {} void undirty() {} struct C_ObjectOperation_isdirty : public Context {}; void is_dirty(bool *pisdirty, int *prval) {} void omap_get_header(bufferlist *bl, int *prval) {} void omap_set(const map<string, bufferlist> &map) {} void omap_set_header(bufferlist &bl) {} void omap_clear() {} void omap_rm_keys(const std::set<std::string> &to_remove) {} // object classes void call(const char *cname, const char *method, bufferlist &indata) {} void call(const char *cname, const char *method, bufferlist &indata, bufferlist *outdata, Context *ctx, int *prval) {} void rollback(uint64_t snapid) {} void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc, version_t src_version, unsigned flags, unsigned src_fadvise_flags) {} };
// OSDOp (osd_types.h)
/// One sub-operation of an OSD request.
struct OSDOp {
  ceph_osd_op op;              // the operation: opcode plus op-specific arguments
  sobject_t soid;              // object id
  bufferlist indata, outdata;  // input and output payloads
  int32_t rval;                // return code
};
Objecter class Objecter : public md_config_obs_t, public Dispatcher { public: Messenger *messenger; // 消息 MonClient *monc; // mc Finisher *finisher; private: OSDMap *osdmap; // osdmap public: using Dispatcher::cct; std::multimap<string,string> crush_location; atomic_t initialized; private: atomic64_t last_tid; atomic_t inflight_ops; atomic_t client_inc; uint64_t max_linger_id; atomic_t num_unacked; atomic_t num_uncommitted; atomic_t global_op_flags; // flags which are applied to each IO op bool keep_balanced_budget; bool honor_osdmap_full; public: void maybe_request_map(); private: void _maybe_request_map(); version_t last_seen_osdmap_version; version_t last_seen_pgmap_version; mutable boost::shared_mutex rwlock; using lock_guard = std::unique_lock<decltype(rwlock)>; using unique_lock = std::unique_lock<decltype(rwlock)>; using shared_lock = boost::shared_lock<decltype(rwlock)>; using shunique_lock = ceph::shunique_lock<decltype(rwlock)>; ceph::timer<ceph::mono_clock> timer; PerfCounters *logger; uint64_t tick_event; void start_tick(); void tick(); void update_crush_location(); public: /*** track pending operations ***/ // read public: struct OSDSession; struct op_target_t {} // 操作目标 struct Op : public RefCountedObject {}; // 操作 };
// Operation target: carries the PG and OSD mapping information for an op.
struct op_target_t {
  int flags;

  object_t base_oid;
  object_locator_t base_oloc;
  object_t target_oid;           // target object id
  object_locator_t target_oloc;  // target object locator

  // whether the op is addressed by base_pgid rather than base_oid
  bool precalc_pgid; ///< true if we are directed at base_pgid, not base_oid
  // explicitly specified pg id
  pg_t base_pgid; ///< explicit pg target, if any

  pg_t pgid; ///< last pg we mapped to
  unsigned pg_num; ///< last pg_num we mapped to
  unsigned pg_num_mask; ///< last pg_num_mask we mapped to
  // up OSDs
  vector<int> up; ///< set of up osds for last pg we mapped to
  // acting OSDs
  vector<int> acting; ///< set of acting osds for last pg we mapped to
  // primaries
  int up_primary; ///< primary for last pg we mapped to based on the up set
  int acting_primary; ///< primary for last pg we mapped to based on the
                      /// acting set
  // pool size
  int size; ///< the size of the pool when we were last mapped
  // pool min_size
  int min_size; ///< the min size of the pool when we were last mapped
  // whether the bitwise sort order is in use
  bool sort_bitwise; ///< whether the hobject_t sort order is bitwise

  // presumably: whether the op was directed at a replica rather than the primary (confirm)
  bool used_replica;
  bool paused;

  int osd; ///< the final target osd, or -1
};
// Op: an operation as tracked by the Objecter
struct Op : public RefCountedObject {
  OSDSession *session;  // OSD session the op is attached to
  int incarnation;

  op_target_t target;   // where the op is going (PG / OSD mapping)

  ConnectionRef con;  // for rx buffer only
  uint64_t features;  // explicitly specified op features

  vector<OSDOp> ops;  // the sub-operations

  snapid_t snapid;
  SnapContext snapc;
  ceph::real_time mtime;

  bufferlist *outbl;
  // presumably mirror the ObjectOperation vectors of the same names (confirm)
  vector<bufferlist*> out_bl;
  vector<Context*> out_handler;
  vector<int*> out_rval;

  int priority;
  Context *onack, *oncommit;
  uint64_t ontimeout;
  Context *oncommit_sync; // used internally by watch/notify

  ceph_tid_t tid;
  eversion_t replay_version; // for op replay
  int attempts;

  version_t *objver;
  epoch_t *reply_epoch;

  ceph::mono_time stamp;

  epoch_t map_dne_bound;

  bool budgeted;

  /// true if we should resend this message on failure
  bool should_resend;

  /// true if the throttle budget is get/put on a series of OPs,
  /// instead of per OP basis, when this flag is set, the budget is
  /// acquired before sending the very first OP of the series and
  /// released upon receiving the last OP reply.
  bool ctx_budgeted;

  int *data_offset;

  epoch_t last_force_resend;

  osd_reqid_t reqid; // explicitly setting reqid
};
分片 Striper
对象区段（extent）：ObjectExtent
// Records striping information: one per-object piece of a striped I/O range.
class ObjectExtent {
 public:
  object_t    oid;       // object id
  uint64_t    objectno;  // object number (index of the object in the striped sequence; confirm)
  uint64_t    offset;    // in object: offset within the object
  uint64_t    length;    // in object: length of this piece
  uint64_t    truncate_size;  // in object

  object_locator_t oloc;   // object locator (pool etc)

  vector<pair<uint64_t,uint64_t> >  buffer_extents;  // off -> len.  extents in buffer being mapped (may be fragmented bc of striping!)
};