6 minute read

0. Intro

회사에서는 나를 ‘새로운 걸 맡겨도 중박은 치는 녀석’ 으로 인지한 듯 하다. 그래서 이번 과제도 우리 회사에서는 아무도 해보지 않은 것이었다. 자세한 내용을 말하지는 못하지만 대충 이런 과정을 거쳐야 하는 과제였다. (이해를 돕기 위해 덧붙이자면, 우리 회사는 가스 제조 회사이다.)

process

  1. Raspberry Pi 가 CCTV 로부터 영상을 받아 실시간으로 이미지를 처리한다.
  2. 이미지 처리해서 나오는 결과는 정수값으로, 이 정수값을 DCS 로 보낸다.

아마 이 글을 검색해서 들어온 사람들은 DCS 가 뭔지 아니까 검색했을 것이겠지만, 모르는 사람을 위해 간단히 설명하자면 DCS 는 Distributed Control System 이다. 여러개의 PLC 를 분산하여 전체 공정을 감시하고 제어하는 것으로 생각하면 된다. PLC 는 또 뭐냐면, Programmable Logic Controller 이다. 개별 공정을 제어하는 장치이다. 그러니까 개별 공정을 제어하는 PLC들을 모아 통합관리하는 것을 DCS 라고 부른다고 생각하면 된다.

python 으로 프로젝트를 좀 해봤거나, Raspberry Pi 를 좀 갖고 놀아본 사람이라면 CCTV 영상은 어떻게 받아올 지 감이 올 것이다. 그런데 python 으로 DCS 한테 신호를 어떻게 보내지…? DCS 는 애초에 공장이 있는 회사에 일하지 않고서야 접할 일도 없기 때문에 애매한 것이다. 나 역시 마찬가지였다. 게다가 나는… 예전 포스트에서도 한번 강조한 바 있긴 하지만, 나는 컴퓨터공학과 출신이 아니다… 상경대 통계학과 나왔다… 통신에 대해서는 문외한이라는 말이다. 나한테 이런 일을 맡기다니 아마 내가 잘못 이해한 것이 있을 수도 있고, 글에 틀린 부분이 있을 수도 있다. 혹시 그런 것을 발견하신 분은 꼭 댓글로 정정해주시길 바란다!! 아무튼 이렇게 하니 잘 되더라- 하는 기록이니 나와 비슷한 상황에 놓인 사람들을 위해 조금의 도움이라도 될까하여 이 글을 쓴다.

1. 어디로 연결할까요

1.1. 내가 쓰는 제품을 확인해보자

아까부터 Raspberry Pi 라고 명시하기는 했으나 나는 Comfile Pi 제품을 사용했다. comfilepi

산업용으로 개조된 제품이라 안심이 된다고 할까… 그랬기 때문이다. 이 제품에는 포트가 다음과 같이 있다.

comfilepiback

이걸 우리 회사 전기계기팀 분들께 보여드리니, 이더넷 포트로 CCTV 연결을 하고 RS485 포트로 DCS 연결을 하자고 하셨다. 이더넷 포트는 아는데, 어… RS485는 어떻게 연결하는거지…

1.2. RS485? RS232?

연결은 그냥 + 는 + 끼리, - 는 - 끼리 하면 된다. 그런데 python 에서 RS485 연결을 어떻게 인식할까? 포트 이름을 알아내서 코드에 명시해주어야 한다. 나는 개조된 기기를 사용했기 때문에 아래 Documentation 링크에서 이름을 확인했다. Documentation 을 보기 쉽게 온라인에 정리해주신 Comfile Pi 개발자분들께 감사를…

Documentation 이동

RS485 는 /dev/serial1 임을 확인했다. 나중에 코드 작성할 때 써야하니 기억해두자.

1.3. CCTV 주소는…?

전기계기팀에 CCTV 주소를 물어보면 IP 주소와 ID, Password 정도를 알려줄 것이다. 하지만 python open-cv 에서 비디오를 읽어오려면 이것보다 더 많은 정보가 필요할 것이다.

먼저 웹브라우저에서 IP 주소로 접속해서 어떤 뷰어를 사용하는지 보자. 뷰어 이름으로 구글링하면 어딘가에는 답이 있다…! 아무거나 써서 시도했다가는 나처럼 1시간동안 왜 안되지 흑흑.. 하고 있게 될 것이다

2. 어떻게 통신할까요

2.1. Modbus Protocol

