feat(authz): support authorization config file part 2

This commit is contained in:
zhanghongtong 2021-08-24 15:08:21 +08:00 committed by Rory Z
parent a7fac1a7a3
commit a2bafd1a18
20 changed files with 396 additions and 606 deletions

View File

@ -20,8 +20,8 @@ jobs:
container: ${{ matrix.container }} container: ${{ matrix.container }}
outputs: outputs:
profiles: ${{ steps.set_profile.outputs.profiles}} profiles: ${{ steps.set_profile.outputs.profiles }}
old_vsns: ${{ steps.set_profile.outputs.old_vsns}} old_vsns: ${{ steps.set_profile.outputs.old_vsns }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2

View File

@ -68,11 +68,6 @@ authorization_rules {
# } # }
# collection: mqtt_authz # collection: mqtt_authz
# find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] } # find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
# }, # }
{
permission = allow
action = all
topics = ["#"]
}
] ]
} }

View File

@ -19,8 +19,13 @@
-define(APP, emqx_authz). -define(APP, emqx_authz).
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))). -define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= <<"allow">>) orelse
-define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= all))). (A =:= deny) orelse (A =:= <<"deny">>)
)).
-define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= <<"subscribe">>) orelse
(A =:= publish) orelse (A =:= <<"publish">>) orelse
(A =:= all) orelse (A =:= <<"all">>)
)).
-record(authz_metrics, { -record(authz_metrics, {
allow = 'client.authorize.allow', allow = 'client.authorize.allow',

View File

@ -27,13 +27,11 @@
-export([ register_metrics/0 -export([ register_metrics/0
, init/0 , init/0
, init_rule/1
, lookup/0 , lookup/0
, lookup/1 , lookup/1
, move/2 , move/2
, update/2 , update/2
, authorize/5 , authorize/5
, match/4
]). ]).
-export([post_config_update/3, pre_config_update/2]). -export([post_config_update/3, pre_config_update/2]).
@ -47,7 +45,7 @@ register_metrics() ->
init() -> init() ->
ok = register_metrics(), ok = register_metrics(),
emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
NRules = [init_rule(Rule) || Rule <- emqx:get_config(?CONF_KEY_PATH, [])], NRules = [init_provider(Rule) || Rule <- emqx:get_config(?CONF_KEY_PATH, [])],
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NRules]}, -1). ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NRules]}, -1).
lookup() -> lookup() ->
@ -148,12 +146,12 @@ post_config_update({move, Id, #{<<"after">> := AfterId}}, _NewRules, _OldRules)
ok = emqx_authz_cache:drain_cache(); ok = emqx_authz_cache:drain_cache();
post_config_update({head, Rules}, _NewRules, _OldConf) -> post_config_update({head, Rules}, _NewRules, _OldConf) ->
InitedRules = [init_rule(R) || R <- check_rules(Rules)], InitedRules = [init_provider(R) || R <- check_rules(Rules)],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules ++ lookup()]}, -1), ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules ++ lookup()]}, -1),
ok = emqx_authz_cache:drain_cache(); ok = emqx_authz_cache:drain_cache();
post_config_update({tail, Rules}, _NewRules, _OldConf) -> post_config_update({tail, Rules}, _NewRules, _OldConf) ->
InitedRules = [init_rule(R) || R <- check_rules(Rules)], InitedRules = [init_provider(R) || R <- check_rules(Rules)],
emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedRules]}, -1), emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedRules]}, -1),
ok = emqx_authz_cache:drain_cache(); ok = emqx_authz_cache:drain_cache();
@ -167,14 +165,14 @@ post_config_update({{replace_once, Id}, Rule}, _NewRules, _OldConf) when is_map(
ok = emqx_resource:remove(Id) ok = emqx_resource:remove(Id)
end, end,
{OldRules1, OldRules2 } = lists:split(Index, OldInitedRules), {OldRules1, OldRules2 } = lists:split(Index, OldInitedRules),
InitedRules = [init_rule(R#{annotations => #{id => Id}}) || R <- check_rules([Rule])], InitedRules = [init_provider(R#{annotations => #{id => Id}}) || R <- check_rules([Rule])],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:droplast(OldRules1) ++ InitedRules ++ OldRules2]}, -1), ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:droplast(OldRules1) ++ InitedRules ++ OldRules2]}, -1),
ok = emqx_authz_cache:drain_cache(); ok = emqx_authz_cache:drain_cache();
post_config_update(_, NewRules, _OldConf) -> post_config_update(_, NewRules, _OldConf) ->
%% overwrite the entire config! %% overwrite the entire config!
OldInitedRules = lookup(), OldInitedRules = lookup(),
InitedRules = [init_rule(Rule) || Rule <- NewRules], InitedRules = [init_provider(Rule) || Rule <- NewRules],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules]}, -1), ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules]}, -1),
lists:foreach(fun (#{type := _Type, enable := true, annotations := #{id := Id}}) -> lists:foreach(fun (#{type := _Type, enable := true, annotations := #{id := Id}}) ->
ok = emqx_resource:remove(Id); ok = emqx_resource:remove(Id);
@ -235,29 +233,11 @@ create_resource(#{type := DB,
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
-spec(init_rule(rule()) -> rule()). -spec(init_provider(rule()) -> rule()).
init_rule(#{topics := Topics, init_provider(#{enable := true,
action := Action, type := file,
permission := Permission, path := Path
principal := Principal, } = Rule) ->
annotations := #{id := Id}
} = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) ->
Rule#{annotations =>
#{id => Id,
principal => compile_principal(Principal),
topics => [compile_topic(Topic) || Topic <- Topics]}
};
init_rule(#{topics := Topics,
action := Action,
permission := Permission
} = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) ->
init_rule(Rule#{annotations =>#{id => gen_id(simple)}});
init_rule(#{principal := Principal,
enable := true,
type := file,
path := Path
} = Rule) ->
Rules = case file:consult(Path) of Rules = case file:consult(Path) of
{ok, Terms} -> {ok, Terms} ->
[emqx_authz_rule:compile(Term) || Term <- Terms]; [emqx_authz_rule:compile(Term) || Term <- Terms];
@ -275,92 +255,42 @@ init_rule(#{principal := Principal,
#{id => gen_id(file), #{id => gen_id(file),
rules => Rules rules => Rules
}}; }};
init_rule(#{principal := Principal, init_provider(#{enable := true,
enable := true, type := http,
type := http, config := #{url := Url} = Config
config := #{url := Url} = Config } = Rule) ->
} = Rule) ->
NConfig = maps:merge(Config, #{base_url => maps:remove(query, Url)}), NConfig = maps:merge(Config, #{base_url => maps:remove(query, Url)}),
case create_resource(Rule#{config := NConfig}) of case create_resource(Rule#{config := NConfig}) of
{error, Reason} -> error({load_config_error, Reason}); {error, Reason} -> error({load_config_error, Reason});
Id -> Rule#{annotations => Id -> Rule#{annotations =>
#{id => Id, #{id => Id}
principal => compile_principal(Principal)
}
} }
end; end;
init_provider(#{enable := true,
init_rule(#{principal := Principal, type := DB
enable := true, } = Rule) when DB =:= redis;
type := DB DB =:= mongo ->
} = Rule) when DB =:= redis;
DB =:= mongo ->
case create_resource(Rule) of case create_resource(Rule) of
{error, Reason} -> error({load_config_error, Reason}); {error, Reason} -> error({load_config_error, Reason});
Id -> Rule#{annotations => Id -> Rule#{annotations =>
#{id => Id, #{id => Id}
principal => compile_principal(Principal)
}
} }
end; end;
init_provider(#{enable := true,
init_rule(#{principal := Principal, type := DB,
enable := true, sql := SQL
type := DB, } = Rule) when DB =:= mysql;
sql := SQL DB =:= pgsql ->
} = Rule) when DB =:= mysql;
DB =:= pgsql ->
Mod = list_to_existing_atom(io_lib:format("~s_~s",[?APP, DB])), Mod = list_to_existing_atom(io_lib:format("~s_~s",[?APP, DB])),
case create_resource(Rule) of case create_resource(Rule) of
{error, Reason} -> error({load_config_error, Reason}); {error, Reason} -> error({load_config_error, Reason});
Id -> Rule#{annotations => Id -> Rule#{annotations =>
#{id => Id, #{id => Id,
principal => compile_principal(Principal),
sql => Mod:parse_query(SQL) sql => Mod:parse_query(SQL)
} }
} }
end; end;
init_provider(#{enable := false} = Rule) ->Rule.
init_rule(#{enable := false,
type := _DB
} = Rule) ->
Rule.
compile_principal(all) -> all;
compile_principal(#{username := Username}) ->
{ok, MP} = re:compile(bin(Username)),
#{username => MP};
compile_principal(#{clientid := Clientid}) ->
{ok, MP} = re:compile(bin(Clientid)),
#{clientid => MP};
compile_principal(#{ipaddress := IpAddress}) ->
#{ipaddress => esockd_cidr:parse(b2l(IpAddress), true)};
compile_principal(#{'and' := Principals}) when is_list(Principals) ->
#{'and' => [compile_principal(Principal) || Principal <- Principals]};
compile_principal(#{'or' := Principals}) when is_list(Principals) ->
#{'or' => [compile_principal(Principal) || Principal <- Principals]}.
compile_topic(<<"eq ", Topic/binary>>) ->
compile_topic(#{'eq' => Topic});
compile_topic(#{'eq' := Topic}) ->
#{'eq' => emqx_topic:words(bin(Topic))};
compile_topic(Topic) when is_binary(Topic)->
Words = emqx_topic:words(bin(Topic)),
case pattern(Words) of
true -> #{pattern => Words};
false -> Words
end.
pattern(Words) ->
lists:member(<<"%u">>, Words) orelse lists:member(<<"%c">>, Words).
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(B) when is_binary(B) -> B;
bin(L) when is_list(L) -> list_to_binary(L);
bin(X) -> X.
b2l(B) when is_list(B) -> B;
b2l(B) when is_binary(B) -> binary_to_list(B).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% AuthZ callbacks %% AuthZ callbacks
@ -387,97 +317,21 @@ authorize(#{username := Username,
end. end.
do_authorize(Client, PubSub, Topic, do_authorize(Client, PubSub, Topic,
[Connector = #{type := DB, [#{type := file,
enable := true, enable := true,
annotations := #{principal := Principal} annotations := #{rule := Rules}
} | Tail] ) -> } | Tail] ) ->
case match_principal(Client, Principal) of case emqx_authz_rule:match(Client, PubSub, Topic, Rules) of
true -> nomatch -> do_authorize(Client, PubSub, Topic, Tail);
Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_authz, DB])), Matched -> Matched
case Mod:authorize(Client, PubSub, Topic, Connector) of
nomatch -> do_authorize(Client, PubSub, Topic, Tail);
Matched -> Matched
end;
false -> do_authorize(Client, PubSub, Topic, Tail)
end; end;
do_authorize(Client, PubSub, Topic, do_authorize(Client, PubSub, Topic,
[#{permission := Permission} = Rule | Tail]) -> [Connector = #{type := Type,
case match(Client, PubSub, Topic, Rule) of enable := true
true -> {matched, Permission}; } | Tail] ) ->
false -> do_authorize(Client, PubSub, Topic, Tail) Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_authz, Type])),
case Mod:authorize(Client, PubSub, Topic, Connector) of
nomatch -> do_authorize(Client, PubSub, Topic, Tail);
Matched -> Matched
end; end;
do_authorize(_Client, _PubSub, _Topic, []) -> nomatch. do_authorize(_Client, _PubSub, _Topic, []) -> nomatch.
match(Client, PubSub, Topic,
#{action := Action,
annotations := #{
principal := Principal,
topics := TopicFilters
}
}) ->
match_action(PubSub, Action) andalso
match_principal(Client, Principal) andalso
match_topics(Client, Topic, TopicFilters).
match_action(publish, publish) -> true;
match_action(subscribe, subscribe) -> true;
match_action(_, all) -> true;
match_action(_, _) -> false.
match_principal(_, all) -> true;
match_principal(#{username := undefined}, #{username := _MP}) ->
false;
match_principal(#{username := Username}, #{username := MP}) ->
case re:run(Username, MP) of
{match, _} -> true;
_ -> false
end;
match_principal(#{clientid := Clientid}, #{clientid := MP}) ->
case re:run(Clientid, MP) of
{match, _} -> true;
_ -> false
end;
match_principal(#{peerhost := undefined}, #{ipaddress := _CIDR}) ->
false;
match_principal(#{peerhost := IpAddress}, #{ipaddress := CIDR}) ->
esockd_cidr:match(IpAddress, CIDR);
match_principal(ClientInfo, #{'and' := Principals}) when is_list(Principals) ->
lists:foldl(fun(Principal, Permission) ->
match_principal(ClientInfo, Principal) andalso Permission
end, true, Principals);
match_principal(ClientInfo, #{'or' := Principals}) when is_list(Principals) ->
lists:foldl(fun(Principal, Permission) ->
match_principal(ClientInfo, Principal) orelse Permission
end, false, Principals);
match_principal(_, _) -> false.
match_topics(_ClientInfo, _Topic, []) ->
false;
match_topics(ClientInfo, Topic, [#{pattern := PatternFilter}|Filters]) ->
TopicFilter = feed_var(ClientInfo, PatternFilter),
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(ClientInfo, Topic, Filters);
match_topics(ClientInfo, Topic, [TopicFilter|Filters]) ->
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(ClientInfo, Topic, Filters).
match_topic(Topic, #{'eq' := TopicFilter}) ->
Topic == TopicFilter;
match_topic(Topic, TopicFilter) ->
emqx_topic:match(Topic, TopicFilter).
feed_var(ClientInfo, Pattern) ->
feed_var(ClientInfo, Pattern, []).
feed_var(_ClientInfo, [], Acc) ->
lists:reverse(Acc);
feed_var(ClientInfo = #{clientid := undefined}, [<<"%c">>|Words], Acc) ->
feed_var(ClientInfo, Words, [<<"%c">>|Acc]);
feed_var(ClientInfo = #{clientid := ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(ClientInfo, Words, [ClientId |Acc]);
feed_var(ClientInfo = #{username := undefined}, [<<"%u">>|Words], Acc) ->
feed_var(ClientInfo, Words, [<<"%u">>|Acc]);
feed_var(ClientInfo = #{username := Username}, [<<"%u">>|Words], Acc) ->
feed_var(ClientInfo, Words, [Username|Acc]);
feed_var(ClientInfo, [W|Words], Acc) ->
feed_var(ClientInfo, Words, [W|Acc]).

View File

@ -419,12 +419,26 @@ move_rule_api() ->
{"/authorization/:id/move", Metadata, move_rule}. {"/authorization/:id/move", Metadata, move_rule}.
rules(get, #{query_string := Query}) -> rules(get, #{query_string := Query}) ->
Rules = lists:foldl(fun (#{type := _Type, enable := true, annotations := #{id := Id} = Annotations} = Rule, AccIn) -> Rules = lists:foldl(fun (#{type := _Type, enable := true, config := #{server := Server} = Config, annotations := #{id := Id}} = Rule, AccIn) ->
NRule = case emqx_resource:health_check(Id) of NRule = case emqx_resource:health_check(Id) of
ok -> ok ->
Rule#{annotations => Annotations#{status => healthy}}; Rule#{config => Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)},
annotations => #{id => Id,
status => healthy}};
_ -> _ ->
Rule#{annotations => Annotations#{status => unhealthy}} Rule#{config => Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)},
annotations => #{id => Id,
status => unhealthy}}
end,
lists:append(AccIn, [NRule]);
(#{type := _Type, enable := true, annotations := #{id := Id}} = Rule, AccIn) ->
NRule = case emqx_resource:health_check(Id) of
ok ->
Rule#{annotations => #{id => Id,
status => healthy}};
_ ->
Rule#{annotations => #{id => Id,
status => unhealthy}}
end, end,
lists:append(AccIn, [NRule]); lists:append(AccIn, [NRule]);
(Rule, AccIn) -> (Rule, AccIn) ->
@ -462,17 +476,26 @@ rules(put, #{body := RawConfig}) ->
rule(get, #{bindings := #{id := Id}}) -> rule(get, #{bindings := #{id := Id}}) ->
case emqx_authz:lookup(Id) of case emqx_authz:lookup(Id) of
{error, Reason} -> {404, #{messgae => atom_to_binary(Reason)}}; {error, Reason} -> {404, #{messgae => atom_to_binary(Reason)}};
Rule -> #{type := file} = Rule -> {200, Rule};
case maps:get(type, Rule, undefined) of #{config := #{server := Server} = Config} = Rule ->
undefined -> {200, Rule}; case emqx_resource:health_check(Id) of
ok ->
{200, Rule#{config => Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)},
annotations => #{id => Id,
status => healthy}}};
_ -> _ ->
case emqx_resource:health_check(Id) of {200, Rule#{config => Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)},
ok -> annotations => #{id => Id,
{200, Rule#{annotations => #{status => healthy}}}; status => unhealthy}}}
_ -> end;
{200, Rule#{annotations => #{status => unhealthy}}} Rule ->
end case emqx_resource:health_check(Id) of
ok ->
{200, Rule#{annotations => #{id => Id,
status => healthy}}};
_ ->
{200, Rule#{annotations => #{id => Id,
status => unhealthy}}}
end end
end; end;
rule(put, #{bindings := #{id := RuleId}, body := RawConfig}) -> rule(put, #{bindings := #{id := RuleId}, body := RawConfig}) ->

View File

@ -44,38 +44,19 @@ authorize(Client, PubSub, Topic,
nomatch; nomatch;
[] -> nomatch; [] -> nomatch;
Rows -> Rows ->
do_authorize(Client, PubSub, Topic, Rows) Rules = [ emqx_authz_rule:compile({Permission, all, Action, Topics})
|| #{<<"topics">> := Topics, <<"permission">> := Permission, <<"action">> := Action} <- Rows],
do_authorize(Client, PubSub, Topic, Rules)
end. end.
do_authorize(_Client, _PubSub, _Topic, []) -> do_authorize(_Client, _PubSub, _Topic, []) ->
nomatch; nomatch;
do_authorize(Client, PubSub, Topic, [Rule | Tail]) -> do_authorize(Client, PubSub, Topic, [Rule | Tail]) ->
case match(Client, PubSub, Topic, Rule) of case emqx_authz_rule:match(Client, PubSub, Topic, Rule) of
{matched, Permission} -> {matched, Permission}; {matched, Permission} -> {matched, Permission};
nomatch -> do_authorize(Client, PubSub, Topic, Tail) nomatch -> do_authorize(Client, PubSub, Topic, Tail)
end. end.
match(Client, PubSub, Topic,
#{<<"topics">> := Topics,
<<"permission">> := Permission,
<<"action">> := Action
}) ->
Rule = #{<<"permission">> => Permission,
<<"topics">> => Topics,
<<"action">> => Action
},
#{simple_rule :=
#{permission := NPermission} = NRule
} = hocon_schema:check_plain(
emqx_authz_schema,
#{<<"simple_rule">> => Rule},
#{atom_key => true},
[simple_rule]),
case emqx_authz:match(Client, PubSub, Topic, emqx_authz:init_rule(NRule)) of
true -> {matched, NPermission};
false -> nomatch
end.
replvar(Find, #{clientid := Clientid, replvar(Find, #{clientid := Clientid,
username := Username, username := Username,
peerhost := IpAddress peerhost := IpAddress

View File

@ -62,39 +62,25 @@ authorize(Client, PubSub, Topic,
do_authorize(_Client, _PubSub, _Topic, _Columns, []) -> do_authorize(_Client, _PubSub, _Topic, _Columns, []) ->
nomatch; nomatch;
do_authorize(Client, PubSub, Topic, Columns, [Row | Tail]) -> do_authorize(Client, PubSub, Topic, Columns, [Row | Tail]) ->
case match(Client, PubSub, Topic, format_result(Columns, Row)) of case emqx_authz_rule:match(Client, PubSub, Topic,
emqx_authz_rule:compile(format_result(Columns, Row))
) of
{matched, Permission} -> {matched, Permission}; {matched, Permission} -> {matched, Permission};
nomatch -> do_authorize(Client, PubSub, Topic, Columns, Tail) nomatch -> do_authorize(Client, PubSub, Topic, Columns, Tail)
end. end.
format_result(Columns, Row) ->
L = [ begin
K = lists:nth(I, Columns),
V = lists:nth(I, Row),
{K, V}
end || I <- lists:seq(1, length(Columns)) ],
maps:from_list(L).
match(Client, PubSub, Topic, format_result(Columns, Row) ->
#{<<"permission">> := Permission, Permission = lists:nth(index(<<"permission">>, Columns), Row),
<<"action">> := Action, Action = lists:nth(index(<<"action">>, Columns), Row),
<<"topic">> := TopicFilter Topic = lists:nth(index(<<"topic">>, Columns), Row),
}) -> {Permission, all, Action, [Topic]}.
Rule = #{<<"topics">> => [TopicFilter],
<<"action">> => Action, index(Elem, List) ->
<<"permission">> => Permission index(Elem, List, 1).
}, index(_Elem, [], _Index) -> {error, not_found};
#{simple_rule := index(Elem, [ Elem | _List], Index) -> Index;
#{permission := NPermission} = NRule index(Elem, [ _ | List], Index) -> index(Elem, List, Index + 1).
} = hocon_schema:check_plain(
emqx_authz_schema,
#{<<"simple_rule">> => Rule},
#{atom_key => true},
[simple_rule]),
case emqx_authz:match(Client, PubSub, Topic, emqx_authz:init_rule(NRule)) of
true -> {matched, NPermission};
false -> nomatch
end.
replvar(Params, ClientInfo) -> replvar(Params, ClientInfo) ->
replvar(Params, ClientInfo, []). replvar(Params, ClientInfo, []).

View File

@ -66,39 +66,25 @@ authorize(Client, PubSub, Topic,
do_authorize(_Client, _PubSub, _Topic, _Columns, []) -> do_authorize(_Client, _PubSub, _Topic, _Columns, []) ->
nomatch; nomatch;
do_authorize(Client, PubSub, Topic, Columns, [Row | Tail]) -> do_authorize(Client, PubSub, Topic, Columns, [Row | Tail]) ->
case match(Client, PubSub, Topic, format_result(Columns, Row)) of case emqx_authz_rule:match(Client, PubSub, Topic,
emqx_authz_rule:compile(format_result(Columns, Row))
) of
{matched, Permission} -> {matched, Permission}; {matched, Permission} -> {matched, Permission};
nomatch -> do_authorize(Client, PubSub, Topic, Columns, Tail) nomatch -> do_authorize(Client, PubSub, Topic, Columns, Tail)
end. end.
format_result(Columns, Row) -> format_result(Columns, Row) ->
L = [ begin Permission = lists:nth(index(<<"permission">>, 2, Columns), erlang:tuple_to_list(Row)),
{column, K, _, _, _, _, _, _, _} = lists:nth(I, Columns), Action = lists:nth(index(<<"action">>, 2, Columns), erlang:tuple_to_list(Row)),
V = lists:nth(I, tuple_to_list(Row)), Topic = lists:nth(index(<<"topic">>, 2, Columns), erlang:tuple_to_list(Row)),
{K, V} {Permission, all, Action, [Topic]}.
end || I <- lists:seq(1, length(Columns)) ],
maps:from_list(L).
match(Client, PubSub, Topic, index(Key, N, TupleList) when is_integer(N) ->
#{<<"permission">> := Permission, Tuple = lists:keyfind(Key, N, TupleList),
<<"action">> := Action, index(Tuple, TupleList, 1);
<<"topic">> := TopicFilter index(_Tuple, [], _Index) -> {error, not_found};
}) -> index(Tuple, [Tuple | _TupleList], Index) -> Index;
Rule = #{<<"topics">> => [TopicFilter], index(Tuple, [_ | TupleList], Index) -> index(Tuple, TupleList, Index + 1).
<<"action">> => Action,
<<"permission">> => Permission
},
#{simple_rule :=
#{permission := NPermission} = NRule
} = hocon_schema:check_plain(
emqx_authz_schema,
#{<<"simple_rule">> => Rule},
#{atom_key => true},
[simple_rule]),
case emqx_authz:match(Client, PubSub, Topic, emqx_authz:init_rule(NRule)) of
true -> {matched, NPermission};
false -> nomatch
end.
replvar(Params, ClientInfo) -> replvar(Params, ClientInfo) ->
replvar(Params, ClientInfo, []). replvar(Params, ClientInfo, []).

View File

@ -50,35 +50,13 @@ authorize(Client, PubSub, Topic,
do_authorize(_Client, _PubSub, _Topic, []) -> do_authorize(_Client, _PubSub, _Topic, []) ->
nomatch; nomatch;
do_authorize(Client, PubSub, Topic, [TopicFilter, Action | Tail]) -> do_authorize(Client, PubSub, Topic, [TopicFilter, Action | Tail]) ->
case match(Client, PubSub, Topic, case emqx_authz_rule:match(Client, PubSub, Topic,
#{topics => TopicFilter, emqx_authz_rule:compile({allow, all, Action, [TopicFilter]})
action => Action )of
})
of
{matched, Permission} -> {matched, Permission}; {matched, Permission} -> {matched, Permission};
nomatch -> do_authorize(Client, PubSub, Topic, Tail) nomatch -> do_authorize(Client, PubSub, Topic, Tail)
end. end.
match(Client, PubSub, Topic,
#{topics := TopicFilter,
action := Action
}) ->
Rule = #{<<"principal">> => all,
<<"topics">> => [TopicFilter],
<<"action">> => Action,
<<"permission">> => allow
},
#{simple_rule := NRule
} = hocon_schema:check_plain(
emqx_authz_schema,
#{<<"simple_rule">> => Rule},
#{atom_key => true},
[simple_rule]),
case emqx_authz:match(Client, PubSub, Topic, emqx_authz:init_rule(NRule)) of
true -> {matched, allow};
false -> nomatch
end.
replvar(Cmd, Client = #{cn := CN}) -> replvar(Cmd, Client = #{cn := CN}) ->
replvar(repl(Cmd, "%C", CN), maps:remove(cn, Client)); replvar(repl(Cmd, "%C", CN), maps:remove(cn, Client));
replvar(Cmd, Client = #{dn := DN}) -> replvar(Cmd, Client = #{dn := DN}) ->

View File

@ -26,15 +26,14 @@
%% APIs %% APIs
-export([ match/4 -export([ match/4
, matches/4
, compile/1 , compile/1
]). ]).
-export_type([rule/0]). -export_type([rule/0]).
compile({Permission, Who, Action, TopicFilters}) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(TopicFilters) -> compile({Permission, Who, Action, TopicFilters}) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(TopicFilters) ->
{Permission, compile_who(Who), Action, [compile_topic(Topic) || Topic <- TopicFilters]}; {atom(Permission), compile_who(Who), atom(Action), [compile_topic(Topic) || Topic <- TopicFilters]}.
compile({Permission, Who, Action, Topic}) when ?ALLOW_DENY(Permission), ?PUBSUB(Action) ->
{Permission, compile_who(Who), Action, [compile_topic(Topic)]}.
compile_who(all) -> all; compile_who(all) -> all;
compile_who({username, Username}) -> compile_who({username, Username}) ->
@ -52,6 +51,8 @@ compile_who({'and', L}) when is_list(L) ->
compile_who({'or', L}) when is_list(L) -> compile_who({'or', L}) when is_list(L) ->
{'or', [compile_who(Who) || Who <- L]}. {'or', [compile_who(Who) || Who <- L]}.
compile_topic(<<"eq ", Topic/binary>>) ->
{eq, emqx_topic:words(Topic)};
compile_topic({eq, Topic}) -> compile_topic({eq, Topic}) ->
{eq, emqx_topic:words(bin(Topic))}; {eq, emqx_topic:words(bin(Topic))};
compile_topic(Topic) -> compile_topic(Topic) ->
@ -64,11 +65,32 @@ compile_topic(Topic) ->
pattern(Words) -> pattern(Words) ->
lists:member(<<"%u">>, Words) orelse lists:member(<<"%c">>, Words). lists:member(<<"%u">>, Words) orelse lists:member(<<"%c">>, Words).
atom(B) when is_binary(B) ->
try binary_to_existing_atom(B, utf8)
catch
_ -> binary_to_atom(B)
end;
atom(L) when is_list(L) ->
try list_to_existing_atom(L)
catch
_ -> list_to_atom(L)
end;
atom(A) when is_atom(A) -> A.
bin(L) when is_list(L) -> bin(L) when is_list(L) ->
list_to_binary(L); list_to_binary(L);
bin(B) when is_binary(B) -> bin(B) when is_binary(B) ->
B. B.
-spec(matches(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), [rule()])
-> {matched, allow} | {matched, deny} | nomatch).
matches(Client, PubSub, Topic, []) -> nomatch;
matches(Client, PubSub, Topic, [{Permission, Who, Action, TopicFilters} | Tail]) ->
case match(Client, PubSub, Topic, {Permission, Who, Action, TopicFilters}) of
nomatch -> matches(Client, PubSub, Topic, Tail);
Matched -> Matched
end.
-spec(match(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), rule()) -spec(match(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), rule())
-> {matched, allow} | {matched, deny} | nomatch). -> {matched, allow} | {matched, deny} | nomatch).
match(Client, PubSub, Topic, {Permission, Who, Action, TopicFilters}) -> match(Client, PubSub, Topic, {Permission, Who, Action, TopicFilters}) ->

View File

@ -23,8 +23,7 @@ fields("authorization_rules") ->
[ {rules, rules()} [ {rules, rules()}
]; ];
fields(file) -> fields(file) ->
[ {principal, principal()} [ {type, #{type => http}}
, {type, #{type => http}}
, {enable, #{type => boolean(), , {enable, #{type => boolean(),
default => true}} default => true}}
, {path, #{type => string(), , {path, #{type => string(),
@ -36,8 +35,7 @@ fields(file) ->
}} }}
]; ];
fields(http) -> fields(http) ->
[ {principal, principal()} [ {type, #{type => http}}
, {type, #{type => http}}
, {enable, #{type => boolean(), , {enable, #{type => boolean(),
default => true}} default => true}}
, {config, #{type => hoconsc:union([ hoconsc:ref(?MODULE, http_get) , {config, #{type => hoconsc:union([ hoconsc:ref(?MODULE, http_get)
@ -113,16 +111,6 @@ fields(mysql) ->
fields(pgsql) -> fields(pgsql) ->
connector_fields(pgsql) ++ connector_fields(pgsql) ++
[ {sql, query()} ]; [ {sql, query()} ];
fields(simple_rule) ->
[ {permission, #{type => permission()}}
, {action, #{type => action()}}
, {topics, #{type => union_array(
[ binary()
, hoconsc:ref(?MODULE, eq_topic)
]
)}}
, {principal, principal()}
];
fields(username) -> fields(username) ->
[{username, #{type => binary()}}]; [{username, #{type => binary()}}];
fields(clientid) -> fields(clientid) ->
@ -160,8 +148,7 @@ union_array(Item) when is_list(Item) ->
rules() -> rules() ->
#{type => union_array( #{type => union_array(
[ hoconsc:ref(?MODULE, simple_rule) [ hoconsc:ref(?MODULE, file)
, hoconsc:ref(?MODULE, file)
, hoconsc:ref(?MODULE, http) , hoconsc:ref(?MODULE, http)
, hoconsc:ref(?MODULE, mysql) , hoconsc:ref(?MODULE, mysql)
, hoconsc:ref(?MODULE, pgsql) , hoconsc:ref(?MODULE, pgsql)
@ -170,18 +157,6 @@ rules() ->
]) ])
}. }.
principal() ->
#{default => all,
type => hoconsc:union(
[ all
, hoconsc:ref(?MODULE, username)
, hoconsc:ref(?MODULE, clientid)
, hoconsc:ref(?MODULE, ipaddress)
, hoconsc:ref(?MODULE, andlist)
, hoconsc:ref(?MODULE, orlist)
])
}.
query() -> query() ->
#{type => binary(), #{type => binary(),
validator => fun(S) -> validator => fun(S) ->
@ -202,8 +177,7 @@ connector_fields(DB) ->
Error -> Error ->
erlang:error(Error) erlang:error(Error)
end, end,
[ {principal, principal()} [ {type, #{type => DB}}
, {type, #{type => DB}}
, {enable, #{type => boolean(), , {enable, #{type => boolean(),
default => true}} default => true}}
] ++ Mod:fields(""). ] ++ Mod:fields("").

