YOGYUI

웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (Final) 본문

Data Analysis/Data Engineering

웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (Final)

요겨 2022. 1. 8. 21:15
반응형

Get Corporations List Classified by Sectors from DART(fss)

[시리즈]

웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (1)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (2)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (3)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (Final)

11. 기업 객체 정보 가져오도록 스크립트 수정

테이블에서 기업 이름 및 기업 고유 코드 (8자리 문자열), 섹터 id를 함께 가져오도록 javascript를 수정하도록 하자

 

[run_code.js]

/* 
* 필요한 함수 선언 
*/
// jsTree 노드의 부모 노드들의 id와 text 어레이로 반환
function fnGetParents(treeObj, node) {
    parents = [];
    for (i = 0; i < node.parents.length - 1; i++) {
        parent_node_id = node.parents[i];
        parent_node = treeObj.get_node(parent_node_id);
        parents.push({id: parent_node_id, text: parent_node.text});
    }
    parents.reverse();
    
    return parents;
}

// jsTree 최하위노드 재귀검색
function fnTreeGetLeafNodes(treeObj, node, arrCollector) {
    var child_id_arr = node.children;    // 노드의 자식 노드들의 id 문자열이 담긴 어레이를 가져온다
    var arrCollector = (arrCollector) ? arrCollector : [];

    for (var i = 0; i < child_id_arr.length; ++i) {
        var node_id = child_id_arr[i];
        var nodeChild = treeObj.get_node(node_id);    // 트리의 노드 객체를 가져온다
        if (tree.is_leaf(node_id)) {
            // 잎노드일 경우 (최하위 노드)
            arrCollector.push({
                node_id: node_id,
                node_text: nodeChild.text,
                parents: fnGetParents(treeObj, nodeChild),
                corp_info_arr: []
            });
        } else {
            // 최하위 노드가 아니면 재귀호출
            arrCollector = fnTreeGetLeafNodes(treeObj, nodeChild, arrCollector);
        }
    }

    return arrCollector;
}

// 비동기 delay
function sleep(ms) {
    return new Promise((r) => setTimeout(r, ms));
}

// search 함수 호출 후 <table>에서 기업 정보 가져오기
async function fnGetCorpNamesFromTableSearch(index, corp_info_arr, node_id, delay_ms_table) {
    if (index > 1) {
        search(index);
        await sleep(delay_ms_table);
    }

    var corp_info_arr = (corp_info_arr) ? corp_info_arr : [];
    var table = document.getElementsByTagName("table")[0];
    var tbody = table.getElementsByTagName("tbody")[0];
    var tr_list = tbody.getElementsByTagName("tr");

    for (i = 0; i < tr_list.length; i++) {
        tr = tr_list[i];
        td = tr.getElementsByTagName("td")[0];
        span = td.getElementsByTagName("span")[0];
        a = span.getElementsByTagName("a")[0];
        // 기업 이름 (\r, \n, \t 문자 정규식 통해 치환)
        corp_name = a.text;
        corp_name = corp_name.replace(/(\r\n|\n|\r|\t)/gm,"");
        // 기업 고유 코드 (8자리)는 href 속성 문자열 파싱
        href = a.getAttribute('href');
        index1 = href.indexOf("'");
        index2 = href.slice(index1 + 1, href.length + 1).indexOf("'") + index1;
        unique_code = href.slice(index1 + 1, index2 + 1);

        info = {
            name: corp_name,
            code: unique_code,
            sector: node_id
        };
        corp_info_arr.push(info);
    }
    
    return corp_info_arr;
}

