OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TBufferTransports.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _SANDESH_TRANSPORT_TBUFFERTRANSPORTS_H_
21 #define _SANDESH_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
22 
23 #include <cstring>
24 #include "boost/scoped_array.hpp"
25 
26 #include <sandesh/transport/TTransport.h>
27 #include <sandesh/transport/TVirtualTransport.h>
28 
29 #ifdef __GNUC__
30 #define TDB_LIKELY(val) (__builtin_expect((val), 1))
31 #define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
32 #else
33 #define TDB_LIKELY(val) (val)
34 #define TDB_UNLIKELY(val) (val)
35 #endif
36 
37 namespace contrail { namespace sandesh { namespace transport {
38 
39 
50 class TBufferBase : public TVirtualTransport<TBufferBase> {
51 
52  public:
53 
62  int32_t read(uint8_t* buf, uint32_t len) {
63  uint8_t* new_rBase = rBase_ + len;
64  if (TDB_LIKELY(new_rBase <= rBound_)) {
65  std::memcpy(buf, rBase_, len);
66  rBase_ = new_rBase;
67  return len;
68  }
69  return readSlow(buf, len);
70  }
71 
75  int32_t readAll(uint8_t* buf, uint32_t len) {
76  uint8_t* new_rBase = rBase_ + len;
77  if (TDB_LIKELY(new_rBase <= rBound_)) {
78  std::memcpy(buf, rBase_, len);
79  rBase_ = new_rBase;
80  return len;
81  }
82  return contrail::sandesh::transport::readAll(*this, buf, len);
83  }
84 
94  int write(const uint8_t* buf, uint32_t len) {
95  uint8_t* new_wBase = wBase_ + len;
96  if (TDB_LIKELY(new_wBase <= wBound_)) {
97  std::memcpy(wBase_, buf, len);
98  wBase_ = new_wBase;
99  return 0;
100  }
101  return writeSlow(buf, len);
102  }
103 
107  const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
108  if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
109  // With strict aliasing, writing to len shouldn't force us to
110  // refetch rBase_ from memory. TODO(dreiss): Verify this.
111  *len = static_cast<uint32_t>(rBound_ - rBase_);
112  return rBase_;
113  }
114  return borrowSlow(buf, len);
115  }
116 
120  void consume(uint32_t len) {
121  if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
122  rBase_ += len;
123  } else {
124  LOG(ERROR, __func__ << ": Consume did not follow a borrow.");
125  assert(false);
126  }
127  }
128 
130  void setReadBuffer(uint8_t* buf, uint32_t len) {
131  rBase_ = buf;
132  rBound_ = buf+len;
133  }
134 
136  void setWriteBuffer(uint8_t* buf, uint32_t len) {
137  wBase_ = buf;
138  wBound_ = buf+len;
139  }
140 
141  protected:
142 
144  virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
145 
147  virtual int writeSlow(const uint8_t* buf, uint32_t len) = 0;
148 
154  virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
155 
164  : rBase_(NULL)
165  , rBound_(NULL)
166  , wBase_(NULL)
167  , wBound_(NULL)
168  {}
169 
170  virtual ~TBufferBase() {}
171 
173  uint8_t* rBase_;
175  uint8_t* rBound_;
176 
178  uint8_t* wBase_;
180  uint8_t* wBound_;
181 };
182 
192 class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
193  private:
194 
195  // Common initialization done by all constructors.
196  void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
197  if (buf == NULL && size != 0) {
198  assert(owner);
199  buf = (uint8_t*)std::malloc(size);
200  if (buf == NULL) {
201  LOG(ERROR, __func__ << ": Allocation of " << size << " bytes FAILED");
202  assert(false);
203  }
204  }
205 
206  buffer_ = buf;
207  bufferSize_ = size;
208 
209  rBase_ = buffer_;
210  rBound_ = buffer_ + wPos;
211  // TODO(dreiss): Investigate NULL-ing this if !owner.
212  wBase_ = buffer_ + wPos;
214 
215  owner_ = owner;
216 
217  // rBound_ is really an artifact. In principle, it should always be
218  // equal to wBase_. We update it in a few places (computeRead, etc.).
219  }
220 
221  public:
222  static const uint32_t defaultSize = 1024;
223 
245  { OBSERVE = 1
246  , COPY = 2
248  };
249 
255  initCommon(NULL, defaultSize, true, 0);
256  }
257 
264  TMemoryBuffer(uint32_t sz) {
265  initCommon(NULL, sz, true, 0);
266  }
267 
278  TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
279  if (buf == NULL && sz != 0) {
280  LOG(ERROR, __func__ << ": TMemoryBuffer given null buffer with "
281  "non-zero size.");
282  assert(false);
283  }
284  switch (policy) {
285  case OBSERVE:
286  case TAKE_OWNERSHIP:
287  initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
288  break;
289  case COPY:
290  initCommon(NULL, sz, true, 0);
291  this->write(buf, sz);
292  break;
293  default:
294  LOG(ERROR, __func__ << ": Invalid MemoryPolicy for TMemoryBuffer");
295  assert(false);
296  break;
297  }
298  }
299 
301  if (owner_) {
302  std::free(buffer_);
303  }
304  }
305 
306  bool isOpen() {
307  return true;
308  }
309 
310  bool peek() {
311  return (rBase_ < wBase_);
312  }
313 
314  void open() {}
315 
316  void close() {}
317 
318  // TODO(dreiss): Make bufPtr const.
319  void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
320  *bufPtr = rBase_;
321  *sz = static_cast<uint32_t>(wBase_ - rBase_);
322  }
323 
324  std::string getBufferAsString() {
325  if (buffer_ == NULL) {
326  return "";
327  }
328  uint8_t* buf;
329  uint32_t sz;
330  getBuffer(&buf, &sz);
331  return std::string((char*)buf, (std::string::size_type)sz);
332  }
333 
334  void appendBufferToString(std::string& str) {
335  if (buffer_ == NULL) {
336  return;
337  }
338  uint8_t* buf;
339  uint32_t sz;
340  getBuffer(&buf, &sz);
341  str.append((char*)buf, sz);
342  }
343 
344  void resetBuffer() {
345  rBase_ = buffer_;
346  rBound_ = buffer_;
347  wBase_ = buffer_;
348  // It isn't safe to write into a buffer we don't own.
349  if (!owner_) {
350  wBound_ = wBase_;
351  bufferSize_ = 0;
352  }
353  }
354 
356  void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
357  // Use a variant of the copy-and-swap trick for assignment operators.
358  // This is sub-optimal in terms of performance for two reasons:
359  // 1/ The constructing and swapping of the (small) values
360  // in the temporary object takes some time, and is not necessary.
361  // 2/ If policy == COPY, we allocate the new buffer before
362  // freeing the old one, precluding the possibility of
363  // reusing that memory.
364  // I doubt that either of these problems could be optimized away,
365  // but the second is probably no a common case, and the first is minor.
366  // I don't expect resetBuffer to be a common operation, so I'm willing to
367  // bite the performance bullet to make the method this simple.
368 
369  // Construct the new buffer.
370  TMemoryBuffer new_buffer(buf, sz, policy);
371  // Move it into ourself.
372  this->swap(new_buffer);
373  // Our old self gets destroyed.
374  }
375 
377  void resetBuffer(uint32_t sz) {
378  // Construct the new buffer.
379  TMemoryBuffer new_buffer(sz);
380  // Move it into ourself.
381  this->swap(new_buffer);
382  // Our old self gets destroyed.
383  }
384 
385  std::string readAsString(uint32_t len) {
386  std::string str;
387  (void)readAppendToString(str, len);
388  return str;
389  }
390 
391  uint32_t readAppendToString(std::string& str, uint32_t len);
392 
393  // return number of bytes read
394  uint32_t readEnd() {
395  uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_);
396  if (rBase_ == wBase_) {
397  resetBuffer();
398  }
399  return bytes;
400  }
401 
402  // Return number of bytes written
403  uint32_t writeEnd() {
404  return static_cast<uint32_t>(wBase_ - buffer_);
405  }
406 
407  uint32_t available_read() const {
408  // Remember, wBase_ is the real rBound_.
409  return static_cast<uint32_t>(wBase_ - rBase_);
410  }
411 
412  uint32_t available_write() const {
413  return static_cast<uint32_t>(wBound_ - wBase_);
414  }
415 
416  // Returns a pointer to where the client can write data to append to
417  // the TMemoryBuffer, and ensures the buffer is big enough to accomodate a
418  // write of the provided length. The returned pointer is very convenient for
419  // passing to read(), recv(), or similar. You must call wroteBytes() as soon
420  // as data is written or the buffer will not be aware that data has changed.
421  uint8_t* getWritePtr(uint32_t len) {
422  ensureCanWrite(len);
423  return wBase_;
424  }
425 
426  // Informs the buffer that the client has written 'len' bytes into storage
427  // that had been provided by getWritePtr().
428  void wroteBytes(uint32_t len);
429 
430  /*
431  * TVirtualTransport provides a default implementation of readAll().
432  * We want to use the TBufferBase version instead.
433  */
434  uint32_t readAll(uint8_t* buf, uint32_t len) {
435  return TBufferBase::readAll(buf,len);
436  }
437 
438  protected:
439  void swap(TMemoryBuffer& that) {
440  using std::swap;
441  swap(buffer_, that.buffer_);
443 
444  swap(rBase_, that.rBase_);
445  swap(rBound_, that.rBound_);
446  swap(wBase_, that.wBase_);
447  swap(wBound_, that.wBound_);
448 
449  swap(owner_, that.owner_);
450  }
451 
452  // Make sure there's at least 'len' bytes available for writing.
453  int ensureCanWrite(uint32_t len);
454 
455  // Compute the position and available data for reading.
456  void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
457 
458  uint32_t readSlow(uint8_t* buf, uint32_t len);
459 
460  int writeSlow(const uint8_t* buf, uint32_t len);
461 
462  const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
463 
464  // Data buffer
465  uint8_t* buffer_;
466 
467  // Allocated buffer size
468  uint32_t bufferSize_;
469 
470  // Is this object the owner of the buffer?
471  bool owner_;
472 
473  // Don't forget to update constrctors, initCommon, and swap if
474  // you add new members.
475 };
476 
477 }}} // contrail::sandesh::transport
478 
479 #endif // #ifndef _SANDESH_TRANSPORT_TBUFFERTRANSPORTS_H_
void getBuffer(uint8_t **bufPtr, uint32_t *sz)
void resetBuffer(uint32_t sz)
See constructor documentation.
uint8_t * wBound_
Writes may extend to just before here.
uint8_t * rBound_
Reads may extend to just before here.
void initCommon(uint8_t *buf, uint32_t size, bool owner, uint32_t wPos)
uint32_t readAll(uint8_t *buf, uint32_t len)
int writeSlow(const uint8_t *buf, uint32_t len)
Slow path write.
void setWriteBuffer(uint8_t *buf, uint32_t len)
Convenience mutator for setting the write buffer.
int32_t readAll(uint8_t *buf, uint32_t len)
void setReadBuffer(uint8_t *buf, uint32_t len)
Convenience mutator for setting the read buffer.
uint32_t readSlow(uint8_t *buf, uint32_t len)
Slow path read.
const uint8_t * borrowSlow(uint8_t *buf, uint32_t *len)
virtual uint32_t readSlow(uint8_t *buf, uint32_t len)=0
Slow path read.
#define TDB_LIKELY(val)
const uint8_t * borrow(uint8_t *buf, uint32_t *len)
TMemoryBuffer(uint8_t *buf, uint32_t sz, MemoryPolicy policy=OBSERVE)
virtual int writeSlow(const uint8_t *buf, uint32_t len)=0
Slow path write.
int32_t read(uint8_t *buf, uint32_t len)
uint32_t readAppendToString(std::string &str, uint32_t len)
void resetBuffer(uint8_t *buf, uint32_t sz, MemoryPolicy policy=OBSERVE)
See constructor documentation.
#define LOG(_Level, _Msg)
Definition: logging.h:33
void computeRead(uint32_t len, uint8_t **out_start, uint32_t *out_give)
int write(const uint8_t *buf, uint32_t len)
virtual const uint8_t * borrowSlow(uint8_t *buf, uint32_t *len)=0
int32_t readAll(Transport_ &trans, uint8_t *buf, uint32_t len)
Definition: TTransport.h:35