diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 24ec4415f..92fdfd9e5 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -164,21 +164,20 @@ insert_channel_info(GwName, ClientId, Info, Stats) -> %% @doc Get info of a channel. -spec get_chan_info(gateway_name(), emqx_types:clientid()) - -> emqx_types:infos() | undefined. + -> emqx_types:infos() | undefined. get_chan_info(GwName, ClientId) -> with_channel(GwName, ClientId, fun(ChanPid) -> get_chan_info(GwName, ClientId, ChanPid) end). --spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> - [pid()]. +-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()]. do_lookup_by_clientid(GwName, ClientId) -> ChanTab = emqx_gateway_cm:tabname(chan, GwName), [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)]. -spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid()) - -> emqx_types:infos() | undefined. + -> emqx_types:infos() | undefined. do_get_chan_info(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try @@ -189,15 +188,17 @@ do_get_chan_info(GwName, ClientId, ChanPid) -> end. -spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) - -> emqx_types:infos() | undefined. + -> emqx_types:infos() | undefined. get_chan_info(GwName, ClientId, ChanPid) -> - wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). + wrap_rpc( + emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid) + ). --spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> - [pid()]. +-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()]. lookup_by_clientid(GwName, ClientId) -> Nodes = mria_mnesia:running_nodes(), - case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of + case emqx_gateway_cm_proto_v1:lookup_by_clientid( + Nodes, GwName, ClientId) of {Pids, []} -> lists:append(Pids); {_, _BadNodes} -> @@ -565,41 +566,54 @@ do_get_chann_conn_mod(GwName, ClientId, ChanPid) -> get_chann_conn_mod(GwName, ClientId, ChanPid) -> wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)). --spec call(gateway_name(), emqx_types:clientid(), term()) -> term(). +-spec call(gateway_name(), emqx_types:clientid(), term()) + -> undefined | term(). call(GwName, ClientId, Req) -> - with_channel(GwName, ClientId, fun(ChanPid) -> - wrap_rpc(emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req)) - end). + with_channel( + GwName, ClientId, + fun(ChanPid) -> + wrap_rpc( + emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req) + ) + end). --spec call(gateway_name(), emqx_types:clientid(), term(), timeout()) -> term(). +-spec call(gateway_name(), emqx_types:clientid(), term(), timeout()) + -> undefined | term(). call(GwName, ClientId, Req, Timeout) -> - with_channel(GwName, ClientId, fun(ChanPid) -> - wrap_rpc( - emqx_gateway_cm_proto_v1:call( - GwName, ClientId, ChanPid, Req, Timeout)) - end). + with_channel( + GwName, ClientId, + fun(ChanPid) -> + wrap_rpc( + emqx_gateway_cm_proto_v1:call( + GwName, ClientId, ChanPid, Req, Timeout) + ) + end). do_call(GwName, ClientId, ChanPid, Req) -> case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of - undefined -> error(noproc); + undefined -> throw(noproc); ConnMod -> ConnMod:call(ChanPid, Req) end. do_call(GwName, ClientId, ChanPid, Req, Timeout) -> case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of - undefined -> error(noproc); + undefined -> throw(noproc); ConnMod -> ConnMod:call(ChanPid, Req, Timeout) end. --spec cast(gateway_name(), emqx_types:clientid(), term()) -> term(). +-spec cast(gateway_name(), emqx_types:clientid(), term()) -> ok. cast(GwName, ClientId, Req) -> - with_channel(GwName, ClientId, fun(ChanPid) -> - wrap_rpc(emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req)) - end). + with_channel( + GwName, ClientId, + fun(ChanPid) -> + wrap_rpc( + emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req)) + end), + ok. do_cast(GwName, ClientId, ChanPid, Req) -> case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of - undefined -> error(noproc); + undefined -> throw(noproc); ConnMod -> ConnMod:cast(ChanPid, Req) end. @@ -625,7 +639,7 @@ locker_unlock(Locker, ClientId) -> %% @private wrap_rpc(Ret) -> case Ret of - {badrpc, Reason} -> error(Reason); + {badrpc, Reason} -> throw({badrpc, Reason}); Res -> Res end. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index bb4bfa7f9..4802aa6ff 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -255,48 +255,39 @@ kickout_client(GwName, ClientId) -> -> {error, any()} | {ok, list()}. list_client_subscriptions(GwName, ClientId) -> - with_channel(GwName, ClientId, - fun(Pid) -> - case emqx_gateway_conn:call( - Pid, - subscriptions, ?DEFAULT_CALL_TIMEOUT) of - {ok, Subs} -> - {ok, lists:map(fun({Topic, SubOpts}) -> - SubOpts#{topic => Topic} - end, Subs)}; - {error, Reason} -> - {error, Reason} - end - end). + case client_call(GwName, ClientId, subscriptions) of + {error, Reason} -> {error, Reason}; + {ok, Subs} -> + {ok, lists:map(fun({Topic, SubOpts}) -> + SubOpts#{topic => Topic} + end, Subs)} + end. -spec client_subscribe(gateway_name(), emqx_types:clientid(), emqx_types:topic(), emqx_types:subopts()) -> {error, any()} | {ok, {emqx_types:topic(), emqx_types:subopts()}}. client_subscribe(GwName, ClientId, Topic, SubOpts) -> - with_channel(GwName, ClientId, - fun(Pid) -> - emqx_gateway_conn:call( - Pid, {subscribe, Topic, SubOpts}, - ?DEFAULT_CALL_TIMEOUT - ) - end). + client_call(GwName, ClientId, {subscribe, Topic, SubOpts}). -spec client_unsubscribe(gateway_name(), emqx_types:clientid(), emqx_types:topic()) -> {error, any()} | ok. client_unsubscribe(GwName, ClientId, Topic) -> - with_channel(GwName, ClientId, - fun(Pid) -> - emqx_gateway_conn:call( - Pid, {unsubscribe, Topic}, ?DEFAULT_CALL_TIMEOUT) - end). + client_call(GwName, ClientId, {unsubscribe, Topic}). -with_channel(GwName, ClientId, Fun) -> - case emqx_gateway_cm:with_channel(GwName, ClientId, Fun) of - undefined -> {error, not_found}; +client_call(GwName, ClientId, Req) -> + try emqx_gateway_cm:call( + GwName, ClientId, + Req, ?DEFAULT_CALL_TIMEOUT) of + undefined -> + {error, not_found}; Res -> Res + catch throw : noproc -> + {error, not_found}; + throw : {badrpc, Reason} -> + {error, {badrpc, Reason}} end. %%--------------------------------------------------------------------