// 페이지 여러개에 대한 비동기 순차 테이블 조회
async function fnGetCorpNamesFromTableAll(corp_info_arr, node_id, delay_ms_table) {
    var pageInfo = document.getElementsByClassName("pageInfo");
    var pageSkip = document.getElementsByClassName("pageSkip")[0];
    try {
        var li_tags = pageSkip.getElementsByTagName("li");
        var arrIndex = [];
        for (i = 0; i < li_tags.length; i++) {
            arrIndex.push(i + 1);
        }
    
        await arrIndex.reduce((prevTask, currTask) => {
            return prevTask.then(() => fnGetCorpNamesFromTableSearch(
                currTask, corp_info_arr, node_id, delay_ms_table));
        }, Promise.resolve());
    } catch (error) {
        // 해당 종목에 회사가 아예 없으면 pageSkip 태그가 없다! (TypeError 발생)
        ;
    }
}

// 트리 노드 선택 후 테이블에서 기업 정보 가져오기
async function fnSelectNodeAndGetCorpNamesFromTableAll(leaf_node_dict, treeObj, delay_ms_node, delay_ms_table) {
    // 트리 노드 선택 - 딜레이
    var node_id = leaf_node_dict.node_id;
    treeObj.select_node(node_id);
    
    await sleep(delay_ms_node);

    // 테이블에서 모든 기업 정보 가져오기
    var corp_info_arr = []
    await fnGetCorpNamesFromTableAll(corp_info_arr, node_id, delay_ms_table);
    // 기업 정보를 가져온 뒤 결과를 딕셔너리 내의 어레이에 concat
    if (corp_info_arr.length > 0) {
        leaf_node_dict.corp_info_arr = leaf_node_dict.corp_info_arr.concat(corp_info_arr);
        console.log(leaf_node_dict.node_text, ": ", leaf_node_dict.corp_info_arr.length, "companies")
    } else {
        console.log(leaf_node_dict.node_text, ": Empty!", )
    }
    
    // 트리 노드 선택 해제
    treeObj.deselect_node(node_id);
}

// 호출 함수
async function fnProcess(leaf_node_arr, treeObj, delay_ms_node, delay_ms_table) {
    await leaf_node_arr.reduce((prevTask, currTask) => {
        return prevTask.then(() => fnSelectNodeAndGetCorpNamesFromTableAll(
            currTask, tree, delay_ms_node, delay_ms_table));
    }, Promise.resolve());
    
    console.log('Finished');
}

console.log('Start');
fn_toggleTab("business");  // 크롤링 시각화 위해 탭 변경 (선택사항)

/* 
* Step.1 트리의 모든 최하위 노드 가져오기 
*/
var tree = $j("#businessTree").jstree(true);
top_node = tree.get_node('all');
arr_leaf_nodes = fnTreeGetLeafNodes(tree, top_node);

/*
* Step.2 모든 최하위 노드들을 순차 조회하며 테이블에서 기업 정보 가져오기
*/
fnProcess(arr_leaf_nodes, tree, 1000, 1000);

이에 맞춰서 트리 클래스도 바꿔주자 (Company 클래스 추가)

 

[tree.py]

from typing import List, Union

class Company:
    _name: str = ''
    _unique_id: str = ''
    _sector: str = ''

    def __init__(self, name: str = '', unique_id: str = '', sector: str = ''):
        self._name = name
        self._unique_id = unique_id
        self._sector = sector

    def __ge__(self, other):
        return self._name > other.name

	def __repr__(self):
        return f'{self._name} ({self._unique_id}, {self._sector})'

    @property
    def name(self) -> str:
        return self._name

    @property
    def unique_id(self) -> str:
        return self._unique_id

    @property
    def sector(self) -> str:
        return self._sector

class TreeNodeBase:
    _corp_info_lst: List[Company]
    _ident: str = ''
    _text: str = ''

    def __init__(self, ident: str, text: str, corp_info_lst: List[Company] = None):
        self._ident = ident
        self._text = text
        self._corp_info_lst = list()
        if corp_info_lst is not None:
            self._corp_info_lst.extend(corp_info_lst)
            self._corp_info_lst.sort(key=lambda x: x.name)

    def getIdent(self) -> str:
        return self._ident

    def getText(self) -> str:
        return self._text

    def getCorpInfoList(self) -> List[Company]:
        result = list()
        for child in self.getChildren():
            corp_info_lst = child.getCorpInfoList()
            result.extend(corp_info_lst)
        result.extend(self._corp_info_lst)
        return result

    def setParentNode(self, node):
        pass

    def addChildNode(self, node):
        pass

    def hasChildNode(self, node) -> bool:
        return False

    def getChildren(self) -> list:
        return []

    def __repr__(self):
        return f'{self._text}({self._ident})'

