fix(emqx_coap): return error code forbidden on ACL denied #4456
This commit is contained in:
parent
9e219ed68d
commit
83b0d444de
|
@ -151,8 +151,9 @@ To subscribe any topic, issue following command:
|
|||
- if clientid is absent, a "bad_request" will be returned.
|
||||
- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
|
||||
- {username} and {password} are optional.
|
||||
- if {username} and {password} are not correct, an uauthorized error will be returned.
|
||||
- if {username} or {password} is incorrect, the error code `uauthorized` will be returned.
|
||||
- topic is subscribed with qos1.
|
||||
- if the subscription failed due to ACL deny, the error code `forbidden` will be returned.
|
||||
|
||||
CoAP Client Unobserve Operation (unsubscribe topic)
|
||||
---------------------------------------------------
|
||||
|
@ -168,7 +169,7 @@ To cancel observation, issue following command:
|
|||
- if clientid is absent, a "bad_request" will be returned.
|
||||
- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
|
||||
- {username} and {password} are optional.
|
||||
- if {username} and {password} are not correct, an uauthorized error will be returned.
|
||||
- if {username} or {password} is incorrect, the error code `uauthorized` will be returned.
|
||||
|
||||
CoAP Client Notification Operation (subscribed Message)
|
||||
-------------------------------------------------------
|
||||
|
@ -179,7 +180,7 @@ Server will issue an observe-notification as a subscribed message.
|
|||
|
||||
CoAP Client Publish Operation
|
||||
-----------------------------
|
||||
Issue a coap put command to do publishment. For example:
|
||||
Issue a coap put command to publish messages. For example:
|
||||
|
||||
```
|
||||
PUT coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password}
|
||||
|
@ -191,10 +192,11 @@ Issue a coap put command to do publishment. For example:
|
|||
- if clientid is absent, a "bad_request" will be returned.
|
||||
- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
|
||||
- {username} and {password} are optional.
|
||||
- if {username} and {password} are not correct, an uauthorized error will be returned.
|
||||
- if {username} or {password} is incorrect, the error code `uauthorized` will be returned.
|
||||
- payload could be any binary data.
|
||||
- payload data type is "application/octet-stream".
|
||||
- publish message will be sent with qos0.
|
||||
- if the publishing failed due to ACL deny, the error code `forbidden` will be returned.
|
||||
|
||||
CoAP Client Keep Alive
|
||||
----------------------
|
||||
|
@ -209,7 +211,7 @@ Device should issue a get command periodically, serve as a ping to keep mqtt ses
|
|||
- {any_topicname} is optional, and should be percent-encoded to prevent special characters.
|
||||
- {clientid} is mandatory. If clientid is absent, a "bad_request" will be returned.
|
||||
- {username} and {password} are optional.
|
||||
- if {username} and {password} are not correct, an uauthorized error will be returned.
|
||||
- if {username} or {password} is incorrect, the error code `uauthorized` will be returned.
|
||||
- coap client should do keepalive work periodically to keep mqtt session online, especially those devices in a NAT network.
|
||||
|
||||
|
||||
|
|
|
@ -133,8 +133,8 @@ init({ClientId, Username, Password, Channel}) ->
|
|||
handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
|
||||
NewTopics = proplists:delete(Topic, TopicList),
|
||||
IsWild = emqx_topic:wildcard(Topic),
|
||||
chann_subscribe(Topic, State),
|
||||
{reply, ok, State#state{sub_topics = [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
|
||||
{reply, chann_subscribe(Topic, State), State#state{sub_topics =
|
||||
[{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
|
||||
|
||||
handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
|
||||
NewTopics = proplists:delete(Topic, TopicList),
|
||||
|
@ -142,8 +142,7 @@ handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = Top
|
|||
{reply, ok, State#state{sub_topics = NewTopics}, hibernate};
|
||||
|
||||
handle_call({publish, Topic, Payload}, _From, State) ->
|
||||
_ = chann_publish(Topic, Payload, State),
|
||||
{reply, ok, State};
|
||||
{reply, chann_publish(Topic, Payload, State), State};
|
||||
|
||||
handle_call(info, _From, State) ->
|
||||
{reply, info(State), State};
|
||||
|
@ -221,10 +220,12 @@ chann_subscribe(Topic, State = #state{clientid = ClientId}) ->
|
|||
case emqx_access_control:check_acl(clientinfo(State), subscribe, Topic) of
|
||||
allow ->
|
||||
emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS),
|
||||
emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]);
|
||||
emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]),
|
||||
ok;
|
||||
deny ->
|
||||
?LOG(warning, "subscribe to ~p by clientid ~p failed due to acl check.",
|
||||
[Topic, ClientId])
|
||||
[Topic, ClientId]),
|
||||
{error, forbidden}
|
||||
end.
|
||||
|
||||
chann_unsubscribe(Topic, State) ->
|
||||
|
@ -239,10 +240,12 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
|
|||
allow ->
|
||||
emqx_broker:publish(
|
||||
emqx_message:set_flag(retain, false,
|
||||
emqx_message:make(ClientId, ?QOS_0, Topic, Payload)));
|
||||
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
|
||||
ok;
|
||||
deny ->
|
||||
?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
|
||||
[Topic, ClientId])
|
||||
[Topic, ClientId]),
|
||||
{error, forbidden}
|
||||
end.
|
||||
|
||||
|
||||
|
|
|
@ -112,14 +112,16 @@ coap_observe(ChId, ?PS_PREFIX, TopicPath, Ack, Content) when TopicPath =/= [] ->
|
|||
Topic = topic(TopicPath),
|
||||
?LOG(debug, "observe Topic=~p, Ack=~p,Content=~p", [Topic, Ack, Content]),
|
||||
Pid = get(mqtt_client_pid),
|
||||
emqx_coap_mqtt_adapter:subscribe(Pid, Topic),
|
||||
Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of
|
||||
true ->
|
||||
nocontent;
|
||||
false->
|
||||
content
|
||||
end,
|
||||
{ok, {state, ChId, ?PS_PREFIX, [Topic]}, Code, Content};
|
||||
case emqx_coap_mqtt_adapter:subscribe(Pid, Topic) of
|
||||
ok ->
|
||||
Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of
|
||||
true -> nocontent;
|
||||
false-> content
|
||||
end,
|
||||
{ok, {state, ChId, ?PS_PREFIX, [Topic]}, Code, Content};
|
||||
{error, Code} ->
|
||||
{error, Code}
|
||||
end;
|
||||
|
||||
coap_observe(ChId, Prefix, TopicPath, Ack, _Content) ->
|
||||
?LOG(error, "unknown observe request ChId=~p, Prefix=~p, TopicPath=~p, Ack=~p", [ChId, Prefix, TopicPath, Ack]),
|
||||
|
@ -222,17 +224,19 @@ format_string_to_int(<<"application/json">>) ->
|
|||
|
||||
handle_received_publish(Topic, MaxAge, Format, Payload) ->
|
||||
case add_topic_info(publish, Topic, MaxAge, format_string_to_int(Format), Payload) of
|
||||
{Ret ,true} ->
|
||||
{Ret, true} ->
|
||||
Pid = get(mqtt_client_pid),
|
||||
emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload),
|
||||
Content = case Ret of
|
||||
changed ->
|
||||
#coap_content{};
|
||||
created ->
|
||||
LocPath = concatenate_location_path([<<"ps">>, Topic, <<>>]),
|
||||
#coap_content{location_path = [LocPath]}
|
||||
end,
|
||||
{ok, Ret, Content};
|
||||
case emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload) of
|
||||
ok ->
|
||||
{ok, Ret, case Ret of
|
||||
changed -> #coap_content{};
|
||||
created ->
|
||||
#coap_content{location_path = [
|
||||
concatenate_location_path([<<"ps">>, Topic, <<>>])]}
|
||||
end};
|
||||
{error, Code} ->
|
||||
{error, Code}
|
||||
end;
|
||||
{_, false} ->
|
||||
?LOG(debug, "add_topic_info failed, will return bad_request", []),
|
||||
{error, bad_request}
|
||||
|
|
|
@ -56,7 +56,7 @@ coap_get(ChId, ?MQTT_PREFIX, Path, Query, _Content) ->
|
|||
#coap_content{};
|
||||
{error, auth_failure} ->
|
||||
put(mqtt_client_pid, undefined),
|
||||
{error, unauthorized};
|
||||
{error, forbidden};
|
||||
{error, bad_request} ->
|
||||
put(mqtt_client_pid, undefined),
|
||||
{error, bad_request};
|
||||
|
@ -74,8 +74,7 @@ coap_post(_ChId, _Prefix, _Topic, _Content) ->
|
|||
coap_put(_ChId, ?MQTT_PREFIX, Topic, #coap_content{payload = Payload}) when Topic =/= [] ->
|
||||
?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]),
|
||||
Pid = get(mqtt_client_pid),
|
||||
emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload),
|
||||
ok;
|
||||
emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload);
|
||||
coap_put(_ChId, Prefix, Topic, Content) ->
|
||||
?LOG(error, "put has error, Prefix=~p, Topic=~p, Content=~p", [Prefix, Topic, Content]),
|
||||
{error, bad_request}.
|
||||
|
@ -87,8 +86,10 @@ coap_observe(ChId, ?MQTT_PREFIX, Topic, Ack, Content) when Topic =/= [] ->
|
|||
TrueTopic = topic(Topic),
|
||||
?LOG(debug, "observe Topic=~p, Ack=~p", [TrueTopic, Ack]),
|
||||
Pid = get(mqtt_client_pid),
|
||||
emqx_coap_mqtt_adapter:subscribe(Pid, TrueTopic),
|
||||
{ok, {state, ChId, ?MQTT_PREFIX, [TrueTopic]}, content, Content};
|
||||
case emqx_coap_mqtt_adapter:subscribe(Pid, TrueTopic) of
|
||||
ok -> {ok, {state, ChId, ?MQTT_PREFIX, [TrueTopic]}, content, Content};
|
||||
{error, Code} -> {error, Code}
|
||||
end;
|
||||
coap_observe(ChId, Prefix, Topic, Ack, _Content) ->
|
||||
?LOG(error, "unknown observe request ChId=~p, Prefix=~p, Topic=~p, Ack=~p", [ChId, Prefix, Topic, Ack]),
|
||||
{error, bad_request}.
|
||||
|
|
|
@ -68,6 +68,25 @@ t_publish(_Config) ->
|
|||
?assert(false)
|
||||
end.
|
||||
|
||||
t_publish_acl_deny(_Config) ->
|
||||
Topic = <<"abc">>, Payload = <<"123">>,
|
||||
TopicStr = binary_to_list(Topic),
|
||||
URI = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
||||
|
||||
%% Sub topic first
|
||||
emqx:subscribe(Topic),
|
||||
|
||||
ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history]),
|
||||
ok = meck:expect(emqx_access_control, check_acl, 3, deny),
|
||||
Reply = er_coap_client:request(put, URI, #coap_content{format = <<"application/octet-stream">>, payload = Payload}),
|
||||
?assertEqual({error,forbidden}, Reply),
|
||||
ok = meck:unload(emqx_access_control),
|
||||
receive
|
||||
{deliver, Topic, Msg} -> ct:fail({unexpected, {Topic, Msg}})
|
||||
after
|
||||
500 -> ok
|
||||
end.
|
||||
|
||||
t_observe(_Config) ->
|
||||
Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
|
||||
Payload = <<"123">>,
|
||||
|
@ -91,6 +110,15 @@ t_observe(_Config) ->
|
|||
|
||||
[] = emqx:subscribers(Topic).
|
||||
|
||||
t_observe_acl_deny(_Config) ->
|
||||
Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
|
||||
Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
||||
ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history]),
|
||||
ok = meck:expect(emqx_access_control, check_acl, 3, deny),
|
||||
?assertEqual({error,forbidden}, er_coap_observer:observe(Uri)),
|
||||
[] = emqx:subscribers(Topic),
|
||||
ok = meck:unload(emqx_access_control).
|
||||
|
||||
t_observe_wildcard(_Config) ->
|
||||
Topic = <<"+/b">>, TopicStr = http_uri:encode(binary_to_list(Topic)),
|
||||
Payload = <<"123">>,
|
||||
|
|
Loading…
Reference in New Issue