diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index f4c9ac74b..5762c6f42 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -6,7 +6,6 @@ bridges.mqtt.my_mqtt_bridge_to_aws { server = "127.0.0.1:1883" proto_ver = "v4" - clientid = "my_mqtt_bridge_to_aws" username = "username1" password = "" clean_start = true @@ -19,7 +18,6 @@ bridges.mqtt.my_mqtt_bridge_to_aws { dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" seg_bytes = "100MB" offload = false - max_total_bytes = "1GB" } ssl { enable = false diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 06f1ba6c9..7db5882b2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -221,15 +221,19 @@ get_matched_bridges(Topic) -> Bridges = emqx:get_config([bridges], #{}), maps:fold(fun (BType, Conf, Acc0) -> maps:fold(fun - (BName, #{from_local_topic := Filter}, Acc1) -> - case emqx_topic:match(Topic, Filter) of - true -> [bridge_id(BType, BName) | Acc1]; - false -> Acc1 - end; - (_Name, _BridgeConf, Acc1) -> Acc1 + (BName, #{egress := Egress}, Acc1) -> + get_matched_bridge_id(Egress, Topic, BType, BName, Acc1); + (BName, BridgeConf, Acc1) -> + get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1) end, Acc0, Conf) end, [], Bridges). +get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) -> + case emqx_topic:match(Topic, Filter) of + true -> [bridge_id(BType, BName) | Acc]; + false -> Acc + end. + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index c13c8629c..c0068f34c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -74,8 +74,9 @@ load_bridges(Configs) -> %% TODO: move this monitor into emqx_resource %% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}). load_bridge(<<"http">>, Name, Config) -> - Config1 = parse_http_confs(Config), - do_load_bridge(<<"http">>, Name, Config1). + do_load_bridge(<<"http">>, Name, parse_http_confs(Config)); +load_bridge(Type, Name, Config) -> + do_load_bridge(Type, Name, Config). do_load_bridge(Type, Name, Config) -> case emqx_resource:check_and_create_local( diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index f98a5fa77..46aa13453 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -46,7 +46,7 @@ %%===================================================================== %% Hocon schema roots() -> - [{config, #{type => hoconsc:ref(?MODULE, "config")}}]. + fields("config"). fields("config") -> emqx_connector_mqtt_schema:fields("config"). @@ -89,109 +89,62 @@ drop_bridge(Name) -> %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called %% if the bridge received msgs from the remote broker. -on_message_received(Msg, ChannId) -> - Name = atom_to_binary(ChannId, utf8), +on_message_received(Msg, BridgeId) -> + Name = atom_to_binary(BridgeId, utf8), emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]). %% =================================================================== on_start(InstId, Conf) -> - ?SLOG(info, #{msg => "starting mqtt connector", - connector => InstId, config => Conf}), "bridge:" ++ NamePrefix = binary_to_list(InstId), + BridgeId = list_to_atom(NamePrefix), + ?SLOG(info, #{msg => "starting mqtt connector", + connector => BridgeId, config => Conf}), BasicConf = basic_config(Conf), - InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, - InOutConfigs = taged_map_list(ingress, maps:get(ingress, Conf, #{})) - ++ taged_map_list(egress, maps:get(egress, Conf, #{})), - lists:foldl(fun - (_InOutConf, {error, Reason}) -> - {error, Reason}; - (InOutConf, {ok, #{channels := SubBridges} = Res}) -> - case create_channel(InOutConf, NamePrefix, BasicConf) of - {error, Reason} -> {error, Reason}; - {ok, Name} -> {ok, Res#{channels => [Name | SubBridges]}} - end - end, InitRes, InOutConfigs). - -on_stop(InstId, #{channels := NameList}) -> - ?SLOG(info, #{msg => "stopping mqtt connector", - connector => InstId}), - lists:foreach(fun(Name) -> - remove_channel(Name) - end, NameList). - -%% TODO: let the emqx_resource trigger on_query/4 automatically according to the -%% `ingress` and `egress` config -on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, - baisc_conf := BasicConf}) -> - create_channel(Conf, Prefix, BasicConf); -on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) -> - ?SLOG(debug, #{msg => "send msg to remote node", message => Msg, - channel_id => ChannelId}), - emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg). - -on_health_check(_InstId, #{channels := NameList} = State) -> - Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList], - case lists:all(fun({_, pong}) -> true; ({_, _}) -> false end, Results) of - true -> {ok, State}; - false -> {error, {some_channel_down, Results}, State} - end. - -create_channel({{ingress, Id}, #{from_remote_topic := RemoteT} = Conf}, - NamePrefix, BasicConf) -> - LocalT = maps:get(to_local_topic, Conf, undefined), - ChannId = ingress_channel_id(NamePrefix, Id), - ?SLOG(info, #{msg => "creating ingress channel", - to_remote_topic => RemoteT, - to_local_topic => LocalT, - channel_id => ChannId}), - do_create_channel(BasicConf#{ - name => ChannId, - clientid => clientid(ChannId), - subscriptions => Conf#{ - to_local_topic => LocalT, - on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]} + SubRemoteConf = maps:get(ingress, Conf, #{}), + FrowardConf = maps:get(egress, Conf, #{}), + BridgeConf = BasicConf#{ + name => BridgeId, + clientid => clientid(BridgeId), + subscriptions => SubRemoteConf#{ + to_local_topic => maps:get(to_local_topic, SubRemoteConf, undefined), + on_message_received => {fun ?MODULE:on_message_received/2, [BridgeId]} }, - forwards => undefined}); - -create_channel({{egress, Id}, #{to_remote_topic := RemoteT} = Conf}, - NamePrefix, BasicConf) -> - LocalT = maps:get(from_local_topic, Conf, undefined), - ChannId = egress_channel_id(NamePrefix, Id), - ?SLOG(info, #{msg => "creating egress channel", - to_remote_topic => RemoteT, - to_local_topic => LocalT, - channel_id => ChannId}), - do_create_channel(BasicConf#{ - name => ChannId, - clientid => clientid(ChannId), - subscriptions => undefined, - forwards => Conf#{from_local_topic => LocalT}}). - -remove_channel(ChannId) -> - ?SLOG(info, #{msg => "removing channel", - channel_id => ChannId}), - case ?MODULE:drop_bridge(ChannId) of - ok -> ok; - {error, not_found} -> ok; - {error, Reason} -> - ?SLOG(error, #{msg => "stop channel failed", - channel_id => ChannId, reason => Reason}) - end. - -do_create_channel(#{name := Name} = Conf) -> - case ?MODULE:create_bridge(Conf) of + forwards => FrowardConf#{ + from_local_topic => maps:get(from_local_topic, FrowardConf, undefined) + } + }, + case ?MODULE:create_bridge(BridgeConf) of {ok, _Pid} -> - start_channel(Name); + case emqx_connector_mqtt_worker:ensure_started(BridgeId) of + ok -> {ok, #{name => BridgeId}}; + {error, Reason} -> {error, Reason} + end; {error, {already_started, _Pid}} -> - {ok, Name}; + {ok, #{name => BridgeId}}; {error, Reason} -> {error, Reason} end. -start_channel(Name) -> - case emqx_connector_mqtt_worker:ensure_started(Name) of - ok -> {ok, Name}; - {error, Reason} -> {error, Reason} +on_stop(_InstId, #{name := BridgeId}) -> + ?SLOG(info, #{msg => "stopping mqtt connector", + connector => BridgeId}), + case ?MODULE:drop_bridge(BridgeId) of + ok -> ok; + {error, not_found} -> ok; + {error, Reason} -> + ?SLOG(error, #{msg => "stop mqtt connector", + connector => BridgeId, reason => Reason}) + end. + +on_query(_InstId, {send_message, BridgeId, Msg}, _AfterQuery, _State) -> + ?SLOG(debug, #{msg => "send msg to remote node", message => Msg, + connector => BridgeId}), + emqx_connector_mqtt_worker:send_to_remote(BridgeId, Msg). + +on_health_check(_InstId, #{name := BridgeId} = State) -> + case emqx_connector_mqtt_worker:ping(BridgeId) of + pong -> {ok, State}; + _ -> {error, {connector_down, BridgeId}, State} end. basic_config(#{ @@ -225,19 +178,8 @@ basic_config(#{ if_record_metrics => true }. -taged_map_list(Tag, Map) -> - [{{Tag, K}, V} || {K, V} <- maps:to_list(Map)]. - -ingress_channel_id(Prefix, Id) -> - channel_name("ingress", Prefix, Id). -egress_channel_id(Prefix, Id) -> - channel_name("egress", Prefix, Id). - -channel_name(Type, Prefix, Id) -> - list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)). - clientid(Id) -> - list_to_binary(str(Id) ++ ":" ++ emqx_misc:gen_id(8)). + list_to_binary(lists:concat([str(Id), ":", node()])). str(A) when is_atom(A) -> atom_to_list(A); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index d53716ced..5a52dc613 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -92,7 +92,7 @@ the rule. """ })} , {egress, - sc(hoconsc:map(id, ref("egress")), + sc(ref("egress"), #{ default => #{} , desc => """ The egress config defines how this bridge forwards messages from the local broker to the remote diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 3ab829218..4027ee898 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -101,14 +101,12 @@ -export([msg_marshaller/1]). -export_type([ config/0 - , batch/0 , ack_ref/0 ]). -type id() :: atom() | string() | pid(). -type qos() :: emqx_types:qos(). -type config() :: map(). --type batch() :: [emqx_connector_mqtt_msg:exp_msg()]. -type ack_ref() :: term(). -type topic() :: emqx_types:topic(). @@ -117,7 +115,7 @@ %% same as default in-flight limit for emqtt --define(DEFAULT_BATCH_SIZE, 32). +-define(DEFAULT_INFLIGHT_SIZE, 32). -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). -define(DEFAULT_MAX_TOTAL_SIZE, (1 bsl 31)). @@ -205,12 +203,10 @@ init_state(Opts) -> ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS), StartType = maps:get(start_type, Opts, manual), Mountpoint = maps:get(forward_mountpoint, Opts, undefined), - MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_BATCH_SIZE), - BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), + MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_INFLIGHT_SIZE), Name = maps:get(name, Opts, undefined), #{start_type => StartType, reconnect_interval => ReconnDelayMs, - batch_size => BatchSize, mountpoint => format_mountpoint(Mountpoint), inflight => [], max_inflight => MaxInflightSize, @@ -327,10 +323,6 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F {keep_state_and_data, [{reply, From, Forwards}]}; common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; -common(_StateName, info, {deliver, _, Msg}, State = #{replayq := Q}) -> - Msgs = collect([Msg]), - NewQ = replayq:append(Q, Msgs), - {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) -> @@ -342,13 +334,9 @@ common(StateName, Type, Content, #{name := Name} = State) -> content => Content}), {keep_state, State}. -do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards}, +do_connect(#{connect_opts := ConnectOpts, inflight := Inflight, name := Name} = State) -> - case Forwards of - undefined -> ok; - #{from_local_topic := Topic} -> from_local_topic(Topic, Name) - end, case emqx_connector_mqtt_mod:start(ConnectOpts) of {ok, Conn} -> ?tp(info, connected, #{name => Name, inflight => length(Inflight)}), @@ -360,19 +348,10 @@ do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards}, {error, Reason, State} end. -collect(Acc) -> - receive - {deliver, _, Msg} -> - collect([Msg | Acc]) - after - 0 -> - lists:reverse(Acc) - end. - %% Retry all inflight (previously sent but not acked) batches. retry_inflight(State, []) -> {ok, State}; -retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf) -> - case do_send(State, QAckRef, Batch) of +retry_inflight(State, [#{q_ack_ref := QAckRef, msg := Msg} | Rest] = OldInf) -> + case do_send(State, QAckRef, Msg) of {ok, State1} -> retry_inflight(State1, Rest); {error, #{inflight := NewInf} = State1} -> @@ -393,34 +372,33 @@ pop_and_send_loop(#{replayq := Q} = State, N) -> false -> BatchSize = 1, Opts = #{count_limit => BatchSize, bytes_limit => 999999999}, - {Q1, QAckRef, Batch} = replayq:pop(Q, Opts), - case do_send(State#{replayq := Q1}, QAckRef, Batch) of + {Q1, QAckRef, [Msg]} = replayq:pop(Q, Opts), + case do_send(State#{replayq := Q1}, QAckRef, Msg) of {ok, NewState} -> pop_and_send_loop(NewState, N - 1); {error, NewState} -> {error, NewState} end end. -%% Assert non-empty batch because we have a is_empty check earlier. -do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) -> +do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) -> ?SLOG(error, #{msg => "cannot forward messages to remote broker" - " as egress_channel is not configured", - messages => Batch}); + " as forwards is not configured", + messages => Msg}); do_send(#{inflight := Inflight, connection := Connection, mountpoint := Mountpoint, - connect_opts := #{forwards := Forwards}} = State, QAckRef, [_ | _] = Batch) -> + connect_opts := #{forwards := Forwards}} = State, QAckRef, Msg) -> Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), ExportMsg = fun(Message) -> emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'), emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) end, ?SLOG(debug, #{msg => "publish to remote broker", - message => Batch, vars => Vars}), - case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of + message => Msg, vars => Vars}), + case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of {ok, Refs} -> {ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef, send_ack_ref => map_set(Refs), - batch => Batch}]}}; + msg => Msg}]}}; {error, Reason} -> ?SLOG(info, #{msg => "mqtt_bridge_produce_failed", reason => Reason}), @@ -473,27 +451,6 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs, All end. -from_local_topic(undefined, _Name) -> - ok; -from_local_topic(Topic, Name) -> - do_subscribe(Topic, Name). - -topic(T) -> iolist_to_binary(T). - -validate(RawTopic) -> - Topic = topic(RawTopic), - try emqx_topic:validate(Topic) of - _Success -> Topic - catch - error:Reason -> - error({bad_topic, Topic, Reason}) - end. - -do_subscribe(RawTopic, Name) -> - TopicFilter = validate(RawTopic), - {Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}), - emqx_broker:subscribe(Topic, Name, SubOpts). - disconnect(#{connection := Conn} = State) when Conn =/= undefined -> emqx_connector_mqtt_mod:stop(Conn), State#{connection => undefined};