128 lines
3.9 KiB
C++
128 lines
3.9 KiB
C++
#include "utils.hpp"

#include <algorithm>
#include <cerrno>
#include <climits>
#include <cstddef>
#include <cstdlib>
#include <fstream>
#include <numeric>
#include <sstream>
#include <stdexcept>
|
||
|
||
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) {
|
||
std::map<DayIndex, std::vector<Record>> days;
|
||
|
||
for (const auto& r : recs) {
|
||
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
|
||
days[day].push_back(r);
|
||
}
|
||
|
||
return days;
|
||
}
|
||
|
||
std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts) {
|
||
std::vector<std::vector<DayIndex>> out(parts);
|
||
|
||
int i = 0;
|
||
for (auto& kv : days) {
|
||
out[i % parts].push_back(kv.first);
|
||
i++;
|
||
}
|
||
|
||
return out;
|
||
}
|
||
|
||
// Reads the worker thread count from the NUM_CPU_THREADS environment variable.
//
// The value is parsed as a base-10 integer; leading whitespace is skipped and
// parsing stops at the first non-digit character, so "8xyz" yields 8.
//
// Returns the parsed value when it lies in [1, INT_MAX]. Returns 1 when the
// variable is unset, holds no number, is below 1, or does not fit in an int.
// std::strtol is used instead of std::atoi because atoi has undefined
// behaviour for values that are out of range.
int get_num_cpu_threads() {
    const char* env_threads = std::getenv("NUM_CPU_THREADS");
    if (env_threads == nullptr) {
        return 1;
    }

    errno = 0;
    const long parsed = std::strtol(env_threads, nullptr, 10);
    // strtol reports overflow via ERANGE; a long may also be wider than int.
    if (errno == ERANGE || parsed < 1 || parsed > INT_MAX) {
        return 1;
    }

    return static_cast<int>(parsed);
}
|
||
|
||
// Returns the value of the environment variable `name` as a std::string.
//
// name: NUL-terminated name of the variable to look up.
// Throws std::runtime_error ("Environment variable not set: <name>") when the
// variable is not present in the environment.
std::string get_env(const char* name) {
    if (const char* value = std::getenv(name)) {
        return std::string(value);
    }

    throw std::runtime_error(std::string("Environment variable not set: ") + name);
}
|
||
|
||
std::string get_data_path() {
|
||
return get_env("DATA_PATH");
|
||
}
|
||
|
||
std::vector<int> get_data_read_shares() {
|
||
std::vector<int> shares;
|
||
std::stringstream ss(get_env("DATA_READ_SHARES"));
|
||
std::string item;
|
||
while (std::getline(ss, item, ',')) {
|
||
shares.push_back(std::stoi(item));
|
||
}
|
||
return shares;
|
||
}
|
||
|
||
int64_t get_read_overlap_bytes() {
|
||
return std::stoll(get_env("READ_OVERLAP_BYTES"));
|
||
}
|
||
|
||
// Returns the size in bytes of the file at `path`.
//
// The file is opened in binary mode positioned at its end, and the resulting
// stream position is reported as the size.
//
// path: file to measure.
// Throws std::runtime_error if the file cannot be opened, or if the stream
// position cannot be obtained (tellg() reports failure as -1, which must not
// be handed back to callers as a size).
int64_t get_file_size(const std::string& path) {
    std::ifstream file(path, std::ios::binary | std::ios::ate);
    if (!file.is_open()) {
        throw std::runtime_error("Cannot open file: " + path);
    }

    const std::streamoff end_pos = file.tellg();
    if (end_pos < 0) {
        throw std::runtime_error("Cannot determine size of file: " + path);
    }

    return static_cast<int64_t>(end_pos);
}
|
||
|
||
// Computes the byte range of a file that the given rank should read.
//
// The file is divided proportionally to `shares` (one weight per rank). Each
// rank's base range is then widened by `overlap_bytes` on every side that
// borders another rank, and the result is clamped to [0, file_size].
//
// The first rank always starts at 0 and the last rank always ends at
// file_size, so the bytes left over by the integer division are not dropped.
// This also holds when size == 1, where rank 0 is both first and last.
//
// rank:          index of the calling rank, in [0, size).
// size:          total number of ranks; must be positive.
// file_size:     size of the file in bytes.
// shares:        relative weight per rank; if its length differs from `size`
//                (including when it is empty) every rank gets weight 1.
// overlap_bytes: number of bytes by which neighbouring ranges overlap.
// Returns the range with `start` and `end` set as described above.
// Throws std::invalid_argument if size <= 0, rank is outside [0, size), or the
// weights in use sum to a non-positive value (which would divide by zero).
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
                               const std::vector<int>& shares, int64_t overlap_bytes) {
    if (size <= 0) {
        throw std::invalid_argument("calculate_byte_range: size must be positive");
    }
    if (rank < 0 || rank >= size) {
        throw std::invalid_argument("calculate_byte_range: rank out of range");
    }

    // If shares is empty or does not match size, fall back to equal shares.
    std::vector<int> effective_shares;
    if (shares.size() == static_cast<std::size_t>(size)) {
        effective_shares = shares;
    } else {
        effective_shares.assign(static_cast<std::size_t>(size), 1);
    }

    // Sum in 64 bits so that many large weights cannot overflow an int.
    const int64_t total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(),
                                                 static_cast<int64_t>(0));
    if (total_shares <= 0) {
        throw std::invalid_argument("calculate_byte_range: shares must sum to a positive value");
    }

    // Base (non-overlapping) boundaries of this rank.
    const int64_t bytes_per_share = file_size / total_shares;

    int64_t base_start = 0;
    for (int i = 0; i < rank; ++i) {
        base_start += bytes_per_share * effective_shares[i];
    }
    const int64_t base_end = base_start + bytes_per_share * effective_shares[rank];

    // Apply the overlap on each side that borders another rank.
    const bool is_first = (rank == 0);
    const bool is_last = (rank == size - 1);

    ByteRange range;
    range.start = is_first ? static_cast<int64_t>(0)
                           : std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
    range.end = is_last ? file_size
                        : std::min(base_end + overlap_bytes, file_size);

    return range;
}
|
||
|
||
void trim_edge_days(std::vector<DayStats>& days, int rank, int size) {
|
||
if (days.empty()) return;
|
||
|
||
if (rank == 0) {
|
||
days.pop_back();
|
||
} else if (rank == size - 1) {
|
||
days.erase(days.begin());
|
||
} else {
|
||
days.pop_back();
|
||
days.erase(days.begin());
|
||
}
|
||
}
|