1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.ByteBufferUtils;
13 
14 import hunt.io.ByteBuffer;
15 import hunt.io.BufferUtils;
16 import hunt.proton.engine.Transport;
17 import hunt.proton.engine.TransportException;
18 import std.algorithm;
19 import hunt.proton.engine.impl.TransportInput;
20 import hunt.Exceptions;
21 
22 class ByteBufferUtils
23 {
24     /**
25      * @return number of bytes poured
26      */
27     public static int pour(ByteBuffer source, ByteBuffer destination)
28     {
29         int numberOfBytesToPour = min(source.remaining(), destination.remaining());
30         ByteBuffer sourceSubBuffer = source.duplicate();
31         sourceSubBuffer.limit(sourceSubBuffer.position() + numberOfBytesToPour);
32         destination.put(sourceSubBuffer);
33         source.position(source.position() + numberOfBytesToPour);
34         return numberOfBytesToPour;
35     }
36 
37     /**
38      * Assumes {@code destination} is ready to be written.
39      *
40      * @return number of bytes poured which may be fewer than {@code sizeRequested} if
41      * {@code destination} has insufficient remaining
42      */
43     public static int pourArrayToBuffer(byte[] source, int offset, int sizeRequested, ByteBuffer destination)
44     {
45         int numberToWrite = min(destination.remaining(), sizeRequested);
46         destination.put(source, offset, numberToWrite);
47         return numberToWrite;
48     }
49 
50     /**
51      * Pours the contents of {@code source} into {@code destinationTransportInput}, calling
52      * the TransportInput many times if necessary.  If the TransportInput returns a {@link hunt.proton.engine.TransportResult}
53      * other than ok, data may remain in source.
54      */
55     public static int pourAll(ByteBuffer source, TransportInput destinationTransportInput)
56     {
57         int capacity = destinationTransportInput.capacity();
58         if (capacity == Transport.END_OF_STREAM)
59         {
60             if (source.hasRemaining()) {
61                 throw new IllegalStateException("Destination has reached end of stream: ");
62             } else {
63                 return Transport.END_OF_STREAM;
64             }
65         }
66 
67         int total = source.remaining();
68 
69         while(source.hasRemaining() && destinationTransportInput.capacity() > 0)
70         {
71             pour(source, destinationTransportInput.tail());
72             destinationTransportInput.process();
73         }
74 
75         return total - source.remaining();
76     }
77 
78     /**
79      * Assumes {@code source} is ready to be read.
80      *
81      * @return number of bytes poured which may be fewer than {@code sizeRequested} if
82      * {@code source} has insufficient remaining
83      */
84     public static int pourBufferToArray(ByteBuffer source, byte[] destination, int offset, int sizeRequested)
85     {
86         int numberToRead = min(source.remaining(), sizeRequested);
87         source.get(destination, offset, numberToRead);
88         return numberToRead;
89     }
90 
91     public static ByteBuffer newWriteableBuffer(int capacity)
92     {
93         ByteBuffer newBuffer = BufferUtils.allocate(capacity);
94         return newBuffer;
95     }
96 
97     public static ByteBuffer newReadableBuffer(int capacity)
98     {
99         ByteBuffer newBuffer = BufferUtils.allocate(capacity);
100         newBuffer.flip();
101         return newBuffer;
102     }
103 
104 }