Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions exaudfclient/exa_lib_loader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include <iostream>
#include <string>
#include <sstream>
#include <dlfcn.h>
#include <dlfcn.h>

#include "exa_lib_loader.h"
#include "utils/debug_message.h"

void* exa_load_libary(const std::string& stdLibPath) {
if (stdLibPath.empty()) {
return nullptr;
}

void* handle = dlmopen(LM_ID_NEWLM, stdLibPath.c_str(), RTLD_NOW);
if (!handle) {
std::cerr << "dlmopen error: " << dlerror() << "; while loading " << stdLibPath << std::endl;
return nullptr;
}
return handle;
}

void* exa_load_symbol(void *handle, const std::string& symbol_name) {
void *p_res = nullptr;
char *error = nullptr;
if(handle) {
p_res = dlsym(handle, symbol_name.c_str());

if((error = dlerror()) != nullptr) {
std::cerr << "Error when trying to load symbol '" << symbol_name << "': " << error << std::endl;
return nullptr;
}
}
return p_res;
}

5 changes: 5 additions & 0 deletions exaudfclient/exa_lib_loader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#pragma once
#include <string>

void* exa_load_libary(const std::string& stdLibPath);
void* exa_load_symbol(void *handle, const std::string& symbol_name);
17 changes: 17 additions & 0 deletions exaudfclient/exa_set_env.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#include <iostream>
#include <cstdlib>
#include <cstdio>
#include <clocale>
#include <cstring>
#include <cerrno>

#include "exa_set_env.h"

void setup_environment() {
if (::setenv("HOME", "/tmp", 1) == -1) {
std::cerr << "Failed setting HOME env var: " << std::strerror(errno) << std::endl;
}
if (::setlocale(LC_ALL, "en_US.utf8") == nullptr) {
std::cerr << "Failed setting locale: " << std::strerror(errno) << std::endl;
}
}
3 changes: 3 additions & 0 deletions exaudfclient/exa_set_env.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#pragma once

void setup_environment();
110 changes: 110 additions & 0 deletions exaudfclient/exa_udf_base.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include <iostream>
#include <string>

#include "exaudf_lib_output_path.h"
#include "vm/swig_vm.h"
#include "exa_udf_base.h"
#include "exa_lib_loader.h"


void ExaUdfClientBase::parse_arguments(int argc, char** argv) {
Comment thread
sgn4sangar marked this conversation as resolved.
Outdated
// Now assumption is that all cmd line aguments are already validated.
m_socket = argv[1];
std::string strLangParam = argv[2];

if(strLangParam.find("python") != std::string::npos)
m_lang = ExaUdfLanguage::Python3;
else if(strLangParam.find("java") != std::string::npos)
m_lang = ExaUdfLanguage::Java;
else if(strLangParam.find("streaming") != std::string::npos)
m_lang = ExaUdfLanguage::Streaming;
else if(strLangParam.find("benchmark") != std::string::npos)
m_lang = ExaUdfLanguage::Benchmark;

mb_useCtpgParser = false;
if(argc == 4) {
mb_useCtpgParser = is_use_ctpg_parser(argv[3]);

@tomuben tomuben May 27, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces a bug: If there is no fourth CLI parameter, the environment variable SCRIPT_OPTIONS_PARSER_VERSION is never eveluated!

}
}

// Parser option 2 means use ctpg parser.
// argv_parser_option is command line argument as it is
bool ExaUdfClientBase::is_use_ctpg_parser(const std::string& argv_parser_option) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_use_ctpg_parser is a strange name. Maybe just use_ctpg_parser?

bool use_ctpg_option_parser = false;

// env var has higher priority than argv value.
const char* env_val = ::getenv("SCRIPT_OPTIONS_PARSER_VERSION");
if(env_val) {
use_ctpg_option_parser = (strcmp(env_val, "2") == 0);
}
else if(argv_parser_option.compare("scriptOptionsParserVersion=2") == 0) {
use_ctpg_option_parser = true;
}
return use_ctpg_option_parser;
}

bool ExaUdfClientBase::validate_arguments(int argc, char** argv) {
Comment thread
sgn4sangar marked this conversation as resolved.
Outdated
if (argc < 3 || argc > 4) {
usage(argv[0]);
return false;
}

if (argc == 4 &&
strcmp(argv[3], "scriptOptionsParserVersion=1") != 0 &&
strcmp(argv[3], "scriptOptionsParserVersion=2") != 0) {
usage(argv[0]);
return false;
}

return true;
}

