Thursday, July 16, 2015

How to handle ditributed counter across cluster when each node contribute to counter - Distributed throttling

Handle throttling in distributed environment is bit tricky task. For this we need to maintain time window and counters per instance and also those counters should be shared across cluster as well. Recently i worked on similar issue and i will share my thoughts about this problem.

Lets say we have 5 nodes. Then each node will serve x number of requests within minutes. So across cluster we can server 5X requests per minutes. And some cases node1 may server 2x while other servers 1x. But still we need to have 5x across cluster. To address this issue we need shared counter across cluster. So each and every node can contribute to that and maintain counters.

To implement something like that we may use following approach.

We can maintain two Hazelcast IAtomicLong data structures or similar distributed counter as follow. This should be handle in cluster level.
And node do not have to do anything about replication.

  • Shared Counter : This will maintain global request count across the cluster
  • Shared Timestamp : This will be used for manage time window across the cluster for particular throttling period

In each and every instance we should maintain following per each counter object.
  • Local global counter which sync up with shared counter in replication task(Local global counter = shared counter + Local counter )
  • Local counter which holds request counts until replication task run.(after replication Local counter = 0)

We may use replication task that will run periodically.
During the replication task following tasks will be happen.
Update the shared counter with node local counter and then update local global counter with the shared counter.
If global counter set to zero, it will reset the global counter.


In addition we need to set the current time into the hazelcast Atomic Long .When other servers get the first request it sets it first access time as the value in hazelcast according to the caller context ID.So all the servers will set into the one first access time. We check the throttle time will become to the time come from hazelcast and unit Time according to tier.
To check time window is elapsed so if this happen We set previous callercontext globalcount to 0.
As assumption we made was all nodes in cluster are having the same timestamp.




See following diagrams.






If you need to use throttle core for your application/component run in WSO2 runtime you can import throttle core to your project and use following code to check access availability.


Here i have listed code to throttle message using handler. So you can write your own handler and call doThrottle method in message flow. First you need to import org.wso2.carbon.throttle.core to your project.


       private boolean doThrottle(MessageContext messageContext) {
            boolean canAccess = true;
            boolean isResponse = messageContext.isResponse();
            org.apache.axis2.context.MessageContext axis2MC = ((Axis2MessageContext) messageContext).
                    getAxis2MessageContext();
            ConfigurationContext cc = axis2MC.getConfigurationContext();
            synchronized (this) {

                if (!isResponse) {
                    initThrottle(messageContext, cc);
                }
            }         // if the access is success through concurrency throttle and if this is a request message
            // then do access rate based throttling
            if (!isResponse && throttle != null) {
                AuthenticationContext authContext = APISecurityUtils.getAuthenticationContext(messageContext);
                String tier;             if (authContext != null) {
                    AccessInformation info = null;
                    try {

                        String ipBasedKey = (String) ((TreeMap) axis2MC.
                                getProperty("TRANSPORT_HEADERS")).get("X-Forwarded-For");
                        if (ipBasedKey == null) {
                            ipBasedKey = (String) axis2MC.getProperty("REMOTE_ADDR");
                        }
                        tier = authContext.getApplicationTier();
                        ThrottleContext apiThrottleContext =
                                ApplicationThrottleController.
                                        getApplicationThrottleContext(messageContext, cc, tier);
                        //    if (isClusteringEnable) {
                        //      applicationThrottleContext.setConfigurationContext(cc);
                        apiThrottleContext.setThrottleId(id);
                        info = applicationRoleBasedAccessController.canAccess(apiThrottleContext,
                                                                              ipBasedKey, tier);
                        canAccess = info.isAccessAllowed();
                    } catch (ThrottleException e) {
                        handleException("Error while trying evaluate IPBased throttling policy", e);
                    }
                }
            }         if (!canAccess) {
                handleThrottleOut(messageContext);
                return false;
            }

            return canAccess;
        }    


    private void initThrottle(MessageContext synCtx, ConfigurationContext cc) {
            if (policyKey == null) {
                throw new SynapseException("Throttle policy unspecified for the API");
            }         Entry entry = synCtx.getConfiguration().getEntryDefinition(policyKey);
            if (entry == null) {
                handleException("Cannot find throttling policy using key: " + policyKey);
                return;
            }
            Object entryValue = null;
            boolean reCreate = false;         if (entry.isDynamic()) {
                if ((!entry.isCached()) || (entry.isExpired()) || throttle == null) {
                    entryValue = synCtx.getEntry(this.policyKey);
                    if (this.version != entry.getVersion()) {
                        reCreate = true;
                    }
                }
            } else if (this.throttle == null) {
                entryValue = synCtx.getEntry(this.policyKey);
            }         if (reCreate || throttle == null) {
                if (entryValue == null || !(entryValue instanceof OMElement)) {
                    handleException("Unable to load throttling policy using key: " + policyKey);
                    return;
                }
                version = entry.getVersion();             try {
                    // Creates the throttle from the policy
                    throttle = ThrottleFactory.createMediatorThrottle(
                            PolicyEngine.getPolicy((OMElement) entryValue));

                } catch (ThrottleException e) {
                    handleException("Error processing the throttling policy", e);
                }
            }
        }



No comments:

Post a Comment