class TreeNode(TreeNodeBase):
    _parent: Union[TreeNodeBase, None]
    _children: List[TreeNodeBase]

    def __init__(self, ident: str, text: str, corp_info_lst: List[Company] = None):
        super().__init__(ident, text, corp_info_lst)
        self._parent = None
        self._children = list()

    def setParentNode(self, node: TreeNodeBase):
        self._parent = node
        if not self._parent.hasChildNode(self):
            self._parent.addChildNode(self)

    def addChildNode(self, node: TreeNodeBase):
        self._children.append(node)

    def hasChildNode(self, node: TreeNodeBase) -> bool:
        return node in self._children

    def getChildren(self) -> List[TreeNodeBase]:
        return self._children

    def getChildNode(self, ident: int) -> Union[TreeNodeBase, None]:
        filterted = list(filter(lambda x: x.getIdent() == ident, self._children))
        if len(filterted) == 1:
            return filterted[0]
        else:
            return None

    def __eq__(self, other: TreeNodeBase):
        return self._ident == other.getIdent()


class Tree:
    _node_list: List[TreeNode]

    def __init__(self):
        self._node_list = list()  # 여러개의 최상위 노드를 담을 수 있게 일단은 구성

    def add_leaf_node(self, ident: str, text: str, parents: List[dict], corp_info_lst: List[Company] = None):
        iter_node: Union[TreeNode, None] = None

        for i, elem in enumerate(parents):
            temp_ident = elem.get('id')
            temp_text = elem.get('text')
            temp_node = TreeNode(temp_ident, temp_text)
            if iter_node is None:  # 최초 iteration
                if temp_node in self._node_list:  # 최상위 노드가 트리의 노드 리스트에 있을 경우
                    temp_node = list(filter(lambda x: x.getIdent() == temp_ident, self._node_list))[0]
                else:  # 최상위 노드가 트리의 노드 리스트에 없을 경우 (추가)
                    self._node_list.append(temp_node)
            else:  # 자식 노드 iteration
                if iter_node.hasChildNode(temp_node):  # 이전 노드의 자식으로 이미 포함되어 있을 경우
                    temp_node = iter_node.getChildNode(temp_ident)
                else:  # 이전 노드의 자식에 없을 경우 (추가)
                    iter_node.addChildNode(temp_node)
            iter_node = temp_node
        node = TreeNode(ident, text, corp_info_lst)
        iter_node.addChildNode(node)

    def add_leaf_node_dict(self, leaf_node_dict: dict):
        ident = leaf_node_dict.get('node_id')
        text = leaf_node_dict.get('node_text')
        parents = leaf_node_dict.get('parents')
        corp_info_arr = leaf_node_dict.get('corp_info_arr')
        corp_info_lst = []
        for elem in corp_info_arr:
            info = Company(elem.get('name'), elem.get('code'), elem.get('sector'))
            corp_info_lst.append(info)
        self.add_leaf_node(ident, text, parents, corp_info_lst)

    def add_leaf_nodes(self, leaf_node_dict_list: List[dict]):
        [self.add_leaf_node_dict(x) for x in leaf_node_dict_list]

    def printHierarchy(self):
        txt = ''

        def iter_print(node_: TreeNodeBase, level: int = 0):
            txt_node = '-' * (level * 4) + node_.getText() + f'({node_.getIdent()})\n'
            if len(node_.getChildren()) > 0:
                for child in node_.getChildren():
                    txt_node += iter_print(child, level + 1)
            return txt_node

        for node in self._node_list:
            txt += iter_print(node)

        print(txt)

    def getNodeList(self) -> List[TreeNode]:
        return self._node_list

