1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.SessionImpl;
13 
14 import hunt.collection.ArrayList;
15 import hunt.collection.Set;
16 import hunt.collection.LinkedHashMap;
17 import hunt.collection.List;
18 import hunt.collection.Map;
19 
20 import hunt.proton.engine.impl.EndpointImplQuery;
21 import hunt.proton.amqp.Symbol;
22 import hunt.proton.engine.EndpointState;
23 import hunt.proton.engine.Event;
24 import hunt.proton.engine.ProtonJSession;
25 import hunt.proton.engine.Session;
26 import hunt.proton.engine.impl.EndpointImpl;
27 import hunt.proton.engine.impl.ConnectionImpl;
28 import hunt.proton.engine.impl.SenderImpl;
29 import hunt.proton.engine.impl.ReceiverImpl;
30 import hunt.proton.engine.impl.LinkImpl;
31 import hunt.proton.engine.impl.TransportSession;
32 import hunt.proton.engine.impl.LinkNode;
33 import hunt.Exceptions;
34 import std.conv: to;
35 import hunt.logging;
36 class SessionImpl : EndpointImpl , ProtonJSession
37 {
38     private ConnectionImpl _connection;
39 
40     private Map!(string, SenderImpl) _senders ;// = new LinkedHashMap<string, SenderImpl>();
41     private Map!(string, ReceiverImpl)  _receivers  ;//= new LinkedHashMap<String, ReceiverImpl>();
42     private List!LinkImpl _oldLinksToFree  ;// = new ArrayList<LinkImpl>();
43     private TransportSession _transportSession;
44     private int _incomingCapacity = 0;
45     private int _incomingBytes = 0;
46     private int _outgoingBytes = 0;
47     private int _incomingDeliveries = 0;
48     private int _outgoingDeliveries = 0;
49     private long _outgoingWindow = 2147483647;
50     private Map!(Symbol, Object) _properties;
51     private Map!(Symbol, Object) _remoteProperties;
52     private Symbol[] _offeredCapabilities;
53     private Symbol[] _remoteOfferedCapabilities;
54     private Symbol[] _desiredCapabilities;
55     private Symbol[] _remoteDesiredCapabilities;
56 
57     private LinkNode!SessionImpl _node;
58 
59 
60     this(ConnectionImpl connection)
61     {
62         _senders = new LinkedHashMap!(string, SenderImpl)();
63         _receivers = new LinkedHashMap!(string, ReceiverImpl)();
64         _oldLinksToFree = new ArrayList!LinkImpl;
65 
66         _connection = connection;
67         _connection.incref();
68         _node = _connection.addSessionEndpoint(this);
69         _connection.put(Type.SESSION_INIT, this);
70     }
71 
72     override
73     public SenderImpl sender(string name)
74     {
75         SenderImpl sender = _senders.get(name);
76         if(sender is null)
77         {
78             sender = new SenderImpl(this, name);
79             _senders.put(name, sender);
80         }
81         else
82         {
83             if(sender.getLocalState() == EndpointState.CLOSED
84                   && sender.getRemoteState() == EndpointState.CLOSED)
85             {
86                 _oldLinksToFree.add(sender);
87 
88                 sender = new SenderImpl(this, name);
89                 _senders.put(name, sender);
90             }
91         }
92         return sender;
93     }
94 
95     override
96     public ReceiverImpl receiver(string name)
97     {
98         ReceiverImpl receiver = _receivers.get(name);
99         if(receiver is null)
100         {
101             receiver = new ReceiverImpl(this, name);
102             _receivers.put(name, receiver);
103         }
104         else
105         {
106             if(receiver.getLocalState() == EndpointState.CLOSED
107                   && receiver.getRemoteState() == EndpointState.CLOSED)
108             {
109                 _oldLinksToFree.add(receiver);
110 
111                 receiver = new ReceiverImpl(this, name);
112                 _receivers.put(name, receiver);
113             }
114         }
115         return receiver;
116     }
117 
118     override
119     public Session next(Set!EndpointState local, Set!EndpointState remote)
120     {
121         Query!SessionImpl query = new EndpointImplQuery!SessionImpl(local, remote);
122 
123         LinkNode!SessionImpl sessionNode = _node.next(query);
124 
125         return sessionNode is null ? null : sessionNode.getValue();
126     }
127 
128     override
129     public ConnectionImpl getConnectionImpl()
130     {
131         return _connection;
132     }
133 
134     override
135     public ConnectionImpl getConnection()
136     {
137         return getConnectionImpl();
138     }
139 
140     override
141     void postFinal() {
142         _connection.put(Type.SESSION_FINAL, this);
143         _connection.decref();
144     }
145 
146     override
147     void doFree() {
148         _connection.freeSession(this);
149         _connection.removeSessionEndpoint(_node);
150         _node = null;
151 
152         List!SenderImpl senders = new ArrayList!SenderImpl(_senders.values());
153         foreach(SenderImpl sender ; senders) {
154             sender.free();
155         }
156         _senders.clear();
157 
158         List!ReceiverImpl receivers = new ArrayList!ReceiverImpl(_receivers.values());
159         foreach(ReceiverImpl receiver ; receivers) {
160             receiver.free();
161         }
162         _receivers.clear();
163 
164         List!LinkImpl links = new ArrayList!LinkImpl(_oldLinksToFree);
165         foreach(LinkImpl link ; links) {
166             link.free();
167         }
168     }
169 
170     void modifyEndpoints() {
171         foreach (SenderImpl snd ; _senders.values()) {
172             snd.modifyEndpoints();
173         }
174 
175         foreach (ReceiverImpl rcv ; _receivers.values()) {
176             rcv.modifyEndpoints();
177         }
178         modified();
179     }
180 
181     TransportSession getTransportSession()
182     {
183         return _transportSession;
184     }
185 
186     void setTransportSession(TransportSession transportSession)
187     {
188         _transportSession = transportSession;
189     }
190 
191     void setNode(LinkNode!SessionImpl node)
192     {
193         _node = node;
194     }
195 
196     void freeSender(SenderImpl sender)
197     {
198         string name = sender.getName();
199         SenderImpl existing = _senders.get(name);
200         if (sender is (existing))
201         {
202             _senders.remove(name);
203         }
204         else
205         {
206             _oldLinksToFree.remove(sender);
207         }
208     }
209 
210     void freeReceiver(ReceiverImpl receiver)
211     {
212         string name = receiver.getName();
213         ReceiverImpl existing = _receivers.get(name);
214         if (receiver is (existing))
215         {
216             _receivers.remove(name);
217         }
218         else
219         {
220             _oldLinksToFree.remove(receiver);
221         }
222     }
223 
224     override
225     public int getIncomingCapacity()
226     {
227         return _incomingCapacity;
228     }
229 
230     override
231     public void setIncomingCapacity(int capacity)
232     {
233         _incomingCapacity = capacity;
234     }
235 
236     override
237     public int getIncomingBytes()
238     {
239         return _incomingBytes;
240     }
241 
242     void incrementIncomingBytes(int delta)
243     {
244         _incomingBytes += delta;
245     }
246 
247     override
248     public int getOutgoingBytes()
249     {
250         return _outgoingBytes;
251     }
252 
253     void incrementOutgoingBytes(int delta)
254     {
255         _outgoingBytes += delta;
256     }
257 
258     void incrementIncomingDeliveries(int delta)
259     {
260         _incomingDeliveries += delta;
261     }
262 
263     int getOutgoingDeliveries()
264     {
265         return _outgoingDeliveries;
266     }
267 
268     void incrementOutgoingDeliveries(int delta)
269     {
270         _outgoingDeliveries += delta;
271     }
272 
273     override
274     void localOpen()
275     {
276         getConnectionImpl().put(Type.SESSION_LOCAL_OPEN, this);
277     }
278 
279     override
280     void localClose()
281     {
282         getConnectionImpl().put(Type.SESSION_LOCAL_CLOSE, this);
283     }
284 
285     override
286     public void setOutgoingWindow(long outgoingWindow) {
287         if(outgoingWindow < 0 || outgoingWindow > 0xFFFFFFFFL)
288         {
289             throw new IllegalArgumentException("Value '" ~ to!string(outgoingWindow) ~ "' must be in the"
290                     ~ " range [0 - 2^32-1]");
291         }
292         _outgoingWindow = outgoingWindow;
293     }
294 
295     override
296     public long getOutgoingWindow()
297     {
298         return _outgoingWindow;
299     }
300 
301     override
302     public Map!(Symbol, Object) getProperties()
303     {
304         return _properties;
305     }
306 
307     override
308     public void setProperties(Map!(Symbol, Object) properties)
309     {
310         _properties = properties;
311     }
312 
313     override
314     public Map!(Symbol, Object) getRemoteProperties()
315     {
316         return _remoteProperties;
317     }
318 
319     void setRemoteProperties(Map!(Symbol, Object) remoteProperties)
320     {
321         _remoteProperties = remoteProperties;
322     }
323 
324     override
325     public Symbol[] getDesiredCapabilities()
326     {
327         return _desiredCapabilities;
328     }
329 
330     override
331     public void setDesiredCapabilities(Symbol[] desiredCapabilities)
332     {
333         _desiredCapabilities = desiredCapabilities;
334     }
335 
336     override
337     public Symbol[] getRemoteDesiredCapabilities()
338     {
339         return _remoteDesiredCapabilities;
340     }
341 
342     void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
343     {
344         _remoteDesiredCapabilities = remoteDesiredCapabilities;
345     }
346 
347     override
348     public Symbol[] getOfferedCapabilities()
349     {
350         return _offeredCapabilities;
351     }
352 
353     override
354     public void setOfferedCapabilities(Symbol[] offeredCapabilities)
355     {
356         _offeredCapabilities = offeredCapabilities;
357     }
358 
359     override
360     public Symbol[] getRemoteOfferedCapabilities()
361     {
362         return _remoteOfferedCapabilities;
363     }
364 
365     void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
366     {
367         _remoteOfferedCapabilities = remoteOfferedCapabilities;
368     }
369 }