| CODENOTIFIER | HelpYou are not signed inSign in |
Project: Adobe BlazeDS
Revision: 3091
Author: matamel@adobe.com
Date: 04 Sep 2008 10:53:43
Changes:Feature: Message throttling - message frequency limits subfeature.
QA: Yes
Doc: Not yet
Checkintests: Pass
Details: This is the BlazeDS server part of the message frequency limits sub-feature of Message throttling feature. These are the main changes:
- A new throttling log category.
- A new OutboundQueueThrottleManager for FlexClientOutboundQueueProcessor that handles outgoing client level message throttling (instead of ThrottleManager)
- Clean-up of existing ThrottleManager.
- Expose methods to get/set a FlexClientOutboundQueueProcessor factory on MessageBroker.
- Added support for maxFrequency handling at Consumer and Consumer subcription levels on the server.
| ... | ...@@ -264,7 +264,9 @@ | |
| 264 | 264 | ns.setSubscriptionTimeoutMinutes(subscriptionTimeoutMinutes); |
| 265 | 265 | |
| 266 | 266 | // Throttle Settings |
| 267 | throttle(ns.getThrottleSettings(), network); | |
| 267 | ThrottleSettings ts = ns.getThrottleSettings(); | |
| 268 | ts.setDestinationName(getId()); | |
| 269 | throttle(ts, network); | |
| 268 | 270 | |
| 269 | 271 | setNetworkSettings(ns); |
| 270 | 272 | } |
| ... | ...@@ -273,42 +273,25 @@ | |
| 273 | 273 | public FlexClientOutboundQueueProcessor createOutboundQueueProcessor(FlexClient flexClient, String endpointId) |
| 274 | 274 | { |
| 275 | 275 | FlexClientOutboundQueueProcessor processor = null; |
| 276 | ||
| 277 | try | |
| 278 | { | |
| 279 | Endpoint endpoint = broker.getEndpoint(endpointId); | |
| 280 | if (endpoint instanceof AbstractEndpoint) | |
| 281 | { | |
| 282 | Class processorClass = ((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorClass(); | |
| 283 | if (processorClass != null) | |
| 284 | { | |
| 285 | Object instance = ClassUtil.createDefaultInstance(processorClass, null); | |
| 286 | if (instance instanceof FlexClientOutboundQueueProcessor) | |
| 287 | { | |
| 288 | processor = (FlexClientOutboundQueueProcessor)instance; | |
| 289 | processor.setFlexClient(flexClient); | |
| 290 | processor.setEndpointId(endpointId); | |
| 291 | processor.initialize(((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorConfig()); | |
| 292 | } | |
| 293 | } | |
| 294 | } | |
| 295 | } | |
| 296 | catch (Throwable t) | |
| 297 | { | |
| 298 | if (Log.isWarn()) | |
| 299 | Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn("Failed to create custom outbound queue processor for FlexClient with id '" + flexClient.getId() + "'. Using default queue processor.", t); | |
| 300 | } | |
| 301 | ||
| 276 | ||
| 277 | // First, try to create a custom outbound queue processor, if one exists. | |
| 278 | processor = createCustomOutboundQueueProcessor(flexClient, endpointId, processor); | |
| 279 | ||
| 280 | // If no custom processor, then try to create MessageBroker's default queue processor. | |
| 281 | if (processor == null) | |
| 282 | processor = createBrokerOutboundQueueProcessor(flexClient, endpointId, processor); | |
| 283 | ||
| 284 | // If MessageBroker's default queue processor fails, use the default processor. | |
| 302 | 285 | if (processor == null) |
| 303 | 286 | { |
| 304 | 287 | processor = new FlexClientOutboundQueueProcessor(); |
| 305 | 288 | processor.setFlexClient(flexClient); |
| 306 | 289 | processor.setEndpointId(endpointId); |
| 307 | } | |
| 308 | ||
| 290 | } | |
| 291 | ||
| 309 | 292 | return processor; |
| 310 | 293 | } |
| 311 | ||
| 294 | ||
| 312 | 295 | /** |
| 313 | 296 | * @exclude |
| 314 | 297 | * Monitors an async poll for a FlexClient for timeout. |
| ... | ...@@ -421,4 +404,67 @@ | |
| 421 | 404 | } |
| 422 | 405 | } |
| 423 | 406 | } |
| 407 | ||
| 408 | //-------------------------------------------------------------------------- | |
| 409 | // | |
| 410 | // Private Methods | |
| 411 | // | |
| 412 | //-------------------------------------------------------------------------- | |
| 413 | ||
| 414 | private FlexClientOutboundQueueProcessor createBrokerOutboundQueueProcessor( | |
| 415 | FlexClient flexClient, String endpointId, | |
| 416 | FlexClientOutboundQueueProcessor processor) | |
| 417 | { | |
| 418 | Class processorClass = broker.getFlexClientOutboundQueueProcessorClass(); | |
| 419 | if (processorClass != null) | |
| 420 | { | |
| 421 | try | |
| 422 | { | |
| 423 | Object instance = ClassUtil.createDefaultInstance(processorClass, null); | |
| 424 | if (instance instanceof FlexClientOutboundQueueProcessor) | |
| 425 | { | |
| 426 | processor = (FlexClientOutboundQueueProcessor)instance; | |
| 427 | processor.setFlexClient(flexClient); | |
| 428 | processor.setEndpointId(endpointId); | |
| 429 | } | |
| 430 | } | |
| 431 | catch (Throwable t) | |
| 432 | { | |
| 433 | if (Log.isWarn()) | |
| 434 | Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn("Failed to create MessageBroker's outbound queue processor for FlexClient with id '" + flexClient.getId() + "'. Using default queue processor.", t); | |
| 435 | } | |
| 436 | } | |
| 437 | return processor; | |
| 438 | } | |
| 439 | ||
| 440 | private FlexClientOutboundQueueProcessor createCustomOutboundQueueProcessor( | |
| 441 | FlexClient flexClient, String endpointId, | |
| 442 | FlexClientOutboundQueueProcessor processor) | |
| 443 | { | |
| 444 | Endpoint endpoint = broker.getEndpoint(endpointId); | |
| 445 | if (endpoint instanceof AbstractEndpoint) | |
| 446 | { | |
| 447 | Class processorClass = ((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorClass(); | |
| 448 | if (processorClass != null) | |
| 449 | { | |
| 450 | try | |
| 451 | { | |
| 452 | Object instance = ClassUtil.createDefaultInstance(processorClass, null); | |
| 453 | if (instance instanceof FlexClientOutboundQueueProcessor) | |
| 454 | { | |
| 455 | processor = (FlexClientOutboundQueueProcessor)instance; | |
| 456 | processor.setFlexClient(flexClient); | |
| 457 | processor.setEndpointId(endpointId); | |
| 458 | processor.initialize(((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorConfig()); | |
| 459 | } | |
| 460 | } | |
| 461 | catch (Throwable t) | |
| 462 | { | |
| 463 | if (Log.isWarn()) | |
| 464 | Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn("Failed to create custom outbound queue processor for FlexClient with id '" + flexClient.getId() + "'. Using MessageBroker's default queue processor.", t); | |
| 465 | } | |
| 466 | } | |
| 467 | } | |
| 468 | return processor; | |
| 469 | } | |
| 424 | 470 | } |
| 425 | 471 | \ No newline at end of file |
| ... | ...@@ -0,0 +1,98 @@ | |
| 1 | /************************************************************************* | |
| 2 | * | |
| 3 | * ADOBE CONFIDENTIAL | |
| 4 | * __________________ | |
| 5 | * | |
| 6 | * Copyright 2008 Adobe Systems Incorporated | |
| 7 | * All Rights Reserved. | |
| 8 | * | |
| 9 | * NOTICE: All information contained herein is, and remains | |
| 10 | * the property of Adobe Systems Incorporated and its suppliers, | |
| 11 | * if any. The intellectual and technical concepts contained | |
| 12 | * herein are proprietary to Adobe Systems Incorporated and its | |
| 13 | * suppliers and may be covered by U.S. and Foreign Patents, | |
| 14 | * patents in process, and are protected by trade secret or copyright law. | |
| 15 | * Dissemination of this information or reproduction of this material | |
| 16 | * is strictly forbidden unless prior written permission is obtained | |
| 17 | * from Adobe Systems Incorporated. | |
| 18 | **************************************************************************/ | |
| 19 | package flex.messaging.services.messaging; | |
| 20 | ||
| 21 | import flex.messaging.config.ThrottleSettings.Policy; | |
| 22 | import flex.messaging.services.messaging.ThrottleManager.ThrottleResult; | |
| 23 | import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result; | |
| 24 | ||
| 25 | /** | |
| 26 | * This class is used by ThrottleManager and FlexClientOutboundQueueProcessor | |
| 27 | * to keep track of inbound and outbound message rates per destination and | |
| 28 | * per client-subscription. | |
| 29 | */ | |
| 30 | public class MessageFrequency | |
| 31 | { | |
| 32 | private final String id; | |
| 33 | public final int messageHistorySize; | |
| 34 | private int messageCount; | |
| 35 | private long [] previousMessageTimes; | |
| 36 | ||
| 37 | /** | |
| 38 | * Creates a new MessageFrequency with the specified id. | |
| 39 | * | |
| 40 | * @param id Either the destination or the client id associated with the MessageFrequency. | |
| 41 | * @param messageHistorySize The number of messages to use in calculating message rates. | |
| 42 | */ | |
| 43 | public MessageFrequency(String id, int messageHistorySize) | |
| 44 | { | |
| 45 | this.id = id; | |
| 46 | this.messageHistorySize = messageHistorySize; | |
| 47 | messageCount = 0; | |
| 48 | previousMessageTimes = new long[messageHistorySize]; | |
| 49 | } | |
| 50 | ||
| 51 | /** | |
| 52 | * Returns the id associated with the MessageFrequency. | |
| 53 | * | |
| 54 | * @return The id associated with the MessageFrequency. | |
| 55 | */ | |
| 56 | public String getId() | |
| 57 | { | |
| 58 | return id; | |
| 59 | } | |
| 60 | ||
| 61 | /** | |
| 62 | * Given a message timestamp and a maximum frequency, checks that the message | |
| 63 | * is not exceeding the max frequency limit. If message exceeds the limit, | |
| 64 | * returns a throttle result object that is appropriate for the passed in policy. | |
| 65 | * | |
| 66 | * @param messageTimestamp The message timestamp. | |
| 67 | * @param maxFrequency The maximum frequency to enforce. | |
| 68 | * @param policy The throttling policy. | |
| 69 | */ | |
| 70 | public ThrottleResult checkLimit(long messageTimestamp, int maxFrequency, Policy policy) | |
| 71 | { | |
| 72 | if (maxFrequency > 0) | |
| 73 | { | |
| 74 | // If we have enough messages to start testing. | |
| 75 | if (messageCount >= messageHistorySize) | |
| 76 | { | |
| 77 | // Time delay between this message and the last N messages. | |
| 78 | long interval = (messageTimestamp - previousMessageTimes[messageCount % messageHistorySize]) / 1000; | |
| 79 | long actualFrequency = messageHistorySize / (interval > 0 ? interval : 1); | |
| 80 | // If the rate is too high, toss this message and do not record it, | |
| 81 | // so the history represents the rate of messages actually delivered. | |
| 82 | if (actualFrequency >= maxFrequency) | |
| 83 | { | |
| 84 | Result result = ThrottleManager.getResult(policy); | |
| 85 | String detail = "[actual-frequency=" + actualFrequency + ", max-frequency=" + maxFrequency + "]"; | |
| 86 | return new ThrottleResult(result, detail); | |
| 87 | } | |
| 88 | } | |
| 89 | // Handle integer wrap | |
| 90 | if (messageCount == Integer.MAX_VALUE) | |
| 91 | messageCount = 0; | |
| 92 | // Increase the messageCount and update the message times. | |
| 93 | previousMessageTimes[messageCount++ % messageHistorySize] = messageTimestamp; | |
| 94 | } | |
| 95 | // Return the default OK result. | |
| 96 | return new ThrottleResult(); | |
| 97 | } | |
| 98 | } |
| ... | ...@@ -19,12 +19,12 @@ | |
| 19 | 19 | package flex.messaging.log; |
| 20 | 20 | |
| 21 | 21 | /** |
| 22 | * @exclude | |
| 23 | * | |
| 22 | 24 | * This class contains all the log categories used in our classes. When adding |
| 23 | 25 | * a new log category, make sure the sample configuration file is updated |
| 24 | 26 | * as well. |
| 25 | * | |
| 26 | * @author matamel | |
| 27 | * @exclude | |
| 27 | * | |
| 28 | 28 | */ |
| 29 | 29 | public interface LogCategories |
| 30 | 30 | { |
| ... | ...@@ -75,6 +75,7 @@ | |
| 75 | 75 | String SERVICE_DATA_TRANSACTION = "Service.Data.Transaction"; |
| 76 | 76 | String SERVICE_HTTP = "Service.HTTP"; |
| 77 | 77 | String SERVICE_MESSAGE = "Service.Message"; |
| 78 | String SERVICE_MESSAGE_THROTTLE = "Service.Message.Throttle"; | |
| 78 | 79 | String SERVICE_MESSAGE_JMS = "Service.Message.JMS"; |
| 79 | 80 | String SERVICE_REMOTING = "Service.Remoting"; |
| 80 | 81 |
| ... | ...@@ -42,7 +42,6 @@ | |
| 42 | 42 | import flex.messaging.services.messaging.RemoteSubscriptionManager; |
| 43 | 43 | import flex.messaging.services.messaging.SubscriptionManager; |
| 44 | 44 | import flex.messaging.services.messaging.Subtopic; |
| 45 | import flex.messaging.services.messaging.ThrottleManager.ThrottleResult; | |
| 46 | 45 | import flex.messaging.services.messaging.adapters.MessagingAdapter; |
| 47 | 46 | import flex.messaging.services.messaging.selector.JMSSelector; |
| 48 | 47 | import flex.messaging.util.StringUtils; |
| ... | ...@@ -217,24 +216,11 @@ | |
| 217 | 216 | |
| 218 | 217 | // Throttle the inbound message - this also attempts to prevent duplicate |
| 219 | 218 | // messages sent by a client. |
| 220 | ThrottleResult throttleResult; | |
| 219 | boolean throttled = false; | |
| 221 | 220 | if (throttle) |
| 222 | throttleResult = dest.getThrottleManager().throttleIncomingMessage(message); | |
| 223 | else | |
| 224 | throttleResult = new ThrottleResult(); | |
| 221 | throttled = dest.getThrottleManager().throttleIncomingMessage(message); | |
| 225 | 222 | |
| 226 | ThrottleResult.Result res = throttleResult.getResult(); | |
| 227 | MessageException me = throttleResult.getException(); | |
| 228 | if (res == ThrottleResult.Result.ERROR) | |
| 229 | { | |
| 230 | throw me; | |
| 231 | } | |
| 232 | else if (res == ThrottleResult.Result.IGNORE) | |
| 233 | { | |
| 234 | if (Log.isDebug()) | |
| 235 | Log.getLogger(LOG_CATEGORY).debug(me.getMessage(), me); | |
| 236 | } | |
| 237 | else | |
| 223 | if (!throttled) | |
| 238 | 224 | { |
| 239 | 225 | // Block any sent messages that have a subtopic header containing |
| 240 | 226 | // wildcards - wildcards are only supported in subscribe/unsubscribe |
| ... | ...@@ -530,19 +516,8 @@ | |
| 530 | 516 | FlexContext.setMessageRoutedNotifier(routingNotifier); |
| 531 | 517 | |
| 532 | 518 | // Throttle outgoing at the destination level |
| 533 | ThrottleResult throttleResult = destination.getThrottleManager().throttleOutgoingMessage(message, null); | |
| 534 | ThrottleResult.Result res = throttleResult.getResult(); | |
| 535 | MessageException me = throttleResult.getException(); | |
| 536 | if (res == ThrottleResult.Result.ERROR) | |
| 537 | { | |
| 538 | throw me; | |
| 539 | } | |
| 540 | else if (res == ThrottleResult.Result.IGNORE) | |
| 541 | { | |
| 542 | if (Log.isDebug()) | |
| 543 | Log.getLogger(LOG_CATEGORY).debug(me.getMessage(), me); | |
| 544 | } | |
| 545 | else | |
| 519 | boolean throttled = destination.getThrottleManager().throttleOutgoingMessage(message); | |
| 520 | if (!throttled) | |
| 546 | 521 | { |
| 547 | 522 | SubscriptionManager subscriptionManager = destination.getSubscriptionManager(); |
| 548 | 523 | |
| ... | ...@@ -560,7 +535,7 @@ | |
| 560 | 535 | continue; |
| 561 | 536 | } |
| 562 | 537 | |
| 563 | pushMessageToClient(client, destination, message, evalSelector, throttleResult); | |
| 538 | pushMessageToClient(client, destination, message, evalSelector); | |
| 564 | 539 | } |
| 565 | 540 | } |
| 566 | 541 | |
| ... | ...@@ -576,7 +551,7 @@ | |
| 576 | 551 | } |
| 577 | 552 | |
| 578 | 553 | void pushMessageToClient(MessageClient client, MessageDestination destination, Message message, |
| 579 | boolean evalSelector, ThrottleResult throttleResult) | |
| 554 | boolean evalSelector) | |
| 580 | 555 | { |
| 581 | 556 | // Normally we'll process the message selector criteria as part of fetching the |
| 582 | 557 | // clients which should receive this message. However, because the API exposed the evalSelecor flag |
| ... | ...@@ -589,68 +564,47 @@ | |
| 589 | 564 | return; |
| 590 | 565 | } |
| 591 | 566 | |
| 592 | if (throttleResult.getResult() == ThrottleResult.Result.OK) | |
| 567 | // Push the message to the client. Note that client level outbound throttling | |
| 568 | // might still happen at the FlexClientOutboundQueueProcessor level. | |
| 569 | try | |
| 593 | 570 | { |
| 594 | // Throttle at client level. | |
| 595 | throttleResult = destination.getThrottleManager().throttleOutgoingMessage(message, client.getClientId()); | |
| 596 | ||
| 597 | ThrottleResult.Result res = throttleResult.getResult(); | |
| 598 | MessageException me = throttleResult.getException(); | |
| 599 | if (res == ThrottleResult.Result.ERROR) | |
| 600 | { | |
| 601 | // Log these, but they are not propagated to subscriber, as that would defeat | |
| 602 | // the purpose of throttling outbound messages to that subscriber client | |
| 603 | Log.getLogger(LOG_CATEGORY).error(me.getMessage(), me); | |
| 604 | } | |
| 605 | else if (res == ThrottleResult.Result.IGNORE) | |
| 606 | { | |
| 607 | if (Log.isDebug()) | |
| 608 | Log.getLogger(LOG_CATEGORY).debug(me.getMessage(), me); | |
| 609 | } | |
| 610 | else | |
| 611 | { | |
| 612 | // Push the message. | |
| 613 | try | |
| 614 | { | |
| 615 | // Only update client last use if the message is not a pushed server command. | |
| 616 | if (!(message instanceof CommandMessage)) | |
| 617 | client.updateLastUse(); | |
| 618 | ||
| 619 | // Remove any data in the base message that should not be included in the multicast copies. | |
| 620 | Map messageHeaders = message.getHeaders(); | |
| 621 | messageHeaders.remove(Message.FLEX_CLIENT_ID_HEADER); | |
| 622 | messageHeaders.remove(Message.ENDPOINT_HEADER); | |
| 623 | ||
| 624 | // FIXME: [Pete] Investigate whether this is a performance issue. | |
| 625 | // We also need to ensure message ids do not expose FlexClient ids | |
| 626 | //message.setMessageId(UUIDUtils.createUUID()); | |
| 627 | ||
| 628 | // We need a unique instance of the message for each client; both to prevent | |
| 629 | // outbound queue processing for various clients from interfering with each other | |
| 630 | // as well as needing to target the copy of the message to a specific MessageAgent | |
| 631 | // instance on the client. | |
| 632 | Message messageForClient = (Message)message.clone(); | |
| 633 | ||
| 634 | // the MPIUTil call will be a no-op if MPI is not enabled. Otherwise it will add | |
| 635 | // a server pre-push processing timestamp to the MPI object | |
| 636 | MessagePerformanceUtils.markServerPrePushTime(message); | |
| 637 | MessagePerformanceUtils.markServerPostAdapterTime(message); | |
| 638 | MessagePerformanceUtils.markServerPostAdapterExternalTime(message); | |
| 571 | // Only update client last use if the message is not a pushed server command. | |
| 572 | if (!(message instanceof CommandMessage)) | |
| 573 | client.updateLastUse(); | |
| 574 | ||
| 575 | // Remove any data in the base message that should not be included in the multicast copies. | |
| 576 | Map messageHeaders = message.getHeaders(); | |
| 577 | messageHeaders.remove(Message.FLEX_CLIENT_ID_HEADER); | |
| 578 | messageHeaders.remove(Message.ENDPOINT_HEADER); | |
| 579 | ||
| 580 | // FIXME: [Pete] Investigate whether this is a performance issue. | |
| 581 | // We also need to ensure message ids do not expose FlexClient ids | |
| 582 | //message.setMessageId(UUIDUtils.createUUID()); | |
| 583 | ||
| 584 | // We need a unique instance of the message for each client; both to prevent | |
| 585 | // outbound queue processing for various clients from interfering with each other | |
| 586 | // as well as needing to target the copy of the message to a specific MessageAgent | |
| 587 | // instance on the client. | |
| 588 | Message messageForClient = (Message)message.clone(); | |
| 589 | ||
| 590 | // the MPIUTil call will be a no-op if MPI is not enabled. Otherwise it will add | |
| 591 | // a server pre-push processing timestamp to the MPI object | |
| 592 | MessagePerformanceUtils.markServerPrePushTime(message); | |
| 593 | MessagePerformanceUtils.markServerPostAdapterTime(message); | |
| 594 | MessagePerformanceUtils.markServerPostAdapterExternalTime(message); | |
| 639 | 595 | |
| 640 | // Target the message to a specific MessageAgent on the client. | |
| 641 | messageForClient.setClientId(client.getClientId()); | |
| 596 | // Target the message to a specific MessageAgent on the client. | |
| 597 | messageForClient.setClientId(client.getClientId()); | |
| 642 | 598 | |
| 643 | if (Log.isDebug()) | |
| 644 | Log.getLogger(MessageService.LOG_CATEGORY).debug("Routing message to FlexClient id:" + client.getFlexClient().getId() + "', MessageClient id: " + client.getClientId()); | |
| 599 | if (Log.isDebug()) | |
| 600 | Log.getLogger(MessageService.LOG_CATEGORY).debug("Routing message to FlexClient id:" + client.getFlexClient().getId() + "', MessageClient id: " + client.getClientId()); | |
| 645 | 601 | |
| 646 | getMessageBroker().routeMessageToMessageClient(messageForClient, client); | |
| 647 | } | |
| 648 | catch (MessageException ignore) | |
| 649 | { | |
| 650 | // Client is subscribed but has disconnected or the network failed. | |
| 651 | // There's nothing we can do to correct this so just continue server processing. | |
| 652 | } | |
| 653 | } | |
| 602 | getMessageBroker().routeMessageToMessageClient(messageForClient, client); | |
| 603 | } | |
| 604 | catch (MessageException ignore) | |
| 605 | { | |
| 606 | // Client is subscribed but has disconnected or the network failed. | |
| 607 | // There's nothing we can do to correct this so just continue server processing. | |
| 654 | 608 | } |
| 655 | 609 | } |
| 656 | 610 | |
| ... | ...@@ -906,7 +860,10 @@ | |
| 906 | 860 | * system so keeping them in sync is potentially problematic. Also, it |
| 907 | 861 | * seems like the adapter should have the option to manage endpoints themselves? |
| 908 | 862 | */ |
| 909 | subscriptionManager.addSubscriber(clientId, selectorExpr, subtopicString, endpointId); | |
| 863 | ||
| 864 | // Extract the maxFrequency that might have been specified by the client. | |
| 865 | int maxFrequency = processMaxFrequencyHeader(command); | |
| 866 | subscriptionManager.addSubscriber(clientId, selectorExpr, subtopicString, endpointId, maxFrequency); | |
| 910 | 867 | } |
| 911 | 868 | finally |
| 912 | 869 | { |
| ... | ...@@ -971,18 +928,32 @@ | |
| 971 | 928 | |
| 972 | 929 | if (adds != null) |
| 973 | 930 | { |
| 931 | // Extract the maxFrequency that might have been specified | |
| 932 | // by the client for every subscription (selector/subtopic). | |
| 933 | int maxFrequency = processMaxFrequencyHeader(command); | |
| 974 | 934 | for (int i = 0; i < adds.length; i++) |
| 975 | 935 | { |
| 936 | // Use the maxFrequency by default. | |
| 937 | int maxFrequencyPerSubscription = maxFrequency; | |
| 976 | 938 | String ss = (String) adds[i]; |
| 977 | 939 | int ix = ss.indexOf(CommandMessage.SUBTOPIC_SEPARATOR); |
| 978 | 940 | if (ix != -1) |
| 979 | 941 | { |
| 980 | 942 | String subtopic = (ix == 0 ? null : ss.substring(0, ix)); |
| 981 | String selector = ss.substring(ix+CommandMessage.SUBTOPIC_SEPARATOR.length()); | |
| 982 | if (selector.length() == 0) | |
| 983 | selector = null; | |
| 984 | ||
| 985 | subscriptionManager.addSubscriber(clientId, selector, subtopic, endpointId); | |
| 943 | String selector = null; | |
| 944 | String selectorAndMaxFrequency = ss.substring(ix+CommandMessage.SUBTOPIC_SEPARATOR.length()); | |
| 945 | if (selectorAndMaxFrequency.length() != 0) | |
| 946 | { | |
| 947 | int ix2 = selectorAndMaxFrequency.indexOf(CommandMessage.SUBTOPIC_SEPARATOR); | |
| 948 | if (ix2 != -1) | |
| 949 | { | |
| 950 | selector = (ix2 == 0? null : selectorAndMaxFrequency.substring(0, ix2)); | |
| 951 | String maxFrequencyString = selectorAndMaxFrequency.substring(ix2 + CommandMessage.SUBTOPIC_SEPARATOR.length()); | |
| 952 | if (maxFrequencyString.length() != 0) | |
| 953 | maxFrequencyPerSubscription = Integer.parseInt(maxFrequencyString); | |
| 954 | } | |
| 955 | } | |
| 956 | subscriptionManager.addSubscriber(clientId, selector, subtopic, endpointId, maxFrequencyPerSubscription); | |
| 986 | 957 | } |
| 987 | 958 | // invalid message |
| 988 | 959 | } |
| ... | ...@@ -1073,7 +1044,6 @@ | |
| 1073 | 1044 | return replyMessage; |
| 1074 | 1045 | } |
| 1075 | 1046 | |
| 1076 | ||
| 1077 | 1047 | /** |
| 1078 | 1048 | * Returns the log category of the <code>MessageService</code>. |
| 1079 | 1049 | * |
| ... | ...@@ -1120,6 +1090,14 @@ | |
| 1120 | 1090 | } |
| 1121 | 1091 | } |
| 1122 | 1092 | |
| 1093 | private int processMaxFrequencyHeader(CommandMessage command) | |
| 1094 | { | |
| 1095 | Object maxFrequencyHeader = command.getHeader(CommandMessage.MAX_FREQUENCY_HEADER); | |
| 1096 | if (maxFrequencyHeader != null) | |
| 1097 | return ((Integer)maxFrequencyHeader).intValue(); | |
| 1098 | return 0; | |
| 1099 | } | |
| 1100 | ||
| 1123 | 1101 | private void testProducerSubtopic(MessageDestination dest, String subtopicString) |
| 1124 | 1102 | { |
| 1125 | 1103 | if ((subtopicString != null) && (subtopicString.length() > 0)) |
| ... | ...@@ -24,6 +24,7 @@ | |
| 24 | 24 | import flex.management.runtime.messaging.log.LogManager; |
| 25 | 25 | import flex.messaging.client.FlexClient; |
| 26 | 26 | import flex.messaging.client.FlexClientManager; |
| 27 | import flex.messaging.client.FlexClientOutboundQueueProcessor; | |
| 27 | 28 | import flex.messaging.cluster.ClusterManager; |
| 28 | 29 | import flex.messaging.config.ChannelSettings; |
| 29 | 30 | import flex.messaging.config.ConfigMap; |
| ... | ...@@ -152,6 +153,9 @@ | |
| 152 | 153 | /** @exclude */ |
| 153 | 154 | private FlexClientManager flexClientManager; |
| 154 | 155 | |
| 156 | // The default FlexClientOutbuondQueueProcessor class. | |
| 157 | private Class flexClientOutboundQueueProcessorClass = FlexClientOutboundQueueProcessor.class; | |
| 158 | ||
| 155 | 159 | /** @exclude */ |
| 156 | 160 | private LoginManager loginManager; |
| 157 | 161 | |
| ... | ...@@ -1129,6 +1133,18 @@ | |
| 1129 | 1133 | flexClientManager = value; |
| 1130 | 1134 | } |
| 1131 | 1135 | |
| 1136 | /** @exclude */ | |
| 1137 | public Class getFlexClientOutboundQueueProcessorClass() | |
| 1138 | { | |
| 1139 | return flexClientOutboundQueueProcessorClass; | |
| 1140 | } | |
| 1141 | ||
| 1142 | /** @exclude */ | |
| 1143 | public void setFlexClientOutboundQueueProcessorClass(Class flexClientOutboundQueueProcessorClass) | |
| 1144 | { | |
| 1145 | this.flexClientOutboundQueueProcessorClass = flexClientOutboundQueueProcessorClass; | |
| 1146 | } | |
| 1147 | ||
| 1132 | 1148 | /** @exclude **/ |
| 1133 | 1149 | public RedeployManager getRedeployManager() |
| 1134 | 1150 | { |
| ... | ...@@ -187,6 +187,12 @@ | |
| 187 | 187 | public static final String CREDENTIALS_CHARSET_HEADER = "DSCredentialsCharset"; |
| 188 | 188 | |
| 189 | 189 | /** |
| 190 | * Header to indicate the maximum number of messages a Consumer wants to | |
| 191 | * receive per second. | |
| 192 | */ | |
| 193 | public static final String MAX_FREQUENCY_HEADER = "DSMaxFrequency"; | |
| 194 | ||
| 195 | /** | |
| 190 | 196 | * @exclude |
| 191 | 197 | * The position of the operation flag within all flags. |
| 192 | 198 | * Constant used during serialization. |
| ... | ...@@ -361,7 +361,27 @@ | |
| 361 | 361 | return null; |
| 362 | 362 | } |
| 363 | 363 | } |
| 364 | ||
| 364 | ||
| 365 | /** | |
| 366 | * @exclude | |
| 367 | * Returns the queue processor registered with the FlexClient with the supplied | |
| 368 | * endpoint id, or null if no queue processor was registered with the FlexClient | |
| 369 | * for that endpoint. | |
| 370 | * | |
| 371 | * @param endpointId The endpoint id. | |
| 372 | * @return The queue processor registered with the FlexClient. | |
| 373 | */ | |
| 374 | public FlexClientOutboundQueueProcessor getOutboundQueueProcessor(String endpointId) | |
| 375 | { | |
| 376 | synchronized (lock) | |
| 377 | { | |
| 378 | EndpointQueue queue = (EndpointQueue)outboundQueues.get(endpointId); | |
| 379 | if (queue != null) | |
| 380 | return queue.processor; | |
| 381 | return null; | |
| 382 | } | |
| 383 | } | |
| 384 | ||
| 365 | 385 | /** |
| 366 | 386 | * Override {@link flex.messaging.util.TimeoutAbstractObject#getLastUse()} to make timeout |
| 367 | 387 | * dependent upon FlexClient inactivity but also upon the presence of an active push-enabled session, |
| ... | ...@@ -589,7 +609,32 @@ | |
| 589 | 609 | } |
| 590 | 610 | return count; |
| 591 | 611 | } |
| 592 | ||
| 612 | ||
| 613 | /** | |
| 614 | * Returns the message client registered with the FlexClient with the supplied | |
| 615 | * client id, or null if no message client was registered with the FlexClient | |
| 616 | * with that client id. | |
| 617 | * | |
| 618 | * @param clientId The client id. | |
| 619 | * @return The message client registered with the FlexClient. | |
| 620 | */ | |
| 621 | public MessageClient getMessageClient(String clientId) | |
| 622 | { | |
| 623 | synchronized (lock) | |
| 624 | { | |
| 625 | if (messageClients != null && !messageClients.isEmpty()) | |
| 626 | { | |
| 627 | for (Iterator iter = messageClients.iterator(); iter.hasNext();) | |
| 628 | { | |
| 629 | MessageClient messageClient = (MessageClient)iter.next(); | |
| 630 | if (messageClient.getClientId().equals(clientId)) | |
| 631 | return messageClient; | |
| 632 | } | |
| 633 | } | |
| 634 | } | |
| 635 | return null; | |
| 636 | } | |
| 637 | ||
| 593 | 638 | /** |
| 594 | 639 | * Returns a snapshot of the MessageClients (subscriptions) associated with the FlexClient |
| 595 | 640 | * when this method is invoked. |
| ... | ...@@ -1119,7 +1164,7 @@ | |
| 1119 | 1164 | } |
| 1120 | 1165 | } |
| 1121 | 1166 | } |
| 1122 | } | |
| 1167 | } | |
| 1123 | 1168 | |
| 1124 | 1169 | /** |
| 1125 | 1170 | * Removes the attribute bound to the specified name for the FlexClient. |
| ... | ...@@ -1309,7 +1354,13 @@ | |
| 1309 | 1354 | { |
| 1310 | 1355 | // Decrement the ref count of MessageClients using this queue. |
| 1311 | 1356 | queue.messageClientRefCount--; |
| 1312 | ||
| 1357 | ||
| 1358 | // Unregister the message client from the outbound throttle | |
| 1359 | // manager (if one exists). | |
| 1360 | OutboundQueueThrottleManager tm = queue.processor.getOutboundQueueThrottleManager(); | |
| 1361 | if (tm != null) | |
| 1362 | tm.unregisterAllSubscriptions(messageClient.getDestinationId(), (String)messageClient.getClientId()); | |
| 1363 | ||
| 1313 | 1364 | // If we're not attempting to notify the remote client that this MessageClient has |
| 1314 | 1365 | // been invalidated, remove any associated messages from the queue. |
| 1315 | 1366 | if (!messageClient.isAttemptingInvalidationClientNotification()) |
| ... | ...@@ -1514,7 +1565,7 @@ | |
| 1514 | 1565 | } |
| 1515 | 1566 | return newQueue; |
| 1516 | 1567 | } |
| 1517 | ||
| 1568 | ||
| 1518 | 1569 | /** |
| 1519 | 1570 | * Utility method to flush the outbound queue and log any problems. |
| 1520 | 1571 | * Any exceptions are logged and then rethrown. |
| ... | ...@@ -1655,7 +1706,7 @@ | |
| 1655 | 1706 | } |
| 1656 | 1707 | } |
| 1657 | 1708 | } |
| 1658 | ||
| 1709 | ||
| 1659 | 1710 | /** |
| 1660 | 1711 | * Utility method used to shutdown endpoint queues accessed via polling channels |
| 1661 | 1712 | * that have no more active subscriptions and no more pending outbound messages. |
| ... | ...@@ -84,18 +84,34 @@ | |
| 84 | 84 | } |
| 85 | 85 | |
| 86 | 86 | /** |
| 87 | * Returns whether client throttling is enabled. | |
| 87 | * Returns true if inbound or outbound client throttling is enabled. | |
| 88 | 88 | * |
| 89 | * @return <code>true</code> if the incoming client frequency or outgoing | |
| 90 | * client frequency is greater than zero; otherwise <code>false</code>. | |
| 89 | * @return True if the incoming client frequency or outgoing | |
| 90 | * client frequency is greater than zero; otherwise false. | |
| 91 | 91 | */ |
| 92 | 92 | public boolean isClientThrottleEnabled() |
| 93 | 93 | { |
| 94 | if (getIncomingClientFrequency() > 0 || getOutgoingClientFrequency() > 0) | |
| 95 | { | |
| 96 | return true; | |
| 97 | } | |
| 98 | return false; | |
| 94 | return isInboundClientThrottleEnabled() || isOutboundClientThrottleEnabled(); | |
| 95 | } | |
| 96 | ||
| 97 | /** | |
| 98 | * Returns true if inbound client throttling is enabled. | |
| 99 | * | |
| 100 | * @return True if the inbound client throttling is enabled. | |
| 101 | */ | |
| 102 | public boolean isInboundClientThrottleEnabled() | |
| 103 | { | |
| 104 | return getIncomingClientFrequency() > 0; | |
| 105 | } | |
| 106 | ||
| 107 | /** | |
| 108 | * Returns true if outbound client throttling is enabled. | |
| 109 | * | |
| 110 | * @return True if the outbound client throttling is enabled. | |
| 111 | */ | |
| 112 | public boolean isOutboundClientThrottleEnabled() | |
| 113 | { | |
| 114 | return getOutgoingClientFrequency() > 0; | |
| 99 | 115 | } |
| 100 | 116 | |
| 101 | 117 | /** |
| ... | ...@@ -106,11 +122,7 @@ | |
| 106 | 122 | */ |
| 107 | 123 | public boolean isDestinationThrottleEnabled() |
| 108 | 124 | { |
| 109 | if (getIncomingDestinationFrequency() > 0 || getOutgoingDestinationFrequency() > 0) | |
| 110 | { | |
| 111 | return true; | |
| 112 | } | |
| 113 | return false; | |
| 125 | return getIncomingDestinationFrequency() > 0 || getOutgoingDestinationFrequency() > 0; | |
| 114 | 126 | } |
| 115 | 127 | |
| 116 | 128 | /** |
| ... | ...@@ -153,7 +165,9 @@ | |
| 153 | 165 | if (outPolicy != Policy.NONE && outPolicy != Policy.IGNORE) |
| 154 | 166 | { |
| 155 | 167 | ConfigurationException ex = new ConfigurationException(); |
| 156 | ex.setMessage("Invalid outbound throttle policy '" + outPolicy + "'. Valid values are 'NONE' or 'IGNORE'"); | |
| 168 | ex.setMessage("Invalid outbound throttle policy '" + outPolicy | |
| 169 | + "' for destination '" + destinationName | |
| 170 | + "'. Valid values are 'NONE' or 'IGNORE'."); | |
| 157 | 171 | throw ex; |
| 158 | 172 | } |
| 159 | 173 | this.outPolicy = outPolicy; |
| ... | ...@@ -199,12 +213,15 @@ | |
| 199 | 213 | */ |
| 200 | 214 | public void setIncomingClientFrequency(int n) |
| 201 | 215 | { |
| 216 | String name = "incoming client frequency"; | |
| 217 | blockNegative(n, name); | |
| 218 | ||
| 202 | 219 | if (inDestinationMessagesPerSec > 0 && n > inDestinationMessagesPerSec) |
| 203 | 220 | { |
| 204 | 221 | ConfigurationException ex = new ConfigurationException(); |
| 205 | ex.setMessage("The incoming client frequency '" + n | |
| 206 | + "' cannot be more than the incoming destination frequency '" | |
| 207 | + inDestinationMessagesPerSec + "'"); | |
| 222 | ex.setMessage("Invalid " + name + " for destination '" + destinationName + "'. " | |
| 223 | + name + " '" + n + "' cannot be more than the incoming destination frequency '" | |
| 224 | + inDestinationMessagesPerSec + "'."); | |
| 208 | 225 | throw ex; |
| 209 | 226 | } |
| 210 | 227 | this.inClientMessagesPerSec = n; |
| ... | ...@@ -229,12 +246,15 @@ | |
| 229 | 246 | */ |
| 230 | 247 | public void setIncomingDestinationFrequency(int n) |
| 231 | 248 | { |
| 232 | if (n < inClientMessagesPerSec) | |
| 249 | String name = "The incoming destination frequency"; | |
| 250 | blockNegative(n, name); | |
| 251 | ||
| 252 | if (inClientMessagesPerSec > 0 && n < inClientMessagesPerSec) | |
| 233 | 253 | { |
| 234 | 254 | ConfigurationException ex = new ConfigurationException(); |