Merge branch 'main-v4.3' into fix_sql_compare
This commit is contained in:
commit
88cf427ecc
|
@ -12,27 +12,21 @@ File format:
|
||||||
|
|
||||||
## v4.3.19
|
## v4.3.19
|
||||||
|
|
||||||
### Bug fixes
|
|
||||||
|
|
||||||
- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655)
|
|
||||||
- Fix rule SQL compare to null values always returns false. [#8743](https://github.com/emqx/emqx/pull/8743)
|
|
||||||
Before this change, the following SQL failed to match on the WHERE clause (`clientid != foo` returns false):
|
|
||||||
`SELECT 'some_var' as clientid FROM "t" WHERE clientid != foo`.
|
|
||||||
The `foo` variable is a null value, so `clientid != foo` should be evaluated as true.
|
|
||||||
|
|
||||||
## v4.3.19
|
|
||||||
|
|
||||||
### Enhancements
|
### Enhancements
|
||||||
|
|
||||||
- Improve error message for LwM2M plugin when object ID is not valid [#8654](https://github.com/emqx/emqx/pull/8654).
|
- Improve error message for LwM2M plugin when object ID is not valid [#8654](https://github.com/emqx/emqx/pull/8654).
|
||||||
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
|
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
|
||||||
- Add node evacuation and cluster rebalancing features [#8597]
|
- Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597)
|
||||||
|
- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737)
|
||||||
## v4.3.19
|
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
- Add a idle timer for ExProto UDP client to avoid client leaking [#8628]
|
- Fix rule SQL compare to null values always returns false. [#8743](https://github.com/emqx/emqx/pull/8743)
|
||||||
|
Before this change, the following SQL failed to match on the WHERE clause (`clientid != foo` returns false):
|
||||||
|
`SELECT 'some_var' as clientid FROM "t" WHERE clientid != foo`.
|
||||||
|
The `foo` variable is a null value, so `clientid != foo` should be evaluated as true.
|
||||||
|
- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655)
|
||||||
|
- Add a idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628)
|
||||||
|
|
||||||
## v4.3.18
|
## v4.3.18
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_mqtt,
|
{application, emqx_bridge_mqtt,
|
||||||
[{description, "EMQ X Bridge to MQTT Broker"},
|
[{description, "EMQ X Bridge to MQTT Broker"},
|
||||||
{vsn, "4.3.5"}, % strict semver, bump manually!
|
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,replayq,emqtt]},
|
{applications, [kernel,stdlib,replayq,emqtt]},
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.4",
|
[{<<"4\\.3\\.[4-5]">>,
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||||
|
@ -14,7 +14,7 @@
|
||||||
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.4",
|
[{<<"4\\.3\\.[4-5]">>,
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -433,7 +433,7 @@ test_resource_status(PoolName) ->
|
||||||
try
|
try
|
||||||
Status = [
|
Status = [
|
||||||
receive {Pid, R} -> R
|
receive {Pid, R} -> R
|
||||||
after 1000 -> %% get_worker_status/1 should be a quick operation
|
after 10000 -> %% get_worker_status/1 should be a quick operation
|
||||||
throw({timeout, Pid})
|
throw({timeout, Pid})
|
||||||
end || Pid <- Pids],
|
end || Pid <- Pids],
|
||||||
lists:any(fun(St) -> St =:= true end, Status)
|
lists:any(fun(St) -> St =:= true end, Status)
|
||||||
|
@ -444,13 +444,28 @@ test_resource_status(PoolName) ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-define(RETRY_TIMES, 4).
|
||||||
|
|
||||||
get_worker_status(Worker) ->
|
get_worker_status(Worker) ->
|
||||||
|
get_worker_status(Worker, ?RETRY_TIMES).
|
||||||
|
|
||||||
|
get_worker_status(_Worker, 0) ->
|
||||||
|
false;
|
||||||
|
get_worker_status(Worker, Times) ->
|
||||||
case ecpool_worker:client(Worker) of
|
case ecpool_worker:client(Worker) of
|
||||||
{ok, Bridge} ->
|
{ok, Bridge} ->
|
||||||
try emqx_bridge_worker:status(Bridge) of
|
try emqx_bridge_worker:status(Bridge) of
|
||||||
connected -> true;
|
connected ->
|
||||||
_ -> false
|
true;
|
||||||
catch _Error:_Reason ->
|
idle ->
|
||||||
|
?LOG(info, "MQTT Bridge get status idle. Should not ignore this."),
|
||||||
|
timer:sleep(100),
|
||||||
|
get_worker_status(Worker, Times - 1);
|
||||||
|
ErrorStatus ->
|
||||||
|
?LOG(error, "MQTT Bridge get status ~p", [ErrorStatus]),
|
||||||
|
false
|
||||||
|
catch Error:Reason:ST ->
|
||||||
|
?LOG(error, "MQTT Bridge get status error: ~p reason: ~p stacktrace: ~p", [Error, Reason, ST]),
|
||||||
false
|
false
|
||||||
end;
|
end;
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
|
|
|
@ -1,3 +1,19 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-compile({parse_transform, emqx_rule_actions_trans}).
|
-compile({parse_transform, emqx_rule_actions_trans}).
|
||||||
|
|
||||||
-type selected_data() :: map().
|
-type selected_data() :: map().
|
||||||
|
@ -6,6 +22,9 @@
|
||||||
|
|
||||||
-define(BINDING_KEYS, '__bindings__').
|
-define(BINDING_KEYS, '__bindings__').
|
||||||
|
|
||||||
|
-define(LOG_RULE_ACTION(Level, Metadata, Fmt, Args),
|
||||||
|
emqx_rule_utils:log_action(Level, Metadata, Fmt, Args)).
|
||||||
|
|
||||||
-define(bound_v(Key, ENVS0),
|
-define(bound_v(Key, ENVS0),
|
||||||
maps:get(Key,
|
maps:get(Key,
|
||||||
maps:get(?BINDING_KEYS, ENVS0, #{}))).
|
maps:get(?BINDING_KEYS, ENVS0, #{}))).
|
||||||
|
|
|
@ -171,9 +171,13 @@ on_action_create_republish(Id, Params = #{
|
||||||
on_action_republish(_Selected, Envs = #{
|
on_action_republish(_Selected, Envs = #{
|
||||||
topic := Topic,
|
topic := Topic,
|
||||||
headers := #{republish_by := ActId},
|
headers := #{republish_by := ActId},
|
||||||
?BINDING_KEYS := #{'Id' := ActId}
|
?BINDING_KEYS := #{'Id' := ActId},
|
||||||
|
metadata := Metadata
|
||||||
}) ->
|
}) ->
|
||||||
?LOG(error, "[republish] recursively republish detected, msg topic: ~p, target topic: ~p",
|
?LOG_RULE_ACTION(
|
||||||
|
error,
|
||||||
|
Metadata,
|
||||||
|
"[republish] recursively republish detected, msg topic: ~p, target topic: ~p",
|
||||||
[Topic, ?bound_v('TargetTopic', Envs)]),
|
[Topic, ?bound_v('TargetTopic', Envs)]),
|
||||||
emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)),
|
emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)),
|
||||||
{badact, recursively_republish};
|
{badact, recursively_republish};
|
||||||
|
@ -186,8 +190,9 @@ on_action_republish(Selected, _Envs = #{
|
||||||
'TargetQoS' := TargetQoS,
|
'TargetQoS' := TargetQoS,
|
||||||
'TopicTks' := TopicTks,
|
'TopicTks' := TopicTks,
|
||||||
'PayloadTks' := PayloadTks
|
'PayloadTks' := PayloadTks
|
||||||
} = Bindings}) ->
|
} = Bindings,
|
||||||
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
metadata := Metadata}) ->
|
||||||
|
?LOG_RULE_ACTION(debug, Metadata, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
||||||
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
||||||
Message =
|
Message =
|
||||||
#message{
|
#message{
|
||||||
|
@ -210,8 +215,9 @@ on_action_republish(Selected, _Envs = #{
|
||||||
'TargetQoS' := TargetQoS,
|
'TargetQoS' := TargetQoS,
|
||||||
'TopicTks' := TopicTks,
|
'TopicTks' := TopicTks,
|
||||||
'PayloadTks' := PayloadTks
|
'PayloadTks' := PayloadTks
|
||||||
} = Bindings}) ->
|
} = Bindings,
|
||||||
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
metadata := Metadata}) ->
|
||||||
|
?LOG_RULE_ACTION(debug, Metadata, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
||||||
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
||||||
Message =
|
Message =
|
||||||
#message{
|
#message{
|
||||||
|
|
|
@ -1,17 +1,21 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.13",[
|
[{"4.3.13",
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
]},
|
|
||||||
{"4.3.12",[
|
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
|
||||||
]},
|
|
||||||
{"4.3.11",
|
|
||||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.12",
|
||||||
|
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.11",
|
||||||
|
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.10",
|
{"4.3.10",
|
||||||
|
@ -173,16 +177,21 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.13",[
|
[{"4.3.13",
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{"4.3.12",[
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{"4.3.12",
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.11",
|
{"4.3.11",
|
||||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-module(emqx_rule_runtime).
|
-module(emqx_rule_runtime).
|
||||||
|
|
||||||
-include("rule_engine.hrl").
|
-include("rule_engine.hrl").
|
||||||
|
-include("rule_actions.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
@ -54,36 +55,37 @@ apply_rules([Rule|More], Input) ->
|
||||||
apply_rule(Rule, Input),
|
apply_rule(Rule, Input),
|
||||||
apply_rules(More, Input).
|
apply_rules(More, Input).
|
||||||
|
|
||||||
apply_rule(Rule = #rule{id = RuleID}, Input) ->
|
apply_rule(Rule = #rule{id = RuleId}, Input) ->
|
||||||
clear_rule_payload(),
|
clear_rule_payload(),
|
||||||
ok = emqx_rule_metrics:inc_rules_matched(RuleID),
|
ok = emqx_rule_metrics:inc_rules_matched(RuleId),
|
||||||
try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
|
%% Add metadata here caused we need support `metadata` and `rule_id` in SQL
|
||||||
|
try do_apply_rule(Rule, emqx_rule_utils:add_metadata(Input, #{rule_id => RuleId}))
|
||||||
catch
|
catch
|
||||||
%% ignore the errors if select or match failed
|
%% ignore the errors if select or match failed
|
||||||
_:Reason = {select_and_transform_error, Error} ->
|
_:Reason = {select_and_transform_error, Error} ->
|
||||||
emqx_rule_metrics:inc_rules_exception(RuleID),
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
||||||
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
|
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]),
|
[RuleId, Error]),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
_:Reason = {match_conditions_error, Error} ->
|
_:Reason = {match_conditions_error, Error} ->
|
||||||
emqx_rule_metrics:inc_rules_exception(RuleID),
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
||||||
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
|
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]),
|
[RuleId, Error]),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
_:Reason = {select_and_collect_error, Error} ->
|
_:Reason = {select_and_collect_error, Error} ->
|
||||||
emqx_rule_metrics:inc_rules_exception(RuleID),
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
||||||
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
|
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]),
|
[RuleId, Error]),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
_:Reason = {match_incase_error, Error} ->
|
_:Reason = {match_incase_error, Error} ->
|
||||||
emqx_rule_metrics:inc_rules_exception(RuleID),
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
||||||
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
|
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]),
|
[RuleId, Error]),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
_:Error:StkTrace ->
|
_:Error:StkTrace ->
|
||||||
emqx_rule_metrics:inc_rules_exception(RuleID),
|
emqx_rule_metrics:inc_rules_exception(RuleId),
|
||||||
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
|
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
|
||||||
[RuleID, Error, StkTrace]),
|
[RuleId, Error, StkTrace]),
|
||||||
{error, {Error, StkTrace}}
|
{error, {Error, StkTrace}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -247,9 +249,10 @@ number(Bin) ->
|
||||||
catch error:badarg -> binary_to_float(Bin)
|
catch error:badarg -> binary_to_float(Bin)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Step3 -> Take actions
|
%% %% Step3 -> Take actions
|
||||||
|
%% fallback actions already have `rule_id` in `metadata`
|
||||||
take_actions(Actions, Selected, Envs, OnFailed) ->
|
take_actions(Actions, Selected, Envs, OnFailed) ->
|
||||||
[take_action(ActInst, Selected, Envs, OnFailed, ?ActionMaxRetry)
|
[take_action(ActInst, Selected, emqx_rule_utils:add_metadata(Envs, ActInst), OnFailed, ?ActionMaxRetry)
|
||||||
|| ActInst <- Actions].
|
|| ActInst <- Actions].
|
||||||
|
|
||||||
take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
|
take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
|
||||||
|
@ -314,12 +317,12 @@ wait_action_on(Id, RetryN) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) ->
|
handle_action_failure(continue, _Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) ->
|
||||||
?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]),
|
?LOG_RULE_ACTION(error, Metadata, "Continue next action, reason: ~0p", [Reason]),
|
||||||
_ = take_actions(Fallbacks, Selected, Envs, continue),
|
_ = take_actions(Fallbacks, Selected, Envs, continue),
|
||||||
failed;
|
failed;
|
||||||
handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) ->
|
handle_action_failure(stop, Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) ->
|
||||||
?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]),
|
?LOG_RULE_ACTION(error, Metadata, "Skip all actions, reason: ~0p", [Reason]),
|
||||||
_ = take_actions(Fallbacks, Selected, Envs, continue),
|
_ = take_actions(Fallbacks, Selected, Envs, continue),
|
||||||
error({take_action_failed, {Id, Reason}}).
|
error({take_action_failed, {Id, Reason}}).
|
||||||
|
|
||||||
|
@ -431,10 +434,6 @@ do_apply_func(Name, Args, Input) ->
|
||||||
Result -> Result
|
Result -> Result
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
|
|
||||||
NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata),
|
|
||||||
Input#{metadata => NewMetadata}.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
|
|
||||||
-module(emqx_rule_utils).
|
-module(emqx_rule_utils).
|
||||||
|
|
||||||
|
-include("rule_engine.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-export([ replace_var/2
|
-export([ replace_var/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -59,6 +62,10 @@
|
||||||
, can_topic_match_oneof/2
|
, can_topic_match_oneof/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ add_metadata/2
|
||||||
|
, log_action/4
|
||||||
|
]).
|
||||||
|
|
||||||
-compile({no_auto_import,
|
-compile({no_auto_import,
|
||||||
[ float/1
|
[ float/1
|
||||||
]}).
|
]}).
|
||||||
|
@ -371,3 +378,30 @@ can_topic_match_oneof(Topic, Filters) ->
|
||||||
lists:any(fun(Fltr) ->
|
lists:any(fun(Fltr) ->
|
||||||
emqx_topic:match(Topic, Fltr)
|
emqx_topic:match(Topic, Fltr)
|
||||||
end, Filters).
|
end, Filters).
|
||||||
|
|
||||||
|
add_metadata(Envs, Metadata) when is_map(Envs), is_map(Metadata) ->
|
||||||
|
NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata),
|
||||||
|
Envs#{metadata => NMetadata};
|
||||||
|
add_metadata(Envs, Action) when is_map(Envs), is_record(Action, action_instance)->
|
||||||
|
Metadata = gen_metadata_from_action(Action),
|
||||||
|
NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata),
|
||||||
|
Envs#{metadata => NMetadata}.
|
||||||
|
|
||||||
|
gen_metadata_from_action(#action_instance{name = Name, args = undefined}) ->
|
||||||
|
#{action_name => Name, resource_id => undefined};
|
||||||
|
gen_metadata_from_action(#action_instance{name = Name, args = Args})
|
||||||
|
when is_map(Args) ->
|
||||||
|
#{action_name => Name, resource_id => maps:get(<<"$resource">>, Args, undefined)};
|
||||||
|
gen_metadata_from_action(#action_instance{name = Name}) ->
|
||||||
|
#{action_name => Name, resource_id => undefined}.
|
||||||
|
|
||||||
|
log_action(Level, Metadata, Fmt, Args) ->
|
||||||
|
?LOG(Level,
|
||||||
|
"Rule: ~p; Action: ~p; Resource: ~p. " ++ Fmt,
|
||||||
|
metadata_values(Metadata) ++ Args).
|
||||||
|
|
||||||
|
metadata_values(Metadata) ->
|
||||||
|
RuleId = maps:get(rule_id, Metadata, undefined),
|
||||||
|
ActionName = maps:get(action_name, Metadata, undefined),
|
||||||
|
ResourceName = maps:get(resource_id, Metadata, undefined),
|
||||||
|
[RuleId, ActionName, ResourceName].
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_web_hook,
|
{application, emqx_web_hook,
|
||||||
[{description, "EMQ X WebHook Plugin"},
|
[{description, "EMQ X WebHook Plugin"},
|
||||||
{vsn, "4.3.13"}, % strict semver, bump manually!
|
{vsn, "4.3.14"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_web_hook_sup]},
|
{registered, [emqx_web_hook_sup]},
|
||||||
{applications, [kernel,stdlib,ehttpc]},
|
{applications, [kernel,stdlib,ehttpc]},
|
||||||
|
|
|
@ -1,12 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{<<"4\\.3\\.[0-2]">>,
|
[{<<"4\\.3\\.[0-7]">>,
|
||||||
[{apply,{application,stop,[emqx_web_hook]}},
|
|
||||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
|
||||||
{<<"4\\.3\\.[3-7]">>,
|
|
||||||
[{apply,{application,stop,[emqx_web_hook]}},
|
[{apply,{application,stop,[emqx_web_hook]}},
|
||||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||||
|
@ -26,15 +21,10 @@
|
||||||
{"4.3.11",
|
{"4.3.11",
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.12",
|
{<<"4\\.3\\.1[2-3]">>,
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{<<"4\\.3\\.[0-2]">>,
|
[{<<"4\\.3\\.[0-7]">>,
|
||||||
[{apply,{application,stop,[emqx_web_hook]}},
|
|
||||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
|
||||||
{<<"4\\.3\\.[3-7]">>,
|
|
||||||
[{apply,{application,stop,[emqx_web_hook]}},
|
[{apply,{application,stop,[emqx_web_hook]}},
|
||||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||||
|
@ -54,6 +44,6 @@
|
||||||
{"4.3.11",
|
{"4.3.11",
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.12",
|
{<<"4\\.3\\.1[2-3]">>,
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}]}.
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -259,25 +259,27 @@ on_action_data_to_webserver(Selected, _Envs =
|
||||||
'BodyTokens' := BodyTokens,
|
'BodyTokens' := BodyTokens,
|
||||||
'Pool' := Pool,
|
'Pool' := Pool,
|
||||||
'RequestTimeout' := RequestTimeout},
|
'RequestTimeout' := RequestTimeout},
|
||||||
clientid := ClientID}) ->
|
clientid := ClientID,
|
||||||
|
metadata := Metadata}) ->
|
||||||
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
|
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
|
||||||
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
|
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
|
||||||
Req = create_req(Method, NPath, Headers, NBody),
|
Req = create_req(Method, NPath, Headers, NBody),
|
||||||
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
|
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
|
||||||
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
||||||
|
?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]),
|
||||||
emqx_rule_metrics:inc_actions_success(Id);
|
emqx_rule_metrics:inc_actions_success(Id);
|
||||||
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
||||||
emqx_rule_metrics:inc_actions_success(Id);
|
emqx_rule_metrics:inc_actions_success(Id);
|
||||||
{ok, StatusCode, _} ->
|
{ok, StatusCode, _} ->
|
||||||
?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
|
?LOG_RULE_ACTION(warning, Metadata, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
|
||||||
emqx_rule_metrics:inc_actions_error(Id),
|
emqx_rule_metrics:inc_actions_error(Id),
|
||||||
{badact, StatusCode};
|
{badact, StatusCode};
|
||||||
{ok, StatusCode, _, _} ->
|
{ok, StatusCode, _, _} ->
|
||||||
?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
|
?LOG_RULE_ACTION(warning, Metadata, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
|
||||||
emqx_rule_metrics:inc_actions_error(Id),
|
emqx_rule_metrics:inc_actions_error(Id),
|
||||||
{badact, StatusCode};
|
{badact, StatusCode};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "HTTP request failed path: ~p error: ~p", [NPath, Reason]),
|
?LOG_RULE_ACTION(error, Metadata, "HTTP request failed path: ~p error: ~p", [NPath, Reason]),
|
||||||
emqx_rule_metrics:inc_actions_error(Id),
|
emqx_rule_metrics:inc_actions_error(Id),
|
||||||
{badact, Reason}
|
{badact, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -41,7 +41,7 @@
|
||||||
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||||
, {redbug, "2.0.7"}
|
, {redbug, "2.0.7"}
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
|
||||||
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.2"}}}
|
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.3"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
||||||
|
|
Loading…
Reference in New Issue