chore(gw): use emqx_gateway_cm:call/4 instead ConnMod:call/3

This commit is contained in:
JianBo He 2022-03-09 12:55:44 +08:00
parent 9422ac4bc1
commit 5436a3c496
2 changed files with 60 additions and 55 deletions

View File

@ -171,8 +171,7 @@ get_chan_info(GwName, ClientId) ->
get_chan_info(GwName, ClientId, ChanPid) get_chan_info(GwName, ClientId, ChanPid)
end). end).
-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> -spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()].
[pid()].
do_lookup_by_clientid(GwName, ClientId) -> do_lookup_by_clientid(GwName, ClientId) ->
ChanTab = emqx_gateway_cm:tabname(chan, GwName), ChanTab = emqx_gateway_cm:tabname(chan, GwName),
[Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)]. [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)].
@ -191,13 +190,15 @@ do_get_chan_info(GwName, ClientId, ChanPid) ->
-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:infos() | undefined. -> emqx_types:infos() | undefined.
get_chan_info(GwName, ClientId, ChanPid) -> get_chan_info(GwName, ClientId, ChanPid) ->
wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). wrap_rpc(
emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)
).
-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> -spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()].
[pid()].
lookup_by_clientid(GwName, ClientId) -> lookup_by_clientid(GwName, ClientId) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of case emqx_gateway_cm_proto_v1:lookup_by_clientid(
Nodes, GwName, ClientId) of
{Pids, []} -> {Pids, []} ->
lists:append(Pids); lists:append(Pids);
{_, _BadNodes} -> {_, _BadNodes} ->
@ -565,41 +566,54 @@ do_get_chann_conn_mod(GwName, ClientId, ChanPid) ->
get_chann_conn_mod(GwName, ClientId, ChanPid) -> get_chann_conn_mod(GwName, ClientId, ChanPid) ->
wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)). wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)).
-spec call(gateway_name(), emqx_types:clientid(), term()) -> term(). -spec call(gateway_name(), emqx_types:clientid(), term())
-> undefined | term().
call(GwName, ClientId, Req) -> call(GwName, ClientId, Req) ->
with_channel(GwName, ClientId, fun(ChanPid) -> with_channel(
wrap_rpc(emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req)) GwName, ClientId,
fun(ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req)
)
end). end).
-spec call(gateway_name(), emqx_types:clientid(), term(), timeout()) -> term(). -spec call(gateway_name(), emqx_types:clientid(), term(), timeout())
-> undefined | term().
call(GwName, ClientId, Req, Timeout) -> call(GwName, ClientId, Req, Timeout) ->
with_channel(GwName, ClientId, fun(ChanPid) -> with_channel(
GwName, ClientId,
fun(ChanPid) ->
wrap_rpc( wrap_rpc(
emqx_gateway_cm_proto_v1:call( emqx_gateway_cm_proto_v1:call(
GwName, ClientId, ChanPid, Req, Timeout)) GwName, ClientId, ChanPid, Req, Timeout)
)
end). end).
do_call(GwName, ClientId, ChanPid, Req) -> do_call(GwName, ClientId, ChanPid, Req) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> error(noproc); undefined -> throw(noproc);
ConnMod -> ConnMod:call(ChanPid, Req) ConnMod -> ConnMod:call(ChanPid, Req)
end. end.
do_call(GwName, ClientId, ChanPid, Req, Timeout) -> do_call(GwName, ClientId, ChanPid, Req, Timeout) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> error(noproc); undefined -> throw(noproc);
ConnMod -> ConnMod:call(ChanPid, Req, Timeout) ConnMod -> ConnMod:call(ChanPid, Req, Timeout)
end. end.
-spec cast(gateway_name(), emqx_types:clientid(), term()) -> term(). -spec cast(gateway_name(), emqx_types:clientid(), term()) -> ok.
cast(GwName, ClientId, Req) -> cast(GwName, ClientId, Req) ->
with_channel(GwName, ClientId, fun(ChanPid) -> with_channel(
wrap_rpc(emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req)) GwName, ClientId,
end). fun(ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req))
end),
ok.
do_cast(GwName, ClientId, ChanPid, Req) -> do_cast(GwName, ClientId, ChanPid, Req) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> error(noproc); undefined -> throw(noproc);
ConnMod -> ConnMod:cast(ChanPid, Req) ConnMod -> ConnMod:cast(ChanPid, Req)
end. end.
@ -625,7 +639,7 @@ locker_unlock(Locker, ClientId) ->
%% @private %% @private
wrap_rpc(Ret) -> wrap_rpc(Ret) ->
case Ret of case Ret of
{badrpc, Reason} -> error(Reason); {badrpc, Reason} -> throw({badrpc, Reason});
Res -> Res Res -> Res
end. end.