int ExaUdfClientBase::startClientBase(int argc, char** argv) {
if (!validate_arguments(argc, argv)) {
usage(argv[0]);
exit(EXIT_FAILURE);
}
parse_arguments(argc, argv);

std::string libexaudflibPath;
#ifdef CUSTOM_LIBEXAUDFLIB_PATH
libexaudflibPath = std::string(CUSTOM_LIBEXAUDFLIB_PATH);
#else
libexaudflibPath = std::string(::getenv("LIBEXAUDFLIB_PATH"));
#endif
DBGMSG(std::cerr, "Load libexaudflib");
DBGVAR(std::cerr, libexaudflibPath);
void* handle = exa_load_libary(libexaudflibPath);
if (!handle) {
std::cerr << "Failed to load library: " << libexaudflibPath << std::endl;
exit(EXIT_FAILURE);
}

MAIN_FUN exaudfclient_main = (MAIN_FUN)exa_load_symbol(handle, "exaudfclient_main");
SET_SWIGVM_PARAMS set_SWIGVM_params = (SET_SWIGVM_PARAMS)exa_load_symbol(handle, "set_SWIGVM_params");

setup_environment();
std::function<SWIGVMContainers::SWIGVM*()> vmMaker;
switch(m_lang) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Base class should be an abstract class and call here create_vm, however don't implement it. You mixed implementation of the client that supports all languages with the implementation of the abstract class. For the moment, we need the ExaudfclientBase as abstract class, ExaudfclientAllLangs, and we can add for each language its Exaudfclient{Lang} if we want to, however I would wait with later until we do the split into repos.

case ExaUdfLanguage::Python3:
ExaUdfClientPython pythonClient;
vmMaker = pythonClient.create_vm();
break;
case ExaUdfLanguage::Java:
ExaUdfClientJava javaClient;
vmMaker = javaClient.create_vm();
break;
case ExaUdfLanguage::Streaming:
ExaUdfClientStreaming streamingClient;
vmMaker = streamingClient.create_vm();
break;
case ExaUdfLanguage::Benchmark:
ExaUdfClientBenchmark benchmarkClient;
vmMaker = benchmarkClient.create_vm();
break;
}

SWIGVMContainers::SWIGVM_params = new SWIGVMContainers::SWIGVM_params_t(true);
set_SWIGVM_params(SWIGVMContainers::SWIGVM_params);
return exaudfclient_main(vmMaker, argc, argv);
}
36 changes: 36 additions & 0 deletions exaudfclient/exa_udf_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once
#include <functional>

#include "vm/swig_vm.h"

namespace SWIGVMContainers {
__thread SWIGVM_params_t * SWIGVM_params = nullptr;
}

typedef bool (*SET_SWIGVM_PARAMS)(SWIGVM_params_t*);
typedef int (*MAIN_FUN)(std::function<SWIGVMContainers::SWIGVM*()> vmMaker, int, char**);

enum class ExaUdfLanguage {
Python3,
Java,
Streaming,
Benchmark
};

class ExaUdfClientBase {
public:
virtual ~ExaUdfClientBase() = default;
virtual void usage(const std::string& programName) = 0;
virtual std::function<SWIGVMContainers::SWIGVM*()> create_vm() = 0;

bool validate_arguments(int argc, char** argv);
Comment thread
sgn4sangar marked this conversation as resolved.
Outdated
void parse_arguments(int argc, char** argv);
Comment thread
sgn4sangar marked this conversation as resolved.
Outdated
int startClientBase(int argc, char** argv);
bool is_use_ctpg_parser(const std::string& argv_parser_option);

protected:
std::string m_socket;
Comment thread
sgn4sangar marked this conversation as resolved.
Outdated
std::string m_lang;
ExaUdfLanguage m_lang;
bool mb_useCtpgParser;
};
82 changes: 82 additions & 0 deletions exaudfclient/exa_udf_clients.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "exa_udf_python.h"
#include "vm/swig_vm.h"


