用 Python 监控 NASA TV 直播画面

共 22626字,需浏览 46分钟

 ·

2021-05-21 02:37

本文分享一个名为"Spacestills"的开源程序,它可以用于查看 NASA TV 的直播画面(静止帧)。
演示地址:
https://replit.com/@PaoloAmoroso/spacestills


在Replit上运行的Spacestills主窗口
这是一个具有GUI的简单系统,它访问feed流并从Web下载数据。该程序仅需350行代码,并依赖于一些开源的Python库。
关于程序
Spacestills会定期从feed流中下载NASA TV静止帧并将其显示在GUI中。
该程序可以校正帧的纵横比,并将其保存为PNG格式。它会自动下载最新的帧,并提供手动重新加载,禁用自动重新加载或更改下载频率的选项。
Spacestillsis是一个比较初级的版本,但是它可以做一些有用的事情:捕获并保存NASA TV直播的太空事件图像。太空爱好者经常在社交网络或论坛共享他们从NASA TV手动获取的屏幕截图。Spacestills节省了使用屏幕捕获工具的时间,并保存了可供共享的图像文件。您可以在Replit上在线运行Spacestills。
开发环境
笔者用Replit开发了Spacestills。Replit是云上的开发,部署和协作环境,它支持包括Python在内的数十种编程语言和框架。作为Chrome操作系统和云计算爱好者,笔者非常喜欢Replit,因为它可以在浏览器中完全正常运行,无需下载或安装任何内容。
资源和依赖包
Spacestills依赖于一些外部资源和Python库。
  • NASA TV feed 流
肯尼迪航天中心的网站上有一个页面,其中包含精选的NASA视频流,包括NASA电视公共频道。feed流显示最新的静止帧并自动更新。
每个feed都带有三种尺寸的帧,Spacestills依赖于具有704x408像素帧的最大NASA TV feed流。最大更新频率为每45秒一次。因此,检索最新的静止帧就像从feed流的URL下载JPEG图像一样简单。
原始图像被垂直拉伸,看起来很奇怪。因此,该程序可以通过压缩图像并生成未失真的16:9版本来校正纵横比。
  • Python
因PySimpleGUI的原因需要安装 Python 3.6 版本。
  • 第三方库
Pillow:图像处理
PySimpleGUI:GUI框架(Spacestills使用Tkinter后端)
Request:HTTP请求
完整代码

from io import BytesIO
from datetime import datetime, timedelta
from pathlib import Path
import requests
from requests.exceptions import Timeout
from PIL import Image
import PySimpleGUI as sg


FEED_URL = 'https://science.ksc.nasa.gov/shuttle/countdown/video/chan2large.jpg'

# Frame size without and with 16:9 aspect ratio correction
WIDTH = 704
HEIGHT = 480
HEIGHT_16_9 = 396

# Minimum, default, and maximum autoreload interval in seconds
MIN_DELTA = 45
DELTA = MIN_DELTA
MAX_DELTA = 300


class StillFrame():
    """Holds a still frame.
    
    The image is stored as a PNG PIL.Image and kept in PNG format.
    Attributes
    ----------
        image : PIL.Image
            A still frame
        original : PIL.Image
            Original frame with wchich the instance is initialized, cached in case of
            resizing to the original size
    
    Methods
    -------
        bytes : Return the raw bytes
        resize : Resize the screenshot
        new_size : Calculate new aspect ratio
    """


    def __init__(self, image):
        """Convert the image to PNG and cache the converted original.
        Parameters
        ----------
            image : PIL.Image
                Image to store
        """

        self.image = image
        self._topng()
        self.original = self.image

    def _topng(self):
        """Convert image format of frame to PNG.
        Returns
        -------
            StillFrame
                Frame with image in PNG format
        """

        if not self.image.format == 'PNG':
            png_file = BytesIO()
            self.image.save(png_file, 'png')
            png_file.seek(0)
            png_image = Image.open(png_file)
            self.image = png_image
        return self

    def bytes(self):
        """Return raw bytes of a frame image.
        
        Returns
        -------
            bytes
                Byte stream of the frame image
        """

        file = BytesIO()
        self.image.save(file, 'png')
        file.seek(0)
        return file.read()

    def new_size(self):
        """Return image size toggled between original and 16:9.
        
        Returns
        -------
            2-tuple
                New size
        """

        size = self.image.size
        original_size = self.original.size
        new_size = (WIDTH, HEIGHT_16_9) if size == original_size else (WIDTH, HEIGHT)
        return new_size

    def resize(self, new_size):
        """Resize frame image.
        
        Parameters
        ----------
            new_size : 2-tuple
                New size
        Returns
        -------
            StillFrame
                Frame with image resized
        """

        if not(self.image.size == new_size):
            self.image = self.image.resize(new_size)
        return self
    

def make_blank_image(size=(WIDTH, HEIGHT)):
    """Create a blank image with a blue background.
    
    Parameters
    ----------
        size : 2-tuple
            Image size
    
    Returns
    -------
        PIL.Image
            Blank image
    """

    image = Image.new('RGB', size=size, color='blue')
    return image


