diff --git a/src/java/org/apache/cassandra/net/Header.java b/src/java/org/apache/cassandra/net/Header.java index f07a7f6..a8a0666 100644 --- a/src/java/org/apache/cassandra/net/Header.java +++ b/src/java/org/apache/cassandra/net/Header.java @@ -50,6 +50,7 @@ public class Header // and RowMutationVerbHandler.forwardToLocalNodes) private final InetAddress from_; private final StorageService.Verb verb_; + private final long creationTime_; protected final Map details_; Header(InetAddress from, StorageService.Verb verb) @@ -59,11 +60,17 @@ public class Header Header(InetAddress from, StorageService.Verb verb, Map details) { + this(from, verb, details, System.currentTimeMillis()); + } + + Header(InetAddress from, StorageService.Verb verb, Map details, long creationTime) + { assert from != null; assert verb != null; from_ = from; verb_ = verb; + creationTime_ = creationTime; details_ = ImmutableMap.copyOf(details); } @@ -82,6 +89,11 @@ public class Header return details_.get(key); } + long getCreationTime() + { + return creationTime_; + } + Header withDetailsAdded(String key, byte[] value) { Map detailsCopy = Maps.newHashMap(details_); @@ -104,6 +116,7 @@ public class Header size += CompactEndpointSerializationHelper.serializedSize(getFrom()); size += 4; size += 4; + size += Long.SIZE; for (String key : details_.keySet()) { size += 2 + FBUtilities.encodedUTF8Length(key); @@ -121,6 +134,7 @@ class HeaderSerializer implements IVersionedSerializer
CompactEndpointSerializationHelper.serialize(t.getFrom(), dos); dos.writeInt(t.getVerb().ordinal()); dos.writeInt(t.details_.size()); + dos.writeLong(t.getCreationTime()); for (String key : t.details_.keySet()) { dos.writeUTF(key); @@ -135,6 +149,7 @@ class HeaderSerializer implements IVersionedSerializer
InetAddress from = CompactEndpointSerializationHelper.deserialize(dis); int verbOrdinal = dis.readInt(); int size = dis.readInt(); + long creationTime = dis.readLong(); Map details = new Hashtable(size); for ( int i = 0; i < size; ++i ) { @@ -144,7 +159,7 @@ class HeaderSerializer implements IVersionedSerializer
dis.readFully(bytes); details.put(key, bytes); } - return new Header(from, StorageService.VERBS[verbOrdinal], details); + return new Header(from, StorageService.VERBS[verbOrdinal], details, creationTime); } public long serializedSize(Header header, int version) diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index eb743d1..c42ee1f 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -75,6 +75,11 @@ public class Message return header_.getFrom(); } + public long getCreationTime() + { + return header_.getCreationTime(); + } + public Stage getMessageType() { return StorageService.verbStages.get(getVerb()); diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 2d393d6..e704d8c 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -55,6 +55,9 @@ import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.net.io.SerializerType; import org.apache.cassandra.net.sink.SinkManager; import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.service.IWriteResponseHandler; +import org.apache.cassandra.service.PBSTracker; +import org.apache.cassandra.service.ReadCallback; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.FileStreamTask; @@ -390,6 +393,12 @@ public final class MessagingService implements MessagingServiceMBean public String sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout) { String id = addCallback(cb, message, to, timeout); + + if(cb instanceof IWriteResponseHandler || cb instanceof ReadCallback) + { + PBSTracker.startOperation(id); + } + sendOneWay(message, id, to); return id; } diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 818f703..de3ce1d 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -21,6 +21,10 @@ package org.apache.cassandra.net; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.service.IWriteResponseHandler; +import org.apache.cassandra.service.PBSTracker; +import org.apache.cassandra.service.ReadCallback; + public class ResponseVerbHandler implements IVerbHandler { private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class ); @@ -42,6 +46,16 @@ public class ResponseVerbHandler implements IVerbHandler { if (logger_.isDebugEnabled()) logger_.debug("Processing response on a callback from " + id + "@" + message.getFrom()); + + if(cb instanceof IWriteResponseHandler) + { + PBSTracker.logWriteResponse(id, message); + } + else if(cb instanceof ReadCallback) + { + PBSTracker.logReadResponse(id, message); + } + ((IAsyncCallback) cb).response(message); } else diff --git a/src/java/org/apache/cassandra/service/PBSTracker.java b/src/java/org/apache/cassandra/service/PBSTracker.java new file mode 100644 index 0000000..5acc62d --- /dev/null +++ b/src/java/org/apache/cassandra/service/PBSTracker.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import org.apache.cassandra.net.Message; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Vector; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +// Maintains latencies of WARS operations +// Using LatencyTracker would be nice but would lose order statistic properties +// Eventually, may be used to optimize N,R,W configurations online + +public class PBSTracker implements PBSTrackerMBean +{ + public static final String MBEAN_NAME = "org.apache.cassandra.service:type=PBSTracker"; + + static + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(new PBSTracker(), new ObjectName(MBEAN_NAME)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + // todo: make config option instead of static compile + private static final long MAX_ENTRIES = 10000; + + private static final Map messageIdToStartTimes = new ConcurrentHashMap(); + + // used for LRU replacement + private static final Queue messageIds = new ConcurrentLinkedQueue(); + + // We keep each message ID's response times separate for use in more complex latency models, where we + // explicitly consider the response time of the ith response, instead of assuming the latencies are + // independently, identically distributed (IID). + + private static final Map> messageIdToWLatencies = + new ConcurrentHashMap>(); + private static final Map> messageIdToALatencies = + new ConcurrentHashMap>(); + private static final Map> messageIdToRLatencies = + new ConcurrentHashMap>(); + private static final Map> messageIdToSLatencies = + new ConcurrentHashMap>(); + + public static void startOperation(String id) + { + assert(!messageIdToStartTimes.containsKey(id)); + + messageIdToStartTimes.put(id, System.currentTimeMillis()); + + messageIds.add(id); + + // LRU replacement of latencies + // the maximum number of entries is sloppy, but that's acceptable for our purposes + if(messageIds.size() > MAX_ENTRIES) + { + String toEvict = messageIds.remove(); + messageIdToStartTimes.remove(toEvict); + messageIdToWLatencies.remove(toEvict); + messageIdToALatencies.remove(toEvict); + messageIdToRLatencies.remove(toEvict); + messageIdToSLatencies.remove(toEvict); + } + + messageIdToWLatencies.put(id, new ConcurrentLinkedQueue()); + messageIdToALatencies.put(id, new ConcurrentLinkedQueue()); + messageIdToRLatencies.put(id, new ConcurrentLinkedQueue()); + messageIdToSLatencies.put(id, new ConcurrentLinkedQueue()); + } + + public static void logWriteResponse(String id, Message response) + { + // may be evicted; unlikely, but need to check + if(!messageIdToStartTimes.containsKey(id)) + { + return; + } + + long time = System.currentTimeMillis(); + messageIdToWLatencies.get(id).add(response.getCreationTime()-messageIdToStartTimes.get(id)); + messageIdToALatencies.get(id).add(time-response.getCreationTime()); + } + + public static void logReadResponse(String id, Message response) + { + // may be evicted; unlikely, but need to check + if(!messageIdToStartTimes.containsKey(id)) + { + return; + } + + long time = System.currentTimeMillis(); + messageIdToRLatencies.get(id).add(response.getCreationTime()-messageIdToStartTimes.get(id)); + messageIdToSLatencies.get(id).add(time-response.getCreationTime()); + } + + // We support two modes of latency retrieval: + // Unordered latency retrieval means that all latencies are returned, whether they + // are the first or the last + // Ordered latency retrieval means that the latencies of the first to nth responses + // are returned, separated by key. + + public List getUnorderedWLatencies() + { + return getUnorderedLatencies(messageIdToWLatencies.values()); + } + + public Map> getOrderedWLatencies() + { + return getOrderedLatencies(messageIdToWLatencies.values()); + } + + public List getUnorderedALatencies() + { + return getUnorderedLatencies(messageIdToALatencies.values()); + } + + public Map> getOrderedALatencies() + { + return getOrderedLatencies(messageIdToALatencies.values()); + } + + public List getUnorderedRLatencies() + { + return getUnorderedLatencies(messageIdToRLatencies.values()); + } + + public Map> getOrderedRLatencies() + { + return getOrderedLatencies(messageIdToRLatencies.values()); + } + + public List getUnorderedSLatencies() + { + return getUnorderedLatencies(messageIdToSLatencies.values()); + } + + public Map> getOrderedSLatencies() + { + return getOrderedLatencies(messageIdToSLatencies.values()); + } + + static List getUnorderedLatencies(Collection> latencyLists) + { + Vector ret = new Vector(); + + for(List subReturn : getOrderedLatencies(latencyLists).values()) + { + ret.addAll(subReturn); + } + + return ret; + } + + static Map> getOrderedLatencies(Collection> latencyLists) + { + Map> ret = new HashMap>(); + + // N may vary + int maxResponses = 0; + + for(Collection latencies : latencyLists) + { + List sortedLatencies = new ArrayList(latencies); + Collections.sort(sortedLatencies); + + if(sortedLatencies.size() > maxResponses) + { + for(int i = maxResponses+1; i <= sortedLatencies.size(); ++i) + { + ret.put(i, new Vector()); + } + + maxResponses = sortedLatencies.size(); + } + + // indexing by 0 is awkward since we're talking about the ith response + for(int i = 1; i <= sortedLatencies.size(); ++i) + { + ret.get(i).add(sortedLatencies.get(i-1)); + } + } + + return ret; + } +} diff --git a/src/java/org/apache/cassandra/service/PBSTrackerMBean.java b/src/java/org/apache/cassandra/service/PBSTrackerMBean.java new file mode 100644 index 0000000..78db95b --- /dev/null +++ b/src/java/org/apache/cassandra/service/PBSTrackerMBean.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.util.List; +import java.util.Map; + +public interface PBSTrackerMBean +{ + public List getUnorderedWLatencies(); + public Map> getOrderedWLatencies(); + + public List getUnorderedALatencies(); + public Map> getOrderedALatencies(); + + public List getUnorderedRLatencies(); + public Map> getOrderedRLatencies(); + + public List getUnorderedSLatencies(); + public Map> getOrderedSLatencies(); +} diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index e9a3d90..4dbc4d9 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -32,6 +32,7 @@ import java.util.Map.Entry; import java.util.concurrent.ExecutionException; import org.apache.cassandra.service.CacheServiceMBean; +import org.apache.cassandra.service.PBSTrackerMBean; import org.apache.commons.cli.*; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; @@ -116,6 +117,8 @@ public class NodeCmd UPGRADESSTABLES, VERSION, DESCRIBERING, + PBSWARS, + PBSWARSUNORDERED } @@ -145,6 +148,8 @@ public class NodeCmd addCmdHelp(header, "gossipinfo", "Shows the gossip information for the cluster"); addCmdHelp(header, "invalidatekeycache", "Invalidate the key cache"); addCmdHelp(header, "invalidaterowcache", "Invalidate the row cache"); + addCmdHelp(header, "pbswars", "Print out (ordered) WARS latency statistics."); + addCmdHelp(header, "pbswarsunordered", "Print out (unordered) WARS latency statistics."); // One arg addCmdHelp(header, "netstats [host]", "Print network information on provided host (connecting node by default)"); @@ -584,6 +589,52 @@ public class NodeCmd outs.println(probe.isThriftServerRunning() ? "running" : "not running"); } + private void printSpecificLatencyOrdered(String which, Map> latencies) + { + List replicaNumbers = new ArrayList(latencies.keySet()); + Collections.sort(replicaNumbers); + + System.out.println(which); + for(int replicaNo : replicaNumbers) + { + System.out.println("replica number "+replicaNo); + for(long latency : latencies.get(replicaNo)) + { + System.out.println(latency); + } + } + } + + public void printWARSOrdered() + { + PBSTrackerMBean pbsTracker = probe.getPBSTrackerMBean(); + + printSpecificLatencyOrdered("w latencies", pbsTracker.getOrderedWLatencies()); + printSpecificLatencyOrdered("a latencies", pbsTracker.getOrderedALatencies()); + printSpecificLatencyOrdered("r latencies", pbsTracker.getOrderedRLatencies()); + printSpecificLatencyOrdered("s latencies", pbsTracker.getOrderedSLatencies()); + } + + private void printSpecificLatencyUnordered(String which, List latencies) + { + System.out.println(which); + + for(long latency : latencies) + { + System.out.println(latency); + } + } + + public void printWARSUnordered() + { + PBSTrackerMBean pbsTracker = probe.getPBSTrackerMBean(); + + printSpecificLatencyUnordered("w latencies", pbsTracker.getUnorderedWLatencies()); + printSpecificLatencyUnordered("a latencies", pbsTracker.getUnorderedALatencies()); + printSpecificLatencyUnordered("r latencies", pbsTracker.getUnorderedRLatencies()); + printSpecificLatencyUnordered("s latencies", pbsTracker.getUnorderedSLatencies()); + } + public static void main(String[] args) throws IOException, InterruptedException, ConfigurationException, ParseException { CommandLineParser parser = new PosixParser(); @@ -772,6 +823,14 @@ public class NodeCmd nodeCmd.printDescribeRing(arguments[0], System.out); break; + case PBSWARS: + nodeCmd.printWARSOrdered(); + break; + + case PBSWARSUNORDERED: + nodeCmd.printWARSUnordered(); + break; + default : throw new RuntimeException("Unreachable code."); } diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index a982b01..ce0fd07 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -53,6 +53,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingServiceMBean; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.CacheServiceMBean; +import org.apache.cassandra.service.PBSTracker; +import org.apache.cassandra.service.PBSTrackerMBean; import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.streaming.StreamingService; import org.apache.cassandra.streaming.StreamingServiceMBean; @@ -82,6 +84,7 @@ public class NodeProbe public MessagingServiceMBean msProxy; private FailureDetectorMBean fdProxy; private CacheServiceMBean cacheService; + private PBSTrackerMBean pbsTracker; /** * Creates a NodeProbe using the specified JMX host, port, username, and password. @@ -150,6 +153,8 @@ public class NodeProbe { ObjectName name = new ObjectName(ssObjName); ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class); + name = new ObjectName(PBSTracker.MBEAN_NAME); + pbsTracker = JMX.newMBeanProxy(mbeanServerConn, name, PBSTrackerMBean.class); name = new ObjectName(MessagingService.MBEAN_NAME); msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class); name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME); @@ -638,6 +643,11 @@ public class NodeProbe { return ssProxy.describeRingJMX(keyspaceName); } + + public PBSTrackerMBean getPBSTrackerMBean() + { + return pbsTracker; + } } class ColumnFamilyStoreMBeanIterator implements Iterator>