Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.carbondata.core.locks;

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
Expand Down Expand Up @@ -134,7 +137,6 @@ public static void deleteExpiredSegmentLockFiles(CarbonTable carbonTable) {
}
CarbonFile[] files = FileFactory.getCarbonFile(lockFilesDir)
.listFiles(new CarbonFileFilter() {

@Override
public boolean accept(CarbonFile pathName) {
if (CarbonTablePath.isSegmentLockFilePath(pathName.getName())) {
Expand All @@ -149,4 +151,28 @@ public boolean accept(CarbonFile pathName) {
file.delete();
}
}

public static List<ICarbonLock> acquireLocks(CarbonTable carbonTable,
List<String> locksToBeAcquired) {
List<ICarbonLock> acquiredLocks = new ArrayList<>();
try {
locksToBeAcquired.forEach(lock ->
acquiredLocks.add(CarbonLockUtil
.getLockObject(carbonTable.getAbsoluteTableIdentifier(), lock))
);
} catch (Exception e) {
releaseLocks(acquiredLocks);
}
return acquiredLocks;
}

public static void releaseLocks(List<ICarbonLock> locks) {
locks.forEach(carbonLock -> {
if (carbonLock.unlock()) {
LOGGER.info("Alter table lock released successfully");
} else {
LOGGER.error("Unable to release lock");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
Expand Down Expand Up @@ -992,13 +993,13 @@ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId
List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments,
String uuid) throws IOException {
if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
Set<Segment> segmentSet = new HashSet<>(
Set<String> segmentSet = new HashSet<>(
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments(carbonTable.isMV()).getValidSegments());
.getValidAndInvalidSegments(carbonTable.isMV()).getValidSegments())
.stream().map(segment -> segment.getSegmentNo()).collect(Collectors.toSet());
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId,
true, false,
Segment.toSegmentList(toBeDeleteSegments, null),
Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
new HashSet<>(toBeDeleteSegments), new HashSet<>(toBeUpdatedSegments), uuid);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public static String getDeleteDeltaFilePath(String blockPath, String blockName,
String timestamp) {
return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName
+ CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;

}

/**
Expand All @@ -146,6 +145,10 @@ public static String getDeleteDeltaFilePath(String blockPath, String blockName,
*/
public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
if (updateDetailsList.isEmpty()) {
return true;
}

boolean status = false;
SegmentUpdateStatusManager segmentUpdateStatusManager = new SegmentUpdateStatusManager(table);
ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
Expand Down Expand Up @@ -236,12 +239,13 @@ public static void mergeSegmentUpdate(boolean isCompaction, List<SegmentUpdateDe
* @param segmentsToBeDeleted
* @return
*/
public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdateRequired,
boolean isUpdateStatusFileUpdateRequired, List<Segment> segmentsToBeDeleted) {
boolean isUpdateStatusFileUpdateRequired, Set<String> segmentsToBeDeleted,
Set<String> segmentsToBeUpdated, String uuid) throws IOException {
return updateTableMetadataStatus(updatedSegmentsList, table, updatedTimeStamp,
isTimestampUpdateRequired, isUpdateStatusFileUpdateRequired,
segmentsToBeDeleted, new ArrayList<Segment>(), "");
segmentsToBeDeleted, segmentsToBeUpdated, uuid, null);
}

/**
Expand All @@ -253,10 +257,11 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
* @param segmentsToBeDeleted
* @return
*/
public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdateRequired,
boolean isUpdateStatusFileUpdateRequired, List<Segment> segmentsToBeDeleted,
List<Segment> segmentFilesTobeUpdated, String uuid) {
boolean isUpdateStatusFileUpdateRequired, Set<String> segmentsToBeDeleted,
Set<String> segmentFilesTobeUpdated, String uuid,
LoadMetadataDetails newLoadEntry) throws IOException {

boolean status = false;
String metaDataFilepath = table.getMetadataPath();
Expand Down Expand Up @@ -286,13 +291,13 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList

if (isTimestampUpdateRequired) {
// if the segments is in the list of marked for delete then update the status.
if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) {
if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrDeletionTimestamp(Long.parseLong(updatedTimeStamp));
}
}
for (Segment segName : updatedSegmentsList) {
if (loadMetadata.getLoadName().equalsIgnoreCase(segName.getSegmentNo())) {
for (String segName : updatedSegmentsList) {
if (loadMetadata.getLoadName().equalsIgnoreCase(segName)) {
// if this call is coming from the delete delta flow then the time stamp
// String will come empty then no need to write into table status file.
if (isTimestampUpdateRequired) {
Expand All @@ -304,15 +309,35 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
// update end timestamp for each time.
loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
}
if (segmentFilesTobeUpdated
.contains(Segment.toSegment(loadMetadata.getLoadName(), null))) {
if (segmentFilesTobeUpdated.contains(loadMetadata.getLoadName())) {
loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
+ CarbonTablePath.SEGMENT_EXT);
}
}
}
}

if (newLoadEntry != null) {
// existing entry needs to be overwritten as the entry will exist with some
// intermediate status
int indexToOverwriteNewMetaEntry = 0;
boolean found = false;
for (LoadMetadataDetails entry : listOfLoadFolderDetailsArray) {
if (entry.getLoadName().equals(newLoadEntry.getLoadName())
&& entry.getLoadStartTime() == newLoadEntry.getLoadStartTime()) {
newLoadEntry.setExtraInfo(entry.getExtraInfo());
found = true;
break;
}
indexToOverwriteNewMetaEntry++;
}
if (!found) {
LOGGER.error("Entry not found to update " + newLoadEntry + " From list");
throw new IOException("Entry not found to update in the table status file");
}
listOfLoadFolderDetailsArray[indexToOverwriteNewMetaEntry] = newLoadEntry;
}

try {
SegmentStatusManager
.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
Expand Down Expand Up @@ -537,7 +562,7 @@ public static long readCurrentTime() {
* @param segmentBlockCount
*/
public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
Map<String, Long> segmentBlockCount) {
Map<String, Long> segmentBlockCount) {

String segId = details.getSegmentName();

Expand All @@ -550,16 +575,13 @@ public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
* @param segmentBlockCount
* @return
*/
public static List<Segment> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
List<Segment> segmentsToBeDeleted =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

public static Set<String> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
Set<String> segmentsToBeDeleted =
new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {

if (eachSeg.getValue() == 0) {
segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), ""));
segmentsToBeDeleted.add(eachSeg.getKey());
}

}
return segmentsToBeDeleted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
Expand Down Expand Up @@ -364,14 +363,14 @@ public boolean accept(CarbonFile pathName) {
* @param blockName the specified block of the segment
* @return delete delta file list of the block
*/
public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName) {
public List<String> getDeleteDeltaFilesList(final String segment, final String blockName) {
List<String> deleteDeltaFileList = new ArrayList<>();
String segmentPath = CarbonTablePath.getSegmentPath(
identifier.getTablePath(), segment.getSegmentNo());
identifier.getTablePath(), segment);

for (SegmentUpdateDetails block : updateDetails) {
if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
(block.getSegmentName().equalsIgnoreCase(segment.getSegmentNo())) &&
(block.getSegmentName().equalsIgnoreCase(segment)) &&
!CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
Set<String> deltaFileTimestamps = block.getDeltaFileStamps();
if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) {
Expand Down
Loading