1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.SenderImpl;
13 
14 import hunt.proton.codec.ReadableBuffer;
15 import hunt.proton.engine.EndpointState;
16 import hunt.proton.engine.Sender;
17 import hunt.proton.engine.impl.LinkImpl;
18 import hunt.proton.engine.impl.TransportSender;
19 import hunt.proton.engine.impl.SessionImpl;
20 import hunt.Exceptions;
21 import hunt.proton.engine.impl.DeliveryImpl;
22 import hunt.logging;
23 class SenderImpl  : LinkImpl , Sender
24 {
25     private int _offered;
26     private TransportSender _transportLink;
27 
28     this(SessionImpl session, string name)
29     {
30         super(session, name);
31     }
32 
33     override
34     public void offer(int credits)
35     {
36         _offered = credits;
37     }
38 
39     override
40     public int send(byte[] bytes, int offset, int length)
41     {
42         if (getLocalState() == EndpointState.CLOSED)
43         {
44             throw new IllegalStateException("send not allowed after the sender is closed.");
45         }
46         DeliveryImpl current = current();
47         if (current is null || current.getLink() != this)
48         {
49             throw new IllegalArgumentException();//TODO.
50         }
51         int sent = current.send(bytes, offset, length);
52         if (sent > 0) {
53             getSession().incrementOutgoingBytes(sent);
54         }
55         return sent;
56     }
57 
58     override
59     public int send(ReadableBuffer buffer)
60     {
61         if (getLocalState() == EndpointState.CLOSED)
62         {
63             throw new IllegalStateException("send not allowed after the sender is closed.");
64         }
65         DeliveryImpl current = current();
66         if (current is null || current.getLink() != this)
67         {
68             throw new IllegalArgumentException();
69         }
70         int sent = current.send(buffer);
71         if (sent > 0) {
72             getSession().incrementOutgoingBytes(sent);
73         }
74         return sent;
75     }
76 
77     override
78     public int sendNoCopy(ReadableBuffer buffer)
79     {
80         if (getLocalState() == EndpointState.CLOSED)
81         {
82             throw new IllegalStateException("send not allowed after the sender is closed.");
83         }
84         DeliveryImpl current = current();
85         if (current is null || current.getLink() != this)
86         {
87             throw new IllegalArgumentException();
88         }
89         int sent = current.sendNoCopy(buffer);
90         if (sent > 0) {
91             getSession().incrementOutgoingBytes(sent);
92         }
93         return sent;
94     }
95 
96     override
97     public void abort()
98     {
99         //TODO.
100     }
101 
102     override
103     void doFree()
104     {
105         getSession().freeSender(this);
106         super.doFree();
107     }
108 
109     override
110     public bool advance()
111     {
112         DeliveryImpl delivery = current();
113         if (delivery !is null) {
114             delivery.setComplete();
115         }
116 
117         bool advance = super.advance();
118         if(advance && _offered > 0)
119         {
120             _offered--;
121         }
122         if(advance)
123         {
124             decrementCredit();
125             delivery.addToTransportWorkList();
126             getSession().incrementOutgoingDeliveries(1);
127         }
128 
129         return advance;
130     }
131 
132     bool hasOfferedCredits()
133     {
134         return _offered > 0;
135     }
136 
137     override
138     TransportSender getTransportLink()
139     {
140         return _transportLink;
141     }
142 
143     void setTransportLink(TransportSender transportLink)
144     {
145         _transportLink = transportLink;
146     }
147 
148 
149     override
150     public void setCredit(int credit)
151     {
152         super.setCredit(credit);
153        /* while(getQueued()>0 && getCredit()>0)
154         {
155             advance();
156         }*/
157     }
158 
159     override
160     public int getRemoteCredit()
161     {
162         // Credit is decremented as soon as advance is called on a send,
163         // so we need only consider the credit count, not the queued count.
164         return getCredit();
165     }
166 }