반응형
0. 전체 스크립트
앞선 분석과 준비 과정을 통해 코드를 작성할 준비를 모두 마쳤습니다. 전체 코드를 본 다음에 기능별로 설명을 이어나가겠습니다.
import time, os
import pandas as pd
from glob import glob
from datetime import date
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--window-size=1920,1080')
driver = webdriver.Chrome(options=chrome_options)
params = {'behavior': 'allow', 'downloadPath':'파일 다운로드 경로'}
driver.execute_cdp_cmd('Page.setDownloadBehavior', params)
driver.get("https://ctas.krcert.or.kr/index")
driver.find_element(By.NAME, 'userId').send_keys('c-tas ID')
driver.find_element(By.NAME, 'userPassword').send_keys('c-tas PW')
driver.find_element(By.CLASS_NAME, 'btn_login').click()
driver.get("https://ctas.krcert.or.kr/threatnew/worker/sharesCombine")
driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div[2]/div[2]/fieldset/div/form/div/select/option[2]").click()
driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div[2]/div[2]/fieldset/div/form/div/button").click()
driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div[2]/div[2]/div[2]/ul/li[1]/span[3]/a[1]").click()
time.sleep(2)
driver.quit()
csv_file = glob('파일 다운로드 경로\\*.csv')
csv_file = str(csv_file).replace("\\", "").replace("파일 다운로드 경로", "").replace("'", "").replace("[", "").replace("]", "")
df = pd.read_csv("파일 다운로드 경로\\"+csv_file)
black_ip_list = str(df['threat_info'].values).replace("[", "").replace("]", "").replace("\n", "").replace("' '", ",").replace("'", "")
os.remove("파일 다운로드 경로\\"+csv_file)
result_list = black_ip_list.split(",")
result_list = [f' - "{ip}' for ip in result_list]
result_list = [ip + '/32"' for ip in result_list]
def send_message(result_list):
channel_id = "채널ID"
channel_name = "#채널이름"
bot_token = "봇 토큰"
client = WebClient(token=bot_token)
today = str(date.today()).replace("-", "")
client.chat_postMessage(channel=channel_name, text=today+" 유해 IP목록(cc.<@유저ID>)")
response = client.conversations_history(channel=channel_id)
latest_timestamp = response["messages"][0]["ts"]
message = "\n".join(result_list)
client.chat_postMessage(channel=channel_name, text=message, thread_ts=latest_timestamp)
send_message(result_list)
time.sleep(1)
1. Selenium 설정
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--window-size=1920,1080')
driver = webdriver.Chrome(options=chrome_options)
params = {'behavior': 'allow', 'downloadPath':'파일 다운로드 경로'}
driver.execute_cdp_cmd('Page.setDownloadBehavior', params)
headless 설정과 headless로 작동 시 파일을 다운로드 하기 위한 옵션을 설정합니다.
2. C-TAS 홈페이지 로그인
chrome_options = Options()
chrome_options.add_experimental_option("detach", True)
driver = webdriver.Chrome(options=chrome_options)
driver.get("https://ctas.krcert.or.kr/index")
driver.find_element(By.NAME, 'userId').send_keys('c-tas_ID')
driver.find_element(By.NAME, 'userPassword').send_keys('c-tas_PASS')
driver.find_element(By.CLASS_NAME, 'btn_login').click()
Selenium을 활용해 C-TAS 홈페이지에 접속 후 계정 정보를 입력하고 로그인합니다.
3. 위협IP 정보 파일 다운로드
driver.get("https://ctas.krcert.or.kr/index")
driver.find_element(By.NAME, 'userId').send_keys('c-tas ID')
driver.find_element(By.NAME, 'userPassword').send_keys('c-tas PW')
driver.find_element(By.CLASS_NAME, 'btn_login').click()
driver.get("https://ctas.krcert.or.kr/threatnew/worker/sharesCombine")
driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div[2]/div[2]/fieldset/div/form/div/select/option[2]").click()
driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div[2]/div[2]/fieldset/div/form/div/button").click()
driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div[2]/div[2]/div[2]/ul/li[1]/span[3]/a[1]").click()
time.sleep(2)
driver.quit()
위협IP 정보를 담고있는 CSV 파일을 다운로드하기 위한 XPath 까지 접근하여 파일을 다운로드하고 Selenium을 종료합니다. 파일이 다운로드되는 도중 Selenium이 닫히는 것을 방지하기 위해 sleep을 설정했습니다.
4. 위협IP 추출 및 형식 다듬기
csv_file = glob('파일 다운로드 경로\\*.csv')
csv_file = str(csv_file).replace("\\", "").replace("파일 다운로드 경로", "").replace("'", "").replace("[", "").replace("]", "")
df = pd.read_csv("파일 다운로드 경로\\"+csv_file)
black_ip_list = str(df['threat_info'].values).replace("[", "").replace("]", "").replace("\n", "").replace("' '", ",").replace("'", "")
os.remove("파일 다운로드 경로\\"+csv_file)
result_list = black_ip_list.split(",")
result_list = [f' - "{ip}' for ip in result_list]
result_list = [ip + '/32"' for ip in result_list]
다운로드 한 CSV 파일의 파일명을 추출하고 → CSV 파일 중 위협IP가 기록된 특정 컬럼의 모든 내용을 파싱한 후 → CSV 파일을 삭제하고 → 위협IP의 포맷을 원하는 형태로 수정하는 부분입니다.
5. 슬랙 메세지 전송
def send_message(result_list):
channel_id = "채널ID"
channel_name = "#채널이름"
bot_token = "봇 토큰"
client = WebClient(token=bot_token)
today = str(date.today()).replace("-", "")
client.chat_postMessage(channel=channel_name, text=today+" 유해 IP목록(cc.<@유저ID>)")
response = client.conversations_history(channel=channel_id)
latest_timestamp = response["messages"][0]["ts"]
message = "\n".join(result_list)
client.chat_postMessage(channel=channel_name, text=message, thread_ts=latest_timestamp)
send_message(result_list)
time.sleep(1)
특정 채널에 "유해 IP 목록"이라는 메세지를 발송하고 해당 메세지의 댓글에 위협IP를 기록하는 부분입니다. IP의 수가 많을 경우 채널의 가독성을 저해할 우려가 있어 댓글에 기록하였습니다. 또한 메세지가 누락되지 않도록 관련 엔지니어를 호출합니다. 아래와 같이 예쁘게 내용을 확인할 수 있습니다.
반응형
'etc' 카테고리의 다른 글
Certified Kubernetes Application Developer(CKAD) 자격증 취득 후기(23.09. 시험) (0) | 2023.09.03 |
---|---|
Keycloak 구축 및 AWS SAML 연동 (0) | 2023.06.18 |
C-TAS에서 위협 IP를 자동으로 수집해 슬랙으로 전송하기 - (3) 크롤링 분석 (0) | 2023.04.22 |
C-TAS에서 위협 IP를 자동으로 수집해 슬랙으로 전송하기 - (2) Slack bot 생성 및 권한 부여 (0) | 2023.04.21 |
C-TAS에서 위협 IP를 자동으로 수집해 슬랙으로 전송하기 - (1) Selenium 설치 (0) | 2023.04.21 |