1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.proton.engine.impl.SessionImpl; 13 14 import hunt.collection.ArrayList; 15 import hunt.collection.Set; 16 import hunt.collection.LinkedHashMap; 17 import hunt.collection.List; 18 import hunt.collection.Map; 19 20 import hunt.proton.engine.impl.EndpointImplQuery; 21 import hunt.proton.amqp.Symbol; 22 import hunt.proton.engine.EndpointState; 23 import hunt.proton.engine.Event; 24 import hunt.proton.engine.ProtonJSession; 25 import hunt.proton.engine.Session; 26 import hunt.proton.engine.impl.EndpointImpl; 27 import hunt.proton.engine.impl.ConnectionImpl; 28 import hunt.proton.engine.impl.SenderImpl; 29 import hunt.proton.engine.impl.ReceiverImpl; 30 import hunt.proton.engine.impl.LinkImpl; 31 import hunt.proton.engine.impl.TransportSession; 32 import hunt.proton.engine.impl.LinkNode; 33 import hunt.Exceptions; 34 import std.conv: to; 35 import hunt.logging; 36 class SessionImpl : EndpointImpl , ProtonJSession 37 { 38 private ConnectionImpl _connection; 39 40 private Map!(string, SenderImpl) _senders ;// = new LinkedHashMap<string, SenderImpl>(); 41 private Map!(string, ReceiverImpl) _receivers ;//= new LinkedHashMap<String, ReceiverImpl>(); 42 private List!LinkImpl _oldLinksToFree ;// = new ArrayList<LinkImpl>(); 43 private TransportSession _transportSession; 44 private int _incomingCapacity = 0; 45 private int _incomingBytes = 0; 46 private int _outgoingBytes = 0; 47 private int _incomingDeliveries = 0; 48 private int _outgoingDeliveries = 0; 49 private long _outgoingWindow = 2147483647; 50 private Map!(Symbol, Object) _properties; 51 private Map!(Symbol, Object) _remoteProperties; 52 private Symbol[] _offeredCapabilities; 53 private Symbol[] _remoteOfferedCapabilities; 54 private Symbol[] _desiredCapabilities; 55 private Symbol[] _remoteDesiredCapabilities; 56 57 private LinkNode!SessionImpl _node; 58 59 60 this(ConnectionImpl connection) 61 { 62 _senders = new LinkedHashMap!(string, SenderImpl)(); 63 _receivers = new LinkedHashMap!(string, ReceiverImpl)(); 64 _oldLinksToFree = new ArrayList!LinkImpl; 65 66 _connection = connection; 67 _connection.incref(); 68 _node = _connection.addSessionEndpoint(this); 69 _connection.put(Type.SESSION_INIT, this); 70 } 71 72 override 73 public SenderImpl sender(string name) 74 { 75 SenderImpl sender = _senders.get(name); 76 if(sender is null) 77 { 78 sender = new SenderImpl(this, name); 79 _senders.put(name, sender); 80 } 81 else 82 { 83 if(sender.getLocalState() == EndpointState.CLOSED 84 && sender.getRemoteState() == EndpointState.CLOSED) 85 { 86 _oldLinksToFree.add(sender); 87 88 sender = new SenderImpl(this, name); 89 _senders.put(name, sender); 90 } 91 } 92 return sender; 93 } 94 95 override 96 public ReceiverImpl receiver(string name) 97 { 98 ReceiverImpl receiver = _receivers.get(name); 99 if(receiver is null) 100 { 101 receiver = new ReceiverImpl(this, name); 102 _receivers.put(name, receiver); 103 } 104 else 105 { 106 if(receiver.getLocalState() == EndpointState.CLOSED 107 && receiver.getRemoteState() == EndpointState.CLOSED) 108 { 109 _oldLinksToFree.add(receiver); 110 111 receiver = new ReceiverImpl(this, name); 112 _receivers.put(name, receiver); 113 } 114 } 115 return receiver; 116 } 117 118 override 119 public Session next(Set!EndpointState local, Set!EndpointState remote) 120 { 121 Query!SessionImpl query = new EndpointImplQuery!SessionImpl(local, remote); 122 123 LinkNode!SessionImpl sessionNode = _node.next(query); 124 125 return sessionNode is null ? null : sessionNode.getValue(); 126 } 127 128 override 129 public ConnectionImpl getConnectionImpl() 130 { 131 return _connection; 132 } 133 134 override 135 public ConnectionImpl getConnection() 136 { 137 return getConnectionImpl(); 138 } 139 140 override 141 void postFinal() { 142 _connection.put(Type.SESSION_FINAL, this); 143 _connection.decref(); 144 } 145 146 override 147 void doFree() { 148 _connection.freeSession(this); 149 _connection.removeSessionEndpoint(_node); 150 _node = null; 151 152 List!SenderImpl senders = new ArrayList!SenderImpl(_senders.values()); 153 foreach(SenderImpl sender ; senders) { 154 sender.free(); 155 } 156 _senders.clear(); 157 158 List!ReceiverImpl receivers = new ArrayList!ReceiverImpl(_receivers.values()); 159 foreach(ReceiverImpl receiver ; receivers) { 160 receiver.free(); 161 } 162 _receivers.clear(); 163 164 List!LinkImpl links = new ArrayList!LinkImpl(_oldLinksToFree); 165 foreach(LinkImpl link ; links) { 166 link.free(); 167 } 168 } 169 170 void modifyEndpoints() { 171 foreach (SenderImpl snd ; _senders.values()) { 172 snd.modifyEndpoints(); 173 } 174 175 foreach (ReceiverImpl rcv ; _receivers.values()) { 176 rcv.modifyEndpoints(); 177 } 178 modified(); 179 } 180 181 TransportSession getTransportSession() 182 { 183 return _transportSession; 184 } 185 186 void setTransportSession(TransportSession transportSession) 187 { 188 _transportSession = transportSession; 189 } 190 191 void setNode(LinkNode!SessionImpl node) 192 { 193 _node = node; 194 } 195 196 void freeSender(SenderImpl sender) 197 { 198 string name = sender.getName(); 199 SenderImpl existing = _senders.get(name); 200 if (sender is (existing)) 201 { 202 _senders.remove(name); 203 } 204 else 205 { 206 _oldLinksToFree.remove(sender); 207 } 208 } 209 210 void freeReceiver(ReceiverImpl receiver) 211 { 212 string name = receiver.getName(); 213 ReceiverImpl existing = _receivers.get(name); 214 if (receiver is (existing)) 215 { 216 _receivers.remove(name); 217 } 218 else 219 { 220 _oldLinksToFree.remove(receiver); 221 } 222 } 223 224 override 225 public int getIncomingCapacity() 226 { 227 return _incomingCapacity; 228 } 229 230 override 231 public void setIncomingCapacity(int capacity) 232 { 233 _incomingCapacity = capacity; 234 } 235 236 override 237 public int getIncomingBytes() 238 { 239 return _incomingBytes; 240 } 241 242 void incrementIncomingBytes(int delta) 243 { 244 _incomingBytes += delta; 245 } 246 247 override 248 public int getOutgoingBytes() 249 { 250 return _outgoingBytes; 251 } 252 253 void incrementOutgoingBytes(int delta) 254 { 255 _outgoingBytes += delta; 256 } 257 258 void incrementIncomingDeliveries(int delta) 259 { 260 _incomingDeliveries += delta; 261 } 262 263 int getOutgoingDeliveries() 264 { 265 return _outgoingDeliveries; 266 } 267 268 void incrementOutgoingDeliveries(int delta) 269 { 270 _outgoingDeliveries += delta; 271 } 272 273 override 274 void localOpen() 275 { 276 getConnectionImpl().put(Type.SESSION_LOCAL_OPEN, this); 277 } 278 279 override 280 void localClose() 281 { 282 getConnectionImpl().put(Type.SESSION_LOCAL_CLOSE, this); 283 } 284 285 override 286 public void setOutgoingWindow(long outgoingWindow) { 287 if(outgoingWindow < 0 || outgoingWindow > 0xFFFFFFFFL) 288 { 289 throw new IllegalArgumentException("Value '" ~ to!string(outgoingWindow) ~ "' must be in the" 290 ~ " range [0 - 2^32-1]"); 291 } 292 _outgoingWindow = outgoingWindow; 293 } 294 295 override 296 public long getOutgoingWindow() 297 { 298 return _outgoingWindow; 299 } 300 301 override 302 public Map!(Symbol, Object) getProperties() 303 { 304 return _properties; 305 } 306 307 override 308 public void setProperties(Map!(Symbol, Object) properties) 309 { 310 _properties = properties; 311 } 312 313 override 314 public Map!(Symbol, Object) getRemoteProperties() 315 { 316 return _remoteProperties; 317 } 318 319 void setRemoteProperties(Map!(Symbol, Object) remoteProperties) 320 { 321 _remoteProperties = remoteProperties; 322 } 323 324 override 325 public Symbol[] getDesiredCapabilities() 326 { 327 return _desiredCapabilities; 328 } 329 330 override 331 public void setDesiredCapabilities(Symbol[] desiredCapabilities) 332 { 333 _desiredCapabilities = desiredCapabilities; 334 } 335 336 override 337 public Symbol[] getRemoteDesiredCapabilities() 338 { 339 return _remoteDesiredCapabilities; 340 } 341 342 void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities) 343 { 344 _remoteDesiredCapabilities = remoteDesiredCapabilities; 345 } 346 347 override 348 public Symbol[] getOfferedCapabilities() 349 { 350 return _offeredCapabilities; 351 } 352 353 override 354 public void setOfferedCapabilities(Symbol[] offeredCapabilities) 355 { 356 _offeredCapabilities = offeredCapabilities; 357 } 358 359 override 360 public Symbol[] getRemoteOfferedCapabilities() 361 { 362 return _remoteOfferedCapabilities; 363 } 364 365 void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities) 366 { 367 _remoteOfferedCapabilities = remoteOfferedCapabilities; 368 } 369 }