1
+ import asyncio
2
+ import oracledb
3
+ import os
4
+ import sys
5
+ from dotenv import load_dotenv
6
+ from datetime import datetime , timedelta
7
+ # 현재 스크립트의 상위 디렉터리를 모듈 경로에 추가(package 폴더에 있으므로)
8
+ sys .path .append (os .path .abspath (os .path .join (os .path .dirname (__file__ ), '..' )))
9
+ from models .FirmInfo import FirmInfo # FirmInfo 클래스 가져오기
10
+
11
+ # 환경 변수 로드
12
+ load_dotenv ()
13
+
14
+ # Oracle 설정
15
+ WALLET_LOCATION = os .getenv ('WALLET_LOCATION' )
16
+ WALLET_PASSWORD = os .getenv ('WALLET_PASSWORD' )
17
+ DB_USER = os .getenv ('DB_USER' )
18
+ DB_PASSWORD = os .getenv ('DB_PASSWORD' )
19
+ DB_DSN = os .getenv ('DB_DSN' )
20
+
21
+ class OracleManagerSQL :
22
+ def __init__ (self ):
23
+ """Oracle 데이터베이스 연결 초기화"""
24
+ self .conn = None
25
+ self .cursor = None
26
+
27
+ def open_connection (self ):
28
+ """데이터베이스 연결 열기"""
29
+ if self .conn is None :
30
+ print ("Opening Oracle connection..." )
31
+ self .conn = oracledb .connect (
32
+ user = DB_USER ,
33
+ password = DB_PASSWORD ,
34
+ dsn = DB_DSN ,
35
+ config_dir = WALLET_LOCATION ,
36
+ wallet_location = WALLET_LOCATION ,
37
+ wallet_password = WALLET_PASSWORD
38
+ )
39
+ self .cursor = self .conn .cursor ()
40
+ print ("Oracle connection opened successfully." )
41
+ elif self ._is_connection_closed ():
42
+ print ("Reopening closed Oracle connection..." )
43
+ self .close_connection ()
44
+ self .open_connection ()
45
+
46
+ def _is_connection_closed (self ):
47
+ """연결이 닫혔는지 확인하는 헬퍼 메서드"""
48
+ try :
49
+ # 간단한 쿼리 실행으로 연결 상태 확인
50
+ self .cursor .execute ("SELECT 1 FROM dual" )
51
+ return False
52
+ except (oracledb .InterfaceError , oracledb .DatabaseError ):
53
+ return True
54
+
55
+ def close_connection (self ):
56
+ """데이터베이스 연결 종료"""
57
+ if self .cursor :
58
+ self .cursor .close ()
59
+ self .cursor = None
60
+ if self .conn :
61
+ self .conn .close ()
62
+ self .conn = None
63
+ print ("Oracle connection closed." )
64
+
65
+ def create_table (self , table_name , columns ):
66
+ """테이블 생성 (Oracle 방식)"""
67
+ self .open_connection ()
68
+ columns_str = ", " .join (f"{ col } { dtype } " for col , dtype in columns .items ())
69
+ query = f"CREATE TABLE { table_name } ({ columns_str } )"
70
+ try :
71
+ self .cursor .execute (query )
72
+ self .conn .commit ()
73
+ result = {"status" : "success" , "query" : query }
74
+ except oracledb .DatabaseError as e :
75
+ result = {"status" : "error" , "error" : str (e ), "query" : query }
76
+ self .close_connection ()
77
+ return result
78
+
79
+ def insert_data (self , table_name , data ):
80
+ """단일 데이터 삽입"""
81
+ self .open_connection ()
82
+ placeholders = ', ' .join (f':{ i + 1 } ' for i in range (len (data )))
83
+ query = f"INSERT INTO { table_name } VALUES ({ placeholders } )"
84
+ try :
85
+ self .cursor .execute (query , data )
86
+ self .conn .commit ()
87
+ result = {"status" : "success" , "query" : query , "data" : data }
88
+ except oracledb .DatabaseError as e :
89
+ result = {"status" : "error" , "error" : str (e ), "query" : query }
90
+ self .close_connection ()
91
+ return result
92
+
93
+ def fetch_all (self , table_name ):
94
+ """모든 데이터 조회"""
95
+ self .open_connection ()
96
+ query = f"SELECT * FROM { table_name } "
97
+ self .cursor .execute (query )
98
+ rows = self .cursor .fetchall ()
99
+ columns = [desc [0 ].lower () for desc in self .cursor .description ]
100
+ result = [dict (zip (columns , row )) for row in rows ]
101
+ self .close_connection ()
102
+ return result
103
+
104
+ def insert_json_data_list (self , json_data_list , table_name ):
105
+ """JSON 데이터 리스트 삽입 및 업데이트 (MERGE 사용)"""
106
+ self .open_connection ()
107
+ inserted_count = 0
108
+ updated_count = 0
109
+ for entry in json_data_list :
110
+ merge_query = f"""
111
+ MERGE INTO { table_name } t
112
+ USING (SELECT :key AS KEY FROM dual) s
113
+ ON (t.KEY = s.KEY)
114
+ WHEN MATCHED THEN
115
+ UPDATE SET
116
+ REG_DT = :reg_dt,
117
+ WRITER = :writer,
118
+ MKT_TP = :mkt_tp,
119
+ DOWNLOAD_URL = CASE
120
+ WHEN :download_url IS NOT NULL AND :download_url != ''
121
+ THEN :download_url
122
+ ELSE t.DOWNLOAD_URL
123
+ END,
124
+ TELEGRAM_URL = CASE
125
+ WHEN :telegram_url IS NOT NULL AND :telegram_url != ''
126
+ THEN :telegram_url
127
+ ELSE t.TELEGRAM_URL
128
+ END
129
+ WHEN NOT MATCHED THEN
130
+ INSERT (
131
+ SEC_FIRM_ORDER, ARTICLE_BOARD_ORDER, FIRM_NM, REG_DT,
132
+ ATTACH_URL, ARTICLE_TITLE, ARTICLE_URL, MAIN_CH_SEND_YN,
133
+ DOWNLOAD_URL, TELEGRAM_URL, WRITER, MKT_TP, KEY, SAVE_TIME
134
+ )
135
+ VALUES (
136
+ :sec_firm_order, :article_board_order, :firm_nm, :reg_dt,
137
+ :attach_url, :article_title, :article_url, :main_ch_send_yn,
138
+ :download_url, :telegram_url, :writer, :mkt_tp, :key, :save_time
139
+ )
140
+ """
141
+ params = {
142
+ "sec_firm_order" : entry ["SEC_FIRM_ORDER" ],
143
+ "article_board_order" : entry ["ARTICLE_BOARD_ORDER" ],
144
+ "firm_nm" : entry ["FIRM_NM" ],
145
+ "reg_dt" : entry .get ("REG_DT" , "" ),
146
+ "attach_url" : entry .get ("ATTACH_URL" , "" ),
147
+ "article_title" : entry ["ARTICLE_TITLE" ],
148
+ "article_url" : entry .get ("ARTICLE_URL" , None ),
149
+ "main_ch_send_yn" : entry .get ("MAIN_CH_SEND_YN" , "N" ),
150
+ "download_url" : entry .get ("DOWNLOAD_URL" , None ),
151
+ "telegram_url" : entry .get ("TELEGRAM_URL" , None ),
152
+ "writer" : entry .get ("WRITER" , "" ),
153
+ "mkt_tp" : entry .get ("MKT_TP" , "KR" ),
154
+ "key" : entry .get ("KEY" ) or entry .get ("ATTACH_URL" , "" ),
155
+ "save_time" : entry ["SAVE_TIME" ]
156
+ }
157
+ self .cursor .execute (merge_query , params )
158
+ if self .cursor .rowcount > 0 :
159
+ self .cursor .execute (f"SELECT COUNT(*) FROM { table_name } WHERE KEY = :key" , {"key" : params ["key" ]})
160
+ if self .cursor .fetchone ()[0 ] == 1 :
161
+ inserted_count += 1
162
+ else :
163
+ updated_count += 1
164
+ self .conn .commit ()
165
+ self .close_connection ()
166
+ print (f"Oracle: Data inserted successfully: { inserted_count } rows." )
167
+ print (f"Oracle: Data updated successfully: { updated_count } rows." )
168
+ return inserted_count , updated_count
169
+
170
+ async def fetch_daily_articles_by_date (self , firm_info : FirmInfo , date_str = None ):
171
+ """TELEGRAM_URL 갱신이 필요한 레코드 비동기 조회"""
172
+ async with await oracledb .connect_async (
173
+ user = DB_USER , password = DB_PASSWORD , dsn = DB_DSN ,
174
+ config_dir = WALLET_LOCATION , wallet_location = WALLET_LOCATION , wallet_password = WALLET_PASSWORD
175
+ ) as conn :
176
+ async with conn .cursor () as cursor :
177
+ query_date = date_str if date_str else datetime .now ().strftime ('%Y%m%d' )
178
+ firmInfo = firm_info .get_state ()
179
+ query = f"""
180
+ SELECT
181
+ id, SEC_FIRM_ORDER, ARTICLE_BOARD_ORDER, FIRM_NM, REG_DT,
182
+ ATTACH_URL, ARTICLE_TITLE, ARTICLE_URL, MAIN_CH_SEND_YN,
183
+ DOWNLOAD_URL, WRITER, SAVE_TIME, MAIN_CH_SEND_YN, TELEGRAM_URL, KEY
184
+ FROM
185
+ data_main_daily_send
186
+ WHERE
187
+ REG_DT BETWEEN TO_CHAR(TO_DATE('{ query_date } ', 'YYYYMMDD') - 3, 'YYYYMMDD')
188
+ AND TO_CHAR(TO_DATE('{ query_date } ', 'YYYYMMDD') + 2, 'YYYYMMDD')
189
+ AND SEC_FIRM_ORDER = :sec_firm_order
190
+ AND KEY IS NOT NULL
191
+ AND TELEGRAM_URL = ''
192
+ ORDER BY SEC_FIRM_ORDER, ARTICLE_BOARD_ORDER, SAVE_TIME
193
+ """
194
+ await cursor .execute (query , {"sec_firm_order" : firmInfo ["SEC_FIRM_ORDER" ]})
195
+ rows = await cursor .fetchall ()
196
+ columns = [desc [0 ].lower () for desc in cursor .description ]
197
+ return [dict (zip (columns , row )) for row in rows ]
198
+
199
+ async def update_telegram_url (self , record_id , telegram_url , article_title = None ):
200
+ """TELEGRAM_URL 비동기 업데이트"""
201
+ async with await oracledb .connect_async (
202
+ user = DB_USER , password = DB_PASSWORD , dsn = DB_DSN ,
203
+ config_dir = WALLET_LOCATION , wallet_location = WALLET_LOCATION , wallet_password = WALLET_PASSWORD
204
+ ) as conn :
205
+ async with conn .cursor () as cursor :
206
+ if article_title is not None :
207
+ query = """
208
+ UPDATE data_main_daily_send
209
+ SET TELEGRAM_URL = :telegram_url, ARTICLE_TITLE = :article_title
210
+ WHERE id = :record_id
211
+ """
212
+ params = {"telegram_url" : telegram_url , "article_title" : article_title , "record_id" : record_id }
213
+ else :
214
+ query = """
215
+ UPDATE data_main_daily_send
216
+ SET TELEGRAM_URL = :telegram_url
217
+ WHERE id = :record_id
218
+ """
219
+ params = {"telegram_url" : telegram_url , "record_id" : record_id }
220
+ await cursor .execute (query , params )
221
+ await conn .commit ()
222
+ return {"status" : "success" , "query" : query , "record_id" : record_id , "telegram_url" : telegram_url , "article_title" : article_title }
223
+
224
+ async def execute_query (self , query , params = None , close = False ):
225
+ """비동기 쿼리 실행"""
226
+ async with await oracledb .connect_async (
227
+ user = DB_USER , password = DB_PASSWORD , dsn = DB_DSN ,
228
+ config_dir = WALLET_LOCATION , wallet_location = WALLET_LOCATION , wallet_password = WALLET_PASSWORD
229
+ ) as conn :
230
+ async with conn .cursor () as cursor :
231
+ try :
232
+ if params :
233
+ await cursor .execute (query , params )
234
+ else :
235
+ await cursor .execute (query )
236
+ if query .strip ().lower ().startswith ("select" ):
237
+ rows = await cursor .fetchall ()
238
+ columns = [desc [0 ].lower () for desc in cursor .description ]
239
+ result = [dict (zip (columns , row )) for row in rows ]
240
+ else :
241
+ await conn .commit ()
242
+ result = {"status" : "success" , "affected_rows" : cursor .rowcount }
243
+ except oracledb .DatabaseError as e :
244
+ result = {"status" : "error" , "error" : str (e )}
245
+ return result
246
+
247
+ async def daily_select_data (self , date_str = None , type = None ):
248
+ """지정된 날짜 데이터 비동기 조회"""
249
+ async with await oracledb .connect_async (
250
+ user = DB_USER , password = DB_PASSWORD , dsn = DB_DSN ,
251
+ config_dir = WALLET_LOCATION , wallet_location = WALLET_LOCATION , wallet_password = WALLET_PASSWORD
252
+ ) as conn :
253
+ async with conn .cursor () as cursor :
254
+ print (f"date_str: { date_str } , type: { type } " )
255
+ if type not in ['send' , 'download' ]:
256
+ raise ValueError ("Invalid type. Must be 'send' or 'download'." )
257
+
258
+ if date_str is None :
259
+ query_date = datetime .now ().strftime ('%Y-%m-%d' )
260
+ query_reg_dt = (datetime .now () + timedelta (days = 2 )).strftime ('%Y%m%d' )
261
+ else :
262
+ query_date = f"{ date_str [:4 ]} -{ date_str [4 :6 ]} -{ date_str [6 :]} "
263
+ query_reg_dt = (datetime .strptime (date_str , '%Y%m%d' ) + timedelta (days = 2 )).strftime ('%Y%m%d' )
264
+
265
+ three_days_ago = (datetime .now () - timedelta (days = 3 )).strftime ('%Y%m%d' )
266
+ if type == 'send' :
267
+ query_condition = "(MAIN_CH_SEND_YN != 'Y' OR MAIN_CH_SEND_YN IS NULL) AND (SEC_FIRM_ORDER != 19 OR (SEC_FIRM_ORDER = 19 AND TELEGRAM_URL <> ''))"
268
+ elif type == 'download' :
269
+ query_condition = "MAIN_CH_SEND_YN = 'Y' AND DOWNLOAD_STATUS_YN != 'Y'"
270
+
271
+ query = f"""
272
+ SELECT
273
+ id, SEC_FIRM_ORDER, ARTICLE_BOARD_ORDER, FIRM_NM, REG_DT,
274
+ ATTACH_URL, ARTICLE_TITLE, ARTICLE_URL, MAIN_CH_SEND_YN,
275
+ DOWNLOAD_URL, WRITER, SAVE_TIME, TELEGRAM_URL, MAIN_CH_SEND_YN
276
+ FROM
277
+ data_main_daily_send
278
+ WHERE
279
+ TO_CHAR(SAVE_TIME, 'YYYY-MM-DD') = '{ query_date } '
280
+ AND REG_DT >= '{ three_days_ago } '
281
+ AND REG_DT <= '{ query_reg_dt } '
282
+ AND { query_condition }
283
+ ORDER BY SEC_FIRM_ORDER, ARTICLE_BOARD_ORDER, SAVE_TIME
284
+ """
285
+ print ('=' * 30 )
286
+ print (query )
287
+ print ('=' * 30 )
288
+ await cursor .execute (query )
289
+ rows = await cursor .fetchall ()
290
+ columns = [desc [0 ].lower () for desc in cursor .description ]
291
+ return [dict (zip (columns , row )) for row in rows ]
292
+
293
+ async def daily_update_data (self , date_str = None , fetched_rows = None , type = None ):
294
+ """데이터 비동기 업데이트"""
295
+ async with await oracledb .connect_async (
296
+ user = DB_USER , password = DB_PASSWORD , dsn = DB_DSN ,
297
+ config_dir = WALLET_LOCATION , wallet_location = WALLET_LOCATION , wallet_password = WALLET_PASSWORD
298
+ ) as conn :
299
+ async with conn .cursor () as cursor :
300
+ if date_str is None :
301
+ query_date = datetime .now ().strftime ('%Y-%m-%d' )
302
+ else :
303
+ query_date = f"{ date_str [:4 ]} -{ date_str [4 :6 ]} -{ date_str [6 :]} "
304
+
305
+ if type not in ['send' , 'download' ]:
306
+ raise ValueError ("Invalid type. Must be 'send' or 'download'." )
307
+
308
+ if type == 'send' :
309
+ update_query = """
310
+ UPDATE data_main_daily_send
311
+ SET MAIN_CH_SEND_YN = 'Y'
312
+ WHERE id = :id
313
+ """
314
+ for row in fetched_rows :
315
+ print (f"Row data: { row } " )
316
+ print (f"Executing query: { update_query } " )
317
+ print (f"With parameters: {{'id': { row ['id' ]} }}" )
318
+ await cursor .execute (update_query , {"id" : row ["id" ]})
319
+ elif type == 'download' :
320
+ update_query = """
321
+ UPDATE data_main_daily_send
322
+ SET DOWNLOAD_STATUS_YN = 'Y'
323
+ WHERE id = :id
324
+ """
325
+ print (f"Single row for download: { fetched_rows } " )
326
+ print (f"Executing query: { update_query } " )
327
+ print (f"With parameters: {{'id': { fetched_rows ['id' ]} }}" )
328
+ await cursor .execute (update_query , {"id" : fetched_rows ["id" ]})
329
+
330
+ await conn .commit ()
331
+ return {"status" : "success" , "affected_rows" : cursor .rowcount }
332
+
333
+ # 예시 사용법
334
+ async def main ():
335
+ db = OracleManagerSQL ()
336
+ db .open_connection ()
337
+ rows = db .fetch_all ('data_main_daily_send' )
338
+ print (rows )
339
+ db .close_connection ()
340
+
341
+ if __name__ == "__main__" :
342
+ asyncio .run (main ())
0 commit comments