Skip to content

Commit fecd9c2

Browse files
sunby, chyezh, MrPresent-Han, tedxu, jaime0815
authored
feat: LRU cache implementation (milvus-io#32567)
issue: milvus-io#32783. This PR contains the LRU cache implementation from the lru-dev branch. Signed-off-by: sunby <sunbingyi1992@gmail.com> Co-authored-by: chyezh <chyezh@outlook.com> Co-authored-by: MrPresent-Han <chun.han@zilliz.com> Co-authored-by: Ted Xu <ted.xu@zilliz.com> Co-authored-by: jaime <yun.zhang@zilliz.com> Co-authored-by: wayblink <anyang.wang@zilliz.com>
1 parent 37a99ca commit fecd9c2

74 files changed

Lines changed: 3502 additions & 968 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

configs/milvus.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ queryNode:
367367
maxQueueLength: 16 # Maximum length of task queue in flowgraph
368368
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
369369
enableSegmentPrune: false # use partition prune function on shard delegator
370+
useStreamComputing: false
370371
ip: # if not specified, use the first unicastable address
371372
port: 21123
372373
grpc:

internal/core/src/index/VectorDiskIndex.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,11 @@ VectorDiskAnnIndex<T>::update_load_json(const Config& config) {
502502
}
503503
}
504504

505+
if (config.contains(kMmapFilepath)) {
506+
load_config.erase(kMmapFilepath);
507+
load_config[kEnableMmap] = true;
508+
}
509+
505510
return load_config;
506511
}
507512

internal/core/src/index/VectorMemIndex.cpp

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
#include <unistd.h>
2020
#include <cmath>
21-
#include <cstdint>
2221
#include <cstring>
2322
#include <filesystem>
2423
#include <memory>
@@ -33,7 +32,6 @@
3332

3433
#include "index/Index.h"
3534
#include "index/IndexInfo.h"
36-
#include "index/Meta.h"
3735
#include "index/Utils.h"
3836
#include "common/EasyAssert.h"
3937
#include "config/ConfigKnowhere.h"
@@ -44,16 +42,15 @@
4442
#include "common/FieldData.h"
4543
#include "common/File.h"
4644
#include "common/Slice.h"
47-
#include "common/Tracer.h"
4845
#include "common/RangeSearchHelper.h"
4946
#include "common/Utils.h"
5047
#include "log/Log.h"
51-
#include "mmap/Types.h"
5248
#include "storage/DataCodec.h"
5349
#include "storage/MemFileManagerImpl.h"
5450
#include "storage/ThreadPools.h"
5551
#include "storage/space.h"
5652
#include "storage/Util.h"
53+
#include "storage/prometheus_client.h"
5754

5855
namespace milvus::index {
5956

@@ -733,7 +730,8 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
733730
}
734731

735732
LOG_INFO("load with slice meta: {}", !slice_meta_filepath.empty());
736-
733+
std::chrono::duration<double> load_duration_sum;
734+
std::chrono::duration<double> write_disk_duration_sum;
737735
if (!slice_meta_filepath
738736
.empty()) { // load with the slice meta info, then we can load batch by batch
739737
std::string index_file_prefix = slice_meta_filepath.substr(
@@ -751,15 +749,20 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
751749
std::string prefix = item[NAME];
752750
int slice_num = item[SLICE_NUM];
753751
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
754-
755752
auto HandleBatch = [&](int index) {
753+
auto start_load2_mem = std::chrono::system_clock::now();
756754
auto batch_data = file_manager_->LoadIndexToMemory(batch);
755+
load_duration_sum +=
756+
(std::chrono::system_clock::now() - start_load2_mem);
757757
for (int j = index - batch.size() + 1; j <= index; j++) {
758758
std::string file_name = GenSlicedFileName(prefix, j);
759759
AssertInfo(batch_data.find(file_name) != batch_data.end(),
760760
"lost index slice data");
761761
auto data = batch_data[file_name];
762+
auto start_write_file = std::chrono::system_clock::now();
762763
auto written = file.Write(data->Data(), data->Size());
764+
write_disk_duration_sum +=
765+
(std::chrono::system_clock::now() - start_write_file);
763766
AssertInfo(
764767
written == data->Size(),
765768
fmt::format("failed to write index data to disk {}: {}",
@@ -784,24 +787,46 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
784787
}
785788
}
786789
} else {
790+
//1. load files into memory
791+
auto start_load_files2_mem = std::chrono::system_clock::now();
787792
auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
788793
pending_index_files.begin(), pending_index_files.end()));
794+
load_duration_sum +=
795+
(std::chrono::system_clock::now() - start_load_files2_mem);
796+
//2. write data into files
797+
auto start_write_file = std::chrono::system_clock::now();
789798
for (auto& [_, index_data] : result) {
790799
file.Write(index_data->Data(), index_data->Size());
791800
}
792-
}
801+
write_disk_duration_sum +=
802+
(std::chrono::system_clock::now() - start_write_file);
803+
}
804+
milvus::storage::internal_storage_download_duration.Observe(
805+
std::chrono::duration_cast<std::chrono::milliseconds>(load_duration_sum)
806+
.count());
807+
milvus::storage::internal_storage_write_disk_duration.Observe(
808+
std::chrono::duration_cast<std::chrono::milliseconds>(
809+
write_disk_duration_sum)
810+
.count());
793811
file.Close();
794812

