From fb8133b99855ee24e11aa31642a310b36745899c Mon Sep 17 00:00:00 2001 From: z8674558 Date: Wed, 24 Feb 2021 15:59:27 +0900 Subject: [PATCH 01/32] chore(emqx_auth_http): match emqx:hook with ok --- apps/emqx_auth_http/src/emqx_auth_http_app.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index 2988dac78..89f42a2cd 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -109,14 +109,14 @@ load_hooks() -> {ok, _} = ehttpc_sup:start_pool(PoolName, PoolOpts), case application:get_env(?APP, super_req) of undefined -> - emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), - super => undefined}]}); + emqx_hooks:put('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), + super => undefined}]}); {ok, SuperReq} -> PoolOpts1 = proplists:get_value(pool_opts, SuperReq), PoolName1 = proplists:get_value(pool_name, SuperReq), {ok, _} = ehttpc_sup:start_pool(PoolName1, PoolOpts1), - emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), - super => maps:from_list(SuperReq)}]}) + emqx_hooks:put('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), + super => maps:from_list(SuperReq)}]}) end end, case application:get_env(?APP, acl_req) of @@ -126,7 +126,7 @@ load_hooks() -> PoolOpts2 = proplists:get_value(pool_opts, ACLReq), PoolName2 = proplists:get_value(pool_name, ACLReq), {ok, _} = ehttpc_sup:start_pool(PoolName2, PoolOpts2), - emqx:hook('client.check_acl', {emqx_acl_http, check_acl, [#{acl => maps:from_list(ACLReq)}]}) + emqx_hooks:put('client.check_acl', {emqx_acl_http, check_acl, [#{acl => maps:from_list(ACLReq)}]}) end, ok. From 02a755fbea98732a2a8da16bf463ea1ab9b48685 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Wed, 24 Feb 2021 16:01:44 +0900 Subject: [PATCH 02/32] chore(emqx_exproto): fix dialyzer warinings on default_conninfo --- apps/emqx_exproto/src/emqx_exproto_channel.erl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 04ed8b414..0eec36410 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -565,15 +565,14 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> NClientInfo#{protocol => ProtoName}. default_conninfo(ConnInfo) -> - ConnInfo#{proto_name => undefined, - proto_ver => undefined, - clean_start => true, + ConnInfo#{clean_start => true, clientid => undefined, username => undefined, - conn_props => [], + conn_mod => undefined, + conn_props => #{}, connected => true, connected_at => erlang:system_time(millisecond), - keepalive => undefined, + keepalive => 0, receive_maximum => 0, expiry_interval => 0}. From a6b5e0707d8da50ca2c4efbd1524955b5c88822c Mon Sep 17 00:00:00 2001 From: z8674558 Date: Wed, 24 Feb 2021 16:02:14 +0900 Subject: [PATCH 03/32] chore(emqx_exproto): match emqx_misc:tune_heap_size --- apps/emqx_exproto/src/emqx_exproto_conn.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index 72c18410a..fe54fff73 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -264,7 +264,7 @@ init_state(WrappedSock, Peername, Options) -> run_loop(Parent, State = #state{socket = Socket, peername = Peername}) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), - emqx_misc:tune_heap_size(?DEFAULT_OOM_POLICY), + _ = emqx_misc:tune_heap_size(?DEFAULT_OOM_POLICY), case activate_socket(State) of {ok, NState} -> hibernate(Parent, NState); From 3e9abbe95a1b087e7ae0280ad0fc6d4a118d9a16 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Wed, 24 Feb 2021 16:03:28 +0900 Subject: [PATCH 04/32] chore(emqx_exproto): tell dialyzer exit functions do not return --- apps/emqx_exproto/src/emqx_exproto_conn.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index fe54fff73..b29e0b7f5 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -273,6 +273,7 @@ run_loop(Parent, State = #state{socket = Socket, exit_on_sock_error(Reason) end. +-spec exit_on_sock_error(atom()) -> no_return(). exit_on_sock_error(Reason) when Reason =:= einval; Reason =:= enotconn; Reason =:= closed -> @@ -449,6 +450,7 @@ handle_msg(Msg, State) -> %%-------------------------------------------------------------------- %% Terminate +-spec terminate(atom(), state()) -> no_return(). terminate(Reason, State = #state{channel = Channel}) -> ?LOG(debug, "Terminated due to ~p", [Reason]), _ = emqx_exproto_channel:terminate(Reason, Channel), From 47033d2f0bcda06eec32d663b009540c78af7726 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 26 Feb 2021 17:34:36 +0100 Subject: [PATCH 05/32] chore(script): delete sync-apps.sh It is no longer needed --- sync-apps.sh | 103 --------------------------------------------------- 1 file changed, 103 deletions(-) delete mode 100755 sync-apps.sh diff --git a/sync-apps.sh b/sync-apps.sh deleted file mode 100755 index 63145a987..000000000 --- a/sync-apps.sh +++ /dev/null @@ -1,103 +0,0 @@ -#!/bin/bash - -set -euo pipefail - -force="${1:-no}" - -apps=( -# "emqx_auth_http" # permanently diverged -# "emqx_web_hook" # permanently diverged -"emqx_auth_jwt" -"emqx_auth_ldap" -"emqx_auth_mongo" -"emqx_auth_mysql" -"emqx_auth_pgsql" -"emqx_auth_redis" -"emqx_bridge_mqtt" -"emqx_coap" -# "emqx_dashboard" # moved to lib-ce -"emqx_exhook" -"emqx_exproto" -"emqx_lua_hook" -"emqx_lwm2m" -# "emqx_management" # moved to lib-ce -"emqx_prometheus" -"emqx_psk_file" -"emqx_recon" -"emqx_retainer" -"emqx_rule_engine" -"emqx_sasl" -"emqx_sn" -"emqx_stomp" -"emqx_telemetry" -) - -if git status --porcelain | grep -qE 'apps/'; then - echo 'apps dir is not git-clear, refuse to sync' -# exit 1 -fi - -mkdir -p tmp/ - -download_zip() { - local app="$1" - local ref="$2" - local vsn - vsn="$(echo "$ref" | tr '/' '-')" - local file="tmp/${app}-${vsn}.zip" - if [ -f "$file" ] && [ "$force" != "force" ]; then - return 0 - fi - local repo - repo=${app//_/-} - local url="https://github.com/emqx/$repo/archive/$ref.zip" - echo "downloading ${url}" - curl -fLsS -o "$file" "$url" -} - -default_vsn="dev/v4.3.0" -download_zip "emqx_auth_mnesia" "e4.2.3" -for app in "${apps[@]}"; do - download_zip "$app" "$default_vsn" -done - -extract_zip(){ - local app="$1" - local ref="$2" - local vsn_arg="${3:-}" - local vsn_dft - vsn_dft="$(echo "$ref" | tr '/' '-')" - local vsn - if [ -n "$vsn_arg" ]; then - vsn="$vsn_arg" - else - vsn="$vsn_dft" - fi - local file="tmp/${app}-${vsn_dft}.zip" - local repo - repo=${app//_/-} - rm -rf "apps/${app}/" - unzip "$file" -d apps/ - mv "apps/${repo}-${vsn}/" "apps/$app/" -} - -extract_zip "emqx_auth_mnesia" "e4.2.3" "e4.2.3" -for app in "${apps[@]}"; do - extract_zip "$app" "$default_vsn" -done - -cleanup_app(){ - local app="$1" - pushd "apps/$app" - rm -f Makefile rebar.config.script LICENSE src/*.app.src.script src/*.appup.src - rm -rf ".github" ".ci" - # restore rebar.config and app.src - git checkout rebar.config - git checkout src/*.app.src - popd -} - -apps+=( "emqx_auth_mnesia" ) -for app in "${apps[@]}"; do - cleanup_app "$app" -done From 7afeadd6fc1ae230bfab05872772bb35740d4367 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 26 Feb 2021 20:23:35 +0100 Subject: [PATCH 06/32] chore(ci): run eunit test in github action --- .github/workflows/run_test_cases.yaml | 1 + Makefile | 2 +- rebar.config.erl | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 00e980d30..745deb7f9 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -47,6 +47,7 @@ jobs: printenv > .env docker exec -i erlang bash -c "make xref" docker exec --env-file .env -i erlang bash -c "make ct" + docker exec --env-file .env -i erlang bash -c "make eunit" docker exec -i erlang bash -c "make cover" docker exec -i erlang bash -c "make coveralls" - uses: actions/upload-artifact@v1 diff --git a/Makefile b/Makefile index 81b8b4fb0..c3dbc28a2 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ get-dashboard: .PHONY: eunit eunit: $(REBAR) - $(REBAR) eunit + $(REBAR) eunit -v -c .PHONY: proper proper: $(REBAR) diff --git a/rebar.config.erl b/rebar.config.erl index 2365e9999..d8f0d74ec 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -72,6 +72,7 @@ profiles() -> , {test, [ {deps, test_deps()} , {plugins, test_plugins()} , {erl_opts, [debug_info, {parse_transform, mod_vsn}] ++ erl_opts_i()} + , {extra_src_dirs, [{"test", [{recursive,true}]}]} ]} ]. From d8ad7a0edbb8f1396fa8ee5458ac6a53a050795f Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 26 Feb 2021 20:58:38 +0100 Subject: [PATCH 07/32] chore(emqx_exhook): Ensure semver app vsn --- apps/emqx_exhook/src/emqx_exhook.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index fd8bc98ae..555243107 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,6 +1,6 @@ {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "git"}, + {vsn, "4.3.0"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, From c54636b6c2606933bc48b7e3881d7e0927a33f6d Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 26 Feb 2021 21:43:58 +0100 Subject: [PATCH 08/32] chore(build): inject emqx_vsn to all modules as attribute --- rebar.config | 3 ++- rebar.config.erl | 7 ++++--- src/emqx_mod_vsn.erl | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 src/emqx_mod_vsn.erl diff --git a/rebar.config b/rebar.config index e35ac8426..c37fa8070 100644 --- a/rebar.config +++ b/rebar.config @@ -11,7 +11,8 @@ {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import, warn_obsolete_guard,compressed]}. -{overrides,[{add,[{extra_src_dirs, [{"etc", [{recursive,true}]}]}]} +{overrides,[{add,[ {extra_src_dirs, [{"etc", [{recursive,true}]}]} + , {erl_opts, [{parse_transform, emqx_mod_vsn}]}]} ]}. {extra_src_dirs, [{"etc", [{recursive,true}]}]}. diff --git a/rebar.config.erl b/rebar.config.erl index d8f0d74ec..2145fc212 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -52,7 +52,7 @@ test_deps() -> ]. default_compile_opts() -> - [compressed, deterministic, no_debug_info, warnings_as_errors, {parse_transform, mod_vsn}]. + [compressed, deterministic, no_debug_info, warnings_as_errors, {parse_transform, emqx_mod_vsn}]. profiles() -> [ {'emqx', [ {erl_opts, default_compile_opts()} @@ -67,11 +67,11 @@ profiles() -> , {'emqx-edge-pkg', [ {erl_opts, default_compile_opts()} , {relx, relx('emqx-edge-pkg')} ]} - , {check, [ {erl_opts, [debug_info, warnings_as_errors, {parse_transform, mod_vsn}]} + , {check, [ {erl_opts, [debug_info, warnings_as_errors, {parse_transform, emqx_mod_vsn}]} ]} , {test, [ {deps, test_deps()} , {plugins, test_plugins()} - , {erl_opts, [debug_info, {parse_transform, mod_vsn}] ++ erl_opts_i()} + , {erl_opts, [debug_info, {parse_transform, emqx_mod_vsn}] ++ erl_opts_i()} , {extra_src_dirs, [{"test", [{recursive,true}]}]} ]} ]. @@ -281,6 +281,7 @@ provide_bcrypt_release(ReleaseType) -> compile_and_load_pase_transforms(Dir) -> PtFiles = [ "apps/emqx_rule_engine/src/emqx_rule_actions_trans.erl" + , "src/emqx_mod_vsn.erl" ], CompileOpts = [verbose,report_errors,report_warnings,return_errors,debug_info], lists:foreach(fun(PtFile) -> {ok, _Mod} = compile:file(path(Dir, PtFile), CompileOpts) end, PtFiles). diff --git a/src/emqx_mod_vsn.erl b/src/emqx_mod_vsn.erl new file mode 100644 index 000000000..861398569 --- /dev/null +++ b/src/emqx_mod_vsn.erl @@ -0,0 +1,40 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% This module provides a parse_transform to inject emqx version number +%% to all modules as a module attribute. +%% The module attribute is so far only used for beam reload inspection. +-module(emqx_mod_vsn). + +-export([parse_transform/2]). + +parse_transform(Form, _Opts) -> + case os:getenv("PKG_VSN") of + false -> Form; + Vsn -> trans(Form, {attribute, 1, emqx_vsn, Vsn}) + end. + +trans(Form, Injection) -> + trans(Form, Injection, []). + +trans([], _Injection, Acc) -> + lists:reverse(Acc); +trans([{eof, _} | _] = EOF, _Injection, Acc) -> + lists:reverse(Acc) ++ EOF; +trans([{attribute, _, module, _} = Module | Form], Injection, Acc) -> + lists:reverse(Acc) ++ [Module, Injection | Form]; +trans([H | T], Injection, Acc) -> + trans(T, Injection, [H | Acc]). From ddadece0b2316679f92bbc38c6b1549abde28e66 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sun, 28 Feb 2021 12:12:28 +0100 Subject: [PATCH 09/32] chore(build): include emqx version as compile info --- rebar.config | 4 ---- rebar.config.erl | 41 +++++++++++++++++++++++++++++++---------- src/emqx_mod_vsn.erl | 40 ---------------------------------------- 3 files changed, 31 insertions(+), 54 deletions(-) delete mode 100644 src/emqx_mod_vsn.erl diff --git a/rebar.config b/rebar.config index c37fa8070..c692b185e 100644 --- a/rebar.config +++ b/rebar.config @@ -6,14 +6,10 @@ %% with rebar.config.erl module. Final result is written to %% rebar.config.rendered if environment DEBUG is set. -{minimum_otp_vsn, "21.3"}. {edoc_opts, [{preprocess,true}]}. {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import, warn_obsolete_guard,compressed]}. -{overrides,[{add,[ {extra_src_dirs, [{"etc", [{recursive,true}]}]} - , {erl_opts, [{parse_transform, emqx_mod_vsn}]}]} - ]}. {extra_src_dirs, [{"etc", [{recursive,true}]}]}. {xref_checks,[undefined_function_calls,undefined_functions,locals_not_used, diff --git a/rebar.config.erl b/rebar.config.erl index 2145fc212..d216e8464 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -6,7 +6,7 @@ do(Dir, CONFIG) -> ok = compile_and_load_pase_transforms(Dir), C1 = deps(CONFIG), Config = dialyzer(C1), - dump(Config ++ coveralls() ++ config()). + dump(Config ++ [{overrides, overrides()}] ++ coveralls() ++ config()). bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}. @@ -19,6 +19,14 @@ deps(Config) -> end, lists:keystore(deps, 1, Config, {deps, OldDpes ++ MoreDeps}). +overrides() -> + [ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]} + , {erl_opts, [ deterministic + , {compile_info, [{emqx_vsn, get_vsn()}]} + ]} + ]} + ]. + config() -> [ {plugins, plugins()} , {profiles, profiles()} @@ -51,27 +59,41 @@ test_deps() -> , meck ]. -default_compile_opts() -> - [compressed, deterministic, no_debug_info, warnings_as_errors, {parse_transform, emqx_mod_vsn}]. +common_compile_opts() -> + [ deterministic + , {compile_info, [{emqx_vsn, get_vsn()}]} + ]. + +prod_compile_opts() -> + [ compressed + , no_debug_info + , warnings_as_errors + | common_compile_opts() + ]. + +test_compile_opts() -> + [ debug_info + | common_compile_opts() + ]. profiles() -> - [ {'emqx', [ {erl_opts, default_compile_opts()} + [ {'emqx', [ {erl_opts, prod_compile_opts()} , {relx, relx('emqx')} ]} - , {'emqx-pkg', [ {erl_opts, default_compile_opts()} + , {'emqx-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx('emqx-pkg')} ]} - , {'emqx-edge', [ {erl_opts, default_compile_opts()} + , {'emqx-edge', [ {erl_opts, prod_compile_opts()} , {relx, relx('emqx-edge')} ]} - , {'emqx-edge-pkg', [ {erl_opts, default_compile_opts()} + , {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()} , {relx, relx('emqx-edge-pkg')} ]} - , {check, [ {erl_opts, [debug_info, warnings_as_errors, {parse_transform, emqx_mod_vsn}]} + , {check, [ {erl_opts, test_compile_opts()} ]} , {test, [ {deps, test_deps()} , {plugins, test_plugins()} - , {erl_opts, [debug_info, {parse_transform, emqx_mod_vsn}] ++ erl_opts_i()} + , {erl_opts, test_compile_opts() ++ erl_opts_i()} , {extra_src_dirs, [{"test", [{recursive,true}]}]} ]} ]. @@ -281,7 +303,6 @@ provide_bcrypt_release(ReleaseType) -> compile_and_load_pase_transforms(Dir) -> PtFiles = [ "apps/emqx_rule_engine/src/emqx_rule_actions_trans.erl" - , "src/emqx_mod_vsn.erl" ], CompileOpts = [verbose,report_errors,report_warnings,return_errors,debug_info], lists:foreach(fun(PtFile) -> {ok, _Mod} = compile:file(path(Dir, PtFile), CompileOpts) end, PtFiles). diff --git a/src/emqx_mod_vsn.erl b/src/emqx_mod_vsn.erl deleted file mode 100644 index 861398569..000000000 --- a/src/emqx_mod_vsn.erl +++ /dev/null @@ -1,40 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% This module provides a parse_transform to inject emqx version number -%% to all modules as a module attribute. -%% The module attribute is so far only used for beam reload inspection. --module(emqx_mod_vsn). - --export([parse_transform/2]). - -parse_transform(Form, _Opts) -> - case os:getenv("PKG_VSN") of - false -> Form; - Vsn -> trans(Form, {attribute, 1, emqx_vsn, Vsn}) - end. - -trans(Form, Injection) -> - trans(Form, Injection, []). - -trans([], _Injection, Acc) -> - lists:reverse(Acc); -trans([{eof, _} | _] = EOF, _Injection, Acc) -> - lists:reverse(Acc) ++ EOF; -trans([{attribute, _, module, _} = Module | Form], Injection, Acc) -> - lists:reverse(Acc) ++ [Module, Injection | Form]; -trans([H | T], Injection, Acc) -> - trans(T, Injection, [H | Acc]). From 4b43b6532c60f2bf9bc9d9af4a59bf2f27c794d7 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Mon, 1 Mar 2021 12:53:20 +0900 Subject: [PATCH 10/32] chore(emqx_mgmt_api_banned): do_pack_banned takes map instead of impropr record --- .../src/emqx_mgmt_api_banned.erl | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl index c3cd8e313..ac80e0373 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl @@ -105,37 +105,33 @@ validate_params(Params) -> {error, ?ERROR8, Msg} end. -%% TODO who and reason is undefined - causing dialyzer errors. fix later --dialyzer({nowarn_function,pack_banned/1}). pack_banned(Params) -> Now = erlang:system_time(second), - do_pack_banned(Params, #banned{by = <<"user">>, - at = Now, - until = Now + 300}). + do_pack_banned(Params, #{by => <<"user">>, at => Now, until => Now + 300}). -do_pack_banned([], Banned) -> - {ok, Banned}; +do_pack_banned([], #{who := Who, by := By, reason := Reason, at := At, until := Until}) -> + {ok, #banned{who = Who, by = By, reason = Reason, at = At, until = Until}}; do_pack_banned([{<<"who">>, Who} | Params], Banned) -> case lists:keytake(<<"as">>, 1, Params) of {value, {<<"as">>, <<"peerhost">>}, Params2} -> {ok, IPAddress} = inet:parse_address(str(Who)), - do_pack_banned(Params2, Banned#banned{who = {peerhost, IPAddress}}); + do_pack_banned(Params2, Banned#{who => {peerhost, IPAddress}}); {value, {<<"as">>, <<"clientid">>}, Params2} -> - do_pack_banned(Params2, Banned#banned{who = {clientid, Who}}); + do_pack_banned(Params2, Banned#{who => {clientid, Who}}); {value, {<<"as">>, <<"username">>}, Params2} -> - do_pack_banned(Params2, Banned#banned{who = {username, Who}}) + do_pack_banned(Params2, Banned#{who => {username, Who}}) end; do_pack_banned([P1 = {<<"as">>, _}, P2 | Params], Banned) -> do_pack_banned([P2, P1 | Params], Banned); do_pack_banned([{<<"by">>, By} | Params], Banned) -> - do_pack_banned(Params, Banned#banned{by = By}); + do_pack_banned(Params, Banned#{by => By}); do_pack_banned([{<<"reason">>, Reason} | Params], Banned) -> - do_pack_banned(Params, Banned#banned{reason = Reason}); + do_pack_banned(Params, Banned#{reason => Reason}); do_pack_banned([{<<"at">>, At} | Params], Banned) -> - do_pack_banned(Params, Banned#banned{at = At}); + do_pack_banned(Params, Banned#{at => At}); do_pack_banned([{<<"until">>, Until} | Params], Banned) -> - do_pack_banned(Params, Banned#banned{until = Until}); -do_pack_banned([_P | Params], Banned) -> %% ingore other params + do_pack_banned(Params, Banned#{until => Until}); +do_pack_banned([_P | Params], Banned) -> %% ignore other params do_pack_banned(Params, Banned). do_delete(<<"peerhost">>, Who) -> From c52d241a068667a8472448ddea55ceb67ed224c3 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Mon, 1 Mar 2021 12:55:09 +0900 Subject: [PATCH 11/32] chore(emqx_mgmt_api_listeners): return error tuple --- lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl index 7acbe8107..b2e75d7ff 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -65,7 +65,7 @@ restart(#{identifier := Identifier}, _Params) -> Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()], case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of [] -> return(ok); - Errors -> return({error, Errors}) + Errors -> return({error, {restart, Errors}}) end. format(Listeners) when is_list(Listeners) -> From 6ea4501de4fb882a07d6d684e31203ee6b8fb0df Mon Sep 17 00:00:00 2001 From: z8674558 Date: Mon, 1 Mar 2021 13:02:08 +0900 Subject: [PATCH 12/32] chore(emqx_listeners): fix type --- src/emqx_listeners.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 651be0e8e..5df90b40a 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -99,7 +99,7 @@ ensure_all_started([L | Rest], Results) -> ensure_all_started(Rest, NewResults). %% @doc Format address:port for logging. --spec(format_listen_on(esockd:listen_on()) -> binary()). +-spec(format_listen_on(esockd:listen_on()) -> [char()]). format_listen_on(ListenOn) -> format(ListenOn). -spec(start_listener(listener()) -> ok). From 64ac20eec59006f8ee772178a8000785fe378e78 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Mon, 1 Mar 2021 13:02:53 +0900 Subject: [PATCH 13/32] chore(emqx_listeners): fix clauses for dialyzer --- src/emqx_listeners.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 5df90b40a..ecbc54053 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -197,9 +197,8 @@ restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> restart_listener(Proto, ListenOn, _Opts) -> esockd:reopen(Proto, ListenOn). -ok(ok) -> ok; ok({ok, _}) -> ok; -ok(Error) -> Error. +ok(Other) -> Other. %% @doc Stop all listeners. -spec(stop() -> ok). From 62a350949ccc65a084057b1cde391c5ffb4d11aa Mon Sep 17 00:00:00 2001 From: tigercl Date: Mon, 1 Mar 2021 16:12:43 +0800 Subject: [PATCH 14/32] style: remove space --- lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl index ac80e0373..483ab4be3 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl @@ -109,7 +109,7 @@ pack_banned(Params) -> Now = erlang:system_time(second), do_pack_banned(Params, #{by => <<"user">>, at => Now, until => Now + 300}). -do_pack_banned([], #{who := Who, by := By, reason := Reason, at := At, until := Until}) -> +do_pack_banned([], #{who := Who, by := By, reason := Reason, at := At, until := Until}) -> {ok, #banned{who = Who, by = By, reason = Reason, at = At, until = Until}}; do_pack_banned([{<<"who">>, Who} | Params], Banned) -> case lists:keytake(<<"as">>, 1, Params) of From 6081d45d813ef47cc81ca6762669a87b9a6113d9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 1 Mar 2021 20:15:12 +0800 Subject: [PATCH 15/32] fix(rule): reformat some code * fix(rule): reformat some code for rule-engine * fix(lwm2m): change publish_update_when to publish_update_msg_when Change the option name publish_update_when -> publish_update_msg_when. Also change the object_list_changed to contains_object_list, as the the later describes the default behavior correctly. * fix(lwm2m): publish_update_msg_when -> update_msg_publish_condition --- apps/emqx_lwm2m/etc/emqx_lwm2m.conf | 6 +-- apps/emqx_lwm2m/priv/emqx_lwm2m.schema | 6 +-- .../emqx_lwm2m/src/emqx_lwm2m_coap_server.erl | 4 +- apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 6 +-- .../emqx_rule_engine/src/emqx_rule_engine.erl | 37 +++++++++---------- 5 files changed, 28 insertions(+), 31 deletions(-) diff --git a/apps/emqx_lwm2m/etc/emqx_lwm2m.conf b/apps/emqx_lwm2m/etc/emqx_lwm2m.conf index 83111c956..152aee604 100644 --- a/apps/emqx_lwm2m/etc/emqx_lwm2m.conf +++ b/apps/emqx_lwm2m/etc/emqx_lwm2m.conf @@ -44,11 +44,11 @@ lwm2m.topics.update = up/resp # When publish the update message. # # Can be one of: -# - object_list_changed: only if the object list is changed +# - contains_object_list: only if the update message contains object list # - always: always publish the update message # -# Defaults to object_list_changed -#lwm2m.publish_update_when = object_list_changed +# Defaults to contains_object_list +#lwm2m.update_msg_publish_condition = contains_object_list # Dir where the object definition files can be found lwm2m.xml_dir = {{ platform_etc_dir }}/lwm2m_xml diff --git a/apps/emqx_lwm2m/priv/emqx_lwm2m.schema b/apps/emqx_lwm2m/priv/emqx_lwm2m.schema index f15269833..b5ed778f5 100644 --- a/apps/emqx_lwm2m/priv/emqx_lwm2m.schema +++ b/apps/emqx_lwm2m/priv/emqx_lwm2m.schema @@ -112,9 +112,9 @@ end}. {default, "lwm2m/%e/up/resp"} ]}. -{mapping, "lwm2m.publish_update_when", "emqx_lwm2m.publish_update_when", [ - {datatype, {enum, [object_list_changed, always]}}, - {default, object_list_changed} +{mapping, "lwm2m.update_msg_publish_condition", "emqx_lwm2m.update_msg_publish_condition", [ + {datatype, {enum, [contains_object_list, always]}}, + {default, contains_object_list} ]}. {translation, "emqx_lwm2m.topics", fun(Conf) -> diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl index ed5743203..31986da54 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl @@ -101,12 +101,12 @@ get_lwm2m_opts(Envs) -> AutoObserve = proplists:get_value(auto_observe, Envs, []), QmodeTimeWindow = proplists:get_value(qmode_time_window, Envs, []), Topics = proplists:get_value(topics, Envs, []), - PublishUpdateWhen = proplists:get_value(publish_update_when, Envs, object_list_changed), + PublishCondition = proplists:get_value(update_msg_publish_condition, Envs, contains_object_list), [{lifetime_max, LifetimeMax}, {lifetime_min, LifetimeMin}, {mountpoint, list_to_binary(Mountpoint)}, {port, Sockport}, {auto_observe, AutoObserve}, {qmode_time_window, QmodeTimeWindow}, - {publish_update_when, PublishUpdateWhen}, + {update_msg_publish_condition, PublishCondition}, {topics, Topics}]. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index c571e7cb0..45023783b 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -121,11 +121,11 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{ UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo), - case proplists:get_value(publish_update_when, - lwm2m_coap_responder:options(), object_list_changed) of + case proplists:get_value(update_msg_publish_condition, + lwm2m_coap_responder:options(), contains_object_list) of always -> send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); - object_list_changed -> + contains_object_list -> %% - report the registration info update, but only when objectList is updated. case NewRegInfo of #{<<"objectList">> := _} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 02781ca9b..860b9e702 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -269,16 +269,10 @@ do_update_resource_check(Id, NewParams) -> config = OldConfig, description = OldDescription} = _OldResource} -> try - do_update_resource(#{id => Id, - config => case maps:find(<<"config">>, NewParams) of - {ok, NewConfig} -> NewConfig; - error -> OldConfig - end, - type => Type, - description => case maps:find(<<"description">>, NewParams) of - {ok, NewDescription} -> NewDescription; - error -> OldDescription - end}), + Conifg = maps:get(<<"config">>, NewParams, OldConfig), + Descr = maps:get(<<"description">>, NewParams, OldDescription), + do_update_resource(#{id => Id, config => Conifg, type => Type, + description => Descr}), ok catch _ : Reason -> {error, Reason} @@ -294,11 +288,13 @@ do_update_resource(#{id := Id, type := Type, description := NewDescription, conf Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), case test_resource(#{type => Type, config => NewConfig}) of ok -> - Resource = #resource{id = Id, - type = Type, - config = Config, - description = NewDescription, - created_at = erlang:system_time(millisecond)}, + Resource = #resource{ + id = Id, + type = Type, + config = Config, + description = NewDescription, + created_at = erlang:system_time(millisecond) + }, cluster_call(init_resource, [Module, Create, Id, Config]), emqx_rule_registry:add_resource(Resource); {error, Reason} -> @@ -468,18 +464,19 @@ may_update_rule_params(Rule, Params = #{rawsql := SQL}) -> maps:remove(rawsql, Params)); Reason -> throw(Reason) end; -may_update_rule_params(Rule = #rule{enabled = OldE, actions = Actions}, - Params = #{enabled := ToE}) -> - case {OldE, ToE} of +may_update_rule_params(Rule = #rule{enabled = OldEnb, actions = Actions}, + Params = #{enabled := NewEnb}) -> + case {OldEnb, NewEnb} of {false, true} -> refresh_rule(Rule); {true, false} -> clear_actions(Actions); _ -> ok end, - may_update_rule_params(Rule#rule{enabled = ToE}, maps:remove(enabled, Params)); + may_update_rule_params(Rule#rule{enabled = NewEnb}, maps:remove(enabled, Params)); may_update_rule_params(Rule, Params = #{description := Descr}) -> may_update_rule_params(Rule#rule{description = Descr}, maps:remove(description, Params)); may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) -> - may_update_rule_params(Rule#rule{on_action_failed = OnFailed}, maps:remove(on_action_failed, Params)); + may_update_rule_params(Rule#rule{on_action_failed = OnFailed}, + maps:remove(on_action_failed, Params)); may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) -> %% prepare new actions before removing old ones NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)), From fcfcbf139d3d2de95ca27d9ff9705eb5e38151f5 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 24 Feb 2021 23:40:34 +0100 Subject: [PATCH 16/32] chore(webhook): merge enterprise to opensource --- apps/emqx_web_hook/etc/emqx_web_hook.conf | 4 +- apps/emqx_web_hook/include/emqx_web_hook.hrl | 2 +- apps/emqx_web_hook/priv/emqx_web_hook.schema | 5 +- apps/emqx_web_hook/rebar.config | 11 - .../emqx_web_hook/src/emqx_web_hook.appup.src | 10 + .../src/emqx_web_hook_actions.erl | 311 +++++++------ .../emqx_web_hook/test/prop_webhook_confs.erl | 142 ++++++ .../emqx_web_hook/test/prop_webhook_hooks.erl | 409 ++++++++++++++++++ .../test/props/prop_webhook_confs.erl | 6 +- 9 files changed, 743 insertions(+), 157 deletions(-) create mode 100644 apps/emqx_web_hook/src/emqx_web_hook.appup.src create mode 100644 apps/emqx_web_hook/test/prop_webhook_confs.erl create mode 100644 apps/emqx_web_hook/test/prop_webhook_hooks.erl diff --git a/apps/emqx_web_hook/etc/emqx_web_hook.conf b/apps/emqx_web_hook/etc/emqx_web_hook.conf index 159769394..218718079 100644 --- a/apps/emqx_web_hook/etc/emqx_web_hook.conf +++ b/apps/emqx_web_hook/etc/emqx_web_hook.conf @@ -5,10 +5,10 @@ ## Webhook URL ## ## Value: String -web.hook.url = http://127.0.0.1:80 +web.hook.api.url = http://127.0.0.1:80 ## HTTP Headers -## +## ## Example: ## 1. web.hook.headers.content-type = application/json ## 2. web.hook.headers.accept = * diff --git a/apps/emqx_web_hook/include/emqx_web_hook.hrl b/apps/emqx_web_hook/include/emqx_web_hook.hrl index 4666b4d27..73019ec8c 100644 --- a/apps/emqx_web_hook/include/emqx_web_hook.hrl +++ b/apps/emqx_web_hook/include/emqx_web_hook.hrl @@ -1 +1 @@ --define(APP, emqx_web_hook). \ No newline at end of file +-define(APP, emqx_web_hook). diff --git a/apps/emqx_web_hook/priv/emqx_web_hook.schema b/apps/emqx_web_hook/priv/emqx_web_hook.schema index 9610ae094..beed1b107 100644 --- a/apps/emqx_web_hook/priv/emqx_web_hook.schema +++ b/apps/emqx_web_hook/priv/emqx_web_hook.schema @@ -1,7 +1,7 @@ %%-*- mode: erlang -*- %% EMQ X R3.0 config mapping -{mapping, "web.hook.url", "emqx_web_hook.url", [ +{mapping, "web.hook.api.url", "emqx_web_hook.url", [ {datatype, string} ]}. @@ -15,14 +15,17 @@ ]}. {mapping, "web.hook.ssl.cacertfile", "emqx_web_hook.cacertfile", [ + {default, ""}, {datatype, string} ]}. {mapping, "web.hook.ssl.certfile", "emqx_web_hook.certfile", [ + {default, ""}, {datatype, string} ]}. {mapping, "web.hook.ssl.keyfile", "emqx_web_hook.keyfile", [ + {default, ""}, {datatype, string} ]}. diff --git a/apps/emqx_web_hook/rebar.config b/apps/emqx_web_hook/rebar.config index 3684b78b0..65d1434df 100644 --- a/apps/emqx_web_hook/rebar.config +++ b/apps/emqx_web_hook/rebar.config @@ -18,14 +18,3 @@ {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. - -{profiles, - [{test, - [{erl_opts, [export_all, nowarn_export_all]}, - {deps, - [{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}}, - {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}} - ]} - ]} - ]}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src new file mode 100644 index 000000000..0c7b8ebf3 --- /dev/null +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -0,0 +1,10 @@ +%% -*-: erlang -*- + +{VSN, + [ + {<<".*">>, []} + ], + [ + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index f5d27f09c..4d5fed64b 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -17,95 +17,111 @@ %% Define the default actions. -module(emqx_web_hook_actions). +-export([ on_resource_create/2 + , on_get_resource_status/2 + , on_resource_destroy/2 + ]). + +-export([ on_action_create_data_to_webserver/2 + , on_action_data_to_webserver/2 + ]). + +-export_type([action_fun/0]). + -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_rule_engine/include/rule_actions.hrl"). --include("emqx_web_hook.hrl"). + +-type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())). + +-type(url() :: binary()). -define(RESOURCE_TYPE_WEBHOOK, 'web_hook'). -define(RESOURCE_CONFIG_SPEC, #{ - url => #{ - order => 1, + method => #{order => 1, type => string, - format => url, - required => true, - title => #{en => <<"URL">>, - zh => <<"URL"/utf8>>}, - description => #{en => <<"The URL of the server that will receive the Webhook requests.">>, - zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>} - }, - connect_timeout => #{ - order => 2, - type => number, - default => 5, - title => #{en => <<"Connect Timeout">>, - zh => <<"连接超时时间"/utf8>>}, - description => #{en => <<"Connect timeout in seconds">>, - zh => <<"连接超时时间,单位秒"/utf8>>}}, - request_timeout => #{ - order => 3, - type => number, - default => 5, - title => #{en => <<"Request Timeout">>, - zh => <<"请求超时时间时间"/utf8>>}, - description => #{en => <<"Request timeout in seconds">>, - zh => <<"请求超时时间,单位秒"/utf8>>}}, - cacertfile => #{ - order => 4, - type => file, - default => <<>>, - title => #{en => <<"CA Certificate File">>, - zh => <<"CA 证书文件"/utf8>>}, - description => #{en => <<"CA certificate file.">>, - zh => <<"CA 证书文件。"/utf8>>} - }, - certfile => #{ - order => 5, - type => file, - default => <<>>, - title => #{en => <<"Certificate File">>, - zh => <<"证书文件"/utf8>>}, - description => #{en => <<"Certificate file.">>, - zh => <<"证书文件。"/utf8>>} - }, - keyfile => #{ - order => 6, - type => file, - default => <<>>, - title => #{en => <<"Private Key File">>, - zh => <<"私钥文件"/utf8>>}, - description => #{en => <<"Private key file.">>, - zh => <<"私钥文件。"/utf8>>} - }, - verify => #{ - order => 7, + enum => [<<"PUT">>,<<"POST">>], + default => <<"POST">>, + title => #{en => <<"Request Method">>, + zh => <<"请求方法"/utf8>>}, + description => #{en => <<"Request Method">>, + zh => <<"请求方法"/utf8>>}}, + url => #{order => 2, + type => string, + format => url, + required => true, + title => #{en => <<"Request URL">>, + zh => <<"请求 URL"/utf8>>}, + description => #{en => <<"The URL of the server that will receive the Webhook requests.">>, + zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}}, + headers => #{order => 3, + type => object, + schema => #{}, + default => #{}, + title => #{en => <<"Request Header">>, + zh => <<"请求头"/utf8>>}, + description => #{en => <<"Request Header">>, + zh => <<"请求头"/utf8>>}}, + connect_timeout => #{order => 4, + type => string, + default => <<"5s">>, + title => #{en => <<"Connect Timeout">>, + zh => <<"连接超时时间"/utf8>>}, + description => #{en => <<"Connect Timeout In Seconds">>, + zh => <<"连接超时时间"/utf8>>}}, + request_timeout => #{order => 5, + type => string, + default => <<"5s">>, + title => #{en => <<"Request Timeout">>, + zh => <<"请求超时时间时间"/utf8>>}, + description => #{en => <<"Request Timeout In Seconds">>, + zh => <<"请求超时时间"/utf8>>}}, + pool_size => #{order => 6, + type => number, + default => 8, + title => #{en => <<"Pool Size">>, zh => <<"连接池大小"/utf8>>}, + description => #{en => <<"Connection Pool">>, + zh => <<"连接池大小"/utf8>>} + }, + cacertfile => #{order => 7, + type => file, + default => <<"">>, + title => #{en => <<"CA Certificate File">>, + zh => <<"CA 证书文件"/utf8>>}, + description => #{en => <<"CA Certificate file">>, + zh => <<"CA 证书文件"/utf8>>}}, + keyfile => #{order => 8, + type => file, + default => <<"">>, + title =>#{en => <<"SSL Key">>, + zh => <<"SSL Key"/utf8>>}, + description => #{en => <<"Your ssl keyfile">>, + zh => <<"SSL 私钥"/utf8>>}}, + certfile => #{order => 9, + type => file, + default => <<"">>, + title =>#{en => <<"SSL Cert">>, + zh => <<"SSL Cert"/utf8>>}, + description => #{en => <<"Your ssl certfile">>, + zh => <<"SSL 证书"/utf8>>}}, + verify => #{order => 10, type => boolean, default => false, - title => #{en => <<"Verify">>, - zh => <<"Verify"/utf8>>}, - description => #{en => <<"Turn on peer certificate verification.">>, - zh => <<"是否开启对端证书验证。"/utf8>>} - }, - pool_size => #{ - order => 8, - type => number, - default => 32, - title => #{en => <<"Pool Size">>, - zh => <<"连接池大小"/utf8>>}, - description => #{en => <<"Pool size for HTTP server.">>, - zh => <<"HTTP server 连接池大小。"/utf8>>} - } - }). + title =>#{en => <<"Verify Server Certfile">>, + zh => <<"校验服务器证书"/utf8>>}, + description => #{en => <<"Whether to verify the server certificate. By default, the client will not verify the server's certificate. If verification is required, please set it to true.">>, + zh => <<"是否校验服务器证书。 默认客户端不会去校验服务器的证书,如果需要校验,请设置成true。"/utf8>>}} +}). -define(ACTION_PARAM_RESOURCE, #{ - order => 0, - type => string, - required => true, - title => #{en => <<"Resource ID">>, - zh => <<"资源 ID"/utf8>>}, - description => #{en => <<"Bind a resource to this action.">>, - zh => <<"给动作绑定一个资源。"/utf8>>} - }). + order => 0, + type => string, + required => true, + title => #{en => <<"Resource ID">>, + zh => <<"资源 ID"/utf8>>}, + description => #{en => <<"Bind a resource to this action">>, + zh => <<"给动作绑定一个资源"/utf8>>} +}). -define(ACTION_DATA_SPEC, #{ '$resource' => ?ACTION_PARAM_RESOURCE, @@ -153,39 +169,29 @@ "默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}} }). --resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK, - create => on_resource_create, - status => on_get_resource_status, - destroy => on_resource_destroy, - params => ?RESOURCE_CONFIG_SPEC, - title => #{en => <<"WebHook">>, - zh => <<"WebHook"/utf8>>}, - description => #{en => <<"WebHook">>, - zh => <<"WebHook"/utf8>>} - }). +-resource_type( + #{name => ?RESOURCE_TYPE_WEBHOOK, + create => on_resource_create, + status => on_get_resource_status, + destroy => on_resource_destroy, + params => ?RESOURCE_CONFIG_SPEC, + title => #{en => <<"WebHook">>, + zh => <<"WebHook"/utf8>>}, + description => #{en => <<"WebHook">>, + zh => <<"WebHook"/utf8>>} +}). -rule_action(#{name => data_to_webserver, - category => data_forward, - for => '$any', - create => on_action_create_data_to_webserver, - params => ?ACTION_DATA_SPEC, - types => [?RESOURCE_TYPE_WEBHOOK], - title => #{en => <<"Data to Web Server">>, - zh => <<"发送数据到 Web 服务"/utf8>>}, - description => #{en => <<"Forward Messages to Web Server">>, - zh => <<"将数据转发给 Web 服务"/utf8>>} - }). - --type(url() :: binary()). - --export([ on_resource_create/2 - , on_get_resource_status/2 - , on_resource_destroy/2 - ]). - --export([ on_action_create_data_to_webserver/2 - , on_action_data_to_webserver/2 - ]). + category => data_forward, + for => '$any', + create => on_action_create_data_to_webserver, + params => ?ACTION_DATA_SPEC, + types => [?RESOURCE_TYPE_WEBHOOK], + title => #{en => <<"Data to Web Server">>, + zh => <<"发送数据到 Web 服务"/utf8>>}, + description => #{en => <<"Forward Messages to Web Server">>, + zh => <<"将数据转发给 Web 服务"/utf8>>} +}). %%------------------------------------------------------------------------------ %% Actions for web hook @@ -194,7 +200,7 @@ -spec(on_resource_create(binary(), map()) -> map()). on_resource_create(ResId, Conf) -> {ok, _} = application:ensure_all_started(ehttpc), - Options = pool_opts(Conf), + Options = pool_opts(Conf, ResId), PoolName = pool_name(ResId), case test_http_connect(Conf) of true -> ok; @@ -299,7 +305,7 @@ parse_action_params(Params = #{<<"url">> := URL}) -> path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))), headers => NHeaders, body => maps:get(<<"body">>, Params, <<>>), - request_timeout => timer:seconds(maps:get(<<"request_timeout">>, Params, 5)), + request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))), pool => maps:get(<<"pool">>, Params)} catch _:_ -> throw({invalid_params, Params}) @@ -328,50 +334,77 @@ str(Str) when is_list(Str) -> Str; str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Bin) when is_binary(Bin) -> binary_to_list(Bin). -pool_opts(Params = #{<<"url">> := URL}) -> - #{host := Host0, - scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(URL)), +add_default_scheme(<<"http://", _/binary>> = URL) -> + URL; +add_default_scheme(<<"https://", _/binary>> = URL) -> + URL; +add_default_scheme(URL) -> + <<"http://", URL/binary>>. + +pool_opts(Params = #{<<"url">> := URL}, ResId) -> + #{host := Host0, scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(add_default_scheme(URL))), Port = maps:get(port, URIMap, case Scheme of "https" -> 443; _ -> 80 end), PoolSize = maps:get(<<"pool_size">>, Params, 32), - ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)), + ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), {Inet, Host} = parse_host(Host0), - MoreOpts = case Scheme of - "http" -> - [{transport_opts, [Inet]}]; - "https" -> - KeyFile = maps:get(<<"keyfile">>, Params), - CertFile = maps:get(<<"certfile">>, Params), - CACertFile = maps:get(<<"cacertfile">>, Params), - VerifyType = case maps:get(<<"verify">>, Params) of - true -> verify_peer; - false -> verify_none - end, - TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> -> - false; - (_) -> - true - end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]), - NTLSOpts = [ {verify, VerifyType} - , {versions, emqx_tls_lib:default_versions()} - , {ciphers, emqx_tls_lib:default_ciphers()} - | TLSOpts - ], - [{transport, ssl}, {transport_opts, [Inet | NTLSOpts]}] - end, + SslOpts = get_ssl_options(Params, ResId, add_default_scheme(URL)), [{host, Host}, {port, Port}, {pool_size, PoolSize}, {pool_type, hash}, {connect_timeout, ConnectTimeout}, {retry, 5}, - {retry_timeout, 1000}] ++ MoreOpts. + {retry_timeout, 1000}, + {transport_opts, [Inet] ++ SslOpts}]. pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). +get_ssl_options(Config, ResId, <<"https://", _URL/binary>>) -> + [{transport, ssl}, {transport_opts, get_ssl_opts(Config, ResId)}]; +get_ssl_options(_Config, _ResId, _URL) -> + []. + +get_ssl_opts(Opts, ResId) -> + KeyFile = maps:get(<<"keyfile">>, Opts, undefined), + CertFile = maps:get(<<"certfile">>, Opts, undefined), + CAFile = maps:get(<<"cacertfile">>, Opts, undefined), + Filter = fun(Opts1) -> + [{K, V} || {K, V} <- Opts1, + V =/= undefined, + V =/= <<>>, + V =/= "" ] + end, + Key = save_upload_file(KeyFile, ResId), + Cert = save_upload_file(CertFile, ResId), + CA = save_upload_file(CAFile, ResId), + Verify = case maps:get(<<"verify">>, Opts, false) of + false -> verify_none; + true -> verify_peer + end, + case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of + [] -> [{verify, Verify}]; + SslOpts -> + [{verify, Verify} | SslOpts] + end. + +save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> ""; +save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath); +save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) -> + FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]), + ok = filelib:ensure_dir(FullFilename), + case file:write_file(FullFilename, File) of + ok -> + binary_to_list(FullFilename); + {error, Reason} -> + logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]), + error({ResId, store_file_fail}) + end; +save_upload_file(_, _) -> "". + parse_host(Host) -> case inet:parse_address(Host) of {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr}; diff --git a/apps/emqx_web_hook/test/prop_webhook_confs.erl b/apps/emqx_web_hook/test/prop_webhook_confs.erl new file mode 100644 index 000000000..bfe170239 --- /dev/null +++ b/apps/emqx_web_hook/test/prop_webhook_confs.erl @@ -0,0 +1,142 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_webhook_confs). +-include_lib("proper/include/proper.hrl"). + +-import(emqx_ct_proper_types, + [ url/0 + , nof/1 + ]). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_confs() -> + Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")), + ?ALL({Url, Confs0}, {url(), confs()}, + begin + Confs = [{"web.hook.api.url", Url}|Confs0], + Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)), + + assert_confs(Confs, Envs), + + set_application_envs(Envs), + {ok, _} = application:ensure_all_started(emqx_web_hook), + application:stop(emqx_web_hook), + unset_application_envs(Envs), + true + end). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +do_setup() -> + application:set_env(kernel, logger_level, error), + emqx_ct_helpers:start_apps([], fun set_special_cfgs/1), + ok. + +do_teardown(_) -> + emqx_ct_helpers:stop_apps([]), + ok. + +set_special_cfgs(_) -> + application:set_env(emqx, plugins_loaded_file, undefined), + application:set_env(emqx, modules_loaded_file, undefined), + ok. + +assert_confs([{"web.hook.api.url", Url}|More], Envs) -> + %% Assert! + Url = deep_get_env("emqx_web_hook.url", Envs), + assert_confs(More, Envs); + +assert_confs([{"web.hook.rule." ++ HookName0, Spec}|More], Envs) -> + HookName = re:replace(HookName0, "\\.[0-9]", "", [{return, list}]), + Rules = deep_get_env("emqx_web_hook.rules", Envs), + + %% Assert! + Spec = proplists:get_value(HookName, Rules), + + assert_confs(More, Envs); + +assert_confs([_|More], Envs) -> + assert_confs(More, Envs); + +assert_confs([], _) -> + true. + +deep_get_env(Path, Envs) -> + lists:foldl( + fun(_K, undefiend) -> undefiend; + (K, Acc) -> proplists:get_value(binary_to_atom(K, utf8), Acc) + end, Envs, re:split(Path, "\\.")). + +set_application_envs(Envs) -> + application:set_env(Envs). + +unset_application_envs(Envs) -> + lists:foreach(fun({App, Es}) -> + lists:foreach(fun({K, _}) -> + application:unset_env(App, K) + end, Es) end, Envs). + +cuttlefish_conf_file(Ls) when is_list(Ls) -> + [cuttlefish_conf_option(K,V) || {K, V} <- Ls]. + +cuttlefish_conf_option(K, V) + when is_list(K) -> + {re:split(K, "[.]", [{return, list}]), V}. + +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +confs() -> + nof([{"web.hook.encode_payload", oneof(["base64", "base62"])}, + {"web.hook.rule.client.connect.1", rule_spec()}, + {"web.hook.rule.client.connack.1", rule_spec()}, + {"web.hook.rule.client.connected.1", rule_spec()}, + {"web.hook.rule.client.disconnected.1", rule_spec()}, + {"web.hook.rule.client.subscribe.1", rule_spec()}, + {"web.hook.rule.client.unsubscribe.1", rule_spec()}, + {"web.hook.rule.session.subscribed.1", rule_spec()}, + {"web.hook.rule.session.unsubscribed.1", rule_spec()}, + {"web.hook.rule.session.terminated.1", rule_spec()}, + {"web.hook.rule.message.publish.1", rule_spec()}, + {"web.hook.rule.message.delivered.1", rule_spec()}, + {"web.hook.rule.message.acked.1", rule_spec()} + ]). + +rule_spec() -> + ?LET(Action, action_names(), + begin + binary_to_list(emqx_json:encode(#{action => Action})) + end). + +action_names() -> + oneof([on_client_connect, on_client_connack, on_client_connected, + on_client_connected, on_client_disconnected, on_client_subscribe, on_client_unsubscribe, + on_session_subscribed, on_session_unsubscribed, on_session_terminated, + on_message_publish, on_message_delivered, on_message_acked]). + diff --git a/apps/emqx_web_hook/test/prop_webhook_hooks.erl b/apps/emqx_web_hook/test/prop_webhook_hooks.erl new file mode 100644 index 000000000..4e51573a6 --- /dev/null +++ b/apps/emqx_web_hook/test/prop_webhook_hooks.erl @@ -0,0 +1,409 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_webhook_hooks). + +-include_lib("proper/include/proper.hrl"). + +-import(emqx_ct_proper_types, + [ conninfo/0 + , clientinfo/0 + , sessioninfo/0 + , message/0 + , connack_return_code/0 + , topictab/0 + , topic/0 + , subopts/0 + ]). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_client_connect() -> + ?ALL({ConnInfo, ConnProps, Env}, + {conninfo(), conn_properties(), empty_env()}, + begin + ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_connect, + node => stringfy(node()), + clientid => maps:get(clientid, ConnInfo), + username => maybe(maps:get(username, ConnInfo)), + ipaddress => peer2addr(maps:get(peername, ConnInfo)), + keepalive => maps:get(keepalive, ConnInfo), + proto_ver => maps:get(proto_ver, ConnInfo) + }), + true + end). + +prop_client_connack() -> + ?ALL({ConnInfo, Rc, AckProps, Env}, + {conninfo(), connack_return_code(), ack_properties(), empty_env()}, + begin + ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_connack, + node => stringfy(node()), + clientid => maps:get(clientid, ConnInfo), + username => maybe(maps:get(username, ConnInfo)), + ipaddress => peer2addr(maps:get(peername, ConnInfo)), + keepalive => maps:get(keepalive, ConnInfo), + proto_ver => maps:get(proto_ver, ConnInfo), + conn_ack => Rc + }), + true + end). + +prop_client_connected() -> + ?ALL({ClientInfo, ConnInfo, Env}, + {clientinfo(), conninfo(), empty_env()}, + begin + ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_connected, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + ipaddress => peer2addr(maps:get(peerhost, ClientInfo)), + keepalive => maps:get(keepalive, ConnInfo), + proto_ver => maps:get(proto_ver, ConnInfo), + connected_at => maps:get(connected_at, ConnInfo) + }), + true + end). + +prop_client_disconnected() -> + ?ALL({ClientInfo, Reason, ConnInfo, Env}, + {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()}, + begin + ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_disconnected, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + disconnected_at => maps:get(disconnected_at, ConnInfo), + reason => stringfy(Reason) + }), + true + end). + +prop_client_subscribe() -> + ?ALL({ClientInfo, SubProps, TopicTab, Env}, + {clientinfo(), sub_properties(), topictab(), topic_filter_env()}, + begin + ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env), + + Matched = filter_topictab(TopicTab, Env), + + lists:foreach(fun({Topic, Opts}) -> + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_subscribe, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic, + opts => Opts}) + end, Matched), + true + end). + +prop_client_unsubscribe() -> + ?ALL({ClientInfo, SubProps, TopicTab, Env}, + {clientinfo(), unsub_properties(), topictab(), topic_filter_env()}, + begin + ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env), + + Matched = filter_topictab(TopicTab, Env), + + lists:foreach(fun({Topic, Opts}) -> + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => client_unsubscribe, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic, + opts => Opts}) + end, Matched), + true + end). + +prop_session_subscribed() -> + ?ALL({ClientInfo, Topic, SubOpts, Env}, + {clientinfo(), topic(), subopts(), topic_filter_env()}, + begin + ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env), + filter_topic_match(Topic, Env) andalso begin + Body = receive_http_request_body(), + Body1 = emqx_json:encode( + #{action => session_subscribed, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic, + opts => SubOpts + }), + Body = Body1 + end, + true + end). + +prop_session_unsubscribed() -> + ?ALL({ClientInfo, Topic, SubOpts, Env}, + {clientinfo(), topic(), subopts(), empty_env()}, + begin + ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env), + filter_topic_match(Topic, Env) andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => session_unsubscribed, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + topic => Topic + }) + end, + true + end). + +prop_session_terminated() -> + ?ALL({ClientInfo, Reason, SessInfo, Env}, + {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()}, + begin + ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env), + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => session_terminated, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + reason => stringfy(Reason) + }), + true + end). + +prop_message_publish() -> + ?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()}, + begin + application:set_env(emqx_web_hook, encode_payload, Encode), + {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env), + application:unset_env(emqx_web_hook, encode_payload), + + (not emqx_message:is_sys(Msg)) + andalso filter_topic_match(emqx_message:topic(Msg), Env) + andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => message_publish, + node => stringfy(node()), + from_client_id => emqx_message:from(Msg), + from_username => maybe(emqx_message:get_header(username, Msg)), + topic => emqx_message:topic(Msg), + qos => emqx_message:qos(Msg), + retain => emqx_message:get_flag(retain, Msg), + payload => encode(emqx_message:payload(Msg), Encode), + ts => emqx_message:timestamp(Msg) + }) + end, + true + end). + +prop_message_delivered() -> + ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()}, + begin + application:set_env(emqx_web_hook, encode_payload, Encode), + ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env), + application:unset_env(emqx_web_hook, encode_payload), + + (not emqx_message:is_sys(Msg)) + andalso filter_topic_match(emqx_message:topic(Msg), Env) + andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => message_delivered, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + from_client_id => emqx_message:from(Msg), + from_username => maybe(emqx_message:get_header(username, Msg)), + topic => emqx_message:topic(Msg), + qos => emqx_message:qos(Msg), + retain => emqx_message:get_flag(retain, Msg), + payload => encode(emqx_message:payload(Msg), Encode), + ts => emqx_message:timestamp(Msg) + }) + end, + true + end). + +prop_message_acked() -> + ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()}, + begin + application:set_env(emqx_web_hook, encode_payload, Encode), + ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env), + application:unset_env(emqx_web_hook, encode_payload), + + (not emqx_message:is_sys(Msg)) + andalso filter_topic_match(emqx_message:topic(Msg), Env) + andalso begin + Body = receive_http_request_body(), + Body = emqx_json:encode( + #{action => message_acked, + node => stringfy(node()), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo)), + from_client_id => emqx_message:from(Msg), + from_username => maybe(emqx_message:get_header(username, Msg)), + topic => emqx_message:topic(Msg), + qos => emqx_message:qos(Msg), + retain => emqx_message:get_flag(retain, Msg), + payload => encode(emqx_message:payload(Msg), Encode), + ts => emqx_message:timestamp(Msg) + }) + end, + true + end). + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- +do_setup() -> + %% Pre-defined envs + application:set_env(emqx_web_hook, path, "path"), + application:set_env(emqx_web_hook, headers, []), + + meck:new(ehttpc_pool, [passthrough, no_history]), + meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end), + + Self = self(), + meck:new(ehttpc, [passthrough, no_history]), + meck:expect(ehttpc, request, + fun(_ClientId, Method, {Path, Headers, Body}) -> + Self ! {Method, Path, Headers, Body}, {ok, ok, ok} + end), + + meck:new(emqx_metrics, [passthrough, no_history]), + meck:expect(emqx_metrics, inc, fun(_) -> ok end), + ok. + +do_teardown(_) -> + meck:unload(ehttpc_pool), + meck:unload(ehttpc), + meck:unload(emqx_metrics). + +maybe(undefined) -> null; +maybe(T) -> T. + +peer2addr({Host, _}) -> + list_to_binary(inet:ntoa(Host)); +peer2addr(Host) -> + list_to_binary(inet:ntoa(Host)). + +ensure_to_binary(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +ensure_to_binary(Bin) when is_binary(Bin) -> Bin. + +stringfy({shutdown, Reason}) -> + stringfy(Reason); +stringfy(Term) when is_atom(Term); is_binary(Term) -> + Term; +stringfy(Term) -> + unicode:characters_to_binary(io_lib:format("~0p", [Term])). + +receive_http_request_body() -> + receive + {post, _, _, Body} -> + Body + after 100 -> + exit(waiting_message_timeout) + end. + +receive_http_request_bodys() -> + receive_http_request_bodys_([]). + +receive_http_request_bodys_(Acc) -> + receive + {post, _, _, Body} -> + receive_http_request_bodys_([Body|Acc]) + after 1000 -> + lists:reverse(Acc) + end. + +filter_topictab(TopicTab, {undefined}) -> + TopicTab; +filter_topictab(TopicTab, {TopicFilter}) -> + lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab). + +filter_topic_match(_Topic, {undefined}) -> + true; +filter_topic_match(Topic, {TopicFilter}) -> + emqx_topic:match(Topic, TopicFilter). + +encode(Bin, base64) -> + base64:encode(Bin); +encode(Bin, base62) -> + emqx_base62:encode(Bin); +encode(Bin, _) -> + Bin. + +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +conn_properties() -> + #{}. + +ack_properties() -> + #{}. + +sub_properties() -> + #{}. + +unsub_properties() -> + #{}. + +shutdown_reason() -> + oneof([any(), {shutdown, atom()}]). + +empty_env() -> + {undefined}. + +topic_filter_env() -> + oneof([{<<"#">>}, {undefined}, {topic()}]). + +payload_encode() -> + oneof([base62, base64, undefined]). + +http_code() -> + oneof([socket_closed_remotely, others]). + +disconnected_conninfo() -> + ?LET(Info, conninfo(), + begin + Info#{disconnected_at => erlang:system_time(millisecond)} + end). diff --git a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl index 24903ddec..bfe170239 100644 --- a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl +++ b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl @@ -34,8 +34,9 @@ prop_confs() -> Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")), - ?ALL(Confs, confs(), + ?ALL({Url, Confs0}, {url(), confs()}, begin + Confs = [{"web.hook.api.url", Url}|Confs0], Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)), assert_confs(Confs, Envs), @@ -112,8 +113,7 @@ cuttlefish_conf_option(K, V) %%-------------------------------------------------------------------- confs() -> - nof([{"web.hook.api.url", url()}, - {"web.hook.encode_payload", oneof(["base64", "base62"])}, + nof([{"web.hook.encode_payload", oneof(["base64", "base62"])}, {"web.hook.rule.client.connect.1", rule_spec()}, {"web.hook.rule.client.connack.1", rule_spec()}, {"web.hook.rule.client.connected.1", rule_spec()}, From 219eeed6d79a7f18209430cd6ae31fb00d72cbf2 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 25 Feb 2021 10:16:37 +0100 Subject: [PATCH 17/32] fix(webhook): Explicit default tls version and cipher --- apps/emqx_web_hook/src/emqx_web_hook_actions.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 4d5fed64b..bfbc89daa 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -364,7 +364,11 @@ pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). get_ssl_options(Config, ResId, <<"https://", _URL/binary>>) -> - [{transport, ssl}, {transport_opts, get_ssl_opts(Config, ResId)}]; + [{transport, ssl}, + {transport_opts, get_ssl_opts(Config, ResId)}, + {versions, emqx_tls_lib:default_versions()}, + {ciphers, emqx_tls_lib:default_ciphers()} + ]; get_ssl_options(_Config, _ResId, _URL) -> []. From da6f1104dc912b9fcb7dabfafdab00d721f3b2df Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 25 Feb 2021 00:50:43 +0100 Subject: [PATCH 18/32] chore(bridge-mqtt): sync enterprise code --- .../src/emqx_bridge_mqtt.appup.src | 10 +++ .../emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl | 1 - .../src/emqx_bridge_mqtt_actions.erl | 73 ++++++++++++++----- 3 files changed, 63 insertions(+), 21 deletions(-) create mode 100644 apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src new file mode 100644 index 000000000..f6d128b08 --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -0,0 +1,10 @@ +%% -*-: erlang -*- + +{VSN, + [ + {<<".*">>, []} + ], + [ + {<<"*.">>, []} + ] +}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index 3f63cdb46..4692561e7 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -195,4 +195,3 @@ feedvar(max_inflight, 0, _) -> feedvar(max_inflight, Size, _) -> Size. - diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 87bc6c694..4824c1c1f 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -185,18 +185,16 @@ }, ssl => #{ order => 14, - type => string, - required => false, - default => <<"off">>, - enum => [<<"on">>, <<"off">>], - title => #{en => <<"Bridge SSL">>, - zh => <<"Bridge SSL"/utf8>>}, - description => #{en => <<"Switch which used to enable ssl connection of the bridge">>, - zh => <<"是否启用 Bridge SSL 连接"/utf8>>} + type => boolean, + default => false, + title => #{en => <<"Enable SSL">>, + zh => <<"开启SSL链接"/utf8>>}, + description => #{en => <<"Enable SSL or not">>, + zh => <<"是否开启 SSL"/utf8>>} }, cacertfile => #{ order => 15, - type => string, + type => file, required => false, default => <<"etc/certs/cacert.pem">>, title => #{en => <<"CA certificates">>, @@ -206,7 +204,7 @@ }, certfile => #{ order => 16, - type => string, + type => file, required => false, default => <<"etc/certs/client-cert.pem">>, title => #{en => <<"SSL Certfile">>, @@ -216,7 +214,7 @@ }, keyfile => #{ order => 17, - type => string, + type => file, required => false, default => <<"etc/certs/client-key.pem">>, title => #{en => <<"SSL Keyfile">>, @@ -246,7 +244,6 @@ } }). - -define(RESOURCE_CONFIG_SPEC_MQTT_SUB, #{ address => #{ order => 1, @@ -424,7 +421,6 @@ } }). - -define(RESOURCE_CONFIG_SPEC_RPC, #{ address => #{ order => 1, @@ -573,7 +569,7 @@ on_resource_create(ResId, Params) -> ?LOG(info, "Initiating Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), {ok, _} = application:ensure_all_started(ecpool), PoolName = pool_name(ResId), - Options = options(Params, PoolName), + Options = options(Params, PoolName, ResId), start_resource(ResId, PoolName, Options), case test_resource_status(PoolName) of true -> ok; @@ -719,7 +715,7 @@ name(Pool, Id) -> pool_name(ResId) -> list_to_atom("bridge_mqtt:" ++ str(ResId)). -options(Options, PoolName) -> +options(Options, PoolName, ResId) -> GetD = fun(Key, Default) -> maps:get(Key, Options, Default) end, Get = fun(Key) -> GetD(Key, undefined) end, Address = Get(<<"address">>), @@ -757,16 +753,13 @@ options(Options, PoolName) -> {proto_ver, mqtt_ver(Get(<<"proto_ver">>))}, {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)}, {ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))}, - {ssl_opts, [ {keyfile, str(Get(<<"keyfile">>))} - , {certfile, str(Get(<<"certfile">>))} - , {cacertfile, str(Get(<<"cacertfile">>))} - , {versions, TlsVersions} + {ssl_opts, [ {versions, TlsVersions} , {ciphers, emqx_tls_lib:integral_ciphers(TlsVersions, Get(<<"ciphers">>))} + | get_ssl_opts(Options, ResId) ]} ] ++ Subscriptions1 end. - mqtt_ver(ProtoVer) -> case ProtoVer of <<"mqttv3">> -> v3; @@ -779,3 +772,43 @@ format_subscriptions(SubOpts) -> lists:map(fun(Sub) -> {maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)} end, SubOpts). + +get_ssl_opts(Opts, ResId) -> + KeyFile = maps:get(<<"keyfile">>, Opts, undefined), + CertFile = maps:get(<<"certfile">>, Opts, undefined), + CAFile = case maps:get(<<"cacertfile">>, Opts, undefined) of + undefined -> maps:get(<<"cafile">>, Opts, undefined); + CAFile0 -> CAFile0 + end, + Filter = fun(Opts1) -> + [{K, V} || {K, V} <- Opts1, + V =/= undefined, + V =/= <<>>, + V =/= "" ] + end, + Key = save_upload_file(KeyFile, ResId), + Cert = save_upload_file(CertFile, ResId), + CA = save_upload_file(CAFile, ResId), + Verify = case maps:get(<<"verify">>, Opts, false) of + false -> verify_none; + true -> verify_peer + end, + case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of + [] -> [{verify, Verify}]; + SslOpts -> + [{verify, Verify} | SslOpts] + end. + +save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> ""; +save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath); +save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) -> + FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]), + ok = filelib:ensure_dir(FullFilename), + case file:write_file(FullFilename, File) of + ok -> + binary_to_list(FullFilename); + {error, Reason} -> + logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]), + error({ResId, store_file_fail}) + end; +save_upload_file(_, _) -> "". From 812c57dee990f3469be67b142ec90a827682212a Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 25 Feb 2021 11:32:34 +0100 Subject: [PATCH 19/32] refactor(plubin_libs): Add emqx_plugin_libs app --- apps/emqx_plugin_libs/emqx_plugin_libs.app.src | 7 +++++++ apps/emqx_plugin_libs/emqx_plugin_libs.erl | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 apps/emqx_plugin_libs/emqx_plugin_libs.app.src create mode 100644 apps/emqx_plugin_libs/emqx_plugin_libs.erl diff --git a/apps/emqx_plugin_libs/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/emqx_plugin_libs.app.src new file mode 100644 index 000000000..9b773e80c --- /dev/null +++ b/apps/emqx_plugin_libs/emqx_plugin_libs.app.src @@ -0,0 +1,7 @@ +{application, emqx_plugin_libs, + [{description, "EMQ X Plugin utility libs"}, + {vsn, "4.3.0"}, + {modules, []}, + {applications, [kernel,stdlib]}, + {env, []} + ]}. diff --git a/apps/emqx_plugin_libs/emqx_plugin_libs.erl b/apps/emqx_plugin_libs/emqx_plugin_libs.erl new file mode 100644 index 000000000..2f84be06f --- /dev/null +++ b/apps/emqx_plugin_libs/emqx_plugin_libs.erl @@ -0,0 +1,18 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_libs). + From 700fa71754dec4288d5b31d5f29b896134f2d9bf Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 25 Feb 2021 14:49:24 +0100 Subject: [PATCH 20/32] refactor(tls): abstract lib for tls options parsing --- .gitignore | 1 + .../src/emqx_bridge_mqtt_actions.erl | 56 ++---------- .../{ => src}/emqx_plugin_libs.app.src | 0 .../{ => src}/emqx_plugin_libs.erl | 0 .../src/emqx_plugin_libs_ssl.erl | 89 +++++++++++++++++++ .../test/emqx_plugin_libs_ssl_tests.erl | 78 ++++++++++++++++ src/emqx_tls_lib.erl | 45 +++++++++- 7 files changed, 219 insertions(+), 50 deletions(-) rename apps/emqx_plugin_libs/{ => src}/emqx_plugin_libs.app.src (100%) rename apps/emqx_plugin_libs/{ => src}/emqx_plugin_libs.erl (100%) create mode 100644 apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl create mode 100644 apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl diff --git a/.gitignore b/.gitignore index 51b4acf83..387a3ff90 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .eunit +test-data/ deps !deps/.placeholder *.o diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 4824c1c1f..56a461241 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -739,8 +739,6 @@ options(Options, PoolName, ResId) -> Topic -> [{subscriptions, [{Topic, Get(<<"qos">>)}]} | Subscriptions] end, - %% TODO check why only ciphers are configurable but not versions - TlsVersions = emqx_tls_lib:default_versions(), [{address, binary_to_list(Address)}, {bridge_mode, GetD(<<"bridge_mode">>, true)}, {clean_start, true}, @@ -751,15 +749,17 @@ options(Options, PoolName, ResId) -> {username, str(Get(<<"username">>))}, {password, str(Get(<<"password">>))}, {proto_ver, mqtt_ver(Get(<<"proto_ver">>))}, - {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)}, - {ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))}, - {ssl_opts, [ {versions, TlsVersions} - , {ciphers, emqx_tls_lib:integral_ciphers(TlsVersions, Get(<<"ciphers">>))} - | get_ssl_opts(Options, ResId) - ]} + {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)} + | maybe_ssl(Options, cuttlefish_flag:parse(str(Get(<<"ssl">>))), ResId) ] ++ Subscriptions1 end. +maybe_ssl(_Options, false, _ResId) -> + [{ssl, false}]; +maybe_ssl(Options, true, ResId) -> + Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]), + [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Options, Dir)}]. + mqtt_ver(ProtoVer) -> case ProtoVer of <<"mqttv3">> -> v3; @@ -772,43 +772,3 @@ format_subscriptions(SubOpts) -> lists:map(fun(Sub) -> {maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)} end, SubOpts). - -get_ssl_opts(Opts, ResId) -> - KeyFile = maps:get(<<"keyfile">>, Opts, undefined), - CertFile = maps:get(<<"certfile">>, Opts, undefined), - CAFile = case maps:get(<<"cacertfile">>, Opts, undefined) of - undefined -> maps:get(<<"cafile">>, Opts, undefined); - CAFile0 -> CAFile0 - end, - Filter = fun(Opts1) -> - [{K, V} || {K, V} <- Opts1, - V =/= undefined, - V =/= <<>>, - V =/= "" ] - end, - Key = save_upload_file(KeyFile, ResId), - Cert = save_upload_file(CertFile, ResId), - CA = save_upload_file(CAFile, ResId), - Verify = case maps:get(<<"verify">>, Opts, false) of - false -> verify_none; - true -> verify_peer - end, - case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of - [] -> [{verify, Verify}]; - SslOpts -> - [{verify, Verify} | SslOpts] - end. - -save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> ""; -save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath); -save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) -> - FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]), - ok = filelib:ensure_dir(FullFilename), - case file:write_file(FullFilename, File) of - ok -> - binary_to_list(FullFilename); - {error, Reason} -> - logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]), - error({ResId, store_file_fail}) - end; -save_upload_file(_, _) -> "". diff --git a/apps/emqx_plugin_libs/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src similarity index 100% rename from apps/emqx_plugin_libs/emqx_plugin_libs.app.src rename to apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src diff --git a/apps/emqx_plugin_libs/emqx_plugin_libs.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs.erl similarity index 100% rename from apps/emqx_plugin_libs/emqx_plugin_libs.erl rename to apps/emqx_plugin_libs/src/emqx_plugin_libs.erl diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl new file mode 100644 index 000000000..4b0746335 --- /dev/null +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_ssl.erl @@ -0,0 +1,89 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_libs_ssl). + +-export([save_files_return_opts/2]). + +-type file_input_key() :: binary(). %% <<"file">> | <<"filename">> +-type file_input() :: #{file_input_key() => binary()}. + +%% options are below paris +%% <<"keyfile">> => file_input() +%% <<"certfile">> => file_input() +%% <<"cafile">> => file_input() %% backward compatible +%% <<"cacertfile">> => file_input() +%% <<"verify">> => boolean() +%% <<"tls_versions">> => binary() +%% <<"ciphers">> => binary() +-type opts_key() :: binary(). +-type opts_input() :: #{opts_key() => file_input() | boolean() | binary()}. + +-type opt_key() :: keyfile | certfile | cacertfile | verify | versions | ciphers. +-type opt_value() :: term(). +-type opts() :: [{opt_key(), opt_value()}]. + +%% @doc Parse ssl options input. +%% If the input contains file content, save the files in the given dir. +%% Returns ssl options for Erlang's ssl application. +-spec save_files_return_opts(opts_input(), file:name_all()) -> opts(). +save_files_return_opts(Options, Dir) -> + GetD = fun(Key, Default) -> maps:get(Key, Options, Default) end, + Get = fun(Key) -> GetD(Key, undefined) end, + KeyFile = Get(<<"keyfile">>), + CertFile = Get(<<"certfile">>), + CAFile = GetD(<<"cacertfile">>, Get(<<"cafile">>)), + Key = save_file(KeyFile, Dir), + Cert = save_file(CertFile, Dir), + CA = save_file(CAFile, Dir), + Verify = case GetD(<<"verify">>, false) of + false -> verify_none; + _ -> verify_peer + end, + Versions = emqx_tls_lib:integral_versions(Get(<<"tls_versions">>)), + Ciphers = emqx_tls_lib:integral_ciphers(Versions, Get(<<"ciphers">>)), + filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}, + {verify, Verify}, {versions, Versions}, {ciphers, Ciphers}]). + +filter([]) -> []; +filter([{_, ""} | T]) -> filter(T); +filter([H | T]) -> [H | filter(T)]. + +save_file(#{<<"filename">> := FileName, <<"file">> := Content}, Dir) + when FileName =/= undefined andalso Content =/= undefined -> + save_file(ensure_str(FileName), iolist_to_binary(Content), Dir); +save_file(FilePath, _) when is_binary(FilePath) -> + ensure_str(FilePath); +save_file(FilePath, _) when is_list(FilePath) -> + FilePath; +save_file(_, _) -> "". + +save_file("", _, _Dir) -> ""; %% ignore +save_file(_, <<>>, _Dir) -> ""; %% ignore +save_file(FileName, Content, Dir) -> + FullFilename = filename:join([Dir, FileName]), + ok = filelib:ensure_dir(FullFilename), + case file:write_file(FullFilename, Content) of + ok -> + ensure_str(FullFilename); + {error, Reason} -> + logger:error("failed_to_save_ssl_file ~s: ~0p", [FullFilename, Reason]), + error({"failed_to_save_ssl_file", FullFilename, Reason}) + end. + +ensure_str(L) when is_list(L) -> L; +ensure_str(B) when is_binary(B) -> unicode:characters_to_list(B, utf8). + diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl new file mode 100644 index 000000000..d989b9711 --- /dev/null +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_ssl_tests.erl @@ -0,0 +1,78 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_libs_ssl_tests). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +no_crash_test_() -> + Opts = [{numtests, 1000}, {to_file, user}], + {timeout, 60, + fun() -> ?assert(proper:quickcheck(prop_run(), Opts)) end}. + +prop_run() -> + ?FORALL(Generated, prop_opts_input(), test_opts_input(Generated)). + +%% proper type to generate input value. +prop_opts_input() -> + [{keyfile, prop_file_or_content()}, + {certfile, prop_file_or_content()}, + {cacertfile, prop_file_or_content()}, + {verify, proper_types:boolean()}, + {versions, prop_tls_versions()}, + {ciphers, prop_tls_ciphers()}, + {other, proper_types:binary()}]. + +prop_file_or_content() -> + proper_types:oneof([prop_cert_file_name(), + {prop_cert_file_name(), proper_types:binary()}]). + +prop_cert_file_name() -> + proper_types:oneof(["certname1", <<"certname2">>, "", <<>>, undefined]). + +prop_tls_versions() -> + proper_types:oneof(["tlsv1.3", + <<"tlsv1.3,tlsv1.2">>, + "tlsv1.2 , tlsv1.1", + "1.2", + "v1.3", + "", + <<>>, + undefined]). + +prop_tls_ciphers() -> + proper_types:oneof(["TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256", + <<>>, + "", + undefined]). + +test_opts_input(Inputs) -> + KF = fun(K) -> {_, V} = lists:keyfind(K, 1, Inputs), V end, + Generated = #{<<"keyfile">> => file_or_content(KF(keyfile)), + <<"certfile">> => file_or_content(KF(certfile)), + <<"cafile">> => file_or_content(KF(cacertfile)), + <<"verify">> => file_or_content(KF(verify)), + <<"tls_versions">> => KF(versions), + <<"ciphers">> => KF(ciphers), + <<"other">> => KF(other)}, + _ = emqx_plugin_libs_ssl:save_files_return_opts(Generated, "test-data"), + true. + +file_or_content({Name, Content}) -> + #{<<"file">> => Content, <<"filename">> => Name}; +file_or_content(Name) -> + Name. diff --git a/src/emqx_tls_lib.erl b/src/emqx_tls_lib.erl index 6153160e7..74b6719ee 100644 --- a/src/emqx_tls_lib.erl +++ b/src/emqx_tls_lib.erl @@ -23,7 +23,8 @@ , integral_ciphers/2 ]). --define(IS_STRING_LIST(L), (is_list(L) andalso L =/= [] andalso is_list(hd(L)))). +-define(IS_STRING(L), (is_list(L) andalso L =/= [] andalso is_integer(hd(L)))). +-define(IS_STRING_LIST(L), (is_list(L) andalso L =/= [] andalso ?IS_STRING(hd(L)))). %% @doc Returns the default supported tls versions. -spec default_versions() -> [atom()]. @@ -33,7 +34,18 @@ default_versions() -> %% @doc Validate a given list of desired tls versions. %% raise an error exception if non of them are available. --spec integral_versions([ssl:tls_version()]) -> [ssl:tls_version()]. +%% The input list can be a string/binary of comma separated versions. +-spec integral_versions(undefined | string() | binary() | [ssl:tls_version()]) -> [ssl:tls_version()]. +integral_versions(undefined) -> + integral_versions(default_versions()); +integral_versions([]) -> + integral_versions(default_versions()); +integral_versions(<<>>) -> + integral_versions(default_versions()); +integral_versions(Desired) when is_binary(Desired) -> + integral_versions(parse_versions(Desired)); +integral_versions(Desired) when ?IS_STRING(Desired) -> + integral_versions(iolist_to_binary(Desired)); integral_versions(Desired) -> {_, Available} = lists:keyfind(available, 1, ssl:versions()), case lists:filter(fun(V) -> lists:member(V, Available) end, Desired) of @@ -96,3 +108,32 @@ default_versions(_) -> %% Deduplicate a list without re-ordering the elements. dedup([]) -> []; dedup([H | T]) -> [H | dedup([I || I <- T, I =/= H])]. + +%% parse comma separated tls version strings +parse_versions(Versions) -> + do_parse_versions(split_by_comma(Versions), []). + +do_parse_versions([], Acc) -> lists:reverse(Acc); +do_parse_versions([V | More], Acc) -> + case parse_version(V) of + unknown -> + emqx_logger:warning("unknown_tls_version_discarded: ~p", [V]), + do_parse_versions(More, Acc); + Parsed -> + do_parse_versions(More, [Parsed | Acc]) + end. + +parse_version(<<"tlsv", Vsn/binary>>) -> parse_version(Vsn); +parse_version(<<"v", Vsn/binary>>) -> parse_version(Vsn); +parse_version(<<"1.3">>) -> 'tlsv1.3'; +parse_version(<<"1.2">>) -> 'tlsv1.2'; +parse_version(<<"1.1">>) -> 'tlsv1.1'; +parse_version(<<"1">>) -> 'tlsv1'; +parse_version(_) -> unknown. + +split_by_comma(Bin) -> + [trim_space(I) || I <- binary:split(Bin, <<",">>, [global])]. + +%% trim spaces +trim_space(Bin) -> + hd([I || I <- binary:split(Bin, <<" ">>), I =/= <<>>]). From 0a51bd4c2ffd15a99b4c6f73448093592b99a038 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 26 Feb 2021 13:57:36 +0100 Subject: [PATCH 21/32] fix(webhook): Call common lib for ssl options --- .../src/emqx_web_hook_actions.erl | 41 ++----------------- rebar.config.erl | 1 + src/emqx_tls_lib.erl | 5 ++- 3 files changed, 8 insertions(+), 39 deletions(-) diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index bfbc89daa..a848350d3 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -365,49 +365,14 @@ pool_name(ResId) -> get_ssl_options(Config, ResId, <<"https://", _URL/binary>>) -> [{transport, ssl}, - {transport_opts, get_ssl_opts(Config, ResId)}, - {versions, emqx_tls_lib:default_versions()}, - {ciphers, emqx_tls_lib:default_ciphers()} + {transport_opts, get_ssl_opts(Config, ResId)} ]; get_ssl_options(_Config, _ResId, _URL) -> []. get_ssl_opts(Opts, ResId) -> - KeyFile = maps:get(<<"keyfile">>, Opts, undefined), - CertFile = maps:get(<<"certfile">>, Opts, undefined), - CAFile = maps:get(<<"cacertfile">>, Opts, undefined), - Filter = fun(Opts1) -> - [{K, V} || {K, V} <- Opts1, - V =/= undefined, - V =/= <<>>, - V =/= "" ] - end, - Key = save_upload_file(KeyFile, ResId), - Cert = save_upload_file(CertFile, ResId), - CA = save_upload_file(CAFile, ResId), - Verify = case maps:get(<<"verify">>, Opts, false) of - false -> verify_none; - true -> verify_peer - end, - case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of - [] -> [{verify, Verify}]; - SslOpts -> - [{verify, Verify} | SslOpts] - end. - -save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> ""; -save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath); -save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) -> - FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]), - ok = filelib:ensure_dir(FullFilename), - case file:write_file(FullFilename, File) of - ok -> - binary_to_list(FullFilename); - {error, Reason} -> - logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]), - error({ResId, store_file_fail}) - end; -save_upload_file(_, _) -> "". + Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]), + [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Opts, Dir)}]. parse_host(Host) -> case inet:parse_address(Host) of diff --git a/rebar.config.erl b/rebar.config.erl index d216e8464..95d79b188 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -148,6 +148,7 @@ relx_apps(ReleaseType) -> , emqx , {mnesia, load} , {ekka, load} + , {emqx_plugin_libs, load} ] ++ [bcrypt || provide_bcrypt_release(ReleaseType)] ++ relx_apps_per_rel(ReleaseType) diff --git a/src/emqx_tls_lib.erl b/src/emqx_tls_lib.erl index 74b6719ee..024de699e 100644 --- a/src/emqx_tls_lib.erl +++ b/src/emqx_tls_lib.erl @@ -23,7 +23,9 @@ , integral_ciphers/2 ]). +%% non-empty string -define(IS_STRING(L), (is_list(L) andalso L =/= [] andalso is_integer(hd(L)))). +%% non-empty list of strings -define(IS_STRING_LIST(L), (is_list(L) andalso L =/= [] andalso ?IS_STRING(hd(L)))). %% @doc Returns the default supported tls versions. @@ -35,7 +37,8 @@ default_versions() -> %% @doc Validate a given list of desired tls versions. %% raise an error exception if non of them are available. %% The input list can be a string/binary of comma separated versions. --spec integral_versions(undefined | string() | binary() | [ssl:tls_version()]) -> [ssl:tls_version()]. +-spec integral_versions(undefined | string() | binary() | [ssl:tls_version()]) -> + [ssl:tls_version()]. integral_versions(undefined) -> integral_versions(default_versions()); integral_versions([]) -> From 534b6c00621fd8bf051d6848ad3ad265c9e16f1e Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 26 Feb 2021 14:53:41 +0100 Subject: [PATCH 22/32] chore(scripts): get-dashboard.sh accepts URL as arg Also move it to scripts dir --- Makefile | 2 +- get-dashboard.sh => scripts/get-dashboard.sh | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) rename get-dashboard.sh => scripts/get-dashboard.sh (56%) diff --git a/Makefile b/Makefile index c3dbc28a2..5dc0d60f7 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ $(REBAR): ensure-rebar3 .PHONY: get-dashboard get-dashboard: - $(CURDIR)/get-dashboard.sh $(DASHBOARD_VERSION) + $(CURDIR)/scripts/get-dashboard.sh $(DASHBOARD_VERSION) .PHONY: eunit eunit: $(REBAR) diff --git a/get-dashboard.sh b/scripts/get-dashboard.sh similarity index 56% rename from get-dashboard.sh rename to scripts/get-dashboard.sh index 991f2b39d..48a846112 100755 --- a/get-dashboard.sh +++ b/scripts/get-dashboard.sh @@ -1,15 +1,18 @@ #!/bin/bash -set -eu +## NOTE: execute this script in the project root -VERSION="$1" +set -euo pipefail -# ensure dir -cd -P -- "$(dirname -- "$0")" +if [[ "$1" == https://* ]]; then + VERSION='*' # alwyas download + DOWNLOAD_URL="$1" +else + VERSION="$1" + DOWNLOAD_URL="https://github.com/emqx/emqx-dashboard-frontend/releases/download/${VERSION}/emqx-dashboard.zip" +fi -DOWNLOAD_URL='https://github.com/emqx/emqx-dashboard-frontend/releases/download' - -if [ "$EMQX_ENTERPRISE" = 'true' ] || [ "$EMQX_ENTERPRISE" == '1' ]; then +if [ "${EMQX_ENTERPRISE:-}" = 'true' ] || [ "${EMQX_ENTERPRISE:-}" == '1' ]; then DASHBOARD_PATH='lib-ee/emqx_dashboard/priv' else DASHBOARD_PATH='lib-ce/emqx_dashboard/priv' @@ -28,7 +31,7 @@ if [ -d "$DASHBOARD_PATH/www" ] && [ "$(version)" = "$VERSION" ]; then exit 0 fi -curl -f -L "${DOWNLOAD_URL}/${VERSION}/emqx-dashboard.zip" -o ./emqx-dashboard.zip +curl -f -L "${DOWNLOAD_URL}" -o ./emqx-dashboard.zip unzip -q ./emqx-dashboard.zip -d "$DASHBOARD_PATH" rm -rf "$DASHBOARD_PATH/www" mv "$DASHBOARD_PATH/dist" "$DASHBOARD_PATH/www" From 6e0b53fbb9362bb4f29cfea0c7ae8dc6f524abc8 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 26 Feb 2021 20:05:23 +0100 Subject: [PATCH 23/32] fix(eunit): fix emqx_tls_lib_tests --- src/emqx_tls_lib.erl | 4 ++-- test/emqx_tls_lib_tests.erl | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/emqx_tls_lib.erl b/src/emqx_tls_lib.erl index 024de699e..215f0b5ca 100644 --- a/src/emqx_tls_lib.erl +++ b/src/emqx_tls_lib.erl @@ -45,10 +45,10 @@ integral_versions([]) -> integral_versions(default_versions()); integral_versions(<<>>) -> integral_versions(default_versions()); -integral_versions(Desired) when is_binary(Desired) -> - integral_versions(parse_versions(Desired)); integral_versions(Desired) when ?IS_STRING(Desired) -> integral_versions(iolist_to_binary(Desired)); +integral_versions(Desired) when is_binary(Desired) -> + integral_versions(parse_versions(Desired)); integral_versions(Desired) -> {_, Available} = lists:keyfind(available, 1, ssl:versions()), case lists:filter(fun(V) -> lists:member(V, Available) end, Desired) of diff --git a/test/emqx_tls_lib_tests.erl b/test/emqx_tls_lib_tests.erl index 452909db2..4748f5854 100644 --- a/test/emqx_tls_lib_tests.erl +++ b/test/emqx_tls_lib_tests.erl @@ -53,8 +53,12 @@ tls_versions_test() -> ?assert(lists:member('tlsv1.3', emqx_tls_lib:default_versions())). tls_version_unknown_test() -> - ?assertError(#{reason := no_available_tls_version}, + ?assertEqual(emqx_tls_lib:default_versions(), emqx_tls_lib:integral_versions([])), + ?assertEqual(emqx_tls_lib:default_versions(), + emqx_tls_lib:integral_versions(<<>>)), + ?assertEqual(emqx_tls_lib:default_versions(), + emqx_tls_lib:integral_versions("foo")), ?assertError(#{reason := no_available_tls_version}, emqx_tls_lib:integral_versions([foo])). From 675603fd73325734c17612c7cfe60772bb569739 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 1 Mar 2021 07:34:06 +0100 Subject: [PATCH 24/32] fix(bridge-mqtt): no ssl-option at all (empty list) when ssl is disabled --- apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 56a461241..7a1c55798 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -755,7 +755,7 @@ options(Options, PoolName, ResId) -> end. maybe_ssl(_Options, false, _ResId) -> - [{ssl, false}]; + []; maybe_ssl(Options, true, ResId) -> Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]), [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Options, Dir)}]. From 30293b602a795cbfc8dd7d1c03376082bef1863c Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 1 Mar 2021 07:49:22 +0100 Subject: [PATCH 25/32] chore(build): move rebar3 download script to scripts dir --- Makefile | 5 +++-- ensure-rebar3.sh => scripts/ensure-rebar3.sh | 7 +++---- scripts/get-dashboard.sh | 5 +++-- 3 files changed, 9 insertions(+), 8 deletions(-) rename ensure-rebar3.sh => scripts/ensure-rebar3.sh (86%) diff --git a/Makefile b/Makefile index 5dc0d60f7..0bd5dc450 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ REBAR_VERSION = 3.14.3-emqx-4 DASHBOARD_VERSION = v4.3.0 REBAR = $(CURDIR)/rebar3 BUILD = $(CURDIR)/build +SCRIPTS = $(CURDIR)/scripts export EMQX_ENTERPRISE=false export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) @@ -20,13 +21,13 @@ all: $(REBAR) $(PROFILES) .PHONY: ensure-rebar3 ensure-rebar3: - $(CURDIR)/ensure-rebar3.sh $(REBAR_VERSION) + $(SCRIPTS)/ensure-rebar3.sh $(REBAR_VERSION) $(REBAR): ensure-rebar3 .PHONY: get-dashboard get-dashboard: - $(CURDIR)/scripts/get-dashboard.sh $(DASHBOARD_VERSION) + $(SCRIPTS)/get-dashboard.sh $(DASHBOARD_VERSION) .PHONY: eunit eunit: $(REBAR) diff --git a/ensure-rebar3.sh b/scripts/ensure-rebar3.sh similarity index 86% rename from ensure-rebar3.sh rename to scripts/ensure-rebar3.sh index 941df5b85..5612beab4 100755 --- a/ensure-rebar3.sh +++ b/scripts/ensure-rebar3.sh @@ -1,12 +1,11 @@ -#!/bin/sh +#!/bin/bash -#set -euo pipefail -set -eu +set -euo pipefail VERSION="$1" # ensure dir -cd -P -- "$(dirname -- "$0")" +cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." DOWNLOAD_URL='https://github.com/emqx/rebar3/releases/download' diff --git a/scripts/get-dashboard.sh b/scripts/get-dashboard.sh index 48a846112..bc28164a5 100755 --- a/scripts/get-dashboard.sh +++ b/scripts/get-dashboard.sh @@ -1,9 +1,10 @@ #!/bin/bash -## NOTE: execute this script in the project root - set -euo pipefail +# ensure dir +cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." + if [[ "$1" == https://* ]]; then VERSION='*' # alwyas download DOWNLOAD_URL="$1" From 4aca2c294f617468b85ee57484ed487a6cbf9526 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 1 Mar 2021 09:48:29 +0100 Subject: [PATCH 26/32] fix(webhook): transport options --- apps/emqx_web_hook/src/emqx_web_hook_actions.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index a848350d3..bb4ec07a6 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -350,25 +350,24 @@ pool_opts(Params = #{<<"url">> := URL}, ResId) -> PoolSize = maps:get(<<"pool_size">>, Params, 32), ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), {Inet, Host} = parse_host(Host0), - SslOpts = get_ssl_options(Params, ResId, add_default_scheme(URL)), [{host, Host}, {port, Port}, {pool_size, PoolSize}, {pool_type, hash}, {connect_timeout, ConnectTimeout}, {retry, 5}, - {retry_timeout, 1000}, - {transport_opts, [Inet] ++ SslOpts}]. + {retry_timeout, 1000} + | maybe_ssl(Params, ResId, add_default_scheme(URL), Inet)]. pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). -get_ssl_options(Config, ResId, <<"https://", _URL/binary>>) -> +maybe_ssl(Config, ResId, <<"https://", _URL/binary>>, Inet) -> [{transport, ssl}, - {transport_opts, get_ssl_opts(Config, ResId)} + {transport_opts, [Inet | get_ssl_opts(Config, ResId)]} ]; -get_ssl_options(_Config, _ResId, _URL) -> - []. +maybe_ssl(_Config, _ResId, _URL, Inet) -> + [{transport_opts, [Inet]}]. get_ssl_opts(Opts, ResId) -> Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]), From 319d44d1bf88df46d409823a4e5d7dbf1b960c64 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 1 Mar 2021 16:59:53 +0800 Subject: [PATCH 27/32] chore(webhook): remove needless files --- .../emqx_web_hook/test/prop_webhook_confs.erl | 142 ------ .../emqx_web_hook/test/prop_webhook_hooks.erl | 409 ------------------ 2 files changed, 551 deletions(-) delete mode 100644 apps/emqx_web_hook/test/prop_webhook_confs.erl delete mode 100644 apps/emqx_web_hook/test/prop_webhook_hooks.erl diff --git a/apps/emqx_web_hook/test/prop_webhook_confs.erl b/apps/emqx_web_hook/test/prop_webhook_confs.erl deleted file mode 100644 index bfe170239..000000000 --- a/apps/emqx_web_hook/test/prop_webhook_confs.erl +++ /dev/null @@ -1,142 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_webhook_confs). --include_lib("proper/include/proper.hrl"). - --import(emqx_ct_proper_types, - [ url/0 - , nof/1 - ]). - --define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> - State = do_setup(), - fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_confs() -> - Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")), - ?ALL({Url, Confs0}, {url(), confs()}, - begin - Confs = [{"web.hook.api.url", Url}|Confs0], - Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)), - - assert_confs(Confs, Envs), - - set_application_envs(Envs), - {ok, _} = application:ensure_all_started(emqx_web_hook), - application:stop(emqx_web_hook), - unset_application_envs(Envs), - true - end). - -%%-------------------------------------------------------------------- -%% Helpers -%%-------------------------------------------------------------------- - -do_setup() -> - application:set_env(kernel, logger_level, error), - emqx_ct_helpers:start_apps([], fun set_special_cfgs/1), - ok. - -do_teardown(_) -> - emqx_ct_helpers:stop_apps([]), - ok. - -set_special_cfgs(_) -> - application:set_env(emqx, plugins_loaded_file, undefined), - application:set_env(emqx, modules_loaded_file, undefined), - ok. - -assert_confs([{"web.hook.api.url", Url}|More], Envs) -> - %% Assert! - Url = deep_get_env("emqx_web_hook.url", Envs), - assert_confs(More, Envs); - -assert_confs([{"web.hook.rule." ++ HookName0, Spec}|More], Envs) -> - HookName = re:replace(HookName0, "\\.[0-9]", "", [{return, list}]), - Rules = deep_get_env("emqx_web_hook.rules", Envs), - - %% Assert! - Spec = proplists:get_value(HookName, Rules), - - assert_confs(More, Envs); - -assert_confs([_|More], Envs) -> - assert_confs(More, Envs); - -assert_confs([], _) -> - true. - -deep_get_env(Path, Envs) -> - lists:foldl( - fun(_K, undefiend) -> undefiend; - (K, Acc) -> proplists:get_value(binary_to_atom(K, utf8), Acc) - end, Envs, re:split(Path, "\\.")). - -set_application_envs(Envs) -> - application:set_env(Envs). - -unset_application_envs(Envs) -> - lists:foreach(fun({App, Es}) -> - lists:foreach(fun({K, _}) -> - application:unset_env(App, K) - end, Es) end, Envs). - -cuttlefish_conf_file(Ls) when is_list(Ls) -> - [cuttlefish_conf_option(K,V) || {K, V} <- Ls]. - -cuttlefish_conf_option(K, V) - when is_list(K) -> - {re:split(K, "[.]", [{return, list}]), V}. - -%%-------------------------------------------------------------------- -%% Generators -%%-------------------------------------------------------------------- - -confs() -> - nof([{"web.hook.encode_payload", oneof(["base64", "base62"])}, - {"web.hook.rule.client.connect.1", rule_spec()}, - {"web.hook.rule.client.connack.1", rule_spec()}, - {"web.hook.rule.client.connected.1", rule_spec()}, - {"web.hook.rule.client.disconnected.1", rule_spec()}, - {"web.hook.rule.client.subscribe.1", rule_spec()}, - {"web.hook.rule.client.unsubscribe.1", rule_spec()}, - {"web.hook.rule.session.subscribed.1", rule_spec()}, - {"web.hook.rule.session.unsubscribed.1", rule_spec()}, - {"web.hook.rule.session.terminated.1", rule_spec()}, - {"web.hook.rule.message.publish.1", rule_spec()}, - {"web.hook.rule.message.delivered.1", rule_spec()}, - {"web.hook.rule.message.acked.1", rule_spec()} - ]). - -rule_spec() -> - ?LET(Action, action_names(), - begin - binary_to_list(emqx_json:encode(#{action => Action})) - end). - -action_names() -> - oneof([on_client_connect, on_client_connack, on_client_connected, - on_client_connected, on_client_disconnected, on_client_subscribe, on_client_unsubscribe, - on_session_subscribed, on_session_unsubscribed, on_session_terminated, - on_message_publish, on_message_delivered, on_message_acked]). - diff --git a/apps/emqx_web_hook/test/prop_webhook_hooks.erl b/apps/emqx_web_hook/test/prop_webhook_hooks.erl deleted file mode 100644 index 4e51573a6..000000000 --- a/apps/emqx_web_hook/test/prop_webhook_hooks.erl +++ /dev/null @@ -1,409 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_webhook_hooks). - --include_lib("proper/include/proper.hrl"). - --import(emqx_ct_proper_types, - [ conninfo/0 - , clientinfo/0 - , sessioninfo/0 - , message/0 - , connack_return_code/0 - , topictab/0 - , topic/0 - , subopts/0 - ]). - --define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> - State = do_setup(), - fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_client_connect() -> - ?ALL({ConnInfo, ConnProps, Env}, - {conninfo(), conn_properties(), empty_env()}, - begin - ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_connect, - node => stringfy(node()), - clientid => maps:get(clientid, ConnInfo), - username => maybe(maps:get(username, ConnInfo)), - ipaddress => peer2addr(maps:get(peername, ConnInfo)), - keepalive => maps:get(keepalive, ConnInfo), - proto_ver => maps:get(proto_ver, ConnInfo) - }), - true - end). - -prop_client_connack() -> - ?ALL({ConnInfo, Rc, AckProps, Env}, - {conninfo(), connack_return_code(), ack_properties(), empty_env()}, - begin - ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_connack, - node => stringfy(node()), - clientid => maps:get(clientid, ConnInfo), - username => maybe(maps:get(username, ConnInfo)), - ipaddress => peer2addr(maps:get(peername, ConnInfo)), - keepalive => maps:get(keepalive, ConnInfo), - proto_ver => maps:get(proto_ver, ConnInfo), - conn_ack => Rc - }), - true - end). - -prop_client_connected() -> - ?ALL({ClientInfo, ConnInfo, Env}, - {clientinfo(), conninfo(), empty_env()}, - begin - ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_connected, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - ipaddress => peer2addr(maps:get(peerhost, ClientInfo)), - keepalive => maps:get(keepalive, ConnInfo), - proto_ver => maps:get(proto_ver, ConnInfo), - connected_at => maps:get(connected_at, ConnInfo) - }), - true - end). - -prop_client_disconnected() -> - ?ALL({ClientInfo, Reason, ConnInfo, Env}, - {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()}, - begin - ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_disconnected, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - disconnected_at => maps:get(disconnected_at, ConnInfo), - reason => stringfy(Reason) - }), - true - end). - -prop_client_subscribe() -> - ?ALL({ClientInfo, SubProps, TopicTab, Env}, - {clientinfo(), sub_properties(), topictab(), topic_filter_env()}, - begin - ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env), - - Matched = filter_topictab(TopicTab, Env), - - lists:foreach(fun({Topic, Opts}) -> - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_subscribe, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic, - opts => Opts}) - end, Matched), - true - end). - -prop_client_unsubscribe() -> - ?ALL({ClientInfo, SubProps, TopicTab, Env}, - {clientinfo(), unsub_properties(), topictab(), topic_filter_env()}, - begin - ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env), - - Matched = filter_topictab(TopicTab, Env), - - lists:foreach(fun({Topic, Opts}) -> - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => client_unsubscribe, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic, - opts => Opts}) - end, Matched), - true - end). - -prop_session_subscribed() -> - ?ALL({ClientInfo, Topic, SubOpts, Env}, - {clientinfo(), topic(), subopts(), topic_filter_env()}, - begin - ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env), - filter_topic_match(Topic, Env) andalso begin - Body = receive_http_request_body(), - Body1 = emqx_json:encode( - #{action => session_subscribed, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic, - opts => SubOpts - }), - Body = Body1 - end, - true - end). - -prop_session_unsubscribed() -> - ?ALL({ClientInfo, Topic, SubOpts, Env}, - {clientinfo(), topic(), subopts(), empty_env()}, - begin - ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env), - filter_topic_match(Topic, Env) andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => session_unsubscribed, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - topic => Topic - }) - end, - true - end). - -prop_session_terminated() -> - ?ALL({ClientInfo, Reason, SessInfo, Env}, - {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()}, - begin - ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env), - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => session_terminated, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - reason => stringfy(Reason) - }), - true - end). - -prop_message_publish() -> - ?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()}, - begin - application:set_env(emqx_web_hook, encode_payload, Encode), - {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env), - application:unset_env(emqx_web_hook, encode_payload), - - (not emqx_message:is_sys(Msg)) - andalso filter_topic_match(emqx_message:topic(Msg), Env) - andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => message_publish, - node => stringfy(node()), - from_client_id => emqx_message:from(Msg), - from_username => maybe(emqx_message:get_header(username, Msg)), - topic => emqx_message:topic(Msg), - qos => emqx_message:qos(Msg), - retain => emqx_message:get_flag(retain, Msg), - payload => encode(emqx_message:payload(Msg), Encode), - ts => emqx_message:timestamp(Msg) - }) - end, - true - end). - -prop_message_delivered() -> - ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()}, - begin - application:set_env(emqx_web_hook, encode_payload, Encode), - ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env), - application:unset_env(emqx_web_hook, encode_payload), - - (not emqx_message:is_sys(Msg)) - andalso filter_topic_match(emqx_message:topic(Msg), Env) - andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => message_delivered, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - from_client_id => emqx_message:from(Msg), - from_username => maybe(emqx_message:get_header(username, Msg)), - topic => emqx_message:topic(Msg), - qos => emqx_message:qos(Msg), - retain => emqx_message:get_flag(retain, Msg), - payload => encode(emqx_message:payload(Msg), Encode), - ts => emqx_message:timestamp(Msg) - }) - end, - true - end). - -prop_message_acked() -> - ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()}, - begin - application:set_env(emqx_web_hook, encode_payload, Encode), - ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env), - application:unset_env(emqx_web_hook, encode_payload), - - (not emqx_message:is_sys(Msg)) - andalso filter_topic_match(emqx_message:topic(Msg), Env) - andalso begin - Body = receive_http_request_body(), - Body = emqx_json:encode( - #{action => message_acked, - node => stringfy(node()), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo)), - from_client_id => emqx_message:from(Msg), - from_username => maybe(emqx_message:get_header(username, Msg)), - topic => emqx_message:topic(Msg), - qos => emqx_message:qos(Msg), - retain => emqx_message:get_flag(retain, Msg), - payload => encode(emqx_message:payload(Msg), Encode), - ts => emqx_message:timestamp(Msg) - }) - end, - true - end). - -%%-------------------------------------------------------------------- -%% Helper -%%-------------------------------------------------------------------- -do_setup() -> - %% Pre-defined envs - application:set_env(emqx_web_hook, path, "path"), - application:set_env(emqx_web_hook, headers, []), - - meck:new(ehttpc_pool, [passthrough, no_history]), - meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end), - - Self = self(), - meck:new(ehttpc, [passthrough, no_history]), - meck:expect(ehttpc, request, - fun(_ClientId, Method, {Path, Headers, Body}) -> - Self ! {Method, Path, Headers, Body}, {ok, ok, ok} - end), - - meck:new(emqx_metrics, [passthrough, no_history]), - meck:expect(emqx_metrics, inc, fun(_) -> ok end), - ok. - -do_teardown(_) -> - meck:unload(ehttpc_pool), - meck:unload(ehttpc), - meck:unload(emqx_metrics). - -maybe(undefined) -> null; -maybe(T) -> T. - -peer2addr({Host, _}) -> - list_to_binary(inet:ntoa(Host)); -peer2addr(Host) -> - list_to_binary(inet:ntoa(Host)). - -ensure_to_binary(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); -ensure_to_binary(Bin) when is_binary(Bin) -> Bin. - -stringfy({shutdown, Reason}) -> - stringfy(Reason); -stringfy(Term) when is_atom(Term); is_binary(Term) -> - Term; -stringfy(Term) -> - unicode:characters_to_binary(io_lib:format("~0p", [Term])). - -receive_http_request_body() -> - receive - {post, _, _, Body} -> - Body - after 100 -> - exit(waiting_message_timeout) - end. - -receive_http_request_bodys() -> - receive_http_request_bodys_([]). - -receive_http_request_bodys_(Acc) -> - receive - {post, _, _, Body} -> - receive_http_request_bodys_([Body|Acc]) - after 1000 -> - lists:reverse(Acc) - end. - -filter_topictab(TopicTab, {undefined}) -> - TopicTab; -filter_topictab(TopicTab, {TopicFilter}) -> - lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab). - -filter_topic_match(_Topic, {undefined}) -> - true; -filter_topic_match(Topic, {TopicFilter}) -> - emqx_topic:match(Topic, TopicFilter). - -encode(Bin, base64) -> - base64:encode(Bin); -encode(Bin, base62) -> - emqx_base62:encode(Bin); -encode(Bin, _) -> - Bin. - -%%-------------------------------------------------------------------- -%% Generators -%%-------------------------------------------------------------------- - -conn_properties() -> - #{}. - -ack_properties() -> - #{}. - -sub_properties() -> - #{}. - -unsub_properties() -> - #{}. - -shutdown_reason() -> - oneof([any(), {shutdown, atom()}]). - -empty_env() -> - {undefined}. - -topic_filter_env() -> - oneof([{<<"#">>}, {undefined}, {topic()}]). - -payload_encode() -> - oneof([base62, base64, undefined]). - -http_code() -> - oneof([socket_closed_remotely, others]). - -disconnected_conninfo() -> - ?LET(Info, conninfo(), - begin - Info#{disconnected_at => erlang:system_time(millisecond)} - end). From 026e1494fbf3d7b1450988b3c46d88e7a6af1161 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 1 Mar 2021 10:00:38 +0100 Subject: [PATCH 28/32] fix(webhook): fix config entry web.hook.api.url -> web.hook.url --- apps/emqx_web_hook/README.md | 2 +- apps/emqx_web_hook/etc/emqx_web_hook.conf | 2 +- apps/emqx_web_hook/priv/emqx_web_hook.schema | 2 +- apps/emqx_web_hook/test/props/prop_webhook_confs.erl | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_web_hook/README.md b/apps/emqx_web_hook/README.md index 29b5f3ca8..c76c2936d 100644 --- a/apps/emqx_web_hook/README.md +++ b/apps/emqx_web_hook/README.md @@ -11,7 +11,7 @@ Please see: [EMQ X - WebHook](https://docs.emqx.io/broker/latest/en/advanced/web ## The web services URL for Hook request ## ## Value: String -web.hook.api.url = http://127.0.0.1:8080 +web.hook.url = http://127.0.0.1:8080 ## Encode message payload field ## diff --git a/apps/emqx_web_hook/etc/emqx_web_hook.conf b/apps/emqx_web_hook/etc/emqx_web_hook.conf index 218718079..6c50924ff 100644 --- a/apps/emqx_web_hook/etc/emqx_web_hook.conf +++ b/apps/emqx_web_hook/etc/emqx_web_hook.conf @@ -5,7 +5,7 @@ ## Webhook URL ## ## Value: String -web.hook.api.url = http://127.0.0.1:80 +web.hook.url = http://127.0.0.1:80 ## HTTP Headers ## diff --git a/apps/emqx_web_hook/priv/emqx_web_hook.schema b/apps/emqx_web_hook/priv/emqx_web_hook.schema index beed1b107..3a56b8b1d 100644 --- a/apps/emqx_web_hook/priv/emqx_web_hook.schema +++ b/apps/emqx_web_hook/priv/emqx_web_hook.schema @@ -1,7 +1,7 @@ %%-*- mode: erlang -*- %% EMQ X R3.0 config mapping -{mapping, "web.hook.api.url", "emqx_web_hook.url", [ +{mapping, "web.hook.url", "emqx_web_hook.url", [ {datatype, string} ]}. diff --git a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl index bfe170239..cc5e7af64 100644 --- a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl +++ b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl @@ -36,7 +36,7 @@ prop_confs() -> Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")), ?ALL({Url, Confs0}, {url(), confs()}, begin - Confs = [{"web.hook.api.url", Url}|Confs0], + Confs = [{"web.hook.url", Url}|Confs0], Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)), assert_confs(Confs, Envs), @@ -66,7 +66,7 @@ set_special_cfgs(_) -> application:set_env(emqx, modules_loaded_file, undefined), ok. -assert_confs([{"web.hook.api.url", Url}|More], Envs) -> +assert_confs([{"web.hook.url", Url}|More], Envs) -> %% Assert! Url = deep_get_env("emqx_web_hook.url", Envs), assert_confs(More, Envs); From db13f18cbe36fe91cec111f56ba48b6549684fc2 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 1 Mar 2021 17:06:39 +0800 Subject: [PATCH 29/32] fix(webhook): transport options --- .../src/emqx_web_hook_actions.erl | 49 +++++++------------ 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index bb4ec07a6..6fc263bde 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -38,15 +38,7 @@ -define(RESOURCE_TYPE_WEBHOOK, 'web_hook'). -define(RESOURCE_CONFIG_SPEC, #{ - method => #{order => 1, - type => string, - enum => [<<"PUT">>,<<"POST">>], - default => <<"POST">>, - title => #{en => <<"Request Method">>, - zh => <<"请求方法"/utf8>>}, - description => #{en => <<"Request Method">>, - zh => <<"请求方法"/utf8>>}}, - url => #{order => 2, + url => #{order => 1, type => string, format => url, required => true, @@ -54,57 +46,49 @@ zh => <<"请求 URL"/utf8>>}, description => #{en => <<"The URL of the server that will receive the Webhook requests.">>, zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}}, - headers => #{order => 3, - type => object, - schema => #{}, - default => #{}, - title => #{en => <<"Request Header">>, - zh => <<"请求头"/utf8>>}, - description => #{en => <<"Request Header">>, - zh => <<"请求头"/utf8>>}}, - connect_timeout => #{order => 4, + connect_timeout => #{order => 2, type => string, default => <<"5s">>, title => #{en => <<"Connect Timeout">>, zh => <<"连接超时时间"/utf8>>}, description => #{en => <<"Connect Timeout In Seconds">>, zh => <<"连接超时时间"/utf8>>}}, - request_timeout => #{order => 5, + request_timeout => #{order => 3, type => string, default => <<"5s">>, title => #{en => <<"Request Timeout">>, zh => <<"请求超时时间时间"/utf8>>}, description => #{en => <<"Request Timeout In Seconds">>, zh => <<"请求超时时间"/utf8>>}}, - pool_size => #{order => 6, + pool_size => #{order => 4, type => number, default => 8, title => #{en => <<"Pool Size">>, zh => <<"连接池大小"/utf8>>}, description => #{en => <<"Connection Pool">>, zh => <<"连接池大小"/utf8>>} }, - cacertfile => #{order => 7, + cacertfile => #{order => 5, type => file, default => <<"">>, title => #{en => <<"CA Certificate File">>, zh => <<"CA 证书文件"/utf8>>}, description => #{en => <<"CA Certificate file">>, zh => <<"CA 证书文件"/utf8>>}}, - keyfile => #{order => 8, + keyfile => #{order => 6, type => file, default => <<"">>, title =>#{en => <<"SSL Key">>, zh => <<"SSL Key"/utf8>>}, description => #{en => <<"Your ssl keyfile">>, zh => <<"SSL 私钥"/utf8>>}}, - certfile => #{order => 9, + certfile => #{order => 7, type => file, default => <<"">>, title =>#{en => <<"SSL Cert">>, zh => <<"SSL Cert"/utf8>>}, description => #{en => <<"Your ssl certfile">>, zh => <<"SSL 证书"/utf8>>}}, - verify => #{order => 10, + verify => #{order => 8, type => boolean, default => false, title =>#{en => <<"Verify Server Certfile">>, @@ -156,7 +140,7 @@ description => #{en => <<"HTTP headers.">>, zh => <<"HTTP headers。"/utf8>>}}, body => #{ - order => 5, + order => 4, type => string, input => textarea, required => false, @@ -350,24 +334,27 @@ pool_opts(Params = #{<<"url">> := URL}, ResId) -> PoolSize = maps:get(<<"pool_size">>, Params, 32), ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), {Inet, Host} = parse_host(Host0), + SslOpts0 = maybe_ssl(Params, ResId, add_default_scheme(URL)), + TranOpts = lists:keyfind(transport_opts, 1, SslOpts0), + SslOpts = lists:keyreplace(transport_opts, 1, + {transport_opts, [Inet | TranOpts]}, SslOpts0), [{host, Host}, {port, Port}, {pool_size, PoolSize}, {pool_type, hash}, {connect_timeout, ConnectTimeout}, {retry, 5}, - {retry_timeout, 1000} - | maybe_ssl(Params, ResId, add_default_scheme(URL), Inet)]. + {retry_timeout, 1000} | SslOpts]. pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). -maybe_ssl(Config, ResId, <<"https://", _URL/binary>>, Inet) -> +maybe_ssl(Config, ResId, <<"https://", _URL/binary>>) -> [{transport, ssl}, - {transport_opts, [Inet | get_ssl_opts(Config, ResId)]} + {transport_opts, get_ssl_opts(Config, ResId)} ]; -maybe_ssl(_Config, _ResId, _URL, Inet) -> - [{transport_opts, [Inet]}]. +maybe_ssl(_Config, _ResId, _URL) -> + [{transport_opts, []}]. get_ssl_opts(Opts, ResId) -> Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]), From 801b3d6daf55558d58e50730513d57627954b447 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 1 Mar 2021 17:10:27 +0800 Subject: [PATCH 30/32] style: remove spaces at the end of lines --- apps/emqx_web_hook/src/emqx_web_hook_actions.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 6fc263bde..6df390546 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -335,8 +335,8 @@ pool_opts(Params = #{<<"url">> := URL}, ResId) -> ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), {Inet, Host} = parse_host(Host0), SslOpts0 = maybe_ssl(Params, ResId, add_default_scheme(URL)), - TranOpts = lists:keyfind(transport_opts, 1, SslOpts0), - SslOpts = lists:keyreplace(transport_opts, 1, + {transport_opts, TranOpts} = lists:keyfind(transport_opts, 1, SslOpts0), + SslOpts = lists:keyreplace(transport_opts, 1, {transport_opts, [Inet | TranOpts]}, SslOpts0), [{host, Host}, {port, Port}, From 16c27663cd933c6e7555dd9926580a01c804a255 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 1 Mar 2021 12:40:59 +0100 Subject: [PATCH 31/32] refactor(webhook): transport options refine --- .../src/emqx_web_hook_actions.erl | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 6df390546..e5aa25c3a 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -326,35 +326,40 @@ add_default_scheme(URL) -> <<"http://", URL/binary>>. pool_opts(Params = #{<<"url">> := URL}, ResId) -> - #{host := Host0, scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(add_default_scheme(URL))), - Port = maps:get(port, URIMap, case Scheme of - "https" -> 443; - _ -> 80 - end), + #{host := Host0, scheme := Scheme} = URIMap = + uri_string:parse(binary_to_list(add_default_scheme(URL))), + DefaultPort = case is_https(Scheme) of + true -> 443; + false -> 80 + end, + Port = maps:get(port, URIMap, DefaultPort), PoolSize = maps:get(<<"pool_size">>, Params, 32), - ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), + ConnectTimeout = + cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), {Inet, Host} = parse_host(Host0), - SslOpts0 = maybe_ssl(Params, ResId, add_default_scheme(URL)), - {transport_opts, TranOpts} = lists:keyfind(transport_opts, 1, SslOpts0), - SslOpts = lists:keyreplace(transport_opts, 1, - {transport_opts, [Inet | TranOpts]}, SslOpts0), + TransportOpts = + case is_https(Scheme) of + true -> [Inet | get_ssl_opts(Params, ResId)]; + false -> [Inet] + end, + Opts = case is_https(Scheme) of + true -> [{transport_opts, TransportOpts}, {transport, ssl}]; + false -> [{transport_opts, TransportOpts}] + end, [{host, Host}, {port, Port}, {pool_size, PoolSize}, {pool_type, hash}, {connect_timeout, ConnectTimeout}, {retry, 5}, - {retry_timeout, 1000} | SslOpts]. + {retry_timeout, 1000} | Opts]. pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). -maybe_ssl(Config, ResId, <<"https://", _URL/binary>>) -> - [{transport, ssl}, - {transport_opts, get_ssl_opts(Config, ResId)} - ]; -maybe_ssl(_Config, _ResId, _URL) -> - [{transport_opts, []}]. +is_https(Scheme) when is_list(Scheme) -> is_https(list_to_binary(Scheme)); +is_https(<<"https", _/binary>>) -> true; +is_https(_) -> false. get_ssl_opts(Opts, ResId) -> Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]), From 990b7bd4941e845e37a58ffc1f39b8dfbcab264b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 1 Mar 2021 20:25:47 +0800 Subject: [PATCH 32/32] fix(bridge_mqtt): typos --- apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index f6d128b08..0c7b8ebf3 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -5,6 +5,6 @@ {<<".*">>, []} ], [ - {<<"*.">>, []} + {<<".*">>, []} ] }.