다양한 환경에서 노드간 통신이 잘 이루어지는가?

(1) 노드간 동기화 문제 해결

  • 숫자를 순차적으로 보내서 누락된게 있는지 확인해보자
    • 그런데 보낸 쪽이 안 보낸건지, 받는 쪽이 못 받은건지 구분할 수 있나?
  • 중간보다는 맨 처음과 끝에서 누락되는지 잘 살펴보자
  • 받는 쪽을 먼저 실행시켜 놓고, 그 다음에 보내는 쪽을 실행시켜야 하지 않나?
    • roslaunch로는 노드를 순서대로 실행시킬 수 없으니 rosrun을 사용하자
    • 더 좋고 편한 방법은 없을까?
실제 테스트
  • sender_serial.py
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32


pub = rospy.Publisher('my_topic', Int32)

rate = rospy.Rate(2)
count = 1

while not rospy.is_shutdown():
	count = count + 1
  • receiver_serial.py
#!/usr/bin/env python

import rospy
from std_msgs.msg import Int32

def callback(msg):
	print msg.data

sub=rospy.Subscriber('my_topic', Int32, callback)

실행 결과
  • 처음 토픽을 못 받고 2부터 찍히는 것을 확인할 수 있었음


  • 받는 쪽을 먼저 실행시켜 놓고, 그 다음에 보내는 쪽을 실행시켜야 하지 않나?

    • 서로 통신하기 위해 각각 Subsriber와 Publisher가 실행이 되면 Core에게 정보를 던져주고 Core가 정보를 받아 둘을 연결시켜주는 방법인데 Subscriber을 먼저 실행시키든, Publisher을 먼저 실행시키든 Core에서 둘 다 실행된 것을 확인하고 매칭을 시켜주는 것이므로 결국에는 Subscriber을 미리 먼저 실행시킨다고 해서 해결되는 문제가 아님.
    • Publisher가 Subscriber이 준비가 되었는지 확인을 하고 Topic을 던지는 것이 더 중요함
  • 노드가 등록이 됐는지 확인하는 함수를 이용

    • get_num_connections() : 이 topic에 대해서 기다리고 있는 ROS node의 개수를 보내 주는 함수
      • 값이 0일 경우에는 topic을 받을 node가 아무것도 없음을 알고 준비가 안되었다는 것을 확인할 수 있음
      • 값이 1일 경우에는 topic을 받을 node가 준비가 되었다는 것을 확인하고 보낼 수 있음
      • 예를 들어 5명에게 보내야 할 경우에는 값이 5인 것(5개의 노드가 준비된 것)을 확인하고 보내면 됨
해결 방법
  • Subsriber가 연결되었음을 확인하고 그 다음에 메시지를 전송함
  • sender_serial.py를 아래와 같이 수정함
#!/usr/bin/env python
import rospy
from std_msgs.msg import Int32


pub = rospy.Publisher('my_topic', Int32)

rate = rospy.Rate(2)
count = 1

while (pub.get_num_connections() == 0 ) : # receiver 노드가 토픽을 수신할 수 있을 때 까지 기다림

while not rospy.is_shutdown():
	count = count + 1
해결 후 실행 결과
  • 맨 처음 토픽인 1부터 순차적으로 출력하는 것을 확인할 수 있었음


한가지 추가 된 궁금점


  • 근데 왜 코드를 수정하기 전에 Sender을 먼저 실행시키고, 나중에 Receiver을 실행시키면 숫자가 2보다 더 큰 숫자로 시작하는 걸까?
  • Core에서 Topic 전송이 Publisher과 Subsriber을 매칭을 해준 후에 실행되는 것이고 Sender의 코드는 그 전부터 계속 돌고 있어서 그런 것인가?

(2) ROS 전송속도

  • 파이썬 파일 2개랑 런치파일 1개를 만들어서 테스트 해보자

    • sender_speed.py

    • receiver_speed.py

    • sr_speedl.launch

  • 정해진 크기의 데이터를 반복해서 왕창 보내자

    • 보내는 쪽은 10분 동안 시간을 정해 놓고 쉴 새 없이 보내자
    • 10분 동안 몇 바이트를 보냈는지 체크해서 송신 속도를 계산해보자
    • 받는 쪽도 10분 동안 시간을 정해놓고 모두 얼마나 받았는지 체크해서 수신 속도를 계산하자
  • 받는 쪽이 없으면 어떻게 될까?

    • 토픽에 대해서 구독하는 노드가 없으면 송신속도가 더 빨라지나?
실제 테스트
  • sender_speed.py
#! usr/bin/env python

import rospy
import std_msgs.msg import String

name = "sender"
pub_topic = "long_string"

pub = rospy.Publisher(pub_topic,String,queue_size=1)

