Параллельное чтение данных
This commit is contained in:
@@ -1,4 +1,8 @@
|
||||
#include "utils.hpp"
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <numeric>
|
||||
|
||||
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) {
|
||||
std::map<DayIndex, std::vector<Record>> days;
|
||||
@@ -23,3 +27,88 @@ std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vect
|
||||
return out;
|
||||
}
|
||||
|
||||
int get_num_cpu_threads() {
|
||||
const char* env_threads = std::getenv("NUM_CPU_THREADS");
|
||||
int num_cpu_threads = 1;
|
||||
if (env_threads) {
|
||||
num_cpu_threads = std::atoi(env_threads);
|
||||
if (num_cpu_threads < 1) num_cpu_threads = 1;
|
||||
}
|
||||
return num_cpu_threads;
|
||||
}
|
||||
|
||||
std::string get_env(const char* name) {
|
||||
const char* env = std::getenv(name);
|
||||
if (!env) {
|
||||
throw std::runtime_error(std::string("Environment variable not set: ") + name);
|
||||
}
|
||||
return std::string(env);
|
||||
}
|
||||
|
||||
std::string get_data_path() {
|
||||
return get_env("DATA_PATH");
|
||||
}
|
||||
|
||||
std::vector<int> get_data_read_shares() {
|
||||
std::vector<int> shares;
|
||||
std::stringstream ss(get_env("DATA_READ_SHARES"));
|
||||
std::string item;
|
||||
while (std::getline(ss, item, ',')) {
|
||||
shares.push_back(std::stoi(item));
|
||||
}
|
||||
return shares;
|
||||
}
|
||||
|
||||
int64_t get_read_overlap_bytes() {
|
||||
return std::stoll(get_env("READ_OVERLAP_BYTES"));
|
||||
}
|
||||
|
||||
int64_t get_file_size(const std::string& path) {
|
||||
std::ifstream file(path, std::ios::binary | std::ios::ate);
|
||||
if (!file.is_open()) {
|
||||
throw std::runtime_error("Cannot open file: " + path);
|
||||
}
|
||||
return static_cast<int64_t>(file.tellg());
|
||||
}
|
||||
|
||||
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
|
||||
const std::vector<int>& shares, int64_t overlap_bytes) {
|
||||
// Если shares пустой или не соответствует size, используем равные доли
|
||||
std::vector<int> effective_shares;
|
||||
if (shares.size() == static_cast<size_t>(size)) {
|
||||
effective_shares = shares;
|
||||
} else {
|
||||
effective_shares.assign(size, 1);
|
||||
}
|
||||
|
||||
int total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(), 0);
|
||||
|
||||
// Вычисляем базовые границы для каждого ранка
|
||||
int64_t bytes_per_share = file_size / total_shares;
|
||||
|
||||
int64_t base_start = 0;
|
||||
for (int i = 0; i < rank; i++) {
|
||||
base_start += bytes_per_share * effective_shares[i];
|
||||
}
|
||||
|
||||
int64_t base_end = base_start + bytes_per_share * effective_shares[rank];
|
||||
|
||||
// Применяем overlap
|
||||
ByteRange range;
|
||||
|
||||
if (rank == 0) {
|
||||
// Первый ранк: начинаем с 0, добавляем overlap в конце
|
||||
range.start = 0;
|
||||
range.end = std::min(base_end + overlap_bytes, file_size);
|
||||
} else if (rank == size - 1) {
|
||||
// Последний ранк: вычитаем overlap в начале, читаем до конца файла
|
||||
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
|
||||
range.end = file_size;
|
||||
} else {
|
||||
// Промежуточные ранки: overlap с обеих сторон
|
||||
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
|
||||
range.end = std::min(base_end + overlap_bytes, file_size);
|
||||
}
|
||||
|
||||
return range;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user