795813
LOG_INFO("load index into Knowhere...");
796814
auto conf = config;
797815
conf.erase(kMmapFilepath);
798816
conf[kEnableMmap] = true;
817+
auto start_deserialize = std::chrono::system_clock::now();
799818
auto stat = index_.DeserializeFromFile(filepath.value(), conf);
819+
auto deserialize_duration =
820+
std::chrono::system_clock::now() - start_deserialize;
800821
if (stat != knowhere::Status::success) {
801822
PanicInfo(ErrorCode::UnexpectedError,
802823
"failed to Deserialize index: {}",
803824
KnowhereStatusString(stat));
804825
}
826+
milvus::storage::internal_storage_deserialize_duration.Observe(
827+
std::chrono::duration_cast<std::chrono::milliseconds>(
828+
deserialize_duration)
829+
.count());
805830

806831
auto dim = index_.Dim();
807832
this->SetDim(index_.Dim());
@@ -811,7 +836,18 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
811836
"failed to unlink mmap index file {}: {}",
812837
filepath.value(),
813838
strerror(errno));
814-
LOG_INFO("load vector index done");
839+
LOG_INFO(
840+
"load vector index done, mmap_file_path:{}, download_duration:{}, "
841+
"write_files_duration:{}, deserialize_duration:{}",
842+
filepath.value(),
843+
std::chrono::duration_cast<std::chrono::milliseconds>(load_duration_sum)
844+
.count(),
845+
std::chrono::duration_cast<std::chrono::milliseconds>(
846+
write_disk_duration_sum)
847+
.count(),
848+
std::chrono::duration_cast<std::chrono::milliseconds>(
849+
deserialize_duration)
850+
.count());
815851
}
816852

817853
template <typename T>

internal/core/src/segcore/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ set(SEGCORE_FILES
2525
SegmentSealedImpl.cpp
2626
FieldIndexing.cpp
2727
Reduce.cpp
28+
StreamReduce.cpp
2829
metrics_c.cpp
2930
plan_c.cpp
3031
reduce_c.cpp
@@ -36,7 +37,8 @@ set(SEGCORE_FILES
3637
segcore_init_c.cpp
3738
TimestampIndex.cpp
3839
Utils.cpp
39-
ConcurrentVector.cpp)
40+
ConcurrentVector.cpp
41+
ReduceUtils.cpp)
4042
add_library(milvus_segcore SHARED ${SEGCORE_FILES})
4143

4244
target_link_libraries(milvus_segcore milvus_query milvus_bitset milvus_exec ${OpenMP_CXX_FLAGS} milvus-storage)