통신규약이라는 것이 있다. DCS 랑 Raspberry Pi 가 서로 얘기하려면 아무래도 얘기하는 규칙이 같아야 할 것이기 때문이다. 우리 회사의 DCS 는 Modbus Protocol 로 통신이 된다고 했기 때문에, 이걸 기준으로 설명해보겠다.

Modbus Protocol 이 뭔지, 왜 쓰는지, 원리 등등은 구글링하면 나오기 때문에 생략하겠다. 우리는 그저 통신이 잘 되기만 하면 되는 것이다. 그러려면 알아야 할 것들과 DCS 와 맞춰야 할 것이 몇가지 있다.

  1. Modbus Protocol 에는 SlaveMaster 개념이 있다. 뭔진 잘 모르겠지만 반드시 나의 라즈베리파이가 Slave 인지 Master 인지는 확인해주자. 어떤 역할인지에 따라 코드가 완전히 달라진다. 우리 DCS 는 항상 Master 로만 설정 가능하다고 해서 나는 Slave 기준의 코드를 작성했다. 대충 구글링해서 나오는 Modbus 통신 코드는 대체로 Master 기준의 코드임을 유의하자. 그리고 내가 이해하기로는 Slave 에서 신호를 보내는 것이라기보다는 Slave 의 어떠한 상태를 Master 이 읽어낸다는 느낌이었다.

  2. baudrate, parity, stopbits, bytesize : 시리얼 통신 시 통신하는 두 기기에서 서로 맞춰줘야하는 정보이다. 각각이 어떤 의미인지 설명은 나도 잘 모름 생략하겠다. 보통 Slave 에서 정하고 Master 이 Slave 의 정보에 맞춰주는 것 같다. 내가 Slave 니까 사람들이 많이들 쓰는 값들로 정한 다음 DCS 엔지니어에게 말해주도록 하자. 나의 경우 9600, N, 1, 8 로 정했다.

  3. function code & address : 모드버스 통신 과정에서 필요한 내용이다. 어디에 신호를 저장했는지 DCS 엔지니어에게 알려주도록 하자.

3. 이제 코드를 짜보자 (Slave 기준)

pymodbus 를 사용하였다. pyserial 은 Master 기준 코드는 작성이 가능하지만 Slave 서버는 만들지 못한다고 들었는데 아닐 수도 있다. 아무튼 나는 (오로지 나의 편의를 위해) 아래와 같은 클래스를 만들어 썼다.

from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext

class SlaveServer() :

    def __init__(self) :
        self.store = ModbusSlaveContext(
            di=ModbusSequentialDataBlock(0, [0] * 100),  # Discrete Inputs
            co=ModbusSequentialDataBlock(0, [0] * 100),  # Coils
            hr=ModbusSequentialDataBlock(0, [0] * 100),  # Holding Registers
            ir=ModbusSequentialDataBlock(0, [0] * 100),  # Input Registers
        )
        self.context = ModbusServerContext(slaves=self.store, single=True)
        self.identity = ModbusDeviceIdentification()

    def update_register(self,val,address=0,function_code=3) :
        self.context[0].setValues(function_code,address,[val])

    def get_register_value(self,address,length=1,function_code=3) :
        return self.context[0].getValues(function_code,address,length)

코드 설명 들어갑니다잉.

Modbus 의 기본 개념을 차치하고, 우리는 Slave Server 이라는 서랍장을 가지고 있다고 생각하자. 서랍장은 다음과 같이 4칸으로 되어있다.

chest

이 서랍의 왼쪽 칸들은 모두 단일 비트 데이터만을 넣을 수 있고, 오른쪽 칸들은 16비트 데이터만을 넣을 수 있다. 그리고 윗줄은 읽기만 가능하고, 아랫줄은 읽기와 쓰기가 모두 가능하다. 이러한 서랍장을 위 코드에서 self.store 에서 정의하였다. 서랍장을 만들 때, 서랍 한개를 열어보면 100개의 칸이 있고 각 칸 안에 전부 0을 넣어 initiate 해 두었다.

말인즉 Holding Register 칸은 16비트 데이터를 읽고 쓰는 것이 가능하다. 우리 과제에서는 정수를 사용하기 때문에 무난하게 Holding Register 칸을 사용하는 것이 좋을 것이다. 그렇다면 holding register 를 사용할 것이라는 걸 모드버스에서는 어떻게 명시할까?

function code 라는 것이 있는데, 자세한 내용은 잘 정리한 글이 있어 링크를 공유한다. 방금의 서랍 사진에서 이름 아래 function code 를 써보면 다음과 같다.

chest-with-number

우리는 Holding Register 를 사용하기 때문에 나중에 function code=3 을 parameter 에 넣어주면 된다.