hello_str = String()
rate = rospy.Rate(1)

pub_size =1000000 # 1M byte
#pub_size =5000000 # 5M byte
#pub_size =10000000 # 10M byte
#pub_size =20000000 # 20M byte
#pub_size =50000000 # 50M byte

my_string =""

for i in range(pubsize):
	my_string +="#"
while not rospy.is_shutdown():
	hello_str.data = my_string + ":" + str(rospy.get_time())
	# rospy.loginfo(str(hello_str.data).split(":")[1])

  • receiver_speed.py
#! /usr/bin/env python

import rospy
from std_msgs.msg import String

name = "receiver"
sub_topic = "long_string"

def callback(data):
    current_time = str(rospy.get_time())
    arrival_data = str(data.data).split(":")

    time_diff = float(current_time) - float(arrival_data[1])
    string_size = len(arrival_data[0])
    rospy.loginfo(str(string_size) + " byte: " + str(time_diff) + " second")
    rospy.loginfo("speed: " + str(float(string_size) / time_diff) + "byte/s")

rospy.init_node(name, anonymous = True)
rospy.Subscriber(sub_topic, String, callback)

실행 결과
  • 1Mbyte
  • 전송시 약 0.01초 소요, 전송속도 : 약 100MBps


  • 5M byte
  • 전송시 약 0.01초 소요, 전송속도 : 약 500MBps(166MBps,714MBps, 250MBps도 보임)


  • 10M byte
  • 전송시 약 0.03초 소요, 전송속도 : 약 333MBps


  • 20M byte
  • 전송시 약 0.05초 소요, 전송속도 : 약 400MBps


  • 50M byte
  • 전송시 약 0.25초 소요, 전송속도 : 약 200MBps


(3) ROS 처리 지연 문제

  • 파이썬 파일 2개랑 런치파일 1개를 만들어서 테스트 해보자

    • sender_overflow.py
      • sender라는 이름으로 노드 생성

      • 발행하는 토픽 이름은 my_topic 타입은 Int32

      • 1초에 1000번씩 숫자를 1씩 증가해서 토픽을 발행함

    • receiver_overflow.py
      • Receiver라는 이름으로 노드 생성

      • 구독자의 콜백함수 안에 시간 많이 걸리는 코드를 넣어서 토픽 처리에 시간이 걸리도록함

      • Sender로 부터 my_topic을 화면에 출력하여 토픽의 누락 여부를 확인함

        • 1씩 숫자가 증가하지 않으면 뭔가 문제가 있다는 것을 알 수 있음
    • sr_overflow.launch
  • 받는 쪽을 버벅되게 만들어 놓고 데이터를 왕창 보내자

    • 구독자의 콜백함수 안에 시간 많이 걸리는 코드를 넣어서 토픽 처리에 시간이 걸리도록 만들자
  • 콜백 함수가 끝나지 않았는데 토픽이 새로 도착하며 어떻게 되는가?

    • 도착한 토픽은 임시로 어딘가에 쌓이는가? 그걸 나중에 꺼내서 처리할 수 있는가?
    • 아님 그냥 없어지는가? 한 번 못 받은 토픽은 영영 못 받는 것인가?
    • 발행자는 이 사실을 아는가? 알려줄 수 있는 방법이 있는가?
실제 테스트
  • sender_overflow.py
#!/usr/bin/env python

import rospy
from std_msgs.msg import Int32

name = "sender"
pub_topic = "my_topic"

pub = rospy.Publisher(pub_topic, Int32, queue_size = 1)

rate = rospy.Rate(1000)
count = 1

while (pub.get_num_connections() == 0):
while not rospy.is_shutdown():
    count = count + 1
  • receiver_overflow.py
#! /usr/bin/env python

import rospy
from std_msgs.msg import Int32

name = "receiver"
sub_topic = "my_topic"

def callback(msg):
    rospy.loginfo("callback is being processed")
    rospy.sleep(5) # 5초간 잠을 재움
    print msg.data

rospy.Subscribe(sub_topic, Int32, callback, queue_size = 1)
실행 결과
  • 받은 토픽이 1씩 증가하지 않는 것을 확인할 수 있음

  • 중간에 잃어버린다는 결론을 내릴 수 있음


해결 방법
  • receiver_overflow.py 코드를 rospy.Subscriber(sub_topic, Int32, callback, queue_size = 30000) 로 수정
해결 후 실행결과
  • 1부터 순차적으로 출력되는 것을 확인할 수 있음


