From f0395be3830a0baeeba43c7674e17733f875b47b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 31 Jan 2023 23:18:46 +0300 Subject: [PATCH] refactor(mqtt-worker): avoid unnecessary abstraction So the code is easier to follow. --- .../src/emqx_connector_mqtt.erl | 5 +- .../src/mqtt/emqx_connector_mqtt_worker.erl | 78 ++++++++++--------- 2 files changed, 45 insertions(+), 38 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 462bac0b8..c1a051836 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -198,8 +198,9 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of ok -> - % TODO this is racy - {ok, emqx_connector_mqtt_worker:pid(InstanceId)}; + ok; + {ok, Pid} -> + {ok, Pid}; {error, Reason} -> classify_error(Reason) end. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 85261a063..9fac20153 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -67,8 +67,7 @@ %% APIs -export([ start_link/2, - stop/1, - pid/1 + stop/1 ]). %% management APIs @@ -175,7 +174,7 @@ mk_client_event_handler(undefined, _Opts) -> connect(Name) -> #{subscriptions := Subscriptions} = get_config(Name), - case emqtt:connect(pid(Name)) of + case emqtt:connect(get_pid(Name)) of {ok, Properties} -> case subscribe_remote_topics(Name, Subscriptions) of ok -> @@ -206,37 +205,28 @@ subscribe_remote_topics(_Ref, undefined) -> stop(Ref) -> emqtt:stop(ref(Ref)). -pid(Name) -> - gproc:lookup_pid(?NAME(Name)). - status(Ref) -> - trycall( - fun() -> - Info = emqtt:info(ref(Ref)), - case proplists:get_value(socket, Info) of - Socket when Socket /= undefined -> - connected; - undefined -> - connecting - end - end, - #{noproc => disconnected} - ). + try + Info = emqtt:info(ref(Ref)), + case proplists:get_value(socket, Info) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + catch + exit:{noproc, _} -> + disconnected + end. ping(Ref) -> emqtt:ping(ref(Ref)). send_to_remote(Name, MsgIn) -> - trycall( - fun() -> do_send(Name, export_msg(Name, MsgIn)) end, - #{ - badarg => {error, disconnected}, - noproc => {error, disconnected} - } - ). + trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end). do_send(Name, {true, Msg}) -> - case emqtt:publish(pid(Name), Msg) of + case emqtt:publish(get_pid(Name), Msg) of ok -> ok; {ok, #{reason_code := RC}} when @@ -263,13 +253,16 @@ do_send(_Name, false) -> ok. send_to_remote_async(Name, MsgIn, Callback) -> - trycall( - fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end, - #{badarg => {error, disconnected}} - ). + trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end). do_send_async(Name, {true, Msg}, Callback) -> - emqtt:publish_async(pid(Name), Msg, _Timeout = infinity, Callback); + Pid = get_pid(Name), + case emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback) of + ok -> + {ok, Pid}; + {error, _} = Error -> + Error + end; do_send_async(_Name, false, _Callback) -> ok. @@ -278,14 +271,14 @@ ref(Pid) when is_pid(Pid) -> ref(Term) -> ?REF(Term). -trycall(Fun, Else) -> +trycall(Fun) -> try Fun() catch - error:badarg -> - maps:get(badarg, Else); + throw:noproc -> + {error, disconnected}; exit:{noproc, _} -> - maps:get(noproc, Else) + {error, disconnected} end. format_mountpoint(undefined) -> @@ -325,8 +318,21 @@ pre_process_conf(Key, Conf) -> Conf#{Key => Val} end. +get_pid(Name) -> + case gproc:where(?NAME(Name)) of + Pid when is_pid(Pid) -> + Pid; + undefined -> + throw(noproc) + end. + get_config(Name) -> - gproc:lookup_value(?NAME(Name)). + try + gproc:lookup_value(?NAME(Name)) + catch + error:badarg -> + throw(noproc) + end. export_msg(Name, Msg) -> case get_config(Name) of