View File

@ -31,6 +31,10 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT),
ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_ct_helpers:start_apps([emqx_authz]),
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
@ -39,43 +43,63 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx_authz:update(replace, []),
emqx_ct_helpers:stop_apps([emqx_authz]), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]),
meck:unload(emqx_resource),
ok. ok.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx_authz:update(replace, []),
Config. Config.
-define(RULE1, #{<<"principal">> => <<"all">>, -define(RULE1, #{<<"type">> => <<"http">>,
<<"topics">> => [<<"#">>], <<"config">> => #{
<<"action">> => <<"all">>, <<"url">> => <<"https://fake.com:443/">>,
<<"permission">> => <<"deny">>} <<"headers">> => #{},
). <<"method">> => <<"get">>,
-define(RULE2, #{<<"principal">> => <<"request_timeout">> => 5000}
#{<<"ipaddress">> => <<"127.0.0.1">>}, }).
<<"topics">> => -define(RULE2, #{<<"type">> => <<"mongo">>,
[#{<<"eq">> => <<"#">>}, <<"config">> => #{
#{<<"eq">> => <<"+">>} <<"mongo_type">> => <<"single">>,
] , <<"server">> => <<"127.0.0.1:27017">>,
<<"action">> => <<"all">>, <<"pool_size">> => 1,
<<"permission">> => <<"allow">>} <<"database">> => <<"mqtt">>,
). <<"ssl">> => #{<<"enable">> => false}},
-define(RULE3,#{<<"principal">> => <<"collection">> => <<"fake">>,
#{<<"and">> => [#{<<"username">> => <<"^test?">>}, <<"find">> => #{<<"a">> => <<"b">>}
#{<<"clientid">> => <<"^test?">>} }).
]}, -define(RULE3, #{<<"type">> => <<"mysql">>,
<<"topics">> => [<<"test">>], <<"config">> => #{
<<"action">> => <<"publish">>, <<"server">> => <<"127.0.0.1:27017">>,
<<"permission">> => <<"allow">>} <<"pool_size">> => 1,
). <<"database">> => <<"mqtt">>,
-define(RULE4,#{<<"principal">> => <<"username">> => <<"xx">>,
#{<<"or">> => [#{<<"username">> => <<"^test">>}, <<"password">> => <<"ee">>,
#{<<"clientid">> => <<"test?">>} <<"auto_reconnect">> => true,
]}, <<"ssl">> => #{<<"enable">> => false}},
<<"topics">> => [<<"%u">>,<<"%c">>], <<"sql">> => <<"abcb">>
<<"action">> => <<"publish">>, }).
<<"permission">> => <<"deny">>} -define(RULE4, #{<<"type">> => <<"pgsql">>,
). <<"config">> => #{
<<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1,
<<"database">> => <<"mqtt">>,
<<"username">> => <<"xx">>,
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false}},
<<"sql">> => <<"abcb">>
}).
-define(RULE5, #{<<"type">> => <<"redis">>,
<<"config">> => #{
<<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1,
<<"database">> => 0,
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false}},
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
@ -86,73 +110,50 @@ t_update_rule(_) ->
{ok, _} = emqx_authz:update(head, [?RULE1]), {ok, _} = emqx_authz:update(head, [?RULE1]),
{ok, _} = emqx_authz:update(tail, [?RULE3]), {ok, _} = emqx_authz:update(tail, [?RULE3]),
dbg:tracer(),dbg:p(all,c),
dbg:tpl(hocon_schema, check, cx),
Lists1 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE3]), Lists1 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE3]),
?assertMatch(Lists1, emqx:get_config([authorization_rules, rules], [])), ?assertMatch(Lists1, emqx:get_config([authorization_rules, rules], [])),
[#{annotations := #{id := Id1, [#{annotations := #{id := Id1}, type := http},
principal := all, #{annotations := #{id := Id2}, type := mongo},
topics := [['#']]} #{annotations := #{id := Id3}, type := mysql}
},
#{annotations := #{id := Id2,
principal := #{ipaddress := {{127,0,0,1},{127,0,0,1},32}},
topics := [#{eq := ['#']}, #{eq := ['+']}]}
},
#{annotations := #{id := Id3,
principal :=
#{'and' := [#{username := {re_pattern, _, _, _, _}},
#{clientid := {re_pattern, _, _, _, _}}
]
},
topics := [[<<"test">>]]}
}
] = emqx_authz:lookup(), ] = emqx_authz:lookup(),
{ok, _} = emqx_authz:update({replace_once, Id1}, ?RULE5),
{ok, _} = emqx_authz:update({replace_once, Id3}, ?RULE4), {ok, _} = emqx_authz:update({replace_once, Id3}, ?RULE4),
Lists2 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE4]), Lists2 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE4]),
?assertMatch(Lists2, emqx:get_config([authorization_rules, rules], [])), ?assertMatch(Lists2, emqx:get_config([authorization_rules, rules], [])),
[#{annotations := #{id := Id1, [#{annotations := #{id := Id1}, type := redis},
principal := all, #{annotations := #{id := Id2}, type := mongo},
topics := [['#']]} #{annotations := #{id := Id3}, type := pgsql}
},
#{annotations := #{id := Id2,
principal := #{ipaddress := {{127,0,0,1},{127,0,0,1},32}},
topics := [#{eq := ['#']},
#{eq := ['+']}]}
},
#{annotations := #{id := Id3,
principal :=
#{'or' := [#{username := {re_pattern, _, _, _, _}},
#{clientid := {re_pattern, _, _, _, _}}
]
},
topics := [#{pattern := [<<"%u">>]},
#{pattern := [<<"%c">>]}
]}
}
] = emqx_authz:lookup(), ] = emqx_authz:lookup(),
{ok, _} = emqx_authz:update(replace, []). {ok, _} = emqx_authz:update(replace, []).
t_move_rule(_) -> t_move_rule(_) ->
{ok, _} = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4]), {ok, _} = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4, ?RULE5]),
[#{annotations := #{id := Id1}}, [#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id3}}, #{annotations := #{id := Id3}},
#{annotations := #{id := Id4}} #{annotations := #{id := Id4}},
#{annotations := #{id := Id5}}
] = emqx_authz:lookup(), ] = emqx_authz:lookup(),
{ok, _} = emqx_authz:move(Id4, <<"top">>), {ok, _} = emqx_authz:move(Id4, <<"top">>),
?assertMatch([#{annotations := #{id := Id4}}, ?assertMatch([#{annotations := #{id := Id4}},
#{annotations := #{id := Id1}}, #{annotations := #{id := Id1}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id3}} #{annotations := #{id := Id3}},
#{annotations := #{id := Id5}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),
{ok, _} = emqx_authz:move(Id1, <<"bottom">>), {ok, _} = emqx_authz:move(Id1, <<"bottom">>),
?assertMatch([#{annotations := #{id := Id4}}, ?assertMatch([#{annotations := #{id := Id4}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id3}}, #{annotations := #{id := Id3}},
#{annotations := #{id := Id5}},
#{annotations := #{id := Id1}} #{annotations := #{id := Id1}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),
@ -160,66 +161,15 @@ t_move_rule(_) ->
?assertMatch([#{annotations := #{id := Id3}}, ?assertMatch([#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}}, #{annotations := #{id := Id4}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id5}},
#{annotations := #{id := Id1}} #{annotations := #{id := Id1}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),
{ok, _} = emqx_authz:move(Id2, #{<<"after">> => Id1}), {ok, _} = emqx_authz:move(Id2, #{<<"after">> => Id1}),
?assertMatch([#{annotations := #{id := Id3}}, ?assertMatch([#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}}, #{annotations := #{id := Id4}},
#{annotations := #{id := Id5}},
#{annotations := #{id := Id1}}, #{annotations := #{id := Id1}},
#{annotations := #{id := Id2}} #{annotations := #{id := Id2}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),
ok. ok.
t_authz(_) ->
ClientInfo1 = #{clientid => <<"test">>,
username => <<"test">>,
peerhost => {127,0,0,1},
zone => default,
listener => mqtt_tcp
},
ClientInfo2 = #{clientid => <<"test">>,
username => <<"test">>,
peerhost => {192,168,0,10},
zone => default,
listener => mqtt_tcp
},
ClientInfo3 = #{clientid => <<"test">>,
username => <<"fake">>,
peerhost => {127,0,0,1},
zone => default,
listener => mqtt_tcp
},
ClientInfo4 = #{clientid => <<"fake">>,
username => <<"test">>,
peerhost => {127,0,0,1},
zone => default,
listener => mqtt_tcp
},
Rules1 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE1, ?RULE2])],
Rules2 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE2, ?RULE1])],
Rules3 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE3, ?RULE4])],
Rules4 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE4, ?RULE1])],
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo1, subscribe, <<"#">>, deny, [])),
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo1, subscribe, <<"+">>, deny, Rules1)),
?assertEqual({stop, allow},
emqx_authz:authorize(ClientInfo1, subscribe, <<"+">>, deny, Rules2)),
?assertEqual({stop, allow},
emqx_authz:authorize(ClientInfo1, publish, <<"test">>, deny, Rules3)),
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo1, publish, <<"test">>, deny, Rules4)),
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo2, subscribe, <<"#">>, deny, Rules2)),
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo3, publish, <<"test">>, deny, Rules3)),
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo3, publish, <<"fake">>, deny, Rules4)),
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo4, publish, <<"test">>, deny, Rules3)),
?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo4, publish, <<"fake">>, deny, Rules4)),
ok.

