1 #include "ChunkReceiver.hpp"
2 #include "ChunkTransmission.hpp"
3 #include "ChunkTransmitter.hpp"
5 #include "ClientConnection.hpp"
7 #include "../world/Chunk.hpp"
8 #include "../world/ChunkStore.hpp"
12 #include <glm/gtx/io.hpp>
// Constructs a receiver that works against the given chunk store.
// Commit() later asks this store for the chunk to write received data into.
19 ChunkReceiver::ChunkReceiver(ChunkStore &store)
// Destroys the receiver.
26 ChunkReceiver::~ChunkReceiver() {
// Periodic maintenance of the pending chunk transmissions.
// dt: elapsed time since the previous call
// (NOTE(review): unit is presumably milliseconds — confirm against the timer type).
30 void ChunkReceiver::Update(int dt) {
// Pass 1: report every active transmission whose last update lies more than
// one timer interval in the past.
32 for (ChunkTransmission &trans : transmissions) {
33 if (trans.active && (timer.Elapsed() - trans.last_update) > timer.Interval()) {
34 cout << "timeout for transmission of chunk " << trans.coords << endl;
// Pass 2: only once more than 3 slots exist, walk the container looking for
// a slot to erase.
38 if (transmissions.size() > 3) {
39 for (auto iter = transmissions.begin(), end = transmissions.end(); iter != end; ++iter) {
// NOTE(review): for standard containers erase() invalidates `iter` (and for
// vector-like containers the cached `end` too), so the loop must be left
// immediately after this call — confirm.
41 transmissions.erase(iter);
// Handles the header packet of a chunk transmission: looks up (or creates)
// the transmission slot for the packet's id and stores the flags, chunk
// coordinates and announced data size read from the packet.
48 void ChunkReceiver::Handle(const Packet::ChunkBegin &pack) {
50 pack.ReadTransmissionId(id);
51 ChunkTransmission &trans = GetTransmission(id);
52 pack.ReadFlags(trans.flags);
53 pack.ReadChunkCoords(trans.coords);
54 pack.ReadDataSize(trans.data_size);
// Refresh the timestamp that Update() compares against to detect timeouts.
55 trans.last_update = timer.Elapsed();
// ChunkTransmission::Complete() requires this flag to be set.
56 trans.header_received = true;
// Handles a data packet of a chunk transmission: copies the packet's payload
// into the transmission's buffer at the offset stated in the packet.
60 void ChunkReceiver::Handle(const Packet::ChunkData &pack) {
61 uint32_t id, pos, size;
62 pack.ReadTransmissionId(id);
63 pack.ReadDataOffset(pos);
// An offset at or beyond sizeof(ChunkTransmission::buffer) cannot be stored;
// it is reported on stdout.
64 if (pos >= sizeof(ChunkTransmission::buffer)) {
65 cout << "received chunk data offset outside of buffer size" << endl;
68 pack.ReadDataSize(size);
69 ChunkTransmission &trans = GetTransmission(id);
// Clamp the copy length so the write never runs past the end of the buffer
// (pos is below the buffer size here, so the subtraction cannot wrap).
70 size_t len = min(size_t(size), sizeof(ChunkTransmission::buffer) - pos);
71 pack.ReadData(&trans.buffer[pos], len);
72 // TODO: this method breaks when a packet arrives twice
// (data_received is incremented unconditionally, so a duplicate is counted
// twice and the equality test in Complete() can then never succeed)
73 trans.data_received += len;
// Refresh the timestamp that Update() compares against to detect timeouts.
74 trans.last_update = timer.Elapsed();
// Returns the transmission slot associated with `id`, creating one if needed.
78 ChunkTransmission &ChunkReceiver::GetTransmission(uint32_t id) {
// First pass: look for an active transmission that already carries this id.
80 for (ChunkTransmission &trans : transmissions) {
81 if (trans.active && trans.id == id) {
// Second pass over the existing slots.
// NOTE(review): presumably claims an inactive slot for reuse — confirm.
86 for (ChunkTransmission &trans : transmissions) {
// Fallback: append a fresh slot, mark it active and bind it to `id`.
94 transmissions.emplace_back();
95 transmissions.back().active = true;
96 transmissions.back().id = id;
97 return transmissions.back();
// Writes a completely received transmission into the block data of a chunk
// obtained from the store.
100 void ChunkReceiver::Commit(ChunkTransmission &trans) {
// Nothing to do until the header and all announced bytes have arrived.
101 if (!trans.Complete()) return;
103 Chunk *chunk = store.Allocate(trans.coords);
105 // chunk no longer of interest, just drop the data
106 // it should probably be cached to disk, but not now :P
111 const Byte *src = &trans.buffer[0];
// Never read more than the receive buffer holds, regardless of the size the
// header announced.
112 uLong src_len = min(size_t(trans.data_size), sizeof(ChunkTransmission::buffer));
113 Byte *dst = reinterpret_cast<Byte *>(chunk->BlockData());
114 uLong dst_len = Chunk::BlockSize();
116 if (trans.Compressed()) {
// zlib uncompress(): dst_len is in/out — destination capacity on entry,
// number of bytes actually written on return. Anything but Z_OK is an error.
117 if (uncompress(dst, &dst_len, src, src_len) != Z_OK) {
119 cout << "got corruped chunk data for " << trans.coords << endl;
// Raw copy, bounded by the smaller of the source and destination sizes.
122 memcpy(dst, src, min(src_len, dst_len));
// Constructs a transmission slot; no header has been received yet.
127 ChunkTransmission::ChunkTransmission()
134 , header_received(false)
// Resets the transmission state. Never throws.
140 void ChunkTransmission::Clear() noexcept {
// With the header flag cleared, Complete() reports false until
// ChunkReceiver::Handle(ChunkBegin) sets it again.
144 header_received = false;
// Returns true once the header has been received and the number of payload
// bytes received equals the size announced in that header.
// The comparison is exact: an over-count (e.g. from a duplicated data packet)
// also yields false.
148 bool ChunkTransmission::Complete() const noexcept {
149 return header_received && data_received == data_size;
// Reports whether the payload is compressed; ChunkReceiver::Commit() calls
// zlib uncompress() on the buffer when this returns true.
// NOTE(review): presumably derived from `flags` — confirm.
152 bool ChunkTransmission::Compressed() const noexcept {
// Constructs a transmitter that sends chunk data over the given connection.
157 ChunkTransmitter::ChunkTransmitter(ClientConnection &conn)
// Staging buffer: one chunk's block data plus 10 extra bytes.
// NOTE(review): presumably headroom for compressed output that does not
// shrink; zlib's compressBound() gives the exact worst case — confirm.
160 , buffer_size(Chunk::BlockSize() + 10)
161 , buffer(new uint8_t[buffer_size])
// Payload bytes carried by a single data packet.
163 , packet_len(Packet::ChunkData::MAX_DATA_LEN)
170 , compressed(false) {
// Destroys the transmitter.
174 ChunkTransmitter::~ChunkTransmitter() {
// Returns true when the transmitter is neither sending packets nor waiting
// for confirmations.
178 bool ChunkTransmitter::Idle() const noexcept {
179 return !Transmitting() && !Waiting();
// Returns true while the send cursor has not yet reached the total number of
// data packets.
182 bool ChunkTransmitter::Transmitting() const noexcept {
183 return cursor < num_packets;
// Performs a step of the ongoing transmission.
186 void ChunkTransmitter::Transmit() {
// Only acts while packets remain (same condition as Transmitting()).
187 if (cursor < num_packets) {
// Returns true while confirm_wait is positive.
// NOTE(review): confirm_wait presumably counts packets sent but not yet
// acknowledged — confirm.
193 bool ChunkTransmitter::Waiting() const noexcept {
194 return confirm_wait > 0;
// Handles a positive acknowledgement for the packet with sequence number `seq`.
197 void ChunkTransmitter::Ack(uint16_t seq) {
// begin_packet holds the value conn.Send() returned for the header packet.
201 if (seq == begin_packet) {
// Otherwise look for the data packet that was sent under this sequence number.
209 for (int i = 0, end = data_packets.size(); i < end; ++i) {
210 if (seq == data_packets[i]) {
// Reset the slot to -1, the same sentinel Send() fills new slots with.
211 data_packets[i] = -1;
// Handles a negative acknowledgement for the packet with sequence number `seq`.
221 void ChunkTransmitter::Nack(uint16_t seq) {
// begin_packet holds the value conn.Send() returned for the header packet.
225 if (seq == begin_packet) {
// Otherwise look for the data packet that was sent under this sequence number.
229 for (size_t i = 0, end = data_packets.size(); i < end; ++i) {
230 if (seq == data_packets[i]) {
// Cancels the transmission in progress, if any.
237 void ChunkTransmitter::Abort() {
// No chunk is being transmitted: nothing to abort.
238 if (!current) return;
// Forget the sequence numbers recorded for the data packets.
243 data_packets.clear();
// Starts transmitting the block data of `chunk`: stages the (preferably
// compressed) data in the buffer and sets up per-packet bookkeeping.
247 void ChunkTransmitter::Send(Chunk &chunk) {
248 // abort current chunk, if any
254 // load new chunk data
// zlib compress(): buffer_len is in/out — buffer capacity on entry,
// compressed size on successful return.
256 buffer_len = buffer_size;
257 if (compress(buffer.get(), &buffer_len, reinterpret_cast<const Bytef *>(chunk.BlockData()), Chunk::BlockSize()) != Z_OK) {
258 // compression failed, send it uncompressed
259 buffer_len = Chunk::BlockSize();
260 memcpy(buffer.get(), chunk.BlockData(), buffer_len);
// Ceiling division: number of packets needed to carry buffer_len bytes.
264 num_packets = (buffer_len / packet_len) + (buffer_len % packet_len != 0);
// One sequence-number slot per data packet; newly added slots start at -1.
265 data_packets.resize(num_packets, -1);
// Sends the header packet of the current transmission: transmission id,
// flags, chunk coordinates and the size of the staged payload.
271 void ChunkTransmitter::SendBegin() {
// The flags word carries the value of the `compressed` member.
272 uint32_t flags = compressed;
273 auto pack = conn.Prepare<Packet::ChunkBegin>();
274 pack.WriteTransmissionId(trans_id);
275 pack.WriteFlags(flags);
276 pack.WriteChunkCoords(current->Position());
277 pack.WriteDataSize(buffer_len);
// -1 means no sequence number is currently recorded for the header packet.
278 if (begin_packet == -1) {
// Remember the value returned by Send() so Ack()/Nack() can match against it.
281 begin_packet = conn.Send();
// Sends the i-th data packet of the staged buffer.
// i: zero-based packet index; it also indexes data_packets.
284 void ChunkTransmitter::SendData(size_t i) {
// Byte offset of this packet's payload within the buffer.
285 int pos = i * packet_len;
// The final packet may carry fewer than packet_len bytes.
286 int len = min(packet_len, buffer_len - pos);
287 const uint8_t *data = &buffer[pos];
289 auto pack = conn.Prepare<Packet::ChunkData>();
290 pack.WriteTransmissionId(trans_id);
291 pack.WriteDataOffset(pos);
292 pack.WriteDataSize(len);
293 pack.WriteData(data, len);
// -1 means no sequence number is currently recorded for this packet.
295 if (data_packets[i] == -1) {
// Remember the value returned by Send() so Ack()/Nack() can match against it.
298 data_packets[i] = conn.Send();
// NOTE(review): presumably gives up the transmitter's hold on `current` —
// confirm against the body.
301 void ChunkTransmitter::Release() {