diff --git a/.ci/fvt_tests/relup.lux b/.ci/fvt_tests/relup.lux index cbecb9e14..bd37d241d 100644 --- a/.ci/fvt_tests/relup.lux +++ b/.ci/fvt_tests/relup.lux @@ -72,7 +72,7 @@ [shell bench] !cd $BENCH_PATH - !./emqtt_bench pub -c 10 -I 1000 -t t/%i -s 64 -L 600 + !./emqtt_bench pub -c 10 -I 1000 -t t/%i -s 64 -L 300 ???sent [shell emqx] @@ -109,7 +109,7 @@ ???publish complete ??SH-PROMPT: !curl http://127.0.0.1:8080/counter - ???{"data":600,"code":0} + ???{"data":300,"code":0} ?SH-PROMPT [shell emqx2] diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 70419bcc1..f5fe3fe5c 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -95,6 +95,10 @@ jobs: if (Test-Path rebar.lock) { Remove-Item -Force -Path rebar.lock } + make ensure-rebar3 + copy rebar3 "${{ steps.install_erlang.outputs.erlpath }}\bin" + ls "${{ steps.install_erlang.outputs.erlpath }}\bin" + rebar3 --help make ${{ matrix.profile }} mkdir -p _packages/${{ matrix.profile }} Compress-Archive -Path _build/${{ matrix.profile }}/rel/emqx -DestinationPath _build/${{ matrix.profile }}/rel/$pkg_name @@ -155,6 +159,8 @@ jobs: - name: build run: | . $HOME/.kerl/${{ matrix.erl_otp }}/activate + make -C source ensure-rebar3 + sudo cp source/rebar3 /usr/local/bin/rebar3 make -C source ${{ matrix.profile }}-zip - name: test run: | diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 573b84e1b..08a35299c 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -53,7 +53,7 @@ jobs: strategy: matrix: erl_otp: - - 23.2.7.2 + - 23.2.7.2-emqx-2 steps: - uses: actions/checkout@v1 @@ -82,11 +82,14 @@ jobs: if: steps.cache.outputs.cache-hit != 'true' timeout-minutes: 60 run: | + export OTP_GITHUB_URL="https://github.com/emqx/otp" kerl build ${{ matrix.erl_otp }} kerl install ${{ matrix.erl_otp }} $HOME/.kerl/${{ matrix.erl_otp }} - name: build run: | . $HOME/.kerl/${{ matrix.erl_otp }}/activate + make ensure-rebar3 + sudo cp rebar3 /usr/local/bin/rebar3 make ${EMQX_NAME}-zip - name: test run: | diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 07afdcde2..d5a4a021e 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -130,7 +130,7 @@ jobs: docker exec --env-file .env -i erlang bash -c "make coveralls" - name: cat rebar.crashdump if: failure() - run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump' fi + run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi - uses: actions/upload-artifact@v1 if: failure() with: diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 945abcdfc..83ce7a759 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mqtt, [{description, "EMQ X Bridge to MQTT Broker"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,replayq,emqtt]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index 0c7b8ebf3..03e6119ae 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -2,9 +2,15 @@ {VSN, [ + {"4.3.0", [ + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ], [ + {"4.3.0", [ + {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ] }. diff --git a/apps/emqx_coap/test/emqx_coap_SUITE.erl b/apps/emqx_coap/test/emqx_coap_SUITE.erl index 444bcc064..440b80ebd 100644 --- a/apps/emqx_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_coap/test/emqx_coap_SUITE.erl @@ -120,7 +120,7 @@ t_observe_acl_deny(_Config) -> ok = meck:unload(emqx_access_control). t_observe_wildcard(_Config) -> - Topic = <<"+/b">>, TopicStr = http_uri:encode(binary_to_list(Topic)), + Topic = <<"+/b">>, TopicStr = emqx_http_lib:uri_encode(binary_to_list(Topic)), Payload = <<"123">>, Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret", {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri), @@ -143,7 +143,7 @@ t_observe_wildcard(_Config) -> [] = emqx:subscribers(Topic). t_observe_pub(_Config) -> - Topic = <<"+/b">>, TopicStr = http_uri:encode(binary_to_list(Topic)), + Topic = <<"+/b">>, TopicStr = emqx_http_lib:uri_encode(binary_to_list(Topic)), Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret", {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri), ?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]), @@ -152,7 +152,7 @@ t_observe_pub(_Config) -> ?assert(is_pid(SubPid)), Topic2 = <<"a/b">>, Payload2 = <<"UFO">>, - TopicStr2 = http_uri:encode(binary_to_list(Topic2)), + TopicStr2 = emqx_http_lib:uri_encode(binary_to_list(Topic2)), URI2 = "coap://127.0.0.1/mqtt/"++TopicStr2++"?c=client1&u=tom&p=secret", Reply2 = er_coap_client:request(put, URI2, #coap_content{format = <<"application/octet-stream">>, payload = Payload2}), @@ -164,7 +164,7 @@ t_observe_pub(_Config) -> ?assertEqual(Payload2, PayloadRecv2), Topic3 = <<"j/b">>, Payload3 = <<"ET629">>, - TopicStr3 = http_uri:encode(binary_to_list(Topic3)), + TopicStr3 = emqx_http_lib:uri_encode(binary_to_list(Topic3)), URI3 = "coap://127.0.0.1/mqtt/"++TopicStr3++"?c=client2&u=mike&p=guess", Reply3 = er_coap_client:request(put, URI3, #coap_content{format = <<"application/octet-stream">>, payload = Payload3}), {ok,changed, _} = Reply3, @@ -186,7 +186,7 @@ t_one_clientid_sub_2_topics(_Config) -> [SubPid] = emqx:subscribers(Topic1), ?assert(is_pid(SubPid)), - Topic2 = <<"x/y">>, TopicStr2 = http_uri:encode(binary_to_list(Topic2)), + Topic2 = <<"x/y">>, TopicStr2 = emqx_http_lib:uri_encode(binary_to_list(Topic2)), Payload2 = <<"456">>, Uri2 = "coap://127.0.0.1/mqtt/"++TopicStr2++"?c=client1&u=tom&p=secret", {ok, Pid2, N2, Code2, Content2} = er_coap_observer:observe(Uri2), @@ -217,7 +217,7 @@ t_invalid_parameter(_Config) -> %% "cid=client2" is invaid %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Topic3 = <<"a/b">>, Payload3 = <<"ET629">>, - TopicStr3 = http_uri:encode(binary_to_list(Topic3)), + TopicStr3 = emqx_http_lib:uri_encode(binary_to_list(Topic3)), URI3 = "coap://127.0.0.1/mqtt/"++TopicStr3++"?cid=client2&u=tom&p=simple", Reply3 = er_coap_client:request(put, URI3, #coap_content{format = <<"application/octet-stream">>, payload = Payload3}), ?assertMatch({error,bad_request}, Reply3), diff --git a/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl b/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl index 1aaf6cb69..c018b9165 100644 --- a/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl +++ b/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl @@ -173,7 +173,7 @@ t_case01_publish_post(_Config) -> ?assertEqual(<<"42">>, CT2), %% post to publish message to topic maintopic/topic1 - FullTopicStr = http_uri:encode(binary_to_list(FullTopic)), + FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)), URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret", PubPayload = <<"PUBLISH">>, @@ -286,7 +286,7 @@ t_case01_publish_put(_Config) -> ?assertEqual(<<"42">>, CT2), %% put to publish message to topic maintopic/topic1 - FullTopicStr = http_uri:encode(binary_to_list(FullTopic)), + FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)), URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret", PubPayload = <<"PUBLISH">>, @@ -430,7 +430,7 @@ t_case01_subscribe(_Config) -> t_case02_subscribe(_Config) -> Topic = <<"a/b">>, TopicStr = binary_to_list(Topic), - PercentEncodedTopic = http_uri:encode(TopicStr), + PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Payload = <<"payload">>, %% post to publish a new topic "a/b", and the topic is created @@ -477,7 +477,7 @@ t_case03_subscribe(_Config) -> %% Subscribe to the unexisted topic "a/b", got not_found Topic = <<"a/b">>, TopicStr = binary_to_list(Topic), - PercentEncodedTopic = http_uri:encode(TopicStr), + PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", {error, not_found} = er_coap_observer:observe(Uri), @@ -487,7 +487,7 @@ t_case04_subscribe(_Config) -> %% Subscribe to the wildcad topic "+/b", got bad_request Topic = <<"+/b">>, TopicStr = binary_to_list(Topic), - PercentEncodedTopic = http_uri:encode(TopicStr), + PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", {error, bad_request} = er_coap_observer:observe(Uri), @@ -582,7 +582,7 @@ t_case04_read(_Config) -> t_case05_read(_Config) -> Topic = <<"a/b">>, TopicStr = binary_to_list(Topic), - PercentEncodedTopic = http_uri:encode(TopicStr), + PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Payload = <<"payload">>, %% post to publish a new topic "a/b", and the topic is created @@ -609,7 +609,7 @@ t_case05_read(_Config) -> t_case01_delete(_Config) -> TopicInPayload = <<"a/b">>, TopicStr = binary_to_list(TopicInPayload), - PercentEncodedTopic = http_uri:encode(TopicStr), + PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), Payload = list_to_binary("<"++PercentEncodedTopic++">;ct=42"), URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret", @@ -621,7 +621,7 @@ t_case01_delete(_Config) -> %% Client post to CREATE topic "a/b/c" TopicInPayload1 = <<"a/b/c">>, - PercentEncodedTopic1 = http_uri:encode(binary_to_list(TopicInPayload1)), + PercentEncodedTopic1 = emqx_http_lib:uri_encode(binary_to_list(TopicInPayload1)), Payload1 = list_to_binary("<"++PercentEncodedTopic1++">;ct=42"), Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload1}), ?LOGT("Reply =~p", [Reply1]), @@ -643,7 +643,7 @@ t_case01_delete(_Config) -> t_case02_delete(_Config) -> TopicInPayload = <<"a/b">>, TopicStr = binary_to_list(TopicInPayload), - PercentEncodedTopic = http_uri:encode(TopicStr), + PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr), %% DELETE the unexisted topic "a/b" Uri1 = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret", diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 612b5151f..72ba26581 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -237,14 +237,14 @@ message EmptySuccess { } message ValuedResponse { // The responsed value type - // - ignore: Ignore the responsed value // - contiune: Use the responsed value and execute the next hook + // - ignore: Ignore the responsed value // - stop_and_return: Use the responsed value and stop the chain executing enum ResponsedType { - IGNORE = 0; + CONTINUE = 0; - CONTINUE = 1; + IGNORE = 1; STOP_AND_RETURN = 2; } diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 555243107..452d2a742 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,6 +1,6 @@ {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.3.0"}, + {vsn, "4.3.1"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src new file mode 100644 index 000000000..2811c1554 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -0,0 +1,15 @@ +%% -*-: erlang -*- +{VSN, + [ + {"4.3.0", [ + {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.0", [ + {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl b/apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl index 2cfb745bf..257502f2f 100644 --- a/apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl @@ -1886,8 +1886,11 @@ std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic) -> timer:sleep(100). resolve_uri(Uri) -> - {ok, {Scheme, _UserInfo, Host, PortNo, Path, Query}} = - http_uri:parse(Uri, [{scheme_defaults, [{coap, ?DEFAULT_COAP_PORT}, {coaps, ?DEFAULT_COAPS_PORT}]}]), + {ok, #{scheme := Scheme, + host := Host, + port := PortNo, + path := Path} = URIMap} = emqx_http_lib:uri_parse(Uri), + Query = maps:get(query, URIMap, ""), {ok, PeerIP} = inet:getaddr(Host, inet), {Scheme, {PeerIP, PortNo}, split_path(Path), split_query(Query)}. @@ -1896,7 +1899,7 @@ split_path([$/]) -> []; split_path([$/ | Path]) -> split_segments(Path, $/, []). split_query([]) -> []; -split_query([$? | Path]) -> split_segments(Path, $&, []). +split_query(Path) -> split_segments(Path, $&, []). split_segments(Path, Char, Acc) -> case string:rchr(Path, Char) of @@ -1908,7 +1911,7 @@ split_segments(Path, Char, Acc) -> end. make_segment(Seg) -> - list_to_binary(http_uri:decode(Seg)). + list_to_binary(emqx_http_lib:uri_decode(Seg)). get_coap_path(Options) -> diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index 3604d3505..43fb5ba53 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -1,6 +1,6 @@ {application, emqx_management, [{description, "EMQ X Management API and CLI"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel,stdlib,minirest]}, diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 3206ce31b..9454d25b4 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,14 +1,24 @@ %% -*-: erlang -*- -{"4.3.2", - [ {<<"4.3.[0-1]">>, - [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} - , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []} - ]} - ], - [ +{VSN, + [ {"4.3.2", + [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []} + ]}, {<<"4.3.[0-1]">>, [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []} - ]} + , {load_module, emqx_mgmt, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.2", + [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []} + ]}, + {<<"4.3.[0-1]">>, + [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} + , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []} + , {load_module, emqx_mgmt, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} ] }. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 8c196f682..bfba34603 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -139,7 +139,7 @@ node_info(Node) when Node =:= node() -> Info#{node => node(), otp_release => iolist_to_binary(otp_rel()), memory_total => proplists:get_value(allocated, Memory), - memory_used => proplists:get_value(used, Memory), + memory_used => proplists:get_value(total, Memory), process_available => erlang:system_info(process_limit), process_used => erlang:system_info(process_count), max_fds => proplists:get_value(max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))), diff --git a/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl b/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl index 150dcb151..5ccef4c6b 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl @@ -53,6 +53,12 @@ , unregister/2 ]). +-export([ get_topic_metrics/2 + , register_topic_metrics/2 + , unregister_topic_metrics/2 + , unregister_all_topic_metrics/1 + ]). + list(#{topic := Topic0}, _Params) -> execute_when_enabled(fun() -> Topic = emqx_mgmt_util:urldecode(Topic0), diff --git a/lib-ce/emqx_modules/src/emqx_modules.app.src b/lib-ce/emqx_modules/src/emqx_modules.app.src index 576316703..702652fc2 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.app.src +++ b/lib-ce/emqx_modules/src/emqx_modules.app.src @@ -1,6 +1,6 @@ {application, emqx_modules, [{description, "EMQ X Module Management"}, - {vsn, "4.3.1"}, + {vsn, "4.3.2"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src index b44a65c17..aa997c453 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.appup.src +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -1,14 +1,22 @@ %% -*-: erlang -*- {VSN, [ + {"4.3.1", [ + {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} + ]}, {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}} + {update, emqx_mod_delayed, {advanced, []}}, + {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], [ + {"4.3.1", [ + {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} + ]}, {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}} + {update, emqx_mod_delayed, {advanced, []}}, + {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/lib-ce/emqx_telemetry/src/emqx_telemetry.app.src b/lib-ce/emqx_telemetry/src/emqx_telemetry.app.src index c14d00f73..e65678c37 100644 --- a/lib-ce/emqx_telemetry/src/emqx_telemetry.app.src +++ b/lib-ce/emqx_telemetry/src/emqx_telemetry.app.src @@ -1,6 +1,6 @@ {application, emqx_telemetry, [{description, "EMQ X Telemetry"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_telemetry_sup]}, {applications, [kernel,stdlib]}, diff --git a/lib-ce/emqx_telemetry/src/emqx_telemetry.appup.src b/lib-ce/emqx_telemetry/src/emqx_telemetry.appup.src new file mode 100644 index 000000000..27998f0d5 --- /dev/null +++ b/lib-ce/emqx_telemetry/src/emqx_telemetry.appup.src @@ -0,0 +1,15 @@ +%% -*- mode: erlang -*- +{VSN, + [ + {"4.3.0", [ + {load_module, emqx_telemetry, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.0", [ + {load_module, emqx_telemetry, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ] +}. diff --git a/pkg-vsn.sh b/pkg-vsn.sh index 9da5a1075..904690ad1 100755 --- a/pkg-vsn.sh +++ b/pkg-vsn.sh @@ -12,8 +12,10 @@ else EDITION='opensource' fi +## emqx_release.hrl is the single source of truth for release version RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl | cut -d '"' -f2)" +## git commit hash is added as suffix in case the git tag and release version is not an exact match if [ -d .git ] && ! git describe --tags --match "[e|v]${RELEASE}" --exact >/dev/null 2>&1; then SUFFIX="-$(git rev-parse HEAD | cut -b1-8)" fi diff --git a/priv/emqx.schema b/priv/emqx.schema index 37679cf59..04970a150 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2301,7 +2301,7 @@ end}. {mapping, "broker.session_locking_strategy", "emqx.session_locking_strategy", [ {default, quorum}, - {datatype, {enum, [local,one,quorum,all]}} + {datatype, {enum, [local,leader,quorum,all]}} ]}. %% @doc Shared Subscription Dispatch Strategy. diff --git a/rebar.config b/rebar.config index 2ee94ff04..ec78316bc 100644 --- a/rebar.config +++ b/rebar.config @@ -8,7 +8,8 @@ {edoc_opts, [{preprocess,true}]}. {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import, - warn_obsolete_guard,compressed]}. + warn_obsolete_guard,compressed, + {d, snk_kind, msg}]}. {extra_src_dirs, [{"etc", [{recursive,true}]}]}. @@ -35,7 +36,8 @@ {erl_first_files, ["src/emqx_logger.erl", "src/emqx_rule_actions_trans.erl"]}. {deps, - [ {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.5"}}} + [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.5"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} @@ -53,7 +55,7 @@ , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.12.0"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} ]}. {xref_ignores, diff --git a/rebar.config.erl b/rebar.config.erl index 6e81908ba..8ac16ebbb 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -2,10 +2,15 @@ -export([do/2]). -do(_Dir, CONFIG) -> - {HasElixir, C1} = deps(CONFIG), - Config = dialyzer(C1), - maybe_dump(Config ++ [{overrides, overrides()}] ++ coveralls() ++ config(HasElixir)). +do(Dir, CONFIG) -> + case iolist_to_binary(Dir) of + <<".">> -> + {HasElixir, C1} = deps(CONFIG), + Config = dialyzer(C1), + maybe_dump(Config ++ [{overrides, overrides()}] ++ coveralls() ++ config(HasElixir)); + _ -> + CONFIG + end. bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}. @@ -46,6 +51,8 @@ overrides() -> [ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]} , {erl_opts, [{compile_info, [{emqx_vsn, get_vsn()}]}]} ]} + , {add, snabbkaffe, + [{erl_opts, common_compile_opts()}]} ] ++ community_plugin_overrides(). community_plugin_overrides() -> @@ -106,6 +113,7 @@ test_deps() -> common_compile_opts() -> [ debug_info % alwyas include debug_info , {compile_info, [{emqx_vsn, get_vsn()}]} + , {d, snk_kind, msg} ] ++ [{d, 'EMQX_ENTERPRISE'} || is_enterprise()] ++ [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1" ]. diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index 098653444..8de85cd8c 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -1,7 +1,8 @@ #!/bin/bash set -euo pipefail -latest_release=$(git describe --tags "$(git rev-list --tags --max-count=1 --remotes=refs/remote/origin)") +remote="refs/remote/$(git remote -v | grep fetch | grep 'emqx/emqx' | awk '{print $1}')" +latest_release=$(git describe --tags "$(git rev-list --tags --max-count=1 --remotes="$remote")") bad_app_count=0 diff --git a/scripts/get-dashboard.sh b/scripts/get-dashboard.sh index e37badfb2..99c5b5330 100755 --- a/scripts/get-dashboard.sh +++ b/scripts/get-dashboard.sh @@ -12,11 +12,14 @@ if [ -f 'EMQX_ENTERPRISE' ]; then DASHBOARD_PATH='lib-ee/emqx_dashboard/priv' DASHBOARD_REPO='emqx-enterprise-dashboard-frontend-src' AUTH="Authorization: token $(cat scripts/git-token)" + # have to be resolved with auth and redirect + DIRECT_DOWNLOAD_URL="" else VERSION="${EMQX_CE_DASHBOARD_VERSION}" DASHBOARD_PATH='lib-ce/emqx_dashboard/priv' DASHBOARD_REPO='emqx-dashboard-frontend' AUTH="" + DIRECT_DOWNLOAD_URL="https://github.com/emqx/${DASHBOARD_REPO}/releases/download/${VERSION}/emqx-dashboard.zip" fi case $(uname) in @@ -32,27 +35,32 @@ if [ -d "$DASHBOARD_PATH/www" ] && [ "$(version)" = "$VERSION" ]; then exit 0 fi -get_assets(){ +find_url() { # Get the download URL of our desired asset - download_url="$(curl --silent --show-error \ - --header "${AUTH}" \ - --header "Accept: application/vnd.github.v3+json" \ - "https://api.github.com/repos/emqx/${DASHBOARD_REPO}/releases/tags/${VERSION}" \ - | jq --raw-output ".assets[] | select(.name==\"${RELEASE_ASSET_FILE}\").url" \ - | tr -d '\n' | tr -d '\r')" + release_url="https://api.github.com/repos/emqx/${DASHBOARD_REPO}/releases/tags/${VERSION}" + release_info="$(curl --silent --show-error --header "${AUTH}" --header "Accept: application/vnd.github.v3+json" "$release_url")" + if ! download_url="$(echo "$release_info" | jq --raw-output ".assets[] | select(.name==\"${RELEASE_ASSET_FILE}\").url" | tr -d '\n' | tr -d '\r')"; then + echo "failed to query $release_url" + echo "${release_info}" + exit 1 + fi # Get GitHub's S3 redirect URL - redirect_url=$(curl --silent --show-error \ - --header "${AUTH}" \ - --header "Accept: application/octet-stream" \ - --write-out "%{redirect_url}" \ - "$download_url") curl --silent --show-error \ + --header "${AUTH}" \ --header "Accept: application/octet-stream" \ - --output "${RELEASE_ASSET_FILE}" \ - "$redirect_url" + --write-out "%{redirect_url}" \ + "$download_url" } -get_assets +if [ -z "$DIRECT_DOWNLOAD_URL" ]; then + DIRECT_DOWNLOAD_URL="$(find_url)" +fi + +curl -L --silent --show-error \ + --header "Accept: application/octet-stream" \ + --output "${RELEASE_ASSET_FILE}" \ + "$DIRECT_DOWNLOAD_URL" + unzip -q "$RELEASE_ASSET_FILE" -d "$DASHBOARD_PATH" rm -rf "$DASHBOARD_PATH/www" mv "$DASHBOARD_PATH/dist" "$DASHBOARD_PATH/www" diff --git a/src/emqx.app.src b/src/emqx.app.src index b195d7a1b..884c511df 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,7 +1,7 @@ {application, emqx, [{id, "emqx"}, {description, "EMQ X"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 3bf40272c..98fa9ece2 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,17 +1,25 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- {VSN, [ + {"4.3.2", [ + {load_module, emqx_http_lib, brutal_purge, soft_purge, []} + ]}, {"4.3.1", [ + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_plugins, brutal_purge, soft_purge, []} + {load_module, emqx_plugins, brutal_purge, soft_purge, []}, + {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_http_lib, brutal_purge, soft_purge, []} ]}, {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -21,24 +29,33 @@ {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []}, - %% + {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} + {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}, + {load_module, emqx_http_lib, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], [ + {"4.3.2", [ + {load_module, emqx_http_lib, brutal_purge, soft_purge, []} + ]}, {"4.3.1", [ + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, - {load_module, emqx_plugins, brutal_purge, soft_purge, []} + {load_module, emqx_plugins, brutal_purge, soft_purge, []}, + {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_http_lib, brutal_purge, soft_purge, []} ]}, {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -48,10 +65,11 @@ {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_app, brutal_purge, soft_purge, []}, {load_module, emqx_plugins, brutal_purge, soft_purge, []}, + {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []}, %% Just load the module. We don't need to change the 'messages.retained' %% and 'messages.retained' counter type. {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} + {load_module, emqx_http_lib, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 814fd9007..0e897f0b3 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -41,7 +41,8 @@ , stats/1 ]). --export([ async_set_keepalive/4 +-export([ async_set_keepalive/3 + , async_set_keepalive/4 , async_set_socket_options/2 ]). @@ -200,6 +201,9 @@ stats(#state{transport = Transport, %% %% NOTE: This API sets TCP socket options, which has nothing to do with %% the MQTT layer's keepalive (PINGREQ and PINGRESP). +async_set_keepalive(Idle, Interval, Probes) -> + async_set_keepalive(self(), Idle, Interval, Probes). + async_set_keepalive(Pid, Idle, Interval, Probes) -> Options = [ {keepalive, true} , {raw, 6, 4, <>} diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index c4a2f6ac0..37063c65f 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -121,17 +121,8 @@ parse(Bin, {{body, #{hdr := Header, len := Length, rest := Body} }, Options}) when is_binary(Bin) -> - BodyBytes = body_bytes(Body), - {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin), - NewBody = append_body(Body, NewBodyPart), - parse_frame(NewBody, Tail, Header, Length, Options). - -%% split given binary with the first N bytes -split(N, Bin) when N =< 0 -> - {Bin, <<>>}; -split(N, Bin) when N =< size(Bin) -> - <> = Bin, - {H, T}. + NewBody = append_body(Body, Bin), + parse_frame(NewBody, Header, Length, Options). parse_remaining_len(<<>>, Header, Options) -> {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; @@ -178,19 +169,15 @@ append_body(H, T) when is_binary(H) -> append_body(?Q(Bytes, Q), T) -> ?Q(Bytes + iolist_size(T), queue:in(T, Q)). -flatten_body(Body, Tail) when is_binary(Body) -> <>; -flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]). +flatten_body(Body) when is_binary(Body) -> Body; +flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)). +parse_frame(Body, Header, 0, Options) -> + {ok, packet(Header), flatten_body(Body), ?none(Options)}; parse_frame(Body, Header, Length, Options) -> - %% already appended - parse_frame(Body, _SplitTail = <<>>, Header, Length, Options). - -parse_frame(Body, Tail, Header, 0, Options) -> - {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)}; -parse_frame(Body, Tail, Header, Length, Options) -> case body_bytes(Body) >= Length of true -> - <> = flatten_body(Body, Tail), + <> = flatten_body(Body), case parse_packet(Header, FrameBin, Options) of {Variable, Payload} -> {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; @@ -202,7 +189,7 @@ parse_frame(Body, Tail, Header, Length, Options) -> false -> {more, {{body, #{hdr => Header, len => Length, - rest => append_body(Body, Tail) + rest => Body }}, Options}} end. diff --git a/src/emqx_http_lib.erl b/src/emqx_http_lib.erl index 60f19e6bb..893c260ee 100644 --- a/src/emqx_http_lib.erl +++ b/src/emqx_http_lib.erl @@ -108,11 +108,7 @@ normalise_headers(Headers0) -> [{K, proplists:get_value(K, Headers)} || K <- Keys]. normalise_parse_result(#{host := Host, scheme := Scheme0} = Map) -> - Scheme = atom_scheme(Scheme0), - DefaultPort = case https =:= Scheme of - true -> 443; - false -> 80 - end, + {Scheme, DefaultPort} = atom_scheme_and_default_port(Scheme0), Port = case maps:get(port, Map, undefined) of N when is_number(N) -> N; _ -> DefaultPort @@ -122,11 +118,14 @@ normalise_parse_result(#{host := Host, scheme := Scheme0} = Map) -> , port => Port }. -%% NOTE: so far we only support http schemes. -atom_scheme(Scheme) when is_list(Scheme) -> atom_scheme(list_to_binary(Scheme)); -atom_scheme(<<"https">>) -> https; -atom_scheme(<<"http">>) -> http; -atom_scheme(Other) -> throw({unsupported_scheme, Other}). +%% NOTE: so far we only support http/coap schemes. +atom_scheme_and_default_port(Scheme) when is_list(Scheme) -> + atom_scheme_and_default_port(list_to_binary(Scheme)); +atom_scheme_and_default_port(<<"http">> ) -> {http, 80}; +atom_scheme_and_default_port(<<"https">>) -> {https, 443}; +atom_scheme_and_default_port(<<"coap">> ) -> {coap, 5683}; +atom_scheme_and_default_port(<<"coaps">>) -> {coaps, 5684}; +atom_scheme_and_default_port(Other) -> throw({unsupported_scheme, Other}). do_uri_encode(Char) -> case reserved(Char) of diff --git a/src/emqx_logger_textfmt.erl b/src/emqx_logger_textfmt.erl index 3bc9f185a..291153d74 100644 --- a/src/emqx_logger_textfmt.erl +++ b/src/emqx_logger_textfmt.erl @@ -35,15 +35,9 @@ format(#{msg := Msg0, meta := Meta} = Event, Config) -> logger_formatter:format(Event#{msg := Msg}, Config). maybe_merge({report, Report}, Meta) when is_map(Report) -> - {report, maps:merge(rename(Report), filter(Meta))}; + {report, maps:merge(Report, filter(Meta))}; maybe_merge(Report, _Meta) -> Report. filter(Meta) -> maps:without(?WITHOUT_MERGE, Meta). - -rename(#{'$kind' := Kind} = Meta0) -> % snabbkaffe - Meta = maps:remove('$kind', Meta0), - Meta#{msg => Kind}; -rename(Meta) -> - Meta. diff --git a/src/emqx_node_dump.erl b/src/emqx_node_dump.erl index 18189bb57..ff0895553 100644 --- a/src/emqx_node_dump.erl +++ b/src/emqx_node_dump.erl @@ -52,7 +52,7 @@ censor([Key | _], Val) -> end. is_sensitive(Key) when is_atom(Key) -> - is_sensitive(atom_to_binary(Key)); + is_sensitive(atom_to_binary(Key, utf8)); is_sensitive(Key) when is_list(Key) -> try iolist_to_binary(Key) of Bin -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index f05da760e..c2de5fe57 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -172,7 +172,14 @@ load_ext_plugin(PluginDir) -> error({plugin_app_file_not_found, AppFile}) end, ok = load_plugin_app(AppName, Ebin), - ok = load_plugin_conf(AppName, PluginDir). + try + ok = load_plugin_conf(AppName, PluginDir) + catch + throw : {conf_file_not_found, ConfFile} -> + %% this is maybe a dependency of an external plugin + ?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]), + ok + end. load_plugin_app(AppName, Ebin) -> _ = code:add_patha(Ebin), @@ -180,8 +187,8 @@ load_plugin_app(AppName, Ebin) -> lists:foreach( fun(BeamFile) -> Module = list_to_atom(filename:basename(BeamFile, ".beam")), - case code:ensure_loaded(Module) of - {module, Module} -> ok; + case code:load_file(Module) of + {module, _} -> ok; {error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason}) end end, Modules), @@ -193,12 +200,12 @@ load_plugin_app(AppName, Ebin) -> load_plugin_conf(AppName, PluginDir) -> Priv = filename:join([PluginDir, "priv"]), Etc = filename:join([PluginDir, "etc"]), - Schema = filelib:wildcard(filename:join([Priv, "*.schema"])), ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]), Conf = case filelib:is_file(ConfFile) of true -> cuttlefish_conf:file(ConfFile); - false -> error({conf_file_not_found, ConfFile}) + false -> throw({conf_file_not_found, ConfFile}) end, + Schema = filelib:wildcard(filename:join([Priv, "*.schema"])), ?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]), AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf), lists:foreach(fun({AppName1, Envs}) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 389a81e7b..7bc68c271 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -257,13 +257,16 @@ websocket_init([Req, Opts]) -> case proplists:get_bool(proxy_protocol, Opts) andalso maps:get(proxy_header, Req) of #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} -> - ProxyName = {SrcAddr, SrcPort}, + SourceName = {SrcAddr, SrcPort}, %% Notice: Only CN is available in Proxy Protocol V2 additional info - ProxySSL = case maps:get(cn, SSL, undefined) of + SourceSSL = case maps:get(cn, SSL, undefined) of undeined -> nossl; CN -> [{pp2_ssl_cn, CN}] end, - {ProxyName, ProxySSL}; + {SourceName, SourceSSL}; + #{src_address := SrcAddr, src_port := SrcPort} -> + SourceName = {SrcAddr, SrcPort}, + {SourceName , nossl}; _ -> {get_peer(Req, Opts), cowboy_req:cert(Req)} end, diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 0cfba4737..eb2c3564f 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -181,16 +181,20 @@ t_discard_session(_) -> ok = meck:unload(emqx_connection). t_discard_session_race(_) -> - ok = snabbkaffe:start_trace(), - #{conninfo := ConnInfo0} = ?ChanInfo, - ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection}, - {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end), - ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo), - Pid ! stop, - receive {'DOWN', Ref, process, Pid, normal} -> ok end, - ok = emqx_cm:discard_session(<<"clientid">>), - {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000), - snabbkaffe:stop(). + ?check_trace( + begin + #{conninfo := ConnInfo0} = ?ChanInfo, + ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection}, + {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end), + ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo), + Pid ! stop, + receive {'DOWN', Ref, process, Pid, normal} -> ok end, + ok = emqx_cm:discard_session(<<"clientid">>), + {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000) + end, + fun(_, _) -> + true + end). t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 09aa97c3e..09206cee1 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -58,7 +58,8 @@ groups() -> t_serialize_parse_connack_v5 ]}, {publish, [parallel], - [t_serialize_parse_qos0_publish, + [t_parse_sticky_frames, + t_serialize_parse_qos0_publish, t_serialize_parse_qos1_publish, t_serialize_parse_qos2_publish, t_serialize_parse_publish_v5 @@ -286,6 +287,24 @@ t_serialize_parse_connack_v5(_) -> Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). +t_parse_sticky_frames(_) -> + Payload = lists:duplicate(10, 0), + P = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = false, + qos = ?QOS_0, + retain = false}, + variable = #mqtt_packet_publish{topic_name = <<"a/b">>, + packet_id = undefined}, + payload = iolist_to_binary(Payload) + }, + Bin = serialize_to_binary(P), + Size = size(Bin), + <> = Bin, + {more, PState1} = emqx_frame:parse(H), %% needs 2 more bytes + %% feed 3 bytes as if the next 1 byte belongs to the next packet. + {ok, _, <<42>>, PState2} = emqx_frame:parse(iolist_to_binary([TailTwoBytes, 42]), PState1), + ?assertMatch({none, _}, PState2). + t_serialize_parse_qos0_publish(_) -> Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>, Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, diff --git a/test/emqx_http_lib_tests.erl b/test/emqx_http_lib_tests.erl index a850da8f5..7bcb7d056 100644 --- a/test/emqx_http_lib_tests.erl +++ b/test/emqx_http_lib_tests.erl @@ -66,6 +66,16 @@ uri_parse_test_() -> emqx_http_lib:uri_parse("HTTPS://127.0.0.1")) end } + , {"coap default port", + fun() -> ?assertMatch({ok, #{scheme := coap, port := 5683}}, + emqx_http_lib:uri_parse("coap://127.0.0.1")) + end + } + , {"coaps default port", + fun() -> ?assertMatch({ok, #{scheme := coaps, port := 5684}}, + emqx_http_lib:uri_parse("coaps://127.0.0.1")) + end + } , {"unsupported_scheme", fun() -> ?assertEqual({error, {unsupported_scheme, <<"wss">>}}, emqx_http_lib:uri_parse("wss://127.0.0.1"))