]> git.localhorst.tv Git - blank.git/blob - src/net/chunk.cpp
more packet introspection from payload
[blank.git] / src / net / chunk.cpp
1 #include "ChunkReceiver.hpp"
2 #include "ChunkTransmission.hpp"
3 #include "ChunkTransmitter.hpp"
4
5 #include "ClientConnection.hpp"
6 #include "Packet.hpp"
7 #include "../world/Chunk.hpp"
8 #include "../world/ChunkStore.hpp"
9
10 #include <iostream>
11 #include <zlib.h>
12 #include <glm/gtx/io.hpp>
13
14 using namespace std;
15
16
17 namespace blank {
18
19 ChunkReceiver::ChunkReceiver(ChunkStore &store)
20 : store(store)
21 , transmissions()
22 , timer(5000) {
23         timer.Start();
24 }
25
26 ChunkReceiver::~ChunkReceiver() {
27
28 }
29
30 void ChunkReceiver::Update(int dt) {
31         timer.Update(dt);
32         for (ChunkTransmission &trans : transmissions) {
33                 if (trans.active && (timer.Elapsed() - trans.last_update) > timer.Interval()) {
34                         cout << "timeout for transmission of chunk " << trans.coords << endl;
35                         trans.Clear();
36                 }
37         }
38         if (transmissions.size() > 3) {
39                 for (auto iter = transmissions.begin(), end = transmissions.end(); iter != end; ++iter) {
40                         if (!iter->active) {
41                                 transmissions.erase(iter);
42                                 break;
43                         }
44                 }
45         }
46 }
47
48 void ChunkReceiver::Handle(const Packet::ChunkBegin &pack) {
49         uint32_t id;
50         pack.ReadTransmissionId(id);
51         ChunkTransmission &trans = GetTransmission(id);
52         pack.ReadFlags(trans.flags);
53         pack.ReadChunkCoords(trans.coords);
54         pack.ReadDataSize(trans.data_size);
55         trans.last_update = timer.Elapsed();
56         trans.header_received = true;
57         Commit(trans);
58 }
59
60 void ChunkReceiver::Handle(const Packet::ChunkData &pack) {
61         uint32_t id, pos, size;
62         pack.ReadTransmissionId(id);
63         pack.ReadDataOffset(pos);
64         if (pos >= sizeof(ChunkTransmission::buffer)) {
65                 cout << "received chunk data offset outside of buffer size" << endl;
66                 return;
67         }
68         pack.ReadDataSize(size);
69         ChunkTransmission &trans = GetTransmission(id);
70         size_t len = min(size_t(size), sizeof(ChunkTransmission::buffer) - pos);
71         pack.ReadData(&trans.buffer[pos], len);
72         // TODO: this method breaks when a packet arrives twice
73         trans.data_received += len;
74         trans.last_update = timer.Elapsed();
75         Commit(trans);
76 }
77
78 ChunkTransmission &ChunkReceiver::GetTransmission(uint32_t id) {
79         // search for ongoing
80         for (ChunkTransmission &trans : transmissions) {
81                 if (trans.active && trans.id == id) {
82                         return trans;
83                 }
84         }
85         // search for unused
86         for (ChunkTransmission &trans : transmissions) {
87                 if (!trans.active) {
88                         trans.active = true;
89                         trans.id = id;
90                         return trans;
91                 }
92         }
93         // allocate new
94         transmissions.emplace_back();
95         transmissions.back().active = true;
96         transmissions.back().id = id;
97         return transmissions.back();
98 }
99
100 void ChunkReceiver::Commit(ChunkTransmission &trans) {
101         if (!trans.Complete()) return;
102
103         Chunk *chunk = store.Allocate(trans.coords);
104         if (!chunk) {
105                 // chunk no longer of interes, just drop the data
106                 // it should probably be cached to disk, but not now :P
107                 trans.Clear();
108                 return;
109         }
110
111         const Byte *src = &trans.buffer[0];
112         uLong src_len = min(size_t(trans.data_size), sizeof(ChunkTransmission::buffer));
113         Byte *dst = reinterpret_cast<Byte *>(chunk->BlockData());
114         uLong dst_len = Chunk::BlockSize();
115
116         if (trans.Compressed()) {
117                 if (uncompress(dst, &dst_len, src, src_len) != Z_OK) {
118                         // omg, now what?
119                         cout << "got corruped chunk data for " << trans.coords << endl;
120                 }
121         } else {
122                 memcpy(dst, src, min(src_len, dst_len));
123         }
124         trans.Clear();
125 }
126
127 ChunkTransmission::ChunkTransmission()
128 : id(0)
129 , flags(0)
130 , coords()
131 , data_size(0)
132 , data_received(0)
133 , last_update(0)
134 , header_received(false)
135 , active(false)
136 , buffer() {
137
138 }
139
140 void ChunkTransmission::Clear() noexcept {
141         data_size = 0;
142         data_received = 0;
143         last_update = 0;
144         header_received = false;
145         active = false;
146 }
147
148 bool ChunkTransmission::Complete() const noexcept {
149         return header_received && data_received == data_size;
150 }
151
152 bool ChunkTransmission::Compressed() const noexcept {
153         return flags & 1;
154 }
155
156
157 ChunkTransmitter::ChunkTransmitter(ClientConnection &conn)
158 : conn(conn)
159 , current(nullptr)
160 , buffer_size(Chunk::BlockSize() + 10)
161 , buffer(new uint8_t[buffer_size])
162 , buffer_len(0)
163 , packet_len(Packet::ChunkData::MAX_DATA_LEN)
164 , cursor(0)
165 , num_packets(0)
166 , begin_packet(-1)
167 , data_packets()
168 , confirm_wait(0)
169 , trans_id(0)
170 , compressed(false) {
171
172 }
173
174 ChunkTransmitter::~ChunkTransmitter() {
175         Abort();
176 }
177
178 bool ChunkTransmitter::Idle() const noexcept {
179         return !Transmitting() && !Waiting();
180 }
181
182 bool ChunkTransmitter::Transmitting() const noexcept {
183         return cursor < num_packets;
184 }
185
186 void ChunkTransmitter::Transmit() {
187         if (cursor < num_packets) {
188                 SendData(cursor);
189                 ++cursor;
190         }
191 }
192
193 bool ChunkTransmitter::Waiting() const noexcept {
194         return confirm_wait > 0;
195 }
196
197 void ChunkTransmitter::Ack(uint16_t seq) {
198         if (!Waiting()) {
199                 return;
200         }
201         if (seq == begin_packet) {
202                 begin_packet = -1;
203                 --confirm_wait;
204                 if (Idle()) {
205                         Release();
206                 }
207                 return;
208         }
209         for (int i = 0, end = data_packets.size(); i < end; ++i) {
210                 if (seq == data_packets[i]) {
211                         data_packets[i] = -1;
212                         --confirm_wait;
213                         if (Idle()) {
214                                 Release();
215                         }
216                         return;
217                 }
218         }
219 }
220
221 void ChunkTransmitter::Nack(uint16_t seq) {
222         if (!Waiting()) {
223                 return;
224         }
225         if (seq == begin_packet) {
226                 SendBegin();
227                 return;
228         }
229         for (size_t i = 0, end = data_packets.size(); i < end; ++i) {
230                 if (seq == data_packets[i]) {
231                         SendData(i);
232                         return;
233                 }
234         }
235 }
236
237 void ChunkTransmitter::Abort() {
238         if (!current) return;
239
240         Release();
241
242         begin_packet = -1;
243         data_packets.clear();
244         confirm_wait = 0;
245 }
246
247 void ChunkTransmitter::Send(Chunk &chunk) {
248         // abort current chunk, if any
249         Abort();
250
251         current = &chunk;
252         current->Ref();
253
254         // load new chunk data
255         compressed = true;
256         buffer_len = buffer_size;
257         if (compress(buffer.get(), &buffer_len, reinterpret_cast<const Bytef *>(chunk.BlockData()), Chunk::BlockSize()) != Z_OK) {
258                 // compression failed, send it uncompressed
259                 buffer_len = Chunk::BlockSize();
260                 memcpy(buffer.get(), chunk.BlockData(), buffer_len);
261                 compressed = false;
262         }
263         cursor = 0;
264         num_packets = (buffer_len / packet_len) + (buffer_len % packet_len != 0);
265         data_packets.resize(num_packets, -1);
266
267         ++trans_id;
268         SendBegin();
269 }
270
271 void ChunkTransmitter::SendBegin() {
272         uint32_t flags = compressed;
273         auto pack = conn.Prepare<Packet::ChunkBegin>();
274         pack.WriteTransmissionId(trans_id);
275         pack.WriteFlags(flags);
276         pack.WriteChunkCoords(current->Position());
277         pack.WriteDataSize(buffer_len);
278         if (begin_packet == -1) {
279                 ++confirm_wait;
280         }
281         begin_packet = conn.Send();
282 }
283
284 void ChunkTransmitter::SendData(size_t i) {
285         int pos = i * packet_len;
286         int len = min(packet_len, buffer_len - pos);
287         const uint8_t *data = &buffer[pos];
288
289         auto pack = conn.Prepare<Packet::ChunkData>();
290         pack.WriteTransmissionId(trans_id);
291         pack.WriteDataOffset(pos);
292         pack.WriteDataSize(len);
293         pack.WriteData(data, len);
294
295         if (data_packets[i] == -1) {
296                 ++confirm_wait;
297         }
298         data_packets[i] = conn.Send();
299 }
300
301 void ChunkTransmitter::Release() {
302         if (current) {
303                 current->UnRef();
304                 current = nullptr;
305         }
306 }
307
308 }