diff --git a/.ci/docker-compose-file/docker-compose-ldap.yaml b/.ci/docker-compose-file/docker-compose-ldap.yaml index 6804f6b01..339db8d85 100644 --- a/.ci/docker-compose-file/docker-compose-ldap.yaml +++ b/.ci/docker-compose-file/docker-compose-ldap.yaml @@ -10,7 +10,7 @@ services: nofile: 1024 image: openldap #ports: - # - 389:389 + # - "389:389" volumes: - ./certs/ca.crt:/etc/certs/ca.crt restart: always diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index aa505ff37..45a96be9a 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -122,9 +122,10 @@ jobs: run: | ls -lR _packages/$PROFILE mv _packages/$PROFILE/*.tar.gz ./ + - name: Enable containerd image store on Docker Engine run: | - echo "$(jq '. += {"features": {"containerd-snapshotter": true}}' /etc/docker/daemon.json)" > daemon.json + echo "$(sudo cat /etc/docker/daemon.json | jq '. += {"features": {"containerd-snapshotter": true}}')" > daemon.json sudo mv daemon.json /etc/docker/daemon.json sudo systemctl restart docker diff --git a/.github/workflows/build_packages_cron.yaml b/.github/workflows/build_packages_cron.yaml index 26ba3b1ec..69a6e4b00 100644 --- a/.github/workflows/build_packages_cron.yaml +++ b/.github/workflows/build_packages_cron.yaml @@ -23,6 +23,7 @@ jobs: profile: - ['emqx', 'master'] - ['emqx', 'release-57'] + - ['emqx', 'release-58'] os: - ubuntu22.04 - amzn2023 diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml index f086b97f6..f6077262f 100644 --- a/.github/workflows/codeql.yaml +++ b/.github/workflows/codeql.yaml @@ -24,6 +24,7 @@ jobs: branch: - master - release-57 + - release-58 language: - cpp - python diff --git a/.github/workflows/green_master.yaml b/.github/workflows/green_master.yaml index 8479ea5ed..a5317027b 100644 --- a/.github/workflows/green_master.yaml +++ b/.github/workflows/green_master.yaml @@ -24,6 +24,7 @@ jobs: ref: - master - release-57 + - release-58 steps: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 with: diff --git a/Makefile b/Makefile index 2f4519cfd..69667c952 100644 --- a/Makefile +++ b/Makefile @@ -10,8 +10,8 @@ include env.sh # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.9.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1 +export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.1 export EMQX_RELUP ?= true export EMQX_REL_FORM ?= tgz diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 09f7495ea..1c3fd770c 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -683,6 +683,7 @@ end). -define(FRAME_PARSE_ERROR, frame_parse_error). -define(FRAME_SERIALIZE_ERROR, frame_serialize_error). + -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})). -define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})). diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index c8e84ea06..7979d00fd 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,7 +32,7 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.7.1"). +-define(EMQX_RELEASE_CE, "5.8.0-alpha.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.7.1"). +-define(EMQX_RELEASE_EE, "5.8.0-alpha.1"). diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index a65b05089..885049a35 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -91,7 +91,7 @@ ?_DO_TRACE(Tag, Msg, Meta), ?SLOG( Level, - (emqx_trace_formatter:format_meta_map(Meta))#{msg => Msg, tag => Tag}, + (Meta)#{msg => Msg, tag => Tag}, #{is_trace => false} ) end). diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b98728ed1..6e629e9fa 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}}, diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 20b1445c9..fca68ba8c 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.3.3"}, + {vsn, "5.3.4"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 1e68d74d6..c2c631b0b 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -146,7 +146,9 @@ -type replies() :: emqx_types:packet() | reply() | [reply()]. -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}). - +-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState), + ((ConnState == connected) orelse (ConnState == reauthenticating)) +). -define(IS_COMMON_SESSION_TIMER(N), ((N == retry_delivery) orelse (N == expire_awaiting_rel)) ). @@ -337,7 +339,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) -> | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}. handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel); handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) -> @@ -567,29 +569,8 @@ handle_in( process_disconnect(ReasonCode, Properties, NChannel); handle_in(?AUTH_PACKET(), Channel) -> handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> - shutdown(shutdown_count(frame_error, Reason), Channel); -handle_in( - {frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting} -) -> - shutdown( - shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel - ); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) -> - shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); -handle_in( - {frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState} -) when - ConnState =:= connected orelse ConnState =:= reauthenticating --> - handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating --> - handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), - {ok, Channel}; +handle_in({frame_error, Reason}, Channel) -> + handle_frame_error(Reason, Channel); handle_in(Packet, Channel) -> ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}), handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel). @@ -1021,6 +1002,68 @@ not_nacked({deliver, _Topic, Msg}) -> true end. +%%-------------------------------------------------------------------- +%% Handle Frame Error +%%-------------------------------------------------------------------- + +handle_frame_error( + Reason = #{cause := frame_too_large}, + Channel = #channel{conn_state = ConnState, conninfo = ConnInfo} +) when + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) +-> + ShutdownCount = shutdown_count(frame_error, Reason), + case proto_ver(Reason, ConnInfo) of + ?MQTT_PROTO_V5 -> + handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); + _ -> + shutdown(ShutdownCount, Channel) + end; +%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting, +%% otherwise DONOT send any CONNACK or DISCONNECT packet. +handle_frame_error( + Reason, + Channel = #channel{conn_state = ConnState, conninfo = ConnInfo} +) when + is_map(Reason) andalso + (ConnState == idle orelse ConnState == connecting) +-> + ShutdownCount = shutdown_count(frame_error, Reason), + ProtoVer = proto_ver(Reason, ConnInfo), + NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}}, + case ProtoVer of + ?MQTT_PROTO_V5 -> + shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel); + _ -> + shutdown(ShutdownCount, NChannel) + end; +handle_frame_error( + Reason, + Channel = #channel{conn_state = connecting} +) -> + shutdown( + shutdown_count(frame_error, Reason), + ?CONNACK_PACKET(?RC_MALFORMED_PACKET), + Channel + ); +handle_frame_error( + Reason, + Channel = #channel{conn_state = ConnState} +) when + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) +-> + handle_out( + disconnect, + {?RC_MALFORMED_PACKET, Reason}, + Channel + ); +handle_frame_error( + Reason, + Channel = #channel{conn_state = disconnected} +) -> + ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), + {ok, Channel}. + %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- @@ -1289,7 +1332,7 @@ handle_info( session = Session } ) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), @@ -2636,8 +2679,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) -> NAliases = maps:put(Topic, AliasId, Aliases), TopicAliases#{outbound => NAliases}. --compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}). - +-compile({inline, [reply/2, shutdown/2, shutdown/3]}). reply(Reply, Channel) -> {reply, Reply, Channel}. @@ -2673,13 +2715,13 @@ disconnect_and_shutdown( ?IS_MQTT_V5 = #channel{conn_state = ConnState} ) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> NChannel = ensure_disconnected(Reason, Channel), shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel); %% mqtt v3/v4 connected sessions disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating + ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState) -> NChannel = ensure_disconnected(Reason, Channel), shutdown(Reason, Reply, NChannel); @@ -2722,6 +2764,13 @@ is_durable_session(#channel{session = Session}) -> false end. +proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) -> + ProtoVer; +proto_ver(_Reason, #{proto_ver := ProtoVer}) -> + ProtoVer; +proto_ver(_, _) -> + ?MQTT_PROTO_V4. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index a3c545ea6..c0801b1e3 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -783,7 +783,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> input_bytes => Data, parsed_packets => Packets }), - {[{frame_error, Reason} | Packets], State}; + NState = enrich_state(Reason, State), + {[{frame_error, Reason} | Packets], NState}; error:Reason:Stacktrace -> ?LOG(error, #{ at_state => emqx_frame:describe_state(ParseState), @@ -1227,6 +1228,12 @@ inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. +enrich_state(#{parse_state := NParseState}, State) -> + Serialize = emqx_frame:serialize_opts(NParseState), + State#state{parse_state = NParseState, serialize = Serialize}; +enrich_state(_, State) -> + State. + set_tcp_keepalive({quic, _Listener}) -> ok; set_tcp_keepalive({Type, Id}) -> diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 41fc00f4c..0d8ec160c 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -267,28 +267,50 @@ packet(Header, Variable) -> packet(Header, Variable, Payload) -> #mqtt_packet{header = Header, variable = Variable, payload = Payload}. -parse_connect(FrameBin, StrictMode) -> - {ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name), - case ProtoName of - <<"MQTT">> -> - ok; - <<"MQIsdp">> -> - ok; - _ -> - %% from spec: the server MAY send disconnect with reason code 0x84 - %% we chose to close socket because the client is likely not talking MQTT anyway - ?PARSE_ERR(#{ - cause => invalid_proto_name, - expected => <<"'MQTT' or 'MQIsdp'">>, - received => ProtoName - }) - end, - parse_connect2(ProtoName, Rest, StrictMode). +parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) -> + {ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name), + %% No need to parse and check proto_ver if proto_name is invalid, check it first + %% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2` + _ = validate_proto_name(ProtoName), + {IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0), + NOptions = Options#{version => ProtoVer}, + try + do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode) + catch + throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) -> + ?PARSE_ERR( + ReasonM#{ + proto_ver => ProtoVer, + proto_name => ProtoName, + parse_state => ?NONE(NOptions) + } + ); + throw:{?FRAME_PARSE_ERROR, Reason} -> + ?PARSE_ERR( + #{ + cause => Reason, + proto_ver => ProtoVer, + proto_name => ProtoName, + parse_state => ?NONE(NOptions) + } + ) + end. -parse_connect2( +do_parse_connect( ProtoName, - <>, + IsBridge, + ProtoVer, + << + UsernameFlagB:1, + PasswordFlagB:1, + WillRetainB:1, + WillQoS:2, + WillFlagB:1, + CleanStart:1, + Reserved:1, + KeepAlive:16/big, + Rest/binary + >>, StrictMode ) -> _ = validate_connect_reserved(Reserved), @@ -303,14 +325,14 @@ parse_connect2( UsernameFlag = bool(UsernameFlagB), PasswordFlag = bool(PasswordFlagB) ), - {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), + {Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode), {ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid), ConnPacket = #mqtt_packet_connect{ proto_name = ProtoName, proto_ver = ProtoVer, %% For bridge mode, non-standard implementation %% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html - is_bridge = (BridgeTag =:= 8), + is_bridge = IsBridge, clean_start = bool(CleanStart), will_flag = WillFlag, will_qos = WillQoS, @@ -343,16 +365,16 @@ parse_connect2( unexpected_trailing_bytes => size(Rest7) }) end; -parse_connect2(_ProtoName, Bin, _StrictMode) -> - %% sent less than 32 bytes +do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) -> + %% sent less than 24 bytes ?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}). parse_packet( #mqtt_packet_header{type = ?CONNECT}, FrameBin, - #{strict_mode := StrictMode} + Options ) -> - parse_connect(FrameBin, StrictMode); + parse_connect(FrameBin, Options); parse_packet( #mqtt_packet_header{type = ?CONNACK}, <>, @@ -516,6 +538,12 @@ parse_packet_id(<>) -> parse_packet_id(_) -> ?PARSE_ERR(invalid_packet_id). +parse_connect_proto_ver(<>) -> + {_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest}; +parse_connect_proto_ver(Bin) -> + %% sent less than 1 bytes or empty + ?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}). + parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 -> {#{}, Bin}; %% TODO: version mess? @@ -739,6 +767,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize, strict_mode := StrictMode}) initial_serialize_opts(Opts) -> maps:merge(?DEFAULT_OPTIONS, Opts). +serialize_opts(?NONE(Options)) -> + maps:merge(?DEFAULT_OPTIONS, Options); serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), #{version => ProtoVer, max_size => MaxSize, strict_mode => false}. @@ -1157,18 +1187,34 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos); validate_subqos([_ | T]) -> validate_subqos(T); validate_subqos([]) -> ok. +%% from spec: the server MAY send disconnect with reason code 0x84 +%% we chose to close socket because the client is likely not talking MQTT anyway +validate_proto_name(<<"MQTT">>) -> + ok; +validate_proto_name(<<"MQIsdp">>) -> + ok; +validate_proto_name(ProtoName) -> + ?PARSE_ERR(#{ + cause => invalid_proto_name, + expected => <<"'MQTT' or 'MQIsdp'">>, + received => ProtoName + }). + %% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3] +-compile({inline, [validate_connect_reserved/1]}). validate_connect_reserved(0) -> ok; validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag). +-compile({inline, [validate_connect_will/3]}). %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] -validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos); +validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos); %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos); %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain); validate_connect_will(_, _, _) -> ok. +-compile({inline, [validate_connect_password_flag/4]}). %% MQTT-v3.1 %% Username flag and password flag are not strongly related %% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect @@ -1183,6 +1229,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) -> validate_connect_password_flag(_, _, _, _) -> ok. +-compile({inline, [bool/1]}). bool(0) -> false; bool(1) -> true. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index ba50b3630..87fecffd4 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -432,7 +432,7 @@ do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTE esockd:open( Id, ListenOn, - merge_default(esockd_opts(Id, Type, Name, Opts)) + merge_default(esockd_opts(Id, Type, Name, Opts, _OldOpts = undefined)) ); %% Start MQTT/WS listener do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) -> @@ -476,7 +476,7 @@ do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when Id = listener_id(Type, Name), case maps:get(bind, OldConf) of ListenOn -> - esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf)); + esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf, OldConf)); _Different -> %% TODO %% Again, we're not strictly required to drop live connections in this case. @@ -588,7 +588,7 @@ perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) -> perform_listener_change(stop, {Type, Name, Conf}) -> stop_listener(Type, Name, Conf). -esockd_opts(ListenerId, Type, Name, Opts0) -> +esockd_opts(ListenerId, Type, Name, Opts0, OldOpts) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = @@ -620,7 +620,7 @@ esockd_opts(ListenerId, Type, Name, Opts0) -> tcp -> Opts3#{tcp_options => tcp_opts(Opts0)}; ssl -> - OptsWithCRL = inject_crl_config(Opts0), + OptsWithCRL = inject_crl_config(Opts0, OldOpts), OptsWithSNI = inject_sni_fun(ListenerId, OptsWithCRL), OptsWithRootFun = inject_root_fun(OptsWithSNI), OptsWithVerifyFun = inject_verify_fun(OptsWithRootFun), @@ -996,7 +996,7 @@ inject_sni_fun(_ListenerId, Conf) -> Conf. inject_crl_config( - Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts} + Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}, _OldOpts ) -> HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)), Conf#{ @@ -1006,7 +1006,16 @@ inject_crl_config( crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}} } }; -inject_crl_config(Conf) -> +inject_crl_config(#{ssl_options := SSLOpts0} = Conf0, #{} = OldOpts) -> + %% Note: we must set crl options to `undefined' to unset them. Otherwise, + %% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL + %% options were previously enabled. + WasEnabled = emqx_utils_maps:deep_get([ssl_options, enable_crl_check], OldOpts, false), + Undefine = fun(Acc, K) -> emqx_utils_maps:put_if(Acc, K, undefined, WasEnabled) end, + SSLOpts1 = Undefine(SSLOpts0, crl_check), + SSLOpts = Undefine(SSLOpts1, crl_cache), + Conf0#{ssl_options := SSLOpts}; +inject_crl_config(Conf, undefined = _OldOpts) -> Conf. maybe_unregister_ocsp_stapling_refresh( diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 515b2114a..fade19112 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -105,7 +105,7 @@ format(Msg, Meta, Config) -> maybe_format_msg(undefined, _Meta, _Config) -> #{}; maybe_format_msg({report, Report0} = Msg, #{report_cb := Cb} = Meta, Config) -> - Report = emqx_logger_textfmt:try_encode_payload(Report0, Config), + Report = emqx_logger_textfmt:try_encode_meta(Report0, Config), case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of true -> %% reporting a map without a customised format function diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index b7cc64510..0c6223f8d 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -20,7 +20,7 @@ -export([format/2]). -export([check_config/1]). --export([try_format_unicode/1, try_encode_payload/2]). +-export([try_format_unicode/1, try_encode_meta/2]). %% Used in the other log formatters -export([evaluate_lazy_values_if_dbg_level/1, evaluate_lazy_values/1]). @@ -111,7 +111,7 @@ is_list_report_acceptable(_) -> enrich_report(ReportRaw0, Meta, Config) -> %% clientid and peername always in emqx_conn's process metadata. %% topic and username can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2 - ReportRaw = try_encode_payload(ReportRaw0, Config), + ReportRaw = try_encode_meta(ReportRaw0, Config), Topic = case maps:get(topic, Meta, undefined) of undefined -> maps:get(topic, ReportRaw, undefined); @@ -180,9 +180,22 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) -> enrich_topic(Msg, _) -> Msg. -try_encode_payload(#{payload := Payload} = Report, #{payload_encode := Encode}) -> +try_encode_meta(Report, Config) -> + lists:foldl( + fun(Meta, Acc) -> + try_encode_meta(Meta, Acc, Config) + end, + Report, + [payload, packet] + ). + +try_encode_meta(payload, #{payload := Payload} = Report, #{payload_encode := Encode}) -> Report#{payload := encode_payload(Payload, Encode)}; -try_encode_payload(Report, _Config) -> +try_encode_meta(packet, #{packet := Packet} = Report, #{payload_encode := Encode}) when + is_tuple(Packet) +-> + Report#{packet := emqx_packet:format(Packet, Encode)}; +try_encode_meta(_, Report, _Config) -> Report. encode_payload(Payload, text) -> @@ -190,4 +203,5 @@ encode_payload(Payload, text) -> encode_payload(_Payload, hidden) -> "******"; encode_payload(Payload, hex) -> - binary:encode_hex(Payload). + Bin = emqx_utils_conv:bin(Payload), + binary:encode_hex(Bin). diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 9d6516126..6f5fa0972 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -51,7 +51,6 @@ ]). -export([ - format/1, format/2 ]). @@ -481,10 +480,6 @@ will_msg(#mqtt_packet_connect{ headers = #{username => Username, properties => Props} }. -%% @doc Format packet --spec format(emqx_types:packet()) -> iolist(). -format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()). - %% @doc Format packet -spec format(emqx_types:packet(), hex | text | hidden) -> iolist(). format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) -> diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index df9520d90..c63014cea 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -62,7 +62,7 @@ streams := [{pid(), quicer:stream_handle()}], %% New stream opts stream_opts := map(), - %% If conneciton is resumed from session ticket + %% If connection is resumed from session ticket is_resumed => boolean(), %% mqtt message serializer config serialize => undefined, @@ -70,8 +70,8 @@ }. -type cb_ret() :: quicer_lib:cb_ret(). -%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked -%% for the activation from control stream after it is accepted as a legit conneciton. +%% @doc Data streams initializations are started in parallel with control streams, data streams are blocked +%% for the activation from control stream after it is accepted as a legit connection. %% For security, the initial number of allowed data streams from client should be limited by %% 'peer_bidi_stream_count` & 'peer_unidi_stream_count` -spec activate_data_streams(pid(), { @@ -80,7 +80,7 @@ activate_data_streams(ConnOwner, {PS, Serialize, Channel}) -> gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity). -%% @doc conneciton owner init callback +%% @doc connection owner init callback -spec init(map()) -> {ok, cb_state()}. init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); diff --git a/apps/emqx/src/emqx_tls_lib.erl b/apps/emqx/src/emqx_tls_lib.erl index e1de50385..1b9e89199 100644 --- a/apps/emqx/src/emqx_tls_lib.erl +++ b/apps/emqx/src/emqx_tls_lib.erl @@ -589,6 +589,14 @@ ensure_valid_options(Options, Versions) -> ensure_valid_options([], _, Acc) -> lists:reverse(Acc); +ensure_valid_options([{K, undefined} | T], Versions, Acc) when + K =:= crl_check; + K =:= crl_cache +-> + %% Note: we must set crl options to `undefined' to unset them. Otherwise, + %% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL + %% options were previously enabled. + ensure_valid_options(T, Versions, [{K, undefined} | Acc]); ensure_valid_options([{_, undefined} | T], Versions, Acc) -> ensure_valid_options(T, Versions, Acc); ensure_valid_options([{_, ""} | T], Versions, Acc) -> diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index e540ae82a..728280700 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -17,7 +17,6 @@ -include("emqx_mqtt.hrl"). -export([format/2]). --export([format_meta_map/1]). %% logger_formatter:config/0 is not exported. -type config() :: map(). @@ -43,10 +42,6 @@ format( format(Event, Config) -> emqx_logger_textfmt:format(Event, Config). -format_meta_map(Meta) -> - Encode = emqx_trace_handler:payload_encode(), - format_meta_map(Meta, Encode). - format_meta_map(Meta, Encode) -> format_meta_map(Meta, Encode, [ {packet, fun format_packet/2}, diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 1fa7433b1..bb9756a5c 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) -> %% TODO: should not close the ws connection ?LOG(error, #{msg => "unexpected_frame", frame => Frame}), shutdown(unexpected_ws_frame, State). + websocket_info({call, From, Req}, State) -> handle_call(From, Req, State); websocket_info({cast, rate_limit}, State) -> @@ -737,7 +738,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> input_bytes => Data }), FrameError = {frame_error, Reason}, - {[{incoming, FrameError} | Packets], State}; + NState = enrich_state(Reason, State), + {[{incoming, FrameError} | Packets], NState}; error:Reason:Stacktrace -> ?LOG(error, #{ at_state => emqx_frame:describe_state(ParseState), @@ -830,7 +832,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> ?LOG(warning, #{ msg => "packet_discarded", reason => "frame_too_large", - packet => emqx_packet:format(Packet) + packet => Packet }), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), @@ -1069,6 +1071,13 @@ check_max_connection(Type, Listener) -> {denny, Reason} end end. + +enrich_state(#{parse_state := NParseState}, State) -> + Serialize = emqx_frame:serialize_opts(NParseState), + State#state{parse_state = NParseState, serialize = Serialize}; +enrich_state(_, State) -> + State. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index d157cc914..83b862892 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -414,24 +414,32 @@ t_handle_in_auth(_) -> emqx_channel:handle_in(?AUTH_PACKET(), Channel). t_handle_in_frame_error(_) -> - IdleChannel = channel(#{conn_state => idle}), - {shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} = - emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel), + IdleChannelV5 = channel(#{conn_state => idle}), + %% no CONNACK packet for v4 + ?assertMatch( + {shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan}, + emqx_channel:handle_in( + {frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5) + ) + ), + ConnectingChan = channel(#{conn_state => connecting}), ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), - {shutdown, - #{ - shutdown_count := frame_too_large, - cause := frame_too_large, - limit := 100, - received := 101 - }, - ConnackPacket, - _} = + ?assertMatch( + {shutdown, + #{ + shutdown_count := frame_too_large, + cause := frame_too_large, + limit := 100, + received := 101 + }, + ConnackPacket, _}, emqx_channel:handle_in( {frame_error, #{cause => frame_too_large, received => 101, limit => 100}}, ConnectingChan - ), + ) + ), + DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE), ConnectedChan = channel(#{conn_state => connected}), ?assertMatch( diff --git a/apps/emqx/test/emqx_crl_cache_SUITE.erl b/apps/emqx/test/emqx_crl_cache_SUITE.erl index 1d7d0f1d5..51196439e 100644 --- a/apps/emqx/test/emqx_crl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_crl_cache_SUITE.erl @@ -138,13 +138,14 @@ init_per_testcase(t_refresh_config = TestCase, Config) -> ]; init_per_testcase(TestCase, Config) when TestCase =:= t_update_listener; + TestCase =:= t_update_listener_enable_disable; TestCase =:= t_validations -> ct:timetrap({seconds, 30}), ok = snabbkaffe:start_trace(), %% when running emqx standalone tests, we can't use those %% features. - case does_module_exist(emqx_management) of + case does_module_exist(emqx_mgmt) of true -> DataDir = ?config(data_dir, Config), CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]), @@ -165,7 +166,7 @@ init_per_testcase(TestCase, Config) when {emqx_conf, #{config => #{listeners => #{ssl => #{default => ListenerConf}}}}}, emqx, emqx_management, - {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + emqx_mgmt_api_test_util:emqx_dashboard() ], #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} ), @@ -206,6 +207,7 @@ read_crl(Filename) -> end_per_testcase(TestCase, Config) when TestCase =:= t_update_listener; + TestCase =:= t_update_listener_enable_disable; TestCase =:= t_validations -> Skip = proplists:get_bool(skip_does_not_apply, Config), @@ -1057,3 +1059,104 @@ do_t_validations(_Config) -> ), ok. + +%% Checks that if CRL is ever enabled and then disabled, clients can connect, even if they +%% would otherwise not have their corresponding CRLs cached and fail with `{bad_crls, +%% no_relevant_crls}`. +t_update_listener_enable_disable(Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ct:pal("skipping as this test does not apply in this profile"), + ok; + false -> + do_t_update_listener_enable_disable(Config) + end. + +do_t_update_listener_enable_disable(Config) -> + DataDir = ?config(data_dir, Config), + Keyfile = filename:join([DataDir, "server.key.pem"]), + Certfile = filename:join([DataDir, "server.cert.pem"]), + Cacertfile = filename:join([DataDir, "ca-chain.cert.pem"]), + ClientCert = filename:join(DataDir, "client.cert.pem"), + ClientKey = filename:join(DataDir, "client.key.pem"), + + ListenerId = "ssl:default", + %% Enable CRL + {ok, {{_, 200, _}, _, ListenerData0}} = get_listener_via_api(ListenerId), + CRLConfig0 = + #{ + <<"ssl_options">> => + #{ + <<"keyfile">> => Keyfile, + <<"certfile">> => Certfile, + <<"cacertfile">> => Cacertfile, + <<"enable_crl_check">> => true, + <<"fail_if_no_peer_cert">> => true + } + }, + ListenerData1 = emqx_utils_maps:deep_merge(ListenerData0, CRLConfig0), + {ok, {_, _, ListenerData2}} = update_listener_via_api(ListenerId, ListenerData1), + ?assertMatch( + #{ + <<"ssl_options">> := + #{ + <<"enable_crl_check">> := true, + <<"verify">> := <<"verify_peer">>, + <<"fail_if_no_peer_cert">> := true + } + }, + ListenerData2 + ), + + %% Disable CRL + CRLConfig1 = + #{ + <<"ssl_options">> => + #{ + <<"keyfile">> => Keyfile, + <<"certfile">> => Certfile, + <<"cacertfile">> => Cacertfile, + <<"enable_crl_check">> => false, + <<"fail_if_no_peer_cert">> => true + } + }, + ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, CRLConfig1), + redbug:start( + [ + "esockd_server:get_listener_prop -> return", + "esockd_server:set_listener_prop -> return", + "esockd:merge_opts -> return", + "esockd_listener_sup:set_options -> return", + "emqx_listeners:inject_crl_config -> return" + ], + [{msgs, 100}] + ), + {ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3), + ?assertMatch( + #{ + <<"ssl_options">> := + #{ + <<"enable_crl_check">> := false, + <<"verify">> := <<"verify_peer">>, + <<"fail_if_no_peer_cert">> := true + } + }, + ListenerData4 + ), + + %% Now the client that would be blocked tries to connect and should now be allowed. + {ok, C} = emqtt:start_link([ + {ssl, true}, + {ssl_opts, [ + {certfile, ClientCert}, + {keyfile, ClientKey}, + {verify, verify_none} + ]}, + {port, 8883} + ]), + ?assertMatch({ok, _}, emqtt:connect(C)), + emqtt:stop(C), + + ?assertNotReceive({http_get, _}), + + ok. diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 9c8a99547..0c5a36231 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -63,6 +63,7 @@ groups() -> t_parse_malformed_properties, t_malformed_connect_header, t_malformed_connect_data, + t_malformed_connect_data_proto_ver, t_reserved_connect_flag, t_invalid_clientid, t_undefined_password, @@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) -> ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). +%% TODO: parse v3 with 0 length clientid + t_serialize_parse_v3_connect(_) -> Bin = <<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117, @@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) -> header = #mqtt_packet_header{type = ?CONNECT}, variable = #mqtt_packet_connect{ clientid = <<"C_00:0C:29:2B:77:52">>, - proto_ver = 16#03, + proto_ver = ?MQTT_PROTO_V3, proto_name = <<"MQIsdp">>, is_bridge = true, will_retain = true, @@ -686,15 +689,36 @@ t_malformed_connect_header(_) -> ). t_malformed_connect_data(_) -> + ProtoNameWithLen = <<0, 6, "MQIsdp">>, + ConnectFlags = <<2#00000000>>, + ClientIdwithLen = <<0, 1, "a">>, + UnexpectedRestBin = <<0, 1, 2>>, ?ASSERT_FRAME_THROW( - #{cause := malformed_connect, unexpected_trailing_bytes := _}, - emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>) + #{cause := malformed_connect, unexpected_trailing_bytes := 3}, + emqx_frame:parse( + <<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0, + ClientIdwithLen/binary, UnexpectedRestBin/binary>> + ) + ). + +t_malformed_connect_data_proto_ver(_) -> + Proto3NameWithLen = <<0, 6, "MQIsdp">>, + ?ASSERT_FRAME_THROW( + #{cause := malformed_connect, header_bytes := <<>>}, + emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>) + ), + ProtoNameWithLen = <<0, 4, "MQTT">>, + ?ASSERT_FRAME_THROW( + #{cause := malformed_connect, header_bytes := <<>>}, + emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>) ). t_reserved_connect_flag(_) -> ?assertException( throw, - {frame_parse_error, reserved_connect_flag}, + {frame_parse_error, #{ + cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">> + }}, emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>) ). @@ -726,7 +750,7 @@ t_undefined_password(_) -> }, variable = #mqtt_packet_connect{ proto_name = <<"MQTT">>, - proto_ver = 4, + proto_ver = ?MQTT_PROTO_V4, is_bridge = false, clean_start = true, will_flag = false, @@ -774,7 +798,9 @@ t_invalid_will_retain(_) -> 54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>, ?assertException( throw, - {frame_parse_error, invalid_will_retain}, + {frame_parse_error, #{ + cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBin) ), ok. @@ -796,22 +822,30 @@ t_invalid_will_qos(_) -> ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1)) ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2)) ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3)) ), ?assertException( throw, - {frame_parse_error, invalid_will_qos}, + {frame_parse_error, #{ + cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">> + }}, emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3)) ), ok. diff --git a/apps/emqx/test/emqx_packet_SUITE.erl b/apps/emqx/test/emqx_packet_SUITE.erl index 4ba6d1d82..04ad53a1e 100644 --- a/apps/emqx/test/emqx_packet_SUITE.erl +++ b/apps/emqx/test/emqx_packet_SUITE.erl @@ -377,42 +377,60 @@ t_will_msg(_) -> t_format(_) -> io:format("~ts", [ - emqx_packet:format(#mqtt_packet{ - header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0}, - variable = undefined - }) - ]), - io:format("~ts", [ - emqx_packet:format(#mqtt_packet{ - header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">> - }) + emqx_packet:format( + #mqtt_packet{ + header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0}, + variable = undefined + }, + text + ) ]), + io:format( + "~ts", + [ + emqx_packet:format( + #mqtt_packet{ + header = #mqtt_packet_header{type = ?CONNACK}, + variable = 1, + payload = <<"payload">> + }, + text + ) + ] + ), io:format("~ts", [ emqx_packet:format( - ?CONNECT_PACKET(#mqtt_packet_connect{ - will_flag = true, - will_retain = true, - will_qos = ?QOS_2, - will_topic = <<"topic">>, - will_payload = <<"payload">> - }) + ?CONNECT_PACKET( + #mqtt_packet_connect{ + will_flag = true, + will_retain = true, + will_qos = ?QOS_2, + will_topic = <<"topic">>, + will_payload = <<"payload">> + } + ), + text ) ]), io:format("~ts", [ - emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password})) + emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}), text) ]), - io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), - io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), - io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), - io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), - io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]), + io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER), text)]), + io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1), text)]), io:format("~ts", [ - emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}])) + emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>), text) ]), - io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]), - io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), - io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]), - io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]). + io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98), text)]), + io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99), text)]), + io:format("~ts", [ + emqx_packet:format( + ?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]), text + ) + ]), + io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]), text)]), + io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]), text)]), + io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90), text)]), + io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128), text)]). t_parse_empty_publish(_) -> %% 52: 0011(type=PUBLISH) 0100 (QoS=2) diff --git a/apps/emqx_auth/src/emqx_auth.app.src b/apps/emqx_auth/src/emqx_auth.app.src index d61ba281b..d2212ffe2 100644 --- a/apps/emqx_auth/src/emqx_auth.app.src +++ b/apps/emqx_auth/src/emqx_auth.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth, [ {description, "EMQX Authentication and authorization"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {modules, []}, {registered, [emqx_auth_sup]}, {applications, [ diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl index 8bc60a600..e76d52535 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl @@ -477,9 +477,15 @@ authorize_deny( sources() ) -> authz_result(). -authorize(Client, PubSub, Topic, _DefaultResult, Sources) -> +authorize(#{username := Username} = Client, PubSub, Topic, _DefaultResult, Sources) -> case maps:get(is_superuser, Client, false) of true -> + ?tp(authz_skipped, #{reason => client_is_superuser, action => PubSub}), + ?TRACE("AUTHZ", "authorization_skipped_as_superuser", #{ + username => Username, + topic => Topic, + action => emqx_access_control:format_action(PubSub) + }), emqx_metrics:inc(?METRIC_SUPERUSER), {stop, #{result => allow, from => superuser}}; false -> diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl index 575eb4109..4745d7ec6 100644 --- a/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl @@ -674,5 +674,77 @@ t_publish_last_will_testament_banned_client_connecting(_Config) -> ok. +t_sikpped_as_superuser(_Config) -> + ClientInfo = #{ + clientid => <<"clientid">>, + username => <<"username">>, + peerhost => {127, 0, 0, 1}, + zone => default, + listener => {tcp, default}, + is_superuser => true + }, + ?check_trace( + begin + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_0), <<"p/t/0">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_1), <<"p/t/1">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_2), <<"p/t/2">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_0), <<"s/t/0">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_1), <<"s/t/1">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_2), <<"s/t/2">>) + ) + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_0, action_type := publish} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_1, action_type := publish} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_2, action_type := publish} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_0, action_type := subscribe} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_1, action_type := subscribe} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_2, action_type := subscribe} + } + ], + ?of_kind(authz_skipped, Trace) + ), + ok + end + ), + + ok = snabbkaffe:stop(). + stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps). diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index 9cf62ae15..b2885711d 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_http, [ {description, "EMQX External HTTP API Authentication and Authorization"}, - {vsn, "0.3.0"}, + {vsn, "0.3.1"}, {registered, []}, {mod, {emqx_auth_http_app, []}}, {applications, [ diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src index 1edb5fc67..885a0002b 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_jwt, [ {description, "EMQX JWT Authentication and Authorization"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, []}, {mod, {emqx_auth_jwt_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src index 7fcdda1d3..3e862e474 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mnesia, [ {description, "EMQX Buitl-in Database Authentication and Authorization"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {mod, {emqx_auth_mnesia_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src index 5ffc59787..837f20230 100644 --- a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src +++ b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mongodb, [ {description, "EMQX MongoDB Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_mongodb_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src b/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src index abd9a7e27..07329c5b0 100644 --- a/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src +++ b/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mysql, [ {description, "EMQX MySQL Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_mysql_app, []}}, {applications, [ diff --git a/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src b/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src index 1eabc93f0..1fddca42f 100644 --- a/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src +++ b/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_postgresql, [ {description, "EMQX PostgreSQL Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_postgresql_app, []}}, {applications, [ diff --git a/apps/emqx_auth_redis/src/emqx_auth_redis.app.src b/apps/emqx_auth_redis/src/emqx_auth_redis.app.src index 9b43eca2c..7d82242a7 100644 --- a/apps/emqx_auth_redis/src/emqx_auth_redis.app.src +++ b/apps/emqx_auth_redis/src/emqx_auth_redis.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_redis, [ {description, "EMQX Redis Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_redis_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 30930c494..ea0339943 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 6b160f3b3..fdbbc5376 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1154,7 +1154,7 @@ t_bridges_probe(Config) -> ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, - <<"message">> := <<"Connection refused">> + <<"message">> := <<"Connection refused", _/binary>> }}, request_json( post, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index d98f4f926..46d1883bd 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) -> ), ?force_ordering( - #{?snk_kind := call_query}, + #{?snk_kind := SNKKind} when + SNKKind =:= call_query orelse SNKKind =:= simple_query_enter, #{?snk_kind := cut_connection, ?snk_span := start} ), %% Note: order of arguments here is reversed compared to `?force_ordering'. @@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) -> emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort) ) end), + ?tp("publishing_message", #{}), try {_, {ok, _}} = snabbkaffe:wait_async_action( @@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) -> infinity ) after + ?tp("healing_failure", #{}), emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort) end, {ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index a39c4be99..6d786a2bd 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index 8b8d379e4..e215014a9 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_http, [ {description, "EMQX HTTP Bridge and Connector Application"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, ehttpc]}, {env, [ diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 0e906203d..5a8973666 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 74d3a5f54..d7408f0e5 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1918,13 +1918,14 @@ t_node_joins_existing_cluster(Config) -> _Attempts2 = 50, [] =/= erpc:call(N2, emqx_router, lookup_routes, [MQTTTopic]) ), + NumMsgs = 50 * NPartitions, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{ ?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _} }), - NPartitions, + NumMsgs, 20_000 ), lists:foreach( @@ -1933,7 +1934,7 @@ t_node_joins_existing_cluster(Config) -> Val = <<"v", (integer_to_binary(N))/binary>>, publish(Config, KafkaTopic, [#{key => Key, value => Val}]) end, - lists:seq(1, 10 * NPartitions) + lists:seq(1, NumMsgs) ), {ok, _} = snabbkaffe:receive_events(SRef1), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index d43ec5591..0d3796398 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 118542356..6b9a40123 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -98,7 +98,7 @@ on_start(ResourceId, #{server := Server} = Conf) -> server => Server }}; {error, Reason} -> - {error, Reason} + {error, emqx_maybe:define(explain_error(Reason), Reason)} end. on_add_channel( @@ -200,7 +200,7 @@ on_get_channel_status( } = _State ) when is_map_key(ChannelId, Channels) -> %% The channel should be ok as long as the MQTT client is ok - connected. + ?status_connected. on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). @@ -356,10 +356,15 @@ on_get_status(_ResourceId, State) -> Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)], try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of Statuses -> - combine_status(Statuses) + case combine_status(Statuses) of + {Status, Msg} -> + {Status, State, Msg}; + Status -> + Status + end catch exit:timeout -> - connecting + ?status_connecting end. get_status({_Pool, Worker}) -> @@ -367,7 +372,7 @@ get_status({_Pool, Worker}) -> {ok, Client} -> emqx_bridge_mqtt_ingress:status(Client); {error, _} -> - disconnected + ?status_disconnected end. combine_status(Statuses) -> @@ -375,11 +380,25 @@ combine_status(Statuses) -> %% Natural order of statuses: [connected, connecting, disconnected] %% * `disconnected` wins over any other status %% * `connecting` wins over `connected` - case lists:reverse(lists:usort(Statuses)) of + ToStatus = fun + ({S, _Reason}) -> S; + (S) when is_atom(S) -> S + end, + CompareFn = fun(S1A, S2A) -> + S1 = ToStatus(S1A), + S2 = ToStatus(S2A), + S1 > S2 + end, + case lists:usort(CompareFn, Statuses) of + [{Status, Reason} | _] -> + case explain_error(Reason) of + undefined -> Status; + Msg -> {Status, Msg} + end; [Status | _] -> Status; [] -> - disconnected + ?status_disconnected end. mk_ingress_config( @@ -514,15 +533,54 @@ connect(Pid, Name) -> {ok, Pid}; {error, Reason} = Error -> IsDryRun = emqx_resource:is_dry_run(Name), - ?SLOG(?LOG_LEVEL(IsDryRun), #{ - msg => "ingress_client_connect_failed", - reason => Reason, - resource_id => Name - }), + log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name), _ = catch emqtt:stop(Pid), Error end. +log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => explain_error(Reason) + }); +log_connect_error_reason(Level, econnrefused = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => explain_error(Reason) + }); +log_connect_error_reason(Level, Reason, Name) -> + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }). + +explain_error(econnrefused) -> + << + "Connection refused. " + "This error indicates that your connection attempt to the MQTT server was rejected. " + "In simpler terms, the server you tried to connect to refused your request. " + "There can be multiple reasons for this. " + "For example, the MQTT server you're trying to connect to might be down or not " + "running at all or you might have provided the wrong address " + "or port number for the server." + >>; +explain_error({tcp_closed, _}) -> + << + "Your MQTT connection attempt was unsuccessful. " + "It might be at its maximum capacity for handling new connections. " + "To diagnose the issue further, you can check the server logs for " + "any specific messages related to the unavailability or connection limits." + >>; +explain_error(_Reason) -> + undefined. + handle_disconnect(_Reason) -> ok. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 1749d4194..35aea67a6 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). %% management APIs -export([ @@ -234,13 +235,13 @@ status(Pid) -> try case proplists:get_value(socket, info(Pid)) of Socket when Socket /= undefined -> - connected; + ?status_connected; undefined -> - connecting + ?status_connecting end catch exit:{noproc, _} -> - disconnected + ?status_disconnected end. %% diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index a220eb9f7..42cf9d2b8 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -1025,31 +1025,39 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> ct:sleep(1000), %% stop the listener 1883 to make the bridge disconnected - ok = emqx_listeners:stop_listener('tcp:default'), - ct:sleep(1500), - ?assertMatch( - #{<<"status">> := Status} when - Status == <<"connecting">> orelse Status == <<"disconnected">>, - request_bridge(BridgeIDEgress) + ?check_trace( + begin + ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), + ?assertMatch( + #{<<"status">> := Status} when + Status == <<"connecting">> orelse Status == <<"disconnected">>, + request_bridge(BridgeIDEgress) + ), + + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + ?assertMatch( + #{<<"status">> := <<"connected">>}, + request_bridge(BridgeIDEgress) + ), + + N = stop_publisher(Publisher), + + %% all those messages should eventually be delivered + [ + assert_mqtt_msg_received(RemoteTopic, Payload) + || I <- lists:seq(1, N), + Payload <- [integer_to_binary(I)] + ], + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_econnrefused_error, Trace)), + ok + end ), - - %% start the listener 1883 to make the bridge reconnected - ok = emqx_listeners:start_listener('tcp:default'), - timer:sleep(1500), - ?assertMatch( - #{<<"status">> := <<"connected">>}, - request_bridge(BridgeIDEgress) - ), - - N = stop_publisher(Publisher), - - %% all those messages should eventually be delivered - [ - assert_mqtt_msg_received(RemoteTopic, Payload) - || I <- lists:seq(1, N), - Payload <- [integer_to_binary(I)] - ], - ok. start_publisher(Topic, Interval, CtrlPid) -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index b9097b9c3..e0598fa1e 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -131,6 +131,9 @@ hookpoint(Config) -> BridgeId = bridge_id(Config), emqx_bridge_resource:bridge_hookpoint(BridgeId). +simplify_result(Res) -> + emqx_bridge_v2_testlib:simplify_result(Res). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -246,3 +249,46 @@ t_receive_via_rule(Config) -> end ), ok. + +t_connect_with_more_clients_than_the_broker_accepts(Config) -> + Name = ?config(connector_name, Config), + OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), + on_exit(fun() -> + emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) + end), + NewConf = OrgConf#{<<"max_connections">> => 3}, + {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), + ?check_trace( + #{timetrap => 10_000}, + begin + ?assertMatch( + {201, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result( + emqx_bridge_v2_testlib:create_connector_api( + Config, + #{<<"pool_size">> => 100} + ) + ) + ), + ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), + ?assertMatch( + {200, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result(emqx_bridge_v2_testlib:get_connector_api(mqtt, Name)) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)), + ok + end + ), + + ok. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index dcb86a3ca..93603db21 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl index 6d15687f6..fb9a38cc6 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl @@ -11,7 +11,8 @@ action_type_name/0, connector_type_name/0, schema_module/0, - is_action/1 + is_action/1, + connector_action_config_to_bridge_v1_config/2 ]). is_action(_) -> true. @@ -23,3 +24,28 @@ action_type_name() -> pulsar. connector_type_name() -> pulsar. schema_module() -> emqx_bridge_pulsar_pubsub_schema. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, ActionConfig + ), + BridgeV1Config = maps:with(v1_fields(pulsar_producer), BridgeV1Config1), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun(RO) -> maps:with(v1_fields(producer_resource_opts), RO) end, + BridgeV1Config + ). + +%%------------------------------------------------------------------------------------------ +%% Internal helper functions +%%------------------------------------------------------------------------------------------ + +v1_fields(Struct) -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_pulsar:fields(Struct) + ]. + +to_bin(B) when is_binary(B) -> B; +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 64dde77fb..470f7f832 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -60,6 +60,8 @@ resource_type() -> pulsar. callback_mode() -> async_if_possible. +query_mode(#{resource_opts := #{query_mode := sync}}) -> + simple_sync_internal_buffer; query_mode(_Config) -> simple_async_internal_buffer. @@ -204,12 +206,17 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> sync_timeout => SyncTimeout, is_async => false }), - try - pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) - catch - error:timeout -> - {error, timeout} - end + ?tp_span( + "pulsar_producer_query_enter", + #{instance_id => _InstanceId, message => Message, mode => sync}, + try + ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => sync}), + pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) + catch + error:timeout -> + {error, timeout} + end + ) end. -spec on_query_async( @@ -220,11 +227,11 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> - {error, channel_not_found}; + {error, {unrecoverable_error, channel_not_found}}; {ok, #{message := MessageTmpl, producers := Producers}} -> ?tp_span( - pulsar_producer_on_query_async, - #{instance_id => _InstanceId, message => Message}, + "pulsar_producer_query_enter", + #{instance_id => _InstanceId, message => Message, mode => async}, on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ) end. @@ -235,6 +242,7 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> message => PulsarMessage, is_async => true }), + ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => async}), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). on_format_query_result({ok, Info}) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl index dff62843e..515fcdb5a 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -66,10 +66,8 @@ fields(action_resource_opts) -> batch_size, batch_time, worker_pool_size, - request_ttl, inflight_window, - max_buffer_bytes, - query_mode + max_buffer_bytes ], lists:filter( fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index cd54e2194..0a908f5be 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -843,7 +843,8 @@ do_t_send_with_failure(Config, FailureType) -> ?wait_async_action( emqx:publish(Message0), #{ - ?snk_kind := pulsar_producer_on_query_async, + ?snk_kind := "pulsar_producer_query_enter", + mode := async, ?snk_span := {complete, _} }, 5_000 @@ -970,7 +971,11 @@ t_producer_process_crash(Config) -> {_, {ok, _}} = ?wait_async_action( emqx:publish(Message0), - #{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}}, + #{ + ?snk_kind := "pulsar_producer_query_enter", + mode := async, + ?snk_span := {complete, _} + }, 5_000 ), Data0 = receive_consumed(20_000), diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl index 11caa15c6..94534fafd 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -23,31 +23,25 @@ %%------------------------------------------------------------------------------ all() -> - [ - {group, plain}, - {group, tls} - ]. + All0 = emqx_common_test_helpers:all(?MODULE), + All = All0 -- matrix_cases(), + Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), + Groups ++ All. groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - [ - {plain, AllTCs}, - {tls, AllTCs} - ]. + emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). + +matrix_cases() -> + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - %% Ensure enterprise bridge module is loaded - _ = emqx_bridge_enterprise:module_info(), - {ok, Cwd} = file:get_cwd(), - PrivDir = ?config(priv_dir, Config), - WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd), Apps = emqx_cth_suite:start( lists:flatten([ ?APPS, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard() ]), - #{work_dir => WorkDir} + #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{suite_apps, Apps} | Config]. @@ -61,6 +55,7 @@ init_per_group(plain = Type, Config) -> case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of true -> Config1 = common_init_per_group(), + ConnectorName = ?MODULE, NewConfig = [ {proxy_name, ProxyName}, @@ -70,7 +65,7 @@ init_per_group(plain = Type, Config) -> {use_tls, false} | Config1 ++ Config ], - create_connector(?MODULE, NewConfig), + create_connector(ConnectorName, NewConfig), NewConfig; false -> maybe_skip_without_ci() @@ -82,6 +77,7 @@ init_per_group(tls = Type, Config) -> case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of true -> Config1 = common_init_per_group(), + ConnectorName = ?MODULE, NewConfig = [ {proxy_name, ProxyName}, @@ -91,17 +87,21 @@ init_per_group(tls = Type, Config) -> {use_tls, true} | Config1 ++ Config ], - create_connector(?MODULE, NewConfig), + create_connector(ConnectorName, NewConfig), NewConfig; false -> maybe_skip_without_ci() - end. + end; +init_per_group(_Group, Config) -> + Config. end_per_group(Group, Config) when Group =:= plain; Group =:= tls -> common_end_per_group(Config), + ok; +end_per_group(_Group, _Config) -> ok. common_init_per_group() -> @@ -189,66 +189,49 @@ pulsar_connector(Config) -> ":", integer_to_binary(PulsarPort) ]), - Connector = #{ - <<"connectors">> => #{ - <<"pulsar">> => #{ - Name => #{ - <<"enable">> => true, - <<"ssl">> => #{ - <<"enable">> => UseTLS, - <<"verify">> => <<"verify_none">>, - <<"server_name_indication">> => <<"auto">> - }, - <<"authentication">> => <<"none">>, - <<"servers">> => ServerURL - } - } - } + InnerConfigMap = #{ + <<"enable">> => true, + <<"ssl">> => #{ + <<"enable">> => UseTLS, + <<"verify">> => <<"verify_none">>, + <<"server_name_indication">> => <<"auto">> + }, + <<"authentication">> => <<"none">>, + <<"servers">> => ServerURL }, - parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). + emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap). pulsar_action(Config) -> + QueryMode = proplists:get_value(query_mode, Config, <<"sync">>), Name = atom_to_binary(?MODULE), - Action = #{ - <<"actions">> => #{ - <<"pulsar">> => #{ - Name => #{ - <<"connector">> => Name, - <<"enable">> => true, - <<"parameters">> => #{ - <<"retention_period">> => <<"infinity">>, - <<"max_batch_bytes">> => <<"1MB">>, - <<"batch_size">> => 100, - <<"strategy">> => <<"random">>, - <<"buffer">> => #{ - <<"mode">> => <<"memory">>, - <<"per_partition_limit">> => <<"10MB">>, - <<"segment_bytes">> => <<"5MB">>, - <<"memory_overload_protection">> => true - }, - <<"message">> => #{ - <<"key">> => <<"${.clientid}">>, - <<"value">> => <<"${.}">> - }, - <<"pulsar_topic">> => ?config(pulsar_topic, Config) - }, - <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"1s">>, - <<"metrics_flush_interval">> => <<"300ms">> - } - } - } + InnerConfigMap = #{ + <<"connector">> => Name, + <<"enable">> => true, + <<"parameters">> => #{ + <<"retention_period">> => <<"infinity">>, + <<"max_batch_bytes">> => <<"1MB">>, + <<"batch_size">> => 100, + <<"strategy">> => <<"random">>, + <<"buffer">> => #{ + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"10MB">>, + <<"segment_bytes">> => <<"5MB">>, + <<"memory_overload_protection">> => true + }, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.}">> + }, + <<"pulsar_topic">> => ?config(pulsar_topic, Config) + }, + <<"resource_opts">> => #{ + <<"query_mode">> => QueryMode, + <<"request_ttl">> => <<"1s">>, + <<"health_check_interval">> => <<"1s">>, + <<"metrics_flush_interval">> => <<"300ms">> } }, - parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). - -parse_and_check(Key, Mod, Conf, Name) -> - ConfStr = hocon_pp:do(Conf, #{}), - ct:pal(ConfStr), - {ok, RawConf} = hocon:binary(ConfStr, #{format => map}), - hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), - #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf, - RetConf. + emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap). instance_id(Type, Name) -> ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), @@ -404,20 +387,44 @@ assert_status_api(Line, Type, Name, Status) -> ). -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). +proplists_with(Keys, PList) -> + lists:filter(fun({K, _}) -> lists:member(K, Keys) end, PList). + +group_path(Config) -> + case emqx_common_test_helpers:group_path(Config) of + [] -> + undefined; + Path -> + Path + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -t_action_probe(Config) -> +t_action_probe(matrix) -> + [[plain], [tls]]; +t_action_probe(Config) when is_list(Config) -> Name = atom_to_binary(?FUNCTION_NAME), Action = pulsar_action(Config), {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), ?assertMatch({{_, 204, _}, _, _}, Res0), ok. -t_action(Config) -> +t_action(matrix) -> + [ + [plain, async], + [plain, sync], + [tls, async] + ]; +t_action(Config) when is_list(Config) -> + QueryMode = + case group_path(Config) of + [_, QM | _] -> atom_to_binary(QM); + _ -> <<"async">> + end, Name = atom_to_binary(?FUNCTION_NAME), - create_action(Name, Config), + create_action(Name, [{query_mode, QueryMode} | Config]), Actions = emqx_bridge_v2:list(actions), Any = fun(#{name := BName}) -> BName =:= Name end, ?assert(lists:any(Any, Actions), Actions), @@ -465,7 +472,9 @@ t_action(Config) -> %% Tests that deleting/disabling an action that share the same Pulsar topic with other %% actions do not disturb the latter. -t_multiple_actions_sharing_topic(Config) -> +t_multiple_actions_sharing_topic(matrix) -> + [[plain], [tls]]; +t_multiple_actions_sharing_topic(Config) when is_list(Config) -> Type = ?TYPE, ConnectorName = <<"c">>, ConnectorConfig = pulsar_connector(Config), @@ -546,3 +555,31 @@ t_multiple_actions_sharing_topic(Config) -> [] ), ok. + +t_sync_query_down(matrix) -> + [[plain]]; +t_sync_query_down(Config0) when is_list(Config0) -> + ct:timetrap({seconds, 15}), + Payload = #{<<"x">> => <<"some data">>}, + PayloadBin = emqx_utils_json:encode(Payload), + ClientId = <<"some_client">>, + Opts = #{ + make_message_fn => fun(Topic) -> emqx_message:make(ClientId, Topic, PayloadBin) end, + enter_tp_filter => + ?match_event(#{?snk_kind := "pulsar_producer_send"}), + error_tp_filter => + ?match_event(#{?snk_kind := "resource_simple_sync_internal_buffer_query_timeout"}), + success_tp_filter => + ?match_event(#{?snk_kind := pulsar_echo_consumer_message}) + }, + Config = [ + {connector_type, ?TYPE}, + {connector_name, ?FUNCTION_NAME}, + {connector_config, pulsar_connector(Config0)}, + {action_type, ?TYPE}, + {action_name, ?FUNCTION_NAME}, + {action_config, pulsar_action(Config0)} + | proplists_with([proxy_name, proxy_host, proxy_port], Config0) + ], + emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts), + ok. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index c178b1f5e..1ff42961b 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {mod, {emqx_bridge_rabbitmq_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index da9cd1a96..49d6a5985 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_s3, [ {description, "EMQX Enterprise S3 Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 009a8d16b..4060c926e 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, [ diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src index cd1d51b01..59276e32e 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_syskeeper, [ {description, "EMQX Enterprise Data bridge for Syskeeper"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index dc406b735..80b504699 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib]}, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index ea9e582ae..473555e6c 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 8cb61793d..cdbba4f95 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -125,6 +125,7 @@ create(Type, Name, Conf0, Opts) -> TypeBin = bin(Type), ResourceId = resource_id(Type, Name), Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, + _ = emqx_alarm:ensure_deactivated(ResourceId), {ok, _Data} = emqx_resource:create_local( ResourceId, ?CONNECTOR_RESOURCE_GROUP, @@ -132,7 +133,6 @@ create(Type, Name, Conf0, Opts) -> parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) ), - _ = emqx_alarm:ensure_deactivated(ResourceId), ok. update(ConnectorId, {OldConf, Conf}) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard.app.src b/apps/emqx_dashboard/src/emqx_dashboard.app.src index 2835feb34..3a595629e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.app.src +++ b/apps/emqx_dashboard/src/emqx_dashboard.app.src @@ -2,7 +2,7 @@ {application, emqx_dashboard, [ {description, "EMQX Web Dashboard"}, % strict semver, bump manually! - {vsn, "5.1.3"}, + {vsn, "5.1.4"}, {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [ diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src index 95d49a150..f3d11d445 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src @@ -1,6 +1,6 @@ {application, emqx_dashboard_sso, [ {description, "EMQX Dashboard Single Sign-On"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, [emqx_dashboard_sso_sup]}, {applications, [ kernel, diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index e9c1f2b4a..54bd68562 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.9"}, + {vsn, "0.1.10"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index 1d5cb85b8..230692c94 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.12"}, + {vsn, "0.1.13"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src index f96d112e9..2e5eece56 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_gbt32960, [ {description, "GBT32960 Gateway"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src index 8d1e33f74..94a6dc767 100644 --- a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_jt808, [ {description, "JT/T 808 Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 1dc3f6939..f3a39c5a1 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index eb471458f..09d3f1847 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl, redbug]}, diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index c22793cf0..aca888aa6 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.2.3"}, + {vsn, "5.2.4"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_management/src/emqx_mgmt_app.erl b/apps/emqx_management/src/emqx_mgmt_app.erl index f8f0b6f5b..e8febb7ec 100644 --- a/apps/emqx_management/src/emqx_mgmt_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_app.erl @@ -29,13 +29,9 @@ start(_Type, _Args) -> ok = mria:wait_for_tables(emqx_mgmt_auth:create_tables()), - case emqx_mgmt_auth:init_bootstrap_file() of - ok -> - emqx_conf:add_handler([api_key], emqx_mgmt_auth), - emqx_mgmt_sup:start_link(); - {error, Reason} -> - {error, Reason} - end. + emqx_mgmt_auth:try_init_bootstrap_file(), + emqx_conf:add_handler([api_key], emqx_mgmt_auth), + emqx_mgmt_sup:start_link(). stop(_State) -> emqx_conf:remove_handler([api_key]), diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index d822eb788..e64caad57 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -32,7 +32,7 @@ update/5, delete/1, list/0, - init_bootstrap_file/0, + try_init_bootstrap_file/0, format/1 ]). @@ -52,6 +52,7 @@ -ifdef(TEST). -export([create/7]). -export([trans/2, force_create_app/1]). +-export([init_bootstrap_file/1]). -endif. -define(APP, emqx_app). @@ -114,11 +115,12 @@ post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) -> end, ok. --spec init_bootstrap_file() -> ok | {error, _}. -init_bootstrap_file() -> +-spec try_init_bootstrap_file() -> ok | {error, _}. +try_init_bootstrap_file() -> File = bootstrap_file(), ?SLOG(debug, #{msg => "init_bootstrap_api_keys_from_file", file => File}), - init_bootstrap_file(File). + _ = init_bootstrap_file(File), + ok. create(Name, Enable, ExpiredAt, Desc, Role) -> ApiKey = generate_unique_api_key(Name), @@ -357,10 +359,6 @@ init_bootstrap_file(File) -> init_bootstrap_file(File, Dev, MP); {error, Reason0} -> Reason = emqx_utils:explain_posix(Reason0), - FmtReason = emqx_utils:format( - "load API bootstrap file failed, file:~ts, reason:~ts", - [File, Reason] - ), ?SLOG( error, @@ -371,7 +369,7 @@ init_bootstrap_file(File) -> } ), - {error, FmtReason} + {error, Reason} end. init_bootstrap_file(File, Dev, MP) -> diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 2742167f5..035af188f 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/logger.hrl"). -define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}). +-define(EXCLUSIVE_TAB, emqx_exclusive_subscription). -export([load/0]). @@ -45,7 +46,8 @@ olp/1, data/1, ds/1, - cluster_info/0 + cluster_info/0, + exclusive/1 ]). -spec load() -> ok. @@ -1024,7 +1026,9 @@ print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) -> NL = maps:get(nl, Options, 0), RH = maps:get(rh, Options, 0), RAP = maps:get(rap, Options, 0), - emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]). + emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]); +print({exclusive, {exclusive_subscription, Topic, ClientId}}) -> + emqx_ctl:print("topic:~ts -> ClientId:~ts~n", [Topic, ClientId]). format(_, undefined) -> undefined; @@ -1085,3 +1089,19 @@ safe_call_mria(Fun, Args, OnFail) -> }), OnFail end. +%%-------------------------------------------------------------------- +%% @doc Exclusive topics +exclusive(["list"]) -> + case ets:info(?EXCLUSIVE_TAB, size) of + 0 -> emqx_ctl:print("No topics.~n"); + _ -> dump(?EXCLUSIVE_TAB, exclusive) + end; +exclusive(["delete", Topic0]) -> + Topic = erlang:iolist_to_binary(Topic0), + emqx_exclusive_subscription:unsubscribe(Topic, #{is_exclusive => true}), + emqx_ctl:print("ok~n"); +exclusive(_) -> + emqx_ctl:usage([ + {"exclusive list", "List all exclusive topics"}, + {"exclusive delete ", "Delete an exclusive topic"} + ]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl index 177216b42..f2856fe63 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl @@ -100,7 +100,7 @@ t_bootstrap_file(_) -> BadBin = <<"test-1:secret-11\ntest-2 secret-12">>, ok = file:write_file(File, BadBin), update_file(File), - ?assertMatch({error, #{reason := "invalid_format"}}, emqx_mgmt_auth:init_bootstrap_file()), + ?assertMatch({error, #{reason := "invalid_format"}}, emqx_mgmt_auth:init_bootstrap_file(File)), ?assertEqual(ok, auth_authorize(TestPath, <<"test-1">>, <<"secret-11">>)), ?assertMatch({error, _}, auth_authorize(TestPath, <<"test-2">>, <<"secret-12">>)), update_file(<<>>), @@ -123,7 +123,7 @@ t_bootstrap_file_override(_) -> ok = file:write_file(File, Bin), update_file(File), - ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file()), + ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file(File)), MatchFun = fun(ApiKey) -> mnesia:match_object(#?APP{api_key = ApiKey, _ = '_'}) end, ?assertMatch( @@ -156,7 +156,7 @@ t_bootstrap_file_dup_override(_) -> File = "./bootstrap_api_keys.txt", ok = file:write_file(File, Bin), update_file(File), - ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file()), + ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file(File)), SameAppWithDiffName = #?APP{ name = <<"name-1">>, @@ -190,7 +190,7 @@ t_bootstrap_file_dup_override(_) -> %% Similar to loading bootstrap file at node startup %% the duplicated apikey in mnesia will be cleaned up - ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file()), + ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file(File)), ?assertMatch( {ok, [ #?APP{ diff --git a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl index f85fdbe5b..16871c129 100644 --- a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl @@ -360,4 +360,9 @@ t_autocluster_leave(Config) -> ) ). +t_exclusive(_Config) -> + emqx_ctl:run_command(["exclusive", "list"]), + emqx_ctl:run_command(["exclusive", "delete", "t/1"]), + ok. + format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]). diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index 19caf6763..a25bcd7ca 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_modules, [ {description, "EMQX Modules"}, - {vsn, "5.0.27"}, + {vsn, "5.0.28"}, {modules, []}, {applications, [kernel, stdlib, emqx, emqx_ctl, observer_cli]}, {mod, {emqx_modules_app, []}}, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src index c6cfae12b..dfdcf0fcd 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src @@ -1,6 +1,6 @@ {application, emqx_node_rebalance, [ {description, "EMQX Node Rebalance"}, - {vsn, "5.0.9"}, + {vsn, "5.0.10"}, {registered, [ emqx_node_rebalance_sup, emqx_node_rebalance, diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index 9c4cec6f9..7ee2eb540 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugins, [ {description, "EMQX Plugin Management"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {modules, []}, {mod, {emqx_plugins_app, []}}, {applications, [kernel, stdlib, emqx, erlavro]}, diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 8ae211b55..13459136c 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -1049,19 +1049,22 @@ do_load_plugin_app(AppName, Ebin) -> end. start_app(App) -> - case application:ensure_all_started(App) of - {ok, Started} -> + case run_with_timeout(application, ensure_all_started, [App], 10_000) of + {ok, {ok, Started}} -> case Started =/= [] of true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started}); false -> ok - end, - ?SLOG(debug, #{msg => "started_plugin_app", app => App}), - ok; - {error, {ErrApp, Reason}} -> + end; + {ok, {error, Reason}} -> + throw(#{ + msg => "failed_to_start_app", + app => App, + reason => Reason + }); + {error, Reason} -> throw(#{ msg => "failed_to_start_plugin_app", app => App, - err_app => ErrApp, reason => Reason }) end. @@ -1586,3 +1589,20 @@ bin(B) when is_binary(B) -> B. wrap_to_list(Path) -> binary_to_list(iolist_to_binary(Path)). + +run_with_timeout(Module, Function, Args, Timeout) -> + Self = self(), + Fun = fun() -> + Result = apply(Module, Function, Args), + Self ! {self(), Result} + end, + Pid = spawn(Fun), + TimerRef = erlang:send_after(Timeout, self(), {timeout, Pid}), + receive + {Pid, Result} -> + _ = erlang:cancel_timer(TimerRef), + {ok, Result}; + {timeout, Pid} -> + exit(Pid, kill), + {error, timeout} + end. diff --git a/apps/emqx_prometheus/src/emqx_prometheus.app.src b/apps/emqx_prometheus/src/emqx_prometheus.app.src index e5bb770cd..3e525657c 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.app.src +++ b/apps/emqx_prometheus/src/emqx_prometheus.app.src @@ -2,7 +2,7 @@ {application, emqx_prometheus, [ {description, "Prometheus for EMQX"}, % strict semver, bump manually! - {vsn, "5.2.3"}, + {vsn, "5.2.4"}, {modules, []}, {registered, [emqx_prometheus_sup]}, {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]}, diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 6e35949a9..54418adcd 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.32"}, + {vsn, "0.1.33"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 9c1b398ff..e37917215 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -198,6 +198,9 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1), case simple_async_query(Id, Request, QueryOpts) of {error, _} = Error -> + ?tp("resource_simple_sync_internal_buffer_query_error", #{ + id => Id, request => Request + }), Error; {async_return, {error, _} = Error} -> Error; @@ -210,7 +213,11 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> receive {ReplyAlias, Response} -> Response - after 0 -> {error, timeout} + after 0 -> + ?tp("resource_simple_sync_internal_buffer_query_timeout", #{ + id => Id, request => Request + }), + {error, timeout} end end end @@ -1324,6 +1331,7 @@ do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Res ?tp(simple_query_override, #{query_mode => ReqQM}), #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, CallMode = call_mode(QM, CBM), + ?tp(simple_query_enter, #{}), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer @@ -1331,6 +1339,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Res %% The connector supports buffer, send even in disconnected state #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, CallMode = call_mode(QM, CBM), + ?tp(simple_query_enter, #{}), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) -> %% when calling from the buffer worker or other simple queries, @@ -2327,6 +2336,7 @@ reply_call(Alias, Response) -> %% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to' %% callbacks. reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) -> + ?tp("reply_call_internal_buffer", #{}), ?MODULE:reply_call(ReplyAlias, Response), do_reply_caller(MaybeReplyTo, Response). diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index b25042c20..289b167fd 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -34,7 +34,7 @@ type :: serde_type(), eval_context :: term(), %% for future use - extra = [] + extra = #{} }). -type serde() :: #serde{}. diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.erl b/apps/emqx_schema_registry/src/emqx_schema_registry.erl index f8d760ddc..3919cd7be 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.erl @@ -148,14 +148,19 @@ post_config_update( post_config_update( [?CONF_KEY_ROOT, schemas, NewName], _Cmd, - NewSchemas, - %% undefined or OldSchemas - _, + NewSchema, + OldSchema, _AppEnvs ) -> - case build_serdes([{NewName, NewSchemas}]) of + case OldSchema of + undefined -> + ok; + _ -> + ensure_serde_absent(NewName) + end, + case build_serdes([{NewName, NewSchema}]) of ok -> - {ok, #{NewName => NewSchemas}}; + {ok, #{NewName => NewSchema}}; {error, Reason, SerdesToRollback} -> lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), {error, Reason} @@ -176,6 +181,7 @@ post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, Old async_delete_serdes(RemovedNames) end, SchemasToBuild = maps:to_list(maps:merge(Changed, Added)), + ok = lists:foreach(fun ensure_serde_absent/1, [N || {N, _} <- SchemasToBuild]), case build_serdes(SchemasToBuild) of ok -> {ok, NewConf}; diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index 0e661c356..b5944a68e 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -48,6 +48,10 @@ -type eval_context() :: term(). +-type fingerprint() :: binary(). + +-type protobuf_cache_key() :: {schema_name(), fingerprint()}. + -export_type([serde_type/0]). %%------------------------------------------------------------------------------ @@ -175,11 +179,12 @@ make_serde(avro, Name, Source) -> eval_context = Store }; make_serde(protobuf, Name, Source) -> - SerdeMod = make_protobuf_serde_mod(Name, Source), + {CacheKey, SerdeMod} = make_protobuf_serde_mod(Name, Source), #serde{ name = Name, type = protobuf, - eval_context = SerdeMod + eval_context = SerdeMod, + extra = #{cache_key => CacheKey} }; make_serde(json, Name, Source) -> case json_decode(Source) of @@ -254,8 +259,9 @@ eval_encode(#serde{type = json, name = Name}, [Map]) -> destroy(#serde{type = avro, name = _Name}) -> ?tp(serde_destroyed, #{type => avro, name => _Name}), ok; -destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) -> +destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod} = Serde) -> unload_code(SerdeMod), + destroy_protobuf_code(Serde), ?tp(serde_destroyed, #{type => protobuf, name => _Name}), ok; destroy(#serde{type = json, name = Name}) -> @@ -282,13 +288,14 @@ jesse_validate(Name, Map) -> jesse_name(Str) -> unicode:characters_to_list(Str). --spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module(). +-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> {protobuf_cache_key(), module()}. make_protobuf_serde_mod(Name, Source) -> {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name), case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> load_code(SerdeMod, SerdeModFileName, ModBinary), - SerdeMod; + CacheKey = protobuf_cache_key(Name, Source), + {CacheKey, SerdeMod}; {error, #{error := Error, warnings := Warnings}} -> ?SLOG( warning, @@ -310,6 +317,13 @@ protobuf_serde_mod_name(Name) -> SerdeModFileName = SerdeModName ++ ".memory", {SerdeMod, SerdeModFileName}. +%% Fixme: we cannot uncomment the following typespec because Dialyzer complains that +%% `Source' should be `string()' due to `gpb_compile:string/3', but it does work fine with +%% binaries... +%% -spec protobuf_cache_key(schema_name(), schema_source()) -> {schema_name(), fingerprint()}. +protobuf_cache_key(Name, Source) -> + {Name, erlang:md5(Source)}. + -spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. lazy_generate_protobuf_code(Name, SerdeMod0, Source) -> @@ -326,9 +340,9 @@ lazy_generate_protobuf_code(Name, SerdeMod0, Source) -> -spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> - Fingerprint = erlang:md5(Source), - _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write), - case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of + CacheKey = protobuf_cache_key(Name, Source), + _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, CacheKey}, write), + case mnesia:read(?PROTOBUF_CACHE_TAB, CacheKey) of [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] -> ?tp(schema_registry_protobuf_cache_hit, #{name => Name}), {ok, SerdeMod, ModBinary}; @@ -337,7 +351,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> case generate_protobuf_code(SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> CacheEntry = #protobuf_cache{ - fingerprint = Fingerprint, + fingerprint = CacheKey, module = SerdeMod, module_binary = ModBinary }, @@ -345,7 +359,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> {ok, SerdeMod, ModBinary}; {ok, SerdeMod, ModBinary, _Warnings} -> CacheEntry = #protobuf_cache{ - fingerprint = Fingerprint, + fingerprint = CacheKey, module = SerdeMod, module_binary = ModBinary }, @@ -390,6 +404,21 @@ unload_code(SerdeMod) -> _ = code:delete(SerdeMod), ok. +-spec destroy_protobuf_code(serde()) -> ok. +destroy_protobuf_code(Serde) -> + #serde{extra = #{cache_key := CacheKey}} = Serde, + {atomic, Res} = mria:transaction( + ?SCHEMA_REGISTRY_SHARD, + fun destroy_protobuf_code_trans/1, + [CacheKey] + ), + ?tp("schema_registry_protobuf_cache_destroyed", #{name => Serde#serde.name}), + Res. + +-spec destroy_protobuf_code_trans({schema_name(), fingerprint()}) -> ok. +destroy_protobuf_code_trans(CacheKey) -> + mnesia:delete(?PROTOBUF_CACHE_TAB, CacheKey, write). + -spec has_inner_type(serde_type(), eval_context(), [binary()]) -> boolean(). has_inner_type(protobuf, _SerdeMod, [_, _ | _]) -> diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl index bdc083736..685e152e6 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl @@ -207,6 +207,66 @@ t_protobuf_invalid_schema(_Config) -> ), ok. +%% Checks that we unload code and clear code generation cache after destroying a protobuf +%% serde. +t_destroy_protobuf(_Config) -> + SerdeName = ?FUNCTION_NAME, + SerdeNameBin = atom_to_binary(SerdeName), + ?check_trace( + #{timetrap => 5_000}, + begin + Params = schema_params(protobuf), + ok = emqx_schema_registry:add_schema(SerdeName, Params), + {ok, {ok, _}} = + ?wait_async_action( + emqx_schema_registry:delete_schema(SerdeName), + #{?snk_kind := serde_destroyed, name := SerdeNameBin} + ), + %% Create again to check we don't hit the cache. + ok = emqx_schema_registry:add_schema(SerdeName, Params), + {ok, {ok, _}} = + ?wait_async_action( + emqx_schema_registry:delete_schema(SerdeName), + #{?snk_kind := serde_destroyed, name := SerdeNameBin} + ), + ok + end, + fun(Trace) -> + ?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)), + ?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)), + ok + end + ), + ok. + +%% Checks that we don't leave entries lingering in the protobuf code cache table when +%% updating the source of a serde. +t_update_protobuf_cache(_Config) -> + SerdeName = ?FUNCTION_NAME, + ?check_trace( + #{timetrap => 5_000}, + begin + #{source := Source0} = Params0 = schema_params(protobuf), + ok = emqx_schema_registry:add_schema(SerdeName, Params0), + %% Now we touch the source so protobuf needs to be recompiled. + Source1 = <>, + Params1 = Params0#{source := Source1}, + {ok, {ok, _}} = + ?wait_async_action( + emqx_schema_registry:add_schema(SerdeName, Params1), + #{?snk_kind := "schema_registry_protobuf_cache_destroyed"} + ), + ok + end, + fun(Trace) -> + ?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)), + ?assertMatch([_, _ | _], ?of_kind(schema_registry_protobuf_cache_miss, Trace)), + ?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)), + ok + end + ), + ok. + t_json_invalid_schema(_Config) -> SerdeName = invalid_json, Params = schema_params(json), diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index b2ec221e3..9c3f5eed4 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.2.3"}, + {vsn, "5.2.4"}, {modules, [ emqx_utils, emqx_utils_api, diff --git a/changes/ce/feat-13524.en.md b/changes/ce/feat-13524.en.md new file mode 100644 index 000000000..efe21104f --- /dev/null +++ b/changes/ce/feat-13524.en.md @@ -0,0 +1 @@ +Added CLI interface `emqx ctl exclusive` for the feature exclusive topics. diff --git a/changes/ce/feat-13534.en.md b/changes/ce/feat-13534.en.md new file mode 100644 index 000000000..5c5af0bf5 --- /dev/null +++ b/changes/ce/feat-13534.en.md @@ -0,0 +1 @@ +Add trace logging when superuser skipped authz check. diff --git a/changes/ce/fix-13357.en.md b/changes/ce/fix-13357.en.md new file mode 100644 index 000000000..ea497a847 --- /dev/null +++ b/changes/ce/fix-13357.en.md @@ -0,0 +1,4 @@ +Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets. + +- Only send `CONNACK` with reason code `frame_too_large` for MQTT-v5.0 when connecting if the protocol version field in CONNECT can be detected. +- Otherwise **DONOT** send any CONNACK or DISCONNECT packet. diff --git a/changes/ce/fix-13425.en.md b/changes/ce/fix-13425.en.md new file mode 100644 index 000000000..e02e99c0a --- /dev/null +++ b/changes/ce/fix-13425.en.md @@ -0,0 +1 @@ +The MQTT connector error log messages have been improved to provide clearer and more detailed information. diff --git a/changes/ce/fix-13541.en.md b/changes/ce/fix-13541.en.md new file mode 100644 index 000000000..48013cc19 --- /dev/null +++ b/changes/ce/fix-13541.en.md @@ -0,0 +1 @@ +Previously, if CRL checks were ever enabled for a listener, later disabling them via the configuration would not actually disable them until the listener restarted. This has been fixed. diff --git a/changes/ce/fix-13552.en.md b/changes/ce/fix-13552.en.md new file mode 100644 index 000000000..5af1b4140 --- /dev/null +++ b/changes/ce/fix-13552.en.md @@ -0,0 +1,8 @@ +Add a startup timeout limit for the plug-in application. Currently the timeout is 10 seconds. + +Starting a bad plugin while EMQX is running will result in a thrown runtime error. +When EMQX is closed and restarted, the main starting process may hang due to the the plugin application to start failures. + +Maybe restarting with modified: +- Modifed config file: make the bad plugin enabled. +- Add a plugin with bad plugin config. diff --git a/changes/ee/feat-13546.en.md b/changes/ee/feat-13546.en.md new file mode 100644 index 000000000..c403409ac --- /dev/null +++ b/changes/ee/feat-13546.en.md @@ -0,0 +1 @@ +Added the option to configure the query mode for Pulsar Producer action. diff --git a/changes/ee/fix-13543.en.md b/changes/ee/fix-13543.en.md new file mode 100644 index 000000000..f9f56a5a6 --- /dev/null +++ b/changes/ee/fix-13543.en.md @@ -0,0 +1 @@ +Fixed an issue where the internal cache for Protobuf schemas in Schema Registry was not properly cleaned up after deleting or updating a schema. diff --git a/changes/v5.7.2.en.md b/changes/v5.7.2.en.md new file mode 100644 index 000000000..fc2d3ae87 --- /dev/null +++ b/changes/v5.7.2.en.md @@ -0,0 +1,87 @@ +## 5.7.2 + +*Release Date: 2024-08-06* + +### Enhancements + +- [#13317](https://github.com/emqx/emqx/pull/13317) Added a new per-authorization source metric type: `ignore`. This metric increments when an authorization source attempts to authorize a request but encounters scenarios where the authorization is not applicable or encounters an error, resulting in an undecidable outcome. + +- [#13336](https://github.com/emqx/emqx/pull/13336) Added functionality to initialize authentication data in the built-in database of an empty EMQX node or cluster using a bootstrap file in CSV or JSON format. This feature introduces new configuration entries, `bootstrap_file` and `bootstrap_type`. + +- [#13348](https://github.com/emqx/emqx/pull/13348) Added a new field `payload_encode` in the log configuration to determine the format of the payload in the log data. + +- [#13436](https://github.com/emqx/emqx/pull/13436) Added the option to add custom request headers to JWKS requests. + +- [#13507](https://github.com/emqx/emqx/pull/13507) Introduced a new built-in function `getenv` in the rule engine and variform expression to facilitate access to environment variables. This function adheres to the following constraints: + + - Prefix `EMQXVAR_` is added before reading from OS environment variables. For example, `getenv('FOO_BAR')` is to read `EMQXVAR_FOO_BAR`. + - These values are immutable once loaded from the OS environment. + +- [#13521](https://github.com/emqx/emqx/pull/13521) Resolved an issue where LDAP query timeouts could cause the underlying connection to become unusable, potentially causing subsequent queries to return outdated results. The fix ensures the system reconnects automatically in case of a timeout. + +- [#13528](https://github.com/emqx/emqx/pull/13528) Applied log throttling for the event of unrecoverable errors in data integrations. + +- [#13548](https://github.com/emqx/emqx/pull/13548) EMQX now can optionally invoke the `on_config_changed/2` callback function when the plugin configuration is updated via the REST API. This callback function is assumed to be exported by the `_app` module. + For example, if the plugin name and version are `my_plugin-1.0.0`, then the callback function is assumed to be `my_plugin_app:on_config_changed/2`. + +- [#13386](https://github.com/emqx/emqx/pull/13386) Added support for initializing a list of banned clients on an empty EMQX node or cluster with a bootstrap file in CSV format. The corresponding config entry to specify the file path is `banned.bootstrap_file`. This file is a CSV file with `,` as its delimiter. The first line of this file must be a header line. All valid headers are listed here: + + - as :: required + - who :: required + - by :: optional + - reason :: optional + - at :: optional + - until :: optional + + See the [Configuration Manual](https://docs.emqx.com/en/enterprise/v@EE_VERSION@/hocon/) for details on each field. + + Each row in the rest of this file must contain the same number of columns as the header line, and the column can be omitted then its value is `undefined`. + +### Bug Fixes + +- [#13222](https://github.com/emqx/emqx/pull/13222) Resolved issues with flags checking and error handling associated with the Will message in the `CONNECT` packet. + For detailed specifications, refer to: + + - MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] + - MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] + - MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] + +- [#13307](https://github.com/emqx/emqx/pull/13307) Updated `ekka` library to version 0.19.5. This version of `ekka` utilizes `mria` 0.8.8, enhancing auto-heal functionality. Previously, the auto-heal worked only when all core nodes were reachable. This update allows to apply auto-heal once the majority of core nodes are alive. For details, refer to the [Mria PR](https://github.com/emqx/mria/pull/180). + +- [#13334](https://github.com/emqx/emqx/pull/13334) Implemented strict mode checking for the `PasswordFlag` in the MQTT v3.1.1 CONNECT packet to align with protocol specifications. + + Note: To ensure bug-to-bug compatibility, this check is performed only in strict mode. + +- [#13344](https://github.com/emqx/emqx/pull/13344) Resolved an issue where the `POST /clients/:clientid/subscribe/bulk` API would not function correctly if the node receiving the API request did not maintain the connection to the specified `clientid`. + +- [#13358](https://github.com/emqx/emqx/pull/13358) Fixed an issue when the `reason` in the `authn_complete_event` event was incorrectly displayed. +- [#13375](https://github.com/emqx/emqx/pull/13375) The value `infinity` has been added as default value to the listener configuration fields `max_conn_rate`, `messages_rate`, and `bytes_rate`. + +- [#13382](https://github.com/emqx/emqx/pull/13382) Updated the `emqtt` library to version 0.4.14, which resolves an issue preventing `emqtt_pool`s from reusing pools that are in an inconsistent state. + +- [#13389](https://github.com/emqx/emqx/pull/13389) Fixed an issue where the `Derived Key Length` for `pbkdf2` could be set to a negative integer. + +- [#13389](https://github.com/emqx/emqx/pull/13389) Fixed an issue where topics in the authorization rules might be parsed incorrectly. + +- [#13393](https://github.com/emqx/emqx/pull/13393) Fixed an issue where plugin applications failed to restart after a node joined a cluster, resulting in hooks not being properly installed and causing inconsistent states. + +- [#13398](https://github.com/emqx/emqx/pull/13398) Fixed an issue where ACL rules were incorrectly cleared when reloading the built-in database for authorization using the command line. + +- [#13403](https://github.com/emqx/emqx/pull/13403) Addressed a security issue where environment variable configuration overrides were inadvertently logging passwords. This fix ensures that passwords present in environment variables are not logged. + +- [#13408](https://github.com/emqx/emqx/pull/13408) Resolved a `function_clause` crash triggered by authentication attempts with invalid salt or password types. This fix enhances error handling to better manage authentication failures involving incorrect salt or password types. + +- [#13419](https://github.com/emqx/emqx/pull/13419) Resolved an issue where crash log messages from the `/configs` API were displaying garbled hints. This fix ensures that log messages related to API calls are clear and understandable. + +- [#13422](https://github.com/emqx/emqx/pull/13422) Fixed an issue where the option `force_shutdown.max_heap_size` could not be set to 0 to disable this tuning. + +- [#13442](https://github.com/emqx/emqx/pull/13442) Fixed an issue where the health check interval configuration for actions/sources was not being respected. Previously, EMQX ignored the specified health check interval for actions and used the connector's interval instead. The fix ensures that EMQX now correctly uses the health check interval configured for actions/sources, allowing for independent and accurate health monitoring frequencies. + +- [#13503](https://github.com/emqx/emqx/pull/13503) Fixed an issue where connectors did not adhere to the configured health check interval upon initial startup, requiring an update or restart to apply the correct interval. + +- [#13515](https://github.com/emqx/emqx/pull/13515) Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason. + +- [#13527](https://github.com/emqx/emqx/pull/13527) Fixed an issue in the Rule Engine where executing a SQL test for the Message Publish event would consistently return no results when a `$bridges/...` source was included in the `FROM` clause. + +- [#13541](https://github.com/emqx/emqx/pull/13541) Fixed an issue where disabling CRL checks for a listener required a listener restart to take effect. +- [#13552](https://github.com/emqx/emqx/pull/13552) Added a startup timeout limit for EMQX plugins with a default timeout of 10 seconds. Before this update, problematic plugins could cause runtime errors during startup, leading to potential issues where the main startup process might hang when EMQX is stopped and restarted. diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index cd795d4f4..3c25b9ebc 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.7.1 +version: 5.8.0-alpha.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.7.1 +appVersion: 5.8.0-alpha.1 diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index d31648f2f..49a3f7c84 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.7.1 +version: 5.8.0-alpha.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.7.1 +appVersion: 5.8.0-alpha.1 diff --git a/mix.exs b/mix.exs index 96bb32632..d292e0069 100644 --- a/mix.exs +++ b/mix.exs @@ -182,7 +182,7 @@ defmodule EMQXUmbrella.MixProject do end def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true} - def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true} + def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.12.0", override: true} def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true} def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.2", override: true} def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true} diff --git a/rebar.config b/rebar.config index 68b2df1d7..ad70d128d 100644 --- a/rebar.config +++ b/rebar.config @@ -82,7 +82,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, diff --git a/scripts/rel/cut.sh b/scripts/rel/cut.sh index 1affd48bf..8c8899b91 100755 --- a/scripts/rel/cut.sh +++ b/scripts/rel/cut.sh @@ -135,6 +135,12 @@ rel_branch() { e5.7.*) echo 'release-57' ;; + v5.8.*) + echo 'release-58' + ;; + e5.8.*) + echo 'release-58' + ;; *) logerr "Unsupported version tag $TAG" exit 1 diff --git a/scripts/rel/sync-remotes.sh b/scripts/rel/sync-remotes.sh index 430021a79..c986535ce 100755 --- a/scripts/rel/sync-remotes.sh +++ b/scripts/rel/sync-remotes.sh @@ -5,7 +5,7 @@ set -euo pipefail # ensure dir cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.." -BASE_BRANCHES=( 'release-57' 'release-56' 'release-55' 'master' ) +BASE_BRANCHES=( 'release-58' 'release-57' 'release-56' 'release-55' 'master' ) usage() { cat <