refactor(mqtt-worker): avoid unnecessary abstraction

So the code is easier to follow.
This commit is contained in:
Andrew Mayorov 2023-01-31 23:18:46 +03:00
parent 13511d2782
commit f0395be383
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 45 additions and 38 deletions

View File

@ -198,8 +198,9 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
ok ->
% TODO this is racy
{ok, emqx_connector_mqtt_worker:pid(InstanceId)};
ok;
{ok, Pid} ->
{ok, Pid};
{error, Reason} ->
classify_error(Reason)
end.

View File

@ -67,8 +67,7 @@
%% APIs
-export([
start_link/2,
stop/1,
pid/1
stop/1
]).
%% management APIs
@ -175,7 +174,7 @@ mk_client_event_handler(undefined, _Opts) ->
connect(Name) ->
#{subscriptions := Subscriptions} = get_config(Name),
case emqtt:connect(pid(Name)) of
case emqtt:connect(get_pid(Name)) of
{ok, Properties} ->
case subscribe_remote_topics(Name, Subscriptions) of
ok ->
@ -206,37 +205,28 @@ subscribe_remote_topics(_Ref, undefined) ->
stop(Ref) ->
emqtt:stop(ref(Ref)).
pid(Name) ->
gproc:lookup_pid(?NAME(Name)).
status(Ref) ->
trycall(
fun() ->
Info = emqtt:info(ref(Ref)),
case proplists:get_value(socket, Info) of
Socket when Socket /= undefined ->
connected;
undefined ->
connecting
end
end,
#{noproc => disconnected}
).
try
Info = emqtt:info(ref(Ref)),
case proplists:get_value(socket, Info) of
Socket when Socket /= undefined ->
connected;
undefined ->
connecting
end
catch
exit:{noproc, _} ->
disconnected
end.
ping(Ref) ->
emqtt:ping(ref(Ref)).
send_to_remote(Name, MsgIn) ->
trycall(
fun() -> do_send(Name, export_msg(Name, MsgIn)) end,
#{
badarg => {error, disconnected},
noproc => {error, disconnected}
}
).
trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end).
do_send(Name, {true, Msg}) ->
case emqtt:publish(pid(Name), Msg) of
case emqtt:publish(get_pid(Name), Msg) of
ok ->
ok;
{ok, #{reason_code := RC}} when
@ -263,13 +253,16 @@ do_send(_Name, false) ->
ok.
send_to_remote_async(Name, MsgIn, Callback) ->
trycall(
fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end,
#{badarg => {error, disconnected}}
).
trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end).
do_send_async(Name, {true, Msg}, Callback) ->
emqtt:publish_async(pid(Name), Msg, _Timeout = infinity, Callback);
Pid = get_pid(Name),
case emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback) of
ok ->
{ok, Pid};
{error, _} = Error ->
Error
end;
do_send_async(_Name, false, _Callback) ->
ok.
@ -278,14 +271,14 @@ ref(Pid) when is_pid(Pid) ->
ref(Term) ->
?REF(Term).
trycall(Fun, Else) ->
trycall(Fun) ->
try
Fun()
catch
error:badarg ->
maps:get(badarg, Else);
throw:noproc ->
{error, disconnected};
exit:{noproc, _} ->
maps:get(noproc, Else)
{error, disconnected}
end.
format_mountpoint(undefined) ->
@ -325,8 +318,21 @@ pre_process_conf(Key, Conf) ->
Conf#{Key => Val}
end.
get_pid(Name) ->
case gproc:where(?NAME(Name)) of
Pid when is_pid(Pid) ->
Pid;
undefined ->
throw(noproc)
end.
get_config(Name) ->
gproc:lookup_value(?NAME(Name)).
try
gproc:lookup_value(?NAME(Name))
catch
error:badarg ->
throw(noproc)
end.
export_msg(Name, Msg) ->
case get_config(Name) of