@@ -7760,4 +7760,157 @@ TEST_F(BlockFileCacheTest, test_upgrade_cache_dir_version) {
7760
7760
}
7761
7761
}
7762
7762
7763
+ TEST_F (BlockFileCacheTest, cached_remote_file_reader_ttl_index) {
7764
+ if (fs::exists (cache_base_path)) {
7765
+ fs::remove_all (cache_base_path);
7766
+ }
7767
+ fs::create_directories (cache_base_path);
7768
+ TUniqueId query_id;
7769
+ query_id.hi = 1 ;
7770
+ query_id.lo = 1 ;
7771
+ io::FileCacheSettings settings;
7772
+ settings.query_queue_size = 6291456 ;
7773
+ settings.query_queue_elements = 6 ;
7774
+ settings.index_queue_size = 1048576 ;
7775
+ settings.index_queue_elements = 1 ;
7776
+ settings.disposable_queue_size = 1048576 ;
7777
+ settings.disposable_queue_elements = 1 ;
7778
+ settings.capacity = 8388608 ;
7779
+ settings.max_file_block_size = 1048576 ;
7780
+ settings.max_query_cache_size = 0 ;
7781
+ io::CacheContext context;
7782
+ ReadStatistics rstats;
7783
+ context.stats = &rstats;
7784
+ context.query_id = query_id;
7785
+ ASSERT_TRUE (FileCacheFactory::instance ()->create_file_cache (cache_base_path, settings).ok ());
7786
+ BlockFileCache* cache = FileCacheFactory::instance ()->get_by_path (cache_base_path);
7787
+
7788
+ for (int i = 0 ; i < 100 ; i++) {
7789
+ if (cache->get_async_open_success ()) {
7790
+ break ;
7791
+ };
7792
+ std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
7793
+ }
7794
+
7795
+ FileReaderSPtr local_reader;
7796
+ ASSERT_TRUE (global_local_filesystem ()->open_file (tmp_file, &local_reader));
7797
+ io::FileReaderOptions opts;
7798
+ opts.cache_type = io::cache_type_from_string (" file_block_cache" );
7799
+ opts.is_doris_table = true ;
7800
+ CachedRemoteFileReader reader (local_reader, opts);
7801
+ auto key = io::BlockFileCache::hash (" tmp_file" );
7802
+ EXPECT_EQ (reader._cache_hash , key);
7803
+ EXPECT_EQ (local_reader->path ().native (), reader.path ().native ());
7804
+ EXPECT_EQ (local_reader->size (), reader.size ());
7805
+ EXPECT_FALSE (reader.closed ());
7806
+ EXPECT_EQ (local_reader->path ().native (), reader.get_remote_reader ()->path ().native ());
7807
+ {
7808
+ std::string buffer;
7809
+ buffer.resize (64_kb);
7810
+ IOContext io_ctx;
7811
+ FileCacheStatistics stats;
7812
+ io_ctx.file_cache_stats = &stats;
7813
+ io_ctx.is_index_data = true ;
7814
+ int64_t cur_time = UnixSeconds ();
7815
+ io_ctx.expiration_time = cur_time + 120 ;
7816
+ size_t bytes_read {0 };
7817
+ EXPECT_TRUE (
7818
+ reader.read_at (0 , Slice (buffer.data (), buffer.size ()), &bytes_read, &io_ctx).ok ());
7819
+ }
7820
+ std::this_thread::sleep_for (std::chrono::seconds (3 ));
7821
+ LOG (INFO) << " ttl:" << cache->_ttl_queue .cache_size ;
7822
+ LOG (INFO) << " index:" << cache->_index_queue .cache_size ;
7823
+ LOG (INFO) << " normal:" << cache->_normal_queue .cache_size ;
7824
+ LOG (INFO) << " disp:" << cache->_disposable_queue .cache_size ;
7825
+ EXPECT_EQ (cache->_ttl_queue .cache_size , 1048576 );
7826
+ EXPECT_EQ (cache->_index_queue .cache_size , 0 );
7827
+
7828
+ EXPECT_TRUE (reader.close ().ok ());
7829
+ EXPECT_TRUE (reader.closed ());
7830
+ std::this_thread::sleep_for (std::chrono::seconds (1 ));
7831
+ if (fs::exists (cache_base_path)) {
7832
+ fs::remove_all (cache_base_path);
7833
+ }
7834
+ FileCacheFactory::instance ()->_caches .clear ();
7835
+ FileCacheFactory::instance ()->_path_to_cache .clear ();
7836
+ FileCacheFactory::instance ()->_capacity = 0 ;
7837
+ }
7838
+
7839
+ TEST_F (BlockFileCacheTest, cached_remote_file_reader_normal_index) {
7840
+ if (fs::exists (cache_base_path)) {
7841
+ fs::remove_all (cache_base_path);
7842
+ }
7843
+ fs::create_directories (cache_base_path);
7844
+ TUniqueId query_id;
7845
+ query_id.hi = 1 ;
7846
+ query_id.lo = 1 ;
7847
+ io::FileCacheSettings settings;
7848
+ settings.query_queue_size = 6291456 ;
7849
+ settings.query_queue_elements = 6 ;
7850
+ settings.index_queue_size = 1048576 ;
7851
+ settings.index_queue_elements = 1 ;
7852
+ settings.disposable_queue_size = 1048576 ;
7853
+ settings.disposable_queue_elements = 1 ;
7854
+ settings.capacity = 8388608 ;
7855
+ settings.max_file_block_size = 1048576 ;
7856
+ settings.max_query_cache_size = 0 ;
7857
+ io::CacheContext context;
7858
+ ReadStatistics rstats;
7859
+ context.stats = &rstats;
7860
+ context.query_id = query_id;
7861
+ ASSERT_TRUE (FileCacheFactory::instance ()->create_file_cache (cache_base_path, settings).ok ());
7862
+ BlockFileCache* cache = FileCacheFactory::instance ()->get_by_path (cache_base_path);
7863
+
7864
+ for (int i = 0 ; i < 100 ; i++) {
7865
+ if (cache->get_async_open_success ()) {
7866
+ break ;
7867
+ };
7868
+ std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
7869
+ }
7870
+
7871
+ FileReaderSPtr local_reader;
7872
+ ASSERT_TRUE (global_local_filesystem ()->open_file (tmp_file, &local_reader));
7873
+ io::FileReaderOptions opts;
7874
+ opts.cache_type = io::cache_type_from_string (" file_block_cache" );
7875
+ opts.is_doris_table = true ;
7876
+ CachedRemoteFileReader reader (local_reader, opts);
7877
+ auto key = io::BlockFileCache::hash (" tmp_file" );
7878
+ EXPECT_EQ (reader._cache_hash , key);
7879
+ EXPECT_EQ (local_reader->path ().native (), reader.path ().native ());
7880
+ EXPECT_EQ (local_reader->size (), reader.size ());
7881
+ EXPECT_FALSE (reader.closed ());
7882
+ EXPECT_EQ (local_reader->path ().native (), reader.get_remote_reader ()->path ().native ());
7883
+
7884
+ {
7885
+ std::string buffer;
7886
+ buffer.resize (64_kb);
7887
+ IOContext io_ctx;
7888
+ FileCacheStatistics stats;
7889
+ io_ctx.file_cache_stats = &stats;
7890
+ io_ctx.is_index_data = true ;
7891
+ // int64_t cur_time = UnixSeconds();
7892
+ // io_ctx.expiration_time = cur_time + 120;
7893
+ size_t bytes_read {0 };
7894
+ EXPECT_TRUE (
7895
+ reader.read_at (0 , Slice (buffer.data (), buffer.size ()), &bytes_read, &io_ctx).ok ());
7896
+ }
7897
+ std::this_thread::sleep_for (std::chrono::seconds (3 ));
7898
+ LOG (INFO) << " ttl:" << cache->_ttl_queue .cache_size ;
7899
+ LOG (INFO) << " index:" << cache->_index_queue .cache_size ;
7900
+ LOG (INFO) << " normal:" << cache->_normal_queue .cache_size ;
7901
+ LOG (INFO) << " disp:" << cache->_disposable_queue .cache_size ;
7902
+ EXPECT_EQ (cache->_ttl_queue .cache_size , 0 );
7903
+ EXPECT_EQ (cache->_index_queue .cache_size , 1048576 );
7904
+
7905
+ EXPECT_TRUE (reader.close ().ok ());
7906
+ EXPECT_TRUE (reader.closed ());
7907
+ std::this_thread::sleep_for (std::chrono::seconds (1 ));
7908
+ if (fs::exists (cache_base_path)) {
7909
+ fs::remove_all (cache_base_path);
7910
+ }
7911
+ FileCacheFactory::instance ()->_caches .clear ();
7912
+ FileCacheFactory::instance ()->_path_to_cache .clear ();
7913
+ FileCacheFactory::instance ()->_capacity = 0 ;
7914
+ }
7915
+
7763
7916
} // namespace doris::io
0 commit comments