Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,42 @@
import org.apache.arrow.adbc.core.IngestOption;
import org.apache.arrow.adbc.core.IsolationLevel;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.jni.impl.ChildReferences;
import org.apache.arrow.adbc.driver.jni.impl.HasChildReferences;
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
import org.apache.arrow.adbc.driver.jni.impl.NativeConnectionHandle;
import org.apache.arrow.adbc.driver.jni.impl.NativeStatementHandle;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;

public class JniConnection implements AdbcConnection {
public class JniConnection implements AdbcConnection, HasChildReferences {
private final BufferAllocator allocator;
private final NativeConnectionHandle handle;
private final ChildReferences childReferences;
// Hold the owning database alive, and try to ensure this connection gets cleaned up before the
// database does
private @Nullable HasChildReferences parent;

public JniConnection(BufferAllocator allocator, NativeConnectionHandle handle) {
public JniConnection(
BufferAllocator allocator, HasChildReferences parent, NativeConnectionHandle handle) {
this.allocator = allocator;
this.handle = handle;
this.childReferences = new ChildReferences();
this.parent = parent;
parent.getChildReferences().addReference(this);
}

@Override
public ChildReferences getChildReferences() {
return childReferences;
}

@Override
public AdbcStatement createStatement() throws AdbcException {
return new JniStatement(allocator, JniLoader.INSTANCE.openStatement(handle));
return new JniStatement(allocator, this, JniLoader.INSTANCE.openStatement(handle));
}

@Override
Expand Down Expand Up @@ -108,7 +124,7 @@ AdbcStatement bulkIngestImpl(String targetTableName, BulkIngestMode mode, Ingest
}
}

return new JniStatement(allocator, stmtHandle);
return new JniStatement(allocator, this, stmtHandle);
} catch (Exception e) {
stmtHandle.close();
throw e;
Expand All @@ -117,7 +133,7 @@ AdbcStatement bulkIngestImpl(String targetTableName, BulkIngestMode mode, Ingest

@Override
public ArrowReader getInfo(int @Nullable [] infoCodes) throws AdbcException {
return JniLoader.INSTANCE.connectionGetInfo(handle, infoCodes).importStream(allocator);
return JniLoader.INSTANCE.connectionGetInfo(handle, infoCodes).importStream(allocator, this);
}

@Override
Expand All @@ -138,7 +154,7 @@ public ArrowReader getObjects(
tableNamePattern,
tableTypes,
columnNamePattern)
.importStream(allocator);
.importStream(allocator, this);
}

@Override
Expand All @@ -151,7 +167,7 @@ public Schema getTableSchema(String catalog, String dbSchema, String tableName)

@Override
public ArrowReader getTableTypes() throws AdbcException {
return JniLoader.INSTANCE.connectionGetTableTypes(handle).importStream(allocator);
return JniLoader.INSTANCE.connectionGetTableTypes(handle).importStream(allocator, this);
}

@Override
Expand Down Expand Up @@ -257,7 +273,9 @@ public void setCurrentDbSchema(String dbSchema) throws AdbcException {

@Override
public ArrowReader readPartition(ByteBuffer descriptor) throws AdbcException {
return JniLoader.INSTANCE.connectionReadPartition(handle, descriptor).importStream(allocator);
return JniLoader.INSTANCE
.connectionReadPartition(handle, descriptor)
.importStream(allocator, this);
}

@Override
Expand All @@ -267,17 +285,26 @@ public ArrowReader getStatistics(
return JniLoader.INSTANCE
.connectionGetStatistics(
handle, catalogPattern, dbSchemaPattern, tableNamePattern, approximate)
.importStream(allocator);
.importStream(allocator, this);
}

@Override
public ArrowReader getStatisticNames() throws AdbcException {
return JniLoader.INSTANCE.connectionGetStatisticNames(handle).importStream(allocator);
return JniLoader.INSTANCE.connectionGetStatisticNames(handle).importStream(allocator, this);
}

@Override
public void close() {
handle.close();
public void close() throws AdbcException {
try {
AutoCloseables.close(childReferences, handle);
} catch (Exception e) {
throw AdbcException.internal("[jni] failed to close connection").withCause(e);
}
final var parent = this.parent;
if (parent != null) {
parent.getChildReferences().releaseReference(this);
this.parent = null;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this and other similar blocks be in a finally so the child always gets detached? Just curious.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,37 @@
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.jni.impl.ChildReferences;
import org.apache.arrow.adbc.driver.jni.impl.HasChildReferences;
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
import org.apache.arrow.adbc.driver.jni.impl.NativeDatabaseHandle;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;

public class JniDatabase implements AdbcDatabase {
public class JniDatabase implements AdbcDatabase, HasChildReferences {
private final BufferAllocator allocator;
private final NativeDatabaseHandle handle;
private final ChildReferences childReferences;

public JniDatabase(BufferAllocator allocator, NativeDatabaseHandle handle) {
this.allocator = allocator;
this.handle = handle;
this.childReferences = new ChildReferences();
}

@Override
public ChildReferences getChildReferences() {
return childReferences;
}

@Override
public AdbcConnection connect() throws AdbcException {
return new JniConnection(allocator, JniLoader.INSTANCE.openConnection(handle));
return new JniConnection(allocator, this, JniLoader.INSTANCE.openConnection(handle));
}

@Override
public void close() {
handle.close();
public void close() throws Exception {
AutoCloseables.close(childReferences, handle);
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JniConnection.close() and JniStatement.close() wrap failures with AdbcException.internal(...) but here we're just declaring throws exception and letting it propagate. Any reason for the difference? Should we wrap here also for consistency?


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.TypedKey;
import org.apache.arrow.adbc.driver.jni.impl.ChildReferences;
import org.apache.arrow.adbc.driver.jni.impl.HasChildReferences;
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
import org.apache.arrow.adbc.driver.jni.impl.NativePartitionResult;
import org.apache.arrow.adbc.driver.jni.impl.NativeQueryResult;
Expand All @@ -39,15 +41,28 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;

public class JniStatement implements AdbcStatement {
public class JniStatement implements AdbcStatement, HasChildReferences {
private final BufferAllocator allocator;
private final NativeStatementHandle handle;
private final ChildReferences childReferences;
// Hold the owning connection alive, and try to ensure this statement gets cleaned up before the
// connection does
private @Nullable HasChildReferences parent;
private @Nullable VectorSchemaRoot bindRoot;
private @Nullable ArrowReader bindStream;

public JniStatement(BufferAllocator allocator, NativeStatementHandle handle) {
public JniStatement(
BufferAllocator allocator, HasChildReferences parent, NativeStatementHandle handle) {
this.allocator = allocator;
this.handle = handle;
this.childReferences = new ChildReferences();
this.parent = parent;
parent.getChildReferences().addReference(this);
}

@Override
public ChildReferences getChildReferences() {
return childReferences;
}

@Override
Expand Down Expand Up @@ -126,7 +141,7 @@ public PartitionResult executePartitioned() throws AdbcException {
public QueryResult executeQuery() throws AdbcException {
exportBind();
NativeQueryResult result = JniLoader.INSTANCE.statementExecuteQuery(handle);
return new QueryResult(result.rowsAffected(), result.importStream(allocator));
return new QueryResult(result.rowsAffected(), result.importStream(allocator, this));
}

@Override
Expand Down Expand Up @@ -171,10 +186,15 @@ public Iterator<PartitionResult> pollPartitioned() throws AdbcException {
@Override
public void close() throws AdbcException {
try {
AutoCloseables.close(handle, bindStream);
AutoCloseables.close(childReferences, handle, bindStream);
} catch (Exception e) {
throw AdbcException.internal("[jni] failed to close statement").withCause(e);
}
final var parent = this.parent;
if (parent != null) {
parent.getChildReferences().releaseReference(this);
this.parent = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.adbc.driver.jni.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import org.apache.arrow.util.AutoCloseables;

/**
* Track child resources for the ADBC FFI.
*
* <p>You are supposed to close statements before closing the connection (etc.). This class helps
* track those references to prevent misuse at runtime.
*/
public final class ChildReferences implements AutoCloseable {
private final Set<AutoCloseable> openReferences;

public ChildReferences() {
this.openReferences = Collections.newSetFromMap(new WeakHashMap<>());
}
Comment on lines +37 to +41

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a one-line comment just to point out that WeakHashMap isn't thread-safe so that we don't overlook that in any future changes?


public void close() throws Exception {
try {
var closeables = new ArrayList<>(openReferences);
AutoCloseables.close(closeables);
} finally {
openReferences.clear();
}
}
Comment thread
lidavidm marked this conversation as resolved.

public void addReference(AutoCloseable any) {
openReferences.add(any);
}

public void releaseReference(AutoCloseable any) {
openReferences.remove(any);
}
Comment thread
lidavidm marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adbc.driver.jni.impl;

public interface HasChildReferences {
ChildReferences getChildReferences();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.checkerframework.checker.nullness.qual.Nullable;

public class NativeQueryResult {
private final long rowsAffected;
Expand All @@ -39,10 +40,11 @@ public long rowsAffected() {
}

/** Import the C Data stream into a Java ArrowReader. */
public ArrowReader importStream(BufferAllocator allocator) {
public ArrowReader importStream(BufferAllocator allocator, @Nullable HasChildReferences parent) {
try (final ArrowArrayStream cStream = ArrowArrayStream.allocateNew(allocator)) {
cStream.save(streamSnapshot);
return Data.importArrayStream(allocator, cStream);
final var reader = Data.importArrayStream(allocator, cStream);
return new TiedArrowReader(allocator, reader, parent);
}
}
}
Loading
Loading