View File

@ -37,46 +37,100 @@
-define(API_VERSION, "v5"). -define(API_VERSION, "v5").
-define(BASE_PATH, "api"). -define(BASE_PATH, "api").
-define(CONF_DEFAULT, <<"authorization: {rules: []}">>). % -define(RULE1, #{<<"principal">> => <<"all">>,
% <<"topics">> => [<<"#">>],
% <<"action">> => <<"all">>,
% <<"permission">> => <<"deny">>}
% ).
% -define(RULE2, #{<<"principal">> =>
% #{<<"ipaddress">> => <<"127.0.0.1">>},
% <<"topics">> =>
% [#{<<"eq">> => <<"#">>},
% #{<<"eq">> => <<"+">>}
% ] ,
% <<"action">> => <<"all">>,
% <<"permission">> => <<"allow">>}
% ).
% -define(RULE3,#{<<"principal">> =>
% #{<<"and">> => [#{<<"username">> => <<"^test?">>},
% #{<<"clientid">> => <<"^test?">>}
% ]},
% <<"topics">> => [<<"test">>],
% <<"action">> => <<"publish">>,
% <<"permission">> => <<"allow">>}
% ).
% -define(RULE4,#{<<"principal">> =>
% #{<<"or">> => [#{<<"username">> => <<"^test">>},
% #{<<"clientid">> => <<"test?">>}
% ]},
% <<"topics">> => [<<"%u">>,<<"%c">>],
% <<"action">> => <<"publish">>,
% <<"permission">> => <<"deny">>}
% ).
-define(RULE1, #{<<"principal">> => <<"all">>, -define(RULE1, #{<<"type">> => <<"http">>,
<<"topics">> => [<<"#">>], <<"config">> => #{
<<"action">> => <<"all">>, <<"url">> => <<"https://fake.com:443/">>,
<<"permission">> => <<"deny">>} <<"headers">> => #{},
). <<"method">> => <<"get">>,
-define(RULE2, #{<<"principal">> => <<"request_timeout">> => 5000}
#{<<"ipaddress">> => <<"127.0.0.1">>}, }).
<<"topics">> => -define(RULE2, #{<<"type">> => <<"mongo">>,
[#{<<"eq">> => <<"#">>}, <<"config">> => #{
#{<<"eq">> => <<"+">>} <<"mongo_type">> => <<"single">>,
] , <<"server">> => <<"127.0.0.1:27017">>,
<<"action">> => <<"all">>, <<"pool_size">> => 1,
<<"permission">> => <<"allow">>} <<"database">> => <<"mqtt">>,
). <<"ssl">> => #{<<"enable">> => false}},
-define(RULE3,#{<<"principal">> => <<"collection">> => <<"fake">>,
#{<<"and">> => [#{<<"username">> => <<"^test?">>}, <<"find">> => #{<<"a">> => <<"b">>}
#{<<"clientid">> => <<"^test?">>} }).
]}, -define(RULE3, #{<<"type">> => <<"mysql">>,
<<"topics">> => [<<"test">>], <<"config">> => #{
<<"action">> => <<"publish">>, <<"server">> => <<"127.0.0.1:27017">>,
<<"permission">> => <<"allow">>} <<"pool_size">> => 1,
). <<"database">> => <<"mqtt">>,
-define(RULE4,#{<<"principal">> => <<"username">> => <<"xx">>,
#{<<"or">> => [#{<<"username">> => <<"^test">>}, <<"password">> => <<"ee">>,
#{<<"clientid">> => <<"test?">>} <<"auto_reconnect">> => true,
]}, <<"ssl">> => #{<<"enable">> => false}},
<<"topics">> => [<<"%u">>,<<"%c">>], <<"sql">> => <<"abcb">>
<<"action">> => <<"publish">>, }).
<<"permission">> => <<"deny">>} -define(RULE4, #{<<"type">> => <<"pgsql">>,
). <<"config">> => #{
<<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1,
<<"database">> => <<"mqtt">>,
<<"username">> => <<"xx">>,
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false}},
<<"sql">> => <<"abcb">>
}).
-define(RULE5, #{<<"type">> => <<"redis">>,
<<"config">> => #{
<<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1,
<<"database">> => 0,
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false}},
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>
}).
all() -> all() ->
emqx_ct:all(?MODULE). % emqx_ct:all(?MODULE).
[].
groups() -> groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ekka_mnesia:start(), ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot), emqx_mgmt_auth:mnesia(boot),
@ -89,7 +143,8 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx_authz:update(replace, []),
emqx_ct_helpers:stop_apps([emqx_authz, emqx_management]), emqx_ct_helpers:stop_apps([emqx_resource, emqx_authz, emqx_management]),
meck:unload(emqx_resource),
ok. ok.
set_special_configs(emqx_management) -> set_special_configs(emqx_management) ->
@ -111,12 +166,7 @@ t_api(_) ->
?assertEqual([], get_rules(Result1)), ?assertEqual([], get_rules(Result1)),
lists:foreach(fun(_) -> lists:foreach(fun(_) ->
{ok, 204, _} = request(post, uri(["authorization"]), {ok, 204, _} = request(post, uri(["authorization"]), ?RULE1)
#{<<"action">> => <<"all">>,
<<"permission">> => <<"deny">>,
<<"principal">> => <<"all">>,
<<"topics">> => [<<"#">>]}
)
end, lists:seq(1, 20)), end, lists:seq(1, 20)),
{ok, 200, Result2} = request(get, uri(["authorization"]), []), {ok, 200, Result2} = request(get, uri(["authorization"]), []),
?assertEqual(20, length(get_rules(Result2))), ?assertEqual(20, length(get_rules(Result2))),
@ -128,30 +178,23 @@ t_api(_) ->
?assertEqual(10, length(get_rules(Result))) ?assertEqual(10, length(get_rules(Result)))
end, lists:seq(1, 2)), end, lists:seq(1, 2)),
{ok, 204, _} = request(put, uri(["authorization"]), {ok, 204, _} = request(put, uri(["authorization"]), [?RULE1, ?RULE2, ?RULE3, ?RULE4]),
[ #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}
, #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}
, #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}
]),
{ok, 200, Result3} = request(get, uri(["authorization"]), []), {ok, 200, Result3} = request(get, uri(["authorization"]), []),
Rules = get_rules(Result3), Rules = get_rules(Result3),
?assertEqual(3, length(Rules)), ?assertEqual(4, length(Rules)),
?assertMatch([ #{<<"type">> := <<"http">>}
lists:foreach(fun(#{<<"permission">> := Allow}) -> , #{<<"type">> := <<"mongo">>}
?assertEqual(<<"allow">>, Allow) , #{<<"type">> := <<"mysql">>}
end, Rules), , #{<<"type">> := <<"pgsql">>}
], Rules),
#{<<"annotations">> := #{<<"id">> := Id}} = lists:nth(2, Rules), #{<<"annotations">> := #{<<"id">> := Id}} = lists:nth(2, Rules),
{ok, 204, _} = request(put, uri(["authorization", binary_to_list(Id)]), {ok, 204, _} = request(put, uri(["authorization", binary_to_list(Id)]), ?RULE5),
#{<<"action">> => <<"all">>, <<"permission">> => <<"deny">>,
<<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}),
{ok, 200, Result4} = request(get, uri(["authorization", binary_to_list(Id)]), []), {ok, 200, Result4} = request(get, uri(["authorization", binary_to_list(Id)]), []),
?assertMatch(#{<<"annotations">> := #{<<"id">> := Id}, ?assertMatch(#{<<"type">> := <<"redis">>}, jsx:decode(Result4)),
<<"permission">> := <<"deny">>
}, jsx:decode(Result4)),
lists:foreach(fun(#{<<"annotations">> := #{<<"id">> := Id0}}) -> lists:foreach(fun(#{<<"annotations">> := #{<<"id">> := Id0}}) ->
{ok, 204, _} = request(delete, uri(["authorization", binary_to_list(Id0)]), []) {ok, 204, _} = request(delete, uri(["authorization", binary_to_list(Id0)]), [])
@ -161,11 +204,12 @@ t_api(_) ->
ok. ok.
t_move_rule(_) -> t_move_rule(_) ->
{ok, _} = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4]), {ok, _} = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4, ?RULE5]),
[#{annotations := #{id := Id1}}, [#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id3}}, #{annotations := #{id := Id3}},
#{annotations := #{id := Id4}} #{annotations := #{id := Id4}},
#{annotations := #{id := Id5}}
] = emqx_authz:lookup(), ] = emqx_authz:lookup(),
{ok, 204, _} = request(post, uri(["authorization", Id4, "move"]), {ok, 204, _} = request(post, uri(["authorization", Id4, "move"]),
@ -173,7 +217,8 @@ t_move_rule(_) ->
?assertMatch([#{annotations := #{id := Id4}}, ?assertMatch([#{annotations := #{id := Id4}},
#{annotations := #{id := Id1}}, #{annotations := #{id := Id1}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id3}} #{annotations := #{id := Id3}},
#{annotations := #{id := Id5}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),
{ok, 204, _} = request(post, uri(["authorization", Id1, "move"]), {ok, 204, _} = request(post, uri(["authorization", Id1, "move"]),
@ -181,6 +226,7 @@ t_move_rule(_) ->
?assertMatch([#{annotations := #{id := Id4}}, ?assertMatch([#{annotations := #{id := Id4}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id3}}, #{annotations := #{id := Id3}},
#{annotations := #{id := Id5}},
#{annotations := #{id := Id1}} #{annotations := #{id := Id1}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),
@ -189,6 +235,7 @@ t_move_rule(_) ->
?assertMatch([#{annotations := #{id := Id3}}, ?assertMatch([#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}}, #{annotations := #{id := Id4}},
#{annotations := #{id := Id2}}, #{annotations := #{id := Id2}},
#{annotations := #{id := Id5}},
#{annotations := #{id := Id1}} #{annotations := #{id := Id1}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),
@ -196,6 +243,7 @@ t_move_rule(_) ->
#{<<"position">> => #{<<"after">> => Id1}}), #{<<"position">> => #{<<"after">> => Id1}}),
?assertMatch([#{annotations := #{id := Id3}}, ?assertMatch([#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}}, #{annotations := #{id := Id4}},
#{annotations := #{id := Id5}},
#{annotations := #{id := Id1}}, #{annotations := #{id := Id1}},
#{annotations := #{id := Id2}} #{annotations := #{id := Id2}}
], emqx_authz:lookup()), ], emqx_authz:lookup()),

View File

@ -41,14 +41,13 @@ init_per_suite(Config) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{ <<"config">> => #{ Rules = [#{<<"type">> => <<"http">>,
<<"url">> => <<"https://fake.com:443/">>, <<"config">> => #{
<<"headers">> => #{}, <<"url">> => <<"https://fake.com:443/">>,
<<"method">> => <<"get">>, <<"headers">> => #{},
<<"request_timeout">> => 5000 <<"method">> => <<"get">>,
}, <<"request_timeout">> => 5000
<<"principal">> => <<"all">>, }}
<<"type">> => <<"http">>}
], ],
{ok, _} = emqx_authz:update(replace, Rules), {ok, _} = emqx_authz:update(replace, Rules),
Config. Config.

View File

@ -39,17 +39,16 @@ init_per_suite(Config) ->
ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_ct_helpers:start_apps([emqx_authz]),
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{ <<"config">> => #{ Rules = [#{<<"type">> => <<"mongo">>,
<<"mongo_type">> => <<"single">>, <<"config">> => #{
<<"server">> => <<"127.0.0.1:27017">>, <<"mongo_type">> => <<"single">>,
<<"pool_size">> => 1, <<"server">> => <<"127.0.0.1:27017">>,
<<"database">> => <<"mqtt">>, <<"pool_size">> => 1,
<<"ssl">> => #{<<"enable">> => false}}, <<"database">> => <<"mqtt">>,
<<"principal">> => <<"all">>, <<"ssl">> => #{<<"enable">> => false}},
<<"collection">> => <<"fake">>, <<"collection">> => <<"fake">>,
<<"find">> => #{<<"a">> => <<"b">>}, <<"find">> => #{<<"a">> => <<"b">>}
<<"type">> => <<"mongo">>} }],
],
{ok, _} = emqx_authz:update(replace, Rules), {ok, _} = emqx_authz:update(replace, Rules),
Config. Config.

