Add support gRPC (gRPC proto 3)

This commit is contained in:
Namhyeon Go 2023-09-05 03:24:42 +09:00 committed by GitHub
parent f83b39bc88
commit 9c7ba07017
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 202 additions and 0 deletions

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: WelsonAppLoader.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15WelsonAppLoader.proto\"\x1d\n\nAppRequest\x12\x0f\n\x07\x61ppName\x18\x01 \x01(\t\"#\n\x0b\x41ppResponse\x12\x14\n\x0cresponseText\x18\x01 \x01(\t25\n\x0fWelsonAppLoader\x12\"\n\x03Run\x12\x0b.AppRequest\x1a\x0c.AppResponse\"\x00\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'WelsonAppLoader_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_APPREQUEST']._serialized_start=25
_globals['_APPREQUEST']._serialized_end=54
_globals['_APPRESPONSE']._serialized_start=56
_globals['_APPRESPONSE']._serialized_end=91
_globals['_WELSONAPPLOADER']._serialized_start=93
_globals['_WELSONAPPLOADER']._serialized_end=146
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,17 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional
DESCRIPTOR: _descriptor.FileDescriptor
class AppRequest(_message.Message):
__slots__ = ["appName"]
APPNAME_FIELD_NUMBER: _ClassVar[int]
appName: str
def __init__(self, appName: _Optional[str] = ...) -> None: ...
class AppResponse(_message.Message):
__slots__ = ["responseText"]
RESPONSETEXT_FIELD_NUMBER: _ClassVar[int]
responseText: str
def __init__(self, responseText: _Optional[str] = ...) -> None: ...

View File

@ -0,0 +1,66 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import WelsonAppLoader_pb2 as WelsonAppLoader__pb2
class WelsonAppLoaderStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Run = channel.unary_unary(
'/WelsonAppLoader/Run',
request_serializer=WelsonAppLoader__pb2.AppRequest.SerializeToString,
response_deserializer=WelsonAppLoader__pb2.AppResponse.FromString,
)
class WelsonAppLoaderServicer(object):
"""Missing associated documentation comment in .proto file."""
def Run(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_WelsonAppLoaderServicer_to_server(servicer, server):
rpc_method_handlers = {
'Run': grpc.unary_unary_rpc_method_handler(
servicer.Run,
request_deserializer=WelsonAppLoader__pb2.AppRequest.FromString,
response_serializer=WelsonAppLoader__pb2.AppResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'WelsonAppLoader', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class WelsonAppLoader(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def Run(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/WelsonAppLoader/Run',
WelsonAppLoader__pb2.AppRequest.SerializeToString,
WelsonAppLoader__pb2.AppResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@ -0,0 +1,62 @@
#-*- coding: utf-8 -*-
# apploader.py
# WelsonJS app loader over gRPC protocol (gRPC proto 3) - Server
# https://github.com/gnh1201/welsonjs
import os
import sys
import win32com
import win32com.client
import pythoncom
sys.path.insert(0, '')
from concurrent import futures
import logging
import grpc
import WelsonAppLoader_pb2
import WelsonAppLoader_pb2_grpc
# get file path
workingDirectory = os.path.join(os.path.dirname(__file__), '../../..')
print('[*] Working Directory: ' + workingDirectory);
def runApp(appName):
# Initalization for SubThread
pythoncom.CoInitialize()
# load scriptcontrol.js file
f = open(workingDirectory + "/scriptcontrol.js", 'r')
payload_code = f.read()
# load JScript engine
sc = win32com.client.Dispatch('MSScriptControl.ScriptControl')
sc.Language = "JScript"
sc.AddCode(payload_code);
sc.Run('setWorkingDirectory', workingDirectory);
result = sc.Run('run', appName)
# check a result
print (result)
# Return a result
return result
class WelsonAppLoader(WelsonAppLoader_pb2_grpc.WelsonAppLoaderServicer):
def Run(self, request, context):
return WelsonAppLoader_pb2.AppResponse(responseText=runApp(request.appName))
def serve():
port = "50051"
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) # Only allow 1 worker
WelsonAppLoader_pb2_grpc.add_WelsonAppLoaderServicer_to_server(WelsonAppLoader(), server)
server.add_insecure_port("[::]:" + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig()
serve()

View File

@ -0,0 +1,27 @@
#-*- coding: utf-8 -*-
# apploader_test.py
# WelsonJS app loader over gRPC protocol (gRPC proto 3) - Client
# https://github.com/gnh1201/welsonjs
from __future__ import print_function
import sys
sys.path.insert(0, '')
from concurrent import futures
import logging
import grpc
import WelsonAppLoader_pb2
import WelsonAppLoader_pb2_grpc
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = WelsonAppLoader_pb2_grpc.WelsonAppLoaderStub(channel)
response = stub.Run(WelsonAppLoader_pb2.AppRequest(appName="helloworld"))
print("Response: " + response.responseText)
if __name__ == "__main__":
logging.basicConfig()
run()