새로 크롤링을 해준뒤 직렬화 (용량이 꽤 커졌다 ㅎㅎ)

result_list.pkl
3.18MB

제대로 가져왔는지 테스트해보자

import pickle
with open('./result_list.pkl', 'rb') as fp:
    node_list = pickle.load(fp)
tree = Tree()
tree.add_leaf_nodes(node_list)
In [1]: node_list[0].get('corp_info_arr')[0]
Out[1]: {'code': '00530255', 'name': '농업회사법인동주물산', 'sector': '01110'}

In [2]: node_list[-1].get('corp_info_arr')[-1]
Out[2]: {'code': '00339522', 'name': '太平洋시멘트', 'sector': '99'}

In [3]: tree.getNodeList()[0].getChildren()[0].getCorpInfoList()[0]
Out[3]: 농업회사법인동주물산 (00530255, 01110)

In [4]: tree.getNodeList()[0].getChildren()[0].getCorpInfoList()[1]
Out[4]: 농업회사법인보령스마트에너지 (01506477, 01110)

굳! 이제 각 잎노드(leaf node)가 회사명 뿐만 아니라 DART에서 사용되는 기업 고유 코드, 최하위 노드에 해당하는 섹터 id까지 정보를 함께 갖고 있다

 

변경사항에 맞춰서 treeview 테스트 코드도 수정해주자

 

[treeview.py]

from typing import List
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QResizeEvent
from PyQt5.QtWidgets import QWidget, QTreeWidget, QTreeWidgetItem, QTableWidget, QTableWidgetItem
from PyQt5.QtWidgets import QSplitter, QHeaderView, QAbstractItemView
from tree import Tree, TreeNodeBase, Company

class CorpTreeItem(QTreeWidgetItem):
    def __init__(self, node: TreeNodeBase):
        super().__init__()
        self._node = node
        self._sector = node.getIdent()
        text = node.getText() + ' (' + node.getIdent() + ')'
        self.setText(0, text)

    def getCorpInfoList(self) -> List[Company]:
        return self._node.getCorpInfoList()

    def getSector(self) -> str:
        return self._sector

class CorpTreeViewWidget(QWidget):
    def __init__(self):
        super().__init__()
        self._tree = QTreeWidget()
        self._treeitems: List[CorpTreeItem] = list()
        self._table = QTableWidget()
        self._splitter = QSplitter(Qt.Horizontal, self)
        self.initControl()
        self.initLayout()

    def initControl(self):
        self._tree.itemSelectionChanged.connect(self.onItemSelectionChanged)
        self._tree.setHeaderHidden(True)
        self._table.setColumnCount(3)
        self._table.setHorizontalHeaderLabels(['이름', '고유번호', '업종코드'])
        self._table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
        self._table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents)
        self._table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents)
        self._table.setSelectionBehavior(QAbstractItemView.SelectRows)
        self._table.setSelectionMode(QAbstractItemView.SingleSelection)
        self._table.itemDoubleClicked.connect(self.onTableItemDoubleClicked)

    def initLayout(self):
        self._splitter.addWidget(self._tree)
        self._splitter.addWidget(self._table)

    def setCorpTree(self, tree_: Tree):
        self._tree.clear()
        tree_node_list = tree_.getNodeList()
        for top_node in tree_node_list:
            def addTreeItem(tree_node: TreeNodeBase, tree_item: CorpTreeItem = None):
                if tree_item is None:
                    tree_item = CorpTreeItem(tree_node)
                    self._tree.addTopLevelItem(tree_item)
                    self._treeitems.append(tree_item)
                else:
                    tree_child_item = CorpTreeItem(tree_node)
                    tree_item.addChild(tree_child_item)
                    self._treeitems.append(tree_child_item)
                    tree_item = tree_child_item
                for child_node in tree_node.getChildren():
                    addTreeItem(child_node, tree_item)

            addTreeItem(top_node)

    def resizeEvent(self, a0: QResizeEvent) -> None:
        self._splitter.resize(self.width(), self.height())

    def onItemSelectionChanged(self):
        selitems = self._tree.selectedItems()
        corp_info_lst = []
        if len(selitems) > 0:
            for elem in selitems:
                if isinstance(elem, CorpTreeItem):
                    corp_info_lst.extend(elem.getCorpInfoList())
        self._table.setRowCount(len(corp_info_lst))
        for i in range(self._table.rowCount()):
            item = QTableWidgetItem()
            item.setText(corp_info_lst[i].name)
            item.setFlags(Qt.ItemFlags(int(item.flags()) ^ Qt.ItemIsEditable))
            item.setTextAlignment(Qt.AlignLeft | Qt.AlignVCenter)
            self._table.setItem(i, 0, item)
            item = QTableWidgetItem()
            item.setText(corp_info_lst[i].unique_id)
            item.setFlags(Qt.ItemFlags(int(item.flags()) ^ Qt.ItemIsEditable))
            item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
            self._table.setItem(i, 1, item)
            item = QTableWidgetItem()
            item.setText(corp_info_lst[i].sector)
            item.setFlags(Qt.ItemFlags(int(item.flags()) ^ Qt.ItemIsEditable))
            item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
            self._table.setItem(i, 2, item)

    def onTableItemDoubleClicked(self):
        item = self._table.selectedItems()
        rows = list(set([x.row() for x in item]))
        if len(rows) == 1:
            row = rows[0]
            sector = self._table.item(row, 2).text()
            tree_item = list(filter(lambda x: x.getSector() == sector, self._treeitems))
            if len(tree_item) == 1:
                self._tree.setCurrentItem(tree_item[0])
