diff --git a/.github/workflows/apps_version_check.yaml b/.github/workflows/apps_version_check.yaml index 4dd058468..c2c2b6357 100644 --- a/.github/workflows/apps_version_check.yaml +++ b/.github/workflows/apps_version_check.yaml @@ -29,3 +29,10 @@ jobs: run: ./scripts/update-appup.sh emqx-ee --check - name: Check apps version run: ./scripts/apps-version-check.sh + - uses: actions/upload-artifact@v3.1.0 + if: failure() + with: + name: expected_appup_files + path: | + {src,apps}/**/*.appup.src + retention-days: 1 diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index d58486cda..239b8184e 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -93,6 +93,7 @@ jobs: -f .ci/docker-compose-file/docker-compose-enterprise-pgsql-and-timescale-client.yaml \ up -d --build docker exec -i erlang bash -c "echo \"https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com\" > /root/.git-credentials && git config --global credential.helper store" + docker exec -i erlang bash -c "git config --global --add safe.directory /emqx" while [ $(docker ps -a --filter name=client --filter exited=0 | wc -l) \ != $(docker ps -a --filter name=client | wc -l) ]; do sleep 5 diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 03fe37084..7cc4ac787 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -10,6 +10,20 @@ File format: - One list item per change topic Change log ends with a list of github PRs +## v4.3.16 + +### Enhancements + +- Add the possibility of configuring the password for + password-protected private key files used for dashboard and + management HTTPS listeners. [#8129] + +### Bug fixes + +- Avoid repeated writing `loaded_plugins` file if the plugin enable stauts has not changed [#8179] +- Correctly tally `connack.auth_error` metrics when a client uses MQTT + 3.1. [#8177] + ## v4.3.15 ### Enhancements diff --git a/apps/emqx_management/etc/emqx_management.conf b/apps/emqx_management/etc/emqx_management.conf index 0170059d7..f9e6a518c 100644 --- a/apps/emqx_management/etc/emqx_management.conf +++ b/apps/emqx_management/etc/emqx_management.conf @@ -43,6 +43,7 @@ management.listener.http.ipv6_v6only = false ## management.listener.https.send_timeout_close = on ## management.listener.https.certfile = etc/certs/cert.pem ## management.listener.https.keyfile = etc/certs/key.pem +## management.listener.https.key_password = yourpass ## management.listener.https.cacertfile = etc/certs/cacert.pem ## management.listener.https.verify = verify_peer ## NOTE: Do not use tlsv1.3 if emqx is running on OTP-22 or earlier diff --git a/apps/emqx_management/priv/emqx_management.schema b/apps/emqx_management/priv/emqx_management.schema index a30a20e4d..e0cc47d2f 100644 --- a/apps/emqx_management/priv/emqx_management.schema +++ b/apps/emqx_management/priv/emqx_management.schema @@ -143,6 +143,10 @@ {datatype, string} ]}. +{mapping, "management.listener.https.key_password", "emqx_management.listeners", [ + {datatype, string} +]}. + {mapping, "management.listener.https.certfile", "emqx_management.listeners", [ {datatype, string} ]}. @@ -217,6 +221,7 @@ end}. Filter([{versions, Versions}, {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, + {password, cuttlefish:conf_get(Prefix ++ ".key_password", Conf, undefined)}, {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index edeffbde4..ae82d56c9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -35,24 +35,47 @@ }, target_qos => #{ order => 2, - type => number, - enum => [-1, 0, 1, 2], + input => editable_select, + type => [number, string], + enum => [0, 1, 2, <<"${qos}">>], required => true, default => 0, title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, - description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>, - zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>} + description => #{en => + <<"The QoS Level to be used when republishing the message." + " Support placeholder variables." + " Set to ${qos} to use the original QoS. Default is 0">>, + zh => + <<"重新发布消息时用的 QoS 级别。" + "支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS。默认 0"/utf8>>} + }, + target_retain => #{ + order => 3, + input => editable_select, + type => [boolean, string], + enum => [true, false, <<"${flags.retain}">>], + required => false, + default => false, + title => #{en => <<"Target Retain">>, + zh => <<"目标保留消息标识"/utf8>>}, + description => #{en => <<"The Retain flag to be used when republishing the message." + " Set to ${flags.retain} to use the original Retain." + " Support placeholder variables. Default is false">>, + zh => <<"重新发布消息时用的保留消息标识。" + "支持占位符变量,可以填写 ${flags.retain} 来使用原消息的 Retain。" + "默认 false"/utf8>>} }, payload_tmpl => #{ - order => 3, + order => 4, type => string, input => textarea, required => false, default => <<"${payload}">>, title => #{en => <<"Payload Template">>, zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported">>, + description => #{en => <<"The payload template, " + "variable interpolation is supported">>, zh => <<"消息内容模板,支持变量"/utf8>>} } }). @@ -89,7 +112,8 @@ params => #{}, title => #{en => <<"Do Nothing (debug)">>, zh => <<"空动作 (调试)"/utf8>>}, - description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>, + description => #{en => <<"This action does nothing and never fails. " + "It's for debug purpose">>, zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>} }). @@ -113,7 +137,8 @@ on_resource_create(_Name, Conf) -> %%------------------------------------------------------------------------------ %% Action 'inspect' %%------------------------------------------------------------------------------ --spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_inspect(Id, Params) -> Params. @@ -129,12 +154,15 @@ on_action_inspect(Selected, Envs) -> %%------------------------------------------------------------------------------ %% Action 'republish' %%------------------------------------------------------------------------------ --spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_republish(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_republish(Id, Params = #{ <<"target_topic">> := TargetTopic, - <<"target_qos">> := TargetQoS, + <<"target_qos">> := TargetQoS0, <<"payload_tmpl">> := PayloadTmpl }) -> + TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), + TargetQoS = to_qos(TargetQoS0), TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -157,20 +185,21 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, + } = Bindings}) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + Message = #message{ id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end, + qos = get_qos(TargetQoS, Selected, QoS), from = ActId, - flags = Flags, + flags = Flags#{retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), payload = format_msg(PayloadTks, Selected), timestamp = Timestamp - }); + }, + increase_and_publish(ActId, Message); %% in case this is not a "message.publish" request on_action_republish(Selected, _Envs = #{ @@ -180,27 +209,29 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, + } = Bindings}) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + Message = #message{ id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end, + qos = get_qos(TargetQoS, Selected, 0), from = ActId, - flags = #{dup => false, retain => false}, + flags = #{dup => false, retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), payload = format_msg(PayloadTks, Selected), timestamp = erlang:system_time(millisecond) - }). + }, + increase_and_publish(ActId, Message). increase_and_publish(ActId, Msg) -> _ = emqx_broker:safe_publish(Msg), emqx_rule_metrics:inc_actions_success(ActId), emqx_metrics:inc_msg(Msg). --spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_do_nothing(ActId, Params) when is_binary(ActId) -> Params. @@ -211,3 +242,57 @@ format_msg([], Data) -> emqx_json:encode(Data); format_msg(Tokens, Data) -> emqx_rule_utils:proc_tmpl(Tokens, Data). + +%% -1 for old version. +to_qos(<<"-1">>) -> -1; +to_qos(-1) -> -1; +to_qos(TargetQoS) -> + try + qos(TargetQoS) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetQoS) of + Tmpl = [{var, _}] -> + Tmpl; + _BadQoS -> + error({bad_qos, TargetQoS}) + end + end. + +get_qos(-1, _Data, Default) -> Default; +get_qos(TargetQoS, Data, _Default) -> + qos(emqx_rule_utils:replace_var(TargetQoS, Data)). + +qos(<<"0">>) -> 0; +qos(<<"1">>) -> 1; +qos(<<"2">>) -> 2; +qos(0) -> 0; +qos(1) -> 1; +qos(2) -> 2; +qos(BadQoS) -> error({bad_qos, BadQoS}). + +to_retain(TargetRetain) -> + try + retain(TargetRetain) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetRetain) of + Tmpl = [{var, _}] -> + Tmpl; + _BadRetain -> + error({bad_retain, TargetRetain}) + end + end. + +get_retain(TargetRetain, Data) -> + retain(emqx_rule_utils:replace_var(TargetRetain, Data)). + +retain(true) -> true; +retain(false) -> false; +retain(<<"true">>) -> true; +retain(<<"false">>) -> false; +retain(<<"1">>) -> true; +retain(<<"0">>) -> false; +retain(1) -> true; +retain(0) -> false; +retain(BadRetain) -> error({bad_retain, BadRetain}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index d287f1ad0..137e22128 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -16,6 +16,9 @@ -module(emqx_rule_utils). +-export([ replace_var/2 + ]). + %% preprocess and process tempalte string with place holders -export([ preproc_tmpl/1 , proc_tmpl/2 @@ -87,6 +90,14 @@ preproc_tmpl([[Str, Phld]| Tokens], Acc) -> preproc_tmpl([[Str]| Tokens], Acc) -> preproc_tmpl(Tokens, put_head(str, Str, Acc)). +%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result +%% value will be an integer 1. +replace_var(Tokens, Data) when is_list(Tokens) -> + [Val] = proc_tmpl(Tokens, Data, #{return => rawlist}), + Val; +replace_var(Val, _Data) -> + Val. + put_head(_Type, <<>>, List) -> List; put_head(Type, Term, List) -> [{Type, Term} | List]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl index e32ec66ab..237e5dfba 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_validator.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_validator.erl @@ -84,6 +84,8 @@ validate_spec(ParamsSepc) -> %% Internal Functions %%------------------------------------------------------------------------------ +validate_value(Val, #{type := Types} = Spec) when is_list(Types) -> + validate_types(Val, Types, Spec); validate_value(Val, #{enum := Enum}) -> validate_enum(Val, Enum); validate_value(Val, #{type := object} = Spec) -> @@ -91,6 +93,15 @@ validate_value(Val, #{type := object} = Spec) -> validate_value(Val, #{type := Type} = Spec) -> validate_type(Val, Type, Spec). +validate_types(Val, [], _Spec) -> + throw({invalid_data_type, Val}); +validate_types(Val, [Type | Types], Spec) -> + try + validate_type(Val, Type, Spec) + catch _:_ -> + validate_types(Val, Types, Spec) + end. + validate_type(Val, file, _Spec) -> validate_file(Val); validate_type(Val, String, Spec) when String =:= string; @@ -157,6 +168,9 @@ do_validate_spec(Name, #{type := array} = Spec) -> fun (not_found) -> throw({required_field_missing, {items, {in, Name}}}); (Items) -> do_validate_spec(Name, Items) end); +do_validate_spec(_Name, #{type := Types}) when is_list(Types) -> + _ = [ok = supported_data_type(Type, ?DATA_TYPES) || Type <- Types], + ok; do_validate_spec(_Name, #{type := Type}) -> _ = supported_data_type(Type, ?DATA_TYPES); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c871ad0e2..ff1b8663e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -407,15 +407,31 @@ t_reset_metrics(_Config) -> ok. t_republish_action(_Config) -> - Qos0Received = emqx_metrics:val('messages.qos0.received'), + TargetQoSList = [-1, 0, 1, 2, <<"${qos}">>], + TargetRetainList = [true, false, <<"${flags.retain}">>], + [[republish_action_test(TargetQoS, TargetRetain) || TargetRetain <- TargetRetainList] + || TargetQoS <- TargetQoSList], + ok. + +republish_action_test(TargetQoS, TargetRetain) -> + {QoSReceivedMetricsName, PubQoS} = + case TargetQoS of + <<"${qos}">> -> {'messages.qos0.received', 0}; + -1 -> {'messages.qos0.received', 0}; + 0 -> {'messages.qos0.received', 0}; + 1 -> {'messages.qos1.received', 1}; + 2 -> {'messages.qos2.received', 2} + end, + QosReceived = emqx_metrics:val(QoSReceivedMetricsName), Received = emqx_metrics:val('messages.received'), ok = emqx_rule_engine:load_providers(), {ok, #rule{id = Id, for = [<<"t1">>]}} = emqx_rule_engine:create_rule( - #{rawsql => <<"select topic, payload, qos from \"t1\"">>, + #{rawsql => <<"select * from \"t1\"">>, actions => [#{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => TargetQoS, + <<"target_retain">> => TargetRetain, <<"payload_tmpl">> => <<"${payload}">>}}], description => <<"builtin-republish-rule">>}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -423,7 +439,7 @@ t_republish_action(_Config) -> {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), Msg = <<"{\"id\": 1, \"name\": \"ha\"}">>, - emqtt:publish(Client, <<"t1">>, Msg, 0), + emqtt:publish(Client, <<"t1">>, Msg, PubQoS), receive {publish, #{topic := <<"t2">>, payload := Payload}} -> ?assertEqual(Msg, Payload) after 1000 -> @@ -431,7 +447,7 @@ t_republish_action(_Config) -> end, emqtt:stop(Client), emqx_rule_registry:remove_rule(Id), - ?assertEqual(2, emqx_metrics:val('messages.qos0.received') - Qos0Received), + ?assertEqual(2, emqx_metrics:val(QoSReceivedMetricsName) - QosReceived), ?assertEqual(2, emqx_metrics:val('messages.received') - Received), ok. @@ -481,7 +497,7 @@ t_crud_rule_api(_Config) -> {<<"params">>,[ {<<"arg1">>,1}, {<<"target_topic">>, <<"t2">>}, - {<<"target_qos">>, -1}, + {<<"target_qos">>, 0}, {<<"payload_tmpl">>, <<"${payload}">>} ]} ]] diff --git a/deploy/charts/emqx/templates/StatefulSet.yaml b/deploy/charts/emqx/templates/StatefulSet.yaml index fc798bbe5..fbe5d2ae2 100644 --- a/deploy/charts/emqx/templates/StatefulSet.yaml +++ b/deploy/charts/emqx/templates/StatefulSet.yaml @@ -206,13 +206,6 @@ spec: initialDelaySeconds: 60 periodSeconds: 30 failureThreshold: 10 - lifecycle: - preStop: - exec: - command: - - "/opt/emqx/bin/emqx_ctl" - - "cluster" - - "leave" {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/deploy/charts/emqx/templates/ingress.dashboard.yaml b/deploy/charts/emqx/templates/ingress.dashboard.yaml index 5a8bbd4a9..6b305334d 100644 --- a/deploy/charts/emqx/templates/ingress.dashboard.yaml +++ b/deploy/charts/emqx/templates/ingress.dashboard.yaml @@ -34,7 +34,7 @@ spec: paths: - path: / {{- if (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }} - pathType: {{ .Values.ingress.dashboard.pathType | default "ImplementationSpecific" }} + pathType: {{ $.Values.ingress.dashboard.pathType | default "ImplementationSpecific" }} {{- end }} backend: {{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }} diff --git a/deploy/charts/emqx/templates/ingress.mgmt.yaml b/deploy/charts/emqx/templates/ingress.mgmt.yaml index 6c3811f77..bd0940098 100644 --- a/deploy/charts/emqx/templates/ingress.mgmt.yaml +++ b/deploy/charts/emqx/templates/ingress.mgmt.yaml @@ -34,7 +34,7 @@ spec: paths: - path: {{ $.Values.ingress.mgmt.path | default "/" }} {{- if (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }} - pathType: {{ .Values.ingress.mgmt.pathType | default "ImplementationSpecific" }} + pathType: {{ $.Values.ingress.mgmt.pathType | default "ImplementationSpecific" }} {{- end }} backend: {{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }} diff --git a/deploy/charts/emqx/templates/ingress.wss.yaml b/deploy/charts/emqx/templates/ingress.wss.yaml index ec74889bc..86c12abc1 100644 --- a/deploy/charts/emqx/templates/ingress.wss.yaml +++ b/deploy/charts/emqx/templates/ingress.wss.yaml @@ -34,7 +34,7 @@ spec: paths: - path: {{ $.Values.ingress.wss.path | default "/mqtt" }} {{- if (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }} - pathType: {{ .Values.ingress.wss.pathType | default "ImplementationSpecific" }} + pathType: {{ $.Values.ingress.wss.pathType | default "ImplementationSpecific" }} {{- end }} backend: {{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }} diff --git a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf index 2d59264a1..f59f27a47 100644 --- a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf +++ b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf @@ -82,6 +82,12 @@ dashboard.listener.http.ipv6_v6only = false ## Value: File ## dashboard.listener.https.keyfile = etc/certs/key.pem +## String containing the private key file password. Only used if the +## private keyfile is password-protected. +## +## Value: String +## dashboard.listener.https.key_password = yourpass + ## Path to a file containing the user certificate. ## ## Value: File diff --git a/lib-ce/emqx_dashboard/priv/emqx_dashboard.schema b/lib-ce/emqx_dashboard/priv/emqx_dashboard.schema index a2985429b..43093c3ba 100644 --- a/lib-ce/emqx_dashboard/priv/emqx_dashboard.schema +++ b/lib-ce/emqx_dashboard/priv/emqx_dashboard.schema @@ -74,6 +74,10 @@ {datatype, string} ]}. +{mapping, "dashboard.listener.https.key_password", "emqx_dashboard.listeners", [ + {datatype, string} +]}. + {mapping, "dashboard.listener.https.certfile", "emqx_dashboard.listeners", [ {datatype, string} ]}. @@ -127,6 +131,7 @@ {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, + {password, cuttlefish:conf_get(Prefix ++ ".key_password", Conf, undefined)}, {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, diff --git a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl index b7068f4df..b51d29291 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl @@ -23,7 +23,7 @@ -ifdef(TEST). -export([ compile/1 - , match_and_rewrite/2 + , match_and_rewrite/3 ]). -endif. @@ -49,14 +49,17 @@ load(RawRules) -> emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}). -rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> - {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. +rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) -> + Binds = fill_client_binds(ClientInfo), + {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. -rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> - {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. +rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) -> + Binds = fill_client_binds(ClientInfo), + {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}. rewrite_publish(Message = #message{topic = Topic}, Rules) -> - {ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}. + Binds = fill_client_binds(Message), + {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}. unload(_) -> emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), @@ -80,16 +83,16 @@ compile(Rules) -> end || {rewrite, sub, Topic, Re, Dest}<- Rules ], {PubRules, SubRules}. -match_and_rewrite(Topic, []) -> +match_and_rewrite(Topic, [], _) -> Topic; -match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) -> +match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules], Binds) -> case emqx_topic:match(Topic, Filter) of - true -> rewrite(Topic, MP, Dest); - false -> match_and_rewrite(Topic, Rules) + true -> rewrite(Topic, MP, Dest, Binds); + false -> match_and_rewrite(Topic, Rules, Binds) end. -rewrite(Topic, MP, Dest) -> +rewrite(Topic, MP, Dest, Binds) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> Vars = lists:zip(["\\$" ++ integer_to_list(I) @@ -97,7 +100,21 @@ rewrite(Topic, MP, Dest) -> iolist_to_binary(lists:foldl( fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) - end, Dest, Vars)); + end, Dest, Binds ++ Vars)); nomatch -> Topic end. +fill_client_binds(#{clientid := ClientId, username := Username}) -> + filter_client_binds([{"%c", ClientId}, {"%u", Username}]); + +fill_client_binds(#message{from = ClientId, headers = Headers}) -> + Username = maps:get(username, Headers, undefined), + filter_client_binds([{"%c", ClientId}, {"%u", Username}]). + +filter_client_binds(Binds) -> + lists:filter(fun({_, undefined}) -> false; + ({_, <<"">>}) -> false; + ({_, ""}) -> false; + (_) -> true + end, + Binds). diff --git a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl index 466f4a3f8..44574344b 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl @@ -23,7 +23,11 @@ -include_lib("eunit/include/eunit.hrl"). -define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>}, - {rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>} + {rewrite, pub, <<"name/#">>,<<"^name/(.+)$">>,<<"pub/%u/$1">>}, + {rewrite, pub, <<"c/#">>,<<"^c/(.+)$">>,<<"pub/%c/$1">>}, + {rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}, + {rewrite, sub, <<"name/#">>,<<"^name/(.+)$">>,<<"sub/%u/$1">>}, + {rewrite, sub, <<"c/#">>,<<"^c/(.+)$">>,<<"sub/%c/$1">>} ]). all() -> emqx_ct:all(?MODULE). @@ -41,16 +45,18 @@ end_per_suite(_Config) -> %% Test case for emqx_mod_write t_mod_rewrite(_Config) -> ok = emqx_mod_rewrite:load(?RULES), - {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), + {ok, C} = emqtt:start_link([{clientid, <<"c1">>}, {username , <<"u1">>}]), {ok, _} = emqtt:connect(C), - PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], - PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], - SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>], - SubDestTopics = [<<"y/z/b">>, <<"y/def">>], + + PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"name/1">>, <<"c/1">>], + PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"pub/u1/1">>, <<"pub/c1/1">>], + SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>, <<"name/1">>, <<"c/1">>], + SubDestTopics = [<<"y/z/b">>, <<"y/def">>, <<"sub/u1/1">>, <<"sub/c1/1">>], + %% Sub Rules {ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]), timer:sleep(100), - Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>), + Subscriptions = emqx_broker:subscriptions(<<"c1">>), ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), RecvTopics1 = [begin ok = emqtt:publish(C, Topic, <<"payload">>), @@ -60,7 +66,8 @@ t_mod_rewrite(_Config) -> ?assertEqual(SubDestTopics, RecvTopics1), {ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics), timer:sleep(100), - ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)), + ?assertEqual([], emqx_broker:subscriptions(<<"c1">>)), + %% Pub Rules {ok, _, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), RecvTopics2 = [begin @@ -76,10 +83,10 @@ t_mod_rewrite(_Config) -> t_rewrite_rule(_Config) -> {PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES), - ?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), - ?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), - ?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), - ?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). + ?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])), + ?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])), + ?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])), + ?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])). %%-------------------------------------------------------------------- %% Internal functions diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index d5c9645aa..641f61db3 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -17,66 +17,61 @@ bad_app_count=0 no_comment_re='(^[^\s?%])' ## TODO: c source code comments re (in $app_path/c_src dirs) -get_vsn() { - commit="$1" - app_src_file="$2" - if [ "$commit" = 'HEAD' ]; then - if [ -f "$app_src_file" ]; then - grep vsn "$app_src_file" | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"' || true - fi - else - git show "$commit":"$app_src_file" 2>/dev/null | grep vsn | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"' || true - fi +parse_semver() { + echo "$1" | tr '.|-' ' ' } check_apps() { - while read -r app_path; do - app=$(basename "$app_path") - src_file="$app_path/src/$app.app.src" - old_app_version="$(get_vsn "$latest_release" "$src_file")" - ## TODO: delete it after new version is released with emqx app in apps dir - if [ "$app" = 'emqx' ] && [ "$old_app_version" = '' ]; then - old_app_version="$(get_vsn "$latest_release" 'src/emqx.app.src')" - fi - now_app_version="$(get_vsn 'HEAD' "$src_file")" - ## TODO: delete it after new version is released with emqx app in apps dir - if [ "$app" = 'emqx' ] && [ "$now_app_version" = '' ]; then - now_app_version="$(get_vsn 'HEAD' 'src/emqx.app.src')" - fi - if [ -z "$now_app_version" ]; then - echo "failed_to_get_new_app_vsn for $app" - exit 1 - fi - if [ -z "${old_app_version:-}" ]; then - echo "skiped checking new app ${app}" - elif [ "$old_app_version" = "$now_app_version" ]; then - lines="$(git diff "$latest_release"...HEAD --ignore-blank-lines -G "$no_comment_re" \ + while read -r app; do + if [ "$app" != "emqx" ]; then + app_path="$app" + else + app_path="." + fi + src_file="$app_path/src/$(basename "$app").app.src" + old_app_version="$(git show "$latest_release":"$src_file" | grep vsn | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')" + now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') + if [ "$old_app_version" = "$now_app_version" ]; then + changed_lines="$(git diff "$latest_release"...HEAD --ignore-blank-lines -G "$no_comment_re" \ -- "$app_path/src" \ -- ":(exclude)'$app_path/src/*.appup.src'" \ -- "$app_path/priv" \ -- "$app_path/c_src" | wc -l ) " - if [ "$lines" -gt 0 ]; then - echo "$src_file needs a vsn bump (old=$old_app_version)" - echo "changed: $lines" - bad_app_count=$(( bad_app_count + 1)) - elif [ "$app" = 'emqx_dashboard' ]; then - ## emqx_dashboard is ensured to be upgraded after all other plugins - ## at the end of its appup instructions, there is the final instruction - ## {apply, {emqx_plugins, load, []} - ## since we don't know which plugins are stopped during the upgrade - ## for safety, we just force a dashboard version bump for each and every release - ## even if there is nothing changed in the app - echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade" - bad_app_count=$(( bad_app_count + 1)) - fi + if [ "$changed_lines" -gt 0 ]; then + echo "$src_file needs a vsn bump (old=$old_app_version)" + echo "changed: $changed_lines" + bad_app_count=$(( bad_app_count + 1)) + elif [ "$app" = 'emqx_dashboard' ]; then + ## emqx_dashboard is ensured to be upgraded after all other plugins + ## at the end of its appup instructions, there is the final instruction + ## {apply, {emqx_plugins, load, []} + ## since we don't know which plugins are stopped during the upgrade + ## for safety, we just force a dashboard version bump for each and every release + ## even if there is nothing changed in the app + echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade" + bad_app_count=$(( bad_app_count + 1)) fi - done < <(./scripts/find-apps.sh) - - if [ $bad_app_count -gt 0 ]; then - exit 1 else - echo "apps version check successfully" + # shellcheck disable=SC2207 + old_app_version_semver=($(parse_semver "$old_app_version")) + # shellcheck disable=SC2207 + now_app_version_semver=($(parse_semver "$now_app_version")) + if [ "${old_app_version_semver[0]}" = "${now_app_version_semver[0]}" ] && \ + [ "${old_app_version_semver[1]}" = "${now_app_version_semver[1]}" ] && \ + [ "$(( "${old_app_version_semver[2]}" + 1 ))" = "${now_app_version_semver[2]}" ]; then + true + else + echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version" + bad_app_count=$(( bad_app_count + 1)) + fi fi + done < <(./scripts/find-apps.sh) + + if [ $bad_app_count -gt 0 ]; then + exit 1 + else + echo "apps version check successfully" + fi } _main() { diff --git a/scripts/relup-base-packages.sh b/scripts/relup-base-packages.sh index 901460a24..3b0c2aee1 100755 --- a/scripts/relup-base-packages.sh +++ b/scripts/relup-base-packages.sh @@ -66,7 +66,7 @@ pushd _upgrade_base for tag in $(../scripts/relup-base-vsns.sh $EDITION | xargs echo -n); do filename="$PROFILE-${tag#[e|v]}-otp$OTP_VSN-$SYSTEM-$ARCH.zip" url="https://www.emqx.com/downloads/$DIR/${tag#[e|v]}/$filename" - if [ ! -f "$filename" ] && curl -I -m 10 -o /dev/null -s -w "%{http_code}" "${url}" | grep -q -oE "^[23]+" ; then + if [ ! -f "$filename" ] && curl -L -I -m 10 -o /dev/null -s -w "%{http_code}" "${url}" | grep -q -oE "^[23]+" ; then echo "downloading base package from ${url} ..." curl -L -o "${filename}" "${url}" if [ "$SYSTEM" != "centos6" ]; then diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index dd8dd9ca2..0ac5084ab 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -55,7 +55,10 @@ app_specific_actions(_) -> []. ignored_apps() -> - [gpb, emqx_dashboard, emqx_management] ++ otp_standard_apps(). + [gpb, %% only a build tool + emqx_dashboard, %% generic appup file for all versions + emqx_management %% generic appup file for all versions + ] ++ otp_standard_apps(). main(Args) -> #{prev_tag := Baseline} = Options = parse_args(Args, default_options()), @@ -530,8 +533,10 @@ contains_contents(File, Upgrade, Downgrade) -> index_apps(ReleaseDir) -> log("INFO: indexing apps in ~s~n", [ReleaseDir]), - Apps0 = maps:from_list([index_app(filename:join(ReleaseDir, AppFile)) || - AppFile <- filelib:wildcard("**/ebin/*.app", ReleaseDir)]), + AppFiles0 = filelib:wildcard("**/ebin/*.app", ReleaseDir), + %% everything in _build sub-dir e.g. cuttlefish/_build should be ignored + AppFiles = lists:filter(fun(File) -> re:run(File, "_build") =:= nomatch end, AppFiles0), + Apps0 = maps:from_list([index_app(filename:join(ReleaseDir, AppFile)) || AppFile <- AppFiles]), maps:without(ignored_apps(), Apps0). index_app(AppFile) -> diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c7cdd3e60..fc38c25da 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -5,6 +5,7 @@ [{add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 7bec83465..c919e25eb 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -429,8 +429,12 @@ inc_sent(Packet) -> do_inc_sent(?CONNACK_PACKET(ReasonCode)) -> (ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'), - (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.connack.auth_error'), - (ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'), + ((ReasonCode == ?RC_NOT_AUTHORIZED) + orelse (ReasonCode == ?CONNACK_AUTH)) + andalso inc('packets.connack.auth_error'), + ((ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) + orelse (ReasonCode == ?CONNACK_CREDENTIALS)) + andalso inc('packets.connack.auth_error'), inc('packets.connack.sent'); do_inc_sent(?PUBLISH_PACKET(QoS)) -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index c928d4d31..3177f05e7 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -387,7 +387,8 @@ plugin_loaded(_Name, false) -> ok; plugin_loaded(Name, true) -> case read_loaded() of - {ok, Names} -> + {ok, Names0} -> + Names = filter_plugins(Names0), case lists:member(Name, Names) of false -> %% write file if plugin is loaded diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 9d393bb44..9c771b705 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -587,6 +587,38 @@ t_connect_client_never_negative(Config) when is_list(Config) -> t_connect_client_never_negative({'end', _Config}) -> ok. +t_connack_auth_error({init, Config}) -> + process_flag(trap_exit, true), + emqx_ct_helpers:stop_apps([]), + emqx_ct_helpers:boot_modules(all), + Handler = + fun(emqx) -> + application:set_env(emqx, acl_nomatch, deny), + application:set_env(emqx, allow_anonymous, false), + application:set_env(emqx, enable_acl_cache, false), + ok; + (_) -> + ok + end, + emqx_ct_helpers:start_apps([], Handler), + Config; +t_connack_auth_error({'end', _Config}) -> + emqx_ct_helpers:stop_apps([]), + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + ok; +t_connack_auth_error(Config) when is_list(Config) -> + %% MQTT 3.1 + ?assertEqual(0, emqx_metrics:val('packets.connack.auth_error')), + {ok, C0} = emqtt:start_link([{proto_ver, v4}]), + ?assertEqual({error, {unauthorized_client, undefined}}, emqtt:connect(C0)), + ?assertEqual(1, emqx_metrics:val('packets.connack.auth_error')), + %% MQTT 5.0 + {ok, C1} = emqtt:start_link([{proto_ver, v5}]), + ?assertEqual({error, {not_authorized, #{}}}, emqtt:connect(C1)), + ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')), + ok. + wait_for_events(Action, Kinds) -> wait_for_events(Action, Kinds, 500).