//----------------------------------------Python UDF Client----------------------------------------
#ifdef ENABLE_PYTHON_VM
#include "python/pythoncontainer.h"
#endif //ENABLE_PYTHON_VM

void ExaUdfClientPython::usage(const std::string& programName) {
std::cerr << "Usage: " << programName << " <socket> lang=python <scriptOptionsParserVersion=1|2>"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scriptOptionsParserVersion is actually only relevant for Java

<< std::endl;
}

std::function<SWIGVMContainers::SWIGVM*()> ExaUdfClientPython::create_vm() {
#ifdef ENABLE_PYTHON_VM
return []() { return new SWIGVMContainers::PythonVM(false); };
#else
throw SWIGVMContainers::SWIGVM::exception("this exaudfclient has been compilied without Python support");
#endif
}


//----------------------------------------Java UDF Client----------------------------------------
#ifdef ENABLE_JAVA_VM
#include "javacontainer/javacontainer_builder.h"
#endif //ENABLE_JAVA_VM

void ExaUdfClientJava::usage(const std::string& programName) {
std::cerr << "Usage: " << programName << " <socket> lang=java <scriptOptionsParserVersion=1|2>"
<< std::endl;
}

std::function<SWIGVMContainers::SWIGVM*()> ExaUdfClientJava::create_vm() {
#ifdef ENABLE_JAVA_VM
if (mb_useCtpgParser) {
return [&](){return SWIGVMContainers::JavaContainerBuilder().useCtpgParser().build();};
} else {
return [&](){return SWIGVMContainers::JavaContainerBuilder().build();};
}
#else
throw SWIGVMContainers::SWIGVM::exception("this exaudfclient has been compilied without Java support");
#endif
}


//----------------------------------------Streaming UDF Client----------------------------------------
#ifdef ENABLE_STREAMING_VM
#include "streaming_container/streamingcontainer.h"
#endif

void ExaUdfClientStreaming::usage(const std::string& programName) {
std::cerr << "Usage: " << programName << " <socket> lang=streaming <scriptOptionsParserVersion=1|2>"
<< std::endl;
}

std::function<SWIGVMContainers::SWIGVM*()> ExaUdfClientStreaming::create_vm() {
#ifdef ENABLE_STREAMING_VM
return []() { return new SWIGVMContainers::StreamingVM(false); };
#else
throw SWIGVMContainers::SWIGVM::exception("this exaudfclient has been compilied without Streaming support");
#endif
}


//----------------------------------------Benchmark UDF Client----------------------------------------
#ifdef ENABLE_BENCHMARK_VM
#include "benchmark_container/benchmark_container.h"
#endif

void ExaUdfClientBenchmark::usage(const std::string& programName) {
std::cerr << "Usage: " << programName << " <socket> lang=benchmark <scriptOptionsParserVersion=1|2>"
<< std::endl;
}

std::function<SWIGVMContainers::SWIGVM*()> ExaUdfClientBenchmark::create_vm() {
#ifdef ENABLE_BENCHMARK_VM
return []() { return new SWIGVMContainers::BenchmarkVM(false); };
#else
throw SWIGVMContainers::SWIGVM::exception("this exaudfclient has been compilied without Benchmark support");
#endif
}
37 changes: 37 additions & 0 deletions exaudfclient/exa_udf_clients.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once
#include <string>

#include "exa_udf_base.h"
#include "vm/swig_vm.h"

class ExaUdfClientPython : public ExaUdfClientBase {
public:
~ExaUdfClientPython() override = default;

void usage(const std::string& programName) override;
std::function<SWIGVMContainers::SWIGVM*()> create_vm() override;
};

class ExaUdfClientJava : public ExaUdfClientBase {
public:
~ExaUdfClientJava() override = default;

void usage(const std::string& programName) override;
std::function<SWIGVMContainers::SWIGVM*()> create_vm() override;
};

class ExaUdfStreaming : public ExaUdfClientBase {
public:
~ExaUdfStreaming() override = default;

void usage(const std::string& programName) override;
std::function<SWIGVMContainers::SWIGVM*()> create_vm() override;
};

class ExaUdfClientBenchmark : public ExaUdfClientBase {
public:
~ExaUdfClientBenchmark() override = default;

void usage(const std::string& programName) override;
std::function<SWIGVMContainers::SWIGVM*()> create_vm() override;
};
Loading
Loading