View File

@ -40,18 +40,17 @@ init_per_suite(Config) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{ <<"config">> => #{ Rules = [#{<<"type">> => <<"mysql">>,
<<"server">> => <<"127.0.0.1:27017">>, <<"config">> => #{
<<"pool_size">> => 1, <<"server">> => <<"127.0.0.1:27017">>,
<<"database">> => <<"mqtt">>, <<"pool_size">> => 1,
<<"username">> => <<"xx">>, <<"database">> => <<"mqtt">>,
<<"password">> => <<"ee">>, <<"username">> => <<"xx">>,
<<"auto_reconnect">> => true, <<"password">> => <<"ee">>,
<<"ssl">> => #{<<"enable">> => false} <<"auto_reconnect">> => true,
}, <<"ssl">> => #{<<"enable">> => false}},
<<"principal">> => <<"all">>, <<"sql">> => <<"abcb">>
<<"sql">> => <<"abcb">>, }],
<<"type">> => <<"mysql">> }],
{ok, _} = emqx_authz:update(replace, Rules), {ok, _} = emqx_authz:update(replace, Rules),
Config. Config.
@ -60,17 +59,14 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]),
meck:unload(emqx_resource). meck:unload(emqx_resource).
-define(COLUMNS, [ <<"ipaddress">> -define(COLUMNS, [ <<"action">>
, <<"username">>
, <<"clientid">>
, <<"action">>
, <<"permission">> , <<"permission">>
, <<"topic">> , <<"topic">>
]). ]).
-define(RULE1, [[<<"127.0.0.1">>, <<>>, <<>>, <<"all">>, <<"deny">>, <<"#">>]]). -define(RULE1, [[<<"all">>, <<"deny">>, <<"#">>]]).
-define(RULE2, [[<<"127.0.0.1">>, <<>>, <<>>, <<"all">>, <<"allow">>, <<"eq #">>]]). -define(RULE2, [[<<"all">>, <<"allow">>, <<"eq #">>]]).
-define(RULE3, [[<<>>, <<"^test">>, <<"^test">> ,<<"subscribe">>, <<"allow">>, <<"test/%c">>]]). -define(RULE3, [[<<"subscribe">>, <<"allow">>, <<"test/%c">>]]).
-define(RULE4, [[<<>>, <<"^test">>, <<"^test">> ,<<"publish">>, <<"allow">>, <<"test/%u">>]]). -define(RULE4, [[<<"publish">>, <<"allow">>, <<"test/%u">>]]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases

View File

@ -40,17 +40,17 @@ init_per_suite(Config) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{ <<"config">> => #{ Rules = [#{<<"type">> => <<"pgsql">>,
<<"server">> => <<"127.0.0.1:27017">>, <<"config">> => #{
<<"pool_size">> => 1, <<"server">> => <<"127.0.0.1:27017">>,
<<"database">> => <<"mqtt">>, <<"pool_size">> => 1,
<<"username">> => <<"xx">>, <<"database">> => <<"mqtt">>,
<<"password">> => <<"ee">>, <<"username">> => <<"xx">>,
<<"auto_reconnect">> => true, <<"password">> => <<"ee">>,
<<"ssl">> => #{<<"enable">> => false} <<"auto_reconnect">> => true,
}, <<"ssl">> => #{<<"enable">> => false}},
<<"sql">> => <<"abcb">>, <<"sql">> => <<"abcb">>
<<"type">> => <<"pgsql">> }], }],
{ok, _} = emqx_authz:update(replace, Rules), {ok, _} = emqx_authz:update(replace, Rules),
Config. Config.
@ -59,17 +59,14 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]),
meck:unload(emqx_resource). meck:unload(emqx_resource).
-define(COLUMNS, [ {column, <<"ipaddress">>, meck, meck, meck, meck, meck, meck, meck} -define(COLUMNS, [ {column, <<"action">>, meck, meck, meck, meck, meck, meck, meck}
, {column, <<"username">>, meck, meck, meck, meck, meck, meck, meck}
, {column, <<"clientid">>, meck, meck, meck, meck, meck, meck, meck}
, {column, <<"action">>, meck, meck, meck, meck, meck, meck, meck}
, {column, <<"permission">>, meck, meck, meck, meck, meck, meck, meck} , {column, <<"permission">>, meck, meck, meck, meck, meck, meck, meck}
, {column, <<"topic">>, meck, meck, meck, meck, meck, meck, meck} , {column, <<"topic">>, meck, meck, meck, meck, meck, meck, meck}
]). ]).
-define(RULE1, [{<<"127.0.0.1">>, <<>>, <<>>, <<"all">>, <<"deny">>, <<"#">>}]). -define(RULE1, [{<<"all">>, <<"deny">>, <<"#">>}]).
-define(RULE2, [{<<"127.0.0.1">>, <<>>, <<>>, <<"all">>, <<"allow">>, <<"eq #">>}]). -define(RULE2, [{<<"all">>, <<"allow">>, <<"eq #">>}]).
-define(RULE3, [{<<>>, <<"^test">>, <<"^test">> ,<<"subscribe">>, <<"allow">>, <<"test/%c">>}]). -define(RULE3, [{<<"subscribe">>, <<"allow">>, <<"test/%c">>}]).
-define(RULE4, [{<<>>, <<"^test">>, <<"^test">> ,<<"publish">>, <<"allow">>, <<"test/%u">>}]). -define(RULE4, [{<<"publish">>, <<"allow">>, <<"test/%u">>}]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases

View File

@ -41,16 +41,16 @@ init_per_suite(Config) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{ <<"config">> => #{ Rules = [#{<<"type">> => <<"redis">>,
<<"server">> => <<"127.0.0.1:27017">>, <<"config">> => #{
<<"pool_size">> => 1, <<"server">> => <<"127.0.0.1:27017">>,
<<"database">> => 0, <<"pool_size">> => 1,
<<"password">> => <<"ee">>, <<"database">> => 0,
<<"auto_reconnect">> => true, <<"password">> => <<"ee">>,
<<"ssl">> => #{<<"enable">> => false} <<"auto_reconnect">> => true,
}, <<"ssl">> => #{<<"enable">> => false}},
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>, <<"cmd">> => <<"HGETALL mqtt_authz:%u">>
<<"type">> => <<"redis">> }], }],
{ok, _} = emqx_authz:update(replace, Rules), {ok, _} = emqx_authz:update(replace, Rules),
Config. Config.

