1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.ConnectionImpl;
13 
14 import hunt.collection.ArrayList;
15 import hunt.collection.Set;
16 import hunt.collection.List;
17 import hunt.collection.Map;
18 
19 import hunt.proton.amqp.Symbol;
20 import hunt.proton.amqp.transport.Open;
21 import hunt.proton.engine.Collector;
22 import hunt.proton.engine.EndpointState;
23 import hunt.proton.engine.Event;
24 import hunt.proton.engine.Link;
25 import hunt.proton.engine.ProtonJConnection;
26 import hunt.proton.engine.Session;
27 import hunt.proton.engine.Reactor;
28 import hunt.proton.engine.impl.EndpointImpl;
29 import hunt.proton.engine.impl.SessionImpl;
30 import hunt.proton.engine.impl.LinkNode;
31 import hunt.proton.engine.impl.LinkImpl;
32 import hunt.proton.engine.impl.DeliveryImpl;
33 import hunt.proton.engine.impl.TransportImpl;
34 import hunt.proton.engine.impl.CollectorImpl;
35 import hunt.proton.engine.impl.EndpointImplQuery;
36 import hunt.collection.Iterator;
37 import hunt.proton.engine.impl.EventImpl;
38 import hunt.proton.engine.ReactorChild;
39 import hunt.logging;
40 
41 
42 /**
43  * 
44  */
45 class ConnectionImpl : EndpointImpl, ProtonJConnection {
46     static int MAX_CHANNELS = 65535;
47 
48     private List!SessionImpl _sessions; // = new ArrayList<SessionImpl>();
49     private EndpointImpl _transportTail;
50     private EndpointImpl _transportHead;
51     private int _maxChannels; //= MAX_CHANNELS;
52 
53     private LinkNode!SessionImpl _sessionHead;
54     private LinkNode!SessionImpl _sessionTail;
55 
56     private LinkNode!(LinkImpl) _linkHead;
57     private LinkNode!(LinkImpl) _linkTail;
58 
59     private DeliveryImpl _workHead;
60     private DeliveryImpl _workTail;
61 
62     private TransportImpl _transport;
63     private DeliveryImpl _transportWorkHead;
64     private DeliveryImpl _transportWorkTail;
65     private int _transportWorkSize = 0;
66     private string _localContainerId = "";
67     private string _localHostname;
68     private string _remoteContainer;
69     private string _remoteHostname;
70     private Symbol[] _offeredCapabilities;
71     private Symbol[] _desiredCapabilities;
72     private Symbol[] _remoteOfferedCapabilities;
73     private Symbol[] _remoteDesiredCapabilities;
74     private Map!(Symbol, Object) _properties;
75     private Map!(Symbol, Object) _remoteProperties;
76 
77     private Object _context;
78     private CollectorImpl _collector;
79     private Reactor _reactor;
80 
81     private static Symbol[] EMPTY_SYMBOL_ARRAY;
82 
83     //static Symbol[]  EMPTY_SYMBOL_ARRAY () {
84     //    __gshared Symbol[]  inst;
85     //    return initOnce!inst(new TransportResultImpl(OK, null, null));
86     //}
87     /**
88      * Application code should use {@link hunt.proton.engine.Connection.Factory#create()} instead.
89      */
90     this() {
91         _maxChannels = MAX_CHANNELS;
92         _sessions = new ArrayList!SessionImpl();
93     }
94 
95     override SessionImpl session() {
96         SessionImpl session = new SessionImpl(this);
97         _sessions.add(session);
98 
99         return session;
100     }
101 
102     override void free() {
103         super.free();
104     }
105 
106     override int opCmp(ReactorChild o) {
107         ConnectionImpl other = cast(ConnectionImpl) o;
108         return cast(int)(this._localContainerId.hashOf - other._localContainerId.hashOf);
109     }
110 
111     void freeSession(SessionImpl session) {
112         _sessions.remove(session);
113     }
114 
115     LinkNode!SessionImpl addSessionEndpoint(SessionImpl endpoint) {
116         LinkNode!SessionImpl node;
117         if (_sessionHead is null) {
118             node = _sessionHead = _sessionTail = LinkNode!SessionImpl.newList!SessionImpl(
119                     endpoint);
120         } else {
121             node = _sessionTail = _sessionTail.addAtTail(endpoint);
122         }
123         return node;
124     }
125 
126     void removeSessionEndpoint(LinkNode!SessionImpl node) {
127         LinkNode!SessionImpl prev = node.getPrev();
128         LinkNode!SessionImpl next = node.getNext();
129 
130         if (_sessionHead == node) {
131             _sessionHead = next;
132         }
133         if (_sessionTail == node) {
134             _sessionTail = prev;
135         }
136         node.remove();
137     }
138 
139     LinkNode!LinkImpl addLinkEndpoint(LinkImpl endpoint) {
140         LinkNode!LinkImpl node;
141         if (_linkHead is null) {
142             node = _linkHead = _linkTail = LinkNode!LinkImpl.newList!LinkImpl(endpoint);
143         } else {
144             node = _linkTail = _linkTail.addAtTail(endpoint);
145         }
146         return node;
147     }
148 
149     void removeLinkEndpoint(LinkNode!LinkImpl node) {
150         LinkNode!LinkImpl prev = node.getPrev();
151         LinkNode!LinkImpl next = node.getNext();
152 
153         if (_linkHead == node) {
154             _linkHead = next;
155         }
156         if (_linkTail == node) {
157             _linkTail = prev;
158         }
159         node.remove();
160     }
161 
162     override Session sessionHead(Set!EndpointState local, Set!EndpointState remote) {
163         if (_sessionHead is null) {
164             return null;
165         } else {
166             Query!SessionImpl query = new EndpointImplQuery!SessionImpl(local, remote);
167             LinkNode!SessionImpl node = query.matches(_sessionHead)
168                 ? _sessionHead : _sessionHead.next(query);
169             return node is null ? null : node.getValue();
170         }
171     }
172 
173     override Link linkHead(Set!EndpointState local, Set!EndpointState remote) {
174         if (_linkHead is null) {
175             return null;
176         } else {
177             Query!LinkImpl query = new EndpointImplQuery!LinkImpl(local, remote);
178             LinkNode!LinkImpl node = query.matches(_linkHead) ? _linkHead : _linkHead.next(query);
179             return node is null ? null : node.getValue();
180         }
181     }
182 
183     override protected ConnectionImpl getConnectionImpl() {
184         return this;
185     }
186 
187     override void postFinal() {
188         put(Type.CONNECTION_FINAL, this);
189     }
190 
191     override void doFree() {
192         List!SessionImpl sessions = new ArrayList!SessionImpl(_sessions);
193         foreach (SessionImpl session; sessions) {
194             session.free();
195         }
196         _sessions = null;
197     }
198 
199     void modifyEndpoints() {
200         if (_sessions !is null) {
201             foreach (SessionImpl ssn; _sessions) {
202                 ssn.modifyEndpoints();
203             }
204         }
205         if (!freed) {
206             modified();
207         }
208     }
209 
210     void handleOpen(Open open) {
211         // TODO - store state
212         setRemoteState(EndpointState.ACTIVE);
213         setRemoteHostname(open.getHostname() is null ? null : open.getHostname().value);
214         setRemoteContainer(open.getContainerId() is null ? null : open.getContainerId().value);
215         if (open.getDesiredCapabilities() !is null) {
216             setRemoteDesiredCapabilities(open.getDesiredCapabilities().toArray);
217         }
218         if (open.getOfferedCapabilities() !is null) {
219             setRemoteOfferedCapabilities(open.getOfferedCapabilities().toArray);
220         }
221         setRemoteProperties(open.getProperties());
222         put(Type.CONNECTION_REMOTE_OPEN, this);
223     }
224 
225     EndpointImpl getTransportHead() {
226         return _transportHead;
227     }
228 
229     EndpointImpl getTransportTail() {
230         return _transportTail;
231     }
232 
233     void addModified(EndpointImpl endpoint) {
234         if (_transportTail is null) {
235             endpoint.setTransportNext(null);
236             endpoint.setTransportPrev(null);
237             _transportHead = _transportTail = endpoint;
238         } else {
239             _transportTail.setTransportNext(endpoint);
240             endpoint.setTransportPrev(_transportTail);
241             _transportTail = endpoint;
242             _transportTail.setTransportNext(null);
243         }
244     }
245 
246     void removeModified(EndpointImpl endpoint) {
247         if (_transportHead == endpoint) {
248             _transportHead = endpoint.transportNext();
249         } else {
250             endpoint.transportPrev().setTransportNext(endpoint.transportNext());
251         }
252 
253         if (_transportTail == endpoint) {
254             _transportTail = endpoint.transportPrev();
255         } else {
256             endpoint.transportNext().setTransportPrev(endpoint.transportPrev());
257         }
258     }
259 
260     override int getMaxChannels() {
261         return _maxChannels;
262     }
263 
264     string getLocalContainerId() {
265         return _localContainerId;
266     }
267 
268     override void setLocalContainerId(string localContainerId) {
269         _localContainerId = localContainerId;
270     }
271 
272     override DeliveryImpl getWorkHead() {
273         return _workHead;
274     }
275 
276     override void setContainer(string container) {
277         _localContainerId = container;
278     }
279 
280     override string getContainer() {
281         return _localContainerId;
282     }
283 
284     override void setHostname(string hostname) {
285         _localHostname = hostname;
286     }
287 
288     override string getRemoteContainer() {
289         return _remoteContainer;
290     }
291 
292     override string getRemoteHostname() {
293         return _remoteHostname;
294     }
295 
296     override void setOfferedCapabilities(Symbol[] capabilities) {
297         _offeredCapabilities = capabilities;
298     }
299 
300     override void setDesiredCapabilities(Symbol[] capabilities) {
301         _desiredCapabilities = capabilities;
302     }
303 
304     override Symbol[] getRemoteOfferedCapabilities() {
305         return _remoteOfferedCapabilities is null ? EMPTY_SYMBOL_ARRAY : _remoteOfferedCapabilities;
306     }
307 
308     override Symbol[] getRemoteDesiredCapabilities() {
309         return _remoteDesiredCapabilities is null ? EMPTY_SYMBOL_ARRAY : _remoteDesiredCapabilities;
310     }
311 
312     Symbol[] getOfferedCapabilities() {
313         return _offeredCapabilities;
314     }
315 
316     Symbol[] getDesiredCapabilities() {
317         return _desiredCapabilities;
318     }
319 
320     void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities) {
321         _remoteOfferedCapabilities = remoteOfferedCapabilities;
322     }
323 
324     void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities) {
325         _remoteDesiredCapabilities = remoteDesiredCapabilities;
326     }
327 
328     Map!(Symbol, Object) getProperties() {
329         return _properties;
330     }
331 
332     override void setProperties(Map!(Symbol, Object) properties) {
333         _properties = properties;
334     }
335 
336     override Map!(Symbol, Object) getRemoteProperties() {
337         return _remoteProperties;
338     }
339 
340     void setRemoteProperties(Map!(Symbol, Object) remoteProperties) {
341         _remoteProperties = remoteProperties;
342     }
343 
344     override string getHostname() {
345         return _localHostname;
346     }
347 
348     void setRemoteContainer(string remoteContainerId) {
349         _remoteContainer = remoteContainerId;
350     }
351 
352     void setRemoteHostname(string remoteHostname) {
353         _remoteHostname = remoteHostname;
354     }
355 
356     DeliveryImpl getWorkTail() {
357         return _workTail;
358     }
359 
360     void removeWork(DeliveryImpl delivery) {
361         if (!delivery._work)
362             return;
363 
364         DeliveryImpl next = delivery.getWorkNext();
365         DeliveryImpl prev = delivery.getWorkPrev();
366 
367         if (prev !is null) {
368             prev.setWorkNext(next);
369         }
370 
371         if (next !is null) {
372             next.setWorkPrev(prev);
373         }
374 
375         delivery.setWorkNext(null);
376         delivery.setWorkPrev(null);
377 
378         if (_workHead == delivery) {
379             _workHead = next;
380 
381         }
382 
383         if (_workTail == delivery) {
384             _workTail = prev;
385         }
386 
387         delivery._work = false;
388     }
389 
390     void addWork(DeliveryImpl delivery) {
391         if (delivery._work)
392             return;
393 
394         delivery.setWorkNext(null);
395         delivery.setWorkPrev(_workTail);
396 
397         if (_workTail !is null) {
398             _workTail.setWorkNext(delivery);
399         }
400 
401         _workTail = delivery;
402 
403         if (_workHead is null) {
404             _workHead = delivery;
405         }
406 
407         delivery._work = true;
408     }
409 
410     Iterator!DeliveryImpl getWorkSequence() {
411         return new WorkSequence(_workHead);
412     }
413 
414     void setTransport(TransportImpl transport) {
415         _transport = transport;
416     }
417 
418     override TransportImpl getTransport() {
419         return _transport;
420     }
421 
422     class WorkSequence : Iterator!DeliveryImpl {
423         private DeliveryImpl _next;
424 
425         this(DeliveryImpl workHead) {
426             _next = workHead;
427         }
428 
429         bool hasNext() {
430             return _next !is null;
431         }
432 
433         //override
434         //void remove()
435         //{
436         //    import
437         //   // throw new UnsupportedOperationException();
438         //}
439 
440         DeliveryImpl next() {
441             DeliveryImpl next = _next;
442             if (next !is null) {
443                 _next = next.getWorkNext();
444             }
445             return next;
446         }
447     }
448 
449     DeliveryImpl getTransportWorkHead() {
450         return _transportWorkHead;
451     }
452 
453     int getTransportWorkSize() {
454         return _transportWorkSize;
455     }
456 
457     void removeTransportWork(DeliveryImpl delivery) {
458         if (!delivery._transportWork)
459             return;
460 
461         DeliveryImpl next = delivery.getTransportWorkNext();
462         DeliveryImpl prev = delivery.getTransportWorkPrev();
463 
464         if (prev !is null) {
465             prev.setTransportWorkNext(next);
466         }
467 
468         if (next !is null) {
469             next.setTransportWorkPrev(prev);
470         }
471 
472         delivery.setTransportWorkNext(null);
473         delivery.setTransportWorkPrev(null);
474 
475         if (_transportWorkHead == delivery) {
476             _transportWorkHead = next;
477 
478         }
479 
480         if (_transportWorkTail == delivery) {
481             _transportWorkTail = prev;
482         }
483 
484         delivery._transportWork = false;
485         _transportWorkSize--;
486     }
487 
488     void addTransportWork(DeliveryImpl delivery) {
489         modified();
490         if (delivery._transportWork)
491             return;
492 
493         delivery.setTransportWorkNext(null);
494         delivery.setTransportWorkPrev(_transportWorkTail);
495 
496         if (_transportWorkTail !is null) {
497             _transportWorkTail.setTransportWorkNext(delivery);
498         }
499 
500         _transportWorkTail = delivery;
501 
502         if (_transportWorkHead is null) {
503             _transportWorkHead = delivery;
504         }
505 
506         delivery._transportWork = true;
507         _transportWorkSize++;
508     }
509 
510     void workUpdate(DeliveryImpl delivery) {
511         if (delivery !is null) {
512             if (!delivery.isSettled() && (delivery.isReadable()
513                     || delivery.isWritable() || delivery.isUpdated())) {
514                 addWork(delivery);
515             } else {
516                 removeWork(delivery);
517             }
518         }
519     }
520 
521     override Object getContext() {
522         return _context;
523     }
524 
525     override void setContext(Object context) {
526         _context = context;
527     }
528 
529     override void collect(Collector collector) {
530         _collector = cast(CollectorImpl) collector;
531 
532         put(Type.CONNECTION_INIT, this);
533 
534         LinkNode!SessionImpl ssn = _sessionHead;
535         while (ssn !is null) {
536             put(Type.SESSION_INIT, ssn.getValue());
537             ssn = ssn.getNext();
538         }
539 
540         LinkNode!LinkImpl lnk = _linkHead;
541         while (lnk !is null) {
542             put(Type.LINK_INIT, lnk.getValue());
543             lnk = lnk.getNext();
544         }
545     }
546 
547     EventImpl put(Type type, Object context) {
548         //logInfo("EventImpl put ############################### %d ", type.ordinal);
549         if (_collector !is null) {
550             // logInfo("EventImpl put in ###############################");
551             return _collector.put(type, context);
552         } else {
553             return null;
554         }
555     }
556 
557     override void localOpen() {
558         put(Type.CONNECTION_LOCAL_OPEN, this);
559     }
560 
561     override void localClose() {
562         put(Type.CONNECTION_LOCAL_CLOSE, this);
563     }
564 
565     override Reactor getReactor() {
566         return _reactor;
567     }
568 
569     void setReactor(Reactor reactor) {
570         _reactor = reactor;
571     }
572 }