Albirew/nyaa-pantsu
nyaa-pantsu/vendor/github.com/anacrolix/go-libutp/utp_internal.cpp
akuma06 b2b48f61b0 Torrent Generation on not found error (#1600)
* [WIP] Torrent Generation on not found error
As requested in #1517, this allows on-the-fly torrent generation. Since it uses magnet links, it needs some time to connect to peers, so generation can't be instant; the user has to wait and retry after a minute at least.

* Replace Fatal by simple error

* attempt at fixing travis

* del

* Add Anacrolix dependency

* Add back difflib

* Remove .torrent suffix in the url example

* Add some explanations when file missing page shown

* Ignore downloads directory

* Either use cache (third-party site) or own download directory

* Wrong import

* If there is an error, it means we aren't generating a torrent file

Whether it is "torrent not found" or "We do not store torrent files", which are the only two existing errors for this page

* hash is never empty

* TorrentLink may be empty at times

So we add a /download/:hash link if it is

* Update README.md

* Made a mistake here, need to check if false

* Update en-us.all.json

* Update CHANGELOG.md

* Torrent file generation can be triggered by click on button if JS enabled

* Update download.go

* Update download.go

* Use c.JSON instead of text/template

* Return to default behavior if we don't generate the file

* Don't do the query if returned to default behavior

* Add "Could not generate torrent file" error

* Fix JS condition & lower delay until button updates

* Start download automatically once torrent file is generated

* Fix torrentFileExists() constantly returning false if external torrent download URL

* torrent-view-data is two tables instead of one

This allows the removal of useless elements (e.g. the website link) without any problem, and also gives a better responsive design, since the previous layout split the stats below a certain resolution and looked very wonky

* CSS changes to go along

* Remove useless <b></b>

* Update main.css

* In torrentFileExists, check if filestorage path exists instead of looking at the domain in torrent link

When checking whether the file is stored on another server, I used to simply check if the domain name appeared in the torrent link, but we can check the filestorage length directly

* Fix JS of on-demand stat fetching

* ScrapeAge variable accessible through view.jet.html

Contains the time since the last scrape in hours; it is -1 if the torrent has never been scraped
Stats will get updated if it is either -1 or above 1460 (two months old)

* Refresh stats if older than two months OR unknown and older than 24h

Show last scraped date even if stats are unknown

* Add StatsObsolete variable to torrent

Indicating if:
- They can be shown
- They need to be updated

* Update scraped data even if Unknown, prevent users from trying to fetch stats every second

* Torrent file stored locally by default

* no need to do all of that if no filestorage

* fix filestorage path

* Fix torrent download button stuck on "Generating torrent file" at rare times

* fix some css rules that didn't work on IE

* Fix panic error

Seems like this error is a known bug in anacrolix/torrent: https://github.com/anacrolix/torrent/issues/83

To prevent it, I'm creating a single client and modifying socket.go so that it doesn't panic but just logs an error.
2017-10-21 09:40:43 +02:00

3443 lines · 105 KiB · C++

/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <limits.h> // for UINT_MAX
#include <time.h>
#include "utp_types.h"
#include "utp_packedsockaddr.h"
#include "utp_internal.h"
#include "utp_hash.h"
#define TIMEOUT_CHECK_INTERVAL 500
// number of bytes to increase max window size by, per RTT. This is
// scaled down linearly proportional to off_target. i.e. if all packets
// in one window have 0 delay, window size will increase by this number.
// Typically it's less. TCP increases one MSS per RTT, which is 1500
#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
#define CUR_DELAY_SIZE 3
// experiments suggest that a clock skew of 10 ms per 325 seconds
// is not impossible. Reset delay_base every 13 minutes. The clock
// skew is dealt with by observing the delay base in the other
// direction, and adjusting our own upwards if the opposite direction
// delay base keeps going down
#define DELAY_BASE_HISTORY 13
#define MAX_WINDOW_DECAY 100 // ms
#define REORDER_BUFFER_SIZE 32
#define REORDER_BUFFER_MAX_SIZE 1024
#define OUTGOING_BUFFER_MAX_SIZE 1024
#define PACKET_SIZE 1435
// this is the minimum max_window value. It can never drop below this
#define MIN_WINDOW_SIZE 10
// if we receive 4 or more duplicate acks, we resend the packet
// that hasn't been acked yet
#define DUPLICATE_ACKS_BEFORE_RESEND 3
// Allow a reception window of at least 3 ack_nrs behind seq_nr
// A non-SYN packet with an ack_nr difference greater than this is
// considered suspicious and ignored
#define ACK_NR_ALLOWED_WINDOW DUPLICATE_ACKS_BEFORE_RESEND
#define RST_INFO_TIMEOUT 10000
#define RST_INFO_LIMIT 1000
// 29 seconds determined from measuring many home NAT devices
#define KEEPALIVE_INTERVAL 29000
#define SEQ_NR_MASK 0xFFFF
#define ACK_NR_MASK 0xFFFF
#define TIMESTAMP_MASK 0xFFFFFFFF
#define DIV_ROUND_UP(num, denom) ((num + denom - 1) / denom)
// The totals are derived from the following data:
// 45: IPv6 address including embedded IPv4 address
// 11: Scope Id
// 2: Brackets around IPv6 address when port is present
// 6: Port (including colon)
// 1: Terminating null byte
char addrbuf[65];
#define addrfmt(x, s) x.fmt(s, sizeof(s))
#if (defined(__SVR4) && defined(__sun))
#pragma pack(1)
#else
#pragma pack(push,1)
#endif
// these packet sizes include the uTP header, which
// is either 20 or 23 bytes depending on version
#define PACKET_SIZE_EMPTY_BUCKET 0
#define PACKET_SIZE_EMPTY 23
#define PACKET_SIZE_SMALL_BUCKET 1
#define PACKET_SIZE_SMALL 373
#define PACKET_SIZE_MID_BUCKET 2
#define PACKET_SIZE_MID 723
#define PACKET_SIZE_BIG_BUCKET 3
#define PACKET_SIZE_BIG 1400
#define PACKET_SIZE_HUGE_BUCKET 4
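// Illustrative example of the bucket boundaries above: a 400-byte packet,
// uTP header included, satisfies 373 < 400 <= 723, so the accounting
// functions utp_register_sent_packet()/utp_register_recv_packet() further
// down count it in PACKET_SIZE_MID_BUCKET.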
struct PACKED_ATTRIBUTE PacketFormatV1 {
// packet_type (4 high bits)
// protocol version (4 low bits)
byte ver_type;
byte version() const { return ver_type & 0xf; }
byte type() const { return ver_type >> 4; }
void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }
// Type of the first extension header
byte ext;
// connection ID
uint16_big connid;
uint32_big tv_usec;
uint32_big reply_micro;
// receive window size in bytes
uint32_big windowsize;
// Sequence number
uint16_big seq_nr;
// Acknowledgment number
uint16_big ack_nr;
};
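// Worked example of the ver_type encoding above: ver_type == 0x21 has
// low nibble 1 (protocol version 1) and high nibble 2 (ST_STATE), i.e. a
// bare ACK packet; calling set_type(ST_DATA) on it would yield 0x01.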
struct PACKED_ATTRIBUTE PacketFormatAckV1 {
PacketFormatV1 pf;
byte ext_next;
byte ext_len;
byte acks[4];
};
#if (defined(__SVR4) && defined(__sun))
#pragma pack(0)
#else
#pragma pack(pop)
#endif
enum {
ST_DATA = 0, // Data packet.
ST_FIN = 1, // Finalize the connection. This is the last packet.
ST_STATE = 2, // State packet. Used to transmit an ACK with no data.
ST_RESET = 3, // Terminate connection forcefully.
ST_SYN = 4, // Connect SYN
ST_NUM_STATES, // used for bounds checking
};
static const cstr flagnames[] = {
"ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
};
enum CONN_STATE {
CS_UNINITIALIZED = 0,
CS_IDLE,
CS_SYN_SENT,
CS_SYN_RECV,
CS_CONNECTED,
CS_CONNECTED_FULL,
CS_GOT_FIN,
CS_DESTROY_DELAY,
CS_FIN_SENT,
CS_RESET,
CS_DESTROY
};
static const cstr statenames[] = {
"UNINITIALIZED", "IDLE","SYN_SENT", "SYN_RECV", "CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
};
struct OutgoingPacket {
size_t length;
size_t payload;
uint64 time_sent; // microseconds
uint transmissions:31;
bool need_resend:1;
byte data[1];
};
struct SizableCircularBuffer {
// This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
size_t mask;
// This is the elements that the circular buffer points to
void **elements;
void *get(size_t i) const { assert(elements); return elements ? elements[i & mask] : NULL; }
void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }
void grow(size_t item, size_t index);
void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
size_t size() { return mask + 1; }
};
// Item contains the element we want to make space for
// index is the index in the list.
void SizableCircularBuffer::grow(size_t item, size_t index)
{
// Figure out the new size.
size_t size = mask + 1;
do size *= 2; while (index >= size);
// Allocate the new buffer
void **buf = (void**)calloc(size, sizeof(void*));
size--;
// Copy elements from the old buffer to the new buffer
for (size_t i = 0; i <= mask; i++) {
buf[(item - index + i) & size] = get(item - index + i);
}
// Swap to the newly allocated buffer
mask = size;
free(elements);
elements = buf;
}
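// Illustrative use of the buffer above: with mask = 7 (8 slots),
// put(9, p) stores p at elements[9 & 7] == elements[1] and get(9) returns
// it; ensure_size(9, 9) would first call grow() to double the buffer to
// 16 slots, re-masking existing entries so lookups by the same indices
// still succeed.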
// compare if lhs is less than rhs, taking wrapping
// into account. if lhs is close to UINT_MAX and rhs
// is close to 0, lhs is assumed to have wrapped and
// considered smaller
bool wrapping_compare_less(uint32 lhs, uint32 rhs, uint32 mask)
{
// distance walking from lhs to rhs, downwards
const uint32 dist_down = (lhs - rhs) & mask;
// distance walking from lhs to rhs, upwards
const uint32 dist_up = (rhs - lhs) & mask;
// if the distance walking up is shorter, lhs
// is less than rhs. If the distance walking down
// is shorter, then rhs is less than lhs
return dist_up < dist_down;
}
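// Worked example with mask == 0xFFFF: lhs = 0xFFFE, rhs = 0x0001 gives
// dist_down = (0xFFFE - 0x0001) & 0xFFFF = 0xFFFD and
// dist_up   = (0x0001 - 0xFFFE) & 0xFFFF = 0x0003, so dist_up < dist_down
// and lhs is treated as the smaller (wrapped) value.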
struct DelayHist {
uint32 delay_base;
// this is the history of delay samples,
// normalized by using the delay_base. These
// values are always greater than 0 and measures
// the queuing delay in microseconds
uint32 cur_delay_hist[CUR_DELAY_SIZE];
size_t cur_delay_idx;
// this is the history of delay_base. It's
// a number that doesn't have an absolute meaning
// only relative. It doesn't make sense to initialize
// it to anything other than values relative to
// what's been seen in the real world.
uint32 delay_base_hist[DELAY_BASE_HISTORY];
size_t delay_base_idx;
// the time when we last stepped the delay_base_idx
uint64 delay_base_time;
bool delay_base_initialized;
void clear(uint64 current_ms)
{
delay_base_initialized = false;
delay_base = 0;
cur_delay_idx = 0;
delay_base_idx = 0;
delay_base_time = current_ms;
for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
cur_delay_hist[i] = 0;
}
for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
delay_base_hist[i] = 0;
}
}
void shift(const uint32 offset)
{
// the offset should never be "negative"
// assert(offset < 0x10000000);
// increase all of our base delays by this amount
// this is used to take clock skew into account
// by observing the other side's changes in its base_delay
for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
delay_base_hist[i] += offset;
}
delay_base += offset;
}
void add_sample(const uint32 sample, uint64 current_ms)
{
// The two clocks (in the two peers) are assumed not to
// progress at the exact same rate. They are assumed to be
// drifting, which causes the delay samples to contain
// a systematic error, either they are under-
// estimated or over-estimated. This is why we step the
// delay_base history once a minute (a 13-minute window), to adjust for this.
// This means the values will keep drifting and eventually wrap.
// We can cross the wrapping boundary in two directions, either
// going up, crossing the highest value, or going down, crossing 0.
// if the delay_base is close to the max value and sample actually
// wrapped on the other end we would see something like this:
// delay_base = 0xffffff00, sample = 0x00000400
// sample - delay_base = 0x500 which is the correct difference
// if the delay_base is instead close to 0, and we got an even lower
// sample (that will eventually update the delay_base), we may see
// something like this:
// delay_base = 0x00000400, sample = 0xffffff00
// sample - delay_base = 0xfffffb00
// this needs to be interpreted as a negative number and the actual
// recorded delay should be 0.
// It is important that all arithmetic that assumes wrapping
// is done with unsigned integers. Signed integers are not guaranteed
// to wrap the way unsigned integers do. At least GCC takes advantage
// of this relaxed rule and won't necessarily wrap signed ints.
// remove the clock offset and propagation delay.
// delay base is min of the sample and the current
// delay base. This min-operation is subject to wrapping
// and care needs to be taken to correctly choose the
// true minimum.
// specifically the problem case is when delay_base is very small
// and sample is very large (because it wrapped past zero), sample
// needs to be considered the smaller
if (!delay_base_initialized) {
// delay_base being 0 suggests that we haven't initialized
// it or its history with any real measurements yet. Initialize
// everything with this sample.
for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
// if we don't have a value, set it to the current sample
delay_base_hist[i] = sample;
continue;
}
delay_base = sample;
delay_base_initialized = true;
}
if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx], TIMESTAMP_MASK)) {
// sample is smaller than the current delay_base_hist entry
// update it
delay_base_hist[delay_base_idx] = sample;
}
// is sample lower than delay_base? If so, update delay_base
if (wrapping_compare_less(sample, delay_base, TIMESTAMP_MASK)) {
// sample is smaller than the current delay_base
// update it
delay_base = sample;
}
// this operation may wrap, and is supposed to
const uint32 delay = sample - delay_base;
// sanity check. If this is triggered, something fishy is going on
// it means the measured sample was greater than 32 seconds!
//assert(delay < 0x2000000);
cur_delay_hist[cur_delay_idx] = delay;
cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE;
// once every minute
if (current_ms - delay_base_time > 60 * 1000) {
delay_base_time = current_ms;
delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY;
// clear up the new delay base history spot by initializing
// it to the current sample, then update it
delay_base_hist[delay_base_idx] = sample;
delay_base = delay_base_hist[0];
// Assign the lowest delay in the 13-minute history to delay_base
for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
if (wrapping_compare_less(delay_base_hist[i], delay_base, TIMESTAMP_MASK))
delay_base = delay_base_hist[i];
}
}
}
uint32 get_value()
{
uint32 value = UINT_MAX;
for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
value = min<uint32>(cur_delay_hist[i], value);
}
// value could be UINT_MAX if we have no samples yet...
return value;
}
};
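// How DelayHist is used (a sketch, inferred from the call sites below):
// every received packet feeds its one-way timestamp difference to
// add_sample(); get_value() then yields the lowest queuing delay over the
// last CUR_DELAY_SIZE samples, which apply_ccontrol() compares against
// the ~100 ms target_delay.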
struct UTPSocket {
~UTPSocket();
PackedSockAddr addr;
utp_context *ctx;
int ida; //for ack socket list
uint16 retransmit_count;
uint16 reorder_count;
byte duplicate_ack;
// the number of packets in the send queue. Packets that haven't
// yet been sent count as well as packets marked as needing resend
// the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
uint16 cur_window_packets;
// how much of the window is used, number of bytes in-flight
// packets that have not yet been sent do not count, packets
// that are marked as needing to be re-sent (due to a timeout)
// don't count either
size_t cur_window;
// maximum window size, in bytes
size_t max_window;
// UTP_SNDBUF setting, in bytes
size_t opt_sndbuf;
// UTP_RCVBUF setting, in bytes
size_t opt_rcvbuf;
// this is the target delay, in microseconds
// for this socket. defaults to 100000.
size_t target_delay;
// Is a FIN packet in the reassembly buffer?
bool got_fin:1;
// Timeout procedure
bool fast_timeout:1;
// max receive window for other end, in bytes
size_t max_window_user;
CONN_STATE state;
// TickCount when we last decayed window (wraps)
int64 last_rwin_decay;
// the sequence number of the FIN packet. This field is only set
// when we have received a FIN, and the flag field has the FIN flag set.
// it is used to know when it is safe to destroy the socket, we must have
// received all packets up to this sequence number first.
uint16 eof_pkt;
// All sequence numbers up to and including this have been properly received
// by us
uint16 ack_nr;
// This is the sequence number for the next packet to be sent.
uint16 seq_nr;
uint16 timeout_seq_nr;
// This is the sequence number of the next packet we're allowed to
// do a fast resend with. This makes sure we only do a fast-resend
// once per packet. We can resend the packet with this sequence number
// or any later packet (with a higher sequence number).
uint16 fast_resend_seq_nr;
uint32 reply_micro;
uint64 last_got_packet;
uint64 last_sent_packet;
uint64 last_measured_delay;
// timestamp of the last time the cwnd was full
// this is used to prevent the congestion window
// from growing when we're not sending at capacity
mutable uint64 last_maxed_out_window;
void *userdata;
// Round trip time
uint rtt;
// Round trip time variance
uint rtt_var;
// Round trip timeout
uint rto;
DelayHist rtt_hist;
uint retransmit_timeout;
// The RTO timer will timeout here.
uint64 rto_timeout;
// When the window size is set to zero, start this timer. It will send a new packet every 30 seconds.
uint64 zerowindow_time;
uint32 conn_seed;
// Connection ID for packets I receive
uint32 conn_id_recv;
// Connection ID for packets I send
uint32 conn_id_send;
// Last rcv window we advertised, in bytes
size_t last_rcv_win;
DelayHist our_hist;
DelayHist their_hist;
// extension bytes from SYN packet
byte extensions[8];
// MTU Discovery
// time when we should restart the MTU discovery
uint64 mtu_discover_time;
// ceiling and floor of binary search. last is the mtu size
// we're currently using
uint32 mtu_ceiling, mtu_floor, mtu_last;
// we only ever have a single probe in flight at any given time.
// this is the sequence number of that probe, and the size of
// that packet
uint32 mtu_probe_seq, mtu_probe_size;
// this is the average of the delay samples, as compared to the initial
// sample. It's averaged over 5 seconds
int32 average_delay;
// this is the sum of all the delay samples
// we've made recently. The important distinction
// of these samples is that they are all made compared
// to the initial sample, this is to deal with
// wrapping in a simple way.
int64 current_delay_sum;
// number of samples in current_delay_sum
int current_delay_samples;
// initialized to 0, set to the first raw delay sample
// each sample that's added to current_delay_sum
// is subtracted from the value first, to make it
// a delay relative to this sample
uint32 average_delay_base;
// the next time we should add an average delay
// sample into average_delay_hist
uint64 average_sample_time;
// the estimated clock drift between our computer
// and the endpoint computer. The unit is microseconds
// per 5 seconds
int32 clock_drift;
// just used for logging
int32 clock_drift_raw;
SizableCircularBuffer inbuf, outbuf;
#ifdef _DEBUG
// Public per-socket statistics, returned by utp_get_stats()
utp_socket_stats _stats;
#endif
// true if we're in slow-start (exponential growth) phase
bool slow_start;
// the slow-start threshold, in bytes
size_t ssthresh;
void log(int level, char const *fmt, ...)
{
va_list va;
char buf[4096], buf2[4096];
// don't bother with vsnprintf() etc calls if we're not going to log.
if (!ctx->would_log(level)) {
return;
}
va_start(va, fmt);
vsnprintf(buf, 4096, fmt, va);
va_end(va);
buf[4095] = '\0';
snprintf(buf2, 4096, "%p %s %06u %s", this, addrfmt(addr, addrbuf), conn_id_recv, buf);
buf2[4095] = '\0';
ctx->log_unchecked(this, buf2);
}
void schedule_ack();
// called every time mtu_floor or mtu_ceiling are adjusted
void mtu_search_update();
void mtu_reset();
// Calculates the current receive window
size_t get_rcv_window()
{
// Trim window down according to what's already in buffer.
const size_t numbuf = utp_call_get_read_buffer_size(this->ctx, this);
assert((int)numbuf >= 0);
return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0;
}
// Test if we're ready to decay max_window
// XXX this breaks when spaced by > INT_MAX/2, which is 49
// days; the failure mode in that case is we do an extra decay
// or fail to do one when we really shouldn't.
bool can_decay_win(int64 msec) const
{
return (msec - last_rwin_decay) >= MAX_WINDOW_DECAY;
}
// Decay max window if the decay interval has elapsed
void maybe_decay_win(uint64 current_ms)
{
if (can_decay_win(current_ms)) {
// TCP uses 0.5
max_window = (size_t)(max_window * .5);
last_rwin_decay = current_ms;
if (max_window < MIN_WINDOW_SIZE)
max_window = MIN_WINDOW_SIZE;
slow_start = false;
ssthresh = max_window;
}
}
size_t get_header_size() const
{
return sizeof(PacketFormatV1);
}
size_t get_udp_mtu()
{
socklen_t len;
SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
return utp_call_get_udp_mtu(this->ctx, this, (const struct sockaddr *)&sa, len);
}
size_t get_udp_overhead()
{
socklen_t len;
SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
return utp_call_get_udp_overhead(this->ctx, this, (const struct sockaddr *)&sa, len);
}
size_t get_overhead()
{
return get_udp_overhead() + get_header_size();
}
void send_data(byte* b, size_t length, bandwidth_type_t type, uint32 flags = 0);
void send_ack(bool synack = false);
void send_keep_alive();
static void send_rst(utp_context *ctx,
const PackedSockAddr &addr, uint32 conn_id_send,
uint16 ack_nr, uint16 seq_nr);
void send_packet(OutgoingPacket *pkt);
bool is_full(int bytes = -1);
bool flush_packets();
void write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs);
#ifdef _DEBUG
void check_invariant();
#endif
void check_timeouts();
int ack_packet(uint16 seq);
size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt);
void selective_ack(uint base, const byte *mask, byte len);
void apply_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt);
size_t get_packet_size() const;
};
void removeSocketFromAckList(UTPSocket *conn)
{
if (conn->ida >= 0)
{
UTPSocket *last = conn->ctx->ack_sockets[conn->ctx->ack_sockets.GetCount() - 1];
assert(last->ida < (int)(conn->ctx->ack_sockets.GetCount()));
assert(conn->ctx->ack_sockets[last->ida] == last);
last->ida = conn->ida;
conn->ctx->ack_sockets[conn->ida] = last;
conn->ida = -1;
// Decrease the count
conn->ctx->ack_sockets.SetCount(conn->ctx->ack_sockets.GetCount() - 1);
}
}
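// Illustrative example for the removal above: with ack_sockets == [A, B, C]
// and conn == B (ida 1), the last element C is moved into slot 1, C->ida
// becomes 1, B->ida is reset to -1 and the count drops to 2 -- an O(1)
// unordered removal.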
static void utp_register_sent_packet(utp_context *ctx, size_t length)
{
if (length <= PACKET_SIZE_MID) {
if (length <= PACKET_SIZE_EMPTY) {
ctx->context_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
} else if (length <= PACKET_SIZE_SMALL) {
ctx->context_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
} else
ctx->context_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
} else {
if (length <= PACKET_SIZE_BIG) {
ctx->context_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
} else
ctx->context_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
}
}
void send_to_addr(utp_context *ctx, const byte *p, size_t len, const PackedSockAddr &addr, int flags = 0)
{
socklen_t tolen;
SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen);
utp_register_sent_packet(ctx, len);
utp_call_sendto(ctx, NULL, p, len, (const struct sockaddr *)&to, tolen, flags);
}
void UTPSocket::schedule_ack()
{
if (ida == -1){
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "schedule_ack");
#endif
ida = ctx->ack_sockets.Append(this);
} else {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "schedule_ack: already in list");
#endif
}
}
void UTPSocket::send_data(byte* b, size_t length, bandwidth_type_t type, uint32 flags)
{
// time stamp this packet with local time, the stamp goes into
// the header of every packet at the 8th byte for 8 bytes:
// two integers, check packet.h for more
uint64 time = utp_call_get_microseconds(ctx, this);
PacketFormatV1* b1 = (PacketFormatV1*)b;
b1->tv_usec = (uint32)time;
b1->reply_micro = reply_micro;
last_sent_packet = ctx->current_ms;
#ifdef _DEBUG
_stats.nbytes_xmit += length;
++_stats.nxmit;
#endif
if (ctx->callbacks[UTP_ON_OVERHEAD_STATISTICS]) {
size_t n;
if (type == payload_bandwidth) {
// if this packet carries payload, just
// count the header as overhead
type = header_overhead;
n = get_overhead();
} else {
n = length + get_udp_overhead();
}
utp_call_on_overhead_statistics(ctx, this, true, n, type);
}
#if UTP_DEBUG_LOGGING
int flags2 = b1->type();
uint16 seq_nr = b1->seq_nr;
uint16 ack_nr = b1->ack_nr;
log(UTP_LOG_DEBUG, "send %s len:%u id:%u timestamp:" I64u " reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags2],
seq_nr, ack_nr);
#endif
send_to_addr(ctx, b, length, addr, flags);
removeSocketFromAckList(this);
}
void UTPSocket::send_ack(bool synack)
{
PacketFormatAckV1 pfa;
zeromem(&pfa);
size_t len;
last_rcv_win = get_rcv_window();
pfa.pf.set_version(1);
pfa.pf.set_type(ST_STATE);
pfa.pf.ext = 0;
pfa.pf.connid = conn_id_send;
pfa.pf.ack_nr = ack_nr;
pfa.pf.seq_nr = seq_nr;
pfa.pf.windowsize = (uint32)last_rcv_win;
len = sizeof(PacketFormatV1);
// we never need to send EACK for connections
// that are shutting down
if (reorder_count != 0 && state < CS_GOT_FIN) {
// if reorder count > 0, send an EACK.
// reorder count should always be 0
// for synacks, so this should not be
// a synack
assert(!synack);
pfa.pf.ext = 1;
pfa.ext_next = 0;
pfa.ext_len = 4;
uint m = 0;
// reorder count should only be non-zero
// if the packet ack_nr + 1 has not yet
// been received
assert(inbuf.get(ack_nr + 1) == NULL);
size_t window = min<size_t>(14+16, inbuf.size());
// Generate bit mask of segments received.
for (size_t i = 0; i < window; i++) {
if (inbuf.get(ack_nr + i + 2) != NULL) {
m |= 1 << i;
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "EACK packet [%u]", ack_nr + i + 2);
#endif
}
}
pfa.acks[0] = (byte)m;
pfa.acks[1] = (byte)(m >> 8);
pfa.acks[2] = (byte)(m >> 16);
pfa.acks[3] = (byte)(m >> 24);
len += 4 + 2;
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Sending EACK %u [%u] bits:[%032b]", ack_nr, conn_id_send, m);
#endif
} else {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Sending ACK %u [%u]", ack_nr, conn_id_send);
#endif
}
send_data((byte*)&pfa, len, ack_overhead);
removeSocketFromAckList(this);
}
void UTPSocket::send_keep_alive()
{
ack_nr--;
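// The decrement above makes this ACK deliberately stale: it cannot
// advance the peer's state, but it still elicits traffic on the
// connection, which keeps NAT mappings alive (see KEEPALIVE_INTERVAL);
// ack_nr is restored right after sending.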
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Sending KeepAlive ACK %u [%u]", ack_nr, conn_id_send);
#endif
send_ack();
ack_nr++;
}
void UTPSocket::send_rst(utp_context *ctx,
const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr)
{
PacketFormatV1 pf1;
zeromem(&pf1);
size_t len;
pf1.set_version(1);
pf1.set_type(ST_RESET);
pf1.ext = 0;
pf1.connid = conn_id_send;
pf1.ack_nr = ack_nr;
pf1.seq_nr = seq_nr;
pf1.windowsize = 0;
len = sizeof(PacketFormatV1);
// LOG_DEBUG("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
// LOG_DEBUG("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
send_to_addr(ctx, (const byte*)&pf1, len, addr);
}
void UTPSocket::send_packet(OutgoingPacket *pkt)
{
// only count against the quota the first time we
// send the packet. Don't enforce quota when closing
// a socket. Only enforce the quota when we're sending
// at slow rates (max window < packet size)
//size_t max_send = min(max_window, opt_sndbuf, max_window_user);
time_t cur_time = utp_call_get_milliseconds(this->ctx, this);
if (pkt->transmissions == 0 || pkt->need_resend) {
cur_window += pkt->payload;
}
pkt->need_resend = false;
PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
p1->ack_nr = ack_nr;
pkt->time_sent = utp_call_get_microseconds(this->ctx, this);
//socklen_t salen;
//SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&salen);
bool use_as_mtu_probe = false;
// TODO: this is subject to nasty wrapping issues! Below as well
if (mtu_discover_time < (uint64)cur_time) {
// it's time to reset our MTU assumptions
// and trigger a new search
mtu_reset();
}
// don't use packets that are larger than mtu_ceiling
// as probes, since they were probably used as probes
// already and failed, now we need it to fragment
// just to get it through
// if seq_nr == 1, the probe would end up being 0
// which is a magic number representing no-probe
// that's why we don't send a probe for a packet with
// sequence number 0
if (mtu_floor < mtu_ceiling
&& pkt->length > mtu_floor
&& pkt->length <= mtu_ceiling
&& mtu_probe_seq == 0
&& seq_nr != 1
&& pkt->transmissions == 0) {
// we've already incremented seq_nr
// for this packet
mtu_probe_seq = (seq_nr - 1) & ACK_NR_MASK;
mtu_probe_size = pkt->length;
assert(pkt->length >= mtu_floor);
assert(pkt->length <= mtu_ceiling);
use_as_mtu_probe = true;
log(UTP_LOG_MTU, "MTU [PROBE] floor:%d ceiling:%d current:%d"
, mtu_floor, mtu_ceiling, mtu_probe_size);
}
pkt->transmissions++;
send_data((byte*)pkt->data, pkt->length,
(state == CS_SYN_SENT) ? connect_overhead
: (pkt->transmissions == 1) ? payload_bandwidth
: retransmit_overhead, use_as_mtu_probe ? UTP_UDP_DONTFRAG : 0);
}
bool UTPSocket::is_full(int bytes)
{
size_t packet_size = get_packet_size();
if (bytes < 0) bytes = packet_size;
else if (bytes > (int)packet_size) bytes = (int)packet_size;
size_t max_send = min(max_window, opt_sndbuf, max_window_user);
// subtract one to save space for the FIN packet
if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "is_full:false cur_window_packets:%d MAX:%d", cur_window_packets, OUTGOING_BUFFER_MAX_SIZE - 1);
#endif
last_maxed_out_window = ctx->current_ms;
return true;
}
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "is_full:%s. cur_window:%u pkt:%u max:%u cur_window_packets:%u max_window:%u"
, (cur_window + bytes > max_send) ? "true" : "false"
, cur_window, bytes, max_send, cur_window_packets
, max_window);
#endif
if (cur_window + bytes > max_send) {
last_maxed_out_window = ctx->current_ms;
return true;
}
return false;
}
bool UTPSocket::flush_packets()
{
size_t packet_size = get_packet_size();
// send packets that are waiting on the pacer to be sent
// i has to be an unsigned 16 bit counter to wrap correctly
// signed types are not guaranteed to wrap the way you expect
for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
// have we run out of quota?
if (is_full()) return true;
// Nagle check
// don't send the last packet if we have one packet in-flight
// and the current packet is still smaller than packet_size.
if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
cur_window_packets == 1 ||
pkt->payload >= packet_size) {
send_packet(pkt);
}
}
return false;
}
// @payload: number of bytes to send
// @flags: either ST_DATA, or ST_FIN
// @iovec: base address of iovec array
// @num_iovecs: number of iovecs in array
void UTPSocket::write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs)
{
// Setup initial timeout timer
if (cur_window_packets == 0) {
retransmit_timeout = rto;
rto_timeout = ctx->current_ms + retransmit_timeout;
assert(cur_window == 0);
}
size_t packet_size = get_packet_size();
do {
assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
assert(flags == ST_DATA || flags == ST_FIN);
size_t added = 0;
OutgoingPacket *pkt = NULL;
if (cur_window_packets > 0) {
pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
}
const size_t header_size = get_header_size();
bool append = true;
// if there's any room left in the last packet in the window
// and it hasn't been sent yet, fill that frame first
if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
// Use the previous unsent packet
added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
pkt = (OutgoingPacket*)realloc(pkt,
(sizeof(OutgoingPacket) - 1) +
header_size +
pkt->payload + added);
outbuf.put(seq_nr - 1, pkt);
append = false;
assert(!pkt->need_resend);
} else {
// Create the packet to send.
added = payload;
pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) +
header_size +
added);
pkt->payload = 0;
pkt->transmissions = 0;
pkt->need_resend = false;
}
if (added) {
assert(flags == ST_DATA);
// Fill it with data from the upper layer.
unsigned char *p = pkt->data + header_size + pkt->payload;
size_t needed = added;
/*
while (needed) {
*p = *(char*)iovec[0].iov_base;
p++;
iovec[0].iov_base = (char *)iovec[0].iov_base + 1;
needed--;
}
*/
for (size_t i = 0; i < num_iovecs && needed; i++) {
if (iovec[i].iov_len == 0)
continue;
size_t num = min<size_t>(needed, iovec[i].iov_len);
memcpy(p, iovec[i].iov_base, num);
p += num;
iovec[i].iov_len -= num;
iovec[i].iov_base = (byte*)iovec[i].iov_base + num; // iovec[i].iov_base += num, but without void* pointers
needed -= num;
}
assert(needed == 0);
}
pkt->payload += added;
pkt->length = header_size + pkt->payload;
last_rcv_win = get_rcv_window();
PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
p1->set_version(1);
p1->set_type(flags);
p1->ext = 0;
p1->connid = conn_id_send;
p1->windowsize = (uint32)last_rcv_win;
p1->ack_nr = ack_nr;
if (append) {
// Remember the message in the outgoing queue.
outbuf.ensure_size(seq_nr, cur_window_packets);
outbuf.put(seq_nr, pkt);
p1->seq_nr = seq_nr;
seq_nr++;
cur_window_packets++;
}
payload -= added;
} while (payload);
flush_packets();
}
#ifdef _DEBUG
void UTPSocket::check_invariant()
{
if (reorder_count > 0) {
assert(inbuf.get(ack_nr + 1) == NULL);
}
size_t outstanding_bytes = 0;
for (int i = 0; i < cur_window_packets; ++i) {
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
outstanding_bytes += pkt->payload;
}
assert(outstanding_bytes == cur_window);
}
#endif
void UTPSocket::check_timeouts()
{
#ifdef _DEBUG
check_invariant();
#endif
// this invariant should always be true
assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets));
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "CheckTimeouts timeout:%d max_window:%u cur_window:%u "
"state:%s cur_window_packets:%u",
(int)(rto_timeout - ctx->current_ms), (uint)max_window, (uint)cur_window,
statenames[state], cur_window_packets);
#endif
if (state != CS_DESTROY) flush_packets();
switch (state) {
case CS_SYN_SENT:
case CS_SYN_RECV:
case CS_CONNECTED_FULL:
case CS_CONNECTED:
case CS_FIN_SENT: {
// Reset max window...
if ((int)(ctx->current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
max_window_user = PACKET_SIZE;
}
if ((int)(ctx->current_ms - rto_timeout) >= 0
&& rto_timeout > 0) {
bool ignore_loss = false;
if (cur_window_packets == 1
&& ((seq_nr - 1) & ACK_NR_MASK) == mtu_probe_seq
&& mtu_probe_seq != 0) {
// we only had a single outstanding packet that timed out, and it was the probe
mtu_ceiling = mtu_probe_size - 1;
mtu_search_update();
// this packet was most likely dropped because the packet size was
// too big, not because of congestion. To accelerate the binary search for
// the MTU, resend immediately and don't reset the window size
ignore_loss = true;
log(UTP_LOG_MTU, "MTU [PROBE-TIMEOUT] floor:%d ceiling:%d current:%d"
, mtu_floor, mtu_ceiling, mtu_last);
}
// we dropped the probe, clear these fields to
// allow us to send a new one
mtu_probe_seq = mtu_probe_size = 0;
log(UTP_LOG_MTU, "MTU [TIMEOUT]");
/*
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
// If there were a lot of retransmissions, force recomputation of round trip time
if (pkt->transmissions >= 4)
rtt = 0;
*/
// Increase RTO
const uint new_timeout = ignore_loss ? retransmit_timeout : retransmit_timeout * 2;
// They initiated the connection but failed to respond before the rto.
// A malicious client can also spoof the destination address of a ST_SYN bringing us to this state.
// Kill the connection and do not notify the upper layer
if (state == CS_SYN_RECV) {
state = CS_DESTROY;
utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
return;
}
// We initiated the connection but the other side failed to respond before the rto
if (retransmit_count >= 4 || (state == CS_SYN_SENT && retransmit_count >= 2)) {
// 4 consecutive transmissions have timed out. Kill it. If we
// haven't even connected yet, give up after only 2 consecutive
// failed transmissions.
if (state == CS_FIN_SENT)
state = CS_DESTROY;
else
state = CS_RESET;
utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
return;
}
retransmit_timeout = new_timeout;
rto_timeout = ctx->current_ms + new_timeout;
if (!ignore_loss) {
// On Timeout
duplicate_ack = 0;
int packet_size = get_packet_size();
if ((cur_window_packets == 0) && ((int)max_window > packet_size)) {
// we don't have any packets in-flight, even though
// we could. This implies that the connection is just
// idling. No need to be aggressive about resetting the
// congestion window. Just let it decay by a third;
// don't set it any lower than the packet size though
max_window = max(max_window * 2 / 3, size_t(packet_size));
} else {
// our delay was so high that our congestion window
// was shrunk below one packet, preventing us from
// sending anything for one time-out period. Now, reset
// the congestion window to fit one packet, to start over
// again
max_window = packet_size;
slow_start = true;
}
}
// every packet should be considered lost
for (int i = 0; i < cur_window_packets; ++i) {
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
pkt->need_resend = true;
assert(cur_window >= pkt->payload);
cur_window -= pkt->payload;
}
if (cur_window_packets > 0) {
retransmit_count++;
// used in parse_log.py
log(UTP_LOG_NORMAL, "Packet timeout. Resend. seq_nr:%u. timeout:%u "
"max_window:%u cur_window_packets:%d"
, seq_nr - cur_window_packets, retransmit_timeout
, (uint)max_window, int(cur_window_packets));
fast_timeout = true;
timeout_seq_nr = seq_nr;
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
assert(pkt);
// Re-send the packet.
send_packet(pkt);
}
}
// Mark the socket as writable. If the cwnd has grown, or if the number of
// bytes in-flight is lower than cwnd, we need to make the socket writable again
// in case it isn't
if (state == CS_CONNECTED_FULL && !is_full()) {
state = CS_CONNECTED;
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
(uint)max_window, (uint)cur_window, (uint)get_packet_size());
#endif
utp_call_on_state_change(this->ctx, this, UTP_STATE_WRITABLE);
}
if (state >= CS_CONNECTED && state < CS_GOT_FIN) {
if ((int)(ctx->current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
send_keep_alive();
}
}
break;
}
// Close?
case CS_GOT_FIN:
case CS_DESTROY_DELAY:
if ((int)(ctx->current_ms - rto_timeout) >= 0) {
state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
if (cur_window_packets > 0) {
utp_call_on_error(ctx, this, UTP_ECONNRESET);
}
}
break;
// prevent warning
case CS_UNINITIALIZED:
case CS_IDLE:
case CS_RESET:
case CS_DESTROY:
break;
}
}
// this should be called every time we change mtu_floor or mtu_ceiling
void UTPSocket::mtu_search_update()
{
assert(mtu_floor <= mtu_ceiling);
// binary search
mtu_last = (mtu_floor + mtu_ceiling) / 2;
// enable a new probe to be sent
mtu_probe_seq = mtu_probe_size = 0;
// if the floor and ceiling are close enough, consider the
// MTU binary search complete. We set the current value
// to floor since that's the only size we know can go through
// also set the ceiling to floor to terminate the searching
if (mtu_ceiling - mtu_floor <= 16) {
mtu_last = mtu_floor;
log(UTP_LOG_MTU, "MTU [DONE] floor:%d ceiling:%d current:%d"
, mtu_floor, mtu_ceiling, mtu_last);
mtu_ceiling = mtu_floor;
assert(mtu_floor <= mtu_ceiling);
// Do another search in 30 minutes
mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
}
}
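// Worked example of the search above (assuming a 1500-byte UDP MTU):
// mtu_reset() sets floor = 576 and ceiling = 1500, so the first probe is
// (576 + 1500) / 2 = 1038 bytes. A probe that gets through raises the
// floor, a probe timeout lowers the ceiling, until they converge to
// within 16 bytes of each other.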
void UTPSocket::mtu_reset()
{
mtu_ceiling = get_udp_mtu();
// Less would not pass TCP...
mtu_floor = 576;
log(UTP_LOG_MTU, "MTU [RESET] floor:%d ceiling:%d current:%d"
, mtu_floor, mtu_ceiling, mtu_last);
assert(mtu_floor <= mtu_ceiling);
mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
}
// returns:
// 0: the packet was acked
// 1: the packet had already been acked
// 2: the packet has not been sent yet
int UTPSocket::ack_packet(uint16 seq)
{
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq);
// the packet has already been acked (or not sent)
if (pkt == NULL) {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "got ack for:%u (already acked, or never sent)", seq);
#endif
return 1;
}
// can't ack packets that haven't been sent yet!
if (pkt->transmissions == 0) {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
seq, (uint)pkt->payload, pkt->need_resend);
#endif
return 2;
}
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "got ack for:%u (pkt_size:%u need_resend:%u)",
seq, (uint)pkt->payload, pkt->need_resend);
#endif
outbuf.put(seq, NULL);
// if we never re-sent the packet, update the RTT estimate
if (pkt->transmissions == 1) {
// Estimate the round trip time.
const uint32 ertt = (uint32)((utp_call_get_microseconds(this->ctx, this) - pkt->time_sent) / 1000);
if (rtt == 0) {
// First round trip time sample
rtt = ertt;
rtt_var = ertt / 2;
// sanity check. rtt should never be more than 6 seconds
// assert(rtt < 6000);
} else {
// Compute new round trip times
const int delta = (int)rtt - ertt;
rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
rtt = rtt - rtt/8 + ertt/8;
// sanity check. rtt should never be more than 6 seconds
// assert(rtt < 6000);
rtt_hist.add_sample(ertt, ctx->current_ms);
}
rto = max<uint>(rtt + rtt_var * 4, 1000);
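// Worked example with assumed values: rtt = 800, rtt_var = 100 and a new
// sample ertt = 880 give delta = -80, rtt_var = 100 + (80 - 100)/4 = 95,
// rtt = 800 - 800/8 + 880/8 = 810 and rto = max(810 + 4*95, 1000) = 1190 ms.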
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "rtt:%u avg:%u var:%u rto:%u",
ertt, rtt, rtt_var, rto);
#endif
}
retransmit_timeout = rto;
rto_timeout = ctx->current_ms + rto;
// if need_resend is set, this packet has already
// been considered timed-out, and is not included in
// the cur_window anymore
if (!pkt->need_resend) {
assert(cur_window >= pkt->payload);
cur_window -= pkt->payload;
}
free(pkt);
retransmit_count = 0;
return 0;
}
// count the number of bytes that were acked by the EACK header
size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt)
{
if (cur_window_packets == 0) return 0;
size_t acked_bytes = 0;
int bits = len * 8;
uint64 now = utp_call_get_microseconds(this->ctx, this);
do {
uint v = base + bits;
// ignore bits that haven't been sent yet
// see comment in UTPSocket::selective_ack
if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
continue;
// ignore bits that represents packets we haven't sent yet
// or packets that have already been acked
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
if (!pkt || pkt->transmissions == 0)
continue;
// Count the number of segments that were successfully received past it.
if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
assert((int)(pkt->payload) >= 0);
acked_bytes += pkt->payload;
if (pkt->time_sent < now)
min_rtt = min<int64>(min_rtt, now - pkt->time_sent);
else
min_rtt = min<int64>(min_rtt, 50000);
continue;
}
} while (--bits >= -1);
return acked_bytes;
}
enum { MAX_EACK = 128 };
void UTPSocket::selective_ack(uint base, const byte *mask, byte len)
{
if (cur_window_packets == 0) return;
// the range is inclusive [0, 31] bits
int bits = len * 8 - 1;
int count = 0;
// resends is a stack of sequence numbers we need to resend. Since we
// iterate in reverse over the acked packets, at the end, the top packets
// are the ones we want to resend
int resends[MAX_EACK];
int nr = 0;
#if UTP_DEBUG_LOGGING
char bitmask[1024] = {0};
int counter = bits;
for (int i = 0; i <= bits; ++i) {
bool bit_set = counter >= 0 && mask[counter>>3] & (1 << (counter & 7));
bitmask[i] = bit_set ? '1' : '0';
--counter;
}
log(UTP_LOG_DEBUG, "Got EACK [%s] base:%u", bitmask, base);
#endif
do {
// we're iterating over the bits from higher sequence numbers
// to lower (kind of in reverse order, which might not be very
// intuitive)
uint v = base + bits;
// ignore bits that haven't been sent yet
// and bits that fall below the ACKed sequence number
// this can happen if an EACK message gets
// reordered and arrives after a packet that ACKs up past
// the base for this EACK message
// this is essentially the same as:
// if v >= seq_nr || v <= seq_nr - cur_window_packets
// but it takes wrapping into account
// if v == seq_nr the -1 will make it wrap. if v > seq_nr
// it will also wrap (since it will fall further below 0)
// and be > cur_window_packets.
// if v == seq_nr - cur_window_packets, the result will be
// seq_nr - (seq_nr - cur_window_packets) - 1
// == seq_nr - seq_nr + cur_window_packets - 1
// == cur_window_packets - 1 which will be caught by the
// test. If v < seq_nr - cur_window_packets, the result will
// fall further outside of the cur_window_packets range.
// sequence number space:
//
// rejected < accepted > rejected
// <============+--------------+============>
// ^ ^
// | |
// (seq_nr-wnd) seq_nr
if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
continue;
// this counts as a duplicate ack, even though we might have
// received an ack for this packet previously (in another EACK
// message for instance)
bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));
// if this packet is acked, it counts towards the duplicate ack counter
if (bit_set) count++;
// ignore bits that represents packets we haven't sent yet
// or packets that have already been acked
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
if (!pkt || pkt->transmissions == 0) {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "skipping %u. pkt:%08x transmissions:%u %s",
v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)");
#endif
continue;
}
// Count the number of segments that were successfully received past it.
if (bit_set) {
// the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets
assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask));
ack_packet(v);
continue;
}
// Resend segments
// if count is less than our re-send limit, we haven't seen enough
// acked packets in front of this one to warrant a re-send.
// if count == 0, we're still going through the tail of zeroes
if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
count >= DUPLICATE_ACKS_BEFORE_RESEND) {
// resends is a stack, and we're mostly interested in the top of it
// if we're full, just throw away the lower half
if (nr >= MAX_EACK - 2) {
memmove(resends, &resends[MAX_EACK/2], MAX_EACK/2 * sizeof(resends[0]));
nr -= MAX_EACK / 2;
}
resends[nr++] = v;
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "no ack for %u", v);
#endif
} else {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
v, count, duplicate_ack, fast_resend_seq_nr);
#endif
}
} while (--bits >= -1);
if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
count >= DUPLICATE_ACKS_BEFORE_RESEND) {
// if we get enough duplicate acks to start
// resending, the first packet we should resend
// is base-1
resends[nr++] = (base - 1) & ACK_NR_MASK;
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "no ack for %u", (base - 1) & ACK_NR_MASK);
#endif
} else {
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
base - 1, count, duplicate_ack, fast_resend_seq_nr);
#endif
}
bool back_off = false;
int i = 0;
while (nr > 0) {
uint v = resends[--nr];
// don't consider the tail of zeroes to be lost packets
// only unacked packets with acked packets after should
// be considered lost
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
// this may be an old (re-ordered) packet, and some of the
// packets in here may have been acked already. In which
// case they will not be in the send queue anymore
if (!pkt) continue;
// used in parse_log.py
log(UTP_LOG_NORMAL, "Packet %u lost. Resending", v);
// On Loss
back_off = true;
#ifdef _DEBUG
++_stats.rexmit;
#endif
send_packet(pkt);
fast_resend_seq_nr = (v + 1) & ACK_NR_MASK;
// Re-send max 4 packets.
if (++i >= 4) break;
}
if (back_off)
maybe_decay_win(ctx->current_ms);
duplicate_ack = count;
}
void UTPSocket::apply_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
{
// the delay can never be greater than the rtt. The min_rtt
// variable is the RTT in microseconds
assert(min_rtt >= 0);
int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt));
assert(our_delay != INT_MAX);
assert(our_delay >= 0);
utp_call_on_delay_sample(this->ctx, this, our_delay / 1000);
// This tests the connection under heavy load from foreground
// traffic. Pretend that our delays are very high to force the
// connection to use sub-packet size window sizes
//our_delay *= 4;
// target is microseconds
int target = target_delay;
if (target <= 0) target = 100000;
// this is here to compensate for very large clock drift that can trick
// the congestion controller into giving certain endpoints an unfair
// share of the bandwidth. We have an estimate of the clock drift
// (clock_drift). The unit of this is microseconds per 5 seconds.
// empirically, a reasonable cut-off appears to be about 200000
// (which is pretty high). The main purpose is to compensate for
// people trying to "cheat" uTP by making their clock run slower,
// and this definitely catches that without any risk of false positives
// if clock_drift < -200000 start applying a penalty delay proportional
// to how far beyond -200000 the clock drift is
int32 penalty = 0;
if (clock_drift < -200000) {
penalty = (-clock_drift - 200000) / 7;
our_delay += penalty;
}
double off_target = target - our_delay;
// this is the same as:
//
// (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT
//
// so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
// of the target delay the current delay represents.
// The min() around off_target protects against crazy values of our_delay, which may happen when the
// timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase
// of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
// as for large negative numbers, this direction is already capped at the min packet size further down
// the min around the bytes_acked protects against the case where the window size was recently
// shrunk and the number of acked bytes exceeds that. This is considered no more than one full
// window, in order to keep the gain within sane boundaries.
assert(bytes_acked > 0);
double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);
double delay_factor = off_target / target;
double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
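// Worked example with assumed values: target = 100000 us and
// our_delay = 50000 us give off_target = 50000 and delay_factor = 0.5;
// if this ack covers half the window (window_factor = 0.5), then
// scaled_gain = 3000 * 0.5 * 0.5 = 750 bytes of window growth for this ack.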
// since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
// may increase per RTT, we may not increase the window size more than that proportional
// to the number of bytes that were acked, so that once one window has been acked (one rtt)
// the increase limit is not exceeded
// the +1. is to allow for floating point imprecision
assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked));
if (scaled_gain > 0 && ctx->current_ms - last_maxed_out_window > 1000) {
// if it was more than 1 second since we tried to send a packet
// and stopped because we hit the max window, we're most likely rate
// limited (which prevents us from ever hitting the window size)
// if this is the case, we cannot let the max_window grow indefinitely
scaled_gain = 0;
}
size_t ledbat_cwnd = (max_window + scaled_gain < MIN_WINDOW_SIZE) ? MIN_WINDOW_SIZE : (size_t)(max_window + scaled_gain);
if (slow_start) {
size_t ss_cwnd = (size_t)(max_window + window_factor*get_packet_size());
if (ss_cwnd > ssthresh) {
slow_start = false;
} else if (our_delay > target*0.9) {
// even if we're a little under the target delay, we conservatively
// discontinue the slow start phase
slow_start = false;
ssthresh = max_window;
} else {
max_window = max(ss_cwnd, ledbat_cwnd);
}
} else {
max_window = ledbat_cwnd;
}
// make sure that the congestion window is below max
// make sure that we don't shrink our window too small
max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf);
// used in parse_log.py
log(UTP_LOG_NORMAL, "actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
"delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
"scaled_gain:%f rtt:%u rate:%u wnduser:%u rto:%u timeout:%d get_microseconds:" I64u " "
"cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u "
"average_delay:%d clock_drift:%d clock_drift_raw:%d delay_penalty:%d current_delay_sum:" I64u
"current_delay_samples:%d average_delay_base:%d last_maxed_out_window:" I64u " opt_sndbuf:%d "
"current_ms:" I64u "",
actual_delay, our_delay / 1000, their_hist.get_value() / 1000,
int(off_target / 1000), uint(max_window), uint32(our_hist.delay_base),
int((our_delay + their_hist.get_value()) / 1000), int(target / 1000), uint(bytes_acked),
(uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt,
(uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)),
(uint)max_window_user, rto, (int)(rto_timeout - ctx->current_ms),
utp_call_get_microseconds(this->ctx, this), cur_window_packets, (uint)get_packet_size(),
their_hist.delay_base, their_hist.delay_base + their_hist.get_value(),
average_delay, clock_drift, clock_drift_raw, penalty / 1000,
current_delay_sum, current_delay_samples, average_delay_base,
uint64(last_maxed_out_window), int(opt_sndbuf), uint64(ctx->current_ms));
}
static void utp_register_recv_packet(UTPSocket *conn, size_t len)
{
#ifdef _DEBUG
++conn->_stats.nrecv;
conn->_stats.nbytes_recv += len;
#endif
if (len <= PACKET_SIZE_MID) {
if (len <= PACKET_SIZE_EMPTY) {
conn->ctx->context_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
} else if (len <= PACKET_SIZE_SMALL) {
conn->ctx->context_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
} else
conn->ctx->context_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
} else {
if (len <= PACKET_SIZE_BIG) {
conn->ctx->context_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
} else
conn->ctx->context_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
}
}
// returns the max number of bytes of payload the uTP
// connection is allowed to send
size_t UTPSocket::get_packet_size() const
{
int header_size = sizeof(PacketFormatV1);
size_t mtu = mtu_last ? mtu_last : mtu_ceiling;
return mtu - header_size;
}
// Process an incoming packet
// syn is true if this is the first packet received. It will cut off parsing
// as soon as the header is done
size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
{
utp_register_recv_packet(conn, len);
conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
const byte *packet_end = packet + len;
uint16 pk_seq_nr = pf1->seq_nr;
uint16 pk_ack_nr = pf1->ack_nr;
uint8 pk_flags = pf1->type();
if (pk_flags >= ST_NUM_STATES) return 0;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Got %s. seq_nr:%u ack_nr:%u state:%s timestamp:" I64u " reply_micro:%u"
, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state]
, uint64(pf1->tv_usec), (uint32)(pf1->reply_micro));
#endif
// mark receipt time
uint64 time = utp_call_get_microseconds(conn->ctx, conn);
// the window size in packets is used to calculate a minimum
// permissible range for received acks. Packets with acks falling
// out of this range are dropped
const uint16 curr_window = max<uint16>(conn->cur_window_packets + ACK_NR_ALLOWED_WINDOW, ACK_NR_ALLOWED_WINDOW);
// ignore packets whose ack_nr is invalid. This would imply a spoofed address
// or a malicious attempt to attack the uTP implementation by
// acking a packet that hasn't been sent yet!
// SYN packets have an exception, since there are no previous packets
if ((pk_flags != ST_SYN || conn->state != CS_SYN_RECV) &&
(wrapping_compare_less(conn->seq_nr - 1, pk_ack_nr, ACK_NR_MASK)
|| wrapping_compare_less(pk_ack_nr, conn->seq_nr - 1 - curr_window, ACK_NR_MASK))) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Invalid ack_nr: %u. our seq_nr: %u last unacked: %u"
, pk_ack_nr, conn->seq_nr, (conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK);
#endif
return 0;
}
// RSTs are handled earlier, since the connid matches the send id not the recv id
assert(pk_flags != ST_RESET);
// TODO: maybe send a ST_RESET if we're in CS_RESET?
const byte *selack_ptr = NULL;
// Unpack UTP packet options
// Data pointer
const byte *data = (const byte*)pf1 + conn->get_header_size();
if (conn->get_header_size() > len) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Invalid packet size (less than header size)");
#endif
return 0;
}
// Skip the extension headers
uint extension = pf1->ext;
if (extension != 0) {
do {
// Verify that the packet is valid.
data += 2;
if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Invalid len of extensions");
#endif
return 0;
}
switch(extension) {
case 1: // Selective Acknowledgment
selack_ptr = data;
break;
case 2: // extension bits
if (data[-1] != 8) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Invalid len of extension bits header");
#endif
return 0;
}
memcpy(conn->extensions, data, 8);
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x",
conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
#endif
}
extension = data[-2];
data += data[-1];
} while (extension);
}
if (conn->state == CS_SYN_SENT) {
// if this is a syn-ack, initialize our ack_nr
// to match the sequence number we got from
// the other end
conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
}
conn->last_got_packet = conn->ctx->current_ms;
if (syn) {
return 0;
}
// seqnr is how many packets past the expected
// packet this one is. ack_nr is the last acked, seq_nr is the
// current. Subtracting 1 makes 0 mean "this is the next
// expected packet".
const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
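// Worked example of the wrap-around above (illustrative values; SEQ_NR_MASK
// is assumed to be 0xffff): with ack_nr == 0xfffe the next expected packet
// is 0xffff, so an arriving pk_seq_nr of 1 gives
// seqnr = (1 - 0xfffe - 1) & 0xffff = 2, i.e. two packets past the one we
// expect.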
// Getting an invalid sequence number?
if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) {
conn->schedule_ack();
}
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, " Got old Packet/Ack (%u/%u)=%u"
, pk_seq_nr, conn->ack_nr, seqnr);
#endif
return 0;
}
// Process acknowledgment
// acks is the number of packets that were acked
int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;
// this happens when we receive an old ack nr
if (acks > conn->cur_window_packets) acks = 0;
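// Worked example of the ack count above (illustrative values): with
// seq_nr == 105 and cur_window_packets == 4, packets 101..104 are in flight
// and the last acked was 100. An incoming pk_ack_nr of 102 gives
// acks = (102 - (105 - 1 - 4)) & ACK_NR_MASK = 2, acking packets 101 and 102.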
// if we get the same ack_nr as in the last packet
// increase the duplicate_ack counter, otherwise reset
// it to 0.
// It's important to only count ACKs in ST_STATE packets. Any other
// packet (primarily ST_DATA) is likely to have been sent because of the
// other end having new outgoing data, not in response to incoming data.
// For instance, if we're receiving a steady stream of payload with no
// outgoing data, and we suddenly have a few bytes of payload to send (say,
// a bittorrent HAVE message), we're very likely to see 3 duplicate ACKs
// immediately after sending our payload packet. This effectively disables
// the fast-resend on duplicate-ack logic for bi-directional connections
// (except in the case of a selective ACK). This is in line with BSD4.4 TCP
// implementation.
if (conn->cur_window_packets > 0) {
if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK)
&& conn->cur_window_packets > 0
&& pk_flags == ST_STATE) {
++conn->duplicate_ack;
if (conn->duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND && conn->mtu_probe_seq) {
// It's likely that the probe was rejected due to its size, but we haven't got an
// ICMP report back yet
if (pk_ack_nr == ((conn->mtu_probe_seq - 1) & ACK_NR_MASK)) {
conn->mtu_ceiling = conn->mtu_probe_size - 1;
conn->mtu_search_update();
conn->log(UTP_LOG_MTU, "MTU [DUPACK] floor:%d ceiling:%d current:%d"
, conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
} else {
// A non-probe was blocked before our probe.
// Can't conclude much, send a new probe
conn->mtu_probe_seq = conn->mtu_probe_size = 0;
}
}
} else {
conn->duplicate_ack = 0;
}
// TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND
// and fast_resend_seq_nr <= ack_nr + 1
// resend ack_nr + 1
// also call maybe_decay_win()
}
// figure out how many bytes were acked
size_t acked_bytes = 0;
// the minimum rtt of all acks
// this is the upper limit on the delay we get back
// from the other peer. Our delay cannot exceed
// the rtt of the packet. If it does, clamp it.
// this is done in apply_ccontrol()
int64 min_rtt = INT64_MAX;
uint64 now = utp_call_get_microseconds(conn->ctx, conn);
for (int i = 0; i < acks; ++i) {
int seq = (conn->seq_nr - conn->cur_window_packets + i) & ACK_NR_MASK;
OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq);
if (pkt == 0 || pkt->transmissions == 0) continue;
assert((int)(pkt->payload) >= 0);
acked_bytes += pkt->payload;
if (conn->mtu_probe_seq && seq == conn->mtu_probe_seq) {
conn->mtu_floor = conn->mtu_probe_size;
conn->mtu_search_update();
conn->log(UTP_LOG_MTU, "MTU [ACK] floor:%d ceiling:%d current:%d"
, conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
}
// in case our clock is not monotonic
if (pkt->time_sent < now)
min_rtt = min<int64>(min_rtt, now - pkt->time_sent);
else
min_rtt = min<int64>(min_rtt, 50000);
}
// count bytes acked by EACK
if (selack_ptr != NULL) {
acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK,
selack_ptr, selack_ptr[-1], min_rtt);
}
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u",
acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt);
#endif
uint64 p = pf1->tv_usec;
conn->last_measured_delay = conn->ctx->current_ms;
// get delay in both directions
// record the delay to report back
const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
conn->reply_micro = their_delay;
uint32 prev_delay_base = conn->their_hist.delay_base;
if (their_delay != 0) conn->their_hist.add_sample(their_delay, conn->ctx->current_ms);
// if their new delay base is less than their previous one
// we should shift our delay base in the other direction in order
// to take the clock skew into account
if (prev_delay_base != 0 &&
wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base, TIMESTAMP_MASK)) {
// never adjust more than 10 milliseconds
if (prev_delay_base - conn->their_hist.delay_base <= 10000) {
conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base);
}
}
const uint32 actual_delay = (uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro));
// if the actual delay is 0, it means the other end
// hasn't received a sample from us yet, and doesn't
// know what it is. We can't update our history unless
// we have a true measured sample
if (actual_delay != 0) {
conn->our_hist.add_sample(actual_delay, conn->ctx->current_ms);
// this is keeping an average of the delay samples
// we've received within the last 5 seconds. We sum
// all the samples and increase the count in order to
// calculate the average every 5 seconds. The samples
// are based off of the average_delay_base to deal with
// wrapping counters.
if (conn->average_delay_base == 0) conn->average_delay_base = actual_delay;
int64 average_delay_sample = 0;
// distance walking from lhs to rhs, downwards
const uint32 dist_down = conn->average_delay_base - actual_delay;
// distance walking from lhs to rhs, upwards
const uint32 dist_up = actual_delay - conn->average_delay_base;
if (dist_down > dist_up) {
// assert(dist_up < INT_MAX / 4);
// average_delay_base < actual_delay, we should end up
// with a positive sample
average_delay_sample = dist_up;
} else {
// assert(-int64(dist_down) < INT_MAX / 4);
// average_delay_base >= actual_delay, we should end up
// with a negative sample
average_delay_sample = -int64(dist_down);
}
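// Worked example of the wrap-safe distance above (illustrative values):
// with average_delay_base == 0xfffffff0 and actual_delay == 0x10,
// dist_up == 0x20 (mod 2^32) while dist_down == 0xffffffe0, so the shorter
// walk wins and the sample is +0x20 rather than a huge negative number.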
conn->current_delay_sum += average_delay_sample;
++conn->current_delay_samples;
if (conn->ctx->current_ms > conn->average_sample_time) {
int32 prev_average_delay = conn->average_delay;
assert(conn->current_delay_sum / conn->current_delay_samples < INT_MAX);
assert(conn->current_delay_sum / conn->current_delay_samples > -INT_MAX);
// write the new average
conn->average_delay = (int32)(conn->current_delay_sum / conn->current_delay_samples);
// each slot represents 5 seconds
conn->average_sample_time += 5000;
conn->current_delay_sum = 0;
conn->current_delay_samples = 0;
// this makes things very confusing when logging the average delay
//#if !g_log_utp
// normalize the average samples
// since we're only interested in the slope
// of the curve formed by the average delay samples,
// we can cancel out the actual offset to make sure
// we won't have problems with wrapping.
int min_sample = min(prev_average_delay, conn->average_delay);
int max_sample = max(prev_average_delay, conn->average_delay);
// normalize around zero. Try to keep the min <= 0 and max >= 0
int adjust = 0;
if (min_sample > 0) {
// adjust all samples (and the baseline) down by min_sample
adjust = -min_sample;
} else if (max_sample < 0) {
// adjust all samples (and the baseline) up by -max_sample
adjust = -max_sample;
}
if (adjust) {
conn->average_delay_base -= adjust;
conn->average_delay += adjust;
prev_average_delay += adjust;
}
//#endif
// update the clock drift estimate
// the unit is microseconds per 5 seconds
// what we're doing is just calculating the average of the
// difference between each slot. Since each slot is 5 seconds
// and the timestamps unit are microseconds, we'll end up with
// the average slope across our history. If there is a consistent
// trend, it will show up in this value
//int64 slope = 0;
int32 drift = conn->average_delay - prev_average_delay;
// clock_drift is a rolling average
conn->clock_drift = (int64(conn->clock_drift) * 7 + drift) / 8;
conn->clock_drift_raw = drift;
}
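// Illustrative arithmetic for the rolling average above: with a previous
// clock_drift of 800 (microseconds per 5 seconds) and a new drift sample
// of 0, the next estimate is (800 * 7 + 0) / 8 == 700, so one quiet slot
// only decays the estimate by an eighth.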
}
// if our new delay base is less than our previous one
// we should shift the other end's delay base in the other
// direction in order to take the clock skew into account
// This is commented out because it creates bad interactions
// with our adjustment in the other direction. We don't really
// need our estimates of the other peer to be very accurate
// anyway. The problem with shifting here is that we're more
// likely shift it back later because of a low latency. This
// second shift back would cause us to shift our delay base
// which then gets into a death spiral of shifting delay bases
/* if (prev_delay_base != 0 &&
wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
// never adjust more than 10 milliseconds
if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
}
}
*/
// if the delay estimate exceeds the RTT, adjust the base_delay to
// compensate
assert(min_rtt >= 0);
if (int64(conn->our_hist.get_value()) > min_rtt) {
conn->our_hist.shift((uint32)(conn->our_hist.get_value() - min_rtt));
}
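// Illustrative example of the clamp above: if our one-way delay estimate is
// 120000 us but the smallest RTT observed in this round of acks is 80000 us,
// the delay base is shifted by 40000 us so the estimate can never exceed the
// full round-trip time.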
// only apply the congestion controller on acks
// if we don't have a delay measurement, there's
// no point in invoking the congestion control
if (actual_delay != 0 && acked_bytes >= 1)
conn->apply_ccontrol(acked_bytes, actual_delay, min_rtt);
// sanity check, the other end should never ack packets
// past the point we've sent
if (acks <= conn->cur_window_packets) {
conn->max_window_user = pf1->windowsize;
// If max user window is set to 0, then we start up a timer
// that will reset it to 1 after 15 seconds.
if (conn->max_window_user == 0)
// Reset max_window_user to 1 every 15 seconds.
conn->zerowindow_time = conn->ctx->current_ms + 15000;
// Respond to connect message
// Switch to CONNECTED state.
// If this is an ack and we're still handshaking,
// transition over to the connected state.
// Incoming connection completion
if (pk_flags == ST_DATA && conn->state == CS_SYN_RECV) {
conn->state = CS_CONNECTED;
}
// Outgoing connection completion
if (pk_flags == ST_STATE && conn->state == CS_SYN_SENT) {
conn->state = CS_CONNECTED;
// If the user has defined the ON_CONNECT callback, use that to
// notify the user that the socket is now connected. If ON_CONNECT
// has not been defined, notify the user via ON_STATE_CHANGE.
if (conn->ctx->callbacks[UTP_ON_CONNECT])
utp_call_on_connect(conn->ctx, conn);
else
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_CONNECT);
// We've sent a fin, and everything was ACKed (including the FIN),
// it's safe to destroy the socket. cur_window_packets == acks
// means that this packet acked all the remaining packets that
// were in-flight.
} else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
conn->state = CS_DESTROY;
}
// Update fast resend counter
if (wrapping_compare_less(conn->fast_resend_seq_nr
, (pk_ack_nr + 1) & ACK_NR_MASK, ACK_NR_MASK))
conn->fast_resend_seq_nr = (pk_ack_nr + 1) & ACK_NR_MASK;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "fast_resend_seq_nr:%u", conn->fast_resend_seq_nr);
#endif
for (int i = 0; i < acks; ++i) {
int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets);
// if ack_status is 0, the packet was acked.
// if ack_status is 1, it means that the packet had already been acked
// if it's 2, the packet has not been sent yet
// We need to break this loop in the latter case. This could potentially
// happen if we get an ack_nr that does not exceed what we have stuffed
// into the outgoing buffer, but does exceed what we have sent
if (ack_status == 2) {
#ifdef _DEBUG
OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
assert(pkt->transmissions == 0);
#endif
break;
}
conn->cur_window_packets--;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "decementing cur_window_packets:%u", conn->cur_window_packets);
#endif
}
#ifdef _DEBUG
if (conn->cur_window_packets == 0)
assert(conn->cur_window == 0);
#endif
// packets in front of this may have been acked by a
// selective ack (EACK). Keep decreasing the window packet size
// until we hit a packet that is still waiting to be acked
// in the send queue
// this is especially likely to happen when the other end
// has the EACK send bug that older versions of uTP had
while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets)) {
conn->cur_window_packets--;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "decementing cur_window_packets:%u", conn->cur_window_packets);
#endif
}
#ifdef _DEBUG
if (conn->cur_window_packets == 0)
assert(conn->cur_window == 0);
#endif
// this invariant should always be true
assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
// flush Nagle
if (conn->cur_window_packets == 1) {
OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1);
// do we still have quota?
if (pkt->transmissions == 0) {
conn->send_packet(pkt);
}
}
// Fast timeout-retry
if (conn->fast_timeout) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
#endif
// if the fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already
// resent the packet that timed out, and we should leave the fast-timeout mode.
if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
conn->fast_timeout = false;
} else {
// resend the oldest packet and increment fast_resend_seq_nr
// to not allow another fast resend on it again
OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
if (pkt && pkt->transmissions > 0) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Packet %u fast timeout-retry.", conn->seq_nr - conn->cur_window_packets);
#endif
#ifdef _DEBUG
++conn->_stats.fastrexmit;
#endif
conn->fast_resend_seq_nr++;
conn->send_packet(pkt);
}
}
}
}
// Process selective acknowledgment
if (selack_ptr != NULL) {
conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
}
// this invariant should always be true
assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u ",
acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets);
#endif
// In case the ack dropped the current window below
// the max_window size, mark the socket as writable
if (conn->state == CS_CONNECTED_FULL && !conn->is_full()) {
conn->state = CS_CONNECTED;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
(uint)conn->max_window, (uint)conn->cur_window, (uint)conn->get_packet_size());
#endif
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_WRITABLE);
}
if (pk_flags == ST_STATE) {
// This is a state packet only.
return 0;
}
// The connection is not in a state that can accept data?
if (conn->state != CS_CONNECTED &&
conn->state != CS_CONNECTED_FULL &&
conn->state != CS_FIN_SENT) {
return 0;
}
// Is this a finalize packet?
if (pk_flags == ST_FIN && !conn->got_fin) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Got FIN eof_pkt:%u", pk_seq_nr);
#endif
conn->got_fin = true;
conn->eof_pkt = pk_seq_nr;
// at this point, it is possible for the
// other end to have sent packets with
// sequence numbers higher than seq_nr.
// if this is the case, our reorder_count
// is out of sync. This case is dealt with
// when we re-order and hit the eof_pkt.
// we'll just ignore any packets with
// sequence numbers past this
}
// Getting an in-order packet?
if (seqnr == 0) {
size_t count = packet_end - data;
if (count > 0 && conn->state != CS_FIN_SENT) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Got Data len:%u (rb:%u)", (uint)count, (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
#endif
// Post bytes to the upper layer
utp_call_on_read(conn->ctx, conn, data, count);
}
conn->ack_nr++;
// Check if the next packet has been received too, but is waiting
// in the reorder buffer.
for (;;) {
if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
if (conn->state != CS_FIN_SENT) {
conn->state = CS_GOT_FIN;
conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Posting EOF");
#endif
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
}
// if the other end wants to close, ack
conn->send_ack();
// reorder_count is not necessarily 0 at this point.
// even though it is most of the time, the other end
// may have sent packets with higher sequence numbers
// than what later ends up being eof_pkt
// since we have received all packets up to eof_pkt
// just ignore the ones after it.
conn->reorder_count = 0;
}
// Quick get-out in case there is nothing to reorder
if (conn->reorder_count == 0)
break;
// Check if there are additional buffers in the reorder buffers
// that need delivery.
byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
if (p == NULL)
break;
conn->inbuf.put(conn->ack_nr+1, NULL);
count = *(uint*)p;
if (count > 0 && conn->state != CS_FIN_SENT) {
// Pass the bytes to the upper layer
utp_call_on_read(conn->ctx, conn, p + sizeof(uint), count);
}
conn->ack_nr++;
// Free the element from the reorder buffer
free(p);
assert(conn->reorder_count > 0);
conn->reorder_count--;
}
conn->schedule_ack();
} else {
// Getting an out of order packet.
// The packet needs to be remembered and rearranged later.
// if we have received a FIN packet, and the EOF-sequence number
// is lower than the sequence number of the packet we just received,
// something is wrong.
if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Got an invalid packet sequence number, past EOF "
"reorder_count:%u len:%u (rb:%u)",
conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
#endif
return 0;
}
// if the sequence number is entirely off the expected
// one, just drop it. We can't allocate buffer space in
// the inbuf entirely based on untrusted input
if (seqnr > 0x3ff) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "0x%08x: Got an invalid packet sequence number, too far off "
"reorder_count:%u len:%u (rb:%u)",
conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
#endif
return 0;
}
// we need to grow the circular buffer before we
// check if the packet is already in here, so that
// we don't end up looking at an older packet (since
// the indices wrap around).
conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
// Has this packet already been received? (i.e. a duplicate)
// If that is the case, just discard it.
if (conn->inbuf.get(pk_seq_nr) != NULL) {
#ifdef _DEBUG
++conn->_stats.nduprecv;
#endif
return 0;
}
// Allocate memory to fit the packet that needs to be re-ordered
byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
*(uint*)mem = (uint)(packet_end - data);
memcpy(mem + sizeof(uint), data, packet_end - data);
// Insert into reorder buffer and increment the count
// of # of packets to be reordered.
// we add one to seqnr in order to leave the last
// entry empty, so that the assert in send_ack
// stays valid. we add one to pk_seq_nr too, in order
// to make the circular buffer grow around the correct
// point (which is conn->ack_nr + 1).
assert(conn->inbuf.get(pk_seq_nr) == NULL);
assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
conn->inbuf.put(pk_seq_nr, mem);
conn->reorder_count++;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
#endif
conn->schedule_ack();
}
return (size_t)(packet_end - data);
}
inline byte UTP_Version(PacketFormatV1 const* pf)
{
return (pf->type() < ST_NUM_STATES && pf->ext < 3 ? pf->version() : 0);
}
UTPSocket::~UTPSocket()
{
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Killing socket");
#endif
utp_call_on_state_change(ctx, this, UTP_STATE_DESTROYING);
if (ctx->last_utp_socket == this) {
ctx->last_utp_socket = NULL;
}
// Remove object from the global hash table
UTPSocketKeyData* kd = ctx->utp_sockets->Delete(UTPSocketKey(addr, conn_id_recv));
assert(kd);
// remove the socket from ack_sockets if it was there also
removeSocketFromAckList(this);
// Free all memory occupied by the socket object.
for (size_t i = 0; i <= inbuf.mask; i++) {
free(inbuf.elements[i]);
}
for (size_t i = 0; i <= outbuf.mask; i++) {
free(outbuf.elements[i]);
}
// TODO: The circular buffer should have a destructor
free(inbuf.elements);
free(outbuf.elements);
}
void UTP_FreeAll(struct UTPSocketHT *utp_sockets) {
utp_hash_iterator_t it;
UTPSocketKeyData* keyData;
while ((keyData = utp_sockets->Iterate(it))) {
delete keyData->socket;
}
}
void utp_initialize_socket( utp_socket *conn,
const struct sockaddr *addr,
socklen_t addrlen,
bool need_seed_gen,
uint32 conn_seed,
uint32 conn_id_recv,
uint32 conn_id_send)
{
PackedSockAddr psaddr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen);
if (need_seed_gen) {
do {
conn_seed = utp_call_get_random(conn->ctx, conn);
// we identify v1 and higher by keeping the two high bytes of the connection id zero
conn_seed &= 0xffff;
} while (conn->ctx->utp_sockets->Lookup(UTPSocketKey(psaddr, conn_seed)));
conn_id_recv += conn_seed;
conn_id_send += conn_seed;
}
conn->state = CS_IDLE;
conn->conn_seed = conn_seed;
conn->conn_id_recv = conn_id_recv;
conn->conn_id_send = conn_id_send;
conn->addr = psaddr;
conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, NULL);
conn->last_got_packet = conn->ctx->current_ms;
conn->last_sent_packet = conn->ctx->current_ms;
conn->last_measured_delay = conn->ctx->current_ms + 0x70000000;
conn->average_sample_time = conn->ctx->current_ms + 5000;
conn->last_rwin_decay = conn->ctx->current_ms - MAX_WINDOW_DECAY;
conn->our_hist.clear(conn->ctx->current_ms);
conn->their_hist.clear(conn->ctx->current_ms);
conn->rtt_hist.clear(conn->ctx->current_ms);
// initialize MTU floor and ceiling
conn->mtu_reset();
conn->mtu_last = conn->mtu_ceiling;
conn->ctx->utp_sockets->Add(UTPSocketKey(conn->addr, conn->conn_id_recv))->socket = conn;
// we need to fit one packet in the window when we start the connection
conn->max_window = conn->get_packet_size();
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP socket initialized");
#endif
}
utp_socket* utp_create_socket(utp_context *ctx)
{
assert(ctx);
if (!ctx) return NULL;
UTPSocket *conn = new UTPSocket; // TODO: UTPSocket should have a constructor
conn->state = CS_UNINITIALIZED;
conn->ctx = ctx;
conn->userdata = NULL;
conn->reorder_count = 0;
conn->duplicate_ack = 0;
conn->timeout_seq_nr = 0;
conn->last_rcv_win = 0;
conn->got_fin = false;
conn->fast_timeout = false;
conn->rtt = 0;
conn->retransmit_timeout = 0;
conn->rto_timeout = 0;
conn->zerowindow_time = 0;
conn->average_delay = 0;
conn->current_delay_samples = 0;
conn->cur_window = 0;
conn->eof_pkt = 0;
conn->last_maxed_out_window = 0;
conn->mtu_probe_seq = 0;
conn->mtu_probe_size = 0;
conn->current_delay_sum = 0;
conn->average_delay_base = 0;
conn->retransmit_count = 0;
conn->rto = 3000;
conn->rtt_var = 800;
conn->seq_nr = 1;
conn->ack_nr = 0;
conn->max_window_user = 255 * PACKET_SIZE;
conn->cur_window_packets = 0;
conn->fast_resend_seq_nr = conn->seq_nr;
conn->target_delay = ctx->target_delay;
conn->reply_micro = 0;
conn->opt_sndbuf = ctx->opt_sndbuf;
conn->opt_rcvbuf = ctx->opt_rcvbuf;
conn->slow_start = true;
conn->ssthresh = conn->opt_sndbuf;
conn->clock_drift = 0;
conn->clock_drift_raw = 0;
conn->outbuf.mask = 15;
conn->inbuf.mask = 15;
conn->outbuf.elements = (void**)calloc(16, sizeof(void*));
conn->inbuf.elements = (void**)calloc(16, sizeof(void*));
conn->ida = -1; // set the index of every new socket in ack_sockets to
// -1, which also means it is not in ack_sockets yet
memset(conn->extensions, 0, sizeof(conn->extensions));
#ifdef _DEBUG
memset(&conn->_stats, 0, sizeof(utp_socket_stats));
#endif
return conn;
}
int utp_context_set_option(utp_context *ctx, int opt, int val)
{
assert(ctx);
if (!ctx) return -1;
switch (opt) {
case UTP_LOG_NORMAL:
ctx->log_normal = val ? true : false;
return 0;
case UTP_LOG_MTU:
ctx->log_mtu = val ? true : false;
return 0;
case UTP_LOG_DEBUG:
ctx->log_debug = val ? true : false;
return 0;
case UTP_TARGET_DELAY:
ctx->target_delay = val;
return 0;
case UTP_SNDBUF:
assert(val >= 1);
ctx->opt_sndbuf = val;
return 0;
case UTP_RCVBUF:
assert(val >= 1);
ctx->opt_rcvbuf = val;
return 0;
}
return -1;
}
int utp_context_get_option(utp_context *ctx, int opt)
{
assert(ctx);
if (!ctx) return -1;
switch (opt) {
case UTP_LOG_NORMAL: return ctx->log_normal ? 1 : 0;
case UTP_LOG_MTU: return ctx->log_mtu ? 1 : 0;
case UTP_LOG_DEBUG: return ctx->log_debug ? 1 : 0;
case UTP_TARGET_DELAY: return ctx->target_delay;
case UTP_SNDBUF: return ctx->opt_sndbuf;
case UTP_RCVBUF: return ctx->opt_rcvbuf;
}
return -1;
}
int utp_setsockopt(UTPSocket* conn, int opt, int val)
{
assert(conn);
if (!conn) return -1;
switch (opt) {
case UTP_SNDBUF:
assert(val >= 1);
conn->opt_sndbuf = val;
return 0;
case UTP_RCVBUF:
assert(val >= 1);
conn->opt_rcvbuf = val;
return 0;
case UTP_TARGET_DELAY:
conn->target_delay = val;
return 0;
}
return -1;
}
int utp_getsockopt(UTPSocket* conn, int opt)
{
assert(conn);
if (!conn) return -1;
switch (opt) {
case UTP_SNDBUF: return conn->opt_sndbuf;
case UTP_RCVBUF: return conn->opt_rcvbuf;
case UTP_TARGET_DELAY: return conn->target_delay;
}
return -1;
}
// Try to connect to a specified host.
int utp_connect(utp_socket *conn, const struct sockaddr *to, socklen_t tolen)
{
assert(conn);
if (!conn) return -1;
assert(conn->state == CS_UNINITIALIZED);
if (conn->state != CS_UNINITIALIZED) {
conn->state = CS_DESTROY;
return -1;
}
utp_initialize_socket(conn, to, tolen, true, 0, 0, 1);
assert(conn->cur_window_packets == 0);
assert(conn->outbuf.get(conn->seq_nr) == NULL);
assert(sizeof(PacketFormatV1) == 20);
conn->state = CS_SYN_SENT;
conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
// Create and send a connect message
// used in parse_log.py
conn->log(UTP_LOG_NORMAL, "UTP_Connect conn_seed:%u packet_size:%u (B) "
"target_delay:%u (ms) delay_history:%u "
"delay_base_history:%u (minutes)",
conn->conn_seed, PACKET_SIZE, conn->target_delay / 1000,
CUR_DELAY_SIZE, DELAY_BASE_HISTORY);
// Set up the initial timeout timer.
conn->retransmit_timeout = 3000;
conn->rto_timeout = conn->ctx->current_ms + conn->retransmit_timeout;
conn->last_rcv_win = conn->get_rcv_window();
// if you need compatibility with 1.8.1, use this. it increases attackability though.
//conn->seq_nr = 1;
conn->seq_nr = utp_call_get_random(conn->ctx, conn);
// Create the connect packet.
const size_t header_size = sizeof(PacketFormatV1);
OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_size);
PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
memset(p1, 0, header_size);
// SYN packets are special, and have the receive ID in the connid field,
// instead of conn_id_send.
p1->set_version(1);
p1->set_type(ST_SYN);
p1->ext = 0;
p1->connid = conn->conn_id_recv;
p1->windowsize = (uint32)conn->last_rcv_win;
p1->seq_nr = conn->seq_nr;
pkt->transmissions = 0;
pkt->length = header_size;
pkt->payload = 0;
/*
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Sending connect %s [%u].",
addrfmt(conn->addr, addrbuf), conn_seed);
#endif
*/
// Remember the message in the outgoing queue.
conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets);
conn->outbuf.put(conn->seq_nr, pkt);
conn->seq_nr++;
conn->cur_window_packets++;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "incrementing cur_window_packets:%u", conn->cur_window_packets);
#endif
conn->send_packet(pkt);
return 0;
}
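/*
Hedged usage sketch (illustration only, not part of this file): a minimal
outgoing connection, assuming a utp_context whose UTP_SENDTO and
UTP_ON_STATE_CHANGE callbacks are already registered via utp_set_callback().
`peer`, `peer_len` and `my_sendto` are hypothetical application names.

	utp_context *ctx = utp_init(2);
	// ... utp_set_callback(ctx, UTP_SENDTO, &my_sendto); etc ...
	utp_socket *s = utp_create_socket(ctx);
	utp_connect(s, (struct sockaddr*)&peer, peer_len);
	// the SYN goes out through the UTP_SENDTO callback; the socket becomes
	// usable once UTP_ON_CONNECT / UTP_STATE_CONNECT fires.
*/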
// Returns 1 if the UDP payload was recognized as a UTP packet, or 0 if it was not
int utp_process_udp(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
assert(ctx);
if (!ctx) return 0;
assert(buffer);
if (!buffer) return 0;
assert(to);
if (!to) return 0;
const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
if (len < sizeof(PacketFormatV1)) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len);
#endif
return 0;
}
const PacketFormatV1 *pf1 = (PacketFormatV1*)buffer;
const byte version = UTP_Version(pf1);
const uint32 id = uint32(pf1->connid);
if (version != 1) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u version:%u unsupported version", addrfmt(addr, addrbuf), (uint)len, version);
#endif
return 0;
}
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id);
ctx->log(UTP_LOG_DEBUG, NULL, "recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr);
#endif
const byte flags = pf1->type();
if (flags == ST_RESET) {
// id is either our recv id or our send id
// if it's our send id, and we initiated the connection, our recv id is id + 1
// if it's our send id, and we did not initiate the connection, our recv id is id - 1
// we have to check every case
UTPSocketKeyData* keyData;
if ( (keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id))) ||
((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1))) && keyData->socket->conn_id_send == id) ||
((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id - 1))) && keyData->socket->conn_id_send == id))
{
UTPSocket* conn = keyData->socket;
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv RST for existing connection");
#endif
if (conn->state == CS_FIN_SENT)
conn->state = CS_DESTROY;
else
conn->state = CS_RESET;
utp_call_on_overhead_statistics(conn->ctx, conn, false, len + conn->get_udp_overhead(), close_overhead);
const int err = (conn->state == CS_SYN_SENT) ? UTP_ECONNREFUSED : UTP_ECONNRESET;
utp_call_on_error(conn->ctx, conn, err);
}
else {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv RST for unknown connection");
#endif
}
return 1;
}
else if (flags != ST_SYN) {
UTPSocket* conn = NULL;
if (ctx->last_utp_socket && ctx->last_utp_socket->addr == addr && ctx->last_utp_socket->conn_id_recv == id) {
conn = ctx->last_utp_socket;
} else {
UTPSocketKeyData* keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id));
if (keyData) {
conn = keyData->socket;
ctx->last_utp_socket = conn;
}
}
if (conn) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv processing");
#endif
const size_t read = utp_process_incoming(conn, buffer, len);
utp_call_on_overhead_statistics(conn->ctx, conn, false, (len - read) + conn->get_udp_overhead(), header_overhead);
return 1;
}
}
// We have not found a matching utp_socket, and this isn't a SYN. Reject it.
const uint32 seq_nr = pf1->seq_nr;
if (flags != ST_SYN) {
ctx->current_ms = utp_call_get_milliseconds(ctx, NULL);
for (size_t i = 0; i < ctx->rst_info.GetCount(); i++) {
if ((ctx->rst_info[i].connid == id) &&
(ctx->rst_info[i].addr == addr) &&
(ctx->rst_info[i].ack_nr == seq_nr))
{
ctx->rst_info[i].timestamp = ctx->current_ms;
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv not sending RST to non-SYN (stored)");
#endif
return 1;
}
}
if (ctx->rst_info.GetCount() > RST_INFO_LIMIT) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv not sending RST to non-SYN (limit at %u stored)", (uint)ctx->rst_info.GetCount());
#endif
return 1;
}
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv send RST to non-SYN (%u stored)", (uint)ctx->rst_info.GetCount());
#endif
RST_Info &r = ctx->rst_info.Append();
r.addr = addr;
r.connid = id;
r.ack_nr = seq_nr;
r.timestamp = ctx->current_ms;
UTPSocket::send_rst(ctx, addr, id, seq_nr, utp_call_get_random(ctx, NULL));
return 1;
}
if (ctx->callbacks[UTP_ON_ACCEPT]) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "Incoming connection from %s", addrfmt(addr, addrbuf));
#endif
UTPSocketKeyData* keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1));
if (keyData) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, connection already exists");
#endif
return 1;
}
if (ctx->utp_sockets->GetCount() > 3000) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, too many uTP sockets %d", ctx->utp_sockets->GetCount());
#endif
return 1;
}
// true means yes, block connection. false means no, don't block.
if (utp_call_on_firewall(ctx, to, tolen)) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, firewall callback returned true");
#endif
return 1;
}
// Create a new UTP socket to handle this new connection
UTPSocket *conn = utp_create_socket(ctx);
utp_initialize_socket(conn, to, tolen, false, id, id+1, id);
conn->ack_nr = seq_nr;
conn->seq_nr = utp_call_get_random(ctx, NULL);
conn->fast_resend_seq_nr = conn->seq_nr;
conn->state = CS_SYN_RECV;
const size_t read = utp_process_incoming(conn, buffer, len, true);
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "recv send connect ACK");
#endif
conn->send_ack(true);
utp_call_on_accept(ctx, conn, to, tolen);
// we report overhead after on_accept(), because the callbacks are set up now
utp_call_on_overhead_statistics(conn->ctx, conn, false, (len - read) + conn->get_udp_overhead(), header_overhead); // SYN
utp_call_on_overhead_statistics(conn->ctx, conn, true, conn->get_overhead(), ack_overhead); // SYNACK
}
else {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, UTP_ON_ACCEPT callback not set");
#endif
}
return 1;
}
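/*
Hedged usage sketch (illustration only): the application owns the UDP socket
and feeds every received datagram to utp_process_udp(), then drains deferred
ACKs once the socket is empty. `fd`, `ctx` and the buffer size are
hypothetical.

	unsigned char buf[4096];
	struct sockaddr_storage from;
	socklen_t fromlen = sizeof(from);
	ssize_t n;
	while ((n = recvfrom(fd, buf, sizeof(buf), MSG_DONTWAIT,
			(struct sockaddr*)&from, &fromlen)) > 0) {
		utp_process_udp(ctx, buf, n, (struct sockaddr*)&from, fromlen);
		fromlen = sizeof(from);
	}
	utp_issue_deferred_acks(ctx);	// flushes ACKs scheduled above
*/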
// Called by utp_process_icmp_fragmentation() and utp_process_icmp_error() below
static UTPSocket* parse_icmp_payload(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
assert(ctx);
if (!ctx) return NULL;
assert(buffer);
if (!buffer) return NULL;
assert(to);
if (!to) return NULL;
const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
// ICMP packets are only required to quote the first 8 bytes of the layer-4
// payload. The UDP header is 8 bytes, and the UTP header is another 20
// bytes. So, in order to find the entire UTP header, we need the ICMP
// packet to quote 28 bytes.
if (len < sizeof(PacketFormatV1)) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: runt length %d", addrfmt(addr, addrbuf), len);
#endif
return NULL;
}
const PacketFormatV1 *pf = (PacketFormatV1*)buffer;
const byte version = UTP_Version(pf);
const uint32 id = uint32(pf->connid);
if (version != 1) {
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: not UTP version 1", addrfmt(addr, addrbuf));
#endif
return NULL;
}
UTPSocketKeyData* keyData;
if ( (keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id))) ||
((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1))) && keyData->socket->conn_id_send == id) ||
((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id - 1))) && keyData->socket->conn_id_send == id))
{
return keyData->socket;
}
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: No matching connection found for id %u", addrfmt(addr, addrbuf), id);
#endif
return NULL;
}
// Should be called when an ICMP Type 3, Code 4 packet (fragmentation needed) is received, to adjust the MTU
//
// Returns 1 if the UDP payload (delivered in the ICMP packet) was recognized as a UTP packet, or 0 if it was not
//
// @ctx: utp_context
// @buf: Contents of the original UDP payload, which the ICMP packet quoted. *Not* the ICMP packet itself.
// @len: buffer length
// @to: destination address of the original UDP packet
// @tolen: address length
// @next_hop_mtu: the next-hop MTU field quoted in the ICMP packet
int utp_process_icmp_fragmentation(utp_context *ctx, const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen, uint16 next_hop_mtu)
{
UTPSocket* conn = parse_icmp_payload(ctx, buffer, len, to, tolen);
if (!conn) return 0;
// Constrain the next_hop_mtu to sane values. It might not be initialized or sent properly
if (next_hop_mtu >= 576 && next_hop_mtu < 0x2000) {
conn->mtu_ceiling = min<uint32>(next_hop_mtu, conn->mtu_ceiling);
conn->mtu_search_update();
// this is something of a special case, where we don't set mtu_last
// to the value in between the floor and the ceiling. We can't update the
// floor, because there might be more network segments after the one
// that sent this ICMP with smaller MTUs. But we want to test this
// MTU size first. If the next probe gets through, mtu_floor is updated
conn->mtu_last = conn->mtu_ceiling;
} else {
// Otherwise, binary search. At this point we don't actually know
// what size the packet that failed was, and apparently we can't
// trust the next hop mtu either. It seems reasonably conservative
// to just lower the ceiling. This should not happen on working networks
// anyway.
conn->mtu_ceiling = (conn->mtu_floor + conn->mtu_ceiling) / 2;
conn->mtu_search_update();
}
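// Illustrative arithmetic for the binary-search branch above: with
// mtu_floor == 576 and mtu_ceiling == 1452, an untrusted next_hop_mtu
// halves the search range to a new ceiling of (576 + 1452) / 2 == 1014.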
conn->log(UTP_LOG_MTU, "MTU [ICMP] floor:%d ceiling:%d current:%d", conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
return 1;
}
// Should be called when an ICMP message is received that should tear down the connection.
//
// Returns 1 if the UDP payload (delivered in the ICMP packet) was recognized as a UTP packet, or 0 if it was not
//
// @ctx: utp_context
// @buf: Contents of the original UDP payload, which the ICMP packet quoted. *Not* the ICMP packet itself.
// @len: buffer length
// @to: destination address of the original UDP packet
// @tolen: address length
int utp_process_icmp_error(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
UTPSocket* conn = parse_icmp_payload(ctx, buffer, len, to, tolen);
if (!conn) return 0;
const int err = (conn->state == CS_SYN_SENT) ? UTP_ECONNREFUSED : UTP_ECONNRESET;
const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
switch(conn->state) {
// Don't pass on errors for idle/closed connections
case CS_IDLE:
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s in state CS_IDLE, ignoring", addrfmt(addr, addrbuf));
#endif
return 1;
case CS_FIN_SENT:
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s in state CS_FIN_SENT, setting state to CS_DESTROY and causing error %d", addrfmt(addr, addrbuf), err);
#endif
conn->state = CS_DESTROY;
break;
default:
#if UTP_DEBUG_LOGGING
ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s, setting state to CS_RESET and causing error %d", addrfmt(addr, addrbuf), err);
#endif
conn->state = CS_RESET;
break;
}
utp_call_on_error(conn->ctx, conn, err);
return 1;
}
// Write bytes to the UTP socket. Returns the number of bytes written.
// 0 indicates the socket is no longer writable, -1 indicates an error
ssize_t utp_writev(utp_socket *conn, struct utp_iovec *iovec_input, size_t num_iovecs)
{
static utp_iovec iovec[UTP_IOV_MAX];
assert(conn);
if (!conn) return -1;
assert(iovec_input);
if (!iovec_input) return -1;
assert(num_iovecs);
if (!num_iovecs) return -1;
if (num_iovecs > UTP_IOV_MAX)
num_iovecs = UTP_IOV_MAX;
memcpy(iovec, iovec_input, sizeof(struct utp_iovec)*num_iovecs);
size_t bytes = 0;
size_t sent = 0;
for (size_t i = 0; i < num_iovecs; i++)
bytes += iovec[i].iov_len;
#if UTP_DEBUG_LOGGING
size_t param = bytes;
#endif
if (conn->state != CS_CONNECTED) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = false (not CS_CONNECTED)", (uint)bytes);
#endif
return 0;
}
conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
// don't send unless it will all fit in the window
size_t packet_size = conn->get_packet_size();
size_t num_to_send = min<size_t>(bytes, packet_size);
while (!conn->is_full(num_to_send)) {
// Send an outgoing packet.
// Also add it to the outgoing queue of packets that have been sent but not ACKed.
bytes -= num_to_send;
sent += num_to_send;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u cur_window_packets:%u",
conn->seq_nr, conn->ack_nr,
(uint)(conn->cur_window + num_to_send),
(uint)conn->max_window, (uint)conn->max_window_user,
(uint)conn->last_rcv_win, num_to_send,
conn->cur_window_packets);
#endif
conn->write_outgoing_packet(num_to_send, ST_DATA, iovec, num_iovecs);
num_to_send = min<size_t>(bytes, packet_size);
if (num_to_send == 0) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = true", (uint)param);
#endif
return sent;
}
}
bool full = conn->is_full();
if (full) {
// mark the socket as not being writable.
conn->state = CS_CONNECTED_FULL;
}
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = %s", (uint)bytes, full ? "false" : "true");
#endif
// returns whether or not the socket is still writable
// if the congestion window is not full, we can still write to it
//return !full;
return sent;
}
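/*
Hedged usage sketch (illustration only): pushing one application buffer
through the iovec interface. `payload` and `payload_len` are hypothetical.

	struct utp_iovec iov;
	iov.iov_base = payload;
	iov.iov_len = payload_len;
	ssize_t sent = utp_writev(s, &iov, 1);
	if (sent >= 0 && (size_t)sent < payload_len) {
		// window is full; resume writing the remainder when the
		// UTP_STATE_WRITABLE state change is delivered
	}
*/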
void utp_read_drained(utp_socket *conn)
{
assert(conn);
if (!conn) return;
assert(conn->state != CS_UNINITIALIZED);
if (conn->state == CS_UNINITIALIZED) return;
const size_t rcvwin = conn->get_rcv_window();
if (rcvwin > conn->last_rcv_win) {
// If the last window was 0, send an ACK immediately; otherwise schedule one
if (conn->last_rcv_win == 0) {
conn->send_ack();
} else {
conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
conn->schedule_ack();
}
}
}
// Should be called each time the UDP socket is drained
void utp_issue_deferred_acks(utp_context *ctx)
{
assert(ctx);
if (!ctx) return;
for (size_t i = 0; i < ctx->ack_sockets.GetCount(); i++) {
UTPSocket *conn = ctx->ack_sockets[i];
conn->send_ack();
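// send_ack() removes the socket from ack_sockets, shifting the
// remaining entries down; step the index back so the loop does not
// skip the entry that just moved into slot i.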
i--;
}
}
// Should be called every 500ms
void utp_check_timeouts(utp_context *ctx)
{
assert(ctx);
if (!ctx) return;
ctx->current_ms = utp_call_get_milliseconds(ctx, NULL);
if (ctx->current_ms - ctx->last_check < TIMEOUT_CHECK_INTERVAL)
return;
ctx->last_check = ctx->current_ms;
for (size_t i = 0; i < ctx->rst_info.GetCount(); i++) {
if ((int)(ctx->current_ms - ctx->rst_info[i].timestamp) >= RST_INFO_TIMEOUT) {
ctx->rst_info.MoveUpLast(i);
i--;
}
}
if (ctx->rst_info.GetCount() != ctx->rst_info.GetAlloc()) {
ctx->rst_info.Compact();
}
utp_hash_iterator_t it;
UTPSocketKeyData* keyData;
while ((keyData = ctx->utp_sockets->Iterate(it))) {
UTPSocket *conn = keyData->socket;
conn->check_timeouts();
// Check if check_timeouts() marked the socket for destruction
if (conn->state == CS_DESTROY) {
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "Destroying");
#endif
delete conn;
}
}
}
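/*
Hedged usage sketch (illustration only): a typical driving loop, where
poll_udp_and_dispatch() is a hypothetical application routine that waits up
to 500 ms for datagrams and feeds them to utp_process_udp().

	for (;;) {
		poll_udp_and_dispatch(ctx, 500);	// timeout in milliseconds
		utp_check_timeouts(ctx);		// rate-limited internally
	}
*/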
int utp_getpeername(utp_socket *conn, struct sockaddr *addr, socklen_t *addrlen)
{
assert(addr);
if (!addr) return -1;
assert(addrlen);
if (!addrlen) return -1;
assert(conn);
if (!conn) return -1;
assert(conn->state != CS_UNINITIALIZED);
if (conn->state == CS_UNINITIALIZED) return -1;
socklen_t len;
const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len);
*addrlen = min(len, *addrlen);
memcpy(addr, &sa, *addrlen);
return 0;
}
int utp_get_delays(UTPSocket *conn, uint32 *ours, uint32 *theirs, uint32 *age)
{
assert(conn);
if (!conn) return -1;
assert(conn->state != CS_UNINITIALIZED);
if (conn->state == CS_UNINITIALIZED) {
if (ours) *ours = 0;
if (theirs) *theirs = 0;
if (age) *age = 0;
return -1;
}
if (ours) *ours = conn->our_hist.get_value();
if (theirs) *theirs = conn->their_hist.get_value();
if (age) *age = (uint32)(conn->ctx->current_ms - conn->last_measured_delay);
return 0;
}
// Close the UTP socket.
// It is not valid for the upper layer to refer to the socket after it is closed.
// The implementation keeps trying to deliver pending data after the close.
void utp_close(UTPSocket *conn)
{
assert(conn);
if (!conn) return;
assert(conn->state != CS_UNINITIALIZED
&& conn->state != CS_DESTROY_DELAY
&& conn->state != CS_FIN_SENT
&& conn->state != CS_DESTROY);
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "UTP_Close in state:%s", statenames[conn->state]);
#endif
switch(conn->state) {
case CS_CONNECTED:
case CS_CONNECTED_FULL:
conn->state = CS_FIN_SENT;
conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
break;
case CS_SYN_SENT:
conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
// fall through
case CS_GOT_FIN:
conn->state = CS_DESTROY_DELAY;
break;
case CS_SYN_RECV:
// fall through
default:
conn->state = CS_DESTROY;
break;
}
}
utp_context* utp_get_context(utp_socket *socket) {
assert(socket);
return socket ? socket->ctx : NULL;
}
void* utp_set_userdata(utp_socket *socket, void *userdata) {
assert(socket);
if (socket) socket->userdata = userdata;
return socket ? socket->userdata : NULL;
}
void* utp_get_userdata(utp_socket *socket) {
assert(socket);
return socket ? socket->userdata : NULL;
}
void struct_utp_context::log(int level, utp_socket *socket, char const *fmt, ...)
{
if (!would_log(level)) {
return;
}
va_list va;
va_start(va, fmt);
log_unchecked(socket, fmt, va);
va_end(va);
}
void struct_utp_context::log_unchecked(utp_socket *socket, char const *fmt, ...)
{
va_list va;
char buf[4096];
va_start(va, fmt);
vsnprintf(buf, 4096, fmt, va);
buf[4095] = '\0';
va_end(va);
utp_call_log(this, socket, (const byte *)buf);
}
inline bool struct_utp_context::would_log(int level)
{
if (level == UTP_LOG_NORMAL) return log_normal;
if (level == UTP_LOG_MTU) return log_mtu;
if (level == UTP_LOG_DEBUG) return log_debug;
return true;
}
utp_socket_stats* utp_get_stats(utp_socket *socket)
{
#ifdef _DEBUG
assert(socket);
if (!socket) return NULL;
socket->_stats.mtu_guess = socket->mtu_last ? socket->mtu_last : socket->mtu_ceiling;
return &socket->_stats;
#else
return NULL;
#endif
}