diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index 5b91c150c86a5..f650685b7ff94 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -29,7 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.ServiceLoader; import java.util.SortedSet; import java.util.TreeSet; @@ -77,7 +79,8 @@ private static String versionFor(Class pluginKlass) throws Refl @Override protected PluginScanResult scanPlugins(PluginSource source) { ClassGraph classGraphBuilder = new ClassGraph() - .addClassLoader(source.loader()) + .addClassLoader(source.loader()) + .overrideClassLoaders(classLoaderOrder(source)) .enableExternalClasses() .enableClassInfo(); try (ScanResult classGraph = classGraphBuilder.scan()) { @@ -105,6 +108,23 @@ private SortedSet>> getTransformationPluginDesc(Plu return (SortedSet>>) (SortedSet) getPluginDesc(classGraph, PluginType.TRANSFORMATION, source); } + private ClassLoader[] classLoaderOrder(PluginSource source) { + // By default, java and classgraph uses parent first classloading, hence if a plugin is loaded by the classpath + // loader, and then by an isolated plugin loader, the default precedence will always load the classpath version. + // This breaks isolation and hence connect uses isolated plugin loaders, which are child first classloaders. + // Therefore, we override the classloader order to be child first, so that the isolated plugin loader is used first. + // In addition, we need to explicitly specify the full classloader order, to force classgraph to scan classes in + // the class path. This does not happen by default as it uses reflections to obtain access to classpath URLs from + // the application classloader, which can fail with illegal access exceptions. + List classLoaderOrder = new ArrayList<>(); + ClassLoader cl = source.loader(); + while (cl != null) { + classLoaderOrder.add(cl); + cl = cl.getParent(); + } + return classLoaderOrder.toArray(new ClassLoader[0]); + } + @SuppressWarnings({"unchecked"}) private SortedSet> getPluginDesc( ScanResult classGraph, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java index d083f980349a1..ca099976444e9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.connect.storage.Converter; + import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -26,11 +28,13 @@ import java.nio.file.Path; import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -137,6 +141,18 @@ public void testVersionedPluginsHasVersion(PluginScanner scanner) { versionedPluginResult.forEach(pluginDesc -> assertEquals("1.0.0", pluginDesc.version())); } + @ParameterizedTest + @MethodSource("parameters") + public void testClasspathPluginIsAlsoLoadedInIsolation(PluginScanner scanner) { + Set isolatedClassPathPlugin = TestPlugins.pluginPath(TestPlugins.TestPlugin.CLASSPATH_CONVERTER); + PluginScanResult result = scan(scanner, isolatedClassPathPlugin); + Optional> pluginDesc = result.converters().stream() + .filter(desc -> desc.className().equals(TestPlugins.TestPlugin.CLASSPATH_CONVERTER.className())) + .findFirst(); + assertTrue(pluginDesc.isPresent()); + assertInstanceOf(PluginClassLoader.class, pluginDesc.get().loader()); + } + private PluginScanResult scan(PluginScanner scanner, Set pluginLocations) { ClassLoaderFactory factory = new ClassLoaderFactory(); Set pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index c9d8892da91ce..adb2c2418d5fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -85,7 +85,8 @@ public enum TestPackage { SAMPLING_CONVERTER("sampling-converter"), SAMPLING_HEADER_CONVERTER("sampling-header-converter"), SERVICE_LOADER("service-loader"), - SUBCLASS_OF_CLASSPATH("subclass-of-classpath"); + SUBCLASS_OF_CLASSPATH("subclass-of-classpath"), + CLASSPATH_CONVERTER("classpath-converter"); private final String resourceDir; private final Predicate removeRuntimeClasses; @@ -251,7 +252,11 @@ public enum TestPlugin { /** * A ServiceLoader discovered plugin which subclasses another plugin which is present on the classpath */ - SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY(TestPackage.SUBCLASS_OF_CLASSPATH, "test.plugins.SubclassOfClasspathOverridePolicy"); + SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY(TestPackage.SUBCLASS_OF_CLASSPATH, "test.plugins.SubclassOfClasspathOverridePolicy"), + /** + * A plugin which is part of the classpath by default. This packages it as a separate jar which is used to test plugin isolation from the classpath plugin. + */ + CLASSPATH_CONVERTER(TestPackage.CLASSPATH_CONVERTER, "org.apache.kafka.connect.converters.ByteArrayConverter", false); private final TestPackage testPackage; private final String className; diff --git a/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 0000000000000..ae9c2a5820304 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.converters.ByteArrayConverter + diff --git a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java new file mode 100644 index 0000000000000..699d71635a042 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.converters; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.HeaderConverter; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +public class ByteArrayConverter implements Converter, HeaderConverter, Versioned { + + private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef(); + @Override + public String version() { + return AppInfoParser.getVersion(); + } + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void configure(Map configs) { + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + if (schema != null && schema.type() != Schema.Type.BYTES) + throw new DataException("Invalid schema type for ByteArrayConverter: " + schema.type().toString()); + + if (value != null && !(value instanceof byte[]) && !(value instanceof ByteBuffer)) + throw new DataException("ByteArrayConverter is not compatible with objects of type " + value.getClass()); + + return value instanceof ByteBuffer ? getBytesFromByteBuffer((ByteBuffer) value) : (byte[]) value; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value); + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return fromConnectData(topic, schema, value); + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return toConnectData(topic, value); + } + + @Override + public void close() { + // do nothing + } + + private byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return null; + } + + byteBuffer.rewind(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } +}