Wiki

Clone wiki

Pipsta / Simple Monitoring for Print Job Completion

Overview

This bulletin describes a method of exiting a script only once the print job is complete, i.e. once the dot image has been fully rendered onto the paper/label roll and printing has stopped.

Background

The Pipsta printer has a simple STATUS ENQUIRY command; in ASCII this is:

GS, ENQ

Or, in hex:

0x1D, 0x05

The printer will return a single byte (two bytes if the printer is in an error state - though this is beyond the scope of this bulletin):

pertinent_status_bits.PNG

The two bits of interest are:

  • Bit 2 (data buffer empty)
  • Bit 1 (mechanism running)

In simple printing applications, the following sequence is expected:

  • The data buffer will begin to fill with print job data,
  • Once there is sufficient data to render one or more dots lines, a dot image will be generated,
  • The dot image will be sent to the thermal printer mechanism,
  • The dots will be rendered,
  • The paper will be moved-on.

Thus, if we want a robust method of sensing when a print job is truly complete, we need to firstly wait for the data buffer to become empty and then wait for the printer mechanism to stop running. This two-step test prevents us erroneously concluding the print job is complete before the printer mechanism has started printing.

Example

Using BasicPrint.py as a basis, the following script has been produced:

#!python

# BasicPrint2.py
# Copyright (c) 2015 Able Systems Limited. All rights reserved.
'''This simple code example is provided as-is, and is for demonstration
purposes only. Able Systems takes no responsibility for any system
implementations based on this code.

This very simple python script establishes USB communication with the Pipsta
printer sends a simple text string to the printer.

The code has been modified to demonstrate a simple method of detecting print
job completion by means of STATUS BYTE ENQUIRY.

Copyright (c) 2015 Able Systems Limited. All rights reserved.
'''
import argparse
import platform
import sys
import time
import usb.backend.libusb0 as libusb0

import usb.core
import usb.util

FEED_PAST_CUTTER = b'\n' * 10
USB_BUSY = 66
ENQUIRE_STATUS = b'\x1d\x05'

# STATUS BITS
HEAD_UP = 1
MECHANISM_RUNNING = 2
BUFFER_EMPTY = 4
PAPER_OUT = 8
RESERVED = 16
SPOOLING = 32
ERROR = 64
ALWAYS_SET = 128


# NOTE: The following section establishes communication to the Pipsta printer
# via USB. YOU DO NOT NEED TO UNDERSTAND THIS SECTION TO PROGRESS WITH THE
# TUTORIALS! ALTERING THIS SECTION IN ANY WAY CAN CAUSE A FAILURE TO COMMUNICATE
# WITH THE PIPSTA. If you are interested in learning about what is happening
# herein, please look at the following references:
#
# PyUSB: http://sourceforge.net/apps/trac/pyusb/
# ...which is a wrapper for...
# LibUSB: http://www.libusb.org/
#
# For full help on PyUSB, at the IDLE prompt, type:
# >>> import usb
# >>> help(usb)
# 'Deeper' help can be trawled by (e.g.):
# >>> help(usb.core)
#
# or at the Linux prompt, type:
# pydoc usb
# pydoc usb.core
PIPSTA_USB_VENDOR_ID = 0x0483
PIPSTA_USB_PRODUCT_ID = 0xA053

def parse_arguments():
    '''Parse the arguments passed to the script looking for a font file name
    and a text string to print.  If either are mssing defaults are used.
    '''
    txt = 'Hello World from Pipsta!'
    parser = argparse.ArgumentParser()
    parser.add_argument('text', help='the text to print',
                        nargs='*', default=txt.split())
    args = parser.parse_args()

    return ' '.join(args.text)

def connect():
    '''Establishes a read/write connection to the 1st Pipsta printer found on the USB
    bus.
    '''

    # Find the Pipsta's specific Vendor ID and Product ID (also known as vid
    # and pid)
    dev = usb.core.find(idVendor=PIPSTA_USB_VENDOR_ID,
                        idProduct=PIPSTA_USB_PRODUCT_ID,
                        backend=libusb0.get_backend())
    if dev is None:                 # if no such device is connected...
        raise IOError('Printer not found')  # ...report error

    try:
        if platform.system() == 'Linux':
            dev.reset()

        # Initialisation. Passing no arguments sets the configuration to the
        # currently active configuration.
        dev.set_configuration()
    except usb.core.USBError as ex:
        raise IOError('Failed to configure the printer', ex)
    except AttributeError as ex:
        raise IOError('Failed to configure the printer')

    cfg = dev.get_active_configuration()  # Get a handle to the active interface

    interface_number = cfg[(0, 0)].bInterfaceNumber
    usb.util.claim_interface(dev, interface_number)
    alternate_setting = usb.control.get_interface(dev, interface_number)
    intf = usb.util.find_descriptor(
        cfg, bInterfaceNumber=interface_number,
        bAlternateSetting=alternate_setting)

    ep_out = usb.util.find_descriptor(
        intf,
        custom_match=lambda e:
        usb.util.endpoint_direction(e.bEndpointAddress) ==
        usb.util.ENDPOINT_OUT
    )

    ep_in = usb.util.find_descriptor(
        intf,
        custom_match=lambda e:
        usb.util.endpoint_direction(e.bEndpointAddress) ==
        usb.util.ENDPOINT_IN
    )
    return(dev, ep_out, ep_in)


