일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 미국주식
- 해외주식
- matter
- 파이썬
- 배당
- cluster
- 애플
- 오블완
- 국내주식
- 공모주
- 현대통신
- 힐스테이트 광교산
- RS-485
- 월패드
- Python
- 홈네트워크
- Bestin
- Home Assistant
- homebridge
- ConnectedHomeIP
- MQTT
- Espressif
- Apple
- 티스토리챌린지
- 매터
- 나스닥
- 코스피
- raspberry pi
- esp32
- SK텔레콤
- Today
- Total
YOGYUI
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (Final) 본문
Get Corporations List Classified by Sectors from DART(fss)
[시리즈]
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (1)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (2)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (3)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (Final)
11. 기업 객체 정보 가져오도록 스크립트 수정
테이블에서 기업 이름 및 기업 고유 코드 (8자리 문자열), 섹터 id를 함께 가져오도록 javascript를 수정하도록 하자
[run_code.js]
/*
* 필요한 함수 선언
*/
// jsTree 노드의 부모 노드들의 id와 text 어레이로 반환
function fnGetParents(treeObj, node) {
parents = [];
for (i = 0; i < node.parents.length - 1; i++) {
parent_node_id = node.parents[i];
parent_node = treeObj.get_node(parent_node_id);
parents.push({id: parent_node_id, text: parent_node.text});
}
parents.reverse();
return parents;
}
// jsTree 최하위노드 재귀검색
function fnTreeGetLeafNodes(treeObj, node, arrCollector) {
var child_id_arr = node.children; // 노드의 자식 노드들의 id 문자열이 담긴 어레이를 가져온다
var arrCollector = (arrCollector) ? arrCollector : [];
for (var i = 0; i < child_id_arr.length; ++i) {
var node_id = child_id_arr[i];
var nodeChild = treeObj.get_node(node_id); // 트리의 노드 객체를 가져온다
if (tree.is_leaf(node_id)) {
// 잎노드일 경우 (최하위 노드)
arrCollector.push({
node_id: node_id,
node_text: nodeChild.text,
parents: fnGetParents(treeObj, nodeChild),
corp_info_arr: []
});
} else {
// 최하위 노드가 아니면 재귀호출
arrCollector = fnTreeGetLeafNodes(treeObj, nodeChild, arrCollector);
}
}
return arrCollector;
}
// 비동기 delay
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
// search 함수 호출 후 <table>에서 기업 정보 가져오기
async function fnGetCorpNamesFromTableSearch(index, corp_info_arr, node_id, delay_ms_table) {
if (index > 1) {
search(index);
await sleep(delay_ms_table);
}
var corp_info_arr = (corp_info_arr) ? corp_info_arr : [];
var table = document.getElementsByTagName("table")[0];
var tbody = table.getElementsByTagName("tbody")[0];
var tr_list = tbody.getElementsByTagName("tr");
for (i = 0; i < tr_list.length; i++) {
tr = tr_list[i];
td = tr.getElementsByTagName("td")[0];
span = td.getElementsByTagName("span")[0];
a = span.getElementsByTagName("a")[0];
// 기업 이름 (\r, \n, \t 문자 정규식 통해 치환)
corp_name = a.text;
corp_name = corp_name.replace(/(\r\n|\n|\r|\t)/gm,"");
// 기업 고유 코드 (8자리)는 href 속성 문자열 파싱
href = a.getAttribute('href');
index1 = href.indexOf("'");
index2 = href.slice(index1 + 1, href.length + 1).indexOf("'") + index1;
unique_code = href.slice(index1 + 1, index2 + 1);
info = {
name: corp_name,
code: unique_code,
sector: node_id
};
corp_info_arr.push(info);
}
return corp_info_arr;
}
// 페이지 여러개에 대한 비동기 순차 테이블 조회
async function fnGetCorpNamesFromTableAll(corp_info_arr, node_id, delay_ms_table) {
var pageInfo = document.getElementsByClassName("pageInfo");
var pageSkip = document.getElementsByClassName("pageSkip")[0];
try {
var li_tags = pageSkip.getElementsByTagName("li");
var arrIndex = [];
for (i = 0; i < li_tags.length; i++) {
arrIndex.push(i + 1);
}
await arrIndex.reduce((prevTask, currTask) => {
return prevTask.then(() => fnGetCorpNamesFromTableSearch(
currTask, corp_info_arr, node_id, delay_ms_table));
}, Promise.resolve());
} catch (error) {
// 해당 종목에 회사가 아예 없으면 pageSkip 태그가 없다! (TypeError 발생)
;
}
}
// 트리 노드 선택 후 테이블에서 기업 정보 가져오기
async function fnSelectNodeAndGetCorpNamesFromTableAll(leaf_node_dict, treeObj, delay_ms_node, delay_ms_table) {
// 트리 노드 선택 - 딜레이
var node_id = leaf_node_dict.node_id;
treeObj.select_node(node_id);
await sleep(delay_ms_node);
// 테이블에서 모든 기업 정보 가져오기
var corp_info_arr = []
await fnGetCorpNamesFromTableAll(corp_info_arr, node_id, delay_ms_table);
// 기업 정보를 가져온 뒤 결과를 딕셔너리 내의 어레이에 concat
if (corp_info_arr.length > 0) {
leaf_node_dict.corp_info_arr = leaf_node_dict.corp_info_arr.concat(corp_info_arr);
console.log(leaf_node_dict.node_text, ": ", leaf_node_dict.corp_info_arr.length, "companies")
} else {
console.log(leaf_node_dict.node_text, ": Empty!", )
}
// 트리 노드 선택 해제
treeObj.deselect_node(node_id);
}
// 호출 함수
async function fnProcess(leaf_node_arr, treeObj, delay_ms_node, delay_ms_table) {
await leaf_node_arr.reduce((prevTask, currTask) => {
return prevTask.then(() => fnSelectNodeAndGetCorpNamesFromTableAll(
currTask, tree, delay_ms_node, delay_ms_table));
}, Promise.resolve());
console.log('Finished');
}
console.log('Start');
fn_toggleTab("business"); // 크롤링 시각화 위해 탭 변경 (선택사항)
/*
* Step.1 트리의 모든 최하위 노드 가져오기
*/
var tree = $j("#businessTree").jstree(true);
top_node = tree.get_node('all');
arr_leaf_nodes = fnTreeGetLeafNodes(tree, top_node);
/*
* Step.2 모든 최하위 노드들을 순차 조회하며 테이블에서 기업 정보 가져오기
*/
fnProcess(arr_leaf_nodes, tree, 1000, 1000);
이에 맞춰서 트리 클래스도 바꿔주자 (Company 클래스 추가)
[tree.py]
from typing import List, Union
class Company:
_name: str = ''
_unique_id: str = ''
_sector: str = ''
def __init__(self, name: str = '', unique_id: str = '', sector: str = ''):
self._name = name
self._unique_id = unique_id
self._sector = sector
def __ge__(self, other):
return self._name > other.name
def __repr__(self):
return f'{self._name} ({self._unique_id}, {self._sector})'
@property
def name(self) -> str:
return self._name
@property
def unique_id(self) -> str:
return self._unique_id
@property
def sector(self) -> str:
return self._sector
class TreeNodeBase:
_corp_info_lst: List[Company]
_ident: str = ''
_text: str = ''
def __init__(self, ident: str, text: str, corp_info_lst: List[Company] = None):
self._ident = ident
self._text = text
self._corp_info_lst = list()
if corp_info_lst is not None:
self._corp_info_lst.extend(corp_info_lst)
self._corp_info_lst.sort(key=lambda x: x.name)
def getIdent(self) -> str:
return self._ident
def getText(self) -> str:
return self._text
def getCorpInfoList(self) -> List[Company]:
result = list()
for child in self.getChildren():
corp_info_lst = child.getCorpInfoList()
result.extend(corp_info_lst)
result.extend(self._corp_info_lst)
return result
def setParentNode(self, node):
pass
def addChildNode(self, node):
pass
def hasChildNode(self, node) -> bool:
return False
def getChildren(self) -> list:
return []
def __repr__(self):
return f'{self._text}({self._ident})'
class TreeNode(TreeNodeBase):
_parent: Union[TreeNodeBase, None]
_children: List[TreeNodeBase]
def __init__(self, ident: str, text: str, corp_info_lst: List[Company] = None):
super().__init__(ident, text, corp_info_lst)
self._parent = None
self._children = list()
def setParentNode(self, node: TreeNodeBase):
self._parent = node
if not self._parent.hasChildNode(self):
self._parent.addChildNode(self)
def addChildNode(self, node: TreeNodeBase):
self._children.append(node)
def hasChildNode(self, node: TreeNodeBase) -> bool:
return node in self._children
def getChildren(self) -> List[TreeNodeBase]:
return self._children
def getChildNode(self, ident: int) -> Union[TreeNodeBase, None]:
filterted = list(filter(lambda x: x.getIdent() == ident, self._children))
if len(filterted) == 1:
return filterted[0]
else:
return None
def __eq__(self, other: TreeNodeBase):
return self._ident == other.getIdent()
class Tree:
_node_list: List[TreeNode]
def __init__(self):
self._node_list = list() # 여러개의 최상위 노드를 담을 수 있게 일단은 구성
def add_leaf_node(self, ident: str, text: str, parents: List[dict], corp_info_lst: List[Company] = None):
iter_node: Union[TreeNode, None] = None
for i, elem in enumerate(parents):
temp_ident = elem.get('id')
temp_text = elem.get('text')
temp_node = TreeNode(temp_ident, temp_text)
if iter_node is None: # 최초 iteration
if temp_node in self._node_list: # 최상위 노드가 트리의 노드 리스트에 있을 경우
temp_node = list(filter(lambda x: x.getIdent() == temp_ident, self._node_list))[0]
else: # 최상위 노드가 트리의 노드 리스트에 없을 경우 (추가)
self._node_list.append(temp_node)
else: # 자식 노드 iteration
if iter_node.hasChildNode(temp_node): # 이전 노드의 자식으로 이미 포함되어 있을 경우
temp_node = iter_node.getChildNode(temp_ident)
else: # 이전 노드의 자식에 없을 경우 (추가)
iter_node.addChildNode(temp_node)
iter_node = temp_node
node = TreeNode(ident, text, corp_info_lst)
iter_node.addChildNode(node)
def add_leaf_node_dict(self, leaf_node_dict: dict):
ident = leaf_node_dict.get('node_id')
text = leaf_node_dict.get('node_text')
parents = leaf_node_dict.get('parents')
corp_info_arr = leaf_node_dict.get('corp_info_arr')
corp_info_lst = []
for elem in corp_info_arr:
info = Company(elem.get('name'), elem.get('code'), elem.get('sector'))
corp_info_lst.append(info)
self.add_leaf_node(ident, text, parents, corp_info_lst)
def add_leaf_nodes(self, leaf_node_dict_list: List[dict]):
[self.add_leaf_node_dict(x) for x in leaf_node_dict_list]
def printHierarchy(self):
txt = ''
def iter_print(node_: TreeNodeBase, level: int = 0):
txt_node = '-' * (level * 4) + node_.getText() + f'({node_.getIdent()})\n'
if len(node_.getChildren()) > 0:
for child in node_.getChildren():
txt_node += iter_print(child, level + 1)
return txt_node
for node in self._node_list:
txt += iter_print(node)
print(txt)
def getNodeList(self) -> List[TreeNode]:
return self._node_list
새로 크롤링을 해준뒤 직렬화 (용량이 꽤 커졌다 ㅎㅎ)
제대로 가져왔는지 테스트해보자
import pickle
with open('./result_list.pkl', 'rb') as fp:
node_list = pickle.load(fp)
tree = Tree()
tree.add_leaf_nodes(node_list)
In [1]: node_list[0].get('corp_info_arr')[0]
Out[1]: {'code': '00530255', 'name': '농업회사법인동주물산', 'sector': '01110'}
In [2]: node_list[-1].get('corp_info_arr')[-1]
Out[2]: {'code': '00339522', 'name': '太平洋시멘트', 'sector': '99'}
In [3]: tree.getNodeList()[0].getChildren()[0].getCorpInfoList()[0]
Out[3]: 농업회사법인동주물산 (00530255, 01110)
In [4]: tree.getNodeList()[0].getChildren()[0].getCorpInfoList()[1]
Out[4]: 농업회사법인보령스마트에너지 (01506477, 01110)
굳! 이제 각 잎노드(leaf node)가 회사명 뿐만 아니라 DART에서 사용되는 기업 고유 코드, 최하위 노드에 해당하는 섹터 id까지 정보를 함께 갖고 있다
변경사항에 맞춰서 treeview 테스트 코드도 수정해주자
[treeview.py]
from typing import List
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QResizeEvent
from PyQt5.QtWidgets import QWidget, QTreeWidget, QTreeWidgetItem, QTableWidget, QTableWidgetItem
from PyQt5.QtWidgets import QSplitter, QHeaderView, QAbstractItemView
from tree import Tree, TreeNodeBase, Company
class CorpTreeItem(QTreeWidgetItem):
def __init__(self, node: TreeNodeBase):
super().__init__()
self._node = node
self._sector = node.getIdent()
text = node.getText() + ' (' + node.getIdent() + ')'
self.setText(0, text)
def getCorpInfoList(self) -> List[Company]:
return self._node.getCorpInfoList()
def getSector(self) -> str:
return self._sector
class CorpTreeViewWidget(QWidget):
def __init__(self):
super().__init__()
self._tree = QTreeWidget()
self._treeitems: List[CorpTreeItem] = list()
self._table = QTableWidget()
self._splitter = QSplitter(Qt.Horizontal, self)
self.initControl()
self.initLayout()
def initControl(self):
self._tree.itemSelectionChanged.connect(self.onItemSelectionChanged)
self._tree.setHeaderHidden(True)
self._table.setColumnCount(3)
self._table.setHorizontalHeaderLabels(['이름', '고유번호', '업종코드'])
self._table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
self._table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents)
self._table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents)
self._table.setSelectionBehavior(QAbstractItemView.SelectRows)
self._table.setSelectionMode(QAbstractItemView.SingleSelection)
self._table.itemDoubleClicked.connect(self.onTableItemDoubleClicked)
def initLayout(self):
self._splitter.addWidget(self._tree)
self._splitter.addWidget(self._table)
def setCorpTree(self, tree_: Tree):
self._tree.clear()
tree_node_list = tree_.getNodeList()
for top_node in tree_node_list:
def addTreeItem(tree_node: TreeNodeBase, tree_item: CorpTreeItem = None):
if tree_item is None:
tree_item = CorpTreeItem(tree_node)
self._tree.addTopLevelItem(tree_item)
self._treeitems.append(tree_item)
else:
tree_child_item = CorpTreeItem(tree_node)
tree_item.addChild(tree_child_item)
self._treeitems.append(tree_child_item)
tree_item = tree_child_item
for child_node in tree_node.getChildren():
addTreeItem(child_node, tree_item)
addTreeItem(top_node)
def resizeEvent(self, a0: QResizeEvent) -> None:
self._splitter.resize(self.width(), self.height())
def onItemSelectionChanged(self):
selitems = self._tree.selectedItems()
corp_info_lst = []
if len(selitems) > 0:
for elem in selitems:
if isinstance(elem, CorpTreeItem):
corp_info_lst.extend(elem.getCorpInfoList())
self._table.setRowCount(len(corp_info_lst))
for i in range(self._table.rowCount()):
item = QTableWidgetItem()
item.setText(corp_info_lst[i].name)
item.setFlags(Qt.ItemFlags(int(item.flags()) ^ Qt.ItemIsEditable))
item.setTextAlignment(Qt.AlignLeft | Qt.AlignVCenter)
self._table.setItem(i, 0, item)
item = QTableWidgetItem()
item.setText(corp_info_lst[i].unique_id)
item.setFlags(Qt.ItemFlags(int(item.flags()) ^ Qt.ItemIsEditable))
item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
self._table.setItem(i, 1, item)
item = QTableWidgetItem()
item.setText(corp_info_lst[i].sector)
item.setFlags(Qt.ItemFlags(int(item.flags()) ^ Qt.ItemIsEditable))
item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
self._table.setItem(i, 2, item)
def onTableItemDoubleClicked(self):
item = self._table.selectedItems()
rows = list(set([x.row() for x in item]))
if len(rows) == 1:
row = rows[0]
sector = self._table.item(row, 2).text()
tree_item = list(filter(lambda x: x.getSector() == sector, self._treeitems))
if len(tree_item) == 1:
self._tree.setCurrentItem(tree_item[0])
if __name__ == "__main__":
import sys
import pickle
from PyQt5.QtWidgets import QApplication
with open('./result_list.pkl', 'rb') as fp:
node_list = pickle.load(fp)
app = QApplication(sys.argv)
wgt_ = CorpTreeViewWidget()
wgt_.resize(600, 600)
wgt_.show()
corp_tree = Tree()
corp_tree.add_leaf_nodes(node_list)
wgt_.setCorpTree(corp_tree)
app.exec_()
굳!
의도했던 대로 잘 가져온 것 같다 ㅎㅎ
12. GitHub 등록
아... 개발하고 글쓰고하느라 3일이나 지나버렸다... 하루만에 끝내려고 했는데;
글은 그냥 이쯤에서 마무리하고 이때까지 한 작업 내용을 깃허브에 올렸다
https://github.com/YOGYUI/DARTSectorWebCrawler
다른 어플리케이션에 이 프로젝트를 활용한 결과는 다른 글에서 다뤄보도록 하자 ㅎㅎ
아쉬운점
- 노드 클릭, 테이블 페이지 탐색 등 페이지 렌더링을 기다려야 하므로 너무 느리다
- 전체 크롤링 과정이 종료되었는지에 대한 확인 절차가 없어, 사용자가 임의로 결과 가져오기를 수행해야 한다
- 전체 결과를 로컬에 남기기만 하고, DB 연동은 너무너무 귀찮아서 다음 과제로 미뤘다
뭔가 남은게 있냐하면 흠...
jstree를 갖다쓰는 웹페이지를 크롤링하는 방법에 대해 공유하게 되었다는 점? ㅎㅎ
업종별 기업 정보를 DB에 연동한 뒤, 야금야금 개발하고 있는 전자공시조회 OPENDART 어플리케이션에 접목시키는게 최종 목표!
끝~!
[시리즈]
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (1)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (2)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (3)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (Final)
'Data Analysis > Data Engineering' 카테고리의 다른 글
웹크롤링 - 한국환경공단(에어코리아) 측정소 정보 가져오기 (0) | 2022.01.13 |
---|---|
공공데이터포털::대기오염정보 조회 (REST API) (0) | 2022.01.12 |
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (3) (0) | 2022.01.08 |
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (2) (0) | 2022.01.07 |
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (1) (0) | 2022.01.06 |