From 6a1ebe299a897e499451bd7a9d31e8372077455f Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Fri, 28 Dec 2018 19:44:41 +0800 Subject: [PATCH] Merge emqx32 to emqx30 (#2112) --- Makefile | 3 +- etc/emqx.conf | 75 ++++++++++--- priv/emqx.schema | 38 +++++-- rebar.config | 2 +- src/emqx.app.src | 3 +- src/emqx_bridge.erl | 223 +++++++++++++++++++++++++++++-------- src/emqx_client.erl | 8 +- src/emqx_plugins.erl | 5 +- src/emqx_vm.erl | 1 - test/emqx_bridge_SUITE.erl | 23 ++-- 10 files changed, 290 insertions(+), 91 deletions(-) diff --git a/Makefile b/Makefile index 69312a1c2..41cdd5ab8 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ PROJECT = emqx PROJECT_DESCRIPTION = EMQ X Broker -DEPS = jsx gproc gen_rpc ekka esockd cowboy +DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq dep_jsx = hex-emqx 2.9.0 dep_gproc = hex-emqx 0.8.0 @@ -11,6 +11,7 @@ dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.0 dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.3 dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.1 dep_cowboy = hex-emqx 2.4.0 +dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1 NO_AUTOPATCH = cuttlefish diff --git a/etc/emqx.conf b/etc/emqx.conf index cb713c4d0..a47abddf7 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -142,6 +142,12 @@ cluster.autoclean = 5m ## Value: String ## cluster.k8s.namespace = default +## Kubernates Namespace +## +## Value: String +## cluster.k8s.namespace = default + + ##-------------------------------------------------------------------- ## Node ##-------------------------------------------------------------------- @@ -1598,6 +1604,16 @@ bridge.aws.start_type = manual ## Default: 30 seconds bridge.aws.reconnect_interval = 30s +## Retry interval for bridge QoS1 message delivering. +## +## Value: Duration +bridge.aws.retry_interval = 20s + +## Inflight size. +## +## Value: Integer +bridge.aws.max_inflight = 32 + ## Bridge address: node name for local bridge, host:port for remote. ## ## Value: String @@ -1673,16 +1689,33 @@ bridge.aws.subscription.2.topic = cmd/topic2 ## Value: Number bridge.aws.subscription.2.qos = 1 -## Bridge message queue message type. +## If enabled, queue would be written into disk more quickly. +## However, If disabled, some message would be dropped in +## the situation emqx crashed. ## -## Value: Enum -## Example: memory | disk -bridge.aws.mqueue_type = memory +## Value: on | off +bridge.aws.queue.mem_cache = on -## The pending message queue of a bridge. +## Batch size for buffer queue stored ## -## Value: Number -bridge.aws.max_pending_messages = 10000 +## Value: Integer +## default: 1000 +bridge.aws.queue.batch_size = 1000 + +## Base directory for replayq to store messages on disk +## If this config entry is missing or set to undefined, +## replayq works in a mem-only manner. If the config +## entry was set to `bridge.aws.mqueue_type = memory` +## this config entry would have no effect on mqueue +## +## Value: String +bridge.aws.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ + +## Replayq segment size +## +## Value: Bytesize + +bridge.aws.queue.replayq_seg_bytes = 10MB ## Bribge to remote server via SSL. ## @@ -1735,6 +1768,11 @@ bridge.aws.ssl = off ## Default: 30 seconds ## bridge.azure.reconnect_time = 30s +## Retry interval for bridge QoS1 message delivering. +## +## Value: Duration +## bridge.azure.retry_interval = 20s + ## Bridge address: node name for local bridge, host:port for remote. ## ## Value: String @@ -1810,17 +1848,26 @@ bridge.aws.ssl = off ## Value: Number ## bridge.azure.subscription.2.qos = 1 -## Bridge store message type. +## Batch size for buffer queue stored ## -## Value: Enum -## Example: memory | disk -## bridge.azure.store_type = memory +## Value: Integer +## default: 1000 +## bridge.azure.queue.batch_size = 1000 -## The pending message queue of a bridge. +## Base directory for replayq to store messages on disk +## If this config entry is missing or set to undefined, +## replayq works in a mem-only manner. If the config +## entry was set to `bridge.aws.mqueue_type = memory` +## this config entry would have no effect on mqueue ## -## Value: Number -## bridge.azure.max_pending_messages = 10000 +## Value: String +## Default: {{ platform_data_dir }}/emqx_aws_bridge/ +## bridge.azure.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/ +## Replayq segment size +## +## Value: Bytesize +## bridge.azure.queue.replayq_seg_bytes = 10MB ## PEM-encoded CA certificates of the bridge. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index f74b53e7f..134931016 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1495,8 +1495,20 @@ end}. %%-------------------------------------------------------------------- %% Bridges %%-------------------------------------------------------------------- -{mapping, "bridge.$name.mqueue_type", "emqx.bridges", [ - {datatype, {enum, [memory, disk]}} +{mapping, "bridge.$name.queue.mem_cache", "emqx.bridges", [ + {datatype, flag} +]}. + +{mapping, "bridge.$name.queue.batch_size", "emqx.bridges", [ + {datatype, integer} +]}. + +{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [ + {datatype, bytesize} ]}. {mapping, "bridge.$name.address", "emqx.bridges", [ @@ -1554,11 +1566,6 @@ end}. {datatype, string} ]}. -{mapping, "bridge.$name.max_pending_messages", "emqx.bridges", [ - {default, 10000}, - {datatype, integer} -]}. - {mapping, "bridge.$name.keepalive", "emqx.bridges", [ {default, "10s"}, {datatype, {duration, ms}} @@ -1587,6 +1594,15 @@ end}. {datatype, {duration, ms}} ]}. +{mapping, "bridge.$name.retry_interval", "emqx.bridges", [ + {default, "20s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "bridge.$name.max_inflight", "emqx.bridges", [ + {default, 0}, + {datatype, integer} +]}. {translation, "emqx.bridges", fun(Conf) -> Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, @@ -1618,7 +1634,11 @@ end}. [{Opt, Val}|Opts] end end, - + Queue = fun(Name) -> + Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".queue", Conf), + QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, "queue", QOpt], QValue} <- Configs], + maps:from_list(QOpts) + end, Subscriptions = fun(Name) -> Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], @@ -1629,7 +1649,7 @@ end}. lists:foldl( fun({["bridge", Name, Opt], Val}, Acc) -> %% e.g #{aws => [{OptKey, OptVal}]} - Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}], + Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}, {queue, Queue(Name)}], maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); (_, Acc) -> Acc diff --git a/rebar.config b/rebar.config index d3dabae81..8fa9663ae 100644 --- a/rebar.config +++ b/rebar.config @@ -7,6 +7,7 @@ {github_emqx_deps, [{gen_rpc, "2.3.0"}, {ekka, "v0.5.1"}, + {replayq, "v0.1.1"}, {esockd, "v5.4.3"}, {cuttlefish, "v2.2.0"} ]}. @@ -26,4 +27,3 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. - diff --git a/src/emqx.app.src b/src/emqx.app.src index ce643634e..f23bf4792 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,8 +3,7 @@ {vsn,"git"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd, - cowboy]}, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index b1ccdc44b..46107e877 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -1,3 +1,4 @@ + %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -30,9 +31,17 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {client_pid, options, reconnect_interval, - mountpoint, queue, mqueue_type, max_pending_messages, - forwards = [], subscriptions = []}). +-record(state, {client_pid :: pid(), + options :: list(), + reconnect_interval :: pos_integer(), + mountpoint :: binary(), + readq :: list(), + writeq :: list(), + replayq :: map(), + ackref :: replayq:ack_ref(), + queue_option :: map(), + forwards :: list(), + subscriptions :: list()}). -record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, packet_id, topic, props, payload}). @@ -104,20 +113,85 @@ init([Options]) -> auto -> erlang:send_after(1000, self(), start) end, ReconnectInterval = get_value(reconnect_interval, Options, 30000), - MaxPendingMsg = get_value(max_pending_messages, Options, 10000), Mountpoint = format_mountpoint(get_value(mountpoint, Options)), - MqueueType = get_value(mqueue_type, Options, memory), - Queue = [], + QueueOptions = get_value(queue, Options), {ok, #state{mountpoint = Mountpoint, - queue = Queue, - mqueue_type = MqueueType, + queue_option = QueueOptions, + readq = [], + writeq = [], options = Options, - reconnect_interval = ReconnectInterval, - max_pending_messages = MaxPendingMsg}}. + reconnect_interval = ReconnectInterval}}. -handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> - {noreply, NewState} = handle_info(start, State), - {reply, #{msg => <<"start bridge successfully">>}, NewState}; +handle_call(start_bridge, _From, State = #state{options = Options, + replayq = undefined, + client_pid = undefined, + queue_option = #{batch_size := BatchSize, + replayq_dir := ReplayqDir, + replayq_seg_bytes := ReplayqSegBytes}}) -> + case emqx_client:start_link([{owner, self()}|options(Options)]) of + {ok, ClientPid} -> + case emqx_client:connect(ClientPid) of + {ok, _} -> + emqx_logger:info("[Bridge] connected to remote successfully"), + Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), + Forwards = subscribe_local_topics(Options), + ReplayQ = replayq:open(#{dir => ReplayqDir, + seg_bytes => ReplayqSegBytes, + sizer => fun(Term) -> + size(term_to_binary(Term)) + end, + marshaller => fun({PktId, Msg}) -> + term_to_binary({PktId, Msg}); + (Bin) -> + binary_to_term(Bin) + end + }), + {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), + {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), + {reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid, + subscriptions = Subs, + readq = NewReadQ, + replayq = NewReplayQ, + ackref = AckRef, + forwards = Forwards}}; + {error, Reason} -> + emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), + {reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}} + end; + {error, Reason} -> + emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]), + {reply, #{msg => <<"start bridge failed">>}, State} + end; + + +handle_call(start_bridge, _From, State = #state{options = Options, + client_pid = undefined, + replayq = ReplayQ, + queue_option = #{batch_size := BatchSize} + }) -> + case emqx_client:start_link([{owner, self()} | options(Options)]) of + {ok, ClientPid} -> + case emqx_client:connect(ClientPid) of + {ok, _} -> + emqx_logger:info("[Bridge] connected to remote ysucessfully"), + Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), + Forwards = subscribe_local_topics(Options), + {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), + {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), + {reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid, + subscriptions = Subs, + readq = NewReadQ, + replayq = NewReplayQ, + ackref = AckRef, + forwards = Forwards}}; + {error, Reason} -> + emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), + {reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}} + end; + {error, Reason} -> + emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]), + {reply, #{msg => <<"start bridge failed">>}, State} + end; handle_call(start_bridge, _From, State) -> {reply, #{msg => <<"bridge already started">>}, State}; @@ -184,46 +258,82 @@ handle_cast(Msg, State) -> emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), {noreply, State}. -%%---------------------------------------------------------------- -%% start message bridge -%%---------------------------------------------------------------- -handle_info(start, State = #state{options = Options, - client_pid = undefined}) -> - case emqx_client:start_link([{owner, self()}|options(Options)]) of +handle_info(restart, State = #state{options = Options, + client_pid = undefined, + replayq = ReplayQ, + queue_option = #{batch_size := BatchSize} + }) -> + case emqx_client:start_link([{owner, self()} | options(Options)]) of {ok, ClientPid} -> case emqx_client:connect(ClientPid) of {ok, _} -> - emqx_logger:info("[Bridge] connected to remote sucessfully"), + emqx_logger:info("[Bridge] connected to remote successfully"), Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), - Forwards = subscribe_local_topics(get_value(forwards, Options, [])), + Forwards = subscribe_local_topics(Options), + {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), + {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, + readq = NewReadQ, + replayq = NewReplayQ, + ackref = AckRef, forwards = Forwards}}; {error, Reason} -> emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), {noreply, State#state{client_pid = ClientPid}} end; {error, Reason} -> - emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]), + emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]), {noreply, State} end; +%%---------------------------------------------------------------- +%% pop message from replayq and publish again +%%---------------------------------------------------------------- +handle_info(pop, State = #state{writeq = WriteQ, replayq = ReplayQ, + queue_option = #{batch_size := BatchSize}}) -> + {NewReplayQ, AckRef, NewReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), + {NewReadQ1, NewWriteQ} = case NewReadQ of + [] -> {WriteQ, []}; + _ -> {NewReadQ, WriteQ} + end, + self() ! replay, + {noreply, State#state{readq = NewReadQ1, writeq = NewWriteQ, replayq = NewReplayQ, ackref = AckRef}}; + +handle_info(dump, State = #state{writeq = WriteQ, replayq = ReplayQ}) -> + NewReplayQueue = replayq:append(ReplayQ, lists:reverse(WriteQ)), + {noreply, State#state{replayq = NewReplayQueue, writeq = []}}; + +%%---------------------------------------------------------------- +%% replay message from replayq +%%---------------------------------------------------------------- +handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) -> + {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), + {noreply, State#state{readq = NewReadQ}}; + %%---------------------------------------------------------------- %% received local node message %%---------------------------------------------------------------- handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, - State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue, - mqueue_type = MqueueType, max_pending_messages = MaxPendingMsg}) -> + State = #state{client_pid = undefined, + mountpoint = Mountpoint}) -> + Msg = #mqtt_msg{qos = 1, + retain = Retain, + topic = mountpoint(Mountpoint, Topic), + payload = Payload}, + {noreply, en_writeq({undefined, Msg}, State)}; +handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, + State = #state{client_pid = Pid, mountpoint = Mountpoint}) -> Msg = #mqtt_msg{qos = 1, retain = Retain, topic = mountpoint(Mountpoint, Topic), payload = Payload}, case emqx_client:publish(Pid, Msg) of - {ok, PkgId} -> - {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; - {error, Reason} -> + {ok, PktId} -> + {noreply, en_writeq({PktId, Msg}, State)}; + {error, {PktId, Reason}} -> emqx_logger:error("[Bridge] Publish fail:~p", [Reason]), - {noreply, State} + {noreply, en_writeq({PktId, Msg}, State)} end; %%---------------------------------------------------------------- @@ -239,18 +349,19 @@ handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic %%---------------------------------------------------------------- %% received remote puback message %%---------------------------------------------------------------- -handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueue_type = MqueueType}) -> - % lists:keydelete(PkgId, 1, Queue) - {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}}; +handle_info({puback, #{packet_id := PktId}}, State) -> + {noreply, delete(PktId, State)}; handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> emqx_logger:warning("[Bridge] stop ~p", [normal]), + self() ! dump, {noreply, State#state{client_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, reconnect_interval = ReconnectInterval}) -> emqx_logger:error("[Bridge] stop ~p", [Reason]), - erlang:send_after(ReconnectInterval, self(), start), + self() ! dump, + erlang:send_after(ReconnectInterval, self(), restart), {noreply, State#state{client_pid = undefined}}; handle_info(Info, State) -> @@ -267,8 +378,10 @@ subscribe_remote_topics(ClientPid, Subscriptions) -> [begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})]. -subscribe_local_topics(Topics) -> - [begin emqx_broker:subscribe(bin(Topic)), bin(Topic) end +subscribe_local_topics(Options) -> + Topics = get_value(forwards, Options, []), + Subid = get_value(client_id, Options, <<"bridge">>), + [begin emqx_broker:subscribe(bin(Topic), #{qos => 1, subid => Subid}), bin(Topic) end || Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})]. proto_ver(mqttv3) -> v3; @@ -320,15 +433,35 @@ format_mountpoint(undefined) -> format_mountpoint(Prefix) -> binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). -store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg -> - [Data | Queue]; -store(memory, _Data, Queue, _MaxPendingMsg) -> - logger:error("Beyond max pending messages"), - Queue; -store(disk, Data, Queue, _MaxPendingMsg)-> - [Data | Queue]. -delete(memory, PkgId, Queue) -> - lists:keydelete(PkgId, 1, Queue); -delete(disk, PkgId, Queue) -> - lists:keydelete(PkgId, 1, Queue). +en_writeq(Msg, State = #state{replayq = ReplayQ, + queue_option = #{mem_cache := false}}) -> + NewReplayQ = replayq:append(ReplayQ, [Msg]), + State#state{replayq = NewReplayQ}; +en_writeq(Msg, State = #state{writeq = WriteQ, + queue_option = #{batch_size := BatchSize, + mem_cache := true}}) + when length(WriteQ) < BatchSize-> + State#state{writeq = [Msg | WriteQ]} ; +en_writeq(Msg, State = #state{writeq = WriteQ, replayq = ReplayQ, + queue_option = #{mem_cache := true}}) -> + NewReplayQ =replayq:append(ReplayQ, lists:reverse(WriteQ)), + State#state{writeq = [Msg], replayq = NewReplayQ}. + +publish_readq_msg(_ClientPid, [], ReadQ) -> + {ok, ReadQ}; +publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], ReadQ) -> + io:format("~n replay msg: ~p ~n", [Msg]), + {ok, PktId} = emqx_client:publish(ClientPid, Msg), + publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | ReadQ]). + +delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) -> + ok = replayq:ack(ReplayQ, AckRef), + self() ! pop, + State; + +delete(PktId, State = #state{readq = [], writeq = WriteQ}) -> + State#state{writeq = lists:keydelete(PktId, 1, WriteQ)}; + +delete(PktId, State = #state{readq = ReadQ}) -> + State#state{readq = lists:keydelete(PktId, 1, ReadQ)}. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 2d9cb0a80..6fbb29724 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -199,7 +199,7 @@ start_link() -> start_link([]). start_link(Options) when is_map(Options) -> start_link(maps:to_list(Options)); start_link(Options) when is_list(Options) -> - ok = emqx_mqtt_props:validate( + ok = emqx_mqtt_props:validate( proplists:get_value(properties, Options, #{})), case proplists:get_value(name, Options) of undefined -> @@ -769,7 +769,7 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> case emqx_inflight:is_full(Inflight) of true -> - {keep_state, State, [{reply, From, {error, inflight_full}}]}; + {keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]}; false -> Msg1 = Msg#mqtt_msg{packet_id = PacketId}, case send(Msg1, State) of @@ -777,8 +777,8 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), [{reply, From, {ok, PacketId}}]}; - Error = {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, Error}]} + {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} end end; diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 479c36a89..ea3190a35 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -146,7 +146,7 @@ unload() -> with_loaded_file(File, fun stop_plugins/1) end. -%% stop plugins +%% Stop plugins stop_plugins(Names) -> [stop_app(App) || App <- Names]. @@ -252,7 +252,7 @@ unload_plugin(App, Persistent) -> {error, Reason} -> {error, Reason} end. - + stop_app(App) -> case application:stop(App) of ok -> @@ -329,4 +329,3 @@ write_loaded(AppNames) -> emqx_logger:error("Open File ~p Error: ~p", [File, Error]), {error, Error} end. - diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index 74b815795..15ae00689 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -418,4 +418,3 @@ mapping([{owner, V}|Entries], Acc) when is_pid(V) -> mapping(Entries, [{owner, Owner}|Acc]); mapping([{Key, Value}|Entries], Acc) -> mapping(Entries, [{Key, Value}|Acc]). - diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl index f337e3b4e..bd2894101 100644 --- a/test/emqx_bridge_SUITE.erl +++ b/test/emqx_bridge_SUITE.erl @@ -31,27 +31,28 @@ end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). bridge_test(_) -> - {ok, _Pid} = emqx_bridge:start_link(emqx, []), #{msg := <<"start bridge successfully">>} - = emqx_bridge:start_bridge(emqx), + = emqx_bridge:start_bridge(aws), test_forwards(), test_subscriptions(0), test_subscriptions(1), test_subscriptions(2), #{msg := <<"stop bridge successfully">>} - = emqx_bridge:stop_bridge(emqx), + = emqx_bridge:stop_bridge(aws), ok. test_forwards() -> - emqx_bridge:add_forward(emqx, <<"test_forwards">>), - [<<"test_forwards">>] = emqx_bridge:show_forwards(emqx), - emqx_bridge:del_forward(emqx, <<"test_forwards">>), - [] = emqx_bridge:show_forwards(emqx), + emqx_bridge:add_forward(aws, <<"test_forwards">>), + [<<"test_forwards">>, <<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws), + emqx_bridge:del_forward(aws, <<"test_forwards">>), + [<<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws), ok. test_subscriptions(QoS) -> - emqx_bridge:add_subscription(emqx, <<"test_subscriptions">>, QoS), - [{<<"test_subscriptions">>, QoS}] = emqx_bridge:show_subscriptions(emqx), - emqx_bridge:del_subscription(emqx, <<"test_subscriptions">>), - [] = emqx_bridge:show_subscriptions(emqx), + emqx_bridge:add_subscription(aws, <<"test_subscriptions">>, QoS), + [{<<"test_subscriptions">>, QoS}, + {<<"cmd/topic1">>, 1}, + {<<"cmd/topic2">>, 1}] = emqx_bridge:show_subscriptions(aws), + emqx_bridge:del_subscription(aws, <<"test_subscriptions">>), + [{<<"cmd/topic1">>,1}, {<<"cmd/topic2">>,1}] = emqx_bridge:show_subscriptions(aws), ok.