/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.ByteBufferUtils;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.proton.engine.Transport;
import hunt.proton.engine.TransportException;
import std.algorithm;
import hunt.proton.engine.impl.TransportInput;
import hunt.Exceptions;

/**
 * Static helpers for moving bytes between {@code ByteBuffer}s, byte arrays
 * and a {@code TransportInput}, plus small buffer factories.
 */
class ByteBufferUtils
{
    /**
     * Transfers as many bytes as both buffers can accommodate from
     * {@code source} into {@code destination}, then advances the position
     * of {@code source} by that amount.
     *
     * @return number of bytes poured
     */
    public static int pour(ByteBuffer source, ByteBuffer destination)
    {
        int count = min(source.remaining(), destination.remaining());

        // Work on a duplicate so the limit of the caller's buffer is never altered.
        ByteBuffer window = source.duplicate();
        window.limit(window.position() + count);
        destination.put(window);

        // The duplicate was consumed, not the source itself: advance it explicitly.
        source.position(source.position() + count);
        return count;
    }

    /**
     * Writes up to {@code sizeRequested} bytes of {@code source}, starting at
     * {@code offset}, into {@code destination}. Assumes {@code destination}
     * is ready to be written.
     *
     * @return number of bytes poured, which is smaller than {@code sizeRequested}
     *         when {@code destination} has less remaining than that
     */
    public static int pourArrayToBuffer(byte[] source, int offset, int sizeRequested, ByteBuffer destination)
    {
        int count = min(destination.remaining(), sizeRequested);
        destination.put(source, offset, count);
        return count;
    }

    /**
     * Feeds {@code source} into {@code destinationTransportInput}: pours into
     * its tail buffer and calls {@code process()} repeatedly, stopping once
     * {@code source} is empty or the input's capacity is no longer positive.
     * Bytes may therefore be left in {@code source} on return.
     *
     * @return number of bytes taken from {@code source}, or
     *         {@code Transport.END_OF_STREAM} when the input reports end of
     *         stream and {@code source} is empty
     * @throws IllegalStateException when the input reports end of stream while
     *         {@code source} still holds bytes
     */
    public static int pourAll(ByteBuffer source, TransportInput destinationTransportInput)
    {
        if (destinationTransportInput.capacity() == Transport.END_OF_STREAM)
        {
            if (!source.hasRemaining())
            {
                return Transport.END_OF_STREAM;
            }
            throw new IllegalStateException("Destination has reached end of stream: ");
        }

        int initiallyAvailable = source.remaining();

        while (source.hasRemaining())
        {
            // Capacity is re-read on every pass, after the previous process() call.
            if (destinationTransportInput.capacity() <= 0)
            {
                break;
            }
            pour(source, destinationTransportInput.tail());
            destinationTransportInput.process();
        }

        return initiallyAvailable - source.remaining();
    }

    /**
     * Reads up to {@code sizeRequested} bytes from {@code source} into
     * {@code destination}, starting at {@code offset}. Assumes {@code source}
     * is ready to be read.
     *
     * @return number of bytes poured, which is smaller than {@code sizeRequested}
     *         when {@code source} has less remaining than that
     */
    public static int pourBufferToArray(ByteBuffer source, byte[] destination, int offset, int sizeRequested)
    {
        int count = min(source.remaining(), sizeRequested);
        source.get(destination, offset, count);
        return count;
    }

    /**
     * Allocates a buffer of the given capacity via {@code BufferUtils.allocate}
     * and returns it untouched.
     */
    public static ByteBuffer newWriteableBuffer(int capacity)
    {
        return BufferUtils.allocate(capacity);
    }

    /**
     * Allocates a buffer of the given capacity via {@code BufferUtils.allocate}
     * and flips it before returning it.
     */
    public static ByteBuffer newReadableBuffer(int capacity)
    {
        ByteBuffer flipped = BufferUtils.allocate(capacity);
        flipped.flip();
        return flipped;
    }

}