From c89dad559e4c6ff7d4363937c3a6d290b0e02412 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Thu, 17 Dec 2020 10:36:51 +0900 Subject: [PATCH 1/3] feat(coap): use emqx_access_control:check_acl before pub/sub --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index da0dade5b..b77f44244 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -206,8 +206,13 @@ code_change(_OldVsn, State, _Extra) -> chann_subscribe(Topic, State = #state{clientid = ClientId}) -> ?LOG(debug, "subscribe Topic=~p", [Topic]), - emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS), - emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]). + case emqx_access_control:check_acl(clientinfo(State), subscribe, Topic) of + allow -> + emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS), + emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]); + deny -> + ?LOG(warning, "subscribe to ~p by clientid ~p failed due to acl check.", [Topic, ClientId]) + end. chann_unsubscribe(Topic, State) -> ?LOG(debug, "unsubscribe Topic=~p", [Topic]), @@ -215,11 +220,17 @@ chann_unsubscribe(Topic, State) -> emqx_broker:unsubscribe(Topic), emqx_hooks:run('session.unsubscribed', [clientinfo(State), Topic, Opts]). -chann_publish(Topic, Payload, #state{clientid = ClientId}) -> +chann_publish(Topic, Payload, State = #state{clientid = ClientId}) -> ?LOG(debug, "publish Topic=~p, Payload=~p", [Topic, Payload]), - emqx_broker:publish( - emqx_message:set_flag(retain, false, - emqx_message:make(ClientId, ?QOS_0, Topic, Payload))). + case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of + allow -> + emqx_broker:publish( + emqx_message:set_flag(retain, false, + emqx_message:make(ClientId, ?QOS_0, Topic, Payload))); + deny -> + ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.", [Topic, ClientId]) + end. + %%-------------------------------------------------------------------- %% Deliver From f06ec6baaa78a4d0cadba30f590a9aec5c8bd5ea Mon Sep 17 00:00:00 2001 From: z8674558 Date: Thu, 17 Dec 2020 10:55:57 +0900 Subject: [PATCH 2/3] chore(lint): fix elvis --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index b77f44244..fc4e5c60c 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -50,6 +50,8 @@ -record(state, {peername, clientid, username, password, sub_topics = [], connected_at}). +-type(state() :: #state{}). + -define(ALIVE_INTERVAL, 20000). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -74,8 +76,10 @@ client_pid(ClientId, Username, Password, Channel) -> start(ClientId, Username, Password, Channel) -> % DO NOT use start_link, since multiple coap_reponsder may have relation with one mqtt adapter, % one coap_responder crashes should not make mqtt adapter crash too - % And coap_responder is not a system process, it is dangerous to link mqtt adapter to coap_responder - gen_server:start({via, emqx_coap_registry, {ClientId, Username, Password}}, ?MODULE, {ClientId, Username, Password, Channel}, []). + % And coap_responder is not a system process + % it is dangerous to link mqtt adapter to coap_responder + gen_server:start({via, emqx_coap_registry, {ClientId, Username, Password}}, + ?MODULE, {ClientId, Username, Password, Channel}, []). stop(Pid) -> gen_server:stop(Pid). @@ -164,7 +168,8 @@ handle_cast(Msg, State) -> ?LOG(error, "broker_api unexpected cast ~p", [Msg]), {noreply, State, hibernate}. -handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}}, State = #state{sub_topics = Subscribers}) -> +handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}}, + State = #state{sub_topics = Subscribers}) -> deliver([{Topic, Payload}], Subscribers), {noreply, State, hibernate}; @@ -211,7 +216,8 @@ chann_subscribe(Topic, State = #state{clientid = ClientId}) -> emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS), emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]); deny -> - ?LOG(warning, "subscribe to ~p by clientid ~p failed due to acl check.", [Topic, ClientId]) + ?LOG(warning, "subscribe to ~p by clientid ~p failed due to acl check.", + [Topic, ClientId]) end. chann_unsubscribe(Topic, State) -> @@ -228,7 +234,8 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) -> emqx_message:set_flag(retain, false, emqx_message:make(ClientId, ?QOS_0, Topic, Payload))); deny -> - ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.", [Topic, ClientId]) + ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.", + [Topic, ClientId]) end. @@ -253,7 +260,8 @@ deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) -> true -> emqx_topic:match(TopicName, TopicFilter); false -> TopicName =:= TopicFilter end, - %?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p", [Matched, CoapPid, TopicName, Payload, T]), + %?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p", + % [Matched, CoapPid, TopicName, Payload, T]), Matched andalso (CoapPid ! {dispatch, TopicName, Payload}), deliver_to_coap(TopicName, Payload, T). @@ -278,7 +286,7 @@ info(State) -> sockinfo(#state{peername = Peername}) -> #{socktype => udp, peername => Peername, - sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock? + sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock? sockstate => running, active_n => 1 }. @@ -296,7 +304,7 @@ conninfo(#state{peername = Peername, clientid = ClientId, connected_at = ConnectedAt}) -> #{socktype => udp, - sockname => {{127,0,0,1}, 5683}, + sockname => {{127, 0, 0, 1}, 5683}, peername => Peername, peercert => nossl, %% TODO: dtls conn_mod => ?MODULE, @@ -328,7 +336,7 @@ session_info(#state{sub_topics = SubTopics, connected_at = ConnectedAt}) -> %% The stats keys copied from emqx_connection:stats/1 stats(#state{sub_topics = SubTopics}) -> - SockStats = [{recv_oct,0}, {recv_cnt,0}, {send_oct,0}, {send_cnt,0}, {send_pend,0}], + SockStats = [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0}, {send_pend, 0}], ConnStats = emqx_pd:get_counters(?CONN_STATS), ChanStats = [{subscriptions_cnt, length(SubTopics)}, {subscriptions_max, length(SubTopics)}, From a564670863243fc393df5b7d1222fc8916e4f052 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Thu, 17 Dec 2020 15:57:38 +0900 Subject: [PATCH 3/3] fix(coap): fix the issue the connection with same clientid is not discarded --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index fc4e5c60c..6ce30a9e6 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -111,12 +111,12 @@ init({ClientId, Username, Password, Channel}) -> _ = run_hooks('client.connect', [conninfo(State0)], undefined), case emqx_access_control:authenticate(clientinfo(State0)) of {ok, _AuthResult} -> + ok = emqx_cm:discard_session(ClientId), + _ = run_hooks('client.connack', [conninfo(State0), success], undefined), State = State0#state{connected_at = erlang:system_time(millisecond)}, - %% TODO: Evict same clientid on other node?? - run_hooks('client.connected', [clientinfo(State), conninfo(State)]), erlang:send_after(?ALIVE_INTERVAL, self(), check_alive), @@ -186,6 +186,11 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]), {stop, {shutdown, conflict}, State}; +handle_info(discard, State) -> + ?LOG(warning, "the connection is discarded. " ++ + "possibly there is another client with the same clientid", []), + {stop, {shutdown, discarded}, State}; + handle_info(kick, State) -> ?LOG(info, "Kicked", []), {stop, {shutdown, kick}, State};