(4) ROS 타임 슬롯 문제

  • 1초에 5번 반복하게 하고 작업시간이 0.2초가 넘게 만들어보자

    • Rate(5)로 세팅하고 sleep() 앞에 들어간 작업코드의 수행시간을 늘려보자
    • 늘렸다 줄었다 변동성 있게 해보자. 입력값을 받아서 이걸 조정할 수 있게 만들까?
  • 1초에 5번 규칙을 지킬 수 없으면 어떻게 할까?

    • 앞에서부터 쭉 밀리는 식으로 일을 할까?
    • 쉬는 시간을 조정할까?
    • 이번에 3번만 하고 다음번을 기약할까?
실제 테스트
  • teacher_int32_job.py
#!/usr/bin/env python

import rospy
import time
from std_msgs.msg import Int32

def do_job(time):
    for i in range(0, time):
        i = i + 1

def list_append_time():

pub = rospy.Publisher('msg_to_students', Int32, queue_size = 0)
rate = rospy.Rate(5)

while not rospy.is_shutdown():
    start = []
    end = []
    sleep = []
    num = input("input count number> ")
    total_start = time.time()

    for j in range(5):
        start_time = time.time()
        end_time = time.time()

        sleep_time = time.time()
    total_end = time.time()

    for t in range(5):
        sleep[t] = sleep[t] - end[t]
        end[t] = end[t] - start[t]
    for result in range(5):
        print "spend time > ", round(end[result], 4), 's'
        print "sleep time > ", round(sleep[result], 4), 's'
    print "------------------------"
    print "total time > ", round((total_end - total_start), 4), 's'
    print "------------------------\n\n"
실행 결과
  • 하는 일의 양이 적거나 아주 많을 때
    • 10000 경우에는 작업을 0.2초 안에 완료하고 남은 시간은 sleep
    • 50000 경우에는 작업을 0.2초 넘겨서 완료하고 남은시간이 없으므로 sleep을 못함 결국 시간을 넘겨서 전체 작업 완료


  • 하는 일의 양이 타임슬롯의 경계를 왔다 갔다 할 경우
    • 38000 경우에는 작업을 0.2초 안에 완료하고 남은 시간을 sleep 함
    • 42000 경우에는 작업이 0.2초를 넘겨서 끝나 뒤에서 sleep 시간을 단축하여 보충함


(5) ROS 노드의 순차 실행

  • 파이썬 파일 5개랑 런치파일 1개를 만들자

    • first.py
    • second.py
    • third.py
    • fourth.py
    • receiver.py
    • sr_order.launch
  • 순서대로 receiver에 메시지를 보내도록 만듬

    • receiver는 도착한 순서대로 출력함 (결과적으로 First-Second-Third-Fourth 순서대로 출력해야 함)
    • 앞에 있는 노드가 Topic을 보내기 전에 먼저 Topic을 보내면 안됨
  • 어떻게 동기를 맞추고 순서를 맞출 수 있을까?

    • Launch 파일을 이용해서 할 수 있을까?
    • ROS의 도움으로 할 수 있을까?
    • 프로그래밍을 따로 작성해야 하는 것인가?
실제 테스트
  • sr_order.launch
    <node name = "receiver" pkg = "order_test" type = "receiver.py" output = "screen"/>
    <node name = "first" pkg = "order_test" type = "first.py" output = "screen"/>
    <node name = "second" pkg = "order_test" type = "second.py" output = "screen"/>
    <node name = "third" pkg = "order_test" type = "third.py" output = "screen"/>
    <node name = "fourth" pkg = "order_test", type = "fourth.py" output = "screen"/>
  • receiver.py
#! /usr/bin/env python

import rospy
from std_msgs.msg import String

name = "receiver"
pub_topic = "start_ctl"
sub_topic = "msg_to_receiver"

def callback(data):
    rospy.loginfo("i heard %s", data.data)

rospy.Subscriber(sub_topic, String, callback)
pub = rospy.Publisher(pub_topic, String, queue_size = 1)

rate = rospy.Rate(10)
hello_str = String()

sq = ["first", "second", "third", "fourth"]
pub_msg = String()

for i in sq:
    pub_msg.data = i + ":go"

  • first.py, second.py, third.py, fourth.py
#!/usr/bin/env python

import rospy
from std_msgs.msg import String

name = "first"  # second, third, fourth 
pub_topic = "msg_to_receiver"
sub_topic = "start_ctl"

OK = None

def ctl_callback(data):
    global OK
    OK = str(data.data)

rospy.Subscriber(sub_topic, String, ctl_callback)

while True:
    if OK == None:
    d = OK.split(":")
    if (len(d) == 2) and (d[0] == name) and (d[1] == "go"):
pub = rospy.Publisher(pub_topic, String, queue_size = 1)
rate = rospy.Rate(2)
hello_str = String()

while not rospy.is_shutdown():
    hello_str.data = "my name is " + name
실행 결과
  • first 부터 fourth 까지 순차적으로 실행되는 것을 확인할 수 있음


  • rqt_graph
