Line data Source code
1 : /*
2 : *
3 : * Copyright (c) 2020-2021 Project CHIP Authors
4 : * Copyright (c) 2013-2017 Nest Labs, Inc.
5 : * All rights reserved.
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : /**
21 : * @file
22 : * This file implements the CHIP Transport object that maintains TCP connections
23 : * to peers. Handles both establishing new connections and accepting peer connection
24 : * requests.
25 : */
26 : #include <transport/raw/TCP.h>
27 :
28 : #include <lib/core/CHIPEncoding.h>
29 : #include <lib/support/CodeUtils.h>
30 : #include <lib/support/logging/CHIPLogging.h>
31 : #include <transport/raw/MessageHeader.h>
32 :
33 : #include <inttypes.h>
34 : #include <limits>
35 :
36 : namespace chip {
37 : namespace Transport {
38 : namespace {
39 :
40 : using namespace chip::Encoding;
41 :
42 : // Packets start with a 16-bit size
43 : constexpr size_t kPacketSizeBytes = 2;
44 :
45 : // TODO: Actual limit may be lower (spec issue #2119)
46 : constexpr uint16_t kMaxMessageSize = static_cast<uint16_t>(System::PacketBuffer::kMaxSizeWithoutReserve - kPacketSizeBytes);
47 :
48 : constexpr int kListenBacklogSize = 2;
49 :
50 : } // namespace
51 :
52 5 : TCPBase::~TCPBase()
53 : {
54 5 : if (mListenSocket != nullptr)
55 : {
56 : // endpoint is only non null if it is initialized and listening
57 5 : mListenSocket->Free();
58 5 : mListenSocket = nullptr;
59 : }
60 :
61 5 : CloseActiveConnections();
62 5 : }
63 :
64 5 : void TCPBase::CloseActiveConnections()
65 : {
66 25 : for (size_t i = 0; i < mActiveConnectionsSize; i++)
67 : {
68 20 : if (mActiveConnections[i].InUse())
69 : {
70 0 : mActiveConnections[i].Free();
71 0 : mUsedEndPointCount--;
72 : }
73 : }
74 5 : }
75 :
76 5 : CHIP_ERROR TCPBase::Init(TcpListenParameters & params)
77 : {
78 5 : CHIP_ERROR err = CHIP_NO_ERROR;
79 :
80 5 : VerifyOrExit(mState == State::kNotReady, err = CHIP_ERROR_INCORRECT_STATE);
81 :
82 : #if INET_CONFIG_ENABLE_TCP_ENDPOINT
83 5 : err = params.GetEndPointManager()->NewEndPoint(&mListenSocket);
84 : #else
85 : err = CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE;
86 : #endif
87 5 : SuccessOrExit(err);
88 :
89 5 : err = mListenSocket->Bind(params.GetAddressType(), Inet::IPAddress::Any, params.GetListenPort(),
90 5 : params.GetInterfaceId().IsPresent());
91 5 : SuccessOrExit(err);
92 :
93 5 : err = mListenSocket->Listen(kListenBacklogSize);
94 5 : SuccessOrExit(err);
95 :
96 5 : mListenSocket->mAppState = reinterpret_cast<void *>(this);
97 5 : mListenSocket->OnDataReceived = OnTcpReceive;
98 5 : mListenSocket->OnConnectComplete = OnConnectionComplete;
99 5 : mListenSocket->OnConnectionClosed = OnConnectionClosed;
100 5 : mListenSocket->OnConnectionReceived = OnConnectionReceived;
101 5 : mListenSocket->OnAcceptError = OnAcceptError;
102 5 : mEndpointType = params.GetAddressType();
103 :
104 5 : mState = State::kInitialized;
105 :
106 5 : exit:
107 5 : if (err != CHIP_NO_ERROR)
108 : {
109 0 : ChipLogError(Inet, "Failed to initialize TCP transport: %s", ErrorStr(err));
110 0 : if (mListenSocket)
111 : {
112 0 : mListenSocket->Free();
113 0 : mListenSocket = nullptr;
114 : }
115 : }
116 :
117 5 : return err;
118 : }
119 :
120 0 : void TCPBase::Close()
121 : {
122 0 : if (mListenSocket)
123 : {
124 0 : mListenSocket->Free();
125 0 : mListenSocket = nullptr;
126 : }
127 0 : mState = State::kNotReady;
128 0 : }
129 :
130 4 : TCPBase::ActiveConnectionState * TCPBase::FindActiveConnection(const PeerAddress & address)
131 : {
132 4 : if (address.GetTransportType() != Type::kTcp)
133 : {
134 0 : return nullptr;
135 : }
136 :
137 17 : for (size_t i = 0; i < mActiveConnectionsSize; i++)
138 : {
139 14 : if (!mActiveConnections[i].InUse())
140 : {
141 12 : continue;
142 : }
143 : Inet::IPAddress addr;
144 : uint16_t port;
145 2 : mActiveConnections[i].mEndPoint->GetPeerInfo(&addr, &port);
146 :
147 2 : if ((addr == address.GetIPAddress()) && (port == address.GetPort()))
148 : {
149 1 : return &mActiveConnections[i];
150 : }
151 : }
152 :
153 3 : return nullptr;
154 : }
155 :
156 8 : TCPBase::ActiveConnectionState * TCPBase::FindActiveConnection(const Inet::TCPEndPoint * endPoint)
157 : {
158 13 : for (size_t i = 0; i < mActiveConnectionsSize; i++)
159 : {
160 13 : if (mActiveConnections[i].mEndPoint == endPoint)
161 : {
162 8 : return &mActiveConnections[i];
163 : }
164 : }
165 0 : return nullptr;
166 : }
167 :
168 3 : CHIP_ERROR TCPBase::SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf)
169 : {
170 : // Sent buffer data format is:
171 : // - packet size as a uint16_t
172 : // - actual data
173 :
174 3 : VerifyOrReturnError(address.GetTransportType() == Type::kTcp, CHIP_ERROR_INVALID_ARGUMENT);
175 3 : VerifyOrReturnError(mState == State::kInitialized, CHIP_ERROR_INCORRECT_STATE);
176 3 : VerifyOrReturnError(kPacketSizeBytes + msgBuf->DataLength() <= std::numeric_limits<uint16_t>::max(),
177 : CHIP_ERROR_INVALID_ARGUMENT);
178 :
179 : // The check above about kPacketSizeBytes + msgBuf->DataLength() means it definitely fits in uint16_t.
180 3 : VerifyOrReturnError(msgBuf->EnsureReservedSize(static_cast<uint16_t>(kPacketSizeBytes)), CHIP_ERROR_NO_MEMORY);
181 :
182 3 : msgBuf->SetStart(msgBuf->Start() - kPacketSizeBytes);
183 :
184 3 : uint8_t * output = msgBuf->Start();
185 3 : LittleEndian::Write16(output, static_cast<uint16_t>(msgBuf->DataLength() - kPacketSizeBytes));
186 :
187 : // Reuse existing connection if one exists, otherwise a new one
188 : // will be established
189 3 : ActiveConnectionState * connection = FindActiveConnection(address);
190 :
191 3 : if (connection != nullptr)
192 : {
193 0 : return connection->mEndPoint->Send(std::move(msgBuf));
194 : }
195 :
196 3 : return SendAfterConnect(address, std::move(msgBuf));
197 : }
198 :
199 3 : CHIP_ERROR TCPBase::SendAfterConnect(const PeerAddress & addr, System::PacketBufferHandle && msg)
200 : {
201 : // This will initiate a connection to the specified peer
202 3 : bool alreadyConnecting = false;
203 :
204 : // Iterate through the ENTIRE array. If a pending packet for
205 : // the address already exists, this means a connection is pending and
206 : // does NOT need to be re-established.
207 3 : mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) {
208 0 : if (pending->mPeerAddress == addr)
209 : {
210 : // same destination exists.
211 0 : alreadyConnecting = true;
212 0 : pending->mPacketBuffer->AddToEnd(std::move(msg));
213 0 : return Loop::Break;
214 : }
215 0 : return Loop::Continue;
216 : });
217 :
218 : // If already connecting, buffer was just enqueued for more sending
219 3 : if (alreadyConnecting)
220 : {
221 0 : return CHIP_NO_ERROR;
222 : }
223 :
224 : // Ensures sufficient active connections size exist
225 3 : VerifyOrReturnError(mUsedEndPointCount < mActiveConnectionsSize, CHIP_ERROR_NO_MEMORY);
226 :
227 : #if INET_CONFIG_ENABLE_TCP_ENDPOINT
228 3 : Inet::TCPEndPoint * endPoint = nullptr;
229 3 : ReturnErrorOnFailure(mListenSocket->GetEndPointManager().NewEndPoint(&endPoint));
230 0 : auto EndPointDeletor = [](Inet::TCPEndPoint * e) { e->Free(); };
231 3 : std::unique_ptr<Inet::TCPEndPoint, decltype(EndPointDeletor)> endPointHolder(endPoint, EndPointDeletor);
232 :
233 3 : endPoint->mAppState = reinterpret_cast<void *>(this);
234 3 : endPoint->OnDataReceived = OnTcpReceive;
235 3 : endPoint->OnConnectComplete = OnConnectionComplete;
236 3 : endPoint->OnConnectionClosed = OnConnectionClosed;
237 3 : endPoint->OnConnectionReceived = OnConnectionReceived;
238 3 : endPoint->OnAcceptError = OnAcceptError;
239 3 : endPoint->OnPeerClose = OnPeerClosed;
240 :
241 3 : ReturnErrorOnFailure(endPoint->Connect(addr.GetIPAddress(), addr.GetPort(), addr.GetInterface()));
242 :
243 : // enqueue the packet once the connection succeeds
244 3 : VerifyOrReturnError(mPendingPackets.CreateObject(addr, std::move(msg)) != nullptr, CHIP_ERROR_NO_MEMORY);
245 3 : mUsedEndPointCount++;
246 :
247 3 : endPointHolder.release();
248 :
249 3 : return CHIP_NO_ERROR;
250 : #else
251 : return CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE;
252 : #endif
253 3 : }
254 :
255 8 : CHIP_ERROR TCPBase::ProcessReceivedBuffer(Inet::TCPEndPoint * endPoint, const PeerAddress & peerAddress,
256 : System::PacketBufferHandle && buffer)
257 : {
258 8 : ActiveConnectionState * state = FindActiveConnection(endPoint);
259 8 : VerifyOrReturnError(state != nullptr, CHIP_ERROR_INTERNAL);
260 8 : state->mReceived.AddToEnd(std::move(buffer));
261 :
262 17 : while (!state->mReceived.IsNull())
263 : {
264 : uint8_t messageSizeBuf[kPacketSizeBytes];
265 10 : CHIP_ERROR err = state->mReceived->Read(messageSizeBuf);
266 10 : if (err == CHIP_ERROR_BUFFER_TOO_SMALL)
267 : {
268 : // We don't have enough data to read the message size. Wait until there's more.
269 1 : return CHIP_NO_ERROR;
270 : }
271 10 : if (err != CHIP_NO_ERROR)
272 : {
273 0 : return err;
274 : }
275 10 : uint16_t messageSize = LittleEndian::Get16(messageSizeBuf);
276 10 : if (messageSize >= kMaxMessageSize)
277 : {
278 : // This message is too long for upper layers.
279 1 : return CHIP_ERROR_MESSAGE_TOO_LONG;
280 : }
281 : // The subtraction will not underflow because we successfully read kPacketSizeBytes.
282 9 : if (messageSize > (state->mReceived->TotalLength() - kPacketSizeBytes))
283 : {
284 : // We have not yet received the complete message.
285 0 : return CHIP_NO_ERROR;
286 : }
287 9 : state->mReceived.Consume(kPacketSizeBytes);
288 9 : ReturnErrorOnFailure(ProcessSingleMessage(peerAddress, state, messageSize));
289 : }
290 :
291 7 : return CHIP_NO_ERROR;
292 : }
293 :
294 9 : CHIP_ERROR TCPBase::ProcessSingleMessage(const PeerAddress & peerAddress, ActiveConnectionState * state, uint16_t messageSize)
295 : {
296 : // We enter with `state->mReceived` containing at least one full message, perhaps in a chain.
297 : // `state->mReceived->Start()` currently points to the message data.
298 : // On exit, `state->mReceived` will have had `messageSize` bytes consumed, no matter what.
299 9 : System::PacketBufferHandle message;
300 9 : if (state->mReceived->DataLength() == messageSize)
301 : {
302 : // In this case, the head packet buffer contains exactly the message.
303 : // This is common because typical messages fit in a network packet, and are delivered as such.
304 : // Peel off the head to pass upstream, which effectively consumes it from `state->mReceived`.
305 6 : message = state->mReceived.PopHead();
306 : }
307 : else
308 : {
309 : // The message is either longer or shorter than the head buffer.
310 : // In either case, copy the message to a fresh linear buffer to pass upstream. We always copy, rather than provide
311 : // a shared reference to the current buffer, in case upper layers manipulate the buffer in ways that would affect
312 : // our use, e.g. chaining it elsewhere or reusing space beyond the current message.
313 3 : message = System::PacketBufferHandle::New(messageSize, 0);
314 3 : if (message.IsNull())
315 : {
316 0 : return CHIP_ERROR_NO_MEMORY;
317 : }
318 3 : CHIP_ERROR err = state->mReceived->Read(message->Start(), messageSize);
319 3 : state->mReceived.Consume(messageSize);
320 3 : ReturnErrorOnFailure(err);
321 3 : message->SetDataLength(messageSize);
322 : }
323 :
324 9 : HandleMessageReceived(peerAddress, std::move(message));
325 9 : return CHIP_NO_ERROR;
326 9 : }
327 :
328 3 : void TCPBase::ReleaseActiveConnection(Inet::TCPEndPoint * endPoint)
329 : {
330 15 : for (size_t i = 0; i < mActiveConnectionsSize; i++)
331 : {
332 12 : if (mActiveConnections[i].mEndPoint == endPoint)
333 : {
334 3 : mActiveConnections[i].Free();
335 3 : mUsedEndPointCount--;
336 : }
337 : }
338 3 : }
339 :
340 3 : CHIP_ERROR TCPBase::OnTcpReceive(Inet::TCPEndPoint * endPoint, System::PacketBufferHandle && buffer)
341 : {
342 : Inet::IPAddress ipAddress;
343 : uint16_t port;
344 3 : Inet::InterfaceId interfaceId;
345 :
346 3 : endPoint->GetPeerInfo(&ipAddress, &port);
347 3 : endPoint->GetInterfaceId(&interfaceId);
348 3 : PeerAddress peerAddress = PeerAddress::TCP(ipAddress, port, interfaceId);
349 :
350 3 : TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->mAppState);
351 3 : CHIP_ERROR err = tcp->ProcessReceivedBuffer(endPoint, peerAddress, std::move(buffer));
352 :
353 3 : if (err != CHIP_NO_ERROR)
354 : {
355 : // Connection could need to be closed at this point
356 0 : ChipLogError(Inet, "Failed to accept received TCP message: %s", ErrorStr(err));
357 0 : return CHIP_ERROR_UNEXPECTED_EVENT;
358 : }
359 3 : return CHIP_NO_ERROR;
360 : }
361 :
362 3 : void TCPBase::OnConnectionComplete(Inet::TCPEndPoint * endPoint, CHIP_ERROR inetErr)
363 : {
364 3 : CHIP_ERROR err = CHIP_NO_ERROR;
365 3 : bool foundPendingPacket = false;
366 3 : TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->mAppState);
367 : Inet::IPAddress ipAddress;
368 : uint16_t port;
369 3 : Inet::InterfaceId interfaceId;
370 :
371 3 : endPoint->GetPeerInfo(&ipAddress, &port);
372 3 : endPoint->GetInterfaceId(&interfaceId);
373 3 : PeerAddress addr = PeerAddress::TCP(ipAddress, port, interfaceId);
374 :
375 : // Send any pending packets
376 3 : tcp->mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) {
377 3 : if (pending->mPeerAddress == addr)
378 : {
379 3 : foundPendingPacket = true;
380 3 : System::PacketBufferHandle buffer = std::move(pending->mPacketBuffer);
381 3 : tcp->mPendingPackets.ReleaseObject(pending);
382 :
383 3 : if ((inetErr == CHIP_NO_ERROR) && (err == CHIP_NO_ERROR))
384 : {
385 3 : err = endPoint->Send(std::move(buffer));
386 : }
387 3 : }
388 3 : return Loop::Continue;
389 : });
390 :
391 3 : if (err == CHIP_NO_ERROR)
392 : {
393 3 : err = inetErr;
394 : }
395 :
396 3 : if (!foundPendingPacket && (err == CHIP_NO_ERROR))
397 : {
398 : // Force a close: new connections are only expected when a
399 : // new buffer is being sent.
400 0 : ChipLogError(Inet, "Connection accepted without pending buffers");
401 0 : err = CHIP_ERROR_CONNECTION_CLOSED_UNEXPECTEDLY;
402 : }
403 :
404 : // cleanup packets or mark as free
405 3 : if (err != CHIP_NO_ERROR)
406 : {
407 0 : ChipLogError(Inet, "Connection complete encountered an error: %s", ErrorStr(err));
408 0 : endPoint->Free();
409 0 : tcp->mUsedEndPointCount--;
410 : }
411 : else
412 : {
413 3 : bool connectionStored = false;
414 6 : for (size_t i = 0; i < tcp->mActiveConnectionsSize; i++)
415 : {
416 6 : if (!tcp->mActiveConnections[i].InUse())
417 : {
418 3 : tcp->mActiveConnections[i].Init(endPoint);
419 3 : connectionStored = true;
420 3 : break;
421 : }
422 : }
423 :
424 : // since we track end points counts, we always expect to store the
425 : // connection.
426 3 : if (!connectionStored)
427 : {
428 0 : endPoint->Free();
429 0 : ChipLogError(Inet, "Internal logic error: insufficient space to store active connection");
430 : }
431 : }
432 3 : }
433 :
434 0 : void TCPBase::OnConnectionClosed(Inet::TCPEndPoint * endPoint, CHIP_ERROR err)
435 : {
436 0 : TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->mAppState);
437 :
438 0 : ChipLogProgress(Inet, "Connection closed.");
439 :
440 0 : ChipLogProgress(Inet, "Freeing closed connection.");
441 0 : tcp->ReleaseActiveConnection(endPoint);
442 0 : }
443 :
444 3 : void TCPBase::OnConnectionReceived(Inet::TCPEndPoint * listenEndPoint, Inet::TCPEndPoint * endPoint,
445 : const Inet::IPAddress & peerAddress, uint16_t peerPort)
446 : {
447 3 : TCPBase * tcp = reinterpret_cast<TCPBase *>(listenEndPoint->mAppState);
448 :
449 3 : if (tcp->mUsedEndPointCount < tcp->mActiveConnectionsSize)
450 : {
451 : // have space to use one more (even if considering pending connections)
452 3 : for (size_t i = 0; i < tcp->mActiveConnectionsSize; i++)
453 : {
454 3 : if (!tcp->mActiveConnections[i].InUse())
455 : {
456 3 : tcp->mActiveConnections[i].Init(endPoint);
457 3 : tcp->mUsedEndPointCount++;
458 3 : break;
459 : }
460 : }
461 :
462 3 : endPoint->mAppState = listenEndPoint->mAppState;
463 3 : endPoint->OnDataReceived = OnTcpReceive;
464 3 : endPoint->OnConnectComplete = OnConnectionComplete;
465 3 : endPoint->OnConnectionClosed = OnConnectionClosed;
466 3 : endPoint->OnConnectionReceived = OnConnectionReceived;
467 3 : endPoint->OnAcceptError = OnAcceptError;
468 3 : endPoint->OnPeerClose = OnPeerClosed;
469 : }
470 : else
471 : {
472 0 : ChipLogError(Inet, "Insufficient connection space to accept new connections");
473 0 : endPoint->Free();
474 : }
475 3 : }
476 :
477 0 : void TCPBase::OnAcceptError(Inet::TCPEndPoint * endPoint, CHIP_ERROR err)
478 : {
479 0 : ChipLogError(Inet, "Accept error: %s", ErrorStr(err));
480 0 : }
481 :
482 3 : void TCPBase::Disconnect(const PeerAddress & address)
483 : {
484 : // Closes an existing connection
485 15 : for (size_t i = 0; i < mActiveConnectionsSize; i++)
486 : {
487 12 : if (mActiveConnections[i].InUse())
488 : {
489 : Inet::IPAddress ipAddress;
490 : uint16_t port;
491 6 : Inet::InterfaceId interfaceId;
492 :
493 6 : mActiveConnections[i].mEndPoint->GetPeerInfo(&ipAddress, &port);
494 6 : mActiveConnections[i].mEndPoint->GetInterfaceId(&interfaceId);
495 6 : if (address == PeerAddress::TCP(ipAddress, port, interfaceId))
496 : {
497 : // NOTE: this leaves the socket in TIME_WAIT.
498 : // Calling Abort() would clean it since SO_LINGER would be set to 0,
499 : // however this seems not to be useful.
500 3 : mActiveConnections[i].Free();
501 3 : mUsedEndPointCount--;
502 : }
503 : }
504 : }
505 3 : }
506 :
507 3 : void TCPBase::OnPeerClosed(Inet::TCPEndPoint * endPoint)
508 : {
509 3 : TCPBase * tcp = reinterpret_cast<TCPBase *>(endPoint->mAppState);
510 :
511 3 : ChipLogProgress(Inet, "Freeing connection: connection closed by peer");
512 :
513 3 : tcp->ReleaseActiveConnection(endPoint);
514 3 : }
515 :
516 3 : bool TCPBase::HasActiveConnections() const
517 : {
518 15 : for (size_t i = 0; i < mActiveConnectionsSize; i++)
519 : {
520 12 : if (mActiveConnections[i].InUse())
521 : {
522 0 : return true;
523 : }
524 : }
525 :
526 3 : return false;
527 : }
528 :
529 : } // namespace Transport
530 : } // namespace chip
|