fix(gw): fix bad function calling
This commit is contained in:
parent
dbd78b83b1
commit
4125209d1c
|
@ -142,7 +142,7 @@ fields(listener_settings) ->
|
||||||
, {proxy_protocol, t(boolean())}
|
, {proxy_protocol, t(boolean())}
|
||||||
, {proxy_protocol_timeout, t(duration())}
|
, {proxy_protocol_timeout, t(duration())}
|
||||||
, {backlog, t(integer(), undefined, 1024)}
|
, {backlog, t(integer(), undefined, 1024)}
|
||||||
, {send_timeout, t(duration(), undefined, "15s")}
|
, {send_timeout, t(duration(), undefined, "15s")} %% FIXME: mapping it
|
||||||
, {send_timeout_close, t(boolean(), undefined, true)}
|
, {send_timeout_close, t(boolean(), undefined, true)}
|
||||||
, {recbuf, t(bytesize())}
|
, {recbuf, t(bytesize())}
|
||||||
, {sndbuf, t(bytesize())}
|
, {sndbuf, t(bytesize())}
|
||||||
|
|
|
@ -274,7 +274,9 @@ handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = conne
|
||||||
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
|
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
|
||||||
handle_call({auth, ClientInfo0, Password},
|
handle_call({auth, ClientInfo0, Password},
|
||||||
Channel = #channel{conninfo = ConnInfo,
|
Channel = #channel{
|
||||||
|
ctx = Ctx,
|
||||||
|
conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
|
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
|
||||||
ConnInfo1 = enrich_conninfo(ClientInfo1, ConnInfo),
|
ConnInfo1 = enrich_conninfo(ClientInfo1, ConnInfo),
|
||||||
|
@ -284,14 +286,23 @@ handle_call({auth, ClientInfo0, Password},
|
||||||
|
|
||||||
#{clientid := ClientId, username := Username} = ClientInfo1,
|
#{clientid := ClientId, username := Username} = ClientInfo1,
|
||||||
|
|
||||||
case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of
|
case emqx_gateway_ctx:authenticate(
|
||||||
ok ->
|
Ctx, ClientInfo1#{password => Password}) of
|
||||||
|
{ok, NClientInfo} ->
|
||||||
|
SessFun = fun(_, _) -> #{} end,
|
||||||
emqx_logger:set_metadata_clientid(ClientId),
|
emqx_logger:set_metadata_clientid(ClientId),
|
||||||
case emqx_cm:open_session(true, ClientInfo1, ConnInfo1) of
|
case emqx_gateway_ctx:open_session(
|
||||||
|
Ctx,
|
||||||
|
true,
|
||||||
|
NClientInfo,
|
||||||
|
ConnInfo1,
|
||||||
|
SessFun
|
||||||
|
) of
|
||||||
{ok, _Session} ->
|
{ok, _Session} ->
|
||||||
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
|
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
|
||||||
[ClientId, Username]),
|
[ClientId, Username]),
|
||||||
{reply, ok, [{event, connected}], ensure_connected(Channel1)};
|
{reply, ok, [{event, connected}],
|
||||||
|
ensure_connected(Channel1#channel{clientinfo = NClientInfo})};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(warning, "Client ~s (Username: '~s') open session failed for ~0p",
|
?LOG(warning, "Client ~s (Username: '~s') open session failed for ~0p",
|
||||||
[ClientId, Username, Reason]),
|
[ClientId, Username, Reason]),
|
||||||
|
@ -315,10 +326,10 @@ handle_call({start_timer, keepalive, Interval},
|
||||||
|
|
||||||
handle_call({subscribe, TopicFilter, Qos},
|
handle_call({subscribe, TopicFilter, Qos},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
ctx = Ctx,
|
||||||
conn_state = connected,
|
conn_state = connected,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
case is_acl_enabled(ClientInfo) andalso
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of
|
||||||
emqx_access_control:authorize(ClientInfo, subscribe, TopicFilter) of
|
|
||||||
deny ->
|
deny ->
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -333,12 +344,12 @@ handle_call({unsubscribe, TopicFilter},
|
||||||
|
|
||||||
handle_call({publish, Topic, Qos, Payload},
|
handle_call({publish, Topic, Qos, Payload},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
ctx = Ctx,
|
||||||
conn_state = connected,
|
conn_state = connected,
|
||||||
clientinfo = ClientInfo
|
clientinfo = ClientInfo
|
||||||
= #{clientid := From,
|
= #{clientid := From,
|
||||||
mountpoint := Mountpoint}}) ->
|
mountpoint := Mountpoint}}) ->
|
||||||
case is_acl_enabled(ClientInfo) andalso
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
||||||
emqx_access_control:authorize(ClientInfo, publish, Topic) of
|
|
||||||
deny ->
|
deny ->
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -464,9 +475,6 @@ do_unsubscribe(TopicFilter, UnSubOpts, Channel =
|
||||||
parse_topic_filters(TopicFilters) ->
|
parse_topic_filters(TopicFilters) ->
|
||||||
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
||||||
|
|
||||||
is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) ->
|
|
||||||
(not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [authorization, enable]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Ensure & Hooks
|
%% Ensure & Hooks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -125,7 +125,7 @@ call(ConnStr, Req) ->
|
||||||
Pid when is_pid(Pid) ->
|
Pid when is_pid(Pid) ->
|
||||||
case erlang:is_process_alive(Pid) of
|
case erlang:is_process_alive(Pid) of
|
||||||
true ->
|
true ->
|
||||||
emqx_exproto_conn:call(Pid, Req);
|
emqx_gateway_conn:call(Pid, Req);
|
||||||
false ->
|
false ->
|
||||||
{error, ?RESP_CONN_PROCESS_NOT_ALIVE,
|
{error, ?RESP_CONN_PROCESS_NOT_ALIVE,
|
||||||
<<"Connection process is not alive">>}
|
<<"Connection process is not alive">>}
|
||||||
|
|
|
@ -728,8 +728,8 @@ convert_topic_id_to_name({{id, TopicId}, Flags, Data},
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_pub_acl({TopicName, _Flags, _Data},
|
check_pub_acl({TopicName, _Flags, _Data},
|
||||||
#channel{clientinfo = ClientInfo}) ->
|
#channel{ctx = Ctx, clientinfo = ClientInfo}) ->
|
||||||
case emqx_access_control:authorize(ClientInfo, publish, TopicName) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, TopicName) of
|
||||||
allow -> ok;
|
allow -> ok;
|
||||||
deny -> {error, ?SN_RC_NOT_AUTHORIZE}
|
deny -> {error, ?SN_RC_NOT_AUTHORIZE}
|
||||||
end.
|
end.
|
||||||
|
@ -833,9 +833,8 @@ preproc_subs_type(?SN_SUBSCRIBE_MSG_TYPE(_Reserved, _TopicId, _QoS),
|
||||||
{error, ?SN_RC_NOT_SUPPORTED}.
|
{error, ?SN_RC_NOT_SUPPORTED}.
|
||||||
|
|
||||||
check_subscribe_acl({_TopicId, TopicName, _QoS},
|
check_subscribe_acl({_TopicId, TopicName, _QoS},
|
||||||
Channel = #channel{clientinfo = ClientInfo}) ->
|
Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
|
||||||
case emqx_access_control:authorize(
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicName) of
|
||||||
ClientInfo, subscribe, TopicName) of
|
|
||||||
allow ->
|
allow ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
|
|
Loading…
Reference in New Issue