View File

@ -19,8 +19,9 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-type server() :: string(). -type server() :: emqx_schema:ip_port().
-reflect_type([server/0]). -reflect_type([server/0]).
-typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([ on_start/2
@ -95,7 +96,7 @@ on_start(InstId, Config = #{server := Server,
mongo_type := single}) -> mongo_type := single}) ->
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
Opts = [{type, single}, Opts = [{type, single},
{hosts, [Server]} {hosts, [emqx_connector_schema_lib:ip_port_to_string(Server)]}
], ],
do_start(InstId, Opts, Config); do_start(InstId, Opts, Config);
@ -104,14 +105,17 @@ on_start(InstId, Config = #{servers := Servers,
replica_set_name := RsName}) -> replica_set_name := RsName}) ->
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
Opts = [{type, {rs, RsName}}, Opts = [{type, {rs, RsName}},
{hosts, Servers}], {hosts, [emqx_connector_schema_lib:ip_port_to_string(S)
|| S <- Servers]}
],
do_start(InstId, Opts, Config); do_start(InstId, Opts, Config);
on_start(InstId, Config = #{servers := Servers, on_start(InstId, Config = #{servers := Servers,
mongo_type := sharded}) -> mongo_type := sharded}) ->
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]),
Opts = [{type, sharded}, Opts = [{type, sharded},
{hosts, Servers} {hosts, [emqx_connector_schema_lib:ip_port_to_string(S)
|| S <- Servers]}
], ],
do_start(InstId, Opts, Config). do_start(InstId, Opts, Config).

View File

@ -19,10 +19,9 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-type server() :: tuple(). -type server() :: emqx_schema:ip_port().
-reflect_type([server/0]). -reflect_type([server/0]).
-typerefl_from_string({server/0, ?MODULE, to_server}). -typerefl_from_string({server/0, emqx_connector_schema_lib, to_ip_port}).
-export([to_server/1]).
-export([structs/0, fields/1]). -export([structs/0, fields/1]).
@ -170,9 +169,3 @@ redis_fields() ->
default => 0}} default => 0}}
, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} , {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ].
to_server(Server) ->
case string:tokens(Server, ":") of
[Host, Port] -> {ok, {Host, list_to_integer(Port)}};
_ -> {error, Server}
end.