internal/core/src/segcore/ConcurrentVector.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,35 +41,36 @@ class ThreadSafeVector {
4141
// Grows the container until it holds at least `size` elements, constructing
// every new element from `args`. Returns without changes when `size <= size_`.
//
// Diff-view legend: a bare numeric line is the old/new line-number gutter,
// `NN+` precedes a line added by this commit and `NN-` a line it removed.
// Here the commit moves the exclusive lock acquisition in front of the
// `size_` check, so the check and the growth loop run under the same lock.
//
// NOTE(review): `std::forward<Args...>(args...)` is only well-formed for a
// single-element pack; the usual spelling is `std::forward<Args>(args)...`
// -- confirm no caller passes zero or several arguments.
// NOTE(review): `args` are taken by value, so forwarding them yields rvalues;
// when more than one element is appended, later iterations may construct from
// moved-from arguments -- confirm this is harmless for the `Type`s in use.
template <typename... Args>
4242
void
4343
emplace_to_at_least(int64_t size, Args... args) {
44+
std::lock_guard lck(mutex_);
4445
if (size <= size_) {
4546
return;
4647
}
47-
std::lock_guard lck(mutex_);
4848
// Append until the underlying container reaches the requested size; `size_`
// is incremented once per appended element.
while (vec_.size() < size) {
4949
vec_.emplace_back(std::forward<Args...>(args...));
5050
++size_;
5151
}
5252
}
5353
// Read-only element access: asserts `index < size_` and returns a const
// reference to `vec_[index]`.
//
// Diff view: this commit moves the shared lock (added at gutter `55+`,
// removed at gutter `58-`) ahead of the bounds assertion, so `size_` is read
// while the lock is held.
//
// NOTE(review): the returned reference is used by the caller after `lck` is
// released at function exit; this presumably relies on `vec_` (declared as
// std::deque in this class) keeping element references valid across
// `emplace_back` -- confirm.
// NOTE(review): the assertion only checks the upper bound; a negative `index`
// is not rejected here.
const Type&
5454
operator[](int64_t index) const {
55+
std::shared_lock lck(mutex_);
5656
AssertInfo(index < size_,
5757
fmt::format(
5858
"index out of range, index={}, size_={}", index, size_));
58-
std::shared_lock lck(mutex_);
5959
return vec_[index];
6060
}
6161

6262
// Mutable element access: asserts `index < size_` and returns a non-const
// reference to `vec_[index]`.
//
// Diff view: this commit moves the shared lock (added at gutter `64+`,
// removed at gutter `67-`) ahead of the bounds assertion, so `size_` is read
// while the lock is held.
//
// NOTE(review): only a shared lock is taken and it is released on return, so
// writes made through the returned reference are not serialized by `mutex_`
// -- confirm callers synchronize element mutation themselves.
// NOTE(review): the assertion only checks the upper bound; a negative `index`
// is not rejected here.
Type&
6363
operator[](int64_t index) {
64+
std::shared_lock lck(mutex_);
6465
AssertInfo(index < size_,
6566
fmt::format(
6667
"index out of range, index={}, size_={}", index, size_));
67-
std::shared_lock lck(mutex_);
6868
return vec_[index];
6969
}
7070

7171
// Returns the current value of `size_`, read while holding `mutex_`.
//
// Diff view: the lock line (gutter `73+`) is added by this commit; the same
// commit changes `size_` from std::atomic<int64_t> to a plain int64_t, which
// is why the read is now guarded.
//
// NOTE(review): this takes an exclusive `std::lock_guard` on the
// std::shared_mutex, whereas both `operator[]` overloads take
// `std::shared_lock`; a shared lock looks sufficient for this read-only
// accessor -- confirm whether the exclusive lock is intentional.
int64_t
7272
size() const {
73+
std::lock_guard lck(mutex_);
7374
return size_;
7475
}
7576

@@ -81,7 +82,7 @@ class ThreadSafeVector {
8182
}
8283

8384
private:
84-
std::atomic<int64_t> size_ = 0;
85+
int64_t size_ = 0;
8586
std::deque<Type> vec_;
8687
mutable std::shared_mutex mutex_;
8788
};

internal/core/src/segcore/InsertRecord.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,11 @@ struct InsertRecord {
598598
fields_data_.clear();
599599
}
600600

601+
// Reports whether this record is empty by delegating to `pk2offset_->empty()`.
// (All lines of this method are additions in this commit -- gutters `601+`
// through `604+`.)
// NOTE(review): only `pk2offset_` is consulted; the other members of the
// record are not inspected -- confirm that is the intended definition of
// "empty".
bool
602+
empty() const {
603+
return pk2offset_->empty();
604+
}
605+
601606
public:
602607
ConcurrentVector<Timestamp> timestamps_;
603608
ConcurrentVector<idx_t> row_ids_;

internal/core/src/segcore/Reduce.cpp

Lines changed: 4 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "Utils.h"
2020
#include "common/EasyAssert.h"
2121
#include "pkVisitor.h"
22+
#include "ReduceUtils.h"
2223

