Skip to content

Commit

Permalink
Merge branch '2.1' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ddanielr committed Jan 8, 2025
2 parents 19a4ebc + 8124a6b commit 9c10eca
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public class ZooInfoViewer implements KeywordExecutable {
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
private static final Logger log = LoggerFactory.getLogger(ZooInfoViewer.class);

private final NullWatcher nullWatcher =
new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));
private NullWatcher nullWatcher;

private static final String INDENT = " ";

Expand All @@ -105,6 +104,7 @@ public String description() {

@Override
public void execute(String[] args) throws Exception {
nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));

ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts();
opts.parseArgs(ZooInfoViewer.class.getName(), args);
Expand All @@ -115,11 +115,15 @@ public void execute(String[] args) throws Exception {

var conf = opts.getSiteConfiguration();

try (ServerContext context = new ServerContext(conf)) {
try (ServerContext context = getContext(opts)) {
generateReport(context, opts);
}
}

ServerContext getContext(ZooInfoViewer.Opts opts) {
return new ServerContext(opts.getSiteConfiguration());
}

void generateReport(final ServerContext context, final ZooInfoViewer.Opts opts) throws Exception {

OutputStream outStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@
public class ZooPropEditor implements KeywordExecutable {

private static final Logger LOG = LoggerFactory.getLogger(ZooPropEditor.class);
private final NullWatcher nullWatcher =
new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));
private NullWatcher nullWatcher;

/**
* No-op constructor - provided so ServiceLoader autoload does not consume resources.
Expand All @@ -79,6 +78,8 @@ public String description() {

@Override
public void execute(String[] args) throws Exception {
nullWatcher = new NullWatcher(new ReadyMonitor(ZooInfoViewer.class.getSimpleName(), 20_000L));

ZooPropEditor.Opts opts = new ZooPropEditor.Opts();
opts.parseArgs(ZooPropEditor.class.getName(), args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.MockServerContext;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
import org.apache.accumulo.server.conf.store.NamespacePropKey;
Expand Down Expand Up @@ -139,22 +140,30 @@ public void instanceIdOutputTest() throws Exception {
var context = MockServerContext.getWithZK(InstanceId.of(uuid), "fakeHost", 2000);
ZooReader zooReader = createMock(ZooReader.class);
expect(context.getZooReader()).andReturn(zooReader).anyTimes();

var instanceName = "test";
expect(zooReader.getChildren(eq(ZROOT + ZINSTANCES))).andReturn(List.of(instanceName)).once();
expect(zooReader.getData(eq(ZROOT + ZINSTANCES + "/" + instanceName)))
.andReturn(uuid.getBytes(UTF_8)).once();
replay(context, zooReader);

String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt";

ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts();
opts.parseArgs(ZooInfoViewer.class.getName(),
new String[] {"--print-instances", "--outfile", testFileName});
expect(context.getZooReader()).andReturn(zooReader).once();
context.close();

ZooInfoViewer viewer = new ZooInfoViewer();
viewer.generateReport(context, opts);
replay(context, zooReader);

verify(context, zooReader);
class ZooInfoViewerTestClazz extends ZooInfoViewer {
@Override
ServerContext getContext(ZooInfoViewer.Opts ots) {
return context;
}
}

ZooInfoViewer viewer = new ZooInfoViewerTestClazz();
viewer.execute(new String[] {"--print-instances", "--outfile", testFileName});

verify(zooReader, context);

String line;
try (Scanner scanner = new Scanner(new File(testFileName))) {
Expand Down Expand Up @@ -215,6 +224,13 @@ public void instanceNameOutputTest() throws Exception {
public void propTest() throws Exception {
String uuid = UUID.randomUUID().toString();
InstanceId iid = InstanceId.of(uuid);
// ZooReaderWriter zrw = createMock(ZooReaderWriter.class);
// expect(zrw.getSessionTimeout()).andReturn(2_000).anyTimes();
// expect(zrw.exists(eq("/accumulo/" + iid), anyObject())).andReturn(true).anyTimes();
// replay(zrw);

// ServerContext context = MockServerContext.getMockContextWithPropStore(iid, zrw, propStore);

var context = MockServerContext.getWithZK(iid, "fakeHost", 2000);
ZooReader zooReader = createMock(ZooReader.class);
expect(context.getZooReader()).andReturn(zooReader).anyTimes();
Expand Down Expand Up @@ -285,15 +301,21 @@ public void propTest() throws Exception {
log.trace("namespace base path: {}", nsKey.getPath());

String testFileName = "./target/zoo-info-viewer-" + System.currentTimeMillis() + ".txt";
context.close();

ZooInfoViewer.Opts opts = new ZooInfoViewer.Opts();
opts.parseArgs(ZooInfoViewer.class.getName(),
new String[] {"--print-props", "--outfile", testFileName});
replay(context);

ZooInfoViewer viewer = new ZooInfoViewer();
viewer.generateReport(context, opts);
class ZooInfoViewerTestClazz extends ZooInfoViewer {
@Override
ServerContext getContext(ZooInfoViewer.Opts ots) {
return context;
}
}

verify(context, zooReader);
ZooInfoViewer viewer = new ZooInfoViewerTestClazz();
viewer.execute(new String[] {"--print-props", "--outfile", testFileName});

verify(zooReader, context);

Map<String,String> props = new HashMap<>();
try (Scanner scanner = new Scanner(new File(testFileName))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,21 @@ public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdura

UpdateSession us =
new UpdateSession(new TservConstraintEnv(server.getContext(), security, credentials),
credentials, durability);
credentials, durability) {
@Override
public boolean cleanup() {
// This is called when a client abandons a session. When this happens need to decrement
// any queued mutations.
if (queuedMutationSize > 0) {
log.trace(
"cleaning up abandoned update session, decrementing totalQueuedMutationSize by {}",
queuedMutationSize);
server.updateTotalQueuedMutationSize(-queuedMutationSize);
queuedMutationSize = 0;
}
return true;
}
};
return server.sessionManager.createSession(us, false);
}

Expand All @@ -223,8 +237,8 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
return;
}
if (us.currentTablet == null
&& (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
if (us.currentTablet == null && (us.failures.containsKey(keyExtent)
|| us.authFailures.containsKey(keyExtent) || us.unhandledException != null)) {
// if there were previous failures, then do not accept additional writes
return;
}
Expand Down Expand Up @@ -293,6 +307,11 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
Mutation mutation = new ServerMutation(tmutation);
// Deserialize the mutation in an attempt to check for data corruption that happened on
// the network. This will avoid writing a corrupt mutation to the write ahead log and
// failing after its written to the write ahead log when it is deserialized to update the
// in memory map.
mutation.getUpdates();
mutations.add(mutation);
additionalMutationSize += mutation.numBytes();
}
Expand All @@ -312,6 +331,15 @@ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
}
}
}
} catch (RuntimeException e) {
// This method is a thrift oneway method so an exception from it will not make it back to the
// client. Need to record the exception and set the session such that any future updates to
// the session are ignored.
us.unhandledException = e;
us.currentTablet = null;

// Rethrowing it will cause logging from thrift, so not adding logging here.
throw e;
} finally {
if (reserved) {
server.sessionManager.unreserveSession(us);
Expand Down Expand Up @@ -495,6 +523,20 @@ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDE
}

try {
if (us.unhandledException != null) {
// Since flush() is not being called, any memory added to the global queued mutations
// counter will not be decremented. So do that here before throwing an exception.
server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
us.queuedMutationSize = 0;
// make this memory available for GC
us.queuedMutations.clear();

// Something unexpected happened during this write session, so throw an exception here to
// cause a TApplicationException on the client side.
throw new IllegalStateException(
"Write session " + updateID + " saw an unexpected exception", us.unhandledException);
}

// clients may or may not see data from an update session while
// it is in progress, however when the update session is closed
// want to ensure that reads wait for the write to finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,12 @@ public void run() {
}

public long updateTotalQueuedMutationSize(long additionalMutationSize) {
return totalQueuedMutationSize.addAndGet(additionalMutationSize);
var newTotal = totalQueuedMutationSize.addAndGet(additionalMutationSize);
if (log.isTraceEnabled()) {
log.trace("totalQueuedMutationSize is now {} after adding {}", newTotal,
additionalMutationSize);
}
return newTotal;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class UpdateSession extends Session {
public long flushTime = 0;
public long queuedMutationSize = 0;
public final Durability durability;
public Exception unhandledException = null;

public UpdateSession(TservConstraintEnv env, TCredentials credentials, Durability durability) {
super(credentials);
Expand Down
149 changes: 149 additions & 0 deletions test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TMutation;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletingest.thrift.TDurability;
import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TServiceClient;
import org.junit.jupiter.api.Test;

public class CorruptMutationIT extends AccumuloClusterHarness {

@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "10");
}

@Test
public void testCorruptMutation() throws Exception {

String table = getUniqueNames(1)[0];
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
c.tableOperations().create(table);
try (BatchWriter writer = c.createBatchWriter(table)) {
Mutation m = new Mutation("1");
m.put("f1", "q1", new Value("v1"));
writer.addMutation(m);
}

var ctx = (ClientContext) c;
var tableId = ctx.getTableId(table);
var extent = new KeyExtent(tableId, null, null);
var tabletMetadata = ctx.getAmple().readTablet(extent, TabletMetadata.ColumnType.LOCATION);
var location = tabletMetadata.getLocation();
assertNotNull(location);
assertEquals(TabletMetadata.LocationType.CURRENT, location.getType());

TabletIngestClientService.Iface client =
ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, location.getHostAndPort(), ctx);
// Make the same RPC calls made by the BatchWriter, but pass a corrupt serialized mutation in
// this try block.
try {
TInfo tinfo = TraceUtil.traceInfo();

long sessionId = client.startUpdate(tinfo, ctx.rpcCreds(), TDurability.DEFAULT);

// Write two valid mutations to the session. The tserver buffers data it receives via
// applyUpdates and may not write them until closeUpdate RPC is called. Because
// TSERV_TOTAL_MUTATION_QUEUE_MAX was set so small, these values should be written.
client.applyUpdates(tinfo, sessionId, extent.toThrift(),
List.of(createTMutation("abc", "z1"), createTMutation("def", "z2")));

// Simulate data corruption in the serialized mutation
TMutation badMutation = createTMutation("ghi", "z3");
badMutation.entries = -42;

// Write some good and bad mutations to the session. The server side will see an error here,
// however since this is a thrift oneway method no exception is expected here. This should
// leave the session in a broken state where it no longer accepts any new data.
client.applyUpdates(tinfo, sessionId, extent.toThrift(),
List.of(createTMutation("jkl", "z4"), badMutation, createTMutation("mno", "z5")));

// Write two more valid mutations to the session, these should be dropped on the server side
// because of the previous error. So should never see these updates.
client.applyUpdates(tinfo, sessionId, extent.toThrift(),
List.of(createTMutation("pqr", "z6"), createTMutation("stu", "z7")));

// Since client.applyUpdates experienced an error, should see an error when closing the
// session.
assertThrows(TApplicationException.class, () -> client.closeUpdate(tinfo, sessionId));
} finally {
ThriftUtil.returnClient((TServiceClient) client, ctx);
}

// The values that a scan must see
Set<String> expectedValues = Set.of("v1", "v2", "z1", "z2");

// The failed mutation should not have left the tablet in a bad state. Do some follow-on
// actions to ensure the tablet is still functional.
try (BatchWriter writer = c.createBatchWriter(table)) {
Mutation m = new Mutation("2");
m.put("f1", "q1", new Value("v2"));
writer.addMutation(m);
}

try (Scanner scanner = c.createScanner(table)) {
var valuesSeen =
scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet());
assertEquals(expectedValues, valuesSeen);
}

c.tableOperations().flush(table, null, null, true);

try (Scanner scanner = c.createScanner(table)) {
var valuesSeen =
scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet());
assertEquals(expectedValues, valuesSeen);
}
}
}

private static TMutation createTMutation(String row, String value) {
Mutation m = new Mutation(row);
m.put("x", "y", value);
return m.toThrift();
}
}

0 comments on commit 9c10eca

Please sign in to comment.