chore: fill message headers
This commit is contained in:
parent
7d06e48b4b
commit
8dfc8ed96b
|
@ -244,15 +244,26 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
|
||||||
case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of
|
case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of
|
||||||
allow ->
|
allow ->
|
||||||
_ = emqx_broker:publish(
|
_ = emqx_broker:publish(
|
||||||
emqx_message:set_flag(retain, false,
|
packet_to_message(Topic, Payload, State)), ok;
|
||||||
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
|
|
||||||
ok;
|
|
||||||
deny ->
|
deny ->
|
||||||
?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
|
?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
|
||||||
[Topic, ClientId]),
|
[Topic, ClientId]),
|
||||||
{error, forbidden}
|
{error, forbidden}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
packet_to_message(Topic, Payload,
|
||||||
|
#state{clientid = ClientId,
|
||||||
|
username = Username,
|
||||||
|
peername = {PeerHost, _}}) ->
|
||||||
|
Message = emqx_message:set_flag(
|
||||||
|
retain, false,
|
||||||
|
emqx_message:make(ClientId, ?QOS_0, Topic, Payload)
|
||||||
|
),
|
||||||
|
emqx_message:set_headers(
|
||||||
|
#{ proto_ver => 1
|
||||||
|
, protocol => coap
|
||||||
|
, username => Username
|
||||||
|
, peerhost => PeerHost}, Message).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Deliver
|
%% Deliver
|
||||||
|
@ -384,4 +395,3 @@ clientinfo(#state{peername = {PeerHost, _},
|
||||||
mountpoint => undefined,
|
mountpoint => undefined,
|
||||||
ws_cookie => undefined
|
ws_cookie => undefined
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -340,17 +340,14 @@ handle_call({unsubscribe, TopicFilter},
|
||||||
handle_call({publish, Topic, Qos, Payload},
|
handle_call({publish, Topic, Qos, Payload},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
conn_state = connected,
|
conn_state = connected,
|
||||||
clientinfo = ClientInfo
|
clientinfo = ClientInfo}) ->
|
||||||
= #{clientid := From,
|
|
||||||
mountpoint := Mountpoint}}) ->
|
|
||||||
case is_acl_enabled(ClientInfo) andalso
|
case is_acl_enabled(ClientInfo) andalso
|
||||||
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
||||||
deny ->
|
deny ->
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
Msg = emqx_message:make(From, Qos, Topic, Payload),
|
Msg = packet_to_message(Topic, Qos, Payload, Channel),
|
||||||
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
|
_ = emqx:publish(Msg),
|
||||||
_ = emqx:publish(NMsg),
|
|
||||||
{reply, ok, Channel}
|
{reply, ok, Channel}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -419,6 +416,24 @@ is_anonymous(_AuthResult) -> false.
|
||||||
clean_anonymous_clients() ->
|
clean_anonymous_clients() ->
|
||||||
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
|
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
|
||||||
|
|
||||||
|
packet_to_message(Topic, Qos, Payload,
|
||||||
|
#channel{
|
||||||
|
conninfo = #{proto_ver := ProtoVer},
|
||||||
|
clientinfo = #{
|
||||||
|
protocol := Protocol,
|
||||||
|
clientid := ClientId,
|
||||||
|
username := Username,
|
||||||
|
peerhost := PeerHost,
|
||||||
|
mountpoint := Mountpoint}}) ->
|
||||||
|
Msg = emqx_message:make(
|
||||||
|
ClientId, Qos,
|
||||||
|
Topic, Payload, #{},
|
||||||
|
#{proto_ver => ProtoVer,
|
||||||
|
protocol => Protocol,
|
||||||
|
username => Username,
|
||||||
|
peerhost => PeerHost}),
|
||||||
|
emqx_mountpoint:mount(Mountpoint, Msg).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Sub/UnSub
|
%% Sub/UnSub
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -591,6 +606,8 @@ default_conninfo(ConnInfo) ->
|
||||||
ConnInfo#{clean_start => true,
|
ConnInfo#{clean_start => true,
|
||||||
clientid => undefined,
|
clientid => undefined,
|
||||||
username => undefined,
|
username => undefined,
|
||||||
|
proto_name => undefined,
|
||||||
|
proto_ver => undefined,
|
||||||
conn_props => #{},
|
conn_props => #{},
|
||||||
connected => true,
|
connected => true,
|
||||||
connected_at => erlang:system_time(millisecond),
|
connected_at => erlang:system_time(millisecond),
|
||||||
|
|
|
@ -235,8 +235,20 @@ unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) ->
|
||||||
emqx_broker:unsubscribe(Topic),
|
emqx_broker:unsubscribe(Topic),
|
||||||
emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]).
|
emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]).
|
||||||
|
|
||||||
publish(Topic, Payload, Qos, EndpointName) ->
|
publish(Topic, Payload, Qos,
|
||||||
emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))).
|
#lwm2m_state{
|
||||||
|
version = ProtoVer,
|
||||||
|
peername = {PeerHost, _},
|
||||||
|
endpoint_name = EndpointName}) ->
|
||||||
|
Message = emqx_message:set_flag(
|
||||||
|
retain, false,
|
||||||
|
emqx_message:make(EndpointName, Qos, Topic, Payload)
|
||||||
|
),
|
||||||
|
NMessage = emqx_message:set_headers(
|
||||||
|
#{proto_ver => ProtoVer,
|
||||||
|
protocol => lwm2m,
|
||||||
|
peerhost => PeerHost}, Message),
|
||||||
|
emqx_broker:publish(NMessage).
|
||||||
|
|
||||||
time_now() -> erlang:system_time(millisecond).
|
time_now() -> erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
@ -281,7 +293,7 @@ do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpo
|
||||||
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
|
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
|
||||||
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
|
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
|
||||||
Topic = uplink_topic(EventType, Lwm2mState),
|
Topic = uplink_topic(EventType, Lwm2mState),
|
||||||
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
|
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Auto Observe
|
%% Auto Observe
|
||||||
|
|
|
@ -588,15 +588,29 @@ next_ackid() ->
|
||||||
put(ackid, AckId + 1),
|
put(ackid, AckId + 1),
|
||||||
AckId.
|
AckId.
|
||||||
|
|
||||||
make_mqtt_message(Topic, Headers, Body) ->
|
make_mqtt_message(Topic, Headers, Body,
|
||||||
Msg = emqx_message:make(stomp, Topic, Body),
|
#pstate{
|
||||||
Headers1 = lists:foldl(fun(Key, Headers0) ->
|
conninfo = #{proto_ver := ProtoVer},
|
||||||
proplists:delete(Key, Headers0)
|
clientinfo = #{
|
||||||
end, Headers, [<<"destination">>,
|
protocol := Protocol,
|
||||||
<<"content-length">>,
|
clientid := ClientId,
|
||||||
<<"content-type">>,
|
username := Username,
|
||||||
<<"transaction">>,
|
peerhost := PeerHost}}) ->
|
||||||
<<"receipt">>]),
|
Msg = emqx_message:make(
|
||||||
|
ClientId, ?QOS_0,
|
||||||
|
Topic, Body, #{},
|
||||||
|
#{proto_ver => ProtoVer,
|
||||||
|
protocol => Protocol,
|
||||||
|
username => Username,
|
||||||
|
peerhost => PeerHost}),
|
||||||
|
Headers1 = lists:foldl(
|
||||||
|
fun(Key, Headers0) ->
|
||||||
|
proplists:delete(Key, Headers0)
|
||||||
|
end, Headers, [<<"destination">>,
|
||||||
|
<<"content-length">>,
|
||||||
|
<<"content-type">>,
|
||||||
|
<<"transaction">>,
|
||||||
|
<<"receipt">>]),
|
||||||
emqx_message:set_headers(#{stomp_headers => Headers1}, Msg).
|
emqx_message:set_headers(#{stomp_headers => Headers1}, Msg).
|
||||||
|
|
||||||
receipt_id(Headers) ->
|
receipt_id(Headers) ->
|
||||||
|
@ -611,7 +625,7 @@ handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, bod
|
||||||
allow ->
|
allow ->
|
||||||
_ = maybe_send_receipt(receipt_id(Headers), State),
|
_ = maybe_send_receipt(receipt_id(Headers), State),
|
||||||
_ = emqx_broker:publish(
|
_ = emqx_broker:publish(
|
||||||
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
make_mqtt_message(Topic, Headers, iolist_to_binary(Body), State)
|
||||||
),
|
),
|
||||||
State;
|
State;
|
||||||
deny ->
|
deny ->
|
||||||
|
|
Loading…
Reference in New Issue