2324
namespace milvus::segcore {
2425

@@ -130,7 +131,6 @@ ReduceHelper::FilterInvalidSearchResult(SearchResult* search_result) {
130131

131132
void
132133
ReduceHelper::FillPrimaryKey() {
133-
std::vector<SearchResult*> valid_search_results;
134134
// get primary keys for duplicates removal
135135
uint32_t valid_index = 0;
136136
for (auto& search_result : search_results_) {
@@ -368,7 +368,7 @@ ReduceHelper::GetSearchResultDataSlice(int slice_index) {
368368
search_result_data->set_all_search_count(all_search_count);
369369

370370
// `result_pairs` contains the SearchResult and result_offset info, used for filling output fields
371-
std::vector<std::pair<SearchResult*, int64_t>> result_pairs(result_count);
371+
std::vector<MergeBase> result_pairs(result_count);
372372

373373
// reserve space for pks
374374
auto primary_field_id =
@@ -461,14 +461,14 @@ ReduceHelper::GetSearchResultDataSlice(int slice_index) {
461461
group_by_values[loc] =
462462
search_result->group_by_values_.value()[ki];
463463
// set result offset to fill output fields data
464-
result_pairs[loc] = std::make_pair(search_result, ki);
464+
result_pairs[loc] = {&search_result->output_fields_data_, ki};
465465
}
466466
}
467467

468468
// update result topKs
469469
search_result_data->mutable_topks()->Set(qi - nq_begin, topk_count);
470470
}
471-
AssembleGroupByValues(search_result_data, group_by_values);
471+
AssembleGroupByValues(search_result_data, group_by_values, plan_);
472472

473473
AssertInfo(search_result_data->scores_size() == result_count,
474474
"wrong scores size, size = " +
@@ -498,89 +498,4 @@ ReduceHelper::GetSearchResultDataSlice(int slice_index) {
498498
return buffer;
499499
}
500500

501-
void
502-
ReduceHelper::AssembleGroupByValues(
503-
std::unique_ptr<milvus::proto::schema::SearchResultData>& search_result,
504-
const std::vector<GroupByValueType>& group_by_vals) {
505-
auto group_by_field_id = plan_->plan_node_->search_info_.group_by_field_id_;
506-
if (group_by_field_id.has_value() && group_by_vals.size() > 0) {
507-
auto group_by_values_field =
508-
std::make_unique<milvus::proto::schema::ScalarField>();
509-
auto group_by_field =
510-
plan_->schema_.operator[](group_by_field_id.value());
511-
DataType group_by_data_type = group_by_field.get_data_type();
512-
513-
int group_by_val_size = group_by_vals.size();
514-
switch (group_by_data_type) {
515-
case DataType::INT8: {
516-
auto field_data = group_by_values_field->mutable_int_data();
517-
field_data->mutable_data()->Resize(group_by_val_size, 0);
518-
for (std::size_t idx = 0; idx < group_by_val_size; idx++) {
519-
int8_t val = std::get<int8_t>(group_by_vals[idx]);
520-
field_data->mutable_data()->Set(idx, val);
521-
}
522-
break;
523-
}
524-
case DataType::INT16: {
525-
auto field_data = group_by_values_field->mutable_int_data();
526-
field_data->mutable_data()->Resize(group_by_val_size, 0);
527-
for (std::size_t idx = 0; idx < group_by_val_size; idx++) {
528-
int16_t val = std::get<int16_t>(group_by_vals[idx]);
529-
field_data->mutable_data()->Set(idx, val);
530-
}
531-
break;
532-
}
533-
case DataType::INT32: {
534-
auto field_data = group_by_values_field->mutable_int_data();
535-
field_data->mutable_data()->Resize(group_by_val_size, 0);
536-
for (std::size_t idx = 0; idx < group_by_val_size; idx++) {
537-
int32_t val = std::get<int32_t>(group_by_vals[idx]);
538-
field_data->mutable_data()->Set(idx, val);
539-
}
540-
break;
541-
}
542-
case DataType::INT64: {
543-
auto field_data = group_by_values_field->mutable_long_data();
544-
field_data->mutable_data()->Resize(group_by_val_size, 0);
545-
for (std::size_t idx = 0; idx < group_by_val_size; idx++) {
546-
int64_t val = std::get<int64_t>(group_by_vals[idx]);
547-
field_data->mutable_data()->Set(idx, val);
548-
}
549-
break;
550-
}
551-
case DataType::BOOL: {
552-
auto field_data = group_by_values_field->mutable_bool_data();
553-
field_data->mutable_data()->Resize(group_by_val_size, 0);
554-
for (std::size_t idx = 0; idx < group_by_val_size; idx++) {
555-
bool val = std::get<bool>(group_by_vals[idx]);
556-
field_data->mutable_data()->Set(idx, val);
557-
}
558-
break;
559-
}
560-
case DataType::VARCHAR: {
561-
auto field_data = group_by_values_field->mutable_string_data();
562-
for (std::size_t idx = 0; idx < group_by_val_size; idx++) {
563-
std::string val =
564-
std::move(std::get<std::string>(group_by_vals[idx]));
565-
*(field_data->mutable_data()->Add()) = val;
566-
}
567-
break;
568-
}
569-
default: {
570-
PanicInfo(
571-
DataTypeInvalid,
572-
fmt::format("unsupported datatype for group_by operations ",
573-
group_by_data_type));
574-
}
575-
}
576-
577-
search_result->mutable_group_by_field_value()->set_type(
578-
milvus::proto::schema::DataType(group_by_data_type));
579-
search_result->mutable_group_by_field_value()
580-
->mutable_scalars()
581-
->MergeFrom(*group_by_values_field.get());
582-
return;
583-
}
584-
}
585-
586501
} // namespace milvus::segcore

0 commit comments

Comments
 (0)