From ca7ad9cc15bb2612be568ce4f79a266933f2cce0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 23 Aug 2022 18:17:32 +0800 Subject: [PATCH 1/5] chore: refactor mqtt connector --- .../src/emqx_connector_mqtt.erl | 11 + .../src/mqtt/emqx_connector_mqtt_mod.erl | 44 +--- .../src/mqtt/emqx_connector_mqtt_worker.erl | 229 +++++------------- 3 files changed, 82 insertions(+), 202 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index bdf43885a..59eb3f0fb 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -39,6 +39,7 @@ on_start/2, on_stop/2, on_query/3, + on_query_async/4, on_get_status/2 ]). @@ -190,6 +191,16 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), ok. +on_query_async( + _InstId, + {send_message, Msg}, + {ReplayFun, Args}, + #{name := InstanceId} +) -> + ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}), + ok. + on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> AutoReconn = maps:get(auto_reconnect, Conf, true), case emqx_connector_mqtt_worker:status(InstanceId) of diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 372405b59..7571c59b8 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -21,6 +21,7 @@ -export([ start/1, send/2, + send_async/3, stop/1, ping/1 ]). @@ -32,7 +33,6 @@ %% callbacks for emqtt -export([ - handle_puback/2, handle_publish/3, handle_disconnected/2 ]). @@ -134,44 +134,11 @@ safe_stop(Pid, StopF, Timeout) -> exit(Pid, kill) end. -send(Conn, Msgs) -> - send(Conn, Msgs, []). +send(#{client_pid := ClientPid}, Msg) -> + emqtt:publish(ClientPid, Msg). -send(_Conn, [], []) -> - %% all messages in the batch are QoS-0 - Ref = make_ref(), - %% QoS-0 messages do not have packet ID - %% the batch ack is simulated with a loop-back message - self() ! {batch_ack, Ref}, - {ok, Ref}; -send(_Conn, [], PktIds) -> - %% PktIds is not an empty list if there is any non-QoS-0 message in the batch, - %% And the worker should wait for all acks - {ok, PktIds}; -send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) -> - case emqtt:publish(ClientPid, Msg) of - ok -> - send(Conn, Rest, PktIds); - {ok, PktId} -> - send(Conn, Rest, [PktId | PktIds]); - {error, Reason} -> - %% NOTE: There is no partial success of a batch and recover from the middle - %% only to retry all messages in one batch - {error, Reason} - end. - -handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) when - RC =:= ?RC_SUCCESS; - RC =:= ?RC_NO_MATCHING_SUBSCRIBERS --> - Parent ! {batch_ack, PktId}, - ok; -handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> - ?SLOG(warning, #{ - msg => "publish_to_remote_node_falied", - packet_id => PktId, - reason_code => RC - }). +send_async(#{client_pid := ClientPid}, Msg, Callback) -> + emqtt:publish_async(ClientPid, Msg, Callback). handle_publish(Msg, undefined, _Opts) -> ?SLOG(error, #{ @@ -200,7 +167,6 @@ handle_disconnected(Reason, Parent) -> make_hdlr(Parent, Vars, Opts) -> #{ - puback => {fun ?MODULE:handle_puback/2, [Parent]}, publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} }. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 3f3a4b9ce..5e4fa8f72 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -91,16 +91,14 @@ ensure_stopped/1, status/1, ping/1, - send_to_remote/2 + send_to_remote/2, + send_to_remote_async/3 ]). -export([get_forwards/1]). -export([get_subscriptions/1]). -%% Internal --export([msg_marshaller/1]). - -export_type([ config/0, ack_ref/0 @@ -133,12 +131,6 @@ %% mountpoint: The topic mount point for messages sent to remote node/cluster %% `undefined', `<<>>' or `""' to disable %% forwards: Local topics to subscribe. -%% replayq.batch_bytes_limit: Max number of bytes to collect in a batch for each -%% send call towards emqx_bridge_connect -%% replayq.batch_count_limit: Max number of messages to collect in a batch for -%% each send call towards emqx_bridge_connect -%% replayq.dir: Directory where replayq should persist messages -%% replayq.seg_bytes: Size in bytes for each replayq segment file %% %% Find more connection specific configs in the callback modules %% of emqx_bridge_connect behaviour. @@ -173,9 +165,14 @@ ping(Name) -> gen_statem:call(name(Name), ping). send_to_remote(Pid, Msg) when is_pid(Pid) -> - gen_statem:cast(Pid, {send_to_remote, Msg}); + gen_statem:call(Pid, {send_to_remote, Msg}); send_to_remote(Name, Msg) -> - gen_statem:cast(name(Name), {send_to_remote, Msg}). + gen_statem:call(name(Name), {send_to_remote, Msg}). + +send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) -> + gen_statem:cast(Pid, {send_to_remote_async, Msg, Callback}); +send_to_remote_async(Name, Msg, Callback) -> + gen_statem:cast(name(Name), {send_to_remote_async, Msg, Callback}). %% @doc Return all forwards (local subscriptions). -spec get_forwards(id()) -> [topic()]. @@ -194,12 +191,10 @@ init(#{name := Name} = ConnectOpts) -> name => Name }), erlang:process_flag(trap_exit, true), - Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), State = init_state(ConnectOpts), self() ! idle, {ok, idle, State#{ - connect_opts => pre_process_opts(ConnectOpts), - replayq => Queue + connect_opts => pre_process_opts(ConnectOpts) }}. init_state(Opts) -> @@ -212,32 +207,11 @@ init_state(Opts) -> start_type => StartType, reconnect_interval => ReconnDelayMs, mountpoint => format_mountpoint(Mountpoint), - inflight => [], max_inflight => MaxInflightSize, connection => undefined, name => Name }. -open_replayq(Name, QCfg) -> - Dir = maps:get(dir, QCfg, undefined), - SegBytes = maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES), - MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE), - QueueConfig = - case Dir =:= undefined orelse Dir =:= "" of - true -> - #{mem_only => true}; - false -> - #{ - dir => filename:join([Dir, node(), Name]), - seg_bytes => SegBytes, - max_total_size => MaxTotalSize - } - end, - replayq:open(QueueConfig#{ - sizer => fun emqx_connector_mqtt_msg:estimate_size/1, - marshaller => fun ?MODULE:msg_marshaller/1 - }). - pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> ConnectOpts#{ subscriptions => pre_process_in_out(in, InConf), @@ -276,9 +250,8 @@ pre_process_conf(Key, Conf) -> code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. -terminate(_Reason, _StateName, #{replayq := Q} = State) -> +terminate(_Reason, _StateName, State) -> _ = disconnect(State), - _ = replayq:close(Q), maybe_destroy_session(State). maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) -> @@ -322,15 +295,18 @@ connecting(#{reconnect_interval := ReconnectDelayMs} = State) -> {keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}} end. -connected(state_timeout, connected, #{inflight := Inflight} = State) -> - case retry_inflight(State#{inflight := []}, Inflight) of - {ok, NewState} -> - {keep_state, NewState, {next_event, internal, maybe_send}}; - {error, NewState} -> - {keep_state, NewState} +connected(state_timeout, connected, State) -> + %% nothing to do + {keep_state, State}; +connected({call, From}, {send_to_remote, Msg}, State) -> + case do_send(State, Msg) of + {ok, NState} -> + {keep_state, NState, [{reply, From, ok}]}; + {error, Reason} -> + {keep_state_and_data, [[reply, From, {error, Reason}]]} end; -connected(internal, maybe_send, State) -> - {_, NewState} = pop_and_send(State), +connected(cast, {send_to_remote_async, Msg, Callback}, State) -> + {_, NewState} = do_send_async(State, Msg, Callback), {keep_state, NewState}; connected( info, @@ -345,9 +321,6 @@ connected( false -> keep_state_and_data end; -connected(info, {batch_ack, Ref}, State) -> - NewState = handle_batch_ack(State, Ref), - {keep_state, NewState, {next_event, internal, maybe_send}}; connected(Type, Content, State) -> common(connected, Type, Content, State). @@ -368,9 +341,6 @@ common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; -common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) -> - NewQ = replayq:append(Q, [Msg]), - {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(StateName, Type, Content, #{name := Name} = State) -> ?SLOG(notice, #{ msg => "bridge_discarded_event", @@ -384,13 +354,12 @@ common(StateName, Type, Content, #{name := Name} = State) -> do_connect( #{ connect_opts := ConnectOpts, - inflight := Inflight, name := Name } = State ) -> case emqx_connector_mqtt_mod:start(ConnectOpts) of {ok, Conn} -> - ?tp(info, connected, #{name => Name, inflight => length(Inflight)}), + ?tp(info, connected, #{name => Name}), {ok, State#{connection => Conn}}; {error, Reason} -> ConnectOpts1 = obfuscate(ConnectOpts), @@ -402,39 +371,7 @@ do_connect( {error, Reason, State} end. -%% Retry all inflight (previously sent but not acked) batches. -retry_inflight(State, []) -> - {ok, State}; -retry_inflight(State, [#{q_ack_ref := QAckRef, msg := Msg} | Rest] = OldInf) -> - case do_send(State, QAckRef, Msg) of - {ok, State1} -> - retry_inflight(State1, Rest); - {error, #{inflight := NewInf} = State1} -> - {error, State1#{inflight := NewInf ++ OldInf}} - end. - -pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) -> - pop_and_send_loop(State, Max - length(Inflight)). - -pop_and_send_loop(State, 0) -> - ?tp(debug, inflight_full, #{}), - {ok, State}; -pop_and_send_loop(#{replayq := Q} = State, N) -> - case replayq:is_empty(Q) of - true -> - ?tp(debug, replayq_drained, #{}), - {ok, State}; - false -> - BatchSize = 1, - Opts = #{count_limit => BatchSize, bytes_limit => 999999999}, - {Q1, QAckRef, [Msg]} = replayq:pop(Q, Opts), - case do_send(State#{replayq := Q1}, QAckRef, Msg) of - {ok, NewState} -> pop_and_send_loop(NewState, N - 1); - {error, NewState} -> {error, NewState} - end - end. - -do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) -> +do_send(#{connect_opts := #{forwards := undefined}}, Msg) -> ?SLOG(error, #{ msg => "cannot_forward_messages_to_remote_broker" @@ -443,98 +380,68 @@ do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) -> }); do_send( #{ - inflight := Inflight, connection := Connection, mountpoint := Mountpoint, connect_opts := #{forwards := Forwards} } = State, - QAckRef, Msg ) -> Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), - ExportMsg = fun(Message) -> - emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) - end, + ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars), ?SLOG(debug, #{ msg => "publish_to_remote_broker", message => Msg, vars => Vars }), - case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of - {ok, Refs} -> - {ok, State#{ - inflight := Inflight ++ - [ - #{ - q_ack_ref => QAckRef, - send_ack_ref => map_set(Refs), - msg => Msg - } - ] - }}; + case emqx_connector_mqtt_mod:send(Connection, ExportMsg) of + ok -> + {ok, State}; + {ok, #{reason_code := RC}} when + RC =:= ?RC_SUCCESS; + RC =:= ?RC_NO_MATCHING_SUBSCRIBERS + -> + {ok, State}; + {ok, #{reason_code := RC, reason_code_name := RCN}} -> + ?SLOG(warning, #{ + msg => "publish_to_remote_node_falied", + message => Msg, + reason_code => RC, + reason_code_name => RCN + }), + {error, RCN}; {error, Reason} -> ?SLOG(info, #{ msg => "mqtt_bridge_produce_failed", reason => Reason }), - {error, State} + {error, Reason} end. -%% map as set, ack-reference -> 1 -map_set(Ref) when is_reference(Ref) -> - %% QoS-0 or RPC call returns a reference - map_set([Ref]); -map_set(List) -> - map_set(List, #{}). - -map_set([], Set) -> Set; -map_set([H | T], Set) -> map_set(T, Set#{H => 1}). - -handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) -> - Inflight1 = do_ack(Inflight0, Ref), - Inflight = drop_acked_batches(Q, Inflight1), - State#{inflight := Inflight}. - -do_ack([], Ref) -> - ?SLOG(debug, #{ - msg => "stale_batch_ack_reference", - ref => Ref - }), - []; -do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) -> - case maps:is_key(Ref, Refs) of - true -> - NewRefs = maps:without([Ref], Refs), - [First#{send_ack_ref := NewRefs} | Rest]; - false -> - [First | do_ack(Rest, Ref)] - end. - -%% Drop the consecutive header of the inflight list having empty send_ack_ref -drop_acked_batches(_Q, []) -> - ?tp(debug, inflight_drained, #{}), - []; -drop_acked_batches( - Q, - [ - #{ - send_ack_ref := Refs, - q_ack_ref := QAckRef - } - | Rest - ] = All +do_send_async(#{connect_opts := #{forwards := undefined}}, Msg, _Callback) -> + %% TODO: eval callback with undefined error + ?SLOG(error, #{ + msg => + "cannot_forward_messages_to_remote_broker" + "_as_'egress'_is_not_configured", + messages => Msg + }); +do_send_async( + #{ + connection := Connection, + mountpoint := Mountpoint, + connect_opts := #{forwards := Forwards} + }, + Msg, + Callback ) -> - case maps:size(Refs) of - 0 -> - %% all messages are acked by bridge target - %% now it's safe to ack replayq (delete from disk) - ok = replayq:ack(Q, QAckRef), - %% continue to check more sent batches - drop_acked_batches(Q, Rest); - _ -> - %% the head (oldest) inflight batch is not acked, keep waiting - All - end. + Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), + ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars), + ?SLOG(debug, #{ + msg => "publish_to_remote_broker", + message => Msg, + vars => Vars + }), + emqx_connector_mqtt_mod:send_async(Connection, ExportMsg, Callback). disconnect(#{connection := Conn} = State) when Conn =/= undefined -> emqx_connector_mqtt_mod:stop(Conn), @@ -542,10 +449,6 @@ disconnect(#{connection := Conn} = State) when Conn =/= undefined -> disconnect(State) -> State. -%% Called only when replayq needs to dump it to disk. -msg_marshaller(Bin) when is_binary(Bin) -> emqx_connector_mqtt_msg:from_binary(Bin); -msg_marshaller(Msg) -> emqx_connector_mqtt_msg:to_binary(Msg). - format_mountpoint(undefined) -> undefined; format_mountpoint(Prefix) -> From 0aa10702db157c03e23edd53b7d741ed3b8dcf9e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 24 Aug 2022 15:53:21 +0800 Subject: [PATCH 2/5] feat(bridge): support async mode resource options --- apps/emqx_bridge/src/emqx_bridge_api.erl | 8 ++++ .../src/schema/emqx_bridge_mqtt_schema.erl | 45 ++++++++++--------- .../src/emqx_connector_mqtt.erl | 2 +- .../src/mqtt/emqx_connector_mqtt_mod.erl | 2 +- .../src/mqtt/emqx_connector_mqtt_worker.erl | 4 +- 5 files changed, 36 insertions(+), 25 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 36f8cf0f5..848266b43 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(hoconsc, [mk/2, array/1, enum/1]). @@ -237,6 +238,13 @@ mqtt_main_example() -> keepalive => <<"300s">>, retry_interval => <<"15s">>, max_inflight => 100, + resource_opts => #{ + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + query_mode => sync, + enable_queue => false, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + }, ssl => #{ enable => false } diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl index 973fc8192..8fb6af65c 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl @@ -12,16 +12,29 @@ namespace() -> "bridge_mqtt". roots() -> []. + fields("config") -> + %% enable emqx_bridge_schema:common_bridge_fields() ++ + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ emqx_connector_mqtt_schema:fields("config"); +fields("creation_opts") -> + Opts = emqx_resource_schema:fields("creation_opts"), + [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; fields("post") -> - [ - type_field(), - name_field() - ] ++ emqx_connector_mqtt_schema:fields("config"); + [type_field(), name_field() | fields("config")]; fields("put") -> - emqx_connector_mqtt_schema:fields("config"); + fields("config"); fields("get") -> emqx_bridge_schema:metrics_status_fields() ++ fields("config"). @@ -31,22 +44,12 @@ desc(_) -> undefined. %%====================================================================================== +%% internal +is_hidden_opts(Field) -> + lists:member(Field, [enable_batch, batch_size, batch_time]). + type_field() -> - {type, - mk( - mqtt, - #{ - required => true, - desc => ?DESC("desc_type") - } - )}. + {type, mk(mqtt, #{required => true, desc => ?DESC("desc_type")})}. name_field() -> - {name, - mk( - binary(), - #{ - required => true, - desc => ?DESC("desc_name") - } - )}. + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 59eb3f0fb..0b9f7c85c 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -141,7 +141,7 @@ on_message_received(Msg, HookPoint, ResId) -> emqx:run_hook(HookPoint, [Msg]). %% =================================================================== -callback_mode() -> always_sync. +callback_mode() -> async_if_possible. on_start(InstId, Conf) -> InstanceId = binary_to_atom(InstId, utf8), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 7571c59b8..f1ecbf68c 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -138,7 +138,7 @@ send(#{client_pid := ClientPid}, Msg) -> emqtt:publish(ClientPid, Msg). send_async(#{client_pid := ClientPid}, Msg, Callback) -> - emqtt:publish_async(ClientPid, Msg, Callback). + emqtt:publish_async(ClientPid, Msg, infinity, Callback). handle_publish(Msg, undefined, _Opts) -> ?SLOG(error, #{ diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 5e4fa8f72..618361ad3 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -306,8 +306,8 @@ connected({call, From}, {send_to_remote, Msg}, State) -> {keep_state_and_data, [[reply, From, {error, Reason}]]} end; connected(cast, {send_to_remote_async, Msg, Callback}, State) -> - {_, NewState} = do_send_async(State, Msg, Callback), - {keep_state, NewState}; + _ = do_send_async(State, Msg, Callback), + {keep_state, State}; connected( info, {disconnected, Conn, Reason}, From a6eff81163916faf409fcee3c680edd1e7a7ad01 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 24 Aug 2022 15:54:06 +0800 Subject: [PATCH 3/5] chore: update emqtt to 1.7.0-rc.1 --- apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl | 2 ++ apps/emqx_connector/rebar.config | 3 +-- mix.exs | 2 +- rebar.config | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl index 8fb6af65c..6d2baaaa8 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl @@ -40,6 +40,8 @@ fields("get") -> desc("config") -> ?DESC("config"); +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index 0ac2d0da8..4d0b53e9a 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -20,8 +20,7 @@ %% By accident, We have always been using the upstream fork due to %% eredis_cluster's dependency getting resolved earlier. %% Here we pin 1.5.2 to avoid surprises in the future. - {poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}} + {poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}} ]}. {shell, [ diff --git a/mix.exs b/mix.exs index 3a15b36b4..1a4227900 100644 --- a/mix.exs +++ b/mix.exs @@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true}, {:replayq, "0.3.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, - {:emqtt, github: "emqx/emqtt", tag: "1.6.0", override: true}, + {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.1", override: true}, {:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, diff --git a/rebar.config b/rebar.config index 8370278f1..59f2e9101 100644 --- a/rebar.config +++ b/rebar.config @@ -61,7 +61,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.4"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.1"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} From c1afb34a865e35b48b821dc692f10b7295d7f1e2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 1 Sep 2022 11:28:38 +0800 Subject: [PATCH 4/5] test: fix failed tests --- apps/emqx/test/emqx_mqtt_SUITE.erl | 4 ++-- apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 10 ++++++++-- apps/emqx_bridge/src/emqx_bridge_api.erl | 7 +++---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/apps/emqx/test/emqx_mqtt_SUITE.erl b/apps/emqx/test/emqx_mqtt_SUITE.erl index f43804991..7032e553c 100644 --- a/apps/emqx/test/emqx_mqtt_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_SUITE.erl @@ -115,7 +115,7 @@ message_expiry_interval_init() -> message_expiry_interval_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a and waiting for the message expired - emqtt:publish( + _ = emqtt:publish( CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, @@ -152,7 +152,7 @@ message_expiry_interval_exipred(CPublish, CControl, QoS) -> message_expiry_interval_not_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a - emqtt:publish( + _ = emqtt:publish( CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 88ccda452..cb71cef95 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -529,8 +529,11 @@ t_connack_max_qos_allowed(Config) -> %% [MQTT-3.2.2-10] {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), - {ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1), %% [MQTT-3.2.2-11] + ?assertMatch( + {error, {disconnected, 155, _}}, + emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1) + ), ?assertEqual(155, receive_disconnect_reasoncode()), waiting_client_process_exit(Client1), @@ -563,8 +566,11 @@ t_connack_max_qos_allowed(Config) -> %% [MQTT-3.2.2-10] {ok, _, [2]} = emqtt:subscribe(Client3, Topic, 2), - {ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2), %% [MQTT-3.2.2-11] + ?assertMatch( + {error, {disconnected, 155, _}}, + emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2) + ), ?assertEqual(155, receive_disconnect_reasoncode()), waiting_client_process_exit(Client3), diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a8bc207d6..bd892edcd 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -20,7 +20,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_bridge/include/emqx_bridge.hrl"). -import(hoconsc, [mk/2, array/1, enum/1]). @@ -232,11 +231,11 @@ mqtt_main_example() -> retry_interval => <<"15s">>, max_inflight => 100, resource_opts => #{ - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + health_check_interval => <<"15s">>, + auto_restart_interval => <<"60s">>, query_mode => sync, enable_queue => false, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_queue_bytes => 100 * 1024 * 1024 }, ssl => #{ enable => false From dda0b4ac8cb98c1a9c5ecd4165bfa3a41548cefd Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 1 Sep 2022 12:35:37 +0800 Subject: [PATCH 5/5] chore: update emqtt vsn --- apps/emqx/rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 5b5b3db39..8e0d1f71c 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -43,7 +43,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.1"}}} ]}, {extra_src_dirs, [{"test", [recursive]}]} ]}