diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 71ed81dda..585122539 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -198,10 +198,7 @@ on_query_async( #{name := InstanceId} ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - %% this is a cast, currently. - ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}), - WorkerPid = get_worker_pid(InstanceId), - {ok, WorkerPid}. + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). on_get_status(_InstId, #{name := InstanceId}) -> case emqx_connector_mqtt_worker:status(InstanceId) of @@ -215,12 +212,6 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) -> {error, Reason} -> {error, Reason} end. -%% mqtt workers, when created and called via bridge callbacks, are -%% registered. --spec get_worker_pid(atom()) -> pid(). -get_worker_pid(InstanceId) -> - whereis(InstanceId). - make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined, _Conf, _) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index afe173985..6acbe3bb4 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -26,6 +26,8 @@ ping/1 ]). +-export([info/2]). + -export([ ensure_subscribed/3, ensure_unsubscribed/2 @@ -90,6 +92,9 @@ ping(undefined) -> ping(#{client_pid := Pid}) -> emqtt:ping(Pid). +info(pid, #{client_pid := Pid}) -> + Pid. + ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when is_pid(Pid) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 00b45789e..776d2d8d9 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -168,9 +168,9 @@ send_to_remote(Name, Msg) -> gen_statem:call(name(Name), {send_to_remote, Msg}). send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) -> - gen_statem:cast(Pid, {send_to_remote_async, Msg, Callback}); + gen_statem:call(Pid, {send_to_remote_async, Msg, Callback}); send_to_remote_async(Name, Msg, Callback) -> - gen_statem:cast(name(Name), {send_to_remote_async, Msg, Callback}). + gen_statem:call(name(Name), {send_to_remote_async, Msg, Callback}). %% @doc Return all forwards (local subscriptions). -spec get_forwards(id()) -> [topic()]. @@ -270,12 +270,14 @@ maybe_destroy_session(_State) -> idle({call, From}, ensure_started, State) -> case do_connect(State) of {ok, State1} -> - {next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]}; + {next_state, connected, State1, {reply, From, ok}}; {error, Reason, _State} -> {keep_state_and_data, {reply, From, {error, Reason}}} end; idle({call, From}, {send_to_remote, _}, _State) -> {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}}; +idle({call, From}, {send_to_remote_async, _, _}, _State) -> + {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}}; %% @doc Standing by for manual start. idle(info, idle, #{start_type := manual}) -> keep_state_and_data; @@ -290,14 +292,11 @@ idle(Type, Content, State) -> connecting(#{reconnect_interval := ReconnectDelayMs} = State) -> case do_connect(State) of {ok, State1} -> - {next_state, connected, State1, {state_timeout, 0, connected}}; + {next_state, connected, State1}; _ -> {keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}} end. -connected(state_timeout, connected, State) -> - %% nothing to do - {keep_state, State}; connected({call, From}, {send_to_remote, Msg}, State) -> case do_send(State, Msg) of {ok, NState} -> @@ -305,9 +304,13 @@ connected({call, From}, {send_to_remote, Msg}, State) -> {error, Reason} -> {keep_state_and_data, {reply, From, {error, Reason}}} end; -connected(cast, {send_to_remote_async, Msg, Callback}, State) -> +connected( + {call, From}, + {send_to_remote_async, Msg, Callback}, + State = #{connection := Connection} +) -> _ = do_send_async(State, Msg, Callback), - {keep_state, State}; + {keep_state, State, {reply, From, {ok, emqx_connector_mqtt_mod:info(pid, Connection)}}}; connected( info, {disconnected, Conn, Reason},