def main():
    """The main loop of the application.  Wrapping the code in a function
    prevents it being executed when various tools import the code.
    """
    dev, ep_out, ep_in = connect()
    if ep_out is None:  # check we have a real endpoint handle
        raise IOError("Could not find an endpoint to print to")
    if ep_in is None:
        raise IOError("Could not find an endpoint to read from")

    # Now that the USB endpoint is open, we can start to send data to the
    # printer.
    # The following opens the text_file, by using the 'with' statemnent there is
    # no need to close the text_file manually.  This method ensures that the
    # close is called in all situation (including unhandled exceptions).

    txt = parse_arguments()
    ep_out.write(b'\x1b!\x00')

    # Print a char at a time and check the printers buffer isn't full
    for x in txt:
        ep_out.write(x)    # write all the data to the USB OUT endpoint

        res = dev.ctrl_transfer(0xC0, 0x0E, 0x020E, 0, 2)
        while res[0] == USB_BUSY:
            time.sleep(0.01)
            res = dev.ctrl_transfer(0xC0, 0x0E, 0x020E, 0, 2)

    ep_out.write(FEED_PAST_CUTTER)

    ep_out.write(ENQUIRE_STATUS)
    response = ep_in.read(1)
    # Wait for the input buffer to empty
    while((response[0] & BUFFER_EMPTY)==0):
        ep_out.write(ENQUIRE_STATUS)
        response = ep_in.read(1)
        print("Await buffer emptying: {}".format(hex(response[0])))
        time.sleep(0.01)

    while(response[0] & MECHANISM_RUNNING):
        ep_out.write(ENQUIRE_STATUS)
        response = ep_in.read(1)
        print("Await Mech Stopping: {}".format(hex(response[0])))
        time.sleep(0.01)
    usb.util.dispose_resources(dev)


# Ensure that BasicPrint is ran in a stand-alone fashion (as intended) and not
# imported as a module. Prevents accidental execution of code.
if __name__ == '__main__':
    main()

Modifications

1) BasicPrint.py's:

#!python

FEED_PAST_CUTTER = b'\n' * 5

...has been modified to read:

#!python

FEED_PAST_CUTTER = b'\n' * 10

This will effectively append an additional 5 newline characters on to the end of the print and simulate a longer print job.

2) Both the STATUS ENQUIRY command, and the STATUS BITS in the printer's response have also been added:

#!python

ENQUIRE_STATUS = b'\x1d\x05'

# STATUS BITS
HEAD_UP = 1
MECHANISM_RUNNING = 2
BUFFER_EMPTY = 4
PAPER_OUT = 8
RESERVED = 16
SPOOLING = 32
ERROR = 64
ALWAYS_SET = 128

3) A new function, connect() has been introduced. This replaces the previous method of establishing a USB connection, and opens both Bulk Out and Bulk In Endpoints. As in BasicPrint.py, the Bulk Out Endpoint ep_out is used to transmit the print data to the printer, but it is also now used to transmit the STATUS ENQUIRY. The newly added Bulk In Endpoint ep_in will receive the response to the status enquiry.

4) Following the transmission of the trailing 10 newline characters, but prior to the disposal of the USB resources is the code to monitor for print job completion. Looking at this code in more detail:

#!python

ep_out.write(ENQUIRE_STATUS) 
response = ep_in.read(1)
    # Wait for the input buffer to empty
    while((response[0] & BUFFER_EMPTY)==0):
        ep_out.write(ENQUIRE_STATUS)
        response = ep_in.read(1)
        print("Await buffer emptying: {}".format(hex(response[0])))
        time.sleep(0.01)

    while(response[0] & MECHANISM_RUNNING):
        ep_out.write(ENQUIRE_STATUS)
        response = ep_in.read(1)
        print("Await Mech Stopping: {}".format(hex(response[0])))
        time.sleep(0.01)

a) A STATUS ENQUIRY is sent out on the Bulk Out Endpoint,

b) A request for a single byte (minimum) response is made of the Bulk In Endpoint,

c) Execution now remains in a while() loop until it is determined that the BUFFER EMPTY flag has been set. A 10ms delay is invoked each time around the loop before polling again,

d) A second loop to wait for the MECHANISM RUNNING flag has been cleared, once again introducing a delay before polling again,

e) In order to show the operation of the loops, print() commands have been included, which print the STATUS ENQUIRY responses to the terminal. These can be removed or commented-out once the sequence of responses is understood:

TIP: Note that the inclusion of print() commands will serve to add appreciable delays to the loops.

await_buffer_empty1.PNG

...and concluding with:

await_mech2.PNG

Consulting the STATUS BYTE table once again:

pertinent_status_bits.PNG

...we can see that:

0x80 - Data buffer NOT EMPTY, Mechanism STOPPED,

0x82 - Data buffer NOT EMPTY, Mechanism RUNNING,

0x86 - Data buffer EMPTY, Mechanism RUNNING,

0x84 - Data buffer EMPTY, Mechanism STOPPED.

[End of Document]

Updated