Holding Register 칸을 열어서 위에서 보면 이런 상태이다.

hr

address 는 list 나 tuple 의 index 로 생각해주면 된다. 만약 내가 결과 integer 를 맨 첫 칸에 저장한다면, DCS 엔지니어에게 ‘Holding Register 에 저장해놨고 Address 는 0입니다’ 라고 말씀드리면 된다.

내가 짠 클래스가 initialize 된 상태에서 update_register(1,0,3) 을 실행한다면, holding register 서랍은 다음과 같이 상태가 변할 것이다.

hr-updated

Master은 이렇게 업데이트되는 Slave 의 Holding Register 서랍 0번째 칸을 주기적으로 확인하는 것이다.

그럼 이러한 클래스가 정의되어있다고 가정하고, Raspberry Pi 를 Slave 로 만든 상태에서 주기적으로 holding register 를 업데이트하는 코드를 알아보자.

import threading
import time
from pymodbus.server import StartSerialServer
from pymodbus.transaction import ModbusRtuFramer

#import logging

#logging.basicConfig()
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)

def update_registers(slave):
    while True:
        register_value = slave.get_register_value(0, 1 ,3)[0]  # Read the current value of register 0
        new_register_value = (register_value + 1) % 65536  # Increment the value

        # Update the holding register with the new value
        slave.update_register(new_register_value,0, 3)
        slave.update_register(0,1,3)
        time.sleep(1)

serial_info = {
        'port' : '/dev/serial1',
        'baudrate' : 9600,
        'parity' : 'N',
        'stopbits' : 1,
        'bytesize' : 8,
    }

slave = SlaveServer()
server_kwargs = {
    'context':slave.context,
    'framer':ModbusRtuFramer,
    'identity':slave.identity}
server_kwargs.update(serial_info)
print("Initialized")

# Start Threading
server_thread = threading.Thread(
    target=StartSerialServer, 
    kwargs = server_kwargs
    )
server_thread.start()
update_thread = threading.Thread(
    target=update_registers, 
    args=(slave,),
    )
update_thread.start()
try :
    update_thread.join()
except KeyboardInterrupt :
    print("Shutting Down...")
    server_thread.join(timeout=1)

주석 처리한 logging 부분은 디버깅을 위해서는 실행 시 주석을 풀어주도록 하자.

Slave Server 를 실행시키면서 동시에 Slave Server 내 holding register 를 업데이트해줘야 하기 때문에 multithreading 을 해줘야 한다. 예시 코드의 update_registers 는 그저 첫번째 holding register 의 값을 점점 올리기만 하는 것이기 때문에, 이 부분을 각자 필요한 기능으로 변경해주면 된다.

4. 꼭 테스트를 하고 가세요

공장에 연결하는 경우 인터넷도 안되고 마땅한 책상도 없는 현장 서버실 가서 오류가 나면 골치 아프기 짝이 없기 때문에, 집에서 충분히 테스트를 하고 가야한다. 출장 계획을 잡기 전에 임의의 IP 카메라와 컴퓨터로 최대한 비슷하게 시뮬레이션을 해보고 문제가 없다는 걸 확인하자.

나는 DCS 대신 내 개인 맥북을 Modbus Master 로 두고 USB-RS485 컨버터로 맥북과 라즈베리파이를 연결한 후 테스트를 진행했다.

맥북에서 라즈베리파이의 holding register 를 읽는 테스트 코드는 다음과 같다.

from pymodbus.client import ModbusSerialClient
import time

# 시리얼 설정
serial_port = 'RS485연결된 포트 이름' 
baudrate = 9600
parity = 'N'
stopbits = 1
bytesize = 8

client = ModbusSerialClient(
    method='rtu',
    port=serial_port,
    baudrate=baudrate,
    bytesize=bytesize,
    parity=parity,
    stopbits=stopbits,
    timeout=1
)

connected = client.connect()

if connected:
    slave_address = 1
    register_address = 0
    num_registers = 1
    while True :
        response = client.read_holding_registers(register_address, num_registers, unit=slave_address)
        if response.isError():
            print("Error reading registers")
        else:
            print("Register values:", response.registers)
        time.sleep(1)
else:
    print("Failed to connect to the Modbus slave")

client.close()

여기서 포트 이름은 맥북 터미널에서

ls /dev/tty.*

명령어로 확인할 수 있다. 출력된 이름들 중 /dev/tty.usbserial-XXXXXXX 형식의 이름을 복사해서 사용하면 된다.

Leave a comment