1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.ReceiverImpl;
13 
14 import hunt.proton.amqp.UnsignedInteger;
15 import hunt.proton.codec.ReadableBuffer;
16 import hunt.proton.codec.WritableBuffer;
17 import hunt.proton.engine.Receiver;
18 import hunt.proton.engine.impl.LinkImpl;
19 import hunt.proton.engine.impl.DeliveryImpl;
20 import hunt.proton.engine.impl.TransportReceiver;
21 import hunt.proton.engine.impl.SessionImpl;
22 import hunt.Exceptions;
23 
24 
25 class ReceiverImpl : LinkImpl , Receiver
26 {
27     private bool _drainFlagMode = true;
28 
29     override
30     public bool advance()
31     {
32         DeliveryImpl current = current();
33         if(current !is null)
34         {
35             current.setDone();
36         }
37         bool advance = super.advance();
38         if(advance)
39         {
40             decrementQueued();
41             decrementCredit();
42             getSession().incrementIncomingBytes(-current.pending());
43             getSession().incrementIncomingDeliveries(-1);
44             if (getSession().getTransportSession().getIncomingWindowSize() == (UnsignedInteger.ZERO)) {
45                 modified();
46             }
47         }
48         return advance;
49     }
50 
51     private TransportReceiver _transportReceiver;
52     private int _unsentCredits;
53 
54 
55     this(SessionImpl session, string name)
56     {
57         super(session, name);
58     }
59 
60     override
61     public void flow(int credits)
62     {
63         addCredit(credits);
64         _unsentCredits += credits;
65         modified();
66         if (!_drainFlagMode)
67         {
68             setDrain(false);
69             _drainFlagMode = false;
70         }
71     }
72 
73     int clearUnsentCredits()
74     {
75         int credits = _unsentCredits;
76         _unsentCredits = 0;
77         return credits;
78     }
79 
80     override
81     public int recv(byte[] bytes, int offset, int size)
82     {
83         if (_current is null) {
84             throw new IllegalStateException("no current delivery");
85         }
86 
87         int consumed = _current.recv(bytes, offset, size);
88         if (consumed > 0) {
89             getSession().incrementIncomingBytes(-consumed);
90             if (getSession().getTransportSession().getIncomingWindowSize() == (UnsignedInteger.ZERO)) {
91                 modified();
92             }
93         }
94         return consumed;
95     }
96 
97     override
98     public int recv(WritableBuffer buffer)
99     {
100         if (_current is null) {
101             throw new IllegalStateException("no current delivery");
102         }
103 
104         int consumed = _current.recv(buffer);
105         if (consumed > 0) {
106             getSession().incrementIncomingBytes(-consumed);
107             if (getSession().getTransportSession().getIncomingWindowSize() == (UnsignedInteger.ZERO)) {
108                 modified();
109             }
110         }
111         return consumed;
112     }
113 
114     override
115     public ReadableBuffer recv()
116     {
117         if (_current is null) {
118             throw new IllegalStateException("no current delivery");
119         }
120 
121         ReadableBuffer consumed = _current.recv();
122         if (consumed.remaining() > 0) {
123             getSession().incrementIncomingBytes(-consumed.remaining());
124             if (getSession().getTransportSession().getIncomingWindowSize() == (UnsignedInteger.ZERO)) {
125                 modified();
126             }
127         }
128         return consumed;
129     }
130 
131     override
132     void doFree()
133     {
134         getSession().freeReceiver(this);
135         super.doFree();
136     }
137 
138     bool hasIncoming()
139     {
140         return false;  //TODO - Implement
141     }
142 
143     void setTransportLink(TransportReceiver transportReceiver)
144     {
145         _transportReceiver = transportReceiver;
146     }
147 
148     override
149     TransportReceiver getTransportLink()
150     {
151         return _transportReceiver;
152     }
153 
154     override
155     public void drain(int credit)
156     {
157         setDrain(true);
158         flow(credit);
159         _drainFlagMode = false;
160     }
161 
162     override
163     public bool draining()
164     {
165         return getDrain() && (getCredit() > getQueued());
166     }
167 
168     override
169     public void setDrain(bool drain)
170     {
171         super.setDrain(drain);
172         modified();
173         _drainFlagMode = true;
174     }
175 
176     override
177     public int getRemoteCredit()
178     {
179         // Credit is only decremented once advance is called on a received message,
180         // so we also need to consider the queued count.
181         return getCredit() - getQueued();
182     }
183 }