if __name__ == "__main__":
    import sys
    import pickle
    from PyQt5.QtWidgets import QApplication

    with open('./result_list.pkl', 'rb') as fp:
        node_list = pickle.load(fp)

    app = QApplication(sys.argv)
    wgt_ = CorpTreeViewWidget()
    wgt_.resize(600, 600)
    wgt_.show()
    corp_tree = Tree()
    corp_tree.add_leaf_nodes(node_list)
    wgt_.setCorpTree(corp_tree)
    app.exec_()

굳!

의도했던 대로 잘 가져온 것 같다 ㅎㅎ

12. GitHub 등록

아... 개발하고 글쓰고하느라 3일이나 지나버렸다... 하루만에 끝내려고 했는데;

글은 그냥 이쯤에서 마무리하고 이때까지 한 작업 내용을 깃허브에 올렸다

https://github.com/YOGYUI/DARTSectorWebCrawler

 

GitHub - YOGYUI/DARTSectorWebCrawler

Contribute to YOGYUI/DARTSectorWebCrawler development by creating an account on GitHub.

github.com

다른 어플리케이션에 이 프로젝트를 활용한 결과는 다른 글에서 다뤄보도록 하자 ㅎㅎ

 

아쉬운점

  • 노드 클릭, 테이블 페이지 탐색 등 페이지 렌더링을 기다려야 하므로 너무 느리다
  • 전체 크롤링 과정이 종료되었는지에 대한 확인 절차가 없어, 사용자가 임의로 결과 가져오기를 수행해야 한다
  • 전체 결과를 로컬에 남기기만 하고, DB 연동은 너무너무 귀찮아서 다음 과제로 미뤘다

뭔가 남은게 있냐하면 흠...

jstree를 갖다쓰는 웹페이지를 크롤링하는 방법에 대해 공유하게 되었다는 점? ㅎㅎ

 

업종별 기업 정보를 DB에 연동한 뒤, 야금야금 개발하고 있는 전자공시조회 OPENDART 어플리케이션에 접목시키는게 최종 목표!

끝~!

[시리즈]

웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (1)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (2)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (3)
웹크롤링 - DART 기업개황 업종별 기업 리스트 가져오기 (Final)

반응형