1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.proton.engine.impl.ConnectionImpl; 13 14 import hunt.collection.ArrayList; 15 import hunt.collection.Set; 16 import hunt.collection.List; 17 import hunt.collection.Map; 18 19 import hunt.proton.amqp.Symbol; 20 import hunt.proton.amqp.transport.Open; 21 import hunt.proton.engine.Collector; 22 import hunt.proton.engine.EndpointState; 23 import hunt.proton.engine.Event; 24 import hunt.proton.engine.Link; 25 import hunt.proton.engine.ProtonJConnection; 26 import hunt.proton.engine.Session; 27 import hunt.proton.engine.Reactor; 28 import hunt.proton.engine.impl.EndpointImpl; 29 import hunt.proton.engine.impl.SessionImpl; 30 import hunt.proton.engine.impl.LinkNode; 31 import hunt.proton.engine.impl.LinkImpl; 32 import hunt.proton.engine.impl.DeliveryImpl; 33 import hunt.proton.engine.impl.TransportImpl; 34 import hunt.proton.engine.impl.CollectorImpl; 35 import hunt.proton.engine.impl.EndpointImplQuery; 36 import hunt.collection.Iterator; 37 import hunt.proton.engine.impl.EventImpl; 38 import hunt.proton.engine.ReactorChild; 39 import hunt.logging; 40 41 42 /** 43 * 44 */ 45 class ConnectionImpl : EndpointImpl, ProtonJConnection { 46 static int MAX_CHANNELS = 65535; 47 48 private List!SessionImpl _sessions; // = new ArrayList<SessionImpl>(); 49 private EndpointImpl _transportTail; 50 private EndpointImpl _transportHead; 51 private int _maxChannels; //= MAX_CHANNELS; 52 53 private LinkNode!SessionImpl _sessionHead; 54 private LinkNode!SessionImpl _sessionTail; 55 56 private LinkNode!(LinkImpl) _linkHead; 57 private LinkNode!(LinkImpl) _linkTail; 58 59 private DeliveryImpl _workHead; 60 private DeliveryImpl _workTail; 61 62 private TransportImpl _transport; 63 private DeliveryImpl _transportWorkHead; 64 private DeliveryImpl _transportWorkTail; 65 private int _transportWorkSize = 0; 66 private string _localContainerId = ""; 67 private string _localHostname; 68 private string _remoteContainer; 69 private string _remoteHostname; 70 private Symbol[] _offeredCapabilities; 71 private Symbol[] _desiredCapabilities; 72 private Symbol[] _remoteOfferedCapabilities; 73 private Symbol[] _remoteDesiredCapabilities; 74 private Map!(Symbol, Object) _properties; 75 private Map!(Symbol, Object) _remoteProperties; 76 77 private Object _context; 78 private CollectorImpl _collector; 79 private Reactor _reactor; 80 81 private static Symbol[] EMPTY_SYMBOL_ARRAY; 82 83 //static Symbol[] EMPTY_SYMBOL_ARRAY () { 84 // __gshared Symbol[] inst; 85 // return initOnce!inst(new TransportResultImpl(OK, null, null)); 86 //} 87 /** 88 * Application code should use {@link hunt.proton.engine.Connection.Factory#create()} instead. 89 */ 90 this() { 91 _maxChannels = MAX_CHANNELS; 92 _sessions = new ArrayList!SessionImpl(); 93 } 94 95 override SessionImpl session() { 96 SessionImpl session = new SessionImpl(this); 97 _sessions.add(session); 98 99 return session; 100 } 101 102 override void free() { 103 super.free(); 104 } 105 106 override int opCmp(ReactorChild o) { 107 ConnectionImpl other = cast(ConnectionImpl) o; 108 return cast(int)(this._localContainerId.hashOf - other._localContainerId.hashOf); 109 } 110 111 void freeSession(SessionImpl session) { 112 _sessions.remove(session); 113 } 114 115 LinkNode!SessionImpl addSessionEndpoint(SessionImpl endpoint) { 116 LinkNode!SessionImpl node; 117 if (_sessionHead is null) { 118 node = _sessionHead = _sessionTail = LinkNode!SessionImpl.newList!SessionImpl( 119 endpoint); 120 } else { 121 node = _sessionTail = _sessionTail.addAtTail(endpoint); 122 } 123 return node; 124 } 125 126 void removeSessionEndpoint(LinkNode!SessionImpl node) { 127 LinkNode!SessionImpl prev = node.getPrev(); 128 LinkNode!SessionImpl next = node.getNext(); 129 130 if (_sessionHead == node) { 131 _sessionHead = next; 132 } 133 if (_sessionTail == node) { 134 _sessionTail = prev; 135 } 136 node.remove(); 137 } 138 139 LinkNode!LinkImpl addLinkEndpoint(LinkImpl endpoint) { 140 LinkNode!LinkImpl node; 141 if (_linkHead is null) { 142 node = _linkHead = _linkTail = LinkNode!LinkImpl.newList!LinkImpl(endpoint); 143 } else { 144 node = _linkTail = _linkTail.addAtTail(endpoint); 145 } 146 return node; 147 } 148 149 void removeLinkEndpoint(LinkNode!LinkImpl node) { 150 LinkNode!LinkImpl prev = node.getPrev(); 151 LinkNode!LinkImpl next = node.getNext(); 152 153 if (_linkHead == node) { 154 _linkHead = next; 155 } 156 if (_linkTail == node) { 157 _linkTail = prev; 158 } 159 node.remove(); 160 } 161 162 override Session sessionHead(Set!EndpointState local, Set!EndpointState remote) { 163 if (_sessionHead is null) { 164 return null; 165 } else { 166 Query!SessionImpl query = new EndpointImplQuery!SessionImpl(local, remote); 167 LinkNode!SessionImpl node = query.matches(_sessionHead) 168 ? _sessionHead : _sessionHead.next(query); 169 return node is null ? null : node.getValue(); 170 } 171 } 172 173 override Link linkHead(Set!EndpointState local, Set!EndpointState remote) { 174 if (_linkHead is null) { 175 return null; 176 } else { 177 Query!LinkImpl query = new EndpointImplQuery!LinkImpl(local, remote); 178 LinkNode!LinkImpl node = query.matches(_linkHead) ? _linkHead : _linkHead.next(query); 179 return node is null ? null : node.getValue(); 180 } 181 } 182 183 override protected ConnectionImpl getConnectionImpl() { 184 return this; 185 } 186 187 override void postFinal() { 188 put(Type.CONNECTION_FINAL, this); 189 } 190 191 override void doFree() { 192 List!SessionImpl sessions = new ArrayList!SessionImpl(_sessions); 193 foreach (SessionImpl session; sessions) { 194 session.free(); 195 } 196 _sessions = null; 197 } 198 199 void modifyEndpoints() { 200 if (_sessions !is null) { 201 foreach (SessionImpl ssn; _sessions) { 202 ssn.modifyEndpoints(); 203 } 204 } 205 if (!freed) { 206 modified(); 207 } 208 } 209 210 void handleOpen(Open open) { 211 // TODO - store state 212 setRemoteState(EndpointState.ACTIVE); 213 setRemoteHostname(open.getHostname() is null ? null : open.getHostname().value); 214 setRemoteContainer(open.getContainerId() is null ? null : open.getContainerId().value); 215 if (open.getDesiredCapabilities() !is null) { 216 setRemoteDesiredCapabilities(open.getDesiredCapabilities().toArray); 217 } 218 if (open.getOfferedCapabilities() !is null) { 219 setRemoteOfferedCapabilities(open.getOfferedCapabilities().toArray); 220 } 221 setRemoteProperties(open.getProperties()); 222 put(Type.CONNECTION_REMOTE_OPEN, this); 223 } 224 225 EndpointImpl getTransportHead() { 226 return _transportHead; 227 } 228 229 EndpointImpl getTransportTail() { 230 return _transportTail; 231 } 232 233 void addModified(EndpointImpl endpoint) { 234 if (_transportTail is null) { 235 endpoint.setTransportNext(null); 236 endpoint.setTransportPrev(null); 237 _transportHead = _transportTail = endpoint; 238 } else { 239 _transportTail.setTransportNext(endpoint); 240 endpoint.setTransportPrev(_transportTail); 241 _transportTail = endpoint; 242 _transportTail.setTransportNext(null); 243 } 244 } 245 246 void removeModified(EndpointImpl endpoint) { 247 if (_transportHead == endpoint) { 248 _transportHead = endpoint.transportNext(); 249 } else { 250 endpoint.transportPrev().setTransportNext(endpoint.transportNext()); 251 } 252 253 if (_transportTail == endpoint) { 254 _transportTail = endpoint.transportPrev(); 255 } else { 256 endpoint.transportNext().setTransportPrev(endpoint.transportPrev()); 257 } 258 } 259 260 override int getMaxChannels() { 261 return _maxChannels; 262 } 263 264 string getLocalContainerId() { 265 return _localContainerId; 266 } 267 268 override void setLocalContainerId(string localContainerId) { 269 _localContainerId = localContainerId; 270 } 271 272 override DeliveryImpl getWorkHead() { 273 return _workHead; 274 } 275 276 override void setContainer(string container) { 277 _localContainerId = container; 278 } 279 280 override string getContainer() { 281 return _localContainerId; 282 } 283 284 override void setHostname(string hostname) { 285 _localHostname = hostname; 286 } 287 288 override string getRemoteContainer() { 289 return _remoteContainer; 290 } 291 292 override string getRemoteHostname() { 293 return _remoteHostname; 294 } 295 296 override void setOfferedCapabilities(Symbol[] capabilities) { 297 _offeredCapabilities = capabilities; 298 } 299 300 override void setDesiredCapabilities(Symbol[] capabilities) { 301 _desiredCapabilities = capabilities; 302 } 303 304 override Symbol[] getRemoteOfferedCapabilities() { 305 return _remoteOfferedCapabilities is null ? EMPTY_SYMBOL_ARRAY : _remoteOfferedCapabilities; 306 } 307 308 override Symbol[] getRemoteDesiredCapabilities() { 309 return _remoteDesiredCapabilities is null ? EMPTY_SYMBOL_ARRAY : _remoteDesiredCapabilities; 310 } 311 312 Symbol[] getOfferedCapabilities() { 313 return _offeredCapabilities; 314 } 315 316 Symbol[] getDesiredCapabilities() { 317 return _desiredCapabilities; 318 } 319 320 void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities) { 321 _remoteOfferedCapabilities = remoteOfferedCapabilities; 322 } 323 324 void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities) { 325 _remoteDesiredCapabilities = remoteDesiredCapabilities; 326 } 327 328 Map!(Symbol, Object) getProperties() { 329 return _properties; 330 } 331 332 override void setProperties(Map!(Symbol, Object) properties) { 333 _properties = properties; 334 } 335 336 override Map!(Symbol, Object) getRemoteProperties() { 337 return _remoteProperties; 338 } 339 340 void setRemoteProperties(Map!(Symbol, Object) remoteProperties) { 341 _remoteProperties = remoteProperties; 342 } 343 344 override string getHostname() { 345 return _localHostname; 346 } 347 348 void setRemoteContainer(string remoteContainerId) { 349 _remoteContainer = remoteContainerId; 350 } 351 352 void setRemoteHostname(string remoteHostname) { 353 _remoteHostname = remoteHostname; 354 } 355 356 DeliveryImpl getWorkTail() { 357 return _workTail; 358 } 359 360 void removeWork(DeliveryImpl delivery) { 361 if (!delivery._work) 362 return; 363 364 DeliveryImpl next = delivery.getWorkNext(); 365 DeliveryImpl prev = delivery.getWorkPrev(); 366 367 if (prev !is null) { 368 prev.setWorkNext(next); 369 } 370 371 if (next !is null) { 372 next.setWorkPrev(prev); 373 } 374 375 delivery.setWorkNext(null); 376 delivery.setWorkPrev(null); 377 378 if (_workHead == delivery) { 379 _workHead = next; 380 381 } 382 383 if (_workTail == delivery) { 384 _workTail = prev; 385 } 386 387 delivery._work = false; 388 } 389 390 void addWork(DeliveryImpl delivery) { 391 if (delivery._work) 392 return; 393 394 delivery.setWorkNext(null); 395 delivery.setWorkPrev(_workTail); 396 397 if (_workTail !is null) { 398 _workTail.setWorkNext(delivery); 399 } 400 401 _workTail = delivery; 402 403 if (_workHead is null) { 404 _workHead = delivery; 405 } 406 407 delivery._work = true; 408 } 409 410 Iterator!DeliveryImpl getWorkSequence() { 411 return new WorkSequence(_workHead); 412 } 413 414 void setTransport(TransportImpl transport) { 415 _transport = transport; 416 } 417 418 override TransportImpl getTransport() { 419 return _transport; 420 } 421 422 class WorkSequence : Iterator!DeliveryImpl { 423 private DeliveryImpl _next; 424 425 this(DeliveryImpl workHead) { 426 _next = workHead; 427 } 428 429 bool hasNext() { 430 return _next !is null; 431 } 432 433 //override 434 //void remove() 435 //{ 436 // import 437 // // throw new UnsupportedOperationException(); 438 //} 439 440 DeliveryImpl next() { 441 DeliveryImpl next = _next; 442 if (next !is null) { 443 _next = next.getWorkNext(); 444 } 445 return next; 446 } 447 } 448 449 DeliveryImpl getTransportWorkHead() { 450 return _transportWorkHead; 451 } 452 453 int getTransportWorkSize() { 454 return _transportWorkSize; 455 } 456 457 void removeTransportWork(DeliveryImpl delivery) { 458 if (!delivery._transportWork) 459 return; 460 461 DeliveryImpl next = delivery.getTransportWorkNext(); 462 DeliveryImpl prev = delivery.getTransportWorkPrev(); 463 464 if (prev !is null) { 465 prev.setTransportWorkNext(next); 466 } 467 468 if (next !is null) { 469 next.setTransportWorkPrev(prev); 470 } 471 472 delivery.setTransportWorkNext(null); 473 delivery.setTransportWorkPrev(null); 474 475 if (_transportWorkHead == delivery) { 476 _transportWorkHead = next; 477 478 } 479 480 if (_transportWorkTail == delivery) { 481 _transportWorkTail = prev; 482 } 483 484 delivery._transportWork = false; 485 _transportWorkSize--; 486 } 487 488 void addTransportWork(DeliveryImpl delivery) { 489 modified(); 490 if (delivery._transportWork) 491 return; 492 493 delivery.setTransportWorkNext(null); 494 delivery.setTransportWorkPrev(_transportWorkTail); 495 496 if (_transportWorkTail !is null) { 497 _transportWorkTail.setTransportWorkNext(delivery); 498 } 499 500 _transportWorkTail = delivery; 501 502 if (_transportWorkHead is null) { 503 _transportWorkHead = delivery; 504 } 505 506 delivery._transportWork = true; 507 _transportWorkSize++; 508 } 509 510 void workUpdate(DeliveryImpl delivery) { 511 if (delivery !is null) { 512 if (!delivery.isSettled() && (delivery.isReadable() 513 || delivery.isWritable() || delivery.isUpdated())) { 514 addWork(delivery); 515 } else { 516 removeWork(delivery); 517 } 518 } 519 } 520 521 override Object getContext() { 522 return _context; 523 } 524 525 override void setContext(Object context) { 526 _context = context; 527 } 528 529 override void collect(Collector collector) { 530 _collector = cast(CollectorImpl) collector; 531 532 put(Type.CONNECTION_INIT, this); 533 534 LinkNode!SessionImpl ssn = _sessionHead; 535 while (ssn !is null) { 536 put(Type.SESSION_INIT, ssn.getValue()); 537 ssn = ssn.getNext(); 538 } 539 540 LinkNode!LinkImpl lnk = _linkHead; 541 while (lnk !is null) { 542 put(Type.LINK_INIT, lnk.getValue()); 543 lnk = lnk.getNext(); 544 } 545 } 546 547 EventImpl put(Type type, Object context) { 548 //logInfo("EventImpl put ############################### %d ", type.ordinal); 549 if (_collector !is null) { 550 // logInfo("EventImpl put in ###############################"); 551 return _collector.put(type, context); 552 } else { 553 return null; 554 } 555 } 556 557 override void localOpen() { 558 put(Type.CONNECTION_LOCAL_OPEN, this); 559 } 560 561 override void localClose() { 562 put(Type.CONNECTION_LOCAL_CLOSE, this); 563 } 564 565 override Reactor getReactor() { 566 return _reactor; 567 } 568 569 void setReactor(Reactor reactor) { 570 _reactor = reactor; 571 } 572 }