def download_image(url):
    """Download current NASA TV image.
    Parameters
    ----------
        url : str
            URL to download the image from
    
    Returns
    -------
        PIL.Image
            Downloaded image if no errors, otherwise blank image
    """

    try:
        response = requests.get(url, timeout=(0.50.5))
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
        else:
            image = make_blank_image()
    except Timeout:
        image = make_blank_image()
    return image


def refresh(window, resize=False, feed=FEED_URL):
    """Display the latest still frame in window.
    
    Parameters
    ----------
        window : sg.Window
            Window to display the still to
        feed : string
            Feed URL
    
    Returns
    -------
        StillFrame
            Refreshed screenshot
    """

    still = StillFrame(download_image(feed))
    if resize:
        still = change_aspect_ratio(window, still, new_size=(WIDTH, HEIGHT_16_9))
    else:
        window['-IMAGE-'].update(data=still.bytes())
    return still


def change_aspect_ratio(window, still, new_size=(WIDTH, HEIGHT_16_9)):
    """Change the aspect ratio of the still displayed in window.
    
    Parameters
    ----------
        window : sg.Window
            Window containing the still
        new_size : 2-tuple
            New size of the still
    
    Returns
    -------
        StillFrame
            Frame containing the resized image
    """

    resized_still = still.resize(new_size)
    window['-IMAGE-'].update(data=resized_still.bytes())
    return resized_still


def save(still, path):
    """Save still to a file.
    Parameters
    ----------
        still : StillFrame
            Still to save
        path : string
            File name
    
    Returns
    -------
        Boolean
            True if file saved with no errors
    """

    filename = Path(path)
    try:
        with open(filename, 'wb'as file:
            file.write(still.bytes())
        saved = True
    except OSError:
        saved = False
    return saved


def next_timeout(delta):
    """Return the moment in time right now + delta seconds from now.
    Parameters
    ----------
        delta : int
            Time in seconds until the next timeout
    
    Returns
    -------
        datetime.datetime
            Moment in time of the next timeout
    """

    rightnow = datetime.now()
    return rightnow + timedelta(seconds=delta)


def timeout_due(next_timeout):
    """Return True if the next timeout is due.
    Parameters
    ----------
        next_timeout : datetime.datetime
    
    Returns
    -------
        bool
            True if the next timeout is due
    """

    rightnow = datetime.now()
    return rightnow >= next_timeout


def validate_delta(value):
    """Check if value is an int within the proper range for a time delta.
    Parameters
    ----------
        value : int
            Time in seconds until the next timeout
    
    Returns
    -------
        int
            Time in seconds until the next timeout
        bool
            True if the argument is a valid time delta
    """

    isinteger = False
    try:
        isinteger = type(int(value)) is int
    except Exception:
        delta = DELTA
    delta = int(value) if isinteger else delta
    isvalid = MIN_DELTA <= delta <= MAX_DELTA
    delta = delta if isvalid else DELTA
    return delta, isinteger and isvalid


LAYOUT = [[sg.Image(key='-IMAGE-')],
          [sg.Checkbox('Correct aspect ratio', key='-RESIZE-', enable_events=True),
           sg.Button('Reload', key='-RELOAD-'),
           sg.Button('Save', key='-SAVE-'),
           sg.Exit()],
          [sg.Checkbox('Auto-reload every (seconds):', key='-AUTORELOAD-',
                       default=True),
           sg.Input(DELTA, key='-DELTA-', size=(31), justification='right'),
           sg.Button('Set', key='-UPDATE_DELTA-')]]


def main(layout):
    """Run event loop."""
    window = sg.Window('Spacestills', layout, finalize=True)
    current_still = refresh(window)

    delta = DELTA
    next_reload_time = datetime.now() + timedelta(seconds=delta)

    while True:
        event, values = window.read(timeout=100)
        if event in (sg.WIN_CLOSED, 'Exit'):
            break
        elif ((event == '-RELOAD-'or
                (values['-AUTORELOAD-'and timeout_due(next_reload_time))):
            current_still = refresh(window, values['-RESIZE-'])
            if values['-AUTORELOAD-']:
                next_reload_time = next_timeout(delta)
        elif event == '-RESIZE-':
            current_still = change_aspect_ratio(
                window, current_still, current_still.new_size())
        elif event == '-SAVE-':
            filename = sg.popup_get_file(
                'File name', file_types=[('PNG''*.png')], save_as=True,
                title='Save image', default_extension='.png')
            if filename:
                saved = save(current_still, filename)
                if not saved:
                    sg.popup_ok('Error while saving file:', filename, title='Error')
        elif event == '-UPDATE_DELTA-':
            # The current cycle should complete at the already scheduled time. So
            # don't update next_reload_time yet because it'll be taken care of at the
            # next -AUTORELOAD- or -RELOAD- event.
            delta, valid = validate_delta(values['-DELTA-'])
            if not valid:
                window['-DELTA-'].update(str(DELTA))

    window.close()
    del window


if __name__ == '__main__':
    main(LAYOUT)

更多阅读



用 XGBoost 进行时间序列预测


5分钟掌握 Python 随机爬山算法


5分钟完全读懂关联规则挖掘算法

特别推荐





点击下方阅读原文加入社区会员

浏览 35
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报