View File

@ -255,48 +255,39 @@ kickout_client(GwName, ClientId) ->
-> {error, any()} -> {error, any()}
| {ok, list()}. | {ok, list()}.
list_client_subscriptions(GwName, ClientId) -> list_client_subscriptions(GwName, ClientId) ->
with_channel(GwName, ClientId, case client_call(GwName, ClientId, subscriptions) of
fun(Pid) -> {error, Reason} -> {error, Reason};
case emqx_gateway_conn:call(
Pid,
subscriptions, ?DEFAULT_CALL_TIMEOUT) of
{ok, Subs} -> {ok, Subs} ->
{ok, lists:map(fun({Topic, SubOpts}) -> {ok, lists:map(fun({Topic, SubOpts}) ->
SubOpts#{topic => Topic} SubOpts#{topic => Topic}
end, Subs)}; end, Subs)}
{error, Reason} -> end.
{error, Reason}
end
end).
-spec client_subscribe(gateway_name(), emqx_types:clientid(), -spec client_subscribe(gateway_name(), emqx_types:clientid(),
emqx_types:topic(), emqx_types:subopts()) emqx_types:topic(), emqx_types:subopts())
-> {error, any()} -> {error, any()}
| {ok, {emqx_types:topic(), emqx_types:subopts()}}. | {ok, {emqx_types:topic(), emqx_types:subopts()}}.
client_subscribe(GwName, ClientId, Topic, SubOpts) -> client_subscribe(GwName, ClientId, Topic, SubOpts) ->
with_channel(GwName, ClientId, client_call(GwName, ClientId, {subscribe, Topic, SubOpts}).
fun(Pid) ->
emqx_gateway_conn:call(
Pid, {subscribe, Topic, SubOpts},
?DEFAULT_CALL_TIMEOUT
)
end).
-spec client_unsubscribe(gateway_name(), -spec client_unsubscribe(gateway_name(),
emqx_types:clientid(), emqx_types:topic()) emqx_types:clientid(), emqx_types:topic())
-> {error, any()} -> {error, any()}
| ok. | ok.
client_unsubscribe(GwName, ClientId, Topic) -> client_unsubscribe(GwName, ClientId, Topic) ->
with_channel(GwName, ClientId, client_call(GwName, ClientId, {unsubscribe, Topic}).
fun(Pid) ->
emqx_gateway_conn:call(
Pid, {unsubscribe, Topic}, ?DEFAULT_CALL_TIMEOUT)
end).
with_channel(GwName, ClientId, Fun) -> client_call(GwName, ClientId, Req) ->
case emqx_gateway_cm:with_channel(GwName, ClientId, Fun) of try emqx_gateway_cm:call(
undefined -> {error, not_found}; GwName, ClientId,
Req, ?DEFAULT_CALL_TIMEOUT) of
undefined ->
{error, not_found};
Res -> Res Res -> Res
catch throw : noproc ->
{error, not_found};
throw : {badrpc, Reason} ->
{error, {badrpc, Reason}}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------