Left: | ||
Right: |
OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Licensed to the Apache Software Foundation (ASF) under one | 2 * Licensed to the Apache Software Foundation (ASF) under one |
3 * or more contributor license agreements. See the NOTICE file | 3 * or more contributor license agreements. See the NOTICE file |
4 * distributed with this work for additional information | 4 * distributed with this work for additional information |
5 * regarding copyright ownership. The ASF licenses this file | 5 * regarding copyright ownership. The ASF licenses this file |
6 * to you under the Apache License, Version 2.0 (the | 6 * to you under the Apache License, Version 2.0 (the |
7 * "License"); you may not use this file except in compliance | 7 * "License"); you may not use this file except in compliance |
8 * with the License. You may obtain a copy of the License at | 8 * with the License. You may obtain a copy of the License at |
9 * | 9 * |
10 * http://www.apache.org/licenses/LICENSE-2.0 | 10 * http://www.apache.org/licenses/LICENSE-2.0 |
11 * | 11 * |
12 * Unless required by applicable law or agreed to in writing, software | 12 * Unless required by applicable law or agreed to in writing, software |
13 * distributed under the License is distributed on an "AS IS" BASIS, | 13 * distributed under the License is distributed on an "AS IS" BASIS, |
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 * See the License for the specific language governing permissions and | 15 * See the License for the specific language governing permissions and |
16 * limitations under the License. | 16 * limitations under the License. |
17 */ | 17 */ |
18 package org.apache.cassandra.service; | 18 package org.apache.cassandra.service; |
19 | 19 |
20 import java.io.BufferedReader; | |
21 import java.io.IOException; | |
22 import java.io.InputStreamReader; | |
20 import java.net.InetAddress; | 23 import java.net.InetAddress; |
21 import java.util.Collection; | 24 import java.util.Collection; |
22 import java.util.List; | 25 import java.util.List; |
23 import java.util.concurrent.ThreadLocalRandom; | 26 import java.util.concurrent.ThreadLocalRandom; |
24 import java.util.concurrent.TimeUnit; | 27 import java.util.concurrent.TimeUnit; |
28 import java.util.concurrent.atomic.AtomicLong; | |
25 | 29 |
26 import com.google.common.collect.Iterables; | 30 import com.google.common.collect.Iterables; |
27 import org.slf4j.Logger; | 31 import org.slf4j.Logger; |
28 import org.slf4j.LoggerFactory; | 32 import org.slf4j.LoggerFactory; |
29 | 33 |
34 import com.codahale.metrics.Counter; | |
30 import org.apache.cassandra.concurrent.Stage; | 35 import org.apache.cassandra.concurrent.Stage; |
31 import org.apache.cassandra.concurrent.StageManager; | 36 import org.apache.cassandra.concurrent.StageManager; |
32 import org.apache.cassandra.db.ColumnFamilyStore; | 37 import org.apache.cassandra.db.ColumnFamilyStore; |
33 import org.apache.cassandra.db.ConsistencyLevel; | 38 import org.apache.cassandra.db.ConsistencyLevel; |
34 import org.apache.cassandra.db.ReadCommand; | 39 import org.apache.cassandra.db.ReadCommand; |
35 import org.apache.cassandra.db.SinglePartitionReadCommand; | 40 import org.apache.cassandra.db.SinglePartitionReadCommand; |
36 import org.apache.cassandra.db.Keyspace; | 41 import org.apache.cassandra.db.Keyspace; |
37 import org.apache.cassandra.db.partitions.PartitionIterator; | 42 import org.apache.cassandra.db.partitions.PartitionIterator; |
38 import org.apache.cassandra.exceptions.ReadFailureException; | 43 import org.apache.cassandra.exceptions.ReadFailureException; |
39 import org.apache.cassandra.exceptions.ReadTimeoutException; | 44 import org.apache.cassandra.exceptions.ReadTimeoutException; |
(...skipping 11 matching lines...) Expand all Loading... | |
51 * Sends a read request to the replicas needed to satisfy a given ConsistencyLev el. | 56 * Sends a read request to the replicas needed to satisfy a given ConsistencyLev el. |
52 * | 57 * |
53 * Optionally, may perform additional requests to provide redundancy against rep lica failure: | 58 * Optionally, may perform additional requests to provide redundancy against rep lica failure: |
54 * AlwaysSpeculatingReadExecutor will always send a request to one extra replica , while | 59 * AlwaysSpeculatingReadExecutor will always send a request to one extra replica , while |
55 * SpeculatingReadExecutor will wait until it looks like the original request is in danger | 60 * SpeculatingReadExecutor will wait until it looks like the original request is in danger |
56 * of timing out before performing extra reads. | 61 * of timing out before performing extra reads. |
57 */ | 62 */ |
58 public abstract class AbstractReadExecutor | 63 public abstract class AbstractReadExecutor |
59 { | 64 { |
60 private static final Logger logger = LoggerFactory.getLogger(AbstractReadExe cutor.class); | 65 private static final Logger logger = LoggerFactory.getLogger(AbstractReadExe cutor.class); |
61 | |
62 protected final ReadCommand command; | 66 protected final ReadCommand command; |
63 protected final List<InetAddress> targetReplicas; | 67 protected final List<InetAddress> targetReplicas; |
64 protected final ReadCallback handler; | 68 protected final ReadCallback handler; |
65 protected final TraceState traceState; | 69 protected final TraceState traceState; |
66 protected final ColumnFamilyStore cfs; | 70 protected final ColumnFamilyStore cfs; |
71 private final static AtomicLong lastSync = new AtomicLong(System.currentTime Millis()); | |
72 private final static AtomicLong numberOfRequests = new AtomicLong(0); | |
73 private final static int INTERVAL = 5000; | |
74 private final static String PREDICTOR_PATH = "/home/csd/projects/lx3/ml/pred ict.py"; | |
67 | 75 |
68 AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand c ommand, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, lon g queryStartNanoTime) | 76 AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand c ommand, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, lon g queryStartNanoTime) |
69 { | 77 { |
70 this.command = command; | 78 this.command = command; |
71 this.targetReplicas = targetReplicas; | 79 this.targetReplicas = targetReplicas; |
72 this.handler = new ReadCallback(new DigestResolver(keyspace, command, co nsistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplica s, queryStartNanoTime); | 80 this.handler = new ReadCallback(new DigestResolver(keyspace, command, co nsistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplica s, queryStartNanoTime); |
73 this.cfs = cfs; | 81 this.cfs = cfs; |
74 this.traceState = Tracing.instance.get(); | 82 this.traceState = Tracing.instance.get(); |
75 | 83 |
76 // Set the digest version (if we request some digests). This is the smal lest version amongst all our target replicas since new nodes | 84 // Set the digest version (if we request some digests). This is the smal lest version amongst all our target replicas since new nodes |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
112 MessageOut<ReadCommand> message = readCommand.createMessage(); | 120 MessageOut<ReadCommand> message = readCommand.createMessage(); |
113 MessagingService.instance().sendRRWithFailure(message, endpoint, han dler); | 121 MessagingService.instance().sendRRWithFailure(message, endpoint, han dler); |
114 } | 122 } |
115 | 123 |
116 // We delay the local (potentially blocking) read till the end to avoid stalling remote requests. | 124 // We delay the local (potentially blocking) read till the end to avoid stalling remote requests. |
117 if (hasLocalEndpoint) | 125 if (hasLocalEndpoint) |
118 { | 126 { |
119 logger.trace("reading {} locally", readCommand.isDigestQuery() ? "di gest" : "data"); | 127 logger.trace("reading {} locally", readCommand.isDigestQuery() ? "di gest" : "data"); |
120 StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalR eadRunnable(command, handler)); | 128 StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalR eadRunnable(command, handler)); |
121 } | 129 } |
130 numberOfRequests.incrementAndGet(); | |
122 } | 131 } |
123 | 132 |
124 /** | 133 /** |
125 * Perform additional requests if it looks like the original will time out. May block while it waits | 134 * Perform additional requests if it looks like the original will time out. May block while it waits |
126 * to see if the original requests are answered first. | 135 * to see if the original requests are answered first. |
127 */ | 136 */ |
128 public abstract void maybeTryAdditionalReplicas(); | 137 public abstract void maybeTryAdditionalReplicas(); |
129 | 138 |
130 /** | 139 /** |
131 * Get the replicas involved in the [finished] request. | 140 * Get the replicas involved in the [finished] request. |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
194 // Throw UAE early if we don't have enough replicas. | 203 // Throw UAE early if we don't have enough replicas. |
195 consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas); | 204 consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas); |
196 | 205 |
197 if (repairDecision != ReadRepairDecision.NONE) | 206 if (repairDecision != ReadRepairDecision.NONE) |
198 { | 207 { |
199 Tracing.trace("Read-repair {}", repairDecision); | 208 Tracing.trace("Read-repair {}", repairDecision); |
200 ReadRepairMetrics.attempted.mark(); | 209 ReadRepairMetrics.attempted.mark(); |
201 } | 210 } |
202 | 211 |
203 ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata() .id); | 212 ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata() .id); |
204 SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry; | 213 SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry; |
Jiangfeng
2017/12/07 19:28:04
I suggest you save the current percentile paramete
| |
205 | 214 |
215 synchronized (AbstractReadExecutor.class) { | |
216 AtomicLong currentTime = new AtomicLong(System.currentTimeMillis()); | |
217 logger.info("Current time: {}", currentTime); | |
218 logger.info("LastSync start: {}", lastSync.get()); | |
219 | |
220 if (lastSync.get() + INTERVAL <= currentTime.get()) { | |
221 float throughput = numberOfRequests.get() / ((currentTime.get() - lastSync.get()) / 1000); | |
222 logger.info("Current throughput: {} r/s", throughput); | |
223 logger.info(String.format("python %s %f", PREDICTOR_PATH, throug hput)); | |
224 Process p; | |
225 String output = ""; | |
226 try { | |
227 p = Runtime.getRuntime().exec(String.format("python %s %f", PREDICTOR_PATH, throughput)); | |
228 p.waitFor(); | |
229 BufferedReader reader = new BufferedReader(new InputStreamRe ader(p.getInputStream())); | |
230 output = reader.readLine(); | |
231 | |
232 } catch (Exception e) { | |
233 e.printStackTrace(); | |
234 } | |
235 logger.info(String.format("New method is %f", Float.parseFloat(o utput))); | |
236 retry = SpeculativeRetryParam.percentile(Float.parseFloat(output )); | |
Jiangfeng
2017/12/07 19:28:04
If retry is set here, then, the readexecutor of th
| |
237 lastSync.set(System.currentTimeMillis()); | |
238 numberOfRequests.set(0); | |
239 } | |
240 } | |
241 | |
242 | |
206 // Speculative retry is disabled *OR* | 243 // Speculative retry is disabled *OR* |
207 // 11980: Disable speculative retry if using EACH_QUORUM in order to pre vent miscounting DC responses | 244 // 11980: Disable speculative retry if using EACH_QUORUM in order to pre vent miscounting DC responses |
208 if (retry.equals(SpeculativeRetryParam.NONE) | 245 if (retry.equals(SpeculativeRetryParam.NONE) |
209 | consistencyLevel == ConsistencyLevel.EACH_QUORUM) | 246 | consistencyLevel == ConsistencyLevel.EACH_QUORUM) |
210 return new NeverSpeculatingReadExecutor(keyspace, cfs, command, cons istencyLevel, targetReplicas, queryStartNanoTime, false); | 247 return new NeverSpeculatingReadExecutor(keyspace, cfs, command, cons istencyLevel, targetReplicas, queryStartNanoTime, false); |
211 | 248 |
212 // There are simply no extra replicas to speculate. | 249 // There are simply no extra replicas to speculate. |
213 // Handle this separately so it can log failed attempts to speculate due to lack of replicas | 250 // Handle this separately so it can log failed attempts to speculate due to lack of replicas |
214 if (consistencyLevel.blockFor(keyspace) == allReplicas.size()) | 251 if (consistencyLevel.blockFor(keyspace) == allReplicas.size()) |
215 return new NeverSpeculatingReadExecutor(keyspace, cfs, command, cons istencyLevel, targetReplicas, queryStartNanoTime, true); | 252 return new NeverSpeculatingReadExecutor(keyspace, cfs, command, cons istencyLevel, targetReplicas, queryStartNanoTime, true); |
(...skipping 187 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
403 cfs.metric.speculativeRetries.inc(); | 440 cfs.metric.speculativeRetries.inc(); |
404 } | 441 } |
405 | 442 |
406 @Override | 443 @Override |
407 void onReadTimeout() | 444 void onReadTimeout() |
408 { | 445 { |
409 cfs.metric.speculativeFailedRetries.inc(); | 446 cfs.metric.speculativeFailedRetries.inc(); |
410 } | 447 } |
411 } | 448 } |
412 } | 449 } |
OLD | NEW |