From 4125209d1c19e9f70c0e359f7dcaf35cfa65598f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 22 Jul 2021 22:04:15 +0800 Subject: [PATCH] fix(gw): fix bad function calling --- apps/emqx_gateway/src/emqx_gateway_schema.erl | 2 +- .../src/exproto/emqx_exproto_channel.erl | 34 ++++++++++++------- .../src/exproto/emqx_exproto_gsvr.erl | 2 +- .../src/mqttsn/emqx_sn_channel.erl | 9 +++-- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 2b5269758..f4957cb68 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -142,7 +142,7 @@ fields(listener_settings) -> , {proxy_protocol, t(boolean())} , {proxy_protocol_timeout, t(duration())} , {backlog, t(integer(), undefined, 1024)} - , {send_timeout, t(duration(), undefined, "15s")} + , {send_timeout, t(duration(), undefined, "15s")} %% FIXME: mapping it , {send_timeout_close, t(boolean(), undefined, true)} , {recbuf, t(bytesize())} , {sndbuf, t(bytesize())} diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index ff1289fa5..bfd883c59 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -274,8 +274,10 @@ handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = conne ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; handle_call({auth, ClientInfo0, Password}, - Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), ConnInfo1 = enrich_conninfo(ClientInfo1, ConnInfo), @@ -284,14 +286,23 @@ handle_call({auth, ClientInfo0, Password}, #{clientid := ClientId, username := Username} = ClientInfo1, - case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of - ok -> + case emqx_gateway_ctx:authenticate( + Ctx, ClientInfo1#{password => Password}) of + {ok, NClientInfo} -> + SessFun = fun(_, _) -> #{} end, emqx_logger:set_metadata_clientid(ClientId), - case emqx_cm:open_session(true, ClientInfo1, ConnInfo1) of + case emqx_gateway_ctx:open_session( + Ctx, + true, + NClientInfo, + ConnInfo1, + SessFun + ) of {ok, _Session} -> ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!", [ClientId, Username]), - {reply, ok, [{event, connected}], ensure_connected(Channel1)}; + {reply, ok, [{event, connected}], + ensure_connected(Channel1#channel{clientinfo = NClientInfo})}; {error, Reason} -> ?LOG(warning, "Client ~s (Username: '~s') open session failed for ~0p", [ClientId, Username, Reason]), @@ -315,10 +326,10 @@ handle_call({start_timer, keepalive, Interval}, handle_call({subscribe, TopicFilter, Qos}, Channel = #channel{ + ctx = Ctx, conn_state = connected, clientinfo = ClientInfo}) -> - case is_acl_enabled(ClientInfo) andalso - emqx_access_control:authorize(ClientInfo, subscribe, TopicFilter) of + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of deny -> {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; _ -> @@ -333,12 +344,12 @@ handle_call({unsubscribe, TopicFilter}, handle_call({publish, Topic, Qos, Payload}, Channel = #channel{ + ctx = Ctx, conn_state = connected, clientinfo = ClientInfo = #{clientid := From, mountpoint := Mountpoint}}) -> - case is_acl_enabled(ClientInfo) andalso - emqx_access_control:authorize(ClientInfo, publish, Topic) of + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of deny -> {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; _ -> @@ -464,9 +475,6 @@ do_unsubscribe(TopicFilter, UnSubOpts, Channel = parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). -is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> - (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [authorization, enable]). - %%-------------------------------------------------------------------- %% Ensure & Hooks %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl index 3b3d54f71..f09a9df7b 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl @@ -125,7 +125,7 @@ call(ConnStr, Req) -> Pid when is_pid(Pid) -> case erlang:is_process_alive(Pid) of true -> - emqx_exproto_conn:call(Pid, Req); + emqx_gateway_conn:call(Pid, Req); false -> {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>} diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index af4143436..9de86f101 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -728,8 +728,8 @@ convert_topic_id_to_name({{id, TopicId}, Flags, Data}, end. check_pub_acl({TopicName, _Flags, _Data}, - #channel{clientinfo = ClientInfo}) -> - case emqx_access_control:authorize(ClientInfo, publish, TopicName) of + #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, TopicName) of allow -> ok; deny -> {error, ?SN_RC_NOT_AUTHORIZE} end. @@ -833,9 +833,8 @@ preproc_subs_type(?SN_SUBSCRIBE_MSG_TYPE(_Reserved, _TopicId, _QoS), {error, ?SN_RC_NOT_SUPPORTED}. check_subscribe_acl({_TopicId, TopicName, _QoS}, - Channel = #channel{clientinfo = ClientInfo}) -> - case emqx_access_control:authorize( - ClientInfo, subscribe, TopicName) of + Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicName) of allow -> {ok, Channel}; _ ->