diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index bbd347ae9..6631fd23a 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -182,7 +182,6 @@ basic_config(#{ replayq := ReplayQ, ssl := #{enable := EnableSsl} = Ssl}) -> #{ - conn_type => mqtt, replayq => ReplayQ, %% connection opts server => Server, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index c8b7ff77b..3de7feac4 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -50,7 +50,7 @@ start(Config) -> Parent = self(), {Host, Port} = maps:get(server, Config), Mountpoint = maps:get(receive_mountpoint, Config, undefined), - Subscriptions = maps:get(subscriptions, Config), + Subscriptions = maps:get(subscriptions, Config, undefined), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), Handlers = make_hdlr(Parent, Vars), Config1 = Config#{ diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 425fa06f1..7f8435fd1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -19,7 +19,7 @@ -export([ to_binary/1 , from_binary/1 , make_pub_vars/2 - , to_remote_msg/3 + , to_remote_msg/2 , to_broker_msg/2 , estimate_size/1 ]). @@ -55,13 +55,13 @@ make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, local_topic := %% Shame that we have to know the callback module here %% would be great if we can get rid of #mqtt_msg{} record %% and use #message{} in all places. --spec to_remote_msg(emqx_bridge_rpc | emqx_connector_mqtt_mod, msg(), variables()) +-spec to_remote_msg(msg(), variables()) -> exp_msg(). -to_remote_msg(emqx_connector_mqtt_mod, #message{flags = Flags0} = Msg, Vars) -> +to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> Retain0 = maps:get(retain, Flags0, false), MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), - to_remote_msg(emqx_connector_mqtt_mod, MapMsg, Vars); -to_remote_msg(emqx_connector_mqtt_mod, MapMsg, #{topic := TopicToken, payload := PayloadToken, + to_remote_msg(MapMsg, Vars); +to_remote_msg(MapMsg, #{topic := TopicToken, payload := PayloadToken, qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg), @@ -72,7 +72,7 @@ to_remote_msg(emqx_connector_mqtt_mod, MapMsg, #{topic := TopicToken, payload := topic = topic(Mountpoint, Topic), props = #{}, payload = Payload}; -to_remote_msg(_Module, #message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> +to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 83f7ce746..6ced719df 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -185,12 +185,10 @@ callback_mode() -> [state_functions]. init(#{name := Name} = ConnectOpts) -> ?LOG(info, "starting bridge worker for ~p", [Name]), erlang:process_flag(trap_exit, true), - ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), State = init_state(ConnectOpts), self() ! idle, {ok, idle, State#{ - connect_module => ConnectModule, connect_opts => pre_process_opts(ConnectOpts), replayq => Queue }}. @@ -311,15 +309,13 @@ connected(Type, Content, State) -> %% Common handlers common(StateName, {call, From}, status, _State) -> {keep_state_and_data, [{reply, From, StateName}]}; -common(_StateName, {call, From}, ping, #{connection := Conn, - connect_module := ConnectModule} =_State) -> - Reply = ConnectModule:ping(Conn), +common(_StateName, {call, From}, ping, #{connection := Conn} =_State) -> + Reply = emqx_connector_mqtt_mod:ping(Conn), {keep_state_and_data, [{reply, From, Reply}]}; common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) -> {keep_state_and_data, [{reply, From, ok}]}; -common(_StateName, {call, From}, ensure_stopped, #{connection := Conn, - connect_module := ConnectModule} = State) -> - Reply = ConnectModule:stop(Conn), +common(_StateName, {call, From}, ensure_stopped, #{connection := Conn} = State) -> + Reply = emqx_connector_mqtt_mod:stop(Conn), {next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]}; common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) -> {keep_state_and_data, [{reply, From, Forwards}]}; @@ -341,22 +337,21 @@ common(StateName, Type, Content, #{name := Name} = State) -> [Name, Type, StateName, Content]), {keep_state, State}. -do_connect(#{connect_module := ConnectModule, - connect_opts := ConnectOpts = #{forwards := Forwards}, +do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards}, inflight := Inflight, name := Name} = State) -> case Forwards of undefined -> ok; #{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name) end, - case ConnectModule:start(ConnectOpts) of + case emqx_connector_mqtt_mod:start(ConnectOpts) of {ok, Conn} -> ?tp(info, connected, #{name => Name, inflight => length(Inflight)}), {ok, State#{connection => Conn}}; {error, Reason} -> ConnectOpts1 = obfuscate(ConnectOpts), - ?LOG(error, "Failed to connect with module=~p\n" - "config=~p\nreason:~p", [ConnectModule, ConnectOpts1, Reason]), + ?LOG(error, "Failed to connect \n" + "config=~p\nreason:~p", [ConnectOpts1, Reason]), {error, Reason, State} end. @@ -385,16 +380,13 @@ pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) -> pop_and_send_loop(State, 0) -> ?tp(debug, inflight_full, #{}), {ok, State}; -pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) -> +pop_and_send_loop(#{replayq := Q} = State, N) -> case replayq:is_empty(Q) of true -> ?tp(debug, replayq_drained, #{}), {ok, State}; false -> - BatchSize = case Module of - emqx_bridge_rpc -> maps:get(batch_size, State); - _ -> 1 - end, + BatchSize = 1, Opts = #{count_limit => BatchSize, bytes_limit => 999999999}, {Q1, QAckRef, Batch} = replayq:pop(Q, Opts), case do_send(State#{replayq := Q1}, QAckRef, Batch) of @@ -407,7 +399,6 @@ pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) -> do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) -> ?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt..in' not configured, msg: ~p", [Batch]); do_send(#{inflight := Inflight, - connect_module := Module, connection := Connection, mountpoint := Mountpoint, connect_opts := #{forwards := Forwards}, @@ -415,10 +406,10 @@ do_send(#{inflight := Inflight, Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), ExportMsg = fun(Message) -> bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'), - emqx_connector_mqtt_msg:to_remote_msg(Module, Message, Vars) + emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) end, ?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]), - case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of + case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of {ok, Refs} -> {ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef, send_ack_ref => map_set(Refs), @@ -492,10 +483,8 @@ do_subscribe(RawTopic, Name) -> {Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}), emqx_broker:subscribe(Topic, Name, SubOpts). -disconnect(#{connection := Conn, - connect_module := Module - } = State) when Conn =/= undefined -> - Module:stop(Conn), +disconnect(#{connection := Conn} = State) when Conn =/= undefined -> + emqx_connector_mqtt_mod:stop(Conn), State#{connection => undefined}; disconnect(State) -> State. @@ -538,13 +527,6 @@ obfuscate(Map) -> is_sensitive(password) -> true; is_sensitive(_) -> false. -conn_type(rpc) -> - emqx_bridge_rpc; -conn_type(mqtt) -> - emqx_connector_mqtt_mod; -conn_type(Mod) when is_atom(Mod) -> - Mod. - str(A) when is_atom(A) -> atom_to_list(A); str(B) when is_binary(B) -> diff --git a/apps/emqx_connector/test/emqx_connetor_mqtt_tests.erl b/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl similarity index 93% rename from apps/emqx_connector/test/emqx_connetor_mqtt_tests.erl rename to apps/emqx_connector/test/emqx_connector_mqtt_tests.erl index 7943f5a77..0f4d651c9 100644 --- a/apps/emqx_connector/test/emqx_connetor_mqtt_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_bridge_mqtt_tests). +-module(emqx_connector_mqtt_tests). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -37,7 +37,7 @@ send_and_ack_test() -> try Max = 1, Batch = lists:seq(1, Max), - {ok, Conn} = emqx_connector_mqtt_mod:start(#{address => "127.0.0.1:1883"}), + {ok, Conn} = emqx_connector_mqtt_mod:start(#{server => {{127,0,0,1}, 1883}}), % %% return last packet id as batch reference {ok, _AckRef} = emqx_connector_mqtt_mod:send(Conn, Batch), diff --git a/apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl b/apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl new file mode 100644 index 000000000..090106cef --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl @@ -0,0 +1,149 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_mqtt_worker_tests). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +-define(BRIDGE_NAME, test). +-define(BRIDGE_REG_NAME, emqx_connector_mqtt_worker_test). +-define(WAIT(PATTERN, TIMEOUT), + receive + PATTERN -> + ok + after + TIMEOUT -> + error(timeout) + end). + +-export([start/1, send/2, stop/1]). + +start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) -> + case is_pid(Pid) of + true -> Pid ! {connection_start_attempt, Ref}; + false -> ok + end, + Result. + +send(SendFun, Batch) when is_function(SendFun, 2) -> + SendFun(Batch). + +stop(_Pid) -> ok. + +%% bridge worker should retry connecting remote node indefinitely +% reconnect_test() -> +% emqx_metrics:start_link(), +% emqx_connector_mqtt_worker:register_metrics(), +% Ref = make_ref(), +% Config = make_config(Ref, self(), {error, test}), +% {ok, Pid} = emqx_connector_mqtt_worker:start_link(?BRIDGE_NAME, Config), +% %% assert name registered +% ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), +% ?WAIT({connection_start_attempt, Ref}, 1000), +% %% expect same message again +% ?WAIT({connection_start_attempt, Ref}, 1000), +% ok = emqx_connector_mqtt_worker:stop(?BRIDGE_REG_NAME), +% emqx_metrics:stop(), +% ok. + +%% connect first, disconnect, then connect again +disturbance_test() -> + meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]), + meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end), + meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end), + meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end), + try + emqx_metrics:start_link(), + emqx_connector_mqtt_worker:register_metrics(), + Ref = make_ref(), + TestPid = self(), + Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}), + {ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => bridge_disturbance}), + ?assertEqual(Pid, whereis(bridge_disturbance)), + ?WAIT({connection_start_attempt, Ref}, 1000), + Pid ! {disconnected, TestPid, test}, + ?WAIT({connection_start_attempt, Ref}, 1000), + emqx_metrics:stop(), + ok = emqx_connector_mqtt_worker:stop(Pid) + after + meck:unload(emqx_connector_mqtt_mod) + end. + +% % %% buffer should continue taking in messages when disconnected +% buffer_when_disconnected_test_() -> +% {timeout, 10000, fun test_buffer_when_disconnected/0}. + +% test_buffer_when_disconnected() -> +% Ref = make_ref(), +% Nums = lists:seq(1, 100), +% Sender = spawn_link(fun() -> receive {bridge, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end), +% SenderMref = monitor(process, Sender), +% Receiver = spawn_link(fun() -> receive {bridge, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end), +% ReceiverMref = monitor(process, Receiver), +% SendFun = fun(Batch) -> +% BatchRef = make_ref(), +% Receiver ! {batch, BatchRef, Batch}, +% {ok, BatchRef} +% end, +% Config0 = make_config(Ref, false, {ok, #{client_pid => undefined}}), +% Config = Config0#{reconnect_delay_ms => 100}, +% emqx_metrics:start_link(), +% emqx_connector_mqtt_worker:register_metrics(), +% {ok, Pid} = emqx_connector_mqtt_worker:start_link(?BRIDGE_NAME, Config), +% Sender ! {bridge, Pid}, +% Receiver ! {bridge, Pid}, +% ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), +% Pid ! {disconnected, Ref, test}, +% ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000), +% ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), +% ok = emqx_connector_mqtt_worker:stop(?BRIDGE_REG_NAME), +% emqx_metrics:stop(). + +manual_start_stop_test() -> + meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]), + meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end), + meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end), + meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end), + try + emqx_metrics:start_link(), + emqx_connector_mqtt_worker:register_metrics(), + Ref = make_ref(), + TestPid = self(), + BridgeName = manual_start_stop, + Config0 = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}), + Config = Config0#{start_type := manual}, + {ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => BridgeName}), + %% call ensure_started again should yeld the same result + ok = emqx_connector_mqtt_worker:ensure_started(BridgeName), + emqx_connector_mqtt_worker:ensure_stopped(BridgeName), + emqx_metrics:stop(), + ok = emqx_connector_mqtt_worker:stop(Pid) + after + meck:unload(emqx_connector_mqtt_mod) + end. + +make_config(Ref, TestPid, Result) -> + #{ + start_type => auto, + subscriptions => undefined, + forwards => undefined, + reconnect_interval => 50, + test_pid => TestPid, + test_ref => Ref, + connect_result => Result + }. diff --git a/apps/emqx_plugin_libs/test/emqx_rule_libs_rule_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl similarity index 99% rename from apps/emqx_plugin_libs/test/emqx_rule_libs_rule_SUITE.erl rename to apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl index e4c358695..56733147f 100644 --- a/apps/emqx_plugin_libs/test/emqx_rule_libs_rule_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_rule_utils_SUITE). +-module(emqx_plugin_libs_rule_SUITE). -compile(export_all). -compile(nowarn_export_all).