diff --git a/apps/emqx_auth/include/emqx_authn.hrl b/apps/emqx_auth/include/emqx_authn.hrl index 782bfb9ca..d7ea32a92 100644 --- a/apps/emqx_auth/include/emqx_authn.hrl +++ b/apps/emqx_auth/include/emqx_authn.hrl @@ -28,7 +28,7 @@ -type authenticator_id() :: binary(). --define(AUTHN_RESOURCE_GROUP, <<"emqx_authn">>). +-define(AUTHN_RESOURCE_GROUP, <<"authn">>). %% VAR_NS_CLIENT_ATTRS is added here because it can be initialized before authn. %% NOTE: authn return may add more to (or even overwrite) client_attrs. diff --git a/apps/emqx_auth/include/emqx_authz.hrl b/apps/emqx_auth/include/emqx_authz.hrl index c638646f8..a7bef4b28 100644 --- a/apps/emqx_auth/include/emqx_authz.hrl +++ b/apps/emqx_auth/include/emqx_authz.hrl @@ -156,7 +156,7 @@ count => 1 }). --define(AUTHZ_RESOURCE_GROUP, <<"emqx_authz">>). +-define(AUTHZ_RESOURCE_GROUP, <<"authz">>). -define(AUTHZ_FEATURES, [rich_actions]). diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl index 35f585e44..2c42c00e2 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl @@ -198,7 +198,7 @@ qos_from_opts(Opts) -> ) end catch - {bad_qos, QoS} -> + throw:{bad_qos, QoS} -> throw(#{ reason => invalid_authorization_qos, qos => QoS diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl index 90c4e4fec..6e179b02c 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl @@ -16,6 +16,9 @@ -module(emqx_authz_utils). +-feature(maybe_expr, enable). + +-include_lib("emqx/include/emqx_placeholder.hrl"). -include_lib("emqx_authz.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). @@ -28,7 +31,7 @@ remove_resource/1, update_config/2, vars_for_rule_query/2, - parse_rule_from_row/2 + do_authorize/6 ]). -export([ @@ -133,14 +136,18 @@ content_type(Headers) when is_list(Headers) -> -define(RAW_RULE_KEYS, [<<"permission">>, <<"action">>, <<"topic">>, <<"qos">>, <<"retain">>]). -parse_rule_from_row(ColumnNames, Row) -> - RuleRaw = maps:with(?RAW_RULE_KEYS, maps:from_list(lists:zip(ColumnNames, to_list(Row)))), - case emqx_authz_rule_raw:parse_rule(RuleRaw) of +-spec parse_rule_from_row([binary()], [binary()] | map()) -> + {ok, emqx_authz_rule:rule()} | {error, term()}. +parse_rule_from_row(_ColumnNames, RuleMap = #{}) -> + case emqx_authz_rule_raw:parse_rule(RuleMap) of {ok, {Permission, Action, Topics}} -> - emqx_authz_rule:compile({Permission, all, Action, Topics}); + {ok, emqx_authz_rule:compile({Permission, all, Action, Topics})}; {error, Reason} -> - error(Reason) - end. + {error, Reason} + end; +parse_rule_from_row(ColumnNames, Row) -> + RuleMap = maps:with(?RAW_RULE_KEYS, maps:from_list(lists:zip(ColumnNames, to_list(Row)))), + parse_rule_from_row(ColumnNames, RuleMap). vars_for_rule_query(Client, ?authz_action(PubSub, Qos) = Action) -> Client#{ @@ -157,3 +164,39 @@ to_list(Tuple) when is_tuple(Tuple) -> tuple_to_list(Tuple); to_list(List) when is_list(List) -> List. + +do_authorize(Type, Client, Action, Topic, ColumnNames, Row) -> + try + maybe + {ok, Rule} ?= parse_rule_from_row(ColumnNames, Row), + {matched, Permission} ?= emqx_authz_rule:match(Client, Action, Topic, Rule), + {matched, Permission} + else + nomatch -> + nomatch; + {error, Reason0} -> + log_match_rule_error(Type, Row, Reason0), + nomatch + end + catch + throw:Reason1 -> + log_match_rule_error(Type, Row, Reason1), + nomatch + end. + +log_match_rule_error(Type, Row, Reason0) -> + Msg0 = #{ + msg => "match_rule_error", + rule => Row, + type => Type + }, + Msg1 = + case is_map(Reason0) of + true -> maps:merge(Msg0, Reason0); + false -> Msg0#{reason => Reason0} + end, + ?SLOG( + error, + Msg1, + #{tag => "AUTHZ"} + ). diff --git a/apps/emqx_auth/test/emqx_authn/emqx_authn_schema_SUITE.erl b/apps/emqx_auth/test/emqx_authn/emqx_authn_schema_SUITE.erl index f2688fff9..46eb18b82 100644 --- a/apps/emqx_auth/test/emqx_authn/emqx_authn_schema_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authn/emqx_authn_schema_SUITE.erl @@ -122,14 +122,6 @@ t_union_member_selector(_) -> }, check(BadMechanism) ), - BadCombination = Base#{<<"mechanism">> => <<"scram">>, <<"backend">> => <<"http">>}, - ?assertThrow( - #{ - reason := "unknown_mechanism", - expected := "password_based" - }, - check(BadCombination) - ), ok. t_http_auth_selector(_) -> diff --git a/apps/emqx_auth_http/include/emqx_auth_http.hrl b/apps/emqx_auth_http/include/emqx_auth_http.hrl index 9fc3b029e..c0bfa2177 100644 --- a/apps/emqx_auth_http/include/emqx_auth_http.hrl +++ b/apps/emqx_auth_http/include/emqx_auth_http.hrl @@ -22,8 +22,13 @@ -define(AUTHN_MECHANISM, password_based). -define(AUTHN_MECHANISM_BIN, <<"password_based">>). + +-define(AUTHN_MECHANISM_SCRAM, scram). +-define(AUTHN_MECHANISM_SCRAM_BIN, <<"scram">>). + -define(AUTHN_BACKEND, http). -define(AUTHN_BACKEND_BIN, <<"http">>). -define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}). +-define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}). -endif. diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index b97743b41..3d8ae0dad 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -25,10 +25,12 @@ start(_StartType, _StartArgs) -> ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http), ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http), + ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http), {ok, Sup} = emqx_auth_http_sup:start_link(), {ok, Sup}. stop(_State) -> ok = emqx_authn:deregister_provider(?AUTHN_TYPE), + ok = emqx_authn:deregister_provider(?AUTHN_TYPE_SCRAM), ok = emqx_authz:unregister_source(?AUTHZ_TYPE), ok. diff --git a/apps/emqx_auth_http/src/emqx_authn_http.erl b/apps/emqx_auth_http/src/emqx_authn_http.erl index 818e355e5..4e7770334 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http.erl @@ -28,6 +28,13 @@ destroy/1 ]). +-export([ + with_validated_config/2, + generate_request/2, + request_for_log/2, + response_for_log/1 +]). + -define(DEFAULT_CONTENT_TYPE, <<"application/json">>). %%------------------------------------------------------------------------------ diff --git a/apps/emqx_auth_http/src/emqx_authn_http_schema.erl b/apps/emqx_auth_http/src/emqx_authn_http_schema.erl index aff16b824..0167571c0 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http_schema.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http_schema.erl @@ -27,6 +27,8 @@ namespace/0 ]). +-export([url/1, headers/1, headers_no_content_type/1, request_timeout/1]). + -include("emqx_auth_http.hrl"). -include_lib("emqx_auth/include/emqx_authn.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -61,12 +63,6 @@ select_union_member( got => Else }) end; -select_union_member(#{<<"backend">> := ?AUTHN_BACKEND_BIN}) -> - throw(#{ - reason => "unknown_mechanism", - expected => "password_based", - got => undefined - }); select_union_member(_Value) -> undefined. diff --git a/apps/emqx_auth_http/src/emqx_authn_scram_http.erl b/apps/emqx_auth_http/src/emqx_authn_scram_http.erl new file mode 100644 index 000000000..0e6190b4b --- /dev/null +++ b/apps/emqx_auth_http/src/emqx_authn_scram_http.erl @@ -0,0 +1,179 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_authn_scram_http). + +-include_lib("emqx_auth/include/emqx_authn.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-behaviour(emqx_authn_provider). + +-export([ + create/2, + update/2, + authenticate/2, + destroy/1 +]). + +-define(REQUIRED_USER_INFO_KEYS, [ + <<"stored_key">>, + <<"server_key">>, + <<"salt">> +]). + +-define(OPTIONAL_USER_INFO_KEYS, [ + <<"is_superuser">> +]). + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + +create(_AuthenticatorID, Config) -> + create(Config). + +create(Config0) -> + emqx_authn_http:with_validated_config(Config0, fun(Config, State) -> + ResourceId = emqx_authn_utils:make_resource_id(?MODULE), + % {Config, State} = parse_config(Config0), + {ok, _Data} = emqx_authn_utils:create_resource( + ResourceId, + emqx_bridge_http_connector, + Config + ), + {ok, merge_scram_conf(Config, State#{resource_id => ResourceId})} + end). + +update(Config0, #{resource_id := ResourceId} = _State) -> + emqx_authn_http:with_validated_config(Config0, fun(Config, NState) -> + % {Config, NState} = parse_config(Config0), + case emqx_authn_utils:update_resource(emqx_bridge_http_connector, Config, ResourceId) of + {error, Reason} -> + error({load_config_error, Reason}); + {ok, _} -> + {ok, merge_scram_conf(Config, NState#{resource_id => ResourceId})} + end + end). + +authenticate( + #{ + auth_method := AuthMethod, + auth_data := AuthData, + auth_cache := AuthCache + } = Credential, + State +) -> + RetrieveFun = fun(Username) -> + retrieve(Username, Credential, State) + end, + OnErrFun = fun(Msg, Reason) -> + ?TRACE_AUTHN_PROVIDER(Msg, #{ + reason => Reason + }) + end, + emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State); +authenticate(_Credential, _State) -> + ignore. + +destroy(#{resource_id := ResourceId}) -> + _ = emqx_resource:remove_local(ResourceId), + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +retrieve( + Username, + Credential, + #{ + resource_id := ResourceId, + method := Method, + request_timeout := RequestTimeout + } = State +) -> + Request = emqx_authn_http:generate_request(Credential#{username := Username}, State), + Response = emqx_resource:simple_sync_query(ResourceId, {Method, Request, RequestTimeout}), + ?TRACE_AUTHN_PROVIDER("scram_http_response", #{ + request => emqx_authn_http:request_for_log(Credential, State), + response => emqx_authn_http:response_for_log(Response), + resource => ResourceId + }), + case Response of + {ok, 200, Headers, Body} -> + handle_response(Headers, Body); + {ok, _StatusCode, _Headers} -> + {error, bad_response}; + {ok, _StatusCode, _Headers, _Body} -> + {error, bad_response}; + {error, _Reason} = Error -> + Error + end. + +handle_response(Headers, Body) -> + ContentType = proplists:get_value(<<"content-type">>, Headers), + case safely_parse_body(ContentType, Body) of + {ok, NBody} -> + body_to_user_info(NBody); + {error, Reason} = Error -> + ?TRACE_AUTHN_PROVIDER( + error, + "parse_scram_http_response_failed", + #{content_type => ContentType, body => Body, reason => Reason} + ), + Error + end. + +body_to_user_info(Body) -> + Required0 = maps:with(?REQUIRED_USER_INFO_KEYS, Body), + case maps:size(Required0) =:= erlang:length(?REQUIRED_USER_INFO_KEYS) of + true -> + case safely_convert_hex(Required0) of + {ok, Required} -> + UserInfo0 = maps:merge(Required, maps:with(?OPTIONAL_USER_INFO_KEYS, Body)), + UserInfo1 = emqx_utils_maps:safe_atom_key_map(UserInfo0), + UserInfo = maps:merge(#{is_superuser => false}, UserInfo1), + {ok, UserInfo}; + Error -> + Error + end; + _ -> + ?TRACE_AUTHN_PROVIDER("bad_response_body", #{http_body => Body}), + {error, bad_response} + end. + +safely_parse_body(ContentType, Body) -> + try + parse_body(ContentType, Body) + catch + _Class:_Reason -> + {error, invalid_body} + end. + +safely_convert_hex(Required) -> + try + {ok, + maps:map( + fun(_Key, Hex) -> + binary:decode_hex(Hex) + end, + Required + )} + catch + _Class:Reason -> + {error, Reason} + end. + +parse_body(<<"application/json", _/binary>>, Body) -> + {ok, emqx_utils_json:decode(Body, [return_maps])}; +parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) -> + Flags = ?REQUIRED_USER_INFO_KEYS ++ ?OPTIONAL_USER_INFO_KEYS, + RawMap = maps:from_list(cow_qs:parse_qs(Body)), + NBody = maps:with(Flags, RawMap), + {ok, NBody}; +parse_body(ContentType, _) -> + {error, {unsupported_content_type, ContentType}}. + +merge_scram_conf(Conf, State) -> + maps:merge(maps:with([algorithm, iteration_count], Conf), State). diff --git a/apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl b/apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl new file mode 100644 index 000000000..ca43fe3a6 --- /dev/null +++ b/apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl @@ -0,0 +1,81 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_authn_scram_http_schema). + +-behaviour(emqx_authn_schema). + +-export([ + fields/1, + validations/0, + desc/1, + refs/0, + select_union_member/1, + namespace/0 +]). + +-include("emqx_auth_http.hrl"). +-include_lib("emqx_auth/include/emqx_authn.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +namespace() -> "authn". + +refs() -> + [?R_REF(scram_http_get), ?R_REF(scram_http_post)]. + +select_union_member( + #{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN} = Value +) -> + case maps:get(<<"method">>, Value, undefined) of + <<"get">> -> + [?R_REF(scram_http_get)]; + <<"post">> -> + [?R_REF(scramm_http_post)]; + Else -> + throw(#{ + reason => "unknown_http_method", + expected => "get | post", + field_name => method, + got => Else + }) + end; +select_union_member(_Value) -> + undefined. + +fields(scram_http_get) -> + [ + {method, #{type => get, required => true, desc => ?DESC(emqx_authn_http_schema, method)}}, + {headers, fun emqx_authn_http_schema:headers_no_content_type/1} + ] ++ common_fields(); +fields(scram_http_post) -> + [ + {method, #{type => post, required => true, desc => ?DESC(emqx_authn_http_schema, method)}}, + {headers, fun emqx_authn_http_schema:headers/1} + ] ++ common_fields(). + +desc(scram_http_get) -> + ?DESC(emqx_authn_http_schema, get); +desc(scram_http_post) -> + ?DESC(emqx_authn_http_schema, post); +desc(_) -> + undefined. + +validations() -> + emqx_authn_http_schema:validations(). + +common_fields() -> + emqx_authn_schema:common_fields() ++ + [ + {mechanism, emqx_authn_schema:mechanism(?AUTHN_MECHANISM_SCRAM)}, + {backend, emqx_authn_schema:backend(?AUTHN_BACKEND)}, + {algorithm, fun emqx_authn_scram_mnesia_schema:algorithm/1}, + {iteration_count, fun emqx_authn_scram_mnesia_schema:iteration_count/1}, + {url, fun emqx_authn_http_schema:url/1}, + {body, + hoconsc:mk(typerefl:alias("map", map([{fuzzy, term(), binary()}])), #{ + required => false, desc => ?DESC(emqx_authn_http_schema, body) + })}, + {request_timeout, fun emqx_authn_http_schema:request_timeout/1} + ] ++ + proplists:delete(pool_type, emqx_bridge_http_connector:fields(config)). diff --git a/apps/emqx_auth_http/src/emqx_authz_http.erl b/apps/emqx_auth_http/src/emqx_authz_http.erl index b1c2ef0ab..6d9ffff68 100644 --- a/apps/emqx_auth_http/src/emqx_authz_http.erl +++ b/apps/emqx_auth_http/src/emqx_authz_http.erl @@ -67,7 +67,11 @@ description() -> create(Config) -> NConfig = parse_config(Config), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_bridge_http_connector, NConfig), + {ok, _Data} = emqx_authz_utils:create_resource( + ResourceId, + emqx_bridge_http_connector, + NConfig + ), NConfig#{annotations => #{id => ResourceId}}. update(Config) -> diff --git a/apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl new file mode 100644 index 000000000..b00212cb1 --- /dev/null +++ b/apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl @@ -0,0 +1,438 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_authn_scram_http_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_auth/include/emqx_authn.hrl"). + +-define(PATH, [authentication]). + +-define(HTTP_PORT, 34333). +-define(HTTP_PATH, "/user/[...]"). +-define(ALGORITHM, sha512). +-define(ALGORITHM_STR, <<"sha512">>). +-define(ITERATION_COUNT, 4096). + +-include_lib("emqx/include/emqx_placeholder.hrl"). + +all() -> + case emqx_release:edition() of + ce -> + []; + _ -> + emqx_common_test_helpers:all(?MODULE) + end. + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start([cowboy, emqx, emqx_conf, emqx_auth, emqx_auth_http], #{ + work_dir => ?config(priv_dir, Config) + }), + + IdleTimeout = emqx_config:get([mqtt, idle_timeout]), + [{apps, Apps}, {idle_timeout, IdleTimeout} | Config]. + +end_per_suite(Config) -> + ok = emqx_config:put([mqtt, idle_timeout], ?config(idle_timeout, Config)), + emqx_authn_test_lib:delete_authenticators( + [authentication], + ?GLOBAL + ), + ok = emqx_cth_suite:stop(?config(apps, Config)), + ok. + +init_per_testcase(_Case, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + emqx_authn_test_lib:delete_authenticators( + [authentication], + ?GLOBAL + ), + {ok, _} = emqx_authn_scram_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH), + Config. + +end_per_testcase(_Case, _Config) -> + ok = emqx_authn_scram_http_test_server:stop(). + +%%------------------------------------------------------------------------------ +%% Tests +%%------------------------------------------------------------------------------ + +t_create(_Config) -> + AuthConfig = raw_config(), + + {ok, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, AuthConfig} + ), + + {ok, [#{provider := emqx_authn_scram_http}]} = emqx_authn_chains:list_authenticators(?GLOBAL). + +t_create_invalid(_Config) -> + AuthConfig = raw_config(), + + InvalidConfigs = + [ + AuthConfig#{<<"headers">> => []}, + AuthConfig#{<<"method">> => <<"delete">>}, + AuthConfig#{<<"url">> => <<"localhost">>}, + AuthConfig#{<<"url">> => <<"http://foo.com/xxx#fragment">>}, + AuthConfig#{<<"url">> => <<"http://${foo}.com/xxx">>}, + AuthConfig#{<<"url">> => <<"//foo.com/xxx">>}, + AuthConfig#{<<"algorithm">> => <<"sha128">>} + ], + + lists:foreach( + fun(Config) -> + ct:pal("creating authenticator with invalid config: ~p", [Config]), + {error, _} = + try + emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, Config} + ) + catch + throw:Error -> + {error, Error} + end, + ?assertEqual( + {error, {not_found, {chain, ?GLOBAL}}}, + emqx_authn_chains:list_authenticators(?GLOBAL) + ) + end, + InvalidConfigs + ). + +t_authenticate(_Config) -> + Username = <<"u">>, + Password = <<"p">>, + + set_user_handler(Username, Password), + init_auth(), + + ok = emqx_config:put([mqtt, idle_timeout], 500), + + {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ClientFirstMessage = esasl_scram:client_first_message(Username), + + ConnectPacket = ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFirstMessage + } + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), + + %% Intentional sleep to trigger idle timeout for the connection not yet authenticated + ok = ct:sleep(1000), + + ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{'Authentication-Data' := ServerFirstMessage} + ) = receive_packet(), + + {continue, ClientFinalMessage, ClientCache} = + esasl_scram:check_server_first_message( + ServerFirstMessage, + #{ + client_first_message => ClientFirstMessage, + password => Password, + algorithm => ?ALGORITHM + } + ), + + AuthContinuePacket = ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFinalMessage + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket), + + ?CONNACK_PACKET( + ?RC_SUCCESS, + _, + #{'Authentication-Data' := ServerFinalMessage} + ) = receive_packet(), + + ok = esasl_scram:check_server_final_message( + ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM} + ). + +t_authenticate_bad_props(_Config) -> + Username = <<"u">>, + Password = <<"p">>, + + set_user_handler(Username, Password), + init_auth(), + + {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ConnectPacket = ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">> + } + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), + + ?CONNACK_PACKET(?RC_NOT_AUTHORIZED) = receive_packet(). + +t_authenticate_bad_username(_Config) -> + Username = <<"u">>, + Password = <<"p">>, + + set_user_handler(Username, Password), + init_auth(), + + {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ClientFirstMessage = esasl_scram:client_first_message(<<"badusername">>), + + ConnectPacket = ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFirstMessage + } + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), + + ?CONNACK_PACKET(?RC_NOT_AUTHORIZED) = receive_packet(). + +t_authenticate_bad_password(_Config) -> + Username = <<"u">>, + Password = <<"p">>, + + set_user_handler(Username, Password), + init_auth(), + + {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ClientFirstMessage = esasl_scram:client_first_message(Username), + + ConnectPacket = ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFirstMessage + } + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), + + ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{'Authentication-Data' := ServerFirstMessage} + ) = receive_packet(), + + {continue, ClientFinalMessage, _ClientCache} = + esasl_scram:check_server_first_message( + ServerFirstMessage, + #{ + client_first_message => ClientFirstMessage, + password => <<"badpassword">>, + algorithm => ?ALGORITHM + } + ), + + AuthContinuePacket = ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFinalMessage + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket), + + ?CONNACK_PACKET(?RC_NOT_AUTHORIZED) = receive_packet(). + +t_destroy(_Config) -> + Username = <<"u">>, + Password = <<"p">>, + + set_user_handler(Username, Password), + init_auth(), + + ok = emqx_config:put([mqtt, idle_timeout], 500), + + {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ConnectPacket = ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">> + } + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), + + ok = ct:sleep(1000), + + ?CONNACK_PACKET(?RC_NOT_AUTHORIZED) = receive_packet(), + + %% emqx_authn_mqtt_test_client:stop(Pid), + + emqx_authn_test_lib:delete_authenticators( + [authentication], + ?GLOBAL + ), + + {ok, Pid2} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ok = emqx_authn_mqtt_test_client:send(Pid2, ConnectPacket), + + ok = ct:sleep(1000), + + ?CONNACK_PACKET( + ?RC_SUCCESS, + _, + _ + ) = receive_packet(). + +t_is_superuser() -> + State = init_auth(), + ok = test_is_superuser(State, false), + ok = test_is_superuser(State, true), + ok = test_is_superuser(State, false). + +test_is_superuser(State, ExpectedIsSuperuser) -> + Username = <<"u">>, + Password = <<"p">>, + + set_user_handler(Username, Password, ExpectedIsSuperuser), + + ClientFirstMessage = esasl_scram:client_first_message(Username), + + {continue, ServerFirstMessage, ServerCache} = + emqx_authn_scram_http:authenticate( + #{ + auth_method => <<"SCRAM-SHA-512">>, + auth_data => ClientFirstMessage, + auth_cache => #{} + }, + State + ), + + {continue, ClientFinalMessage, ClientCache} = + esasl_scram:check_server_first_message( + ServerFirstMessage, + #{ + client_first_message => ClientFirstMessage, + password => Password, + algorithm => ?ALGORITHM + } + ), + + {ok, UserInfo1, ServerFinalMessage} = + emqx_authn_scram_http:authenticate( + #{ + auth_method => <<"SCRAM-SHA-512">>, + auth_data => ClientFinalMessage, + auth_cache => ServerCache + }, + State + ), + + ok = esasl_scram:check_server_final_message( + ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM} + ), + + ?assertMatch(#{is_superuser := ExpectedIsSuperuser}, UserInfo1). + +%%------------------------------------------------------------------------------ +%% Helpers +%%------------------------------------------------------------------------------ + +raw_config() -> + #{ + <<"mechanism">> => <<"scram">>, + <<"backend">> => <<"http">>, + <<"enable">> => <<"true">>, + <<"method">> => <<"get">>, + <<"url">> => <<"http://127.0.0.1:34333/user">>, + <<"body">> => #{<<"username">> => ?PH_USERNAME}, + <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>}, + <<"algorithm">> => ?ALGORITHM_STR, + <<"iteration_count">> => ?ITERATION_COUNT + }. + +set_user_handler(Username, Password) -> + set_user_handler(Username, Password, false). +set_user_handler(Username, Password, IsSuperuser) -> + %% HTTP Server + Handler = fun(Req0, State) -> + #{ + username := Username + } = cowboy_req:match_qs([username], Req0), + + UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT, IsSuperuser), + Req = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + emqx_utils_json:encode(UserInfo), + Req0 + ), + {ok, Req, State} + end, + ok = emqx_authn_scram_http_test_server:set_handler(Handler). + +init_auth() -> + init_auth(raw_config()). + +init_auth(Config) -> + {ok, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, Config} + ), + + {ok, [#{state := State}]} = emqx_authn_chains:list_authenticators(?GLOBAL), + State. + +make_user_info(Password, Algorithm, IterationCount, IsSuperuser) -> + {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info( + Password, + #{ + algorithm => Algorithm, + iteration_count => IterationCount + } + ), + #{ + stored_key => binary:encode_hex(StoredKey), + server_key => binary:encode_hex(ServerKey), + salt => binary:encode_hex(Salt), + is_superuser => IsSuperuser + }. + +receive_packet() -> + receive + {packet, Packet} -> + ct:pal("Delivered packet: ~p", [Packet]), + Packet + after 1000 -> + ct:fail("Deliver timeout") + end. diff --git a/apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl b/apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl new file mode 100644 index 000000000..5467df621 --- /dev/null +++ b/apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl @@ -0,0 +1,115 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_authn_scram_http_test_server). + +-behaviour(supervisor). +-behaviour(cowboy_handler). + +% cowboy_server callbacks +-export([init/2]). + +% supervisor callbacks +-export([init/1]). + +% API +-export([ + start_link/2, + start_link/3, + stop/0, + set_handler/1 +]). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link(Port, Path) -> + start_link(Port, Path, false). + +start_link(Port, Path, SSLOpts) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [Port, Path, SSLOpts]). + +stop() -> + gen_server:stop(?MODULE). + +set_handler(F) when is_function(F, 2) -> + true = ets:insert(?MODULE, {handler, F}), + ok. + +%%------------------------------------------------------------------------------ +%% supervisor API +%%------------------------------------------------------------------------------ + +init([Port, Path, SSLOpts]) -> + Dispatch = cowboy_router:compile( + [ + {'_', [{Path, ?MODULE, []}]} + ] + ), + + ProtoOpts = #{env => #{dispatch => Dispatch}}, + + Tab = ets:new(?MODULE, [set, named_table, public]), + ets:insert(Tab, {handler, fun default_handler/2}), + + {Transport, TransOpts, CowboyModule} = transport_settings(Port, SSLOpts), + + ChildSpec = ranch:child_spec(?MODULE, Transport, TransOpts, CowboyModule, ProtoOpts), + + {ok, {#{}, [ChildSpec]}}. + +%%------------------------------------------------------------------------------ +%% cowboy_server API +%%------------------------------------------------------------------------------ + +init(Req, State) -> + [{handler, Handler}] = ets:lookup(?MODULE, handler), + Handler(Req, State). + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +transport_settings(Port, false) -> + TransOpts = #{ + socket_opts => [{port, Port}], + connection_type => supervisor + }, + {ranch_tcp, TransOpts, cowboy_clear}; +transport_settings(Port, SSLOpts) -> + TransOpts = #{ + socket_opts => [ + {port, Port}, + {next_protocols_advertised, [<<"h2">>, <<"http/1.1">>]}, + {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} + | SSLOpts + ], + connection_type => supervisor + }, + {ranch_ssl, TransOpts, cowboy_tls}. + +default_handler(Req0, State) -> + Req = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"text/plain">>}, + <<"">>, + Req0 + ), + {ok, Req, State}. + +make_user_info(Password, Algorithm, IterationCount) -> + {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info( + Password, + #{ + algorithm => Algorithm, + iteration_count => IterationCount + } + ), + #{ + stored_key => StoredKey, + server_key => ServerKey, + salt => Salt, + is_superuser => false + }. diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl index ffa8175b7..22aed8e57 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl @@ -22,6 +22,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -32,6 +33,8 @@ -define(DEFAULT_POOL_SIZE, 8). +resource_type() -> jwks. + callback_mode() -> always_sync. on_start(InstId, Opts) -> diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl index fff7d056e..0d3d1a8c2 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl @@ -188,7 +188,8 @@ do_create( ResourceId, ?AUTHN_RESOURCE_GROUP, emqx_authn_jwks_connector, - connector_opts(Config) + connector_opts(Config), + #{} ), {ok, #{ jwk_resource => ResourceId, diff --git a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src index d84d6ff81..a58117356 100644 --- a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src +++ b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_ldap, [ {description, "EMQX LDAP Authentication and Authorization"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_auth_ldap_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl index 611469c5b..d59afea28 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl @@ -133,17 +133,15 @@ authenticate( }, State ) -> - case ensure_auth_method(AuthMethod, AuthData, State) of - true -> - case AuthCache of - #{next_step := client_final} -> - check_client_final_message(AuthData, AuthCache, State); - _ -> - check_client_first_message(AuthData, AuthCache, State) - end; - false -> - ignore - end; + RetrieveFun = fun(Username) -> + retrieve(Username, State) + end, + OnErrFun = fun(Msg, Reason) -> + ?TRACE_AUTHN_PROVIDER(Msg, #{ + reason => Reason + }) + end, + emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State); authenticate(_Credential, _State) -> ignore. @@ -257,55 +255,6 @@ run_fuzzy_filter( %% Internal functions %%------------------------------------------------------------------------------ -ensure_auth_method(_AuthMethod, undefined, _State) -> - false; -ensure_auth_method(<<"SCRAM-SHA-256">>, _AuthData, #{algorithm := sha256}) -> - true; -ensure_auth_method(<<"SCRAM-SHA-512">>, _AuthData, #{algorithm := sha512}) -> - true; -ensure_auth_method(_AuthMethod, _AuthData, _State) -> - false. - -check_client_first_message(Bin, _Cache, #{iteration_count := IterationCount} = State) -> - RetrieveFun = fun(Username) -> - retrieve(Username, State) - end, - case - esasl_scram:check_client_first_message( - Bin, - #{ - iteration_count => IterationCount, - retrieve => RetrieveFun - } - ) - of - {continue, ServerFirstMessage, Cache} -> - {continue, ServerFirstMessage, Cache}; - ignore -> - ignore; - {error, Reason} -> - ?TRACE_AUTHN_PROVIDER("check_client_first_message_error", #{ - reason => Reason - }), - {error, not_authorized} - end. - -check_client_final_message(Bin, #{is_superuser := IsSuperuser} = Cache, #{algorithm := Alg}) -> - case - esasl_scram:check_client_final_message( - Bin, - Cache#{algorithm => Alg} - ) - of - {ok, ServerFinalMessage} -> - {ok, #{is_superuser => IsSuperuser}, ServerFinalMessage}; - {error, Reason} -> - ?TRACE_AUTHN_PROVIDER("check_client_final_message_error", #{ - reason => Reason - }), - {error, not_authorized} - end. - user_info_record( #{ user_id := UserID, diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia_schema.erl b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia_schema.erl index 5d442cd57..dbad2118f 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia_schema.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia_schema.erl @@ -29,6 +29,8 @@ select_union_member/1 ]). +-export([algorithm/1, iteration_count/1]). + namespace() -> "authn". refs() -> @@ -38,11 +40,6 @@ select_union_member(#{ <<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN }) -> refs(); -select_union_member(#{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN}) -> - throw(#{ - reason => "unknown_backend", - expected => ?AUTHN_BACKEND - }); select_union_member(_) -> undefined. diff --git a/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl b/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl index 6a58c6c16..731c64af6 100644 --- a/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl +++ b/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl @@ -101,19 +101,9 @@ authorize( do_authorize(_Client, _Action, _Topic, _ColumnNames, []) -> nomatch; do_authorize(Client, Action, Topic, ColumnNames, [Row | Tail]) -> - try - emqx_authz_rule:match( - Client, Action, Topic, emqx_authz_utils:parse_rule_from_row(ColumnNames, Row) - ) - of - {matched, Permission} -> {matched, Permission}; - nomatch -> do_authorize(Client, Action, Topic, ColumnNames, Tail) - catch - error:Reason -> - ?SLOG(error, #{ - msg => "match_rule_error", - reason => Reason, - rule => Row - }), - do_authorize(Client, Action, Topic, ColumnNames, Tail) + case emqx_authz_utils:do_authorize(mysql, Client, Action, Topic, ColumnNames, Row) of + nomatch -> + do_authorize(Client, Action, Topic, ColumnNames, Tail); + {matched, Permission} -> + {matched, Permission} end. diff --git a/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl b/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl index b09f6e009..31bd968d0 100644 --- a/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl +++ b/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl @@ -107,22 +107,11 @@ authorize( do_authorize(_Client, _Action, _Topic, _ColumnNames, []) -> nomatch; do_authorize(Client, Action, Topic, ColumnNames, [Row | Tail]) -> - try - emqx_authz_rule:match( - Client, Action, Topic, emqx_authz_utils:parse_rule_from_row(ColumnNames, Row) - ) - of - {matched, Permission} -> {matched, Permission}; - nomatch -> do_authorize(Client, Action, Topic, ColumnNames, Tail) - catch - error:Reason:Stack -> - ?SLOG(error, #{ - msg => "match_rule_error", - reason => Reason, - rule => Row, - stack => Stack - }), - do_authorize(Client, Action, Topic, ColumnNames, Tail) + case emqx_authz_utils:do_authorize(postgresql, Client, Action, Topic, ColumnNames, Row) of + nomatch -> + do_authorize(Client, Action, Topic, ColumnNames, Tail); + {matched, Permission} -> + {matched, Permission} end. column_names(Columns) -> diff --git a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl index 50bff634d..1dfd30899 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl @@ -198,9 +198,9 @@ test_user_auth(#{ t_authenticate_disabled_prepared_statements(_Config) -> ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}), - {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig), + {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig, #{}), on_exit(fun() -> - emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config()) + emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config(), #{}) end), ok = lists:foreach( fun(Sample0) -> diff --git a/apps/emqx_auth_redis/src/emqx_authz_redis.erl b/apps/emqx_auth_redis/src/emqx_authz_redis.erl index e1c675a61..669eb081c 100644 --- a/apps/emqx_auth_redis/src/emqx_authz_redis.erl +++ b/apps/emqx_auth_redis/src/emqx_authz_redis.erl @@ -92,44 +92,30 @@ authorize( do_authorize(_Client, _Action, _Topic, []) -> nomatch; do_authorize(Client, Action, Topic, [TopicFilterRaw, RuleEncoded | Tail]) -> - try - emqx_authz_rule:match( - Client, - Action, - Topic, - compile_rule(RuleEncoded, TopicFilterRaw) - ) - of - {matched, Permission} -> {matched, Permission}; - nomatch -> do_authorize(Client, Action, Topic, Tail) - catch - error:Reason:Stack -> - ?SLOG(error, #{ - msg => "match_rule_error", - reason => Reason, - rule_encoded => RuleEncoded, - topic_filter_raw => TopicFilterRaw, - stacktrace => Stack + case parse_rule(RuleEncoded) of + {ok, RuleMap0} -> + RuleMap = + maps:merge( + #{ + <<"permission">> => <<"allow">>, + <<"topic">> => TopicFilterRaw + }, + RuleMap0 + ), + case emqx_authz_utils:do_authorize(redis, Client, Action, Topic, undefined, RuleMap) of + nomatch -> + do_authorize(Client, Action, Topic, Tail); + {matched, Permission} -> + {matched, Permission} + end; + {error, Reason} -> + ?SLOG(error, Reason#{ + msg => "parse_rule_error", + rule => RuleEncoded }), do_authorize(Client, Action, Topic, Tail) end. -compile_rule(RuleBin, TopicFilterRaw) -> - RuleRaw = - maps:merge( - #{ - <<"permission">> => <<"allow">>, - <<"topic">> => TopicFilterRaw - }, - parse_rule(RuleBin) - ), - case emqx_authz_rule_raw:parse_rule(RuleRaw) of - {ok, {Permission, Action, Topics}} -> - emqx_authz_rule:compile({Permission, all, Action, Topics}); - {error, Reason} -> - error(Reason) - end. - parse_cmd(Query) -> case emqx_redis_command:split(Query) of {ok, Cmd} -> @@ -154,17 +140,17 @@ validate_cmd(Cmd) -> end. parse_rule(<<"publish">>) -> - #{<<"action">> => <<"publish">>}; + {ok, #{<<"action">> => <<"publish">>}}; parse_rule(<<"subscribe">>) -> - #{<<"action">> => <<"subscribe">>}; + {ok, #{<<"action">> => <<"subscribe">>}}; parse_rule(<<"all">>) -> - #{<<"action">> => <<"all">>}; + {ok, #{<<"action">> => <<"all">>}}; parse_rule(Bin) when is_binary(Bin) -> case emqx_utils_json:safe_decode(Bin, [return_maps]) of {ok, Map} when is_map(Map) -> - maps:with([<<"qos">>, <<"action">>, <<"retain">>], Map); + {ok, maps:with([<<"qos">>, <<"action">>, <<"retain">>], Map)}; {ok, _} -> - error({invalid_topic_rule, Bin, notamap}); - {error, Error} -> - error({invalid_topic_rule, Bin, Error}) + {error, #{reason => invalid_topic_rule_not_map, value => Bin}}; + {error, _Error} -> + {error, #{reason => invalid_topic_rule_not_json, value => Bin}} end. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 9215e0787..9ac9a27a8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -198,7 +198,7 @@ create(Type, Name, Conf0, Opts) -> Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, {ok, _Data} = emqx_resource:create_local( resource_id(Type, Name), - <<"emqx_bridge">>, + <<"bridge">>, bridge_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) @@ -284,7 +284,7 @@ create_dry_run(Type0, Conf0) -> create_dry_run_bridge_v1(Type, Conf0) -> TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpPath = emqx_utils:safe_filename(TmpName), - %% Already typechecked, no need to catch errors + %% Already type checked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), Conf1 = maps:without([<<"name">>], Conf0), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index dc2e8f275..d81d710ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -1110,6 +1110,7 @@ t_query_uses_action_query_mode(_Config) -> %% ... now we use a quite different query mode for the action meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), resource_type, 0, dummy), meck:expect(con_mod(), callback_mode, 0, async_if_possible), {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 27db7486f..ae1eb58fc 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -302,6 +302,7 @@ init_mocks() -> meck:new(emqx_connector_resource, [passthrough, no_link]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), + meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( ?CONNECTOR_IMPL, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl index 6b4001b6b..1bb7fd37f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl @@ -15,15 +15,17 @@ %% this module is only intended to be mocked -module(emqx_bridge_v2_dummy_connector). +-behavior(emqx_resource). -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, on_add_channel/4, on_get_channel_status/3 ]). - +resource_type() -> dummy. callback_mode() -> error(unexpected). on_start(_, _) -> error(unexpected). on_stop(_, _) -> error(unexpected). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index c528d097c..7daedf19a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -19,6 +19,7 @@ -export([ query_mode/1, + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -34,6 +35,8 @@ query_mode(_Config) -> sync. +resource_type() -> test_connector. + callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl index cdf16a8cb..740493f49 100644 --- a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl +++ b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl @@ -18,6 +18,7 @@ %% `emqx_resource' API -export([ callback_mode/0, + resource_type/0, on_start/2, on_stop/2, @@ -148,6 +149,10 @@ callback_mode() -> always_sync. +-spec resource_type() -> atom(). +resource_type() -> + azure_blob_storage. + -spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()} | {error, _Reason}. on_start(_ConnResId, ConnConfig) -> diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 946ca591a..3e7422112 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.3.1"}, + {vsn, "0.3.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index df278b791..9f830eb69 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -19,6 +19,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -94,6 +95,7 @@ desc("connector") -> %%-------------------------------------------------------------------- %% callbacks for emqx_resource +resource_type() -> cassandra. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index f38036b83..794d067bd 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.4.1"}, + {vsn, "0.4.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index f6888cad5..c5b82122a 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -29,6 +29,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -128,6 +129,7 @@ values(_) -> %% =================================================================== %% Callbacks defined in emqx_resource %% =================================================================== +resource_type() -> clickhouse. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl index 1e7122800..2c104ee16 100644 --- a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl @@ -15,6 +15,7 @@ %% `emqx_resource' API -export([ callback_mode/0, + resource_type/0, on_start/2, on_stop/2, @@ -84,6 +85,10 @@ callback_mode() -> always_sync. +-spec resource_type() -> atom(). +resource_type() -> + couchbase. + -spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()} | {error, _Reason}. on_start(ConnResId, ConnConfig) -> diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index ac71e04e7..b8fee4dee 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 82f5fb18d..181de34a8 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -17,6 +17,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -68,6 +69,7 @@ fields(config) -> %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> dynamo. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src index 262ac84bd..8f3dc3a7e 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_es, [ {description, "EMQX Enterprise Elastic Search Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {modules, [ emqx_bridge_es, emqx_bridge_es_connector diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 20de92e6e..feccd42f1 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -14,6 +14,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -207,6 +208,8 @@ base_url(#{server := Server}) -> "http://" ++ Server. %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> elastic_search. + callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 5c51cd2d9..344fc05c6 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -8,6 +8,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -84,6 +85,8 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +-spec resource_type() -> resource_type(). +resource_type() -> gcp_pubsub_consumer. -spec callback_mode() -> callback_mode(). callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 0c668cb95..a6a65ab97 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -41,6 +41,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -62,6 +63,7 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +resource_type() -> gcp_pubsub. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index e4cc0aa31..a2ffd6219 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -67,6 +68,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> greptimedb. + callback_mode() -> async_if_possible. on_add_channel( diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index af232accc..7ae86bba0 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index cf53291b2..2d061e455 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -44,6 +45,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> hstreamdb. + callback_mode() -> always_sync. on_start(InstId, Config) -> diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 2b125c1db..0d848a064 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -183,6 +184,7 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). %% =================================================================== +resource_type() -> webhook. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index a8314541a..eae5028c6 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 852f78485..bf93309f8 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -70,6 +71,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> influxdb. + callback_mode() -> async_if_possible. on_add_channel( diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 691778cfd..88ac09da9 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_connector diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 78866ef79..ec880e785 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -15,6 +15,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -206,6 +207,8 @@ proplists_without(Keys, List) -> %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> iotdb. + callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 2f0aff68d..8c7c5ffa1 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -7,6 +7,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -126,6 +127,7 @@ %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> kafka_consumer. callback_mode() -> async_if_possible. @@ -631,16 +633,6 @@ consumer_group_id(_ConsumerParams, BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. --spec is_dry_run(connector_resource_id()) -> boolean(). -is_dry_run(ConnectorResId) -> - TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX), - case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, ConnectorResId) - end. - -spec check_client_connectivity(pid()) -> ?status_connected | ?status_disconnected @@ -676,7 +668,7 @@ maybe_clean_error(Reason) -> -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). make_client_id(ConnectorResId, BridgeType, BridgeName) -> - case is_dry_run(ConnectorResId) of + case emqx_resource:is_dry_run(ConnectorResId) of false -> ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), binary_to_atom(ClientID0); diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8395251e8..42941868d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -10,6 +10,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, query_mode/1, callback_mode/0, on_start/2, @@ -35,6 +36,8 @@ -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). +resource_type() -> kafka_producer. + query_mode(#{parameters := #{query_mode := sync}}) -> simple_sync_internal_buffer; query_mode(_) -> @@ -140,14 +143,7 @@ create_producers_for_bridge_v2( KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), - TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), - IsDryRun = - case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, InstId) - end, + IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src index f411b95fb..a85121905 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kinesis, [ {description, "EMQX Enterprise Amazon Kinesis Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 95d193d92..3143cf904 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -30,6 +30,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -50,6 +51,7 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +resource_type() -> kinesis_producer. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index df9935dbb..5bb7e396d 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mongodb, [ {description, "EMQX Enterprise MongoDB Bridge"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index 6b6db358a..dac9bef57 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -11,6 +11,7 @@ %% `emqx_resource' API -export([ on_remove_channel/3, + resource_type/0, callback_mode/0, on_add_channel/4, on_get_channel_status/3, @@ -25,6 +26,7 @@ %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> emqx_mongodb:resource_type(). callback_mode() -> emqx_mongodb:callback_mode(). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 26b8967f0..d43ec5591 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index d507d11b8..118542356 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -30,6 +30,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -76,6 +77,8 @@ on_message_received(Msg, HookPoints, ResId) -> ok. %% =================================================================== +resource_type() -> mqtt. + callback_mode() -> async_if_possible. on_start(ResourceId, #{server := Server} = Conf) -> @@ -207,7 +210,7 @@ start_mqtt_clients(ResourceId, Conf) -> start_mqtt_clients(ResourceId, Conf, ClientOpts). start_mqtt_clients(ResourceId, StartConf, ClientOpts) -> - PoolName = <>, + PoolName = ResourceId, #{ pool_size := PoolSize } = StartConf, @@ -227,7 +230,7 @@ start_mqtt_clients(ResourceId, StartConf, ClientOpts) -> on_stop(ResourceId, State) -> ?SLOG(info, #{ msg => "stopping_mqtt_connector", - connector => ResourceId + resource_id => ResourceId }), %% on_stop can be called with State = undefined StateMap = @@ -271,7 +274,7 @@ on_query( on_query(ResourceId, {_ChannelId, Msg}, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", - connector => ResourceId, + resource_id => ResourceId, message => Msg, reason => "Egress is not configured" }). @@ -298,7 +301,7 @@ on_query_async( on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", - connector => ResourceId, + resource_id => ResourceId, message => Msg, reason => "Egress is not configured" }). @@ -463,8 +466,10 @@ connect(Options) -> {ok, Pid} -> connect(Pid, Name); {error, Reason} = Error -> - ?SLOG(error, #{ + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG(?LOG_LEVEL(IsDryRun), #{ msg => "client_start_failed", + resource_id => Name, config => emqx_utils:redact(ClientOpts), reason => Reason }), @@ -508,10 +513,11 @@ connect(Pid, Name) -> {ok, _Props} -> {ok, Pid}; {error, Reason} = Error -> - ?SLOG(warning, #{ + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG(?LOG_LEVEL(IsDryRun), #{ msg => "ingress_client_connect_failed", reason => Reason, - name => Name + resource_id => Name }), _ = catch emqtt:stop(Pid), Error diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src index 63bc61e62..fb670e072 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mysql, [ {description, "EMQX Enterprise MySQL Bridge"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl index da9377814..6905c86eb 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl @@ -10,6 +10,7 @@ %% `emqx_resource' API -export([ on_remove_channel/3, + resource_type/0, callback_mode/0, on_add_channel/4, on_batch_query/3, @@ -24,6 +25,7 @@ %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> emqx_mysql:resource_type(). callback_mode() -> emqx_mysql:callback_mode(). diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index 65cc97e4c..a27791853 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_opents, [ {description, "EMQX Enterprise OpenTSDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 19e117a0d..a970bb374 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -18,6 +18,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -114,6 +115,8 @@ connector_example_values() -> -define(HTTP_CONNECT_TIMEOUT, 1000). +resource_type() -> opents. + callback_mode() -> always_sync. on_start( diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index a8eeba483..dcb86a3ca 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 9d269493d..64dde77fb 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -10,6 +10,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -55,6 +56,7 @@ %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> pulsar. callback_mode() -> async_if_possible. @@ -255,7 +257,7 @@ format_servers(Servers0) -> -spec make_client_id(resource_id()) -> pulsar_client_id(). make_client_id(InstanceId) -> - case is_dry_run(InstanceId) of + case emqx_resource:is_dry_run(InstanceId) of true -> pulsar_producer_probe; false -> @@ -269,14 +271,6 @@ make_client_id(InstanceId) -> binary_to_atom(ClientIdBin) end. --spec is_dry_run(resource_id()) -> boolean(). -is_dry_run(InstanceId) -> - TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), - case TestIdStart of - nomatch -> false; - _ -> string:equal(TestIdStart, InstanceId) - end. - conn_opts(#{authentication := none}) -> #{}; conn_opts(#{authentication := #{username := Username, password := Password}}) -> @@ -297,7 +291,7 @@ replayq_dir(ClientId) -> filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). producer_name(InstanceId, ChannelId) -> - case is_dry_run(InstanceId) of + case emqx_resource:is_dry_run(InstanceId) of %% do not create more atom true -> pulsar_producer_probe_worker; diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index ab7c0e331..3e75138d5 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -31,6 +31,7 @@ on_remove_channel/3, on_get_channels/1, on_stop/2, + resource_type/0, callback_mode/0, on_get_status/2, on_get_channel_status/3, @@ -60,6 +61,7 @@ fields(config) -> %% =================================================================== %% emqx_resource callback +resource_type() -> rabbitmq. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 2cd037ed5..61cd837bb 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index f117c4e7a..162c38368 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -12,6 +12,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_add_channel/4, on_remove_channel/3, @@ -29,7 +30,9 @@ %% resource callbacks %% ------------------------------------------------------------------------------------------------- -callback_mode() -> always_sync. +resource_type() -> emqx_redis:resource_type(). + +callback_mode() -> emqx_redis:callback_mode(). on_add_channel( _InstanceId, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index fc59aeeca..9657ac115 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, [ diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 4fe3ea4c4..b03602bd2 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -16,6 +16,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -90,6 +91,8 @@ servers() -> %% `emqx_resource' API %%======================================================================================== +resource_type() -> rocketmq. + callback_mode() -> always_sync. on_start( diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index e90016927..f6260ea1d 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -13,6 +13,7 @@ -behaviour(emqx_resource). -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -92,6 +93,8 @@ -define(AGGREG_SUP, emqx_bridge_s3_sup). %% +-spec resource_type() -> resource_type(). +resource_type() -> s3. -spec callback_mode() -> callback_mode(). callback_mode() -> diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 521a215a5..4b1033ca9 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -30,6 +30,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -173,6 +174,7 @@ server() -> %%==================================================================== %% Callbacks defined in emqx_resource %%==================================================================== +resource_type() -> sqlserver. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src index 5ae95ca67..cd1d51b01 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_syskeeper, [ {description, "EMQX Enterprise Data bridge for Syskeeper"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 898915f56..c277faa4f 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -18,6 +18,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -147,6 +148,7 @@ server() -> %% ------------------------------------------------------------------------------------------------- %% `emqx_resource' API +resource_type() -> syskeeper. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl index e26ae43c1..d0aa44cd3 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl @@ -12,6 +12,7 @@ %% `emqx_resource' API -export([ + resource_type/0, query_mode/1, on_start/2, on_stop/2, @@ -40,6 +41,8 @@ %% ------------------------------------------------------------------------------------------------- %% emqx_resource +resource_type() -> + syskeeper_proxy_server. query_mode(_) -> no_queries. diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index d358ba8fa..5fe325b38 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 324694edc..46980e768 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -19,6 +19,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -140,6 +141,7 @@ connector_example_values() -> %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> tdengine. callback_mode() -> always_sync. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 5185803b6..3a6411cbe 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -19,6 +19,7 @@ %% callbacks of behaviour emqx_resource -export([ callback_mode/0, + resource_type/0, on_start/2, on_stop/2, on_query/3, @@ -99,6 +100,10 @@ remove_msg_fwd_resource(ClusterName) -> callback_mode() -> async_if_possible. +-spec resource_type() -> atom(). +resource_type() -> + cluster_link_mqtt. + on_start(ResourceId, #{pool_size := PoolSize} = ClusterConf) -> PoolName = ResourceId, Options = [ diff --git a/apps/emqx_conf/src/emqx_conf_schema_inject.erl b/apps/emqx_conf/src/emqx_conf_schema_inject.erl index 0e0f36401..fb4fee4c7 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_inject.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_inject.erl @@ -55,7 +55,10 @@ authn_mods(ce) -> ]; authn_mods(ee) -> authn_mods(ce) ++ - [emqx_gcp_device_authn_schema]. + [ + emqx_gcp_device_authn_schema, + emqx_authn_scram_http_schema + ]. authz() -> [{emqx_authz_schema, authz_mods()}]. diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl index 4b29dd5ce..0004cd72c 100644 --- a/apps/emqx_connector/include/emqx_connector.hrl +++ b/apps/emqx_connector/include/emqx_connector.hrl @@ -37,4 +37,4 @@ "The " ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if `[:Port]` is not specified." ). --define(CONNECTOR_RESOURCE_GROUP, <<"emqx_connector">>). +-define(CONNECTOR_RESOURCE_GROUP, <<"connector">>). diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index be8d3a32d..50b2132e2 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -18,6 +18,7 @@ -include("../../emqx_bridge/include/emqx_bridge_resource.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include("emqx_connector.hrl"). -export([ connector_to_resource_type/1, @@ -126,7 +127,7 @@ create(Type, Name, Conf0, Opts) -> Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, {ok, _Data} = emqx_resource:create_local( ResourceId, - <<"emqx_connector">>, + ?CONNECTOR_RESOURCE_GROUP, ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) @@ -208,7 +209,7 @@ create_dry_run(Type, Conf) -> create_dry_run(Type, Conf, fun(_) -> ok end). create_dry_run(Type, Conf0, Callback) -> - %% Already typechecked, no need to catch errors + %% Already type checked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), %% We use a fixed name here to avoid creating an atom diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index fbdece6ff..8e5a6d288 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -50,6 +50,7 @@ t_connector_lifecycle({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -171,6 +172,7 @@ t_remove_fail({'init', Config}) -> meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{enable => true}}]), meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), @@ -234,6 +236,7 @@ t_create_with_bad_name_direct_path({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -265,6 +268,7 @@ t_create_with_bad_name_root_path({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -299,6 +303,7 @@ t_no_buffer_workers({'init', Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_get_channels, 1, []), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 31069b075..6d119473a 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -225,6 +225,7 @@ init_mocks(_TestCase) -> meck:new(emqx_connector_resource, [passthrough, no_link]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), + meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( ?CONNECTOR_IMPL, diff --git a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl index c5d9e4f83..d506c9633 100644 --- a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl +++ b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl @@ -15,8 +15,10 @@ %% this module is only intended to be mocked -module(emqx_connector_dummy_impl). +-behavior(emqx_resource). -export([ + resource_type/0, query_mode/1, callback_mode/0, on_start/2, @@ -25,6 +27,7 @@ on_get_channel_status/3 ]). +resource_type() -> dummy. query_mode(_) -> error(unexpected). callback_mode() -> error(unexpected). on_start(_, _) -> error(unexpected). diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl index 6834da9e9..60fec5171 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl @@ -45,7 +45,7 @@ -define(MOD_TAB, emqx_dashboard_sso). -define(MOD_KEY_PATH, [dashboard, sso]). -define(MOD_KEY_PATH(Sub), [dashboard, sso, Sub]). --define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). +-define(RESOURCE_GROUP, <<"dashboard_sso">>). -define(NO_ERROR, <<>>). -define(DEFAULT_RESOURCE_OPTS, #{ start_after_created => false diff --git a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl index 51524f0fd..40c5de9e5 100644 --- a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl +++ b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl @@ -24,7 +24,7 @@ -define(MOD_TAB, emqx_dashboard_sso). -define(MOD_KEY_PATH, [dashboard, sso, ldap]). --define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). +-define(RESOURCE_GROUP, <<"dashboard_sso">>). -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1, request_api/3]). diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl index 0ee16824d..0707c12aa 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl @@ -381,6 +381,9 @@ params_fuzzy_in_qs() -> schema_authn() -> emqx_dashboard_swagger:schema_with_examples( - emqx_authn_schema:authenticator_type_without([emqx_authn_scram_mnesia_schema]), + emqx_authn_schema:authenticator_type_without([ + emqx_authn_scram_mnesia_schema, + emqx_authn_scram_http_schema + ]), emqx_authn_api:authenticator_examples() ). diff --git a/apps/emqx_ldap/src/emqx_ldap.app.src b/apps/emqx_ldap/src/emqx_ldap.app.src index b0d8ec59c..1b1b667cc 100644 --- a/apps/emqx_ldap/src/emqx_ldap.app.src +++ b/apps/emqx_ldap/src/emqx_ldap.app.src @@ -1,6 +1,6 @@ {application, emqx_ldap, [ {description, "EMQX LDAP Connector"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_ldap/src/emqx_ldap.erl b/apps/emqx_ldap/src/emqx_ldap.erl index d04be5d68..67b250420 100644 --- a/apps/emqx_ldap/src/emqx_ldap.erl +++ b/apps/emqx_ldap/src/emqx_ldap.erl @@ -27,6 +27,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -129,6 +130,8 @@ ensure_username(Field) -> emqx_connector_schema_lib:username(Field). %% =================================================================== +resource_type() -> ldap. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_mongodb/src/emqx_mongodb.app.src b/apps/emqx_mongodb/src/emqx_mongodb.app.src index 92d7026cc..51230fd47 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.app.src +++ b/apps/emqx_mongodb/src/emqx_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_mongodb, [ {description, "EMQX MongoDB Connector"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mongodb/src/emqx_mongodb.erl b/apps/emqx_mongodb/src/emqx_mongodb.erl index e262f5ccd..8d6fad89f 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.erl +++ b/apps/emqx_mongodb/src/emqx_mongodb.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -172,6 +173,7 @@ desc(_) -> undefined. %% =================================================================== +resource_type() -> mongodb. callback_mode() -> always_sync. diff --git a/apps/emqx_mysql/src/emqx_mysql.app.src b/apps/emqx_mysql/src/emqx_mysql.app.src index 9637cc473..c7fcb0975 100644 --- a/apps/emqx_mysql/src/emqx_mysql.app.src +++ b/apps/emqx_mysql/src/emqx_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_mysql, [ {description, "EMQX MySQL Database Connector"}, - {vsn, "0.1.9"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index 6311d66f2..197e33d75 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -91,6 +92,8 @@ server() -> emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). %% =================================================================== +resource_type() -> mysql. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 3f238ae9c..80ff8da09 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 5b25e049a..6e2a40f95 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -21,6 +21,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -67,6 +68,8 @@ batch_params_tokens := params_tokens() }. +resource_type() -> oracle. + % As ecpool is not monitoring the worker's PID when doing a handover_async, the % request can be lost if worker crashes. Thus, it's better to force requests to % be sync for now. diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 7aaf42e71..e1bd67325 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 7e64a3e83..a061fc15e 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -29,6 +29,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -120,6 +121,8 @@ adjust_fields(Fields) -> ). %% =================================================================== +resource_type() -> pgsql. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 059e9aa23..9507913ed 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -28,6 +28,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -119,6 +120,8 @@ redis_type(Type) -> desc => ?DESC(Type) }}. +resource_type() -> redis. + callback_mode() -> always_sync. on_start(InstId, Config0) -> diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 587786cb2..8c2bb39a1 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -23,7 +23,8 @@ %% remind us of that. -define(rm_status_stopped, stopped). --type resource_type() :: module(). +-type resource_type() :: atom(). +-type resource_module() :: module(). -type resource_id() :: binary(). -type channel_id() :: binary(). -type raw_resource_config() :: binary() | raw_term_resource_config(). @@ -158,5 +159,12 @@ %% See `hocon_tconf` -define(TEST_ID_PREFIX, "t_probe_"). -define(RES_METRICS, resource_metrics). +-define(LOG_LEVEL(_L_), + case _L_ of + true -> info; + false -> warning + end +). +-define(TAG, "RESOURCE"). -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 15c0a0293..bf47c83ab 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -39,12 +39,10 @@ -export([ %% store the config and start the instance - create_local/4, create_local/5, create_dry_run_local/2, create_dry_run_local/3, create_dry_run_local/4, - recreate_local/3, recreate_local/4, %% remove the config and stop the instance remove_local/1, @@ -98,6 +96,7 @@ -export([ %% get the callback mode of a specific module get_callback_mode/1, + get_resource_type/1, %% start the instance call_start/3, %% verify if the resource is working normally @@ -140,6 +139,8 @@ validate_name/1 ]). +-export([is_dry_run/1]). + -export_type([ query_mode/0, resource_id/0, @@ -243,6 +244,9 @@ QueryResult :: term() ) -> term(). +%% Used for tagging log entries. +-callback resource_type() -> atom(). + -define(SAFE_CALL(EXPR), (fun() -> try @@ -279,16 +283,10 @@ is_resource_mod(Module) -> %% ================================================================================= %% APIs for resource instances %% ================================================================================= - --spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) -> - {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create_local(ResId, Group, ResourceType, Config) -> - create_local(ResId, Group, ResourceType, Config, #{}). - -spec create_local( resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> @@ -296,7 +294,7 @@ create_local(ResId, Group, ResourceType, Config) -> create_local(ResId, Group, ResourceType, Config, Opts) -> emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts). --spec create_dry_run_local(resource_type(), resource_config()) -> +-spec create_dry_run_local(resource_module(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> emqx_resource_manager:create_dry_run(ResourceType, Config). @@ -304,19 +302,21 @@ create_dry_run_local(ResourceType, Config) -> create_dry_run_local(ResId, ResourceType, Config) -> emqx_resource_manager:create_dry_run(ResId, ResourceType, Config). --spec create_dry_run_local(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> +-spec create_dry_run_local( + resource_id(), + resource_module(), + resource_config(), + OnReadyCallback +) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) -> emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback). --spec recreate_local(resource_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(ResId, ResourceType, Config) -> - recreate_local(ResId, ResourceType, Config, #{}). - --spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate_local( + resource_id(), resource_module(), resource_config(), creation_opts() +) -> {ok, resource_data()} | {error, Reason :: term()}. recreate_local(ResId, ResourceType, Config, Opts) -> emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). @@ -330,11 +330,15 @@ remove_local(ResId) -> ok; Error -> %% Only log, the ResId worker is always removed in manager's remove action. - ?SLOG(warning, #{ - msg => "remove_local_resource_failed", - error => Error, - resource_id => ResId - }), + ?SLOG( + warning, + #{ + msg => "remove_resource_failed", + error => Error, + resource_id => ResId + }, + #{tag => ?TAG} + ), ok end. @@ -487,6 +491,10 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). +-spec get_resource_type(module()) -> resource_type(). +get_resource_type(Mod) -> + Mod:resource_type(). + -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> @@ -599,7 +607,7 @@ query_mode(Mod, Config, Opts) -> maps:get(query_mode, Opts, sync) end. --spec check_config(resource_type(), raw_resource_config()) -> +-spec check_config(resource_module(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. check_config(ResourceType, Conf) -> emqx_hocon:check(ResourceType, Conf). @@ -607,7 +615,7 @@ check_config(ResourceType, Conf) -> -spec check_and_create_local( resource_id(), resource_group(), - resource_type(), + resource_module(), raw_resource_config() ) -> {ok, resource_data()} | {error, term()}. @@ -617,7 +625,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig) -> -spec check_and_create_local( resource_id(), resource_group(), - resource_type(), + resource_module(), raw_resource_config(), creation_opts() ) -> {ok, resource_data()} | {error, term()}. @@ -630,7 +638,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> -spec check_and_recreate_local( resource_id(), - resource_type(), + resource_module(), raw_resource_config(), creation_opts() ) -> @@ -769,6 +777,13 @@ validate_name(Name) -> _ = validate_name(Name, #{atom_name => false}), ok. +-spec is_dry_run(resource_id()) -> boolean(). +is_dry_run(ResId) -> + case string:find(ResId, ?TEST_ID_PREFIX) of + nomatch -> false; + TestIdStart -> string:equal(TestIdStart, ResId) + end. + validate_name(<<>>, _Opts) -> invalid_data("Name cannot be empty string"); validate_name(Name, _Opts) when size(Name) >= 255 -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 1a3781a65..575116f77 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -75,6 +75,7 @@ -record(data, { id, group, + type, mod, callback_mode, query_mode, @@ -166,7 +167,7 @@ where(ResId) -> -spec ensure_resource( resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> {ok, resource_data()}. @@ -179,7 +180,9 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) -> end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate( + resource_id(), resource_module(), resource_config(), creation_opts() +) -> {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of @@ -222,8 +225,8 @@ create(ResId, Group, ResourceType, Config, Opts) -> %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% %% Triggers the `emqx_resource_manager_sup` supervisor to actually create -%% and link the process itself if not already started, and then immedately stops. --spec create_dry_run(resource_type(), resource_config()) -> +%% and link the process itself if not already started, and then immediately stops. +-spec create_dry_run(resource_module(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> ResId = make_test_id(), @@ -235,7 +238,9 @@ create_dry_run(ResId, ResourceType, Config) -> do_nothing_on_ready(_ResId) -> ok. --spec create_dry_run(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> +-spec create_dry_run( + resource_id(), resource_module(), resource_config(), OnReadyCallback +) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). @@ -245,7 +250,9 @@ create_dry_run(ResId, ResourceType, Config, OnReadyCallback) -> true -> maps:get(resource_opts, Config, #{}); false -> #{} end, - ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child( + ResId, <<"dry_run">>, ResourceType, Config, Opts + ), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000), case wait_for_ready(ResId, Timeout) of @@ -526,6 +533,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) -> ), Data = #data{ id = ResId, + type = emqx_resource:get_resource_type(ResourceType), group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), @@ -710,11 +718,13 @@ handle_event(EventType, EventData, State, Data) -> error, #{ msg => "ignore_all_other_events", + resource_id => Data#data.id, event_type => EventType, event_data => EventData, state => State, data => emqx_utils:redact(Data) - } + }, + #{tag => tag(Data#data.group, Data#data.type)} ), keep_state_and_data. @@ -779,7 +789,8 @@ handle_remove_event(From, ClearMetrics, Data) -> start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of + #data{id = ResId, mod = Mod, config = Config, group = Group, type = Type} = Data, + case emqx_resource:call_start(ResId, Mod, Config) of {ok, ResourceState} -> UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state @@ -787,12 +798,17 @@ start_resource(Data, From) -> Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions}; {error, Reason} = Err -> - ?SLOG(warning, #{ - msg => "start_resource_failed", - id => Data#data.id, - reason => Reason - }), - _ = maybe_alarm(?status_disconnected, Data#data.id, Err, Data#data.error), + IsDryRun = emqx_resource:is_dry_run(ResId), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "start_resource_failed", + resource_id => ResId, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), + _ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error), %% Add channels and raise alarms NewData1 = channels_health_check(?status_disconnected, add_channels(Data)), %% Keep track of the error reason why the connection did not work @@ -823,13 +839,20 @@ add_channels(Data) -> add_channels_in_list([], Data) -> Data; add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> + #data{ + id = ResId, + mod = Mod, + state = State, + added_channels = AddedChannelsMap, + group = Group, + type = Type + } = Data, case emqx_resource:call_add_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig + ResId, Mod, State, ChannelID, ChannelConfig ) of {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, %% Set the channel status to connecting to indicate that %% we have not yet performed the initial health_check NewAddedChannelsMap = maps:put( @@ -843,12 +866,17 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> }, add_channels_in_list(Rest, NewData); {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => add_channel_failed, - id => Data#data.id, - channel_id => ChannelID, - reason => Reason - }), + IsDryRun = emqx_resource:is_dry_run(ResId), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "add_channel_failed", + resource_id => ResId, + channel_id => ChannelID, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:put( ChannelID, @@ -859,7 +887,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> added_channels = NewAddedChannelsMap }, %% Raise an alarm since the channel could not be added - _ = maybe_alarm(?status_disconnected, ChannelID, Error, no_prev_error), + _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error), add_channels_in_list(Rest, NewData) end. @@ -883,7 +911,8 @@ stop_resource(#data{id = ResId} = Data) -> false -> ok end, - _ = maybe_clear_alarm(ResId), + IsDryRun = emqx_resource:is_dry_run(ResId), + _ = maybe_clear_alarm(IsDryRun, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), NewData#data{status = ?rm_status_stopped}. @@ -894,16 +923,24 @@ remove_channels(Data) -> remove_channels_in_list([], Data, _KeepInChannelMap) -> Data; remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> - AddedChannelsMap = Data#data.added_channels, + #data{ + id = ResId, + added_channels = AddedChannelsMap, + mod = Mod, + state = State, + group = Group, + type = Type + } = Data, + IsDryRun = emqx_resource:is_dry_run(ResId), NewAddedChannelsMap = case KeepInChannelMap of true -> AddedChannelsMap; false -> - _ = maybe_clear_alarm(ChannelID), + _ = maybe_clear_alarm(IsDryRun, ChannelID), maps:remove(ChannelID, AddedChannelsMap) end, - case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of + case safe_call_remove_channel(ResId, Mod, State, ChannelID) of {ok, NewState} -> NewData = Data#data{ state = NewState, @@ -911,12 +948,18 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> }, remove_channels_in_list(Rest, NewData, KeepInChannelMap); {error, Reason} -> - ?SLOG(warning, #{ - msg => remove_channel_failed, - id => Data#data.id, - channel_id => ChannelID, - reason => Reason - }), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "remove_channel_failed", + resource_id => ResId, + group => Group, + type => Type, + channel_id => ChannelID, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), NewData = Data#data{ added_channels = NewAddedChannelsMap }, @@ -995,8 +1038,8 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) -> handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, - %% Deactivate alarm - _ = maybe_clear_alarm(ChannelId), + IsDryRun = emqx_resource:is_dry_run(Data#data.id), + _ = maybe_clear_alarm(IsDryRun, ChannelId), case channel_status_is_channel_added( maps:get(ChannelId, Channels, channel_status_not_added(undefined)) @@ -1017,13 +1060,18 @@ handle_remove_channel(From, ChannelId, Data) -> end. handle_remove_channel_exists(From, ChannelId, Data) -> + #data{ + id = Id, + group = Group, + type = Type, + added_channels = AddedChannelsMap + } = Data, case emqx_resource:call_remove_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelId + Id, Data#data.mod, Data#data.state, ChannelId ) of {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:remove(ChannelId, AddedChannelsMap), UpdatedData = Data#data{ state = NewState, @@ -1031,13 +1079,17 @@ handle_remove_channel_exists(From, ChannelId, Data) -> }, {keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]}; {error, Reason} = Error -> - %% Log the error as a warning - ?SLOG(warning, #{ - msg => remove_channel_failed, - id => Data#data.id, - channel_id => ChannelId, - reason => Reason - }), + IsDryRun = emqx_resource:is_dry_run(Id), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "remove_channel_failed", + resource_id => Id, + channel_id => ChannelId, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), {keep_state_and_data, [{reply, From, Error}]} end. @@ -1048,7 +1100,8 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, NewChannels = maps:remove(ChannelId, Channels), NewData = Data#data{added_channels = NewChannels}, - _ = maybe_clear_alarm(ChannelId), + IsDryRun = emqx_resource:is_dry_run(Data#data.id), + _ = maybe_clear_alarm(IsDryRun, ChannelId), {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when @@ -1117,7 +1170,8 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) -> error = PrevError } = Data0, {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0), - _ = maybe_alarm(NewStatus, ResId, Err, PrevError), + IsDryRun = emqx_resource:is_dry_run(ResId), + _ = maybe_alarm(NewStatus, IsDryRun, ResId, Err, PrevError), ok = maybe_resume_resource_workers(ResId, NewStatus), Data1 = Data0#data{ state = NewState, status = NewStatus, error = Err @@ -1141,11 +1195,17 @@ continue_resource_health_check_connected(NewStatus, Data0) -> Actions = Replies ++ resource_health_check_actions(Data), {keep_state, Data, Actions}; _ -> - ?SLOG(warning, #{ - msg => "health_check_failed", - id => Data0#data.id, - status => NewStatus - }), + #data{id = ResId, group = Group, type = Type} = Data0, + IsDryRun = emqx_resource:is_dry_run(ResId), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "health_check_failed", + resource_id => ResId, + status => NewStatus + }, + #{tag => tag(Group, Type)} + ), %% Note: works because, coincidentally, channel/resource status is a %% subset of resource manager state... But there should be a conversion %% between the two here, as resource manager also has `stopped', which is @@ -1241,7 +1301,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) -> channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> %% Whenever the resource is connecting: %% 1. Change the status of all added channels to connecting - %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) + %% 2. Raise alarms Channels = Data0#data.added_channels, ChannelsToChangeStatusFor = [ {ChannelId, Config} @@ -1267,9 +1327,10 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> || {ChannelId, NewStatus} <- maps:to_list(NewChannels) ], %% Raise alarms for all channels + IsDryRun = emqx_resource:is_dry_run(Data0#data.id), lists:foreach( fun({ChannelId, Status, PrevStatus}) -> - maybe_alarm(?status_connecting, ChannelId, Status, PrevStatus) + maybe_alarm(?status_connecting, IsDryRun, ChannelId, Status, PrevStatus) end, ChannelsWithNewAndPrevErrorStatuses ), @@ -1302,9 +1363,10 @@ channels_health_check(ConnectorStatus, Data0) -> || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms + IsDryRun = emqx_resource:is_dry_run(Data1#data.id), _ = lists:foreach( fun({ChannelId, OldStatus, NewStatus}) -> - _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus) + _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus) end, ChannelsWithNewAndOldStatuses ), @@ -1413,13 +1475,14 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta NewStatus = maps:get(ChannelId, Data1#data.added_channels), ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), + IsDryRun = emqx_resource:is_dry_run(Data1#data.id), %% Raise/clear alarms case NewStatus of #{status := ?status_connected} -> - _ = maybe_clear_alarm(ChannelId), + _ = maybe_clear_alarm(IsDryRun, ChannelId), ok; _ -> - _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus), + _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus), ok end, Data. @@ -1583,15 +1646,21 @@ remove_runtime_data(#data{} = Data0) -> health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). --spec maybe_alarm(resource_status(), resource_id(), _Error :: term(), _PrevError :: term()) -> ok. -maybe_alarm(?status_connected, _ResId, _Error, _PrevError) -> +-spec maybe_alarm( + resource_status(), + boolean(), + resource_id(), + _Error :: term(), + _PrevError :: term() +) -> ok. +maybe_alarm(?status_connected, _IsDryRun, _ResId, _Error, _PrevError) -> ok; -maybe_alarm(_Status, <>, _Error, _PrevError) -> +maybe_alarm(_Status, true, _ResId, _Error, _PrevError) -> ok; %% Assume that alarm is already active -maybe_alarm(_Status, _ResId, Error, Error) -> +maybe_alarm(_Status, _IsDryRun, _ResId, Error, Error) -> ok; -maybe_alarm(_Status, ResId, Error, _PrevError) -> +maybe_alarm(_Status, false, ResId, Error, _PrevError) -> HrError = case Error of {error, undefined} -> @@ -1623,10 +1692,10 @@ maybe_resume_resource_workers(ResId, ?status_connected) -> maybe_resume_resource_workers(_, _) -> ok. --spec maybe_clear_alarm(resource_id()) -> ok | {error, not_found}. -maybe_clear_alarm(<>) -> +-spec maybe_clear_alarm(boolean(), resource_id()) -> ok | {error, not_found}. +maybe_clear_alarm(true, _ResId) -> ok; -maybe_clear_alarm(ResId) -> +maybe_clear_alarm(false, ResId) -> emqx_alarm:safe_deactivate(ResId). parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> @@ -1642,7 +1711,8 @@ parse_health_check_result({error, Error}, Data) -> msg => "health_check_exception", resource_id => Data#data.id, reason => Error - } + }, + #{tag => tag(Data#data.group, Data#data.type)} ), {?status_disconnected, Data#data.state, {error, Error}}. @@ -1794,10 +1864,18 @@ add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) -> ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig), NewChannels = maps:put(ChannelId, ChannelStatus, Channels), ResStatus = state_to_status(State), - maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), + IsDryRun = emqx_resource:is_dry_run(ChannelId), + maybe_alarm(ResStatus, IsDryRun, ChannelId, ChannelStatus, no_prev), Data#data{added_channels = NewChannels}. state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_connected) -> ?status_connected; state_to_status(?state_connecting) -> ?status_connecting; state_to_status(?state_disconnected) -> ?status_disconnected. + +log_level(true) -> info; +log_level(false) -> warning. + +tag(Group, Type) -> + Str = emqx_utils_conv:str(Group) ++ "/" ++ emqx_utils_conv:str(Type), + string:uppercase(Str). diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 7af6eca81..8542eec1c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -58,10 +58,11 @@ init([]) -> child_spec(ResId, Group, ResourceType, Config, Opts) -> #{ id => ResId, - start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, + start => + {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, restart => transient, %% never force kill a resource manager. - %% becasue otherwise it may lead to release leak, + %% because otherwise it may lead to release leak, %% resource_manager's terminate callback calls resource on_stop shutdown => infinity, type => worker, diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 97a09b074..98a74dfad 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -17,6 +17,7 @@ -module(emqx_resource_metrics). -include_lib("emqx/include/logger.hrl"). +-include("emqx_resource.hrl"). -export([ events/0, @@ -74,7 +75,6 @@ success_get/1 ]). --define(RES_METRICS, resource_metrics). -define(TELEMETRY_PREFIX, emqx, resource). -spec events() -> [telemetry:event_name()]. @@ -127,15 +127,19 @@ handle_telemetry_event( %% We catch errors to avoid detaching the telemetry handler function. %% When restarting a resource while it's under load, there might be transient %% failures while the metrics are not yet created. - ?SLOG(warning, #{ - msg => "handle_resource_metrics_failed", - hint => "transient failures may occur when restarting a resource", - kind => Kind, - reason => Reason, - stacktrace => Stacktrace, - resource_id => ID, - event => Event - }), + ?SLOG( + warning, + #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }, + #{tag => ?TAG} + ), ok end; handle_telemetry_event( @@ -151,15 +155,19 @@ handle_telemetry_event( %% We catch errors to avoid detaching the telemetry handler function. %% When restarting a resource while it's under load, there might be transient %% failures while the metrics are not yet created. - ?SLOG(warning, #{ - msg => "handle_resource_metrics_failed", - hint => "transient failures may occur when restarting a resource", - kind => Kind, - reason => Reason, - stacktrace => Stacktrace, - resource_id => ID, - event => Event - }), + ?SLOG( + warning, + #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }, + #{tag => ?TAG} + ), ok end; handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> diff --git a/apps/emqx_resource/src/emqx_resource_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl index 47e7ed7ff..ba286e35c 100644 --- a/apps/emqx_resource/src/emqx_resource_pool.erl +++ b/apps/emqx_resource/src/emqx_resource_pool.erl @@ -26,6 +26,7 @@ ]). -include_lib("emqx/include/logger.hrl"). +-include("emqx_resource.hrl"). -ifndef(TEST). -define(HEALTH_CHECK_TIMEOUT, 15000). @@ -37,33 +38,43 @@ start(Name, Mod, Options) -> case ecpool:start_sup_pool(Name, Mod, Options) of {ok, _} -> - ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}), + ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}, #{tag => ?TAG}), ok; {error, {already_started, _Pid}} -> stop(Name), start(Name, Mod, Options); {error, Reason} -> NReason = parse_reason(Reason), - ?SLOG(error, #{ - msg => "start_ecpool_error", - pool_name => Name, - reason => NReason - }), + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG( + ?LOG_LEVEL(IsDryRun), + #{ + msg => "start_ecpool_error", + resource_id => Name, + reason => NReason + }, + #{tag => ?TAG} + ), {error, {start_pool_failed, Name, NReason}} end. stop(Name) -> case ecpool:stop_sup_pool(Name) of ok -> - ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}); + ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}, #{tag => ?TAG}); {error, not_found} -> ok; {error, Reason} -> - ?SLOG(error, #{ - msg => "stop_ecpool_failed", - pool_name => Name, - reason => Reason - }), + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG( + ?LOG_LEVEL(IsDryRun), + #{ + msg => "stop_ecpool_failed", + resource_id => Name, + reason => Reason + }, + #{tag => ?TAG} + ), error({stop_pool_failed, Name, Reason}) end. diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index 859b9fa52..47b724271 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -40,7 +40,7 @@ deprecated_since() -> -spec create( resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> @@ -51,7 +51,7 @@ create(ResId, Group, ResourceType, Config, Opts) -> ]). -spec create_dry_run( - resource_type(), + resource_module(), resource_config() ) -> ok | {error, Reason :: term()}. @@ -60,7 +60,7 @@ create_dry_run(ResourceType, Config) -> -spec recreate( resource_id(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 0fc11cc66..e068defb1 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -24,6 +24,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -62,6 +63,8 @@ register(required) -> true; register(default) -> false; register(_) -> undefined. +resource_type() -> demo. + callback_mode() -> persistent_term:get(?CM_KEY). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e77a90efa..f463c741b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -3492,7 +3492,7 @@ gauge_metric_set_fns() -> ]. create(Id, Group, Type, Config) -> - emqx_resource:create_local(Id, Group, Type, Config). + emqx_resource:create_local(Id, Group, Type, Config, #{}). create(Id, Group, Type, Config, Opts) -> emqx_resource:create_local(Id, Group, Type, Config, Opts). diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 7d0000a1c..cf0aae4d8 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -128,3 +128,4 @@ -define(KEY_PATH, [rule_engine, rules]). -define(RULE_PATH(RULE), [rule_engine, rules, RULE]). +-define(TAG, "RULE"). diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 7d45c47e6..57e1d02b8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -134,7 +134,15 @@ republish( }, _Args ) -> - ?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic}); + ?SLOG( + error, + #{ + msg => "recursive_republish_detected", + topic => Topic, + rule_id => RuleId + }, + #{tag => ?TAG} + ); republish( Selected, #{metadata := #{rule_id := RuleId}} = Env, @@ -321,6 +329,8 @@ render_pub_props(UserPropertiesTemplate, Selected, Env) -> rule_id => emqx_utils_maps:deep_get([metadata, rule_id], ENV, undefined), reason => REASON, property => K + }#{ + tag => ?TAG } ) ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index e33c1d6fb..4f68175af 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -21,6 +21,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include("rule_engine.hrl"). -export([check_params/2]). @@ -36,10 +37,14 @@ check_params(Params, Tag) -> #{Tag := Checked} -> {ok, Checked} catch throw:Reason -> - ?SLOG(error, #{ - msg => "check_rule_params_failed", - reason => Reason - }), + ?SLOG( + info, + #{ + msg => "check_rule_params_failed", + reason => Reason + }, + #{tag => ?TAG} + ), {error, Reason} end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 359984228..c4b562b2b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -459,15 +459,15 @@ handle_call({delete_rule, Rule}, _From, State) -> ok = do_delete_rule(Rule), {reply, ok, State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", request => Req}), + ?SLOG(error, #{msg => "unexpected_call", request => Req}, #{tag => ?TAG}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", request => Msg}), + ?SLOG(error, #{msg => "unexpected_cast", request => Msg}, #{tag => ?TAG}), {noreply, State}. handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected_info", request => Info}), + ?SLOG(error, #{msg => "unexpected_info", request => Info}, #{tag => ?TAG}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 0ac59b36c..fa3ee3db0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -390,11 +390,15 @@ param_path_id() -> {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {201, format_rule_info_resp(Rule)}; {error, Reason} -> - ?SLOG(error, #{ - msg => "create_rule_failed", - id => Id, - reason => Reason - }), + ?SLOG( + info, + #{ + msg => "create_rule_failed", + rule_id => Id, + reason => Reason + }, + #{tag => ?TAG} + ), {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}} end end @@ -450,11 +454,15 @@ param_path_id() -> {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} -> {200, format_rule_info_resp(Rule)}; {error, Reason} -> - ?SLOG(error, #{ - msg => "update_rule_failed", - id => Id, - reason => Reason - }), + ?SLOG( + info, + #{ + msg => "update_rule_failed", + rule_id => Id, + reason => Reason + }, + #{tag => ?TAG} + ), {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}} end; '/rules/:id'(delete, #{bindings := #{id := Id}}) -> @@ -465,11 +473,15 @@ param_path_id() -> {ok, _} -> {204}; {error, Reason} -> - ?SLOG(error, #{ - msg => "delete_rule_failed", - id => Id, - reason => Reason - }), + ?SLOG( + error, + #{ + msg => "delete_rule_failed", + rule_id => Id, + reason => Reason + }, + #{tag => ?TAG} + ), {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}} end; not_found -> @@ -589,10 +601,15 @@ get_rule_metrics(Id) -> NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults], NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok], NodeErrors == [] orelse - ?SLOG(warning, #{ - msg => "rpc_get_rule_metrics_errors", - errors => NodeErrors - }), + ?SLOG( + warning, + #{ + msg => "rpc_get_rule_metrics_errors", + rule_id => Id, + errors => NodeErrors + }, + #{tag => ?TAG} + ), NodeMetrics. format_metrics(Node, #{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 39ed7440d..a3d9d5ebe 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -15,6 +15,7 @@ -module(emqx_rule_sqltester). -include_lib("emqx/include/logger.hrl"). +-include("rule_engine.hrl"). -export([ test/1, @@ -127,10 +128,15 @@ test(#{sql := Sql, context := Context}) -> end end; {error, Reason} -> - ?SLOG(debug, #{ - msg => "rulesql_parse_error", - detail => Reason - }), + ?SLOG( + debug, + #{ + msg => "rulesql_parse_error", + sql => Sql, + reason => Reason + }, + #{tag => ?TAG} + ), {error, Reason} end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl index 6a0d6b3ec..cf5d3afbe 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -40,6 +41,8 @@ ]). %% =================================================================== +resource_type() -> test_connector. + callback_mode() -> always_sync. on_start( diff --git a/apps/emqx_utils/src/emqx_utils_scram.erl b/apps/emqx_utils/src/emqx_utils_scram.erl new file mode 100644 index 000000000..9d0543703 --- /dev/null +++ b/apps/emqx_utils/src/emqx_utils_scram.erl @@ -0,0 +1,81 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_scram). + +-export([authenticate/6]). + +%%------------------------------------------------------------------------------ +%% Authentication +%%------------------------------------------------------------------------------ +authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, Conf) -> + case ensure_auth_method(AuthMethod, AuthData, Conf) of + true -> + case AuthCache of + #{next_step := client_final} -> + check_client_final_message(AuthData, AuthCache, Conf, OnErrFun); + _ -> + check_client_first_message(AuthData, AuthCache, Conf, RetrieveFun, OnErrFun) + end; + false -> + ignore + end. + +ensure_auth_method(_AuthMethod, undefined, _Conf) -> + false; +ensure_auth_method(<<"SCRAM-SHA-256">>, _AuthData, #{algorithm := sha256}) -> + true; +ensure_auth_method(<<"SCRAM-SHA-512">>, _AuthData, #{algorithm := sha512}) -> + true; +ensure_auth_method(_AuthMethod, _AuthData, _Conf) -> + false. + +check_client_first_message( + Bin, _Cache, #{iteration_count := IterationCount}, RetrieveFun, OnErrFun +) -> + case + esasl_scram:check_client_first_message( + Bin, + #{ + iteration_count => IterationCount, + retrieve => RetrieveFun + } + ) + of + {continue, ServerFirstMessage, Cache} -> + {continue, ServerFirstMessage, Cache}; + ignore -> + ignore; + {error, Reason} -> + OnErrFun("check_client_first_message_error", Reason), + {error, not_authorized} + end. + +check_client_final_message( + Bin, #{is_superuser := IsSuperuser} = Cache, #{algorithm := Alg}, OnErrFun +) -> + case + esasl_scram:check_client_final_message( + Bin, + Cache#{algorithm => Alg} + ) + of + {ok, ServerFinalMessage} -> + {ok, #{is_superuser => IsSuperuser}, ServerFinalMessage}; + {error, Reason} -> + OnErrFun("check_client_final_message_error", Reason), + {error, not_authorized} + end. diff --git a/changes/ee/feat-13504.en.md b/changes/ee/feat-13504.en.md new file mode 100644 index 000000000..c9b22f403 --- /dev/null +++ b/changes/ee/feat-13504.en.md @@ -0,0 +1 @@ +Added a HTTP backend for the authentication mechanism `scram`.