Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/nf-azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ dependencies {
api('com.azure:azure-storage-blob:12.33.3') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
api('com.azure:azure-compute-batch:1.0.0-beta.3') {
api('com.azure:azure-compute-batch:1.0.0-beta.6') {
exclude group: 'org.slf4j', module: 'slf4j-api'
exclude group: 'com.google.guava', module: 'guava'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,29 @@ import com.azure.compute.batch.BatchClientBuilder
import com.azure.compute.batch.models.AutoUserScope
import com.azure.compute.batch.models.AutoUserSpecification
import com.azure.compute.batch.models.AzureFileShareConfiguration
import com.azure.compute.batch.models.BatchJobCreateContent
import com.azure.compute.batch.models.BatchJobCreateParameters
import com.azure.compute.batch.models.BatchJobConstraints
import com.azure.compute.batch.models.BatchJobUpdateContent
import com.azure.compute.batch.models.BatchJobUpdateParameters
import com.azure.compute.batch.models.BatchNodeFillType
import com.azure.compute.batch.models.BatchPool
import com.azure.compute.batch.models.BatchPoolCreateContent
import com.azure.compute.batch.models.BatchPoolCreateParameters
import com.azure.compute.batch.models.BatchPoolInfo
import com.azure.compute.batch.models.BatchPoolState
import com.azure.compute.batch.models.BatchStartTask
import com.azure.compute.batch.models.BatchSupportedImage
import com.azure.compute.batch.models.BatchTask
import com.azure.compute.batch.models.BatchTaskConstraints
import com.azure.compute.batch.models.BatchTaskContainerSettings
import com.azure.compute.batch.models.BatchTaskCreateContent
import com.azure.compute.batch.models.BatchTaskCreateParameters
import com.azure.compute.batch.models.BatchTaskSchedulingPolicy
import com.azure.compute.batch.models.ContainerConfiguration
import com.azure.compute.batch.models.BatchContainerConfiguration
import com.azure.compute.batch.models.ContainerRegistryReference
import com.azure.compute.batch.models.ContainerType
import com.azure.compute.batch.models.ElevationLevel
import com.azure.compute.batch.models.MetadataItem
import com.azure.compute.batch.models.BatchMetadataItem
import com.azure.compute.batch.models.MountConfiguration
import com.azure.compute.batch.models.NetworkConfiguration
import com.azure.compute.batch.models.OnAllBatchTasksComplete
import com.azure.compute.batch.models.BatchAllTasksCompleteMode
import com.azure.compute.batch.models.OutputFile
import com.azure.compute.batch.models.OutputFileBlobContainerDestination
import com.azure.compute.batch.models.OutputFileDestination
Expand Down Expand Up @@ -445,7 +445,7 @@ class AzBatchService implements Closeable {
log.debug "[AZURE BATCH] created job for ${task.processor.name} with pool ${poolId}"
// create a batch job
final jobId = makeJobId(task)
final content = new BatchJobCreateContent(jobId, new BatchPoolInfo(poolId: poolId))
final content = new BatchJobCreateParameters(jobId, new BatchPoolInfo(poolId: poolId))

if (config.batch().jobMaxWallClockTime) {
content.setConstraints(createJobConstraints(config.batch().jobMaxWallClockTime))
Expand All @@ -455,7 +455,7 @@ class AzBatchService implements Closeable {
return jobId
}

protected void applyCreateJob(BatchJobCreateContent content) {
protected void applyCreateJob(BatchJobCreateParameters content) {
final maxRetries = config.batch().maxJobQuotaRetries
final retryDelay = config.batch().jobQuotaRetryDelay

Expand Down Expand Up @@ -492,7 +492,7 @@ class AzBatchService implements Closeable {
}
}

protected void createJobRequest(BatchJobCreateContent content) {
protected void createJobRequest(BatchJobCreateParameters content) {
apply(() -> client.createJob(content))
}

Expand All @@ -518,7 +518,7 @@ class AzBatchService implements Closeable {
return key.size()>MAX_LEN ? key.substring(0,MAX_LEN) : key
}

protected BatchTaskCreateContent createTask(String poolId, String jobId, TaskRun task) {
protected BatchTaskCreateParameters createTask(String poolId, String jobId, TaskRun task) {
assert poolId, 'Missing Azure Batch poolId argument'
assert jobId, 'Missing Azure Batch jobId argument'
assert task, 'Missing Azure Batch task argument'
Expand Down Expand Up @@ -594,7 +594,7 @@ class AzBatchService implements Closeable {
final constraints = taskConstraints(task)

log.trace "[AZURE BATCH] Submitting task: $taskId, cpus=${task.config.getCpus()}, mem=${task.config.getMemory()?:'-'}, slots: $slots"
return new BatchTaskCreateContent(taskId, cmd)
return new BatchTaskCreateParameters(taskId, cmd)
.setUserIdentity(userIdentity(pool.opts.privileged, pool.opts.runAs, AutoUserScope.TASK))
.setContainerSettings(containerOpts)
.setResourceFiles(resourceFileUrls(task, sas))
Expand Down Expand Up @@ -840,7 +840,7 @@ class AzBatchService implements Closeable {
*
* https://github.com/MicrosoftDocs/azure-docs/blob/master/articles/batch/batch-docker-container-workloads.md#:~:text=Run%20container%20applications%20on%20Azure,compatible%20containers%20on%20the%20nodes.
*/
final containerConfig = new ContainerConfiguration(ContainerType.DOCKER_COMPATIBLE)
final containerConfig = new BatchContainerConfiguration(ContainerType.DOCKER_COMPATIBLE)
final registryOpts = config.registry()

if( registryOpts && registryOpts.isConfigured() ) {
Expand Down Expand Up @@ -891,7 +891,7 @@ class AzBatchService implements Closeable {

protected void createPool(AzVmPoolSpec spec) {

final poolParams = new BatchPoolCreateContent(spec.poolId, spec.vmType.name)
final poolParams = new BatchPoolCreateParameters(spec.poolId, spec.vmType.name)
.setVirtualMachineConfiguration(poolVmConfig(spec.opts))
// same as the number of cores
// maximum of 256, which is the limit on Azure Batch
Expand All @@ -906,7 +906,7 @@ class AzBatchService implements Closeable {
// resource labels
if( spec.metadata ) {
final metadata = spec.metadata.collect { name, value ->
new MetadataItem(name, value)
new BatchMetadataItem(name, value)
}
poolParams.setMetadata(metadata)
}
Expand Down Expand Up @@ -1036,8 +1036,8 @@ class AzBatchService implements Closeable {
final job = apply(() -> client.getJob(jobId))
final poolInfo = job.poolInfo

final jobParameter = new BatchJobUpdateContent()
.setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB)
final jobParameter = new BatchJobUpdateParameters()
.setAllTasksCompleteMode(BatchAllTasksCompleteMode.TERMINATE_JOB)
.setPoolInfo(poolInfo)

apply(() -> client.updateJob(jobId, jobParameter))
Expand All @@ -1056,7 +1056,7 @@ class AzBatchService implements Closeable {
for( String jobId : allJobIds.values() ) {
try {
log.trace "Deleting Azure job ${jobId}"
apply(() -> client.deleteJob(jobId))
apply(() -> client.beginDeleteJob(jobId).waitForCompletion())
}
catch (Exception e) {
log.warn "Unable to delete Azure Batch job ${jobId} - Reason: ${e.message ?: e}"
Expand All @@ -1067,7 +1067,7 @@ class AzBatchService implements Closeable {
protected void cleanupPools() {
for( String poolId : allPools.keySet() ) {
try {
apply(() -> client.deletePool(poolId))
apply(() -> client.beginDeletePool(poolId).waitForCompletion())
}
catch (Exception e) {
log.warn "Unable to delete Azure Batch pool ${poolId} - Reason: ${e.message ?: e}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import java.time.temporal.ChronoUnit
import java.util.function.Predicate

import com.azure.compute.batch.models.BatchPool
import com.azure.compute.batch.models.BatchJobCreateContent
import com.azure.compute.batch.models.BatchJobCreateParameters
import com.azure.compute.batch.models.ElevationLevel
import com.azure.compute.batch.models.EnvironmentSetting
import com.azure.core.exception.HttpResponseException
import com.azure.core.http.HttpResponse
import com.azure.json.JsonProviders
import com.azure.identity.ManagedIdentityCredential
import com.google.common.hash.HashCode
import nextflow.Global
Expand Down Expand Up @@ -627,7 +628,7 @@ class AzBatchServiceTest extends Specification {
when:
def result = svc.specFromPoolConfig(POOL_ID)
then:
1 * svc.getPool(_) >> new BatchPool(vmSize: 'Standard_D2_v2')
1 * svc.getPool(_) >> BatchPool.fromJson(JsonProviders.createReader('{"vmSize":"Standard_D2_v2"}'))
and:
result.vmType.name == 'Standard_D2_v2'
result.vmType.numberOfCores == 2
Expand Down Expand Up @@ -1048,7 +1049,7 @@ class AzBatchServiceTest extends Specification {
int createCalls = 0
def service = new AzBatchService(exec) {
@Override
protected void createJobRequest(BatchJobCreateContent content) {
protected void createJobRequest(BatchJobCreateParameters content) {
createCalls++
}
}
Expand All @@ -1068,7 +1069,7 @@ class AzBatchServiceTest extends Specification {
int createCalls = 0
def service = new AzBatchService(exec) {
@Override
protected void createJobRequest(BatchJobCreateContent content) {
protected void createJobRequest(BatchJobCreateParameters content) {
createCalls++
if (createCalls == 1)
throw new HttpResponseException('first call', null)
Expand All @@ -1094,7 +1095,7 @@ class AzBatchServiceTest extends Specification {
int createCalls = 0
def service = new AzBatchService(exec) {
@Override
protected void createJobRequest(BatchJobCreateContent content) {
protected void createJobRequest(BatchJobCreateParameters content) {
createCalls++
throw new HttpResponseException('quota error', null)
}
Expand All @@ -1121,7 +1122,7 @@ class AzBatchServiceTest extends Specification {
int createCalls = 0
def service = new AzBatchService(exec) {
@Override
protected void createJobRequest(BatchJobCreateContent content) {
protected void createJobRequest(BatchJobCreateParameters content) {
createCalls++
throw new HttpResponseException('quota error', null)
}
Expand All @@ -1147,7 +1148,7 @@ class AzBatchServiceTest extends Specification {
int createCalls = 0
def service = new AzBatchService(exec) {
@Override
protected void createJobRequest(BatchJobCreateContent content) {
protected void createJobRequest(BatchJobCreateParameters content) {
createCalls++
throw new HttpResponseException('Job already exists', null)
}
Expand All @@ -1172,7 +1173,7 @@ class AzBatchServiceTest extends Specification {
int createCalls = 0
def service = new AzBatchService(exec) {
@Override
protected void createJobRequest(BatchJobCreateContent content) {
protected void createJobRequest(BatchJobCreateParameters content) {
createCalls++
throw new IllegalArgumentException('unexpected error')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package nextflow.cloud.azure.batch

import com.azure.compute.batch.models.BatchTask
import com.azure.compute.batch.models.BatchTaskExecutionInfo
import com.azure.compute.batch.models.BatchTaskFailureInfo
import com.azure.compute.batch.models.BatchTaskState
import com.azure.compute.batch.models.ErrorCategory
import com.azure.json.JsonProviders
import com.sun.jna.platform.unix.X11
import nextflow.processor.TaskStatus

Expand Down Expand Up @@ -132,11 +129,7 @@ class AzBatchTaskHandlerTest extends Specification {
task.name = 'foo'
task.workDir = Path.of('/tmp/wdir')
def taskKey = new AzTaskKey('pool-123', 'job-456')
def azTask = new BatchTask()
def execInfo = new BatchTaskExecutionInfo(0,0)
execInfo.exitCode = 0
azTask.executionInfo = execInfo
azTask.state = BatchTaskState.COMPLETED
def azTask = BatchTask.fromJson(JsonProviders.createReader('{"state":"completed","executionInfo":{"retryCount":0,"requeueCount":0,"exitCode":0}}'))

def batchService = Mock(AzBatchService){
getTask(taskKey) >> azTask
Expand Down Expand Up @@ -169,11 +162,7 @@ class AzBatchTaskHandlerTest extends Specification {
task.name = 'foo'
task.workDir = Path.of('/tmp/wdir')
def taskKey = new AzTaskKey('pool-123', 'job-456')
def azTask = new BatchTask()
def execInfo = new BatchTaskExecutionInfo(0,0)
execInfo.exitCode = 137
azTask.executionInfo = execInfo
azTask.state = BatchTaskState.COMPLETED
def azTask = BatchTask.fromJson(JsonProviders.createReader('{"state":"completed","executionInfo":{"retryCount":0,"requeueCount":0,"exitCode":137}}'))

def batchService = Mock(AzBatchService){
getTask(taskKey) >> azTask
Expand Down Expand Up @@ -207,10 +196,7 @@ class AzBatchTaskHandlerTest extends Specification {
task.name = 'foo'
task.workDir = Path.of('/tmp/wdir')
def taskKey = new AzTaskKey('pool-123', 'job-456')
def azTask = new BatchTask()
def execInfo = new BatchTaskExecutionInfo(0,0)
azTask.executionInfo = execInfo
azTask.state = BatchTaskState.COMPLETED
def azTask = BatchTask.fromJson(JsonProviders.createReader('{"state":"completed","executionInfo":{"retryCount":0,"requeueCount":0}}'))

def batchService = Mock(AzBatchService){
getTask(taskKey) >> azTask
Expand Down Expand Up @@ -243,13 +229,7 @@ class AzBatchTaskHandlerTest extends Specification {
task.name = 'foo'
task.workDir = Path.of('/tmp/wdir')
def taskKey = new AzTaskKey('pool-123', 'job-456')
def azTask = new BatchTask()
def execInfo = new BatchTaskExecutionInfo(0,0)
def failureInfo = new BatchTaskFailureInfo(ErrorCategory.USER_ERROR)
failureInfo.message = 'Unknown error'
execInfo.failureInfo = failureInfo
azTask.executionInfo = execInfo
azTask.state = BatchTaskState.COMPLETED
def azTask = BatchTask.fromJson(JsonProviders.createReader('{"state":"completed","executionInfo":{"retryCount":0,"requeueCount":0,"failureInfo":{"category":"userError","message":"Unknown error"}}}'))

def batchService = Mock(AzBatchService){
getTask(taskKey) >> azTask
Expand Down
Loading